I'm using a python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code, collect the output from stdout and stderr into a subprocess.PIPE --- then I can print (and save to a log-file) the output information, and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration its at, what time, what the next time-step is, etc.

是否有一种方法既存储输出(用于日志记录和错误检查),又产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初,我将run_command通过tee输送,以便将副本直接发送到日志文件,流仍然直接输出到终端——但这样我就不能存储任何错误(据我所知)。


目前我的临时解决方案是:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端上运行tail -f log.txt (s.t. log_file = 'log.txt')。


当前回答

如果您所需要的只是输出将在控制台上可见,对我来说最简单的解决方案是将以下参数传递给Popen

with Popen(cmd, stdout=sys.stdout, stderr=sys.stderr) as proc:

哪个将使用您的python脚本stdio文件句柄

其他回答

除了这些答案之外,一个简单的方法也可以如下:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

循环可读流,只要它是可读的,如果它得到一个空的结果,停止。

这里的关键是,readline()返回一行(结尾是\n),只要有输出,如果确实在末尾,则返回空行。

希望这能帮助到一些人。

解决方案1:实时并发记录stdout和stderr

一种简单的解决方案,可以将stdout和stderr同时记录,逐行实时地记录到日志文件中。

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor


def log_popen_pipe(p, stdfile):

    with open("mylog.txt", "w") as f:

        while p.poll() is None:
            f.write(stdfile.readline())
            f.flush()

        # Write the rest from the buffer
        f.write(stdfile.read())


with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout)
        r2 = pool.submit(log_popen_pipe, p, p.stderr)
        r1.result()
        r2.result()

解决方案2:函数read_popen_pipes(),它允许同时实时遍历两个管道(stdout/stderr)

import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

# The function in use:

with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')

    p.poll()

我尝试过的所有上述解决方案都无法分离stderr和stdout输出(多个管道),或者当操作系统管道缓冲区已满时永远阻塞,这发生在你运行输出太快的命令时(在python poll() subprocess手册上对此有警告)。我发现唯一可靠的方法是通过选择,但这是一个posix-only解决方案:

import subprocess
import sys
import os
import select
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    poller = select.poll()
    poller.register(p.stdout, select.POLLIN)
    poller.register(p.stderr, select.POLLIN)

    coutput=''
    cerror=''
    fdhup={}
    fdhup[p.stdout.fileno()]=0
    fdhup[p.stderr.fileno()]=0
    while sum(fdhup.values()) < len(fdhup):
        try:
            r = poller.poll(1)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            r=[]
        for fd, flags in r:
            if flags & (select.POLLIN | select.POLLPRI):
                c = os.read(fd, 1024)
                if rtoutput:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                if fd == p.stderr.fileno():
                    cerror+=c
                else:
                    coutput+=c
            else:
                fdhup[fd]=1
    return p.poll(), coutput.strip(), cerror.strip()
import os

def execute(cmd, callback):
    for line in iter(os.popen(cmd).readline, ''): 
            callback(line[:-1])

execute('ls -a', print)

Here is a class which I'm using in one of my projects. It redirects output of a subprocess to the log. At first I tried simply overwriting the write-method but that doesn't work as the subprocess will never call it (redirection happens on filedescriptor level). So I'm using my own pipe, similar to how it's done in the subprocess-module. This has the advantage of encapsulating all logging/printing logic in the adapter and you can simply pass instances of the logger to Popen: subprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

class LogAdapter(threading.Thread):

    def __init__(self, logname, level = logging.INFO):
        super().__init__()
        self.log = logging.getLogger(logname)
        self.readpipe, self.writepipe = os.pipe()

        logFunctions = {
            logging.DEBUG: self.log.debug,
            logging.INFO: self.log.info,
            logging.WARN: self.log.warn,
            logging.ERROR: self.log.warn,
        }

        try:
            self.logFunction = logFunctions[level]
        except KeyError:
            self.logFunction = self.log.info

    def fileno(self):
        #when fileno is called this indicates the subprocess is about to fork => start thread
        self.start()
        return self.writepipe

    def finished(self):
       """If the write-filedescriptor is not closed this thread will
       prevent the whole program from exiting. You can use this method
       to clean up after the subprocess has terminated."""
       os.close(self.writepipe)

    def run(self):
        inputFile = os.fdopen(self.readpipe)

        while True:
            line = inputFile.readline()

            if len(line) == 0:
                #no new data was added
                break

            self.logFunction(line.strip())

如果您不需要日志记录,而只是想使用print(),显然可以删除大部分代码并使类更短。你也可以通过__enter__和__exit__方法来扩展它,并在__exit__中调用finished,这样你就可以很容易地将它用作上下文。