Kafka 文件在宏观上的布局如下图所示:
正如上图所示,Kafka 文件布局的主要特征如下:
文件的组织以 topic + 分区进行组织,每一个 topic 可以创建多个分区,每一个分区包含单独的文件夹,并且是多副本机制。即 topic 的每一个分区会有 Leader 与 Follow,并且 Kafka 内部有机制保证 topic 的某一个分区的 Leader 与 follow 不会存在在同一台机器,并且每一台 broker 会尽量均衡的承担各个分区的 Leader,当然在运行过程中如果不均衡,可以执行命令进行手动重平衡。Leader 节点承担一个分区的读写,follow 节点只负责数据备份。
Kafka 的负载均衡主要依靠分区 Leader 节点的分布情况。
分区的 Leader 节点负责读写,而从节点负责数据同步,如果Leader分区所在的Broker节点发生宕机,会触发主从节点的切换,会在剩下的 follow 节点中选举一个新的 Leader 节点,其数据的流入流程如下图所示:

分区 Leader 收到客户端的消息发送请求时,是写入到 Leader 节点后就返回还是要等到它的从节点全部写入后再返回,这里非常关键,会直接影响消息发送端的时延,故 Kafka 提供了 ack 这个参数来进行策略选择:
ack = 0不等broker端确认就直接返回,即客户端将消息发送到网络中就返回发送成功。
ack = 1 Leader 节点接受并存储后向客户端返回成功。
ack = -1 Leader节点和所有的Follow节点接受并成功存储再向客户端返回成功。
RocketMQ 的文件布局如下图所示:

RocketMQ 所有主题的消息都会写入到 commitlog 文件中,然后基于 commitlog 文件构建消息消费队列文件(Consumequeue),消息消费队列的组织结构按照 /topic/{queue} 进行组织。从集群的视角来看如下图所示:


RocketMQ 默认采取的是主从同步,当然从RocketMQ4.5引入了多副本机制,但其副本的粒度为 Commitlog 文件,上图中不同 master 节点之间的数据完成不一样(数据分片),而主从节点节点数据一致。
RocketMQ的高性能在于顺序写盘(CommitLog)、零拷贝和跳跃读(尽量命中PageCache),高可靠性在于
刷盘和Master/Slave, 另外NameServer 全部挂掉不影响已经运行的Broker,Producer,Consumer。
发送消息负载均衡,且发送消息线程安全(可满足多个实例死循环发消息),集群消费模式下消费者端负载均衡,这些特性加上上述的高性能读写,
共同造就了RocketMQ的高并发读写能力。
刷盘和主从同步均为异步(默认)时,broker进程挂掉(例如重启),消息依然不会丢失,因为broker
shutdown时会执行persist。 当物理机器宕机时,才有消息丢失的风险。
另外,master挂掉后,消费者从slave消费消息,但slave不能写消息。
kafka中Topic的Partition数量过多,队列文件会过多,那么会给磁盘的IO读写造成比较大的压力,也就造成了性能瓶颈。所以RocketMQ进行了优化,消息主题统一存储在CommitLog中。
当然,这种设计并不是银弹,它也有它的优缺点
Kafka 中文件的布局是以 Topic/partition ,每一个分区一个物理文件夹,在分区文件级别实现文件顺序写,如果一个Kafka集群中拥有成百上千个主题,每一个主题拥有上百个分区,消息在高并发写入时,其IO操作就会显得零散,其操作相当于随机IO,即 Kafka 在消息写入时的IO性能会随着 topic 、分区数量的增长,其写入性能会先上升,然后下降。
- Broker 不维护数据消费状态,只是负责数据的顺序读写,功能单一,不需要创建对象及GC操作,效率高。
- kafka各broker关系: 各broker之间关系平等, 只有具体topic下的partition才有主从关系,其中master节点负责client的读写,follower不负责备份(ISR机制)。
- Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把 这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个
Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。【分区是为了性能,副本是为了容错】- kafka包括一个默认的topic: consumer__*, 默认是50个分区,分区时机是(分区位置= groupid.hascode()%50)记录每个ConsumerGroup的offset信息,这个之前是存储在zk中的。后来考虑的ZK性能问题,改存到kafka自己的topic中。
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
当partition leader宕机时,zk会启动Leader选举
而 RocketMQ在消息写入时追求极致的顺序写,所有的消息不分主题一律顺序写入 commitlog 文件,并不会随着 topic 和 分区数量的增加而影响其顺序性。但通过笔者的实践来看一台物理机并使用SSD盘,但一个文件无法充分利用磁盘IO的性能。
Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上。Producer的发送机制保证消息尽量平均分布到
所有队列中,最终效果就是所有消息都平均落在每个Broker上。
1.普通消息 普通消息也叫做无序消息,简单来说就是没有顺序的消息,producer 只管发送消息,consumer 只管接收消息,至于消息和消息之间的顺序并没有保证,可能先发送的消息先消费,也可能先发送的消息后消费。
2.有序消息 有序消息就是按照一定的先后顺序的消息类型。 全局有序消息:只有一个队列一个消费者,效率受限 局部有序消息:配置算法,让所有相关消息进入同一个队列。
3.延时消息 延时消息,简单来说就是当 producer 将消息发送到 broker 后,会延时一定时间后才投递给 consumer 进行消费。
两者文件组织方式,除了在磁盘的顺序写方面有所区别后,由于其粒度的问题,Kafka 的 topic 扩容分区会涉及分区在各个 Broker 的移动,其扩容操作比较重,而 RocketMQ 数据存储是基于 commitlog 文件的,扩容时不会产生数据移动,只会对新的数据产生影响,RocketMQ 的运维成本对 Kafka 更低。
最后 Kafka 的 ack 参数可以类比 RocketMQ 的同步复制、异步复制。
Kafka 的 ack 参数为 1 时,对比 RocketMQ 的异步复制;-1 对标 RocketMQ 的 同步复制,而 -1 则对标 RocketMQ 消息发送方式的 oneway 模式。
Kafka 的消息写入使用的是 FileChannel,其代码截图如下:

并且在消息写入时使用了 transferTo 方法, NIO 中网络读写真正是零拷贝的就是需要调用 FileCha nnel 的 transferTo或者 transferFrom 方法,其内部机制是利用了 sendfile 系统调用。Kafka 采用的是 sendfile 这种零拷贝方式,适用于系统日志消息这种高吞吐量的大块文件的数据持久化和传输。但是值得注意的一点是,Kafka 的索引文件使用的是 mmap + write 方式,数据文件使用的是 sendfile 方式。
RocketMQ 的消息写入支持内存映射与FileChannel 写入两种方式:
RocketMQ 选择了 mmap + write 这种零拷贝方式,适用于业务级消息这种小块文件的数据持久化和传输



尽管 RocketMQ 与 Kafka 都支持 FileChannel 方式写入,但 RocketMQ 基于 FileChannel 写入时调用的 API 却并不是 transferTo,而是先调用 writer,然后定时 flush 刷写到磁盘,其代码截图如下:

但是对于超过64K的内存写入时往往 sendfile 的性能更高
Kafka 在消息发送客户端采用了一个双端队列,引入了批处理思想。其消息发送机制如下图所示:

客户端通过调用 kafka 的消息发送者发送消息时,消息会首先存入到一个双端队列中,双端队列中单个元素为 ProducerBatch,表示一个发送批次,其最大大小受参数 batch.size 控制,默认为 16K。然后会单独开一个 Send 线程,从双端队列中获取一个发送批次,将消息按批发送到 Kafka集群中,这里引入了 linger.ms 参数来控制 Send 线程的发送行为。
为了提高 kafka 消息发送的高吞吐量,即控制在缓存区中未积满 batch.size 时来控制消息发送线程的行为,是立即发送还是等待一定时间,如果linger.ms 设置为 0表示立即发送,如果设置为大于0,则消息发送线程会等待这个值后才会向broker发送。linger.ms 参数者会增加响应时间,但有利于增加吞吐量。有点类似于 TCP 领域的 Nagle 算法。
Kafka 的消息发送,在写入 ProducerBatch 时会按照消息存储协议组织好数据,在服务端可以直接写入到文件中。
RocketMQ 消息发送在客户端主要是根据路由选择算法选择一个队列,然后将消息发送到服务端,消息会在服务端按照消息的存储格式进行组织,然后进行持久化等操作。
Kafka 在消息发送方面比 RokcetMQ 有一个显著的优势就是消息格式的组织是发生在客户端,这样会有一个大的优势节约了 Broker 端的CPU压力,客户端“分布式”的承接了其优势,其架构方式有点类似 shardingjdbc 与 MyCat 的区别。
Kafka 在消息发送端另外一个特点是引入了双端缓存队列,Kafka 无处不在追求批处理,这样显著的特点是能提高消息发送的吞吐量,但与之带来的是增大消息的响应时间,并且带来了消息丢失的可能性,因为 Kafka 追加到消息缓存后会返回成功,如果消息发送方异常退出,会带来消息丢失。
Kafka 中的 linger.ms = 0 可类比 RocketMQ 消息发送的效果。
但 Kafka 通过提供 batch.size 与 linger.ms 两个参数按照场景进行定制化,比 RocketMQ 灵活。
例如日志集群,通常会调大 batch.size 与 linger.ms 参数,重复发挥消息批量发送机制,提高其吞吐量;但如果对一些响应时间比较敏感的话,可以适当减少 linger.ms 的值。
从上面的对比来看,Kafka 在性能上综合表现确实要比 RocketMQ 更加的优秀,但在消息选型过程中,我们不仅仅要参考其性能,还有从功能性上来考虑,例如 RocketMQ 提供了丰富的消息检索功能、事务消息、消息消费重试、定时消息等。
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,0.11开始不支持事务消息,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务(行为跟踪,日志收集等)。
#RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
- 使用场景:Kafka适合日志处理;RocketMQ适合业务处理
- 性能:Kafka吞吐量更高,单机百万/秒;RocketMQ单机10万/秒。 因为Kafka一个topic有很多partition,代表很多目录,每个目录下有很多segment,每个代表一个消息文件,而RocketMQ存储消息只有commitLog文件。所以Kafka可以并发写,快于RocketMQ。但同样的,当Topic增加,Kafka分区文件增多,文件刷盘时会竞争磁盘资源,而导致效率降低。
同时,生产者有一个发送消息的缓存队列,客户端发送后,放入缓存,立刻返回成功。当缓存队列达到阈值,才真正发送给broker,此举合并了多次请求,减少网络IO,但增大消息丢失风险- 特殊消息:Kafka不支持定时等
- 支持队列数:Kafka超过64个队列(partition)性能下降严重,而RocketMQ最高支持5万个队列