• 分布式任务队列系统 celery 进阶


    通过前面的入门,我们大概了解了celery的工作原理及简单的入门代码示例(传送门),下面进行一些稍微复杂的任务调度学习

    多目录结构异步执行

    在实际项目中,使用Celery进行异步任务处理时,经常需要将代码组织在多个目录和模块中,以便更好地管理和维护。下面展示如何在多目录结构下使用Celery进行异步任务执行。

    项目结构

    假设我们有以下项目结构:

    在这里插入图片描述

    1. 配置文件 (config/celery_config.py)

    首先,我们需要配置Celery的broker和backend。在这个示例中,我们将配置文件放在 config 目录下。

    broker_url = 'redis://localhost:32769/0'
    result_backend = 'redis://localhost:32769/1'
    
    '
    运行

    2. 定义任务 (app/task1.py 和 app/task2.py)

    接下来,在 app 目录下定义我们的任务函数。

    task1

    import time
    
    from celery_demo.multi_task.app.celery_worker import celery_app
    
    
    @celery_app.task
    def send_email(name):
        print(f"开始给{name}发送邮件")
        time.sleep(1)
        return "done"
    
    
    

    task2

    import time
    
    from celery_demo.multi_task.app.celery_worker import celery_app
    
    
    @celery_app.task
    def send_msg(name):
        print(f"开始给{name}发送短信")
        time.sleep(1)
        return "done"
    
    

    3. 启动Worker (app/celery_worker.py)

    为了能够启动worker来处理任务,我们需要编写一个脚本来启动它。在这个示例中,这个脚本放在 app/worker.py 中。

    注意!:include参数的task路径必须写全,不然各种找不到模块报错

    from celery import Celery
    
    celery_app = Celery('celery_worker', include=['celery_demo.multi_task.app.task1', 'celery_demo.multi_task.app.task2'])
    celery_app.config_from_object('celery_demo.multi_task.config.celery_config')
    
    # 时区
    celery_app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    celery_app.conf.enable_utc = False
    
    # 启动celery,准备好接收消息,一旦接收到消息就执行任务,并存储结果
    if __name__ == '__main__':
        celery_app.worker_main(['worker', '--loglevel=info'])
    
    

    运行命令启动Worker:

    在根目录下:
    celery -A celery_demo.multi_task.app.celery_worker worker --loglevel=info
    

    或者直接运行该脚本:

    /usr/bin/python3 /Users/fangyirui/PycharmProjects/pythonProject/celery_demo/multi_task/app/celery_worker.py 
    

    或者pychram可以右键直接run

    4. 主程序调用异步任务 (main.py)

    最后,在主程序中调用这些异步任务。我们可以通过导入 add.delay() 或者其他方法来发送消息到消息队列。

    from celery_demo.multi_task.app import task1, task2
    
    result1 = task1.send_email.delay('AA')
    result2 = task2.send_msg.delay('BB')
    
    print(result1)
    print(result1.get())
    print(result2)
    print(result2.get())
    
    

    总结

    在上述示例中,通过合理的项目结构,将不同功能模块分离开来,使得代码更加清晰易维护。具体步骤如下:

    1. 配置文件:定义了Celery的broker和backend设置。
    2. 定义任务:创建了包含具体业务逻辑的异步函数,并用@task装饰器标记为可被调度执行的task.
    3. 启动Worker:编写了用于启动celerey worker 的脚本,使其能够从消息队列拉取并执行相应操作.
    4. 主程序调用:通过导入task 模块中的方法,实现对某些操作发起异步请求.
    5. 注意各种模块的导包和引用需要把路径写全,不然就算pycharm不提示报错,运行时也会报错,博主我已经踩了很多坑了!

    定时任务

    代码结构目录

    在这里插入图片描述

    启动beat服务方式

    配置 Celery 应用

    创建一个名为 celery_worker.py 的文件,并配置你的 Celery 应用。

    from datetime import timedelta
    
    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery('celery_worker', broker='redis://localhost:32769/0', backend='redis://localhost:32769/1',
                 include=['task']
                 )
    
    app.conf.beat_schedule = {
        'add-every-seconds': {
            # 指定执行的任务
            'task': 'task.add',
    
            # 单位为秒,每10秒触发一次
            # 'schedule': 10.0,
    
            # corn表达式触发,每一分钟触发一次
            # 'schedule': crontab(minute="*/1"),
    
            # 每年6月5号,16点0分执行
            'schedule': crontab(minute=0, hour=16, day_of_month=5, month_of_year=6),
    
            # 每6秒执行一次
            # 'schedule': timedelta(seconds=6),
    
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'Asia/Shanghai'
    
    

    定义任务

    创建一个名为 task.py 的文件,并定义你的任务。

    from celery_worker import app
    
    
    @app.task
    def add(x, y):
        print(f"执行成功: {x} {y} ,结果为{x + y}")
        return x + y
    
    

    启动 Worker 和 Beat 服务

    启动 worker:

    celery -A celery_worker worker --loglevel=info
    

    启动 beat:

    celery -A celery_worker beat --loglevel=info
    

    这样,Celery Beat 会按照你配置的时间间隔发送 add(16, 16) 的任务到队列中,而 Worker 会从队列中取出并执行这个任务。

    单个任务时指定时间

    send_task.py

    from datetime import datetime
    from datetime import timedelta
    
    from task import add
    
    # # 方式一
    # # 如果是过去的时间,则会立马执行
    # v1 = datetime(2024, 6, 5, 16, 5, 00)
    # print(v1)
    # # 如果不转utc时间,会比预计的晚8个小时才执行
    # v2 = datetime.utcfromtimestamp(v1.timestamp())
    # print(v2)
    # result = add.apply_async(args=[5, 5], eta=v2)
    # print(result.id)
    
    # 方式二
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    
    # 用当前时间延迟10秒执行
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    
    # 使用apply_async并设定时间
    result = add.apply_async(args=[7, 7], eta=task_time)
    print(result.id)
    
    

    zsh: command not found: celery

    问题描述

    在终端执行celery的命令时报错,zsh: command not found: celery

    但执行pip3 show celery查看已经安装过的celery时,发现安装没有问题:

    ➜ pythonProject pip3 show celery
    Name: celery
    Version: 5.4.0
    Summary: Distributed Task Queue.
    Home-page: https://docs.celeryq.dev/
    Author: Ask Solem
    Author-email: auvipy@gmail.com
    License: BSD-3-Clause
    Location: /Users/fangyirui/Library/Python/3.9/lib/python/site-packages
    Requires: billiard, python-dateutil, click-didyoumean, click, click-repl, click-plugins, vine, kombu, tzdata
    Required-by: 
    

    在网上查阅其他解决方案后,问题仍没有解决,那么你很可能跟我一样还没有将 Python 二进制安装目录添加到 PATH 中。

    解决方案:

    在 .zshrc 文件中添加你的python/bin目录: /Users/fangyirui/Library/Python/3.9/bin

    或者执行:

    export PATH=$PATH:/Users/fangyirui/Library/Python/3.9/bin
    
  • 相关阅读:
    排序1---插入排序
    java基于ssm+vue高校人事管理系统
    Linux基本指令(上)
    为什么不建议在MySQL中使用 utf8?
    ORB(Oriented FAST and Rotated BRIEF)
    在线协作文档哪家强?微软 Loop 、Notion、FlowUs
    移动端列表分页,删除交互优化
    UE5 局域网联机,寻找会话失败。
    Linux笔记--vi编辑器
    HTML+CSS详细知识点(下)
  • 原文地址:https://blog.csdn.net/weixin_39743356/article/details/139480091