• 【学习笔记】RabbitMQ-5 消息的可靠性投递 以及示例代码


    参考资料

    八、RabbitMQ的确认机制 -confirm

    8.1 Confirm 模式简介

    消息队列的 confirm 确认机制,是指生产者投递消息后,到达了消息服务器 Broker 里面的exchange 交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到 Broker 的 exchange 中。

    这也是消息可靠性投递的重要保障。

    用于确保消息传递是否正常,但是会牺牲一些性能,但是提高了系统运行的稳定性

    image-20231017161931900

    8.2 具体代码设置

    8.2.1 设置思路

    image-20231017162057149

    8.2.2 代码实现

    基于之前的案例中的直连路由器进行测试。

    需要传递如下的参数才能成功发送消息。

    image-20231017164313453

    8.2.2.1 开启生产者的确认模式.

    在配置文件中,开启生产者的确认模式:

    • 与CorrelationData一起使用可将确认与已发送的消息关联起来。
    # Use with CorrelationData to correlate confirmations with sent messsages.
    publisher-comfirm-type: correlated
    
    • 1
    • 2
    8.2.2.2 实现接口ComfirmCallback
    @Component
    @Slf4j
    public class DemoRabbitCallback implements RabbitTemplate.ConfirmCallback {
        /**
         * 证实
         *
         * @param correlationData 相关数据
         * @param ack             是否应答成功
         * @param cause           原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack){
                log.info("(确认模式) 消息正常发送:{}" , correlationData);
                return;
            }
            log.error("(确认模式) 消息发送异常 :{},异常原因:{}" , correlationData , cause);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    8.2.2.3 配置rabbitTemplate并发送消息
    @RestController
    @RequestMapping("/confirm")
    @Slf4j
    public class ConfirmTestController {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Resource
        private DemoRabbitCallback demoRabbitCallback;
    
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(demoRabbitCallback);
            log.info("(确认模式)成功配置回调方法");
        }
    
        @GetMapping("/{key}")
        public void sentErrorMsg(@PathVariable("key") String key) {
            String msg = "...确认模式测试消息...";
            // 声明一个相关性数据,可以用于辅助确认本次请求的信息
            CorrelationData correlationData = new CorrelationData();
            String uuid = "ORDER_NUM_"+UUID.randomUUID();
            correlationData.setId(uuid);
    
            log.info("(确认模式)准备发送的信息:{} , 交换机名称是 :{} , 相关数据信息:{} ", msg, key , correlationData);
            //  发送消息时添加参数:CorrelationData
            rabbitTemplate.convertAndSend(key,
                    "error",
                    msg.getBytes(StandardCharsets.UTF_8),
                    correlationData);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    8.2.2.3 测试运行

    尝试发送一个交换机名称错误的信息。异常信息如下:

    (确认模式) 消息发送异常 :CorrelationData [id=ORDER_NUM_084864b0-ee07-4e60-b19c-d360edeecb27],异常原因:channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'xcong.directaaaaa' in vhost 'hc-test', class-id=60, method-id=40)
    
    
    • 1
    • 2

    发送一个正确的名称时/confirm/xcong.direct:也会打印日志:

    (确认模式) 消息正常发送:CorrelationData [id=ORDER_NUM_744c6441-4075-4c4b-ba3f-530578b901aa]
    
    • 1

    进入控制台也可以看该消息,而且可以发现,相关信息是存放在headers中

    image-20231017165933441

    8.3 使用匿名内部类优化代码写法

    教程里提到的写法,将消息发送的服务和回调方法的实现合在一个类中完成。类似下面这样

    image-20231017170358826

    省一个类而已,局限性比较大,不适合全局部署。根据实际需求场景使用。

    另外一种方法即使用匿名内部类。效果如下,跟上面的方法类似,不是很好用。

    image-20231017170608219

    8.4 ⭐️ 推荐-使用lambda表达式实现确认模式

    • 代码简洁
    • 阅读直观

    当接口只有一个方法,且添加了注解@FunctionalInterface时,就可以尝试用lambda表示简洁写法

    最终效果改造如下(测试和之前的示例一样

    @RestController
    @RequestMapping("/v2/confirm")
    @Slf4j
    public class ConfirmTestController2 {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(
                    // 参数类型可以省略 CorrelationData correlationData, boolean ack, String cause
                    (correlationData, ack, cause) -> {
                        if (ack) {
                            log.info("(确认模式) 消息正常发送:{}", correlationData);
                            return;
                        }
                        log.error("(确认模式) 消息发送异常 :{},异常原因:{}", correlationData, cause);
                    }
            );
            log.info("(确认模式)成功配置回调方法");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    九、RabbitMQ消息Return模式

    9.1 返回模式概念

    刚才我们已经确保了“生产者——>交换机”的消息可靠性,接下来的返回模式就是确保“exchange——>queue”的消息可靠性

    • P -> X ——确认模式
    • X -> Q ——返回模式

    9.2 具体代码设置

    思路和确认模式类型,不贴代码了,直接截图作为参考即可

    9.2.1 开启确认模式

    在配置文件中开启确认模式

    publisher-returns: true
    
    • 1
    9.2.2 实现ReturnsCallback接口

    image-20231017172335306

    9.2.3 设置回调函数

    image-20231017172353571

    9.2.3 ⭐️代码用例(lambda
    @Slf4j
    @RestController
    @RequestMapping("/return")
    public class ReturnTestController {
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setReturnsCallback(
                    (returnedMessage) -> {
                        log.info("(返回模式) 异常信息内容是:{}", returnedMessage);
                    }
            );
        }
    
        @GetMapping("/{key}/{msg}")
        public void sentErrorMsg(@PathVariable("msg") String msg, @PathVariable("key") String key) {
            log.info("(返回模式) 准备发送的信息:{} , 路由键 :{}", msg, key);
            rabbitTemplate.convertAndSend(exchangeName, key, msg.getBytes(StandardCharsets.UTF_8));
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    9.2.4 测试结果
    • 发送一个错误的路由key/return/error1/返回模式测试消息

      image-20231018092413244

    • 发送一个正确的路由key时:不会触发回调函数

      image-20231018092449880

    9.3 ⭐️使用Lambda函数实现返回模式

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnsCallback(
            returnedMessage-> {
                log.info("(返回模式) 异常信息内容是:{}", returnedMessage);
            }
        );
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    十、RabbitMQ交换机的属性

    10.1 具体属性

    Snipaste_2023-10-18_10-16-13

    1. Name:名称
    2. Type:类型,direct/fanout/topic/headers
    3. Durability:持久化
    4. auto Delete:自动删除。
    5. intenal:内部交换机
    6. Arguments:自定义参数
      • Alternate exchange:备用交换机

    10.2 交换机持久化

    • 声明交换机是否持久化,服务器重启后是否还在
    • 默认是持久化
    • 如果是非持久化,服务器重启后,交换机的数据就会丢失

    10.3 自动删除

    • 如果true,当所有的队列和交换接触和交换机的绑定后,交换器将删除自身
    • 删除的结点:最后一个队列或者交换机接触绑定时。
    • 默认为false

    10.4 内部交换机和备用交换机

    • intenal属性值为yes(true)时,该交换机无法被客户端直接访问,只能作为和其他交换机的相互绑定中。
    • 在参数配置中,添加alternate-exchange的值,可以声明一个备用交换机:
      • 当一个消息无法被routed时,可以将他们的信息转发到备用交换机上。

    实际的应用场景

    可以用于存储客户端发送错误routingKey的信息,防止消息发送时指定了错误的key,导致消息丢失的情况。

    也是一种保证消息可靠性投递的方式

  • 相关阅读:
    被玩坏的数组排序之sort函数
    自主WebServer实现
    【回溯算法】leetcode 46. 全排列
    oracle11g-图形安装(centos7)
    stc8a8k64s4a12单片机声音检测编程
    unipush2.0实现APP消息推送(2)云函数多个方法的创建与使用
    python中hasattr()函数用法详解
    ES集群搭建及Kibana安装
    改进YOLOv7系列:22.最新HorNet结合YOLOv7应用! | 多种搭配,即插即用 | Backbone主干、递归门控卷积的高效高阶空间交互
    宠物店会员管理系统| 宠物店小程序
  • 原文地址:https://blog.csdn.net/Xcong_Zhu/article/details/133903167