• 中间件 | Kafka - [基本信息]


    §1 作用

    定义

    • 消息队列
    • 分布式事件流平台

    应用目的
    削峰(缓冲)

    解耦
    基于消息的通信是平台无关、语言无关的,只取决于通信双方是否可以正确解析消息内容

    异步
    同步接口转异步的常规套路

    应用场景
    分布式事务
    通过将分布式事务拆分为流程
    不同流程节点对应不同 topic,结合分布式锁可以实现分布式事务

    通信次数多但单个消息处理简单
    日志、用户活动跟踪、监控

    §2 术语

    Producer
    生产者,生产消息的一方

    Consumer
    消费者,使用消息的一方

    Topic
    主题,约等于消息的类型

    Partition
    分区,是某个 Topic 的一部分

    broker
    代表存储数据的物理设备
    通常对于一个 Topic 而言,不同的 Partition 需要分布在不同的 broker 上

    Consumer Group
    消费者组,一个消费者组可以共同消费一个 Topic
    但 Topic 中的一个 Partition 只能由消费者组中的一个消费者消费

    Replication
    副本,是对某 Topic 的某 Partition 的备份

    Leader
    主副本,生产和消费行为只针对 leader

    Follower
    从副本
    主副本存活时,生产和消费行为与 follower 无关
    follower 只定期从 leader 同步信息,作为备份
    leader 宕机后 follower 会代替

    Zookeeper
    Kafka 2.8.0 后可以不配合 Zookeeper
    作为 kafka broker 和 topic 的注册中心,信息存放于

    • /brokers/ids/[0,1,2]
    • /brokers/topics/topics_name/partitions/partition_no/state
      “leader”:0,“isr”:[0,2]

    通信模式

    • 点对点
      只用于一个生产者对接一个消费者的场景
      消息消费后会删除
    • 发布/订阅
      可以同时存在多个 topic
      每个 topic 可以同时被多个消费者消费,并且相互独立
      消费的消息不会删除

    §3 生产者

    模型

    在这里插入图片描述

    工作流程

    • 调用 send()
    • 通过 kafka 拦截器
      可选,通常不建议
    • 通过序列化器
      通常不适用 JDK 自带序列化器,因会夹杂较多非消息相关数据,比如校验信息
    • 通过分区器,将消息写入 RecordAccumulator
      • 分区器会在内存中维护一组缓冲队列
        • 缓冲队列数量默认 == Partition 数量
        • 缓冲队列总大小默认 32M
        • 每一批消息大小默认 16K
      • 消息写入 RecordAccumulator 时会区分是否同步发送
        • 同步发送时,缓冲队列最多只有一个消息
        • 异步发送时,缓冲队列可以缓冲多个消息
    • 通过 sender 线程发送消息
      • sender 会在满足下面条件之一时发送
        • 消息积累至 batch.size 大小,默认 16K
        • 消息等待至 linger.ms 时间,默认 0ms,即无延时
      • 通过 NetworkClient 缓冲消息发送请求
        • NetworkClient 对每个 kafka broker 维护一个请求队列
        • 每个请求队列默认最多缓冲 5 个消息发送请求
        • 队列中在前的请求发出后若对应 broker 没有及时应答,允许 发送后面的请求,但不能超过 5 个
      • 通过 Selector 选择 kafka 的 broker,并完成发送请求的实际执行
    • broker 进行副本同步
    • broker 根据应答机制配置,返回 acks 信息给 Selector
      • 消息发送成功时
        • 清理对应请求
        • 清理 sender 中的消息
      • 消息发送失败时
        • 进行重试,默认重试 Integer.MAX_VALUE 次(即无限次)

    分区策略
    默认分区策略

    • 指定了 partition 时,使用指定的 partition
    • 未指定 partition,但指定了 key 时,通过 key 的 hashcode 计算落点 key.hashcode()%partitionCount
    • 只指定了值时,使用黏性分区(Sticky Partition)
      • 随机选中一个 partition
      • 随后尽量持续使用此分区
      • 分区 batch 占满或完成后,重新随机选中分区

    自定义分区策略
    自定义分区器

    public class XXPartitioner implements Partitioner{
    	@Override
    	public int partition(String topic,Object key,byte[] keyBytes,Object value, byte[] valueBytes,Cluster cluster){
    		//String topic		主题
    		//Object key		键
    		//byte[] keyBytes	序列化之后的键的字节数组
    		//Object value		值
    		//byte[] valueBytes	序列化之后的值的字节数组
    		//Cluster cluster	集群
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    使用分区器

    Properties properties = new Properties();
    ...
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xx.xx.xx.XXPartitioner");
    ...
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    大数据架构
    使用 KubeBlocks 为 K8s 提供稳如老狗的数据库服务
    Java,回形数
    Docker套件之Mysql服务配置
    图像处理领域之►边缘检测大合集◄【应该是全网仅有的了吧】
    涨1100w播放,150w粉!B站UP主仅入站百天竟成功出圈!
    Godot Best practices
    Python:【基础语法】 deque()用法
    让终端命令更有时间观念
    【数组】轮转数组
  • 原文地址:https://blog.csdn.net/ZEUS00456/article/details/127698164