• aisr接入指引


    aisr接入指引

    包括自动读配置,建立redis队列,mq对象,还有redis队列的异步多进程处理.

    1. 导入依赖
    pip install aisrfwk==1.0.4 -i https://pypi.org/simple
    如果是本地已经引用旧版本,需要使用更新到指定版本
    pip3 install --upgrade aisrfwk==1.0.4 -i https://pypi.org/simple
    
    • 1
    • 2
    • 3
    2. 添加配置

    根据环境配置.文件名固定server-{env}.conf格式

    env即为运行启动命令后的第一个参数. 例如: python mian.py sit

    配置文件会根据env加载对应的配置文件,环境支持,dev,uat,prod,默认为local

    ###====配置文件模板====####
    
    # 系统主题,关系到错误提示.邮件主题等
    system.name="地库排布系统"
    
    # 环境 配置项
    # env=local
    # server 配置项
    server.port=15555
    
    # redis 配置项
    redis.host=10.220.37.1
    redis.port=16379
    redis.passwd=***
    redis.db=7
    # 任务队列key
    redis.task_queue_key=task_queue
    # 任务状态
    redis.task_status_key=task_status
    # 任务请求数据
    redis.task_data_key=task_data
    # 任务过程发送邮件
    redis.task_mail_key=task_mail
    
    # mq 配置项
    mq.addr=0.0.0.0:9876
    mq.producer=aisr-carbarn-ai
    mq.topic=aisr-carbarn-park-plan-test
    
    # 图片上传接口
    upload.url=https://ipark-dev.bgy.com.cn/background/file-service/file/v1/publicRead/upload
    
    
    # 邮件主题
    mail.subject="算法运行出错"
    # 邮件标题
    mail.title="智能地库算法排布运行错误"
    # 邮件发送人简称
    mail.send.user="智能地库"
    # 邮件接收人简称
    mail.to.user="智能地库管理员"
    
    # 发送邮件配置
    mail.host=smtp.countrygarden.com.cn
    mail.port=587
    mail.ssl=false
    mail.user=admin@countrygarden.com.cn
    mail.passwd=***
    mail.send.nick=FDDS
    # 多个邮件逗号隔开,不配合默认不发送
    mail.to.users=aa@contrygarden.com.cn
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    3. 自行添加http接口

    http使用flask配置

    ###====启动方法====####
    
    def start():
        """启动web服务"""
        server_host = config.get("server.host", "0.0.0.0")
        server_port = config.get("server.port", 5000)
        log.info("当前配置.端口号:{},ip:{}", server_host, server_port)
        # scheduleTask()
        app.run(host=server_host, port=server_port)
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    api模板
    import sys
    import json
    
    from aisrfwk.util import log
    from aisrfwk.domain.base_result import Result
    from flask import Flask, jsonify, request
    from flask_cors import cross_origin
    from aisrfwk.util.config import config, service_port
    import aisrfwk.util.redis_util as redis
    
    from aisrfwk.util import api
    
    sys.path.append("..")
    
    app = Flask(__name__)
    
    
    @app.route('/park', methods=['POST'])
    @cross_origin()
    def park():
        """
        车位排布主方法
        """
        try:
            param = request.get_data()
            json_param = json.loads(param)
            validateParam(json_param)
            res = api.pushQueue({json_param)
            return Result().success(res)
        except Exception as e:
            log.info(type(e))
            log.error("车位排布任务启动失败:" + str(e))
            return Result().error(str(e))
    
    
    def validateParam(param):
        """根据图纸生成方案参数校验"""
        assert param.get("projectId") is not None, '项目ID参数[projectId]必须传入'
        # assert param.get("params") is not None, '指标参数[params]必须传入'
        # assert param.get("outer") is not None, '地库退线参数[outer]必须传入'
        # assert param.get("building") is not None, '楼栋外轮廓参数[outer]必须传入'
        # assert param.get("column") is not None, '剪力墙参数[column]必须传入'
        # assert param.get("coreBarrel") is not None, '核心筒参数[coreBarrel]必须传入'
    
    
    def start():
        """启动web服务"""
        server_host = config.get("server.host", "0.0.0.0")
        server_port = service_port if service_port != 0 else config.get("server.port", 5000)
        log.info("当前配置.端口号:{},ip:{}", server_host, server_port)
        # scheduleTask()
        app.run(host=server_host, port=server_port)
    
    
    if __name__ == '__main__':
        # 本地运行Flask
        start()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    4.加入业务逻辑处理类

    创建业务逻辑处理类,需要继承BaseAisrTask,并且重写run方法

    import sys
    import time
    
    from aisrfwk.domain.base_aisr_task import BaseAisrTask
    from aisrfwk.util import log, redis_util, rocketmq_util
    from aisrfwk.util.exception_util import formatException
    
    
    class BusinessDemo(BaseAisrTask):
    
        def run(self, data):
            start_time = time.time()
    
            task_id = None
            try:
                log.info("业务方法处理逻辑")
    
                # 过程中处理结果发送到服务端
                rocketmq_util.sendResultToServer()
            except BaseException as e:
                _msg = "任务失败,请检查底图和图层选择后再次重试!"
                _err_msg = "【严重】任务失败,排布过程中出现异常:"
                log.error(_err_msg + "{}", str(e))
                exc_text = _err_msg + "
    "
    + formatException(sys.exc_info()) redis_util.errorTaskStatus(task_id, exc_text) rocketmq_util.sendErrorToServer() # 处理结束后发送消息到后端服务 time.sleep(1) rocketmq_util.sendTaskOverToServer() end_time = time.time() log.info("排布完成,总耗时:[{}]秒", round(end_time - start_time, 2))
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    5.主方法启动

    主方法启动时需要启动队列,并传入业务处理类,然后再启动api

    from aisrfwk.aisr_queue import consume_queue as queue
    
    import rest_apis_demo
    from business import BusinessDemo
    
    if __name__ == '__main__':
        queue.start(BusinessDemo())
        rest_apis_demo.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    6.公共API
    push消息到队列
    from aisrfwk.util import api
    
    # 将参数对象push到消息队列中.
    res = api.pushQueue({"projectId": 12,"test": "name"})
    # 返回的对象会包含自动生成的id
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    推送消息到rocketmq
    from aisrfwk.util import rocketmq_util
    
    # 推送方案对象到rocketmq
    rocketmq_util.sendResultToServer({返回对象})
    # 推送错误消息到rocketmq
    rocketmq_util.sendErrorToServer()
    # 推送任务完结消息到rocketmq
    rocketmq_util.sendTaskOverToServer()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    详细使用可见上步业务逻辑处理类

  • 相关阅读:
    面试官:告诉我为什么static和transient关键字修饰的变量不能被序列化?
    Python 函数的定义
    担忧CentOS停服?KeyarchOS系统来支撑
    Docker与Kubernetes介绍
    19.flink task数量,slot数量和taskManage数量
    mac怎么恢复删除的文件,mac照片图库删除后怎么恢复
    Obsidian插件推荐_231005
    使用C语言实现前,中,后序线索化二叉树
    小结笔记:多位管理大师关于管理的要素的论述
    TcpServer::start都做了些什么
  • 原文地址:https://blog.csdn.net/a249040113/article/details/133268865