• RabbitMQ------发布确认高级(消息回调、回退、备份交换机)(八)


    RabbitMQ------发布确认高级(八)

    可能由于某些意外情况,导致RabbitMQ重启,在RabbitMQ重启过程中,生产者投递消息失败,导致消息丢失。
    如果才能够保证RabbitMQ的消息可靠性呢?
    可能出现两种问题的情况。
    1.交换机接收不到请求
    2.队列接收不到请求
    解决逻辑:当交换机确认发送消息后,将消息从内存中删除,未能删除的消息,通过定时任务,再次发送给交换机

    代码

    配置类:

    /**
     * 配置类 发布确认 高级
     */
    @Configuration
    public class ConfirmConfig {
    
        //交换机
        public static final  String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
        //队列
        public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
        //Routingkey
        public static final String CONFIRM_ROUTING_KEY = "key1";
    
        //声明交换机
        @Bean("confirmExchange")
        public DirectExchange confirmExchange(){
            return  new DirectExchange(CONFIRM_EXCHANGE_NAME);
        }
    
        //声明队列
        @Bean("confirmQueue")
        public Queue confirmQueue(){
            //创建队列的两种方式
            //QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
            return  new Queue(CONFIRM_QUEUE_NAME);
        }
    
        //绑定
        @Bean
        public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue ,
                                            @Qualifier("confirmExchange") DirectExchange confirmExchange){
            //将队列与交换机绑定
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        } 
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    生产者:

    package com.example.springboot01.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import java.util.Date;
    
    /**
     * 发送延迟
     * 生产者
     */
    @Slf4j
    @RestController
    @RequestMapping("/ttl")
    public class SendMsgController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        //开始发消息  发布确认
        @GetMapping("/sendConfirmMsg/{message}")
        public void sendConfirmMsg(@PathVariable String message){
            log.info("当前时间:{},发送一条消息队列:{}",new Date().toString(),message);
            /**
             * 交换机
             * routingkey
             * message
             */
            rabbitTemplate.convertAndSend("confirm_exchange","key1",message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    消费者:

    package com.example.springboot01.consumer;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     *  消费者
     */
    @Slf4j
    @Component
    public class ConfrimConsumer {
    
        //接收消息
        @RabbitListener(queues = "confirm_queue")
        public void receiveConfirm(Message message) throws Exception {
            String string = new String(message.getBody(),"UTF-8");
            log.info("当前时间:{},收到确认消息:{}",new Date().toString(),string);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    经测试以上模板代码可行。

    交换机收不到消息,实现回调接口

    使用回调接口,前提需要在配置文件中增加配置,spring.rabbitmq.publisher-confirm-type有三种参数设置:
    1.NONE 禁用发布确认模式,是默认值
    2.correlated,发布消息成功到交换机后会触发回调方法(交换机接收成功或失败都会回调)
    3.simple
    它有两种效果,第一种和correlated一样触发回调方法
    第二种:消息在发送成功后使用rabbitTemplete调用waitForConfirms或者waitForConfirmOrDie方法,等待broker节点返回发送结果,根据返回结果判定下一步的逻辑。

    注意:waitForConfirmOrDie方法如果返回false,则会关闭channel,则接下来无法发送消息到broker。

    spring.rabbitmq.host=182.92.234.71
    spring.rabbitmq.prot=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    spring.rabbitmq.publisher-confirm-type=correlated
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    将发不出去的消息保存下来等等自定义的处理逻辑,都可以通过回调接口自己实现。
    在RabbitTemplate中提供了这样的回调接口,
    在这里插入图片描述
    这个接口是一个函数式接口,接口中只有一个方法,可以用lamdba表达式
    在这里插入图片描述
    如果失败了收不到信息,var2就会为false,并且var将失败原因返回。var1是发送的内容
    如果成功了,var2将会为true。
    实现回调接口,需要创建新的类,对其实现

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback {
        //实现类虽然实例化,但是它集成的是内部接口,并且MyCallBack并不在RabbitTemplate里面
        //所以导致RabbitTemplate在掉用自身接口时,根本掉不到MyCallBack
        //因此需要将MyCallBack注入到RabbitTemplate
        @Autowired 
        private RabbitTemplate rabbitTemplate;
        //注入
        //PostConstruct注解会在Component、Autowired注解完成后再执行
        @PostConstruct
        public void init(){
            rabbitTemplate.setConfirmCallback(this);
        }
        
        
        /**
         * 交换机确认回调方法    成功和失败都会回调
         * @param correlationData 保存消息的id以及相关信息
         * @param ack  交换机收到为true   失败为false
         * @param cause  成功为null   失败为错误原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                log.info("交换机已收到消息,ID为{}的消息",correlationData == null ? "":correlationData.getId());
            }else {
                log.info("交换机未收到消息,ID为{}的消息,原因为:",correlationData.getId(),cause);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    correlationData消息的相关信息,本质上不是交换机的,是由生产者发送的。
    而生产者的发送消息方法:convertAndSend,有很多重载方法。其中就包括传递correlationData的方法。因此需要再生产发送消息时new出该对象

        public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
            this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
        }
    
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述
    生产者代码:

     @GetMapping("/sendConfirmMsg/{message}")
        public void sendConfirmMsg(@PathVariable String message){
            log.info("当前时间:{},发送一条消息队列:{}",new Date().toString(),message);
    
            /**
             * 交换机
             * routingkey
             * message
             */
            CorrelationData correlationData = new CorrelationData("1");
            rabbitTemplate.convertAndSend("confirm_exchange","key1",message,correlationData);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    模拟交换机接受不到消息,可以将交换机的名字改错,
    结论:将生产者发送消息的队列名改错,就能够再控制台看到,接收失败时的回调函数处理方式
    但是这种,方式只能解决交换机接收不到消息的情况。

    队列收不到消息,回退消息

    Mandatory参数

    在仅开启生产者确认机制的情况下,交换机接收到消息后,会直接将消息生产发送确认消息,如果发现发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃的事件的。
    通过设置mandatory参数可以当消息传递过程中不可达目的时将消息返回给生产者。
    同样也有退回消息的配置。

    spring.rabbitmq.publisher-returns=true
    
    
    • 1
    • 2

    开启配置后,修改config代码,实现回退接口ReturnCallback
    同时需要注入

    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
        //实现类虽然实例化,但是它集成的是内部接口,并且MyCallBack并不在RabbitTemplate里面
        //所以导致RabbitTemplate在掉用自身接口时,根本掉不到MyCallBack
        //因此需要将MyCallBack注入到RabbitTemplate
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //注入
        //PostConstruct注解会在Component、Autowired注解完成后再执行
        @PostConstruct
        public void init(){
            rabbitTemplate.setConfirmCallback(this);
            //注入
            rabbitTemplate.setReturnCallback(this);
        }
    
        
    
        /**
         * 交换机确认回调方法    成功和失败都会回调
         * @param correlationData 保存消息的id以及相关信息
         * @param ack  交换机收到为true   失败为false
         * @param cause  成功为null   失败为错误原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                log.info("交换机已收到消息,ID为{}的消息",correlationData == null ? "":correlationData.getId());
            }else {
                log.info("交换机未收到消息,ID为{}的消息,原因为:",correlationData.getId(),cause);
            }
        }
    
        /**
         * 队列回退接口  当消息不能到目的地时会返回给生产者
         * 只有不可达目的地时,才进行回退
         * @param message   消息主体
         * @param replyCode  退回码值
         * @param replyText  退回原因
         * @param exchange   交换机
         * @param routingKey  路由key
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.error("消息{},被交换机{}退回,退回原因{},路由key{}",new String(message.getBody()),exchange,replyText,routingKey);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    修改生产者代码,将routingkey改为key2,队列接收不到,触发回退接口,打印出日志。

    备份交换机

    当交换机接收到一条不可路由的消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为Fanout,这样就能把所有消息都投递到预期绑定的队列。当然也可以建立一个报警队列,用独立的消费者来进行监测和报警。
    示例代码:消费者,主交换机,主队列,备份交换机,备份队列,报警队列,
    修改配置文件
    1.声明备份交换机,fanout类型
    2.声明备份队列
    3.声明报警队列
    4.备份队列与备份交换机绑定
    5.报警队列与备份交换机绑定
    6.修改主交换机,配置参数,主交换机发送消息失败后,将消息转发到备份交换机,并且需要设置持久化

    ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
    .withArgument(“alternate-exchange”,BACKUP_EXCHANGE_NAME).build();

    /**
     * 配置类 发布确认 高级
     */
    @Configuration
    public class ConfirmConfig {
    
        //交换机
        public static final  String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
        //队列
        public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
        //Routingkey
        public static final String CONFIRM_ROUTING_KEY = "key1";
    
        //备份交换机
        public static final  String BACKUP_EXCHANGE_NAME = "backup_exchange";
        //备份队列
        public static final String BACKUP_QUEUE_NAME = "backup_queue";
        //报警队列
        public static final String WARING_QUEUE_NAME = "warning_queue";
    
        //声明交换机
        @Bean("confirmExchange")
        public DirectExchange confirmExchange(){
            return  ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                    .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
        }
    
        //声明队列
        @Bean("confirmQueue")
        public Queue confirmQueue(){
            //创建队列的两种方式
            //QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
            return  new Queue(CONFIRM_QUEUE_NAME);
        }
    
        //绑定
        @Bean
        public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue ,
                                            @Qualifier("confirmExchange") DirectExchange confirmExchange){
            //将队列与交换机绑定
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        }
    
        //声明交换机
        @Bean("backupExchange")
        public FanoutExchange backupExchange(){
            return  new FanoutExchange(BACKUP_EXCHANGE_NAME);
        }
    
        //声明备份队列
        @Bean("backupQueue")
        public Queue backupQueue(){
            //创建队列的两种方式
            //QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
            return  new Queue(BACKUP_QUEUE_NAME);
        }
    
        //声明报警队列
        @Bean("warningQueue")
        public Queue warningQueue(){
            //创建队列的两种方式
            //QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
            return  new Queue(WARING_QUEUE_NAME);
        }
        //绑定  扇出类型不写routingkey
        @Bean
        public Binding backupQueueBindingExchange(@Qualifier("backupQueue") Queue backupQueue ,
                                            @Qualifier("backupExchange") FanoutExchange backupExchange){
            //将队列与交换机绑定
            return BindingBuilder.bind(backupQueue).to(backupExchange);
        }
        //绑定  扇出类型不写routingkey
        @Bean
        public Binding warningQueueBindingExchange(@Qualifier("warningQueue") Queue warningQueue ,
                                                  @Qualifier("backupExchange") FanoutExchange backupExchange){
            //将队列与交换机绑定
            return BindingBuilder.bind(warningQueue).to(backupExchange);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    消费者

    /**
     * 队列TTL 消费者
     */
    @Slf4j
    @Component
    public class DeadLetterConsumer {
    
        //接收消息
        @RabbitListener(queues = "QD")
        public void receiveD(Message message, Channel channel) throws Exception {
            String string = new String(message.getBody(),"UTF-8");
            log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),string);
    
        }
    
        //接收消息
        @RabbitListener(queues = "confirm_queue")
        public void receiveConfirm(Message message) throws Exception {
            String string = new String(message.getBody(),"UTF-8");
            log.info("当前时间:{},收到确认消息:{}",new Date().toString(),string);
        }
    
    
        //接收消息  报警消息
        @RabbitListener(queues = "warning_queue")
        public void Confirm(Message message) throws Exception {
            String string = new String(message.getBody(),"UTF-8");
            log.info("当前时间:{},收到报警消息:{}",new Date().toString(),string);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    mandatory参数与备份交换机可以一起使用,如果两者同时开启,消息会去哪里?
    备份交换机的优先级高!

  • 相关阅读:
    基于MYSQL的论坛管理系统数据库设计项目实战
    基于javaweb的毕业设计毕业论文管理系统(java+ssm+jsp+tomcat+mysql)
    低代码助力企业数字化升级
    Springboot毕设项目保险理赔管理系统312z6(java+VUE+Mybatis+Maven+Mysql)
    (附源码)spring boot北京冬奥会志愿者报名系统 毕业设计 150947
    JAVA实现数学函数图像
    1.4.26 实验26:华为综合ACL
    1.3 Linux目录操作
    FPGA时序分析与约束(6)——综合的基础知识
    Java使用UDP Socket实现回显服务
  • 原文地址:https://blog.csdn.net/cz_chen_zhuo/article/details/128044065