• SpringBoot用kafka.listener监听接受Kafka消息


    1.创建kafka监听配置并进行注册

    
    
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author 35
     * @description kafka listen监听配置
     * @date 2024年04月24日 13:25
     */
    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
    
        // kafka实例
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        // kafka AI 服务的Group
        private String groupId = Constants.KAFKA_AI_SERVER_GROUP;
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // 设置为可以手动消费
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    }
    
    

    2.使用示例

      @KafkaListener(topics = Constants.KAFKA_USER_TOPIC, groupId = Constants.KAFKA_SERVER_GROUP)
        public void syncUserByKafKa(String message, Acknowledgment ack) {
            try {
                // 调用具体的执行方法
                 bb(message);
    
                // 提交kafka消费位移
                ack.acknowledge();
            } catch (Exception e) {
                log.error("失败:" + e.getMessage() + "消息:" + message);
            } finally {
                // 提交kafka消费位移
                ack.acknowledge();
            }
    
        }
    
  • 相关阅读:
    TinyEngine 开源低代码引擎首次直播答疑Q&A合集
    深度学习论文: Segment Any Anomaly without Training via Hybrid Prompt Regularization
    python中的变量的定义和使用
    QT网页 webengine / CEF
    人工智能-深度学习之序列模型
    有关于联邦学习
    python-单例模式demo代码
    本地PHP搭建简单Imagewheel私人云图床,在外远程访问——“cpolar内网穿透”
    【OpenCV DNN】Flask 视频监控目标检测教程 09
    优思学院|揭秘六西格玛:七大迷思你不可不知!
  • 原文地址:https://blog.csdn.net/qq_27860623/article/details/142181316