我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

以下是MestreLion代码的改编版本。 除了原来的函数,这段代码:

1)添加用于在特定时间触发计时器的first_interval(调用者需要计算first_interval并传递进来)

2)在原代码中解决一个竞态条件。在原始代码中,如果控制线程未能取消正在运行的计时器(“停止计时器,并取消计时器动作的执行。这只会在计时器仍处于等待阶段时起作用。”引用自https://docs.python.org/2/library/threading.html),计时器将无休止地运行。

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False

其他回答

我认为更简单的方法是:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

这样,你的代码被执行,然后等待60秒,然后再次执行,等待,执行,等等…… 没有必要把事情复杂化:D

你可能会考虑Twisted,它是一个实现了Reactor Pattern的Python网络库。

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

虽然“While True: sleep(60)”可能会工作,Twisted可能已经实现了许多你最终需要的功能(如bobince指出的守护进程化、日志记录或异常处理),并且可能是一个更健壮的解决方案

我用这个方法使每小时产生60个事件,其中大多数事件在整分钟后的相同秒数内发生:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

根据实际情况,你可能会得到长度的刻度:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

但在60分钟结束时,你会有60个滴答;而且它们中的大多数都将出现在您喜欢的正确偏移时间。

在我的系统中,我得到了< 1/20秒的典型漂移,直到需要纠正。

该方法的优点是具有较好的时钟漂移分辨率;这可能会导致问题,如果你做的事情,比如每tick追加一个项目,你希望每小时追加60个项目。未能考虑漂移可能导致次要指标,如移动平均线,将数据考虑得过于深入过去,从而导致错误的输出。

另一种灵活性解决方案是Apscheduler。

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()

另外,apscheduler提供了如下所示的许多调度程序。

BlockingScheduler: use when the scheduler is the only thing running in your process BackgroundScheduler: use when you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application AsyncIOScheduler: use if your application uses the asyncio module GeventScheduler: use if your application uses gevent TornadoScheduler: use if you’re building a Tornado application TwistedScheduler: use if you’re building a Twisted application QtScheduler: use if you’re building a Qt application

我以前也遇到过类似的问题。也许http://cronus.readthedocs.org会有帮助?

对于v0.2,下面的代码片段可以工作

import cronus.beat as beat

beat.set_rate(2) # run twice per second
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec