• kafka的auto.offset.reset详解与测试


    1. 取值及定义#
      auto.offset.reset有以下三个可选值:

    latest (默认)
    earliest
    none
    三者均有共同定义:
    对于同一个消费者组,若已有提交的offset,则从提交的offset开始接着消费

    意思就是,只要这个消费者组消费过了,不管auto.offset.reset指定成什么值,效果都一样,每次启动都是已有的最新的offset开始接着往后消费

    不同的点为:

    latest(默认):对于同一个消费者组,若没有提交过offset,则只消费消费者连接topic后,新产生的数据
    就是说如果这个topic有历史消息,现在新启动了一个消费者组,且auto.offset.reset=latest,此时已存在的历史消息无法消费到,那保持消费者组运行,如果此时topic有新消息进来了,这时新消息才会被消费到。而一旦有消费,则必然会提交offset
    这时候如果该消费者组意外下线了,topic仍然有消息进来,接着该消费者组在后面恢复上线了,它仍然可以从下线时的offset处开始接着消费,此时走的就是共同定义

    earliest:对于同一个消费者组,若没有提交过offset,则从头开始消费
    就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费,这就是与latest不同之处。
    一旦该消费者组消费过topic后,此时就有该消费者组的offset了,这种情况下即使指定了auto.offset.reset=earliest,再重新启动该消费者组,效果是与latest一样的,也就是此时走的是共同的定义

    none:对于同一个消费者组,若没有提交过offset,会抛异常
    一般生产环境基本用不到该参数

    1. 新建全新topic#
      ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic TestOffsetResetTopic --partitions 1 --replication-factor 1 --create

    2. 往新建的topic发送消息#
      便于测试,用Java代码发送5条消息

    public class TestProducer {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer producer = new KafkaProducer<>(properties);
    
        String topic = "TestOffsetResetTopic";
    
        for (int i = 0; i < 5; i++) {
            String value = "message_" + i + "_" + LocalDateTime.now();
            System.out.println("Send value: " + value);
            producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
                if (exception == null) {
                    String str = MessageFormat.format("Send success! topic: {0}, partition: {1}, offset: {2}", metadata.topic(), metadata.partition(), metadata.offset());
                    System.out.println(str);
                }
            });
            Thread.sleep(500);
        }
    
        producer.close();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    }

    发送消息成功:

    Send value: message_0_2022-09-16T18:26:15.943749600
    Send success! topic: TestOffsetResetTopic, partition: 0, offset: 0
    Send value: message_1_2022-09-16T18:26:17.066608900
    Send success! topic: TestOffsetResetTopic, partition: 0, offset: 1
    Send value: message_2_2022-09-16T18:26:17.568667200
    Send success! topic: TestOffsetResetTopic, partition: 0, offset: 2
    Send value: message_3_2022-09-16T18:26:18.069093600
    Send success! topic: TestOffsetResetTopic, partition: 0, offset: 3
    Send value: message_4_2022-09-16T18:26:18.583288100
    Send success! topic: TestOffsetResetTopic, partition: 0, offset: 4

    现在TestOffsetResetTopic这个topic有5条消息,且还没有任何消费者组对其进行消费过,也就是没有任何offset

    1. 测试latest#
      已知topic已经存在5条历史消息,此时启动一个消费者

    public class TestConsumerLatest {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 指定消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // 设置 auto.offset.reset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    
        String topic = "TestOffsetResetTopic";
        KafkaConsumer consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
    
        // 消费数据
        while (true) {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    }

    发现如上面所述,历史已存在的5条消息不会消费到,消费者没有任何动静,现在保持消费者在线

    启动TestProducer再发5条消息,会发现这后面新发的,offset从5开始的消息就被消费了

    ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1663329725731, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_0_2022-09-16T20:02:05.523581500)
    ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1663329726251, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_1_2022-09-16T20:02:06.251399400)
    ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1663329726764, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_2_2022-09-16T20:02:06.764186200)
    ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1663329727264, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_3_2022-09-16T20:02:07.264268500)
    ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1663329727778, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_4_2022-09-16T20:02:07.778469700)

    此时该消费者组对于这个topic的offset已经为9了,现在停掉这个消费者(下线),再启动TestProducer发5条消息,接着再启动TestConsumerLatest,会发现紧接上一次的offset之后开始,即从10继续消费

    如果测试发现没动静,请多等一会,估计机器性能太差…

    1. 测试earliest#
      新建一个测试消费者,设置auto.offset.reset为earliest,注意groupid为新的group2,表示对于topic来说是全新的消费者组

    public class TestConsumerEarliest {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 指定消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        // 设置 auto.offset.reset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
        String topic = "TestOffsetResetTopic";
        KafkaConsumer consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
    
        // 消费数据
        while (true) {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    }

    一运行发现已有的10条消息(最开始5条加上面一次测试又发了5条,一共10条)是可以被消费到的,且消费完后,对于这个topic就已经有了group2这个组的offset了,无论之后启停,只要groupid不变,都会从最新的offset往后开始消费

    1. 测试none#
      新建一个测试消费者,设置auto.offset.reset为none,注意groupid为新的group3,表示对于topic来说是全新的消费者组

    public class TestConsumerNone {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 指定消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group3");
        // 设置 auto.offset.reset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    
        String topic = "TestOffsetResetTopic";
        KafkaConsumer consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
    
        // 消费数据
        while (true) {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    }

    一运行,程序报错,因为对于topic来说是全新的消费者组,且又指定了auto.offset.reset为none,直接抛异常,程序退出

    Exception in thread “main” org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [TestOffsetResetTopic-0]
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:706)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2434)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1266)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at kakfa.TestConsumerNone.main(TestConsumerNone.java:31)

    1. 总结#
      如果topic已经有历史消息了,又需要消费这些历史消息,则必须要指定一个从未消费过的消费者组,同时指定auto.offset.reset为earliest,才可以消费到历史数据,之后就有提交offset。有了offset,无论是earliest还是latest,效果都是一样的了。
      如果topic没有历史消息,或者不需要处理历史消息,那按照默认latest即可。
  • 相关阅读:
    Java if VS switch
    Golang sync.WaitGroup
    ROS 摄像头标定-camera_calibration
    基于数据库实现全局唯一ID
    Go语言用Colly库编写的图像爬虫程序
    cpolar内网穿透
    MM32F0140 UART1空闲中断接收
    怪兽存活概率问题
    STM32G0开发笔记-Platformio+libopencm3-按键和外部中断
    vue3 的组件通信以及ref的使用
  • 原文地址:https://blog.csdn.net/Huangjiazhen711/article/details/126902421