• kafka环境搭建以及基本原理


    kafka最先是作为日志数据采集,后用于消息传递,kafka能承担tb级别数据存储,确保服务的可用性,允许少量数据的丢失

    作为消息中间件就有异步、解耦、削峰三个作用

    一、单机搭建

    单机ip:192.168.64.133

    下载地址:Apache Kafka 选择kafka_2.13-3.4.0.tgz进行下载

    关于kafka的版本,前面的2.13是开发kafka的scala语言的版本,后面的3.4.0是kafka应用的版本。

    下载Zookeeper,下载地址 Apache ZooKeeper ,kafka有内置的zookeeper,Zookeeper的版本并没有强制要求,这里我们选择比较新的3.6.1版本。

    1. #下载解压
    2. cd /usr/local/kafka
    3. wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
    4. tar -zxvf kafka_2.13-3.4.0.tgz

     1、启动Kafka之前需要先启动Zookeeper。**这里就用Kafka自带的Zookeeper。启动脚本在bin目录下。

    1. cd kafka_2.13-3.4.0/
    2. nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

    ​ 从nohup.out中可以看到zookeeper默认会在2181端口启动。通过jps指令看到一个QuorumPeerMain进程,确定服务启动成功。 

     2、启动Kafka。

    nohup bin/kafka-server-start.sh config/server.properties &

    启动完成后,使用jps指令,看到一个kafka进程,确定服务启动成功。服务会默认在9092端口启动。

    3、简单收发消息

    ​ Kafka的基础工作机制是消息发送者可以将消息发送到kafka上指定的topic,而消息消费者,可以从指定的topic上消费消息。

      首先,可以使用Kafka提供的客户端脚本创建Topic

    1. #查看帮助命令
    2. bin/kafka-topics.sh --help
    3. #创建Topic
    4. bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
    5. #查看Topic
    6. bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092

     然后,启动一个消息发送者端。往一个名为test的Topic发送消息。

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    当命令行出现 > 符号后,随意输入一些字符然后enter。Ctrl+C 退出命令行。这样就完成了往kafka发消息的操作。

      然后启动一个消息消费端,从名为test的Topic上接收消息。

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

    4、其他消费模式

    ​ 之前我们通过kafka提供的生产者和消费者脚本,启动了一个简单的消息生产者以及消息消费者,实际上,kafka还提供了丰富的消息消费方式。

    指定消费进度

    ​ 通过kafka-console.consumer.sh启动的控制台消费者,会将获取到的内容在命令行中输出。如果想要消费之前发送的消息,可以通过添加--from-begining参数指定。

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

    如果需要更精确的消费消息,甚至可以指定从哪一条消息开始消费。 

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 4 --topic test

    这表示从第0号Partition上的第四个消息开始读起。Partition和Offset是什么呢,可以用以下指令查看。

    分组消费

    对于每个消费者,可以指定一个消费者组。kafka中的同一条消息,只能被同一个消费者组下的某一个消费者消费。而不属于同一个消费者组的其他消费者,也可以消费到这一条消息。在kafka-console-consumer.sh脚本中,可以通过--consumer-property group.id=testGroup来指定所属的消费者组。例如,可以启动三个消费者组,来验证一下分组消费机制: 

    1. #两个消费者实例属于同一个消费者组
    2. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup --topic test
    3. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup --topic test
    4. #这个消费者实例属于不同的消费者组
    5. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup2 --topic test

    查看消费者组的偏移量

    ​ 接下来,还可以使用kafka-consumer-groups.sh观测消费者组的情况。包括他们的消费进度。

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

    二、集群搭建

    集群就为了保证服务的高可用和数据的安全性,这就是我的理解,万一挂了其它节点依旧可以对外提供服务

    1、部署zookeeper集群

    1. #下载解压
    2. cd /usr/local/zookeeper
    3. wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
    4. tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
    5. #配置
    6. cd /usr/local/zookeeper/apache-zookeeper-3.6.1-bin/conf
    7. cp zoo_sample.cfg zoo.cfg
    8. vim zoo.cfg
    9. #zoo.cfg文件中修改如下配置
    10. #Zookeeper的本地数据目录,默认是/tmp/zookeeper。这是Linux的临时目录,随时会被删掉。
    11. dataDir=/usr/local/zookeeper/data
    12. dataLogDir=/usr/local/zookeeper/log
    13. #Zookeeper的服务端口
    14. clientPort=2181
    15. #集群节点配置
    16. server.1=192.168.64.133:2888:3888
    17. server.2=192.168.64.134:2888:3888
    18. server.3=192.168.64.128:2888:3888
    19. #切换到dataDir目录下
    20. vim myid
    21. #myid文件内容:比如当前节点是server.x,那么内容就是x

    其中,clientPort 2181是对客户端开放的服务端口。

    集群配置部分, server.x这个x就是节点在集群中的myid。后面的2888端口是集群内部数据传输使用的端口。3888是集群内部进行选举使用的端口。

    ​ 接下来将整个Zookeeper的应用目录分发到另外两台机器上。就可以在三台机器上都启动Zookeeper服务了。

    1. #切换到zookeeper的home目录下,运行如下命令启动
    2. bin/zkServer.sh --config conf start

    启动完成后,使用jps指令可以看到一个QuorumPeerMain进程就表示服务启动成功。

    ​ 三台机器都启动完成后,可以查看下集群状态。

    1. root@gaorufeng20221210:/usr/local/zookeeper/apache-zookeeper-3.6.1-bin# bin/zkServer.sh status
    2. ZooKeeper JMX enabled by default
    3. Using config: /usr/local/zookeeper/apache-zookeeper-3.6.1-bin/bin/../conf/zoo.cfg
    4. Client port found: 2181. Client address: localhost.
    5. Mode: leader
    6. root@gaorufeng20221210:/usr/local/zookeeper/apache-zookeeper-3.6.1-bin#

    这其中Mode 为leader就是主节点,follower就是从节点。

    2、部署kafka集群

    ​ kafka服务并不需要进行选举,因此也没有奇数台服务的建议。

    ​ 部署Kafka的方式跟部署Zookeeper差不多,就是解压、配置、启服务三板斧。

    ​ 首先将Kafka解压到/usr/local/kafka目录下。

    ​ 然后进入config目录,修改server.properties。这个配置文件里面的配置项非常多,下面列出几个要重点关注的配置。

    1. #broker 的全局唯一编号,不能重复,只能是数字。
    2. broker.id=0
    3. #监听端口,要么就配置hosts文件
    4. listeners=PLAINTEXT://192.168.64.133:9092
    5. #数据文件地址。同样默认是给的/tmp目录。
    6. log.dirs=/usr/local/kafka/kafka-logs
    7. #默认的每个Topic的分区数
    8. num.partitions=1
    9. #zookeeper的服务地址
    10. zookeeper.connect=192.168.64.133:2181,192.168.64.134:2181,192.168.64.128:2181
    11. #可以选择指定zookeeper上的基础节点。
    12. #zookeeper.connect=192.168.64.133:2181,192.168.64.134:2181,192.168.64.128:2181/kafka

    broker.id需要每个服务器上不一样,分发到其他服务器上时,要注意修改一下。

    多个Kafka服务注册到同一个zookeeper集群上的节点,会自动组成集群。

    配置文件中的注释非常细致,可以关注一下。下面是server.properties文件中比较重要的核心配置

    PropertyDefaultDescription
    broker.id0broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为一的即可。
    log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
    listenersPLAINTEXT://127.0.0.1:9092server接受客户端连接的端口,ip配置kafka本机ip即可
    zookeeper.connectlocalhost:2181zookeeper连接地址。hostname:port。如果是Zookeeper集群,用逗号连接。
    log.retention.hours168每个日志文件删除之前保存的时间。
    num.partitions1创建topic的默认分区数
    default.replication.factor1自动创建topic的默认副本数量
    min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
    delete.topic.enablefalse是否允许删除主题

    ​ 接下来就可以启动kafka服务了。启动服务时需要指定配置文件。

    bin/kafka-server-start.sh -daemon config/server.properties

    -daemon表示后台启动kafka服务,这样就不会占用当前命令窗口。

    ​ 通过jps指令可以查看Kafka的进程。

    三、Kraft集群

    ​ 在Kafka的config目录下,提供了一个kraft的文件夹,在这里面就是Kraft协议的参考配置文件。在这个文件夹中有三个配置文件,broker.properties,controller.properties,server.properties,分别给出了Kraft中三种不同角色的示例配置。

    • broker.properties: 数据节点
    • controller.properties: Controller控制节点
    • server.properties: 即可以是数据节点,又可以是Controller控制节点。

    这里同样列出几个比较关键的配置项,按照自己的环境进行定制即可

    1. #配置当前节点的角色。Controller相当于Zookeeper的功能,负责集群管理。Broker提供具体的消息转发服务。
    2. process.roles=broker,controller
    3. #配置当前节点的id。与普通集群一样,要求集群内每个节点的ID不能重复。
    4. node.id=1
    5. #配置集群的投票节点。其中@前面的是节点的id,后面是节点的地址和端口,这个端口跟客户端访问的端口是不一样的。通常将集群内的所有Controllor节点都配置进去。
    6. controller.quorum.voters=1@worker1:9093,2@worker2:9093,3@worker3:9093
    7. #Broker对客户端暴露的服务地址。基于PLAINTEXT协议。
    8. advertised.listeners=PLAINTEXT://worker1:9092
    9. #Controller服务协议的别名。默认就是CONTROLLER
    10. controller.listener.names=CONTROLLER
    11. #配置监听服务。不同的服务可以绑定不同的接口。这种配置方式在端口前面是省略了一个主机IP的,主机IP默认是使用的java.net.InetAddress.getCanonicalHostName()
    12. listeners=PLAINTEXT://:9092,CONTROLLER://:9093
    13. #数据文件地址。默认配置在/tmp目录下。
    14. log.dirs=/app/kafka/kraft-log
    15. #topic默认的partition分区数。
    16. num.partitions=2

    将配置文件分发,并修改每个服务器上的node.id属性和advertised.listeners属性。

    ​ 由于Kafka的Kraft集群对数据格式有另外的要求,所以在启动Kraft集群前,还需要对日志目录进行格式化。

    1. [oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh random-uuid
    2. j8XGPOrcR_yX4F7ospFkTA
    3. [oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh format -t j8XGPOrcR_yX4F7ospFkTA -c config/kraft/server.properties 
    4. Formatting /app/kafka/kraft-log with metadata.version 3.4-IV0.

    -t 表示集群ID,三个服务器上可以使用同一个集群ID。

    ​ 接下来就可以指定配置文件,启动Kafka的服务了。 例如,在Worker1上,启动Broker和Controller服务。

    1. [oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-server-start.sh -daemon config/kraft/server.properties 
    2. [oper@worker1 kafka_2.13-3.4.0]$ jps
    3. 10993 Jps
    4. 10973 Kafka

    ​ 等三个服务都启动完成后,就可以像普通集群一样去创建Topic,并维护Topic的信息了。

  • 相关阅读:
    vue+antd——实现table表格的打印——分页换行,每页都有表头——基础积累
    ES增删改查入门
    K3wise 常用表及视图
    Linux: signal:需要注意的一个问题:SIGRTMIN 32 vs 34
    【量化交易笔记】11.移动平均交易策略
    前端使用docx-preview展示docx + 后端doc转docx
    Linux C文件操作
    毕业设计选题之Java+springboot线上蔬菜销售与配送系统(源码+调试+开题+lw)
    R可视化:给柱状图添加网格
    哪种烧录单片机的方法合适?
  • 原文地址:https://blog.csdn.net/gp3056/article/details/133390216