我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。

下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

当前回答

在类unix系统和Python 3.5+上有os。Set_blocking,做的就是它所说的。

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

这个输出:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

与操作系统。Set_blocking注释它是:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''

其他回答

一种解决方案是让另一个进程执行对该进程的读取,或者创建一个带有超时的进程线程。

这是一个超时函数的线程版本:

http://code.activestate.com/recipes/473878/

但是,是否需要在stdout传入时读取它? 另一种解决方案可能是将输出转储到一个文件中,然后使用p.t wait()等待进程完成。

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

下面是一个基于线程的简单解决方案:

适用于Linux和Windows(不依赖于select)。 异步读取stdout和stderr。 不依赖于具有任意等待时间的活动轮询(CPU友好)。 不使用asyncio(这可能与其他库冲突)。 一直运行到子进程终止为止。

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

我有最初提问者的问题,但不希望调用线程。我将Jesse的解决方案与来自管道的直接read()和我自己的行读取缓冲处理程序混合在一起(然而,我的子进程- ping -总是写入<系统页面大小的整行)。我通过读入一个gobject-registered io watch来避免忙碌的等待。这些天我通常在gobject MainLoop中运行代码以避免线程。

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

观察者是

def watch(f, *other):
    print 'reading',f.read()
    return True

主程序建立一个ping,然后调用gobject邮件循环。

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

任何其他工作都附加到gobject中的回调。

在现代Python中,情况要好得多。

下面是一个简单的子程序"hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

以及一个与之交互的程序:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

打印出来:

b'hello bob\n'
b'hello alice\n'

请注意,实际的模式(几乎所有前面的回答,包括这里和相关的问题)是将子进程的stdout文件描述符设置为非阻塞,然后在某种选择循环中轮询它。当然,现在这个循环是由asyncio提供的。

根据J.F. Sebastian的答案和其他几个来源,我组合了一个简单的子流程管理器。它提供了请求的非阻塞读取,以及并行运行几个进程。它不使用任何特定于操作系统的调用(我知道),因此应该在任何地方工作。

它可以从pypi获得,所以只需pip安装shelljob。参考项目页面的示例和完整的文档。