我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。

下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

当前回答

Python 3.4为异步IO引入了新的临时API——asyncio模块。

该方法类似于@Bryan Ward的twisted-based回答——定义一个协议,一旦数据准备好,就调用它的方法:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

请参阅文档中的“Subprocess”。

有一个高级接口asyncio.create_subprocess_exec(),它返回允许使用StreamReader.readline()协程异步读取一行的Process对象 (使用async/await Python 3.5+语法):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

Readline_and_kill()执行以下任务:

启动子进程,将其标准输出重定向到管道 异步从子进程的stdout中读取一行 杀子流程 等待它退出

如果需要,每个步骤都可以被超时秒限制。

其他回答

我的问题有点不同,因为我想从正在运行的进程中收集stdout和stderr,但最终是一样的,因为我想在小部件生成时在小部件中呈现输出。

我不希望使用队列或额外的线程来解决许多建议的解决方案,因为执行运行另一个脚本并收集其输出这样的常见任务应该不需要它们。

在阅读了建议的解决方案和python文档后,我解决了以下实现的问题。是的,它只适用于POSIX,因为我正在使用select函数调用。

我同意,对于这样一个常见的脚本任务,文档是令人困惑的,实现是尴尬的。我相信旧版本的python对Popen有不同的默认值和不同的解释,所以造成了很多困惑。这似乎对Python 2.7.12和3.5.2都很有效。

关键是将bufsize=1设置为行缓冲,然后universal_newlines=True处理为文本文件而不是二进制文件,这似乎成为设置bufsize=1时的默认值。

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG和VERBOSE仅仅是将输出打印到终端的宏。

这个解决方案在我看来是99.99%有效的,因为它仍然使用了阻塞readline函数,所以我们假设子进程很好,输出完整的行。

我欢迎反馈,以改进解决方案,因为我仍然是Python新手。

不是第一个,也可能不是最后一个,我已经构建了一个包,它使用两种不同的方法执行非阻塞标准输出PIPE读取,一种是基于J.F. Sebastian (@jfs)的答案,另一种是一个简单的communication()循环,使用线程检查超时。

两种标准输出捕获方法都在Linux和Windows下进行了测试,截至撰写本文时,Python版本从2.7到3.9

由于它是非阻塞的,它保证了超时强制,即使有多个子进程和孙子进程,甚至在Python 2.7下也是如此。

该包还处理字节和文本标准输出编码,当试图捕获EOF时,这是一个噩梦。

您可以在https://github.com/netinvent/command_runner上找到该软件包

如果你需要一些经过良好测试的非阻塞读取实现,可以尝试一下(或修改代码):

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

您可以在_poll_process()或_monitor_process()中找到核心的非阻塞读取代码,这取决于所使用的捕获方法。 在此基础上,您可以实现自己想要的功能,或者简单地使用整个包作为子进程替换来执行命令。

一种解决方案是让另一个进程执行对该进程的读取,或者创建一个带有超时的进程线程。

这是一个超时函数的线程版本:

http://code.activestate.com/recipes/473878/

但是,是否需要在stdout传入时读取它? 另一种解决方案可能是将输出转储到一个文件中,然后使用p.t wait()等待进程完成。

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

我有最初提问者的问题,但不希望调用线程。我将Jesse的解决方案与来自管道的直接read()和我自己的行读取缓冲处理程序混合在一起(然而,我的子进程- ping -总是写入<系统页面大小的整行)。我通过读入一个gobject-registered io watch来避免忙碌的等待。这些天我通常在gobject MainLoop中运行代码以避免线程。

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

观察者是

def watch(f, *other):
    print 'reading',f.read()
    return True

主程序建立一个ping,然后调用gobject邮件循环。

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

任何其他工作都附加到gobject中的回调。

选择模块帮助您确定下一个有用的输入在哪里。

但是,您几乎总是更喜欢使用单独的线程。一个是阻塞读取stdin,另一个是阻塞你不想阻塞的地方。