我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
以防对任何人都有帮助,在@piro的回答的基础上,我做了一个函数装饰器:
import time
import signal
from functools import wraps
def timeout(timeout_secs: int):
def wrapper(func):
@wraps(func)
def time_limited(*args, **kwargs):
# Register an handler for the timeout
def handler(signum, frame):
raise Exception(f"Timeout for function '{func.__name__}'")
# Register the signal function handler
signal.signal(signal.SIGALRM, handler)
# Define a timeout for your function
signal.alarm(timeout_secs)
result = None
try:
result = func(*args, **kwargs)
except Exception as exc:
raise exc
finally:
# disable the signal alarm
signal.alarm(0)
return result
return time_limited
return wrapper
在一个有20秒超时的函数上使用包装器看起来像这样:
@timeout(20)
def my_slow_or_never_ending_function(name):
while True:
time.sleep(1)
print(f"Yet another second passed {name}...")
try:
results = my_slow_or_never_ending_function("Yooo!")
except Exception as e:
print(f"ERROR: {e}")
其他回答
以防对任何人都有帮助,在@piro的回答的基础上,我做了一个函数装饰器:
import time
import signal
from functools import wraps
def timeout(timeout_secs: int):
def wrapper(func):
@wraps(func)
def time_limited(*args, **kwargs):
# Register an handler for the timeout
def handler(signum, frame):
raise Exception(f"Timeout for function '{func.__name__}'")
# Register the signal function handler
signal.signal(signal.SIGALRM, handler)
# Define a timeout for your function
signal.alarm(timeout_secs)
result = None
try:
result = func(*args, **kwargs)
except Exception as exc:
raise exc
finally:
# disable the signal alarm
signal.alarm(0)
return result
return time_limited
return wrapper
在一个有20秒超时的函数上使用包装器看起来像这样:
@timeout(20)
def my_slow_or_never_ending_function(name):
while True:
time.sleep(1)
print(f"Yet another second passed {name}...")
try:
results = my_slow_or_never_ending_function("Yooo!")
except Exception as e:
print(f"ERROR: {e}")
asyncio的另一个解决方案:
如果你想取消后台任务,而不仅仅是在运行的主代码上超时,那么你需要一个来自主线程的显式通信,要求任务的代码取消,比如threading.Event()
import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor
class SingletonTimeOut:
pool = None
@classmethod
def run(cls, to_run: functools.partial, timeout: float):
pool = cls.get_pool()
loop = cls.get_loop()
try:
task = loop.run_in_executor(pool, to_run)
return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
except asyncio.TimeoutError as e:
error_type = type(e).__name__ #TODO
raise e
@classmethod
def get_pool(cls):
if cls.pool is None:
cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
return cls.pool
@classmethod
def get_loop(cls):
try:
return asyncio.get_event_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
# print("NEW LOOP" + str(threading.current_thread().ident))
return asyncio.get_event_loop()
# ---------------
TIME_OUT = float('0.2') # seconds
def toto(input_items,nb_predictions):
return 1
to_run = functools.partial(toto,
input_items=1,
nb_predictions="a")
results = SingletonTimeOut.run(to_run, TIME_OUT)
在@piro答案的基础上,您可以构建一个contextmanager。这允许非常易读的代码,将在成功运行后禁用警报信号(sets signal.alarm(0))
from contextlib import contextmanager
import signal
import time
@contextmanager
def timeout(duration):
def timeout_handler(signum, frame):
raise TimeoutError(f'block timedout after {duration} seconds')
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(duration)
try:
yield
finally:
signal.alarm(0)
def sleeper(duration):
time.sleep(duration)
print('finished')
使用示例:
In [19]: with timeout(2):
...: sleeper(1)
...:
finished
In [20]: with timeout(2):
...: sleeper(3)
...:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
1 with timeout(2):
----> 2 sleeper(3)
3
<ipython-input-7-a75b966bf7ac> in sleeper(t)
1 def sleeper(t):
----> 2 time.sleep(t)
3 print('finished')
4
<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
2 def timeout(duration):
3 def timeout_handler(signum, frame):
----> 4 raise Exception(f'block timedout after {duration} seconds')
5 signal.signal(signal.SIGALRM, timeout_handler)
6 signal.alarm(duration)
Exception: block timedout after 2 seconds
超时装饰器不能在Windows系统上工作,因为Windows不太支持信号。
如果你在windows系统中使用超时装饰器,你会得到以下结果
AttributeError: module 'signal' has no attribute 'SIGALRM'
有些人建议使用use_signals=False,但对我没用。
作者@bitranox创建了以下包:
pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
代码示例:
import time
from wrapt_timeout_decorator import *
@timeout(5)
def mytest(message):
print(message)
for i in range(1,10):
time.sleep(1)
print('{} seconds have passed'.format(i))
def main():
mytest('starting')
if __name__ == '__main__':
main()
给出以下例外:
TimeoutError: Function mytest timed out after 5 seconds
我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result