我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。
下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
该解决方案使用select模块从IO流中“读取任何可用数据”。这个函数一开始会阻塞,直到数据可用,但随后只读取可用的数据,不再进一步阻塞。
鉴于它使用了select模块,这只适用于Unix。
该代码完全符合pep8。
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
一种解决方案是让另一个进程执行对该进程的读取,或者创建一个带有超时的进程线程。
这是一个超时函数的线程版本:
http://code.activestate.com/recipes/473878/
但是,是否需要在stdout传入时读取它?
另一种解决方案可能是将输出转储到一个文件中,然后使用p.t wait()等待进程完成。
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
现有的解决方案不适合我(详情见下文)。最后成功的是使用read(1)实现readline(基于这个答案)。后者不阻塞:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
为什么现有的解决方案不起作用:
Solutions that require readline (including the Queue based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed.
Mixing low-level fcntl with high-level readline calls may not work properly as anonnn has pointed out.
Using select.poll() is neat, but doesn't work on Windows according to python docs.
Using third-party libraries seems overkill for this task and adds additional dependencies.
我有最初提问者的问题,但不希望调用线程。我将Jesse的解决方案与来自管道的直接read()和我自己的行读取缓冲处理程序混合在一起(然而,我的子进程- ping -总是写入<系统页面大小的整行)。我通过读入一个gobject-registered io watch来避免忙碌的等待。这些天我通常在gobject MainLoop中运行代码以避免线程。
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
观察者是
def watch(f, *other):
print 'reading',f.read()
return True
主程序建立一个ping,然后调用gobject邮件循环。
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
任何其他工作都附加到gobject中的回调。
该解决方案使用select模块从IO流中“读取任何可用数据”。这个函数一开始会阻塞,直到数据可用,但随后只读取可用的数据,不再进一步阻塞。
鉴于它使用了select模块,这只适用于Unix。
该代码完全符合pep8。
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer