kafka有两种消息队列的模式 即点对点 和主题模式; 为了方便扩展,并提高吞吐量,一个topic被切分成多个pertition 一个主机对应一个broker,每个break里面又被分成topic;
(1)Producer:消息生产者,就是向Kafka broker 发消息的客户端。 (2)Consumer:消息消费者,向Kafka broker 取消息的客户端。 (3)Consumer Group(CG):消费者组,由多个consumer 组成。消费者组内每个消 费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不 影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 (4)Broker:一台Kafka 服务器就是一个broker。一个集群由多个broker 组成。一个broker 可以容纳多个topic。 (5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。 (6)Partition:为了实现扩展性,一个非常大的topic 可以分布到多个broker(即服务器)上,一个topic 可以分为多个 partition,每个partition 是一个有序的队列。 (7)Replica:副本。一个topic 的每个分区都有若干个副本,一个Leader 和若干个Follower。 (8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。 (9)Follower:每个分区多个副本中的“从”,实时从Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的Leader。
配置文件为config包下的service.property 文件
- broker 的 全局唯一编号,不能重复 ,只能是数字 。
- **broker.id=0**
-
- 处理网络请求的线程数量
- num.network.threads=3
- 用来处理磁盘IO 的线程数量
- num.io.threads=8
- 发送套接字的缓冲区大小
- socket.send.buffer.bytes=102400
- 接收套接字的缓冲区大小
- socket.receive.buffer.bytes=102400
- 请求套接字的缓冲区大小
- socket.request.max.bytes=104857600
- kafka 运行日志数据存放的路径 ,路径不需要提前创建 kafka 自动帮你创建 ,可以
- 配置多个磁盘路径,路径与路径之间可以用分隔
- log.dirs=/opt/module/kafka/ datas
- topic 在当前 broker 上的分区个数
- num. partitions=1
- 用来恢复和清理 data 下数据的线程数量
- num.recovery.threads.per.data.dir=1
- 每个 topic 创建时的副本数,默认时 1 个副本
- offsets.topic.replication.factor=1
- segment 文件保留的最长时间,超时将被删除
- log.retention.hours=168
- 每个 segment 文件的大小,默认最大 1G
- log.segment.bytes=1073741824
- 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
- log.retention.check.interval.ms=300000
- 配置连接 Zookeeper 集群 地址 (在 zk 根目录下创建 kaf ka ,方便管理
- zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181 /kafka
集群部署kafka的shell脚本
#! /bin/bash case $1 in " for i in hadoop102 hadoop103 hadoop104 do echo " 启动 $i Kafka ssh $i "/opt/module/kafka/bin/kafka server start.sh daemon /opt/module/kafka/config/server.properties" done " for i in hadoop102 hadoop103 hadoop104 do echo " 停止 $i Kafka ssh $i "/opt/module/kafka/bin/kafka server stop.sh " done esac
在消息发送的过程中,涉及到了两个线程——main 线程和Sender 线程。在main 线程中创建了一个双端队列RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到Kafka Broker。
buffer.memory RecordAccumulator 缓冲区总大小, 默认 32 m ; reques是批量应答的时候用到,broker最多缓存几个requst;
org.apache.kafka kafka-clients 3.0.0
1、普通异步发送的生产者代码,不关注返回结果
- package com.atguigu.kafka.producer;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import java.util.Properties;
-
- public class CustomProducer {
- public static void main(String[] args) throws InterruptedException {
-
- // 1. 创建 kafka 生产者的配置对象
- Properties properties = new Properties();
-
- // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
- // key,value 序列化(必须):key.serializer,value.serializer
-
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-
- // 3. 创建 kafka 生产者对象
- KafkaProducer
kafkaProducer = new KafkaProducer(properties); -
- // 4. 调用 send 方法,发送消息
- for (int i = 0; i < 5; i++) {
- kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
-
-
- // 异步发送 默认
- // kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
-
- // 同步发送
- kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
-
- //send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
- Future
sendResult = producer.send(record); - RecordMetadata recordMetadata = sendResult.get();
-
- }
- // 5. 关闭资源
- kafkaProducer.close();
- }
- }
2、带回调函数的异步发送,通过回调函数获取返回结果,且回调函数是异步的
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
- package com.atguigu.kafka.producer;
- import org.apache.kafka.clients.producer.*;
- import java.util.Properties;
-
- public class CustomProducerCallback {
- public static void main(String[] args) throws InterruptedException {
-
- // 1. 创建 kafka 生产者的配置对象
- Properties properties = new Properties();
- // 2. 给 kafka 配置对象添加配置信息
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
- // key,value 序列化(必须):key.serializer,value.serializer
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- // 3. 创建 kafka 生产者对象
- KafkaProducer
kafkaProducer = new KafkaProducer(properties); -
- // 4. 调用 send 方法,发送消息
- for (int i = 0; i < 5; i++) {
- // 添加回调
- kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
-
- // 该方法在 Producer 收到 ack 时调用,为异步调用
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
-
- // 没有异常,输出信息到控制台
- System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition());
- } else {
- // 出现异常打印
- exception.printStackTrace();
- }
- }
- });
- // 延迟一会会看到数据发往不同分区
- Thread.sleep(2);
- }
- // 5. 关闭资源
- kafkaProducer.close();
- }
- }
3、同步发送,关注返回结果,同步的会被阻塞
- public static void sendMessageWithCareResult() throws ExecutionException, InterruptedException {
-
- //ProducerRecord的三个参数,topic,发送的key,发送的value
- ProducerRecord
record = new ProducerRecord<>("user-info-topic","name","路飞"); -
- //send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
- Future
sendResult = producer.send(record); - RecordMetadata recordMetadata = sendResult.get();
-
- //打印下发送消息的topic,partition,offset
- System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s",
- recordMetadata.topic(),
- recordMetadata.partition(),
- recordMetadata.offset()));
-
- producer.close();
- }
通过topic里面的pertition分区来提高消息处理的效率
(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。 (2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
在IDEA中全局查找(ctrl +n)ProducerRecord类,在类中可以看到构造方法:
kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i)); 第一个参数是topic,第二个参数是pertition,第三个是消息messenger
// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余, 分别发往 1、2、0
kafkaProducer.send(new ProducerRecord<>("first", "a","atguigu " + i), new Callback() { });
定义类实现 Partitioner 接口来重写分区方法;
- package com.atguigu.kafka.producer;
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import java.util.Map;
-
- /**
- * 1. 实现接口 Partitioner
- * 2. 实现 3 个方法:partition,close,configure
- * 3. 编写 partition 方法,返回分区号
- */
- public class MyPartitioner implements Partitioner {
-
- /**
- * 返回信息对应的分区
- * @param topic 主题
- * @param key 消息的 key
- * @param keyBytes 消息的 key 序列化后的字节数组
- * @param value 消息的 value
- * @param valueBytes 消息的 value 序列化后的字节数组
- * @param cluster 集群元数据可以查看分区信息
- * @return
- */
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // 获取消息
- String msgValue = value.toString();
-
- // 创建 partition
- int partition;
-
- // 判断消息是否包含 atguigu
- if (msgValue.contains("atguigu")){
- partition = 0;
- }else {
- partition = 1;
- }
-
- // 返回分区号
- return partition;
- }
-
- // 关闭资源
- @Override
- public void close() {
- }
-
- // 配置方法
- @Override
- public void configure(Map
configs) { - }
- }
使用分区器的方法,在生产者的配置中添加分区器参数。其实就是再properties里面引入分区的配置类
- package com.atguigu.kafka.producer;
- import org.apache.kafka.clients.producer.*;
- import java.util.Properties;
-
- public class CustomProducerCallbackPartitions {
- public static void main(String[] args) throws InterruptedException {
-
- Properties properties = new Properties();
-
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
-
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- // 添加自定义分区器
- properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
- KafkaProducer
kafkaProducer = new KafkaProducer<>(properties); -
- for (int i = 0; i < 5; i++) {
-
- kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception e) {
- if (e == null){
- System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition()
- );
-
-
- }else {
- e.printStackTrace();
- }
- }
- });
-
- }
- kafkaProducer.close();
- }
- }
为了提高生成者的效率,还可以通过配置以下内容
// batch.size:批次大小,默认 16K properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
可靠性总结:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
同意是通过在properties里面配置ack的机制
- // 设置 acks
- properties.put(ProducerConfig.ACKS_CONFIG, "all");
- // 重试次数 retries,默认是 int 最大值,2147483647
- properties.put(ProducerConfig.RETRIES_CONFIG, 3);
数据重复性问题
• 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2 • 最多一次(At Most Once)= ACK级别设置为0 • 总结: At Least Once可以保证数据不丢失,但是不能保证数据不重复; At Most Once可以保证数据不重复,但是不能保证数据不丢失。 • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:具有
所以幂等性只能保证的是在单分区单会话内不重复。
开启幂等性参数 :enable.idempotence 默认为 true,false 关闭。
事务:
开启事务前必须开启幂等性
Kafka 的事务一共有如下 5 个 API
- // 1 初始化事务
- void initTransactions();
-
- // 2 开启事务
- void beginTransaction() throws ProducerFencedException;
-
- // 3 在事务内提交已经消费的偏移量(主要用于消费者)
- void sendOffsetsToTransaction(Map
offsets,String consumerGroupId) throws ProducerFencedException; -
- // 4 提交事务
- void commitTransaction() throws ProducerFencedException;
-
- // 5 放弃事务(类似于回滚事务的操作)
-
- void abortTransaction() throws ProducerFencedException;
- 单个 Producer,使用事务保证消息的仅一次发送
-
- package com.atguigu.kafka.producer;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import java.util.Properties;
-
- public class CustomProducerTransactions {
-
- public static void main(String[] args) throws InterruptedException {
- // 1. 创建 kafka 生产者的配置对象
- Properties properties = new Properties();
-
- // 2. 给 kafka 配置对象添加配置信息
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
- // key,value 序列化
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- // 设置事务 id(必须),事务 id 任意起名
- properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
-
- // 3. 创建 kafka 生产者对象
- KafkaProducer
kafkaProducer = new KafkaProducer(properties); -
- // 初始化事务
- kafkaProducer.initTransactions();
- // 开启事务
- kafkaProducer.beginTransaction();
-
- try {
- // 4. 调用 send 方法,发送消息
- for (int i = 0; i < 5; i++) {
- // 发送消息
- kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
- }
-
- // int i = 1 / 0;
- // 提交事务
- kafkaProducer.commitTransaction();
-
- } catch (Exception e) {
- // 终止事务
- kafkaProducer.abortTransaction();
- } finally {
-
- // 5. 关闭资源
- kafkaProducer.close();
- }
- }
- }
主要是开启幂等性后会通过其序号来落盘,如果失败,则会缓存起来知道正序的到来才落盘
本章主要介绍kafka如何存储数据的
在zookeeper的服务端存储的Kafka相关信息:
1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器 2)/kafka/brokers/topics/first/partitions/0/state {"leader":1 ,"isr":[1,0,2] } 记录谁是Leader,有哪些服务器可用 Zookeeper中存储的Kafka 信息 3)/kafka/controller {“brokerid”:0} 辅助选举Leader
ISR 是lead 跟follow里面通讯正常的节点
(1 Kafka 副本作用:提高数据可靠性 。 (2 Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加 磁盘存储空间,增加网络上数据传输, 降低效率。 (3 Kafka 中副本分为: Leader 和 Follower 。 Kafka 生产者只会把数据发往 Leader然后 Follower 找 Leader 进行同步数据。 (4 Kafka 分区中的所有副本统称为 AR Assigned Repllicas )。AR =ISR + OSR ISR表示 和 Leader 保持同步的 Follower 集合。 如果 Follower 长时间未向 Leader 发送通信请求或同步数据, 则该Follower 将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader 发生故障之后, 就会从ISR 中选举新的Leader。OSR,表示Follower 与Leader 副本同步时,延迟过多的副本。
Kafka 集群中有一个broker 的Controller 会被选举为Controller Leader,负责管理集群 broker 的上下线,所有topic 的分区副本分配和Leader 选举等工作。 Controller 的信息同步工作是依赖于Zookeeper 的。
1)Follower故障
(1) Follower发生故障后会被临时踢出ISR (2) 这个期间Leader和Follower继续接收数据 (3)待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。 (4)等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。
2)Leader故障
(1) Leader发生故障之后,会从ISR中选出一个新的Leader (2)为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。 但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高, 其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。
在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数 据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。
说明:日志存储参数配置
参数描述 log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。
(1)检查是否过期的配置
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。 log.retention.hours,最低优先级小时,默认 7 天。 log.retention.minutes,分钟。 log.retention.ms,最高优先级毫秒。 log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。 (2)过期清楚策略 Kafka 中提供的日志清理策略有 delete 和 compact 两种。 delete 日志删除:将过期数据删除 log.cleanup.policy = delete 所有数据启用删除策略 (1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。 (2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。 log.retention.bytes,默认等于-1,表示无穷大。 思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理? compact日志压缩:对于相同key的不同value值,只保留最后一个版本。 压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。 这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息 集里就保存了所有用户最新的资料。 log.cleanup.policy = compact 所有数据启用压缩策
读数据采用稀疏索引,可以快速定位要消费的数据,写入的时候是通过顺序写磁盘
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
零拷贝: Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用 走应用层,传输效率高。
PageCache页缓存: Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存
消费者组 :由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.Properties;
-
- public class CustomConsumer {
- public static void main(String[] args) {
-
- // 1.创建消费者的配置对象
- Properties properties = new Properties();
- // 2.给消费者配置对象添加参数
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
- // 配置序列化 必须
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- // 配置消费者组(组名任意起名) 必须
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
-
- // 创建消费者对象
- KafkaConsumer
kafkaConsumer = new KafkaConsumer(properties); -
- // 注册要消费的主题(可以消费多个主题)
- ArrayList
topics = new ArrayList<>(); - topics.add("first");
- kafkaConsumer.subscribe(topics);
-
- // 拉取数据打印
- while (true) {
- // 设置 1s 中消费一批数据
- ConsumerRecords
consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); - // 打印消费到的数据
- for (ConsumerRecord
consumerRecord : consumerRecords) { - System.out.println(consumerRecord);
- }
- }
- }
- }
如果是消费某个topic里面特定的partitions,需要在配置里面注明
// 消费某个主题的某个分区数据 ArrayListtopicPartitions = new ArrayList<>(); topicPartitions.add(new TopicPartition("first", 0));// 消费first主题下的0分区 kafkaConsumer.assign(topicPartitions);
复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者,代码与上面的基础消费者完全一样,消费者组名还是test 这样就能在test消费者组里面启动两个消费者
(1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。 (2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy, 修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。
常见参数的配置:
heartbeat.interval.ms : Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。 session.timeout.ms : Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。 max.poll.interval.ms: 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 partition.assignment.strategy: 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。 可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky
1)Range 分区策略原理
RoundRobin 分区策略原理
RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
0 号消费者宕机后,0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据, 分别由 1 号消费者或者 2 号消费者消费。
3)Sticky 以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果, 尽量少的调整分配的变动,可以节省大量的开销。粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略, 首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
- // 修改分区分配策略
- ArrayList
startegys = new ArrayList<>(); - startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
- properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
消费完一批数据后,需要提交offset,可以设置自动提交和手动提交;默认是自动提交
设置自动自交offset和提交时间;这种方式时间不好控制
// 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 提交 offset 的时间周期 1000ms,默认 5s properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
同步提交offset和异步提交:
虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API。手动提交offset的方法有两种:分别是commitSync(同步提交) 和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是, 同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败); 而异步提交则没有失败重试机制,故有可能提交失败。 • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。 • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
- package com.atguigu.kafka.consumer;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.util.Arrays;
- import java.util.Properties;
-
- public class CustomConsumerByHandSync {
- public static void main(String[] args) {
-
- // 1. 创建 kafka 消费者配置类
- Properties properties = new Properties();
- // 2. 添加配置参数
- // 添加连接
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
- // 配置序列化
-
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringDeserializer");
-
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringDeserializer");
-
- // 配置消费者组
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
-
- // 是否自动提交 offset
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
- //3. 创建 kafka 消费者
- KafkaConsumer
consumer = new KafkaConsumer<>(properties); -
- //4. 设置消费主题 形参是列表
- consumer.subscribe(Arrays.asList("first"));
- //5. 消费数据
- while (true){
- // 读取消息
- ConsumerRecords
consumerRecords = consumer.poll(Duration.ofSeconds(1)); -
- // 输出消息
- for (ConsumerRecord
consumerRecord : consumerRecords) { - System.out.println(consumerRecord.value());
- }
-
- // 同步提交 offset
- consumer.commitSync();
-
- // 异步提交 offset
- consumer.commitAsync();
- }
- }
- }
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。 (2)latest(默认值):自动将偏移量重置为最新偏移量。 (3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。 (4)任意指定 offset 位移开始消费
- // 2 订阅一个主题
- ArrayList
topics = new ArrayList<>(); - topics.add("first");
- kafkaConsumer.subscribe(topics);
-
- Set
assignment= new HashSet<>(); - while (assignment.size() == 0) {
- kafkaConsumer.poll(Duration.ofSeconds(1));
- // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
- assignment = kafkaConsumer.assignment();
- }
-
- // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
- for (TopicPartition tp: assignment) {
- kafkaConsumer.seek(tp, 1700);
- }
-
- // 3 消费该主题数据
- while (true) {
- ConsumerRecords
consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); - for (ConsumerRecord
consumerRecord : consumerRecords) { - System.out.println(consumerRecord);
- }
- }
(5)指定时间消费
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
- package com.atguigu.kafka.consumer;
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import java.time.Duration;
- import java.util.*;
-
- public class CustomConsumerForTime {
- public static void main(String[] args) {
-
- // 0 配置信息
-
- Properties properties = new Properties();
- // 连接
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
-
- // key value 反序列化
-
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
-
- // 1 创建一个消费者
- KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(properties); -
- // 2 订阅一个主题
- ArrayList
topics = new ArrayList<>(); - topics.add("first");
- kafkaConsumer.subscribe(topics);
-
- Set
assignment = new HashSet<>(); -
- while (assignment.size() == 0) {
- kafkaConsumer.poll(Duration.ofSeconds(1));
- // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
- assignment = kafkaConsumer.assignment();
- }
-
- HashMap
timestampToSearch = new HashMap<>(); -
- // 封装集合存储,每个分区对应一天前的数据
- for (TopicPartition topicPartition : assignment) {
- timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
- }
-
- // 获取从 1 天前开始消费的每个分区的 offset
- Map
offsets = kafkaConsumer.offsetsForTimes(timestampToSearch); -
- // 遍历每个分区,对每个分区设置消费时间。
- for (TopicPartition topicPartition : assignment) {
- OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
- // 根据时间指定开始消费的位置
- if (offsetAndTimestamp != null){
- kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
- }
- }
-
- // 3 消费该主题数据
- while (true) {
- ConsumerRecords
consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); -
- for (ConsumerRecord
consumerRecord : consumerRecords) { - System.out.println(consumerRecord);
- }
- }
- }
- }
重复消费:已经消费了数据,但是 offset 没提交。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如MySQL)。这部分知识会在后续项目部分涉及。
kafka作为生产者发送消息到flink,也可以kafka作为消费者从flink里面消费数据``
- org.apache.flink flink-java 1.13.1
-
-
-
org.apache.flink -
flink-streaming-java_2.12 -
1.13.1 -
-
-
-
org.apache.flink -
flink-clients_2.12 -
1.13.1 -
-
-
-
org.apache.flink -
flink-connector-kafka-0.11_2.11 -
1.7.2 -
-
生产者
- public class MyFlinkKafkaProducer1 {
-
- public static void main(String[] args) throws Exception {
- // 准备环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
- // 准备数据源
- ArrayList
strings = new ArrayList<>(); - strings.add("test1");
- strings.add("test2");
- DataStreamSource
stream = env.fromCollection(strings); - //创建kafka生产者 -- 未完善
- Properties properties = new Properties();
-
- FlinkKafkaProducer
first = new FlinkKafkaProducer<>("first", new SimpleStringSchema(), properties); - // 添加数据源
- stream.addSink((SinkFunction
) first); - // 执行
- env.execute();
-
- }
- }
消费者
- public class MyFlinkKafkaConsumer {
-
- public static void main(String[] args) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- Properties properties = new Properties();
-
- FlinkKafkaConsumer
first = - new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);
-
- }
- }
org.springframework.kafka spring-kafka
- #配置连接的集群
- spring.kafka.bootstrap-servers=ip:socket,ip1:socket
- # key value 的序列化
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
生产者
- @RestController
- @RequestMapping("api/v1/kafka")
- public class ProducerController {
-
- @Autowired
- KafkaTemplate
kafkaTemplate; -
- @GetMapping("/producedata")
- public String data(String msg){
- kafkaTemplate.send("first",msg);
- return "true";
- }
- }
消费者
- # 反序列化
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
-
- #消费者组id
- spring.kafka.consumer.group-id=testcon
- @Configuration
- public class ConsumerController {
-
- @Autowired
- KafkaTemplate
kafkaTemplate; -
- @KafkaListener(topics = "first")
- public void getData(String msg){
- System.out.println(msg);
- }
- }`