MIT6.830的lab6中,我们将实现日志的回滚和崩溃恢复。
在进行我们的任务之前,我们需要了解一些基本概念。
lab6这个实验中,我们实现的是steal/no-force策略,两种策略的区别如下:
static final int ABORT_RECORD = 1;
static final int COMMIT_RECORD = 2;
static final int UPDATE_RECORD = 3;
static final int BEGIN_RECORD = 4;
static final int CHECKPOINT_RECORD = 5;
simpledb的日志记录一共有5种:ABORT, COMMIT, UPDATE, BEGIN, and CHECKPOINT,分别记录事务失败、事务提交、写入磁盘前的脏页、事务开始、检测点,这些格式的日志都记录在同一个日志文件中;日志文件以及每条日志的通用格式如下:
对于ABORT, COMMIT, and BEGIN这三种,中间的content是空的;
对于UPDATE格式的记录,主要是五部分:
对于CHECKPOINT记录,在日志的开始时候会记录一个long类型的数据,记录的就是最新checkpoint的offset。
而CHECKPOINT记录的格式如下:
在simpledb中,日志不区分redo log和undo log,格式较为简单,也不会记录事务执行过程中对记录的具体修改行为。
simpledb中用UPDATE格式的日志来保存数据的变化,在每次将数据页写入磁盘前需要用logWrite方法来记录变化:
public synchronized void logWrite(TransactionId tid, Page before,
Page after)
throws IOException {
Debug.log("WRITE, offset = " + raf.getFilePointer());
preAppend();
/* update record conists of
record type
transaction id
before page data (see writePageData)
after page data
start offset
*/
raf.writeInt(UPDATE_RECORD);
raf.writeLong(tid.getId());
writePageData(raf,before);
writePageData(raf,after);
raf.writeLong(currentOffset);
currentOffset = raf.getFilePointer();
Debug.log("WRITE OFFSET = " + currentOffset);
}
前面也说了,update record保存了每一个page的before和after数据,before数据就是修改前的老数据,after数据就是当前要刷盘的数据。
每一个page都会有一个数据结构:
byte[] oldData;
该数组就保存了page的旧数据流。
数据页一开始的旧数据是从磁盘中读取后进行设置的,之后当事务提交时,就意味着这个修改已经是持久化到磁盘了,可以更新一次旧数据。而刷盘主要是两种情况:
(1)某一事务提交时刷盘
public synchronized void flushPages(TransactionId tid) throws IOException {
// some code goes here
// not necessary for lab1|lab2
LRUCache<PageId, Page>.DLinkedNode head = buffer.getHead();
LRUCache<PageId, Page>.DLinkedNode tail = buffer.getTail();
while(head!=tail){
Page page = head.value;
if(page!=null && page.isDirty()!=null&&page.isDirty().equals(tid) ){
DbFile dbFile = Database.getCatalog().getDatabaseFile(page.getId().getTableId());
//记录日志
try{
Database.getLogFile().logWrite(page.isDirty(),page.getBeforeImage(),page);
Database.getLogFile().force();
page.markDirty(false,null);
dbFile.writePage(page);
page.setBeforeImage();
}catch (IOException e){
e.printStackTrace();
}
}
head = head.next;
}
}
(2)进行一次checkpoint时全部刷盘
public void logCheckpoint() throws IOException {
//make sure we have buffer pool lock before proceeding
synchronized (Database.getBufferPool()) {
synchronized (this) {
//Debug.log("CHECKPOINT, offset = " + raf.getFilePointer());
preAppend();
long startCpOffset, endCpOffset;
Set<Long> keys = tidToFirstLogRecord.keySet();
Iterator<Long> els = keys.iterator();
force();
Database.getBufferPool().flushAllPages();
...
这里调用了一次Database.getBufferPool().flushAllPages()将所有脏页进行刷盘,并且记录旧数据。
该任务主要完成的是LogFile.rollback()函数,当事务中止时,在事务释放其锁之前调用此函数。它的工作是撤消事务可能对数据库所做的任何更改。
大概逻辑很简单,只需要找到该事务tid第一个update record,然后用它的before data进行刷盘就可以进行恢复。
在介绍任务的完成代码前,我们必须要了解一下LogFile中的一个数据结构:
final Map<Long,Long> tidToFirstLogRecord = new HashMap<>();
该map结构很重要,key是活跃事务tid,value就是当前事务的第一条record offset。开始BEGIN日志的时候会往map里put一个tid和对应的offset进去,在事务完成(COMMIT或ABORT)以后会删除这个map里对应的tid的记录。所以这个map里实际上保存的是正在进行的事务的BeginOffset。
public void rollback(TransactionId tid)
throws NoSuchElementException, IOException {
synchronized (Database.getBufferPool()) {
synchronized(this) {
// some code goes here
preAppend();
long tidId = tid.getId();
Long begin = tidToFirstLogRecord.get(tidId);
raf.seek(begin);
while(true){
try{
int type = raf.readInt();
Long curTid = raf.readLong();
if(curTid!=tidId){
//如果不是当前的tid,就直接跳过
if(type==3){
//update record 还要跳过页数据
readPageData(raf);
readPageData(raf);
}
}else{
if(type==3){
//只需要恢复到最初的状态就行
Page before = readPageData(raf);
Page after = readPageData(raf);
DbFile databaseFile = Database.getCatalog().getDatabaseFile(before.getId().getTableId());
databaseFile.writePage(before);
Database.getBufferPool().discardPage(after.getId());
raf.seek(raf.getFilePointer()+8);
break;
}
}
raf.seek(raf.getFilePointer()+8);
}catch (EOFException e){
break;
}
}
}
}
}
该任务主要完成的是LogFile.recover(),当数据库崩溃并重启后,就要在所有的事务开始前开始一次数据恢复。
大概得恢复理论就是借助checkpoint,大概逻辑如下:
当然其实我们的数据库是一种简化后的数据库,redo和undo操作只需要将相应的page刷盘就可以了。
public void recover() throws IOException {
synchronized (Database.getBufferPool()) {
synchronized (this) {
recoveryUndecided = false;
// some code goes here
HashMap<Long, List<Page[]>> undoMap = new HashMap<>();
raf.seek(0);
print();
long checkpoint = raf.readLong();
if(checkpoint!=-1){
HashMap<Long, Long> tidPos = new HashMap<>();
raf.seek(checkpoint);
//跳过record type和tid
raf.seek(raf.getFilePointer()+12);
//获取正在进行事务的个数
int num = raf.readInt();
while(num>0){
//获取每一个事务的tid和第一条log record OFFSET
long curTid = raf.readLong();
long offset = raf.readLong();
tidPos.put(curTid,offset);
num--;
}
for(Long pos:tidPos.keySet()){
raf.seek(tidPos.get(pos));
recoverSearch(raf,undoMap);
}
}else{
System.out.println(raf.getFilePointer() + "-----------");
recoverSearch(raf, undoMap);
}
//进行undo操作
for(Long tid:undoMap.keySet()){
Page[] pages = undoMap.get(tid).get(0);
Page before = pages[0];
DbFile databaseFile = Database.getCatalog().getDatabaseFile(before.getId().getTableId());
databaseFile.writePage(before);
}
undoMap.clear();
}
}
}
看下recoverSearch函数:
/**
* 从指定位置开始检索每一条记录,update记录就直接放入map中,commit就直接将最终page刷盘,abort就直接将开始前的page刷盘
* @param raf
* @param map
*/
private void recoverSearch(RandomAccessFile raf,Map<Long,List<Page[]>> map) throws IOException {
while(true){
try{
int type = raf.readInt();
long curTid = raf.readLong();
if(type==3){
//update
if(!map.containsKey(curTid)){
map.put(curTid,new ArrayList<>());
}
Page before = readPageData(raf);
Page after = readPageData(raf);
map.get(curTid).add(new Page[]{before,after});
}else if(type==2 && map.containsKey(curTid)){
//commit
Page[] pages = map.get(curTid).get(map.get(curTid).size() - 1);
Page after = pages[1];
DbFile databaseFile = Database.getCatalog().getDatabaseFile(after.getId().getTableId());
databaseFile.writePage(after);
map.remove(curTid);
}else if(type==1 && map.containsKey(curTid)){
//abort
Page[] pages = map.get(curTid).get(0);
Page before = pages[0];
DbFile databaseFile = Database.getCatalog().getDatabaseFile(before.getId().getTableId());
databaseFile.writePage(before);
map.remove(curTid);
}
raf.seek(raf.getFilePointer()+8);
}catch (EOFException e){
break;
}
}
}
从指定的文件位置开始进行读取每一个record:
这里特地再说明一下,在这个map中保存的都是一些需要undo的事务数据。