如图kafka在三个阶段可能出现消息丢失,分别是生产消息、消费消息、页缓存操作后异步刷盘。
生产消息丢失原因有两个:
producer.send(msg)
。由于网络抖动,导致消息没有发送到broker。解决方案:
producer.send(msg, callback)
。记住,一定要使用带有回调通知的 send 方法。acks = all
。也就是所有的ISR副本都要接收到消息,才算已提交。retries
为一个较大的值。当出现网络的瞬时抖动时,消息发送可能会失败,因此设置自动重试。丢失原因:
kafka broker集群接收到数据后会将数据持久化存储到磁盘,消息会先写入到页缓存,然后由操作系统负责具体的刷盘任务或者使用fsync强制刷盘。如果此时broker宕机,并且选举出落后leader很多的follower副本时,会造成消息丢失。
解决方案:
unclean.leader.election.enable = false
。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。replication.factor >= 3
。这也是 Broker 端的参数。最好将消息多保存几份。min.insync.replicas > 1
。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。消息丢失原因:
消费者配置了自动提交消费者位移enable.auto.commit=true
。消费者收到消息后,便自动提交offset。实际消息处理时,如果出现消费者宕机或者处理异常时,出现消息丢失。
解决方案:
enable.auto.commit
,最好把它设置成 false,并采用手动提交位移的方式。这对于单 Consumer 多线程处理的场景而言是至关重要的。消费者端可能还可以补充一条,当消费者位移丢失时,偏移量设置策略spring.kafka.consumer.auto-offset-reset=earliest
,如果设为lastest,可能存在某些消费者丢失数据。