• 【学相伴】狂神说 RabbitMQ快速入门笔记(简单使用RabbitMQ)SpringBoot整和RabbitMQ


    SpringBoot整合RabbitMQ

    由于我们做项目基于Springboot开发,因此我们这里主要进行Springboot配置开发
    
    • 1

    在这里插入图片描述

    YML配置

    server:
      port: 8081
    spring:
      rabbitmq:
        username: sakura
        password: 123456
        virtual-host: /
        host: 192.168.117.129
        port: 5672
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Config配置类

    我们需要一个配置类,声明我们的交换机和队列,需要注册进容器里面

    • 1.声明一个注册交换机FanoutExchange交换机
    • 2.声明一个队列EmailQueue
    • 3.进行队列和交换机进行
    @Configuration
    public class RabbitMqConfiguration {
        //1. 声明注册fanout模式的交换机
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanout-order-exchange", true, false);
        }
        //2. 声明队列duanxinqueue smsqueue emailqueue
        @Bean
        public Queue emailQueue() {
            return new Queue("email.fanout.queue", true);
        }
        @Bean
        public Queue smsQueue() {
            return new Queue("sms.fanout.queue", true);
        }
        @Bean
        public Queue weixinQueue() {
            return new Queue("weixin.fanout.queue", true);
        }
        //3. 绑定关系
        @Bean
        public Binding emailBinding() {
            return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
        }
        @Bean
        public Binding smsBinding() {
            return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
        }
        @Bean
        public Binding weixinBinding() {
            return BindingBuilder.bind(weixinQueue()).to(fanoutExchange());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    上述的队列名,交换机名,我们可以定义一个类,声明为常量或者就在RabbitMQ里面创建的exchange或者queue

    2.service类

    @Service
    public class OrderService {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        public void makeOrder(String userId, String productId, int num) {
            String orderId = UUID.randomUUID().toString();
            System.out.println("订单创建成功:" + orderId);
            String exchangeName = "fanout-order-exchange";
            String routingKey = "";
            /*
             * @params1 交换机
             * @params2 routingKey/队列名称
             * @params3 消息内容
             * */
            rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3.测试类

    @SpringBootTest
    class SpringbootOrderRabbitmqProducerApplicationTests {
        @Autowired
        private OrderService orderService;
        @Test
        void contextLoads() throws InterruptedException{
            orderService.makeOrder("1","1",12);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    二、进行消费者接受信息

    2.1yml文件

    server:
      port: 8082
    spring:
      rabbitmq:
        username: sakura
        password: 123456
        virtual-host: /
        host: 192.168.117.129
        port: 5672
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.2 配置类

    @Configuration
    public class DirectRabbitMqConfiguration {
        //1. 声明注册fanout模式的交换机
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("direct-order-exchange", true, false);
        }
        //2. 声明队列
        @Bean
        public Queue emailQueueDirect() {
            return new Queue("email.direct.queue", true);
        }
        @Bean
        public Queue smsQueueDirect() {
            return new Queue("sms.direct.queue", true);
        }
        @Bean
        public Queue weixinQueueDirect() {
            return new Queue("weixin.direct.queue", true);
        }
        //3. 绑定关系
        @Bean
        public Binding emailBindingDirect() {
            return BindingBuilder.bind(emailQueueDirect()).to(directExchange()).with("email");
        }
        @Bean
        public Binding smsBindingDirect() {
            return BindingBuilder.bind(smsQueueDirect()).to(directExchange()).with("sms");
        }
        @Bean
        public Binding weixinBindingDirect() {
            return BindingBuilder.bind(weixinQueueDirect()).to(directExchange()).with("weixin");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    3.FanoutConsumer…多个消费者

    @Component
    @RabbitListener(queues = {"email.fanout.queue"})
    public class FanoutEmailConsumer {
        @RabbitHandler
        public void receiveMessage(String message) {
            System.out.println("email.queue--接收到了消息是:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    FanoutSMSConsumer:

    @Component
    @RabbitListener(queues = {"sms.fanout.queue"})
    public class FanoutSMSConsumer {
        @RabbitHandler
        public void receiveMessage(String message) {
            System.out.println("sms.queue--接收到了消息是:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    FanoutSMSConsumer:

    @Component
    @RabbitListener(queues = {"sms.fanout.queue"})
    public class FanoutSMSConsumer {
        @RabbitHandler
        public void receiveMessage(String message) {
            System.out.println("sms.queue--接收到了消息是:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    FanoutWeiXinConsumer:

    @Component
    @RabbitListener(queues = {"weixin.fanout.queue"})
    public class WeiXinConsumer {
        @RabbitHandler
        public void receiveMessage(String message) {
            System.out.println("weixin.queue--接收到了消息是:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这样每个消费者类都监听了对应的队列,都可以同步收到信息

    DIRECT模式

    3.1配置类

    @Configuration
    public class DirectRabbitMqConfiguration {
        //1. 声明注册fanout模式的交换机
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("direct-order-exchange", true, false);
        }
        //2. 声明队列
        @Bean
        public Queue emailQueueDirect() {
            return new Queue("email.direct.queue", true);
        }
        @Bean
        public Queue smsQueueDirect() {
            return new Queue("sms.direct.queue", true);
        }
        @Bean
        public Queue weixinQueueDirect() {
            return new Queue("weixin.direct.queue", true);
        }
        //3. 绑定关系
        @Bean
        public Binding emailBindingDirect() {
            return BindingBuilder.bind(emailQueueDirect()).to(directExchange()).with("email");
        }
        @Bean
        public Binding smsBindingDirect() {
            return BindingBuilder.bind(smsQueueDirect()).to(directExchange()).with("sms");
        }
        @Bean
        public Binding weixinBindingDirect() {
            return BindingBuilder.bind(weixinQueueDirect()).to(directExchange()).with("weixin");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    3.2service:

       public void makeOrderDirect(String userId, String productId, int num) {
            String orderId = UUID.randomUUID().toString();
            System.out.println("订单创建成功:" + orderId);
            String exchangeName = "direct-order-exchange";
            /*
             * @params1 交换机
             * @params2 routingKey/队列名称
             * @params3 消息内容
             * */
            rabbitTemplate.convertAndSend(exchangeName, "email", orderId);
            rabbitTemplate.convertAndSend(exchangeName, "weixin", orderId);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.3test类

        @Test
        void contextLoadsDirect() throws InterruptedException{
            orderService.makeOrderDirect("1","1",12);
        }
    
    • 1
    • 2
    • 3
    • 4

    消费者:
    DirectEmailConsumer

    @Component
    @RabbitListener(queues = {"email.direct.queue"})
    public class DirectEmailConsumer {
        @RabbitHandler
        public void receiveMessage(String message) {
            System.out.println("email.queue--接收到了消息是:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    DirectSMSConsumer

    @Component
    @RabbitListener(queues = {"sms.direct.queue"})
    public class DirectSMSConsumer {
        @RabbitHandler
        public void receiveMessage(String message) {
            System.out.println("sms.queue--接收到了消息是:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ES中使用MQ来进行消息查询转发的情况

    1)引入依赖

    在hotel-admin、hotel-demo中引入rabbitmq的依赖:

    
    
        org.springframework.boot
        spring-boot-starter-amqp
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    2)声明队列交换机名称

    在hotel-admin和hotel-demo中的cn.itcast.hotel.constatnts包下新建一个类MqConstants

    package cn.itcast.hotel.constatnts;
    
        public class MqConstants {
        /**
         * 交换机
         */
        public final static String HOTEL_EXCHANGE = "hotel.topic";
        /**
         * 监听新增和修改的队列
         */
        public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
        /**
         * 监听删除的队列
         */
        public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
        /**
         * 新增或修改的RoutingKey
         */
        public final static String HOTEL_INSERT_KEY = "hotel.insert";
        /**
         * 删除的RoutingKey
         */
        public final static String HOTEL_DELETE_KEY = "hotel.delete";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    3.声明队列交换机(配置类)

    在hotel-demo中,定义配置类,声明队列、交换机:

    package cn.itcast.hotel.config;
    
    import cn.itcast.hotel.constants.MqConstants;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MqConfig {
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
        }
    
        @Bean
        public Queue insertQueue(){
            return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
        }
    
        @Bean
        public Queue deleteQueue(){
            return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
        }
    
        @Bean
        public Binding insertQueueBinding(){
            return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
        }
    
        @Bean
        public Binding deleteQueueBinding(){
            return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    发送MQ
    在这里插入图片描述

    3.2.5.接收MQ消息

    hotel-demo接收到MQ消息要做的事情包括:

    • 新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库
    • 删除消息:根据传递的hotel的id删除索引库中的一条数据

    1)给hotel-demo中的cn.itcast.hotel.service.impl包下的HotelService中实现业务:

    @Override
    public void deleteById(Long id) {
        try {
            // 1.准备Request
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            // 2.发送请求
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    
    @Override
    public void insertById(Long id) {
        try {
            // 0.根据id查询酒店数据
            Hotel hotel = getById(id);
            // 转换为文档类型
            HotelDoc hotelDoc = new HotelDoc(hotel);
    
            // 1.准备Request对象
            IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
            // 2.准备Json文档
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            // 3.发送请求
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    3)编写监听器

    在hotel-demo中的cn.itcast.hotel.mq包新增一个类:

    package cn.itcast.hotel.mq;
    
    import cn.itcast.hotel.constants.MqConstants;
    import cn.itcast.hotel.service.IHotelService;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class HotelListener {
    
        @Autowired
        private IHotelService hotelService;
    
        /**
         * 监听酒店新增或修改的业务
         * @param id 酒店id
         */
        @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
        public void listenHotelInsertOrUpdate(Long id){
            hotelService.insertById(id);
        }
    
        /**
         * 监听酒店删除的业务
         * @param id 酒店id
         */
        @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
        public void listenHotelDelete(Long id){
            hotelService.deleteById(id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
  • 相关阅读:
    面试总结之Spring篇
    Day08--组件通信-使用属性绑定实现父向子传递数据
    几种常见采样方法及原理
    C++指针常量,常量指针以及, 引用和指针的区别
    webrtc快速入门——使用 WebRTC 拍摄静止的照片
    Fast Reed-Solomon Interactive Oracle Proofs of Proximity学习笔记
    【011】C++选择控制语句 if 和 switch 详解
    单据架构—实现页面可配置化
    网络知识学习(笔记二)
    黑客(网络安全)技术自学30天
  • 原文地址:https://blog.csdn.net/weixin_59823583/article/details/126772783