• 利用MQ实现mysql与elasticsearch数据同步


    流程

    1.声明exchange、queue、RoutingKey
    2. 在hotel-admin中进行增删改(SQL),完成消息发送
    3. 在hotel-demo中完成消息监听,并更新elasticsearch数据
    4. 测试同步

    在这里插入图片描述

    1.引入依赖

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    我这里的mq是挂在了docker上,虚拟机地址是192.168.116.128。到时候这个根据自己的项目改就行

     spring: 
      rabbitmq:
        host: 192.168.116.128 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: itcast # 用户名
        password: 123321 # 密码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.声明交换机、队列和绑定关系

    package cn.itcast.hotel.constants;
    
    public class MqConstants {
    
        /**
         * 交换机
         */
        public final static String HOTEL_EXCHANGE = "hotel.topic";
        /**
         * 监听新增和修改的队列
         */
        public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
        /**
         * 监听删除的队列
         */
        public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
        /**
         * 新增或修改的RoutingKey
         */
        public final static String HOTEL_INSERT_KEY = "hotel.insert";
        /**
         * 删除的RoutingKey
         */
        public final static String HOTEL_DELETE_KEY = "hotel.delete";
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在hotel-demo中,定义配置类,声明队列、交换机:

    package cn.itcast.hotel.config;
    
    
    import cn.itcast.hotel.constants.MqConstants;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MqConfig {
        // 定义交换机
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
        }
        // 定义队列
        @Bean
        public Queue insertQueue()
        {
            return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
        }
        @Bean
        public Queue deleteQueue()
        {
            return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
        }
    
        // 定义绑定关系
        @Bean
        public Binding insertQueueBinding()
        {
            return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
        }
        @Bean
        public Binding deleteQueueBinding()
        {
            return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    3.发送MQ消息

    在hotel-admin中的增、删、改业务中分别发送MQ消息,具体怎么添加根据:

        // 新增
        @PostMapping
        public void saveHotel(){
            //数据库新增操作
            rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
        }
    
        // 更新
        @PutMapping()
        public void updateById(){
                //数据库修改操作
                rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    
        }
        // 删除
        @DeleteMapping("/{id}")
        public void deleteById() {
    		// 数据库删除操作
            rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    4.监听MQ消息

    接收MQ消息

    hotel-demo接收到MQ消息要做的事情包括:

    • 新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库
    • 删除消息:根据传递的hotel的id删除索引库中的一条数据

    1)首先在hotel-demo的cn.itcast.hotel.service包下的IHotelService中新增新增、删除业务

    void deleteById(Long id);
    
    void insertById(Long id);
    
    • 1
    • 2
    • 3

    2)给hotel-demo中的cn.itcast.hotel.service.impl包下的HotelService中实现业务:

    @Override
    public void deleteById(Long id) {
        try {
            // 1.准备Request
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            // 2.发送请求
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    
    @Override
    public void insertById(Long id) {
        try {
            // 0.根据id查询酒店数据
            Hotel hotel = getById(id);
            // 转换为文档类型
            HotelDoc hotelDoc = new HotelDoc(hotel);
    
            // 1.准备Request对象
            IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
            // 2.准备Json文档
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            // 3.发送请求
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    编写监听类

    package cn.itcast.hotel.mq;
    
    import cn.itcast.hotel.constants.MqConstants;
    import cn.itcast.hotel.service.IHotelService;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class HotelListener {
        // 专门用于消息监听的类
    
        @Autowired
        private IHotelService hotelService;
    
        @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
        public void listenHotelInsertOrUpdate(Long id)
        {
            hotelService.insertById(id);
        }
    
        @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
        public void listenHotelDelete(Long id)
        {
            hotelService.deleteById(id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
  • 相关阅读:
    JVM内存配置参数
    PAT 甲级 A1018 Public Bike Management
    算法通关村第19关【黄金】| 继续盘点高频动态规划dp问题
    Java8中的函数式接口(你知道几个?)
    如何用Python3自撰一个简单的后端框架
    Java项目:SSM共享汽车租赁平台
    git常见命令和操作
    印刷企业如何利用MES管理系统改善生产计划
    Redis未授权访问
    超大文件分片断点多线程上传
  • 原文地址:https://blog.csdn.net/qq_41296039/article/details/132676886