了解什么是kafka之前,首先要了解一下什么是消息队列
定义
官方定义:消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
简单点说:消息队列MQ用于实现两个系统之间或者两个模块之间传递消息数据时,实现数据缓存
功能
基于队列的方式,实现消息传递的数据缓存【长久】
队列的特点:顺序:先进先出
应用场景
用于所有需要实现实时、高性能、高吞吐、高可靠的消息传递架构中
大数据应用中:作为唯一的实时数据存储平台
实时数据采集:生产写入Kafka
实时数据处理:消费读取Kafka
优点
实现了架构解耦
需求:C也需要A的数据
如果不构建消息队列:A =》 B
停止A,修改A的代码,添加发送C的代码
高耦合度的
如果构建了消息队列:A =》 MQ 《= B
直接让C从MQ中取即可
低耦合度的
保证了最终一致性
最终可以保证实现最初的需求
实现异步,提供传输性能
A给B和C都发一份数据
不做消息队列
A发送:10s
B接受:1000s
C接受:1000s
总共:2020s
做了消息队列
A:10s
B和C并行接受:1000s
总共:1010s
限流削峰:合理根据成本来控制资源
缺点
增加了消息队列,架构运维更加复杂
必须保证消息队列是高可靠的
如果消息队列故障,整个所有系统都瘫痪了
| 保证消息队列即使机器出现故障,消息队列也能正常运行 =》 分布式
数据保证更加复杂,必须保证生产安全和消费安全
数据安全
数据在传输过程中:不丢失、不重复
小结
消息队列的功能、特点是什么?
功能:实现两个系统或者两个模块之间的数据缓存,解决高并发读写
优点:架构解耦、异步模式能提高并发性能
缺点:架构和安全维护更加麻烦
同步的概念
流程
step1:用户提交请求
step2:后台处理请求
step3:将处理的结果返回给用户,用户继续下一步操作
特点:用户看到的结果就是我处理好的结果,等待看到结果以后,进行下一步
场景:去银行存钱、转账等,必须看到真正处理的结果才能表示成功,实现立即一致性
优点:结果肯定是准确的
缺点:性能问题
异步的概念
流程
step1:用户提交请求
step2:后台将请求放入消息队列,等待处理,返回给用户一个临时结果,用户不管这次的结果是什么,直接进行下一步
step3:用户看到临时的结果,真正的请求在后台等待处理
特点:用户看到的结果并不是我们已经处理的结果
场景:用户暂时不需要关心真正处理结果的场景下,只要保证这个最终结果是用户想要的结果即可,实现最终一致性
优点:性能更高
缺点:可能结果误差
数据传递的同步与异步
A给B发送消息:基于UDP协议
A首先给B发送一条数据
A不管B有没有收到,继续发送下一条
优点:快
缺点:数据容易丢失
异步过程
A给B发送消息:基于TCP协议
A首先给B发送一条数据
A会等待B告诉A收到了这条消息,A才会发送下一条
优点:安全
缺点:性能相对差一些
同步过程
小结
同步:提交和处理是同步操作,立即就能看到结果,立即一致性
异步:提交和处理是异步操作,最终得到一个处理的结果,最终一致性
实施
角色
生产者:负责往消息队列中写数据的
消息队列:负责缓存传递的数据
消费者:负责从消息队列中读取数据的
流程
step1:生产者要往消息队列中写数据
step2:消费者从消息队列中读数据
step3:消费者消费成功以后,会返回一个确认ack给消息队列,消息队列会将消费成功的数据删除
小结
特点:数据只能被一个消费者使用,消费成功以后数据就会被删除,无法实现消费数据的共享
实施
角色
生产者
消息队列
消费者
Topic:主题,用于划分存储不同业务的数据
流程
step1:生产者往消息队列中生产数据,将数据写入对应的主题中
step2:消费者可以订阅主题,如果主题中出现新的数据,消费就可以立即消费
特点:一个消费者可以订阅多个主题,一个主题可以被多个消费者订阅
消费成功以后,不会立即主动删除数据
可以实现数据共享
小结
什么是发布订阅模式?
发布:生产者不断将最新的数据生产写入消息队列的主题中
订阅:消费者只要订阅了主题,就能立即获取最新的数据
类似于微信公众号
实施
官网:kafka.apache.org
领英公司基于Scala语言开发的工具
Scala语言:基于JVM之上的语言
val inputRdd = sc.textFile("new Path")
val wcRdd = inputRdd
.flatMap(_.trim.split(" "))
.map((_,1))
.reduceByKey(_+_)
wcRdd.saveAsTextFile
功能
分布式流式数据实时存储:分布式存储
实时消息队列存储,工作中主要使用的功能
分布式流式计算:分布式计算:KafkaStream
这个功能一般不用
定义
分布式的基于订阅发布模式的高吞吐高性能的实时消息队列系统
应用场景
实时场景
目前:只要做实时大数据,都必用Kafka
离线数据仓库:Hive
实时数据仓库:Kafka
Kafka生产者:数据采集的工具
Kafka消费者:实时计算的程序
特点
高性能:实时的对数据进行实时读写
Kafka也使用内存
高并发:分布式并行读写
分布式架构
高吞吐:使用分布式磁盘存储
Kafka也基于磁盘
高可靠:分布式主从架构
高安全性:数据安全保障机制
内存 + 磁盘:副本
这个内存非常特殊:操作系统级别,即使Kafka服务故障,数据依旧存在,只有机器故障才受影响
高灵活性:根据需求,随意添加生产者和消费者
异步模式
小结
Kafka在大数据中专门用于实现实时的数据存储,实现大数据实时计算
存储结构
MySQL:数据库、表、行数据【列】
HDFS:目录、文件 / 块、行数据
Redis:数据库、分片【小集群】、KV
实施
Broker:Kafka是一个分布式集群,多台机器构成,每台Kafka的节点就是一个Broker
Producer:生产者
负责将数据写入Kafka中,工作中一般生成都是数据采集工具
本质:==Kafka写入数据的客户端==
Kafka的每条数据格式:KV格式
Consumer:消费者
负责从Kafka中消费数据
本质:==Kafka读取数据的客户端==
消费数据:主要消费的数据是V
Consumer Group:==Kafka中必须以消费者组的形式从Kafka中消费数据==
消费者组到kafka消费数据
任何一个消费者必须属于某一个消费者组
一个消费者组中可以有多个消费者:多个消费者共同并行消费数据,提高消费性能
消费者组中多个消费者消费的数据是不一样的
整个消费者组中所有消费者消费的数据加在一起是一份完整的数据
小结
生产者 =》 Kafka 集群【多个Broker】 《= 消费者组【消费者】
实施
Topic:数据主题,用于区分不同的数据,对数据进行分类
类似于MySQL中会将数据划分到不同的表:不同的数据存储在不同的表中
Kafka是分布式存储
Topic就是分布式的概念:一个Topic可以划分多个分区Partition,每个不同分区存储在不同的Kafka节点上
写入Topic的数据实现分布式存储
问题:生产者写入一条KV结构数据,这条数据写入这个Topic的哪个分区由分区规则来决定,分区规则是什么呢?
有多种分区规则:不同场景对应的分区规则不一样
Partition:数据分区,用于实现Topic的分布式存储,对Topic的数据进行划分
每个分区存储在不同的Kafka节点Broker上
写入Topic:根据一定的规则决定写入哪个具体的分区
小结
什么是Topic,什么是Partition?
Topic:类似于数据库或者表的概念,用于对数据进行分类,不同业务的数据放入不同Topic
Kafka的存储是分布式存储
数据都是读写Topic
Topic就是分布式存储
Partition:一个Topic可以划分多个Partition,写入Topic的数据可以存储在不同的Partition中
不同Partition可以存储在不同的Kafka节点上
实施
问题1:Kafka中的每个Topic的每个分区存储在不同的节点上,如果某个节点故障,怎么保证集群正常可用?
Kafka选用了==副本机制==来保证数据的安全性
如果某台机器故障,其他机器还有这个分区的副本,其他机器的副本照样可以对外提供客户端读写
Kafka每一个分区都可以有多个副本
类似于HDFS的副本机制,一个块构建多个副本
注意:Kafka中一个分区的副本个数最多只能等于机器的个数,相同分区的副本不允许放在同一台机器,没有意义
问题2:一个分区有多个副本,读写这个分区的数据时候,到底读写哪个分区副本呢?
Kafka将一个分区的多个副本,划分为两种角色
Leader副本:负责对外提供读写
生产者和消费者只对leader副本进行读写
Follower副本
与Leader同步数据
如果leader故障,从follower新的leader副本对外提供读写
小结
Kafka怎么保证分区数据安全?
副本机制,一个分区可以有多个副本,相同分区的副本不能存储在同一台机器
Kakfa如何决定分区副本的读写?
每个分区的副本划分为两种角色
leader:对外提供读写
follower:与Leader同步数据,如果Leader故障,从Follower选举一个新的Leader
定义:对每个分区的数据进行了更细的划分,先写入的数据会先生成一对Segment文件,存储到一定条件以后,后面数据写入另外一对Segment文件,每个文件就叫Segment文件对
内容:每个Segment对应一对【两个】文件
xxxxxxxxx.log:存储数据
xxxxxxxxx.index:对应.log的文件的数据索引
设计:为了加快数据检索的效率,将数据按照规则写入不同文件,以后可以根据规则快速的定位数据所在的文件,读取对应的小的segment文件,不用读取所有数据文件
举例
如果分区第一次写入数据,会产生第一个segment
00000000000000000000000.log 00000000000000000000000.index 00000000000000000000000.timeindex
当文件越来越大,存储的数据越来越多,影响读的性能,会再构建一个新的segment,老的segment不再被写入
00000000000000000000000.log 00000000000000000000000.index 00000000000000000000000.timeindex 00000000000000000199999.log 00000000000000000199999.index 00000000000000000199999.timeindex 00000000000002000000000.log 00000000000002000000000.index 00000000000002000000000.timeindex
Segment文件的名字就是这个Segment记录的offset的最小值
消费者消费数据是根据offset进行消费的
消费者1:想消费分区1:39999这个offset开始消费
先根据文件文件名来判断我要的offset在哪个文件中
小结
什么是Segment?
功能:对分区内部的数据进行了划分
规则:先写入的数据先写入第一个Segment,达到一定条件,数据就写入新的Segment对中
实现:每个Segment中包含两种文件
xxxxxx.log:数据
xxxxxx.index:对应.log文件的索引
设计:为了加快查询效率
先写入的offset就越小
第一条数据的offset就为0
第二条数据的offset就为1
……
消息队列:先进先出
写入分区的顺序就是offset偏移量,==Offset是分区级别的==,每个分区的offset独立管理,都从0开始
生成:生产者往Kafka中写入数据,写入某个分区
每个分区单独管理一套Offset【分区】,offset从0开始对每条数据进行编号
Kafka写入数据也是按照KV来写入数据
#Kafka中一条数据存储的结构 offset Key Value
功能:基于offset来指定数据的顺序,消费时候按照offset顺序来读取
消费者消费Topic分区中的数据是按照offset进行顺序消费的
怎么保证不丢失不重复:只要保证消费者每次按照offset的顺序消费即可
如果没有Offset
从头取一遍:数据重复
从最新的去:数据丢失
小结
Offset用于标记分区中的每条数据,消费者根据上一次消费的offset对分区继续进行消费,保证顺序
实现保证数据不丢失不重复
目标:了解Kafka集群架构及角色功能
路径
Kafka集群有哪些角色?
Kafka每个角色的功能是什么?
Zookeeper在架构中的作用是什么?
架构角色
Kafka:分布式主从架构,实现消息队列的构建
Zookeeper:辅助选举Controller、元数据存储
Kafka中的每个角色以及对应的功能
分布式主从架构
节点:Broker
进程:Kafka
主:Kafka ==Controller==
是一种特殊的Broker,从所有Broker中选举出来的,负责普通Broker的工作
负责管理所有从节点:Topic、分区和副本
每次启动集群,会从所有Broker中选举一个Controller,由ZK实现
从:Kafka Broker
对外提供读写请求
其他的Broker监听Controller,如果Controller故障,会重新从Broker选举一个新的Controller
ZK的功能
辅助选举Controller
存储元数据
小结
kafka是一个主从架构,整体对外提供分布式读写
ZK主要负责选举Controller和实现元数据存储
目标:实现Kafka分布式集群的搭建部署
路径
step1:选择版本
step2:下载解压安装
step:3:修改配置文件
实施
版本的选型
0.8.x:老的版本,很多的问题
0.10.x +:消息功能上基本没有问题
选择:kafka_2.12-2.4.1.tgz
Kafka:2.4.1
Scala:2.12,Kafka是由Scala语言开发
下载解压安装
上传到第一台机器
cd /export/software/ rz
解压
tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/ cd /export/server/kafka_2.12-2.4.1/ mkdir logs
bin:一般用于存放客户端操作命令脚本
sbin:一般用于存放集群的启动和关闭的命令脚本,如果没有这个命令,脚本放在bin目录中
conf/etc/config:配置文件目录
lib:jar包的存放目录
logs:一般用于存放服务日志
修改配置
切换到配置文件目录
cd /export/server/kafka_2.12-2.4.1/config
修改server.properties
- #21行:唯一的 服务端id
- broker.id=0
- #60行:指定kafka的日志及数据【segment【.log,.index】】存储的位置
- log.dirs=/export/servers/kafka_2.12-2.4.1/logs
- #123行:指定zookeeper的地址
- zookeeper.connect=node01:2181,node02:2181,node03:2181
- #在最后添加两个配置,允许删除topic,当前kafkaServer的主机名
- delete.topic.enable=true
- host.name=node01
分发
cd /export/servers/ scp -r kafka_2.12-2.4.1 node02:$PWD scp -r kafka_2.12-2.4.1 node03:$PWD
第二台:server.properties
- #21行:唯一的 服务端id
- broker.id=1
- #最后
- host.name=node02
第三台:server.properties
- #21行:唯一的 服务端id
- broker.id=2
- #最后
- host.name=node03
添加环境变量
vim /etc/profile
- #KAFKA_HOME
- export KAFKA_HOME=/export/servers/kafka_2.12-2.4.1
- export PATH=:$PATH:$KAFKA_HOME/bin
source /etc/profile
小结
按照笔记一步步来,不做过多要求,只要配置含义,实现安装即可
解压安装
修改配置:server.properties
目标:掌握kafka集群的启动与关闭命令及脚本封装
路径
step1:如何启动Kafka集群?
step2:如何关闭Kafka集群?
step3:如何封装启动及关闭脚本?
实施
启动Zookeeper
start-zk-all.sh
启动Kafka
bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 & >>/dev/null 2>&1 &:在后台运行
关闭Kafka
bin/kafka-server-stop.sh
封装Kafka脚本
这里我封装的也不太好,有时间可以去网上搜一下详解小结
启动:kafka-server-start.sh
关闭:kafka-server-stop.sh
目标:掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic
路径
实施
创建Topic
bin/kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node01:9092,node02:9092,node03:9092
--create:创建
--topic:指定名称
--partitions :分区个数
--replication-factor:分区的副本个数
--bootstrap-server:指定Kafka服务端地址
--list:列举
列举Topic
bin/kafka-topics.sh --list -bootstrap-server node01:9092,node02:9092,node03:9092
小结
掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic
实施
bin/kafka-topics.sh --delete --topic bigdata02 --bootstrap-server node01:9092,node02:9092,node03:9092
查看Topic信息
bin/kafka-topics.sh --describe --topic bigdata01 --bootstrap-server node01:9092,node02:9092,node03:9092
结果
Topic: bigdata01 PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824 Topic: bigdata01 Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: bigdata01 Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: bigdata01 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0
每个分区都有一个唯一的标号:从0开始
怎么唯一标识一个分区:Topic名称+分区编号
Leader:这个分区的Leader副本所在的Broker id
Replicas:这个分区的所有副本所在的Broker的id
ISR:in -sync -replicas:可用副本
删除Topic
kafka-topics.sh --delete --topic bigdata02 --partitions 3 --replication-factor 2 --bootstrap-server node01:9092,node02:9092,node03:9092
小结
查看信息:describe
删除:delete
实施
Console生产者
bin/kafka-console-producer.sh --topic bigdata01 --broker-list node01:9092,node02:9092,node03:9092
Console消费者
bin/kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning
--from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费
默认从最新位置开始消费
--from-beginning:从最早的位置开始消费
小结
只要生产者不断生产,消费就能实时的消费到Topic中的数据
实施
安装Kafka Tool:不断下一步即可
构建集群连接:连接Kafka集群
查看集群信息
小结
可视化工具,界面或者交互性不是很友好
后面会学习:Kafka Eagle
实施
创建Topic
bin/kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node01:9092,node02:9092,node03:9092
生产测试
kafka-producer-perf-test.sh --topic bigdata --num-records 1000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node01:9092,node02:9092,node03:9092 acks=1
--num-records:写入数据的条数
--throughput:是否做限制,-1表示不限制
--record-size:每条数据的字节大小
消费测试
kafka-consumer-perf-test.sh --topic bigdata --broker-list node01:9092,node02:9092,node03:9092 --fetch-size 1048576 --messages 1000000
小结
工作中一般根据实际的需求来调整参数,测试kafka集群的最高性能,判断是否能满足需求
实施
命令行使用Kafka
一般只用于topic的管理:创建、删除
大数据架构中使用Kafka
Java API:构建生产者和消费者
工作中一般不用自己开发生产者
生产者:数据采集工具
Flume:Kafka sink
配置kafka集群地址
Topic的名称
消费者:实时计算程序
SparkStream:KafkaUtil
KafkaUtil.createDirectStream(Kafka集群地址,消费的Topic)
这些软件的API已经将Kafka生产者和消费者的API封装了,只要调用即可
==重点掌握:用到哪些类和方法==
Kafka的API的分类
High Level API:高级API
基于了SimpleAPI做了封装,让用户开发更加方便
但是由于封装了底层的API,有很多的东西不能控制,无法控制数据安全
Offset自动存储Zookeeper中,不用自己管理
==Simple API:简单API==
并不简单,最原始的API
自定义控制所有消费和生产、保证数据安全
小结
大数据工作中一般不自己开发Java API:掌握类和方法即可
只使用Simple API来实现开发
实施
- package bigdata.itcast.cn.kafka.producer;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
-
- public class KafkaProducerClient {
- public static void main(String[] args) {
- //todo:1 构建Kafka生产者连接对象
- //构建连接配置
- Properties props=new Properties();
- //指定服务端地址
- props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9093");
- /**
- * 问题:生产者怎么保证生产数据不丢失? ack机制 + 重试机制
- * ack:数据传输的确认码,用于定义生产者如何将数据写入Kafka
- * 0:生产者发送一条数据写入Kafka,不管Kafka有没有写入这条数据,都直接发送下一条【快,不安全,不用的】
- * 1:中和性方案,生产者发送一条数据写入Kafka,Kafka将这条数据写入对应分区Leader副本,就返回一个ack,生产者收到ack,发送下一条
- * 【性能和安全性之间做了权衡】
- * all/-1:生产者发送一条数据写入Kafka,Kafka将这条数据写入对应分区Leader副本,并且等待所有ISR:Follower同步成功,就返回一个ack,生产者收到ack,发送下一条
- * 【安全,慢】
- * 如果ack为1或者为all,生产者没有收到ack,就认为数据丢失,重新发送这条数据
- **/
- props.put("acks", "all");
- //指定写入Kafka中的KV的序列化的类
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- //构建生产者对象
- KafkaProducer
producer = new KafkaProducer<>(props); - //todo:2-调用连接对象的方法实现生产数据
- for (int i = 0; i < 10; i++){
- //ProducerRecord:生产者的数据对象,用于send发送对象写入Kafka:Topic、Key、Value
- // producer.send(new ProducerRecord
("bigdata01", i+"", "itcast"+i)); -
- //ProducerRecord:生产者的数据对象,用于send发送对象写入Kafka:Topic、Value
- //producer.send(new ProducerRecord
("bigdata01", "itcast"+i)); -
- //ProducerRecord:生产者的数据对象,用于send发送对象写入Kafka:Topic、Partition、Key、Value
- producer.send(new ProducerRecord
("bigdata01", 0,i+"", "itcast"+i)); - }
- producer.close();
- }
- }
小结
掌握具体的类和方法?
Properties:配置类
KafkaProducer:生产者类
send(数据对象):负责生产数据到Kafka中
ProducerRecord:数据类对象
Topic、Key、Value:类似于Hash取余
Topic、Value:将所有数据写入了某一个分区
Topic、Partition、Key、Value:将所有数据写入指定的分区
实施
- package bigdata.itcast.cn.kafka.consumer;
-
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
-
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Properties;
-
- public class KafkaConsumerClient {
- public static void main(String[] args) {
- //todo:1 构建Kafka消费者连接对象
- Properties props =new Properties();
- //指定服务端地址
- props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
- //指定当前消费者属于哪个组
- props.setProperty("group.id", "test01");
- //开启自动提交
- props.setProperty("enable.auto.commit", "true");
- //自动提交的时间间隔
- props.setProperty("auto.commit.interval.ms", "1000");
- //读取数据对KV进行反序列化
- props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- //构建消费者对象
- KafkaConsumer
consumer = new KafkaConsumer<>(props); - //todo:2-消费数据
- //先订阅Topic
- consumer.subscribe(Arrays.asList("bigdata01"));
- //再消费Topic
- while (true) {
- //step1:拉取订阅的Topic中的数据,100ms是一个超时时间
- //ConsumerRecords:存储的是本次拉取的所有数据
- ConsumerRecords
records = consumer.poll(Duration.ofMillis(100)); - //处理数据
- //ConsumerRecord:消费到的每一条数据对象
- for (ConsumerRecord
record : records){ - //Topic
- String topic = record.topic();
- //part
- int part = record.partition();
- //offset
- long offset = record.offset();
- //Key And Value
- String key = record.key();
- String value = record.value();
- //模拟处理:输出
- System.out.println(topic+"\t"+part+"\t"+offset+"\t"+key+"\t"+value);
- }
- }
- }
- }
小结
掌握具体的类和方法?
Properties:配置对象
KafkaConsumer:消费者对象
subscribe:订阅Topic
poll:拉取Topic数据
ConsumerRecords:拉取到消费所有数据的集合
ConsumerRecord:每一条消费到的数据对象
topic、partition、offset、key、value
实施
面试题:Kafka生产者怎么实现生产数据的负载均衡?
生产数据的时候尽量保证相对均衡的分到Topic多个分区中
问题:为什么生产数据的方式不同,分区的规则就不一样?
- ProducerRecord(Topic,Value)//将所有数据写入某一个分区 - ProducerRecord(Topic,Key,Value) //按照Key的Hash取余方式 - ProducerRecord(Topic,Partition,Key,Value) //指定写入某个分区
规则
如果指定了分区:写入所指定的分区中
如果没指定分区:默认调用的是DefaultPartitioner分区器中partition这个方法
如果指定了Key:按照Key的Hash取余分区的个数,来写入对应的分区
如果没有指定Key:按照黏性分区
2.4之前:轮询分区
优点:数据分配相对均衡
Topic part key value topic 0 1 itcast1 topic 1 2 itcast2 topic 2 3 itcast3 topic 0 4 itcast4 topic 1 5 itcast5 topic 2 6 itcast6 topic 0 7 itcast7 topic 1 8 itcast8 topic 2 9 itcast9
缺点:性能非常差
Kafka生产者写入数据
step1:先将数据放入一个批次中,判断是否达到条件,达到条件才将整个批次的数据写入kafka
批次满了【batch.size】
达到一定时间【linger.ms】
step2:根据数据属于哪个分区,就与分区构建一个连接,发送这个分区的批次的数据
第一条数据:先构建0分区的连接,第二条不是0分区的,所以直接构建一个批次,发送第一条
第二条数据:先构建1分区的连接,第三条不是1分区的,所以直接构建一个批次,发送第二条
……
每条数据需要构建一个批次,9条数据,9个批次,每个批次一条数据
批次多,每个批次数据量少,性能比较差
希望:批次少,每个批次数据量多,性能比较好
2.4之后:黏性分区
设计:实现少批次多数据
规则:判断缓存中是否有这个topic的分区连接,如果有,直接使用,如果没有随机写入一个分区,并且放入缓存
第一次:将所有数据随机选择一个分区,全部写入这个分区中,将这次的分区编号放入缓存中
bigdata01 1 37 null itcast0 bigdata01 1 38 null itcast1 bigdata01 1 39 null itcast2 bigdata01 1 40 null itcast3 bigdata01 1 41 null itcast4 bigdata01 1 42 null itcast5 bigdata01 1 43 null itcast6 bigdata01 1 44 null itcast7 bigdata01 1 45 null itcast8 bigdata01 1 46 null itcast9
第二次开始根据缓存中是否有上一次的编号
有:直接使用上一次的编号
没有:重新随机选择一个
小结
Kafka中生产数据的分区规则是什么?
step1:先判断是否指定了分区
如果指定了,就写入指定的分区
step2:再判断是否指定了Key
如果指定了Key,按照Key的mur取余分区个数来决定
如果没有指定Key,按照黏性分区
实施
开发一个随机分区器
- package bigdata.itcast.cn.kafka.userpart;
-
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
-
- import java.util.Map;
- import java.util.Random;
-
- /**
- * @ClassName UserPartitioner
- * @Description TODO 用户自定义的分区器,实现用户自定义随机分区
- * @Date 2021/9/22 15:36
- * @Create By Frank
- */
- public class UserPartitioner implements Partitioner {
-
- /**
- * 真正计算返回分区的方法
- * @return
- */
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- //先获取分区个数
- Integer count = cluster.partitionCountForTopic(topic);
- //构建一个随机值
- Random random = new Random();
- //随机生成一个分区编号
- int i = random.nextInt(count);
- return i;
- }
-
- @Override
- public void close() {
- //释放资源方法
- }
-
- @Override
- public void configure(Map
configs) { - //获取配置方法
- }
- }
-
加载分区器
//指定分区器
props.put("partitioner.class","bigdata.itcast.cn.kafka.userpart.UserPartition"); 小结
如何构建一个自定义分区器?
step2:生产者代码指定自定义分区器即可
step1:自己开发分区器
开发一个类:实现Partitioner接口
实现方法:partition
RoundRobinAssignor:轮询分配,常见于Kafka2.0之前的版本,按照Topic名称和分区编号排序,轮询分配每个消费者org.apache.kafka.clients.consumer.RoundRobinAssignor
优点:如果有多个消费者,消费的Topic都是一样的,实现将所有Topic的所有分区轮询分配给所有消费者,尽量实现负载的均衡
缺点:如果遇到消费者Topic订阅是不一致的,只能基于订阅的消费者进行轮询,导致整体消费者负载不均衡
StickyAssignor:黏性分配,2.0之后建议使用,类似于粘性分配,尽量将分区均衡的分配给消费者,底层是通过算法实现的,
特点:相对的保证分配的均衡
如果某一个消费者故障,尽量的避免网络传输
尽量保证原来的消费的分区不变,将多出来分区均衡给剩余的消费者
假设原来有3个消费,消费6个分区,平均每个消费者消费2个分区
如果有一个消费者故障了,这个消费者负责的分区交给剩下的消费者来做:消费重平衡
优点:分配更加均衡,如果消费者出现故障,提高性能,避免重新分配,将多余的分区均衡的分配给剩余的消费者
org.apache.kafka.clients.consumer.StickyAssignor
会获取元数据,从元数据中获取这个分区的leader副本所在的broker的地址
将请求提交给这个Broker
先写PageCache:内存区域
满足条件以后:将PageCache中的数据顺序写入磁盘中的文件
写入很快?
PageCache:写内存
顺序写:写磁盘
实施
step1:消费者根据Topic、Partition、Offset提交给Kafka请求读取数据
step2:Kafka根据元数据信息,找到对应的这个分区对应的Leader副本节点
step3:请求Leader副本所在的Broker,先读PageCache,通过零拷贝机制【Zero Copy】读取PageCache
实现0磁盘读写
直接将内存数据发送到网络端口,实现传输
step4:如果PageCache中没有,读取Segment文件段,先根据offset找到要读取的那个Segment
先根据offset和segment文件段名称定位这个offset在哪个segment文件段中
step5:将.log文件对应的.index文件加载到内存中,根据.index中索引的信息找到Offset在.log文件中的最近位置
最近位置:index中记录的稀疏索引【不是每一条数据都有索引】
step6:读取.log,根据索引读取对应Offset的数据
小结
Kafka数据是如何被读取的?
step1:先读PageCache,如果有,通过Zero Copy【sendFile】机制来实现
step2:如果没有读这个分区的Segment
先根据offset确定读取哪个Segment
先index,再读.log
Kafka为什么写入很快?
PageCahce + 顺序写
Kafka为什么读取和快?
PageCache + 零拷贝
index索引机制 + offset
实施
索引类型
全量索引:每一条数据,都对应一条索引
index:201条
0 0 1 101 2 202
.log:201条数据
0 key1 value1 1 key2 value2 …… 200 key201 value201
稀疏索引:部分数据有索引,有一部分数据是没有索引的
index:10条
0 0 2 202 9 1010 ……
log:201条
0 key1 value1 1 key2 value2 …… 200 key201 value201
优点:减少了索引存储的数据量加快索引的索引的检索效率
什么时候生成一条索引?
#.log文件每增加4096字节,在.index中增加一条索引 log.index.interval.bytes=4096
Kafka中选择使用了稀疏索引
索引内容
两列
第一列:这条数据在这个文件中的位置
第二列:这条数据在文件中的物理偏移量
是这个文件中的第几条,数据在这个文件中的物理位置 1,0 --表示这个文件中的第一条,在文件中的位置是第0个字节开始 3,497 --表示这个文件中的第三条,在文件中的位置是第497个字节开始
这个文件中的第1条数据是这个分区中的第368770条数据,offset = 368769
检索数据流程
step1:先根据offset计算这条offset是这个文件中的第几条
step2:读取.index索引,根据二分检索,从索引中找到离这条数据最近偏小的位置
step3:读取.log文件从最近位置读取到要查找的数据
举例
需求:查找offset = 368772
step1:计算是文件中的第几条
368772 - 368769 = 3 + 1 = 4,是这个文件中的第四条数据
step2:读取.index索引,找到最近位置
3,497
step3:读取.log,从497位置向后读取一条数据,得到offset = 368772的数据
问题:为什么不直接将offset作为索引的第一列?
用offset会导致index文件很大,而且比较费时
小结
.index文件中的索引的内容是什么?
第一列:这条数据在文件中的是第几条数据
第二列:这条数据在文件中的物理偏移量
查询数据时如何根据索引找到对应offset的数据?
step1:先根据offset定位Segment文件
step2:读取.index文件,找到最近的位置
先计算你要的offset是这个文件第几条
offset - 文件最小offset + 1
从index找到最近位置返回
step3:从最近位置开始向后读取
实施
属性配置
#开启清理 log.cleaner.enable = true #清理规则 log.cleanup.policy = delete | compact
清理规则:delete
基于存活时间规则:最常用的方式
log.retention.ms log.retention.minutes log.retention.hours=168/7天
基于文件大小规则
#删除文件阈值,如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,-1表示不使用这种规则 log.retention.bytes = -1
基于offset消费规则
功能:将明确已经消费的offset的数据删除
如何判断已经消费到什么位置
step1:编写一个文件offset.json
{
"partitions":[
{"topic": "bigdata", "partition": 0,"offset": 2000},
{"topic": "bigdata", "partition": 1,"offset": 2000}
],
"version":1
} step2:指定标记这个位置
kafka-delete-records.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --offset-json-file offset.json
清理规则:compact
功能:也称为压缩,将重复的更新数据的老版本删除,保留新版本,要求每条数据必须要有Key,根据Key来判断是否重复
小结
Kafka用于实现实时消息队列的数据缓存,不需要永久性的存储数据,如何将过期数据进行清理?
delete方案:根据时间定期的清理过期的Segment文件
实施
分区副本机制:每个kafka中分区都可以构建多个副本,相同分区的副本存储在不同的节点上
为了保证安全和写的性能:划分了副本角色
leader副本:对外提供读写数据
follower副本:与Leader同步数据,如果leader故障,选举一个新的leader
选举实现:Kafka主节点Controller实现
AR:All - Replicas
所有副本:指的是一个分区在所有节点上的副本
Partition: 0 Replicas: 1,0
ISR:In - Sync - Replicas
可用副本:所有正在与Leader同步的Follower副本
Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
列表中:按照优先级排列【Controller根据副本同步状态以及Broker健康状态】,越靠前越会成为leader
如果leader故障,是从ISR列表中选择新的leader
如果生产者写入数据:ack=all,写入所有ISR列表中的副本,就返回ack
OSR:Out - Sync - Replicas
不可用副本:与Leader副本的同步差距很大,成为一个OSR列表的不可用副本
原因:网路故障等外部环境因素,某个副本与Leader副本的数据差异性很大
判断是否是一个OSR副本?
0.9之前:时间和数据差异
replica.lag.time.max.ms = 10000 可用副本的同步超时时间 replica.lag.max.messages = 4000 可用副本的同步记录差,该参数在0.9以后被删除
0.9以后:只按照时间来判断
replica.lag.time.max.ms = 10000 可用副本的同步超时时间
写入、Leader选举:都只从ISR列表中选取
小结
Kafka中的分区数据如何保证数据安全?
分区副本机制
什么是AR、ISR、OSR?
AR:所有副本
ISR:可用副本
OSR:不可用副本
AR = ISR + OSR
实施
什么是HW、LEO?
分区:3个副本
Leader:0 1 2 3 4 5
6 7 8
LEO:9
Follower1: 0 1 2 3 4 5
LEO:6
Follower2:0 1 2 3 4 5 6
LEO = 7
HW = 6
LW:low_watermark:最低消费的offset
HW:high_watermark:最高消费的offset
LEO:Log End Offset
LSO:Log StartOffset
HW:当前这个分区所有副本同步的最低位置+1,消费者能消费到的最大位置
LEO:当前每个副本已经写入数据的最新位置 + 1
副本最小的LEO = HW
数据写入Leader及同步过程
step1:数据写入分区的Leader副本
leader:LEO = 5
follower1:LEO = 3
follower2:LEO = 3
step2:Follower到Leader副本中同步数据
leader:LEO = 5
follower1:LEO = 5
follower2:LEO = 4
小结
HW:所有副本都同步的位置+1,消费者可以消费到的位置
LEO:leader当前最新的位置+1
实施
集群配置:server.properties
| 属性 | 值 | 含义 |
|---|---|---|
| broker.id | int类型 | Kafka服务端的唯一id,用于注册zookeeper,一般一台机器一个 |
| host.name | hostname | 绑定该broker对应的机器地址 |
| port | 端口 | Kafka服务端端口:9092 |
| log.dirs | 目录 | kafka存放数据的路径 |
| zookeeper.connect | hostname:2181/kafkadata | zookeeper的地址 |
| zookeeper.session.timeout.ms | 6000 | zookeeper会话超时时间 |
| zookeeper.connection.timeout.ms | 6000 | zookeeper客户端连接超时时间 |
| num.partitions | 1 | 分区的个数 |
| default.replication.factor | 1 | 分区的副本数 |
| log.segment.bytes | 1073741824 | 单个log文件的大小,默认1G生成一个 |
| log.index.interval.bytes | 4096 | log文件每隔多大生成一条index |
| log.roll.hours | 168 | 单个log文件生成的时间规则,默认7天一个log |
| log.cleaner.enable | true | 开启日志清理 |
| log.cleanup.policy | delete,compact | 默认为delete,删除过期数据,compact为合并数据 |
| log.retention.minutes | 分钟值 | segment生成多少分钟后删除 |
| log.retention.hours | 小时值 | segment生成多少小时后删除【168】,7天 |
| log.retention.ms | 毫秒值 | segment生成多少毫秒后删除 |
| log.retention.bytes | -1 | 删除文件阈值,如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,直到小于阈值 |
| log.retention.check.interval.ms | 毫秒值【5分钟】 | 多长时间检查一次是否有数据要标记删除 |
| log.cleaner.delete.retention.ms | 毫秒值 | segment标记删除后多长时间删除 |
| log.cleaner.backoff.ms | 毫秒值 | 多长时间检查一次是否有数据要删除 |
| log.flush.interval.messages | Long.MaxValue | 消息的条数达到阈值,将触发flush缓存到磁盘 |
| log.flush.interval.ms | Long.MaxValue | 隔多长时间将缓存数据写入磁盘 |
| auto.create.topics.enable | false | 是否允许自动创建topic,不建议开启 |
| delete.topic.enable | true | 允许删除topic |
| replica.lag.time.max.ms | 10000 | 可用副本的同步超时时间 |
| replica.lag.max.messages | 4000 | 可用副本的同步记录差,该参数在0.9以后被删除 |
| unclean.leader.election.enable | true | 允许不在ISR中的副本成为leader |
| num.network.threads | 3 | 接受客户端请求的线程数 |
| num.io.threads | 8 | 处理读写硬盘的IO的线程数 |
| background.threads | 4 | 后台处理的线程数,例如清理文件等 |
生产配置:producer.properties
| 属性 | 值 | 含义 |
|---|---|---|
| bootstrap.servers | hostname:9092 | KafkaServer端地址 |
| poducer.type | sync | async | 同步或者异步写入磁盘 |
| min.insync.replicas | 3 | 最小ISR个数 |
| buffer.memory | 33554432 | 配置生产者本地发送数据的缓存大小 |
| compression.type | none | 配置数据压缩,可配置snappy |
| partitioner.class | Partition | 指定分区的类 |
| acks | 1 | 指定写入数据的保障方式 |
| request.timeout.ms | 10000 | 等待ack确认的时间,超时发送失败 |
| retries | 3 | 发送失败的重试次数 |
| batch.size | 16384 | 批量发送的大小 |
| long.ms | 5000 | 发送间隔时间 |
| metadata.max.age.ms | 300000 | 更新缓存的元数据【topic、分区leader等】 |
消费配置:consumer.properties
| 属性 | 值 | 含义 |
|---|---|---|
| bootstrap.servers | hostname:9092 | 指定Kafka的server地址 |
| group.id | id | 消费者组的 名称 |
| consumer.id | 自动分配 | 消费者id |
| auto.offset.reset | latest | 新的消费者从哪里读取数据latest,earliest |
| auto.commit.enable | true | 是否自动commit当前的offset |
| auto.commit.interval.ms | 1000 | 自动提交的时间间隔 |
小结
常用属性了解即可
实施
Kafka Eagle的功能
用于集成Kafka,实现Kafka集群可视化以及监控报表平台
Kafka Eagle的部署启动
下载解压:以第三台机器为例
cd /export/software/ rz tar -zxvf kafka-eagle-bin-1.4.6.tar.gz -C /export/server/ cd /export/server/kafka-eagle-bin-1.4.6/ tar -zxf kafka-eagle-web-1.4.6-bin.tar.gz
修改配置
准备数据库:存储eagle的元数据,在Mysql中创建一个数据库
create database eagle;
修改配置文件:
cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/ vim conf/system-config.properties
#配置zookeeper集群的名称 kafka.eagle.zk.cluster.alias=cluster1 #配置zookeeper集群的地址 cluster1.zk.list=node1:2181,node2:2181,node3:2181 #31行左右配置开启统计指标 kafka.eagle.metrics.charts=true #配置连接MySQL的参数,并注释自带的sqlite数据库 kafka.eagle.driver=com.mysql.jdbc.Driver kafka.eagle.url=jdbc:mysql://node1:3306/eagle kafka.eagle.username=root kafka.eagle.password=hadoop
配置环境变量
vim /etc/profile #KE_HOME export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6 export PATH=$PATH:$KE_HOME/bin source /etc/profile
添加执行权限
cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6 chmod u+x bin/ke.sh
启动服务
ke.sh start
登陆
网页:node3:8048/ke 用户名:admin 密码:123456
- CREATE TABLE IF NOT EXISTS `ke_alarm_crontab` (
- `id` BIGINT ( 20 ) NOT NULL,
- `type` VARCHAR ( 64 ) NOT NULL,
- `crontab` VARCHAR ( 32 ) DEFAULT NULL,
- `is_enable` VARCHAR ( 2 ) DEFAULT 'Y',
- `created` VARCHAR ( 32 ) DEFAULT NULL,
- `modify` VARCHAR ( 32 ) DEFAULT NULL,
- PRIMARY KEY ( `id`, `type` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_metrics` (
- `cluster` VARCHAR ( 64 ) DEFAULT NULL,
- `broker` TEXT DEFAULT NULL,
- `type` VARCHAR ( 32 ) DEFAULT NULL,
- `key` VARCHAR ( 64 ) DEFAULT NULL,
- `value` VARCHAR ( 128 ) DEFAULT NULL,
- `timespan` BIGINT ( 20 ) DEFAULT NULL,
- `tm` VARCHAR ( 16 ) DEFAULT NULL
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_p_role` (
- `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `name` VARCHAR ( 64 ) CHARACTER
- SET utf8 NOT NULL COMMENT 'role name',
- `seq` TINYINT ( 4 ) NOT NULL COMMENT 'rank',
- `description` VARCHAR ( 128 ) CHARACTER
- SET utf8 NOT NULL COMMENT 'role describe',
- PRIMARY KEY ( `id` )
- ) ENGINE = INNODB AUTO_INCREMENT = 4 DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_alarm_clusters` (
- `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `type` VARCHAR ( 32 ) DEFAULT NULL,
- `cluster` VARCHAR ( 64 ) DEFAULT NULL,
- `server` TEXT DEFAULT NULL,
- `alarm_group` VARCHAR ( 128 ) DEFAULT NULL,
- `alarm_times` INT ( 11 ),
- `alarm_max_times` INT ( 11 ),
- `alarm_level` VARCHAR ( 4 ),
- `is_normal` VARCHAR ( 2 ) DEFAULT 'Y',
- `is_enable` VARCHAR ( 2 ) DEFAULT 'Y',
- `created` VARCHAR ( 32 ) DEFAULT NULL,
- `modify` VARCHAR ( 32 ) DEFAULT NULL,
- PRIMARY KEY ( `id` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_alarm_consumer` (
- `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `cluster` VARCHAR ( 64 ) DEFAULT NULL,
- `group` VARCHAR ( 128 ) DEFAULT NULL,
- `topic` VARCHAR ( 128 ) DEFAULT NULL,
- `lag` BIGINT ( 20 ) DEFAULT NULL,
- `alarm_group` VARCHAR ( 128 ) DEFAULT NULL,
- `alarm_times` INT ( 11 ),
- `alarm_max_times` INT ( 11 ),
- `alarm_level` VARCHAR ( 4 ),
- `is_normal` VARCHAR ( 2 ) DEFAULT 'Y',
- `is_enable` VARCHAR ( 2 ) DEFAULT 'Y',
- `created` VARCHAR ( 32 ) DEFAULT NULL,
- `modify` VARCHAR ( 32 ) DEFAULT NULL,
- PRIMARY KEY ( `id` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_connect_config` (
- `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `cluster` VARCHAR ( 64 ),
- `connect_uri` VARCHAR ( 128 ),
- `version` VARCHAR ( 32 ),
- `alive` VARCHAR ( 16 ),
- `created` VARCHAR ( 32 ) DEFAULT NULL,
- `modify` VARCHAR ( 32 ) DEFAULT NULL,
- PRIMARY KEY ( `id` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_resources` (
- `resource_id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `name` VARCHAR ( 255 ) CHARACTER
- SET utf8 NOT NULL COMMENT 'resource name',
- `url` VARCHAR ( 255 ) NOT NULL,
- `parent_id` INT ( 11 ) NOT NULL,
- PRIMARY KEY ( `resource_id` )
- ) ENGINE = INNODB AUTO_INCREMENT = 17 DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_metrics_offline` (
- `cluster` VARCHAR ( 64 ) NOT NULL,
- `key` VARCHAR ( 128 ) NOT NULL,
- `one` VARCHAR ( 128 ) DEFAULT NULL,
- `mean` VARCHAR ( 128 ) DEFAULT NULL,
- `five` VARCHAR ( 128 ) DEFAULT NULL,
- `fifteen` VARCHAR ( 128 ) DEFAULT NULL,
- PRIMARY KEY ( `cluster`, `key` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
-
- CREATE TABLE IF NOT EXISTS `ke_logsize` (
- `cluster` VARCHAR ( 64 ) DEFAULT NULL,
- `topic` VARCHAR ( 64 ) DEFAULT NULL,
- `logsize` BIGINT ( 20 ) DEFAULT NULL,
- `diffval` BIGINT ( 20 ) DEFAULT NULL,
- `timespan` BIGINT ( 20 ) DEFAULT NULL,
- `tm` VARCHAR ( 16 ) DEFAULT NULL
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_consumer_group_summary` (
- `cluster` VARCHAR ( 64 ) NOT NULL,
- `group` VARCHAR ( 128 ) NOT NULL,
- `topic_number` VARCHAR ( 128 ) NOT NULL,
- `coordinator` VARCHAR ( 128 ) DEFAULT NULL,
- `active_topic` INT ( 11 ) DEFAULT NULL,
- `active_thread_total` INT ( 11 ) DEFAULT NULL,
- PRIMARY KEY ( `cluster`, `group` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_consumer_bscreen` (
- `cluster` VARCHAR ( 64 ) DEFAULT NULL,
- `group` VARCHAR ( 128 ) DEFAULT NULL,
- `topic` VARCHAR ( 64 ) DEFAULT NULL,
- `logsize` BIGINT ( 20 ) DEFAULT NULL,
- `difflogsize` BIGINT ( 20 ) DEFAULT NULL,
- `offsets` BIGINT ( 20 ) DEFAULT NULL,
- `diffoffsets` BIGINT ( 20 ) DEFAULT NULL,
- `lag` BIGINT ( 20 ) DEFAULT NULL,
- `timespan` BIGINT ( 20 ) DEFAULT NULL,
- `tm` VARCHAR ( 16 ) DEFAULT NULL
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_users` (
- `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `rtxno` INT ( 11 ) NOT NULL,
- `username` VARCHAR ( 64 ) NOT NULL,
- `password` VARCHAR ( 128 ) NOT NULL,
- `email` VARCHAR ( 64 ) NOT NULL,
- `realname` VARCHAR ( 128 ) NOT NULL,
- PRIMARY KEY ( `id` )
- ) ENGINE = INNODB AUTO_INCREMENT = 2 DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_sql_history` (
- `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `cluster` VARCHAR ( 64 ) DEFAULT NULL,
- `username` VARCHAR ( 64 ) DEFAULT NULL,
- `host` VARCHAR ( 128 ) DEFAULT NULL,
- `ksql` TEXT DEFAULT NULL,
- `status` VARCHAR ( 16 ) DEFAULT NULL,
- `spend_time` BIGINT ( 20 ) DEFAULT NULL,
- `created` VARCHAR ( 32 ) DEFAULT NULL,
- `tm` VARCHAR ( 16 ) DEFAULT NULL,
- PRIMARY KEY ( `id` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_consumer_group` (
- `cluster` VARCHAR ( 64 ) NOT NULL,
- `group` VARCHAR ( 128 ) NOT NULL,
- `topic` VARCHAR ( 128 ) NOT NULL,
- `status` INT ( 11 ) DEFAULT NULL,
- PRIMARY KEY ( `cluster`, `group`, `topic` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_alarm_config` (
- `cluster` VARCHAR ( 64 ) NOT NULL,
- `alarm_group` VARCHAR ( 128 ) NOT NULL,
- `alarm_type` VARCHAR ( 16 ) DEFAULT NULL,
- `alarm_url` TEXT DEFAULT NULL,
- `http_method` VARCHAR ( 16 ) DEFAULT NULL,
- `alarm_address` TEXT DEFAULT NULL,
- `created` VARCHAR ( 32 ) DEFAULT NULL,
- `modify` VARCHAR ( 32 ) DEFAULT NULL,
- PRIMARY KEY ( `cluster`, `alarm_group` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_user_role` (
- `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `user_id` INT ( 11 ) NOT NULL,
- `role_id` TINYINT ( 4 ) NOT NULL,
- PRIMARY KEY ( `id` )
- ) ENGINE = INNODB AUTO_INCREMENT = 2 DEFAULT CHARSET = utf8;
-
- CREATE TABLE IF NOT EXISTS `ke_role_resource` (
- `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `role_id` INT ( 11 ) NOT NULL,
- `resource_id` INT ( 11 ) NOT NULL,
- PRIMARY KEY ( `id` )
- ) ENGINE = INNODB AUTO_INCREMENT = 19 DEFAULT CHARSET = utf8;
-
- ALTER TABLE `ke_consumer_bscreen` ADD INDEX `idx_timespan` ( `timespan` );
- ALTER TABLE `ke_logsize` ADD INDEX `idx_timespan` ( `timespan` );
- ALTER TABLE `ke_logsize` ADD INDEX `idx_tm_topic` ( `tm`, `topic` );
- ALTER TABLE `ke_logsize` ADD INDEX `idx_tm_cluster_diffval` ( `tm`, `cluster`, `diffval` );
- ALTER TABLE `ke_consumer_bscreen` ADD INDEX`eagle` `idx_tm_cluster_diffoffsets` ( `tm`, `cluster`, `diffoffsets` );
- INSERT INTO `ke_users`
- VALUES
- ( '1', '1000', 'admin', '123456', 'admin@email.com', 'Administrator' );;
- CREATE TABLE IF NOT EXISTS `ke_topic_rank` (
- `cluster` VARCHAR ( 64 ) NOT NULL,
- `topic` VARCHAR ( 64 ) NOT NULL,
- `tkey` VARCHAR ( 64 ) NOT NULL,
- `tvalue` BIGINT ( 20 ) DEFAULT NULL,
- PRIMARY KEY ( `cluster`, `topic`, `tkey` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8;
- INSERT INTO `ke_user_role`
- VALUES
- ( '1', '1', '1' );;
-
- INSERT INTO `ke_p_role`
- VALUES
- ( '1', 'Administrator', '1', 'Have all permissions' ),
- ( '2', 'Devs', '2', 'Own add or delete' ),
- ( '3', 'Tourist', '3', 'Only viewer' );
-
- INSERT INTO `ke_role_resource`
- VALUES
- ( '1', '1', '1' ),
- ( '2', '1', '2' ),
- ( '3', '1', '3' ),
- ( '4', '1', '4' ),
- ( '5', '1', '5' ),
- ( '6', '1', '7' ),
- ( '7', '1', '8' ),
- ( '8', '1', '10' ),
- ( '9', '1', '11' ),
- ( '10', '1', '13' ),
- ( '11', '2', '7' ),
- ( '12', '2', '8' ),
- ( '13', '2', '13' ),
- ( '14', '2', '10' ),
- ( '15', '2', '11' ),
- ( '16', '1', '14' ),
- ( '17', '1', '15' ),
- ( '18', '1', '16' ),
- ( '19', '1', '18' ),
- ( '20', '1', '19' ),
- ( '21', '1', '20' ),
- ( '22', '1', '21' ),
- ( '23', '1', '22' ),
- ( '24', '1', '23' ),
- ( '25', '1', '24' );
- INSERT INTO `ke_resources`VALUES
- ( '1', 'System', '/system', '-1' ),
- ( '2', 'User', '/system/user', '1' ),
- ( '3', 'Role', '/system/role', '1' ),
- ( '4', 'Resource', '/system/resource', '1' ),
- ( '5', 'Notice', '/system/notice', '1' ),
- ( '6', 'Topic', '/topic', '-1' ),
- ( '7', 'Message', '/topic/message', '6' ),
- ( '8', 'Create', '/topic/create', '6' ),
- ( '9', 'Alarm', '/alarm', '-1' ),
- ( '10', 'Add', '/alarm/add', '9' ),
- ( '11', 'Modify', '/alarm/modify', '9' ),
- ( '12', 'Cluster', '/cluster', '-1' ),
- ( '13', 'ZkCli', '/cluster/zkcli', '12' ),
- ( '14', 'UserDelete', '/system/user/delete', '1' ),
- ( '15', 'UserModify', '/system/user/modify', '1' ),
- ( '16', 'Mock', '/topic/mock', '6' ),
- ( '18', 'Create', '/alarm/create', '9' ),
- ( '19', 'History', '/alarm/history', '9' ),
- ( '20', 'Manager', '/topic/manager', '6' ),
- ( '21', 'PasswdReset', '/system/user/reset', '1' ),
- ( '22', 'Config', '/alarm/config', '9' ),
- ( '23', 'List', '/alarm/list', '9' ),
- ( '24', 'Hub', '/topic/hub', '6' );
Kafka Eagle使用
监控Kafka集群
监控Zookeeper集群
监控Topic
查看数据积压
现象:消费跟不上生产速度,导致处理的延迟
原因
消费者组的并发能力不够
消费者处理失败
网络故障,导致数据传输较慢
解决
提高消费者组中消费者的并行度
分析处理失败的原因
找到网络故障的原因
查看监控
报表
小结
Kafka中最常用的监控工具
用于查看集群信息、管理集群、监控集群