• RabbitMQ初步到精通-第十一章-RabbitMQ之常见问题汇总


    目录

    RabbitMQ之常见问题汇总

    1.rabbitmq丢消息场景

    1.1 消息未持久化丢失

    1.2 消费时消息丢失

    1.3 如何阻止消息丢失

    2. mq消费消息是pull 还是 push

    2.1 pull形式消费

    2.2 push形式消费

    3. mq重复消费场景

    3.1 生产端重复情况

    3.2 消费端重复

    3.3 如何防止

    4.prefetch作用


    RabbitMQ之常见问题汇总

    1.rabbitmq丢消息场景

    在前面的文章中,我们介绍了mq如何防消息丢失,从消息从生产者发送到Broker的Exchange,再到Queue,再Deliver到消费者,各个环节都有可能会丢失消息。

    这里我们主要模拟一下两个场景:消息未持久化丢失和消费时消息丢失。

    1.1 消息未持久化丢失

    生产者发送了一条未持久化消息,此时消息未被消费,那这条消息存储在内存中

    此时重启MQ,那这条消息就会丢失掉。 

    1.2 消费时消息丢失

    若消费时使用自动确认机制,产生了2条消息,第一条消息由于处理时间比较长,第二条消息已经拉取到了本地BlockingQueue中,再第一条消息处理的过程中,系统宕机了,第二条消息还没来的及消费,就丢失了。

    1.3 如何阻止消息丢失

    参考 RabbitMQ初步到精通-第五章-RabbitMQ之消息防丢失_Mr-昊哥的博客-CSDN博客

    2. mq消费消息是pull 还是 push

    mq消费既支持pull的形式也支持push的形式。pull形式,是靠客户端一直拉取服务端的消息,而客户端并不知道什么时候会有消息进入到队列中,会一直取消息,造成额外的系统开销,效率也不高。

    而push形式,是一般我们采用的方式。将监听函数注册到服务中,mqBroker收到消息进行投递,再调用监听函数方法消费消息。

    2.1 pull形式消费

    代码中使用:

    1. for (int i = 0; i < 2; i++) {
    2. GetResponse response = channel.basicGet(QUEUE_NAME, true);
    3. System.out.println("消费:" + new String(response.getBody()));
    4. }

    抓包情况:

    都是先发出请求,再将结果返回。

    2.2 push形式消费

    代码:

    1. DefaultConsumer consumer = new DefaultConsumer(channel) {
    2. @SneakyThrows
    3. @Override
    4. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    5. Thread.sleep(1000);
    6. System.out.println("接收到消息:" + new String(body, "UTF-8"));
    7. channel.basicAck(envelope.getDeliveryTag(), false);
    8. }
    9. };
    10. channel.basicConsume(QUEUE_NAME, false, consumer);

    抓包情况:

    服务端将消息推送过来:

    3. mq重复消费场景

    3.1 生产端重复情况

    从生产端即产生了重复的情况,多由于系统bug,或系统间调用造成重试等原因,推出了重复的消息。

    3.2 消费端重复

    消费使用手动ACK模式,两个消费者,消费者1,虽然消费到了消息,由于处理时间长,未能及时ACK,这时候系统宕机,mq会将NACK状态的消息变为Ready状态,又推送给了消费者2 ,这样就造成了,同一条消息 同时在消费者1和消费者2都消费到了。

    生产者:

    1. public class RepeatProducer {
    2. public static final String QUEUE_NAME = "work";
    3. //生产者
    4. public static void main(String[] args) throws Exception {
    5. //1、获取connection
    6. Connection connection = RabbitCommonConfig.getConnection();
    7. //2、创建channel
    8. Channel channel = connection.createChannel();
    9. for (int i = 1; i <= 3; i++) {
    10. sendMsg(channel, i);
    11. Thread.sleep(1000);
    12. }
    13. //4、关闭管道和连接
    14. channel.close();
    15. connection.close();
    16. }
    17. private static void sendMsg(Channel channel, int k) throws IOException {
    18. //3、发送消息到exchange
    19. String msg = "hello work :" + k;
    20. channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    21. System.out.println("生产者发布消息成功!" + k);
    22. }
    23. }

    消费者1:

    1. public class RepeatConsumer1 {
    2. public static final String QUEUE_NAME = "work";
    3. //消费者
    4. public static void main(String[] args) throws Exception {
    5. //1、获取连对象、
    6. Connection connection = RabbitCommonConfig.getConnection();
    7. //2、创建channel
    8. Channel channel = connection.createChannel();
    9. channel.basicQos(1);
    10. //3、创建队列
    11. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    12. //4.开启监听Queue
    13. DefaultConsumer consumer = new DefaultConsumer(channel) {
    14. @Override
    15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    16. System.out.println("Consumer1 接收到消息:" + new String(body, "UTF-8"));
    17. try {
    18. Thread.sleep(1000 * 60);
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }
    22. //手动ACK(接收信息,指定是否批量操作)
    23. channel.basicAck(envelope.getDeliveryTag(), false);
    24. }
    25. };
    26. //5.关闭自动ACK
    27. channel.basicConsume("work", false, consumer);
    28. System.out.println("消费者1开始监听队列");
    29. //6、键盘录入,让程序不结束!
    30. System.in.read();
    31. //7、释放资源
    32. channel.close();
    33. connection.close();
    34. }
    35. }

    消费者2:

    1. public class RepeatConsumer2 {
    2. public static final String QUEUE_NAME = "work";
    3. //消费者
    4. public static void main(String[] args) throws Exception {
    5. //1、获取连对象、
    6. Connection connection = RabbitCommonConfig.getConnection();
    7. //2、创建channel
    8. Channel channel = connection.createChannel();
    9. channel.basicQos(1);
    10. //3、创建队列
    11. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    12. //4.开启监听Queue
    13. DefaultConsumer consumer = new DefaultConsumer(channel) {
    14. @Override
    15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    16. System.out.println("Consumer2 接收到消息:" + new String(body, "UTF-8"));
    17. try {
    18. Thread.sleep(1000);
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }
    22. //手动ACK(接收信息,指定是否批量操作)
    23. channel.basicAck(envelope.getDeliveryTag(), false);
    24. }
    25. };
    26. //5.关闭自动ACK
    27. channel.basicConsume(QUEUE_NAME, false, consumer);
    28. System.out.println("消费者2开始监听队列");
    29. //6、键盘录入,让程序不结束!
    30. System.in.read();
    31. //7、释放资源
    32. channel.close();
    33. connection.close();
    34. }
    35. }

    结果:

    第一次运行:

    消费者1开始监听队列
    Consumer1 接收到消息:hello work :1

    消费者2开始监听队列
    Consumer2 接收到消息:hello work :2
    Consumer2 接收到消息:hello work :3

    运行过程中停掉消费者1

    消费者2开始监听队列
    Consumer2 接收到消息:hello work :2
    Consumer2 接收到消息:hello work :3
    Consumer2 接收到消息:hello work :1

    3.3 如何防止

    消费时引入幂等机制,使用分布式锁,数据库唯一索引等控制。

    4.prefetch作用

    接触到prefetch分别会在java和spring 客户端中出现,只有需要手动ACK的时候才起作用。

    设置了此值,会控制mq服务端给客户端推送的消息数量。

    例如设置15,mq堆积了大量消息的情况下,会首次推送给 客户端15个消息,若消息消费慢的情况,mq的服务端会始终保证15个unack的消息的前提下,给客户端推送。

    1.java amqp client

    1. channel.basicQos(15);
    2. //开启手动确认
    3. channel.basicConsume(QUEUE_NAME, false, consumer);

    2. spring 配置中

    1. spring.rabbitmq.listener.simple.acknowledge-mode = manual
    2. spring.rabbitmq.listener.simple.prefetch = 15

    spring若没进行配置默认250

    1. private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;
    2. public static final int DEFAULT_PREFETCH_COUNT = 250;
  • 相关阅读:
    基于Ubunru服务器搭建wordpress个人博客
    XSS线上靶场---Warmups
    基于SSM+SpringBoot+MySQL+Vue前后端分离的高校考试管理系统
    如何在两个相关泛型类之间创建类似子类型的关系
    Rosalind Java|Longest Increasing Subsequence动态规划算法
    前端架构师技术之Sass
    @PathVariable, @RequestBody, @Param使用场景? 为什么用它?
    Codeforces Round #811 (Div. 3)补题(A-G)
    map和set
    instant 关于时间相关的处理
  • 原文地址:https://blog.csdn.net/blucastle/article/details/128104737