我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

如果你想要一种非阻塞的方式来周期性地执行你的函数,而不是阻塞无限循环,我会使用线程计时器。这样,您的代码可以继续运行并执行其他任务,并且仍然每n秒调用一次您的函数。我经常使用这种技术打印长时间、CPU/磁盘/网络密集型任务的进度信息。

下面是我在类似问题中发布的代码,带有start()和stop()控件:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

特点:

只有标准库,没有外部依赖 即使计时器已经启动/停止,也可以安全地多次调用Start()和stop() 要调用的函数可以有位置参数和命名参数 您可以随时更改间隔,它将在下次运行后生效。args、kwargs甚至function也一样!

其他回答

这里是另一个不使用任何额外库的解决方案。

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

如果你想在不阻塞剩余代码的情况下这样做,你可以使用这个让它在自己的线程中运行:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

该解决方案结合了其他解决方案中很少结合的几个特性:

Exception handling: As far as possible on this level, exceptions are handled properly, i. e. get logged for debugging purposes without aborting our program. No chaining: The common chain-like implementation (for scheduling the next event) you find in many answers is brittle in the aspect that if anything goes wrong within the scheduling mechanism (threading.Timer or whatever), this will terminate the chain. No further executions will happen then, even if the reason of the problem is already fixed. A simple loop and waiting with a simple sleep() is much more robust in comparison. No drift: My solution keeps an exact track of the times it is supposed to run at. There is no drift depending on the execution time (as in many other solutions). Skipping: My solution will skip tasks if one execution took too much time (e. g. do X every five seconds, but X took 6 seconds). This is the standard cron behavior (and for a good reason). Many other solutions then simply execute the task several times in a row without any delay. For most cases (e. g. cleanup tasks) this is not wished. If it is wished, simply use next_time += delay instead.

如果你想要一种非阻塞的方式来周期性地执行你的函数,而不是阻塞无限循环,我会使用线程计时器。这样,您的代码可以继续运行并执行其他任务,并且仍然每n秒调用一次您的函数。我经常使用这种技术打印长时间、CPU/磁盘/网络密集型任务的进度信息。

下面是我在类似问题中发布的代码,带有start()和stop()控件:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

特点:

只有标准库,没有外部依赖 即使计时器已经启动/停止,也可以安全地多次调用Start()和stop() 要调用的函数可以有位置参数和命名参数 您可以随时更改间隔,它将在下次运行后生效。args、kwargs甚至function也一样!

另一种灵活性解决方案是Apscheduler。

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()

另外,apscheduler提供了如下所示的许多调度程序。

BlockingScheduler: use when the scheduler is the only thing running in your process BackgroundScheduler: use when you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application AsyncIOScheduler: use if your application uses the asyncio module GeventScheduler: use if your application uses gevent TornadoScheduler: use if you’re building a Tornado application TwistedScheduler: use if you’re building a Twisted application QtScheduler: use if you’re building a Qt application

计时计数可以做到高精度(即< 1毫秒),因为它与系统时钟同步。它不会随着时间的推移而漂移,也不受代码执行时间长度的影响(当然,前提是它小于间隔时间)。

一个简单的阻塞的例子:

from timed_count import timed_count

for count in timed_count(60):
    # Execute code here exactly every 60 seconds
    ...

你可以通过在线程中运行它来让它变得不阻塞:

from threading import Thread
from timed_count import timed_count

def periodic():
    for count in timed_count(60):
        # Execute code here exactly every 60 seconds
        ...

thread = Thread(target=periodic)
thread.start()