这一部分主要是从客户端使用的角度来理解Kakfa的重要机制。重点依然是要建立自己脑海中的Kafka消费模型。Kafka的HighLevel API使用是非常简单的,所以梳理模型时也要尽量简单化,主线清晰,细节慢慢扩展。
# 一、从基础的客户端说起
Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka_2.13artifactId>
<version>3.2.0version>
dependency>
然后可以使用Kafka提供的Producer类,快速发送消息。
public class MyProducer {
private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
private static final String TOPIC = "disTopic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
//PART1:设置发送者相关属性
Properties props = new Properties();
// 此处配置的是kafka的端口
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 配置key的序列化类
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 配置value的序列化类
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 5; i++) {
//Part2:构建消息
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
//Part3:发送消息
//单向发送:不关心服务端的应答。
producer.send(record);
System.out.println("message "+i+" sended");
//同步发送:获取服务端应答消息前,会阻塞当前线程。
RecordMetadata recordMetadata = producer.send(record).get();
String topic = recordMetadata.topic();
long offset = recordMetadata.offset();
String message = recordMetadata.toString();
System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(null != e){
System.out.println("消息发送失败,"+e.getMessage());
e.printStackTrace();
}else{
String topic = recordMetadata.topic();
long offset = recordMetadata.offset();
String message = recordMetadata.toString();
System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
}
}
});
}
producer.close();
}
}
整体来说,构建Producer分为三个步骤:
接下来可以使用Kafka提供的Consumer类,快速消费消息。
public class MyConsumer {
private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
private static final String TOPIC = "disTopic";
public static void main(String[] args) {
//PART1:设置发送者相关属性
Properties props = new Properties();
//kafka地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//每个消费者要指定一个group
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
//key序列化类
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//value序列化类
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
//PART2:拉取消息
// 100毫秒超时时间
ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
//PART3:处理消息
for (ConsumerRecord<String, String> record : records) {
System.out.println("offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
}
//提交offset,消息就不会重复推送。
consumer.commitSync();
// consumer.commitAsync();
}
}
}
整体来说,Consumer同样是分为三个步骤:
以Producer、Consumer为主体的这一层Kafka的客户端API,在Kafka中成为HighLevel API。这一层API封装了Kafka运行的很多底层细节,使用起来是非常简单的,基本就是固定的按照这三个大的步骤进行。在具体使用过程中,最大的变数基本上来自于生产者和消费者的第一部分,也就是属性定制的部分。这些属性极大的影响了客户端程序的执行方式。
渔与鱼:对于这些属性,你并不需要煞有介事的强行去记忆,随时可以根据ProducerConfig和ConsumerConfig以及他们的父类CommonClientConfig去理解,大部分的属性都配有非常简明扼要的解释。然后,尝试自己建立一个消息流转模型,去理解其中比较重要的一些属性。接下来,就只找几个相对比较重要的属性着重解释一下。