• Docker中Kafka容器创建/更新Topic支持多分区


    前提

    自行通过docker部署好kafka,并启动相关容器。
    假设Topic为http_capture。

    #docker-kafka
    kafka_dir=/opt/docker/kafka/build
    sudo rm -rf ${kafka_dir}/*
    cat > ${kafka_dir}/docker-compose.yml <<EOF
    version: "3.3"
    services:
      zookeeper:
        image: zookeeper:3.5.5
        restart: always
        container_name: dsms_zookeeper
        ports:
          - "2181:2181"
        environment:
          - ZOO_MY_ID=1
    
      kafka:
        image: wurstmeister/kafka:2.13-2.8.1
        restart: always
        container_name: dsms_kafka
        environment:
          - KAFKA_BROKER_ID=1
          - KAFKA_ADVERTISED_HOST_NAME=${local_ip}
          - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
          - KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS=36000
          - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${local_ip}:9092
          - KAFKA_LISTENERS=PLAINTEXT://:9092
        ports:
          - "9092:9092"
        expose:
          - "9092"
        depends_on:
          - zookeeper
    EOF
    cd ${kafka_dir} && sudo /opt/bin/docker-compose up -d
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    Docker中Kafka容器创建/更新Topic支持多分区

    检测Kafka运行正常后,如果Topic为http_capture的主题存在,则更新分区为5个,若不存在Topic,则新建。

    #!/bin/bash
    
    # 检查 Kafka 容器是否正常运行
    while ! docker ps --format '{{.Names}}' | grep -q "^dsms_kafka$"; do
        echo "等待 Kafka 容器启动..."
        sleep 5
    done
    
    echo "Kafka 容器已成功启动."
    
    # 在 Kafka 容器内执行命令,将结果保存到临时文件中
    docker exec dsms_kafka kafka-topics.sh --list --zookeeper zookeeper:2181 > /tmp/kafka_topics_list.txt
    
    # 检查 http_capture 主题是否存在
    if grep -q "^http_capture$" /tmp/kafka_topics_list.txt; then
        echo "更新 http_capture 主题分区数量..."
        docker exec dsms_kafka kafka-topics.sh --alter --topic http_capture --partitions 5 --zookeeper zookeeper:2181
    else
        echo "创建新的 http_capture 主题..."
        docker exec dsms_kafka kafka-topics.sh --create --topic http_capture --partitions 5 --replication-factor 1 --zookeeper zookeeper:2181
        echo "新的 http_capture 主题创建成功."
    fi
    
    # 删除临时文件
    rm -f /tmp/kafka_topics_list.txt
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
  • 相关阅读:
    如何将 Helm Chart 推送至 Harbor ?
    NodeJS @kubernetes/client-node连接到kubernetes集群的方法
    lazada、shopee、速卖通、eBay、亚马逊提升排名的关键:掌握自养号测评技巧
    力扣HOT100 - 200. 岛屿数量
    在同一台机器上部署OGG并测试
    基础算法练习200题06、分练习本
    后App时代的小程序化技术?
    19 视图定义 union 是根据第一个 select 字段列表顺序,来进行 merge 的
    minikube基本使用方法
    自动驾驶杂谈
  • 原文地址:https://blog.csdn.net/u011019141/article/details/137928665