Kafka KRaft是自Kafka2.8起发布的,用于替代Zookeeper的一种共识服务。
即所有集群的元数据将全部存储在Kafka主题中,并全权由Kafka内部进行自主管理。
从而脱离及解除Kakfa原有对Zookeeper的性能限制及强依赖关系。
闲言少叙,我们直接进入正题。
基于实际业务场景,我们需要准备至少3台服务器
服务器名称 | 服务器IP | 服务器系统 |
---|---|---|
kraft1 | 192.0.0.1 | Centos8.2 |
kraft2 | 192.0.0.2 | Centos8.2 |
kraft3 | 192.0.0.3 | Centos8.2 |
提示:请在对应服务器下执行对应代码:
hostnamectl set-hostname kraft1
hostnamectl set-hostname kraft2
hostnamectl set-hostname kraft3
cat >> /etc/hosts << EOF
192.0.0.1 kraft1
192.0.0.2 kraft2
192.0.0.3 kraft3
EOF
# 查询已安装的jdk
rpm -qa|grep java
# 移除已安装jdk
rpm -e --nodeps (查出来带open字样的)
# 创建java目录
mkdir -p /opt/java
# 下载Open-JDK17
wget https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz
# 解压JDK
tar -xzf /data/nfs/openjdk-17.0.2_linux-x64_bin.tar.gz -C /opt/java
# 写入/etc/profile环境变量
# 需要检查,个别服务器需要vim打开后在PATH内容后追加[:$PATH:$JAVA_HOME/bin])
cat >> /etc/profile << EOF
export JAVA_HOME=/opt/java/jdk-17.0.2
export PATH=$PATH:$JAVA_HOME/bin
EOF
# 环境变量生效
source /etc/profile
# 查看java版本
java -version
# 创建Kafka目录
mkdir -p /opt/kakfa
# 进入Kafka目录
cd /opt/kafka
# 下载Kafka
wget https://mirrors.bfsu.edu.cn/apache/kafka/3.2.0/kafka_2.13-3.2.0.tgz
# 解压Kafka
tar -xzf kafka_2.13-3.2.0.tgz -C /opt/kafka
提示:只需要在一台服务器上生成即可
# 进入kafka根目录
cd /opt/kafka/kafka_2.13-3.2.0
# 使用kafka-storage tool生成集群唯一id
./bin/kafka-storage.sh random-uuid
提示:需要将random-uuid替换为上一个命令执行生成的uuid
# 进入kafka根目录
cd /opt/kafka/kafka_2.13-3.2.0
# 格式化存储数据的目录
# ★ 3台服务器均需要执行!命令一致
./bin/kafka-storage.sh format -t random-uuid -c ./config/kraft/server.properties
# 进入kraft配置文件目录
cd /opt/kafka/kafka_2.13-3.2.0/config/kraft
# 编辑server.properties文件
vim server.properties
# ★ 修改以下内容
node.id=1
controller.quorum.voters=1@kraft1:9093
listeners=PLAINTEXT://192.0.0.1:9092,CONTROLLER://192.0.0.1:9093
advertised.listeners=PLAINTEXT://10.10.10.10:9092
参数名称 | 参数含义 | 注意事项 |
---|---|---|
node.id | Kafka服务ID | ★不允许重复,独立且唯一 |
controller.quorum.voters | Kraft集群控制代理 | ★服务ID@服务器名称/公网IP:端口;多个间用逗号隔开 |
listeners | 服务监听 | ★IP为内网IP,防火墙需要开通9093端口 |
advertised.listeners | 广播服务监听 | ★IP为公网IP,防火墙需要开通9092端口 |
提示:启动时,注意修改server.properties的controller.quorum.voters的参数,顺序及修改内容如下:
序列 | 服务器IP | 参数设置 |
---|---|---|
1 | 192.0.0.1 | 1@kraft1:9093 |
2 | 192.0.0.2 | 1@kraft:9093, 2@kraft2:9093 |
3 | 192.0.0.3 | 1@kraft:9093, 2@kraft2:9093,3@kraft3:9093 |
# 进入kafka根目录
cd /opt/kafka/kafka_2.13-3.2.0/
# 依序启动服务
./bin/kafka-server-start.sh -deamon /opt/kafka/kafka_2.13-3.2.0/config/kraft/server.properties
提示:如果需要验证集群效果,创建/查看,生产/消费可以在不同服务器间进行测试
# 进入kafka根目录
cd /opt/kafka/kafka_2.13-3.2.0/
# 创建topic
./bin/kafka-topics.sh --bootstrap-server kraft1:9092 --create --topic test1 --partitions 3 --replication-factor 2
# 查看topic
./bin/kafka-topics.sh --bootstrap-server kraft1:9092 --list
# 启动生产者
./kafka-console-producer.sh --broker-list kraft1:9092 --topic test1
# 启动消费者
./bin/kafka-console-consumer.sh --bootstrap-server kraft1:9092 --topic test1
在实际应用环境中,我们都需要对服务进行鉴权管控,以免被他人盗用造成不必要损失
当然,相关配置也会相较繁琐一点,Kafka的鉴权方式有很多种,本文仅讲解SASL方式的具体实现
★提示:Djava.security.auth.login.config文件目录需要改为你的kafka解压目录
# 进入kafka/bin脚本目录
cd /opt/kafka/kafka_2.13-3.2.0/bin
# 复制kafka服务端启动脚本
cp kafka-server-start.sh kafka-server-start-sasl.sh
# 修改脚本内容
vim kafka-server-start-sasl.sh
# ★ 将export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改为:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka/kafka_2.13-3.2.0/config/kafka_server_jaas.conf"
★提示:Djava.security.auth.login.config文件目录需要改为你的kafka解压目录
# 进入kafka/bin脚本目录
cd /opt/kafka/kafka_2.13-3.2.0/bin
# 修改脚本内容
vim kafka-console-consumer.sh
# ★ 将export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改为:
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka/kafka_2.13-3.2.0/config/kafka_client_jaas.conf"
★提示:Djava.security.auth.login.config文件目录需要改为你的kafka解压目录
# 进入kafka/bin脚本目录
cd /opt/kafka/kafka_2.13-3.2.0/bin
# 修改脚本内容
vim kafka-console-producer.sh
# ★ 将export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改为:
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka/kafka_2.13-3.2.0/config/kafka_client_jaas.conf"
★提示:配置文件中内容要顶头写,且最后两个分号[;]很关键,不要随便删掉!
# 进入kafka/config目录
cd /opt/kafka/kafka_2.13-3.2.0/config
# 创建服务端鉴权配置文件
touch kafka_server_jaas.conf
# 编辑内容
kafka_server_jaas.conf
###
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="kafka"
username="admin"
password="admin"
user_admin="admin";
};
###
★提示:配置文件中内容要顶头写,且最后两个分号[;]很关键,不要随便删掉!
# 进入kafka/config目录
cd /opt/kafka/kafka_2.13-3.2.0/config
# 创建服务端鉴权配置文件
touch kafka_client_jaas.conf
# 编辑内容
kafka_client_jaas.conf
###
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
###
★提示:username及password为9.2.1章节配置一致
# 进入kafka/config目录
cd /opt/kafka/kafka_2.13-3.2.0/config
# 创建command_config文件
touch command_config
# 编辑command_config内容
vim command_config
###
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
###
★提示:username及password为9.2.1章节配置一致
# 进入kafka/config目录
cd /opt/kafka/kafka_2.13-3.2.0/config
# 编辑consumer.properties内容
vim consumer.properties
###
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
###
★提示:username及password为9.2.1章节配置一致
# 进入kafka/config目录
cd /opt/kafka/kafka_2.13-3.2.0/config
# 编辑producer.properties内容
vim producer.properties
###
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
###
★提示:注意,鉴权模式与简单模式不一致:SASL_PLAINTEXT
# 进入kraft/config目录
cd /opt/kafka/kafka_2.13-3.2.0/config/kraft
# 编辑server.properties文件
vim server.properties
# ★ 修改以下内容
node.id=1
controller.quorum.voters=1@kraft1:9093
listeners=SASL_PLAINTEXT://192.0.0.1:9092,CONTROLLER://192.0.0.1:9093
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://10.10.10.10:9092
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
参数名称 | 参数含义 | 注意事项 |
---|---|---|
node.id | Kafka服务ID | ★不允许重复,独立且唯一 |
controller.quorum.voters | Kraft集群控制代理 | ★服务ID@服务器名称/公网IP:端口;多个间用逗号隔开 |
listeners | 服务监听 | ★IP为内网IP,防火墙需要开通9093端口 |
advertised.listeners | 广播服务监听 | ★IP为公网IP,防火墙需要开通9092端口 |
sasl.enabled.mechanisms | sasl鉴权机制 | ★账号密码形式:PLAIN |
sasl.mechanism.inter.broker.protocol | broker鉴权规则 | ★账号密码形式:PLAIN |
提示1:启动脚本与简单模式不同,注意不要混淆:kafka-server-start-sasl.sh
提示2:启动时,注意修改server.properties的controller.quorum.voters的参数,顺序及修改内容如下:
序列 | 服务器IP | 参数设置 |
---|---|---|
1 | 192.0.0.1 | 1@kraft1:9093 |
2 | 192.0.0.2 | 1@kraft:9093, 2@kraft2:9093 |
3 | 192.0.0.3 | 1@kraft:9093, 2@kraft2:9093,3@kraft3:9093 |
# 进入kafka根目录
cd /opt/kafka/kafka_2.13-3.2.0/
# 依序启动服务
./bin/kafka-server-start-sasl.sh -deamon /opt/kafka/kafka_2.13-3.2.0/config/kraft/server.properties
提示:如果需要验证集群效果,创建/查看,生产/消费可以在不同服务器间进行测试
# 进入kafka根目录
cd /opt/kafka/kafka_2.13-3.2.0/
# 创建topic
./bin/kafka-topics.sh --bootstrap-server kraft1:9092 --create --topic test1 --partitions 3 --replication-factor 2 --command-config /opt/kafka/kafka_2.13-3.2.0/config/command_config
# 查看topic
./bin/kafka-topics.sh --bootstrap-server kraft1:9092 --list --command-config /opt/kafka/kafka_2.13-3.2.0/config/command_config
# 启动生产者
./kafka-console-producer.sh --broker-list kraft1:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
# 启动消费者
./bin/kafka-console-consumer.sh --bootstrap-server kraft1:9092 --topic test1 --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --from-beginning
Thanks for your watching,本文所涉及的配置文件将于几日后整理并上传,届时欢迎各位需要的小伙伴取用,其他更多保姆式文档正在逐步完善中,敬请期待。