• 大数据之kafka应用


    2024启kafka

    kafka常见命令

    1. 指定jmx端口启动kafka lsof -i :9999

      JMX_PORT=9999 /opt/kafka_2.12-3.1.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties
      
      • 1
    2. 新建topic:副本不能大于kafka_server数

      /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --partitions 3 --replication-factor 1 --create --topic knowScript
      
      • 1
    3. 查看topic的详情

      /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --describe --topic knowScript
      
      • 1
    4. 罗列所有的topic

      /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --list
      
      • 1
    5. 给topic拓展分区:只增大不减小

      /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --alter --topic knowScript --partitions 4
      
      • 1
    6. 查看topic的磁盘大小,单位为Byte

      /opt/kafka_2.12-3.1.0/bin/kafka-log-dirs.sh --bootstrap-server only:9092 --topic-list knowScript --describe |  grep -oP '(?<=size":)\d+' |  awk '{ sum += $1 } END { print sum }'
      
      • 1
    7. 删除topic

      /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --topic knowScript --delete
      
      • 1
    8. 查看消费者组列表

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --list
      
      • 1
    9. 查看特定消费者组的消费情况

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId
      
      • 1
    10. 新建一个消费者组:消费一个topic即可

      /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group makeGroup
      
      • 1
    11. 删除一个消费者组

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --delete --group makeGroup
      
      • 1
    12. 往特定topic生产message,不指定key

      /opt/kafka_2.12-3.1.0/bin/kafka-console-producer.sh --bootstrap-server only:9092 --topic knowScript
      
      • 1
    13. 往特定topic生产message,指定key

      /opt/kafka_2.12-3.1.0/bin/kafka-console-producer.sh --bootstrap-server only:9092 --topic knowScript --property parse.key=true --property key.separator=,
      
      • 1
    14. 消费指定topic的message:–from-beginning,默认为最近的消息消费

      /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --from-beginning
      
      • 1
    15. 消费topic的message,限制条数

      /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset earliest --partition 0 --max-messages 3
      
      • 1
    16. 消费指定topic的message:指定分区的offset

      /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset latest --partition 0
      
      /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset earliest --partition 0
      
      /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset 10 --partition 0
      
      • 1
      • 2
      • 3
      • 4
      • 5
    17. 查看kafka的log文件明文内容

      /opt/kafka_2.12-3.1.0/bin/kafka-dump-log.sh --files /opt/kafka_2.12-3.1.0/data/kafka_logs/knowScript-0/00000000000000000000.log  -deep-iteration --print-data-log
      
      • 1
    18. 查看topic的message总数

      1. 每个分区的最小offset

        /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server only:9092 -topic knowScript  --time -2
        
        • 1
      2. 每个分区的最大offset

        /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server only:9092 -topic knowScript  --time -1
        
        • 1
    19. 查看指定消费者组的消费情况offset和lag

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId
      
      • 1
    20. 重设指定消费者组消费指定的offset(每个分区)

      1. 打印结果,没有执行(生产使用这个验证)

        /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset  --dry-run
        
        • 1
      2. 直接执行

        /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset  --execute
        
        • 1
      3. 重设指定消费者组指定分区的指定offset(topic后使用 :$partitionNum来设定指定的partition)

        /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 3 --topic consumerTopicByOffset:1  --execute
        
        • 1
    21. 指定消费者组从最新的offset进行消费(没有指定group,则全部都消费,指定了会从消费者组最新的offset开始消费)

      /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group consumeByOffsetId --from-beginning
      
      • 1
    22. 生产message基准测试; num-records生产的message条数,through为-1表示不限制吞吐量,record-size表示每条record的大小为1024K, producer-props后面跟着kv的producer属性配置

      /opt/kafka_2.12-3.1.0/bin/kafka-producer-perf-test.sh --topic test_producer_perf --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=only:9092 acks=-1 linger.ms=2000 compression.type=lz4
      
      • 1
    23. 消费message基准测试

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-perf-test.sh --broker-list only:9092 --messages 10000000 --topic test_producer_perf
      
      • 1

    生产上重放信息

    1. 查看指定消费者组的消费情况offset

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId
      
      • 1
    2. 重设指定消费者组消费指定的offset(每个分区)

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset  --execute
      
      • 1
      • 打印结果,没有执行

        /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset  --dry-run
        
        • 1
      • other:重设指定消费者组指定分区的指定offset(topic后使用 :partitionNum来设定指定的partition)

        /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 3 --topic consumerTopicByOffset:1  --dry-run
        
        • 1
    3. 指定消费者组从最新的offset进行消费(没有指定group,则全部都消费,指定了会从消费者组最新的offset开始消费)

      /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group consumeByOffsetId --from-beginning
      
      • 1
    • 指定分区指定offset进行开始消费(查看某消费者组在特定的分区丢失了哪些消息)

      /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --offset 10 --partition 0
      
      • 1

    jmx

    jmx的配置和开启

    1. 配置jmx

      JMX_PORT=9999 /opt/kafka_2.12-3.1.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties
      
      • 1
      • 配置文件配置更稳定

    jmx的使用

    jmx例子一
    1. 参考url的MBEAN

      https://docs.confluent.io/platform/current/kafka/monitoring.html#kafka-monitoring-metrics-broker

      1. 找到category类别下面的MBEAN:

        kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic={topicName}
        
        • 1
      2. 获取到对应的MBEAN的指标值

        /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
        
        • 1
        • 输出列有

          "time","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:Count","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:EventType","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:FiveMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:MeanRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:RateUnit"
          
          • 1
        • 寻找上面的name的值,如 FifteenMinuteRate

          /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --date-format "YYYY-MM-dd HH:mm:ss" --attributes FifteenMinuteRate --reporting-interval 5000
          
          • 1
          • –date-format格式化时间 、 --reporting-interval指定更新时间间隔

    jmx例子二

    1. 找到category类别下面的MBEAN

      kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs
      
      • 1
    2. 获取到对应的MBEAN的指标值

      /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
      
      • 1
      1. 输出列有

        "time","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:50thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:75thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:95thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:98thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:999thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:99thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Count","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Max","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Mean","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Min","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:StdDev"
        
        • 1
        • 寻找上面的name的值,如 75thPercentile、 95thPercentile

          /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes 75thPercentile
          
          /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes 95thPercentile
          
          • 1
          • 2
          • 3

    jmx例子三

    1. 找到category类别下面的MBEAN

      kafka.network:type=RequestChannel,name=ResponseQueueSize
      
      • 1
    2. 获取到对应的MBEAN的指标值

      /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.network:type=RequestChannel,name=ResponseQueueSize --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
      
      • 1
      • 输出列有

        • "time","kafka.network:type=RequestChannel,name=ResponseQueueSize:Value"
          
          • 1
      • 寻找上面的name的值,如 Value

        • /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.network:type=RequestChannel,name=ResponseQueueSize --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes Value
          
          • 1

    jmx例子四(special)

    1. 并不是所有的MBEAN都有对应的jmx指标值;例如:kafka.log:type=Log,name=LogEndOffset

    2. 下面的命令会报错

      /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.log:type=Log,name=LogEndOffset --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
      
      • 1
    3. 但是从jconsole localhost:9999的弹窗MBEAN可以找到;上面更深层的信息

      /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.log:type=Log,name=LogEndOffset,topic=flink2kafka,partition=0 --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
      
      • 1
  • 相关阅读:
    【毕业设计】STM32单片机的智能手环 - 蓝牙手环 物联网
    C# 搭建一个简单的WebApi项目23.10.10
    linux系统离线安装docker(分步法&一键法)
    项目实战:Qt+OpenCV大家来找茬(Qt抓图,穿透应用,识别左右图区别,框选区别,微调位置)
    腾讯云服务器安装宝塔面板并快速搭建WordPress个人站点
    Vue零基础实战教程
    MySQL索引和优化
    空间结构是可数的吗?
    【7月5日活动预告】Flink峰会
    【ML】基于机器学习的心脏病预测研究(附代码和数据集,逻辑回归模型)
  • 原文地址:https://blog.csdn.net/liguanhaoyonghu/article/details/137618126