1、生产者分区策略
2、消费者消费策略
用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个
举个例子:假设为Topic A 创建了 10个分区,有三个消费者(C1,C2,C3),分配如下:
C1:0,1,2,3 C2:4,5,6 C3:7,8,9
RoundRobin 轮询策略
轮询策略是将消费组内所有的消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
举个例子:假设为Topic A 创建了 3个分区(P0,P1,P2),有两个消费者(C1,C2),按照字典排序分区变成了 P0->P1->P2,然后依次将P0分配C1,P1分配给C2,P2又分配给C1,最终分配如下:
C1: P0,P2 C2:P1
如需修改策略,修改consumer.properties配置
#轮训策略
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
#范围策略
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
3、消息队列的两种模式
首先需要知道:一个partition只能被一个Consumer Group组中一个消费者消费
4、生产者发送流程
kafka在发送消息的过程中,主要涉及两个线程main 线程和 sender 线程。
1、在 main 线程 中创建了一个双端队列 RecordAccumulator。
2、main 线程将消息发送给 RecordAccumulator。
3、sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker
(1)生产者发送时机
从上图可知,当有消息时,生产者不是立马发送到broken上,而是先写入内存的队列中,只有当满足以下两个条件时,才会发送到broken上:
以上两个参数均为生产者配置,其余生产者重要配置项如下:
(2)消息确认机制
消息发送到broker上,通过配置的acks来确认消息是否投递成功,acks有3个值可选 0、1和-1(或者all)默认值为1
5、kafka具有高吞吐量的原因
batch.size
一起来设置,可避免消息长时间凑不齐batch.size指定的大小,导致消息一直积压在内存里发送不出去的情况。默认大小是0ms(就是有消息就立即发送)。6、kafka如何解决数据重复性问题
从上面生产者发送流程可知,当生产者收到broker的acks回复时,表示消息发送成功,但有可能由于网络问题,导致生产者触发重试机制,导致消息重复,同样的,消费者再消费的时候,消息消费成功,但是再提交offset的时候失败,同样也会触发消息重试,导致消息重复:
1、生产者发送重复
再解决生产者幂等,只需要将Producer的enable.idempotence
配置项设为true,其原理是kafka引入了ProducerID和Sequence。
broker会为每个TopicPartition组合维护PID和序列号。对每条接收到的消息,都会检查它的序列号是否比Broker所维护的值严格+1,只有这样才是合法的,其他情况都会丢弃。当出现消息重试时,broker就会检测到有两条PID 一样且seq 也一致的的消息写入了Partition,并忽略掉重发的那一条
其中这种方式是有部分缺陷的:
单会话有效
只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
原因:重启之后标识producer的PID就变化了,导致broker无法根据这个<PID,TP,SEQNUM>条件去去判断是否重复。
单分区有效
只能保证单分区上的幂等性。即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
原因:在某一个partition 上判断是否重复是通过一个递增的sequence number,也就是说这个递增是针对当前特定分区的,如果你要是发送到其他分区上去了,那么递增关系就不存在了。
为解决以上问题,实现跨分区跨会话的事务kafka又引入一个全局唯一的 Transaction ID,并将 Producer 获得的 PID 和 Transaction ID 绑定。这样当Producer重启之后,就可以通过正在运行的 Transaction ID 获得原来的 PID。
为了管理 Transaction,Kafka引入了一个新的组件 Transaction Coordinator。Producer就是通过 Transaction Coordinator 获得 Transaction ID 对应的任务状态。Transaction Coordinator还负责将事务状态写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复。
2、消费者消费重复
一般又业务方自行保证,如将唯一键存入第三方介质,要操作数据的时候先判断第三方介质(数据库或者缓存)有没有这个唯一键。
7、kafka消息有序性
kafka的有序性 - dreamness的个人空间 - OSCHINA - 中文开源技术交流社区