• centos7 Kafka安装


    一、安装 JDK

    1.安装jdk

    2.配置环境变量

    vim /etc/profile
    
    • 1

    在这里插入图片描述
    在最后面添加:

    export JAVA_HOME=/app/jdk-12.0.2
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$PATH:$JAVA_HOME/bin
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    3.执行profile文件:

    source /etc/profile
    
    • 1

    这样可以使配置文件立即生效

    二、安装kafka

    1. 下载kafka

    https://kafka.apache.org/downloads

    在这里插入图片描述

    2. 启动zookeeper

    Kafka依赖于ZooKeeper,所以您需要先启动一个ZooKeeper服务器。如果没有安装,您可以使用随Kafka一起打包的便捷脚本来获取一个快速但是比较粗糙的单节点ZooKeeper实例。

    • zookeeper配置是 config目录下的zookeeper.properties,默认端口 2181
    vim /data/kafka_2.12-2.8.0/config/zookeeper.properties
    
    • 1
    • 启动命令:nohup /data/kafka_2.12-2.8.0/bin/zookeeper-server-start.sh /data/kafka_2.12-2.8.0/config/zookeeper.properties &,启动后可后台运行zookeeper
    nohup /data/kafka_2.12-2.8.0/bin/zookeeper-server-start.sh   /data/kafka_2.12-2.8.0/config/zookeeper.properties &
    
    • 1
    • 命令:ps -ef | grep zookeeper 查看zookeeper是否启动成功
    #查看进程
    ps -ef | grep zookeeper
    #查看端口
    lsof -i:2181
    netstat -antp | grep 2181
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    • 查看日志
    tail -100f /data/kafka_2.12-2.8.0/logs/server.log
    
    • 1

    3. 启动kafka

    1. 在config目录下提供了kafka的配置文件server.properties。为了保证可以远程访问Kafka,我们需要修改两处配置。
    vim /data/kafka_2.12-2.8.0/config/server.properties
    
    • 1
    # 去掉注释
    listeners=PLAINTEXT://:9092
    # 去掉注释并修改地址
    advertised.listeners=PLAINTEXT://127.0.0.1:9092 
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    1. 启动Kafka:
    nohup /data/kafka_2.12-2.8.0/bin/kafka-server-start.sh /data/kafka_2.12-2.8.0/config/server.properties   &
    
    • 1
    1. 守护进程方式启动kafka:
    /data/kafka_2.12-2.8.0/bin/kafka-server-start.sh -daemon /data/kafka_2.12-2.8.0/config/server.properties
    
    • 1
    1. 命令:ps -ef | grep kafka 查看kafka是否启动成功
    #查看进程
    ps -ef | grep kafka
    #查看端口
    lsof -i:9092 
    netstat -antp | grep 9092 
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    1. 查看日志
    tail -100f /data/kafka_2.12-2.8.0/logs/kafkaServer.out
    
    • 1

    4. 测试kafka

    1).创建 Topic

    /data/kafka_2.12-2.8.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
    • 1

    在这里插入图片描述

    2).查看 topic 列表

     # 返回上面创建的 test
    /data/kafka_2.12-2.8.0/bin/kafka-topics.sh --list --zookeeper localhost:2181  
    
    • 1
    • 2

    在这里插入图片描述

    3).查看描述 topics 信息

    /data/kafka_2.12-2.8.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    
    • 1

    在这里插入图片描述

    4).启动生产者(窗口不要关闭)

    /data/kafka_2.12-2.8.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    • 1

    在这里插入图片描述

    5).启动消费者(窗口不要关闭)

    /data/kafka_2.12-2.8.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
    • 1

    在这里插入图片描述

    6).在生产者窗口输入内容,消费者端可查看到输入的内容

    (1).生产端:

    {"id":"1"}
    
    • 1

    在这里插入图片描述

    (2).消费端:

    在这里插入图片描述

    7).删除topic及数据

    /data/kafka_2.12-2.8.0/bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --topic ProjectBaseInfo --zookeeper localhost:2181
    
    • 1

    在这里插入图片描述

    三、kafka查看消费数据

    1.查看所有组

    要想查询消费数据,必须要指定组。那么线上运行的kafka有哪些组呢?使用以下命令:

    /data/kafka_2.12-2.8.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    
    • 1

    在这里插入图片描述

    2.查看消费情况

    /data/kafka_2.12-2.8.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
    
    • 1

    参数解释:

    • –describe 显示详细信息
    • –bootstrap-server 指定kafka连接地址
    • –group 指定组。

    注意: --group指定的组必须存在才行!可以用上面的–list命令来查看

    在这里插入图片描述

    TOPIC
    PARTITION
    CURRENT-OFFSET
    LOG-END-OFFSET
    LAG
    CONSUMER-ID
    HOST
    CLIENT-ID
    topic名字分区id当前已消费的条数总条数未消费的条数消费id主机ip客户端id

    三、kafka其它错误

    1【Kafka】.org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 291020532 larger than 104857600)

    在这里插入图片描述
    将$KAFKA_HOME/config/server.properties文件中的socket.request.max.bytes值重置为超过数据包大小,然后重新启动kafka服务器。将socket.request.max.bytes翻倍。

    vim /data/kafka_2.12-2.8.0/config/server.properties
    
    • 1

    2. org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

    1).首先是server.properties,加入在服务端的配置文件server.properties加上的message.max.bytes配置

    vim /data/kafka_2.12-2.8.0/config/server.properties
    message.max.bytes=2073741824
    
    • 1
    • 2

    在这里插入图片描述

    对于已创建的topic,调整max.message.bytes参数方法:

    /data/kafka_2.12-2.8.0/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test  --config max.message.bytes=2073741824
    
    
    • 1
    • 2

    2). 要在配置kafka连接(在生产者端配置)时,加入配置max.request.size即可,如下

    在这里插入图片描述

    但是需要注意的是,在这里配置的值应该小于服务端配置的最大值,否则报如下错误

    org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
    
    • 1

    max.request.size必须小于message.max.bytes

    3. kafka 出现Java heap space的解决方法

    1.前言

    kafka是由scala和java编写的。所以需要调一些jvm的参数。java的内存分为堆内内存和堆外内存。

    • -Xms2048m, -Xmx2048m,设置的是堆内内存。-Xms是初始可用的最大堆内内存。-Xmx设置的是最大可用的堆内内存。二者设置成一样是因为效率问题,可以让jvm少做一些运算。如果这两个参数设置的太小,kafka会出现java.lang.OutOfMemoryError:
      Java heap space的错误。
    • -XX:MaxDirectMemorySize=5120m。这个参数配置的太小,kafka会出现java.lang.OutOfMemoryError:
      Direct buffer memory的错误。

    -XX:MaxDirectMemorySize加大,该参数默认是64M,可以根据需求调大,
    如果没找到,就在KAFKA_JVM_PERFORMANCE_OPTS添加,
    并且检查JVM参数里面有无:-XX:+DisableExplicitGC,如果有就去掉。

    在这里插入图片描述

    因为kafka的网络IO使用了java的nio中的DirectMemory的方式,而这个申请的是堆外内存。

    2.解决方案

    bin目录下的kafka-run-class.sh中需要配置的参数,找到 Memory options处,默认设置是256M,将其修改为如下值:
    修改前:
    在这里插入图片描述

    vim /data/kafka_2.12-2.8.0/bin/kafka-run-class.sh
    
    -Xms2048m  -Xmx2048m -XX:MaxDirectMemorySize=5120m
    
    • 1
    • 2
    • 3

    修改后:
    在这里插入图片描述

  • 相关阅读:
    C语言典范编程
    我的Vue之旅 10 Gin重写后端、实现页面详情页 Mysql + Golang + Gin
    英伟达Nvidia论坛
    Flink--2、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
    【周末闲谈】什么是云计算?
    配合AI刷leetcode 实现1170
    Java · 面向对象编程 · 包的概念 · 继承 · 组合
    Python gRPC 生成代码参考
    《逆向工程核心原理》学习笔记(六):高级逆向分析技术
    java学习之mybatis-plus
  • 原文地址:https://blog.csdn.net/q908544703/article/details/126266827