• kafka—分区的分配和再平衡


    一、分区的分配以及再平衡

    问题引入:一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据

    Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky

    可以通过配置参数partition.assignment.strategy,修改分区的分配策略。Kafka可以同时使用多个分区分配策略

    说明:默认策略是Range + CooperativeSticky

    package com.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    /**
     * @author wangbo
     * @version 1.0
     */
    
    /**
     * 分区策略和再平衡
     */
    
    public class CustomConsumer_02 {
        public static void main(String[] args) {
            //配置
            Properties properties = new Properties();
    
            //连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性
    
            //反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            //配置消费者组ID 可以任意起
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    
            //设置分区策略,一共有四种 Range、RoundRobin、Sticky、CooperativeSticky,默认策略是Range + CooperativeSticky
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RangeAssignor");  //Range分区策略
    //        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");  //RoundRobin分区策略
    //        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");  //Sticky分区策略
    //        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");  //CooperativeSticky分区策略
    
            //1.创建一个消费者 "","hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            //2.订阅主题 first3
            ArrayList<String> topics = new ArrayList<String>();
            topics.add("first3");
            kafkaConsumer.subscribe(topics);
    
            //3.消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据
    
                //循环打印消费的数据 consumerRecords.for
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    二、Range策略

    1.Range分配策略

    Range是对每个topic而言的,首先对同一个topic里面的分区按照序号进行排序,而消费者按照字母顺序进行排序。
    例如:7个分区,3个消费者,排序后分区为0,1,2,3,4,5,6;消费者排序之后为C0,C1,C2

    通过分区数 / 消费者数来决定每个消费者消费几个分区,如果除不尽,那么前几个消费者将会多消费数据
    例如: 7/3 = 2 余 1,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个

    缺点:数据量如果非常大,容易造成数据倾斜
    例如:如果有 N 多个 topic,那么针对每个topic,消费者 C0都将多消费 1 个分区,topic越多,C0消 费的分区会比其他消费者明显多消费 N 个分区

    2.Range分区分配再平衡

    引入:假如有7个分区,3个消费者,排序后分区为0,1,2,3,4,5,6;消费者排序之后为C0,C1,C2

    C0 号消费者:消费到 0、1、2 号分区数据
    C1 号消费者:消费到 3、4 号分区数据
    C2 号消费者:消费到 5、6 号分区数据

    (1)停止掉C0号消费者,快速重新发送消息观看结果(45s 以内,越快越好)

    C0号消费者的任务会整体被分配到C1号消费者或者C2号消费者

    说明:C0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行

    (2)停止掉C0号消费者,快速重新发送消息给消费者观看结果(45s 以后)

    C1 号消费者:消费到 0、1、2、3 号分区数据
    C2 号消费者:消费到 4、5、6 号分区数据

    说明:消费者 C0 已经被踢出消费者组,所以重新按照 Range分配策略进行分配

    三、RoundRobin策略

    1.RoundRobin分区策略原理

    RoundRobin 针对集群中所有Topic而言,RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者
    例如:C0消费0号分区,C1消费1号分区,C2消费2号分区,C0消费3号分区,以此类推

    2.RoundRobin分区分配再平衡

    引入:假如有7个分区,3个消费者,排序后分区为0,1,2,3,4,5,6;消费者排序之后为C0,C1,C2

    C0 号消费者:消费到 0、3、6 号分区数据
    C1 号消费者:消费到 2、5 号分区数据
    C2 号消费者:消费到 4、1 号分区数据

    (1)停止掉 C0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)

    C0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、3 和 6 号分区数据,分别由 C1 号消费者或者 C2 号消费者消费

    说明:C0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行

    (2)停止掉C0号消费者,快速重新发送消息给消费者观看结果(45s 以后)

    C1 号消费者:消费到 0、2、4、6 号分区数据
    C2 号消费者:消费到 1、3、5 号分区数据

    说明:消费者 C0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配

    四、Sticky策略

    1.Sticky分区策略原理

    Sticky(粘性)分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销

    例如:7个分区,3个消费者,排序后分区为0,1,2,3,4,5,6;消费者排序之后为C0,C1,C2

    C0 号消费者:消费到 2、3、5 号分区数据
    C1 号消费者:消费到 1、6 号分区数据
    C2 号消费者:消费到 0、4 号分区数据

    有点类似Range策略,但是分区是随机的,不是按照顺序来的

    2.Sticky分区分配再平衡

    (1)停止掉 C0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)

    C1 号消费者:消费到 2、5、3 号分区数据
    C2 号消费者:消费到 4、6 号分区数据

    C0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 C1 号消费者或者 C2 号消费者消费。

    说明:C0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行

    (2)停止掉C0号消费者,快速重新发送消息给消费者观看结果(45s 以后)

    C1 号消费者:消费到 2、3、5 号分区数据
    C2 号消费者:消费到 0、1、4、6 号分区数据

    说明:消费者 C0 已经被踢出消费者组,所以重新按照粘性方式分配

  • 相关阅读:
    kotlin用ping命令判断网络是否是通的
    STM32F407 芯片的学习 day02 , led模块, key 模块, beep 模块
    计算机视觉岗实习面经
    剑指JUC原理-15.ThreadLocal
    3、TCP状态
    JavaScript字符串类型
    元宇宙vr工业产品展示空间降低研发成本
    如何使用Puppeteer进行金融数据抓取和预测
    Tomcat长轮询原理与源码解析
    白嫖在线云服务器,免费在 linux 服务器使用 docker 。 附视频+附文档
  • 原文地址:https://blog.csdn.net/weixin_44604159/article/details/127540384