• kafka的ISR工作机制原理


    昨日内容回顾:
        - ES的加密及客户端的连接方式,比如logstash,filebeat,curl,es-head,postman...
        - zk单点部署
        - kafka单点部署
        - kafka的集群扩容
        - kafka集群架构
        - kafka的堆内存调优
        - kafka的集群宏观架构
        

    Q1: 为什么需要配置"dvertised.listeners".
        因为没有配置主机名解析导致的问题。
        
    Q2: 一个分区数写入数据是有序的,但为什么多个分区数写入就是无序的呢?
        因为消费者是从多个分区读取的,顺序无法确定。
        
    Q3: leader同步follower是如何同步的?
        follower主动去leader拉取数据。

    Q4: leader挂掉之后发生了哪些事,请说明?
        leader挂掉后,follower会成为新的leader,需要借助zookeeper实现。


    Q5: follower挂掉之后如何解决,会发生哪些问题?
        leader会根据replica.lag.time.max.ms定义的间隔时间,超出该范围leader就会将其剔除ISR列表。

    Q6: 当follower数据和leader不同时,又会发生啥事?
        leader会根据replica.lag.time.max.ms定义的间隔时间,超出该范围leader就会将其剔除ISR列表。

    Q7: 当消费者数据延迟时如何解决问题?请说明原因。
        将多个消费者加入到同一个消费者组进行消费,但是要注意消费者数量不能大于分区数数量。

    Q8: 请问多副本,例如一个分区3个副本,kafka会存在数据丢失的风险吗?请说明原因。
        会,还是ISR的存在。

    相关术语:
        Log End Offset (简称"LEO"):
            每个partition的最后一个日志的偏移量。
            
        High Water(简称"HW"):
            所有ISR列表中最小的LEO。
            所有的消费者只能读取到HW之前的偏移量数据。
            
        ISR:
            和leader数据同步的所有副本集合。
            
        OSR:
            和leader数据不同的所有副本集合。

        AR:
            是ISR+OSR,指的是所有副本集合。

        replica.lag.time.max.ms: 30000ms  ---> 30s
            当follower节点超过30秒没有向leader发送fetch请求或者follower的LEO不等于leader的LEO时,leader会将follower踢出ISR列表。
            
            


    zookeeper的基础操作:
        1.简介
    zookeeper是一个分布式协调服务,其扮演的是辅助的功能。本身并不支持存储大量数据,每个znode默认可以存储大约2M的数据。

        2.什么是znode
    所谓的znode指的是zookeeper node,相当于Linux的目录,只不过znode下不仅仅可以存储子znode,其本身也能够存数据。

        3.znode的基础管理
    登录:
    # zkCli.sh 
    增:
        create /test
            创建一个"/test"的znode。
        
        create /test/b
            支持多级路径创建,但要求父路径是必须存在的。
            
        

    删:
        delete /test/a 
            删除"/test/a"的znode,要求znode为空。
            
        deleteall /test 
            删除"/test"的所有内容,可以删除非空znode。

    改:
        set /test/c/1111 bbbbbbbbbb
            修改"/test/c/1111"的znode存储数据。
            
        
    查:            *****
        ls /
            查看/的znode下有哪些子znode。
        
        ls /test -R
            递归查看"/test"子路径下的所有znode。
            
        get /test
            查看数据。
            
            
            
    kafka 0.8.0 多实例部署:
        1.解压软包包
    tar xf kafka_2.8.0-0.8.0.tar.gz  -C /oldboyedu/softwares/
     
        2.修改配置文件
    vim /oldboyedu/softwares/kafka_2.8.0-0.8.0/config/server.properties
    ...
    broker.id=101
    # kafka的监听端口
    port=19092
    log.dirs=/oldboyedu/data/kafka080
    zookeeper.connect=10.0.0.101:2181/oldboyedu-linux82-kafka080

        3.101节点同步数据
    data_rsync.sh /oldboyedu/softwares/kafka_2.8.0-0.8.0/    
        
        4.102节点修改broker.id
    vim /oldboyedu/softwares/kafka_2.8.0-0.8.0/config/server.properties
    ...
    broker.id=102
     
        5.103节点修改broker.id
    vim /oldboyedu/softwares/kafka_2.8.0-0.8.0/config/server.properties
    ...
    broker.id=102

        6.zookeeper创建znode
    create /oldboyedu-linux82-kafka080

        
        7.启动kafka
    cd /oldboyedu/softwares/kafka_2.8.0-0.8.0  && ./bin/kafka-server-start.sh config/server.properties &>/dev/null &


        8.检查zookeeper集群的ids信息
    ls /oldboyedu-linux82-kafka080/brokers/ids


    早期kafka 0.9之前的offset存储在zookeeper:
        1.查看topic信息
    ./bin/kafka-list-topic.sh --zookeeper 10.0.0.101:2181/oldboyedu-linux82-kafka080
            
        2.启动生产者
    ./bin/kafka-console-producer.sh  --broker-list 10.0.0.101:19092,10.0.0.102:19092,10.0.0.103:19092 --topic oldboyedu-linux82

        3.启动消费者
    ./bin/kafka-console-consumer.sh --zookeeper 10.0.0.101:2181/oldboyedu-linux82-kafka080 --topic oldboyedu-linux82


        4.查看偏移量,注意"console-consumer-78441"根据您的环境适当改变即可
    get /oldboyedu-linux82-kafka080/consumers/console-consumer-78441/offsets/oldboyedu-linux82/0

    从kafka 0.9之后的版本之后支持将offset存储在kafka的"__consumer_offsets"内置topic中:
        1.查看kafka内置"__consumer_offsets"的方式
    kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092  --topic __consumer_offsets  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning  | grep oldboyedu-linux82

    ....

    # [消费者组,消费的topic,分区数]::OffsetAndMetadata(偏移量,...)
    [linux82-elk,oldboyedu-linux82,4]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)
    [linux82-elk,oldboyedu-linux82,1]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)
    [linux82-elk,oldboyedu-linux82,2]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)
    [linux82-elk,oldboyedu-linux82,0]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)
    [linux82-elk,oldboyedu-linux82,3]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)


        2.查看消费者组信息,注意观察"linux82-elk"的消费者组是否有消息延迟!
    kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092  --describe --all-groups
    kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092  --describe --group linux82-elk

    ...
    GROUP           TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    linux82-elk     oldboyedu-linux82 4          5               6               1               -               -               -
    linux82-elk     oldboyedu-linux82 1          4               4               0               -               -               -
    linux82-elk     oldboyedu-linux82 2          3               4               1               -               -               -
    linux82-elk     oldboyedu-linux82 0          4               5               1               -               -               -
    linux82-elk     oldboyedu-linux82 3          3               5               2               -               -               -

        3.修改配置文件
    vim /oldboyedu/softwares/kafka_2.13-3.2.1/config/consumer.properties 
    ...
    group.id=linux82-elk
     
        
        4.基于配置文件启动消费者
    kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic oldboyedu-linux82 --from-beginning --consumer.config /oldboyedu/softwares/kafka_2.13-3.2.1/config/consumer.properties


        5.再次查看消费者信息
    kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092  --describe --group linux82-elk


    kafka生产环境应该监控哪些指标:
        (1)监控kafka的程序是否正常运行;
        (2)数据是否有延迟;
        (3)是否频繁涉及到ISR列表的变化;
        (4)iops;
        (5)其他参数可参考<>;

    安装开源工具监控kafka集群:
        1.安装MySQL数据库并授权
            1.1 部署mariadb
    yum -y install mariadb-server
    systemctl start mariadb

            1.2 修改配置文件
    vim /etc/my.cnf
    ...
    [mysqld]
    # 跳过名称解析。
    skip-name-resolve


            1.3 重启服务并测试链接
    systemctl restart mariadb
    mysql -u linux82 -poldboyedu -h 10.0.0.103

            1.4 创建数据库
    CREATE DATABASE oldboyedu_linux82 DEFAULT CHARACTER SET utf8mb4;
            
            1.5 授权
    CREATE USER linux82 IDENTIFIED BY 'oldboyedu';
    GRANT ALL ON oldboyedu_linux82.* TO linux82;
    SHOW GRANTS FOR linux82;

            
            
        2.修改kafka开启JMX
            2.1 修改启动脚本
    vim /oldboyedu/softwares/kafka_2.13-3.2.1/bin/kafka-server-start.sh

    ...
    # 大概在30行左右
        export KAFKA_HEAP_OPTS="-Xmx256m -Xms256m"
        export JMX_PORT="8888"

            2.2 同步启动脚本到其他节点
    data_rsync.sh /oldboyedu/softwares/kafka_2.13-3.2.1/bin/kafka-server-start.sh

            2.3 所有重启kafka服务
    kafka-server-stop.sh 
    kafka-server-start.sh $KAFKA_HOME/config/server.properties &>/dev/null &
        
        

        3.启动zookeeper的JMX端口
            3.1 修改启动脚本
    vim /oldboyedu/softwares/apache-zookeeper-3.8.0-bin/bin/zkServer.sh +77
    ...
    # 大概是在77行左右
    ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

        
            3.2 重启zookeeper服务
    zkServer.sh restart


        4.安装kafka eagle监控工具
            4.1 解压软件包
    unzip kafka-eagle-bin-2.0.8.zip
    tar xf efak-web-2.0.8-bin.tar.gz  -C /oldboyedu/softwares/

            4.2 修改kafka eagle启动脚本的堆内存大小
    vim  /oldboyedu/softwares/efak-web-2.0.8/bin/ke.sh 
    ...
    export KE_JAVA_OPTS="-server -Xmx256m -Xms256m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

            4.3 修改配置文件 
    cat > /oldboyedu/softwares/efak-web-2.0.8/conf/system-config.properties < efak.zk.cluster.alias=linux82,cluster2
    linux82.zk.list=10.0.0.101:2181/oldboyedu-linux82-kafka3.2.1
    cluster2.zk.list=10.0.0.101:2181/oldboyedu-linux82-kafka080
    linux82.efak.broker.size=20
    kafka.zk.limit.size=32
    efak.webui.port=8048
    linux82.efak.offset.storage=kafka
    cluster2.efak.offset.storage=zk
    linux82.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
    efak.metrics.charts=true
    efak.metrics.retain=15
    efak.sql.topic.records.max=5000
    efak.sql.topic.preview.records.max=10
    efak.topic.token=keadmin
    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://10.0.0.103:3306/oldboyedu_linux82?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=linux82
    efak.password=oldboyedu
    EOF

            4.4 配置环境变量
    cat > /etc/profile.d/kafka_eagle.sh < #!/bin/bash

    export KE_HOME=/oldboyedu/softwares/efak-web-2.0.8
    export PATH=$PATH:$KE_HOME/bin
    EOF


            4.5 启动kafka eagle服务,第一次需要等一会,可能有点漫长哈!大概30s~1分钟左右
    ke.sh start


    kafka集群压力测试:
        1.简介 
    所谓压力测试就是对一个集群的处理能力的上限做一个评估。为将来集群扩容提供有效的依据。
        
        2.为什么要进行压力测试
    (1)压力测试可以了解当前集群的处理能力上限;
    (2)当修改集群的配置参数后,压力测试可以协助运维人员去参考本次调优的效果;
    (3)压力测试的结果以后可以作为参考扩容集群的有效依据;
        
        3.实战案例
    install -d /tmp/kafka-test/
    cat > oldboyedu-kafka-test.sh <<'EOF'
    # 创建topic
    kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic oldboyedu-kafka-2022 --replication-factor 1 --partitions 10 --create

    # 启动消费者消费数据
    nohup kafka-consumer-perf-test.sh --broker-list 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic  oldboyedu-kafka-2022 --messages 100000000 &>/tmp/kafka-test/oldboyedu-kafka-consumer.log &

    # 启动生产者写入数据
    nohup kafka-producer-perf-test.sh --num-records 100000000 --record-size 1000 --topic  oldboyedu-kafka-2022 --throughput 1000000 --producer-props bootstrap.servers=10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 &> /tmp/kafka-test/oldboyedu-kafka-producer.log &
    EOF
    bash oldboyedu-kafka-test.sh 


    参数说明:
    kafka-consumer-perf-test.sh 
        ---messages:
            指定消费消息的数量。
        --broker-list:
            指定broker列表。
        --topic:
            指定topic主体。
            
    kafka-producer-perf-test.sh 
        -num-records
            生产消息的数量。
        --record-size:
            每条消息的大小,单位是字节。
        --topic:
            指定topic主体。
        --throughput 
            设置每秒发送的消息数量,即指定最大消息的吞吐量,若设置为-1代表不限制!
        --producer-props bootstrap.servers
            指定broker列表。
            
            
    温馨提示:
        本案例测试大约会生成93GB( echo  "100000000 * 1000/1024/1024/1024" | bc)的数据,如果硬盘资源不足的小伙伴可以暂时不用测试了,或者该小上面提到的参数。

    今日内容回顾:
        - kafka的ISR工作机制原理   *****
        - ZK的znode基础操作  ***
        - kafka多实例部署
        - kafka eagle监控工具
        - kafka集群压力测试  ***

    下次课程:
        - kafka优化
        - kafka配合elk使用案例
        - zk的ACL
        - zk的监控工具
        - zk调优
        - ES8集群环境搭建

  • 相关阅读:
    迅速了解JDK线程池以及Spring线程池
    C现代方法(第14章)笔记——预处理器
    如何在MATLAB中创建和操作矩阵?
    基于PHP下的大学生校园交流论坛的设计与实现毕业设计源码101634
    springboot毕设项目高校宿舍管理系统的设计与实现ukgdt(java+VUE+Mybatis+Maven+Mysql)
    MyBatis详解(一)
    JAVA后端开发面试基础知识(十)——设计模式
    9.30号作业
    BI财务分析 – 反映盈利水平利润占比的指标如何分析(上)
    电磁场中的几种阻抗
  • 原文地址:https://blog.csdn.net/lpx1249115962/article/details/132644401