我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。

我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?


当前回答

在pypi上找到的stopit包似乎可以很好地处理超时。

我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。

在pypi上查看:https://pypi.python.org/pypi/stopit

其他回答

Tim Savannah的func_timeout包对我来说工作得很好。

安装:

PIP安装func_timeout

用法:

import time
from func_timeout import func_timeout, FunctionTimedOut

def my_func(n):
    time.sleep(n)

time_to_sleep = 10

# time out after 2 seconds using kwargs
func_timeout(2, my_func, kwargs={'n' : time_to_sleep})

# time out after 2 seconds using args
func_timeout(2, my_func, args=(time_to_sleep,))

asyncio的另一个解决方案:

如果你想取消后台任务,而不仅仅是在运行的主代码上超时,那么你需要一个来自主线程的显式通信,要求任务的代码取消,比如threading.Event()

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor


class SingletonTimeOut:
    pool = None

    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        pool = cls.get_pool()
        loop = cls.get_loop()
        try:
            task = loop.run_in_executor(pool, to_run)
            return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
        except asyncio.TimeoutError as e:
            error_type = type(e).__name__ #TODO
            raise e

    @classmethod
    def get_pool(cls):
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool

    @classmethod
    def get_loop(cls):
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            # print("NEW LOOP" + str(threading.current_thread().ident))
            return asyncio.get_event_loop()

# ---------------

TIME_OUT = float('0.2')  # seconds

def toto(input_items,nb_predictions):
    return 1

to_run = functools.partial(toto,
                           input_items=1,
                           nb_predictions="a")

results = SingletonTimeOut.run(to_run, TIME_OUT)

我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

有很多建议,但没有一个是使用并发的。期货,我认为这是最清晰的处理方式。

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

超级简单的阅读和维护。

我们创建一个池,提交一个进程,然后等待5秒,然后引发一个TimeoutError,你可以根据需要捕获和处理它。

本机为python 3.2+,并反向移植到2.7 (pip install futures)。

线程和进程之间的切换非常简单,只需将ProcessPoolExecutor替换为ThreadPoolExecutor。

如果您想在超时时终止进程,我建议您查看Pebble。