• Spring Kafka—— KafkaListenerEndpointRegistry 隐式注册分析


    由于我想在项目中实现基于 Spring kafka 动态连接 Kafka 服务,指定监听 Topic 并控制消费程序的启动和停止这样一个功能,所以就大概的了解了一下 Spring Kafka 的几个重要的类的概念,内容如下:

    1. ConsumerFactory
      • 作用:负责创建 Kafka 消费者的实例。ConsumerFactory 是一个简单的工厂类,用于封装消费者的配置(如bootstrap servers, key deserializer, value deserializer等)并生成Consumer实例。
      • 用法:通常在Spring配置类中定义,并通过依赖注入提供给KafkaListenerContainerFactory
    2. ConcurrentKafkaListenerContainerFactory
      • 作用:这个工厂类用于创建 ConcurrentMessageListenerContainer 实例,该容器管理多个Kafka MessageListenerContainer来提供并发消息消费。
      • 特点:可以设置并发消费的数量,即同时运行的MessageListenerContainer的数量。
        支持消息过滤、错误处理和事务管理。
      • 用法:在Spring配置类中定义,并设置其ConsumerFactory和其他相关配置。然后,可以通过@KafkaListener注解直接使用,Spring会自动使用这个工厂来创建监听器。
    3. KafkaListenerEndpointRegistry
      • 作用:这是一个管理类,用于管理应用中所有由@KafkaListener注解创建的消息监听器容器。
      • 特点:提供了启动和停止监听器的方法,可以在运行时控制监听器。
        可以用来查询当前所有注册的监听器的状态。
      • 用法:通常自动配置,可以通过自动注入到任何Spring管理的Bean中,用于运行时管理监听器。
    4. KafkaTemplate
      • 作用:这是一个高级抽象,用于生产消息到Kafka主题。
      • 特点:提供同步和异步发送消息的方法。
        支持事务消息发送。
      • 用法:定义在Spring配置类中,注入生产者工厂ProducerFactory,并用于应用中的消息发送。
    5. @KafkaListener
      作用:注解用于标记方法以作为Kafka消息的监听器,这些方法会自动被Spring容器管理,并在有新消息时触发。
      特点:
      可以指定主题、分区和消费组。
      支持并发消费。
      用法:放在组件的方法上,方法参数可以灵活地映射消息的key、value、headers等。

    从上面的内容可以看到,KafkaListenerEndpointRegistry 这个类是管理消息监听容器的,并提供了启动和停止监听器的方法,于是我就想创建这个类来完成我的需求功能。当我直接写如下内容时:

    @Component
    public class KafkaConfig {
    
        @Autowired
        private KafkaListenerEndpointRegistry registry;
    
        @PostConstruct
        public void init() {
            System.out.println(registry);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    IDEA提示了 Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found. 但是我启动 SpringBoot 项目却没有报错 :
    在这里插入图片描述
    我在我的项目中是没有加 @EnableKafka 这样的注解的,代码如下:

    @SpringBootApplication
    public class SpringKafkaExampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringKafkaExampleApplication.class, args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    引入的依赖:

    <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.kafkagroupId>
                <artifactId>spring-kafkaartifactId>
            dependency>
    
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <optional>trueoptional>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
            <dependency>
                <groupId>org.springframework.kafkagroupId>
                <artifactId>spring-kafka-testartifactId>
                <scope>testscope>
            dependency>
        dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    于是我就比较好奇,项目启动的时候是在什么地方声明了 KafkaListenerEndpointRegistry 这个 bean 的。

    KafkaListenerEndpointRegistry 隐式注册分析

    SpringBoot 对于 kafka 有如下的自动配置:

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(EnableKafka.class)
    class KafkaAnnotationDrivenConfiguration {
    
    	private final KafkaProperties properties;
    
    	private final RecordMessageConverter recordMessageConverter;
    
    	private final RecordFilterStrategy<Object, Object> recordFilterStrategy;
    
    	private final BatchMessageConverter batchMessageConverter;
    
    	private final KafkaTemplate<Object, Object> kafkaTemplate;
    
    	private final KafkaAwareTransactionManager<Object, Object> transactionManager;
    
    	private final ConsumerAwareRebalanceListener rebalanceListener;
    
    	private final CommonErrorHandler commonErrorHandler;
    
    	private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
    
    	private final RecordInterceptor<Object, Object> recordInterceptor;
    
    	KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
    			ObjectProvider<RecordMessageConverter> recordMessageConverter,
    			ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
    			ObjectProvider<BatchMessageConverter> batchMessageConverter,
    			ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
    			ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
    			ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
    			ObjectProvider<CommonErrorHandler> commonErrorHandler,
    			ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
    			ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
    		this.properties = properties;
    		this.recordMessageConverter = recordMessageConverter.getIfUnique();
    		this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
    		this.batchMessageConverter = batchMessageConverter
    			.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));
    		this.kafkaTemplate = kafkaTemplate.getIfUnique();
    		this.transactionManager = kafkaTransactionManager.getIfUnique();
    		this.rebalanceListener = rebalanceListener.getIfUnique();
    		this.commonErrorHandler = commonErrorHandler.getIfUnique();
    		this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
    		this.recordInterceptor = recordInterceptor.getIfUnique();
    	}
    
    	@Bean
    	@ConditionalOnMissingBean
    	ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
    		ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
    		configurer.setKafkaProperties(this.properties);
    		configurer.setBatchMessageConverter(this.batchMessageConverter);
    		configurer.setRecordMessageConverter(this.recordMessageConverter);
    		configurer.setRecordFilterStrategy(this.recordFilterStrategy);
    		configurer.setReplyTemplate(this.kafkaTemplate);
    		configurer.setTransactionManager(this.transactionManager);
    		configurer.setRebalanceListener(this.rebalanceListener);
    		configurer.setCommonErrorHandler(this.commonErrorHandler);
    		configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
    		configurer.setRecordInterceptor(this.recordInterceptor);
    		return configurer;
    	}
    
    	@Bean
    	@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    	ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
    			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    			ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
    		ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    		configurer.configure(factory, kafkaConsumerFactory
    			.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
    		return factory;
    	}
    
    	@Configuration(proxyBeanMethods = false)
    	@EnableKafka
    	@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    	static class EnableKafkaConfiguration {
    
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    可以看到这个配置类里面有一个静态的内部类 EnableKafkaConfiguration 该类上声明了 @EnableKafka 注解,也就是说内部静态类EnableKafkaConfiguration使用了@EnableKafka注解,并且通过@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)确保如果Spring上下文中缺少相应的Bean,则自动激活@EnableKafka功能。这意味着,即便你没有在你的应用配置中显式添加@EnableKafka,这个内部类也可以根据条件自动注册所需的Bean,从而启用Kafka的支持。

    @EnableKafka 定义如下:

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(KafkaListenerConfigurationSelector.class)
    public @interface EnableKafka {
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这个注解的使用导致了KafkaListenerConfigurationSelector的激活,其源码如下:

    @Order
    public class KafkaListenerConfigurationSelector implements DeferredImportSelector {
    
    	@Override
    	public String[] selectImports(AnnotationMetadata importingClassMetadata) {
    		return new String[] { KafkaBootstrapConfiguration.class.getName() };
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    上面的代码中 DeferredImportSelector是Spring框架中一个特殊的接口,它继承自ImportSelector。它主要用于处理配置类的导入,允许更细致地控制配置类的加载顺序。这个接口特别适用于那些依赖于由Spring容器中其他Bean或配置动态决定的配置。
    KafkaListenerConfigurationSelector 这个类实现了DeferredImportSelector并通过selectImports方法返回了一个配置类名称的数组。这个方法指定了当Spring处理到这个选择器时,它应该导入KafkaBootstrapConfiguration类。

    KafkaBootstrapConfiguration 内容如下:

    public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
    
    	@Override
    	public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    		if (!registry.containsBeanDefinition(
    				KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
    
    			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
    					new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
    		}
    
    		if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
    			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
    					new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
    		}
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    KafkaBootstrapConfiguration 是一个实现了ImportBeanDefinitionRegistrar接口的类,主要用于程序化地注册Bean定义到Spring的ApplicationContext中。通过实现ImportBeanDefinitionRegistrar接口,这个类可以在Spring的配置阶段动态地添加Bean定义。

    在这个特定的实现中,KafkaBootstrapConfiguration检查特定的Kafka相关Bean(如KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME和KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)是否已经注册。如果这些Bean尚未注册,它会使用RootBeanDefinition手动注册这些Bean到Spring容器中。

    RootBeanDefinition 的功能

    RootBeanDefinition是Spring框架中用于定义Bean的一个核心类。它是BeanDefinition接口的一个直接实现,提供了一种配置Spring管理的Bean的方式,包括Bean的类类型、生命周期回调、依赖信息等。

    • Bean配置的详细定义:RootBeanDefinition允许开发者详细定义Bean的创建细节,如构造函数参数、属性值、初始化方法、销毁方法等。
    • 高级功能:它还支持更复杂的配置,如懒加载、自动装配模式、作用域和其他高级特性。
    • 程序化Bean注册:通过使用RootBeanDefinition,开发者可以在运行时动态地注册Bean,这对于条件配置或需要响应不同配置环境的高级用途尤为重要。

    KafkaBootstrapConfiguration类中,使用RootBeanDefinition来创建和注册KafkaListenerAnnotationBeanPostProcessorKafkaListenerEndpointRegistry类的实例,这些是设置和管理Kafka消息监听器所必需的。

    之后在AbstractBeanFactory会根据 beanName 获取到了 RootBeanDefinition 如下图所示:
    在这里插入图片描述
    然后在如下所示的位置:
    在这里插入图片描述
    程序创建了 beanName 为 org.springframework.kafka.config.internalKafkaListenerEndpointRegistry 的实例,具体创建实例的位置如下:
    在这里插入图片描述
    从调试中可以看到此处实例化了 KafkaListenerEndpointRegistry
    所以当我们 springboot 项目引入了

     <dependency>
         <groupId>org.springframework.kafkagroupId>
         <artifactId>spring-kafkaartifactId>
     dependency>
    
    • 1
    • 2
    • 3
    • 4

    依赖后,即使我们不显示的声明 @EnableKafka 程序也会进行初始化相应的配置。

    总结

    当Spring Boot项目中引入Spring Kafka依赖后,即使我们没有显式声明@EnableKafka,系统仍会自动进行相应的配置。因此,在项目中尝试注入KafkaListenerEndpointRegistry时,尽管IDE可能会提示“Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found.”,项目依然能够正常启动。这是因为KafkaListenerEndpointRegistry在Spring Kafka的自动配置过程中已被隐式注册。

  • 相关阅读:
    springboot昆明学院档案管理系统毕业设计源码311758
    【Numpy总结】第三节:Numpy创建数组
    【AI副业指南】用AI做心理测试图文号,单月稳赚7000+(附详细教程)
    Ubuntu18.04遇到的nodejs的坑记录
    JavaEE企业开发新技术1
    如何在 Spring Security 中自定义权限表达式
    docker 部署问题集锦
    ☕Java 面向对象进阶内容
    DF-GAN实验复现——复现DFGAN详细步骤 及使用MobaXtem实现远程端口到本机端口的转发查看Tensorboard
    jsp九大内置对象
  • 原文地址:https://blog.csdn.net/lt5227/article/details/138085715