• SpringBoot Kafka消费者 多kafka配置


    一、配置文件

    1. xxxxxx:
    2. kafka:
    3. bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
    4. consumer:
    5. poll-timeout: 3000
    6. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    7. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    8. auto-commit: false
    9. offset-reset: earliest
    10. records: 10
    11. session-timeout: 150000
    12. poll-interval: 360000
    13. request-timeout: 60000

    二、KafkaConfig

    1. package com.xxxxxx.xxxxxx.config;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.apache.kafka.clients.consumer.ConsumerConfig;
    4. import org.springframework.beans.factory.annotation.Value;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. import org.springframework.kafka.annotation.EnableKafka;
    8. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    9. import org.springframework.kafka.config.KafkaListenerContainerFactory;
    10. import org.springframework.kafka.core.ConsumerFactory;
    11. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    12. import org.springframework.kafka.listener.*;
    13. import java.util.HashMap;
    14. import java.util.Map;
    15. @Slf4j
    16. @Configuration
    17. @EnableKafka
    18. public class KafkaConfig {
    19. @Value("${xxxxxx.kafka.bootstrap-servers}")
    20. private String bootstrapServers;
    21. @Value("${xxxxxx.kafka.consumer.poll-timeout}")
    22. private Integer pollTimeout;
    23. @Value("${xxxxxx.kafka.consumer.key-deserializer}")
    24. private String keyDeserializer;
    25. @Value("${xxxxxx.kafka.consumer.value-deserializer}")
    26. private String valueDeserializer;
    27. @Value("${xxxxxx.kafka.consumer.auto-commit}")
    28. private String autoCommit;
    29. @Value("${xxxxxx.kafka.consumer.offset-reset}")
    30. private String offsetReset;
    31. @Value("${xxxxxx.kafka.consumer.records}")
    32. private Integer records;
    33. @Value("${xxxxxx.kafka.consumer.session-timeout}")
    34. private Integer sessionTimeout;
    35. @Value("${xxxxxx.kafka.consumer.poll-interval}")
    36. private Integer pollInterval;
    37. @Value("${xxxxxx.kafka.consumer.request-timeout}")
    38. private Integer requestTimeout;
    39. @Bean(name = "ixxxxxxKafkaListenerContainerFactory")
    40. public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {
    41. ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
    42. factory.setConsumerFactory(consumerFactory());
    43. //并发数量
    44. factory.setConcurrency(3);
    45. //设置在消费者中等待记录的最大阻塞时间。
    46. factory.getContainerProperties().setPollTimeout(pollTimeout);
    47. //ack模式
    48. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    49. return factory;
    50. }
    51. private ConsumerFactory consumerFactory() {
    52. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    53. }
    54. private Map consumerConfigs() {
    55. Map props = new HashMap<>();
    56. //Kafka集群
    57. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    58. //消费者组,只要group.id相同,就属于同一个消费者组
    59. //props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    60. //是否自动提交offset,默认为true,设置为false
    61. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    62. //key反序列化器
    63. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    64. //value反序列化器
    65. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    66. //一次消费信息条数
    67. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
    68. //earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息
    69. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
    70. //session超时时间
    71. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    72. //消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员
    73. props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
    74. //客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求
    75. props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
    76. return props;
    77. }
    78. }

    三、消费者

    1. @KafkaListener(
    2. containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",
    3. id = "itsId",
    4. idIsGroup = false,
    5. groupId = "itsGroupId",
    6. topics = "itsTopic"
    7. )
    8. public void consumerUser(
    9. @Payload String data,
    10. @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    11. Acknowledgment ack,
    12. Consumer consumer
    13. ){
    14. try{
    15. }catch (Exception e){
    16. }
    17. ack.acknowledge();
    18. }

  • 相关阅读:
    在ubuntu上安装ns2和nam(ubuntu16.04)
    【信号加密】基于傅里叶变换和小波变换对音频水印的嵌入、提取matlab代码
    YOLO物体检测-系列教程1:YOLOV1整体解读(预选框/置信度/分类任/回归任务/损失函数/公式解析/置信度/非极大值抑制)
    基于Java+SpringBoot+Vue前后端分离旅游管理系统设计和实现
    AI绘画:实例-利用Stable Diffusion ComfyUI实现多图连接:区域化提示词与条件设置
    链表-真正的动态数据结构
    Large Search Model: Redefining Search Stack in the Era of LLMs
    【C++】STL06 -list
    【构建并发程序】2-线程池-的注意事项与缺点
    SpringBoot+Vue项目线上买菜系统
  • 原文地址:https://blog.csdn.net/cndn20120225/article/details/134271762