• 搭建大型分布式服务(四十)SpringBoot 整合多个kafka数据源-支持生产者


    系列文章目录



    前言

    本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。

    <dependency>
        <groupId>io.github.vipjoeygroupId>
        <artifactId>multi-kafka-starterartifactId>
        <version>最新版本号version>
    dependency>
    

    例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。

    ## topic1的kafka配置
    spring.kafka.one.enabled=true
    spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
    spring.kafka.one.topic=mmc-topic-one
    spring.kafka.one.group-id=group-consumer-one
    spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
    spring.kafka.one.consumer.auto-offset-reset=latest
    spring.kafka.one.consumer.max-poll-records=10
    spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    ## topic2的kafka配置
    spring.kafka.two.enabled=true
    spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
    spring.kafka.two.topic=mmc-topic-two
    spring.kafka.two.group-id=group-consumer-two
    spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
    spring.kafka.two.consumer.auto-offset-reset=latest
    spring.kafka.two.consumer.max-poll-records=10
    spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    ## pb 消息消费者
    spring.kafka.pb.enabled=true
    spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
    spring.kafka.pb.topic=mmc-topic-pb
    spring.kafka.pb.group-id=group-consumer-pb
    spring.kafka.pb.processor=pbProcessor
    spring.kafka.pb.consumer.auto-offset-reset=latest
    spring.kafka.pb.consumer.max-poll-records=10
    spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    
    ## kafka消息生产者
    spring.kafka.four.enabled=true
    spring.kafka.four.producer.name=fourKafkaSender
    spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
    spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    
    

    国籍惯例,先上源码:Github源码

    一、本文要点

    本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

    • SpringBoot 整合多个kafka数据源
    • SpringBoot 批量消费kafka消息
    • SpringBoot 优雅地启动或停止消费kafka
    • SpringBoot kafka本地单元测试(免集群)
    • SpringBoot 利用map注入多份配置
    • SpringBoot BeanPostProcessor 后置处理器使用方式
    • SpringBoot 将自定义类注册到IOC容器
    • SpringBoot 注入bean到自定义类成员变量
    • Springboot 取消限定符
    • SpringBoot 支持消费protobuf类型的kafka消息
    • SpringBoot Aware设计模式
    • SpringBoot 获取kafka消息中的topic、offset、partition、header等参数
    • SpringBoot 使用任意生产者发送kafka消息
    • SpringBoot 配置任意数量的kafka生产者

    二、开发环境

    • jdk 1.8
    • maven 3.6.2
    • springboot 2.4.3
    • kafka-client 2.6.6
    • idea 2020

    三、原项目

    1、接前文,我们基本完成了kafka consumer常用的特性开发,有小伙伴问,我们该如何配置多个数据源生产者,想consumer一样简单,发送kafka消息呢?

    
    ## 1.配置
    spring.kafka.four.enabled=true
    spring.kafka.four.producer.name=fourKafkaSender
    spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
    spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    ## 2.引用
    @Resource(name = "fourKafkaSender")
    private MmcKafkaMultiSender mmcKafkaMultiSender;
    
    ## 3.使用
    mmcKafkaMultiSender.sendStringMessage(topicOne, "aaa", json);
    
    

    答案是可以的、但我们要升级和优化一下。

    四、修改项目

    1、修改内部类MmcKafkaProperties类,增加生产者相关的配置。

        @EqualsAndHashCode(callSuper = true)
        @Data
        public static class Producer extends KafkaProperties.Producer {
    
            /**
             * 是否启用.
             */
            private boolean enabled = true;
            /**
             * 生产者名称,如果有设置则会覆盖默认的xxxKakfkaSender名称.
             */
            private String name;
        }
            /**
             * 生产者.
             */
            private final Producer producer = new Producer();
            /**
             * Create an initial map of producer properties from the state of this instance.
             * 

    * This allows you to add additional properties, if necessary, and override the * default kafkaProducerFactory bean. * * @return the producer properties initialized with the customizations defined on this * instance */ Map<String, Object> buildProducerProperties() { return new HashMap<>(this.producer.buildProperties()); }

    2、新增MmcKafkaSender接口,作为发送Kafka消息的唯一约束。

    public interface MmcKafkaSender {
    
        /**
         * 发送kafka消息.
         *
         * @param topic        topic名称
         * @param partitionKey 消息分区键
         * @param message      具体消息
         */
        void sendStringMessage(String topic, String partitionKey, String message);
    
    
        /**
         * 发送kafka消息.
         *
         * @param topic        topic名称
         * @param partitionKey 消息分区键
         * @param message      具体消息
         */
        void sendProtobufMessage(String topic, String partitionKey, byte[] message);
    }
    
    
    

    3、新增MmcKafkaOutputContainer容器类,用于存储所有生产者,方便统一管理;

    @Getter
    @Slf4j
    public class MmcKafkaOutputContainer {
    
        /**
         * 存放所有生产者.
         */
        private final Map<String, MmcKafkaSender> outputs;
    
        /**
         * 构造函数.
         */
        public MmcKafkaOutputContainer(Map<String, MmcKafkaSender> outputs) {
            this.outputs = outputs;
        }
    
    }
    

    4、新增MmcKafkaSingleSender实现类,用于真实发送Kafka消息;

    public class MmcKafkaSingleSender implements MmcKafkaSender {
    
        private final KafkaTemplate<String, Object> template;
    
    
        public MmcKafkaSingleSender(KafkaTemplate<String, Object> template) {
            this.template = template;
        }
    
        @Override
        public void sendStringMessage(String topic, String partitionKey, String message) {
    
            template.send(topic, partitionKey, message);
        }
    
    
        @Override
        public void sendProtobufMessage(String topic, String partitionKey, byte[] message) {
    
            template.send(topic, partitionKey, message);
    
        }
    
    }
    

    5、修改MmcMultiProducerAutoConfiguration配置类,遍历所有配置,组装并生成MmcKafkaSingleSender,并注册到IOC容器;

    
    @Slf4j
    @Configuration
    @EnableConfigurationProperties(MmcMultiKafkaProperties.class)
    @ConditionalOnProperty(prefix = "spring.kafka", value = "enabled", matchIfMissing = true)
    public class MmcMultiProducerAutoConfiguration implements BeanFactoryAware {
    
        private DefaultListableBeanFactory beanDefinitionRegistry;
    
        @Resource
        private MmcMultiKafkaProperties mmcMultiKafkaProperties;
    
    
        @Bean
        public MmcKafkaOutputContainer mmcKafkaOutputContainer() {
    
            // 初始化一个存储所有生产者的哈希映射
            Map<String, MmcKafkaSender> outputs = new HashMap<>();
    
            // 获取所有的Kafka配置信息
            Map<String, MmcMultiKafkaProperties.MmcKafkaProperties> kafkas = mmcMultiKafkaProperties.getKafka();
    
            // 逐个遍历,并生成producer
            for (Map.Entry<String, MmcMultiKafkaProperties.MmcKafkaProperties> entry : kafkas.entrySet()) {
    
                // 唯一生产者名称
                String name = entry.getKey();
    
                // 生产者配置
                MmcMultiKafkaProperties.MmcKafkaProperties properties = entry.getValue();
    
                // 是否开启
                if (properties.isEnabled()
                        && properties.getProducer().isEnabled()
                        && CommonUtil.isNotEmpty(properties.getProducer().getBootstrapServers())) {
    
                    // bean名称
                    String beanName = Optional.ofNullable(properties.getProducer().getName())
                            .orElse(name + "KafkaSender");
    
                    KafkaTemplate<String, Object> template = mmcdKafkaTemplate(properties);
    
                    // 创建实例
                    MmcKafkaSender sender = new MmcKafkaSingleSender(template);
                    outputs.put(beanName, sender);
    
                    // 注册到IOC
                    beanDefinitionRegistry.registerSingleton(beanName, sender);
                }
    
            }
    
            return new MmcKafkaOutputContainer(outputs);
        }
    
        private KafkaTemplate<String, Object> mmcdKafkaTemplate(MmcMultiKafkaProperties.MmcKafkaProperties producer) {
    
            return new KafkaTemplate<>(baseKafkaProducerFactory(producer));
    
        }
    
        private ProducerFactory<String, Object> baseKafkaProducerFactory(MmcMultiKafkaProperties.MmcKafkaProperties producer) {
            return new DefaultKafkaProducerFactory<>(producer.buildProducerProperties());
        }
    
        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.beanDefinitionRegistry = (DefaultListableBeanFactory) beanFactory;
        }
    }
    

    五、测试一下

    1、引入kafka测试需要的jar。参考文章:kafka单元测试

            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
    
            <dependency>
                <groupId>org.springframework.kafkagroupId>
                <artifactId>spring-kafka-testartifactId>
                <scope>testscope>
            dependency>
            
            <dependency>
                <groupId>com.google.protobufgroupId>
                <artifactId>protobuf-javaartifactId>
                <version>3.18.0version>
                <scope>testscope>
            dependency>
            
            <dependency>
                <groupId>com.google.protobufgroupId>
                <artifactId>protobuf-java-utilartifactId>
                <version>3.18.0version>
                <scope>testscope>
            dependency>
    

    2、消费者配置保持不变,增加生产者配置。

    ## json消息消费者
    spring.kafka.one.enabled=true
    spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
    spring.kafka.one.topic=mmc-topic-one
    spring.kafka.one.group-id=group-consumer-one
    spring.kafka.one.processor=oneProcessor
    spring.kafka.one.duplicate=false
    spring.kafka.one.snakeCase=false
    spring.kafka.one.consumer.auto-offset-reset=latest
    spring.kafka.one.consumer.max-poll-records=10
    spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.one.container.threshold=2
    spring.kafka.one.container.rate=1000
    spring.kafka.one.container.parallelism=8
    
    ## json消息生产者
    spring.kafka.four.enabled=true
    spring.kafka.four.producer.name=fourKafkaSender
    spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
    spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    

    3、编写测试类。

    @Slf4j
    @ActiveProfiles("dev")
    @ExtendWith(SpringExtension.class)
    @SpringBootTest(classes = {MmcMultiProducerAutoConfiguration.class, MmcMultiConsumerAutoConfiguration.class,
            DemoService.class, OneProcessor.class})
    @TestPropertySource(value = "classpath:application-string.properties")
    @DirtiesContext
    @EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"},
            topics = {"${spring.kafka.one.topic}"})
    class KafkaStringMessageTest {
    
    
        @Value("${spring.kafka.one.topic}")
        private String topicOne;
    
        @Value("${spring.kafka.two.topic}")
        private String topicTwo;
    
        @Resource(name = "fourKafkaSender")
        private MmcKafkaSingleSender mmcKafkaSingleSender;
    
    
        @Test
        void testDealMessage() throws Exception {
    
            Thread.sleep(2 * 1000);
    
            // 模拟生产数据
            produceMessage();
    
            Thread.sleep(10 * 1000);
        }
    
        void produceMessage() {
    
    
            for (int i = 0; i < 10; i++) {
    
                DemoMsg msg = new DemoMsg();
                msg.setRoutekey("routekey" + i);
                msg.setName("name" + i);
                msg.setTimestamp(System.currentTimeMillis());
    
                String json = JsonUtil.toJsonStr(msg);
    
                mmcKafkaSingleSender.sendStringMessage(topicOne, "aaa", json);
    
    
            }
        }
    }
    
    
    
    

    5、运行一下,测试通过,可以看到能正常发送消息和消费。
    在这里插入图片描述

    五、小结

    将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

    加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

  • 相关阅读:
    1、广告-互联网展示广告发展史
    Qt基础开发之QString与QByteArray详细用法与区别及QString QByteArray互转
    [Java Framework] [Spring] Spring Event / 事件的使用 一: ApplicationEvent
    Elasticsearch-8.4.2 集群安装
    贵金属金银铂铑钯回收-HP4080
    android Leakcanary/Studio Profiler/MAT 处理内存问题(泄漏和Big超大内存对象)
    工控上位机程序为什么只能用C语言?
    CUDA By Example(二)——CUDA C简介
    张益唐与黎曼猜想
    11-pytorch-使用自己的数据集测试
  • 原文地址:https://blog.csdn.net/hanyi_/article/details/139897315