• kafka3.X基本概念和使用


    kafka基本概念和使用


    本文"kafka的概念"部分是在[初谈Kafka][ https://juejin.im/post/5a8e7f296fb9a0635a6573e9]的基础上重新整理而成,看官自行选择阅读…

    kafka的概念

    kafka是一个分布式的、可区分的、可复制的、基于发布、订阅的**“消息系统”**。

    主要应用领域:大数据,当然,在分布式的系统中也有应用。

    市面流行的消息队列RocketMQ就是借鉴kafka原理,java开发得到的,可通过文章[RocketMQ原理&最佳实践][https://www.jianshu.com/p/2838890f3284]了解

    kafka优势:适合离线、在线的消息消费

    • 消息保存位置:磁盘;
    • Topic为单位,进行消息归纳;
    • Producer:向 Topic 发送(Push)消息;
    • Cunsumer:消费(Pull)Topic里的消息。
    基本概念

    基本概念对于理解kafka本身,以及基于kafka原理的消息队列的理解,非常重要,理解基本概念之后,对于kafka的上手,以及使用,以及后续消息队列的使用,会更加得心应手。

    • Topic

    适用于存储消息的逻辑概念,1个Topic,可以看做是消息的集合。

    这个消息的集合(Topic),可以接收多个生产者(Producer)推送(Push)过来的消息,也可以让多个消费者(Consumer)从中消费(Pull)消息。

    【这里的消费,是pull之后不保留在集合中,还是pull之后依然保留,待思考或者自行查找资料,目前我的理解是pull之后依然保留,这样多个消费者均可获取到同一条消息,同理,push应该可以push相同的消息,至于是否正确,我们慢慢看看吧】

    • 分区(Partition)

    分区的概念,可以理解为Topic的子集:1个Topic可能只有1个分区,也可能有多个分区。这里说的多个分区,一个分区就代表磁盘的一块连续的位置,不同的分区也就是磁盘上不同的区域块。

    分区存在的意义:

    通过不同的区域块,kafka存储在Topic里的消息就可以在多个地方存储,也就是对kafka进行了水平扩展,这样可以增加kafka的并行处理能力。

    对于不同区域块存储消息的理解:

    同一个Topic下,多个区域块,这些不同的区域块,存储的消息是不同的,也就是说,A、B两个区域块,不会同时存储1类消息。

    对于同一个区域内的消息:

    在区域块A接收到消息x的时候,该消息x会被接收它的区域块A分配一个offset(代表它在这个区域块中的唯一编号),这个offset使得kafka确定了这条消息x在这个区域块A内的顺序。
    要注意的是:这个顺序仅适用于该区域块A,对于另一区域块B内另一条消息y,x和y的顺序是不能确定的。

    Topic AND Partition

    • Log

    分区在逻辑上,对应一个Log,当生产者将消息写入分区的时候,实际上就是写入一个log。

    Log:一个逻辑概念,对应着磁盘上的一个文件夹。

    Log的组成:由多个Segment组成,每1个Segment对应1个日志文件和一个索引文件。

    • Broker

    说完了Topic,里面提到区域块A会给它接收的消息x分配一个offset,并且x会保存到A所在的磁盘区域上。而这个功能,就是由Broker完成的。

    Broker:1个Broker就是一个单独的kafka server。

    Broker的主要工作:接收生产者发送来的消息,分配offset,然后将包装过的数据保存到磁盘上。

    Broker的其他作用:接收消费者Consumer和其他Broker的请求,根据请求的类型进行相应的处理然后返回响应。

    这里引出集群(Cluster)的概念,1个Cluster是由多个Broker构成的,也就是说,1个Broker不会对外提供服务,而是通过Cluster的形式对外提供服务:

    因为,一个Cluster里,需要1个Broker担任Controller,这个Broker就是这个集群的指挥中心,负责:

    • 管理各个分区(Partition)的状态;
    • 管理每个分区(Partition)的副本【这个副本暂时没找到出处】的状态;
    • 监听zookeeper的数据变化。

    其他的Broker均是通过这个Controller进行指挥的,完成各自相应的功能。

    关于Cluster的一主多从实现:

    除了担任Controller的Broker会监听其他Broker的状态,其他Broker也会监听Controller的状态,当Controller出翔了故障,就会重新选取新的Broker担任Controller

    • 消息

    Kafka中最基本的消息单元。有一串字节组成,主要由key和value构成(即key、value都是字节数组)

    • key:主要作用是 根据一定策略,将这个消息路由到制定分区中 ==> 这样就保障了,包含同一个key的消息全部写入一个分区A,不会写入另外一个分区。(即实现了Partition分区中提到的“不同分区存储的消息是不一样的”)
    • 副本

    Kafkah会对消息进行冗余备份,每一个分区Partition,都可以有多个副本(每一个副本包含的消息是相同的,但是不能保证同一时刻下完全相同)。

    副本类型:Leader、Follower。

    副本选举策略(即选举1个Leader,其余为Follower):

    • 当分区只有1个副本,这个副本就是Leader,没有Follower。
    • 在其他不同场景,会采取不同的选举策略。
    • Leader:处理Kafka中所有的读写请求
    • Follower:仅仅把数据从Leader中拉取到本地,同步更新到自己的Log中。

    Broker-leader-follower

    • ISR集合

    ISR集合:表示目前可用(alive)、且消息量与Leader相差不多的Follower集合,即ISR是整个副本集合的一个子集。

    ISR集合中Follower:

    • 所在的节点都和ZooKeeper保持着连接;
    • 最后1条消息的offset和Leader中的最后一条消息的offset,差值不能超过指定的阈值

    每一个分区Partition上的Leader,都会维护这个Partition的ISR集合。(也就是说,ISR集合内的Follower,都在同一个分区上,归属同一个Leader,那么上面的说法就说得通了)。


    根据上面的说法,这个分区上的副本Leader,在进行了消息的写请求之后,副本Follower就会从Leader上拉取Leader写入的消息,同步到自己对应的Log中,这个过程也就说明同一时刻,Follower中的消息数量少于Leader的消息数量,只要这个差值少于指定的阈值(这个就说明了消息量与Leader差不多这一特征),那么这些Follower的集合就是ISR集合。

    • 生产者(Producer)**

    产生消息的对象,产生消息之后,将消息按照一定的规则推送到Topic的分区中

    • 消费者(Consumer)

    从Topic中拉取消息,并对消息进行消费

    • Consumer:有一个作用,维护它消费到分区(Partition)上的什么位置(即offset的值)。
    • Consumer Group: 在kafka中,多个Consumer可以组成1个Consumer Group,1个Consumer只属于1个Consumer Group.

    Consumer Group的作用:保证了这个Consumer Group订阅的Topic(Partition的集合)中的每一个分区(Partition),只被Consumer Group中的一个Consumer处理。

    当然,如果要实现消息的广播消费,则将同1条消息放在多个不同的Consumer Group中即可。

    就上述这个Consumer和Partition的关系可以理解下面的说法:

    通过向Consumer Group中动态添加适量的Consumer, 可以触发kafka的Rebalance操作(重新分配Partition和Consumer的一一对应关系,结合Topic部分的理解,这样就实现了kafka的水平扩展能力)

    Kafka的使用
    首先kafka的安装
    1. Kafka安装需要的环境准备:需要Java环境,centos 7自带java1.6版本,可以安装,如果觉得jdk太旧,可以安装新的jdk,比如1.8版本的https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
    2. 安装包下载地址:http://kafka.apache.org/downloads.html
    3. 下载好kafka安装包之后,解压到/usr/local目录(Linux,windows可自行决定),删除压缩包
    4. 进入kafka目录进行操作
    5. 如果想搭建集群,可以参考一下文章[Kafka集群搭建的详细步骤][https://blog.csdn.net/zxy987872674/article/details/72466504],我们在文章后面也有列举到搭建集群的步骤方法
    6. 注意kafka里的配置文件,应该在kafka文件夹下的config或conf文件夹下,叫server.properties.关于kafka配置的说明,可以参看文章[apache kafka系列之server.properties配置文件参数说明][https://blog.csdn.net/lizhitao/article/details/25667831]
    kafka的简单实用和理解

    我们抛开kakfa集群搭建什么的,先来简单使用,帮助理解即可。

    Linux版本


    进入kafka目录下的bin目录

    启动zk

    nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &
    或
    nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
    
    • 1
    • 2
    • 3

    nohup(不挂断地运行命令,可以自己百度一下看看,就是nohup 命令 &,让命令在后台执行)

    启动kafka

    nohup ./kafka-server-start.sh ../config/server.properties &
    或
    nohup ./bin/kafka-server-start.sh ./config/server.properties &
    
     
     
    • 1
    • 2
    • 3

    创建topic

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    或
    ./kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2 --create
    
     
     
    • 1
    • 2
    • 3

    列出topic

    ./kafka-topics.sh --list --zookeeper localhost:2181
    或
    ./kafka-topic --zookeeper localhost:2181 --list
    
     
     
    • 1
    • 2
    • 3

    启动生产者并发送消息

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic zyl
    输入消息:
    nihao
    china
    
     
     
    • 1
    • 2
    • 3
    • 4

    另外开个终端,启动消费者接受消息

    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic zyl --from-beginning
    可以接收到:
    nihao
    china
    
     
     
    • 1
    • 2
    • 3
    • 4

    另外开个终端,可以看到有4个进程(分别为zk、kafka、控制台消费者、控制台生产者)

    [root[@aa](http://my.oschina.net/test2) bin]# jps | grep -v Jps
    9802 QuorumPeerMain
    10392 Kafka
    10956 ConsoleConsumer
    10887 ConsoleProducer
    
     
     
    • 1
    • 2
    • 3
    • 4
    • 5
    搭建集群(3个节点)

    拷贝并修改配置文件

    cp server.properties server1.properties
    cp server.properties server2.properties
    cp server.properties server3.properties
    
    • 1
    • 2
    • 3

    nohup ./kafka-server-start.sh …/config/server1.properties &
    nohup ./kafka-server-start.sh …/config/server2.properties &
    nohup ./kafka-server-start.sh …/config/server3.properties &

    [root@aa bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic(注意,指定的是3)

    Created topic “my-replicated-topic”.

    [root@aa bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
    Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0

    [root@aa bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 4 --partitions 1 --topic my-replicated-topic
    Error while executing topic command Topic “my-replicated-topic” already exists.
    kafka.common.TopicExistsException: Topic “my-replicated-topic” already exists.
    at kafka.admin.AdminUtils . c r e a t e O r U p d a t e T o p i c P a r t i t i o n A s s i g n m e n t P a t h I n Z K ( A d m i n U t i l s . s c a l a : 171 ) a t k a f k a . a d m i n . A d m i n U t i l s .createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:171) at kafka.admin.AdminUtils .createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:171)atkafka.admin.AdminUtils.createTopic(AdminUtils.scala:156)
    at kafka.admin.TopicCommand . c r e a t e T o p i c ( T o p i c C o m m a n d . s c a l a : 86 ) a t k a f k a . a d m i n . T o p i c C o m m a n d .createTopic(TopicCommand.scala:86) at kafka.admin.TopicCommand .createTopic(TopicCommand.scala:86)atkafka.admin.TopicCommand.main(TopicCommand.scala:50)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

    [root@aa bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 4 --partitions 1 --topic my-replicated-topic2
    Created topic “my-replicated-topic2”.

    [root@aa bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic2
    Topic:my-replicated-topic2 PartitionCount:1 ReplicationFactor:4 Configs:
    Topic: my-replicated-topic2 Partition: 0 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0

    [root@aa bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic zyl
    Topic:zyl PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: zyl Partition: 0 Leader: 0 Replicas: 0 Isr: 0

    启动生产者并发送消息

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    
     
     
    • 1

    另外开个终端,启动消费者接受消息

    ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    
     
     
    • 1

    另外开个终端,可以看到有9个进程(分别为1个zk、4个kafka、2个控制台消费者、2个控制台生产者)

      [root@aa ~]# jps | grep -v Jps
      9802 QuorumPeerMain
      10392 Kafka
      11547 Kafka
      12007 ConsoleProducer
      10956 ConsoleConsumer
      10887 ConsoleProducer
      11469 Kafka
      12054 ConsoleConsumer
      11710 Kafka
    
     
     
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    查看单独的进程:

      ps -ef | grep server.properties | grep -v grep
      ps -ef | grep "server1.properties" | grep -v grep
    
     
     
    • 1
    • 2

    测试leader

    杀掉进程,然后–describe查看

      [root@aaa bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
      Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
      Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
      可以看到leader是2.
    
    • 1
    • 2
    • 3
    • 4

    [root@aa bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic2
    Topic:my-replicated-topic2 PartitionCount:1 ReplicationFactor:4 Configs:
    Topic: my-replicated-topic2 Partition: 0 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
    可以看到leader是2.

    [root@aa bin]# ps -ef | grep server1.properties | grep -v grep

    杀掉某个kafka进程

      kill -9 11469
    
    • 1

    ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic2
    Topic:my-replicated-topic2 PartitionCount:1 ReplicationFactor:4 Configs:
    Topic: my-replicated-topic2 Partition: 0 Leader: 2 Replicas: 1,2,3,0 Isr: 2,3,0

    [root@aa bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
    Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0

    可以看到my-replicated-topic2的leader变成了2。

    问题:
    1.zookeeper-shell.sh如何使用
    2.能不能通过kafka-server-stop.sh停止某个kafka呢

    Command must include exactly one action: --list, --describe, --create or --alter

    OptionDescription
    –alterAlter the configuration for the topic.
    –config A topic configuration override for the topic being created or altered.
    –createCreate a new topic.
    –deleteConfig A topic configuration override to be removed for an existing topic
    –describeList details for the given topics.
    –helpPrint usage information.
    –listList all available topics.
    –partitions The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    –replica-assignment A list of manual partition-to-broker assignments for the topic being created or altered.
    –replication-factor The replication factor for each partition in the topic being created.
    –topic The topic to be create, alter or describe. Can also accept a regular expression except for --create option
    –topics-with-overridesif set when describing topics, only show topics that have overridden configs
    –unavailable-partitionsif set when describing topics, only show partitions whose leader is not available
    –under-replicated-partitionsif set when describing topics, only show under replicated partitions
    –zookeeper REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
    windows版本

    环境搭建
    1. jdk环境,上文已经提到过,并配置环境变量,这里不讨论

    2. 安装zookeeper,这是运行kafka之前需要的环境(但是可以直接去第三步,因为,kafka有自带的zookeeper服务)
      2.1. 到http://mirror.bit.edu.cn/apache/zookeeper/下载一个版本的zookeeper即可
      2.2. 解压到一个文件夹下,可自己选择
      2.3. 将解压后的文件夹里,conf文件夹下zoo_sample.cfg 改名为 zoo.cfg
      2.4. 修改(如果没有则新增设置)
      dataDir和dataLogDir路径dataDir=D:/Program Files/zookeeper-3.4.13/data/logs
      dataLogDir=D:/Program Files/zookeeper-3.4.13/data/logs
      zoo.cfg配置
      2.5. 设置环境变量:
      新增ZOOKEEPERHOME:D:\Program Files\zookeeper-3.4.13
      Path添加:%ZOOKEEPERHOME%\bin;
      2.6. 检查安装成功与否:

      打开cmd窗口,输入zkserver,结果应该如下,则安装以及配置环境变量成功:
      
         
         
      • 1

    zkserver

    1. 安装kafka

      3.1 到http://kafka.apache.org/downloads.html下载

      3.2 解压到自己的文件夹路径(如D:\Program Files\kafka_2.11-1.1.0)

      3.3.1 修改配置文件在config文件夹下,D:\Program Files\kafka_2.11-1.1.0\server.properties

      3.3.2 由于kafka有自带的zookeeper服务,所以如果跳过第2步,可以直接使用自带zk服务,记得配置对应的配置文件/config/zookeeper.properties

      3.4.1 修改server.properties里的log.dirs:
      server.properties

      3.4.2 修改zookeeper.properties
      zookeeper.properties

      3.5 注意观察,bin目录下有一个window文件夹,windows文件夹下的文件内容和bin下的是一样的,只是文件后缀不同,bin目录下的.sh文件需要用shell执行,bat则通过cmd窗口即可执行,当然bat文件通过shell也是可以执行的,至于两者区别,就自行百度了。调出shell窗口的方法:按住shift键+在文件夹空白处单机鼠标右键,就可以看到“在此处打开powershell窗口"的选项。

      3.6 在bin目录下启动powerShell,然后入去一下命令,启动kafka:

      如果是先装了zookeeper,则可以直接执行:
      ./windows/kafka-server-start.bat "D:/Program Files/kafka_2.11-1.1.0/config/ser
      ver.properties"[注意:bat 和 "D:/" 之间有一个空格]
      (这里,由于路径名Programe Files里包含空格,如果不用双引号引用后面的参数,会报错)
      当然,也可以到bin目录的上一层,也就是kafka_2.11-1.1.0目录下执行下面的语句,效果一样:
      ./bin/windows/kafka-server-start.bat ./config/ser
      ver.properties"[注意:bat 和 "./config" 之间有一个空格]
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

    如果没有装zookeeper,而是使用自带的,则先执行:
    ./windows/zookeeper-server-start.bat “D:/Program Files/kafka_2.11-1.1.0/config/zookeeper.properties”

    启动自带zookeeper时,可以看到:

    启动zoopkeeper

    然后重新打开一个窗口,开启kafka服务(./windows/kafka-server-start.bat “D:/Program Files/kafka_2.11-1.1.0/config/ser
    ver.properties”)时,可以看到:
    启动kafka-server-start
    说明成功了。

    • kafka的简单使用:

      4.1 创建一个名字为demo的topic,指定其分区数目为1,副本工厂数目为1:

      到bin/window目录下执行:
      
        
        
      • 1
      ./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
      输出:Created topic "demo".
      
        
        
      • 1
      • 2
    • create topic

      4.2 查看所有的topics:

      ./kafka-topics.bat --list --zookeeper localhost:2181
      
       
       
      • 1

      look up list

      如果想删除topics, 可以参考文章:kafka如何彻底删除topic及数据

      4.3 向指定topic发送消息,首先打开2个新的powerShell或cmd窗口,一个生产者producer,一个消费者Consumer:

      设置生产者,设置端口号,并指定topic:
      ./kafka-console-producer.bat --broker-list localhost:9092 --topic demo
      设置消费者,设置从哪一个端口号(2181)、哪个topic(demo)、什么位置(beginning)开始取出消息:
      ./kafka-console-consumer.bat --zookeeper localhost:2181 --topic demo --from-beginning
      
       
       
      • 1
      • 2
      • 3
      • 4

      在producer窗口里输入消息,就可以在对应的Consumer窗口看到对应的消息

      producer and consumer

      好了,大概就这些,应该可以让各位对kafka的概念和基本使用有了一个不错的理解,哈哈

  • 相关阅读:
    384.打乱数组
    QT Object定时器使用
    智能语音机器人系统和电话系统的区别是什么
    认识操作系统 | 理解管理 | 系统调用(System Call)
    软件工程第三周
    数学建模国赛C蔬菜类商品的自动定价与补货决策C
    语法基础(变量、输入输出、表达式与顺序语句)
    看完 2022 雷军年度演讲,我总结了我的故事
    Git(SourceTree)变基操作使用
    多卫星定位算法
  • 原文地址:https://blog.csdn.net/zhangjunli/article/details/134036694