• kafka消费者模式


    一、单线程消费者模式

    1. package nj.zb.kb23.kafka;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import org.apache.kafka.clients.consumer.KafkaConsumer;
    6. import org.apache.kafka.common.serialization.StringDeserializer;
    7. import java.time.Duration;
    8. import java.util.Collections;
    9. import java.util.Properties;
    10. /*
    11. * 单线程
    12. */
    13. public class MyConsumer {
    14. public static void main(String[] args) {
    15. Properties properties = new Properties();
    16. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
    17. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    18. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    19. /*
    20. earliest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,从头开始消费
    21. latest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,消费新产生的数据
    22. none: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,抛出异常
    23. */
    24. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    25. /*
    26. * ENABLE_AUTO_COMMIT_CONFIG 设置是否自动提交,获取数据的状态,false手动提交,true自动提交
    27. * AUTO_COMMIT_INTERVAL_MS_CONFIG 设置提交时间,1000ms
    28. */
    29. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    30. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    31. /**
    32. * 设置消费组
    33. */
    34. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
    35. //单线程
    36. KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);
    37. kafkaConsumer.subscribe(Collections.singleton("kb23"));
    38. while (true){
    39. ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100));
    40. for (ConsumerRecord record:
    41. records){
    42. System.out.println("topic:"+record.topic()
    43. +" partition:"+record.partition()
    44. +" 偏移量:"+record.offset()
    45. +" value:"+record.value()
    46. +" 时间戳:"+record.timestamp());
    47. }
    48. //设置手动提交 earliest/latest都接着上次的内容继续输出,除非有新消息输入
    49. kafkaConsumer.commitAsync();
    50. }
    51. }
    52. }

    二、多线程消费者模式

    1. package nj.zb.kb23.kafka;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import org.apache.kafka.clients.consumer.KafkaConsumer;
    6. import org.apache.kafka.common.serialization.StringDeserializer;
    7. import java.time.Duration;
    8. import java.util.Collections;
    9. import java.util.Properties;
    10. /*
    11. * 多线程
    12. */
    13. public class MyConsumer2 {
    14. public static void main(String[] args) {
    15. Properties properties = new Properties();
    16. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
    17. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    18. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    19. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    20. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    21. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    22. /**
    23. * 设置消费组
    24. */
    25. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"心心威武");
    26. //多线程(3个线程)
    27. for(int i=0;i<=3;i++){
    28. new Thread(new Runnable() {
    29. @Override
    30. public void run() {
    31. KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);
    32. kafkaConsumer.subscribe(Collections.singleton("kb23"));
    33. while(true){
    34. ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100));
    35. for (ConsumerRecord record : records) {
    36. System.out.println(Thread.currentThread().getName()+
    37. " topic: "+record.topic()+
    38. " partition: "+record.partition()+
    39. " 偏移量: "+record.offset()+
    40. " value: "+record.value()+
    41. " 时间戳: "+record.timestamp());
    42. }
    43. }
    44. }
    45. }).start();
    46. }
    47. }
    48. }
    "C:\Program Files\Java\jdk1.8.0_144\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 。。。。。。。。。。。。。。。。。。。。。。。。
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    Thread-3 topic: kb23 partition: 0 偏移量: 0 value: hello java 时间戳: 1695173593009
    Thread-3 topic: kb23 partition: 0 偏移量: 1 value: hello c== 时间戳: 1695173606546
    Thread-2 topic: kb23 partition: 1 偏移量: 0 value: dufiudhifch 时间戳: 1695174679229
    Thread-1 topic: kb23 partition: 2 偏移量: 0 value: hel 时间戳: 1695173599314
    Thread-3 topic: kb23 partition: 0 偏移量: 2 value: djfhjsjkhfk 时间戳: 1695174683054
    Thread-1 topic: kb23 partition: 2 偏移量: 1 value: hello world 时间戳: 1695173611446
    Thread-2 topic: kb23 partition: 1 偏移量: 1 value: hsdakhskfhak 时间戳: 1695174686318
    Thread-1 topic: kb23 partition: 2 偏移量: 2 value: hshcdshcdskc 时间戳: 1695174681057
    Thread-3 topic: kb23 partition: 0 偏移量: 3 value: jkfdsajklfjalds 时间戳: 1695174689058
    Thread-1 topic: kb23 partition: 2 偏移量: 3 value: dhjfhkshkf 时间戳: 1695174684802

    三、消费者模式seek方法

    1. package nj.zb.kb23.kafka;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import org.apache.kafka.clients.consumer.KafkaConsumer;
    6. import org.apache.kafka.common.TopicPartition;
    7. import org.apache.kafka.common.serialization.StringDeserializer;
    8. import java.time.Duration;
    9. import java.util.Collections;
    10. import java.util.HashSet;
    11. import java.util.Properties;
    12. import java.util.Set;
    13. /*
    14. * seek指定开始消费的位置
    15. */
    16. public class MyConsumerSeek {
    17. public static void main(String[] args) {
    18. Properties properties = new Properties();
    19. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
    20. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    21. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    22. /*
    23. earliest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,从头开始消费
    24. latest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,消费新产生的数据
    25. none: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,抛出异常
    26. */
    27. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    28. /*
    29. * ENABLE_AUTO_COMMIT_CONFIG 设置是否自动提交,获取数据的状态,false手动提交,true自动提交
    30. * AUTO_COMMIT_INTERVAL_MS_CONFIG 设置提交时间,1000ms
    31. */
    32. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    33. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    34. /**
    35. * 设置消费组
    36. */
    37. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group3");
    38. //单线程
    39. KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);
    40. kafkaConsumer.subscribe(Collections.singleton("kb23"));
    41. Set assignment = new HashSet<>();
    42. while (assignment.size()==0){
    43. kafkaConsumer.poll(Duration.ofMillis(1000));
    44. assignment = kafkaConsumer.assignment();
    45. }
    46. for (TopicPartition topicPartition :
    47. assignment) {
    48. //topic:kb23 tp-0:4 tp-1:5 tp-2:4
    49. System.out.println(topicPartition.topic()+"\t"+topicPartition.partition());
    50. if (topicPartition.partition()==0){
    51. kafkaConsumer.seek(topicPartition,4);
    52. }else if (topicPartition.partition()==1){
    53. kafkaConsumer.seek(topicPartition,5);
    54. }else if (topicPartition.partition()==2){
    55. kafkaConsumer.seek(topicPartition,4);
    56. }
    57. }
    58. while (true){
    59. ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100));
    60. for (ConsumerRecord record:
    61. records){
    62. System.out.println("topic:"+record.topic()
    63. +" partition:"+record.partition()
    64. +" 偏移量:"+record.offset()
    65. +" value:"+record.value()
    66. +" 时间戳:"+record.timestamp());
    67. }
    68. }
    69. }
    70. }

    "C:\Program Files\Java\jdk1.8.0_144\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA.。。。。。。。。。。。。。
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    kb23    2
    kb23    1
    kb23    0
    topic:kb23 partition:2 偏移量:4 value:sjhkdksahkdah 时间戳:1695174687827
    topic:kb23 partition:2 偏移量:5 value:hhh1 时间戳:1695175898301
    topic:kb23 partition:2 偏移量:6 value:2222 时间戳:1695176003767
    topic:kb23 partition:2 偏移量:7 value:444 时间戳:1695176010084
    topic:kb23 partition:2 偏移量:8 value:ppp 时间戳:1695177956251
    topic:kb23 partition:2 偏移量:9 value:ppp1 时间戳:1695178017439
    topic:kb23 partition:2 偏移量:10 value:ppp3 时间戳:1695178021374
    topic:kb23 partition:2 偏移量:11 value:ananaq 时间戳:1695179560702
    topic:kb23 partition:1 偏移量:5 value:qqq 时间戳:1695175970133

  • 相关阅读:
    C++入门:C语言到C++的过渡
    报错!Jupyter notebook 500 : Internal Server Error
    http响应状态码(Header常见属性 — Location属性)
    STM32Cube高效开发教程<基础篇>(八)----通用定时器-输入捕获、输出比较、PWM输出/互补输出等
    Pandas数据分析20——pandas窗口计算
    C#源代码生成器深入讲解二
    实验四 内核线程管理-实验部分
    轻盈百搭头戴式耳机——umelody轻律 U1头戴式复古耳机分享
    C语言:动态内存管理
    SpringCloud进阶-搭建基本环境
  • 原文地址:https://blog.csdn.net/berbai/article/details/133104381