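Having understood the two mechanisms above, you will naturally ask: how are messages routed? This really breaks down into two related questions: how a Producer decides which Partition a message is sent to, and how the Partitions are shared out among the Consumer instances.
Neither question is hard. A quick look through a few of the Config classes gives the answer.
First, on the Producer side, you can specify a Partitioner that decides how messages are distributed across Partitions.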
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are: " +
    "org.apache.kafka.clients.producer.internals.DefaultPartitioner: The default partitioner. " +
    "This strategy will try sticking to a partition until the batch is full, or linger.ms is up. It works with the strategy: " +
    "- If no partition is specified but a key is present, choose a partition based on a hash of the key " +
    "- If no partition or key is present, choose the sticky partition that changes when the batch is full, or linger.ms is up. " +
    "org.apache.kafka.clients.producer.RoundRobinPartitioner: This partitioning strategy is that " +
    "each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " +
    "until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +
    "Please check KAFKA-9965 for more detail. " +
    "org.apache.kafka.clients.producer.UniformStickyPartitioner: This partitioning strategy will " +
    "try sticking to a partition(no matter if the 'key' is provided or not) until the batch is full, or linger.ms is up. " +
    "Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner.";
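This shows that Kafka relies on a concrete implementation of the Partitioner interface to decide which Partition a message is assigned to, for example based on its Key. You can also implement an assignment strategy of your own quite easily.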
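To make the "hash of the key" rule from the documentation string concrete, here is a minimal sketch of the core decision a key-based partitioner makes. It does not depend on the Kafka client library: `KeyHashRouter` and `partitionFor` are hypothetical names, and `Arrays.hashCode` is only a stand-in for the murmur2 hash that Kafka's own partitioner uses. A real custom partitioner would implement `org.apache.kafka.clients.producer.Partitioner` and be registered through `partitioner.class`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper that models only the "hash of the key" decision.
// Assumption: Arrays.hashCode stands in for the murmur2 hash Kafka uses.
class KeyHashRouter {

    // Maps a record key to a partition index in [0, numPartitions).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 5;
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            int p = partitionFor(key.getBytes(StandardCharsets.UTF_8), partitions);
            System.out.println(key + " -> partition " + p);
        }
    }
}
```

The point of the sketch is that the same Key always lands on the same Partition as long as the number of Partitions does not change, which is what gives per-key ordering.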
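The Producer ships with two main families of strategies. RoundRobinPartitioner sends messages to the Partitions in turn. DefaultPartitioner and UniformStickyPartitioner are both based on the sticky strategy: the Producer keeps sending to one Partition until the current batch is full or linger.ms expires, and then moves on to another Partition, so that over time the load is spread fairly evenly while each batch stays on a single Partition.
For example, suppose a Producer sends keyless messages to a Topic with five Partitions, partition0 to partition4. With UniformStickyPartitioner, the Producer first picks one Partition, say Partition2, and sends every message of the current batch there. Once that batch is full or linger.ms is up, it picks another Partition, say Partition4, for the next batch. Each Producer makes this choice on its own; Producers do not coordinate with one another or divide the Partitions among themselves.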
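The sticky behaviour described in the producer documentation string ("try sticking to a partition until the batch is full") can be sketched as a small simulation. This is an illustration under simplifying assumptions, not Kafka's implementation: `StickySimulation` is a hypothetical class, a batch is counted in records rather than bytes, and the linger.ms timer is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical simulation of the sticky idea for keyless records:
// stay on one partition until the current batch is full, then switch.
class StickySimulation {
    private final int numPartitions;
    private final int batchSize; // assumption: batch capacity counted in records
    private final Random random;
    private int currentPartition;
    private int recordsInBatch = 0;

    StickySimulation(int numPartitions, int batchSize, long seed) {
        if (numPartitions <= 0 || batchSize <= 0) {
            throw new IllegalArgumentException("numPartitions and batchSize must be positive");
        }
        this.numPartitions = numPartitions;
        this.batchSize = batchSize;
        this.random = new Random(seed);
        this.currentPartition = random.nextInt(numPartitions);
    }

    // Returns the partition for the next keyless record.
    int nextPartition() {
        if (recordsInBatch == batchSize) {
            // Batch is full: pick a different partition and start a new batch.
            int next;
            do {
                next = random.nextInt(numPartitions);
            } while (numPartitions > 1 && next == currentPartition);
            currentPartition = next;
            recordsInBatch = 0;
        }
        recordsInBatch++;
        return currentPartition;
    }

    public static void main(String[] args) {
        StickySimulation sticky = new StickySimulation(5, 3, 42L);
        List<Integer> route = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            route.add(sticky.nextPartition());
        }
        // Prints three runs of three identical partition numbers.
        System.out.println(route);
    }
}
```

Compared with round-robin, which would touch a new Partition for every record, sticking to one Partition lets each batch fill up before it is sent.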
UniformStickyPartitioner ignores the Key when choosing a Partition. DefaultPartitioner partitions by Key first and falls back to the sticky strategy only when there is no Key.
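Switching between these strategies is a matter of configuration. The sketch below builds producer properties with plain `java.util.Properties`, so it runs without the Kafka client on the classpath. `ProducerPartitionerConfig` and `withPartitioner` are hypothetical names, and `localhost:9092` is a placeholder broker address; the property keys themselves are the standard producer settings.

```java
import java.util.Properties;

// Hypothetical helper that assembles producer settings.
class ProducerPartitionerConfig {

    // Builds producer properties that select a partitioner by class name.
    static Properties withPartitioner(String partitionerClassName) {
        Properties props = new Properties();
        // Placeholder broker address, not taken from the original text.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Same key as ProducerConfig.PARTITIONER_CLASS_CONFIG.
        props.put("partitioner.class", partitionerClassName);
        return props;
    }

    public static void main(String[] args) {
        Properties props =
                withPartitioner("org.apache.kafka.clients.producer.RoundRobinPartitioner");
        System.out.println(props.getProperty("partitioner.class"));
    }
}
```

In a real application these properties would be passed to the `KafkaProducer` constructor.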
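Next, on the Consumer side, you can specify a partition assignment strategy through PARTITION_ASSIGNMENT_STRATEGY, which decides how multiple Consumer instances are matched with multiple Partitions.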
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +
    "ordered by preference, of supported partition assignment strategies that the client will use to distribute " +
    "partition ownership amongst consumer instances when group management is used. Available options are: " +
    "org.apache.kafka.clients.consumer.RangeAssignor: Assigns partitions on a per-topic basis. " +
    "org.apache.kafka.clients.consumer.RoundRobinAssignor: Assigns partitions to consumers in a round-robin fashion. " +
    "org.apache.kafka.clients.consumer.StickyAssignor: Guarantees an assignment that is " +
    "maximally balanced while preserving as many existing partition assignments as possible. " +
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor: Follows the same StickyAssignor " +
    "logic, but allows for cooperative rebalancing. " +
    "The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " +
    "but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list. " +
    "Implementing the org.apache.kafka.clients.consumer.ConsumerPartitionAssignor " +
    "interface allows you to plug in a custom assignment strategy.";
Likewise, Kafka ships with several built-in implementations, which are usually a good choice. You can also implement your own assignment strategy.
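Because `partition.assignment.strategy` takes a list ordered by preference, a consumer can name more than one assignor. The following sketch assembles such a configuration with plain `java.util.Properties`. `ConsumerAssignorConfig`, `withAssignors`, the group id `demo-group`, and the broker address are all illustrative assumptions; the property keys are the standard consumer settings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper that assembles consumer settings.
class ConsumerAssignorConfig {

    // Builds consumer properties with assignors listed in order of preference.
    static Properties withAssignors(String groupId, List<String> assignorClassNames) {
        Properties props = new Properties();
        // Placeholder broker address, not taken from the original text.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Same key as ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
        // the value is a comma-separated list of class names.
        props.put("partition.assignment.strategy", String.join(",", assignorClassNames));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withAssignors("demo-group", Arrays.asList(
                "org.apache.kafka.clients.consumer.RangeAssignor",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"));
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

The list in `main` mirrors the default mentioned in the documentation string, with RangeAssignor first and CooperativeStickyAssignor second.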
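As the documentation above shows, Kafka provides four built-in partition assignment strategies for Consumers: RangeAssignor, RoundRobinAssignor, StickyAssignor, and CooperativeStickyAssignor.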
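The difference between the range and round-robin ideas is easiest to see on a small case. The sketch below simulates both for a single topic with five Partitions and two Consumers. It is a simplified illustration, not Kafka's assignor code: `AssignmentSimulation` and the consumer names are hypothetical, and it ignores multiple topics, subscriptions, and rebalancing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-topic simulation; assumes at least one consumer.
class AssignmentSimulation {

    // Range idea: split the partitions into contiguous chunks; the first
    // (numPartitions % consumers) consumers get one extra partition.
    static Map<String, List<Integer>> range(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perConsumer = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int start = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.put(consumers.get(i), owned);
            start += count;
        }
        return result;
    }

    // Round-robin idea: deal partitions out one at a time, like cards.
    static Map<String, List<Integer>> roundRobin(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(consumers.get(p % consumers.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2");
        System.out.println(range(consumers, 5));      // {consumer-1=[0, 1, 2], consumer-2=[3, 4]}
        System.out.println(roundRobin(consumers, 5)); // {consumer-1=[0, 2, 4], consumer-2=[1, 3]}
    }
}
```

Both results are equally balanced here. The sticky assignors aim for the same balance while also trying to keep each Consumer's existing Partitions across rebalances, which this sketch does not model.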
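Teaching to fish: implementing an assignment strategy is not hard in itself. What matters more is how efficiently the assignment algorithm runs, especially under high concurrency and massive message volumes. The defaults Kafka provides, DefaultPartitioner on the Producer side and RangeAssignor plus CooperativeStickyAssignor on the Consumer side, are efficient in most scenarios. Understanding these algorithms in depth may not help much with day-to-day development, but it is very useful for understanding MQ scenarios more deeply and for comparing Kafka with other MQ products.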