• Canal+Kafka实现MySQL与Redis数据同步(二)


    Canal+Kafka实现MySQL与Redis数据同步(二)

    创建MQ消费者进行同步

    在application.yml配置文件加上kafka的配置信息:

    spring:
      kafka:
          # Kafka服务地址
        bootstrap-servers: 127.0.0.1:9092
        consumer:
          # 指定一个默认的组名
          group-id: consumer-group1
          #序列化反序列化
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringDeserializer
          value-serializer: org.apache.kafka.common.serialization.StringDeserializer
          # 批量抓取
          batch-size: 65536
          # 缓存容量
          buffer-memory: 524288
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

    public class CanalBean {
        //数据
        private List<TbCommodityInfo> data;
        //数据库名称
        private String database;
        private long es;
        //递增,从1开始
        private int id;
        //是否是DDL语句
        private boolean isDdl;
        //表结构的字段类型
        private MysqlType mysqlType;
        //UPDATE语句,旧数据
        private String old;
        //主键名称
        private List<String> pkNames;
        //sql语句
        private String sql;
        private SqlType sqlType;
        //表名
        private String table;
        private long ts;
        //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
        private String type;
        //getter、setter方法
    }
    public class MysqlType {
        private String id;
        private String commodity_name;
        private String commodity_price;
        private String number;
        private String description;
        //getter、setter方法
    }
    public class SqlType {
        private int id;
        private int commodity_name;
        private int commodity_price;
        private int number;
        private int description;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    最后就可以创建一个消费者CanalConsumer进行消费:

    @Component
    public class CanalConsumer {
        //日志记录
        private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);
        //redis操作工具类
        @Resource
        private RedisClient redisClient;
        //监听的队列名称为:canaltopic
        @KafkaListener(topics = "canaltopic")
        public void receive(ConsumerRecord<?, ?> consumer) {
            String value = (String) consumer.value();
            log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);
            //转换为javaBean
            CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
            //获取是否是DDL语句
            boolean isDdl = canalBean.getIsDdl();
            //获取类型
            String type = canalBean.getType();
            //不是DDL语句
            if (!isDdl) {
                List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
                //过期时间
                long TIME_OUT = 600L;
                if ("INSERT".equals(type)) {
                    //新增语句
                    for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                        String id = tbCommodityInfo.getId();
                        //新增到redis中,过期时间是10分钟
                        redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                    }
                } else if ("UPDATE".equals(type)) {
                    //更新语句
                    for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                        String id = tbCommodityInfo.getId();
                        //更新到redis中,过期时间是10分钟
                        redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                    }
                } else {
                    //删除语句
                    for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                        String id = tbCommodityInfo.getId();
                        //从redis中删除
                        redisClient.deleteKey(id);
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    测试MySQL与Redis同步

    mysql对应的表结构如下:

    CREATE TABLE `tb_commodity_info` (
      `id` varchar(32) NOT NULL,
      `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
      `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
      `number` int(10) DEFAULT '0' COMMENT '商品数量',
      `description` varchar(2048) DEFAULT '' COMMENT '商品描述',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    首先在MySQL创建表。然后启动项目,接着新增一条数据:

    INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');
    
    • 1

    tb_commodity_info表查到新增的数据:

    img

    Redis也查到了对应的数据,证明同步成功!

    img

    如果更新呢?试一下Update语句:

    UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';
    
    • 1

    img

    img

    没有问题!

    总结

    canal的缺点:

    1. canal只能同步增量数据。
    2. 不是实时同步,是准实时同步。
    3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
    4. MQ顺序性问题。
      网的回答,大家参考一下
      img

    尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。

  • 相关阅读:
    Tomcat 部署 war 包
    回调函数和钩子函数
    Android OpenGL ES 学习(五) -- 渐变色
    Day07--wxs的概念以及其基本的用法
    数据库测试技术点
    基于反序位域的大端协议处理方法
    【概率论】条件概率、贝叶斯公式、相关系数、中心极限定理、参数估计、假设检验
    Python基础入门例程14-NP14 不用循环语句的重复输出(字符串)
    vue路由与nodeJS环境搭建
    【DevOps基础篇】容器化架构基础设施监控方案
  • 原文地址:https://blog.csdn.net/weixin_46294086/article/details/134489332