• RabbitMQ消息的可靠传输和防止消息丢失


    在Spring Cloud项目中,为了确保RabbitMQ消息的可靠传输和防止消息丢失,需要考虑以下几个方面:

    1. 消息持久化:确保消息在RabbitMQ中持久化。
    2. 队列持久化:确保队列是持久化的。
    3. 发布确认:使用发布确认机制确保消息发送到RabbitMQ。
    4. 消费者确认:确保消费者正确地确认消息。
    5. 重试机制:在消息消费失败时,设置重试机制。

    下面详细介绍如何实现这些措施:

    1. 添加依赖

    确保在你的pom.xml中添加了Spring Boot和RabbitMQ的依赖:

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    dependency>
    

    2. 配置RabbitMQ

    application.ymlapplication.properties文件中配置RabbitMQ:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        publisher-confirm-type: correlated
        publisher-returns: true
    

    3. 定义配置类

    创建一个配置类来配置队列、交换机和绑定:

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
    
        public static final String QUEUE_NAME = "myQueue";
        public static final String EXCHANGE_NAME = "myExchange";
        public static final String ROUTING_KEY = "myRoutingKey";
    
        @Bean
        public Queue myQueue() {
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
    
        @Bean
        public DirectExchange myExchange() {
            return new DirectExchange(EXCHANGE_NAME);
        }
    
        @Bean
        public Binding myBinding(Queue myQueue, DirectExchange myExchange) {
            return BindingBuilder.bind(myQueue).to(myExchange).with(ROUTING_KEY);
        }
    }
    

    4. 配置消息生产者

    确保消息生产者配置了发布确认和消息持久化

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.util.UUID;
    
    @Service
    public class MessageProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            // 设置发布确认回调
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if (ack) {
                        System.out.println("Message delivered successfully: " + correlationData);
                    } else {
                        System.err.println("Failed to deliver message: " + correlationData + ", cause: " + cause);
                    }
                }
            });
    
            // 设置消息返回回调
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                System.err.println("Returned Message: " + new String(message.getBody()) +
                        ", replyCode: " + replyCode + ", replyText: " + replyText +
                        ", exchange: " + exchange + ", routingKey: " + routingKey);
            });
        }
    
        public void sendMessage(String message) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message, correlationData);
        }
    }
    

    5. 配置消息消费者

    确保消息消费者配置了消息确认机制:

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.rabbit.support.Acknowledgment;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MessageConsumer {
    
        @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
        public void handleMessage(String message, Channel channel, Message message) throws Exception {
            try {
                // 处理消息
                System.out.println("Received Message: " + message);
                
                // 消息确认
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                // 消费失败,重新放回队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
    

    6. 启用重试机制

    在Spring Cloud Stream中启用重试机制:

    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: myQueue
              consumer:
                retry:
                  max-attempts: 5
                  backOffPolicy:
                    initialInterval: 1000
                    multiplier: 2.0
                    maxInterval: 10000
    

    7. 测试

    测试消息生产和消费,确保消息在各种情况下都不会丢失,包括网络故障、RabbitMQ服务器重启等。

    总结

    通过以上步骤,你可以在Spring Cloud项目中使用RabbitMQ并确保消息不会丢失。关键在于:

    1. 消息和队列的持久化:确保消息和队列都是持久化的。
    2. 发布确认:启用发布确认回调机制,确保消息被正确地发送到RabbitMQ。
    3. 消费者确认:确保消费者正确地确认消息。
    4. 重试机制:在消费失败时启用重试机制,以确保消息最终能够被成功处理。

    通过这些配置,可以显著提高消息传输的可靠性,防止消息丢失。

  • 相关阅读:
    关于scanf和printf的格式控制修饰符
    【信号与系统】相位卷绕以及连续信号的符号表示
    一、pycharm的使用技巧和好用插件
    物联网中的MQTT协议总结
    【深度学习】RNN循环神经网络和LSTM深度学习模型
    最长公共子串问题
    RabbitMQ工作模式-路由模式
    Python经典练习题(二)
    基于GAN的自动提取混凝土损伤特征方法
    【Flink伪分布式环境搭建及应用,Standlong(开发测试)】
  • 原文地址:https://blog.csdn.net/lw_jack/article/details/139723706