• RabbitMQ------发布确认(单个确认、批量确认、未确认)(四)


    RabbitMQ------发布确认(四)

    发布确认原理

    生产者将信道设置为confirm模式,一旦信道进入confirm模式,所有再该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的地的队列了,如果消息和队列是可以持久化的,那么确认消息就会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic,ack的multiple域,标识到这个序列号之前所有的消息都已经得到了处理。
    confirm模式最大的好处在于,它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时,继续发送下一条消息,当消息最终端得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以再回调方法中处理该nack消息。
    生产者将消息发送到队列中,消息写在磁盘上,达到持久化的目标,RabbitMQ再给消息发送消息确认,才能够达到消息永不丢失的目的

    发布确认策略

    1.开启发布确认的方法

    发布确认默认没有开始,如果要开启需要调用confirmSelect方法,每当想要使用发布确认,都需要在channel上调用该方法。

            //一个连接中有多个信道
            //获取信道
            Channel channel = connection.createChannel();
            //开启发布确认
            channel.confirmSelect();
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. 单个确认发布、批量确认发布、异步确认发布(3种确认发布策略)

    2.1 单个确认发布

    简单的确认方式,它是一种同步确认发布模式。意味着生产者发布一个消息后,需等待确认结果,才能够发布下一个消息。
    缺点:效率低,发布慢。
    开启确认模式。

            //开启确认模式
            channel.confirmSelect();
    
    • 1
    • 2

    等待确认结果。

    boolean b = channel.waitForConfirms();
    
    • 1

    单个确认模式,发布一千条消息结果

        //单个确认,结果耗时722ms
        public static  void publishMessageIndividually() throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //队列声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,true,false,false,null);
            //开启确认模式
            channel.confirmSelect();
            //开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0;i< 1000 ;i++){
                String mes = i + "";
                channel.basicPublish("",queueName,null,mes.getBytes());
                //单个消息就马上进行发布确认
                boolean b = channel.waitForConfirms();
                if (b){
                    System.out.println("消息发布确认成功");
                }
            }
            //结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发布消息1000条单独确认消息,用时:"+(endTime-beginTime));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    2.2 批量确认发布

    缺点:当发生故障,导致发布出现问题时,不知道是哪个消息出现问题。并且这种方案仍然是同步的,一样阻塞消息的发布。
    优点:性能比单个确认发布高一点。
    隔100条确认一次,
    代码相对单笔确认,只修改了确认方法的调用逻辑位置

    waitForConfirms
    
    • 1

    代码:

      //批量确认,每100条确认一次  147ms
        public static  void publishMessageBatch() throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //队列声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,true,false,false,null);
            //开启确认模式
            channel.confirmSelect();
            //开始时间
            long beginTime = System.currentTimeMillis();
            int y =  100;
            for (int i = 0;i< 1000 ;i++){
                String mes = i + "";
                channel.basicPublish("",queueName,null,mes.getBytes());
                if (i%y ==  0){
                    //确认发布
                    boolean b = channel.waitForConfirms();
                    if (b){
                        System.out.println("消息发布确认成功");
                    }
                }
            }
            //结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发布消息1000条批量确认消息,用时:"+(endTime-beginTime));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    2.3 异步确认发布

    异步确认虽然编程逻辑比其他两个复杂,但是性价比最高。它是利用回调函数,来达到消息可靠性传递,这个中间件也是通过函数来保证是否投递成功。
    broker:消息的实体,包含交换机和队列等。异步通知
    无论消息队列是否收到,都会对生产者进行应答。确认应答、未确认应答。
    需要开启对消息的监听。

    //添加异步确认,消息监听器
    //ConfirmCallback ackCallback, ConfirmCallback nackCallback
     /**
     *  ackCallback 监听哪些消息确认成功了
      *  nackCallback 监听哪些消息确认失败了
         */
     channel.addConfirmListener(ackCallback,nackCallback);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    并且实现消息回调的方法。

            //确认消息,回调处理
            /**
             * deliveryTag  消息的标价
             * multiple 是否为批量确认
             */
            ConfirmCallback ackCallback = (long deliveryTag, boolean multiple)->{
                System.out.println("消息确认"+deliveryTag);
            };
            //未确认消息,回调处理
            ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) ->{
                System.out.println("消息未确认"+deliveryTag);
            };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    实例代码:

    //异步确认 62ms
        public static  void publishMessageAsync() throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //队列声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,true,false,false,null);
            //开启确认模式
            channel.confirmSelect();
            
            //开始时间
            long beginTime = System.currentTimeMillis();
    
            //确认消息,回调处理
            /**
             * deliveryTag  消息的标价
             * multiple 是否为批量确认
             */
            ConfirmCallback ackCallback = (long deliveryTag, boolean multiple)->{
                System.out.println("消息确认"+deliveryTag);
            };
            //未确认消息,回调处理
            ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) ->{
                System.out.println("消息未确认"+deliveryTag);
            };
            //添加异步确认,消息监听器
            //ConfirmCallback ackCallback, ConfirmCallback nackCallback
            /**
             *  ackCallback 监听哪些消息确认成功了
             *  nackCallback 监听哪些消息确认失败了
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            for (int i = 0;i< 1000 ;i++){
                String mes = i + "";
                channel.basicPublish("",queueName,null,mes.getBytes());
            }
            //结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发布消息1000条批量确认消息,用时:"+(endTime-beginTime));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    如何处理未确认消息处理

    最好的解决方案,将未确认消息放在一个基于内存的,能被发布线程访问的队列,比如说ConcurrentLinkedQueue这个队列(并发链路式队列),confirm callbacks(确认回调)与发布线程之间进行消息传递。它可以在确认回调与发布线程之间进行消息传递。
    此处是使用ConcurrentSkipListMap,用来记录消息与消息的标识。
    两步:
    1.发送消息时,将消息和消息标记,添加到map中
    2.确认成功后,将消息移除,则容器中剩下的都是未确认的消息。
    代码示例:

        //异步确认 62ms
        public static  void publishMessageAsync() throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //队列声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,true,false,false,null);
            //开启确认模式
            channel.confirmSelect();
    
            /**
             * 准备一个线程 安全、有序的哈希表  适用于高并发的情况下
             * 1.轻松地将序号与消息进行关联
             * 2.轻松的批量删除条目,只要给序号
             * 3.支持高并发(多线程)
              */
            ConcurrentSkipListMap<Long,String> skipListMap = new ConcurrentSkipListMap();
    
            //确认消息,回调处理
            /**
             * deliveryTag  消息的标价
             * multiple 是否为批量确认
             */
            ConfirmCallback ackCallback = (long deliveryTag, boolean multiple)->{
                //2.删除掉已经确认的消息
                if (multiple){
                    ConcurrentNavigableMap<Long, String> confiemed = skipListMap.headMap(deliveryTag);
                    confiemed.clear();
                }else {
                    skipListMap.remove(deliveryTag);
                }
                System.out.println("消息确认"+deliveryTag);
            };
            //未确认消息,回调处理
            ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) ->{
                String s = skipListMap.get(deliveryTag);
                System.out.println("消息未确认"+deliveryTag);
                System.out.println("消息未确认"+s);
            };
            //添加异步确认,消息监听器
            //ConfirmCallback ackCallback, ConfirmCallback nackCallback
            /**
             *  ackCallback 监听哪些消息确认成功了
             *  nackCallback 监听哪些消息确认失败了
             */
            channel.addConfirmListener(ackCallback,nackCallback);
            //开始时间
            long beginTime = System.currentTimeMillis();
            for (int i = 0;i< 1000 ;i++){
                String mes = i + "";
                channel.basicPublish("",queueName,null,mes.getBytes());
                //1.此处记录下所有要发送的消息,消息总和
                skipListMap.put(channel.getNextPublishSeqNo(), mes);
            }
    
            //结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("发布消息1000条批量确认消息,用时:"+(endTime-beginTime));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    结论 :建议使用异步确认方式,单个确认耗时太久,批量确认无法判断具体未确认消息节点。

    核心方法:开启发布确认( channel.confirmSelect();)

  • 相关阅读:
    Kafka 二、集群搭建与实战
    图解系统之 内存管理 —— 摘自小林coding
    MOTOROLA MVME5500 数字量控制模块
    jenkins+allure+邮件发送配置
    EE5805-Java-summary
    Alibaba Sentinel - 滑动窗口
    使用vcpkg安装指定版本的开源软件
    【PostgreSQL内核学习(十五)—— (ExecutorRun)】
    python的练习
    关于gdb调试: 你的问题可能会在这里找到答案
  • 原文地址:https://blog.csdn.net/cz_chen_zhuo/article/details/127717625