• (二)Celery: 分布式异步消息任务队列



    1. Celery 介绍

    Celery 的分布式表明多个客户端可同时访问;它的异步性体现在:Celery 可以将生产完毕的任务放入消息队列,待消费端空闲后可自行去取,而不必阻塞生产端的运行。

    Celery 本身不提供消息队列的功能,通常通过中间件(broker)实现,常用中间件有 RabbitMQ 和 Redis 等。同时为了便于查找任务执行过程中的状态,还需要存储器来保存每次任务的执行结果,可以使用各类数据库以及 Redis 等。


    2. 基本使用

    结合上一节的内容,首先定义一个 Celery 对象,中间件和存储(backend)均使用 Redis。

    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    celery = Celery(main=__name__, broker=broker, backend=backend)
    
    • 1
    • 2
    • 3

    初始化 Celery 对象后,通常使用装饰器指定可被 Celery 调度的任务。

    @celery.task()
    def add(x, y):
    	return x + y
    
    • 1
    • 2
    • 3

    然后,通过命令行启动 Celery 服务。

    celery worker -A tasks -l info
    
    • 1

    其中,-A 用于指定 Celery 实例、worker 启动 Celery 服务、-l 指定日志等级。

    $ celery -A main worker -l info
     
     -------------- celery@${machinename} v5.2.7 (dawn-chorus)
    --- ***** ----- 
    -- ******* ---- Linux-5.15.0-41-generic-x86_64-with-debian-bullseye-sid 2022-07-31 10:38:07
    - *** --- * --- 
    - ** ---------- [config]
    - ** ---------- .> app:         main:0x7f9ab8843f90
    - ** ---------- .> transport:   redis://127.0.0.1:6379/1
    - ** ---------- .> results:     redis://127.0.0.1:6379/2
    - *** --- * --- .> concurrency: 16 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
                    
    
    [tasks]
      . main.add
    
    [2022-07-31 10:38:08,160: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
    [2022-07-31 10:38:08,161: INFO/MainProcess] mingle: searching for neighbors
    [2022-07-31 10:38:09,168: INFO/MainProcess] mingle: all alone
    [2022-07-31 10:38:09,177: INFO/MainProcess] celery@cn0014010240l ready.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    借由 Celery 提供的分布式功能,当有多个应用程序调用任务时,为了保证调用不出错,Celery 可指定队列名来确定应用程序操作指定任务。在上述启动 Celery 时,选项 [queues] 表明默认使用的队列是 celery,在启动时也可通过 -Q 命令指定监听的队列。

    在输出的 [tasks] 选项可看到当前可调用任务为 main.add。然后在应用程序中通过 delay 或 apply_async 调用指定任务。

    >>> from main import add
    >>> r = add.delay(2, 6)
    >>> r.ready()	# 判断当前任务是否执行完毕
    True
    >>> r.get()	# 获取任务执行结果
    8		
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    其他的,可使用 r.state 或 r.status 获取任务状态。


    3. 总结

    本文介绍了 Celery 的安装和基本使用方法,官方文档:https://docs.celeryq.dev/en/stable/index.html.


  • 相关阅读:
    SLAM从入门到精通(rviz自定义形状)
    《每日一题》NO.43:如何使用CG(clock gating) cell?
    神经网络怎么解决过拟合,解决神经网络过拟合
    vue3 html2canvas 网络图片 空白 及使用
    Ant Design Pro of Vue 构建打包后使用 Nginx 发布,API 请求报 404 错
    flutter单独页面修改状态栏不生效问题
    openGauss数据库的安装部署
    【可视化分析案例】用python分析B站Top100排行榜数据
    Ubuntu下QT操作Mysql数据库
    JAVA注解总结
  • 原文地址:https://blog.csdn.net/Skies_/article/details/126082021