• 连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?


    前言

    在介绍Producer端原理之前,大家先对其整体架构有一个大致的了解,图示如下所示:

    这个图看不懂没有关系,我们会在介绍Producer端原理时一一介绍每个部分的含义及其所复杂的功能。

    Main Thread(主线程)

    在Main Thread中,一共分为四个步骤,分别是:KafkaProducerKafka生产端)、Interceptor拦截器)、Serializer(序列化器)和Partitioner(分区器);

    那么在上个章节中,我们介绍了KafkaProducer端的一些重要参数和使用方式。

    本章,就主要针对剩余的3个部分:Interceptor(拦截器)、Serializer(序列化器)和Partitioner(分区器)进行讲解。

    1> Interceptor拦截器

    Kafka中一共存在两种拦截器,分别是:生产者拦截器ProducerInterceptor)和消费者拦截器ConsumerInterceptor

    我们来看一下生产者拦截器的接口定义了哪些方法,如下所示:

    1. public interface ProducerInterceptor<K, V> extends Configurable {
    2.     /** KafkaProducer会在【将消息序列化】和【计算分区】之前调用该方法,来对消息进行相应的定制化操作 */
    3.     ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    4.     /** KafkaProducer会在【消息被应答之前/消息发送失败】时调用该方法 */
    5.     void onAcknowledgement(RecordMetadata metadata, Exception exception);
    6.     /** 关闭拦截器(由于此方法可能被KafkaProducer调用多次,所以必须是幂等的)*/
    7.     void close();
    8. }

    在ProducerRecord类中,包含了我们发送消息所需要和信息,这些信息我们都可以在 onSend(ProducerRecord record) 方法中进行修改,比如,在发送消息前修改ProducerRecord中的value值,从而改变消息内容。但是,要注意最好不要修改topickeypartition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。如下就是ProducerRecord类中包含的待发送消息的属性列表;

    1. public class ProducerRecord<K, V> {
    2.     private final String topic;
    3.     private final Integer partition;
    4.     private final Headers headers;
    5.     private final K key;
    6.     private final V value;
    7.     private final Long timestamp;
    8.     ... ...
    9. }

    那么在ProducerRecord类的 onAcknowledgement(RecordMetadata metadata, Exception exception) 方法中,有如下规律:

    消息发送成功】metadate不为null,exception为null
    消息发送失败】metadate为null,exception不为null

    所以,我们可以根据上面的规律来判断有哪些消息发送成功,有哪些消息是发送失败了。对于RecordMetadata类中,包含的发送成功后的“回执”信息,如果想要在源码及注释如下所示:

    1. public final class RecordMetadata {
    2.     public static final int UNKNOWN_PARTITION = -1;
    3.     private final long offset; // 消息的偏移量
    4.     private final long timestamp; // 时间戳
    5.     private final int serializedKeySize; // key的序列化长度
    6.     private final int serializedValueSize; // value的序列化长度
    7.     private final TopicPartition topicPartition; // 主题所在分区
    8.     ... ...
    9. }

    2> Serializer序列化器

    由于Producer端发送消息给Kafka之后,待传输的消息对象obj是需要被转换成 字节数组byte[] 之后才能在网络中传送,所以,此处必不可少的一个步骤就是序列化器Serializer了。而在Consumer端,需要将接收到的字节数组byte[] 再转换成对象obj,那么这个步骤就是反序列化器Deserializer了。

    Kafka在org.apache.kafka.common.serialization目录下提供了多种类型预置的序列化器/反序列化,具体如下所示:

    Deserializer、Serializer、ByteArrayDeserializer、ByteArraySerializer
    ByteBufferDeserializer、ByteBufferSerializer、BytesDeserializer、BytesSerializer
    DoubleDeserializer、DoubleSerializer、FloatDeserializer、FloatSerializer
    IntegerDeserializer、IntegerSerializer、ListDeserializer、ListSerializer
    LongDeserializer、LongSerializer、ShortDeserializer、ShortSerializer
    StringDeserializer、StringSerializer、UUIDDeserializer、UUIDSerializer
    VoidDeserializer、VoidSerializer

    那么由于本章主要介绍的是Producer端的执行原理,所以我们此时只需关注序列化器Serializer,该接口如下所示:

    1. public interface Serializer<T> extends Closeable {
    2.     /** 配置当前类 */
    3.     default void configure(Map<String, ?> configs, boolean isKey) {
    4.     }
    5.     /** 将对象data转换为字节数组 */
    6.     byte[] serialize(String topic, T data);
    7.     /** 将对象data转换为字节数组 */
    8.     default byte[] serialize(String topic, Headers headers, T data) {
    9.         return serialize(topic, data);
    10.     }
    11.     /** 关闭序列化器(由于此方法可能被KafkaProducer调用多次,所以必须是幂等的)*/
    12.     @Override
    13.     default void close() {
    14.     }
    15. }

    对于需要实现序列化操作,只需要实现Serialize接口中的方法接口,我们以StringSerializer为例,看一下它是如何实现的,代码如下所示:

    1. public class StringSerializer implements Serializer<String> {
    2.     private String encoding = StandardCharsets.UTF_8.name(); // 默认编码为UTF-8
    3.     @Override
    4.     public void configure(Map<String, ?> configs, boolean isKey) {
    5.         String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
    6.         // 首先尝试从configs中获得"key.serializer.encoding""value.serializer.encoding"所配置的值
    7.         Object encodingValue = configs.get(propertyName);
    8.         if (encodingValue == null)
    9.             // 如果没配置,则尝试从configs中获得"serializer.encoding"所配置的值
    10.             encodingValue = configs.get("serializer.encoding");
    11.         if (encodingValue instanceof String)
    12.             encoding = (String) encodingValue; // 如果配置了自定义编码,则赋值给encoding;否则为默认的UTF-8
    13.     }
    14.     @Override
    15.     public byte[] serialize(String topic, String data) {
    16.         try {
    17.             if (data == nullreturn null;
    18.             else 
    19.                 return data.getBytes(encoding); // 通过调用String的getBytes方法获得字节数组
    20.         } catch (UnsupportedEncodingException e) {
    21.             throw new SerializationException(...);
    22.         }
    23.     }
    24. }

    StringSerializer类中,序列化方式非常简单,就是通过调用StringgetBytes方法获得字节数组;除此之外,也可以配置自定义编码。配置方式可以通过向configs中设置key为:"key.serializer.encoding"、"value.serializer.encoding"、"serializer.encoding"这三种,其中serializer.encoding的优先级最低。如果没有配置这3个key,则 默认编码类型就是"UTF-8" ;

    如果Kafka内置的这几种序列化器都不满足需求,则可以自己实现自定义序列化器(例如:MuseSerializer),然后使用时,在properties配置中指定即可:

    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MuseSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MuseSerializer.class.getName());

    3> Partitioner分区器

    构造ProducerRecord实例对象时,如果在构造方法中指定了partition字段,那么就不需要分区器了;否则,就需要Partitioner分区器来根据key字段计算分区值。ProducerRecord的构造函数如下所示:

    当我们没有在ProducerRecord的构造函数中指定partition字段的时候,就需要分区器起作用了,所有的分区器都需要实现接口Partitioner,该接口有如下三个方法:

    1. public interface Partitioner extends Configurable, Closeable {
    2.     /** 计算给定记录的分区 */
    3.     int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    4.     /** 关闭分区器(由于此方法可能被KafkaProducer调用多次,所以必须是幂等的)*/
    5.     void close();
    6.     /** 通知分区器即将创建一个新的批处理。当使用sticky分区器时,此方法可以为新批更改选择的sticky分区 */
    7.     default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    8.     }
    9. }

    在Kafka中默认的分区器是DefaultPartitioner。这里有两条逻辑判断分支,即:keyBytes是否为nullkeyBytes就是key的字节数组

    keyBytes不为null】对keyBytes进行murmur2哈希计算,然后再基于指定Topic下的所有分片总数进行取余寻址计算。
    keyBytes为null】需要调用StickyPartitionCache的partition(...)方法进行计算。

    分区逻辑如下所示:

    1. public class DefaultPartitioner implements Partitioner {
    2.     private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
    3.     ... ...
    4.     
    5.     public int partition(String topic, Object key, byte[] keyBytes, 
    6.                          Object value, byte[] valueBytes, Cluster cluster) {
    7.         return partition(topic, key, keyBytes, value, valueBytes, cluster, 
    8.                          cluster.partitionsForTopic(topic).size()); // 获得Topic下【所有分片】总数
    9.     }
    10.     public int partition(String topic, Object key, byte[] keyBytes, Object value
    11.                          byte[] valueBytes, Cluster cluster, int numPartitions) {
    12.         // 如果不存在key的序列化值
    13.         if (keyBytes == null
    14.             return stickyPartitionCache.partition(topic, cluster);
    15.         // 对keyBytes进行哈希计算,并在获得Topic下【所有分片】中寻址
    16.         return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    17.     }
    18.     ... ...
    19. }

    如果keyBytes==null,在StickyPartitionCache中如何计算出分区值呢?首先,以主题topic为key,去缓存indexCache中获取分区值part,如果part不为空,则直接返回part,搞定!!

    如果part等于null,则说明缓存中没有缓存该topic的分区值,那么就需要计算了,计算步骤如下所示:

    步骤1】获得topic下所有分片集合partitions
    步骤2】获得topic下所有有效分片集合availablePartitions
    步骤3】如果不存在有效分片,则获得一个随机数,基于partitions中取余寻址;
    步骤4】如果存在1个有效分片,则获取此分片值;
    步骤5】如果存在多个有效分片,则获得一个随机数,基于availablePartitions中取余寻址;
    步骤6】将topic分区值维护到缓存indexCache中,并返回分区值;

    如下则是partition方法的源码及注释,请见如下所示:

    1. public int partition(String topic, Cluster cluster) {
    2.     Integer part = indexCache.get(topic); // 尝试去缓存中获取,如果获取到,则直接返回
    3.     if (part == null
    4.         return nextPartition(topic, cluster, -1); // 获得某主题topic的分区号,并将其维护到缓存indexCache中
    5.     return part;
    6. }
    7. public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    8.     // 获得topic下所有分片集合
    9.     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    10.     Integer oldPart = indexCache.get(topic); // 尝试去缓存中获取分片号,作为旧分片oldPart
    11.     Integer newPart = oldPart;
    12.     if (oldPart == null || oldPart == prevPartition) {
    13.         // 获得Topic下所有【有效分片】集合
    14.         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    15.         // 如果不存在有效分片,则获得一个随机数,基于partitions中取余寻址
    16.         if (availablePartitions.size() < 1) {
    17.             Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
    18.             newPart = random % partitions.size();
    19.         } 
    20.         
    21.         // 如果存在1个有效分片,则分配到此处
    22.         else if (availablePartitions.size() == 1) {
    23.             newPart = availablePartitions.get(0).partition();
    24.         } 
    25.         // 如果存在多个有效分片,则获得一个随机数,基于availablePartitions中取余寻址
    26.         else {
    27.             while (newPart == null || newPart.equals(oldPart)) {
    28.                 int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
    29.                 newPart = availablePartitions.get(random % availablePartitions.size()).partition();
    30.             }
    31.         }
    32.         
    33.         // 维护到缓存indexCache中,主题Topic为key
    34.         if (oldPart == null) indexCache.putIfAbsent(topic, newPart);
    35.         else indexCache.replace(topic, prevPartition, newPart);
    36.         return indexCache.get(topic); // 获得主题Topic的分区号
    37.     }
    38.     
    39.     return indexCache.get(topic); // 获得主题Topic的分区号
    40. }

    今天的文章内容就这些了:

    写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享 。

    更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

  • 相关阅读:
    c++异常处理-在构造函数中
    Vue的快速入门(01)
    (十五)admin-boot项目之使用undertow来替代tomcat容器
    使用 HammerDB 对 Citus 和 Postgres 进行 Benchmark,每分钟200万新订单处理测试(官方博客)
    git主干master分支回滚到历史版本(不会有错误的提交记录)
    Spring容器启动流程
    SAP ARFCSTATE ARFCSDATA TRFCQOUT
    Android 12.0 系统wifi列表显示已连接但无法访问网络问题解决
    (五)正点原子STM32MP135移植——烧录
    sql 限制返回的行数、从表中随机返回n行数据、将NULL转换为实际值
  • 原文地址:https://blog.csdn.net/qq_26470817/article/details/132686395