• Spring-Kafka系列(3)—— SpringKafka消费者监听MessageListener


    2.3 SpringKafka消费者

    2.3 SpringKafka消费者

    2.3.1 Kafka消息监听器MessageListener

    之前已经介绍了通过kafka-topic-consumer.shkafka-tool工具来消费数据。下面介绍SpringKafka消费数据的方式——kafka消息监听器。

    Kafka的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。GenericMessageListenerSpringKafka的消息监听器接口,也是一个函数式接口,利用接口的onMessage方法可以实现消费数据。

    public interface GenericMessageListener {
    	void onMessage(T data);
        
    	default void onMessage(T data, @Nullable Acknowledgment acknowledgment) {
    		throw new UnsupportedOperationException("Container should never call this");
    	}
    
    	default void onMessage(T data, Consumer consumer) {
    		throw new UnsupportedOperationException("Container should never call this");
    	}
    
    	default void onMessage(T data, @Nullable Acknowledgment acknowledgment, Consumer consumer) {
    		throw new UnsupportedOperationException("Container should never call this");
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    基于此接口可以实现单条数据消息监听器接口MessageListenen、多条数据消息监听器接口BatchMessageListener、带ACK机制的消息监听器AcknowledgingMessageListenerBatchAcknowledgingMessageListener

    #mermaid-svg-11qFALBGTxUu6O9B {font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-11qFALBGTxUu6O9B .error-icon{fill:#552222;}#mermaid-svg-11qFALBGTxUu6O9B .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-11qFALBGTxUu6O9B .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-11qFALBGTxUu6O9B .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-11qFALBGTxUu6O9B .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-11qFALBGTxUu6O9B .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-11qFALBGTxUu6O9B .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-11qFALBGTxUu6O9B .marker{fill:#333333;stroke:#333333;}#mermaid-svg-11qFALBGTxUu6O9B .marker.cross{stroke:#333333;}#mermaid-svg-11qFALBGTxUu6O9B svg{font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-11qFALBGTxUu6O9B .label{font-family:“trebuchet ms”,verdana,arial,sans-serif;color:#333;}#mermaid-svg-11qFALBGTxUu6O9B .cluster-label text{fill:#333;}#mermaid-svg-11qFALBGTxUu6O9B .cluster-label span{color:#333;}#mermaid-svg-11qFALBGTxUu6O9B .label text,#mermaid-svg-11qFALBGTxUu6O9B span{fill:#333;color:#333;}#mermaid-svg-11qFALBGTxUu6O9B .node rect,#mermaid-svg-11qFALBGTxUu6O9B .node circle,#mermaid-svg-11qFALBGTxUu6O9B .node ellipse,#mermaid-svg-11qFALBGTxUu6O9B .node polygon,#mermaid-svg-11qFALBGTxUu6O9B .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-11qFALBGTxUu6O9B .node .label{text-align:center;}#mermaid-svg-11qFALBGTxUu6O9B .node.clickable{cursor:pointer;}#mermaid-svg-11qFALBGTxUu6O9B .arrowheadPath{fill:#333333;}#mermaid-svg-11qFALBGTxUu6O9B .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-11qFALBGTxUu6O9B .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-11qFALBGTxUu6O9B .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-11qFALBGTxUu6O9B .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-11qFALBGTxUu6O9B .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-11qFALBGTxUu6O9B .cluster text{fill:#333;}#mermaid-svg-11qFALBGTxUu6O9B .cluster span{color:#333;}#mermaid-svg-11qFALBGTxUu6O9B div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-11qFALBGTxUu6O9B :root{–mermaid-font-family:“trebuchet ms”,verdana,arial,sans-serif;}

    MessageListener

    GenericMessageListener

    单条数据监听器

    BatchMessageListener

    批量数据监听器

    AckowledgingMessageListenenr

    带ACK的单条数据监听器

    BatchAckowledgingMessageListener

    带ACK机制的批量数据监听器

    2.3.2 消息监听容器与容器工厂

    消息监听器MessageListener是由消费监听器容器MessageListenerContainer接口来承载,使用setupMessageListenner()方法启动一个监听器。其中还有定义了操作消息的resume()pause()等方法。

    public interface MessageListenerContainer extends SmartLifecycle {
        // 启动一个消息监听器
    	void setupMessageListener(Object messageListener);
        // 获取消费者的指标信息
        Map> metrics();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    spring-kafka提供了两个容器KafkaMessageListenerContainerConcurrentMessageListenerContainer

    #mermaid-svg-AShjsbIwvUJstX1D {font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-AShjsbIwvUJstX1D .error-icon{fill:#552222;}#mermaid-svg-AShjsbIwvUJstX1D .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-AShjsbIwvUJstX1D .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-AShjsbIwvUJstX1D .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-AShjsbIwvUJstX1D .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-AShjsbIwvUJstX1D .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-AShjsbIwvUJstX1D .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-AShjsbIwvUJstX1D .marker{fill:#333333;stroke:#333333;}#mermaid-svg-AShjsbIwvUJstX1D .marker.cross{stroke:#333333;}#mermaid-svg-AShjsbIwvUJstX1D svg{font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-AShjsbIwvUJstX1D .label{font-family:“trebuchet ms”,verdana,arial,sans-serif;color:#333;}#mermaid-svg-AShjsbIwvUJstX1D .cluster-label text{fill:#333;}#mermaid-svg-AShjsbIwvUJstX1D .cluster-label span{color:#333;}#mermaid-svg-AShjsbIwvUJstX1D .label text,#mermaid-svg-AShjsbIwvUJstX1D span{fill:#333;color:#333;}#mermaid-svg-AShjsbIwvUJstX1D .node rect,#mermaid-svg-AShjsbIwvUJstX1D .node circle,#mermaid-svg-AShjsbIwvUJstX1D .node ellipse,#mermaid-svg-AShjsbIwvUJstX1D .node polygon,#mermaid-svg-AShjsbIwvUJstX1D .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-AShjsbIwvUJstX1D .node .label{text-align:center;}#mermaid-svg-AShjsbIwvUJstX1D .node.clickable{cursor:pointer;}#mermaid-svg-AShjsbIwvUJstX1D .arrowheadPath{fill:#333333;}#mermaid-svg-AShjsbIwvUJstX1D .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-AShjsbIwvUJstX1D .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-AShjsbIwvUJstX1D .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-AShjsbIwvUJstX1D .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-AShjsbIwvUJstX1D .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-AShjsbIwvUJstX1D .cluster text{fill:#333;}#mermaid-svg-AShjsbIwvUJstX1D .cluster span{color:#333;}#mermaid-svg-AShjsbIwvUJstX1D div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-AShjsbIwvUJstX1D :root{–mermaid-font-family:“trebuchet ms”,verdana,arial,sans-serif;}

    GenericMessageListenerContainer

    MessageListenerContainer

    AbstractMessageListenerContainer

    KafkaMessageListenerConatiner

    ConcurrentMessageListenerCOntainer

    消息监听器容器由容器工厂KafkaListenerContainerFactory统一创建并管理

    public interface KafkaListenerContainerFactory {
        // 根据endpoint创建监听器容器
        C createListenerContainer(KafkaListenerEndpoint endpoint);
        // 根据topic、partition和offset的配置
    	C createContainer(TopicPartitionOffset... topicPartitions);
    	// 根据topic创建监听器容器
        C createContainer(String... topics);
        // 根据topic的正则表达式创建监听器容器
    	C createContainer(Pattern topicPattern);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    spring-kafka提供了监听器容器工厂ConcurrentKafkaListenerContainerFactory,其有两个重要的配置

    ContainerPropertiesConsumerFactory

    #mermaid-svg-oFHzGvGC4hl330UK {font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-oFHzGvGC4hl330UK .error-icon{fill:#552222;}#mermaid-svg-oFHzGvGC4hl330UK .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-oFHzGvGC4hl330UK .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-oFHzGvGC4hl330UK .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-oFHzGvGC4hl330UK .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-oFHzGvGC4hl330UK .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-oFHzGvGC4hl330UK .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-oFHzGvGC4hl330UK .marker{fill:#333333;stroke:#333333;}#mermaid-svg-oFHzGvGC4hl330UK .marker.cross{stroke:#333333;}#mermaid-svg-oFHzGvGC4hl330UK svg{font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-oFHzGvGC4hl330UK .label{font-family:“trebuchet ms”,verdana,arial,sans-serif;color:#333;}#mermaid-svg-oFHzGvGC4hl330UK .cluster-label text{fill:#333;}#mermaid-svg-oFHzGvGC4hl330UK .cluster-label span{color:#333;}#mermaid-svg-oFHzGvGC4hl330UK .label text,#mermaid-svg-oFHzGvGC4hl330UK span{fill:#333;color:#333;}#mermaid-svg-oFHzGvGC4hl330UK .node rect,#mermaid-svg-oFHzGvGC4hl330UK .node circle,#mermaid-svg-oFHzGvGC4hl330UK .node ellipse,#mermaid-svg-oFHzGvGC4hl330UK .node polygon,#mermaid-svg-oFHzGvGC4hl330UK .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-oFHzGvGC4hl330UK .node .label{text-align:center;}#mermaid-svg-oFHzGvGC4hl330UK .node.clickable{cursor:pointer;}#mermaid-svg-oFHzGvGC4hl330UK .arrowheadPath{fill:#333333;}#mermaid-svg-oFHzGvGC4hl330UK .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-oFHzGvGC4hl330UK .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-oFHzGvGC4hl330UK .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-oFHzGvGC4hl330UK .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-oFHzGvGC4hl330UK .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-oFHzGvGC4hl330UK .cluster text{fill:#333;}#mermaid-svg-oFHzGvGC4hl330UK .cluster span{color:#333;}#mermaid-svg-oFHzGvGC4hl330UK div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-oFHzGvGC4hl330UK :root{–mermaid-font-family:“trebuchet ms”,verdana,arial,sans-serif;}

    AbstractKafkaListenerContainerFactory

    KafkaListenerContainerFactory

    ConcurrentKafkaListenerContainerFactory

    ContainerProperties定义了要消费消息的topic,消息处理的MessageListener等信息。

    因此要实现一个消息监听器的流程如下:

    #mermaid-svg-VZRVgqMYDSikNYS2 {font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-VZRVgqMYDSikNYS2 .error-icon{fill:#552222;}#mermaid-svg-VZRVgqMYDSikNYS2 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-VZRVgqMYDSikNYS2 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-VZRVgqMYDSikNYS2 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-VZRVgqMYDSikNYS2 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-VZRVgqMYDSikNYS2 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-VZRVgqMYDSikNYS2 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-VZRVgqMYDSikNYS2 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-VZRVgqMYDSikNYS2 .marker.cross{stroke:#333333;}#mermaid-svg-VZRVgqMYDSikNYS2 svg{font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-VZRVgqMYDSikNYS2 .label{font-family:“trebuchet ms”,verdana,arial,sans-serif;color:#333;}#mermaid-svg-VZRVgqMYDSikNYS2 .cluster-label text{fill:#333;}#mermaid-svg-VZRVgqMYDSikNYS2 .cluster-label span{color:#333;}#mermaid-svg-VZRVgqMYDSikNYS2 .label text,#mermaid-svg-VZRVgqMYDSikNYS2 span{fill:#333;color:#333;}#mermaid-svg-VZRVgqMYDSikNYS2 .node rect,#mermaid-svg-VZRVgqMYDSikNYS2 .node circle,#mermaid-svg-VZRVgqMYDSikNYS2 .node ellipse,#mermaid-svg-VZRVgqMYDSikNYS2 .node polygon,#mermaid-svg-VZRVgqMYDSikNYS2 .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-VZRVgqMYDSikNYS2 .node .label{text-align:center;}#mermaid-svg-VZRVgqMYDSikNYS2 .node.clickable{cursor:pointer;}#mermaid-svg-VZRVgqMYDSikNYS2 .arrowheadPath{fill:#333333;}#mermaid-svg-VZRVgqMYDSikNYS2 .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-VZRVgqMYDSikNYS2 .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-VZRVgqMYDSikNYS2 .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-VZRVgqMYDSikNYS2 .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-VZRVgqMYDSikNYS2 .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-VZRVgqMYDSikNYS2 .cluster text{fill:#333;}#mermaid-svg-VZRVgqMYDSikNYS2 .cluster span{color:#333;}#mermaid-svg-VZRVgqMYDSikNYS2 div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-VZRVgqMYDSikNYS2 :root{–mermaid-font-family:“trebuchet ms”,verdana,arial,sans-serif;}

    KafkaProperties

    ConsumerFactory

    ContainerProperties

    ContainerFactory

    ListenerContainer

    MessageListener

    2.3.3 非注解式消费监听器

    SpringKafka的消费者是由一个消费监听器容器ListenerConatiner去承载的,容器对应一个配置文件为ContainerPropertiesContainerProperties继承自消费者配置类ConsumerProperties,并且承载了消息监听器的设置

    #mermaid-svg-BgCGzMWDlV1c7PWP {font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-BgCGzMWDlV1c7PWP .error-icon{fill:#552222;}#mermaid-svg-BgCGzMWDlV1c7PWP .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-BgCGzMWDlV1c7PWP .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-BgCGzMWDlV1c7PWP .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-BgCGzMWDlV1c7PWP .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-BgCGzMWDlV1c7PWP .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-BgCGzMWDlV1c7PWP .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-BgCGzMWDlV1c7PWP .marker{fill:#333333;stroke:#333333;}#mermaid-svg-BgCGzMWDlV1c7PWP .marker.cross{stroke:#333333;}#mermaid-svg-BgCGzMWDlV1c7PWP svg{font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-BgCGzMWDlV1c7PWP .label{font-family:“trebuchet ms”,verdana,arial,sans-serif;color:#333;}#mermaid-svg-BgCGzMWDlV1c7PWP .cluster-label text{fill:#333;}#mermaid-svg-BgCGzMWDlV1c7PWP .cluster-label span{color:#333;}#mermaid-svg-BgCGzMWDlV1c7PWP .label text,#mermaid-svg-BgCGzMWDlV1c7PWP span{fill:#333;color:#333;}#mermaid-svg-BgCGzMWDlV1c7PWP .node rect,#mermaid-svg-BgCGzMWDlV1c7PWP .node circle,#mermaid-svg-BgCGzMWDlV1c7PWP .node ellipse,#mermaid-svg-BgCGzMWDlV1c7PWP .node polygon,#mermaid-svg-BgCGzMWDlV1c7PWP .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-BgCGzMWDlV1c7PWP .node .label{text-align:center;}#mermaid-svg-BgCGzMWDlV1c7PWP .node.clickable{cursor:pointer;}#mermaid-svg-BgCGzMWDlV1c7PWP .arrowheadPath{fill:#333333;}#mermaid-svg-BgCGzMWDlV1c7PWP .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-BgCGzMWDlV1c7PWP .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-BgCGzMWDlV1c7PWP .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-BgCGzMWDlV1c7PWP .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-BgCGzMWDlV1c7PWP .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-BgCGzMWDlV1c7PWP .cluster text{fill:#333;}#mermaid-svg-BgCGzMWDlV1c7PWP .cluster span{color:#333;}#mermaid-svg-BgCGzMWDlV1c7PWP div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:“trebuchet ms”,verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-BgCGzMWDlV1c7PWP :root{–mermaid-font-family:“trebuchet ms”,verdana,arial,sans-serif;}

    ContainerProperties

    ConsumerProperties

    首先介绍非注解式的消息监听器,类似于ProducerFactory,消费者需要创建一个ConsumerFactory

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }
    
    • 1
    • 2
    • 3
    • 4

    然后建立监听器容器工厂ConcurrentKafkaListenerContainerFactory

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    有了容器工厂之后,就可以通过注册bean的方式生成一个MessageListenerContainer

    @Bean
    public KafkaMessageListenerContainer kafkaMessageListenerContainer(
        ConsumerFactory consumerFactory) {
        ContainerProperties containerProperties = new ContainerProperties("numb");
        containerProperties.setMessageListener(
            (MessageListener) data -> System.out.println("收到消息: " + data.value()));
        return new KafkaMessageListenerContainer(consumerFactory, containerProperties);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这个kafkaMessageListenerContainer中,通过ContainerProperties配置了消费的topic和messageListener。之后启动项目后,spring会将kafkaMessageListenerContainer注册到ConcurrentKafkaListenerContainerFactory中,这样获取到数据后会自动调用消息监听器进行数据处理。

    测试消费者消费数据

    @Test
    public void test_send_and_consume() {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        threadPool.submit(() -> {
            while (true) {
                kafkaTemplate.send(KafkaConsts.TOPIC_TEST, UUID.randomUUID().toString(), "kv");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("发送完成");
            }
        });
        while (true);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    输出:

    发送完成
    发送成功
    收到消息: kv
    发送完成
    发送成功
    收到消息: kv
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.3.4 注解式消费监听器@KafkaListener

    之前配置了容器监听器工厂ConcurrentKafkaListenerContainerFactory之后,还需要用代码配置MessageListenerContainer, 指定消费的topic、消息监听器处理等。其实上面这步完全可以通过注解@KafkaListener实现。

    @Component
    @Slf4j
    public class MessageHandler {
        @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "kafkaListenerContainerFactory", id = "consumer_numb"
            // , topicPartitions = { @TopicPartition(topic = "numb", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset="1")})}
            )
        public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
            try {
                String message = (String) record.value();
                log.info("收到消息: {}", message);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                // 手动提交 offset
                acknowledgment.acknowledge();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    @KafkaListener的主要属性

    • id:监听器的id

    • groupId:消费组id

    • idIsGroup:是否用id作为groupId,如果置为false,并指定groupId时,消费组ID使用groupId;否则默认为true,会使用监听器的id作为groupId

    • topics:指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

    • topicPattern: 匹配Topic进行监听(与topics、topicPartitions 三选一)

    • topicPartitions: 显式分区分配,可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

      @KafkaListener(id = “thing2”, topicPartitions =
      { @TopicPartition(topic = “topic1”, partitions = { “0”, “1” }),
      @TopicPartition(topic = “topic2”, partitions = “0”,
      partitionOffsets = @PartitionOffset(partition = “1”, initialOffset = “100”))
      })

    • containerFactory:指定监听器容器工厂

    • errorHandler: 监听异常处理器,配置BeanName

    • beanRef:真实监听容器的BeanName,需要在 BeanName前加 “__”

    • clientIdPrefix:消费者Id前缀

    • concurrency: 覆盖容器工厂containerFactory的并发配置

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    unity网络游戏开发
    proemtheus TSL加密认证
    vue3+vite配置eslint&prettier
    NUXT3.0实现网络请求二次封装
    打造一个开箱即用的超级丝滑的漂亮hexo博客网站
    Python:操作SQLite数据库简单示例
    关于《考研数学高分公式》系列的后续及一点说明
    Delta tuning(只抓核心)
    探索光模块的MSA多源协议
    AWS-Basic-S3
  • 原文地址:https://blog.csdn.net/sebeefe/article/details/126114193