• springboot kafka消息消费学习 @KafkaListener 使用


    kafka 配置类

    用途:集中定义 Kafka 消费者的基本配置,并注册监听容器工厂 Bean。
    下面这个类通过 @ConfigurationProperties 读取 Spring 标准配置文件中以 my.kafka 为前缀的属性,供构建消费者配置时使用。

    import lombok.Data;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Binds the {@code my.kafka.*} configuration properties and registers the
     * listener container factory bean named {@code kafkaTwoContainerFactory}.
     *
     * <p>Keys and values are both deserialized with {@link StringDeserializer}, so the
     * factory is typed {@code <String, String>}.
     */
    @Component
    @ConfigurationProperties(prefix = "my.kafka")
    @Data
    public class MyTaskKafkaProperties {

        /**
         * Kafka bootstrap server address(es), passed to {@code bootstrap.servers}.
         */
        private String serverUrl;

        /**
         * Consumer group id, passed to {@code group.id}.
         */
        private String groupId;

        /**
         * Topic name. Not used by this class itself; it is only bound here.
         */
        private String topic;

        /**
         * Value for {@code enable.auto.commit}. The container below uses a manual ack
         * mode, so this should normally be {@code false}.
         */
        private boolean enableAutoCommit;

        /**
         * Value for {@code auto.offset.reset}.
         */
        private String autoOffsetReset;

        /**
         * Number of concurrent listener containers. Defaults to 6.
         */
        private int concurrency = 6;

        /**
         * Consumer poll timeout in milliseconds. Defaults to 6000.
         */
        private long pollTimeout = 6000L;

        /**
         * Creates the container factory referenced by
         * {@code @KafkaListener(containerFactory = "kafkaTwoContainerFactory")}.
         *
         * @return a concurrent container factory using {@code MANUAL_IMMEDIATE} acknowledgment
         */
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaTwoContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(concurrency);
            factory.getContainerProperties().setPollTimeout(pollTimeout);
            // Offsets are committed only when the listener calls Acknowledgment.acknowledge().
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }

        /**
         * Builds the consumer factory from the bound properties.
         */
        private ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        /**
         * Assembles the raw Kafka consumer configuration map.
         */
        private Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            return props;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    @Data 是 Lombok 注解,用于自动生成 getter/setter 等方法,与 Kafka 配置本身没有强关联;但 @ConfigurationProperties 绑定属性依赖 setter,如果去掉 @Data,需要手写对应的 getter/setter。

    实际 kafka 监听消费

    import com.dtdream.dthink.dtalent.dmall.openplat.service.opendata.OpenDataService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    /**
     * Kafka listener that forwards each record value to the business service and
     * acknowledges the record manually.
     *
     * <p>Only registered when {@code my.kafka.enable} is {@code true}.
     */
    @Slf4j
    @ConditionalOnProperty(name = "my.kafka.enable", havingValue = "true")
    @Component
    public class MyTaskConsumer {
        @Autowired
        private XxxxxService xxxxxService;

        /**
         * Receives records from the configured topic and delegates to {@link #consume}.
         *
         * @param record the consumed record
         * @param ack    handle used to acknowledge the record manually
         * @param topic  topic the record was received from
         */
        @KafkaListener(topics = "${my.kafka.topic}", groupId = "${my.kafka.groupId}",
                containerFactory = "kafkaTwoContainerFactory")
        public void dxpTaskEnd(ConsumerRecord<String, String> record, Acknowledgment ack,
                               @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
            consume(record, ack, topic, msg -> xxxxxService.xxxxxxx(msg));
        }

        /**
         * Runs {@code consumer} on the record value and acknowledges on success.
         *
         * <p>A record with a {@code null} value is logged, acknowledged and skipped.
         * If {@code consumer} throws, the error is logged and the record is NOT
         * acknowledged; the exception is not rethrown.
         *
         * @param record   the consumed record
         * @param ack      handle used to acknowledge the record manually
         * @param topic    topic name, used for logging only
         * @param consumer business callback that processes the message text
         */
        private void consume(ConsumerRecord<String, String> record, Acknowledgment ack, String topic,
                             java.util.function.Consumer<String> consumer) {
            String msg = record.value();
            if (msg == null) {
                log.warn("kafka收到消息 但为空,record:{}", record);
                // Nothing to process: acknowledge so the empty record is not redelivered
                // after a restart or rebalance.
                ack.acknowledge();
                return;
            }
            log.info("kafka收到消息  开始消费 topic:{},msg:{}", topic, msg);
            try {
                consumer.accept(msg);
                // Acknowledge manually only after the business callback succeeded.
                ack.acknowledge();
                log.info("kafka收到消息消费成功 topic:{},msg:{}", topic, msg);
            } catch (Exception e) {
                log.error("kafka消费消息失败 topic:{},msg:{}", topic, msg, e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    @ConditionalOnProperty 是 Spring Boot 提供的条件注解:只有当配置项 my.kafka.enable 的值为 true 时,当前类才会被注册为 Bean 并开始监听消息。

    XxxxxService: 为我们的业务服务层,用于消费消息

  • 相关阅读:
    查找算法思想及代码——C语言
    【CSS】CSS入门笔记第三弹~
    机器学习模型与backtrader框架整合
    Google高性能开源框架gRPC:快速搭建及HTTP/2抓包
    宇宙采集器 淘宝商家电话采集爬虫分享
    HarmonyOS 4.0 实况窗上线!支付宝实现医疗场景智能提醒
    java编程基础总结——18.ArrayList源码解析
    【python】 16进制字符串转list
    杠杆炒股中吸筹是什么?
    【Linux】文件系统及动静态库
  • 原文地址:https://blog.csdn.net/weixin_44131922/article/details/132691585