• 开源模型应用落地-业务优化篇(八)


    一、前言

        在之前的学习中,我相信您已经学会了一些优化技巧,比如分布式锁、线程池优化、请求排队、服务实例扩容和消息解耦等等。现在,我要给您介绍最后一篇业务优化的内容了。这个优化方法是通过定时统计问题的请求频率,然后将一些经常被请求的问题缓存起来,以提高系统的响应速度。


    二、术语

    2.1、 任务调度框架

        是一种用于管理和执行任务的软件工具或平台。它提供了一种结构和机制,使用户能够以自动化的方式安排、调度和执行任务,以满足特定的需求和要求。

    2.2、分布式任务调度框架

        是一种用于管理和调度分布式环境中任务的软件工具或平台。它专注于在分布式系统中协调和执行任务,以提高整体性能、可伸缩性和容错性。

        分布式任务调度框架通常用于处理大规模任务和作业,并利用集群、云计算或容器化环境中的多个计算节点来并行执行任务。它们提供了一种分布式任务调度器,可以协调和分配任务到可用的计算节点,并监控任务的执行状态和进度。

    2.3、XXL-JOB

        是一个开源的分布式任务调度平台,用于解决大规模任务调度和分布式定时任务管理的需求。它提供了一个可视化的任务调度中心,可以集中管理和调度各种类型的任务,包括定时任务、流程任务和API任务等。

    2.4、Milvus

        是一个开源的向量数据库引擎,专门用于存储和处理大规模高维向量数据。它提供了高效的向量索引和相似性搜索功能,使用户能够快速地进行向量数据的存储、查询和分析。

        Milvus的设计目标是为了满足现代应用中对大规模向量数据的需求,例如人脸识别、图像搜索、推荐系统等。它采用了向量空间模型和多种索引算法,包括倒排索引、近似最近邻(Approximate Nearest Neighbor,ANN)等,以支持高效的相似性搜索。

        Milvus提供了易于使用的编程接口和丰富的功能,使用户可以方便地插入、查询和分析向量数据。它支持多种数据类型的向量,包括浮点型、整型等,也支持多种距离度量方法,如欧氏距离、余弦相似度等。

        Milvus还提供了分布式部署和横向扩展的能力,可以在多台机器上构建高可用性和高性能的向量数据库集群。它支持数据的分片和负载均衡,可以处理大规模数据集和高并发查询。
     


    三、前置条件

    3.1、已经根据前面的“开源模型应用落地”的学习搭建起完整AI流程

        1) 如何部署AI服务

        2) 如何使用向量数据库

        3) 如何使用RocketMQ

        ......

        本篇将通过定时任务周期性的统计问题请求的频次,并从向量数据库中,将热点问题同步至Redis,实现缓存前置,提升访问性能。


    四、技术实现

    4.1、新增定时任务处理类

    1. import cn.hutool.core.map.MapUtil;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.apache.commons.lang3.StringUtils;
    4. import org.redisson.api.RMap;
    5. import org.springframework.beans.factory.annotation.Autowired;
    6. import org.springframework.scheduling.annotation.Scheduled;
    7. import org.springframework.stereotype.Component;
    8. import java.util.Map;
    9. @Slf4j
    10. @Component
    11. public class HotTopicStatistics {
    12. private static final Long DEFAULT_HOTSPOT_THRESHOLD = 10L;
    13. @Autowired
    14. private RedisUtils redisUtils;
    15. @Autowired
    16. private ContentCacheUtils contentCacheUtils;
    17. @Scheduled(cron = "*/30 * * * * ?")
    18. public void statistics() {
    19. RMap rmap = redisUtils.hincget("CONTENT_COUNTER");
    20. if (MapUtil.isNotEmpty(rmap)) {
    21. for (Map.Entry entry : rmap.entrySet()) {
    22. String keyword = entry.getKey();
    23. Long count = Long.parseLong(entry.getValue());
    24. // 计数器统计数值 > 热度阈值
    25. if(count.compareTo(DEFAULT_HOTSPOT_THRESHOLD) > 0){
    26. // 从向量数据库中拉取数据
    27. log.info("从向量数据库中拉取数据");
    28. String cacheContent = contentCacheUtils.cacheFromMilvus(keyword);
    29. if(StringUtils.isNotEmpty(cacheContent) && StringUtils.isNotBlank(cacheContent)){
    30. log.info("将热点内容缓存至redis中,过期时间设置为3600秒,内容为:{}",cacheContent);
    31. // 将热点内容缓存至redis中,过期时间设置为3600秒
    32. redisUtils.buckSet(keyword,cacheContent,60*60);
    33. }
    34. }
    35. }
    36. }
    37. }
    38. }

    4.2、新增内容缓存公共类

    1. import cn.hutool.core.collection.CollUtil;
    2. import cn.hutool.core.convert.Convert;
    3. import cn.hutool.core.lang.Console;
    4. import lombok.extern.slf4j.Slf4j;
    5. import org.apache.commons.lang3.StringUtils;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.stereotype.Component;
    8. import java.util.ArrayList;
    9. import java.util.List;
    10. import java.util.Map;
    11. @Slf4j
    12. @Component
    13. public class ContentCacheUtils {
    14. private static final int DIM = 256;
    15. @Autowired
    16. private AIChatUtils aiChatUtils;
    17. @Autowired
    18. private MilvusUtils milvusUtils;
    19. public String cacheFromMilvus(String keyWord){
    20. if (StringUtils.isEmpty(keyWord) || StringUtils.isBlank(keyWord)){
    21. return null;
    22. }
    23. float[] vector = aiChatUtils.getVector("", keyWord);
    24. double[] double_arr = milvusUtils.pretreatment(vector, DIM);
    25. Float[] float_arr = Convert.toFloatArray(double_arr);
    26. List vectorList = CollUtil.list(false, float_arr);
    27. List search_vectors = new ArrayList(1);
    28. // 打印日志
    29. Console.log(search_vectors);
    30. search_vectors.add(vectorList);
    31. Map resultMap = milvusUtils.search_data_vector("tb_content", "keyword",
    32. search_vectors, null, 1, CollUtil.list(false, "content"));
    33. String status = resultMap.get("status");
    34. String cacheContent = null;
    35. if (StringUtils.equals(status, "0")) {
    36. cacheContent = resultMap.get("content");
    37. }
    38. return cacheContent;
    39. }
    40. }

    4.3、修改Redis公共类

        增加以下方法

    1. public RMap hincget(String key){
    2. RMap rmap = null;
    3. if (StringUtils.isNotEmpty(key) && StringUtils.isNoneBlank(key) ) {
    4. rmap = redissonClient.getMap(key);
    5. }
    6. return rmap;
    7. }
    8. public void buckSet(String key, String value,long second) {
    9. if (StringUtils.isNotEmpty(key) && StringUtils.isNoneBlank(key) && StringUtils.isNotEmpty(value) && StringUtils.isNoneBlank(value)) {
    10. RBucket bucket = redissonClient.getBucket(key);
    11. bucket.set(value,second, TimeUnit.SECONDS);
    12. }
    13. }

    4.4、修改业务处理类

        使用上面内容缓存公共类替换早前未封装的代码


    五、测试

    5.1、启动Redis

        启动windows版本的redis服务,redis-server.exe redis.windows.conf

    5.2、将CONTENT_COUNTER的值设置为11

        下面使用Redis Desktop Manager工具编辑CONTENT_COUNTER的值

    5.3、启动Milvus Server,并初始化数据

    5.4、启动SpringBoot项目

        (一)运行Application

        (二)Redis当前只有一个Key,热点内容未缓存

        (三)定时任务触发

        (四)Redis缓存热点内容


    六、附带说明

    6.1、Spring Boot开启定时任务

        启用类增加@EnableScheduling注解

        需要将具体任务类加入到Spring管理,例如:增加@Component注解

        

    6.2、实际项目中,应用使用分布式任务调度平台去替换本示例中SpringBoot内置的任务调度功能

    6.3、Milvus Server启动超时

      直接编辑milvus下面的__init__.py文件,将timeout设置大一些

    6.4、本章BusinessHandler完整代码

    1. import cn.hutool.core.collection.CollUtil;
    2. import cn.hutool.core.convert.Convert;
    3. import cn.hutool.core.lang.Console;
    4. import com.alibaba.fastjson.JSON;
    5. import io.netty.channel.ChannelHandler;
    6. import lombok.extern.slf4j.Slf4j;
    7. import io.netty.channel.ChannelHandlerContext;
    8. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    9. import org.apache.commons.lang3.StringUtils;
    10. import org.redisson.api.RLock;
    11. import org.redisson.api.RedissonClient;
    12. import org.springframework.beans.factory.annotation.Autowired;
    13. import org.springframework.stereotype.Component;
    14. import java.util.ArrayList;
    15. import java.util.Arrays;
    16. import java.util.List;
    17. import java.util.Map;
    18. import java.util.concurrent.TimeUnit;
    19. /**
    20. * @Description: 处理消息的handler
    21. */
    22. @Slf4j
    23. @ChannelHandler.Sharable
    24. @Component
    25. public class BusinessHandler extends AbstractBusinessLogicHandler {
    26. public static final String LINE_UP_QUEUE_NAME = "AI-REQ-QUEUE";
    27. private static final String LINE_UP_LOCK_NAME = "AI-REQ-LOCK";
    28. private static final int MAX_QUEUE_SIZE = 100;
    29. // @Autowired
    30. // private TaskUtils taskExecuteUtils;
    31. // @Autowired
    32. // private AIChatUtils aiChatUtils;
    33. // @Autowired
    34. // private MilvusUtils milvusUtils;
    35. @Autowired
    36. private RedisUtils redisUtils;
    37. @Autowired
    38. private RedissonClient redissonClient;
    39. @Autowired
    40. private NettyConfig nettyConfig;
    41. @Autowired
    42. private RocketMQProducer rocketMQProducer;
    43. @Autowired
    44. private ContentCacheUtils contentCacheUtils;
    45. @Override
    46. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    47. String channelId = ctx.channel().id().asShortText();
    48. log.info("add client,channelId:{}", channelId);
    49. }
    50. @Override
    51. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    52. String channelId = ctx.channel().id().asShortText();
    53. log.info("remove client,channelId:{}", channelId);
    54. }
    55. @Override
    56. protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)
    57. throws Exception {
    58. // 获取客户端传输过来的消息
    59. String content = textWebSocketFrame.text();
    60. // 兼容在线测试
    61. if (StringUtils.equals(content, "PING")) {
    62. buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode()))
    63. .respTime(String.valueOf(System.currentTimeMillis()))
    64. .msgType(String.valueOf(MsgType.HEARTBEAT.getCode()))
    65. .contents("心跳测试,很高兴收到你的心跳包")
    66. .build());
    67. return;
    68. }
    69. log.info("接收到客户端发送的信息: {}", content);
    70. Long userIdForReq;
    71. String msgType = "";
    72. String contents = "";
    73. try {
    74. ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);
    75. msgType = apiReqMessage.getMsgType();
    76. contents = apiReqMessage.getContents();
    77. userIdForReq = apiReqMessage.getUserId();
    78. // 用户身份标识校验
    79. if (null == userIdForReq || (long) userIdForReq <= 10000) {
    80. ApiRespMessage apiRespMessage = ApiRespMessage.builder().code(String.valueOf(StatusCode.SYSTEM_ERROR.getCode()))
    81. .respTime(String.valueOf(System.currentTimeMillis()))
    82. .contents("用户身份标识有误!")
    83. .msgType(String.valueOf(MsgType.SYSTEM.getCode()))
    84. .build();
    85. buildResponseAndClose(channelHandlerContext, apiRespMessage);
    86. return;
    87. }
    88. if (StringUtils.equals(msgType, String.valueOf(MsgType.CHAT.getCode()))) {
    89. // 对用户输入的内容进行自定义违规词检测
    90. // 对用户输入的内容进行第三方在线违规词检测
    91. // 对用户输入的内容进行组装成Prompt
    92. // 对Prompt根据业务进行增强(完善prompt的内容)
    93. // 对history进行裁剪或总结(检测history是否操作模型支持的上下文长度,例如qwen-7b支持的上下文长度为8192)
    94. // ...
    95. // 通过线程池来处理
    96. // String messageId = apiReqMessage.getMessageId();
    97. // List history = apiReqMessage.getHistory();
    98. // AITaskReqMessage aiTaskReqMessage = AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();
    99. // taskExecuteUtils.execute(aiTaskReqMessage);
    100. // 违规词检测
    101. log.info("contents: {}",contents);
    102. if (WordDetection.contains_illegal_word(contents)) {
    103. log.warn("the content sent contains illegal words");
    104. ApiRespMessage apiRespMessage = ApiRespMessage.builder().code(String.valueOf(StatusCode.ILLEGAL_WORDS_FAILURE_731.getCode()))
    105. .respTime(String.valueOf(System.currentTimeMillis()))
    106. .contents("内容不合规!")
    107. .msgType(String.valueOf(MsgType.SYSTEM.getCode()))
    108. .build();
    109. buildResponseAndClose(channelHandlerContext, apiRespMessage);
    110. return;
    111. }
    112. String[] filterWords = new String[]{"一", "语文", "老师"};
    113. List keyWordsList = KeyWordsUtils.extractKeywords(contents, Arrays.asList(filterWords));
    114. String keyWord = CollUtil.join(keyWordsList, "");
    115. log.info("keyWord: {}", keyWord);
    116. String cacheContent = redisUtils.buckGet(keyWord);
    117. // 返回redis中的缓存数据
    118. if (StringUtils.isNotEmpty(cacheContent) && StringUtils.isNoneBlank(cacheContent)) {
    119. buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode()))
    120. .respTime(String.valueOf(System.currentTimeMillis()))
    121. .msgType(String.valueOf(MsgType.CHAT.getCode()))
    122. .contents(cacheContent)
    123. .build());
    124. return;
    125. } else {
    126. // 从milvus中检索数据
    127. cacheContent = contentCacheUtils.cacheFromMilvus(keyWord);
    128. if (StringUtils.isNotEmpty(cacheContent) && StringUtils.isNoneBlank(cacheContent)) {
    129. buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode()))
    130. .respTime(String.valueOf(System.currentTimeMillis()))
    131. .msgType(String.valueOf(MsgType.CHAT.getCode()))
    132. .contents(cacheContent)
    133. .build());
    134. return;
    135. }
    136. //投递消息
    137. String msg = "{\"msg\":\""+keyWord+"\"}";
    138. rocketMQProducer.send("ai-topic",msg);
    139. }
    140. // 通过队列来缓冲
    141. boolean flag = true;
    142. RLock lock = redissonClient.getLock(LINE_UP_LOCK_NAME);
    143. String queueName = LINE_UP_QUEUE_NAME + "-" + nettyConfig.getNode();
    144. //尝试获取锁,最多等待3秒,锁的自动释放时间为10秒
    145. if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {
    146. try {
    147. if (redisUtils.queueSize(queueName) < MAX_QUEUE_SIZE) {
    148. redisUtils.queueAdd(queueName, content);
    149. log.info("当前线程为:{}, 添加请求至redis队列", Thread.currentThread().getName());
    150. } else {
    151. flag = false;
    152. }
    153. } catch (Throwable e) {
    154. log.error("系统处理异常", e);
    155. } finally {
    156. lock.unlock();
    157. }
    158. } else {
    159. flag = false;
    160. }
    161. if (!flag) {
    162. buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode()))
    163. .respTime(String.valueOf(System.currentTimeMillis()))
    164. .msgType(String.valueOf(MsgType.SYSTEM.getCode()))
    165. .contents("当前排队人数较多,请稍后再重试!")
    166. .build());
    167. }
    168. } else if (StringUtils.equals(msgType, String.valueOf(MsgType.INIT.getCode()))) {
    169. //一、业务黑名单检测(多次违规,永久锁定)
    170. //二、账户锁定检测(临时锁定)
    171. //三、多设备登录检测
    172. //四、剩余对话次数检测
    173. //检测通过,绑定用户与channel之间关系
    174. addChannel(channelHandlerContext, userIdForReq);
    175. String respMessage = "用户标识: " + userIdForReq + " 登录成功";
    176. buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode()))
    177. .respTime(String.valueOf(System.currentTimeMillis()))
    178. .msgType(String.valueOf(MsgType.INIT.getCode()))
    179. .contents(respMessage)
    180. .build());
    181. } else if (StringUtils.equals(msgType, String.valueOf(MsgType.HEARTBEAT.getCode()))) {
    182. buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode()))
    183. .respTime(String.valueOf(System.currentTimeMillis()))
    184. .msgType(String.valueOf(MsgType.HEARTBEAT.getCode()))
    185. .contents("心跳测试,很高兴收到你的心跳包")
    186. .build());
    187. } else {
    188. log.info("用户标识: {}, 消息类型有误,不支持类型: {}", userIdForReq, msgType);
    189. }
    190. } catch (Exception e) {
    191. log.warn("【BusinessHandler】接收到请求内容:{},异常信息:{}", content, e.getMessage(), e);
    192. // 异常返回
    193. return;
    194. }
    195. }
    196. }

  • 相关阅读:
    .net core跨平台桌面程序 avalonia:从项目创建到打包部署linux-arm系统ubuntu
    超分辨率论文阅读
    Sentic GCN (2022 Knowledge-Based Systems)
    【DataRoom】- 基于VUE的开源的大屏可视化设计器
    MySQL 5.7.x--命令行自带帮助文档的使用,超级棒!!!
    Yolov5 中添加Network Slimming剪枝--稀疏训练部分
    【Spring】Spring中更简单的存储和读取Bean手术刀剖析
    element-ui select 多选框 使用 默认数据回显
    BFS——武士风度的牛
    Linux 软件包管理器 yum
  • 原文地址:https://blog.csdn.net/qq839019311/article/details/136617573