智能分析平台实现了用户导入需要分析的原始数据集后,利用AI自动生成可视化图表和分析结论,改善了传统BI系统需要用户具备相关数据分析技能的问题。该项目使用到的技术是SSM+Spring Boot、redis、rabbitMq、mysql等。在项目中,使用第三方AI助手平台编写一段系统预设prompt来生成指定的json,全局指定助手的职责、输入内容和回复格式。在项目中,由于AIGC是一个消耗资源和成本的重操作,所以使用了redisson提供的rateLimiter实现对单用户使用AI生成图表功能的限流,以保护系统。在AI生成内容时,处于服务能力的考虑,可能会出现第三方接口处理和返回时长较长,就引入异步化来提高用户体验,使用自定义线程池(JUC并发包中的ThreadPoolExcutor)+任务队列来管理线程、协调任务的执行。最后本项目使用RabbitMq把任务提交改为向消息队列发送消息来解决异步化是通过本地线程池实现带来的限制,实现应用解耦。测试得出,若应用程序中断,消息未确认,还会重发消息用以消费。
1.智能分析:用户输入目标和原始数据,自动生成图表和结论
2.图表管理
3.图表生成的异步化(消息队列)
4.对接AI能力
Spring Boot+Mysql+Mybatis Plus+消息队列(RabbitMQ)+AI能力
- -- 用户表
- create table if not exists user
- (
- id bigint auto_increment comment 'id' primary key,
- userAccount varchar(256) not null comment '账号',
- userPassword varchar(512) not null comment '密码',
- userName varchar(256) null comment '用户昵称',
- userAvatar varchar(1024) null comment '用户头像',
- userRole varchar(256) default 'user' not null comment '用户角色:user/admin',
- createTime datetime default CURRENT_TIMESTAMP not null comment '创建时间',
- updateTime datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间',
- isDelete tinyint default 0 not null comment '是否删除',
- index idx_userAccount (userAccount)
- ) comment '用户' collate = utf8mb4_unicode_ci;
- -- 图表表
- create table if not exists chart
- (
- id bigint auto_increment comment 'id' primary key,
- goal text null comment '分析目标',
- 'name' varchar(128) null comment '图表名称',
- chartData text null comment '图表数据',
- chartType varchar(128) null comment '图表类型',
- genChart text null comment '生成的图表数据',
- genResult text null comment '生成的分析结论',
- status varchar(128) not null default 'wait' comment 'wait,running,succeed,failed',
- execMessage text null comment '执行信息',
- userId bigint null comment '创建用户 id',
- createTime datetime default CURRENT_TIMESTAMP not null comment '创建时间',
- updateTime datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间',
- isDelete tinyint default 0 not null comment '是否删除'
- ) comment '图表信息表' collate = utf8mb4_unicode_ci;
调用AI:
系统预设(提前告诉他职责、功能、回复格式要求)+分析目标+压缩后的数据
系统预设例子:你是一个数据分析师,接下来我会告诉你我的分析目标和原始数据,请帮我分析并告诉我结论。
Prompt 预设:
你是一个数据分析师和前端开发专家,接下来我会按照以下固定格式给你提供内容: 分析需求: {数据分析的需求或者目标} 原始数据: {csv格式的原始数据,用,作为分隔符} 请根据这两部分内容,按照以下指定格式生成内容(此外不要输出任何多余的开头、结尾、注释) 【【【【【 {前端 Echarts V5 的 option 配置对象js代码,合理地将数据进行可视化,不要生成任何多余的内容,比如注释} 【【【【【 {明确的数据分析结论、越详细越好,不要生成多余的注释}
生成图表:AI无法直接生成现成的图表、但是AI可以生成图标代码--->可以把代码利用前端的组件库(Echarts)在网页展示
根据用户的输入,最后返回图表信息和结论文本
1.构造用户请求:用户消息、csv数据、图表类型
2.调用SDK,得到AI响应结果
3.从AI响应结果中,取出需要的信息
4.保存图表到数据库
1.校验文件传入:解决用户上传一个超大的文件
2.限流:解决用户频繁点击提交,导致服务器资源被占满,其他用户无法使用,控制成本,限制用户调用总次数
3.异步化:解决调用的服务处理能力有限,或者接口的处理时间较长。当用户要进行耗时很长的操作时,点击提交后,不需要在界面等待,而是把这个任务保存到数据库中记录下来。当任务提交成功时,如果我们的程序还有多余的线程空闲,可以立刻执行这个任务,若没有空闲的,则放入到等待队列中。当任务提交失败时,若没有空闲线程,任务队列满了的情况下,会拒绝这个任务或者保存到数据库中记录失败的任务,并且在程序空闲的时候,可以把这个任务拉出来在执行。
4.自定义线程池:解决线程管理复杂、任务存取复杂问题。使用线程池帮助轻松管理线程、协助调取任务的执行过程。
(1).自定义线程池:
- @Configuration
- public class ThreadPoolExecutorConfig {
-
- @Bean
- public ThreadPoolExecutor threadPoolExecutor() {
- ThreadFactory threadFactory = new ThreadFactory() {
- private int count = 1;
-
- @Override
- public Thread newThread(@NotNull Runnable r) {
- Thread thread = new Thread(r);
- thread.setName("线程" + count);
- count++;
- return thread;
- }
- };
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(4), threadFactory);
- return threadPoolExecutor;
- }
- }
(2).提交任务到线程池:
- CompletableFuture.runAsync(() -> {
- System.out.println("任务执行中:" + name + ",执行人:" + Thread.currentThread().getName());
- try {
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, threadPoolExecutor);
5.消息队列:本项目的异步通过本地的线程池实现,可能会出现数据持久化、可扩展行、应用耦合较高情况,使用分布式消息队列可以解决上述问题。使用消息队列后,如果程序中断了,消息没有确认,还会重发
本项目使用的是RabbitMQ
实现步骤:
创建交换机和队列
将线程池中的执行代码移到消费者类中
根据消费者的需求来确认消息的格式(chartId)
将提交线程池改造为发送消息到队列
(1).引入依赖:使用的版本需要和自己的springboot版本一致
-
-
-
org.springframework.boot -
spring-boot-starter-amqp -
2.7.2 -
(2).在yml中引入配置
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- password: guest
- username: guest
(3).创建交换机和队列
- public class BiInitMain {
- public static void main(String[] args) {
-
- try {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- //交换机名称
- String EXCHANGE_NAME=BiMqConstant.BI_EXCHANGE_NAME;
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
-
- // 创建队列,随机分配一个队列名称
- String queueName = BiMqConstant.BI_QUEUE_NAME;
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, EXCHANGE_NAME, BiMqConstant.BI_ROUTING_KEY);
-
- }catch(Exception e){
-
- }
-
- }
- }
(4).生产者代码
- @Component
- public class BiMessageProducer {
-
- /**
- * 发送消息
- */
- @Resource
- public RabbitTemplate rabbitTemplate;
-
- public void sendMessage(String message){
- rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME,BiMqConstant.BI_ROUTING_KEY,message);
-
- }
- }
(5).消费者代码:
-
- @Component
- @Slf4j
- public class BiMessageConsumer {
-
- @Resource
- private ChartService chartService;
-
- @Resource
- private AiManager aiManager;
-
- //指定程序监听的消息队列和确认机制
- @SneakyThrows
- @RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME},ackMode = "MANUAL")
- public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliverTag){
- if(StringUtils.isBlank(message)){
- //如果失败,消息拒接
- channel.basicNack(deliverTag,false,false);
- throw new BusinessException(ErrorCode.SYSTEM_ERROR,"消息为空");
- }
- long chartId=Long.parseLong(message);
- Chart chart=chartService.getById(chartId);
- if(chart==null){
- channel.basicNack(deliverTag,false,false);
- throw new BusinessException(ErrorCode.NOT_FOUND_ERROR,"图表为空");
- }
-
-
- // 先修改图表任务状态为 “执行中”。等执行成功后,修改为 “已完成”、保存执行结果;执行失败后,状态修改为 “失败”,记录任务失败信息。
- Chart updateChart = new Chart();
- updateChart.setId(chart.getId());
- updateChart.setStatus("running");
- boolean b = chartService.updateById(updateChart);
- if (!b) {
- channel.basicNack(deliverTag,false,false);
- handleChartUpdateError(chart.getId(), "更新图表执行中状态失败");
- return;
- }
- // 调用 AI
- String result = aiManager.doChat(CommonConstant.BI_MODEL_ID, buildUserInput(chart));
- String[] splits = result.split("【【【【【");
- if (splits.length < 3) {
- channel.basicNack(deliverTag,false,false);
- handleChartUpdateError(chart.getId(), "AI 生成错误");
- return;
- }
- String genChart = splits[1].trim();
- String genResult = splits[2].trim();
- Chart updateChartResult = new Chart();
- updateChartResult.setId(chart.getId());
- updateChartResult.setGenChart(genChart);
- updateChartResult.setGenResult(genResult);
- // todo 建议定义状态为枚举值
- updateChartResult.setStatus("succeed");
- boolean updateResult = chartService.updateById(updateChartResult);
- if (!updateResult) {
- channel.basicNack(deliverTag,false,false);
- handleChartUpdateError(chart.getId(), "更新图表成功状态失败");
- }
- //消息确认
- channel.basicAck(deliverTag,false);
- }
-
- /**
- * 构造用户输入
- * @param chart
- * @return
- */
- private String buildUserInput(Chart chart){
- String goal=chart.getGoal();
- String chartType= chart.getChartType();
- String csvData=chart.getChartData();
- //构造用户输入
- StringBuilder userInput = new StringBuilder();
- userInput.append("分析需求:").append("\n");
- //拼接分析目标
- String userGoal=goal;
- if(StringUtils.isNotBlank(chartType)){
- userGoal+=",请使用"+chartType;
- }
- userInput.append(userGoal).append("\n");
- userInput.append("原始数据:").append("\n");
- userInput.append(csvData).append("\n");
- return userInput.toString();
- }
-
- private void handleChartUpdateError(long chartId,String execMessage){
- Chart updateChartResult=new Chart();
- updateChartResult.setId(chartId);
- updateChartResult.setStatus("failed");
- updateChartResult.setExecMessage("execMessage");
- boolean updateResult = chartService.updateById(updateChartResult);
- if(!updateResult){
- log.error("更新图表失败状态失败" + chartId + "," + execMessage);
- }
- }
-
-
-
- }