• Kafka-Java四:Spring配置Kafka消费者提交Offset的策略


    一、Kafka消费者提交Offset的策略

    Kafka消费者提交Offset的策略有

    1. 自动提交Offset:
      1. 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。
      2. 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了
    2. 手动提交Offset
      1. 消费者在消费消息时/后,再提交offset,在消费者中实现
      2. 手动提交Offset分为:手动同步提交(commitSync)、手动异步提交(commitAsync)
    3. 什么是Offset
      1. 参考文章:Linux:【Kafka三】组件介绍

    二、自动提交策略

            Kafka消费者默认是自动提交Offset的策略

            可设置自动提交的时间间隔

    1. package com.demo.lxb.kafka;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import org.apache.kafka.clients.consumer.KafkaConsumer;
    6. import org.apache.kafka.common.serialization.StringSerializer;
    7. import java.time.Duration;
    8. import java.util.Arrays;
    9. import java.util.Properties;
    10. /**
    11. * @Description: kafka消费者消费消息,自动提交offset
    12. * @Author: lvxiaobu
    13. * @Date: 2023-10-24 16:26
    14. **/
    15. public class MyConsumerAutoSubmitOffset {
    16. private final static String CONSUMER_GROUP_NAME = "GROUP1";
    17. private final static String TOPIC_NAME = "topic0921";
    18. public static void main(String[] args) {
    19. Properties props = new Properties();
    20. // 一、设置参数
    21. // 配置kafka地址
    22. // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    23. // "192.168.151.28:9092"); // 单机配置
    24. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
    25. "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
    26. // 配置消息 键值的序列化规则
    27. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    28. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    29. // 配置消费者组
    30. props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
    31. // 设置消费者offset的提交方式
    32. // 自动提交:默认配置
    33. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
    34. // 自动提交offset的时间间隔
    35. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    36. // 二、创建消费者
    37. KafkaConsumer consumer = new KafkaConsumer(props);
    38. // 三、消费者订阅主题
    39. consumer.subscribe(Arrays.asList(TOPIC_NAME));
    40. // 四、拉取消息,开始消费
    41. while (true){
    42. // 从kafka集群中拉取消息
    43. ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
    44. // 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时
    45. // 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理)
    46. for (ConsumerRecord record : records) {
    47. System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
    48. + ", key值: " + record.key() + " , value值: "+record.value());
    49. }
    50. }
    51. }
    52. }

    上述代码中的如下代码是自动提交策略的相关设置 

    1. // 设置消费者offset的提交方式
    2. // 自动提交:默认配置
    3. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
    4. // 自动提交offset的时间间隔
    5. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

    三、手动提交策略

    3.1、手动同步提交策略

            手动同步提交,会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后,才开始执行消费者中后续的代码。

            因为使用异步提交容易丢失消息,固一般使用同步提交,在同步提交后不要再做其他逻辑处理。

    1. package com.demo.lxb.kafka;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import org.apache.kafka.clients.consumer.KafkaConsumer;
    6. import org.apache.kafka.common.serialization.StringSerializer;
    7. import java.time.Duration;
    8. import java.util.Arrays;
    9. import java.util.Properties;
    10. /**
    11. * @Description: kafka消费者消费消息,手动同步提交offset
    12. * @Author: lvxiaobu
    13. * @Date: 2023-10-24 16:26
    14. **/
    15. public class MyConsumerMauSubmitOffset {
    16. private final static String CONSUMER_GROUP_NAME = "GROUP1";
    17. private final static String TOPIC_NAME = "topic0921";
    18. public static void main(String[] args) {
    19. Properties props = new Properties();
    20. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
    21. "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
    22. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    23. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    24. props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
    25. // 关键代码:关闭自动提交
    26. // 手动提交offset
    27. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    28. // 自动提交offset的时间间隔:此时不再需要设置该值
    29. // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    30. KafkaConsumer consumer = new KafkaConsumer(props);
    31. consumer.subscribe(Arrays.asList(TOPIC_NAME));
    32. while (true){
    33. ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
    34. for (ConsumerRecord record : records) {
    35. System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
    36. + ", key值: " + record.key() + " , value值: "+record.value());
    37. }
    38. // 关键代码:commitSync():同步提交方法
    39. // 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。
    40. // 一般使用同步提交,在同步提交后不再做其他逻辑处理
    41. consumer.commitSync();
    42. // do anything
    43. }
    44. }
    45. }

    3.2、手动异步提交策略

            异步提交,不会在提交offset代码处阻塞,即消费者提交了offset后,不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法,供kafka集群回调,来告诉消费者提交offset的结果。

    1. package com.demo.lxb.kafka;
    2. import com.alibaba.fastjson.JSONObject;
    3. import org.apache.kafka.clients.consumer.*;
    4. import org.apache.kafka.common.TopicPartition;
    5. import org.apache.kafka.common.serialization.StringSerializer;
    6. import java.time.Duration;
    7. import java.util.Arrays;
    8. import java.util.Map;
    9. import java.util.Properties;
    10. /**
    11. * @Description: kafka消费者消费消息,手动异步提交offset
    12. * @Author: lvxiaobu
    13. * @Date: 2023-10-24 16:26
    14. **/
    15. public class MyConsumerMauSubmitOffset2 {
    16. private final static String CONSUMER_GROUP_NAME = "GROUP1";
    17. private final static String TOPIC_NAME = "topic0921";
    18. public static void main(String[] args) {
    19. Properties props = new Properties();
    20. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
    21. "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
    22. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    23. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    24. props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
    25. // 关键代码:关闭自动提交
    26. // 手动提交offset
    27. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    28. // 自动提交offset的时间间隔:此时不再需要设置该值
    29. // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    30. KafkaConsumer consumer = new KafkaConsumer(props);
    31. consumer.subscribe(Arrays.asList(TOPIC_NAME));
    32. while (true){
    33. ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
    34. for (ConsumerRecord record : records) {
    35. System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
    36. + ", key值: " + record.key() + " , value值: "+record.value());
    37. }
    38. // 关键代码:commitAsync() 异步提交
    39. // new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果
    40. consumer.commitAsync(new OffsetCommitCallback() {
    41. @Override
    42. public void onComplete(Map map, Exception e) {
    43. if(e != null){
    44. // 可将提交失败的消息记录到日志
    45. System.out.println("记录提交offset失败的消息到日志");
    46. System.out.println("消费者提交offset抛出异常:" + Arrays.toString(e.getStackTrace()));
    47. System.out.println("消费者提交offset异常的消息信息:" + JSONObject.toJSONString(map));
    48. }
    49. }
    50. });
    51. // 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。
    52. //do anything
    53. }
    54. }
    55. }

  • 相关阅读:
    使用idea时,光标变成了不能按空格键,只能修改的vim格式,怎么切换回正常光标
    DBPack 限流熔断功能发布说明
    模型压缩(二)yolov5剪枝
    虹科案例 | AR内窥镜手术应用为手术节约45分钟?
    华为od(D卷)亲子游戏
    Vue/Vuex (mutations) 核心概念 方法说明、辅助函数 mapMutations使用方法
    接了一个2000块的小活,大家进来看看值不值,附源码
    PROSS程序设计
    idea和maven组合使用
    【迪文屏幕】开发资料
  • 原文地址:https://blog.csdn.net/qq_36769100/article/details/134029496