我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
伟大的,易于使用和可靠的PyPi项目超时装饰器(https://pypi.org/project/timeout-decorator/)
安装:
pip install timeout-decorator
用法:
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i
if __name__ == '__main__':
mytest()
其他回答
在pypi上找到的stopit包似乎可以很好地处理超时。
我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。
在pypi上查看:https://pypi.python.org/pypi/stopit
Tim Savannah的func_timeout包对我来说工作得很好。
安装:
PIP安装func_timeout
用法:
import time
from func_timeout import func_timeout, FunctionTimedOut
def my_func(n):
time.sleep(n)
time_to_sleep = 10
# time out after 2 seconds using kwargs
func_timeout(2, my_func, kwargs={'n' : time_to_sleep})
# time out after 2 seconds using args
func_timeout(2, my_func, args=(time_to_sleep,))
我是wrapt_timeout_decorator的作者。
这里介绍的大多数解决方案乍一看在Linux下都工作得很好——因为我们有fork()和signals()——但在windows上看起来有点不同。 当涉及到Linux上的子线程时,你不能再使用信号了。
为了在Windows下生成一个进程,它需要是可pickle的——许多装饰函数或Class方法都不是。
所以你需要使用一个更好的pickler像莳萝和multiprocess(不是pickle和multiprocessing) -这就是为什么你不能使用ProcessPoolExecutor(或只有有限的功能)。
For the timeout itself - You need to define what timeout means - because on Windows it will take considerable (and not determinable) time to spawn the process. This can be tricky on short timeouts. Lets assume, spawning the process takes about 0.5 seconds (easily !!!). If You give a timeout of 0.2 seconds what should happen? Should the function time out after 0.5 + 0.2 seconds (so let the method run for 0.2 seconds)? Or should the called process time out after 0.2 seconds (in that case, the decorated function will ALWAYS timeout, because in that time it is not even spawned)?
嵌套的装饰器也很讨厌,你不能在子线程中使用信号。如果你想要创建一个真正通用的、跨平台的装饰器,所有这些都需要考虑(并测试)。
其他问题是将异常传递回调用者,以及记录问题(如果在装饰函数中使用-不支持记录到另一个进程中的文件)
我试图涵盖所有的边缘情况,您可以查看包wrapt_timeout_decorator,或者至少测试您自己的解决方案,受到那里使用的单元测试的启发。
@Alexis Eggermont -不幸的是,我没有足够的分数来评论-也许其他人可以通知你-我认为我解决了你的进口问题。
我需要一个不会被时间阻塞的可嵌套定时中断(SIGALARM不能做到)。Sleep(基于线程的方法不能做到)。我最终复制了这里的代码并对其进行了轻微修改:http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
代码本身:
#!/usr/bin/python
# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
"""alarm.py: Permits multiple SIGALRM events to be queued.
Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""
import heapq
import signal
from time import time
__version__ = '$Revision: 2539 $'.split()[1]
alarmlist = []
__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))
class TimeoutError(Exception):
def __init__(self, message, id_=None):
self.message = message
self.id_ = id_
class Timeout:
''' id_ allows for nested timeouts. '''
def __init__(self, id_=None, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
self.id_ = id_
def handle_timeout(self):
raise TimeoutError(self.error_message, self.id_)
def __enter__(self):
self.this_alarm = alarm(self.seconds, self.handle_timeout)
def __exit__(self, type, value, traceback):
try:
cancel(self.this_alarm)
except ValueError:
pass
def __clear_alarm():
"""Clear an existing alarm.
If the alarm signal was set to a callable other than our own, queue the
previous alarm settings.
"""
oldsec = signal.alarm(0)
oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
if oldsec > 0 and oldfunc != __alarm_handler:
heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))
def __alarm_handler(*zargs):
"""Handle an alarm by calling any due heap entries and resetting the alarm.
Note that multiple heap entries might get called, especially if calling an
entry takes a lot of time.
"""
try:
nextt = __next_alarm()
while nextt is not None and nextt <= 0:
(tm, func, args, keys) = heapq.heappop(alarmlist)
func(*args, **keys)
nextt = __next_alarm()
finally:
if alarmlist: __set_alarm()
def alarm(sec, func, *args, **keys):
"""Set an alarm.
When the alarm is raised in `sec` seconds, the handler will call `func`,
passing `args` and `keys`. Return the heap entry (which is just a big
tuple), so that it can be cancelled by calling `cancel()`.
"""
__clear_alarm()
try:
newalarm = __new_alarm(sec, func, args, keys)
heapq.heappush(alarmlist, newalarm)
return newalarm
finally:
__set_alarm()
def cancel(alarm):
"""Cancel an alarm by passing the heap entry returned by `alarm()`.
It is an error to try to cancel an alarm which has already occurred.
"""
__clear_alarm()
try:
alarmlist.remove(alarm)
heapq.heapify(alarmlist)
finally:
if alarmlist: __set_alarm()
还有一个用法示例:
import alarm
from time import sleep
try:
with alarm.Timeout(id_='a', seconds=5):
try:
with alarm.Timeout(id_='b', seconds=2):
sleep(3)
except alarm.TimeoutError as e:
print 'raised', e.id_
sleep(30)
except alarm.TimeoutError as e:
print 'raised', e.id_
else:
print 'nope.'
伟大的,易于使用和可靠的PyPi项目超时装饰器(https://pypi.org/project/timeout-decorator/)
安装:
pip install timeout-decorator
用法:
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i
if __name__ == '__main__':
mytest()