下面是运行任意命令返回其标准输出数据的Python代码,或在非零退出码上引发异常:
proc = subprocess.Popen(
cmd,
stderr=subprocess.STDOUT, # Merge stdout and stderr
stdout=subprocess.PIPE,
shell=True)
communication用于等待进程退出:
stdoutdata, stderrdata = proc.communicate()
子进程模块不支持超时——杀死运行超过X秒的进程的能力——因此,通信可能需要很长时间才能运行。
在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?
有时需要处理(ffmpeg)而不使用communication(),在这种情况下需要异步超时,这是使用ttldict实现的一种实用方法
PIP安装ttldict
from ttldict import TTLOrderedDict
sp_timeout = TTLOrderedDict(default_ttl=10)
def kill_on_timeout(done, proc):
while True:
now = time.time()
if sp_timeout.get('exp_time') == None:
proc.kill()
break
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, stderr=subprocess.STDOUT)
sp_timeout['exp_time'] = time.time()
done = Event()
watcher = Thread(target=kill_on_timeout, args=(done, process))
watcher.daemon = True
watcher.start()
done.set()
for line in process.stdout:
.......
我对底层细节了解不多;但是,考虑到这一点
python 2.6的API提供了等待线程和的能力
终止进程,那么在一个单独的进程中运行该进程呢
的线程吗?
import subprocess, threading
class Command(object):
def __init__(self, cmd):
self.cmd = cmd
self.process = None
def run(self, timeout):
def target():
print 'Thread started'
self.process = subprocess.Popen(self.cmd, shell=True)
self.process.communicate()
print 'Thread finished'
thread = threading.Thread(target=target)
thread.start()
thread.join(timeout)
if thread.is_alive():
print 'Terminating process'
self.process.terminate()
thread.join()
print self.process.returncode
command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)
这段代码在我的机器中的输出是:
Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15
在哪里可以看到,在第一次执行的过程
正确完成(返回代码0),而在第二个
进程被终止(返回代码-15)。
我没有在windows中测试;但是,除了更新示例之外
命令,我想它应该工作,因为我还没有找到
记录任何说明该线程的内容。Join或process.terminate
不支持。
下面是Alex Martelli的解决方案,它是一个具有适当进程终止的模块。其他方法不起作用,因为它们没有使用proc. communication()。所以如果你有一个产生大量输出的进程,它会填满它的输出缓冲区,然后阻塞,直到你从中读取一些东西。
from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen
def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None):
'''
Run a command with a timeout after which it will be forcibly
killed.
'''
class Alarm(Exception):
pass
def alarm_handler(signum, frame):
raise Alarm
p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
if timeout != -1:
signal(SIGALRM, alarm_handler)
alarm(timeout)
try:
stdout, stderr = p.communicate()
if timeout != -1:
alarm(0)
except Alarm:
pids = [p.pid]
if kill_tree:
pids.extend(get_process_children(p.pid))
for pid in pids:
# process might have died before getting to this line
# so wrap to avoid OSError: no such process
try:
kill(pid, SIGKILL)
except OSError:
pass
return -9, '', ''
return p.returncode, stdout, stderr
def get_process_children(pid):
p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True,
stdout = PIPE, stderr = PIPE)
stdout, stderr = p.communicate()
return [int(p) for p in stdout.split()]
if __name__ == '__main__':
print run('find /', shell = True, timeout = 3)
print run('find', shell = True)
这是我的解决方案,我使用线程和事件:
import subprocess
from threading import Thread, Event
def kill_on_timeout(done, timeout, proc):
if not done.wait(timeout):
proc.kill()
def exec_command(command, timeout):
done = Event()
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc))
watcher.daemon = True
watcher.start()
data, stderr = proc.communicate()
done.set()
return data, stderr, proc.returncode
在行动:
In [2]: exec_command(['sleep', '10'], 5)
Out[2]: ('', '', -9)
In [3]: exec_command(['sleep', '10'], 11)
Out[3]: ('', '', 0)