I'm using a python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code, collect the output from stdout and stderr into a subprocess.PIPE --- then I can print (and save to a log-file) the output information, and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration its at, what time, what the next time-step is, etc.

是否有一种方法既存储输出(用于日志记录和错误检查),又产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初,我将run_command通过tee输送,以便将副本直接发送到日志文件,流仍然直接输出到终端——但这样我就不能存储任何错误(据我所知)。


目前我的临时解决方案是:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端上运行tail -f log.txt (s.t. log_file = 'log.txt')。


当前回答

遇到了同样的问题,并使用process.sdtout.read1()得出了一个简单而干净的解决方案,它完全满足了我在python3中的需求。

下面是一个使用ping命令的演示(需要网络连接):

from subprocess import Popen, PIPE

cmd = "ping 8.8.8.8"
proc = Popen([cmd], shell=True, stdout=PIPE)
while True:
    print(proc.stdout.read1())

当ping命令实时报告其数据时,大约每秒钟就会在python控制台中打印一行新行。

其他回答

python的解决方案对我都不起作用。 原来proc.stdout.read()或类似的可能永远阻塞。

因此,我这样使用tee:

subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')

如果你已经在使用shell=True,这个解决方案很方便。

${PIPESTATUS}捕获整个命令链的成功状态(仅在Bash中可用)。 如果我省略&&退出${PIPESTATUS},那么这将总是返回零,因为tee从未失败过。

Unbuffer对于立即将每一行打印到终端可能是必要的,而不是等待太长时间,直到“管道缓冲区”被填满。 然而,unbuffer吞下了assert的退出状态(SIG Abort)…

2>&1也记录文件的标准错误。

为什么不直接将stdout设置为sys.stdout?如果你也需要输出到日志,那么你可以简单地重写f的write方法。

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

Python 3的TLDR:

import subprocess
import sys

with open("test.log", "wb") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), b""):
        sys.stdout.buffer.write(c)
        f.buffer.write(c)

你有两种方法来做到这一点,要么从read或readline函数创建一个迭代器,然后做:

import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b'' for Python 3
    for c in iter(lambda: process.stdout.read(1), ""):
        sys.stdout.write(c)
        f.write(c)

or

import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b"" for Python 3
    for line in iter(process.stdout.readline, ""):
        sys.stdout.write(line)
        f.write(line)

或者您可以创建一个读取器和一个写入器文件。将写入器传递给Popen并从读取器读取

import io
import time
import subprocess
import sys

filename = "test.log"
with io.open(filename, "wb") as writer, io.open(filename, "rb", 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

通过这种方式,您可以将数据写入test.log和标准输出中。

文件方法的唯一优点是代码不会阻塞。因此,您可以在此期间做任何您想做的事情,并以无阻塞的方式随时从阅读器读取。当您使用PIPE时,read和readline函数将阻塞,直到分别将一个字符写入管道或将一行字符写入管道。

我认为subprocess. communication方法有点误导人:它实际上填充了您在subprocess.Popen中指定的stdout和stderr。

然而,从子进程中读取。可以提供给子流程的PIPE。Popen的stdout和stderr参数最终会填满OS管道缓冲区并导致应用程序死锁(特别是当你有多个必须使用subprocess的进程/线程时)。

我建议的解决方案是提供带有文件的标准输出和标准输出-并读取文件的内容,而不是从死锁PIPE中读取。这些文件可以是tempfile.NamedTemporaryFile()——当subprocess. communication写入这些文件时,也可以访问该文件进行读取。

下面是一个示例用法:

try:
    with ProcessRunner(
        ("python", "task.py"), env=os.environ.copy(), seconds_to_wait=0.01
    ) as process_runner:
        for out in process_runner:
            print(out)
except ProcessError as e:
    print(e.error_message)
    raise

这是源代码,准备使用尽可能多的评论,因为我可以提供解释它的功能:

如果您正在使用python 2,请确保首先从pypi安装最新版本的subprocess32包。

import os
import sys
import threading
import time
import tempfile
import logging

if os.name == 'posix' and sys.version_info[0] < 3:
    # Support python 2
    import subprocess32 as subprocess
else:
    # Get latest and greatest from python 3
    import subprocess

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Base exception for errors related to running the process"""


class ProcessTimeout(ProcessError):
    """Error that will be raised when the process execution will exceed a timeout"""


class ProcessRunner(object):
    def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
        """
        Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
        Process Runner. This is a class that should be used as a context manager - and that provides an iterator
        for reading captured output from subprocess.communicate in near realtime.

        Example usage:


        try:
            with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        except ProcessError as e:
            print(e.error_message)
            raise

        :param args: same as subprocess.Popen
        :param env: same as subprocess.Popen
        :param timeout: same as subprocess.communicate
        :param bufsize: same as subprocess.Popen
        :param seconds_to_wait: time to wait between each readline from the temporary file
        :param kwargs: same as subprocess.Popen
        """
        self._seconds_to_wait = seconds_to_wait
        self._process_has_timed_out = False
        self._timeout = timeout
        self._process_done = False
        self._std_file_handle = tempfile.NamedTemporaryFile()
        self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                         stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
        self._thread = threading.Thread(target=self._run_process)
        self._thread.daemon = True

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._thread.join()
        self._std_file_handle.close()

    def __iter__(self):
        # read all output from stdout file that subprocess.communicate fills
        with open(self._std_file_handle.name, 'r') as stdout:
            # while process is alive, keep reading data
            while not self._process_done:
                out = stdout.readline()
                out_without_trailing_whitespaces = out.rstrip()
                if out_without_trailing_whitespaces:
                    # yield stdout data without trailing \n
                    yield out_without_trailing_whitespaces
                else:
                    # if there is nothing to read, then please wait a tiny little bit
                    time.sleep(self._seconds_to_wait)

            # this is a hack: terraform seems to write to buffer after process has finished
            out = stdout.read()
            if out:
                yield out

        if self._process_has_timed_out:
            raise ProcessTimeout('Process has timed out')

        if self._process.returncode != 0:
            raise ProcessError('Process has failed')

    def _run_process(self):
        try:
            # Start gathering information (stdout and stderr) from the opened process
            self._process.communicate(timeout=self._timeout)
            # Graceful termination of the opened process
            self._process.terminate()
        except subprocess.TimeoutExpired:
            self._process_has_timed_out = True
            # Force termination of the opened process
            self._process.kill()

        self._process_done = True

    @property
    def return_code(self):
        return self._process.returncode



在我看来,“来自subprocess命令的实时输出”意味着stdout和stderr都应该是实时的。stdin也应该被传递给子进程。

下面的片段在stdout和stderr上生成实时输出,并在outcome.{stdout,stderr}中将它们作为字节捕获。

窍门包括正确使用select和poll。

在Python 3.9上运行良好。


        if self.log == 1:
            print(f"** cmnd= {fullCmndStr}")

        self.outcome.stdcmnd = fullCmndStr
        try:
            process = subprocess.Popen(
                fullCmndStr,
                shell=True,
                encoding='utf8',
                executable="/bin/bash",
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError:
            self.outcome.error = OSError
        else:
            process.stdin.write(stdin)
            process.stdin.close() # type: ignore

        stdoutStrFile = io.StringIO("")
        stderrStrFile = io.StringIO("")

        pollStdout = select.poll()
        pollStderr = select.poll()

        pollStdout.register(process.stdout, select.POLLIN)
        pollStderr.register(process.stderr, select.POLLIN)

        stdoutEOF = False
        stderrEOF = False

        while True:
            stdoutActivity = pollStdout.poll(0)
            if stdoutActivity:
                c= process.stdout.read(1)
                if c:
                    stdoutStrFile.write(c)
                    if self.log == 1:
                        sys.stdout.write(c)
                else:
                   stdoutEOF = True

            stderrActivity = pollStderr.poll(0)
            if stderrActivity:
                c= process.stderr.read(1)
                if c:
                    stderrStrFile.write(c)
                    if self.log == 1:
                        sys.stderr.write(c)
                else:
                   stderrEOF = True
            if stdoutEOF and stderrEOF:
                break

        if self.log == 1:
            print(f"** cmnd={fullCmndStr}")

        process.wait() # type: ignore

        self.outcome.stdout = stdoutStrFile.getvalue()
        self.outcome.stderr = stderrStrFile.getvalue()
        self.outcome.error = process.returncode # type: ignore