我一直在使用Django开发一个web应用程序,我很好奇是否有一种方法可以安排一个作业定期运行。

基本上,我只是想运行数据库,并在自动的、定期的基础上进行一些计算/更新,但我似乎找不到任何关于这样做的文档。

有人知道怎么设置吗?

澄清一下:我知道我可以设置一个cron作业来完成这个任务,但我很好奇Django中是否有一些特性提供了这个功能。我希望人们能够自己部署这个应用程序,而不需要做很多配置(最好是零配置)。

我曾经考虑过“回溯性”地触发这些操作,方法是简单地检查自上一次请求发送到站点以来作业是否应该运行,但我希望使用更简洁的方法。


当前回答

虽然不是Django的一部分,但气流是一个更近期的项目(截至2016年),用于任务管理。

风流是一个工作流自动化和调度系统,可用于创建和管理数据管道。基于web的UI为开发人员提供了一系列管理和查看这些管道的选项。

《气流》是用Python编写的,使用Flask构建的。

“气流”是由Airbnb的Maxime Beauchemin在2015年春天创建的开源软件。它在2016年冬天加入了Apache软件基金会的孵化计划。下面是Git项目页面和一些附加的背景信息。

其他回答

我采用的一个解决方案是这样的:

1)创建一个自定义管理命令,例如:

python manage.py my_cool_command

2)使用cron(在Linux上)或at(在Windows上)在需要的时间运行我的命令。

这是一个简单的解决方案,不需要安装沉重的AMQP堆栈。然而,使用像芹菜这样的东西有很好的优点,在其他答案中提到过。特别是,使用芹菜,不需要将应用程序逻辑扩展到crontab文件中是很好的。然而,cron解决方案非常适合中小型应用程序,并且不需要大量外部依赖。

编辑:

在windows的后续版本中,at命令在windows 8、Server 2012及以上版本中已弃用。你可以使用schtasks.exe来做同样的事情。

****更新**** 这是django doc中用来编写自定义管理命令的新链接

我只是想到了一个相当简单的解决方案:

定义一个视图函数do_work(req, param),就像使用其他任何视图一样,使用URL映射,返回一个HttpResponse等等。 使用您的定时首选项(或在Windows中使用AT或Scheduled Tasks)设置一个运行curl http://localhost/your/mapped/url?param=value的cron作业。

你可以添加参数,但只是将参数添加到URL。

告诉我你们的想法。

[更新]我现在使用runjob命令从django扩展而不是curl。

我的cron看起来是这样的:

@hourly python /path/to/project/manage.py runjobs hourly

... 每天,每月等等。还可以将其设置为运行特定作业。

我觉得这样更容易管理,也更干净。不需要将URL映射到视图。只需定义作业类和crontab,就完成了。

我有完全相同的需求一段时间前,并最终解决它使用APScheduler(用户指南)

它使调度任务超级简单,并使其独立于某些代码的基于请求的执行。下面是一个简单的例子。

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
job = None

def tick():
    print('One tick!')\

def start_job():
    global job
    job = scheduler.add_job(tick, 'interval', seconds=3600)
    try:
        scheduler.start()
    except:
        pass

希望这能帮助到一些人!

看看Django Poor Man’s Cron,这是一个Django应用程序,它利用垃圾邮件机器人、搜索引擎索引机器人等,以大约定期的时间间隔运行计划任务

参见:http://code.google.com/p/django-poormanscron/

我用芹菜来创建我的定期任务。首先你需要安装它,如下所示:

pip install django-celery

不要忘记在你的设置中注册django-芹菜,然后你可以这样做:

from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger
@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
 #your code