• Kafka的安装及接入SpringBoot


    环境:windows、jdk1.8、springboot2

    Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/

    1.概述

            Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。

            详细概述见Kafka概述:

    1.1 Kafka的作用

    • 发布和订阅记录流
    • 持久存储记录流,Kafka中的数据即使消费后也不会消失
    • 在系统或应用之间构建可靠获取数据的实时流数据管道
    • 构建转换或响应数据流的实时流应用程序
    • Kafka可以处理源源不断产生的数据

    1.2 Kafka的一些概念

    • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)

    • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

    • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

    • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

    2.Kafka下载安装

    Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/downloads        选择最新版就可以

    2.1 配置kafka

            解压下载的文件,修改 config 文件夹下的 zookeeper.properties

            修改 config 文件夹下的 server.properties

            当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka)

    advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

     

    2.2 启动 zookeeper

            Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。

            kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

    zookeeper-server-start.bat ../../config/zookeeper.properties

            可以本地访问看一下:http://localhost:2181/ 

    2.3 启动Kafka 

            kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

    kafka-server-start.sh ../../config/server.properties

            访问路径: http://localhost:9092/ 

    2.4 便捷启动脚本

            两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中

    cd bin\windows

    zookeeper-server-start.bat ../../config/zookeeper.properties

    cd bin\windows

    kafka-server-start.bat ../../config/server.properties

    3.springboot集成Kafka

    3.1 环境搭建

    (1)添加pom依赖

    1.    org.springframework.boot
    2.    spring-boot-starter-parent
    3.    2.2.8.RELEASE
    4.    1.2.58
    5.    
    6.        org.springframework.boot
    7.        spring-boot-starter-web
    8.    
    9.    
    10.    
    11.        org.springframework.kafka
    12.        spring-kafka
    13.    
    14.    
    15.        com.alibaba
    16.        fastjson
    17.        ${fastjson.version}
    18.    
    19.    
    20.        org.springframework.boot
    21.        spring-boot-starter-test
    22.    

    (2)配置类application.yml

            生产者:

    1. spring:
    2. kafka:
    3. bootstrap-servers: xxx.xxx.xxx.xxx:9092
    4. producer:
    5. retries: 0
    6. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    7. value-serializer: org.apache.kafka.common.serialization.StringSerializer

            消费者:

    1. spring:
    2. kafka:
    3.   bootstrap-servers: xxx.xxx.xxx.xxx:9092
    4.   consumer:
    5.     group-id: kafka-demo-kafka-group
    6.     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    7.     value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    (3)启动类

    1. import org.springframework.boot.SpringApplication;
    2. import org.springframework.boot.autoconfigure.SpringBootApplication;
    3. @SpringBootApplication
    4. public class KafkaApp {
    5.    public static void main(String[] args) {
    6.        SpringApplication.run(KafkaApp.class, args);
    7.   }
    8. }

    3.2 消息生产者

            junit测试,新建消息发送方

    1. import org.junit.Test;
    2. import org.junit.runner.RunWith;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.boot.test.context.SpringBootTest;
    5. import org.springframework.kafka.core.KafkaTemplate;
    6. import org.springframework.test.context.junit4.SpringRunner;
    7. @RunWith(SpringRunner.class)
    8. @SpringBootTest
    9. public class KafkaSendTest {
    10.    @Autowired
    11.    private KafkaTemplate kafkaTemplate; //如果这里有红色波浪线,那是假错误
    12.    @Test
    13.    public void sendMsg(){
    14.        String topic = "spring_test";
    15.        kafkaTemplate.send(topic,"hello spring boot kafka!");
    16.        System.out.println("发送成功.");
    17.        while (true){ //保存加载ioc容器
    18.       }
    19.   }
    20. }

    3.3 消息消费者

            新建监听类:

    1. import org.apache.kafka.clients.consumer.ConsumerRecord;
    2. import org.springframework.kafka.annotation.KafkaListener;
    3. import org.springframework.stereotype.Component;
    4. @Component
    5. public class MyKafkaListener {
    6. //   以下两种方法都行
    7.    
    8. // 指定监听的主题
    9. //   @KafkaListener(topics = "spring_test")
    10. //   public void receiveMsg(String message){
    11. //       System.out.println("接收到的消息:"+message);
    12. //   }
    13.    @KafkaListener(topics = "spring_test")
    14.    public void handleMessage(ConsumerRecord record) {
    15.        System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value());
    16.   }
    17. }
  • 相关阅读:
    129页4万字某智慧能源集团数字化管控平台项目 建设方案
    负载开关、高侧开关、低侧开关等
    【解救ROS】ros小车机器人摄像头寻线的实现(基于opencv)
    Cython加密python代码防止反编译
    c++ SFML 连接ftp
    Kafka之日志存储详解
    配置windows限时登录
    Navicat访问宝塔中的MySQL
    编程语言除了面向过程和面向对象还有什么
    C++ fstream类移动读写指针和字节数形式获取该指针位置(seekp、seekg、tellg、tellp)
  • 原文地址:https://blog.csdn.net/LB_bei/article/details/138698863