我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。

下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

当前回答

试试asyncproc模块。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

该模块负责S.Lott建议的所有线程。

其他回答

Fcntl, select, asyncproc在这种情况下不起作用。

不管操作系统如何,读取流而不阻塞的可靠方法是使用Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn't solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can't because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

在我看来,这比使用select或signal模块来解决这个问题要简洁一些,但它只适用于UNIX…

下面是一个基于线程的简单解决方案:

适用于Linux和Windows(不依赖于select)。 异步读取stdout和stderr。 不依赖于具有任意等待时间的活动轮询(CPU友好)。 不使用asyncio(这可能与其他库冲突)。 一直运行到子进程终止为止。

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

试试asyncproc模块。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

该模块负责S.Lott建议的所有线程。

选择模块帮助您确定下一个有用的输入在哪里。

但是,您几乎总是更喜欢使用单独的线程。一个是阻塞读取stdin,另一个是阻塞你不想阻塞的地方。