study_celery
| --user
|-- models.py
|--views.py
|--urls.py
|--celery_task
|--__init__.py
|--async_task.py
|-- celery.py
| --check_task.py
| --config.py
| --scheduler_task.py
| --study_celery
| --settings.py
| --manage.py
依赖:redis数据库
- redis==4.6.0
- Django==3.2
- django-redis==5.3.0
- celery==5.3.1
config.py
- from celery.schedules import crontab
- from datetime import timedelta
- '''
- 参数解析:
- accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
- task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
- result_serializer:结果序列化格式,默认值为json;
- timezone:配置Celery以使用自定义时区;
- enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
- result_expires: 异步任务结果存活时长
- beat_schedule:设置定时任务
- '''
- #手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
- task_module = [
- 'celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法
- 'celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
- ]
-
- #celery的配置
- config = {
- "broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
- "result_backend" : 'redis://127.0.0.1:6379/1',
- "task_serializer" : 'json',
- "result_serializer" : 'json',
- "accept_content" : ['json'],
- "timezone" : 'Asia/Shanghai',
- "enable_utc" : False,
- "result_expires" : 1*60*60,
- "beat_schedule" : { #定时任务配置
- # 名字随意命名
- 'add-func-30-seconds': {
- # 执行add_task下的addy函数
- 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
- # 每10秒执行一次
- 'schedule': timedelta(seconds=30),
- # add函数传递的参数
- 'args': (10, 21)
- },
- # 名字随意起
- 'add-func-5-minutes': {
- 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
- # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
- 'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
- 'args': (19, 22) # 定时任务需要的参数
- },
- # 缓存用户数据到cache中
- 'cache-user-func': {
- 'task': 'celery_task.scheduler_task.cache_user_func',
- # 导入任务函数:from celery_task.scheduler_task import cache_user_func
- 'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中
- }
- }
- }
celery.py
- from celery import Celery
- from celery.schedules import crontab
- from datetime import timedelta
- from .config import config,task_module
-
- # 生成celery对象,'task'相当于key,用于区分celery对象
- # broker是指定消息处理,backend是指定结果后端的存储位置 include参数需要指定任务模块
- app = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
- app.conf.update(**config)
async_task.py
- '1、因为需要用到django中的内容,所以需要配置django环境'
- import os
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")#根目录下study_celery/settings.py
- import django
- django.setup()
-
- # 导入celery对象app
- from celery_task.celery import app
- # 导入django自带的发送邮件模块
- from django.core.mail import send_mail
- import threading
- from study_celery import settings
- from apps.user.models import UserModel
- '''
- 2、异步任务
- 不保存函数返回值的,@app.task(ignore_result=True)
- 保存函数返回值的任务,@app.task
- '''
-
- #没有返回值,禁用掉结果后端
- @app.task
- def send_email_task(email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作
- '''
- :param email: 接收消息的邮箱,用户的邮箱
- :return:
- '''
- # 启用线程发送邮件,此处最好加线程池
- t = threading.Thread(
- target=send_mail,
- args=(
- "登录前获取的验证码", # 邮件标题
- '点击该邮件激活你的账号,否则无法登陆', # 给html_message参数传值后,该参数信息失效
- settings.EMAIL_HOST_USER, # 用于发送邮件的邮箱地址
- [email], # 接收邮件的邮件地址,可以写多个
- ),
- # html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中
- kwargs={
- 'html_message': f"
验证码:{code}
" - }
- )
- t.start()
- return {'email':email,'code':code}
-
- @app.task
- def search_user_task():
- users = UserModel.objects.all()
- lis = []
- for user in users:
- dic = {'id':user.id,'name':user.name}
- lis.append(dic)
- return {'users':lis}
scheduler_task.py
- '1、因为需要用到django中的内容,所以需要配置django环境 '
- import os
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")#根目录下study_celery/settings.py
- import django
- django.setup()
-
- from celery_task.celery import app
- from apps.user.views import models as user_models
- from django.core.cache import cache
- import time
- from django.forms import model_to_dict
-
- '2、定时任务'
- #有返回值,返回值可以从结果后端中获取
- @app.task
- def add_func(a,b):
- print('执行了加法函数')
- cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})
- return a+b
-
- #不需要返回值,禁用掉结果后端
- @app.task(ignore_result=True)
- def cache_user_func():
- user = user_models.UserModel.objects.all()
- user_dict = {}
- for obj in user:
- user_dict[obj.account] = model_to_dict(obj)
- cache.set('all-user-data',user_dict,timeout=35*60)
-
-
-
check_task.py
- from celery.result import AsyncResult
- from celery_task.celery import app
- '''验证任务的执行状态的'''
-
- def check_task_status(task_id):
- '''
- 任务的执行状态:
- PENDING :等待执行
- STARTED :开始执行
- RETRY :重新尝试执行
- SUCCESS :执行成功
- FAILURE :执行失败
- :param task_id:
- :return:
- '''
- result = AsyncResult(id=task_id, app=app)
- dic = {
- 'type':result.status,
- 'msg':'',
- 'data':'',
- 'code':400
- }
- if result.status == 'PENDING':
- dic['msg'] = '任务等待中'
- elif result.status == 'STARTED':
- dic['msg'] = '任务开始执行'
- elif result.status == 'RETRY':
- dic['msg']='任务重新尝试执行'
- elif result.status =='FAILURE':
- dic['msg'] = '任务执行失败了'
- elif result.status == 'SUCCESS':
- result = result.get()
- dic['msg'] = '任务执行成功'
- dic['data'] = result
- dic['code'] = 200
- # result.forget() # 将结果删除
- # async.revoke(terminate=True) # 无论现在是什么时候,都要终止
- # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
- return dic
-
-
模块注册
- #app注册
- INSTALLED_APPS = [
- 'django.contrib.admin',
- 'django.contrib.auth',
- 'django.contrib.contenttypes',
- 'django.contrib.sessions',
- 'django.contrib.messages',
- 'django.contrib.staticfiles',
- 'user.apps.UserConfig',
- ]
-
- #cache缓存
- CACHES = {
- "default": {
- "BACKEND": "django_redis.cache.RedisCache",
- "LOCATION": "redis://127.0.0.1:6379/2",
- "OPTIONS": {
- "CLIENT_CLASS": "django_redis.client.DefaultClient",
- "CONNECTION_POOL_KWARGS": {"max_connections": 1000}
- # "PASSWORD": "123",
- },
- 'TIMEOUT':30*60 #缓存过期时间
- }
- }
-
- #邮件配置
- # EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
- EMAIL_HOST = 'smtp.qq.com' # 如果是 163 改成 smtp.163.com
- EMAIL_PORT = 465
- EMAIL_HOST_USER = 'xxxx@qq.com' # 发送邮件的邮箱帐号
- EMAIL_HOST_PASSWORD = 'xxx' # 授权码,各邮箱的设置中启用smtp服务时获取
- DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
- # 这样收到的邮件,收件人处就会这样显示
- # DEFAULT_FROM_EMAIL = '2333<'1234567890@qq.com>'
- EMAIL_USE_SSL = True # 使用ssl
- # EMAIL_USE_TLS = False # 使用tls
- # EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True
1、获取cache对象,操作cache
from django.core.cache import cachecache.set(key,value)
cache.get(key)
2、执行异步任务
from celery_task.async_task import send_email_taskres = send_email_task.delay('xxx@qq.com','23456')
task_id = res.id #获取异步任务的id,通过该id可用获取任务的运行状态
from celery_task.async_task import search_user_task res = search_user_task.delay()
django项目
python manage.py runserver
celery框架
#windows系统
celery -A celery_task.celery worker -l info -P eventlet
#linux系统
celery -A celery_task.celery worker -l info
定时任务启动
celery -A celery_task beat -l info