Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份,如下所示:
现在我抛出一个问题你可以先思考一下:你觉得为什么Kafka 要做这样的设计?为什么使用分区的概念而不是直接使用多个主题呢?
其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性
(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
值得注意的是,不同的分布式系统对分区的叫法也不尽相同。比如在 Kafka 中叫分区,在 MongoDB 和Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来
它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。
除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如实现业务级别的消息顺序的问题。
生产者发送的消息实体 ProducerRecord 的构造方法:
我们发送消息时可以指定分区号,如果不指定那就需要分区器,这个很重要,一条消息该发往哪一个分区,关系到顺序消息问题。下面我们说说 Kafka 生产者的分区策略。所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。
① Partitioner 接口源码:
public interface Partitioner extends Configurable, Closeable {
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
void close();
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。只要你自己的实现类定义好了 partition 方法,同时设置partitioner.class 参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。
② 实现自定义分区策略 DefinePartitioner:
public class DefinePartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
int size = partitionInfos.size();
if(null==keyBytes){
return counter.getAndIncrement() % size;
}else{
return Utils.toPositive(Utils.murmur2(keyBytes) % size);
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
③ 显式地配置生产者端的参数 partitioner.class:
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// kafka生产者属性配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// 使用自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
DefinePartitioner.class.getName());
// kafka生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<String,String>("test","hello,kafka");
kafkaProducer.send(producerRecord).get();
// 关闭资源
kafkaProducer.close();
}
}
虽说可以有无数种分区的可能,但比较常见的分区策略也就那么几种,分析源码我们查看到提供了三种分区策略:
也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。
这就是所谓的轮询策略,轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
RoundRobinPartitioner 轮询策略实现源码:
/**
The "Round-Robin" partitioner
This partitioning strategy can be used when user wants to distribute the writes to all partitions equally. This is the behaviour regardless of record key hash.
这种分区策略可以用于用户想要消息被平均分配到所有分区,这是种与key的哈希值无关的行为
*/
public class RoundRobinPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题中所有的分区列表
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 根据主题获取下一个原子数+1
int nextValue = nextValue(topic);
// 获取可用的分区列表
List<PartitionInfo> availablePartitions
= cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
// 取余,这样获取的就是一个轮询的方式,从可用的分区列表中获取分区
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
// 取余,这样获取的就是一个轮询的方式,从分区列表中获取分区
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic) {
// 根据Topic获取Map中的AtomicInteger的值,获取不到就默认是0
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}
public void close() {}
}
也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。
如果要实现随机策略版的 partition 方法,很简单,只需要两步即可:先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
UniformStickyPartitioner 随机策略实现源码:
public class UniformStickyPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public void configure(Map<String, ?> configs) {}
am cluster The current cluster metadata
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return stickyPartitionCache.partition(topic, cluster);
}
public void close() {}
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}
其核心的方法是 stickyPartitionCache.partition(topic, cluster),分析 StickyPartitionCache 源码:
public class StickyPartitionCache {
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}
public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
// 获取所有的分区列表
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 获取上一次发送消息的的分区系数
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
if (oldPart == null || oldPart == prevPartition) {
// 获取所有可用的分区列表
List<PartitionInfo> availablePartitions
= cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
// 获取随机数
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
// 根据随机数和分区总数取余,获取系数
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
// 如果只有一个可用的分区,那么默认只能选择这个分区
newPart = availablePartitions.get(0).partition();
} else {
// 获取的分区系数和上一次的不能一样
while (newPart == null || newPart.equals(oldPart)) {
// 获取随机数
int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
// 根据随机数和分区总数取余,获取系数
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// Topic 对应的信息如果不存在 MapCache 就放入,存在就替换
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
}
return indexCache.get(topic);
}
return indexCache.get(topic);
}
}
Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。
Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,版本不同则不一样0.9.0~2.3版本采用轮询策略,目前2.4、2.5采用随机算法。
/**
The default partitioning strategy:
If a partition is specified in the record, use it
If no partition is specified but a key is present choose a partition based on a hash of the key
If no partition or key is present choose the sticky partition that changes when the batch is full.
*/
public class DefaultPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
// key如果不存在使用随机算法
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
// 根据key的字节数计算获取分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
public void close() {}
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}
我曾经给一个国企进行过 Kafka 培训,碰到的一个问题就是如何实现消息的顺序问题。这家企业发送的 Kafka 的消息是有因果关系的,故处理因果关系也必须要保证有序性,否则先处理了“果”后处理“因”必然造成业务上的混乱。
当时那家企业的做法是给 Kafka 主题设置单分区,也就是 1 个分区。这样所有的消息都只在这一个分区内读写,因此保证了全局的顺序性。这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势。
后来经过了解和调研,发现这种具有因果关系的消息都有一定的特点,比如在消息体中都封装了固定的标志位,后来我就建议他们对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区,这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利。
这种基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把标志位数据提取出来统一放到Key 中,这样更加符合 Kafka 的设计思想。经过改造之后,这个企业的消息处理吞吐量一下提升了 40 多倍,从这个案例你也可以看到自定制分区策略的效果可见一斑。