rocketmq-all-4.3.1
版本中,由于ConsumeQueue和Index文件都是根据CommitLog文件异步构建的,所以ConsumeQueue、Index与CommitLog文件的数据是最终一致而非强一致的。这样在Broker重启时就可能出现数据不一致的情况。
RocketMQ 有两种文件恢复机制。判断异常的方式是在 broker启动的时候创建一个 abort 空文件,在正常结束的时候删掉这个文件。在下一次启动 broker 时,如果发现了 abort 文件,则认为是异常宕机,否则就是正常关机。
关机恢复机制设计的目的是保证数据零丢失:RocketMQ 通过 abort 文件和 checkpoint 文件配合,在重启时判断上次是否异常退出,并据此选择恢复策略来保证数据不丢失。
当索引文件刷盘成功,消息队列消费文件未刷盘成功且宕机时,会造成消息消费队列文件丢失的问题。但只要 Commitlog 文件没有丢失,就可以利用 RocketMQ 的文件恢复机制,恢复丢失的消息消费队列文件。在 RocketMQ 的文件恢复机制中,有针对异常宕机进行文件恢复的机制。当 broker 异常启动,在文件恢复过程中,RocketMQ 会将最后一个有效文件的所有消息转发到消息消费队列和索引文件,确保不丢失消息,但同时也会带来重复消费的问题,RocketMQ 保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幂等设计。
StoreCheckpoint(检查点)主要用于记录CommitLog
、ConsumeQueue
、Index
文件的刷盘时间点,当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复。checkpoint(检查点)文件固定长度为4KB
当索引文件刷盘成功,消息队列消费文件未刷盘成功且宕机时,会造成消息消费队列文件丢失的问题。但只要 Commitlog 文件没有丢失,就可以利用 RocketMQ 的文件恢复机制,恢复丢失的消息消费队列文件。在 RocketMQ 的文件恢复机制中,有针对异常宕机进行文件恢复的机制。当 broker 异常启动,在文件恢复过程中,RocketMQ 会将最后一个有效文件的所有消息转发到消息消费队列和索引文件,确保不丢失消息,但同时也会带来重复消费的问题,RocketMQ 保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幂等设计。
StoreCheckpoint文件源码
/**
 * StoreCheckpoint persists the latest flush timestamps of the CommitLog,
 * ConsumeQueue and Index files in a small memory-mapped file (one OS page).
 * After an abnormal broker shutdown, recovery reads these timestamps to
 * decide from which point in time messages must be re-dispatched.
 *
 * Fixed byte layout of the mapped file:
 *   [0, 8)   physicMsgTimestamp  (CommitLog)
 *   [8, 16)  logicsMsgTimestamp  (ConsumeQueue)
 *   [16, 24) indexMsgTimestamp   (Index)
 */
public class StoreCheckpoint {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private final RandomAccessFile randomAccessFile;
    private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    // Store timestamp of the most recent CommitLog record
    private volatile long physicMsgTimestamp = 0;
    // Store timestamp of the most recent ConsumeQueue record
    private volatile long logicsMsgTimestamp = 0;
    // Store timestamp of the most recent Index file record
    private volatile long indexMsgTimestamp = 0;

    /**
     * Opens (creating if necessary) and memory-maps the checkpoint file,
     * then restores the three timestamps when the file already existed.
     *
     * @param scpPath path of the checkpoint file
     * @throws IOException if the file cannot be opened or mapped
     */
    public StoreCheckpoint(final String scpPath) throws IOException {
        File file = new File(scpPath);
        MappedFile.ensureDirOK(file.getParent());
        boolean fileExists = file.exists();
        this.randomAccessFile = new RandomAccessFile(file, "rw");
        // Once the mapping is established, closing the FileChannel would not
        // affect the mapping; the channel is kept open here and closed
        // explicitly in shutdown().
        // TODO: could the fileChannel simply be closed right after map()?
        this.fileChannel = this.randomAccessFile.getChannel();
        // Map a single OS page read-write; the checkpoint data is fixed-size.
        this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);
        if (fileExists) {
            log.info("store checkpoint file exists, " + scpPath);
            // Restore the three timestamps from their fixed offsets.
            this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
            this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
            this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
            log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
                + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
            log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
                + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
            log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
                + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
        } else {
            log.info("store checkpoint file not exists, " + scpPath);
        }
    }

    /**
     * Flushes the current timestamps, unmaps the buffer and closes the
     * channel. Order matters: flush must happen before unmapping.
     */
    public void shutdown() {
        this.flush();
        // unmap mappedByteBuffer
        MappedFile.clean(this.mappedByteBuffer);
        try {
            this.fileChannel.close();
        } catch (IOException e) {
            log.error("Failed to properly close the channel", e);
        }
    }

    /**
     * Writes the three timestamps at their fixed offsets and forces the
     * mapped page to disk.
     */
    public void flush() {
        this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
        this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
        this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
        this.mappedByteBuffer.force();
    }

    public long getPhysicMsgTimestamp() {
        return physicMsgTimestamp;
    }

    public void setPhysicMsgTimestamp(long physicMsgTimestamp) {
        this.physicMsgTimestamp = physicMsgTimestamp;
    }

    public long getLogicsMsgTimestamp() {
        return logicsMsgTimestamp;
    }

    public void setLogicsMsgTimestamp(long logicsMsgTimestamp) {
        this.logicsMsgTimestamp = logicsMsgTimestamp;
    }

    /** Minimum over all three checkpoints (CommitLog/ConsumeQueue with the 3s rewind, plus Index). */
    public long getMinTimestampIndex() {
        return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
    }

    /**
     * Minimum of the CommitLog and ConsumeQueue timestamps, rewound by 3
     * seconds (clamped at 0).
     * NOTE(review): the 3000ms rewind is presumably a safety margin so that
     * recovery starts slightly before the recorded flush point — confirm.
     */
    public long getMinTimestamp() {
        long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);
        min -= 1000 * 3;
        // Clamp for a fresh/empty checkpoint whose timestamps are still 0.
        if (min < 0)
            min = 0;
        return min;
    }

    public long getIndexMsgTimestamp() {
        return indexMsgTimestamp;
    }

    public void setIndexMsgTimestamp(long indexMsgTimestamp) {
        this.indexMsgTimestamp = indexMsgTimestamp;
    }
}
存储文件的启动时恢复主要完成 flushedWhere、committedWhere 指针的设置,将消息消费队列最大偏移量加载到内存,并删除 flushedWhere 之后的所有文件。
DefaultMessageStore#load
是文件恢复的入口
判断abort文件是否存在,此文件在broker启动时创建,正常停止时会被删除
加载延迟日志文件
加载CommitLog文件,按照文件名进行排序。如果文件与配置中的CommitLog文件大小不一致,则直接返回,会忽略后续的文件
加载ConsumeQueue文件
加载checkpoint文件
加载Index文件,如果上次异常退出,而且Index文件刷盘时间大于检查点文件最大的消息时间戳,则立即销毁此文件
根据是否正常停止,执行不同的恢复策略
load源码
/**
 * Entry point of message-store recovery at broker start-up. Loads the
 * scheduled-message log, CommitLog and ConsumeQueue files, then the
 * checkpoint and index files, and finally runs the normal/abnormal
 * recovery strategy depending on how the broker last exited.
 *
 * @return true if every component loaded successfully; false otherwise
 *         (in which case the allocate service is shut down)
 */
public boolean load() {
    boolean result = true;
    try {
        // The abort file is created at start-up and removed on a normal
        // shutdown, so its presence means the last exit was abnormal.
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        // Load the delayed (scheduled) message service state, if present.
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }
        // load Commit Log
        result = result && this.commitLog.load();
        // load Consume Queue
        result = result && this.loadConsumeQueue();
        if (result) {
            // Load the checkpoint file, then the index files (the exit flag
            // lets the index service handle stale files after a crash), and
            // finally run recovery according to how the broker last exited.
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            this.indexService.load(lastExitOK);
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }
    // Any failure above aborts start-up housekeeping for mapped files.
    if (!result) {
        this.allocateMappedFileService.shutdown();
    }
    return result;
}
正常恢复通过CommitLog#recoverNormally
实现
正常恢复CommitLog#recoverNormally
的源码
/**
 * Recovery after a normal shutdown. Scans forward from (at most) the third
 * mapped file from the end, validating each message in turn; the first
 * invalid position becomes the confirmed physical offset. flushedWhere and
 * committedWhere are set to that offset and any data beyond it is truncated.
 */
public void recoverNormally() {
    // CRC verification during recovery is enabled by default.
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Began to recover from the last third file
        // Start recovery from the third file from the end.
        int index = mappedFiles.size() - 3;
        if (index < 0)
            // Fewer than three files: start from the first file.
            index = 0;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // processOffset: the confirmed physical offset within the CommitLog.
        long processOffset = mappedFile.getFileFromOffset();
        // Offset validated so far within the current file.
        long mappedFileOffset = 0;
        // Walk message-by-message through the CommitLog files.
        while (true) {
            // Read the next message, verifying CRC when configured.
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // Normal data
            if (dispatchRequest.isSuccess() && size > 0) {
                // Valid message: advance past it within the current file.
                mappedFileOffset += size;
            }
            // Come the end of the file, switch to the next file Since the
            // return 0 representatives met last hole,
            // this can not be included in truncate offset
            // size == 0 marks the end of this file's data.
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    // Move to the next file: reset per-file offsets and loop.
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    // processOffset restarts at this file's base physical offset.
                    processOffset = mappedFile.getFileFromOffset();
                    // Reset the validated offset within the new file.
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            // Intermediate file read error
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        // Confirmed offset = file base + validated bytes; update the
        // MappedFileQueue flushedWhere/committedWhere pointers and drop
        // anything past it.
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
    }
}
异常恢复通过CommitLog#recoverAbnormally
实现,其整体流程与正常恢复基本相同,区别在于起始文件的定位方式以及恢复过程中会重新转发消息。
异常恢复CommitLog#recoverAbnormally
源码分析
/**
 * Recovery after an abnormal shutdown (abort file present). Walks backwards
 * from the last mapped file to find the first file that is safe to start
 * from, then scans forward like the normal recovery — but additionally
 * re-dispatches each valid message so ConsumeQueue/Index entries can be
 * rebuilt. Finally updates the flushedWhere/committedWhere pointers and
 * truncates dirty CommitLog and ConsumeQueue data.
 */
public void recoverAbnormally() {
    // recover by the minimum time stamp
    // Defaults to true, i.e. verify message CRC during recovery.
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Looking beginning to recover from which file
        // Scan backwards from the last file.
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // First file (from the tail) whose stored messages qualify as a
            // valid recovery starting point.
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        if (index < 0) {
            // No match found: fall back to the very first file.
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // processOffset: the confirmed physical offset within the CommitLog.
        long processOffset = mappedFile.getFileFromOffset();
        // Offset validated so far within the current file.
        long mappedFileOffset = 0;
        // Walk message-by-message through the CommitLog files.
        while (true) {
            // Read the next message, verifying CRC when configured.
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // Normal data
            if (size > 0) {
                mappedFileOffset += size;
                // Re-dispatch the message (rebuilds ConsumeQueue/Index). With
                // duplication enabled, only messages below the confirmed
                // offset are re-dispatched.
                if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                    if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                } else {
                    this.defaultMessageStore.doDispatch(dispatchRequest);
                }
            }
            // Intermediate file read error
            else if (size == -1) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
            // Come the end of the file, switch to the next file
            // Since the return 0 representatives met last hole, this can
            // not be included in truncate offset
            else if (size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // The current branch under normal circumstances should
                    // not happen
                    log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    // Reset the validated offset within the new file.
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
        }
        // Confirmed offset = file base + validated bytes; update the
        // MappedFileQueue flushedWhere/committedWhere pointers and drop
        // anything past it.
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        // Clear ConsumeQueue redundant data
        this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
    }
    // Commitlog case files are deleted
    else {
        // No valid MappedFile at all: reset both pointers to zero.
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        // Destroy all ConsumeQueue files, since there is no CommitLog data.
        this.defaultMessageStore.destroyLogics();
    }
}