下面是运行任意命令返回其标准输出数据的Python代码,或在非零退出码上引发异常:

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communication用于等待进程退出:

stdoutdata, stderrdata = proc.communicate()

子进程模块不支持超时——杀死运行超过X秒的进程的能力——因此,通信可能需要很长时间才能运行。

在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?


当前回答

虽然我还没有广泛地研究它,但我在ActiveState中发现的这个装饰器似乎对这类事情非常有用。伴随着subprocess.Popen(…, close_fds=True),至少我已经准备好在Python中编写shell脚本。

其他回答

我使用的解决方案是在shell命令前面加上timelimit。如果命令花费的时间太长,timelimit将停止它,Popen将有一个由timelimit设置的返回码。如果它是> 128,它意味着时间限制杀死进程。

参见python subprocess with timeout and large output (>64K)

仅适用于Linux,但如果有人想使用subprocess.getstatusoutput(),其中timeout参数不可用,您可以在命令的开头使用内置的Linux timeout,即:

import subprocess

timeout = 25 # seconds
cmd = f"timeout --preserve-status --foreground {timeout} ping duckgo.com"
exit_c, out = subprocess.getstatusoutput(cmd)

if (exit_c == 0):
    print("success")
else:
    print("Error: ", out)

超时参数:

——preserve-status:保留退出状态 ——前台:在前台运行 25:超时时间,单位为秒

如果你用的是Unix,

import signal
  ...
class Alarm(Exception):
    pass

def alarm_handler(signum, frame):
    raise Alarm

signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60)  # 5 minutes
try:
    stdoutdata, stderrdata = proc.communicate()
    signal.alarm(0)  # reset the alarm
except Alarm:
    print "Oops, taking too long!"
    # whatever else

有一个想法是子类化Popen类并用一些简单的方法装饰器来扩展它。我们叫它ExpirablePopen吧。

from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread


class ExpirablePopen(Popen):

    def __init__(self, *args, **kwargs):
        self.timeout = kwargs.pop('timeout', 0)
        self.timer = None
        self.done = Event()

        Popen.__init__(self, *args, **kwargs)

    def __tkill(self):
        timeout = self.timeout
        if not self.done.wait(timeout):
            error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
            self.kill()

    def expirable(func):
        def wrapper(self, *args, **kwargs):
            # zero timeout means call of parent method
            if self.timeout == 0:
                return func(self, *args, **kwargs)

            # if timer is None, need to start it
            if self.timer is None:
                self.timer = thr = Thread(target=self.__tkill)
                thr.daemon = True
                thr.start()

            result = func(self, *args, **kwargs)
            self.done.set()

            return result
        return wrapper

    wait = expirable(Popen.wait)
    communicate = expirable(Popen.communicate)


if __name__ == '__main__':
    from subprocess import PIPE

    print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()

有时需要处理(ffmpeg)而不使用communication(),在这种情况下需要异步超时,这是使用ttldict实现的一种实用方法

PIP安装ttldict

from ttldict import  TTLOrderedDict   
sp_timeout = TTLOrderedDict(default_ttl=10)

def kill_on_timeout(done, proc):
    while True:
        now = time.time()
        if sp_timeout.get('exp_time') == None:
                proc.kill()
                break
    
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, stderr=subprocess.STDOUT)
            
sp_timeout['exp_time'] = time.time()
            
done = Event()
watcher = Thread(target=kill_on_timeout, args=(done, process))
watcher.daemon = True
watcher.start()
done.set()

for line in process.stdout:
.......