• celery命令参数详解及在django中的用法


    celery 应用场景

    • 异步任务:异步执行比较耗时的任务,比如发送短信/邮件、消息推送、音视频处理,文件导入导出等等
    • 定时任务:定时执行某件事情,比如每天数据统计
      在这里插入图片描述
    • 消息中间件(broker):Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括, Redis等等
    • 任务执行单元(workers): Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中(本质,一个work就是一个进程)。
    • 任务结果存储(backend):Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括 redis
    • 任务调度器(beat):Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列
      -生产者 (producer):调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

    win中启动work
    pip install eventlet
    celery -A proj worker -l INFO -P eventlet
    linux启动work
    celery -A proj worker -l DEBUG -f ./log/all.log -Q quene -n hostname

    启动beat的命令(负责每隔几秒钟,向任务队列中提交任务)
    celery -A proj beat -l INFO -f ./log/all.log

    常用参数

    -A/--app	            要使用的应用程序实例
    -n/--hostname	        设置自定义主机名
    -Q/--queues	            指定一个消息队列,该进程只接受此队列的任务
    --max-tasks-per-child	配置工作单元子进程在被一个新进程取代之前可以执行的最大任务数量
    --max-memory-per-child	设置工作单元子进程被替换之前可以使用的最大内存
    -l/--loglevel	        定义打印log的等级 DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL
    --autoscale	            池的进程的最大数量和最小数量
    -c/--concurrency	    同时处理任务的工作进程数量,默认值是系统上可用的cpu数量
    -B/--beat	            定义运行celery打周期任务调度程序
    -h/--help	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    celery在django web项目中的用法

    在项目中的配置(settings.py)

    BROKER_URL: 指定以什么方式作为消息代理
    CELERY_RESULT_BACKEND: 结果存储方式
    CELERY_TASK_SERIALIZER: 任务序列化和反序列化方案 如:'json'
    CELERY_RESULT_SERIALIZER: 结果序列化
    CELERY_ACCEPT_CONTENT: 读取数据可接受的 数据格式 如:['json']
    CELERY_TIMEZONE: 指定时区  如:'Asia/Shanghai'
    CELERY_ENABLE_UTC: 是否启动时区设置,默认值是True
    
    from celery.schedules import crontab
    from datetime import timedelta
    
    CELERY_BEAT_SCHEDULE = {
        'task_name': {
            'task': 'celery_task.tasks.test',
            'schedule': crontab(hour='23', minute='59'),
            # 'schedule': timedelta(seconds=2), 秒级定时
            'args': ()  # 传参
        },
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    crontab 例子

    ExampleMeaning
    crontab()Execute every minute.
    crontab(minute=0, hour=0)Execute daily at midnight.
    crontab(minute=0, hour=‘*/3’)Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
    crontab(minute=0,
    hour=‘0,3,6,9,12,15,18,21’)Same as previous.
    crontab(minute=‘*/15’)Execute every 15 minutes.
    crontab(day_of_week=‘sunday’)Execute every minute (!) at Sundays.
    crontab(minute=‘‘, hour=’’, day_of_week=‘sun’)Same as previous.
    crontab(minute=‘*/10’, hour=‘3,17,22’, day_of_week=‘thu,fri’)Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
    crontab(minute=0, hour=‘/2,/3’)Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
    crontab(minute=0, hour=‘*/5’)Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
    crontab(minute=0, hour=‘*/3,8-17’)Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
    crontab(0, 0, day_of_month=‘2’)Execute on the second day of every month.
    crontab(0, 0, day_of_month=‘2-30/2’)Execute on every even numbered day.
    crontab(0, 0, day_of_month=‘1-7,15-21’)Execute on the first and third weeks of the month.
    crontab(0, 0, day_of_month=‘11’, month_of_year=‘5’)Execute on the eleventh of May every year.
    crontab(0, 0, month_of_year=‘*/3’)Execute every day on the first month of every quarter.

    celery.py

    import os
    
    from celery import Celery
    from kombu import Queue, Exchange
    
    from proj import settings
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    
    app = Celery('proj')
    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    # 默认没有指定队列的任务一般是放在了celery这个队列来处理的(历史原因被命名为celery),当然也可以通过配置来修改。
    app.conf.task_default_queue = 'default'
    #全局设置任务的默认exchange
    app.conf.task_default_exchange = 'tasks'
    app.conf.task_default_exchange_type = 'topic'
    #全局设置任务的默认routing_key
    app.conf.task_default_routing_key = 'task.default'
    
    #可以通过指定routing_key更具体的路由不同队列
    app.conf.task_queues = (
        Queue('default', routing_key='tasks.#'),
        Queue('celery',routing_key='tasks.#'),
        )
    #全局设置路由配置,将任务进入到指定队列
    app.conf.task_routes = {
        'app01.tasks.test_task': {
            'queue': 'web_tasks',
            'routing_key': 'default',
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    tasks.py

    from proj.celery import app
    
    @app.task
    def task1():
        # something to do
        pass
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    生产者文件 client.py

    # task1.delay()
    task1.apply_async(args=(), kwargs={}, route_name=None, **options)
    
    apply_async 常用的参数如下:
    1. countdown:指定多少秒后执行任务
    task1.apply_async(args=(2, 3), countdown=5)    # 5 秒后执行任务
    2. eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime
    from datetime import datetime, timedelta
    # 当前 UTC 时间再加 10 秒后执行任务
    task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))
     
    
    3. expires:任务过期时间,参数类型可以是 int,也可以是 datetime
    task1.multiply.apply_async(args=[3, 7], expires=10)    # 10 秒后过期
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    java-net-php-python-jsp音像店租赁录像计算机毕业设计程序
    基于SpringBoot+Vue+uniapp的OA办公系统(源码+lw+部署文档+讲解等)
    NeurIPS 2023 | MQ-Det: 首个支持多模态查询的开放世界目标检测大模型
    【高级网络程序设计】Mid-Term Test
    【Jmeter+Influxdb+Grafana性能监控平台安装与部署】
    【Rust 基础篇】Rust 父trait:扩展和组织trait的继承体系
    基于springboot、logback的日志脱敏组件
    测试人进阶技能:单元测试报告应用指南
    【Spring boot】静态资源、首页及Thymeleaf框架
    P4147 玉蟾宫
  • 原文地址:https://blog.csdn.net/Miss_Audrey/article/details/126060755