我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。

我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?


当前回答

我在搜索单元测试的超时调用时遇到了这个线程。我没有在答案或第三方包中找到任何简单的东西,所以我写了下面的装饰器,你可以直接放入代码中:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

然后就像这样简单地超时测试或任何你喜欢的函数:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

其他回答

我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

你可以使用多处理。过程来做到这一点。

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

我在搜索单元测试的超时调用时遇到了这个线程。我没有在答案或第三方包中找到任何简单的东西,所以我写了下面的装饰器,你可以直接放入代码中:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

然后就像这样简单地超时测试或任何你喜欢的函数:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

突出了

引发TimeoutError使用异常在超时时发出警报-可以很容易地修改 跨平台:Windows和Mac OS X 兼容性:Python 3.6+(我也在Python 2.7上进行了测试,它可以在很小的语法调整下工作)

有关平行地图的完整解释和扩展,请参见https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

最小的例子

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s

正如预期的那样

>>> bar(2)
2

完整代码

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill

from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any

class TimeoutError(Exception):

    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__

    def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"


def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))


def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout

    Args:
        func: the function
        timeout: The timeout in seconds
    """

    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')

    if func is None:
        return functools.partial(killer_call, timeout=timeout)

    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners

if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x

    print(bar(2))
    bar(10)

笔记

由于dill的工作方式,您需要在函数内部导入。

这也意味着如果目标函数中有导入,这些函数可能与doctest不兼容。你将会遇到__import__未找到的问题。

下面是一个POSIX版本,它结合了前面的许多答案来提供以下特性:

子进程阻塞执行。 timeout函数在类成员函数上的使用。 严格要求终止时间。

下面是代码和一些测试用例:

import threading
import signal
import os
import time

class TerminateExecution(Exception):
    """
    Exception to indicate that execution has exceeded the preset running time.
    """


def quit_function(pid):
    # Killing all subprocesses
    os.setpgrp()
    os.killpg(0, signal.SIGTERM)

    # Killing the main thread
    os.kill(pid, signal.SIGTERM)


def handle_term(signum, frame):
    raise TerminateExecution()


def invoke_with_timeout(timeout, fn, *args, **kwargs):
    # Setting a sigterm handler and initiating a timer
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
    terminate = False

    # Executing the function
    timer.start()
    try:
        result = fn(*args, **kwargs)
    except TerminateExecution:
        terminate = True
    finally:
        # Restoring original handler and cancel timer
        signal.signal(signal.SIGTERM, old_handler)
        timer.cancel()

    if terminate:
        raise BaseException("xxx")

    return result

### Test cases
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337


def really_long_function():
    time.sleep(10)


def really_long_function2():
    os.system("sleep 787")


# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337

# Testing various scenarios
t1 = time.time()
try:
    print(invoke_with_timeout(1, countdown, 3))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function2))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)


t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)


class X:
    def __init__(self):
        self.value = 0

    def set(self, v):
        self.value = v


x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9