在之前的文章我们已经学习了Celery和APScheduler的基本使用,下面让我们来了解一下如何在Django中使用Celery和APScheduler
- python 3.7
- pip install celery
- pip install eventlet
- #5.0版本以下
- pip install importlib-metadata==4.8.3(python3.7下可能会出现报错)
- #tasks.py
- from ..main import celery_app
- # 装饰器将send_sms_code装饰为异步任务,并设置别名
- @celery_app.task(name='send_sms_code')
- def send_sms_code(mobile, sms_code):
- print("向手机号{}发送验证码{}".format(mobile,sms_code))
- # config.py
- broker_url = "redis://127.0.0.1:6379/7"
'运行
- #main.py
- # celery启动⽂件
- from celery import Celery
- import os
-
- #配置环境
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProject7.settings')
- # 创建celery实例
- celery_app = Celery('test')
- # 加载celery配置
- celery_app.config_from_object('celery_tasks.config')
- # ⾃动注册celery任务
- celery_app.autodiscover_tasks(['celery_tasks.sms'])
命令启动:
与manage.py平级,执行命令celery -A celery_tasks.main worker -l info -P eventlet
编写视图函数和路由:
- #views.py
- from django.shortcuts import render,HttpResponse
- from celery_tasks.sms import tasks
- # Create your views here.
- def index(request):
- tasks.send_sms_code("13417361123","123456")
- return HttpResponse("111")
-
- #urls.py
- from django.contrib import admin
- from django.urls import path
- from app01 import views
- urlpatterns = [
- path('admin/', admin.site.urls),
- path('index/',views.index)
- ]
启动项目,然后访问视图即可
代码只需要改config.py即可:
- # config.py
- broker_url = "redis://127.0.0.1:6379/7"
- from celery_tasks.main import celery_app
- from celery.schedules import crontab
-
- #设置定时任务
- from datetime import timedelta
-
- # 设置定时任务
- celery_app.conf.beat_schedule = {
- "test_task": {
- "task": "send_sms_code",
- # "schedule": crontab(hour=11, minute=28),# 每天的11点28分执行一次任务
- "schedule": timedelta(seconds=1), # 每秒执行一次任务
- "args": ("13417366781","1111"), # 这里是传递给任务的参数,元组形式
- }
- }
然后在终端启动分别执行两条命令(开两个终端执行):
- celery --app=celery_tasks.main worker -P eventlet -l INFO
- celery -A celery_tasks.main beat
pip install django-apscheduler
在settings.py中加入
- INSTALLED_APPS = (
- # ...
- "django_apscheduler",
- )
'运行
apscheduler存在数据库依赖,所以得配置一下数据库信息:
- DATABASES = {
- 'default': {
- 'ENGINE': 'django.db.backends.mysql',
- 'NAME': 'test',
- 'USER': 'root',
- 'PASSWORD': '547710',
- 'HOST': 'localhost',
- 'PORT': '3306'
- }
- }
'运行
然后运行python manage.py migrate,接着会在数据库中生成两张表:
django_apscheduler_djangojob 表保存注册的任务以及下次执行的时间
django_apscheduler_djangojobexecution 保存每次任务执行的时间和结果和任务状态
- #views.py
- from django_apscheduler.jobstores import DjangoJobStore, register_job
-
- from apscheduler.schedulers.blocking import BlockingScheduler
- from datetime import datetime
-
- scheduler = BlockingScheduler() # 创建调度器
- scheduler.add_jobstore(DjangoJobStore(), "default")
-
- #添加定时任务方式一
- @register_job(scheduler, "interval", seconds=5, id="func", replace_existing=True, misfire_grace_time=120)
- def job():
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
-
- # 添加定时任务方式二
- def job1():
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
- scheduler.add_job(
- job1,
- 'interval',
- seconds=5,
- id='my_job', # 任务的唯一标识
- replace_existing=True,
- )
-
- scheduler.start()
在终端运行python manage.py runserver,效果如下
数据库表记录(有时间差8小时,这个去配置时区即可):
django_apscheduler_djangojob
django_apscheduler_djangojobexecution