• canal安装与客户端使用



    mysql的binlog的实时数据订阅
    (1) canal安装与客户端使用
    (2) openfire 4.7.5 Web插件开发
    (3) 使用canal和openfire实现Mysql的实时数据订阅

    1、canal安装配置

    官网地址:https://github.com/alibaba/canal

    1.1、创建Mysql同步帐号

    对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    • 1
    • 2
    • 3
    • 4

    授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER flinkuser IDENTIFIED BY 'flinkpwd';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'flinkuser'@'%' ;
    FLUSH PRIVILEGES;
    
    • 1
    • 2
    • 3
    • 4

    1.2、canal server下载安装

    下载地址:https://github.com/alibaba/canal/releases
    本文使用1.1.6版本

    • canal.adapter-1.1.6.tar.gz:是canal的客户端适配器,可将其看作canal client。能够直接将canal同步的数据写入到目标数据库(hbase,rdb,es),rdb是关系型数据库比如MySQL、Oracle、PostgresSQL和SQLServer等,比较的快捷方便。
    • canal.admin-1.1.6.tar.gz:为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。
    • canal.deployer-1.1.6.tar.gz:可将其看做canal server。它负责伪装成mysql从库,接收、解析binlog并投递(不做处理)到指定的目标端(RDS、MQ 或 canal adapter)
    • canal.example-1.1.6.tar.gz:canal的客户端的一个使用例子。

    解压canal.deployer-1.1.6.tar.gz
    在这里插入图片描述

    1.3、canal配置文件

    properties配置分为两部分:

    • canal.properties (系统根配置文件)
    • instance.properties (instance级别的配置文件,每个instance一份)

    修改canal.properties配置
    只修改实例名称,其它保持不变

    #canal.destinations = example
    # 在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件,多个实例用逗号分隔,比如example1,example2
    canal.destinations = news
    canal.conf.dir = ../conf
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    修改instance.properties配置

    #################################################
    
    canal.instance.gtidon=false
    
    # position info
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=
    
    # table meta tsdb info
    canal.instance.tsdb.enable=true
    
    
    # username/password
    canal.instance.dbUsername=flinkuser
    canal.instance.dbPassword=flinkpwd
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    
    #mysql 数据解析关注的表,Perl正则表达式.
    #1.所有表:.*   or  .*\\..*
    #2.canal schema下所有表: canal\\..*
    #3.canal下的以canal打头的表:canal\\.canal.*
    #4.canal schema下的一张表:canal\\.test1
    #5.多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
    canal.instance.filter.regex=.*\\..*
    # mysql 数据解析表的黑名单,表达式规则见白名单的规则
    canal.instance.filter.black.regex=mysql\\.slave_.*
    
    # mq config
    canal.mq.topic=news
    canal.mq.partition=0
    #################################################
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    1.4、canal server启动和关闭

    启动

    sh bin/startup.sh
    
    • 1

    关闭

    sh bin/stop.sh
    
    • 1

    2、canal客户端例子

    2.1、基于canal 客户端api使用例子

    package com.penngo.canal;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    
    
    public class Main {
    
        public static void main(String args[]) {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                    11111), "news", "", "");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                int totalEmptyCount = 120;
                while (emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        emptyCount = 0;
                        printEntry(message.getEntries());
                    }
    
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
    
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }
    
        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
    
                EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
    
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    参考官方示例https://github.com/alibaba/canal/wiki/ClientExample

    2.2、SpringBoot整合Canal的客户端使用例子

    pom.xml

    
    
        4.0.0
        
            org.springframework.boot
            spring-boot-starter-parent
            3.0.10
            
        
        spring_canal2
        1.0-SNAPSHOT
        
            17
            17
            UTF-8
        
        
            
            
                com.alibaba.otter
                canal.client
                1.1.6
            
            
            
                com.alibaba.otter
                canal.protocol
                1.1.6
            
            
                mysql
                mysql-connector-java
                8.0.26
            
            
                org.springframework.boot
                spring-boot-starter-web
            
            
                org.projectlombok
                lombok
                true
            
        
        
            
                
                    org.springframework.boot
                    spring-boot-maven-plugin
                    
                        com.penngo.canal.CanalMain
                        
                            
                                org.projectlombok
                                lombok
                            
                        
                    
                
            
        
        
            
                alimaven
                aliyun maven
                https://maven.aliyun.com/repository/public/
            
        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    application.yml

    canal:
      hostname: 127.0.0.1
      port: 11111
      destination: news
      username:
      password:
    
    logging:
      file:
        name: logs/app.log
      pattern:
        console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} %L - %msg%xEx%n'
        file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} %L - %msg%xEx%n'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    对应的配置类CanalConfig.java

    package com.penngo.canal.component;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.net.InetSocketAddress;
    
    @Configuration
    @ConfigurationProperties(prefix = "canal")
    @Data
    public class CanalConfig {
        private String hostname;
        private int port;
        private String destination;
        private String username;
        private String password;
    
        @Bean("canalConnector")
        public CanalConnector canalConnector() {
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname,
                    port), destination, username, password);
    
            return connector;
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    客户端处理CanalClient.java

    package com.penngo.canal.component;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import jakarta.annotation.Resource;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    import java.util.List;
    
    @Component
    @Slf4j
    public class CanalClient {
        @Resource
        private CanalConnector canalConnector;
    
        public void run() {
            int batchSize = 1000;
            try {
                canalConnector.connect();
                canalConnector.subscribe(".*\\..*");// 订阅所有库下面的所有表
                // connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal
                canalConnector.rollback();
                try {
                    while (true) {
                        Message message = canalConnector.getWithoutAck(batchSize);
                        long batchId = message.getId();
                        int size = message.getEntries().size();
                        if (batchId == -1 || size == 0) {
                            Thread.sleep(1000);
                        } else {
                            printEntry(message.getEntries());
                        }
                        canalConnector.ack(batchId);
    
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } finally {
                canalConnector.disconnect();
            }
        }
    
        private static void printEntry(List<Entry> entrys) {
            for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
                System.out.println("rowChare ======>"+rowChage.toString());
    
                EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名称
                        entry.getHeader().getLogfileOffset(), //偏移量
                        entry.getHeader().getSchemaName(),//庫名
                        entry.getHeader().getTableName(), //表名
                        eventType));//事件名
    
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    CanalCommand.java

    package com.penngo.canal.component;
    
    import jakarta.annotation.Resource;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;
    
    @Order(value=1)
    @Component
    public class CanalCommand implements CommandLineRunner {
        @Resource
        private CanalClient canalClient;
    
        @Override
        public void run(String... args) throws Exception {
            canalClient.run();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    启动类Main.java

    package com.penngo.canal;
    
    import jakarta.annotation.Resource;
    import org.springframework.boot.Banner;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.WebApplicationType;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import com.penngo.canal.component.CanalClient;
    import org.springframework.boot.builder.SpringApplicationBuilder;
    
    @SpringBootApplication
    public class Main {
        public static void main(String[] args) {
            new SpringApplicationBuilder(Main.class)
                    .bannerMode(Banner.Mode.CONSOLE)
                    .web(WebApplicationType.NONE)
                    .headless(true)
                    .run(args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    运行日志、执行增删改操作日志

    
      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::               (v3.0.10)
    
    2023-09-27 12:58:29.793 [main] INFO  com.penngo.canal.Main 51 - Starting Main using Java 17.0.8.1 with PID 52644 (D:\project\idea\datasfc\Canal\spring_canal2\target\classes started by Administrator in D:\project\idea\datasfc\Canal)
    2023-09-27 12:58:29.796 [main] INFO  com.penngo.canal.Main 632 - No active profile set, falling back to 1 default profile: "default"
    2023-09-27 12:58:30.372 [main] INFO  com.penngo.canal.Main 57 - Started Main in 0.987 seconds (process running for 1.54)
    rowChare ======>tableId: 1199
    eventType: INSERT
    isDdl: false
    rowDatas {
      afterColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: false
        updated: true
        isNull: false
        value: "12"
        mysqlType: "bigint unsigned"
      }
      afterColumns {
        index: 1
        sqlType: 12
        name: "title"
        isKey: false
        updated: true
        isNull: false
        value: "GraphQL\345\237\272\347\241\200\347\237\245\350\257\206\344\270\216Spring for GraphQL\344\275\277\347\224\250\346\225\231\347\250\213910"
        mysqlType: "varchar(200)"
      }
      afterColumns {
        index: 2
        sqlType: 12
        name: "url"
        isKey: false
        updated: true
        isNull: false
        value: "https://blog.csdn.net/penngo/article/details/132957429"
        mysqlType: "varchar(500)"
      }
      afterColumns {
        index: 3
        sqlType: 12
        name: "cate"
        isKey: false
        updated: true
        isNull: false
        value: "java"
        mysqlType: "varchar(10)"
      }
      afterColumns {
        index: 4
        sqlType: 93
        name: "create_date"
        isKey: false
        updated: true
        isNull: false
        value: "2023-09-26 19:00:00"
        mysqlType: "datetime"
      }
    }
    
    ================> binlog[mysql-bin.000075:62317] , name[flinktest,news] , eventType : INSERT
    id : 12    update=true
    title : GraphQL基础知识与Spring for GraphQL使用教程910    update=true
    url : https://blog.csdn.net/penngo/article/details/132957429    update=true
    cate : java    update=true
    create_date : 2023-09-26 19:00:00    update=true
    rowChare ======>tableId: 1199
    eventType: DELETE
    isDdl: false
    rowDatas {
      beforeColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: false
        updated: false
        isNull: false
        value: "4"
        mysqlType: "bigint unsigned"
      }
      beforeColumns {
        index: 1
        sqlType: 12
        name: "title"
        isKey: false
        updated: false
        isNull: false
        value: "GraphQL\345\237\272\347\241\200\347\237\245\350\257\206\344\270\216Spring for GraphQL\344\275\277\347\224\250\346\225\231\347\250\2133"
        mysqlType: "varchar(200)"
      }
      beforeColumns {
        index: 2
        sqlType: 12
        name: "url"
        isKey: false
        updated: false
        isNull: false
        value: "https://blog.csdn.net/penngo/article/details/132957429"
        mysqlType: "varchar(500)"
      }
      beforeColumns {
        index: 3
        sqlType: 12
        name: "cate"
        isKey: false
        updated: false
        isNull: false
        value: "java"
        mysqlType: "varchar(10)"
      }
      beforeColumns {
        index: 4
        sqlType: 93
        name: "create_date"
        isKey: false
        updated: false
        isNull: false
        value: "2023-09-26 19:00:00"
        mysqlType: "datetime"
      }
    }
    
    ================> binlog[mysql-bin.000075:62711] , name[flinktest,news] , eventType : DELETE
    id : 4    update=false
    title : GraphQL基础知识与Spring for GraphQL使用教程3    update=false
    url : https://blog.csdn.net/penngo/article/details/132957429    update=false
    cate : java    update=false
    create_date : 2023-09-26 19:00:00    update=false
    rowChare ======>tableId: 1199
    eventType: UPDATE
    isDdl: false
    rowDatas {
      beforeColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: false
        updated: false
        isNull: false
        value: "11"
        mysqlType: "bigint unsigned"
      }
      beforeColumns {
        index: 1
        sqlType: 12
        name: "title"
        isKey: false
        updated: false
        isNull: false
        value: "GraphQL\345\237\272\347\241\200\347\237\245\350\257\206\344\270\216Spring for GraphQL\344\275\277\347\224\250\346\225\231\347\250\213910"
        mysqlType: "varchar(200)"
      }
      beforeColumns {
        index: 2
        sqlType: 12
        name: "url"
        isKey: false
        updated: false
        isNull: false
        value: "https://blog.csdn.net/penngo/article/details/132957429"
        mysqlType: "varchar(500)"
      }
      beforeColumns {
        index: 3
        sqlType: 12
        name: "cate"
        isKey: false
        updated: false
        isNull: false
        value: "java"
        mysqlType: "varchar(10)"
      }
      beforeColumns {
        index: 4
        sqlType: 93
        name: "create_date"
        isKey: false
        updated: false
        isNull: false
        value: "2023-09-26 19:00:00"
        mysqlType: "datetime"
      }
      afterColumns {
        index: 0
        sqlType: -5
        name: "id"
        isKey: false
        updated: false
        isNull: false
        value: "11"
        mysqlType: "bigint unsigned"
      }
      afterColumns {
        index: 1
        sqlType: 12
        name: "title"
        isKey: false
        updated: true
        isNull: false
        value: "GraphQL\345\237\272\347\241\200\347\237\245\350\257\206\344\270\216Spring for GraphQL\344\275\277\347\224\250\346\225\231\347\250\21310"
        mysqlType: "varchar(200)"
      }
      afterColumns {
        index: 2
        sqlType: 12
        name: "url"
        isKey: false
        updated: false
        isNull: false
        value: "https://blog.csdn.net/penngo/article/details/132957429"
        mysqlType: "varchar(500)"
      }
      afterColumns {
        index: 3
        sqlType: 12
        name: "cate"
        isKey: false
        updated: false
        isNull: false
        value: "java"
        mysqlType: "varchar(10)"
      }
      afterColumns {
        index: 4
        sqlType: 93
        name: "create_date"
        isKey: false
        updated: false
        isNull: false
        value: "2023-09-26 19:00:00"
        mysqlType: "datetime"
      }
    }
    
    ================> binlog[mysql-bin.000075:63103] , name[flinktest,news] , eventType : UPDATE
    -------> before
    id : 11    update=false
    title : GraphQL基础知识与Spring for GraphQL使用教程910    update=false
    url : https://blog.csdn.net/penngo/article/details/132957429    update=false
    cate : java    update=false
    create_date : 2023-09-26 19:00:00    update=false
    -------> after
    id : 11    update=false
    title : GraphQL基础知识与Spring for GraphQL使用教程10    update=true
    url : https://blog.csdn.net/penngo/article/details/132957429    update=false
    cate : java    update=false
    create_date : 2023-09-26 19:00:00    update=false
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256

    3、canal配置说明

    3.1、canal.properties配置文件

    canal配置主要分为两部分定义:

    • 1.instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式是spring/manager等)
    参数名字参数说明默认值
    canal.destinations当前server上部署的instance列表
    canal.conf.dirconf/目录所在的路径../conf
    canal.auto.scan开启instance自动扫描
    如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:
    a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动
    b. instance目录删除:卸载对应instance配置,如已启动则进行关闭
    c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作
    true
    canal.auto.scan.intervalinstance自动扫描的间隔时间,单位秒5
    canal.instance.global.mode全局配置加载方式spring
    canal.instance.global.lazy全局lazy模式false
    canal.instance.global.manager.address全局的manager配置方式的链接信息
    canal.instance.global.spring.xml全局的spring配置方式的组件文件classpath:spring/memory-instance.xml 
     (spring目录相对于canal.conf.dir)
    canal.instance.example.mode
    canal.instance.example.lazy
    canal.instance.example.spring.xml
    .....
    instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式
    命名规则:canal.instance.{name}.xxx
    canal.instance.tsdb.spring.xmlv1.0.25版本新增,全局的tsdb配置方式的组件文件classpath:spring/tsdb/h2-tsdb.xml (spring目录相对于canal.conf.dir)
    • 2.common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享. 【instance.properties配置定义优先级高于canal.properties】
    参数名字参数说明默认值
    canal.id每个canal server实例的唯一标识,暂无实际意义1
    canal.ipcanal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务
    canal.register.ipcanal server注册到外部zookeeper、admin的ip信息 (针对docker的外部可见ip)
    canal.portcanal server提供socket服务的端口11111
    canal.zkServerscanal server链接zookeeper集群的链接信息
    例子:10.20.144.22:2181,10.20.144.51:2181
    canal.zookeeper.flush.periodcanal持久化数据到zookeeper上的更新频率,单位毫秒1000
    canal.instance.memory.batch.modecanal内存store中数据缓存模式
    1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量
    2. MEMSIZE : 根据buffer.size  * buffer.memunit的大小,限制缓存记录的大小
    MEMSIZE
    canal.instance.memory.buffer.sizecanal内存store中可缓存buffer记录数,需要为2的指数16384
    canal.instance.memory.buffer.memunit内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小1024
    canal.instance.transactionn.size最大事务完整解析的长度支持
    超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性
    1024
    canal.instance.fallbackIntervalInSecondscanal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒
    说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢
    60
    canal.instance.detecting.enable是否开启心跳检查false
    canal.instance.detecting.sql心跳检查sqlinsert into retl.xdual values(1,now()) on duplicate key update x=now()
    canal.instance.detecting.interval.time心跳检查频率,单位秒3
    canal.instance.detecting.retry.threshold心跳检查失败重试次数3
    canal.instance.detecting.heartbeatHaEnable心跳检查失败后,是否开启自动mysql自动切换
    说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据
    false
    canal.instance.network.receiveBufferSize网络链接参数,SocketOptions.SO_RCVBUF16384
    canal.instance.network.sendBufferSize网络链接参数,SocketOptions.SO_SNDBUF16384
    canal.instance.network.soTimeout网络链接参数,SocketOptions.SO_TIMEOUT30
    canal.instance.filter.druid.ddl是否使用druid处理所有的ddl解析来获取库和表名

    true

    canal.instance.filter.query.dcl是否忽略dcl语句false
    canal.instance.filter.query.dml是否忽略dml语句
    (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)
    false
    canal.instance.filter.query.ddl是否忽略ddl语句false
    canal.instance.filter.table.error

    是否忽略binlog表结构获取失败的异常

    (主要解决回溯binlog时,对应表已被删除或者表结构和binlog不一致的情况)

    false
    canal.instance.filter.rows

    是否dml的数据变更事件

    (主要针对用户只订阅ddl/dcl的操作)

    false
    canal.instance.filter.transaction.entry是否忽略事务头和尾,比如针对写入kakfa的消息时,不需要写入TransactionBegin/Transactionend事件false
    canal.instance.binlog.format支持的binlog format格式列表
    (otter会有支持format格式限制)
    ROW,STATEMENT,MIXED
    canal.instance.binlog.image支持的binlog image格式列表
    (otter会有支持format格式限制)
    FULL,MINIMAL,NOBLOB
    canal.instance.get.ddl.isolation

    ddl语句是否单独一个batch返回

    (比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)

    false
    canal.instance.parser.parallel

    是否开启binlog并行解析模式

    (串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+)

    true
    canal.instance.parser.parallelBufferSizebinlog并行解析的异步ringbuffer队列
    (必须为2的指数)

    256

    canal.instance.tsdb.enable是否开启tablemeta的tsdb能力true
    canal.instance.tsdb.dir主要针对h2-tsdb.xml时对应h2文件的存放目录,默认为conf/xx/h2.mv.db${canal.file.data.dir:../conf}/${canal.instance.destination:}
    canal.instance.tsdb.url

    jdbc url的配置

    (h2的地址为默认值,如果是mysql需要自行定义)

    jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    canal.instance.tsdb.dbUsername

    jdbc url的配置

    (h2的地址为默认值,如果是mysql需要自行定义)

    canal
    canal.instance.tsdb.dbPassword

    jdbc url的配置

    (h2的地址为默认值,如果是mysql需要自行定义)

    canal
    canal.instance.rds.accesskey

    aliyun账号的ak信息

    (如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值

    canal.instance.rds.secretkey

    aliyun账号的sk信息

    (如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)

    canal.admin.manager canal链接canal-admin的地址 (v1.1.4新增)
    canal.admin.port

    admin管理指令链接端口 (v1.1.4新增)

    11110
    canal.admin.user

    admin管理指令链接的ACL配置 (v1.1.4新增)

    admin
    canal.admin.passwd

    admin管理指令链接的ACL配置 (v1.1.4新增)

    密码默认值为admin的密文
    canal.user

    canal数据端口订阅的ACL配置 (v1.1.4新增)

    如果为空,代表不开启

    canal.passwd

    canal数据端口订阅的ACL配置 (v1.1.4新增)

    如果为空,代表不开启

    3.2、instance.properties参数列表:

    instance.properties参数列表:

    参数名字参数说明默认值
    canal.instance.mysql.slaveIdmysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一
    (v1.1.x版本之后canal会自动生成,不需要手工指定)
    canal.instance.master.addressmysql主库链接地址127.0.0.1:3306
    canal.instance.master.journal.namemysql主库链接时起始的binlog文件
    canal.instance.master.positionmysql主库链接时起始的binlog偏移量
    canal.instance.master.timestampmysql主库链接时起始的binlog的时间戳
    canal.instance.gtidon是否启用mysql gtid的订阅模式false
    canal.instance.master.gtidmysql主库链接时对应的gtid位点
    canal.instance.dbUsernamemysql数据库帐号canal
    canal.instance.dbPasswordmysql数据库密码canal
    canal.instance.defaultDatabaseNamemysql链接时默认schema 
    canal.instance.connectionCharsetmysql 数据解析编码UTF-8
    canal.instance.filter.regex

    mysql 数据解析关注的表,Perl正则表达式.

    多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)


    常见例子:

    1.  所有表:.*   or  .*\\..*
    2.  canal schema下所有表: canal\\..*
    3.  canal下的以canal打头的表:canal\\.canal.*
    4.  canal schema下的一张表:canal\\.test1

    5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

    .*\\..*
    canal.instance.filter.black.regex

    mysql 数据解析表的黑名单,表达式规则见白名单的规则

    canal.instance.rds.instanceId
    aliyun rds对应的实例id信息
    (如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)

    参考instance.properties介绍:https://github.com/alibaba/canal/wiki/AdminGuide#instanceproperties%E4%BB%8B%E7%BB%8D

  • 相关阅读:
    大珩PPT助手一键颜色设置
    nmcli 命令行设置 ipv4 ipv6 ip 网关等
    Java线程池创建方式和应用场景
    02-JVM学习记录-运行时数据区
    机器学习 第11章 特征选择与稀疏学习
    解决雪花算法生成的ID传输前端后精度丢失
    文献阅读(44)—— 基于眼底照的近视深度学习算法和区块链平台,以促进人工智能医学研究:回顾性多队列研究
    力扣:583. 两个字符串的删除操作
    分布式调度框架Elastic-Job和xxl-job区别
    详解中断系统
  • 原文地址:https://blog.csdn.net/penngo/article/details/133347290