下面是运行任意命令返回其标准输出数据的Python代码,或在非零退出码上引发异常:

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communication用于等待进程退出:

stdoutdata, stderrdata = proc.communicate()

子进程模块不支持超时——杀死运行超过X秒的进程的能力——因此,通信可能需要很长时间才能运行。

在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?


当前回答

只是想写些简单点的东西。

#!/usr/bin/python

from subprocess import Popen, PIPE
import datetime
import time 

popen = Popen(["/bin/sleep", "10"]);
pid = popen.pid
sttime = time.time();
waittime =  3

print "Start time %s"%(sttime)

while True:
    popen.poll();
    time.sleep(1)
    rcode = popen.returncode
    now = time.time();
    if [ rcode is None ]  and  [ now > (sttime + waittime) ] :
        print "Killing it now"
        popen.kill()

其他回答

如果你用的是Unix,

import signal
  ...
class Alarm(Exception):
    pass

def alarm_handler(signum, frame):
    raise Alarm

signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60)  # 5 minutes
try:
    stdoutdata, stderrdata = proc.communicate()
    signal.alarm(0)  # reset the alarm
except Alarm:
    print "Oops, taking too long!"
    # whatever else

在Python 3.7.8中超时测试后捕获的输出示例:

try:
    return subprocess.run(command, shell=True, capture_output=True, timeout=20, cwd=cwd, universal_newlines=True)
except subprocess.TimeoutExpired as e:
    print(e.output.decode(encoding="utf-8", errors="ignore"))
    assert False;

异常子进程。TimeoutExpired有输出和其他成员:

cmd -用于生成子进程的命令。 timeout -超时时间,单位为秒。 output -子进程的输出,如果它是由run()或 check_output()。否则,没有。 stdout -输出的别名,用于与stderr对称。 stderr -子进程的stderr输出,如果它被捕获 run()。否则,没有。

更多信息:https://docs.python.org/3/library/subprocess.html#subprocess.TimeoutExpired

有一个想法是子类化Popen类并用一些简单的方法装饰器来扩展它。我们叫它ExpirablePopen吧。

from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread


class ExpirablePopen(Popen):

    def __init__(self, *args, **kwargs):
        self.timeout = kwargs.pop('timeout', 0)
        self.timer = None
        self.done = Event()

        Popen.__init__(self, *args, **kwargs)

    def __tkill(self):
        timeout = self.timeout
        if not self.done.wait(timeout):
            error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
            self.kill()

    def expirable(func):
        def wrapper(self, *args, **kwargs):
            # zero timeout means call of parent method
            if self.timeout == 0:
                return func(self, *args, **kwargs)

            # if timer is None, need to start it
            if self.timer is None:
                self.timer = thr = Thread(target=self.__tkill)
                thr.daemon = True
                thr.start()

            result = func(self, *args, **kwargs)
            self.done.set()

            return result
        return wrapper

    wait = expirable(Popen.wait)
    communicate = expirable(Popen.communicate)


if __name__ == '__main__':
    from subprocess import PIPE

    print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()

预先设置Linux命令超时并不是一个糟糕的解决方法,它对我来说是有效的。

cmd = "timeout 20 "+ cmd
subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = p.communicate()

另一种选择是写入临时文件以防止标准输出阻塞,而不需要使用communication()轮询。在其他答案没有的地方,这个方法对我有用;比如在窗户上。

    outFile =  tempfile.SpooledTemporaryFile() 
    errFile =   tempfile.SpooledTemporaryFile() 
    proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
    wait_remaining_sec = timeout

    while proc.poll() is None and wait_remaining_sec > 0:
        time.sleep(1)
        wait_remaining_sec -= 1

    if wait_remaining_sec <= 0:
        killProc(proc.pid)
        raise ProcessIncompleteError(proc, timeout)

    # read temp streams from start
    outFile.seek(0);
    errFile.seek(0);
    out = outFile.read()
    err = errFile.read()
    outFile.close()
    errFile.close()