• rabbitMQ学习-发布和确认


    发布和确认

    生产者 - - 发送消息 – 队列hello 🍎必须保存在磁盘上才能达到持久化操作。

    1. 设置要去队列必须持久化
    2. 设置要求队列中的消息必须持久化
    3. 发布确认

    单个确认发布:

    每次生产者生产一个消息他都会确认一次,这样的好处就是,如果发送信息丢失可以很容易确定位置,缺点是慢

     //单个确认
        public static void publicMessageIndividually() throws IOException, TimeoutException, InterruptedException {
            Channel channel = GetConnection.getChannel();
    
            //队列的声明
            String queueName = UUID.randomUUID().toString();
    
            //使用信道对队列进行声明
            channel.queueDeclare(queueName,true,false,false,null);
            //开启发布确认
            channel.confirmSelect();
            //开始时间
            long begin = System.currentTimeMillis();
    
            //批量发消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("",queueName,null,message.getBytes());
                //单个消息就马上进行发布确认
                boolean flag = channel.waitForConfirms();
                if(flag){
                    System.out.println("消息发送成功");
                }
            }
            //结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "单个消息" + (end-begin)+"毫秒");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    批量确认发布:

    批量发布对比单个发布速度快了很多,但是这个碰到问题去查找问题的时候就比较慢了。

    //批量发布
        public static void publicMessageBatch() throws IOException, TimeoutException, InterruptedException {
            Channel channel = GetConnection.getChannel();
    
            //队列的声明
            String queueName = UUID.randomUUID().toString();
    
            //使用信道对队列进行声明
            channel.queueDeclare(queueName,true,false,false,null);
            //开启发布确认
            channel.confirmSelect();
            //开始时间
            long begin = System.currentTimeMillis();
    
            //批量确认消息的大小
            int batchSize = 10;
            //未确认消息个数
    
            //批量发消息,批量发布确认
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("",queueName,null,message.getBytes());
                //单个消息就马上进行发布确认
                boolean flag = channel.waitForConfirms();
    
                //消息达到10确认一次
                if(i%batchSize == 0){
                    channel.waitForConfirms();
                }
            }
            //结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "发布100个消息" + (end-begin) +"毫秒");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    异步确认发布

    对比单步和批量这个就厉害多了,他可以异常进行,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
    在这里插入图片描述
    寄快件的人疯狂发,然后到broker中选择哪些需要确认的进行确认。

     //异步
        public static void publishMessageAsync() throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
    
            //队列的声明
            String queueName = UUID.randomUUID().toString();
    
            //使用信道对队列进行声明
            channel.queueDeclare(queueName,true,false,false,null);
            //开启发布确认
            channel.confirmSelect();
            //开始时间
            long begin = System.currentTimeMillis();
    
            //消息确认成功,回调函数
            ConfirmCallback ackCallback = (delivery,multiple)->{
                System.out.println("确认的消息:" + delivery);
            };
            //消息确认失败,回调函数
            /*
              1.消息的标记
              2.消息是否为批量
             */
            ConfirmCallback nackCallback = (delivery,multiple)->{
                System.out.println("未确认的消息:" + delivery);
            };
    
            //在发消息的时候,你需要准备一个监听器,监听哪些消息成功了,哪些消息失效了
            /*
            1.监听哪些消息成功了
            2.监听哪些消息失败了
             */
            channel.addConfirmListener(ackCallback,nackCallback);
    
            //批量发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息" + i;
                channel.basicPublish("",queueName,null,message.getBytes());
                //发布确认,这个时候要是进行发布确认就是同步了
            }
    
    
    
            //结束时间
            long end =  System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "异步发布确认消息,耗时" + (end-begin) +"毫秒");
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    这就出现一个问题,如何处理异步未确认消息

    最好的解决的解决方案,就是把未确认的消息放到一个基于内存的能发布线程访问的队列,比如说ConcurrentLinkedQueue这个队列在confirm
    callbacks与发布线程之间进行消息的传递。

     //异步
        public static void publishMessageAsync() throws IOException, TimeoutException {
            Channel channel = GetConnection.getChannel();
    
            //队列的声明
            String queueName = UUID.randomUUID().toString();
    
            //使用信道对队列进行声明
            channel.queueDeclare(queueName,true,false,false,null);
            //开启发布确认
            channel.confirmSelect();
    
            /*
            线程安全有序的一个哈希表,适用于高并发的情况下的
            1.轻松将序号和消息进行关联
            2.轻松批量删除条目,只要给到序号
            3.支持高并发(多线程)
             */
            ConcurrentSkipListMap<Long,String>  outstandingConfirms=
                    new ConcurrentSkipListMap<>();
    
            //消息确认成功,回调函数
            ConfirmCallback ackCallback = (delivery,multiple)->{
             if(multiple){
                 //删除已经确认的消息,剩下的就是为确认的消息
                 ConcurrentNavigableMap<Long, String> confirmed =
                          outstandingConfirms.headMap(delivery);
                 confirmed.clear();
             }else{
                 outstandingConfirms.remove(delivery);
             }
                System.out.println("确认的消息:" + delivery);
            };
            //消息确认失败,回调函数
            /*
              1.消息的标记
              2.消息是否为批量
             */
            ConfirmCallback nackCallback = (delivery,multiple)->{
               //3.打印一下未确认的消息都有哪些
              String message =  outstandingConfirms.get(delivery);
                System.out.println("未确认的消息是:" + message+"未确认的标记:" + delivery);
            };
    
            //在发消息的时候,你需要准备一个监听器,监听哪些消息成功了,哪些消息失效了
            /*
            1.监听哪些消息成功了
            2.监听哪些消息失败了
             */
            channel.addConfirmListener(ackCallback,nackCallback); // 异步通知
    
            //开始时间
            long begin = System.currentTimeMillis();
    
    
            //批量发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息" + i;
                channel.basicPublish("",queueName,null,message.getBytes());
                //发布确认,这个时候要是进行发布确认就是同步了
                /*
                1。此处记录下所有要发送的消息,消息的总和
                 */
           outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
                System.out.println();
            }
            //结束时间
            long end =  System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "异步发布确认消息,耗时" + (end-begin) +"毫秒");
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    以上三种发布的确认速度对比

    单步发布消息

    • 同步等待确认,简单,但是吞吐量非常有限

    批量发布确认

    • 批量同步等待确认,加单,合理的吞吐量,一旦出现问题,但很难推断出是那条消息出现了问题。

    异步发布确认

    • 最佳性能和资源使用,在出现错误的情况下可以很好的控制,但是实现起来稍微难一点。
  • 相关阅读:
    【线性代数基础进阶】行列式-补充+练习
    vue3播放音频
    Python入门之控制结构 - 循环结构
    Python数据分析--Numpy常用函数介绍(7)--Numpy中矩阵和通用函数
    UE4 C++ ActionRoguelike开发记录
    计算变量占用的内存sys.getsizeof()方法
    《计算机视觉基础知识蓝皮书》第7篇 模型优化方法及思路
    Credly 数字证书
    这可能是最全的SpringBoot3新版本变化了!
    GO开发环境配置
  • 原文地址:https://blog.csdn.net/qq_45922256/article/details/127794455