Kafka消费者提交Offset的策略有
Kafka消费者默认是自动提交Offset的策略
可设置自动提交的时间间隔
- package com.demo.lxb.kafka;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Properties;
-
- /**
- * @Description: kafka消费者消费消息,自动提交offset
- * @Author: lvxiaobu
- * @Date: 2023-10-24 16:26
- **/
- public class MyConsumerAutoSubmitOffset {
-
- private final static String CONSUMER_GROUP_NAME = "GROUP1";
- private final static String TOPIC_NAME = "topic0921";
-
- public static void main(String[] args) {
- Properties props = new Properties();
-
- // 一、设置参数
- // 配置kafka地址
- // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- // "192.168.151.28:9092"); // 单机配置
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
- // 配置消息 键值的序列化规则
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- // 配置消费者组
- props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
-
- // 设置消费者offset的提交方式
- // 自动提交:默认配置
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
- // 自动提交offset的时间间隔
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
-
- // 二、创建消费者
- KafkaConsumer
consumer = new KafkaConsumer(props); - // 三、消费者订阅主题
- consumer.subscribe(Arrays.asList(TOPIC_NAME));
-
- // 四、拉取消息,开始消费
- while (true){
- // 从kafka集群中拉取消息
- ConsumerRecords
records = consumer.poll(Duration.ofMillis(1000)); - // 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时
- // 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理)
- for (ConsumerRecord
record : records) { - System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
- + ", key值: " + record.key() + " , value值: "+record.value());
- }
- }
- }
- }
上述代码中的如下代码是自动提交策略的相关设置
- // 设置消费者offset的提交方式
- // 自动提交:默认配置
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
- // 自动提交offset的时间间隔
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
3.1、手动同步提交策略
手动同步提交,会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后,才开始执行消费者中后续的代码。
因为使用异步提交容易丢失消息,固一般使用同步提交,在同步提交后不要再做其他逻辑处理。
- package com.demo.lxb.kafka;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Properties;
-
- /**
- * @Description: kafka消费者消费消息,手动同步提交offset
- * @Author: lvxiaobu
- * @Date: 2023-10-24 16:26
- **/
- public class MyConsumerMauSubmitOffset {
-
- private final static String CONSUMER_GROUP_NAME = "GROUP1";
- private final static String TOPIC_NAME = "topic0921";
-
- public static void main(String[] args) {
- Properties props = new Properties();
-
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
-
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
-
- // 关键代码:关闭自动提交
- // 手动提交offset
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
- // 自动提交offset的时间间隔:此时不再需要设置该值
- // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
-
- KafkaConsumer
consumer = new KafkaConsumer(props); -
- consumer.subscribe(Arrays.asList(TOPIC_NAME));
-
- while (true){
- ConsumerRecords
records = consumer.poll(Duration.ofMillis(1000)); - for (ConsumerRecord
record : records) { - System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
- + ", key值: " + record.key() + " , value值: "+record.value());
- }
-
- // 关键代码:commitSync():同步提交方法
- // 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。
- // 一般使用同步提交,在同步提交后不再做其他逻辑处理
- consumer.commitSync();
-
- // do anything
- }
- }
- }
3.2、手动异步提交策略
异步提交,不会在提交offset代码处阻塞,即消费者提交了offset后,不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法,供kafka集群回调,来告诉消费者提交offset的结果。
- package com.demo.lxb.kafka;
-
- import com.alibaba.fastjson.JSONObject;
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Map;
- import java.util.Properties;
-
- /**
- * @Description: kafka消费者消费消息,手动异步提交offset
- * @Author: lvxiaobu
- * @Date: 2023-10-24 16:26
- **/
- public class MyConsumerMauSubmitOffset2 {
-
- private final static String CONSUMER_GROUP_NAME = "GROUP1";
- private final static String TOPIC_NAME = "topic0921";
-
- public static void main(String[] args) {
- Properties props = new Properties();
-
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
-
- // 关键代码:关闭自动提交
- // 手动提交offset
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
- // 自动提交offset的时间间隔:此时不再需要设置该值
- // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
-
- KafkaConsumer
consumer = new KafkaConsumer(props); -
- consumer.subscribe(Arrays.asList(TOPIC_NAME));
-
- while (true){
- ConsumerRecords
records = consumer.poll(Duration.ofMillis(1000)); -
- for (ConsumerRecord
record : records) { - System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
- + ", key值: " + record.key() + " , value值: "+record.value());
- }
- // 关键代码:commitAsync() 异步提交
- // new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果
- consumer.commitAsync(new OffsetCommitCallback() {
- @Override
- public void onComplete(Map
map, Exception e) { - if(e != null){
- // 可将提交失败的消息记录到日志
- System.out.println("记录提交offset失败的消息到日志");
- System.out.println("消费者提交offset抛出异常:" + Arrays.toString(e.getStackTrace()));
- System.out.println("消费者提交offset异常的消息信息:" + JSONObject.toJSONString(map));
- }
- }
- });
-
- // 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。
- //do anything
-
- }
- }
- }