• kafka快速入门


    kafka快速入门

    一、docker安装kafka集群

    # docker直接拉取kafka和zookeeper的镜像
    docker pull wurstmeister/kafka
    docker pull wurstmeister/zookeeper 
    # 首先需要启动zookeeper,如果不先启动,启动kafka没有地方注册消息
    docker run -it --name zookeeper --restart=always -p 12181:2181 -d wurstmeister/zookeeper:latest
    # 启动kafka容器,注意需要启动三台,注意端口的映射,都是映射到9092
    # 第一台
    docker run -dit --name kafka01 --restart=always -p 19092:9092  -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT={自己的ip}:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{自己的ip}:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
    # 第二台
    docker run -dit --name kafka02 --restart=always -p 19093:9092  -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT={自己的ip}:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{自己的ip}:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
    # 第三台
    docker run -dit --name kafka03 --restart=always -p 19094:9092  -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT={自己的ip}:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{自己的ip}:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
    
    # 可视化工具
    docker run -dit -p --name kafdrop 9000:9000 -e JVM_OPTS="-Xms32M -Xmx64M" -e KAFKA_BROKERCONNECT=1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094 -e SERVER_SERVLET_CONTEXTPATH="/" obsidiandynamics/kafdrop
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    -e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

    -e KAFKA_ZOOKEEPER_CONNECT=192.168.100.31:12181 配置zookeeper管理kafka的路径192.168.100.31:12181

    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.100.31:19092 把kafka的地址端口注册给zookeeper

    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

    上面端口的映射注意都是映射到Kafka的9092端口上!否则将不能够连接!

    二、命令行操作

    查看正在运行的容器:docker ps

    在这里插入图片描述

    进入容器:docker exec -it kafka01 /bin/bash

    在这里插入图片描述

    用命令行对topic进行curd

    进如kafka安装文件夹 cd /opt/kafka/bin

    查看所有topic

    kafka-topics.sh --list --zookeeper 1.14.252.45:12181
    
    • 1

    在这里插入图片描述

    创建topic

    kafka-topics.sh --zookeeper 1.14.252.45:12181 --create --topic first --replication-factor 1 --partitions 3
    
    • 1

    在这里插入图片描述

    查询topic详情

    kafka-topics.sh --zookeeper 1.14.252.45:12181 --describe --topic first
    
    • 1

    在这里插入图片描述

    删除topic

    kafka-topics.sh --zookeeper 1.14.252.45:12181 --delete --topic first
    
    • 1

    发送消息

    kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

    kafka-console-producer.sh --broker-list 1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094 --topic first
    
    • 1

    在这里插入图片描述

    接受消息

    对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输 出, 默认是消费最新的消息 。使用kafka的消费者消息的客户端,从指定kafka服务器的指定 topic中消费消息

    kafka-console-consumer.sh --bootstrap-server 1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094 --topic first --from-beginning
    
    • 1

    在这里插入图片描述

    在这里插入图片描述

    查看消费者和信息

    # 查看当前主题下有哪些消费组
    ./kafka-consumer-groups.sh --bootstrap-server 1.14.252.45:19092 --list
    # 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
    ./kafka-consumer-groups.sh --bootstrap-server 1.14.252.45:19092 --describe --group testGroup
    
    • 1
    • 2
    • 3
    • 4
    • Currennt-offset: 当前消费组的已消费偏移量
    • Log-end-offset: 主题对应分区消息的结束偏移量(HW)
    • Lag: 当前消费组未消费的消息数
  • 相关阅读:
    浅谈大数据背景下数据库安全保障体系
    第四章 选择结构程序设计
    数据结构与算法 —— DFS的定义与原理
    多种方法实现conda环境迁移
    【数据结构-字符串 四】【字符串识别】字符串转为整数、比较版本号
    springboot校园安全通事件报告小程序-计算机毕业设计源码02445
    流程控制break关键字
    Laravel-admin弹出提示层的三种方法
    LeetCode //C - 201. Bitwise AND of Numbers Range
    图像像素值统计&图像几何形状的绘制&随机数与随机颜色
  • 原文地址:https://blog.csdn.net/weixin_43296313/article/details/125525276