• flask框架添加异步任务处理模型任务


    • 在进行模型部署的时候, 由于后台业务和模型预测过程中, 模型进行批量预测的时候属于耗时任务, 所以在这个时候需要进行异步操作, 当预测完成之后后台在写一个定时任务, 去预测完成的地方获取结果. 这个部分的逻辑代码如下:
    # -*- coding: utf-8 -*-
    # @Time : 2022/7/20 下午5:19
    # @Author : junzai
    # @File : test_cpv_predict.py
    # @Software: PyCharm
    
    import json
    from celery import Celery
    from flask import Flask, jsonify, request
    from celery.result import AsyncResult
    from rule_ner import *
    from cpv_predict import main, load_cls_model, load_entity_model_config, load_entity_model
    
    from conf import save_path, conf_path, cls_pred_model_path, cpv_result_path, oss_up_path, save_dir, data_dir
    
    flask_app = Flask(__name__)
    flask_app.config.update(
       CELERY_BROKER_URL='redis://127.0.0.1:6379/0',
       CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0'
    
    )
    
    celery = Celery(flask_app.name, broker=flask_app.config['CELERY_RESULT_BACKEND'],
                   backend=flask_app.config['CELERY_BROKER_URL'])
    
    redis_uuid_list = []
    
    # 类目预测模型加载
    cls_model = load_cls_model(cls_pred_model_path)
    
    # 实体识别模型加载
    label_vocab, tokenizer, trans_func, batchify_fn = load_entity_model_config(data_dir)
    rank, entity_model = load_entity_model(label_vocab, save_dir)
    
    # 加载规则词典
    r_ner = Rule_NER('./userdict/bcat_3', sep='\002')
    
    
    @celery.task()
    def cpv_server(task_id, oss_download_path):
       print("cpv开始使用模型进行预测")
       json_data = main(task_id, oss_download_path, cls_model, label_vocab, trans_func, batchify_fn, rank, entity_model,
                        r_ner)
       print("cpv处理完成返回结果")
       return json_data
    
    @celery.task()
    def cpv_server(task_id, oss_download_path):
       print("cpv开始使用模型进行预测")
       time.sleep(10) # 模型耗时任务
       print("cpv处理完成返回结果")
       return '处理结果'
    
    @flask_app.route('/ner', methods=['POST'])
    def ner():
       task_id = request.form['task_id']
       oss_download_path = request.form['oss_download_path']
       print("准备执行耗时任务===")
       task = cpv_server.delay(task_id, oss_download_path)
       print('准备返回耗时任务结果')
       print(str(task))
       redis_uuid_list.append(str(task))
       return 'success'
    
    
    @flask_app.route('/result')
    def cpv_result():
       result = {}
       if redis_uuid_list:
           id = redis_uuid_list[0]
           a = AsyncResult(id=id, app=celery)
           if a.successful():
               result = a.get()  # task中return的数据
               data_dict = json.loads(result)
               data_dict['status'] = 0
               data_dict['message'] = '成功'
               result = json.dumps(data_dict)
               redis_uuid_list.pop(0)
               print('任务完成, uuid从列表中删除成功')
               return result
           elif a.failed():
               result['status'] = 1
               result['message'] = '失败'
               return json.dumps(result)
           elif a.status == 'PENDING':
               result['status'] = 2
               result['message'] = '等待执行中'
               return json.dumps(result)
           elif a.status == 'RETRY':
               result['status'] = 3
               result['message'] = '任务异常后正在重试'
               return json.dumps(result)
           elif a.status == 'STARTED':
               result['status'] = 4
               result['message'] = '任务已经开始被执行'
               return json.dumps(result)
       else:
           result['status'] = 5
           result['message'] = '无任务结果'
           return json.dumps(result)
    
    
    if __name__ == '__main__':
       flask_app.run(host='0.0.0.0', port=5000)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 说明: 这个代码完成之后, 在服务启动的时候就需要启动两个服务.第一个服务是这个程序的逻辑服务:** python xxx.py启动, 或者是gunicorn -w 1 -b 0.0.0.0:5000 xxx:flask_app**启动. 这个服务启动完成之后, 会给后台提供实时服务,第二个服务是celery服务: 启动命令: celery -A cpv_celery worker --loglevel=info -P eventlet

    启动问题:

    • 如果gunicorn没有安装, 需要提前安装. 安装命令
    pip install gunicorn
    # 启动程序的命令
    # -w: 代表工作的进程数
    # -b: 代表绑定的ip和port
    # 后年的两个app: 第一个app表示文件的名字, 第二个app代表文件中flask的实力app, 如果文件名和文件中的app和这里的不同, 根据自己的需求进行调整即可.
    gunicorn -w 1 -b 0.0.0.0:5000 app:app
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 如果异步服务启动, 先要安装eventlet和celery两个包
    pip install eventlet
    pip install celery
    
    # 安装完成之后开始启动异步服务
    # 其中: cpv_celery为异步服务的文件的文件名
    celery -A cpv_celery worker --loglevel=info -P eventlet
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 应为在异步使用的过程中, 中间件用的是redis数据库, 所以在启动celery之前需要先安装redis数据库

    • redis数据库的安装

    wget http://download.redis.io/releases/redis-6.0.8.tar.gz
    tar xzf redis-6.0.8.tar.gz
    cd redis-6.0.8
    make
    
    • 1
    • 2
    • 3
    • 4

    执行完 make 命令后,redis-6.0.8 的 src 目录下会出现编译后的 redis 服务程序 redis-server,还有用于测试的客户端程序 redis-cli:

    下面启动 redis 服务

    cd src
    ./redis-server
    
    • 1
    • 2

    注意这种方式启动 redis 使用的是默认配置。也可以通过启动参数告诉 redis 使用指定配置文件使用下面命令启动

    cd src
    ./redis-server ../redis.conf
    
    • 1
    • 2

    redis.conf 是一个默认的配置文件。我们可以根据需要使用自己的配置文件。

    启动 redis 服务进程后,就可以使用测试客户端程序 redis-cli 和 redis 服务交互了。 比如:

    cd src
    ./redis-cli
    redis> set foo bar
    OK
    redis> get foo
    "bar"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在redis工程化启动过程中,一般redis是以守护进行启动, 所以我们可以修改一下配置文件redis.conf

    # 刚安装好的redis 这里默认是no, 将其改成yes, 即可变成守护进行启动
    136 daemonize yes
    
    • 1
    • 2

    然后再进行redis服务的重启即可

    cd src
    ./redis-server ../redis.conf
    
    
    • 1
    • 2
    • 3
  • 相关阅读:
    实体机安装centos7问题总结
    文字转语音朗读如何操作?手把手教你如何将文字转语音
    VBA驱动SAP GUI完成界面元素值初始化
    leetcode 64. 最小路径和
    为什么要前后端分离
    react 学习 —— 16、使用 ref 操作 DOM
    Vue前后端项目开发指南(三)【后端Springboot项目的搭建】
    uniapp 全局置灰、哀悼置灰(可动态、同时支持nvue、vue)插件 Ba-Gray
    【数值计算方法】曲线拟合与插值:Lagrange插值、Newton插值及其python/C实现
    Linux系统使用EPSON的L3255型号打印机遇到的问题解决方法
  • 原文地址:https://blog.csdn.net/junjunzai123/article/details/125910887