• RabbitMQ第二个实操小案例——WorkQueue


    RabbitMQ第二个实操小案例——WorkQueue

    讲第二个案例之前,我们先看下前面第一个案例的模型:
    在这里插入图片描述
    可以看到,我们只有一个发布者和一个消费者,通过Queue队列,实现最简单的消息的发送和接收。但是,如果现在我们的发布者他每秒发布100条数据,而我们的消费者每秒只能处理90条数据,那么每秒我们的队列将有10条数据被卡在队列中,而队列的容量是有限的,随着累计的时间长了,我们的消息也会因为来不及处理,导致后面发送的消息没来得及被接收就被销毁掉了。

    这个时候,就要引出我们的第二个模型了——工作队列了,往下看:

    我们先看一下工作队列的模型,如下:
    在这里插入图片描述
    WorkQueue的模型跟前面第一个案例Hello,World!的模型,最明显的区别其实就是,第一个案例他只有一个消费者。我们知道RabbitMQ他的消息是阅完即焚,即消费者一旦接收,这个消息直接就从Queue中被弹出了。
    而现在这个案例,他有两个消费者(画两个只是方便,他当然也可以有3个、4个),他的消息应该是通过某种算法做负载均衡送到不同的消费者,让消费者进行处理,让消息不至于处理不过来,从而导致滞留在Queue中的消息被弹出。

    废话少说,代码走起!

    思路如下:
    1、我们先让Publish服务每秒发布50条消息到 simple.queue,来演示消息的频繁发送。
    2、在Consumer服务中定义两个消费者,来监听我们的 simple.queue队列。
    3、消费者1每秒处理40条消息,消费者2每秒处理30条消息。

    首先,我们在Publish服务下编写测试方法:

        @Test
        public void testWorkQueue() throws InterruptedException {
            String queueName = "simple.queue";
            String message = "Hello, I'm ";
            for (int i = 1; i <= 50; i++) {
                rabbitTemplate.convertAndSend(queueName, message+i);
                Thread.sleep(20);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    然后,我们编写我们的两个消费者。(为了不影响我们的实验,记得把上一个实验写的方法注释掉)

    //    @RabbitListener(queues = "simple.queue")
    //    public void listenQueueMessage(String msg) throws InterruptedException{
    //        System.out.println("监听到的消息为:【"+ msg +"】");
    //    }
    
        @RabbitListener(queues = "simple.queue")
        public void listenWorkQueueMessage1(String msg) throws InterruptedException{
            //打印白色的字体
            System.out.println("消费者1监听到的消息为:【"+ msg +"】");
            Thread.sleep(25);
        }
    
        @RabbitListener(queues = "simple.queue")
        public void listenWorkQueueMessage2(String msg) throws InterruptedException{
            //打印红色的字体
            System.err.println("消费者2监听到的消息为:【"+ msg +"】");
            Thread.sleep(33);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    然后我们启动我们的消费者的服务。
    在这里插入图片描述
    然后我们跑一下我们的Publish的测试方法,开始见证奇迹:
    在这里插入图片描述
    看到这里,有些人可能会好奇,怎么消费者1执行的都是奇数的消息,消费者2执行的都是偶数的消息,其实这个不是偶然哈,这个是必然的,现在我们把消费者1的睡眠时间改成5,消费者2的睡眠时间改为50,再看一眼结果。(这就很明显了,红色的是2、4、6、8、10),白色的是1、3、5、7、9。
    在这里插入图片描述
    造成这种情况的原因是 RabbitMQ 内部的 消息预取 机制导致的,那么什么叫消息预取呢?其实也很简单,就是说 RabbitMQ 他的管道,他不管你处理快不快,我拿到了我就先预定好,这个给消费者1,这个给消费者2,这个给消费者1,这个给消费者2,以此类推,一直这么分下去。最后两个消费者就会拿到一样的消息,但是!这明显不是我们要的,有些服务器他快,他处理起来快,我们应该多给他几条,有些服务器他慢,我们应该让他少处理几条。怎么玩呢?其实也很简单,配置一下RabbitMQ预取的数量(每次获取的消息数量,处理完后这一批后再获取下一批)就可以了:

    在消费者服务的配置文件中,配置我们的RabbitMQ的预取数如下:spring.rabbitmq.listener.simple.prefetch

    spring:
      rabbitmq:
        host: 192.168.83.130
        port: 5672
        virtual-host: /
        username: admin
        password: root
        listener:
          simple:
          	#预取数:每次接收的消息数,处理完才会接收下一批消息,这里我们设置为一条
            prefetch: 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    配置好后,我们重启一下我们的消费者服务,然后看一下结果:
    在这里插入图片描述

  • 相关阅读:
    企业实践|基于软件研运一体化DevOps平台的应用解析
    vue实现【接口数据渲染随机显示】和【仅显示前五条数据】
    unity打包webgl 部署到本地Web服务器
    AI-“国外一开源,国内就创新”!
    springboot自动配置原理
    【C++】C++基础知识(四)---程序流程结构
    潘多拉 IOT 开发板学习(HAL 库)—— 实验1 跑马灯(RGB)实验(学习笔记)
    Go 的三种指针
    【Kubernetes】k8s集群中pod的容器资源限制和三种探针
    【数据聚类】第三章第三节4:类K-Means算法之二分K-均值算法(bisecting K-Means算法)
  • 原文地址:https://blog.csdn.net/weixin_44741023/article/details/127790186