我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
asyncio的另一个解决方案:
如果你想取消后台任务,而不仅仅是在运行的主代码上超时,那么你需要一个来自主线程的显式通信,要求任务的代码取消,比如threading.Event()
import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor
class SingletonTimeOut:
pool = None
@classmethod
def run(cls, to_run: functools.partial, timeout: float):
pool = cls.get_pool()
loop = cls.get_loop()
try:
task = loop.run_in_executor(pool, to_run)
return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
except asyncio.TimeoutError as e:
error_type = type(e).__name__ #TODO
raise e
@classmethod
def get_pool(cls):
if cls.pool is None:
cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
return cls.pool
@classmethod
def get_loop(cls):
try:
return asyncio.get_event_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
# print("NEW LOOP" + str(threading.current_thread().ident))
return asyncio.get_event_loop()
# ---------------
TIME_OUT = float('0.2') # seconds
def toto(input_items,nb_predictions):
return 1
to_run = functools.partial(toto,
input_items=1,
nb_predictions="a")
results = SingletonTimeOut.run(to_run, TIME_OUT)
其他回答
在pypi上找到的stopit包似乎可以很好地处理超时。
我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。
在pypi上查看:https://pypi.python.org/pypi/stopit
有很多建议,但没有一个是使用并发的。期货,我认为这是最清晰的处理方式。
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
超级简单的阅读和维护。
我们创建一个池,提交一个进程,然后等待5秒,然后引发一个TimeoutError,你可以根据需要捕获和处理它。
本机为python 3.2+,并反向移植到2.7 (pip install futures)。
线程和进程之间的切换非常简单,只需将ProcessPoolExecutor替换为ThreadPoolExecutor。
如果您想在超时时终止进程,我建议您查看Pebble。
我在搜索单元测试的超时调用时遇到了这个线程。我没有在答案或第三方包中找到任何简单的东西,所以我写了下面的装饰器,你可以直接放入代码中:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
然后就像这样简单地超时测试或任何你喜欢的函数:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
伟大的,易于使用和可靠的PyPi项目超时装饰器(https://pypi.org/project/timeout-decorator/)
安装:
pip install timeout-decorator
用法:
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i
if __name__ == '__main__':
mytest()