我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
如果工作没有完成,我打算杀死进程,使用线程和进程来实现这一点。
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import multiprocessing
# test case 1
def worker_1(a,b,c):
for _ in range(2):
print('very time consuming sleep')
sleep(1)
return a+b+c
# test case 2
def worker_2(in_name):
for _ in range(10):
print('very time consuming sleep')
sleep(1)
return 'hello '+in_name
作为上下文管理器的实际类
class FuncTimer():
def __init__(self,fn,args,runtime):
self.fn = fn
self.args = args
self.queue = multiprocessing.Queue()
self.runtime = runtime
self.process = multiprocessing.Process(target=self.thread_caller)
def thread_caller(self):
with ThreadPoolExecutor() as executor:
future = executor.submit(self.fn, *self.args)
self.queue.put(future.result())
def __enter__(self):
return self
def start_run(self):
self.process.start()
self.process.join(timeout=self.runtime)
if self.process.exitcode is None:
self.process.kill()
if self.process.exitcode is None:
out_res = None
print('killed premature')
else:
out_res = self.queue.get()
return out_res
def __exit__(self, exc_type, exc_value, exc_traceback):
self.process.kill()
如何使用
print('testing case 1')
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp:
res = fp.start_run()
print(res)
print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp:
res = fp.start_run()
print(res)
其他回答
我在搜索单元测试的超时调用时遇到了这个线程。我没有在答案或第三方包中找到任何简单的东西,所以我写了下面的装饰器,你可以直接放入代码中:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
然后就像这样简单地超时测试或任何你喜欢的函数:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
有很多建议,但没有一个是使用并发的。期货,我认为这是最清晰的处理方式。
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
超级简单的阅读和维护。
我们创建一个池,提交一个进程,然后等待5秒,然后引发一个TimeoutError,你可以根据需要捕获和处理它。
本机为python 3.2+,并反向移植到2.7 (pip install futures)。
线程和进程之间的切换非常简单,只需将ProcessPoolExecutor替换为ThreadPoolExecutor。
如果您想在超时时终止进程,我建议您查看Pebble。
下面是一个POSIX版本,它结合了前面的许多答案来提供以下特性:
子进程阻塞执行。 timeout函数在类成员函数上的使用。 严格要求终止时间。
下面是代码和一些测试用例:
import threading
import signal
import os
import time
class TerminateExecution(Exception):
"""
Exception to indicate that execution has exceeded the preset running time.
"""
def quit_function(pid):
# Killing all subprocesses
os.setpgrp()
os.killpg(0, signal.SIGTERM)
# Killing the main thread
os.kill(pid, signal.SIGTERM)
def handle_term(signum, frame):
raise TerminateExecution()
def invoke_with_timeout(timeout, fn, *args, **kwargs):
# Setting a sigterm handler and initiating a timer
old_handler = signal.signal(signal.SIGTERM, handle_term)
timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
terminate = False
# Executing the function
timer.start()
try:
result = fn(*args, **kwargs)
except TerminateExecution:
terminate = True
finally:
# Restoring original handler and cancel timer
signal.signal(signal.SIGTERM, old_handler)
timer.cancel()
if terminate:
raise BaseException("xxx")
return result
### Test cases
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
time.sleep(1)
print('countdown finished')
return 1337
def really_long_function():
time.sleep(10)
def really_long_function2():
os.system("sleep 787")
# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337
# Testing various scenarios
t1 = time.time()
try:
print(invoke_with_timeout(1, countdown, 3))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function2))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)
class X:
def __init__(self):
self.value = 0
def set(self, v):
self.value = v
x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9
我怎么调用函数或者我怎么包装它,如果它超过5秒脚本取消它?
我发布了一个要点,用装饰器和threading.Timer解决了这个问题。下面是它的分类。
导入和设置兼容性
它是用Python 2和3测试的。它也应该在Unix/Linux和Windows下工作。
首先是进口。这些尝试保持代码的一致性,而不管Python版本:
from __future__ import print_function
import sys
import threading
from time import sleep
try:
import thread
except ImportError:
import _thread as thread
使用版本独立代码:
try:
range, _print = xrange, print
def print(*args, **kwargs):
flush = kwargs.pop('flush', False)
_print(*args, **kwargs)
if flush:
kwargs.get('file', sys.stdout).flush()
except NameError:
pass
现在我们已经从标准库导入了我们的功能。
exit_after装饰
接下来,我们需要一个函数来终止子线程的main():
def quit_function(fn_name):
# print to stderr, unbuffered in Python 2.
print('{0} took too long'.format(fn_name), file=sys.stderr)
sys.stderr.flush() # Python 3 stderr is likely buffered.
thread.interrupt_main() # raises KeyboardInterrupt
这是decorator本身:
def exit_after(s):
'''
use as decorator to exit process if
function takes longer than s seconds
'''
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer
使用
下面这个用法直接回答了你关于5秒后退出的问题!:
@exit_after(5)
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
sleep(1)
print('countdown finished')
演示:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt
第二个函数调用将不会结束,相反,进程应该退出并返回一个跟踪!
KeyboardInterrupt并不总是停止一个睡眠线程
注意,在Windows上的Python 2中,睡眠并不总是被键盘中断中断,例如:
@exit_after(1)
def sleep10():
sleep(10)
print('slept 10 seconds')
>>> sleep10()
sleep10 took too long # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt
它也不可能中断扩展中运行的代码,除非它显式地检查PyErr_CheckSignals(),参见忽略Cython, Python和KeyboardInterrupt
在任何情况下,我都会避免让线程休眠超过一秒钟——这在处理器时间上是一eon。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
要捕获它并做其他事情,你可以捕获KeyboardInterrupt。
>>> try:
... countdown(10)
... except KeyboardInterrupt:
... print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
我也遇到过同样的问题,但我的情况是需要在子线程上工作,信号不适合我,所以我写了一个python包:timeout-timer来解决这个问题,支持用作上下文或装饰器,使用信号或子线程模块来触发超时中断:
from timeout_timer import timeout, TimeoutInterrupt
class TimeoutInterruptNested(TimeoutInterrupt):
pass
def test_timeout_nested_loop_both_timeout(timer="thread"):
cnt = 0
try:
with timeout(5, timer=timer):
try:
with timeout(2, timer=timer, exception=TimeoutInterruptNested):
sleep(2)
except TimeoutInterruptNested:
cnt += 1
time.sleep(10)
except TimeoutInterrupt:
cnt += 1
assert cnt == 2
查看更多信息:https://github.com/dozysun/timeout-timer