我一直在使用Django开发一个web应用程序,我很好奇是否有一种方法可以安排一个作业定期运行。
基本上,我只是想运行数据库,并在自动的、定期的基础上进行一些计算/更新,但我似乎找不到任何关于这样做的文档。
有人知道怎么设置吗?
澄清一下:我知道我可以设置一个cron作业来完成这个任务,但我很好奇Django中是否有一些特性提供了这个功能。我希望人们能够自己部署这个应用程序,而不需要做很多配置(最好是零配置)。
我曾经考虑过“回溯性”地触发这些操作,方法是简单地检查自上一次请求发送到站点以来作业是否应该运行,但我希望使用更简洁的方法。
我用芹菜来创建我的定期任务。首先你需要安装它,如下所示:
pip install django-celery
不要忘记在你的设置中注册django-芹菜,然后你可以这样做:
from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger
@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
#your code
另一种选择是使用火箭技术:
from rocketry import Rocketry
from rocketry.conds import daily, after_success
app = Rocketry()
@app.task(daily.at("10:00"))
def do_daily():
...
@app.task(after_success(do_daily))
def do_after_another():
...
if __name__ == "__main__":
app.run()
它还支持自定义条件:
from pathlib import Path
@app.cond()
def file_exists(file):
return Path(file).exists()
@app.task(daily & file_exists("myfile.csv"))
def do_custom():
...
它也支持Cron:
from rocketry.conds import cron
@app.task(cron('*/2 12-18 * Oct Fri'))
def do_cron():
...
它可以很好地与FastAPI集成,我认为它也可以与Django集成,Rocketry本质上只是一个复杂的循环,可以生成异步任务,线程和进程。
声明:我是作者。
我用芹菜来创建我的定期任务。首先你需要安装它,如下所示:
pip install django-celery
不要忘记在你的设置中注册django-芹菜,然后你可以这样做:
from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger
@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
#your code