• SpringBoot用kafka.listener监听接收Kafka消息


    1.创建kafka监听配置并进行注册

    
    
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Kafka listener configuration.
     *
     * <p>Registers a String/String {@link ConsumerFactory} and a
     * {@link ConcurrentKafkaListenerContainerFactory} whose containers use
     * {@link ContainerProperties.AckMode#MANUAL}, so listener methods are expected
     * to acknowledge records themselves.
     *
     * @author 35
     * @date 2024-04-24 13:25
     */
    @Configuration
    @EnableKafka
    public class KafkaConfig {

        // Kafka broker address list, injected from the "spring.kafka.bootstrap-servers" property.
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        // Consumer group used by the Kafka AI service.
        private final String groupId = Constants.KAFKA_AI_SERVER_GROUP;

        /**
         * Builds the consumer factory used by the listener containers.
         *
         * @return a consumer factory that deserializes both keys and values as strings
         */
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // The container factory below uses manual acknowledgment, so the Kafka client
            // must not auto-commit offsets on its own. Set explicitly so the behavior does
            // not depend on the framework version's default.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        /**
         * Builds the listener container factory backing {@code @KafkaListener} methods.
         *
         * @return a container factory wired to {@link #consumerFactory()} with manual ack mode
         */
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Offsets are committed only when the listener calls Acknowledgment.acknowledge().
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    }
    
    

    2.使用示例

      /**
       * Consumes one message from {@code Constants.KAFKA_USER_TOPIC} and hands it to
       * {@code bb(message)} for processing.
       *
       * <p>The record is acknowledged exactly once, whether or not processing succeeds:
       * a failure is logged (with its stack trace) and the offset is still committed,
       * so a failed message is not consumed again.
       *
       * @param message the record value as a string
       * @param ack     handle used to acknowledge the record
       */
      @KafkaListener(topics = Constants.KAFKA_USER_TOPIC, groupId = Constants.KAFKA_SERVER_GROUP)
      public void syncUserByKafKa(String message, Acknowledgment ack) {
          try {
              // Delegate to the actual business logic.
              bb(message);
          } catch (Exception e) {
              // Pass the exception itself so the stack trace is not lost.
              log.error("失败:" + e.getMessage() + "消息:" + message, e);
          } finally {
              // Single acknowledgment point: runs on both success and failure.
              ack.acknowledge();
          }
      }
    
  • 相关阅读:
    Haproxy集群
    蓝桥杯刷题_day10
    2022年高教社杯国赛A题思路——波浪能最大输出功率设计
    Linux信号
    动态规划(二)最长递增子序列
    Opengl ES之FBO
    431-C++基础语法(31-40)
    Python:实现bogo sort排序算法(附完整源码)
    AI容器化部署开发尝试 (一)(Pycharm连接docker,并部署django测试)
    iphone14来了,可是约好的你去哪了
  • 原文地址:https://blog.csdn.net/qq_27860623/article/details/142181316