• Kafka:安装与简单使用


    下载安装

    下载地址:kafka-download

    windows安装

    下载完后,找一个目录解压。

    解压完成后,看看目录结构

    对于linux来讲,所有的启动文件都放在bin目录下,那一堆sh文件;

    对于windows来讲,所有的启动文件放在bin\windows目录下,一堆bat文件。

    需要到config目录下,找到server.properties文件,指定log目录:

    log.dirs=D:\software\Kafka-2.6.3\kafka-logs
    
    • 1

    指定在kafka安装目录就可以。

    目录结构

    bin/
    config/
    libs/
    licenses/
    logs/
    site-docs/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    bin目录下放了一堆Kafka的启动文件,包括kafka启动文件、zookeeper启动文件。bin\windows目录下甚至还有一套给windows用的启动文件。

    config目录下有一堆配置文件,包括kafka服务器的配置文件server.properties,以及ZooKeeper的配置文件zookeeper.properties等。

    启动服务器

    Kafka使用ZooKeeper,因此如果还没有ZooKeeper服务器,需要先启动一个。

    Kafka的那一堆打包脚本里提供了一个简单的单节点ZooKeeper实例,cmd内调用:

    D:\software\Kafka-2.6.3>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
    
    • 1

    Zookeeper启动后,再启动Kafka服务器:

    D:\software\Kafka-2.6.3>.\bin\windows\kafka-server-start.bat .\config\server.properties
    
    • 1

    到这里,Kafka服务器启动成功。

    创建主题

    创建一个主题,名为test,设定只有一个分区,保留一个副本。

    bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    • 1

    如果想查看当前的主题列表:

    bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
    
    • 1

    除了手动创建主题以外,还可以将代理配置一下,设置成在发布不存在的主题时自动创建主题。

    发送一些消息

    Kafka带有一个命令行客户端,该客户端将从文件或者标准输入中获取输入,并将其作为消息发送到Kafka集群。

    默认情况下,每行将作为单独的消息发送。

    运行生产者,在控制台中输入一些消息以发送到服务器:

    bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
    >this is a message
    >this is the other message
    >
    
    • 1
    • 2
    • 3
    • 4

    这时候用kafka tool查看就可以看到:

    在这里插入图片描述

    启动消费者

    Kafka还有一个消费者命令行,可以将消息转存到标准输出:

    bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
    
    • 1

    如图:

    在这里插入图片描述

    如果在不同的终端运行着生产者和消费者的命令行,那么现在应该能够在生产者终端中输入消息,并看到它们出现在消费者终端中。

    所有命令行工具都有附加选项。之后可以记录查看。

    设置多代理集群

    在之前,我们的Kafka集群只有一个代理。虽然启动更多的代理并没有什么太大的作用,但是我们这里可以先感受一下(在本地机器上)将集群扩展为3个节点的方法:

    首先我们需要为每个broker都创建一个配置文件(windows上用copy指令):

    copy config\server.properties config\server-1.properties
    
    copy config\server.properties config\server-2.properties
    
    • 1
    • 2
    • 3

    然后分别编辑这几个配置文件,主要修改以下几个属性即可:

    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dirs=/tmp/kafka-logs-1
    
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dirs=/tmp/kafka-logs-2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    默认的server.properties,broker.id=0,listener默认应该是9092。

    broker.id,这个属性是每个代理的唯一名称,要保证不重复。

    这里重写端口和日志目录,是因为我们需要在同一台机器上运行它们,所以要避免不同代理使用同一端口的情况。

    之前我们已经启动了ZooKeeper和一个单节点,现在我们启用两个新来的节点:

    .\bin\windows\kafka-server-start.bat .\config\server-1.properties &
    
    .\bin\windows\kafka-server-start.bat .\config\server-2.properties
    
    • 1
    • 2
    • 3

    启动之后,在kafka tool中可以看到:

    在这里插入图片描述

    broker已经变成3个了。

    现在再创建一个复制因子为3的新主题:

    bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    
    • 1

    既然我们现在有了一个集群,那我们怎么知道集群下每个代理都在做些什么?

    使用describe指令:

    bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
    
    输出:
    Topic: my-replicated-topic      PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
            Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    第一行给出了所有分区的摘要,之后每附加一行就给出一个分区的信息。

    当前主题只有一个分区,所以只有一行。

    • leader:负责给定分区所有读取和写入的节点。每个节点都可能成为领导者;
    • replicas:副本,哪些节点复制了此分区的日志;
    • Isr:一组同步副本,是replicas的子集,当前存活,且进度跟leader一样;

    在我们的示例中,my-replicated-topic 只有一个分区,分区下只有一个leader节点。

    如果想在新主题下发布或者查看消息的话,跟之前一样,改改topic名就可以,比如说发布:

    bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic
    
    • 1

    如果我们这时候把leader节点关掉,也并不会影响整个集群的时候,kafka将在现有的可用节点里选择一个作为leader,原先的leader也将不会在同步副本集中。

    2022-4-13 17:47:20 以上测试失败,集群崩掉了。。

    常见问题

    在kafka安装目录下,输入命令:

    .\bin\windows\kafka-server-start.bat .\config\server.properties
    
    • 1

    结果报错:

    输入行太长
    语法错误
    
    • 1
    • 2

    疑似原因是kafka目录建的太深了,原先我的kafka根目录位于磁盘下的四级目录,直接放在磁盘目录下就好了。

    很神奇的问题,据说就是文件夹树的深度太深了。

    工具

    kafka tool

    可视化查看kafka topic内容的工具

    下载地址:https://www.kafkatool.com/download.html

    安装后,双击.exe,启动工具就可以。

    右键add 新链接,填写基本信息:

    在这里插入图片描述

    在这里插入图片描述

    bootstrap servers多是限制只能用域名来访问,所以需要自己设置hosts。

    C:\Windows\System32\drivers\etc 下的 hosts 文件中,添加入 kafka 的集群域名, 例如:

    1xx.1xx.2xx.107 kafka1
    1xx.1xx.2xx.108 kafka2
    
    • 1
    • 2

    kafka tool支持的功能有很多,还可以模拟发送message,功能很丰富。

    详细的功能,可以参照参考文献1。

    常用指令

    topic

    查看topic

    打印当前所有topic

    kafka-topics.bat --bootstrap-server localhost:9092 --list
    
    • 1

    删除topic

    2022-4-26 13:53:21 第一种方式好像不行,删完之后直接kafka打不开了。

    2022-4-26 14:13:42 windows端删除topic就是有问题,每次删除必崩溃。我已经不敢动了。

    彻底删除topic有两种方式:

    1. 删除kafka存储目录下相关topic目录。(即server.properties中对log.dirs的设置);
    2. 如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过zookeeper-client 删除掉broker下的topic即可。
    ./bin/kafka-topics  --delete --zookeeper 【zookeeper server】  --topic 【topic name】
    
     如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
    
     你可以通过命令:./bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic
    
    
    
     此时你若想真正删除它,可以如下操作:
    
     (1)登录zookeeper客户端:命令:./bin/zookeeper-client
    
     (2)找到topic所在的目录:ls /brokers/topics
    
     (3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。
    
    
    
    另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,
    
    如果你删除了此处的topic,那么marked for deletion 标记消失
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    常见问题

    The Cluster ID doesn’t match stored clusterId

    去日志目录里把meta.properties删掉就好了,但是删完了之后,我的topic在kafka tool里都看不到了。。。。

    参考文献

    1. Kafka 可视化工具
    2. kafka中文文档 不过是1.0的
    3. kafka官方文档 版本可选
    4. windows系统下启动kafka CMD报错:输入行太长,语法错误
    5. windows下安装kafka教程 这个没有按顺序写,第一次看没有注意到要启动ZooKeeper。。。吃大亏。
    6. kafka删除topic
    7. kafka API文档 很不错,灰常重要,关于kafka producer和consummer怎么用,就是从这里学到的。
  • 相关阅读:
    面试复习题——底层
    SpringBoot中使用注解方式拦截恶意访问的IP
    JVM如何卸载一个类
    基于微调技术打造自己的私有LLM大模型
    MySQL导入导出视图
    实验室管理系统平台在实验室规范运作中的应用
    【开发工具】git服务器端安装部署+客户端配置
    基于Spring Boot的宠物猫店管理系统的设计与实现毕业设计源码140909
    LeetCode #94.二叉树的中序遍历
    深度学习实战06-循环神经网络(RNN)实现股票预测
  • 原文地址:https://blog.csdn.net/wlh2220133699/article/details/133430407