• RabbitMQ--基础--7.2--工作模式--工作队列


    RabbitMQ–基础–7.2–工作模式–工作队列


    代码位置

    https://gitee.com/DanShenGuiZu/learnDemo/tree/master/rabbitMq-learn/rabbitMq-03
    
    • 1

    1、介绍

    1. 工作队列模式 也叫 竞争消费者模式
      1. 多个消费端消费同一个队列中的消息
      2. 队列采用轮询的方式将消息是平均发送给消费者,此处的资源是竞争关系
    2. 与简单模式相比 多了一个(或者多个)消费端
    3. 它解决了当消息队列的消息过多的情况,单消费者消费速率有限,导致的消息堆积的问题。
    4. 工作队列模式中的队列是和默认的交换机 AMQP default 进行绑定(和简单模式一样)
    5. 保证同一个消息只能被一个消费者消费掉
      1. 可以设置一个开关,syncronize,保证一条消息只能被一个消费者使用
    6. 用于场景
      1. 红包场景
      2. 大型项目中的资源调度

    1.1、结构图

    在这里插入图片描述

    1.2、模式

    1. 轮询分发
    2. 公平分发

    1.2.1、轮询分发

    消息平均分配。不管谁忙,都不会多给消息,总是你一个我一个

    1.2.2、公平分发

    1. 能者多劳。谁消费得快,谁就消费得多。
    2. 这种模式下,必须关闭自动应答 ACK,改成手动应答,并且设置 channel.basicQos(1),表示消费者一次只处理一条消息

    2、MQ实现(工作队列模式的实现)

    工作队列模式中的队列是和默认的交换机 AMQP default 进行绑定(和简单模式一样)

    2.1、创建队列

    work_queue1
    work_queue2
    
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    2.2、通过默认交换机发送消息

    1. 需要指定路由键:路由键为队列名称

    在这里插入图片描述

    在这里插入图片描述

    2.3、到队列中查看消息

    在这里插入图片描述

    3、代码实现

    P:生产者:生产消息
    C1:消费者1 Consumer :消费消息,完成速度较慢(模拟耗时)
    C2:消费者2 Consumer2 :消费消息,完成速度较快

    3.1、轮询分发

    消息平均分配。不管谁忙,都不会多给消息,总是你一个我一个

    3.1.1、代码结构

    在这里插入图片描述

    3.1.2、Producer代码

    package com.example.rabbitmq03.business.test2;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Producer {
        
        public static void main(String[] args) throws Exception {
            // 1. 获取连接
            Connection connection = RabbitMqUtil.getConnection("生产者");
            // 2. 通过连接获取通道 Channel
            Channel channel = connection.createChannel();
            String queueName = "code_work_queue1";
            // 3. 通过通道创建声明队列
            /**
             * @param1 队列名称
             * @param2 是否持久化 持久化:RabbitMQ服务器重启,队列还存在;反之,不存在。
             *         持久化的队列中的消息会存盘,不会随着服务器的重启会消失
             * @param3 排他性 是否独占一个队列(一般不会)
             * @param4 是否自动删除 随着最后一个消费者消费消息完毕后,是否自动删除这个队列
             * @param5 携带一些附加信息 供其它消费者获取
             */
            channel.queueDeclare(queueName, false, false, false, null);
            // 4. 发送消息给队列 Queue
            for (int i = 0; i < 20; i++) {
                // 5. 准备消息内容
                String message = "Hello RabbitMQ" + i;
                channel.basicPublish("", queueName, null, message.getBytes());
                System.out.println("消息发送完成~~~发送的消息为:" + message);
            }
            // 6. 关闭信道、连接
            RabbitMqUtil.close(connection, channel);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    3.1.3、Consumer代码

    package com.example.rabbitmq03.business.test2;
    
    import com.example.rabbitmq03.business.test2.RabbitMqUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    public class Consumer {
        
        public static void main(String[] args) throws Exception {
            // 获取连接
            Connection connection = RabbitMqUtil.getConnection("消费者");
            
            final Channel channel = connection.createChannel();
            String queueName = "code_work_queue1";
            
            // 定义消费者
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 消息体
                    String msg = new String(body, "utf-8");
                    System.out.println("Consumer收到消息:" + msg);
                    try {
                        // 模拟任务耗时 1s
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            
            // param1:队列名称
            // param2:设置是否自动确认。
            // 当消费者接收到消息后要告诉 mq 消息已接收,如果将此参数设置为 true 表示会自动回复 mq,如果设置为
            // false,要通过编程实现回复
            // param3:设置消费者的回调函数,用来处理 RabbitMQ 推送过来的消息
            // 监听队列
            channel.basicConsume(queueName, true, consumer);
            
            System.out.println("开始接收消息~~~");
            // 等待输入
            System.in.read();
            
            // 关闭信道、连接
            RabbitMqUtil.close(connection, channel);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    3.1.4、Consumer2代码

    Consumer2 与 消费者 Consumer 代码基本一样。只是 Consumer2 没有设置消费耗时间

    3.1.5、操作

    1. 先启动Consumer,Consumer2
    2. 再启动Producer

    在这里插入图片描述

    在这里插入图片描述

    可以发现,两个消费者各自消费了不同 10 条消息,这就实现了任务的分发。

    3.2、公平分发

    1. 能者多劳。谁消费得快,谁就消费得多,就上面而言,Consumer2应该消费最多
    2. 实现步骤
      1. 消费者 必须 关闭自动应答 ACK,改成手动应答
      2. 消费者 设置 channel.basicQos(1),表示消费者一次只处理一条消息

    3.2.1、步骤1–拷贝test2的代码

    在这里插入图片描述

    3.2.2、步骤2–修改Consumer,Consumer2的代码

    在这里插入图片描述

    3.2.3、修改后的Consumer代码

    package com.example.rabbitmq03.business.test3;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    import com.rabbitmq.client.*;
    
    public class Consumer {
        
        public static void main(String[] args) throws Exception {
            // 获取连接
            Connection connection = RabbitMqUtil.getConnection("消费者");
            
            final Channel channel = connection.createChannel();
            String queueName = "code_work_queue1";
            
            // 定义消费者
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 消息体
                    String msg = new String(body, "utf-8");
                    System.out.println("Consumer收到消息:" + msg);
                    try {
                        // 模拟任务耗时 1s
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    /**
                     * @param1:deliveryTag:用来标识消息的id
                     * @param2:multiple:是否批量。true:将一次性处理 ACK 所有小于 deliveryTag 的消息
                     */
                    // 手动确认
                    channel.basicAck(deliveryTag, false);
                }
            };
            // 限速,表示消费者一次只处理一条消息
            channel.basicQos(1);
            // param1:队列名称
            // param2:设置是否自动确认。
            // 当消费者接收到消息后要告诉 mq 消息已接收,如果将此参数设置为 true 表示会自动回复 mq,如果设置为
            // false,要通过编程实现回复
            // param3:设置消费者的回调函数,用来处理 RabbitMQ 推送过来的消息
            // 监听队列
            channel.basicConsume(queueName, false, consumer);
            
            System.out.println("开始接收消息~~~");
            // 等待输入
            System.in.read();
            
            // 关闭信道、连接
            RabbitMqUtil.close(connection, channel);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    3.2.4、测试

    1. 先启动Consumer,Consumer2
    2. 再启动Producer
      在这里插入图片描述
      在这里插入图片描述

    可以看到 Consumer 只消费了1条消息,Consumer2消费了19条消息,实现公平分发。

  • 相关阅读:
    nginx配置参数详细解析
    酷开系统 酷开科技,将家庭娱乐推向新高潮
    关于 ELEMENTOR 的常见问题
    2022最全Spring面试题70道
    ingenic carrier-server fgets is err问题
    使用Uiautomator2进行APP自动化测试
    深入探讨Qt树状显示功能:理论与实践
    Python网络爬虫5-实战网页爬取
    Ubuntu批量新建文件
    MySQL数据库入门到大牛_基础_09_子查询(子查询分类方法;单行子查询,多行子查询;相关子查询)
  • 原文地址:https://blog.csdn.net/zhou920786312/article/details/127425397