• 数据同步到Redis消息队列,并实现消息发布/订阅


    一、假设需求:

    • 某系统在MySQL某表中操作了一条数据
    • 在其他系统中,实时获取最新被操作数据的数据库名、数据表名、操作类型、数据内容

    应用场景:
    按最近项目的一个需求来说:
    1.当某子系统向报警表中新增了一条报警数据;
    2.项目中各个子系统需要获取刚刚新增的报警数据;
    3.如果使用传统入库查库方式:

    • 大批量插入时获取最新的报警数据需要新增查询逻辑
    • 频繁获取最新新增数据效率较低

    二、实现思路

    • 使用ApplicationListener监听数据库
    • 将监听到的数据同步并发布到Redis消息队列中
    • 其他系统订阅Redis消息队列频道获取新增的最新数据

    三、代码实现

    • 引入redis客户端依赖(SpringBoot并未集成)
    		<dependency>
                <groupId>redis.clientsgroupId>
                <artifactId>jedisartifactId>
                <version>5.0.0version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 创建数据同步事件
    public class MessageEvent extends ApplicationEvent {
    
        private CdcMessage message;
    
        /**
         * 初始化对象
         * 
         * @param source
         */
        public MessageEvent(Object source, CdcMessage message) {
            super(source);
            this.message = message;
        }
    
        @Override
        public Object getSource() {
            return super.getSource();
        }
    
        public CdcMessage getMessage() {
            return this.message;
        }
    
        public void setMessage(CdcMessage message) {
            this.message = message;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 创建数据信息类CdcMessage
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public class CdcMessage implements Serializable {
        /**
         * 数据
         */
        private JSONObject data;
        /**
         * 数据库类型
         */
        private String dbType;
        /**
         * 处理类型(UPDATE DELETE CREATE)
         */
        private String handleType;
        /**
         * 数据库名
         */
        private String database;
        /**
         * 表名
         */
        private String table;
        
        /**
         * JSON 转对象
         *
         * @param clazz 转换类型
         * @param    泛型
         * @return 集合结果
         */
        public <T> List<T> toBean(Class<T> clazz) {
            List<T> rst = new LinkedList<>();
            rst.add(JSON.toJavaObject(data, clazz));
            return rst;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 创建数据同步方法(实现ApplicationListener数据监听接口,实现onApplicationEvent方法)
    @Slf4j
    @Component
    public class Process implements ApplicationListener<MessageEvent> {
        
        @Override
        public void onApplicationEvent(MessageEvent event) {
            CdcMessage message = event.getMessage();
            // 当TableName表进行新增操作时,执行数据同步操作
            if ("TableName".equalsIgnoreCase(message.getTable()) && "CREATE".equals(message.getHandleType())) {
                // 创建Jedis对象,连接到Redis服务器
                Jedis jedis = new Jedis("ip", 6379);
                // 设置认证密码
                jedis.auth("psssword");
                JSONObject messageData = message.getData();
                // 发布消息给消费者
                jedis.publish("频道名称", JSON.toJSONString(messageData ));
                // 关闭Jedis连接
                jedis.close();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    四、测试

    • 编写测试代码(消息订阅)
    @Test
        public void test() {
            // 创建Jedis对象,连接到Redis服务器
            Jedis jedis = new Jedis("ip", 6379);
            // 设置认证密码
            jedis.auth("password");
            // 创建消息订阅器对象
            JedisPubSub jedisPubSub = new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    // 在接收到消息时执行的逻辑,可以根据实际需求进行编写
                    System.out.println(message);
                }
            };
            // 订阅指定频道
            jedis.subscribe(jedisPubSub, "频道名称");
            // 关闭Jedis连接
            jedis.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 新增数据

    在这里插入图片描述

    • 获取消息订阅数据

    在这里插入图片描述

    五、总结

    该功能主要实现方式为传统数据监听+MQ消息发布/订阅。由于该项目系统MQ只集成了Redis,所以未使用四大MQ从而使用Redis。

  • 相关阅读:
    几种常见的跨域解决方法
    centos 中:Nginx开启https和局域网访问配置
    1039 Course List for Student
    移动端页面秒开优化总结
    阿里云大数据实战记录2:调度实例跑完数据表为空?
    LeetCode50天刷题计划(Day 11—— 最接近的三数之和(8.40-10.00)
    【小程序 - 基础】页面导航、页面事件、生命周期、WXS脚本_04
    javaee springMVC model的使用
    mysql5.7免安装版本
    Netty(四)- NIO三大组件之Selector
  • 原文地址:https://blog.csdn.net/Odinpeng/article/details/134273554