RabbitMessageContainer注解 的主要作用就是 替换掉@Configuration配置类中的各种@Bean配置;
采用注解的方式可以让我们 固化配置,降低代码编写复杂度、减少配置错误情况的发生,提升编码调试的效率、提高业务的可用性。
为什么说“提高业务的可用性”呢?因为,组件默认配置了死信队列机制,当消费失败的时候,将异常抛出即可重试,避免因为没有配置死信队列而导致消息丢失。(如果继承AbstractJdkSerializeListener/AbstractJsonSerializeListener可以在重试一定次数后将消息落库并且丢弃)
该组件使用Spring Boot的自动装配能力,只需要引入pom依赖即可完成接入。
-
com.ccbscf -
ccbscf-biz-enhancer-rabbitmq-starter -
1.0.1-SNAPSHOT
简单来说,以前@Bean注入方式常用的能力,这个组件都支持,以下是具体注解信息及属性配置:
- /**
- * 向spring中注入SimpleMessageListenerContainer容器
- * 暂时只对Container的acknowledgeMode、exposeListenerChannel、prefetchCount、concurrentConsumers、maxConcurrentConsumers提供了赋值的扩展,如果需要其他的字段赋值,需要升级组件
- */
- @Target({ElementType.TYPE})
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface RabbitMessageContainer {
- /**
- * container的name,向spring容器注入bean
- * @return
- */
- String value();
- /**
- * 定义绑定关系,队列、交换器、路由key的定义都在这里面
- * 这里为什么是定义数组呢,因为同一个Container是可以绑定多个队列的,因此这里是数组;
- * @return
- */
- QueueBinding[] bindings();
-
- /**
- * @return
- * @see AbstractMessageListenerContainer#setAcknowledgeMode(org.springframework.amqp.core.AcknowledgeMode)
- */
- AcknowledgeMode acknowledgeMode() default AcknowledgeMode.MANUAL;
-
- /**
- * @return
- * @see AbstractMessageListenerContainer#setExposeListenerChannel(boolean)
- */
- boolean exposeListenerChannel() default true;
-
- /**
- * @return
- * @see SimpleMessageListenerContainer#setPrefetchCount(int)
- */
- int prefetchCount() default 5;
-
- /**
- * @return
- * @see SimpleMessageListenerContainer#setConcurrentConsumers(int)
- */
- int concurrentConsumers() default 1;
-
- /**
- * @return
- * @see SimpleMessageListenerContainer#setMaxConcurrentConsumers(int)
- */
- int maxConcurrentConsumers() default 1;
-
- /**
- * 失败 抛出异常 捕捉到异常以后 是否进行重试 默认重试
- * @return
- */
- boolean needRetry() default true;
-
- /**
- * 自定义的Listener维度的重试次数上限
- * @return
- */
- int customerRetriesLimitForListener() default -1;
-
- /**
- * 重试时间间隔
- * @return
- */
- long retryTimeInterval() default -1;
- }
上面是@RabbitMessageContainer注解的源代码;原本@Bean中SimpleMessageListenerContainer常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。
除了实现@Bean方式常用字段,另外增加了以下几个功能字段:
为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:
- public static SimpleMessageListenerContainer buildSimpleMessageListenerContainer(Queue queue, ConnectionFactory connectionFactory, Object messageListener) {
- SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
- simpleMessageListenerContainer.setQueues(queue);
- simpleMessageListenerContainer.setMaxConcurrentConsumers(1);
- simpleMessageListenerContainer.setConcurrentConsumers(1);
- simpleMessageListenerContainer.setPrefetchCount(5);
- simpleMessageListenerContainer.setExposeListenerChannel(true);
- simpleMessageListenerContainer.setMessageListener(messageListener);
- simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- return simpleMessageListenerContainer;
- }
- @Target({})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface QueueBinding {
-
- /**
- * 绑定关系的name,主要用于向容器中注入bean的名称
- * @return
- */
- String value();
-
- /**
- * @return the queue.
- */
- Queue queue();
-
- /**
- * @return the exchange.
- */
- Exchange exchange();
-
- /**
- * @return the routing key or pattern for the binding.
- */
- String key() default "";
- }
上面是@QueueBinding注解的源代码;原本@Bean中Binding常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。
为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:
- @Bean
- public Binding sendSuperviseBinding(TopicExchange approveDocDatumTopicExchange) {
- return BindingBuilder.bind(sendSuperviseQueue()).to(approveDocDatumTopicExchange).with(DOC_DATUM_TOPIC_APPROVE_ROUTING_KEY);
- }
- @Target({})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface Queue {
- /**
- * @return the queue name or "" for a generated queue name (default).
- */
- String value();
-
- /**
- * @return true if the queue is to be declared as durable.
- */
- boolean durable() default true;
-
- /**
- * @return true if the queue is to be declared as exclusive.
- */
- boolean exclusive() default false;
-
- /**
- * @return true if the queue is to be declared as auto-delete.
- */
- boolean autoDelete() default false;
-
- /**
- * 是否延迟队列
- * @return
- */
- boolean delayConsumer() default false;
-
- /**
- * delayConsumer为true的情况下该字段才会生效,单位:ms
- * 如果设置了delayConsumer=true延迟队消费开启,但是未设置delayTime延迟消费时间,默认值是10分钟
- * @return
- */
- long delayTime() default -1;
- }
上面是@Queue注解的源代码;原本@Bean中Queue常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。
除了实现@Bean方式常用字段,另外增加了以下几个功能字段:
为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:
new Queue(queueName, true, false, false, params)
- @Target({})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface Exchange {
-
- /**
- * @return the exchange name.
- */
- String value();
-
- /**
- * The exchange type - only DIRECT, FANOUT TOPIC, and HEADERS exchanges are supported.
- * @return the exchange type.
- */
- String type() default ExchangeTypes.TOPIC;
-
- /**
- * @return true if the exchange is to be declared as durable.
- */
- boolean durable() default true;
-
- /**
- * @return true if the exchange is to be declared as auto-delete.
- */
- boolean autoDelete() default false;
- }
上面是@Exchange注解的源代码;原本@Bean中Exchange常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。
为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:
- @Bean
- public TopicExchange bizCcbDefaultTopicExchange() {
- return new TopicExchange(BIZ_CCB_DEFAULT_TOPIC_EXCHANGE, true, false);
- }
其实,实现思路非常简单,原有方式:通过开发者定义@Bean配置向spring容器中添加BeanDefinition并生成单例Bean;新的方式:根据开发者配置的注解信息集中式的生成BeanDefinition并注册到spring容器即可。
至于绑定关系、队列、交换器向MQ消息中心注册的过程不受任何影响,因为本来@Bean就是在向容器注入bean而已;
核心代码都在这一个RabbitMqEnhancerBeanDefinitionRegistry类,这个类实现了BeanDefinitionRegistryPostProcessor接口,当然BeanDefinitionRegistryPostProcessor也继承了BeanFactoryPostProcessor接口,只不过我们只使用了BeanDefinitionRegistryPostProcessor具有的特性,向容器中注入BeanDefinition信息;至于spring生成单例bean的过程,我们不去干预还是交给spring来自行完成。
从@RabbitMessageContainer、@Queue、@Exchange、@QueueBinding注解中获取信息,创建相应的BeanDefinition并注册到容器中,由spring容器管理,充分利用spring现有机制,自动创建bean实例,尽可能减少硬编码干预spring的流程。
源代码如下:
- /**
- * @ClassName RabbitMqEnhancerBeanDefinitionRegistry
- * @Description
- * 处理@RabbitMessageContainer、@Queue、@Exchange、@QueueBinding注解,以及创建相应的BeanDefinition注册到容器中;
- * 由spring容器管理,充分利用spring现有机制,自动创建bean实例,尽可能减少硬编码干预spring的流程。
- * 还有一种实现思路是:
- * 自定义一个BeanPostProcessor的实现类,同时实现BeanFactoryAware接口(目的是获取到BeanFactory,用ApplicationContextAware也行,但是BeanFactoryAware更好些);
- * 调用postProcessAfterInitialization方法,拦截Listener并识别注解信息,创建并注册BeanDefinition,调用BeanFactory的getBean方法,创建单例bean对象;
- * 这种方式不仅个性化spring的BeanDefinition的注册,而且还个性化了bean的创建过程,因此不是最优的方式。
- * @Author zhangyuxuan
- * @Date 2023/9/13 15:29
- * @Version 1.0
- */
- public class RabbitMqEnhancerBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
-
- private Environment environment;
-
- /**
- * 处理@RabbitMessageContainer、@Queue、@Exchange、@QueueBinding注解,以及创建相应的BeanDefinition注册到容器中;
- * 由spring容器管理,充分利用spring现有机制,自动创建bean实例,尽可能减少硬编码干预spring的流程。
- *
- * @param registry
- * @throws BeansException
- */
- @Override
- public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
- for (String beanDefinitionName : registry.getBeanDefinitionNames()) {
- BeanFactory beanFactory = (BeanFactory) registry;
- //获取bean对应的Class
- Class> type = beanFactory.getType(beanDefinitionName);
- //获取RabbitMessageContainer注解
- RabbitMessageContainer rabbitMessageContainer = AnnotationUtils.findAnnotation(type, RabbitMessageContainer.class);
- if (rabbitMessageContainer == null) {
- continue;
- }
- //获取QueueBinding注解
- QueueBinding[] bindings = rabbitMessageContainer.bindings();
- if (bindings.length == 0) {
- continue;
- }
- //存储queue信息,都是实际消费消息 绑定Listener的队列
- List
queueNameList = new ArrayList<>(); - // 这里为什么是定义数组呢,因为同一个Container是可以绑定多个队列的,因此这里是数组;
- for (QueueBinding binding : bindings) {
- Queue queue = binding.queue();
- Exchange exchange = binding.exchange();
- //是否开启延迟消费功能
- boolean needDelay = queue.delayConsumer();
- //是否开启重试功能
- boolean needRetry = rabbitMessageContainer.needRetry();
- //死信重试路由key
- String retryRoutingKey = obtainDoConsumeQueue(queue, needDelay) + DL_ROUTING_KEY_SUFFIX;
- //延迟消费 实际消费的交换器
- String exchangeForDelay = environment.getProperty("spring.application.name", "") + DELAY_EXCHANGE_NAME_SUFFIX;
- //失败重试 死信交换器
- String exchangeForDl = environment.getProperty("spring.application.name", "") + DL_EXCHANGE_NAME_SUFFIX;
- //失败重试 重试交换器
- String exchangeForRetry = environment.getProperty("spring.application.name", "") + RETRY_EXCHANGE_NAME_SUFFIX;
- if (needDelay) {//延迟消费
- String delayRoutingKey = queue.value() + DELAY_CONSUME_ROUTE_SUFFIX;//用于延迟消费
- //用户定义的原队列
- BindingWrapper bindingWrapper = BindingWrapper.generateBinding(binding.value(), binding.key())
- .buildQueue(queue.value(), obtainMapForDelayQueue(delayRoutingKey, exchangeForDelay, queue.delayTime()), queue.durable(), queue.exclusive(), queue.autoDelete())
- .buildExchange(exchange.value(), exchange.type(), exchange.durable(), exchange.autoDelete());
- //注册用户定义的原队列相关配置
- configRabbitMq(registry, bindingWrapper, true);
- //实际消费消息的队列
- BindingWrapper bindingWrapperConsume = BindingWrapper.generateBinding(binding.value() + DELAY_CONSUME_BINDING_SUFFIX, delayRoutingKey)
- .buildQueue(obtainDoConsumeQueue(queue, true), obtainMapForConsumeQueue(needRetry, retryRoutingKey, exchangeForDl), queue.durable(), queue.exclusive(), queue.autoDelete())
- .buildExchange(exchangeForDelay, exchange.type(), exchange.durable(), exchange.autoDelete());
- //注册实际消费消息的队列相关配置,延迟交换器已经在配置中注册
- configRabbitMq(registry, bindingWrapperConsume, false);
- //存储queue信息,都是实际消费消息 绑定Listener的队列
- queueNameList.add(bindingWrapperConsume.getQueueWrapper().getQueueName());
- } else {//非延迟消费
- BindingWrapper bindingWrapper = BindingWrapper.generateBinding(binding.value(), binding.key())
- .buildQueue(queue.value(), obtainMapForConsumeQueue(needRetry, retryRoutingKey, exchangeForDl), queue.durable(), queue.exclusive(), queue.autoDelete())
- .buildExchange(exchange.value(), exchange.type(), exchange.durable(), exchange.autoDelete());//用户定义的原队列
- configRabbitMq(registry, bindingWrapper, true);
- //存储queue信息,都是实际消费消息 绑定Listener的队列
- queueNameList.add(bindingWrapper.getQueueWrapper().getQueueName());
- }
- if (needRetry) {//是否需要重试
- //死信队列
- BindingWrapper bindingWrapperDl = BindingWrapper.generateBinding(binding.value() + DL_BINDING_SUFFIX, retryRoutingKey)
- .buildQueue(queue.value() + DL_QUEUE_SUFFIX, obtainMapForDlQueue(retryRoutingKey, exchangeForRetry, rabbitMessageContainer.retryTimeInterval()), queue.durable(), queue.exclusive(), queue.autoDelete())
- .buildExchange(exchangeForDl, DIRECT, exchange.durable(), exchange.autoDelete());
- //注册死信队列相关配置,死信交换器已经在配置中注册
- configRabbitMq(registry, bindingWrapperDl, false);
- //重试队列 用于重新消费
- BindingWrapper bindingWrapperRetry = BindingWrapper.generateBinding(binding.value() + RETRY_BINDING_SUFFIX, retryRoutingKey)
- .buildQueue(obtainDoConsumeQueue(queue, needDelay), Collections.emptyMap(), queue.durable(), queue.exclusive(), queue.autoDelete())
- .buildExchange(exchangeForRetry, exchange.type(), exchange.durable(), exchange.autoDelete());
- // 向容器中注册binding的BeanDefinition,队列复用用户定义的,重试交换器已经在配置中创建
- registryBinding(registry, bindingWrapperRetry);
- }
- }
- // 向容器中注册container的BeanDefinition
- registryContainer(registry, beanDefinitionName, rabbitMessageContainer, queueNameList);
- }
- }
-
- /**
- * 因为延迟消费情况的存在,因此需要获取实际消费队列的逻辑
- * @param queue
- * @param needDelay
- * @return
- */
- private String obtainDoConsumeQueue(Queue queue, boolean needDelay) {
- return needDelay ? queue.value() + DELAY_CONSUME_QUEUE_SUFFIX : queue.value();
- }
-
- /**
- * 向容器中注册mq的配置,包括queue、exchange、binding
- * @param registry
- * @param bindingWrapper
- */
- private void configRabbitMq(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper, boolean isNeedCreateExchange) {
- // 向容器中注册queue的BeanDefinition
- registryQueue(registry, bindingWrapper);
- // 向容器中注册exchange的BeanDefinition
- if (isNeedCreateExchange) {
- registryExchangeIfNecessary(registry, bindingWrapper);
- }
- // 向容器中注册binding的BeanDefinition
- registryBinding(registry, bindingWrapper);
- }
-
- /**
- * 向容器中注册container的BeanDefinition
- * @param registry
- * @param beanDefinitionName
- * @param rabbitMessageContainer
- * @param queueNameList
- */
- private void registryContainer(BeanDefinitionRegistry registry, String beanDefinitionName, RabbitMessageContainer rabbitMessageContainer, List
queueNameList) { - ManagedArray managedArray = new ManagedArray("org.springframework.amqp.core.Queue", queueNameList.size());
- for (String queueName : queueNameList) {
- managedArray.add(new RuntimeBeanReference(queueName));
- }
- AbstractBeanDefinition containerBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(SimpleMessageListenerContainer.class)
- .addConstructorArgReference("shadowConnectionFactory")
- .addPropertyValue("queues", managedArray)
- .addPropertyReference("messageListener", beanDefinitionName)
- .addPropertyValue("acknowledgeMode", rabbitMessageContainer.acknowledgeMode())
- .addPropertyValue("maxConcurrentConsumers", rabbitMessageContainer.maxConcurrentConsumers())
- .addPropertyValue("concurrentConsumers", rabbitMessageContainer.concurrentConsumers())
- .addPropertyValue("prefetchCount", rabbitMessageContainer.prefetchCount())
- .addPropertyValue("exposeListenerChannel", rabbitMessageContainer.exposeListenerChannel())
- .getBeanDefinition();
- registry.registerBeanDefinition(rabbitMessageContainer.value(), containerBeanDefinition);
- }
-
- /**
- * 向容器中注册queue的BeanDefinition
- * @param registry
- * @param bindingWrapper
- */
- private void registryQueue(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
- BindingWrapper.QueueWrapper queueWrapper = bindingWrapper.getQueueWrapper();
- AbstractBeanDefinition queueBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(org.springframework.amqp.core.Queue.class)
- .addConstructorArgValue(queueWrapper.getQueueName())
- .addConstructorArgValue(queueWrapper.isDurable())
- .addConstructorArgValue(queueWrapper.isExclusive())
- .addConstructorArgValue(queueWrapper.isAutoDelete())
- .addConstructorArgValue(queueWrapper.getParams())
- .getBeanDefinition();
- registry.registerBeanDefinition(queueWrapper.getQueueName(), queueBeanDefinition);
- }
-
- /**
- * 如果有必要,向容器注入交换器
- * @param registry
- * @param bindingWrapper
- */
- private void registryExchangeIfNecessary(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
- // 如果容器中已经被ConfigurationClassPostProcessor添加了同名的Exchange的BeanDefinition,那就不在添加了;
- // 一是兼容项目原有代码已经通过@Bean方式注入了BeanDefinition;
- // 二是Exchange本来原则上就是应该尽可能服用的,所以多个Listener一定会存在使用相同的Exchange的情况;
- if (!registry.containsBeanDefinition(bindingWrapper.getExchangeWrapper().getExchangeName())) {
- registryExchange(registry, bindingWrapper);
- }
- }
-
- /**
- * 向容器中注册exchange的BeanDefinition
- * @param registry
- * @param bindingWrapper
- */
- private void registryExchange(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
- BindingWrapper.ExchangeWrapper exchangeWrapper = bindingWrapper.getExchangeWrapper();
- AbstractBeanDefinition exchangeBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(this.obtainExchangeType(exchangeWrapper.getType()))
- .addConstructorArgValue(exchangeWrapper.getExchangeName())
- .addConstructorArgValue(exchangeWrapper.isDurable())
- .addConstructorArgValue(exchangeWrapper.isAutoDelete())
- .getBeanDefinition();
- registry.registerBeanDefinition(exchangeWrapper.getExchangeName(), exchangeBeanDefinition);
- }
-
- /**
- * 向容器中注册binding的BeanDefinition
- * @param registry
- * @param bindingWrapper
- */
- private void registryBinding(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
- AbstractBeanDefinition bindingBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(org.springframework.amqp.core.Binding.class)
- .addConstructorArgValue(bindingWrapper.getQueueWrapper().getQueueName())
- .addConstructorArgValue(Binding.DestinationType.QUEUE)
- .addConstructorArgValue(bindingWrapper.getExchangeWrapper().getExchangeName())
- .addConstructorArgValue(bindingWrapper.getKey())
- .addConstructorArgValue(Collections.
emptyMap()) - .getBeanDefinition();
- registry.registerBeanDefinition(bindingWrapper.getBindingName(), bindingBeanDefinition);
- }
-
- /**
- * 延迟消费 存储消息的制造延迟效果 的队列 上面的param
- * @return
- */
- private Map
obtainMapForDelayQueue(String delayRoutingKey, String exchangeForConsume, long delayTime) { - Map
paramsForDelay = new HashMap<>(); - paramsForDelay.put(X_MESSAGE_TTL_DEFAULT, delayTime == -1 ? TTL_DEFAULT_VALUE : delayTime);//默认10分钟
- paramsForDelay.put(X_DEAD_LETTER_EXCHANGE, exchangeForConsume);//延迟交换器
- paramsForDelay.put(X_DEAD_LETTER_ROUTING_KEY, delayRoutingKey);//延迟消费路由key
- return paramsForDelay;
- }
-
- /**
- * 和Listener绑定,实际消费消息 的队列 上面的param
- * @return
- */
- private Map
obtainMapForConsumeQueue(boolean needRetry, String dlRoutingKey, String exchangeForDl) { - if (!needRetry) {
- return Collections.emptyMap();
- }
- Map
paramsForDl = new HashMap<>(); - paramsForDl.put(X_DEAD_LETTER_EXCHANGE, exchangeForDl);//死信交换器
- paramsForDl.put(X_DEAD_LETTER_ROUTING_KEY, dlRoutingKey);//死信消费路由key
- return paramsForDl;
- }
-
- /**
- * 重试场景下 死信队列 上面的param
- * @return
- */
- private Map
obtainMapForDlQueue(String bindingWrapperForRetry, String exchangeForRetry, long delayTime) { - Map
paramsForOriginal = new HashMap<>(); - paramsForOriginal.put(X_DEAD_LETTER_EXCHANGE, exchangeForRetry);//重试交换器
- paramsForOriginal.put(X_DEAD_LETTER_ROUTING_KEY, bindingWrapperForRetry);//重试消费路由key
- paramsForOriginal.put(X_MESSAGE_TTL_DEFAULT, delayTime == -1 ? TTL_DEFAULT_VALUE : delayTime);//默认10分钟
- return paramsForOriginal;
- }
-
- /**
- * 根据注解中的属性值,返回对应的交换机类型
- * @param exchangeTypes
- * @return
- */
- private Class> obtainExchangeType(String exchangeTypes) {
- switch (exchangeTypes) {
- case DIRECT:
- return DirectExchange.class;
- case FANOUT:
- return FanoutExchange.class;
- case HEADERS:
- return HeadersExchange.class;
- case TOPIC:
- default:
- return TopicExchange.class;
- }
- }
-
- @Override
- public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
- //do nothing
- }
-
- @Override
- public void setEnvironment(Environment environment) {
- this.environment = environment;
- }
- }
