• 消费者提交已消费的偏移量


    1.概述

      消费者而在消费了消息之后会把消费的offset提交到 __consumer_offsets-的内置Topic中;每个消费者组都有维护一个当前消费者组的offset。那么问题来了: 消费组什么时候把offset更新到broker中的分区中呢?

    Kafka消费者的配置信息

    Name描述default
    enable.auto.commit如果为true,消费者的offset将在后台周期性的提交true
    auto.commit.interval.ms如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)5000

    2.自动提交偏移量

    消费者端开启了自动提交之后,每隔auto.commit.interval.ms自动提交一次;

    public static void main(String[] args) {
        //创建kafka消费者配置对象以及配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        props.put("group.id", "hy-local-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //创建kafka消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //消费消息
        kafkaConsumer.subscribe(Arrays.asList("hy1-test-topic"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
    
    /*
    output:
    ------offset-- = 5, key = null, value = NBA
    ------offset-- = 4, key = null, value = CBA
    ------offset-- = 5, key = null, value = CUBA
    ------offset-- = 6, key = null, value = NCAA
    ------offset-- = 6, key = null, value = ABA
    ------offset-- = 5, key = null, value = NBL
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    假如Consumer在获取了消息消费成功但是在提交offset之前服务挂掉了,会出现什么情况?

    答:重复消费

    3.手动提交偏移量

      虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。

    手动提交 offset 的方法有两种: commitSync(同步提交)commitAsync(异步 提交)

    • 相同点: 将本次poll 的一批数据最高的偏移量提交
    • 不同点: commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

    3.1 同步提交偏移量

    public static void main(String[] args) {
        //创建kafka消费者配置对象以及配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        props.put("group.id", "hy-local-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //创建kafka消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //订阅消息
        kafkaConsumer.subscribe(Arrays.asList("hy2-test-topic"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value())
            }
            //同步提交,当前线程会阻塞直到 offset 提交成功
            kafkaConsumer.commitSync();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3.2 异步提交偏移量

      虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞 吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

    public static void main(String[] args) {
        //创建kafka消费者配置对象以及配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        props.put("group.id", "hy-local-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //创建kafka消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //订阅消息
        kafkaConsumer.subscribe(Arrays.asList("hy2-test-topic"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            //异步提交
            kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("异常.....");
                    }
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

      无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费

  • 相关阅读:
    yolov8+多算法多目标追踪+实例分割+目标检测+姿态估计(代码+教程)
    [数据结构] 万字解析排序算法
    多策略协同改进的阿基米德优化算法及其应用(Matlab代码实现)
    Java与Go:基本数据类型
    面试Java后端
    计算机毕业设计Java服装批发进销存系统(源码+系统+mysql数据库+lw文档)
    CSDN竞赛第一期
    日常bug汇总
    Spring框架学习 -- 创建与使用
    腾讯开源混元DiT文生图模型,消费级单卡可推理
  • 原文地址:https://blog.csdn.net/weixin_44852067/article/details/133353278