安装Celery:
$ pip install -U Celery
Celery官方文档:https://docs.celeryq.dev/en/latest/index.html
最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联。
我们称这一解耦方式为:生产者消费者设计模式
总结:
示例:此处演示Redis数据库作为中间人broker
Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。
作为中间人,我们有几种方案可选择:
如果使用的是Ubuntu或者Debian发行版的Linux,可以直接通过命令安装RabbitMQ:
sudo apt-get install rabbitmq-server
安装完毕之后,RabbitMQ-server服务器就已经在后台运行。
如果用的并不是Ubuntu或Debian, 可以在以下网址:
http://www.rabbitmq.com/download.html
去查找自己所需要的版本软件。
Redis也是一款功能完备的broker可选项,但是其更可能因意外中断或者电源故障导致数据丢失的情况。
关于是由那个Redis作为Broker,可访下面网址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis
# 队列,中间人
class Broker(object):
# 任务队列
broker_list = []
# 消费者
class Worker(object):
# 任务执行者
def run(self, broker, func):
if func in broker.broker_list:
func()
else:
return 'error'
# Celery 将这3者 串联起来了
class Celery(object):
def __init__(self):
self.broker = Broker()
self.worker = Worker()
def add(self, func):
self.broker.broker_list.append(func)
def work(self, func):
self.worker.run(self.broker,func)
# 任务(函数),生产者
def send_sms_code():
print('send_sms_code')
# 1.创建celery实例
app=Celery()
# 2. 添加任务
app.add(send_sms_code)
# 3.执行任务
app.work(send_sms_code)
1)创建Celery实例并加载配置
1.定义Celery包
2.创建Celery实例
mian文件代码celery_tasks.main.py
# 0. 为celery的运行 设置Django的环境
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'meiduo_mall.settings')
# 1. 创建celery实例
from celery import Celery
# 参数1: main 设置脚本路径就可以了。 脚本路径是唯一的
app=Celery('celery_tasks')
#2. 设置broker
# 我们通过加载配置文件来设置broker
app.config_from_object('celery_tasks.config')
#3.需要celery 自动检测指定包的任务
# autodiscover_tasks 参数是列表
# 列表中的元素是 tasks的路径
app.autodiscover_tasks(['celery_tasks.sms','celery_tasks.email'])
3.加载Celery配置
代码config.py
broker_url = "redis://127.0.0.1/15"
###########################说明#############################
# 如果使用别的作为中间人, 例如使用 rabbitmq
# 则 rabbitmq 配置如下:
# broker_url= 'amqp://用户名:密码@ip地址:5672'
# 例如:
# meihao: 在rabbitq中创建的用户名, 注意: 远端链接时不能使用guest账户.
# 123456: 在rabbitq中用户名对应的密码
# ip部分: 指的是当前rabbitq所在的电脑ip
# 5672: 是规定的端口号
# broker_url = 'amqp://meihao:123456@172.16.238.128:5672'
1.注册任务:celery_tasks.main.py
# 自动注册celery任务
#3.需要celery 自动检测指定包的任务
# autodiscover_tasks 参数是列表
# 列表中的元素是 tasks的路径
app.autodiscover_tasks(['celery_tasks.sms','celery_tasks.email'])
2.定义任务:celery_tasks.sms.tasks.py
task.py
代码
# 生产者 -- 任务,函数
# 1. 这个函数 必须要让celery的实例的 task装饰器 装饰
# 2. 需要celery 自动检测指定包的任务
from libs.yuntongxun.sms import CCP
from celery_tasks.main import app
@app.task
def celery_send_sms_code(mobie,code):
CCP().send_template_sms(mobie,[code,5],1)
示例demo
from celery_tasks.main import celery_app
from libs.yuntongxun.sms import CCP
import logging
logger = logging.getLogger('django')
# name:异步任务别名
@celery_app.task(name='send_sms_code')
def send_sms_code(mobile, sms_code):
"""
发送短信异步任务
:param mobile: 手机号
:param sms_code: 短信验证码
"""
try:
send_ret = CCP().send_template_sms(mobile, [sms_code, 5], 1)
except Exception as e:
logger.error(e)
4)启动Celery服务
进入虚拟环境,输入命令:$ celery -A celery_tasks.main worker -l INFO
发送短信业务逻辑代码
from celery_tasks.sms.tasks import celery_send_sms_code
# delay 的参数 等同于 任务(函数)的参数
celery_send_sms_code.delay(mobile,sms_code)
示例demo
# 发送短信验证码
# CCP().send_template_sms(mobile,[sms_code, 5], 1)
# Celery异步发送短信验证码
send_sms_code.delay(mobile, sms_code)
安装eventlet模块: $ pip install eventlet
启用 Eventlet 池: $ celery -A celery_tasks.main worker -l info -P eventlet -c 1000