• RabbitMQ 学习(四)---- 工作队列模式


    RabbitMQ 学习(四)---- 工作队列模式


      这是第二种模型 (Work Queue),任务模型,当消息处理比较耗时的时候,生产者发送消息的速度远远大于消费的速度,长此以往,消息就会堆积的越来越多,无法及时处理,可以使用work模型,让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。


    (1)公平竞争机制


      一个生产者发送消息到默认交换机,通过路由同名规则将 队列中的信息 循环分发到 监听队列的消费者中,一对多,不过是公平分发,按照顺序将每条消息发送给每一个消费者。每个消费者平均分配队列中的消息。公平竞争。

    在这里插入图片描述

      这里的生产者代码与 之前简单模型,一模一样,只不过循环发送了多条消息等待分配


    package workAver;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    import utils.RabbitMQUtils;
    
    import java.io.IOException;
    
    public class AvgProvider {
        public static void main(String[] args) {
            Connection connection =null;
            Channel channel =null;
            try {
                // 1、获取连接对象
                connection = RabbitMQUtils.getConnect();
    
                //2、通过连接获取信道
                assert connection != null;
                channel = connection.createChannel();
    
                // 声明发送的消息
                String message = "work平均分配生产的消息!";
    
                //3、声明队列信息
                channel.queueDeclare("work", false, false, false, null);
    
                for (int i = 1; i <= 10; i++) {
                    //4、使用信道发送消息, routineKey与队列同名方便匹配
                    channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, (message+": "+i).getBytes());
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                RabbitMQUtils.close(channel,connection);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

      创建多个消费者,与之前简单模型一模一样,只不过创建了多个连接到 rabbitMq的服务器上对 队列进行监听


    消费者1

    package workAver;
    
    import com.rabbitmq.client.*;
    import utils.RabbitMQUtils;
    
    import java.io.IOException;
    
    public class AvgCustomer1 {
        public static void main(String[] args) {
    
            try {
    
                Connection connection = RabbitMQUtils.getConnect();
                Channel channel = connection.createChannel();
    
                // 声明队列
                channel.queueDeclare("work", false, false, false, null);
    
                // 该消费者1 接收队列中的消息
                channel.basicConsume("work", true, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println(new String(body));
                    }
                });
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    消费者2

    package workAver;
    
    import com.rabbitmq.client.*;
    import utils.RabbitMQUtils;
    
    import java.io.IOException;
    
    public class AvgCustomer2 {
        public static void main(String[] args) {
    
            try {
    
                Connection connection = RabbitMQUtils.getConnect();
                Channel channel = connection.createChannel();
    
                // 声明队列
                channel.queueDeclare("work", false, false, false, null);
    
                // 该消费者1 接收队列中的消息
                channel.basicConsume("work", true, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println(new String(body));
                    }
                });
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    消费者1 收到的消息情况,全部是奇数


    在这里插入图片描述


    消费者2 收到的消息的情况,全部都是偶数


    在这里插入图片描述


    可以证明,监听队列的消费者中间一次循环接收队列中的消息,公平竞争


    (2)能者多劳机制


    我们需要了解消费者的自动确认机制

      默认情况下,RabbitMQ会按照顺序将每个消息发送到下一个使用者,平均每个消费者会受到相同数量的消息,这种分发消息的机制称为循环。


    而为什么会自动平均分配呢?

      因为我们在消费者消费的时候有一个参数设置为 autoAck:true,我们设置消费者接收消息自动确认,而一般都是队列中分配好了那个消费者要传递什么信息,直接一次全部传递过去,不是消费一个确认一下。消费者接收到所有消息之后自动确认,队列中会标记删除所有的信息,他不关心你接收完信息之后的后续业务操作。就是只关心你是否收到了数据。

      消息自动确认机制,完成一项任务可能需要几秒钟甚至几分钟,如果一个消费者开始了一项长期的任务,却只完成了一部分就挂了,那么RabbitMQ一旦将消息传递给消费者,就会立刻标记删除,那么因为消费者挂了,接收数据的时候已经确认应答了,队列中的数据也删除了,所以剩余接收到的信息也没了。


    autoAck 取消 ,手动确认


      如果生产者发送10条消息,消费者1拿到5条,消费者2拿到5条,不进行自动应答,服务器队列的数据即使消费了,我们没有应答就不会被标记删除,保证服务器队列中的数据一直还在。如果消费者处理完了这条数据,那么手动确认,队列中知道已经确认了进行删除


    接收消息的时候,参数设置为false

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SGXb3SYK-1663644288424)(C:\Users\rain7\AppData\Roaming\Typora\typora-user-images\1662977824211.png)]

    执行回调函数的时候,手动确认


    在这里,我们在 handlerDeliver 方法中,

    • 第一个参数 deliveryTag,代表本次信道传递的标签标识,Envelope 类的实例包含了 本次传递的 deliverTag数据可直接获取

    • 第二个参数 multiple:批量

      比如批量确认.当multiple的值设置为true时,RabbitMQ将确认指定传输标签以及之前所有未被确认的消息。与单个确认相同,批量确认的作用域为每个通道。例如:通道Ch上有四个未被确认的消息,标签分别为5,6,7,8;当一个delivery_tag值为8并且multiple值为true的确认消息到达通道时,所有5到8的标签都会被确认。如果multiple值设置为false,标签为5,6,7的消息将不会被确认。

         // 手动确认
           channel.basicAck(envelope.getDeliveryTag(), false);
    
    • 1
    • 2

    补充一点还有其他手动确认的API

    在这里插入图片描述


    chanel 传递1条数据


      在之前的公平竞争机制下,说是按照顺序给每一个消费者数据,其实在发送给消费者之前在内部已经计算好了,给消费者一第1、3、5、7、9数据,一次性发送,给消费者二第2、4、6、8、10条数据,然后一次性发送,如果我们手动确认的话,那么相当于一次确认一大批,队列经过确认后进行删除,如果在后续处理业务中挂掉了,照样消息已经删除了。

      我们设置一个信道每次只能消费一个消息,如果其中一个消费者服务器挂掉了,连接就断掉了,剩余的未被手动确认的数据还在队列中保存。也能及时得把剩余的消息继续交给消费者2进行处理,不耽误业务的持续进行。

      这就是能者多劳的机制。就是说处理快的消费者处理完业务会很快的手动确认,然后再次进行接收新的消息,处理慢的消费者经过一段时间处理之后再进行确认,就会能者多劳,业务处理快的接受的消息多,处理满的接受的少

    在消费消息之前设置信道中接收消息只能是1个

    chanel.basicQos(1); // 设置信道中一次只能消费一个信息
    
    • 1

    (3) 能者多劳的代码案例


    1、生产者


    生产者发送给队列 10条消息

    package workAver;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    import utils.RabbitMQUtils;
    
    import java.io.IOException;
    
    public class AvgProvider {
        public static void main(String[] args) {
            Connection connection =null;
            Channel channel =null;
            try {
                // 1、获取连接对象
                connection = RabbitMQUtils.getConnect();
    
                //2、通过连接获取信道
                assert connection != null;
                channel = connection.createChannel();
    
                // 声明发送的消息
                String message = "work平均分配生产的消息!";
    
                //3、声明队列信息
                channel.queueDeclare("work", false, false, false, null);
    
                for (int i = 1; i <= 10; i++) {
                    //4、使用信道发送消息, routineKey与队列同名方便匹配
                    channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, (message+": "+i).getBytes());
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                RabbitMQUtils.close(channel,connection);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    2、消费者1


      接收到消息之后,会休眠2秒,在进行业务操作,作为处理较慢的消费者,设置信道每次传递一个,处理完业务手动确认ack。

    package workAver;
    
    import com.rabbitmq.client.*;
    import utils.RabbitMQUtils;
    
    import java.io.IOException;
    
    public class AvgCustomer1 {
        public static void main(String[] args) {
    
            try {
    
                Connection connection = RabbitMQUtils.getConnect();
                final Channel channel = connection.createChannel();
    
                //声明信道中一次只能接受一条信息
                channel.basicQos(1);
    
                // 声明队列
                channel.queueDeclare("work", false, false, false, null);
    
                // 该消费者1 接收队列中的消息
                channel.basicConsume("work", false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // 休眠2s
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        // 处理业务
                        System.out.println(new String(body));
    
                        //手动确认,删除队列中的信息
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    3、消费者2


      接收到消息之后,休眠1秒,作为消费较快的消费者,设置信道传递1条数据,处理完业务之后手动确认。

    package workAver;
    
    import com.rabbitmq.client.*;
    import utils.RabbitMQUtils;
    
    import java.io.IOException;
    
    public class AvgCustomer2 {
        public static void main(String[] args) {
    
            try {
    
                Connection connection = RabbitMQUtils.getConnect();
                final Channel channel = connection.createChannel();
    
                //声明信道中一次只能接受一条信息
                channel.basicQos(1);
    
                // 声明队列
                channel.queueDeclare("work", false, false, false, null);
    
                // 该消费者1 接收队列中的消息
                channel.basicConsume("work", false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // 休眠1秒
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        // 处理业务
                        System.out.println(new String(body));
    
                        // 手动确认
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    4、查看接收结果


    消费者1,处理业务2s,处理了4条消息


    在这里插入图片描述


    消费者2,处理业务1s,处理了6条消息


    在这里插入图片描述

    实现了能者多劳机制

  • 相关阅读:
    Element-plus DatePicker 日期选择器【正则校验时间范围】
    【我不熟悉的javascript】02. 新手菜鸟们,求你不要再用console.log了,console.table用起来,提升效率的小方法
    各种实用航测遥感数据数据免费获取,速来领取!
    Apache DolphinScheduler新一代分布式工作流任务调度平台实战-上
    【开发经验】通知气泡实现思路
    Android Studio的debug和release模式及签名配置
    关于 GIN 的路由树
    HTML+CSS+JS环境保护网页设计期末课程大作业 web前端开发技术 web课程设计 网页规划与设计
    在Linux中如何解决程序崩溃的问题
    清理电脑存储的五种方法
  • 原文地址:https://blog.csdn.net/rain67/article/details/126949861