• SpringBoot 整合 RabbitMQ 实现消息可靠传输


    消息的可靠传输是面试必问的问题之一,保证消息的可靠传输主要在生产端开启 comfirm 模式,RabbitMQ 开启持久化,消费端关闭自动 ack 模式。

    环境配置

    SpringBoot 整合 RabbitMQ 实现消息的发送。

    • 添加 maven 依赖
           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
    • 添加 application.yml 配置文件
    spring:
      rabbitmq:
        host: 192.168.3.19
        port: 5672
        username: admin
        password: xxxx
    
    • 配置交换机、队列以及绑定
        @Bean
        public DirectExchange myExchange() {
            DirectExchange directExchange = new DirectExchange("myExchange");
            return directExchange;
        }
    
        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue");
            return queue;
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
        }
    
    • 生产发送消息
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/send")
        public String send(String message) {
            rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
            System.out.println("【发送消息】" + message)
            return "【send message】" + message;
        }
    
    • 消费者接收消息
        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 当前时间" + time);
    
    • 调用生产端发送消息 hello,控制台输出:
    【发送消息】hello
    【接收信息】hello 当前时间2022-05-12 10:21:14
    

    说明消息已经被成功接收。

    消息丢失分析

    file

    一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:

    • 生产端丢失: 生产者无法传输到 RabbitMQ
    • 存储端丢失: RabbitMQ 存储自身挂了
    • 消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费

    RabbitMQ 从生产端、储存端、消费端都对可靠性传输做很好的支持。

    生产阶段

    生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。

    • 配置 application.yml
    spring:
      rabbitmq:
        # 消息确认机制 生产者 -> 交换机
        publisher-confirms: true
        # 消息返回机制  交换机 -> 队列
        publisher-returns: true
    

    配置

    @Configuration
    @Slf4j
    public class RabbitConfig {
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("【correlationData】:" + correlationData);
                    log.info("【ack】" + ack);
                    log.info("【cause】" + cause);
                    if (ack) {
                        log.info("【发送成功】");
                    } else {
                        log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);
                    }
                }
            });
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.warn("【消息发送失败】");
                    log.info("【message】" + message);
                    log.info("【replyCode】" + replyCode);
                }
            });
    
            return rabbitTemplate;
        }
    }
    

    消息从 生产者交换机, 有confirmCallback 确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause),根据 ack 判断消息是否发送成功。

    消息从 交换机队列,有returnCallback 退回模式。

    发送消息 product message 控制台输出如下:

    【发送消息】product message
    【接收信息】product message 当前时间2022-05-12 11:27:56
    【correlationData】:null
    【ack】true
    【cause】null
    【发送成功】
    

    生产端模拟消息丢失

    这里有两个方案:

    1. 发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。
    2. 发送不存在的交换机:
    // myExchange 修改成 myExchangexxxxx
    rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
    

    结果:

    【correlationData】:null
    【ack】false
    【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
    【发送失败】
    

    当发送失败可以对消息进行重试

    1. 交换机正确,发送不存在的队列:

    交换机接收到消息,返回成功通知,控制台输出:

    【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
    【ack】true
    【cause】null
    【发送成功】
    

    交换机没有找到队列,返回失败信息:

    【消息发送失败】
    【message】product message
    【replyCode】312
    

    RabbitMQ

    开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里

    修改队列的持久化,修改成非持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",false);
            return queue;
        }
    

    发送消息之后,消息存放在队列中,然后重启 RabbitMQ,消息不存在了。
    设置队列持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",true);
            return queue;
        }
    

    重启之后,队列的消息还存在。

    消费端

    消费端默认开始 ack 自动确认模式,当队列消息被消费者接收,不管有没有被消费端消息,都自动删除队列中的消息。所以为了确保消费端能成功消费消息,将自动模式改成手动确认模式:

    修改 application.yml 文件

    spring:
      rabbitmq:
        # 手动消息确认
        listener:
          simple:
            acknowledge-mode: manual
    

    消费接收消息之后需要手动确认:

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 当前时间" + time);
            System.out.println(message.getMessageProperties().getDeliveryTag());
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    

    如果不添加:

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    

    发送两条消息
    消息被接收后,没有确认,重新放到队列中:

    file

    重启项目,之后,队列的消息会发送到消费者,但是没有 ack 确认,还是继续会放回队列中。

    加上 channel.basicAck 之后,再重启项目:
    file

    队列消息就被删除了

    basicAck 方法最后一个参数 multiple 表示是删除之前的队列。

    multiple 设置成 true,把后面的队列都清理掉了:
    file

    源码

    https://github.com/jeremylai7/springboot-learning/tree/master/spring-rabbitmq

    如果觉得文章对你有帮助的话,请点个推荐吧!

  • 相关阅读:
    辽宁工程技术大学计算机考研资料汇总
    vue面试相关知识
    linux系统只给某个用户安装软件
    创建了一个名为nums_list的vector容器,其中存储了一系列的pair<int, int>
    论文阅读:Privacy Leakage of Real-World Vertical Federated Learning
    c++成员函数做回调
    特斯拉/小鹏「调整」软件策略,智能驾驶「标配」进入全新周期
    苹果服务器退款通知处理notificationType为CONSUMPTION_REQUEST
    Java微服务监控及与普罗米修斯集成
    cmka下切换使用不同版本的boost-未解决
  • 原文地址:https://www.cnblogs.com/jeremylai7/p/16307934.html