• 【Kafka源码分析】三、消费者Consumer


    一、推拉模式和基本设计

    消息队列通常有两种消息获取模式:

    • pull(拉)模式:consumer采用从broker中主动拉取数据,不足之处是如果没有数据,消费者可能会陷入循环中,一直返回空数据。
    • push(推)模式:由broker主动向消费者主动推送消息,缺点是由broker决定消息发送速率,很难适应所有消费者的消费速率。

    Kafka采用 pull(拉)模式!!!

    基本设计:

    • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
    • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
      在这里插入图片描述

    二、消费流程

    在这里插入图片描述
    消费流程如下:

    1. 消费者创建连接客户端(ConsumerNetworkClient),调用 sendFetches 方法向kafka集群发送消费请求
    2. Server收到请求后,向消费者按批次发送消息数据
    3. 消费者端将收到的消息按照批次大小放到一个队列中(completedFetches)
    4. 然后消费者会从队列(FetchedRecords)中抓取数据,Max.poll.records一次拉取数据返回消息的最大条数默认500条,然后经过反序列化、拦截器等进行数据处理,最终进行消费。

    消费者端在向kafka集群抓取消息,会有几个初始化配置:

    • Fetch.min.bytes:每批次最小抓取大小(默认1字节), 当一批次数据不满足最小的抓取大小(1字节),等待到达超时时间,也会将这些数据返回。
    • fetch.max.wait.ms:一批数据最小值未达到的超时时间(默认500ms)
    • Fetch.max.bytes:每批次最大抓取大小(默认50m)
    • Max.poll.records:一次拉取数据返回消息的最大条数(默认500条)

    三、消费者组分布式协调

    消费者组里的消费者可能分布在不同的主机上,与Kafka集群进行交互时也要涉及不同的主机,那么消费者之间是怎么分布式之间的协调的呢?这里就涉及到一个 coordinator 协调者概念

    首先每一个broker中都会存在一个coordinator,消费者组借助其中一个coordinator辅助实现消费者组的初始化和分区的分配。选择策略是 根据groupId的hashcode值与offsets的分区数量求模 (如XXX主题的默认分区数是50,groupid的hashcode值 = 1,1% 50 = 1,那么XXX主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为协调者)
    选择出coordinator后,进行消费者组的初始化:

    1. 这个消费者组的所有消费者都会向coordinator发送加入消费者组的请求
    2. coordinator收到请求之后,会从所有的消费者中随机选择一个作为leader
    3. 然后将要消费的topic信息发送给leader,由leader负责制定消费方案
    4. 制定完相应的消费计划之后,再将这个消费计划发送给coordinator
    5. 然后coordinator将计划发给每一个消费者,每一个消费者按照制定的消费计划进行消费
    • 心跳机制:
      在消费过程中每个消费者都会和coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并且原本该消费者的任务,也会被分配到其他的消费者上,触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms5分钟),超过5分钟未从分区中拉取数据,也会导致消费者被移除,并且原本该消费者的任务,也会被分配到其他的消费者上,触发再平衡。在这里插入图片描述

    四、分配策略与再平衡

    Kafka有3种主流的分区分配策略:

    • Range(范围)
    • RoundRobin(轮询)
    • Sticky(黏性)

    可以通过配置参数 partition.assignment.strategy,修改分区的分配策略,默认策略是Range + CooperativeSticky,Kafka可以同时使用多个分区分配策略。

    • Range 分区策略
      在这里插入图片描述

    再平衡策略:
    0 号消费者挂掉之后,0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
    1 号消费者:消费到 3、4、0、1、2号分区数据。
    2 号消费者:消费到 5、6 号分区数据。

    数据倾斜问题:在上述进行分区分配时,只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。容易产生数据倾斜!

    • RoundRobin 分区策略
      先将所有分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。从而解决上面的数据倾斜问题
      另外,如果某个消费者没有订阅消费者组内的某个主题,也可能会导致分配的不平衡

    再平衡策略:
    某个消费者挂掉之后,所有的分区会重新进行一次再分配!(可能会造成资源的浪费)

    • Sticky 分区策略

    Sticky 分区策略主要有两个目的:

    1. 分区分配尽可能均匀
    2. 分区分配尽可能与上次分配保持相同

    两者发生冲突时,第一个目标优于第二个目标

    对于常规情况,Sticky 分区策略与RoundRobin 分区策略的分配结果基本一致
    而对于某个消费者没有订阅消费者组内的某个主题的情况,Sticky 分区策略分配更加均衡,更加合理

    再平衡策略:
    0 号消费者挂掉之后,0 号消费者的分区以轮询的方式尽可能均匀地分配到1号消费者或者 2 号消费者。
    而1号消费者或者 2 号消费者依旧保持原有的分区,即重分配后尽可能和上次分配保持相同,使分配策略具备一定“黏性”,从而减少系统资源的消耗和异常情况的发生

    再平衡的原理

    这里还存在一个问题,如果有多个消费者,彼此配置的分配策略并不完全相同,那么需要以哪个为准?

    这就涉及到了再平衡的原理:全部消费者组被分为多个子集,每个消费者组子集在服务端都对应一个 GroupCoordinator 对其进行管理。而 GroupCoordinator 最重要的职责就是负责执行消费者在均衡的操作

    再平衡的具体步骤涉及一系列步骤,这里不展开,而对于上面的不同分配策略的问题,执行策略为:

    1. 收集各个消费者支持的所有分配策略,组成候选集candidates
    2. 每个消费者从候选集candidates中找出第一个自身支持的策略,为这个策略投上一票
    3. 选票数最多的策略即为当前消费者组的分配策略

    五、Offset

    从0.9版本开始,consumer默认将offset保存在broker的partition中。上文讲到了 coordinator 协调者这个概念,coordinator 选择完毕后,消费者组下的所有的消费者提交offset的时候就会往coordinator 所在的这个分区去提交offset

    offset的提交方式:

    • 自动提交:消费者每间隔一段时间会自动提交消费的offset到系统主题中。相关参数:
      • enable.auto.commit:开启自动提交,默认值为true
      • auto.commit.interval.ms:每隔多长时间自动提交一次,默认5s
    • 手动提交:自行决定提交offset的时机。有两种提交方式:
      • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
      • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了

    不同点是同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

    指定Offset 消费策略:
    当 Kafka 中没有初始偏移量(如消费者第一次消费,宕机,数据被删除),可以指定从哪个 offset 开始消费:
    auto.offset.reset = earliest | latest | none

    • earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
    • latest(默认值):自动将偏移量重置为最新偏移量。
    • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

    自动提交和手动提交可能会导致消息的重复消费和漏消费,如果要确保每个消息有且仅被消息一次,需要借助事务消息!

    参考文献:https://zhuanlan.zhihu.com/p/371361083
    https://blog.csdn.net/wanger61?spm=1000.2115.3001.5343

  • 相关阅读:
    mybatis拦截器实现数据脱敏&拦截器使用
    GitHub获120k+star的阿里内网“疯传”葵花宝典JVM虚拟机调优指南
    MyBatis-动态SQL
    脉冲神经网络:MATLAB实现脉冲神经网络(Spiking Neural Network,SNN) 用于图像分类(提供MATLAB代码)
    MobileNetV1架构解析
    vue项目性能优化:去除没有引用的文件
    【Django】开发日报_3_Day:员工管理系统
    使用docker-compose管理freeswitch容器
    Go语言面试题
    类之间的关系
  • 原文地址:https://blog.csdn.net/wanger61/article/details/126459677