• 我的 Kafka 旅程 - Consumer


    作者:[Sol·wang] - 博客园,原文出处:https://www.cnblogs.com/Sol-wang/p/16691613.html

    kafka采用Consumer消费者Pull主动拉取数据的方式,当Broker无数据时,消费者空转。Kafka并不删除已消费的消息,各自独立的消费者可消费同一个Broker分区数据。

    一、消费流程

    1.1 消费者发起网络消费请求

    # 每批次最小抓取设置(推荐1字节)
    fetch.min.bytes
    # 每批次最大抓取大小设置(推荐500ms)
    fetch.max.bytes
    # 未达到大小的超时设置(推荐50M)
    fetch.max.wait.ms

    1.2 拉取数据到内存消费队列中

    # 单次拉取最大消息条数设置(推荐500条)
    max.poll.records

      反序列化处理(对应了Producer端的序列化动作)

      拦截器处理(如:汇总统计记录)

    1.3 数据的后续处理

      保存等的消费端动作。

    二、Offset

    当一个消费者挂掉或重启后,是否还记得消费到的位置了?offset解决了此问题。
    对于每一个topic,都会维持一个分区日志,分区中的每一个记录都会分配一个Id来表示顺序,称之为offset,offset用来唯一的标识分区中每条记录,并将每次的消费位置提交到topic中。消费者恢复启动后接着按序消费数据。

    2.1 自动提交

    # 开启自动提交
    enable.auto.commit = true
    # 每次提交间隔(推荐5秒)
    auto.commit.interval.ms = 5000

    2.2 手动提交

    先关闭自动提交后,在Consumer客户端的代码中,通过调用方法函数提交,通常的方法名:

    # 同步提交,等提交完成才可下一次再消费
    .CommitSync
    # 异步提交,可直接进行下一个消费,也有可能提交失败
    .CommitAync

    2.3 指定消费

    在Consumer客户端的代码中,手动指定offset的位置进行消费,关联到的方法函数名:

    # 按指定时间得出offset值
    .offsetsForTimes
    # 按指定offset值继续消费
    .seek

    2.4 初始策略

    # earliest:	最早消费;无offset时,从头开始消费。
    # latest:	最新消费;无offset时,从最新的数据开始消费。
    # none:	无offset时,引发异常。
    auto.offset.reset = earliest | latest | none

    2.5 消费现象

    重复消费:offset未提交成功,下次消费还是旧的offset。

    漏消费:offset提交成功,消费者端后续的数据处理未完成(建议下游步骤事务处理)。

    三、消费者组

    为了实现横向扩展,应用程序需要创建一个消费者群组,然后往群组里添加消费者来提高处理效率,群组里的每个消费者只处理一部分消息。

    消费者组是逻辑上的一个消费者,是由一个或多个消费者实例组成,具有可扩展性和可容错性,消费者组内的消费者共享一个GroupId组成;组内每个消费者负责消费不同分区数据,并行消费数据;当组内一个消费者挂了之后,其它消费者要自动承担它的消费任务 - 组内再平衡

    3.1 触发再平衡

    消费成员与Broker分区保持心跳连接,或者消费成员处理消息时间过长,会被认为此消费者需要被移除,触发组内消费成员任务再分配。以下配置任其一条件触发再平衡:

    # 心跳连接超时的 移除条件(建议45秒)
    session.timeout.ms
    # 消息处理超时的 移除条件(建议5分钟)
    max.poll.interval.ms

    3.2 再平衡策略

    # 再平衡策略配置项(可多策略组合)
    partition.assignment.strategy = Range | RoundRobin | Sticky | CooperativeSticky
    • Range:单个Topic内的重新平均分配
    • RoundRobin:所有Topic的全部消费者,一起重新分配
    • Sticky:一次小范围重新分配;仅调整需要的,避免大规模重新分配
    • CooperativeSticky:可多次小范围重新调整,直至最终效果

    四、提升吞吐量

    • 增加分区,增加消费者,两者一一对应起来,并行消费
    • 调整一次最多拉取的消息条数(500条)
    • 调整单次抓取的数据最大容量(50M)
  • 相关阅读:
    基于Redis手工实现分布式锁
    基于ssm汽车租赁系统
    网关 GateWay 的使用详解、路由、过滤器、跨域配置
    Centos GCC 版本升级
    计网第五章(运输层)(四)(TCP的流量控制)
    C++中的不规则二维数组
    VS系列多通道振弦温度采发仪的选型与开机操作
    【ARM 安全系列介绍 1 -- 奇偶校验与海明码校验详细介绍】
    酷早报:6月30日Web3元宇宙业界重点消息大汇总
    redis 6.0.5 linux详细安装步骤和测试
  • 原文地址:https://www.cnblogs.com/Sol-wang/p/16691613.html