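The previous post ended by noting that, once registration is done, the XLOG record is filled in as follows (red: no data yet; green: already populated):
XLogRecord + XLogRecordBlockHeader + RelFileNode + BlockNumber + mainrdata_len (XLogRecordDataHeaderShort or XLogRecordDataHeaderLong) +
xl_heap_header (block data) + actual tuple data + xl_heap_insert (main data)
The assembly function XLogRecordAssemble is responsible for filling in the red parts and for linking all of the data above into an XLogRecData chain, which is the complete XLOG record.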

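Before reading the source, look at the assembly flowchart and compare it with the record format described earlier. The function mainly post-processes the entries of the registered_buffers array, for example deciding whether a full-page write (FPW) is needed and whether the page image should be compressed, and then fills in the matching optional fields of the record header.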





Figure: core code flow (only the assembly steps are kept; the long run of checks at the start is omitted)

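Start with the function's return type, XLogRecData. This is the element type of the rdatas array (which holds WAL record data) mentioned in the registration functions of the previous section.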
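To make the idea of an XLogRecData chain concrete, here is a minimal sketch of a singly linked list of data fragments. `RecData`, `chain_append`, and `chain_total_len` are hypothetical names invented for this illustration; they only mirror the next/data/len layout that the code below relies on and are not PostgreSQL's definitions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecData: one fragment of record data. */
typedef struct RecData
{
    struct RecData *next;   /* next fragment in the chain, or NULL */
    const char     *data;   /* start of this fragment */
    uint32_t        len;    /* length of this fragment in bytes */
} RecData;

/* Link node after the current tail and return the new tail. */
RecData *
chain_append(RecData *tail, RecData *node)
{
    assert(tail != NULL && node != NULL);
    node->next = NULL;
    tail->next = node;
    return node;
}

/* Walk the chain from its head and add up the fragment lengths. */
uint32_t
chain_total_len(const RecData *head)
{
    uint32_t total = 0;

    for (const RecData *rdt = head; rdt != NULL; rdt = rdt->next)
        total += rdt->len;
    return total;
}
```

The head node plays the role of hdr_rdt and the returned tail plays the role of rdt_datas_last: every later step only needs the tail pointer to extend the record.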

- /*
- * Assemble a WAL record from the registered data and buffers into an
- * XLogRecData chain, ready for insertion with XLogInsertRecord().
- * (That is: the data registered in the registered_buffers array is assembled into an XLogRecData chain, in preparation for XLogInsertRecord() copying the record into the WAL buffers.)
- */
- static XLogRecData *
- XLogRecordAssemble(RmgrId rmid, uint8 info,
- XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn, int *num_fpi)
- {
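- XLogRecData *rdt; // cursor used to walk the XLogRecData chain
- uint32 total_len = 0; // total size of the XLOG record
- int block_id; // block id
- pg_crc32c rdata_crc; // CRC
- registered_buffer *prev_regbuf = NULL; // previously processed registered_buffer (used for the same-relation check)
- XLogRecData *rdt_datas_last; // tail pointer of the chain
- XLogRecord *rechdr; // record header, placed at the start of the scratch buffer
- char *scratch = hdr_scratch; // write cursor into the header scratch buffer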
-
- /*
- * Note: this function can be called multiple times for the same record.
- * All the modifications we do to the rdata chains below must handle that.
- */
-
- /* The record begins with the fixed-size header */
- rechdr = (XLogRecord *) scratch;
-
- /* so the scratch pointer can simply skip past it */
- scratch += SizeOfXLogRecord;
-
- /* Initialize the head XLogRecData node: hdr_rdt is the head of the record's data chain, rdt_datas_last tracks its tail */
- hdr_rdt.next = NULL;
- rdt_datas_last = &hdr_rdt;
- hdr_rdt.data = hdr_scratch;

- /*
- * Enforce consistency checks for this record if user is looking for it.
- * Do this before at the beginning of this routine to give the possibility
- * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
- * a record.
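- * (In other words: if wal_consistency_checking is enabled for this resource manager, XLR_CHECK_CONSISTENCY is forced onto the record.
- * Doing it at the start of the routine also lets callers of XLogInsert() pass XLR_CHECK_CONSISTENCY directly.)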
- */
- if (wal_consistency_checking[rmid])
- info |= XLR_CHECK_CONSISTENCY;
-
- /*
- * Process, one by one, each block registered via XLogRegisterBuffer() (the elements of the registered_buffers array).
- */
- *fpw_lsn = InvalidXLogRecPtr;
- for (block_id = 0; block_id < max_registered_block_id; block_id++)
- {
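- registered_buffer *regbuf = &registered_buffers[block_id];
- bool needs_backup; // whether a full-page write (FPW) is needed
- bool needs_data;
- XLogRecordBlockHeader bkpb; // generic per-block header
- XLogRecordBlockImageHeader bimg; // extra header needed when a page image is included
- XLogRecordBlockCompressHeader cbimg = {0}; // extra header for a compressed page image
- bool samerel; // whether this block belongs to the same relation as the previous block in the record
- bool is_compressed = false; // whether the page image was compressed
- bool include_image; // whether a page image goes into the record (FPW or consistency check)
-
- if (!regbuf->in_use) // set by XLogRegisterBuffer(); if this slot was not registered, move on to the next one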
- continue;
-
- /* Determine if this block needs to be backed up (FPW): the flags are checked first, then doPageWrites (derived from the GUC setting and whether a backup is in progress), and finally the page LSN */
- if (regbuf->flags & REGBUF_FORCE_IMAGE)
- needs_backup = true;
- else if (regbuf->flags & REGBUF_NO_IMAGE)
- needs_backup = false;
- else if (!doPageWrites)
- needs_backup = false;
- else
- {
- /*
- * We assume page LSN is first data on *every* page that can be
- * passed to XLogInsert, whether it has the standard page layout
- * or not.
- */
- XLogRecPtr page_lsn = PageGetLSN(regbuf->page);
-
- needs_backup = (page_lsn <= RedoRecPtr);
- if (!needs_backup)
- {
- if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
- *fpw_lsn = page_lsn;
- }
- }
-
- /* Determine if the buffer data needs to included */
- if (regbuf->rdata_len == 0) // no data was registered for this block
- needs_data = false;
- else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0) // the caller explicitly asked to keep the data
- needs_data = true;
- else // otherwise keep the data only when no full-page image is taken
- needs_data = !needs_backup;
-
- // Assemble the XLogRecordBlockHeader
- bkpb.id = block_id;
- bkpb.fork_flags = regbuf->forkno;
- bkpb.data_length = 0;
-
- if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
- bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
-
- /*
- * If needs_backup is true or WAL checking is enabled for current
- * resource manager, log a full-page write for the current block.
- * (In short: a page image is stored either because FPW is required or because replay needs it to check consistency.)
- */
- include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
-
- if (include_image)
- {
- Page page = regbuf->page;
- uint16 compressed_len = 0;
-
- /*
- * The page needs to be backed up, so calculate its hole length
- * and offset. A standard page has a hole between pd_lower and pd_upper
- * that holds no data; trimming it saves space in the WAL record.
- */
- if (regbuf->flags & REGBUF_STANDARD)
- {
- /* Assume we can omit data between pd_lower and pd_upper; if there is a hole, record its offset and length */
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- bimg.hole_offset = lower;
- cbimg.hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to remove */
- bimg.hole_offset = 0;
- cbimg.hole_length = 0;
- }
- }
- else
- {
- /* Not a standard page header, don't try to eliminate "hole" */
- bimg.hole_offset = 0;
- cbimg.hole_length = 0;
- }
-
- /*
- * Try to compress a block image if wal_compression is enabled
- */
- if (wal_compression)
- {
- is_compressed =
- XLogCompressBackupBlock(page, bimg.hole_offset,
- cbimg.hole_length,
- regbuf->compressed_page,
- &compressed_len);
- }
-
- /*
- * Fill in the remaining fields in the XLogRecordBlockHeader struct
- */
- bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
-
- /* Report a full page image constructed for the WAL record */
- *num_fpi += 1;
-
- /*
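- * Construct XLogRecData entries for the page content. At this stage it is no longer appropriate to call the XLogRegister* functions, so the spare XLogRecData slots inside registered_buffer (bkp_rdatas) are used instead.
- */
- rdt_datas_last->next = &regbuf->bkp_rdatas[0];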
- rdt_datas_last = rdt_datas_last->next;
-
- bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
-
- /*
- * If WAL consistency checking is enabled for the resource manager
- * of this WAL record, a full-page image is included in the record
- * for the block modified. During redo, the full-page is replayed
- * only if BKPIMAGE_APPLY is set.
- */
- if (needs_backup)
- bimg.bimg_info |= BKPIMAGE_APPLY;
-
- if (is_compressed) // Case 1: compressed image; the hole has already been removed inside the compressed data
- {
- bimg.length = compressed_len;
- bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
-
- rdt_datas_last->data = regbuf->compressed_page;
- rdt_datas_last->len = compressed_len;
- }

- else
- {
- bimg.length = BLCKSZ - cbimg.hole_length;
-
- if (cbimg.hole_length == 0) // Case 2: no hole, so log the whole page
- {
- rdt_datas_last->data = page;
- rdt_datas_last->len = BLCKSZ;
- }
- else // Case 3: uncompressed with a hole; both bkp_rdatas slots of the registered_buffer are used
- {
- /* must skip the hole */
- rdt_datas_last->data = page;
- rdt_datas_last->len = bimg.hole_offset;
-
- rdt_datas_last->next = &regbuf->bkp_rdatas[1];
- rdt_datas_last = rdt_datas_last->next;
-
- rdt_datas_last->data =
- page + (bimg.hole_offset + cbimg.hole_length);
- rdt_datas_last->len =
- BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
- }
- }
-
- total_len += bimg.length;
- }
Figure (Case 2): uncompressed page, no hole

Figure (Case 3): uncompressed page with a hole
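The hole handling in cases 2 and 3 can be sketched in isolation. The following is a simplified illustration, not the PostgreSQL code: `PageImagePlan` and `plan_page_image` are hypothetical names, the page size of 8192 is the default BLCKSZ, and the 24-byte page header size is an assumption standing in for SizeOfPageHeaderData.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_BLCKSZ            8192  /* assumed page size (default BLCKSZ) */
#define SKETCH_PAGE_HEADER_SIZE  24    /* assumed SizeOfPageHeaderData */

/* Hypothetical summary of how an uncompressed page image is laid out. */
typedef struct PageImagePlan
{
    uint16_t hole_offset;   /* where the hole starts, 0 if none */
    uint16_t hole_length;   /* bytes skipped, 0 if none */
    uint16_t first_len;     /* bytes logged before the hole */
    uint16_t second_len;    /* bytes logged after the hole, 0 if no hole */
} PageImagePlan;

/* Decide which parts of the page to log, given pd_lower and pd_upper. */
PageImagePlan
plan_page_image(uint16_t lower, uint16_t upper)
{
    /* Default: no hole, the whole page is one fragment (case 2). */
    PageImagePlan plan = {0, 0, SKETCH_BLCKSZ, 0};

    if (lower >= SKETCH_PAGE_HEADER_SIZE &&
        upper > lower &&
        upper <= SKETCH_BLCKSZ)
    {
        /* Case 3: log [0, lower) and [upper, BLCKSZ), skipping the hole. */
        plan.hole_offset = lower;
        plan.hole_length = (uint16_t) (upper - lower);
        plan.first_len = lower;
        plan.second_len = (uint16_t) (SKETCH_BLCKSZ - upper);
    }

    /* Logged bytes plus the hole always add up to one full page. */
    assert(plan.first_len + plan.second_len + plan.hole_length == SKETCH_BLCKSZ);
    return plan;
}
```

With two fragments needed in case 3, it becomes clear why each registered_buffer carries two bkp_rdatas slots: one for the bytes before the hole and one for the bytes after it.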

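- // Link the data that XLogRegisterBufData() attached to this registered_buffer into the chain.
- // This is normally the opposite of FPW:
- // a full-page image already contains the whole page, so the per-block change data is usually not logged as well.
- // There are exceptions, for example logical decoding may still need the data.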
- if (needs_data)
- {
- /*
- * Link the caller-supplied rdata chain for this buffer to the
- * overall list.
- */
- bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
- bkpb.data_length = regbuf->rdata_len;
- total_len += regbuf->rdata_len;
-
- rdt_datas_last->next = regbuf->rdata_head;
- rdt_datas_last = regbuf->rdata_tail;
- }
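- // If two consecutive blocks in the record belong to the same relation, the RelFileNode can be omitted for the second one.
- // Only a flag is set here; it is acted on further down.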
- if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
- {
- samerel = true;
- bkpb.fork_flags |= BKPBLOCK_SAME_REL;
- }
- else
- samerel = false;
- prev_regbuf = regbuf;
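What follows is the core part, where assembly actually begins; most of the code above was preparation.
- /* Ok, copy the header to the scratch buffer (the block-related headers are copied into hdr_scratch) */
-
- // 1. Copy the XLogRecordBlockHeader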
- memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
- scratch += SizeOfXLogRecordBlockHeader;
- if (include_image)
- {
- // 2. Copy the XLogRecordBlockImageHeader
- memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
- scratch += SizeOfXLogRecordBlockImageHeader;
- if (cbimg.hole_length != 0 && is_compressed)
- {
- // 3. Copy the XLogRecordBlockCompressHeader
- memcpy(scratch, &cbimg,
- SizeOfXLogRecordBlockCompressHeader);
- scratch += SizeOfXLogRecordBlockCompressHeader;
- }
- }
- if (!samerel) // the RelFileNode is skipped when it matches the previous block
- {
- // 4. Copy the RelFileNode, taken from the previously registered regbuf->rnode
- memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
- scratch += sizeof(RelFileNode);
- }
- // 5. Copy the BlockNumber, which is always present
- memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
- scratch += sizeof(BlockNumber);
- }
-
- /* followed by the record's origin, if any */
- if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
- replorigin_session_origin != InvalidRepOriginId)
- {
- *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
- memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
- scratch += sizeof(replorigin_session_origin);
- }
-
- /* followed by toplevel XID, if not already included in previous record */
- if (IsSubTransactionAssignmentPending())
- {
- TransactionId xid = GetTopTransactionIdIfAny();
-
- /* update the flag (later used by XLogResetInsertion) */
- XLogSetRecordFlags(XLOG_INCLUDE_XID);
-
- *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
- memcpy(scratch, &xid, sizeof(TransactionId));
- scratch += sizeof(TransactionId);
- }
-
- /* followed by main data, if any: only the length is written here; the main data itself is linked into the chain headed by hdr_rdt */
-
- // 6. Copy mainrdata_len (short or long form)
- if (mainrdata_len > 0)
- {
- if (mainrdata_len > 255)
- {
- *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG; // length stored in 4 bytes
- memcpy(scratch, &mainrdata_len, sizeof(uint32));
- scratch += sizeof(uint32);
- }
- else
- {
- *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT; // length stored in 1 byte
- *(scratch++) = (uint8) mainrdata_len;
- }
- rdt_datas_last->next = mainrdata_head;
- rdt_datas_last = mainrdata_last;
- total_len += mainrdata_len;
- }
- rdt_datas_last->next = NULL;
-
- hdr_rdt.len = (scratch - hdr_scratch);
- total_len += hdr_rdt.len;
CRC of the data
- /*
- * Calculate CRC of the data
- *
- * Note that the record header isn't added into the CRC initially since we
- * don't know the prev-link yet. Thus, the CRC will represent the CRC of
- * the whole record in the order: rdata, then backup blocks, then record
- * header.
- */
- INIT_CRC32C(rdata_crc);
- COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
- for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
- COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
- /*
- * Fill in the fields in the record header. Prev-link is filled in later,
- * once we know where in the WAL the record will be inserted. The CRC does
- * not include the record header yet.
- */
- rechdr->xl_xid = GetCurrentTransactionIdIfAny();
- rechdr->xl_tot_len = total_len;
- rechdr->xl_info = info;
- rechdr->xl_rmid = rmid;
- rechdr->xl_prev = InvalidXLogRecPtr;
- rechdr->xl_crc = rdata_crc;
-
- return &hdr_rdt;
- }
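The full-page-write decision near the top of the per-block loop is easy to lose among the other details, so here is a minimal sketch of that decision alone. `needs_full_page_image` and the `SK_*` flag values are hypothetical names for this illustration; the ordering of the checks follows the listing above, and the assertion that the two flags are not combined is an assumption of the sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical flag bits for this sketch only. */
#define SK_FORCE_IMAGE 0x01
#define SK_NO_IMAGE    0x02

/*
 * Decide whether a block needs a full-page image:
 * explicit flags win, then the doPageWrites switch, then the LSN comparison.
 */
bool
needs_full_page_image(unsigned flags, bool do_page_writes,
                      uint64_t page_lsn, uint64_t redo_ptr)
{
    /* Sketch precondition: a caller does not ask for both at once. */
    assert(!((flags & SK_FORCE_IMAGE) && (flags & SK_NO_IMAGE)));

    if (flags & SK_FORCE_IMAGE)
        return true;
    if (flags & SK_NO_IMAGE)
        return false;
    if (!do_page_writes)
        return false;

    /* Page not modified since the last checkpoint's redo pointer. */
    return page_lsn <= redo_ptr;
}
```

Only the last branch depends on the page itself: a page whose LSN is at or before the redo pointer has not been logged since the latest checkpoint, so its first modification carries the whole page.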
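XLogRecordAssemble ultimately returns hdr_rdt, so it is worth tracing how the function manipulates hdr_rdt.
hdr_rdt serves as the head of the chain, so rdt_datas_last is initially pointed at it.
- hdr_rdt.next = NULL; // initialize the next pointer
- rdt_datas_last = &hdr_rdt; // point at the head of the chain
Because hdr_rdt is the head of the chain, the XLOG header buffer is assigned directly to its data field; once the header has been built, its length is computed.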
- hdr_rdt.data = hdr_scratch;
- // intermediate code omitted
- hdr_rdt.len = (scratch - hdr_scratch);
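The header length falls out of simple pointer arithmetic: a cursor starts at the beginning of the buffer, is bumped after every copy, and the final difference is the length. A minimal sketch of that pattern follows; `scratch_put` and `build_toy_header` are hypothetical helpers, and the toy header holds only a block id and a block number rather than the real set of headers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Copy n bytes to the cursor position and return the advanced cursor. */
char *
scratch_put(char *cursor, const void *src, size_t n)
{
    memcpy(cursor, src, n);
    return cursor + n;
}

/*
 * Build a toy header in buf: a fixed-size part of fixed_len bytes that is
 * skipped for now, followed by a block id and a block number.
 * Returns the total header length, computed as cursor minus buffer start.
 */
size_t
build_toy_header(char *buf, size_t fixed_len, uint8_t block_id, uint32_t block_no)
{
    assert(buf != NULL);

    char *scratch = buf + fixed_len;    /* fixed part is filled in later */

    scratch = scratch_put(scratch, &block_id, sizeof(block_id));
    scratch = scratch_put(scratch, &block_no, sizeof(block_no));

    return (size_t) (scratch - buf);
}
```

Skipping the fixed part first and filling it last matches the order in the function: the fixed header needs the total length and the CRC, which are only known after everything else has been laid out.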
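From the registration phase we know that xl_heap_header and the actual tuple data are both stored in the regbuf's XLogRecData chain, with xl_heap_header first and the tuple data after it (xl_heap_header is registered first). So it is enough to link the head of the regbuf's chain onto the chain that starts at hdr_rdt.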
- if (needs_data)
- {
- /*
- * Link the caller-supplied rdata chain for this buffer to the
- * overall list.
- */
- bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
- bkpb.data_length = regbuf->rdata_len;
- total_len += regbuf->rdata_len;
-
- // splice the regbuf chain onto the record chain
- rdt_datas_last->next = regbuf->rdata_head;
- rdt_datas_last = regbuf->rdata_tail;
- }
When the mainrdata_len part is assembled, the main data chain is linked in the same way:
- rdt_datas_last->next = mainrdata_head;
- rdt_datas_last = mainrdata_last;
- total_len += mainrdata_len;
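The short and long forms of the main data length can be shown with a small encoder. This is a sketch under assumptions: `encode_main_data_len` is a hypothetical helper, and the marker values 255 and 254 are chosen to stand in for XLR_BLOCK_ID_DATA_SHORT and XLR_BLOCK_ID_DATA_LONG, so check them against the PostgreSQL headers before relying on them.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Assumed marker bytes for this sketch. */
#define SK_ID_DATA_SHORT 255
#define SK_ID_DATA_LONG  254

/*
 * Write the main-data length header into out and return the number of
 * bytes written: 0 when there is no main data, 2 for the short form
 * (marker + 1-byte length), 5 for the long form (marker + 4-byte length).
 */
size_t
encode_main_data_len(unsigned char *out, uint32_t len)
{
    size_t n = 0;

    assert(out != NULL);

    if (len == 0)
        return 0;               /* no main data, nothing is written */

    if (len > 255)
    {
        out[n++] = SK_ID_DATA_LONG;
        memcpy(out + n, &len, sizeof(uint32_t));
        n += sizeof(uint32_t);
    }
    else
    {
        out[n++] = SK_ID_DATA_SHORT;
        out[n++] = (unsigned char) len;
    }
    return n;
}
```

Most records carry a small main data struct such as xl_heap_insert, so the short form saves three bytes per record in the common case.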