上篇文章说了,kafka可以通过实现partitioner自定义分区,producer拦截器,拦截器是在producer发送消息之后,回调之前调用,里面主要重写两个方法,一个是onSend,可以重新定义发送的消息,一个是在回调之前调用,onAcknowledgement在回调之前调用,可以记录发送成功或者失败的消息数量。无消息丢失配置,首先保证一个问题,消息不会丢失,要acks设置为all或者-1,这样send回调才会生效,这时候还会存在一个问题,当网络瞬时故障时候,会出现乱序发送,乱序的出现是因为retries重试,这时候必须只能在同一时刻在同一个broker只能发送一次,max.in.flight.request.per.connection。还有参数replication.factory三备份原则,Min.insync.replica至少写入多少副本。
Kafka消息分区&producer拦截器&无消息丢失(八)https://blog.csdn.net/ke1ying/article/details/126212509
压缩能显著降低磁盘占用和宽带占用,从而有效提升I/O密集型性能,不过在积压的同时,也会消耗额外的CPU,因此压缩是I/O性能和CPU资源的平衡(trade-off)。
Kafka在0.7.0版本开始支持压缩特性,producer能将一批消息压缩成一条消息发送,而broker将这个压缩消息写入本地日志文件。当consumer接受到这条消息时候,会对消息进行解压,还原成初始的消息返回给用户。总结就是,producer压缩,broker保持,consumer则解压。Broker端保持通常情况下不会进行解压操作,除非需要进行消息格式转换,那么broker端就需要对消息进行重新压缩。
Kafka支持的压缩算法
三种压缩算法:GZIP,Snappy和LZ4。默认情况下,kafka是不会压缩消息的,需要指定compression.type。值得一提的是,facebook开源了新的压缩算法Zstandard,效率更高,kafka也在后续计划中支持Zstandard压缩。
算法性能比较优化
熟悉kafka源码的同学大家都知道,kafka的send主要耗时在压缩消息上,所以消息压缩性能至关重要。
对于kafka而言,LZ4>Snappy>GZIP,原本LZ4和Snappy性能差不多,但是因为kafka源码 某个关键参数进行了硬编码,使得后面两位表现不够优秀。
那么到底压缩要不要使用呢,我们则需要分析服务器I/O消耗和cpu消耗对比,比如producer消耗了大量的网络宽带或者broker端cpu占用率非常高,而producer端cpu资源非常富裕,那么可以考虑为producer开启消息压缩,反之则不需要压缩消息。
其次,压缩时间也与batch大小息息相关,batch大小越大,则压缩的时间越长,不过时间增长不是线性,而是越来越平缓,如果发现压缩很慢,说明系统的瓶颈在用户主线程而不是IO发送线程,因此可以考虑增加多个用户线程同时发送消息,这样可以提升producer吞吐量。
实际环境中只使用一个用户主线程通常无法满足所需的吞吐量目标,因此需要构造多个线程或者多个进程来同时给kafka集群发送消息。
多线程单kafkaProducer顾名思义就是全局构造一个producer,然后多个线程共享一个producer,因为kafkaProducer是线程安全的,所以这种方法也是线程安全的。
优点:实现简单性能好。
缺点:1)所有线程共享一个内存缓冲区,需要较多内存。2)一旦producer某个线程崩溃导致producer实例被破坏,则所有线程都无法工作。
多线程多kafkaProducer可以在每个producer主线程中构造一个kafkaProducer,并且保证此实例在该线程中封闭,thread confinement,线程封闭是实现线程安全重要手段之一。
优点:每个用户都有专属的producer实例,缓冲区空间及一组对应的参数配置,可以进行细颗粒度调优。单个kafkaProducer崩溃不会影响其他producer线程。
缺点:需要较大的内存分配开销。
Consumer是读取kafka集群中某些topic消息的应用程序,在当前kafka生态中,consumer可以由多种编程语言实现,比如常见的java、c++、go等。
在之前版本中,kafka开源时候是由scala语言编写consumer客户端,我们这里称为scala consumer或者old consumer,旧版本的consumer。随着时间推移,发现旧版本的 consumer有很多设计缺陷,例如在旧版本中如果不使用 consumer group,而直接使用low-level consumer,用户必须实现错误处理和故障转移。因此在新版本退出new consumer,由java编写。
事实上,这两个版本设计上差距极大,很多用户甚至无法区分自己使用哪个版本的consumer,特别是新版本的consumer颠覆了旧版本的管理和保存位移机制,这也是为什么当用户切换到新版本consumer之后,会抱怨kafka-manager监控框架(yahoo开源的kafka监控框架)无法监测consumer位移信息的原因。
新版本 java consumer:主要使用kafkaConsumer。(org.apache.kafka.clients.consumer.*)
旧版本 scala consumer:zookeeper Consumer ,simpleConsumer。(kafka.consumer.*)
除了要正确区分consumer版本,我们还要了解consumer分类。严格来说,kafka分类不是kafka社区已有的概念。我们只有理解他们的分类和区别,我们才可以在实际场景中选择更合理的consumer分类来帮我们完成业务需求。Consumer分为如下两大类:
消费者组:consumer group
独立消费者:standalone consumer
这里我们先了解consumer group是由多个consumer instance(消费者实例)构造成一个整体进行消费,而standalone consumer则是单独消费的。
我们在讨论consumer或者开发consumer程序时候,必须明确给出消费者上下文consumer context,即所有consumer 版本以及consumer 分类。
Kafka官方一句话是:消费者使用一个消费组名(groupId)来标记自己,topic的每条消息都只会被发送到每个订阅它的的消费者组的一个消费实例上。
总结:
我们在第一章提到过,kafka同时支持基于 队列 和基于 发布/订阅 两种消息引擎模型,事实上是通过consumer group来实现对这两种模型的支持。
所有consumer实例都属于相同group---实现基于队列模型,每条消息只会被一个consumer实例处理。
Consumer实例都属于不同group---实现基于发布/订阅的模型,极端的情况每个consumer实例都设置不同的group,这样kafka消息就会广播到所有consumer实例上。
图上清晰展示了两个consumer group订阅相同的topic场景,图中topic有p0,p1,p2和p3这4个分区,分别保存在broker1和broker2上,具体分配如下,
GroupA只有两个consumer实例,每个实例消费两个分区的数据,而group B有4个consumer实例,每个实例分别消费一个分区的数据。也很好的说明了kafka为consumer分配消息时候可以做到公平分配。
那么我们为什么需要多个consumer group呢?我们把多个 consumer实例放在一个group里有什么好处吗?实际上,consumer group是用于高伸缩性,高容错性的consumer机制。组内多个consumer实例可以同时读取kafka消息,而一旦某个consumer挂了,group会立即崩溃,这时候负责的分区交给其他consumer负责,从而保证group可以正常工作。这过程我们称呼为 重平衡(rebalance)。
另外由于kafka目前只提供单个分区内的消息顺序,而不会维护全局消息顺序,因此如果用户要实现topic全局消息顺序,就只能通过让每个consumer group下只包含一个consumer实例的方式来间接实现的。
所以group特点:
这里的offset指的是consumer的,并不是分区日志中的offset。每个consumer都会维护一个位置来记录当前消费了多少消息,很多消息引擎是把记录维护在broker上,这样做的好处是实现简单,但以下有三个问题:Broker从此变成了有状态的,增加同步成本,影响伸缩性。
需要引入应答机制(acknowledgement)来确认消费。
由于要保存多个consumer的offset,必然要引入复杂数据结构,资源则会有不必要的浪费。
Kafka则选择不同的方式,让consumer group来保存offset,那么只要简单的保存长整数就可以了,而且consumer还引入了检查机制,定期对offset持久化,从而简化了应答机制。
consumer客户端定期会向kafka集群汇报自己消费数据的进度,这一过程被称为位移提交offset commit。位移提交对consumer非常重要,不仅象征消费进度,也决定了consumer消费语义保证。
新版本和旧版本提交位移方式完全不同:旧版本会定期将位移信息提交到zookeeper下的固定节点。
随着kafka不断更新,社区发现consumer把移位提交给zookeeper不太合适,zookeeper本质是一个协调服务组件,他并不适合作为移位的存储,毕竟频繁的读写并不是zookeeper擅长的事。于是在0.9.0.0版本,kafka推出新版本consumer中,不再提交给zookeeper,而是把位移提交给kafka的内部一个topic中(__consumer_offsets),注意,这个前面有两个下划线。
__consumer_offsets:
首先这个topic通常是给新版本的consumer使用的,旧版本虽然也提供,但是需要offsets.storge=kafka设置,很少会有人这么用。
其次,这个topic是kafka自行创建的,因此用户不可以在文件下自行删除这些日志,通常情况下,这些有50个文件,打开任意一个文件都会发现他是个正常的kafka topic日志文件目录,里面至少有一个日志.log文件,和两个索引文件.index和.timeindex。只不过该日志保存的消息都是kafka一群上consumer的位移信息罢了。
可以把这个想象成kv格式消息,key则是groupid+topic+分区号,而value就是offset值,每当更新同一个key的值时候,就会在该topic写入一条最新offset数据。同时kafka会对这个进行压实操作(compact),即每个key只保存含最新的offset消息,这样避免对分区日志的修改,也控制了总体日志容量。
考虑到一个kafka生产环境可能有多个consumer或consumer group,如果这些consumer同时提交位移,则必将加重__consumer_offsets的写入负载,因此社区特意创建了50个分区,对每个group_id进行hash取模运算,从而分散到不同的分区上。
我们在后面监控相关章节会介绍如何利用_consumer_offsets定位并读取consumer group的offset。
标题中特意强调了consumer group,如果是standalone consumer,则没有重平衡rebalance概念,所以只对consumer group奏效。
何为rebalance,这是会consumer平衡分配,比如有一个group组,他有20个consumer实例,那么他订阅了一个topic有100个分区,那么100个分区可以重平衡分给20个consumer实例,每个实例五个。新旧版本都有重平衡过程,只是有些不同。