• kafka消费者理解


    概念入门

    消费者、消费组

    消费者
    消费者从订阅的主题topic消费消息,消费消息的偏移量保存在Kafka的名字是__consumer_offsets 的主题中。消费者还可以将⾃⼰的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper
    推荐使⽤Kafka存储消费者的偏移量。因为Zookeeper不适合⾼并发
    消费组
    多个从同一个主题topic消费消息的消费者,可以加入到一个消费组中。
    消费组中的消费者共享group_id ,配置相同的 configs.put("group.id", "xxx");

    group_id一般设置为应用的逻辑名称。比如多个订单服务订阅一个主题topic,可以设置group_id为order_process。
    group_id通过消费者的配置指定:group_id=xxxxxx
    在这里插入图片描述在这里插入图片描述
    消费组均衡的给消费者分配分区,每个分区只由消费组中的一个消费者消费。
    消费组中的消费者和分区的关系:每个分区只能被消费组中的一个消费者消费,一个消费者可能消费多个分区

    • 消费组数量和分区数相同时,消费组中的每个消费者分别消费一个分区
    • 消费组数量<分区数时,消费组中的每个消费者消费多个分区
    • 消费组数量>分区数时,多余的消费者不会分配消费的分区
      通过控制分区数和消费者数量,可以通过增加消费者来横向扩展,增加消费的能力。横向扩展增加消费者不会对性能造成负面影响,但要保证每个消费者能分配到消费分区

    心跳机制

    kafka的心跳机制,是消费者和kafka broken之间的健康检查,用来控制消费超时。当检查不通过时,则会触发rebalance再平衡。心跳机制对消费者客户端来说是无感的,当我们启动客户端时心跳机制就开始,它是一个异步线程。心跳机制触发rebalance再平衡,可能会导致消息重复消费

    • 消费者宕机活着消费消息超时,心跳机制失败退出消费组,触发再平衡,重新给消费组中的消费者分配分区
      在这里插入图片描述
    • kafka broken宕机,导致某个分区没有用了。心跳机制失败触发rebalance再平衡,对应的消费者没有可消费的分区闲置。
      在这里插入图片描述
      Kafka 的⼼跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer才会发送⼼跳。
      Consumer 和 Rebalance再平衡 相关的 2 个配置参数:
    参数字段
    session.timeout.msMemberMetadata.sessionTimeoutMsconsumer会发送周期性的心跳表明该consumer是活着的。如果超过session.timeout.ms设定的值仍然没有收到心跳,重新分配分区和消费者的对应关系
    max.poll.interval.msMemberMetadata.rebalanceTimeoutMs它表示最大的poll数据间隔,默认值是3秒。如果超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该consumer处于 livelock状态。就会将该consumer移出consumer group。所以为了不使 Consumer 自己被移出Consumer 应该不停的发起poll(timeout)操作,当再次poll的时候,会重新加入到ConsumerGroup,触发消费者再平衡策略 RebalanceGroup。而这个动作 KafkaConsumer Client是不会帮我们做的,这就需要自己在程序中不停的调用poll方法了。

    心跳机制原理
    do something

    消息接收

    在这里插入图片描述

    必要参数配置

    参数说明
    bootstrap.servers向Kafka集群建⽴初始连接⽤到的host/port列表。客户端会使⽤这⾥列出的所有服务器进⾏集群其他服务器的发现,⽽不管是否指定了哪个服务器⽤作引导。这个列表仅影响⽤来发现集群所有服务器的初始主机。字符串形式:host1:port1,host2:port2,…由于这组服务器仅⽤于建⽴初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这⾥。⼀般最好两台,以防其中⼀台宕掉。
    key.deserializerkey的反序列化类,该类需要实现 org.apache.kafka.common.serialization.Deserializer 接⼝。
    value.deserializer实现了 org.apache.kafka.common.serialization.Deserializer 接⼝的反序列化器,⽤于对消息的value进⾏反序列化。
    client.id当从服务器消费消息的时候向服务器发送的id字符串。在ip/port基础上提供应⽤的逻辑名称,记录在服务端的请求⽇志中,⽤于追踪请求的源
    group.id⽤于唯⼀标志当前消费者所属的消费组的字符串。如果消费者使⽤组管理功能如subscribe(topic)或使⽤基于Kafka的偏移量管理策略,该项必须设置。
    auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?earliest:⾃动重置偏移量到最早的偏移量 latest:⾃动重置偏移量为最新的偏移量 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常 anything:向消费者抛异常
    enable.auto.commit如果设置为true,消费者会⾃动周期性地向服务器提交偏移量。

    订阅

    主题和分区

    • topic:kafka用于分类管理消息的逻辑单元,类似于mysql的数据库。
    • partition:是kafka下存储数据的基本单元,这是物理上的概念。一个topic可以有多个partition,他们可以分布在不同的节点上,也可以在同一个节点。优势在于:可以水平扩张,避免单台机在磁盘空间和性能上的限制。同时可以通过数据备份增加容灾能力。为了做到均匀分布,通常partition数量是节点数量的蒸熟倍
    • consumer group:消费组是kafka实现单播和广播的能力。同一个消费者里面,一个分区只能有一个消费者,当消息路由到分区后只能被消费组里面的一个消费者消费。
      在这里插入图片描述
      cusumer采用pull模式从broken中读取消息。
      采用pull模式,消费者就能控制消息消费的速率,可以控制消费方式(逐条/批量),还可以选择不同的提交方式从而实现不同的传输语义

    反序列化

    在发送消息到broken之前,会先进行序列化变成字节数组。Kafka的broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进⾏反序列化处理,然后才能交给⽤户程序消费处理。
    消费者的反序列化器包括key的和value的反序列化器。
    key.deserializervalue.deserializer
    需要实现 org.apache.kafka.common.serialization.Deserializer 接⼝。

    package org.apache.kafka.common.serialization;
    
    import org.apache.kafka.common.header.Headers;
    
    import java.io.Closeable;
    import java.util.Map;
    
    /**
     * An interface for converting bytes to objects.
     *  * A class that implements this interface is expected to have a constructor with no parameters.
     * 

    * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. * * @param Type to be deserialized into. */ public interface Deserializer<T> extends Closeable { /** * Configure this class. * @param configs configs in key/value pairs * @param isKey whether is for key or value */ default void configure(Map<String, ?> configs, boolean isKey) { // intentionally left blank } /** * Deserialize a record value from a byte array into a value or object. * @param topic topic associated with the data * @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception. * @return deserialized typed data; may be null */ T deserialize(String topic, byte[] data); /** * Deserialize a record value from a byte array into a value or object. * @param topic topic associated with the data * @param headers headers associated with the record; may be empty. * @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception. * @return deserialized typed data; may be null */ default T deserialize(String topic, Headers headers, byte[] data) { return deserialize(topic, data); } /** * Close this deserializer. *

    * This method must be idempotent as it may be called multiple times. */ @Override default void close() { // intentionally left blank } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    系统提供了该接⼝的⼦接⼝以及实现类:

    • org.apache.kafka.common.serialization.ByteArrayDeserializer
    • org.apache.kafka.common.serialization.ByteBufferDeserializer
    • org.apache.kafka.common.serialization.BytesDeserializer
    • org.apache.kafka.common.serialization.DoubleDeserializer
    • org.apache.kafka.common.serialization.FloatDeserializer
    • org.apache.kafka.common.serialization.IntegerDeserializer
    • org.apache.kafka.common.serialization.LongDeserializer
    • org.apache.kafka.common.serialization.ShortDeserializer
    • org.apache.kafka.common.serialization.StringDeserializer

    ⾃定义反序列化

    ⾃定义反序列化类,需要实现 org.apache.kafka.common.serialization.Deserializer 接⼝。

    package luu.demo.kafka.consumer;
    
    import luu.demo.kafka.model.User;
    import org.apache.kafka.common.serialization.Deserializer;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class UserDeserializer implements Deserializer<User> {
        
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
        @Override
        public User deserialize(String topic, byte[] data) {
            ByteBuffer allocate = ByteBuffer.allocate(data.length);
            allocate.put(data);
            allocate.flip();
            int userId = allocate.getInt();
            int length = allocate.getInt();
            System.out.println(length);
            String username = new String(data, 8, length);
            return new User(userId, username);
        }
        
        @Override
        public void close() {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    MyConsumer

    package luu.demo.kafka.consumer;
    
    import luu.demo.kafka.model.User;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;
    
    public class MyConsumer {
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<>();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class);
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    UserDeserializer.class);
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
            configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");
            KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(configs);
            consumer.subscribe(Collections.singleton("tp_user_01"));
            ConsumerRecords<String, User> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(new Consumer<ConsumerRecord<String, User>>() {
                @Override
                public void accept(ConsumerRecord<String, User> record) {
                    System.out.println(record.value());
                }
            });
            // 关闭消费者
            consumer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    位移提交

    • Consumer需要向Kafka记录⾃⼰的位移数据,这个汇报过程称为提交位移(Committing Offsets)
    • Consumer 需要为分配给它的每个分区提交各⾃的位移数据
    • 位移提交的由Consumer端负责的,Kafka只负责保管__consumer_offsets
    • 位移提交分为⾃动提交和⼿动提交
    • 位移提交分为同步提交和异步提交

    自动提交

    Kafka Consumer 后台提交

    • 开启⾃动提交:enable.auto.commit=true
    • 配置⾃动提交间隔:Consumer端:auto.commit.interval.ms ,默认 5s
    Map<String, Object> configs = new HashMap<>();
    configs.put("bootstrap.servers", "node1:9092");
    configs.put("group.id", "mygrp");
    // 设置偏移量⾃动提交。⾃动提交是默认值。这⾥做示例。
    configs.put("enable.auto.commit", "true");
    // 偏移量⾃动提交的时间间隔
    configs.put("auto.commit.interval.ms", "3000");
    configs.put("key.deserializer", StringDeserializer.class);
    configs.put("value.deserializer", StringDeserializer.class);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
    consumer.subscribe(Collections.singleton("tp_demo_01"));
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.topic()
     - "\t" + record.partition()
     - "\t" + record.offset()
     - "\t" + record.key()
     - "\t" + record.value());
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • ⾃动提交位移的顺序
      配置 enable.auto.commit = true
      Kafka会保证在开始调⽤poll⽅法时,提交上次poll返回的所有消息
      因此⾃动提交不会出现消息丢失,但会 重复消费
      重复消费举例:
      Consumer 每 5s 提交 offset
      假设提交 offset 后的 3s 发⽣了 Rebalance
      Rebalance 之后的所有 Consumer 从上⼀次提交的 offset 处继续消费
      因此 Rebalance 发⽣前 3s 的消息会被重复消费

    同步提交

    • 使⽤ KafkaConsumer#commitSync():会提交 KafkaConsumer#poll() 返回的最新 offset
    • 该⽅法为同步操作,等待直到 offset 被成功提交才返回
    while (true) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofSeconds(1));
        process(records); // 处理消息
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            handle(e); // 处理提交失败异常
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • commitSync 在处理完所有消息之后
    • ⼿动同步提交可以控制offset提交的时机和频率
    • ⼿动同步提交会:
      调⽤ commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
      会影响 TPS
      可以选择拉⻓提交间隔,但有以下问题
      会导致 Consumer 的提交频率下降
      Consumer 重启后,会有更多的消息被消费

    异步提交

    • KafkaConsumer#commitAsync()
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(3_000);
        process(records); // 处理消息
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                handle(exception);
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • commitAsync出现问题不会⾃动重试
    • 处理⽅式:
    try {
        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
            process(records); // 处理消息
            commitAysnc(); // 使⽤异步提交规避阻塞
        }
    } catch (Exception e) {
        handle(e); // 处理异常
    } finally {
        try {
            consumer.commitSync(); // 最后⼀次提交使⽤同步阻塞式提交
        } finally {
            consumer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    消费者位移管理

    Kafka中,消费者根据消息的位移顺序消费消息。
    消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题__consumer_offsets中。
    Kafka提供了消费者API,让消费者可以管理⾃⼰的位移。
    API如下:KafkaConsumer

    项⽬细节
    APIpublic void assign(Collection partitions)
    说明给当前消费者⼿动分配⼀系列主题分区。⼿动分配分区不⽀持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。如果给出的主题分区是空的,则等价于调⽤unsubscribe⽅法。⼿动分配主题分区的⽅法不使⽤消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。⼿动分区分配assign(Collection)不能和⾃动分区分配subscribe(Collection,ConsumerRebalanceListener)⼀起使⽤。如果启⽤了⾃动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进⾏异步提交。
    APIpublic Set assignment()
    说明获取给当前消费者分配的分区集合。如果订阅是通过调⽤assign⽅法直接分配主题分区,则返回相同的集合。如果使⽤了主题订阅,该⽅法返回当前分配给该消费者的主题分区集合。如果分区订阅还没开始进⾏分区分配,或者正在重新分配分区,则会返回none。
    APIpublic Map> listTopics()
    说明获取对⽤户授权的所有主题分区元数据。该⽅法会对服务器发起远程调⽤。
    APIpublic List partitionsFor(String topic)
    说明获取指定主题的分区元数据。如果当前消费者没有关于该主题的元数据,就会对服务器发起远程调⽤。
    APIpublic Map beginningOffsets(Collection partitions)
    说明对于给定的主题分区,列出它们第⼀个消息的偏移量。注意,如果指定的分区不存在,该⽅法可能会永远阻塞。该⽅法不改变分区的当前消费者偏移量。
    APIpublic void seekToEnd(Collection partitions)
    说明将偏移量移动到每个给定分区的最后⼀个。该⽅法延迟执⾏,只有当调⽤过poll⽅法或position⽅法之后才可以使⽤。如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到最后⼀个稳定的偏移量,即下⼀个要消费的消息现在还是未提交状态的事务消息。
    APIpublic void seek(TopicPartition partition, long offset)
    说明将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下⼀条要消费的消息偏移量。若该⽅法多次调⽤,则最后⼀次的覆盖前⾯的。如果在消费中间随意使⽤,可能会丢失数据。
    APIpublic long position(TopicPartition partition)
    说明检查指定主题分区的消费偏移量
    APIpublic void seekToBeginning(Collection partitions)
    说明将给定每个分区的消费者偏移量移动到它们的起始偏移量。该⽅法懒执⾏,只有当调⽤过poll⽅法或position⽅法之后才会执⾏。如果没有提供分区,则将所有分配给当前消费者的分区消费偏移量移动到起始偏移量。

    再均衡

    重平衡可以说是kafka为⼈诟病最多的⼀个点了。
    重平衡其实就是⼀个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每⼀个分区。⽐如⼀个topic有100个分区,⼀个消费者组内有20个消费者,在协调者的控制下让组内每⼀个消费者分配到5个分区,这个分配的过程就是重平衡。

    触发重平衡主要的条件:

    • 有新的消费者加入消费组。
    • 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的 GC、网络延迟、消费超时导致消费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator会认为消费者已经下线。
    • 有消费者主动退出消费组(发送 LeaveGroupRequest 请求)。比如客户端调用了unsubscrible()方法取消对某些主题的订阅。
    • 消费组所对应的GroupCoorinator节点发生了变更。
    • 消费组内所订阅的任一主题或者主题的分区数量发生变化。
      为什么说重平衡为⼈诟病呢?因为重平衡过程中,消费者⽆法从kafka消费消息,这对kafka的TPS影响极⼤,⽽如果kafka集内节点较多,⽐如数百个,那重平衡可能会耗时极多。数分钟到数⼩时都有可能,⽽这段时间kafka基本处于不可⽤状态。所以在实际环境中,应该尽量避免重平衡发⽣。

    避免重平衡

    要说完全避免重平衡,是不可能,因为你⽆法完全保证消费者不会故障。⽽消费者故障其实也是最常⻅的引发重平衡的地⽅,所以我们需要保证尽⼒避免消费者故障
    ⽽其他⼏种触发重平衡的⽅式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制。如果消费者真正挂掉了,就没办法了,但实际中,会有⼀些情况,kafka错误地认为⼀个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。

    ⾸先要知道哪些情况会出现错误判断挂掉的情况。在分布式系统中,通常是通过⼼跳来维持分布式系统的,kafka也不例外。
    在分布式系统中,由于⽹络问题你不清楚没接收到⼼跳,是因为对⽅真正挂了还是只是因为负载过重没来得及发⽣⼼跳或是⽹络堵塞。所以⼀般会约定⼀个时间,超时即判定对⽅挂了。⽽在kafka消费者场景中:

    • session.timout.ms控制⼼跳超时时间,就是规定这个超时时间是多少。
    • heartbeat.interval.ms控制⼼跳发送频率,这个参数控制发送⼼跳的频率,频率越⾼越不容易被误判,但也会消耗更多资源
    • max.poll.interval.ms控制poll消息的间隔,消费者poll数据后,需要⼀些处理,再进⾏拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过 max.poll.interval.ms 这个参数的值。这个参数的默认值是5分钟,⽽如果消费者接收到数据后会执⾏耗时的操作,则应该将其设置得⼤⼀些

    这⾥给出⼀个相对较为合理的配置,如下:

    • session.timout.ms:设置为6s
    • heartbeat.interval.ms:设置2s
    • max.poll.interval.ms:推荐为消费者处理消息最⻓耗时再加1分钟

    消费者拦截器

    消费者在拉取了分区消息之后,要⾸先经过反序列化器对key和value进⾏反序列化处理。
    处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应⽤程序进⾏处理。
    在这里插入图片描述
    消费端定义消息拦截器,需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor
    ⼝。

    1. ⼀个可插拔接⼝,允许拦截甚⾄更改消费者接收到的消息。⾸要的⽤例在于将第三⽅组件引⼊消费者应⽤程序,⽤于定制的监控、⽇志处理等。
    2. 该接⼝的实现类通过configre⽅法获取消费者配置的属性,如果消费者配置中没有指定clientID,还可以获取
      KafkaConsumer⽣成的clientId。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产⽣冲突。
    3. ConsumerInterceptor⽅法抛出的异常会被捕获、记录,但是不会向下传播。如果⽤户配置了错误的key或value类型参数,消费者不会抛出异常,⽽仅仅是记录下来。
    4. ConsumerInterceptor回调发⽣在org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)⽅法同⼀个线程。

    接口中有如下方法:

    package org.apache.kafka.clients.consumer;
    
    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.common.TopicPartition;
    import java.util.Map;
    
    public interface ConsumerInterceptor<K, V> extends Configurable {
        /**
         * 该⽅法在poll⽅法返回之前调⽤。调⽤结束后poll⽅法就返回消息了。
         * 

    * 该⽅法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或⽣成新的消息。 * 如果有多个拦截器,则该⽅法按照KafkaConsumer的configs中配置的顺序调⽤。 * * @param records 由上个拦截器返回的由客户端消费的消息。 */ public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records); /** * 当消费者提交偏移量时,调⽤该⽅法。 * 该⽅法抛出的任何异常调⽤者都会忽略。 */ public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets); public void close(); }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    自定义拦截器

    package com.lagou.kafka.demo.interceptor;
    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import java.util.Map;
    
    public class OneInterceptor implements ConsumerInterceptor<String, String> {
        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String>
                                                                 records) {
            // poll⽅法返回结果之前最后要调⽤的⽅法
            System.out.println("One -- 开始");
            // 消息不做处理,直接返回
            return records;
        }
    
        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // 消费者提交偏移量的时候,经过该⽅法
            System.out.println("One -- 结束");
        }
    
        @Override
        public void close() {
            // ⽤于关闭该拦截器⽤到的资源,如打开的⽂件,连接的数据库等
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
            // ⽤于获取消费者的设置参数
            configs.forEach((k, v) -> {
                System.out.println(k + "\t" + v);
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    配置拦截器

    // One -> Two -> Three,接收消息和发送偏移量确认都是这个顺序
    props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
    	"com.lagou.kafka.demo.interceptor.OneInterceptor" +
    	",com.lagou.kafka.demo.interceptor.TwoInterceptor" +
    	",com.lagou.kafka.demo.interceptor.ThreeInterceptor"
     );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    消费者参数配置补充

    配置项说明
    bootstrap.servers建⽴到Kafka集群的初始连接⽤到的host/port列表
    group.id⽤于定义当前消费者所属的消费组的唯⼀字符串
    auto.commit.interval.ms如果设置了 enable.auto.commit的值为true,则该值定义了消费者偏移量向Kafka提交的频率。
    auto.offset.reset如果Kafka中没有初始偏移量或当前偏移量在服务器中不存在(⽐如数据被删掉了):earliest:⾃动重置偏移量到最早的偏移量。latest:⾃动重置偏移量到最后⼀个。none:如果没有找到该消费组以前的偏移量没有找到,就抛异常。其他值:向消费者抛异常。
    fetch.min.bytes服务器对每个拉取消息的请求返回的数据量最⼩值。如果数据量达不到这个值,请求等待,以让更多的数据累积,达到这个值之后响应请求。默认设置是1个字节,表示只要有⼀个字节的数据,就⽴即响应请求,或者在没有数据的时候请求超时。将该值设置为⼤⼀点⼉的数字,会让服务器等待稍微⻓⼀点⼉的时间以累积数据。如此则可以提⾼服务器的吞吐量,代价是额外的延迟时间。
    fetch.max.wait.ms如果服务器端的数据量达不到 fetch.min.bytes 的话,服务器端不能⽴即响应请求。该时间⽤于配置服务器端阻塞请求的最⼤时⻓。
    fetch.max.bytes服务器给单个拉取请求返回的最⼤数据量。消费者批量拉取消息,如果第⼀个⾮空消息批次的值⽐该值⼤,消息批也会返回,以让消费者可以接着进⾏。即该配置并不是绝对的最⼤值。broker可以接收的消息批最⼤值通过message.max.bytes (broker配置) 或 max.message.bytes (主题配置)来指定。需要注意的是,消费者⼀般会并发拉取请求。
    enable.auto.commit如果设置为true,则消费者的偏移量会周期性地在后台提交。
    connections.max.idle.ms在这个时间之后关闭空闲的连接
    check.crcs⾃动计算被消费的消息的CRC32校验值。可以确保在传输过程中或磁盘存储过程中消息没有被破坏。它会增加额外的负载,在追求极致性能的场合禁⽤。
    exclude.internal.topics是否内部主题应该暴露给消费者。如果该条⽬设置为true,则只能先订阅再拉取。
    isolation.level控制如何读取事务消息。如果设置了 read_committed ,消费者的poll()⽅法只会返回已经提交的事务消息。如果设置了 read_uncommitted (默认值),消费者的poll⽅法返回所有的消息,即使是已经取消的事务消息。⾮事务消息以上两种情况都返回。消息总是以偏移量的顺序返回。read_committed 只能返回到达LSO的消息。在LSO之后出现的消息只能等待相关的事务提交之后才能看到。结果, read_committed 模式,如果有为提交的事务,消费者不能读取到直到HW的消息。read_committed 的seekToEnd⽅法返回LSO。
    heartbeat.interval.ms当使⽤消费组的时候,该条⽬指定消费者向消费者协调器发送⼼跳的时间间隔。⼼跳是为了确保消费者会话的活跃状态,同时在消费者加⼊或离开消费组的时候⽅便进⾏再平衡。该条⽬的值必须⼩于 session.timeout.ms ,也不应该⾼于 session.timeout.ms 的1/3。可以将其调整得更⼩,以控制正常重新平衡的预期时间。
    session.timeout.ms当使⽤Kafka的消费组的时候,消费者周期性地向broker发送⼼跳数据,表明⾃⼰的存在。如果经过该超时时间还没有收到消费者的⼼跳,则broker将消费者从消费组移除,并启动再平衡。该值必须在broker配置 group.min.session.timeout.msgroup.max.session.timeout.ms 之间
    max.poll.records⼀次调⽤poll()⽅法返回的记录最⼤数量
    max.poll.interval.ms使⽤消费组的时候调⽤poll()⽅法的时间间隔。该条⽬指定了消费者调⽤poll()⽅法的最⼤时间间隔。如果在此时间内消费者没有调⽤poll()⽅法,则broker认为消费者失败,触发再平衡,将分区分配给消费组中其他消费者
    max.partition.fetch.bytes对每个分区,服务器返回的最⼤数量。消费者按批次拉取数据。如果⾮空分区的第⼀个记录⼤于这个值,批处理依然可以返回,以保证消费者可以进⾏下去。broker接收批的⼤⼩由 message.max.bytes (broker参数)max.message.bytes (主题参数)指定。fetch.max.bytes ⽤于限制消费者单次请求的数据量
    send.buffer.bytes⽤于TCP发送数据时使⽤的缓冲⼤⼩(SO_SNDBUF),-1表示使⽤OS默认的缓冲区⼤⼩。
    retry.backoff.ms在发⽣失败的时候如果需要重试,则该配置表示客户端等待多⻓时间再发起重试。该时间的存在避免了密集循环。
    request.timeout.ms客户端等待服务端响应的最⼤时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。
    reconnect.backoff.ms重新连接主机的等待时间。避免了重连的密集循环。该等待时间应⽤于该客户端到broker的所有连接。
    reconnect.backoff.max.ms重新连接到反复连接失败的broker时要等待的最⻓时间(以毫秒为单位)。如果提供此选项,则对于每个连续的连接失败,每台主机的退避将成倍增加,直⾄达到此最⼤值。在计算退避增量之后,添加20%的随机抖动以避免连接⻛暴
    receive.buffer.bytesTCP连接接收数据的缓存(SO_RCVBUF)。-1表示使⽤操作系统的默认值
    partition.assignment.strategy当使⽤消费组的时候,分区分配策略的类名
    metrics.sample.window.ms计算指标样本的时间窗⼝
    metrics.recording.level指标的最⾼记录级别
    metrics.num.samples⽤于计算指标⽽维护的样本数量
    interceptor.classes拦截器类的列表。默认没有拦截器拦截器是消费者的拦截器,该拦截器需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接⼝。拦截器可⽤于对消费者接收到的消息进⾏拦截处理
  • 相关阅读:
    Linux常用命令大全
    Power BI vs Superset BI 调研报告
    怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据
    OLED透明屏的制造过程是怎样的?
    OA项目之会议发布
    Zabbix6.2配置ODBC监控oracle数据库
    Java继承 学习资料
    JVM —— 运行时数据区域
    【JavaEE】多线程案例-阻塞队列
    智能生成并盘活API研发资产
  • 原文地址:https://blog.csdn.net/weixin_40663800/article/details/125798363