• kafka3.X集群安装(不使用zookeeper)


    一、kafka集群实例角色规划

    在本专栏的之前的一篇文章《kafka3种zk的替代方案》已经为大家介绍过在kafka3.0种已经可以将zookeeper去掉。

    上图中黑色代表broker(消息代理服务),褐色/蓝色代表Controller(集群控制器服务)

    • 左图(kafka2.0):一个集群所有节点都是broker角色,kafka从三个broker中选举出来一个Controller控制器,控制器将集群元数据信息(比如主题分类、消费进度等)保存到zookeeper,用于集群各节点之间分布式交互。
    • 右图(kafka3.0):假设一个集群有四个broker,指定三个作为Conreoller角色(蓝色),从三个Controller中选举出来一个Controller作为主控制器(褐色),其他的2个备用。zookeeper不再被需要!相关的元数据信息以kafka日志的形式存在(即:以消息队列消息的形式存在)。
    • controller通信端口:9093, 作用与zk的2181端口类似 。

    在搭建kafka3.0集群之前, 我们需要先做好kafka实例角色规划。(四个broker, 需要通过主动配置指定三个作为Controller, Controller需要奇数个, 这一点和zk是一样的)

    主机名称ip角色node.id
    kafka-vm1192.168.1.111broker,controller1
    kafka-vm2192.168.1.112broker,controller2
    kafka-vm3192.168.1.113broker,controller3
    kafka-vm4192.168.1.114broker4

    二、准备工作

    • kafka3.x不再支持JDK8,建议安装JDK11或JDK17。
    • 新建kafka持久化日志数据mkdir -p /data/kafka;并保证安装kafka的用户具有该目录的读写权限。

    各个机器节点执行:

    # 安装jdk(kafka3.x不再支持JDK8,建议安装JDK11或JDK17, 这里安装jdk11)
    # 下载安装jdk11, 参考: https://blog.csdn.net/justlpf/article/details/127268046
     
    # 下载kafka
    adduser kafka
    cd /opt
    wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
    tar -xf kafka_2.12-3.3.1.tgz
     
    chown -R kafka:kafka kafka_2.12-3.3.1*
     
    mkdir -p /data/kafka
    chown -R kafka:kafka /data/kafka
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    vi /etc/hosts,各个节点,添加如下内容:
    192.168.1.111 data-vm1
    192.168.1.112 data-vm2
    192.168.1.113 data-vm3
    192.168.1.114 data-vm4
    
    • 1
    • 2
    • 3
    • 4

    三、修改Kraft协议配置文件

    在kafka3.x版本中,使用Kraft协议代替zookeeper进行集群的Controller选举,所以要针对它进行配置。

    vi /opt/kafka_2.12-3.3.1/config/kraft/server.properties
    
    • 1

    具体配置参数如下:

    # data-vm1节点
    node.id=1
    process.roles=broker,controller
    listeners=PLAINTEXT://data-vm1:9092,CONTROLLER://data-vm1:9093
    advertised.listeners=PLAINTEXT://:9092
    controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
    log.dirs=/data/kafka/
     
    # data-vm2节点
    node.id=2
    process.roles=broker,controller
    listeners=PLAINTEXT://data-vm2:9092,CONTROLLER://data-vm2:9093
    advertised.listeners=PLAINTEXT://:9092
    controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
    log.dirs=/data/kafka/
     
    # data-vm3节点
    node.id=3
    process.roles=broker,controller
    listeners=PLAINTEXT://data-vm3:9092,CONTROLLER://data-vm3:9093
    advertised.listeners=PLAINTEXT://:9092
    controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
    log.dirs=/data/kafka/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • node.id:这将作为集群中的节点 ID,唯一标识,按照我们事先规划好的(上文),在不同的服务器上这个值不同。其实就是kafka2.0中的broker.id,只是在3.0版本中kafka实例不再只担任broker角色,也有可能是controller角色,所以改名叫做node节点。
    • process.roles:一个节点可以充当broker或controller或两者兼而有之。按照我们事先规划好的(上文),在不同的服务器上这个值不同。多个角色用逗号分开。
    • listeners: broker将使用9092端口,而kraft controller控制器将使用9093端口。
    • advertised.listeners: 这里指定kafka通过代理暴漏的地址,如果都是局域网使用,就配置PLAINTEXT://:9092即可。
    • controller.quorum.voters:这个配置用于指定controller主控选举的投票节点,所有process.roles包含controller角色的规划节点都要参与,即:zimug1、zimug2、zimug3。其配置格式为:node.id1@host1:9093,node.id2@host2:9093
    • log.dirs:kafka 将存储数据的日志目录,在准备工作中创建好的目录。

    所有kafka节点都要按照上文中的节点规划进行配置,完成config/kraft/server.properties配置文件的修改。

    四、格式化存储目录

    生成一个唯一的集群ID(在一台kafka服务器上执行一次即可),这一个步骤是在安装kafka2.0版本的时候不存在的。

    $ /opt/kafka_2.12-3.3.1/bin/kafka-storage.sh random-uuid
    SzIhECn-QbCLzIuNxk1A2A
    
    • 1
    • 2

    使用生成的集群ID+配置文件格式化存储目录log.dirs,

    所以这一步确认配置及路径确实存在,

    并且kafka用户有访问权限(检查准备工作是否做对)。

    每一台主机服务器都要执行命令:

    /opt/kafka_2.12-3.3.1/bin/kafka-storage.sh format \
    -t SzIhECn-QbCLzIuNxk1A2A \
    -c /opt/kafka_2.12-3.3.1/config/kraft/server.properties
    
    • 1
    • 2
    • 3

    格式化操作完成之后,log.dirs​目录下多出一个Meta.properties文件​,存储了当前的kafka节点的id(node.id),当前节点属于哪个集群(cluster.id)

    [root@data-vm2 ~]# ll /data/kafka/
    总用量 8
    -rw-r--r--. 1 root root 249 10月 11 18:23 bootstrap.checkpoint
    -rw-r--r--. 1 root root  86 10月 11 18:23 meta.properties
     
    $ cat /data/kafka/meta.properties
    #
    #Tue Apr 12 07:39:07 CST 2022
    node.id=1
    version=1
    cluster.id=SzIhECn-QbCLzIuNxk1A2A
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    五、 启动集群,完成基础测试

    zimug1 zimug2 zimug3是三台应用服务器的主机名称(参考上文中的角色规划),实现方式已经在本专栏《linux主机与ip解析》中进行了说明。将下面的命令集合保存为一个shell脚本,并赋予执行权限。执行该脚本即可启动kafka集群所有的节点,前提是:你已经按照本专栏的《集群各节点之间的ssh免密登录》安装方式做了集群各节点之间的ssh免密登录。

    启动命令:

    bin/kafka-server-start.sh \
    /opt/kafka_2.12-3.3.1/config/kraft/server.properties
     
    # 后台运行
    nohup bin/kafka-server-start.sh \
    /opt/kafka_2.12-3.3.1/config/kraft/server.properties 2>&1 &
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    脚本: 

    #!/bin/bash
    kafkaServers='data-vm1 data-vm2 data-vm3'
    #启动所有的kafka
    for kafka in $kafkaServers
    do
        ssh -T $kafka </dev/null 2>&1 &
    EOF
    echo 从节点 $kafka 启动kafka3.0...[ done ]
    sleep 5
    done
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    六、一键停止集群脚本

    一键停止kafka集群各节点的脚本,与启动脚本的使用方式及原理是一样的。

    停止命令:

    /opt/kafka_2.12-3.3.1/bin/kafka-server-stop.sh
    
    • 1

    执行脚本:

    #!/bin/bash
    kafkaServers='data-vm1 data-vm2 data-vm3'
    #停止所有的kafka
    for kafka in $kafkaServers
    do
        ssh -T $kafka <
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    七、测试Kafka集群

    7.1 创建topic

    [root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
    --create \
    --topic quickstart-events \
    --bootstrap-server data-vm4:9092
     
    Created topic quickstart-events.
    [root@data-vm1 kafka_2.12-3.3.1]#
     
    # 
    [root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
    --create \
    --topic quickstart-events \
    --bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    7.2 查看topic列表

    bin/kafka-topics.sh \
    --list \
    --bootstrap-server data-vm4:9092
     
    # 
    bin/kafka-topics.sh \
    --list \
    --bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092,data-vm4:9092
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    7.3 查看消息详情

    [root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
    --describe \
    --topic quickstart-events \
    --bootstrap-server data-vm3:9092
     
    Topic: quickstart-events        TopicId: zSOJC6wNRRGQ4MudfHLGvQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
            Topic: quickstart-events        Partition: 0    Leader: 1       Replicas: 1     Isr: 1
     
    [root@data-vm1 kafka_2.12-3.3.1]#
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    7.4 生产消息

    [root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-console-producer.sh \
    --topic quickstart-events \
    --bootstrap-server data-vm1:9092
     
    # 参考: 创建并配置topic
    bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --create \
    --topic my-topic \
    --partitions 1 \
    --replication-factor 1 \
    --config max.message.bytes=64000 \
    --config flush.messages=1
     
    # ------------------------- 参考 ------------------------ #
    # 1: 修改已创建topic配置
    # (Overrides can also be changed or set later using the alter configs command.)
    bin/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --entity-type topics \
    --entity-name my-topic \
    --alter \
    --add-config max.message.bytes=128000
     
    # 2: 检查已修改的topic配置是否生效
    # (To check overrides set on the topic you can do)
    bin/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --entity-type topics \
    --entity-name my-topic \
    --describe
     
    # 3. 恢复到原来的配置
    # (To remove an override you can do)
    bin/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --entity-type topics \
    --entity-name my-topic \
    --alter \
    --delete-config max.message.bytes
     
    # 4. 增加分区数
    # (To add partitions you can do)
    bin/kafka-topics.sh \
    --bootstrap-server broker_host:port \
    --alter \
    --topic my_topic_name \
    --partitions 40
     
    # 5. 添加配置
    # (To add configs:)
    bin/kafka-configs.sh \
    --bootstrap-server broker_host:port \
    --entity-type topics \
    --entity-name my_topic_name \
    --alter \
    --add-config x=y
     
    # 6. 移除配置
    # (To remove a config:)
    bin/kafka-configs.sh \
    --bootstrap-server broker_host:port \
    --entity-type topics \
    --entity-name my_topic_name \
    --alter \
    --delete-config x
     
    # 7. 删除topic
    # (And finally deleting a topic:)
    bin/kafka-topics.sh \
    --bootstrap-server broker_host:port \
    --delete \
    --topic my_topic_name
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    7.5 消费消息

    bin/kafka-console-consumer.sh \
    --topic quickstart-events \
    --from-beginning \
    --bootstrap-server data-vm4:9092
    
    • 1
    • 2
    • 3
    • 4

    7.6 查看消费者组

    # 检查消费者postition
    # Checking consumer position
    bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --describe \
    --group my-group
     
      TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
      my-topic                       0          2               4               2          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
      my-topic                       1          2               3               1          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
      my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    7.7 查看消费者组列表

    # list all consumer groups across all topics
    bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --list
     
      test-consumer-group
     
     
    # To view offsets, as mentioned earlier, 
    # we "describe" the consumer group like this:
    bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --describe \
    --group my-group
     
      TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
      topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
      topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
      topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
      topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
      topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
      topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4
     
    # 更多配置参考:
    # https://kafka.apache.org/32/documentation.html#uses
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
  • 相关阅读:
    高性能计算环境下的深度学习异构集群建设与优化实践
    简易node ts代码给json进行sort或者diff
    git clean 命令详解
    实战基于Docker部署NGINX应用网站
    如何通过CRM系统做好客户的分级分类
    Shortsighted(线段树维护2次函数)
    Django笔记十四之统计总数、最新纪录和空值判断等功能
    大龄程序员如何面对“中年危机”?这份书单或许能帮到你
    L1-028 判断素数
    【ELFK】之zookeeper
  • 原文地址:https://blog.csdn.net/zhangjunli/article/details/134036556