日志写入WAL Buffer的过程分为两步:第一步是预留空间(ReserveXLogInsertLocation),第二步是将日志记录复制到预留好的空间中(CopyXLogRecordToWAL)。
- /*
- * PostgreSQL declares NUM_XLOGINSERT_LOCKS (currently 8) locks of type
- * WALInsertLock that are used for WAL insertion.
- * A larger value allows more processes to insert concurrently, but at the
- * cost of higher CPU overhead.
- */
- #define NUM_XLOGINSERT_LOCKS 8
- /* Each WALInsertLock consists of a lightweight lock plus insertion-position fields.
- * A process that wants to insert WAL must hold one WALInsertLock
- * (it does not matter which one).
- */
- typedef struct
- {
- LWLock lock; // lightweight lock; once it is released, the record has been written into the WAL buffer
- XLogRecPtr insertingAt; // progress of the current insertion into the WAL buffer. Small records that do not cross a page boundary do not update it; it is usually updated only for long records. It is read by a process flushing WAL from memory to disk, to confirm that all writes into that region have completed
-
- XLogRecPtr lastImportantAt; // lastImportantAt contains the LSN of the last important WAL record inserted using a given lock. Some records are unrelated to data consistency and may be lost without harm; such records do not change lastImportantAt
- } WALInsertLock;
这里我们留下两个问题:
这个问题的答案在WaitXLogInsertionsToFinish函数,下一篇我们会学习它。
简单来说,每次WAL刷入磁盘,都会调用这个函数,而这个函数需要遍历所有WALInsertLocks,所以NUM_XLOGINSERT_LOCKS不宜过大,目前代码中写死为8。
- for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++) // one iteration per WAL insertion lock; the body is elided in this excerpt
- {
- ...
- }
如前所述,XLogInsertRecord这个函数最重要的就是干两件事:预留空间和数据复制。
函数开头是一些检查
- XLogRecPtr // returns EndPos of the record, or InvalidXLogRecPtr when the caller has to start over
- XLogInsertRecord(XLogRecData *rdata, // chain of record data; rdata->data of the first element is used as the XLogRecord header
- XLogRecPtr fpw_lsn, // compared with RedoRecPtr below to detect a buffer the caller did not back up
- uint8 flags, // XLOG_MARK_UNIMPORTANT here suppresses the lastImportantAt update
- int num_fpi) // not referenced in the part of the function shown in this excerpt
- {
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- pg_crc32c rdata_crc;
- bool inserted;
- XLogRecord *rechdr = (XLogRecord *) rdata->data;
- uint8 info = rechdr->xl_info & ~XLR_INFO_MASK;
- bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
- info == XLOG_SWITCH);
- XLogRecPtr StartPos;
- XLogRecPtr EndPos;
- bool prevDoPageWrites = doPageWrites; // value of doPageWrites before it is refreshed below
- …
- START_CRIT_SECTION();
- // An xlog-switch record takes the insertion locks exclusively, so no other process can reserve space in the meantime
- if (isLogSwitch)
- WALInsertLockAcquireExclusive();
- else
- WALInsertLockAcquire();
-
- // Check whether this process's cached copy of RedoRecPtr is out of date (this only happens right after a checkpoint). If so, refresh it; the caller may then have to recompute the record (see the check that follows), which makes this case slower than the others.
- if (RedoRecPtr != Insert->RedoRecPtr)
- {
- Assert(RedoRecPtr < Insert->RedoRecPtr);
- RedoRecPtr = Insert->RedoRecPtr;
- }
- // Also check whether fullPageWrites or forcePageWrites is enabled
- doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
-
- if (doPageWrites &&
- (!prevDoPageWrites ||
- (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr)))
- {
- /*
- * Oops, some buffer now needs to be backed up that the caller didn't
- * back up. Start over. In other words: page writes are required but
- * the record was built without them, so release the lock and return
- * InvalidXLogRecPtr. No error is raised here; the invalid pointer is
- * the start-over signal.
- */
- WALInsertLockRelease();
- END_CRIT_SECTION();
- return InvalidXLogRecPtr;
- }
预留空间部分
- /*
- * Reserve space for the record in the WAL. This also sets the xl_prev pointer.
- */
- if (isLogSwitch)
- // For an xlog-switch record, if the insert position is already at the start of a segment, StartPos and EndPos may be equal, i.e. no WAL record needs to be written
- inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
- else
- {
- ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
- &rechdr->xl_prev);
- inserted = true;
- }
数据复制部分
- // After reserving space, copy the data. inserted is true for every non-switch record, and for an xlog-switch record when ReserveXLogSwitch returned true
- if (inserted)
- {
- /*
- * Now that xl_prev has been filled in, calculate CRC of the record header.
- */
- rdata_crc = rechdr->xl_crc;
- COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
- FIN_CRC32C(rdata_crc);
- rechdr->xl_crc = rdata_crc;
-
- /*
- * All the record data, including the header, is now ready to be
- * inserted. Copy the record in the space reserved (i.e. into the WAL buffer).
- */
- CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
- StartPos, EndPos);
-
- /*
- * Unless record is flagged as not important, update LSN of last
- * important record in the current slot. When holding all locks, just
- * update the first one.
- */
- if ((flags & XLOG_MARK_UNIMPORTANT) == 0)
- {
- int lockno = holdingAllLocks ? 0 : MyLockNo;
-
- WALInsertLocks[lockno].l.lastImportantAt = StartPos;
- }
- }
- else // inserted is false: an xlog-switch record for which nothing had to be inserted
- {
- /*
- * This was an xlog-switch record, but the current insert location was
- * already exactly at the beginning of a segment, so there was no need
- * to do anything (there is nothing to copy).
- */
- }
-
- /*
- * Done! Let others know that we're finished (release the insertion lock).
- */
- WALInsertLockRelease();
-
- MarkCurrentTransactionIdLoggedIfAny();
-
- END_CRIT_SECTION();
- …
- /*
- * Update our global variables
- */
- ProcLastRecPtr = StartPos;
- XactLastRecEnd = EndPos;
- …
- return EndPos;
- }
- static void // reserves `size` bytes (after alignment) of WAL insert space; results are returned through the three pointers
- ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, // size: record length; *StartPos / *EndPos: receive the start and end of the reserved range
- XLogRecPtr *PrevPtr) // *PrevPtr: receives the start position of the previously reserved record
- {
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- uint64 startbytepos;
- uint64 endbytepos;
- uint64 prevbytepos;
-
- size = MAXALIGN(size);
-
- /* All (non xlog-switch) records should contain data. */
- Assert(size > SizeOfXLogRecord);
-
- /*
- * This is the core of the function and the only truly serialized part,
- * so it must be kept as fast as possible.
- */
- SpinLockAcquire(&Insert->insertpos_lck);
-
- startbytepos = Insert->CurrBytePos; // this record starts at the current insert position
- endbytepos = startbytepos + size;
- prevbytepos = Insert->PrevBytePos;
- Insert->CurrBytePos = endbytepos; // advance the shared insert position past this record
- Insert->PrevBytePos = startbytepos; // this record becomes the "previous" one for the next reservation
-
- SpinLockRelease(&Insert->insertpos_lck);
-
- *StartPos = XLogBytePosToRecPtr(startbytepos);
- *EndPos = XLogBytePosToEndRecPtr(endbytepos);
- *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
-
- /*
- * Check that the conversions between "usable byte positions" and
- * XLogRecPtrs work consistently in both directions.
- */
- Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
- Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
- Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
- }
将WAL记录数据复制到WAL Buffer中预留好的空间。
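函数参数:write_len为待写入记录的总长度;isLogSwitch表示该记录是否为日志切换记录;rdata为保存记录数据的链表(通过next指针逐个遍历);StartPos和EndPos为此前预留空间的起始位置和结束位置。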
- static void // copies the rdata chain into the WAL buffer space between StartPos and EndPos
- CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, // write_len: total bytes expected to be written; rdata: chain walked via ->next
- XLogRecPtr StartPos, XLogRecPtr EndPos) // bounds of the previously reserved space
- {
- char *currpos;
- int freespace;
- int written;
- XLogRecPtr CurrPos;
- XLogPageHeader pagehdr;
-
- /*
- * Get a pointer to the right place in the right WAL buffer to start
- * inserting to. This is the starting point of the copy.
- */
- CurrPos = StartPos;
- currpos = GetXLogBuffer(CurrPos);
- freespace = INSERT_FREESPACE(CurrPos);
-
- /*
- * there should be enough space for at least the first field (xl_tot_len) on this page.
- */
- Assert(freespace >= sizeof(uint32));
-
- /* Copy record data: the core loop, copying the data of every element of the rdata chain */
- written = 0;
- while (rdata != NULL)
- {
- char *rdata_data = rdata->data;
- int rdata_len = rdata->len;
-
- /* Handles the case where the data still to be written is longer than the free space left on the current WAL buffer page: write the part that fits, then switch to the next page. */
- while (rdata_len > freespace)
- {
- /*
- * Write what fits on this page, and continue on the next page.
- */
- Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
- memcpy(currpos, rdata_data, freespace);
- rdata_data += freespace;
- rdata_len -= freespace;
- written += freespace;
- CurrPos += freespace;
-
- /*
- * Get a pointer to the beginning of the next page and set
- * xlp_rem_len in its page header.
- */
- currpos = GetXLogBuffer(CurrPos);
- pagehdr = (XLogPageHeader) currpos;
- pagehdr->xlp_rem_len = write_len - written;
- pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
-
- /* skip over the page header */
- if (XLogSegmentOffset(CurrPos, wal_segment_size) == 0)
- {
- CurrPos += SizeOfXLogLongPHD;
- currpos += SizeOfXLogLongPHD;
- }
- else
- {
- CurrPos += SizeOfXLogShortPHD;
- currpos += SizeOfXLogShortPHD;
- }
- freespace = INSERT_FREESPACE(CurrPos);
- }
-
- Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
- memcpy(currpos, rdata_data, rdata_len);
- currpos += rdata_len;
- CurrPos += rdata_len;
- freespace -= rdata_len;
- written += rdata_len;
-
- rdata = rdata->next;
- }
- Assert(written == write_len);
- …
- if (CurrPos != EndPos)
- elog(PANIC, "space reserved for WAL record does not match what was written");
- }
参考
《PostgreSQL技术内幕:事务处理深度探索》第4章
https://blog.csdn.net/obvious__/article/details/119242661?spm=1001.2014.3001.5502
https://blog.csdn.net/asmartkiller/article/details/121375548
https://icode.best/i/12479444350651