• kafka知识点总结


    一 、kafka结构

    1.kafka 基础结构

    kafka有两种消息队列的模式 即点对点 和主题模式;
    
    为了方便扩展,并提高吞吐量,一个topic被切分成多个pertition
    一个主机对应一个broker,每个break里面又被分成topic;

    (1)Producer:消息生产者,就是向Kafka broker 发消息的客户端。
    
    (2)Consumer:消息消费者,向Kafka broker 取消息的客户端。
    
    (3)Consumer Group(CG):消费者组,由多个consumer 组成。消费者组内每个消
        费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不
        影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
    
    (4)Broker:一台Kafka 服务器就是一个broker。一个集群由多个broker 组成。一个broker 可以容纳多个topic。
    
    (5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
    
    (6)Partition:为了实现扩展性,一个非常大的topic 可以分布到多个broker(即服务器)上,一个topic 可以分为多个                
         partition,每个partition 是一个有序的队列。
    
    (7)Replica:副本。一个topic 的每个分区都有若干个副本,一个Leader 和若干个Follower。
    
    (8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
    
    (9)Follower:每个分区多个副本中的“从”,实时从Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 
          Follower 会成为新的Leader。

    常见题目

    2、常用配置

    配置文件为config包下的service.property 文件

    1. broker 的 全局唯一编号,不能重复 ,只能是数字 。
    2. **broker.id=0**
    3. 处理网络请求的线程数量
    4. num.network.threads=3
    5. 用来处理磁盘IO 的线程数量
    6. num.io.threads=8
    7. 发送套接字的缓冲区大小
    8. socket.send.buffer.bytes=102400
    9. 接收套接字的缓冲区大小
    10. socket.receive.buffer.bytes=102400
    11. 请求套接字的缓冲区大小
    12. socket.request.max.bytes=104857600
    13. kafka 运行日志数据存放的路径 ,路径不需要提前创建 kafka 自动帮你创建 ,可以
    14. 配置多个磁盘路径,路径与路径之间可以用分隔
    15. log.dirs=/opt/module/kafka/ datas
    16. topic 在当前 broker 上的分区个数
    17. num. partitions=1
    18. 用来恢复和清理 data 下数据的线程数量
    19. num.recovery.threads.per.data.dir=1
    20. 每个 topic 创建时的副本数,默认时 1 个副本
    21. offsets.topic.replication.factor=1
    22. segment 文件保留的最长时间,超时将被删除
    23. log.retention.hours=168
    24. 每个 segment 文件的大小,默认最大 1G
    25. log.segment.bytes=1073741824
    26. 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    27. log.retention.check.interval.ms=300000
    28. 配置连接 Zookeeper 集群 地址 (在 zk 根目录下创建 kaf ka ,方便管理
    29. zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181 /kafka

    集群部署kafka的shell脚本

    #! /bin/bash
    case $1 in
    "
    for i in hadoop102 hadoop103 hadoop104
    do
    echo " 启动 $i Kafka
    ssh $i "/opt/module/kafka/bin/kafka server start.sh
    daemon /opt/module/kafka/config/server.properties"
    done
    "
    for i in hadoop102 hadoop103 hadoop104
    do
    echo " 停止 $i Kafka
    ssh $i "/opt/module/kafka/bin/kafka server stop.sh "
    done
    esac

    二、kafka生产者

    在消息发送的过程中,涉及到了两个线程——main 线程和Sender 线程。在main 线程中创建了一个双端队列RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到Kafka Broker。

    buffer.memory  RecordAccumulator 缓冲区总大小, 默认 32 m ;
    reques是批量应答的时候用到,broker最多缓存几个requst;

    1、三种生产者发送消息的方式

    发送消息的三种方式

    
      
          org.apache.kafka
          kafka-clients
          3.0.0
      
    

    1、普通异步发送的生产者代码,不关注返回结果

    1. package com.atguigu.kafka.producer;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.ProducerRecord;
    4. import java.util.Properties;
    5. public class CustomProducer {
    6. public static void main(String[] args) throws InterruptedException {
    7. // 1. 创建 kafka 生产者的配置对象
    8. Properties properties = new Properties();
    9. // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
    10. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    11. // key,value 序列化(必须):key.serializer,value.serializer
    12. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    13. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    14. // 3. 创建 kafka 生产者对象
    15. KafkaProducer kafkaProducer = new KafkaProducer(properties);
    16. // 4. 调用 send 方法,发送消息
    17. for (int i = 0; i < 5; i++) {
    18. kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
    19. // 异步发送 默认
    20. // kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
    21. // 同步发送
    22. kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
    23. //send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
    24. Future sendResult = producer.send(record);
    25. RecordMetadata recordMetadata = sendResult.get();
    26. }
    27. // 5. 关闭资源
    28. kafkaProducer.close();
    29. }
    30. }

    2、带回调函数的异步发送,通过回调函数获取返回结果,且回调函数是异步的

    回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

    注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

    1. package com.atguigu.kafka.producer;
    2. import org.apache.kafka.clients.producer.*;
    3. import java.util.Properties;
    4. public class CustomProducerCallback {
    5. public static void main(String[] args) throws InterruptedException {
    6. // 1. 创建 kafka 生产者的配置对象
    7. Properties properties = new Properties();
    8. // 2. 给 kafka 配置对象添加配置信息
    9. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    10. // key,value 序列化(必须):key.serializer,value.serializer
    11. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    12. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    13. // 3. 创建 kafka 生产者对象
    14. KafkaProducer kafkaProducer = new KafkaProducer(properties);
    15. // 4. 调用 send 方法,发送消息
    16. for (int i = 0; i < 5; i++) {
    17. // 添加回调
    18. kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
    19. // 该方法在 Producer 收到 ack 时调用,为异步调用
    20. @Override
    21. public void onCompletion(RecordMetadata metadata, Exception exception) {
    22. if (exception == null) {
    23. // 没有异常,输出信息到控制台
    24. System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition());
    25. } else {
    26. // 出现异常打印
    27. exception.printStackTrace();
    28. }
    29. }
    30. });
    31. // 延迟一会会看到数据发往不同分区
    32. Thread.sleep(2);
    33. }
    34. // 5. 关闭资源
    35. kafkaProducer.close();
    36. }
    37. }

    3、同步发送,关注返回结果,同步的会被阻塞

    1. public static void sendMessageWithCareResult() throws ExecutionException, InterruptedException {
    2. //ProducerRecord的三个参数,topic,发送的key,发送的value
    3. ProducerRecord record = new ProducerRecord<>("user-info-topic","name","路飞");
    4. //send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata
    5. Future sendResult = producer.send(record);
    6. RecordMetadata recordMetadata = sendResult.get();
    7. //打印下发送消息的topic,partition,offset
    8. System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s",
    9. recordMetadata.topic(),
    10. recordMetadata.partition(),
    11. recordMetadata.offset()));
    12. producer.close();
    13. }

    通过topic里面的pertition分区来提高消息处理的效率

    (1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。 (2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

    在IDEA中全局查找(ctrl +n)ProducerRecord类,在类中可以看到构造方法:

    1. // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)

    kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i)); 第一个参数是topic,第二个参数是pertition,第三个是消息messenger

    1. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值

    // 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余, 分别发往 1、2、0

    kafkaProducer.send(new ProducerRecord<>("first", "a","atguigu " + i), new Callback() { });

    2、自定义分区器

    定义类实现 Partitioner 接口来重写分区方法;

    1. package com.atguigu.kafka.producer;
    2. import org.apache.kafka.clients.producer.Partitioner;
    3. import org.apache.kafka.common.Cluster;
    4. import java.util.Map;
    5. /**
    6. * 1. 实现接口 Partitioner
    7. * 2. 实现 3 个方法:partition,close,configure
    8. * 3. 编写 partition 方法,返回分区号
    9. */
    10. public class MyPartitioner implements Partitioner {
    11. /**
    12. * 返回信息对应的分区
    13. * @param topic 主题
    14. * @param key 消息的 key
    15. * @param keyBytes 消息的 key 序列化后的字节数组
    16. * @param value 消息的 value
    17. * @param valueBytes 消息的 value 序列化后的字节数组
    18. * @param cluster 集群元数据可以查看分区信息
    19. * @return
    20. */
    21. @Override
    22. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    23. // 获取消息
    24. String msgValue = value.toString();
    25. // 创建 partition
    26. int partition;
    27. // 判断消息是否包含 atguigu
    28. if (msgValue.contains("atguigu")){
    29. partition = 0;
    30. }else {
    31. partition = 1;
    32. }
    33. // 返回分区号
    34. return partition;
    35. }
    36. // 关闭资源
    37. @Override
    38. public void close() {
    39. }
    40. // 配置方法
    41. @Override
    42. public void configure(Map configs) {
    43. }
    44. }

    使用分区器的方法,在生产者的配置中添加分区器参数。其实就是再properties里面引入分区的配置类

    1. package com.atguigu.kafka.producer;
    2. import org.apache.kafka.clients.producer.*;
    3. import java.util.Properties;
    4. public class CustomProducerCallbackPartitions {
    5. public static void main(String[] args) throws InterruptedException {
    6. Properties properties = new Properties();
    7. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
    8. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    9. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    10. // 添加自定义分区器
    11. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
    12. KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
    13. for (int i = 0; i < 5; i++) {
    14. kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
    15. @Override
    16. public void onCompletion(RecordMetadata metadata, Exception e) {
    17. if (e == null){
    18. System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition()
    19. );
    20. }else {
    21. e.printStackTrace();
    22. }
    23. }
    24. });
    25. }
    26. kafkaProducer.close();
    27. }
    28. }

    为了提高生成者的效率,还可以通过配置以下内容

    // batch.size:批次大小,默认 16K properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

    // linger.ms:等待时间,默认 0 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

    // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

    // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd

    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

    三、消息可靠性问题

    1、可靠性和重复性

    可靠性总结:

    acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
    acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
    acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
    在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

    同意是通过在properties里面配置ack的机制

    1. // 设置 acks
    2. properties.put(ProducerConfig.ACKS_CONFIG, "all");
    3. // 重试次数 retries,默认是 int 最大值,2147483647
    4. properties.put(ProducerConfig.RETRIES_CONFIG, 3);

    数据重复性问题

    • 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
    • 最多一次(At Most Once)= ACK级别设置为0
    
    • 总结:
    At Least Once可以保证数据不丢失,但是不能保证数据不重复;
    At Most Once可以保证数据不重复,但是不能保证数据不丢失。
    • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。

    2、幂等性和事务

    幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

    精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

    重复数据的判断标准:具有相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。

    所以幂等性只能保证的是在单分区单会话内不重复。

    开启幂等性参数 :enable.idempotence 默认为 true,false 关闭。

    事务:

    开启事务前必须开启幂等性

    Kafka 的事务一共有如下 5 个 API

    1. // 1 初始化事务
    2. void initTransactions();
    3. // 2 开启事务
    4. void beginTransaction() throws ProducerFencedException;
    5. // 3 在事务内提交已经消费的偏移量(主要用于消费者)
    6. void sendOffsetsToTransaction(Map offsets,String consumerGroupId) throws ProducerFencedException;
    7. // 4 提交事务
    8. void commitTransaction() throws ProducerFencedException;
    9. // 5 放弃事务(类似于回滚事务的操作)
    10. void abortTransaction() throws ProducerFencedException;
    11. 单个 Producer,使用事务保证消息的仅一次发送
    12. package com.atguigu.kafka.producer;
    13. import org.apache.kafka.clients.producer.KafkaProducer;
    14. import org.apache.kafka.clients.producer.ProducerRecord;
    15. import java.util.Properties;
    16. public class CustomProducerTransactions {
    17. public static void main(String[] args) throws InterruptedException {
    18. // 1. 创建 kafka 生产者的配置对象
    19. Properties properties = new Properties();
    20. // 2. 给 kafka 配置对象添加配置信息
    21. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    22. // key,value 序列化
    23. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    24. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    25. // 设置事务 id(必须),事务 id 任意起名
    26. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
    27. // 3. 创建 kafka 生产者对象
    28. KafkaProducer kafkaProducer = new KafkaProducer(properties);
    29. // 初始化事务
    30. kafkaProducer.initTransactions();
    31. // 开启事务
    32. kafkaProducer.beginTransaction();
    33. try {
    34. // 4. 调用 send 方法,发送消息
    35. for (int i = 0; i < 5; i++) {
    36. // 发送消息
    37. kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
    38. }
    39. // int i = 1 / 0;
    40. // 提交事务
    41. kafkaProducer.commitTransaction();
    42. } catch (Exception e) {
    43. // 终止事务
    44. kafkaProducer.abortTransaction();
    45. } finally {
    46. // 5. 关闭资源
    47. kafkaProducer.close();
    48. }
    49. }
    50. }

    3、生产者消息的有序性

    主要是开启幂等性后会通过其序号来落盘,如果失败,则会缓存起来知道正序的到来才落盘

    四、broker工作流程

    本章主要介绍kafka如何存储数据的

    在zookeeper的服务端存储的Kafka相关信息:

    1)/kafka/brokers/ids [0,1,2] 记录有哪些服务器
    
    2)/kafka/brokers/topics/first/partitions/0/state {"leader":1 ,"isr":[1,0,2] } 记录谁是Leader,有哪些服务器可用
        Zookeeper中存储的Kafka 信息
    
    3)/kafka/controller  {“brokerid”:0}  辅助选举Leader

    ISR 是lead 跟follow里面通讯正常的节点

    1、节点的服役和退役

    2、kafka的副本

    (1 Kafka 副本作用:提高数据可靠性 。
    (2 Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加 磁盘存储空间,增加网络上数据传输,
        降低效率。
    (3 Kafka 中副本分为: Leader 和 Follower 。 Kafka 生产者只会把数据发往 Leader然后 Follower 找 Leader 
        进行同步数据。
    (4 Kafka 分区中的所有副本统称为 AR Assigned Repllicas )。AR =ISR + OSR
      ISR表示 和 Leader 保持同步的 Follower 集合。 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,
      则该Follower 将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader 发生故障之后,
      就会从ISR 中选举新的Leader。OSR,表示Follower 与Leader 副本同步时,延迟过多的副本。

    3、lead选举流程

    Kafka 集群中有一个broker 的Controller 会被选举为Controller Leader,负责管理集群
    broker 的上下线,所有topic 的分区副本分配和Leader 选举等工作。
    Controller 的信息同步工作是依赖于Zookeeper 的。

    4. lead 和follow故障处理

    1)Follower故障

    (1) Follower发生故障后会被临时踢出ISR (2) 这个期间Leader和Follower继续接收数据 (3)待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。 (4)等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

    2)Leader故障

    (1) Leader发生故障之后,会从ISR中选出一个新的Leader (2)为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。

    注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

    5. Leader Partition 负载平衡

    正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。
    但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,
    其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

    6. 生产经验——增加副本因子

     在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

    7.topic 里面文件存储的机制

    Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数 据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。

    说明:日志存储参数配置

    参数描述 log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。

    log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。

    8.文件清理策略

    (1)检查是否过期的配置

         Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
          log.retention.hours,最低优先级小时,默认 7 天。
          log.retention.minutes,分钟。
          log.retention.ms,最高优先级毫秒。
          log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
    
    
    (2)过期清楚策略
    
        Kafka 中提供的日志清理策略有 delete 和 compact 两种。
    
        delete 日志删除:将过期数据删除
         log.cleanup.policy = delete 所有数据启用删除策略
        (1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
        (2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
            log.retention.bytes,默认等于-1,表示无穷大。
            思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?
    
    
        compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
    
            压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大
            的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
            这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息
            集里就保存了所有用户最新的资料。
            log.cleanup.policy = compact 所有数据启用压缩策

    读数据采用稀疏索引,可以快速定位要消费的数据,写入的时候是通过顺序写磁盘

    Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

    零拷贝: Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用 走应用层,传输效率高。

    PageCache页缓存: Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存

    五、kafka的消费者

    消费者组 :由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

    1、基础消费者代码:

    1. import org.apache.kafka.clients.consumer.ConsumerRecord;
    2. import org.apache.kafka.clients.consumer.ConsumerRecords;
    3. import org.apache.kafka.clients.consumer.KafkaConsumer;
    4. import java.time.Duration;
    5. import java.util.ArrayList;
    6. import java.util.Properties;
    7. public class CustomConsumer {
    8. public static void main(String[] args) {
    9. // 1.创建消费者的配置对象
    10. Properties properties = new Properties();
    11. // 2.给消费者配置对象添加参数
    12. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    13. // 配置序列化 必须
    14. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    15. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    16. // 配置消费者组(组名任意起名) 必须
    17. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    18. // 创建消费者对象
    19. KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
    20. // 注册要消费的主题(可以消费多个主题)
    21. ArrayList topics = new ArrayList<>();
    22. topics.add("first");
    23. kafkaConsumer.subscribe(topics);
    24. // 拉取数据打印
    25. while (true) {
    26. // 设置 1s 中消费一批数据
    27. ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    28. // 打印消费到的数据
    29. for (ConsumerRecord consumerRecord : consumerRecords) {
    30. System.out.println(consumerRecord);
    31. }
    32. }
    33. }
    34. }

    如果是消费某个topic里面特定的partitions,需要在配置里面注明

    // 消费某个主题的某个分区数据
     ArrayList topicPartitions = new ArrayList<>();
     topicPartitions.add(new TopicPartition("first", 0));// 消费first主题下的0分区
     kafkaConsumer.assign(topicPartitions);

    复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者,代码与上面的基础消费者完全一样,消费者组名还是test 这样就能在test消费者组里面启动两个消费者

    2、分区的分配以及再平衡

    (1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。
    
    (2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,
     修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。

    常见参数的配置:

    heartbeat.interval.ms : Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。
    
    session.timeout.ms : Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
    
    max.poll.interval.ms: 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
    
    partition.assignment.strategy: 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。
    
    可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

    3、四种分区分配策略

    1)Range 分区策略原理

    1. RoundRobin 分区策略原理

      RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。

    // 修改分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

    0 号消费者宕机后,0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,
     分别由 1 号消费者或者 2 号消费者消费。

    3)Sticky 以及再平衡

      粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,
      尽量少的调整分配的变动,可以节省大量的开销。粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,
      首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
    1. // 修改分区分配策略
    2. ArrayList startegys = new ArrayList<>();
    3. startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
    4. properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

    4、offset的位移

    消费完一批数据后,需要提交offset,可以设置自动提交和手动提交;默认是自动提交

    设置自动自交offset和提交时间;这种方式时间不好控制

    // 是否自动提交 offset
     properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
     // 提交 offset 的时间周期 1000ms,默认 5s
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

    同步提交offset和异步提交:

      虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因
      此Kafka还提供了手动提交offset的API。手动提交offset的方法有两种:分别是commitSync(同步提交)
      和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,
      同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);
      而异步提交则没有失败重试机制,故有可能提交失败。
    
          • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
          • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
    1. package com.atguigu.kafka.consumer;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import org.apache.kafka.clients.consumer.KafkaConsumer;
    6. import java.util.Arrays;
    7. import java.util.Properties;
    8. public class CustomConsumerByHandSync {
    9. public static void main(String[] args) {
    10. // 1. 创建 kafka 消费者配置类
    11. Properties properties = new Properties();
    12. // 2. 添加配置参数
    13. // 添加连接
    14. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    15. // 配置序列化
    16. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    17. "org.apache.kafka.common.serialization.StringDeserializer");
    18. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    19. "org.apache.kafka.common.serialization.StringDeserializer");
    20. // 配置消费者组
    21. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    22. // 是否自动提交 offset
    23. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    24. //3. 创建 kafka 消费者
    25. KafkaConsumer consumer = new KafkaConsumer<>(properties);
    26. //4. 设置消费主题 形参是列表
    27. consumer.subscribe(Arrays.asList("first"));
    28. //5. 消费数据
    29. while (true){
    30. // 读取消息
    31. ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
    32. // 输出消息
    33. for (ConsumerRecord consumerRecord : consumerRecords) {
    34. System.out.println(consumerRecord.value());
    35. }
    36. // 同步提交 offset
    37. consumer.commitSync();
    38. // 异步提交 offset
    39. consumer.commitAsync();
    40. }
    41. }
    42. }

    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式

    5、指定offser消费

    auto.offset.reset = earliest | latest | none 默认是 latest。

    当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

    (1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
    (2)latest(默认值):自动将偏移量重置为最新偏移量。
    (3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
    (4)任意指定 offset 位移开始消费
    1. // 2 订阅一个主题
    2. ArrayList topics = new ArrayList<>();
    3. topics.add("first");
    4. kafkaConsumer.subscribe(topics);
    5. Set assignment= new HashSet<>();
    6. while (assignment.size() == 0) {
    7. kafkaConsumer.poll(Duration.ofSeconds(1));
    8. // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
    9. assignment = kafkaConsumer.assignment();
    10. }
    11. // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
    12. for (TopicPartition tp: assignment) {
    13. kafkaConsumer.seek(tp, 1700);
    14. }
    15. // 3 消费该主题数据
    16. while (true) {
    17. ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    18. for (ConsumerRecord consumerRecord : consumerRecords) {
    19. System.out.println(consumerRecord);
    20. }
    21. }

    (5)指定时间消费

    需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
    1. package com.atguigu.kafka.consumer;
    2. import org.apache.kafka.clients.consumer.*;
    3. import org.apache.kafka.common.TopicPartition;
    4. import org.apache.kafka.common.serialization.StringDeserializer;
    5. import java.time.Duration;
    6. import java.util.*;
    7. public class CustomConsumerForTime {
    8. public static void main(String[] args) {
    9. // 0 配置信息
    10. Properties properties = new Properties();
    11. // 连接
    12. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    13. // key value 反序列化
    14. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    15. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    16. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
    17. // 1 创建一个消费者
    18. KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);
    19. // 2 订阅一个主题
    20. ArrayList topics = new ArrayList<>();
    21. topics.add("first");
    22. kafkaConsumer.subscribe(topics);
    23. Set assignment = new HashSet<>();
    24. while (assignment.size() == 0) {
    25. kafkaConsumer.poll(Duration.ofSeconds(1));
    26. // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
    27. assignment = kafkaConsumer.assignment();
    28. }
    29. HashMap timestampToSearch = new HashMap<>();
    30. // 封装集合存储,每个分区对应一天前的数据
    31. for (TopicPartition topicPartition : assignment) {
    32. timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
    33. }
    34. // 获取从 1 天前开始消费的每个分区的 offset
    35. Map offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
    36. // 遍历每个分区,对每个分区设置消费时间。
    37. for (TopicPartition topicPartition : assignment) {
    38. OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
    39. // 根据时间指定开始消费的位置
    40. if (offsetAndTimestamp != null){
    41. kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
    42. }
    43. }
    44. // 3 消费该主题数据
    45. while (true) {
    46. ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    47. for (ConsumerRecord consumerRecord : consumerRecords) {
    48. System.out.println(consumerRecord);
    49. }
    50. }
    51. }
    52. }

    6、重复消费和漏消费问题

    重复消费:已经消费了数据,但是 offset 没提交。

    漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

    如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如MySQL)。这部分知识会在后续项目部分涉及。

    7. 数据积压问题

    六、kafka集成其他框架

    1.kafka与flink集成

    kafka作为生产者发送消息到flink,也可以kafka作为消费者从flink里面消费数据``

    1. org.apache.flink flink-java 1.13.1
    2. org.apache.flink
    3. flink-streaming-java_2.12
    4. 1.13.1
    5. org.apache.flink
    6. flink-clients_2.12
    7. 1.13.1
    8. org.apache.flink
    9. flink-connector-kafka-0.11_2.11
    10. 1.7.2
    生产者
    1. public class MyFlinkKafkaProducer1 {
    2. public static void main(String[] args) throws Exception {
    3. // 准备环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. env.setParallelism(3);
    6. // 准备数据源
    7. ArrayList strings = new ArrayList<>();
    8. strings.add("test1");
    9. strings.add("test2");
    10. DataStreamSource stream = env.fromCollection(strings);
    11. //创建kafka生产者 -- 未完善
    12. Properties properties = new Properties();
    13. FlinkKafkaProducer first = new FlinkKafkaProducer<>("first", new SimpleStringSchema(), properties);
    14. // 添加数据源
    15. stream.addSink((SinkFunction) first);
    16. // 执行
    17. env.execute();
    18. }
    19. }
    消费者
    1. public class MyFlinkKafkaConsumer {
    2. public static void main(String[] args) {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(3);
    5. Properties properties = new Properties();
    6. FlinkKafkaConsumer first =
    7. new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);
    8. }
    9. }

     2、springboot与kafka的集成

    org.springframework.kafka spring-kafka 

    1. #配置连接的集群
    2. spring.kafka.bootstrap-servers=ip:socket,ip1:socket
    3. # key value 的序列化
    4. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    5. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

    生产者

    1. @RestController
    2. @RequestMapping("api/v1/kafka")
    3. public class ProducerController {
    4. @Autowired
    5. KafkaTemplate kafkaTemplate;
    6. @GetMapping("/producedata")
    7. public String data(String msg){
    8. kafkaTemplate.send("first",msg);
    9. return "true";
    10. }
    11. }

    消费者

    1. # 反序列化
    2. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    3. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    4. #消费者组id
    5. spring.kafka.consumer.group-id=testcon
    1. @Configuration
    2. public class ConsumerController {
    3. @Autowired
    4. KafkaTemplate kafkaTemplate;
    5. @KafkaListener(topics = "first")
    6. public void getData(String msg){
    7. System.out.println(msg);
    8. }
    9. }`

  • 相关阅读:
    ubuntu20.04开机运行java的sh脚本
    Linux0.11——第三回 做好访问内存的最基础准备工作
    [附源码]Python计算机毕业设计Django数字乡村基础治理系统
    项目中使用到的Spring注解及其作用
    “轻松实现文件夹批量重命名:使用顺序编号批量改名“
    Servlet规范之The Request
    Spring的常见注解
    [].slice.call(arguments)
    Nacos2.1.0与Seata1.5.2版本基于Docker部署指南
    C++多态之虚函数表详解及代码示例
  • 原文地址:https://blog.csdn.net/weixin_45063957/article/details/126677099