我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。

我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?


当前回答

突出了

引发TimeoutError使用异常在超时时发出警报-可以很容易地修改 跨平台:Windows和Mac OS X 兼容性:Python 3.6+(我也在Python 2.7上进行了测试,它可以在很小的语法调整下工作)

有关平行地图的完整解释和扩展,请参见https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

最小的例子

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s

正如预期的那样

>>> bar(2)
2

完整代码

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill

from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any

class TimeoutError(Exception):

    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__

    def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"


def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))


def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout

    Args:
        func: the function
        timeout: The timeout in seconds
    """

    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')

    if func is None:
        return functools.partial(killer_call, timeout=timeout)

    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners

if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x

    print(bar(2))
    bar(10)

笔记

由于dill的工作方式,您需要在函数内部导入。

这也意味着如果目标函数中有导入,这些函数可能与doctest不兼容。你将会遇到__import__未找到的问题。

其他回答

我是wrapt_timeout_decorator的作者。

这里介绍的大多数解决方案乍一看在Linux下都工作得很好——因为我们有fork()和signals()——但在windows上看起来有点不同。 当涉及到Linux上的子线程时,你不能再使用信号了。

为了在Windows下生成一个进程,它需要是可pickle的——许多装饰函数或Class方法都不是。

所以你需要使用一个更好的pickler像莳萝和multiprocess(不是pickle和multiprocessing) -这就是为什么你不能使用ProcessPoolExecutor(或只有有限的功能)。

For the timeout itself - You need to define what timeout means - because on Windows it will take considerable (and not determinable) time to spawn the process. This can be tricky on short timeouts. Lets assume, spawning the process takes about 0.5 seconds (easily !!!). If You give a timeout of 0.2 seconds what should happen? Should the function time out after 0.5 + 0.2 seconds (so let the method run for 0.2 seconds)? Or should the called process time out after 0.2 seconds (in that case, the decorated function will ALWAYS timeout, because in that time it is not even spawned)?

嵌套的装饰器也很讨厌,你不能在子线程中使用信号。如果你想要创建一个真正通用的、跨平台的装饰器,所有这些都需要考虑(并测试)。

其他问题是将异常传递回调用者,以及记录问题(如果在装饰函数中使用-不支持记录到另一个进程中的文件)

我试图涵盖所有的边缘情况,您可以查看包wrapt_timeout_decorator,或者至少测试您自己的解决方案,受到那里使用的单元测试的启发。

@Alexis Eggermont -不幸的是,我没有足够的分数来评论-也许其他人可以通知你-我认为我解决了你的进口问题。

在pypi上找到的stopit包似乎可以很好地处理超时。

我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。

在pypi上查看:https://pypi.python.org/pypi/stopit

下面是一个POSIX版本,它结合了前面的许多答案来提供以下特性:

子进程阻塞执行。 timeout函数在类成员函数上的使用。 严格要求终止时间。

下面是代码和一些测试用例:

import threading
import signal
import os
import time

class TerminateExecution(Exception):
    """
    Exception to indicate that execution has exceeded the preset running time.
    """


def quit_function(pid):
    # Killing all subprocesses
    os.setpgrp()
    os.killpg(0, signal.SIGTERM)

    # Killing the main thread
    os.kill(pid, signal.SIGTERM)


def handle_term(signum, frame):
    raise TerminateExecution()


def invoke_with_timeout(timeout, fn, *args, **kwargs):
    # Setting a sigterm handler and initiating a timer
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
    terminate = False

    # Executing the function
    timer.start()
    try:
        result = fn(*args, **kwargs)
    except TerminateExecution:
        terminate = True
    finally:
        # Restoring original handler and cancel timer
        signal.signal(signal.SIGTERM, old_handler)
        timer.cancel()

    if terminate:
        raise BaseException("xxx")

    return result

### Test cases
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337


def really_long_function():
    time.sleep(10)


def really_long_function2():
    os.system("sleep 787")


# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337

# Testing various scenarios
t1 = time.time()
try:
    print(invoke_with_timeout(1, countdown, 3))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function2))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)


t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)


class X:
    def __init__(self):
        self.value = 0

    def set(self, v):
        self.value = v


x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9

如果您在UNIX上运行,则可以使用信号包:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

在调用signal.alarm(10)后10秒,调用处理程序。这会引发一个异常,您可以从常规Python代码中拦截该异常。

这个模块不能很好地使用线程(但是,谁能呢?)

注意,由于我们在超时发生时引发异常,它可能最终在函数内部被捕获并忽略,例如这样一个函数:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue

我需要一个不会被时间阻塞的可嵌套定时中断(SIGALARM不能做到)。Sleep(基于线程的方法不能做到)。我最终复制了这里的代码并对其进行了轻微修改:http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

代码本身:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

还有一个用法示例:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'