我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

我认为这取决于你想做什么,你的问题没有详细说明。

对我来说,我想在一个已经多线程的进程中做一个昂贵的操作。所以我让leader流程检查时间,只有她做昂贵的操作(检查点深度学习模型)。为了做到这一点,我增加了计数器,以确保5秒、10秒、15秒过去,每5秒保存一次(或使用math.floor的模块化算术):

def print_every_5_seconds_have_passed_exit_eventually():
    """
    https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
    https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
    :return:
    """
    opts = argparse.Namespace(start=time.time())
    next_time_to_print = 0
    while True:
        current_time_passed = time.time() - opts.start
        if current_time_passed >= next_time_to_print:
            next_time_to_print += 5
            print(f'worked and {current_time_passed=}')
            print(f'{current_time_passed % 5=}')
            print(f'{math.floor(current_time_passed % 5) == 0}')
starting __main__ at __init__
worked and current_time_passed=0.0001709461212158203
current_time_passed % 5=0.0001709461212158203
True
worked and current_time_passed=5.0
current_time_passed % 5=0.0
True
worked and current_time_passed=10.0
current_time_passed % 5=0.0
True
worked and current_time_passed=15.0
current_time_passed % 5=0.0
True

对我来说,检查if语句是我所需要的。在我已经复杂的多处理器多gpu代码中拥有线程,调度器并不是我想要添加的复杂性,如果我可以避免它,似乎我可以。检查worker id很容易确保只有一个进程在做这件事。

注意,我使用True print语句来确保模块化的算术技巧有效,因为检查确切的时间显然是行不通的!但令我惊喜的是,地板竟然起了作用。

其他回答

例如:显示当前本地时间

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)

简单地使用

import time

while True:
    print("this will run after every 30 sec")
    #Your code here
    time.sleep(30)

以下是MestreLion代码的改编版本。 除了原来的函数,这段代码:

1)添加用于在特定时间触发计时器的first_interval(调用者需要计算first_interval并传递进来)

2)在原代码中解决一个竞态条件。在原始代码中,如果控制线程未能取消正在运行的计时器(“停止计时器,并取消计时器动作的执行。这只会在计时器仍处于等待阶段时起作用。”引用自https://docs.python.org/2/library/threading.html),计时器将无休止地运行。

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False

如果漂移不是一个问题

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)
    
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

异步输出。

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

如果正在运行的任务需要相当多的时间,那么间隔就变成2秒+任务时间,所以如果您需要精确的调度,那么这并不适合您。

注意daemon=True标志意味着这个线程不会阻止应用程序关闭。例如,在运行测试等待此头停止后,pytest将无限期挂起的问题。

它和cron之间的主要区别是异常会永久地杀死守护进程。您可能希望使用异常捕获器和记录器进行包装。