• Using RabbitMQ in Java



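    Preface

    MQ is commonly used to decouple services, smooth out traffic peaks, and communicate asynchronously. RabbitMQ is a widely used and fairly stable open-source message broker. In this post we pull in RabbitMQ through the Spring Boot starter and walk through the messaging patterns it supports. A few simple examples should help you quickly decide which pattern fits a given business scenario. The goal is that this one article is enough to get started with RabbitMQ. An installation link is given at the end.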


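    I. Dependency and Configuration

    1. Dependency

    Spring AMQP (AMQP stands for Advanced Message Queuing Protocol) consists of two parts: spring-amqp provides the base abstractions, and spring-rabbit is the RabbitMQ implementation.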

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2. Configuration

    For the available configuration properties, see RabbitProperties.java.

    spring:
      rabbitmq:
        host: 192.168.137.192
        port: 5672
        username: guest
        password: guest
        virtualHost: /
    

    II. Usage

    1. Queue

    RabbitConfiguration

    package com.student.rabbit.queue;
    
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Bean;
    import org.springframework.amqp.core.Queue;
    /**
     * Create by zjg on 2024/3/9
     */
    @Configuration
    public class RabbitConfiguration {
        protected final String queueName = "queue";
        @Bean
        public Queue queue() {
            return new Queue(this.queueName);
        }
    }
    
    

    Producer

    package rabbit.queue;
    
    import com.student.SpringbootStart;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Create by zjg on 2024/3/9
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = SpringbootStart.class)
    public class Producer {
        @Autowired
        private RabbitTemplate template;
        @Autowired
        private Queue queue;
    
        AtomicInteger count = new AtomicInteger(0);
        @Test
        public void send() {
            for (int i = 0; i < 10; i++) {
                StringBuilder builder = new StringBuilder("Hello");
                builder.append(" "+count.incrementAndGet());
                String message = builder.toString();
                template.convertAndSend(queue.getName(), message);
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
    
    

    Consumer

    package com.student.rabbit.queue;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Create by zjg on 2024/3/9
     */
    @Component
    public class Consumer {
        private static final Logger log = LoggerFactory.getLogger(Consumer.class);
        protected final String queueName = "queue";
        @RabbitListener(queues = queueName)
        public void receive1(String message){
            log.debug("receive1:"+message);
        }
        @RabbitListener(queues = queueName)
        public void receive2(String message){
            log.debug("receive2:"+message);
        }
    }
    
    

    Both listeners consume from the same queue, and each of them received 5 of the 10 messages.
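
    The even split comes from the broker sharing one queue among competing consumers. As a rough illustration, here is a plain-Java simulation that does not talk to RabbitMQ at all. It assumes strict round-robin dispatch, which only approximates what the broker does when both listeners are idle and keep up. RoundRobinDemo and dispatch are names made up for this sketch.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Plain-Java simulation (no broker involved): one queue, several competing consumers.
    final class RoundRobinDemo {

        // Hands message i to consumer (i % consumerCount) and returns each consumer's share.
        static List<List<String>> dispatch(List<String> messages, int consumerCount) {
            List<List<String>> received = new ArrayList<>();
            for (int c = 0; c < consumerCount; c++) {
                received.add(new ArrayList<>());
            }
            for (int i = 0; i < messages.size(); i++) {
                received.get(i % consumerCount).add(messages.get(i));
            }
            return received;
        }

        public static void main(String[] args) {
            // Same payloads as the Producer test: "Hello 1" .. "Hello 10".
            List<String> messages = new ArrayList<>();
            for (int i = 1; i <= 10; i++) {
                messages.add("Hello " + i);
            }
            List<List<String>> received = dispatch(messages, 2);
            System.out.println("receive1: " + received.get(0));
            System.out.println("receive2: " + received.get(1));
        }
    }
    ```

    With 10 messages and 2 consumers, each list ends up with 5 entries, which matches the run above.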

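    2. Publish/Subscribe

    There are four exchange types: fanout, direct, topic, and headers. The following sections show how each one is used and how they differ.

    2.1 fanout (broadcast)

    P (the producer) sends messages to X (the exchange), and X delivers a copy to every queue bound to it.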


    RabbitFanoutConfiguration
    We declare the queues as AnonymousQueue. Each one creates a non-durable, exclusive, auto-delete queue with a generated name.

    package com.student.rabbit.fanout;
    
    import org.springframework.amqp.core.AnonymousQueue;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @Configuration
    public class RabbitFanoutConfiguration {
        @Bean
        public FanoutExchange fanout() {
            return new FanoutExchange("sys.fanout");
        }
        private static class ReceiverConfig {
            @Bean
            public Queue fanoutQueue1() {
                return new AnonymousQueue();
            }
            @Bean
            public Queue fanoutQueue2() {
                return new AnonymousQueue();
            }
            @Bean
            public Binding bindingFanout1(FanoutExchange fanout,Queue fanoutQueue1) {
                return BindingBuilder.bind(fanoutQueue1).to(fanout);
            }
            @Bean
            public Binding bindingFanout2(FanoutExchange fanout,Queue fanoutQueue2) {
                return BindingBuilder.bind(fanoutQueue2).to(fanout);
            }
        }
    }
    
    
    

    FanoutProducer

    package rabbit.fanout;
    
    import com.student.SpringbootStart;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = SpringbootStart.class)
    public class FanoutProducer {
        @Autowired
        private RabbitTemplate template;
        @Autowired
        private FanoutExchange fanout;
        @Test
        public void send() {
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < 10; i++) {
                StringBuilder builder = new StringBuilder("Hello");
                builder.append(" "+count.incrementAndGet());
                String message = builder.toString();
                template.convertAndSend(fanout.getName(), "", message);
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
    
    

    FanoutConsumer

    package com.student.rabbit.fanout;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @Component
    public class FanoutConsumer {
        private static final Logger log = LoggerFactory.getLogger(FanoutConsumer.class);
        @RabbitListener(queues = "#{fanoutQueue1.name}")
        public void receive1(String message){
            log.debug("receive1:"+message);
        }
        @RabbitListener(queues = "#{fanoutQueue2.name}")
        public void receive2(String message){
            log.debug("receive2:"+message);
        }
    }
    
    

    10 messages were sent in total, and each queue received all 10.

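    2.2 direct (routing)

    A direct exchange delivers each message to the queues whose binding key exactly matches the message's routing key. This is flexible: a consumer subscribes only to the kinds of messages it needs.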

    RabbitDirectConfiguration

    package com.student.rabbit.direct;
    
    import org.springframework.amqp.core.AnonymousQueue;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @Configuration
    public class RabbitDirectConfiguration {
        @Bean
        public DirectExchange direct() {
            return new DirectExchange("sys.direct");
        }
    
        private static class ReceiverConfig {
            @Bean
            public Queue directQueue1() {
                return new AnonymousQueue();
            }
            @Bean
            public Queue directQueue2() {
                return new AnonymousQueue();
            }
            @Bean
            public Binding bindingDirect1a(DirectExchange direct,Queue directQueue1) {
                return BindingBuilder.bind(directQueue1).to(direct).with("orange");
            }
            @Bean
            public Binding bindingDirect1b(DirectExchange direct,Queue directQueue1) {
                return BindingBuilder.bind(directQueue1).to(direct).with("black");
            }
            @Bean
            public Binding bindingDirect2a(DirectExchange direct,Queue directQueue2) {
                return BindingBuilder.bind(directQueue2).to(direct).with("green");
            }
            @Bean
            public Binding bindingDirect2b(DirectExchange direct,Queue directQueue2) {
                return BindingBuilder.bind(directQueue2).to(direct).with("black");
            }
        }
    }
    
    
    

    DirectProducer

    package rabbit.direct;
    
    import com.student.SpringbootStart;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = SpringbootStart.class)
    public class DirectProducer {
        @Autowired
        private RabbitTemplate template;
        @Autowired
        private DirectExchange direct;
        private final String[] keys = {"orange", "black", "green"};
        @Test
        public void send() {
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < keys.length; i++) {
                StringBuilder builder = new StringBuilder("Hello to ");
                String key = keys[count.getAndIncrement()];
                builder.append(key); // "Hello to " already ends with a space
                String message = builder.toString();
                template.convertAndSend(direct.getName(), key, message);
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
    
    

    DirectConsumer

    package com.student.rabbit.direct;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @Component
    public class DirectConsumer {
        private static final Logger log = LoggerFactory.getLogger(DirectConsumer.class);
        @RabbitListener(queues = "#{directQueue1.name}")
        public void receive1(String message){
            log.debug("receive1:"+message);
        }
        @RabbitListener(queues = "#{directQueue2.name}")
        public void receive2(String message){
            log.debug("receive2:"+message);
        }
    }
    
    

    3 messages were sent. Both queues are bound with black, so the black message is consumed twice, once from each queue.
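
    To make the routing rule concrete, here is a small plain-Java simulation of the binding table from RabbitDirectConfiguration, with no broker involved. It assumes that a direct exchange hands a copy of the message to every queue that has a binding key equal to the routing key. DirectRoutingDemo, bindings and route are hypothetical names used only in this sketch.

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Plain-Java simulation of direct-exchange routing (no broker involved).
    final class DirectRoutingDemo {

        // Binding table: queue name -> binding keys, mirroring RabbitDirectConfiguration.
        static Map<String, List<String>> bindings() {
            Map<String, List<String>> bindings = new LinkedHashMap<>();
            bindings.put("directQueue1", Arrays.asList("orange", "black"));
            bindings.put("directQueue2", Arrays.asList("green", "black"));
            return bindings;
        }

        // Returns every queue that has a binding key equal to the routing key.
        static List<String> route(Map<String, List<String>> bindings, String routingKey) {
            List<String> targets = new ArrayList<>();
            for (Map.Entry<String, List<String>> entry : bindings.entrySet()) {
                if (entry.getValue().contains(routingKey)) {
                    targets.add(entry.getKey());
                }
            }
            return targets;
        }

        public static void main(String[] args) {
            // Same routing keys as DirectProducer.
            for (String key : new String[] {"orange", "black", "green"}) {
                System.out.println(key + " -> " + route(bindings(), key));
            }
        }
    }
    ```

    In this model orange reaches only directQueue1, green reaches only directQueue2, and black reaches both. A key that no queue is bound with yields an empty list.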

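    2.3 Topics

    Topic mode builds on routing by adding wildcard matching on the routingKey, which is treated as a list of words separated by dots.
    * (star) matches exactly one word.
    # (hash) matches zero or more words.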

    RabbitTopicConfiguration

    package com.student.rabbit.topic;
    
    import org.springframework.amqp.core.AnonymousQueue;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @Configuration
    public class RabbitTopicConfiguration {
        @Bean
        public TopicExchange topic() {
            return new TopicExchange("sys.topic");
        }
    
        private static class ReceiverConfig {
            @Bean
            public Queue topicQueue1() {
                return new AnonymousQueue();
            }
            @Bean
            public Queue topicQueue2() {
                return new AnonymousQueue();
            }
            @Bean
            public Binding bindingTopic1a(TopicExchange topic,Queue topicQueue1) {
                return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
            }
            @Bean
            public Binding bindingTopic1b(TopicExchange topic,Queue topicQueue1) {
                return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");
            }
            @Bean
            public Binding bindingTopic2a(TopicExchange topic,Queue topicQueue2) {
                return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
            }
            @Bean
            public Binding bindingTopic2b(TopicExchange topic,Queue topicQueue2) {
                return BindingBuilder.bind(topicQueue2).to(topic).with("quick.brown.*");
            }
        }
    }
    
    
    

    TopicProducer

    package rabbit.topic;
    
    import com.student.SpringbootStart;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = SpringbootStart.class)
    public class TopicProducer {
        @Autowired
        private RabbitTemplate template;
        @Autowired
        private TopicExchange topic;
        private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
                "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
        @Test
        public void send() {
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < keys.length; i++) {
                StringBuilder builder = new StringBuilder("Hello to ");
                String key = keys[count.getAndIncrement()];
                builder.append(key); // "Hello to " already ends with a space
                String message = builder.toString();
                template.convertAndSend(topic.getName(), key, message);
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
    
    

    TopicConsumer

    package com.student.rabbit.topic;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @Component
    public class TopicConsumer {
        private static final Logger log = LoggerFactory.getLogger(TopicConsumer.class);
        @RabbitListener(queues = "#{topicQueue1.name}")
        public void receive1(String message){
            log.debug("receive1:"+message);
        }
        @RabbitListener(queues = "#{topicQueue2.name}")
        public void receive2(String message){
            log.debug("receive2:"+message);
        }
    }
    
    

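    Queue 1 receives the messages whose middle word is orange or whose last word is rabbit. Queue 2 receives the messages whose key starts with lazy or with quick.brown.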
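
    The wildcard rules can be checked offline with a small matcher. What follows is a minimal plain-Java sketch of the matching rules, not the broker's actual implementation, and TopicMatcher with its methods is a hypothetical name. It splits the routing key into dot-separated words, lets * stand for exactly one word and # for zero or more words.

    ```java
    // Minimal topic-pattern matcher (illustrative only, not RabbitMQ's implementation).
    final class TopicMatcher {

        // True if the dot-separated routing key matches the binding pattern.
        static boolean matches(String pattern, String routingKey) {
            return matches(words(pattern), 0, words(routingKey), 0);
        }

        // True if at least one of the patterns matches the routing key.
        static boolean anyMatch(String[] patterns, String routingKey) {
            for (String pattern : patterns) {
                if (matches(pattern, routingKey)) {
                    return true;
                }
            }
            return false;
        }

        private static String[] words(String s) {
            return s.isEmpty() ? new String[0] : s.split("\\.");
        }

        private static boolean matches(String[] p, int pi, String[] k, int ki) {
            if (pi == p.length) {
                return ki == k.length; // pattern used up: the key must be used up too
            }
            if (p[pi].equals("#")) {
                // '#' absorbs zero or more words: try every possible continuation point.
                for (int next = ki; next <= k.length; next++) {
                    if (matches(p, pi + 1, k, next)) {
                        return true;
                    }
                }
                return false;
            }
            if (ki == k.length) {
                return false; // pattern still needs a word but the key has none left
            }
            // '*' accepts any single word; a literal must equal the key word.
            boolean wordOk = p[pi].equals("*") || p[pi].equals(k[ki]);
            return wordOk && matches(p, pi + 1, k, ki + 1);
        }

        public static void main(String[] args) {
            // Binding patterns from RabbitTopicConfiguration, keys from TopicProducer.
            String[] queue1 = {"*.orange.*", "*.*.rabbit"};
            String[] queue2 = {"lazy.#", "quick.brown.*"};
            String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
                    "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
            for (String key : keys) {
                System.out.println(key + " -> queue1=" + anyMatch(queue1, key)
                        + ", queue2=" + anyMatch(queue2, key));
            }
        }
    }
    ```

    For the six keys sent by TopicProducer, this model routes quick.orange.rabbit, lazy.orange.elephant, quick.orange.fox and lazy.pink.rabbit to queue 1, and lazy.orange.elephant, lazy.brown.fox, lazy.pink.rabbit and quick.brown.fox to queue 2. A queue gets a message only once even when several of its bindings match, as with quick.orange.rabbit on queue 1.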

    2.4 Headers

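    I could not find official documentation for the headers mode, but the classes are still in the library, so it is covered here as well. A headers exchange routes on message header values instead of the routing key.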

    RabbitHeadersConfiguration

    package com.student.rabbit.headers;
    
    import org.springframework.amqp.core.AnonymousQueue;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.HeadersExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @Configuration
    public class RabbitHeadersConfiguration {
        @Bean
        public HeadersExchange headers() {
            return new HeadersExchange("sys.headers");
        }
    
        private static class ReceiverConfig {
            @Bean
            public Queue headersQueue1() {
                return new AnonymousQueue();
            }
            @Bean
            public Queue headersQueue2() {
                return new AnonymousQueue();
            }
            @Bean
            public Queue headersQueue3() {
                return new AnonymousQueue();
            }
            @Bean
            public Binding bindingHeaders1(HeadersExchange headers,Queue headersQueue1) {
                Map<String,Object> headerValue=new HashMap<>();
                headerValue.put("user","sys");
                return BindingBuilder.bind(headersQueue1).to(headers).whereAll(headerValue).match();
            }
            @Bean
            public Binding bindingHeaders2(HeadersExchange headers,Queue headersQueue2) {
                Map<String,Object> headerValue=new HashMap<>();
                headerValue.put("user","admin");
                return BindingBuilder.bind(headersQueue2).to(headers).whereAll(headerValue).match();
            }
            @Bean
            public Binding bindingHeaders3(HeadersExchange headers,Queue headersQueue3) {
                return BindingBuilder.bind(headersQueue3).to(headers).where("user").exists();
            }
        }
    }
    
    

    HeadersProducer

    package rabbit.headers;
    
    import com.student.SpringbootStart;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.HeadersExchange;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = SpringbootStart.class)
    public class HeadersProducer {
        @Autowired
        private RabbitTemplate template;
        @Autowired
        private HeadersExchange headers;
        private final String[] keys = {"sys", "admin"};
        @Test
        public void send() {
            AtomicInteger count = new AtomicInteger(0);
            for (int i = 0; i < keys.length; i++) {
                StringBuilder builder = new StringBuilder("Hello to ");
                String key = keys[count.getAndIncrement()];
                builder.append(key); // "Hello to " already ends with a space
                MessageProperties messageProperties=new MessageProperties();
                messageProperties.setHeader("user",key);
                Message message = MessageBuilder.withBody(builder.toString().getBytes()).andProperties(messageProperties).build();
                template.send(headers.getName(), "", message);
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
    
    

    HeadersConsumer

    package com.student.rabbit.headers;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Create by zjg on 2024/3/10
     */
    @Component
    public class HeadersConsumer {
        private static final Logger log = LoggerFactory.getLogger(HeadersConsumer.class);
        @RabbitListener(queues = "#{headersQueue1.name}")
        public void receive1(Message message){
            log.debug("receive1:"+new String(message.getBody()));
        }
        @RabbitListener(queues = "#{headersQueue2.name}")
        public void receive2(Message message){
            log.debug("receive2:"+new String(message.getBody()));
        }
        @RabbitListener(queues = "#{headersQueue3.name}")
        public void receive3(Message message){
            log.debug("receive3:"+new String(message.getBody()));
        }
    }
    
    

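    The first queue receives the messages with user=sys, the second queue receives the messages with user=admin, and the third queue receives any message that carries a user header.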
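
    The three bindings can be modeled in a few lines of plain Java, again without a broker. This sketch assumes "all" semantics, where every required header must match. Representing where("user").exists() as a required header with a null value is a choice made for this sketch, and HeadersMatchDemo, header and matchesAll are hypothetical names.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Plain-Java simulation of headers-exchange matching with "all" semantics (no broker involved).
    final class HeadersMatchDemo {

        // Builds a one-entry header map; HashMap is used because it allows a null value.
        static Map<String, Object> header(String name, Object value) {
            Map<String, Object> map = new HashMap<>();
            map.put(name, value);
            return map;
        }

        // Every required header must be present in the message.
        // A null required value checks presence only (the sketch's stand-in for exists()).
        static boolean matchesAll(Map<String, Object> required, Map<String, Object> messageHeaders) {
            for (Map.Entry<String, Object> entry : required.entrySet()) {
                if (!messageHeaders.containsKey(entry.getKey())) {
                    return false;
                }
                Object expected = entry.getValue();
                if (expected != null && !expected.equals(messageHeaders.get(entry.getKey()))) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            Map<String, Object> queue1 = header("user", "sys");   // bindingHeaders1
            Map<String, Object> queue2 = header("user", "admin"); // bindingHeaders2
            Map<String, Object> queue3 = header("user", null);    // bindingHeaders3: presence only
            for (String user : new String[] {"sys", "admin"}) {
                Map<String, Object> message = header("user", user);
                System.out.println("user=" + user
                        + " -> queue1=" + matchesAll(queue1, message)
                        + ", queue2=" + matchesAll(queue2, message)
                        + ", queue3=" + matchesAll(queue3, message));
            }
        }
    }
    ```

    In this model the sys message matches queue 1 and queue 3, and the admin message matches queue 2 and queue 3. A message without a user header matches none of them.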


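    Summary

    See here for installation
    Official documentation
    Official website
    For other projects, refer to the official examples
    The road ahead is long and far; I will search high and low.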

  • Original article: https://blog.csdn.net/qq_44824164/article/details/136591641