• python下celery的基本使用


    1.基本介绍

    Celery 是由Python 编写的简单,灵活,可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。Celery 专注于实时任务处理,支持任务调度

    简单的说,它就是一个分布式队列的管理工具,用celery提供的接口快速实现并管理一个分布式的任务队列。

    有一点我们需要搞清楚,Celery 本身并不是任务队列,它是一个分布式队列的管理工具,Celery封装好了操作常见任务队列的各种操作,比如说从监听某个任务队列并从该队列中拿到数据进行消费。

    2.使用场景

    它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

    • 异步任务: 将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
    • 定时任务: 定时执行某件事情,比如每天数据统计

    3.工作流程和组成部分

    这里用一张图片说明下:
    在这里插入图片描述

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括RabbitMQ, Redis等等,官方推荐用rabbitMQ,因为它持久稳定。

    任务执行单元

    WorkerCelery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis

    另外, Celery还支持不同的并发和序列化的手段。

    • 并发:Prefork, Eventlet, gevent, threads/single threaded
    • 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
    先安装模块
    pip install celery
    pip install redis
    

    4.Celery执行异步任务

    4.1 基础使用

    这里项目结构如下:

    在这里插入图片描述

    第一步:先创建celery相关配置配置celery_object.py

    import celery
    
    # 执行如下命令: celery -A celery_object worker -l info
    
    backend = "redis://127.0.0.1:6379/4"  # 设置redis的4号数据库来存放结果
    broker = "redis://127.0.0.1:6379/5"  # 设置redis的5号数据库存放消息中间件
    celery_app = celery.Celery(
        "celery_demo",
        backend=backend,
        broker=broker,
        include=[
            "celery_task",
        ],
    )
    
    celery_app.conf.task_serializer = "json"
    celery_app.conf.result_serializer = "json"
    celery_app.conf.accept_content = ["json"]
    
    celery_app.conf.timezone = "Asia/Shanghai"  # 时区
    celery_app.conf.enable_utc = False  # 是否使用UTC
    

    参数说明:

    • backend 就是异步任务执行完成以后,结果存放的地方。
    • broker 就是具体执行任务的工作节点。
    • celery.Celery()方法是实例化一个celery对象。
    • include需要把celery所有的task都写入到列表中。

    第二步:创建任务相关的文件celery_task.py

    import time
    
    from celery_object import celery_app
    
    @celery_app.task
    def send_email(name):
        print("向%s发送邮件..." % name)
        time.sleep(5)
        print("向%s发送邮件完成" % name)
        return f"成功拿到{name}发送的邮件!"
    
    @celery_app.task
    def send_msg(name):
        print("向%s发送短信..." % name)
        time.sleep(5)
        print("向%s发送短信完成" % name)
        return f"成功拿到{name}发送的短信!"
    

    通过@celery_app.task这样的装饰器,成功的把对应的函数变成对应celery的异步worker函数。

    紧接着我们在项目当前所在的目录执行命令:

    celery -A celery_object worker -l info
    
    • -A 指的是application应用对象
    • celery_object 就是脚本的名字(ps:celery对目录-路径-名字要求很高
    • worker 就是工作人(固定写法)
    • -l 指的是日志级别,这里是打印info级别的日志

    之后就可以有下面的输出显示就代表celery启动成功:

    在这里插入图片描述
    之后我们就可以向celery生产任务了,创建produce_result.py文件。

    from celery_task import send_email, send_msg
    
    if __name__ == "__main__":
        for i in range(10):
            result = send_email.delay(f"张三{i}")
            print(result.id)
            result2 = send_msg.delay(f"李四{i}")
            print(result2.id)
    

    运行生产任务的程序,会看到如下的数据,这里打印的就是任务ID。

    在这里插入图片描述

    然后在终端可以看到下面的东西,就代表celery成功的拿到队列中任务 并进行消费了。

    在这里插入图片描述
    然后打开我们的redis可以看到有对应的数据记录。

    在这里插入图片描述

    与此同时 我们还可以查看celery任务ID的状态check_result.py写入如下:

    from celery.result import AsyncResult
    from celery_object import celery_app
    
    async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app)
    print("任务状态:", async_result.status)
    if async_result.successful():
        result = async_result.get()
        print(result)
        # result.forget() # 将结果删除
    elif async_result.failed():
        print("执行失败")
    elif async_result.status == "PENDING":
        print("任务等待中被执行")
    elif async_result.status == "RETRY":
        print("任务异常后正在重试")
    elif async_result.status == "STARTED":
        print("任务已经开始被执行")
    

    运行结果:

    任务状态: SUCCESS
    成功拿到李四0发送的短信!
    

    4.2 指定任务队列

    Celery 默认使用名为 celery 的队列来存放任务,网上找到一种这样的写法

    # 定义任务队列.
    Queue(‘default’, routing_key=“task.#”),
    
    # 路由键 以 “task.” 开头的消息都进入 default 队列.
    Queue(‘web_tasks’, routing_key=“web.#”)
    
    # 路由键 以 “web.” 开头的消息都进入 web_tasks 队列.)
    CELERY_DEFAULT_EXCHANGE = ‘tasks’
    
    # 默认的交换机名字为
    tasksCELERY_DEFAULT_EXCHANGE_KEY = ‘topic’
    
    # 默认的交换机类型为
    topicCELERY_DEFAULT_ROUTING_KEY = ‘task.default’
    
    # 默认的路由键是 task.default , 这个路由键符合上面的 default 队列.
    CELERY_ROUTES = { ‘proj.tasks.add’: { ‘queue’: ‘web_tasks’, ‘routing_key’: ‘web.add’, }}
    
    # 使用指定队列的方式启动消费者进程.$ celery -A proj worker -Q web_tasks -l info
    
    # 该 worker 只会执行 web_tasks 中任务, 我们可以合理安排消费者数量, 让 web_tasks 中任务的优先级更高.
    
    

    但是感觉不太优雅,这里博主推荐另外一种写法。

    目录结构稍微做了点调整:

    请添加图片描述
    创建celery_object.py文件,写入如下:

    import celery
    
    backend = "redis://127.0.0.1:6379/4"  # 设置redis的4号数据库来存放结果
    broker = "amqp://guest:guest@127.0.0.1:5672"  # 设置mq当做boker中间人消息件
    celery_app = celery.Celery(
        "celery_demo",
        backend=backend,
        broker=broker,
        include=[
            "celery_email_task",
            "celery_msg_task",
        ],
    )
    
    celery_app.conf.task_serializer = "json"
    celery_app.conf.result_serializer = "json"
    celery_app.conf.accept_content = ["json"]
    
    celery_app.conf.timezone = "Asia/Shanghai"  # 时区
    celery_app.conf.enable_utc = False  # 是否使用UTC
    
    celery_app.conf.ONCE = {
        "backend": "celery_once.backends.Redis",
        "settings": {"url": "redis://localhost:6379/8", "default_timeout": 60 * 60},
    }
    
    
    celery_app.conf.task_routes = {
        "send_email": "send_email_queue",
        "send_msg": "send_msg_queue",
    }
    # 启动执行命令: celery -A celery_object worker -l info -Q send_email_queue
    # 启动执行命令: celery -A celery_object worker -l info -Q send_msg_queue
    

    对之前celery的配置文件做了稍微的调整,增加了一个ONCE属性(必要的),又增加task_routes属性(必要的),该属性配置会针对不同的woker选择不同的任务队列。

    然后是celery_email_task.py,写入如下:

    import time
    from celery_once import QueueOnce
    
    from celery_object import celery_app
    
    class SendEmailClass(QueueOnce):
        name = "send_email"
        once = {"graceful": True}
        ignore_result = True
    
        def __init__(self, *args, **kwargs):
            super(SendEmailClass, self).__init__(*args, **kwargs)
    
        def run(self, name):
            print("向%s发送邮件..." % name)
            time.sleep(5)
            print("向%s发送邮件完成" % name)
            return f"成功拿到{name}发送的邮件!"
    
    
    send_email = celery_app.register_task(SendEmailClass())
    

    celery_msg_task.py写入如下:

    import time
    from celery_once import QueueOnce
    
    from celery_object import celery_app
    
    
    class SendEmailClass(QueueOnce):
        name = "send_email"
        once = {"graceful": True}
        ignore_result = True
    
        def __init__(self, *args, **kwargs):
            super(SendEmailClass, self).__init__(*args, **kwargs)
    
        def run(self, name):
            print("向%s发送邮件..." % name)
            time.sleep(5)
            print("向%s发送邮件完成" % name)
            return f"成功拿到{name}发送的邮件!"
    
    
    send_email = celery_app.register_task(SendEmailClass())
    

    参数解释:

    • register_task()是把对应的函数或类装饰成celery对象,效果同@celery_app.task。
    • QueueOnce类 是继承自celery_once父类的子类,自定义类对象和方法必须继承该类,并且该类默认会执行run()方法,支持传参数,我这里是name

    Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base

    @task(base=QueueOnce, once={'graceful': True}) 
    

    后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

    接着创建produce_task.py,写入如下代码:

    from celery_object import celery_app
    
    if __name__ == "__main__":
        for i in range(10):
    
            result3 = celery_app.send_task("send_email", args=[f"王五{i}"])
            print(result3.id)
    

    send_task()方法会向指定的woker发送任务,支持传参数args,也可以指定具体的队列queue,效果同delay()方法类似。

    然后运行命令:

    celery -A celery_object worker -l info -Q send_email_queue
    
    • -Q 参数 是监控并消费指定的任务队列

    看到如下的输出就代表celery启动成功

    请添加图片描述
    运行produce_task.py,开始生产任务:

    请添加图片描述

    打开终端可以看到celery从指定的send_email_queue队列消费数据

    请添加图片描述

    再看看mq是否成功的创建队列
    请添加图片描述
    ok,通过我们这波操作,成功的实现celery监控指定的队列并消费数据。

  • 相关阅读:
    4WRBA6EB15-2X/G24N9Z4/M比例换向阀控制器
    工厂需要什么样的现场管理能力?
    【JVM】为什么静态内部类实现单例模式是线程安全?
    路由守卫的参数to,from,next是什么?怎么用?
    C++11 ——— 类的新功能
    使用Leaflet对WMS做空间几何范围查询
    C# 类class、继承、多态性、运算符重载,相关练习题
    《自控力》读后感
    【Python】环境管理Pipenv
    Base64解码
  • 原文地址:https://blog.csdn.net/weixin_38819889/article/details/127113250