Kafka - 08 Kafka Broker Workflow | Node Commissioning | Node Decommissioning


    1. Kafka Broker Workflow

    [figure: Kafka broker workflow diagram]

    How the data in ZooKeeper changes as Kafka brokers come online and go offline:


    [zk: localhost:2181(CONNECTED) 9] ls /
    [zookeeper, kafka_cluster]
    
    [zk: localhost:2181(CONNECTED) 10] ls /kafka_cluster
    [cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
    
    [zk: localhost:2181(CONNECTED) 11] ls /kafka_cluster/brokers/ids
    [0, 1, 2]
    
    [zk: localhost:2181(CONNECTED) 12] get /kafka_cluster/controller
    {"version":1,"brokerid":0,"timestamp":"1669535410717"}
    
    [zk: localhost:2181(CONNECTED) 13] get /kafka_cluster/brokers/topics/test2/partitions/0/state
    {"controller_epoch":9,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2,1]}
    
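Incidentally, the timestamp field in the controller znode is Unix epoch time in milliseconds. A quick local conversion of the value shown above (GNU date assumed):

```shell
# The controller znode records its election time as epoch milliseconds.
# Convert the value from the output above to a human-readable UTC time.
ts_ms=1669535410717
date -u -d "@$((ts_ms / 1000))" +"%Y-%m-%d %H:%M:%S UTC"
```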

    Stop the Kafka broker on kafka-01 (broker.id=0) and repeat the three checks above:


    [zk: localhost:2181(CONNECTED) 14] ls /kafka_cluster
    [cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
    
    [zk: localhost:2181(CONNECTED) 15] ls /kafka_cluster/brokers/ids
    [1, 2]
    
    [zk: localhost:2181(CONNECTED) 16] get /kafka_cluster/controller
    {"version":1,"brokerid":2,"timestamp":"1669542009693"}
    
    [zk: localhost:2181(CONNECTED) 17] get /kafka_cluster/brokers/topics/test2/partitions/0/state
    {"controller_epoch":9,"leader":2,"version":1,"leader_epoch":1,"isr":[2,1]}
    
    Broker 0 has disappeared from /brokers/ids, the controller role has failed over to broker 2 (note the new timestamp), and leadership of test2 partition 0 has moved to broker 2: leader_epoch is bumped to 1 and broker 0 has been dropped from the ISR.

    Restart the Kafka broker on kafka-01 and repeat the checks:


    [zk: localhost:2181(CONNECTED) 18] ls /kafka_cluster/brokers/ids
    [0, 1, 2]
    
    [zk: localhost:2181(CONNECTED) 19] get /kafka_cluster/controller
    {"version":1,"brokerid":2,"timestamp":"1669542009693"}
    
    [zk: localhost:2181(CONNECTED) 20] get /kafka_cluster/brokers/topics/test2/partitions/0/state
    {"controller_epoch":10,"leader":0,"version":1,"leader_epoch":2,"isr":[2,1,0]}
    
    After the restart, broker 0 re-registers under /brokers/ids and rejoins the ISR. The controller stays on broker 2 (same timestamp as before), while leadership of partition 0 has been handed back to broker 0, with leader_epoch now 2 and controller_epoch now 10.

    2. Commissioning a Kafka Node

    1. Add a new Kafka node

    ① Shut down the hadoop103 VM, then right-click → Manage → Clone to create a new node named hadoop104.

    ② Change hadoop104's IP address: vi /etc/sysconfig/network-scripts/ifcfg-ens33

    IPADDR=192.168.38.26
    

    ③ Change the hostname: vi /etc/hostname

    hadoop104
    

    ④ Map the hostnames to IP addresses: vi /etc/hosts

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    192.168.38.23 hadoop101
    192.168.38.24 hadoop102
    192.168.38.25 hadoop103
    192.168.38.26 hadoop104
    

    ⑤ Edit Kafka's server.properties, paying particular attention to these settings:

    broker.id=3
    listeners=PLAINTEXT://192.168.38.26:9092
    advertised.listeners=PLAINTEXT://192.168.38.26:9092
    # the existing 3-node ZooKeeper ensemble is sufficient
    zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster
    
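The edits in step ⑤ can also be scripted. A minimal sketch: the heredoc below stands in for the server.properties cloned from hadoop103 (its initial values are assumptions based on the steps above), and sed patches the broker id and the listener addresses:

```shell
# Stand-in for the server.properties cloned from hadoop103 (values assumed).
cat > server.properties <<'EOF'
broker.id=2
listeners=PLAINTEXT://192.168.38.25:9092
advertised.listeners=PLAINTEXT://192.168.38.25:9092
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster
EOF

# Give the clone a unique broker id and swap hadoop103's IP for hadoop104's.
sed -i 's/^broker.id=.*/broker.id=3/' server.properties
sed -i 's/192\.168\.38\.25/192.168.38.26/g' server.properties

# Show the patched settings.
grep -E '^(broker\.id|listeners|advertised\.listeners)=' server.properties
```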

    ⑥ In the /opt/kafka/kafka_2.12-2.2.1 installation directory, delete the logs directory cloned from hadoop103 and recreate it empty (the cloned data includes meta.properties with hadoop103's broker.id, which would conflict on startup):

    [root@hadoop104 kafka_2.12-2.2.1]# rm -rf logs/
    [root@hadoop104 kafka_2.12-2.2.1]# mkdir logs
    

    ⑦ Start Kafka on the hadoop104 node:

    [root@hadoop104 kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server.properties
    

    Note:

    Kafka now runs on four servers, but ZooKeeper is configured only on hadoop101, hadoop102, and hadoop103. After a reboot, start ZooKeeper only on those three servers; attempting to start it on hadoop104 fails with: Error contacting service. It is probably not running.

    2. Rebalance the existing partition data

    ① We created the topic test2 earlier; inspect its details:

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
    Topic:test2     PartitionCount:3   ReplicationFactor:3   Configs:segment.bytes=1073741824
    Topic: test2    Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 1,0,2
    Topic: test2    Partition: 1    Leader: 1       Replicas: 2,1,0 Isr: 1,0,2
    Topic: test2    Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    As shown, the existing test2 topic is still stored entirely on hadoop101, hadoop102, and hadoop103; none of it lives on the new hadoop104 node. Only newly created topics would be placed on hadoop104. How do we fix this?

    ② List the topics to rebalance. A cluster usually holds many topics, so you must specify which topics' data to redistribute:

    [root@hadoop101 kafka_2.12-2.2.1]# vi topics-to-move.json
    {
        "topics": [
            {"topic": "test2"}
        ],
        "version": 1
    }
    

    If there are multiple topics, add each of them to the topics list.
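For example, a file covering two topics could be generated like this ("test3" is a hypothetical second topic, just for illustration):

```shell
# Hypothetical example: include several topics in one rebalance
# ("test3" is a made-up second topic).
cat > topics-to-move.json <<'EOF'
{
    "topics": [
        {"topic": "test2"},
        {"topic": "test3"}
    ],
    "version": 1
}
EOF

# Sanity-check that the file is valid JSON.
python3 -m json.tool topics-to-move.json
```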

    ③ Generate a reassignment plan:

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
    Missing required argument "[zookeeper]"
    

    The command fails because the --zookeeper argument is missing; in this Kafka version (2.2.1) the reassignment tool still requires a ZooKeeper connection. Add the ZooKeeper cluster address and run it again:

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
    
    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
    

    Current partition replica assignment: how the replicas are distributed right now.

    Proposed partition reassignment configuration: the suggested new distribution; this JSON becomes the content of the reassignment plan file.
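Rather than copy-pasting the proposed JSON by hand in the next step, you can capture the --generate output and keep only its last line. A sketch, using a file that stands in for the output shown above so it stays self-contained:

```shell
# Stand-in for the captured --generate output (content from the run above).
cat > generate-output.txt <<'EOF'
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
EOF

# The proposed assignment is the last line; save it as the plan file.
tail -n 1 generate-output.txt > increase-replication-factor.json
python3 -m json.tool increase-replication-factor.json > /dev/null && echo "plan saved"
```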

    ④ Save the proposed plan as the reassignment file (replicas spread across broker 0, 1, 2, and 3):

    [root@hadoop101 kafka_2.12-2.2.1]# vi increase-replication-factor.json
    {"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
    

    ⑤ Execute the reassignment plan:

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --execute
    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.
    

    ⑥ Verify the reassignment:

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster  --reassignment-json-file increase-replication-factor.json --verify
    Status of partition reassignment:
    Reassignment of partition test2-2 completed successfully
    Reassignment of partition test2-1 completed successfully
    Reassignment of partition test2-0 completed successfully
    
    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
    Topic:test2  PartitionCount:3   ReplicationFactor:3    Configs:segment.bytes=1073741824
    Topic: test2    Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 1,0,2
    Topic: test2    Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
    Topic: test2    Partition: 2    Leader: 2       Replicas: 2,3,0 Isr: 0,2,3
    

    3. Decommissioning a Kafka Node

    The cluster now has four nodes: hadoop101, hadoop102, hadoop103, and hadoop104. To retire hadoop104 we cannot simply shut it down, because part of the test2 topic's data already lives on it. So how do we decommission it?

    First generate a reassignment plan that excludes the node being retired, then run the same rebalancing steps used when commissioning.
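Before executing a decommission plan, it is worth checking locally that the plan really no longer references the departing broker. A minimal sketch (the heredoc reproduces the plan file used in the following steps; broker 3 is the one leaving):

```shell
# Plan file content (same as the proposed assignment used in the steps below).
cat > increase-replication-factor.json <<'EOF'
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
EOF

# Fail loudly if any partition still keeps a replica on broker 3.
python3 - <<'EOF'
import json
plan = json.load(open("increase-replication-factor.json"))
leftovers = [p for p in plan["partitions"] if 3 in p["replicas"]]
assert not leftovers, f"broker 3 still holds replicas: {leftovers}"
print("plan is clear of broker 3")
EOF
```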

    ① List the topics to rebalance:

    [root@hadoop101 kafka_2.12-2.2.1]# vi topics-to-move.json
    {
        "topics": [
            {"topic": "test2"}
        ],
        "version": 1
    }
    

    ② Generate the plan. When commissioning we passed --broker-list "0,1,2,3"; for decommissioning, simply pass --broker-list "0,1,2", dropping the 3, so that no test2 data remains on hadoop104 (broker.id=3).

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
    
    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
    

    ③ Save the proposed plan as the reassignment file (all replicas on broker 0, 1, and 2):

    [root@hadoop101 kafka_2.12-2.2.1]# vi increase-replication-factor.json
    {"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
    

    ④ Execute the reassignment plan:

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092  --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --execute
    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.
    

    ⑤ Verify the reassignment:

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092  --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --verify
    Status of partition reassignment:
    Reassignment of partition test2-2 completed successfully
    Reassignment of partition test2-1 completed successfully
    Reassignment of partition test2-0 completed successfully
    
    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
    Topic:test2  PartitionCount:3   ReplicationFactor:3     Configs:segment.bytes=1073741824
    Topic: test2    Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 1,2,0
    Topic: test2    Partition: 1    Leader: 1       Replicas: 0,2,1 Isr: 1,2,0
    Topic: test2    Partition: 2    Leader: 2       Replicas: 1,0,2 Isr: 2,1,0
    

    ⑥ Finally, run the stop command on hadoop104 to retire the node:

    [root@hadoop104 kafka_2.12-2.2.1]# bin/kafka-server-stop.sh
    
  • Source: https://blog.csdn.net/qq_42764468/article/details/128071108