我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。
在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作
while True:
# Code executed here
time.sleep(60)
这段代码是否存在任何可预见的问题?
我用这个方法使每小时产生60个事件,其中大多数事件在整分钟后的相同秒数内发生:
import math
import time
import random
TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging
def set_timing():
now = time.time()
elapsed = now - info['begin']
minutes = math.floor(elapsed/TICK)
tick_elapsed = now - info['completion_time']
if (info['tick']+1) > minutes:
wait = max(0,(TICK_TIMING-(time.time() % TICK)))
print ('standard wait: %.2f' % wait)
time.sleep(wait)
elif tick_elapsed < TICK_MINIMUM:
wait = TICK_MINIMUM-tick_elapsed
print ('minimum wait: %.2f' % wait)
time.sleep(wait)
else:
print ('skip set_timing(); no wait')
drift = ((time.time() - info['begin']) - info['tick']*TICK -
TICK_TIMING + info['begin']%TICK)
print ('drift: %.6f' % drift)
info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK
while 1:
set_timing()
print('hello world')
#random real world event
time.sleep(random.random()*TICK_MINIMUM)
info['tick'] += 1
info['completion_time'] = time.time()
根据实际情况,你可能会得到长度的刻度:
60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.
但在60分钟结束时,你会有60个滴答;而且它们中的大多数都将出现在您喜欢的正确偏移时间。
在我的系统中,我得到了< 1/20秒的典型漂移,直到需要纠正。
该方法的优点是具有较好的时钟漂移分辨率;这可能会导致问题,如果你做的事情,比如每tick追加一个项目,你希望每小时追加60个项目。未能考虑漂移可能导致次要指标,如移动平均线,将数据考虑得过于深入过去,从而导致错误的输出。
我使用Tkinter after()方法,它不会“窃取游戏”(就像之前提出的sched模块),即它允许其他东西并行运行:
import Tkinter
def do_something1():
global n1
n1 += 1
if n1 == 6: # (Optional condition)
print "* do_something1() is done *"; return
# Do your stuff here
# ...
print "do_something1() "+str(n1)
tk.after(1000, do_something1)
def do_something2():
global n2
n2 += 1
if n2 == 6: # (Optional condition)
print "* do_something2() is done *"; return
# Do your stuff here
# ...
print "do_something2() "+str(n2)
tk.after(500, do_something2)
tk = Tkinter.Tk();
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()
Do_something1()和do_something2()可以以任意的间隔速度并行运行。在这里,第2个将以两倍的速度执行。还要注意,我使用了一个简单的计数器作为终止任一函数的条件。你可以使用任何你喜欢的条件,或者不使用,如果你想让一个函数运行到程序终止(例如一个时钟)。
另一种灵活性解决方案是Apscheduler。
pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
pass
sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds
sched.start()
另外,apscheduler提供了如下所示的许多调度程序。
BlockingScheduler: use when the scheduler is the only thing running in your process
BackgroundScheduler: use when you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application
AsyncIOScheduler: use if your application uses the asyncio module
GeventScheduler: use if your application uses gevent
TornadoScheduler: use if you’re building a Tornado application
TwistedScheduler: use if you’re building a Twisted application
QtScheduler: use if you’re building a Qt application