下面是运行任意命令返回其标准输出数据的Python代码,或在非零退出码上引发异常:

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communication用于等待进程退出:

stdoutdata, stderrdata = proc.communicate()

子进程模块不支持超时——杀死运行超过X秒的进程的能力——因此,通信可能需要很长时间才能运行。

在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?


当前回答

我已经修改了sussudio的答案。现在函数返回:(returncode, stdout, stderr, timeout) - stdout和stderr被解码为utf-8字符串

def kill_proc(proc, timeout):
  timeout["value"] = True
  proc.kill()

def run(cmd, timeout_sec):
  proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  timeout = {"value": False}
  timer = Timer(timeout_sec, kill_proc, [proc, timeout])
  timer.start()
  stdout, stderr = proc.communicate()
  timer.cancel()
  return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]

其他回答

不幸的是,我的雇主在公开源代码方面有非常严格的规定,所以我不能提供实际的代码。但对我来说,最好的解决方案是创建一个子类重写Popen.wait()来轮询,而不是无限期地等待。__init__接受超时参数。一旦你这样做,所有其他Popen方法(调用等待)将按预期工作,包括通信。

从Python 3.5开始,有了一个新的subprocess.run通用命令(意味着取代check_call, check_output…),它也有timeout=参数。

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None) 执行args描述的命令。等待命令完成,然后返回一个CompletedProcess实例。

它引发一个子流程。超时超时时出现TimeoutExpired异常。

有一个想法是子类化Popen类并用一些简单的方法装饰器来扩展它。我们叫它ExpirablePopen吧。

from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread


class ExpirablePopen(Popen):

    def __init__(self, *args, **kwargs):
        self.timeout = kwargs.pop('timeout', 0)
        self.timer = None
        self.done = Event()

        Popen.__init__(self, *args, **kwargs)

    def __tkill(self):
        timeout = self.timeout
        if not self.done.wait(timeout):
            error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
            self.kill()

    def expirable(func):
        def wrapper(self, *args, **kwargs):
            # zero timeout means call of parent method
            if self.timeout == 0:
                return func(self, *args, **kwargs)

            # if timer is None, need to start it
            if self.timer is None:
                self.timer = thr = Thread(target=self.__tkill)
                thr.daemon = True
                thr.start()

            result = func(self, *args, **kwargs)
            self.done.set()

            return result
        return wrapper

    wait = expirable(Popen.wait)
    communicate = expirable(Popen.communicate)


if __name__ == '__main__':
    from subprocess import PIPE

    print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()

如果你正在使用python2,请尝试一下

import subprocess32

try:
    output = subprocess32.check_output(command, shell=True, timeout=3)
except subprocess32.TimeoutExpired as e:
    print e

如果你用的是Unix,

import signal
  ...
class Alarm(Exception):
    pass

def alarm_handler(signum, frame):
    raise Alarm

signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60)  # 5 minutes
try:
    stdoutdata, stderrdata = proc.communicate()
    signal.alarm(0)  # reset the alarm
except Alarm:
    print "Oops, taking too long!"
    # whatever else