消息队列(Message Queue)是一种用于跨进程或分布式系统中传递消息的通信机制。消息队列在异步通信、系统解耦、负载均衡和容错方面具有重要作用。
异步通信:发送方将消息发送到队列中后,不需要等待接收方处理完毕即刻返回继续执行。接收方可以在需要时从队列中读取并处理消息。
解耦:消息队列在发送方和接收方之间充当中介,允许它们独立运行。这样,即使其中一个部分暂时不可用,系统的整体功能依然可以保持正常。
负载均衡:通过消息队列,多个消费者可以分担消息处理的工作量,提高系统的吞吐量。
可靠性:消息队列可以确保消息在传递过程中不丢失,即使在系统出现故障时也能保证消息被妥善处理。
扩展性:可以根据需要增加消息生产者或消费者,从而轻松扩展系统。
缓冲和流量控制:在高并发的场景下,系统的不同部分可能无法以相同的速度处理请求。消息队列可以作为缓冲区,调节生产者和消费者之间的速度差异,防止系统过载或崩溃。
特点:
用途:
优点:
缺点:
特点:
用途:
优点:
缺点:
特点:
用途:
优点:
缺点:
特点:
用途:
优点:
缺点:
Apache Kafka 是一个分布式流处理平台,最初由LinkedIn开发,并于2011年成为Apache开源项目。Kafka主要用于构建实时数据管道和流处理应用程序,它具有高吞吐量、低延迟、可扩展性和容错性。
Kafka 的架构主要包括以下几个核心组件:
生产者是负责发布消息到 Kafka 主题的客户端。生产者可以选择将消息发送到特定的分区,也可以通过键进行分区路由。
消费者是负责从 Kafka 主题中读取消息的客户端。消费者可以订阅一个或多个主题,并以流的方式处理数据。消费者通常会组成一个消费者组(Consumer Group),每个消费者组可以同时读取和处理消息,实现负载均衡。
Kafka 集群中的每个服务器称为一个代理(Broker)。每个代理负责接收、存储和转发消息。一个 Kafka 集群通常由多个代理组成,以实现高可用性和容错性。
多生产者和多消费者:一个主题可以有多个生产者发布消息,也可以有多个消费者订阅和消费消息。这种特性使得 Kafka 可以轻松实现多对多的数据传递。
逻辑分组:主题用于将相同类别的数据进行逻辑分组,便于管理和处理。例如,可以创建一个主题来存储网站的访问日志,另一个主题来存储订单信息。
分区是 Kafka 中并行处理的基本单元。每个分区在磁盘上是一个日志文件,消息以追加的方式写入。分区提供了高吞吐量和并行处理能力。
Zookeeper 负责存储集群的元数据,如代理信息、分区状态等。它还负责选举 Kafka 集群的控制器,管理分区的副本分配和故障恢复。
数据路由规则决定了生产者如何将消息发送到 Kafka Topic 的各个 Partition。Kafka 提供了几种常见的 Partitioning 策略,具体如下:
Round-robin(轮询):这是最简单的策略,生产者轮流将消息发送到不同的 Partition。如果所有 Partition 都有相似的负载和数据量,这种策略可以实现基本的负载均衡。但是,它不能保证消息的相关性或有序性。
Hash-based(基于哈希):生产者使用消息的某个属性(如 key)计算哈希值,然后根据哈希值将消息路由到对应的 Partition。这种策略可以保证具有相同 key 的消息总是被发送到同一个 Partition,从而保证了这些消息的顺序性。
Custom Partitioner(自定义分区器):开发者可以根据自己的需求实现自定义的 Partitioner 接口,来控制消息的分区逻辑。这种方式灵活性最高,可以根据业务需求定义非常复杂的分区逻辑。
在 Kafka 中,默认的 Partitioning 策略是基于消息的 key 进行哈希分区。具体步骤如下:
如果消息有 key,则使用 key 进行哈希计算,然后将哈希值与 Topic 的 Partition 数取模,以确定消息发送到哪个 Partition。
如果消息没有 key,则使用轮询策略,即将消息依次发送到每个 Partition,实现简单的负载均衡。
消费者组成员:
主题分区分配:
消费者组协调器:
消费者协调与分配:
消息处理并行性:
消费者偏移提交:
消费者组在 Kafka 中有多种应用场景,包括但不限于:
并行处理:多个消费者实例并行处理同一主题的消息,提高消费吞吐量和效率。
水平扩展:通过增加消费者实例,可以水平扩展消费者组的处理能力,适应大规模数据流的需求。
容错和高可用性:当一个消费者实例故障或下线时,协调器会重新分配其负责的分区给其他实例,确保消息的连续性和可用性。
偏移量是一个64位的整数,用来唯一标识消费者在一个特定分区中已经消费过的消息位置。每个消费者都会为每个分区维护一个偏移量。偏移量的作用包括:
消费位置的记录:偏移量表示消费者已经处理并成功提交的消息位置。消费者会定期地更新偏移量,以记录自己的消费进度。
消息处理的顺序性:Kafka 保证每个分区内的消息顺序,消费者通过记录偏移量来确保消息的有序消费,避免重复消费或消息丢失。
消费者的恢复:如果消费者实例停止或重启,它可以利用存储的偏移量来恢复消费位置,从上次离开的地方继续消费,而不会丢失消息。
在 Kafka 中,偏移量的管理可以通过以下几种方式实现:
偏移量的生命周期包括:
在 Apache Kafka 中,副本(Replica)的同步是通过一种基于日志的复制机制来实现的,具体过程如下:
副本同步的过程主要分为两个阶段:首先是领导者将数据写入本地日志(Leader Log),然后追随者从领导者的日志中复制数据。
生产者发送消息:
消息确认:
消息发送确认类型:
Kafka 提供了三种消息发送确认类型,分别是:
消息持久性保证:
根据不同的确认类型,Kafka 提供了不同级别的消息持久性保证:
生产者确认机制:
acks 参数来选择确认级别。生产者可以通过配置来平衡消息传递的可靠性和延迟。消息确认机制的工作流程
acks 参数,生产者可能会等待领导者副本或所有 ISR 中的副本确认消息的接收。acks=all,生产者会等待所有 ISR 中的副本都确认接收消息,然后才会收到确认。acks=1,生产者会等待领导者副本确认接收消息,并且不会等待其他 ISR 中的副本确认。追随者复制数据:
同步方式:
Kafka 支持两种类型的复制同步方式:
同步复制
异步复制
数据批次:
保序性:
ISR 机制:
领导者故障:
追随者故障:
log.segment.bytes 控制默认大小,默认为 1GB)来确定何时创建新的日志分段文件。log.retention.hours:指定消息在分段中保留的时间,默认为 7 天。log.retention.bytes:指定分段文件的最大大小,默认为 -1(无限制)。默认情况下,Kafka 的日志文件和索引文件存储在一个或多个目录中,这些目录由 log.dirs 参数指定。
log.dirs 参数可以配置为一个或多个目录路径,多个路径之间用逗号分隔。这样做的目的是为了提供数据的冗余备份和提高性能。
当 Kafka 创建新的日志分段文件或索引文件时,它会依次选择配置的目录路径之一来存储文件。
配置示例
在 Kafka 的配置文件(通常是 server.properties)中,可以配置 log.dirs 参数,例如:
log.dirs=/path/to/kafka/logs
如果需要配置多个存储路径,可以用逗号分隔:
log.dirs=/path/to/kafka/logs1,/path/to/kafka/logs2
Kafka 的安装目录结构通常如下所示:
kafka/
├── bin/ # 包含所有 Kafka 命令行工具的目录
├── config/ # 存放 Kafka 配置文件的目录
├── libs/ # 存放 Kafka 所需的库文件
├── logs/ # 存放 Kafka 日志文件的目录
└── ...
创建主题:
./bin/kafka-topics.sh --create --topic <topic_name> --partitions <num_partitions> --replication-factor <replication_factor> --bootstrap-server <broker_list>
--replication-factor <replication_factor>:指定每个分区的副本数。
--bootstrap-server <broker_list>:指定连接的 Kafka Broker 列表。
示例:
./bin/kafka-topics.sh --create --topic myTopic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
说明:在 Kafka 安装目录下的 bin/ 目录中执行命令。
查看主题列表:
./bin/kafka-topics.sh --list --bootstrap-server <broker_list>
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
查看主题详细信息:
./bin/kafka-topics.sh --describe --topic <topic_name> --bootstrap-server <broker_list>
./bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092
生产者发送消息:
./bin/kafka-console-producer.sh --topic <topic_name> --bootstrap-server <broker_list>
示例:
./bin/kafka-console-producer.sh --topic myTopic --bootstrap-server localhost:9092
消费者消费消息:
./bin/kafka-console-consumer.sh --topic <topic_name> --from-beginning --bootstrap-server <broker_list>
示例:
./bin/kafka-console-consumer.sh --topic myTopic --from-beginning --bootstrap-server localhost:9092
查看消费者组列表:
./bin/kafka-consumer-groups.sh --list --bootstrap-server <broker_list>
示例:
./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
查看消费者组消费情况:
./bin/kafka-consumer-groups.sh --describe --group <group_name> --bootstrap-server <broker_list>
示例:
./bin/kafka-consumer-groups.sh --describe --group myConsumerGroup --bootstrap-server localhost:9092
查看集群信息:
./bin/kafka-topics.sh --describe --topic <topic_name> --bootstrap-server <broker_list>
示例:
./bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092
查看 Broker 日志:
tail -f logs/server.log
示例:在 Kafka 根目录下执行,查看当前 Kafka Broker 的日志。
tail -f logs/server.log
ZooKeeper 的工作机制主要围绕其设计的分布式一致性服务和协作基础展开,它通过一些关键的设计和算法来实现高可靠性和可扩展性。以下是 ZooKeeper 的工作机制的详细解释。
ZooKeeper 集群:ZooKeeper 以集群的方式运行,每个集群节点称为 ZooKeeper 服务器。集群中的节点数量通常为奇数,例如 3、5、7 等,以实现多数派选举和容错性。
客户端连接:客户端通过连接到任意一个 ZooKeeper 服务器来与整个集群进行交互,这些服务器相互之间通过 TCP/IP 进行通信。
ZooKeeper 的数据模型:ZooKeeper 提供了类似文件系统的数据模型,称为 ZooKeeper 数据树(ZooKeeper Data Tree)。
/
├── apps
│ ├── app1
│ └── app2
├── config
│ ├── global
│ │ ├── setting1
│ │ └── setting2
│ └── local
│ └── setting3
└── services
├── service1
│ ├── instances
│ │ ├── instance1
│ │ └── instance2
│ └── status
└── service2
根节点 / 是 ZooKeeper 数据树的起始点。
apps、config 和 services 是一级子节点。
apps 下有 app1 和 app2 两个子节点。
config 下有 global 和 local 两个子节点,global 下包含 setting1 和 setting2。
services 下有 service1 和 service2,service1 下有 instances 和 status 两个子节点,instances 下有 instance1 和 instance2。
节点(ZNode):ZooKeeper 数据树中的每个节点称为 ZNode,类似于文件系统中的目录或文件。每个 ZNode 可以存储一小段数据,并且可以关联 ACL(访问控制列表)来限制访问权限。
ZAB 协议:ZooKeeper 使用 ZAB(ZooKeeper Atomic Broadcast)协议来实现数据的一致性和可靠性。
基本概念:
一致性保证:ZooKeeper 保证了在分布式环境下的数据一致性,所有的更新操作都是原子性的,并且按照客户端的顺序执行。
持久性存储:ZooKeeper 将数据存储在磁盘上,并使用写前日志(Write-Ahead Log,WAL)来保证即使在节点故障后也不会丢失数据。
Watch 机制:ZooKeeper 允许客户端注册 Watcher 监听某个 ZNode 的状态变化。
事件通知:一旦 ZNode 的状态发生变化(如数据更新、ZNode 删除等),ZooKeeper 将通知所有注册了 Watcher 的客户端,使得客户端可以及时响应状态变化。
ZooKeeper 的工作机制使其在以下场景中广泛应用:
分布式锁:通过创建临时顺序节点实现分布式锁,确保在分布式环境下的资源竞争问题。
选举机制:通过 ZAB 协议实现 Leader 选举,确保分布式系统中只有一个主节点处理请求。
配置管理:存储和管理配置信息,各个节点通过 Watcher 实时感知配置的变化。
服务注册与发现:将服务节点作为 ZNode 注册到 ZooKeeper 中,客户端通过 Watcher 发现服务的上线和下线状态。
分布式协调:
数据管理:
配置管理:
命名服务:
分布式锁:
事件通知:
LOOKING(选举状态):
节点正在寻找新的 Leader,即处于选举过程中。
发起选举请求,尝试成为新的 Leader。
FOLLOWING(跟随状态):
Follower 节点的状态。
节点已经确定当前的 Leader,并跟随 Leader 处理请求。
LEADING(领导状态):
Leader 节点的状态。
节点负责处理和协调整个集群的写请求和事务处理。
OBSERVING(观察状态):
Observer 节点的状态。
类似于 Follower,但专门用于扩展集群的读取能力。
不参与 Leader 的选举过程,仅处理读请求,提升系统的性能和吞吐量。
启动阶段:
节点状态:
投票:
多数派原则:
Leader 确定:
系统启动:
节点数和半数原则:
超时机制:
持久化:
Leader 失效检测:
触发选举:
投票过程:
选举规则:
EPOCH(选举周期)优先:
事务ID(ZXID)优先:
服务器ID(SID)优先:
多数派原则:
新 Leader 确定:
系统恢复:
启动 ZooKeeper 服务
bin/zkServer.sh start
停止 ZooKeeper 服务
bin/zkServer.sh stop
重启 ZooKeeper 服务
bin/zkServer.sh restart
查看 ZooKeeper 服务状态
bin/zkServer.sh status
启动 ZooKeeper 客户端
bin/zkCli.sh
连接到特定的 ZooKeeper 服务器
bin/zkCli.sh -server 127.0.0.1:2181
在启动了 ZooKeeper 客户端后,可以使用以下命令:
创建 znode
create /my-node "some data"
获取 znode 数据
get /my-node
设置 znode 数据
set /my-node "new data"
删除 znode
delete /my-node
列出 znode
ls /
查看 znode 状态
stat /my-node
设置集群模式 在 conf/zoo.cfg 中配置多台服务器:
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888
管理 ACL(访问控制列表)
setAcl /my-node world:anyone:r
将 ZooKeeper 设置为系统服务可以简化管理:
创建 Systemd 服务文件 在 /etc/systemd/system 目录下创建 zookeeper.service 文件:
[Unit]
Description=ZooKeeper
After=network.target
[Service]
Type=forking
ExecStart=/path/to/zookeeper/bin/zkServer.sh start
ExecStop=/path/to/zookeeper/bin/zkServer.sh stop
ExecReload=/path/to/zookeeper/bin/zkServer.sh restart
User=zookeeper
Group=zookeeper
Restart=on-failure
[Install]
WantedBy=multi-user.target
启动和启用 ZooKeeper 服务
systemctl daemon-reload
systemctl start zookeeper
systemctl enable zookeeper
| 服务器 | 部署 |
|---|---|
| 192.168.20.140 | zookeeper、kafka |
| 192.168.20.141 | zookeeper、kafka |
| 192.168.20.142 | zookeeper、kafka |
systemctl stop firewalld
setenforce 0

mv zookeeper /usr/local/zookeeper
# 复制模板配置文件
cp zoo_sample.cfg zoo.cfg
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
tickTime=2000
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
initLimit=10
#Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认 为Follwer死掉,并从服务器列表中删除Follwer
syncLimit=5
#修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataDir=/tmp/zookeeper
#添加, 指定存放日志的目录,目录需要单独创建
dataLogDir=/tmp/logs
#客户端连接端口
clientPort=2181
#添加集群信息
server.1=192.168.20.140:3188:3288
server.2=192.168.20.141:3188:3288
server.3=192.168.20.142:3188:3288
server.A=B:C:D
●A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件
myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面
的配置信息比较从而判断到底是哪个server
●c是这个服务器Follower与集群中的Leader服务器交换信息的端口
●D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就
是用来执行选举时服务器相互通信的端口
如果指定节点不参加选举,在末尾加observer
server.3=192.168.19.102:3188:3288:observer
#在每个节点上创建数据目录和日志目录
mkdir /tmp/zookeeper
mkdir /tmp/logs
#在每个节点的dataDir指定的目录下创建一个myid的文件
echo 1 > /tmp/zookeeper/myid
echo 2 > /tmp/zookeeper/myid
echo 3 > /tmp/zookeeper/myid
cd /etc/systemd/system
vim zookeeper.service
[Unit]
Description=ZooKeeper
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
Restart=on-failure
[Install]
WantedBy=multi-user.target
# 写入后加载配置
systemctl daemon-reload


cd /kafka/config
# 备份配置文件
cp server.properties{,.bak}
vim server.properties
broker.id=0
#21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置
broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.19.100:9092
broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3
#42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8
#45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400
#48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
#51行,接收套接字的缓冲区大小
#54行,请求套接字的缓冲区大小
#31行,指定监听的IP和端口,可以修改每个
#60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1
#65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1
log.retention.hours=168
#69行,用来恢复和清理data下数据的线程数量
#103行,segment文件(数据文件)保留的最长时间,单位为小时,默
认为7天,超时将被删除
log.segment.bytes=1073741824
#110行,一个segment文件最大的大小,默认为 1G,超出将新建
一个新的segment文件
zookeeper.connect=192.168.20.140:2181,192.168.20.141:2181,192.168.20.142:2181
#123行,配置连接Zookeeper集群地址
# 如果设备延迟高,可以将zookeeper的连接超时时间改高一些
zookeeper.connection.timeout.ms=30000
#修改环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
#配置 Kafka 启动脚本
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "---------- Kafka 启动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 状态 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ]; then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
exit 1
;;
esac
# 添加权限
chmod +x /etc/init.d/kafka
service kafka start
# 创建topic
./kafka-topics.sh --create --zookeeper 192.168.20.140:2181,192.168.20.141:2181,192.168.20.142:2181 --replication-factor 2 --partitions 3 --topic test
