我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。

下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

当前回答

不是第一个,也可能不是最后一个,我已经构建了一个包,它使用两种不同的方法执行非阻塞标准输出PIPE读取,一种是基于J.F. Sebastian (@jfs)的答案,另一种是一个简单的communication()循环,使用线程检查超时。

两种标准输出捕获方法都在Linux和Windows下进行了测试,截至撰写本文时,Python版本从2.7到3.9

由于它是非阻塞的,它保证了超时强制,即使有多个子进程和孙子进程,甚至在Python 2.7下也是如此。

该包还处理字节和文本标准输出编码,当试图捕获EOF时,这是一个噩梦。

您可以在https://github.com/netinvent/command_runner上找到该软件包

如果你需要一些经过良好测试的非阻塞读取实现,可以尝试一下(或修改代码):

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

您可以在_poll_process()或_monitor_process()中找到核心的非阻塞读取代码,这取决于所使用的捕获方法。 在此基础上,您可以实现自己想要的功能,或者简单地使用整个包作为子进程替换来执行命令。

其他回答

不是第一个,也可能不是最后一个,我已经构建了一个包,它使用两种不同的方法执行非阻塞标准输出PIPE读取,一种是基于J.F. Sebastian (@jfs)的答案,另一种是一个简单的communication()循环,使用线程检查超时。

两种标准输出捕获方法都在Linux和Windows下进行了测试,截至撰写本文时,Python版本从2.7到3.9

由于它是非阻塞的,它保证了超时强制,即使有多个子进程和孙子进程,甚至在Python 2.7下也是如此。

该包还处理字节和文本标准输出编码,当试图捕获EOF时,这是一个噩梦。

您可以在https://github.com/netinvent/command_runner上找到该软件包

如果你需要一些经过良好测试的非阻塞读取实现,可以尝试一下(或修改代码):

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

您可以在_poll_process()或_monitor_process()中找到核心的非阻塞读取代码,这取决于所使用的捕获方法。 在此基础上,您可以实现自己想要的功能,或者简单地使用整个包作为子进程替换来执行命令。

下面是一个基于线程的简单解决方案:

适用于Linux和Windows(不依赖于select)。 异步读取stdout和stderr。 不依赖于具有任意等待时间的活动轮询(CPU友好)。 不使用asyncio(这可能与其他库冲突)。 一直运行到子进程终止为止。

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

现有的解决方案不适合我(详情见下文)。最后成功的是使用read(1)实现readline(基于这个答案)。后者不阻塞:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

为什么现有的解决方案不起作用:

Solutions that require readline (including the Queue based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed. Mixing low-level fcntl with high-level readline calls may not work properly as anonnn has pointed out. Using select.poll() is neat, but doesn't work on Windows according to python docs. Using third-party libraries seems overkill for this task and adds additional dependencies.

You can do this really easily in Twisted. Depending upon your existing code base, this might not be that easy to use, but if you are building a twisted application, then things like this become almost trivial. You create a ProcessProtocol class, and override the outReceived() method. Twisted (depending upon the reactor used) is usually just a big select() loop with callbacks installed to handle data from different file descriptors (often network sockets). So the outReceived() method is simply installing a callback for handling data coming from STDOUT. A simple example demonstrating this behavior is as follows:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Twisted文档在这方面有一些很好的信息。

如果您围绕Twisted构建整个应用程序,它可以与其他进程(本地或远程)进行异步通信,就像这样非常优雅。另一方面,如果您的程序不是构建在Twisted之上,那么这真的不会有多大帮助。希望这能对其他读者有所帮助,即使它不适用于您的特定应用程序。

编辑:这个实现仍然会阻塞。请用j·f·塞巴斯蒂安的答案。

我尝试了上面的答案,但是线程代码的额外风险和维护令人担忧。

通过io模块(仅限于2.6),我找到了BufferedReader。这是我的无线程、无阻塞的解决方案。

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line