我使用subprocess模块启动子流程并连接到其输出流(标准输出)。我希望能够对其标准输出执行非阻塞读取。是否有一种方法使.readline非阻塞或在我调用.readline之前检查流上是否有数据?我希望这是可移植的,或者至少可以在Windows和Linux下工作。
下面是我现在怎么做的(如果没有数据可用,它会阻塞在.readline上):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
这个版本的非阻塞读取不需要特殊的模块,并且可以在大多数Linux发行版上开箱即用。
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
一种解决方案是让另一个进程执行对该进程的读取,或者创建一个带有超时的进程线程。
这是一个超时函数的线程版本:
http://code.activestate.com/recipes/473878/
但是,是否需要在stdout传入时读取它?
另一种解决方案可能是将输出转储到一个文件中,然后使用p.t wait()等待进程完成。
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn't solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can't because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
在我看来,这比使用select或signal模块来解决这个问题要简洁一些,但它只适用于UNIX…
I also faced the problem described by Jesse and solved it by using "select" as Bradley, Andy and others did but in a blocking mode to avoid a busy loop. It uses a dummy Pipe as a fake stdin. The select blocks and wait for either stdin or the pipe to be ready. When a key is pressed stdin unblocks the select and the key value can be retrieved with read(1). When a different thread writes to the pipe then the pipe unblocks the select and it can be taken as an indication that the need for stdin is over. Here is some reference code:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
现有的解决方案不适合我(详情见下文)。最后成功的是使用read(1)实现readline(基于这个答案)。后者不阻塞:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
为什么现有的解决方案不起作用:
Solutions that require readline (including the Queue based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed.
Mixing low-level fcntl with high-level readline calls may not work properly as anonnn has pointed out.
Using select.poll() is neat, but doesn't work on Windows according to python docs.
Using third-party libraries seems overkill for this task and adds additional dependencies.
免责声明:这只适用于龙卷风
您可以通过将fd设置为非阻塞,然后使用ioloop来注册回调来实现这一点。我把它打包在一个名为tornado_subprocess的鸡蛋中,你可以通过PyPI安装它:
easy_install tornado_subprocess
现在你可以这样做:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
你也可以将它与RequestHandler一起使用
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()