• Django框架集成Celery异步-【2】:django集成celery,拿来即用,可用操作django的orm等功能


    一、项目结构和依赖

    study_celery

              | --user

                    |-- models.py

                    |--views.py

                    |--urls.py

              |--celery_task

                      |--__init__.py

                      |--async_task.py

                      |-- celery.py

                      | --check_task.py

                      | --config.py

                      | --scheduler_task.py

            | --study_celery

                    | --settings.py

            | --manage.py

    依赖:redis数据库
     

    1. redis==4.6.0
    2. Django==3.2
    3. django-redis==5.3.0
    4. celery==5.3.1

    二、celery框架配置详情

    1、配置文件

    config.py

    1. from celery.schedules import crontab
    2. from datetime import timedelta
    3. '''
    4. 参数解析:
    5. accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
    6. task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
    7. result_serializer:结果序列化格式,默认值为json;
    8. timezone:配置Celery以使用自定义时区;
    9. enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
    10. result_expires: 异步任务结果存活时长
    11. beat_schedule:设置定时任务
    12. '''
    13. #手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
    14. task_module = [
    15. 'celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法
    16. 'celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
    17. ]
    18. #celery的配置
    19. config = {
    20. "broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
    21. "result_backend" : 'redis://127.0.0.1:6379/1',
    22. "task_serializer" : 'json',
    23. "result_serializer" : 'json',
    24. "accept_content" : ['json'],
    25. "timezone" : 'Asia/Shanghai',
    26. "enable_utc" : False,
    27. "result_expires" : 1*60*60,
    28. "beat_schedule" : { #定时任务配置
    29. # 名字随意命名
    30. 'add-func-30-seconds': {
    31. # 执行add_task下的addy函数
    32. 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
    33. # 每10秒执行一次
    34. 'schedule': timedelta(seconds=30),
    35. # add函数传递的参数
    36. 'args': (10, 21)
    37. },
    38. # 名字随意起
    39. 'add-func-5-minutes': {
    40. 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
    41. # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
    42. 'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
    43. 'args': (19, 22) # 定时任务需要的参数
    44. },
    45. # 缓存用户数据到cache中
    46. 'cache-user-func': {
    47. 'task': 'celery_task.scheduler_task.cache_user_func',
    48. # 导入任务函数:from celery_task.scheduler_task import cache_user_func
    49. 'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中
    50. }
    51. }
    52. }

    2、celery对象创建

    celery.py

    1. from celery import Celery
    2. from celery.schedules import crontab
    3. from datetime import timedelta
    4. from .config import config,task_module
    5. # 生成celery对象,'task'相当于key,用于区分celery对象
    6. # broker是指定消息处理,backend是指定结果后端的存储位置 include参数需要指定任务模块
    7. app = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
    8. app.conf.update(**config)

    3、异步任务模块

    async_task.py

    1. '1、因为需要用到django中的内容,所以需要配置django环境'
    2. import os
    3. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")#根目录下study_celery/settings.py
    4. import django
    5. django.setup()
    6. # 导入celery对象app
    7. from celery_task.celery import app
    8. # 导入django自带的发送邮件模块
    9. from django.core.mail import send_mail
    10. import threading
    11. from study_celery import settings
    12. from apps.user.models import UserModel
    13. '''
    14. 2、异步任务
    15. 不保存函数返回值的,@app.task(ignore_result=True)
    16. 保存函数返回值的任务,@app.task
    17. '''
    18. #没有返回值,禁用掉结果后端
    19. @app.task
    20. def send_email_task(email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作
    21. '''
    22. :param email: 接收消息的邮箱,用户的邮箱
    23. :return:
    24. '''
    25. # 启用线程发送邮件,此处最好加线程池
    26. t = threading.Thread(
    27. target=send_mail,
    28. args=(
    29. "登录前获取的验证码", # 邮件标题
    30. '点击该邮件激活你的账号,否则无法登陆', # 给html_message参数传值后,该参数信息失效
    31. settings.EMAIL_HOST_USER, # 用于发送邮件的邮箱地址
    32. [email], # 接收邮件的邮件地址,可以写多个
    33. ),
    34. # html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中
    35. kwargs={
    36. 'html_message': f"

      验证码:{code}

      "
    37. }
    38. )
    39. t.start()
    40. return {'email':email,'code':code}
    41. @app.task
    42. def search_user_task():
    43. users = UserModel.objects.all()
    44. lis = []
    45. for user in users:
    46. dic = {'id':user.id,'name':user.name}
    47. lis.append(dic)
    48. return {'users':lis}

    4、定时任务模块

    scheduler_task.py

    1. '1、因为需要用到django中的内容,所以需要配置django环境 '
    2. import os
    3. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")#根目录下study_celery/settings.py
    4. import django
    5. django.setup()
    6. from celery_task.celery import app
    7. from apps.user.views import models as user_models
    8. from django.core.cache import cache
    9. import time
    10. from django.forms import model_to_dict
    11. '2、定时任务'
    12. #有返回值,返回值可以从结果后端中获取
    13. @app.task
    14. def add_func(a,b):
    15. print('执行了加法函数')
    16. cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})
    17. return a+b
    18. #不需要返回值,禁用掉结果后端
    19. @app.task(ignore_result=True)
    20. def cache_user_func():
    21. user = user_models.UserModel.objects.all()
    22. user_dict = {}
    23. for obj in user:
    24. user_dict[obj.account] = model_to_dict(obj)
    25. cache.set('all-user-data',user_dict,timeout=35*60)

    5、检测任务完成状态

    check_task.py

    1. from celery.result import AsyncResult
    2. from celery_task.celery import app
    3. '''验证任务的执行状态的'''
    4. def check_task_status(task_id):
    5. '''
    6. 任务的执行状态:
    7. PENDING :等待执行
    8. STARTED :开始执行
    9. RETRY :重新尝试执行
    10. SUCCESS :执行成功
    11. FAILURE :执行失败
    12. :param task_id:
    13. :return:
    14. '''
    15. result = AsyncResult(id=task_id, app=app)
    16. dic = {
    17. 'type':result.status,
    18. 'msg':'',
    19. 'data':'',
    20. 'code':400
    21. }
    22. if result.status == 'PENDING':
    23. dic['msg'] = '任务等待中'
    24. elif result.status == 'STARTED':
    25. dic['msg'] = '任务开始执行'
    26. elif result.status == 'RETRY':
    27. dic['msg']='任务重新尝试执行'
    28. elif result.status =='FAILURE':
    29. dic['msg'] = '任务执行失败了'
    30. elif result.status == 'SUCCESS':
    31. result = result.get()
    32. dic['msg'] = '任务执行成功'
    33. dic['data'] = result
    34. dic['code'] = 200
    35. # result.forget() # 将结果删除
    36. # async.revoke(terminate=True) # 无论现在是什么时候,都要终止
    37. # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    38. return dic

    三、django项目的配置

    1、settings.py

    模块注册

    1. #app注册
    2. INSTALLED_APPS = [
    3. 'django.contrib.admin',
    4. 'django.contrib.auth',
    5. 'django.contrib.contenttypes',
    6. 'django.contrib.sessions',
    7. 'django.contrib.messages',
    8. 'django.contrib.staticfiles',
    9. 'user.apps.UserConfig',
    10. ]
    11. #cache缓存
    12. CACHES = {
    13. "default": {
    14. "BACKEND": "django_redis.cache.RedisCache",
    15. "LOCATION": "redis://127.0.0.1:6379/2",
    16. "OPTIONS": {
    17. "CLIENT_CLASS": "django_redis.client.DefaultClient",
    18. "CONNECTION_POOL_KWARGS": {"max_connections": 1000}
    19. # "PASSWORD": "123",
    20. },
    21. 'TIMEOUT':30*60 #缓存过期时间
    22. }
    23. }
    24. #邮件配置
    25. # EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
    26. EMAIL_HOST = 'smtp.qq.com' # 如果是 163 改成 smtp.163.com
    27. EMAIL_PORT = 465
    28. EMAIL_HOST_USER = 'xxxx@qq.com' # 发送邮件的邮箱帐号
    29. EMAIL_HOST_PASSWORD = 'xxx' # 授权码,各邮箱的设置中启用smtp服务时获取
    30. DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
    31. # 这样收到的邮件,收件人处就会这样显示
    32. # DEFAULT_FROM_EMAIL = '2333<'1234567890@qq.com>'
    33. EMAIL_USE_SSL = True # 使用ssl
    34. # EMAIL_USE_TLS = False # 使用tls
    35. # EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True

    2、关于视图函数的方法

    1、获取cache对象,操作cache

    from django.core.cache import cache

    cache.set(key,value)

    cache.get(key)

    2、执行异步任务

    from celery_task.async_task import send_email_task

    res = send_email_task.delay('xxx@qq.com','23456')

    task_id = res.id #获取异步任务的id,通过该id可用获取任务的运行状态

    from celery_task.async_task import search_user_task
    res = search_user_task.delay()

    四、启动项目:项目根目录下执行

    django项目

    python manage.py runserver

    celery框架

    #windows系统

    celery -A celery_task.celery worker -l info  -P  eventlet

    #linux系统

    celery -A celery_task.celery worker -l info 

    定时任务启动

    celery -A celery_task beat -l info

    五、码云地址

    django配置celery: django配置使用celery,django使用celery,django+celeryicon-default.png?t=N7T8https://gitee.com/liuhaizhang/django-configuration-celery

  • 相关阅读:
    AI资讯--Meta AI工具“指哪打哪“;OpenAI CEO事件梳理;
    React 学习笔记目录
    传统三维重建和深度学习三维重建 MVS笔记总结、问题总结
    Windows 11 22621.1 , 10.0.22622.290 文件资源管理器中启用多标签,全新导航栏
    实时数据仓库-从0到1实时数据仓库设计&实现(SparkStreaming3.x)
    【微众银行秋招】230903三、平均值 <前缀和>
    【vue】vue本地储存、会话存储插件vue-ls的使用:
    推出多项云安全服务和功能,亚马逊云科技 re:Inforce 全球大会来袭,为企业创新护航!
    this用法,systemVerilog语法
    解析标准树状文件为sql语句及代码解析(python版本,txt,mysql)
  • 原文地址:https://blog.csdn.net/weixin_46371752/article/details/133806513