• Configuring multiple Kafka consumers in a Spring Boot project


    Preface

    Have you ever run into this scenario: a single project has to consume messages from several Kafka clusters, with different consumers bound to specific ones? You can handle it directly with the raw Kafka client API, but most of us use spring-kafka to simplify development, and spring-kafka's native configuration only supports a single set of Kafka properties. This article looks at how to adapt spring-kafka so it can take multiple Kafka configurations.

    Main content

    1. Use @ConfigurationProperties to bind a KafkaProperties prefix

        @Primary
        @ConfigurationProperties(prefix = "lybgeek.kafka.one")
        @Bean
        public KafkaProperties oneKafkaProperties(){
            return new KafkaProperties();
        }
    

    If you need more than one, define one bean per cluster, like so:

        @ConfigurationProperties(prefix = "lybgeek.kafka.two")
        @Bean
        public KafkaProperties twoKafkaProperties(){
            return new KafkaProperties();
        }
    
        @ConfigurationProperties(prefix = "lybgeek.kafka.three")
        @Bean
        public KafkaProperties threeKafkaProperties(){
            return new KafkaProperties();
        }
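
    Each prefix then binds its own configuration subtree. A minimal sketch of the matching yml (the broker addresses here are placeholders, not from the original demo):

        lybgeek:
            kafka:
                one:
                    consumer:
                        bootstrap-servers: broker-one:9092
                two:
                    consumer:
                        bootstrap-servers: broker-two:9092
                three:
                    consumer:
                        bootstrap-servers: broker-three:9092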
    
    2. Configure a consumer factory and bind it to the matching KafkaProperties

        @Bean
        public ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){

            return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());
        }
    

    3. Configure the listener container factory, binding the consumer factory and the consumer settings

        @Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_TWO)
        public KafkaListenerContainerFactory twoKafkaListenerContainerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties, @Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(twoConsumerFactory);
            factory.setConcurrency(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getConcurrency()) ? Runtime.getRuntime().availableProcessors() : twoKafkaProperties.getListener().getConcurrency());
            factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getAckMode()) ? ContainerProperties.AckMode.MANUAL : twoKafkaProperties.getListener().getAckMode());

            return factory;
        }
    
    
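
    The MultiKafkaConstant class that holds the factory bean names is referenced throughout but never shown in the post; a plausible minimal version (constant values inferred from the @Bean names above, so treat them as assumptions) would be:

        // Assumed sketch: the original post never shows this class, so the
        // constant values are inferred from the @Bean names used above.
        public final class MultiKafkaConstant {

            public static final String KAFKA_LISTENER_CONTAINER_FACTORY_ONE = "oneKafkaListenerContainerFactory";
            public static final String KAFKA_LISTENER_CONTAINER_FACTORY_TWO = "twoKafkaListenerContainerFactory";

            private MultiKafkaConstant() {
            }
        }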

    A complete configuration example:

    @Configuration
    @EnableConfigurationProperties(MultiKafkaComsumeProperties.class)
    public class OneKafkaComsumeAutoConfiguration {
    
        @Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE)
        public KafkaListenerContainerFactory oneKafkaListenerContainerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties, @Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(oneConsumerFactory);
            factory.setConcurrency(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getConcurrency()) ? Runtime.getRuntime().availableProcessors() : oneKafkaProperties.getListener().getConcurrency());
            factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getAckMode()) ? ContainerProperties.AckMode.MANUAL:oneKafkaProperties.getListener().getAckMode());
            return factory;
        }
    
        @Primary
        @Bean
        public ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){
            return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());
        }
    
    
        @Primary
        @ConfigurationProperties(prefix = "lybgeek.kafka.one")
        @Bean
        public KafkaProperties oneKafkaProperties(){
            return new KafkaProperties();
        }
    
    }
    
    
    @Primary must be set on one of the KafkaProperties beans; otherwise startup fails, because Kafka's auto-configuration finds several KafkaProperties candidates and cannot decide which one to inject. You can see why in Spring Boot's KafkaAutoConfiguration, which takes a single KafkaProperties in its constructor:

    @Configuration
    @ConditionalOnClass(KafkaTemplate.class)
    @EnableConfigurationProperties(KafkaProperties.class)
    @Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
    public class KafkaAutoConfiguration {
    
    	private final KafkaProperties properties;
    
    	private final RecordMessageConverter messageConverter;
    
    	public KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<RecordMessageConverter> messageConverter) {
    		this.properties = properties;
    		this.messageConverter = messageConverter.getIfUnique();
    	}
    
    	@Bean
    	@ConditionalOnMissingBean(KafkaTemplate.class)
    	public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
    			ProducerListener<Object, Object> kafkaProducerListener) {
    		KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    		if (this.messageConverter != null) {
    			kafkaTemplate.setMessageConverter(this.messageConverter);
    		}
    		kafkaTemplate.setProducerListener(kafkaProducerListener);
    		kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
    		return kafkaTemplate;
    	}
    
    	@Bean
    	@ConditionalOnMissingBean(ProducerListener.class)
    	public ProducerListener<Object, Object> kafkaProducerListener() {
    		return new LoggingProducerListener<>();
    	}
    
    	@Bean
    	@ConditionalOnMissingBean(ConsumerFactory.class)
    	public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    		return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
    	}
    
    	@Bean
    	@ConditionalOnMissingBean(ProducerFactory.class)
    	public ProducerFactory<?, ?> kafkaProducerFactory() {
    		DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
    				this.properties.buildProducerProperties());
    		String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
    		if (transactionIdPrefix != null) {
    			factory.setTransactionIdPrefix(transactionIdPrefix);
    		}
    		return factory;
    	}
    
    	@Bean
    	@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    	@ConditionalOnMissingBean
    	public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
    		return new KafkaTransactionManager<>(producerFactory);
    	}
    
    	@Bean
    	@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    	@ConditionalOnMissingBean
    	public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
    		KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
    		Jaas jaasProperties = this.properties.getJaas();
    		if (jaasProperties.getControlFlag() != null) {
    			jaas.setControlFlag(jaasProperties.getControlFlag());
    		}
    		if (jaasProperties.getLoginModule() != null) {
    			jaas.setLoginModule(jaasProperties.getLoginModule());
    		}
    		jaas.setOptions(jaasProperties.getOptions());
    		return jaas;
    	}
    
    	@Bean
    	@ConditionalOnMissingBean
    	public KafkaAdmin kafkaAdmin() {
    		KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
    		kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
    		return kafkaAdmin;
    	}
    
    }
    
    

    Example: multiple Kafka consumers in one project

    1. Add the spring-kafka GAV to the project's pom

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    2. Add the following to the project's yml

    lybgeek:
        kafka:
            multi:
                comsume-enabled: false
            one:
                producer:
                    # Kafka broker address for the producer
                    bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:10.1.4.71:32643}
                    # Number of retries when a send fails
                    retries: ${KAFKA_PRODUCER_RETRIES:0}
                    # Size of each batched send
                    batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384}
                    # Buffer size for batched sends
                    buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMORY:33554432}
                    # Serializers for the message key and value
                    key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
                    value-serializer: ${KAFKA_PRODUCER_VALUE_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
                    # acks=1: the producer gets a success response as soon as the partition leader receives the message
                    acks: ${KAFKA_PRODUCER_ACK:1}

                consumer:
                    bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:10.1.4.71:32643}
                    # With no valid offset, the consumer reads the partition from the earliest record
                    auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest}
                    # Offset auto-commit defaults to true; set it to false and commit manually to avoid duplicate or lost records
                    enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false}
                    # Deserializers for the message key and value
                    key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
                    value-deserializer: ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
                listener:
                    # Number of threads running in the listener container
                    concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1}
                    missing-topics-fatal: false
                    ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual}

            two:
                producer:
                    # Kafka broker address for the producer
                    bootstrap-servers: ${KAFKA_TWO_PRODUCER_BOOTSTRAP_SERVER:192.168.1.3:9202}
                    # Number of retries when a send fails
                    retries: ${KAFKA_TWO_PRODUCER_RETRIES:0}
                    # Size of each batched send
                    batch-size: ${KAFKA_TWO_PRODUCER_BATCH_SIZE:16384}
                    # Buffer size for batched sends
                    buffer-memory: ${KAFKA_TWO_PRODUCER_BUFFER_MEMORY:33554432}
                    # Serializers for the message key and value
                    key-serializer: ${KAFKA_TWO_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
                    value-serializer: ${KAFKA_TWO_PRODUCER_VALUE_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
                    # acks=1: the producer gets a success response as soon as the partition leader receives the message
                    acks: ${KAFKA_TWO_PRODUCER_ACK:1}

                consumer:
                    bootstrap-servers: ${KAFKA_TWO_CONSUMER_BOOTSTRAP_SERVER:192.168.1.3:9202}
                    # With no valid offset, the consumer reads the partition from the earliest record
                    auto-offset-reset: ${KAFKA_TWO_CONSUMER_AUTO_OFFSET_RESET:earliest}
                    # Offset auto-commit defaults to true; set it to false and commit manually to avoid duplicate or lost records
                    enable-auto-commit: ${KAFKA_TWO_CONSUMER_ENABLE_AUTO_COMMIT:false}
                    # Deserializers for the message key and value
                    key-deserializer: ${KAFKA_TWO_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
                    value-deserializer: ${KAFKA_TWO_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
                listener:
                    # Number of threads running in the listener container
                    concurrency: ${KAFKA_TWO_CONSUMER_CONCURRENCY:1}
                    missing-topics-fatal: false
                    ack-mode: ${KAFKA_TWO_CONSUMER_ACK_MODE:manual}
    

    3. Configure the producer

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Override
    public MqResp sendSync(MqReq mqReq) {
        ListenableFuture<SendResult<String, String>> result = this.send(mqReq);
        MqResp mqResp = this.buildMqResp(result);
        return mqResp;
    }
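
    The send and buildMqResp helpers are elided in the post. As a hedged sketch of what send might look like (MqReq's getTopic/getKey/getBody accessors are assumed, not confirmed by the source):

        // Hypothetical helper: MqReq#getTopic/getKey/getBody are assumed names.
        private ListenableFuture<SendResult<String, String>> send(MqReq mqReq) {
            return kafkaTemplate.send(mqReq.getTopic(), mqReq.getKey(), mqReq.getBody());
        }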
    
    

    This KafkaTemplate is bound to the KafkaProperties marked @Primary.
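
    If you also need to produce to the second cluster, you can expose a dedicated ProducerFactory and KafkaTemplate bound to twoKafkaProperties. This is a minimal sketch rather than part of the original demo; the bean names twoProducerFactory and twoKafkaTemplate are illustrative:

        // Hedged sketch: a producer wired to the "two" cluster's properties.
        @Bean
        public ProducerFactory<String, String> twoProducerFactory(@Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties) {
            return new DefaultKafkaProducerFactory<>(twoKafkaProperties.buildProducerProperties());
        }

        @Bean
        public KafkaTemplate<String, String> twoKafkaTemplate(@Qualifier("twoProducerFactory") ProducerFactory<String, String> twoProducerFactory) {
            return new KafkaTemplate<>(twoProducerFactory);
        }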

    4. Configure the consumer listener and bind its containerFactory

    @LybGeekKafkaListener(id = "createUser",containerFactory = MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE,topics = Constant.USER_TOPIC)
    public class UserComsumer extends BaseComusmeListener {
    
        @Autowired
        private UserService userService;
    
        @Override
        public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {
            User user = JSON.parseObject(kafkaComsumePayLoad.getData(),User.class);
            System.out.println("-----------------------");
            return userService.isExistUserByUsername(user.getUsername());
        }
    
        @Override
        public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {
            User user = JSON.parseObject(kafkaComsumerPayLoad.getData(),User.class);
            System.out.println(user);
            return userService.save(user);
        }
    }
    

    By pointing containerFactory at a specific factory, the listener consumes messages from the corresponding Kafka cluster; the stock @KafkaListener annotation works the same way, as the sketch below shows.
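
    A minimal sketch with standard spring-kafka (the topic name demo-topic is a placeholder; manual acknowledgment is assumed, matching the MANUAL ack mode the factories above fall back to):

        // Hedged sketch using the stock @KafkaListener instead of @LybGeekKafkaListener.
        @Component
        public class TwoClusterListener {

            @KafkaListener(topics = "demo-topic", containerFactory = MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_TWO)
            public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
                System.out.println(record.value());
                // Commit manually because the container's ack mode is MANUAL.
                ack.acknowledge();
            }
        }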

    5. Test

    User user = User.builder().username("test")
            .email("test@qq.com")
            .fullname("test")
            .mobile("1350000001")
            .password("1234561")
            .build();
    userService.saveAndPush(user);
    

    Send a message and watch the console output:

    : messageKey:【null】,topic:【user-sync】 duplicate message data detected-->{"email":"test@qq.com","fullname":"test","mobile":"1350000000","password":"123456","username":"test"}

    This shows up because the record already exists in the database, which happens to demonstrate the duplicate-consumption check in action.

    Summary

    The core of this approach is simply injecting multiple KafkaProperties beans to support multiple configurations. Note that with the adapted setup, even if you only consume, the producer side still needs configuring: if you leave it out, the producer falls back to the KafkaProperties defaults, i.e. localhost. Careful readers may also notice that the listener in my example uses @LybGeekKafkaListener, which is functionally almost identical to @KafkaListener; this demo shares its codebase with an earlier article on building a Kafka consumer listener with an idempotency template, so the annotation is reused from there.

    Demo link

    https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template

  • Original article: https://blog.csdn.net/kingwinstar/article/details/125274544