Consumer and consumer group code:
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
-
- public class Consumer1 {
-
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- // GROUP_ID_CONFIG: the consumer group this consumer belongs to
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-1");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // Subscribe to the topic
- consumer.subscribe(Collections.singletonList(Const.TOPIC_MODULE));
- System.err.println("consumer1 started.. ");
- try {
- while (true) {
- // Poll a batch of records
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for (TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- String topic = partition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----", topic, partition.partition(), size));
- for (int i = 0; i < size; i++) {
- long offset = partitionRecords.get(i).offset() + 1;
- System.err.println(String.format("value: %s, offset to commit: %s",
- partitionRecords.get(i).value(), offset));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
-
- public class Consumer2 {
-
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-2");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // Subscribe to the topic
- consumer.subscribe(Collections.singletonList(Const.TOPIC_MODULE));
- System.err.println("consumer2 started.. ");
- try {
- while (true) {
- // Poll a batch of records
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for (TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- String topic = partition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----", topic, partition.partition(), size));
- for (int i = 0; i < size; i++) {
- long offset = partitionRecords.get(i).offset() + 1;
- System.err.println(String.format("value: %s, offset to commit: %s",
- partitionRecords.get(i).value(), offset));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
- import com.alibaba.fastjson.JSON;
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
-
- public class Producer {
-
- public static void main(String[] args) {
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "module-producer");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-
- for(int i = 0 ; i < 10; i ++) {
- User user = new User();
- user.setId(i+"");
- user.setName("张三");
- producer.send(new ProducerRecord<>(Const.TOPIC_MODULE, JSON.toJSONString(user)));
- }
-
- producer.close();
-
- }
-
- }
Note: if Consumer1 and Consumer2 use the same consumer group configuration (props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-1");), the two consumers belong to the same consumer group. This behaves like a point-to-point model: Consumer1 and Consumer2 split the topic's messages between them. Conversely, if their group configurations differ, the setup behaves like a publish-subscribe model: Consumer1 and Consumer2 each consume all of the topic's messages.
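As a minimal illustration of this note (everything else in Consumer2 left unchanged), switching to the point-to-point behavior only requires giving Consumer2 the same group id as Consumer1:
- // In Consumer2: reuse Consumer1's group id so both consumers form one group
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-1");
- // Same group id -> the topic's partitions are split between the two consumers (point-to-point)
- // Different ids -> each group independently receives every message (publish-subscribe)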
Required Kafka consumer parameters (the consumer cannot start without these)
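A minimal sketch of the required settings (the group id "demo-group" is an illustrative name; group.id is required as soon as subscribe() is used):
- Properties props = new Properties();
- // Broker list for the initial cluster connection
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- // Deserializers for record keys and values
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- // Required when consuming via subscribe() and committing offsets
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);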
Kafka consumer offset commits
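The key convention: the committed offset is the position of the next record to consume, i.e. the offset of the last processed record plus one. A minimal sketch (topicPartition and record are assumed to come from a poll loop like the ones below):
- // Committing record.offset() itself would re-deliver that record after a
- // restart or rebalance; commit the position *after* the processed record.
- long nextOffset = record.offset() + 1;
- consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(nextOffset)));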
Topic subscription code:
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.PartitionInfo;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
-
- public class CoreConsumer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "core-group");
-
- // Offsets are committed automatically here; manual commits are shown in CommitConsumer below
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
-
- // subscribe(): subscribe to one or more topics
- // consumer.subscribe(Collections.singletonList(Const.TOPIC_CORE));
- // Regular-expression subscriptions are also supported
- // consumer.subscribe(Pattern.compile("topic-.*"));
- 
- // assign(): subscribe to one or more specific partitions of a topic
- // consumer.assign(Arrays.asList(new TopicPartition(Const.TOPIC_CORE, 0), new TopicPartition(Const.TOPIC_CORE, 2)));
-
- // How to fetch all partitions of a topic and assign them manually
- List<TopicPartition> tpList = new ArrayList<TopicPartition>();
- List<PartitionInfo> tpinfoList = consumer.partitionsFor(Const.TOPIC_CORE);
- for(PartitionInfo pi : tpinfoList) {
- System.err.println("主题:"+ pi.topic() +", 分区: " + pi.partition());
- tpList.add(new TopicPartition(pi.topic(), pi.partition()));
- }
-
- consumer.assign(tpList);
-
- System.err.println("core consumer started...");
-
- try {
- while(true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for(TopicPartition topicPartition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
- String topic = topicPartition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("--- topic: %s, partition: %s, record count: %s", topic, topicPartition.partition(), size));
- for(int i = 0; i < size; i++) {
- ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
- String value = consumerRecord.value();
- long offset = consumerRecord.offset();
- long commitOffset = offset + 1;
- System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s", value, offset, commitOffset));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
- import com.alibaba.fastjson.JSON;
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class CoreProducer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ProducerConfig.CLIENT_ID_CONFIG, "core-producer");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
- for(int i = 0; i <10; i ++) {
- User user = new User("00" + i, "张三");
- ProducerRecord<String, String> record =
- new ProducerRecord<String, String>(Const.TOPIC_CORE,
- JSON.toJSONString(user));
- producer.send(record);
- }
- producer.close();
- }
- }
Manual commit code:
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
-
- public class CommitConsumer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "core-group");
-
- // Commit offsets manually
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
- // properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- // properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
-
- // Where the consumer starts reading when the group has no committed offset
- // AUTO_OFFSET_RESET_CONFIG accepts three values: "latest", "earliest", "none"
- // none: throw an exception if no committed offset exists for the partition
- // latest: start from the partition's latest offset
- // earliest: start from the beginning of the partition (offset 0)
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
-
- // subscribe(): subscribe to one or more topics
- consumer.subscribe(Collections.singletonList(Const.TOPIC_CORE));
-
- System.err.println("core consumer started...");
-
- try {
- while(true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for(TopicPartition topicPartition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
- String topic = topicPartition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("--- topic: %s, partition: %s, record count: %s", topic, topicPartition.partition(), size));
- for(int i = 0; i < size; i++) {
- ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
- String value = consumerRecord.value();
- long offset = consumerRecord.offset();
- long commitOffset = offset + 1;
- System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s", value, offset, commitOffset));
- // Commit each record one by one within a partition
- // consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffset)));
- consumer.commitAsync(
- Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffset)),
- new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
- Exception exception) {
- if (null != exception) {
- System.err.println("commit failed: " + exception.getMessage());
- return;
- }
- System.err.println("async commit succeeded: " + offsets);
- }
- });
-
- }
- // Alternatively, commit once per partition here
- }
-
- /**
- // Commit the entire poll result synchronously
- // consumer.commitSync();
- 
- // Commit the entire poll result asynchronously
- consumer.commitAsync(new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- if (null != exception) {
- System.err.println("commit failed: " + exception.getMessage());
- return;
- }
- System.err.println("async commit succeeded: " + offsets);
- }
- });
- */
-
- }
- } finally {
- consumer.close();
- }
- }
- }
- import com.alibaba.fastjson.JSON;
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class CommitProducer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ProducerConfig.CLIENT_ID_CONFIG, "core-producer");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
- for(int i = 0; i <10; i ++) {
- User user = new User("00" + i, "张三");
- ProducerRecord<String, String> record =
- new ProducerRecord<String, String>(Const.TOPIC_CORE,
- JSON.toJSONString(user));
- producer.send(record);
- }
- producer.close();
- }
- }
In Kafka, a Rebalance is triggered when a new consumer joins a group or the set of subscribed topics changes. A rebalance transfers ownership of partitions from one consumer to another within the same consumer group; as the name suggests, it re-balances consumption across the group's consumers. The process works as follows:
Step 1: every member sends a join request to the coordinator. Once all members have sent the request, the coordinator selects one consumer to act as the group leader and sends it the member list and subscription information. Step 2: the leader computes the assignment plan, specifying which consumer is responsible for which partitions of which topics, and sends the plan back to the coordinator. The coordinator then forwards the plan to each consumer, so every member of the group knows exactly which partitions it should consume. The coordinator is therefore central to the whole rebalance process.
Kafka rebalance listener code:
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
-
- import java.time.Duration;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
-
- public class RebalanceConsumer1 {
-
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- // GROUP_ID_CONFIG: the consumer group this consumer belongs to
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-group");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // Subscribe to the topic with a rebalance listener
- consumer.subscribe(Collections.singletonList(Const.TOPIC_REBALANCE), new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- System.err.println("Revoked Partitions:" + partitions);
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- System.err.println("AssignedAssigned Partitions:" + partitions);
- }
- });
- System.err.println("rebalance consumer1 started.. ");
- try {
- while (true) {
- // Poll a batch of records
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for (TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- String topic = partition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----", topic, partition.partition(), size));
- for (int i = 0; i < size; i++) {
- long offset = partitionRecords.get(i).offset() + 1;
- System.err.println(String.format("value: %s, offset to commit: %s",
- partitionRecords.get(i).value(), offset));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
-
- import java.time.Duration;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
-
- public class RebalanceConsumer2 {
-
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-group");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // Subscribe to the topic with a rebalance listener
- consumer.subscribe(Collections.singletonList(Const.TOPIC_REBALANCE), new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- System.err.println("Revoked Partitions:" + partitions);
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- System.err.println("AssignedAssigned Partitions:" + partitions);
- }
- });
-
- System.err.println("rebalance consumer2 started.. ");
- try {
- while (true) {
- // Poll a batch of records
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for (TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- String topic = partition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----", topic, partition.partition(), size));
- for (int i = 0; i < size; i++) {
- long offset = partitionRecords.get(i).offset() + 1;
- System.err.println(String.format("value: %s, offset to commit: %s",
- partitionRecords.get(i).value(), offset));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
- import com.alibaba.fastjson.JSON;
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
-
- public class RebalanceProducer {
-
- public static void main(String[] args) {
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "rebalance-producer");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-
- for(int i = 0 ; i < 10; i ++) {
- User user = new User();
- user.setId(i+"");
- user.setName("张三");
- producer.send(new ProducerRecord<>(Const.TOPIC_REBALANCE, JSON.toJSONString(user)));
- }
-
- producer.close();
-
- }
-
- }
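The listeners above only log assignment changes. A common refinement, sketched here under the assumption that auto-commit is disabled and positions are tracked by hand, is to commit the current positions inside onPartitionsRevoked, so that whichever consumer receives the partitions next does not re-process records:
- // Track the next offset to commit per partition while processing records
- final Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
- 
- consumer.subscribe(Collections.singletonList(Const.TOPIC_REBALANCE), new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- // Synchronous commit: it must complete before the rebalance proceeds
- consumer.commitSync(pending);
- pending.clear();
- }
- 
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- System.err.println("Assigned Partitions:" + partitions);
- }
- });
- 
- // Inside the poll loop, after processing each record:
- // pending.put(partition, new OffsetAndMetadata(record.offset() + 1));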
Multi-threaded Kafka consumers
Code for consumer multi-threading model 1 (one KafkaConsumer instance per thread):
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
-
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.List;
- import java.util.Properties;
- import java.util.concurrent.atomic.AtomicInteger;
-
- public class KafkaConsumerMt1 implements Runnable {
-
- private KafkaConsumer<String, String> consumer;
-
- private volatile boolean isRunning = true;
-
- private static AtomicInteger counter = new AtomicInteger(0);
-
- private String consumerName;
-
- public KafkaConsumerMt1(Properties properties, String topic) {
- this.consumer = new KafkaConsumer<>(properties);
- this.consumer.subscribe(Arrays.asList(topic));
- this.consumerName = "KafkaConsumerMt1-" + counter.getAndIncrement();
- System.err.println(this.consumerName + " started ");
- }
-
- @Override
- public void run() {
- try {
- while(isRunning) {
- // The poll result contains records from every assigned partition
- ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
- for(TopicPartition topicPartition : consumerRecords.partitions()) {
- // String topic = topicPartition.topic();
- // Fetch the list of records belonging to this specific topicPartition
- List<ConsumerRecord<String, String>> partitionList = consumerRecords.records(topicPartition);
- int size = partitionList.size();
- for(int i = 0; i < size; i++) {
- ConsumerRecord<String, String> consumerRecord = partitionList.get(i);
- // do execute message
- String message = consumerRecord.value();
- long messageOffset = consumerRecord.offset();
- System.err.println("当前消费者:"+ consumerName
- + ",消息内容:" + message
- + ", 消息的偏移量: " + messageOffset
- + "当前线程:" + Thread.currentThread().getName());
- }
- }
- }
- } finally {
- if(consumer != null) {
- consumer.close();
- }
- }
- }
-
- public boolean isRunning() {
- return isRunning;
- }
-
- public void setRunning(boolean isRunning) {
- this.isRunning = isRunning;
- }
-
- }
- import com.alibaba.fastjson.JSON;
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
- /**
- * Mt1Producer
- * @author hezhuo.bai
- * @since 2019-02-28
- */
- public class Mt1Producer {
-
- public static void main(String[] args) {
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "mt1-producer");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-
- for(int i = 0 ; i < 10; i ++) {
- User user = new User();
- user.setId(i+"");
- user.setName("张三");
- producer.send(new ProducerRecord<>(Const.TOPIC_MT1, JSON.toJSONString(user)));
- }
-
- producer.close();
-
- }
-
-
- }
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
-
- import java.util.Properties;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- public class Mt1Test {
-
- public static void main(String[] args) {
-
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "mt1-group");
- // Auto-commit mode
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- String topic = Const.TOPIC_MT1;
-
- // Pool size: one KafkaConsumer per thread
- int coreSize = 5;
- ExecutorService executorService = Executors.newFixedThreadPool(coreSize);
-
- for(int i = 0; i < coreSize; i++) {
- executorService.execute(new KafkaConsumerMt1(props, topic));
- }
-
- }
- }
Model 2 adopts a Master-worker pattern (the master dispatches work, the workers process it). During initialization the master creates a number of worker instances; the master is then solely responsible for pulling messages and dispatching them to the workers. When a worker finishes, it passes its result back to the master, which performs a single, unified commit.
This avoids having multiple threads operate on the same consumer, which would cause the Kafka client to throw a ConcurrentModificationException.
The Master-worker model
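A minimal sketch of this Master-worker idea (the class name, group id, and topic "topic-mt2" are illustrative, not from the original project): one master thread owns the KafkaConsumer and polls, records are dispatched to a worker pool, and the master commits once the whole batch has been processed. Since only the master thread ever touches the consumer, the ConcurrentModificationException mentioned above cannot occur:
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
- 
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- 
- public class MasterWorkerConsumer {
- 
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "mt2-group");
- // The master commits manually, only after a batch is fully processed
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- 
- ExecutorService workers = Executors.newFixedThreadPool(5);
- // Only this (master) thread ever calls methods on the consumer
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Collections.singletonList("topic-mt2"));
- try {
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- if (records.isEmpty()) continue;
- // The master dispatches every record of the batch to the worker pool
- List<Future<?>> results = new ArrayList<>();
- for (ConsumerRecord<String, String> record : records) {
- results.add(workers.submit(() -> System.err.println(
- Thread.currentThread().getName() + " processed: " + record.value())));
- }
- // Wait for the whole batch to finish, then commit once from the master
- for (Future<?> f : results) f.get();
- consumer.commitSync();
- }
- } finally {
- consumer.close();
- workers.shutdown();
- }
- }
- }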
Important Kafka consumer parameters