我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。

下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

当前回答

下面是一个在python中支持非阻塞读和后台写的模块:

https://pypi.python.org/pypi/python-nonblock

提供一个函数,

nonblock_read将从流中读取数据,如果可用,否则返回一个空字符串(或None,如果流在另一端关闭,并且所有可能的数据都已读取)

你也可以考虑python-subprocess2模块,

https://pypi.python.org/pypi/python-subprocess2

这将添加到子流程模块。所以在subprocess返回的对象上。Popen”被添加了一个额外的方法,runInBackground。这将启动一个线程并返回一个对象,该对象将在写入stdout/stderr时自动填充,而不会阻塞主线程。

享受吧!

其他回答

You can do this really easily in Twisted. Depending upon your existing code base, this might not be that easy to use, but if you are building a twisted application, then things like this become almost trivial. You create a ProcessProtocol class, and override the outReceived() method. Twisted (depending upon the reactor used) is usually just a big select() loop with callbacks installed to handle data from different file descriptors (often network sockets). So the outReceived() method is simply installing a callback for handling data coming from STDOUT. A simple example demonstrating this behavior is as follows:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Twisted文档在这方面有一些很好的信息。

如果您围绕Twisted构建整个应用程序,它可以与其他进程(本地或远程)进行异步通信,就像这样非常优雅。另一方面,如果您的程序不是构建在Twisted之上,那么这真的不会有多大帮助。希望这能对其他读者有所帮助,即使它不适用于您的特定应用程序。

现有的解决方案不适合我(详情见下文)。最后成功的是使用read(1)实现readline(基于这个答案)。后者不阻塞:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

为什么现有的解决方案不起作用:

Solutions that require readline (including the Queue based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed. Mixing low-level fcntl with high-level readline calls may not work properly as anonnn has pointed out. Using select.poll() is neat, but doesn't work on Windows according to python docs. Using third-party libraries seems overkill for this task and adds additional dependencies.

试试asyncproc模块。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

该模块负责S.Lott建议的所有线程。

这个版本的非阻塞读取不需要特殊的模块,并且可以在大多数Linux发行版上开箱即用。

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

我基于J. F. Sebastian的解决方案创建了一个库。你可以使用它。

https://github.com/cenkalti/what