我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。
下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
下面是一个基于线程的简单解决方案:
适用于Linux和Windows(不依赖于select)。
异步读取stdout和stderr。
不依赖于具有任意等待时间的活动轮询(CPU友好)。
不使用asyncio(这可能与其他库冲突)。
一直运行到子进程终止为止。
printer.py
import time
import sys
sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)
reader.py
import queue
import subprocess
import sys
import threading
def enqueue_stream(stream, queue, type):
for line in iter(stream.readline, b''):
queue.put(str(type) + line.decode('utf-8'))
stream.close()
def enqueue_process(process, queue):
process.wait()
queue.put('x')
p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()
while True:
line = q.get()
if line[0] == 'x':
break
if line[0] == '2': # stderr
sys.stdout.write("\033[0;31m") # ANSI red color
sys.stdout.write(line[1:])
if line[0] == '2':
sys.stdout.write("\033[0m") # reset ANSI code
sys.stdout.flush()
tp.join()
to.join()
te.join()
我添加这个问题是为了读一些子进程。Popen stdout。
下面是我的非阻塞读解决方案:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Python 3.4为异步IO引入了新的临时API——asyncio模块。
该方法类似于@Bryan Ward的twisted-based回答——定义一个协议,一旦数据准备好,就调用它的方法:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
请参阅文档中的“Subprocess”。
有一个高级接口asyncio.create_subprocess_exec(),它返回允许使用StreamReader.readline()协程异步读取一行的Process对象
(使用async/await Python 3.5+语法):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
Readline_and_kill()执行以下任务:
启动子进程,将其标准输出重定向到管道
异步从子进程的stdout中读取一行
杀子流程
等待它退出
如果需要,每个步骤都可以被超时秒限制。
这是一个在子进程中执行交互命令的例子,通过伪终端实现了stdout的交互。您可以参考:https://stackoverflow.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)