• 【数仓】kafka软件安装及集群配置


    相关文章

    一、环境准备

    准备3台虚拟机

    • Hadoop131:192.168.56.131
    • Hadoop132:192.168.56.132
    • Hadoop133:192.168.56.133

    本例系统版本 CentOS-7.8,已安装jdk1.8

    关闭防火墙

    systemctl stop firewalld
    
    • 1

    zookeeper 已安装,且已启动

    二、kafka安装配置

    1、kafka下载安装

    # 下载解压
    wget --no-check-certificate https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
    tar -xzvf kafka_2.13-3.7.0.tgz
    mv kafka_2.13-3.7.0/ /data/kafka/
    
    • 1
    • 2
    • 3
    • 4

    2、配置环境变量

    新增环境变量文件

    vi /etc/profile.d/kafka_env.sh

    export KAFKA_HOME=/data/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
    • 1
    • 2

    使用source让新增环境生效

    source /etc/profile
    
    • 1

    配置完成后分发到其他服务器

    xsync.sh /etc/profile.d/kafka_env.sh
    
    • 1

    3、配置Kafka服务器属性

    Kafka集群的配置涉及多个方面,包括broker、Zookeeper、producer和consumer的配置。不过,通常我们主要关注的是broker和Zookeeper的配置,因为它们是构成Kafka集群的基础。

    Broker配置

    Broker的配置主要在Kafka安装目录下的config/server.properties文件中进行。以下是一些关键的配置项:

    1. broker.id:每个Kafka broker都需要一个唯一的标识符,即broker.id。在集群中,每个broker的ID必须是唯一的。

    2. listeners:监听的网络端口和协议,用于客户端和其他brokers的连接。例如,listeners=PLAINTEXT://:9092表示监听本机9092端口,使用PLAINTEXT协议。

    3. log.dirs:Kafka存储数据的目录。可以配置多个目录以实现磁盘的负载均衡。例如,log.dirs=/data/kafka-logs

    4. num.partitions:创建新topic时的默认分区数。这个配置也可以在创建topic时通过命令行参数指定。

    5. offsets.topic.replication.factor:创建新topic时的默认副本因子。这个配置决定了topic的副本数,即数据在不同broker上的复制份数。

    6. zookeeper.connect:指定ZooKeeper集群的地址和端口。例如,zookeeper.connect=localhost:2181表示连接本机的Zookeeper实例,端口为2181。如果是ZooKeeper集群,可以配置多个地址,用逗号分隔。

    vi /data/kafka/config/server.properties
    
    • 1

    主要配置参数如下:

    #broker 的全局唯一编号,不能重复,只能是数字。broker.id=0
    #broker 对外暴露的 IP 和端口 (每个节点单独配置)
    advertised.listeners=PLAINTEXT://hadoop131:9092
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/data/kafka/datas
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    #每个 topic 创建时的副本数,默认时1个副本
    offsets.topic.replication.factor=l
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    #检查过期数据的时间,默认5分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
    zookeeper.connect=hadoop131:2181,hadoop132:2181,hadoop133:2181/kafka
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    4、同步kafka到其他服务器

    1)同步kafka到其他两台服务器 hadoop132、hadoop133

    xsync.sh /data/kafka/
    
    • 1

    2)分别修改 hadoop132、hadoop133 上面的 broker.idadvertised.listeners

    5、启动kafka

    增加集群启动脚本

    1)创建文件kafka.sh

    vi /usr/bin/kafka.sh
    # 修改文件权限
    chmod 777 /usr/bin/kafka.sh
    
    • 1
    • 2
    • 3

    2)复制如下内容

    #!/bin/bash
    
    #1. 判断参数个数
    if [ $# -lt 1 ]
    then
        echo Not Enough Arguement!
        exit;
    fi
    
    case $1 in
    "start")
    	#遍历集群所有机器
    	for host in hadoop131 hadoop132 hadoop133
    	do
    		echo --------------------  $host kafka 启动 --------------------
    		ssh $host "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"
    	done
    ;;
    "stop")
    	#遍历集群所有机器
    	for host in hadoop131 hadoop132 hadoop133
    	do
    		echo --------------------  $host kafka 停止 --------------------
    		ssh $host "/data/kafka/bin/kafka-server-stop.sh"
    	done
    ;;
    *)
    	echo "Input Args Error..."
    ;;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    3)通过集群脚本 kafka.sh 操作

    kafka.sh start
    
    • 1
    -------------------- hadoop131 kafka 启动 --------------------
    -------------------- hadoop132 kafka 启动 --------------------
    -------------------- hadoop133 kafka 启动 --------------------
    
    • 1
    • 2
    • 3

    使用xcall.sh jps -m查看进程,可以看见kafka进程已经启动

    ------ hadoop131 jps ------
    4548 QuorumPeerMain
    5781 Jps
    5673 Kafka
    ------ hadoop132 jps ------
    5560 Kafka
    4458 QuorumPeerMain
    5659 Jps
    ------ hadoop133 jps ------
    5570 Kafka
    5668 Jps
    4463 QuorumPeerMain
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    6、通过命令测试kafka

    1)发送消息

    bin/kafka-console-producer.sh \
        --broker-list localhost:9092 \
        --topic my.kafka.broadcast
    	
    # 或
    
    bin/kafka-console-producer.sh \
        --bootstrap-server localhost:9092 \
        --topic my.kafka.broadcast
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    --broker-list :<端口>:指定Kafka broker的地址和端口。与--bootstrap-server参数类似,但在新版本的Kafka中,更推荐使用--bootstrap-server
    --topic :指定要发送消息的topic名称。

    在你执行这个命令后,终端会等待你输入消息。你可以在终端中输入一行文本,然后按下Enter键来发送消息。
    要结束消息发送,你可以通过按下Ctrl+D(在大多数Unix系统中)或Ctrl+Z(在Windows系统中)来终止输入。

    2)接收消息

    bin/kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 \
        --topic my.kafka.broadcast \
        --from-beginning
    
    • 1
    • 2
    • 3
    • 4

    --bootstrap-server :<端口>:指定Kafka broker的地址和端口。你可以使用逗号分隔多个broker地址,例如broker1:9092,broker2:9092。
    --topic :指定要消费的topic名称。
    --from-beginning:从topic的起始位置开始消费。如果不指定该参数,消费者将从最新的消息开始消费。
    --consumer.config <消费者配置文件>:可选参数,用于指定消费者配置文件。你可以在配置文件中设置各种消费者属性,如消费者组ID、自动提交偏移量等。如果你不需要使用消费者组或自定义配置,可以忽略此参数。

    执行上述命令后,你将在终端上看到从指定topic接收到的消息。
    请注意,该命令将一直运行,直到你手动停止它(通常通过按下Ctrl+C来终止)

    3)查询topic列表

    bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    
    • 1

    三、kafka常见配置项

    Apache Kafka是一个流行的分布式事件流处理平台,它有很多配置项,用于优化和调整其性能、可靠性和安全性。以下是一些Kafka中常见的配置项:

    Broker配置

    1. broker.id:每个Kafka broker都需要一个唯一的标识符,即broker.id
    2. log.dirs:Kafka存储数据的目录,可以配置多个目录以实现磁盘的负载均衡。
    3. listeners:监听的网络端口和协议,用于客户端和其他brokers的连接。
    4. num.partitions:创建新topic时的默认分区数。
    5. default.replication.factor:创建新topic时的默认副本因子。
    6. auto.create.topics.enable:是否允许自动创建topic。
    7. log.retention.hourslog.retention.byteslog.retention.minuteslog.retention.ms:控制日志数据的保留策略。
    8. log.segment.bytes:控制日志段的大小。
    9. zookeeper.connect:指定ZooKeeper集群的地址。

    Producer配置

    1. bootstrap.servers:Kafka集群的地址列表。
    2. key.serializer:用于序列化key的类。
    3. value.serializer:用于序列化value的类。
    4. acks:控制发送消息的确认机制。
    5. retries:发送失败时的重试次数。
    6. batch.size:控制批量发送的大小。
    7. linger.ms:发送批量消息前的等待时间。
    8. buffer.memory:生产者用于缓存的内存大小。

    Consumer配置

    1. group.id:消费者组的标识符。
    2. bootstrap.servers:Kafka集群的地址列表。
    3. key.deserializer:用于反序列化key的类。
    4. value.deserializer:用于反序列化value的类。
    5. auto.offset.reset:当没有初始化的offset或offset不再存在时,应该做什么。
    6. enable.auto.commit:是否允许自动提交offset。
    7. fetch.min.bytesfetch.max.bytes:控制从broker获取数据的最小和最大字节数。
    8. max.poll.records:每次poll操作返回的最大记录数。

    以上只是Kafka配置的一部分,实际上Kafka的配置项非常多,可以根据具体的需求和场景进行调整。具体的配置项和使用方法可以参考Kafka的官方文档。

    请注意,配置项的默认值可能会随着Kafka版本的更新而发生变化,因此建议查阅对应版本的官方文档以获取最准确的信息。

    参考

  • 相关阅读:
    【Notes】互联网架构之Netty4.X
    Mybatis 使用参数时$与#的区别
    系统学习SpringFramework:SpringBean的生命周期
    transformers - 预测中间词
    我在VScode学Java(Java方法method)
    【无标题】
    基于HASM模型的土壤高精度建模matlab仿真
    一款免费的中英文文本翻译的api接口
    11 - Spring AOP介绍与使用2 - 简单配置
    代码随想录训练营二刷第四十六天 | 完全背包 518. 零钱兑换 II 377. 组合总和 Ⅳ
  • 原文地址:https://blog.csdn.net/wlddhj/article/details/136411748