Kafka是一个基于发布订阅者模式的消息队列,实现数据缓存、流量削峰等等功能。
在大数据生态下,Kafka主要是用来和实时计算框架对接去处理海量的实时数据
生产消息、数据
Kafka的生产者,生产者是为了给topic的partition生产数据的,生产者可以是Flume,也可以是我们自定义的操作,还可以是Kafka自带的控制台生产者。 生产者生产的数据放到Kafka主题的哪个分区? 生产者生产的数据都是key和value格式类型的数据,只不过key值可以不存在。
如果生产的消息只有value,没有key,那么消息会采用轮询机制选址一个主题分区放入数据\
如果生产的消息key和value都有,但是没有指定分区机制,会按照key的hashcode值和topic的分区数取一个余数,放到对应的分区
如果生产的消息key和value都有。那么为了避免数据倾斜,我们可以自定义分区机制
【注意】:kafka在生产数据的时候,每一条数据都会带着一个offset偏移量,消费数据的时候需要根据offset偏移量去读取对应的数据进行消费
Kafka集群的一个节点,每一个broker节点都会有一个唯一的编号
主题,就是消息队列中的消息队,一个Broker中可以存在多个主题,一个主题也可以存在于多个Broker上
Kafka中消息主题,就是消息队列,是Kafka用来存储消息的组件,topic中存放的数据是有序的
分区,每一个Topic主题都可以指定存储的分区数,一般情况下,一个Broker会存储一个主题的一个分区数据,而且每一个分区还可以设置副本数保证存储数据的安全性,分区和分区副本之间有一个主从架构关系。分区副本数不能随便设置,必须小于等于broker的数量。
Topic存放的消息最底层是以分区的形式存在的,Topic所谓的数据有序,不是整体有序,而是每一个分区内部是有序的。分区设置副本机制的,副本数量必须小于等于broker的节点数量 Kafka主题分区的数据不是永久存在的,而是有一个数据清理机制(基于时间的清理机制、基于分区数据大小的清理机制)
Kafka中主题、分区、消费者等等元数据信息都是交给zookeeper统一管理的
订阅主题,消费数据
消费者:消费数据的最小单位,但是一个消费者可以订阅多个topic的数据
将多个消费者组合起来,同时消费同一个主题的数据
消费者组:一个消费者组可以有多个消费者,其中topic一个分区的数据只能被消费者组的一个消费者消费,如果我们想要让一个消费者消费topic所有分区的数据,那么我们需要保证消费者组中只有一个消费者。
server.properties
kafka-server-start.sh /opt/xxxx/server.properties &
kafka-server-stop.sh
kafka-topic.sh --create --topic topicName --partitions num --replication-factor num<=borkerCount --zookeeper zkserverxxx
kafka-topic.sh --delete --topic topicName --zookeeper xxx 必须开启主题的删除功能
kafka-topic.sh --alter --topic topicName --partitions num --zookeeper xxx 主题分区数一般只能增加
kafka-topic.sh --describe --topic topicName --zookeeper xxx
kafka-topic.sh --list --zookeeper xxx
kafka-console-producer.sh --bootstrap-server ip:9092,ip:9092 --topic topicName
kafka-console-consumer.sh --bootstrap-server ip:9092 --from-beginning --topic topicName --group groupName
主要通过AdminClient
类来实现kafka的各种操作
创建AdminClient类需要写一个配置项 bootstrap.servers
生产者:KafkaProducer ProducerRecord
生产者创建需要赋予一些参数:参数的key都是在ProducerConfig类中封装的
参数 | 作用 |
---|---|
key.serializer | 生产的key值的序列化类型 |
value.serializer | 生产的value值得序列化类型 |
bootstrap.servers | kafka服务的列表 |
消费者:KafkaConsumer ConsumerRecords ConsumerRecord
消费者创建需要赋予一些参数:参数的key都是ConsumerConfig类中封装的
参数 | 作用 |
---|---|
key.deserializer | 消息的key的反序列化类型 |
value.deserializer | 消息的value的反序列化类型 |
bootstrap.servers | kafka服务的列表 |
group.id | 当前消费者所属的消费者组 |
auto.offset.reset | 消费者消费消息的时候是从头开始消息还是从最新的位置开始消费的核心,两个取值earliest、latest |
【earliest、latest区别】: earliest当消费者所属的消费者组没有任何的消费记录,从头开始消费 latest当消费者所属的消费者组没有任何的消费记录,从最新的位置开始消费 如果他们所属的消费者组有消费记录,那么他们两者都是从消费记录的位置继续开始消费
offset explorer
kafka eagle
后期我们在做实时计算的时候,我们经常会做如下操作,会通过Flume采集相关数据到Kafka中缓存,然后再使用实时计算框架对接kafka进行计算。
Flume采集的数据给kafka,那么此时也就意味着Flume就相当于是Kafka的生产者,kafka相当于是Flume的sink下沉地
kafka除了当作flume的sink,也可以充当flume的source和channel
通过Spark Streaming消费Kafka中的数据,然后对数据进行实时计算处理
采用一个Reciver接受器去接受Kafka的数据,然后数据缓存到Spark的executor内存中,这种方式很容易出现数据丢失问题,如果想要实现数据的安全性,需要开启Spark的 WAL预写日志机制保证数据的安全性 receiver模式连接的zookooper实现
不需要接收器,直接连接Kafka节点获取数据,同时由Spark自动维护offset偏移量,此时我们不需要担心数据丢失。
spark-streaming-kafka-0.10/0.8
在Spark3版本之后,在KafkaUtils中把receiver模式移除了
想使用Flume采集端口的数据(以空格分割的单词)到kafka的某个主题中,然后借助Spark Streaming统计端口数据中每一个单词出现的总次数。
编写Flume脚本采集端口数据到Kafka中 sink指定到kafka即可,flume充当kafka的生产者
整合Spark Streaming代码读取kafka中的数据,此时Spark相当于kafka的消费者
#1、起别名
demo.sources=s1
demo.channels=c1
demo.sinks=k1
#2、配置source数据源
demo.sources.s1.type=netcat
demo.sources.s1.bind=single
demo.sources.s1.port=44444
#3、配置channel管道
demo.channels.c1.type=memory
demo.channels.c1.capacity=20000
demo.channels.c1.transactionCapacity=10000
#4、配置sink下沉地
demo.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
demo.sinks.k1.kafka.bootstrap.servers=single:9092
demo.sinks.k1.kafka.topic=flume-topic
# 5、关联
demo.sources.s1.channels=c1
demo.sinks.k1.channel=c1
a.sources=s1
a.channels=c1
a.sinks=k1
a.sources.s1.type=org.apache.flume.source.kafka.KafkaSource
a.sources.s1.kafka.bootstrap.servers=single:9092
a.sources.s1.kafka.consumer.group.id=flume
a.sources.s1.kafka.topics=flume-topic
a.channels.c1.type=memory
a.channels.c1.capacity=20000
a.channels.c1.transactionCapacity=10000
a.sinks.k1.type=logger
a.sources.s1.channels=c1
a.sinks.k1.channel=c1