While recently taking over a project, I found its Kafka consumer code was a mess: the consumer classes carried large blocks of configuration code, pulled messages by hand, and spun up their own worker threads to consume them. Not elegant.
The ideal approach is to maintain the Kafka consumer configuration in one place, so that defining a consumer bean only requires specifying the topic and group and implementing the consumption logic.
Since kafka-clients 2.2.4, a consumer can be marked directly with the @KafkaListener annotation (provided by Spring for Apache Kafka); attributes of the annotation override any same-named properties configured on the consumer factory. Usage is described below.
```xml
<kafka.client.version>2.3.1</kafka.client.version>
<spring-kafka.version>1.3.9.RELEASE</spring-kafka.version>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.client.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
</dependency>
```
```java
/**
 * @description Kafka configuration class
 */
@EnableKafka
@Configuration
public class KafkaConfig {

    /**
     * Consumer configuration. Because this application's consumers read from
     * different broker clusters, kafkaServerUrls is passed in as a parameter.
     */
    public Map<String, Object> consumerConfigs(String kafkaServerUrls) {
        Map<String, Object> props = Maps.newHashMap();
        // broker server addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrls);
        // periodic auto-commit of consumed offsets; set to false for batch consumption
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // session timeout: the heartbeat interval after which the broker evicts the consumer
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ConfigManager.getString("kafka.session.timeout.ms", "60000"));
        // maximum number of records returned per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, ConfigManager.getString("kafka.max.poll.records", "500"));
        // deserialization
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // default groupId; if not configured here, declare it on the annotation
        props.put(ConsumerConfig.GROUP_ID_CONFIG, ConfigManager.getString(BizConstants.KAFKA_GROUP_ID));
        // if Kafka has no initial offset, or the current offset no longer exists on the
        // server (e.g. the data has been deleted), reset to the latest offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // encryption
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("security.protocol", "SASL_PLAINTEXT");
        // credentials
        props.put("sasl.jaas.config", ConfigManager.getString(BizConstants.KAFKA_SASL_JAAS_CONFIG));
        return props;
    }

    /**
     * Consumer factory.
     */
    public ConsumerFactory<String, String> initConsumerFactory(String kafkaServerUrls) {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaServerUrls));
    }

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory(String kafkaServerUrls) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(initConsumerFactory(kafkaServerUrls));
        factory.setConcurrency(ConfigManager.getInteger("kafka.concurrency", 1));
        // batch listener
        factory.setBatchListener(true);
        // commit offsets in BATCH ack mode
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
        factory.getContainerProperties().setPollTimeout(ConfigManager.getInteger("kafka.poll.timeout", 3000));
        return factory;
    }

    /**
     * Factory A, bound to cluster A.
     */
    @Bean(name = "AKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> moliKafkaListenerContainerFactory() {
        return initKafkaListenerContainerFactory(ConfigManager.get("gameCenter.brokers", "moli-kafka.prd.lan:9092"));
    }

    /**
     * Factory B, bound to cluster B.
     */
    @Bean(name = "BKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> xiangrikuiKafkaListenerContainerFactory() {
        return initKafkaListenerContainerFactory(ConfigManager.get("minigame.brokers", "xiangrikui-kafka.prd.lan:9092"));
    }
}
```
```java
@Slf4j
@Component
public class MyKafkaListener implements BatchMessageListener<String, String> {

    public static final Integer BIZ_CODE = 2;

    @Autowired
    private KafkaMsgHandleService kafkaMsgHandleService;

    @Autowired
    @Qualifier("dataDiskThreadPool")
    private ThreadPoolTaskExecutor executor;

    /**
     * Invoked with data from kafka.
     *
     * @param batch the data to be processed.
     */
    @Override
    @KafkaListener(
            containerFactory = "AKafkaListenerContainerFactory",
            topics = "${my.topic}",
            groupId = "${my.consumer.group}"
    )
    public void onMessage(List<ConsumerRecord<String, String>> batch) {
        try {
            final List<String> msgBatch = batch.stream().map(ConsumerRecord::value).collect(Collectors.toList());
            msgBatch.forEach(this::processSingleRecord);
            if (EnvUtils.isNotPrdPressEnv()) {
                log.info("received kafka batch, size={} content:{}", msgBatch.size(), JSON.toJSONString(msgBatch));
            }
        } catch (Exception e) {
            log.error("KafkaListener_kafka_consume_error.", e);
        }
    }

    private void processSingleRecord(String data) {
        executor.submit(() -> {
            if (!ConfigManager.getBoolean("my.kafka.switch", true)) {
                log.warn("KafkaListener_turn_off_drop_message.");
                return;
            }
            kafkaMsgHandleService.handle(data, BIZ_CODE);
        });
    }
}
```
Different consumer configurations call for different KafkaListener implementations. The code above targets an auto-commit / batch-consumption configuration, so it implements the BatchMessageListener interface. The official Spring for Apache Kafka documentation provides the following listener interfaces for the various configurations (a manual-commit example follows the list):
```java
public interface MessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
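For instance, to commit offsets by hand you would implement AcknowledgingMessageListener. A minimal sketch, assuming a hypothetical manualAckContainerFactory whose container is configured with AckMode.MANUAL and a non-batch listener:

```java
@Slf4j
@Component
public class ManualAckKafkaListener implements AcknowledgingMessageListener<String, String> {

    @Override
    @KafkaListener(
            containerFactory = "manualAckContainerFactory", // hypothetical factory with AckMode.MANUAL
            topics = "${my.topic}",
            groupId = "${my.consumer.group}"
    )
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            // handle the record, then commit its offset explicitly
            log.info("offset={} value={}", record.offset(), record.value());
            ack.acknowledge();
        } catch (Exception e) {
            // offset not committed; the record may be redelivered after a rebalance or restart
            log.error("manual_ack_consume_error.", e);
        }
    }
}
```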
```java
public @interface KafkaListener {

    /**
     * Listener id.
     */
    String id() default "";

    /**
     * Listener container factory.
     */
    String containerFactory() default "";

    /**
     * Topics to listen to.
     */
    String[] topics() default {};

    /**
     * Regular expression matching the topics to listen to.
     */
    String topicPattern() default "";

    /**
     * Explicit topic/partition assignments.
     */
    TopicPartition[] topicPartitions() default {};

    /**
     * Error handler.
     */
    String errorHandler() default "";

    /**
     * Consumer group id.
     */
    String groupId() default "";

    /**
     * Whether to use the id as the groupId.
     */
    boolean idIsGroup() default true;
}
```
3.1 id: the listener id
(1) The id attribute names the consumer threads. For example:
With id = "consumer-id5", the thread name looks like:
2022-8-8 17:27:30 c.d.b.k.KafkaListeners 120 [INFO] thread:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 consumed
Without an id, the thread name looks like:
2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] thread:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7
(2) Note that listener ids must be unique within the same container; a duplicate fails registration with:
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id...
(3) The id overrides the consumer factory's group id.
For example, if the consumer factory is configured with kafka.consumer.group-id=BASE-DEMO, that is the default consumer group for the container; but with @KafkaListener(id = "consumer-id7"), this consumer's group becomes consumer-id7. If you do not want the id used as the group id, set idIsGroup = false and the default group id applies again.
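For example (BASE-TOPIC is a placeholder topic name):

```java
// no groupId set: the id doubles as the consumer group, so this listener joins group "consumer-id7"
@KafkaListener(id = "consumer-id7", topics = "BASE-TOPIC")

// idIsGroup = false: the id only names the consumer threads,
// and the factory's default group (BASE-DEMO) is used instead
@KafkaListener(id = "consumer-id7", idIsGroup = false, topics = "BASE-TOPIC")
```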
(4) If the groupId attribute is configured, it takes the highest priority:

```java
@KafkaListener(id = "consumer-id5", idIsGroup = false, topics = "My_TOPIC", groupId = "groupId-test")
```
3.2 The listener's topics
Topics can be configured in three mutually exclusive ways: topics, topicPattern, or topicPartitions.
(1) The topics attribute is the simplest and accepts multiple topics:
```java
@KafkaListener(
        topics = {"SHI_TOPIC3", "SHI_TOPIC4"},
        groupId = "${gameCenter.consumer.group}"
)
```
(2) topicPattern supports regular expressions:
```java
@KafkaListener(id = "pullPatternMsg", topicPattern = "rx_.*_.*_thing.*", concurrency = "1")
public void pullPatternMsg(@Payload String data,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ByteBuffer key,
        Acknowledgment ack, // manual offset commit
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) long offSet,
        Consumer<?, ?> consumer // the underlying consumer
) {
    ...
}
```
(3) topicPartitions assigns partitions explicitly: the listener is configured with specific topics and partitions (and, optionally, initial offsets):
- @KafkaListener(id = "thing2", topicPartitions =
- { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
- @TopicPartition(topic = "topic2", partitions = "0",
- partitionOffsets = @PartitionOffset(partition = "1",
- initialOffset = "100"))
- })
- public void listen(ConsumerRecord, ?> record) {
- ...
- }
The example above listens to partitions 0 and 1 of topic1 and to partition 0 of topic2, while partition 1 of topic2 is consumed starting from offset 100.
3.3 errorHandler: the error handler
You can try/catch manually inside the consumer, or implement KafkaListenerErrorHandler to reuse the error-handling logic:
- @Component("kafkaErrorHandler")
- public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
- @Override
- public Object handleError(Message> message, ListenerExecutionFailedException exception) {
- return null;
- }
-
- @Override
- public Object handleError(Message> message, ListenerExecutionFailedException exception, Consumer, ?> consumer) {
- //do someting
- return null;
- }
- }
When wiring it up, set errorHandler to the handler's bean name:
```java
@KafkaListener(
        containerFactory = "moliKafkaListenerContainerFactory",
        topics = "${gameCenter.topic}",
        groupId = "${gameCenter.consumer.group}",
        errorHandler = "kafkaErrorHandler"
)
```
3.4 containerFactory: the listener container factory
This is where the Kafka configuration lives:
- @Bean(name = "moliKafkaListenerContainerFactory")
- public KafkaListenerContainerFactory
> moliKafkaListenerContainerFactory() { - return initKafkaListenerContainerFactory(ConfigManager.get("gameCenter.brokers", "moli-kafka.prd.lan:9092"));
- }
- @KafkaListener(
- containerFactory = "moliKafkaListenerContainerFactory",
- topics = "${gameCenter.topic}",
- groupId = "${gameCenter.consumer.group}"
- )
Kafka has three partition assignment strategies:
1. RoundRobin
2. Range
3. Sticky

1. RoundRobin
(1) Put the partitions of all topics into a single queue, sorted by name hashcode.
(2) Put the consumers into a circular queue, sorted by name hashcode.
(3) Cycle through the consumers, popping one partition from the partition queue for the current consumer, then moving on to the next consumer and popping the next partition, until the partition queue is empty (a sketch of this hand-out follows).
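A minimal sketch of the round-robin hand-out described above (an illustration of the steps, not Kafka's actual RoundRobinAssignor):

```java
import java.util.*;

public class RoundRobinAssignDemo {

    /** Deal partitions out one at a time, cycling through the consumers. */
    static Map<String, List<String>> assign(List<String> partitions, List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        consumers.forEach(c -> result.put(c, new ArrayList<>()));
        int i = 0;
        for (String partition : partitions) {
            result.get(consumers.get(i++ % consumers.size())).add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        // 5 partitions across 2 consumers -> 3/2
        List<String> partitions = Arrays.asList("topicA-0", "topicA-1", "topicA-2", "topicB-0", "topicB-1");
        System.out.println(assign(partitions, Arrays.asList("consumer-0", "consumer-1")));
        // {consumer-0=[topicA-0, topicA-2, topicB-1], consumer-1=[topicA-1, topicB-0]}
    }
}
```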
2. Range
(1) Suppose topicA has 4 partitions, topicB has 5, and topicC has 6, with 3 consumers in the group.
(2) Iterate over the topics' partition sets, starting with topicA, and hand its partitions to the 3 consumers in turn. The current consumer's share follows this rule: with M partitions of the current topic still unassigned and N consumers still waiting for a share, the current consumer receives M % N == 0 ? M / N : M / N + 1 partitions. By this formula the 3 consumers receive 2/1/1 partitions of topicA: topicA-partition-0/1 go to consumer-0, topicA-partition-2 to consumer-1, and topicA-partition-3 to consumer-2.
(3) topicB and topicC are then distributed to the 3 consumers by the same rule, yielding 2/2/1 and 2/2/2 respectively (see the sketch below).
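A minimal sketch of the per-topic Range hand-out (equivalent to the formula above: the first M % N consumers receive one extra partition; again an illustration, not Kafka's actual RangeAssignor):

```java
import java.util.*;

public class RangeAssignDemo {

    /** For one topic, the first (partitionCount % consumers) consumers receive one extra partition. */
    static Map<String, List<String>> assign(String topic, int partitionCount, List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        int n = consumers.size();
        int base = partitionCount / n;   // every consumer gets at least this many
        int extra = partitionCount % n;  // the first 'extra' consumers get one more
        int next = 0;
        for (int i = 0; i < n; i++) {
            int count = base + (i < extra ? 1 : 0);
            List<String> assigned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                assigned.add(topic + "-partition-" + next++);
            }
            result.put(consumers.get(i), assigned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-0", "consumer-1", "consumer-2");
        System.out.println(assign("topicA", 4, consumers)); // 2/1/1
        System.out.println(assign("topicB", 5, consumers)); // 2/2/1
        System.out.println(assign("topicC", 6, consumers)); // 2/2/2
    }
}
```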
3. Sticky
Kafka introduced the Sticky assignment strategy in version 0.11, with two goals:
1. Assignments should be as balanced as possible: the numbers of partitions assigned to the consumers differ by at most one.
2. Assignments should stay as close as possible to the previous assignment, so that when a consumer in the group fails, the surviving consumers keep the partitions they already hold as far as possible.
When the two goals conflict, the first takes precedence over the second.
Sticky's initial assignment resembles Range's but is not identical: with 7 partitions and 3 consumers the shares are still 3/2/2, but where Range hands partitions out in sorted order, Sticky hands them out randomly.
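The strategy is selected through the partition.assignment.strategy consumer property; a minimal sketch of setting it in the consumerConfigs method shown earlier (RangeAssignor is the kafka-clients default):

```java
// pick the Sticky assignor instead of the default RangeAssignor;
// org.apache.kafka.clients.consumer also provides RoundRobinAssignor
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
```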
References:
@KafkaListener详解与使用, 石臻臻的杂货铺 (DevPress)