• 539、RabbitMQ详细入门教程系列 -【100%消息投递消费(一)】 2022.08.31


    一、前言概述

    生产者生产消息到消费者消息消费,中间需要生产者将消息发送到交换器,再由交换器路由到队列存储,然后消费者进行消息消费。在没有任何设置情况下,中间可能存在以下几种情况导致消息丢失:

    在这里插入图片描述

    1. 消费者将消息发送到交换器因为RabbitMQ内部原因丢失消息
    2. 交换器将消息路由到队列,因为队列不存在等因素导致消息丢失
    3. 队列中存储的消息在消费者未消费时RabbitMQ服务宕机导致消息丢失
    4. 消费者消费消息时消费者宕机未处理完消息导致消息丢失

    针对上述情况,本文将根据每个节点讲述如何操作确保消息投递的可靠性,同时在保障可靠性的情况下可能会引发系列如消息重复等问题,也是本文将会涉及到的重点

    二、事务TX

    理解数据库的事务就是将多次操作原子化,统一提交回滚实现数据一致性。RabbitMQ事务可能与之前数据库等事务有一定差别,当然也是支持消息的提交与回滚操作。事务是解决生产者发送消息到交换器消息丢失问题的方案之一

    2.1 相关操作

    RabbitMQ中的事务实现有如下两个个主要步骤:
    在这里插入图片描述

    1. 将通信的信道设置为事务模式
    2. 事务提交/事务回滚

    2.2 代码示例

            // 将信道设置为事务模式
            channel.txSelect();
            // 发送消息
            try {
                channel.basicPublish(bindingKey, bindingKey, false, false, null, messgae.getBytes("UTF-8"));
                // 提交事务
                channel.txCommit();
            }catch (Exception e){
                // 异常回滚事务,发生异常的时候可以尝试重发或者记录等操作
                channel.txRollback();
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.3 总结

    事务机制的确认需要等待上一条消息发送完毕反馈之后才能进行第二条消息发送,这样的操作将会对于使用RabbitMQ而言感觉就是暴殄天物。如果是想要发送多条消息只能循环操作,但是注意如果没有将channel信道设置为事务模式不能进行事务操作,不然会抛出异常。最后一点就是信道设置为事务模式只需要操作一次即可

    三、确认Confirm

    事务机制对于RabbitMQ性能消耗是灾难性的,针对生产者到交换器消息丢失处理提出了全新的轻量级处理方式。发送发确认机制confirm,该操作编码上会有很多地方都在讲什么等待确认、批量确认、异步确认。一切为了生产,所以本文将只会介绍生产用的异步确认方式

    3.1 相关操作

    RabbitMQ的生产发送确认实现由以下三个部分构成:

    • 将信道channel设置为确认模式
    • 增加确认监听Listener
    • 处理监听结果

    3.2 代码示例

    • 每个通道发送到RabbitMQ的Broker中都会有唯一的编码
    • 生产端最好使用有序队列存储发送的消息,方便确认后的删除
    • 创建Channel通道时可以指定唯一编码,标识该通道
            // 设置confirm消息发送确认机制
            channel.confirmSelect();
            // 增加确认机制监听器
            channel.addConfirmListener(new ConfirmListener() {
                /**
                 * 成功确认
                 * deliveryTag 表示消息的唯一标识
                 * multiple 本次确认是否为批量操作
                 */
                @Override
                public void handleAck (long deliveryTag, boolean multiple) throws IOException {
                    // 删除有序集合中的消息
                    if (! multiple){
                        // 根据坐标删除消息
                    }else {
                        // 批量删除消息
                    }
                }
        
                /**
                 * 失败确认
                 * deliveryTag 表示消息的唯一标识
                 * multiple 本次确认是否为批量操作
                 */
                @Override
                public void handleNack (long deliveryTag, boolean multiple) throws IOException {
                    // 可以根据deliveryTag做重试操作等
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    3.3 注意事项

    在这里插入图片描述

    • 共存:事务与确认机制不能共存,不然会异常
    • 查验:通过RabbitMQ可视化监控界面可看到Channels栏Mod属性T表示事务,C表示监听确认
    • 有序:因为RabbitMQ生成的序列deliveryTag是由小到大自动递增的,所以最好存储消息的时候考虑到
    • 顺序性,更方便通过deliveryTag定位到消息进行操作

    四、参考链接

    [01] RabbitMQ详细入门教程系列 -【100%消息投递消费(一)】

  • 相关阅读:
    基础—SQL—DQL(数据查询语言)排序查询
    GitHub百万下载量的高性能Java架构核心手册,到底有多牛?
    前端面试基础面试题——7
    面试__编程
    Three.js——骨骼动画
    JAVA:实现Sudoku数独算法(附完整源码)
    LeetCode(26)判断子序列【双指针】【简单】
    flink的副输出sideoutput单元测试
    Python如何自动操作电脑桌面应用程序
    DOA估计算法——Capon算法
  • 原文地址:https://blog.csdn.net/youyouwuxin1234/article/details/126620432