我一直在使用Django开发一个web应用程序,我很好奇是否有一种方法可以安排一个作业定期运行。
基本上,我只是想运行数据库,并在自动的、定期的基础上进行一些计算/更新,但我似乎找不到任何关于这样做的文档。
有人知道怎么设置吗?
澄清一下:我知道我可以设置一个cron作业来完成这个任务,但我很好奇Django中是否有一些特性提供了这个功能。我希望人们能够自己部署这个应用程序,而不需要做很多配置(最好是零配置)。
我曾经考虑过“回溯性”地触发这些操作,方法是简单地检查自上一次请求发送到站点以来作业是否应该运行,但我希望使用更简洁的方法。
我采用的一个解决方案是这样的:
1)创建一个自定义管理命令,例如:
python manage.py my_cool_command
2)使用cron(在Linux上)或at(在Windows上)在需要的时间运行我的命令。
这是一个简单的解决方案,不需要安装沉重的AMQP堆栈。然而,使用像芹菜这样的东西有很好的优点,在其他答案中提到过。特别是,使用芹菜,不需要将应用程序逻辑扩展到crontab文件中是很好的。然而,cron解决方案非常适合中小型应用程序,并且不需要大量外部依赖。
编辑:
在windows的后续版本中,at命令在windows 8、Server 2012及以上版本中已弃用。你可以使用schtasks.exe来做同样的事情。
****更新****
这是django doc中用来编写自定义管理命令的新链接
在代码部分之后,我可以像我的views.py一样写任何东西:)
#######################################
import os,sys
sys.path.append('/home/administrator/development/store')
os.environ['DJANGO_SETTINGS_MODULE']='store.settings'
from django.core.management impor setup_environ
from store import settings
setup_environ(settings)
#######################################
从
http://www.cotellese.net/2007/09/27/running-external-scripts-against-django-models/
我只是想到了一个相当简单的解决方案:
定义一个视图函数do_work(req, param),就像使用其他任何视图一样,使用URL映射,返回一个HttpResponse等等。
使用您的定时首选项(或在Windows中使用AT或Scheduled Tasks)设置一个运行curl http://localhost/your/mapped/url?param=value的cron作业。
你可以添加参数,但只是将参数添加到URL。
告诉我你们的想法。
[更新]我现在使用runjob命令从django扩展而不是curl。
我的cron看起来是这样的:
@hourly python /path/to/project/manage.py runjobs hourly
... 每天,每月等等。还可以将其设置为运行特定作业。
我觉得这样更容易管理,也更干净。不需要将URL映射到视图。只需定义作业类和crontab,就完成了。
我用芹菜来创建我的定期任务。首先你需要安装它,如下所示:
pip install django-celery
不要忘记在你的设置中注册django-芹菜,然后你可以这样做:
from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger
@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
#your code
我有完全相同的需求一段时间前,并最终解决它使用APScheduler(用户指南)
它使调度任务超级简单,并使其独立于某些代码的基于请求的执行。下面是一个简单的例子。
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
job = None
def tick():
print('One tick!')\
def start_job():
global job
job = scheduler.add_job(tick, 'interval', seconds=3600)
try:
scheduler.start()
except:
pass
希望这能帮助到一些人!
你一定要看看django-q!
它不需要额外的配置,并且很可能具备在商业项目中处理任何生产问题所需的一切。
它是积极开发的,与django, django ORM, mongo, redis集成得很好。以下是我的配置:
# django-q
# -------------------------------------------------------------------------
# See: http://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
# Match recommended settings from docs.
'name': 'DjangoORM',
'workers': 4,
'queue_limit': 50,
'bulk': 10,
'orm': 'default',
# Custom Settings
# ---------------
# Limit the amount of successful tasks saved to Django.
'save_limit': 10000,
# See https://github.com/Koed00/django-q/issues/110.
'catch_up': False,
# Number of seconds a worker can spend on a task before it's terminated.
'timeout': 60 * 5,
# Number of seconds a broker will wait for a cluster to finish a task before presenting it again. This needs to be
# longer than `timeout`, otherwise the same task will be processed multiple times.
'retry': 60 * 6,
# Whether to force all async() calls to be run with sync=True (making them synchronous).
'sync': False,
# Redirect worker exceptions directly to Sentry error reporter.
'error_reporter': {
'sentry': RAVEN_CONFIG,
},
}
对于简单的dockerized项目,我真的看不到任何现有的答案适合。
所以我写了一个非常简单的解决方案,不需要外部库或触发器,它可以自己运行。不需要外部os-cron,应该可以在任何环境下工作。
它通过添加一个中间件来工作:middleware.py
import threading
def should_run(name, seconds_interval):
from application.models import CronJob
from django.utils.timezone import now
try:
c = CronJob.objects.get(name=name)
except CronJob.DoesNotExist:
CronJob(name=name, last_ran=now()).save()
return True
if (now() - c.last_ran).total_seconds() >= seconds_interval:
c.last_ran = now()
c.save()
return True
return False
class CronTask:
def __init__(self, name, seconds_interval, function):
self.name = name
self.seconds_interval = seconds_interval
self.function = function
def cron_worker(*_):
if not should_run("main", 60):
return
# customize this part:
from application.models import Event
tasks = [
CronTask("events", 60 * 30, Event.clean_stale_objects),
# ...
]
for task in tasks:
if should_run(task.name, task.seconds_interval):
task.function()
def cron_middleware(get_response):
def middleware(request):
response = get_response(request)
threading.Thread(target=cron_worker).start()
return response
return middleware
模型/ cron.py:
from django.db import models
class CronJob(models.Model):
name = models.CharField(max_length=10, primary_key=True)
last_ran = models.DateTimeField()
settings.py:
MIDDLEWARE = [
...
'application.middleware.cron_middleware',
...
]
Django APScheduler调度器。Advanced Python Scheduler (APScheduler)是一个Python库,可以安排Python代码稍后执行,可以只执行一次,也可以定期执行。您可以随时添加新作业或删除旧作业。
注:我是这个库的作者
安装APScheduler
pip install apscheduler
查看要调用的文件函数
文件名称:scheduler_jobs.py
def FirstCronTest():
print("")
print("I am executed..!")
配置调度程序
创建execute.py文件并添加以下代码
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
在这里,调度器函数写在scheduler_jobs中
import scheduler_jobs
scheduler.add_job(scheduler_jobs.FirstCronTest, 'interval', seconds=10)
scheduler.start()
链接文件以执行
现在,在Url文件的底部添加下面的行
import execute
您可以通过执行检查完整的代码
(点击这里)
https://github.com/devchandansh/django-apscheduler
另一种选择是使用火箭技术:
from rocketry import Rocketry
from rocketry.conds import daily, after_success
app = Rocketry()
@app.task(daily.at("10:00"))
def do_daily():
...
@app.task(after_success(do_daily))
def do_after_another():
...
if __name__ == "__main__":
app.run()
它还支持自定义条件:
from pathlib import Path
@app.cond()
def file_exists(file):
return Path(file).exists()
@app.task(daily & file_exists("myfile.csv"))
def do_custom():
...
它也支持Cron:
from rocketry.conds import cron
@app.task(cron('*/2 12-18 * Oct Fri'))
def do_cron():
...
它可以很好地与FastAPI集成,我认为它也可以与Django集成,Rocketry本质上只是一个复杂的循环,可以生成异步任务,线程和进程。
声明:我是作者。