• 3. Kafka Advanced Topics: Consumers

    Consumer and consumer-group example (two consumers plus a producer):

    import com.lvxiaosha.kafka.api.Const;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    public class Consumer1 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            // GROUP_ID_CONFIG: the consumer group this instance belongs to
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-1");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Subscribe to the topic
            consumer.subscribe(Collections.singletonList(Const.TOPIC_MODULE));
            System.err.println("consumer1 started..");
            try {
                while (true) {
                    // Poll a batch of records
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        String topic = partition.topic();
                        int size = partitionRecords.size();
                        System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----", topic, partition.partition(), size));
                        for (int i = 0; i < size; i++) {
                            long offset = partitionRecords.get(i).offset() + 1;
                            System.err.println(String.format("value: %s, offset to commit: %s",
                                    partitionRecords.get(i).value(), offset));
                        }
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
    import com.lvxiaosha.kafka.api.Const;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    public class Consumer2 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            // Note: a different group id than Consumer1
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-2");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Subscribe to the topic
            consumer.subscribe(Collections.singletonList(Const.TOPIC_MODULE));
            System.err.println("consumer2 started..");
            try {
                while (true) {
                    // Poll a batch of records
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        String topic = partition.topic();
                        int size = partitionRecords.size();
                        System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----", topic, partition.partition(), size));
                        for (int i = 0; i < size; i++) {
                            long offset = partitionRecords.get(i).offset() + 1;
                            System.err.println(String.format("value: %s, offset to commit: %s",
                                    partitionRecords.get(i).value(), offset));
                        }
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
    import com.alibaba.fastjson.JSON;
    import com.lvxiaosha.kafka.api.Const;
    import com.lvxiaosha.kafka.api.User;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class Producer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "module-producer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                User user = new User();
                user.setId(i + "");
                user.setName("张三");
                producer.send(new ProducerRecord<>(Const.TOPIC_MODULE, JSON.toJSONString(user)));
            }
            producer.close();
        }
    }

    Note: if Consumer1 and Consumer2 are configured with the same consumer group (props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-1");), the two consumers belong to one group and behave like a point-to-point model: the topic's messages are split between them, each partition being consumed by only one of the two. If their group ids differ, the setup behaves like a publish-subscribe model: Consumer1 and Consumer2 each receive every message in the topic.

    Required Kafka consumer parameters and methods (the consumer will not start without them):

    • bootstrap.servers: the list of broker addresses used to connect to the Kafka cluster.
    • key.deserializer and value.deserializer: deserializers for record keys and values.
    • group.id: the consumer group the consumer belongs to.
    • subscribe: subscribes to topics; accepts a collection of topic names or a regular expression.
    • assign: subscribes directly to specific partitions of a topic. Note that subscribe() and assign() are mutually exclusive on the same consumer instance.

    Kafka consumer offset commits

    A consumer tracks its progress by committing, per partition, the offset of the next record to consume (last processed offset + 1). With enable.auto.commit=true the client commits automatically every auto.commit.interval.ms; with manual commits the application calls commitSync() or commitAsync() itself, gaining precise control over when a record counts as processed.

    Topic subscription example:

    import com.lvxiaosha.kafka.api.Const;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    public class CoreConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "core-group");
            // Auto commit every 5 seconds (see CommitConsumer below for the manual-commit variant)
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            // subscribe() accepts one or more topics:
            // consumer.subscribe(Collections.singletonList(Const.TOPIC_CORE));
            // ...or a regular expression:
            // consumer.subscribe(Pattern.compile("topic-.*"));
            // assign() takes one or more specific partitions of a topic:
            // consumer.assign(Arrays.asList(new TopicPartition(Const.TOPIC_CORE, 0), new TopicPartition(Const.TOPIC_CORE, 2)));
            // To attach to every partition of a topic, list them via partitionsFor():
            List<TopicPartition> tpList = new ArrayList<TopicPartition>();
            List<PartitionInfo> tpinfoList = consumer.partitionsFor(Const.TOPIC_CORE);
            for (PartitionInfo pi : tpinfoList) {
                System.err.println("topic: " + pi.topic() + ", partition: " + pi.partition());
                tpList.add(new TopicPartition(pi.topic(), pi.partition()));
            }
            consumer.assign(tpList);
            System.err.println("core consumer started...");
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (TopicPartition topicPartition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                        String topic = topicPartition.topic();
                        int size = partitionRecords.size();
                        System.err.println(String.format("--- topic: %s, partition: %s, record count: %s", topic, topicPartition.partition(), size));
                        for (int i = 0; i < size; i++) {
                            ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                            String value = consumerRecord.value();
                            long offset = consumerRecord.offset();
                            long commitOffset = offset + 1;
                            System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s", value, offset, commitOffset));
                        }
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
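    Because CoreConsumer obtains the partition list with partitionsFor() and attaches to it with assign(), it bypasses group management entirely: no rebalancing ever happens for this consumer. Its committed offsets are still stored under group.id, but partition ownership is never coordinated with other consumers.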
    import com.alibaba.fastjson.JSON;
    import com.lvxiaosha.kafka.api.Const;
    import com.lvxiaosha.kafka.api.User;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class CoreProducer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            properties.put(ProducerConfig.CLIENT_ID_CONFIG, "core-producer");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 10; i++) {
                User user = new User("00" + i, "张三");
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>(Const.TOPIC_CORE, JSON.toJSONString(user));
                producer.send(record);
            }
            producer.close();
        }
    }

    Manual commit example:

    import com.lvxiaosha.kafka.api.Const;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class CommitConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "core-group");
            // Commit offsets manually
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            // properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            // properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
            // AUTO_OFFSET_RESET_CONFIG decides where to start when no committed offset exists.
            // It accepts three values:
            //   latest   - start from the end of each partition (the default)
            //   earliest - start from the beginning of each partition (offset 0)
            //   none     - throw an exception to the consumer
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            // subscribe() accepts one or more topics
            consumer.subscribe(Collections.singletonList(Const.TOPIC_CORE));
            System.err.println("core consumer started...");
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (TopicPartition topicPartition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                        String topic = topicPartition.topic();
                        int size = partitionRecords.size();
                        System.err.println(String.format("--- topic: %s, partition: %s, record count: %s", topic, topicPartition.partition(), size));
                        for (int i = 0; i < size; i++) {
                            ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                            String value = consumerRecord.value();
                            long offset = consumerRecord.offset();
                            long commitOffset = offset + 1;
                            System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s", value, offset, commitOffset));
                            // Commit record by record within a partition, synchronously:
                            // consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffset)));
                            // ...or asynchronously:
                            consumer.commitAsync(
                                    Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffset)),
                                    new OffsetCommitCallback() {
                                        @Override
                                        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                                               Exception exception) {
                                            if (null != exception) {
                                                System.err.println("commit failed, handle the error: " + exception.getMessage());
                                            } else {
                                                System.err.println("async commit succeeded: " + offsets);
                                            }
                                        }
                                    });
                        }
                        // Alternatively, commit once per partition here
                    }
                    /*
                    // Or commit the whole poll in one call, synchronously:
                    // consumer.commitSync();
                    // ...or asynchronously:
                    consumer.commitAsync(new OffsetCommitCallback() {
                        @Override
                        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                            if (null != exception) {
                                System.err.println("commit failed, handle the error: " + exception.getMessage());
                            } else {
                                System.err.println("async commit succeeded: " + offsets);
                            }
                        }
                    });
                    */
                }
            } finally {
                consumer.close();
            }
        }
    }
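    On the choice between the two commit calls: commitSync() blocks and retries transient failures, making it the safer but slower option; commitAsync() does not retry, because a retried commit could complete after a newer one and move the committed offset backwards. A common compromise is commitAsync() inside the poll loop plus one final commitSync() in the finally block before close().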
    import com.alibaba.fastjson.JSON;
    import com.lvxiaosha.kafka.api.Const;
    import com.lvxiaosha.kafka.api.User;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class CommitProducer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            properties.put(ProducerConfig.CLIENT_ID_CONFIG, "core-producer");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 10; i++) {
                User user = new User("00" + i, "张三");
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>(Const.TOPIC_CORE, JSON.toJSONString(user));
                producer.send(record);
            }
            producer.close();
        }
    }

    How does Kafka rebalancing (Rebalance) work?

    In Kafka, a Rebalance is triggered when a new consumer joins a group, a consumer leaves or fails, or the set of subscribed topics or partitions changes. A rebalance transfers ownership of partitions from one consumer to another within the same consumer group, redistributing the consumption load across the members. The process runs as follows:

    Step 1: every member sends a join request to the coordinator. Once all members have joined, the coordinator picks one consumer to act as the group leader and sends it the member list together with their subscription information. Step 2: the leader computes the assignment plan, deciding which consumer consumes which partitions of which topics, and returns the plan to the coordinator; the coordinator then distributes it to each consumer, so every member of the group knows exactly which partitions it owns. The coordinator therefore plays a central role in any rebalance.

    Kafka rebalance listener example:

    import com.lvxiaosha.kafka.api.Const;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    public class RebalanceConsumer1 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            // GROUP_ID_CONFIG: both rebalance consumers share this group
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Subscribe with a rebalance listener to observe partition hand-offs
            consumer.subscribe(Collections.singletonList(Const.TOPIC_REBALANCE), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.err.println("Revoked Partitions: " + partitions);
                }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.err.println("Assigned Partitions: " + partitions);
                }
            });
            System.err.println("rebalance consumer1 started..");
            try {
                while (true) {
                    // Poll a batch of records
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        String topic = partition.topic();
                        int size = partitionRecords.size();
                        System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----", topic, partition.partition(), size));
                        for (int i = 0; i < size; i++) {
                            long offset = partitionRecords.get(i).offset() + 1;
                            System.err.println(String.format("value: %s, offset to commit: %s",
                                    partitionRecords.get(i).value(), offset));
                        }
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
    import com.lvxiaosha.kafka.api.Const;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    public class RebalanceConsumer2 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Subscribe with a rebalance listener to observe partition hand-offs
            consumer.subscribe(Collections.singletonList(Const.TOPIC_REBALANCE), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.err.println("Revoked Partitions: " + partitions);
                }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.err.println("Assigned Partitions: " + partitions);
                }
            });
            System.err.println("rebalance consumer2 started..");
            try {
                while (true) {
                    // Poll a batch of records
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        String topic = partition.topic();
                        int size = partitionRecords.size();
                        System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----", topic, partition.partition(), size));
                        for (int i = 0; i < size; i++) {
                            long offset = partitionRecords.get(i).offset() + 1;
                            System.err.println(String.format("value: %s, offset to commit: %s",
                                    partitionRecords.get(i).value(), offset));
                        }
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
    import com.alibaba.fastjson.JSON;
    import com.lvxiaosha.kafka.api.Const;
    import com.lvxiaosha.kafka.api.User;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class RebalanceProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "rebalance-producer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                User user = new User();
                user.setId(i + "");
                user.setName("张三");
                producer.send(new ProducerRecord<>(Const.TOPIC_REBALANCE, JSON.toJSONString(user)));
            }
            producer.close();
        }
    }
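    The listeners above only log the transferred partitions. A common production pattern is to commit the offsets of already-processed records inside onPartitionsRevoked(), so the partition's next owner does not re-consume them. Below is a minimal sketch of that pattern, not part of the original listings; the class name, topic name, and the currentOffsets bookkeeping are illustrative assumptions:

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.*;

    // Sketch: commit processed offsets when partitions are revoked, so the
    // next owner resumes exactly where this consumer stopped. Assumes manual
    // commit (enable.auto.commit=false); "topic-rebalance" is a hypothetical topic.
    public class RevokeCommitConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Offsets of records processed so far, keyed by partition
            Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

            consumer.subscribe(Collections.singletonList("topic-rebalance"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Flush everything processed so far before ownership moves
                    consumer.commitSync(currentOffsets);
                }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Nothing to do; positions resume from the committed offsets
                }
            });

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        // Process the record, then remember offset + 1 as the next position
                        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    }
                    consumer.commitAsync(currentOffsets, null);
                }
            } finally {
                consumer.close();
            }
        }
    }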

    Multithreaded Kafka consumers

    KafkaConsumer is not thread safe, so an instance must never be shared across threads. Model 1 below sidesteps the problem by giving each thread its own KafkaConsumer, all joined to the same consumer group:

    Consumer multithreading model 1 implementation:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicInteger;

    public class KafkaConsumerMt1 implements Runnable {
        private KafkaConsumer<String, String> consumer;
        private volatile boolean isRunning = true;
        private static AtomicInteger counter = new AtomicInteger(0);
        private String consumerName;

        public KafkaConsumerMt1(Properties properties, String topic) {
            // One KafkaConsumer per thread: the instance is never shared
            this.consumer = new KafkaConsumer<>(properties);
            this.consumer.subscribe(Arrays.asList(topic));
            this.consumerName = "KafkaConsumerMt1-" + counter.getAndIncrement();
            System.err.println(this.consumerName + " started ");
        }

        @Override
        public void run() {
            try {
                while (isRunning) {
                    // All records fetched for the subscribed topic(s) in this poll
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                    for (TopicPartition topicPartition : consumerRecords.partitions()) {
                        // Records belonging to this particular partition
                        List<ConsumerRecord<String, String>> partitionList = consumerRecords.records(topicPartition);
                        int size = partitionList.size();
                        for (int i = 0; i < size; i++) {
                            ConsumerRecord<String, String> consumerRecord = partitionList.get(i);
                            // Process the message here
                            String message = consumerRecord.value();
                            long messageOffset = consumerRecord.offset();
                            System.err.println("consumer: " + consumerName
                                    + ", message: " + message
                                    + ", offset: " + messageOffset
                                    + ", thread: " + Thread.currentThread().getName());
                        }
                    }
                }
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }

        public boolean isRunning() {
            return isRunning;
        }

        public void setRunning(boolean isRunning) {
            this.isRunning = isRunning;
        }
    }
    import com.alibaba.fastjson.JSON;
    import com.lvxiaosha.kafka.api.Const;
    import com.lvxiaosha.kafka.api.User;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    /**
     * $Mt1Producer
     * @author hezhuo.bai
     * @since 2019-02-28 12:38
     */
    public class Mt1Producer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "mt1-producer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                User user = new User();
                user.setId(i + "");
                user.setName("张三");
                producer.send(new ProducerRecord<>(Const.TOPIC_MT1, JSON.toJSONString(user)));
            }
            producer.close();
        }
    }
    import com.lvxiaosha.kafka.api.Const;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Mt1Test {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "mt1-group");
            // Auto commit
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            String topic = Const.TOPIC_MT1;
            // Pool size: one thread (and one consumer) per worker
            int coreSize = 5;
            ExecutorService executorService = Executors.newFixedThreadPool(coreSize);
            for (int i = 0; i < coreSize; i++) {
                executorService.execute(new KafkaConsumerMt1(props, topic));
            }
        }
    }
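    In model 1, each worker owns a private KafkaConsumer, so there is no shared state to synchronize; the group protocol spreads the topic's partitions across the five consumers automatically. The flip side is that threads beyond the partition count sit idle, and every consumer carries its own TCP connections and fetch buffers.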

    Model 2 uses a master-worker design (the master dispatches, the workers process). During initialization the master creates a pool of worker instances; the master alone pulls messages and hands them out to the workers. When the workers finish, they report their results back to the master, which performs a single unified commit.

    This avoids multiple threads operating on one consumer concurrently, which would make KafkaConsumer throw a ConcurrentModificationException.

    Master-worker model
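    A minimal sketch of this model under stated assumptions: the class name, group id, and topic name are hypothetical, processing is simulated by a print, and the master commits synchronously only after every worker has finished the batch:

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.*;
    import java.util.concurrent.*;

    // Sketch: master-worker consumption. A single master thread owns the
    // KafkaConsumer and polls; records are handed to a worker pool; the master
    // commits only once the whole batch has been processed.
    public class MasterWorkerConsumer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "mw-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // master commits manually
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

            ExecutorService workers = Executors.newFixedThreadPool(5);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("topic-mw"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    List<Future<?>> results = new ArrayList<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // Only the master touches `consumer`; workers see just the record
                        results.add(workers.submit(() -> {
                            System.out.printf("worker %s handled value=%s offset=%d%n",
                                    Thread.currentThread().getName(), record.value(), record.offset());
                        }));
                    }
                    for (Future<?> f : results) {
                        f.get(); // wait until every worker has finished this batch
                    }
                    consumer.commitSync(); // safe: the whole batch has been processed
                }
            } finally {
                workers.shutdown();
            }
        }
    }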

    Important Kafka consumer parameters
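
    Beyond the required settings, the consumer parameters most often tuned include:

    • fetch.min.bytes: minimum amount of data the broker returns for a fetch; the broker waits until at least this much has accumulated.
    • fetch.max.wait.ms: how long the broker may delay a fetch response while waiting for fetch.min.bytes.
    • max.partition.fetch.bytes: upper bound on the data returned per partition in one fetch.
    • max.poll.records: maximum number of records a single poll() returns.
    • max.poll.interval.ms: maximum allowed gap between poll() calls before the consumer is considered dead and a rebalance is triggered.
    • session.timeout.ms / heartbeat.interval.ms: how long the coordinator waits for heartbeats before evicting a member, and how often heartbeats are sent.
    • auto.offset.reset: starting position when no committed offset exists (latest, earliest, or none).
    • enable.auto.commit / auto.commit.interval.ms: whether and how often offsets are committed automatically.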

     
