自行通过docker部署好kafka,并启动相关容器。
假设Topic为http_capture。
#docker-kafka
kafka_dir=/opt/docker/kafka/build
sudo rm -rf ${kafka_dir}/*
cat > ${kafka_dir}/docker-compose.yml <<EOF
version: "3.3"
services:
zookeeper:
image: zookeeper:3.5.5
restart: always
container_name: dsms_zookeeper
ports:
- "2181:2181"
environment:
- ZOO_MY_ID=1
kafka:
image: wurstmeister/kafka:2.13-2.8.1
restart: always
container_name: dsms_kafka
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ADVERTISED_HOST_NAME=${local_ip}
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS=36000
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${local_ip}:9092
- KAFKA_LISTENERS=PLAINTEXT://:9092
ports:
- "9092:9092"
expose:
- "9092"
depends_on:
- zookeeper
EOF
cd ${kafka_dir} && sudo /opt/bin/docker-compose up -d
检测Kafka运行正常后,如果Topic为http_capture的主题存在,则更新分区为5个,若不存在Topic,则新建。
#!/bin/bash
# 检查 Kafka 容器是否正常运行
while ! docker ps --format '{{.Names}}' | grep -q "^dsms_kafka$"; do
echo "等待 Kafka 容器启动..."
sleep 5
done
echo "Kafka 容器已成功启动."
# 在 Kafka 容器内执行命令,将结果保存到临时文件中
docker exec dsms_kafka kafka-topics.sh --list --zookeeper zookeeper:2181 > /tmp/kafka_topics_list.txt
# 检查 http_capture 主题是否存在
if grep -q "^http_capture$" /tmp/kafka_topics_list.txt; then
echo "更新 http_capture 主题分区数量..."
docker exec dsms_kafka kafka-topics.sh --alter --topic http_capture --partitions 5 --zookeeper zookeeper:2181
else
echo "创建新的 http_capture 主题..."
docker exec dsms_kafka kafka-topics.sh --create --topic http_capture --partitions 5 --replication-factor 1 --zookeeper zookeeper:2181
echo "新的 http_capture 主题创建成功."
fi
# 删除临时文件
rm -f /tmp/kafka_topics_list.txt