• 【微服务】异步通讯实现ES数据同步


    1、引入

    玩过Elasticsearch(下面统称ES)的小伙伴都知道ES是一个十分强悍的搜索引擎,但是在之前学习的过程中一直都是通过手敲DSL语句把数据导入进去,这多少有点不优雅。那么到底能不能做到在我们数据库发生变更时,ES中的数据也随之改变,即做到两者数据同步呢?答案理所应当的可以!

    image-20220808114020522

    2、方案分析

    常见的数据同步方案主要有以下三种:

    • 同步调用。在实现增删改的同时,通过调用ES所在服务提供的接口,从而实现两端服务的数据同步。这种方式能够简单粗暴的解决同步问题,但是业务的耦合度较高,一般不建议使用;
    • 异步调用。增删改服务和搜索服务分别通过MQ进行发送和监听消息,从而实现两端服务的数据同步。这种方式能够在一定程度上有效降低业务的耦合度,但较为依赖MQ的性能
    • binlog监听。给MySQL开启binlog功能,搜索服务基于canal监听binlog变化,从而实现两端服务的数据同步。这种方法完全解除了服务间的耦合,但开启binlog会增加数据库负担、同时实现复杂度较高。

    而在这里选择的是通过MQ异步调用方案解决不同服务间ES和MySQL的数据同步问题,MQ则是采用了RabbitMQ

    image-20220808112604565

    3、具体实现

    3.1、大致思路


    我们的目标其实是很明确的,就是我们在进行增删改操作之后,能够即可在搜索端更新并得以体现。这里使用的是发布订阅模型中的Topic Exchange,当然也可以使用Direct Exchange,但是就不推荐使用Fanout Exchange了,当然自己喜欢最重要。

    需要注意的是,这里除了数据同步的具体实现外,其余的都只是大致提及需要自己进行编写,比如增删改中的接口,搜索的接口等。

    步骤大致可分为以下部分:

    1. 搭建一个增删改服务my-admin,并且准备好对应的增删改接口;
    2. 搭建一个搜索服务my-search,并且准备好对应的搜索接口;
    3. 导入实现数据同步所需依赖;
    4. 配置xml文件;
    5. 声明交换机及所需队列;
    6. 在my-admin服务中完成消息发送;
    7. 在my-search服务中对消息进行监听从而更新ES数据。

    image-20220808121249545

    3.2、导入依赖


    第一步和第二步这里就直接省略了,大家伙们随便准备一个简单的demo就行。这里除了基本的依赖之外,值得注意的只要有三个依赖,分别是数据库依赖、ES依赖和MQ依赖,如果需要用的druid的则需要自己进行修改。(其中ES依赖只有搜索服务需要

    
    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <scope>runtimescope>
    dependency>
    
    
    
    <dependency>
        <groupId>org.elasticsearch.clientgroupId>
        <artifactId>elasticsearch-rest-high-level-clientartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.3、配置文件


    配置文件主要是对MySQL和RabbitMQ进行配置。这一部分两个服务不出意外的话是一样的,除非是用了不同的数据库或者RabbitMQ。

    spring:
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/heima?useSSL=false
        username: root
        password: 123456
      rabbitmq:
        host: 192.168.150.100 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: xbaozi # 用户名
        password: 123456 # 密码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.4、MQ搭建


    3.4.1、封装名称属性


    对交换机和队列以及Topic Exchange所需要的RoutingKey名称,提高整体代码的复用性。在两个服务中同时新建一个constant包,将该包下新建下面类:

    public class MqConstants {
        /**
         * 交换机
         */
        public final static String MY_EXCHANGE = "my.topic";
        /**
         * 监听新增和修改的队列
         */
        public final static String MY_INSERT_QUEUE = "my.insert.queue";
        /**
         * 监听删除的队列
         */
        public final static String MY_DELETE_QUEUE = "my.delete.queue";
        /**
         * 新增或修改的RoutingKey
         */
        public final static String MY_INSERT_KEY = "my.insert";
        /**
         * 删除的RoutingKey
         */
        public final static String MY_DELETE_KEY = "my.delete";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3.4.2、声明注入交换机


    因为是两边服务都需要,且为了避免只在一个服务声明另一个服务先运行产生异常,因此这里使用的是配置类注入的形式进行声明,当然也可以使用注解的形式,只是相对来说对这里不太方便而已。在两个服务中新建config包,并在该包下新建配置类:

    @Configuration
    public class MqConfig {
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(MqConstants.MY_EXCHANGE, true, false);
        }
    
        @Bean
        public Queue insertQueue() {
            return new Queue(MqConstants.MY_INSERT_QUEUE, true);
        }
    
        @Bean
        public Queue deleteQueue() {
            return new Queue(MqConstants.MY_DELETE_QUEUE, true);
        }
    
        @Bean
        public Binding insertQueueBinding() {
            return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.MY_INSERT_KEY);
        }
    
        @Bean
        public Binding deleteQueueBinding() {
            return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.MY_DELETE_KEY);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    3.4.3、发送消息


    发送消息相对简单,只需要在增删改服务中的增、删、改操作中调用convertAndSend方法即可。(记得注入RabbitTemplate)

    @PostMapping
    public void save(@RequestBody User user){
        userService.save(user);
        // 发送保存消息,避免发送消息过大造成队列溢出,因此只需要发送id支持查询即可
        rabbitTemplate.convertAndSend(MqConstants.MY_EXCHANGE, MqConstants.MY_INSERT_KEY, user.getId());
    }
    @PutMapping()
    public void updateById(@RequestBody User user){
        if (user.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        userService.updateById(user);
        // 发送更新消息,避免发送消息过大造成队列溢出,因此只需要发送id支持查询即可
        rabbitTemplate.convertAndSend(MqConstants.MY_EXCHANGE, MqConstants.MY_INSERT_KEY, user.getId());
    }
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        userService.removeById(id);
        // 发送删除消息
        rabbitTemplate.convertAndSend(MqConstants.MY_EXCHANGE, MqConstants.MY_DELETE_KEY, id);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.4.4、接收消息


    对于接受消息需要准备好删除和更新的业务逻辑,其中新增和修改统称为了更新业务。除此之外,还需要准备好一个监听器,用于监听MQ发送过来的消息。

    • 业务逻辑实现:这里是通过service实现了两个业务逻辑,这里直接给出Impl中的具体实现方式。
    @Override
    public void deleteById(Long id) {
        try {
            // 1.准备Request
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            // 2.发送请求
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void updateById(Long id) {
        try {
            // 0.根据id查询酒店数据
            User user = getById(id);
            // 转换为文档类型
            UserDoc userDoc = new UserDoc(user);
            // 1.准备Request对象
            IndexRequest request = new IndexRequest("user").id(user.getId().toString());
            // 2.准备Json文档
            request.source(JSON.toJSONString(userDoc), XContentType.JSON);
            // 3.发送请求
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 监听消息实现:因为前期已经通过Bean注入了交换机与队列的关系,因此这里是需要使用注解表明需要监听的队列即可。
    /**
     * 监听用户新增或修改的业务
     * @param id 用户id
     */
    @RabbitListener(queues = MqConstants.MY_INSERT_QUEUE)
    public void listenUserInsertOrUpdate(Long id){
        userService.insertById(id);
    }
    /**
     * 监听用户删除的业务
     * @param id 用户id
     */
    @RabbitListener(queues = MqConstants.MY_DELETE_QUEUE)
    public void listenUserDelete(Long id){
        userService.deleteById(id);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3.5、验收成果


    到这里的话就可以对结果进行演示了,因为在写这玩意的时候是已经写完了demo且已经是关掉了的,因此这里就不放演示截图,直接告诉大家伙的验收方式:

    • 如果有页面的,通过界面测试:
      • 同时打开搜索界服务和增删改服务的界面,找到同一条数据;
      • 通过增删改服务的界面进行新增操作(删除或修改);
      • 确保已经操作成功后在搜索服务界面刷新,找到那条数据进行验证;
      • 如果搜索服务搜索出来的是更新后的,则证明数据同步成功。
    • 如果没有界面的,通过接口测试工具测试:
      • 同样先找到同一条数据确定下来,无论是通过接口查询还是DSL或SQL;
      • 通过Postman或其他接口测试工具进行新增操作(删除或修改);
      • 确保已经操作成功后再次调用搜索服务的搜索接口,找到那条数据进行验证;
      • 如果搜索出来的是更新后的,则证明数据同步成功。

    4、失败原因推测

    这里对可能失败的原因进行一个推测,如果真的失败了可以尝试一下以下方法:

    1. MQ、ES等服务是否启动;
    2. 如果是使用的服务器或虚拟机,防火墙 / 端口是否打开;
    3. 是否在两个服务中都对MySQL以及MQ进行配置
    4. 在运行时代码中是否对RestClient进行初始化,如通过Bean注入;
    5. 交换机、队列的声明和绑定是否正确,且在两个服务中都进行了声明;
    6. 编译器如IDEA的Maven配置、yml文件的字符集配置是否正确;
    7. 如果是长期打开的虚拟机,建议重启后再试;
    8. 换一个IDEA或者服务器吧,没救了(偷溜)
      在这里插入图片描述
  • 相关阅读:
    MATLAB程序设计之循环结构入门详解
    基于Java+SpringBoot+Vue前后端分离农产品直卖平台设计和实现
    基于RK3568的鸿蒙通行一体机方案项目
    问题usr/bin/env: “python‘: Too many levels of symbolic links太多层链接的bug pycharm
    性能测试,如何做压力测试?压力测试的12点误区,避免背锅提升效率(一)
    基于内存的分布式NoSQL数据库Redis(一)介绍与安装
    Lab: Xv6 and Unix utilities
    main函数参数解析与应用
    NDSS 2022 EMS: History-Driven Mutation for Coverage-based Fuzzing
    apk构建过程
  • 原文地址:https://blog.csdn.net/Aqting/article/details/126239263