• Replacing Kafka brokers after five years in service


    Production requirement:
    The machines in the production Kafka cluster have been in service for five years (or longer) and have reached end of life, so the servers must be replaced: every broker that has run for more than five years is to be decommissioned.
    Option 1: use the kafka-reassign-partitions.sh reassignment command directly. Its drawback is that reassigning topics can throw exceptions on the producer and consumer side, so it is not adopted.
    Option 2: first expand the replicas (the cluster was already expanded earlier, so every topic on the machines being decommissioned can be migrated onto the added brokers), then take the old machines offline, then shrink the replicas back. This is transparent to producers and consumers, so Option 2 is adopted.

    This article walks through the Option 2 strategy in detail:

    topic-lists.txt : lists the topics to migrate. Two modes are supported:
    1. The value all processes every topic in the cluster; the script still checks each topic and skips migration when none of its replicas sits on an offline node.
    2. One topic name per line, to migrate only the listed topics.
    topic-replica-change.sh : the main script. It takes three parameters:
    Parameter 1: either Add or Del. Add expands the existing replica set; the number of replicas added per topic equals the largest number of replicas that any single partition of that topic has on the offline nodes. Del removes every replica whose broker id belongs to an offline node.
    Parameter 2: the broker ids of all nodes staying online, comma-separated.
    Parameter 3: the broker ids of the nodes going offline, comma-separated.
    /bin/bash topic-replica-change.sh Add 0,1,2 3 #expansion call
    /bin/bash topic-replica-change.sh Del 0,1,2 3 #shrink call
    Files produced by an expansion run:
    <topic>.txt : the topic's describe output before expansion
    <topic>-to-move-Add.json : the reassignment plan generated for the expansion
    move-result.txt : the result of the replica reassignment
    Files produced by a shrink run:
    [Note] To avoid leader switches disturbing the business during migration, always take the nodes offline first and only then run the shrink.
    <topic>.txt : the topic's describe output before the shrink
    <topic>-to-move-Del.json : the reassignment plan generated for the shrink
    move-result.txt : the result of the replica reassignment
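
    For orientation, here is roughly what a generated plan looks like for a hypothetical topic demo with one partition on brokers 1,2,3, broker 3 going offline and broker 0 picked as the new replica (this follows the {"version":1,...} JSON format the Python helpers below emit):

    $ cat demo-to-move-Add.json
    {"version":1,"partitions":[{"topic": "demo", "partition": 0,"replicas": [1,2,3,0]}]}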

    topic-lists.txt
    1. The value all processes every topic in the cluster; the script still checks each topic and skips migration when none of its replicas sits on an offline node.
    2. One topic name per line, to migrate only the listed topics.

    all
    
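    In the per-topic mode the file would instead contain one topic name per line, e.g. (hypothetical names):

    test1
    test2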

    topic-replica-change.sh

    #!/bin/bash
    #bash is required: the script uses [[ ]] and process substitution
    SHELL_NAME=$(echo $0)
    SHELLDIR=$(cd $(dirname $0);echo $(pwd);cd - >/dev/null)
    TOPIC_FILE=${SHELLDIR}/topic-lists.txt
    ZK_URL="localhost:2181/kafka"
    KAFKA_DIR="/home/software/kafka_2.11-0.11.0.0/bin"
    TOPIC_LIST="${KAFKA_DIR}/kafka-topics.sh --zookeeper ${ZK_URL} --list"
    TOPIC_DESCRIBE="${KAFKA_DIR}/kafka-topics.sh --zookeeper ${ZK_URL} --describe --topic "
    REASSIGN_EXECUTE="${KAFKA_DIR}/kafka-reassign-partitions.sh --zookeeper ${ZK_URL} --execute --reassignment-json-file "
    REASSIGN_VERIFY="${KAFKA_DIR}/kafka-reassign-partitions.sh --zookeeper ${ZK_URL} --verify --reassignment-json-file "
    OPERATION=$1
    ONLINE_BROKER=$2
    OFFLINE_BROKER=$3
    echo $SHELL_NAME
    echo $SHELLDIR
    echo $ONLINE_BROKER
    echo $OFFLINE_BROKER
    
    usage() {
      	cat << EOF
    	Usage: /bin/bash ${SHELL_NAME} Add|Del online_broker_ids offline_broker_ids
    	Examples:
    		expand replicas: /bin/bash topic-replica-change.sh Add 10000,10001 10002
    		shrink replicas: /bin/bash topic-replica-change.sh Del 10000,10001 10002
    		Parameter 1: Add or Del
    			Add expands the existing replica set; the number of replicas added equals the largest number of replicas any single partition has on the offline nodes;
    			Del removes every replica whose broker id belongs to an offline node.
    		Parameter 2: broker ids of all nodes staying online, comma-separated
    		Parameter 3: broker ids of the nodes going offline, comma-separated
    EOF
    }
    
    verify_plan(){
        local topic=$1
        local move_res
        move_res=`${REASSIGN_VERIFY}${topic}"-to-move-${OPERATION}.json" | grep "in progress" | wc -l`
    	while [[ ${move_res} -ne 0 ]] ;do
    		sleep 5s
    		echo -e " ${topic} reassignment is still in progress, please wait!\n   ${move_res} partitions are still in progress\n"
    		move_res=`${REASSIGN_VERIFY}${topic}"-to-move-${OPERATION}.json" | grep "in progress" | wc -l`
    	done
    }
    
    execute_plan(){
        local topic=$1
        echo "start execute plan of ${topic}-to-move-${OPERATION}.json"
        ${REASSIGN_EXECUTE}${topic}"-to-move-${OPERATION}.json"
        if [ $? -eq 0 ]; then
            echo "success execute plan of $topic"
            verify_plan $topic
    		echo "${topic} move success,operation is ${OPERATION}" | tee -a move-result.txt
        else
            echo "execute plan of $topic failed"
        fi    
    }
    
    #get topic lists
    get_topic_lists(){
        topic_lists=`${TOPIC_LIST}`
        for topic in $topic_lists 
        do
	    #skip the internal __consumer_offsets topic, which stores consumer offsets
        if [ $topic != "__consumer_offsets" ]; then
		    #-e enables escape sequences; \n prints a newline
	        echo -e "\n${topic}"
                process_topic $topic
            fi
        done
    }
    
    
    process_topic(){
        local countNoLeader=0 
        local countReplicasIsrNoEqual=0 
        local countLteOneBak=0   
        local topic=$1
    echo "Begin checking status of topic=${topic}:"
    line="${TOPIC_DESCRIBE}${topic}"
    countNoLeader=`${line} | egrep "\-1|none"  |wc -l`
    echo "${topic}: partitions without a leader = ${countNoLeader}"
    countLteOneBak=`${line}| grep "ReplicationFactor" |  awk '{match($0,/ReplicationFactor:([^[:blank:]]*)/,arr);print arr[1]}'`
    echo "${topic}: replication factor = ${countLteOneBak}"
    num=`${line} | grep -o "Topic" | wc -l`
    echo "${topic}: partition count = ${num}"
    NUMOFFLINE=`echo ${OFFLINE_BROKER}|grep -o ","|wc -l`
    echo $(${line}|grep "Leader:"| awk '{match($0,/Replicas: ([^[:blank:]]*).*Isr: ([^[:blank:]]*)/,arr);print arr[1],arr[2]}')
        
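    # Compare the comma count of the Replicas list against that of the Isr list:
    # for Add every partition must be fully in sync (Replicas == Isr);
    # for Del the Replicas list must exceed the Isr list by exactly the number
    # of offline brokers (NUMOFFLINE commas, i.e. NUMOFFLINE+1 ids).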
        while read -r LINE ;do 
        [[ "${OPERATION}" == "Add" ]] && [ $(echo ${LINE} | awk '{print $1}' | grep -o "," | wc -l) -ne $(echo ${LINE} | awk '{print $2}' |grep -o ","|wc -l) ] && ((countReplicasIsrNoEqual++));
        [[ "${OPERATION}" == "Del" ]] && [ $(echo ${LINE} | awk '{print $1}' | grep -o "," | wc -l)  -ne $(($(echo ${LINE} | awk '{print $2}' |grep -o ","|wc -l) + ${NUMOFFLINE} + 1 )) ] && ((countReplicasIsrNoEqual++));
        done < <( ${line}|grep "Leader:"| awk '{match($0,/Replicas: ([^[:blank:]]*).*Isr: ([^[:blank:]]*)/,arr);print arr[1],arr[2]}' )
    if [ "${countNoLeader}" -ne 0 ] ||  [[ "${countReplicasIsrNoEqual}" -ne 0 ]] || [ "${countLteOneBak:-0}" -eq 1 ] || [ "${num}" -lt 1 ] ; then
        echo "Topic ${topic} has an abnormal status, skipping!"
    else
        echo "start process $topic"
        rm -rf "${topic}.txt"
        rm -rf "${topic}-to-move-${OPERATION}.json"
        #flatten the describe output onto one line for the Python parser
        echo $(${line}) | tee -a "${topic}.txt"
        generate_plan ${topic}
    fi
    }
    
    generate_plan(){
        #local variable scoped to this function
        local topic=$1
        echo "start generate plan of $topic"
    	#invoke the python helper that writes the reassignment plan
        python ${SHELLDIR}/topicReplica${OPERATION}.py $topic $ONLINE_BROKER $OFFLINE_BROKER
        echo ${SHELLDIR}/topicReplica${OPERATION}.py 
       if [ ! -f "${topic}-to-move-${OPERATION}.json" ]; then
            echo "generate plan of $topic failed"
        else
            echo "success generate plan of $topic,plan file is ${topic}-to-move-${OPERATION}.json"
            execute_plan $topic
        fi
    }
    
    function doWork(){
        #iterate over all topics listed in the file
    	while read -r LINE ;do 
    		echo  "topic=${LINE}"
    		process_topic ${LINE} 
    	done < $TOPIC_FILE
    }
    
    main(){
        #-z : true if ONLINE_BROKER is empty
    	#-s "$TOPIC_FILE" : true if the file exists and is not empty
    	if [ -z "${ONLINE_BROKER}" ] || [ ! -s "$TOPIC_FILE" ];then
    		echo "Please check params or $TOPIC_FILE "
            usage
            exit 1
    	elif [ $(grep -c "^all$" $TOPIC_FILE) -gt 0 ] ;then
    	    echo "start process all topic"
            get_topic_lists
    	else
    		case ${OPERATION} in
    			-h)
    				usage
    				exit 0
    				;;
    			Add|Del)
    				doWork
    				exit 0
    				;;
    			*)
    				echo -e  "input error\n"
    				usage
    				exit 1
    				;;
    		esac
    	fi
    }
    main
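
    After a successful run, move-result.txt accumulates one line per migrated topic, in the exact format of the tee call in execute_plan; for a hypothetical topic test1, an Add run appends:

    $ cat move-result.txt
    test1 move success,operation is Add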
    
    

    topicReplicaAdd.py

    # coding:utf-8
    import sys
    import customlog as logger
    import json
    from random import randint
    
    
    def get_str_btw(s, f, b):
        par = s.partition(f)
        return (par[2].partition(b))[0][:]
    
    
    def read_topic_info(topic):
        path = "./" + topic + ".txt"
        with open(path, 'r') as f:
            info = f.readline()
        print("info: " + info)
        # the describe output was flattened onto one line by the shell script;
        # split it into per-partition chunks and map partition -> replica list
        partition_infos = info.split("Configs:")[1].split("Topic")
        topic_dic = {}
        for line in partition_infos:
            partition = get_str_btw(line, "Partition:", "Leader:").strip()
            replicas = get_str_btw(line, "Replicas:", "Isr:").strip().split(",")
            if partition != '':
                topic_dic[partition] = replicas
        return topic_dic
    
    
    def get_max_replica(offline_broker, curr_relica):
        # count how many of this partition's replicas sit on offline brokers
        replica = [ i for i in offline_broker if i in curr_relica]
        return len(replica)
    
    
    def get_usefull_broker(b1, b2):
        return [i for i in b1 if i not in b2]
    
    
    def add_new_replicas(b1, b2, replica_str, num):
        # pick num random brokers from b1 that are not already replicas (b2)
        usefull_online = get_usefull_broker(b1, b2)
        if len(usefull_online) < num:
            logger.error("not enough spare brokers to add %d replicas" % num)
            sys.exit(1)
        used_lists = []
        for k in range(0, num):
            while True:
                random_index = randint(0, len(usefull_online)-1)
                if random_index not in used_lists:
                    replica_str = replica_str + "," + usefull_online[random_index]
                    used_lists.append(random_index)
                    break
        return replica_str
    
    
    def write_to_json(filepath,txt):
        with open(filepath,"w") as f:
            f.write(txt)
    
    
    def get_new_replicas(topic_name, partition_replica, new_replica_num, online_broker, offline_broker):
        pars = len(partition_replica)
        js_str = """{"version":1,"partitions":["""
        for x in range(0, pars):
            replica_info = partition_replica[str(x)]
            print(replica_info)
            # off_num: how many of this partition's replicas sit on offline nodes.
            # Every partition gains new_replica_num replicas in total; partitions
            # with fewer offline replicas get the remainder placed on offline
            # nodes, so that after the Del step all partitions end up with the
            # same replication factor again.
            off_num = get_max_replica(offline_broker, replica_info)
            on_num = new_replica_num - off_num
            replica_news = ','.join(replica_info)
            if off_num != 0:
                replica_news = add_new_replicas(online_broker, replica_info, replica_news, off_num)
            if on_num != 0:
                replica_news = add_new_replicas(offline_broker, replica_info, replica_news, on_num)
            partition_str = """{"topic": "%s", "partition": %s,"replicas": [%s]}""" % (topic_name, x, replica_news)
            if x == pars -1:
                js_str = js_str + partition_str+"]}"
            else:
                js_str = js_str + partition_str+","
        write_to_json("./"+topic_name+"-to-move-Add.json",js_str)
    
    
    if __name__ == '__main__':
        topic = sys.argv[1]
        print("topic: " + topic)
        onlineBroker = sys.argv[2].split(",")
        offlineBroker = sys.argv[3].split(",")
        res = read_topic_info(topic)
        print("============================")
        partitions = len(res)
        # replicas to add = the maximum number of offline-node replicas
        # held by any single partition of this topic
        max_replica = 0
        for i in range(0, partitions):
            max_middle = get_max_replica(offlineBroker, res[str(i)])
            if max_replica < max_middle:
                max_replica = max_middle
        print("####################")
        print(max_replica)
        if max_replica == 0:
            logger.info("no replica of topic %s sits on an offline node, nothing to migrate!" % topic)
            sys.exit(1)
        get_new_replicas(topic, res, max_replica, onlineBroker, offlineBroker)
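
    The script is normally driven by topic-replica-change.sh, but it can be run by hand as a sanity check; this sketch assumes a hypothetical topic test1 whose flattened describe output is already saved to ./test1.txt:

    # args: topic, online broker ids, offline broker ids
    python topicReplicaAdd.py test1 0,1,2 3
    # on success this writes ./test1-to-move-Add.json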
    

    topicReplicaDel.py

    # coding:utf-8
    import sys
    import customlog as logger
    import json
    from random import randint
    
    
    def get_str_btw(s, f, b):
        par = s.partition(f)
        return (par[2].partition(b))[0][:]
    
    
    def write_to_json(filepath,txt):
        with open(filepath,"w") as f:
            f.write(txt)
    
    
    def read_topic_info(topic):
        path = "./" + topic + ".txt"
        with open(path, 'r') as f:
            info = f.readline()
        if len(info) == 0:
            logger.info("the info file for topic %s is empty, please investigate!" % topic)
            sys.exit(1)
        partition_infos = info.split("Configs:")[1].split("Topic")
        topic_dic = {}
        for line in partition_infos:
            partition = get_str_btw(line, "Partition:", "Leader:").strip()
            replicas = get_str_btw(line, "Replicas:", "Isr:").strip().split(",")
            if partition != '':
                topic_dic[partition] = replicas
        return topic_dic
    
    
    def get_new_replicas(topic_name, partition_replica, offline_broker):
        pars = len(partition_replica)
        js_str = """{"version":1,"partitions":["""
        flag = "false"
        for i in range(0, pars):
            replica_info = partition_replica[str(i)]
            new_replica = []
            for broker in replica_info:
                if broker not in offline_broker:
                    new_replica.append(broker)
                else:
                    # at least one replica of this topic sits on an offline node
                    flag = "true"
            partition_str = """{"topic": "%s", "partition": %s,"replicas": [%s]}""" % (topic_name, i, ','.join(new_replica))
            if i == pars -1:
                js_str = js_str + partition_str+"]}"
            else:
                js_str = js_str + partition_str+","
        if flag == "false":
            logger.warn("no replica of topic %s sits on an offline node, nothing to migrate!" % topic_name)
            sys.exit(1)
        write_to_json("./"+topic_name+"-to-move-Del.json", js_str)
    
    
    if __name__ == "__main__":
        topic = sys.argv[1]
        # argv[2] (online broker ids) is accepted for interface symmetry but unused
        offlineBroker = sys.argv[3].split(",")
        res = read_topic_info(topic)
        get_new_replicas(topic, res, offlineBroker)
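
    As with the Add helper, this is normally invoked by the main script; run by hand (hypothetical topic test1, brokers 0,1,2 staying, broker 3 leaving) it behaves like this:

    python topicReplicaDel.py test1 0,1,2 3
    # a partition listed as Replicas: 0,1,3 is planned as "replicas": [0,1]
    # and the plan is written to ./test1-to-move-Del.json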
    

    customlog.py
    This module writes log output to the installlog directory, located one level above the script's own directory.

    import os
    import datetime
    
    # abspath guards against an empty dirname when the script is run from its own directory
    log_path = '%s/../installlog' % os.path.dirname(os.path.abspath(__file__))
    if not os.path.exists(log_path):
        os.mkdir(log_path)
    log_file = '%s/install.log' % log_path
    
    def info(msg):
        log(msg, 'INFO')
    
    def warn(msg):
        log(msg, 'WARN')
    
    def error(msg):
        log(msg, 'ERROR')
    
    def log(msg, level):
        log_msg = '%s %s: %s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), level, msg)
        print(log_msg)
    
        with open(log_file, 'a') as f:
            f.write('%s\n' % log_msg)
    

    Test walkthrough

    1. Observe the partition and replica distribution of all topics before running the expansion script.
    2. Run the expansion script: /bin/bash topic-replica-change.sh Add 0,1,2 3
    3. Observe the distribution of all topics after the expansion script.
    4. Take the machine with brokerId=3 offline.
    5. Observe the distribution of all topics after the machine goes offline.
    6. Run the shrink script: /bin/bash topic-replica-change.sh Del 0,1,2 3
    7. Observe the distribution of all topics after the shrink script.
    The procedure was also load-tested: with normal write throughput running, it did not affect cluster stability.
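
    Each "observe" step above can be done with the same describe command the scripts use internally (same ZooKeeper address assumed):

    # list every topic's partitions, replicas and ISR
    /home/software/kafka_2.11-0.11.0.0/bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe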

    Utility script:

    If a test environment holds many topics (dozens or hundreds), the following script can restore every topic that has more than 3 replicas back to 3 replicas.
    ReducedReplication2three.sh

    #!/bin/bash
    
    shopt -s expand_aliases
    source /etc/profile
    SHELLDIR=$(cd $(dirname $0);echo $(pwd);cd - >/dev/null)
    ZKURL="localhost:2181/kafka"
    TOPICSDESCRIBE="kafka-topics.sh --zookeeper ${ZKURL} --describe --topic"
    LOGFILE="${SHELLDIR}/topics-to-move.json"
    alias log='tee -a ${LOGFILE}'
    kafka-topics.sh --zookeeper ${ZKURL} --describe |grep "ReplicationFactor"|awk -F '[:\t]' '{if($6>3) print $2}' >${SHELLDIR}/kafka_list.txt
    
    echo '{"version":1,"partitions": ['>${SHELLDIR}/topics-to-move.json
    
    for replicas in `cat ${SHELLDIR}/kafka_list.txt`
    do
    echo ${replicas}
    ( ${TOPICSDESCRIBE} ${replicas} | grep -v "^Topic" | awk '{match($0,/Topic: ([^[:blank:]]*).*Partition: ([^ ]*).*Leader: ([^ ]*).*Replicas: ([^ ]*).*Isr: ([^[:blank:]]*)/,arr); print "{\"topic\": \""arr[1]"\", \"partition\": "int(arr[2])", \"replicas\": ["substr(arr[5],1,5)"]},"}' ) |log
    done
    
    
    sed -i ':label;N;s/\n//;b label' ${SHELLDIR}/topics-to-move.json
    sed -i -e '$s/,$/]}/' ${SHELLDIR}/topics-to-move.json
    sh /home/software/kafka_2.11-0.11.0.0/bin/kafka-reassign-partitions.sh --zookeeper ${ZKURL} --execute --reassignment-json-file ${SHELLDIR}/topics-to-move.json
    

    Notes: 1. Replace /home/software/kafka_2.11-0.11.0.0/bin/ throughout with your own Kafka bin directory.
    2. Adjust substr(arr[5],1,5) to the length of your broker ids: it cuts the first three broker ids out of the Isr list, and for single-digit ids such as 0,1,2 those occupy exactly 5 characters.
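
    To confirm the shrink completed, the same verify call the main script uses can be run against the generated plan (from the script's directory):

    # report per-partition reassignment status for the generated plan
    sh /home/software/kafka_2.11-0.11.0.0/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --verify --reassignment-json-file topics-to-move.json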

  • Original article: https://blog.csdn.net/weixin_44468025/article/details/126467251