Apache开发的,由Scala和Java编写的分布式消息发布订阅系统
kafka借鉴了JMS的思想,但是没有完全遵循
JMS(Java Messaging Service)
| 名称 | 说明 |
|---|---|
| Broker(中间人) | 消息中间件处理节点,⼀个Kafka节点就是⼀个broker,⼀个或者多个Broker可以组成⼀个Kafka集群 |
| Topic | Kafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需要指定⼀个topic |
| Producer | 消息生产者,想Broker发送消息的客户端 |
| Consumer | 消息消费者,从Broker读取消息的客户端 |
| ConsumerGroup | 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息 |
| Partition | 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的 |
从较高的层次上看,producer通过网络发送消息到kafka集群,然后consumer来进行消费。
服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。
启动kafka需要jdk环境,如果没有的,可以通过yum安装配置
kafka中默认也带zk,所以可以使用kafka中自带的
到kafka的bin目录下
启动zk(kafka中自带的):
sh zookeeper-server-start.sh ../config/zookeeper.properties
启动kafka:
sh kafka-server-start.sh ../config/server.properties
如果因为之前使用了别的zk注册kafka,那么就到kafka配置的日志文件中【配置文件路径在config/server.properties】,找到日志存放路径

然后进入到该路径,删除该路径下的所有日志文件
rm -rf kafka-logs/删除所有日志,重新启动
首先,要使用kafka是需要有zookeeper环境的,我们首先需要到官网去下载zookeeper
官网地址:https://zookeeper.apache.org/releases.html
此处我使用的是3.5.7版本
通过Xftp将windows上的zookeeper推送到linux上,然后再/usr/local下创建zookeeper目录,在该目录下,通过tar -zxvf 压缩包名,解压
在bin目录下,通过sh zookeeper.sh启动zookeeper
官⽹下载kafka的压缩包:http://kafka.apache.org/downloads
/usr/local/kafka/
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
#kafka部署的机器ip和提供服务的端⼝号
listeners=PLAINTEXT://192.168.145.13:9092
#kafka的消息存储⽂件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=localhost:2181
./kafka-server-start.sh -daemon ../config/server.properties
验证是否启动成功:
启动zk客户端,进⼊到zk中的节点看id是0的broker有没有存在(上线),
ls /brokers/ids/
结果:

kafka成功在zookeeper上注册。
| Property | Default | Description |
|---|---|---|
| broker.id | 0 | 每个broker通过非负整数id进行标识,需要唯一 |
| log.dirs | /temp/kafka-logs | kafka存放数据的路径,路径不唯一,可以是多个,路径间用逗号分隔 |
| listeners | PLAINTEXT://192.168.145.13:9092 | server接收客户端连接的端口,ip配置为kafka本机ip |
| zookeeper.connect | localhost:2181 | zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接⽅式为hostname1:port1, hostname2:port2,hostname3:port3 |
| log.retention.hours | 168 | 日志保存时间 |
| num.partitions | 1 | 创建topic的默认分区数 |
| default.replication.factor | 1 | 自动创建topic的默认副本数量,建议设置为大于等于2 |
| min.insync.replicas | 1 | 当producer设置acks为-1时,min.insync.replicas指定replicas的最⼩数⽬(必须确认每⼀个repica的写数据都是成功的),如果这个数⽬没有达到,producer发送消息会产⽣异常 |
| delete.topic.enable | false | 是否允许删除主题 |
topic是什么概念?topic可以实现消息的分类,不同消费者订阅不同的topic。

执⾏以下命令创建名为“test”的topic,这个topic只有⼀个partition,并且备份因⼦也设置为1
在kafka的bin目录下执行:
./kafka-topics.sh --create --zookeeper 192.168.145.13:2181 --replication-factor 1 --partitions 1 --topic test
ip地址为zookeeper地址
查看当前kafka内有哪些topic,在kafka的bin目录下执行
./kafka-topics.sh --list --zookeeper 192.168.145.13:2181

kafka⾃带了⼀个producer命令客户端,可以从本地⽂件中读取内容,或者我们也可以以命令⾏中直接输⼊内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每⼀个⾏会被当做成⼀个独⽴的消息。使⽤kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic
./kafka-console-producer.sh --broker-list 192.168.145.13:9092 --topic test
ip为kafka地址,指定topic为test

对于consumer,kafka同样也携带了⼀个命令⾏客户端,会将获取到内容在命令中进⾏输出,默认是消费最新的消息。使⽤kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息
方式一:从最后一条消息的偏移量+1开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092 --topic test

方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092 --from-beginning
--topic test

注意:
消息的发送⽅会把消息发送到broker中,broker会存储消息,消息是按照发送的顺序进⾏存
储。因此消费者在消费消息时可以指明主题中消息的偏移量。默认情况下,是从最后⼀个消
息的下⼀个偏移量开始消费。
①单播消息的实现
单播消息:一个消费者组里只有一个消费者能消费到某个topic中的消息。一个消费者组中有多个消费者,但是只有一个消费者可以消费到消息。
producer:
./kafka-console-producer.sh --broker-list 192.168.145.13:9092 --topic test
多个consumer开两个会话就行
consumer1:
./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092
--consumer-property group.id=testGroup --topic test
consumer2:
./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092
--consumer-property group.id=testGroup --topic test
②多播消息的实现
在一些业务场景中需要让一条消息被多个消费者消费,那么就可以使用多播模式。kafka实现多播,只需要让不同的消费者处于不同的消费者组即可。
testGroup1中的consumer:
./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092
--consumer-property group.id=testGroup1 --topic test
testGroup2中的consumer:
./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092
--consumer-property group.id=testGroup2 --topic test
# 查看当前主题下有哪些消费者组
./kafka-consumer-groups.sh --bootstrap-server 192.168.145.13:9092 --list
# 查看消费者组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 192.168.145.13:9092 --describe
--group testGroup


主题Topic
主题Topic可以理解为一个类别的名称
partition分区
anatomy:剖析

一个主题中的消息量是非常大的,因此可以通过分区的设置,来分布式存储这些消息。比如一个topic创建了3个分区。那么topic中的消息就会分别存在这三个分区中【一个国家,分为多个省份】
为一个主题创建多个分区:
./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic test1
./kafka-topics.sh --describe --zookeeper localhost:2181

分区的作用:
cp server.properties server1.properties
cp server.properties server2.properties
server1.proerties
broker.id=1
listeners=PLAINTEXT://192.168.145.13:9093
log.dir=/usr/local/data/kafka-logs-1
server2.properties
broker.id=2
listeners=PLAINTEXT://192.168.145.13:9094
log.dir=/usr/local/data/kafka-logs-2
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
进入kafka自带zk的cli
来到kafka的bin目录下:
sh zookeeper-shell.sh 本机IP:2181

通过ls /brokers/ids/查看当前节点下的信息:

可以看到成功注册上了,至此集群搭建完成
副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker(kafka节点)上。
例子:创建1个主题,2个分区,3个副本。
./kafka-topics.sh --create --zookeeper 192.168.145.13:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
通过查看主题信息,来获取其中关键数据,此处指定查看my-replicated-topic主题内容
./kafka-topics.sh --zookeeper localhost:2181 --topic my-replicated-topic --describe

分析图:

每个partition都有一个broker作为leader。
消息发送方要把消息发给哪个broker?就看副本的leader是在哪个broker上。
副本里的leader专门用来接收消息。接收到消息,其他follower通过poll的方式来同步数据。
通过kill掉leader后再查看主题情况
# kill掉header
ps -aux | grep server.properties
kill -9 11968
# 重新查看topic情况
./kafka-topics.sh --describe --zookeeper 192.168.145.13:2181 --topic my-replicated-topic


./kafka-console-producer.sh --broker-list 192.168.145.13:9092,192.168.145.13:9093,192.168.145.13:9094 --topic my-replicated-topic
./kafka-console-consumer.sh --bootstrap-server 192.168.145.13:9092,192.168.145.13:9093,192.168.145.13:9094 --from-beginning --topic my-replicated-topic

消费组中消费者的的数量不能比一个topic中的partition数量多,否则多出来的消费者消费不到消息。