• 【kafka】kafka命令大全


    概述

    本文将分享一些kafka经常使用的一些命令,不断完善中。

    管理

    创建主题,3个分区,2个副本

    对使用了zookeeper的kafka
    kafka-topics.sh --create --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --replication-factor 2 --partitions 3 --topic test2
    Created topic test2.
    
    对kafka版本 >= 2.2,可以将--zookeeper替换成--bootstrap-server参数即可,如下:后续的其他命令类似。
    kafka-topics.sh --create --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --replication-factor 2 --partitions 3 --topic test2
    
    分区扩容(注意只能扩,不能缩,我测试时至少是这样的)
    kafka版本 < 2.2
    kafka-topics.sh --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --alter --topic test2 --partitions 4
    
    kafka版本 >= 2.2
    kafka-topics.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --alter --topic test2 --partitions 3
    
    删除某一个topic
    kafka-topics.sh --delete  --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    注意,在实际生产使用过程中,–zookeeper或者–bootstrap-server后面的参数值最好写完整,避免因某个节点异常而不能正常使用,后面同是,但是使用bootstrap-server时,成功执行时不会返回结果。其次建议使用–bootstrap-server,–zookeeper后续会成过去式

    查询

    查询集群描述,结果中包括所有的topic分区主从情况
    kafka-topics.sh --describe --zookeeper  192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181
    
    topic列表查询,结果中只有topic的名称
    kafka-topics.sh --list --zookeeper  192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181
    
    topic列表查询(支持0.9版本+)
    kafka-topics.sh --list  --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092
    
    消费者列表查询(存储在zk中的)
    kafka-consumer-groups.sh --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --list
    
    消费者列表查询(支持0.9版本+)
    kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --list
    
    新消费者列表查询(支持0.10版本+)
    kafka-consumer-groups.sh    --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092  --list
    connect-local-file-sink
    
    显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
    kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --describe --group group1
    显示某个消费组的消费详情(0.10.1.0版本+)
    kafka-consumer-groups.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --describe --group group1
    
    kafka-consumer-groups.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092  --delete --group test-consumer-group
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    修改删除topic

    kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
    #可以执行以下命令验证结果
    kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --describe
    Copy
    #移除:
    kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    生产和消费

    生产者发送数据,注意参数是--broker-list
    kafka-console-producer.sh --broker-list 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2
    
    消费数据,注意参数是--bootstrap-server,不能使用--zookeeper
    kafka-console-consumer.sh  --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2 --from-beginning
    
    消费者(支持0.9版本+)
    kafka-console-consumer.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2 --from-beginning --consumer.config config/consumer.properties
    
    消费数据到某个文件,如下的kafka.txt
    kafka-console-consumer.sh  --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2 --from-beginning > kafka.txt
    
    kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等)
    kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName
    
    
    查看消费者消费主题的情况
    kafka-consumer-groups.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --group connect-local-file-sink --describe
    Consumer group 'connect-local-file-sink' has no active members.
    
    主题	分区	当前位置	结束位置	剩余	消费者id	主机	客户端id
    TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    connect-test    0          3               3               0               -               -               -
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    对topic进行负载均衡

    kafka版本 <= 2.4
    kafka-preferred-replica-election.sh  --zookeeper localhost:2181
    kafka新版本
    kafka-preferred-replica-election.sh  --bootstrap-server 192.168.2.140:9092
    
    • 1
    • 2
    • 3
    • 4
    查看目前的消费组
    kafka-consumer-groups.sh --list  --bootstrap-server localhost:9092
    查看某个消费者的详情
    kafka-consumer\-groups.sh  --bootstrap-server localhost:9092 --describe --group test-consumer-group
    
    • 1
    • 2
    • 3
    • 4

    kafka自带压测命令

    kafka-producer-perf-test.sh --topic test --num-records 1000 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092
    501 records sent, 100.2 records/sec (0.00 MB/sec), 28.5 ms avg latency, 794.0 ms max latency.
    1000 records sent, 99.860196 records/sec (0.00 MB/sec), 16.42 ms avg latency, 794.00 ms max latency, 3 ms 50th, 28 ms 95th, 481 ms 99th, 794 ms 99.9th.
    
    • 1
    • 2
    • 3

    调整主题

    副本扩容

    kafka-topics.sh  --alter --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --topic test --partitions 3
    kafka-topics.sh --alter --topic topic_name --partitions 5 --bootstrap-server localhost:9092
    
    • 1
    • 2

    分区调整

    创建规则json,注意replicas后面的数字为kafka的broker.id。

    {"version":1,
    "partitions":[
    {"topic":"test","partition":0,"replicas":[0,1,2]},
    {"topic":"test","partition":1,"replicas":[1,2,0]},
    {"topic":"test","partition":2,"replicas":[2,0,1]}
    ]}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    执行

    kafka-reassign-partitions.sh --zookeeper  192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181  --reassignment-json-file 1.json --execute 
    
    • 1

    验证

    kafka-reassign-partitions.sh --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --reassignment-json-file 1.json --verify
    Status of partition reassignment: 
    Reassignment of partition test-0 completed successfully
    Reassignment of partition test-1 completed successfully
    Reassignment of partition test-2 completed successfully
    
    • 1
    • 2
    • 3
    • 4
    • 5

    kafka持续发送消息

    持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:

     kafka-verifiable-producer.sh --broker-list $(hostname -i):9092 --topic test --max-messages 1000000
    
    • 1

    查看消费者主题

    Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中。从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets

    查看消费者主题的相关信息命令如下:

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
    
    • 1

    优雅的关闭kafka

    在配置文件中添加以下配置:

    controlled.shutdown.enable=true
    
    • 1

    kafka在数据迁移期间限制带宽的使用

    #可以改变速率限制多次执行
    kafka-reassign-partitions.sh --zookeeper localhost:2181--execute --reassignment-json-file 1.json —throttle 50000000
    #确认限制被移除
    kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify --reassignment-json-file bigger-cluster.json
    
    • 1
    • 2
    • 3
    • 4

    更多关于kafka的知识分享,请前往博客主页。编写过程中,难免出现差错,敬请指出

  • 相关阅读:
    视频编解码 — 帧内预测
    c# Linq 及其扩展方法的使用
    彻底理解并解决服务器出现大量TIME_WAIT - 第四篇
    [附源码]计算机毕业设计基于Springboot楼盘销售管理系统
    Android SDK 上手指南||第十一章 虚拟与物理设备
    你们天天说的应用性能监控: Sky Walking
    湖南麒麟两种修复硬盘方式
    一个ubuntu系统搭建redis集群
    tarjan2
    Linux上快速安装zookeeper
  • 原文地址:https://blog.csdn.net/margu_168/article/details/132871437