我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()

其他回答

另一种灵活性解决方案是Apscheduler。

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()

另外,apscheduler提供了如下所示的许多调度程序。

BlockingScheduler: use when the scheduler is the only thing running in your process BackgroundScheduler: use when you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application AsyncIOScheduler: use if your application uses the asyncio module GeventScheduler: use if your application uses gevent TornadoScheduler: use if you’re building a Tornado application TwistedScheduler: use if you’re building a Twisted application QtScheduler: use if you’re building a Qt application

例如:显示当前本地时间

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)

你可能会考虑Twisted,它是一个实现了Reactor Pattern的Python网络库。

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

虽然“While True: sleep(60)”可能会工作,Twisted可能已经实现了许多你最终需要的功能(如bobince指出的守护进程化、日志记录或异常处理),并且可能是一个更健壮的解决方案

    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

如果你想在不阻塞剩余代码的情况下这样做,你可以使用这个让它在自己的线程中运行:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

该解决方案结合了其他解决方案中很少结合的几个特性:

Exception handling: As far as possible on this level, exceptions are handled properly, i. e. get logged for debugging purposes without aborting our program. No chaining: The common chain-like implementation (for scheduling the next event) you find in many answers is brittle in the aspect that if anything goes wrong within the scheduling mechanism (threading.Timer or whatever), this will terminate the chain. No further executions will happen then, even if the reason of the problem is already fixed. A simple loop and waiting with a simple sleep() is much more robust in comparison. No drift: My solution keeps an exact track of the times it is supposed to run at. There is no drift depending on the execution time (as in many other solutions). Skipping: My solution will skip tasks if one execution took too much time (e. g. do X every five seconds, but X took 6 seconds). This is the standard cron behavior (and for a good reason). Many other solutions then simply execute the task several times in a row without any delay. For most cases (e. g. cleanup tasks) this is not wished. If it is wished, simply use next_time += delay instead.