• Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例


    目录

    Spring Kafka消费消息的模式分为2种模式(对应spring.kafka.listener.type配置):

    • single - 每次消费单条记录
    • batch - 批量消费消息列表

    且每种模式都分为2种提交已消费消息offset的ack模式:

    • 自动确认
    • 手动确认

    接下来依次讲解这两种消费模式及其对应的ack模式的示例配置及代码。

    1. 单记录消费listener.type=single

    本章节先来讲讲record模式 - 单记录消费,且分为自动确认和手动确认2种方式来提交已消费消息offset。

    1.1 单记录消费 - 自动确认

    即由Spring Kafak框架按照配置规则自动提交已消费消息offset,无需程序手动编码控制。

    需注意如下对应配置:

    # ============ 方式1:定时自动提交[不推荐] =====================
    # 开启自动提交(按周期)已消费offset
    spring.kafka.consumer.enable-auto-commit: true
    # 自动提交已消费offset时间价格(配置enable-auto-commit=true时使用)
    spring.kafka.consumer.auto-commit-interval: 1s

    # ========= 方式2:通过ack-mode设置自动提交[推荐] =============
    # 禁用自动提交(按周期)已消费offset
    spring.kafka.consumer.enable-auto-commit: false
    # listener类型为单条记录single类型(默认为single单条消费模式)
    spring.kafka.listener.type: single
    # offset提交模式为record
    spring.kafka.listener.ack-mode: record

    注:
    关于消费者提交已消费消息offset的相关配置说明:

    • spring.kafka.consumer.enbable-auto-commit
      • true 自动提交已消费消息offset
        • auto-commit-interval 设置自动提交间隔
      • fasle 由程序控制已消费消息offset提交
        • spring.kafka.listener.ack-mode 已消费offset提交模式

    spring.kafka.listener.ack-mode列表(详细说明参见:SpringKafka - Committing Offsets

    ack-mode模式

    说明

    自动提交

    RECORD
    单记录

    当每一条记录被消费者监听器(ListenerConsumer)处理之后提交

    BATCH(默认)
    批量

    当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交

    TIME
    超时

    当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
    (通过spring.kafka.listener.ack-time设置触发时间)

    COUNT
    超过消费数量

    当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
    (通过spring.kafka.listener.ack-count设置触发数量)

    COUNT_TIME
    超时或超数量

    TIME或COUNT 有一个条件满足时提交

    MANUAL
    手动提交(ack)后同BATCH

    当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交

    需要手动使用
    Acknowledgment参数提交

    MANUAL_IMMEDIATE
    手动立即提交

    手动调用Acknowledgment.acknowledge()后立即提交

    需要手动使用
    Acknowledgment参数提交

    消费端配置示例1 - 定时自动提交[不推荐]

    spring:
      kafka:
        # 逗号分隔的集群broker列表
        bootstrap-servers: localhost:9092
        # ====================================================
        # ================== 消费者配置 ========================
        # ====================================================
        consumer:
          # 自动提交(按周期)已消费offset
          enable-auto-commit: true
          # 自动提交已消费offset时间价格(配置enable-auto-commit=true时使用)
          auto-commit-interval: 1s
          ...
        # ====================================================
        # ============= 消费者监听器(及线程池)配置 ==============
        # ====================================================
        listener:
          # listener类型
          # single | batch
          type: single
        # ====================================================
        # ============= 具体业务Kafka定义=======================
        # ====================================================
        biz1:
          topic: topic1
          consumer:
            group: group1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    消费端配置示例2 - 通过ack-mode设置自动提交[推荐]
    listener自动提交offsetd的ack-mode模式包括:RECORD | BATCH | TIME | COUNT | COUNT_TIME
    且使用相关自动模式不可在@KafkaListener标注方法中使用Acknowledgment参数。

    spring:
      kafka:
        # 逗号分隔的集群broker列表
        bootstrap-servers: localhost:9092
        # ====================================================
        # ================== 消费者配置 ========================
        # ====================================================
        consumer:
          # 禁用自动提交(按周期)已消费offset
          enable-auto-commit: false
          ...
        # ====================================================
        # ============= 消费者监听器(及线程池)配置 ==============
        # ====================================================
        listener:
          # listener类型
          # single | batch
          type: single
          # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
          # 单记录  | 批量         | 超时  | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
          # RECORD | BATCH(默认) | TIME | COUNT      | COUNT_TIME   | MANUAL                | MANUAL_IMMEDIATE
          # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
          # 注:listener自动提交offset模式包括:RECORD | BATCH | TIME | COUNT | COUNT_TIME ,
          #    且使用相关自动模式不可在@KafkaListener标注方法中使用Acknowledgment参数
          ack-mode: record
        # ====================================================
        # ============= 具体业务Kafka定义=======================
        # ====================================================
        biz1:
          topic: topic1
          consumer:
            group: group1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    消费端代码示例

    /**
     * 定义biz1消息接收者
     *
     * @param message
     * @rabbit.exhange exchange1
     * @rabbit.aueue queue1
     * @rabbit.bindingKey #
     */
    @KafkaListener(
            id = "biz1-${spring.kafka.biz1.consumer.group}",
            groupId = "${spring.kafka.biz1.consumer.group}",
            topics = "${spring.kafka.biz1.topic}")
    public void biz1Consumer(String message) {
        log.info("[biz1Consumer] RECV MSG: {}", message);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1.2 单记录消费 - 手动确认

    需注意如下对应配置:

    # 禁用自动提交(按周期)已消费offset
    spring.kafka.consumer.enable-auto-commit: false
    # listener类型为单条记录single类型(默认为single单条消费模式)
    spring.kafka.listener.type: single
    # offset提交模式为manual_immediate
    spring.kafka.listener.ack-mode: manual_immediate

    消费端配置示例
    手动提交offset的ack-mode模式包括:MANUAL | MANUAL_IMMEDIATE
    且使用相关手动模式需在@KafkaListener标注方法中使用Acknowledgment参数。

    spring:
    #  profiles:
    #    active: kafka-origin
      kafka:
        # 逗号分隔的集群broker列表
        bootstrap-servers: localhost:9092
        # ====================================================
        # ================== 消费者配置 ========================
        # ====================================================
        consumer:
          # 禁用自动提交(按周期)已消费offset
          enable-auto-commit: false
          ...
        # ====================================================
        # ============= 消费者监听器(及线程池)配置 ==============
        # ====================================================
        listener:
          # listener类型
          # single | batch
          type: single
          # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
          # 单记录  | 批量   | 超时  | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
          # RECORD | BATCH | TIME | COUNT      | COUNT_TIME   | MANUAL                | MANUAL_IMMEDIATE
          # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
          # 注:手动提交offset模式包括:MANUAL | MANUAL_IMMEDIATE
          #    且使用相关手动模式需在@KafkaListener标注方法中使用Acknowledgment参数
          ack-mode: manual_immediate
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    消费端代码示例
    参考:SpringKafka - Manual Acknowledgment

    /**
     * 定义biz1消息接收者
     *
     * @param message
     * @kafka.topic topic1
     * @kafka.group group1
     */
    @KafkaListener(
            id = "biz1-${spring.kafka.biz1.consumer.group}",
            groupId = "${spring.kafka.biz1.consumer.group}",
            topics = "${spring.kafka.biz1.topic}")
    public void biz1Consumer(String message, Acknowledgment ack) {
        log.info("[biz1Consumer] RECV MSG: {}", message);
        //确认单当前消息(及之前的消息)offset均已被消费完成
        ack.acknowledge();
        //拒绝当前消息(此方法仅适用于listener.type=single)
        //当前poll查询出的剩余消息记录均被抛弃,
        //且当前消费线程在阻塞指定sleep(如下3000毫秒)后重新调用poll获取待消费消息(包括之前poll被抛弃的消息)
        //ack.nack(3000)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2. 批量消费listener.type=batch

    在一些需要通过批量处理消息的场景中,SpringKafka支持使用Batch Listeners,即批量处理消息列表。
    本章节主要讲解batch模式 - 批量消费,且同样分为自动确认和手动确认2种方式来提交已消费消息offset。

    2.1 批量消费 - 自动确认

    需注意如下对应配置:

    # 禁用自动提交(按周期)已消费offset
    spring.kafka.consumer.enable-auto-commit: false
    # 批量消费的单次最大消费记录数
    spring.kafka.consumer.max-poll-reocrds: 50
    # listener类型为批量batch类型(默认为single单条消费模式)
    spring.kafka.listener.type: batch
    # offset提交模式为batch(不可使用record - 启动报错)
    spring.kafka.listener.ack-mode: batch

    配置示例如下

    spring:
      kafka:
        # 逗号分隔的集群broker列表
        bootstrap-servers: localhost:9092
        # ====================================================
        # ================== 消费者配置 ========================
        # ====================================================
        consumer:
          # 禁用自动提交(按周期)已消费offset
          enable-auto-commit: false
          # 单次poll()调用返回的记录数
          max-poll-records: 50
        # ====================================================
        # ============= 消费者监听器(及线程池)配置 ==============
        # ====================================================
        listener:
          # listener类型
          # single | batch
          type: batch
          # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
          # 单记录  | 批量   | 超时  | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
          # RECORD | BATCH | TIME | COUNT      | COUNT_TIME   | MANUAL                | MANUAL_IMMEDIATE
          # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
          ack-mode: batch
        # ====================================================
        # ============= 具体业务Kafka定义=======================
        # ====================================================
        biz1:
          topic: topic1
          consumer:
            group: group1
        biz2:
          topic: topic2
          consumer:
            group: group2
            # 分区格式示例:0 | 0,1,2 | 0-3 
            partitions: 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    消费端代码示例

    /**
     * 定义biz1消息接收者
     * 自动模式(无需手动ack):
     * 1. listener.type=batch
     * 2. ack-mode=batch
     *
     * @param messages
     * @kafka.topic topic1
     * @kafka.group group1
     */
    @KafkaListener(
            id = "biz1-${spring.kafka.biz1.consumer.group}",
            groupId = "${spring.kafka.biz1.consumer.group}",
            topics = "${spring.kafka.biz1.topic}")
    public void biz1Consumer(List messages) {
        log.info("[biz1Consumer] RECV MSG COUNT: {}", messages.size());
        log.info("[biz1Consumer] RECV MSG[0]: {}", messages.get(0));
    }
    
    /**
     * 定义biz2消息接收者
     * 自动模式(无需手动ack):
     * 1. listener.type=batch
     * 2. ack-mode=batch
     * 
     * @param messages
     * @kafka.topic topic2
     * @kafka.group group2
     */
    @KafkaListener(
            id = "biz2-${spring.kafka.biz2.consumer.group}",
            groupId = "${spring.kafka.biz2.consumer.group}",
            //消费指定分区
            topicPartitions = {
                    @TopicPartition(topic = "${spring.kafka.biz2.topic}", partitions = "${spring.kafka.biz2.consumer.partitions}")
            })
    public void biz2Consumer(List messages) {
        log.info("[biz2Consumer] RECV MSG COUNT: {}", messages.size());
        log.info("[biz2Consumer] RECV MSG[0]: {}", messages.get(0));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    在这里插入图片描述

    2.2 批量消费 - 手动确认

    需注意如下对应配置:

    # 禁用自动提交(按周期)已消费offset
    spring.kafka.consumer.enable-auto-commit: false
    # 批量消费的单次最大消费记录数
    spring.kafka.consumer.max-poll-reocrds: 50
    # listener类型为批量batch类型(默认为single单条消费模式)
    spring.kafka.listener.type: batch
    # offset提交模式为batch(不可使用record - 启动报错)
    spring.kafka.listener.ack-mode: manual

    配置示例如下

    spring:
      kafka:
        # 逗号分隔的集群broker列表
        bootstrap-servers: localhost:9092
        # ====================================================
        # ================== 消费者配置 ========================
        # ====================================================
        consumer:
          # 禁用自动提交(按周期)已消费offset
          enable-auto-commit: false
          # 单次poll()调用返回的记录数
          max-poll-records: 50
        # ====================================================
        # ============= 消费者监听器(及线程池)配置 ==============
        # ====================================================
        listener:
          # listener类型
          # single | batch
          type: batch
          # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定)
          # 单记录  | 批量   | 超时  | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交
          # RECORD | BATCH | TIME | COUNT      | COUNT_TIME   | MANUAL                | MANUAL_IMMEDIATE
          # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
          ack-mode: manual
        # ====================================================
        # ============= 具体业务Kafka定义=======================
        # ====================================================
        biz1:
          topic: topic1
          consumer:
            group: group1
        biz2:
          topic: topic2
          consumer:
            group: group2
            partitions: 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    消费端代码示例

    /**
     * 定义biz1消息接收者
     * 手动模式(需手动ack):
     * 1. listener.type=batch
     * 2. ack-mode=manual
     *
     * @param messages
     * @kafka.topic topic1
     * @kafka.group group1
     */
    @KafkaListener(
            id = "biz1-${spring.kafka.biz1.consumer.group}",
            groupId = "${spring.kafka.biz1.consumer.group}",
            //仅在多partition单个消费者时,用于多线程消费消息(concurrency <= partition数量)
            //当存在多个消费者时,即便设置concurrency > 1也仅有唯一消费线程生效
            concurrency = "${spring.kafka.biz1.consumer.concurrency}",
            topics = "${spring.kafka.biz1.topic}")
    public void biz1Consumer(List messages, Acknowledgment ack) {
        log.info("[biz1Consumer] RECV MSG COUNT: {}", messages.size());
        log.info("[biz1Consumer] RECV MSG[0]: {}", messages.get(0));
        //确认单当前消息(及之前的消息)offset均已被消费完成
        ack.acknowledge();
    
        //拒绝消息列表中指定index(发生错误的消息index)对应的消息(此方法仅适用于listener.type=batch),
        //当前指定index之前的消息会被成功提交,
        //当前poll查询出的剩余消息记录(包括当前指定的index)均被抛弃,
        //且当前消费线程在阻塞指定sleep(如下3000毫秒)后重新调用poll获取待消费消息(包括当前index及之前poll抛弃的消息)
        //如下即确认当前list中前5条消息(0-4),抛弃当前list中后续消息,3秒后再次poll查询未消费消息
        //ack.nack(5, 3000);
    }
    
    
    /**
     * 定义biz2消息接收者
     * 手动模式(需手动ack):
     * 1. listener.type=batch
     * 2. ack-mode=manual
     * 
     * @param messages
     * @kafka.topic topic2
     * @kafka.group group2
     */
    @KafkaListener(
            id = "biz2-${spring.kafka.biz2.consumer.group}",
            groupId = "${spring.kafka.biz2.consumer.group}",
            //消费指定分区
            topicPartitions = {
                    @TopicPartition(topic = "${spring.kafka.biz2.topic}", partitions = "${spring.kafka.biz2.consumer.partitions}")
            })
    public void biz2Consumer(List messages, Acknowledgment ack) {
        log.info("[biz2Consumer] RECV MSG COUNT: {}", messages.size());
        log.info("[biz2Consumer] RECV MSG[0]: {}", messages.get(0));
        //确认单当前消息(及之前的消息)offset均已被消费完成
        ack.acknowledge();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    3. 手动模式下的acknowledge和nack方法

    在手动确认模式下,除了支持ack.acknowledge()方法用于确认单条记录(对应record模式)或者批次记录(对应batch模式),
    还支持nack方法用于拒绝消息,关于acknowledgenack方法的详细使用见下表:

    方法

    说明

    适用listener.type

    acknowledge()

    确认单条消息 或 整批消息

    single
    batch

    nack(sleepMills)

    拒绝确认当前消息,
    当前poll查询出的剩余消息记录均被抛弃,
    且当前消费线程在阻塞指定sleepMills时间后
    会重新调用poll获取待消费消息(包括之前poll被抛弃的消息)

    single

    nack(index, sleepMills)

    拒绝消息列表中指定index(发生错误的消息index)及其之后index对应的消息,
    当前指定index之前的消息会被成功提交,
    当前poll查询出的剩余消息记录(包括当前指定的index)均被抛弃,
    且当前消费线程在阻塞指定sleepMills时间后
    会重新调用poll获取待消费消息(包括当前index及之前poll抛弃的消息)

    batch

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    cpu设计和实现(数据访问)
    线程常见的几种方法
    目标检测——UCF50动作识别数据集
    介绍 10 个有用的 Flutter 软件包
    星空投影仪美国亚马逊审核标准UL62368检测项目介绍
    1.初识python
    [c语言]小课堂 day1
    工地反光衣识别检测系统
    13.深入浅出高速缓存带来的可见性问题
    (免费分享)基于springboot财务管理系统
  • 原文地址:https://blog.csdn.net/drnrrwfs/article/details/126114152