名词解释 相信做过数据处理的小伙伴们对于kafka肯定是熟悉的。基础的kafka知识这里就不过多陈述了。今天主要来讲一下kafka的几个特性,下面先简单解释下这几个特性的含义:
数据从producer中写入到kafka以及consumer从topic中消费数据,数据都不会丢失。
数据在kafka的流程中既不会被重新生产,也不会被重复消费。
这也是实现exactly-once语义的基础。
由于kafka是顺序消费,所以kafka的有序性主要体现在生产者写入kafka时数据的有序性。
使用场景 介绍完这几个定义后,下面来看看这几个特性的使用场景:
安全性这里就不多说了,毕竟作为一个消息中间件,数据丢失是万万不可接受的。所以数据安全性的保障是kafka能够使用的基础。
幂等性对于某些业务可能意义不是很大,但是对于一些业务却是十分重要的。就拿资金结算场景来说,业务方作为上游把数据打到结算平台,如果一份数据被计算、处理了多次,产生的后果将会特别严重,到时候损失的就是真金白银了,而且很多其他业务也可能受到影响。
此外kafka的幂等性也是实现kafka exactly-once语义的基础,对于kafka来说还是很重要的。
有序性在数据有更新以及某些依赖数据顺序的业务场景意义很大,因为数据乱序造成的数据污染或者业务出错显然是不可接受的。
众所周知,kafka是顺序读取数据的,所以kafka的有序性取决于生产者的生产数据的有序性。
在了解了这几个特性的使用场景后,下面来看一下kafka是如何实现这些特性的:
kafka的数据在绝大多数情况下安全的,但是某些极端的情况还是可能导致kafka丢数据的。比如下面这些情况:
下面详细说下这些场景:
发送数据后,忽略ack直接确认成功:即kafka的at most once语义。生产者认为数据生产成功,但是broker数据处理失败。造成数据丢失 consumer在消费数据和commit offset时会遇到下面的场景:
先commit offset,再执行业务逻辑:commit成功,数据处理失败。造成数据丢失 先执行业务逻辑,再commit offset:commit成功,异步数据处理失败。造成数据丢失 还有一种情况就是kafka自身数据的安全性,在某些节点下线后,仍能对外提供服务,保证数据不丢失。但是kafka broker自身的问题也会造成数据“丢失”,这里的丢失代表的意思是无法对外提供服务,毕竟数据写入kafka后存储在磁盘上。考虑下下面的场景:
kafka数据无备份,某台broker挂掉后,这台broker上的数据将“丢失”且无法对外提供服务 producer数据发送到broker后,数据在leader节点写入成功即返回ack成功,在leader向replica节点同步数据时,leader节点宕机。造成producer端认为数据生产成功,而broker端数据“丢失”
显然针对上面的场景,kafka都有相关的预案。下面就从生产数据安全、消费数据安全以及kafka自身存储数据安全三个场景来说明下这个问题:
生产者数据安全性
producer的acks设置的修改,先看看acks的取值:
另外,producer发送消息的模式最好选用异步方式,可以提升性能;并且通过回调函数的方式处理发送后的一些逻辑处理,如打印日志或者数据统计等。
消费者数据安全性
consumer出现丢数据的问题主要还是enable.auto.commit这个配置项。这个配置项的配置为true就是kafka自动commit offset;而设置为false就是手动commit offset。
自动提交只要程序出问题,基本上就会出现丢数据的问题。而手动提交如果处理不好,也会存在丢数据的问题,下面我们就来分析下这两种提交方式的区别,以及正确的使用方法。
自动提交
在Kafka 中默认的消费位移的提交方式是自动提交。这个由消费者客户端参数enable.auto.commit 配置,默认值为true。 这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交。这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒。另外此参数生效的前提是enable.auto.commit参数为true。 自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一轮消费的位移。 手动提交
手动提交分为同步提交(commitSync)和异步提交(commitAsync)两种方式。
同步提交最大的问题是数据是批量处理的时候,当部分数据完成消费,还没来得及提交offset就被中断,则会使得下次消费会重复消费那部分已经消费过的数据。而且同步提交会阻塞线程,造成整体性能受影响。
异步提交不会阻塞线程,比起同步提交commitSync具有更好的性能。但是需要处理回调函数,也需要在某些极端场景下如rebalance操作时有相应的处理。
综上,常规的做法就是数据消费处理流程中使用异步提交的方式commit offset;而消费者正常退出的场景,我们可以使用,commitSync同步提交,保证offset的正确。例子如下:
try {
while(isRunning.get()) {
//poll records and do some data processing .
consumer.commitAsync() ;
}
) finally {
try {
consumer.commitSync() ;
) finally {
consumer.close() ;
}}
而rebalance时offset无法正常提交,导致数据重复消费,在这种场景下我们需要在监听到rebalance发生之前进行一次offset提交。例子如下:
//currentOffsets需要保存该消费者消费的分区的最新的offset
//本段代码中没有体现,可以在消费数据之后 进行更新该对象,并在rebalance之后清空该对象
Map currentOffsets =new HashMap<>() ;
consumer.subscribe(Arrays .asList( topic) , new ConsumerRebalanceListener () {
//发生在rebalance之前,并且消费者停止读取消息的时候
@Override
public void onPartitionsRevoked(Collection partitions) {
consume.commitSync(currentOffsets) ;
currentOffsets.clear();
}
@Override
public void onPartitions Assigned(Collection partitions) {
//do nothing .
}
});
kafka存储数据安全性
kafka数据由于是存储在硬盘上,所以kafka本身的安全性主要是数据的高可用性上,所以针对kafka数据的安全性主要是数据备份以及数据同步时候相关的策略配置。
首先是replication.factor配置参数,这个配置决定了副本的数量,默认是1。注意这个参数不能超过broker的数量。说这个参数其实是因为如果使用默认的1,或者不在创建topic的时候指定副本数量(也就是副本数为1),那么当一台机器出现磁盘损坏或者服务宕机等情况,那么数据也就从kafka里面丢失了。所以replication.factor这个参数最好是配置大于1,比如说3。 其次就是数据同步的时候导致的数据不可用。比如leader副本接收到数据,但还没同步给其他副本的时候就挂掉了,这时候数据也是丢失了。针对这种场景可以通过配置producer端的acks设置为-1可以解决问题,但是这样的代价是会影响kafka的部分性能以及吞吐量。 最后kafka有一个配置参数,min.insync.replicas,默认是1(也就是只有leader,实际生产应该调高),该属性规定了最小的ISR数。这意味着当acks为-1(即all)的时候,这个参数规定了必须写入的ISR集中的副本数,如果没达到,那么producer会产生异常。这个参数的作用是leader挂了之后,kakfa能快速从ISR中选出leader,最快速度恢复服务。
幂等性包含生产数据的幂等性以及消费数据的幂等性。下面就从这两个方面来说明:
幂等性producer kafka从0.11版本以后开始支持producer端的幂等性,通过下面的配置项就可以完成幂等性producer的创建:
props.put("enable.idempotence", true)
开启幂等性的时候,acks就自动配置成“all”了,如果这时候手动将acks设置为0,程序那么会报错。
而幂等性的producer实现逻辑也比较简单,即Kafka增加了pid和seq。Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个topic也会维护pid-seq的映射,并且每Commit都会更新lastSeq。这样recordBatch到来时,broker会先检查RecordBatch再保存数据:如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存(inSequence方法)。
虽然幂等性的producer解决了部分问题,但是还是有两个主要缺陷:
幂等性的producer仅做到单分区上的幂等性,即单分区消息不重复,多分区无法保证幂等性。 只能保持单会话的幂等性,无法实现跨会话的幂等性,也就是说如果producer挂掉再重启,无法保证两个会话间的幂等(新会话可能会重发)。因为broker端无法获取之前的状态信息,所以无法实现跨会话的幂等。 针对这种情况,kafka提供了事务性的producer来解决上述问题。
事务性producer kafka事务引入了transactionId和Epoch,设置开启事务后,一个transactionId只对应一个pid, 且Server端会记录最新的Epoch值。
这样有新的producer初始化时,会向TransactionCoordinator发送InitPIDRequest请求,TransactionCoordinator已经有了这个transactionId对应的meta,会返回之前分配的 PID,并把Epoch自增 1返回,这样当old producer恢复过来请求操作时,将被认为是无效producer抛出异常。
如果没有开启事务,TransactionCoordinator会为新的producer返回new pid,这样就起不到隔离效果,因此无法实现多会话幂等。
下面就是开启事务性producer的方法:
//初始化事务
producer.initTransactions();
try {
//开启一个事务
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
//提交
producer.commitTransaction();
} catch (KafkaException e) {
//出现异常的时候,终止事务
producer.abortTransaction();
}
上面就是对于producer端幂等性的实现。但是无论开启幂等还是事务的特性,都会对性能有一定影响,这是必然的。所以kafka默认也并没有开启这两个特性,大家也需要根据业务来权衡是否需要开启。
比较遗憾的是,kafka目前没有保证consumer幂等消费的措施,如果确实需要保证consumer的幂等,可以对每条消息维持一个全局的id,每次消费进行加锁去重。
至于耗费这么多的资源来实现consumer端的幂等性,进而实现kafka的exactly once的消费到底值不值,那就得根与业务进行权衡了。
下面一共一段伪代码:
if(cache.contain(msgId)){
// cache中包含msgId,已经处理过
continue;
}else {
lock.lock();
cache.put(msgId,timeout);
commitSync();
lock.unLock();
}
// 后续完成所有操作后,删除cache中的msgId,只要msgId存在cache中,就认为已经处理过。Note:需要给cache设置有消息
首先kafka的有序性只能保证是单分区数据有序,无法保证全局有序。
如果确实需要保证需要处理的数据有序,可以通过重写分区函数将需要顺序处理的数据写入到同一个分区进行处理。分区选择器需要实现org.apache.kafka.clients.producer.Partitioner接口。然后通过配置项ProducerConfig.PARTITIONER_CLASS_CONFIG进行配置指定。
再回到单分区数据有序的实现方式。这其实是kafka幂等性实现的一个附带效果。新版本kafka设置enable.idempotence=true后能够动态调整max-in-flight-request。该参数指定了生产者在收到服务端响应之前可以发送多少个batch消息,默认值为5。
当重试请求到来时,batch会根据seq重新添加到队列的合适位置,并把max.in.flight.requests.per.connection设为1,这样它前面的batch序号都比它小,只有前面的都发完了,它才能发。这样就在牺牲了部分性能和吞吐量的前提下,保证了数据的有序性。
最后路漫漫其修远兮,大数据之路还很漫长。如果想一起大数据的小伙伴,欢迎点赞转发加关注,下次学习不迷路,我们在大数据的路上共同前进!
本文由 mdnice 多平台发布