• kafka数据存储目录间迁移


    生产环境kafka集群,在数据量大的情况下,经常会出现单机各个磁盘间的占用不均匀情况。

    原因探究

    这是因为kafka在之前的版本中只保证分区数在各个磁盘上均匀分布,但因无法统计每个分区实际大小,导致大概率出现某些分区数据量巨大导致磁盘利用率不均衡。

    在 kafka1.1 版本之前,用户对此基本没有什么优雅的处理方法,即便手动迁移日志文件和 offset 信息,也需要重启生效,风险极高。在 1.1 之前,kafka只支持分区数据在不同broker间的reassigment,而无法做到在同一个broker下的不同磁盘间做重新分配。而在1.1 版本后,kafka正式开始支持副本在不同路径间迁移,具体的实现细节可以看kafka官方wiki KIP-113

    目录间迁移步骤

    假设我在server.properties文件中配置了多个日志存储路径(表示日志数据存储在多块磁盘),如下所示:

    1. # A comma seperated list of directories under which to store log files
    2. log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs

    复制

    首先创建一个 9 分区的topic,并发送 1000W 条消息。查询这些数据目录,发现Kafka均匀地将 9 个分区分布在这三个路径上:

    1. > ll /data1/kafka-logs/ |grep test-topic
    2. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-3
    3. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-4
    4. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-5
    5. > ll /data2/kafka-logs/ |grep test-topic
    6. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-0
    7. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-1
    8. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-2
    9. > ll /data3/kafka-logs/ |grep test-topic
    10. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-6
    11. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-7
    12. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-8

    复制

    假设由于还有其他topic数据分布等原因,导致磁盘存储不均衡。需要将test-topic678分区全部迁移到/data2路径下,并且把test-topic1分区迁移到/data1下。若要实现这个需求,我们首先需要写一个JSON文件,migrate-replica.json:

    1. {
    2. "partitions": [
    3. {
    4. "topic": "test-topic",
    5. "partition": 1,
    6. "replicas": [
    7. 0
    8. ],
    9. "log_dirs": [
    10. "/data1/kafka-logs"
    11. ]
    12. },
    13. {
    14. "topic": "test-topic",
    15. "partition": 6,
    16. "replicas": [
    17. 0
    18. ],
    19. "log_dirs": [
    20. "/data2/kafka-logs"
    21. ]
    22. },
    23. {
    24. "topic": "test-topic",
    25. "partition": 7,
    26. "replicas": [
    27. 0
    28. ],
    29. "log_dirs": [
    30. "/data2/kafka-logs"
    31. ]
    32. },
    33. {
    34. "topic": "test-topic",
    35. "partition": 8,
    36. "replicas": [
    37. 0
    38. ],
    39. "log_dirs": [
    40. "/data2/kafka-logs"
    41. ]
    42. }
    43. ],
    44. "version": 1
    45. }

    复制

    其中,replicas中的 0 表示broker ID,由于本文只启动了一个broker,且broker.id = 0,故这里只写 0 即可。实际上你可以指定多个broker实现为多个broker同时迁移副本的功能。另外当前的version固定是 1。

    保存好这个JSON后,我们执行以下命令执行副本迁移:

    1. > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --bootstrap-server localhost:9092 --reassignment-json-file ../migrate-replica.json --execute
    2. Current partition replica assignment
    3. {"version":1,"partitions":[{"topic":"test-topic","partition":8,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":5,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":6,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":7,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":0,"replicas":[0],"log_dirs":["any"]}]}
    4. Save this to use as the --reassignment-json-file option during rollback
    5. Successfully started reassignment of partitions.

    复制

    迁移结果

    执行完成后,我们再次查看存储目录副本分布:

    1. > ll /data1/kafka-logs/ |grep test-topic
    2. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-1
    3. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-3
    4. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-4
    5. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-5
    6. > ll /data2/kafka-logs/ |grep test-topic
    7. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-0
    8. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-1
    9. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-2
    10. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-6
    11. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-7
    12. drwxr-xr-x 6 kafka staff 192 Dec 14 17:21 test-topic-8
    13. > ll /data3/kafka-logs/ |grep test-topic

    复制

    可以看到,678已经被成功地迁移到/data2下,而分区1也迁移到了/data1下。值得一提的是,不仅所有的日志段、索引文件被迁移,实际上分区外层的checkpoint文件也会被更新。比如我们检查/data2下的replication-offset-checkpoint文件可以发现,现在该文件已经包含了678分区的位移数据,如下所示:

    1. > cat replication-offset-checkpoint
    2. 0
    3. 7
    4. test-topic 8 1000000
    5. test-topic 2 1000000
    6. test 0 1285714
    7. test-topic 6 1000000
    8. test-topic 7 1000000
    9. test-topic 0 1000000
    10. test 2 1285714

    复制

  • 相关阅读:
    C++智能指针
    机器学习自学成才的十条戒律
    第七章 查找 七、红黑树
    网络协议常用面试题汇总(二)
    一些个人电脑用的小工具软件
    Spring Cloud 微服务项目实战 -
    Worthington弹性蛋白酶的应用和相关研究
    Excel 转 Json 、Node.js实现(应用场景:i18n国际化)
    sed的介绍及应用
    羧酸-COOH功能化修饰红色荧光聚苯乙烯AIE微球的产品组成和保存条件
  • 原文地址:https://blog.csdn.net/qq_32907195/article/details/126473068