• Kafka 实战使用、单机搭建、集群搭建、Kraft集群搭建


    实验环境

    准备三台虚拟机

    三台机器均预装CentOS7 操作系统。分别配置机器名 worker1,worker2,worker3。

    vi /etc/hosts
    
    192.168.75.61 worker1
    192.168.75.62 worker2
    192.168.75.63 worker3
    
    
    firewall-cmd --state   查看防火墙状态
    systemctl stop firewalld.service   关闭防火墙
    systemctl disable firewalld 禁止开机自启
    



    都需要安装jdk

    [root@localhost ~]# mkdir -p /usr/local/soft/jdk
    
    [root@localhost ~]# tar -zxvf jdk-8u201-linux-x64.tar.gz -C /usr/local/soft/jdk
    
    [root@localhost ~]# vim /etc/profile
    export JAVA_HOME=/usr/local/soft/jdk/jdk1.8.0_201
    export JRE_HOME=/usr/local/soft/jdk/jdk1.8.0_201/jre
    export CLASS_PATH=.:$JAVA_HOME/lib:$JRE_HOME/lib
    export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
    
    [root@localhost jdk1.8.0_201]# source /etc/profile
    [root@localhost jdk1.8.0_201]# java -version
    



    下载kafka3.4.0

    前面的2.13是开发kafka的scala语言的版本,后面的3.4.0是kafka应用的版本。

    Scala是一种运行于JVM虚拟机之上的语言。在运行时,只需要安装JDK就可以了,选哪个Scala版本没有区别。但是如果要调试源码,就必须选择对应的Scala版本。因为Scala语言的版本并不是向后兼容的。

    另外,在选择kafka版本时,建议先去kafka的官网看下发布日志,了解一下各个版本的特性。 https://kafka.apache.org/downloads。




    在这里插入图片描述



    下载Zookeeper 3.6.2版本

    在这里插入图片描述



    kafka的安装程序中自带了Zookeeper,可以在kafka的安装包的libs目录下查看到zookeeper的客户端jar包。但是,通常情况下,为了让应用更好维护,我们会使用单独部署的Zookeeper,而不使用kafka自带的Zookeeper。

    下载完成后,将这两个工具包上传到三台服务器上,解压后,分别放到/usr/local/soft/kafka和/usr/local/soft/zookeeper目录下。最好是并将部署目录下的bin目录路径配置到path环境变量中。

    单机服务

    启动

    解压kafka

    [root@localhost ~]# mkdir /usr/local/soft/kafka/
    
    [root@localhost ~]# tar -zxvf kafka_2.13-3.4.0.tgz -C /usr/local/soft/kafka/
    
    # 最好是设置一下环境变量
    [root@worker1 ~]# vi ./.bash_profile 
    export KAFKA_HOME=/usr/local/soft/kafka/kafka_2.13-3.4.0
    export PATH=$PATH:$KAFKA_HOME/bin
    [root@worker1 ~]# source ./.bash_profile
    
    [root@localhost ~]# cd /usr/local/soft/kafka/kafka_2.13-3.4.0
    

    直接执行

    # 启动kafka自带的zookeeper服务
    # 从nohup.out中可以看到zookeeper默认会在2181端口启动。通过jps指令看到一个QuorumPeerMain进程,确定服务启动成功。
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    
    # 启动kafka服务
    # 也可以加上-daemon表示后台启动    bin/kafka-server-start.sh -daemon config/server.properties
    # 服务会默认在9092端口启动。
    nohup bin/kafka-server-start.sh config/server.properties &
    
    [root@localhost kafka_2.13-3.4.0]# jps
    2402 Kafka
    2850 Jps
    2044 QuorumPeerMain
    



    停止服务

    [root@worker1 ~]# kafka-server-stop.sh 
    [root@worker1 ~]# zookeeper-server-stop.sh 
    [root@worker1 ~]# jps
    21269 Jps
    



    简单收发消息

    Kafka的基础工作机制是消息发送者可以将消息发送到kafka上指定的topic,而消息消费者,可以从指定的topic上消费消息。

    在这里插入图片描述



    首先,可以使用Kafka提供的客户端脚本创建Topic

    #创建Topic
    [root@localhost kafka_2.13-3.4.0]# bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
    Created topic test.
    
    #查看Topic   --describe参数
    [root@localhost kafka_2.13-3.4.0]# bin/kafka-topics.sh --topic test --describe --bootstrap-server localhost:9092
    Topic: test	TopicId: XSQWNCb3SbGbDEleLlBvtQ	PartitionCount: 1	ReplicationFactor: 1	Configs: 
    	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    



    然后,启动一个消息发送者端。往一个名为test的Topic发送消息。

    当命令行出现 > 符号后,随意输入一些字符。Ctrl+C 退出命令行。这样就完成了往kafka发消息的操作。

    [root@localhost kafka_2.13-3.4.0]# bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
    >1
    >2
    >3
    >4
    >
    

    如果不提前创建Topic,那么在第一次往一个之前不存在的Topic发送消息时,消息也能正常发送,只是会抛出LEADER_NOT_AVAILABLE警告。

    [oper@worker1 kafka_2.13-3.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    >123
    12[2021-03-05 14:00:23,347] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    3[2021-03-05 14:00:23,479] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    
    [2021-03-05 14:00:23,589] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    >>123
    

    这是因为Broker端在创建完主题后,会显示通知Clients端LEADER_NOT_AVAILABLE异常。Clients端接收到异常后,就会主动去更新元数据,获取新创建的主题信息。



    然后启动一个消息消费端,从名为test的Topic上接收消息。

    [root@localhost kafka_2.13-3.4.0]# bin/kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092
    
    

    在这里插入图片描述



    其他消费模式

    指定消费进度

    上方消费者只能接收到我新发送的5和6这两条消息,之前发送的4条消息就没有接收到。

    如果想要消费之前发送的消息,可以通过添加--from-beginning参数指定。

    [root@worker1 kafka_2.13-3.4.0]# bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
    1
    2
    3
    4
    5
    6
    



    如果需要更精确的消费消息,甚至可以指定从哪一条消息开始消费。

    这表示从第0号Partition上的第5条消息开始读起。

    bin/kafka-console-consumer.sh --topic test --partition 0 --offset 4 --bootstrap-server localhost:9092
    5
    6
    

    topic是逻辑上的概念,一个topic可以有多个partition,消息真正是保存在partition中的,可以理解为partition就是真实的queue。

    消息生成者就只管指定某个topic,往topic发送消息即可,kafka内部会把消息分发到该topic内的partition中,如果存在多个partition就会有相应的负载均衡策略



    分组消费

    一个partition中的消息,只会被一个消费者组中的某一个消费者消费。和RocketMQ一样。不同的消费者组可以对消息有不同的处理逻辑,而同一个消费者组的目的是减少消息消费压力,做水平扩容的

    kafka-console-consumer.sh脚本中,可以通过--consumer-property group.id=testGroup来指定所属的消费者组

    # --consumer-property可以指定多个key-value  其中group.id只是其中一个
    kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
    



    我们现在可以启动三个消费者,来验证一下分组消费机制:

    # 同一个消费者组
    kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
    kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
    # 另一个消费者组
    kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup2 --bootstrap-server localhost:9092
    

    在这里插入图片描述



    查看消费者组的偏移量

    接下来,还可以使用kafka-consumer-groups.sh观测消费者组的情况。包括他们的消费进度。

    # 添加-group 和 --describe两个参数
    [root@worker1 ~]# kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092
    
    Consumer group 'testGroup' has no active members. # 当前消费者组无消费者,这是因为我此时停止了所有的消费者进程
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG         CONSUMER-ID     HOST            CLIENT-ID
    testGroup       test            0          8               8               0            -               -               -
    
    • GROUP表示哪一个消费者组
    • TOPIC是topic名
    • PARTITION 一个topic可以存在多个partition,则标识是第0号partition
    • CURRENT-OFFSET表示当前消费者组消费到了topic为test、第0号partition的消费消息offset偏移量
    • LOG-END-OFFSET 表示生产者往当前partition中生产的消息总数
    • LAG 表示 还未消费的数量

    比如我此时使用消息生产者再发送几条消息后再查看

    # 生产三条消息
    [root@worker1 ~]# kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
    >1  
    >2
    >3
    >
    
    # 此时 LOG-END-OFFSET就变为了11   而LAG就变为了3
    [root@worker1 ~]# kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092
    
    Consumer group 'testGroup' has no active members.
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG         CONSUMER-ID     HOST            CLIENT-ID
    testGroup       test            0          8               11              3               -               -               -
    

    虽然业务上是通过Topic来分发消息的,但是实际上,消息是保存在Partition这样一个数据结构上的。



    理解Kakfa的消息传递机制

    kafka的消息生产者和消息消费者是通过topic这样一个逻辑概念来进行业务沟通的,单实际上所有的消息是存在Partition这样一个数据结构中的

    在这里插入图片描述



    概念:

    • 客户端client:消息生产者和消息消费者都是Kafka的Client

    • 消费者组:每个消费者可以指定一个所属的消费者组,每一条消息会被多个感兴趣的消费者组消费,但一条消息只会被一个消费者组中的一个消费者消费。

    • 服务端Broker:一个Kafka服务就是一个Broker

    • 话题Topic:逻辑概念,一个Topic被认为是业务含义相同的一组消息。客户端通过绑定Topic来生产或消费自己感兴趣的话题

    • 分区Partition:Topic是逻辑概念,而Partition是实际存储消息的组件。每一个Partition就是一个queue。



    集群服务

    为什么要使用集群

    • 创建多个partition,多个partition分布在不同的broker上,减少一个broker的压力
    • 为每个partition准备多个备份,保证数据不丢失。多个partition会尽可能的分布在不同的broker上。多个partition是通过zookeeper来选举出一个Leader 处理Client的读写请求,其他备份partition就是Follower角色,只做数据备份和参与重新选举Leader

    集群环境下就不会再使用kafka自带的zookeeper了,会单独部署zookeeper服务。

    在这里插入图片描述



    部署Zookeeper集群

    zookeeper是使用的zab协议,选举需要半数以上的同意,所以部署的节点数需要单数。

    [root@worker1 ~]# tar -zxf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local/soft/zookeeper/
    [root@worker1 ~]# cd /usr/local/soft/zookeeper/apache-zookeeper-3.6.2-bin
    

    然后进入conf目录,修改配置文件。在conf目录中,提供了一个zoo_sample.cfg文件,这是一个示例文件。我们只需要将这个文件复制一份zoo.cfg,修改下其中的关键配置就可以了。其中比较关键的修改参数如下:

    [root@worker1 apache-zookeeper-3.6.2-bin]# cd conf/
    [root@worker1 conf]# cp zoo_sample.cfg zoo.cfg
    [root@worker1 conf]# vi zoo.cfg 
    
    #Zookeeper的本地数据目录,默认是/tmp/zookeeper。这是Linux的临时目录,随时会被删掉。
    dataDir=/app/zookeeper/data
    #Zookeeper的服务端口
    clientPort=2181
    #集群节点配置  2888集群内部数据传输   3888集群内部进行选举
    server.1=192.168.75.61:2888:3888
    server.2=192.168.75.62:2888:3888
    server.3=192.168.75.63:2888:3888
    

    其中,clientPort 2181是对客户端开放的服务端口。

    集群配置部分, server.x这个x就是节点在集群中的myid。后面的2888端口是集群内部数据传输使用的端口。3888是集群内部进行选举使用的端口。



    启动服务

    # --config 指定config目录,它会自己去读取目录下的zoo.cfg文件
    [root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh --config conf start 
    ZooKeeper JMX enabled by default
    Using config: conf/zoo.cfg
    Starting zookeeper ... FAILED TO START
    
    

    发现启动失败了,查看vi ./logs/zookeeper-root-server-worker1.out保存信息,提示是没有myid文件

    在这里插入图片描述



    因为我们上方的配置文件中dataDir=/app/zookeeper/data,那么我们就在这个目录下创建一个myid文件,并指定一个集群环境下唯一id

    # 在这个文件中输入一个1   其他节点就输入2  3  保证各节点不重复即可
    vi /app/zookeeper/data/myid
    1
    

    再启动服务

    [root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh --config conf start 
    ZooKeeper JMX enabled by default
    Using config: conf/zoo.cfg
    Starting zookeeper ... STARTED
    



    启动完成后,使用jps指令可以看到一个QuorumPeerMain进程就表示服务启动成功。

    [root@worker1 apache-zookeeper-3.6.2-bin]# jps
    8438 Jps
    8395 QuorumPeerMain
    
    



    三台机器都启动完成后,可以查看下集群状态。

    这其中Mode 为leader就是主节点,follower就是从节点。

    [root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /app/zookeeper/zookeeper-3.5.8/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost.
    Mode: leader   
    



    部署Kafka集群

    kafka服务并不需要进行选举,因此也没有奇数台服务的建议。每台服务器上都执行下面的命令

    解压kafka

    [root@localhost ~]# mkdir /usr/local/soft/kafka/
    
    [root@localhost ~]# tar -zxvf kafka_2.13-3.4.0.tgz -C /usr/local/soft/kafka/
    
    # 最好是设置一下环境变量
    [root@worker1 ~]# vi ./.bash_profile 
    export KAFKA_HOME=/usr/local/soft/kafka/kafka_2.13-3.4.0
    export PATH=$PATH:$KAFKA_HOME/bin
    [root@worker1 ~]# source ./.bash_profile
    
    [root@localhost ~]# cd /usr/local/soft/kafka/kafka_2.13-3.4.0
    



    然后进入config目录,修改server.properties。这个配置文件里面的配置项非常多,下面列出几个要重点关注的配置。

    [root@worker1 kafka_2.13-3.4.0]# mkdir -p /app/kafka/logs
    [root@worker1 kafka_2.13-3.4.0]# vi config/server.properties
    #broker 的全局唯一编号,不能重复,只能是数字。 所以一个集群下的kafka实例就需要使用不同的id
    broker.id=1
    #数据文件地址。同样默认是给的/tmp目录。
    log.dirs=/app/kafka/logs
    # 默认的每个Topic的分区数1,可以修改,我这里就修改为了2
    num.partitions=2
    #zookeeper的服务地址,因为我配置了域名映射,所以这里就使用使用的worker1 worker2 worker3
    zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
    #可以选择指定zookeeper上的基础节点。
    #zookeeper.connect=worker1:2181,worker2:2181,worker3:2181/kafka
    

    broker.id需要每个服务器上不一样,分发到其他服务器上时,要注意修改一下。

    多个Kafka服务注册到同一个zookeeper集群上的节点,会自动组成集群。

    一些核心的配置内如下所示

    PropertyDefaultDescription
    broker.id0broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为一的即可。
    log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
    listenersPLAINTEXT://127.0.0.1:9092server接受客户端连接的端口,ip配置kafka本机ip即可
    zookeeper.connectlocalhost:2181zookeeper连接地址。hostname:port。如果是Zookeeper集群,用逗号连接。
    log.retention.hours168每个日志文件删除之前保存的时间。
    num.partitions1创建topic的默认分区数
    default.replication.factor1自动创建topic的默认副本数量
    min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
    delete.topic.enablefalse是否允许删除主题



    接下来就可以启动kafka服务了。启动服务时需要指定配置文件。

    # -daemon表示后台启动
    [root@worker1 kafka_2.13-3.4.0]# bin/kafka-server-start.sh -daemon config/server.properties
    [root@worker1 kafka_2.13-3.4.0]# jps
    8819 Kafka
    8884 Jps
    8395 QuorumPeerMain
    



    理解服务端的Topic、Partition和Broker

    创建一个topic,指定该topic的partition数量为4,每组partition的副本集个数为2,

    这里的副本是指一个Leader一个Follower,并不是指一个Leader两个Follower

    # --partitions 参数表示分区数   --replication-factor 表示每个分区的副本集个数
    kafka-topics.sh --create --topic disTopic --partitions 4 --replication-factor 2 --bootstrap-server worker1:9092
    



    列出所有的topic

    [root@worker1 ~]# kafka-topics.sh --list --bootstrap-server worker1:9092
    disTopic
    



    查看列表情况

    # --describe查看Topic信息。
    [root@worker1 ~]# kafka-topics.sh --describe --topic disTopic --bootstrap-server worker1:9092
    Topic: disTopic	TopicId: rDUdZBO7RH2GNPgdRXk7Tw	PartitionCount: 4	ReplicationFactor: 2	Configs: 
    	Topic: disTopic	Partition: 0	Leader: 3	Replicas: 3,1	Isr: 3,1
    	Topic: disTopic	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2
    	Topic: disTopic	Partition: 2	Leader: 2	Replicas: 2,3	Isr: 2,3
    	Topic: disTopic	Partition: 3	Leader: 3	Replicas: 3,2	Isr: 3,2
    	
    	
    # 这里有4个partition   就拿partition0来举例,其中partition0这个副本集中的Leader在BrokerId为3的机器上,其中两个副本分别在brokerId1 BrokerId3上
    
    • Partiton参数列出了四个partition,后面带有分区编号,用来标识这些分区。

    • Leader表示这一组partition的Leader节点是哪一个,这个后面的数字是启动kafka服务时配置文件中指定的broker.id数

      kafka的每一组partition都会选举一个Leader,Leader会处理Client的读写请求,但实际上Leader可能会让某个副本去响应Client的请求。

    • Replicas表示这个Partition的复制集是分配到哪些Broker上的。但是Replicas列出的只是一个逻辑上的分配情况,并不关心数据实际上是不是按照这个分配,甚至有些节点服务挂了,这一列也还会显示

    • Isr表示Partition的实际分配情况,它是Replicas列的一个子集,只列出当前存活,能正常同步数据Broker节点



    我们在配置文件中指定的log.dirs=/app/kafka/logs 配置

    在这里插入图片描述



    一个Broker上的一个partition就对应着一个目录,而这个Partition上的所有消息,就保存在这个对应的目录当中。

    在这里插入图片描述



    在Kafka中,Topic是逻辑上的概念,数据实际上是保存在Topic下的Partition中的,Partition就是数据存储的物理单元,而Broker是Partition的物理载体,这些Partition会尽量均匀的分配在不同的Broker上,避免一个Broker宕机整个Partition副本集都不能用的情况发生

    而消费者消费消息的offset其实就是每个消息在partition上的偏移量



    总结

    在这里插入图片描述

    1. Topic是逻辑上的概念,producer和consumer通过Topic进行业务沟通
    2. Topic并不存储数据,数据是保存在Topic下的多组Partition中的,消息会尽量平均的分发在各组Partition中,每组Partition保存了Topic下的一部分消息
    3. 每组Partition包含一个Leader和多个Follower,每组Partition的个数成为备份因子replica factor
    4. producer将消息发送到Partition上,consumer通过partition上的offset记录自己所属组在当前partition上消费消息的消费进度
    5. producer将消息发送到Topic,kafka会推送给所有订阅了该topic的消费者组进行处理。但是每个消费者组内部只会有一个消费者实例处理这一条消息
    6. kafka的Broker通过zookeeper组成集群,然后在这些Broker中,需要选举产生一个担任Controller角色的Broker。这个Controller的主要任务就是负责Topic的分配以及后续管理工作。这也是通过ZooKeeper选举的。



    Kraft集群

    在Kafka的config目录下,提供了一个kraft的文件夹,在这里面就是Kraft协议的参考配置文件。在这个文件夹中有三个配置文件,broker.properties,controller.properties,server.properties,分别给出了Kraft中三种不同角色的示例配置。

    • broker.properties: 数据节点
    • controller.properties: Controller控制节点
    • server.properties: 即可以是数据节点,又可以是Controller控制节点。

    这里同样列出几个比较关键的配置项,按照自己的环境进行定制即可。

    [root@worker1 kafka_2.13-3.4.0]# vi config/kraft/server.properties
    
    #配置当前节点的角色。Controller相当于Zookeeper的功能,负责集群管理。Broker提供具体的消息转发服务。
    process.roles=broker,controller
    #配置当前节点的id。与普通集群一样,要求集群内每个节点的ID不能重复。
    node.id=1
    #配置集群的投票节点。其中@前面的是node.id的值,后面是节点的地址和端口,这个端口跟客户端访问的端口是不一样的。通常将集群内的所有Controllor节点都配置进去。
    controller.quorum.voters=1@192.168.75.61:9093,2@192.168.75.62:9093,3@192.168.75.63:9093
    #Broker对客户端暴露的服务地址。基于PLAINTEXT协议。一般写本机ip
    advertised.listeners=PLAINTEXT://worker1:9092
    #Controller服务协议的别名。默认就是CONTROLLER
    controller.listener.names=CONTROLLER
    #配置监听服务。不同的服务可以绑定不同的接口。这种配置方式在端口前面是省略了一个主机IP的,主机IP默认是使用的java.net.InetAddress.getCanonicalHostName()
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093
    #数据文件地址。默认配置在/tmp目录下。
    log.dirs=/app/kafka/kraft-log
    #topic默认的partition分区数。
    num.partitions=2
    



    将配置文件分发,并修改每个服务器上的node.idcontroller.quorum.voterslog.dirsnum.partitions属性。

    由于Kafka的Kraft集群对数据格式有另外的要求,所以在启动Kraft集群前,还需要对日志目录进行格式化。

    # 先使用kafka提供的脚本生成一个uuid
    [oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh random-uuid
    NT1Y5KgOTQ63SPppgsfhqA
    
    # -t 集群ID,三个服务器上使用同一个集群ID。 加上上方的uuid  -c 指定配置文件
    [oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh format -t NT1Y5KgOTQ63SPppgsfhqA -c config/kraft/server.properties 
    




    在这里插入图片描述



    接下来就可以指定配置文件,启动Kafka的服务了。 例如,在Worker1上,启动Broker和Controller服务。

    [root@worker1 kafka_2.13-3.4.0]# bin/kafka-server-start.sh -daemon config/kraft/server.properties 
    [root@worker1 kafka_2.13-3.4.0]# jps
    12960 Kafka
    13028 Jps
    

    等三个服务都启动完成后,就可以像普通集群一样去创建Topic,并维护Topic的信息了。



    相关概念

    topic是逻辑上的概念,一个topic可以有多个partition,消息真正是保存在partition中的,可以理解为partition就是真实的queue。

    消息生成者就只管指定某个topic,往topic发送消息即可,kafka内部会把消息分发到该topic内的partition中,如果存在多个partition就会有相应的负载均衡策略



    一个partition中的消息,只会被一个消费者组中的某一个消费者消费,和RocketMQ一样。不同的消费者组可以对消息有不同的处理逻辑,而同一个消费者组的目的是减少消息消费压力,做水平扩容的



    消息副本,每个消费者组都有一个消息副本,而一个消费者组下的多个消费者共用一个消息副本,也就是一条消息只能被一个消费者组下的一个消费者消费。



    概念:

    • 客户端client:消息生产者和消息消费者都是Kafka的Client

    • 消费者组:每个消费者可以指定一个所属的消费者组,每一条消息会被多个感兴趣的消费者组消费,但一条消息只会被一个消费者组中的一个消费者消费。

    • 服务端Broker:一个Kafka服务就是一个Broker

    • 话题Topic:逻辑概念,一个Topic被认为是业务含义相同的一组消息。客户端通过绑定Topic来生产或消费自己感兴趣的话题

    • 分区Partition:Topic是逻辑概念,而Partition是实际存储消息的组件。每一个Partition就是一个queue。

    在Kafka中,Topic是逻辑上的概念,数据实际上是保存在Topic下的Partition中的,Partition就是数据存储的物理单元,而Broker是Partition的物理载体,这些Partition会尽量均匀的分配在不同的Broker上,避免一个Broker宕机整个Partition副本集都不能用的情况发生

    而消费者消费消息的offset其实就是每个消息在partition上的偏移量

  • 相关阅读:
    C语言为什么不支持函数重载?C和C++程序怎样互调?
    FPGA工程师面试——基础概念问题整理
    Kafka 为什么那么快?
    QPainter、QPen 、QBrush(概念)
    sql跨表查询的三种方案
    【Python入门】Numpy基础
    Ceph入门到精通-Macvlan网络模式
    批量采集的时间管理与优化
    深入理解FFmpeg--libavformat接口使用(一)
    【IP SSL】内网IP SSL证书Nginx部署
  • 原文地址:https://blog.csdn.net/qq_44027353/article/details/140993096