• 【Spring Boot 使用记录】kafka自动配置和自定义配置及消费者


    一、前期准备 POM文件引入依赖

            
                org.springframework.kafka
                spring-kafka
            
    
    • 1
    • 2
    • 3
    • 4

    二、自动配置

    1 前言(了解)

    自动配置实现在 org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
    配置类为:

    @ConfigurationProperties(prefix = "spring.kafka")
    public class KafkaProperties {
     
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2 、配置文件 application.yml配置文件(在项目里面配置文件配置)

    spring:
      kafka:
        # kafka集群信息
        bootstrap-servers: 192.168.153.162:9092
        # 生产者配置
        producer: 
          # 设置大于0的值,则客户端会将发送失败的记录重新发送
          retries: 3 
          #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。16M
          batch-size: 16384
          linger: 1
          # 设置生产者内存缓冲区的大小。#32M
          buffer-memory: 33554432
          # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
          # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
          # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
          acks: 1
          # 指定消息key和消息体的编解码方式 值的序列化方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          # 消费者组
          group-id: test 
          # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
          auto-offset-reset: earliest
          # 自动提交的时间间隔  刷新间隔时间,负值失败时候刷新,0每次发送后刷新
          auto-commit-interval: 100
          # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
          enable-auto-commit: true
          # 在侦听器容器中运行的线程数。
          concurrency: 5
          #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
          session.timeout.ms: 600000			
          # 键的反序列化方式
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # 值的反序列化方式
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    3、启动项目完成基础

    三、自定义配置

    1、前言

    配置类org.springframework.boot.autoconfigure.kafka.KafkaProperties中并没有涵盖所有的org.apache.kafka.clients.producer.ProducerConfig和org.apache.kafka.clients.consumer.ConsumerConfig中的配置,这就导致某些特殊配置不能依赖spring boot自动创建,需要我们手动创建Poducer和comsumer。

    2 、配置文件 application.yml配置文件(在项目里面配置文件配置)

    spring:
      kafka:
        # kafka集群信息
        bootstrap-servers: 192.168.153.162:9092
        # 生产者配置
        producer: 
          # 设置大于0的值,则客户端会将发送失败的记录重新发送
          retries: 3 
          #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。16M
          batch-size: 16384
          linger: 1
          # 设置生产者内存缓冲区的大小。#32M
          buffer-memory: 33554432
          # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
          # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
          # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
          acks: 1
          # 指定消息key和消息体的编解码方式 值的序列化方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          # 消费者组
          group-id: test 
          # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
          auto-offset-reset: earliest
          # 自动提交的时间间隔  刷新间隔时间,负值失败时候刷新,0每次发送后刷新
          auto-commit-interval: 100
          # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
          enable-auto-commit: true
          # 在侦听器容器中运行的线程数。
          concurrency: 5
          #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
          session.timeout.ms: 600000			
          # 键的反序列化方式
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # 值的反序列化方式
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    3、生产端自定义配置例子:

    @EnableKafka:这个注解用来启用kafka相关注解配置功能

    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    @Configuration
    @EnableKafka
    public class KafkaProducerConfiguration {
    
        @Value("${spring.kafka.bootstrap-servers:192.168.153.162:9092}")
        private String servers;
        @Value("${spring.kafka.producer.retries:3}")
        private int retries;
        @Value("${spring.kafka.producer.batch-size:16384}")
        private int batchSize;
        @Value("${spring.kafka.producer.linger:1}")
        private int linger;
        @Value("${spring.kafka.producer.buffer-memory:33554432}")
        private int bufferMemory;
        // 创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
        public Map producerConfigs() {
            Map props = new HashMap<>();
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            //设置重试次数
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            //达到batchSize大小的时候会发送消息
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            //延时时间,延时时间到达之后计算批量发送的大小没达到也发送消息
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            //缓冲区的值
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    //		props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "cn.ztuo.bitrade.kafka.kafkaPartitioner");
            //序列化手段
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
    
    //        //producer端的消息确认机制,-1和all都表示消息不仅要写入本地的leader中还要写入对应的副本中
    //        props.put(ProducerConfig.ACKS_CONFIG, -1);
    //        //单条消息的最大值以字节为单位,默认值为1048576
    //        props.put(ProducerConfig.LINGER_MS_CONFIG, 10485760);
    //        //设置broker响应时间,如果broker在60秒之内还是没有返回给producer确认消息,则认为发送失败
    //        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
    //        //指定拦截器(value为对应的class)
    //        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.te.handler.KafkaProducerInterceptor");
    //        //设置压缩算法(默认是木有压缩算法的)
    //        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "LZ4");
    
    
            return props;
        }
    
        /**
         * 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义
         * @return
         */
        public ProducerFactory producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
        /**
         * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
         * @return
         */
        @Bean
        public KafkaTemplate kafkaTemplate() {
            return new KafkaTemplate(producerFactory());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    4、消费端自定义配置例子:

    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    //这里创建了对应类型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的对应Bean定义将不起作用。
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfiguration {
    
        @Value("${spring.kafka.bootstrap-servers:192.168.153.162:9092}")
        private String servers;
        @Value("${spring.kafka.consumer.enable-auto-commit:true}")
        private boolean enableAutoCommit;
        @Value("${spring.kafka.consumer.auto-commit-interval:100}")
        private String autoCommitInterval;
        @Value("${spring.kafka.consumer.group-id:test}")
        private String groupId;
        @Value("${spring.kafka.consumer.auto-offset-reset:earliest}")
        private String autoOffsetReset;
        @Value("${spring.kafka.consumer.session.timeout.ms:120000}")
        private String sessionTimeout;
        @Value("${spring.kafka.consumer.concurrency:5}")
        private int concurrency;
        //构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多
        public Map consumerConfigs() {
            Map propsMap = new HashMap<>();
            //kafka集群信息
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            //自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
            propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
            //# 消费者组
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            //earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            // #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
            //两次Poll之间的最大允许间隔。
            //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。 单位毫秒
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            return propsMap;
        }
        /**
         * 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式
         * @return
         */
        public ConsumerFactory consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        /**
         * 它具有并发属性。例如, container.setConcurrency(3) 创建了三个KafkaMessageListenerContainer实例。     *
         * 如果您提供了六个TopicPartition实例并且并发数为 3;每个容器有两个分区。对于五个 TopicPartition 实例,
         * 两个容器获得两个分区,第三个获得一个分区。如果并发数大于 TopicPartitions 的数量,则向下调整并发性,使每个容器获得一个分区。
         * 配置批处理侦听器
         *
         * 从版本1.1开始,可以将@KafkaListener方法配置为接收从消费者调查接收的整批消费者记录。配置监听器容器工厂创建一批听众,
         * 设置的的batchListener属性ConcurrentKafkaListenerContainerFactory来true。
         *
         * 我们可以选择BatchErrorHandler使用ConcurrentKafkaListenerContainerFactory#getContainerProperties().setBatchErrorHandler()
         * 并提供批处理错误处理程序来创建一个。
         *
         * 我们可以通过将Spring Kafka设置为ConsumerConfig.MAX_POLL_RECORDS_CONFIG适合您的值来配置Spring Kafka来设置批量大小的上限。默认情况下,
         * 动态计算每批中接收的记录数。在以下示例中,我们将上限配置为5。
         *
         *
         *
         */
        @Bean
        public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(concurrency);
            //设置批量消费
            factory.setBatchListener(true);
            factory.setMissingTopicsFatal(false);
            factory.getContainerProperties().setPollTimeout(1500);
            factory.setBatchListener(true);
            return factory;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    四、消费者

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    @Component
    public class KafkaMessageListener {
        private static final Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);
    
    
    
        /**
         * 因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List>。
         *
         * 从版本1.1开始,可以将@KafkaListener方法配置为接收从消费者调查接收的整批消费者记录。配置监听器容器工厂创建一批听众,
         * 设置的的batchListener属性ConcurrentKafkaListenerContainerFactory来true。
         *
         * 我们可以选择BatchErrorHandler使用ConcurrentKafkaListenerContainerFactory#getContainerProperties().setBatchErrorHandler()
         * 并提供批处理错误处理程序来创建一个。
         *
         * 我们可以通过将Spring Kafka设置为ConsumerConfig.MAX_POLL_RECORDS_CONFIG适合您的值来配置Spring Kafka来设置批量大小的上限。默认情况下,
         * 动态计算每批中接收的记录数。在以下示例中,我们将上限配置为5。
         * topic1,topic2
         * "#{'${spring.kafka.topics:test}'.split(',')}"
         * "#{'${kafka.consumer.topics}'.split(',')}"
         *(topics ="#{'${kafka.consumer.topics}'.split(',')}")
         */
    //    @KafkaListener(topics = {"${spring.kafka.topic:test}"})
    //    public void listen(List> recordList) {
    //        for (ConsumerRecord record : recordList) {
    //            // 打印消息的分区以及偏移量
    //            logger.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());
    //            //获取topic
    //            String topic = record.topic();
    //            String value = record.value();
    //            if(StringUtils.isNotBlank(value)){
    //                logger.info("value = " + value);
    //                // 处理业务逻辑 ...
    //            }
    //
    //        }
    //    }
    
        @KafkaListener(topics = "#{'${spring.kafka.topics:test,test1}'.split(',')}")
        public void msgListen(List> recordList) {
            for (ConsumerRecord record : recordList) {
                // 打印消息的分区以及偏移量
                logger.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());
                //获取topic
                String topic = record.topic();
                String value = record.value();
                if(StringUtils.isNotBlank(value)){
                    logger.info("value = " + value);
                    // 处理业务逻辑 ...
                }
    
            }
        }
    
    
    //    // kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
    //    @KafkaListener(topics = "test")
    //    public void listenZhugeGroup(ConsumerRecord record, Acknowledgment ack) {
    //        String value = record.value();
    //        logger.info("value = " + value);
    //        logger.info("record = " + record);
    //        //手动提交offset
    //        ack.acknowledge();
    //    }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
  • 相关阅读:
    Google桌面与BBdoc文件管理助手对比分析
    安装 ZooKeeper 并配置服务
    突破编程_C++_设计模式(观察者模式)
    测试工程师转开发希望大吗?
    Http-Sumggling缓存漏洞分析
    JDBC如何记忆
    Redis性能瓶颈揭秘:如何优化大key问题?
    【无标题】
    3DS Max中绘制圆锥箭头
    MongoDB入门与实战-第五章-MongoDB副本集
  • 原文地址:https://blog.csdn.net/q908544703/article/details/126114498