I'm using a python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code, collect the output from stdout and stderr into a subprocess.PIPE --- then I can print (and save to a log-file) the output information, and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration its at, what time, what the next time-step is, etc.

是否有一种方法既存储输出(用于日志记录和错误检查),又产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初,我将run_command通过tee输送,以便将副本直接发送到日志文件,流仍然直接输出到终端——但这样我就不能存储任何错误(据我所知)。


目前我的临时解决方案是:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端上运行tail -f log.txt (s.t. log_file = 'log.txt')。


当前回答

我认为subprocess. communication方法有点误导人:它实际上填充了您在subprocess.Popen中指定的stdout和stderr。

然而,从子进程中读取。可以提供给子流程的PIPE。Popen的stdout和stderr参数最终会填满OS管道缓冲区并导致应用程序死锁(特别是当你有多个必须使用subprocess的进程/线程时)。

我建议的解决方案是提供带有文件的标准输出和标准输出-并读取文件的内容,而不是从死锁PIPE中读取。这些文件可以是tempfile.NamedTemporaryFile()——当subprocess. communication写入这些文件时,也可以访问该文件进行读取。

下面是一个示例用法:

try:
    with ProcessRunner(
        ("python", "task.py"), env=os.environ.copy(), seconds_to_wait=0.01
    ) as process_runner:
        for out in process_runner:
            print(out)
except ProcessError as e:
    print(e.error_message)
    raise

这是源代码,准备使用尽可能多的评论,因为我可以提供解释它的功能:

如果您正在使用python 2,请确保首先从pypi安装最新版本的subprocess32包。

import os
import sys
import threading
import time
import tempfile
import logging

if os.name == 'posix' and sys.version_info[0] < 3:
    # Support python 2
    import subprocess32 as subprocess
else:
    # Get latest and greatest from python 3
    import subprocess

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Base exception for errors related to running the process"""


class ProcessTimeout(ProcessError):
    """Error that will be raised when the process execution will exceed a timeout"""


class ProcessRunner(object):
    def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
        """
        Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
        Process Runner. This is a class that should be used as a context manager - and that provides an iterator
        for reading captured output from subprocess.communicate in near realtime.

        Example usage:


        try:
            with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        except ProcessError as e:
            print(e.error_message)
            raise

        :param args: same as subprocess.Popen
        :param env: same as subprocess.Popen
        :param timeout: same as subprocess.communicate
        :param bufsize: same as subprocess.Popen
        :param seconds_to_wait: time to wait between each readline from the temporary file
        :param kwargs: same as subprocess.Popen
        """
        self._seconds_to_wait = seconds_to_wait
        self._process_has_timed_out = False
        self._timeout = timeout
        self._process_done = False
        self._std_file_handle = tempfile.NamedTemporaryFile()
        self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                         stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
        self._thread = threading.Thread(target=self._run_process)
        self._thread.daemon = True

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._thread.join()
        self._std_file_handle.close()

    def __iter__(self):
        # read all output from stdout file that subprocess.communicate fills
        with open(self._std_file_handle.name, 'r') as stdout:
            # while process is alive, keep reading data
            while not self._process_done:
                out = stdout.readline()
                out_without_trailing_whitespaces = out.rstrip()
                if out_without_trailing_whitespaces:
                    # yield stdout data without trailing \n
                    yield out_without_trailing_whitespaces
                else:
                    # if there is nothing to read, then please wait a tiny little bit
                    time.sleep(self._seconds_to_wait)

            # this is a hack: terraform seems to write to buffer after process has finished
            out = stdout.read()
            if out:
                yield out

        if self._process_has_timed_out:
            raise ProcessTimeout('Process has timed out')

        if self._process.returncode != 0:
            raise ProcessError('Process has failed')

    def _run_process(self):
        try:
            # Start gathering information (stdout and stderr) from the opened process
            self._process.communicate(timeout=self._timeout)
            # Graceful termination of the opened process
            self._process.terminate()
        except subprocess.TimeoutExpired:
            self._process_has_timed_out = True
            # Force termination of the opened process
            self._process.kill()

        self._process_done = True

    @property
    def return_code(self):
        return self._process.returncode



其他回答

为什么不直接将stdout设置为sys.stdout?如果你也需要输出到日志,那么你可以简单地重写f的write方法。

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

我们也可以使用默认的文件迭代器来读取stdout,而不是使用iter构造readline()。

import subprocess
import sys

process = subprocess.Popen(
    your_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
for line in process.stdout:
    sys.stdout.write(line)

如果您能够使用第三方库,您可能能够使用像sarge这样的东西(披露:我是它的维护者)。这个库允许非阻塞地访问子流程的输出流——它是分层在子流程模块之上的。

执行摘要(或“tl;dr”版本):当最多只有一个子流程时,这很容易。管道,否则很难。

现在可能是时候解释一下子流程如何。Popen做了自己的事情。

(注意:这是针对Python 2的。X,尽管3。X是相似的;我不太清楚Windows版本。我更了解POSIX之类的东西。)

Popen函数需要同时处理0到3个I/O流。它们通常被表示为stdin、stdout和stderr。

你可以提供:

None, indicating that you don't want to redirect the stream. It will inherit these as usual instead. Note that on POSIX systems, at least, this does not mean it will use Python's sys.stdout, just Python's actual stdout; see demo at end. An int value. This is a "raw" file descriptor (in POSIX at least). (Side note: PIPE and STDOUT are actually ints internally, but are "impossible" descriptors, -1 and -2.) A stream—really, any object with a fileno method. Popen will find the descriptor for that stream, using stream.fileno(), and then proceed as for an int value. subprocess.PIPE, indicating that Python should create a pipe. subprocess.STDOUT (for stderr only): tell Python to use the same descriptor as for stdout. This only makes sense if you provided a (non-None) value for stdout, and even then, it is only needed if you set stdout=subprocess.PIPE. (Otherwise you can just provide the same argument you provided for stdout, e.g., Popen(..., stdout=stream, stderr=stream).)

最简单的情况(没有管道)

如果您不重定向任何内容(将这三个都保留为默认的None值或提供显式的None),那么Pipe很容易做到这一点。它只需要剥离子进程并让它运行。或者,如果你重定向到一个非pipe——一个int或流的fileno()——它仍然很容易,因为操作系统会做所有的工作。Python只需要剥离子进程,将其stdin、stdout和/或stderr连接到所提供的文件描述符。

仍然简单的情况是:一根管子

如果你只重定向一个流,Pipe仍然很简单。我们每次选一条小溪看吧。

假设您想要提供一些stdin,但是让stdout和stderr不重定向,或者转到文件描述符。作为父进程,您的Python程序只需要使用write()将数据发送到管道中。你可以自己做,例如:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

或者你可以将标准输入数据传递给proc. communication(),它会执行标准输入。写如上所示。没有返回的输出,因此communication()只有另一个真正的工作:它还为您关闭管道。(如果你不调用proc. communication(),你必须调用proc.stdin.close()来关闭管道,这样子进程就知道没有更多的数据通过了。)

假设您希望捕获stdout,但保留stdin和stderr。同样,这很简单:只需调用proc.stdout.read()(或等效),直到没有更多输出。由于proc.stdout()是一个正常的Python I/O流,你可以在它上面使用所有正常的结构,比如:

for line in proc.stdout:

或者,您可以再次使用proc. communication(),它只是为您执行read()。

如果您只想捕获stderr,它的工作原理与stdout相同。

在事情变得复杂之前,还有一个技巧。假设你想捕获stdout,也捕获stderr,但与stdout在同一个管道上:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

在这种情况下,子进程“作弊”!好吧,它必须这样做,所以它不是真正的欺骗:它启动子进程时,将它的标准输出和标准derr指向(单个)管道描述符,然后反馈给它的父进程(Python)。在父端,同样只有一个用于读取输出的管道描述符。所有“stderr”输出都显示在proc.stdout中,如果调用proc. communication (), stderr结果(元组中的第二个值)将为None,而不是字符串。

硬的情况:两个或更多的管道

当您想要使用至少两个管道时,就会出现这些问题。事实上,子进程代码本身有这样的部分:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

但是,哎呀,这里我们至少创建了两个,也许是三个不同的管道,所以count(None)返回1或0。我们必须用艰难的方式做事。

在Windows上,这使用线程。线程为自己积累结果。Stdout和self。Stderr,并让父线程传递自我。Stdin输入数据(然后关闭管道)。

在POSIX上,如果可用,则使用轮询,否则使用select,以累积输出并交付stdin输入。所有这些都运行在(单个)父进程/线程中。

Threads or poll/select are needed here to avoid deadlock. Suppose, for instance, that we've redirected all three streams to three separate pipes. Suppose further that there's a small limit on how much data can be stuffed into to a pipe before the writing process is suspended, waiting for the reading process to "clean out" the pipe from the other end. Let's set that small limit to a single byte, just for illustration. (This is in fact how things work, except that the limit is much bigger than one byte.)

如果父进程(Python)试图写入几个字节——比如,'go\n'到proc.stdin,第一个字节进入,然后第二个字节导致Python进程挂起,等待子进程读取第一个字节,清空管道。

同时,假设子进程决定打印友好的“Hello!”别慌!”H进入它的标准输出管道,但是e使它挂起,等待它的父结点读取H,清空标准输出管道。

现在我们被困住了:Python进程睡着了,等待完成说“go”,子进程也睡着了,等待完成说“Hello!”别慌!”

The subprocess.Popen code avoids this problem with threading-or-select/poll. When bytes can go over the pipes, they go. When they can't, only a thread (not the whole process) has to sleep—or, in the case of select/poll, the Python process waits simultaneously for "can write" or "data available", writes to the process's stdin only when there is room, and reads its stdout and/or stderr only when data are ready. The proc.communicate() code (actually _communicate where the hairy cases are handled) returns once all stdin data (if any) have been sent and all stdout and/or stderr data have been accumulated.

如果希望同时读取两个不同管道上的stdout和stderr(不管是否有stdin重定向),还需要避免死锁。这里的死锁场景有所不同——当您从stdout提取数据时,子进程将较长的内容写入stderr时就会发生死锁,反之亦然——但死锁仍然存在。


演示

我承诺演示,在未重定向的情况下,Python子进程写入底层标准输出,而不是sys.stdout。这里有一些代码:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
   print 'start show1'
   save = sys.stdout
   sys.stdout = StringIO()
   print 'sys.stdout being buffered'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   in_stdout = sys.stdout.getvalue()
   sys.stdout = save
   print 'in buffer:', in_stdout

def show2():
   print 'start show2'
   save = sys.stdout
   sys.stdout = open(os.devnull, 'w')
   print 'after redirect sys.stdout'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   sys.stdout = save

show1()
show2()

运行时:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

注意,如果添加stdout=sys,第一个例程将失败。stdout,因为StringIO对象没有文件。如果添加stdout=sys,第二个函数将忽略hello。自sys开始的Stdout。Stdout已重定向到os.devnull。

(如果重定向Python的file-descriptor-1,子进程将跟随该重定向。open(os.devnull, 'w')调用产生一个fileno()大于2的流。

遇到了同样的问题,并使用process.sdtout.read1()得出了一个简单而干净的解决方案,它完全满足了我在python3中的需求。

下面是一个使用ping命令的演示(需要网络连接):

from subprocess import Popen, PIPE

cmd = "ping 8.8.8.8"
proc = Popen([cmd], shell=True, stdout=PIPE)
while True:
    print(proc.stdout.read1())

当ping命令实时报告其数据时,大约每秒钟就会在python控制台中打印一行新行。