我在Python中调用一个函数,我知道这个函数可能会卡住(挂起),从而迫使我重新启动脚本。

我该如何调用这个函数,或者用什么把它包装起来,使得它的运行时间一旦超过5秒,脚本就能取消它并转去做其他事情?


当前回答

asyncio的另一个解决方案:

如果你想真正取消后台任务,而不仅仅是让主代码在等待时超时,那么需要由主线程显式地通知任务代码自行取消,例如使用 threading.Event()。

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor


class SingletonTimeOut:
    """Run blocking callables in a shared thread pool with a wait timeout.

    The timeout only bounds how long the caller waits for the result.
    Nothing here stops the worker thread, so a callable that is already
    executing keeps running after the timeout; the callable must cooperate
    (for example by polling a ``threading.Event``) if it has to stop early.
    """

    # Lazily created executor shared by every call to ``run``.
    pool = None

    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        """Execute ``to_run`` in the pool and wait at most ``timeout`` seconds.

        Args:
            to_run: Zero-argument callable, typically a ``functools.partial``.
            timeout: Maximum number of seconds to wait for the result.

        Returns:
            Whatever ``to_run`` returns.

        Raises:
            asyncio.TimeoutError: The result was not ready within ``timeout``.
            Exception: Any exception raised by ``to_run`` itself propagates.
        """
        pool = cls.get_pool()
        loop = cls.get_loop()
        task = loop.run_in_executor(pool, to_run)
        # A timeout surfaces as asyncio.TimeoutError straight from wait_for;
        # there is nothing useful to do with it here, so let it propagate.
        return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))

    @classmethod
    def get_pool(cls):
        """Return the shared executor, creating it on first use.

        The pool is sized to ``multiprocessing.cpu_count()`` worker threads.
        """
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool

    @classmethod
    def get_loop(cls):
        """Return the current thread's event loop, creating one if needed."""
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            # No loop is registered for this thread yet: install a fresh one.
            asyncio.set_event_loop(asyncio.new_event_loop())
            return asyncio.get_event_loop()

# ---------------

# Maximum time, in seconds, the example waits for the result.
TIME_OUT = 0.2


def toto(input_items, nb_predictions):
    """Stand-in workload for the example: ignore both arguments and return 1."""
    return 1

# Bind the arguments up front so the executor receives a zero-argument callable.
to_run = functools.partial(toto, input_items=1, nb_predictions="a")

# Blocks until toto finishes or TIME_OUT seconds have elapsed.
results = SingletonTimeOut.run(to_run=to_run, timeout=TIME_OUT)

其他回答

我有一个不同的方案:它是一个纯函数(与基于线程的方案具有相同的API),看起来运行良好(基于本帖中的建议)。

def timeout(func, args=(), kwargs=None, timeout_duration=1, default=None):
    """Call ``func(*args, **kwargs)`` and give up after ``timeout_duration`` seconds.

    The limit is enforced with ``SIGALRM``, so this works only where that
    signal exists (Unix) and where installing signal handlers is allowed.

    Args:
        func: Callable to run.
        args: Positional arguments for ``func``.
        kwargs: Keyword arguments for ``func``; ``None`` means no keywords.
        timeout_duration: Time limit in whole seconds, passed to
            ``signal.alarm``.
        default: Value returned when the time limit is hit.

    Returns:
        The return value of ``func``, or ``default`` on timeout.

    Raises:
        Exception: Anything ``func`` raises propagates unchanged.
    """
    import signal

    class _TimedOut(Exception):
        """Private marker raised by the alarm handler.

        It is local to this call so it cannot be confused with the builtin
        ``TimeoutError`` or with an exception raised by ``func`` itself.
        """

    def handler(signum, frame):
        raise _TimedOut()

    call_kwargs = {} if kwargs is None else kwargs

    # Remember whatever handler was installed so it can be put back afterwards.
    previous_handler = signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **call_kwargs)
    except _TimedOut:
        result = default
    finally:
        # Cancel the pending alarm first so it cannot fire after the handler
        # has been swapped back.
        signal.alarm(0)
        # signal.signal() reports None for a handler not installed from
        # Python; that value cannot be passed back in, so skip it.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

    return result

我的思路是:如果任务没有在限定时间内完成,就杀死进程;下面结合线程和进程来实现这一点。

from concurrent.futures import ThreadPoolExecutor

from time import sleep
import multiprocessing


# test case 1
def worker_1(a, b, c):
    """Simulate roughly two seconds of slow work, then return ``a + b + c``."""
    rounds_left = 2
    while rounds_left > 0:
        print('very time consuming sleep')
        sleep(1)
        rounds_left -= 1

    total = a + b + c
    return total

# test case 2
def worker_2(in_name):
    """Simulate roughly ten seconds of slow work, then return a greeting."""
    rounds_left = 10
    while rounds_left > 0:
        print('very time consuming sleep')
        sleep(1)
        rounds_left -= 1

    greeting = 'hello ' + in_name
    return greeting

作为上下文管理器的实际类

class FuncTimer():
    """Context manager that runs ``fn(*args)`` in a child process with a time limit.

    Call ``start_run()`` inside the ``with`` block to launch the child and
    collect its result. Leaving the block kills the child if it is still
    running.
    """

    def __init__(self, fn, args, runtime):
        """Prepare (but do not start) the child process.

        Args:
            fn: Callable to execute in the child process.
            args: Positional arguments passed to ``fn``.
            runtime: Maximum number of seconds to wait for ``fn``.
        """
        self.fn = fn
        self.args = args
        # Carries the return value from the child process back to the parent.
        self.queue = multiprocessing.Queue()
        self.runtime = runtime
        self.process = multiprocessing.Process(target=self.thread_caller)

    def thread_caller(self):
        """Child-process entry point: run ``fn`` and put its result on the queue."""
        with ThreadPoolExecutor() as executor:
            future = executor.submit(self.fn, *self.args)
            # result() re-raises anything fn raised, in which case nothing
            # is queued and the child exits with a non-zero exit code.
            self.queue.put(future.result())

    def __enter__(self):
        return self

    def start_run(self):
        """Run the child process and return its result.

        Returns:
            The value returned by ``fn``, or ``None`` when the child was
            killed for exceeding ``runtime`` or exited without producing a
            result (for example because ``fn`` raised).
        """
        from queue import Empty

        self.process.start()
        self.process.join(timeout=self.runtime)

        if self.process.is_alive():
            # Still running after the time limit: kill it and reap it so the
            # exit code is settled before returning.
            self.process.kill()
            self.process.join()
            print('killed premature')
            return None

        if self.process.exitcode != 0:
            # The child ended abnormally, so nothing was queued; a blocking
            # get() here would wait forever.
            return None

        try:
            # Bounded wait as a safety net; never block indefinitely.
            return self.queue.get(timeout=1)
        except Empty:
            return None

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # kill() fails on a process that was never started, so only act on a
        # child that is actually running.
        if self.process.is_alive():
            self.process.kill()
            self.process.join()

如何使用

print('testing case 1') 
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)

print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)

下面是对给定的基于线程的解决方案的轻微改进。

下面的代码支持异常:

def runFunctionCatchExceptions(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and report the outcome as a tagged pair.

    Returns:
        ``["RESULT", value]`` when the call succeeds, or
        ``["exception", exc]`` when it raises an ``Exception``; the exception
        object is returned rather than raised so the caller can re-raise it.
    """
    try:
        result = func(*args, **kwargs)
    except Exception as message:  # "as" form is valid on Python 2.6+ and 3
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs=None, timeout_duration=10, default=None):
    """Run ``func`` in a helper thread and wait at most ``timeout_duration`` seconds.

    Args:
        func: Callable to run.
        args: Positional arguments for ``func``.
        kwargs: Keyword arguments for ``func``; ``None`` means no keywords.
        timeout_duration: Seconds to wait for the thread to finish.
        default: Value returned when the thread is still running afterwards.

    Returns:
        The return value of ``func``, or ``default`` on timeout.

    Raises:
        Exception: Re-raises, in the calling thread, whatever ``func`` raised.

    Note:
        On timeout the helper thread is not stopped; it keeps running in the
        background until ``func`` returns.
    """
    import threading

    call_kwargs = {} if kwargs is None else kwargs

    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default

        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **call_kwargs)

    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    # is_alive() replaces isAlive(), which newer Python versions no longer provide.
    if it.is_alive():
        return default

    # The thread finished, so result is the [tag, payload] pair.
    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

用5秒超时调用它:

# Usage sketch: `remote_calculate` and `myarg` are placeholders for the
# caller's own function and argument.
# NOTE(review): the helper defined just above is `runFunctionWithTimeout`;
# `timeout` presumably refers to a helper with the same signature - confirm.
result = timeout(remote_calculate, (myarg,), timeout_duration=5)

我们也可以用信号来实现同样的功能。我认为下面的例子会对你有用。与线程相比,它非常简单。

import signal

class myException(Exception):
    """Raised by the SIGALRM handler to break out of ``main``."""


def timeout(signum, frame):
    """SIGALRM handler: interrupt whatever is currently running."""
    raise myException


#this is an infinite loop, never ending under normal circumstances
def main():
    """Print forever; only the alarm handler's exception ends the loop."""
    print('Starting Main ', end=' ')
    while 1:
        print('in main ', end=' ')


#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print("whoops")
finally:
    # Make sure no alarm is left pending once main() is over.
    signal.alarm(0)

以防对任何人都有帮助,在@piro的回答的基础上,我做了一个函数装饰器:

import time
import signal
from functools import wraps


def timeout(timeout_secs: int):
    """Build a decorator that aborts the wrapped call after ``timeout_secs`` seconds.

    The limit is enforced with ``SIGALRM``, so it is only usable where that
    signal exists (Unix) and where installing signal handlers is allowed.

    Args:
        timeout_secs: Time limit in whole seconds, passed to ``signal.alarm``.

    Returns:
        A decorator. The decorated function returns what the original
        returns and raises ``TimeoutError`` (an ``Exception`` subclass, so
        existing ``except Exception`` callers still catch it) on timeout.
    """
    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            def handler(signum, frame):
                raise TimeoutError(f"Timeout for function '{func.__name__}'")

            # Install our handler, remembering the previous one so that this
            # call does not permanently change the process's SIGALRM handling.
            previous_handler = signal.signal(signal.SIGALRM, handler)
            signal.alarm(timeout_secs)
            try:
                return func(*args, **kwargs)
            finally:
                # Cancel the pending alarm before swapping the handler back.
                signal.alarm(0)
                # signal.signal() reports None for a handler not installed
                # from Python; that value cannot be passed back in.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return time_limited

    return wrapper

在一个有20秒超时的函数上使用包装器看起来像这样:

    @timeout(20)
    def my_slow_or_never_ending_function(name):
        """Print a progress line once per second without ever returning."""
        while 1:
            time.sleep(1)
            print(f"Yet another second passed {name}...")

    # The loop above never returns by itself, so the call can only end by
    # raising; report whatever exception comes out of it.
    try:
        results = my_slow_or_never_ending_function("Yooo!")
    except Exception as err:
        print(f"ERROR: {err}")