• Kafka 消息队列 ( 四 ) 复杂应用


    5.复杂应用

    5.1.生产者

    5.1.1.带回调的生产者

    kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

     @RequestMapping("/kafka/callbackOne/{message}")
        public void sendMessage2(@PathVariable("message") String callbackMessage) {
    
            kafkaTemplate.send("test1", callbackMessage).addCallback(success -> {
                // 消息发送到的topic
                String topic = success.getRecordMetadata().topic();
                // 消息发送到的分区
                int partition = success.getRecordMetadata().partition();
                // 消息在分区内的offset
                long offset = success.getRecordMetadata().offset();
                System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
            }, failure -> {
                System.out.println("发送消息失败:" + failure.getMessage());
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
        @RequestMapping("/kafka/callbackTwo/{message}")
        public void sendMessage3(@PathVariable("message") String callbackMessage) {
    
            kafkaTemplate.send("test1", callbackMessage).addCallback(
                    new ListenableFutureCallback<SendResult<String, Object>>() {
                        @Override
                        public void onFailure(Throwable ex) {
                            System.out.println("发送消息失败:"+ex.getMessage());
                        }
                        @Override
                        public void onSuccess(SendResult<String, Object> result) {
                            System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
                        }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    5.1.2.自定义分区器

    我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

    ① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;

    ② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;

    ③ patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

    ※ 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,

    public class CustomizePartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // 自定义分区规则(这里假设全部发到0号分区)
            // ......
            return 0;
        }
    
    @Override
    public void close() {
    
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
    
    }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名,

    # 自定义分区器
    spring.kafka.producer.properties.partitioner.class=com.yuan.kafka.producer.CustomizePartitioner
    
    • 1
    • 2

    5.1.3.kafka事务提交

    如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务,

    @GetMapping("/kafka/transaction")
    public void sendMessage7(){
        // 声明事务:后面报错 消息不会发出去
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("topic1","test executeInTransaction");
            throw new RuntimeException("fail");
        });
    
    // 不声明事务:后面报错 但前面消息已经发送成功了
    
       kafkaTemplate.send("topic1","test executeInTransaction");
       throw new RuntimeException("fail");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    5.2.消费者

    5.2.0.常用方法

    https://blog.csdn.net/usagoole/article/details/82812896

    1. Consumer消费的一般步骤
      1.1. 构造Properties配置对象
      1.2. 创建KafkaConsumer实例
      1.3. 调用KafkaConsumer的subscribe订阅感兴趣的topic
      1.4. 循环调用KafkaConsumer的poll()获取ConsumerRecord
      1.5. 处理获取的ConsumerRecord
      1.6. 关闭KafkaConsumer,调用KafkaConsumer.close()

    2. 从Kafka Consumer的角度来说,poll方法返回即表示消费成功。
      2.1. 如果poll返回消息的速度很慢,可以调节参数来提升poll的效率
      2.2. 如果消息的业务逻辑处理很慢,应该把业务逻辑放到单独的线程中执行

    3. java Consumer是一个多线程的java进程,不是线程安全的,主线程创建KafkaConsumer以及poll的调用( 消费者组执行 rebalance、消息获取、 coordinator管理、异步任务结果的处理甚至位移提交等操作都是运行在用户主线程中的),consumer在后台会创建一个心跳线程,该线程被称为后台心跳线程。

    5.2.0.1.KafkaConsumer构造方法

    KafkaConsumer(java.util.Map configs) : 通过提供一组键值对作为配置来实例化使用者。
    KafkaConsumer(java.util.Map configs, Deserializer keyDeserializer, Deserializer valueDeserializer)
    KafkaConsumer(java.util.Properties properties):通过提供Properties对象作为配置来实例化使用者。
    KafkaConsumer(java.util.Properties properties, Deserializer keyDeserializer, Deserializer valueDeserializer)

    通过提供一组键值对作为配置来实例化使用者。有效的配置字符串在此处记录。值可以是字符串,也可以是适当类型的对象(例如,数字配置将接受字符串“ 42”或整数42)。
    有效的配置字符串记录在ConsumerConfig中
    keyDeserializer-实现解串器的密钥的解串器。直接将反序列化器传递给使用者时,不会在使用者中调用configure()方法。
    valueDeserializer - 用于实现反序列化的价值反序列化器。直接将反序列化器传递给使用者时,不会在使用者中调用configure()方法。

    5.2.0.2.主要方法

    assign(java.util.Collection partitions) :手动为该使用者分配分区列表。

    手动为该使用者分配分区列表。此接口不允许增量分配,并且将替换先前的分配(如果有)。
    如果给定的主题分区列表为空,则将其与unsubscribe()相同。
    通过这种方法手动分配主题不能使用使用者的组管理功能。 这样,当组成员身份或群集和主题元数据发生更改时,将不会触发任何重新平衡操作。
    请注意,不能同时将手动分区分配assign(Collection)与 组分配与subscribe(Collection,ConsumerRebalanceListener)一起使用。
    如果启用了自动提交,则会在新分配替换旧分配之前触发异步提交(基于旧分配)。
    抛出异常:java.lang.IllegalArgumentException - 如果分区为空或包含空主题或空主题
    java.lang.IllegalStateException - 如果先前使用主题或模式调用了subscribe()(随后未调用unsubscribe())

    java.util.Set assignment() :获取当前分配给该使用者的一组分区。

    获取当前分配给该使用者的一组分区。如果订阅是通过使用Assign(Collection)直接分配分区而发生的,那么这将简单地返回已分配的相同分区。如果使用了主题订阅,那么这将给出当前分配给使用者的主题分区集(如果尚未进行分配,或者正在重新分配分区,则可能为none)。

    java.util.Map beginningOffsets(java.util.Collection partitions) :获取给定分区的第一个偏移量。

    请注意,如果分区不存在,则此方法可能会无限期阻塞。
    此方法不会更改分区的当前使用者位置。
    返回值Map:给定分区的最早可用偏移量

    close() : 关闭使用者,等待最多30秒的默认超时以进行任何所需的清理。

    如果启用了自动提交,则将在默认超时范围内尽可能地提交当前偏移量。有关详细信息,请参见close(long,TimeUnit)。请注意,wakeup()不能用于中断关闭。

    close(long timeout, java.util.concurrent.TimeUnit timeUnit) :尝试在指定的超时时间内彻底关闭使用者。

    尝试在指定的超时时间内彻底关闭使用者。此方法等待超时,直到使用者完成挂起的提交并离开组。
    如果启用了自动提交,则将在超时范围内提交当前偏移量。如果使用者无法完成偏移量提交并在超时到期之前正常退出组,则将使用者强制关闭。
    请注意,wakeup()不能用于中断关闭。

    commitAsync(): 提交所有主题和分区列表的最后一个poll()返回的偏移量。

    提交所有主题和分区列表的最后一个poll()返回的偏移量。与commitAsync(null)相同

    commitAsync(java.util.Map offsets, OffsetCommitCallback callback) :将指定的主题和分区列表的指定偏移量提交给Kafka。

    同commitAsync(OffsetCommitCallback callback)

    commitAsync(OffsetCommitCallback callback) : 提交最后一个poll()返回的主题和分区订阅列表的偏移量

    这是一个异步调用,不会阻塞。遇到的任何错误都将传递给回调(如果提供)或被丢弃。
    通过多次调用此API提交的偏移量保证以与调用相同的顺序发送。相应的提交回调也以相同的顺序调用。
    另外请注意,通过此API提交的偏移量可以保证在后续调用commitSync()(和变体)返回之前完成。

    commitSync(): 提交的最后一个poll()返回的所有主题和分区订阅列表的偏移量。

    这只会向Kafka提交偏移量。使用此API提交的偏移量将在每次重新平衡后的首次获取时以及启动时使用
    因此,如果您需要将偏移量存储在Kafka以外的任何其他位置,则不应使用此API。
    这是一个同步提交,将阻塞直到提交成功或遇到不可恢复的错误(在这种情况下,它将被抛出给调用者)。
    请注意,之前通过commitAsync(OffsetCommitCallback)(或类似方法)发送的异步偏移提交保证可以在完成此方法之前调用其回调。
    抛出异常 : CommitFailedException - 如果提交失败并且无法重试。仅当您将自动组管理与subscribe(Collection)一起使用时,或者如果存在具有相同groupId的活动组正在使用组管理,才可能发生这种情况。
    WakeupException - 如果在调用此函数之前或同时调用了wakeup()
    InterruptException - 如果在调用此函数之前或期间中断了调用线程
    AuthenticationException-如果身份验证失败。
    AuthorizationException -
    KafkaException -

    commitSync(java.util.Map offsets): 为指定的主题和分区列表提交指定的偏移量。

    同上:这会将偏移量提交给Kafka。提交的偏移量应该是您的应用程序将使用的下一条消息,即lastProcessedMessageOffset +1。

    OffsetAndMetadata committed(TopicPartition partition) : 获取给定分区的最后提交的偏移量(是否此提交是由该进程还是其他进程执行)。

    获取给定分区的最后提交的偏移量(无论此提交是由该进程还是其他进程执行)。
    如果发生故障,此偏移量将用作使用者的位置。
    该调用将阻止进行远程调用,以从服务器获取最新的提交偏移量。
    返回:最后提交的偏移量和元数据,如果没有先前的提交,则为null

    java.util.Map endOffsets(java.util.Collection partitions) :获取给定分区的最后一个偏移量。

    分区的最后一个偏移量是即将到来的消息的偏移量,即最后一个可用消息的偏移量+ 1。
    请注意,如果分区不存在,则此方法可能会无限期阻塞。此方法不会更改分区的当前使用者位置。
    当Isolation.level = read_committed时,最后一个偏移将是最后一个稳定偏移(LSO)。这是第一个消息与未清事务的偏移量。随着事务完成,LSO向前发展。

    java.util.Map listTopics() :获取有关用户有权查看的所有主题的分区的元数据。

    获取有关用户有权查看的所有主题的分区的元数据。此方法将向服务器发出远程调用。

    **

    java.util.Map metrics() :获取消费者保留的指标

    java.util.Map offsetsForTimes(java.util.Map timestampsToSearch) : 通过时间戳查找给定分区的偏移量。

    每个分区的返回偏移量是最早的偏移量,其时间戳大于或等于相应分区中的给定时间戳记。
    这是一个阻塞调用。不必为使用者分配分区。
    如果分区中的消息格式版本低于0.10.0,即消息没有时间戳,则该分区将返回null。
    请注意,如果分区不存在,则此方法可能会无限期阻塞。
    Parameters: timestampsToSearch - 从分区到时间戳的映射进行查找。
    返回值:从分区到时间戳的映射,以及时间戳大于或等于目标时间戳的第一条消息的偏移量。如果没有这样的消息,将为该分区返回null。

    java.util.List partitionsFor(java.lang.String topic) : 获取有关给定主题的分区的元数据。

    获取有关给定主题的分区的元数据。如果该方法还没有有关给定主题的任何元数据,则此方法将向服务器发出远程调用。

    pause(java.util.Collection partitions) : 暂停从请求的分区中提取。

    将来对poll(long)的调用将不会从这些分区中返回任何记录,除非已使用resume(Collection)恢复了它们。
    请注意,此方法不会影响分区订阅。特别是,使用自动分配时,它不会导致组重新平衡。

    java.util.Set paused() : 获取先前通过调用pause(Collection)暂停的一组分区。

    ConsumerRecords poll(long timeout) :使用订阅/分配API之一获取指定主题或分区的数据。

    使用订阅/分配API之一获取指定主题或分区的数据。在轮询数据之前未预订任何主题或分区是一个错误。
    每次轮询时,消费者都将尝试使用上次消耗的偏移量作为起始偏移量,并依次获取。
    可以通过seek(TopicPartition,long)手动设置最后消耗的偏移量,或者自动将其设置为已订阅分区列表的最后提交的偏移量
    timeout参数:如果缓冲区中没有数据,则等待轮询所花费的时间(以毫秒为单位)。如果为0,则立即返回缓冲区中当前可用的任何记录,否则返回空。不能为负。
    返回:ConsumerRecords自上次获取主题和分区的已订阅列表以来的主题到记录的映射
    抛出异常:InvalidOffsetException -如果一个分区或一组分区的偏移量未定义或超出范围,并且尚未配置偏移量重置策略
    WakeupException - 如果在调用此函数之前或同时调用了wakeup()
    InterruptException - 如果在调用此函数之前或期间中断了调用线程
    AuthenticationException - 如果身份验证失败。
    AuthorizationException - 如果调用者缺乏对任何已订阅主题或已配置groupId的读取权限。
    KafkaException - 其他任何不可恢复的错误(例如,无效的groupId或会话超时,反序列化键/值对的错误或将来版本中的任何新错误情况)
    java.lang.IllegalArgumentException -如果超时值为负
    java.lang.IllegalStateException - 如果使用者未订阅任何主题或未手动分配任何分区以从中使用

    long position(TopicPartition partition) : 获取将要提取的下一条记录的偏移量(如果存在具有该偏移量的记录)。

    获取将要提取的下一条记录的偏移量(如果存在具有该偏移量的记录)。
    抛异常的情况:java.lang.IllegalArgumentException - 如果提供的TopicPartition没有分配给此使用者
    InvalidOffsetException - 如果当前没有为分区定义偏移量
    WakeupException - if wakeup() is called before or while this function is called
    InterruptException - if the calling thread is interrupted before or while this function is called
    AuthenticationException - if authentication fails. See the exception for more details
    AuthorizationException - if not authorized to the topic or to the configured groupId. See the exception for more details
    KafkaException - for any other unrecoverable errors

    resume(java.util.Collection partitions) : 恢复已使用pause(Collection)暂停的指定分区。

    如果要提取新的poll(long)调用,将从这些分区返回记录。如果分区以前没有暂停过,则此方法为无操作。

    seek(TopicPartition partition, long offset) : 覆盖使用者将在下一次轮询(超时)时使用的获取偏移量。

    覆盖使用者将在下一次轮询(超时)时使用的获取偏移量。如果对同一分区多次调用此API,则最新的偏移量将用于下一个poll()。
    请注意,如果在使用过程中随意使用此API来重置获取偏移量,则可能会丢失数据
    抛出:java.lang.IllegalArgumentException - 如果提供的TopicPartition没有分配给此使用者,或者提供的偏移量为负

    seekToBeginning(java.util.Collection partitions) : 寻找每个给定分区的第一个偏移量。

    寻找每个给定分区的第一个偏移量。
    此函数延迟计算,仅在调用poll(long)或position(TopicPartition)时才寻求所有分区中的第一个偏移量。如果没有提供分区,为所有当前分配的分区寻找第一个偏移量。

    seekToEnd(java.util.Collection partitions) :寻找每个给定分区的最后一个偏移量。

    此函数延迟计算,仅在调用poll(long)或position(TopicPartition)时才在所有分区中寻求最终偏移量。
    如果没有提供分区,为所有当前分配的分区寻找最终偏移量。

    subscribe(java.util.Collection topics) : 订阅给定的主题列表以获取动态分配的分区。

    订阅给定的主题列表以获取动态分配的分区。主题订阅不是增量订阅。此列表将替换当前的分配(如果有)。
    无法将主题订阅与组管理结合在一起,并通过Assign(Collection)进行手动分区分配。如果给定的主题列表为空,则将其与unsubscribe()相同。
    这是使用noop侦听器的subscription(Collection,ConsumerRebalanceListener)的简写。如果您需要寻找特定偏移量的能力,则应首选使用subscription(Collection,ConsumerRebalanceListener),因为组重新平衡会导致分区偏移量被重置。如果要执行自己的偏移量管理,还应该提供自己的侦听器,因为侦听器使您有机会在重新平衡完成之前提交偏移量。

    subscribe(java.util.Collection topics, ConsumerRebalanceListener listener) : 订阅给定的主题列表以获取动态分配的分区。

    订阅给定的主题列表以获取动态分配的分区。主题订阅不是增量订阅。此列表将替换当前的分配(如果有)。请注意,不可能将主题订阅与组管理与通过Assign(Collection)进行手动分区分配相结合。如果给定的主题列表为空,则将其与unsubscribe()相同。
    作为组管理的一部分,使用方将跟踪属于特定组的使用方列表,并在以下事件之一触发时触发重新平衡操作:

    1.任何已订阅主题列表的分区数更改
    2.主题已创建或删除
    3.消费者组的现有成员死亡
    4.通过join API将新成员添加到现有使用者组
    当触发这些事件中的任何一个时,将首先调用提供的侦听器以指示消费者的分配已被撤销,然后在接收到新的分配时再次调用。请注意,此侦听器将立即覆盖在先前的订阅中设置的所有侦听器。但是,可以确保通过此接口吊销/分配的分区来自此调用中预订的主题。有关更多详细信息,请参见ConsumerRebalanceListener。
    抛出异常:java.lang.IllegalArgumentException - 如果topic为null或包含null或为空的元素,或者listener为null
    java.lang.IllegalStateException - 如果使用模式先前调用了subscription(),或者先前调用了assign(没有随后的对unsubscribe()的调用),或者至少未配置分区分配策略
    subscribe(java.util.regex.Pattern pattern):订阅与指定模式匹配的所有主题以获取动态分配的分区。
    订阅与指定模式匹配的所有主题以获取动态分配的分区。模式匹配将针对检查时存在的主题定期进行。
    这是使用noop(空)侦听器的subscription(Pattern,ConsumerRebalanceListener)的简写。
    如果您需要寻找特定偏移量的能力,则应首选使用subscription(Pattern,ConsumerRebalanceListener),因为组重新平衡会导致分区偏移量被重置。
    如果要执行自己的偏移量管理,还应该提供自己的侦听器,因为侦听器使您有机会在重新平衡完成之前提交偏移量。

    subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener) :订阅与指定模式匹配的所有主题以获取动态分配的分区。

    订阅与指定模式匹配的所有主题以获取动态分配的分区。模式匹配将针对检查时存在的主题定期进行。
    作为组管理的一部分,使用方将跟踪属于特定组的使用方列表,并在以下事件之一触发时触发重新平衡操作:
    1.任何已订阅主题列表的分区数更改 2.主题已创建或删除 3.消费者组的现有成员死亡 4.通过join API将新成员添加到现有使用者组

    java.util.Set subscription() :获取当前订阅。

    获取当前订阅。将返回与最近一次对subscription的调用相同的主题(Collection,ConsumerRebalanceListener),如果未进行此类调用,则返回一个空集。

    unsubscribe() :取消订阅当前使用subscribe(Collection)或subscribe(Pattern)订阅的主题。

    取消订阅当前使用subscribe(Collection)或subscribe(Pattern)订阅的主题。这还将清除所有直接通过assign(Collection)分配的分区。

    wakeup() : 唤醒消费者。

    此方法是线程安全的,特别用于中止长时间轮询。
    在操作中阻塞的线程将引发WakeupException。如果没有线程阻塞可以抛出WakeupException的方法,则对该方法的下一次调用将引发它。

    疑问:
    consumer.poll(Long.MAX_VALUE); 会一直等待还是等到什么情况返回。
    ConsumerRecords records(partition)方法的使用
    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
    for (TopicPartition partition : records.partitions()) {undefined
    List> partitionRecords = records.records(partition);

    public java.util.Map offsetsForTimes(java.util.Map timestampsToSearch)) 用法

    5.2.1.指定topic、partition、offset消费

    前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供,

    /**4
     * @Title 指定topic、partition、offset消费
     * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
     * @Author long.yuan
     * @Date 2020/3/22 13:38
     * @Param [record]
     * @return void
       **/
       @KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
           @TopicPartition(topic = "topic1", partitions = { "0" }),
           @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
       })
       public void onMessage2(ConsumerRecord<?, ?> record) {
           System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    属性解释:

    ① id:消费者ID;

    ② groupId:消费组ID;

    ③ topics:监听的topic,可监听多个;

    ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

    上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。

    注意:topics和topicPartitions不能同时使用;

    5.2.2.批量消费

    设置application.prpertise开启批量消费即可,

    # 设置批量消费
    
    spring.kafka.listener.type=batch
    
    # 批量消费每次最多消费多少条消息
    
    spring.kafka.consumer.max-poll-records=50
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    接收消息时用List来接收,监听代码如下,

    @KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
    public void onMessage3(List<ConsumerRecord<?, ?>> records) {
        System.out.println(">>>批量消费一次,records.size()="+records.size());
        for (ConsumerRecord<?, ?> record : records) {
            System.out.println(record.value());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    5.2.3.ConsumerAwareListenerErrorHandler 异常处理器

    通过异常处理器,我们可以处理consumer在消费时发生的异常。

    新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器,

    // 新建一个异常处理器,用@Bean注入
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("消费异常:"+message.getPayload());
            return null;
        };
    }
    
    // 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
    @KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
    public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
        throw new Exception("简单消费-模拟异常");
    }
    
    // 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
    @KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
    public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
        System.out.println("批量消费一次...");
        throw new Exception("批量消费-模拟异常");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    5.2.4.消息过滤器

    消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

    配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

    @Component
    public class KafkaConsumer {
        @Autowired
        ConsumerFactory consumerFactory;
    
    // 消息过滤器
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 消息过滤策略
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            //返回true消息则被过滤
            return true;
        });
        return factory;
    }
    
    // 消息过滤监听
    @KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
    public void onMessage6(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic1发送0-99总共100条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数,

    5.2.5.消息转发

    在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

    在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下,

    /**
    
     * @Title 消息转发
     * @Description 从topic1接收到的消息经过处理后转发到topic2
       **/
       @KafkaListener(topics = {"topic1"})
       @SendTo("topic2")
       public String onMessage7(ConsumerRecord<?, ?> record) {
       return record.value()+"-forward message";
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    5.2.6.定时启动、停止监听器

    默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

    ① 禁止监听器自启动;

    ② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;

    新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动

    @EnableScheduling
    @Component
    public class CronTimer {
    
    /**
    
     * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
     * 而是会被注册在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
       **/
       @Autowired
       private KafkaListenerEndpointRegistry registry;
    
    @Autowired
    private ConsumerFactory consumerFactory;
    
    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止KafkaListener自启动
        container.setAutoStartup(false);
        return container;
    }
    
    // 监听器
    @KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?, ?> record){
        System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
        
        
        
    
    // 定时启动监听器
    @Scheduled(cron = "0 42 11 * * ? ")
    public void startListener() {
        System.out.println("启动监听器...");
        // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            registry.getListenerContainer("timingConsumer").start();
        }
        //registry.getListenerContainer("timingConsumer").resume();
    }
    
    // 定时停止监听器
    @Scheduled(cron = "0 45 11 * * ? ")
    public void shutDownListener() {
        System.out.println("关闭监听器...");
        registry.getListenerContainer("timingConsumer").pause();
    }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作,

    11:42分监听器启动开始工作,消费消息,

    11:45分监听器停止工作,

  • 相关阅读:
    不要再重复造轮子了,Hutool 这款开源工具类库贼好使
    服务器之Apollo单机部署(快速安装)
    opencv语法Mat类型总结
    java计算机毕业设计高校社团管理系统源码+mysql数据库+系统+lw文档+部署
    java-net-php-python-java西藏文库计算机毕业设计程序
    Leetcode148. 排序链表
    [Unity]将所有 TGA、TIFF、PSD 和 BMP(可自定义)纹理转换为 PNG,以减小项目大小,而不会在 Unity 中造成任何质量损失
    网络安全(黑客)自学
    vue3项目的创建、入口文件、全局方法、生命周期函数、setup中的生命周期函数使用、data的函数方式
    算法 顺时针旋转矩阵
  • 原文地址:https://blog.csdn.net/yuanchun05/article/details/127877299