目录
三台CentOS7都安装好zookeeper-3.7.1,可参考 CentOS7安装ZooKeeper3.7.1集群
node2 | node3 | node4 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
kafka_2.12-3.3.1.tgz
安装包[hadoop@node2 installfile]$ wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz
[hadoop@node2 installfile]$ tar -zxvf kafka_2.12-3.3.1.tgz -C ~/soft
[hadoop@node2 installfile]$ cd ~/soft/ [hadoop@node2 soft]$ xsync kafka_2.12-3.3.1
$ sudo nano /etc/profile.d/my_env.sh
添加如下内容
#KAFKA_HOME export KAFKA_HOME=/home/hadoop/soft/kafka_2.12-3.3.1 export PATH=$PATH:$KAFKA_HOME/bin
让环境变量生效
$ source /etc/profile
注意:node2、node3、node4都需要设置环境变量并让环境变量生效。
切换到kafka配置目录
[hadoop@node2 soft]$ cd $KAFKA_HOME/config
修改配置文件server.properties
[hadoop@node2 config]$ vim server.properties
找到相关配置项,配置为如下
broker.id=0 advertised.listeners=PLAINTEXT://node2:9092 log.dirs=/home/hadoop/soft/kafka_2.12-3.3.1/datas zookeeper.connect=node2:2181,node3:2181,node4:2181/kafka
分发配置
[hadoop@node2 config]$ xsync server.properties
修改node3配置
[hadoop@node3 ~]$ cd $KAFKA_HOME/config [hadoop@node3 config]$ vim server.properties broker.id=1 advertised.listeners=PLAINTEXT://node3:9092
修改node4配置
[hadoop@node4 ~]$ cd $KAFKA_HOME/config [hadoop@node4 config]$ nano server.properties broker.id=2 advertised.listeners=PLAINTEXT://node4:9092
[hadoop@node2 ~]$ zkServer.sh start [hadoop@node3 ~]$ zkServer.sh start [hadoop@node4 ~]$ zkServer.sh start
[hadoop@node2 ~]$ kafka-server-start.sh -daemon /home/hadoop/soft/kafka_2.12-3.3.1/config/server.properties [hadoop@node3 ~]$ kafka-server-start.sh -daemon /home/hadoop/soft/kafka_2.12-3.3.1/config/server.properties [hadoop@node4 ~]$ kafka-server-start.sh -daemon /home/hadoop/soft/kafka_2.12-3.3.1/config/server.properties
[hadoop@node2 ~]$ jps 1623 QuorumPeerMain 2696 Kafka 2957 Jps [hadoop@node3 ~]$ jps 2743 Jps 1609 QuorumPeerMain 2538 Kafka [hadoop@node4 ~]$ jps 2739 Jps 2532 Kafka 1608 QuorumPeerMain
进入~/bin
目录,创建kf.sh
[hadoop@node2 soft]$ cd ~/bin [hadoop@node2 bin]$ vim kf.sh
kf.sh
内容如下:
- #! /bin/bash
- case $1 in
- "start"){
- for i in node2 node3 node4
- do
- echo " --------启动 $i Kafka-------"
- ssh $i "/home/hadoop/soft/kafka_2.12-3.3.1/bin/kafka-server-start.sh -daemon /home/hadoop/soft/kafka_2.12-3.3.1/config/server.properties"
- done
- };;
- "stop"){
- for i in node2 node3 node4
- do
- echo " --------停止 $i Kafka-------"
- ssh $i "/home/hadoop/soft/kafka_2.12-3.3.1/bin/kafka-server-stop.sh stop"
- done
- };;
- esac
添加权限
[hadoop@node2 bin]$ chmod u+x kf.sh
测试kf集群启动脚本
[hadoop@node2 bin]$ kf.sh start --------启动 node2 Kafka------- --------启动 node3 Kafka------- --------启动 node4 Kafka------- [hadoop@node2 bin]$ jps 2071 Jps 2041 Kafka
测试kf集群停止脚本
[hadoop@node2 bin]$ kf.sh stop --------停止 node2 Kafka------- No kafka server to stop --------停止 node3 Kafka------- No kafka server to stop --------停止 node4 Kafka------- No kafka server to stop [hadoop@node2 bin]$ jps 2121 Jps
分发脚本
为了方便其他机器也可以启动/停止kafka,分发脚本
[hadoop@node2 bin]$ xsync ~/bin
启动zk
[hadoop@node2 ~]$ zk.sh start
启动kafka
[hadoop@node2 ~]$ kf.sh start
jps查看进程
[hadoop@node2 ~]$ jps 3267 Kafka 2868 QuorumPeerMain 3354 Jps
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 3 --partitions 1 --topic topic_log
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list topic_log
[hadoop@node3 ~]$ zkCli.sh [zk: localhost:2181(CONNECTED) 0] ls -R /kafka /kafka /kafka/admin /kafka/brokers /kafka/cluster /kafka/config /kafka/consumers /kafka/controller /kafka/controller_epoch /kafka/feature /kafka/isr_change_notification /kafka/latest_producer_id_block /kafka/log_dir_event_notification /kafka/admin/delete_topics /kafka/brokers/ids /kafka/brokers/seqid /kafka/brokers/topics /kafka/brokers/ids/0 /kafka/brokers/ids/1 /kafka/brokers/ids/2 /kafka/brokers/topics/topic_log /kafka/brokers/topics/topic_log/partitions /kafka/brokers/topics/topic_log/partitions/0 /kafka/brokers/topics/topic_log/partitions/0/state /kafka/cluster/id /kafka/config/brokers /kafka/config/changes /kafka/config/clients /kafka/config/ips /kafka/config/topics /kafka/config/users /kafka/config/topics/topic_log [zk: localhost:2181(CONNECTED) 1]
[hadoop@node2 ~]$ kafka-console-producer.sh --broker-list node2:9092 --topic topic_log >hello world >hello ha >
不关闭这个终端
打开一个新的node2窗口
[hadoop@node2 ~]$ kafka-console-consumer.sh --bootstrap-server node2:9092 --from-beginning --topic topic_log hello world hello ha
--from-beginning
:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
此时可以继续在生产者终端发送数据,发现消费者终端能继续接收数据。
分别在生产者终端和消费者终端按Ctrl+c退出并返回Linux命令行。
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --delete --topic topic_log
再次查看topic
[hadoop@node2 bin]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list __consumer_offsets
完成!enjoy it!