我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。
下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
免责声明:这只适用于龙卷风
您可以通过将fd设置为非阻塞,然后使用ioloop来注册回调来实现这一点。我把它打包在一个名为tornado_subprocess的鸡蛋中,你可以通过PyPI安装它:
easy_install tornado_subprocess
现在你可以这样做:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
你也可以将它与RequestHandler一起使用
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Python 3.4为异步IO引入了新的临时API——asyncio模块。
该方法类似于@Bryan Ward的twisted-based回答——定义一个协议,一旦数据准备好,就调用它的方法:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
请参阅文档中的“Subprocess”。
有一个高级接口asyncio.create_subprocess_exec(),它返回允许使用StreamReader.readline()协程异步读取一行的Process对象
(使用async/await Python 3.5+语法):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
Readline_and_kill()执行以下任务:
启动子进程,将其标准输出重定向到管道
异步从子进程的stdout中读取一行
杀子流程
等待它退出
如果需要,每个步骤都可以被超时秒限制。
不是第一个,也可能不是最后一个,我已经构建了一个包,它使用两种不同的方法执行非阻塞标准输出PIPE读取,一种是基于J.F. Sebastian (@jfs)的答案,另一种是一个简单的communication()循环,使用线程检查超时。
两种标准输出捕获方法都在Linux和Windows下进行了测试,截至撰写本文时,Python版本从2.7到3.9
由于它是非阻塞的,它保证了超时强制,即使有多个子进程和孙子进程,甚至在Python 2.7下也是如此。
该包还处理字节和文本标准输出编码,当试图捕获EOF时,这是一个噩梦。
您可以在https://github.com/netinvent/command_runner上找到该软件包
如果你需要一些经过良好测试的非阻塞读取实现,可以尝试一下(或修改代码):
pip install command_runner
from command_runner import command_runner
exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')
您可以在_poll_process()或_monitor_process()中找到核心的非阻塞读取代码,这取决于所使用的捕获方法。
在此基础上,您可以实现自己想要的功能,或者简单地使用整个包作为子进程替换来执行命令。
在类unix系统和Python 3.5+上有os。Set_blocking,做的就是它所说的。
import os
import time
import subprocess
cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
# first iteration always produces empty byte string in non-blocking mode
for i in range(2):
line = p.stdout.readline()
print(i, line)
time.sleep(0.5)
if time.time() > start + 5:
break
p.terminate()
这个输出:
1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'
与操作系统。Set_blocking注释它是:
0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''