• Spring Boot Integration with Kafka


    1. Overview:

    This project is used in a microservice integration environment: Spring Boot 2.5.2, Spring Cloud 2020.0.3, Spring Cloud Alibaba 2021.1.

    This installment integrates Kafka into a single microservice rather than declaring it in the parent POM as a shared dependency (the parent POM carries internal integrations; this walkthrough follows the usual public setup).

    Spring Cloud / Spring Boot versions referenced by the project:

    Spring Boot: 2.5.2
    Spring Cloud: 2020.0.3
    Spring Cloud Alibaba: 2021.1

    Kafka dependencies:

    <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <version>2.7.3</version>
    </dependency>

    <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
       <version>1.11.0</version>
    </dependency>
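
    Note: the YAML in section 2 delegates value deserialization to io.confluent.kafka.serializers.KafkaAvroDeserializer, which ships with Confluent rather than with spring-kafka or plain Avro. If that route is taken, Confluent's serializer artifact and Maven repository are needed as well; a sketch (the version shown is an assumption, align it with your platform):

    <!-- Assumption: only needed when KafkaAvroDeserializer is actually used; the version is illustrative -->
    <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-avro-serializer</artifactId>
       <version>6.2.0</version>
    </dependency>

    <repositories>
       <repository>
          <id>confluent</id>
          <url>https://packages.confluent.io/maven/</url>
       </repository>
    </repositories>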

    2. Kafka Configuration

    As follows (the kafka node sits under spring):

    The configuration is loaded automatically; individual parameters can also be overridden through the configuration-class beans shown later.

    ###########  Kafka cluster  ###########
    # For a Kafka cluster, list multiple addresses here separated by ","
      kafka:
      ###########  Producer settings  ###########
        producer:
          bootstrap-servers: 192.168.4.113:30092
          retries: 3  # number of retries
          # Ack level: how many partition replicas must confirm receipt before the producer gets an ack (0, 1, all/-1)
          # 0 = no acknowledgement of receipt; 1 = ack once the leader has the record; all/-1 = ack once leader and followers have it
          acks: 0
          # Batch size in bytes
          batch-size: 16384
          # Send delay
          properties:
            linger:
              ms: 0
      # linger.ms = 0 means every record is sent to Kafka as soon as it arrives, which effectively disables batch-size
      # The producer sends a batch once it accumulates batch-size bytes, or once linger.ms elapses after a record arrives
          buffer-memory: 33554432   # producer buffer size
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    ############  Consumer  #######################
        consumer:
          bootstrap-servers: 192.168.4.113:30092
          group-id: xihai-test
          enable-auto-commit: false
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            # Trusted packages for the consumer's JSON deserialization; required when deserializing entity classes
          properties:
              deserializer:
                key:
                  delegate:
                    class: org.apache.kafka.common.serialization.StringDeserializer
                value:
                  delegate:
                    ##  io.confluent.kafka.serializers.KafkaAvroDeserializer
                    class: io.confluent.kafka.serializers.KafkaAvroDeserializer
        # Note: listener is a sibling of producer/consumer under spring.kafka, not a child of consumer
        listener:
          # Number of threads running in the listener container; usually machines * partitions
          concurrency: 4
          # Auto-commit is off, so manual acknowledgement must be configured
          ack-mode: manual_immediate
          # By default a listener fails fast when its topic does not exist; set false to ignore that error
          missing-topics-fatal: false
          # Timeout passed to consumer.poll(). (The "5-minute maximum gap between polls before a rebalance"
          # rule belongs to the consumer property max.poll.interval.ms, not to this setting.)
          poll-timeout: 600000
    
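    The auto-loaded values above can also serve as the base for code-level overrides: Spring Boot exposes them as a KafkaProperties bean, so a configuration class can amend a single entry instead of rebuilding the whole map by hand. A minimal sketch, assuming only the acks value needs to change (the class and bean names are illustrative):

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    public class KafkaOverrideConfig {

        // Start from the auto-loaded spring.kafka.* values, then override a single entry.
        @Bean
        public ProducerFactory<String, Object> tunedProducerFactory(KafkaProperties properties) {
            Map<String, Object> props = properties.buildProducerProperties();
            props.put(ProducerConfig.ACKS_CONFIG, "all"); // assumption: stricter acks than the YAML's 0
            return new DefaultKafkaProducerFactory<>(props);
        }
    }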

     

    3. Producer Configuration Class

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonSerializer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    
    @Configuration
    @EnableKafka
    public class KafkaProviderConfig {

        /**
         * XX base        192.168.xxx.194 : 9092
         * XX K8s         192.168.xxx.113 : 30092
         */
        final static String KAFKA_HOST = "192.168.4.xxx";
        final static String KAFKA_PORT = "30092";

        final static String KAFKA_SERVER = KAFKA_HOST + ":" + KAFKA_PORT;

        static Map<String, Object> initProducerOneConfig() {
            // Producer settings
            Map<String, Object> props = new HashMap<>();
            // Broker address
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
            // Retries; 0 disables the retry mechanism
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            // Batch size in bytes
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            // Send delay
            props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
            // Producer buffer size
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            // Serializer for record keys
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // Serializer for record values (JsonSerializer)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return props;
        }

        @Bean("producerOneFactory")
        public ProducerFactory<String, Object> producerOneFactory() {
            return new DefaultKafkaProducerFactory<>(initProducerOneConfig());
        }

        @Bean(name = "ProducerOne")
        public KafkaTemplate<String, Object> getKafkaTemplateOne() {
            return new KafkaTemplate<>(producerOneFactory());
        }

    }
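
    Because the template is registered under an explicit bean name, further templates can coexist later; injection then qualifies by name. A hypothetical injection point (the field name is an assumption):

        // Injects the KafkaTemplate bean registered above as "ProducerOne"
        @Resource(name = "ProducerOne")
        private KafkaTemplate<String, Object> producerOne;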
    

    4. Testing the Producer: Sending a Message

    One of the available callback styles is used here to capture the result of the send.

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @PostMapping("/send-obj")
    public void sendMessageObj(@RequestBody HeiHeMessageDto dto) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", "id-00001");
        jsonObject.put("applyId", "applyId0001");
        jsonObject.put("sfzh", "610428198912125191");
        jsonObject.put("spDeptId", 5839);
        HeiHeMessage heiHeMessage = new HeiHeMessage();
        // Send to XXX
        heiHeMessage.setUserIds(Arrays.asList("1628xxx8844xxx").toString());
        heiHeMessage.setMessage(dto.getMessage());
        heiHeMessage.setMessageParentType(dto.getMessageParentType());
        heiHeMessage.setMessageSonType(dto.getMessageSonType());
        heiHeMessage.setSupplyData(jsonObject.toJSONString());
        heiHeMessage.setSendTime(DateUtils.getDateTime());
        heiHeMessageService.save(heiHeMessage);
        dto.setCreatedAt(DateUtils.getDateTime());
        dto.setMessageParentType(ConstantKey.ShenPiZhongXin);
        dto.setMessageSonType(ConstantKey.xinXiBianGeng);
        dto.setMessage(ConstantKey.MSG_UPDATE_APPLY);
        kafkaTemplate.send(KafkaTopicConstant.TOPIC, dto)
                // Callback
                .addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                    @Override
                    public void onFailure(@NonNull Throwable throwable) {
                        log.error("Failure! Sending message [{}] to topic [{}] failed", dto, KafkaTopicConstant.TOPIC, throwable);
                    }

                    @Override
                    public void onSuccess(SendResult<String, Object> result) {
                        log.info("Success! Message [{}] sent to topic [{}], result: {}", dto, KafkaTopicConstant.TOPIC, result);
                    }
                });
        log.info("KafkaProducerController-sendMessageObj-END");
    }
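
    The callback style returns immediately. When the caller must know the outcome before returning, the ListenableFuture returned by send() also supports a blocking get. A minimal sketch reusing the same kafkaTemplate and dto, with an assumed 10-second timeout (requires java.util.concurrent imports):

        try {
            // Block until the broker acknowledges, or give up after 10 seconds
            SendResult<String, Object> result =
                    kafkaTemplate.send(KafkaTopicConstant.TOPIC, dto).get(10, TimeUnit.SECONDS);
            log.info("Synchronous send succeeded: {}", result.getRecordMetadata());
        } catch (ExecutionException | TimeoutException e) {
            log.error("Synchronous send failed", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.error("Synchronous send interrupted", e);
        }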

    5. Consumer Configuration Class

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.*;
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    import org.springframework.util.backoff.BackOff;
    import org.springframework.util.backoff.FixedBackOff;

    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.Map;
    
    
    @Configuration  // must be active so the container factory and error handler beans below are registered
    @Slf4j
    public class KafkaMyConsumerConfig {
    
        /**
         * Ha XX base       192.168.104.xxx : 9092
         * Xi XX K8s        192.168.4.xxx : 30092
         */
        final static String KAFKA_HOST = "192.168.4.xxx";
        final static String KAFKA_PORT = "30092";
    
        final static String KAFKA_SERVER = KAFKA_HOST + ":" + KAFKA_PORT;
    
        @Value("${spring.kafka.consumer.group-id}")
        private String groupId;
        @Value("${spring.kafka.consumer.enable-auto-commit}")
        private boolean enableAutoCommit;
    
        public Map<String, Object> myConsumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>(16);
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            // Whether to auto-commit offsets; defaults to true. Set it to false and commit manually
            // to avoid duplicate consumption and data loss.
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            // Auto-commit interval; only takes effect when auto-commit is enabled
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
            // What the consumer does when a partition has no committed offset, or the committed offset is invalid:
            // earliest: with a committed offset, resume from it; without one, consume the partition from the beginning
            // latest: with a committed offset, resume from it; without one, consume only records produced after the consumer starts
            // none: resume from committed offsets when every partition has one; throw if any partition lacks a committed offset
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
            // How long the broker waits without a consumer heartbeat before triggering a rebalance; default 10s
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);

            // Deserialization (JSON is recommended: it can carry entity classes without extra configuration)
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            // StringDeserializer JsonDeserializer
            propsMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
            propsMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
            // propsMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
            // propsMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")

            propsMap.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // com.xahengpin.qzx.rygl.api.model
            propsMap.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
            return propsMap;
        }
    
        @Bean("myConsumerFactory")
        public ConsumerFactory myConsumerFactory() {
            //配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
                //deserializer.trustedPackages("*"); // com.xahengpin.qzx.rygl.api.model
                return new DefaultKafkaConsumerFactory<>(
                        myConsumerConfigs());
        }
    
        @Resource
        KafkaTemplate kafkaTemplate;
    
        @Bean("kafkaListenerContainerFactory")
        public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(myConsumerFactory());
            //在侦听器容器中运行的线程数,一般设置为 机器数*分区数
            factory.setConcurrency(4);
            //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
            factory.setMissingTopicsFatal(false);
            //自动提交关闭,需要设置手动消息确认
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            // # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
            factory.getContainerProperties().setPollTimeout(600000);
            //设置为批量监听,需要用List接收
            //factory.setBatchListener(true);
            factory.setErrorHandler( kafkaErrorHandler(kafkaTemplate) );
            return factory;
        }
    
        /**
         * Boot will autowire this into the container factory.
         */
        @Bean
        public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
            return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
        }

        /**
         * Configure the {@link DeadLetterPublishingRecoverer} to publish poison pill bytes to a dead letter topic:
         * "stock-quotes-avro.DLT".
         */
        @Bean
        public DeadLetterPublishingRecoverer publisher(KafkaTemplate<String, Object> bytesTemplate) {
            return new DeadLetterPublishingRecoverer(bytesTemplate);
        }

        @Bean
        @Primary
        public ErrorHandler kafkaErrorHandler(KafkaTemplate<String, Object> template) {
            log.warn("kafkaErrorHandler begin to Handle");
            // <1> Create the DeadLetterPublishingRecoverer: once retries are exhausted and the record
            //     still fails, it publishes that record to the dead-letter topic.
            ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
            // <2> Create a FixedBackOff: 10-second interval, 1 retry.
            //     Note: one normal delivery attempt plus one retry means two attempts in total.
            BackOff backOff = new FixedBackOff(10 * 1000L, 1L);
            // <3> Create the SeekToCurrentErrorHandler
            return new SeekToCurrentErrorHandler(recoverer, backOff);
        }
    }
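
    By default, DeadLetterPublishingRecoverer publishes a failed record to <originalTopic>.DLT on the same partition, which is why the listeners in section 7 subscribe to test.DLT. If a different naming scheme is wanted, spring-kafka's two-argument constructor accepts a destination resolver. A minimal sketch that could sit alongside the beans above (the "-dead" suffix and bean name are assumptions; TopicPartition comes from org.apache.kafka.common):

        // Assumption: an alternative recoverer bean with a custom dead-letter topic suffix
        @Bean
        public DeadLetterPublishingRecoverer customDltPublisher(KafkaTemplate<String, Object> template) {
            return new DeadLetterPublishingRecoverer(template,
                    // Route failures to <topic>-dead, keeping the original partition
                    (record, ex) -> new TopicPartition(record.topic() + "-dead", record.partition()));
        }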
    

    6. Handling Consumption Exceptions

    When consumption throws and the record cannot be acknowledged normally, the message is routed to the dead-letter topic.

    import org.apache.kafka.clients.consumer.Consumer;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.KafkaListenerErrorHandler;
    import org.springframework.kafka.listener.ListenerExecutionFailedException;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.lang.NonNull;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    
    @Component
    public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

        @Override
        @NonNull
        public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception) {
            return new Object();
        }

        @Resource
        private KafkaTemplate<String, Object> kafkaTemplate;

        @Override
        @NonNull
        public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
            System.out.println("Message details: " + message);
            System.out.println("Exception: " + exception);
            System.out.println("Consumer details: " + consumer.groupMetadata());
            System.out.println("Topics visible to the consumer: " + consumer.listTopics());
            //return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);

            // Resolve the topic the failed message came from
            MessageHeaders headers = message.getHeaders();
            String topic = headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class) + ".DLT";
            // Route the payload to the dead-letter topic
            kafkaTemplate.send(topic, message.getPayload());
            return message;
        }
    }

    7. Consuming Messages and Testing the Dead-Letter Topic

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class KafkaConsumer {

        @KafkaListener(topics = {"xa1hp2_oct1upus", "test"},
                errorHandler = "myKafkaListenerErrorHandler", containerFactory = "kafkaListenerContainerFactory")
        public void consumerTestTopic(ConsumerRecord<String, Object> record, Acknowledgment ack) {
            log.info(">>>>>>>> Listener received record: {}", record);
            // Always throw so the error handler and dead-letter topic can be exercised
            throw new RuntimeException("Listener received a record and threw an exception!");
    //        try {
    //            // Used to test exception handling
    //            //int i = 1 / 0;
    //            System.out.println(record.topic());
    //            System.out.println(record.value());
    //            System.out.println(record.headers());
    //            // Manual acknowledgement
    //            ack.acknowledge();
    //        } catch (Exception e) {
    //            System.out.println("Consumption failed: " + e);
    //        }
        }


        @KafkaListener(topics = {"test.DLT", "xa1hp2_oct1upus.DLT"})
        public void messListenerDLT(ConsumerRecord<String, Object> record) {
            System.out.println("XXXXXXX>>>>>> Dead-letter consumer received: " + record);
        }
    }
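
    The container factory in section 5 keeps factory.setBatchListener(true) commented out. If batching is switched on, the listener signature changes to a List and one acknowledgement covers the whole polled batch. A minimal sketch under that assumption (needs java.util.List; the method name is illustrative):

        // Assumption: factory.setBatchListener(true) has been enabled on the container factory
        @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
        public void consumeBatch(List<ConsumerRecord<String, Object>> records, Acknowledgment ack) {
            records.forEach(r -> log.info("batch offset={} value={}", r.offset(), r.value()));
            // Acknowledge once for the whole batch
            ack.acknowledge();
        }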
    

  • Original article: https://blog.csdn.net/Mynah886/article/details/133885823