• 微服务同时接入多个Kafka


    最近在做微服务的迁移改造工作,其中有一个服务需要订阅多个Kafka,如果使用spring kafka自动配置的话只能配置一个Kafka,不符合需求,该文总结了如何配置多个Kafka,希望对您有帮助。

    准备工作

    • 自己搭建一个Kafka
      从官方下载Kafka,选择对应Spring Boot 的版本,好在Kafka支持的版本范围比较广,当前最新版本是3.2.1,支持2.12-3.2.1 范围的版本,覆盖了Spring Boot 2.0x-Spring Boot 3.0.x
      https://kafka.apache.org/downloads
      在这里插入图片描述
    • 解压安装
      进入bin目录,执行如下命令,按照如下顺序启动
      Linux
    # 配置文件选择自己对应的目录
    zookeeper-server-start.sh ../config/zookeeper.properties
    
    • 1
    • 2

    Windows

    windows/zookeeper-server-start.bat ../config/zookeeper.properties
    
    • 1

    打开另外一个终端,启动KafkaServer
    Linux

    kafka-server-start.sh ../config/server.properties
    
    • 1

    Windows

    windows/kafka-server-start.bat ../config/server.properties
    
    • 1

    最小化配置Kafka

    如下是最小化配置Kafka
    pom.xml 引入依赖

    <dependency>
    	<groupId>org.springframework.kafkagroupId>
    	<artifactId>spring-kafkaartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    application.properties

    server.port=8090
    spring.application.name=single-kafka-server
    
    #kafka 服务器地址
    spring.kafka.bootstrap-servers=localhost:9092
    #消费者分组,配置后,自动创建
    spring.kafka.consumer.group-id=default_group
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    KafkaProducer 生产者

    @Slf4j
    @Component
    @EnableScheduling
    public class KafkaProducer {
    
        @Resource
        private KafkaTemplate kafkaTemplate;
    
        private void sendTest() {
        	//topic 会自动创建
            kafkaTemplate.send("topic1", "hello kafka");
        }
    
        @Scheduled(fixedRate = 1000 * 10)
        public void testKafka() {
            log.info("send message...");
            sendTest();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    KafkaConsumer 消费者

    @Slf4j
    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics = {"topic1"})
        public void processMessage(String spuId) {
            log.warn("process spuId ={}", spuId);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    运行效果:
    在这里插入图片描述

    Kafka配置

    配置稍微复杂了一点,灵魂就是手动创建,同样引入依赖
    pom.xml

    <dependency>
    	<groupId>org.springframework.kafkagroupId>
    	<artifactId>spring-kafkaartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    application.properties

    server.port=8090
    spring.application.name=kafka-server
    
    #kafka1
    #服务器地址
    spring.kafka.one.bootstrap-servers=localhost:9092
    spring.kafka.one.consumer.group-id=default_group
    
    
    #kafka2
    spring.kafka.two.bootstrap-servers=localhost:9092
    spring.kafka.two.consumer.group-id=default_group2
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    第一个Kafka配置,需要区分各Bean的名称
    KafkaOneConfig

    @Configuration
    public class KafkaOneConfig {
    
        @Value("${spring.kafka.one.bootstrap-servers}")
        private String bootstrapServers;
        @Value("${spring.kafka.one.consumer.group-id}")
        private String groupId;
    
        @Bean
        public KafkaTemplate<String, String> kafkaOneTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
        @Bean(name = "kafkaOneContainerFactory")
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }
    
        private ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        private ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        private Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        private Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息
    kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中,
    producerFactory 生产者工厂
    consumerFactory 消费者工厂
    producerConfigs 生产者配置
    consumerConfigs 消费者配置

    同样创建第二个Kafka,配置含义,同第一个Kafka
    KafkaTwoConfig

    @Configuration
    public class KafkaTwoConfig {
    
        @Value("${spring.kafka.two.bootstrap-servers}")
        private String bootstrapServers;
        @Value("${spring.kafka.two.consumer.group-id}")
        private String groupId;
    
        @Bean
        public KafkaTemplate<String, String> kafkaTwoTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
        @Bean(name = "kafkaTwoContainerFactory")
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }
    
        private ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        private Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        private Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    创建一个测试的消费者,注意配置不同的监听容器containerFactory
    KafkaConsumer

    @Slf4j
    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics = {"topic1"}, containerFactory = "kafkaOneContainerFactory")
        public void oneProcessItemcenterSpuMessage(String spuId) {
            log.warn("one process spuId ={}", spuId);
        }
    
        @KafkaListener(topics = {"topic2"},containerFactory = "kafkaTwoContainerFactory")
        public void twoProcessItemcenterSpuMessage(String spuId) {
            log.warn("two process spuId ={}", spuId);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    创建一个测试的生产者,定时往两个topic中发送消息
    KafkaProducer

    @Slf4j
    @Component
    public class KafkaProducer {
    
        @Resource
        private KafkaTemplate kafkaOneTemplate;
        @Resource
        private KafkaTemplate kafkaTwoTemplate;
    
        private void sendTest() {
            kafkaOneTemplate.send("topic1", "hello kafka one");
            kafkaTwoTemplate.send("topic2", "hello kafka two");
        }
    
        @Scheduled(fixedRate = 1000 * 10)
        public void testKafka() {
            log.info("send message...");
            sendTest();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    最后运行效果:
    在这里插入图片描述

    其他kafka文章:
    【从面试题看源码】-看完Kafka性能优化-让你吊打面试官

  • 相关阅读:
    前端培训技术AngularJS 服务(Service)
    Head First设计模式(阅读笔记)-04.工厂模式
    VXLAN内通信与EVPN
    leetcode337打家劫舍3刷题打卡
    C++【智能指针】
    阿里云有奖体验:如何使用 PolarDB-X
    11+孟德尔随机化+GWAS分析
    小白入门大模型的第一课
    整治PPOCRLabel中cv2文件读取问题(更新中)
    【Sentinel】Sentinel配置zk持久化
  • 原文地址:https://blog.csdn.net/weixin_40972073/article/details/126682094