消费者Consumer采用从broker中主动拉取数据,Kafka采用这种方式
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同
特点:
coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择 = groupid的hashcode值 % __consumer_offsets的分区数量
例如: groupid的hashcode值 = 1,__consumer_offsets为50,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset
每3秒 每个消费者都会和coordinator保持心跳(默认3s),一旦超时超过45s(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms5分钟)超过了五分钟,也会触发再平衡
再平衡:把挂掉的消费者的任务分配给其他消费者
在 IDEA 中编写生产者和消费者程序,生产者往主题为first3中发送数据,消费者从主题为first3中拉去数据
注意:运行程序之前,需要启动zk和kafka集群
生产者 CustomProducer01 类
package com.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @author wangbo
* @version 1.0
*/
/**
* 异步发送,创建不带回调函数的API代码
*/
public class CustomProducer01 {
public static void main(String[] args) {
//配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //写两个节点是为了防止客户挂掉,另一个能够正常工作
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1.创建kafka生成对象
// 表示 k的数据类型,和v的数据类型
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 2.发送数据
for (int i = 0; i<5;i++){
//第一个参数为生产者的主题名,第二个生产者生产的数据value。还有其他配置选项
kafkaProducer.send(new ProducerRecord<String, String>("first3","kafka"));
}
// 3.关闭资源
kafkaProducer.close();
}
}
消费者 CustomConsumer_01 类
package com.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @author wangbo
* @version 1.0
*/
/**
* 1. 启动集群的zk和kafka
* 2. 运行CustomConsumer_01消费者消费数据
* 3. 运行CustomProducer01生产者生产数据 注意主题要对上
*/
public class CustomConsumer_01 {
public static void main(String[] args) {
//配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //多写一个,避免其中一台挂掉,保证数据的可靠性
//反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//配置消费者组ID 可以任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//1.创建一个消费者 "","hello"
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//2.订阅主题 first3
ArrayList<String> topics = new ArrayList<String>();
topics.add("first3");
kafkaConsumer.subscribe(topics);
//3.消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据
//循环打印消费的数据 consumerRecords.for
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}