一、安装kafka
1、由于kafka依赖zookeeper,因此需要安装zookeeper(也可用kafka自带的zookeeper)
cp zoo_sample.cfg zoo.cfg
2、安装kafka
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://127.0.0.1:9092
#kafka的消息存储文件
log.dir=/usr/local/data/kafka‐logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
启动脚本语法:kafka-server-start.sh [-daemon] server.properties
#启动kafka,运行日志在logs目录的server.log文件里
#后台启动,不会打印日志到控制台
bin/kafka‐server‐start.sh ‐daemon config/server.properties
或
bin/kafka‐server‐start.sh config/server.properties &
#进入zookeeper客户端
bin/zkCli.sh
#查看zk根目录下kafka相关内容
ls /
#查看kafka节点
ls /brokers/ids
3、kafka的相关命令
(1)创建topic
# 创建一个名为test的topic,设置分区为1,备份因子为1
kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
# 查看kafka中目前存在的topic
kafka-topics.sh --list --zookeeper 127.0.0.1:2181
(2)发送接收消息
# 开启一个终端连接producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
this is a msg
# 再开启一个终端连接consumer,如果想要消费之前的消息可以通过--from-beginning参数指定
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
# 消费之前的消息
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
# 消费多主题
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --whitelist "test|test-2"
(3)单播和多播
单播消费:一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可。
# 启动一个属于testGroup消费组的消费者
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup --topic test
# 再启动一个属于testGroup消费组的消费者
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup --topic test
# 此时生产端发送消息时,只有一个消费者能接收到消息,因此它们同属于一个消费者组
多播消费:一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。
# 启动一个属于testGroup消费组的消费者
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup --topic test
# 启动一个属于testGroup2消费组的消费者
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup2 --topic test
# 此时两个消费者都能消费到消息
# 查看消费组名
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
# 查看消费组的消费偏移量
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testGroup
current-offset:当前消费组的已消费偏移量
log-end-offset:主题对应分区消息的结束偏移量(HW)
lag:当前消费组未消费的消息数
# 创建多个分区的主题
kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic test1
# 查看topic的情况
kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test1
# 增加topic的分区数量
kafka-topics.sh -alter --partitions 3 --zookeeper 127.0.0.1:2181 --topic test
二、kafka集群搭建
一个单独的broker意味着kafka集群中只有一个节点,要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。这里以搭建伪集群为例
1、添加两个配置文件
(1)config/server-1.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://127.0.0.1:9093
#kafka的消息存储文件
log.dir=/usr/local/data/kafka‐logs-1
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
(2)config/server-2.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=2
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://127.0.0.1:9094
#kafka的消息存储文件
log.dir=/usr/local/data/kafka‐logs-2
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
2、启动两个实例
kafka-server-start.sh -daemon ../config/server-1.properties
kafka-server-start.sh -daemon ../config/server-2.properties
3、查看zookeep中的ids,是否成功启动
4、创建一个新的topic,副本数设置为3,分区数设置为2
kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
5、查看topic情况
kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-replicated-topic
三、集群消费
log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。kafka集群支持配置一个partition备份的数量
针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader
1、Producers
生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过roundrobin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多
2、Consumers
传统的消息传递模式有2种:队列(queue)和(publish-subscribe)
3、消费顺序
一个partition同一个时刻在一个consumer group中只能有一个consumer instance在消费,从而保证消费顺序。consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则多出来的consumer消费不到消息,产生浪费。
Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。如果有在总体上保证消费顺序的需求,可以通过将topic的partition数量设置为1,将consumer group中的consumer instance数量也设置为1,但是这样会影响性能,所以kafka的顺序消费很少用。