• kafka的Java客户端-offset


    kafka的Java客户端-offset

    一、offset的默认维护位置

    在这里插入图片描述

    __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+
    分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行
    compact,也就是每个 group.id+topic+分区号就保留最新数据

    二、自动提交offset

    为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能

    自动提交offset的相关参数:

    参数名称描述
    enable.auto.commit默认值为 true,消费者会自动周期性地向服务器提交偏移量
    auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s

    在这里插入图片描述

    public class CustomConsumerAutoOffset {
    
        public static void main(String[] args) {
    
            // 1.创建消费者的配置对象
            Properties properties = new Properties();
            // 2.给消费者配置对象添加参数
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
            //显示设置偏移量自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
            //设置偏移量提交时间间隔
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
            // 3.配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 4.配置消费者组(组名任意起名) 必须
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            // 创建消费者对象
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 注册要消费的主题(可以消费多个主题)
            kafkaConsumer.subscribe(Collections.singletonList("first"));
    
            // 拉取数据打印
            while (true) {
                // 设置 1s 中消费一批数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                // 打印消费到的数据
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    三、手动交提交offset

    虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API

    手动提交offset的方法有两种:分别是commitSync(同步提交)commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败

    • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据
    • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了

    同步提交 offset

    由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低

    public class CustomConsumerByHandSync {
    
        public static void main(String[] args) {
    
    
            // 1.创建消费者的配置对象
            Properties properties = new Properties();
            // 2.给消费者配置对象添加参数
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
            //设置手动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            // 3.配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 4.配置消费者组(组名任意起名) 必须
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    
            // 创建消费者对象
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 注册要消费的主题(可以消费多个主题)
            kafkaConsumer.subscribe(Collections.singletonList("first"));
    
            // 拉取数据打印
            while (true) {
                // 设置 1s 中消费一批数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                // 打印消费到的数据
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
                }
                //同步提交
                kafkaConsumer.commitSync();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    异步提交offser

    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset的方式

    public class CustomConsumerByHandSync {
    
        public static void main(String[] args) {
    
    
            // 1.创建消费者的配置对象
            Properties properties = new Properties();
            // 2.给消费者配置对象添加参数
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
            //设置手动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            // 3.配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 4.配置消费者组(组名任意起名) 必须
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    
            // 创建消费者对象
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 注册要消费的主题(可以消费多个主题)
            kafkaConsumer.subscribe(Collections.singletonList("first"));
    
            // 拉取数据打印
            while (true) {
                // 设置 1s 中消费一批数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                // 打印消费到的数据
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
                }
                //异步提交
                kafkaConsumer.commitAsync();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    四、指定 Offset 消费

    auto.offset.reset = earliest | latest | none 默认是 latest

    • earliest:自动将偏移量重置为最早的偏移量
    • latest(默认值):自动将偏移量重置为最新偏移量
    • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

    在这里插入图片描述

    public class CustomConsumerSeek {
    
        public static void main(String[] args) {
    
            // 1.创建消费者的配置对象
            Properties properties = new Properties();
    
            // 2.给消费者配置对象添加参数
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
            //显示设置偏移量自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
            //设置偏移量提交时间间隔
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
            // 3.配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 4.配置消费者组(组名任意起名) 必须
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    
            // 创建消费者对象
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 注册要消费的主题(可以消费多个主题)
            kafkaConsumer.subscribe(Collections.singletonList("first"));
    
            // 指定位置进行消费
            Set<TopicPartition> assignment = kafkaConsumer.assignment();
    
            //  保证分区分配方案已经制定完毕
            while (assignment.size() == 0){
                kafkaConsumer.poll(Duration.ofSeconds(1));
                assignment = kafkaConsumer.assignment();
            }
    
            // 指定消费的offset
            for (TopicPartition topicPartition : assignment) {
                kafkaConsumer.seek(topicPartition,6);
            }
    
            // 拉取数据打印
            while (true) {
                // 设置 1s 中消费一批数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                // 打印消费到的数据
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
                }
            }
    
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    在这里插入图片描述

    五、指定时间消费

    在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

    public class CustomConsumerSeekTime {
    
        public static void main(String[] args) {
    
            // 1.创建消费者的配置对象
            Properties properties = new Properties();
    
            // 2.给消费者配置对象添加参数
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
            //显示设置偏移量自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
            //设置偏移量提交时间间隔
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
            // 3.配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 4.配置消费者组(组名任意起名) 必须
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    
            // 创建消费者对象
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 注册要消费的主题(可以消费多个主题)
            kafkaConsumer.subscribe(Collections.singletonList("first"));
    
            // 指定位置进行消费
            Set<TopicPartition> assignment = kafkaConsumer.assignment();
    
            //  保证分区分配方案已经制定完毕
            while (assignment.size() == 0){
                kafkaConsumer.poll(Duration.ofSeconds(1));
                // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
                assignment = kafkaConsumer.assignment();
            }
    
            // 希望把时间转换为对应的offset
            HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
    
            // 封装集合存储,每个分区对应一天前的数据
            for (TopicPartition topicPartition : assignment) {
                topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 24 * 3600 * 1000);
            }
            // 获取从 1 天前开始消费的每个分区的 offset
            Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
    
            // 遍历每个分区,对每个分区设置消费时间
            for (TopicPartition topicPartition : assignment) {
                OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
                // 根据时间指定开始消费的位置
                kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
            }
    
            // 拉取数据打印
            while (true) {
                // 设置 1s 中消费一批数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                // 打印消费到的数据
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
  • 相关阅读:
    vue2中,vue-easytable组件的使用(三)——实现表格的虚拟滚动功能
    【附源码】Python计算机毕业设计社区新冠疫苗接种管理系统
    docker网络管理与资源控制
    记一次 .NET某工控 宇宙射线 导致程序崩溃分析
    【数据结构】搜索树 与 Java集合框架中的Set,Map
    算法竞赛进阶指南 基本算法 0x03 前缀和与差分
    匈牙利算法
    【算法基础】基础算法(二)--(高精度、前缀和、差分)
    第十三天到达终点数字
    【藏经阁一起读】(68)__《ECS技术实战指南》
  • 原文地址:https://blog.csdn.net/weixin_43296313/article/details/125525618