• Spring-Kafka系列(3)—— SpringKafka消费者监听MessageListener


    2.3 SpringKafka消费者

    2.3.1 Kafka消息监听器MessageListener

    之前已经介绍了通过kafka-topic-consumer.shkafka-tool工具来消费数据。下面介绍SpringKafka消费数据的方式——kafka消息监听器。

    Kafka的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。GenericMessageListenerSpringKafka的消息监听器接口,也是一个函数式接口,利用接口的onMessage方法可以实现消费数据。

    public interface GenericMessageListener<T> {
    	void onMessage(T data);
        
    	default void onMessage(T data, @Nullable Acknowledgment acknowledgment) {
    		throw new UnsupportedOperationException("Container should never call this");
    	}
    
    	default void onMessage(T data, Consumer<?, ?> consumer) {
    		throw new UnsupportedOperationException("Container should never call this");
    	}
    
    	default void onMessage(T data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
    		throw new UnsupportedOperationException("Container should never call this");
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    基于此接口可以实现单条数据消息监听器接口MessageListenen、多条数据消息监听器接口BatchMessageListener、带ACK机制的消息监听器AcknowledgingMessageListenerBatchAcknowledgingMessageListener

    MessageListener
    GenericMessageListener
    单条数据监听器
    BatchMessageListener
    批量数据监听器
    AckowledgingMessageListenenr
    带ACK的单条数据监听器
    BatchAckowledgingMessageListener
    带ACK机制的批量数据监听器

    2.3.2 消息监听容器与容器工厂

    消息监听器MessageListener是由消费监听器容器MessageListenerContainer接口来承载,使用setupMessageListenner()方法启动一个监听器。其中还有定义了操作消息的resume()pause()等方法。

    public interface MessageListenerContainer extends SmartLifecycle {
        // 启动一个消息监听器
    	void setupMessageListener(Object messageListener);
        // 获取消费者的指标信息
        Map<String, Map<MetricName, ? extends Metric>> metrics();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    spring-kafka提供了两个容器KafkaMessageListenerContainerConcurrentMessageListenerContainer

    GenericMessageListenerContainer
    MessageListenerContainer
    AbstractMessageListenerContainer
    KafkaMessageListenerConatiner
    ConcurrentMessageListenerCOntainer

    消息监听器容器由容器工厂KafkaListenerContainerFactory统一创建并管理

    public interface KafkaListenerContainerFactory<C extends MessageListenerContainer> {
        // 根据endpoint创建监听器容器
        C createListenerContainer(KafkaListenerEndpoint endpoint);
        // 根据topic、partition和offset的配置
    	C createContainer(TopicPartitionOffset... topicPartitions);
    	// 根据topic创建监听器容器
        C createContainer(String... topics);
        // 根据topic的正则表达式创建监听器容器
    	C createContainer(Pattern topicPattern);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    spring-kafka提供了监听器容器工厂ConcurrentKafkaListenerContainerFactory,其有两个重要的配置

    ContainerPropertiesConsumerFactory

    AbstractKafkaListenerContainerFactory
    KafkaListenerContainerFactory
    ConcurrentKafkaListenerContainerFactory

    ContainerProperties定义了要消费消息的topic,消息处理的MessageListener等信息。

    因此要实现一个消息监听器的流程如下:

    KafkaProperties
    ConsumerFactory
    ContainerProperties
    ContainerFactory
    ListenerContainer
    MessageListener

    2.3.3 非注解式消费监听器

    SpringKafka的消费者是由一个消费监听器容器ListenerConatiner去承载的,容器对应一个配置文件为ContainerPropertiesContainerProperties继承自消费者配置类ConsumerProperties,并且承载了消息监听器的设置

    ContainerProperties
    ConsumerProperties

    首先介绍非注解式的消息监听器,类似于ProducerFactory,消费者需要创建一个ConsumerFactory

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }
    
    • 1
    • 2
    • 3
    • 4

    然后建立监听器容器工厂ConcurrentKafkaListenerContainerFactory

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    有了容器工厂之后,就可以通过注册bean的方式生成一个MessageListenerContainer

    @Bean
    public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(
        ConsumerFactory<String, String> consumerFactory) {
        ContainerProperties containerProperties = new ContainerProperties("numb");
        containerProperties.setMessageListener(
            (MessageListener<String, String>) data -> System.out.println("收到消息: " + data.value()));
        return new KafkaMessageListenerContainer(consumerFactory, containerProperties);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这个kafkaMessageListenerContainer中,通过ContainerProperties配置了消费的topic和messageListener。之后启动项目后,spring会将kafkaMessageListenerContainer注册到ConcurrentKafkaListenerContainerFactory中,这样获取到数据后会自动调用消息监听器进行数据处理。

    测试消费者消费数据

    @Test
    public void test_send_and_consume() {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        threadPool.submit(() -> {
            while (true) {
                kafkaTemplate.send(KafkaConsts.TOPIC_TEST, UUID.randomUUID().toString(), "kv");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("发送完成");
            }
        });
        while (true);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    输出:

    发送完成
    发送成功
    收到消息: kv
    发送完成
    发送成功
    收到消息: kv
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.3.4 注解式消费监听器@KafkaListener

    之前配置了容器监听器工厂ConcurrentKafkaListenerContainerFactory之后,还需要用代码配置MessageListenerContainer, 指定消费的topic、消息监听器处理等。其实上面这步完全可以通过注解@KafkaListener实现。

    @Component
    @Slf4j
    public class MessageHandler {
        @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "kafkaListenerContainerFactory", id = "consumer_numb"
            // , topicPartitions = { @TopicPartition(topic = "numb", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset="1")})}
            )
        public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            try {
                String message = (String) record.value();
                log.info("收到消息: {}", message);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                // 手动提交 offset
                acknowledgment.acknowledge();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    @KafkaListener的主要属性

    • id:监听器的id

    • groupId:消费组id

    • idIsGroup:是否用id作为groupId,如果置为false,并指定groupId时,消费组ID使用groupId;否则默认为true,会使用监听器的id作为groupId

    • topics:指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

    • topicPattern: 匹配Topic进行监听(与topics、topicPartitions 三选一)

    • topicPartitions: 显式分区分配,可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

    @KafkaListener(id = "thing2", topicPartitions =
            { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
              @TopicPartition(topic = "topic2", partitions = "0",
                 partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
            })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • containerFactory:指定监听器容器工厂

    • errorHandler: 监听异常处理器,配置BeanName

    • beanRef:真实监听容器的BeanName,需要在 BeanName前加 “__”

    • clientIdPrefix:消费者Id前缀

    • concurrency: 覆盖容器工厂containerFactory的并发配置

  • 相关阅读:
    C++类和对象(三)
    518. 零钱兑换II(完全背包问题)
    中国人保为易集康健康科技承保产品责任险,为消费者保驾护航!
    代码随想录算法训练营day50
    ntp服务器时钟同步
    面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
    高校宿舍系统
    多版本node的安装与切换详细操作
    Union和union导致的数据不一致
    【MYSQL】数据类型
  • 原文地址:https://blog.csdn.net/Numb_ZL/article/details/125433522