• Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置


    一、概述

    1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。

    2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的

    3、使用celery是需要使用到中间件的,简单点就使用redis做中间件

    注意:

    在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。

    使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。

    二、项目结构

    依赖环境:

    1. celery==4.4.7
    2. eventlet==0.33.3
    3. Flask==2.1.3
    4. Flask-Caching==1.10.1
    5. Flask-Cors==3.0.10
    6. Flask-Migrate==2.7.0
    7. Flask-RESTful==0.3.9
    8. Flask-SocketIO==5.1.1
    9. Flask-SQLAlchemy==2.5.1
    10. PyMySQL==1.0.2
    11. redis==3.5.3
    12. SQLAlchemy==1.4.0
    13. Werkzeug==2.0.2

    目录结构:

    flask-project

            |--apps

                    |-- user

                            |-- models

                            |--views.py

                            |--urls.py

                    |--__init__.py

            |--ext

                    |--__init__.py

                    |--config.py

            |--celery_task

                    |--__init__.py

                    |--async_task.py

                    |--celery.py

                    |--celeryconfig.py

                    |--check_task.py

                    |--scheduler_task.py

            app.py

    三、flask工厂模式下各模块功能

    1、apps/user/models.py : 写了一个user表

    2、apps/user/views.py:写了测试调用celery异步任务的接口

    3、apps/user/urls.py: 注册路由的

    4、ext/__init__.py:cache、db、cors的拓展

    5、ext/config.py : cache和cors使用到的配置

    6、apps/__init__.py: 一个函数create_app,生成flask应用对象

    7、app.py: 启动flask应用对象的模块

    本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。

    在视图中在执行异步任务,并获取异步任务的id:

    1. from celery_task.async_task import send_email_task,cache_user_task
    2. #用户资源:get\put\delete, 对单个进行操作
    3. class UserOneResource(ResourceBase):
    4. def put(self,id):
    5. #测试异步发邮件
    6. email = request.args.get('email')
    7. code = request.args.get('code')
    8. res = send_email_task.delay(email,code)
    9. print(res.id)
    10. return NewResponse(msg='put',data={'task_id':res.id})
    11. def patch(self,id):
    12. #测试异步操作flask的orm和cache
    13. p = request.args.get('p')
    14. if p=='set':
    15. res = cache_user_task.delay()
    16. print(res,type(res))
    17. return NewResponse(msg='patch',data={'task_id':res.id})
    18. else:
    19. from ext import cache
    20. data = cache.get('all-user-data')
    21. return NewResponse(msg='patch',data=data)

    res = 异步函数.delay(函数需要的参数)

    task_id = res.id

    注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。

    四、celery项目的配置

    1、celery的配置

    将celery的配置都放到一个py文件中,方便后期的维护和使用

    celeryconfig.py

    1. from celery.schedules import crontab
    2. from datetime import timedelta
    3. '''
    4. 参数解析:
    5. accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
    6. task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
    7. result_serializer:结果序列化格式,默认值为json;
    8. timezone:配置Celery以使用自定义时区;
    9. enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
    10. result_expires: 异步任务结果存活时长
    11. beat_schedule:设置定时任务
    12. '''
    13. #手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
    14. task_module = [
    15. 'celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法
    16. 'celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
    17. ]
    18. #celery的配置
    19. config = {
    20. "broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
    21. "result_backend" : 'redis://127.0.0.1:6379/1',
    22. "task_serializer" : 'json',
    23. "result_serializer" : 'json',
    24. "accept_content" : ['json'],
    25. "timezone" : 'Asia/Shanghai',
    26. "enable_utc" : False,
    27. "result_expires" : 1*60*60,
    28. "beat_schedule" : { #定时任务配置
    29. # 名字随意命名
    30. 'add-func-30-seconds': {
    31. # 执行add_task下的addy函数
    32. 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
    33. # 每10秒执行一次
    34. 'schedule': timedelta(seconds=30),
    35. # add函数传递的参数
    36. 'args': (10, 21)
    37. },
    38. # 名字随意起
    39. 'add-func-5-minutes': {
    40. 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
    41. # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
    42. 'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
    43. 'args': (19, 22) # 定时任务需要的参数
    44. },
    45. # 缓存用户数据到cache中
    46. 'cache-user-func': {
    47. 'task': 'celery_task.scheduler_task.cache_user_func',
    48. # 导入任务函数:from celery_task.scheduler_task import cache_user_func
    49. 'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中
    50. }
    51. }
    52. }

    2、创建celery对象

    celery.py

    1. from celery import Celery,Task
    2. from .celeryconfig import config,task_module
    3. import sys
    4. import os
    5. '1、把flask项目路径添加到系统环境变量中'
    6. project_path = os.path.dirname(os.path.dirname(__file__))
    7. sys.path.append(project_path)
    8. '''
    9. 2、创建celery应用对象
    10. 'task'可以任务是该celery对象名字,用于区分celery对象
    11. broker是指定消息中间件
    12. backend是指定任务结果存储位置
    13. include是手动指定异步任务所在的模块的位置
    14. '''
    15. #创建celery异步对象
    16. celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
    17. #导入一些基本配置
    18. celery.conf.update(**config)
    19. '3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
    20. class ContextTask(celery.Task):
    21. def __call__(self, *args, **kwargs):
    22. from apps import create_app
    23. app = create_app()
    24. with app.app_context():
    25. return self.run(*args, **kwargs)
    26. celery.Task = ContextTask

    注意:

    1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。

    2、第二步是创建celery通用的方法了,没什么好说的。

    3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)

    3、异步任务模块

    将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。

    async_task.py

    1. # 导入celery对象app
    2. from celery_task.celery import celery
    3. from ext import cache
    4. import time
    5. '''
    6. 1、没有返回值的,@app.task(ignore_result=True)
    7. 2、有返回值的任务,@app.task 默认就是(ignore_result=False)
    8. '''
    9. # 没有返回值,禁用掉结果后端
    10. @celery.task
    11. def send_email_task(receiver_email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作
    12. '''
    13. :param email: 接收消息的邮箱,用户的邮箱
    14. :return:
    15. '''
    16. # 模拟邮件发送验证码
    17. time.sleep(5)
    18. return {'result':'邮件已经发送',receiver_email:'2356'}
    19. @celery.task
    20. def cache_user_task():
    21. #orm查询数据,放到cache中
    22. from apps.user.models import UserModel
    23. user = UserModel.query.all()
    24. lis = []
    25. for u in user:
    26. id = u.id
    27. name = u.name
    28. dic = {'id':id,'name':name}
    29. lis.append(dic)
    30. print(dic)
    31. cache.set('all-user-data',lis)
    32. return {'code':200,'msg':'查询数据成功'}

    4、定时任务模块

    将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。

    schedulser_task.py

    1. from celery_task.celery import celery
    2. import time
    3. # 有返回值,返回值可以从结果后端中获取
    4. @celery.task
    5. def add_func(a, b):
    6. print('执行了加法函数',a+b)
    7. return a + b
    8. # 不需要返回值,禁用掉结果后端
    9. @celery.task(ignore_result=True)
    10. def cache_user_func():
    11. print('all')

    5、检测任务id获取任务状态和返回值

    check_task.py:

    1. from celery.result import AsyncResult
    2. from celery_task.celery import celery
    3. '''验证任务的执行状态的'''
    4. def check_task_status(task_id):
    5. '''
    6. 任务的执行状态:
    7. PENDING :等待执行
    8. STARTED :开始执行
    9. RETRY :重新尝试执行
    10. SUCCESS :执行成功
    11. FAILURE :执行失败
    12. :param task_id:
    13. :return:
    14. '''
    15. result = AsyncResult(id=task_id, app=celery)
    16. dic = {
    17. 'type': result.status,
    18. 'msg': '',
    19. 'data': None,
    20. 'code': 400
    21. }
    22. if result.status == 'PENDING':
    23. dic['msg'] = '任务等待中'
    24. elif result.status == 'STARTED':
    25. dic['msg'] = '任务开始执行'
    26. elif result.status == 'RETRY':
    27. dic['msg'] = '任务重新尝试执行'
    28. elif result.status == 'FAILURE':
    29. dic['msg'] = '任务执行失败了'
    30. elif result.status == 'SUCCESS':
    31. result = result.get()
    32. dic['msg'] = '任务执行成功'
    33. dic['data'] = result
    34. dic['code'] = 200
    35. # result.forget() # 将结果删除
    36. # async.revoke(terminate=True) # 无论现在是什么时候,都要终止
    37. # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    38. return dic

    在视图函数中调用该方法,通过task_id ,返回任务的运行结果。

    五、测试

    1、运行项目

    flask项目(在项目根目录下执行):

            flask run --host 0.0.0.0 --port 5000

    celery项目(在项目根目录下执行):

    启动celery进程:

    windows系统:

            celery -A celery_task.celery worker -l info  -P  eventlet

    linux系统:

            celery -A celery_task.celery worker -l info 

    启动定时任务(先启动celery进程在启动定时任务):

    celery -A celery_task.celery beat -l info

    2、运行结果

    1、执行异步任务中,将orm数据存到cache中

    2、执行定时任务了

    六、注意事项

    1、在系统中要先安装好redis和mysql,并都启动了

    2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。

    3、当前的配置下,celery的目录必须是在flask根目录下

    七、拓展-改变celery_task的位置

    如果你想将celery_task包移动到apps包下,此时你需要修改什么?

    1、apps/celery_task/celery.py:将flask项目根目录加载到系统环境变量中的路径有变

    1. '1、把flask项目路径添加到系统环境变量中'
    2. project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

    2、apps/celery_task/celeryconfig.py: 注册异步任务的模块,定时任务的模块的位置变化

    1. '1、加上apps.'
    2. task_module = [
    3. 'apps.celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法
    4. 'apps.celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
    5. ]
    6. '2、task参数对应的字符串,加上apps.'
    7. config = {
    8. "broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
    9. "result_backend" : 'redis://127.0.0.1:6379/1',
    10. "task_serializer" : 'json',
    11. "result_serializer" : 'json',
    12. "accept_content" : ['json'],
    13. "timezone" : 'Asia/Shanghai',
    14. "enable_utc" : False,
    15. "result_expires" : 1*60*60,
    16. "beat_schedule" : { #定时任务配置
    17. # 名字随意命名
    18. 'add-func-30-seconds': {
    19. # 执行add_task下的addy函数
    20. 'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
    21. # 每10秒执行一次
    22. 'schedule': timedelta(seconds=30),
    23. # add函数传递的参数
    24. 'args': (10, 21)
    25. },
    26. # 名字随意起
    27. 'add-func-5-minutes': {
    28. 'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
    29. # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
    30. 'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
    31. 'args': (19, 22) # 定时任务需要的参数
    32. },
    33. # 缓存用户数据到cache中
    34. 'cache-user-func': {
    35. 'task': 'apps.celery_task.scheduler_task.cache_user_func',
    36. # 导入任务函数:from celery_task.scheduler_task import cache_user_func
    37. 'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中
    38. }
    39. }
    40. }

    3、在视图函数导入异步任务的路径也变了

    1. #异步任务
    2. from apps.celery_task.async_task import send_email_task,cache_user_task

    4、启动celery和定时任务的命令变量【在项目根目录下执行命令】

    启动celery:

    windows启动命令: celery  -A  apps.celery_task.celery worker -l info  -P  eventlet

    linux启动命令: celery  -A  apps.celery_task.celery worker -l info 

    启动定时任务:

    celery -A apps.celery_task beat -l info

  • 相关阅读:
    Python---列表 集合 字典 推导式(本文以 列表 为主)
    mybatis之主键返回
    记一次 .NET 某医保平台 CPU 爆高分析
    使用C#实现服务端与客户端的简陋聊天
    Express 6 指南 - 路由 6.1 简介 & 6.2 路由方法
    基于腾讯地图实现精准定位,实现微信小程序考勤打卡功能
    华为云 CodeArts Snap 智能编程助手 PyCharm 插件安装与使用指南
    ESP8266-Arduino编程实例-PCF8591数据采集驱动
    FastDFS 存储原理
    String、StringBuffer以及StringBuilder的比较
  • 原文地址:https://blog.csdn.net/weixin_46371752/article/details/133790591