简单介绍Celery
安装Celery插件
pip install celery
使用步骤
import time
from celery import Celery
backend = 'redis://@127.0.0.1:6379/1'
broker = 'redis://@127.0.0.1:6379/2'
app = Celery('test', backend=backend, broker=broker)
@app.task
def add(a, b):
time.sleep(2)
print(a + b)
return a + b
from main import add
print('from s1')
res = add.delay(3, 7)
print(res) # 6c3cd997-2726-4ce6-a994-23287c476889 唯一的uuid
命令:
Windows
celery worker -A main -l info -P eventlet # 4.x及之前用这个
celery -A main worker -l info -P eventlet # 5.x及之后用这个
Mac、Linux
celery worker -A main -l info # 4.x及之前用这个
celery -A main worker -l info # 5.x及之后用这个
from main import app
from celery.result import AsyncResult
id = '7bef14a0-f83e-4901-b4d8-e47aaac09b39' # 任务id
if __name__ == '__main__':
res = AsyncResult(id=id, app=app)
if res.successful():
result = res.get() #10
print(result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
写一个celery的包以后再任意项目中想用把包copy进去导入使用即可
目录结构
celery_task # 包
__init__.py
celery.py
user_task.py
home_task.py
add_task.py
get_result.py
使用步骤
新建包celery_task
在包先新建一个 celery.py
在里面写app的初始化
在包里新建user_task.py 编写用户相关任务
在包里新建home_task.py 编写首页相关任务
其它程序,提交任务
启动worker ---》它可以先启动,在提交任务之前-->包所在的目录下
celery -A celery_task worker -l info # # 注意名称错误就会报错
查看任务执行的结果
celery_task/celery.py
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0' # 一定不要忘了include
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])
celery_task/home_task.py
from .celery import app
@app.task
def add(a, b):
time.sleep(3)
print('计算结果是:%s' % (a + b))
return a + b
celery_task/user_task
import time
from .celery import app
@app.task
def send_sms(mobile, code):
time.sleep(1)
print('短信发送成功:%s,验证吗是%s' % (mobile, code))
return True
add_task.py
from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res=send_sms.delay('18723345455','9999')
print(res) # 672237ce-c941-415e-9145-f31f90b94627
# 任务执行,要启动worker
# 查看任务执行的结果
get_resuly.py
# 查询执行完的结果
from celery_task.celery import app
from celery.result import AsyncResult
id = '672237ce-c941-415e-9145-f31f90b94627'
if __name__ == '__main__':
res = AsyncResult(id=id, app=app)
if res.successful():
result = res.get() #7
print(result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
定时任务配置
app.conf.beat_schedule = {
'send_sms_task': {
'task': 'celery_task.user_task.send_sms',
'schedule': timedelta(seconds=5),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': ('1897334444', '7777'),
},
'add_task': {
'task': 'celery_task.home_task.add',
'schedule': crontab(hour=12, minute=10, day_of_week=3), # 每周3十二点十分
'args': (10, 20),
}
}
add_task.py
from datetime import datetime, timedelta
from celery_task.home_task import add
eta = datetime.utcnow() + timedelta(seconds=10)
res = add.apply_async(args=(200, 50), eta=eta)
print(res)
celery -A celery_task worker -l info
celery -A celery_task beat -l info
import os # django中集成celery需要加入
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
import django
django.setup()
from celery_task.user_task import send_sms
def index(request):
mobile = request.GET.get('mobile')
res = send_sms.delay(mobile, '8888')
print(res)
return HttpResponse('以及发送了!!')
user_task.py
import time
from .celery import app
from lib.send_tx_sms import send_sms_by_phone
from user.models import UserInfo
@app.task
def send_sms(mobile, code):
send_sms_by_phone(mobile, code)
user = UserInfo.objects.all().filter(mobile=mobile).first()
print('给%s发送短信,短息发送成功:%s, 验证码是%s' % (user.username, mobile, code))
return True
handleClick() {
this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
if (res.data.code == 100) {
let task_id = res.data.id
this.$message({
message: res.data.msg,
type: 'error'
});
// 起个定时任务,每隔5s向后端查询一下是否秒杀成功
let t = setInterval(() => {
this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(
res => {
if (res.data.code == 100 || res.data.code == 101) { //秒杀结束了,要么成功,要么失败了
alert(res.data.msg)
// 销毁掉定时任务
clearInterval(t)
} else if (res.data.code == 102) {
//什么事都不干
}
}
)
}, 5000)
}
})
}
def seckill(request):
# 提交秒杀任务
res = seckill_task.delay()
return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})
def get_result(request):
task_id = request.GET.get('id')
res = AsyncResult(id=task_id, app=app)
if res.successful():
result = res.get() # 7
return JsonResponse({'code': 100, 'msg': str(result)})
elif res.failed():
print('任务失败')
return JsonResponse({'code': 101, 'msg': '秒杀失败'})
elif res.status == 'PENDING':
print('任务等待中被执行')
return JsonResponse({'code': 102, 'msg': '还在排队'})
轮播图接口添加缓存 提高响应速度 提高并发量(第一次走数据库查询 第二次之以后都走缓存)
from django.core.cache import cache
from utils.response import APIResponse
class BannerView(GenericViewSet, CommonListModelMixin):
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
:settings.common_settings.BANNER_NUM]
serializer_class = BannerSerializer
def list(self, request, *args, **kwargs):
result = cache.get('banner_list')
if result:
print('走了缓存 速度很快!!')
return APIResponse(result=result)
else:
print('走了数据库查询 很慢')
res = super().list(request, *args, **kwargs)
result = res.data.get('result')
cache.set('banner_list', result)
return res
加了缓存,如果mysql数据变了,由于请求的都是缓存的数据,导致mysql和redis的数据不一致
解决方法
celery.py
app.conf.beat_schedule = {
'update_banner': {
'task': 'celery_task.home_task.update_banner',
'schedule': timedelta(seconds=5),
'args': (),
}
}
home_task.py
@app.task
def update_banner():
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:common_settings.BANNER_NUM]
ser = BannerSerializer(instance=queryset, many=True)
print(ser.data)
for item in ser.data:
item['image'] = dev.HOST_URL + item['image']
cache.set('banner_list', ser.data)
return True
整体流程
技术小白记录学习过程,有错误或不解的地方请指出,如果这篇文章对你有所帮助请
点点赞收藏+关注
谢谢支持 !!!