• spring设置kafka超时时间没有生效的解决方法(解决rebalancing问题)


    一、前言

    最近生产kafka遇到一个问题,总是隔几分钟就rebalancing,导致没有消费者、消息堆积;
    平衡好后,正常消费消息几分钟后,就又开始rebalancing,消息再次堆积,一直循环。

    登录kafka服务器,用命令查看kafka组:

    //组名是commonGroup,java里设置的
    ./kafka-consumer-groups.sh --bootstrap-server 10.123.123.123:9092 --group commonGroup --describe
    
    • 1
    • 2

    就会发现报错:

    warning: Consumer group 'commonGroup' is rebalancing.
    
    • 1

    此时组里的所有topic都会没有消费者。

    再查看消费者(java后台)的日志,会发现大量的rebalancing语句,与重新加入分组的语句:

    //这个是心跳发送失败报错的日志,因为此时在rebalancing
    2022-08-25 17:55:41.801 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=commonGroup] Attempt to heartbeat failed since group is rebalancing
    
    //这个是重新加入分组的日志,重新加入了commonGroup组里的topic为examTake的第13个分区(生产topic分了14个区)
    2022-08-30 16:29:27.434 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  o.s.kafka.listener.KafkaMessageListenerContainer - partitions assigned: [examTake-13]
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这个现象会导致消息堆积2-3分钟,然后消息会统一被消费一波,然后继续堆积2-3分钟消息;
    因为kafka不知道为什么总是rebalancing,每次平衡需要2-3分钟时间,此时没有消费者;
    平衡好后,消息被消费者消费一波,就又开始rebalancing。

    用户明显感觉到系统变慢,需要想办法解决这个问题。

    二、可能的原因

    百度发现,kafka rebalancing发生的情况,主要有这几种:

    1.有消费者新增/减少

    如果启动了新的java程序,增加了消费者、或者有消费者挂了,kafka就会重新平衡;
    但是排查后发现,所有消费者日志打印正常,没有挂掉的,也没有新增消费者,所以不是这个问题。

    2.有消费者在规定时间内未发送心跳包

    spring里可以配置kafka的session超时时间(默认10秒):

    spring.kafka.properties.session.timeout.ms = 10000
    
    • 1

    以及心跳包发送时间间隔(默认隔3秒发送一次):

    spring.kafka.properties.heartbeat.interval.ms = 3000
    
    • 1

    如果有消费者在session规定时间内没有发送心跳包,kafka就会认为该消费者不可用,开始rebalancing。
    但是排查后发现,项目里配置的超时时间是15秒,心跳包间隔时间没有配置(默认3秒),感觉不应该有消费者15秒内一次心跳包也发不出去(消费者日志打印正常,没有挂掉的),所以不确定是不是这个问题。

    3.有消费者在规定时间内没有处理完消息

    spring里可以配置消费者一次拉取的消息数(默认500,低版本kafka好像不支持修改):

    spring.kafka.consumer.max-poll-records=500
    
    • 1

    以及消费消息的超时时间(默认5分钟):

    spring.kafka.properties.max.poll.interval.ms=300000
    
    • 1

    如果有消费者在规定时间内没有处理完消息,那么也会引起kafka的rebalancing。
    但是排查后发现,kafka里的待消费消息数很低时(几条-几十条),仍然会隔几分钟就rebalancing一次,然后消费者会很快把消息全部消费完,就算是这样kafka后续还是会rebalancing。这样看来也不是这个问题。

    三、设置kafka超时时间没有生效的解决方法

    1.问题描述

    虽然感觉不像是这几个原因导致kafka反复重新平衡的,但是还是得尝试解决。

    因此,按照网上的方法,在spring项目里的application.properties中进行了配置,增加了超时时间:

    //心跳超时时间(session超时时间)增加成25秒(之前项目设置了15秒)
    spring.kafka.properties.session.timeout.ms = 25000
    
    //每次拉取的消息减少为20(之前是默认值500)
    spring.kafka.consumer.max-poll-records=20
    
    //消息消费超时时间增加为10分钟
    spring.kafka.properties.max.poll.interval.ms=600000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    但是配置了之后,启动项目,发现这些配置都没有生效,kafka打印的参数还是之前的:

    max.poll.interval.ms = 300000
    max.poll.records = 500
    session.timeout.ms = 15000
    
    • 1
    • 2
    • 3

    尤其是max.poll.records参数,这个都可以点进jar包里了,不应该不生效的:
    在这里插入图片描述
    在这里插入图片描述

    2.解决方法

    (1)百度发现,低版本kafka好像不支持修改max.poll.records;不过目前项目中不是低版本kafka,应该是可以设置的;而且其它参数总是可以设置的,问题是不知道为什么没有生效。

    (2)找了半天,发现项目中有一个KafkaConfig.java,其中部分配置为:

        @Value("${kafka.session.timeout.ms:15000}")
        private String sessionTimeout;
    
        @Value("${kafka.consumer.max.poll.records:500}")
        private String maxPollRecords;
    
        @Value("${kafka.max.poll.interval.ms:300000}")
        private String maxPollIntervalMs;
    
        @Value("${kafka.group.id:commonGroup}")
        private String groupId;
        
        private Map consumerConfigs() {
            Map props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            //这个是组id
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            //这个是心跳(session)超时时间
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            //这个是每次拉取的消息数量
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            //这个是消费消息的超时时间
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaIntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    (3)这下application.properties中配置了kafka参数没有生效的原因找到了,看来是java与application.properties中同时配置了kafka参数的话,会以java中配置的为准。

    3.结果

    修改java中的kafka配置后,启动日志显示配置生效了:

    max.poll.interval.ms = 600000
    max.poll.records = 20
    session.timeout.ms = 25000
    
    • 1
    • 2
    • 3

    然而,项目用这个配置启动后,kafka反复rebalancing的状况还是没有好,并且rebalancing需要的时间更长了,从2-3分钟延长到了5-10分钟,消息积压时间延长、用户体验更差了。

    四、kafka反复rebalancing最终解决方法

    1.排查过程

    反复排查了整个项目,情况如下:

    (1)生产环境最近只发版了一个很小的功能,这个功能不会造成kafka反复rebalancing。

    (2)生产环境发版后,有2天时间日志是正常的,kafka没有反复rebalancing,说明之前的kafka配置基本没有问题。

    (3)第3天下午开始kafka出现了反复rebalancing问题,但是期间并没有发版,也不是用户访问量突然增多导致的。

    (4)尝试调大kafka超时时间,但是没有作用。

    (5)重启了kafka,也重启了所有消费者,但是反复rebalancing问题并没有好转。

    2.最终解决方法

    1.kafka重新平衡是按group的,具体来说就是commonGroup不知道哪里除了问题:

    warning: Consumer group 'commonGroup' is rebalancing.
    
    • 1

    2.因此,决定把这个组里比较重要的几个topic移动出去,换到其它组(java里只需要改一行):

    //这里没有显式配置组,用的是上方KafkaConfig.java里的commonGroup组
    //@KafkaListener(topics = "${kafka.topic.commit}")
    
    //改为了显式配置组,把这个topic移动到新组 commitGroup
    @KafkaListener(topics = "${kafka.topic.commit}", groupId = "commitGroup")
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.把重要的topic移动出去、分到新组后,发现,新组里的topic工作正常,没有反复重新平衡;
    旧组commonGroup依然有问题,隔一段时间就会rebalancing。

    4.由于旧组里的topic不太重要,因此消费堆积2-3分钟的问题勉强可以接受;
    由于旧组里的topic还有很多,因此暂时还没有排查出是哪个topic及其消费者有问题。

    5.最后,这个问题就勉强算解决了,后续有时间后再继续研究为什么kafka会反复rebalancing。

    五、备注

    1.spring设置kafka参数session超时时间时,要小于请求超时时间与处理超时时间,例如:

    request.timeout.ms = 30000  session.timeout.ms = 15000    max.poll.interval.ms = 300000
    
    session.timeout.ms < request.timeout.ms
    
    session.timeout.ms < max.poll.interval.ms
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.kafka的topic的分区,最好是有几个消费者、就创建几个分区,这样可以一一对应,一个消费者对应一个分区。

    3.kafka的rebalancing是按group的,遇到rebalancing问题,可以把重要的topic移动到其它group里,试试能不能行;最好是一个topic一个group,这样可以快速定位是哪个topic出了问题。

  • 相关阅读:
    UE4 Unlua源码解析12 - Lua与UE4的混合GC
    零零信安-D&D数据泄露报警日报【第40期】
    站长告诉怎么选择网站服务器
    高斯线性模型
    银河麒麟服务器x86安装qemu虚拟机,并安装windows server 2019
    与创新者同行!Apache Doris 首届线下峰会即将开启,最新议程公开!|即刻预约
    Discuz小鱼游戏风影传说商业GBK+UTF8版模板/DZ游戏网站模板
    6大热门开源自动化测试框架【建议收藏】
    中秋节祝福程序源代码分享:土地分类数据阈值筛选和重投影分类
    JavaScript的综合案例
  • 原文地址:https://blog.csdn.net/BHSZZY/article/details/126757295