I'm using a Python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code and collect the output from stdout and stderr via subprocess.PIPE; then I can print (and save to a log file) the output, and check for any errors. The problem is that I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration it's at, what time, what the next time-step is, etc.

Is there a way to both store the output (for logging and error checking) and also produce live-streaming output?

The relevant section of my code:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

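Originally I was piping run_command through tee, so that a copy went directly to the log file while the stream still went directly to the terminal, but that way I can't store any errors (to my knowledge).

My temporary solution so far: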

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while ret_val.poll() is None:  # poll() returns None while the child is still running
    log_file.flush()

then, in another terminal, run tail -f log.txt (s.t. log_file = 'log.txt').


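Current answer

If all you need is for the output to be visible on the console, the easiest solution for me was to pass the following arguments to Popen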

with Popen(cmd, stdout=sys.stdout, stderr=sys.stderr) as proc:

which will use your Python script's stdio file handles

Other answers

Here is a class which I'm using in one of my projects. It redirects output of a subprocess to the log. At first I tried simply overwriting the write-method but that doesn't work as the subprocess will never call it (redirection happens on filedescriptor level). So I'm using my own pipe, similar to how it's done in the subprocess-module. This has the advantage of encapsulating all logging/printing logic in the adapter and you can simply pass instances of the logger to Popen: subprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

import logging
import os
import threading

class LogAdapter(threading.Thread):

    def __init__(self, logname, level = logging.INFO):
        super().__init__()
        self.log = logging.getLogger(logname)
        self.readpipe, self.writepipe = os.pipe()

        logFunctions = {
            logging.DEBUG: self.log.debug,
            logging.INFO: self.log.info,
            logging.WARNING: self.log.warning,  # Logger.warn is a deprecated alias
            logging.ERROR: self.log.error,
        }

        try:
            self.logFunction = logFunctions[level]
        except KeyError:
            self.logFunction = self.log.info

    def fileno(self):
        #when fileno is called this indicates the subprocess is about to fork => start thread
        self.start()
        return self.writepipe

    def finished(self):
       """If the write-filedescriptor is not closed this thread will
       prevent the whole program from exiting. You can use this method
       to clean up after the subprocess has terminated."""
       os.close(self.writepipe)

    def run(self):
        inputFile = os.fdopen(self.readpipe)

        while True:
            line = inputFile.readline()

            if len(line) == 0:
                #no new data was added
                break

            self.logFunction(line.strip())

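If you don't need logging but simply want to use print(), you can obviously remove large portions of the code and keep the class shorter. You could also extend it with __enter__ and __exit__ methods and call finished in __exit__, so that you can easily use it as a context manager.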
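A minimal sketch of that shorter, context-manager variant, assuming Python 3. The names LineCollector and run_logged are made up for illustration and are not part of the class above; the callback defaults to print().

```python
import os
import subprocess
import sys
import threading


class LineCollector(threading.Thread):
    """Hypothetical helper: passes each line a child process writes to a callback."""

    def __init__(self, callback=print):
        super().__init__()
        self.callback = callback
        self.lines = []
        self.readpipe, self.writepipe = os.pipe()

    def fileno(self):
        # Popen asks for this descriptor and connects the child's stream to it.
        return self.writepipe

    def run(self):
        with os.fdopen(self.readpipe) as stream:
            for line in stream:
                line = line.rstrip("\n")
                self.lines.append(line)
                self.callback(line)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Closing the parent's write end lets the reader thread see EOF.
        os.close(self.writepipe)
        self.join()
        return False


def run_logged(cmd):
    """Run cmd, echoing its stdout line by line; return (returncode, lines)."""
    with LineCollector() as out:
        returncode = subprocess.call(cmd, stdout=out)
    return returncode, out.lines
```

Here the thread is started in __enter__ rather than in fileno(), so passing the same object for both stdout and stderr would not try to start the thread twice.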

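If you're able to use third-party libraries, you might be able to use something like sarge (disclosure: I'm its maintainer). This library allows non-blocking access to the output streams of subprocesses; it's layered over the subprocess module.

To me, "live output from a subprocess command" means that both stdout and stderr should be live, and that stdin should be passed through to the subprocess as well.

The fragment below produces real-time output on stdout and stderr and also captures them in outcome.{stdout,stderr} (as str rather than bytes, since Popen is given encoding='utf8').

The trick is to use select and poll properly.

Works well for me on Python 3.9.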


        if self.log == 1:
            print(f"** cmnd= {fullCmndStr}")

        self.outcome.stdcmnd = fullCmndStr
        try:
            process = subprocess.Popen(
                fullCmndStr,
                shell=True,
                encoding='utf8',
                executable="/bin/bash",
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError:
            self.outcome.error = OSError
        else:
            process.stdin.write(stdin)
            process.stdin.close() # type: ignore

        stdoutStrFile = io.StringIO("")
        stderrStrFile = io.StringIO("")

        pollStdout = select.poll()
        pollStderr = select.poll()

        pollStdout.register(process.stdout, select.POLLIN)
        pollStderr.register(process.stderr, select.POLLIN)

        stdoutEOF = False
        stderrEOF = False

        while True:
            stdoutActivity = pollStdout.poll(0)
            if stdoutActivity:
                c= process.stdout.read(1)
                if c:
                    stdoutStrFile.write(c)
                    if self.log == 1:
                        sys.stdout.write(c)
                else:
                   stdoutEOF = True

            stderrActivity = pollStderr.poll(0)
            if stderrActivity:
                c= process.stderr.read(1)
                if c:
                    stderrStrFile.write(c)
                    if self.log == 1:
                        sys.stderr.write(c)
                else:
                   stderrEOF = True
            if stdoutEOF and stderrEOF:
                break

        if self.log == 1:
            print(f"** cmnd={fullCmndStr}")

        process.wait() # type: ignore

        self.outcome.stdout = stdoutStrFile.getvalue()
        self.outcome.stderr = stderrStrFile.getvalue()
        self.outcome.error = process.returncode # type: ignore
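For comparison, here is a standalone sketch of the same idea using the standard selectors module, which blocks until one of the pipes has data instead of polling with a zero timeout. It is not the code above: run_live is a hypothetical name, it reads raw bytes with os.read, and it assumes a POSIX system (on Windows, selectors does not work on pipes).

```python
import os
import selectors
import subprocess
import sys


def run_live(cmd):
    """Run cmd, echo stdout/stderr as they arrive, return (returncode, out, err)."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    captured = {"stdout": bytearray(), "stderr": bytearray()}
    echo = {"stdout": sys.stdout, "stderr": sys.stderr}

    sel = selectors.DefaultSelector()
    sel.register(proc.stdout, selectors.EVENT_READ, "stdout")
    sel.register(proc.stderr, selectors.EVENT_READ, "stderr")

    # Loop until both pipes have reported EOF and been unregistered.
    while sel.get_map():
        for key, _ in sel.select():
            chunk = os.read(key.fileobj.fileno(), 4096)
            if not chunk:  # EOF on this pipe
                sel.unregister(key.fileobj)
                continue
            captured[key.data] += chunk
            # Echo only; a multi-byte character split across chunks may show as "?".
            echo[key.data].write(chunk.decode(errors="replace"))
            echo[key.data].flush()
    sel.close()

    returncode = proc.wait()
    proc.stdout.close()
    proc.stderr.close()
    return returncode, bytes(captured["stdout"]), bytes(captured["stderr"])
```

Unlike the fragment above, this sketch does not feed anything to the child's stdin.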

Why not simply set stdout to sys.stdout? And if you also need the output in a log, you can simply override the write method of f.

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

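Executive summary (or "tl;dr" version): it's easy when there's at most one subprocess.PIPE, otherwise it's hard.

It may be time to explain a bit about how subprocess.Popen does its thing.

(Caveat: this is for Python 2.x, although 3.x is similar; and I'm quite fuzzy on the Windows variant. I understand the POSIX stuff much better.)

The Popen function needs to deal with zero to three I/O streams, somewhat simultaneously. These are denoted stdin, stdout, and stderr as usual.

You can provide: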

- None, indicating that you don't want to redirect the stream. It will inherit these as usual instead. Note that on POSIX systems, at least, this does not mean it will use Python's sys.stdout, just Python's actual stdout; see demo at end.
- An int value. This is a "raw" file descriptor (in POSIX at least). (Side note: PIPE and STDOUT are actually ints internally, but are "impossible" descriptors, -1 and -2.)
- A stream: really, any object with a fileno method. Popen will find the descriptor for that stream, using stream.fileno(), and then proceed as for an int value.
- subprocess.PIPE, indicating that Python should create a pipe.
- subprocess.STDOUT (for stderr only): tell Python to use the same descriptor as for stdout. This only makes sense if you provided a (non-None) value for stdout, and even then, it is only needed if you set stdout=subprocess.PIPE. (Otherwise you can just provide the same argument you provided for stdout, e.g., Popen(..., stdout=stream, stderr=stream).)
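A small Python 3 sketch of three of those argument kinds side by side (a stream, a raw descriptor, and PIPE). The helper names and the CHILD command are made up for illustration; each helper should return the same bytes.

```python
import subprocess
import sys
import tempfile

# Hypothetical child command used only for this illustration.
CHILD = [sys.executable, "-c", "print('hello')"]


def capture_via_stream():
    """Pass a file object; Popen uses its fileno()."""
    with tempfile.TemporaryFile() as stream:
        subprocess.call(CHILD, stdout=stream)
        stream.seek(0)
        return stream.read()


def capture_via_descriptor():
    """Pass the raw integer descriptor directly."""
    with tempfile.TemporaryFile() as stream:
        subprocess.call(CHILD, stdout=stream.fileno())
        stream.seek(0)
        return stream.read()


def capture_via_pipe():
    """Ask Python to create a pipe and read the child's output from it."""
    proc = subprocess.Popen(CHILD, stdout=subprocess.PIPE)
    out, _ = proc.communicate()
    return out
```

In CPython, subprocess.PIPE and subprocess.STDOUT are the integers -1 and -2, which can never be real descriptors.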

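Easiest cases (no pipes)

If you redirect nothing (leave all three as the default None value or supply explicit None), Popen has it quite easy. It just needs to spin off the subprocess and let it run. Or, if you redirect to a non-PIPE (an int or a stream's fileno()), it's still easy, as the OS does all the work. Python just needs to spin off the subprocess, connecting its stdin, stdout, and/or stderr to the provided file descriptors.

The still-easy case: one pipe

If you redirect only one stream, Popen still has things pretty easy. Let's pick one stream at a time and watch.

Suppose you want to supply some stdin, but let stdout and stderr go un-redirected, or go to a file descriptor. As the parent process, your Python program simply needs to use write() to send data down the pipe. You can do this yourself, e.g.: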

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

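or you can pass the stdin data to proc.communicate(), which then does the stdin.write shown above. There is no output coming back, so communicate() has only one other real job: it also closes the pipe for you. (If you don't call proc.communicate(), you must call proc.stdin.close() to close the pipe, so that the subprocess knows there is no more data coming through.)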
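A quick Python 3 sketch of the communicate() route, where stdin data must be bytes unless text mode is requested. The function name and the child one-liner are hypothetical; the child simply exits with the number of bytes it read, so the result is observable without capturing any output.

```python
import subprocess
import sys


def count_bytes_via_stdin(data):
    """Feed data to a child that exits with the number of bytes it read."""
    child_code = "import sys; sys.exit(len(sys.stdin.buffer.read()))"
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdin=subprocess.PIPE)
    # communicate() writes the data, closes the pipe, and waits for the child.
    proc.communicate(input=data)
    return proc.returncode
```

Exit statuses only go up to 255, so this trick is only good for short inputs.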

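Suppose you want to capture stdout but leave stdin and stderr alone. Again, it's easy: just call proc.stdout.read() (or equivalent) until there is no more output. Since proc.stdout is a normal Python I/O stream, you can use all the normal constructs on it, like: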

for line in proc.stdout:

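or, again, you can use proc.communicate(), which simply does the read() for you.

If you want to capture only stderr, it works the same as with stdout.

There's one more trick before things get hard. Suppose you want to capture stdout, and also capture stderr, but on the same pipe as stdout: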

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

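In this case, subprocess "cheats"! Well, it has to do this, so it's not really cheating: it starts the subprocess with both its stdout and its stderr directed into the (single) pipe descriptor that feeds back to its parent (Python) process. On the parent side, there's again only a single pipe descriptor for reading the output. All the "stderr" output shows up in proc.stdout, and if you call proc.communicate(), the stderr result (second value in the tuple) will be None, not a string.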
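Since only one pipe is involved, this is also the simplest way to get what the question asks for, provided you can live with stderr being mixed into stdout. A Python 3.7+ sketch (run_and_tee and log_path are hypothetical names): read the single pipe line by line, echoing each line and writing it to the log as it arrives.

```python
import subprocess
import sys


def run_and_tee(cmd, log_path):
    """Stream cmd's combined stdout/stderr to the console and to a log file."""
    lines = []
    with open(log_path, "w") as log_file:
        proc = subprocess.Popen(cmd,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True)
        for line in proc.stdout:  # yields each line as the child produces it
            sys.stdout.write(line)
            sys.stdout.flush()
            log_file.write(line)
            lines.append(line)
        proc.stdout.close()
        returncode = proc.wait()
    return returncode, "".join(lines)
```

How "live" this is still depends on the child: many programs block-buffer their stdout when it is a pipe rather than a terminal, so output may arrive in bursts unless the child flushes.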

The hard cases: two or more pipes

The problems all come about when you want to use at least two pipes. In fact, the subprocess code itself has this bit:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

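But, alas, here we've made at least two, and maybe three, different pipes, so the count(None) returns either 1 or 0. We must do things the hard way.

On Windows, this uses threading.Thread to accumulate results for self.stdout and self.stderr, and has the parent thread deliver self.stdin input data (and then close the pipe).

On POSIX, this uses poll if available, otherwise select, to accumulate output and deliver stdin input. All this runs in the (single) parent process/thread.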

Threads or poll/select are needed here to avoid deadlock. Suppose, for instance, that we've redirected all three streams to three separate pipes. Suppose further that there's a small limit on how much data can be stuffed into a pipe before the writing process is suspended, waiting for the reading process to "clean out" the pipe from the other end. Let's set that small limit to a single byte, just for illustration. (This is in fact how things work, except that the limit is much bigger than one byte.)

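If the parent (Python) process tries to write several bytes, say, 'go\n', to proc.stdin, the first byte goes in and then the second causes the Python process to suspend, waiting for the subprocess to read the first byte, emptying the pipe.

Meanwhile, suppose the subprocess decides to print a friendly "Hello! Don't Panic!" greeting. The H goes into its stdout pipe, but the e causes it to suspend, waiting for its parent to read that H, emptying the stdout pipe.

Now we're stuck: the Python process is asleep, waiting to finish saying "go", and the subprocess is also asleep, waiting to finish saying "Hello! Don't Panic!".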

The subprocess.Popen code avoids this problem with threading-or-select/poll. When bytes can go over the pipes, they go. When they can't, only a thread (not the whole process) has to sleep—or, in the case of select/poll, the Python process waits simultaneously for "can write" or "data available", writes to the process's stdin only when there is room, and reads its stdout and/or stderr only when data are ready. The proc.communicate() code (actually _communicate where the hairy cases are handled) returns once all stdin data (if any) have been sent and all stdout and/or stderr data have been accumulated.

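If you want to read both stdout and stderr on two different pipes (regardless of any stdin redirection), you will need to avoid deadlock too. The deadlock scenario here is different (it occurs when the subprocess writes something long to stderr while you're pulling data from stdout, or vice versa), but it's still there.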
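One way to sidestep that deadlock yourself while still seeing output live is to give each pipe its own reader thread, in the spirit of what communicate() does on Windows. This is a Python 3 sketch with hypothetical names (_pump, run_with_threads), not the subprocess module's actual implementation.

```python
import subprocess
import sys
import threading


def _pump(stream, sink, chunks, echo):
    """Drain one pipe: keep every line and optionally echo it as it arrives."""
    for line in iter(stream.readline, b""):
        chunks.append(line)
        if echo:
            sink.write(line.decode(errors="replace"))
            sink.flush()
    stream.close()


def run_with_threads(cmd, echo=True):
    """Run cmd with separate stdout/stderr pipes; return (returncode, out, err)."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out_chunks, err_chunks = [], []
    workers = [
        threading.Thread(target=_pump,
                         args=(proc.stdout, sys.stdout, out_chunks, echo)),
        threading.Thread(target=_pump,
                         args=(proc.stderr, sys.stderr, err_chunks, echo)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()  # each thread finishes when its pipe reaches EOF
    return proc.wait(), b"".join(out_chunks), b"".join(err_chunks)
```

Because each pipe is always being drained by its own thread, the child can write as much as it likes to either stream without stalling on the other.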


The Demo

I promised to demonstrate that, un-redirected, Python subprocesses write to the underlying stdout, not sys.stdout. So, here is some code:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
   print 'start show1'
   save = sys.stdout
   sys.stdout = StringIO()
   print 'sys.stdout being buffered'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   in_stdout = sys.stdout.getvalue()
   sys.stdout = save
   print 'in buffer:', in_stdout

def show2():
   print 'start show2'
   save = sys.stdout
   sys.stdout = open(os.devnull, 'w')
   print 'after redirect sys.stdout'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   sys.stdout = save

show1()
show2()

When run:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

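Note that the first routine will fail if you add stdout=sys.stdout, as a StringIO object has no fileno. The second will omit the hello if you add stdout=sys.stdout, since sys.stdout has been redirected to os.devnull.

(If you redirect Python's file-descriptor-1, the subprocess will follow that redirection. The open(os.devnull, 'w') call produces a stream whose fileno() is greater than 2.)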
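To see that last point in action, here is a Python 3 sketch (child_follows_fd1 is a made-up name) that temporarily points descriptor 1 at a temporary file with os.dup2, runs an un-redirected child, and then restores the original descriptor. It assumes descriptor 1 is open in the parent.

```python
import os
import subprocess
import sys
import tempfile


def child_follows_fd1():
    """Redirect descriptor 1 itself; an un-redirected child writes there too."""
    sys.stdout.flush()            # don't let pending Python output leak into the file
    saved_fd = os.dup(1)          # remember the real stdout
    with tempfile.TemporaryFile() as tmp:
        os.dup2(tmp.fileno(), 1)  # descriptor 1 now refers to the temp file
        try:
            subprocess.call([sys.executable, "-c", "print('hello')"])
        finally:
            os.dup2(saved_fd, 1)  # restore the original stdout
            os.close(saved_fd)
        tmp.seek(0)
        return tmp.read()
```

The child's hello ends up in the temporary file rather than on the terminal, even though sys.stdout was never reassigned.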