• Canal整合SpringBoot详解(二)


    Canal整合SpringBoot详解(二)

    什么是canal

    • canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
    • canal工作原理:
      • canal的工作原理就是把自己伪装成MySQL slave从节点,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如RocketMQ、Kafka、ElasticSearch等等。
    • canal能做什么:
      • 数据库镜像
      • 数据库实时备份
      • 索引构建和实时维护
      • 业务cache(缓存)刷新
      • 带业务逻辑的增量数据处理

    案例2:Canal+Kafka实现mysql和elasticsearch的数据同步⭐

    案例目的:

    1:实现canal只监控canal-test-db1数据库下的t_config(主要同步这个表)和.t_user表。

    2:当我们修改canal-test-db1数据库下的t_config表的内容会自动同步到ElasticSearch中;

    3:当我们修改canal-test-db1数据库下的t_user表则不会同步。(虽然t_user表也被canal监控,但是这个案例就要做到在被监控的情况下,而不被同步),说白了就是只同步t_config表。

    Docker搭建elasticsearch7.8.0(单机版本)⭐
    • 1:docker可能会拉取不了es,此时可以配置一个很好用的镜像源(daocloud),下载非常快:
    curl -sSL https://get.daocloud.io/daotools/set_mirror.sh | sh -s http://f1361db2.m.daocloud.io
    
    • 1
    sudo systemctl restart docker
    
    • 1
    • 2:创建挂载目录:
    mkdir -p /usr/local/docker/elasticsearch/config
    mkdir -p /usr/local/docker/elasticsearch/data
    
    chmod 777 /usr/local/docker/elasticsearch/config
    chmod 777 /usr/local/docker/elasticsearch/data
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 3:编写es配置文件:
    vi /usr/local/docker/elasticsearch/config/elasticsearch.yml
    
    • 1

    内容如下:

    cluster.name: “es-cluser01”
    node.name: es-node1
    network.host: 0.0.0.0
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    cluster.initial_master_nodes: ["es-node1"] #这个一定要填,集群默认的主节点名称(node.name)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 4:永久调大虚拟机内存:(不然启动不了)
    vim /etc/sysctl.conf
    
    • 1

    在最后面添加的内容如下:

    vm.max_map_count=262144
    
    • 1
    • 5:刷新配置:
    sysctl -p
    
    • 1
    • 6:运行elasticsearch容器:(访问该服务器ip:9200即可访问)
      • ES_JAVA_OPTS两个Xms的值都要一致,不然会报错。(这个很坑!!)
    docker run --name elasticsearch \
    -p 9200:9200 \
    -p 9300:9300 \
    -e “discovery.type=single-node” \
    -e ES_JAVA_OPTS="-Xms256m -Xmx256m" \
    -v /usr/local/docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
    -v /usr/local/docker/elasticsearch/data:/usr/share/elasticsearch/data \
    -v /usr/local/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
    -d elasticsearch:7.8.0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    Docker安装elasticsearch-head5⭐
    • 1:拉取镜像:
    docker pull mobz/elasticsearch-head:5
    
    • 1
    • 2:启动镜像:
    docker run -d -p 9100:9100 --name=elasticsearch-head mobz/elasticsearch-head:5
    
    • 1
    • 3:进入容器:
    docker exec -it elasticsearch-head /bin/bash
    
    • 1
    解决es-head 406错误问题
    • 方式1:直接修改容器内文件
    • 方式2:使用容器数据卷的方式(推荐。可以使用容器数据卷的方式修改vendor.js 文件⭐)
    直接修改容器内文件(需要下载vim命令)
    • 1:
    mv /etc/apt/sources.list /etc/apt/sources.list.bak
        echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >> /etc/apt/sources.list
        echo "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
        echo "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.list
        echo "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 2:更新源
    apt update
    
    • 1
    • 3:安装vim:(按Y即可)
    apt-get install vim
    
    • 1
    • 4:修改vendor.js 文件:
    vim _site/vendor.js 
    
    • 1

    修改1:在6886行,把contentType: "application/x-www-form-urlencoded,修改成contentType: “application/json;charset=UTF-8”

    修改2:7573行 var inspectData = s.contentType === “application/x-www-form-urlencoded” &&

    修改成var inspectData = s.contentType === “application/json;charset=UTF-8” &&

    • 5:重启容器:
    docker restart elasticsearch-head
    
    • 1
    Docker安装kibana(注意:kibana的版本要和elasticsearch的版本相同才行)⭐
    • 1:拉取镜像:(注意:kibana的版本要和elasticsearch的版本相同才行)
    docker pull kibana:7.8.0
    
    • 1
    • 2:编辑配置文件:
    mkdir -p /usr/local/kibana/config/
    vi /usr/local/kibana/config/kibana.yml
    
    • 1
    • 2

    内容如下:(修改elasticsearch.hosts为你的elasticsearch地址列表)

    server.name: kibana
    server.host: "0"
    elasticsearch.hosts: [ "http://192.168.184.201:9200" ]
    xpack.monitoring.ui.container.elasticsearch.enabled: true
    
    • 1
    • 2
    • 3
    • 4
    • 3:启动:
    docker run -d \
      --name=kibana \
      --restart=always \
      -p 5601:5601 \
      -v /usr/local/kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml \
      kibana:7.8.0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 4:访问kibana:(服务器ip:5601)

    在这里插入图片描述
    在这里插入图片描述

    修改我们刚刚的SpringBoot项目⭐
    把ConfigCanalRedisConsumer类注释掉(或者可以修改instance.properties的topic名称)⭐

    在这里插入图片描述

    给pom.xml添加ElasticSearch的依赖⭐
            
            
            <dependency>
                <groupId>org.elasticsearchgroupId>
                <artifactId>elasticsearchartifactId>
                <version>7.8.0version>
            dependency>
    		
            
            <dependency>
                <groupId>org.elasticsearch.clientgroupId>
                <artifactId>elasticsearch-rest-high-level-clientartifactId>
                <version>7.8.0version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    ElasticSearchConfig.class
    package com.boot.config;
    
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    /**
     * @author youzhengjie 2022-09-01 22:46:04
     */
    @Configuration
    public class ElasticSearchConfig {
    
        @Bean
        public RestHighLevelClient restHighLevelClient() {
            HttpHost httpHost = new HttpHost("192.168.184.201", 9200, "http");
            RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
            return new RestHighLevelClient(restClientBuilder);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    添加索引的test方法
    package com.boot;
    
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.indices.CreateIndexRequest;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import java.io.IOException;
    
    @SpringBootTest
    public class ConfigCanalTest {
    
        @Autowired
        private RestHighLevelClient restHighLevelClient;
    
        private static final String INDEX="config-canal-es";
    
        /*
         1:创建es索引
         */
        /*
    
        PUT config-canal-es
        {
            "mappings":{
            "properties":{
                "configInfo":{
                   "type":"text",
                   "analyzer":"standard"
               },
                "datetime":{
                   "type":"keyword"
               },
               "desc":{
                   "type":"text"
               }
    
                 }
            }
       }
    
         */
        @Test //代码实现上面的添加索引。
        //注意:使用XContentFactory.jsonBuilder()创建索引,不需要把"mappings":{}这个算上去。不然会报错。也就是说不可以写startObject("mappings").endObject()
        void addConfigIndexToES() throws IOException {
    
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);
            XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
                    .startObject() //{
                    .startObject("properties")//"properties"{
                    .startObject("configInfo") //"configInfo"{
                    .field("type", "text") // "type":"text",
                    .field("analyzer", "standard")//"analyzer":"standard"
                    .endObject()//},
                    .startObject("datetime")//"datetime":{
                    .field("type", "keyword")//"type":"keyword"
                    .endObject()//},
                    .startObject("desc")//"desc":{
                    .field("type", "text")//"type":"text"
                    .endObject()//}
                    .endObject()//} ,properties的结束
                    .endObject();//}
    
            createIndexRequest.mapping(xContentBuilder);
            restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        }
    
        @Test//删除索引
        void deleteConfigToES() throws IOException {
    
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(INDEX);
            restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    新建ConfigCanalElasticSearchConsumer类(kafka消费者类,监听指定topic,把canal发送的消息同步到ElasticSearch中)⭐
    package com.boot.comsumer;
    
    import com.alibaba.fastjson.JSONObject;
    import com.boot.entity.Config;
    import com.boot.entity.config_canal.ConfigCanalBean;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.rest.RestStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    
    /**
     * kafka消费者(监听名为canal-test-topic的topic),同步ElasticSearch
     * @author youzhengjie 2022-09-01 16:54:28
     */
    @Component
    @Slf4j
    public class ConfigCanalElasticSearchConsumer {
    
        @Autowired
        private RestHighLevelClient restHighLevelClient;
    
        //es的index,相当于mysql的数据库:(数据库.表名)
        private static final String ES_INDEX = "config-canal-es";
    
        //过期时间(单位:小时)
        private static final int TIME_OUT = 24;
    
    
        /**
         * @param consumer 接收消费记录(消息)
         * @param ack 手动提交消息
         */
        @KafkaListener(topics = "canal-test-topic")
        public void receive(ConsumerRecord<String, String> consumer, Acknowledgment ack) {
    
            try {
                //获取canal的消息
                String value = (String) consumer.value();
                log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);
    
                //转换为javaBean
                ConfigCanalBean canalBean = JSONObject.parseObject(value, ConfigCanalBean.class);
                /*
                由于我们canal.instance配置了监控canal-test-db1.t_config表和canal-test-db1.t_user表(生产环境下可以启动多个canal,每一个canal监听一张需要同步的表)
                所以我们要对这两张表分开处理。(可以通过他们的表名(canalBean.getTable())来区分)
                如果canalBean.getTable()获取的表名是t_config,则同步到es,如果不是则不管。
                 */
                log.warn("["+canalBean+"]");
                if("t_config".equals(canalBean.getTable())){
                    //获取是否是DDL语句
                    boolean isDdl = canalBean.isDdl();
                    //获取当前sql语句的类型(比如INSERT、DELETE等等)
                    String type = canalBean.getType();
                    List<Config> datas = canalBean.getData();
                    if ("t_config".equals(canalBean.getTable())) {
                        //如果不是DDL语句
                        if (!isDdl) {
                            //INSERT和UPDATE都是一样的操作
                            if ("INSERT".equals(type) || "UPDATE".equals(type)) {
                                //新增语句
                                for (Config config : datas) {
                                    // 增加、修改处理
                                    IndexRequest indexRequest = new IndexRequest(ES_INDEX);
                                    indexRequest.id(config.getConfigId()+""); //id
                                    ConcurrentHashMap<String, Object> dataMap = new ConcurrentHashMap<>();
                                    dataMap.put("configInfo",(config.getConfigInfo()!=null)?config.getConfigInfo():"");
                                    dataMap.put("datetime",(config.getDatetime()!=null)?config.getDatetime():"");
                                    dataMap.put("desc",(config.getDesc()!=null)?config.getDesc():"");
                                    indexRequest.source(dataMap);
    
                                    IndexResponse response= restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    
                                    RestStatus status = response.status();
                                    log.info("status="+status.toString());
                                }
                            }else if("DELETE".equals(type)){
                                //删除语句
                                if(datas!=null && datas.size()>0){
                                    for (Config config : datas) {
                                        DeleteRequest deleteRequest = new DeleteRequest();
                                        deleteRequest.id(config.getConfigId()+"");
                                        DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
                                        log.info("deleteResponse:"+deleteResponse);
                                    }
                                }
                            }
    
                        }
                    }
                }
                //最后,如果上面的代码没有报错的情况下,可以确认消息了。(很重要)
                ack.acknowledge();
            }catch (Exception e){
                throw new RuntimeException();
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
  • 相关阅读:
    剑指 Offer 26. 树的子结构
    OAK PoE设备独立模式详细介绍
    用shell批量改变打错标签的名字(打标签是一生之敌)
    使用vue-cli创建Vue工程化项目及单文件组件的创建和调用
    Linux系统输出整数值的shell脚本
    el-form动态检验无法生效问题(已解决)
    spring源码解析——IOC之自定义标签解析
    LeetCode每日一题:1774. 最接近目标价格的甜点成本 深搜+剪枝 / 动态规划
    js第一章
    Jetson平台180度鱼眼相机畸变校正调试记录
  • 原文地址:https://blog.csdn.net/weixin_50071998/article/details/126756032