• Canal 安装与入门


    MySQL Binlog 简介

    https://blog.csdn.net/weixin_44371237/article/details/127904514

    MySQL 主从复制过程

    1)Master 主库将改变记录,写到二进制日志(Binary Log)中;

    2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);

    3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

    Canal 的工作原理

    把自己伪装成 Slave,假装从 Master 复制数据

    使用场景:同步数据库到redis

    在这里插入图片描述

    MySQL 的准备

    启动MySQL
    service mysqld start

    登录 mysql
    mysql -uroot -proot123456

    创建数据库canal,表 test

    赋权限

    mysql> set global validate_password_length=4; 
    mysql> set global validate_password_policy=0; 
    mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;
    
    • 1
    • 2
    • 3

    查看mysql数据库,可以看到用户已生效
    在这里插入图片描述

    Canal 的下载和安装

    下载并解压 Jar 包
    https://github.com/alibaba/canal/releases

    上传jar包到software,在/opt/module/创建canal文件夹,再解压
    tar -zxvf /software/canal.deployer-1.1.2.tar.gz -C /opt/module/canal

    修改 canal.properties 的配置(不用改)
    vim /opt/module/canal/conf/canal.properties

    进入
    cd /opt/module/canal/conf/example

    修改配置文件 vim instance.properties

    ## 只要跟/etc/my.cnf的server.id=1不一样就行
    canal.instance.mysql.slaveId=20
    canal.instance.master.address=hadoop100:3306
    
    • 1
    • 2
    • 3

    启动
    /opt/module/canal/bin/startup.sh

    jps查看有CanalLauncher进程
    在这里插入图片描述

    TCP 模式测试

    pom.xml

    
    
        4.0.0
    
        com.chen
        canal
        1.0-SNAPSHOT
    
        
            8
            8
        
        
            
                com.alibaba.otter
                canal.client
                1.1.2
            
            
                org.apache.kafka
                kafka-clients
                2.4.1
            
        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    CanalClient

    package com.chen;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.ByteString;
    import com.google.protobuf.InvalidProtocolBufferException;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    public class CanalClient {
        public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
            //连接器
            CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop100", 11111), "example", "", "");
    
            while (true){
                //连接
                canalConnector.connect();
                //订阅数据库
                canalConnector.subscribe("canal.*");
                //获取数据
                Message message = canalConnector.get(100);
                //获取Entry集合
                List entries = message.getEntries();
                //判断集合是否为空,为空则等待一会继续拉取
                if (entries.size()<=0){
                    System.out.println("当次抓去没有数据,休息一会儿。。。");
                    Thread.sleep(1000);
                }else {
                    //遍历entries,单条解析
                    for (CanalEntry.Entry entry : entries) {
                        //获取表名
                        String tableName = entry.getHeader().getTableName();
                        //获取类型
                        CanalEntry.EntryType entryType = entry.getEntryType();
                        //获取序列化后的数据
                        ByteString storeValue = entry.getStoreValue();
                        //判断entry类型是否为ROWDATA类型
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                            //反序列化
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            //获取当前事件操作类型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            //获取数据集
                            List rowDatasList = rowChange.getRowDatasList();
                            //遍历
                            for (CanalEntry.RowData rowData : rowDatasList) {
                                //改变前数据
                                JSONObject jsonObjectBefore = new JSONObject();
                                List beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    jsonObjectBefore.put(column.getName(),column.getValue());
                                }
                                //改变后数据
                                JSONObject jsonObjectAfter = new JSONObject();
                                List afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    jsonObjectAfter.put(column.getName(),column.getValue());
                                }
                                System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
                            }
                        }else {
                            System.out.println("当前操作类型为:"+entryType);
                        }
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    运行程序

    然后向canal数据库的test表插入数据

    insert into canal.test values(1,'aaa'); 
    
    • 1

    控制台输出如下
    在这里插入图片描述

    Kafka 模式测试

    修改 canal.properties 的配置
    vim /opt/module/canal/conf/canal.properties

    canal.serverMode = kafka
    canal.mq.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
    
    • 1
    • 2

    修改instance.properties
    vim /opt/module/canal/conf/example/instance.properties

    canal.mq.topic=canal
    
    • 1

    启动zookeeper和kafka
    /home/zk.sh start
    /home/kafka.sh start

    启动canal
    /opt/module/canal/bin/startup.sh

    启动kafka消费者canal

    bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic canal
    
    • 1

    向canal数据库的test表插入数据

    insert into canal.test values(8,'aaa'); 
    
    • 1

    可以看到kafka消费者接收到如下
    在这里插入图片描述

  • 相关阅读:
    【uniapp基础篇】实现微信登录
    蓝桥杯 第 3 场算法双周赛4,7题
    grid 布局 grid-column-gap 使用后内容超出网格
    多目标进化算法详细讲解及代码实现(样例:MOEA/D、NSGA-Ⅱ求解多目标(柔性)作业车间调度问题)
    微服务的快速开始(nacos)最全快速配置图解
    PHP修改上传文件大小的方法
    第二章:人工智能深度学习教程-深度学习简介
    香港Web3媒体:Techub News
    基于vue项目的代码优化
    预编码ZF,MMSE,THP准则线性预编码误码率仿真
  • 原文地址:https://blog.csdn.net/weixin_44371237/article/details/127905093