生产者、Broker、消费者、Zookeeper;
注意:Zookeeper中保存Broker id和消费者offsets等信息,但是没有生产者信息。
Apache Kafka是由Apache开发的一种发布订阅消息系统
,它是一个分布式的、分区的和重复的日志服务
。
名称 | 说明 |
---|---|
Topic | 主题,可以理解为一个队列 |
Partition | 分区,为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序 |
Offset | 偏移量,kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka |
Broker | 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic |
Producer | 消息生产者,向kafka broker发消息的客户端;生产者发布消息时根据消息是否具有键采用不同的分区策略,消息没有键时通过轮询方式进行客户端负载均衡,如果有键则根据分区语义确保相同键发送至同一分区。 |
Consumer | 消息消费者,向kafka broker取消息的客户端;消费者读取一个分区消息的顺序和生产者写入分区的顺序是一致的。 |
Consumer Group | 消费者组,这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;同一消费者组下的多个消费者互相协调消费工作,每个消费者都会均匀分配到分区,每增加或者减少消费者都会触发消费者管理协议的平衡操作。消费者数量最好和分区数一致,避免有空闲消费者。 |
1.消息分类按不同类别,分成不同的Topic,Topic⼜拆分成多个partition,每个partition均衡分散到不同的服务器(提⾼并发访问的能⼒)
2.消费者按顺序从partition中读取,不⽀持随机读取数据,但可通过改变保存到zookeeper中的offset位置实现从任意位置开始读取
3.服务器消息定时清除(不管有没有消费)
4.每个partition还可以设置备份到其他服务器上的个数以保证数据的可⽤性。通过Leader,Follower⽅式
5.zookeeper保存kafka服务器和客户端的所有状态信息.(确保实际的客户端和服务器轻量级)
6.在kafka中,⼀个partition中的消息只会被group中的⼀个consumer消费;每个group中consumer消息消费互相独⽴;我们可以认为⼀个group是⼀个"订阅"者,⼀个Topic中的每个partions,只会被⼀个"订阅者"中的⼀个consumer消费,不过⼀个consumer可以消费多个partitions中的消息
7.如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
8.如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会⼴播给所有的消费者.
9.持久性,当收到的消息时先buffer起来,等到了⼀定的阀值再写⼊磁盘⽂件,减少磁盘IO.在⼀定程度上依赖OS的⽂件系统(对⽂件系统本身优化⼏乎不可能)
10.除了磁盘IO,还应考虑⽹络IO,批量对消息发送和接收,并对消息进⾏压缩。
11.在JMS实现中,Topic模型基于push⽅式,即broker将消息推送给consumer端.不过在kafka中,采⽤了pull⽅式,即consumer在和broker建⽴连接之后,主动去pull(或者说fetch)消息;这种模式有些优点,⾸先consumer端可以根据⾃⼰的消费能⼒适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.
12.kafka⽆需记录消息是否接收成功,是否要重新发送等,所以kafka的producer是⾮常轻量级的,consumer端也只需要将fetch后的offset位置注册到zookeeper,所以也是⾮常轻量级的.
对于⼀些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.
不过到⽬前为⽌,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性"“消息传输担保(消息确认机制)”"消息分组"等企业级特性;
kafka只能使⽤作为"常规"的消息系统,在⼀定程度上,尚未确保消息的发送与接收绝对可靠(⽐如,消息重发,消息发送丢失等),kafka的特性决定它⾮常适合作为"⽇志收集中⼼";application可以将操作⽇志"批量""异步"的发送到kafka集群中,⽽不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端⽽⾔,⼏乎感觉不到性能的开⽀.
consumer端采⽤批量fetch⽅式,此时consumer端也可以使hadoop等其他系统化的存储和分析系统
Kafka机器数量 = 2 *(峰值生产速度 * 副本数 / 100)+ 1
一般设置成2个或3个,很多企业设置为2个。
副本优势:提高可靠性;
副本劣势:增加了网络IO传输。
Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
默认保存7天;生产环境建议3天
每天总数据量100g,每天产生1亿条日志,10000万/24/60/60=1150条/每秒钟
平均每秒钟:1150条
低谷每秒钟:50条
高峰每秒钟:1150条 *(2-20倍)= 2300条 - 23000条
每条日志大小:0.5k - 2k(取1k)
每秒多少数据量:2.0M - 20MB
每天的数据量(100g) * 副本数(2个副本) * 日志保存时长(3天) / 70%
自己开发监控器;
开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle
分区数一般设置为:3-10个
1)创建一个只有1个分区的topic
2)测试这个topic的producer吞吐量和consumer吞吐量。
3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)
例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;则分区数 = 100 / 20 = 5个分区
通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。
Kafka中的ISR(In-Sync Replica)是指与Leader副本保持同步的副本集合。(ISR中包括Leader和Follower)。 ISR中的副本可以参与消息的读取和写入。如果一个副本落后于Leader副本的进度,或者发生了不可恢复的错误,那么这个副本将被移除ISR,存入OSR(Outof-Sync Replicas)列表(新加入的Follower也会先存放在OSR中同时等待恢复或者被替换)。
ISR副本同步队列是Kafka中一种用于同步副本的机制。当ISR中的某个副本出现了错误或者落后于Leader副本时,Kafka需要将这个副本移除ISR,并找到一个新的副本来替换它。Kafka通过ISR副本同步队列来管理新副本的同步进度。
当需要替换ISR中的一个副本时,Kafka会从备份副本中选择一个新的副本来代替它。在新副本与Leader副本同步之前,它不会被添加到ISR中。Kafka会将新副本的同步进度记录到ISR副本同步队列中。当新副本与Leader副本的同步进度达到一定程度时,它就可以加入ISR中,参与消息的读取和写入了。
如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)
和replica.lag.time.max.ms(延迟时间)
两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列
。
在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。
Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
RoundRobin方式第一步将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。
重新启动 Kafka:如果 Kafka 只是因为临时的网络问题、内存不足等原因导致的崩溃,可以尝试重启 Kafka。这可能会解决问题,但不适用于持续出现故障的情况。
切换到备份 Kafka:如果有备份 Kafka 集群,可以尝试将生产者和消费者重定向到备份 Kafka,直到主 Kafka 恢复正常。
Kafka 自动故障转移:Kafka 支持自动故障转移,即在主 Kafka 节点宕机时自动将副本切换为主节点。可以使用 ZooKeeper 实现 Kafka 的自动故障转移,ZooKeeper 会监视 Kafka 集群中的节点,并在检测到主节点宕机时自动将副本切换为主节点。
增加 Kafka 的硬件资源:如果 Kafka 经常出现性能问题,可以尝试增加 Kafka 集群的硬件资源,例如增加内存、CPU、磁盘等,以提高 Kafka 的吞吐量和稳定性。
优化 Kafka 的配置:可以根据实际情况调整 Kafka 的配置参数,例如调整 Kafka 的内存限制、调整 Kafka 的缓存大小、调整 Kafka 的副本数量等,以提高 Kafka 的性能和稳定性。
查找并解决 Kafka 的故障:可以使用 Kafka 的日志、监控工具等来查找并解决 Kafka 的故障,例如查找 Kafka 的磁盘空间是否已满、查找 Kafka 的网络连接是否正常、查找 Kafka 的进程是否崩溃等。
kafka 的 ack 机制:在 kafka producer发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到,其中状态有 0,1,-1。
ack在生产者指定,不同生产者可以不同。
Ack = 0,相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack = 1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack = -1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
ack设为-1时,需要ISR里的所有follower应答,想要真正不丢数据,需要配合参数min.insync.replicas: n --(ack为-1时生效,ISR里应答的最小follower数量)
,min.insync.replicas默认为1(leader本身也算一个!),所以当ISR里除了leader本身,没有其他的follower,即使ack设为-1,相当于1的效果,不能保证不丢数据。 需要将min.insync.replicas设置大于等于2,才能保证有其他副本同步到数据。
retries = Integer.MAX_VALUE --(无限重试)。
如果上述两个条件不满足,写入一直失败,就会无限次重试,保证数据必须成功的发送给两个副本,如果做不到,就不停的重试,除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失
kafka-topics.sh --bootstrap-server hadoop1:9092 --create --topic testisr2
--replication-factor 3 --partitions 4 --config min.insync.replicas=2
完全不丢结论:ack=-1 + min.insync.replicas>=2 +无限重试
① 副本数大于1,min.insync.replicas大于1 消息至少要被写入到这么多副本才算成功;
每个 broker 中的 partition 我们一般都会设置有 replication(副本)的个数,生产者写入的时候首先根据分发策略(有 partition 按partition,有 key 按key,都没有轮询)写入到 leader 中,follower(副本)再跟 leader 同步数据,这样有了备份,也可以保证消息数据的不丢失。
② unclean.leader.election.enable=false 关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失。
enable.auto.commit=false 关闭自动提交offset,处理完数据之后手动提交
flink结合checkpoint
Kafka数据重复的原因可能有多种,下面列举几种比较常见的情况:
① 消费者在处理数据时出现异常导致消费失败,但是由于Kafka自动提交offset的机制,导致已经消费过的数据的offset没有提交成功,下次启动时会重新消费这些数据。
② 消费者在处理数据时出现重复消费的情况,这可能是由于业务逻辑不严谨或者数据消费的幂等性没有保证。
③ Kafka的消息传输机制导致消息重复。 比如消息在传