• SpringCloud(九)——RabbitMQ简单了解


    1. 同步通讯与异步通讯

    1. 同步通讯
      同步通讯就像打电话,小明和小王正在连线,那么小李打进来肯定是打不通的,需要小明打完电话,小李才能进行连线。

      那么在我们最初写代码时也是这个道理,当我们用户发起一个请求时,请求选择需要的服务,需要的服务再去调其他服务,当所有流程都做完之后返回结果给用户。

      同步通讯的优点是时效性强,可以立即得到结果,缺点如下

      • 耦合度高
      • 性能和吞吐能力下降
      • 有额外的资源消耗
      • 有级联失败问题
    2. 异步通讯
      相对于同步通讯,异步通讯像是发消息,小王在与小明聊天时也可以接收小李的信息,回完小明的消息就可以回小李的消息。

      微服务中可以对用户购买商品做一些处理来提高效率,我们可以下完订单就返回购买成功,剩下的事情交给后台来做。

      异步通讯的大致流程如下:
      在这里插入图片描述
      一个服务只需要将消息传递给Broker即可,之后Broker再调用这些事件,而由于服务仅调取了Broker以及等待Broker的响应,所以耗时极短,且该用时不会随着服务的功能增加而增加。

      异步通讯的优点如下:

      • 耦合度低
      • 吞吐量提升
      • 故障隔离
      • 流量削峰

      异步通讯的缺点如下:

      • 依赖于Broker的可靠性、安全性、吞吐能力
      • 架构复杂了,业务没有明显的流程线,不好追踪管理

    2. MQ 介绍

    MQ(Message Queue)即消息队列,就是存放消息的队列,也就是上面的事件驱动架构的Broker。

    RabbitMQActiveMQRocketMQKafka
    公司/社区RabbitApache阿里Apache
    开发语言ErlangJavaJavaScala&Java
    协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
    可用性一般
    单机吞吐量一般非常高
    消息延迟微秒级毫秒级毫秒级毫秒以内
    消息可靠性一般一般

    接下来我们介绍的就是上面的RabbitMQ。

    3. RabbitMQ运行

    有了Docker后,RabbitMQ的安装十分简单,只需要在Docker镜像仓库中pull一下即可,安装命令如下:

    docker pull rabbitmq:3-management 
    
    • 1

    待安装完后,使用 docker images 命令可以看到RabbitMQ镜像已经安装完成。

    需要运行RabbitMQ时,执行的命令如下:

    docker run \
     -e RABBITMQ_DEFAULT_USER=suppose \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     --name mq \
     --hostname mq1 \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3-management
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    上述命令中的 -e 是指环境变量,这里配置的是用户名以及密码; --name 是为容器起别名, --hostname 是为主机起别名, -p 15672:15672 是RabbitMQ提供的UI界面的端口,-p 5672:5672 是指RabbitMQ提供的消息通信的端口,也就是说发消息收消息都需要通过这个端口;-d 指在后台运行。

    运行上面的代码后,输入虚拟机的IP地址,并访问 15672 这个端口号,再输入上面设置的账号和密码进行登录,在这一步,我的使用谷歌浏览器一直访问不了,一直报 undefined: There is no template at js/tmpl/login.ejs undefined 的错误,这个时候,我换了Edge浏览器就好了,能够重新登陆。

    RabbitMQ的界面如下:
    在这里插入图片描述
    可以看到,上面有六个切换栏,其每个状态栏包含的信息如下:

    • Overview:总览,包含有所有结点和集群的信息;
    • Connection:连接,消息发布者和被通知者都应该与RabbitMQ先进行连接;
    • Channels:通道,建立连接后必须通过通道进行消息的发送;
    • Exchanges:交换机,消息的路由器,用来转发消息到不同的队列;
    • Queue:队列,用来做消息的存储;
    • Admin:管理。可以管理用户信息,每个用户都可以建立一个虚拟主机,用来隔离每个用户的访问。

    4. RabbitMQ 模型

    打开RabbitMQ的官方网站的入门页面,可以发现RabbitMQ所提供的的消息模型以及介绍,RabbitMQ中与消息有关的模型有五种。

    4.1 五种模型简介

    • 基本消息队列

      第一种消息模型名为 “Hello World!”,该消息模型的流程示意图如下所示,
      在这里插入图片描述
      在该情况下,只有一个接受者,使用异步通讯,但是没有使用到交换机。

    • 工作消息队列

      该队列名为 Work Queues,该模型的流程示意图如下:
      在这里插入图片描述
      在该情况下,可以有多个接受者进行异步通讯,但是同样也没有交换机。

    • 发布订阅

      该模式下的消息模型已经拥有了交换机,根据交换机类型的不同,分为三种类型。

      1. Fanout Exchange:广播

        模型示意图如下:
        在这里插入图片描述

      2. Direct Exchange:路由

        模型示意图如下:
        在这里插入图片描述

      3. Topic Exchange:主题

        模型示意图如下:
        在这里插入图片描述

    4.2 实现基本消息队列

    接下来我们以第一种消息模型 “Hello World!” 为例,实现消息的发布与订阅。

    4.2.1 消息发布者

    首先是消息的发布者,对消息的发布者而言,其需要知道主机名、端口号、虚拟主机等信息,然后创建通道,创建队列,将消息发送到队列中去,基本代码如下:

    public class PublisherTest {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数
            // 设置主机名
            factory.setHost("192.168.59.233");
            // 设置端口号,为队列通信的端口号,不是可视化界面的端口号
            factory.setPort(5672);
            // 设置RabbitMQ的虚拟主机
            factory.setVirtualHost("/");
            // 设置用户名
            factory.setUsername("suppose");
            // 设置密码
            factory.setPassword("123456");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
    
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4.发送消息
            String message = "hello, rabbitmq!";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("发送消息成功:【" + message + "】");
    
            // 5.关闭通道和连接
            channel.close();
            connection.close();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    可以看到,当代码执行完后程序就结束了,并不会管是否有接收者需要接收消息或者接受者是否接收到消息,这表明该消息是异步通讯。
    打开RabbitMQ的可视化界面,可以在队列中的 Get Message 看到发送到队列中的消息,如下所示。

    在这里插入图片描述

    4.2.2 消息订阅者

    消息发送到队列后,就需要有订阅者来接收队列中的消息了,接收的代码如下:

    public class ConsumerTest {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.59.233");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("suppose");
            factory.setPassword("123456");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
    
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4.订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5.处理消息
                    String message = new String(body);
                    System.out.println("接收到消息:【" + message + "】");
                }
            });
            System.out.println("等待接收消息。。。。");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    在这里,消息的接收的流程与发送流程相似,至于为何还需要创建队列,那是因为发布者与接受者都是异步创建的,可能会发生接受者先执行而发布者后执行的情况,如果出现这种情况那么队列名就不一定会存在,会产生报错,因此我们先创建一个队列名,以防其报错。

    而且,当订阅者取走了消息后,可以发现消息队列变为了空队列,这表明队列中的消息只能使用一次。

    5. SpringAMQP

    经过上面的代码,我们发现,不管消息需要发送还是接收,都比较麻烦,步骤十分多,那么,有没有可以简单快速的发送或接收消息的方式呢?这就需要聊到AMQP了。

    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

    Spring AMQP是基于ANQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

    如果要在Spring中使用AMQP,那么实现需要导入AMQP的依赖,导入依赖如下:

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    5.1 基本队列

    在SprintAMQP下如果需要使用上一节的基本队列,那么就变得十分简单了,使用测试运行发送消息的代码如下所示:

    @RunWith(SpringRunner.class)//使得自动注入生效
    @SpringBootTest //SpringBoot 测试
    public class SpringAMQPTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSendMessage(){
            String queneName = "simple.queue";
            String message = "Hello everyone, there is SpringAMQP!";
            rabbitTemplate.convertAndSend(queneName, message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    注意:该方法只能使用已经创建好的队列名,如果队列名不存在,那么运行了之后也啥事没有,当然,也不会报错。

    至于消费消息方面,SpringAMQP提供了RabbitMQ的监听器,用来监听队列,仅需要加上注解 @RabbitListener(queues = "simple.queue") 并指明需要监听的队列即可对队列进行监听,一般将其写作一个Bean,这样,当队列中有消息时,其就会将队列中的消息传导该Bean中进行消费,代码如下:

    @Component
    public class SpringAMQPListener {
    
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueue(String msg){
            System.out.println("消费者收到的simple.queue队列的消息为:【" + msg + "】");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    5.2 工作队列

    工作队列示意图如下:
    在这里插入图片描述
    较之基本队列,工作队列可以有多个消费者,这主要是用来解决队列产生速度与消费者消费速度不匹配的情况,因为如果消费的速度赶不上队列堆积的速度,那么队列迟早会被堆满,因此,可以使用多个消费者来加快队列的消耗速度。

    下面我们来举一个例子,设每秒钟产生50个消息放入队列中,消费者1每秒钟消费50个消息,消费者2每秒钟消费5个消息,那么,如果使用这两个消费者来消费产生的队列会是什么情况呢?

    如果直接这样设定两个消费者的话,会是两个消费者一人一半的消息,因为AMQP中有预取的操作,正常是将到来的消息全部预取出来并逐个消息进行分配,这里如果想实现上述消费者1与消费者2异步执行,那么我们可以设置消息的预取数量为1,以此避免消息预取,设置如下:

    spring:
      rabbitmq:
        listener:
          simple:
            prefetch: 1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    生成50条消息的代码如下:

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAMQPTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSendMessage(){
            String queneName = "simple.queue";
            String message = "there is SpringAMQP---";
            for (int i = 0; i < 50; i++) {
                rabbitTemplate.convertAndSend(queneName, message + i);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    两个消费者消费的代码如下:

    @Component
    public class SpringAMQPListener {
    
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueue1(String msg) throws InterruptedException {
            System.out.println("消费者收到的simple.queue队列的消息为:【" + msg + "】" + LocalTime.now());
            Thread.sleep(20);
        }
    
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueue2(String msg) throws InterruptedException {
        	// 以示区分
            System.err.println("消费者收到的simple.queue队列的消息为:【" + msg + "】" + LocalTime.now());
            Thread.sleep(200);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    以上两种方式在消费者收到消息的时候,消息就消失了,即一个接受者只能接收到队列中的一条消息,接下来我们将要讲解的是发布订阅模式,该模式下增加了一种新的结构,交换机,同时,该模式允许将一条消息传递给多个用户,需要注意的是,交换机仅负责消息路由,不负责消息存储,路由失败那么消息就丢失。

    5.3 广播

    首先是广播,广播模式的交换机又称为 Fanout Exchange ,该模式下的交换机会将所有的消息都发给每一个队列,当然,前提是队列已经绑定在了交换机上,2接下来我们实现一个如下结构的模型,
    在这里插入图片描述
    想要实现上面的结构,首先我们需要先new一个交换机以及两个队列并且将两个队列绑定在交换机上,代码如下:

    @Configuration
    public class FanoutConfig {
    
        // 新增交换机
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("suppose.fanout");
        }
    
        // 新增一个队列1
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanout.queue1");
        }
    
        // 绑定队列1到交换机上
        @Bean
        public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
            return BindingBuilder
                    .bind(fanoutQueue1)
                    .to(fanoutExchange);
        }
    
        // 新增一个队列2
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanout.queue2");
        }
    
        // 绑定队列2到交换机上
        @Bean
        public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
            return BindingBuilder
                    .bind(fanoutQueue2)
                    .to(fanoutExchange);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    执行上面的代码后,我们打开可视化界面的交换机一栏,发现我们创建的交换机已经存在了。

    点开交换机,发现交换机里面绑定有两个队列,结构与我们所写的一致。
    在这里插入图片描述
    之后就是消费者端消费消息了,消费消息依旧是将队列监听器绑定到对应的方法上面,代码如下:

    @Component
    public class SpringAMQPListener {
    
        @RabbitListener(queues = "fanout.queue1")
        public void listenSimpleQueue1(String msg) throws InterruptedException {
            System.out.println("消费者1收到的simple.queue队列的消息为:【" + msg + "】");
        }
    
        @RabbitListener(queues = "fanout.queue2")
        public void listenSimpleQueue2(String msg) throws InterruptedException {
            System.err.println("消费者2收到的simple.queue队列的消息为:【" + msg + "】");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    最后,就是发送消息了,发送消息的代码如下:

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAMQPTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSendFanoutExchange(){
            // 交换机名称
            String exchangeName = "suppose.fanout";
            // 消息
            String message = "hello, suppose";
            // 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    最后,可以发现两个消费者都接收到了发送的消息。
    在这里插入图片描述

    5.4 路由

    路由模式的交换机又称为 Direct Exchange ,该模式下的交换机能够将接收到的消息按照指定规则路由到指定Queue,依据是每个Queue在与交换机绑定时会指定一个 BindKey,发送者发送消息时,指定消息的RoutingKey,于是交换机就会将消息路由到RoutingKey与BindKey一致的队列中,当然,也可以对多个Queue绑定一个BindKey,这样的话,相同的BindKey相当于就使用的是广播模式。明显可以看出来,该方式比广播模式更加的灵活。

    接下来我们来实现下面一个结构的模型。
    在这里插入图片描述
    在这里,我们使用更加简单的注解方式来对交换机与队列进行new和绑定,代码如下:

    @Component
    public class SpringAMQPListener {
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "direct.queue1"),
                exchange = @Exchange(name = "suppose.direct", type = "direct"),//默认模式就是direct
                key = {"red", "blue"}
        ))
        public void listenDirectQueue1(String msg){
            System.out.println("消费者1收到的direct.queue队列的消息为:【" + msg + "】");
        }
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "direct.queue2"),
                exchange = @Exchange(name = "suppose.direct", type = "direct"),
                key = {"red", "yellow"}
        ))
        public void listenDirectQueue2(String msg){
            System.out.println("消费者2收到的direct.queue队列的消息为:【" + msg + "】");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    之后就是发送消息了,发送消息时需要指定RoutingKey,代码如下:

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAMQPTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSendDirectExchange(){
            // 交换机名称
            String exchangeName = "suppose.direct";
            // 消息
            String message = "hello, blue";
            // 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "blue", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这样,只有BindingKey为 blue 的队列能够接收到该消息。

    5.5 主题

    路由模式的交换机又称为 Topic Exchange ,该模式下的交换机其实与 Direct模式相差不大,也是需要RoutingKey,但是,该模式的RoutingKey必须是多个单词的列表,并且以 . 分割,Queue与交换机指定BindingKey时可以使用通配符,通配符 * 代表一个单词,通配符 # 代表0个或多个单词。

    接下来我们来实现下面的案例模型,

    在这里插入图片描述
    首先依旧是使用注解来创建交换机与队列并进行绑定,只需要更改队列名、交换机名以及交换机类型即可,代码如下:

    @Component
    public class SpringAMQPListener {
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "topic.queue1"),
                exchange = @Exchange(name = "suppose.topic", type = "topic"),
                key = "china.#"
        ))
        public void listenTopicQueue1(String msg){
            System.out.println("消费者1收到的topic.queue队列的消息为:【" + msg + "】");
        }
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "topic.queue2"),
                exchange = @Exchange(name = "suppose.topic", type = "topic"),
                key = "#.news"
        ))
        public void listenTopicQueue2(String msg){
            System.out.println("消费者2收到的topic.queue队列的消息为:【" + msg + "】");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    接收消息的部分与上面的路由模式基本一致,

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAMQPTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSendTopicExchange(){
            // 交换机名称
            String exchangeName = "suppose.topic";
            // 消息
            String message = "hello, china.feature";
            // 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "china.feature", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    6. 消息转换器

    在发消息时,如果我们需要发送像Map类型 ,List类型的消息怎么办呢?这里其实是可以发送过去的,因为查看 rabbitTemplate.convertAndSend 的源码可以知道,消息的类型其实定义的是 Object 类型。
    在这里插入图片描述
    这样的话其实就相当于将消息序列化后再传到队列中,然后接收时在反序列出来。这样的话在接收时只要也定义相同的类型也能接收,但是,在RabbitMQ的可视化界面中展示消息 时,展示的就是序列化后的结果,就没有可读性,如下。
    在这里插入图片描述
    于是,我们下面使用JSON的MessageConvert来覆盖默认的JDK中的序列化,使得在RabbitMQ的可视化界面中也能出现我们传递的消息。
    首先,引入如下依赖,

    <dependency>
        <groupId>com.fasterxml.jackson.dataformatgroupId>
        <artifactId>jackson-dataformat-xmlartifactId>
        <version>2.9.10version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    然后创建一个Bean,如下:

        @Bean
        public Jackson2JsonMessageConverter jsonMessageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    
    • 1
    • 2
    • 3
    • 4

    其余代码不变,将消息放送后,在RabbitMQ中可以看到,消息的显示已经是原本的消息了,并非序列化后的消息,
    在这里插入图片描述

    最后,注意,接收方与发送方的消息转换器必须是同一个,也就是说,在发送方修改了消息转换器后,那么接收方也需要导入依赖并创建如上的Bean,最后再将其进行接收,注意,接收的类型与发送的类型要一致:

    	@RabbitListener(bindings = @QueueBinding(
    	        value = @Queue(name = "topic.queue2"),
    	        exchange = @Exchange(name = "suppose.topic", type = "topic"),
    	        key = "#.news"
    	))
    	public void listenTopicQueue2(Map<String, Object> msg){
    	    System.out.println("消费者2收到的topic.queue队列的消息为:【" + msg + "】");
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 相关阅读:
    cadence SPB17.4 S032 - Update Symbols失败的问题
    2023.11.13 hive数据仓库之分区表与分桶表操作,与复杂类型的运用
    react setState异步操作数据的问题
    商品分类,手机云进销存ERP门店零售批发仓库开单APP软件,文具五金服装鞋帽酒店烟酒饰品批发条码管理
    R语言forcats包处理因子
    前端学习 Nginx
    Vue3学习笔记:ref函数、reactive函数等常用Composition API、生命周期,Fragment、teleport等新的组件用法记录
    【算法分析与设计】动态规划(下)
    MySQL中的InnoDB引擎
    Java复习第二弹!
  • 原文地址:https://blog.csdn.net/ifhuke/article/details/130288479