我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
如果您在UNIX上运行,则可以使用信号包:
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print("Forever is over!")
...: raise Exception("end of time")
...:
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print("sec")
...: time.sleep(1)
...:
...:
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print(exc)
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
在调用signal.alarm(10)后10秒,调用处理程序。这会引发一个异常,您可以从常规Python代码中拦截该异常。
这个模块不能很好地使用线程(但是,谁能呢?)
注意,由于我们在超时发生时引发异常,它可能最终在函数内部被捕获并忽略,例如这样一个函数:
def loop_forever():
while 1:
print('sec')
try:
time.sleep(10)
except:
continue
其他回答
以防对任何人都有帮助,在@piro的回答的基础上,我做了一个函数装饰器:
import time
import signal
from functools import wraps
def timeout(timeout_secs: int):
def wrapper(func):
@wraps(func)
def time_limited(*args, **kwargs):
# Register an handler for the timeout
def handler(signum, frame):
raise Exception(f"Timeout for function '{func.__name__}'")
# Register the signal function handler
signal.signal(signal.SIGALRM, handler)
# Define a timeout for your function
signal.alarm(timeout_secs)
result = None
try:
result = func(*args, **kwargs)
except Exception as exc:
raise exc
finally:
# disable the signal alarm
signal.alarm(0)
return result
return time_limited
return wrapper
在一个有20秒超时的函数上使用包装器看起来像这样:
@timeout(20)
def my_slow_or_never_ending_function(name):
while True:
time.sleep(1)
print(f"Yet another second passed {name}...")
try:
results = my_slow_or_never_ending_function("Yooo!")
except Exception as e:
print(f"ERROR: {e}")
你可以使用多处理。过程来做到这一点。
Code
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate - may not work if process is stuck for good
p.terminate()
# OR Kill - will work for sure, no chance for process to finish nicely however
# p.kill()
p.join()
我们也可以用信号来表示。我认为下面的例子会对你有用。与线程相比,它非常简单。
import signal
def timeout(signum, frame):
raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
print 'Starting Main ',
while 1:
print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
main()
except myException:
print "whoops"
下面是一个POSIX版本,它结合了前面的许多答案来提供以下特性:
子进程阻塞执行。 timeout函数在类成员函数上的使用。 严格要求终止时间。
下面是代码和一些测试用例:
import threading
import signal
import os
import time
class TerminateExecution(Exception):
"""
Exception to indicate that execution has exceeded the preset running time.
"""
def quit_function(pid):
# Killing all subprocesses
os.setpgrp()
os.killpg(0, signal.SIGTERM)
# Killing the main thread
os.kill(pid, signal.SIGTERM)
def handle_term(signum, frame):
raise TerminateExecution()
def invoke_with_timeout(timeout, fn, *args, **kwargs):
# Setting a sigterm handler and initiating a timer
old_handler = signal.signal(signal.SIGTERM, handle_term)
timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
terminate = False
# Executing the function
timer.start()
try:
result = fn(*args, **kwargs)
except TerminateExecution:
terminate = True
finally:
# Restoring original handler and cancel timer
signal.signal(signal.SIGTERM, old_handler)
timer.cancel()
if terminate:
raise BaseException("xxx")
return result
### Test cases
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
time.sleep(1)
print('countdown finished')
return 1337
def really_long_function():
time.sleep(10)
def really_long_function2():
os.system("sleep 787")
# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337
# Testing various scenarios
t1 = time.time()
try:
print(invoke_with_timeout(1, countdown, 3))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function2))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)
class X:
def __init__(self):
self.value = 0
def set(self, v):
self.value = v
x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9
asyncio的另一个解决方案:
如果你想取消后台任务,而不仅仅是在运行的主代码上超时,那么你需要一个来自主线程的显式通信,要求任务的代码取消,比如threading.Event()
import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor
class SingletonTimeOut:
pool = None
@classmethod
def run(cls, to_run: functools.partial, timeout: float):
pool = cls.get_pool()
loop = cls.get_loop()
try:
task = loop.run_in_executor(pool, to_run)
return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
except asyncio.TimeoutError as e:
error_type = type(e).__name__ #TODO
raise e
@classmethod
def get_pool(cls):
if cls.pool is None:
cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
return cls.pool
@classmethod
def get_loop(cls):
try:
return asyncio.get_event_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
# print("NEW LOOP" + str(threading.current_thread().ident))
return asyncio.get_event_loop()
# ---------------
TIME_OUT = float('0.2') # seconds
def toto(input_items,nb_predictions):
return 1
to_run = functools.partial(toto,
input_items=1,
nb_predictions="a")
results = SingletonTimeOut.run(to_run, TIME_OUT)