• 【Kafka】Kafka Producer 分区-05


    1. 分区的好处

    (1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
    (2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

    在这里插入图片描述

    see 2.4.1http://t.csdnimg.cn/m1O9u

    2. 分区策略

    2.1 默认的分区器 DefaultPartitioner

    /**
    * The default partitioning strategy:
    * 
      *
    • If a partition is specified in the record, use it *
    • If no partition is specified but a key is present choose a partition based on a hash of the key *
    • If no partition or key is present choose the sticky partition that changes when the batch is full. * * See KIP-480 for details about sticky partitioning. */ public class DefaultPartitioner implements Partitioner { … … }

    策略如下:

    1. 如果记录中指定了分区,使用指定的分区。
      这意味着生产者在发送消息时明确指定了一个分区号,Kafka将直接使用这个分区。
    2. 如果没有指定分区但存在键,根据键的哈希值选择分区。
      如果消息中没有明确指定分区,但是提供了一个键,Kafka会使用键的哈希值来计算应该使用哪个分区。这可以保证具有相同键的消息被发送到相同的分区,从而保证消息的有序性。
    3. 如果既没有指定分区也没有键,选择一个“sticky partition”(粘性分区),在批次满时更换分区。
      如果消息既没有指定分区,也没有键,Kafka将使用“sticky partition”策略。这种策略会在消息批次满时更换分区,以便于提高效率和性能。

    这个注释是对DefaultPartitioner类的说明,该类实现了Partitioner接口,用于定义消息分区的策略。DefaultPartitioner类的主要功能就是根据上述规则决定消息发送到哪个分区。以下是DefaultPartitioner类的示例实现结构:

    public class DefaultPartitioner implements Partitioner {
        // 初始化方法
        @Override
        public void configure(Map<String, ?> configs) {
            // 配置代码
        }
    
        // 计算分区的方法
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // 分区计算逻辑
        }
    
        // 清理资源的方法
        @Override
        public void close() {
            // 资源清理代码
        }
    }
    

    该实现中包括了三个主要的方法:

    • configure(Map configs):用于初始化和配置分区器。
    • partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster):用于根据给定的主题、键和值来计算应该使用哪个分区。
    • close():用于在分区器关闭时清理资源。

    在这里插入图片描述

    案例一: 将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    public class CustomProducerCallbackPartitions {
        public static void main(String[] args) {
            // 1. 创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
    		// 2. 给 kafka 配置对象添加配置信息
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102
    :9092 ");
            // key,value 序列化(必须):key.serializer,value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
    
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            KafkaProducer<String, String> kafkaProducer = new
                    KafkaProducer<>(properties);
            for (int i = 0; i < 5; i++) {
                // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
                kafkaProducer.send(new ProducerRecord<>("first",
                        1, "", "atguigu " + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata,
                                             Exception e) {
                        if (e == null) {
                            System.out.println(" 主题: " +
                                    metadata.topic() + "->" + "分区:" + metadata.partition()
                            );
                        } else {
                            e.printStackTrace();
                        }
                    }
                });
            }
            kafkaProducer.close();
        }
    }
    

    案例二: 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

    public class CustomProducerCallback {
        public static void main(String[] args) {
            Properties properties = new Properties();
    
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
    
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            KafkaProducer<String, String> kafkaProducer = new
                    KafkaProducer<>(properties);
            for (int i = 0; i < 5; i++) {
                // 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
                kafkaProducer.send(new ProducerRecord<>("first",
                        "a", "atguigu " + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata,
                                             Exception e) {
                        if (e == null) {
                            System.out.println(" 主题: " +
                                    metadata.topic() + "->" + "分区:" + metadata.partition()
                            );
                        } else {
                            e.printStackTrace();
                        }
                    }
                });
            }
            kafkaProducer.close();
        }
    }
    

    3. 自定义分区器

    如果研发人员可以根据企业需求,自己重新实现分区器

    1. 需求
      例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。

    2. 实现步骤
      定义类实现 Partitioner 接口
      重写 partition()方法

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    /**
     * 1. 实现接口 Partitioner
     * 2. 实现 3 个方法:partition,close,configure
     * 3. 编写 partition 方法,返回分区号
     */
    public class MyPartitioner implements Partitioner {
        /**
         * 返回信息对应的分区
         *
         * @param topic 主题
         * @param key 消息的 key
         * @param keyBytes 消息的 key 序列化后的字节数组
         * @param value 消息的 value
         * @param valueBytes 消息的 value 序列化后的字节数组
         * @param cluster 集群元数据可以查看分区信息
         * @return
         */
        @Override
        public int partition(String topic, Object key, byte[]
                keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // 获取消息
            String msgValue = value.toString();
            // 创建 partition
            int partition;
            // 判断消息是否包含 atguigu
            if (msgValue.contains("atguigu")) {
                partition = 0;
            } else {
                partition = 1;
            }
            // 返回分区号
            return partition;
        }
    
        // 关闭资源
        @Override
        public void close() {
        }
    
        // 配置方法
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
    

    使用分区器的方法,在生产者的配置中添加分区器参数。

    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    
    public class CustomProducerCallbackPartitions {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
    
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
    
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class.getName());
            // 添加自定义分区器
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atgui
                    gu.kafka.producer.MyPartitioner");
                    KafkaProducer < String, String > kafkaProducer = new
                            KafkaProducer<>(properties);
            for (int i = 0; i < 5; i++) {
    
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata,
                                             Exception e) {
                        if (e == null) {
                            System.out.println(" 主题: " +
                                    metadata.topic() + "->" + "分区:" + metadata.partition()
                            );
                        } else {
                            e.printStackTrace();
                        }
                    }
                });
            }
            kafkaProducer.close();
        }
    }
    

    测试
    ①在 hadoop102 上开启 Kafka 消费者

    [atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    

    ②在 IDEA 控制台观察回调信息

    主题:first->分区:0
    主题:first->分区:0
    主题:first->分区:0
    主题:first->分区:0
    主题:first->分区:0
    
  • 相关阅读:
    2.1.C++项目:网络版五子棋对战之前置知识
    实战 | 如何用 Python 自动化监控文件夹完成服务部署
    【Hadoop快速入门】Hdfs、MapReduce、Yarn
    青少年软件编程(202209)(C语言)等级考试(五级)试题及参考答案
    【牛客】NC107寻找峰值,HJ31单词倒排
    Apache JMeter压测安装使用
    五、C++ 函数重载
    Python Dictionary(字典)进阶内容
    SQL Server 开发环境配置教程(SSMS+SQL Prompt)
    数据结构:堆
  • 原文地址:https://blog.csdn.net/Blue_Pepsi_Cola/article/details/139697836