• Django中使用Celery和APScheduler实现定时任务


    在之前的文章我们已经学习了Celery和APScheduler的基本使用,下面让我们来了解一下如何在Django中使用Celery和APScheduler

    Celery

    1.前提工作

    1. python 3.7
    2. pip install celery
    3. pip install eventlet
    4. #5.0版本以下
    5. pip install importlib-metadata==4.8.3(python3.7下可能会出现报错)

    2.项目结构

    3.异步任务

    1. #tasks.py
    2. from ..main import celery_app
    3. # 装饰器将send_sms_code装饰为异步任务,并设置别名
    4. @celery_app.task(name='send_sms_code')
    5. def send_sms_code(mobile, sms_code):
    6. print("向手机号{}发送验证码{}".format(mobile,sms_code))
    1. # config.py
    2. broker_url = "redis://127.0.0.1:6379/7"
    '
    运行
    1. #main.py
    2. # celery启动⽂件
    3. from celery import Celery
    4. import os
    5. #配置环境
    6. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProject7.settings')
    7. # 创建celery实例
    8. celery_app = Celery('test')
    9. # 加载celery配置
    10. celery_app.config_from_object('celery_tasks.config')
    11. # ⾃动注册celery任务
    12. celery_app.autodiscover_tasks(['celery_tasks.sms'])

    命令启动:

    与manage.py平级,执行命令celery -A celery_tasks.main  worker -l info -P eventlet

    编写视图函数和路由:

    1. #views.py
    2. from django.shortcuts import render,HttpResponse
    3. from celery_tasks.sms import tasks
    4. # Create your views here.
    5. def index(request):
    6. tasks.send_sms_code("13417361123","123456")
    7. return HttpResponse("111")
    8. #urls.py
    9. from django.contrib import admin
    10. from django.urls import path
    11. from app01 import views
    12. urlpatterns = [
    13. path('admin/', admin.site.urls),
    14. path('index/',views.index)
    15. ]

     启动项目,然后访问视图即可

    4.定时任务

    代码只需要改config.py即可:

    1. # config.py
    2. broker_url = "redis://127.0.0.1:6379/7"
    3. from celery_tasks.main import celery_app
    4. from celery.schedules import crontab
    5. #设置定时任务
    6. from datetime import timedelta
    7. # 设置定时任务
    8. celery_app.conf.beat_schedule = {
    9. "test_task": {
    10. "task": "send_sms_code",
    11. # "schedule": crontab(hour=11, minute=28),# 每天的11点28分执行一次任务
    12. "schedule": timedelta(seconds=1), # 每秒执行一次任务
    13. "args": ("13417366781","1111"), # 这里是传递给任务的参数,元组形式
    14. }
    15. }

    然后在终端启动分别执行两条命令(开两个终端执行):

    1. celery --app=celery_tasks.main worker -P eventlet -l INFO
    2. celery -A celery_tasks.main beat

    APScheduler

    1.前提工作

    pip install django-apscheduler

     2.配置

    在settings.py中加入

    1. INSTALLED_APPS = (
    2. # ...
    3. "django_apscheduler",
    4. )
    '
    运行

    apscheduler存在数据库依赖,所以得配置一下数据库信息:

    1. DATABASES = {
    2. 'default': {
    3. 'ENGINE': 'django.db.backends.mysql',
    4. 'NAME': 'test',
    5. 'USER': 'root',
    6. 'PASSWORD': '547710',
    7. 'HOST': 'localhost',
    8. 'PORT': '3306'
    9. }
    10. }
    '
    运行

    然后运行python manage.py migrate,接着会在数据库中生成两张表:

    django_apscheduler_djangojob 表保存注册的任务以及下次执行的时间

    django_apscheduler_djangojobexecution 保存每次任务执行的时间和结果和任务状态

    3.使用

    1. #views.py
    2. from django_apscheduler.jobstores import DjangoJobStore, register_job
    3. from apscheduler.schedulers.blocking import BlockingScheduler
    4. from datetime import datetime
    5. scheduler = BlockingScheduler() # 创建调度器
    6. scheduler.add_jobstore(DjangoJobStore(), "default")
    7. #添加定时任务方式一
    8. @register_job(scheduler, "interval", seconds=5, id="func", replace_existing=True, misfire_grace_time=120)
    9. def job():
    10. print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    11. # 添加定时任务方式二
    12. def job1():
    13. print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    14. scheduler.add_job(
    15. job1,
    16. 'interval',
    17. seconds=5,
    18. id='my_job', # 任务的唯一标识
    19. replace_existing=True,
    20. )
    21. scheduler.start()

     4.启动

    在终端运行python manage.py runserver,效果如下

    数据库表记录(有时间差8小时,这个去配置时区即可): 

    django_apscheduler_djangojob

    django_apscheduler_djangojobexecution  

  • 相关阅读:
    K8S Pod Sidecar 应用场景之一-加入 NGINX Sidecar 做反代和 web 服务器
    栈(顺序栈)实现Language C
    骨传导无线耳机哪款比较好?值得买的骨传导耳机推荐
    SystemVerilog学习-10-验证量化和覆盖率
    pycharm如何配置 .gitignore 文件
    关于官方提供的mindspore镜像mindspore-modelzoo
    GPU加速Pinterest推荐模型,参数量增加100倍,用户活跃度提高16%
    程序单实例运行的一种实现
    9.5 利用可执行内存挑战DEP
    聊一聊固态硬盘的那些事
  • 原文地址:https://blog.csdn.net/m0_71660867/article/details/139380342