我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。

下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

当前回答

我的问题有点不同,因为我想从正在运行的进程中收集stdout和stderr,但最终是一样的,因为我想在小部件生成时在小部件中呈现输出。

我不希望使用队列或额外的线程来解决许多建议的解决方案,因为执行运行另一个脚本并收集其输出这样的常见任务应该不需要它们。

在阅读了建议的解决方案和python文档后,我解决了以下实现的问题。是的,它只适用于POSIX,因为我正在使用select函数调用。

我同意,对于这样一个常见的脚本任务,文档是令人困惑的,实现是尴尬的。我相信旧版本的python对Popen有不同的默认值和不同的解释,所以造成了很多困惑。这似乎对Python 2.7.12和3.5.2都很有效。

关键是将bufsize=1设置为行缓冲,然后universal_newlines=True处理为文本文件而不是二进制文件,这似乎成为设置bufsize=1时的默认值。

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG和VERBOSE仅仅是将输出打印到终端的宏。

这个解决方案在我看来是99.99%有效的,因为它仍然使用了阻塞readline函数,所以我们假设子进程很好,输出完整的行。

我欢迎反馈,以改进解决方案,因为我仍然是Python新手。

其他回答

在现代Python中,情况要好得多。

下面是一个简单的子程序"hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

以及一个与之交互的程序:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

打印出来:

b'hello bob\n'
b'hello alice\n'

请注意,实际的模式(几乎所有前面的回答,包括这里和相关的问题)是将子进程的stdout文件描述符设置为非阻塞,然后在某种选择循环中轮询它。当然,现在这个循环是由asyncio提供的。

为什么要打扰线程和队列? 与readline()不同,BufferedReader.read1()不会阻塞等待\r\n,如果有任何输出进入,它会尽快返回。

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

使用select & read(1)。

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

readline()例如:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

选择模块帮助您确定下一个有用的输入在哪里。

但是,您几乎总是更喜欢使用单独的线程。一个是阻塞读取stdin,另一个是阻塞你不想阻塞的地方。

我基于J. F. Sebastian的解决方案创建了一个库。你可以使用它。

https://github.com/cenkalti/what