• 5、Kafka集成 SpringBoot


    SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以
    用于 SpringBoot 的消费者。
    在这里插入图片描述
    1)在 IDEA 中安装 lombok 插件
    在 Plugins 下搜索 lombok 然后在线安装即可,安装后注意重启
    2)SpringBoot 环境准备
    (1)创建一个 Spring Initializr
    注意:有时候 SpringBoot 官方脚手架不稳定,我们切换国内地址 https://start.aliyun.com
    (2)项目名称 springboot
    (3)添加项目依赖
    (4)检查自动生成的配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
    https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
     <version>2.6.1</version>
     <relativePath/> <!-- lookup parent from repository -->
     </parent>
     <groupId>com.atguigu</groupId>
     <artifactId>springboot</artifactId>
     <version>0.0.1-SNAPSHOT</version>
     <name>springboot</name>
     <description>Demo project for Spring Boot</description>
     <properties>
     <java.version>1.8</java.version>
     </properties>
     <dependencies>
     <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
    <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <optional>true</optional>
     </dependency>
     <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-test</artifactId>
     <scope>test</scope>
     </dependency>
     <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka-test</artifactId>
     <scope>test</scope>
     </dependency>
     </dependencies>
     <build>
     <plugins>
     <plugin>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-maven-plugin</artifactId>
     <configuration>
     <excludes>
     <exclude>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     </exclude>
     </excludes>
     </configuration>
     </plugin>
     </plugins>
     </build>
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    3.1 SpringBoot 生产者
    (1)修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息

    # 应用名称
    spring.application.name=atguigu_springboot_kafka
    # 指定 kafka 的地址
    spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092
    #指定 key 和 value 的序列化器
    spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    (2)创建 controller 从浏览器接收数据, 并写入指定的 topic

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    @RestController
    public class ProducerController {
     
     // Kafka 模板用来向 kafka 发送数据
     @Autowired
     KafkaTemplate<String, String> kafka;
     
     @RequestMapping("/prince")
     public String data(String msg) {
     kafka.send("first", msg);
     return "ok";
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3.2 SpringBoot 消费者
    (1)修改 SpringBoot 核心配置文件 application.propeties

    # =========消费者配置开始=========
    # 指定 kafka 的地址
    spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092
    # 指定 key 和 value 的反序列化器
    spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserial
    izer
    spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserial
    izer
    #指定消费者组的 group_id
    spring.kafka.consumer.group-id=prince
    # =========消费者配置结束=========
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    (2)创建类消费 Kafka 中指定 topic 的数据

    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaListener;
    @Configuration
    public class KafkaConsumer {
     
     // 指定要监听的 topic
     @KafkaListener(topics = "first")
     public void consumeTopic(String msg) { // 参数: 收到的 value
     System.out.println("收到的信息: " + msg);
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    (3)向 first 主题发送数据

    [hadoop102 kafka]$ bin/kafka-console-producer.sh --
    bootstrap-server hadoop102:9092 --topic first
    >
    
    • 1
    • 2
    • 3
  • 相关阅读:
    【毕业设计】基于单片机的智慧农业管理系统 -大棚管理系统 自动灌溉系统
    使用fastlane match自动化管理证书和描述文件
    代码随想录day32|122.买卖股票的最佳时机II|55. 跳跃游戏|45.跳跃游戏II|Golang
    js中getBoundingClientRect()方法详解
    如何进行日期和时间的计算和操作?
    二合一的集度,任重道远
    基于SpringBoot的电子招标投标管理系统
    推荐系统_各种方法的损失计算过程
    FNN、DeepFM与NFM
    从简历被拒到收割8个大厂offer,我只用了三个月的时间成功逆袭
  • 原文地址:https://blog.csdn.net/weixin_45817985/article/details/133941705