我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。

我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?


当前回答

#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

其他回答

超时装饰器不能在Windows系统上工作,因为Windows不太支持信号。

如果你在windows系统中使用超时装饰器,你会得到以下结果

AttributeError: module 'signal' has no attribute 'SIGALRM'

有些人建议使用use_signals=False,但对我没用。

作者@bitranox创建了以下包:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

代码示例:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

给出以下例外:

TimeoutError: Function mytest timed out after 5 seconds

Tim Savannah的func_timeout包对我来说工作得很好。

安装:

PIP安装func_timeout

用法:

import time
from func_timeout import func_timeout, FunctionTimedOut

def my_func(n):
    time.sleep(n)

time_to_sleep = 10

# time out after 2 seconds using kwargs
func_timeout(2, my_func, kwargs={'n' : time_to_sleep})

# time out after 2 seconds using args
func_timeout(2, my_func, args=(time_to_sleep,))

asyncio的另一个解决方案:

如果你想取消后台任务,而不仅仅是在运行的主代码上超时,那么你需要一个来自主线程的显式通信,要求任务的代码取消,比如threading.Event()

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor


class SingletonTimeOut:
    pool = None

    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        pool = cls.get_pool()
        loop = cls.get_loop()
        try:
            task = loop.run_in_executor(pool, to_run)
            return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
        except asyncio.TimeoutError as e:
            error_type = type(e).__name__ #TODO
            raise e

    @classmethod
    def get_pool(cls):
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool

    @classmethod
    def get_loop(cls):
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            # print("NEW LOOP" + str(threading.current_thread().ident))
            return asyncio.get_event_loop()

# ---------------

TIME_OUT = float('0.2')  # seconds

def toto(input_items,nb_predictions):
    return 1

to_run = functools.partial(toto,
                           input_items=1,
                           nb_predictions="a")

results = SingletonTimeOut.run(to_run, TIME_OUT)

我是wrapt_timeout_decorator的作者。

这里介绍的大多数解决方案乍一看在Linux下都工作得很好——因为我们有fork()和signals()——但在windows上看起来有点不同。 当涉及到Linux上的子线程时,你不能再使用信号了。

为了在Windows下生成一个进程,它需要是可pickle的——许多装饰函数或Class方法都不是。

所以你需要使用一个更好的pickler像莳萝和multiprocess(不是pickle和multiprocessing) -这就是为什么你不能使用ProcessPoolExecutor(或只有有限的功能)。

For the timeout itself - You need to define what timeout means - because on Windows it will take considerable (and not determinable) time to spawn the process. This can be tricky on short timeouts. Lets assume, spawning the process takes about 0.5 seconds (easily !!!). If You give a timeout of 0.2 seconds what should happen? Should the function time out after 0.5 + 0.2 seconds (so let the method run for 0.2 seconds)? Or should the called process time out after 0.2 seconds (in that case, the decorated function will ALWAYS timeout, because in that time it is not even spawned)?

嵌套的装饰器也很讨厌,你不能在子线程中使用信号。如果你想要创建一个真正通用的、跨平台的装饰器,所有这些都需要考虑(并测试)。

其他问题是将异常传递回调用者,以及记录问题(如果在装饰函数中使用-不支持记录到另一个进程中的文件)

我试图涵盖所有的边缘情况,您可以查看包wrapt_timeout_decorator,或者至少测试您自己的解决方案,受到那里使用的单元测试的启发。

@Alexis Eggermont -不幸的是,我没有足够的分数来评论-也许其他人可以通知你-我认为我解决了你的进口问题。

你可以使用多处理。过程来做到这一点。

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()