一、配置文件
- xxxxxx:
- kafka:
- bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
- consumer:
- poll-timeout: 3000
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- auto-commit: false
- offset-reset: earliest
- records: 10
- session-timeout: 150000
- poll-interval: 360000
- request-timeout: 60000
二、KafkaConfig
- package com.xxxxxx.xxxxxx.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.core.ConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.listener.*;
-
- import java.util.HashMap;
- import java.util.Map;
-
- @Slf4j
- @Configuration
- @EnableKafka
- public class KafkaConfig {
-
- @Value("${xxxxxx.kafka.bootstrap-servers}")
- private String bootstrapServers;
-
- @Value("${xxxxxx.kafka.consumer.poll-timeout}")
- private Integer pollTimeout;
-
- @Value("${xxxxxx.kafka.consumer.key-deserializer}")
- private String keyDeserializer;
-
- @Value("${xxxxxx.kafka.consumer.value-deserializer}")
- private String valueDeserializer;
-
- @Value("${xxxxxx.kafka.consumer.auto-commit}")
- private String autoCommit;
-
- @Value("${xxxxxx.kafka.consumer.offset-reset}")
- private String offsetReset;
-
- @Value("${xxxxxx.kafka.consumer.records}")
- private Integer records;
-
- @Value("${xxxxxx.kafka.consumer.session-timeout}")
- private Integer sessionTimeout;
-
- @Value("${xxxxxx.kafka.consumer.poll-interval}")
- private Integer pollInterval;
-
- @Value("${xxxxxx.kafka.consumer.request-timeout}")
- private Integer requestTimeout;
-
- @Bean(name = "ixxxxxxKafkaListenerContainerFactory")
- public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- //并发数量
- factory.setConcurrency(3);
- //设置在消费者中等待记录的最大阻塞时间。
- factory.getContainerProperties().setPollTimeout(pollTimeout);
- //ack模式
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
- return factory;
- }
-
- private ConsumerFactory consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- private Map consumerConfigs() {
- Map props = new HashMap<>();
-
- //Kafka集群
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- //消费者组,只要group.id相同,就属于同一个消费者组
- //props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- //是否自动提交offset,默认为true,设置为false
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
- //key反序列化器
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
- //value反序列化器
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
- //一次消费信息条数
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
- //earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
- //session超时时间
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
- //消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员
- props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
- //客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求
- props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
-
- return props;
- }
- }
三、消费者
- @KafkaListener(
- containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",
- id = "itsId",
- idIsGroup = false,
- groupId = "itsGroupId",
- topics = "itsTopic"
- )
- public void consumerUser(
- @Payload String data,
- @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
- Acknowledgment ack,
- Consumer, ?> consumer
- ){
- try{
-
-
- }catch (Exception e){
-
- }
- ack.acknowledge();
- }