I'm using a Python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code and collect the output from stdout and stderr into a subprocess.PIPE; then I can print (and save to a log file) the output information and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration it's at, what time, what the next time-step is, etc.
Is there a way to both store the output (for logging and error checking) and also produce live-streaming output?
The relevant section of my code:
ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False
if( errors ): log_file.write("\n\n%s\n\n" % errors)
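Originally, I was piping run_command through tee so that a copy went directly to the log file while the stream still went straight to the terminal, but that way I can't store any errors (to my knowledge).
My temporary solution so far: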
ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
# poll() returns None while the process is still running
while ret_val.poll() is None:
    log_file.flush()
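Then, in another terminal, I run tail -f log.txt (where log_file = 'log.txt').
Solution 1: Log stdout and stderr concurrently in real time
A simple solution that logs both stdout and stderr concurrently, line by line, in real time into a log file.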
import subprocess as sp
from concurrent.futures import ThreadPoolExecutor

def log_popen_pipe(p, stdfile):
    # Append mode: both threads log to the same file, and opening it with "w"
    # in each thread would let one stream overwrite the other.
    with open("mylog.txt", "a") as f:
        while p.poll() is None:
            f.write(stdfile.readline())
            f.flush()
        # Write the rest from the buffer
        f.write(stdfile.read())

with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout)
        r2 = pool.submit(log_popen_pipe, p, p.stderr)
        r1.result()
        r2.result()
Solution 2: A function read_popen_pipes() that lets you iterate over both pipes (stdout/stderr) concurrently in real time
import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor

def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()

def read_popen_pipes(p):
    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()
        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)
        while True:
            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break
            out_line = err_line = ''
            # Read each queue in its own try block, so that an empty stdout
            # queue does not prevent the stderr queue from being drained.
            try:
                out_line = q_stdout.get_nowait()
            except Empty:
                pass
            try:
                err_line = q_stderr.get_nowait()
            except Empty:
                pass
            yield (out_line, err_line)

# The function in use:
with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')
    p.poll()
A good but "heavyweight" solution is to use Twisted; see the bottom.
If you're willing to live with only stdout, something along these lines should work:
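import subprocess
import sys

# text=True (Python 3.7+) makes the pipe yield str rather than bytes
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE, text=True)
while True:
    stdoutdata = popenobj.stdout.readline()
    if stdoutdata:
        sys.stdout.write(stdoutdata)
    else:
        # An empty string means end of file: the child closed its stdout
        break
popenobj.wait()  # make sure returncode is set before it is printed
print("Return code", popenobj.returncode)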
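(If you use read(), it tries to read the entire "file", which is not useful; what we could really use here is something that reads all the data that is in the pipe right now.)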
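One way to get "whatever is in the pipe right now" is to call os.read() on the pipe's file descriptor. This is only a minimal sketch: the function name stream_available, the echo callback and the chunk_size parameter are made up for illustration and are not part of the subprocess module. os.read() waits until some data is available and then returns at most chunk_size bytes, so it does not wait for a full line or for end of file.

```python
import os
import subprocess
import sys


def stream_available(cmd, echo=None, chunk_size=4096):
    """Run cmd, echo its stdout as it arrives, return (returncode, bytes)."""
    if echo is None:
        # Assumption: sys.stdout is a regular text stream with a byte layer.
        def echo(data):
            sys.stdout.buffer.write(data)
            sys.stdout.buffer.flush()

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    fd = proc.stdout.fileno()
    chunks = []
    while True:
        # Blocks only until *some* data is available, then returns at most
        # chunk_size bytes. An empty result means end of file.
        data = os.read(fd, chunk_size)
        if not data:
            break
        echo(data)
        chunks.append(data)
    proc.stdout.close()
    return proc.wait(), b"".join(chunks)
```

For example, stream_available(["ls", "-Rl"]) would print the listing as it is produced and also hand back the full output for logging.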
You can also try to approach this with threads, e.g.:
import subprocess
import sys
import threading

# text=True (Python 3.7+) makes the pipe yield str rather than bytes
popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True, text=True)

def stdoutprocess(o):
    while True:
        stdoutdata = o.stdout.readline()
        if stdoutdata:
            sys.stdout.write(stdoutdata)
        else:
            break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print("Return code", popenobj.returncode)
Now we could potentially add stderr as well by having two threads.
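The two-thread idea could look roughly like the sketch below. The names pump and run_streaming and the echo_out/echo_err callbacks are invented for this example; each thread reads one pipe line by line, echoes it right away, and keeps a copy so the output can still be logged or checked afterwards.

```python
import subprocess
import sys
import threading


def pump(pipe, echo, captured):
    """Copy lines from pipe to echo() and into captured until end of file."""
    for line in iter(pipe.readline, ""):
        echo(line)
        captured.append(line)
    pipe.close()


def run_streaming(cmd, echo_out=None, echo_err=None):
    """Run cmd; stream both pipes live; return (returncode, stdout, stderr)."""
    proc = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    out_lines, err_lines = [], []
    threads = [
        threading.Thread(
            target=pump,
            args=(proc.stdout, echo_out or sys.stdout.write, out_lines),
        ),
        threading.Thread(
            target=pump,
            args=(proc.stderr, echo_err or sys.stderr.write, err_lines),
        ),
    ]
    for t in threads:
        t.start()
    returncode = proc.wait()
    # Join after wait() so that everything the child wrote has been read.
    for t in threads:
        t.join()
    return returncode, "".join(out_lines), "".join(err_lines)
```

Because each pipe has its own reader, a child that writes a lot to stderr cannot stall while the parent is blocked reading stdout.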
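Note, however, that the subprocess docs discourage using these files directly and recommend using communicate() (mostly out of concern for deadlocks, which I think isn't an issue above), and the solutions are a little klunky, so it really seems like the subprocess module isn't quite up to the job (also see: http://www.python.org/dev/peps/pep-3145/) and we need to look at something else.
A more involved solution is to use Twisted, as shown here: https://twistedmatrix.com/documents/11.1.0/core/howto/process.html
The way you do this with Twisted is to create your process using reactor.spawnProcess() and provide a ProcessProtocol that then processes the output asynchronously. The Twisted sample Python code is here: https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py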
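If pulling in Twisted is too much, the same event-driven idea can be sketched with the standard library's asyncio. To be clear, this is not Twisted code and not the linked example; it swaps in asyncio.create_subprocess_exec as a stand-in. The names _pump and run_async and the echo callbacks are made up for illustration.

```python
import asyncio
import sys


async def _pump(stream, echo, captured):
    """Forward lines from an asyncio stream to echo() and keep a copy."""
    while True:
        line = await stream.readline()
        if not line:  # b"" means end of file
            break
        text = line.decode(errors="replace")
        echo(text)
        captured.append(text)


async def run_async(cmd, echo_out=None, echo_err=None):
    """Run cmd; stream both pipes live; return (returncode, stdout, stderr)."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out_lines, err_lines = [], []
    # Both pipes are read concurrently on a single thread.
    await asyncio.gather(
        _pump(proc.stdout, echo_out or sys.stdout.write, out_lines),
        _pump(proc.stderr, echo_err or sys.stderr.write, err_lines),
    )
    returncode = await proc.wait()
    return returncode, "".join(out_lines), "".join(err_lines)
```

It would be called as asyncio.run(run_async(["ls", "-Rl"])). Note that readline() on an asyncio stream has a default line-length limit (64 KiB), so very long lines would need a different read call.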
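In my opinion, "live output from a subprocess command" means that both stdout and stderr should be live. And stdin should also be delivered to the subprocess.
The fragment below produces live output on stdout and stderr and also captures them in outcome.{stdout,stderr} (as str, since the pipes are opened with encoding='utf8').
The trick involves proper use of select and poll.
Works well for me on Python 3.9.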
# Fragment of a method body; it needs: import io, select, subprocess, sys
if self.log == 1:
    print(f"** cmnd= {fullCmndStr}")
self.outcome.stdcmnd = fullCmndStr
try:
    process = subprocess.Popen(
        fullCmndStr,
        shell=True,
        encoding='utf8',
        executable="/bin/bash",
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
except OSError as e:
    # Store the exception instance, not the OSError class itself
    self.outcome.error = e
else:
    process.stdin.write(stdin)
    process.stdin.close() # type: ignore
    stdoutStrFile = io.StringIO("")
    stderrStrFile = io.StringIO("")
    pollStdout = select.poll()
    pollStderr = select.poll()
    pollStdout.register(process.stdout, select.POLLIN)
    pollStderr.register(process.stderr, select.POLLIN)
    stdoutEOF = False
    stderrEOF = False
    while True:
        # A timeout of 0 means poll() returns immediately
        stdoutActivity = pollStdout.poll(0)
        if stdoutActivity:
            c = process.stdout.read(1)
            if c:
                stdoutStrFile.write(c)
                if self.log == 1:
                    sys.stdout.write(c)
            else:
                stdoutEOF = True
        stderrActivity = pollStderr.poll(0)
        if stderrActivity:
            c = process.stderr.read(1)
            if c:
                stderrStrFile.write(c)
                if self.log == 1:
                    sys.stderr.write(c)
            else:
                stderrEOF = True
        if stdoutEOF and stderrEOF:
            break
    if self.log == 1:
        print(f"** cmnd={fullCmndStr}")
    process.wait() # type: ignore
    self.outcome.stdout = stdoutStrFile.getvalue()
    self.outcome.stderr = stderrStrFile.getvalue()
    self.outcome.error = process.returncode # type: ignore
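For comparison, here is a standalone sketch of the same poll-both-pipes technique using the higher-level selectors module. It is not the fragment above: run_with_selectors and the echo callbacks are hypothetical names, the pipes are read as bytes in chunks rather than one character at a time, and select() blocks until there is something to read instead of spinning with a zero timeout. It assumes a Unix-like system, since select-style polling does not work on pipes on Windows.

```python
import os
import selectors
import subprocess
import sys


def run_with_selectors(cmd, echo_out=None, echo_err=None):
    """Stream and capture both pipes; return (returncode, out, err) as bytes."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    captured = {"out": [], "err": []}
    echoes = {
        "out": echo_out or sys.stdout.buffer.write,
        "err": echo_err or sys.stderr.buffer.write,
    }
    with selectors.DefaultSelector() as sel:
        sel.register(proc.stdout, selectors.EVENT_READ, "out")
        sel.register(proc.stderr, selectors.EVENT_READ, "err")
        # Loop until both pipes have been unregistered at end of file.
        while sel.get_map():
            # Blocks until at least one pipe is readable or has been closed.
            for key, _events in sel.select():
                data = os.read(key.fd, 4096)
                if data:
                    echoes[key.data](data)
                    captured[key.data].append(data)
                else:
                    # An empty read means end of file on this pipe.
                    sel.unregister(key.fileobj)
    proc.stdout.close()
    proc.stderr.close()
    returncode = proc.wait()
    return returncode, b"".join(captured["out"]), b"".join(captured["err"])
```

Reading raw bytes with os.read() sidesteps the text-layer buffering of the pipe objects, so poll readiness and the data actually available stay in step.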
TL;DR for Python 3:
import subprocess
import sys

with open("test.log", "wb") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), b""):
        sys.stdout.buffer.write(c)
        # f is already a binary file object, so write to it directly
        f.write(c)
You have two ways of doing this: either create an iterator from the read or readline functions and do:
import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b'' for Python 3
    for c in iter(lambda: process.stdout.read(1), ""):
        sys.stdout.write(c)
        f.write(c)
or
import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b"" for Python 3
    for line in iter(process.stdout.readline, ""):
        sys.stdout.write(line)
        f.write(line)
Or you can create a reader and a writer file. Pass the writer to Popen and read from the reader:
import io
import time
import subprocess
import sys

filename = "test.log"
with io.open(filename, "wb") as writer, io.open(filename, "rb", 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        # reader.read() returns bytes on Python 3: use sys.stdout.buffer.write there
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())
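This way you will have the data written to test.log as well as to standard output.
The only advantage of the file approach is that your code doesn't block. So you can do whatever you want in the meantime and read from the reader whenever you want, in a non-blocking way. When you use PIPE, the read and readline functions block until, respectively, one character or one line is written to the pipe.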
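To illustrate the non-blocking point, here is a rough Python 3 sketch of the file approach. The names run_and_tail, echo, poll_interval and on_idle are hypothetical and only serve this example. The child writes straight to the log file, and the parent reads whatever has been appended so far between other pieces of work. An incremental decoder is used so that a multi-byte character split across two reads is not garbled; UTF-8 output is an assumption here.

```python
import codecs
import subprocess
import sys
import time


def run_and_tail(cmd, log_path, echo=None, poll_interval=0.1, on_idle=None):
    """Send the child's stdout to log_path and echo it while it runs."""
    echo = echo or sys.stdout.write
    # Assumption: the child writes UTF-8 text.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    with open(log_path, "wb") as writer, open(log_path, "rb") as reader:
        proc = subprocess.Popen(cmd, stdout=writer)
        while proc.poll() is None:
            # read() returns immediately with whatever is in the file so far.
            echo(decoder.decode(reader.read()))
            if on_idle is not None:
                on_idle()  # the parent is free to do other work here
            time.sleep(poll_interval)
        # Pick up anything written between the last read and process exit.
        echo(decoder.decode(reader.read(), final=True))
    return proc.returncode
```

Calling run_and_tail(["ls", "-Rl"], "test.log") would leave the complete output in test.log and echo it at roughly poll_interval granularity.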