• Canal实现Mysql数据同步至Redis、Elasticsearch


    1.Canal简介

    官网
    https://github.com/alibaba/canal
    在这里插入图片描述
    在这里插入图片描述

    canal [kə’næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

    早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

    基于日志增量订阅和消费的业务包括

    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护(拆分异构索引、倒排索引等)
    • 业务 cache 刷新
    • 带业务逻辑的增量数据处理

    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    1.1 MySQL主备复制原理

    在这里插入图片描述

    • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
    • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
    • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

    1.2 canal工作原理

    • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
    • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    • canal 解析 binary log 对象(原始为 byte 流)

    2.开启MySQL Binlog

    对于自建 MySQL , 需要先开启 Binlog 写入功能,
    配置 binlog-format 为 ROW 模式,这里以mysql8.0.27为例,my.cnf 中配置如下

    #开启bInlog
    log-bin=mysql-bin
    #以数据的方式写binlog日志 :statement 是记录SQL,row是记录数据
    binlog-format=ROW
    binlog-ignore-db=mysql
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述
    在这里插入图片描述
    修改后,重启mysql服务。

    • 创建cannal
    • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限,
    flush privileges;
    #创建用户cannal
    CREATE USER canal IDENTIFIED BY 'canal';
    #把所有权限赋予canal,密码也是canal
    GRANT ALL PRIVILEGES ON canaldb.user TO 'canal'@'%' identified by "canal";
    //GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' identified by "canal";
    #刷新权限
    flush privileges;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    如果已有账户可直接 grant

    ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; #更新一下用户密码
    FLUSH PRIVILEGES; #刷新权限
    
    • 1
    • 2

    通过以下命令,可以查看mysql用户信息

    #查看所有数据库
    show databases;
    #使用mysql数据库
    use mysql;
    #查看当前库下所有表
    show tables;
    #查看user表
    select Host, User from user;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.安装Canal

    3.1 下载Canal

    点击下载地址,选择版本后点击canal.deployer文件下载

    https://github.com/alibaba/canal/releases
    在这里插入图片描述

    3.2 修改配置文件

    打开目录下conf/example/instance.properties文件,主要修改以下内容

    ## mysql serverId,不要和 mysql 的 server_id 重复
    canal.instance.mysql.slaveId = 10
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306 
    #username/password,需要改成自己的数据库信息,与刚才添加的用户保持一致
    canal.instance.dbUsername = canal  
    canal.instance.dbPassword = canal
    复制代码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.3 启动和关闭

    #进入文件目录下的bin文件夹
    #Linux启动
    sh startup.sh
    ##Linux关闭
    sh stop.sh
    #Windows启动
    双击 startup.bat
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    4.SpringCloud集成Canal

    4.1 Canal数据结构在这里插入图片描述

    4.2 引入依赖

    父工程规定版本,子工程引用

    
        <properties>
          <canal.version>1.2.1-RELEASEcanal.version>
        properties>
    
    
        
        <dependencyManagement>
            <dependencies>
                
                <dependency>
                    <groupId>top.javatoolgroupId>
                    <artifactId>canal-spring-boot-starterartifactId>
                    <version>${canal.version}version>
                dependency>
    
            dependencies>
        dependencyManagement>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    4.3 配置多个数据同步的目的地

    由于我们这里是多个服务对应同一个canal端,则需要配置多个数据同步的目的地

    在canal的安装目录下打开canal.deployer-1.1.6\conf\canal.properties文件

    在canal.destinations = example后面添加多个数据目录,用逗号分割,一个服务对应一个目录,这里默认只有一个example

    #################################################
    ######### 		destinations		#############
    #################################################
    canal.destinations = example,ad,goods,course,order,secKill,auth
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    配置好后,重启canal服务
    之后会看到会看到canal/conf目录下新增了这些数据目录的文件夹
    我们需要将默认的example文件夹中的instance.properties配置文件复制到新创建的自定义数据目录中
    在这里插入图片描述
    在这里插入图片描述

    4.4 application.yml

    canal:
      #canal的地址
      server: 127.0.0.1:11111 
      #数据同步的目的地
      destination: goods
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4.5 监听配置

    去实现EntryHandler接口,添加自己的业务逻辑,比如缓存的删除更新插入,实现对增删改查的逻辑重写。

    4.5 监听配置

    去实现EntryHandler接口,添加自己的业务逻辑,比如缓存的删除更新插入,实现对增删改查的逻辑重写。

    canal-client提供了EntryHandler,该handler中提供了insert,delete,update方法,当监听到某张表的相关操作后,会回调对应的方法把数据传递进来,我们就可以拿到数据往Redis同步了。

    • @CanalTable(“employee”) :监听的表
    • EntryHandler : 拿到employee表的改变后的数据之后,会封装为Employee实体 投递给我们
    4.5.1 监听测试类
    @CanalTable("test")
    @Component
    @Slf4j
    public class TestHandler implements EntryHandler<Test> {
    
        @Resource
        private RedisService redisService;
    
        @Override
        public void insert(Test test) {
            log.info("新增Test:"+test);
        }
    
        @Override
        public void delete(Test test) {
            log.debug("删除Test:"+test);
        }
    
        @Override
        public void update(Test before, Test after) {
            log.info("修改前Test:"+before);
            log.info("修改后Test:"+after);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    在这里插入图片描述
    通过日志可以看到我们在navicat中对test这张表的增删改操作均被监听到了

    4.5.2 redis数据同步
    @CanalTable("employee")
    @Component
    @Slf4j
    public class EmployeeHandler implements EntryHandler<Employee> {
    
    	//把数据往Redis同步
        @Autowired
        private RedisTemplate<Object,Object> redisTemplate;
    
        @Override
        public void insert(Employee employee) {
            redisTemplate.opsForValue().set("EMP:"+employee.getId(),employee);
        }
    
        @Override
        public void delete(Employee employee) {
            redisTemplate.delete("EMP:"+employee.getId());
        }
    
        @Override
        public void update(Employee before, Employee after) {
            redisTemplate.opsForValue().set("EMP:"+after.getId(),after);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    4.5.3 Elasticsearch数据同步
    import top.javatool.canal.client.annotation.CanalTable;
    import top.javatool.canal.client.handler.EntryHandler;
    
    /**
     * GoodsHanlder : 数据同步处理器
     * 编写数据同步处理器,
     * canal-client提供了EntryHandler,
     * 该handler中提供了insert,delete,update方法,
     * 当监听到某张表的相关操作后,
     * 会回调对应的方法把数据传递进来,
     * 我们就可以拿到数据往Redis同步了。
     *
     * @author zyw
     * @create 2023/9/19 17:19
     */
    @CanalTable("t_goods")
    @Component
    @Slf4j
    public class GoodsHanlder implements EntryHandler<Goods> {
    
        @Resource
        private EsApiService esApiService;
        @Resource
        private RedisTemplate redisTemplate;
        @Resource
        private RedisService redisService;
    
        /**
         * 新增商品后同步es
         *
         * @param goods
         */
        @SneakyThrows
        @Override
        public void insert(Goods goods) {
            log.info("新增:" + goods);
            //同步至es
            try {
                boolean flag = esApiService.bulkRequest(EsConst.GOODS, String.valueOf(goods.getId()), goods);
                log.info("新增结果:" + flag);
            } catch (Exception e) {
                log.error("同步es新增:" + e.getMessage());
            }
            //同步redis
            Map<Long, Goods> goodsMap = (Map<Long, Goods>) redisTemplate.opsForValue().get(RedisEnum.GOODSLIST.getCode());
            goodsMap.put(goods.getId(),goods);
            redisService.setCacheObject(RedisEnum.GOODSLIST.getCode(), goodsMap);
    
        }
    
        /**
         * 更新商品后同步es
         *
         * @param before
         * @param after
         */
        @SneakyThrows
        @Override
        public void update(Goods before, Goods after) {
            log.info("修改前:" + before);
            log.info("修改后:" + after);
            //同步至es
            try {
                boolean flag = esApiService.updateDocument(EsConst.GOODS, String.valueOf(after.getId()), after);
                log.info("修改结果:" + flag);
            } catch (Exception e) {
                log.error("同步es更新:" + e.getMessage());
            }
            //同步redis
            Map<Long, Goods> goodsMap = (Map<Long, Goods>) redisTemplate.opsForValue().get(RedisEnum.GOODSLIST.getCode());
            goodsMap.put(after.getId(),after);
            redisService.setCacheObject(RedisEnum.GOODSLIST.getCode(), goodsMap);
        }
    
        /**
         * 删除商品后同步es
         *
         * @param goods
         */
        @SneakyThrows
        @Override
        public void delete(Goods goods) {
            log.info("删除:" + goods);
            //同步至es
            try {
                boolean flag = esApiService.deleteDocument(EsConst.GOODS, String.valueOf(goods.getId()));
                log.info("删除结果:" + flag);
            } catch (Exception e) {
                log.error("同步es更新:" + e.getMessage());
            }
            //同步redis
            //同步redis
            Map<Long, Goods> goodsMap = (Map<Long, Goods>) redisTemplate.opsForValue().get(RedisEnum.GOODSLIST.getCode());
            goodsMap.remove(goods.getId());
            redisService.setCacheObject(RedisEnum.GOODSLIST.getCode(), goodsMap);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    4.5.4 Es Api 封装业务类 EsApiService
    package com.youzi.elasticsearch.service;
    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.List;
    
    /**
     * EsService : es API封装类
     *
     * @author zyw
     * @create 2023/9/19 17:31
     */
    @Component
    @Slf4j
    public class EsApiService {
    
        @Autowired
        @Qualifier("restHighLevelClient")
        private RestHighLevelClient client;
    
    
        /**
         * 更新文档信息
         * @param index 索引名称
         * @param id 更新文档的id
         * @param data 更新的对象
         * @return
         * @throws IOException
         */
        public boolean updateDocument(String index, String id, Object data) throws IOException {
            UpdateRequest request = new UpdateRequest(index, id);
            request.timeout(TimeValue.timeValueSeconds(1));
            request.doc(JSON.toJSONString(data), XContentType.JSON);
            UpdateResponse update = client.update(request, RequestOptions.DEFAULT);
            log.info("更新文档信息:" + update);
            return true;
        }
    
        /**
         * 插入单个
         *
         * @param index 索引名称
         * @param id 新增的文档id
         * @param data 新增的对象
         * @return
         * @throws IOException
         */
        public boolean bulkRequest(String index, String id,Object data) throws IOException {
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout("10s");
            //批处理请求
            bulkRequest.add(new IndexRequest(index)
                    .id(id)
                    .source(JSON.toJSONString(data), XContentType.JSON));
            BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
            return bulk.hasFailures();
        }
    
    
        /**
         * 删除文档信息
         *
         * @param index 索引名称
         * @param id    删除文档的id
         * @throws IOException
         */
        public boolean deleteDocument(String index, String id) throws IOException {
            DeleteRequest request = new DeleteRequest(index, id);
            request.timeout(TimeValue.timeValueSeconds(1));
            DeleteResponse delete = client.delete(request, RequestOptions.DEFAULT);
            log.info("删除文档信息:" + delete);
            return true;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    4.6 canal日志

    如果不想让控制台一直打印某些信息,可以配置如下配置屏蔽AbstractCanalClient类process()一直打印this.log.info(“获取消息 {}”, message)。

    logging:
     level:
       #禁止AbstractCanalClient 打印常規日志 获取消息 {}
      top.javatool.canal.client: warn  
    
    • 1
    • 2
    • 3
    • 4

    4.7 第二种方案(解决数据库存在下划线,用上述方法,某些字段会为空)

    上面的方式只适合数据库字段和实体类字段,属性完全一致的情况;当数据库字段含有下划线的适合,因为我们直接去监听的binlog日志,里面的字段是数据库字段,因为跟实体类字段不匹配,所以会出现字段为空的情况,这个适合需要去获取列的字段,对字段进行属性转换,实现方法如下:

    4.7.1 引入依赖
            <dependency>
                <groupId>com.xpandgroupId>
                <artifactId>starter-canalartifactId>
                <version>0.0.1-SNAPSHOTversion>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    4.7.2 创建监听
    @CanalEventListener
    @Slf4j
    public class KafkaListener {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        /**
         * @param eventType 当前操作数据库的类型
         * @param rowData   当前操作数据库的数据
         */
        @ListenPoint(schema = "maruko", table = "kafka_test")
        public void listenKafkaTest(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
            KafkaTest kafkaTestBefore = new KafkaTest();
            KafkaTest kafkaTestAfter = new KafkaTest();
    
    
            //遍历数据获取k-v
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
    
    
            getEntity(beforeColumnsList, kafkaTestBefore);
            log.warn("获取到提交前的对象为:" + kafkaTestBefore);
    
            getEntity(afterColumnsList, kafkaTestAfter);
            log.warn("获取到提交后的对象为:" + kafkaTestAfter);
    
            //判断是新增还是更新还是删除
            switch (eventType.getNumber()) {
                case CanalEntry.EventType.INSERT_VALUE:
                case CanalEntry.EventType.UPDATE_VALUE:
                    redisTemplate.opsForValue().set("kafka_test" + kafkaTestAfter.getId(), kafkaTestAfter);
                    break;
                case CanalEntry.EventType.DELETE_VALUE:
                    redisTemplate.delete("kafka_test" + kafkaTestBefore.getId());
                    break;
            }
        }
    
        /**
         * 遍历获取属性转换为实体类
         *
         * @param columnsList
         * @param kafkaTest
         */
        private void getEntity(List<CanalEntry.Column> columnsList, KafkaTest kafkaTest) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            for (CanalEntry.Column column : columnsList) {
                String name = column.getName();
                String value = column.getValue();
                switch (name) {
                    case KafkaTest.ID:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setId(Integer.parseInt(value));
                        }
                        break;
                    case KafkaTest.CONTENT:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setContent(value);
                        }
                        break;
                    case KafkaTest.PRODUCER_STATUS:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setProducerStatus(Integer.parseInt(value));
                        }
                        break;
                    case KafkaTest.CONSUMER_STATUS:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setConsumerStatus(Integer.parseInt(value));
                        }
                        break;
                    case KafkaTest.UPDATE_TIME:
                        if (StringUtils.hasLength(value)) {
                            try {
                                kafkaTest.setUpdateTime(format.parse(value));
                            } catch (ParseException p) {
                                log.error(p.getMessage());
                            }
                        }
                        break;
                    case KafkaTest.TOPIC:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setTopic(value);
                        }
                        break;
                    case KafkaTest.CONSUMER_ID:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setConsumerId(value);
                        }
                        break;
                    case KafkaTest.GROUP_ID:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setGroupId(value);
                        }
                        break;
                    case KafkaTest.PARTITION_ID:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setPartitionId(Integer.parseInt(value));
                        }
                        break;
                    case KafkaTest.PRODUCER_OFFSET:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setProducerOffset(Long.parseLong(value));
                        }
                        break;
                    case KafkaTest.CONSUMER_OFFSET:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setConsumerOffset(Long.parseLong(value));
                        }
                        break;
                    case KafkaTest.TEST:
                        if (StringUtils.hasLength(value)) {
                            kafkaTest.setTest(value);
                        }
                        break;
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    4.7.3 实体类
    @Data
    @TableName("kafka_test")
    public class KafkaTest {
    
        public static final String ID = "id";
    
        public static final String CONTENT = "content";
    
        public static final String PRODUCER_STATUS = "producer_status";
    
        public static final String CONSUMER_STATUS = "consumer_status";
    
        public static final String UPDATE_TIME = "update_time";
    
        public static final String TOPIC = "topic";
    
        public static final String CONSUMER_ID = "consumer_id";
    
        public static final String GROUP_ID = "group_id";
    
        public static final String PARTITION_ID = "partition_id";
    
        public static final String PRODUCER_OFFSET = "consumer_offset";
    
        public static final String CONSUMER_OFFSET = "producer_offset";
    
        public static final String TEST = "test";
    
        @TableId(type = IdType.AUTO)
        private Integer id;
    
        @TableField("content")
        private String content;
    
        @TableField("producer_status")
        private Integer producerStatus;
    
        @TableField("consumer_status")
        private Integer consumerStatus;
    
        @TableField("update_time")
        private Date updateTime;
    
        @TableField("topic")
        private String topic;
    
        @TableField("consumer_id")
        private String consumerId;
    
        @TableField("group_id")
        private String groupId;
    
        @TableField("partition_id")
        private int partitionId;
    
        @TableField("consumer_offset")
        private Long consumerOffset;
    
        @TableField("producer_offset")
        private Long producerOffset;
    
        @TableField("test")
        private String test;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    4.8 第三种方案(解决数据库存在下划线,某些字段会为空)

    实体类加上@Column注解

    /**
     * 商品表(TGoods)实体类
     *
     * @author zyw
     * @since 2023-07-15 17:09:12
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    @TableName("t_goods")
    public class Goods implements Serializable {
        private static final long serialVersionUID = -53376609655570945L;
        /**
         * 商品ID
         */
        @TableId(type = IdType.AUTO)
        @Column(name = "id")
        private Long id;
        /**
         * 商品名称
         */
        @TableField("goods_name")
        @Column(name = "goods_name")
        private String goodsName;
        /**
         * 商品标题
         */
        @TableField("goods_title")
        @Column(name = "goods_title")
        private String goodsTitle;
        /**
         * 商品图片
         */
        @TableField("goods_img")
        @Column(name = "goods_img")
        private String goodsImg;
        /**
         * 商品详情
         */
        @TableField("goods_detail")
        @Column(name = "goods_detail")
        private String goodsDetail;
        /**
         * 商品价格
         */
        @TableField("goods_price")
        @Column(name = "goods_price")
        private Double goodsPrice;
        /**
         * 商品库存
         */
        @TableField("goods_stock")
        @Column(name = "goods_stock")
        private Integer goodsStock;
    
        /**
         * 商家
         */
        @TableField("shop")
        @Column(name = "shop")
        private String shop;
    
        /**
         * 创建时间
         */
        @TableField("start_date")
        @Column(name = "start_date")
        @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
        private Date startDate;
    
        /**
         * 删除时间
         */
        @TableField("end_date")
        @Column(name = "end_date")
        @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
        private Date endDate;
    
    
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    在这里插入图片描述

    4.9 canal整合异常问题排查思路

    4…1 无法正常启动

    canal配置文件中mysql连接方式是否有效,

    是否已为canal单独配置mysql账号,并赋予权限

    服务对应的自定义数据存储目的地中配置文件是否完整,初始配置文件需要与默认的exmple中配置文件一致

    更改配置文件之后如果自定义的数据目录还是无法连接,则将对应目录下的meta.dat文件删除之后再重启

    4.9.2 使用canal监听数据 启动成功了 没有报错 不过一直监听不到消息
    1. 检查canal的配置,确保配置正确,特别是数据库的连接信息;
    2. 检查canal的日志,确保没有报错;
    3. 检查数据库的binlog是否开启,确保binlog格式为row;
    4. 检查数据库的binlog是否有变更,确保有变更;
    5. 检查canal的过滤规则,确保过滤规则正确;
    6. 检查canal的版本,确保版本与数据库版本兼容;
    7. 检查canal的运行环境,确保环境正确;
    8. 检查canal的配置文件,确保配置文件正确;
    9. 检查canal的运行状态,确保运行正常;
    10. 检查canal的监听端口,确保端口没有被占用。
  • 相关阅读:
    Mysql用户管理-权限(二)
    QFramework v1.0 使用指南 工具篇:09. SingletonKit 单例模板套件
    9.2 Plotting with pandas and seaborn(用pandas和seaborn绘图)
    Linux拔网线后网卡仍然处于激活状态
    Springboot 使用 Mybatis Plus LambdaQueryWrapper 构造器和注解自定义SQL
    从字符串中提取数字并重新编号
    【邀请函】Data & AI Con Shanghai 2023 | 大数据◆人工智能
    Clion中使用C/C++开发stm32程序
    Flink之Window窗口机制
    无人机/FPV穿越机的遥控器/接收机等配件厂商
  • 原文地址:https://blog.csdn.net/Zyw907155124/article/details/133085752