我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。
在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作
while True:
# Code executed here
time.sleep(60)
这段代码是否存在任何可预见的问题?
如果漂移不是一个问题
import threading, time
def print_every_n_seconds(n=2):
while True:
print(time.ctime())
time.sleep(n)
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()
异步输出。
#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018
如果正在运行的任务需要相当多的时间,那么间隔就变成2秒+任务时间,所以如果您需要精确的调度,那么这并不适合您。
注意daemon=True标志意味着这个线程不会阻止应用程序关闭。例如,在运行测试等待此头停止后,pytest将无限期挂起的问题。
如果您的程序还没有事件循环,请使用sched模块,它实现了一个通用的事件调度器。
import sched, time
def do_something(scheduler):
# schedule the next call first
scheduler.enter(60, 1, do_something, (scheduler,))
print("Doing stuff...")
# then do your stuff
my_scheduler = sched.scheduler(time.time, time.sleep)
my_scheduler.enter(60, 1, do_something, (my_scheduler,))
my_scheduler.run()
如果您已经在使用事件循环库,如asyncio、trio、tkinter、PyQt5、gobject、kivy等,则只需使用现有事件循环库的方法来调度任务。
另一种灵活性解决方案是Apscheduler。
pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
pass
sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds
sched.start()
另外,apscheduler提供了如下所示的许多调度程序。
BlockingScheduler: use when the scheduler is the only thing running in your process
BackgroundScheduler: use when you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application
AsyncIOScheduler: use if your application uses the asyncio module
GeventScheduler: use if your application uses gevent
TornadoScheduler: use if you’re building a Tornado application
TwistedScheduler: use if you’re building a Twisted application
QtScheduler: use if you’re building a Qt application
如果你想要一种非阻塞的方式来周期性地执行你的函数,而不是阻塞无限循环,我会使用线程计时器。这样,您的代码可以继续运行并执行其他任务,并且仍然每n秒调用一次您的函数。我经常使用这种技术打印长时间、CPU/磁盘/网络密集型任务的进度信息。
下面是我在类似问题中发布的代码,带有start()和stop()控件:
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
用法:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
特点:
只有标准库,没有外部依赖
即使计时器已经启动/停止,也可以安全地多次调用Start()和stop()
要调用的函数可以有位置参数和命名参数
您可以随时更改间隔,它将在下次运行后生效。args、kwargs甚至function也一样!