• Kafka入门-分区及压缩


    一、生产者消息分区

    Kafka的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。

    分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。 

    分区策略

    所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka为我们提供了默认的分区策略,同时它也支持自定义分区策略。 如果要自定义分区策略,需要显式地配置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的partition方法。我们来看看这个方法的方法签名:

    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前Kafka集群共有多少主题、多少Broker等)。只要实现类定义好了partition方法,同时设置partitioner.class参数为实现类的FullQualified Name,那么生产者程序就会按照代码逻辑对消息进行分区。

    比较常见的分区策略:

    1. 轮询策略

    未指定partitioner.class参数,默认使用

    2. 随机策略

    要实现随机策略版的partition方法,很简单,只需要两行代码即可:

    1. List partitions = cluster.partitionsForTopic(topic);
    2. return ThreadLocalRandom.current().nextInt(partitions.size());

     先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。

    3. 按消息键保序策略 

    Kafka允许为每条消息定义消息键,简称为Key。这个Key的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务ID等;一旦消息被定义了Key,那么就可以保证同一个Key的所有消息都进入到相同的分区里面,实现这个策略的partition方法同样简单,只需要下面两行代码即可:

    1. List partitions = cluster.partitionsForTopic(topic);
    2. return Math.abs(key.hashCode()) % partitions.size();

    4. 基于地理位置的分区策略

    这种策略一般只针对那些大规模的Kafka集群,特别是跨城市、跨国家甚至是跨大洲的集群。

    根据Broker所在的IP地址实现定制化的分区策略。比如下面这段代码:

    1. List partitions = cluster.partitionsForTopic(topic);
    2. return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

     二、Kafka的压缩

    Kafka的消息层次都分为两层:消息集合(messageset)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka底层的消息日志由一系列消息集合日志项组成。Kafka通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

    在Kafka中,压缩可能发生在两个地方:生产者端和Broker端

    生产者端

    生产者程序中配置compression.type参数即表示启用指定类型的压缩算法。比如下面这段程序代码展示了如何构建一个开启GZIP的Producer对象:

    1. Properties props = new Properties();
    2. props.put("bootstrap.servers", "localhost:9092");
    3. props.put("acks", "all");
    4. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    5. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    6. // 开启GZIP压缩
    7. props.put("compression.type", "gzip");
    8. Producer producer = new KafkaProducer<>(props);

    这样Producer启动后生产的每个消息集合都是经GZIP压缩过的,故而能很好地节省网络传输带宽以 及Kafka Broker端的磁盘占用。 

    Broker端

    1. Broker端指定了和Producer端不同的压缩算法。Broker端也有一个参数叫compression.type,默认值是producer,可以设置不同压缩算法。

    2. Broker端发生了消息格式转换。为了兼容老版本的格式,Broker端 会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。

    解压缩

    解压缩发生在消费者程序中,也就是说Producer发送压缩消息到Broker后,Broker照单全收并原样保存起来。当Consumer程序请求这部分消息时,由Consumer自行解压缩还原成之前的消息。

    Consumer怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka会将启用了哪种压缩算法封装进消息集合中,这样当Consumer读取到消息集合时,它自然就知道 这些消息使用的是哪种压缩算法。

    Producer端压缩、Broker端保持、Consumer端解压缩

    对于Kafka而言压缩算法对比:

            在吞吐量方面:LZ4 > Snappy > zstd和GZIP;

            而在压缩比方面,zstd > LZ4 > GZIP > Snappy。

  • 相关阅读:
    索引数据结构千千万 , 为什么B+Tree独领风骚
    字节迎来新 CFO,或重启上市;马斯克以 440 亿美元收购 Twitter;FFmpeg 支持 JPEG-XL|极客头条
    会计账簿、会计账簿概述、会计账簿的启用与登记要求、会计账簿的格式和登记方法
    DAY08_MyBatisPlus——入门案例&标准数据层开发CRUD-Lombok-分页功能&DQL编程控制&DML编程控制&乐观锁&快速开发-代码生成器
    Messari发布Moonbeam简报,每日交易量稳步增长,首次公布利润数据
    vue2实例
    日语_和方位相关的词
    【用户画像】Redis的简介和安装
    Linux安装MySQL 5.7主从、minio
    SaaSBase:什么是艾客SCRM?
  • 原文地址:https://blog.csdn.net/m0_47743175/article/details/139985057