• kafka广播消费组停机后未删除优化


    背景

    kafka广播消息的时候为了保证groupId不重复,再创建的时间采用前缀+时间戳的形式,这样可以保证每次启动的时候是创建的新的,但是

    会出现一个问题:就是每次停机或者重启都会新建一个应用实例,关闭应用后并不会删除kafka下面的消费组,导致消费组越来越多,目前

    我们有promethes监控kafka消息偏移,一直没有消费的消费组就会进行报警;

    解决思路

    既然是没有删除消费组就通过优雅停机,应用关闭前采用java的api操作kafka消费组,进行删除

    代码实现

    1)编写类实现DisposableBean接口,实现destroy方法,注意每个项目定义的id会不一样,此例子中 id = “cfgs-broadcast”

    package com.simo.vsim.cfgs.init;
     
    import com.alibaba.nacos.api.config.annotation.NacosValue;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
    import org.apache.kafka.common.KafkaFuture;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.stereotype.Component;
     
    import javax.annotation.Resource;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
     
    @Data
    @Component
    @Slf4j
    public class ApplicationListen implements InitializingBean, DisposableBean {
     
        @Resource
        private KafkaListenerEndpointRegistry registry;
     
        @NacosValue(value = "${spring.kafka.bootstrap-servers}", autoRefreshed = true)
        private String servers;
     
        @Override
        public void destroy()  {
            MessageListenerContainer listenerContainer = registry.getListenerContainer("cfgs-broadcast");
            String groupId = listenerContainer.getGroupId();
            Map<String, Object> props = new HashMap<>(1);
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
            AdminClient adminClient = AdminClient.create(props);
            DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(groupId));
            KafkaFuture resultFuture = deleteConsumerGroupsResult.all();
            try {
                resultFuture.get();
                log.info("kafka关闭消费组="+groupId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
            adminClient.close();
        }
     
        @Override
        public void afterPropertiesSet() {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    2)接收kafka广播消息的时候指定容器id,用于第一步通过id进行删除,id = “cfgs-broadcast”

    /**
     * groupId不一样代表广播模式,earliest 可能重复消费,latest可能漏消费
     * @param message
     * @param ack
     */
    @KafkaListener(containerFactory = "manualImmediateListenerContainerFactory" , topics = {"${kafka.topic.cfgs-broadcast}"},properties = {"auto.offset.reset=latest"},
            groupId = "cfgs-broadcast-" + "#{T(java.lang.System).currentTimeMillis()}",idIsGroup = false,id = "cfgs-broadcast")
    public void onMessageManualBroadcast(List<Object> message, Acknowledgment ack){
        message.forEach(item -> handleMsg(2,item));
        //直接提交offset
        ack.acknowledge();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    效果

    1)正常启动有这个消费组:cfgs-broadcast-1696754926097

    2)重新启动,通过日志显示已经删除(k8s默认是优雅停机)
    在这里插入图片描述
    如果是iead直接关闭下,不要一下子点击两下停止,点击一次是优雅停机,连续点击2次就是kill -9的效果,就无法看到效果
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/d36947cdd8f048acaa886eadafeaa34b.png

    3)查看kafka消费组,确实已经删除

  • 相关阅读:
    【JavaScript】用类的操作对CSDN社区管理菜单栏优化
    【千律】OpenCV基础:Hough圆检测
    TorchVision Transforms API 大升级,支持目标检测、实例/语义分割及视频类任务
    循环神经网络(RNN)
    系统安全分析与设计
    14. 机器学习 - KNN & 贝叶斯
    初识React.js
    Object.prototype.toString.call() 和 instanceOf 和 Array.isArray() 详解
    ChatGPT不到1分钟生成全部代码,你就说慌不慌吧?
    input输入多行文本:删除“首先 其次 此外 总的来说”
  • 原文地址:https://blog.csdn.net/wuyongde0922/article/details/133760358