• Spring Boot + canal + MySQL + Redis: cache double-write consistency


    canal QuickStart (official): https://github.com/alibaba/canal/wiki/QuickStart
    Following the steps on the official page is basically all that is needed.

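    Preparation

    First install a JDK on the server, because canal needs one to run. Also open canal's port on the server (11111 in this setup); otherwise clients cannot connect.

    For a self-hosted MySQL, binlog writing must be enabled first, with binlog-format set to ROW. Configure my.cnf as shown below.

    Check whether binlog is enabled: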

    SHOW VARIABLES LIKE 'log_bin';
    

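    If the value is OFF, binlog is not enabled. Add the following to the [mysqld] section of my.cnf and restart MySQL: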

    [mysqld]
    log-bin=mysql-bin # enable binlog
    binlog-format=ROW # use ROW mode
    server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
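
    Before starting canal it can help to confirm that these three settings really are present in my.cnf. The following is a minimal sketch and not part of canal or MySQL: `MyCnfCheck` and its method names are hypothetical, and it only understands plain `key=value` lines with an optional trailing `#` comment. It treats `-` and `_` in option names as equivalent, as MySQL does.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of canal or MySQL): checks my.cnf text for the settings canal needs. */
final class MyCnfCheck {

    /** Parses simple key=value lines, skipping section headers and trailing "# ..." comments. */
    static Map<String, String> parse(String text) {
        Map<String, String> options = new HashMap<>();
        for (String rawLine : text.split("\\R")) {
            int hash = rawLine.indexOf('#');
            String line = (hash >= 0 ? rawLine.substring(0, hash) : rawLine).trim();
            if (line.isEmpty() || line.startsWith("[")) {
                continue; // blank line, comment-only line, or a section header such as [mysqld]
            }
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue; // flag-style option without a value; ignored in this sketch
            }
            // MySQL accepts both log-bin and log_bin, so normalise dashes to underscores.
            String key = line.substring(0, eq).trim().replace('-', '_');
            options.put(key, line.substring(eq + 1).trim());
        }
        return options;
    }

    /** True when binlog is enabled, the format is ROW, and a server_id is set. */
    static boolean readyForCanal(String text) {
        Map<String, String> o = parse(text);
        return o.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(o.get("binlog_format"))
                && o.containsKey("server_id");
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\nlog-bin=mysql-bin # enable binlog\nbinlog-format=ROW\nserver_id=1\n";
        System.out.println(readyForCanal(cnf)); // prints true
    }
}
```

    The authoritative check is still `SHOW VARIABLES LIKE 'log_bin';` on the running server, since the file may not be the one MySQL actually loaded.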
    

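    Grant the account that canal uses to connect to MySQL the privileges of a MySQL slave. If the account already exists, you can run the GRANT directly.

    Note: it is best to run these statements on the database server itself. A backend user created for remote client logins (root, for example) may not have the GRANT privilege, in which case the statements fail.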

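    DROP USER IF EXISTS 'canal'@'%';  -- drop any existing user
    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';  -- create the user
    -- canal only needs the privileges of a replication slave;
    -- GRANT ... IDENTIFIED BY is rejected by MySQL 8.0, so the password is set in CREATE USER only
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    SELECT user, host FROM mysql.user;  -- confirm the user exists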
    

    Startup

    Download canal from the release page and pick the package you need. I downloaded the latest one at the time:

    https://github.com/alibaba/canal/releases/tag/canal-1.1.6
    

    Extract

    Upload the archive to the server and extract it into a directory of your choice:

    mkdir /tmp/canal
    tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal
    

    After extracting

    Once extraction finishes, go to the /tmp/canal directory; you should see the following structure:

    drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
    drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
    drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
    drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs
    

    Configuration changes

    vi conf/example/instance.properties
    canal.instance.connectionCharset is the database encoding expressed as a Java charset name, for example UTF-8, GBK, or ISO-8859-1.
    If the machine has only one CPU, set canal.instance.parser.parallel to false.

    ## mysql serverId (commented out by default); must differ from MySQL's server_id
    canal.instance.mysql.slaveId = 1234
    # position info: change to your own database address
    canal.instance.master.address = 127.0.0.1:3306
    canal.instance.master.journal.name =
    canal.instance.master.position =
    canal.instance.master.timestamp =
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    # username/password: change to your own database credentials (canal/canal is the default)
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    # table regex: the tables to watch; the default watches every table
    # (keep comments on their own lines: a .properties value runs to the end of the line, "# ..." included)
    canal.instance.filter.regex = .*\\..*
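
    To get a feel for what canal.instance.filter.regex selects, the sketch below matches `schema.table` names against comma-separated patterns using java.util.regex. It is only an approximation under stated assumptions: canal's own filter is implemented differently, and `TableFilterSketch` is a hypothetical name. Note that the properties file writes the escaped dot as `\\.`, which is read back as the regex `\.`.

```java
import java.util.regex.Pattern;

/** Hypothetical approximation of a schema.table filter; canal's real filter is more involved. */
final class TableFilterSketch {

    /** Returns true if "schema.table" fully matches any of the comma-separated regexes. */
    static boolean accepts(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In Java source the regex .*\..* is written ".*\\..*".
        System.out.println(accepts(".*\\..*", "test", "test1"));       // every table in every schema
        System.out.println(accepts("test\\..*", "test", "test1"));     // every table in schema test
        System.out.println(accepts("test\\.test1", "test", "orders")); // only test.test1, so this one is rejected
    }
}
```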
    

    Start canal

    sh bin/startup.sh
    

    Check the server log

    vi logs/canal/canal.log

    2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
    2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
    2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
    

    Check the instance log

    vi logs/example/example.log

    2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
    2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
    

    DEMO

    Dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.3.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
        </dependency>
        <!-- Data source (connection pool) -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.9</version>
        </dependency>

        <!-- canal -->
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
    </dependencies>
    

    Code

    Configuration file
    server:
      port: 8081
    spring:
      datasource:
        url: jdbc:mysql://42.192.46.136:3306/test?useUnicode=true&useSSL=false&characterEncoding=utf-8
        username: root
        password: lkzroot
        type: com.alibaba.druid.pool.DruidDataSource
      redis:
        password: 123456
        host: 42.192.43.136
        port: 16379
        lettuce:
          pool:
            max-active: 200
            max-idle: 10
            max-wait: -1ms
            min-idle: 3
    
    Test class
    package com.boot.canal.client;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import redis.clients.jedis.Jedis;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Author: lkz
     * @Title: RedisCanalClientExample
     * @Description: TODO
     * @Date: 2023/9/18 14:06
     */
    
    public class RedisCanalClientExample {
    
    
        public static final Integer _60SECONDS = 60;
        public static final String  Canal_host_ip = "127.0.0.1";
    
        private static void redisInsert(List<CanalEntry.Column> columns)
        {
            JSONObject jsonObject = new JSONObject();
            for (CanalEntry.Column column : columns)
            {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                jsonObject.put(column.getName(),column.getValue());
            }
            if(columns.size() > 0)
            {
                try(Jedis jedis = RedisUtils.getJedis())
                {
                    jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        private static void redisDelete(List<CanalEntry.Column> columns)
        {
            // The first column is assumed to be the primary key and is used as the Redis key.
            if(columns.size() > 0)
            {
                try(Jedis jedis = RedisUtils.getJedis())
                {
                    jedis.del(columns.get(0).getValue());
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        private static void redisUpdate(List<CanalEntry.Column> columns)
        {
            JSONObject jsonObject = new JSONObject();
            for (CanalEntry.Column column : columns)
            {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                jsonObject.put(column.getName(),column.getValue());
            }
            if(columns.size() > 0)
            {
                try(Jedis jedis = RedisUtils.getJedis())
                {
                    jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                    System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        public static void printEntry(List<CanalEntry.Entry> entries) {
            for (CanalEntry.Entry entry : entries) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                CanalEntry.RowChange rowChange = null;
                try {
                    // Parse the changed row data
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
                }
                // Get the type of change
                CanalEntry.EventType eventType = rowChange.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
    
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.INSERT) {
                        redisInsert(rowData.getAfterColumnsList());
                    } else if (eventType == CanalEntry.EventType.DELETE) {
                        redisDelete(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.UPDATE) {
                        redisUpdate(rowData.getAfterColumnsList());
                    }
                    // Any other event type (DDL, for example) has no row to cache and is ignored
                }
            }
        }
    
    
        public static void main(String[] args)
        {
            System.out.println("--------- initCanal() main method -----------");
    
            //=================================
            // Create the connection to the canal server
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(Canal_host_ip, 11111),
                    "example",
                    "",
                    "");
            int batchSize = 1000;
            // Counts consecutive empty polls
            int emptyCount = 0;
            System.out.println("---------------------canal init OK, listening for MySQL changes------");
            try {
                connector.connect();
                //connector.subscribe(".*\\..*");
                // The filter is a regex, so the dot between schema and table is escaped
                connector.subscribe("test\\.test1");
                connector.rollback();
                int totalEmptyCount = 10 * _60SECONDS;
                while (emptyCount < totalEmptyCount) {
                    System.out.println("canal client polling, once per second while idle: "+ UUID.randomUUID().toString());
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                    } else {
                        // Reset the idle counter
                        emptyCount = 0;
                        printEntry(message.getEntries());
                    }
                    connector.ack(batchId); // confirm the batch
                    // connector.rollback(batchId); // on failure, roll the batch back
                }
                System.out.println("No messages for "+totalEmptyCount+" seconds; restart to try again......");
            } finally {
                connector.disconnect();
            }
        }
    
    
    }
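
    The loop above acknowledges every batch, and the rollback call is left commented out. One way to wire the two together is to acknowledge only after the batch has been applied, and to roll back when applying it throws, so that the batch is delivered again. The sketch below shows only that control flow. `BatchSource` is a hypothetical interface standing in for the `getWithoutAck` / `ack` / `rollback` calls of `CanalConnector`; it is not canal's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the three CanalConnector calls used in the demo. */
interface BatchSource {
    long fetch(List<String> into); // fills `into`, returns the batch id, or -1 when there is nothing to read
    void ack(long batchId);        // confirm the batch was applied
    void rollback(long batchId);   // ask for the batch to be delivered again
}

final class AckOrRollback {

    /** Processes one batch; returns true if it was acknowledged, false if it was empty or rolled back. */
    static boolean processOnce(BatchSource source, Consumer<List<String>> handler) {
        List<String> entries = new ArrayList<>();
        long batchId = source.fetch(entries);
        if (batchId == -1 || entries.isEmpty()) {
            return false; // nothing to do; the caller may sleep before polling again
        }
        try {
            handler.accept(entries);  // for example, write the rows to Redis
            source.ack(batchId);      // confirm only once the handler has succeeded
            return true;
        } catch (RuntimeException e) {
            source.rollback(batchId); // the same batch will be fetched again
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        BatchSource fake = new BatchSource() {
            public long fetch(List<String> into) { into.add("row-1"); return 7L; }
            public void ack(long id) { log.add("ack " + id); }
            public void rollback(long id) { log.add("rollback " + id); }
        };
        processOnce(fake, rows -> { });                                          // handler succeeds
        processOnce(fake, rows -> { throw new IllegalStateException("down"); }); // handler fails
        System.out.println(log); // prints [ack 7, rollback 7]
    }
}
```

    For this to help in the demo, the Redis helpers would need to let their exceptions propagate instead of catching and printing them; otherwise a failed cache write still looks like success to the loop.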
    
    
    Redis configuration
    package com.boot.canal.client;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    
    public class RedisUtils
    {
        public static final String  REDIS_IP_ADDR = "127.0.0.1";
        public static final String  REDIS_pwd = "123456";
        public static JedisPool jedisPool;
    
        static {
            JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(20);
            jedisPoolConfig.setMaxIdle(10);
            jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,16379,10000,REDIS_pwd);
        }
    
        public static Jedis getJedis() throws Exception {
            if(null!=jedisPool){
                return jedisPool.getResource();
            }
            throw new Exception("Jedispool is not ok");
        }
    
    }
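
    The demo stores each row under the value of its first column, which works only while that column is the primary key and a single table is being watched. A common alternative is to namespace the key by schema and table. The sketch below is one such convention; `CacheKeys`, the `schema:table:id` layout, and the expectation that the caller passes in the primary-key value are assumptions for illustration, not something canal or Redis prescribes.

```java
import java.util.Objects;

/** Hypothetical key convention: schema:table:primaryKey. */
final class CacheKeys {

    /** Builds a namespaced Redis key so rows from different tables cannot overwrite each other. */
    static String rowKey(String schema, String table, String primaryKey) {
        Objects.requireNonNull(schema, "schema");
        Objects.requireNonNull(table, "table");
        Objects.requireNonNull(primaryKey, "primaryKey");
        if (primaryKey.isEmpty()) {
            throw new IllegalArgumentException("primaryKey must not be empty");
        }
        return schema + ":" + table + ":" + primaryKey;
    }

    public static void main(String[] args) {
        System.out.println(rowKey("test", "test1", "42")); // prints test:test1:42
    }
}
```

    In the test class, the schema and table names are already available from `entry.getHeader().getSchemaName()` and `entry.getHeader().getTableName()`, as the log line in `printEntry` shows.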
    

    Result

    Modify a row in the watched table; the client prints the change and writes it to Redis.

  • Original article: https://blog.csdn.net/qq_43049310/article/details/132969494