我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。

我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?


当前回答

下面是对给定的基于线程的解决方案的轻微改进。

下面的代码支持异常:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

用5秒超时调用它:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)

其他回答

我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

下面是一个简单的例子,运行一个带有timeout的方法,并在成功时检索它的值。

import multiprocessing
import time

ret = {"foo": False}


def worker(queue):
    """worker function"""

    ret = queue.get()

    time.sleep(1)

    ret["foo"] = True
    queue.put(ret)


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    queue.put(ret)

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join(timeout=10)

    if p.exitcode is None:
        print("The worker timed out.")
    else:
        print(f"The worker completed and returned: {queue.get()}")

你可以使用多处理。过程来做到这一点。

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

我也遇到过同样的问题,但我的情况是需要在子线程上工作,信号不适合我,所以我写了一个python包:timeout-timer来解决这个问题,支持用作上下文或装饰器,使用信号或子线程模块来触发超时中断:

from timeout_timer import timeout, TimeoutInterrupt

class TimeoutInterruptNested(TimeoutInterrupt):
    pass

def test_timeout_nested_loop_both_timeout(timer="thread"):
    cnt = 0
    try:
        with timeout(5, timer=timer):
            try:
                with timeout(2, timer=timer, exception=TimeoutInterruptNested):
                    sleep(2)
            except TimeoutInterruptNested:
                cnt += 1
            time.sleep(10)
    except TimeoutInterrupt:
        cnt += 1
    assert cnt == 2

查看更多信息:https://github.com/dozysun/timeout-timer

伟大的,易于使用和可靠的PyPi项目超时装饰器(https://pypi.org/project/timeout-decorator/)

安装:

pip install timeout-decorator

用法:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()