I'm using a python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code, collect the output from stdout and stderr into a subprocess.PIPE --- then I can print (and save to a log-file) the output information, and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration its at, what time, what the next time-step is, etc.

是否有一种方法既存储输出(用于日志记录和错误检查),又产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初,我将run_command通过tee输送,以便将副本直接发送到日志文件,流仍然直接输出到终端——但这样我就不能存储任何错误(据我所知)。


目前我的临时解决方案是:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端上运行tail -f log.txt (s.t. log_file = 'log.txt')。


当前回答

我尝试过的所有上述解决方案都无法分离stderr和stdout输出(多个管道),或者当操作系统管道缓冲区已满时永远阻塞,这发生在你运行输出太快的命令时(在python poll() subprocess手册上对此有警告)。我发现唯一可靠的方法是通过选择,但这是一个posix-only解决方案:

import subprocess
import sys
import os
import select
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    poller = select.poll()
    poller.register(p.stdout, select.POLLIN)
    poller.register(p.stderr, select.POLLIN)

    coutput=''
    cerror=''
    fdhup={}
    fdhup[p.stdout.fileno()]=0
    fdhup[p.stderr.fileno()]=0
    while sum(fdhup.values()) < len(fdhup):
        try:
            r = poller.poll(1)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            r=[]
        for fd, flags in r:
            if flags & (select.POLLIN | select.POLLPRI):
                c = os.read(fd, 1024)
                if rtoutput:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                if fd == p.stderr.fileno():
                    cerror+=c
                else:
                    coutput+=c
            else:
                fdhup[fd]=1
    return p.poll(), coutput.strip(), cerror.strip()

其他回答

Python 3的TLDR:

import subprocess
import sys

with open("test.log", "wb") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), b""):
        sys.stdout.buffer.write(c)
        f.buffer.write(c)

你有两种方法来做到这一点,要么从read或readline函数创建一个迭代器,然后做:

import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b'' for Python 3
    for c in iter(lambda: process.stdout.read(1), ""):
        sys.stdout.write(c)
        f.write(c)

or

import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b"" for Python 3
    for line in iter(process.stdout.readline, ""):
        sys.stdout.write(line)
        f.write(line)

或者您可以创建一个读取器和一个写入器文件。将写入器传递给Popen并从读取器读取

import io
import time
import subprocess
import sys

filename = "test.log"
with io.open(filename, "wb") as writer, io.open(filename, "rb", 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

通过这种方式,您可以将数据写入test.log和标准输出中。

文件方法的唯一优点是代码不会阻塞。因此,您可以在此期间做任何您想做的事情,并以无阻塞的方式随时从阅读器读取。当您使用PIPE时,read和readline函数将阻塞,直到分别将一个字符写入管道或将一行字符写入管道。

我发现如何以流的方式读取子进程的输出(同时也在一个变量中捕获它)在Python中(对于多个输出流,即stdout和stderr)是通过传递子进程一个命名的临时文件来写入,然后在单独的读取句柄中打开相同的临时文件。

注意:这是针对Python 3的

    stdout_write = tempfile.NamedTemporaryFile()
    stdout_read = io.open(stdout_write.name, "r")
    stderr_write = tempfile.NamedTemporaryFile()
    stderr_read = io.open(stderr_write.name, "r")

    stdout_captured = ""
    stderr_captured = ""

    proc = subprocess.Popen(["command"], stdout=stdout_write, stderr=stderr_write)
    while True:
        proc_done: bool = cli_process.poll() is not None

        while True:
            content = stdout_read.read(1024)
            sys.stdout.write(content)
            stdout_captured += content
            if len(content) < 1024:
                break

        while True:
            content = stderr_read.read(1024)
            sys.stderr.write(content)
            stdout_captured += content
            if len(content) < 1024:
                break

        if proc_done:
            break

        time.sleep(0.1)

    stdout_write.close()
    stdout_read.close()
    stderr_write.close()
    stderr_read.close()

但是,如果您不需要捕获输出,那么您可以简单地传递sys。Stdout和sys。stderr流从你的Python脚本到被调用的子进程,正如xaav在他的回答中建议的那样:

subprocess.Popen(["command"], stdout=sys.stdout, stderr=sys.stderr)

如果您能够使用第三方库,您可能能够使用像sarge这样的东西(披露:我是它的维护者)。这个库允许非阻塞地访问子流程的输出流——它是分层在子流程模块之上的。

基于以上所有内容,我建议使用稍微修改过的版本(python3):

while循环调用readline (iter建议的解决方案似乎永远阻塞我- Python 3, Windows 7) 结构化的,因此在轮询返回not- none后不需要重复读取数据的处理 Stderr管道到stdout,因此两个输出输出都被读取 增加了获取cmd退出值的代码。

代码:

import subprocess
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT, universal_newlines=True)
while True:
    rd = proc.stdout.readline()
    print(rd, end='')  # and whatever you want to do...
    if not rd:  # EOF
        returncode = proc.poll()
        if returncode is not None:
            break
        time.sleep(0.1)  # cmd closed stdout, but not exited yet

# You may want to check on ReturnCode here

解决方案1:实时并发记录stdout和stderr

一种简单的解决方案,可以将stdout和stderr同时记录,逐行实时地记录到日志文件中。

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor


def log_popen_pipe(p, stdfile):

    with open("mylog.txt", "w") as f:

        while p.poll() is None:
            f.write(stdfile.readline())
            f.flush()

        # Write the rest from the buffer
        f.write(stdfile.read())


with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout)
        r2 = pool.submit(log_popen_pipe, p, p.stderr)
        r1.result()
        r2.result()

解决方案2:函数read_popen_pipes(),它允许同时实时遍历两个管道(stdout/stderr)

import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

# The function in use:

with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')

    p.poll()