我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。

下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

当前回答

在现代Python中,情况要好得多。

下面是一个简单的子程序"hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

以及一个与之交互的程序:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

打印出来:

b'hello bob\n'
b'hello alice\n'

请注意,实际的模式(几乎所有前面的回答,包括这里和相关的问题)是将子进程的stdout文件描述符设置为非阻塞,然后在某种选择循环中轮询它。当然,现在这个循环是由asyncio提供的。

其他回答

我添加这个问题是为了读一些子进程。Popen stdout。 下面是我的非阻塞读解决方案:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

在我的例子中,我需要一个日志模块来捕获后台应用程序的输出并对其进行扩充(添加时间戳、颜色等)。

我最终使用了一个后台线程来执行实际的I/O操作。以下代码仅适用于POSIX平台。我去掉了不重要的部分。

如果有人打算长期使用这个野兽,可以考虑管理开放描述符。对我来说,这不是什么大问题。

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

我有最初提问者的问题,但不希望调用线程。我将Jesse的解决方案与来自管道的直接read()和我自己的行读取缓冲处理程序混合在一起(然而,我的子进程- ping -总是写入<系统页面大小的整行)。我通过读入一个gobject-registered io watch来避免忙碌的等待。这些天我通常在gobject MainLoop中运行代码以避免线程。

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

观察者是

def watch(f, *other):
    print 'reading',f.read()
    return True

主程序建立一个ping,然后调用gobject邮件循环。

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

任何其他工作都附加到gobject中的回调。

我的问题有点不同,因为我想从正在运行的进程中收集stdout和stderr,但最终是一样的,因为我想在小部件生成时在小部件中呈现输出。

我不希望使用队列或额外的线程来解决许多建议的解决方案,因为执行运行另一个脚本并收集其输出这样的常见任务应该不需要它们。

在阅读了建议的解决方案和python文档后,我解决了以下实现的问题。是的,它只适用于POSIX,因为我正在使用select函数调用。

我同意,对于这样一个常见的脚本任务,文档是令人困惑的,实现是尴尬的。我相信旧版本的python对Popen有不同的默认值和不同的解释,所以造成了很多困惑。这似乎对Python 2.7.12和3.5.2都很有效。

关键是将bufsize=1设置为行缓冲,然后universal_newlines=True处理为文本文件而不是二进制文件,这似乎成为设置bufsize=1时的默认值。

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG和VERBOSE仅仅是将输出打印到终端的宏。

这个解决方案在我看来是99.99%有效的,因为它仍然使用了阻塞readline函数,所以我们假设子进程很好,输出完整的行。

我欢迎反馈,以改进解决方案,因为我仍然是Python新手。

在类unix系统和Python 3.5+上有os。Set_blocking,做的就是它所说的。

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

这个输出:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

与操作系统。Set_blocking注释它是:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''