• kafka-消费者组(SpringBoot整合Kafka)


    1、消费者组

    1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

    在这里插入图片描述

    1.2、创建生产者发送消息

    package com.atguigu.spring.kafka.consumer;
    
    import jakarta.annotation.Resource;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @SpringBootTest
    class SpringKafkaConsumerApplicationTests {
        @Resource
        KafkaTemplate kafkaTemplate;
    
        @Test
        void contextLoads() {
            for (int i = 0; i < 10; i++) {
                kafkaTemplate.send("my_topic1",i%6,"", "消费者组"+i);
            }
        }
    }
    
    

    在这里插入图片描述
    在这里插入图片描述

    1.3、application.yml配置

    server:
      port: 8120
    
    # v1
    spring:
      Kafka:
        bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
        consumer:
          # read-committed读事务已提交的消息 解决脏读问题
          isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
          # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
          enable-auto-commit: true 
          # 消费者提交ack时多长时间批量提交一次
          auto-commit-interval: 1000
          # 消费者第一次消费主题消息时从哪个位置开始
          auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    
    

    1.4、创建消费者监听器

    package com.atguigu.spring.kafka.consumer.listener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    @Component
    public class MyKafkaListener {
    
        @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
        public void onMessage1(ConsumerRecord<String, String> record) {
            System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()
                    +",partition:"+record.partition()
                    +",offset = "+record.offset()
                    +",key = "+record.key()
                    +",value = "+record.value());
        }
        @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
        public void onMessage2(ConsumerRecord<String, String> record) {
            System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()
                    +",partition:"+record.partition()
                    +",offset = "+record.offset()
                    +",key = "+record.key()
                    +",value = "+record.value());
        }
        @KafkaListener(topics ={"my_topic1"},groupId = "my_group2")
        public void onMessage3(ConsumerRecord<String, String> record) {
            System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()
                    +",partition:"+record.partition()
                    +",offset = "+record.offset()
                    +",key = "+record.key()
                    +",value = "+record.value());
        }
    
    }
    
    

    1.5、创建SpringBoot启动类

    package com.atguigu.spring.kafka.consumer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    
    // Generated by https://start.springboot.io
    // 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
    @SpringBootApplication
    public class SpringKafkaConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringKafkaConsumerApplication.class, args);
        }
    
    }
    
    

    1.6、屏蔽 kafka debug 日志 logback.xml

    <configuration>      
        
        <logger name="org.apache.kafka.clients" level="debug" />
    configuration>
    

    1.7、引入spring-kafka依赖

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>3.0.5version>
            <relativePath/> 
        parent>
    
        
        
        <groupId>com.atguigugroupId>
        <artifactId>spring-kafka-consumerartifactId>
        <version>0.0.1-SNAPSHOTversion>
        <name>spring-kafka-consumername>
        <description>spring-kafka-consumerdescription>
        <properties>
            <java.version>17java.version>
        properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starterartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.kafkagroupId>
                <artifactId>spring-kafkaartifactId>
            dependency>
    
        dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-maven-pluginartifactId>
                plugin>
            plugins>
        build>
    
    project>
    
    
    

    1.8、消费者控制台:

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::                (v3.0.5)
    
    my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
    my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
    my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
    my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
    my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
    my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
    my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
    my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
    my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
    my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
    my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
    my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
    my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
    my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
    my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
    my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
    my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
    my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
    my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
    my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
    

    在这里插入图片描述

  • 相关阅读:
    SpringBoot_快速入门
    C语言数据结构 —— 复杂度
    Postman使用实例
    [最短路]猛犸不上 Ban 2021RoboCom决赛D
    【Java 基础篇】Properties 结合集合类的使用详解
    03MyBatis-Plus中的常用注解
    详解设计模式:责任链模式
    程序地址空间--Linux
    中断:ZYNQ
    车载蓝牙PIN码是什么
  • 原文地址:https://blog.csdn.net/m0_65152767/article/details/139495545