I'm using a python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code, collect the output from stdout and stderr into a subprocess.PIPE --- then I can print (and save to a log-file) the output information, and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration its at, what time, what the next time-step is, etc.

是否有一种方法既存储输出(用于日志记录和错误检查),又产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初,我将run_command通过tee输送,以便将副本直接发送到日志文件,流仍然直接输出到终端——但这样我就不能存储任何错误(据我所知)。


目前我的临时解决方案是:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端上运行tail -f log.txt (s.t. log_file = 'log.txt')。


当前回答

解决方案1:实时并发记录stdout和stderr

一种简单的解决方案,可以将stdout和stderr同时记录,逐行实时地记录到日志文件中。

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor


def log_popen_pipe(p, stdfile):

    with open("mylog.txt", "w") as f:

        while p.poll() is None:
            f.write(stdfile.readline())
            f.flush()

        # Write the rest from the buffer
        f.write(stdfile.read())


with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout)
        r2 = pool.submit(log_popen_pipe, p, p.stderr)
        r1.result()
        r2.result()

解决方案2:函数read_popen_pipes(),它允许同时实时遍历两个管道(stdout/stderr)

import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

# The function in use:

with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')

    p.poll()

其他回答

为什么不直接将stdout设置为sys.stdout?如果你也需要输出到日志,那么你可以简单地重写f的write方法。

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

一个好的但“重量级”的解决方案是使用Twisted -见底部。

如果你愿意只使用stdout,那么下面这些代码应该可以工作:

import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
   stdoutdata = popenobj.stdout.readline()
   if stdoutdata:
      sys.stdout.write(stdoutdata)
   else:
      break
print "Return code", popenobj.returncode

(如果你使用read(),它会尝试读取整个“文件”,这是没有用的,我们真正可以在这里使用的是读取管道中当前所有数据的东西)

你也可以尝试用线程来解决这个问题,例如:

import subprocess
import sys
import threading

popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)

def stdoutprocess(o):
   while True:
      stdoutdata = o.stdout.readline()
      if stdoutdata:
         sys.stdout.write(stdoutdata)
      else:
         break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

现在我们可以通过两个线程来添加stderr。

但是请注意,子进程文档不鼓励直接使用这些文件,并建议使用communication()(主要涉及死锁,我认为这不是上面的问题),解决方案有点笨,所以看起来子进程模块不太适合这项工作(也请参阅:http://www.python.org/dev/peps/pep-3145/),我们需要看看其他东西。

一个更复杂的解决方案是使用Twisted,如下所示:https://twistedmatrix.com/documents/11.1.0/core/howto/process.html

Twisted的方法是使用reactor.spawnprocess()创建进程,并提供一个ProcessProtocol,然后异步处理输出。Twisted示例Python代码在这里:https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py

在我看来,“来自subprocess命令的实时输出”意味着stdout和stderr都应该是实时的。stdin也应该被传递给子进程。

下面的片段在stdout和stderr上生成实时输出,并在outcome.{stdout,stderr}中将它们作为字节捕获。

窍门包括正确使用select和poll。

在Python 3.9上运行良好。


        if self.log == 1:
            print(f"** cmnd= {fullCmndStr}")

        self.outcome.stdcmnd = fullCmndStr
        try:
            process = subprocess.Popen(
                fullCmndStr,
                shell=True,
                encoding='utf8',
                executable="/bin/bash",
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError:
            self.outcome.error = OSError
        else:
            process.stdin.write(stdin)
            process.stdin.close() # type: ignore

        stdoutStrFile = io.StringIO("")
        stderrStrFile = io.StringIO("")

        pollStdout = select.poll()
        pollStderr = select.poll()

        pollStdout.register(process.stdout, select.POLLIN)
        pollStderr.register(process.stderr, select.POLLIN)

        stdoutEOF = False
        stderrEOF = False

        while True:
            stdoutActivity = pollStdout.poll(0)
            if stdoutActivity:
                c= process.stdout.read(1)
                if c:
                    stdoutStrFile.write(c)
                    if self.log == 1:
                        sys.stdout.write(c)
                else:
                   stdoutEOF = True

            stderrActivity = pollStderr.poll(0)
            if stderrActivity:
                c= process.stderr.read(1)
                if c:
                    stderrStrFile.write(c)
                    if self.log == 1:
                        sys.stderr.write(c)
                else:
                   stderrEOF = True
            if stdoutEOF and stderrEOF:
                break

        if self.log == 1:
            print(f"** cmnd={fullCmndStr}")

        process.wait() # type: ignore

        self.outcome.stdout = stdoutStrFile.getvalue()
        self.outcome.stderr = stderrStrFile.getvalue()
        self.outcome.error = process.returncode # type: ignore
import os

def execute(cmd, callback):
    for line in iter(os.popen(cmd).readline, ''): 
            callback(line[:-1])

execute('ls -a', print)

我找到了一个解决复杂问题的简单方法。

stdout和stderr都需要流化。 它们都需要是非阻塞的:当没有输出时,当有太多输出时。 不想使用Threading或multiprocessing,也不愿意使用pexpect。

这个解决方案使用了我在这里找到的一个要点

import subprocess as sbp
import fcntl
import os

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.readline()
    except:
        return ""

with sbp.Popen('find / -name fdsfjdlsjf',
                shell=True,
                universal_newlines=True,
                encoding='utf-8',
                bufsize=1,
                stdout=sbp.PIPE,
                stderr=sbp.PIPE) as p:
    while True:
        out = non_block_read(p.stdout)
        err = non_block_read(p.stderr)
        if out:
            print(out, end='')
        if err:
            print('E: ' + err, end='')
        if p.poll() is not None:
            break