下面是运行任意命令返回其标准输出数据的Python代码,或在非零退出码上引发异常:

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communication用于等待进程退出:

stdoutdata, stderrdata = proc.communicate()

子进程模块不支持超时——杀死运行超过X秒的进程的能力——因此,通信可能需要很长时间才能运行。

在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?


当前回答

这是我的解决方案,我使用线程和事件:

import subprocess
from threading import Thread, Event

def kill_on_timeout(done, timeout, proc):
    if not done.wait(timeout):
        proc.kill()

def exec_command(command, timeout):

    done = Event()
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc))
    watcher.daemon = True
    watcher.start()

    data, stderr = proc.communicate()
    done.set()

    return data, stderr, proc.returncode

在行动:

In [2]: exec_command(['sleep', '10'], 5)
Out[2]: ('', '', -9)

In [3]: exec_command(['sleep', '10'], 11)
Out[3]: ('', '', 0)

其他回答

在Python 3.7.8中超时测试后捕获的输出示例:

try:
    return subprocess.run(command, shell=True, capture_output=True, timeout=20, cwd=cwd, universal_newlines=True)
except subprocess.TimeoutExpired as e:
    print(e.output.decode(encoding="utf-8", errors="ignore"))
    assert False;

异常子进程。TimeoutExpired有输出和其他成员:

cmd -用于生成子进程的命令。 timeout -超时时间,单位为秒。 output -子进程的输出,如果它是由run()或 check_output()。否则,没有。 stdout -输出的别名,用于与stderr对称。 stderr -子进程的stderr输出,如果它被捕获 run()。否则,没有。

更多信息:https://docs.python.org/3/library/subprocess.html#subprocess.TimeoutExpired

虽然我还没有广泛地研究它,但我在ActiveState中发现的这个装饰器似乎对这类事情非常有用。伴随着subprocess.Popen(…, close_fds=True),至少我已经准备好在Python中编写shell脚本。

我对底层细节了解不多;但是,考虑到这一点 python 2.6的API提供了等待线程和的能力 终止进程,那么在一个单独的进程中运行该进程呢 的线程吗?

import subprocess, threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print 'Thread started'
            self.process = subprocess.Popen(self.cmd, shell=True)
            self.process.communicate()
            print 'Thread finished'

        thread = threading.Thread(target=target)
        thread.start()

        thread.join(timeout)
        if thread.is_alive():
            print 'Terminating process'
            self.process.terminate()
            thread.join()
        print self.process.returncode

command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)

这段代码在我的机器中的输出是:

Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15

在哪里可以看到,在第一次执行的过程 正确完成(返回代码0),而在第二个 进程被终止(返回代码-15)。

我没有在windows中测试;但是,除了更新示例之外 命令,我想它应该工作,因为我还没有找到 记录任何说明该线程的内容。Join或process.terminate 不支持。

我有一个问题,我想终止一个多线程子进程,如果它花费的时间超过给定的超时长度。我想在Popen()中设置一个超时,但它不起作用。然后,我意识到Popen().wait()等于call(),所以我有了在.wait(timeout=xxx)方法中设置超时的想法,这最终工作了。因此,我是这样解决的:

import os
import sys
import signal
import subprocess
from multiprocessing import Pool

cores_for_parallelization = 4
timeout_time = 15  # seconds

def main():
    jobs = [...YOUR_JOB_LIST...]
    with Pool(cores_for_parallelization) as p:
        p.map(run_parallel_jobs, jobs)

def run_parallel_jobs(args):
    # Define the arguments including the paths
    initial_terminal_command = 'C:\\Python34\\python.exe'  # Python executable
    function_to_start = 'C:\\temp\\xyz.py'  # The multithreading script
    final_list = [initial_terminal_command, function_to_start]
    final_list.extend(args)

    # Start the subprocess and determine the process PID
    subp = subprocess.Popen(final_list)  # starts the process
    pid = subp.pid

    # Wait until the return code returns from the function by considering the timeout. 
    # If not, terminate the process.
    try:
        returncode = subp.wait(timeout=timeout_time)  # should be zero if accomplished
    except subprocess.TimeoutExpired:
        # Distinguish between Linux and Windows and terminate the process if 
        # the timeout has been expired
        if sys.platform == 'linux2':
            os.kill(pid, signal.SIGTERM)
        elif sys.platform == 'win32':
            subp.terminate()

if __name__ == '__main__':
    main()

仅适用于Linux,但如果有人想使用subprocess.getstatusoutput(),其中timeout参数不可用,您可以在命令的开头使用内置的Linux timeout,即:

import subprocess

timeout = 25 # seconds
cmd = f"timeout --preserve-status --foreground {timeout} ping duckgo.com"
exit_c, out = subprocess.getstatusoutput(cmd)

if (exit_c == 0):
    print("success")
else:
    print("Error: ", out)

超时参数:

——preserve-status:保留退出状态 ——前台:在前台运行 25:超时时间,单位为秒