我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
你可以使用多处理。过程来做到这一点。
Code
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate - may not work if process is stuck for good
p.terminate()
# OR Kill - will work for sure, no chance for process to finish nicely however
# p.kill()
p.join()
其他回答
如果工作没有完成,我打算杀死进程,使用线程和进程来实现这一点。
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import multiprocessing
# test case 1
def worker_1(a,b,c):
for _ in range(2):
print('very time consuming sleep')
sleep(1)
return a+b+c
# test case 2
def worker_2(in_name):
for _ in range(10):
print('very time consuming sleep')
sleep(1)
return 'hello '+in_name
作为上下文管理器的实际类
class FuncTimer():
def __init__(self,fn,args,runtime):
self.fn = fn
self.args = args
self.queue = multiprocessing.Queue()
self.runtime = runtime
self.process = multiprocessing.Process(target=self.thread_caller)
def thread_caller(self):
with ThreadPoolExecutor() as executor:
future = executor.submit(self.fn, *self.args)
self.queue.put(future.result())
def __enter__(self):
return self
def start_run(self):
self.process.start()
self.process.join(timeout=self.runtime)
if self.process.exitcode is None:
self.process.kill()
if self.process.exitcode is None:
out_res = None
print('killed premature')
else:
out_res = self.queue.get()
return out_res
def __exit__(self, exc_type, exc_value, exc_traceback):
self.process.kill()
如何使用
print('testing case 1')
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp:
res = fp.start_run()
print(res)
print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp:
res = fp.start_run()
print(res)
我也遇到过同样的问题,但我的情况是需要在子线程上工作,信号不适合我,所以我写了一个python包:timeout-timer来解决这个问题,支持用作上下文或装饰器,使用信号或子线程模块来触发超时中断:
from timeout_timer import timeout, TimeoutInterrupt
class TimeoutInterruptNested(TimeoutInterrupt):
pass
def test_timeout_nested_loop_both_timeout(timer="thread"):
cnt = 0
try:
with timeout(5, timer=timer):
try:
with timeout(2, timer=timer, exception=TimeoutInterruptNested):
sleep(2)
except TimeoutInterruptNested:
cnt += 1
time.sleep(10)
except TimeoutInterrupt:
cnt += 1
assert cnt == 2
查看更多信息:https://github.com/dozysun/timeout-timer
在pypi上找到的stopit包似乎可以很好地处理超时。
我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。
在pypi上查看:https://pypi.python.org/pypi/stopit
你可以使用多处理。过程来做到这一点。
Code
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate - may not work if process is stuck for good
p.terminate()
# OR Kill - will work for sure, no chance for process to finish nicely however
# p.kill()
p.join()
超时装饰器不能在Windows系统上工作,因为Windows不太支持信号。
如果你在windows系统中使用超时装饰器,你会得到以下结果
AttributeError: module 'signal' has no attribute 'SIGALRM'
有些人建议使用use_signals=False,但对我没用。
作者@bitranox创建了以下包:
pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
代码示例:
import time
from wrapt_timeout_decorator import *
@timeout(5)
def mytest(message):
print(message)
for i in range(1,10):
time.sleep(1)
print('{} seconds have passed'.format(i))
def main():
mytest('starting')
if __name__ == '__main__':
main()
给出以下例外:
TimeoutError: Function mytest timed out after 5 seconds