• SpringBoot 2.3.12.RELEASE整合RabittMQ 3.8.12


    • 导入依赖
    
     
        org.springframework.boot
        spring-boot-starter-amqp
    
    
        org.springframework.boot
        spring-boot-starter-web
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 编写配置文件

    application.yml:

    spring:
      rabbitmq:
        host: 192.168.31.89
        port: 5672
        username: admin
        password: admin
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Direct模式–整合SpringBoot

    • 创建生产者项目

    pom

      <!--rabbitmq-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.73</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 配置文件

    application.yml

    server:
      port: 8081
    spring:
      rabbitmq:
        host: 192.168.31.89
        port: 5672
        username: admin
        password: admin
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 编写Direct模式配置类
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    /**
     * @Description: Direct模式配置
     */
    @Configuration
    public class DirectRabbitConfig {
    
    
        /**
         * 创建一个队列,名称为 queueOne.direct.queue
         * @return
         */
        @Bean
        public Queue queueOne() {
            /**
             * 参数解析:
             * new Queue (String name, boolean durable, boolean exclusive, boolean autoDelete)
             * name:队列名称
             * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
             * exclusive:默认是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
             * autoDelete:是否自动删除队列,当没有生产者或者消费者使用此队列,该队列会自动删除。
             */
            return new Queue("queueOne.direct.queue", true);
        }
    
        /**
         * 创建一个队列,名称为 queueTwo.direct.queue
         * @return
         */
        @Bean
        public Queue queueTwo() {
            return new Queue("queueTwo.direct.queue", true);
        }
    
    
        /**
         * 创建一个交换机,名称为 direct_test_exchange
         * @return
         */
        @Bean
        public DirectExchange directTestExchange() {
            return new DirectExchange("direct_test_exchange", true, false);
        }
    
    
        /**
         * 绑定交换机和队列1
         * 设置routingKey为queueOne
         * @return
         */
        @Bean
        public Binding bindingDirectAndQueueOne() {
            return BindingBuilder.bind(queueOne()).to(directTestExchange()).with("queueOne");
        }
        /**
         * 绑定交换机和队列2
         * 设置routingKey为queueTwo
         * @return
         */
        @Bean
        public Binding bindingDirectAndQueueTwo() {
            return BindingBuilder.bind(queueTwo()).to(directTestExchange()).with("queueTwo");
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 编写消息推送代码
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * @Description: Direct模式-生产者
     */
    @RestController
    @RequestMapping("/direct")
    public class DirectProducerController {
    
        /**
         * RabbitTemplate
         * 供了接收,发送等方法
         */
        @Autowired
        private  RabbitTemplate rabbitTemplate;
    
        /**
         * 交换机名称
         */
        private static  final String EXCHANGE_NAME = "direct_test_exchange";
    
        /**
         * 推送消息
         * @param routeKey 需要推送队列的路由key
         * @return
         */
        @GetMapping("/send/{routeKey}/{msg}")
        public String sendDirectMessage(@PathVariable("routeKey") String routeKey, @PathVariable("msg")String msg) {
    //        String routeKeyOne="queueOne";
    //        String routeKeyTwo="queueTwo";
    
            String Id= String.valueOf(UUID.randomUUID());
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String,Object> map=new HashMap<>();
            map.put("Id",Id);
            map.put("data",msg);
            map.put("createTime",createTime);
    
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, routeKey, JSONObject.toJSONString(map));
             return msg+"-推送成功";
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 实现效果
      根据不同的routeKey将消息推送给不同的消费者
    • 访问接口
    http://localhost:8081/direct/send/queueOne/aa
    
    http://localhost:8082/direct/send/queueTwo/bb
    
    • 1
    • 2
    • 3
    • 查看控制台
      在这里插入图片描述

    • 创建消费者项目
      pom与生产者一样

      <!--rabbitmq-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.73</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 配置文件

    application.yml

    server:
      port: 8082
    spring:
      rabbitmq:
        host: 192.168.31.89
        port: 5672
        username: admin
        password: admin
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 编写队列一的消费者
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    
    /**
     * @Description: Direct模式-消费者
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    //队列名称,是否自动删除
                    value = @Queue(value = "queueOne.direct.queue", autoDelete = "false"),
                    //交换机名称,指定rabbitmq模式
                    exchange = @Exchange(value = "direct_test_exchange",type = ExchangeTypes.DIRECT)
                    )
    )
    @Service
    public class DirectConsumerOneService {
    
        /**
         * 注解 @RabbitHandler 代表此方法是一个消息接收的方法。不要有返回值
         * @param testMessage
         */
        @RabbitHandler
        public void process(String testMessage) {
            System.out.println("DirectConsumerOneService : " + testMessage.toString());
            Map map = JSONObject.parseObject(testMessage, Map.class);
            System.out.println("map1-->"+map);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 编写队列二的消费者
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    
    /**
     * @Description: Direct模式-消费者
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    //队列名称,是否自动删除
                    value = @Queue(value = "queueTwo.direct.queue", autoDelete = "false"),
                    //交换机名称,指定rabbitmq模式
                    exchange = @Exchange(value = "direct_test_exchange",type = ExchangeTypes.DIRECT)
                    )
    )
    @Service
    public class DirectConsumerTwoService {
    
        /**
         * 注解 @RabbitHandler 代表此方法是一个消息接收的方法。不要有返回值
         * @param testMessage
         */
        @RabbitHandler
        public void process(String testMessage) {
            System.out.println("DirectConsumerTwoService : " + testMessage.toString());
            Map map = JSONObject.parseObject(testMessage, Map.class);
            System.out.println("map2-->"+map);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 运行消费者,消费刚才推送的aa ,bb

    在这里插入图片描述

    • 再次访问接口,推送消息,观察消费者接收情况
    http://localhost:8081/direct/send/queueOne/cc
    
    http://localhost:8081/direct/send/queueTwo/dd
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    注:以下模式的演示,都将在以上创建的项目中进行

    Fanout模式–整合SpringBoot

    • 在生产者项目中编写Fanout模式配置类
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    /**
     * @Description: Fanout模式配置
     */
    @Configuration
    public class FanoutRabbitConfig {
    
        /**
         * 创建一个队列,名称为 queueOne.fanout.queue
         * @return
         */
        @Bean
        public Queue queueFanoutOne() {
            return new Queue("queueOne.fanout.queue", true);
        }
    
        /**
         * 创建一个队列,名称为 queueTwo.fanout.queue
         * @return
         */
        @Bean
        public Queue queueFanoutTwo() {
            return new Queue("queueTwo.fanout.queue", true);
        }
    
    
    
        /**
         * 创建一个交换机,名称为 fanout_test_exchange
         * @return
         */
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanout_test_exchange");
        }
    
    
        /**
         * 绑定交换机和队列1
         * 此处是fanout模式,所以不设置routingKey
         * @return
         */
        @Bean
        Binding bindingExchangeA() {
            return BindingBuilder.bind(queueFanoutOne()).to(fanoutExchange());
        }
    
    
        /**
         * 绑定交换机和队列2
         * 此处是fanout模式,所以不设置routingKey
         * @return
         */
        @Bean
        Binding bindingExchangeB() {
            return BindingBuilder.bind(queueFanoutTwo()).to(fanoutExchange());
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 编写生产者
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * @Description: fanout模式-生产者
     */
    @RestController
    @RequestMapping("/fanout")
    public class FanoutProducerController {
    
        /**
         * RabbitTemplate
         * 供了接收,发送等方法
         */
        @Autowired
        private  RabbitTemplate rabbitTemplate;
    
        /**
         * 交换机名称
         */
        private static  final String EXCHANGE_NAME = "fanout_test_exchange";
    
        /**
         * 推送消息
         * @return
         */
        @GetMapping("/send/{msg}")
        public String sendDirectMessage( @PathVariable("msg")String msg) {
            String Id = String.valueOf(UUID.randomUUID());
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String,Object> map=new HashMap<>();
            map.put("Id",Id);
            map.put("data",msg);
            map.put("createTime",createTime);
    
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", JSONObject.toJSONString(map));
            return msg+"-fanout模式推送成功";
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 实现效果
      以广播的形式推送消息,2个消费者都可以消费同一条消息
    • 访问接口
    http://localhost:8081/fanout/send/11
    
    http://localhost:8081/fanout/send/22
    
    • 1
    • 2
    • 3
    • 查看控制台
      在这里插入图片描述

    • 在消费者项目中编写消费者一

    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Service;
    import java.util.Map;
    
    /**
     * @Description: fanout模式-消费者
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    //队列名称,是否自动删除
                    value = @Queue(value = "queueOne.fanout.queue", autoDelete = "false"),
                    //交换机名称,指定rabbitmq模式
                    exchange = @Exchange(value = "fanout_test_exchange",type = ExchangeTypes.FANOUT)
                    )
    )
    @Service
    public class FanoutConsumerOneService {
    
        /**
         * 注解 @RabbitHandler 代表此方法是一个消息接收的方法。不要有返回值
         * @param testMessage
         */
        @RabbitHandler
        public void process(String testMessage) {
            System.out.println("FanoutConsumerOneService : " + testMessage.toString());
            Map map = JSONObject.parseObject(testMessage, Map.class);
            System.out.println("map1-->"+map);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 在消费者项目中编写消费者二
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Service;
    import java.util.Map;
    
    /**
     * @Description: fanout模式-消费者
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    //队列名称,是否自动删除
                    value = @Queue(value = "queueTwo.fanout.queue", autoDelete = "false"),
                    //交换机名称,指定rabbitmq模式
                    exchange = @Exchange(value = "fanout_test_exchange",type = ExchangeTypes.FANOUT)
                    )
    )
    @Service
    public class FanoutConsumerTwoService {
    
        /**
         * 注解 @RabbitHandler 代表此方法是一个消息接收的方法。不要有返回值
         * @param testMessage
         */
        @RabbitHandler
        public void process(String testMessage) {
            System.out.println("FanoutConsumerTwoService : " + testMessage.toString());
            Map map = JSONObject.parseObject(testMessage, Map.class);
            System.out.println("map2-->"+map);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 运行消费者,消费刚才推送的11 ,22

    在这里插入图片描述

    • 再次访问接口,推送消息,观察消费者接收情况
    http://localhost:8081/fanout/send/33
    
    http://localhost:8081/fanout/send/44
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    Topic 模式–整合SpringBoot

    • 在生产者项目中编写topic 模式配置类
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @Description: Topic模式配置
     */
    @Configuration
    public class TopicRabbitConfig {
    
    
        /**
         * 创建一个队列,名称为 queueOne.topic.queue
         * @return
         */
        @Bean
        public Queue queueTopicOne() {
            return new Queue("queueOne.topic.queue", true);
        }
    
        /**
         * 创建一个队列,名称为 queueTwo.topic.queue
         * @return
         */
        @Bean
        public Queue queueTopicTwo() {
            return new Queue("queueTwo.topic.queue", true);
        }
    
    
    
        /**
         * 创建一个交换机,名称为topic_test_exchange
         * @return
         */
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("topic_test_exchange");
        }
    
    
        /**
         * 绑定交换机和队列1
         * 匹配规则为:以queueOne为开头的且单词长度为3
         * @return
         */
        @Bean
        Binding bindingExchangeMessage() {
            return BindingBuilder.bind(queueTopicOne()).to(exchange()).with("queueOne.*.*");
        }
    
        /**
         * 绑定交换机和队列2
         * 匹配规则为:包含单词topic
         * @return
         */
        @Bean
        Binding bindingExchangeMessage2() {
            return BindingBuilder.bind(queueTopicTwo()).to(exchange()).with("#.topic.#");
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 编写生产者
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * @Description: topic模式-生产者
     */
    @RestController
    @RequestMapping("/topic")
    public class TopicProducerController {
    
        /**
         * RabbitTemplate
         * 供了接收,发送等方法
         */
        @Autowired
        private  RabbitTemplate rabbitTemplate;
    
        /**
         * 交换机名称
         */
        private static  final String EXCHANGE_NAME = "topic_test_exchange";
    
        /**
         * 推送消息
         * @param routeKey 需要推送队列的路由key
         * @return
         */
        @GetMapping("/send/{routeKey}/{msg}")
        public String sendDirectMessage(@PathVariable("routeKey") String routeKey, @PathVariable("msg")String msg) {
            String messageId = String.valueOf(UUID.randomUUID());
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String,Object> map=new HashMap<>();
            map.put("Id",messageId);
            map.put("data",msg);
            map.put("createTime",createTime);
    
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, routeKey, JSONObject.toJSONString(map));
            return msg+"-topic模式推送成功";
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 实现效果
      按照不同的匹配规则,输入不同的匹配路径,会推送到与匹配路径相符合的队列中
    • 访问接口
    http://localhost:8081/topic/send/queueOne.test.aa/11
    http://localhost:8081/topic/send/queueOne.test.aa/22
    
    http://localhost:8081/topic/send/topic/33
    http://localhost:8081/topic/send/topic/44
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 查看控制台
      在这里插入图片描述
    • 在消费者项目中编写消费者一
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    
    /**
     * @Description: topic模式-消费者
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    //队列名称,是否自动删除
                    value = @Queue(value = "queueOne.topic.queue", autoDelete = "false"),
                    //交换机名称,指定rabbitmq模式
                    exchange = @Exchange(value = "topic_test_exchange",type = ExchangeTypes.TOPIC)
                    )
    )
    @Service
    public class TopicConsumerOneService {
    
        /**
         * 注解 @RabbitHandler 代表此方法是一个消息接收的方法。不要有返回值
         * @param testMessage
         */
        @RabbitHandler
        public void process(String testMessage) {
            System.out.println("TopicConsumerOneService : " + testMessage.toString());
            Map map = JSONObject.parseObject(testMessage, Map.class);
            System.out.println("map1-->"+map);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 在消费者项目中编写消费者二
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    
    /**
     * @Description: fanout模式-消费者
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    //队列名称,是否自动删除
                    value = @Queue(value = "queueTwo.topic.queue", autoDelete = "false"),
                    //交换机名称,指定rabbitmq模式
                    exchange = @Exchange(value = "topic_test_exchange",type = ExchangeTypes.TOPIC)
                    )
    )
    @Service
    public class TopicConsumerTwoService {
    
        /**
         * 注解 @RabbitHandler 代表此方法是一个消息接收的方法。不要有返回值
         * @param testMessage
         */
        @RabbitHandler
        public void process(String testMessage) {
            System.out.println("TopicConsumerTwoService : " + testMessage.toString());
            Map map = JSONObject.parseObject(testMessage, Map.class);
            System.out.println("map2-->"+map);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 运行消费者,消费刚才推送的11 ,223344
      在这里插入图片描述

    • 再次访问接口,推送消息,观察消费者接收情况

    http://localhost:8081/topic/send/queueOne.test.aa/55
    
    http://localhost:8081/topic/send/topic/66
    
    • 1
    • 2
    • 3

    在这里插入图片描述

  • 相关阅读:
    ubuntu下vscode的安装包
    外骨骼机器人混战:程天科技做“深”,傅利叶智能做“广”
    枚举与反射
    文字悬停效果
    ThreadLocal
    字节跳动面试真题-最长回文子串
    Google Earth Engine(GEE)——一个免费下载Landsat影像的APP
    C# Onnx Yolov8 Detect Poker 扑克牌识别
    详解nvim内建LSP体系与基于nvim-cmp的代码补全体系
    jdk 8 List相关知识点
  • 原文地址:https://blog.csdn.net/qq_46122292/article/details/125871455