• RabbitMQ初步到精通-第四章-RabbitMQ工作模式-WORK


    第四章-RabbitMQ工作模式-WORK

    1.模式介绍

    1.1 work模式

    Work模式与前面的Simple模式一致,也是消息经由生产者发到Exchange再到queue再被消费者消费。不同点在于SIMPL模式是一个队列对应的一个消费者,此模式会由一个队列对应两个消费者或大于两个消费者。

     

    1.2 work模式模拟

    此模式下,我们的消费者实现了对消息的平均消费,但如果消费者1消费能力若于消费者2,那就会造成消费者1 的消息积压,这时候我们就会想到使用公平模式,能者多劳,消费快的多消费,消费少的少消费。合理消息。后面代码会涉及到。

     

    2.验证代码

    2.1 work平均模式

    我们还是举小明洗澡的例子,小明洗澡比较孤独,这时候小明的女朋友小丽也来一块洗澡了,但两个人不想公用一个喷头,便又接入了一个喷头,一人一个开始洗,但是小明洗澡比较快,虽然洗完了,但是喷头关不上,只好把水浪费掉,小丽洗澡比较慢,但感觉水又不太够用。一共就20升水,一人10升。

    2.1.1 生产者

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 一个生产者,一个默认的交换机,一个队列,两个消费者
    5. * @createTime 2022/07/27 19:34:00
    6. */
    7. public class WaterProducer {
    8. public static final String QUEUE_NAME = "SolarWaterHeater";
    9. //生产者
    10. public static void main(String[] args) throws Exception {
    11. //1、获取connection
    12. Connection connection = RabbitCommonConfig.getConnection();
    13. //2、创建channel
    14. Channel channel = connection.createChannel();
    15. for (int i = 1; i <= 20; i++) {
    16. sendMsg(channel, i);
    17. Thread.sleep(200);
    18. }
    19. //4、关闭管道和连接
    20. channel.close();
    21. connection.close();
    22. }
    23. private static void sendMsg(Channel channel, int k) throws IOException {
    24. //3、发送消息到exchange
    25. String msg = k + "升";
    26. /**
    27. * 参数1:指定exchange,使用“”。默认的exchange
    28. * 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中
    29. * 参数3:指定传递的消息携带的properties
    30. * 参数4:指定传递的消息,byte[]类型
    31. */
    32. channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    33. System.out.println("水龙头放水成功!" + k + "升");
    34. }
    35. }

    2.1.2 消费者

    小明洗澡:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 工作模式 一个生产者,一个默认的交换机,一个队列,两个消费者
    5. * 需要在consumer消费者端,平均分配
    6. * @createTime 2022/07/27 19:36:00
    7. */
    8. public class XMShowerConsumer {
    9. public static final String QUEUE_NAME = "SolarWaterHeater";
    10. //消费者
    11. public static void main(String[] args) throws Exception {
    12. //1、获取连对象、
    13. Connection connection = RabbitCommonConfig.getConnection();
    14. //2、创建channel
    15. Channel channel = connection.createChannel();
    16. //3、创建队列-helloworld
    17. /**
    18. * 参数1:queue 指定队列名称
    19. * 参数2:durable 是否开启持久化(true)
    20. * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
    21. * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
    22. * 参数5:arguments 指定队列携带的信息
    23. */
    24. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    25. //4.开启监听Queue
    26. DefaultConsumer consumer = new DefaultConsumer(channel) {
    27. int i = 1;
    28. @Override
    29. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    30. try {
    31. Thread.sleep(500);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. if (i > 5) {
    36. System.out.println("小明洗澡洗完,开始浪费-第:" + new String(body, "UTF-8"));
    37. } else {
    38. System.out.println("小明洗澡已用水-第: " + new String(body, "UTF-8"));
    39. }
    40. i++;
    41. }
    42. };
    43. /**
    44. * 参数1:queue 指定消费哪个队列
    45. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    46. * 参数1:cancelCallback 指定消费回调
    47. */
    48. channel.basicConsume(QUEUE_NAME, true, consumer);
    49. System.out.println("小明开始洗澡-快速洗......");
    50. //5、键盘录入,让程序不结束!
    51. System.in.read();
    52. //6、释放资源
    53. channel.close();
    54. connection.close();
    55. }
    56. }

    小丽洗澡:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 工作模式 一个生产者,一个默认的交换机,一个队列,两个消费者
    5. * 需要在consumer消费者端,平均分配
    6. * @createTime 2022/07/27 19:36:00
    7. */
    8. public class XLShowerConsumer {
    9. public static final String QUEUE_NAME = "SolarWaterHeater";
    10. //消费者
    11. public static void main(String[] args) throws Exception {
    12. //1、获取连对象、
    13. Connection connection = RabbitCommonConfig.getConnection();
    14. //2、创建channel
    15. Channel channel = connection.createChannel();
    16. //3、创建队列-helloworld
    17. /**
    18. * 参数1:queue 指定队列名称
    19. * 参数2:durable 是否开启持久化(true)
    20. * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
    21. * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
    22. * 参数5:arguments 指定队列携带的信息
    23. */
    24. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    25. //4.开启监听Queue
    26. DefaultConsumer consumer = new DefaultConsumer(channel) {
    27. @Override
    28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29. try {
    30. Thread.sleep(2000);
    31. } catch (InterruptedException e) {
    32. e.printStackTrace();
    33. }
    34. System.out.println("小丽洗澡已用水-第: " + new String(body, "UTF-8"));
    35. }
    36. };
    37. /**
    38. * 参数1:queue 指定消费哪个队列
    39. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    40. * 参数1:cancelCallback 指定消费回调
    41. */
    42. channel.basicConsume(QUEUE_NAME, true, consumer);
    43. System.out.println("小丽开始洗澡-慢慢洗......");
    44. //5、键盘录入,让程序不结束!
    45. System.in.read();
    46. //6、释放资源
    47. channel.close();
    48. connection.close();
    49. }
    50. }

    2.1.3 结果验证

    生产者放水消息:

    1. 水龙头放水成功!1
    2. 水龙头放水成功!2
    3. 水龙头放水成功!3
    4. 水龙头放水成功!4
    5. 水龙头放水成功!5
    6. 水龙头放水成功!6
    7. 水龙头放水成功!7
    8. 水龙头放水成功!8
    9. 水龙头放水成功!9
    10. 水龙头放水成功!10
    11. 水龙头放水成功!11
    12. 水龙头放水成功!12
    13. 水龙头放水成功!13
    14. 水龙头放水成功!14
    15. 水龙头放水成功!15
    16. 水龙头放水成功!16
    17. 水龙头放水成功!17
    18. 水龙头放水成功!18
    19. 水龙头放水成功!19
    20. 水龙头放水成功!20

    小明洗澡

    1. 小明开始洗澡-快速洗......
    2. 小明洗澡已用水-第: 2
    3. 小明洗澡已用水-第: 4
    4. 小明洗澡已用水-第: 6
    5. 小明洗澡已用水-第: 8
    6. 小明洗澡已用水-第: 10
    7. 小明洗澡洗完,开始浪费-第:12
    8. 小明洗澡洗完,开始浪费-第:14
    9. 小明洗澡洗完,开始浪费-第:16
    10. 小明洗澡洗完,开始浪费-第:18
    11. 小明洗澡洗完,开始浪费-第:20

    小丽洗澡

    1. 小丽洗澡已用水-第: 1
    2. 小丽洗澡已用水-第: 3
    3. 小丽洗澡已用水-第: 5
    4. 小丽洗澡已用水-第: 7
    5. 小丽洗澡已用水-第: 9
    6. 小丽洗澡已用水-第: 11
    7. 小丽洗澡已用水-第: 13
    8. 小丽洗澡已用水-第: 15
    9. 小丽洗澡已用水-第: 17
    10. 小丽洗澡已用水-第: 19

    从结果来看,两个人都是平均用水,虽然有快有慢,慢的就会存在堆积情况。

    2.2 公平模式

    小丽就有意见了,自己不够洗,小明还白白浪费那么多水。小明给出了建议,让小丽把碰头开关开大点,自己开小点,这样就让小丽多用点水,自己少用点也就够了。

    2.2.1 生产者

    同上面生产者一致

    2.2.2 消费者

    小明洗澡:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 工作模式 一个生产者,一个默认的交换机,一个队列,两个消费者
    5. * 需要在consumer消费者端,添加Qos能力以及更改为ACK手动即可让消费者根据自己的能力消费,不是RabbitMQ默认的平均分配了
    6. * @createTime 2022/07/27 19:36:00
    7. */
    8. public class XMShowerConsumer {
    9. public static final String QUEUE_NAME = "SolarWaterHeater";
    10. //消费者
    11. public static void main(String[] args) throws Exception {
    12. //1、获取连对象、
    13. Connection connection = RabbitCommonConfig.getConnection();
    14. //2、创建channel
    15. Channel channel = connection.createChannel();
    16. channel.basicQos(1);
    17. //3、创建队列-helloworld
    18. /**
    19. * 参数1:queue 指定队列名称
    20. * 参数2:durable 是否开启持久化(true)
    21. * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
    22. * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
    23. * 参数5:arguments 指定队列携带的信息
    24. */
    25. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    26. //4.开启监听Queue
    27. DefaultConsumer consumer = new DefaultConsumer(channel) {
    28. @Override
    29. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    30. try {
    31. Thread.sleep(2000);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. System.out.println("小明洗澡已用水-第: " + new String(body, "UTF-8"));
    36. //手动ACK(接收信息,指定是否批量操作)
    37. channel.basicAck(envelope.getDeliveryTag(), false);
    38. }
    39. };
    40. /**
    41. * 参数1:queue 指定消费哪个队列
    42. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    43. * 参数1:cancelCallback 指定消费回调
    44. */
    45. channel.basicConsume(QUEUE_NAME, false, consumer);
    46. System.out.println("小明开始洗澡-慢慢洗......");
    47. //5、键盘录入,让程序不结束!
    48. System.in.read();
    49. //6、释放资源
    50. channel.close();
    51. connection.close();
    52. }
    53. }

    小丽洗澡:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 工作模式 一个生产者,一个默认的交换机,一个队列,两个消费者
    5. * 需要在consumer消费者端,添加Qos能力以及更改为ACK手动即可让消费者根据自己的能力消费,不是RabbitMQ默认的平均分配了
    6. * @createTime 2022/07/27 19:36:00
    7. */
    8. public class XLShowerConsumer {
    9. public static final String QUEUE_NAME = "SolarWaterHeater";
    10. //消费者
    11. public static void main(String[] args) throws Exception {
    12. //1、获取连对象、
    13. Connection connection = RabbitCommonConfig.getConnection();
    14. //2、创建channel
    15. Channel channel = connection.createChannel();
    16. channel.basicQos(1);
    17. //3、创建队列-helloworld
    18. /**
    19. * 参数1:queue 指定队列名称
    20. * 参数2:durable 是否开启持久化(true)
    21. * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
    22. * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
    23. * 参数5:arguments 指定队列携带的信息
    24. */
    25. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    26. //4.开启监听Queue
    27. DefaultConsumer consumer = new DefaultConsumer(channel) {
    28. @Override
    29. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    30. try {
    31. Thread.sleep(500);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. System.out.println("小丽洗澡已用水-第: " + new String(body, "UTF-8"));
    36. //手动ACK(接收信息,指定是否批量操作)
    37. channel.basicAck(envelope.getDeliveryTag(), false);
    38. }
    39. };
    40. /**
    41. * 参数1:queue 指定消费哪个队列
    42. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    43. * 参数1:cancelCallback 指定消费回调
    44. */
    45. channel.basicConsume(QUEUE_NAME, false, consumer);
    46. System.out.println("小丽开始洗澡-快速洗......");
    47. //5、键盘录入,让程序不结束!
    48. System.in.read();
    49. //6、释放资源
    50. channel.close();
    51. connection.close();
    52. }
    53. }

    2.2.3 结果验证

    生产者:

    1. 水龙头放水成功!1
    2. 水龙头放水成功!2
    3. 水龙头放水成功!3
    4. 水龙头放水成功!4
    5. 水龙头放水成功!5
    6. 水龙头放水成功!6
    7. 水龙头放水成功!7
    8. 水龙头放水成功!8
    9. 水龙头放水成功!9
    10. 水龙头放水成功!10
    11. 水龙头放水成功!11
    12. 水龙头放水成功!12
    13. 水龙头放水成功!13
    14. 水龙头放水成功!14
    15. 水龙头放水成功!15
    16. 水龙头放水成功!16
    17. 水龙头放水成功!17
    18. 水龙头放水成功!18
    19. 水龙头放水成功!19
    20. 水龙头放水成功!20

    小明:

    1. 小明开始洗澡-慢慢洗......
    2. 小明洗澡已用水-第: 2
    3. 小明洗澡已用水-第: 7
    4. 小明洗澡已用水-第: 12
    5. 小明洗澡已用水-第: 17

    小丽:

    1. 小丽开始洗澡-快速洗......
    2. 小丽洗澡已用水-第: 1
    3. 小丽洗澡已用水-第: 3
    4. 小丽洗澡已用水-第: 4
    5. 小丽洗澡已用水-第: 5
    6. 小丽洗澡已用水-第: 6
    7. 小丽洗澡已用水-第: 8
    8. 小丽洗澡已用水-第: 9
    9. 小丽洗澡已用水-第: 10
    10. 小丽洗澡已用水-第: 11
    11. 小丽洗澡已用水-第: 13
    12. 小丽洗澡已用水-第: 14
    13. 小丽洗澡已用水-第: 15
    14. 小丽洗澡已用水-第: 16
    15. 小丽洗澡已用水-第: 18
    16. 小丽洗澡已用水-第: 19
    17. 小丽洗澡已用水-第: 20

    从结果看:实现了我们期望的结果,小丽用的水多了,小明用的水少,大家都洗好了。

    3.模式总结

    此模式我们最应该注意的就是平均模式与公平模式的实现,这里是靠消费者的手工确认机制来实现的。

        String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
    

    如上代码中的 第二个参数 autoAck,

    true的时候,属于自动确认,消费者一旦接收到此消息,就会发回确认消息给Broker,Broker会从队列中删除掉此消息。当有多个消费者注册到同一个queue时,会默认轮询分发。

    false的时候,属于手动确认,消费者虽然接收到了消息,还需要执行一个 方法来告诉Broker才行

    channel.basicAck(envelope.getDeliveryTag(), false);

    若没有告诉Broker,Broker还会将此消息再次发送。

    同时此种情况下还需要使用此方法:

    void basicQos(int prefetchCount) throws IOException;

    指定每次消费抓取的数量

    maximum number of messages that the server will deliver

    这样我们通过这些改造,实现了,消费者消费的时候,消费一个告诉Broker删除一个,没消费的时候就不要给消费者投递了,最终实现了公平消费。

  • 相关阅读:
    基于.NetCore开发博客项目 StarBlog - (11) 实现访问统计
    2-FreeRTOS编码标准、风格指南
    树莓派4B开发之一安装64位系统并实现SSH访问
    DV,OV通配符的区别
    【2022河南萌新联赛第(五)场:信息工程大学】【部分思路题解+代码解析】
    焱融科技为国家重点实验室打造海量高性能存储
    SpringBoot整合MQTT总结
    leetcode 907. 子数组的最小值之和(重做)
    121. 买卖股票的最佳时机
    tiup cluster template
  • 原文地址:https://blog.csdn.net/blucastle/article/details/127927554