• 大数据队列Kafka


    了解什么是kafka之前,首先要了解一下什么是消息队列

    一丶kafka的基本概述

    消息队列:MQ介绍

      • 定义

        • 官方定义:消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

        • 简单点说:消息队列MQ用于实现两个系统之间或者两个模块之间传递消息数据时,实现数据缓存

      • 功能

        • 基于队列的方式,实现消息传递的数据缓存【长久】

          • 队列的特点:顺序:先进先出

      • 应用场景

        • 用于所有需要实现实时、高性能、高吞吐、高可靠的消息传递架构中

        • 大数据应用中:作为唯一的实时数据存储平台

          • 实时数据采集:生产写入Kafka

          • 实时数据处理:消费读取Kafka

      • 优点

        • 实现了架构解耦

          • 需求:C也需要A的数据

          • 如果不构建消息队列:A =》 B

            • 停止A,修改A的代码,添加发送C的代码

            • 高耦合度的

          • 如果构建了消息队列:A =》 MQ 《= B

            • 直接让C从MQ中取即可

            • 低耦合度的

        • 保证了最终一致性

          • 最终可以保证实现最初的需求

        • 实现异步,提供传输性能

          • A给B和C都发一份数据

          • 不做消息队列

            • A发送:10s

            • B接受:1000s

            • C接受:1000s

            • 总共:2020s

          • 做了消息队列

            • A:10s

            • B和C并行接受:1000s

            • 总共:1010s

        • 限流削峰:合理根据成本来控制资源

      • 缺点

        • 增加了消息队列,架构运维更加复杂

          • 必须保证消息队列是高可靠的

          • 如果消息队列故障,整个所有系统都瘫痪了

          • | 保证消息队列即使机器出现故障,消息队列也能正常运行 =》 分布式

        • 数据保证更加复杂,必须保证生产安全和消费安全

          • 数据安全

            • 数据在传输过程中:不丢失、不重复

    • 小结

      • 消息队列的功能、特点是什么?

      • 功能:实现两个系统或者两个模块之间的数据缓存,解决高并发读写

        • 优点:架构解耦、异步模式能提高并发性能

        • 缺点:架构和安全维护更加麻烦

    消息队列:同步与异步

      • 同步的概念

        • 流程

          • step1:用户提交请求

          • step2:后台处理请求

          • step3:将处理的结果返回给用户,用户继续下一步操作

        • 特点:用户看到的结果就是我处理好的结果,等待看到结果以后,进行下一步

        • 场景:去银行存钱、转账等,必须看到真正处理的结果才能表示成功,实现立即一致性

        • 优点:结果肯定是准确的

        • 缺点:性能问题

      • 异步的概念

        • 流程

          • step1:用户提交请求

          • step2:后台将请求放入消息队列,等待处理,返回给用户一个临时结果,用户不管这次的结果是什么,直接进行下一步

          • step3:用户看到临时的结果,真正的请求在后台等待处理

        • 特点:用户看到的结果并不是我们已经处理的结果

        • 场景:用户暂时不需要关心真正处理结果的场景下,只要保证这个最终结果是用户想要的结果即可,实现最终一致性

        • 优点:性能更高

        • 缺点:可能结果误差

      • 数据传递的同步与异步

        • A给B发送消息:基于UDP协议

          • A首先给B发送一条数据

          • A不管B有没有收到,继续发送下一条

          • 优点:快

          • 缺点:数据容易丢失

          • 异步过程

        • A给B发送消息:基于TCP协议

          • A首先给B发送一条数据

          • A会等待B告诉A收到了这条消息,A才会发送下一条

          • 优点:安全

          • 缺点:性能相对差一些

          • 同步过程

    • 小结

      • 同步:提交和处理是同步操作,立即就能看到结果,立即一致性

      • 异步:提交和处理是异步操作,最终得到一个处理的结果,最终一致性

    消息队列:点对点模式

    • 实施

      • 角色

        • 生产者:负责往消息队列中写数据的

        • 消息队列:负责缓存传递的数据

        • 消费者:负责从消息队列中读取数据的

      • 流程

        • step1:生产者要往消息队列中写数据

        • step2:消费者从消息队列中读数据

        • step3:消费者消费成功以后,会返回一个确认ack给消息队列,消息队列会将消费成功的数据删除

    • 小结

      • 特点:数据只能被一个消费者使用,消费成功以后数据就会被删除,无法实现消费数据的共享

    消息队列:订阅发布模式

    • 实施

      • 角色

        • 生产者

        • 消息队列

        • 消费者

        • Topic:主题,用于划分存储不同业务的数据

      • 流程

        • step1:生产者往消息队列中生产数据,将数据写入对应的主题中

        • step2:消费者可以订阅主题,如果主题中出现新的数据,消费就可以立即消费

      • 特点:一个消费者可以订阅多个主题,一个主题可以被多个消费者订阅

        • 消费成功以后,不会立即主动删除数据

        • 可以实现数据共享

    • 小结

      • 什么是发布订阅模式?

        • 发布:生产者不断将最新的数据生产写入消息队列的主题中

        • 订阅:消费者只要订阅了主题,就能立即获取最新的数据

        • 类似于微信公众号

    Kafka的介绍及特点

    • 实施

      • 官网:kafka.apache.org

        • 领英公司基于Scala语言开发的工具

        • Scala语言:基于JVM之上的语言

          val inputRdd = sc.textFile("new Path")
          val wcRdd = inputRdd
              .flatMap(_.trim.split(" "))
              .map((_,1))
              .reduceByKey(_+_)
          wcRdd.saveAsTextFile
          
      • 功能

        • 分布式流式数据实时存储:分布式存储

          • 实时消息队列存储,工作中主要使用的功能

        • 分布式流式计算:分布式计算:KafkaStream

          • 这个功能一般不用

      • 定义

        • 分布式的基于订阅发布模式的高吞吐高性能的实时消息队列系统

      • 应用场景

        • 实时场景

        • 目前:只要做实时大数据,都必用Kafka

          • 离线数据仓库:Hive

          • 实时数据仓库:Kafka

        • Kafka生产者:数据采集的工具

        • Kafka消费者:实时计算的程序

      • 特点

        • 高性能:实时的对数据进行实时读写

          • Kafka也使用内存

        • 高并发:分布式并行读写

          • 分布式架构

        • 高吞吐:使用分布式磁盘存储

          • Kafka也基于磁盘

        • 高可靠:分布式主从架构

        • 高安全性:数据安全保障机制

          • 内存 + 磁盘:副本

          • 这个内存非常特殊:操作系统级别,即使Kafka服务故障,数据依旧存在,只有机器故障才受影响

        • 高灵活性:根据需求,随意添加生产者和消费者

          • 异步模式

    • 小结

      • Kafka在大数据中专门用于实现实时的数据存储,实现大数据实时计算

    存储结构

    • MySQL:数据库、表、行数据【列】

    • HDFS:目录、文件 / 块、行数据

    • Redis:数据库、分片【小集群】、KV

    Kafka概念:Producer、Broker、Consumer

    • 实施

      • Broker:Kafka是一个分布式集群,多台机器构成,每台Kafka的节点就是一个Broker

      • Producer:生产者

        • 负责将数据写入Kafka中,工作中一般生成都是数据采集工具

        • 本质:==Kafka写入数据的客户端==

          • Kafka的每条数据格式:KV格式

      • Consumer:消费者

        • 负责从Kafka中消费数据

        • 本质:==Kafka读取数据的客户端==

          • 消费数据:主要消费的数据是V

      • Consumer Group:==Kafka中必须以消费者组的形式从Kafka中消费数据==

        • 消费者组到kafka消费数据

        • 任何一个消费者必须属于某一个消费者组

        • 一个消费者组中可以有多个消费者:多个消费者共同并行消费数据,提高消费性能

          • 消费者组中多个消费者消费的数据是不一样的

          • 整个消费者组中所有消费者消费的数据加在一起是一份完整的数据

    • 小结

      • 生产者 =》 Kafka 集群【多个Broker】 《= 消费者组【消费者】

    Kafka概念:Topic、Partition

    • 实施

      • Topic:数据主题,用于区分不同的数据,对数据进行分类

        • 类似于MySQL中会将数据划分到不同的表:不同的数据存储在不同的表中

        • Kafka是分布式存储

        • Topic就是分布式的概念:一个Topic可以划分多个分区Partition,每个不同分区存储在不同的Kafka节点上

          • 写入Topic的数据实现分布式存储

        • 问题:生产者写入一条KV结构数据,这条数据写入这个Topic的哪个分区由分区规则来决定,分区规则是什么呢?

          • 有多种分区规则:不同场景对应的分区规则不一样

      • Partition:数据分区,用于实现Topic的分布式存储,对Topic的数据进行划分

        • 每个分区存储在不同的Kafka节点Broker上

        • 写入Topic:根据一定的规则决定写入哪个具体的分区

    • 小结

      • 什么是Topic,什么是Partition?

      • Topic:类似于数据库或者表的概念,用于对数据进行分类,不同业务的数据放入不同Topic

        • Kafka的存储是分布式存储

        • 数据都是读写Topic

        • Topic就是分布式存储

        • Partition:一个Topic可以划分多个Partition,写入Topic的数据可以存储在不同的Partition中

          • 不同Partition可以存储在不同的Kafka节点上

    Kafka概念:分区副本机制

    • 实施

      • 问题1:Kafka中的每个Topic的每个分区存储在不同的节点上,如果某个节点故障,怎么保证集群正常可用?

        • Kafka选用了==副本机制==来保证数据的安全性

          • 如果某台机器故障,其他机器还有这个分区的副本,其他机器的副本照样可以对外提供客户端读写

        • Kafka每一个分区都可以有多个副本

          • 类似于HDFS的副本机制,一个块构建多个副本

        • 注意:Kafka中一个分区的副本个数最多只能等于机器的个数,相同分区的副本不允许放在同一台机器,没有意义

      • 问题2:一个分区有多个副本,读写这个分区的数据时候,到底读写哪个分区副本呢?

        • Kafka将一个分区的多个副本,划分为两种角色

        • Leader副本:负责对外提供读写

          • 生产者和消费者只对leader副本进行读写

        • Follower副本

          • 与Leader同步数据

          • 如果leader故障,从follower新的leader副本对外提供读写

    • 小结

      • Kafka怎么保证分区数据安全?

        • 副本机制,一个分区可以有多个副本,相同分区的副本不能存储在同一台机器

      • Kakfa如何决定分区副本的读写?

        • 每个分区的副本划分为两种角色

        • leader:对外提供读写

        • follower:与Leader同步数据,如果Leader故障,从Follower选举一个新的Leader

    Kafka概念:Segment

      • 定义:对每个分区的数据进行了更细的划分,先写入的数据会先生成一对Segment文件,存储到一定条件以后,后面数据写入另外一对Segment文件,每个文件就叫Segment文件对

      • 内容:每个Segment对应一对【两个】文件

        • xxxxxxxxx.log:存储数据

        • xxxxxxxxx.index:对应.log的文件的数据索引

      • 设计:为了加快数据检索的效率,将数据按照规则写入不同文件,以后可以根据规则快速的定位数据所在的文件,读取对应的小的segment文件,不用读取所有数据文件

      • 举例

        • 如果分区第一次写入数据,会产生第一个segment

          00000000000000000000000.log
          00000000000000000000000.index
          00000000000000000000000.timeindex
        • 当文件越来越大,存储的数据越来越多,影响读的性能,会再构建一个新的segment,老的segment不再被写入

          00000000000000000000000.log         
          00000000000000000000000.index
          00000000000000000000000.timeindex
          ​
          00000000000000000199999.log
          00000000000000000199999.index
          00000000000000000199999.timeindex
          ​
          00000000000002000000000.log
          00000000000002000000000.index
          00000000000002000000000.timeindex
        • Segment文件的名字就是这个Segment记录的offset的最小值

          • 消费者消费数据是根据offset进行消费的

          • 消费者1:想消费分区1:39999这个offset开始消费

          • 先根据文件文件名来判断我要的offset在哪个文件中

    • 小结

      • 什么是Segment?

        • 功能:对分区内部的数据进行了划分

        • 规则:先写入的数据先写入第一个Segment,达到一定条件,数据就写入新的Segment对中

        • 实现:每个Segment中包含两种文件

          • xxxxxx.log:数据

          • xxxxxx.index:对应.log文件的索引

        • 设计:为了加快查询效率

    Kafka概念:Offset

    • 定义Kafka中所有消费者数据的读取都是按照Offset来读取数据,每条数据在自己分区中的偏移量
      • 先写入的offset就越小

        • 第一条数据的offset就为0

        • 第二条数据的offset就为1

        • ……

      • 消息队列:先进先出

      • 写入分区的顺序就是offset偏移量,==Offset是分区级别的==,每个分区的offset独立管理,都从0开始

      • 生成:生产者往Kafka中写入数据,写入某个分区

        • 每个分区单独管理一套Offset【分区】,offset从0开始对每条数据进行编号

        • Kafka写入数据也是按照KV来写入数据

          #Kafka中一条数据存储的结构
          offset      Key             Value
      • 功能:基于offset来指定数据的顺序,消费时候按照offset顺序来读取

        • 消费者消费Topic分区中的数据是按照offset进行顺序消费的

        • 怎么保证不丢失不重复:只要保证消费者每次按照offset的顺序消费即可

          • 如果没有Offset

            • 从头取一遍:数据重复

            • 从最新的去:数据丢失

    • 小结

      • Offset用于标记分区中的每条数据,消费者根据上一次消费的offset对分区继续进行消费,保证顺序

      • 实现保证数据不丢失不重复

    二丶kafka的部署使用

    Kafka集群架构

    • 目标了解Kafka集群架构及角色功能

    • 路径

      • Kafka集群有哪些角色?

      • Kafka每个角色的功能是什么?

      • Zookeeper在架构中的作用是什么?

    • 架构角色

        • Kafka:分布式主从架构,实现消息队列的构建

        • Zookeeper:辅助选举Controller、元数据存储

      • Kafka中的每个角色以及对应的功能

        • 分布式主从架构

          • 节点:Broker

          • 进程:Kafka

        • 主:Kafka ==Controller==

          • 是一种特殊的Broker,从所有Broker中选举出来的,负责普通Broker的工作

          • 负责管理所有从节点:Topic、分区和副本

          • 每次启动集群,会从所有Broker中选举一个Controller,由ZK实现

        • 从:Kafka Broker

          • 对外提供读写请求

          • 其他的Broker监听Controller,如果Controller故障,会重新从Broker选举一个新的Controller

      • ZK的功能

        • 辅助选举Controller

        • 存储元数据

    • 小结

      • kafka是一个主从架构,整体对外提供分布式读写

      • ZK主要负责选举Controller和实现元数据存储

    Kafka分布式集群部署

    • 目标实现Kafka分布式集群的搭建部署

    • 路径

      • step1:选择版本

      • step2:下载解压安装

      • step:3:修改配置文件

    • 实施

      • 版本的选型

        • 0.8.x:老的版本,很多的问题

        • 0.10.x +:消息功能上基本没有问题

        • 选择:kafka_2.12-2.4.1.tgz

          • Kafka:2.4.1

          • Scala:2.12,Kafka是由Scala语言开发

      • 下载解压安装

        • 下载:Index of /dist/kafka

        • 上传到第一台机器

          cd /export/software/
          rz
        • 解压

          tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
          cd /export/server/kafka_2.12-2.4.1/
          mkdir logs
          • bin:一般用于存放客户端操作命令脚本

          • sbin:一般用于存放集群的启动和关闭的命令脚本,如果没有这个命令,脚本放在bin目录中

          • conf/etc/config:配置文件目录

          • lib:jar包的存放目录

          • logs:一般用于存放服务日志

      • 修改配置

        • 切换到配置文件目录

          cd /export/server/kafka_2.12-2.4.1/config
          
        • 修改server.properties

          1. #21行:唯一的 服务端id
          2. broker.id=0
          3. #60行:指定kafka的日志及数据【segment【.log,.index】】存储的位置
          4. log.dirs=/export/servers/kafka_2.12-2.4.1/logs
          5. #123行:指定zookeeper的地址
          6. zookeeper.connect=node01:2181,node02:2181,node03:2181
          7. #在最后添加两个配置,允许删除topic,当前kafkaServer的主机名
          8. delete.topic.enable=true
          9. host.name=node01
        • 分发

          cd /export/servers/
          scp -r kafka_2.12-2.4.1 node02:$PWD
          scp -r kafka_2.12-2.4.1 node03:$PWD
        • 第二台:server.properties

          1. #21行:唯一的 服务端id
          2. broker.id=1
          3. #最后
          4. host.name=node02
        • 第三台:server.properties

          1. #21行:唯一的 服务端id
          2. broker.id=2
          3. #最后
          4. host.name=node03
        • 添加环境变量

          vim /etc/profile
          
          1. #KAFKA_HOME
          2. export KAFKA_HOME=/export/servers/kafka_2.12-2.4.1
          3. export PATH=:$PATH:$KAFKA_HOME/bin
          source /etc/profile
          
    • 小结

      • 按照笔记一步步来,不做过多要求,只要配置含义,实现安装即可

      • 解压安装

      • 修改配置:server.properties

    Kafka启动与关闭

    • 目标掌握kafka集群的启动与关闭命令及脚本封装

    • 路径

      • step1:如何启动Kafka集群?

      • step2:如何关闭Kafka集群?

      • step3:如何封装启动及关闭脚本?

    • 实施

      • 启动Zookeeper

        start-zk-all.sh 
      • 启动Kafka

        bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &
        ​
         >>/dev/null 2>&1 &:在后台运行
      • 关闭Kafka

        bin/kafka-server-stop.sh 
        
      • 封装Kafka脚本

        这里我封装的也不太好,有时间可以去网上搜一下详解
    • 小结

      • 启动:kafka-server-start.sh

      • 关闭:kafka-server-stop.sh

    Topic管理:创建与列举

    • 目标掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic

    • 路径

    • 实施

      • 创建Topic

      • bin/kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node01:9092,node02:9092,node03:9092
        • --create:创建

        • --topic:指定名称

        • --partitions :分区个数

        • --replication-factor:分区的副本个数

        • --bootstrap-server:指定Kafka服务端地址

        • --list:列举

      • 列举Topic

        bin/kafka-topics.sh --list -bootstrap-server node01:9092,node02:9092,node03:9092
        
    • 小结

      • 掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic

    Topic管理:查看与删除

    • 实施

      bin/kafka-topics.sh --delete --topic bigdata02 --bootstrap-server node01:9092,node02:9092,node03:9092

      • 查看Topic信息

        bin/kafka-topics.sh --describe --topic bigdata01  --bootstrap-server node01:9092,node02:9092,node03:9092
        • 结果

          Topic: bigdata01   PartitionCount: 3   ReplicationFactor: 2  Configs: segment.bytes=1073741824
          ​
          Topic: bigdata01        Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0,2
          Topic: bigdata01        Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1
          Topic: bigdata01        Partition: 2    Leader: 1       Replicas: 1,0   Isr: 1,0
          • 每个分区都有一个唯一的标号:从0开始

          • 怎么唯一标识一个分区:Topic名称+分区编号

          • Leader:这个分区的Leader副本所在的Broker id

          • Replicas:这个分区的所有副本所在的Broker的id

          • ISR:in -sync -replicas:可用副本

      • 删除Topic

        kafka-topics.sh --delete --topic bigdata02 --partitions 3 --replication-factor 2 --bootstrap-server node01:9092,node02:9092,node03:9092 
        
    • 小结

      • 查看信息:describe

      • 删除:delete

    生产者及消费者测试

    • 实施

      • Console生产者

      • bin/kafka-console-producer.sh --topic bigdata01 --broker-list node01:9092,node02:9092,node03:9092
        
      • Console消费者

        bin/kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node01:9092,node02:9092,node03:9092  --from-beginning
        
        • --from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费

        • 默认从最新位置开始消费

        • --from-beginning:从最早的位置开始消费

    • 小结

      • 只要生产者不断生产,消费就能实时的消费到Topic中的数据

    可视化工具Kafka Tool

    • 实施

      • 安装Kafka Tool:不断下一步即可

      • 构建集群连接:连接Kafka集群

        查看集群信息

    • 小结

      • 可视化工具,界面或者交互性不是很友好

      • 后面会学习:Kafka Eagle

    Kafka集群压力测试

    • 实施

      • 创建Topic

        bin/kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node01:9092,node02:9092,node03:9092
        
      • 生产测试

        kafka-producer-perf-test.sh --topic bigdata --num-records 1000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node01:9092,node02:9092,node03:9092 acks=1
        
        • --num-records:写入数据的条数

        • --throughput:是否做限制,-1表示不限制

        • --record-size:每条数据的字节大小

      • 消费测试

        kafka-consumer-perf-test.sh --topic bigdata --broker-list node01:9092,node02:9092,node03:9092  --fetch-size 1048576 --messages 1000000
        
    • 小结

      • 工作中一般根据实际的需求来调整参数,测试kafka集群的最高性能,判断是否能满足需求

    Kafka API 的应用

    • 实施

      • 命令行使用Kafka

        • 一般只用于topic的管理:创建、删除

      • 大数据架构中使用Kafka

        • Java API:构建生产者和消费者

        • 工作中一般不用自己开发生产者

        • 生产者:数据采集工具

          • Flume:Kafka sink

            • 配置kafka集群地址

            • Topic的名称

        • 消费者:实时计算程序

          • SparkStream:KafkaUtil

            KafkaUtil.createDirectStream(Kafka集群地址,消费的Topic)
        • 这些软件的API已经将Kafka生产者和消费者的API封装了,只要调用即可

        • ==重点掌握:用到哪些类和方法==

      • Kafka的API的分类

        • High Level API:高级API

          • 基于了SimpleAPI做了封装,让用户开发更加方便

          • 但是由于封装了底层的API,有很多的东西不能控制,无法控制数据安全

          • Offset自动存储Zookeeper中,不用自己管理

        • ==Simple API:简单API==

          • 并不简单,最原始的API

          • 自定义控制所有消费和生产、保证数据安全

    • 小结

      • 大数据工作中一般不自己开发Java API:掌握类和方法即可

      • 只使用Simple API来实现开发

    生产者API:生产数据到Kafka

    • 实施

      1. package bigdata.itcast.cn.kafka.producer;
      2. import org.apache.kafka.clients.producer.KafkaProducer;
      3. import org.apache.kafka.clients.producer.ProducerRecord;
      4. import java.util.Properties;
      5. public class KafkaProducerClient {
      6. public static void main(String[] args) {
      7. //todo:1 构建Kafka生产者连接对象
      8. //构建连接配置
      9. Properties props=new Properties();
      10. //指定服务端地址
      11. props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9093");
      12. /**
      13. * 问题:生产者怎么保证生产数据不丢失? ack机制 + 重试机制
      14. * ack:数据传输的确认码,用于定义生产者如何将数据写入Kafka
      15. * 0:生产者发送一条数据写入Kafka,不管Kafka有没有写入这条数据,都直接发送下一条【快,不安全,不用的】
      16. * 1:中和性方案,生产者发送一条数据写入Kafka,Kafka将这条数据写入对应分区Leader副本,就返回一个ack,生产者收到ack,发送下一条
      17. * 【性能和安全性之间做了权衡】
      18. * all/-1:生产者发送一条数据写入Kafka,Kafka将这条数据写入对应分区Leader副本,并且等待所有ISR:Follower同步成功,就返回一个ack,生产者收到ack,发送下一条
      19. * 【安全,慢】
      20. * 如果ack为1或者为all,生产者没有收到ack,就认为数据丢失,重新发送这条数据
      21. **/
      22. props.put("acks", "all");
      23. //指定写入Kafka中的KV的序列化的类
      24. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      25. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      26. //构建生产者对象
      27. KafkaProducer producer = new KafkaProducer<>(props);
      28. //todo:2-调用连接对象的方法实现生产数据
      29. for (int i = 0; i < 10; i++){
      30. //ProducerRecord:生产者的数据对象,用于send发送对象写入Kafka:Topic、Key、Value
      31. // producer.send(new ProducerRecord("bigdata01", i+"", "itcast"+i));
      32. //ProducerRecord:生产者的数据对象,用于send发送对象写入Kafka:Topic、Value
      33. //producer.send(new ProducerRecord("bigdata01", "itcast"+i));
      34. //ProducerRecord:生产者的数据对象,用于send发送对象写入Kafka:Topic、Partition、Key、Value
      35. producer.send(new ProducerRecord("bigdata01", 0,i+"", "itcast"+i));
      36. }
      37. producer.close();
      38. }
      39. }

    • 小结

      • 掌握具体的类和方法?

        • Properties:配置类

        • KafkaProducer:生产者类

          • send(数据对象):负责生产数据到Kafka中

        • ProducerRecord:数据类对象

          • Topic、Key、Value:类似于Hash取余

          • Topic、Value:将所有数据写入了某一个分区

          • Topic、Partition、Key、Value:将所有数据写入指定的分区

    消费者API:消费Topic数据

    • 实施

      1. package bigdata.itcast.cn.kafka.consumer;
      2. import org.apache.kafka.clients.consumer.ConsumerRecord;
      3. import org.apache.kafka.clients.consumer.ConsumerRecords;
      4. import org.apache.kafka.clients.consumer.KafkaConsumer;
      5. import java.time.Duration;
      6. import java.util.Arrays;
      7. import java.util.Properties;
      8. public class KafkaConsumerClient {
      9. public static void main(String[] args) {
      10. //todo:1 构建Kafka消费者连接对象
      11. Properties props =new Properties();
      12. //指定服务端地址
      13. props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
      14. //指定当前消费者属于哪个组
      15. props.setProperty("group.id", "test01");
      16. //开启自动提交
      17. props.setProperty("enable.auto.commit", "true");
      18. //自动提交的时间间隔
      19. props.setProperty("auto.commit.interval.ms", "1000");
      20. //读取数据对KV进行反序列化
      21. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      22. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      23. //构建消费者对象
      24. KafkaConsumer consumer = new KafkaConsumer<>(props);
      25. //todo:2-消费数据
      26. //先订阅Topic
      27. consumer.subscribe(Arrays.asList("bigdata01"));
      28. //再消费Topic
      29. while (true) {
      30. //step1:拉取订阅的Topic中的数据,100ms是一个超时时间
      31. //ConsumerRecords:存储的是本次拉取的所有数据
      32. ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
      33. //处理数据
      34. //ConsumerRecord:消费到的每一条数据对象
      35. for (ConsumerRecord record : records){
      36. //Topic
      37. String topic = record.topic();
      38. //part
      39. int part = record.partition();
      40. //offset
      41. long offset = record.offset();
      42. //Key And Value
      43. String key = record.key();
      44. String value = record.value();
      45. //模拟处理:输出
      46. System.out.println(topic+"\t"+part+"\t"+offset+"\t"+key+"\t"+value);
      47. }
      48. }
      49. }
      50. }
    • 小结

      • 掌握具体的类和方法?

        • Properties:配置对象

        • KafkaConsumer:消费者对象

          • subscribe:订阅Topic

          • poll:拉取Topic数据

        • ConsumerRecords:拉取到消费所有数据的集合

        • ConsumerRecord:每一条消费到的数据对象

          • topic、partition、offset、key、value

    三丶Kafka的规则与机制

    生产分区规则

    • 实施

      • 面试题:Kafka生产者怎么实现生产数据的负载均衡?

        • 生产数据的时候尽量保证相对均衡的分到Topic多个分区中

      • 问题:为什么生产数据的方式不同,分区的规则就不一样?

        - ProducerRecord(Topic,Value)//将所有数据写入某一个分区
        - ProducerRecord(Topic,Key,Value) //按照Key的Hash取余方式
        - ProducerRecord(Topic,Partition,Key,Value) //指定写入某个分区
        
      • 规则

        • 如果指定了分区:写入所指定的分区中

        • 如果没指定分区:默认调用的是DefaultPartitioner分区器中partition这个方法

          • 如果指定了Key:按照Key的Hash取余分区的个数,来写入对应的分区

          • 如果没有指定Key:按照黏性分区

            • 2.4之前:轮询分区

              • 优点:数据分配相对均衡

                Topic       part        key     value
                topic       0           1       itcast1
                topic       1           2       itcast2
                topic       2           3       itcast3
                topic       0           4       itcast4
                topic       1           5       itcast5
                topic       2           6       itcast6
                topic       0           7       itcast7
                topic       1           8       itcast8
                topic       2           9       itcast9
              • 缺点:性能非常差

                • Kafka生产者写入数据

                  • step1:先将数据放入一个批次中,判断是否达到条件,达到条件才将整个批次的数据写入kafka

                    • 批次满了【batch.size】

                    • 达到一定时间【linger.ms】

                  • step2:根据数据属于哪个分区,就与分区构建一个连接,发送这个分区的批次的数据

                    • 第一条数据:先构建0分区的连接,第二条不是0分区的,所以直接构建一个批次,发送第一条

                    • 第二条数据:先构建1分区的连接,第三条不是1分区的,所以直接构建一个批次,发送第二条

                    • ……

                    • 每条数据需要构建一个批次,9条数据,9个批次,每个批次一条数据

                • 批次多,每个批次数据量少,性能比较差

                • 希望:批次少,每个批次数据量多,性能比较好

            • 2.4之后:黏性分区

              • 设计:实现少批次多数据

              • 规则:判断缓存中是否有这个topic的分区连接,如果有,直接使用,如果没有随机写入一个分区,并且放入缓存

            • 第一次:将所有数据随机选择一个分区,全部写入这个分区中,将这次的分区编号放入缓存中

              bigdata01   1   37  null    itcast0
              bigdata01   1   38  null    itcast1
              bigdata01   1   39  null    itcast2
              bigdata01   1   40  null    itcast3
              bigdata01   1   41  null    itcast4
              bigdata01   1   42  null    itcast5
              bigdata01   1   43  null    itcast6
              bigdata01   1   44  null    itcast7
              bigdata01   1   45  null    itcast8
              bigdata01   1   46  null    itcast9
            • 第二次开始根据缓存中是否有上一次的编号

              • 有:直接使用上一次的编号

              • 没有:重新随机选择一个

    • 小结

      • Kafka中生产数据的分区规则是什么?

        • step1:先判断是否指定了分区

        • 如果指定了,就写入指定的分区

        • step2:再判断是否指定了Key

          • 如果指定了Key,按照Key的mur取余分区个数来决定

          • 如果没有指定Key,按照黏性分区

    自定义开发生产分区器

    • 实施

      • 开发一个随机分区器

        1. package bigdata.itcast.cn.kafka.userpart;
        2. import org.apache.kafka.clients.producer.Partitioner;
        3. import org.apache.kafka.common.Cluster;
        4. import java.util.Map;
        5. import java.util.Random;
        6. /**
        7. * @ClassName UserPartitioner
        8. * @Description TODO 用户自定义的分区器,实现用户自定义随机分区
        9. * @Date 2021/9/22 15:36
        10. * @Create By     Frank
        11. */
        12. public class UserPartitioner implements Partitioner {
        13.    /**
        14.     * 真正计算返回分区的方法
        15.     * @return
        16.     */
        17.    @Override
        18.    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        19.        //先获取分区个数
        20.        Integer count = cluster.partitionCountForTopic(topic);
        21.        //构建一个随机值
        22.        Random random = new Random();
        23.        //随机生成一个分区编号
        24.        int i = random.nextInt(count);
        25.        return i;
        26.   }
        27.    @Override
        28.    public void close() {
        29.        //释放资源方法
        30.   }
        31.    @Override
        32.    public void configure(Map configs) {
        33.        //获取配置方法
        34.   }
        35. }
      • 加载分区器

        //指定分区器
        props.put("partitioner.class","bigdata.itcast.cn.kafka.userpart.UserPartition");
      • 小结

      • 如何构建一个自定义分区器?

        • step2:生产者代码指定自定义分区器即可

        • step1:自己开发分区器

          • 开发一个类:实现Partitioner接口

          • 实现方法:partition

    消费分配策略

    • 基本规则
       一个分区只能被一个消费者组的一个消费者消费
      一个消费者可以消费多个分区
      最多存在和分区数相同数量的消费者
    • 分配策略
    • RangeAssignor:范围分配,默认的分配策略,如果能均分,则平均分配,如果不能均分,默认将多出来的分区分配给编号小的消费者
      优点:当Topic或分区个数比较少时,分配相对比较均衡
      缺点:如果Topic或分区个数比较多时,若不能均分,会导致负载不均衡的问题,一般不用
    • RoundRobinAssignor:轮询分配,常见于Kafka2.0之前的版本,按照Topic名称和分区编号排序,轮询分配每个消费者org.apache.kafka.clients.consumer.RoundRobinAssignor
      优点:如果有多个消费者,消费的Topic都是一样的,实现将所有Topic的所有分区轮询分配给所有消费者,尽量实现负载的均衡
      缺点:如果遇到消费者Topic订阅是不一致的,只能基于订阅的消费者进行轮询,导致整体消费者负载不均衡
       

    • StickyAssignor:黏性分配,2.0之后建议使用,类似于粘性分配,尽量将分区均衡的分配给消费者,底层是通过算法实现的,
      特点:相对的保证分配的均衡
                 如果某一个消费者故障,尽量的避免网络传输

    • 尽量保证原来的消费的分区不变,将多出来分区均衡给剩余的消费者

      • 假设原来有3个消费,消费6个分区,平均每个消费者消费2个分区

    • 如果有一个消费者故障了,这个消费者负责的分区交给剩下的消费者来做:消费重平衡
      优点:分配更加均衡,如果消费者出现故障,提高性能,避免重新分配,将多余的分区均衡的分配给剩余的消费者

    • org.apache.kafka.clients.consumer.StickyAssignor
      

    kafka的存储机制:存储结构

    • 分类
    1.  Broker:物理存储节点,用于存储Kafka中每个分区的数据
    2. Producer:生产者生产数据
    3. Topic:逻辑存储对象,用于区分不同数据的存储
    4. Partition:分布式存储单元,一个Topic可以划分多个分区,每个分区可以分布式存储在不同的Broker节点上
    5. Segment:分区端,每个分区的数据存储在一个或多个Segment中,每个Segment有一对文件组成
      .log
      .index
      segment命名规则:最小Offset
    • 小结
      kafka的存储结构:
      Producer->Broker->Consumer

    kafka的存储机制:写入过程

    • 问:kafka是如何写入的,为什么写入这么快
    • 实施:
    1. 生产者生产每一条数据,将数据放入一个batch批次中,如果batch满了或达到一定时间,提交写入请求
    2. 生产者根据分区规则构建数据分区,获取对应的元数据,将请求提交给leader副本所在的Broker

      一个Topic分区会有多个副本,值写入leader副本
      从元数据中获取当前这个分区对应的leader副本的位置,提交写入请求
      kafka元数据存储在ZK中
    3. 先写入这台Broker的PageCache(操作系统级别内存)中,Kafka也用了内存机制来实现数据的快速读写
      ​​​
      Redis:进程级别内存,数据会随着进程的关闭而释放
      Kafka:操作系统Page Cache
                    选用了操作系统自带的缓存区域:PageCache
                    由操作系统来管理所有内存,即使Kafka Broker故障,数据依旧存在PageCache中
    4. 操作系统的后台自动将页缓存中的数据SYNC同步到磁盘文件中:最新的Segment的.log中
      kafka通过操作系统内存刷新调用机制来实现:内存存储容量+时间
      顺序写磁盘:不断将每一条数据追加到.log文件中
    5. 其他的Follower到Leader中同步数据
    • 小结
      kafka是如何写入数据的:生产者生产数据,提交给kafka集群
      生产者先计算每条数据属于哪个分区
      • 会获取元数据,从元数据中获取这个分区的leader副本所在的broker的地址

      • 将请求提交给这个Broker

      • 先写PageCache:内存区域

      • 满足条件以后:将PageCache中的数据顺序写入磁盘中的文件

    • 写入很快?

      • PageCache:写内存

      • 顺序写:写磁盘

    Kafka存储机制:读取过程

    • 实施

      • step1:消费者根据Topic、Partition、Offset提交给Kafka请求读取数据

      • step2:Kafka根据元数据信息,找到对应的这个分区对应的Leader副本节点

      • step3:请求Leader副本所在的Broker,先读PageCache,通过零拷贝机制【Zero Copy】读取PageCache

        • 实现0磁盘读写

        • 直接将内存数据发送到网络端口,实现传输

      • step4:如果PageCache中没有,读取Segment文件段,先根据offset找到要读取的那个Segment

        • 先根据offset和segment文件段名称定位这个offset在哪个segment文件段中

      • step5:将.log文件对应的.index文件加载到内存中,根据.index中索引的信息找到Offset在.log文件中的最近位置

        • 最近位置:index中记录的稀疏索引【不是每一条数据都有索引】

      • step6:读取.log,根据索引读取对应Offset的数据

    • 小结

      • Kafka数据是如何被读取的?

        • step1:先读PageCache,如果有,通过Zero Copy【sendFile】机制来实现

        • step2:如果没有读这个分区的Segment

          • 先根据offset确定读取哪个Segment

          • 先index,再读.log

      • Kafka为什么写入很快?

        • PageCahce + 顺序写

      • Kafka为什么读取和快?

        • PageCache + 零拷贝

        • index索引机制 + offset

      • 终于知道Kafka为什么这么快了! - 知乎

    Kafka存储机制:index索引设计

    • 实施

      • 索引类型

        • 全量索引:每一条数据,都对应一条索引

          • index:201条

            0           0
            1           101
            2           202
            
          • .log:201条数据

            0           key1            value1
            1           key2            value2
            ……
            200         key201          value201
            
        • 稀疏索引:部分数据有索引,有一部分数据是没有索引的

          • index:10条

            0           0
            2           202
            9           1010
            ……
          • log:201条

            0           key1            value1
            1           key2            value2
            ……
            200         key201          value201
          • 优点:减少了索引存储的数据量加快索引的索引的检索效率

          • 什么时候生成一条索引?

            #.log文件每增加4096字节,在.index中增加一条索引
            log.index.interval.bytes=4096
          • Kafka中选择使用了稀疏索引

      • 索引内容

        • 两列

          • 第一列:这条数据在这个文件中的位置

          • 第二列:这条数据在文件中的物理偏移量

            是这个文件中的第几条,数据在这个文件中的物理位置
            1,0             --表示这个文件中的第一条,在文件中的位置是第0个字节开始
            3,497           --表示这个文件中的第三条,在文件中的位置是第497个字节开始
            
        • 这个文件中的第1条数据是这个分区中的第368770条数据,offset = 368769

      • 检索数据流程

        • step1:先根据offset计算这条offset是这个文件中的第几条

        • step2:读取.index索引,根据二分检索,从索引中找到离这条数据最近偏小的位置

        • step3:读取.log文件从最近位置读取到要查找的数据

      • 举例

        • 需求:查找offset = 368772

        • step1:计算是文件中的第几条

          368772 - 368769 = 3 + 1 = 4,是这个文件中的第四条数据
        • step2:读取.index索引,找到最近位置

          3,497
        • step3:读取.log,从497位置向后读取一条数据,得到offset = 368772的数据

      • 问题:为什么不直接将offset作为索引的第一列?

        • 用offset会导致index文件很大,而且比较费时

    • 小结

      • .index文件中的索引的内容是什么?

      • 第一列:这条数据在文件中的是第几条数据

      • 第二列:这条数据在文件中的物理偏移量

      • 查询数据时如何根据索引找到对应offset的数据?

        • step1:先根据offset定位Segment文件

        • step2:读取.index文件,找到最近的位置

        • 先计算你要的offset是这个文件第几条

          • offset - 文件最小offset + 1

          • 从index找到最近位置返回

        • step3:从最近位置开始向后读取

    Kafka数据清理规则

    • 实施

      • 属性配置

        #开启清理
        log.cleaner.enable = true
        #清理规则
        log.cleanup.policy = delete | compact
      • 清理规则:delete

        • 基于存活时间规则:最常用的方式

          log.retention.ms
          log.retention.minutes
          log.retention.hours=168/7天
        • 基于文件大小规则

          #删除文件阈值,如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,-1表示不使用这种规则
          log.retention.bytes = -1
        • 基于offset消费规则

          • 功能:将明确已经消费的offset的数据删除

          • 如何判断已经消费到什么位置

            • step1:编写一个文件offset.json

              {
                "partitions":[
                   {"topic": "bigdata", "partition": 0,"offset": 2000},
                   {"topic": "bigdata", "partition": 1,"offset": 2000}
                 ],
                 "version":1
              }
            • step2:指定标记这个位置

              kafka-delete-records.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --offset-json-file offset.json 
      • 清理规则:compact

        • 功能:也称为压缩,将重复的更新数据的老版本删除,保留新版本,要求每条数据必须要有Key,根据Key来判断是否重复

    • 小结

      • Kafka用于实现实时消息队列的数据缓存,不需要永久性的存储数据,如何将过期数据进行清理?

        • delete方案:根据时间定期的清理过期的Segment文件

    Kafka分区副本概念:AR、ISR、OSR

    • 实施

      • 分区副本机制:每个kafka中分区都可以构建多个副本,相同分区的副本存储在不同的节点上

        • 为了保证安全和写的性能:划分了副本角色

        • leader副本:对外提供读写数据

        • follower副本:与Leader同步数据,如果leader故障,选举一个新的leader

        • 选举实现:Kafka主节点Controller实现

      • AR:All - Replicas

        • 所有副本:指的是一个分区在所有节点上的副本

          Partition: 0   Replicas: 1,0 
      • ISR:In - Sync - Replicas

        • 可用副本:所有正在与Leader同步的Follower副本

          Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0
        • 列表中:按照优先级排列【Controller根据副本同步状态以及Broker健康状态】,越靠前越会成为leader

        • 如果leader故障,是从ISR列表中选择新的leader

        • 如果生产者写入数据:ack=all,写入所有ISR列表中的副本,就返回ack

      • OSR:Out - Sync - Replicas

        • 不可用副本:与Leader副本的同步差距很大,成为一个OSR列表的不可用副本

        • 原因:网路故障等外部环境因素,某个副本与Leader副本的数据差异性很大

        • 判断是否是一个OSR副本?

          • 0.9之前:时间和数据差异

            replica.lag.time.max.ms = 10000   可用副本的同步超时时间
            replica.lag.max.messages = 4000   可用副本的同步记录差,该参数在0.9以后被删除
          • 0.9以后:只按照时间来判断

            replica.lag.time.max.ms = 10000   可用副本的同步超时时间
        • 写入、Leader选举:都只从ISR列表中选取

    • 小结

      • Kafka中的分区数据如何保证数据安全?

        • 分区副本机制

      • 什么是AR、ISR、OSR?

        • AR:所有副本

        • ISR:可用副本

        • OSR:不可用副本

        • AR = ISR + OSR

    Kafka数据同步概念:HW丶LEO 

    • 实施

      • 什么是HW、LEO?

        • 分区:3个副本

        • Leader:0 1 2 3 4 5 

          6 7 8

          • LEO:9

        • Follower1: 0 1 2 3 4 5

          • LEO:6

        • Follower2:0 1 2 3 4 5 6

          • LEO = 7

        • HW = 6

          • LW:low_watermark:最低消费的offset

          • HW:high_watermark:最高消费的offset

        • LEO:Log End Offset

        • LSO:Log StartOffset

        • HW:当前这个分区所有副本同步的最低位置+1,消费者能消费到的最大位置

        • LEO:当前每个副本已经写入数据的最新位置 + 1

          • 副本最小的LEO = HW

      • 数据写入Leader及同步过程

        • step1:数据写入分区的Leader副本

          • leader:LEO = 5

          • follower1:LEO = 3

          • follower2:LEO = 3

        • step2:Follower到Leader副本中同步数据

          • leader:LEO = 5

          • follower1:LEO = 5

          • follower2:LEO = 4

    • 小结

      • HW:所有副本都同步的位置+1,消费者可以消费到的位置

      • LEO:leader当前最新的位置+1

    消息队列的一次性语义

    • 实施
    1. at-most-once:至多一次
    2. at-least-once:至少一次
    3. exactly-once:有且仅有一次
    • 小结
      kafka从理论上可以实现Exactly Once
      大多数的消息队列一般不能满足ExactlyOnce就满足at-least-once

    Kafka保证消费一次性语义

    • 实施
      规则:消费者是根据offset来持续消费,只要保证任何场景下消费者都能知道上一次offset即可
      实现:将offset存储在一种可靠外部存储中华,手动管理offset
      第一步:第一次消费根据属性进行消费
      第二步:消费分区数据,处理分区数据
      第三步:处理成功,将处理成功的分区offset进行额外的存储。kafka默认存储_consumer_offsets,外部存储有Mysql,Redis,Zookeeper
      第四步:如果消费者故障,可以从外部存储读取上一次消费者消费的offset向kafka进行请求
    • 小结
      如何实现一次性语义?

      原则:必须按照offset顺序消费
      当消费并处理成功以后,自己保存offset
      默认方案:手动将offset提交到__consumer_offsets这个Topic中
      手动管理:自己进行额外存储,以后重启提交,读取外部系统中记录的offset,向Kafka提交

    Kafka集群常用配置

    • 实施

      • 集群配置:server.properties

        属性含义
        broker.idint类型Kafka服务端的唯一id,用于注册zookeeper,一般一台机器一个
        host.namehostname绑定该broker对应的机器地址
        port端口Kafka服务端端口:9092
        log.dirs目录kafka存放数据的路径
        zookeeper.connecthostname:2181/kafkadatazookeeper的地址
        zookeeper.session.timeout.ms6000zookeeper会话超时时间
        zookeeper.connection.timeout.ms6000zookeeper客户端连接超时时间
        num.partitions1分区的个数
        default.replication.factor1分区的副本数
        log.segment.bytes1073741824单个log文件的大小,默认1G生成一个
        log.index.interval.bytes4096log文件每隔多大生成一条index
        log.roll.hours168单个log文件生成的时间规则,默认7天一个log
        log.cleaner.enabletrue开启日志清理
        log.cleanup.policydelete,compact默认为delete,删除过期数据,compact为合并数据
        log.retention.minutes分钟值segment生成多少分钟后删除
        log.retention.hours小时值segment生成多少小时后删除【168】,7天
        log.retention.ms毫秒值segment生成多少毫秒后删除
        log.retention.bytes-1删除文件阈值,如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,直到小于阈值
        log.retention.check.interval.ms毫秒值【5分钟】多长时间检查一次是否有数据要标记删除
        log.cleaner.delete.retention.ms毫秒值segment标记删除后多长时间删除
        log.cleaner.backoff.ms毫秒值多长时间检查一次是否有数据要删除
        log.flush.interval.messagesLong.MaxValue消息的条数达到阈值,将触发flush缓存到磁盘
        log.flush.interval.msLong.MaxValue隔多长时间将缓存数据写入磁盘
        auto.create.topics.enablefalse是否允许自动创建topic,不建议开启
        delete.topic.enabletrue允许删除topic
        replica.lag.time.max.ms10000可用副本的同步超时时间
        replica.lag.max.messages4000可用副本的同步记录差,该参数在0.9以后被删除
        unclean.leader.election.enabletrue允许不在ISR中的副本成为leader
        num.network.threads3接受客户端请求的线程数
        num.io.threads8处理读写硬盘的IO的线程数
        background.threads4后台处理的线程数,例如清理文件等
      • 生产配置:producer.properties

        属性含义
        bootstrap.servershostname:9092KafkaServer端地址
        poducer.typesync | async同步或者异步写入磁盘
        min.insync.replicas3最小ISR个数
        buffer.memory33554432配置生产者本地发送数据的缓存大小
        compression.typenone配置数据压缩,可配置snappy
        partitioner.classPartition指定分区的类
        acks1指定写入数据的保障方式
        request.timeout.ms10000等待ack确认的时间,超时发送失败
        retries3发送失败的重试次数
        batch.size16384批量发送的大小
        long.ms5000发送间隔时间
        metadata.max.age.ms300000更新缓存的元数据【topic、分区leader等】

      • 消费配置:consumer.properties

        属性含义
        bootstrap.servershostname:9092指定Kafka的server地址
        group.idid消费者组的 名称
        consumer.id自动分配消费者id
        auto.offset.resetlatest新的消费者从哪里读取数据latest,earliest
        auto.commit.enabletrue是否自动commit当前的offset
        auto.commit.interval.ms1000自动提交的时间间隔
    • 小结

      • 常用属性了解即可

    可视化工具Kafka Eagle部署及使用

    • 实施

      • Kafka Eagle的功能

        • 用于集成Kafka,实现Kafka集群可视化以及监控报表平台

      • Kafka Eagle的部署启动

        • 下载解压:以第三台机器为例

          cd /export/software/
          rz
          tar -zxvf kafka-eagle-bin-1.4.6.tar.gz -C /export/server/
          cd /export/server/kafka-eagle-bin-1.4.6/
          tar -zxf kafka-eagle-web-1.4.6-bin.tar.gz 
          
        • 修改配置

          • 准备数据库:存储eagle的元数据,在Mysql中创建一个数据库

            create database eagle;
          • 修改配置文件:

            cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/
            vim  conf/system-config.properties
            #配置zookeeper集群的名称
            kafka.eagle.zk.cluster.alias=cluster1
            #配置zookeeper集群的地址
            cluster1.zk.list=node1:2181,node2:2181,node3:2181
            #31行左右配置开启统计指标
            kafka.eagle.metrics.charts=true
            #配置连接MySQL的参数,并注释自带的sqlite数据库
            kafka.eagle.driver=com.mysql.jdbc.Driver
            kafka.eagle.url=jdbc:mysql://node1:3306/eagle
            kafka.eagle.username=root
            kafka.eagle.password=hadoop
        • 配置环境变量

          vim /etc/profile
          ​
          #KE_HOME
          export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
          export PATH=$PATH:$KE_HOME/bin
          ​
          source /etc/profile
        • 添加执行权限

          cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
          chmod u+x bin/ke.sh
        • 启动服务

          ke.sh start
        • 登陆

          网页:node3:8048/ke
          用户名:admin
          密码:123456
      • 常见问题:登录时可能会报405错误,网上说很可能是版本问题,可把我坑惨了,我换了三次版本,1.4.6,1.4.8,2.1.0,但实际上并不是版本问题。重要的少一点是sql数据库里面要有数据表并且有数据,回头想一下也确实,不然咋匹配呢。下面是我在网上找到脚本文件:
         
        1. CREATE TABLE IF NOT EXISTS `ke_alarm_crontab` (
        2. `id` BIGINT ( 20 ) NOT NULL,
        3. `type` VARCHAR ( 64 ) NOT NULL,
        4. `crontab` VARCHAR ( 32 ) DEFAULT NULL,
        5. `is_enable` VARCHAR ( 2 ) DEFAULT 'Y',
        6. `created` VARCHAR ( 32 ) DEFAULT NULL,
        7. `modify` VARCHAR ( 32 ) DEFAULT NULL,
        8. PRIMARY KEY ( `id`, `type` )
        9. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        10. CREATE TABLE IF NOT EXISTS `ke_metrics` (
        11. `cluster` VARCHAR ( 64 ) DEFAULT NULL,
        12. `broker` TEXT DEFAULT NULL,
        13. `type` VARCHAR ( 32 ) DEFAULT NULL,
        14. `key` VARCHAR ( 64 ) DEFAULT NULL,
        15. `value` VARCHAR ( 128 ) DEFAULT NULL,
        16. `timespan` BIGINT ( 20 ) DEFAULT NULL,
        17. `tm` VARCHAR ( 16 ) DEFAULT NULL
        18. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        19. CREATE TABLE IF NOT EXISTS `ke_p_role` (
        20. `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
        21. `name` VARCHAR ( 64 ) CHARACTER
        22. SET utf8 NOT NULL COMMENT 'role name',
        23. `seq` TINYINT ( 4 ) NOT NULL COMMENT 'rank',
        24. `description` VARCHAR ( 128 ) CHARACTER
        25. SET utf8 NOT NULL COMMENT 'role describe',
        26. PRIMARY KEY ( `id` )
        27. ) ENGINE = INNODB AUTO_INCREMENT = 4 DEFAULT CHARSET = utf8;
        28. CREATE TABLE IF NOT EXISTS `ke_alarm_clusters` (
        29. `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
        30. `type` VARCHAR ( 32 ) DEFAULT NULL,
        31. `cluster` VARCHAR ( 64 ) DEFAULT NULL,
        32. `server` TEXT DEFAULT NULL,
        33. `alarm_group` VARCHAR ( 128 ) DEFAULT NULL,
        34. `alarm_times` INT ( 11 ),
        35. `alarm_max_times` INT ( 11 ),
        36. `alarm_level` VARCHAR ( 4 ),
        37. `is_normal` VARCHAR ( 2 ) DEFAULT 'Y',
        38. `is_enable` VARCHAR ( 2 ) DEFAULT 'Y',
        39. `created` VARCHAR ( 32 ) DEFAULT NULL,
        40. `modify` VARCHAR ( 32 ) DEFAULT NULL,
        41. PRIMARY KEY ( `id` )
        42. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        43. CREATE TABLE IF NOT EXISTS `ke_alarm_consumer` (
        44. `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
        45. `cluster` VARCHAR ( 64 ) DEFAULT NULL,
        46. `group` VARCHAR ( 128 ) DEFAULT NULL,
        47. `topic` VARCHAR ( 128 ) DEFAULT NULL,
        48. `lag` BIGINT ( 20 ) DEFAULT NULL,
        49. `alarm_group` VARCHAR ( 128 ) DEFAULT NULL,
        50. `alarm_times` INT ( 11 ),
        51. `alarm_max_times` INT ( 11 ),
        52. `alarm_level` VARCHAR ( 4 ),
        53. `is_normal` VARCHAR ( 2 ) DEFAULT 'Y',
        54. `is_enable` VARCHAR ( 2 ) DEFAULT 'Y',
        55. `created` VARCHAR ( 32 ) DEFAULT NULL,
        56. `modify` VARCHAR ( 32 ) DEFAULT NULL,
        57. PRIMARY KEY ( `id` )
        58. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        59. CREATE TABLE IF NOT EXISTS `ke_connect_config` (
        60. `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
        61. `cluster` VARCHAR ( 64 ),
        62. `connect_uri` VARCHAR ( 128 ),
        63. `version` VARCHAR ( 32 ),
        64. `alive` VARCHAR ( 16 ),
        65. `created` VARCHAR ( 32 ) DEFAULT NULL,
        66. `modify` VARCHAR ( 32 ) DEFAULT NULL,
        67. PRIMARY KEY ( `id` )
        68. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        69. CREATE TABLE IF NOT EXISTS `ke_resources` (
        70. `resource_id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
        71. `name` VARCHAR ( 255 ) CHARACTER
        72. SET utf8 NOT NULL COMMENT 'resource name',
        73. `url` VARCHAR ( 255 ) NOT NULL,
        74. `parent_id` INT ( 11 ) NOT NULL,
        75. PRIMARY KEY ( `resource_id` )
        76. ) ENGINE = INNODB AUTO_INCREMENT = 17 DEFAULT CHARSET = utf8;
        77. CREATE TABLE IF NOT EXISTS `ke_metrics_offline` (
        78. `cluster` VARCHAR ( 64 ) NOT NULL,
        79. `key` VARCHAR ( 128 ) NOT NULL,
        80. `one` VARCHAR ( 128 ) DEFAULT NULL,
        81. `mean` VARCHAR ( 128 ) DEFAULT NULL,
        82. `five` VARCHAR ( 128 ) DEFAULT NULL,
        83. `fifteen` VARCHAR ( 128 ) DEFAULT NULL,
        84. PRIMARY KEY ( `cluster`, `key` )
        85. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        86. CREATE TABLE IF NOT EXISTS `ke_logsize` (
        87. `cluster` VARCHAR ( 64 ) DEFAULT NULL,
        88. `topic` VARCHAR ( 64 ) DEFAULT NULL,
        89. `logsize` BIGINT ( 20 ) DEFAULT NULL,
        90. `diffval` BIGINT ( 20 ) DEFAULT NULL,
        91. `timespan` BIGINT ( 20 ) DEFAULT NULL,
        92. `tm` VARCHAR ( 16 ) DEFAULT NULL
        93. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        94. CREATE TABLE IF NOT EXISTS `ke_consumer_group_summary` (
        95. `cluster` VARCHAR ( 64 ) NOT NULL,
        96. `group` VARCHAR ( 128 ) NOT NULL,
        97. `topic_number` VARCHAR ( 128 ) NOT NULL,
        98. `coordinator` VARCHAR ( 128 ) DEFAULT NULL,
        99. `active_topic` INT ( 11 ) DEFAULT NULL,
        100. `active_thread_total` INT ( 11 ) DEFAULT NULL,
        101. PRIMARY KEY ( `cluster`, `group` )
        102. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        103. CREATE TABLE IF NOT EXISTS `ke_consumer_bscreen` (
        104. `cluster` VARCHAR ( 64 ) DEFAULT NULL,
        105. `group` VARCHAR ( 128 ) DEFAULT NULL,
        106. `topic` VARCHAR ( 64 ) DEFAULT NULL,
        107. `logsize` BIGINT ( 20 ) DEFAULT NULL,
        108. `difflogsize` BIGINT ( 20 ) DEFAULT NULL,
        109. `offsets` BIGINT ( 20 ) DEFAULT NULL,
        110. `diffoffsets` BIGINT ( 20 ) DEFAULT NULL,
        111. `lag` BIGINT ( 20 ) DEFAULT NULL,
        112. `timespan` BIGINT ( 20 ) DEFAULT NULL,
        113. `tm` VARCHAR ( 16 ) DEFAULT NULL
        114. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        115. CREATE TABLE IF NOT EXISTS `ke_users` (
        116. `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
        117. `rtxno` INT ( 11 ) NOT NULL,
        118. `username` VARCHAR ( 64 ) NOT NULL,
        119. `password` VARCHAR ( 128 ) NOT NULL,
        120. `email` VARCHAR ( 64 ) NOT NULL,
        121. `realname` VARCHAR ( 128 ) NOT NULL,
        122. PRIMARY KEY ( `id` )
        123. ) ENGINE = INNODB AUTO_INCREMENT = 2 DEFAULT CHARSET = utf8;
        124. CREATE TABLE IF NOT EXISTS `ke_sql_history` (
        125. `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
        126. `cluster` VARCHAR ( 64 ) DEFAULT NULL,
        127. `username` VARCHAR ( 64 ) DEFAULT NULL,
        128. `host` VARCHAR ( 128 ) DEFAULT NULL,
        129. `ksql` TEXT DEFAULT NULL,
        130. `status` VARCHAR ( 16 ) DEFAULT NULL,
        131. `spend_time` BIGINT ( 20 ) DEFAULT NULL,
        132. `created` VARCHAR ( 32 ) DEFAULT NULL,
        133. `tm` VARCHAR ( 16 ) DEFAULT NULL,
        134. PRIMARY KEY ( `id` )
        135. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        136. CREATE TABLE IF NOT EXISTS `ke_consumer_group` (
        137. `cluster` VARCHAR ( 64 ) NOT NULL,
        138. `group` VARCHAR ( 128 ) NOT NULL,
        139. `topic` VARCHAR ( 128 ) NOT NULL,
        140. `status` INT ( 11 ) DEFAULT NULL,
        141. PRIMARY KEY ( `cluster`, `group`, `topic` )
        142. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        143. CREATE TABLE IF NOT EXISTS `ke_alarm_config` (
        144. `cluster` VARCHAR ( 64 ) NOT NULL,
        145. `alarm_group` VARCHAR ( 128 ) NOT NULL,
        146. `alarm_type` VARCHAR ( 16 ) DEFAULT NULL,
        147. `alarm_url` TEXT DEFAULT NULL,
        148. `http_method` VARCHAR ( 16 ) DEFAULT NULL,
        149. `alarm_address` TEXT DEFAULT NULL,
        150. `created` VARCHAR ( 32 ) DEFAULT NULL,
        151. `modify` VARCHAR ( 32 ) DEFAULT NULL,
        152. PRIMARY KEY ( `cluster`, `alarm_group` )
        153. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        154. CREATE TABLE IF NOT EXISTS `ke_user_role` (
        155. `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
        156. `user_id` INT ( 11 ) NOT NULL,
        157. `role_id` TINYINT ( 4 ) NOT NULL,
        158. PRIMARY KEY ( `id` )
        159. ) ENGINE = INNODB AUTO_INCREMENT = 2 DEFAULT CHARSET = utf8;
        160. CREATE TABLE IF NOT EXISTS `ke_role_resource` (
        161. `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
        162. `role_id` INT ( 11 ) NOT NULL,
        163. `resource_id` INT ( 11 ) NOT NULL,
        164. PRIMARY KEY ( `id` )
        165. ) ENGINE = INNODB AUTO_INCREMENT = 19 DEFAULT CHARSET = utf8;
        166. ALTER TABLE `ke_consumer_bscreen` ADD INDEX `idx_timespan` ( `timespan` );
        167. ALTER TABLE `ke_logsize` ADD INDEX `idx_timespan` ( `timespan` );
        168. ALTER TABLE `ke_logsize` ADD INDEX `idx_tm_topic` ( `tm`, `topic` );
        169. ALTER TABLE `ke_logsize` ADD INDEX `idx_tm_cluster_diffval` ( `tm`, `cluster`, `diffval` );
        170. ALTER TABLE `ke_consumer_bscreen` ADD INDEX`eagle` `idx_tm_cluster_diffoffsets` ( `tm`, `cluster`, `diffoffsets` );
        171. INSERT INTO `ke_users`
        172. VALUES
        173. ( '1', '1000', 'admin', '123456', 'admin@email.com', 'Administrator' );;
        174. CREATE TABLE IF NOT EXISTS `ke_topic_rank` (
        175. `cluster` VARCHAR ( 64 ) NOT NULL,
        176. `topic` VARCHAR ( 64 ) NOT NULL,
        177. `tkey` VARCHAR ( 64 ) NOT NULL,
        178. `tvalue` BIGINT ( 20 ) DEFAULT NULL,
        179. PRIMARY KEY ( `cluster`, `topic`, `tkey` )
        180. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
        181. INSERT INTO `ke_user_role`
        182. VALUES
        183. ( '1', '1', '1' );;
        184. INSERT INTO `ke_p_role`
        185. VALUES
        186. ( '1', 'Administrator', '1', 'Have all permissions' ),
        187. ( '2', 'Devs', '2', 'Own add or delete' ),
        188. ( '3', 'Tourist', '3', 'Only viewer' );
        189. INSERT INTO `ke_role_resource`
        190. VALUES
        191. ( '1', '1', '1' ),
        192. ( '2', '1', '2' ),
        193. ( '3', '1', '3' ),
        194. ( '4', '1', '4' ),
        195. ( '5', '1', '5' ),
        196. ( '6', '1', '7' ),
        197. ( '7', '1', '8' ),
        198. ( '8', '1', '10' ),
        199. ( '9', '1', '11' ),
        200. ( '10', '1', '13' ),
        201. ( '11', '2', '7' ),
        202. ( '12', '2', '8' ),
        203. ( '13', '2', '13' ),
        204. ( '14', '2', '10' ),
        205. ( '15', '2', '11' ),
        206. ( '16', '1', '14' ),
        207. ( '17', '1', '15' ),
        208. ( '18', '1', '16' ),
        209. ( '19', '1', '18' ),
        210. ( '20', '1', '19' ),
        211. ( '21', '1', '20' ),
        212. ( '22', '1', '21' ),
        213. ( '23', '1', '22' ),
        214. ( '24', '1', '23' ),
        215. ( '25', '1', '24' );
        216. INSERT INTO `ke_resources`VALUES
        217. ( '1', 'System', '/system', '-1' ),
        218. ( '2', 'User', '/system/user', '1' ),
        219. ( '3', 'Role', '/system/role', '1' ),
        220. ( '4', 'Resource', '/system/resource', '1' ),
        221. ( '5', 'Notice', '/system/notice', '1' ),
        222. ( '6', 'Topic', '/topic', '-1' ),
        223. ( '7', 'Message', '/topic/message', '6' ),
        224. ( '8', 'Create', '/topic/create', '6' ),
        225. ( '9', 'Alarm', '/alarm', '-1' ),
        226. ( '10', 'Add', '/alarm/add', '9' ),
        227. ( '11', 'Modify', '/alarm/modify', '9' ),
        228. ( '12', 'Cluster', '/cluster', '-1' ),
        229. ( '13', 'ZkCli', '/cluster/zkcli', '12' ),
        230. ( '14', 'UserDelete', '/system/user/delete', '1' ),
        231. ( '15', 'UserModify', '/system/user/modify', '1' ),
        232. ( '16', 'Mock', '/topic/mock', '6' ),
        233. ( '18', 'Create', '/alarm/create', '9' ),
        234. ( '19', 'History', '/alarm/history', '9' ),
        235. ( '20', 'Manager', '/topic/manager', '6' ),
        236. ( '21', 'PasswdReset', '/system/user/reset', '1' ),
        237. ( '22', 'Config', '/alarm/config', '9' ),
        238. ( '23', 'List', '/alarm/list', '9' ),
        239. ( '24', 'Hub', '/topic/hub', '6' );

      • Kafka Eagle使用

        • 监控Kafka集群

        • 监控Zookeeper集群

        • 监控Topic

        • 查看数据积压

          • 现象:消费跟不上生产速度,导致处理的延迟

          • 原因

            • 消费者组的并发能力不够

            • 消费者处理失败

            • 网络故障,导致数据传输较慢

          • 解决

            • 提高消费者组中消费者的并行度

            • 分析处理失败的原因

            • 找到网络故障的原因

            • 查看监控

        • 报表

    • 小结

      • Kafka中最常用的监控工具

      • 用于查看集群信息、管理集群、监控集群

  • 相关阅读:
    命令模式
    ccs无法进入main函数
    .NET高级技术_03反射和Attribute
    1.6 前后端身份认证
    计算结构体大小(内存对齐原则)struct、union、class
    npm安装依赖过慢
    ST/意法STTH30ST06-Y车规FRD,原厂渠道ASEMI代理
    知识图谱-KGE-双线性模型-2015:DistMult
    【学习笔记】《Python深度学习》第六章:深度学习用于文本和序列
    数据结构--树
  • 原文地址:https://blog.csdn.net/m0_73745224/article/details/136014483