• 关于 自定义的RabbitMQ的RabbitMessageContainer注解-实现原理


    概述

    RabbitMessageContainer注解 的主要作用就是 替换掉@Configuration配置类中的各种@Bean配置;

    采用注解的方式可以让我们 固化配置,降低代码编写复杂度、减少配置错误情况的发生,提升编码调试的效率、提高业务的可用性。

    • 为什么说“降低代码编写的复杂度”呢?因为,用一行注解代替了原本好几十行的代码。
    • 为什么说“减少配置错误情况的发生,提升编码调试的效率”呢?因为,开发者从其他@Configuration配置文件复制粘贴的代码,有时会忘记修改某些Bean名称,而启动又不会报错,最终会导致队列没有消费者,需要浪费时间排查问题。
    • 为什么说“提高业务的可用性”呢?因为,组件默认配置了死信队列机制,当消费失败的时候,将异常抛出即可重试,避免因为没有配置死信队列而导致消息丢失。(如果继承AbstractJdkSerializeListener/AbstractJsonSerializeListener可以在重试一定次数后将消息落库并且丢弃)

    接入方式

    该组件使用Spring Boot的自动装配能力,只需要引入pom依赖即可完成接入。

    1.     com.ccbscf
    2.     ccbscf-biz-enhancer-rabbitmq-starter
    3.     1.0.1-SNAPSHOT

    支持哪些能力?

    简单来说,以前@Bean注入方式常用的能力,这个组件都支持,以下是具体注解信息及属性配置:

    • com.ccbscf.biz.enhancer.rabbitmq.annotation.RabbitMessageContainer注解
    1. /**
    2.  * 向spring中注入SimpleMessageListenerContainer容器
    3.  * 暂时只对Container的acknowledgeMode、exposeListenerChannel、prefetchCount、concurrentConsumers、maxConcurrentConsumers提供了赋值的扩展,如果需要其他的字段赋值,需要升级组件
    4.  */
    5. @Target({ElementType.TYPE})
    6. @Retention(RetentionPolicy.RUNTIME)
    7. @Documented
    8. public @interface RabbitMessageContainer {
    9.     /**
    10.      * container的name,向spring容器注入bean
    11.      * @return
    12.      */
    13.     String value();
    14.     /**
    15.      * 定义绑定关系,队列、交换器、路由key的定义都在这里面
    16.      * 这里为什么是定义数组呢,因为同一个Container是可以绑定多个队列的,因此这里是数组;
    17.      * @return
    18.      */
    19.     QueueBinding[] bindings();
    20.     /**
    21.      * @return
    22.      * @see AbstractMessageListenerContainer#setAcknowledgeMode(org.springframework.amqp.core.AcknowledgeMode)
    23.      */
    24.     AcknowledgeMode acknowledgeMode() default AcknowledgeMode.MANUAL;
    25.     /**
    26.      * @return
    27.      * @see AbstractMessageListenerContainer#setExposeListenerChannel(boolean)
    28.      */
    29.     boolean exposeListenerChannel() default true;
    30.     /**
    31.      * @return
    32.      * @see SimpleMessageListenerContainer#setPrefetchCount(int)
    33.      */
    34.     int prefetchCount() default 5;
    35.     /**
    36.      * @return
    37.      * @see SimpleMessageListenerContainer#setConcurrentConsumers(int)
    38.      */
    39.     int concurrentConsumers() default 1;
    40.     /**
    41.      * @return
    42.      * @see SimpleMessageListenerContainer#setMaxConcurrentConsumers(int)
    43.      */
    44.     int maxConcurrentConsumers() default 1;
    45.     /**
    46.      * 失败 抛出异常 捕捉到异常以后 是否进行重试 默认重试
    47.      * @return
    48.      */
    49.     boolean needRetry() default true;
    50.     /**
    51.      * 自定义的Listener维度的重试次数上限
    52.      * @return
    53.      */
    54.     int customerRetriesLimitForListener() default -1;
    55.     /**
    56.      * 重试时间间隔
    57.      * @return
    58.      */
    59.     long retryTimeInterval() default -1;
    60. }

    上面是@RabbitMessageContainer注解的源代码;原本@Bean中SimpleMessageListenerContainer常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。

    除了实现@Bean方式常用字段,另外增加了以下几个功能字段:

    • needRetry:失败 抛出异常 捕捉到异常以后 是否进行重试? 默认重试
    • customerRetriesLimitForListener:自定义的Listener维度的重试次数上限,此优先级高于全局的次数上限配置
    • retryTimeInterval:重试时间间隔,固定时间间隔,不支持梯度;这个配置是加在队列参数上的,一旦配置生效,就无法修改,这个RabbitMQ的特性

    为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:

    1. public static SimpleMessageListenerContainer buildSimpleMessageListenerContainer(Queue queue, ConnectionFactory connectionFactory, Object messageListener) {
    2.     SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    3.     simpleMessageListenerContainer.setQueues(queue);
    4.     simpleMessageListenerContainer.setMaxConcurrentConsumers(1);
    5.     simpleMessageListenerContainer.setConcurrentConsumers(1);
    6.     simpleMessageListenerContainer.setPrefetchCount(5);
    7.     simpleMessageListenerContainer.setExposeListenerChannel(true);
    8.     simpleMessageListenerContainer.setMessageListener(messageListener);
    9.     simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    10.     return simpleMessageListenerContainer;
    11. }

     

    • com.ccbscf.biz​​​​​​​.enhancer.rabbitmq.annotation.QueueBinding注解
    1. @Target({})
    2. @Retention(RetentionPolicy.RUNTIME)
    3. public @interface QueueBinding {
    4.     /**
    5.      * 绑定关系的name,主要用于向容器中注入bean的名称
    6.      * @return
    7.      */
    8.     String value();
    9.     /**
    10.      * @return the queue.
    11.      */
    12.     Queue queue();
    13.     /**
    14.      * @return the exchange.
    15.      */
    16.     Exchange exchange();
    17.     /**
    18.      * @return the routing key or pattern for the binding.
    19.      */
    20.     String key() default "";
    21. }

    上面是@QueueBinding注解的源代码;原本@Bean中Binding常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。

    为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:

    1. @Bean
    2.     public Binding sendSuperviseBinding(TopicExchange approveDocDatumTopicExchange) {
    3.         return BindingBuilder.bind(sendSuperviseQueue()).to(approveDocDatumTopicExchange).with(DOC_DATUM_TOPIC_APPROVE_ROUTING_KEY);
    4.     }
    1. @Target({})
    2. @Retention(RetentionPolicy.RUNTIME)
    3. public @interface Queue {
    4.     /**
    5.      * @return the queue name or "" for a generated queue name (default).
    6.      */
    7.     String value();
    8.     /**
    9.      * @return true if the queue is to be declared as durable.
    10.      */
    11.     boolean durable() default true;
    12.     /**
    13.      * @return true if the queue is to be declared as exclusive.
    14.      */
    15.     boolean exclusive() default false;
    16.     /**
    17.      * @return true if the queue is to be declared as auto-delete.
    18.      */
    19.     boolean autoDelete() default false;
    20.     /**
    21.      * 是否延迟队列
    22.      * @return
    23.      */
    24.     boolean delayConsumer() default false;
    25.     /**
    26.      * delayConsumer为true的情况下该字段才会生效,单位:ms
    27.      * 如果设置了delayConsumer=true延迟队消费开启,但是未设置delayTime延迟消费时间,默认值是10分钟
    28.      * @return
    29.      */
    30.     long delayTime() default -1;
    31. }

    上面是@Queue注解的源代码;原本@Bean中Queue常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。

    除了实现@Bean方式常用字段,另外增加了以下几个功能字段:

    • delayConsumer:是否延迟队列?默认为false,如果需要开启延迟消费的功能,需要配置为true
    • delayTime:delayConsumer为true的情况下该字段才会生效,单位:ms;如果设置了delayConsumer=true延迟队消费开启,但是未设置delayTime延迟消费时间,默认值是10分钟

    为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:

    new Queue(queueName, truefalsefalse, params)

     

    1. @Target({})
    2. @Retention(RetentionPolicy.RUNTIME)
    3. public @interface Exchange {
    4.     /**
    5.      * @return the exchange name.
    6.      */
    7.     String value();
    8.     /**
    9.      * The exchange type - only DIRECT, FANOUT TOPIC, and HEADERS exchanges are supported.
    10.      * @return the exchange type.
    11.      */
    12.     String type() default ExchangeTypes.TOPIC;
    13.     /**
    14.      * @return true if the exchange is to be declared as durable.
    15.      */
    16.     boolean durable() default true;
    17.     /**
    18.      * @return true if the exchange is to be declared as auto-delete.
    19.      */
    20.     boolean autoDelete() default false;
    21. }

    上面是@Exchange注解的源代码;原本@Bean中Exchange常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。

    为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:

    1. @Bean
    2.     public TopicExchange bizCcbDefaultTopicExchange() {
    3.         return new TopicExchange(BIZ_CCB_DEFAULT_TOPIC_EXCHANGE, truefalse);
    4.     }

    核心代码逻辑

    其实,实现思路非常简单,原有方式:通过开发者定义@Bean配置向spring容器中添加BeanDefinition并生成单例Bean;新的方式:根据开发者配置的注解信息集中式的生成BeanDefinition并注册到spring容器即可。

    至于绑定关系、队列、交换器向MQ消息中心注册的过程不受任何影响,因为本来@Bean就是在向容器注入bean而已;

    核心代码都在这一个RabbitMqEnhancerBeanDefinitionRegistry类,这个类实现了BeanDefinitionRegistryPostProcessor接口,当然BeanDefinitionRegistryPostProcessor也继承了BeanFactoryPostProcessor接口,只不过我们只使用了BeanDefinitionRegistryPostProcessor具有的特性,向容器中注入BeanDefinition信息;至于spring生成单例bean的过程,我们不去干预还是交给spring来自行完成。

    从@RabbitMessageContainer、@Queue、@Exchange、@QueueBinding注解中获取信息,创建相应的BeanDefinition并注册到容器中,由spring容器管理,充分利用spring现有机制,自动创建bean实例,尽可能减少硬编码干预spring的流程。

    源代码如下:

    1. /**
    2.  * @ClassName RabbitMqEnhancerBeanDefinitionRegistry
    3.  * @Description
    4.  * 处理@RabbitMessageContainer@Queue@Exchange@QueueBinding注解,以及创建相应的BeanDefinition注册到容器中;
    5.  * 由spring容器管理,充分利用spring现有机制,自动创建bean实例,尽可能减少硬编码干预spring的流程。
    6.  * 还有一种实现思路是:
    7.  *  自定义一个BeanPostProcessor的实现类,同时实现BeanFactoryAware接口(目的是获取到BeanFactory,用ApplicationContextAware也行,但是BeanFactoryAware更好些);
    8.  *  调用postProcessAfterInitialization方法,拦截Listener并识别注解信息,创建并注册BeanDefinition,调用BeanFactory的getBean方法,创建单例bean对象;
    9.  *  这种方式不仅个性化spring的BeanDefinition的注册,而且还个性化了bean的创建过程,因此不是最优的方式。
    10.  * @Author zhangyuxuan
    11.  * @Date 2023/9/13 15:29
    12.  * @Version 1.0
    13.  */
    14. public class RabbitMqEnhancerBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
    15.     private Environment environment;
    16.     /**
    17.      * 处理@RabbitMessageContainer@Queue@Exchange@QueueBinding注解,以及创建相应的BeanDefinition注册到容器中;
    18.      * 由spring容器管理,充分利用spring现有机制,自动创建bean实例,尽可能减少硬编码干预spring的流程。
    19.      *
    20.      * @param registry
    21.      * @throws BeansException
    22.      */
    23.     @Override
    24.     public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
    25.         for (String beanDefinitionName : registry.getBeanDefinitionNames()) {
    26.             BeanFactory beanFactory = (BeanFactory) registry;
    27.             //获取bean对应的Class
    28.             Class type = beanFactory.getType(beanDefinitionName);
    29.             //获取RabbitMessageContainer注解
    30.             RabbitMessageContainer rabbitMessageContainer = AnnotationUtils.findAnnotation(type, RabbitMessageContainer.class);
    31.             if (rabbitMessageContainer == null) {
    32.                 continue;
    33.             }
    34.             //获取QueueBinding注解
    35.             QueueBinding[] bindings = rabbitMessageContainer.bindings();
    36.             if (bindings.length == 0) {
    37.                 continue;
    38.             }
    39.             //存储queue信息,都是实际消费消息 绑定Listener的队列
    40.             List queueNameList = new ArrayList<>();
    41.             // 这里为什么是定义数组呢,因为同一个Container是可以绑定多个队列的,因此这里是数组;
    42.             for (QueueBinding binding : bindings) {
    43.                 Queue queue = binding.queue();
    44.                 Exchange exchange = binding.exchange();
    45.                 //是否开启延迟消费功能
    46.                 boolean needDelay = queue.delayConsumer();
    47.                 //是否开启重试功能
    48.                 boolean needRetry = rabbitMessageContainer.needRetry();
    49.                 //死信重试路由key
    50.                 String retryRoutingKey = obtainDoConsumeQueue(queue, needDelay) + DL_ROUTING_KEY_SUFFIX;
    51.                 //延迟消费 实际消费的交换器
    52.                 String exchangeForDelay = environment.getProperty("spring.application.name""") + DELAY_EXCHANGE_NAME_SUFFIX;
    53.                 //失败重试 死信交换器
    54.                 String exchangeForDl = environment.getProperty("spring.application.name""") + DL_EXCHANGE_NAME_SUFFIX;
    55.                 //失败重试 重试交换器
    56.                 String exchangeForRetry = environment.getProperty("spring.application.name""") + RETRY_EXCHANGE_NAME_SUFFIX;
    57.                 if (needDelay) {//延迟消费
    58.                     String delayRoutingKey = queue.value() + DELAY_CONSUME_ROUTE_SUFFIX;//用于延迟消费
    59.                     //用户定义的原队列
    60.                     BindingWrapper bindingWrapper = BindingWrapper.generateBinding(binding.value(), binding.key())
    61.                             .buildQueue(queue.value(), obtainMapForDelayQueue(delayRoutingKey, exchangeForDelay, queue.delayTime()), queue.durable(), queue.exclusive(), queue.autoDelete())
    62.                             .buildExchange(exchange.value(), exchange.type(), exchange.durable(), exchange.autoDelete());
    63.                     //注册用户定义的原队列相关配置
    64.                     configRabbitMq(registry, bindingWrapper, true);
    65.                     //实际消费消息的队列
    66.                     BindingWrapper bindingWrapperConsume = BindingWrapper.generateBinding(binding.value() + DELAY_CONSUME_BINDING_SUFFIX, delayRoutingKey)
    67.                             .buildQueue(obtainDoConsumeQueue(queue, true), obtainMapForConsumeQueue(needRetry, retryRoutingKey, exchangeForDl), queue.durable(), queue.exclusive(), queue.autoDelete())
    68.                             .buildExchange(exchangeForDelay, exchange.type(), exchange.durable(), exchange.autoDelete());
    69.                     //注册实际消费消息的队列相关配置,延迟交换器已经在配置中注册
    70.                     configRabbitMq(registry, bindingWrapperConsume, false);
    71.                     //存储queue信息,都是实际消费消息 绑定Listener的队列
    72.                     queueNameList.add(bindingWrapperConsume.getQueueWrapper().getQueueName());
    73.                 } else {//非延迟消费
    74.                     BindingWrapper bindingWrapper = BindingWrapper.generateBinding(binding.value(), binding.key())
    75.                             .buildQueue(queue.value(), obtainMapForConsumeQueue(needRetry, retryRoutingKey, exchangeForDl), queue.durable(), queue.exclusive(), queue.autoDelete())
    76.                             .buildExchange(exchange.value(), exchange.type(), exchange.durable(), exchange.autoDelete());//用户定义的原队列
    77.                     configRabbitMq(registry, bindingWrapper, true);
    78.                     //存储queue信息,都是实际消费消息 绑定Listener的队列
    79.                     queueNameList.add(bindingWrapper.getQueueWrapper().getQueueName());
    80.                 }
    81.                 if (needRetry) {//是否需要重试
    82.                     //死信队列
    83.                     BindingWrapper bindingWrapperDl = BindingWrapper.generateBinding(binding.value() + DL_BINDING_SUFFIX, retryRoutingKey)
    84.                             .buildQueue(queue.value() + DL_QUEUE_SUFFIX, obtainMapForDlQueue(retryRoutingKey, exchangeForRetry, rabbitMessageContainer.retryTimeInterval()), queue.durable(), queue.exclusive(), queue.autoDelete())
    85.                             .buildExchange(exchangeForDl, DIRECT, exchange.durable(), exchange.autoDelete());
    86.                     //注册死信队列相关配置,死信交换器已经在配置中注册
    87.                     configRabbitMq(registry, bindingWrapperDl, false);
    88.                     //重试队列 用于重新消费
    89.                     BindingWrapper bindingWrapperRetry = BindingWrapper.generateBinding(binding.value() + RETRY_BINDING_SUFFIX, retryRoutingKey)
    90.                             .buildQueue(obtainDoConsumeQueue(queue, needDelay), Collections.emptyMap(), queue.durable(), queue.exclusive(), queue.autoDelete())
    91.                             .buildExchange(exchangeForRetry, exchange.type(), exchange.durable(), exchange.autoDelete());
    92.                     // 向容器中注册binding的BeanDefinition,队列复用用户定义的,重试交换器已经在配置中创建
    93.                     registryBinding(registry, bindingWrapperRetry);
    94.                 }
    95.             }
    96.             // 向容器中注册container的BeanDefinition
    97.             registryContainer(registry, beanDefinitionName, rabbitMessageContainer, queueNameList);
    98.         }
    99.     }
    100.     /**
    101.      * 因为延迟消费情况的存在,因此需要获取实际消费队列的逻辑
    102.      * @param queue
    103.      * @param needDelay
    104.      * @return
    105.      */
    106.     private String obtainDoConsumeQueue(Queue queue, boolean needDelay) {
    107.         return needDelay ? queue.value() + DELAY_CONSUME_QUEUE_SUFFIX : queue.value();
    108.     }
    109.     /**
    110.      * 向容器中注册mq的配置,包括queue、exchange、binding
    111.      * @param registry
    112.      * @param bindingWrapper
    113.      */
    114.     private void configRabbitMq(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper, boolean isNeedCreateExchange) {
    115.         // 向容器中注册queue的BeanDefinition
    116.         registryQueue(registry, bindingWrapper);
    117.         // 向容器中注册exchange的BeanDefinition
    118.         if (isNeedCreateExchange) {
    119.             registryExchangeIfNecessary(registry, bindingWrapper);
    120.         }
    121.         // 向容器中注册binding的BeanDefinition
    122.         registryBinding(registry, bindingWrapper);
    123.     }
    124.     /**
    125.      * 向容器中注册container的BeanDefinition
    126.      * @param registry
    127.      * @param beanDefinitionName
    128.      * @param rabbitMessageContainer
    129.      * @param queueNameList
    130.      */
    131.     private void registryContainer(BeanDefinitionRegistry registry, String beanDefinitionName, RabbitMessageContainer rabbitMessageContainer, List queueNameList) {
    132.         ManagedArray managedArray = new ManagedArray("org.springframework.amqp.core.Queue", queueNameList.size());
    133.         for (String queueName : queueNameList) {
    134.             managedArray.add(new RuntimeBeanReference(queueName));
    135.         }
    136.         AbstractBeanDefinition containerBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(SimpleMessageListenerContainer.class)
    137.                 .addConstructorArgReference("shadowConnectionFactory")
    138.                 .addPropertyValue("queues", managedArray)
    139.                 .addPropertyReference("messageListener", beanDefinitionName)
    140.                 .addPropertyValue("acknowledgeMode", rabbitMessageContainer.acknowledgeMode())
    141.                 .addPropertyValue("maxConcurrentConsumers", rabbitMessageContainer.maxConcurrentConsumers())
    142.                 .addPropertyValue("concurrentConsumers", rabbitMessageContainer.concurrentConsumers())
    143.                 .addPropertyValue("prefetchCount", rabbitMessageContainer.prefetchCount())
    144.                 .addPropertyValue("exposeListenerChannel", rabbitMessageContainer.exposeListenerChannel())
    145.                 .getBeanDefinition();
    146.         registry.registerBeanDefinition(rabbitMessageContainer.value(), containerBeanDefinition);
    147.     }
    148.     /**
    149.      * 向容器中注册queue的BeanDefinition
    150.      * @param registry
    151.      * @param bindingWrapper
    152.      */
    153.     private void registryQueue(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
    154.         BindingWrapper.QueueWrapper queueWrapper = bindingWrapper.getQueueWrapper();
    155.         AbstractBeanDefinition queueBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(org.springframework.amqp.core.Queue.class)
    156.                 .addConstructorArgValue(queueWrapper.getQueueName())
    157.                 .addConstructorArgValue(queueWrapper.isDurable())
    158.                 .addConstructorArgValue(queueWrapper.isExclusive())
    159.                 .addConstructorArgValue(queueWrapper.isAutoDelete())
    160.                 .addConstructorArgValue(queueWrapper.getParams())
    161.                 .getBeanDefinition();
    162.         registry.registerBeanDefinition(queueWrapper.getQueueName(), queueBeanDefinition);
    163.     }
    164.     /**
    165.      * 如果有必要,向容器注入交换器
    166.      * @param registry
    167.      * @param bindingWrapper
    168.      */
    169.     private void registryExchangeIfNecessary(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
    170.         // 如果容器中已经被ConfigurationClassPostProcessor添加了同名的Exchange的BeanDefinition,那就不在添加了;
    171.         // 一是兼容项目原有代码已经通过@Bean方式注入了BeanDefinition;
    172.         // 二是Exchange本来原则上就是应该尽可能服用的,所以多个Listener一定会存在使用相同的Exchange的情况;
    173.         if (!registry.containsBeanDefinition(bindingWrapper.getExchangeWrapper().getExchangeName())) {
    174.             registryExchange(registry, bindingWrapper);
    175.         }
    176.     }
    177.     /**
    178.      * 向容器中注册exchange的BeanDefinition
    179.      * @param registry
    180.      * @param bindingWrapper
    181.      */
    182.     private void registryExchange(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
    183.         BindingWrapper.ExchangeWrapper exchangeWrapper = bindingWrapper.getExchangeWrapper();
    184.         AbstractBeanDefinition exchangeBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(this.obtainExchangeType(exchangeWrapper.getType()))
    185.                 .addConstructorArgValue(exchangeWrapper.getExchangeName())
    186.                 .addConstructorArgValue(exchangeWrapper.isDurable())
    187.                 .addConstructorArgValue(exchangeWrapper.isAutoDelete())
    188.                 .getBeanDefinition();
    189.         registry.registerBeanDefinition(exchangeWrapper.getExchangeName(), exchangeBeanDefinition);
    190.     }
    191.     /**
    192.      * 向容器中注册binding的BeanDefinition
    193.      * @param registry
    194.      * @param bindingWrapper
    195.      */
    196.     private void registryBinding(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
    197.         AbstractBeanDefinition bindingBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(org.springframework.amqp.core.Binding.class)
    198.                 .addConstructorArgValue(bindingWrapper.getQueueWrapper().getQueueName())
    199.                 .addConstructorArgValue(Binding.DestinationType.QUEUE)
    200.                 .addConstructorArgValue(bindingWrapper.getExchangeWrapper().getExchangeName())
    201.                 .addConstructorArgValue(bindingWrapper.getKey())
    202.                 .addConstructorArgValue(Collections.emptyMap())
    203.                 .getBeanDefinition();
    204.         registry.registerBeanDefinition(bindingWrapper.getBindingName(), bindingBeanDefinition);
    205.     }
    206.     /**
    207.      * 延迟消费 存储消息的制造延迟效果 的队列 上面的param
    208.      * @return
    209.      */
    210.     private Map obtainMapForDelayQueue(String delayRoutingKey, String exchangeForConsume, long delayTime) {
    211.         Map paramsForDelay = new HashMap<>();
    212.         paramsForDelay.put(X_MESSAGE_TTL_DEFAULT, delayTime == -1 ? TTL_DEFAULT_VALUE : delayTime);//默认10分钟
    213.         paramsForDelay.put(X_DEAD_LETTER_EXCHANGE, exchangeForConsume);//延迟交换器
    214.         paramsForDelay.put(X_DEAD_LETTER_ROUTING_KEY, delayRoutingKey);//延迟消费路由key
    215.         return paramsForDelay;
    216.     }
    217.     /**
    218.      * 和Listener绑定,实际消费消息 的队列 上面的param
    219.      * @return
    220.      */
    221.     private Map obtainMapForConsumeQueue(boolean needRetry, String dlRoutingKey, String exchangeForDl) {
    222.         if (!needRetry) {
    223.             return Collections.emptyMap();
    224.         }
    225.         Map paramsForDl = new HashMap<>();
    226.         paramsForDl.put(X_DEAD_LETTER_EXCHANGE, exchangeForDl);//死信交换器
    227.         paramsForDl.put(X_DEAD_LETTER_ROUTING_KEY, dlRoutingKey);//死信消费路由key
    228.         return paramsForDl;
    229.     }
    230.     /**
    231.      * 重试场景下 死信队列 上面的param
    232.      * @return
    233.      */
    234.     private Map obtainMapForDlQueue(String bindingWrapperForRetry, String exchangeForRetry, long delayTime) {
    235.         Map paramsForOriginal = new HashMap<>();
    236.         paramsForOriginal.put(X_DEAD_LETTER_EXCHANGE, exchangeForRetry);//重试交换器
    237.         paramsForOriginal.put(X_DEAD_LETTER_ROUTING_KEY, bindingWrapperForRetry);//重试消费路由key
    238.         paramsForOriginal.put(X_MESSAGE_TTL_DEFAULT, delayTime == -1 ? TTL_DEFAULT_VALUE : delayTime);//默认10分钟
    239.         return paramsForOriginal;
    240.     }
    241.     /**
    242.      * 根据注解中的属性值,返回对应的交换机类型
    243.      * @param exchangeTypes
    244.      * @return
    245.      */
    246.     private Class obtainExchangeType(String exchangeTypes) {
    247.         switch (exchangeTypes) {
    248.             case DIRECT:
    249.                 return DirectExchange.class;
    250.             case FANOUT:
    251.                 return FanoutExchange.class;
    252.             case HEADERS:
    253.                 return HeadersExchange.class;
    254.             case TOPIC:
    255.             default:
    256.                 return TopicExchange.class;
    257.         }
    258.     }
    259.     @Override
    260.     public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    261.         //do nothing
    262.     }
    263.     @Override
    264.     public void setEnvironment(Environment environment) {
    265.         this.environment = environment;
    266.     }
    267. }

    MQ组件配置关系图

  • 相关阅读:
    Android中使用AlertDialog创建对话框
    10.5汇编语言整理
    11.3面试相关
    HBase基础
    RFID基础知识,都学会了吗
    撸视频号收益这个副业靠谱吗?
    静态HTML CSS个人网页作业源代码 (人物介绍)
    [面试直通版]操作系统之锁、同步与通信(下)
    《软件方法》第1章2023版连载(06)自测题
    JavaScript对象
  • 原文地址:https://blog.csdn.net/weixin_44182586/article/details/133313326