- 我的 Kafka 旅程 - 概念 / 模式 / 应用
- 我的 Kafka 旅程 - Linux下的安装 / 基础命令 / 集群
- 我的 Kafka 旅程 - Producer
- 我的 Kafka 旅程 - Broker
- 我的 Kafka 旅程 - 文件存储机制
- 我的 Kafka 旅程 - Consumer
- 我的 Kafka 旅程 - 性能调优 / 压力测试
作者:[Sol·wang] - 博客园,原文出处:https://www.cnblogs.com/Sol-wang/p/16691610.html
一、原理阐述
Producer生产者是数据的入口,它先将数据序列化后于内存的不同队列中,它用push模式再将内存中的数据发送到服务端的broker,以追加的方式到各自分区中存储。生产者端有两大线程,以先后的顺序,分别负责各自的任务处理,可并行或同步的方式,完成生产者端的所有处理过程。
1.1 生产者端的两大线程
- 生产者线程:先将数据按规则放到内存当中
- Sender线程:再将内存中的数据发送到broker
二、生产者线程
生产者线程发送数据,经过序列化后再内存中分队列存储,每个队列中的数据块默认为16K,每个数据块的过期时间为0ms。
内存中有多个分区队列,消息是如何分队列的呢?
2.1 生产者线程的分区工作原理/三大分区策略
- 线程中的send方法指定分区号的,按指定分区存放
- 线程中的send方法无分区号的,有 key value 的,按key的hash值计算出一个固定区号存放
- 线程中的send方法只有value的,一个数据块填满后,随机按序平均存放到不同的分区
2.2 生产者线程也可以自定义分区策略
- 自定义类实现 Partitioner 接口
- 重写 Partition 方法(按收到的消息,指定到分区)
- 实现类配置到 Producer Properties 中
三、Sender线程
Sender线程处理生产者线程于内存中的队列数据块,它建立内存队列与服务端broder分区的数据通道,当数据块达到16K或超过过期时间,从内存中取出数据块并发送到服务端broker对应的分区;默认情况下,不用等broker分区的应答,最多连续发送5个数据块,失败时重发;数据块发送成功后,内存队列中的相应数据块删除,再进行下一次的发送。
服务端 broker 分区中,有两种不同的角色,leader 和 副本,leader 负责接收数据,并把数据同步给各个副本以做备份,当 leader 发生异常状况后,可自动启用副本继续运行。后续章节再继续阐述broker分区的内容。
当Sender线程将内存中的数据块发送给服务端的broker leader 时,
3.1 borker 对 Sender线程的应答机制
- 0:broker leader接收的数据,不用等保存到磁盘就及时应答给Sender线程
- 1:leader保存磁盘后,不用等同步给副本,就应答给负责发送的Sender线程
- all:leader和副本全部都磁盘保存完成后,再应答给负责发送的Sender线程
按实际场景可以选择broker不同的应答机制:
如果对数据的完整性要求最高的话,应答机制all是最好的选择,如银行流水数据;
如果对处理速度要求最高的话,允许偶然个别数据的缺失,应答机制0是最好的选择,如日志。
四、生产者端 - 整体运行示例图
为了更好的直观的便于理解,以图例方式展现上述所有的阐述内容:
上图需要关注的点: 序列化的数据压缩方式、 数据分区策略、 内存总大小、 内存数据块大小、 数据块过期时间、 broker应答机制。
五、生产者端的主要参数配置
于文件 config/producer.properties 中的配置项:
# 内存区大小(默认32M)
buffer.memory
# 单数据块容量(默认16K)
batch.size
# 数据块过期时长(默认0毫秒)
linger.ms
# 序列化数据压缩方式(默认none)
# none、gzip、推荐 snapy、lz4、zstd
compression.type=none