我一直在使用Django开发一个web应用程序,我很好奇是否有一种方法可以安排一个作业定期运行。

基本上,我只是想运行数据库,并在自动的、定期的基础上进行一些计算/更新,但我似乎找不到任何关于这样做的文档。

有人知道怎么设置吗?

澄清一下:我知道我可以设置一个cron作业来完成这个任务,但我很好奇Django中是否有一些特性提供了这个功能。我希望人们能够自己部署这个应用程序,而不需要做很多配置(最好是零配置)。

我曾经考虑过“回溯性”地触发这些操作,方法是简单地检查自上一次请求发送到站点以来作业是否应该运行,但我希望使用更简洁的方法。


当前回答

芹菜是一个分布式任务队列,建立在AMQP (RabbitMQ)上。它还以类似cron的方式处理周期性任务(参见周期性任务)。根据你的应用,它可能值得一看。

用django (docs)设置芹菜非常容易,周期性任务实际上会在停机的情况下跳过错过的任务。芹菜还有内置的重试机制,以防任务失败。

其他回答

在代码部分之后,我可以像我的views.py一样写任何东西:)

#######################################
import os,sys
sys.path.append('/home/administrator/development/store')
os.environ['DJANGO_SETTINGS_MODULE']='store.settings'
from django.core.management impor setup_environ
from store import settings
setup_environ(settings)
#######################################

从 http://www.cotellese.net/2007/09/27/running-external-scripts-against-django-models/

Django APScheduler调度器。Advanced Python Scheduler (APScheduler)是一个Python库,可以安排Python代码稍后执行,可以只执行一次,也可以定期执行。您可以随时添加新作业或删除旧作业。

注:我是这个库的作者

安装APScheduler

pip install apscheduler

查看要调用的文件函数

文件名称:scheduler_jobs.py

def FirstCronTest():
    print("")
    print("I am executed..!")

配置调度程序

创建execute.py文件并添加以下代码

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()

在这里,调度器函数写在scheduler_jobs中

import scheduler_jobs 

scheduler.add_job(scheduler_jobs.FirstCronTest, 'interval', seconds=10)
scheduler.start()

链接文件以执行

现在,在Url文件的底部添加下面的行

import execute

您可以通过执行检查完整的代码 (点击这里) https://github.com/devchandansh/django-apscheduler

有趣的可插拔Django应用程序:Django -chronograph

你只需要添加一个作为计时器的cron条目,就可以在脚本中运行一个非常好的Django管理界面。

RabbitMQ和芹菜比Cron有更多的特性和任务处理能力。如果任务失败不是问题,并且您认为将在下一个调用中处理中断的任务,那么Cron就足够了。

Celery & AMQP将允许您处理中断的任务,它将由另一个worker再次执行(Celery worker侦听下一个要处理的任务),直到到达任务的max_retries属性。您甚至可以在失败时调用任务,比如记录失败,或者在到达max_retries时向管理员发送电子邮件。

当需要扩展应用程序时,可以分发芹菜和AMQP服务器。

对于简单的dockerized项目,我真的看不到任何现有的答案适合。

所以我写了一个非常简单的解决方案,不需要外部库或触发器,它可以自己运行。不需要外部os-cron,应该可以在任何环境下工作。

它通过添加一个中间件来工作:middleware.py

import threading

def should_run(name, seconds_interval):
    from application.models import CronJob
    from django.utils.timezone import now

    try:
        c = CronJob.objects.get(name=name)
    except CronJob.DoesNotExist:
        CronJob(name=name, last_ran=now()).save()
        return True

    if (now() - c.last_ran).total_seconds() >= seconds_interval:
        c.last_ran = now()
        c.save()
        return True

    return False


class CronTask:
    def __init__(self, name, seconds_interval, function):
        self.name = name
        self.seconds_interval = seconds_interval
        self.function = function


def cron_worker(*_):
    if not should_run("main", 60):
        return

    # customize this part:
    from application.models import Event
    tasks = [
        CronTask("events", 60 * 30, Event.clean_stale_objects),
        # ...
    ]

    for task in tasks:
        if should_run(task.name, task.seconds_interval):
            task.function()


def cron_middleware(get_response):

    def middleware(request):
        response = get_response(request)
        threading.Thread(target=cron_worker).start()
        return response

    return middleware

模型/ cron.py:

from django.db import models


class CronJob(models.Model):
    name = models.CharField(max_length=10, primary_key=True)
    last_ran = models.DateTimeField()

settings.py:

MIDDLEWARE = [
    ...
    'application.middleware.cron_middleware',
    ...
]