• 如何配置 Kafka 无消息丢失


    在这里插入图片描述
    如图kafka在三个阶段可能出现消息丢失,分别是生产消息、消费消息、页缓存操作后异步刷盘。

    生产消息

    生产消息丢失原因有两个:

    1. kafka生产端异步发送消息,不管broker是否响应,立即返回,例如producer.send(msg)。由于网络抖动,导致消息没有发送到broker。
    2. kafka生产端发送消息超过大小限制,broker端接收到后无法存储。

    解决方案:

    1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
    2. 设置 acks = all。也就是所有的ISR副本都要接收到消息,才算已提交。
    3. 设置 retries 为一个较大的值。当出现网络的瞬时抖动时,消息发送可能会失败,因此设置自动重试。
    4. 定义本地消息日志表,定时任务扫描表自动补偿,做好监控预警;提供手动补偿措施。
    broker端

    丢失原因:

    kafka broker集群接收到数据后会将数据持久化存储到磁盘,消息会先写入到页缓存,然后由操作系统负责具体的刷盘任务或者使用fsync强制刷盘。如果此时broker宕机,并且选举出落后leader很多的follower副本时,会造成消息丢失。

    解决方案:

    1. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
    2. 设置 replication.factor >= 3。这也是 Broker 端的参数。最好将消息多保存几份。
    3. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
    4. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
    消费者端

    消息丢失原因:

    消费者配置了自动提交消费者位移enable.auto.commit=true。消费者收到消息后,便自动提交offset。实际消息处理时,如果出现消费者宕机或者处理异常时,出现消息丢失。

    解决方案:

    1. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。这对于单 Consumer 多线程处理的场景而言是至关重要的。
    2. 消费者多线程处理业务逻辑,等所有线程处理完成后,才手动提交offset。
    3. 消费者做幂等处理,防止重复消费。

    消费者端可能还可以补充一条,当消费者位移丢失时,偏移量设置策略spring.kafka.consumer.auto-offset-reset=earliest,如果设为lastest,可能存在某些消费者丢失数据。

  • 相关阅读:
    解决Maven依赖下载缓慢的问题(亲测管用)
    9.0 HashMap底层原理及部分源码分析
    同时看过 unreal4 和 Unity 源代码的人觉得哪个引擎架构更好?
    Python-函数进阶
    基于SSM的学生实践管理平台开发
    SSM+基于微信小程序的航空售票管理系统 毕业设计-附源码191111
    SRv6----IS-IS扩展
    来自五年架构师的职业感悟,学历+路线+风口,助你成就美好未来
    Promise的简单用法
    kafka---- zookeeper集群搭建
  • 原文地址:https://blog.csdn.net/LIZHONGPING00/article/details/126844042