• 01-Kafka之单机和集群安装


    一、安装kafka
    1、由于kafka依赖zookeeper,因此需要安装zookeeper(也可用kafka自带的zookeeper)

    • 下载地址:https://zookeeper.apache.org/releases.html
    • 下载3.8版本解压后执行cp zoo_sample.cfg zoo.cfg
    • 修改zoo.cfg配置文件,将dataDir=/tmp/zookeeper修改为指定的data目录
    • 进入bin目录,启动zookeeper server
      • zkServer.sh start
    • 启动zookeeper client连接Zookeeper server
      • zkCli.sh ‐server ip:port

    2、安装kafka

    • 下载地址:https://kafka.apache.org/downloads
    • 下载2.8版本编译后的二进制文件并解压:kafka_2.12-2.8.0.tgz
    • 修改配置文件config/server.properties
    #broker.id属性在kafka集群中必须要是唯一
    broker.id=0
    #kafka部署的机器ip和提供服务的端口号
    listeners=PLAINTEXT://127.0.0.1:9092
    #kafka的消息存储文件
    log.dir=/usr/local/data/kafka‐logs
    #kafka连接zookeeper的地址
    zookeeper.connect=192.168.65.60:2181
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 启动kafka服务
    启动脚本语法:kafka­-server­-start.sh [-­daemon] server.properties
    
    #启动kafka,运行日志在logs目录的server.log文件里
    #后台启动,不会打印日志到控制台
    bin/kafka‐server‐start.sh ‐daemon config/server.properties 
    或
    bin/kafka‐server‐start.sh config/server.properties &
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 打开zookeeper客户端查看kafka相关节点
    #进入zookeeper客户端
    bin/zkCli.sh
    #查看zk根目录下kafka相关内容
    ls /
    #查看kafka节点
    ls /brokers/ids
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3、kafka的相关命令
    (1)创建topic

    # 创建一个名为test的topic,设置分区为1,备份因子为1
    kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
    
    # 查看kafka中目前存在的topic
    kafka-topics.sh --list --zookeeper 127.0.0.1:2181
    
    • 1
    • 2
    • 3
    • 4
    • 5

    (2)发送接收消息

    # 开启一个终端连接producer
    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
    this is a msg
    
    # 再开启一个终端连接consumer,如果想要消费之前的消息可以通过--from-beginning参数指定
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
    
    # 消费之前的消息
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
    # 消费多主题
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --whitelist "test|test-2"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    (3)单播和多播
    单播消费:一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可。

    # 启动一个属于testGroup消费组的消费者
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup --topic test
    
    # 再启动一个属于testGroup消费组的消费者
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup --topic test
    
    # 此时生产端发送消息时,只有一个消费者能接收到消息,因此它们同属于一个消费者组
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    多播消费:一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。

    # 启动一个属于testGroup消费组的消费者
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup --topic test
    
    # 启动一个属于testGroup2消费组的消费者
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup2 --topic test
    
    # 此时两个消费者都能消费到消息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # 查看消费组名
    kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
    # 查看消费组的消费偏移量
    kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testGroup
    
    current-offset:当前消费组的已消费偏移量
    log-end-offset:主题对应分区消息的结束偏移量(HW)
    lag:当前消费组未消费的消息数
    
    # 创建多个分区的主题
    kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic test1
    # 查看topic的情况
    kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test1
    # 增加topic的分区数量
    kafka-topics.sh -alter --partitions 3 --zookeeper 127.0.0.1:2181 --topic test
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    二、kafka集群搭建
    一个单独的broker意味着kafka集群中只有一个节点,要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。这里以搭建伪集群为例
    1、添加两个配置文件
    (1)config/server-1.properties

    #broker.id属性在kafka集群中必须要是唯一
    broker.id=1
    #kafka部署的机器ip和提供服务的端口号
    listeners=PLAINTEXT://127.0.0.1:9093
    #kafka的消息存储文件
    log.dir=/usr/local/data/kafka‐logs-1
    #kafka连接zookeeper的地址
    zookeeper.connect=192.168.65.60:2181
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    (2)config/server-2.properties

    #broker.id属性在kafka集群中必须要是唯一
    broker.id=2
    #kafka部署的机器ip和提供服务的端口号
    listeners=PLAINTEXT://127.0.0.1:9094
    #kafka的消息存储文件
    log.dir=/usr/local/data/kafka‐logs-2
    #kafka连接zookeeper的地址
    zookeeper.connect=192.168.65.60:2181
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2、启动两个实例

    kafka-server-start.sh -daemon ../config/server-1.properties
    kafka-server-start.sh -daemon ../config/server-2.properties
    
    • 1
    • 2

    3、查看zookeep中的ids,是否成功启动

    4、创建一个新的topic,副本数设置为3,分区数设置为2

    kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
    
    • 1

    5、查看topic情况

    kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-replicated-topic
    
    • 1

    三、集群消费
    log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。kafka集群支持配置一个partition备份的数量
    针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader

    1、Producers
    生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round­robin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多

    2、Consumers
    传统的消息传递模式有2种:队列(queue)和(publish-subscribe)

    • queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer
    • publish-subscribe模式:消息会被广播给所有的consumer
      Kafka基于这2种模式提供了一种consumer的抽象概念:consumer group
    • queue模式:所有的consumer都位于同一个consumer group下
    • publish-subscribe模式:所有的consumer都有着自己唯一的consumer group
    由2个broker组成的kafka集群,某个主题总共有4个partition(P0-P3),分别位于不同的broker上。这个集群由2个Consumer Group消费,A有2个consumer instances,B有4个 通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者(logical subscriber)。每个consumer group由多个consumer instance组成,从而达到可扩展和容灾的功能

    3、消费顺序
    一个partition同一个时刻在一个consumer group中只能有一个consumer instance在消费,从而保证消费顺序。consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则多出来的consumer消费不到消息,产生浪费。
    Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。如果有在总体上保证消费顺序的需求,可以通过将topic的partition数量设置为1,将consumer group中的consumer instance数量也设置为1,但是这样会影响性能,所以kafka的顺序消费很少用。

  • 相关阅读:
    使用OAuth2实现授权服务
    用于制作耳机壳的倒模专用UV树脂有什么特点?
    孙哥Spring源码第26集
    react项目导出数据doc格式及其他格式方法
    深入理解 Java 泛型
    如何找到‘.‘ is not recognized as an internal or external command的根本原因和解决方案
    gtest学习
    投研报告 -用DEX技术链改投注网站的项目Betswap($BSGG)
    LeetCode:二分查找
    【Multisim仿真】LM339过零电路仿真
  • 原文地址:https://blog.csdn.net/qq_39234967/article/details/126451891