• Canal实现Mysql数据增量同步更新至Mysql/Redis


    大家好,我是孙嵓,今天这篇文章可有点东西啊,我觉得这个场景你司估计也用得到,其实这个功能不归我管,但我实在是看不下去同事对这个业务的操作,居然写了个定时任务每隔五分钟对表数据进行查询看看有没有新增更新的内容,没错我当时就是这个表情。
    在这里插入图片描述

    我顿时觉得单位工资给我开的太少了,单位不配拥有我,废话少说,这个周必须得谈谈提薪了,步入正题吧。
    在这里插入图片描述

    项目需求:

    提示:这里简述项目相关背景:

    场景一: A主系统有一张部门表,B子系统也需要部门表于是把A数据库的部门表复制到了B数据库;但是A系统如果要rud部门表的信息的话,B这边无法实时的更新部门表的信息就会触发一些不必要的问题

    **场景二:**A系统有一个表信息缓存到redis了,当表信息要rud的时候,redis没有实时更新造成面试经典的缓存一致性问题。

    本文只针对单体应用做说明,集群部署的话这边建议自行摸索亲~

    名词解释(Canal)

    Canal主要用途是基于MySQL 数据库增量日志解析,提供增量数据订阅和消费

    基于日志增量订阅和消费的业务包括

    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护(拆分异构索引、倒排索引等)
    • 业务 cache 刷新
    • 带业务逻辑的增量数据处理
      当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    更多详细介绍请认准官方链接:

    https://github.com/alibaba/canal


    部署实现

    环境:windows10,mysql8.0.21,rabbitmq3.10.6,redis5.0.9, canal1.1.6

    下载canal应用包(目前只用到了deployer就能实现需求):https://github.com/alibaba/canal/releases

    1.修改mysql配置文件

    linux:

    vim /etc/my.cnf
    
    • 1

    windows:
    mysql安装目录下的my.ini

    拿my.ini为例,开启binlog,mysql8.0好像是自动开启的,其余版本需要手动开

    [mysqld]
    log-bin=mysql-bin # 开启binlog
    binlog-format=ROW # 选择ROW模式
    server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可
    
    • 1
    • 2
    • 3
    • 4

    校验是否成功

    SHOW VARIABLES LIKE 'log_bin';
    
    • 1

    2.修改canal配置文件

    就是你下载的那个deployer包有几个配置文件需要下载,这里我们分情况讨论我们实现了两种方案一种是TCP,一种是RabbitMQ。

    这里只列举需要改动的配置,其他的默认即可

    conf/example/instance.properties

    数据库查看binlog查看大小语句

    show master status
    
    • 1

    tcp、rabbitmq通用配置

    # position info
    # 数据库所在地址
    canal.instance.master.address=127.0.0.1:3306
    # binlog日志名
    canal.instance.master.journal.name=binlog.000090
    # binlog日志偏移量其实就是他的大小(从这个大小开始进行增量监控)
    canal.instance.master.position=436
    
    # 数据库用户名和密码及字符
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    
    # table regex
    # 数据解析关注的表,Perl正则表达式.
    # 这个代表监控所有表
    # canal.instance.filter.regex = .*\\..*
    # 监控某个库表test库下的test表
    canal.instance.filter.regex = test.test
    # table black regex
    # 过滤不需要监控的表,mysql8.0启动实例的时候会报错有的表明带BASE于是先将其过滤
    canal.instance.filter.black.regex=.*\\.BASE.*
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    rabbitmq需要添加的配置

    # mq config
    # 这个是交换机的绑定队列的routing key
    canal.mq.topic=canal.routing.key
    
    • 1
    • 2
    • 3

    conf/canal.properties
    tcp方式维持默认配置即可

    rabbitmq需要修改如下配置

    # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
    canal.serverMode = rabbitMQ
    ##################################################
    ######### 		    RabbitMQ	     #############
    ##################################################
    rabbitmq.host = 127.0.0.1
    rabbitmq.virtual.host =/
    rabbitmq.exchange =canal.exchange
    rabbitmq.username =guest
    rabbitmq.password =guest
    rabbitmq.deliveryMode =
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.双击bin目录下的startup.bat启动服务


    代码实现

    这里只贴rabbitmq实现方式,想要tcp模式的话下次再说吧,贴不过来了

    引入依赖

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    <dependency>
        <groupId>com.alibaba.ottergroupId>
        <artifactId>canal.clientartifactId>
        <version>1.1.6version>
    dependency>
    <dependency>
        <groupId>com.alibaba.ottergroupId>
        <artifactId>canal.protocolartifactId>
        <version>1.1.6version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    添加rabbitmq配置

    spring:
      # rabbitmq
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    RabbitMQ监听
    这里用的Hutool的数据库工具类,加了事务管理

    /**
     * Canal + RabbitMQ 监听数据库数据变化
     *
     * @author sunyan
     */
    @Component
    public class CanalRabbitMQListener {
        @Autowired
        private RedisService redisService;
        @Autowired
        private SpringApplicationConfig config;
        private static final Logger log = LoggerFactory.getLogger(CanalRabbitMQListener.class);
        private static String ERRORTIMES = "5";
        private Session session;
        
        CanalRabbitMQListener() {
            //默认数据源
            this.session = Session.create();
        }
        @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue(value = "canal.queue", durable = "true"),
                        exchange = @Exchange(value = "canal.exchange"),
                        key = "canal.routing.key"
                )
        })
        public void handleDataChange(@Header(AmqpHeaders.CHANNEL) Channel channel, Message msg, @Payload CanalMessage message) throws IOException {
    //        JSONObject object = JSONObject.parseObject(message);
            log.info("Canal监听到数据发生变化\n库名:{}\n表名:{}\n类型:{}\n数据:{}", message.getDatabase(), message.getTable(), message.getType(), message.getData());
            /**
             * TODO 同步Mysql和Redis
             */
            String type = message.getType();
            Entity entity = Entity.create(message.getTable());
            List<LinkedHashMap<String, String>> data = message.getData();
            List<LinkedHashMap<String, String>> old = message.getOld();
            if (ObjectUtil.isNotEmpty(old)) {
                old.get(0).keySet().forEach(column ->
                        entity.set(column, data.get(0).get(column))
                );
            } else {
                data.get(0).keySet().forEach(column ->
                        entity.set(column, data.get(0).get(column))
                );
            }
            String keyName = (String) message.getPkNames().get(0);
            String keyId = data.get(0).get(keyName);
            Entity where = Entity.create(message.getTable()).set(keyName, keyId);
            try {
                session.beginTransaction();
                //判断执行哪个DML操作
                if ("INSERT".equals(type)) {
                    session.insert(entity);
                } else if ("UPDATE".equals(type)) {
                    session.update(entity, where);
                } else if ("DELETE".equals(type)) {
                    session.del(where);
                }
                session.commit();
            } catch (SQLException throwables) {
                //redis计数器,判断达到最大次数手动进行ack
                if(redisService.judgeMaxRequestTimes(config.getName() + message.getTable() + message.getId(), ERRORTIMES)){
                    channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                }
                session.quietRollback();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    CanalMessage

    public class CanalMessage<T> {
        @JsonProperty("type")
        private String type;
        @JsonProperty("table")
        private String table;
        @JsonProperty("data")
        private List<T> data;
        @JsonProperty("database")
        private String database;
        @JsonProperty("es")
        private Long es;
        @JsonProperty("id")
        private Integer id;
        @JsonProperty("isDdl")
        private Boolean isDdl;
        @JsonProperty("old")
        private List<T> old;
        @JsonProperty("pkNames")
        private List<String> pkNames;
        @JsonProperty("sql")
        private String sql;
        @JsonProperty("ts")
        private Long ts;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    遇到的问题

    • DDL类型的操作导致消费者消息逻辑一直循环报错无法消费
    1. 修改canal.properties,将关于ddl的全部设为true即可过滤ddl类的操作
    canal.instance.filter.druid.ddl = true
    canal.instance.filter.query.ddl = true
    
    • 1
    • 2
    1. 如果没有修改的话CanalMessage类有一成员变量是isDdl,可以判断是否为true手动执行ack进行消费
    • 消费异常无限循环需要手动ack消费
      如上RabbitMq监听类,通过redis计数器进行监控消费失败次数,达到最大值即可手动ack防止无限循环堵塞,要注意key键的命名啊如果你是多子系统我这里用的是服务名+表名+message.id(这个id好像是唯一的,至于为什么是好像因为我没看源码),别忘了设置过期时间。

    还有关于redis计数器的代码在之前的文章中有介绍,就是短信那一篇,下方会给链接。

    • mq数据转换的问题
      @Payload注解转换message消息时报出异常

    Listener method could not be invoked with the incoming messageEndpoint handler details:Method

    这个我们只需要加个converter增添对实体类的转换

    @Component
    public class MsgConverter {
        @Bean
        public MessageConverter jsonMessageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 主系统更新多个子系统数据库同步更新
      通过我们如上的配置,是根据rabbitmq的direct模式去进行传递消息的;但是这不满足我们多个系统同步更新的需求,如果只放到一个队列那多个子系统去抢占这个队列的消息,其他系统就得不到更新了。

    于是我们决定采用rabbitmq的发布、订阅模式去实现一主多从的数据同步更新,具体改造如下,记得重启

    配置改造

    canal.properties(换成发布、订阅模式)

    rabbitmq.deliveryMode =fanout
    
    • 1

    instance.properties(将绑定的路由key置为空)

    canal.mq.topic=
    
    • 1

    类的改造

    @Component
    public class FanoutAListener {
         //省略共同代码,需要改造的就是注解换成只绑定queue队列即可
          @RabbitListener(queues = "canal.a")
          public void handleDataChange(@Header(AmqpHeaders.CHANNEL) Channel channel, Message msg, @Payload CanalMessage message) throws IOException {
                    ......
          }
    }
    @Component
    public class FanoutBListener {
         //省略共同代码,需要改造的就是注解换成只绑定queue队列即可
          @RabbitListener(queues = "canal.b")
          public void handleDataChange(@Header(AmqpHeaders.CHANNEL) Channel channel, Message msg, @Payload CanalMessage message) throws IOException {
                    ......
          }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    Redis就没什么好说的了,把参数message直接换成String类型配对key更新就完了


    以上就是本文的全部内容了,能力有限,理性对待

    如果感觉还不错的话,欢迎点赞和关注🦋

    分享经验,贴近项目,crud永不为奴!!!


    欢迎大家关注我的公众号,公众号也会实时发布Java项目相关的文章!!!


    在这里插入图片描述
    1.一文搭建本地git服务器

    2.Java实现短信验证码


  • 相关阅读:
    【OS命令注入】常见OS命令执行函数以及OS命令注入利用实例以及靶场实验—基于DVWA靶场
    Embedding技术与应用 (2) :神经网络的发展及现代Embedding方法简介
    《向量数据库指南》——Range Search 的技术实现细节
    个人博客系统
    【前端小点】ElementUI-Dialog标题添加图标
    Tableau 自定义调色板及应用全流程讲解【保姆级】
    Ubuntu20.04下vim的安装,配置及使用
    MySQL读写分离
    [Machine Learning] 稀疏编码和矩阵分解
    Dubbo-Adaptive实现原理
  • 原文地址:https://blog.csdn.net/weixin_44166627/article/details/125988190