• Spring-ReactiveKafkaConsumer (consuming Kafka messages reactively)


    1. Add the required Kafka dependencies

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.6.6</version>
        </parent>
                
            
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>io.projectreactor.kafka</groupId>
                <artifactId>reactor-kafka</artifactId>
                <version>1.3.11</version>
            </dependency>
        </dependencies>

    The key dependency is reactor-kafka.

    2. Create the consumer Template

    package com.kittlen.cloud.reactivekafka.config;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
    import org.springframework.stereotype.Component;
    import reactor.kafka.receiver.ReceiverOptions;
    
    import java.util.Collections;
    
    /**
     * @author kittlen
     * @version 1.0
     */
    @Component
    public class ReactiveConsumerConfig {
    
        @Bean
        public ReceiverOptions<String, String> kafkaReceiverOptions(@Value(value = "${kafka.consumer.topic}") String topic, KafkaProperties kafkaProperties) {
            ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
            return basicReceiverOptions.subscription(Collections.singletonList(topic));
        }
    
        @Bean
        public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
            return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
        }
    }
    
    
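The `KafkaProperties` injected above are populated from the standard Spring Boot `spring.kafka.*` configuration, and the topic is read from a custom property. A minimal `application.yml` sketch (the broker address, group id, and topic name are placeholders to adapt to your environment):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092    # placeholder broker address
    consumer:
      group-id: reactive-demo-group      # placeholder consumer group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest

# custom property read by @Value("${kafka.consumer.topic}") in the config class
kafka:
  consumer:
    topic: my-topic                      # placeholder topic name
```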

    3. Create the actual consumer logic based on the Template

    package com.kittlen.cloud.reactivekafka.consumers;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
    import org.springframework.stereotype.Service;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    /**
     * @author kittlen
     * @version 1.0
     */
    @Service
    public class ReactiveConsumerService implements CommandLineRunner {
    
        protected final Logger log = LoggerFactory.getLogger(ReactiveConsumerService.class);
    
        @Autowired
        ReactiveKafkaConsumerTemplate<String, String> requestMsgReactiveKafkaConsumerTemplate;
    
        private Flux<Mono<Boolean>> dgkConsummer() {
            Flux<Mono<Boolean>> monoFlux = requestMsgReactiveKafkaConsumerTemplate
                    .receiveAutoAck()
                    .map(cr -> handler(cr))
                    .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
            return monoFlux;
        }
    
        //Adjust the return type to fit your actual needs.
        //Throwing an exception directly from this method stops consumption of the topic.
        protected Mono<Boolean> handler(ConsumerRecord<String, String> consumerRecord) {
            try {
                /*
                 * processing logic for the received record
                 */
                return Mono.just(true);
            } catch (Exception e) {
                return Mono.error(e);
            }
        }
    
        @Override
        public void run(String... args) {
            dgkConsummer().subscribe(m -> m.subscribe());
        }
    }
    
    
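The contract on `handler` above, never throw, surface failures as a value instead, can be illustrated with a plain-Java analogy (no Kafka or Reactor involved; the record type and the failure condition are simplified stand-ins):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HandlerSketch {
    // Mirrors the handler's contract: never throw, return a success flag,
    // so one bad record does not terminate the whole pipeline.
    static boolean handle(String record) {
        try {
            if (record.isEmpty()) {
                throw new IllegalArgumentException("empty record");
            }
            // real processing logic would go here
            return true;
        } catch (Exception e) {
            // report the failure as a value instead of killing the stream
            return false;
        }
    }

    public static void main(String[] args) {
        List<Boolean> results = Stream.of("a", "", "b")
                .map(HandlerSketch::handle)
                .collect(Collectors.toList());
        System.out.println(results); // prints [true, false, true]
    }
}
```

Had `handle` rethrown, the stream would have stopped at the second element, which is exactly what happens to the Kafka subscription when `handler` throws.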

    4. Listening to multiple topics

    Option 1: use a single ReactiveKafkaConsumerTemplate

    Subscribe to multiple topics when creating the kafkaReceiverOptions:

        @Bean
        public ReceiverOptions<String, String> kafkaReceiverOptions(KafkaProperties kafkaProperties) {
            ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
            return basicReceiverOptions.subscription(Stream.of("topic1", "topic2").collect(Collectors.toList()));
        }

    Branch on the topic when handling each record:

        protected Mono<Boolean> handler(ConsumerRecord<String, String> consumerRecord) {
            try {
                if (consumerRecord.topic().equals("topic1")) {
                    //***
                }
                /*
                 * processing logic for the received record
                 */
                return Mono.just(true);
            } catch (Exception e) {
                return Mono.error(e);
            }
        }
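As the number of topics grows, the if/else chain can be replaced by a dispatch table keyed by topic name. A minimal plain-Java sketch (the topic names and handler bodies are hypothetical placeholders):

```java
import java.util.Map;
import java.util.function.Function;

public class TopicDispatch {
    // Hypothetical per-topic handlers mapping a payload to a success flag;
    // the keys and logic are placeholders for your own topics.
    static final Map<String, Function<String, Boolean>> HANDLERS = Map.of(
            "topic1", payload -> true,           // topic1 processing stub
            "topic2", payload -> payload != null // topic2 processing stub
    );

    // Look up the handler for the record's topic; unknown topics fail soft.
    static boolean dispatch(String topic, String payload) {
        Function<String, Boolean> handler = HANDLERS.get(topic);
        return handler != null && handler.apply(payload);
    }

    public static void main(String[] args) {
        System.out.println(dispatch("topic1", "msg"));  // prints true
        System.out.println(dispatch("unknown", "msg")); // prints false
    }
}
```

Inside the reactive `handler`, the same lookup would be done with `consumerRecord.topic()` and `consumerRecord.value()`.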

    Option 2: define multiple consumer templates, then create the corresponding consumer logic for each

    
        @Bean
        public ReceiverOptions<String, String> kafkaReceiverOptions1(KafkaProperties kafkaProperties) {
            ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
            return basicReceiverOptions.subscription(Collections.singletonList("topic1"));
        }
    
        @Bean
        public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate1(ReceiverOptions<String, String> kafkaReceiverOptions1) {
            return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions1);
        }
        @Bean
        public ReceiverOptions<String, String> kafkaReceiverOptions2(KafkaProperties kafkaProperties) {
            ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
            return basicReceiverOptions.subscription(Collections.singletonList("topic2"));
        }
    
        @Bean
        public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate2(ReceiverOptions<String, String> kafkaReceiverOptions2) {
            return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions2);
        }
    
    
    
        @Resource(name = "reactiveKafkaConsumerTemplate1")
        ReactiveKafkaConsumerTemplate<String, String> requestMsgReactiveKafkaConsumerTemplate1;
        
        @Resource(name = "reactiveKafkaConsumerTemplate2")
        ReactiveKafkaConsumerTemplate<String, String> requestMsgReactiveKafkaConsumerTemplate2;
    
         private Flux<Mono<Boolean>> dgkConsummer1() {
            Flux<Mono<Boolean>> monoFlux = requestMsgReactiveKafkaConsumerTemplate1
                    .receiveAutoAck()
                    .map(cr -> handler1(cr))
                    .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
            return monoFlux;
        }
    
        //Adjust the return type to fit your actual needs.
        protected Mono<Boolean> handler1(ConsumerRecord<String, String> consumerRecord) {
            try {
                /*
                 * processing logic for records from topic1
                 */
                return Mono.just(true);
            } catch (Exception e) {
                return Mono.error(e);
            }
        }

        private Flux<Mono<Boolean>> dgkConsummer2() {
            Flux<Mono<Boolean>> monoFlux = requestMsgReactiveKafkaConsumerTemplate2
                    .receiveAutoAck()
                    .map(cr -> handler2(cr))
                    .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
            return monoFlux;
        }

        //Adjust the return type to fit your actual needs.
        protected Mono<Boolean> handler2(ConsumerRecord<String, String> consumerRecord) {
            try {
                /*
                 * processing logic for records from topic2
                 */
                return Mono.just(true);
            } catch (Exception e) {
                return Mono.error(e);
            }
        }

        @Override
        public void run(String... args) {
            dgkConsummer1().subscribe(m -> m.subscribe());
            dgkConsummer2().subscribe(m -> m.subscribe());
        }
    
  • Original article: https://blog.csdn.net/weixin_44728369/article/details/126161759