I'm using a python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code, collect the output from stdout and stderr into a subprocess.PIPE --- then I can print (and save to a log-file) the output information, and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration its at, what time, what the next time-step is, etc.
是否有一种方法既存储输出(用于日志记录和错误检查),又产生实时流输出?
我的代码的相关部分:
ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
print "RUN failed\n\n%s\n\n" % (errors)
success = False
if( errors ): log_file.write("\n\n%s\n\n" % errors)
最初,我将run_command通过tee输送,以便将副本直接发送到日志文件,流仍然直接输出到终端——但这样我就不能存储任何错误(据我所知)。
目前我的临时解决方案是:
ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
log_file.flush()
然后,在另一个终端上运行tail -f log.txt (s.t. log_file = 'log.txt')。
我找到了一个解决复杂问题的简单方法。
stdout和stderr都需要流化。
它们都需要是非阻塞的:当没有输出时,当有太多输出时。
不想使用Threading或multiprocessing,也不愿意使用pexpect。
这个解决方案使用了我在这里找到的一个要点
import subprocess as sbp
import fcntl
import os
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.readline()
except:
return ""
with sbp.Popen('find / -name fdsfjdlsjf',
shell=True,
universal_newlines=True,
encoding='utf-8',
bufsize=1,
stdout=sbp.PIPE,
stderr=sbp.PIPE) as p:
while True:
out = non_block_read(p.stdout)
err = non_block_read(p.stderr)
if out:
print(out, end='')
if err:
print('E: ' + err, end='')
if p.poll() is not None:
break
在我看来,“来自subprocess命令的实时输出”意味着stdout和stderr都应该是实时的。stdin也应该被传递给子进程。
下面的片段在stdout和stderr上生成实时输出,并在outcome.{stdout,stderr}中将它们作为字节捕获。
窍门包括正确使用select和poll。
在Python 3.9上运行良好。
if self.log == 1:
print(f"** cmnd= {fullCmndStr}")
self.outcome.stdcmnd = fullCmndStr
try:
process = subprocess.Popen(
fullCmndStr,
shell=True,
encoding='utf8',
executable="/bin/bash",
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except OSError:
self.outcome.error = OSError
else:
process.stdin.write(stdin)
process.stdin.close() # type: ignore
stdoutStrFile = io.StringIO("")
stderrStrFile = io.StringIO("")
pollStdout = select.poll()
pollStderr = select.poll()
pollStdout.register(process.stdout, select.POLLIN)
pollStderr.register(process.stderr, select.POLLIN)
stdoutEOF = False
stderrEOF = False
while True:
stdoutActivity = pollStdout.poll(0)
if stdoutActivity:
c= process.stdout.read(1)
if c:
stdoutStrFile.write(c)
if self.log == 1:
sys.stdout.write(c)
else:
stdoutEOF = True
stderrActivity = pollStderr.poll(0)
if stderrActivity:
c= process.stderr.read(1)
if c:
stderrStrFile.write(c)
if self.log == 1:
sys.stderr.write(c)
else:
stderrEOF = True
if stdoutEOF and stderrEOF:
break
if self.log == 1:
print(f"** cmnd={fullCmndStr}")
process.wait() # type: ignore
self.outcome.stdout = stdoutStrFile.getvalue()
self.outcome.stderr = stderrStrFile.getvalue()
self.outcome.error = process.returncode # type: ignore
Here is a class which I'm using in one of my projects. It redirects output of a subprocess to the log. At first I tried simply overwriting the write-method but that doesn't work as the subprocess will never call it (redirection happens on filedescriptor level). So I'm using my own pipe, similar to how it's done in the subprocess-module. This has the advantage of encapsulating all logging/printing logic in the adapter and you can simply pass instances of the logger to Popen: subprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))
class LogAdapter(threading.Thread):
def __init__(self, logname, level = logging.INFO):
super().__init__()
self.log = logging.getLogger(logname)
self.readpipe, self.writepipe = os.pipe()
logFunctions = {
logging.DEBUG: self.log.debug,
logging.INFO: self.log.info,
logging.WARN: self.log.warn,
logging.ERROR: self.log.warn,
}
try:
self.logFunction = logFunctions[level]
except KeyError:
self.logFunction = self.log.info
def fileno(self):
#when fileno is called this indicates the subprocess is about to fork => start thread
self.start()
return self.writepipe
def finished(self):
"""If the write-filedescriptor is not closed this thread will
prevent the whole program from exiting. You can use this method
to clean up after the subprocess has terminated."""
os.close(self.writepipe)
def run(self):
inputFile = os.fdopen(self.readpipe)
while True:
line = inputFile.readline()
if len(line) == 0:
#no new data was added
break
self.logFunction(line.strip())
如果您不需要日志记录,而只是想使用print(),显然可以删除大部分代码并使类更短。你也可以通过__enter__和__exit__方法来扩展它,并在__exit__中调用finished,这样你就可以很容易地将它用作上下文。