• MQ高级特性(消息的可靠性)


    一、消息确认

    1.1 消息的可靠性

    • 生产者发送消息未送达exchange
    • 消息到达exchange后未到达queue
    • 消息到达queue,未到达消费者

    1.2 生产者的确认机制

    • pulisher-confirm 发送者确认
    • 消息成功投递到交换机了,返回ack
    • 消息未投递到交换机,返回nack
    • pulisher-return 发送者回执(回执通过全局唯一ID确认是哪条消息)
    • 消息投递到交换机,但没有路由到队列,返回ACK以及路由失败的原因

    1.3 实现

    1.3.1 yaml配置

    spring:
     rabbitmq:
      publisher-confirm-type: correlated  #生产者确认类型。有simple:同步等待conrim,阻塞。correlated:异步回调,定义ConfirmCallback,MQ返回结果会调这个函数
        publisher-returns: true #开启功能,没有路由到队列,回调ReturnCallback
        template:
          mandatory: true   #true则调用ReturnCallback   ,false丢弃消息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    1.3.2 配置ReturnCallback(只有一个RabbitTemplate,只配一次)

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // 获取RabbitTemplate对象
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            // 配置ReturnCallback
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                // 判断是否是延迟消息
                Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
                if (receivedDelay != null && receivedDelay > 0) {
                    // 是一个延迟消息,忽略这个错误提示
                    return;
                }
                // 记录日志
                log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                         replyCode, replyText, exchange, routingKey, message.toString());
                // 如果有需要的话,重发消息
            });
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    1.3.2 ConfirmCallback(每次发消息都能配置)

    // 1.准备消息
            String message = "hello, spring amqp!";
            // 2.准备CorrelationData
            // 2.1.消息ID
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            // 2.2.准备ConfirmCallback
            correlationData.getFuture().addCallback(result -> {
                // 判断结果
                if (result.isAck()) {
                    // ACK
                    log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
                } else {
                    // NACK
                    log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                    // 重发消息
                }
            }, ex -> {
                // 记录日志
                log.error("消息发送失败!", ex);
                // 重发消息
            });
            // 3.发送消息
            rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    二、消息持久化(spring默认配好交换机、队列、消息全都是持久化的)

    MQ默认是内存存储消息,开启持久化功能可以确保消息不会丢失
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/211a78ac6c3340d59442bd119e7db76f.png
    2.1 设置消息持久化

      Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))//消息体
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)//持久化
                    .build();	
    
    • 1
    • 2
    • 3

    三、消费者消息确认

    消费者消费完消息后可以向MQ发送ack回执,MQ收到ack回执才会删除该消息。Springamqp允许配置3种模式。

    • manual:手动ack,在业务代码结束后,调用api发送ack。
    • auto:由spring进行检测监听的代码 是否出现异常,没有则返回ack,抛出异常返回nack。
    • none:关闭ack,消息投递后就删除。

    3.1 配置确认模式

    消费者失败会进行requeue重新入队,然后再次异常,再次入队,重复循环。
    采取spring的本地重试,配置重试次数和间隔时间

    spring:
     rabbitmq:
       listener:
         simple:
           acknowledge-mode: auto  #自动投递
           retry:
              enabled: true
              initial-interval: 1000  #初始等待时长
              multiplier: 3   #下次失败等待市场,上次*multiplier
              max-attempts: 4 #最大重试次数
              stateless: true  #true无状态,如果包含事务需要为fasle,否则事务会失效
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.2 消费者失败消息策略处理

    在spring开启本地重试后,当重试次数耗尽,由MessageRecover接口来处理,它包含三种不同的实现

    • RejectAndDontRequeueRecoverer
      重试耗尽后,直接reject,丢弃消息。默认就是这种方式.
    • ImmediateRequeueMessageRecoverer
      重试耗尽后,返回nack,消息重新入队
    • RepublishMessageRecoverer
      重试耗尽后,将失败消息投递到指定的交换机

    3.2.1 RepublishMessageRecoverer的使用

    将error.queue绑定给error.direct,当routingkey是error,发送给error.queue。当重现重试耗尽,RepublishMessageRevocerer重新投递到指定的交换机error.direct,根据

    @Configuration
    public class ErrorMessageConfig {
    
        @Bean
        public DirectExchange errorMessageExchange(){
            return new DirectExchange("error.direct");
        }
    
        @Bean
        public Queue errorQueue(){
            return new Queue("error.queue");
        }
    
        @Bean
        public Binding errorMessageBinding(){
            return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
        }
    
        @Bean
        public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
            return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    OPENCV实现人类识别(包括眼睛、鼻子、嘴巴)
    vulnhub之Ripper
    Python(9)字典和集合
    Vue中$nextTick实现源码解析
    机器学习理论公式推导及原理—决策树
    传智杯第一届例题5
    成为千行百业数字化转型催化剂的,竟然是它!
    python开发环境搭建问题汇总
    对于BI可视化分析平台,你了解多少?
    青少年软件编程(202209)(C语言)等级考试(五级)试题及参考答案
  • 原文地址:https://blog.csdn.net/qq_46624276/article/details/126673422