码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • SpringBoot——》@KafkaListener


    推荐链接:
        总结——》【Java】
        总结——》【Mysql】
        总结——》【Redis】
        总结——》【Spring】
        总结——》【SpringBoot】
        总结——》【MyBatis、MyBatis-Plus】

    SpringBoot——》@KafkaListener

    • 一、监听器id
      • 1、在相同容器中,监听器id不能重复
      • 2、使用默认配置的消费组
      • 3、使用自定义的消费组
    • 二、监听器工厂
      • 1、定义kafkaListenerContainerFactory
      • 2、配置containerFactory参数
    • 三、监听器topics
      • 1、固定监听topics
      • 2、动态监听topics
    • 四、监听器topics匹配正则表达式
    • 五、监听器分区
    • 六、异常处理器
      • 1、实现KafkaListenerErrorHandler
      • 2、配置errorHandler参数
    • 七、分组id
    • 八、是否使用id作为groupId

    方法功能
    String id() default “”;监听器id
    String containerFactory() default “”;监听器工厂
    String[] topics() default {};监听器topics
    String topicPattern() default “”;监听器topics匹配正则表达式
    TopicPartition[] topicPartitions() default {};监听器分区
    String errorHandler() default “”;异常处理器
    String groupId() default “”;分组id
    boolean idIsGroup() default true;是否使用id作为groupId

    一、监听器id

    @KafkaListener(id = "listenerForSyncEsfCommunity", topics = "test_topic1")
    
    • 1

    1、在相同容器中,监听器id不能重复

    如果ID重复,会报错Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

    2、使用默认配置的消费组

    kafka.consumer.group-id = xxxxx

    // 消费组为xxxxx
    @KafkaListener(id = "listenerForSyncEsfCommunity",idIsGroup = false)
    
    • 1
    • 2

    3、使用自定义的消费组

    // 方式一:消费组为listenerForSyncEsfCommunity
    @KafkaListener(id = "listenerForSyncEsfCommunity")
    
    // 方式二:消费组为groupId-test
    @KafkaListener(id = "listenerForSyncEsfCommunity",idIsGroup = false,groupId = "groupId-test")
    
    • 1
    • 2
    • 3
    • 4
    • 5

    二、监听器工厂

    1、定义kafkaListenerContainerFactory

    @Bean("kafkaListenerContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // consumerGroupId为空时,会用默认的groupId
        factory.setConsumerFactory(consumerFactory("g1"));
        factory.setConcurrency(4);
        // 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2、配置containerFactory参数

    @KafkaListener(id = "listenerForSyncEsfCommunity", topics = "test_topic1", containerFactory = "kafkaListenerContainerFactory")
    
    • 1

    三、监听器topics

    1、固定监听topics

    // 指定多个topic
    @KafkaListener(id = "listenerForSyncEsfCommunity", topics = {"test_topic1","test_topic2"})
    
    • 1
    • 2

    2、动态监听topics

    自定义配置:kafka.consumer.topics=topic1,topic2

    // Spring的SpEl表达式
    @KafkaListener(topics = "#{'${kafka.consumer.topics}'.split(',')}")
    
    • 1
    • 2

    四、监听器topics匹配正则表达式

    @KafkaListener(id = "listenerForSyncEsfCommunity", topicPattern = "test_.*topic.*")
    
    • 1

    五、监听器分区

    @KafkaListener(id = "listenerForSyncEsfCommunity",  topicPartitions =
            { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
              @TopicPartition(topic = "topic2", partitions = "0")
            })
    
    • 1
    • 2
    • 3
    • 4

    六、异常处理器

    异常处理有2种方式:

    • 方式一:consumer中手动try/catch
    • 方式二:实现KafkaListenerErrorHandler,重写异常处理逻辑

    1、实现KafkaListenerErrorHandler

    @Component("kafkaErrorHandler")
        public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
                return null;
            }
    
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
                //TODO
                return null;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2、配置errorHandler参数

    // 调用的时候errorHandler的值填写beanName
    @KafkaListener(id = "listenerForSyncEsfCommunity", topics = "topic1",errorHandler = "kafkaErrorHandler")
    
    • 1
    • 2

    七、分组id

    参考监听器id

    八、是否使用id作为groupId

    参考监听器id

  • 相关阅读:
    androidStudio第一次运行报错无法运行
    《JAVA设计模式系列》责任链模式
    自动驾驶感知算法实战15——纯视觉感知和传感器融合方案对比,特斯拉九头蛇的进化
    HarmonyOS列表组件
    AOP——基本概念、底层原理
    网络性能概述
    ALGO开发源码【node服务】
    使用metrics-server监控k8s的资源指标
    CentOS7.9 下修改MariaDB访问端口不能访问
    Helplook VS Document360:哪个更适合知识库管理?
  • 原文地址:https://blog.csdn.net/weixin_43453386/article/details/128189386
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号