• 基于springboot实现的rabbitmq消息确认


    概述

    RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

    详细

    一、运行效果

    image.png

    二、实现过程

    ①、引入rabbitmq包
    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>
    ②、修改application.properties配置
    1. spring.rabbitmq.host=127.0.0.1
    2. spring.rabbitmq.port=5672
    3. spring.rabbitmq.username=guest
    4. spring.rabbitmq.password=guest
    5. # 发送者开启 confirm 确认机制
    6. spring.rabbitmq.publisher-confirms=true
    7. # 发送者开启 return 确认机制
    8. spring.rabbitmq.publisher-returns=true
    9. ####################################################
    10. # 设置消费端手动 ack
    11. spring.rabbitmq.listener.simple.acknowledge-mode=manual
    12. # 是否支持重试
    13. spring.rabbitmq.listener.simple.retry.enabled=true
    ③、定义exchange和queue,并将queue绑定在exchange上
    1. package com.mm.springbootrabbitmqconfirmdemo.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.FanoutExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.beans.factory.annotation.Qualifier;
    7. import org.springframework.context.annotation.Bean;
    8. import org.springframework.context.annotation.Configuration;
    9. @Configuration
    10. public class RabbitMQConfig {
    11. @Bean(name = "confirmQueue")
    12. public Queue confirmQueue(){
    13. return new Queue("confirmQueue",true,false,false);
    14. }
    15. @Bean(name = "confirmExchange")
    16. public FanoutExchange confirmExchange(){
    17. return new FanoutExchange("confirmExchange");
    18. }
    19. @Bean
    20. public Binding confirmFanoutExchangeAndQueue(@Qualifier("confirmExchange") FanoutExchange confirmExchange,
    21. @Qualifier("confirmQueue") Queue confirmQueue){
    22. return BindingBuilder.bind(confirmQueue).to(confirmExchange);
    23. }
    24. }
    ④、消息发送确认

    发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。

    消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。

    消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。

    我们可以利用这两个Callback来确保消息的100%送达。

    1、 ConfirmCallback确认模式

    消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

    1. package com.mm.springbootrabbitmqconfirmdemo.service;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.connection.CorrelationData;
    4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    5. import org.springframework.stereotype.Component;
    6. @Slf4j
    7. @Component
    8. public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    9. @Override
    10. public void confirm(CorrelationData correlationData, boolean ack, String cause){
    11. if (!ack) {
    12. log.error("消息发送异常!");
    13. } else {
    14. log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
    15. }
    16. }
    17. }

    实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationDataackcause

    • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。

    • ack:消息投递到broker 的状态,true表示成功。

    • cause:表示投递失败的原因。

    但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback 。

    2、 ReturnCallback 退回模式

    如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

    1. com.mm.springbootrabbitmqconfirmdemo.service;
    2. lombok.extern.slf4j.;
    3. org.springframework.amqp.core.Message;
    4. org.springframework.amqp.rabbit.core.RabbitTemplate;
    5. org.springframework.stereotype.;
    6. ReturnCallbackService RabbitTemplate.ReturnCallback returnedMessageMessage message, replyCode, String replyText, String exchange, String routingKey.info, replyCode, replyText, exchange, routingKey;

    实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

    下边是具体的消息发送,在rabbitTemplate中设置 Confirm 和 Return 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id 为10000000000

    ⑤、消息发送确认

    消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数。

    1. @Slf4j
    2. @Component
    3. @RabbitListener(queues = "confirm_test_queue")
    4. public class ReceiverMessage1 {
    5. @RabbitHandler
    6. public void processHandler(String msg, Channel channel, Message message) throws IOException {
    7. try {
    8. log.info("小富收到消息:{}", msg);
    9. //TODO 具体业务
    10. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    11. } catch (Exception e) {
    12. if (message.getMessageProperties().getRedelivered()) {
    13. log.error("消息已重复处理失败,拒绝再次接收...");
    14. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
    15. } else {
    16. log.error("消息即将再次返回队列处理...");
    17. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    18. }
    19. }
    20. }
    21. }

    消费消息有三种回执方法,我们来分析一下每种方法的含义。

    1、basicAck

    basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

    void basicAck(long deliveryTag, boolean multiple)

    deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行acknackreject等操作。

    multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

    举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

    2、basicNack

    basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

    void basicNack(long deliveryTag, boolean multiple, boolean requeue)

    deliveryTag:表示消息投递序号。

    multiple:是否批量确认。

    requeue:值为 true 消息将重新入队列。

    3、basicReject

    basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

    void basicReject(long deliveryTag, boolean requeue)

    deliveryTag:表示消息投递序号。

    requeue:值为 true 消息将重新入队列。

    三、项目结构图

    image.png

    四、补充

    1、别忘确认消息

    这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

    开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。

    2、消息无限投递

    在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

    1. @RabbitHandler
    2. public void processHandler(String msg, Channel channel, Message message) throws IOException {
    3. try {
    4. log.info("消费者 2 号收到:{}", msg);
    5. int a = 1 / 0;
    6. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    7. } catch (Exception e) {
    8. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    9. }
    10. }

    3、重复消费

    如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL、或者redis将消息持久化,通过再消息中的唯一性属性校验。

    可以看到使用了 RabbitMQ 以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:

    • 消息生产者 - > rabbitmq服务器(消息发送失败)

    • rabbitmq服务器自身故障导致消息丢失

    • 消息消费者 - > rabbitmq服务(消费消息失败)

  • 相关阅读:
    DeferredResult解决了什么问题
    数据结构之B树
    合并word中参考文献-(Endnote生成)
    「AI人工智能」关于AI的灵魂发问
    不用任何比较运算符找出两个整数中的较大的值
    Java SE 18 新增特性
    libfaad2 主机编译和交叉编译
    Element-ui select远程搜索
    【网络是怎么连接的】第四章 探索接入网和网络运营商
    idea 将项目上传到gitee远程仓库具体操作
  • 原文地址:https://blog.csdn.net/hanjiepo/article/details/132797112