• 【详细教程】Kafka应用场景、基础组件、架构探索


    file

    1、应用场景

    1.1 kafka场景

    ​ Kafka最初是由LinkedIn公司采用Scala语言开发,基于ZooKeeper,现在已经捐献给了Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以 高吞吐、可持久化、可水平扩展、支持流处理等多种特性而被广泛应用。

    Apache Kafka能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中,Kafka都有广泛的应用。

    (1)日志收集:收集各种服务的log,通过kafka以统一接口服务的方式开放 给各种consumer,例如Hadoop、Hbase、Solr等;

    (2)消息系统:解耦和生产者和消费者、缓存消息等;

    (3)用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点 击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时 的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;

    (4)运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作 的集中反馈,比如报警和报告;

    (5)流式处理:比如spark streaming和storm;

    1.2 kafka特性

    kafka以高吞吐量著称,主要有以下特性:

    (1)高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;

    (2)可扩展性:kafka集群支持热扩展;

    (3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;

    (4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);

    (5)高并发:支持数千个客户端同时读写;

    1.3 消息对比

    • 如果普通的业务消息解耦,消息传输,rabbitMq是首选,它足够简单,管理方便,性能够用。
    • 如果在上述,日志、消息收集、访问记录等高吞吐,实时性场景下,推荐kafka,它基于分布式,扩容便捷
    • 如果很重的业务,要做到极高的可靠性,考虑rocketMq,但是它太重。需要你有足够的了解

    1.4 大厂应用

    • 京东通过kafka搭建数据平台,用于用户购买、浏览等行为的分析。成功抗住6.18的流量洪峰
    • 阿里借鉴kafka的理念,推出自己的rocketmq。在设计上参考了kafka的架构体系

    2、基础组件

    2.1 角色

    file

    • broker:节点,就是你看到的机器

    • provider:生产者,发消息的

    • consumer:消费者,读消息的

    • zookeeper:信息中心,记录kafka的各种信息的地方

    • controller:其中的一个broker,作为leader身份来负责管理整个集群。如果挂掉,借助zk重新选主

    2.2 逻辑组件

    file

    • topic:主题,一个消息的通道,收发总得知道消息往哪投

    • partition:分区,每个主题可以有多个分区分担数据的传递,多条路并行,吞吐量大

    • Replicas:副本,每个分区可以设置多个副本,副本之间数据一致。相当于备份,有备胎更可靠

    • leader & follower:主从,上面的这些副本里有1个身份为leader,其他的为follower。leader处理partition的所有读写请求

    2.3 副本集合

    • AR:所有副本的统称,AR=ISR+OSR

    • ISR:同步中的副本,可以参与leader选主。一旦落后太多(数量滞后和时间滞后两个维度)会被踢到OSR。

    • OSR:踢出同步的副本,一直追赶leader,追上后会进入ISR

    2.4 消息标记

    file

    • offset:偏移量,消息消费到哪一条了?每个消费者都有自己的偏移量
    • HW:(high watermark):副本的高水印值,客户端最多能消费到的位置,HW值为8,代表offset为[0,8]的9条消息都可以被消费到,它们是对消费者可见的,而[9,12]这4条消息由于未提交,对消费者是不可见的。
    • LEO:(log end offset):日志末端位移,代表日志文件中下一条待写入消息的offset,这个offset上实际是没有消息的。不管是leader副本还是follower副本,都有这个值。

    那么这三者有什么关系呢?

    比如在副本数等于3的情况下,消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW一般会小于LEO,即LEO>=HW。

    具体的同步原理,下面章节会详细讲到

    3.1 发展历程

    http://kafka.apache.org/downloads

    file

    3.1.1 版本命名

    Kafka在1.0.0版本前的命名规则是4位,比如0.8.2.1,0.8是大版本号,2是小版本号,1表示打过1个补丁

    现在的版本号命名规则是3位,格式是“大版本号”+“小版本号”+“修订补丁数”,比如2.5.0,前面的2代表的是大版本号,中间的5代表的是小版本号,0表示没有打过补丁

    我们所看到的下载包,前面是scala编译器的版本,后面才是真正的kafka版本。

    3.1.2 演进历史

    0.7版本
    只提供了最基础的消息队列功能。

    0.8版本
    引入了副本机制,至此Kafka成为了一个真正意义上完备的分布式高可靠消息队列解决方案。

    0.9版本
    增加权限和认证,使用Java重写了新的consumer API,Kafka Connect功能;不建议使用consumer API;

    0.10版本
    引入Kafka Streams功能,正式升级成分布式流处理平台;建议版本0.10.2.2;建议使用新版consumer API

    0.11版本
    producer API幂等,事务API,消息格式重构;建议版本0.11.0.3;谨慎对待消息格式变化

    1.0和2.0版本
    Kafka Streams改进;建议版本2.0;

    3.2 集群搭建(助学)

    1)原生启动

    kafka启动需要zookeeper,第一步启动zk:

    docker run --name zookeeper-1 -d -p 2181 zookeeper:3.4.13
    
    • 1

    原生安装:下载后解压启动即可 http://kafka.apache.org/downloads

    bin/kafka-server-start.sh config/server.properties
    
    • 1
    #server.properties配置说明
    #表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
    broker.id=0 
    #brokder对外提供的服务入口地址,默认9092
    listeners=PLAINTEXT://:9092 
    #设置存放消息日志文件的地址
    log.dirs=/tmp/kafka/log 
    #Kafka所需Zookeeper集群地址,这里是关键!加入同一个zk的kafka为同一集群
    zookeeper.connect=zookeeper:2181 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2)推荐docker-compose 一键启动

    #参考资料中的kafka.yml
    #注意hostname问题,ip地址:52.82.98.209,换成你自己服务器的
    #docker-compose -f kafka.yml up -d 启动
    version: '3'
    services:
        zookeeper:
            image: zookeeper:3.4.13
    
        kafka-1:
            container_name: kafka-1
            image: wurstmeister/kafka:2.12-2.2.2
            ports:
                - 10903:9092
            environment:
                KAFKA_BROKER_ID: 1 
                HOST_IP: 52.82.98.209
                KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
                #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
                KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
                KAFKA_ADVERTISED_PORT: 10903 
            volumes:
                - /etc/localtime:/etc/localtime
            depends_on:
                - zookeeper           
        kafka-2:
            container_name: kafka-2
            image: wurstmeister/kafka:2.12-2.2.2
            ports:
                - 10904:9092
            environment:
                KAFKA_BROKER_ID: 2 
                HOST_IP: 52.82.98.209
                KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
                KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
                KAFKA_ADVERTISED_PORT: 10904 
            volumes:
                - /etc/localtime:/etc/localtime
            depends_on:
                - zookeeper 
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    3.3 组件探秘

    命令行工具是管理kafka集群最直接的工具。官方自带,不需要额外安装。

    3.2.1 主题创建

    #进入容器
    docker exec -it kafka-1 sh
    #进入bin目录
    cd /opt/kafka/bin
    #创建
    kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test --partitions 2 --replication-factor 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.2.2 查看主题

    kafka-topics.sh --zookeeper zookeeper:2181 --list
    
    • 1

    3.2.3 主题详情

    kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test
    
    #分析输出:
    Topic:test	PartitionCount:2	ReplicationFactor:1	Configs:
    	Topic: test	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
    	Topic: test	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.2.4 消息收发

    #使用docker连接任意集群中的一个容器
    docker exec -it kafka-1 sh
    
    #进入kafka的容器内目录
    cd /opt/kafka/bin
    
    #客户端监听
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
    
    #另起一个终端,验证发送
    ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.2.5 分组消费

    #启动两个consumer时,如果不指定group信息,消息被广播
    #指定相同的group,让多个消费者分工消费(画图:group原理)
    
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa
    
    #结果:在发送方,连续发送 1-4 ,4条消息,同一group下的两台consumer交替消费,并发执行
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    注意!!!

    这是在消费者和分区数相等(都是2)的情况下。
    如果同一group下的 ( 消费者数量 > 分区数量 ) 那么就会有消费者闲置。

    验证方式:

    可以再多启动几个消费者试一试,会发现,超出2个的时候,有的始终不会消费到消息。
    停掉可以消费到的,那么闲置的会被激活,进入工作状态

    3.2.6 指定分区

    #指定分区通过参数 --partition,注意!需要去掉上面的group
    #指定分区的意义在于,保障消息传输的顺序性(画图:kafka顺序性原理)
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 1
    
    #结果:发送1-4条消息,交替出现。说明消息被均分到各个分区中投递
    
    
    #默认的发送是没有指定key的
    #要指定分区发送,就需要定义key。那么相同的key被路由到同一个分区
    ./kafka-console-producer.sh --broker-list kafka-1:9092 --topic test --property parse.key=true
    
    #携带key再发送,注意key和value之间用tab分割
    >1	1111
    >1	2222
    >2	3333
    >2	4444
    
    #查看consumer的接收情况
    #结果:相同的key被同一个consumer消费掉
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.2.7 偏移量

    #偏移量决定了消息从哪开始消费,支持:开头,还是末尾
    
    # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    
    # 注意点!!!有提交偏移量的话,仍然以提交的为主,即便使用earliest,比提交点更早的也不会被提取
    
    #--offset [earliest|latest(默认)] , 或者 --from-beginning
    #新起一个终端,指定offset位置
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset earliest
    
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --from-beginning
    
    #结果:之前发送的消息,从头又消费了一遍!
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.4 zk探秘

    前面说过,zk存储了kafka集群的相关信息,本节来探索内部的秘密。

    kafka的信息记录在zk中,进入zk容器,查看相关节点和信息

    docker exec -it kafka_zookeeper_1 sh
    
    >./bin/zkCli.sh
    
    >ls /
    
    #结果:得到以下配置信息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    file

    3.4.1 broker信息

    [zk: localhost:2181(CONNECTED) 0] ls /brokers
    [ids, topics, seqid]
    [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
    [1, 2]
    
    #机器broker信息
    [zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://52.82.98.209:10903"],"jmx_port":-1,"host":"52.82.98.209","timestamp":"1609825245500","port":10903,"version":4}
    cZxid = 0x27
    ctime = Tue Jan 05 05:40:45 GMT 2021
    mZxid = 0x27
    mtime = Tue Jan 05 05:40:45 GMT 2021
    pZxid = 0x27
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x105a2db626b0000
    dataLength = 196
    numChildren = 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.4.2 主题与分区

    #分区节点路径
    [zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
    [test, __consumer_offsets]
    [zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test
    [partitions]
    [zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions
    [0, 1]
    [zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/test/partitions/0
    [state]
    
    #分区信息,leader所在的机器id,isr列表等
    [zk: localhost:2181(CONNECTED) 18] get /brokers/topics/test/partitions/0/state
    {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}
    cZxid = 0xb0
    ctime = Tue Jan 05 05:56:06 GMT 2021
    mZxid = 0xb0
    mtime = Tue Jan 05 05:56:06 GMT 2021
    pZxid = 0xb0
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 72
    numChildren = 0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    3.4.3 消费者与偏移量

    [zk: localhost:2181(CONNECTED) 15] ls /consumers
    []
    #空的???
    #那么,消费者以及它的偏移记在哪里呢???
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    kafka 消费者记录 group 的消费 偏移量 有两种方式 :

    1)kafka 自维护 (新)

    2)zookpeer 维护 (旧) ,已经逐渐被废弃

    查看方式:

    上面的消费用的是控制台工具,这个工具使用–bootstrap-server,不经过zk,也就不会记录到/consumers下。

    其消费者的offset会更新到一个kafka自带的topic【__consumer_offsets】下面

    #先起一个消费端,指定group
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa
    
    #使用控制台工具查看消费者及偏移量情况
    ./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --list
    KMOffsetCache-44acff134cad
    aaa
    
    #查看偏移量详情
    ./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group aaa
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    当前与LEO保持一致,说明消息都完整的被消费过

    file

    停掉consumer后,往provider中再发几条记录,offset开始滞后:

    file

    重新启动consumer,消费到最新的消息,同时再返回看偏移量,消息得到同步:

    file

    3.4.4 controller

    #当前集群中的主控节点是谁
    [zk: localhost:2181(CONNECTED) 17] get /controller
    {"version":1,"brokerid":1,"timestamp":"1609825245694"}
    cZxid = 0x2a
    ctime = Tue Jan 05 05:40:45 GMT 2021
    mZxid = 0x2a
    mtime = Tue Jan 05 05:40:45 GMT 2021
    pZxid = 0x2a
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x105a2db626b0000
    dataLength = 54
    numChildren = 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3.5 km

    3.5.1 启动

    kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源。提供可视化kafka集群操作

    官网:https://github.com/yahoo/kafka-manager/releases

    注意它的版本,docker社区的景象版本滞后于kafka,我们自己来打镜像。

    #Dockerfile
    FROM daocloud.io/library/java:openjdk-8u40-jdk
    ADD kafka-manager-2.0.0.2/ /opt/km2002/
    CMD ["/opt/km2002/bin/kafka-manager","-Dconfig.file=/opt/km2002/conf/application.conf"]
    
    #打包,注意将kafka-manager-2.0.0.2放到同一目录
    docker build -t km:2002 .
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    #启动:在上面的yml里,services节点下加一段
    #参考资料:km.yml
    #执行: docker-compose -f km.yml up -d
    		km:
            image: km:2002
            ports:
                - 10906:9000
            depends_on:
                - zookeeper
             
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3.5.2 使用

    使用km可以方便的查看以下信息:

    • cluster:创建集群,填写zk地址,选中jmx,consumer信息等选项
    • brokers:列表,机器信息
    • topic:主题信息,主题内的分区信息。创建新的主题,增加分区
    • cosumers: 消费者信息,偏移量等

    专注Java技术干货分享,欢迎志同道合的小伙伴,一起交流学习

  • 相关阅读:
    【Matplotlib】子图布局
    手把手教大家在CentOS7上编译并安装最新版的FreeSwitch-v1.10.7
    Kubernetes(k8s)介绍
    基于神经网络的车牌识别,卷积神经网络车牌识别
    Android Studio实现内容丰富的安卓校园超市
    架构设计的五个核心要素
    物联网智能互联创新开发平台
    Python(乱学)
    知识(202402)
    文本文件的编码格式
  • 原文地址:https://blog.csdn.net/bxg_kyjgs/article/details/126284978