kafka 对外提供的 API 主要有两类:生产者 API 和 消费者 API,本文将从Kafka生产者的设计和组件讲起,学习如何使用Kafka生产者。将首先演示如何创建KafkaProducer对象和ProducerRecords对象、如何将记录发送给Kafka,以及如何处理Kafka返回的错误响应。然后介绍用于控制生产者行为的重要配置参数。最后深入探讨如何使用不同的分区方法和序列化器,以及如何自定义序列化器和分区器。
kafka 生产者概述
下图展示了 kafka 生产者往 broker 发送消息的流程交互图:
创建 Producer 必填参数说明:
bootstrap.servers
key.serializer
org.apache.kafka.common.serialization.Serializer
接口的类,生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer
等。value.serializer
下面的代码片段演示了如何创建一个生产者。这里只指定了必需的属性,其他属性使用默认值。
Properties kafkaProps = new Properties(); ➊
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer"); ➋
kafkaProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps); ➌
因为 Kafka 只能处理字节数组,我们在往 kafka 发送数据的时候必须将数据序列化为字节,我们可以通过对应语言自己的序列化方式直接将键或值序列化,也可以通过实现序列化器,来自定义将语言的内置数据类型对象转换为字节发送给生产者,这个时候我们就需要将序列化器的类名作为 **key.serializer 和 value.serializer **参数在创建生产者的时候指定。
kafka 往生产者发送消息的三种方式
发送并忘记
同步发送
异步发送
最简单的消息发送方式如下所示:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products",
"France"); ➊
try {
producer.send(record); ➋
} catch (Exception e) {
e.printStackTrace(); ➌
}
ProducerRecord
,指定 Topic, 键,值对。send()
方法来发送 ProducerRecord 对象, 消息会先被放进缓冲区,然后通过单独的线程发送给服务器端。这里我们忽略了消息是否发送成功。如果采用同步发送方式,那么发送线程在这段时间内就只能等待,什么也不做,甚至都不发送其他消息,这将导致糟糕的性能,通常不会被用在生产环境中,代码如下:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get(); ➊
} catch (Exception e) {
e.printStackTrace(); ➋
}
异步发送只需要指定回调方法即可,代码如下:
private class DemoProducerCallback implements Callback { ➊
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace(); ➋
}
}
}
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); ➌
producer.send(record, new DemoProducerCallback()); ➍
除了上边提到的必填参数外,还有一些选填参数,在内存使用、性能和可靠性方面对生产者影响比较大,接下来将详细介绍它们。
acks 指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。
acks 有三种设置:0,1,all.
acks=0
acks=1
acks=all
kafka 生产者往 broke 发送消息是如何判断超时失败的,这个时间是可配置的。我们将ProduceRecord的发送时间分成如下两个时间间隔,它们是被分开处理的。
send()
所花费的时间。在此期间,调用send()
的线程将被阻塞。生产环境一般使用异步发送,下边讨论的时间都是异步发送过程中的时间,下图展示了生产者的内部数据流以及不同的配置参数如何相互影响:
max.block.ms
delivery.timeout.ms
request.timeout.ms
retries 和retry.backoff.ms
linger.ms
buffer.memory
compression.type
batch.size
max.in.flight.requests.per.connection
max.request.size
receive.buffer.bytes和send.buffer.bytes
enable.idempotence
是否开启**精确一次性(exactly once), **如果设置为开启,需要同时配置 acks=all, 并将delivery.timeout.ms设置为一个比较大的数,允许进行尽可能多的重试。
当幂等生产者被启用时,生产者将给发送的每一条消息都加上一个序列号。如果broker收到具有相同序列号的消息,那么它就会拒绝第二个副本,而生产者则会收到DuplicateSequenceException,这个异常对生产者来说是不需要关心和无影响的。
如果要启用幂等性,那么max.in.flight.requests.per.connection应小于或等于5、retries应大于0,并且acks被设置为all。如果设置了不恰当的值,则会抛出ConfigException异常。
kafka 生产者发送的消息必须是字节类型的,kafka 生产者提供了一些简单的内置序列化器,比如说将字符串序列化为字节,但是并不能满足大部分场景,目前已有几种比较成熟的序列化方案:比如JSON、Avro、Thrift或Protobuf。我们也可以自定义一些序列化方式,将消息对象序列化为字节,但是除了一些定制化场景外,不建议自定义序列化器。
下边说明下如何在 kafka 生产者中使用 Avro 序列化消息。
在使用 Avro 对消息进行序列化的时候,每条消息都会包含对应的序列化 schema 信息,当发送消息条数不多的时候,可以忽略消息大小的增加。
但是如果消息量大,就会导致由于 schema 信息导致消息数据量剧增,因此我们需要使用到 schema 注册表的架构设计模式,生产者和消费者在序列化和反序列化的时候,都从 schema 注册表中查询 schema 信息,无需将 schema 信息保存在发送消息中。
交互流程如下图所示:
下面的例子演示了如何把生成的Avro对象发送给Kafka:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer"); ➊
props.put("schema.registry.url", schemaUrl); ➋
String topic = "customerContacts";
Producer<String, Customer> producer = new KafkaProducer<>(props); ➌
// 不断生成新事件,直到有人按下Ctrl-C组合键
while (true) {
Customer customer = CustomerGenerator.getNext(); ➍
System.out.println("Generated customer " +
customer.toString());
ProducerRecord<String, Customer> record =
new ProducerRecord<>(topic, customer.getName(), customer); ➎
producer.send(record); ➏
}
也可以使用通用的Avro对象,就像使用map那样,这与基于模式生成的带有getter方法和setter方法的Avro对象不同。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer"); ➊
props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url); ➋
String schemaString =
"{\"namespace\": \"customerManagement.avro\",
"\"type\": \"record\", " + ➌
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\", \"type\": " + "[\"null\",\"string\"], " +
"\"default\":\"null\" }" +
"]}";
Producer<String, GenericRecord> producer =
new KafkaProducer<String, GenericRecord>(props); ➍
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
String name = "exampleCustomer" + nCustomers;
String email = "example " + nCustomers + "@example.com"
GenericRecord customer = new GenericData.Record(schema); ➎
customer.put("id", nCustomers);
customer.put("name", name);
customer.put("email", email);
ProducerRecord<String, GenericRecord> data =
new ProducerRecord<>("customerContacts", name, customer);
producer.send(data);
}
Kafka消息就是一个个的键–值对,ProducerRecord对象可以只包含主题名称和值,键默认情况下是null。不过,大多数应用程序还是会用键来发送消息。键有两种用途:
不指定键
键不为空且使用了默认的分区器
如果使用了默认的分区器,那么只有在不改变主题分区数量的情况下键与分区之间的映射才能保持一致。
例如,只要分区数量保持不变,就可以保证用户045189的记录总是被写到分区34。这样就可以在从分区读取数据时做各种优化。但是,一旦主题增加了新分区,这个就无法保证了——旧数据仍然留在分区34,但新记录可能被写到了其他分区。
如果要使用键来映射分区,那么最好在创建主题时就把分区规划好,而且永远不要增加新分区。
除了键和值,记录还可以包含标头。可以在不改变记录键–值对的情况下向标头中添加一些有关记录的元数据。标头指明了记录数据的来源,可以在不解析消息体的情况下根据标头信息来路由或跟踪消息(消息有可能被加密,而路由器没有访问加密数据的权限)。
标头由一系列有序的键–值对组成。键是字符串,值可以是任意被序列化的对象,就像消息里的值一样。
下面这个简单的示例演示了如何给ProduceRecord添加标头。
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
record.headers().add("privacy-level","YOLO".getBytes(StandardCharsets.UTF_8));
Kafka的ProducerInterceptor拦截器包含两个关键方法。ProducerRecord
这个方法会在记录被发送给Kafka之前,甚至是在记录被序列化之前调用。如果覆盖了这个方法,那么就可以捕获到有关记录的信息,甚至可以修改它。只需确保这个方法返回一个有效的ProducerRecord对象。这个方法返回的记录将被序列化并发送给Kafka。void onAcknowledgement(RecordMetadata metadata, Exception exception)
这个方法会在收到Kafka的确认响应时调用。如果覆盖了这个方法,则不可以修改Kafka返回的响应,但可以捕获到有关响应的信息。
常见的生产者拦截器应用场景包括:捕获监控和跟踪信息、为消息添加标头,以及敏感信息脱敏。
下面是一个非常简单的生产者拦截器示例,它只是简单地统计在特定时间窗口内发送和接收的消息数量:
public class CountingProducerInterceptor implements ProducerInterceptor {
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
static AtomicLong numSent = new AtomicLong(0);
static AtomicLong numAcked = new AtomicLong(0);
public void configure(Map<String, ?> map) {
Long windowSize = Long.valueOf(
(String) map.get("counting.interceptor.window.size.ms")); ➊
executorService.scheduleAtFixedRate(CountingProducerInterceptor::run,
windowSize, windowSize, TimeUnit.MILLISECONDS);
}
public ProducerRecord onSend(ProducerRecord producerRecord) {
numSent.incrementAndGet();
return producerRecord; ➋
}
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
numAcked.incrementAndGet(); ➌
}
public void close() {
executorService.shutdownNow(); ➍
}
public static void run() {
System.out.println(numSent.getAndSet(0));
System.out.println(numAcked.getAndSet(0));
}
}
Kafka可以限制生产消息和消费消息的速率,这是通过配额机制来实现的。Kafka提供了3种配额类型:生产、消费和请求。
在配置文件中指定的配额都是静态的,如果要修改它们,则需要重启所有的broker。因为随时都可能有新客户端加入,所以这种配置方式不是很方便。因此,特定客户端的配额通常采用动态配置。可以用kafka-config.sh或AdminClient API来动态设置配额:
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_
byte_rate=1024' --entity-name clientC --entity-type clients ➊
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_
byte_rate=1024,consumer_byte_rate=2048' --entity-name user1 --entity-type users ➋
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'consumer_
byte_rate=2048' --entity-type users ➌
如果异步调用Producer.send(),并且发送速率超过了broker能够接受的速率(无论是由于配额的限制还是由于处理能力不足),那么消息将会被放入客户端的内存队列。如果发送速率一直快于接收速率,那么客户端最终将耗尽内存缓冲区,并阻塞后续的Producer.send()调用。如果超时延迟不足以让broker赶上生产者,使其清理掉一些缓冲区空间,那么Producer.send()最终将抛出TimeoutException异常。或者,批次里的记录因为等待时间超过了delivery.timeout.ms而过期,导致执行send()的回调,并抛出TimeoutException异常。因此,要做好计划和监控,确保broker的处理能力总是与生产者发送数据的速率相匹配。