要从我的python脚本启动程序,我使用以下方法:

def execute(command):
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output = process.communicate()[0]
    exitCode = process.returncode

    if (exitCode == 0):
        return output
    else:
        raise ProcessException(command, exitCode, output)

所以当我启动一个进程,比如进程。执行("mvn clean install")时,我的程序会一直等待,直到进程完成,只有到那时我才能得到程序的完整输出。这是恼人的,如果我正在运行一个进程,需要一段时间才能完成。

我能让我的程序一行一行地写进程输出吗,在循环结束之前轮询进程输出什么的?

我找到了这篇文章,可能与此有关。


当前回答

实际上有一个非常简单的方法来做到这一点,当你只想打印输出:

import subprocess
import sys

def execute(command):
    subprocess.check_call(command, shell=True, stdout=sys.stdout, stderr=subprocess.STDOUT)

在这里,我们只是将子进程指向我们自己的标准输出,并使用现有的成功或异常api。

其他回答

如果您想在进程运行时从stdout打印,请使用-u Python选项和subprocess.Popen()。(shell=True是必要的,尽管有风险…)

如果有人想同时使用线程从stdout和stderr读取,这是我想到的:

import threading
import subprocess
import Queue

class AsyncLineReader(threading.Thread):
    def __init__(self, fd, outputQueue):
        threading.Thread.__init__(self)

        assert isinstance(outputQueue, Queue.Queue)
        assert callable(fd.readline)

        self.fd = fd
        self.outputQueue = outputQueue

    def run(self):
        map(self.outputQueue.put, iter(self.fd.readline, ''))

    def eof(self):
        return not self.is_alive() and self.outputQueue.empty()

    @classmethod
    def getForFd(cls, fd, start=True):
        queue = Queue.Queue()
        reader = cls(fd, queue)

        if start:
            reader.start()

        return reader, queue


process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdoutReader, stdoutQueue) = AsyncLineReader.getForFd(process.stdout)
(stderrReader, stderrQueue) = AsyncLineReader.getForFd(process.stderr)

# Keep checking queues until there is no more output.
while not stdoutReader.eof() or not stderrReader.eof():
   # Process all available lines from the stdout Queue.
   while not stdoutQueue.empty():
       line = stdoutQueue.get()
       print 'Received stdout: ' + repr(line)

       # Do stuff with stdout line.

   # Process all available lines from the stderr Queue.
   while not stderrQueue.empty():
       line = stderrQueue.get()
       print 'Received stderr: ' + repr(line)

       # Do stuff with stderr line.

   # Sleep for a short time to avoid excessive CPU use while waiting for data.
   sleep(0.05)

print "Waiting for async readers to finish..."
stdoutReader.join()
stderrReader.join()

# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()

print "Waiting for process to exit..."
returnCode = process.wait()

if returnCode != 0:
   raise subprocess.CalledProcessError(returnCode, command)

我只是想分享这个,因为我在这个问题上试图做类似的事情,但没有一个答案能解决我的问题。希望它能帮助到某些人!

请注意,在我的用例中,外部进程杀死了Popen()所使用的进程。

实际上有一个非常简单的方法来做到这一点,当你只想打印输出:

import subprocess
import sys

def execute(command):
    subprocess.check_call(command, shell=True, stdout=sys.stdout, stderr=subprocess.STDOUT)

在这里,我们只是将子进程指向我们自己的标准输出,并使用现有的成功或异常api。

import time
import sys
import subprocess
import threading
import queue

cmd='esptool.py --chip esp8266 write_flash -z 0x1000 /home/pi/zero2/fw/base/boot_40m.bin'
cmd2='esptool.py --chip esp32 -b 115200 write_flash -z 0x1000 /home/pi/zero2/fw/test.bin'
cmd3='esptool.py --chip esp32 -b 115200 erase_flash'

class ExecutorFlushSTDOUT(object):
    def __init__(self,timeout=15):
        self.process = None
        self.process_output = queue.Queue(0)
        self.capture_output = threading.Thread(target=self.output_reader)
        self.timeout=timeout
        self.result=False
        self.validator=None
        
    def output_reader(self):
        start=time.time()
        while self.process.poll() is None and (time.time() - start) < self.timeout:
            try:
                if not self.process_output.full():
                    line=self.process.stdout.readline()
                    if line:
                        line=line.decode().rstrip("\n")
                        start=time.time()
                        self.process_output.put(line)
                        if self.validator:
                            if self.validator in line: print("Valid");self.result=True

            except:pass
        self.process.kill()
        return
            
    def start_process(self,cmd_list,callback=None,validator=None,timeout=None):
        if timeout: self.timeout=timeout
        self.validator=validator
        self.process = subprocess.Popen(cmd_list,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
        self.capture_output.start()
        line=None
        self.result=False
        while self.process.poll() is None:
            try:
                if not self.process_output.empty():
                    line = self.process_output.get()
                if line:
                    if callback:callback(line)
                    #print(line)
                    line=None
            except:pass                
        error = self.process.returncode
        if error:
            print("Error Found",str(error))
            raise RuntimeError(error)
        return self.result

execute = ExecutorFlushSTDOUT()

def liveOUTPUT(line):
    print("liveOUTPUT",line)
    try:
        if "Writing" in line:
            line=''.join([n for n in line.split(' ')[3] if n.isdigit()])
            print("percent={}".format(line))
    except Exception as e:
        pass
    


result=execute.start_process(cmd2,callback=liveOUTPUT,validator="Hash of data verified.")

print("Finish",result)

在Python 3中,只要子进程的stdout缓冲区被刷新,就逐行打印子进程的输出:

from subprocess import Popen, PIPE, CalledProcessError

with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
    for line in p.stdout:
        print(line, end='') # process line here

if p.returncode != 0:
    raise CalledProcessError(p.returncode, p.args)

注意:你不需要p.poll()——当到达eof时循环结束。你不需要iter(p.s dout。readline, ")——在Python 3中修复了预读错误。

另请参阅Python:从subprocess. communication()读取流输入。