Kafka生产者在发送消息时主要存在两个线程:主线程 和 Sender线程。
通过这种暂存机制,可以提升Kafka的吞吐量:
Kafka生产者发送消息的整体流程如下:
由此,我们可以引出KafkaProducer的几个核心组件:
接下来,我们将从主线程和Sender线程两个方面来分析KafkaProducer的消息发送过程
我们从该发送消息的方法入口进入阅读源码
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
在调用发送消息方法后,会先进行一些准备工作:
这些准备工作不是我们这里关注的重点,感兴趣的可以自己去看一下源码
做完准备工作后,生产者接下来要开始发送消息了,首先就需要确认消息发送给topic下的哪个partition这
这里就用到了上文提到过的的一个组件——partitioner
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
......
// 确定目标Partition
int partition = partition(record, serializedKey, serializedValue, cluster);
......
}
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
// 若ProducerRecord中强制指定了partition, 则以该值为准
Integer partition = record.partition();
// 否则调用Partitioner动态计算对应的partition
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
在创建KafkaProducer时,可以通过"partitioner.class"配置来指定Partitioner的实现类。若未指定,则使用Kafka内置实现类——DefaultPartitioner。来看DefaultPartitioner的具体实现:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
// 如果没指定key,会调用stickyPartitionCache缓存,发送到该topic某个特定的partition上
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
StickyPartitionCache实际就是一个缓存,其内部维护了一个 ConcurrentMap,用以保存某topic应该发送到哪个分区
public class StickyPartitionCache {
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}
public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
if (oldPart == null || oldPart == prevPartition) {
// 从集群的元数据中获取可以使用的分区
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
// 只有一个分区则使用当前分区
newPart = availablePartitions.get(0).partition();
} else {
// 保证在多线程情况下不会重复创建
while (newPart == null || newPart.equals(oldPart)) {
// 使用ThreadLocalRandom来随机获取一个可用的分区,
// 注意,这里的ThreadLocalRandom都是去使用主线程的seed,因此即使执行了多次也是返回同一个随机数,确保线程安全
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
}
return indexCache.get(topic);
}
return indexCache.get(topic);
}
}
通过以上代码,可以知道DefaultPartitioner的分区策略如下:
RecordAccumulator作为消息暂存者,其思想是将目的地Partition相同的消息放到一起,并按一定的"规格"(由"batch.size"配置指定)划分成多个"批次"(ProducerBatch),然后以批次为单位进行数据压缩&发送。示意图如下:
因此每一个TopicPartition都对应一个Batch队列,RecordAccumulator中使用一个ConcurrentHashMap进行存储:ConcurrentMap
RecordAccumulator有两个核心方法,分别对应"存"和"取":
来看看append方法
在public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs) throws InterruptedException {
// appendsInProgress是一个AtomicInteger,用于记录正在进行存操作的线程数
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// 从batches中获取该partition对应的队列,没有则创建,ConcurrentHashMap会保证多个线程获取的是同一个队列
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 以 partition 为粒度进行同步的存操作
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 尝试将内容写入batch,如果写入成功则直接返回
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}
// 如果不允许创建新的批次,则先返回,等待下一次append(默认为false,一般都是允许创建新的批次的)
if (abortOnNewBatch) {
// 第二个参数表示批次是否已满,第三个参数表示是否创建了新的批次
return new RecordAppendResult(null, false, false, true);
}
// 为缓冲区分配合适的空间
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
// 更新当前时间
nowMs = time.milliseconds();
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 开始第二次写入,通常情况下还是没有batch空间的
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null) {
return appendResult;
}
// 在缓冲区上创建batch,在该batch上append,返回一个future
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, nowMs));
// 将新创建的批次放入队列中和未完成的批次集合中
dq.addLast(batch);
incomplete.add(batch);
buffer = null;
// 将该future存入RecordAppendResult中返回
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
来看 tryAppend 方法,就是先从队列中获取最后一个批次ProducerBatch,只要批次存在,就在这个批次中写入数据,返回一个future,主题逻辑跟上面一样,都是通过ProducerBatch进行写数据
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
}
return null;
}
那么具体是如何在这个批次上进行数据写入的呢,我们来看 ProducerBatch 的 tryAppend 方法
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
// 首先检查该recordsBuilder是否有充足的空间写入
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
// 在recordsBuilder上写入数据(包含数据压缩),并返回写入数据的CRC
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
// 更新最大记录大小和更新时间
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
// 返回这条写入记录的Future,其中produceFuture为这个ProducerBatch公用的Future,
// recordCount为这条记录的偏移量,checksum为这条记录的CRC
// 此外,还包括了这条记录key和value的大小,通过这些数据可以唯一确认和校验一条记录!!!!
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
thunks.add(new Thunk(callback, future));
// 更新偏移量
this.recordCount++;
return future;
}
}
可以看到 ProducerBatch又通过recordsBuilder进行数据写入,这里就不再继续往下看了,内部实际就是对要发送的消息进行了数据压缩,并通过计算得到这批数据的CRC校验值。
写完后返回了这条数据的RecordMetadata,其中包含了这条数据的偏移量,校验值,内容长度,通过这些数据可以唯一确认和校验一条记录,此外,还包含了各个批次一个公有的produceFuture,这个Future是什么稍后再看
append完成后,我们再回到 KafkaProducer的 doSend()方法
// 如果在内部不允许创建新的批次,则在外部重新创建partition再重新写入(默认不会这样)
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
// 处理事务
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// 如果发现批次以及满了或者创建了新的批次,则唤醒sender线程!!!!
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
// 返回该batch的future
return result.future;
可以看到,如果我们这一批写满了或者新的批次被创建出来了(实际就是上一批写满了),就会去唤醒Sender线程,告诉它我们暂存的数据够多了,该把这些数据发送给Server了!
最后,发送完毕后,返回的是我们上面方法创建的FutureRecordMetadata
那么这个返回的future到底是什么,它实际就是我们使用消费者线程发送消息后返回的Future
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
来看看他内部包含的信息是如何写入的:
首先,在使用 ProducerBatch 写信息前,会在构建函数中将该Batch对应的topicPartition放入Future中
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
...
this.produceFuture = new ProduceRequestResult(topicPartition);
...
}
之后,在消息发送完毕后,也会将该消息对应的偏移量、logAppendTime, exception写入到该Future中。
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
produceFuture.set(baseOffset, logAppendTime, exception);
}
由此,就可以通过该Future获取我们所需要的信息
以上主线程执行完毕,消息被写入了topicpartition的一个又一个batch中。Sender线程就开始负责发送数据了
Sender线程需要用到一个基础通信类NetworkClient,NetworkClient中有一个核心属性 Selectable selector,进行网络通讯时的 send和 poll 操作都是通过 selector实现的,org.apache.kafka.common.network.Selector 内部则通过 java.nio.channels.Selector 来实现,因此 Kafka内部本质是通过NIO进行网络通讯的
在KafkaProducer中,和Sender线程相关的有两个属性:
他们在 KafkaProducer 构造函数中被创建:
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
...
return new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
来看Sender的run方法,实际就是通过sendProducerData方法发送消息
public void run() {
log.debug("Starting Kafka producer I/O thread.");
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
.....
}
void runOnce() {
... ...
// 1. 发送请求,并确定下一步的阻塞超时时间
long pollTimeout = sendProducerData(now);
// 2. 处理端口事件,poll的timeout为上一步计算结果
client.poll(pollTimeout, now);
}
来看sendProducerData方法:
........
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
........
sendProduceRequests(batches, now);
return pollTimeout;
可以看到这里通过 accumulator.drain 将刚刚写入到batch中的信息重新取出
取出后 sendProduceRequests 会将batch中的信息构建成 ClientRequest 通过 KafkaClient 的 send方法将消息发送至指定的partition,在发送完毕后会有一个 callback 来进行异步的处理
由此一来,Kafka整个消息的发送逻辑就完成了
参考文献:https://zhuanlan.zhihu.com/p/371361083
https://blog.csdn.net/wanger61?spm=1000.2115.3001.5343