• Flask框架——基于Celery的后台任务


    目录

    Celery

    安装

    创建Celery程序

    对比说明

    Celery配置

    在Flask项目中使用Celery


    上篇文章我们学习了Flask框架——MongoEngine使用MongoDB数据库,这篇文章我们学习Flask框架——基于Celery的后台任务。

    Celery

    在Web开发中,我们经常会遇到一些耗时的操作,例如:上传/下载数据、发送邮件/短信,执行各种任务等等。这时我们可以使用分布式异步消息任务队列去执行这些任务。

    Celery是一款非常简单、灵活、可靠的分布式异步消息队列工具,可以用于处理大量消息、实时数据以及任务调度。

    Celery通过消息机制进行通信,一般使用中间人(Broker)作为客户端和职程(Worker)调节。

    其工作流程如下图所示:

    客户端发送消息任务给中间人(Broker),任务执行单元(Celery Worker)监控中间人中的任务队列,当中间人有消息任务时就分配任务给任务执行单元,任务执行单元在后台运行任务并返回请求。

    注意:Celery可以有多个职程(Worker)和中间人(Broker),用来提高Celery的高可用性以及横向扩展能力。

    Celery优点

    • 简单:上手比较简单,不需要配置文件就可以直接运行;

    • 高可用:如果出现丢失连接或连接失败,职程(Worker)和客户端会自动重试,并且中间人通过 主/主 主/从 的方式来进行提高可用性;

    • 快速:单个 Celery 进行每分钟可以处理数以百万的任务,而且延迟仅为亚毫秒(使用 RabbitMQ、 librabbitmq 在优化过后);

    • 灵活:Celery 的每个部分几乎都可以自定义扩展和单独使用,例如自定义连接池、序列化方式、压缩方式、日志记录方式、任务调度、生产者、消费者、中间人(Broker)等。

    安装

    Celery安装方式很简单,执行如下命令即可:

    pip install celery
    

    这里我们使用redis作为中间人,执行如下代码安装redis:

    pip install redis
    

    创建Celery程序

    对比说明

    (1)不使用Celery执行耗时任务,创建一个名为test.py文件,其示例代码如下:

    1. import time
    2. def add(a,b):     
    3.     time.sleep(5)    #休眠5秒
    4.     return a+b  
    5. if __name__ == '__main__':
    6.     print('开始执行')
    7.     result=add(2,3)    #调用add函数
    8.     print('执行结束')
    9.     print(result)

    运行test.py文件,运行结果如下图:

    (2)使用Celery执行耗时任务,创建一个名为tasks.py文件,示例代码如下:

    1. import time
    2. from celery import Celery
    3. celery = Celery(        #实例化Celery对象
    4.     'tasks',       #当前模块名
    5.     broker='redis://localhost:6379/1',      #使用redis为中间人
    6.     backend='redis://localhost:6379/2'      #结果存储
    7. )
    8. @celery.task()    #使用异步任务装饰器task
    9. def add(a,b):
    10.     time.sleep(5)   #休眠5秒
    11.     return a+b
    12. if __name__ == '__main__':
    13.     print('开始执行')
    14.     result=add.delay(2,3)   #调用add方法并使用delay延时函数 
    15.     print('执行结束')
    16.     print(result)

    实例化Celery对象,其中第一个参数为当前模块名,第二个参数为中间人(Broker)的URL链接,第三个参数为中间人结果放回的存储URL链接,再调用add()方法时,需要使用delay延时函数。

    运行tasks.py文件,运行结果如下图所示:

    当我们运行tasks.py文件时,发现程序一下子就运行结束并返回任务id,

    在终端执行如下代码运行Celery职程(Worker)服务:

    celery -A tasks worker -l info
    

    如下图所示:

    虽然职程已经收到任务并且在分配到子进程运行了,但是发现该任务没有运行结束,这时因为Celery不支持在windows下运行任务,需要借助eventlet来完成,执行如下安装eventlet:

    pip install eventlet 
    

    安装成功后,执行如下代码运行Celery职程(Worker)服务:

    celery -A tasks worker -l info -P eventlet  -c 10
    

    运行结果如下:

    Celery配置

    大多数情况下,使用默认的配置即可满足我们的开发,不需要修改配置,当我们需要修改配置时,可以通过update进行配置,在上面的tasks.py添加如下代码:

    1. celery.conf.update(
    2.     task_serializer='json',
    3.     accept_content=['json'],   
    4.     result_serializer='json',
    5.     timezone='Europe/Oslo',
    6.     enable_utc=True,
    7. )

    其中:

    • accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;

    • task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;

    • result_serializer:结果序列化格式,默认值为json;

    • timezone:配置Celery以使用自定义时区;

    • enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。

    除了上面的配置参数,Celery还提供了很多很多配置参数,大家可以在官方配置文档中查看

    Celery的配置信息比较多,通常情况下,我们会在tasks.py同级目录下为创建Celery的配置文件, 这里命名为celeryconfig.py,示例代码如下:

    1. broker_url = 'redis://localhost:6379/1'
    2. result_backend = 'redis://localhost:6379/2'
    3. task_serializer = 'json'
    4. result_serializer = 'json'
    5. accept_content = ['json']
    6. timezone = 'Europe/Oslo'
    7. enable_utc = True

    执行如下代码加载配置:

    1. import celeryconfig
    2. app.config_from_object('celeryconfig')

    在Flask项目中使用Celery

    首先创建一个名为mycelery.py文件,该文件用来实例化Celery对象,示例代码如下:

    1. from celery import Celery
    2. def make_celery(app):
    3.     celery = Celery(      #实例化Celery
    4.         'tasks',
    5.         broker='redis://localhost:6379/1',      #使用redis为中间人
    6.         backend='redis://localhost:6379/2'      #结果存储
    7.     )
    8.     class ContextTask(celery.Task):    #创建ContextTask类并继承Celery.Task子类
    9.         def __call__(self, *args, **kwargs): 
    10.             with app.app_context():     #和Flask中的app建立关系
    11.                 return self.run(*args, **kwargs) #返回任务
    12.     celery.Task = ContextTask     #异步任务实例化ContextTask
    13.     return celery        #返回celery对象

    首先自定义一个名为make_celery()方法,该方法传入Flask程序中的app,在方法中实例化Celery,并创建一个名为ContextTask类用来和Flask中的app建立关系,最后返回celery。

    创建名为tasks.py文件,该文件用来存放我们的耗时任务,示例代码如下:

    1. import time
    2. from app import celery
    3. @celery.task   #使用异步任务装饰器task
    4. def add(x, y):
    5.     time.sleep(5)  #休眠5秒
    6.     return x + y

    这里我们通过休眠的方式来模拟耗时的下载任务。

    Flask程序app.py文件示例代码如下:

    1. from flask import Flask
    2. import tasks
    3. from mycelery import make_celery
    4. app = Flask(__name__)
    5. celery = make_celery(app)    #调用make_celery方法并传入app使celery和app进行关联
    6. @app.route('/')
    7. def hello():
    8.     tasks.add.delay(1,2)    #调用tasks文件中的add()异步任务方法
    9.     return '请求正在后台处理中,您可以去处理其他事情'
    10. if __name__ == '__main__':
    11.     app.run(debug=True)

    app.py文件很简单,就调用make_celery方法使celery和app进行关联,并在视图函数中使用tasks中的异步任务方法。

    在终端执行如下代码运行Celery职程(Worker)服务:

    celery -A tasks worker -l info -P eventlet  -c 10
    

    启动Flask程序,访问http://127.0.0.1:5000/后在终端查Worker服务,如下图所示:

    这样就成功使用Celery把耗时任务交给后台来处理,避免了不必要的耗时等待(如下载数据任务)。

    当我们不使用Celery时,用户在执行耗时任务时,用户可能要等耗时任务完成后,才能进行其他操作。

    好了,Flask框架——基于Celery的后台任务就讲到这里了,感谢观看,下篇文章继续学习Flask框架的其他知识。

    公众号:白巧克力LIN

    该公众号发布Python、数据库、Linux、Flask、自动化测试、Git等相关文章!

    - END -

  • 相关阅读:
    Git学习笔记(四)远程仓库
    10-134 4-6 查询在具有最小内存容量的所有PC中具有最快处理器的PC制造商
    Python-中北大学人工智能OpenCV人脸识别(根据图片训练数据,根据训练好的数据识别人脸)
    【算法】背包问题应用
    计算机网络基础(三):IPv4编址方式、子网划分、IPv4通信的建立与验证及ICMP协议
    【王道】计算机组成原理第四章指令系统(四)
    本地环境运行Llama 3大型模型:可行性与实践指南
    【DL论文精读笔记】Object Detection in 20 Years: A Survey目标检测综述
    【数据结构与算法】LinkedList与链表
    Go 语言中的反射
  • 原文地址:https://blog.csdn.net/weixin_52122271/article/details/126222770