• RabbitMQ保证消息的可靠性


    一、背景

    消息丢失:下图是消息从生产者发送到消费者接收的关系图。通过图片可以看出,消息在生产者、MQ、消费者这三个环节都有可能丢失。
    在这里插入图片描述

    1.1 生产者丢失

    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常

    1.2 MQ丢失

    • 消息到达MQ,保存到队列后,尚未消费就突然宕机

    1.3 消费者丢失

    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

    1.4 总结(三方面入手)

    • 确保生产者成功把消息发送到MQ
    • 确保MQ不会丢失消息
    • 确保消费者成功处理消息

    二、解决方案

    配置

    package com.qiangesoft.rabbitmq.producer;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 消息发送配置
     *
     * @author qiangesoft
     * @date 2024-05-08
     */
    @Configuration
    public class MessageConfig {
    
        public static final String EXCHANGE = "simple.exchange";
    
        public static final String QUEUE = "simple.queue";
    
        public static final String ROUTING_KEY = "simple";
    
        @Bean
        public DirectExchange simpleExchange() {
            return ExchangeBuilder
                    .directExchange(EXCHANGE)
                    // 持久化交换机
                    .durable(true)
                    .build();
        }
    
        @Bean
        public Queue simpleQueue() {
            return QueueBuilder
                    // 持久化队列
                    .durable(QUEUE)
                    // 避免消息堆积、懒加载
                    .lazy()
                    .build();
        }
    
        @Bean
        public Binding simpleBinding(Queue simpleQueue, DirectExchange simpleExchange) {
            return BindingBuilder.bind(simpleQueue).to(simpleExchange).with(ROUTING_KEY);
        }
    
    }
    
    

    2.1 生产者

    2.1.1 生产者重试机制

    背景:生产者发送消息时,出现了网络故障,导致与MQ的连接中断
    解决方案:配置连接超时时间、重试机制。

    spring:
      rabbitmq:    
        # 设置MQ的连接超时时间
        connection-timeout: 1s
        template:
          # 连接重试机制
          retry:
            enabled: true
            # 失败后的初始等待时间
            initial-interval: 1000ms
            # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
            multiplier: 1
            # 最大重试次数
            max-attempts: 3
    

    2.1.2 生产者确认机制

    背景:

    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常

    解决方案:配置Publisher Confirm机制、Publisher Return机制。

    spring:
      rabbitmq:
        # 开启publisher confirm机制,并设置confirm类型,确保消息到达交换机
        publisher-confirm-type: correlated
        # 开启publisher return机制,确保消息到达队列
        publisher-returns: true
    
    定义ConfirmCallback
    package com.qiangesoft.rabbitmq.producer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.nio.charset.StandardCharsets;
    import java.util.UUID;
    
    /**
     * 生产者
     *
     * @author qiangesoft
     * @date 2024-05-08
     */
    @Slf4j
    @RequestMapping("/producer")
    @RestController
    public class ProducerController {
    
        @Autowired
        public RabbitTemplate rabbitTemplate;
    
        @GetMapping("/send")
        public void send(String content) {
            CorrelationData correlation = getCorrelationData();
    
            Message message = MessageBuilder
                    .withBody(content.getBytes(StandardCharsets.UTF_8))
                    .setMessageId(UUID.randomUUID().toString())
                    // 消息持久化
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                    .build();
    
            // 正常发送
            rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation);
        }
    
        private static CorrelationData getCorrelationData() {
            // 异步回调返回回执,开启publisher confirm机制【确保消息到达交换机】
            CorrelationData correlation = new CorrelationData();
            correlation.getFuture().addCallback(new ListenableFutureCallback<>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("消息发送异常,ID:{},原因:{}", correlation.getId(), ex.getMessage());
                }
    
                @Override
                public void onSuccess(CorrelationData.Confirm result) {
                    log.info("触发【publisher confirm】机制");
                    if (result.isAck()) {
                        log.info("消息发送成功到达交换机,ID:{}", correlation.getId());
                    } else {
                        // 消息发送失败
                        log.error("消息发送失败未到达交换机,ID:{},原因:{}", correlation.getId(), result.getReason());
                    }
                }
            });
            return correlation;
        }
    
    }
    
    
    定义ReturnCallback
    package com.qiangesoft.rabbitmq.producer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 消息路由失败回退配置
     *
     * @author qiangesoft
     * @date 2024-05-08
     */
    @Slf4j
    @Configuration
    public class ReturnsCallbackConfig implements ApplicationContextAware {
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            // 消息路由失败退回,设置ReturnsCallback【消息到达交换机没有达到队列】
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returned) {
                    log.info("触发【publisher return】机制");
                    log.error("消息投递失败未到达队列,应答码:{},原因:{},交换机:{},路由键:{},消息:{}", returned.getReplyCode(), returned.getReplyText(),
                            returned.getExchange(), returned.getRoutingKey(), returned.getMessage());
                }
            });
        }
    
    }
    

    2.2 MQ

    • Exchange持久化
    • Queue持久化
    • Message持久化

    2.2.1 Exchange

    @Bean
    public DirectExchange simpleExchange() {
        return ExchangeBuilder
                .directExchange(EXCHANGE)
                // 持久化交换机
                .durable(true)
                .build();
    }
    

    2.2.2 Queue

    @Bean
    public Queue simpleQueue() {
        return QueueBuilder
                // 持久化队列
                .durable(QUEUE)
                // 避免消息堆积、懒加载
                .lazy()
                .build();
    }
    

    2.2.3 Message

    Message message = MessageBuilder
                    .withBody(content.getBytes(StandardCharsets.UTF_8))
                    .setMessageId(UUID.randomUUID().toString())
                    // 消息持久化
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                    .build();
    // 发送
    rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation);
    

    2.3 消费者

    2.3.1 消费者确认机制

    spring:
      rabbitmq:
        listener:
          simple:
            # 自动ack
            acknowledge-mode: auto
    

    2.3.2 消费者重试机制

    spring:
      rabbitmq:
        listener:
          simple:
            # 消费者失败重试机制
            retry:
              enabled: true
              # 初始的失败等待时长为1秒
              initial-interval: 1000ms
              # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
              multiplier: 1
              # 最大重试次数
              max-attempts: 3
              # true无状态;false有状态。如果业务中包含事务,这里改为false
              stateless: true
    

    2.3.3 失败处理策略

    package com.qiangesoft.rabbitmq.consumer;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 消息消费失败配置
     * ps:配置处理失败消息的交换机和队列
     *
     * @author qiangesoft
     * @date 2024-05-08
     */
    @Configuration
    public class ErrorMessageConfig {
    
        public static final String EXCHANGE = "error.exchange";
    
        public static final String QUEUE = "error.queue";
    
        public static final String ROUTING_KEY = "error";
    
        @Bean
        public DirectExchange errorMessageExchange() {
            return new DirectExchange(EXCHANGE);
        }
    
        @Bean
        public Queue errorQueue() {
            return new Queue(QUEUE, true);
        }
    
        @Bean
        public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
            return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(ROUTING_KEY);
        }
    
        @Bean
        public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
            return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE, ROUTING_KEY);
        }
    
    }
    
    
  • 相关阅读:
    js事件:
    React-hooks【四】父组件通过ref获取子组件实例
    爱奇艺开源的高性能网络安全监控引擎
    玩转gRPC—不同编程语言间通信
    Linux系统下centos中在线添加硬盘后不重启在线扩容linux系统目录不重启系统
    STM32F103标准库硬件IIC+DMA连续数据发送、接收
    JAVA面试前的准备工作
    生产环境搭建高可用Harbor(包括恢复演练实操)
    GC 垃圾回收机制
    一文教你学会使用Cron表达式定时备份MySQL数据库
  • 原文地址:https://blog.csdn.net/weixin_39311781/article/details/138580195