• 常见的RabbitMQ测试点及解决办法


    一、RabbitMQ简介

    RabbitMQ 是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

    在使用RabbitMQ的过程当中,经常会遇到的异常场景有:

    • 消费者启动后,未显示在队列的Consumers;
    • 消费者启动后,消费入库时报错;
    • 消费者启动后,输入正确的json,重复入库;
    • 消费者启动后,消费但未入库;
    • 消费者启动后,消费者刚开始显示,但后来消失(消费者假死);
    • 消费者启动后,输入错误的json,消费失败;
    • 消费者启动后,消费者堵塞(队列阻塞,无法继续添加数据,可能导致服务挂掉)。

    二、RabbitMQ常见测试场景

    1. RabbitMQ弄丢了数据
    测试场景:简而言之,就是RabbitMQ自己弄丢了数据。

    解决办法:这个需要开启RabbitMQ的持久化,就是消息写入之后会持久化到磁盘中,哪怕是RabbitMQ自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。

    持久化的步骤有两个,第一个是创建queue的时候将其设置为持久化的,这样就可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里的数据。

    第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。

    2. 生产者弄丢了数据

    测试场景:生产者给RabbitMQ发送数据的时候,可能由于网络原因,数据没遇发送到RabbitMQ。

    解决办法:这个时候我们可以选择RabbitMQ的事务功能,在生产者发送数据之前开启事务(channel.txSelect),然后发送消息,如果消息没有顺利的被RabbitMQ接收,这个时候生产者就会收到异常的报错信息,可以回滚事务(channel.txRollback),重试消息发送。

    这个事务有个缺陷,就是比较损耗RabbitMQ服务器的性能。

    第二种方法:开启RabbitMQ的confirm模式,每次生产者给RabbitMQ发送消息的时候,都会分配一个唯一的id,如果写入RabbitMQ中,RabbitMQ会回传一个ack消息,告诉你消息发送成功;如果写入RabbitMQ失败,会回调一个nack接口,告诉你消息发送失败,可以进行重试。

    事务机制和cnofirm机制不同之处在于,事务是同步的,你提交一个事务之后会阻塞在那儿,等待事务处理才能进行下一个事务,但是confirm机制是异步的,发送这个消息之后就可以发送下一个消息,然后消息被RabbitMQ接收了之后,会异步回调通知你这个消息接收到了。

    所以生产者为了避免数据丢失,都是用confirm机制的。

    3.消费端弄丢了数据
    测试场景:RabbitMQ如果丢失了数据,主要是因为消费数据的时候,还没处理,结果服务挂了,比如服务进行了重启,但是RabbitMQ认为你都消费了,这种情况数据就丢了。

    解决办法:这个时候得用RabbitMQ提供的ack机制,简单来说,就是你关闭RabbitMQ自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。

    这样的话,如果你还没处理完,不就没有ack?那RabbitMQ就认为你还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。

    4.数据积压
    大量消息在RabbitMQ里积压,没有被及时消费。

    测试场景:上万条数据在RabbitMQ里积压四个小时,从下午的5点持续到晚上的9点。这个是我真实遇到过的一个场景,主要是因为线上系统故障。

    解决办法:扩容服务器集群,新建一个topic,partition是之前的十倍,将现有的consumer都停止,修复问题之后,新建一个新的consumer服务,部署上去消费积压的数据,消费不做耗时处理,之间轮询写入新的queue。

    5.RabbitMQ与JAVA
    生产者连接rabbitMQ的代码:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    public class RabbitProducer {
      private static final String EXCHANGE_NAME = "exchange_demo";
      private static final String ROUTING_KEY = "routingkey_demo";
      private static final String QUEUE_NAME = "queue_demo";
      private static final String IP_ADDRESS = "127.0.0.1";
      private static final int PORT = 5672;  // RabbitMQ服务端默认端口号为5672
      
      public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("zifeiy");
        factory.setPassword("passwd");
        Connection connection = factory.newConnection();  // 建立连接
        Channel channel = connection.createChannel();    // 创建信道
        // 创建一个type="direct"、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        // 创建一个持久化、非排他的、非自动删除的队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 将交换器和队列通过路由绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        // 发送一条持久化的消息:hello world!
        String message = "hello,world!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, 
              MessageProperties.PERSISTENT_TEXT_PLAIN, 
              message.getBytes());
        // 关闭资源
        channel.close();
        connection.close();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    消费者连接rabbitMQ的代码:

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Address;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.Connection;
    
    public class RabbitConsumer {
      private static final String QUEUE_NAME = "queue_demo";
      private static final String IP_ADDRESS = "127.0.0.1";
      private static final int PORT = 5672;
      
      public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Address[] addresses = new Address[] {
            new Address(IP_ADDRESS, PORT)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("zifeiy");
        factory.setPassword("passwd");
        // 这里的连接方式与生产者的demo略有不同,注意区分
        Connection connection = factory.newConnection(addresses);  // 创建连接
        final Channel channel = connection.createChannel();  // 创建信道
        channel.basicQos(64);   // 设置客户端最多接受未被ack的消息的个数
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("recv message: " + new String(body));
            try {
              TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
            channel.basicAck(envelope.getDeliveryTag(), false);
          }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        // 等待回调函数执行完毕后,关闭资源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    在这里插入图片描述

    注:仅供参考,有疑问或没疑问都可私信博主哦,共同学习,一起进步,奥利给!!!

  • 相关阅读:
    quarkus依赖注入之十一:拦截器高级特性上篇(属性设置和重复使用)
    python单例模式应用之pymongo连接
    Matlab:构建模式表达式
    Distributions & Currents
    Python---break关键字对for...else结构的影响
    智加科技多项成果亮相ITS World Congress 两款智能重卡计划量产
    机器学习(9)---线性回归中的公式推导(手推)、闭式解和数值解
    mysql自己update更新自己(根据本表字段更新本表)
    点云数据是什么
    解决golang无法下载依赖的奇葩问题
  • 原文地址:https://blog.csdn.net/web18536564348/article/details/126577721