• Kafka源码简要分析


    目录

    一、生产者的初始化流程

    二、生产者到缓冲队列的流程

    三、Sender拉取数据到Kafka流程

    四、消费者初始化

    五、主题订阅原理

    六、消费者抓取数据原理

    七、消费者组初始化

    八、消费者组消费流程

    九、提交offset原理


    一、生产者的初始化流程

    1. 首先获取事务id和客户端id(用到事物必须要事物id不然报错,每个生产者都需要唯一标识客户端id)
    2. 监控kafka相关情况的JmxReporter配置
    3. 然后获取分区器,如果用户有自定义的就读取配置的,如果没有配置就用默认分区器
    4. 然后key和value进行序列化
    5. 然后就读取自定义拦截器,可以定义多个拦截器,组成拦截器链
    6. 然后初始化控制单条日志的大小,默认是1m;缓冲区大小,默认32m;
    7. 创建内存池,缓存队列,初始化批次大小默认16k,压缩相关处理,默认是none,重试间隔时间默认100ms
    8. 连接kafka集群,获取元数据,才能知道要发送到哪个分区
    9. 创建sender线程,会有个创建sender的方法,sender线程负责拉取缓冲队列消息到Kafka,在方法里面会定义缓存请求的个数默认5个,然后请求超时的时间,然后创建一个网络请求客户端对象,会传入刚刚的参数还有客户端id,重试时间,发送缓冲区的大小128和接受缓冲区的大小32,还有acks等配置。sender继承了Runnbale接口,然后会new个sender线程出来用上面这些参数,然后返回。
    10. sender放到后台,启动sender线程

    二、生产者到缓冲队列的流程

    1. 在执行到拦截器的时候就要调用一个onSend方法,如果有多个拦截器,每个拦截器都会走一次这个方法,这个方法就是拦截器对数据加工的
    2. 然后获取元数据,要根据主题的分区放到对应的缓存队列
    3. 序列化相关操作key和value的序列化和压缩
    4. 分区操作,如果指定了分区,直接分配到指定分区;没有指定就会根据分区器进行分配,没有指定key就会粘性分区处理(如果批次大小和活着时间到了不然就一直是那个,满足才能创建新队列用),如果指定key就根据key到hashcode进分区数取模,
    5. 保证(序列化和压缩后)数据大小能够传输,他去读取配置的消息最大值和缓冲区大小,如果有超过的抛异常
    6. 向缓存队列里面追加数据,获取或者创建一个队列按照分区,然后尝试添加数据(一般不成功,因为还没申请内存),然后根据16k和现在压缩后的总大小取最大值,申请内存就申请这个大小,内存池分配内存,然后sender线程拿走就了会释放内存。
    7. 如果批次大小满了或者有了新的批次需要创建,就唤醒sender线程把缓冲队列的数据拉取过去。

    三、Sender拉取数据到Kafka流程

    1. 事务相关操作
    2. 获取元数据信息,为了知道发到哪个分区
    3. 判断32m缓存是否准备好,先获取队列的信息,先判断内存队列有没有数据
    4. 判断leader是不是空如果没有目标那还是会抛出异常,如果批次大小或时间满足一个条件,就会发送。
    5. 把所有请求按照节点为单位来发送请求,这样一台机器只需要建立一次连接
    6. 封装了个request然后通过网络客户端把数据发送过去
    7. 然后服务端还是通过网络客户端获取结果

    四、消费者初始化

    1. 消费者组平衡
    2. 获取消费者组id和客户端id
    3. 设置请求服务端等待时间,默认30秒;重试时间,默认100毫秒
    4. 拦截器链相关处理
    5. key和value的反序列化
    6. 判断offset从什么位置开始消费
    7. 获取消费者元数据(重试时间、是否允许访问系统主题默认false,是否允许自动创建topic主题默认true)
    8. 连接Kafka集群
    9. 创建网络客户端对象(连接重试时间默认50ms,最大重试时间1s,发送缓冲区128kb和接受缓冲区64kb大小)
    10. 指定消费者分区分配策略
    11. 创建coordinator对象
    12. 设置自动提交offset时间,默认5s,配置抓取数据的参数(最少抓取多少最大一次抓取多少等)

    五、主题订阅原理

    1. 传入要订阅的主题,如果为null直接抛出异常
    2. 注册负载均衡监听器,如果消费者组中有节点挂了,要通知其他消费者
    3. 按照主题自动订阅进行分配

    六、消费者抓取数据原理

    1. 他首先先初始化消费者组和队列
    2. 然后回调消息会到缓冲队列,然后去队列抓取数据,最多一次500条
    3. 然后抓取后拦截器开始处理数据

    七、消费者组初始化

    1. 先判断coordinator不为null那就说明为消费者组
    2. 如果没有指定分区分配策略会抛出异常
    3. 判断coordinator是否准备好,他会循环创建查找coordinator的请求并发送,并获取服务器返回到结果

    他这整个消费者组初始化就是判断coordinator有没有准备好

    八、消费者组消费流程

    1. 他会用判断coordinator是不是空,是的话就等待
    2. 他上来先去队列拉取数据,一般是拉取不到的
    3. 他先构造请求的入参(最少一次抓多少,最多抓多少,超时时间等待)然后调用send
    4. 他送后返回future,通过回调获取数据的
    5. 他会循环遍历数据获取分区,获取分区的数据,如果有数据就放到消息队列里面
    6. 然后就调用从队列拉取数据的方法拉取,然后他有大小限制最大500,他会循环一波一波拉取过去
    7. 然后放到拦截器走加工操作

    九、提交offset原理

    • 同步提交:找到coordinator然后调用commitOffset进行发送,然后不停dowhile循环,调用发送提交请求,然后等待回调获取结果,一直循环到成功为止。
    • 异步提交:他还是用coordinator去提交但是他不等待结果,他new了个监听等待结果。
  • 相关阅读:
    Exploration by random network distillation论文笔记
    好用且免费的ChatGPT工具
    Python tests in.....
    图表制作办公首选--实用图表工具Echars
    JavaScript 在前端开发中有什么应用?
    python Jupyter程序之Matplotlib数据可视化
    [微前端实战]---036 react16 - 新车排行登录
    EBS JVM 内存优化攻略
    类型组合——数组、结构、指针
    GUI编程--PyQt5--QTreeWidget
  • 原文地址:https://blog.csdn.net/weixin_54232666/article/details/133557358