• 消息监听器和消息监听容器


    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站

    一、消息监听器

    1.1、消息监听器接口

    消息监听器顾名思义用来接收消息,它是使用消息监听容器的必须条件。目前有8个消息监听器:

    • 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
      public interface MessageListener<K, V> {
        void onMessage(ConsumerRecord<K, V> data);
      }
      
      • 1
      • 2
      • 3
    • 使用手动提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
      public interface AcknowledgingMessageListener<K, V> {
        void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
      }
      
      • 1
      • 2
      • 3
    • 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。 提供对 Consumer 对象的访问。
      public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
        void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
      }
      
      • 1
      • 2
      • 3
    • 使用手动提交方法之一,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。 提供对 Consumer 对象的访问。
      public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
        void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
      }
      
      • 1
      • 2
      • 3
    • 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
      (使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。)(注意:这个接收所有)
      public interface BatchMessageListener<K, V> {
         void onMessage(List<ConsumerRecord<K, V>> data);
      }
      
      • 1
      • 2
      • 3
    • 使用手动提交方法之一,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。(注意:这个接收所有)
      public interface BatchAcknowledgingMessageListener<K, V> {
         void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
      }
      
      • 1
      • 2
      • 3
    • 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
      (使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。) 提供对 Consumer 对象的访问。(注意:这个接收所有)
      public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
        void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
      }
      
      • 1
      • 2
      • 3
    • 使用手动提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
      提供对 Consumer 对象的访问。(注意:这个接收所有)
      public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
          void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
      }
      
      • 1
      • 2
      • 3

    1、Consumer对象不是线程安全的;2、不要在这个过程中操作消费者位置和/或监听器中已提交偏移量。

    1.2、消息监听器容器

    消息监听容器有两种,一种是单线程消费,一种是多线程消费。

    1.2.1、单线程

    单线程实现为 KafkaMessageListenerContainer,KafkaMessageListenerContainer在单个线程上接收来自所有主题或分区的所有消息。

    • 使用 KafkaMessageListenerContainer
    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                        ContainerProperties containerProperties)
    
    • 1
    • 2

    通过 ContainerProperties 可以对主题和分区以及其他信息进行配置

    //主题设置
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    //监听器设置
    containerProps.setMessageListener(new MessageListener<Integer, String>() {
        ...
    });
    //消费者工厂配置
    DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    权限配置:authorizationExceptionRetryInterval。这是一个容器属性,它会从KafkaConsumer中获取获取信息,当配置的用户被拒绝读取特定主题时,
    就会发生触发 AuthorizationException。

    1.2.2、多线程

    多线程实现为 ConcurrentMessageListenerContainer ,ConcurrentMessageListenerContainer实际上是在给一个或多个KafkaMessageListenerContainer实例提供多线程消费,
    本质上最后进行工作的还是KafkaMessageListenerContainer,故此它的实现和 KafkaMessageListenerContainer类似,

    public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                                ContainerProperties containerProperties)
    
    • 1
    • 2

    它有一个concurrency属性,通过设置这个属性可以指定创建几个 KafkaMessageListenerContainer实例。

    假设我们有3个主题,每个主题有5个分区。同时设置 container.setConcurrency(15),我们希望的是有15个线程活动着,实际上只有5个活着,
    这是因为 Kafka 中默认的 PartitionAssignor是RangeAssignor。 在SpringBoot中可以设置: spring.kafka.consumer.properties.partition.assignment.strategy=
    org.apache.kafka.clients.consumer.RoundRobinAssignor 来更改策略。

    1.3、偏移量

    • 自动提交
      设置 enable.auto.commit 消费者属性为 true 即可。这个也是默认状态。

    • 手动提交
      设置 enable.auto.commit 消费者属性为 false 即可;同时注意设置 AckMode。以下是spring支持的集中类型说明(无事务)

      • RECORD:当侦听器处理记录后返回时提交偏移量。
      • BATCH:当返回的所有记录都poll()处理完毕后提交偏移量。
      • TIMEpoll():只要超出了ackTime自上次提交以来的记录,则在处理完返回的所有记录后提交偏移量。
      • COUNTpoll():只要ackCount自上次提交以来已收到记录,则在处理完返回的所有记录后提交偏移量。
      • COUNT_TIMETIME:与和类似COUNT,但如果任一条件为 ,则执行提交true。
      • MANUAL:消息监听者对acknowledge()负责Acknowledgment。之后,BATCH应用与 相同的语义。
      • MANUAL_IMMEDIATEAcknowledgment.acknowledge():当监听器调用该方法时立即提交偏移量。
    • 如何提交

    public interface Acknowledgment {
    
        void acknowledge();
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    如果要提交部分批次,使用nack(),使用事务时,设置AckMode为MANUAL; 调用nack()会将成功处理的记录的偏移量发送到事务。
    nack()只能在调用侦听器的消费者线程上调用。

  • 相关阅读:
    【畅购商城】购物车模块之修改购物车以及结算
    Reggie外卖项目 —— 分类管理模块之分类信息分页查询功能
    【Redis学习笔记】第二章 Redis数据类型
    移动边缘计算终端如何赋能高校学习空间智慧管理
    IDEA常用插件
    Nginx之动静分离
    快速上手SpringBoot继承SpringSecurity(安全框架)详细教程!
    uniapp 对于scroll-view滑动和页面滑动的联动处理
    基于MindSpore的llama微调在OpenI平台上运行
    中科大遭钓鱼邮件攻击了?3500名师生中招
  • 原文地址:https://blog.csdn.net/qq_35241329/article/details/133133190