kafka的高可用表现在一个topic可以有多个分区,分配在不同的机器上。每个分区可以有多个副本,每个副本持有当前分区的所有数据。多个副本会选取一个作为leader,其他作为follower。生产者和消费者都会从leader操作数据。同时leader会把数据同步到follower上。假设某一台机器宕机了,上面刚好作为leader就没了。此时其他机器follower会感知到leader死了,会进行选举出来一个作为leader。
引入Replication之后,同一个Partition可能会有多个Replica,而这时需要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica作为Follower从Leader中复制数据。
因为需要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。
Kafka分配Replica的算法如下:
1.将所有Broker(假设共n个Broker)和待分配的Partition排序
2.将第i个Partition分配到第(i mod n)个Broker上
3.将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
topic是逻辑概念,partition是物理概念。为topic定义partition,默认创建topic时是5个paration。
每个paration为一个目录,命名规则为:topic+partition序列号,从0开始。每个partition目录里面是由多个segment(段)数据文件组成。segment文件由两部分组成,分别为".index"文件和".log"文件,分别为segment索引文件和数据文件。这两个文件的命名规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充。
首先会根据 offset 值去查找 Segment 中的 index 文件,因为 index 文件是以上个文件的最大 offset 偏移命名的所以可以通过二分法快速定位到索引文件。
找到索引文件后,索引文件中保存的是 offset 和对应的消息行在 log 日志中的存储型号,因为 Kafka 采用稀疏矩阵的方式来存储索引信息,并不是每一条索引都存储,所以这里只是查到文件中符合当前 offset 范围的索引。
拿到 当前查到的范围索引对应的行号之后再去对应的 log 文件中从 当前 Position 位置开始查找 offset 对应的消息,直到找到该 offset 为止。
1、单个 Partition 来管理数据,顺序往 Partition 中累加写势必会造成单个 Partition 文件过大,查找和维护数据就变得非常困难。
2、另一个原因是 Kafka 消息记录不是一直堆堆堆,默认是有日志清除策略的。要么是日志超过设定的保存时间触发清理逻辑,要么就是 Topic 日志文件超过阈值触发清除逻辑,如果是一个大文件删除是要锁文件的这时候写操作就不能进行。因此设置分段存储对于清除策略来说也会变得更加简单,只需删除较早的日志块即可。
生产者调用send方法发送消息,经过拦截器、序列化器(对key和value进行序列化),进入分区器选择消息的分区。接着消息会进入到一个名为RecordAccumulator的缓冲队列,这个队列默认32M,当满足以下两个条件的任意一个之后,消息由sender线程发送。
条件一:消息累计达到batch.size,默认是16kb
条件二:等待时间达到linger.ms,默认是0毫秒。所以默认情况下,是来一条发送一条。
Sender线程首先会通过sender读取数据,并创建发送的请求,针对Kafka集群里的每一个Broker,都会有一个InFlightRequests请求队列存放在NetWorkClient中,默认每个InFlightRequests请求队列中缓存5个请求。接着这些请求就会通过Selector发送到Kafka集群中。
当请求发送到Kafka集群后,Kafka集群会返回对应的acks信息。生产者可以根据具体的情况选择处理acks信息。比如是否需要等有回应之后再继续发送消息,还是不管发送成功失败都继续发送消息。
1、 producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
2、 producer 将消息发送给该 leader
3、 leader 将消息写入本地 log
4、 followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
5、 leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
提高发送吞吐量
1、调整batch.size和linger.ms值提高吞吐量
2、提高数据可靠性
数据发送到kafka集群后,kafka集群有三种应答方式:
acks=0,生产者发送过来的数据不管是否成功都不管。
acks=1,只有当kafka的分区Leader节点应答后才会继续发送数据。
acks=-1,只有当Leader和ISR队列里所有节点都应答后才继续发消息。
ISR队列是和Leader节点保持同步的Follower和Leader节点的集合队列,比如Leader节点是0,另外两个Follower节点是1和2,则ISR队列就是0,1,2。如果某个Follow节点在指定时间内没有应答Leader,则将这个节点从ISR队列中踢出。
一般来讲会根据应用场景选择三种应答方式,如果是数据需要强可靠性的情况,就会使用acks=-1的情况,如果对数据的可靠性没有要求,则可以选择0和1。
3、消息的事务管理
4、Producer端可以通过GZIP和Snappy格式对消息集合进行压缩传输,减少网络传输的压力
某个消费组消费 partition 需要保存 offset 记录当前消费位置,0.10 之前的版本是把 offset 保存到 zk 中,但是 zk 的写性能不是很好,Kafka 采用的方案是 consumer 每分钟上报一次,这样就造成了重复消费的可能。
0.10 版本之后 Kafka 就 offset 的保存从 zk 剥离,保存到一个名为 consumer_offsets 的 Topic 中。消息的 key 由 [groupid、topic、partition] 组成,value 是偏移量 offset。Topic 配置的清理策略是compact。总是保留最新的 key,其余删掉。一般情况下,每个 key 的 offset 都是缓存在内存中,查询的时候不用遍历 Partition,如果没有缓存第一次就会遍历 Partition 建立缓存然后查询返回。
正常读取文件:
1、调用 read() 时,文件 A 中的内容被复制到了内核模式下的 Read Buffer 中。
2、CPU 控制将内核模式数据复制到用户模式下。
3、调用 write() 时,将用户模式下的内容复制到内核模式下的 Socket Buffer 中。
4、将内核模式下的 Socket Buffer 的数据复制到网卡缓冲区(NIC Buffer)网卡设备中传送
有两次拷贝是浪费的。
1.从内核空间拷贝到用户空间;
2.从用户空间再次拷贝到内核空间;
除此之外,由于用户空间和内核空间的切换,会带来Cpu上下文切换,对于Cpu的性能也会造成影响,
而所谓的零拷贝,就是把这两次多余的拷贝忽略掉,应用程序可以直接把磁盘中的数据,
从内核中直接传输到Socket, 而不需要再次经过应用程序所在的用户空间。
零拷贝通过DMA(Direct Memory Access)技术,把文件内容 复制到内核空间中的Read Buffer,
接着把包含数据长度和位置的信息文件描述符, 加载到 SocketBuffer 中,
DMA引擎直接可以把数据从内核空间传递到网卡设备,在这个流程中,
数据只经历了两次拷贝,就把数据发送到网卡中,并且减少了2次Cpu的上下文切换,
对于效率是有非常大的提高。所以,所谓的零拷贝并不是完全没有数据的拷贝,只是相对用户空间来说,不需要再进行数据的拷贝。只是减少了不必要的拷贝次数而已。
zookeeper可以创建zNode节点空间,ZooKeeper 还提供了一种订阅 ZNode 状态变化的通知机制:Watcher,一旦 ZNode 或者它的子节点状态发生了变化,订阅的客户端会立即收到通知。利用 ZooKeeper 临时节点和 Watcher 机制,我们很容易随时来获取业务集群中每个节点的存活状态,并且可以监控业务集群的节点变化情况,当有节点上下线时,都可以收到来自 ZooKeeper 的通知。
zookeeper会针对主题,队列建立节点空间,比如topic:order,partition:1,会创建一个分区空间order/partition/1,当客户端订阅当前topic时,轮询到某个分区上,会去zookeeper找到这个topic某个分区对应的节点,在节点里面拿到bokerId,再通过brokerId在zookeeper里面获取到broker地址连接信息,每个分区节点下面是一个名为 state 的临时节点,节点中保存着分区当前的 leader 和所有的 ISR 的 BrokerID。这个 state 临时节点是由这个分区当前的 Leader Broker 创建的。如果这个分区的 Leader Broker 宕机了,对应的这个 state 临时节点也会消失,直到新的 Leader 被选举出来,再次创建 state 临时节点。
1、节点必须和zookeeper维护,建立连接,zookeeper通过心跳机制检查每个节点的连接
2、如果节点是个follower,他必须及时的同步leader的写操作,延时不能太久
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(该值可在
K
A
F
K
A
H
O
M
E
/
c
o
n
f
i
g
/
s
e
r
v
e
r
.
p
r
o
p
e
r
t
i
e
s
中通过
r
e
p
l
i
c
a
.
l
a
g
.
m
a
x
.
m
e
s
s
a
g
e
s
配置,其默认值是
4000
)或者
F
o
l
l
o
w
e
r
超过一定时间(该值可在
KAFKA_HOME/config/server.properties中通过replica.lag.max.messages配置,其默认值是4000)或者Follower超过一定时间(该值可在
KAFKAHOME/config/server.properties中通过replica.lag.max.messages配置,其默认值是4000)或者Follower超过一定时间(该值可在KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,完全同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。
request.require.ack 有三个值0、1、-1
0:生产者不等待Booker的ack,延迟最低,但存在丢消息
1、等待leader响应,确保leader接收到消息
-1、等待leader以及所有对应的多有follower都响应,这样数据不会丢失
出现活锁的情况,是它持续发送心跳,没有响应。为了防止消费者持有当前分区一直不消费,我们使用max.poll.interval.ms 活跃检测机器。如果你poll消息,处理时间大于该时间间隔,没有提交ACK。则会把当前消费者移除消费者组,有其他消费者接管消费。此时被移除的消费者提交offset,引发CommitFailedException。这事一种安全机制,保障只有活动成员可以提offset。
消费者提供两个设置来控制poll循环:
max.poll.interval.ms:poll消息最大消费的时间间隔
max.poll.records:一次拉取的最大消息数目
seek(topicPartition,offset),指定消费位置
也可以seekToBeginning(collection)和seekToEnd(collection)
Leader选举本质上是一个分布式锁,有两种方式实现基于ZooKeeper的分布式锁:
节点名称唯一性:多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁
临时顺序节点:所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁
一种非常常用的选举leader的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个Replica(包含Leader和Follower),那在commit之前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。因为在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几个Broker,而非最慢那个。Majority Vote也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的Replica,如果要容忍2个Follower挂掉,必须要有5个以上的Replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的Replica,而大量的Replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的数据存储并没有使用这种方式。
Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。
虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优势,但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,并且节省下来的Replica和磁盘使得ISR模式仍然值得。
在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
1.等待ISR中的任一个Replica“活”过来,并且选它作为Leader
2.选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader
这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
最简单最直观的方案是,所有Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(ZooKeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。
但是该方法会有3个问题:
1.split-brain 这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致
2.herd effect 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
3.ZooKeeper负载过重 每个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增加到几千个Partition时ZooKeeper负载会过重。
Kafka 0.8.*的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
1、 controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2、 controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
2.1、 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
2.2、 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
3、 controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
1、 controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2、 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
1、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
2、 controller 从 /brokers/ids 节点读取可用broker
3、 controller决定set_p,该集合包含宕机 broker 上的所有 partition
4、 对 set_p 中的每一个 partition
4.1、 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
4.2、 决定新 leader
4.3、 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
5、 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令
当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。
当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:
1、 读取并增加 Controller Epoch。
2、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
3、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
4、 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
5、 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
6、 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
7、 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
8、 启动 replicaStateMachine 和 partitionStateMachine。
9、 将 brokerState 状态设置为 RunningAsController。
10、 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
11、 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
12、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
参考博客:https://www.cnblogs.com/qingyunzong/p/9004703.html