• RabbitMQ(七)【SpringBoot案例】


    七、RabbitMQ - SpringBoot案例


    上一篇文章基础入门案例

    整体核心

    在这里插入图片描述

    7.1 fanout 模式

    生产者:交换机绑定队列

    在这里插入图片描述

    1. 创建一个 springboot 项目springboot-order-rabbitmq-producer

    在这里插入图片描述

    1. 配置文件application.yml
    # 服务端口
    server:
      port: 8080
    
    # 配置rabbitmq
    spring:
      rabbitmq:
        username: admin
        password: admin
        virtual-host: /
        host: 192.168.159.100
        port: 5672
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    1. 添加依赖
        <dependencies>
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
            <dependency>
                <groupId>org.springframework.amqpgroupId>
                <artifactId>spring-rabbit-testartifactId>
                <scope>testscope>
            dependency>
        dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    1. 编写一个OrderServiceImpl.java
    package com.vinjcent.rabbitmq;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    @SuppressWarnings("all")
    @Service
    public class OrderServiceImpl {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        public void createOrderFanout(){
            // 1.根据商品id查询库存是否充足
            // 2.保存订单
            String orderId = UUID.randomUUID().toString();
            // 3.通过MQ来完成消息的分发
            // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
            String exchangeName = "fanout_order_exchange";
            String routingKey = "";
            rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    1. 编写FanoutRabbitMQConfiguration.java
    package com.vinjcent.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutRabbitMQConfiguration {
    
        // 1.声明注册fanout模式交换机
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("fanout_order_exchange",true,false);
        }
    
    
        // 2.声明队列 account.fanout.queue、express.fanout.queue、sms.fanout.queue
        @Bean
        public Queue fanout_accountQueue(){
            return new Queue("account.fanout.queue",true);
        }
        @Bean
        public Queue fanout_expressQueue(){
            return new Queue("express.fanout.queue",true);
        }
        @Bean
        public Queue fanout_smsQueue(){
            return new Queue("sms.fanout.queue",true);
        }
        // 3.完成绑定关系(队列)
        @Bean
        public Binding fanout_accountBinding(){
            return BindingBuilder.bind(fanout_accountQueue()).to(fanoutExchange());
        }
        @Bean
        public Binding fanout_expressBinding(){
            return BindingBuilder.bind(fanout_expressQueue()).to(fanoutExchange());
        }
        @Bean
        public Binding fanout_smsBinding(){
            return BindingBuilder.bind(fanout_smsQueue()).to(fanoutExchange());
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    1. 在web界面中将队列 queue 和交换机 exchange 清空

    在这里插入图片描述

    在这里插入图片描述

    1. 运行测试
    @SpringBootTest
    class SpringbootOrderRabbitmqProducerApplicationTests {
    
        @Autowired
        OrderServiceImpl orderService;
    
        @Test
        void contextLoads() {
            orderService.createOrderFanout("1","1",12);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    1. 观察web界面中的消息队列和交换机

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    消费者接收消息

    1. 创建一个 springboot 项目springboot-order-rabbitmq-consumer

    在这里插入图片描述

    1. 配置文件
    # 应用服务 WEB 访问端口
    server:
      port: 80
    
    # 配置rabbitmq
    spring:
      rabbitmq:
        username: admin
        password: admin
        virtual-host: /
        host: 192.168.159.100
        port: 5672
      application:
        # 应用名称
        name: springboot-order-rabbitmq-consumer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    1. service包下添加一个fanout包,添加以下类,注入Spring容器中

    (1)FanoutAccountConsumer.java监听队列account.fanout.queueexpress.fanout.queuesms.fanout.queue

    package com.vinjcent.rabbitmq.service.fanout;
    
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service
    @RabbitListener(queues = {"account.fanout.queue"})
    public class FanoutAccountConsumer {
    
        @RabbitHandler
        public void receiveMessage(String message){
            System.out.println("account.fanout.queue===>" + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    (2)FanoutExpressConsumer.java

    package com.vinjcent.rabbitmq.service.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service
    @RabbitListener(queues = {"express.fanout.queue"})
    public class FanoutExpressConsumer {
    
        @RabbitHandler
        public void receiveMessage(String message){
            System.out.println("express.fanout.queue===>" + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    (3)FanoutSMSConsumer.java

    package com.vinjcent.rabbitmq.service.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service
    @RabbitListener(queues = {"sms.fanout.queue"})
    public class FanoutSMSConsumer {
    
        @RabbitHandler
        public void receiveMessage(String message){
            System.out.println("sms.fanout.queue===>" + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    1. 运行springboot-order-rabbitmq-consumer工程

    2. 查看控制台打印结果

    在这里插入图片描述

    在这里插入图片描述

    7.2 direct 模式

    1)生产者:交换机绑定队列

    1. springboot-order-rabbitmq-producer工程的service包下OrderServiceImpl.java添加如下;并配置一个DirectRabbitMQConfiguration.java

    OrderServiceImpl.java

    package com.vinjcent.rabbitmq;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    @SuppressWarnings("all")
    @Service
    public class OrderServiceImpl {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        // fanout模式...
    
    
        // direct模式
        public void createOrderDirect(String userId, String productId,int num){
            // 1.根据商品id查询库存是否充足
            // 2.保存订单
            String orderId = UUID.randomUUID().toString();
            System.out.println("订单生产成功: " + orderId);
            // 3.通过MQ来完成消息的分发
            // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
            String exchangeName = "direct_order_exchange";
            String routingKey = "";
            // 根据路由推送消息
            rabbitTemplate.convertAndSend(exchangeName, "account", orderId);
            rabbitTemplate.convertAndSend(exchangeName, "express", orderId);
    
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    DirectRabbitMQConfiguration.java

    package com.vinjcent.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DirectRabbitMQConfiguration {
    
    
        // 1.声明注册fanout模式交换机
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange("direct_order_exchange",true,false);
        }
    
    
        // 2.声明队列 account.direct.queue、express.direct.queue、sms.direct.queue
        @Bean
        public Queue direct_accountQueue(){
            return new Queue("account.direct.queue",true);
        }
        @Bean
        public Queue direct_expressQueue(){
            return new Queue("express.direct.queue",true);
        }
        @Bean
        public Queue direct_smsQueue(){
            return new Queue("sms.direct.queue",true);
        }
        // 3.完成绑定关系(队列)
        // direct模式比fanout模式多了一个路由key
        @Bean
        public Binding direct_accountBinding(){
            return BindingBuilder.bind(direct_accountQueue()).to(directExchange()).with("account");
        }
        @Bean
        public Binding direct_expressBinding(){
            return BindingBuilder.bind(direct_expressQueue()).to(directExchange()).with("express");
        }
        @Bean
        public Binding direct_smsBinding(){
            return BindingBuilder.bind(direct_smsQueue()).to(directExchange()).with("sms");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    1. 测试类
    @SpringBootTest
    class SpringbootOrderRabbitmqProducerApplicationTests {
    
        @Autowired
        OrderServiceImpl orderService;
    
        @Test
        void testFanout() {
            orderService.createOrderFanout("1","1",12);
        }
    
        // 运行该测试用例
        @Test
        void testDirect() {
            orderService.createOrderDirect("1","1",12);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    1. 在web界面观察交换机与队列情况

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    先要启动生产者再自动消费者,不然交换机不存在会报错

    2)消费者接收消息

    1. 在项目springboot-order-rabbitmq-consumer工程中的service包下添加一个direct包,添加以下类,注入Spring容器中

    (1)DirectAccountConsumer.java监听队列account.direct.queue

    @Service
    @RabbitListener(queues = {"account.direct.queue"})
    public class DirectAccountConsumer {
    
        @RabbitHandler
        public void receiveMessage(String message){
            System.out.println("account.direct.queue===>" + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    (2)DirectExpressConsumer.java监听队列express.direct.queue

    @Service
    @RabbitListener(queues = {"express.direct.queue"})
    public class DirectExpressConsumer {
    
        @RabbitHandler
        public void receiveMessage(String message){
            System.out.println("express.direct.queue===>" + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    (3)DirectSMSConsumer.java监听队列sms.direct.queue

    @Service
    @RabbitListener(queues = {"sms.direct.queue"})
    public class DirectSMSConsumer {
    
        @RabbitHandler
        public void receiveMessage(String message){
            System.out.println("sms.direct.queue===>" + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 运行springboot-order-rabbitmq-consumer

    2. 查看控制台打印结果

    在这里插入图片描述

    7.3 topic 模式

    使用注解方式实现绑定

    消费者

    1. springboot-order-rabbitmq-consumer工程下的service包下添加一个topic包,添加以下类,注入Spring容器中

    (1)TopicAccountConsumer.java

    package com.vinjcent.rabbitmq.service.topic;
    
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Service;
    
    @Service
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "account.topic.queue", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
            key = "#.account.#"
    ))
    public class TopicAccountConsumer {
    
        @RabbitHandler
        public void receiveMessage(String message){
            System.out.println("account.topic.queue===>" + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    (2)TopicExpressConsumer.java

    package com.vinjcent.rabbitmq.service.topic;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Service;
    
    @Service
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "express.topic.queue", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
            key = "*.express.#"
    ))
    public class TopicExpressConsumer {
    
        @RabbitHandler
        public void receiveMessage(String message){
            System.out.println("express.topic.queue===>" + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    (3)TopicSMSConsumer.java

    package com.vinjcent.rabbitmq.service.topic;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Service;
    
    @Service
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "sms.topic.queue", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
            key = "sms.#"
    ))public class TopicSMSConsumer {
    
        @RabbitHandler
        public void receiveMessage(String message){
            System.out.println("sms.topic.queue===>" + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    1. 启动springboot-order-rabbitmq-consumer
    2. 查看web界面

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    推荐使用配置类的方式实现交换机与队列的绑定,以及路由key规则

    生产者

    1. springboot-order-rabbitmq-producer工程下,service包下添加以下内容
    package com.vinjcent.rabbitmq;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    @SuppressWarnings("all")
    @Service
    public class OrderServiceImpl {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        // fanout模式
        public void createOrderFanout(String userId, String productId,int num){
            //......
        }
    
        // direct模式
        public void createOrderDirect(String userId, String productId,int num){
            //......
        }
    
        // topic
        public void createOrderTopic(String userId, String productId,int num){
            // 1.根据商品id查询库存是否充足
            // 2.保存订单
            String orderId = UUID.randomUUID().toString();
            System.out.println("订单生产成功: " + orderId);
            // 3.通过MQ来完成消息的分发
            // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
            String exchangeName = "topic_order_exchange";
            // 路由给三个消息队列推送消息
            String routingKey = "sms.express.account.xxxxx";
    
            /*
             *  #.account.#
             *  *.express.#
             *  sms.#
             */
            rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    1. 运行测试用例,生产消息
    @SpringBootTest
    class SpringbootOrderRabbitmqProducerApplicationTests {
    
        @Autowired
        OrderServiceImpl orderService;
    
        @Test
        void contextLoads() {
            orderService.createOrderFanout("1","1",12);
        }
    
        @Test
        void testDirect() {
            orderService.createOrderDirect("1","1",12);
        }
    
        // 运行该测试用例
        @Test
        void testTopic() {
            orderService.createOrderTopic("1","1",12);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    在这里插入图片描述

    下一篇文章RabbitMQ高级 - 过期时间 TTL

  • 相关阅读:
    22、阻塞模式调试1(一个客户端)
    selenium网页自动化使用教程
    Springboot+JPA+ORACLE12C项目hibernate生成的SQL语句中schema."小写表名"导致的“ORA-00942 表或视图不存在”问题,求解决方案。
    【VisualStudio 】VisualStudio2022 项目模板
    Flask 学习-36.Flask-RESTful 序列化输出对象
    【MySql】深入了解 MySQL 中的 INNER JOIN 和 OUTER JOIN
    centos怎么禁用和关闭selinux
    Ubuntu16.4安装搜狗拼音输入法
    Linux Podman安装DVWA靶场环境
    Linux运行级别指定
  • 原文地址:https://blog.csdn.net/Wei_Naijia/article/details/126573233