我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
我们也可以用信号来表示。我认为下面的例子会对你有用。与线程相比,它非常简单。
import signal
def timeout(signum, frame):
raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
print 'Starting Main ',
while 1:
print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
main()
except myException:
print "whoops"
其他回答
如果您在UNIX上运行,则可以使用信号包:
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print("Forever is over!")
...: raise Exception("end of time")
...:
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print("sec")
...: time.sleep(1)
...:
...:
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print(exc)
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
在调用signal.alarm(10)后10秒,调用处理程序。这会引发一个异常,您可以从常规Python代码中拦截该异常。
这个模块不能很好地使用线程(但是,谁能呢?)
注意,由于我们在超时发生时引发异常,它可能最终在函数内部被捕获并忽略,例如这样一个函数:
def loop_forever():
while 1:
print('sec')
try:
time.sleep(10)
except:
continue
下面是对给定的基于线程的解决方案的轻微改进。
下面的代码支持异常:
def runFunctionCatchExceptions(func, *args, **kwargs):
try:
result = func(*args, **kwargs)
except Exception, message:
return ["exception", message]
return ["RESULT", result]
def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
import threading
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = default
def run(self):
self.result = runFunctionCatchExceptions(func, *args, **kwargs)
it = InterruptableThread()
it.start()
it.join(timeout_duration)
if it.isAlive():
return default
if it.result[0] == "exception":
raise it.result[1]
return it.result[1]
用5秒超时调用它:
result = timeout(remote_calculate, (myarg,), timeout_duration=5)
我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
下面是一个POSIX版本,它结合了前面的许多答案来提供以下特性:
子进程阻塞执行。 timeout函数在类成员函数上的使用。 严格要求终止时间。
下面是代码和一些测试用例:
import threading
import signal
import os
import time
class TerminateExecution(Exception):
"""
Exception to indicate that execution has exceeded the preset running time.
"""
def quit_function(pid):
# Killing all subprocesses
os.setpgrp()
os.killpg(0, signal.SIGTERM)
# Killing the main thread
os.kill(pid, signal.SIGTERM)
def handle_term(signum, frame):
raise TerminateExecution()
def invoke_with_timeout(timeout, fn, *args, **kwargs):
# Setting a sigterm handler and initiating a timer
old_handler = signal.signal(signal.SIGTERM, handle_term)
timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
terminate = False
# Executing the function
timer.start()
try:
result = fn(*args, **kwargs)
except TerminateExecution:
terminate = True
finally:
# Restoring original handler and cancel timer
signal.signal(signal.SIGTERM, old_handler)
timer.cancel()
if terminate:
raise BaseException("xxx")
return result
### Test cases
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
time.sleep(1)
print('countdown finished')
return 1337
def really_long_function():
time.sleep(10)
def really_long_function2():
os.system("sleep 787")
# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337
# Testing various scenarios
t1 = time.time()
try:
print(invoke_with_timeout(1, countdown, 3))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function2))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)
class X:
def __init__(self):
self.value = 0
def set(self, v):
self.value = v
x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9
如果工作没有完成,我打算杀死进程,使用线程和进程来实现这一点。
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import multiprocessing
# test case 1
def worker_1(a,b,c):
for _ in range(2):
print('very time consuming sleep')
sleep(1)
return a+b+c
# test case 2
def worker_2(in_name):
for _ in range(10):
print('very time consuming sleep')
sleep(1)
return 'hello '+in_name
作为上下文管理器的实际类
class FuncTimer():
def __init__(self,fn,args,runtime):
self.fn = fn
self.args = args
self.queue = multiprocessing.Queue()
self.runtime = runtime
self.process = multiprocessing.Process(target=self.thread_caller)
def thread_caller(self):
with ThreadPoolExecutor() as executor:
future = executor.submit(self.fn, *self.args)
self.queue.put(future.result())
def __enter__(self):
return self
def start_run(self):
self.process.start()
self.process.join(timeout=self.runtime)
if self.process.exitcode is None:
self.process.kill()
if self.process.exitcode is None:
out_res = None
print('killed premature')
else:
out_res = self.queue.get()
return out_res
def __exit__(self, exc_type, exc_value, exc_traceback):
self.process.kill()
如何使用
print('testing case 1')
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp:
res = fp.start_run()
print(res)
print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp:
res = fp.start_run()
print(res)