• 关于RabbitMQ的小总结


    问题:消息在转换机无法被路由

    发布确认高级作用在生产者发送到转换机,回退消息作用在消息在转换机无法被路由的情况(消息无法路由的意思是,消息在转换机没有匹配到对应的队列),进行消息回退,打印日志,但增加了生产者的复杂性。
    前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些 处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

    解决方法:设置备份交换机

    RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由 备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑 定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

    注:若 转换机 设置 备份转换机,在消息在转换机无法被路由的情况下就不会调用 回退消息 的方法,消息 直接转发到备份转换机

    可参考一下代码:

    package com.ai.boy.config;
    
    import com.ai.boy.common.RabbitMqUtils;
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMq配置类,声明队列、交换机
     * 绑定备份转换机(Fanout),并绑定备份队列、警告队列
     * 备份转换机 绑定在 正常转换机 上
     */
    @Configuration
    public class RabbitMqConfig {
    
        /**
         * 声明交换机 Exchange
         * 并设置该交换机的备份交换机
         * */
        @Bean("confirmExchange")
        public DirectExchange confirmExchange(){
            /**
             * return new DirectExchange(CONFIRM_EXCHANGE_NAME);
             * 若不设置备份交换机,按照以上即可
             * */
            return ExchangeBuilder.directExchange(RabbitMqUtils.CONFIRM_EXCHANGE_NAME)
                    .durable(true)
                    /**设置该交换机的备份交换机*/
                    .withArgument("alternate-exchange", RabbitMqUtils.BACKUP_EXCHANGE_NAME).build();
        }
    
        /**
         * 声明确认队列
         * */
        @Bean("confirmQueue")
        public Queue confirmQueue(){
            return QueueBuilder.durable(RabbitMqUtils.CONFIRM_QUEUE_NAME).build();
        }
    
        /**
         * 声明确认队列绑定关系
         * */
        @Bean
        public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                    @Qualifier("confirmExchange") DirectExchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(RabbitMqUtils.CONFIRM_KEY);
        }
    
        /**
         * 声明备份交换机 Exchange
         * */
        @Bean("backupExchange")
        public FanoutExchange backupExchange(){
            return new FanoutExchange(RabbitMqUtils.BACKUP_EXCHANGE_NAME);
        }
    
        /**
         * 声明备份队列
         * */
        @Bean("backQueue")
        public Queue backQueue(){
            return QueueBuilder.durable(RabbitMqUtils.BACKUP_QUEUE_NAME).build();
        }
    
        /**
         * 声明警告队列
         * */
        @Bean("warningQueue")
        public Queue warningQueue(){
            return QueueBuilder.durable(RabbitMqUtils.WARNING_QUEUE_NAME).build();
        }
    
        /**
         * 声明 备份队列 绑定关系
         * 备份队列绑定备份备份交换机
         * */
        @Bean
        public Binding backupBinding(@Qualifier("backQueue") Queue queue,
                                     @Qualifier("backupExchange") FanoutExchange backupExchange){
            return BindingBuilder.bind(queue).to(backupExchange);
        }
    
        /**
         * 声明 报警队列 绑定关系
         * 报警队列绑定备份备份交换机
         * */
        @Bean
        public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                      @Qualifier("backupExchange") FanoutExchange backupExchange){
            return BindingBuilder.bind(queue).to(backupExchange);
        }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    package com.ai.boy.common;
    
    public class RabbitMqUtils {
    
        public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
        public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
        public static final String CONFIRM_KEY = "key1";
        public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
        public static final String BACKUP_QUEUE_NAME = "backup.queue";
        public static final String WARNING_QUEUE_NAME = "warning.queue";
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    发布确认高级、回退消息
    package com.ai.boy.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    /**
     * 发布确认高级、回退消息
     * 作用在生产者发送消息到转换机过程
     */
    @Component
    @Slf4j
    public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init(){
            rabbitTemplate.setConfirmCallback(this);
            /**
             * true:
             * 交换机无法将消息进行路由时,会将该消息返回给生产者
             * false:
             * 如果发现消息无法进行路由,则直接丢弃
             */
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnsCallback(this);
        }
    
        /**
         * 发布确认高级:
         * 交换机不管是否收到消息的一个回调方法
         * 参数:
         * CorrelationData 消息相关数据
         * ack 交换机是否收到消息
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    
            String id=correlationData!=null?correlationData.getId():"";
            if(ack){
                log.info("交换机已经收到 id 为:{}的消息",id);
            }else{
                log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
            }
        }
    
        /**
         * 回退消息:
         * 作用在消息在 转换机 无法被路由的情况下 执行 的回调方法
         * (发布确认高级 已确定消息已发送成功,但在转换机的消息没有匹配到对应的队列)
         * 注:若 转换机 设置 备份转换机,就不会调用该方法,消息直接转发到备份转换机
         * */
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            log.error("回退消息:{},被交换机{}退回,退回原因 :{},路由key:{}",
                    new String(returnedMessage.getMessage().getBody()),
                    returnedMessage.getExchange(),
                    returnedMessage.getReplyText(),
                    returnedMessage.getRoutingKey());
        }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    RabbitMQ原有交换机

    在这里插入图片描述

  • 相关阅读:
    4WRKE16W6-200L-3X/6EG24EK31/A1D3M(R900977311)比例方向阀
    基于OpenVINO 2022.1 POT API实现YOLOv5模型INT8量化
    OpenCV图像特征提取学习二,Shi-Tomasi 角点检测算法
    快速掌握基本数据库查询语句,面试无烦恼!!
    鸿蒙系统(HarmonyOS)--第一章
    在TensorFlow中使用GAN生成图像
    Python【数据分析第二阶段测试】
    【JAVA面试】JAVA面试指南
    Python 潮流周刊#21:如何提升及测量 Python 代码的性能?
    排查 Spring Boot 没有你想的那么简单
  • 原文地址:https://blog.csdn.net/weixin_45961836/article/details/134206490