Requirement: create a standalone consumer and consume the data in a topic:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 3 --replication-factor 3 --topic hh
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: hh Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 2,1,0
Topic: hh Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 3,0,2
Topic: hh Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,1,2
Note: in consumer API code you must configure a consumer group id. If a consumer is started from the command line without a group id, a random consumer group id is generated automatically.
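For example, starting a console consumer without the --group option gets an auto-generated group id (typically of the form console-consumer-<random number>):
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic hh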
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Set the consumer group id (required when using subscribe()); any group name works
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);
        // Poll and consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
Spring Boot: a custom logging configuration that turns off the Kafka consumer's DEBUG log output. Simply add a logback.xml file under the resources directory:
<configuration debug="false">
    <property name="LOG_HOME" value="logs/"/>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <FileNamePattern>${LOG_HOME}/%d{yyyy-MM-dd}.%i.log</FileNamePattern>
            <maxFileSize>50MB</maxFileSize>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="FILE"/>
    </root>
    <logger name="org.apache.kafka" level="info" additivity="false"/>
    <logger name="org.apache.kafka.clients" level="info" additivity="false"/>
</configuration>
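How this silences the Kafka client output: the two org.apache.kafka logger entries use additivity="false" with no appender-ref attached, so log events from the Kafka client loggers are not forwarded to the root appenders at all, which suppresses the noisy DEBUG output (and the rest of the client logging along with it); everything else stays at INFO.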
Test: have a producer send messages:
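A quick way to test is the console producer that ships with Kafka (in this 2.2.x release the broker list option is --broker-list); type a few lines at the > prompt and the consumer above should print them:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic hh
>hello,kafka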
Requirement: create a standalone consumer that consumes the data of partition 0 of the topic.
① Kafka consumer that consumes partition 0 of the topic:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPartition {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Set the consumer group id; any group name works
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Manually assign the consumer to partition 0 of the topic
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("hh", 0));
        consumer.assign(topicPartitions);
        // Poll and consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
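Design note: consumer.assign() is manual partition assignment, so the consumer group's rebalance protocol is not involved; the group.id is still used for committing offsets. subscribe() and assign() are mutually exclusive on the same consumer instance.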
② Kafka producer that sends data to partition 0 of the topic:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        // Kafka producer configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.38.23:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            // ProducerRecord(topic, partition, key, value): partition 0 is set explicitly, key is empty
            kafkaProducer.send(new ProducerRecord<>("hh", 0, "", "hello,kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception == null) {
                        // Message sent successfully
                        System.out.println("topic: " + recordMetadata.topic() + " -> partition: " + recordMetadata.partition());
                    } else {
                        // Message failed to send
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(2);
        }
        // Release resources
        kafkaProducer.close();
    }
}
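When a partition is given explicitly in the ProducerRecord constructor, the producer uses it as-is and the partitioner (default or custom) is not consulted, so all five records land in partition 0.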
③ Test: start the consumer program first, then start the producer program.
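With the cluster and topic above, the producer's callback should print something along these lines (five records, all in partition 0), while the consumer prints the message values hello,kafka0 through hello,kafka4:
topic: hh -> partition: 0
topic: hh -> partition: 0
... (5 lines in total)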
Requirement: verify that the data of a single topic partition can be consumed by only one consumer within a consumer group.
① Create 3 consumers: make two more copies of the basic consumer code and start all of them in IDEA at the same time, which gives you 3 consumers in the same consumer group.
(The code is identical to the CustomConsumer class shown at the top of this section; only multiple instances of it are started.)
② The producer sends 50 messages:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        // Kafka producer configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.38.23:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Optionally plug in a custom partitioner
        // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.hh.producer.MyPartitioner");
        // Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 50; i++) {
            // No partition or key given, so the partitioner decides where each record goes
            kafkaProducer.send(new ProducerRecord<>("hh", "hello,kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception == null) {
                        // Message sent successfully
                        System.out.println("topic: " + recordMetadata.topic() + ", sent to partition: " + recordMetadata.partition());
                    } else {
                        // Message failed to send
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(2);
        }
        // Release resources
        kafkaProducer.close();
    }
}
③ Test: start the 3 consumer programs first, then start the producer program.
You can see that the 50 messages are split among the consumers in the group, each consumer reading from different partitions: with 3 consumers and 3 partitions, the default range assignment strategy gives each consumer exactly one partition, so any single partition is consumed by only one member of the group.
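To confirm the assignment, the consumer-groups tool shows which group member owns each partition (assuming the group id used above):
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server hadoop101:9092 --describe --group test-consumer-group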