我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
我们也可以用信号来表示。我认为下面的例子会对你有用。与线程相比,它非常简单。
import signal
def timeout(signum, frame):
raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
print 'Starting Main ',
while 1:
print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
main()
except myException:
print "whoops"
其他回答
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
有很多建议,但没有一个是使用并发的。期货,我认为这是最清晰的处理方式。
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
超级简单的阅读和维护。
我们创建一个池,提交一个进程,然后等待5秒,然后引发一个TimeoutError,你可以根据需要捕获和处理它。
本机为python 3.2+,并反向移植到2.7 (pip install futures)。
线程和进程之间的切换非常简单,只需将ProcessPoolExecutor替换为ThreadPoolExecutor。
如果您想在超时时终止进程,我建议您查看Pebble。
突出了
引发TimeoutError使用异常在超时时发出警报-可以很容易地修改 跨平台:Windows和Mac OS X 兼容性:Python 3.6+(我也在Python 2.7上进行了测试,它可以在很小的语法调整下工作)
有关平行地图的完整解释和扩展,请参见https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts
最小的例子
>>> @killer_call(timeout=4)
... def bar(x):
... import time
... time.sleep(x)
... return x
>>> bar(10)
Traceback (most recent call last):
...
__main__.TimeoutError: function 'bar' timed out after 4s
正如预期的那样
>>> bar(2)
2
完整代码
import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill
from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any
class TimeoutError(Exception):
def __init__(self, func: Callable, timeout: int):
self.t = timeout
self.fname = func.__name__
def __str__(self):
return f"function '{self.fname}' timed out after {self.t}s"
def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
"""lemmiwinks crawls into the unknown"""
q.put(dill.loads(func)(*args, **kwargs))
def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
"""
Single function call with a timeout
Args:
func: the function
timeout: The timeout in seconds
"""
if not isinstance(timeout, int):
raise ValueError(f'timeout needs to be an int. Got: {timeout}')
if func is None:
return functools.partial(killer_call, timeout=timeout)
@functools.wraps(killer_call)
def _inners(*args, **kwargs) -> Any:
q_worker = mp.Queue()
proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
proc.start()
try:
return q_worker.get(timeout=timeout)
except mpq.Empty:
raise TimeoutError(func, timeout)
finally:
try:
proc.terminate()
except:
pass
return _inners
if __name__ == '__main__':
@killer_call(timeout=4)
def bar(x):
import time
time.sleep(x)
return x
print(bar(2))
bar(10)
笔记
由于dill的工作方式,您需要在函数内部导入。
这也意味着如果目标函数中有导入,这些函数可能与doctest不兼容。你将会遇到__import__未找到的问题。
伟大的,易于使用和可靠的PyPi项目超时装饰器(https://pypi.org/project/timeout-decorator/)
安装:
pip install timeout-decorator
用法:
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i
if __name__ == '__main__':
mytest()
我在搜索单元测试的超时调用时遇到了这个线程。我没有在答案或第三方包中找到任何简单的东西,所以我写了下面的装饰器,你可以直接放入代码中:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
然后就像这样简单地超时测试或任何你喜欢的函数:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...