• Canal整合SpringBoot详解(一)


    文章目录

    Canal整合SpringBoot详解(一)

    什么是canal

    • canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
    • canal工作原理:
      • canal的工作原理就是把自己伪装成MySQL slave从节点,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如RocketMQ、Kafka、ElasticSearch等等。
    • canal能做什么:
      • 数据库镜像
      • 数据库实时备份
      • 索引构建和实时维护
      • 业务cache(缓存)刷新
      • 带业务逻辑的增量数据处理

    搭建Kafka3.2.1集群⭐

    Kafka集群机器规划
    IP地址主机名需要安装的资源操作系统
    192.168.184.201kafka01jdk、Docker、zookeeper、Kafkacentos7.9
    192.168.184.202kafka02jdk、Docker、zookeeper、Kafkacentos7.9
    192.168.184.203kafka03jdk、Docker、zookeeper、Kafkacentos7.9
    创建3台虚拟机(centos7系统)

    在这里插入图片描述在这里插入图片描述
    在这里插入图片描述

    必要的环境准备(3台虚拟机都要执行如下操作)⭐
    分别修改每个服务器的hosts文件(将上面的ip和主机名配置上去)
    • 1:进入hosts文件:
    vi /etc/hosts
    
    • 1

    在最后面追加内容如下:(这个需要根据你自己服务器的ip来配置)

    192.168.184.201 kafka01
    192.168.184.202 kafka02
    192.168.184.203 kafka03
    
    • 1
    • 2
    • 3
    分别关闭每个服务器的防火墙
    systemctl stop firewalld
    systemctl disable firewalld
    
    • 1
    • 2
    分别为每个服务器安装jdk8
    • 1:进入oracle官网下载jdk8的tar.gz包:

    • 2:将下载好的包上传到每个服务器上:

    • 3:查看是否上传成功:

    [root@kafka01 ~]# ls
    anaconda-ks.cfg  jdk-8u333-linux-x64.tar.gz
    
    • 1
    • 2
    • 4:创建文件夹:
    mkdir -p /usr/java/
    
    • 1
    • 5:解压刚刚下载好的包并输出到/usr/java目录下:
    tar -zxvf jdk-8u333-linux-x64.tar.gz -C /usr/java/
    
    • 1
    [root@kafka02 ~]# ls /usr/java/
    jdk1.8.0_333
    
    • 1
    • 2
    • 6:配置java环境变量:
    vi /etc/profile
    
    • 1

    在文件中末尾添加如下配置:(需要更改的是JAVA_HOME,根据自己的java目录名来更改)

    JAVA_HOME=/usr/java/jdk1.8.0_333
    CLASSPATH=$JAVA_HOME/lib/
    PATH=$PATH:$JAVA_HOME/bin
    export PATH JAVA_HOME CLASSPATH
    
    • 1
    • 2
    • 3
    • 4
    • 7:让配置立即生效:
    source /etc/profile
    
    • 1
    • 8:查看JDK是否安装成功:
    [root@kafka01 ~]# java -version
    java version "1.8.0_333"
    Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
    Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
    
    • 1
    • 2
    • 3
    • 4
    分别为每个服务器安装Docker
    • 1:切换镜像源
    wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
    
    • 1
    • 2:查看当前镜像源中支持的docker版本
    yum list docker-ce --showduplicates | sort -r
    
    • 1
    • 3:安装特定版本的docker-ce
    yum -y install docker-ce-3:20.10.8-3.el7.x86_64 docker-ce-cli-3:20.10.8-3.el7.x86_64 containerd.io
    
    • 1
    为每个节点的Docker接入阿里云镜像加速器

    配置镜像加速器方法。

    • 准备工作:
    • 1:首先进入阿里云容器镜像服务 https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
    • 2:点击镜像工具下面的镜像加速器
    • 3:拿到你的加速器地址和下面第二步的registry-mirrors的值替换即可。

    针对Docker客户端版本大于 1.10.0 的用户,可以通过修改daemon配置文件/etc/docker/daemon.json来使用加速器

    • 第一步:
    mkdir -p /etc/docker
    
    • 1
    • 第二步:
    cat <<EOF> /etc/docker/daemon.json
    {
      "exec-opts": ["native.cgroupdriver=systemd"],	
      "registry-mirrors": [
        "https://u01jo9qv.mirror.aliyuncs.com",
        "https://hub-mirror.c.163.com",
        "https://mirror.baidubce.com"
      ],
      "live-restore": true,
      "log-driver":"json-file",
      "log-opts": {"max-size":"500m", "max-file":"3"},
      "max-concurrent-downloads": 10,
      "max-concurrent-uploads": 5,
      "storage-driver": "overlay2"
    }
    EOF
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 第三步:
    sudo systemctl daemon-reload
    
    • 1
    • 第四步:
    sudo systemctl restart docker
    
    • 1

    最后就接入阿里云容器镜像加速器成功啦。

    为每个节点的docker设置开机自动启动
    sudo systemctl enable docker
    
    • 1
    分别为每个服务器安装zookeeper3.7.1(搭建zookeeper集群)⭐
    • 1:在zookeeper官网上面下载zookeeper稳定版(当前为3.7.1)的tar.gz包,并上传到每个服务器上:

    zookeeper官网

    在这里插入图片描述在这里插入图片描述

    • 2:查看刚刚上传的zookeeper包:
    [root@kafka01 ~]# pwd
    /root
    [root@kafka01 ~]# ls | grep zookeeper
    apache-zookeeper-3.7.1-bin.tar.gz
    
    • 1
    • 2
    • 3
    • 4
    • 3:解压我们的zookeeper包:
    tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/local
    
    • 1
    mv /usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeeper
    
    • 1
    cd /usr/local/zookeeper
    
    • 1
    • 4:配置关于zookeeper的环境变量:
    vi /etc/profile
    
    • 1

    在文件中末尾添加如下配置:(ZOOKEEPER_HOME需要根据你自己的zookeeper目录来配置)

    export ZOOKEEPER_HOME=/usr/local/zookeeper
    export PATH=$ZOOKEEPER_HOME/bin:$PATH
    
    • 1
    • 2
    • 5:让配置立即生效:
    source /etc/profile
    
    • 1
    • 6:创建目录:
    cd /usr/local/zookeeper
    sudo mkdir data
    
    • 1
    • 2
    • 7;添加配置:
    cd conf
    sudo vi zoo.cfg
    
    • 1
    • 2

    内容如下:(dataDir修改成自己的目录,kafka01/02/03是我们在hosts配置的主机名映射,相当于ip)

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/usr/local/zookeeper/data
    clientPort=2181
    server.1=kafka01:2888:3888
    server.2=kafka02:2888:3888
    server.3=kafka03:2888:3888
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    initLimit:ZooKeeper集群模式下包含多个zk进程,其中一个进程为leader,余下的进程为follower。
    当follower最初与leader建立连接时,它们之间会传输相当多的数据,尤其是follower的数据落后leader很多。initLimit配置follower与leader之间建立连接后进行同步的最长时间。

    syncLimit:配置follower和leader之间发送消息,请求和应答的最大时间长度。

    tickTime:tickTime则是上述两个超时配置的基本单位,例如对于initLimit,其配置值为5,说明其超时时间为 2000ms * 5 = 10秒。

    server.id=host:port1:port2其中id为一个数字,表示zk进程的id,这个id也是dataDir目录下myid文件的内容。host是该zk进程所在的IP地址,port1表示follower和leader交换消息所使用的端口,port2表示选举leader所使用的端口。

    dataDir:其配置的含义跟单机模式下的含义类似,不同的是集群模式下还有一个myid文件。myid文件的内容只有一行,且内容只能为1 - 255之间的数字,这个数字亦即上面介绍server.id中的id,表示zk进程的id。

    • 8:进入data目录:
    cd /usr/local/zookeeper/data/
    
    • 1
    • 9:对每个服务器(kafka01、kafka02、kafka03)配置myid文件:
    • 9(1):如果是kafka01服务器,则执行下面这个:(下面的1、2、3就是我们上面指定的server.id,每个zookeeper服务器都要有一个id,并且全局唯一)
    echo "1" > myid
    
    • 1
    • 9(2):如果是kafka02服务器,则执行下面这个:
    echo "2" > myid
    
    • 1
    • 9(3):如果是kafka03服务器,则执行下面这个
    echo "3" > myid
    
    • 1
    • 10:启动zookeeper服务命令:(必须要把全部zookeeper服务器启动之后在执行下一步status命令)
    cd /usr/local/zookeeper/bin/
    
    • 1
    [root@kafka01 bin]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    
    • 1
    • 2
    • 3
    • 4
    • 11:对全部的zookeeper服务器执行查看zookeeper集群节点状态命令:(看看哪个是leader节点、哪个是follower节点)。Mode就是某一台zookeeper的角色
    [root@kafka01 bin]# zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: follower
    
    • 1
    • 2
    • 3
    • 4
    • 5
    [root@kafka02 data]# zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: leader
    
    • 1
    • 2
    • 3
    • 4
    • 5
    [root@kafka03 data]# zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: follower
    
    • 1
    • 2
    • 3
    • 4
    • 5
    分别为每个服务器安装Kafka3.2.1(搭建Kafka集群)⭐
    • 1:进入kafka官网:

    Kafka官网

    • 2(下载方式1):下载当前kafka的Binary稳定版(截止到2022-08-29,稳定版本为3.2.1),下载会十分缓慢,大约要1个小时的时间(假如你的网速很慢,那么这种方式就不推荐了。):

    在这里插入图片描述

    • 2(下载方式2):使用我上传kafka_2.13-3.2.1.zip包(注意这个不是tgz包,而是zip包)(推荐这种方式),下载速度很快:

    kafka3.2.1快速下载地址

    在这里插入图片描述

    • 3:解压kafka_2.13-3.2.1.zip包,拿到kafka的tgz包:

    在这里插入图片描述

    • 4:将解压好的kafka的tgz包上传到每个服务器上。
    • 5:查看每个服务器上是否都已经成功上传了kafka_2.13-3.2.1.tgz包:
    [root@kafka01 ~]# pwd
    /root
    [root@kafka01 ~]# ls | grep kafka
    kafka_2.13-3.2.1.tgz
    
    • 1
    • 2
    • 3
    • 4
    • 6:解压kafka.tgz包到/usr/local下:
    tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/
    
    • 1
    • 7:修改kafka目录:
    cd /usr/local/
    
    • 1
    mv kafka_2.13-3.2.1/ kafka
    
    • 1
    • **8:修改每个服务器的kafka配置文件:(注意:对应的机器要执行对应的命令,不是都在一台服务器执行)**⭐

      • 8(1):在kafka01服务器上修改的配置文件,将下面的内容粘贴上去:⭐
      [root@kafka01 local]# rm -f /usr/local/kafka/config/server.properties
      [root@kafka01 local]# vi /usr/local/kafka/config/server.properties
      
      • 1
      • 2

      内容如下:

      注意下面3个地方:

      ①每一个kafka的broker.id都不可以一样,并且要为数字(比如0、1、2都是可以的)!

      ②log.dirs为你当前机器的kafka的日志数据存储目录

      ③zookeeper.connect:配置连接Zookeeper集群地址,下面的kafka01:2181(kafka01的意思是zk所在的服务器的ip地址,因为我们配置了hosts,所以就直接用主机名更方便;2181就是zk配置文件中的clientPort)

      #broker 的全局唯一编号,不能重复,只能是数字。
      broker.id=1
      #处理网络请求的线程数量
      num.network.threads=3
      #用来处理磁盘 IO 的线程数量
      num.io.threads=8
      #发送套接字的缓冲区大小
      socket.send.buffer.bytes=102400
      #接收套接字的缓冲区大小
      socket.receive.buffer.bytes=102400
      #请求套接字的缓冲区大小
      socket.request.max.bytes=104857600
      #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
      log.dirs=/usr/local/kafka/datas
      #topic 在当前 broker 上的分区个数
      num.partitions=1
      #用来恢复和清理 data 下数据的线程数量
      num.recovery.threads.per.data.dir=1
      # 每个 topic 创建时的副本数,默认时 1 个副本
      offsets.topic.replication.factor=1
      #segment 文件保留的最长时间,超时将被删除
      log.retention.hours=168
      #每个 segment 文件的大小,默认最大 1G
      log.segment.bytes=1073741824
      # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
      log.retention.check.interval.ms=300000
      #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理)
      zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 8(2):在kafka02服务器上修改的配置文件,将下面的内容粘贴上去:⭐
      [root@kafka02 local]# rm -f /usr/local/kafka/config/server.properties
      [root@kafka02 local]# vi /usr/local/kafka/config/server.properties
      
      • 1
      • 2

      内容如下:

      #broker 的全局唯一编号,不能重复,只能是数字。
      broker.id=2
      #处理网络请求的线程数量
      num.network.threads=3
      #用来处理磁盘 IO 的线程数量
      num.io.threads=8
      #发送套接字的缓冲区大小
      socket.send.buffer.bytes=102400
      #接收套接字的缓冲区大小
      socket.receive.buffer.bytes=102400
      #请求套接字的缓冲区大小
      socket.request.max.bytes=104857600
      #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
      log.dirs=/usr/local/kafka/datas
      #topic 在当前 broker 上的分区个数
      num.partitions=1
      #用来恢复和清理 data 下数据的线程数量
      num.recovery.threads.per.data.dir=1
      # 每个 topic 创建时的副本数,默认时 1 个副本
      offsets.topic.replication.factor=1
      #segment 文件保留的最长时间,超时将被删除
      log.retention.hours=168
      #每个 segment 文件的大小,默认最大 1G
      log.segment.bytes=1073741824
      # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
      log.retention.check.interval.ms=300000
      #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理)
      zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 8(3):在kafka03服务器上修改的配置文件,将下面的内容粘贴上去:⭐
      [root@kafka03 local]# rm -f /usr/local/kafka/config/server.properties
      [root@kafka03 local]# vi /usr/local/kafka/config/server.properties
      
      • 1
      • 2

      内容如下:

      #broker 的全局唯一编号,不能重复,只能是数字。
      broker.id=3
      #处理网络请求的线程数量
      num.network.threads=3
      #用来处理磁盘 IO 的线程数量
      num.io.threads=8
      #发送套接字的缓冲区大小
      socket.send.buffer.bytes=102400
      #接收套接字的缓冲区大小
      socket.receive.buffer.bytes=102400
      #请求套接字的缓冲区大小
      socket.request.max.bytes=104857600
      #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
      log.dirs=/usr/local/kafka/datas
      #topic 在当前 broker 上的分区个数
      num.partitions=1
      #用来恢复和清理 data 下数据的线程数量
      num.recovery.threads.per.data.dir=1
      # 每个 topic 创建时的副本数,默认时 1 个副本
      offsets.topic.replication.factor=1
      #segment 文件保留的最长时间,超时将被删除
      log.retention.hours=168
      #每个 segment 文件的大小,默认最大 1G
      log.segment.bytes=1073741824
      # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
      log.retention.check.interval.ms=300000
      #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理)
      zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
    • 9:给每个服务器都配置kafka的环境变量:

    sudo vim /etc/profile
    
    • 1

    在最后面追加的内容如下:

    export KAFKA_HOME=/usr/local/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
    • 1
    • 2
    • 10:让配置立即生效:
    source /etc/profile
    
    • 1
    • 11:启动zk集群。依次在 kafka01、kafka02、kafka03节点上启动zookeeper。(zk要先启动,然后再启动kafka)⭐
    /usr/local/zookeeper/bin/zkServer.sh start
    
    • 1
    • 12:后台模式启动kafka集群。依次在 kafka01、kafka02、kafka03节点上启动kafka。
    kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
    
    • 1
    • 13:查看kafka是否启动成功:
    [root@kafka01 local]# jps
    3603 Kafka
    3166 QuorumPeerMain
    4367 Jps
    
    • 1
    • 2
    • 3
    • 4
    • 14:关闭kafka集群:(可以暂时不关闭,方便后面继续演示)
      • 注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
        集群。
    kafka-server-stop.sh
    
    • 1
    • 15:等kafka集群全部关闭之后再关闭zookeeper:(可以暂时不关闭,方便后面继续演示)
    zkServer.sh stop
    
    • 1
    安装MySQL5.7,配置canal+mysql(本次采用Docker的方式)⭐
    • 1:创建my.cnf文件(也就是mysql的配置文件)
    vim /my-sql/mysql-master/conf/my.cnf
    
    • 1

    将内容粘贴进my.cnf文件

    [client]
    # 指定编码格式为utf8,默认的MySQL会有中文乱码问题
    default_character_set=utf8
    [mysqld]
    collation_server=utf8_general_ci
    character_set_server=utf8
    
    # 全局唯一id(不允许有相同的)
    server_id=201
    binlog-ignore-db=mysql
    # 指定MySQL二进制日志
    log-bin=mysql-bin
    # 二进制日志格式,因为要整合canal,所以这里必须要是row
    binlog_format=row
    #指定具体要同步的数据库,如果不配置则表示所有数据库均开启 Binlog(可以配置多个)
    binlog-do-db=canal-test-db1
    binlog-do-db=canal-test-db2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 2:运行一个mysql容器实例。作为Master节点
    docker run -p 3307:3306 \
    -v /my-sql/mysql-master/log:/var/log/mysql \
    -v /my-sql/mysql-master/data:/var/lib/mysql \
    -v /my-sql/mysql-master/conf:/etc/mysql \
    -e MYSQL_ROOT_PASSWORD=123456 \
    --name mysql-master \
    -d mysql:5.7
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 3:进入容器内部,并登陆mysql
    docker exec -it mysql-master /bin/bash
    mysql -uroot -p
    
    • 1
    • 2
    • 4:创建canal的mysql帐号,使该canal帐号具有MySQL的Slave (从节点)的权限(也就是能够主从复制), 如果已有账户可直接 grant(这几步都是在mysql容器内部进行,也就是登录了mysql帐号后执行的命令)
    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    grant all privileges on *.* to 'canal'@'%' identified by 'canal';
    flush privileges;
    
    • 1
    • 2
    • 3
    • 5:退出mysql容器,并重启容器:
    docker restart mysql-master
    
    • 1
    • 6:再次进入容器内部,并登陆mysql
    docker exec -it mysql-master /bin/bash
    mysql -uroot -p
    
    • 1
    • 2
    • 7:查看是否成功开启binlog日志
    show variables like '%log_bin%';
    
    • 1

    案例1:Canal+Kafka实现mysql和redis的数据同步⭐

    案例目的:

    1:实现canal只监控canal-test-db1数据库下的t_config(主要同步这个表)和.t_user表。

    2:当我们修改canal-test-db1数据库下的t_config表的内容会自动同步到Redis中;

    3:当我们修改canal-test-db1数据库下的t_user表则不会同步。(虽然t_user表也被canal监控,但是这个案例就要做到在被监控的情况下,而不被同步),说白了就是只同步t_config表。

    必要的环境
    • 1:jdk8
    • 2:zookeeper
    • 3:kafka
    • 4:canal.deployer
    • 5:Redis
    • 6:Lombok
    配置canal.deployer
    • 1:进入Canal的github仓库:

    Canal的github仓库地址

    • 2:选择canal.deployer的版本(我们选择的是最新版v1.1.6):

      • 2(1)方式1:直接从GitHub上面下载。(下载速度十分慢,不推荐)

    在这里插入图片描述在这里插入图片描述
    在这里插入图片描述

    • 2(2)方式2:从我的csdn上面下载。(速度很快,推荐!⭐)

    canal.deployer快速下载地址

    在这里插入图片描述

    • 3:上传到我们的服务器上(这里我们就拿kafka01服务器作为canal服务器),生产环境可以另外创建一个新的canal服务器。

    • 4:查看canal是否上传到我们的服务器上:(只上传到kafka01服务器上)

    [root@kafka01 ~]# ls | grep canal
    canal.deployer-1.1.6.tar.gz
    
    • 1
    • 2
    • 5:解压canal.deployer:
    mkdir -p /usr/local/canal-deployer
    
    • 1
    tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-deployer
    
    • 1
    cd /usr/local/canal-deployer/conf
    
    • 1
    • 6:修改canal.properties文件:
    vim /usr/local/canal-deployer/conf/canal.properties
    
    • 1
    • 修改地方1(配置zookeeper集群地址):

    在这里插入图片描述

    • 修改地方2(修改成kafka模式):

    在这里插入图片描述

    • 修改地方3(修改canal数据库用户的账号密码):

    在这里插入图片描述

    • 修改地方4(在conf目录下要有example同名的目录,可以默认不改,意思就是instance.properties在/usr/local/canal-deployer/conf/example目录下。):
      • 例如要将example改成abc1,则也要在conf目录下创建一个abc1的目录,并在里面创建instance.properties配置文件。

    在这里插入图片描述

    • 修改地方5(配置kafka集群地址):

    在这里插入图片描述

    • 7:配置instance.properties配置文件:(默认是在example目录下)
    cd /usr/local/canal-deployer/conf/example
    
    • 1
    vim instance.properties
    
    • 1
    • 修改地方1。canal数据库的id,必须要全局唯一(和mysql的id不能设置一样):

    在这里插入图片描述

    • 修改地方2。我们MySQL的master数据库的ip+端口(我们上面设置mysql的是3307端口):

    在这里插入图片描述

    • 修改地方3。在MySQL的master数据库中canal的账号密码:

    在这里插入图片描述

    • 修改地方4。新增一个配置,设置默认同步的数据库名:⭐
      • canal.instance.defaultDatabaseName =监控的数据库名
        • 例如canal.instance.defaultDatabaseName=canal-test-db1

    在这里插入图片描述

    • 修改地方5:匹配表名的正则表达式:(指定canal要监控的数据库.表名)很重要⭐
      • canal.instance.filter.regex=canal-test-db1.t_config,canal-test-db1.t_user

    在这里插入图片描述

    • 修改地方6:指定用于canal传输消息的kafka的topic名称:(我们指定的topic名称为canal-test-topic)

    在这里插入图片描述

    启动canal.deployer⭐
    • 1:跳转目录:
    cd /usr/local/canal-deployer/bin/
    
    • 1
    • 2:执行sh:
    ./startup.sh
    
    • 1
    配置hosts(由于我是Windows运行,没有配置hosts导致无法识别kafka01主机名)⭐
    • :修改C:\Windows\System32\drivers\etc路径下的hosts文件:

    在这里插入图片描述

    创建一个SpringBoot项目⭐
    项目结构

    在这里插入图片描述

    准备需要同步的数据库表⭐
    CREATE DATABASE `canal-test-db1`;
    
    USE `canal-test-db1`;
    
    CREATE TABLE `t_config` (
      `config_id` bigint(20) NOT NULL,
      `config_info` text,
      `datetime` datetime DEFAULT NULL,
      `desc` varchar(255) DEFAULT NULL,
      PRIMARY KEY (`config_id`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    pom.xml⭐
    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
    
        <groupId>org.examplegroupId>
        <artifactId>canal-demoartifactId>
        <version>1.0-SNAPSHOTversion>
    
        <properties>
            
            <spring-boot.version>2.5.9spring-boot.version>
            <maven.compiler.source>8maven.compiler.source>
            <maven.compiler.target>8maven.compiler.target>
        properties>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-dependenciesartifactId>
                    <version>${spring-boot.version}version>
                    <type>pomtype>
                    <scope>importscope>
                dependency>
            dependencies>
        dependencyManagement>
    
        <dependencies>
    
    
            <dependency>
                <groupId>org.springframework.kafkagroupId>
                <artifactId>spring-kafkaartifactId>
            dependency>
    
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-data-redisartifactId>
            dependency>
    
            
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>1.2.70version>
            dependency>
    
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
            
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <version>1.18.12version>
            dependency>
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
    
        dependencies>
    
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    application.yml⭐
    server:
      port: 8081
    # spring整合kafka配置
    spring:
      kafka:
        # kafka集群地址(可以多个)
        bootstrap-servers:
          - 192.168.184.201:9092
          - 192.168.184.202:9092
          - 192.168.184.203:9092
        #kafka消费者配置
        consumer:
          # 指定一个消费者组id
          group-id: canal-group1
          # key/value的反序列化
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          #手动提交第1步:开启手动提交offset(true的话就是消费完一条消息自动会提交)
          enable-auto-commit: false
        # kafka生产者配置
        producer:
          # key/value的序列化
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        listener:
          #手动提交第2步:ack设置为手动(enable-auto-commit要设置为false)
          # manual_immediate:每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
          ack-mode: manual_immediate
      #redis配置
      redis:
        host: 127.0.0.1
        #    password:
        port: 6379
        database: 2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    RedisTemplateConfig(配置类)
    package com.boot.config;
    
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    @Configuration
    public class RedisTemplateConfig {
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            Jackson2JsonRedisSerializer jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            redisTemplate.setKeySerializer(stringRedisSerializer);
            redisTemplate.setHashKeySerializer(stringRedisSerializer);
            redisTemplate.setValueSerializer(jsonRedisSerializer);
            redisTemplate.setHashValueSerializer(jsonRedisSerializer);
            // 解决查询缓存转换异常的问题
            ObjectMapper om = new ObjectMapper();
            // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jsonRedisSerializer.setObjectMapper(om); //如果不设置,存储到redis的对象取出来将无法进行转换
            redisTemplate.setDefaultSerializer(jsonRedisSerializer);
            return redisTemplate;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    Config.class⭐
    package com.boot.entity;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.experimental.Accessors;
    import java.io.Serializable;
    
    /**
     * 该实体类对应着数据库表t_config字段
     * @author youzhengjie 2022-09-01 16:55:40
     */
    
    //lombok注解简化开发
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Accessors(chain = true) //开启链式编程
    public class Config implements Serializable {
        private Long configId;
        private String configInfo;
        private String datetime;
        private String desc;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    canal要求必须要的实体类(用于接收canal发送到kafka的同步消息)⭐
    ConfigCanalBean.class⭐
    package com.boot.entity.config_canal;
    
    import com.boot.entity.Config;
    import lombok.Data;
    
    import java.util.List;
    
    /**
     * 这个类是接收canal发送过来的消息所必须要的
     * @author youzhengjie 2022-09-01 16:55:18
     */
    @Data
    public class ConfigCanalBean {
        //config实体类的数据
        private List<Config> data;
        //数据库名称
        private String database;
        private long es;
        //递增
        private int id;
        //是否是DDL语句
        private boolean isDdl;
        //表结构的字段类型
        private MysqlType mysqlType;
        //UPDATE语句,旧数据
        private String old;
        //主键名称
        private List<String> pkNames;
        //sql语句
        private String sql;
        //暂时没发现什么用,不过也要写上这个属性
        private SqlType sqlType;
        //表名
        private String table;
        private long ts;
        //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
        private String type;
        //getter、setter方法
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    MysqlType.class⭐
    package com.boot.entity.config_canal;
    
    import lombok.Data;
    
    /**
     * 和SqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成String类型即可)
     * 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。
     * 注意:这个类的属性全部都要是String类型
     * @author youzhengjie 2022-09-01 16:55:25
     */
    @Data
    public class MysqlType {
    
        private String configId;
    
        private String configInfo;
    
        private String datetime;
    
        private String desc;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    SqlType.class⭐
    package com.boot.entity.config_canal;
    
    import lombok.Data;
    
    /**
     * 和MysqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成int类型即可)
     * 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。
     * 注意:这个类的属性全部都要是int类型
     * @author youzhengjie 2022-09-01 16:55:32
     */
    @Data
    public class SqlType {
    
        private int configId;
    
        private int configInfo;
    
        private int datetime;
    
        private int desc;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    ConfigCanalRedisConsumer(kafka消费者类,监听指定topic,把canal发送的消息同步到Redis中)⭐
    package com.boot.comsumer;
    
    import com.alibaba.fastjson.JSONObject;
    import com.boot.entity.Config;
    import com.boot.entity.config_canal.ConfigCanalBean;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * kafka消费者(监听名为canal-test-topic的topic),同步Redis
     * @author youzhengjie 2022-09-01 16:54:28
     */
    @Component
    @Slf4j
    public class ConfigCanalRedisConsumer {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        //redis的key格式:(数据库.表名_字段的id)
        private static final String KEY_PREFIX = "canal-test-db1.t_config_";
    
        //过期时间(单位:小时)
        private static final int TIME_OUT = 24;
    
    
        /**
         * @param consumer 接收消费记录(消息)
         * @param ack 手动提交消息
         */
        @KafkaListener(topics = "canal-test-topic")
        public void receive(ConsumerRecord<String, String> consumer, Acknowledgment ack) {
    
            try {
                //获取canal的消息
                String value = (String) consumer.value();
                log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);
    
                //转换为javaBean
                ConfigCanalBean canalBean = JSONObject.parseObject(value, ConfigCanalBean.class);
                /*
                 由于我们canal.instance配置了监控canal-test-db1.t_config表和canal-test-db1.t_user表(生产环境下可以启动多个canal,每一个canal监听一张需要同步的表)
                所以我们要对这两张表分开处理。(可以通过他们的表名(canalBean.getTable())来区分)
                如果canalBean.getTable()获取的表名是t_config,则同步到redis,如果不是则不管。
                 */
                //System.out.println(canalBean);
                if("t_config".equals(canalBean.getTable())){
                    //获取是否是DDL语句
                    boolean isDdl = canalBean.isDdl();
                    //获取当前sql语句的类型(比如INSERT、DELETE等等)
                    String type = canalBean.getType();
                    List<Config> configList = canalBean.getData();
                    //如果不是DDL语句
                    if (!isDdl) {
                        //INSERT和UPDATE都是一样的操作
                        if ("INSERT".equals(type) || "UPDATE".equals(type)) {
                            //新增语句
                            for (Config config : configList) {
                                Long id = config.getConfigId();
                                //新增到redis中,过期时间是10分钟
                                redisTemplate.
                                        opsForValue().
                                        set(KEY_PREFIX + id, JSONObject.toJSONString(config), TIME_OUT, TimeUnit.HOURS);
                            }
                        }else if("DELETE".equals(type)){
                            //删除语句
                            for (Config config : configList) {
                                Long id = config.getConfigId();
                                //从redis中删除
                                redisTemplate.delete(KEY_PREFIX+id);
                            }
                        }
    
                    }
                }
                //最后,如果上面的代码没有报错的情况下,可以确认消息了。(很重要)
                ack.acknowledge();
            }catch (Exception e){
                throw new RuntimeException();
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    创建kafka的topic(我们指定的topic名称为canal-test-topic)
    /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server kafka01:9092 --topic canal-test-topic --create
    
    • 1
    开始测试canal同步效果⭐
    测试1:给t_config表插入数据

    在这里插入图片描述
    在这里插入图片描述

    测试2:修改t_config表数据

    在这里插入图片描述在这里插入图片描述

    测试3:删除t_config表数据

    在这里插入图片描述

  • 相关阅读:
    开源,SUSE云原生拼图的底版
    Ubuntu设置SSH
    MySQL基础总结合集
    linux检测系统是否被入侵(上)
    Java多线程-CountDownLatch、Semaphone、CyclicBarrier入门
    position的粘性定位
    JAVA-递归构建树形结构 嵌套子节点返参给前端 && 获取某节点下所有叶子节点 && 获取某节点下叶节点(没有子节点的节点)
    阿里云服务器包年包月收费模式常见问题汇总(官方资料解答)
    计算机学院第一周语法组及算法组作业
    c++ 左值引用 右值引用 及 参数引用 & 与&&
  • 原文地址:https://blog.csdn.net/weixin_50071998/article/details/126755908