• RabbitMQ---SpringAMQP的使用,五种消息模型的示例


    SpringAMQP的使用:

    SpringAMQP 提供了三个功能:

    自动声明队列、交换机及其绑定关系
    基于注解的监听器模式,异步接收消息
    封装了 RabbitTemplate 工具,用于发送消息

    第一步引入AMQP依赖:

    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    第二部:
    在yml文件中进行对于的配置

    spring:
      rabbitmq:
        host: 127.0.0.1 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: admin # 用户名
        password: 123456 # 密码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    RabbitMQ的五种消息模型:

    在这里插入图片描述

    基本消息队列模型

    基础的消息队列模型来实现的,只包括三个角色:

    publisher:消息发布者,将消息发送到队列queue
    queue:消息队列,负责接受并缓存消息
    consumer:订阅队列,处理队列中的消息

    在这里插入图片描述

    代码示例:

    在 consumer 服务中添加监听队列

    @Component
    public class RabbitMQListener {
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String msg){
            System.out.println("消费者接收到消息:【" + msg + "】");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在 publisher 服务中添加发送消息的测试类

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Test
        public void testSimpleQueue() {
            // 队列名称
            String queueName = "simple.queue";
            // 消息
            String message = "hello SimpleQueue!";
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    要注意的是,这种方式发送前要有simple.queue的通道,否则会报错
    可以使用下列方式,自动创建通道

    //2. 自动创建队列
    @RabbitListener(queuesToDeclare = @Queue("simple.queue"))
    public void process2(String msg){
        System.out.println("消费者接收到消息:【" + msg + "】");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    WorkQueue工作消息模式

    Work queues,也被称为(Task queues)任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息
    在这里插入图片描述
    Work 模型的使用:

    • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
    • 通过设置 prefetch 来控制消费者预取的消息数量

    此任务模型可以开启多个通道,共同处理消息队列中的任务

    代码示例:

    在 publisher 服务中的 SpringAmqpTest 类中添加一个测试方法:

    /**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
    @Test
    public void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, message_";
        for (int i = 0; i < 50; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    consumer 服务的 RabbitMQListener 中添加2个新的方法:

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }
    
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这样子编写完之后,每个消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这是因为 RabbitMQ 默认有一个消息预取机制

    如果想要能者多劳,处理快的消息通道多处理几个,就要配置
    处理消息预取机制,每次只能拿一条,处理完成才开始处理下一条的消息

    yml中配置:

        listener:
          simple:
            prefetch: 1 #处理消息预取机制,每次只能拿一条,处理完成才开始处理下一条的消息
    
    • 1
    • 2
    • 3

    发布/订阅

    在这里插入图片描述
    图中可以看到,在订阅模型中,多了一个 exchange 角色,而且过程略有变化

    • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 exchange(交换机)
    • Consumer:消费者,与以前一样,订阅队列,没有变化
    • Queue:消息队列也与以前一样,接收消息、缓存消息
    • Exchange:交换机,一方面,接收生产者发送的消息;另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。Exchange 有以下3种类型:
      • Fanout:广播,将消息交给所有绑定到交换机的队列
      • Direct:定向,把消息交给符合指定 routing key 的队列
      • Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

    Fanout广播模式:

    在这里插入图片描述
    在广播模式下,消息发送流程是这样的:

    可以有多个队列
    每个队列都要绑定到 Exchange(交换机)
    生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
    交换机把消息发送给绑定过的所有队列
    订阅队列的消费者都能拿到消息

    代码实现:

    publisher 服务的 SpringAmqpTest类中添加测试方法:

    /**
     * Fanout交换机工作模型
     */
    @Test
    public void FanoutQueueTest() {
        String exchangeName = "fanout.exchange";
        String msg= "hello FanoutQueue";
        rabbitTemplate.convertAndSend(exchangeName,"",msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在 consumer 服务的中添加个方法,作为消费者

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue("fanout.queue1"),
                exchange = @Exchange(value = "fanout.exchange", type = ExchangeTypes.FANOUT)
        ))
        public void LisenterFanOutQueue1(String msg) {
            System.out.println("fanout.queue1接收到的消息为:" + msg);
        }
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue("fanout.queue2"),
                exchange = @Exchange(value = "fanout.exchange", type = ExchangeTypes.FANOUT)
        ))
        public void LisenterFanOutQueue2(String msg) {
            System.out.println("fanout.queue2接收到的消息为:" + msg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述
    运行该方法,可以发现 fanout.queue1、fanout.queue2 都收到了交换机的消息。

    Direct路由消息模式

    Direct路由消息模式其实算是Fanout模式的一种,只不过可以给queue设置key名称,从而达到不同消息通道收到不同的消息
    在这里插入图片描述
    在 Direct 模型下:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
    • 消息的发送方向 Exchange发送消息时,也必须指定消息的 RoutingKey
    • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key完全一致,才会接收到消息

    代码示例

    在 consumer中添加两个消费者,同时基于注解来声明队列和交换机

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("Direct.queue"),
            exchange = @Exchange(value = "Direct.exchange",type = ExchangeTypes.DIRECT),
            key = "blue"
    ))
    public void DirectQueue1(String msg){
        System.out.println("DirectQueue1接收到的消息为:"+msg);
    }
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("Direct.queue2"),
            exchange = @Exchange(value = "Direct.exchange",type = ExchangeTypes.DIRECT),
            key = "red"
    ))
    public void DirectQueue2(String msg){
        System.out.println("DirectQueue2接收到的消息为:"+msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    publisher 服务的 SpringAmqpTest类中添加测试方法:

    /**
     * Direct交换机工作模型
     */
    @Test
    public void DirectQueueTest() {
        String exchangeName = "Direct.exchange";
        String msg= "hello DirectQueue";
        rabbitTemplate.convertAndSend(exchangeName,"blue",msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述
    可以看到只有key为blue的收到消息

    Topic

    Topic 与 Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic 类型可以让队列在绑定Routing key 的时候使用通配符

    通配符规则:

    #:匹配一个或多个词
    *:只能匹配一个词

    例如:

    item.#:能够匹配item.spu.insert 或者 item.spu
    item.*:只能匹配item.spu

    在这里插入图片描述

    代码示例:

    在 consumer中添加两个消费者,同时基于注解来声明队列和交换机

    @Component
    public class TopicLisenter {
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "topic.queue1"),
                exchange = @Exchange(name = "Topic.exchange", type = ExchangeTypes.TOPIC),
                key = "china.#"
        ))
        public void listenTopicQueue1(String msg) {
            System.out.println("接收到topic.queue1的消息:【" + msg + "】");
        }
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "topic.queue2"),
                exchange = @Exchange(name = "Topic.exchange", type = ExchangeTypes.TOPIC),
                key = "china.*"
        ))
        public void listenTopicQueue2(String msg) {
            System.out.println("接收到topic.queue2的消息:【" + msg + "】");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    publisher 服务的 SpringAmqpTest类中添加测试方法:

    /**
     * topic
     * 向交换机发送消息
     */
    @Test
    public void testTopicExchange() {
        // 交换机名称
        String exchangeName = "Topic.exchange";
        // 消息
        String message1 = "hello, i am topic form china.news";
        String message2 = "hello, i am topic form china.news.2";
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message1);
        rabbitTemplate.convertAndSend(exchangeName, "china.news.2", message2);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述

    消息转换器

    Spring 会把你发送的消息序列化为字节发送给 MQ,接收消息的时候,还会把字节反序列化为 Java 对象。

    默认情况下 Spring 采用的序列化方式是 JDK 序列化。

    代码示例

    //queuesToDeclare 自动创建消息通道
    @RabbitListener(queuesToDeclare = @Queue(value = "object.queue"))
    public void listenObjectQueue(Map<String,Object> msg) throws InterruptedException {
        System.err.println("object接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    @Test
    public void testSendMap()  {
        // 准备消息
        Map<String,Object> msg = new HashMap<>();
        msg.put("name", "Jack");
        msg.put("age", 21);
        // 发送消息
        rabbitTemplate.convertAndSend("object.queue", msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    众所周知,JDK序列化存在下列问题:

    数据体积过大
    有安全漏洞
    可读性差

    我们推荐可以使用 JSON 来序列化
    在 publisher 和 consumer 两个服务中都引入依赖

    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.9.10</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    配置消息转换器。

    在各自的启动类中添加一个 Bean 即可

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

  • 相关阅读:
    # Oracle 库常见问题排查
    好数组——尺取法
    某省大学排名网络爬虫应该怎么做
    存储性能测试
    【Hack The Box】Linux练习-- OpenAdmin
    模型微调迁移学习Finetune方法大全
    js中 slice 用法用法全解析
    【微信小程序】后台数据交互于WX文件使用
    windows系统安装python教程,以及PyCharm安装,新手入门详细
    财报解读:双轮驱动下,香飘飘究竟能打开多大的获利空间?
  • 原文地址:https://blog.csdn.net/qq_57480977/article/details/127824634