• kafka笔记(三):broker-工作流程/节点服役和退役/副本/文件存储/高效读写


    目录

    kafka broker

    kafka broker工作流程

    zookeeper存储的kafka信息

    kafka总体工作流程

    broker重要参数

    生产经验—节点服役和退役

    服役新节点

    退役旧节点

    kafka副本

    副本基本信息

    leader选举流程

    leader和follower故障处理细节

    生产经验—手动调整分区副本存储

    生产经验—leader partition负载均衡

    生产经验—增加副本因子

    文件存储

    文件存储机制

    文件清理策略

    高效读写数据


    kafka broker

    kafka broker工作流程

    zookeeper存储的kafka信息

    (1)开启zookeeper,进入zookeeper,执行./zkCli.sh

     

     (2)查看信息

    查看目录:ls /

    其它的用ls也可以查看到

    例:

     例:

     部分信息目录

    查看controller路径上的数据

     get /controller

     查看/brokers/topics/first/partitions/0/state的数据

    get /brokers/topics/first/partitions/0/state

    kafka总体工作流程

    broker重要参数

    参数名称描述
    replica.lag.time.max.ms
    ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该Follower 将被踢出 ISR 。该时间阈值, 默认 30s
    auto.leader.rebalance.enable
    默认是 true 。 自动 Leader Partition 平衡。
    leader.imbalance.per.broker.percentage
    默认是 10% 。每个 broker 允许的不平衡的 eader的比率。如果每个broker 超过了这个值,控制器会触发leader 的平衡。
    leader.imbalance.check.interval.seconds
    默认值 300 。检查 leader 负载是否平衡的间隔时间。
    log.segment.bytes
    Kafka log 日志是分成一块块存储的,此配置是指log 日志划分成块的大小, 默认值 1G
    log.index.interval.bytes
    默认 4kb kafka 里面每当写入了 4kb 大小的日志(.log ),然后就往 index 文件里面记录一个索引。
    log.retention.hours
    Kafka 中数据保存的时间, 默认 7 天。
    log.retention.minutes
    Kafka 中数据保存的时间, 分钟级别 ,默认关闭。
    log.retention.ms
    Kafka 中数据保存的时间, 毫秒级别 ,默认关闭。
    log.retention.check.interval.ms
    检查数据是否保存超时的间隔, 默认是 5 分钟
    log.retention.bytes
    默认等于 -1 ,表示无穷大。 超过设置的所有日志总大小,删除最早的segment
    log.cleanup.policy
    默认是 delete ,表示所有数据启用删除策略; 如果设置值为compact ,表示所有数据启用压缩策略。
    num.io.threads
    默认是 8 负责写磁盘的线程数。整个参数值要占总核数的 50%
    num.replica.fetchers
    副本拉取线程数,这个参数占总核数的 50% 1/3
    num.network.threads
    默认是 3 数据传输线程数,这个参数占总核数的50%的 2/3
    log.flush.interval.messages
    强制页缓存刷写到磁盘的条数,默认是 long 的最大值9223372036854775807 。一般不建议修改,交给系统自己管理。
    log.flush.interval.ms
    每隔多久,刷数据到磁盘,默认是 null 。一般不建议修改,交给系统自己管理。

    生产经验—节点服役和退役

    服役新节点

    (1)新节点准备

    1)克隆出另一台机器

    2)修改IP地址

    vim /etc/sysconfig/network-scripts/ifcfg-eno16777736

    3)修改主机名

    vim /etc/hostname

    4)修改新节点的kafka的broker.id

    vim config/server.properties

    5)删除新节点kafka下的datas和logs

    rm -rf datas/* logs/*

    6)启动zookeeper和kafka集群(启动过的不用重复启动)

    (2)执行负载均衡

    1)创建一个要负载均衡的主题

    vim topics-to-move.json

    内容: 

    1. {
    2. "topics": [
    3. {"topic": "first"}
    4. ],
    5. "version": 1
    6. }

    2)生成一个负载均衡的计划

    1. bin/kafka-reassign-partitions.sh --bootstrap-server hadoop01:9092
    2. --topics-to-move-json-file topics-to-move.json
    3. --broker-list "0,1,2,3" --generate

    3)创建副本存储计划

    上一步操作会出现Proposed partition reassignment configuration

    创建一个json文件,把Proposed partition reassignment configuration下的内容粘贴到这个文件中

    4)执行副本存储计划

    1. bin/kafka-reassign-partitions.sh --bootstrap-server hadoop01:9092
    2. --reassignment-json-file xxxxxx.json --verify

    5)验证副本存储计划

    1. bin/kafka-reassign-partitions.sh --bootstrap-server hadoop01:9092
    2. --reassignment-json-file XXXX.json --verify

    退役旧节点

    (1)执行负载均衡

    1)创建一个要负载均衡的主题

    vim topics-to-move.json
    1. {
    2. "topics": [
    3. {"topic": "first"}
    4. ],
    5. "version": 1
    6. }

    2)创建执行计划

    1. bin/kafka-reassign-partitions.sh --bootstrap-server hadoop01:9092
    2. --topics-to-move-json-file topics-to-move.json
    3. --broker-list "0,1,2" --generate

    3)创建副本存储计划

    上一步操作会出现Proposed partition reassignment configuration

    创建一个json文件,把Proposed partition reassignment configuration下的内容粘贴到这个文件中

    4)执行副本存储计划

    1. bin/kafka-reassign-partitions.sh --bootstrap-server hadoop01:9092
    2. --reassignment-json-file xxxxxx.json --verify

    5)验证副本存储计划

    1. bin/kafka-reassign-partitions.sh --bootstrap-server hadoop01:9092
    2. --reassignment-json-file XXXX.json --verify

    (2)执行停止命令

    在要退出的节点上执行

     bin/kafka-server-stop.sh

    kafka副本

    副本基本信息

    (1)Kafka副本作用:提高数据可靠性。

    (2)Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

    (3)Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。

    (4)Kafka分区中的所有副本统称为AR(Assigned Repllicas)。

    AR = ISR + OSR

    ISR:表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。

    OSR:表示Follower与Leader副本同步时,延迟过多的副本。

    leader选举流程

    Kafka集群中有一个broker的Controller会被选举为Controller Leader,负责管理集群broker的上下线,所有topic的分区副本分配和Leader选举等工作。

    Controller的信息同步工作是依赖于Zookeeper的。

    (1)创建一个新的topic,4个分区,4个副本

    1. bin/kafka-topics.sh --bootstrap-server hadoop01:9092
    2. --create --topic first1 --partitions 4 --replication-factor 4

    (2)查看leader发布情况

    1. bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --describe
    2. --topic first1

    (3)停止不同的kafka进程并查看分区情况

    1. #停止(测试节点)
    2. bin/kafka-server-stop.sh
    3. #查看分区情况(主节点)
    4. bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
    5. --topic first1

    leader和follower故障处理细节

    LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。

    HW(High Watermark):所有副本中最小的LEO 。

    (1)Follower故障

    1) Follower发生故障后会被临时踢出ISR

    2) 这个期间LeaderFollower继续接收数据

    3)待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。

    4)等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

    (2)Leader故障

    1)Leader发生故障之后,会从ISR中选出一个新的Leader

    2)为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。

    注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

    生产经验—手动调整分区副本存储

    需求:创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上。

    (1)创建新的topic

    1. bin/kafka-topics.sh --bootstrap-server hadoop01:9092
    2. --create --partitions 4 --replication-factor 2 --topic three

    (2)查看分区副本存储情况

    bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --describe --topic three

    (3)创建副本存储计划

    1. {
    2. "version":1,
    3. "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
    4. {"topic":"three","partition":1,"replicas":[0,1]},
    5. {"topic":"three","partition":2,"replicas":[1,0]},
    6. {"topic":"three","partition":3,"replicas":[1,0]}]
    7. }

    (4)执行副本存储计划

    1. bin/kafka-reassign-partitions.sh --bootstrap-server hadoop01:9092
    2. --reassignment-json-file xxxxxx.json --execute

    (5)验证副本存储计划

    1. bin/kafka-reassign-partitions.sh --bootstrap-server hadoop01:9092
    2. --reassignment-json-file xxxxxx.json --verify

    (6)查看分区副本存储计划

    bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --describe --topic three

    生产经验—leader partition负载均衡

    正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

    auto.leader.rebalance.enable,默认是true。 自动Leader Partition平衡

    leader.imbalance.per.broker.percentage, 默认是10%。每个broker允许的不平衡的leader的比率。如果broker超过了这个值,控制器会触发leader的平衡。

    leader.imbalance.check.interval.seconds,默认值300秒。检查leader负载是否平衡的间隔时间。

    生产经验—增加副本因子

    在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

    (1)创建topic

    1. bin/kafka-topics.sh --bootstrap-server hadoop01:9092
    2. --create --partitions 3 --replication-factor 1 --topic four

    (2)手动增加副本存储

    vim xxxxxx.json
    1. {
    2. "version":1,"partitions":[
    3. {"topic":"four","partition":0,"replicas":[0,1,2]},
    4. {"topic":"four","partition":1,"replicas":[0,1,2]},
    5. {"topic":"four","partition":2,"replicas":[0,1,2]}]
    6. }

    (3)执行副本存储计划

    1. bin/kafka-reassign-partitions.sh --bootstrap-server hadoop01:9092
    2. --reassignment-json-file xxxxxx.json --execute

    文件存储

    文件存储机制

    (1)topic数据的存储机制

    Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该 文件夹的命名规则为:topic名称+分区序号,例如:first-0。

    .log:日志文件

    .index:偏移量索引文件

    .timeindex:时间戳索引文件

    说明:index和log文件以当前segment的第一条消息的offset命名。

    (2)topic存储数据的位置

    1)查看hadoop01的topic的文件

     2)查看log日志

    启动生产者发送数据

     bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test

    [root@hadoop01 test-0]# cat 00000000000000000002.log
    =5V▒▒▒MwZ~▒MwZ~▒▒▒▒▒▒▒▒▒▒▒▒▒▒
    hello=5▒Zh▒▒Mwh_▒Mwh_▒▒▒▒▒▒▒▒▒▒▒▒▒▒
    spark=5]▒)0▒Mwp▒▒Mwp▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒
    kafka=5>{▒▒▒Mw~▒▒Mw~▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒


    3)查看index(直接查看不能查看,要使用工具查看)

    kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000002.index

    [root@hadoop01 test-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000002.index
    Dumping ./00000000000000000002.index
    offset: 2 position: 0
     

    4)使用工具查看log

    kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000002.log

    (3)index和log

    index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。

    参数log.index.interval.bytes默认4kb。

    Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小。

    参数描述

    log.segment.bytes

    Kafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小,默认值1G。

    log.index.interval.bytes

    默认4kb,kafka里面每当写入了4kb大小的日志(.log)就往index文件里面记录一个索引。 稀疏索引。

    文件清理策略

    Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

    log.retention.hours:最低优先级小时,默认7天;

    log.retention.minutes:分钟;

    log.retention.ms:最高优先级毫秒;

    log.retention.check.interval.ms:负责设置检查周期,默认5分钟;

    Kafka中提供的日志清理策略有delete和compact两种

    (1)delete将过期数据删除

    log.cleanup.policy = delete 所有数据启用删除策略;

    1)基于时间:默认打开,以segment中所有记录中的最大时间戳作为该文件时间戳。

    2)基于大小:默认关闭,超过设置的所有日志总大小,删除最早的segment。

    log.retention.bytes,默认等于-1,表示无穷大。

    (2)compact日志压缩

    compact日志压缩:对于相同key的不同value值,只保留最后一个版本。

    log.cleanup.policy = compact:所有数据启用压缩策略

    压缩后的offset可能是不连续的,这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

    高效读写数据

    (1)kafka本身是分布式集群,可以采用分布式技术,并行度高

    (2)读取数据采用稀疏索引,可以快速定位要消费的数据

    (3)顺序写磁盘

    Kafka的producer生产的数据要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。

    官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

    (4)页缓存(PageCache)+零拷贝

    零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。

    页缓存:Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。

    参数描述

    log.flush.interval.messages

    强制页缓存写到磁盘的条数,默认是long的最大值9223372036854775807。一般不建议修改,交给系统自己管理。

    log.flush.interval.ms

    隔多久刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

    本文为学习笔记!!!

  • 相关阅读:
    CSS基础
    【密码学】RSA密码体制存在的问题
    速查!PMP考试成绩已出!
    036、目标检测-锚框
    Discord无法接受邀请的常见原因和解决方法
    openstack——4、开启虚拟机
    [航海协会]树
    Android WIFI架构
    LeetCode 1465. 切割后面积最大的蛋糕:纵横分别处理
    Mac 执行报错 -bash: mono: command not found 解决方式
  • 原文地址:https://blog.csdn.net/qq_55906442/article/details/126907321