我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
其他回答
在@piro答案的基础上,您可以构建一个contextmanager。这允许非常易读的代码,将在成功运行后禁用警报信号(sets signal.alarm(0))
from contextlib import contextmanager
import signal
import time
@contextmanager
def timeout(duration):
def timeout_handler(signum, frame):
raise TimeoutError(f'block timedout after {duration} seconds')
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(duration)
try:
yield
finally:
signal.alarm(0)
def sleeper(duration):
time.sleep(duration)
print('finished')
使用示例:
In [19]: with timeout(2):
...: sleeper(1)
...:
finished
In [20]: with timeout(2):
...: sleeper(3)
...:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
1 with timeout(2):
----> 2 sleeper(3)
3
<ipython-input-7-a75b966bf7ac> in sleeper(t)
1 def sleeper(t):
----> 2 time.sleep(t)
3 print('finished')
4
<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
2 def timeout(duration):
3 def timeout_handler(signum, frame):
----> 4 raise Exception(f'block timedout after {duration} seconds')
5 signal.signal(signal.SIGALRM, timeout_handler)
6 signal.alarm(duration)
Exception: block timedout after 2 seconds
下面是一个POSIX版本,它结合了前面的许多答案来提供以下特性:
子进程阻塞执行。 timeout函数在类成员函数上的使用。 严格要求终止时间。
下面是代码和一些测试用例:
import threading
import signal
import os
import time
class TerminateExecution(Exception):
"""
Exception to indicate that execution has exceeded the preset running time.
"""
def quit_function(pid):
# Killing all subprocesses
os.setpgrp()
os.killpg(0, signal.SIGTERM)
# Killing the main thread
os.kill(pid, signal.SIGTERM)
def handle_term(signum, frame):
raise TerminateExecution()
def invoke_with_timeout(timeout, fn, *args, **kwargs):
# Setting a sigterm handler and initiating a timer
old_handler = signal.signal(signal.SIGTERM, handle_term)
timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
terminate = False
# Executing the function
timer.start()
try:
result = fn(*args, **kwargs)
except TerminateExecution:
terminate = True
finally:
# Restoring original handler and cancel timer
signal.signal(signal.SIGTERM, old_handler)
timer.cancel()
if terminate:
raise BaseException("xxx")
return result
### Test cases
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
time.sleep(1)
print('countdown finished')
return 1337
def really_long_function():
time.sleep(10)
def really_long_function2():
os.system("sleep 787")
# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337
# Testing various scenarios
t1 = time.time()
try:
print(invoke_with_timeout(1, countdown, 3))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function2))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)
class X:
def __init__(self):
self.value = 0
def set(self, v):
self.value = v
x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9
我也遇到过同样的问题,但我的情况是需要在子线程上工作,信号不适合我,所以我写了一个python包:timeout-timer来解决这个问题,支持用作上下文或装饰器,使用信号或子线程模块来触发超时中断:
from timeout_timer import timeout, TimeoutInterrupt
class TimeoutInterruptNested(TimeoutInterrupt):
pass
def test_timeout_nested_loop_both_timeout(timer="thread"):
cnt = 0
try:
with timeout(5, timer=timer):
try:
with timeout(2, timer=timer, exception=TimeoutInterruptNested):
sleep(2)
except TimeoutInterruptNested:
cnt += 1
time.sleep(10)
except TimeoutInterrupt:
cnt += 1
assert cnt == 2
查看更多信息:https://github.com/dozysun/timeout-timer
你可以使用多处理。过程来做到这一点。
Code
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate - may not work if process is stuck for good
p.terminate()
# OR Kill - will work for sure, no chance for process to finish nicely however
# p.kill()
p.join()
我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result