我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。

我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?


当前回答

伟大的,易于使用和可靠的PyPi项目超时装饰器(https://pypi.org/project/timeout-decorator/)

安装:

pip install timeout-decorator

用法:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()

其他回答

我们也可以用信号来表示。我认为下面的例子会对你有用。与线程相比,它非常简单。

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

你可以使用多处理。过程来做到这一点。

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

asyncio的另一个解决方案:

如果你想取消后台任务,而不仅仅是在运行的主代码上超时,那么你需要一个来自主线程的显式通信,要求任务的代码取消,比如threading.Event()

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor


class SingletonTimeOut:
    pool = None

    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        pool = cls.get_pool()
        loop = cls.get_loop()
        try:
            task = loop.run_in_executor(pool, to_run)
            return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
        except asyncio.TimeoutError as e:
            error_type = type(e).__name__ #TODO
            raise e

    @classmethod
    def get_pool(cls):
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool

    @classmethod
    def get_loop(cls):
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            # print("NEW LOOP" + str(threading.current_thread().ident))
            return asyncio.get_event_loop()

# ---------------

TIME_OUT = float('0.2')  # seconds

def toto(input_items,nb_predictions):
    return 1

to_run = functools.partial(toto,
                           input_items=1,
                           nb_predictions="a")

results = SingletonTimeOut.run(to_run, TIME_OUT)

我在搜索单元测试的超时调用时遇到了这个线程。我没有在答案或第三方包中找到任何简单的东西,所以我写了下面的装饰器,你可以直接放入代码中:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

然后就像这样简单地超时测试或任何你喜欢的函数:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

在pypi上找到的stopit包似乎可以很好地处理超时。

我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。

在pypi上查看:https://pypi.python.org/pypi/stopit