• kafka消费的完整解决方案


    无论是kafka,还是RocketMQ,rabbitMQ等,与springboot的结合得益于spring的强大,使得变的非常easy,但依然知识简单的使用变的非常容易,如果要达到理想的结果,不仅需要他们对原理熟悉一点,还要对spring提供的sdk熟悉,下面就看一下kafka的使用,以及需要解决的一些问题。

    1. 引入依赖
                <!--kafka-->
                <dependency>
                    <groupId>org.springframework.kafka</groupId>
                    <artifactId>spring-kafka</artifactId>
                    <version>2.8.2</version>
                </dependency>
    • 1
    1. 消费
    /**
         * 消费者监听.
         *
         * @param message 消息内容
         * @param ack     ack
         */

        @KafkaListener(topics = {"test_topic"})
        public void listener(String message) {
           //消费落库
        }

    • 1
    1. 配置kafka地址

    spring:    
      kafka:
        bootstrap-servers: localhost:9092
    • 1

    通过以上三步,基本上就可以成功消费到。

    but

    如果你在公司写这样的代码,肯定要被吐槽的,因为这段代码和配置,只能简单的消费,并不能解决消息丢失,重复消费,并发消费,消费能力不足或者浪费资源等问题。

    接下来一一改造成理想的样子。

    消息丢失的解决方法

    1. 生产者层面,Kafka消息发送有两种方式:

      同步(sync)和异步(async),默认是同步方式,可通过 producer.type)属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:

    • 0一表示不进行消息接收是否成功的确认:
    • 1一表示当Leader接收成功时确认;
    • -1一表示Leader和Follower都接收成功时确认:

    这是发送消息阶段需要根据需要去配置的。可以配置-1.但是效率是最低的。

    1. 当然还有另一种情况就属于与业务层面,消费后kafka的offset被自动提交了,但实际上业务并没有成功消费。

      针对这种情况,可以设置手动提交,配置enable.auto.commit为false。手动 去提交offset,代码改造为:

       @KafkaListener(topics = {"test_topic"})
        public void listener(final String message, final Acknowledgment ack) {
            //消费业务代码
            //...
            
            //提交offset
            ack.acknowledge();
        }
    • 1

    消息重复消费

    先看一下设置为手动提交offset后,产生的三种情况:

    • 1.如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。
    • 2.如果有消费过程中有几条或者一批数据没有提交offset,后面其他的消息消费后正常提交0ffset . 那服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。
    • 3.消费者如果没有提交offset,程 序不会阻塞或者重复消费,除非在消费到到这个你不想提交的offset的消息时,你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置 开始消费。

    接下来根据情况来解决

    1. 手动提交offset,如果消费的时候业务代码没有完全执行结束,导致偏移量没有提交;

      经过测试,如果消费业务代码出现异常导致ack.acknowledge()没有执行,kakaf会重试多次进行消费。

      此时我们的业务代码就要处理这种插入数据的场景产生的重复数据落到数据库里;

      第一种解决办法就是在insert的时候使用INSERT INTO ...ON DUPLICATE KEY UPDATE语法,不存在时插入,存在时更新,是天然支持幂等性的。

    第二种解决办法,就是通过redis,根据业务的唯一键来存储到redis,每次消费时判断是否消费过,但一定要设置一个过期时间。

    第三种情况是,如果你的消费者的concurrency设置的是1,没有并发的情况,那你可以先查库判断库里面是否有,再进行插入。(concurrency相当于消费线程,也相当于消费者,机器数量*concurrency <= 分区数

    1. 消费端重复发送了

      此时也可以用上面第一种所描述的方案

    其实不管那种情况导致的重复消息,解决方案在业务里是一成不变的。

    本文由 mdnice 多平台发布

  • 相关阅读:
    谷粒学院——后台管理系统功能模块
    408王道操作系统强化——文件管理及大题解构
    vuex基础用法1.0
    网络协议之:Domain name service DNS详解
    你接受不了60%的暴跌,就没有资格获得6000%的涨幅 2021-05-27
    C#:实现鸡尾酒定向冒泡排序算法(附完整源码)
    六月集训(第28天) —— 动态规划
    【设计模式】Java设计模式 - 桥接模式
    网络协议的重要性与应用:理解进程间通信和网络分层结构(上)
    docker-compose安装redis
  • 原文地址:https://blog.csdn.net/weixin_38019299/article/details/125563533