• Spring-kafka配置参数详解,消息批量发送与批量接收消费



    在这里插入图片描述

    配置文件

    • 这个是我正在使用的配置,基本上都加了注释
    • 有些我没用到的,就没写,以后有需要遇到了再补充
    spring:
      kafka:
        bootstrap-servers: localhost:9092 # 用来初始化连接kafka(不用配置全部节点,会动态发现)
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          buffer-memory: 33554432 # 缓存容量。默认值32MB = 33554432
          batch-size: 163840 # 默认 single request 批处理大小(以字节为单位),默认16KB = 16384
          retries: 1 # 消息发送失败重试次数
          acks: 1
          properties:
            linger:
              ms: 500 # 不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送。与batch-size配合使用,满足一个就发送
            max:
              request:
                size: 1048576 # 请求的最大字节数
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: radar # 默认消费者组
          max-poll-records: 2000 # 批量一次最大拉取数据量
          enable-auto-commit: false # 自动提交已消费offfset,false-禁用
          auto-commit-interval: 4000 # 自动提交时间间隔,单位ms
          auto-offset-reset: earliest
          heartbeat-interval: 10000 # ⼼跳与消费者协调员之间的预期时间(以毫秒为单位)
          fetch-max-wait: 500
        listener:
          ack-mode: manual_immediate # manual_immediate-手动ack后立即提交;batch-批量自动确认;RECORD-单条自动确认;
          type: batch # 批量消费
          missing-topics-fatal: false # 未发现topic时不报错: 自动创建topic需要设置为false
        template:
          default-topic: radar
          patitions: 7
          replications: 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    消息批量发送

    • 消息批量发送,主要是设置batch-sizelinger.ms
    • batch-size参数是指一批消息发送的字节数,消息积累到这么多字节就会发送,默认16384(16KB)。根据自己业务需求,决定是要低延迟还是高吞吐量,可以改小或改大,也可以通过修改数值不断尝试,从而取得延迟与吞吐量的平衡。
    • linger.ms是指延迟毫秒数,默认是0立刻发送,设置数值后在到达指定毫秒数时才会一起发送
    • batch-sizelinger.ms这两个条件都设置时,只要满足其中一个条件,就会发送消息
    • 对于linger.ms,和batch-size不同,没法直接配置,需要使用properties进行配置,还有一些其他参数也是如此

    消息批量消费

    • 主要是listener.type设置为batch,启用批量监听消费
    • max.poll.records,一次批量拉取的数量,默认500,可以根据需要设置大一点,但要注意,如果一次拉取太多,消费不了阻塞了,也会有问题
    • 我这里设置了禁用自动确认enable-auto-commit: false,消息消费后手动确认立刻生效listener.ack-mode: manual_immediate

    配置类

    • Spring-kafka,维护配置文件即可,不需要手动创建bean
    • 此处配置类,是为了在项目启动时,自动创建指定分区数、副本数的topic
    • 如果你只有一个topic或者topic的分区和副本数都是一致的,也可以在kafka的配置文件server.properties里设置,这个配置类就不需要了,只需要设置spring.kafka.listener.missing-topics-fatalfalse即可,未发现topic时不会报错而是自动创建topic,具体可参考我的这篇博客
    package com.newatc.collect.config;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @ClassName KafkaConfiguration
     * @Description kafka配置类,读取配置创建topic
     * @Date 2022-6-28 09:55:26
     * @Author yanyulin
     **/
    @Configuration
    public class KafkaConfiguration {
    
        @Value("${spring.kafka.template.default-topic}")
        private String topic;
    
        @Value("${spring.kafka.template.patitions}")
        private Integer patitions;
    
        @Value("${spring.kafka.template.replications}")
        private Short replications;
    
        public String getTopic() {
            return topic;
        }
    
        public void setTopic(String topic) {
            this.topic = topic;
        }
    
        public Integer getPatitions() {
            return patitions;
        }
    
        public void setPatitions(Integer patitions) {
            this.patitions = patitions;
        }
    
        public Short getReplications() {
            return replications;
        }
    
        public void setReplications(Short replications) {
            this.replications = replications;
        }
    
        /**
         * 项目启动时,自动创建topic,指定分区和副本数量
         * @return Topic
         */
        @Bean
        public NewTopic topic() {
            return new NewTopic(topic, patitions, replications);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    消息生产者

    • Spring kafka集成的很好,很多东西都不需要我们做了,直接使用KafkaTemplate即可
    package com.newatc.collect.config;
    
    import com.newatc.collect.util.PartitionEnum;
    import javax.annotation.Resource;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * @ClassName KafkaProducer
     * @Description kafka信息发送类
     * @Date 2022-6-27 11:20:12
     * @Author yanyulin
     **/
    @Component
    public class KafkaProducer {
    
        private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    
        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Value("${spring.kafka.template.default-topic}")
        private String topic;
    
        /**
         * 将雷达上报数据,发到kafka队列里
         * @param type
         * @param data
         */
        public void sendData(String type, String data) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, PartitionEnum.getMap().get(type), type, data);
            kafkaTemplate.send(producerRecord);
        }
    
        /**
         * 将雷达上报数据,发到kafka队列里
         * @param type
         * @param message
         */
        public void sendMessage(String type, String message) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, PartitionEnum.getMap().get(type), type, message);
            kafkaTemplate.send(producerRecord);
            log.debug("发送 {} 数据到kafka : {}", type, message);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    调用生产者发送消息

    • 使用也很简单,把KafkaProducer依赖注入即可
        @Autowired
        private KafkaProducer producer;
        
        producer.sendData(PartitionEnum.FLOW_STATS.getType(), JSONObject.toJSONString(flowStats));
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    消息消费者

    • 消息消费者也很简单,使用注解@KafkaListener即可
    • 可以指定消费的topicpartition
    @Component
    public class KafkaConsumer {
        /**
         * 雷达实时轨迹数据加载呈现<1s,即时数据立刻消费
         * @param records
         * @param ack
         */
        @KafkaListener(
            containerGroup = "${spring.kafka.consumer.group-id}",
            topicPartitions = { @TopicPartition(topic = "${spring.kafka.template.default-topic}", partitions = { "0" }) }
        )
        public void receiverRealTimeDataRecord(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
            int size = records.size();
            log.debug("RealTimeData RECV MSG COUNT: {}\n", size);
            List<String> data = new ArrayList<>();
            for (ConsumerRecord<String, String> consumerRecord : records) {
                data.add(consumerRecord.value());
            }
            realTimeDataClickHouseService.saveAll(data);
            log.debug("\n[RealTimeData] {} 消费完成", size);
            //确认单当前消息(及之前的消息)offset均已被消费完成
            ack.acknowledge();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 如果对你有帮助,希望可以给一个免费的赞
  • 相关阅读:
    Java.lang.Class类 isSynthetic()方法有什么功能呢?
    部署基于efk+logstash+kafka构建日志收集平台并对nginx日志进行分析【待执行】
    java计算机毕业设计酒店信息管理源码+mysql数据库+系统+lw文档+部署
    监听redis键失效事件实现延迟功能
    【wespeaker】模型ECAPA_TDNN介绍
    Go 中的方法
    openlayer绘制过程添加提示文字
    论文阅读——ViTAE
    基于springboot小型车队管理系统毕业设计源码061709
    Xshell远程连接配置 Ubuntu 18.04.6 + Anaconda + CUDA + Cudnn + Pytorch(GPU+CPU)
  • 原文地址:https://blog.csdn.net/u010882234/article/details/125548598