• CentOS7安装Kafka_2.12-3.3.1集群及使用


    目录

    前提条件

    集群规划

    安装步骤

    下载kafka_2.12-3.3.1.tgz安装包

    解压

    同步至其他机器

    配置环境变量(所有机器都执行)

    配置kafka

    启动zk集群(三台都执行)

    启动kafka(三台都执行)

    查看进程(三台都执行)

    Kafka集群启动停止脚本

    Kafka常用命令

    启动集群

    创建topic

    查看topic

    查看zk里的topic节点

    kafka生产消息

    kafka消费消息

    删除topic


    前提条件

    三台CentOS7都安装好zookeeper-3.7.1,可参考 CentOS7安装ZooKeeper3.7.1集群

    集群规划

    node2node3node4
    zkzkzk
    kafkakafkakafka

    安装步骤

    下载kafka_2.12-3.3.1.tgz安装包

    [hadoop@node2 installfile]$ wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz

    解压

    [hadoop@node2 installfile]$ tar -zxvf kafka_2.12-3.3.1.tgz -C ~/soft

    同步至其他机器

    [hadoop@node2 installfile]$ cd ~/soft/
    [hadoop@node2 soft]$ xsync kafka_2.12-3.3.1

    配置环境变量(所有机器都执行)

    $ sudo nano /etc/profile.d/my_env.sh

    添加如下内容

    #KAFKA_HOME
    export KAFKA_HOME=/home/hadoop/soft/kafka_2.12-3.3.1
    export PATH=$PATH:$KAFKA_HOME/bin

    让环境变量生效

    $ source /etc/profile

    注意:node2、node3、node4都需要设置环境变量并让环境变量生效。

    配置kafka

    切换到kafka配置目录

    [hadoop@node2 soft]$ cd $KAFKA_HOME/config

    修改配置文件server.properties

    [hadoop@node2 config]$ vim server.properties

    找到相关配置项,配置为如下

    broker.id=0
    advertised.listeners=PLAINTEXT://node2:9092
    log.dirs=/home/hadoop/soft/kafka_2.12-3.3.1/datas
    zookeeper.connect=node2:2181,node3:2181,node4:2181/kafka

    分发配置

    [hadoop@node2 config]$ xsync server.properties 

    修改node3配置

    [hadoop@node3 ~]$ cd $KAFKA_HOME/config
    [hadoop@node3 config]$ vim server.properties
    broker.id=1
    advertised.listeners=PLAINTEXT://node3:9092

    修改node4配置

    [hadoop@node4 ~]$ cd $KAFKA_HOME/config
    [hadoop@node4 config]$ nano server.properties
    broker.id=2
    advertised.listeners=PLAINTEXT://node4:9092

    启动zk集群(三台都执行)

    [hadoop@node2 ~]$ zkServer.sh start
    [hadoop@node3 ~]$ zkServer.sh start
    [hadoop@node4 ~]$ zkServer.sh start

    启动kafka(三台都执行)

    [hadoop@node2 ~]$ kafka-server-start.sh -daemon /home/hadoop/soft/kafka_2.12-3.3.1/config/server.properties
    [hadoop@node3 ~]$ kafka-server-start.sh -daemon /home/hadoop/soft/kafka_2.12-3.3.1/config/server.properties
    [hadoop@node4 ~]$ kafka-server-start.sh -daemon /home/hadoop/soft/kafka_2.12-3.3.1/config/server.properties

    查看进程(三台都执行)

    [hadoop@node2 ~]$ jps
    1623 QuorumPeerMain
    2696 Kafka
    2957 Jps
    ​
    [hadoop@node3 ~]$ jps
    2743 Jps
    1609 QuorumPeerMain
    2538 Kafka
    ​
    [hadoop@node4 ~]$ jps
    2739 Jps
    2532 Kafka
    1608 QuorumPeerMain
    ​

    Kafka集群启动停止脚本

    进入~/bin目录,创建kf.sh

    [hadoop@node2 soft]$ cd ~/bin
    [hadoop@node2 bin]$ vim kf.sh

    kf.sh内容如下:

    1. #! /bin/bash
    2. case $1 in
    3. "start"){
    4. for i in node2 node3 node4
    5. do
    6. echo " --------启动 $i Kafka-------"
    7. ssh $i "/home/hadoop/soft/kafka_2.12-3.3.1/bin/kafka-server-start.sh -daemon /home/hadoop/soft/kafka_2.12-3.3.1/config/server.properties"
    8. done
    9. };;
    10. "stop"){
    11. for i in node2 node3 node4
    12. do
    13. echo " --------停止 $i Kafka-------"
    14. ssh $i "/home/hadoop/soft/kafka_2.12-3.3.1/bin/kafka-server-stop.sh stop"
    15. done
    16. };;
    17. esac

    添加权限

    [hadoop@node2 bin]$ chmod u+x kf.sh 

    测试kf集群启动脚本

    [hadoop@node2 bin]$ kf.sh start
     --------启动 node2 Kafka-------
     --------启动 node3 Kafka-------
     --------启动 node4 Kafka-------
    [hadoop@node2 bin]$ jps
    2071 Jps
    2041 Kafka

    测试kf集群停止脚本

    [hadoop@node2 bin]$ kf.sh stop
     --------停止 node2 Kafka-------
    No kafka server to stop
     --------停止 node3 Kafka-------
    No kafka server to stop
     --------停止 node4 Kafka-------
    No kafka server to stop
    [hadoop@node2 bin]$ jps
    2121 Jps
    ​

    分发脚本

    为了方便其他机器也可以启动/停止kafka,分发脚本

    [hadoop@node2 bin]$ xsync ~/bin

    Kafka常用命令

    启动集群

    启动zk

    [hadoop@node2 ~]$ zk.sh start

    启动kafka

    [hadoop@node2 ~]$ kf.sh start

    jps查看进程

    [hadoop@node2 ~]$ jps
    3267 Kafka
    2868 QuorumPeerMain
    3354 Jps
    

    创建topic

    [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 3 --partitions 1 --topic topic_log

    查看topic

    [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
    topic_log
    ​
    

    查看zk里的topic节点

    [hadoop@node3 ~]$ zkCli.sh 
    ​
    [zk: localhost:2181(CONNECTED) 0] ls -R /kafka
    /kafka
    /kafka/admin
    /kafka/brokers
    /kafka/cluster
    /kafka/config
    /kafka/consumers
    /kafka/controller
    /kafka/controller_epoch
    /kafka/feature
    /kafka/isr_change_notification
    /kafka/latest_producer_id_block
    /kafka/log_dir_event_notification
    /kafka/admin/delete_topics
    /kafka/brokers/ids
    /kafka/brokers/seqid
    /kafka/brokers/topics
    /kafka/brokers/ids/0
    /kafka/brokers/ids/1
    /kafka/brokers/ids/2
    /kafka/brokers/topics/topic_log
    /kafka/brokers/topics/topic_log/partitions
    /kafka/brokers/topics/topic_log/partitions/0
    /kafka/brokers/topics/topic_log/partitions/0/state
    /kafka/cluster/id
    /kafka/config/brokers
    /kafka/config/changes
    /kafka/config/clients
    /kafka/config/ips
    /kafka/config/topics
    /kafka/config/users
    /kafka/config/topics/topic_log
    [zk: localhost:2181(CONNECTED) 1] 
    ​
    ​
    

    kafka生产消息

    [hadoop@node2 ~]$ kafka-console-producer.sh --broker-list node2:9092 --topic topic_log
    >hello world
    >hello ha
    >

    不关闭这个终端

    kafka消费消息

    打开一个新的node2窗口

    [hadoop@node2 ~]$ kafka-console-consumer.sh --bootstrap-server node2:9092 --from-beginning --topic topic_log
    hello world
    hello ha

    --from-beginning:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

    此时可以继续在生产者终端发送数据,发现消费者终端能继续接收数据。

    分别在生产者终端和消费者终端按Ctrl+c退出并返回Linux命令行。

    删除topic

    [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --delete --topic topic_log

    再次查看topic

    [hadoop@node2 bin]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
    __consumer_offsets

    完成!enjoy it!

  • 相关阅读:
    结合RF与1DCNN的多信息融合气温预报方法
    Jetsonnano B01 笔记6:开启USB摄像头
    手动下载/安装Xcode的simulator
    7 Series Devices Memory Interface Solution - Memory Controller Block
    【AI视野·今日Sound 声学论文速览 第二十五期】Fri, 13 Oct 2023
    基于springcloud+vue的分布式架构网上商城网站 前后端分离
    Mysql数据类型
    【Pandas 数据分析3-2】Pandas 数据读取与输出 - Excel
    微软AD域如何实现用户自助修改/重置密码?
    嵌入式Linux裸机开发(六)EPIT 定时器
  • 原文地址:https://blog.csdn.net/qq_42881421/article/details/139670493