• Celery基本语法


    Celery

    1. 定义

    Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统

    它是一个专注于实时处理的任务队列,同时也支持任务调度

    中文官网:http://docs.jinkan.org/docs/celery/

    在线安装 sudo pip3 install -U Celery

    离线安装

    tar xvfz celery-0.0.0.tar.gz
    cd celery-0.0.0
    python3 setup.py build
    python3 setup.py install
    
    • 1
    • 2
    • 3
    • 4

    名词解释:

    broker - 消息传输的中间件,生产者一旦有消息发送,将发至broker;【RQ,redis】

    backend - 用于存储消息/任务结果,如果需要跟踪和查询任务状态,则需添加要配置相关

    worker - 工作者 - 消费/执行broker中消息/任务的进程

    2. 使用Celery

    2.1 创建woker

    #创建 tasks.py 文件
    
    from celery import Celery
    #初始化celery, 指定broker
    app = Celery('chaogege', broker='redis://:@127.0.0.1:6379/1')
    
    # 如果redis有密码,可添加password
    # app = Celery('chaogege', broker='redis://:password@127.0.0.1:6379/1')
    
    # 创建任务函数
    @app.task
    def task_test():
        print("task is running....") 
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    #Ubuntu 终端中, tasks.py文件同级目录下 执行
    celery -A tasks worker --loglevel=info
    #执行后终端显示如下,证明成功!
    
    • 1
    • 2
    • 3

    2.2 创建生产者 - 推送任务

    ​ 在tasks.py文件的同级目录进入 ipython3 执行 如下代码

    from tasks import task_test
    task_test.delay()
    #执行后,worker终端中现如如下
    
    • 1
    • 2
    • 3

    2.3 存储执行结果

    ​ Celery提供存储任务执行结果的方案,需借助 redis 或 mysql 或Memcached 等

    ​ 详情可见 http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result

    #创建 tasks_result.py
    from celery import Celery
    app = Celery('demo',
                 broker='redis://@127.0.0.1:6379/1',
                 backend='redis://@127.0.0.1:6379/2',
                 )
    
    # 创建任务函数
    @app.task
    def task_test(a, b):
        print("task is running")
        return a + b
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    tasks_result.py 同级目录终端中-启动celery worker

    celery -A tasks_result worker --loglevel=info
    
    • 1

    在相同目录下 打开终端创建生产者 - 同【上步】;执行成功后,可调用如下方法取得执行结果

    from tasks_result import task_test
    s = task_test.delay(10,100)
    
    • 1
    • 2

    3. Django + Celery

    1,创建项目+应用

    #常规命令
    django-admin startproject test_celery
    cd test_celery
    python3 manage.py startapp user
    # 并在settings.py中添加应用
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2,创建celery.py

    在settings.py同级目录下 创建 celery.py文件

    文件内容如下:

    from celery import Celery
    from django.conf import settings
    import os
    
    
    # 为celery设置环境变量
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test_celery.settings')
    
    # 创建应用
    app = Celery("test_celery", broker='redis://@127.0.0.1:6379/1')
    
    # 设置app自动加载任务
    app.autodiscover_tasks(settings.INSTALLED_APPS)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3, 在应用模块【user目录下】创建tasks.py文件

    文件内容如下:

    from test_celery.celery import app
    import time
    
    @app.task
    def task_test():
        print("task begin....")
        time.sleep(10)
        print("task over....")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4, 应用视图编写;内容如下:

    from django.http import HttpResponse
    from .tasks import task_test
    import datetime
    
    def test_celery(request):
        task_test.delay()
    	now = datetime.datetime.now()
        html = "return at %s"%(now.strftime('%H:%M:%S'))
        return HttpResponse(html)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    5, 设置路由

    # test_celery.urls
    from django.urls import path, include
    
    urlpatterns = [
        path('admin/', admin.site.urlcs),
        path('v1/users/', include('user.urls')),
    ]
    
    # user.urls
    from django.urls import path
    from . import views
    
    urlpatterns = [
        path('test_celery', views.test_celery),
    ]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    6, 启动django python3 manage.py runserver

    7, 创建 celery worker

    ​ 在项目路径下,即test_celery 下 执行如下

    celery -A test_celery worker -l info
    
    • 1

    8,浏览器中执行对应url

    4. 生产环境 启动

    4.1 并发模式切换

    ​ 默认并发采用 - prefork

    ​ 推荐采用 - gevent 模式 - 协程模式

    celery -A proj worker -P gevent -c 1000
    # P POOL Pool implementation: 支持 perfork or eventlet or gevent
    # C CONCURRENCY 并发数
    
    • 1
    • 2
    • 3

    4.2 后台启动命令

    nohup celery -A proj worker -P gevent -c 1000 > celery.log 2>&1 &
    
    #1,nohup: 忽略所有挂断(SIGHUP)信号
    #2,标准输入是文件描述符0。它是命令的输入,缺省是键盘,也可以是文件或其他命令的输出。
    #标准输出是文件描述符1。它是命令的输出,缺省是屏幕,也可以是文件。
    #标准错误是文件描述符2。这是命令错误的输出,缺省是屏幕,同样也可以是文件。
    #3,&符号:代表将命令在后台执行
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    外星人Alienware m16R1 原厂Windows11系统 oem系统
    苹果全球销量超越小米重回第二,荣耀回归国内手机市场第一梯队
    基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持本地图片上传与回显的功能实现(二)
    二手车价格预测 | 构建AI模型并部署Web应用 ⛵
    【UT】如何进行单元测试
    人工智能基础_机器学习040_Sigmoid函数详解_单位阶跃函数与对数几率函数_伯努利分布---人工智能工作笔记0080
    架构师-软件工程习题选择题
    【Java基础】方法
    Opencv 4.5.5 linux contrib编译
    SpringBoot使用Nacos作为配置中心服务
  • 原文地址:https://blog.csdn.net/weixin_45185267/article/details/126261469