• KafkaQ - 好用的 Kafka Linux 命令行可视化工具


    软件效果前瞻 ~

    鉴于并没有在网上找到比较好的linux平台的kafka可视化工具,今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ

    虽然简陋,主要可以实现下面的这些功能:

    1)查看当前topic的分片数量和副本数量

    2)查看当前topic下面每个分片的最大offset

    3)查看当前topic某个分片下面指定offset范围的数据

    4)搜索当前topic指定关键词的message

    +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

    KafkaQ分为普通版本和搜索版本:

    * 普通版本支持上述3种查询

    * 搜索版本支持上述3种查询之外,增加关键词搜索,即在分片中搜索指定关键词的message

    一、普通版 KafkaQ.sh

    使用方法:

    1. Usage: KafkaQ.sh --topic [--partition] [--offset] [--limit<limit>]
    2. --topic 话题名称
    3. --partition 分片索引(可选)
    4. --offset 从第k个offset开始检索(可选)
    5. --limit 从第k个offset开始检索X条结果(可选)

    显示的效果如下,十分简洁,分片数据里面左边一列是消息入库的时间,右边是message内容:

    KafkaQ 源码如下:

    1. #!/bin/bash
    2. # 默认值
    3. PARTITION=${2:-0}
    4. OFFSET=${3:-0}
    5. LIMIT=${4:-0}
    6. # 检查参数
    7. if [ -z "$1" ]; then
    8. echo "Usage: $0 --topic [--partition] [--offset] [--limit]"
    9. exit 1
    10. fi
    11. TOPIC="$1"
    12. # 检查Kafka命令是否存在
    13. if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; then
    14. echo "Kafka not found at /usr/local/kafka/bin/"
    15. exit 1
    16. fi
    17. # 获取Topic信息
    18. echo -e "\033[0;31m* 话题: $TOPIC\033[0m"
    19. # 获取分区数和副本数
    20. PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
    21. PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
    22. REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')
    23. echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"
    24. # 获取分片a和分片b的最大偏移量
    25. MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC" | awk -F: '{ printf " 分片: %s,MaxOffset: %s\n", $2, $3 }')
    26. echo "$MAX_OFFSET"
    27. # 获取分片数据
    28. if [ "$LIMIT" -gt 0 ]; then
    29. echo -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"
    30. MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
    31. # 格式化输出消息
    32. echo "$MESSAGES" | awk -F'\t' 'BEGIN {
    33. print "* 分片数据:"
    34. }
    35. {
    36. if ($3 != "null") {
    37. timestamp = substr($1, 12) / 1000 # 从第10个字符开始提取时间戳,并除以1000以转换为秒级时间戳
    38. value = $3
    39. printf "\033[0;33m%s\033[0m %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value
    40. }
    41. }'
    42. fi

    二、搜索版 KafkaQ-Search.sh

    使用方法:

    1. Usage: KafkaQ-Search.sh --topic [--partition] [--offset] [--limit<limit>] [--search]
    2. --topic 话题名称
    3. --partition 分片索引(可选)
    4. --offset 从第k个offset开始检索(可选)
    5. --limit 从第k个offset开始检索X条结果(可选)
    6. --search 搜索字符串

    示例(所有参数是必选的哦):

    sh KafkaQ-Search.sh --topic log --partition 0 --offset 0 --limit 18480 --search '9fea9c52-c0fe-4429-81e1-d045f35f9be9'

    显示效果如下:

    KafkaQ-Search.sh 源码如下:

    1. #!/bin/bash
    2. # 默认值
    3. PARTITION=${2:-0}
    4. OFFSET=${3:-0}
    5. LIMIT=${4:-0}
    6. SEARCH=${5:-""}
    7. # 检查参数
    8. if [ -z "$1" ]; then
    9. echo "Usage: $0 --topic [--partition] [--offset] [--limit] [--search]"
    10. exit 1
    11. fi
    12. while [[ $# -gt 0 ]]; do
    13. case "$1" in
    14. --topic)
    15. TOPIC="$2"
    16. shift 2
    17. ;;
    18. --partition)
    19. PARTITION="$2"
    20. shift 2
    21. ;;
    22. --offset)
    23. OFFSET="$2"
    24. shift 2
    25. ;;
    26. --limit)
    27. LIMIT="$2"
    28. shift 2
    29. ;;
    30. --search)
    31. SEARCH="$2"
    32. shift 2
    33. ;;
    34. *)
    35. echo "Unknown parameter: $1"
    36. exit 1
    37. ;;
    38. esac
    39. done
    40. # 检查Kafka命令是否存在
    41. if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; then
    42. echo "Kafka not found at /usr/local/kafka/bin/"
    43. exit 1
    44. fi
    45. # 获取Topic信息
    46. echo -e "\033[0;31m* 话题: $TOPIC\033[0m"
    47. # 获取分区数和副本数
    48. PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
    49. PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
    50. REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')
    51. echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"
    52. # 获取分片a和分片b的最大偏移量
    53. MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC" | awk -F: '{ printf " 分片: %s,MaxOffset: %s\n", $2, $3 }')
    54. echo "$MAX_OFFSET"
    55. # 获取分片数据
    56. if [ "$LIMIT" -gt 0 ]; then
    57. echo -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"
    58. MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
    59. # 搜索关键词并输出结果
    60. if [[ ! -z $SEARCH ]]; then
    61. echo -e "\033[0;32m* 搜索条件:$SEARCH\033[0m"
    62. echo " 搜索结果:"
    63. echo "$MESSAGES" | grep --color=auto "$SEARCH" | awk -F'\t' '{
    64. timestamp = substr($1, 12) / 1000 # Extract timestamp starting from the 12th character and convert to seconds
    65. value = $3
    66. # Print with timestamp formatted and value highlighted in yellow (ANSI escape code 0;33m) if matches $SEARCH
    67. printf "\033[0;33m%s\033[0m ", strftime("%Y-%m-%d %H:%M:%S", timestamp)
    68. gsub("'"$SEARCH"'", "\033[0;33m&\033[0m", value)
    69. print value
    70. }'
    71. fi
    72. fi

     * (附注)参考的shell如下

    1、获取kafka的topic 分区数量

    /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic 

    2、获取kafka每个分片最大的offset

    /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic 

    3、获取kafka分片指定offset范围的具体信息

    /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic  --partition  --offset  --max-messages  --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

  • 相关阅读:
    C语言统计成绩
    Python毕业论文题目计算机毕业论文django微信小程序校园导航系统
    华为云云耀云服务器L实例评测| 之兼容性测试
    C语言画直方图
    HTML整站规划与规范
    K8S知识点(三)
    Redis 设置密码
    [C++](8)模板的初步了解
    1.4_28 Axure RP 9 for mac 高保真原型图 - 案例27【中继器 - 后台管理系统5】功能-弹窗修改数据
    【每日一题】744. 寻找比目标字母大的最小字母
  • 原文地址:https://blog.csdn.net/panguangyuu/article/details/139649048