• Kafka(二)、Kafka与SpringBoot集成


    一、SpringBoot整合Kafka

    1.1 pom.xml

    <dependency>
        <groupId>org.springframework.kafkagroupId>
        <artifactId>spring-kafkaartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    1.2 application.yml

    spring:
      application:
        name: spring-kafka
    
      kafka:
        # kafka集群地址,多个逗号隔开
        bootstrap-servers: 127.0.0.01:9092
        # producer 生产者
        producer:
          retries: 0 # 消息发送失败重试
          acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
          batch-size: 16384 # 批量大小
          buffer-memory: 33554432 # 生产端缓冲区大小
          # key value 序列化机制
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    
        # consumer消费者
        consumer:
          # 默认的消费组ID
          group-id: kafka-default-group
          # 是否自动提交 offset
          enable-auto-commit: true
          # 提交offset延时(接收到消息后多久提交offset)
          auto-commit-interval: 100
          # earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
          # latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
          # none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
          auto-offset-reset: latest
          # key value 序列化机制
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    1.3 消息发送

    Kafka 消息发送流程如下:
    在这里插入图片描述

    1.3.1 发送消息

    通过 KafkaTemplate 发送消息。

    @RestController
    @Slf4j
    public class KafkaProducerController {
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        public static final String TOPIC = "spring-kafka-topic-test";
    
        @GetMapping("/sendMessage")
        public String sendMessage(String msg) throws Exception {
            // 默认异步发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC, msg);
            // 通过 ListenableFuture.get() 同步等待
            SendResult sendResult = future.get();
            return sendResult.toString();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    1.3.2 异步消息发送监听

    spring-kafka 发送消息采用异步方式,我们可以通过 ListenableFuture> future = kafkaTemplate.send(TOPIC, msg); SendResult sendResult = future.get(); 进行同步等待获取结果,也可以为 KafkaTemplate 添加一个异步消息发送监听器,来判断消息是否发送成功,如下:

    @Configuration
    @Slf4j
    public class KafkaConfiguration<K, V> {
    	@Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        @Bean
        public KafkaTemplate<K, V> kvKafkaTemplate() {
            Map<String, Object> props = new HashMap<>();
            // kafka服务地址
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            // key、value 采用的序列化机制
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 添加自定义分区器
            // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartition.class);
            KafkaTemplate<K, V> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
            kafkaTemplate.setProducerListener(new ProducerListener<K, V>() {
                @Override
                public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
                    log.info("onSuccess record={}", producerRecord.value());
                }
    
                @Override
                public void onError(ProducerRecord<K, V> producerRecord, Exception exception) {
                    log.info("onError record={}", producerRecord.value());
                }
            });
            return kafkaTemplate;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    1.3.3 序列化

    spring-kafka 使用 KafkaTemplate 发送消息时,会指定序列化机制,如:

    spring:
      kafka:
        bootstrap-servers: 127.0.0.1:9092
        # producer 生产者
        producer:
          # key value 序列化机制
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        # consumer消费者
        consumer:
          # key value 序列化机制
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    org.apache.kafka.common.serialization.StringSerializer 为 Kafka 自带的字符串序列化机制,除此之外,还提供很多默认序列化机制,如:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等,这些序列化器都实现了接口 org.apache.kafka.common.serialization.Serializer,因此,我们也可以实现自定义序列化机制,如下:

    1. Serializer
    public class MySerializer<T> implements Serializer<Object> {
        @Override
        public void configure(Map map, boolean b) {
            // 配置
        }
    
        @Override
        public byte[] serialize(String s, Object o) {
            // 序列化
            return JSON.toJSONString(o).getBytes(StandardCharsets.UTF_8);
        }
    
        @Override
        public void close() {
            // 关闭时调用
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    2. Deserializer
    public class MyDeserializer<T> implements Deserializer<Object> {
        @Override
        public void configure(Map<String, ?> map, boolean b) {
            // 配置
        }
    
        @Override
        public Object deserialize(String s, byte[] bytes) {
            // 反序列化
            return JSON.parse(new String(bytes, StandardCharsets.UTF_8));
        }
    
        @Override
        public void close() {
            // 关闭时调用
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    1.3.4 分区器

    Kafka 分区器决定了消息根据 key 投放到哪个分区,也是顺序消费保障的基石。

    • 指定分区号,直接将数据发送到指定的分区
    • 没有指定分区号,key != null,通过给定数据的 key 进行 hashCode 取模判断
    • 既没有指定分区号,key == null,轮循发送(默认)
    • 自定义分区
    1. 默认分区器
    public class DefaultPartitioner implements Partitioner {
    
        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    
        public void configure(Map<String, ?> configs) {}
    
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
                    // no partitions are available, give a non-available partition
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                // hash the keyBytes to choose a partition
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        private int nextValue(String topic) {
            AtomicInteger counter = topicCounterMap.get(topic);
            if (null == counter) {
                counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
                AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    counter = currentCounter;
                }
            }
            return counter.getAndIncrement();
        }
    
        public void close() {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    2. 自定义分区器

    实现 org.apache.kafka.clients.producer.Partitioner 接口,重写 partition() 方法,如下:

    public class MyPartition implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            /**
             * 自定义分区器:key 的哈希值取模
             */
            if (key == null) {
                return 0;
            }
            return key.toString().hashCode() % 2;
        }
    
        @Override
        public void close() {
            // 关闭时调用
        }
    
        @Override
        public void configure(Map<String, ?> map) {
            // 配置
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    1.4 消息接收

    kafka的消息监听有两种方式:

    1. 注解式监听
    2. 接口式监听

    1.4.1 @KafkaListener 注解

    @Component
    @Slf4j
    public class SpringKafkaTestConsumer {
    
        @KafkaListener(topics = KafkaProducerController.TOPIC)
        /*@KafkaListeners({
                // 可同时监听多个topic
                @KafkaListener(topics = KafkaProducerController.TOPIC)
        })*/
        public void onMessage(ConsumerRecord<String, String> record) {
            String value = record.value();
            System.out.println("=================收到topic【" + KafkaProducerController.TOPIC + "】消息:" + value);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    关于 @KafkaListener 注解详解,参照下面。

    1.4.2 MessageListener 接口

    @Component
    @Slf4j
    public class SpringKafkaTestConsumer implements GenericMessageListener {
        @Override
        public void onMessage(Object o) {
            String value = (String) o;
            System.out.println("=================收到topic【" + KafkaProducerController.TOPIC + "】消息:" + value);
        }
    
        @Override
        public void onMessage(Object data, Acknowledgment acknowledgment) {
            String value = (String) data;
            System.out.println("=================收到topic【" + KafkaProducerController.TOPIC + "】消息:" + value);
        }
    
        @Override
        public void onMessage(Object data, Acknowledgment acknowledgment, Consumer consumer) {
            String value = (String) data;
            System.out.println("=================收到topic【" + KafkaProducerController.TOPIC + "】消息:" + value);
        }
    
        @Override
        public void onMessage(Object data, Consumer consumer) {
            String value = (String) data;
            System.out.println("=================收到topic【" + KafkaProducerController.TOPIC + "】消息:" + value);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    使用 MessageListener 需添加配置类,详细参照下面。

    1.4.3 消息组

    Kafka 消费者使用一个消费组名称来进行标识,发布到 topic 中的每条记录被分配给订阅消费组中的一个消费者实例。消费者实例可以分布在多个进程中或者多个机器上。

    • 如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。
    • 如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。

    在这里插入图片描述
    同一个 Topic 可由不同消费者组进行消费,如下:

    // 组A,消费者1
    @KafkaListener(topics = {"topic-test"},groupId = "group-A")
    public void onMessage1(ConsumerRecord<?, ?> record) {
        String value = record.value();
        System.out.println("=================收到topic【" + KafkaProducerController.TOPIC + "】消息:" + value);
    }
    
    // 组A,消费者2
    @KafkaListener(topics = {"topic-test"},groupId = "group-A")
    public void onMessage2(ConsumerRecord<?, ?> record) {
        String value = record.value();
        System.out.println("=================收到topic【" + KafkaProducerController.TOPIC + "】消息:" + value);
    }
    
    @KafkaListener(topics = {"topic-test"},groupId = "group-B")
    public void onMessage3(ConsumerRecord<?, ?> record) {
        String value = record.value();
        System.out.println("=================收到topic【" + KafkaProducerController.TOPIC + "】消息:" + value);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    1.4.4 offset 提交

    1. 自动提交

    在 spring-kafka配置中,存在两个配置,如下:

    # 是否自动提交 offset
    enable-auto-commit: true
    # 提交offset延时(接收到消息后多久提交offset),单位毫秒
    auto-commit-interval: 100
    
    • 1
    • 2
    • 3
    • 4
    2. 手动提交

    大多数场景中,业务需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复等。
    下面我们自己定义配置,覆盖上面的参数。

    @Configuration
    public class KafkaConfiguration {
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        @Bean("manualKafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // 设置手动提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
    
            /**
             * ack模式,针对 ENABLE_AUTO_COMMIT_CONFIG=false 时生效,有7中模式,如下
             * 1.RECORD:每处理一条commit一次
             * 2.BATCH(默认):每次poll的时候批量提交一次,频率取决于每次poll的调用频率
             * 3.TIME:每次间隔ackTime的时间去commit
             * 4.COUNT:累积达到ackCount次的ack去commit
             * 5.COUNT_TIME:ackTime或ackCount哪个条件先满足,就commit
             * 6.MANUAL:listener负责ack,但是背后也是批量上去
             * 7.MANUAL_IMMEDIATE:listner负责ack,每调用一次,就立即commit
             */
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    然后通过在消费端的Consumer来提交偏移量

    Kafka 提供了两种提交偏移量方式:

    1. consumer.commitAsync():异步提交
    2. consumer.commitSync():同步提交.

    commitSync() 方法提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会。一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。否则就会造成重复消费因此,在消费者关闭前一般会组合使用 commitAsync() 和 commitSync(),如下:

    @KafkaListener(topics = "topic-test", groupId = "offset-group", 
    containerFactory = "manualKafkaListenerContainerFactory" // 指定容器创建工程)
    public void manualCommit(@Payload String message,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             Consumer consumer,
                             Acknowledgment ack) {
        try {
            // 异步提交
            consumer.commitAsync();
            
            //do something.....
            
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            try {
            	// 同步提交
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    二、@KafkaListener 注解

    2.1 KafkaListener 注解的主要属性

    • id:监听器 id,消费者线程命名会使用当前值为前缀,在相同容器中的监听器ID不能重复
    • groupId:kafka 消费组 id
    • idlsGroup:是否用 id 作为 groupId,如果置为 false,并指定 groupId 时,消费组 ID 使用 groupId;如果置为 true,会使用监听器的 id 作为 groupId
    • topics:topics:指定要监听哪些 topic(与 topicPattern、topicPartitions 三选一)
    • topicPattern: 匹配 topic 进行监听(与 topics、topicPartitions 三选一),topicPattern 支持表达式,如:
    // _ 匹配一个字符,* 匹配多个字符
    @KafkaListener(topicPattern = "aaa_.*_.*_bb.*")
    
    • 1
    • 2
    • topicPartitions: 显式分区分配,可以为监听器配置明确的主题和分区(以及可选的初始偏移量)
    // 监听topic1的0,1分区
    // 监听topic2的第0分区, 并且第1分区从offset为100的开始消费
    @KafkaListener(topicPartitions = { 
    	@TopicPartition(topic = "topic1", partitions = { "0", "1" }),
    	@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • containerFactory:指定监听器容器工厂
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 设置为批量消费,通过参数 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 指定最大拉取数量
        factory.setBatchListener(true);
        return factory;
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • errorHandler: 监听异常处理器,配置 Spring IoC 中的 BeanName。实现KafkaListenerErrorHandler; 然后做一些异常处理。
    @Component
    public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
        @Override
        public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
            return null;
        }
     
        @Override
        public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
            
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • beanRef:真实监听容器的 BeanName,需要在 BeanName前加 “__”
      在这里插入图片描述
    • clientIdPrefix:消费者Id前缀
    • concurrency: 覆盖容器工厂 containerFactory 的并发配置,表示消费者的个数,一般和topic分区设置成一样

    注意:从 2.2.4 版开始,可以直接在注解中指定 Kafka 使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。可以使用 #{…​} 或属性占位符(${…​})在 SpEL 上配置注释上的大多数属性。如:
    @KafkaListener(concurrency = "${listen.concurrency:3}")
    标识属性 concurrency 将会从容器中获取 listen.concurrency 的值,如果不存在就使用默认值3

    2.2 KafkaListener注解样例

    @KafkaListener(topics = KafkaProducerController.TOPIC)
    /*@KafkaListeners({
            // 可同时监听多个topic
            @KafkaListener(topics = KafkaProducerController.TOPIC)
    })*/
    public void onMessage(ConsumerRecord<String, String> record) {
        String value = record.value();
        System.out.println("=================收到topic消息:" + value);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    @KafkaListener(topicPattern = "topic-test")
    public void onMessage(@Payload String data,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ByteBuffer key,
                          Acknowledgment ack,// 手动提交offset
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offSet,
                          Consumer<?, ?> consumer) {
    	try {
            String value= (String) record.value();
        	System.out.println("=================收到topic消息:" + value);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            // 手动提交 offset
            ack.acknowledge();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    三、MessageListener 接口

    Kafka 的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。GenericMessageListener 是 Spring Kafka 的消息监听器接口,也是一个函数式接口,利用接口的 onMessage() 方法可以实现消费数据,如下:

    @FunctionalInterface
    public interface GenericMessageListener<T> {
    
    	void onMessage(T data);
    
    	default void onMessage(T data, Acknowledgment acknowledgment) {
    		throw new UnsupportedOperationException("Container should never call this");
    	}
    
    	default void onMessage(T data, Consumer<?, ?> consumer) {
    		throw new UnsupportedOperationException("Container should never call this");
    	}
    
    	default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
    		throw new UnsupportedOperationException("Container should never call this");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    spring-kafka 根据 GenericMessageListener 默认提供的接口有:

    1. 单条数据消息监听器接口 MessageListenen
    2. 多条数据消息监听器接口 BatchMessageListener
    3. 带ACK机制的单条数据消息监听器 AcknowledgingMessageListener
    4. 带ACK机制的多条数据消息监听器 BatchAcknowledgingMessageListener

    其中,类关系图如下:
    在这里插入图片描述

    3.1 消息监听容器

    消息监听器 MessageListener 需要通过消费监听器容器 MessageListenerContainer 接口中的 setupMessageListenner() 方法来启动监听,如下:

    public interface MessageListenerContainer extends SmartLifecycle {
    
    	// 设置消息监听器
    	void setupMessageListener(Object messageListener);
    
    	// 返回按 client-id 分组的消费者指标,key -> client-id
    	Map<String, Map<MetricName, ? extends Metric>> metrics();
    
    	// 获取容器属性
    	default ContainerProperties getContainerProperties() {
    		throw new UnsupportedOperationException("This container doesn't support retrieving its properties");
    	}
    
    	// 获取容器的topic、partition
    	default Collection<TopicPartition> getAssignedPartitions() {
    		throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
    	}
    
    	// 在下一个轮询之前暂停此容器。
    	default void pause() {
    		throw new UnsupportedOperationException("This container doesn't support pause");
    	}
    
    	// 如果暂停,则在下一个轮询之后恢复此容器。
    	default void resume() {
    		throw new UnsupportedOperationException("This container doesn't support resume");
    	}
    
    	// true if pause has been requested.
    	default boolean isPauseRequested() {
    		throw new UnsupportedOperationException("This container doesn't support pause/resume");
    	}
    
    	// true if the container is paused.
    	default boolean isContainerPaused() {
    		throw new UnsupportedOperationException("This container doesn't support pause/resume");
    	}
    
    	// 设置自动启动
    	default void setAutoStartup(boolean autoStartup) {
    		// empty
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    spring-kafka 提供了两个容器 KafkaMessageListenerContainerConcurrentMessageListenerContainer
    在这里插入图片描述

    3.2 消息监听容器工厂

    消息监听器容器由容器工厂 KafkaListenerContainerFactory 统一管理和创建,如下,需指定消息监听容器类型。

    public interface KafkaListenerContainerFactory<C extends MessageListenerContainer> {
    	// 根据 endpoint 创建一个消息监听器容器
    	C createListenerContainer(KafkaListenerEndpoint endpoint);
    
    	// 根据 topic、partition 创建一个消息监听器容器
    	C createContainer(Collection<TopicPartitionInitialOffset> topicPartitions);
    
    	// 根据 topic 创建一个消息监听器容器
    	C createContainer(String... topics);
    
    	// 根据 topic 的正则表达式创建一个消息监听器容器
    	C createContainer(Pattern topicPattern);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    spring-kafka 提供了监听器容器工厂 ConcurrentKafkaListenerContainerFactory,类结构图如下:

    在这里插入图片描述
    其有两个重要的配置

    1. ContainerProperties:ContainerProperties 定义了要消费消息的 topic,消息处理的 MessageListener 等信息。
    2. ConsumerFactory:ConsumerFactory 指定了消费者工厂

    3.3 实现一个接口式消息监听器

    1. 创建一个 ConsumerFactory
    2. 创建 ConcurrentKafkaListenerContainerFactory 容器工厂
    3. 创建 KafkaMessageListenerContainer 监听容器

    如下:

    @Configuration
    @Slf4j
    public class KafkaConfiguration<K, V> {
        @Autowired
        private KafkaProperties kafkaProperties;
    
        @Bean
        public ConsumerFactory<K, V> consumerFactory() {
            // 通过 KafkaProperties ConsumerProperties 创建一个默认的 ConsumerFactory
            return new DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties());
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<K, V> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
            // 深圳 ConsumerFactory
            factory.setConsumerFactory(consumerFactory());
            // 指定容器并发性:表示消费者的个数,一般和 topic 分区设置成一样
            factory.setConcurrency(3);
            // 是否为批量监听
            factory.setBatchListener(true);
            // 设置拉取超时时间,单位毫秒(ms)
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }
    
        @Bean
        public KafkaMessageListenerContainer<K, V> kafkaMessageListenerContainer() {
            // 创建一个容器属性对象:监听 spring-kafka-topic-test、spring-kafka-topic-test-1 消息
            ContainerProperties containerProperties = new ContainerProperties("spring-kafka-topic-test", "spring-kafka-topic-test-1");
            // 设置消息监听:自定义消息监听器
            containerProperties.setMessageListener((MessageListener<K, V>) data ->
                    System.out.println("收到消息: " + data.value())
            );
            return new KafkaMessageListenerContainer<K, V>(consumerFactory(), containerProperties);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    四、消息拦截器

    实际业务开发中,很多场景需要在消息发送前进行某些操作,如按照某个规则过滤掉不符合要求的消息,或者屏蔽某些关键词,或者改变某些属性等等,都可以依托消息拦截器链来实现。

    自定义拦截器需要实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口,根据需要重写相应的方法。

    public interface ProducerInterceptor<K, V> extends Configurable {
    	// 重新当前方法,可改变消息内容
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    
    	// 如果需要对消息发送的结果进行处理,则需要重写onAcknowledgement方法,
    	// exception有值,说明消息发送失败,可以采取自定义的措施,比如重试等等,也可以用来统计消息发送的成功率
        public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    
    	// Producer关闭时调用
        public void close();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4.1 自定义消息拦截器

    @Slf4j
    public class MyProducerInterceptor implements ProducerInterceptor {
    
        @Override
        public ProducerRecord onSend(ProducerRecord record) {
            log.info("myProducerInterceptor>>>>>>>onSend record={}", JSON.toJSONString(record));
    
            String value = (String) record.value();
            value += "   timeMillis:" + System.currentTimeMillis();
    
            ProducerRecord<String, String> newRecord = new ProducerRecord<>(
                    record.topic(),
                    record.partition(),
                    record.timestamp(),
                    (String) record.key(),
                    value,  // 修改后的 value值
                    record.headers());
            return newRecord;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            log.info("myProducerInterceptor>>>>>>>onAcknowledgement metadata={}", JSON.toJSONString(metadata));
            if (exception != null) {
                // 发送失败
                log.info("myProducerInterceptor>>>>>>>onAcknowledgement exception={}", exception.getMessage());
            }
        }
    
        @Override
        public void close() {
            log.info("myProducerInterceptor>>>>>>>close");
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    4.2 添加配置

    4.2.1 yaml 配置

    spring:
      kafka:
        bootstrap-servers: 127.0.0.1:9092
        # producer 生产者
        producer:
          retries: 0 # 消息发送失败重试
          acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
          batch-size: 16384 # 批量大小
          buffer-memory: 33554432 # 生产端缓冲区大小
          # key value 序列化机制
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          properties:
            # 拦截器配置,多个用逗号隔开
            interceptor:
              classes: xxx.xxx.xxx.MyProducerInterceptor,xxx.xxx.xxx.My2ProducerInterceptor
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    4.2.2 代码初始化指定

    @Configuration
    @Slf4j
    public class KafkaConfiguration<K, V> {
    	@Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        @Bean
        public KafkaTemplate<K, V> kafkaTemplate() {
            Map<String, Object> props = new HashMap<>();
            // kafka服务地址
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            // key、value 采用的序列化机制
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 添加自定义分区器
            // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartition.class);
            // 自定义拦截器
            List<String> interceptors = Arrays.asList(MyProducerInterceptor.class.getName());
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
            
            KafkaTemplate<K, V> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
            return kafkaTemplate;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    4.3 拦截器执行顺序

    执行顺序取决于配置的先后顺序,在前面的先执行。

    同一个拦截器添加多次,会执行多次。

  • 相关阅读:
    基于java_ssm_vue鲜花在线销售商城网站-计算机毕业设计
    虚拟机CentOS 8 重启后不能上网
    【学习笔记】[ARC145F] Modulo Sum of Increasing Sequences
    学习笔记:机器学习之支持向量机(四、线性支持向量机-软间隔最大化对偶形式)
    实现Java基于类的代理方式 - CGLIB动态代理(动态代理篇 三)
    盘点面试中常见的超大规模数据常见的算法问题
    【SpringSecurity】SpringSecurity2.7.x 的使用(02)
    SpringBoot集成Dubbo、Redis、MyBatis、Spring、SpringMVC、JSP
    iOS之crash分析篇--捕获signal类型的崩溃信息
    网络安全(黑客)自学
  • 原文地址:https://blog.csdn.net/qq_33375499/article/details/127601922