• Kafka多生产者消费者自动配置


    背景

    项目中不同的业务可能会使用多个kafka,按默认的Kafka配置,最多是支持消费者和生产者使用不同的Kafka,如果两个生产者使用不同的Kafka则需要自定义配置,生成对应的bean。

    解决方案

    多生产者,多消费者,使用不同的前缀来区分,根据前缀来区分配置,加载配置,实例化对应前缀的KafkaProperties kafkaListenerContainerFactory KafkaTemplate ,每个bean的名称都是带前缀的,使用的时候,按照需要注入对应的bean。

    YML配置

    spring:
      kafka:
        product:
          bootstrap-servers: 55.1.40.231:9091,55.6.70.231:9091,55.5.70.231:9091
          properties:
            sasl:
              mechanism: PLAIN
              jaas:
                config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="xxxx";
            security:
              protocol: SASL_PLAINTEXT
          producer:
            retries: 0
            acks: -1
            batch-size: 16384
            linger-ms: 0
            buffer-memory: 33554432
          consumer:
            group-id: consumer-group-id
            enable-auto-commit: true
            auto-commit-interval-ms: 1000
            auto-offset-reset: latest
            session-timeout-ms: 120000 
            request-timeout-ms: 180000
        order:
          bootstrap-servers: 55.10.33.132:9091,55.10.33.132:9092,55.10.33.132:9093,55.10.33.132:9094,55.10.33.132:9095,55.10.33.132:9096,55.10.33.132:9097,55.10.33.132:9098,55.10.33.132:9099,55.10.33.132:9100
          properties:
            sasl:
              mechanism: PLAIN
              jaas:
                config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user_order" password="xxxxxxx";
            security:
              protocol: SASL_PLAINTEXT
          producer:
            retries: 3
            acks: -1
            batch-size: 16384
            linger-ms: 0
            buffer-memory: 33554432
          consumer:
            group-id: order-migration
            enable-auto-commit: true
            auto-commit-interval-ms: 1000
            auto-offset-reset: latest
            session-timeout-ms: 120000
            request-timeout-ms: 180000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    自定义KafkaProperties

    使用KafkaProperties接收配置,但是需要修改下前缀,但是KafkaProperties源码改不了,新写一个类继承KafkaProperties

    @Component
    @Primary
    @ConfigurationProperties(prefix = "spring.kafka.order")
    public class OrderKafkaProperties extends KafkaProperties{
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    如果没有Kafka默认配置,Kafka会自动实例化默认的KafkaProperties,如果有多个KafkaProperties实例,需要指定一个首选的bean,否则KafkaAnnotationDrivenConfiguration类中构造函数会报错。

    所以在其中一个加上@Primary注解

    KafkaTemplate和KafkaListenerContainerFactory配置

    有了KafkaProperties就可以生成KafkaTemplateKafkaListenerContainerFactory实例

    @Configuration
    public class KafkaConfig {
    
        @Autowired
        private OrderKafkaProperties orderKafkaProperties;
    
        @Bean("orderKafkaTemplate")
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
        private ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        private Map<String, Object> producerConfigs() {
            return contractKafkaProperties.buildProducerProperties();
        }
    
    
        @Bean("orderKafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(10);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }
    
        private ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        private Map<String, Object> consumerConfigs() {
            return contractKafkaProperties.buildConsumerProperties();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    这样就可以在其他地方直接使用了,生产者就直接@Autowired orderKafkaTemplate,如果是消费者,直接在@KafkaListenercontainerFactory属性指定orderKafkaListenerContainerFactory

    如果有多个生产者消费者,就增加对应的配置即可。这样简化了配置的读取,除了加了前缀,其他的配置都是和Kafka默认配置一样的,复用Springboot的属性绑定,后续如果有其他配置,加上后能直接生效,无需修改代码。如果修改配置的结构需要代码中读取,然后手动设置,后期修改YML配置和代码都需要修改,比较麻烦。

    方案演进

    上述方案,如果需要新增一个Kafka的配置,需要新增一个前缀,然后新增对应配置代码,来生成KafkaPropertiesKafkaTemplateKafkaListenerContainerFactory实例,但是不同的前缀生成不同的实例代码都是重复的,而且所有的前缀、属性值都由YML配置可以得到,所以代码中生成带前缀的bean可以由代码自动生成,并注册到spring容器中。根据这个思路,写一个BeanFactoryAware的实现类。(Aware接口是框架提供给用户用户获取框架中一些对象的接口,比如BeanFactoryAware就是获取BeanFactory,框架会调用重写的setBeanFactory方法,将BeanFactory传给我们的实现类)

    @Component
    @Slf4j
    public class EmallBeanFactoryAware implements BeanFactoryAware {
    
        @Autowired
        private Environment environment;
    
        private static final String SPRING_KAFKA_PREFIX = "spring.kafka";
    
        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            if (beanFactory instanceof DefaultListableBeanFactory) {
                DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
    
                Binder binder = Binder.get(environment);
                //将YML中属性值映射到MAP中,后面根据配置前缀生成bean并注册到容器中,TODO 绑定可能有异常,加try catch稳一点
                BindResult<Map> bindResultWithPrefix = binder.bind(SPRING_KAFKA_PREFIX, Bindable.of(Map.class));
                if (!bindResultWithPrefix.isBound()) {
                    return;
                }
    
                Map map = bindResultWithPrefix.get();
                Set set = map.keySet();
                Set<String> kafkaPropertyFiledNames = getKafkaPropertyFiledNames();
    
                //如果配置多个primary, 只设置第一个,TODO项目启动过程中,这个变量是否有并发问题
                boolean hasSetPrimary = false;
                //实例化每个带前缀的KafkaProperties、KafkaTemplate、
                for (Object object : set) {
                    String prefix = object.toString();
    
                    if (kafkaPropertyFiledNames.contains(prefix)) {
                        //不带前缀的正常配置忽略
                        continue;
                    }
    
                    String configPrefix = SPRING_KAFKA_PREFIX + "." + prefix;
    
                    BindResult<KafkaProperties> kafkaPropertiesBindResult;
                    try {
                        kafkaPropertiesBindResult = binder.bind(configPrefix, Bindable.of(KafkaProperties.class));
                        if (!kafkaPropertiesBindResult.isBound()) {
                            continue;
                        }
                    } catch (Exception e) {
                        //一些配置不是在KafkaProperties属性,但是也不是前缀配置,在这一步会绑定失败,比如spring.kafka.topics配置,
                        //一些配置的名称是带-,KafkaProperties属性是驼峰,绑定是会出异常的,异常忽略
                        log.error("auto register kafka properties error, prefix is: {}", configPrefix);
                        continue;
                    }
    
                    //注册生产者(TODO 没配置生产者是否会报错)
                    KafkaProperties kafkaProperties = kafkaPropertiesBindResult.get();
                    String propertiesBeanName = prefix + "KafkaProperties";
                    boolean isBeanExist = defaultListableBeanFactory.containsBean(propertiesBeanName);
                    if (!isBeanExist) {
                        String primaryConfig = configPrefix + ".primary";
                        //没有默认的kafka配置,需要设置下primary
                        BindResult<Boolean> primaryBindResult = binder.bind(primaryConfig, Bindable.of(Boolean.class));
                        if (primaryBindResult.isBound() && primaryBindResult.get() && !hasSetPrimary) {
                            BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(KafkaProperties.class);
                            defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());
                            defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                            defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);
                            hasSetPrimary = true;
                        } else {
                            defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                        }
                    }
    
    				//注册生产者KafkaTemplate
                    String templateBeanName = prefix + "KafkaTemplate";
                    if (!defaultListableBeanFactory.containsBean(templateBeanName)) {
                        KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(
                                new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()));
                        defaultListableBeanFactory.registerSingleton(templateBeanName, kafkaTemplate);
                    }
    
                    String beanName = prefix + "KafkaListenerContainerFactory";
                    if (!defaultListableBeanFactory.containsBean(beanName)) {
                        //注册消费者listener(TODO 没配置消费者是否会报错)
                        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
                        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
                        factory.setConcurrency(10);
                        factory.getContainerProperties().setPollTimeout(3000);
                        defaultListableBeanFactory.registerSingleton(beanName, factory);
                    }
                }
            }
        }
    
        private static Set<String> getKafkaPropertyFiledNames () {
            Set<String> names = new HashSet<>();
    
            Field[] declaredFields = KafkaProperties.class.getDeclaredFields();
            if (declaredFields.length == 0) {
                return names;
            }
    
            for (Field declaredField : declaredFields) {
                names.add(declaredField.getName());
            }
    
            return names;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107

    继续演进

    上面的方案是实现BeanFactoryAware接口通过BeanFactory来,优先级比一些系统配置的Bean低,导致@Autowire时找不到,需要加@Lazy注解,通过配置,提高自动注册的Kafka Bean优先级,使得能够被@Autowire。方法就是实现BeanDefinitionRegistryPostProcessor接口,在所有BeanDefinition加载完,所有Bean实例化之前,实例化Kafka生产者消费者Bean,这样就能直接被@Autowire,代码任何地方都能直接使用。也实现EnvironmentAware接口,获取当前环境的Kafka配置,这里不通过SpringBoot解析Yml文件,因为有些配置可能不仅仅在当前项目中,也可能配置在当前项目依赖的Jar中,使用Environment就能解析到当前环境在所有地方的配置,Springboot会去解析的。完整代码如下,写的比较急,代码应该还有优化的空间。

    @Configuration
    @Slf4j
    public class MultiKafkaAutoConfiguration implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
    
        private Environment environment;
    
        private static final String SPRING_KAFKA_PREFIX = "spring.kafka";
    
        @Override
        public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
            Binder binder = Binder.get(environment);
            Set set = getAllConfigPrefix(binder);
            if (set == null || set.size() == 0) {
                return;
            }
            Set<String> kafkaPropertyFiledNames = getKafkaPropertyFiledNames();
    
            boolean hasSetPrimary = false;
            for (Object object : set) {
                String prefix = object.toString();
                if (kafkaPropertyFiledNames.contains(prefix)) {
                    //不带前缀的正常配置忽略
                    continue;
                }
    
                String configPrefix = SPRING_KAFKA_PREFIX + "." + prefix;
    
                BindResult<Boolean> primaryBindResult =
                        binder.bind(configPrefix + ".primary", Bindable.of(Boolean.class));
    
                if (primaryBindResult.isBound() && primaryBindResult.get() && !hasSetPrimary) {
                    BeanDefinitionBuilder beanDefinitionBuilder =
                            BeanDefinitionBuilder.genericBeanDefinition(KafkaProperties.class).setPrimary(true);
                    registry.registerBeanDefinition(
                            prefix + "KafkaProperties", beanDefinitionBuilder.getBeanDefinition());
                    hasSetPrimary = true;
                }
            }
        }
    
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            Binder binder = Binder.get(environment);
            Set set = getAllConfigPrefix(binder);
            if (set == null || set.size() == 0) {
                return;
            };
            Set<String> kafkaPropertyFiledNames = getKafkaPropertyFiledNames();
    
            //实例化每个带前缀的KafkaTemplate
            for (Object object : set) {
                String prefix = object.toString();
    
                if (kafkaPropertyFiledNames.contains(prefix)) {
                    //不带前缀的正常配置忽略
                    continue;
                }
    
                String configPrefix = SPRING_KAFKA_PREFIX + "." + prefix;
    
                BindResult<KafkaProperties> kafkaPropertiesBindResult;
                try {
                    kafkaPropertiesBindResult = binder.bind(configPrefix, Bindable.of(KafkaProperties.class));
                    if (!kafkaPropertiesBindResult.isBound()) {
                        continue;
                    }
                } catch (Exception e) {
                    //一些配置不是在KafkaProperties属性,但是也不是前缀配置,在这一步会绑定失败,比如spring.kafka.topics配置,
                    //一些配置的名称是带-,KafkaProperties属性是驼峰,绑定是会出异常的,异常忽略
                    log.error("auto register kafka properties error, prefix is: {}", configPrefix);
                    continue;
                }
    
                KafkaProperties kafkaProperties = kafkaPropertiesBindResult.get();
                String propertiesBeanName = prefix + "KafkaProperties";
                boolean isBeanExist = beanFactory.containsBean(propertiesBeanName);
                if (!isBeanExist) {
                    beanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                }
    
                String templateBeanName = prefix + "KafkaTemplate";
                if (!beanFactory.containsBean(templateBeanName)) {
                    KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(
                            new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()));
                    beanFactory.registerSingleton(templateBeanName, kafkaTemplate);
                }
    
                String beanName = prefix + "KafkaListenerContainerFactory";
                if (!beanFactory.containsBean(beanName)) {
                    //注册消费者listener(TODO 没配置消费者是否会报错)
                    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
                    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
                    factory.setConcurrency(10);
                    factory.getContainerProperties().setPollTimeout(3000);
                    beanFactory.registerSingleton(beanName, factory);
                }
            }
        }
    
        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }
    
        private Set getAllConfigPrefix(Binder binder) {
            BindResult<Map> bindResultWithPrefix = binder.bind(SPRING_KAFKA_PREFIX, Bindable.of(Map.class));
            if (!bindResultWithPrefix.isBound()) {
                return null;
            }
            Map map = bindResultWithPrefix.get();
            return map.keySet();
        }
    
        private static Set<String> getKafkaPropertyFiledNames () {
            Set<String> names = new HashSet<>();
    
            Field[] declaredFields = KafkaProperties.class.getDeclaredFields();
            if (declaredFields.length == 0) {
                return names;
            }
    
            for (Field declaredField : declaredFields) {
                names.add(declaredField.getName());
            }
    
            return names;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129

    参考资料

    • https://stackoverflow.com/questions/28374000/spring-programmatically-generate-a-set-of-beans/28550486#28550486
    • https://stackoverflow.com/questions/19454289/spring-boot-environment-autowired-throws-nullpointerexception

    遇到的问题

    手动注册的bean代码中@Autowire无法注入

    手动注册的无法@Autowire,直接加@Lazy注解,先忽略bean注册的先后顺序

    多个KafkaProperties实例,无法确定使用哪一个

    因为使用前缀的配置方式,bean名称也是带前缀的,没有默认的Kafka配置,框架会自动生成对应的bean,KafkaAnnotationDrivenConfiguration中的KafkaProperties 属性是根据类型注入的,如果配置有多个前缀,注入的时候无法确定使用哪一个,所以增加一个primary配置,自动生成的时候设置下。

    既有带前缀,又有不带前缀使用默认配置的

    自动配置代码中有一段是根据yml中配置的key,判断是否是KafkaProperties类中的字段,如果是就忽略,让框架自动按默认配置,有些字段yml中是带-,如bootstrap-serversKafkaProperties中是驼峰命名bootstrapServers,绑定的时候会抛异常,影响应用启动,这种异常可以忽略,直接用try catch捕获。

    设置Bean为Primary

    第二个问题中,多个相同类型的Bean如何设置其中一个bean为Primary,手动注册bean,如果有实例对象,可以直接使用BeanFactoryregisterSingleton(beanName, object),如果没有实例对象,可以直接使用类名,通过BeanFactoryregisterBeanDefinition(beanName, beanDefinition)来注册,如果要设置bean为Primary,必须通过BeanDefinition来设置,但是通过框架的绑定是直接生成实例对象的,如果通过registerSingleton来注册,通过beanName获取BeanDefinition是会抛异常的,因为没有BeanDefinition,所以需要将对象实例和BeanDefinition关联起来,就是上面这段代码

    //注册BeanDefinition
    defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());
    //注册对象实例,使用相同的bean名称
    defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
    //再获取BeanDefinition就能获取到,而且这个bean就是上面注册的实例对象
    defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    @Autowire Environment报空指针

    实现BeanDefinitionRegistryPostProcessor接口的配置了中直接注入Environment报空指针,是因为配置该类时所有的bean都没有实例化,改成实现EnvironmentAware接口就可以了,因为项目启动肯定先加载配置,所以EnvironmentAware肯定是已经有了,只是用哪种方式使用的问题。

    
    
    
    • 1
    • 2
  • 相关阅读:
    Unreal Engine Loop 流程
    eNSP-抓包实验
    开源数据库postgresql在统信系统上的离线安装shell脚本
    [AI Google] Google I/O 2024: 为新一代设计的 I/O
    大聪明教你学Java | Spring Boot全媒体资源库开发——验证码
    R语言拟合ARIMA模型并使用拟合模型进行预测推理、使用autoplot函数可视化ARIMA模型预测结果、可视化包含置信区间的预测结果
    天工开物 #8 Async Rust 的实现
    web server apache tomcat11-10-Class Loader
    spark ui的job数,stage数以及task数
    English语法_介词 - by
  • 原文地址:https://blog.csdn.net/AE86JayChou/article/details/128043113