一、什么是Kafka
MQ消息队列作为最常用的中间件之一,其主要特性有:解耦、异步、限流/削峰。
Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
二、Kafka常用概念
2.1 Topic与Partition
Topic(主题)是一个逻辑概念,在物理上并不存储。主要用于描述一个类型的消息。例如我们有一个业务系统会发送一个描述用户订单状态的消息,那么这一个类型里面所有的消息就是一个Topic,又比如这个业务系统同时还会发送描述会员余额的消息,那么这个就是一个新的消息类型,也就是一个新的Topic。
**Partition(分区)**是一个物理概念,是实际存在于物理设备上的。一个Topic由多个Partition共同组成。Partition的存在是为了提高消息的性能与吞吐量,多个分区多个进程消息处理速度肯定要比单分区快的多。
2.2 Broker与Partition
Broker作为分布式的实现,其实可以直接简单理解为一个Kafka进程就是一个Broker。
我们之前提到Partition是物理存在的,其物理的存在的位置就在Broker中。同时,为了服务具有一定的可靠性,每一个分区都有几个副本,每个副本存在于不同的Broker中。
我们之前提到的Topic是逻辑概念即在于此,并没有物理存在,图中每个TopicA-x都是一个Partition,其中后面的数字代表了一个分区中的第几个副本,每个Broker中都有不同的副本,目的就是当有Broker宕机时,其他的副本还存在保证系统的可用性。
此外,多个副本Partition中会选取一个作为leader,其他的作为follower。我们的生产者在发送数据的时候,是直接发送到leader partition里面,然后follower partition会去leader那里自行同步数据,消费者消费数据的时候,也是从leader那去消费数据的。
副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。
2.3 生产者消费者与ZooKeeper
产生消息的角色或系统称之为生产者,例如上述某个业务系统产生了关于订单状态的相关消息,那么该业务系统即为生产者。
消费者则是负责接收或者使用消息的角色或系统。
ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器 的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。
在每一个Broker在启动时都会像向ZK注册信息,ZK会选取一个最早注册的Broker作为Controller,后面Controller会与ZK进行数据交互获取元数据(即整个Kafka集群的信息,例如有那些Broker,每个Broker中有那些Partition等信息),然后其他Broker再与Controller交互进而所有的Broker都能感知到整个集群的所有信息.
2.4 消费者组
目前大部分业务系统架构都是分布式的,即一个应用会部署多个节点。正常来说,一条消息只应该被其中某一个节点消费掉,而不应该是所有被所有的消费者同时消费一遍。因此就产生了消费者组的概念,在一个消费者组中,一条消息只会被消费者组中的一个消费者所消费。
从使用上来说,一般配置为一个应用为一个消费者组,或一个应用中不同的环境也可以配置不用的消费者组。例如生产环境的节点与预发环境的节点可以配置两套消费者组,这样在有新的改动部署在预发时,即时本次改动修改了消费动作的相关逻辑,也不会影响生产的数据。
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少) 消费者的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者 并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况, 就会有消费者分配不到任何分区。参考下图(右下),一共有 8 个消费者,7 个分区,那么最后的消费 者 C7 由于分配不到任何分区而无法消费任何消息。
2.5 ISR、HW、LEO
Kafka通过ISR机制尽量保证消息不会丢失。
一个Partition中所有副本称为AR****(Assigned Replicas),所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成 **ISR (In-Sync Replicas)。**我们上文提到,follower 副本只负责消息的同步,很多时候 follower 副本中的消息相对 leader 副本而言会有一定的滞后,而及时与leader副本保持数据一致的就可以成为ISR成员。与 leader 副本同步滞后过多的副本(不包括 leader 副本)组成OSR (Out-of-Sync Replicas),由此可见,AR****=ISR+OSR。 在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR, OSR 集合为空。
leader副本会监听所有follower副本,当其与leader副本数据一致时会将其加入ISR成员,当与leader副本相差太多或宕机时会将其踢出ISR,也会再其追上leader副本后重新加入ISR。
当leader副本宕机或不可用时,只有ISR成员才能有机会被选择为新的leader副本,这样就能确保新的leader与已经宕机的leader数据一致,而如果选择OSR中的副本作为leader时会造成部分未同步的数据丢失。
上图情况中,P1副本首先当选了leader,且只有P2副本同步了P1的数据,offset都为110,那么此时的ISR只有P1与P2,OSR有P3和P4。当P3同步数据到110后,也会被leader加入到ISR中,若此时leader宕机,则会从ISR中选出一个新的leader,并将P0踢出ISR中。
那么leader是如何感知到其他副本是否与自己数据一致呢?靠的就是HW与LEO机制。
LEO 是 Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的 offset,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加 1。分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,HW 是 High Watermark 的缩写,俗称高水位,它标识 了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。
上图中,因为所有副本消息都是一致的,所以所有LEO都是3,HW也为3,当有新的消息产生时,即leader副本新插入了3/4两条消息,此时leader的LEO为5,两个follower的此时未同步消息,所以LEO仍未3,HW选择最小的LEO是3.
当follower1同步完成leader的数据后,LEO未5,但follower2未同步,所以此时HW仍未3。此后follower2同步完成后,其LEO为5,所有副本的LEO都未5,此时HW选择最小的为5。
通过这种机制,leader副本就能知道那些副本是满足ISR条件的(该副本LEO是否等于leader副本LEO)。
三、Kafka全流程梳理
3.1 注册信息
Kafka强依赖与ZooKeeper以维护整个集群的信息,因此在启动前应该先启动ZooKeeper。
在ZK启动完成之后,所有的Broker(即所有的Kafka进程)都会向ZK注册信息,然后争取/controller
的监听权,获取到监听权的Broker称为Controller,此后由Controller与ZK进行信息交换,所有的Broker与Controller进行消息交换。进而保持整个Kafka集群的信息一致性。
3.2 创建主题
在所有的Broker注册完毕后,需要注册主题(Topic)以继续后续流程。
其中某个客户端接收到创建Topic请求后,会将请求中的分区方案(有几个分区、几个副本等)告诉ZK,ZK再将信息同步至Controller,此后所有的Broker与Controller交换完元数据,至此所有的Broker都已经知道该Topic的分区方案了,然后按照该分区方案创建自己的分区或副本即可。
以上就是某一个broker下面的某一个主题的分布情况
3.3 生产者发送数据
在创建完想要的Topic之后,生产者就可以开始发送数据。
3.3.1 封装ProducerRecord
首先生产者会将信息封装成ProducerRecord
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
其中主要包好了要发送的Topic名称,要发送至那个分区,以及要发送的数据和key。
其他的都比较好理解,key的作用是如果key存在的话,就会对key进行hash,然后根据不同的结果发送至不同的分区,这样当有相同的key时,所有相同的key都会发送到同一个分区,我们之前也提到,所有的新消息都会被添加到分区的尾部,进而保证了数据的顺序性。
例如我们有个关于会员的业务系统,其中生产者会产生关于某个会员积分的信息,消费者拿到这个消息之后会实际对积分进行操作。假如某个会员先获得了100积分,然后又消费了50积分。因此生产者会发送两个MQ消息,但是假如没有使用key的功能,这两个消息被发送到了不同的分区,因为每个分区的消费水平不一样(例如获得积分的逻辑耗时比较长而某个分区又都是获得积分的MQ),就有可能造成消费50积分的MQ会先被消费者收到。
而假如此时会员积分为0的情况下再去消费50积分明显是不合理且逻辑错误的,会造成业务系统异常。因此在生产者发送MQ时如果消息有顺序性要求则一定要将key赋值,具体的可以是某些有唯一性标识例如此处可以是会员ID。
3.3.2 序列化数据、获取元数据、确定分区
首先生产则客户端的序列化器会将要发送的ProducerRecord对象序列化成字节数组 ,然后发送到消费端后消费端的反序列化器会将字节数组再转换成对应的消费对象。常用的序列化器有String、Doule、Long等等。
其次也可以自定义序列化器与反序列化器,例如可以将将字节数组进行加密后再进行传输,以此保证数据的安全性。
数据都准备完成之后就可以开始获取broker元数据,例如host等,以方便后续确定要发送的位置。
确定要发送至那个分区有几种情况:
-
如果ProducerRecord中指定了要发往那个分区,则选择用户使用的分区
-
如果没有指定分区,则查看ProducerRecord中key是否为空,如果不为空则对key进行计算以获取使用那个分区
-
如果key也为空,则按照轮询的方式发送至不同的分区
也可以通过自定义分区器的方式确定发送那个分区。
3.3.3 写入缓冲区、分批分送消息
生产者发送的MQ并不会直接通过网络发送至broker,而是会先保存在生产者的缓冲区。
然后由生产者的Sender线程分批次将数据发送出去,分批次发送的原因是可以节省一定的网络消耗与提升速度,因为一次发送一万条与一万次发送一条肯定效率不太一样。
分批次发送主要有两个参数,批次量与等待时间。两个参数主要是解决两个问题,一个是防止一次发送的消息量过大,比如一次可能发送几十mb的数据。另一个解决的问题是防止长时间没有足够消息产生而导致的消息一直不发送。因此当上述两个条件任意满足其一就会触发这一批次的发送。
Kafka的网络模型用的是加强版的reactor网络模型
首先客户端发送请求全部会先发送给一个Acceptor,broker里面会存在3个线程(默认是3个),这3个线程都是叫做processor,Acceptor不会对客户端的请求做任何的处理,直接封装成一个个socketChannel发送给这些processor形成一个队列,发送的方式是轮询,就是先给第一个processor发送,然后再给第二个,第三个,然后又回到第一个。消费者线程去消费这些socketChannel时,会获取一个个request请求,这些request请求中就会伴随着数据。
线程池里面默认有8个线程,这些线程是用来处理request的,解析请求,如果request是写请求,就写到磁盘里。读的话返回结果。 processor会从response中读取响应数据,然后再返回给客户端。这就是Kafka的网络三层架构。
所以如果我们需要对kafka进行增强调优,增加processor并增加线程池里面的处理线程,就可以达到效果。request和response那一块部分其实就是起到了一个缓存的效果,是考虑到processor们生成请求太快,线程数不够不能及时处理的问题。
3.4 消费者消费数据
消费者消费也主要分为两个阶段:
-
信息注册阶段,即整个消费者组向集群注册消费信息等
-
信息消费阶段,开始信息消息,确保消息可靠性等
3.4.1 信息注册
首先消费者组内所有消费者都会向集群寻找自己的Coordinator(以消费者组id做均衡)。找到Coordinator后,所有的Consumer都会向Coordinator发起join group
加入消费者组的请求,Coordinator会选择一个最早发起请求的Consumer作为leader Consumer,其他的Consumer作为follower。
leader会根据要消费的Topic及分区情况制定一个消费方案,告知给Coordinator,Coordinator再将此消费方案告知给各个follower。
自此,所有的Consumer都已经知道自己要消费那个分区了。
如上图,每个消费者都找了自己要消费的分区情况
3.4.2 消费信息
消费信息主要包含了以下几个步骤:
1)拉取消息
常用的消息队列的消费消息一般有两种,推送或者拉取,Kafka在此处用的是拉取模式。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
int updateCount = 1;
if (map.containsKey(record.value())) {
updateCount = (int) map.get(record.value() + 1);
}
map.put(record.value(), updateCount);
}
}
}finally {
consumer.close();
}
通过设置定时时间,每隔多长时间拉取一次消息。
2)反序列化与消费消息
在上面的代码中,我们拿到的就是ConsumerRecord
对象,但是实际上这个是消费者客户端帮我们做的反序列化的操作,将字节数组(byte[])反序列化成了对象。参考3.3.2我们也可以自定义反序列化器。
3)提交消息位移
例如当消息队列中有100条消息,消费者第一次消费了20条消息,那么第二次消费的位置肯定是要从第21条消息开始消费,而记录第21条消息的信息称之为offset,offset为已经消费位置+1.
在之前版本的客户端,offset数据被存在zk中,每次都需要请求zk获取数据,而zk并不适合作为高并发的请求。因此在现在的版本中,kafka通过建立一个Topic来记录所有消费者消费的offset,这个Topic是__consumer_offsets
。每一个消费者在消费数据之前(即pol()方法中),都会把上一次消费数据中最大的offset提交到该Topic中,即此时是作为生产者的身份投递信息。
kafka中有几种offset提交模式,默认的是自动提交:
enable.auto.commit
设置为true时,每隔auto.commit.interval.ms
时间会自动提交已经已经拉取到的消息中最大的offset。
但是默认的自动提交也会带来重复消费与消息丢失的问题:
-
重复消费。例如从offset为21开始拉取数据,拉取到了40,但是当消费者处理到第30条数据的时候系统宕机了,那么此时已经提交的offset仍为21,当节点重新连接时,仍会从21消费,那么此时21-30的数据就会被重新消费。还有一种情况是再均衡时,例如有新节点加入也会引发类似的问题。
-
消息丢失。
手动同步提交
public static void main(String[] args) {
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
// 模拟消息的处理逻辑
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
try {
//处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息
consumer.commitSync();
} catch (CommitFailedException e) {
//todo 事务回滚
e.printStackTrace();
}
}
}
手动同步提交可以在任何时候提交offset,例如可以每消费一条进行一次提交。提交失败之后会抛出异常,可以在异常中做出补偿机制,例如事务回滚等操作。
但是因为手动同步提交是阻塞性质的,所以不建议太高的频率进行提交。
手动异步提交
异步提交有三种方式,区别在于有没有回调的方式。
@Test
public void asynCommit1(){
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
consumer.commitAsync();
}
}
@Test
public void asynCommit2(){
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
// 异步回调机制
consumer.commitAsync(new OffsetCommitCallback(){
@Override
public void onComplete(Map offsets, Exception exception ) {
if (exception!=null){
System.out.println(String.format("提交失败:%s", offsets.toString()));
}
}
});
}
}
@Test
public void asynCommit3(){
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
consumer.commitAsync((offsets, exception) ->{
if (exception!=null){
System.out.println(String.format("提交失败:%s", offsets.toString()));
}
});
}
}
异步提交commitAsync()
与同步提交commitSync()
最大的区别在于异步提交不会进行重试,同步提交会一直进行自动重试,当然也可以通过再发生异常时继续提交的方式来完成此功能。
同步+异步
可以使用同步+异步的形式保证数据能够准确提交:
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
log.trace("Kafka消费信息ConsumerRecord={}",record.toString());
}
try {
//先使用异步提交机制
consumer.commitAsync();
} catch (CommitFailedException e) {
// todo 补偿机制
log.error("commitAsync failed", e)
} finally{
try {
//再使用同步提交机制
consumer.commitSync();
} catch (CommitFailedException e) {
// todo 补偿机制
log.error("commitAsync failed", e)
} finally{
consumer.close();
}
}
}
四、异常场景实践
4.1 异常重试
我们系统之前遇到过消费者在消费消息时,短时间内连续报错。根据现象以为是系统出现问题,后续发现所有报错都是同一条消息,排查后发现是处理消息过程中存在未捕获的异常,导致消息重试,相同的问题引发了连续报错。
JMQ在消费过程中如果有未捕获的异常会认为消息消费失败,会首先在本地重试两次后放入重试队列中,进入重试队列的消息,会有过期逻辑,当超过重试时间或者超过最大重试次数后(默认3天过期),消息将会被丢弃。因此在处理消息时需要考虑如果出现异常后的处理场景,选择是重试还是忽略还是记录数据后告警。
因此我们在消费消息的过程中,尤其是采用pull模式,一定要根据业务场景注意异常的捕获。否则小则影响本条消息,大则本批次后续所有消息都可能丢失。
//每隔1min拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60L));
for (ConsumerRecord<String, String> record : records) {
try {
//doing
} catch (Exception e) {
//如果此处未捕获消息,会直接导致for循环退出,后续所有消息都将丢失
log.error("Bdp监听任务执行失败, taskName:{}", taskName, e);
}
}
4.2 本地重试与服务端重试
系统还遇到过在JMQ服务端配置了消费失败重试的逻辑,例如重试多少次间隔多久,但是在消费失败之后,发现重试的逻辑并没有按照配置的逻辑走。联系运维帮忙排查后发现:
重试分为本地重试和服务端重试
根据4.1我们知道消费失败后,会首先在本地重试,本地重试失败后会放入重试队列,则此时进入服务端重试,两套重试需要两套配置,本地的重试配置在本地的配置文件中。
本地配置如下:
<jmq:consumer id="apiConsumer" transport="jmq.apilog.transport">
<jmq:listener topic="${jmq.topic.apilog}" listener="apiLogMessageListener" retryDelay="1000" maxRetrys="3"/>
jmq:consumer>
服务端重试配置:
作者:京东科技 韩国凯
来源:京东云开发者社区 转载请注明来源