• RabbitMQ相关问题


    Mybatis框架相关问题

    部分示例代码

    一、RabbitMQ的核心组件和工作原理?

    在这里插入图片描述
    交换机有四类:
    1、Fanout Exchange

    投递到所有绑定的队列,不需要规则,不需要匹配,相当于广播、群发;
    在这里插入图片描述
    2、Direct Exchange

    根据路由键精确匹配进行路由消息队列;
    在这里插入图片描述
    3、Topic Exchange

    通配符匹配,相当于模糊匹配;

    # 匹配多个单词,* 匹配一个单词,用 . 隔开的为一个单词:

    在这里插入图片描述
    4、Headers Exchange

    基于消息内容中的headers属性进行匹配;
    在这里插入图片描述

    二、如何保证消息可靠投递不丢失的?

    在这里插入图片描述
    要确保一下四个步骤全部成功;
    ① 代表消息从生产者发送到Exchange;
    ② 代表消息从Exchange路由到Queue
    ③ 代表消息在Queue中存储;
    ④ 代表消费者监听Queue并消费消息;

    • ① 代表消息从生产者发送到Exchange;

      • 方案:开启Confirm确认模式;
      • 消息未投递成功,采取补偿措施,或是记录日志,或是发送通知让负责人知道;
      // 实现接口
      implements RabbitTemplate.ConfirmCallback
      
      // 重写方法
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String cause)
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    • ② 代表消息从Exchange路由到Queue;

      • 方案:开启Return返回模式;
      • 消息未投递成功,采取补偿措施,或是记录日志,或是发送通知让负责人知道;
      // 实现接口
      implements RabbitTemplate.ReturnsCallback
      
      /**
       * 消息从 交换机 --> 到 --> 队列,如果失败了,就会回调该方法
       * (失败了才触发该方法,成功是不会触发该方法的)
       *
       * 比如说磁盘满
       *
       * @param returned
       */
      @Override
      public void returnedMessage(ReturnedMessage returned)
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
    • ③ 代表消息在Queue中存储;

      • 方案:
        1、交换机持久化
            /**
             * 声明创建一个FanoutExchange交换机
             *
             * @return
             */
            @Bean
            public DirectExchange directExchange() {
                return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME)
                        .durable(true) // 持久化
                        .build();
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        2、队列持久化
            /**
             * 声明创建一个队列
             *
             * @return
             */
            @Bean
            public Queue directQueue() {
                return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        3、消息持久化
        	//消息体
            Message message = MessageBuilder.withBody(json.getBytes(StandardCharsets.UTF_8))
            	.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
                .build();
        
        • 1
        • 2
        • 3
        • 4
        4、集群高可用部署
    • ④ 代表消费者监听Queue并消费消息;

      • 方案:手动确认消息
      @Slf4j
      @Component
      public class MyRabbitListener {
      
          @RabbitListener(queues = {RabbitConfig.DIRECT_QUEUE_NAME})
          public void onMessage(String msg, @Headers Map<String, Object> header, Message message, Channel channel) {
              try {
                  System.out.println("[RabbitListener]接收到的消息: " + msg);
      
                  //处理业务
      
                  //业务处理成功,手动确认消息
                  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
      
                  //业务处理失效,不确认消息,并且重新入队,这样又可以重新消费
                  // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
      
                  //拒绝消息
                  // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24

    三、RabbitMQ如何保证消息的幂等性?

    幂等性:多次操作,重复操作,对系统不会造成影响; 比如:消息重复发送两次或多次,消息重复消费两次或多次;

    保证MQ的幂等性,只要保证消费者不会重复消费相同的消息即可;

    • 方案:
      • 生产者发送消息时,为每条消息设置一个全局唯一的id
      • 消费者消费消息时,使用 redis 的 setnx 命令:SET id 1 NX,若返回OK,说明该消息之前没有消费过,正常消费;若返回 nil,说明该消息之前已消费过,那就不用处理;

    四、什么是死信队列?死信队列是如何导致的?

    死信队列即DLX,全称为Dead-Letter-Exchange,翻译为:死信交换机。当一个消息在队列中变成死信 (dead message) 之后,它能被重新发送到另外一个交换机中,这个交换机就是DLX,绑定到DLX的队列就称为死信队列;

    死信队列本身也是一个普通的消息队列,可以通过设置一些参数将其设置为死信队列;

    死信队列是一个用于存放无法被消费的消息的队列,这些消息被称为死信,死信队列可以避免消息一直被消费却无法消费成功的情况;

    在这里插入图片描述

    五、RabbitMQ死信队列是如何导致的?

    RabbitMQ导致死信的几种原因:
    1、消息被拒

    • Basic.Nack 且 requeue = false
    • Basic.Reject 且 requeue = false

    2、消息 TTL 过期;
    3、队列达到最大长度,即队列满了;

    在这里插入图片描述

    六、什么是延迟队列?RabbitMQ 如何实现延迟队列?

    • 延迟队列是用来存放“延迟消息”的队列;
    • 所谓“延迟消息”是指消息被发送到队列以后,并不想让消费者立刻拿到消息,而是等待指定的时间后,消费者才能拿到这个消息;

    RabbitMQ 本身是没有直接可以使用的延迟队列;要实现延迟队列,一般有两种方式:

    • 使用TTL消息过期 + DLX死信队列来实现;
      在这里插入图片描述

    • 使用使用 rabbitmq-delayed-message-exchange 延迟插件来实现;
      在这里插入图片描述

    七、RabbitMQ的高可用机制有了解嘛?

    RabbitMQ有三种模式:
    1)单机模式
    2)普通集群模式
    在这里插入图片描述

    3)镜像集群模式 (高可用)
    在这里插入图片描述

    八、如果有百万消息堆积在MQ中,如何解决?

    在这里插入图片描述

    • 原因:生产和消费失衡;
    • 解决:

    1、提前预防;

    • 流量预估,预估每秒产生的消息数量;
    • 做好压测,压测系统的消费能力;
    • 做好预案,准备好可能发生的超出预估的突发情况;

    2、应急处理;

    • 消费端
      • 临时增加消费者数量:增加更多的消费者实例、增加消费者线程数量;
      • 临时增加消费者把消息写入其他设备,后续处理;
    • 生产端
      • 可以适当限流,降低消息发送速度;

    3、事后优化;

    • 优化消费端业务处理逻辑,提升业务执行效率,减少io操作,减少数据库操作,减少网络连接,业务异步处理等;

    九、如何解决RabbitMQ中因为消息堆积而导致的消息过期失效的问题?

    消息堆积,导致消息大量存储在消息队列中得不到消费,而消息又设置了TTL过期时间,当到达过期时间时,消息被过期丢弃;

    解决:
    1、让消息不要被堆积;
    2、不设置TTL过期时间或者增加过期时间时长;
    3、设置死信队列
    4、编写临时程序补发消息;

  • 相关阅读:
    相机1:如何系相机肩带
    【第一章】浅谈游戏作弊类型与核心原理
    Gateway基础使用
    【C++】STL — vector的使用 + 模拟实现
    terraform简单的开始-简单分析一下内容
    时序分解 | Matlab实现SSA-VMD麻雀算法优化变分模态分解时间序列信号分解
    【黄啊码】composer运行php相关问题解决方式
    架构师日常(二)
    狂奔的低代码,画风各异的阿里云、腾讯云
    JavaWeb 综合案例(包含注册与登录页面,包含对数据库的增删改查页面)新增加session与cookie与验证码
  • 原文地址:https://blog.csdn.net/weixin_46990523/article/details/136431231