• 谈谈RabbitMQ的五种消息模型以及SpringAMQP的使用


    一、前言

    1. RabbitMQ中的基本概念

    • message

    消息由消息头与消息体组成。消息体不透明,而消息头由其他可选属性组成,包括:RoutingKey(路由键)、Priority(相对于其他消息的优先权)等。

    • publisher

    消息的生产者,向队列或交换机发送消息。

    • consumer

    消息的消费者,从消息队列中获取消息。

    • exchange

    交换机,发布订阅模式中用来接受生产者发送的消息,并负责消息路由给队列。

    • queue

    消息队列,用来存储消息直到发送给消费者。

    • virtualHost

    虚拟主机,用以隔离不同租户的exchange、queue、消息的隔离。

    • connection

    网络连接。

    • channel

    信道,他是建立在TCP连接中的虚拟连接,AMQP的发布消息、订阅队列。接受消息均是通过信道完成。他们可以通过TCP连接完成,但是建立与销毁TCP的开销对于系统来说十分大,因此引入了信道,以复用TCP连接。

    • binding

    绑定,用于关联交换机与消息队列。

    • broker

    消息队列服务器实体,它是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。

    2. docker部署RabbitMQ

    • 拉取镜像
    docker pull rabbitmq:3-management
    
    • 1
    • 运行容器
    docker run \
     -e RABBITMQ_DEFAULT_USER=tyt\
     -e RABBITMQ_DEFAULT_PASS=123456 \
     --name mq \
     --hostname mq1 \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3-management
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3. AMQP与JMS的简单介绍

    MQ是一种消息通信的模型,但并不是具体的实现。现在实现MQ的有两种主流方式:AMQPJMS
    在这里插入图片描述

    在这里插入图片描述

    两者对比:

    • JMS定义了统一的接口,对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
    • JMS限定了只能使用Java;AMQP不限制语言
    • JMS只规定了两种消息模型;AMQP的消息模型有多种

    在本文中要使用的SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

    SpringAmqp的官方地址

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jBGx5FYz-1668677222333)(C:\Users\tangyitao\AppData\Roaming\Typora\typora-user-images\image-20221117141517452.png)]

    4. 演示demo搭建结构

    本文中对于各种模型均给出了示例代码,在此将整个工程结构给出。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-21DHFXDo-1668677222334)(C:\Users\tangyitao\AppData\Roaming\Typora\typora-user-images\image-20221117141405201.png)]

    • 首先,我们在父工程的pom.xml中导入SpringAMQP的依赖
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 然后分别在 publisher 与 consumer 模块的 application.yml 添加配置信息
    spring:
      rabbitmq:
        # rabbitMQ的ip地址
        host: localhost
        # 端口
        port: 5672 
        # 用户名与密码
        username: root
        password: 123456
        # 要操作的虚拟主机
        virtual-host: /
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    二、Basic Queue

    这是RabbitMQ的基本模型,也是其他几个模型的雏形,最简单的消息模式。
    在这里插入图片描述

    最基础的消息队列模型只包括三个角色:

    • publisher:生产者、消息发布者,将消息发送到队列queue

    • consumer:消费者,订阅队列,处理队列中的消息

    • queue:消息队列,负责接受并缓存消息。队列只受主机的内存与磁盘限制,是一个大的消息缓冲区。许多生产者都可以发送消息到一个队列,许多消费者也可以从一个队列接受消息。

    代码示例

    • 消息发送

    在 publisher 模块中利用RabbitTemplate实现消息发送

    @SpringBootTest
    public class SpringAmqpTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSendMessage2HelloQueue() {
            // 指定队列名称
            String queueName = "hello.queue";
            // 要发送的信息
            String message = "hello, spring amqp!";
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 消息接受

    在consumer模块新建一个监听器类,用于监听RabbitMQ中的消息,代码如下

    @Component
    public class SpringRabbitListener {
    
        // 指定需要监听的 队列名称
         @RabbitListener(queues = "hello.queue")
         public void listenHelloQueue(String message) {
             System.err.println("消费者接收到hello.queue的消息:【" + message + "】");
         }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    三、Work Queue

    工作模型,比上述提到的Basic Queue模型相对复杂一丢丢。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

    注意:

    一个消息只能被一个消费者获取

    在这里插入图片描述

    工作模型相比于基本模型就是多了消费者的数量,那么,我们何时可以选择使用此模型呢?

    当消息处理比较耗时,并且生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用工作模型,多个消费者共同处理消息处理,速度就能大大提高了。

    代码示例

    • 消息发送

    此处,我们选择循环发送信息模拟消息堆积的情况,在 publisher 模块中利用RabbitTemplate实现消息发送

    @SpringBootTest
    public class SpringAmqpTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSendMessage2WorkQueue() throws InterruptedException {
            // 指定 队列名称
            String queueName = "work.queue";
            String message = "this is message__";
            for (int i = 1; i <= 50; i++) {
                rabbitTemplate.convertAndSend(queueName, message + i);
                Thread.sleep(20);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 消息接受

    此处我们需要模拟多个消费者绑定同一个队列,我们在consumer模块的SpringRabbitListener中添加2个新的监听方法并让他们同时监听work.queue队列。

    我们特意让消费者1两次消费之间间隔20ms,消费者2间隔200ms。以此模拟消费者1性能优于消费者2的情况。

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String  message) throws InterruptedException {
        // System.err.print 在控制台输出为红色字体,主要是为了便于区分两个消费者
        System.err.println("消费者1接收到work.queue的消息:【" + message + "】"+ LocalTime.now());
        Thread.sleep(20);
    }
    
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String message) throws InterruptedException {
        System.out.println("消费者2接收到work.queue的消息:【" + message + "】" + LocalTime.now());
        Thread.sleep(200);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 观察结果
      在这里插入图片描述

    我们可以发现,消费者1与消费者2都同时处理了25条信息。

    消费者1很快完成了自己的25条消息,消费者2却在缓慢的处理自己的25条消息。

    也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。

    • 能者多劳配置

    通过以上的测试,我们可以发现队列中的消息默认是平均分配给每个消费者,并没有考虑到消费者的处理能力,这样显然是有问题的。

    我们可以通过配置解决这个问题,就是让消费者不要一次性的分配到多条信息,而是直到处理完了n条信息后再去获取信息。当然,这个n就是我们自己来制定的,一般都设置为 1。

    spring:
      rabbitmq:
        listener:
          simple:
            # 每次只能获取一条消息,处理完成才能获取下一个消息
            prefetch: 1 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    完成配置后,可以发现当前大多数消息都被消费者1进行了消费。


    四、发布订阅模式

    在这里插入图片描述

    在订阅模型中,多了一个exchange角色,并且过程也略有变化。

    • 与Work Queue相同,都是一个生产者对应多个消费者。但是此处一条消息是可以被多个消费者获取

    • 生产者不再将消息直接发送到队列,而是发送给exchange(交换机)

    • 每个队列都需要绑定一个交换机

    • 交换机一方面,可以接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

      • Fanout:广播,将消息交给所有绑定到交换机的队列
      • Direct:定向,把消息交给符合指定 RoutingKey 的队列
      • Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列
    • 由图也可知,该模式与以上提到的基本模式与工作模式毫不冲突,完全可以结合到一起使用。

    什么场景下需要使用发布订阅模式?

    在上面也提到了发布订阅模式中的一条消息是可以被多个消费者获取的。

    如果我们有此种需求,比如一笔外卖订单生成后,我们需要将用户信息分别发送到两个模块:配送模块与积分模块。配送模块需要的是用户配送信息,积分模块需要用户的Id。

    因此我们可以在订单生成后,将用户的Id存储在消息队列,然后两个模块分别基于用户Id完成相应操作。

    有了发布订阅模式之后,我们只需要将消息发送一次,多个消费者模块都可以接受到此消息。

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    1. Fanout

    广播模式,原理如图:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K164752z-1668677222339)(C:\Users\tangyitao\AppData\Roaming\Typora\typora-user-images\image-20221117152941692.png)]

    这种模式,交换机与队列直接完成绑定,不需要指定任何Key。所以它的消息传输速度是发布订阅模式中最快的。

    示例代码

    我们预实现如图场景:

    在这里插入图片描述

    • 创建一个交换机 tyt.fanout,类型是Fanout
    • 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机 tyt.fanout
    • 声明队列和交换机

    在consumer模块中新建一个配置类,声明队列和交换机:

    @Configuration
    public class FanoutConfig{
        
        // 声明一个名为 tyt.fanout 的交换机
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("tyt.fanout");
        }
    
        // 声明一个名为 fanout.queue1 的队列
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanout.queue1");
        }
    
        // 绑定 队列1 到 交换机
        @Bean
        public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
            return BindingBuilder
                    .bind(fanoutQueue1)
                    .to(fanoutExchange);
        }
    
        // 声明一个名为 fanout.queue2 的队列
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanout.queue2");
        }
    
        // 绑定 队列2 到 交换机
        @Bean
        public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
            return BindingBuilder
                    .bind(fanoutQueue2)
                    .to(fanoutExchange);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 消息发送

    在 publisher 模块中利用RabbitTemplate实现消息发送

    @SpringBootTest
    public class SpringAmqpTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSendFanoutExchange() {
            // 交换机名称
            String exchangeName = "tyt.fanout";
            // 消息
            String message = "hello, every consumer!";
            // 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 消息接收

    在consumer模块的SpringRabbitListener中添加2个新的监听方法并让他们分别监听fanout.queue1fanout.queue2队列。

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String message) {
        System.out.println("消费者接收到fanout.queue1的消息:【" + message + "】");
    }
    
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String message) {
        System.out.println("消费者接收到fanout.queue2的消息:【" + message + "】");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2. Direct

    在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

    与Fanout模式相比,Direct模式增加了一个RoutingKey的概念。

    我们可以把 publisher 与 consumers (加个s代表多个消费者)当成黑帮老大,而交换机与队列就是它们分别派出去的小弟,派出去时,publisher与 consumers 都分别给了所派出去的交换机与队列一个RoutingKey。这就是他们行走江湖的暗号,当暗号对应上了,交换机就将消息给对应的队列。

    对于下图,我们就可以知道了 consumer1 可以接收到消息,而 consumer2 接受不到。

    在这里插入图片描述

    在Direct模型下:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定RoutingKey
    • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
    • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的Routingkey与消息的 Routingkey完全一致,才会接收到消息

    示例代码

    • 声明队列和交换机

    在上面,我们已经使用过配置类完成声明队列与交换机。其实,Spring还提供了基于注解方式来声明。

    而基于的注解也是我们刚才使用过的@RabbitListener,因此,此处可以将声明与监听同时配置。

    在consumer 模块的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机

    @RabbitListener(bindings = @QueueBinding(
        // 指定队列的名字
        value = @Queue(name = "direct.queue1"),
        // 指定交换机的名字以及交换机类型
        exchange = @Exchange(name = "tyt.direct", type = ExchangeTypes.DIRECT),
        // 指定 RoutingKey,可以同时指定多个,到时候比对 RoutingKey 只要有一个可以对应就可以匹配成功
        key = {"111", "222"}
    ))
    public void listenDirectQueue1(String message){
        System.out.println("消费者接收到direct.queue1的消息:【" + message + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        // 指定队列的名字
        value = @Queue(name = "direct.queue2"),
        // 指定交换机的名字以及交换机类型
        exchange = @Exchange(name = "tyt.direct", type = ExchangeTypes.DIRECT),
        key = {"111", "333"}
    ))
    public void listenDirectQueue2(String message){
        System.out.println("消费者接收到direct.queue2的消息:【" + message + "】");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 消息发送

    在 publisher 模块中利用RabbitTemplate实现消息发送

    
    @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "tyt.direct";
        // 消息
        String message = "hello, direct!";
        /*
                当 routingKey = 111, direct.queue1 与 direct.queue2 均可获取信息
                当 routingKey = 222, 只有 direct.queue1 可获取信息
                当 routingKey = 333, 只有 direct.queue2 可获取信息
             */
        String routingKey = "111";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3. Topic

    Topic 与 Direct 相比,均可以根据RoutingKey把消息路由到不同的队列。只不过 Topic 可以让队列在绑定RoutingKey的时候使用通配符。说的通俗一点,就是可以将交换机与队列的RoutingKey模糊匹配。

    Routingkey一般都是有一个或多个单词组成,多个单词之间以.分割,例如: tyt.key

    通配符规则:

    • #:匹配一个或多个词

    • *:匹配1个词

    例如:

    student.#:能够匹配student.info.name 或者 student.name

    student.*:只能匹配student.name

    在这里插入图片描述

    解释:

    • queue1:绑定的是china.# ,因此凡是以 china.开头的Routingkey都会被匹配到。包括china.news和china.weather
    • queue2:绑定的是japan.# ,因此凡是以 japan.开头的Routingkey都会被匹配到。包括japan.news和japan.weather
    • queue3:绑定的是#.weather ,因此凡是以.weather结尾的Routingkey都会被匹配。包括china.weather和japan.weather
    • queue4:绑定的是#.news ,因此凡是以.news结尾的Routingkey都会被匹配。包括china.news和japan.news

    示例代码

    • 消息发送

    在 publisher 模块中利用RabbitTemplate实现消息发送

    @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "tyt.topic";
        // 消息
        String message = "北京申奥成功啦!!!";
        String routingKey = "china.news";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 消息接受

    在consumer 模块的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机

    @RabbitListener(bindings = @QueueBinding(
        // 指定队列的名字
        value = @Queue(name = "topic.queue1"),
        // 指定交换机的名字以及交换机类型
        exchange = @Exchange(name = "tyt.topic", type = ExchangeTypes.TOPIC),
        // 指定 RoutingKey,可以同时指定多个,到时候比对 RoutingKey 只要有一个可以对应就可以匹配成功
        key = {"china.#"}
    ))
    public void listenTopicQueue1(String message){
        System.out.println("消费者接收到topic.queue1的消息:【" + message + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        // 指定队列的名字
        value = @Queue(name = "topic.queue2"),
        // 指定交换机的名字以及交换机类型
        exchange = @Exchange(name = "tyt.topic", type = ExchangeTypes.TOPIC),
        // 指定 RoutingKey,可以同时指定多个,到时候比对 RoutingKey 只要有一个可以对应就可以匹配成功
        key = {"*.news"}
    ))
    public void listenTopicQueue2(String message){
        System.out.println("消费者接收到topic.queue2的消息:【" + message + "】");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    五、消息转换器

    在发送信息到RabbitMQ时,Spring会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

    默认情况下,Spring采用的就是JDK序列化。众所周知,JDK序列化存在下列问题:

    • 数据体积过大
    • 有安全漏洞
    • 可读性差

    1. 默认转换器

    • 新建一个实体类对象
    @Data
    public class UserEntity implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        private String name;
    
        private Integer age;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 消息发送
    @Test
    public void testSendObject() {
        String queueName = "hello.queue";
        UserEntity user = new UserEntity();
        user.setName("tyt");
        user.setAge(20);
        rabbitTemplate.convertAndSend(queueName, user);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 查看控制台
      在这里插入图片描述

    显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

    2. 配置JSON转换器

    • 在 publisher 模块和 consumer 模块都引入依赖:
    <dependency>
        <groupId>com.fasterxml.jackson.dataformatgroupId>
        <artifactId>jackson-dataformat-xmlartifactId>
        <version>2.9.10version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 配置消息转换器

    在启动类上加上一个 Bean

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    The file was loaded in a wrong encoding: ‘UTF-8
    集成电路模拟版图入门——转行版图基础学习笔记(一)
    SMBMS超市管理系统(三:注销功能实现,登录功能优化)
    PAT A1017 Queueing at Bank
    392.判断子序列 | 792.匹配子序列的单词数
    使用webpack5搭建vue3项目过程记录(详细注解)
    交互与前端16 Tabulator 表格实践4
    Tmux:终端复用器的基本使用(一)
    【API 管理】什么是 API 管理,为什么它很重要?
    OpenGL原理与实践——核心模式(四):摄像机变换理论与应用
  • 原文地址:https://blog.csdn.net/qq_51938362/article/details/127908221