下面是运行任意命令返回其标准输出数据的Python代码,或在非零退出码上引发异常:

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communication用于等待进程退出:

stdoutdata, stderrdata = proc.communicate()

子进程模块不支持超时——杀死运行超过X秒的进程的能力——因此,通信可能需要很长时间才能运行。

在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?


当前回答

从Python 3.5开始,有了一个新的subprocess.run通用命令(意味着取代check_call, check_output…),它也有timeout=参数。

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None) 执行args描述的命令。等待命令完成,然后返回一个CompletedProcess实例。

它引发一个子流程。超时超时时出现TimeoutExpired异常。

其他回答

另一种选择是写入临时文件以防止标准输出阻塞,而不需要使用communication()轮询。在其他答案没有的地方,这个方法对我有用;比如在窗户上。

    outFile =  tempfile.SpooledTemporaryFile() 
    errFile =   tempfile.SpooledTemporaryFile() 
    proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
    wait_remaining_sec = timeout

    while proc.poll() is None and wait_remaining_sec > 0:
        time.sleep(1)
        wait_remaining_sec -= 1

    if wait_remaining_sec <= 0:
        killProc(proc.pid)
        raise ProcessIncompleteError(proc, timeout)

    # read temp streams from start
    outFile.seek(0);
    errFile.seek(0);
    out = outFile.read()
    err = errFile.read()
    outFile.close()
    errFile.close()

您可以使用select来完成此操作

import subprocess
from datetime import datetime
from select import select

def call_with_timeout(cmd, timeout):
    started = datetime.now()
    sp = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    while True:
        p = select([sp.stdout], [], [], timeout)
        if p[0]:
            p[0][0].read()
        ret = sp.poll()
        if ret is not None:
            return ret
        if (datetime.now()-started).total_seconds() > timeout:
            sp.kill()
            return None

我有一个问题,我想终止一个多线程子进程,如果它花费的时间超过给定的超时长度。我想在Popen()中设置一个超时,但它不起作用。然后,我意识到Popen().wait()等于call(),所以我有了在.wait(timeout=xxx)方法中设置超时的想法,这最终工作了。因此,我是这样解决的:

import os
import sys
import signal
import subprocess
from multiprocessing import Pool

cores_for_parallelization = 4
timeout_time = 15  # seconds

def main():
    jobs = [...YOUR_JOB_LIST...]
    with Pool(cores_for_parallelization) as p:
        p.map(run_parallel_jobs, jobs)

def run_parallel_jobs(args):
    # Define the arguments including the paths
    initial_terminal_command = 'C:\\Python34\\python.exe'  # Python executable
    function_to_start = 'C:\\temp\\xyz.py'  # The multithreading script
    final_list = [initial_terminal_command, function_to_start]
    final_list.extend(args)

    # Start the subprocess and determine the process PID
    subp = subprocess.Popen(final_list)  # starts the process
    pid = subp.pid

    # Wait until the return code returns from the function by considering the timeout. 
    # If not, terminate the process.
    try:
        returncode = subp.wait(timeout=timeout_time)  # should be zero if accomplished
    except subprocess.TimeoutExpired:
        # Distinguish between Linux and Windows and terminate the process if 
        # the timeout has been expired
        if sys.platform == 'linux2':
            os.kill(pid, signal.SIGTERM)
        elif sys.platform == 'win32':
            subp.terminate()

if __name__ == '__main__':
    main()

我已经实现了我能从其中一些收集到的东西。这适用于Windows,因为这是一个社区维基,我想我也会分享我的代码:

class Command(threading.Thread):
    def __init__(self, cmd, outFile, errFile, timeout):
        threading.Thread.__init__(self)
        self.cmd = cmd
        self.process = None
        self.outFile = outFile
        self.errFile = errFile
        self.timed_out = False
        self.timeout = timeout

    def run(self):
        self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \
            stderr = self.errFile)

        while (self.process.poll() is None and self.timeout > 0):
            time.sleep(1)
            self.timeout -= 1

        if not self.timeout > 0:
            self.process.terminate()
            self.timed_out = True
        else:
            self.timed_out = False

然后从另一个类或文件:

        outFile =  tempfile.SpooledTemporaryFile()
        errFile =   tempfile.SpooledTemporaryFile()

        executor = command.Command(c, outFile, errFile, timeout)
        executor.daemon = True
        executor.start()

        executor.join()
        if executor.timed_out:
            out = 'timed out'
        else:
            outFile.seek(0)
            errFile.seek(0)
            out = outFile.read()
            err = errFile.read()

        outFile.close()
        errFile.close()

仅适用于Linux,但如果有人想使用subprocess.getstatusoutput(),其中timeout参数不可用,您可以在命令的开头使用内置的Linux timeout,即:

import subprocess

timeout = 25 # seconds
cmd = f"timeout --preserve-status --foreground {timeout} ping duckgo.com"
exit_c, out = subprocess.getstatusoutput(cmd)

if (exit_c == 0):
    print("success")
else:
    print("Error: ", out)

超时参数:

——preserve-status:保留退出状态 ——前台:在前台运行 25:超时时间,单位为秒