• kafka—offset偏移量


    一、offset的基本概述

    offset定义:消费者再消费的过程中通过offset来记录消费数据的具体位置

    offset存放的位置:从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic(系统主题)中,名为__consumer_offsets,即offset维护在系统主题中

    说明:__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact(压缩存储),也就是每个 group.id+topic+分区号就保留最新数据

    1.面试题☆☆☆

    问:消费者的offset维护在什么位置

    答:在0.9版本之前维护在zookeeper当中,0.9版本之后维护在系统主题当中

    二、自动提交offset

    为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能

    自动提交offset的相关参数如下:

    • enable.auto.commit:是否开启自动提交offset功能,默认是true
    • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
    package com.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    /**
     * @author wangbo
     * @version 1.0
     */
    
    /**
     * 自动提交offset
     */
    
    public class CustomConsumer_03 {
        public static void main(String[] args) {
            //配置
            Properties properties = new Properties();
    
            //连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性
    
            //反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            //配置消费者组ID 可以任意起
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    
            //自动提交,默认为true采用自动提交,为false则为手动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    
            //提交时间间隔,默认为5000毫秒,即5s。我们修改为2秒
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,2000);
    
            //1.创建一个消费者 "","hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            //2.订阅主题 first3
            ArrayList<String> topics = new ArrayList<String>();
            topics.add("first3");
            kafkaConsumer.subscribe(topics);
    
            //3.消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据
    
                //循环打印消费的数据 consumerRecords.for
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    三、手动提交offset

    虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API

    手动提交offset的方法有两种:分别是commitSync同步提交)和commitAsync异步提交

    • 相同点:都会将本次提交的一批数据最高的偏移量提交
    • 不同点:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败

    比较

    同步提交:必须等待offset提交完毕,再去消费下一批数据 ,由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低

    异步提交:发送完提交offset请求后,就开始消费下一批数据了,由于同步提交 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式

    同步提交和异步提交的API代码

    package com.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    /**
     * @author wangbo
     * @version 1.0
     */
    
    /**
     * offset 同步提交
     */
    
    public class CustomConsumer_04 {
        public static void main(String[] args) {
            //配置
            Properties properties = new Properties();
    
            //连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性
    
            //反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            //配置消费者组ID 可以任意起
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    
            //手动提交 需要将参数改为false
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
    
            //1.创建一个消费者 "","hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            //2.订阅主题 first3
            ArrayList<String> topics = new ArrayList<String>();
            topics.add("first3");
            kafkaConsumer.subscribe(topics);
    
            //3.消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据
    
                //循环打印消费的数据 consumerRecords.for
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
    
                //手动提交offset
                kafkaConsumer.commitSync();     //同步提交
    //            kafkaConsumer.commitAsync();    //异步提交
    
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    四、指定offset位置消费

    问题引入:当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

    可以通过设置offset的消费位置,进行开始消费

    auto.offset.reset = earliest | latest | none 默认是 latest

    • earliest:自动将偏移量重置为最早的偏移量
    • latest(默认值):自动将偏移量重置为最新偏移量
    • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

    API中通过下面参数进行配置

    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    
    • 1

    还有一种是在任意指定 offset 位移开始消费

    package com.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    import java.util.Set;
    
    /**
     * @author wangbo
     * @version 1.0
     */
    
    /**
     * 1. 指定offset位置进行消费
     */
    
    public class CustomConsumer_05 {
        public static void main(String[] args) {
            //配置
            Properties properties = new Properties();
    
            //连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性
    
            //反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            //配置消费者组ID 可以任意起
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    
            //1.创建一个消费者 "","hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            //2.订阅主题 first3
            ArrayList<String> topics = new ArrayList<String>();
            topics.add("first3");
            kafkaConsumer.subscribe(topics);
    
            //指定位置进行消费
            //获取分区信息,返回一个分区集合
            Set<TopicPartition> assignment = kafkaConsumer.assignment();
    
            //保证分区的分配方案指定完毕
            while (assignment.size() == 0){ //说明没有分区分配方案
                kafkaConsumer.poll(Duration.ofSeconds(1));  //通过拉去数据,来获取分区分配方案
    
                //获取分区信息,返回一个分区集合,相当于更新一下
                assignment = kafkaConsumer.assignment();
            }
    
            //遍历分区集合,拿到所有的分区信息,指定消费的offset
            for (TopicPartition topicPartition : assignment) {
                kafkaConsumer.seek(topicPartition,100); //指定offset为100,从100的位置进行消费数据
            }
    
    
            //3.消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据
    
                //循环打印消费的数据 consumerRecords.for
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    五、指定时间消费

    需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

    package com.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.*;
    
    /**
     * @author wangbo
     * @version 1.0
     */
    
    /**
     * 1. 指定时间消费,把时间转换为对应的offset
     */
    
    public class CustomConsumer_06 {
        public static void main(String[] args) {
            //配置
            Properties properties = new Properties();
    
            //连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性
    
            //反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            //配置消费者组ID 可以任意起
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    
            //1.创建一个消费者 "","hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            //2.订阅主题 first3
            ArrayList<String> topics = new ArrayList<String>();
            topics.add("first3");
            kafkaConsumer.subscribe(topics);
    
            //==============================================================================================================
            //指定位置进行消费
            //获取分区信息,返回一个分区集合
            Set<TopicPartition> assignment = kafkaConsumer.assignment();
    
            //保证分区的分配方案指定完毕
            while (assignment.size() == 0){ //说明没有分区分配方案
                kafkaConsumer.poll(Duration.ofSeconds(1));  //通过拉去数据,来获取分区分配方案
    
                //获取分区信息,返回一个分区集合,相当于更新一下
                assignment = kafkaConsumer.assignment();
            }
    
            //--------------------------------------------------------------------------------------------------------------
    
            //把时间转换为对应的offset
            //Key:TopicPartition主题分区  value:对应的时间
            HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
    
            //封装对应的集合,对集合中添加数据
            for (TopicPartition topicPartition : assignment) {
                topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1*24*3600*1000);  //当前时间-1天的时间 = 一天前的时间
            }
    
            //需要传入一个Map集合
            Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
    
            //--------------------------------------------------------------------------------------------------------------
    
            //遍历分区集合,拿到所有的分区信息,指定消费的offset
            for (TopicPartition topicPartition : assignment) {
                OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);//获取集合中分区对应的value值,下面通过offset()方法进行转换
    
                kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset()); //指定offset为100,从100的位置进行消费数据
            }
    
            //==============================================================================================================
    
            //3.消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据
    
                //循环打印消费的数据 consumerRecords.for
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    六、消费者事物

    问题引入:

    1. 重复消费:已经消费了数据,但是 offset 没提交
    2. 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费

    为了解决以上问题,保证数据的精确一次性消费,需要使用消费者事物的方式进行处理

    如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定,跟生产者事物类似 p57

    七、数据积压(提高吞吐量)

    1. 如果是Kafka消费能力不足,则可以考虑增 加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)
    2. 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压
  • 相关阅读:
    ubuntu20.04在docker下运行ros-noetic进行开发
    NISP和CISP网络安全高级运维工程师需要掌握的应急响应有什么方向
    SwiftUI 为不同视图限制不同的屏幕旋转方向
    虚拟化技术 - I/O虚拟化 [二]
    达梦日志分析工具DMLOG使用
    【mmCEsim】开源项目预告:毫米波信道估计仿真软件
    最适合运动的耳机类型是什么、最适合运动的耳机推荐
    Selenium环境+元素定位大法
    【重拾C语言】七、指针(三)指针与字符串(字符串与字符串数组;指针与字符串的遍历、拷贝、比较;反转字符串)
    idea pom导入net.sf.json的jar包失败
  • 原文地址:https://blog.csdn.net/weixin_44604159/article/details/127553794