• 使用canal解决Mysql和Redis数据同步(TCP)


    前言

    之前写过一篇文章《使用canal解决Mysql和Redis数据同步问题》,也是使用canal实现mysql和redis的数据同步,和该篇文章不一样的是,上一篇是基于MQ实现数据同步,该篇文章是基于TCP方式来实现。

    工作原理分析

    我们在面试的时候常常听面试官问这么一个问题:你们的Mysql和Redis怎么做数据同步的,根据不同的业务场景又很多方案,你可能会说先写库再删缓存,或者延迟双删或其他方案。今天我要给大家分享的就是比较成熟的方案-使用Canal实现Mysql和Redis数据的同步。

    我不知道你是否了解Mysql主从,根据2/8原则,80%的性能问题都在读上面,当我们数据库的读并发较大的时候,我们可以使用Mysql主从来分担读的压力。它的原理是所有的写操作在主库上,读操作在从库上,当然主库也可以承担读请求,而从库的数据则通过主库复制而来,Mysql自带主从复制的功能。如下图
    在这里插入图片描述

    主从复制步骤:

    1. 将Master的binary-log日志文件打开,mysql会把所有的DDL,DML,TCL写入BinaryLog日志文件中
    2. Master会生成一个 log dump 线程,用来给从库的 i/o线程传binlog
    3. 从库的i/o线程去请求主库的binlog,并将得到的binlog日志写到中继日志(relaylog)中
    4. 从库的sql线程,会读取relaylog文件中的日志,并解析成具体操作,通过主从的操作一致,而达到最终数据一致

    而Canal的原理就是伪装成Slave从Binlog中复制SQL语句或者数据。

    Mysql和Redis数据同步方案

    根据上面所说,我们就可以通过Canal去自动同步数据库的binlog数据日志文件,然后再把数据同步到Redis,从而达到Mysql和Redis自动同步的功能。很遗憾的是Canal没办法直接把数据库同步到Redis,它支持的是组件有 : mysql、Kafka、ElasticSearch、Hbase、RocketMQ等
    在这里插入图片描述
    当然 canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑

    • canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
    • canal c# 客户端: https://github.com/dotnetcore/CanalSharp
    • canal go客户端: https://github.com/CanalClient/canal-go
    • canal Python客户端: https://github.com/haozi3156666/canal-python

    canal 作为 MySQL binlog 增量获取和解析工具,可将数据通过TCP协议将数据同步到canal-client也就是我们的应用中,因此我们可以使用下面这种方案来同步数据
    在这里插入图片描述

    1. 首选需要开启Mysql的bin-log
    2. 然后需要安装canal-server伪装成slave同步mysql中的数据
    3. 编写canal-client客户端监听canal-server,把数据从canal-server中同步过来
    4. 然后把拿到的数据写入Redis即可

    开启Mysql bin-log日志

    找到Mysql安装目录中的my.ini 配置文件,我以mysql 5.5为例,在 mysqld 下做如下配置

    [mysqld]
    #开启bInlog
    log-bin=mysql-bin
    #给mysql服务指定一个唯一的ID
    server-id=1
    #以数据的方式写binlog日志 :statement 是记录SQL,row是记录数据
    binlog-format=ROW
    #同步的数据库名
    #binlog-do-db=canaldb
    #忽略的表
    binlog-ignore-db=mysql
    # 启动mysql时不启动grant-tables授权表
    skip-grant-tables
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    修改好之后,重启Mysql服务。注意:我这里指定了需要同步的数据库为canaldb,所以需要创建一个数据库,同时创建了一个employee表作为演示
    在这里插入图片描述
    然后创建一个用户提供给canal来链接Mysql做数据同步

    flush privileges;
    #创建用户cannal
    CREATE USER canal IDENTIFIED BY 'canal';
    #把所有权限赋予canal,密码也是canal
    GRANT ALL PRIVILEGES ON canaldb.user TO 'canal'@'%' identified by "canal";
    //GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' identified by "canal";
    #刷新权限
    flush privileges;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    到这,Mysql部分就搞定了

    安装Canal

    去官网下载 Canal : https://github.com/alibaba/canal/releases ,我使用的是canal.deployer-1.1.5.tar.gz版本在这里插入图片描述

    下载好之后解压,目录结构如下
    在这里插入图片描述
    接下来修改instance 配置文件 : conf/example/instance.properties

    #  按需修改成自己的数据库信息
    #################################################
    ...
    #我的端口是3307
    canal.instance.master.address=192.168.1.20:3307
    # username/password,数据库的用户名和密码
    ...
    #刚才开通的mysql的账户密码
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    ...
    # 同步的表的规则
    # table regex
    # 同步所有表
    #canal.instance.filter.regex=.*\\..*
    # 同步多个表,用逗号隔开
    canal.instance.filter.regex=canaldb.employee,canaldb.dept
    #################################################
    
    ...省略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    这里注意如下几个东西,其他的不用管

    • master.address :Mysql的地址,我的端口是3307,默认是3306
    • dbUsername :上面开通的Mysql用户
    • dbPassword : 密码
    • ccanal.instance.filter.regex : 要同步的表,多个表用逗号隔开

    接着修改canal 配置文件 conf/canal.properties

    # ...
    # 可选项: tcp(默认), kafka, RocketMQ
    # 这里使用tcp , 还支持kafka和rocketmq
    canal.serverMode = tcp
    ...省略...
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这里需要注意 : canal.serverMode = tcp: 我这里以tcp为例,指的是以tcp协议把数据同步数据,而不是同步到mq

    配置好之后,找到 canal 安装目录下 bin目录下的 startup.bat 双击启动,linux上启动:startup.sh
    在这里插入图片描述

    编写canal-client

    接下来我们需要在项目中整合canal-client来同步canal-server中的数据,然后写入Redis

    第一步:导入如下依赖,我这里使用了 canal-spring-boot-starter 来整合canal-client

    <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.2.5.RELEASEversion>
        parent>
        <dependencies>
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <optional>trueoptional>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-data-redisartifactId>
            dependency>
            
            <dependency>
                <groupId>top.javatoolgroupId>
                <artifactId>canal-spring-boot-starterartifactId>
                <version>1.2.1-RELEASEversion>
            dependency>
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>1.2.50version>
            dependency>
        dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    第二步:在yaml配置canal地址,以及Redis相关参数

    canal:
      server: 127.0.0.1:11111 #canal的地址
      destination: example #默认的数据同步的目的地
    spring:
      redis:
        host: 127.0.0.1
        password: 123456
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    编写启动类

    @SpringBootApplication
    public class CanalApplication {
        public static void main(String[] args) {
            SpringApplication.run(CanalApplication.class,args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    第三步:对Redis做配置,实现自动序列化

    //缓存的配置
    @Configuration
    public class RedisConfig {
    
        @Resource
        private RedisConnectionFactory factory;
    
    
        //使用JSON进行序列化
        @Bean
        public RedisTemplate<Object, Object> redisTemplate() {
            RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
    
            redisTemplate.setConnectionFactory(factory);
            //JSON格式序列化
            GenericFastJsonRedisSerializer serializer = new GenericFastJsonRedisSerializer();
             //key的序列化
            redisTemplate.setKeySerializer(serializer);
            //value的序列化
            redisTemplate.setValueSerializer(serializer);
            //hash结构key的虚拟化
            redisTemplate.setHashKeySerializer(new StringRedisSerializer());
            //hash结构value的虚拟化
            redisTemplate.setHashValueSerializer(serializer);
            return redisTemplate;
        }
    
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    第四步:编写实体类,对应要同步的数据库的表

    @Data
    public class Employee {
        private Long id;
        private String username;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    第五步:编写数据同步处理器,canal-client提供了EntryHandler,该handler中提供了insert,delete,update方法,当监听到某张表的相关操作后,会回调对应的方法把数据传递进来,我们就可以拿到数据往Redis同步了。

    @CanalTable("employee")
    @Component
    @Slf4j
    public class EmployeeHandler implements EntryHandler<Employee> {
    
    	//把数据往Redis同步
        @Autowired
        private RedisTemplate<Object,Object> redisTemplate;
    
        @Override
        public void insert(Employee employee) {
            redisTemplate.opsForValue().set("EMP:"+employee.getId(),employee);
        }
    
        @Override
        public void delete(Employee employee) {
            redisTemplate.delete("EMP:"+employee.getId());
        }
    
        @Override
        public void update(Employee before, Employee after) {
            redisTemplate.opsForValue().set("EMP:"+after.getId(),after);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • @CanalTable(“employee”) :监听的表
    • EntryHandler : 拿到employee表的改变后的数据之后,会封装为Employee实体 投递给我们

    到这里代码就编写完成了,启动程序可以从控制台看到canal-client在不同尝试获取数据

    在这里插入图片描述
    启动redis后, 尝试手动修改数据库 employee表中的数据,然后实例redis-cli 查看 数据,下面是表中的数据

    在这里插入图片描述

    下面是redis中的数据

    在这里插入图片描述

    好了文章就到这里把,喜欢的话请给个好评,一不小心来个一键三连就更好啦!!!

  • 相关阅读:
    Python旅游门票收费问题
    ovirt-engine通过UI Plugin自定义页面
    Kubernetes 服务发现
    mnist手写数字识别
    Android调试Plugin
    Python的命令行参数实例详解
    【DevOps】Git 图文详解(五):远程仓库
    微信支付上新的“分分捐”很暖心,一起吗?
    Java高级-stream流
    Spring Boot中使用MongoDB完成数据存储
  • 原文地址:https://blog.csdn.net/u014494148/article/details/126433498