要从我的python脚本启动程序,我使用以下方法:

def execute(command):
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output = process.communicate()[0]
    exitCode = process.returncode

    if (exitCode == 0):
        return output
    else:
        raise ProcessException(command, exitCode, output)

所以当我启动一个进程,比如进程。执行("mvn clean install")时,我的程序会一直等待,直到进程完成,只有到那时我才能得到程序的完整输出。这是恼人的,如果我正在运行一个进程,需要一段时间才能完成。

我能让我的程序一行一行地写进程输出吗,在循环结束之前轮询进程输出什么的?

我找到了这篇文章,可能与此有关。


当前回答

在@jfs的出色回答的基础上,这里有一个完整的工作示例供您使用。要求Python 3.7或更新版本。

sub.py

import time

for i in range(10):
    print(i, flush=True)
    time.sleep(1)

main.py

from subprocess import PIPE, Popen
import sys

with Popen([sys.executable, 'sub.py'], bufsize=1, stdout=PIPE, text=True) as sub:
    for line in sub.stdout:
        print(line, end='')

注意子脚本中使用的flush参数。

其他回答

这里没有一个答案能满足我所有的需求。

没有用于标准输出的线程(也没有队列等) 非阻塞,因为我需要检查其他事情正在进行 使用PIPE,因为我需要做很多事情,例如流输出,写入日志文件,并返回输出的字符串副本。

一点背景知识:我使用ThreadPoolExecutor来管理一个线程池,每个线程启动一个子进程并并发地运行它们。(在Python2.7中,但这应该在更新的3中工作。X也是)。我不希望只使用线程来收集输出,因为我希望有尽可能多的可用线程用于其他事情(一个20个进程的池将使用40个线程来运行;1用于进程线程,1用于stdout…如果你想要stderr我猜)

我在这里剥离了很多异常,所以这是基于在生产中工作的代码。希望我没有在复制粘贴过程中破坏它。同时,非常欢迎反馈!

import time
import fcntl
import subprocess
import time

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

# Make stdout non-blocking when using read/readline
proc_stdout = proc.stdout
fl = fcntl.fcntl(proc_stdout, fcntl.F_GETFL)
fcntl.fcntl(proc_stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)

def handle_stdout(proc_stream, my_buffer, echo_streams=True, log_file=None):
    """A little inline function to handle the stdout business. """
    # fcntl makes readline non-blocking so it raises an IOError when empty
    try:
        for s in iter(proc_stream.readline, ''):   # replace '' with b'' for Python 3
            my_buffer.append(s)

            if echo_streams:
                sys.stdout.write(s)

            if log_file:
                log_file.write(s)
    except IOError:
        pass

# The main loop while subprocess is running
stdout_parts = []
while proc.poll() is None:
    handle_stdout(proc_stdout, stdout_parts)

    # ...Check for other things here...
    # For example, check a multiprocessor.Value('b') to proc.kill()

    time.sleep(0.01)

# Not sure if this is needed, but run it again just to be sure we got it all?
handle_stdout(proc_stdout, stdout_parts)

stdout_str = "".join(stdout_parts)  # Just to demo

我确信这里有额外的开销,但在我的情况下这不是一个问题。从功能上讲,它满足了我的需要。我唯一没有解决的问题是,为什么这对于日志消息非常有效,但我看到一些打印消息在稍后同时显示。

要回答最初的问题,IMO的最佳方法是直接将子进程stdout重定向到程序的stdout(可选地,对于stderr也可以这样做,如下例所示)

p = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
p.communicate()

@tokland

尝试了你的代码,并针对3.4和Windows进行了修正 dir。CMD是一个简单的dir命令,保存为CMD -file

import subprocess
c = "dir.cmd"

def execute(command):
    popen = subprocess.Popen(command, stdout=subprocess.PIPE,bufsize=1)
    lines_iterator = iter(popen.stdout.readline, b"")
    while popen.poll() is None:
        for line in lines_iterator:
            nline = line.rstrip()
            print(nline.decode("latin"), end = "\r\n",flush =True) # yield line

execute(c)
import time
import sys
import subprocess
import threading
import queue

cmd='esptool.py --chip esp8266 write_flash -z 0x1000 /home/pi/zero2/fw/base/boot_40m.bin'
cmd2='esptool.py --chip esp32 -b 115200 write_flash -z 0x1000 /home/pi/zero2/fw/test.bin'
cmd3='esptool.py --chip esp32 -b 115200 erase_flash'

class ExecutorFlushSTDOUT(object):
    def __init__(self,timeout=15):
        self.process = None
        self.process_output = queue.Queue(0)
        self.capture_output = threading.Thread(target=self.output_reader)
        self.timeout=timeout
        self.result=False
        self.validator=None
        
    def output_reader(self):
        start=time.time()
        while self.process.poll() is None and (time.time() - start) < self.timeout:
            try:
                if not self.process_output.full():
                    line=self.process.stdout.readline()
                    if line:
                        line=line.decode().rstrip("\n")
                        start=time.time()
                        self.process_output.put(line)
                        if self.validator:
                            if self.validator in line: print("Valid");self.result=True

            except:pass
        self.process.kill()
        return
            
    def start_process(self,cmd_list,callback=None,validator=None,timeout=None):
        if timeout: self.timeout=timeout
        self.validator=validator
        self.process = subprocess.Popen(cmd_list,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
        self.capture_output.start()
        line=None
        self.result=False
        while self.process.poll() is None:
            try:
                if not self.process_output.empty():
                    line = self.process_output.get()
                if line:
                    if callback:callback(line)
                    #print(line)
                    line=None
            except:pass                
        error = self.process.returncode
        if error:
            print("Error Found",str(error))
            raise RuntimeError(error)
        return self.result

execute = ExecutorFlushSTDOUT()

def liveOUTPUT(line):
    print("liveOUTPUT",line)
    try:
        if "Writing" in line:
            line=''.join([n for n in line.split(' ')[3] if n.isdigit()])
            print("percent={}".format(line))
    except Exception as e:
        pass
    


result=execute.start_process(cmd2,callback=liveOUTPUT,validator="Hash of data verified.")

print("Finish",result)

在Python 3中,只要子进程的stdout缓冲区被刷新,就逐行打印子进程的输出:

from subprocess import Popen, PIPE, CalledProcessError

with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
    for line in p.stdout:
        print(line, end='') # process line here

if p.returncode != 0:
    raise CalledProcessError(p.returncode, p.args)

注意:你不需要p.poll()——当到达eof时循环结束。你不需要iter(p.s dout。readline, ")——在Python 3中修复了预读错误。

另请参阅Python:从subprocess. communication()读取流输入。