• SpringBoot Kafka消费者 多kafka配置


    一、配置文件

    1. xxxxxx:
    2. kafka:
    3. bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
    4. consumer:
    5. poll-timeout: 3000
    6. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    7. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    8. auto-commit: false
    9. offset-reset: earliest
    10. records: 10
    11. session-timeout: 150000
    12. poll-interval: 360000
    13. request-timeout: 60000

    二、KafkaConfig

    1. package com.xxxxxx.xxxxxx.config;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.apache.kafka.clients.consumer.ConsumerConfig;
    4. import org.springframework.beans.factory.annotation.Value;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. import org.springframework.kafka.annotation.EnableKafka;
    8. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    9. import org.springframework.kafka.config.KafkaListenerContainerFactory;
    10. import org.springframework.kafka.core.ConsumerFactory;
    11. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    12. import org.springframework.kafka.listener.*;
    13. import java.util.HashMap;
    14. import java.util.Map;
    15. @Slf4j
    16. @Configuration
    17. @EnableKafka
    18. public class KafkaConfig {
    19. @Value("${xxxxxx.kafka.bootstrap-servers}")
    20. private String bootstrapServers;
    21. @Value("${xxxxxx.kafka.consumer.poll-timeout}")
    22. private Integer pollTimeout;
    23. @Value("${xxxxxx.kafka.consumer.key-deserializer}")
    24. private String keyDeserializer;
    25. @Value("${xxxxxx.kafka.consumer.value-deserializer}")
    26. private String valueDeserializer;
    27. @Value("${xxxxxx.kafka.consumer.auto-commit}")
    28. private String autoCommit;
    29. @Value("${xxxxxx.kafka.consumer.offset-reset}")
    30. private String offsetReset;
    31. @Value("${xxxxxx.kafka.consumer.records}")
    32. private Integer records;
    33. @Value("${xxxxxx.kafka.consumer.session-timeout}")
    34. private Integer sessionTimeout;
    35. @Value("${xxxxxx.kafka.consumer.poll-interval}")
    36. private Integer pollInterval;
    37. @Value("${xxxxxx.kafka.consumer.request-timeout}")
    38. private Integer requestTimeout;
    39. @Bean(name = "ixxxxxxKafkaListenerContainerFactory")
    40. public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {
    41. ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
    42. factory.setConsumerFactory(consumerFactory());
    43. //并发数量
    44. factory.setConcurrency(3);
    45. //设置在消费者中等待记录的最大阻塞时间。
    46. factory.getContainerProperties().setPollTimeout(pollTimeout);
    47. //ack模式
    48. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    49. return factory;
    50. }
    51. private ConsumerFactory consumerFactory() {
    52. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    53. }
    54. private Map consumerConfigs() {
    55. Map props = new HashMap<>();
    56. //Kafka集群
    57. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    58. //消费者组,只要group.id相同,就属于同一个消费者组
    59. //props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    60. //是否自动提交offset,默认为true,设置为false
    61. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    62. //key反序列化器
    63. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    64. //value反序列化器
    65. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    66. //一次消费信息条数
    67. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
    68. //earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息
    69. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
    70. //session超时时间
    71. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    72. //消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员
    73. props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
    74. //客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求
    75. props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
    76. return props;
    77. }
    78. }

    三、消费者

    1. @KafkaListener(
    2. containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",
    3. id = "itsId",
    4. idIsGroup = false,
    5. groupId = "itsGroupId",
    6. topics = "itsTopic"
    7. )
    8. public void consumerUser(
    9. @Payload String data,
    10. @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    11. Acknowledgment ack,
    12. Consumer consumer
    13. ){
    14. try{
    15. }catch (Exception e){
    16. }
    17. ack.acknowledge();
    18. }

  • 相关阅读:
    738. 单调递增的数字
    论文阅读笔记——A deep tree-based model for software defect prediction
    Reinforcement Learning in the Era of LLMs: What is Essential? What is needed?
    CANopen Object 1000h: Device type 多设备信息
    【网络安全】护网
    经典场的量子化
    eBPF 极简开发工具介绍:eunomia-bpf
    SpringMVC【文件上传(原生方式上传、上传多文件、异步上传、跨服务器上传 ) 】(五)-全面详解(学习总结---从入门到深化)
    Spring状态机(FSM),让订单状态流转如丝般顺滑
    大语言模型RAG-将本地大模型封装为langchain的chat model(三)
  • 原文地址:https://blog.csdn.net/cndn20120225/article/details/134271762