• PostgreSQL source code study (23): crash recovery ④, assembling the transaction log (WAL) record


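    I. Overview of record assembly

    As noted at the end of the previous article, after registration the XLOG record is filled in as follows (red: no data yet; green: data already present):

    XLogRecord+XLogRecordBlockHeader+RelFileNode+BlockNumber + mainrdata_len(XLogRecordDataHeaderShort or XLogRecordDataHeaderLong) +

    xl_heap_header(block data) + actual tuple data + xl_heap_insert(main data)

            The assembly function XLogRecordAssemble is responsible for filling in the red parts (the header fields that registration did not populate) and for linking all of the data above into an XLogRecData chain, which is the complete XLOG record.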

    https://img-blog.csdnimg.cn/cd8b8e5fb8df4820936f3b0810b0d213.png

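            Before reading the source, look at the assembly flowchart and compare it with the record format above. The function mainly post-processes the entries of the registered_buffers array, for example deciding whether a full-page write (FPW) is needed and whether the page image should be compressed, and then fills in the corresponding optional fields of the record header.

    Core flowchart of the code (only the assembly steps are kept; the long run of checks at the start is omitted)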
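
    The FPW decision described above follows a fixed order of checks in XLogRecordAssemble: per-buffer flags first, then the doPageWrites switch, and finally the page LSN compared with the redo pointer. The sketch below restates that order as two small pure functions. The names sketch_needs_backup and sketch_needs_data and the SKETCH_* flag values are made up for illustration; they are not PostgreSQL identifiers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for REGBUF_FORCE_IMAGE / REGBUF_NO_IMAGE;
 * the values are illustrative only. */
#define SKETCH_FORCE_IMAGE 0x01
#define SKETCH_NO_IMAGE    0x02

/*
 * Decide whether a registered block needs a full-page image.
 * Order of checks: buffer flags, then do_page_writes, then the page LSN
 * against the redo pointer.
 */
bool
sketch_needs_backup(uint8_t flags, bool do_page_writes,
                    uint64_t page_lsn, uint64_t redo_rec_ptr)
{
    if (flags & SKETCH_FORCE_IMAGE)
        return true;
    if (flags & SKETCH_NO_IMAGE)
        return false;
    if (!do_page_writes)
        return false;
    /* A page not modified since the redo pointer needs an image. */
    return page_lsn <= redo_rec_ptr;
}

/*
 * Decide whether the registered block data is stored as well.
 * With no data there is nothing to store; an explicit keep request wins;
 * otherwise the data is stored only when no full-page image is taken.
 */
bool
sketch_needs_data(uint32_t rdata_len, bool keep_data, bool needs_backup)
{
    if (rdata_len == 0)
        return false;
    if (keep_data)
        return true;
    return !needs_backup;
}
```

    Note that an image may still be included when needs_backup is false, because WAL consistency checking forces one; that case is not modeled here.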

     

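    II. Reading the XLogRecordAssemble source

           First, note the return type, XLogRecData *. XLogRecData is the element type of the rdatas array (which holds WAL record data) mentioned with the registration functions in the previous article.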

    /*
     * Assemble a WAL record from the registered data and buffers into an
     * XLogRecData chain, ready for insertion with XLogInsertRecord().
     * (The data registered in the registered_buffers array is assembled into
     * an XLogRecData chain, in preparation for XLogInsertRecord copying the
     * record into the WAL buffers.)
     */
    static XLogRecData *
    XLogRecordAssemble(RmgrId rmid, uint8 info,
                       XLogRecPtr RedoRecPtr, bool doPageWrites,
                       XLogRecPtr *fpw_lsn, int *num_fpi)
    {
        XLogRecData *rdt;                       // cursor used to walk the chain
        uint32      total_len = 0;              // total size of the XLOG record
        int         block_id;                   // block id
        pg_crc32c   rdata_crc;                  // CRC
        registered_buffer *prev_regbuf = NULL;  // previously processed registered_buffer
        XLogRecData *rdt_datas_last;            // tail pointer of the chain
        XLogRecord *rechdr;                     // fixed record header, located at the start of the scratch buffer
        char       *scratch = hdr_scratch;      // write cursor into the header scratch buffer

        /*
         * Note: this function can be called multiple times for the same record.
         * All the modifications we do to the rdata chains below must handle that.
         */

        /* The record begins with the fixed-size header */
        rechdr = (XLogRecord *) scratch;
        /* so the scratch pointer can simply skip over that part */
        scratch += SizeOfXLogRecord;

        /* Initialize the head XLogRecData: rdt_datas_last points at the tail of
         * the data chain, hdr_rdt is the head of the chain */
        hdr_rdt.next = NULL;
        rdt_datas_last = &hdr_rdt;
        hdr_rdt.data = hdr_scratch;

        /*
         * Enforce consistency checks for this record if user is looking for it.
         * Do this before at the beginning of this routine to give the possibility
         * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
         * a record.
         * (If the user has enabled consistency checking for this resource
         * manager, force the check for this record. Doing it at the start of
         * the routine lets callers of XLogInsert() also pass
         * XLR_CHECK_CONSISTENCY directly for a record.)
         */
        if (wal_consistency_checking[rmid])
            info |= XLR_CHECK_CONSISTENCY;

        /*
         * Process, one by one, each block registered by XLogRegisterBuffer
         * (the elements of the registered_buffers array).
         */
        *fpw_lsn = InvalidXLogRecPtr;
        for (block_id = 0; block_id < max_registered_block_id; block_id++)
        {
            registered_buffer *regbuf = &registered_buffers[block_id];
            bool        needs_backup;   // whether to take a full-page image (FPW)
            bool        needs_data;     // whether to store the registered block data
            XLogRecordBlockHeader bkpb; // generic block header
            XLogRecordBlockImageHeader bimg;    // extra header needed when an image is stored
            XLogRecordBlockCompressHeader cbimg = {0};  // extra header for a compressed image with a hole
            bool        samerel;        // does this block belong to the same relation as the previous block?
            bool        is_compressed = false;  // was the page image compressed?
            bool        include_image;  // whether a page image goes into the record

            // in_use is set by XLogRegisterBuffer; skip slots that were never registered
            if (!regbuf->in_use)
                continue;

            /* Determine if this block needs to be backed up: the buffer flags
             * take priority, then doPageWrites (derived from the GUC and from
             * whether a backup is in progress), and finally the page LSN */
            if (regbuf->flags & REGBUF_FORCE_IMAGE)
                needs_backup = true;
            else if (regbuf->flags & REGBUF_NO_IMAGE)
                needs_backup = false;
            else if (!doPageWrites)
                needs_backup = false;
            else
            {
                /*
                 * We assume page LSN is first data on *every* page that can be
                 * passed to XLogInsert, whether it has the standard page layout
                 * or not.
                 */
                XLogRecPtr  page_lsn = PageGetLSN(regbuf->page);

                needs_backup = (page_lsn <= RedoRecPtr);
                if (!needs_backup)
                {
                    if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
                        *fpw_lsn = page_lsn;
                }
            }

            /* Determine if the buffer data needs to included */
            if (regbuf->rdata_len == 0)     // no data was registered for this block
                needs_data = false;
            else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)   // caller explicitly asked to keep the data
                needs_data = true;
            else        // otherwise keep the data only when no FPW is taken
                needs_data = !needs_backup;

            // Build the XLogRecordBlockHeader
            bkpb.id = block_id;
            bkpb.fork_flags = regbuf->forkno;
            bkpb.data_length = 0;

            if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
                bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

            /*
             * If needs_backup is true or WAL checking is enabled for current
             * resource manager, log a full-page write for the current block.
             * (A page image is stored when an FPW is needed, and also when WAL
             * consistency is to be checked during replay.)
             */
            include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

            if (include_image)
            {
                Page        page = regbuf->page;
                uint16      compressed_len = 0;

                /*
                 * The page needs to be backed up, so calculate its hole length
                 * and offset. (A standard page has a hole between pd_lower and
                 * pd_upper that holds no data; cutting it out saves space.)
                 */
                if (regbuf->flags & REGBUF_STANDARD)
                {
                    /* Assume we can omit data between pd_lower and pd_upper;
                     * if there is a hole, record its offset and length */
                    uint16      lower = ((PageHeader) page)->pd_lower;
                    uint16      upper = ((PageHeader) page)->pd_upper;

                    if (lower >= SizeOfPageHeaderData &&
                        upper > lower &&
                        upper <= BLCKSZ)
                    {
                        bimg.hole_offset = lower;
                        cbimg.hole_length = upper - lower;
                    }
                    else
                    {
                        /* No "hole" to remove */
                        bimg.hole_offset = 0;
                        cbimg.hole_length = 0;
                    }
                }
                else
                {
                    /* Not a standard page header, don't try to eliminate "hole" */
                    bimg.hole_offset = 0;
                    cbimg.hole_length = 0;
                }

                /*
                 * Try to compress a block image if wal_compression is enabled
                 * (page images stored in the record are then compressed).
                 */
                if (wal_compression)
                {
                    is_compressed =
                        XLogCompressBackupBlock(page, bimg.hole_offset,
                                                cbimg.hole_length,
                                                regbuf->compressed_page,
                                                &compressed_len);
                }

                /*
                 * Fill in the remaining fields in the XLogRecordBlockHeader struct
                 */
                bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

                /* Report a full page image constructed for the WAL record */
                *num_fpi += 1;

                /*
                 * Construct XLogRecData entries for the page content.
                 * (At this stage the Register functions should no longer be
                 * called, so the temporary slots inside registered_buffer are
                 * used instead.)
                 */
                rdt_datas_last->next = &regbuf->bkp_rdatas[0];
                rdt_datas_last = rdt_datas_last->next;

                bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

                /*
                 * If WAL consistency checking is enabled for the resource manager
                 * of this WAL record, a full-page image is included in the record
                 * for the block modified. During redo, the full-page is replayed
                 * only if BKPIMAGE_APPLY is set.
                 */
                if (needs_backup)
                    bimg.bimg_info |= BKPIMAGE_APPLY;

                if (is_compressed)  // Case 1: compressed page; the hole is already accounted for in the compressed data
                {
                    bimg.length = compressed_len;
                    bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;

                    rdt_datas_last->data = regbuf->compressed_page;
                    rdt_datas_last->len = compressed_len;
                }

                else
                {
                    bimg.length = BLCKSZ - cbimg.hole_length;

                    if (cbimg.hole_length == 0) // Case 2: hole length is 0, record the whole page
                    {
                        rdt_datas_last->data = page;
                        rdt_datas_last->len = BLCKSZ;
                    }
                    else    // Case 3: uncompressed with a hole; both slots in registered_buffer are needed
                    {
                        /* must skip the hole */
                        rdt_datas_last->data = page;
                        rdt_datas_last->len = bimg.hole_offset;

                        rdt_datas_last->next = &regbuf->bkp_rdatas[1];
                        rdt_datas_last = rdt_datas_last->next;

                        rdt_datas_last->data =
                            page + (bimg.hole_offset + cbimg.hole_length);
                        rdt_datas_last->len =
                            BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
                    }
                }

                total_len += bimg.length;
            }

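    Case 2: no hole, so the whole page is recorded as a single chain entry.

    Case 3: uncompressed page with a hole, recorded as two chain entries that skip the hole.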
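
    Cases 2 and 3 can be reproduced outside PostgreSQL with a simplified chain node. The sketch below is a minimal illustration, assuming the default 8192-byte page size; SketchRecData and sketch_link_page_image are hypothetical names modeled on XLogRecData and on the bkp_rdatas slots, not the real definitions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_BLCKSZ 8192      /* assumed default page size */

/* Simplified chain node, modeled on XLogRecData (next, data, len). */
typedef struct SketchRecData
{
    struct SketchRecData *next;
    const char *data;
    uint32_t    len;
} SketchRecData;

/*
 * Describe an uncompressed page image with up to two caller-provided slots,
 * appended after 'tail'. Returns the new tail and stores the image length
 * (page size minus hole) in *image_len.
 */
SketchRecData *
sketch_link_page_image(SketchRecData *tail, SketchRecData slots[2],
                       const char *page, uint16_t hole_offset,
                       uint16_t hole_length, uint32_t *image_len)
{
    assert((uint32_t) hole_offset + hole_length <= SKETCH_BLCKSZ);

    tail->next = &slots[0];
    tail = tail->next;
    tail->data = page;

    if (hole_length == 0)
    {
        /* Case 2: no hole, one entry covers the whole page. */
        tail->len = SKETCH_BLCKSZ;
    }
    else
    {
        /* Case 3: one entry before the hole, one entry after it. */
        tail->len = hole_offset;

        tail->next = &slots[1];
        tail = tail->next;
        tail->data = page + hole_offset + hole_length;
        tail->len = SKETCH_BLCKSZ - (uint32_t) (hole_offset + hole_length);
    }

    /* The sketch terminates the chain here to stay self-contained;
     * the real function does that once, at the very end. */
    tail->next = NULL;
    *image_len = SKETCH_BLCKSZ - (uint32_t) hole_length;
    return tail;
}
```

    The page itself is never copied: both entries point into the original page, which is why two slots per buffer are enough.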

            // Link the data that XLogRegisterBufData registered in the
            // registered_buffer into the chain.
            // This is usually the opposite of an FPW: an FPW records the whole
            // page, so the per-change data is normally not recorded as well.
            // There are exceptions, e.g. logical decoding may need the data.
            if (needs_data)
            {
                /*
                 * Link the caller-supplied rdata chain for this buffer to the
                 * overall list.
                 */
                bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
                bkpb.data_length = regbuf->rdata_len;
                total_len += regbuf->rdata_len;

                rdt_datas_last->next = regbuf->rdata_head;
                rdt_datas_last = regbuf->rdata_tail;
            }

            // If two consecutive blocks in the record belong to the same
            // relation, the RelFileNode can be omitted for the later one.
            // Set a flag here; it is acted on further down.
            if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
            {
                samerel = true;
                bkpb.fork_flags |= BKPBLOCK_SAME_REL;
            }
            else
                samerel = false;
            prev_regbuf = regbuf;
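
    The space saving from the same-relation flag is easy to see in isolation. The following sketch serializes a list of block references and leaves out the relation identifier whenever it matches the previous entry. SketchRelNode, SketchBlockRef, SKETCH_SAME_REL and sketch_write_block_refs are illustrative names, and the one-byte flag layout is a simplification of the real block header.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Simplified relation identifier, modeled on RelFileNode's three OIDs. */
typedef struct SketchRelNode
{
    uint32_t    spc;
    uint32_t    db;
    uint32_t    rel;
} SketchRelNode;

typedef struct SketchBlockRef
{
    SketchRelNode rnode;
    uint32_t    blkno;
} SketchBlockRef;

#define SKETCH_SAME_REL 0x80    /* hypothetical flag bit for this sketch */

/*
 * Write one flag byte per block, then the relation (only if it differs
 * from the previous block's relation), then the block number.
 * Returns the number of bytes written to 'out'.
 */
size_t
sketch_write_block_refs(unsigned char *out, const SketchBlockRef *refs,
                        int nrefs)
{
    unsigned char *p = out;
    const SketchBlockRef *prev = NULL;

    for (int i = 0; i < nrefs; i++)
    {
        bool        samerel = prev != NULL &&
            memcmp(&prev->rnode, &refs[i].rnode, sizeof(SketchRelNode)) == 0;

        *p++ = samerel ? SKETCH_SAME_REL : 0;
        if (!samerel)
        {
            memcpy(p, &refs[i].rnode, sizeof(SketchRelNode));
            p += sizeof(SketchRelNode);
        }
        memcpy(p, &refs[i].blkno, sizeof(uint32_t));
        p += sizeof(uint32_t);

        prev = &refs[i];
    }
    return (size_t) (p - out);
}
```

    With three blocks where the first two share a relation, the second entry shrinks from 17 bytes to 5 in this layout.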

    The core part follows: this is where the actual assembly starts. Most of what came before was preparation.

            /* Ok, copy the header to the scratch buffer: the actual assembly,
             * copying the block-related headers into hdr_scratch */
            // 1. Copy the XLogRecordBlockHeader
            memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
            scratch += SizeOfXLogRecordBlockHeader;
            if (include_image)
            {
                // 2. Copy the XLogRecordBlockImageHeader
                memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
                scratch += SizeOfXLogRecordBlockImageHeader;
                if (cbimg.hole_length != 0 && is_compressed)
                {
                    // 3. Copy the XLogRecordBlockCompressHeader
                    memcpy(scratch, &cbimg,
                           SizeOfXLogRecordBlockCompressHeader);
                    scratch += SizeOfXLogRecordBlockCompressHeader;
                }
            }
            if (!samerel)   // can the RelFileNode be omitted?
            {
                // 4. Copy the RelFileNode, taken from the regbuf->rnode registered earlier
                memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
                scratch += sizeof(RelFileNode);
            }
            // 5. Copy the BlockNumber; this part is always present
            memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
            scratch += sizeof(BlockNumber);
        }

        /* followed by the record's origin, if any */
        if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
            replorigin_session_origin != InvalidRepOriginId)
        {
            *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
            memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
            scratch += sizeof(replorigin_session_origin);
        }

        /* followed by toplevel XID, if not already included in previous record */
        if (IsSubTransactionAssignmentPending())
        {
            TransactionId xid = GetTopTransactionIdIfAny();

            /* update the flag (later used by XLogResetInsertion) */
            XLogSetRecordFlags(XLOG_INCLUDE_XID);

            *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
            memcpy(scratch, &xid, sizeof(TransactionId));
            scratch += sizeof(TransactionId);
        }

        /* followed by main data, if any: only the length of the main data is
         * written here; the main data itself is linked into the chain that
         * starts at hdr_rdt */
        // 6. Copy mainrdata_len (short or long form)
        if (mainrdata_len > 0)
        {
            if (mainrdata_len > 255)
            {
                *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;   // length stored in 4 bytes
                memcpy(scratch, &mainrdata_len, sizeof(uint32));
                scratch += sizeof(uint32);
            }
            else
            {
                *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;  // length stored in 1 byte
                *(scratch++) = (uint8) mainrdata_len;
            }
            rdt_datas_last->next = mainrdata_head;
            rdt_datas_last = mainrdata_last;
            total_len += mainrdata_len;
        }
        rdt_datas_last->next = NULL;

        hdr_rdt.len = (scratch - hdr_scratch);
        total_len += hdr_rdt.len;
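
    The short/long choice for mainrdata_len in step 6 is a small variable-length encoding: a marker byte followed by either a 1-byte or a 4-byte length. Here is a minimal sketch of it. The marker values 255 and 254 are what I recall XLR_BLOCK_ID_DATA_SHORT and XLR_BLOCK_ID_DATA_LONG being defined as in xlogrecord.h; treat them, and the helper name sketch_put_main_len, as assumptions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Marker bytes; assumed to match XLR_BLOCK_ID_DATA_SHORT / _LONG. */
#define SKETCH_ID_DATA_SHORT 255
#define SKETCH_ID_DATA_LONG  254

/*
 * Write the main-data length header into 'out'.
 * Returns the number of bytes written: 0 when there is no main data,
 * 2 for the short form, 5 for the long form.
 */
size_t
sketch_put_main_len(unsigned char *out, uint32_t mainrdata_len)
{
    unsigned char *p = out;

    if (mainrdata_len == 0)
        return 0;               /* no main data: nothing is written */

    if (mainrdata_len > 255)
    {
        /* Long form: marker byte plus a 4-byte length in host byte order. */
        *p++ = SKETCH_ID_DATA_LONG;
        memcpy(p, &mainrdata_len, sizeof(uint32_t));
        p += sizeof(uint32_t);
    }
    else
    {
        /* Short form: marker byte plus a 1-byte length. */
        *p++ = SKETCH_ID_DATA_SHORT;
        *p++ = (unsigned char) mainrdata_len;
    }
    return (size_t) (p - out);
}
```

    Most records carry only a few bytes of main data, so the short form saves three bytes per record in the common case.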

    CRC of the data

        /*
         * Calculate CRC of the data
         *
         * Note that the record header isn't added into the CRC initially since we
         * don't know the prev-link yet. Thus, the CRC will represent the CRC of
         * the whole record in the order: rdata, then backup blocks, then record
         * header.
         */
        INIT_CRC32C(rdata_crc);
        COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
        for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
            COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

        /*
         * Fill in the fields in the record header. Prev-link is filled in later,
         * once we know where in the WAL the record will be inserted. The CRC does
         * not include the record header yet.
         */
        rechdr->xl_xid = GetCurrentTransactionIdIfAny();
        rechdr->xl_tot_len = total_len;
        rechdr->xl_info = info;
        rechdr->xl_rmid = rmid;
        rechdr->xl_prev = InvalidXLogRecPtr;
        rechdr->xl_crc = rdata_crc;

        return &hdr_rdt;
    }
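
    The CRC loop works because CRC-32C can be fed piece by piece: walking the chain and updating a running value gives the same result as checksumming one contiguous buffer. The sketch below demonstrates that with a plain bitwise CRC-32C; it is not PostgreSQL's optimized implementation, and SketchRecData, sketch_crc32c_update and sketch_chain_crc are illustrative names. Unlike the real function, the sketch covers every node in full and finalizes the value immediately, so that the result can be compared with the well-known check value for the string "123456789".

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified chain node, modeled on XLogRecData (next, data, len). */
typedef struct SketchRecData
{
    struct SketchRecData *next;
    const char *data;
    uint32_t    len;
} SketchRecData;

/* Bitwise CRC-32C (Castagnoli) update, reflected polynomial 0x82F63B78. */
uint32_t
sketch_crc32c_update(uint32_t crc, const void *data, size_t len)
{
    const unsigned char *p = data;

    while (len--)
    {
        crc ^= *p++;
        for (int bit = 0; bit < 8; bit++)
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
    return crc;
}

/* Checksum a whole chain by feeding each node to the running CRC. */
uint32_t
sketch_chain_crc(const SketchRecData *head)
{
    uint32_t    crc = 0xFFFFFFFFu;  /* initial value */

    for (const SketchRecData *rdt = head; rdt != NULL; rdt = rdt->next)
        crc = sketch_crc32c_update(crc, rdt->data, rdt->len);

    return crc ^ 0xFFFFFFFFu;       /* final inversion */
}
```

    In XLogRecordAssemble the running value is deliberately left unfinished, because the record header is added to it later, once the prev-link is known.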


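    III. How the parts of the XLOG record are chained together

           XLogRecordAssemble ultimately returns &hdr_rdt, so we need to look at how the function manipulates hdr_rdt.

    1. Initialization

    hdr_rdt serves as the head of the chain, so rdt_datas_last is pointed at the head here.

    hdr_rdt.next = NULL;            // initialize the next pointer
    rdt_datas_last = &hdr_rdt;      // point at the head of the chain

    2. Adding the XLOG header to the chain

            hdr_rdt is the head of the chain, so the XLOG header buffer is assigned directly to its data field. Once the header has been built, its length is computed.

    hdr_rdt.data = hdr_scratch;
    // intermediate code omitted
    hdr_rdt.len = (scratch - hdr_scratch);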
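
    The length computation relies on a simple idiom: keep a write cursor into one contiguous buffer, advance it after every copy, and take the difference from the start at the end. Here is a minimal sketch of that idiom, with the fixed header reserved first and filled in last. SketchFixedHeader and sketch_build_header are hypothetical, and the header is written with memcpy so that the sketch does not depend on buffer alignment.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for the fixed-size record header. */
typedef struct SketchFixedHeader
{
    uint32_t    tot_len;
    uint32_t    xid;
} SketchFixedHeader;

/*
 * Build a header area in 'hdr_scratch': a fixed header followed by one
 * 4-byte block number per block. 'payload_len' is the size of the data
 * kept elsewhere in the chain. Returns the length of the header area.
 */
size_t
sketch_build_header(unsigned char *hdr_scratch, const uint32_t *blknos,
                    int nblocks, uint32_t payload_len)
{
    unsigned char *scratch = hdr_scratch;
    SketchFixedHeader fixed;
    size_t      hdr_len;

    /* Reserve room for the fixed-size header; it is filled in last. */
    scratch += sizeof(SketchFixedHeader);

    /* Append the variable part, advancing the cursor after each copy. */
    for (int i = 0; i < nblocks; i++)
    {
        memcpy(scratch, &blknos[i], sizeof(uint32_t));
        scratch += sizeof(uint32_t);
    }

    /* Header length is simply how far the cursor has moved. */
    hdr_len = (size_t) (scratch - hdr_scratch);

    /* Only now is the total length known, so write the fixed header. */
    fixed.tot_len = (uint32_t) hdr_len + payload_len;
    fixed.xid = 0;
    memcpy(hdr_scratch, &fixed, sizeof(fixed));

    return hdr_len;
}
```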

    3. Adding xl_heap_header and the tuple data to the chain

            From the registration phase we know that xl_heap_header and the tuple data both live in regbuf's XLogRecData chain, with xl_heap_header first and the tuple data after it (xl_heap_header is registered first). So it is enough to attach the head of regbuf's chain to the end of the chain that starts at hdr_rdt.

    if (needs_data)
    {
        /*
         * Link the caller-supplied rdata chain for this buffer to the
         * overall list.
         */
        bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
        bkpb.data_length = regbuf->rdata_len;
        total_len += regbuf->rdata_len;
        // link into the chain
        rdt_datas_last->next = regbuf->rdata_head;
        rdt_datas_last = regbuf->rdata_tail;
    }

    4. Adding xl_heap_insert to the chain

    This happens in the part that assembles mainrdata_len:

    rdt_datas_last->next = mainrdata_head;
    rdt_datas_last = mainrdata_last;
    total_len += mainrdata_len;
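
    Taken together, the four steps are repeated O(1) appends of pre-built sub-chains onto a running tail pointer. The sketch below reproduces that pattern for the heap insert example: header node, then the buffer chain (xl_heap_header followed by the tuple), then the main data chain. SketchRecData, SketchChain, sketch_append_chain and sketch_chain_len are illustrative names, not PostgreSQL's.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified chain node, modeled on XLogRecData (next, data, len). */
typedef struct SketchRecData
{
    struct SketchRecData *next;
    const char *data;
    uint32_t    len;
} SketchRecData;

/* A pre-built sub-chain, tracked by head, tail and total length. */
typedef struct SketchChain
{
    SketchRecData *head;
    SketchRecData *tail;
    uint32_t    len;
} SketchChain;

/*
 * Append a sub-chain after 'tail' without walking it and add its length
 * to *total_len. Returns the new tail.
 */
SketchRecData *
sketch_append_chain(SketchRecData *tail, const SketchChain *chain,
                    uint32_t *total_len)
{
    if (chain->head == NULL)
        return tail;            /* nothing registered */

    tail->next = chain->head;
    *total_len += chain->len;
    return chain->tail;
}

/* Walk a NULL-terminated chain and sum the node lengths. */
uint32_t
sketch_chain_len(const SketchRecData *head)
{
    uint32_t    sum = 0;

    for (const SketchRecData *rdt = head; rdt != NULL; rdt = rdt->next)
        sum += rdt->len;
    return sum;
}
```

    A caller would start with the tail at the header node, append the buffer chain and then the main data chain, set the final next pointer to NULL, and add the header length to the total, which mirrors the order used in XLogRecordAssemble.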


  • Original article: https://blog.csdn.net/Hehuyi_In/article/details/125431559