• springboot 实现kafka多源配置


    背景

    实际开发中,不同的 topic 可能位于不同的 Kafka 集群,因此需要配置多个 Kafka 数据源。本文借鉴 Spring Boot 自动配置的思想,只需在配置文件中声明各个集群,即可自动生成对应的生产者和消费者。

    核心配置

    自动化配置类

    import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
    import com.example.kafka.autoconfig.kafkaConsumerConfig;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.BeanFactoryAware;
    import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Import;
    import org.springframework.kafka.annotation.EnableKafka;
    
    /**
     * Auto-configuration entry point for the multi-cluster Kafka setup.
     *
     * <p>Turns on Spring Kafka listener support through {@link EnableKafka}, binds
     * {@link kafkaConsumerConfig} as configuration properties and imports
     * {@link CustomKafkaDataSourceRegister}. Because of {@link ConditionalOnWebApplication}
     * the configuration only applies to web applications.
     *
     * <p>NOTE(review): no {@link SmartInstantiationAwareBeanPostProcessor} method is overridden
     * here; implementing the interface presumably only serves to get this class instantiated
     * early, together with the other bean post-processors - confirm.
     */
    @EnableKafka
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnWebApplication
    @EnableConfigurationProperties(kafkaConsumerConfig.class)
    @Import(CustomKafkaDataSourceRegister.class)
    public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {

        /**
         * Looks up the {@link CustomKafkaDataSourceRegister} bean as soon as the bean factory is
         * supplied. The returned instance is discarded; the lookup itself is the point
         * (presumably to force the register to be created at this moment - confirm).
         *
         * @param beanFactory the factory that owns this configuration instance
         * @throws BeansException if the register bean cannot be obtained
         */
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            beanFactory.getBean(CustomKafkaDataSourceRegister.class);
        }
    }
    

    注册生产者、消费者核心bean到spring

    /**
     * Registers the configured Kafka consumers and producers with the bean factory.
     *
     * <p>For every entry returned by {@code kafkaConsumerConfig.getFactories()} a listener
     * container factory is created and registered as a singleton under the entry's name; a
     * name the bean factory already contains is left untouched. For every entry returned by
     * {@code kafkaProducerConfig.getTemplates()} a {@code KafkaTemplate} is registered under
     * the entry's name via {@code registerBeanWithConstructor}. A {@code null} or empty map
     * is skipped.
     */
    public void afterPropertiesSet() {
        forEachConfigured(kafkaConsumerConfig.getFactories(), (factoryName, consumerConfig) -> {
            KafkaProperties.Listener listener = consumerConfig.getListener();
            Integer concurrency = consumerConfig.getConcurrency();
            // Build the listener container factory for this consumer entry.
            ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory =
                    createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);
            // Register it only when the name is still free, so an existing bean is never replaced.
            if (!beanFactory.containsBean(factoryName)) {
                beanFactory.registerSingleton(factoryName, containerFactory);
            }
        });

        // Templates are registered as constructor-based bean definitions rather than as
        // ready-made singletons.
        forEachConfigured(kafkaProducerConfig.getTemplates(), (templateName, producerConfig) ->
                registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class,
                        producerFactoryValues(producerConfig.buildProperties())));
    }

    /**
     * Runs {@code action} for every entry of {@code entries}; does nothing when the map is
     * {@code null} or empty.
     *
     * <p>The method is generic so that the lambdas passed by {@code afterPropertiesSet} see the
     * map's real key and value types. With a raw {@code Map} both lambda parameters would be
     * {@code Object} and the calls on them would not compile.
     *
     * @param entries the configuration entries, may be {@code null}
     * @param action  the callback invoked with each entry's name and configuration
     */
    private static <K, V> void forEachConfigured(
            Map<K, V> entries, java.util.function.BiConsumer<? super K, ? super V> action) {
        if (entries != null && !entries.isEmpty()) {
            entries.forEach(action);
        }
    }
    

    配置spring.factories

    
    # Registers MyKafkaAutoConfiguration with Spring Boot's auto-configuration mechanism.
    # NOTE: Spring Boot 2.7 introduced the file
    # META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports for this purpose,
    # and Spring Boot 3.0 no longer reads this key from spring.factories.
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.example.kafka.MyKafkaAutoConfiguration
    
    

    yml配置

    # Example configuration with one consumer factory and one producer template.
    spring:
      kafka:
        multiple:
          consumer:
            # One entry per listener container factory. The entry name (here: test-factory)
            # is the name under which the factory is registered as a bean.
            factories:
              test-factory:
                key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                # Address of the Kafka cluster this factory consumes from.
                bootstrap-servers: 192.168.56.112:9092
                group-id: group_a
                # Passed to the listener container factory when it is created.
                concurrency: 25
                fetch-min-size: 1048576
                fetch-max-wait: 3000
                listener:
                  type: batch
                properties:
                  # NOTE(review): the key Spring Kafka's JsonDeserializer reads is
                  # "spring.json.trusted.packages"; confirm the hyphenated form is translated
                  # somewhere. Trusting '*' also disables the package allow-list.
                  spring-json-trusted-packages: '*'
            # Presumably defaults shared by every entry under `factories` - TODO confirm.
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            auto-offset-reset: latest
          producer:
            # One entry per KafkaTemplate. The entry name (here: test-template) is the
            # name under which the template is registered as a bean.
            templates:
              test-template:
                # Address of the Kafka cluster this template produces to.
                bootstrap-servers: 192.168.56.112:9092
                key-serializer: org.apache.kafka.common.serialization.StringSerializer
                value-serializer: org.apache.kafka.common.serialization.StringSerializer
            # Presumably defaults shared by every entry under `templates` - TODO confirm.
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    使用

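    (原文此处为两张使用示例截图,图片已丢失。)根据上文的注册逻辑,消费端可在 @KafkaListener 中通过 containerFactory = "test-factory" 指定对应的监听容器工厂;

    生产端可按 bean 名称 test-template 注入对应的 KafkaTemplate。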

    源码仓库

    https://github.com/fafeidou/shield

  • 相关阅读:
    数据结构之单链表基本操作
    实验2 朴素贝叶斯和SVM 实操项目2 肿瘤分类与预测(SVM)
    Vue入门基础
    ubuntu安装依赖包时显示需要先安装其所需要的各种安装包)apt-get源有问题
    egg中使用Sequelize老报错?看了这篇相信你会有思路……
    使用 Filebeat+Easysearch+Console 打造日志管理平台
    模拟业务流程+构造各种测试数据,一文带你测试效率提升80%
    “论单元测试方法及应用”写作框架,软考高级论文,系统架构设计师论文
    猿创征文|瑞吉外卖——配置开发环境
    针对CSP-J/S的每日一练:Day 8
  • 原文地址:https://blog.csdn.net/qq_37362891/article/details/139391610