• springcloud 整合 RabbitMQ 消息中间件


    以下是在 Spring Cloud 中整合 RabbitMQ 消息中间件的详细步骤、代码说明,以及分析和解决消息丢失和消息重复消费问题的示例:

    1. 依赖添加:

    在 Maven 项目的 pom.xml 文件中添加 RabbitMQ 和 Spring Cloud Stream 的依赖:

    
        
            org.springframework.cloud
            spring-cloud-stream
            3.2.5
        
        
            org.springframework.amqp
            spring-rabbit
            3.2.5
        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    2. 配置 RabbitMQ:

    在 Spring Cloud 配置文件(例如 application.yml)中添加 RabbitMQ 的连接配置:

    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: my-exchange
              binder: rabbitmq
          rabbitmq:
            binder:
              # RabbitMQ 服务器地址
              address: localhost
              # RabbitMQ 端口
              port: 5672
              # RabbitMQ 虚拟主机
              virtual-host: /my_vhost
              username: guest
              password: guest
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    3. 创建消息生产者:

    创建一个发送消息的 Spring Cloud Stream 组件(例如 MessageProducer),并配置输出通道和消息转换器:

    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MessageProducer {
    
        private final Source source;
    
        public MessageProducer(Source source) {
            this.source = source;
        }
    
        public void sendMessage(String message) {
            // 将消息发送到输出通道
            this.source.output().send(MessageBuilder.withPayload(message).build());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    4. 创建消息消费者:

    创建一个接收消息的 Spring Cloud Stream 组件(例如 MessageConsumer),并配置输入通道和消息处理器:

    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MessageConsumer {
    
        private final Sink sink;
    
        public MessageConsumer(Sink sink) {
            this.sink = sink;
        }
    
        public void consumeMessage(String message) {
            // 从输入通道接收消息
            this.sink.input().receive(MessageBuilder.withPayload(message).build());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    5. 解决消息丢失问题:
    消息丢失可能发生在生产者发送消息时或消费者处理消息时

    生产者端:
    生产者在发送消息时出现异常,导致消息未能成功发送到消息中间件
    生产者在发送消息后没有正确处理返回的确认信息,导致消息可能被丢弃。

    消息中间件(如 RabbitMQ)端:
    消息中间件在处理消息时出现故障,导致消息丢失。
    消息中间件的配置问题,例如缓冲区大小设置不合理,导致消息在缓冲区溢出时丢失。
    消息中间件在进行数据持久化时出现问题,导致消息未能正确存储。

    消费者端:
    消费者在处理消息时出现异常,导致消息未能被正确处理。
    消费者在确认消息已处理之前出现故障,导致消息可能被重新分配给其他消费者或丢失。

    为了确保消息不丢失,可以采取以下措施:
    • 在生产者端启用消息确认机制,确保消息成功到达 RabbitMQ 服务器。
    • 在消费者端启用手动确认机制,确保消息在处理完成后被确认。
    6. 解决消息重复消费问题:

    消息重复消费可能发生在消费者重启或网络故障等情况下。为了避免重复处理消息,可以采取以下措施:

    • 在消息处理逻辑中添加幂等性处理,确保相同的消息不会被重复处理。
    • 使用消息唯一标识(例如消息的 UUID)来避免重复处理相同的消息。

    请注意,上述示例代码中的 my-exchange 是 RabbitMQ 的交换器名称,你可以根据实际需求进行修改。另外,还需要确保在启动应用时,正确配置和启动 Spring Cloud Stream 和 RabbitMQ 相关的服务。

  • 相关阅读:
    【UiPath2022+C#】UiPath 调试
    pytorch-09.多分类问题
    MYSQL事务操作
    mybatis实战:一、mybatis入门(配置、一些问题的解决)
    Linux C应用编程-5-线程
    Maven快速配置和入门
    django中orm定义数据库表字段的几种字段类型
    (附源码)springboot宠物医院管理系统 毕业设计 180923
    服务器和电脑的区别是什么
    MySQL是如何保证高可用的
  • 原文地址:https://blog.csdn.net/momo_128/article/details/136437910