• kafka位移提交


    目录

    前言:

    位移提交: 

    小结:

    参考资料 


    前言:

         Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

    位移提交: 

         提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。

         因为位移提交非常灵活,你完全可以提交任何位移值,但由此产生的后果你也要一并承担。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20,那么从理论上讲,位移介于 11~19 之间的消息是有可能丢失的;相反地,如果你提交的位移值是 5,那么位移介于 5~9 之间的消息就有可能被重复消费。所以,Kafka 只会“无脑”地接受你提交的位移。

         从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。 

         所谓自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移,作为用户的你完全不必操心这些事;而手动提交,则是指你要自己提交位移,Kafka Consumer 压根不管。

       自动提交的代码示意如下:

    1. Properties props = new Properties();
    2. props.put("bootstrap.servers", "localhost:9092");
    3. props.put("group.id", "test");
    4. props.put("enable.auto.commit", "true");
    5. props.put("auto.commit.interval.ms", "2000");
    6. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    7. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    8. KafkaConsumer consumer = new KafkaConsumer<>(props);
    9. consumer.subscribe(Arrays.asList("foo", "bar"));
    10. while (true) {
    11. ConsumerRecords records = consumer.poll(100);
    12. for (ConsumerRecord record : records)
    13. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    14. }

     和自动提交相反的,就是手动提交了。开启手动提交位移的方法就是设置 enable.auto.commit 为 false。但是,仅仅设置它为 false 还不够,因为你只是告诉 Kafka Consumer 不要自动提交位移而已,你还需要调用相应的 API 手动提交位移。

    最简单的 API 就是 KafkaConsumer#commitSync()。该方法会提交 KafkaConsumer#poll() 返回的最新位移。从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。下面这段代码展示了 commitSync() 的使用方法:

    1. while (true) {
    2. ConsumerRecords records =
    3. consumer.poll(Duration.ofSeconds(1));
    4. process(records); // 处理消息
    5. try {
    6. consumer.commitSync();
    7. } catch (CommitFailedException e) {
    8. handle(e); // 处理提交失败异常
    9. }
    10. }

    可见,调用 consumer.commitSync() 方法的时机,是在你处理完了 poll() 方法返回的所有消息之后。如果你莽撞地过早提交了位移,就可能会出现消费数据丢失的情况。那么你可能会问,自动提交位移就不会出现消费数据丢失的情况了吗?它能恰到好处地把握时机进行位移提交吗?为了搞清楚这个问题,我们必须要深入地了解一下自动提交位移的顺序。

    一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

     在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

    手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,你可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。 

     为了解决上述同步提交位移的问题,我们可以选择使用异步提交位移的方式KafkaConsumer#commitAsync()

     调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法:

    1. while (true) {
    2. ConsumerRecords records =
    3. consumer.poll(Duration.ofSeconds(1));
    4. process(records); // 处理消息
    5. consumer.commitAsync((offsets, exception) -> {
    6. if (exception != null)
    7. handle(exception);
    8. });
    9. }

     commitAsync 是否能够替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。

     显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能达到最理想的效果,原因有两个:

    • 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
    • 我们不希望程序总处于阻塞状态,影响 TPS。 
    1. try {
    2. while(true) {
    3. ConsumerRecords records =
    4. consumer.poll(Duration.ofSeconds(1));
    5. process(records); // 处理消息
    6. commitAysnc(); // 使用异步提交规避阻塞
    7. }
    8. } catch(Exception e) {
    9. handle(e); // 处理异常
    10. } finally {
    11. try {
    12. consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    13. } finally {
    14. consumer.close();
    15. }
    16. }

    这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性

     设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。

     就拿刚刚提过的那个例子来说,如何每处理 100 条消息就提交一次位移呢?在这里,我以 commitAsync 为例,展示一段代码,实际上,commitSync 的调用方法和它是一模一样的。

    1. private Map offsets = new HashMap<>();
    2. int count = 0;
    3. ……
    4. while (true) {
    5. ConsumerRecords records =
    6. consumer.poll(Duration.ofSeconds(1));
    7. for (ConsumerRecord record: records) {
    8. process(record); // 处理消息
    9. offsets.put(new TopicPartition(record.topic(), record.partition()),
    10. new OffsetAndMetadata(record.offset() + 1);
    11. if(count % 100 == 0
    12. consumer.commitAsync(offsets, null); // 回调处理逻辑是null
    13. count++;
    14. }
    15. }

    小结:

     Kafka Consumer 的位移提交,是实现 Consumer 端语义保障的重要手段。位移提交分为自动提交和手动提交,而手动提交又分为同步提交和异步提交。在实际使用过程中,推荐你使用手动提交机制,因为它更加可控,也更加灵活。另外,建议你同时采用同步提交和异步提交两种方式,这样既不影响 TPS,又支持自动重试,改善 Consumer 应用的高可用性

    参考资料 

    18 | Kafka中位移提交那些事儿-极客时间

  • 相关阅读:
    记录:2022-9-12 多路归并 超级丑数 败者树 调度算法实现 帧分配 系统抖动 内存映射文件 内核内存分配
    C++中vector用法总结
    洛谷P4799 世界冰球锦标赛
    【SDP协议】
    SpringBoot终幕——日志的输出以及Lombok常用注解
    探索网络爬虫技术:原理、实践与挑战
    C语言描述数据结构 —— 常见排序(3)计数排序、归并排序
    如何使一个盒子水平垂直居中以及如何实现双飞翼(圣杯)布局?
    java基于springboot社区共享食堂订餐信息系统maven
    opensbi firmware源码分析(2)
  • 原文地址:https://blog.csdn.net/weixin_44399827/article/details/132741135