我的python脚本使用subprocess调用一个非常吵闹的linux实用程序。我想将所有输出存储到一个日志文件中,并将其中一些显示给用户。我认为下面的代码可以工作,但是直到实用程序产生大量输出,输出才显示在我的应用程序中。
#fake_utility.py, just generates lots of output over time
import time
i = 0
while True:
print hex(i)*512
i += 1
time.sleep(0.5)
#filters output
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
for line in proc.stdout:
#the real code does filtering here
print "test:", line.rstrip()
我真正想要的行为是让过滤器脚本打印从子进程接收到的每一行。有点像tee,但用的是python代码。
我错过了什么?这可能吗?
更新:
如果将sys.stdout.flush()添加到fake_utility.py中,代码在python 3.1中具有所需的行为。我使用的是python 2.6。您可能认为使用proc.stdout.xreadlines()的工作方式与py3k相同,但事实并非如此。
更新2:
下面是最小的工作代码。
#fake_utility.py, just generates lots of output over time
import sys, time
for i in range(10):
print i
sys.stdout.flush()
time.sleep(0.5)
#display out put line by line
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
#works in python 3.0+
#for line in proc.stdout:
for line in iter(proc.stdout.readline,''):
print line.rstrip()
我尝试用python3,它工作,源代码
当你使用popen生成新线程时,你告诉操作系统PIPE子进程的stdout,这样父进程就可以读取它,在这里,stderr被复制到父进程的stderr。
在output_reader中,我们读取子进程的每一行stdout,方法是将它包装在迭代器中,每当有新行准备好时,迭代器就会逐行填充子进程的输出。
def output_reader(proc):
for line in iter(proc.stdout.readline, b''):
print('got line: {0}'.format(line.decode('utf-8')), end='')
def main():
proc = subprocess.Popen(['python', 'fake_utility.py'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
t = threading.Thread(target=output_reader, args=(proc,))
t.start()
try:
time.sleep(0.2)
import time
i = 0
while True:
print (hex(i)*512)
i += 1
time.sleep(0.5)
finally:
proc.terminate()
try:
proc.wait(timeout=0.2)
print('== subprocess exited with rc =', proc.returncode)
except subprocess.TimeoutExpired:
print('subprocess did not terminate in time')
t.join()
允许逐行实时遍历stdout和stderr的函数
如果需要同时获取stdout和stderr的输出流,可以使用下面的函数。
该函数使用Queues将两个Popen管道合并到一个迭代器中。
这里我们创建了函数read_popen_pipes():
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
def enqueue_output(file, queue):
for line in iter(file.readline, ''):
queue.put(line)
file.close()
def read_popen_pipes(p):
with ThreadPoolExecutor(2) as pool:
q_stdout, q_stderr = Queue(), Queue()
pool.submit(enqueue_output, p.stdout, q_stdout)
pool.submit(enqueue_output, p.stderr, q_stderr)
while True:
if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
break
out_line = err_line = ''
try:
out_line = q_stdout.get_nowait()
except Empty:
pass
try:
err_line = q_stderr.get_nowait()
except Empty:
pass
yield (out_line, err_line)
Read_popen_pipes()正在使用:
import subprocess as sp
with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
for out_line, err_line in read_popen_pipes(p):
# Do stuff with each line, e.g.:
print(out_line, end='')
print(err_line, end='')
return p.poll() # return status-code
实际上,如果您对迭代器进行了排序,那么缓冲现在可能是您的问题。你可以告诉子进程中的python不要缓冲它的输出。
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
就变成了
proc = subprocess.Popen(['python','-u', 'fake_utility.py'],stdout=subprocess.PIPE)
当从python内部调用python时,我需要这个。