• RabbitMQ:发布确认模式



    📃个人主页:不断前进的皮卡丘
    🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
    🔥个人专栏:消息中间件

    1.基本介绍

    在这里插入图片描述

    生产者把信道设置成为confirm(确认)模式,一旦信道进入confirm模式,所有在这个信道上面发布的消息都会被指定唯一的一个ID(ID从1开始).一旦消息被投递到所有匹配的队列以后,broker就会发送一个确认给生产者(包含ID),这样使得生产者知道消息已经正确到底目的队列了。如果消息和队列是可持久化的,那么确认消息就会在消息被写入磁盘以后发出,broker回传给生产者的确认消息中delivery-tag包含了确认消息的序列号。

    • 在这里插入图片描述

    2.实现消息可靠传递的三个条件

    2.1队列持久化

    生产者发送消息到队列的时候,把durable参数设置为true(表示队列持久化)

    // 参数1 queue :队列名
    // 参数2 durable :是否持久化
    // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
    // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
    // 参数5 arguments
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.2消息持久化

    我们需要将消息标记为持久性 - 通过将消息属性(实现基本属性)设置为PERSISTENT_TEXT_PLAIN的值。

    //交换机名称,队列名称,消息持久化,消息
    channel.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
    
    • 1
    • 2
    • 3
    • 4

    2.3发布确认

    • 队列接收到生产者发送的数据以后,队列把消息保存在磁盘(为了实现持久化),队列会把最终的可靠性传递结果告诉给生产者,这就是发布确认。
    • 三种常用的发布确认策略:单个确认发布、批量确认发布、异步确认发布

    3.发布确认模式

    RabbitMQ的发布确认模式默认是没有开启的,我们可以通过调用channel.confirmSelect()方法来手动开启发布确认模式。

    3.1单个确认发布模式

    • 单个确认发布模式是一种简单的同步确认发布的方式。也就是说发布一个消息以后,只要确认它被确认发布,才可以继续发布后续的消息。
    • waitForConfirms(long)这一个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认,就会抛出异常。
    • 缺点:速度慢,因为如果没有确认消息的话,后面的消息都会被阻塞
    public class ConfirmMessage {
        //消息数量
        public static final int MSG_CNT=200;
        public static void main(String[] args) {
            //调用单个确认发布方法
            confirmSingleMessage();
        }
    
        public static void confirmSingleMessage() {
            try {
                //获取信道对象
                Channel channel = ConnectUtil.getChannel();
                //开启确认发布
                channel.confirmSelect();
                //声明队列
                String queue = UUID.randomUUID().toString();
                //队列持久化
                channel.queueDeclare(queue, true, false, false, null);
                //发送消息
                long start= System.currentTimeMillis();
                for (int i = 0; i < MSG_CNT; i++) {
                    String msg="消息:"+i;
                    //发送消息,消息需要持久化
                    channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
                    //服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息
                    boolean flag=channel.waitForConfirms();
                    if (flag){
                        System.out.println("————————第"+(i+1)+"条消息发送成功————————");
                    }else {
                        System.out.println("========第"+(i+1)+"条消息发送失败=========");
                    }
    
                }
                //记录结束时间
                long end=System.currentTimeMillis();
                System.out.println("发布:"+MSG_CNT+"个单独确认消息,耗时:"+(end-start)+"毫秒");
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    在这里插入图片描述

    3.2批量确认发布模式

    • 先发布一批信息然后一起确认可以大大提高吞吐量
    • 缺点:当故障发生的时候,我们不知道是哪一个消息出现了问题,我们需要把整个批处理保存在内存中,记录重要的信息后重新发布消息
    • 这种方案仍然是同步的方式,会阻塞消息的发布
    public class ConfirmMessage {
        //消息数量
        public static final int MSG_CNT = 200;
    
        public static void main(String[] args) {
            //调用单个确认发布方法
            //confirmSingleMessage();//发布:200个单独确认消息,耗时:192毫秒
            confirmBatchMessage();
        }
    
        public static void confirmSingleMessage() {
            try {
                //获取信道对象
                Channel channel = ConnectUtil.getChannel();
                //开启确认发布
                channel.confirmSelect();
                //声明队列
                String queue = UUID.randomUUID().toString();
                //队列持久化
                channel.queueDeclare(queue, true, false, false, null);
                //发送消息
                long start = System.currentTimeMillis();
                for (int i = 0; i < MSG_CNT; i++) {
                    String msg = "消息:" + i;
                    //发送消息,消息需要持久化
                    channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
                    //服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息
                    boolean flag = channel.waitForConfirms();
                    if (flag) {
                        System.out.println("————————第" + (i + 1) + "条消息发送成功————————");
                    } else {
                        System.out.println("========第" + (i + 1) + "条消息发送失败=========");
                    }
    
                }
                //记录结束时间
                long end = System.currentTimeMillis();
                System.out.println("发布:" + MSG_CNT + "个单独确认消息,耗时:" + (end - start) + "毫秒");
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void confirmBatchMessage() {
            try {
                //获取信道对象
                Channel channel = ConnectUtil.getChannel();
                //开启确认发布
                channel.confirmSelect();
                //批量确认消息数量
                int batchSize=20;
                //未确认消息数量
                int nackMessageCount=0;
                //声明队列
                String queue = UUID.randomUUID().toString();
                //队列持久化
                channel.queueDeclare(queue, true, false, false, null);
                //发送消息
                long start = System.currentTimeMillis();
                for (int i = 0; i < MSG_CNT; i++) {
                    String msg = "消息:" + i;
                    //发送消息,消息需要持久化
                    channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
                    //累加未确认的发布数量
                    nackMessageCount++;
                    //判断的未确认消息数量和批量确认消息的数量是否一致
                    if (nackMessageCount==batchSize){
                        //服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息
                        boolean flag = channel.waitForConfirms();
                        if (flag) {
                            System.out.println("————————第" + (i + 1) + "条消息发送成功————————");
                        } else {
                            System.out.println("========第" + (i + 1) + "条消息发送失败=========");
                        }
                        //清空未确认发布消息个数
                        nackMessageCount=0;
                    }
                }
                //为了确认剩下的是没有确认的消息,所以要再次进行确认
                if (nackMessageCount>0){
                    //再次重新确认
                    channel.waitForConfirms();
                }
                //记录结束时间
                long end = System.currentTimeMillis();
                System.out.println("发布:" + MSG_CNT + "个单独确认消息,耗时:" + (end - start) + "毫秒");
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106

    在这里插入图片描述

    3.3异步确认发布模式

    在这里插入图片描述

     //异步消息发布确认
        public static void publishMessageAsync() throws Exception {
            Channel channel = ConnectUtil.getChannel();
            //声明队列,此处使用UUID作为队列的名字
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //开启发布确认模式
            channel.confirmSelect();
            //创建ConcurrentSkipListMap集合(跳表集合)
            ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
            //确认收到消息回调函数
            ConfirmCallback ackCallBack = new ConfirmCallback() {
    
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {
                    //判断是否批量异步确认
                    if (multiple) {
                        //把集合中没有被确认的消息添加到该集合中
                        ConcurrentNavigableMap<Long, String> confirmed = concurrentSkipListMap.headMap(deliveryTag, true);
                        //清除该部分没有被确认的消息
                        confirmed.clear();
                    } else {
                        //只清除当前序列胡的消息
                        concurrentSkipListMap.remove(deliveryTag);
    
                    }
                    System.out.println("确认的消息序列序号:" + deliveryTag);
                }
            };
    
            //未被确认消息的回调函数
            ConfirmCallback nackCallBack = new ConfirmCallback() {
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {
                    //获取没有被确认的消息
                    String msg = concurrentSkipListMap.get(deliveryTag);
                    System.out.println("发布的消息:" + msg + "未被确认,该消息序列号:" + deliveryTag);
                }
            };
            //添加异步确认监听器
            channel.addConfirmListener(ackCallBack, nackCallBack);
            //记录开始时间
            long start = System.currentTimeMillis();
            //循环发送消息
            for (int i = 0; i < MSG_CNT; i++) {
                //消息内容
                String message = "消息:" + i;
                //把未确认的消息放到集合中,通过序列号和消息进行关联
    //            channel.getNextPublishSeqNo(); 获取下一个消息的序列号
                concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);
                //发送消息
                channel.basicPublish("", queueName, null, message.getBytes());
    
            }
            //记录结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布"+MSG_CNT+"个批量确认消息,一共耗时:"+(end-start)+"毫秒");
    
    
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    在这里插入图片描述

  • 相关阅读:
    启动solr报错The stack size specified is too small,Specify at least 328k
    Python | GUI | tinker不完全总结
    A1052 Linked List Sorting(25分)PAT 甲级(Advanced Level) Practice(C++)满分题解【链表地址+排序]
    【nlp】2.8 注意力机制拓展
    封装Cookie的增删改查 history worker的使用 touch事件
    android 项目中导入flutter module
    git push rejected的原因
    微信小程序控制元素显示隐藏
    10x倍加速PDE的AI求解:元自动解码器求解参数化偏微分方程
    基于信通院 Serverless 工具链模型的实践:Serverless Devs
  • 原文地址:https://blog.csdn.net/qq_52797170/article/details/127226847