NameServe采用的是Peer-to-Peer架构,这其中任何一台NameServe都是独立的,跟其他的NameServer没有通信;每台NameServer都有完整的路由信息;
每个Broker(包括Slave Broker)启动的时候会跟每个NameServer都建立一个TCP长连接, 然后向NameServer进行注册,并定期发送心跳;
生产者、消费者启动后会主动从NameServer拉取元数据信息;
每个Broker(包括Slave Broker)会每隔30s给所有的NameServe发送心跳,证明自己还活着;然后NameServe会每隔10s运行一个任务,去检查一下各个Broker的最近一次心跳时间,如果某个Broker超过120s都没发送心跳,那么就认为这个Broker已经挂掉;发送心跳的时候会告诉NameServer自己当前的数据情况,比如有哪些Topic等元数据;
RocketMQ的Master-Slave模式采取的是Slave Broker不停的发送请求到Master Broker去拉取消息(Pull模式拉取消息);消费者在获取消息 的时候会先发送请求到Master Broker上请求获取一批消息,此时Master Broker会返回一批消息给消费者; 然后Master Broker在返回消息给消费者系统的时候,会根据当时Master Broker的负载情况和Slave Broker的同步情况,向消费者系统建议下一次拉取消息的时候是从Master Broker拉取还是从Slave Broker拉取;
如果Slave Broker挂了对系统有一些影响,但是影响不大,所以如果Slave Broker挂了,那么此时无论消息写入还是消息拉取,还是可以继续从Master Broke去走,对整体运行不影响。只不过少了Slave Broker,会导致所有读写压力都集中在Master Broker上。
如果Master Broker挂了:
在RocketMQ 4.5版本之前: 如果master broker挂了对消息的写入和获取都有影响,Slave Broker同步数据可能又部分还没来得及同步,并且RocketMQ不能实现将Slave Broker切换为Master Broker的。所以Master Broker宕机以后需要运维工程师介入手动做一些运维操作,修改Slave Broker配置,重启机器调整为Master Broker,比较麻烦
在RocketMQ4.5版本以后:引入了Dledger机制(基于Raft协议实现)可以让一个Master Broker对应多个Slave Broker(至少2个),此时一旦Master Broker宕机了,就可以在多个副本Slave中,通过Dledger技术和Raft协议算法进行leader选举,直接将一个Slave Broker选举为新的Master Broker,然后这个新的Master Broker就可以对外提供服务了。
8. 生产者启动以后会向其中的一台NameServer建立长连接,并定时拉取最新的元数据,然后生产者根据负载均衡算法选择一台Master Broker机器建立长连接;【生产者一定是投递消息到Master Broker的 】
9. 消费者系统其实跟生产者系统原理是类似的,他们也会跟NameServer建立长连接,然后拉取路由信息,接着找到自己要获取消息的Topic在哪几台Broker上,就可以跟Broker建立长连接,从里面拉取消息了。这里唯一要注意的一点是,消费者系统可能会从Master Broker拉取消息,也可能从Slave Broker拉取消息,都有可能,一切都看具体情况。
vm.overcommit_memory
“vm.overcommit_memory” 这个参数有三个值可以选择,0、1、2。
0: 如果值是0的话,在你的中间件系统申请内存的时候,os内核会检查可用内存是否足够,如果足够的话就分配内存给你,如果感觉剩余内存不是太够了,干脆就拒绝你的申请,导致你申请内存失败,进而导致中间件系统异常出错。比如我们曾经线上环境部署的Redis就因为这个参数是0,导致在save数据快照到磁盘文件的时候,需要申请大内存的时候被拒绝了,进而导致了异常报错。
1:因此一般需要将这个参数的值调整为1,意思是把所有可用的物理内存都允许分配给你,只要有内存就给你来用,这样可以避免申请内存失败的问题。
2: 表示内核允许分配超过所有物理内存和交换空间总和的内存
可以用如下命令修改:
echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf
2. vm.max_map_count
这个参数的值会影响中间件系统可以开启的线程的数量,同样也是非常重要的。如果这个参数过小,有的时候可能会导致有些中间件无法开启足够的线程,进而导致报错,甚至中间件系统挂掉。他的默认值是65536,但是这个值有时候是不够的,比如我们大数据团队的生产环境部署的Kafka集群曾经有一次就报出过这个异常,说无法开启足够多的线程,直接导致Kafka宕机了。因此建议可以把这个参数调大10倍,比如655360这样的值,保证中间件可以开启足够多的线程。
可以用如下命令修改:
echo 'vm.max_map_count=655360' >> /etc/sysctl.conf
3. vm.swappiness
这个参数是用来控制进程的swap行为的,这个简单来说就是os会把一部分磁盘空间作为swap区域,然后如果有的进程现在可能不是太活跃,就会被操作系统把进程调整为睡眠状态,把进程中的数据放入磁盘上的swap区域,然后让这个进程把原来占用的内存空间腾出来,交给其他活跃运行的进程来使用。
如果这个参数的值设置为0,意思就是尽量别把任何一个进程放到磁盘swap区域去,尽量大家都用物理内存。
如果这个参数的值是100,那么意思就是尽量把一些进程给放到磁盘swap区域去,内存腾出来给活跃的进程使用。
默认这个参数的值是60,有点偏高了,可能会导致我们的中间件运行不活跃的时候被迫腾出内存空间然后放磁盘swap区域去。
因此通常在生产环境建议把这个参数调整小一些,比如设置为10,尽量用物理内存,别放磁盘swap区域去。
可以用如下命令修改:
echo 'vm.swappiness=10' >> /etc/sysctl.conf
4. ulimit
这个是用来控制linux上的最大文件链接数的,默认值可能是1024,一般肯定是不够的,因为你在大量频繁的读写磁盘文件的时候,或者是进行网络通信的时候,都会跟这个参数有关系。对于一个中间件系统而言肯定是不能使用默认值的,如果你采用默认值,很可能在线上会出现如下错误:error: too many open files。
因此通常建议用如下命令修改这个值:
echo 'ulimit -n 1000000' >> /etc/profile
在启动Broker和NameServer的时候查看启动脚本中的内容,以Broker的启动脚本为例:
- # 使用服务器模式启动;JVM默认分配内存;JVM最大可使用内存;年轻代内存;
- JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
-
- #选用了G1垃圾回收器来做分代回收;设置Region大小为16M;在G1管理的老年代里预留25%的空闲内存,保证新生代对象晋升到老年代的时候有足够空间,避免老年代内存都满了,新生代有对象要进入老年代没有充足内存了,默认值是10%,略微偏少;当堆内存的使用率达到30%之后就会自动启动G1的并发垃圾回收,开始尝试回收一些垃圾对象,默认值是45%,这里调低了一些,也就是提高了GC的频率,但是避免了垃圾对象过多,一次垃圾回收耗时过长的问题;这个参数默认设置为0了,建议这个参数不要设置为0,避免频繁回收一些软引用的Class对象,这里可以调整为比如1000
- JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
-
- # 控制GC日志打印输出,指定gc日志文件的地址,要打印哪些详细信息,每个gc日志文件的大小是30m,最多保留5个gc日志文件
- JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
- JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
-
- # 有时候JVM会抛弃一些异常堆栈信息,这个参数设置之后,就是禁用这个特性要把完整的异常堆栈信息打印出来
- JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
-
-
- # 指定JVM用多少内存,并不会真正分配给他,会在实际需要使用的时候再分配给他,所以使用这个参数之后就是强制让JVM启动的时候直接分配我们指定的内存,不要等到使用内存的时候再分配
- JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
-
-
- # NIO中的direct buffer最多申请多少内存,如果你机器内存比较大可以适当调大这个值
- JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
-
- # 禁用大内存页和偏向锁
- JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
-
- # other
- JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
- JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
- JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
- JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
在目录里有dledger的示例配置文件:rocketmq/distribution/target/apache-rocketmq/conf/dledger
有一个比较核心重要的参数: sendMessageThreadPoolNums=16
参数的意思就是RocketMQ内部用来发送消息的线程池的线程数量,默认是16。这参数可根据机器性能调整。
one-way: 当发送的消息不重要时,采用one-way
方式,以提高吞吐量;
同步:当发送的消息很重要是,且对响应时间不敏感的时候采用sync
方式;
异步:当发送的消息很重要,且对响应时间非常敏感的时候采用async
方式
在RocketMQ中,消费者有两种模式,一种是push模式,另一种是pull模式。 push模式:客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端。 pull模式:客户端不断的轮询请求服务端,来获取新的消息。 但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息。 区别:
Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
Topic是一个逻辑上的概念,实际上在每个broker上以queue的形式保存,也就是说每个topic在broker上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引。
Topic创建的时候可以用集群模式去创建(这样集群里面每个broker的queue的数量相同),也可以用单个broker模式去创建(这样每个broker的queue数量可以不一致)。
每个broker上的角色是等同的,也就是说每个broker上面的queue保存了该topic下一部分消息,注意是一部分而不是全量消息。
topic数据结构图(图片来源于互联网):
如果一个master broker宕机了,那么这台broker就无法提供写入消息的服务,如果你还是按照之前的策略来均匀把数据写入各个Broker上的MessageQueue,那么会导致你在一段时间内,每次访问到这个挂掉的Master Broker都会访问失败,这个似乎不是我们想要的样子。
通常来说建议大家在Producer中开启一个开关,就是sendLatencyFaultEnable。 启用Broker故障延迟机制。该值默认为``false
,在不开启的情况下,相同线程发送消息是轮询topic下的所有队列。一旦打开了这个开关,那么他会有一个自动容错机制,比如如果某次访问一个Broker发现网络延迟有500ms,然后还无法访问,那么就会自动回避访问这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker了。
另一个就是消息重试:
producer的send方法本身支持内部重试,重试逻辑如下:
1、最大重试次数默认2次,可以通过参数retryTimesWhenSendFailed
设置
2、发送失败,则轮询到下一个broker,如果此时只有一个broker在线呢?那就会轮训这个broker下的其他队列。 3、这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认为3s。
如果发送消息,broker返回结果超时,这种超时不会进行重试了;如果是方法本身耗时超过sendMsgTimeout ,还未来得及调用发送消息,此时的超时也不会重试。
以上策略其实也很难保证同步发送消息一定成功,如果应用要保证消息不丢失,最好先把消息存储到db,后台启线程定时重试,确保消息一定存储到broker。
首先生产者将消息发送到Broker,先会把把消息存入磁盘上的日志文件,也就是CommitLog,顺序写入。每个CommitLog文件限定最多1GB,如果一个CommitLog写满了1GB,就会创建一个新的CommitLog文件,所以Broker下会有很多个CommitLog文件。
在Broker中,对Topic下的每个MessageQueue都会有一系列的ConsumeQueue文件。在Broker磁盘文件下有下面这种格式的文件:
$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
{topic}指代的就是某个Topic,{queueId}指代的就是某个MessageQueue。然后对存储在这台Broker机器上的Topic下的一个MessageQueue,他有很多的ConsumeQueue文件,这个ConsumeQueue文件里存储的是一条消息对应在CommitLog文件中的offset偏移量。
首先我们假设有一个Topic,他有4个MessageQueue,然后在两台Broker机器上,每台Broker机器会存储两个MessageQueue。
那么此时假设生产者选择对其中一个MessageQueue写入了一条消息,此时消息会发送到Broker上。然后Broker必然会把这个消息写入自己的CommitLog文件中。也就是说,Topic下的MessageQueue0和MessageQueue1就放在这个Broker机器上,而且他们每个MessageQueue目前在磁盘上就对应了一个ConsumeQueue,所以就是MessageQueue0对应着Broker磁盘上的ConsumeQueue0,MessageQueue1对应着磁盘上的ConsumeQueue1。
接着假设Queue的名字叫做:TopicOrderPaySuccess,那么此时在Broker磁盘上应该有如下两个路径的文件:
$HOME/store/consumequeue/TopicOrderPaySuccess/MessageQueue0/ConsumeQueue0磁盘文件
$HOME/store/consumequeue/TopicOrderPaySuccess/MessageQueue1/ConsumeQueue1磁盘文件
当你的Broker收到一条消息写入了CommitLog之后,其实他同时会将这条消息在CommitLog中的物理位置,也就是一个文件偏移量,就是一个offset,写入到这条消息所属的MessageQueue对应的ConsumeQueue文件中去。
比如现在这条消息在生产者发送的时候是发送给MessageQueue0的,那么此时Broker就会将这条消息在CommitLog中的offset偏移量,写入到MessageQueue0对应的ConsumeQueue0中去,如下图所示。
~~
所以实际上,ConsumeQueue0中存储的是一个一个消息在CommitLog文件中的物理位置,也就是offset。ConsumeQueue中的一个物理位置其实是对CommitLog文件中一个消息的引用。
实际上在ConsumeQueue中存储的每条数据不只是消息在CommitLog中的offset偏移量,还包含了消息的长度,以及tag hashcode,一条数据是20个字节,每个ConsumeQueue文件保存30万条数据,大概每个文件是5.72MB。
所以实际上Topic的每个MessageQueue都对应了Broker机器上的多个ConsumeQueue文件,保存了这个MessageQueue的所有消息在CommitLog文件中的物理位置,也就是offset偏移量。
Broker是基于OS操作系统的PageCache和顺序写两个机制,来提升写入CommitLog文件的性能的。
首先Broker是以顺序的方式将消息写入CommitLog磁盘文件的,也就是每次写入就是在文件末尾追加一条数据就可以了,对文件进行顺序写的性能要比对文件随机写的性能提升很多。
另外,数据写入CommitLog文件的时候,其实不是直接写入底层的物理磁盘文件的,而是先进入OS的PageCache内存缓存中,然后后续由OS的后台线程选一个时间,异步化的将OS PageCache内存缓冲中的数据刷入底层的磁盘文件。
所以在这样的优化之下,采用磁盘文件顺序写+OS PageCache写入+OS异步刷盘的策略,基本上可以让消息写入CommitLog的性能跟你直接写入内存里是差不多的,所以正是如此,才可以让Broker高吞吐的处理每秒大量的消息写入。
异步刷盘: 磁盘文件顺序写+OS PageCache写入+OS异步刷盘,高吞吐写入+丢失数据风险
同步刷盘: 写入吞吐量下降+数据不丢失,如果你使用同步刷盘模式的话,那么生产者发送一条消息出去,broker收到了消息,必须直接强制把这个消息刷入底层的物理磁盘文件中,然后才会返回ack给producer,此时你才知道消息写入成功了。
Broker上述高可用架构就是基于DLedger技术来实现的。DLedger技术实际上首先他自己就有一个CommitLog机制,你把数据交给他,他会写入CommitLog磁盘文件里去,这是他能干的第一件事情。所以首先我们在下面的图里可以看到,如果基于DLedger技术来实现Broker高可用架构,实际上就是用DLedger先替换掉原来Broker自己管理的CommitLog,由DLedger来管理CommitLog:
接着我们思考一下,如果我们配置了一组Broker,比如有3台机器,DLedger是如何从3台机器里选举出来一个Leader的?
实际上DLedger是基于Raft协议来进行Leader Broker选举的,那么Raft协议中是如何进行多台机器的Leader选举的呢?
这需要发起一轮一轮的投票,通过三台机器互相投票选出来一个人作为Leader。
简单来说,三台Broker机器启动的时候,他们都会投票自己作为Leader,然后把这个投票发送给其他Broker。此时在第一轮选举中,Broker01会收到别人的投票,他发现自己是投票给自己,但是Broker02投票给Broker02自己,Broker03投票给Broker03自己,似乎每个人都很自私,都在投票给自己,所以第一轮选举是失败的。
接着每个人会进入一个随机时间的休眠,比如说Broker01休眠3秒,Broker02休眠5秒,Broker03休眠4秒。
此时Broker01必然是先苏醒过来的,他苏醒过来之后,直接会继续尝试投票给自己,并且发送自己的选票给别人。
接着Broker03休眠4秒后苏醒过来,他发现Broker01已经发送来了一个选票是投给Broker01自己的,此时他自己因为没投票,所以会尊重别人的选择,就直接把票投给Broker01了,同时把自己的投票发送给别人。
接着Broker02苏醒了,他收到了Broker01投票给Broker01自己,收到了Broker03也投票给了Broker01,那么他此时自己是没投票的,直接就会尊重别人的选择,直接就投票给Broker01,并且把自己的投票发送给别人。
此时所有人都会收到三张投票,都是投给Broker01的,那么Broker01就会当选为Leader。
其实只要有(3台机器 / 2) + 1个人投票给某个人,就会选举他当Leader,这个(机器数量 / 2) + 1就是大多数的意思。这就是Raft协议中选举leader算法的简单描述,简单来说,他确保有人可以成为Leader的核心机制就是一轮选举不出来Leader的话,就让大家随机休眠一下,先苏醒过来的人会投票给自己,其他人苏醒过后发现自己收到选票了,就会直接投票给那个人。
依靠这个随机休眠的机制,基本上几轮投票过后,一般都是可以快速选举出来一个Leader。然后只有Leader可以接收数据写入,Follower只能接收Leader同步过来的数据。
简单来说,数据同步会分为两个阶段,一个是uncommitted阶段,一个是commited阶段。
首先Leader Broker上的DLedger收到一条数据之后,会标记为uncommitted状态,然后他会通过自己的DLedgerServer组件把这个uncommitted数据发送给Follower Broker的DLedgerServer。
接着Follower Broker的DLedgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的DLedgerServer,然后如果Leader Broker收到超过半数的Follower Broker返回ack之后,就会将消息标记为committed状态。
然后Leader Broker上的DLedgerServer就会发送commited消息给Follower Broker机器的DLedgerServer,让他们也把消息标记为comitted状态。
这个就是基于Raft协议实现的两阶段完成的数据同步机制。
如果Leader Broker崩溃,则基于DLedger和Raft协议重新选举Leader
一般情况下同一个微服务都会部署多台机器,并且给这个服务配置消费者组名XXX,那么这多台机器上部署的服务消费者组名字都叫XXX,那么这多台机器就属于同一个消费者组。以此类推,每个系统的几台机器都是属于各自的消费者组。
默认情况下我们都是集群模式,也就是说,一个消费组获取到一条消息,只会交给组内的一台机器去处理,不是每台机器都可以获取到这条消息的。但是我们可以通过如下设置来改变为广播模式:
consumer.setMessageModel(MessageModel.BROADCASTING);
如果修改为广播模式,那么对于消费组获取到的一条消息,组内每台机器都可以获取到这条消息。但是相对而言广播模式其实用的很少,常见基本上都是使用集群模式来进行消费的。
所以你大致可以认为一个Topic的多个MessageQueue会均匀分摊给消费组内的多个机器去消费,这里的一个原则就是,一个MessageQueue只能被一个消费机器去处理,但是一台消费者机器可以负责多个MessageQueue的消息处理。
其实消费消息的时候,本质就是根据你要消费的MessageQueue以及开始消费的位置,去找到对应的ConsumeQueue读取里面对应位置的消息在CommitLog中的物理offset偏移量,然后到CommitLog中根据offset读取消息数据,返回给消费者机器。
接着消费者机器拉取到一批消息之后,就会将这批消息回调我们注册的一个函数,如下面这样子:
当我们处理完这批消息之后,消费者机器就会提交我们目前的一个消费进度offset到Broker上去,然后Broker就会存储我们的消费进度
这个时候其实会进入一个rabalance的环节,也就是说重新给各个消费机器分配他们要处理的MessageQueue。
broker收到一条消息,会写入CommitLog文件,但是会先把CommitLog文件中的数据写入os cache(操作系统管理的缓存)中去。然后os自己有后台线程,过一段时间后会异步把os cache缓存中的CommitLog文件的数据刷入磁盘中去。就是依靠这个写入CommitLog时先进入os cache缓存,而不是直接进入磁盘的机制,就可以实现broker写CommitLog文件的性能是内存写级别的,这才能实现broker超高的消息接入吞吐量。
也就是说,对于Broker机器的磁盘上的大量的ConsumeQueue文件,在写入的时候也都是优先进入os cache中的。而且os自己有一个优化机制,就是读取一个磁盘文件的时候,他会自动把磁盘文件的一些数据缓存到os cache中。而且ConsumeQueue文件主要是存放消息的offset,所以每个文件很小,30万条消息的offset就只有5.72MB而已。所以实际上ConsumeQueue文件们是不占用多少磁盘空间的,他们整体数据量很小,几乎可以完全被os缓存在内存cache里。
所以实际上在消费者机器拉取消息的时候,第一步大量的频繁读取ConsumeQueue文件,几乎可以说就是跟读内存里的数据的性能是一样的,通过这个就可以保证数据消费的高性能以及高吞吐。
因为CommitLog是用来存放消息的完整数据的,所以内容量是很大的,毕竟他一个文件就要1GB,所以整体完全有可能多达几个TB。也就是说,os cache对于CommitLog而言,主要是提升文件写入性能,当你不停的写入的时候,很多最新写入的数据都会先停留在os cache里,比如这可能有10GB~20GB的数据。之后os会自动把cache里的比较旧的一些数据刷入磁盘里,腾出来空间给更新写入的数据放在os cache里,所以大部分数据可能多达几个TB都是在磁盘上的。
所以最终结论来了,当你拉取消息的时候,可以轻松从os cache里读取少量的ConsumeQueue文件里的offset,这个性能是极高的,但是当你去CommitLog文件里读取完整消息数据的时候,会有两种可能。
第一:如果你读取的是那种刚刚写入CommitLog的数据,那么大概率他们还停留在os cache中,此时你可以顺利的直接从os cache里读取CommitLog中的数据,这个就是内存读取,性能是很高的。
第二:你也许读取的是比较早之前写入CommitLog的数据,那些数据早就被刷入磁盘了,已经不在os cache里了,那么此时你就只能从磁盘上的文件里读取了,这个性能是比较差一些的。
如果你的消费者机器一直快速的在拉取和消费处理,紧紧的跟上了生产者写入broker的消息速率,那么你每次拉取几乎都是在拉取最近人家刚写入CommitLog的数据,那几乎都在os cache里。但是如果broker的负载很高,导致你拉取消息的速度很慢,或者是你自己的消费者机器拉取到一批消息之后处理的时候性能很低,处理的速度很慢,这都会导致你跟不上生产者写入的速率,这样就会从磁盘拉取数据。
broker知道当前机器物理内存有多大,自己能使用的最大内存是多大,所以如果消费消息落后太多,并且剩余未拉取消息总大小已经超过了可使用的最大os cache大小,这个时候就会建议下次从slave broker拉取消息。
Broker中就是大量的使用mmap技术去实现CommitLog这种大磁盘文件的高性能读写优化的。
Broker对磁盘文件的写入主要是借助直接写入os cache来实现性能优化的,因为直接写入os cache,相当于就是写入内存一样的性能,后续等os内核中的线程异步把cache中的数据刷入磁盘文件即可。
传统IO操作: 磁盘文件 拷贝-> 内核IO缓冲区 拷贝-> 用户进程私有空间
RocketMQ底层对CommitLog、ConsumeQueue之类的磁盘文件的读写操作,基本上都会采用mmap技术来实现。如果具体到代码层面,就是基于JDK NIO包下的MappedByteBuffer的map()函数,来先将一个磁盘文件(比如一个CommitLog文件,或者是一个ConsumeQueue文件)映射到内存里来.
刚开始你建立映射的时候,并没有任何的数据拷贝操作,其实磁盘文件还是停留在那里,只不过他把物理上的磁盘文件的一些地址和用户进程私有空间的一些虚拟内存地址进行了一个映射.
这个地址映射的过程,就是JDK NIO包下的MappedByteBuffer.map()函数干的事情,底层就是基于mmap技术实现的。另外这里给大家说明白的一点是,这个mmap技术在进行文件映射的时候,一般有大小限制,在1.5GB~2GB之间.所以RocketMQ才让CommitLog单个文件在1GB,ConsumeQueue文件在5.72MB,不会太大。
接下来就可以对这个已经映射到内存里的磁盘文件进行读写操作了,比如要写入消息到CommitLog文件,你先把一个CommitLog文件通过MappedByteBuffer的map()函数映射其地址到你的虚拟内存地址。
接着就可以对这个MappedByteBuffer执行写入操作了,写入的时候他会直接进入PageCache中,然后过一段时间之后,由os的线程异步刷入磁盘中,如下图我们可以看到这个示意。
而且PageCache技术在加载数据的时候还会将你加载的数据块的临近的其他数据块也一起加载到PageCache里去。
预映射机制 + 文件预热机制:
(1)内存预映射机制:Broker会针对磁盘上的各种CommitLog、ConsumeQueue文件预先分配好MappedFile,也就是提前对一些可能接下来要读写的磁盘文件,提前使用MappedByteBuffer执行map()函数完成映射,这样后续读写文件的时候,就可以直接执行了。
(2)文件预热:在提前对一些文件完成映射之后,因为映射不会直接将数据加载到内存里来,那么后续在读取尤其是CommitLog、ConsumeQueue的时候,其实有可能会频繁的从磁盘里加载数据到内存中去。所以其实在执行完map()函数之后,会进行madvise系统调用,就是提前尽可能多的把磁盘文件加载到内存里去。
通过上述优化,才真正能实现一个效果,就是写磁盘文件的时候都是进入PageCache的,保证写入高性能;同时尽可能多的通过map + madvise的映射后预热机制,把磁盘文件里的数据尽可能多的加载到PageCache里来,后续对CosumeQueue、CommitLog进行读取的时候,才能尽可能从内存里读取数据。
总结图: