• Kafka - 15 Kafka Offset | 自动和手动提交Offset | 指定Offset消费 | 漏消费和重复消费 | 消息积压


    1. Offset 的默认维护位置

    Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中。从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets

    在这里插入图片描述

    __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

    在这里插入图片描述

    消费 offset 案例:

    __consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。但是需要在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

    ① 创建一个主题topic

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 3 --topic topic02
    
    • 1

    ② 启动消费者消费数据

    # 指定消费者组
    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic02 --group test1 --from-beginning
    hello,kafka
    hello,zhangsan
    hello,lisi
    hello
    hello,wangwu
    hello,haha
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ③ 启动生产者生产数据

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic topic02
    >hello,kafka
    >hello,zhangsan
    >hello,lisi
    >hello
    >hello,wangwu
    >hello,haha
    >
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ④ 查看消费者消费主题__consumer_offsets

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
    
    # key 是 group.id+topic+分区号,value 就是当前 offset 的值
    [test1,topic02,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1670144851463, expireTimestamp=None)
    [test1,topic02,0]::OffsetAndMetadata(offset=6, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1670144891473, expireTimestamp=None)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. 自动提交 Offset

    为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

    自动提交offset的相关参数:
    enable.auto.commit:是否开启自动提交offset功能,默认是true
    auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

    在这里插入图片描述

    在这里插入图片描述

    消费者自动提交 offset :

    ① 创建一个主题topic

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 3 --topic nini
    
    • 1

    ② 启动消费者消费数据

    public class CustomConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 配置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
            // 创建消费者组,组名任意起名都可以
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
            // 自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
            // 提交时间间隔,默认为5s,修改为1s
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
    
            // 创建消费者
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            // 订阅主题
            ArrayList<String> topics = new ArrayList<>();
            topics.add("nini");
            consumer.subscribe(topics);
    
            // 消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    ③ 启动生产者生产数据

    public class CustomProducerCallbackPartitions {
        public static void main(String[] args) throws InterruptedException {
            // kafka生产者属性配置
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            // 添加自定义分区器
            // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hh.producer.MyPartitioner");
    
            // kafka生产者
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            for(int i=0;i<5;i++){
                kafkaProducer.send(new ProducerRecord<>("nini" ,"hello,kafka"), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                        if(exception==null){
                            // 消息发送成功
                            System.out.println("主题"+recordMetadata.topic()+",发往的分区:"+recordMetadata.partition());
                        }else{
                            // 消息发送失败
                            exception.printStackTrace();
                        }
                    }
                });
                Thread.sleep(2);
            }
            // 关闭资源
            kafkaProducer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    在这里插入图片描述

    ④ 查看自动提交的 Offset:

    # 每隔1秒提交一次offet
    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
    
    [group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146009308, expireTimestamp=None)
    [group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146010308, expireTimestamp=None)
    [group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146011309, expireTimestamp=None)
    [group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146012310, expireTimestamp=None)
    [group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146013311, expireTimestamp=None)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3. 手动提交 Offset

    虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。

    手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交):

    commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
    commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

    两者的相同点是,都会将本次提交的一批数据最

    高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

    在这里插入图片描述

    1. 同步提交 offset

    由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

    public class CustomConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 配置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
            // 创建消费者组,组名任意起名都可以
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
            // 自动提交关闭
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            // 创建消费者
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            // 订阅主题
            ArrayList<String> topics = new ArrayList<>();
            topics.add("nini");
            consumer.subscribe(topics);
    
            // 消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
                }
    
                // 手动同步提交offset
                consumer.commitSync();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在这里插入图片描述

    2. 异步提交 offset

    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

    public class CustomConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 配置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
            // 创建消费者组,组名任意起名都可以
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
            // 自动提交关闭
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            // 创建消费者
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            // 订阅主题
            ArrayList<String> topics = new ArrayList<>();
            topics.add("nini");
            consumer.subscribe(topics);
    
            // 消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
                }
    
                // 手动同步提交offset
                consumer.commitAsync();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在这里插入图片描述

    4. 指定 Offset 消费

    auto.offset.reset = earliest | latest | none 默认是 latest

    当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

    earliest:自动将偏移量重置为最早的偏移量,–from-beginning

    latest(默认值):自动将偏移量重置为最新偏移量。

    none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

    在这里插入图片描述

    任意指定 offset 位移开始消费:

    ① 查看消费者消费主题__consumer_offsets

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
    
    [group-3,nini,0]::OffsetAndMetadata(offset=60, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670149746455, expireTimestamp=None)
    
    • 1
    • 2
    • 3

    可以看到消费者组为group-3,主题为 nini 的消费Offset = 60,所以我们可以指定消费者从Offset=55开始消费:

    public class CustomConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 创建消费者组,组名任意起名都可以
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-3");
    
            // 创建消费者
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            // 订阅主题
            ArrayList<String> topics = new ArrayList<>();
            topics.add("nini");
            consumer.subscribe(topics);
    
            // 获取所有的分区信息
            Set<TopicPartition> assignment= new HashSet<>();
            while (assignment.size() == 0) {
                consumer.poll(Duration.ofSeconds(1));
                // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
                assignment = consumer.assignment();
            }
    
            // 遍历所有分区,并指定 offset 从55的位置开始消费
            for (TopicPartition tp: assignment) {
                consumer.seek(tp, 55);
            }
    
            // 消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    在这里插入图片描述

    5. 指定时间消费

    需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

    public class CustomConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 创建消费者组,组名任意起名都可以
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-3");
    
            // 创建消费者
            KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    
            // 订阅主题
            ArrayList<String> topics = new ArrayList<>();
            topics.add("nini");
            kafkaConsumer.subscribe(topics);
    
            Set<TopicPartition> assignment = new HashSet<>();
            while (assignment.size() == 0) {
                kafkaConsumer.poll(Duration.ofSeconds(1));
                // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
                assignment = kafkaConsumer.assignment();
            }
            HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
            // 封装集合存储,每个分区对应一天前的数据
            for (TopicPartition topicPartition : assignment) {
                timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
            }
            // 获取从 1 天前开始消费的每个分区的 offset
            Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
            // 遍历每个分区,对每个分区设置消费时间。
            for (TopicPartition topicPartition : assignment) {
                OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
                // 根据时间指定开始消费的位置
                if (offsetAndTimestamp != null){
                    kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
                }
            }
            
            // 消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    6. 漏消费和重复消费

    重复消费:已经消费了数据,但是 offset 没提交。

    漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

    ① 场景1:重复消费,自动提交Offset引起的

    在这里插入图片描述

    ② 场景1:漏消费。设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

    在这里插入图片描述

    思考:怎么能做到既不漏消费也不重复消费呢?需要使用消费者事务。

    7. 消费者事务

    如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定 。 此 时我们需要将 Kafka 的 offset 保存到支持事务的自定义介质( 比如MySQL)。

    在这里插入图片描述

    8. 数据积压(消费者如何提高吞吐量)

    如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)

    在这里插入图片描述

    如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。从一次最多拉取500条,调整为一次最多拉取1000条。

    在这里插入图片描述

    在这里插入图片描述

  • 相关阅读:
    Redis 双写一致原理篇
    iPhone 15秋季发布会召开,媒介盒子多家媒体持续报道
    SpringBoot+MyBatisPlus+Redis+Jwt+Shiro+Vue 完整博客文章管理前后端实战
    Linux——网络配置(重点)
    Unity6 URP17使用初探
    【网络编程】深入理解TCP协议二(连接管理机制、WAIT_TIME、滑动窗口、流量控制、拥塞控制)
    牛客Mysql——SQL必知必会
    DDD 实战 (2):看看代码结构长啥样(值得收藏)
    Makefile 基础教程:从零开始学习
    ES6迭代器详细介绍
  • 原文地址:https://blog.csdn.net/qq_42764468/article/details/128175679