下面是运行任意命令返回其标准输出数据的Python代码,或在非零退出码上引发异常:
proc = subprocess.Popen(
cmd,
stderr=subprocess.STDOUT, # Merge stdout and stderr
stdout=subprocess.PIPE,
shell=True)
communication用于等待进程退出:
stdoutdata, stderrdata = proc.communicate()
子进程模块不支持超时——杀死运行超过X秒的进程的能力——因此,通信可能需要很长时间才能运行。
在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?
Jcollado的答案可以使用线程来简化。定时器类:
import shlex
from subprocess import Popen, PIPE
from threading import Timer
def run(cmd, timeout_sec):
proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
timer = Timer(timeout_sec, proc.kill)
try:
timer.start()
stdout, stderr = proc.communicate()
finally:
timer.cancel()
# Examples: both take 1 second
run("sleep 1", 5) # process ends normally at 1 second
run("sleep 5", 1) # timeout happens at 1 second
另一种选择是写入临时文件以防止标准输出阻塞,而不需要使用communication()轮询。在其他答案没有的地方,这个方法对我有用;比如在窗户上。
outFile = tempfile.SpooledTemporaryFile()
errFile = tempfile.SpooledTemporaryFile()
proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
wait_remaining_sec = timeout
while proc.poll() is None and wait_remaining_sec > 0:
time.sleep(1)
wait_remaining_sec -= 1
if wait_remaining_sec <= 0:
killProc(proc.pid)
raise ProcessIncompleteError(proc, timeout)
# read temp streams from start
outFile.seek(0);
errFile.seek(0);
out = outFile.read()
err = errFile.read()
outFile.close()
errFile.close()
有时需要处理(ffmpeg)而不使用communication(),在这种情况下需要异步超时,这是使用ttldict实现的一种实用方法
PIP安装ttldict
from ttldict import TTLOrderedDict
sp_timeout = TTLOrderedDict(default_ttl=10)
def kill_on_timeout(done, proc):
while True:
now = time.time()
if sp_timeout.get('exp_time') == None:
proc.kill()
break
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, stderr=subprocess.STDOUT)
sp_timeout['exp_time'] = time.time()
done = Event()
watcher = Thread(target=kill_on_timeout, args=(done, process))
watcher.daemon = True
watcher.start()
done.set()
for line in process.stdout:
.......
一旦你理解了在*unix中运行机器的整个过程,你将很容易找到更简单的解决方案:
考虑这个简单的例子,如何使用select.select()(现在几乎在*nix上随处可见)创建可超时的communication()冰毒。这也可以用epoll/poll/kqueue来编写,但select.select()变体可能是一个很好的例子。select.select()的主要限制(速度和1024 max fds)不适用于您的任务。
它在*nix下工作,不创建线程,不使用信号,可以从任何线程启动(不仅仅是主线程),并且足够快,可以从我机器上的stdout读取250mb/s的数据(i5 2.3ghz)。
在通信结束时连接stdout/stderr时出现问题。如果你有大量的程序输出,这可能会导致大量的内存使用。但是您可以多次调用communication(),超时时间较小。
class Popen(subprocess.Popen):
def communicate(self, input=None, timeout=None):
if timeout is None:
return subprocess.Popen.communicate(self, input)
if self.stdin:
# Flush stdio buffer, this might block if user
# has been writing to .stdin in an uncontrolled
# fashion.
self.stdin.flush()
if not input:
self.stdin.close()
read_set, write_set = [], []
stdout = stderr = None
if self.stdin and input:
write_set.append(self.stdin)
if self.stdout:
read_set.append(self.stdout)
stdout = []
if self.stderr:
read_set.append(self.stderr)
stderr = []
input_offset = 0
deadline = time.time() + timeout
while read_set or write_set:
try:
rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time()))
except select.error as ex:
if ex.args[0] == errno.EINTR:
continue
raise
if not (rlist or wlist):
# Just break if timeout
# Since we do not close stdout/stderr/stdin, we can call
# communicate() several times reading data by smaller pieces.
break
if self.stdin in wlist:
chunk = input[input_offset:input_offset + subprocess._PIPE_BUF]
try:
bytes_written = os.write(self.stdin.fileno(), chunk)
except OSError as ex:
if ex.errno == errno.EPIPE:
self.stdin.close()
write_set.remove(self.stdin)
else:
raise
else:
input_offset += bytes_written
if input_offset >= len(input):
self.stdin.close()
write_set.remove(self.stdin)
# Read stdout / stderr by 1024 bytes
for fn, tgt in (
(self.stdout, stdout),
(self.stderr, stderr),
):
if fn in rlist:
data = os.read(fn.fileno(), 1024)
if data == '':
fn.close()
read_set.remove(fn)
tgt.append(data)
if stdout is not None:
stdout = ''.join(stdout)
if stderr is not None:
stderr = ''.join(stderr)
return (stdout, stderr)