• RabbitMQ可靠发布-SpringBoot


    • 消息队列可靠发布包括:(从流程分析)
      1. 发布层面:发布确认(两个回调)、备份交换机(与发布确认方案二选一)
      2. rabbit服务器层面:队列与消息持久化惰性队列集群与镜像队列
      3. 消费层面:自动应答(默认)、手动应答(可以自由拒绝或否定)
      4. 业务层面:幂等性设计优先级队列
    • 以下用SpringBoot演示,本demo中关于发布确认失败的消息仅用日志记录,如有需要可以用ConcucrrentSkipListMap来存储id - 消息内容 的映射,在回调函数中用correlationData.getId()来获取对应的消息;也可以通过备份交换机进行处理

    1.为什么需要发布确认

    • 消息丢失时,消费者感受不到,因此发布确认机制存在于发送阶段
    • 生产者找不到对应的exchange或者routingkey或者rabbitmq正在重启,消息会丢失
    • 发布确认模式:确保消息的可靠投递

    2.发布确认的时机

    • 首先:无论是原生API还是SpringBoot的API,默认都是关闭发布确认的。需要手动开启
    • ConfirmCallback 回调函数(重心在判断是否找到exchange) ,生产者的消息只要进入了broker就会触发,如果没找到对应的exchange,则ack=false;如果找到了对应的exchange,则ack=true
    • ReturnCallback退回模式(重心在判断是否找到对应的queue),使用这个之前需要先设置mandatory=true;触发条件是:找到了exchange,但没找到routingkey对应的queue,才会执行(找到了queue则不执行

    3.ConfirmCallback回调

    3.1配置文件

    也可以用连接工厂 而不是写死在配置文件中

    3.2实现回调接口

    /**
     * 消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。
     * @param correlationData 回调的相关数据。
     * @param ack             ack为真,nack为假
     * @param cause           一个可选的原因,用于nack,如果可用,否则为空。
     */
    @Slf4j
    @Component
    public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
          log.error("消息发送异常!correlationData={} ,ack={}", correlationData.getId(), ack);
        } else {
          log.info("消息已成功推送到mq队列,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.3设置RabbitTemplate回调属性

    • 此时如果直接@Autowired一个RabbitTemplate对象,是没有装配这个回调的,需要手动调用:
      rabbitTemplate.setConfirmCallback(RabbitTemplate.ConfirmCallback confirmCallback)

    • 由于ConfirmCallback本身是一个接口,根据多态性,这里传入其实现类即可

    在3.2中有一个实现类,直接传进入就好,为了可以区分不同的配置,我们可以采用如下方式:

    3.3.1创建一个新的Bean

    在配置类中,创建名为myRabbitTemplate的Bean对象,逻辑是根据连接工厂(后面写)新new一个RabbitTemplate对象进行设置属性

    不直接操作原生的rabbitTemplate对象是为了解耦

      @Bean(name = "myRabbitTemplate")
      public RabbitTemplate wwRabbitTemplate(@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(new ConfirmCallbackService());
        return rabbitTemplate;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    使用时直接用@Resource

      @Resource(name = "myRabbitTemplate")
      private RabbitTemplate rabbitTemplate;
    
    • 1
    • 2

    4.ReturnCallback退回

    4.1实现退回接口

    /**
     * 如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
     */
    @Slf4j
    @Component
    public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
    
      @Override
      public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage:replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.2结合3号标题整合

    当然配置信息其实不应该这样写死,因为能会使用非集群的多个mq,因此在5号标题中引出连接工厂

    spring.rabbitmq.host=12...
    #spring.rabbitmq.port=5672 默认
    spring.rabbitmq.username=ad..
    spring.rabbitmq.password=.
    spring.rabbitmq.publisher-confirm-type=correlated
    spring.rabbitmq.publisher-returns=true
    
    
      @Bean(name = "myRabbitTemplate")
      public RabbitTemplate wwRabbitTemplate(@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        /**
         * 确保消息发送失败后可以重新返回到队列中
         * 注意:yml需要配置 publisher-returns: true
         */
        // 将两种不同确认模式的回调接口实例传入
        rabbitTemplate.setConfirmCallback(new ConfirmCallbackService());
        //找不到routingkey时的处理
        rabbitTemplate.setMandatory(Boolean.TRUE);//退回函数生效
        rabbitTemplate.setReturnCallback(new ReturnCallbackService());
        return rabbitTemplate;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    5.补:配置类+连接工厂

    5.1连接工厂

    我现在有很多个mq,其中有一个的配置如下(写在配置类中)

      @Bean(name = "myConnectionFactory")
      public ConnectionFactory myConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(true); //发布确认回调
        connectionFactory.setPublisherReturns(true); //退回
        return connectionFactory;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    5.2为独特的配置创建Bean对象

    使用时用@Resource注入即可

      @Bean(name = "myRabbitTemplate")
      public RabbitTemplate wwRabbitTemplate(@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        /**
         * 确保消息发送失败后可以重新返回到队列中
         * 注意:yml需要配置 publisher-returns: true
         */
        // 将两种不同确认模式的回调接口实例传入
        rabbitTemplate.setConfirmCallback(new ConfirmCallbackService());
        //找不到routingkey时的处理
        rabbitTemplate.setMandatory(Boolean.TRUE);//退回函数生效
        rabbitTemplate.setReturnCallback(new ReturnCallbackService());
        return rabbitTemplate;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    6.发布确认的原理

    首先,Spring整合RabbitMQ中的回调实现是异步、独立
    并且这个ConfirmCallback不是原生的那个,是Spring中的,是一个内部接口,不要混淆了

    6.1异步的好处

    这就要涉及到另外两个原生的方案:

    • 同步、独立确认:慢,并发场景效率很低
    • 同步、批量确认:稍快,不过一旦出问题,不能定位是哪条消息出了问题

    异步、独立确认:不影响消息发布线程,且能准确定位具体是哪条消息出的问题

    6.2原生与Spring整合的异步回调区别

    • 原生的API中回调接口可以支持批量回调
    • 但是Spring这个回调明显可以看到只能调用correlationData.getId()获取一个唯一的id,更加可靠

    6.2.1原生API实现批量确认(了解)

    Channel    channel    = connection.createChannel();
    
    channel.queueDeclare("异步队列",true, false,false,null);
    
    
    //1.线程安全有序的一个队列,并发
    ConcurrentSkipListMap mqMap = new ConcurrentSkipListMap<>();
    
    
    //2.回调
    
      //2.1确认回调
    ConfirmCallback ack = (deliveryTag,multiple)->{
          //日志记录等操作,略
          //去掉回调收到的第deliveryTag号消息
      if(multiple) {//如果是批量,则批量删除
        ConcurrentNavigableMap newMqMap = mqMap.headMap(deliveryTag);
        newMqMap.clear();
      }else{//单个直接删
        mqMap.remove(deliveryTag);
      }
      //此时的mqMap中只剩下了尚未确认的消息
    
    };
    
      //2.2否认回调
    ConfirmCallback nack = (deliveryTag,multiple)->{
        //日志记录等操作,略
      System.out.println(mqMap.get(deliveryTag));
        /*收到了nack,消息不会重新入队,需要自行处理
          一般都是需要重发的,所以nack时不去除mqMap中的元素
        */
    };
    
    
    
        /*** 生产者开启confirm确认模式****/
    channel.confirmSelect();
    //3.新的线程:发布确认的消息监听 监听ack和nack
        //注意顺序, 前面是ack,后面是nack
    channel.addConfirmListener(ack,nack);//还有另外一种构造形式,效果相同
    
    
    
    //4.模拟发布消息
    for(int i = 0 ; i < 1000 ; i++){
      channel.basicPublish("",//交换机
          "队列:"+i,//routingKey
          MessageProperties.PERSISTENT_TEXT_PLAIN,//消息策略:消息持久化
          ("消息"+i).getBytes(StandardCharsets.UTF_8));//消息内容
    
      //每次发布,mqMap都更新一个值
      mqMap.put(channel.getNextPublishSeqNo(),"消息"+i);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    7.如何处理发布失败的消息?

    由上述可知:我们可以从回调接口中获取CorrelationData参数,这个类中包含了一个很重要的信息————消息id,我们可以创建一个ConcucrrentSkipListMap来存储id - 消息内容 的映射,每次回调时:

    • ack = true则去除这个键值对
    • ack = false则取value重发或做一些其他的事

    8.备份交换机

    8.1发布确认已解决的问题

    • 有了ConfirmCallback:只要进入broker就回调,可以感知到exchange是否出错
    • 有了mandatoryReturnCallback:可以感知到queue是否出错
    • 实现上述两个接口并赋值给RabbitTemplate后,可以确保生产者投递消息时消息不丢失;但是如果要实现消息自由处理需要额外维护一个Map保存(消息id — 消息内容),不过这样对于生产者来说代码比较重
    • 注意:同时存在发布确认 和 备份交换机时,失败的消息优先进入备份交换机,所以方案二选一即可

    8.2备份交换机的好处

    • 备份交换机是一个fanout-exchange,当消息投递失败后不用上述两种回调,而是直接投递给备份交换机,备份交换机可以连接多个消费者如:日志记录,消息重发
    • 不过这属于锦上添花,减少了回调写法的复杂度

    8.3SpringBoot写法

    他本质也是个普通的扇出交换机,所以区别只是在“消息失败后投递给他

    普通交换机 指向(alternate-exchange) 备份交换机

    9.消费者应答

    SpringBoot默认是自动应答(监听队列也是),同时提供的API也会应答

    9.1配置手动应答

    首先需要关闭自动应答

    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
    • 1
    • 2

    当然,仍然可以通过配置类来进行低耦合配置

      @Bean
      public RabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    9.2三种应答

    注:如果@RabbitListener写在类上,就需要搭配@RabbitHandler使用

    形参中有channel

    	  // 确认 : 消息tag,不批量
          channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    		//拒绝(不重新入队): 消息tag,不批量
          channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
          // 否定(可选重新入队):消息tag,不批量,重新入队
          channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    10.持久化

    10.1队列持久化

    在使用方面,声明队列的时候,其实底层也是调用的channel.queueDeclare(…)中第二个参数durable设置为true

    10.2消息持久化

    SpringBoot提供的API中是默认消息持久化的

    参考rabbitTemplate消息持久化

    channel.basicPublish的第三个参数,将策略设置为MessageProperties.PERSISTENT_TEXT_PLAIN
    第三个参数

    • 作用:每次发布消息都告诉MQ,将消息持久化
    • 缺点:MQ可能在消息持久化过程中挂了
    • 对于简单任务队列绰绰有余,更可靠的持久化方案是:发布确认

    11.惰性队列

    • 通俗话讲就是:队列存在磁盘中,内存中只保存索引,消费者消费消息的时候根据索引读取到内存中。
    • 这样做很安全,内存占用低,但是效率低下

    参考:

    原生写法

    springboot写法

    12.集群与镜像队列


    其他的关于RabbitMQ负载均衡部分省略

    13.幂等设计

    一般也都是用Redis,快
    全局唯一ID + Redis

    14.队列优先级

    操作的是max-priority属性

    参考:原生写法

  • 相关阅读:
    项目重构演进之路
    Vue.js核心技术解析与uni-app跨平台实战开发学习笔记 第1章 Vue.js基础入门 1.5 Vue基础指令
    大数据技术基础实验五:Zookeeper实验——部署ZooKeeper
    Android开发笔记(一百八十九)利用LAME录制MP3音频
    Spark3.0 Sql 使用HiveTableScanExec 读取Hive orc表源码分析及参数调优
    asp.net城市公交线路查询系统sqlserver
    NDK编译构建C/CPP工程
    华为OD七日集训第7期 - 按算法分类,由易到难,循序渐进,玩转OD
    ADDS:卸载/降级域控制器
    Hudi数据湖相关资料
  • 原文地址:https://blog.csdn.net/m0_56079407/article/details/126557595