我一直在使用Django开发一个web应用程序,我很好奇是否有一种方法可以安排一个作业定期运行。

基本上,我只是想运行数据库,并在自动的、定期的基础上进行一些计算/更新,但我似乎找不到任何关于这样做的文档。

有人知道怎么设置吗?

澄清一下:我知道我可以设置一个cron作业来完成这个任务,但我很好奇Django中是否有一些特性提供了这个功能。我希望人们能够自己部署这个应用程序,而不需要做很多配置(最好是零配置)。

我曾经考虑过“回溯性”地触发这些操作,方法是简单地检查自上一次请求发送到站点以来作业是否应该运行,但我希望使用更简洁的方法。


如果您使用的是标准POSIX操作系统,则使用cron。

如果使用Windows,则使用at。

编写一个Django管理命令

弄清楚他们在哪个站台。 为您的用户执行适当的“AT”命令,或者为您的用户更新crontab。


我采用的一个解决方案是这样的:

1)创建一个自定义管理命令,例如:

python manage.py my_cool_command

2)使用cron(在Linux上)或at(在Windows上)在需要的时间运行我的命令。

这是一个简单的解决方案,不需要安装沉重的AMQP堆栈。然而,使用像芹菜这样的东西有很好的优点,在其他答案中提到过。特别是,使用芹菜,不需要将应用程序逻辑扩展到crontab文件中是很好的。然而,cron解决方案非常适合中小型应用程序,并且不需要大量外部依赖。

编辑:

在windows的后续版本中,at命令在windows 8、Server 2012及以上版本中已弃用。你可以使用schtasks.exe来做同样的事情。

****更新**** 这是django doc中用来编写自定义管理命令的新链接


看看Django Poor Man’s Cron,这是一个Django应用程序,它利用垃圾邮件机器人、搜索引擎索引机器人等,以大约定期的时间间隔运行计划任务

参见:http://code.google.com/p/django-poormanscron/


我个人使用cron,但是django扩展的作业调度部分看起来很有趣。


有趣的可插拔Django应用程序:Django -chronograph

你只需要添加一个作为计时器的cron条目,就可以在脚本中运行一个非常好的Django管理界面。


芹菜是一个分布式任务队列,建立在AMQP (RabbitMQ)上。它还以类似cron的方式处理周期性任务(参见周期性任务)。根据你的应用,它可能值得一看。

用django (docs)设置芹菜非常容易,周期性任务实际上会在停机的情况下跳过错过的任务。芹菜还有内置的重试机制,以防任务失败。


将以下内容放在你的cron.py文件的顶部:

#!/usr/bin/python
import os, sys
sys.path.append('/path/to/') # the parent directory of the project
sys.path.append('/path/to/project') # these lines only needed if not on path
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproj.settings'

# imports and code below

在代码部分之后,我可以像我的views.py一样写任何东西:)

#######################################
import os,sys
sys.path.append('/home/administrator/development/store')
os.environ['DJANGO_SETTINGS_MODULE']='store.settings'
from django.core.management impor setup_environ
from store import settings
setup_environ(settings)
#######################################

从 http://www.cotellese.net/2007/09/27/running-external-scripts-against-django-models/


我今天遇到了类似的问题。

我不想让服务器通过cron来处理它(而且大多数库最终只是cron助手)。

所以我已经创建了一个调度模块,并将其附加到init。

这不是最好的方法,但它帮助我把所有的代码放在一个地方,它的执行与主应用程序相关。


我只是想到了一个相当简单的解决方案:

定义一个视图函数do_work(req, param),就像使用其他任何视图一样,使用URL映射,返回一个HttpResponse等等。 使用您的定时首选项(或在Windows中使用AT或Scheduled Tasks)设置一个运行curl http://localhost/your/mapped/url?param=value的cron作业。

你可以添加参数,但只是将参数添加到URL。

告诉我你们的想法。

[更新]我现在使用runjob命令从django扩展而不是curl。

我的cron看起来是这样的:

@hourly python /path/to/project/manage.py runjobs hourly

... 每天,每月等等。还可以将其设置为运行特定作业。

我觉得这样更容易管理,也更干净。不需要将URL映射到视图。只需定义作业类和crontab,就完成了。


Brian Neal建议通过cron运行管理命令,但如果您正在寻找更健壮的东西(但不像芹菜那样精细),我会考虑像Kronos这样的库:

# app/cron.py

import kronos

@kronos.register('0 * * * *')
def task():
    pass

我们已经开源了我认为是一个结构化的应用程序,Brian的解决方案也提到了这一点。我们希望得到任何/所有的反馈!

https://github.com/tivix/django-cron

它有一个管理命令:

./manage.py runcrons

这就行了。每个cron都被建模为一个类(所以它都是面向对象的),每个cron以不同的频率运行,我们确保相同的cron类型不会并行运行(以防cron本身的运行时间比它们的频率长!)


RabbitMQ和芹菜比Cron有更多的特性和任务处理能力。如果任务失败不是问题,并且您认为将在下一个调用中处理中断的任务,那么Cron就足够了。

Celery & AMQP将允许您处理中断的任务,它将由另一个worker再次执行(Celery worker侦听下一个要处理的任务),直到到达任务的max_retries属性。您甚至可以在失败时调用任务,比如记录失败,或者在到达max_retries时向管理员发送电子邮件。

当需要扩展应用程序时,可以分发芹菜和AMQP服务器。


是的,上面的方法太棒了。我尝试了其中一些。最后,我找到了这样一个方法:

    from threading import Timer

    def sync():

        do something...

        sync_timer = Timer(self.interval, sync, ())
        sync_timer.start()

就像递归一样。

好的,我希望这个方法能满足你的要求。:)


我用芹菜来创建我的定期任务。首先你需要安装它,如下所示:

pip install django-celery

不要忘记在你的设置中注册django-芹菜,然后你可以这样做:

from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger
@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
 #your code

虽然不是Django的一部分,但气流是一个更近期的项目(截至2016年),用于任务管理。

风流是一个工作流自动化和调度系统,可用于创建和管理数据管道。基于web的UI为开发人员提供了一系列管理和查看这些管道的选项。

《气流》是用Python编写的,使用Flask构建的。

“气流”是由Airbnb的Maxime Beauchemin在2015年春天创建的开源软件。它在2016年冬天加入了Apache软件基金会的孵化计划。下面是Git项目页面和一些附加的背景信息。


我不确定这对任何人都有用,因为我必须提供系统的其他用户来调度作业,而不给他们访问实际的服务器(windows)任务调度器,我创建了这个可重用的应用程序。

请注意,用户可以访问服务器上的一个共享文件夹,在那里他们可以创建所需的命令/任务/.bat文件。这个任务然后可以使用这个应用程序安排。

应用程序名称为Django_Windows_Scheduler

截图:


我有完全相同的需求一段时间前,并最终解决它使用APScheduler(用户指南)

它使调度任务超级简单,并使其独立于某些代码的基于请求的执行。下面是一个简单的例子。

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
job = None

def tick():
    print('One tick!')\

def start_job():
    global job
    job = scheduler.add_job(tick, 'interval', seconds=3600)
    try:
        scheduler.start()
    except:
        pass

希望这能帮助到一些人!


一个更现代的解决方案(与芹菜相比)是Django Q: https://django-q.readthedocs.io/en/latest/index.html

它有很好的文档,很容易理解。缺乏Windows支持,因为Windows不支持进程分叉。但是如果您使用Windows for Linux子系统创建您的开发环境,那么它工作得很好。


如果你想要比芹菜更可靠的东西,可以尝试构建在AWS SQS/SNS之上的TaskHawk。

参见:http://taskhawk.readthedocs.io


你一定要看看django-q! 它不需要额外的配置,并且很可能具备在商业项目中处理任何生产问题所需的一切。

它是积极开发的,与django, django ORM, mongo, redis集成得很好。以下是我的配置:

# django-q
# -------------------------------------------------------------------------
# See: http://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
    # Match recommended settings from docs.
    'name': 'DjangoORM',
    'workers': 4,
    'queue_limit': 50,
    'bulk': 10,
    'orm': 'default',

# Custom Settings
# ---------------
# Limit the amount of successful tasks saved to Django.
'save_limit': 10000,

# See https://github.com/Koed00/django-q/issues/110.
'catch_up': False,

# Number of seconds a worker can spend on a task before it's terminated.
'timeout': 60 * 5,

# Number of seconds a broker will wait for a cluster to finish a task before presenting it again. This needs to be
# longer than `timeout`, otherwise the same task will be processed multiple times.
'retry': 60 * 6,

# Whether to force all async() calls to be run with sync=True (making them synchronous).
'sync': False,

# Redirect worker exceptions directly to Sentry error reporter.
'error_reporter': {
    'sentry': RAVEN_CONFIG,
},
}

对于简单的dockerized项目,我真的看不到任何现有的答案适合。

所以我写了一个非常简单的解决方案,不需要外部库或触发器,它可以自己运行。不需要外部os-cron,应该可以在任何环境下工作。

它通过添加一个中间件来工作:middleware.py

import threading

def should_run(name, seconds_interval):
    from application.models import CronJob
    from django.utils.timezone import now

    try:
        c = CronJob.objects.get(name=name)
    except CronJob.DoesNotExist:
        CronJob(name=name, last_ran=now()).save()
        return True

    if (now() - c.last_ran).total_seconds() >= seconds_interval:
        c.last_ran = now()
        c.save()
        return True

    return False


class CronTask:
    def __init__(self, name, seconds_interval, function):
        self.name = name
        self.seconds_interval = seconds_interval
        self.function = function


def cron_worker(*_):
    if not should_run("main", 60):
        return

    # customize this part:
    from application.models import Event
    tasks = [
        CronTask("events", 60 * 30, Event.clean_stale_objects),
        # ...
    ]

    for task in tasks:
        if should_run(task.name, task.seconds_interval):
            task.function()


def cron_middleware(get_response):

    def middleware(request):
        response = get_response(request)
        threading.Thread(target=cron_worker).start()
        return response

    return middleware

模型/ cron.py:

from django.db import models


class CronJob(models.Model):
    name = models.CharField(max_length=10, primary_key=True)
    last_ran = models.DateTimeField()

settings.py:

MIDDLEWARE = [
    ...
    'application.middleware.cron_middleware',
    ...
]

简单的方法是编写一个自定义shell命令,参见Django文档,并在linux上使用cronjob执行它。然而,我强烈建议使用像RabbitMQ这样的消息代理加上芹菜。也许你可以看看 本教程


Django APScheduler调度器。Advanced Python Scheduler (APScheduler)是一个Python库,可以安排Python代码稍后执行,可以只执行一次,也可以定期执行。您可以随时添加新作业或删除旧作业。

注:我是这个库的作者

安装APScheduler

pip install apscheduler

查看要调用的文件函数

文件名称:scheduler_jobs.py

def FirstCronTest():
    print("")
    print("I am executed..!")

配置调度程序

创建execute.py文件并添加以下代码

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()

在这里,调度器函数写在scheduler_jobs中

import scheduler_jobs 

scheduler.add_job(scheduler_jobs.FirstCronTest, 'interval', seconds=10)
scheduler.start()

链接文件以执行

现在,在Url文件的底部添加下面的行

import execute

您可以通过执行检查完整的代码 (点击这里) https://github.com/devchandansh/django-apscheduler


另一种选择是使用火箭技术:

from rocketry import Rocketry
from rocketry.conds import daily, after_success

app = Rocketry()

@app.task(daily.at("10:00"))
def do_daily():
    ...

@app.task(after_success(do_daily))
def do_after_another():
    ...

if __name__ == "__main__":
    app.run()

它还支持自定义条件:

from pathlib import Path

@app.cond()
def file_exists(file):
    return Path(file).exists()

@app.task(daily & file_exists("myfile.csv"))
def do_custom():
    ...

它也支持Cron:

from rocketry.conds import cron

@app.task(cron('*/2 12-18 * Oct Fri'))
def do_cron():
    ...

它可以很好地与FastAPI集成,我认为它也可以与Django集成,Rocketry本质上只是一个复杂的循环,可以生成异步任务,线程和进程。

声明:我是作者。


另一种选择,类似于Brian Neal的答案是使用RunScripts

这样就不需要设置命令了。这具有更灵活或更清晰的文件夹结构的优点。

该文件必须实现run()函数。这是运行脚本时调用的内容。你可以导入django项目的任何模型或其他部分在这些脚本中使用。

然后,就

python manage.py runscript path.to.script