• Kafka概述及使用


    Kafka概述及使用

    Apache开发的,由Scala和Java编写的分布式消息发布订阅系统

    1 Kafka使用场景及基本术语

    1. Kafka使用场景:
    • 日志收集
    • 消息系统
    • 用户活动跟踪
    • 运营指标
    1. 基本消息术语

    kafka借鉴了JMS的思想,但是没有完全遵循
    JMS(Java Messaging Service)

    名称说明
    Broker(中间人)消息中间件处理节点,⼀个Kafka节点就是⼀个broker,⼀个或者多个Broker可以组成⼀个Kafka集群
    TopicKafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需要指定⼀个topic
    Producer消息生产者,想Broker发送消息的客户端
    Consumer消息消费者,从Broker读取消息的客户端
    ConsumerGroup每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
    Partition物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的

    从较高的层次上看,producer通过网络发送消息到kafka集群,然后consumer来进行消费。
    服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。

    2 Linux搭建Kafka环境

    启动kafka需要jdk环境,如果没有的,可以通过yum安装配置

    2.1 下载安装并启动zk(使用自带也行)

    kafka中默认也带zk,所以可以使用kafka中自带的
    到kafka的bin目录下

    启动zk(kafka中自带的):

    sh zookeeper-server-start.sh ../config/zookeeper.properties
    
    • 1

    启动kafka:

    sh kafka-server-start.sh ../config/server.properties
    
    • 1

    如果因为之前使用了别的zk注册kafka,那么就到kafka配置的日志文件中【配置文件路径在config/server.properties】,找到日志存放路径
    在这里插入图片描述
    然后进入到该路径,删除该路径下的所有日志文件
    rm -rf kafka-logs/删除所有日志,重新启动

    • 下载安装

    首先,要使用kafka是需要有zookeeper环境的,我们首先需要到官网去下载zookeeper
    官网地址:https://zookeeper.apache.org/releases.html

    此处我使用的是3.5.7版本
    通过Xftp将windows上的zookeeper推送到linux上,然后再/usr/local下创建zookeeper目录,在该目录下,通过tar -zxvf 压缩包名,解压

    • 启动zookeeper

    在bin目录下,通过sh zookeeper.sh启动zookeeper

    2.2 安装并启动kafka

    官⽹下载kafka的压缩包:http://kafka.apache.org/downloads

    1. 解压缩⾄如下路径
    /usr/local/kafka/ 
    
    • 1
    1. 修改配置⽂件:/usr/local/kafka/kafka2.11-2.4/config/server.properties
    #broker.id属性在kafka集群中必须要是唯⼀
    broker.id=0
    #kafka部署的机器ip和提供服务的端⼝号
    listeners=PLAINTEXT://192.168.145.13:9092
    #kafka的消息存储⽂件
    log.dir=/usr/local/data/kafka-logs
    #kafka连接zookeeper的地址
    zookeeper.connect=localhost:2181
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 进入bin目录启动kafka
    ./kafka-server-start.sh -daemon ../config/server.properties 
    
    • 1

    验证是否启动成功:
    启动zk客户端,进⼊到zk中的节点看id是0的broker有没有存在(上线),

    ls /brokers/ids/
    
    • 1

    结果:
    在这里插入图片描述
    kafka成功在zookeeper上注册。

    3 Kafka相关功能

    3.1 server.properties核心配置详解

    PropertyDefaultDescription
    broker.id0每个broker通过非负整数id进行标识,需要唯一
    log.dirs/temp/kafka-logskafka存放数据的路径,路径不唯一,可以是多个,路径间用逗号分隔
    listenersPLAINTEXT://192.168.145.13:9092server接收客户端连接的端口,ip配置为kafka本机ip
    zookeeper.connectlocalhost:2181zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接⽅式为hostname1:port1, hostname2:port2,hostname3:port3
    log.retention.hours168日志保存时间
    num.partitions1创建topic的默认分区数
    default.replication.factor1自动创建topic的默认副本数量,建议设置为大于等于2
    min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定replicas的最⼩数⽬(必须确认每⼀个repica的写数据都是成功的),如果这个数⽬没有达到,producer发送消息会产⽣异常
    delete.topic.enablefalse是否允许删除主题

    3.2 kafka创建主题,发送、消费消息

    topic是什么概念?topic可以实现消息的分类,不同消费者订阅不同的topic。

    在这里插入图片描述

    • 创建主题

    执⾏以下命令创建名为“test”的topic,这个topic只有⼀个partition,并且备份因⼦也设置为1
    在kafka的bin目录下执行:

    ./kafka-topics.sh --create --zookeeper 192.168.145.13:2181 --replication-factor 1 --partitions 1 --topic test
    
    • 1

    ip地址为zookeeper地址
    查看当前kafka内有哪些topic,在kafka的bin目录下执行

    ./kafka-topics.sh --list --zookeeper 192.168.145.13:2181
    
    • 1

    在这里插入图片描述

    • 发送消息

    kafka⾃带了⼀个producer命令客户端,可以从本地⽂件中读取内容,或者我们也可以以命令⾏中直接输⼊内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每⼀个⾏会被当做成⼀个独⽴的消息。使⽤kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

    ./kafka-console-producer.sh --broker-list 192.168.145.13:9092 --topic test
    
    • 1

    ip为kafka地址,指定topic为test
    在这里插入图片描述

    • 消费消息

    对于consumer,kafka同样也携带了⼀个命令⾏客户端,会将获取到内容在命令中进⾏输出,默认是消费最新的消息。使⽤kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息

    方式一:从最后一条消息的偏移量+1开始消费

    ./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092 --topic test
    
    • 1

    在这里插入图片描述

    方式二:从头开始消费

    ./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092 --from-beginning 
      --topic test
    
    • 1
    • 2

    在这里插入图片描述

    注意:

    1. 消息会被存储
    2. 消息是顺序存储
    3. 消息是有偏移量的
    4. 消费时可以指明偏移量进行消费

    3.3 单播消息、多播消息、消费者组

    消息的发送⽅会把消息发送到broker中,broker会存储消息,消息是按照发送的顺序进⾏存
    储。因此消费者在消费消息时可以指明主题中消息的偏移量。默认情况下,是从最后⼀个消
    息的下⼀个偏移量开始消费。

    ①单播消息的实现
    单播消息:一个消费者组里只有一个消费者能消费到某个topic中的消息。一个消费者组中有多个消费者,但是只有一个消费者可以消费到消息。
    producer:

    ./kafka-console-producer.sh --broker-list 192.168.145.13:9092 --topic test
    
    • 1

    多个consumer开两个会话就行

    consumer1:

    ./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092 
     --consumer-property group.id=testGroup --topic test
    
    • 1
    • 2

    consumer2:

    ./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092 
     --consumer-property group.id=testGroup --topic test
    
    • 1
    • 2

    ②多播消息的实现
    在一些业务场景中需要让一条消息被多个消费者消费,那么就可以使用多播模式。kafka实现多播,只需要让不同的消费者处于不同的消费者组即可。

    testGroup1中的consumer:

     ./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092 
      --consumer-property group.id=testGroup1 --topic test
    
    • 1
    • 2

    testGroup2中的consumer:

    ./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092 
     --consumer-property group.id=testGroup2 --topic test
    
    • 1
    • 2

    3.4 查看消费者组及信息

    # 查看当前主题下有哪些消费者组
    ./kafka-consumer-groups.sh --bootstrap-server 192.168.145.13:9092 --list
    
    # 查看消费者组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
    ./kafka-consumer-groups.sh --bootstrap-server 192.168.145.13:9092 --describe
     --group testGroup
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述
    在这里插入图片描述

    • Current-offset:当前消费组的已消费偏移量
    • Log-end-offset:主题对应分区消息的结束偏移量(HW)
    • Log:当前消费组未消费的消费数

    3.4 主题、分区概念

    1. 主题Topic
      主题Topic可以理解为一个类别的名称

    2. partition分区
      anatomy:剖析
      在这里插入图片描述
      一个主题中的消息量是非常大的,因此可以通过分区的设置,来分布式存储这些消息。比如一个topic创建了3个分区。那么topic中的消息就会分别存在这三个分区中【一个国家,分为多个省份】

    为一个主题创建多个分区

    ./kafka-topics.sh --zookeeper localhost:2181  --create --replication-factor 1 --partitions 2  --topic test1
    
    • 1
    • replication-factor 2 :指定2个副本
    • partitions 2 :指定2个分区
    • topic test1 :指定主题名字为test1
      查询topic的分区信息
    ./kafka-topics.sh --describe --zookeeper localhost:2181
    
    • 1

    在这里插入图片描述

    分区的作用:

    • 可以分布式存储
    • 可以并行写

    4 搭建kafka集群

    1. 复制配置文件
      到配置文件路径kafka/cofig/下
    cp server.properties server1.properties
    cp server.properties server2.properties
    
    • 1
    • 2
    1. 修改复制好的配置文件

    server1.proerties

    broker.id=1
    listeners=PLAINTEXT://192.168.145.13:9093
    log.dir=/usr/local/data/kafka-logs-1
    
    • 1
    • 2
    • 3

    server2.properties

    broker.id=2
    listeners=PLAINTEXT://192.168.145.13:9094
    log.dir=/usr/local/data/kafka-logs-2
    
    • 1
    • 2
    • 3
    1. 启动另外两台kafka
    ./kafka-server-start.sh -daemon ../config/server1.properties
    
    • 1
    ./kafka-server-start.sh -daemon ../config/server2.properties
    
    • 1

    进入kafka自带zk的cli

    来到kafka的bin目录下:

    sh zookeeper-shell.sh 本机IP:2181
    
    • 1

    在这里插入图片描述
    通过ls /brokers/ids/查看当前节点下的信息:
    在这里插入图片描述

    可以看到成功注册上了,至此集群搭建完成

    4.1 副本概念

    副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker(kafka节点)上。
    例子:创建1个主题,2个分区,3个副本。

    ./kafka-topics.sh --create --zookeeper 192.168.145.13:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
    
    • 1

    通过查看主题信息,来获取其中关键数据,此处指定查看my-replicated-topic主题内容

    ./kafka-topics.sh --zookeeper localhost:2181 --topic my-replicated-topic --describe
    
    • 1

    在这里插入图片描述

    分析图:

    在这里插入图片描述

    • replicas:当前副本存在的broker节点
    • leader:partition中都有一个broker作为leader

    每个partition都有一个broker作为leader。
    消息发送方要把消息发给哪个broker?就看副本的leader是在哪个broker上。
    副本里的leader专门用来接收消息。接收到消息,其他follower通过poll的方式来同步数据。

    • follower:leader处理所有针对这个partition的读写请求,而follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果leader所在的broker挂掉,那么就会进行新leader的选举。

    通过kill掉leader后再查看主题情况

    # kill掉header
    ps -aux | grep server.properties
    kill -9 11968
    
    # 重新查看topic情况
    ./kafka-topics.sh --describe --zookeeper 192.168.145.13:2181 --topic my-replicated-topic
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    • isr:可以同步的broker节点和已同步的broker节点,放在isr集合中

    4.2 kafka集群消息的发送、消费

    • kafka集群由多个broker组成
    • 1个broker中存放1个topic的不同partition——副本
      在这里插入图片描述
    1. Kafka集群消息的发送
    ./kafka-console-producer.sh --broker-list 192.168.145.13:9092,192.168.145.13:9093,192.168.145.13:9094 --topic my-replicated-topic
    
    • 1
    1. Kafka集群消费的消费
    ./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092,192.168.145.13:9093,192.168.145.13:9094 --from-beginning --topic my-replicated-topic
    
    • 1
    1. Kafka分区消费组中消费者的细节
      在这里插入图片描述
      图中Kafka集群中有两个Broker,每个Broker中有多个partition。一个partition只能被一个消费者组里的某一个消费者消费,从而保证消费顺序。Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。一个消费者可以消费多个partition。

    消费组中消费者的的数量不能比一个topic中的partition数量多,否则多出来的消费者消费不到消息。

  • 相关阅读:
    技术分享 | 静态扫描体系集成
    cuMemcpyHtoDAsync failed: invalid argument
    MATLAB从0开始搭建简单的GUI界面
    React Native快速上手
    Smart Link 和 Monitor Link应用
    Git学习笔记 - Idea集成GitHub、Gitee
    打造一个投资组合管理的金融强化学习环境
    [附源码]Python计算机毕业设计高校第二课堂管理系统
    网页设计期末作业 使用HTML实现一个静态页面(含源码)
    京东数据分析:2023年9月京东笔记本电脑行业品牌销售排行榜
  • 原文地址:https://blog.csdn.net/weixin_45565886/article/details/125883056