- package nj.zb.kb23.kafka;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
-
- /*
- * 单线程
- */
- public class MyConsumer {
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
- /*
- earliest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,从头开始消费
- latest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,消费新产生的数据
- none: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,抛出异常
- */
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
- /*
- * ENABLE_AUTO_COMMIT_CONFIG 设置是否自动提交,获取数据的状态,false手动提交,true自动提交
- * AUTO_COMMIT_INTERVAL_MS_CONFIG 设置提交时间,1000ms
- */
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
- /**
- * 设置消费组
- */
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
- //单线程
- KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(properties); - kafkaConsumer.subscribe(Collections.singleton("kb23"));
- while (true){
- ConsumerRecords
records = kafkaConsumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord
record: - records){
- System.out.println("topic:"+record.topic()
- +" partition:"+record.partition()
- +" 偏移量:"+record.offset()
- +" value:"+record.value()
- +" 时间戳:"+record.timestamp());
- }
- //设置手动提交 earliest/latest都接着上次的内容继续输出,除非有新消息输入
- kafkaConsumer.commitAsync();
- }
- }
- }
- package nj.zb.kb23.kafka;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
-
- /*
- * 多线程
- */
- public class MyConsumer2 {
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
- /**
- * 设置消费组
- */
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"心心威武");
- //多线程(3个线程)
- for(int i=0;i<=3;i++){
- new Thread(new Runnable() {
- @Override
- public void run() {
- KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(properties); - kafkaConsumer.subscribe(Collections.singleton("kb23"));
- while(true){
- ConsumerRecords
records = kafkaConsumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord
record : records) { - System.out.println(Thread.currentThread().getName()+
- " topic: "+record.topic()+
- " partition: "+record.partition()+
- " 偏移量: "+record.offset()+
- " value: "+record.value()+
- " 时间戳: "+record.timestamp());
- }
- }
- }
- }).start();
- }
- }
- }
| "C:\Program Files\Java\jdk1.8.0_144\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 。。。。。。。。。。。。。。。。。。。。。。。。 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Thread-3 topic: kb23 partition: 0 偏移量: 0 value: hello java 时间戳: 1695173593009 Thread-3 topic: kb23 partition: 0 偏移量: 1 value: hello c== 时间戳: 1695173606546 Thread-2 topic: kb23 partition: 1 偏移量: 0 value: dufiudhifch 时间戳: 1695174679229 Thread-1 topic: kb23 partition: 2 偏移量: 0 value: hel 时间戳: 1695173599314 Thread-3 topic: kb23 partition: 0 偏移量: 2 value: djfhjsjkhfk 时间戳: 1695174683054 Thread-1 topic: kb23 partition: 2 偏移量: 1 value: hello world 时间戳: 1695173611446 Thread-2 topic: kb23 partition: 1 偏移量: 1 value: hsdakhskfhak 时间戳: 1695174686318 Thread-1 topic: kb23 partition: 2 偏移量: 2 value: hshcdshcdskc 时间戳: 1695174681057 Thread-3 topic: kb23 partition: 0 偏移量: 3 value: jkfdsajklfjalds 时间戳: 1695174689058 Thread-1 topic: kb23 partition: 2 偏移量: 3 value: dhjfhkshkf 时间戳: 1695174684802 |
- package nj.zb.kb23.kafka;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.HashSet;
- import java.util.Properties;
- import java.util.Set;
-
- /*
- * seek指定开始消费的位置
- */
- public class MyConsumerSeek {
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
- /*
- earliest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,从头开始消费
- latest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,消费新产生的数据
- none: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,抛出异常
- */
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
- /*
- * ENABLE_AUTO_COMMIT_CONFIG 设置是否自动提交,获取数据的状态,false手动提交,true自动提交
- * AUTO_COMMIT_INTERVAL_MS_CONFIG 设置提交时间,1000ms
- */
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
- /**
- * 设置消费组
- */
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group3");
- //单线程
- KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(properties); - kafkaConsumer.subscribe(Collections.singleton("kb23"));
-
- Set
assignment = new HashSet<>(); - while (assignment.size()==0){
- kafkaConsumer.poll(Duration.ofMillis(1000));
- assignment = kafkaConsumer.assignment();
- }
- for (TopicPartition topicPartition :
- assignment) {
- //topic:kb23 tp-0:4 tp-1:5 tp-2:4
- System.out.println(topicPartition.topic()+"\t"+topicPartition.partition());
- if (topicPartition.partition()==0){
- kafkaConsumer.seek(topicPartition,4);
- }else if (topicPartition.partition()==1){
- kafkaConsumer.seek(topicPartition,5);
- }else if (topicPartition.partition()==2){
- kafkaConsumer.seek(topicPartition,4);
- }
- }
- while (true){
- ConsumerRecords
records = kafkaConsumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord
record: - records){
- System.out.println("topic:"+record.topic()
- +" partition:"+record.partition()
- +" 偏移量:"+record.offset()
- +" value:"+record.value()
- +" 时间戳:"+record.timestamp());
- }
- }
- }
- }
| "C:\Program Files\Java\jdk1.8.0_144\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA.。。。。。。。。。。。。。 |