我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。

下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

当前回答

下面是我的代码,用于捕获子进程ASAP的每个输出,包括部分行。它同时泵浦,stdout和stderr的顺序几乎是正确的。

在Python 2.7 linux和windows上测试并正确工作。

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

其他回答

Fcntl, select, asyncproc在这种情况下不起作用。

不管操作系统如何,读取流而不阻塞的可靠方法是使用Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn't solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can't because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

在我看来,这比使用select或signal模块来解决这个问题要简洁一些,但它只适用于UNIX…

下面是一个在python中支持非阻塞读和后台写的模块:

https://pypi.python.org/pypi/python-nonblock

提供一个函数,

nonblock_read将从流中读取数据,如果可用,否则返回一个空字符串(或None,如果流在另一端关闭,并且所有可能的数据都已读取)

你也可以考虑python-subprocess2模块,

https://pypi.python.org/pypi/python-subprocess2

这将添加到子流程模块。所以在subprocess返回的对象上。Popen”被添加了一个额外的方法,runInBackground。这将启动一个线程并返回一个对象,该对象将在写入stdout/stderr时自动填充,而不会阻塞主线程。

享受吧!

不是第一个,也可能不是最后一个,我已经构建了一个包,它使用两种不同的方法执行非阻塞标准输出PIPE读取,一种是基于J.F. Sebastian (@jfs)的答案,另一种是一个简单的communication()循环,使用线程检查超时。

两种标准输出捕获方法都在Linux和Windows下进行了测试,截至撰写本文时,Python版本从2.7到3.9

由于它是非阻塞的,它保证了超时强制,即使有多个子进程和孙子进程,甚至在Python 2.7下也是如此。

该包还处理字节和文本标准输出编码,当试图捕获EOF时,这是一个噩梦。

您可以在https://github.com/netinvent/command_runner上找到该软件包

如果你需要一些经过良好测试的非阻塞读取实现,可以尝试一下(或修改代码):

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

您可以在_poll_process()或_monitor_process()中找到核心的非阻塞读取代码,这取决于所使用的捕获方法。 在此基础上,您可以实现自己想要的功能,或者简单地使用整个包作为子进程替换来执行命令。

Python 3.4为异步IO引入了新的临时API——asyncio模块。

该方法类似于@Bryan Ward的twisted-based回答——定义一个协议,一旦数据准备好,就调用它的方法:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

请参阅文档中的“Subprocess”。

有一个高级接口asyncio.create_subprocess_exec(),它返回允许使用StreamReader.readline()协程异步读取一行的Process对象 (使用async/await Python 3.5+语法):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

Readline_and_kill()执行以下任务:

启动子进程,将其标准输出重定向到管道 异步从子进程的stdout中读取一行 杀子流程 等待它退出

如果需要,每个步骤都可以被超时秒限制。