• RabbitMQ怎么保证可靠性


    RabbitMQ怎么保证可靠性

    • 前言
    • 生产端问题
      • 解决方案
      • 代码
      • 验证
    • RabbitMQ问题
    • 消费端问题
      • 解决方案
      • 代码
      • 验证
    • 总结

    前言

    RabbitMQ相信大家都非常熟悉了,今天咱们来聊聊怎么保证RabbitMQ的可靠性。

    那什么时候会出现问题呢?

    第一种是生产端出现的问题。我们向队列中发送消息的时候,消息不一定可以发送到MQ中,这个时候如果我们不做任何处理,这样消息丢失了。

    第二种则是RabbitMQ出现的问题。也就是说现在生产端的成功将消息发送到了RabbitMQ,但由于MQ并没有做持久化,这样宕机重启之后消息可能就丢失了。

    第三种则是消费端的问题。消费端处理消息时如果出现异常,默认的解决方式是在重复消费多次,当次数超过阈值时直接删除消息,这也导致消息丢失。

    接下来咱们就看看怎么应对以上三种问题。

    生产端问题

    解决方案

    这里我们需要清楚发送的一个大体流程。

    生产端发送消息到MQ之后,会收到一个结果,这个结果有acknack两种。

    其中ack代表消息成功到达了交换机,但并不意味者消息到达了队列。不过ack的情况下消息未送达队列,会有相应的错误信息提醒。

    nack就代表消息并未送达交换机

    那么,怎么才能知道消息发送情况呢?

    可以设置callback来获取消息发送结果。

    代码

    局部callback设置如下

        @GetMapping("testmq")
        public Result testmq(){
            String orderId = String.valueOf(UUID.randomUUID());
            String messageData = "下订单!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String,Object> map=new HashMap<>();
            map.put("orderId",orderId);
            map.put("messageData",messageData);
            map.put("createTime",createTime);
    
    //        设置发送的callback
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            correlationData.getFuture().addCallback(result -> {
                // 判断结果
                if (result.isAck()) {
                    log.info("发送成功");
                } else {
                    log.error("消息未达到交换机,发送失败");
                }
            }, ex -> {
                log.error("出现异常,发送失败");
            });
    
            rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> {
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }, correlationData);
            return Result.succ("ok");
        }
    

    验证

    消息发送成功
    在这里插入图片描述
    交换机名称有误
    在这里插入图片描述
    队列路由出错
    虽然没有错误,但给了我们warning。
    在这里插入图片描述

    RabbitMQ问题

    这里就比较简单了,那就是做下持久化就可以了

    首先是交换机,队列和消息的持久化
    交换机

    @Bean
        DirectExchange normalExchange() {
            /**
             * durable 是否持久化
             * autoDelete 没有queue绑定时是否自动删除
             */
            return new DirectExchange(NORMALEXCHANGE, true, false);
        }
    

    队列

    @Bean
        public Queue cleanDQueue() {
            return QueueBuilder.durable(CLEANQUEUE)
                    .build();
        }
    

    消息的持久化

    rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> {
    			// 设置消息持久化
               message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }, correlationData);
    

    消费端问题

    解决方案

    消费端出现错误时,会进行重试,当重试次数超过阈值之后有三种解决方案,如下

    • RejectAndDontRequeueRecoverer:超过阈值,直接丢失消息。
    • ImmediateRequeueMessageRecoverer:超过阈值,返回nack,然后消息重新入队。
    • RepublishMessageRecoverer:超过阈值直接将消费失败的消息投递到指定交换机。

    这里我们以RepublishMessageRecoverer为例做下演示。

    代码

    首先需要声明消费消息失败后传递的交换机和队列

       @Bean
        DirectExchange normalExchange() {
            /**
             * durable 是否持久化
             * autoDelete 没有queue绑定时是否自动删除
             */
            return new DirectExchange(NORMALEXCHANGE, true, false);
        }
    //  用于处理消费失败消息的队列
        @Bean
        public Queue republishQueue() {
            return QueueBuilder.durable(REPULISHQUEUE)
                    .build();
        }
    //  绑定失败消费消息队列
        @Bean
        Binding bindingRepublish() {
            return BindingBuilder.bind(republishQueue()).to(normalExchange()).with(REPULISHROUTING);
        }
    
    
    

    然后配置下RepublishMessageRecoverer策略,随便找个config注入下bean就可以。

     //  设置RepublishMessageRecoverer,消费失败的消息转移到另一队列中,交给管理员手动处理
        @Bean
        public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
            /**
             * NORMALEXCHANGE   接收消费失败消息的交换机
             * REPULISHROUTING  接收消费失败消息的路由key
             */
            return new RepublishMessageRecoverer(rabbitTemplate, NORMALEXCHANGE, REPULISHROUTING);
        }
    
    

    验证

    咱们看看如果消费出错会咋样

    我们可以看到被消费的队列中信息被删除了。
    在这里插入图片描述
    然后我们设置的转入队列中的消息数加一,这时候我们可以接收下该队列中的信息,存储到数据库中,方便维护人员手动进行处理。
    在这里插入图片描述

    总结

    从生产端、RabbitMQ以及消费端三方面介绍了一下怎么保证RabbitMQ的可靠性,另外还有关于死信队列和延迟队列的内容在这篇博客中,大家有兴趣可以看一下。

    RabbitMQ的死信队列和延迟队列

    在这里插入图片描述

  • 相关阅读:
    2015软专算法题T2
    提升微服务稳定性与性能:深入剖析Netflix Hystrix框架
    化繁为简 面板式空调网关亮相上海智能家居展 智哪儿专访青岛中弘赵哲海
    高数和数据结构的小例子
    合并两个有序数组(c#)
    公众号H5微信生态
    动手学习深度学习 06:卷积神经网络
    Qt小项目 | 实现迅雷设置界面
    docker的使用
    大厂晋升指南:材料准备,PPT 写作和现场答辩
  • 原文地址:https://blog.csdn.net/qq_43627076/article/details/139376546