• 编码技巧——@KafkaListener的使用


    最近在接手某个项目代码时,发现关于Kafka的consumer相关的代码写的很乱,consumer中写了大量的配置的代码,并且手动的拉取消息,并开启线程消费,不够优雅;

    理想的做法是单独维护kafka的consumer配置,在定义consumer的bean时,指定topic和group,仅实现消费逻辑;

    从kafka-clients的2.2.4版本开始,可以直接使用@KafkaListener注解来标记消费者,注解的属性将覆盖在消费者工厂中配置的具有相同名称的所有属性,下面介绍使用方法;

    1. <kafka.client.version>2.3.1kafka.client.version>
    2. <spring-kafka.version>1.3.9.RELEASEspring-kafka.version>
    3. <dependency>
    4. <groupId>org.apache.kafkagroupId>
    5. <artifactId>kafka-clientsartifactId>
    6. <version>${kafka.client.version}version>
    7. dependency>
    8. <dependency>
    9. <groupId>org.springframework.kafkagroupId>
    10. <artifactId>spring-kafkaartifactId>
    11. <version>${spring-kafka.version}version>
    12. dependency>

    1. 定义消费者工厂ConcurrentKafkaListenerContainerFactory

    1. /**
    2. * @description kafka配置类
    3. */
    4. @EnableKafka
    5. @Configuration
    6. public class KafkaConfig {
    7. /**
    8. * 消费者配置 因为当前应用的多个consumer消费不同的broker集群 因此这里把kafkaServerUrls提出来当做入参
    9. * @param kafkaServerUrls
    10. * @return
    11. */
    12. public Map consumerConfigs(String kafkaServerUrls) {
    13. Map props = Maps.newHashMap();
    14. // broker server地址
    15. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrls);
    16. // 自动提交(按周期)已消费offset 批量消费下设置false
    17. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    18. // session超时时间 broker提出consumer的心跳间隔
    19. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ConfigManager.getString("kafka.session.timeout.ms", "60000"));
    20. // 最大消息拉取条数
    21. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, ConfigManager.getString("kafka.max.poll.records", "500"));
    22. // 序列化
    23. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    24. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    25. // 默认的groupId 未配置则在注解中声明
    26. props.put(ConsumerConfig.GROUP_ID_CONFIG, ConfigManager.getString(BizConstants.KAFKA_GROUP_ID));
    27. // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
    28. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    29. // 加密
    30. props.put("sasl.mechanism", "SCRAM-SHA-256");
    31. props.put("security.protocol", "SASL_PLAINTEXT");
    32. // 账号密码
    33. props.put("sasl.jaas.config", ConfigManager.getString(BizConstants.KAFKA_SASL_JAAS_CONFIG));
    34. return props;
    35. }
    36. /**
    37. * 消费者工厂类
    38. *
    39. * @param kafkaServerUrls
    40. * @return
    41. */
    42. public ConsumerFactory initConsumerFactory(String kafkaServerUrls) {
    43. return new DefaultKafkaConsumerFactory(consumerConfigs(kafkaServerUrls));
    44. }
    45. public KafkaListenerContainerFactory> initKafkaListenerContainerFactory(String kafkaServerUrls) {
    46. ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    47. factory.setConsumerFactory(initConsumerFactory(kafkaServerUrls));
    48. factory.setConcurrency(ConfigManager.getInteger("kafka.concurrency", 1));
    49. // listener类型为批量batch类型
    50. factory.setBatchListener(true);
    51. // offset提交模式为batch
    52. factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    53. factory.getContainerProperties().setPollTimeout(ConfigManager.getInteger("kafka.poll.timeout", 3000));
    54. return factory;
    55. }
    56. /**
    57. * 工厂A 绑定A集群
    58. * @return
    59. */
    60. @Bean(name = "AKafkaListenerContainerFactory")
    61. public KafkaListenerContainerFactory> moliKafkaListenerContainerFactory() {
    62. return initKafkaListenerContainerFactory(ConfigManager.get("gameCenter.brokers", "moli-kafka.prd.lan:9092"));
    63. }
    64. /**
    65. * 工厂B 绑定B集群
    66. * @return
    67. */
    68. @Bean(name = "BKafkaListenerContainerFactory")
    69. public KafkaListenerContainerFactory> xiangrikuiKafkaListenerContainerFactory() {
    70. return initKafkaListenerContainerFactory(ConfigManager.get("minigame.brokers", "xiangrikui-kafka.prd.lan:9092"));
    71. }
    72. }

    2. 定义消费者KafkaListener

    1. @Slf4j
    2. @Component
    3. public class MyKafkaListener implements BatchMessageListener {
    4. public static final Integer BIZ_CODE = 2;
    5. @Autowired
    6. private KafkaMsgHandleService kafkaMsgHandleService;
    7. @Autowired
    8. @Qualifier("dataDiskThreadPool")
    9. private ThreadPoolTaskExecutor executor;
    10. /**
    11. * Invoked with data from kafka.
    12. *
    13. * @param batch the data to be processed.
    14. */
    15. @Override
    16. @KafkaListener(
    17. containerFactory = "AKafkaListenerContainerFactory",
    18. topics = "${my.topic}",
    19. groupId = "${my.consumer.group}"
    20. )
    21. public void onMessage(List> batch) {
    22. try {
    23. final List msgBatch = batch.stream().map(ConsumerRecord::value).collect(Collectors.toList());
    24. msgBatch.forEach(this::processSingleRecord);
    25. if (EnvUtils.isNotPrdPressEnv()) {
    26. log.info("本次接收的kafka消息体size={} content:{}", msgBatch.size(), JSON.toJSONString(msgBatch));
    27. }
    28. } catch (Exception e) {
    29. log.error("KafkaListener_kafka_consume_error.", e);
    30. }
    31. }
    32. private void processSingleRecord(String data) {
    33. executor.submit(() -> {
    34. if (!ConfigManager.getBoolean("my.kafka.switch", true)) {
    35. log.warn("KafkaListener_turn_off_drop_message.");
    36. return;
    37. }
    38. kafkaMsgHandleService.handle(data, BIZ_CODE);
    39. });
    40. }
    41. }

    对于不同的消费者配置,可以实现不同的KafkaListener来处理消息,上面的代码就针对自动提交/批量消费的消费者配置,实现BatchMessageListener接口;可参考下面官方文档Spring for Apache Kafka给出的几种针对不同配置的消费者接口;

    1. public interface MessageListener {
    2. void onMessage(ConsumerRecord data);
    3. }
    4. public interface AcknowledgingMessageListener {
    5. void onMessage(ConsumerRecord data, Acknowledgment acknowledgment);
    6. }
    7. public interface ConsumerAwareMessageListener extends MessageListener {
    8. void onMessage(ConsumerRecord data, Consumer consumer);
    9. }
    10. public interface AcknowledgingConsumerAwareMessageListener extends MessageListener {
    11. void onMessage(ConsumerRecord data, Acknowledgment acknowledgment, Consumer consumer);
    12. }
    13. public interface BatchMessageListener {
    14. void onMessage(List> data);
    15. }
    16. public interface BatchAcknowledgingMessageListener {
    17. void onMessage(List> data, Acknowledgment acknowledgment);
    18. }
    19. public interface BatchConsumerAwareMessageListener extends BatchMessageListener {
    20. void onMessage(List> data, Consumer consumer);
    21. }
    22. public interface BatchAcknowledgingConsumerAwareMessageListener extends BatchMessageListener {
    23. void onMessage(List> data, Acknowledgment acknowledgment, Consumer consumer);
    24. }

    3. @KafkaListener注解属性说明

    1. public @interface KafkaListener {
    2. /**
    3. * 监听器id
    4. */
    5. String id() default "";
    6. /**
    7. * 监听器工厂
    8. */
    9. String containerFactory() default "";
    10. /**
    11. * 监听器topics
    12. */
    13. String[] topics() default {};
    14. /**
    15. * 监听器topics匹配正则表达式
    16. */
    17. String topicPattern() default "";
    18. /**
    19. * 监听器分区
    20. */
    21. TopicPartition[] topicPartitions() default {};
    22. /**
    23. * 异常处理器
    24. */
    25. String errorHandler() default "";
    26. /**
    27. * 分组id
    28. */
    29. String groupId() default "";
    30. /**
    31. * 是否使用id作为groupId
    32. */
    33. boolean idIsGroup() default true;
    34. }

    3.1 id 监听器的id

    (1)监听器id属性,可用来命名消费者线程,如下:

    填写id = "consumer-id5",线程名如下:

        2022-8-8 17:27:30 c.d.b.k.KafkaListeners 120 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消费

    没有填写ID,线程名如下:

        2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 线程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

    (2)需要注意,在相同容器中,监听器ID不能重复

    否则会报错:

    Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id...

    (3)会覆盖消费者工厂的消费组GroupId

    例如,消费者工厂属性配置了消费组kafka.consumer.group-id=BASE-DEMO,它是该容器中的默认消费组;但是如果设置了 @KafkaListener(id = "consumer-id7"),那么当前消费者的消费组就是consumer-id7;当然如果你不想要他作为groupId的话可以设置属性idIsGroup = false,那么还是会使用默认的GroupId;

    (4)如果配置了属性groupId,则groupId优先级最高

     @KafkaListener(id = "consumer-id5", idIsGroup = false, topics = "My_TOPIC", groupId = "groupId-test")

    3.2 监听器的topic

    关于topic的配置有3种,topics、topicPattern、topicPartitions 三选一;

    (1)topics属性,这种方式最简单,可以指定多个topic

    1. @KafkaListener(
    2. topics = {"SHI_TOPIC3","SHI_TOPIC4"},
    3. groupId = "${gameCenter.consumer.group}"
    4. )

    (2)topicPattern,支持表达式

    1. @KafkaListener(id = "pullPatternMsg", topicPattern = "rx_.*_.*_thing.*", concurrency = "1")
    2. public void pullPatternMsg(@Payload String data,
    3. @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    4. @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ByteBuffer key,
    5. Acknowledgment ack, //手动提交offset
    6. @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    7. @Header(KafkaHeaders.OFFSET) long offSet,
    8. Consumer consumer //消费者
    9. )

    (3)topicPartitions显式分区分配,可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

    1. @KafkaListener(id = "thing2", topicPartitions =
    2. { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
    3. @TopicPartition(topic = "topic2", partitions = "0",
    4. partitionOffsets = @PartitionOffset(partition = "1",
    5. initialOffset = "100"))
    6. })
    7. public void listen(ConsumerRecord record) {
    8. ...
    9. }

    上面例子意思是 监听topic1的0,1分区;监听topic2的第0分区,并且第1分区从offset为100的开始消费;

    3.3 errorHandler异常处理器

    可以在consumer中手动try/catch,也可以实现KafkaListenerErrorHandler复用异常处理逻辑;

    1. @Component("kafkaErrorHandler")
    2. public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    3.     @Override
    4.     public Object handleError(Message message, ListenerExecutionFailedException exception) {
    5.         return null;
    6.     }
    7.  
    8.     @Override
    9.     public Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer) {
    10.         //do someting
    11.         return null;
    12.     }
    13. }

    调用的时候errorHandler的值填写beanName,如下:

    1. @KafkaListener(
    2. containerFactory = "moliKafkaListenerContainerFactory",
    3. topics = "${gameCenter.topic}",
    4. groupId = "${gameCenter.consumer.group}",
    5. errorHandler = "kafkaErrorHandler"
    6. )

    3.4 containerFactory监听器工厂

    kafka的配置就放在这里;

    1. @Bean(name = "moliKafkaListenerContainerFactory")
    2. public KafkaListenerContainerFactory> moliKafkaListenerContainerFactory() {
    3. return initKafkaListenerContainerFactory(ConfigManager.get("gameCenter.brokers", "moli-kafka.prd.lan:9092"));
    4. }
    1. @KafkaListener(
    2. containerFactory = "moliKafkaListenerContainerFactory",
    3. topics = "${gameCenter.topic}",
    4. groupId = "${gameCenter.consumer.group}"
    5. )

    补充:kafka的消费者分区分配策略

    kafka有三种分区分配策略

    1. RoundRobin

    2. Range

    3. Sticky

    1. RoundRobin

    (1)把所有topic的分区partition放入一个队列中,按照name的hashcode进行排序;

    (2)把consumer放在一个循环队列,按照name的hashcode进行排序;

    (3)循环遍历consumer,从partition队列pop出一个partition,分配给当前consumer;以此类推,取下一个consumer,继续从partition队列pop出来分配给当前consumer;直到partition队列中的元素被分配完;

    2. Range

    (1)假设topicA有4个分区,topicB有5个分区,topicC有6个分区;一共有3个consumer;

    (2)遍历3个topic的分区集合,先取topicA的分区集合,然后准备依次给3个consumer分配分区;对于第1个consumer,所分配的分区数量根据以下公式:假设消费者数量为N,当前主题剩下的分区数量为M,则当前消费者应该分配的分区数量 = M%N==0? M/N +1 : M/N ;按照公式,3个消费者应该分配的分区数量依次为:2/1/1,即topicA-partition-0/1分配给consumer-0,topicA-partition-2分配给consumer-1,topicA-partition-3分配给consumer-2;

    (3)按照上述规则按序把topicB和topicC的分区分配给3个consumer;依次为:2/2/1,2/2/2;

    3. Sticky

    kafka在0.11版本引入了Sticky分区分配策略,它的两个主要目的是:

    1. 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;

    2. 分区的分配尽可能的与上次分配的保持相同;

    当两者发生冲突时,第一个目标优先于第二个目标;

    粘性分区是由Kafka从0.11x版本开始引入的分配策略,首先会尽量均衡的分配分区到消费者上面,在出现同一消费组内消费者出现问题的时候,会尽量保持原来的分配的分区不变;

    Sticky分区初始分配分区的方法与Range相似,但是不同;拿7个分区3个消费者为例,消费者消费的分区依旧是3/2/2,但是不同与Range的是Range分区是排好序的,但是Sticky分区是随机的;

    参考:

    @KafkaListener详解与使用_python_石臻臻的杂货铺-DevPress官方社区

    kafka之@KafkaListener

    Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例

    Spring for Apache Kafka

    kafka的消费者分区分配策略

    Kafka消费者分区分配策略

  • 相关阅读:
    VoxEdit 主题创作大赛:将 90 年代的复古元素带入 Web3
    SLAM中的子图
    若依框架使用mars3d的环境配置,地球构建
    k8s 基础
    小米手环6解决天气未同步问题
    基于FastAPI的文件上传和下载
    《大数据之路:阿里巴巴大数据实践》-第3篇 数据管理篇 -第13章 计算管理
    软件测试需要学习什么 3分钟带你了解软测的学习内容
    [计算机毕业设计]基于SM9的密钥交换方案的实现与应用
    【Java】Java 17 新特性
  • 原文地址:https://blog.csdn.net/minghao0508/article/details/126231353