我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
如果工作没有完成,我打算杀死进程,使用线程和进程来实现这一点。
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import multiprocessing
# test case 1
def worker_1(a,b,c):
for _ in range(2):
print('very time consuming sleep')
sleep(1)
return a+b+c
# test case 2
def worker_2(in_name):
for _ in range(10):
print('very time consuming sleep')
sleep(1)
return 'hello '+in_name
作为上下文管理器的实际类
class FuncTimer():
def __init__(self,fn,args,runtime):
self.fn = fn
self.args = args
self.queue = multiprocessing.Queue()
self.runtime = runtime
self.process = multiprocessing.Process(target=self.thread_caller)
def thread_caller(self):
with ThreadPoolExecutor() as executor:
future = executor.submit(self.fn, *self.args)
self.queue.put(future.result())
def __enter__(self):
return self
def start_run(self):
self.process.start()
self.process.join(timeout=self.runtime)
if self.process.exitcode is None:
self.process.kill()
if self.process.exitcode is None:
out_res = None
print('killed premature')
else:
out_res = self.queue.get()
return out_res
def __exit__(self, exc_type, exc_value, exc_traceback):
self.process.kill()
如何使用
print('testing case 1')
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp:
res = fp.start_run()
print(res)
print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp:
res = fp.start_run()
print(res)
其他回答
我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
asyncio的另一个解决方案:
如果你想取消后台任务,而不仅仅是在运行的主代码上超时,那么你需要一个来自主线程的显式通信,要求任务的代码取消,比如threading.Event()
import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor
class SingletonTimeOut:
pool = None
@classmethod
def run(cls, to_run: functools.partial, timeout: float):
pool = cls.get_pool()
loop = cls.get_loop()
try:
task = loop.run_in_executor(pool, to_run)
return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
except asyncio.TimeoutError as e:
error_type = type(e).__name__ #TODO
raise e
@classmethod
def get_pool(cls):
if cls.pool is None:
cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
return cls.pool
@classmethod
def get_loop(cls):
try:
return asyncio.get_event_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
# print("NEW LOOP" + str(threading.current_thread().ident))
return asyncio.get_event_loop()
# ---------------
TIME_OUT = float('0.2') # seconds
def toto(input_items,nb_predictions):
return 1
to_run = functools.partial(toto,
input_items=1,
nb_predictions="a")
results = SingletonTimeOut.run(to_run, TIME_OUT)
你可以使用多处理。过程来做到这一点。
Code
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate - may not work if process is stuck for good
p.terminate()
# OR Kill - will work for sure, no chance for process to finish nicely however
# p.kill()
p.join()
我也遇到过同样的问题,但我的情况是需要在子线程上工作,信号不适合我,所以我写了一个python包:timeout-timer来解决这个问题,支持用作上下文或装饰器,使用信号或子线程模块来触发超时中断:
from timeout_timer import timeout, TimeoutInterrupt
class TimeoutInterruptNested(TimeoutInterrupt):
pass
def test_timeout_nested_loop_both_timeout(timer="thread"):
cnt = 0
try:
with timeout(5, timer=timer):
try:
with timeout(2, timer=timer, exception=TimeoutInterruptNested):
sleep(2)
except TimeoutInterruptNested:
cnt += 1
time.sleep(10)
except TimeoutInterrupt:
cnt += 1
assert cnt == 2
查看更多信息:https://github.com/dozysun/timeout-timer
我需要一个不会被时间阻塞的可嵌套定时中断(SIGALARM不能做到)。Sleep(基于线程的方法不能做到)。我最终复制了这里的代码并对其进行了轻微修改:http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
代码本身:
#!/usr/bin/python
# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
"""alarm.py: Permits multiple SIGALRM events to be queued.
Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""
import heapq
import signal
from time import time
__version__ = '$Revision: 2539 $'.split()[1]
alarmlist = []
__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))
class TimeoutError(Exception):
def __init__(self, message, id_=None):
self.message = message
self.id_ = id_
class Timeout:
''' id_ allows for nested timeouts. '''
def __init__(self, id_=None, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
self.id_ = id_
def handle_timeout(self):
raise TimeoutError(self.error_message, self.id_)
def __enter__(self):
self.this_alarm = alarm(self.seconds, self.handle_timeout)
def __exit__(self, type, value, traceback):
try:
cancel(self.this_alarm)
except ValueError:
pass
def __clear_alarm():
"""Clear an existing alarm.
If the alarm signal was set to a callable other than our own, queue the
previous alarm settings.
"""
oldsec = signal.alarm(0)
oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
if oldsec > 0 and oldfunc != __alarm_handler:
heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))
def __alarm_handler(*zargs):
"""Handle an alarm by calling any due heap entries and resetting the alarm.
Note that multiple heap entries might get called, especially if calling an
entry takes a lot of time.
"""
try:
nextt = __next_alarm()
while nextt is not None and nextt <= 0:
(tm, func, args, keys) = heapq.heappop(alarmlist)
func(*args, **keys)
nextt = __next_alarm()
finally:
if alarmlist: __set_alarm()
def alarm(sec, func, *args, **keys):
"""Set an alarm.
When the alarm is raised in `sec` seconds, the handler will call `func`,
passing `args` and `keys`. Return the heap entry (which is just a big
tuple), so that it can be cancelled by calling `cancel()`.
"""
__clear_alarm()
try:
newalarm = __new_alarm(sec, func, args, keys)
heapq.heappush(alarmlist, newalarm)
return newalarm
finally:
__set_alarm()
def cancel(alarm):
"""Cancel an alarm by passing the heap entry returned by `alarm()`.
It is an error to try to cancel an alarm which has already occurred.
"""
__clear_alarm()
try:
alarmlist.remove(alarm)
heapq.heapify(alarmlist)
finally:
if alarmlist: __set_alarm()
还有一个用法示例:
import alarm
from time import sleep
try:
with alarm.Timeout(id_='a', seconds=5):
try:
with alarm.Timeout(id_='b', seconds=2):
sleep(3)
except alarm.TimeoutError as e:
print 'raised', e.id_
sleep(30)
except alarm.TimeoutError as e:
print 'raised', e.id_
else:
print 'nope.'