Celery 的分布式表明多个客户端可同时访问;它的异步性体现在:Celery 可以将生产完毕的任务放入消息队列,待消费端空闲后可自行去取,而不必阻塞生产端的运行。
Celery 本身不提供消息队列的功能,通常通过中间件(broker)实现,常用中间件有 RabbitMQ 和 Redis 等。同时为了便于查找任务执行过程中的状态,还需要存储器来保存每次任务的执行结果,可以使用各类数据库以及 Redis 等。
结合上一节的内容,首先定义一个 Celery 对象,中间件和存储(backend)均使用 Redis。
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
celery = Celery(main=__name__, broker=broker, backend=backend)
初始化 Celery 对象后,通常使用装饰器指定可被 Celery 调度的任务。
@celery.task()
def add(x, y):
return x + y
然后,通过命令行启动 Celery 服务。
celery worker -A tasks -l info
其中,-A 用于指定 Celery 实例、worker 启动 Celery 服务、-l 指定日志等级。
$ celery -A main worker -l info
-------------- celery@${machinename} v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.15.0-41-generic-x86_64-with-debian-bullseye-sid 2022-07-31 10:38:07
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: main:0x7f9ab8843f90
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. main.add
[2022-07-31 10:38:08,160: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2022-07-31 10:38:08,161: INFO/MainProcess] mingle: searching for neighbors
[2022-07-31 10:38:09,168: INFO/MainProcess] mingle: all alone
[2022-07-31 10:38:09,177: INFO/MainProcess] celery@cn0014010240l ready.
借由 Celery 提供的分布式功能,当有多个应用程序调用任务时,为了保证调用不出错,Celery 可指定队列名来确定应用程序操作指定任务。在上述启动 Celery 时,选项 [queues] 表明默认使用的队列是 celery,在启动时也可通过 -Q 命令指定监听的队列。
在输出的 [tasks] 选项可看到当前可调用任务为 main.add。然后在应用程序中通过 delay 或 apply_async 调用指定任务。
>>> from main import add
>>> r = add.delay(2, 6)
>>> r.ready() # 判断当前任务是否执行完毕
True
>>> r.get() # 获取任务执行结果
8
其他的,可使用 r.state 或 r.status 获取任务状态。
本文介绍了 Celery 的安装和基本使用方法,官方文档:https://docs.celeryq.dev/en/stable/index.html.