Celery usage scenarios

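Starting a worker on Windows (the default prefork pool does not run reliably on Windows, so install eventlet and use it as the pool):
pip install eventlet
celery -A proj worker -l INFO -P eventlet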
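Starting a worker on Linux:
celery -A proj worker -l DEBUG -f ./log/all.log -Q queue_name -n hostname
Starting beat (the scheduler that submits periodic tasks to the task queue according to the configured schedule, e.g. every few seconds):
celery -A proj beat -l INFO -f ./log/all.log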
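Common options
-A/--app the application instance to use
-n/--hostname set a custom node name for the worker
-Q/--queues comma-separated list of queues; the worker consumes tasks only from these queues
-f/--logfile path of the log file (stderr is used if none is given)
-P/--pool pool implementation, e.g. prefork (the default), eventlet, gevent, solo
--max-tasks-per-child maximum number of tasks a pool child process may execute before it is replaced by a new process
--max-memory-per-child maximum resident memory (in KiB) a pool child process may use before it is replaced by a new process
-l/--loglevel log level: DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL
--autoscale maximum and minimum number of pool processes, given as max,min (e.g. --autoscale=10,3)
-c/--concurrency number of worker processes handling tasks concurrently; defaults to the number of CPUs available on the machine
-B/--beat also run the celery beat periodic task scheduler inside this worker (only one beat instance should run at a time)
-h/--help show help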
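To see how the options above combine, here is a minimal sketch of a hypothetical helper, `build_worker_cmd` (not part of Celery), that assembles the worker command line as an argument list that could be handed to `subprocess.run`. It covers only a few of the listed options, and the Windows branch assumes eventlet has been installed as described earlier.

```python
import shlex
import sys
from typing import List, Optional


def build_worker_cmd(
    app: str,
    loglevel: str = "INFO",
    queues: Optional[List[str]] = None,
    hostname: Optional[str] = None,
    logfile: Optional[str] = None,
    concurrency: Optional[int] = None,
    platform: str = sys.platform,
) -> List[str]:
    """Hypothetical helper: build a `celery worker` argument list."""
    cmd = ["celery", "-A", app, "worker", "-l", loglevel]
    if platform == "win32":
        # Assumption: on Windows use the eventlet pool instead of prefork
        cmd += ["-P", "eventlet"]
    if logfile:
        cmd += ["-f", logfile]
    if queues:
        # -Q takes a comma-separated list of queue names
        cmd += ["-Q", ",".join(queues)]
    if hostname:
        cmd += ["-n", hostname]
    if concurrency:
        cmd += ["-c", str(concurrency)]
    return cmd


cmd = build_worker_cmd(
    "proj", "DEBUG", queues=["default"], hostname="worker1@%h",
    logfile="./log/all.log", platform="linux",
)
print(shlex.join(cmd))
# celery -A proj worker -l DEBUG -f ./log/all.log -Q default -n worker1@%h
```

Building a list rather than a single string avoids shell-quoting problems when the command is launched from Python.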
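Using Celery in a Django web project
Configuration in the project (settings.py). Because celery.py loads the settings with namespace='CELERY', Celery only reads settings whose names start with CELERY_:
CELERY_BROKER_URL: the message broker to use (e.g. a Redis or RabbitMQ URL)
CELERY_RESULT_BACKEND: where task results are stored
CELERY_TASK_SERIALIZER: serialization format for task messages, e.g. 'json'
CELERY_RESULT_SERIALIZER: serialization format for task results
CELERY_ACCEPT_CONTENT: list of content types the worker accepts, e.g. ['json']
CELERY_TIMEZONE: the time zone to use, e.g. 'Asia/Shanghai'
CELERY_ENABLE_UTC: whether dates and times in messages are converted to UTC; defaults to True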
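The namespace rule can be illustrated with a small sketch. `celery_key` and `celery_config` are hypothetical helpers, not Celery functions; they only imitate how the `CELERY_` prefix is stripped and the rest lowercased, so `CELERY_TASK_SERIALIZER` becomes `task_serializer` while an unprefixed `BROKER_URL` is ignored. The Redis URLs are assumed values for illustration.

```python
from typing import Dict, Optional

# Assumed example values from settings.py (a local Redis as broker and backend)
DJANGO_SETTINGS = {
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_RESULT_BACKEND": "redis://localhost:6379/1",
    "CELERY_TASK_SERIALIZER": "json",
    "CELERY_RESULT_SERIALIZER": "json",
    "CELERY_ACCEPT_CONTENT": ["json"],
    "CELERY_TIMEZONE": "Asia/Shanghai",
    "CELERY_ENABLE_UTC": True,
    "BROKER_URL": "redis://localhost:6379/0",  # no prefix: skipped below
}


def celery_key(name: str, namespace: str = "CELERY") -> Optional[str]:
    """Map a Django setting name to a lowercase Celery name, or None if unprefixed."""
    prefix = namespace + "_"
    if not name.startswith(prefix):
        return None
    return name[len(prefix):].lower()


def celery_config(settings: Dict[str, object], namespace: str = "CELERY") -> Dict[str, object]:
    """Keep only the prefixed settings, keyed by their Celery names."""
    config = {}
    for name, value in settings.items():
        key = celery_key(name, namespace)
        if key is not None:
            config[key] = value
    return config


config = celery_config(DJANGO_SETTINGS)
print(config["broker_url"], config["task_serializer"])  # redis://localhost:6379/0 json
```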
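Periodic tasks run by beat are also defined in settings.py:
from celery.schedules import crontab
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    'task_name': {
        'task': 'celery_task.tasks.test',
        'schedule': crontab(hour='23', minute='59'),  # every day at 23:59
        # 'schedule': timedelta(seconds=2),  # every 2 seconds
        'args': (),  # positional arguments passed to the task
    },
}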
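When `crontab(hour='23', minute='59')` fires next can be worked out with plain `datetime` arithmetic. `next_daily_run` is a hypothetical helper, not Celery code; it assumes naive datetimes already expressed in the configured `CELERY_TIMEZONE` and ignores daylight-saving transitions.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Return the first time strictly after `now` that falls on hour:minute."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has passed (or is right now), so the next run is tomorrow
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run(datetime(2024, 1, 1, 12, 0), 23, 59))       # 2024-01-01 23:59:00
print(next_daily_run(datetime(2024, 1, 1, 23, 59, 30), 23, 59))  # 2024-01-02 23:59:00
```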
crontab examples
| Example | Meaning |
|---|---|
| crontab() | Execute every minute. |
| crontab(minute=0, hour=0) | Execute daily at midnight. |
| crontab(minute=0, hour='*/3') | Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
| crontab(minute=0, hour='0,3,6,9,12,15,18,21') | Same as previous. |
| crontab(minute='*/15') | Execute every 15 minutes. |
| crontab(day_of_week='sunday') | Execute every minute (!) on Sundays. |
| crontab(minute='*', hour='*', day_of_week='sun') | Same as previous. |
| crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') | Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays. |
| crontab(minute=0, hour='*/2,*/3') | Execute every even hour, and every hour divisible by three. This means: at every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm. |
| crontab(minute=0, hour='*/5') | Execute every hour divisible by 5. This means it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of "15", which is divisible by 5). |
| crontab(minute=0, hour='*/3,8-17') | Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
| crontab(0, 0, day_of_month='2') | Execute on the second day of every month. |
| crontab(0, 0, day_of_month='2-30/2') | Execute on every even-numbered day. |
| crontab(0, 0, day_of_month='1-7,15-21') | Execute on the first and third weeks of the month. |
| crontab(0, 0, day_of_month='11', month_of_year='5') | Execute on the eleventh of May every year. |
| crontab(0, 0, month_of_year='*/3') | Execute every day in the first month of every quarter. |
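The field syntax in the table can be checked with a small sketch. `expand_field` is a hypothetical helper (Celery uses its own parser) that expands one numeric field such as `'*/5'` or `'*/3,8-17'` into the set of values it matches, given the field's lowest and highest value; it does not handle day names such as `'sun'`.

```python
from typing import Set


def expand_field(spec: str, lo: int, hi: int) -> Set[int]:
    """Expand '*', '*/n', 'a', 'a-b', 'a-b/n' and comma lists into matching values."""
    values: Set[int] = set()
    for part in spec.split(","):
        # Split off an optional '/step' suffix
        if "/" in part:
            base, step_text = part.split("/", 1)
            step = int(step_text)
        else:
            base, step = part, 1
        # Work out which range this part covers
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            first, last = base.split("-", 1)
            start, end = int(first), int(last)
        else:
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return values


print(sorted(expand_field("*/5", 0, 23)))  # [0, 5, 10, 15, 20]
print(sorted(expand_field("*/3", 1, 12)))  # [1, 4, 7, 10]
```

The first result shows why `hour='*/5'` fires at 15:00 (3pm) but not at 17:00, and the second why `month_of_year='*/3'` selects the first month of each quarter.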
celery.py
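import os

from celery import Celery
from kombu import Queue

# Point Celery at the Django settings module before the app reads its configuration
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
# Load every Django setting whose name starts with CELERY_
app.config_from_object('django.conf:settings', namespace='CELERY')
# Look for a tasks.py module in every app listed in INSTALLED_APPS
app.autodiscover_tasks()

# Tasks without an explicit queue go to a queue named "celery" by default (the name is historical); this can be changed in the configuration.
app.conf.task_default_queue = 'default'
# Global default exchange for tasks
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
# Global default routing key; keep it consistent with the binding of the default queue below
app.conf.task_default_routing_key = 'task.default'
# Bind each queue with its own routing-key pattern so tasks can be routed to different queues
# (queues sharing the same pattern would each receive a copy of every matching message)
app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('web_tasks', routing_key='web.#'),
)
# Global routing table: send specific tasks to specific queues
app.conf.task_routes = {
    'app01.tasks.test_task': {
        'queue': 'web_tasks',
        'routing_key': 'web.test_task',
    },
}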
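With a topic exchange, `*` in a binding matches exactly one dot-separated word and `#` matches zero or more words. A routing key therefore has to agree with a queue's binding word by word: `task.default` matches `task.#` but not `tasks.#`, and if two queues share a binding, each gets its own copy of a matching message. The sketch below imitates that matching rule in plain Python; `topic_matches` and `route` are hypothetical helpers, not kombu or broker APIs.

```python
from typing import Dict, List


def topic_matches(binding: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP-style topic binding pattern."""
    pattern = binding.split(".")
    words = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(pattern):
            # Pattern used up: it is a match only if the key is used up too
            return j == len(words)
        token = pattern[i]
        if token == "#":
            # '#' may swallow zero or more words
            return any(match(i + 1, k) for k in range(j, len(words) + 1))
        if j == len(words):
            return False
        if token == "*" or token == words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def route(routing_key: str, bindings: Dict[str, str]) -> List[str]:
    """Return every queue whose binding pattern matches routing_key."""
    return [queue for queue, pattern in bindings.items() if topic_matches(pattern, routing_key)]


print(route("task.default", {"default": "task.#", "web_tasks": "web.#"}))  # ['default']
print(route("task.default", {"default": "tasks.#"}))                       # []
print(route("tasks.x", {"default": "tasks.#", "celery": "tasks.#"}))       # ['default', 'celery']
```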
tasks.py
from proj.celery import app


@app.task
def task1():
    # something to do
    pass
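Producer file client.py
from app01.tasks import task1  # adjust the import path to wherever tasks.py lives
# task1.delay(*args, **kwargs) is a shortcut for task1.apply_async(args, kwargs)
task1.apply_async(args=(), kwargs={})  # execution options (countdown, eta, expires, ...) are extra keyword arguments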
Common apply_async options:
1. countdown: number of seconds to wait before the task is executed
task1.apply_async(countdown=5)  # execute the task 5 seconds from now
2. eta (estimated time of arrival): the earliest time at which the task may run, given as a datetime
from datetime import datetime, timedelta, timezone
# execute the task 10 seconds after the current UTC time
task1.apply_async(eta=datetime.now(timezone.utc) + timedelta(seconds=10))
3. expires: the task's expiry time, either an int (seconds from now) or a datetime
task1.apply_async(expires=10)  # the task expires 10 seconds from now
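`countdown` amounts to an `eta` of "now plus that many seconds", and an integer `expires` is likewise counted from now. The sketch below models that conversion and the decision a worker then makes; `resolve_times` and `decide` are hypothetical helpers, a simplified model rather than Celery code. In Celery itself, a worker that receives a task that has already expired marks it as revoked instead of running it.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple, Union


def resolve_times(
    now: datetime,
    countdown: Optional[float] = None,
    expires: Union[int, float, datetime, None] = None,
) -> Tuple[Optional[datetime], Optional[datetime]]:
    """Convert relative countdown/expires values into absolute datetimes."""
    eta = None
    if countdown is not None:
        # countdown=N is treated as eta = now + N seconds
        eta = now + timedelta(seconds=countdown)
    if isinstance(expires, (int, float)):
        # A numeric expires is also a number of seconds from now
        expires = now + timedelta(seconds=expires)
    return eta, expires


def decide(at: datetime, eta: Optional[datetime], expires: Optional[datetime]) -> str:
    """Simplified outcome for a message looked at by a worker at time `at`."""
    if expires is not None and at >= expires:
        return "expired"  # too late: not executed
    if eta is not None and at < eta:
        return "wait"  # not due yet
    return "run"


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
eta, expires_at = resolve_times(now, countdown=5, expires=10)
for offset in (3, 6, 11):
    print(offset, decide(now + timedelta(seconds=offset), eta, expires_at))
# 3 wait
# 6 run
# 11 expired
```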