• kafka-消费者服务搭建&配置&简单消费(SpringBoot整合Kafka)


    1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述在这里插入图片描述

    2、创建生产者发送消息

    [root@localhost ~]# kafka-console-producer.sh --bootstrap-server 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097 --topic my_topic1
    >1
    >2
    >3
    >
    

    在这里插入图片描述

    [
      [
        {
          "partition": 1,
          "offset": 0,
          "msg": "1",
          "timespan": 1717592203289,
          "date": "2024-06-05 12:56:43"
        },
        {
          "partition": 1,
          "offset": 1,
          "msg": "2",
          "timespan": 1717592204046,
          "date": "2024-06-05 12:56:44"
        },
        {
          "partition": 1,
          "offset": 2,
          "msg": "3",
          "timespan": 1717592204473,
          "date": "2024-06-05 12:56:44"
        }
      ]
    ]
    

    3、application.yml配置

    server:
      port: 8120
    
    # v1
    spring:
      kafka:  # canonical lowercase prefix: spring.kafka
        bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
        consumer:
          # read-committed: only consume messages from committed producer transactions (avoids dirty reads)
          isolation-level: read-committed # read-uncommitted would let the consumer read messages from producer transactions that are not yet committed
          # true: auto-ack — Kafka commits the consumer offset automatically after the consumer fetches the message
          enable-auto-commit: true 
          # interval (ms) between batched automatic offset commits
          auto-commit-interval: 1000
          # where to start when this group has no committed offset for the topic yet
          auto-offset-reset: earliest  # one of: earliest | latest | none
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    

    4、创建消费者监听器

    package com.atguigu.spring.kafka.consumer.listener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    @Component
    public class MyKafkaListener {
    
        /**
         * Consumes records from topic {@code my_topic1} as part of consumer group
         * {@code my_group1} and prints each record's coordinates (topic, partition,
         * offset) plus its key/value payload to stdout.
         *
         * @param record the record delivered by the listener container
         */
        @KafkaListener(topics = {"my_topic1"}, groupId = "my_group1")
        public void onMessage(ConsumerRecord<String, String> record) {
            // Build the whole log line first so it is emitted in a single write.
            String line = "消费者获取到消息:topic = " + record.topic()
                    + ",partition:" + record.partition()
                    + ",offset = " + record.offset()
                    + ",key = " + record.key()
                    + ",value = " + record.value();
            System.out.println(line);
        }
    
    }
    
    

    5、创建SpringBoot启动类

    package com.atguigu.spring.kafka.consumer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    
    // Generated by https://start.springboot.io
    // 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
    @SpringBootApplication
    public class SpringKafkaConsumerApplication {
    
        /** Bootstraps the Spring application context for the Kafka consumer service. */
        public static void main(String[] args) {
            var context = SpringApplication.run(SpringKafkaConsumerApplication.class, args);
        }
    
    }
    
    

    6、屏蔽 kafka debug 日志 logback.xml

    <configuration>

        <!-- Raise the kafka-clients logger to INFO so its very chatty DEBUG output
             is suppressed (level="debug" would not silence anything). -->
        <logger name="org.apache.kafka.clients" level="info" />
    </configuration>
    
    

    7、引入spring-kafka依赖

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>3.0.5</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>

        <groupId>com.atguigu</groupId>
        <artifactId>spring-kafka-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>spring-kafka-consumer</name>
        <description>spring-kafka-consumer</description>
        <properties>
            <java.version>17</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>

        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

    </project>
    
    

    此时启动SpringKafkaConsumerApplication,控制台会打印数据

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::                (v3.0.5)
    
    消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = null,value = 1
    消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = null,value = 2
    消费者获取到消息:topic = my_topic1,partition:1,offset = 2,key = null,value = 3
    

    如果此时重新启动SpringKafkaConsumerApplication,控制台将不会打印数据,因为已经消费过数据

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::                (v3.0.5)
    
    
  • 相关阅读:
    ADO.NET之sqlCommand对象
    【计算机网络OSI模型分层,TCP/IP模型】 --- 史上最详细的解读
    本地demo服务器搭建计划——(三)rabbitmq&配置中心config&配置自动刷新
    JVM学习五
    【温故而知新】构建高可用Linux服务器(三)
    学编程的第十八天
    python cos(x) 精确到某值 esp如0.00001
    深度学习计算 - 读写文件
    吉利银河L6顶配续航测试 记录 方便后续对比
    【linux】性能优化
  • 原文地址:https://blog.csdn.net/m0_65152767/article/details/139481096