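Parameter | Description | Default |
bootstrap.servers | List of broker addresses the client uses to connect to the Kafka cluster | "" |
metadata.max.age.ms | Interval after which metadata is forcibly refreshed, in milliseconds | 300000 (5 minutes) |
batch.size | Size of the memory region backing one ProducerBatch | 16 KB |
acks | Number of replicas that must receive a message before the send counts as successful | "1" (a string; "all" since Kafka 3.0) |
linger.ms | How long a ProducerBatch waits before it is sent. If the batch reaches batch.size during the wait, it is sent immediately without waiting any longer | 0 |
client.id | User-defined id used to trace requests | "" |
send.buffer.bytes | Socket send buffer size | 128 KB; -1 uses the operating system setting |
receive.buffer.bytes | Socket receive buffer size | 32 KB; -1 uses the operating system setting |
max.request.size | Maximum size of a request the producer client may send | 1 MB |
reconnect.backoff.ms | Wait before reconnecting to a broker after a failed connection | 50 ms |
reconnect.backoff.max.ms | Upper bound on the wait when reconnecting to a broker that has failed repeatedly | 1000 ms |
max.block.ms | How long the producer's send() and partitionsFor() methods may block. They block when the send buffer is full or when no metadata is available | 60 s |
buffer.memory | Size of the buffer the producer client uses to cache messages | 32 MB |
retry.backoff.ms | Wait between retries of a failed send | 100 ms |
compression.type | Compression codec applied to messages | none (no compression) |
metrics.sample.window.ms | Time window over which a metrics sample is computed | 30000 ms |
metrics.num.samples | Number of samples maintained to compute metrics | 2 |
metrics.recording.level | Highest recording level for metrics | INFO |
metric.reporters | List of classes used as metrics reporters | empty list |
max.in.flight.requests.per.connection | Maximum number of unacknowledged (in-flight) requests on one connection. Sending several requests without waiting reduces overhead, but if one fails and is retried the order of the messages may change | 5 |
retries | Number of retries after a failed send | 0 (2147483647 since Kafka 2.1) |
key.serializer | Serializer class for keys | |
value.serializer | Serializer class for values | |
connections.max.idle.ms | Idle time after which a connection is closed | 540000 ms |
partitioner.class | Partitioner class; implement the Partitioner interface to define custom partitioning rules | |
request.timeout.ms | Maximum time the client waits for a response. If none arrives in time the client resends the request, and throws an exception once the retries are exhausted | 30000 ms |
interceptor.classes | Interceptor classes; implement the ProducerInterceptor interface to define custom interceptors | |
enable.idempotence | true enables idempotence | |
transaction.timeout.ms | Transaction timeout | 60000 ms |
transactional.id | Transaction id; must be unique | |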
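group.id | Unique identifier of the consumer group the consumer belongs to | |
max.poll.records | Maximum number of records returned by one poll | 500 |
max.poll.interval.ms | Maximum time allowed between two calls to poll() before the consumer is considered failed | 300000 ms |
session.timeout.ms | Timeout used to detect a failed consumer | 10000 ms |
heartbeat.interval.ms | Interval between consumer heartbeats | 3000 ms |
bootstrap.servers | Broker addresses used to connect to the cluster | |
enable.auto.commit | Whether consumed offsets are committed automatically | true |
auto.commit.interval.ms | Interval between automatic offset commits | 5000 ms |
partition.assignment.strategy | Partition assignment strategy of the consumer | RangeAssignor |
auto.offset.reset | Offset to use when a partition has no initial offset or the current offset no longer exists on the server: earliest starts from the beginning, latest starts from the newest message, none throws an exception | |
fetch.min.bytes | Minimum amount of data the consumer wants from one fetch request. If less data is available, the broker waits until the amount is reached or fetch.max.wait.ms elapses | 1 byte |
fetch.max.bytes | Maximum amount of data the consumer fetches in one request | 50 MB |
fetch.max.wait.ms | Maximum time to wait when fetch.min.bytes is not yet satisfied | 500 ms |
metadata.max.age.ms | Interval after which metadata is forcibly refreshed, in milliseconds | 300000 (5 minutes) |
max.partition.fetch.bytes | Maximum amount of data returned to the consumer per partition, as opposed to fetch.max.bytes, which applies to the whole request | 1 MB |
send.buffer.bytes | Socket send buffer size | 128 KB; -1 uses the operating system setting |
receive.buffer.bytes | Socket receive buffer size | 64 KB; -1 uses the operating system setting |
client.id | Id of the consumer client | |
reconnect.backoff.ms | Wait before reconnecting to a broker after a failed connection | 50 ms |
reconnect.backoff.max.ms | Upper bound on the wait when the consumer client reconnects to a broker that has failed repeatedly | 1000 ms |
retry.backoff.ms | Wait before retrying a failed request | 100 ms |
metrics.sample.window.ms | Time window over which a metrics sample is computed | 30000 ms |
metrics.num.samples | Number of samples maintained to compute metrics | 2 |
metrics.recording.level | Highest recording level for metrics | INFO |
metric.reporters | List of classes used as metrics reporters | empty list |
check.crcs | Automatically check the CRC32 of the records consumed | |
key.deserializer | Deserializer class for keys | |
value.deserializer | Deserializer class for values | |
connections.max.idle.ms | Idle time after which a connection is closed | 540000 ms |
request.timeout.ms | Maximum time the client waits for a response. If none arrives in time the client resends the request, and throws an exception once the retries are exhausted | 30000 ms |
default.api.timeout.ms | Timeout for consumer API calls | 60000 ms |
interceptor.classes | Custom interceptors | |
exclude.internal.topics | Kafka has two internal topics, __consumer_offsets and __transaction_state. This parameter controls whether internal topics are exposed to consumers and defaults to true. When it is true, internal topics can only be subscribed to with subscribe(Collection), not with subscribe(Pattern); when it is false there is no such restriction | |
isolation.level | Transaction isolation level of the consumer. With "read_committed" the consumer ignores messages of uncommitted transactions, so it can only read up to the LSO (LastStableOffset). The default is "read_uncommitted", which lets it read up to the HW (High Watermark) | |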
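num.partitions | Number of partitions a newly created topic gets. If automatic topic creation is enabled (it is by default), an auto-created topic has this many partitions | 1 |
broker.id | Every broker needs an identifier, set with broker.id. It can be any integer, but it must be unique for each node in the cluster | 0 in the sample configuration |
port | When started with the sample configuration, Kafka listens on port 9092. Change port to use any other available port | |
zookeeper.connect | Address of the ZooKeeper ensemble that stores broker metadata. localhost:2181 means ZooKeeper runs locally on port 2181. The value is a comma-separated list of hostname:port/path entries | |
log.dirs | Kafka stores all messages on disk, and log.dirs names the directories that hold the log segments as a comma-separated list of local file system paths. When several paths are given, the broker follows a "least used" rule and keeps all log segments of one partition under the same path. Note that the broker adds a new partition to the path with the fewest partitions, not to the path with the most free disk space | |
num.recovery.threads.per.data.dir | The configured number applies to each single directory listed in log.dirs. For example, if num.recovery.threads.per.data.dir is 8 and log.dirs lists 3 paths, 24 threads are used in total | |
host.name | Host address of the broker. If set, the broker binds to this address only; if not set, it binds to all interfaces and publishes one of them to ZooKeeper. Usually left unset | |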
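As a rough illustration of how the producer settings listed above are usually supplied, the sketch below collects a few of them in a plain java.util.Properties object. The class name ProducerConfigSketch, the method producerProps and the broker address are placeholders invented for this example; the values mirror the defaults in the table, and the serializer class names assume string keys and values.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: gathers producer settings as strings.
class ProducerConfigSketch {

    static Properties producerProps(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("acks", "1");                                      // a string, not a number
        props.setProperty("batch.size", String.valueOf(16 * 1024));          // 16 KB per ProducerBatch
        props.setProperty("linger.ms", "0");                                 // send without extra delay
        props.setProperty("buffer.memory", String.valueOf(32L * 1024 * 1024)); // 32 MB buffer
        props.setProperty("max.block.ms", "60000");                          // send() blocks at most 60 s
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration; "localhost:9092" is a placeholder address.
        producerProps("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the Kafka client library on the classpath, a Properties object like this one is what would be handed to the KafkaProducer constructor.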
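To send a message, the user wraps the payload together with its topic and partition information in a ProducerRecord object and passes it to the send method of KafkaProducer.
When a KafkaProducer is constructed it creates the Sender, the RecordAccumulator and the Metadata, and starts the sender thread. The sender runs as a daemon thread, much like the JVM's GC threads.
The sender thread connects to the brokers through Java NIO and stores the cluster information it fetches in Metadata.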
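String topic: the topic the message belongs to.
Integer partition: the partition the message is sent to; it can be set explicitly. If it is not set but a key is given, the hash of the key modulo the number of partitions selects the partition. If neither is set, the partitions of the topic are used in round-robin order.
Headers headers: additional key/value attributes of the message, stored separately from the message body.
K key: the message key. If set, its hash modulo the number of partitions selects the partition.
V value: the message body.
Long timestamp: the message timestamp; the value assigned depends on the topic setting message.timestamp.type.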
public class KafkaProducer<K, V> implements Producer<K, V> {
    private final Logger log;
    private static final String JMX_PREFIX = "kafka.producer";
    public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
    private final String clientId;
    // Visible for testing
    final Metrics metrics;
    private final Partitioner partitioner; // partitioning strategy (can be user-defined)
    private final int maxRequestSize;
    private final long totalMemorySize;
    private final ProducerMetadata metadata; // cluster metadata
    private final RecordAccumulator accumulator; // in-memory buffer for pending records
    private final Sender sender; // Runnable that drains the buffer and sends the batches
    private final Thread ioThread;
    private final CompressionType compressionType;
    private final Sensor errors;
    private final Time time;
    private final Serializer<K> keySerializer; // key serializer
    private final Serializer<V> valueSerializer; // value serializer
    private final ProducerConfig producerConfig; // configuration
    private final long maxBlockTimeMs; // maximum time to block, e.g. while fetching cluster metadata
    private final ProducerInterceptors<K, V> interceptors; // interceptors
    private final ApiVersions apiVersions;
    private final TransactionManager transactionManager; // transaction support
    KafkaProducer(Map<String, Object> configs,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  ProducerMetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors<K, V> interceptors,
                  Time time) {
        ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer,
                valueSerializer));
        try {
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = time;
            String transactionalId = (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
            this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            LogContext logContext;
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");
            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            JmxReporter jmxReporter = new JmxReporter();
            MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                    config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
            } else {
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
            } else {
                this.valueSerializer = valueSerializer;
            }
            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
            List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
            if (interceptors != null)
                this.interceptors = interceptors;
            else
                this.interceptors = new ProducerInterceptors<>(interceptorList);
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                    valueSerializer, interceptorList, reporters);
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
            this.apiVersions = new ApiVersions();
            this.transactionManager = configureTransactionState(config, logContext);
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.compressionType,
                    lingerMs(config),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new ProducerMetadata(retryBackoffMs,
                        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                        config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                        logContext,
                        clusterResourceListeners,
                        Time.SYSTEM);
            }
            this.errors = this.metrics.sensor("errors");
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            // the third argument marks the I/O thread as a daemon thread
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(Duration.ofMillis(0), true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }
    // ... remaining methods omitted
}
Code @1: obtain the KafkaProducer; constructing it starts the sender thread.
Code @2: append the message to the in-memory buffer.
final Producer<K, V> producer = getTheProducer(); // @1
producer.send(producerRecord, buildCallback(producerRecord, producer, future)); // @2
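Code @1: the send interceptors run first. They are configured through interceptor.classes, a List<String> in which every element is the fully qualified class name of an interceptor.
Code @2: the doSend() method appends the message to the in-memory buffer.
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); // @1
return doSend(interceptedRecord, callback); // @2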
Code @1: obtain the partition list of the topic. If the partition information is not cached locally, it has to be fetched from a remote broker. The method also returns the time spent fetching the metadata, and that time is deducted from the maximum time the send is allowed to block.
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime;
try {
    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); // @1
} catch (KafkaException e) {
    if (metadata.isClosed())
        throw new KafkaException("Producer closed while send in progress", e);
    throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Code @1: read the cluster information from the cached metadata; if the topic's partitions are already known, return a ClusterAndWaitTime immediately.
Code @2: if the cache has no information for the topic, wait for it in a loop.
Code @3: request a metadata update and obtain the current metadata version.
Code @4: wake up the sender thread so that it fetches the cluster information.
Code @5: wait until the sender thread has updated the metadata. The update increases the version, which ends the wait; otherwise a timeout exception is thrown.
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    metadata.add(topic);
    Cluster cluster = metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // @1 Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);
    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    // @2 No metadata for the topic yet: keep requesting updates until it arrives
    // or maxWaitMs is exceeded, in which case a TimeoutException is thrown
    do {
        log.trace("Requesting metadata update for topic {}.", topic);
        metadata.add(topic);
        int version = metadata.requestUpdate(); // @3
        sender.wakeup(); // @4
        try {
            metadata.awaitUpdate(version, remainingWaitMs); // @5
        } catch (TimeoutException ex) {
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null);
    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(
                String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    }
    return new ClusterAndWaitTime(cluster, elapsed);
}
Code @1: once the sender has updated the metadata and increased the version, the loop ends and the caller can read the fresh cluster information.
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while (this.version <= lastVersion) { // @1
        if (remainingWaitMs != 0)
            wait(remainingWaitMs);
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
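The version-based handshake between the calling thread and the sender thread can be tried out in isolation. The following is a minimal sketch built only on wait/notifyAll; the class VersionedMetadata and its methods are hypothetical stand-ins written for this example, not Kafka classes, and for brevity a timeout or an interrupt is reported as an unchecked IllegalStateException rather than Kafka's TimeoutException.

```java
// Hypothetical stand-in for the metadata object shared by the caller and the sender thread.
class VersionedMetadata {
    private int version = 0;
    private String clusterInfo = "";

    // Caller side: remember the version that was current when the update was requested.
    synchronized int requestUpdate() {
        return version;
    }

    // Updater side (plays the role of the sender thread): publish new data and bump the version.
    synchronized void update(String newInfo) {
        clusterInfo = newInfo;
        version++;
        notifyAll(); // wake every thread blocked in awaitUpdate
    }

    // Block until the version moves past lastVersion or maxWaitMs has elapsed.
    synchronized String awaitUpdate(int lastVersion, long maxWaitMs) {
        long begin = System.currentTimeMillis();
        long remaining = maxWaitMs;
        while (version <= lastVersion) {
            try {
                if (remaining > 0)
                    wait(remaining); // wait(0) would block forever, hence the guard
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for metadata", e);
            }
            long elapsed = System.currentTimeMillis() - begin;
            if (version <= lastVersion && elapsed >= maxWaitMs)
                throw new IllegalStateException("Failed to update metadata after " + maxWaitMs + " ms.");
            remaining = maxWaitMs - elapsed;
        }
        return clusterInfo;
    }

    public static void main(String[] args) {
        VersionedMetadata metadata = new VersionedMetadata();
        int version = metadata.requestUpdate();
        // "broker-1:9092" is a placeholder for whatever the updater thread fetched.
        new Thread(() -> metadata.update("broker-1:9092")).start();
        System.out.println(metadata.awaitUpdate(version, 5000));
    }
}
```

Comparing versions instead of checking a boolean flag means a waiter cannot miss an update that happened between its request and its wait: the version it captured is already stale, so the loop never blocks.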
Note: although the serialize method also receives the topic and the Headers, only the key itself is serialized.
Code @1: serialize the key that was passed in.
byte[] serializedKey;
try {
    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); // @1
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in key.serializer", cce);
}
Code @1: serialize the value that was passed in.
byte[] serializedValue;
try {
    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); // @1
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in value.serializer", cce);
}
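Next, the partitioning algorithm decides which partition this message is sent to. The default implementation is DefaultPartitioner, and its routing rules are:
If a key is specified, the hash of the key modulo the number of partitions is used.
If no key is specified, the partitions are used in round-robin order.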
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
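Code @1: all partitions of the topic.
Code @2: if the key byte array is null, that is, the producer did not specify a key, use the round-robin strategy.
Code @3: get the nextValue counter for the topic.
Code @4: get the partitions that are currently available.
Code @5: if available partitions exist, take the counter value modulo the number of available partitions to pick the partition.
Code @6: if no partition is available, take the counter value modulo the total number of partitions to pick the partition.
Code @7: if a key is specified, take the murmur2 hash of the key bytes modulo the number of partitions. This is a plain hash-and-modulo scheme, not consistent hashing.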
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // @1 all partitions of the topic
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // @2 the key bytes are null, i.e. the producer did not specify a key: use round-robin
    if (keyBytes == null) {
        // @3 get the nextValue counter for the topic
        int nextValue = nextValue(topic);
        // @4 get the available partitions
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            // @5 available partitions exist: counter modulo the number of available partitions
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // @6 no partitions are available, give a non-available partition:
            // counter modulo the total number of partitions
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // @7 hash the keyBytes to choose a partition (murmur2 hash modulo the partition count);
        // the same key always maps to the same partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
public static int toPositive(int number) {
    // Why AND the hash with 0x7FFFFFFF? It clears the sign bit, so the result
    // is never negative: in a signed int the highest bit is 0 for non-negative
    // numbers and 1 for negative ones.
    return number & 0x7fffffff;
}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        // putIfAbsent: if the key is already present in the map, the existing value is returned;
        // if the key is absent, the new value is stored and null is returned
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
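The routing rule can be exercised without a Kafka cluster. The sketch below is a simplified illustration, not the DefaultPartitioner itself: PartitionSketch is a hypothetical class, java.util.Arrays.hashCode stands in for Kafka's murmur2 hash, the counter starts at 0 instead of a random value, and the distinction between available and unavailable partitions is left out.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified partition chooser for illustration only.
class PartitionSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Clear the sign bit so the value can be used safely with the % operator.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // no key: round-robin over the partitions
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // key present: hash modulo partition count (Arrays.hashCode replaces murmur2 here)
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionSketch sketch = new PartitionSketch();
        for (int i = 0; i < 4; i++)
            System.out.println("no key -> partition " + sketch.partition(null, 3));
        System.out.println("key 'order-1' -> partition " + sketch.partition("order-1".getBytes(), 3));
    }
}
```

The bit mask is used rather than Math.abs because Math.abs(Integer.MIN_VALUE) is still negative, which would produce a negative partition index.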
Map<String, List<PartitionInfo>> tmpAvailablePartitionsByTopic = new HashMap<>(tmpPartitionsByTopic.size());
for (Map.Entry<String, List<PartitionInfo>> entry : tmpPartitionsByTopic.entrySet()) {
    String topic = entry.getKey();
    List<PartitionInfo> partitionsForTopic = Collections.unmodifiableList(entry.getValue());
    tmpPartitionsByTopic.put(topic, partitionsForTopic);
    // Optimise for the common case where all partitions are available.
    // A partition is available when it has a leader. anyMatch returns true
    // as soon as at least one element satisfies the predicate.
    boolean foundUnavailablePartition = partitionsForTopic.stream().anyMatch(p -> p.leader() == null);
    List<PartitionInfo> availablePartitionsForTopic;
    if (foundUnavailablePartition) {
        availablePartitionsForTopic = new ArrayList<>(partitionsForTopic.size());
        for (PartitionInfo p : partitionsForTopic) {
            if (p.leader() != null)
                availablePartitionsForTopic.add(p);
        }
        availablePartitionsForTopic = Collections.unmodifiableList(availablePartitionsForTopic);
    } else {
        availablePartitionsForTopic = partitionsForTopic;
    }
    tmpAvailablePartitionsByTopic.put(topic, availablePartitionsForTopic);
}
this.partitionsByTopic = Collections.unmodifiableMap(tmpPartitionsByTopic);
this.availablePartitionsByTopic = Collections.unmodifiableMap(tmpAvailablePartitionsByTopic);
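To make the "available means it has a leader" rule concrete, here is a small self-contained sketch. AvailablePartitionsSketch and its nested Partition class are hypothetical and only mimic the two fields that matter; a null leader marks a partition as unavailable.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of filtering partitions by leader availability.
class AvailablePartitionsSketch {

    static final class Partition {
        final int id;
        final String leader; // null means the partition currently has no leader

        Partition(int id, String leader) {
            this.id = id;
            this.leader = leader;
        }
    }

    static List<Partition> available(List<Partition> all) {
        // Common case: every partition has a leader, so the list is reused without copying.
        boolean foundUnavailable = all.stream().anyMatch(p -> p.leader == null);
        if (!foundUnavailable)
            return Collections.unmodifiableList(all);
        // Otherwise keep only the partitions that have a leader.
        return Collections.unmodifiableList(
                all.stream().filter(p -> p.leader != null).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Partition> partitions = Arrays.asList(
                new Partition(0, "broker-1"), new Partition(1, null), new Partition(2, "broker-2"));
        available(partitions).forEach(p -> System.out.println("partition " + p.id + " led by " + p.leader));
    }
}
```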
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
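First the message timestamp is initialised, then the Callback passed in by the caller is wrapped so that it becomes part of the interceptor chain.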
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
Code @1: append the message to the buffer.
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); // @1
Code @1: check whether the current batch is full or a new batch was created.
Code @2: if so, wake up the Sender (the message-sending thread) so that it sends the buffered messages to the broker.
Code @3: finally return the future.
This is the classic Future pattern, which is why sending is asynchronous.
if (result.batchIsFull || result.newBatchCreated) { // @1
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup(); // @2
}
return result.future; // @3
/**
 * A callback interface that the user can implement to allow code to execute when the request is complete. This callback
 * will generally execute in the background I/O thread so it should be fast.
 */
public interface Callback {

    /**
     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
     * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
     * non-null.
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
     *        occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     *        Possible thrown exceptions include:
     *
     *        Non-Retriable exceptions (fatal, the message will never be sent):
     *
     *        InvalidTopicException
     *        OffsetMetadataTooLargeException
     *        RecordBatchTooLargeException
     *        RecordTooLargeException
     *        UnknownServerException
     *
     *        Retriable exceptions (transient, may be covered by increasing #.retries):
     *
     *        CorruptRecordException
     *        InvalidMetadataException
     *        NotEnoughReplicasAfterAppendException
     *        NotEnoughReplicasException
     *        OffsetOutOfRangeException
     *        TimeoutException
     *        UnknownTopicOrPartitionException
     */
    public void onCompletion(RecordMetadata metadata, Exception exception);
}
public interface ProducerListener<K, V> {

    default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
        onSuccess(producerRecord.topic(), producerRecord.partition(),
                producerRecord.key(), producerRecord.value(), recordMetadata);
    }

    default void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata) {
    }

    default void onError(ProducerRecord<K, V> producerRecord, Exception exception) {
        onError(producerRecord.topic(), producerRecord.partition(),
                producerRecord.key(), producerRecord.value(), exception);
    }

    default void onError(String topic, Integer partition, K key, V value, Exception exception) {
    }

    default boolean isInterestedInSuccess() {
        return false;
    }
}
public class LoggingProducerListener<K, V> implements ProducerListener<K, V> {

    public static final int DEFAULT_MAX_CONTENT_LOGGED = 100;

    private static final Log logger = LogFactory.getLog(LoggingProducerListener.class); // NOSONAR

    private boolean includeContents = true;

    private int maxContentLogged = DEFAULT_MAX_CONTENT_LOGGED;

    public void setIncludeContents(boolean includeContents) {
        this.includeContents = includeContents;
    }

    public void setMaxContentLogged(int maxContentLogged) {
        this.maxContentLogged = maxContentLogged;
    }

    @Override
    public void onError(String topic, Integer partition, K key, V value, Exception exception) {
        if (logger.isErrorEnabled()) {
            StringBuffer logOutput = new StringBuffer();
            logOutput.append("Exception thrown when sending a message");
            if (this.includeContents) {
                logOutput.append(" with key='")
                        .append(toDisplayString(ObjectUtils.nullSafeToString(key), this.maxContentLogged))
                        .append("'")
                        .append(" and payload='")
                        .append(toDisplayString(ObjectUtils.nullSafeToString(value), this.maxContentLogged))
                        .append("'");
            }
            logOutput.append(" to topic ").append(topic);
            if (partition != null) {
                logOutput.append(" and partition ").append(partition);
            }
            logOutput.append(":");
            logger.error(logOutput, exception);
        }
    }

    // Truncate long keys and payloads so that log lines stay short.
    private String toDisplayString(String original, int maxCharacters) {
        if (original.length() <= maxCharacters) {
            return original;
        }
        return original.substring(0, maxCharacters) + "...";
    }
}
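Code @0: the producerListener field of KafkaTemplate is initialised with a LoggingProducerListener, which implements the ProducerListener interface.
Code @1: when the Kafka template calls the send method, it creates the callback at the same time.
Code @2: the buildCallback method creates an implementation of the Callback interface and implements its onCompletion method.
Code @3: if there is no exception, the success method onSuccess of producerListener is called.
Code @4: if there is an exception, the failure method onError of producerListener is called.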
// @0
private volatile ProducerListener<K, V> producerListener = new LoggingProducerListener<K, V>();

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    final Producer<K, V> producer = getTheProducer();
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    // @1
    producer.send(producerRecord, buildCallback(producerRecord, producer, future));
    // some code omitted
    return future;
}

private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final Producer<K, V> producer,
        final SettableListenableFuture<SendResult<K, V>> future) {
    // Java 8 lambda syntax, equivalent to:
    // new Callback() {
    //     @Override
    //     public void onCompletion(metadata, exception) {
    //         ...
    //     }
    // }
    // @2
    return (metadata, exception) -> {
        try {
            // @3
            if (exception == null) {
                future.set(new SendResult<>(producerRecord, metadata));
                if (KafkaTemplate.this.producerListener != null) {
                    KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
                }
                if (KafkaTemplate.this.logger.isTraceEnabled()) {
                    KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + metadata);
                }
            }
            // @4
            else {
                future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                if (KafkaTemplate.this.producerListener != null) {
                    KafkaTemplate.this.producerListener.onError(producerRecord, exception);
                }
                if (KafkaTemplate.this.logger.isDebugEnabled()) {
                    KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exception);
                }
            }
        }
        finally {
            if (!KafkaTemplate.this.transactional) {
                closeProducer(producer, false);
            }
        }
    };
}
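The way one callback both completes a future and notifies a listener can be reproduced with the JDK alone. The sketch below is an assumption-laden miniature: CallbackBridgeSketch, its Callback and Listener interfaces and RecordingListener are hypothetical types invented here, records and metadata are plain strings, and CompletableFuture takes the place of Spring's SettableListenableFuture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical miniature of the callback that bridges send results to a future and a listener.
class CallbackBridgeSketch {

    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    interface Listener {
        void onSuccess(String record, String metadata);
        void onError(String record, Exception exception);
    }

    // Listener that just records what it was told, in the spirit of a logging listener.
    static final class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onSuccess(String record, String metadata) {
            events.add("success:" + record);
        }

        @Override
        public void onError(String record, Exception exception) {
            events.add("error:" + record);
        }
    }

    static Callback buildCallback(String record, Listener listener, CompletableFuture<String> future) {
        return (metadata, exception) -> {
            if (exception == null) {
                future.complete(metadata);               // hand the result to whoever holds the future
                if (listener != null)
                    listener.onSuccess(record, metadata);
            } else {
                future.completeExceptionally(exception); // a later get() or join() rethrows the failure
                if (listener != null)
                    listener.onError(record, exception);
            }
        };
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        CompletableFuture<String> future = new CompletableFuture<>();
        // "demo-topic-0@42" is a made-up metadata string for the demonstration.
        buildCallback("record-1", listener, future).onCompletion("demo-topic-0@42", null);
        System.out.println(future.join() + " " + listener.events);
    }
}
```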
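Code @1: the container class that holds the producer interceptors.
Code @2: the producer interceptor interface. It extends Configurable, and custom interceptor classes can be registered through the configuration.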
// @1
public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;

    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }

    public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = new TopicPartition(record.topic(),
                                record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
                    }
                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                            RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception);
                }
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }

    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close producer interceptor ", e);
            }
        }
    }
}

// @2
public interface ProducerInterceptor<K, V> extends Configurable {
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    public void close();
}
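Code @1: the callback is wrapped so that it becomes part of the interceptor chain.
Code @2: the collection of Kafka interceptors.
Code @3: after KafkaProducer has completed a send request, the onCompletion method of the Callback interface is executed asynchronously.
Code @4: the onAcknowledgement method of the interceptor container is called, which runs the onAcknowledgement method of every producer interceptor.
Code @5: immediately afterwards the onCompletion method created by KafkaTemplate is called, which logs the success or failure of the send.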
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // some code omitted
    // @1
    Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    // some code omitted
}

private static class InterceptorCallback<K, V> implements Callback {
    private final Callback userCallback;
    // @2
    private final ProducerInterceptors<K, V> interceptors;
    private final TopicPartition tp;

    private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
        this.userCallback = userCallback;
        this.interceptors = interceptors;
        this.tp = tp;
    }

    // @3
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1);
        // @4
        this.interceptors.onAcknowledgement(metadata, exception);
        if (this.userCallback != null)
            // @5
            this.userCallback.onCompletion(metadata, exception);
    }
}

public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;
    // remaining members as shown earlier
}
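The two properties of the interceptor chain that matter most, namely that each interceptor sees the output of the previous one and that a failing interceptor is skipped rather than aborting the send, can be shown with a few lines of plain Java. Everything in this sketch is hypothetical: InterceptorChainSketch, its Interceptor interface and the two factory methods are made up for the example, and records are plain strings.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature of an interceptor chain that tolerates failing interceptors.
class InterceptorChainSketch {

    interface Interceptor {
        String onSend(String record);
    }

    private final List<Interceptor> interceptors;

    InterceptorChainSketch(List<Interceptor> interceptors) {
        this.interceptors = interceptors;
    }

    // Each interceptor receives the record produced by the previous one.
    String onSend(String record) {
        String current = record;
        for (Interceptor interceptor : interceptors) {
            try {
                current = interceptor.onSend(current);
            } catch (Exception e) {
                // do not propagate: report and continue with the remaining interceptors
                System.err.println("interceptor failed: " + e.getMessage());
            }
        }
        return current;
    }

    // Interceptor that appends a suffix to the record.
    static Interceptor appending(String suffix) {
        return record -> record + suffix;
    }

    // Interceptor that always fails, to show that the chain keeps going.
    static Interceptor failing() {
        return record -> {
            throw new IllegalStateException("boom");
        };
    }

    public static void main(String[] args) {
        InterceptorChainSketch chain = new InterceptorChainSketch(
                Arrays.asList(appending("-a"), failing(), appending("-b")));
        System.out.println(chain.onSend("record"));
    }
}
```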
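public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    /**
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}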
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {}
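The accumulator first looks up the double-ended queue for the topic and partition and creates it if it does not exist yet, then calls tryAppend to append the message to the buffer. The producer keeps one buffer for every partition of every topic. A message is first appended to that buffer and the send API returns immediately; the separate Sender thread later sends the buffered messages to the broker. The buffer is implemented with an ArrayDeque. If the append succeeds, the result is returned.
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
    if (closed)
        throw new KafkaException("Producer closed while send in progress");
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
    if (appendResult != null)
        return appendResult;
}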
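Code @1: first check whether the ProducerBatch still has room for the current message. If the remaining memory is insufficient, null is returned directly, and the caller then tries to create a new ProducerBatch.
Code @2: MemoryRecordsBuilder writes the message into memory in the Kafka message format, that is, into the ByteBuffer that was allocated when the ProducerBatch was created. The message formats of the individual Kafka versions are not covered here; a separate article will describe them.
Code @3: update the maxRecordSize and lastAppendTime fields of the ProducerBatch, which hold the largest message size in the batch and the time of the last append.
Code @4: build the FutureRecordMetadata object. This is a typical use of the Future pattern; the object mainly holds the produceFuture of the batch, the index of the message within the batch, the key length, the value length and the current system time.
Code @5: add the callback and the receipt for this message (the Future) to the thunks of the batch. This collection stores the send receipts of all messages in one batch.
At this point the send method of KafkaProducer has finished, and what the caller gets back is a FutureRecordMetadata object.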
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // @1
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        // @2
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        // @3
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        // @4
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                timestamp, checksum,
                key == null ? -1 : key.length,
                value == null ? -1 : value.length,
                Time.SYSTEM);
        // @5
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
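The in-memory structure is ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches.
If a project needs synchronous sending, it only has to call get() on the result returned by send. If the message has not reached the broker yet, get() blocks; once the broker returns the send result, the call is woken up and returns that result. For asynchronous sending, use send(ProducerRecord<K, V> record, Callback callback) and simply do not call get(). The Callback is invoked after the response from the broker has been received, and interceptors are supported as well.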
public void sendMessage(String topic, String dataBuyPoint) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate
.send(topic, dataBuyPoint);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
public void onSuccess(SendResult<String, String> result) {
public void onFailure(Throwable ex) {
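The difference between the two styles can be tried without a broker. The sketch below uses java.util.concurrent.CompletableFuture as a stand-in for the Future returned by send; fakeSend, sendSync, and sendAsync are hypothetical names, and the delayed completion only simulates a broker acknowledgement arriving later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class SendStylesSketch {
    // Hypothetical stand-in for send(): completes on another thread after a short delay,
    // the way a broker acknowledgement would arrive some time after the call returns.
    static CompletableFuture<String> fakeSend(String value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "acked:" + value;
        });
    }

    // Synchronous style: block until the result is available.
    // join() blocks like get() but reports failures as unchecked exceptions.
    static String sendSync(String value) {
        return fakeSend(value).join();
    }

    // Asynchronous style: register a callback and return immediately without blocking.
    // The returned future completes after the callback has run.
    static CompletableFuture<Void> sendAsync(String value, AtomicReference<String> sink) {
        return fakeSend(value).thenAccept(result -> sink.set("callback:" + result));
    }

    public static void main(String[] args) {
        System.out.println(sendSync("a"));

        AtomicReference<String> sink = new AtomicReference<>();
        CompletableFuture<Void> done = sendAsync("b", sink);
        // The caller is free to do other work here; we wait only so the demo can print.
        done.join();
        System.out.println(sink.get());
    }
}
```

In the asynchronous style the calling thread never waits for the acknowledgement; the result reaches the application only through the callback.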
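If the first step fails to append, no usable ProducerBatch exists and a new one must be created. The accumulator first requests memory from the BufferPool, sized at the larger of batch.size and the estimated message size, in preparation for the new ProducerBatch. If the BufferPool has no free memory, the call waits for at most maxTimeToBlock and throws an exception if the memory is still unavailable when that time runs out.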
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
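The "wait up to maxTimeToBlock, then fail" behaviour can be sketched with a lock and a condition. TinyPool is a hypothetical, much reduced stand-in for the BufferPool: it tracks only a byte count and does not hand out or reuse ByteBuffers, and the exception type is chosen for the sketch only.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, reduced stand-in for a bounded memory pool; it only counts bytes.
class TinyPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition released = lock.newCondition();
    private long availableBytes;

    TinyPool(long totalBytes) {
        this.availableBytes = totalBytes;
    }

    // Reserves `size` bytes, waiting up to maxTimeToBlockMs for memory to be released.
    // Throws IllegalStateException if the memory is still unavailable when the wait ends.
    void allocate(int size, long maxTimeToBlockMs) {
        long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
        lock.lock();
        try {
            while (availableBytes < size) {
                if (remainingNs <= 0) {
                    throw new IllegalStateException(
                        "Failed to allocate " + size + " bytes within " + maxTimeToBlockMs + " ms");
                }
                try {
                    // awaitNanos returns the time still left, so spurious wakeups are handled.
                    remainingNs = released.awaitNanos(remainingNs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for memory", e);
                }
            }
            availableBytes -= size;
        } finally {
            lock.unlock();
        }
    }

    // Returns `size` bytes to the pool and wakes any waiting allocators.
    void deallocate(int size) {
        lock.lock();
        try {
            availableBytes += size;
            released.signalAll();
        } finally {
            lock.unlock();
        }
    }

    long availableBytes() {
        lock.lock();
        try {
            return availableBytes;
        } finally {
            lock.unlock();
        }
    }
}
```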
Create a new ProducerBatch, write the message into it, and return the append result. The key points are as follows:
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
// part of the code is omitted
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
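Code @1: Iterate over the producer buffer, ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches, and pick out the batches that are ready to send.
Code @2: Look up the partition's (TopicPartition) leader in the producer's metadata cache. If no leader is found, the topic is added to unknownLeaderTopics.
Code @3: A metadata update request is sent later to ask the broker for the partition's routing information.
Code @4: If the leader is not yet in readyNodes, check whether the partition's first batch meets the send conditions. isMuted relates to ordered messages; this article sets it aside for now and covers it later in the section on ordered messages.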
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) { // @1
TopicPartition part = entry.getKey();
Deque<ProducerBatch> deque = entry.getValue();
Node leader = cluster.leaderFor(part); // @2
synchronized (deque) {
if (leader == null && !deque.isEmpty()) { // @3
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) { // @4
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) { // @5
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
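The boolean logic inside ready() is easier to follow in isolation. The sketch below lifts the same expressions into two pure functions; the class and method names are hypothetical, while the variable names follow the snippet above.

```java
class ReadyCheckSketch {
    // A batch in retry backoff is never sendable. Otherwise it is sendable once it is full,
    // has waited long enough, or is forced out by memory pressure, close, or flush.
    static boolean sendable(int attempts, long waitedTimeMs, long retryBackoffMs, long lingerMs,
                            boolean full, boolean exhausted, boolean closed, boolean flushing) {
        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        return (full || expired || exhausted || closed || flushing) && !backingOff;
    }

    // How long until a batch that is not yet sendable should be checked again.
    static long timeLeftMs(int attempts, long waitedTimeMs, long retryBackoffMs, long lingerMs) {
        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        return Math.max(timeToWaitMs - waitedTimeMs, 0);
    }
}
```

With linger.ms at 0, a fresh batch counts as expired immediately, which is why it is sendable as soon as the Sender looks at it.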
Code @2: Iterate over all nodes, call drainBatchesForOneNode to extract the data for each one, and assemble the results into Map<Integer, List<ProducerBatch>> batches.
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { // @1
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now); // @2
batches.put(node.id(), ready);
return batches;
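Code @1: Using the brokerId, get all partitions whose leader is on that broker.
Code @2: Initialize start. A word on start and drainIndex first: drainIndex is the position at which draining resumes, and start records where this round began, so the loop ends after one full pass over the partitions.
Code @4: Using the topic and partition number, get the accumulated deque from the producer's send buffer.
Code @7: If the total size already drained plus the new batch exceeds maxRequestSize, and at least one batch has already been drained, stop draining.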
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); // @1
List<ProducerBatch> ready = new ArrayList<>();
int start = drainIndex = drainIndex % parts.size(); // @2
do { // @3
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
this.drainIndex = (this.drainIndex + 1) % parts.size();
if (isMuted(tp, now))
Deque<ProducerBatch> deque = getDeque(tp); // @4
if (deque == null)
synchronized (deque) {
// invariant: !isMuted(tp,now) && deque != null
ProducerBatch first = deque.peekFirst(); // @5
if (first == null)
// first != null
boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; // @6
// Only drain the batch if it is not during backoff period.
if (backoff)
if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // @7
} else {
if (shouldStopDrainBatchesForPartition(first, tp))
// transaction-related code is omitted here; it will be covered in detail later.
batch.close(); // @8
size += batch.records().sizeInBytes();
} while (start != drainIndex);
return ready;
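The role of drainIndex is easiest to see in a reduced model. In the sketch below each partition is a Deque<Integer> and each batch is represented only by its size in bytes; DrainSketch is a hypothetical class that keeps the round-robin and size-limit logic and leaves out muting, backoff, and transactions.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DrainSketch {
    // Persists across calls so that each call resumes where the previous one stopped.
    private int drainIndex = 0;

    // Takes at most one batch per partition, starting at drainIndex, until maxSize is reached.
    List<Integer> drain(List<Deque<Integer>> partitions, int maxSize) {
        List<Integer> ready = new ArrayList<>();
        if (partitions.isEmpty()) {
            return ready;
        }
        int size = 0;
        int start = drainIndex = drainIndex % partitions.size();
        do {
            Deque<Integer> deque = partitions.get(drainIndex);
            drainIndex = (drainIndex + 1) % partitions.size();
            Integer first = deque.peekFirst();
            if (first == null) {
                continue; // empty partition: move on (the loop condition is still evaluated)
            }
            // Stop once the request would overflow, but always let one batch through.
            if (size + first > maxSize && !ready.isEmpty()) {
                break;
            }
            ready.add(deque.pollFirst());
            size += first;
        } while (start != drainIndex);
        return ready;
    }
}
```

Because drainIndex survives between calls, a partition near the end of the list is not permanently starved when earlier partitions keep filling the request.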
ArrayDeque is not thread-safe, which is why the accumulator code above wraps each access in synchronized (deque).
transient Object[] elements; // non-private to simplify nested class access
transient int head;
transient int tail;
private static final int MIN_INITIAL_CAPACITY = 8;
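addFirst(E e) inserts an element at the front of the deque
addLast(E e) inserts an element at the end of the deque
offerFirst(E e) inserts an element at the front and returns whether the insert succeeded
offerLast(E e) inserts an element at the end and returns whether the insert succeeded
removeFirstOccurrence(Object o) removes the first occurrence of the given element
removeLastOccurrence(Object o) removes the last occurrence of the given element
getFirst() returns the first element; throws an exception if the deque is empty
getLast() returns the last element; throws an exception if the deque is empty
add(E e) appends an element at the tail of the queue
offer(E e) appends an element at the tail of the queue and returns whether it succeeded
remove() removes and returns the first element of the queue; throws an exception if the queue is empty (it delegates to removeFirst())
poll() removes and returns the first element of the queue; returns null if the queue is empty (it delegates to pollFirst())
element() returns the first element without removing it; throws an exception if the queue is empty
peek() returns the first element without removing it; returns null if the queue is empty
push(E e) pushes an element onto the top of the stack
pop() removes and returns the element at the top of the stack; throws an exception if the stack is empty
size() returns the number of elements in the queue
isEmpty() checks whether the queue is empty
iterator() returns an iterator that runs from front to back
descendingIterator() returns an iterator that runs from back to front
contains(Object o) checks whether the queue contains the given element
toArray() converts the queue to an array
clear() empties the queue
clone() creates a copy of the queue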
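A short run through the most commonly used calls shows the difference between the methods that return null on an empty deque and those that throw. The class name ArrayDequeSketch and the element names are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;

class ArrayDequeSketch {
    // Runs a few deque operations and records what each one returned.
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Deque<String> deque = new ArrayDeque<>();
        deque.addLast("b1");              // queue-style append at the tail
        deque.offerLast("b2");
        deque.addFirst("b0");             // put an element back at the head

        out.add(deque.peekFirst());       // looks at the head without removing it
        out.add(deque.pollFirst());       // removes the head
        out.add(deque.peekLast());        // looks at the tail
        out.add(String.valueOf(deque.size()));

        deque.clear();
        String head = deque.peek();       // peek/poll signal emptiness with null
        out.add(head == null ? "null" : head);
        try {
            deque.getFirst();             // getFirst/remove/element/pop throw instead
        } catch (NoSuchElementException e) {
            out.add("empty");
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```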
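doSend() mainly does two things:
Call chain: Sender -> NetworkClient -> Selector (Kafka's wrapper) -> Selector (Java NIO)
sender.wakeup() wakes the Sender thread from its block in select(). select() polls the Channels registered on the multiplexer and blocks in that call unless one of the following conditions is met:
Otherwise select() keeps polling and blocks there until a condition is met.
KafkaProducer's doSend() calls sender.wakeup() for this reason: once a new RecordBatch has been created, the older RecordBatch can be sent (or a Metadata request is waiting to go out). If the thread is blocked in select(), it is woken up, the Sender re-enters run(), and the older RecordBatch (or the Metadata request) is picked up there and sent promptly.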
// org.apache.kafka.clients.producer.internals.Sender
* Wake up the selector associated with this send thread
public void wakeup() {
// org.apache.kafka.clients.NetworkClient
* Interrupt the client if it is blocked waiting on I/O.
public void wakeup() {
// org.apache.kafka.common.network.Selector
* Interrupt the nioSelector if it is blocked waiting to do I/O.
//note: wake the selector if it is blocked
public void wakeup() {
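The effect of wakeup() can be observed directly on a java.nio.channels.Selector, which is what the chain of wakeup() calls above ends in. The sketch below blocks a thread in select() with no channels registered and then wakes it from another thread; WakeupSketch and blockThenWake are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;

class WakeupSketch {
    // Blocks a thread in select(), wakes it from the calling thread, and returns
    // how long the blocked thread actually waited, in milliseconds.
    static long blockThenWake(long selectTimeoutMs, long wakeAfterMs) {
        try (Selector selector = Selector.open()) {
            long[] waitedMs = new long[1];
            Thread io = new Thread(() -> {
                long start = System.nanoTime();
                try {
                    selector.select(selectTimeoutMs);   // blocks: nothing is registered
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                waitedMs[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            });
            io.start();
            Thread.sleep(wakeAfterMs);
            selector.wakeup();                          // the blocked select() returns at once
            io.join();                                  // join makes waitedMs visible here
            return waitedMs[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // select() is given 10 s but should return shortly after the 100 ms wakeup.
        System.out.println("waited ms: " + blockThenWake(10_000, 100));
    }
}
```

If wakeup() happens to be called before the other thread reaches select(), the next select() returns immediately, so the wakeup is never lost.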
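Code @1: The main processing method while the Sender thread is running; it sends the messages in the buffer to the broker.
Code @2: When the Sender thread is shut down and the shutdown is not forced, keep calling runOnce while the buffer still holds unsent messages or requests are still in flight, and exit only after they have been sent.
Code @3: When the Sender thread is shut down forcibly, fail the messages that have not completed.
Code @4: Close the Kafka client, that is, the network communication object.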
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
// @1
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
// @2
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
try {
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
// @3
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
log.debug("Aborting incomplete batches due to forced shutdown");
try {
// @4
} catch (Exception e) {
log.error("Failed to close network client", e);
log.debug("Shutdown of Kafka producer I/O thread has completed.");
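Code @1 accumulator.ready(): iterate over all tps (topic-partitions). If a tp's RecordBatch can be sent (its size has reached batch.size or its wait has reached linger.ms), select that tp's leader. The result is a Set of nodes to which a Produce request can be sent (the actual return value is a ReadyCheckResult instance, of which the Set is the main member);
Code @2 if some tp has no leader, call requestUpdate() to update the metadata. In practice this is decided during the step-one traversal: a tp without a leader has its topic added to a set named unknownLeaderTopics, and metadata is then requested for it;
Code @3 accumulator.drain(): iterate over all tps on each leader selected in step one. If a tp's RecordBatch is not in its backoff period (it has never been retried, or it has been retried and the interval has reached retryBackoffMs), and adding it keeps the total within maxSize (the maximum size of one request, 1 MB by default), add the RecordBatch to the list. The return type is Map<Integer, List<RecordBatch>>: the key is leader.id and the value is the list of RecordBatches to send;
Code @4 sendProduceRequests(): send the Produce requests. As the figure shows, this method calls NetworkClient.send() to send the clientRequest;
Code @5 NetworkClient.poll(): all socket I/O happens in this method. It in turn uses Selector to do the work, and Selector wraps the Java NIO interfaces underneath.
Note on code @3: when a Produce request is going to a leader anyway, a tp on that leader whose RecordBatch has not met the conditions (neither batch.size nor linger.ms reached) may still be sent along with it. The benefit is fewer requests and therefore better send efficiency.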
//note: the work the Sender thread performs on each loop iteration
void run(long now) {
Cluster cluster = metadata.fetch();
//@1 get the nodes whose RecordBatches are ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
//@2 if any topic-partition has an unknown leader, force a metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
//note: if there is no connection to a node yet (one is initiated when possible), remove that node for now
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {//note: for a broker with no connection yet, this call initiates one
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
//@3 return the sendable RecordBatches for each node as batches (keyed by node.id; the batches for one node go out in a single request)
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
//note: allow only one RecordBatch per tp in flight at a time, which preserves ordering
//note: this applies when max.in.flight.requests.per.connection is set to 1
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
//note: remove RecordBatches whose send timed out because metadata was unavailable
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
//@4 send the RecordBatches
sendProduceRequests(batches, now);
//note: if any partition can send data right away, pollTimeout is 0.
//@5 perform the actual socket reads and writes
this.client.poll(pollTimeout, now);
Step 1: Based on the current time and the data in the buffer queues, determine which partitions of which topics have met the send conditions.
// @1
Cluster cluster = metadata.fetch();
// @2
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
Step 2: If routing information cannot be found for some of the pending messages, first fetch it (the partition's leader node) from the broker.
// @1
if (!result.unknownLeaderTopics.isEmpty()) {
// @2
for (String topic : result.unknownLeaderTopics)
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
// @3
2. client.pollDelayMs estimates how long the node will remain in the not-ready state; the criteria are as follows:
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
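Step 4: For the nodes that are ready, drain the pending batches (ProducerBatch) from the buffer, organized as nodeId: List<ProducerBatch>. Note that once a ProducerBatch has been drained it can accept no more messages, even if it still has free space.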
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
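Step 5: Add the drained ProducerBatches to the inFlightBatches structure, declared as Map<TopicPartition, List<ProducerBatch>> inFlightBatches. It is keyed by topic partition and holds the batches that have been drained, that is, the batches awaiting send. From it one can read, partition by partition, how much the Sender thread has backed up. max.in.flight.requests.per.connection caps the number of unacknowledged requests on each broker connection; once the cap is reached, further sends on that connection are held back.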
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
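Step 6: Find the expired batches (ProducerBatch) in inFlightBatches and batches. A batch counts as expired when the difference between the current system time and its creation time exceeds the delivery timeout, which is 120 s by default and configurable through delivery.timeout.ms.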
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeoutMs, now);
Step 7: Handle the timed-out batches by marking them as failed. This sets the ProduceRequestResult inside the FutureRecordMetadata returned by KafkaProducer#send, so that a call to its get method no longer blocks.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
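Step 10: Build the send requests per brokerId: the ProducerBatches bound for one broker are packed into a single request, and each broker connection can send only one request at a time. Note that this step only builds the request and hands it to NetworkClient#send, which stores the data on the NetworkClient as pending; no actual network call has been triggered yet.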
sendProduceRequests(batches, now);
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
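Code @1: Check whether the metadata needs updating and, if so, request the update (this is where metadata update requests are issued).
Code @2: Call Selector.poll() to perform the socket I/O and trigger the actual network communication. This method eventually calls the NIO Selector#select() method and handles the channels' read-ready and write-ready events; once a channel is write-ready, the pending messages are sent over it to the remote broker.
Code @4: Complete the responses one by one. The response result is set on the Future returned by KafkaProducer#send, which wakes the sending client and finishes one full send cycle.
The handleCompletedSends method handles requests that need no response; it can run as soon as the send succeeds.
What the two methods have in common: both remove the corresponding request from the inFlightRequests queue and add a response for it to the response list (a request that expects no reply gets a default placeholder response, not a real one).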
public List<ClientResponse> poll(long timeout, long now) {
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
return responses;
long metadataTimeout = metadataUpdater.maybeUpdate(now); // @1
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // @2
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>(); // @3
//note: handle completed sends (requests that expect no response, e.g. a produce request with acks=0)
handleCompletedSends(responses, updatedNow);
//note: handle Receives that arrived from the server (e.g. the response to a Metadata request)
handleCompletedReceives(responses, updatedNow);
//note: handle connections that were lost and request fresh metadata
handleDisconnections(responses, updatedNow);
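//note: handle newly established connections (not yet able to send requests, e.g. not yet authenticated)
//for newly established connections, send an ApiVersionRequest (by default, one is sent to the broker the first time a connection is established);
//handle timed-out requests: close the connection and refresh the Metadata.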
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses); // @4
return responses;
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
boolean madeReadProgressLastCall = madeReadProgressLastPoll;
//note: Step 1: clear the records left over from the previous poll
boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
timeout = 0;
if (!memoryPool.isOutOfMemory() && outOfMemory) {
//we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
log.trace("Broker no longer low on memory - unmuting incoming sockets");
for (KafkaChannel channel : channels.values()) {
if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
outOfMemory = false;
/* check ready keys */
//note: Step 2: get the number of ready events
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);//poll the selector
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
//note: Step 3: perform the I/O
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are included in the ready count for the next select
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
} else {
madeReadProgressLastPoll = true; //no work is also "progress"
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
//note: called once after every poll
//TODO: the connection is closed, but the client-side cache still exists
// Add to completedReceives after closing expired connections to avoid removing
// channels with completed receives until all staged receives are completed.
//note: Step 4: move the stagedReceives that were collected into completedReceives
clear() is the first step of every poll() call. Its job is to clean up some of the cached state left by the previous poll.
//note: the following caches are cleared before each poll
private void clear() {
// Remove closed channels after all their staged receives have been processed or if a send was requested
for (Iterator<Map.Entry<String, KafkaChannel>> it = closingChannels.entrySet().iterator(); it.hasNext(); ) {
KafkaChannel channel = it.next().getValue();
Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
boolean sendFailed = failedSends.remove(channel.id());
if (deque == null || deque.isEmpty() || sendFailed) {
doClose(channel, true);
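Selector's select() method is implemented on top of the native Java NIO interface; nioSelector here is an instance of java.nio.channels.Selector. In the worst case the method blocks for ms milliseconds, but it returns immediately as soon as any one Channel has a ready event during the poll.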
private int select(long ms) throws IOException {
if (ms < 0L)
throw new IllegalArgumentException("timeout should be >= 0");
if (ms == 0L)
return this.nioSelector.selectNow();
return this.nioSelector.select(ms);
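This is the core of the socket I/O: both sending a Send and receiving a Receive happen here. Within poll() this method is called twice (the version of poll() shown above adds a further call for keys with buffered data):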
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
//note: handle channels whose TCP connection has just been established
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {//note: the connection is established
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
} else
/* if channel is not ready finish prepare */
//note: handle channels that are connected but not yet ready: run the transport-layer handshake and authentication
if (channel.isConnected() && !channel.ready())
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)//note: a Receive is added to the collection only once it has been read completely
addToStagedReceives(channel, networkReceive);//note: stage the data that was read
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);//note: add the completed send to the list
this.sensors.recordBytesSent(channel.id(), send.size());
/* cancel any defunct sockets */
//note: close connections that have dropped
if (!key.isValid())
close(channel, true);
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel, true);
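This method processes the Receives that have arrived. Because the Selector class is used on both the client and the server, the two cases are described separately:
The method's purpose only becomes clear alongside the server-side calls: it unifies the API used by client and server so that both can use the Selector class.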
* checks if there are any staged receives and adds to completedReceives
private void addToCompletedReceives() {
if (!this.stagedReceives.isEmpty()) {//note: process stagedReceives
Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
if (!channel.isMute()) {
Deque<NetworkReceive> deque = entry.getValue();
addToCompletedReceives(channel, deque);
if (deque.isEmpty())
private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
NetworkReceive networkReceive = stagedDeque.poll();
this.completedReceives.add(networkReceive); //note: add to completedReceives
this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
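(1) Periodic update: the metadata is refreshed at a fixed interval, tracked through the lastRefreshMs and lastSuccessfulRefreshMs fields of Metadata.
metadata.max.age.ms // default 300000, i.e. once every 5 minutes
(2) Invalidation check with forced update: once the metadata is found to be stale, metadata.requestUpdate() is called to force an update.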
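How the two mechanisms combine into a single "time until the next update" value can be sketched as follows. MetadataClockSketch is a hypothetical, reduced model: the field names follow the ones mentioned above, refreshBackoffMs stands for the minimum gap between refresh attempts, and the arithmetic is an assumption about how the periodic and forced paths interact rather than a copy of Kafka's Metadata class.

```java
// Hypothetical, reduced model of the metadata refresh clock; not Kafka's Metadata class.
class MetadataClockSketch {
    private final long metadataExpireMs;   // corresponds to metadata.max.age.ms
    private final long refreshBackoffMs;   // minimum gap between two refresh attempts
    private long lastRefreshMs;            // last attempt, successful or not
    private long lastSuccessfulRefreshMs;  // last successful refresh
    private boolean needUpdate;            // set by a forced update request

    MetadataClockSketch(long metadataExpireMs, long refreshBackoffMs) {
        this.metadataExpireMs = metadataExpireMs;
        this.refreshBackoffMs = refreshBackoffMs;
    }

    // Forced update: the metadata is treated as expired right away.
    void requestUpdate() {
        needUpdate = true;
    }

    // 0 means "update now". A forced update still has to respect the backoff.
    long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate
            ? 0
            : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    // Successful refresh: both timestamps move and the forced flag is cleared.
    void update(long nowMs) {
        needUpdate = false;
        lastRefreshMs = nowMs;
        lastSuccessfulRefreshMs = nowMs;
    }

    // Failed refresh: only the attempt timestamp moves, so the backoff applies again.
    void failedUpdate(long nowMs) {
        lastRefreshMs = nowMs;
    }
}
```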
public void run() {
// main loop, runs until close is called
while (running) {
try {
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
public void run(long now) {
Cluster cluster = metadata.fetch();
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); //iterate over all queued messages and find the Nodes that are ready
if (result.unknownLeadersExist) //if some partition's leader is unknown, request a metadata update
for (ClientRequest request : requests)
client.send(request, now);
this.client.poll(pollTimeout, now);
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now); //key point: every poll checks whether the metadata needs updating
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow); //the metadata update is handled while processing the received responses
handleDisconnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
return responses;
public long maybeUpdate(long now) {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
// if there is no node available to connect, back off refreshing metadata
long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
if (metadataTimeout == 0) {
// highly dependent on the behavior of leastLoadedNode.
Node node = leastLoadedNode(now); //find the least-loaded Node
maybeUpdate(now, node); //send the Metadata update request to this Node
return metadataTimeout;
private void maybeUpdate(long now, Node node) {
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
// mark the timestamp for no node available to connect
this.lastNoNodeAvailableMs = now;
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId)) {
Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
this.metadataFetchInProgress = true;
ClientRequest metadataRequest = request(now, nodeConnectionId, topics); //key point: build the Request that updates the Metadata
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
doSend(metadataRequest, now); //this only sends asynchronously; the response is handled in handleCompletedReceives
} else if (connectionStates.canConnect(nodeConnectionId, now)) {
log.debug("Initialize connection to node {} for sending metadata request", node.id());
initiateConnect(node, now);
} else { // connected, but can't send more OR connecting
this.lastNoNodeAvailableMs = now;
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
ResponseHeader header = ResponseHeader.parse(receive.payload());
// Always expect the response version id to be the same as the request version id
short apiKey = req.request().header().apiKey();
short apiVer = req.request().header().apiVersion();
Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
correlate(req.request().header(), header);
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
short apiKey = req.request().header().apiKey();
if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
handleResponse(req.request().header(), body, now);
return true;
return false;
private void handleResponse(RequestHeader header, Struct body, long now) {
this.metadataFetchInProgress = false;
MetadataResponse response = new MetadataResponse(body);
Cluster cluster = response.cluster(); //get a new cluster object from the response
if (response.errors().size() > 0) {
log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now); //update the metadata: the new cluster replaces the old one
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
this.metadata.failedUpdate(now); //the metadata update failed; run the failure handling
//on success, increment version and update the other fields
public synchronized void update(Cluster cluster, long now) {
this.needUpdate = false;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
for (Listener listener: listeners)
this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster; //the new cluster replaces the old one
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
public synchronized void failedUpdate(long now) {
this.lastRefreshMs = now;
public Node leastLoadedNode(long now) {
List<Node> nodes = this.metadataUpdater.fetchNodes();
if (nodes.isEmpty())
throw new IllegalStateException("There are no nodes in the Kafka cluster");
int inflight = Integer.MAX_VALUE;
Node foundConnecting = null;
Node foundCanConnect = null;
Node foundReady = null;
int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
if (canSendRequest(node.idString(), now)) {
int currInflight = this.inFlightRequests.count(node.idString());
if (currInflight == 0) {
// if we find an established connection with no in-flight requests we can stop right away
log.trace("Found least loaded node {} connected with no in-flight requests", node);
return node;
} else if (currInflight < inflight) {
// otherwise if this is the best we have found so far, record that
inflight = currInflight;
foundReady = node;
} else if (connectionStates.isPreparingConnection(node.idString())) {
foundConnecting = node;
} else if (canConnect(node, now)) {
if (foundCanConnect == null ||
this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
this.connectionStates.lastConnectAttemptMs(node.idString())) {
foundCanConnect = node;
} else {
log.trace("Removing node {} from least loaded node selection since it is neither ready " +
"for sending or connecting", node);
// We prefer established connections if possible. Otherwise, we will wait for connections
// which are being established before connecting to new nodes.
if (foundReady != null) {
log.trace("Found least loaded node {} with {} inflight requests", foundReady, inflight);
return foundReady;
} else if (foundConnecting != null) {
log.trace("Found least loaded connecting node {}", foundConnecting);
return foundConnecting;
} else if (foundCanConnect != null) {
log.trace("Found least loaded node {} with no active connection", foundCanConnect);
return foundCanConnect;
} else {
log.trace("Least loaded node selection failed to find an available node");
return null;
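The core of the selection, stripped of connection states, is a scan from a random offset that returns early on an idle connection and otherwise keeps the smallest in-flight count. The sketch below is a hypothetical reduction: nodes are represented only by their in-flight request counts, every node is assumed to be ready, and offset is assumed to be non-negative.

```java
import java.util.List;

class LeastLoadedSketch {
    // Returns the index of the node with the fewest in-flight requests, scanning from
    // `offset` so that ties are not always won by the first node. Returns -1 if empty.
    static int leastLoaded(List<Integer> inFlightCounts, int offset) {
        int best = -1;
        int bestInFlight = Integer.MAX_VALUE;
        for (int i = 0; i < inFlightCounts.size(); i++) {
            int idx = (offset + i) % inFlightCounts.size();
            int current = inFlightCounts.get(idx);
            if (current == 0) {
                return idx;                 // idle connection: stop right away
            }
            if (current < bestInFlight) {   // otherwise remember the best seen so far
                bestInFlight = current;
                best = idx;
            }
        }
        return best;
    }
}
```

Starting at a random offset spreads metadata requests across brokers when several of them are equally idle.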
private boolean canSendRequest(String node, long now) {
return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
private boolean isReady(NodeConnectionState state, long now) {
    return state != null && state.state == ConnectionState.READY && state.throttleUntilTimeMs <= now;
}

public boolean isPreparingConnection(String id) {
    NodeConnectionState state = nodeState.get(id);
    return state != null &&
            (state.state == ConnectionState.CONNECTING || state.state == ConnectionState.CHECKING_API_VERSIONS);
}

public boolean canConnect(String id, long now) {
    NodeConnectionState state = nodeState.get(id);
    if (state == null)
        return true;
    return state.state.isDisconnected() &&
            now - state.lastConnectAttemptMs >= state.reconnectBackoffMs;
}
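The three checks above depend only on a node's state, its throttle deadline, and its reconnect backoff, so they are easy to exercise on their own. The following is a simplified sketch under stated assumptions: `ConnectionCheckSketch`, `NodeState`, and the `State` enum are hypothetical stand-ins, and the single `DISCONNECTED` constant is assumed to cover whatever `isDisconnected()` accepts in the real class.

```java
/** Hypothetical, simplified model of the per-node checks shown above. */
final class ConnectionCheckSketch {

    /** Assumed set of states; the real enum may contain more. */
    enum State { DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY }

    static final class NodeState {
        final State state;
        final long lastConnectAttemptMs;
        final long reconnectBackoffMs;
        final long throttleUntilTimeMs;

        NodeState(State state, long lastConnectAttemptMs,
                  long reconnectBackoffMs, long throttleUntilTimeMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
            this.reconnectBackoffMs = reconnectBackoffMs;
            this.throttleUntilTimeMs = throttleUntilTimeMs;
        }
    }

    /** Ready means: connection is READY and any throttling period has ended. */
    static boolean isReady(NodeState s, long now) {
        return s != null && s.state == State.READY && s.throttleUntilTimeMs <= now;
    }

    /** A connection is being prepared while connecting or checking API versions. */
    static boolean isPreparingConnection(NodeState s) {
        return s != null
                && (s.state == State.CONNECTING || s.state == State.CHECKING_API_VERSIONS);
    }

    /** An unknown node can always be tried; a disconnected one only after its backoff. */
    static boolean canConnect(NodeState s, long now) {
        if (s == null)
            return true;
        return s.state == State.DISCONNECTED
                && now - s.lastConnectAttemptMs >= s.reconnectBackoffMs;
    }
}
```

For example, a disconnected node whose last attempt was at t = 1000 ms with a 50 ms backoff (the `reconnect.backoff.ms` default listed in the configuration table) cannot be retried at t = 1030 ms but can be at t = 1050 ms.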
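CHECKING_API_VERSIONS: the connection has been established and the API versions are being checked. If this check fails, the connection is closed.
READY: the connection is ready to send requests.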
public enum ConnectionState {
public boolean isDisconnected() {
public boolean isConnected() {
return this == CHECKING_API_VERSIONS || this == READY;
public void send(ClientRequest request, long now) {
    doSend(request, false, now);
}
All requests on the Producer side are sent through NetworkClient.doSend(), which does the following:
//note: send the request
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
String nodeId = clientRequest.destination();
if (!isInternalRequest) {
// If this request came from outside the NetworkClient, validate
// that we can send data. If the request is internal, we trust
// that that internal code has done this validation. Validation
// will be slightly different for some internal requests (for
// example, ApiVersionsRequests can be sent prior to being in
// READY state.)
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
AbstractRequest request = null;
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
//note: build the AbstractRequest and check its version information
try {
NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
// Note: if versionInfo is null, we have no server version information. This would be
// the case when sending the initial ApiVersionRequest which fetches the version
// information itself. It is also the case when discoverBrokerVersions is set to false.
if (versionInfo == null) {
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending message of type {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), nodeId, builder.version());
} else {
short version = versionInfo.usableVersion(clientRequest.apiKey());
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
request = builder.build(); //note: a Produce request is built into a ProduceRequest, a Metadata request into a Metadata request
} catch (UnsupportedVersionException e) {
// If the version is not supported, skip sending the request over the wire.
// Instead, simply add it to the local queue of aborted requests.
log.debug("Version mismatch when attempting to send {} to {}",
clientRequest.toString(), clientRequest.destination(), e);
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
clientRequest.callback(), clientRequest.destination(), now, now,
false, e, null);
RequestHeader header = clientRequest.makeHeader();
if (log.isDebugEnabled()) {
int latestClientVersion = ProtoUtils.latestVersion(clientRequest.apiKey().id);
if (header.apiVersion() == latestClientVersion) {
log.trace("Sending {} to node {}.", request, nodeId);
} else {
log.debug("Using older server API v{} to send {} to node {}.",
header.apiVersion(), request, nodeId);
//note: Send is an interface; what is returned here is a NetworkSend, and NetworkSend extends ByteBufferSend
Send send = request.toSend(nodeId, header);
InFlightRequest inFlightRequest = new InFlightRequest(
//note: bind the send to its KafkaChannel and enable write events on that KafkaChannel's underlying socket
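The version check in doSend() above asks the broker's version information for a usable version and treats an UnsupportedVersionException as a local abort instead of sending anything. The listing does not show how the usable version is chosen, so the following is only a sketch that assumes the rule is "the highest version both sides support". `VersionNegotiationSketch` and `VersionRange` are hypothetical names, and `IllegalArgumentException` stands in for Kafka's exception type.

```java
/** Hypothetical sketch of picking a request version that client and broker share. */
final class VersionNegotiationSketch {

    /** Inclusive range of versions one side supports for a single API. */
    static final class VersionRange {
        final short min;
        final short max;

        VersionRange(int min, int max) {
            this.min = (short) min;
            this.max = (short) max;
        }
    }

    /**
     * Assumed rule: use the highest version supported by both sides,
     * and fail when the two ranges do not overlap.
     */
    static short usableVersion(VersionRange client, VersionRange broker) {
        short candidate = (short) Math.min(client.max, broker.max);
        if (candidate < client.min || candidate < broker.min)
            throw new IllegalArgumentException(
                    "no common version: client " + client.min + ".." + client.max
                            + ", broker " + broker.min + ".." + broker.max);
        return candidate;
    }
}
```

With a client that supports versions 0..7 and a broker that supports 0..5, the sketch picks version 5. When the ranges are disjoint it throws, which corresponds to the branch in doSend() that skips sending the request over the wire.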
Selector.send() looks up the KafkaChannel that corresponds to this Send and calls setSend() to register a write event on that KafkaChannel.
//note: send the request
public void send(Send send) {
String connectionId = send.destination();
if (closingChannels.containsKey(connectionId))
else {
KafkaChannel channel = channelOrFail(connectionId, false);
try {
} catch (CancelledKeyException e) {
close(channel, false);
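setSend() needs to be read together with write(), which is called from Selector.poll():
setSend(): sets the KafkaChannel's current Send to the Send that is to be sent and registers an OP_WRITE event;
write(): sends the current Send and, once it has been sent completely, removes the registered OP_WRITE event.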
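The interplay between setSend() and write() can be illustrated without a real socket. The following is a minimal sketch, not KafkaChannel itself: `PendingSendSketch` is a hypothetical class, a plain `int` stands in for the selection key's interest set, a `ByteBuffer` stands in for the Send, and `throttled()` is a helper invented here to simulate a socket that accepts only a few bytes per call.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

/** Hypothetical sketch of "one pending send plus OP_WRITE interest". */
final class PendingSendSketch {
    private ByteBuffer send;   // at most one send may be in progress
    private int interestOps;   // stands in for the SelectionKey's interest set

    /** Stores the buffer to send and registers interest in OP_WRITE. */
    void setSend(ByteBuffer buffer) {
        if (send != null)
            throw new IllegalStateException("prior send operation still in progress");
        send = buffer;
        interestOps |= SelectionKey.OP_WRITE;
    }

    /** Writes what the channel accepts; returns the buffer once fully sent, else null. */
    ByteBuffer write(WritableByteChannel channel) {
        if (send == null)
            return null;
        try {
            channel.write(send);
        } catch (IOException e) {
            // Wrapped only to keep the sketch's signature simple.
            throw new UncheckedIOException(e);
        }
        if (send.hasRemaining())
            return null;                          // partial write: keep OP_WRITE registered
        interestOps &= ~SelectionKey.OP_WRITE;    // done: stop asking for write events
        ByteBuffer completed = send;
        send = null;
        return completed;
    }

    boolean wantsWrite() {
        return (interestOps & SelectionKey.OP_WRITE) != 0;
    }

    /** Test helper (hypothetical): a channel that accepts at most maxPerCall bytes per write. */
    static WritableByteChannel throttled(int maxPerCall) {
        return new WritableByteChannel() {
            @Override public int write(ByteBuffer src) {
                int n = Math.min(maxPerCall, src.remaining());
                src.position(src.position() + n);
                return n;
            }
            @Override public boolean isOpen() { return true; }
            @Override public void close() { }
        };
    }
}
```

Sending three bytes through `throttled(2)` takes two calls to `write()`: the first returns null and leaves the write interest set, the second returns the buffer and clears it. Clearing the interest matters because a socket is writable most of the time, so leaving OP_WRITE registered would make the selector wake up continuously.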
//note: every call registers an OP_WRITE event
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
//note: calls send() to send the Send
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}
//note: once the send has completed, remove the WRITE event
private boolean send(Send send) throws IOException {
if (send.completed())
return send.completed();