我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

如果你想在不阻塞剩余代码的情况下这样做,你可以使用这个让它在自己的线程中运行:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

该解决方案结合了其他解决方案中很少结合的几个特性:

Exception handling: As far as possible on this level, exceptions are handled properly, i. e. get logged for debugging purposes without aborting our program. No chaining: The common chain-like implementation (for scheduling the next event) you find in many answers is brittle in the aspect that if anything goes wrong within the scheduling mechanism (threading.Timer or whatever), this will terminate the chain. No further executions will happen then, even if the reason of the problem is already fixed. A simple loop and waiting with a simple sleep() is much more robust in comparison. No drift: My solution keeps an exact track of the times it is supposed to run at. There is no drift depending on the execution time (as in many other solutions). Skipping: My solution will skip tasks if one execution took too much time (e. g. do X every five seconds, but X took 6 seconds). This is the standard cron behavior (and for a good reason). Many other solutions then simply execute the task several times in a row without any delay. For most cases (e. g. cleanup tasks) this is not wished. If it is wished, simply use next_time += delay instead.

其他回答

我认为更简单的方法是:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

这样,你的代码被执行,然后等待60秒,然后再次执行,等待,执行,等等…… 没有必要把事情复杂化:D

我以前也遇到过类似的问题。也许http://cronus.readthedocs.org会有帮助?

对于v0.2,下面的代码片段可以工作

import cronus.beat as beat

beat.set_rate(2) # run twice per second
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

我认为这取决于你想做什么,你的问题没有详细说明。

对我来说,我想在一个已经多线程的进程中做一个昂贵的操作。所以我让leader流程检查时间,只有她做昂贵的操作(检查点深度学习模型)。为了做到这一点,我增加了计数器,以确保5秒、10秒、15秒过去,每5秒保存一次(或使用math.floor的模块化算术):

def print_every_5_seconds_have_passed_exit_eventually():
    """
    https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
    https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
    :return:
    """
    opts = argparse.Namespace(start=time.time())
    next_time_to_print = 0
    while True:
        current_time_passed = time.time() - opts.start
        if current_time_passed >= next_time_to_print:
            next_time_to_print += 5
            print(f'worked and {current_time_passed=}')
            print(f'{current_time_passed % 5=}')
            print(f'{math.floor(current_time_passed % 5) == 0}')
starting __main__ at __init__
worked and current_time_passed=0.0001709461212158203
current_time_passed % 5=0.0001709461212158203
True
worked and current_time_passed=5.0
current_time_passed % 5=0.0
True
worked and current_time_passed=10.0
current_time_passed % 5=0.0
True
worked and current_time_passed=15.0
current_time_passed % 5=0.0
True

对我来说,检查if语句是我所需要的。在我已经复杂的多处理器多gpu代码中拥有线程,调度器并不是我想要添加的复杂性,如果我可以避免它,似乎我可以。检查worker id很容易确保只有一个进程在做这件事。

注意,我使用True print语句来确保模块化的算术技巧有效,因为检查确切的时间显然是行不通的!但令我惊喜的是,地板竟然起了作用。

如果你想要一种非阻塞的方式来周期性地执行你的函数,而不是阻塞无限循环,我会使用线程计时器。这样,您的代码可以继续运行并执行其他任务,并且仍然每n秒调用一次您的函数。我经常使用这种技术打印长时间、CPU/磁盘/网络密集型任务的进度信息。

下面是我在类似问题中发布的代码,带有start()和stop()控件:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

特点:

只有标准库,没有外部依赖 即使计时器已经启动/停止,也可以安全地多次调用Start()和stop() 要调用的函数可以有位置参数和命名参数 您可以随时更改间隔,它将在下次运行后生效。args、kwargs甚至function也一样!

    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()