• Flask框架配置Celery-[2]:将前端上传的文件,通过异步任务保存,异步处理上传的文件


    一、项目大致目录

    主要介绍celery的配置,flask相关配置就默认大家都会了。

    flask-object

            |--apps

                    |--user

                            views.py

                            __init__.py

            |--celery_task

                    __init__.py

                    asycn_task.py

                    celery.py        

                    celeryconfig.py

                    check_task.py

                    scheduler_task.py

            app.py

    依赖包:

    1. celery==4.4.7
    2. eventlet==0.33.3
    3. Flask==2.1.3
    4. Flask-Caching==1.10.1
    5. Flask-Cors==3.0.10
    6. Flask-Migrate==2.7.0
    7. Flask-RESTful==0.3.9
    8. Flask-SocketIO==5.1.1
    9. Flask-SQLAlchemy==2.5.1
    10. PyMySQL==1.0.2
    11. redis==3.5.3
    12. SQLAlchemy==1.4.0
    13. Werkzeug==2.0.2

    二、celery项目配置

    1、celery相关配置:celeryconfig.py

    celery默认使用json作为序列化工具,要操作flask上传的文件,需要改为pickle

    1. from celery.schedules import crontab
    2. from datetime import timedelta
    3. '''
    4. 参数解析:
    5. accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
    6. task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
    7. result_serializer:结果序列化格式,默认值为json;
    8. timezone:配置Celery以使用自定义时区;
    9. enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
    10. result_expires: 异步任务结果存活时长
    11. beat_schedule:设置定时任务
    12. '''
    13. #手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
    14. task_module = [
    15. 'celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法
    16. 'celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
    17. ]
    18. #celery的配置
    19. config = {
    20. "broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
    21. "result_backend" : 'redis://127.0.0.1:6379/1',
    22. "task_serializer" : 'pickle', #json格式支持的数据格式比较少,改为pickle
    23. "result_serializer" : 'pickle', #json格式支持的数据格式比较少,改为pickle
    24. "accept_content" : ['pickle'], #json格式支持的数据格式比较少,改为pickle
    25. "timezone" : 'Asia/Shanghai',
    26. "enable_utc" : False,
    27. "result_expires" : 1*60*60,
    28. "beat_schedule" : { #定时任务配置
    29. # 名字随意命名
    30. 'add-func-30-seconds': {
    31. # 执行add_task下的addy函数
    32. 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
    33. # 每10秒执行一次
    34. 'schedule': timedelta(seconds=30),
    35. # add函数传递的参数
    36. 'args': (10, 21)
    37. },
    38. # 名字随意起
    39. 'add-func-5-minutes': {
    40. 'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func
    41. # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
    42. 'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
    43. 'args': (19, 22) # 定时任务需要的参数
    44. },
    45. # 缓存用户数据到cache中
    46. 'cache-user-func': {
    47. 'task': 'celery_task.scheduler_task.cache_user_func',
    48. # 导入任务函数:from celery_task.scheduler_task import cache_user_func
    49. 'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中
    50. }
    51. }
    52. }

    2、创建celery对象:celery.py

    1. from celery import Celery,Task
    2. from .celeryconfig import config,task_module
    3. import sys
    4. import os
    5. '1、把flask项目路径添加到系统环境变量中'
    6. project_path = os.path.dirname(os.path.dirname(__file__))
    7. sys.path.append(project_path)
    8. '''
    9. 2、创建celery应用对象
    10. 'task'可以任务是该celery对象名字,用于区分celery对象
    11. broker是指定消息中间件
    12. backend是指定任务结果存储位置
    13. include是手动指定异步任务所在的模块的位置
    14. '''
    15. #创建celery异步对象
    16. celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
    17. #导入一些基本配置
    18. celery.conf.update(**config)
    19. '3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
    20. class ContextTask(celery.Task):
    21. def __call__(self, *args, **kwargs):
    22. from apps import create_app
    23. app = create_app()
    24. with app.app_context():
    25. return self.run(*args, **kwargs)
    26. celery.Task = ContextTask

      3、异步任务模块:async_task.py

    1. import time
    2. # 导入celery对象app
    3. from celery_task.celery import celery
    4. import os
    5. #导入发送邮件的模块
    6. from base.email_ import send_email
    7. from ext import cache
    8. '''
    9. 所有异步任务:
    10. 1、没有返回值的,@app.task(ignore_result=True)
    11. 2、有返回值的任务,@app.task 默认就是(ignore_result=False)
    12. '''
    13. # 没有返回值,禁用掉结果后端
    14. @celery.task
    15. def send_email_task(receiver_email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作
    16. '''
    17. :param email: 接收消息的邮箱,用户的邮箱
    18. :return:
    19. '''
    20. # 启用线程发送邮件,此处最好加线程池
    21. ret = send_email(
    22. receiver_email=receiver_email,
    23. subject='登录验证码',
    24. message=f'您的验证码是:{code}',
    25. )
    26. return {'result':ret,receiver_email:code}
    27. @celery.task
    28. def cache_user_task():
    29. from apps.user.models import UserModel
    30. user = UserModel.query.all()
    31. lis = []
    32. for u in user:
    33. id = u.id
    34. name = u.name
    35. dic = {'id':id,'name':name}
    36. lis.append(dic)
    37. print(dic)
    38. cache.set('all-user-data',lis)
    39. return {'code':200,'msg':'查询数据成功'}
    40. @celery.task
    41. def test_check_fun():
    42. print('耗时开始:30秒')
    43. time.sleep(30)
    44. print('耗时结束...')
    45. return {'result':'异步任务执行后返回值','time':30}
    46. @celery.task(ignore_result=True)
    47. def save_file_task(file_bytes,file_path,delete_file_path=None):
    48. '''
    49. file_bytes: flask调用使用传递的文件字节流
    50. file_path: 文件保存的绝对路径
    51. 1、对与路径的拼接和文件名重复的逻辑,通通在视图函数中处理
    52. 2、在这里,只负责将文件保存到系统中,不管其他的逻辑
    53. '''
    54. #1、查看文件所在路径是否存在,不存在就创建
    55. file_path_dir = os.path.dirname(file_path)
    56. if not os.path.exists(file_path_dir):
    57. os.makedirs(file_path_dir)
    58. #2、保存文件
    59. with open(file_path, 'wb+') as f:
    60. f.write(file_bytes)
    61. #3、删除旧文件
    62. if delete_file_path:
    63. #想头像图片,一个用户只保存一个头像文件就可以了,删除之前旧的头像文件
    64. try:
    65. os.remove(delete_file_path)
    66. except Exception as _:
    67. print('删除失败')
    68. pass

    4、定时任务模块:scheduler_task.py

    1. from celery_task.celery import celery
    2. import time
    3. '''
    4. 所有定时任务
    5. '''
    6. # 有返回值,返回值可以从结果后端中获取
    7. @celery.task
    8. def add_func(a, b):
    9. print('执行了加法函数',a+b)
    10. return a + b
    11. # 不需要返回值,禁用掉结果后端
    12. @celery.task(ignore_result=True)
    13. def cache_user_func():
    14. print('all')

    5、校验任务是否成功:check_task.py

    1. from celery.result import AsyncResult
    2. from celery_task.celery import celery
    3. '''验证任务的执行状态的'''
    4. def check_task_status(task_id):
    5. '''
    6. 任务的执行状态:
    7. PENDING :等待执行
    8. STARTED :开始执行
    9. RETRY :重新尝试执行
    10. SUCCESS :执行成功
    11. FAILURE :执行失败
    12. :param task_id:
    13. :return:
    14. '''
    15. result = AsyncResult(id=task_id, app=celery)
    16. dic = {
    17. 'type': result.status,
    18. 'msg': '',
    19. 'data': None,
    20. 'code': 400
    21. }
    22. if result.status == 'PENDING':
    23. dic['msg'] = '任务等待中'
    24. elif result.status == 'STARTED':
    25. dic['msg'] = '任务开始执行'
    26. elif result.status == 'RETRY':
    27. dic['msg'] = '任务重新尝试执行'
    28. elif result.status == 'FAILURE':
    29. dic['msg'] = '任务执行失败了'
    30. elif result.status == 'SUCCESS':
    31. result = result.get()
    32. dic['msg'] = '任务执行成功'
    33. dic['data'] = result
    34. dic['code'] = 200
    35. # result.forget() # 将结果删除
    36. # async.revoke(terminate=True) # 无论现在是什么时候,都要终止
    37. # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    38. return dic

    三、flask中的视图函数

    写一个视图函数,接收用户上传的头像图片,将该图片保存到系统中,将保存的操作放到异步任务中。

    1. class UserOneResource(Resource):
    2. def put(self,id):
    3. '''
    4. 用户更新头像的接口
    5. '''
    6. file = request.files.get('picture')
    7. if not file:
    8. return NewResponse(error='请携带头像文件')
    9. user = models.UserModel.query.filter_by(id=id).first()
    10. if not user:
    11. return NewResponse(error='用户不存在')
    12. # print(file.filename,'=============')
    13. _,suffix = file.filename.split('.')
    14. #1、使用uuid创建文件名
    15. file_name = str(uuid.uuid4())+f'-{id}'+'.'+suffix
    16. file_path = os.path.join(STATIC_PATH,'picture',file_name).replace('\\','/')
    17. #2、读取出旧的头像路径
    18. delete_file_path = None
    19. if user.picture:
    20. picture = user.picture
    21. if picture[0] in ['\\','/']:
    22. picture = picture[1:]
    23. delete_file_path = os.path.join(BASE_PATH,picture).replace('\\','/')
    24. print(delete_file_path)
    25. #3、读取文件字节码
    26. file_bytes = file.read()
    27. #4、调用异步保存文件任务
    28. res = save_file_task.delay(file_bytes,file_path,delete_file_path)
    29. task_id = res.id
    30. #5、将头像新路径保存到数据库中
    31. picture = file_path.split('static/')[-1]
    32. picture = 'static/' + picture.replace('\\','/')
    33. user.picture = picture
    34. db.session.add(user)
    35. db.session.commit()
    36. return NewResponse(data={'task_id':task_id},msg='操作成功')

    四、启动项目

    1、启动celery
    windows系统需要借助:eventlet
    
        celery -A celery_task.celery worker -l info  -P  eventlet
    
    linux系统:
    
        celery -A celery_task.celery worker -l info
    
    
    2、启动定时任务
    
        celery -A celery_task beat -l info
  • 相关阅读:
    springcloudalibaba架构(23):RocketMQ普通消息和顺序消息
    mybatisplus查询特定字段空指针异常
    35. 搜索插入位置、Leetcode的Python实现
    Android使用Zxing库生成PDF417扫描后多一个字符A
    数据宝荣获“2021-2022年度最具影响力数字化转型服务商”称号
    Android View自定义参数declare-styleable介绍与使用
    基于JAVA流行病调查平台计算机毕业设计源码+系统+mysql数据库+lw文档+部署
    无代码开发添加数据入门教程
    512 - Spreadsheet Tracking (UVA)
    python 列表去重的5种方式
  • 原文地址:https://blog.csdn.net/weixin_46371752/article/details/133855063