• Celery快速使用(定时任务、Django中使用Celery、秒杀逻辑、双写一致性)


    一、Celery快速使用

    简单介绍Celery

    1. Celery官网:http://www.celeryproject.org/
    2. Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform(芹菜是一个资金很少的项目,所以我们不支持微软Windows。 请不要打开任何与该平台相关的问题 )
    3. Celery是独立的服务
      - 可以不依赖任何服务器,通过自身命令,启动服务
      - celery服务为为其他项目服务提供异步解决任务需求的
      - 会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

    安装Celery插件

    pip install celery
    
    • 1

    使用步骤

    1. 新建文件 实例化得到app对象 写函数、任务、注册成Celery的任务
    import time
    from celery import Celery
    
    backend = 'redis://@127.0.0.1:6379/1'
    broker = 'redis://@127.0.0.1:6379/2'
    app = Celery('test', backend=backend, broker=broker)
    
    @app.task
    def add(a, b):
        time.sleep(2)
        print(a + b)
        return a + b
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    1. 新建第二个文件 提交任务>>>>>提交到broker中
    from main import add
    
    print('from s1')
    
    res = add.delay(3, 7)
    print(res)      # 6c3cd997-2726-4ce6-a994-23287c476889 唯一的uuid
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. 启动worker 从broker中获取任务执行 执行完放到backend里面
    命令:
    	Windows
    		celery worker -A main -l info -P eventlet  # 4.x及之前用这个 
            celery -A main worker -l info -P eventlet  # 5.x及之后用这个
    	Mac、Linux
    		celery worker -A main -l info				# 4.x及之前用这个 
    		celery -A main worker -l info				# 5.x及之后用这个
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1. 通过代码查看任务执行结果
    from main import app
    from celery.result import AsyncResult
    id = '7bef14a0-f83e-4901-b4d8-e47aaac09b39'		# 任务id
    if __name__ == '__main__':
        res = AsyncResult(id=id, app=app)
        if res.successful():
            result = res.get()  #10
            print(result)
        elif res.failed():
            print('任务失败')
        elif res.status == 'PENDING':
            print('任务等待中被执行')
        elif res.status == 'RETRY':
            print('任务异常后正在重试')
        elif res.status == 'STARTED':
            print('任务已经开始被执行')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    1. 也可以通过图形化软件查看结果了
      在这里插入图片描述

    二、Celery包结构

    写一个celery的包以后再任意项目中想用把包copy进去导入使用即可

    目录结构

    	celery_task		# 包
    		__init__.py
    		celery.py
    		user_task.py
    		home_task.py
    	add_task.py
    	get_result.py
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    使用步骤

    	新建包celery_task
    	在包先新建一个 celery.py
    	在里面写app的初始化
    	在包里新建user_task.py 编写用户相关任务 
    	在包里新建home_task.py 编写首页相关任务 
    	其它程序,提交任务
    	启动worker ---》它可以先启动,在提交任务之前-->包所在的目录下
    	celery -A celery_task worker -l info			# # 注意名称错误就会报错
    	查看任务执行的结果
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    celery_task/celery.py

    from celery import Celery
    
    backend = 'redis://127.0.0.1:6379/1'
    broker = 'redis://127.0.0.1:6379/0'					# 一定不要忘了include
    app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])	
    
    • 1
    • 2
    • 3
    • 4
    • 5

    celery_task/home_task.py

    from .celery import app
    @app.task
    def add(a, b):
        time.sleep(3)
        print('计算结果是:%s' % (a + b))
        return a + b
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    celery_task/user_task

    import time
    from .celery import app
    @app.task
    def send_sms(mobile, code):
        time.sleep(1)
        print('短信发送成功:%s,验证吗是%s' % (mobile, code))
        return True
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    add_task.py

    from celery_task.user_task import send_sms
    # 提交了一个发送短信异步任务	
    res=send_sms.delay('18723345455','9999')
    print(res)  # 672237ce-c941-415e-9145-f31f90b94627
    
    # 任务执行,要启动worker
    
    # 查看任务执行的结果
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    get_resuly.py

    # 查询执行完的结果
    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '672237ce-c941-415e-9145-f31f90b94627'
    if __name__ == '__main__':
        res = AsyncResult(id=id, app=app)
        if res.successful():
            result = res.get()  #7
            print(result)
        elif res.failed():
            print('任务失败')
        elif res.status == 'PENDING':
            print('任务等待中被执行')
        elif res.status == 'RETRY':
            print('任务异常后正在重试')
        elif res.status == 'STARTED':
            print('任务已经开始被执行')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这里插入图片描述

    三、Celery异步任务 延时任务 定时任务

    定时任务配置

    app.conf.beat_schedule = {
           'send_sms_task': {
               'task': 'celery_task.user_task.send_sms',
               'schedule': timedelta(seconds=5),
               # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
               'args': ('1897334444', '7777'),
           },
           'add_task': {
               'task': 'celery_task.home_task.add',
               'schedule': crontab(hour=12, minute=10, day_of_week=3),  # 每周3十二点十分
               'args': (10, 20),
           }
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    add_task.py

    from datetime import datetime, timedelta
    from celery_task.home_task import add
    
    eta = datetime.utcnow() + timedelta(seconds=10)
    res = add.apply_async(args=(200, 50), eta=eta)
    print(res)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. 运行项目提价任务
    2. 启动worker执行任务
    celery -A celery_task worker -l info
    
    • 1
    1. 启动beat定时任务
    celery -A celery_task beat -l info
    
    • 1

    在这里插入图片描述
    在这里插入图片描述

    四、Django中使用Celery

    1. 将我们写好的复制到项目路径下
    2. 在包内celery.py中添加代码
    import os       # django中集成celery需要加入
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
    import django
    django.setup()
    
    • 1
    • 2
    • 3
    • 4
    1. 在django的视图类中,导入,提交任务
    from celery_task.user_task import send_sms
    
    def index(request):
        mobile = request.GET.get('mobile')
        res = send_sms.delay(mobile, '8888')
        print(res)
        return HttpResponse('以及发送了!!')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    user_task.py

    import time
    from .celery import app
    from lib.send_tx_sms import send_sms_by_phone
    from user.models import UserInfo
    
    @app.task
    def send_sms(mobile, code):
        send_sms_by_phone(mobile, code)
        user = UserInfo.objects.all().filter(mobile=mobile).first()
        print('给%s发送短信,短息发送成功:%s, 验证码是%s' % (user.username, mobile, code))
        return True
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    1. 启动worker beat (这个时候访问视图就会发送短信了 并发量高)

    在这里插入图片描述

    五、秒杀逻辑

    1. 前端编写秒杀按钮
    2. 事件:向后端秒杀接口发送请求,发送完立马起了一个定时任务,每个5s,向后端查看一下是否秒杀成功,如果秒杀没成功,定时任务继续执行,如果秒杀成功了,清空定时任务,弹窗告诉他
    handleClick() {
          this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
            if (res.data.code == 100) {
              let task_id = res.data.id
              this.$message({
                message: res.data.msg,
                type: 'error'
              });
              // 起个定时任务,每隔5s向后端查询一下是否秒杀成功
              let t = setInterval(() => {
                this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(
                    res => {
                      if (res.data.code == 100 || res.data.code == 101) {  //秒杀结束了,要么成功,要么失败了
                        alert(res.data.msg)
                        // 销毁掉定时任务
                        clearInterval(t)
                      } else if (res.data.code == 102) {
                        //什么事都不干
                      }
                    }
                )
              }, 5000)
            }
          })
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    1. 后端秒杀接口 提交秒杀任务
     def seckill(request):
         # 提交秒杀任务
         res = seckill_task.delay()
         return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})
    
    • 1
    • 2
    • 3
    • 4
    1. 查询是否秒杀成功的接口 根据用户传入的id,查询任务是否成功
    def get_result(request):
                task_id = request.GET.get('id')
                res = AsyncResult(id=task_id, app=app)
                if res.successful():
                    result = res.get()  # 7
                    return JsonResponse({'code': 100, 'msg': str(result)})
                elif res.failed():
                    print('任务失败')
                    return JsonResponse({'code': 101, 'msg': '秒杀失败'})
                elif res.status == 'PENDING':
                    print('任务等待中被执行')
                    return JsonResponse({'code': 102, 'msg': '还在排队'})
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    六、双写一致性

    1)路飞项目接口加缓存

    轮播图接口添加缓存 提高响应速度 提高并发量(第一次走数据库查询 第二次之以后都走缓存)

    from django.core.cache import cache
    from utils.response import APIResponse
    
    class BannerView(GenericViewSet, CommonListModelMixin):
        queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
                   :settings.common_settings.BANNER_NUM]
        serializer_class = BannerSerializer
    
        def list(self, request, *args, **kwargs):
            result = cache.get('banner_list')
            if result:
                print('走了缓存 速度很快!!')
                return APIResponse(result=result)
            else:
                print('走了数据库查询 很慢')
                res = super().list(request, *args, **kwargs)
                result = res.data.get('result')
                cache.set('banner_list', result)
                return res
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这里插入图片描述

    加了缓存,如果mysql数据变了,由于请求的都是缓存的数据,导致mysql和redis的数据不一致
    解决方法

    1. 修改mysql数据库,删除缓存 【缓存的修改是在后】
    2. 修改数据库,修改缓存 【缓存的修改是在后】
    3. 定时更新缓存 —》针对于实时性不是很高的接口适合定时更新

    2)Celery定时任务实现双写一致性

    celery.py

    app.conf.beat_schedule = {
        'update_banner': {
            'task': 'celery_task.home_task.update_banner',
            'schedule': timedelta(seconds=5),
            'args': (),
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    home_task.py

    @app.task
    def update_banner():
        queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:common_settings.BANNER_NUM]
        ser = BannerSerializer(instance=queryset, many=True)
        print(ser.data)
        for item in ser.data:
            item['image'] = dev.HOST_URL + item['image']
        cache.set('banner_list', ser.data)
        return True
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    整体流程

    1. 启动Django
    2. 启动worker
    3. 启动beat
      在这里插入图片描述

    技术小白记录学习过程,有错误或不解的地方请指出,如果这篇文章对你有所帮助请点点赞收藏+关注谢谢支持 !!!

  • 相关阅读:
    小程序的 web-view 组件:实现点击跳转外部链接的高级技巧
    四十五、ORM相关
    二叉树和堆
    Deepfake!黑客冒充非洲联盟主席与多位欧洲领导人通话
    C语言初学者工具选择:vscode + MSYS2 + cmake 搭建 C环境
    【BurpSuite】插件开发学习之J2EEScan(上)-被动扫描
    【Web】https 与 http 的区别
    Linux下对PC/SC智能卡接口编程
    1483. 树节点的第 K 个祖先 折半/倍增
    大数据发展史
  • 原文地址:https://blog.csdn.net/MeiJin_/article/details/127890078