我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

它和cron之间的主要区别是异常会永久地杀死守护进程。您可能希望使用异常捕获器和记录器进行包装。

其他回答

一个可能的答案是:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()

例如:显示当前本地时间

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)

我认为这取决于你想做什么,你的问题没有详细说明。

对我来说,我想在一个已经多线程的进程中做一个昂贵的操作。所以我让leader流程检查时间,只有她做昂贵的操作(检查点深度学习模型)。为了做到这一点,我增加了计数器,以确保5秒、10秒、15秒过去,每5秒保存一次(或使用math.floor的模块化算术):

def print_every_5_seconds_have_passed_exit_eventually():
    """
    https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
    https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
    :return:
    """
    opts = argparse.Namespace(start=time.time())
    next_time_to_print = 0
    while True:
        current_time_passed = time.time() - opts.start
        if current_time_passed >= next_time_to_print:
            next_time_to_print += 5
            print(f'worked and {current_time_passed=}')
            print(f'{current_time_passed % 5=}')
            print(f'{math.floor(current_time_passed % 5) == 0}')
starting __main__ at __init__
worked and current_time_passed=0.0001709461212158203
current_time_passed % 5=0.0001709461212158203
True
worked and current_time_passed=5.0
current_time_passed % 5=0.0
True
worked and current_time_passed=10.0
current_time_passed % 5=0.0
True
worked and current_time_passed=15.0
current_time_passed % 5=0.0
True

对我来说,检查if语句是我所需要的。在我已经复杂的多处理器多gpu代码中拥有线程,调度器并不是我想要添加的复杂性,如果我可以避免它,似乎我可以。检查worker id很容易确保只有一个进程在做这件事。

注意,我使用True print语句来确保模块化的算术技巧有效,因为检查确切的时间显然是行不通的!但令我惊喜的是,地板竟然起了作用。

以下是MestreLion代码的改编版本。 除了原来的函数,这段代码:

1)添加用于在特定时间触发计时器的first_interval(调用者需要计算first_interval并传递进来)

2)在原代码中解决一个竞态条件。在原始代码中,如果控制线程未能取消正在运行的计时器(“停止计时器,并取消计时器动作的执行。这只会在计时器仍处于等待阶段时起作用。”引用自https://docs.python.org/2/library/threading.html),计时器将无休止地运行。

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False

我认为更简单的方法是:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

这样,你的代码被执行,然后等待60秒,然后再次执行,等待,执行,等等…… 没有必要把事情复杂化:D