• Kafka复习计划 - 客户端实践及原理(消费者组/位移/请求处理过程)


    前言

    同样地这篇文章主要学习胡夕老师的相关文章。其中的一些截图也是来自于它。

    一. 消费组

    消费组(Consumer Group)是Kafka提供的可拓展并且具有容错性消费者机制。 有三个特性:

    • 消费组下面可以有一个或者多个Consumer实例。
    • 每个消费组对应着一个唯一的Group ID,是一个字符串。
    • 消费组下面的所有实例订阅了一个主题的单个分区,同时该分区也只能被同一个消费组下的一个实例消费。

    因为消费组机制,Kafka可以同时实现了传统的消息队列模型和发布订阅模型:

    • 消息队列模型:所有实例都属于同一个Group。一条消息只能够被一个Consumer消费。
    • 发布订阅模型:所有实例分别属于不同的Group。即允许消息被多个Consumer消费

    1.1 Rebalance 重平衡

    Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。消费组发生Rebalance的触发条件有三种:

    • 消费组成员发生变更。(成员的加入和离开)
    • 订阅主题数发生变更。因为消费组可以使用正则表达式的方式来订阅主题,当创建的新主题满足这个正则表达式的时候,那么消费组就会发生Rebalance
    • 订阅主题的分区数发生变更。当某个主题的分区数量增加的时候,也会发生Rebalance

    Rebalance其实也就是重新分配分区,让其均匀分布在各个Broker上。这是其一个重要的功能,但是它也有很多缺点,例如:

    1. Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
    2. Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件Coordinator的帮助下全部重新分配所有分区。

    前者好理解,后者其实并不需要将所有的实例和分区之间的分配关系给打乱重排。因为我们从上一章Kafka复习计划 - 客户端实践及原理(连接器/TCP的管理/幂等性和事务)知道,实例必须和分区里副本所在的Broker建立TCP连接,才可以进行后续的相关操作。那么将所有实例全部重新分配分区,可能就会造成不必要的TCP连接的创建过程。

    诸如以上缺点,Rebalance过程所消耗的时间是很久的。因此,能避免就去避免Rebalance的发生。也就是避免触发Rebalance的三种情况。

    在将如何避免之前,先来说下上文提到的Coordinator协调者。

    1.2 Coordinator 协调者

    Coordinator 专门为消费者组服务:负责为其发生重平衡Rebalance的时候提供位移管理和组成员管理。 比如:

    1. Consumer端在提交位移的时候,就是向Coordinator所在的Broker提交位移。
    2. Consumer应用启动的时候,也是向Coordinator所在的Broker发送各种请求。由Coordinator负责执行消费者组的注册、成员管理记录等元数据的管理。

    每个Broker都有自己的Coordinator 组件。而消费者组确定自身对应的Coordinator 在哪一台Broker上,有两个步骤:

    1. 确定该消费者组的数据保存在哪个位移主题分区上。
    2. 找到该分区的Leader副本所在的Broker,即为对应的Coordinator

    Coordinator 协调者的算法流程:

    1. 假设某个Group ID为“test-group”,然后计算到它的hashCode值。

    2. 计算__consumer_offsets位移主题的分区数(默认50个),然后用哈希值对分区数做取模运算。
      在这里插入图片描述

    3. 此时意味着位移主题的分区12负责保存该Group的数据。

    4. 找到位移主题分区12的Leader副本所在的Broker

    1.3 如何尽量避免消费者组重平衡

    首先我们需要知道Rebalance后会发生什么事情:每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求表明它还存活着。

    而关于心跳请求以及认定Consumer是否挂掉的过程,这里有三个重要的参数至关重要:

    • session.timeout.ms:默认10s,意思是,如果10s内没有收到心跳请求,就会认为这个实例挂了。其决定了 Consumer 存活性的时间间隔
    • heartbeat.interval.ms:意思是Consumer发送心跳请求的频率大小。
    • max.poll.interval.ms:用于控制Consumer实际消费能力对Rebalance的影响。默认是5分钟Consumer程序如果在5分钟之内无法消费完poll()方法返回的消息,那么Consumer会主动发起离开组的请求,那么自然而然会导致一次Rebalance的发生。

    其次上文提到了触发Rebalance的三个时机:组成员数量、分区主题数量、主题的分区数量发生变化。这里来重点说下组成员数量变化的方面。有些情况下,Consumer实例会被Coordinator错误地认为它已经停止了,而被踢出Group,那么就会导致Rebalance的发生。


    那么结合心跳请求的机制,我们可以看出,这一块是一个避免消费者组发生重平衡的一个切入点。我们可以明确哪一些Rebalance操作是不必要发生的。

    1.Consumer未能及时发送心跳导致被踢出消费者组。 对于这样的现象,可以规定Consumer在被判定为 挂掉 之前,至少发送3轮的心跳请求。即session.timeout.ms >= 3 * heartbeat.interval.ms,例如前者设置6s,后者设置2s。

    2.Consumer消费时间过长导致主动发送离开消费者组请求。max.poll.interval.ms设置的阈值稍微大一点,可以做个统计,Consumer端程序运行的最大时长是多少,如果是10分钟,可以将这个值改为11分钟。

    1.4 重平衡流程

    Rebalance发生之后,由协调者决定发起,并且指挥其他的Consumer实例参与重平衡。而指挥的这一个操作则是通过心跳机制来完成。

    1. REBALANCE_IN_PROGRESS封装到心跳请求中,发送给消费者实例。
    2. 消费者实例接收到请求后,发现响应中包含REBALANCE_IN_PROGRESS,就知道此时重平衡开始了。

    消费者组在重平衡阶段,一共有5个状态:

    在这里插入图片描述
    而这五个状态之间的关系如下:
    在这里插入图片描述

    1.4.1 消费者端重平衡流程

    从消费者端角度来看,重平衡分为两个步骤:

    1. 加入组:会向协调者发送JoinGroup请求,主要告知协调者自己订阅了什么主题,这样协调者就可以收集到所有成员的订阅信息。
      在这里插入图片描述

    2. 等待领导者消费者分配具体的方案:普通成员和消费者领导者都会向协调者发送SyncGroup请求。普通成员发送的请求体没有实际的内容,领导者发送的请求体则包含做好的分配方案。
      在这里插入图片描述

    注意:

    • 通常情况下,第一个发送JoinGroup请求的成员将自动成为消费者领导者。它的任务就是收集所有成员的订阅信息,然后根据这些信息去制定具体的分区消费分配方案。
    • 一个组的每个成员都会向协调者发送SyncGroup请求,同样地协调者也会以SyncGroup请求的方式返回给各个成员,告知他们重平衡的最终结果(分配方案)。所有成员接收成功后,消费者组进入到Stable状态。

    1.4.2 Broker端重平衡流程

    场景一:新成员入组。当协调者接收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。
    在这里插入图片描述


    场景二:组成员主动离开组。即Consumer端实例主动close()了,也就是向协调者发送LeaveGroup请求。
    在这里插入图片描述


    场景三:组成员崩溃离组
    在这里插入图片描述

    注意区分主动离组:

    • 主动离组:主动发起请求告知协调者自己要离开消费者组,因此协调者能够立马感应并做出处理。
    • 崩溃离组:此时协调者通常需要等待一段时间才能够感知到某个消费者已经离开了组,主要看的是心跳机制相关的参数,比如默认情况下10s内感应不到心跳就会认为其挂了,从而发起重平衡。

    二. Offset 位移

    2.1 位移主题__consumer_offsets

    Kafka中有一个内部主题__consumer_offsets,叫位移主题,用作位移数据的管理,即保存消费者的位移消息。 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。其中相关的主题参数为:

    1. 分区数:offsets.topic.num.partitions,默认50。
    2. 副本数:offsets.topic.replication.factor,默认3。

    Kafka的位移管理机制用一句话表示就是:Consumer的位移数据作为一条普通的Kafka消息提交到主题__consumer_offsets中。

    注意:

    • 位移主题就是个普通的Kafka主题,可以手动创建、删除、修改。但一般我们无需去管理它。
    • 位移主题的消息格式是Kafka自定义的,用户不能够随意修改。

    这里我们可以来验证下:(我的KafkaZookeeper都是Docker上的)

    1. 查看Zookeeper的容器ID,并进入其中:
    docker exec -it 0b1db99556e0 bash
    
    • 1
    1. 到目录cd /opt/zookeeper-3.4.13/bin/中,使用命令./zkCli.sh启动Zookeeper客户端。
      在这里插入图片描述

    2. 查看对应主题下的分区:ls /brokers/topics/__consumer_offsets/partitions,结果如下:确实是50个,[0,49]
      在这里插入图片描述

    消息格式简单可以理解为Key-Value格式,Key包含三个重要的内容:Group ID,主题名,分区号。 Value主要保存位移值以及元数据。另外,位移主题还会保存以下两种信息:

    • 第二种:用于保存 Consumer Group 信息的消息,用来注册消费者组的。
    • 第三种:用于删除 Group 过期位移甚至是删除 Group 的消息。

    主要来说一下第三种消息,他也叫墓碑消息主要特点是它的消息体是null,即空消息体。他主要是标明某个消费者组需要被删除。它的出现有两个前提:

    1. 某个Consumer Group下的所有Consumer实例都停止了。
    2. 并且所有Consumer实例的位移数据已经被删除。

    2.2 Compact策略删除过期数据

    我们知道,KafkaConsumer提交位移的方式有两种:由参数enable.auto.commit来控制。

    • 自动提交位移:由Consumer在后台定期地提交位移,时间间隔由参数auto.commit.interval.ms来控制。默认是5s。
    • 手动提交位移:由Consumer端的程序来控制,consumer.commitSync()

    如果选择的是自动提交位移,那么只要Consumer一直启动着,就会无期限地向位移主题中写入消息,而文章到这里为止讲的基本上都是主题__consumer_offsets中有关信息的写入操作。那么相反的,Kafka必定会考虑到这点:

    Kafka 使用Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。Compact 策略的定义又简单易懂:

    1. 同一个Key(大前提),发送了两条消息,Msg1Msg2
    2. 倘若Msg1的发送时间早于Msg2,那么Msg1就是过期消息。

    Kafka 提供了叫Log Cleaner的后台线程取定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。Compact过程的先后对比如下:
    在这里插入图片描述

    2.3 合理的位移提交编码方式

    从用户角度来看,提交位移的方式有两种:

    1. 自动提交(默认的方式):默认情况下每5s会自动提交一次位移。
    2. 手动提交(推荐的方式):设置enable.auto.commit = false ,然后调用API提交位移。

    首先来说下自动提交的问题:

    • 优点:从顺序上来说,poll() 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。
    • 缺点:可能出现重复消费。

    重复消费的一个情景:

    1. 默认情况下,每隔5s会自动提交一次位移。倘若提交位移M之后的第三秒钟发生了Rebalance操作。
    2. 那么在Rebalance结束后,所以Consumer将会从上一次提交的位移M处继续消费。
    3. 但是位移M是3秒前的位移数据了,因此再Rebalance发生的前3秒消费的数据全部要重新消费一次。
    4. 也就是重复消费的问题。

    也因此提供了手动提交的API,可以让使用者自己控制提交的时机以及频率。从Consumer端角度来看,手动提交位移的方式也有两种。

    1. 同步提交:commitSync(),会提交Consumerpoll()返回的最新位移。会阻塞。能够自动重试。
    2. 异步提交:commitAsync()立刻返回结果,不会阻塞。通过回调函数来做后续操作。不能够自动重试。

    可以看到同步和异步提交各有各的优缺点,那么我们可以将它们俩结合在一起。手动提交下推荐的位移提交编码方式如下:

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "你的服务器地址:9092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("group.id", "test-consumer");
    // 1.创建KafkaConsumer
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // 2.订阅主题
    consumer.subscribe(Collections.singletonList("test"));
    // 3.轮询消费
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            // 4.处理消息体逻辑
            processMsg(records);
            // 5.异步提交代码
            consumer.commitAsync();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            // 6.先同步阻塞提交位移
            consumer.commitSync();
        } finally {
            // 7.最后关闭
            consumer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    优点如下:

    1. 对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞。
    2. 在最后的Consumer端程序关闭前,调用 commitSync() 方法执行同步阻塞式的位移提交,确保能够保存正确的位移数据。

    但是这段代码也有一个问题:poll()方法是一批一批的返回消息的,而commit提交位移的时候,是会将poll返回的所有消息的位移都提交。 试想一下,倘若消息有1W条,那么就会一次性提交1W条数据的位移。如果中间出现问题了,那么之前处理的岂不是全部要重新来一遍?

    因此我们可以从更细的粒度上出发去提交位移,比如可以每处理完100条数据就提交一次位移。我们以同步提交的API为例,来看下这个方法的重载函数:

    public void commitSync() {}
    /**
     * TopicPartition:消费的分区
     * OffsetAndMetadata:位移数据
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    案例代码如下(以每100条异步提交一次位移为例):

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "你的服务器地址:9092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("group.id", "test-consumer");
    // 1.创建KafkaConsumer
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // 2.订阅主题
    consumer.subscribe(Collections.singletonList("test"));
    // 3.轮询消费
    try {
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        int count = 0;
    
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            // 4.处理消息体逻辑
            for (ConsumerRecord<String, String> record : records) {
                // 处理逻辑....
                System.out.println("topic = " + record.topic() +
                        ", partition = " + record.partition() +
                        ", offset = " + record.offset() +
                        ", customer = " + record.key() +
                        ", country = " + record.value());
                // 将当前处理的消息放到位移集合中。届时一并提交其位移
                offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                // 5.异步提交代码
                if (count != 0 && count % 100 == 0) {
                    consumer.commitAsync(offsets, null);
                }
                count++;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            // 6.先同步阻塞提交位移
            consumer.commitSync();
        } finally {
            // 7.最后关闭
            consumer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    实际上,无论是是手动提交还是自动提交,都不能避免消息的重复消费。只要Consumer端满足以下两个条件,那么在下次重启之后依旧会出现消息的重复消费。

    1. Consumer消费了一定的数据。
    2. Consumer还没提交位移。

    2.4 如何做到避免重复消费

    既然仅仅靠Kafka的位移提交机制无法避免重复消费,我们可以从编码角度来解决:如果保证了消息的幂等性,那么就不会重复消费。大概做法如下:

    1. Producer端在发送消息的时候,往消息体里面塞一个唯一标识符。
    2. Consumer端在消费消息的时候,可以先去Redis(或者Mysql这类数据库)观察下这个消息是否被消费过。若消费过,则跳过。
    3. 否则,处理当前消息(等于消费),然后同步的将唯一标识符其保存到Redis中。
    4. 最后在统一的提交位移数据。

    三. 请求处理过程

    Kafka有一套自己的请求协议,用于实现各式各样的交互操作,例如:

    1. PRODUCE:生产消息用的。
    2. FETCH:消费消息用的。
    3. METADATA:请求Kafka集群元数据信息用的。

    不过有一个相同点就是:所有的请求都是通过TCP网络以Socket的方式进行通讯的。

    我们知道,Kafka对于一个分区下的消息,是顺序处理的,那么这个特性会引来几个问题:

    1. 由于顺序处理请求,那么下一个请求得等待上一个请求的结束才能够继续执行,这样会导致吞吐量太差。
    2. 倘若每个请求使用单独的线程去处理,即异步处理,那么创建线程的开销又很大。

    Kafka是利用Reactor模式来处理请求的。

    3.1 Reactor模式

    Reactor 模式是事件驱动架构的一种实现方式,适合应用于处理多个客户端并发向服务器端发送请求的场景。大概的模式架构图如下:
    在这里插入图片描述
    解释如下:

    1. 多个客户端发送请求到SocketServer组件中。这个组件有一个Acceptor线程还有一个网络线程池。
    2. Acceptor线程负责请求的分发,主要通过轮询的方式将入站请求均匀地发送到网络线程中。
    3. 网络线程池主要负责执行处理真正的逻辑处理。

    其中Kafka提供相关参数控制网络线程池:

    • num.network.threads调整网络线程池的线程数,默认是3。即每台Broker启动的时候都会创建3个网络线程,用于专门处理客户端发送的请求。

    Kafka又对网络线程池的工作做进一步的划分,主要做异步线程池的处理:

    1. 网络线程拿到请求后,并非立刻处理,而是将其放入到一个共享请求队列中。
    2. Broker端的一个IO线程池负责将其从队列中取出来做处理。根据请求的类型做对应的操作。
    3. Purgatory组件,用来缓存延时请求(一些暂时因不满足条件而立刻处理的请求),例如当ack机制设置为all的时候,当生产一条消息,该请求必须等待所有副本都接收到消息后才能够返回。那么等待的过程中,该IO请求就会暂存到Purgatory组件中。

    在这里插入图片描述

    其中Kafka提供相关参数控制IO线程池:

    • num.io.threads调整IO线程池的线程数,默认是8。即每台Broker启动的时候都会创建8个网络线程,用于处理请求的真实底层逻辑,例如消息写入磁盘,或者是从磁盘中读取消息。

    总的来说,Kafka对于请求的处理用到了四大角色(正好也是请求的一个先后顺序):

    1. Acceptor线程:所有客户端的请求都会先经过此 ,由其通过轮询的方式,将请求均匀地分配到网络线程池中。
    2. 网络线程池(默认3个线程):处理数据类请求。主要负责将请求放入到共享请求队列中。
    3. IO线程池(默认8个线程):处理控制类请求,从共享请求队列中取出请求,执行真正的业务逻辑。
    4. Purgatory组件:用来缓存延时请求。

    备注(请求的分类):

    • 数据类请求:诸如 PRODUCEFETCH 这类请求。
    • 控制类请求:比如负责更新副本以及 ISR 集合的 LeaderAndIsr 请求,负责勒令副本下线的 StopReplica 请求等。
  • 相关阅读:
    ProxySQL实现mysql8主从同步读写分离
    一个全响应式的企业级物联网平台,开源了
    编译 qsqlmysql.dll QMYSQL driver not loaded
    model加载模型部分参数
    使用GetX实现GetPage中间件
    C++学习之强制类型转换
    银行家算法程序模拟实现
    【教材】20022/11/28[指针] 指针数组
    Postgresql中JIT函数能否inline的依据function_inlinable
    QT学习笔记(四)——在QLabel显示的影像上画图形,并和影像同步放大缩小
  • 原文地址:https://blog.csdn.net/Zong_0915/article/details/125806369