• 【 OpenGauss源码学习 —— 列存储(CUStorage)】


    声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
    本文主要参考了 OpenGauss1.1.0 的开源代码和《OpenGauss数据库源码解析》一书以及OpenGauss社区学习文档和一些参考资料

    概述

      在学习完 CUCStore 后,我们紧接着来了解一下 CUStorage 类。通常,CStore 类作为整个列式存储引擎的核心,通过管理 CU 和 CUDesc 来实现对列式存储数据的存储检索和操作CUStorage 类可能提供了底层的物理存储和读写操作
      CUStorage 类是数据库内核中与列存储Column Store)相关的实现之一。以下是该类的主要职责和功能:

    1. 存储管理: 通过 SaveCUOverwriteCU 方法,将列存储CUCompression Unit)数据写入存储文件。可以选择直接写入用于扩展
    2. 加载数据: 通过 LoadCURemoteLoadCU 方法,从存储中加载列存储数据。可以选择直接加载用于缓存
    3. 文件操作: 包括创建文件打开文件关闭文件等操作,通过 CreateFileOpenFileCloseFile私有成员函数实现。
    4. 空间分配: 使用 CStoreFreeSpace 类来管理存储中的空闲空间,通过 InitCstoreFreeSpace 和其他相关方法初始化管理
    5. 文件信息获取: 通过 GetFileNameGetBcmFileNameIsDataFileExistIsBcmFileExist 等方法获取列文件的名称和相关信息
    6. 其他功能: 包括分配空间刷新文件获取文件描述符设置分配策略等。

      以下为 CUStorage 类的函数源码:(路径:src/include/storage/custorage.h

    class CUStorage : public BaseObject {
    public:
        // 构造函数,初始化 CUStorage 对象
        CUStorage(const CFileNode& cFileNode, CStoreAllocateStrategy strategy = APPEND_ONLY);
    
        // 析构函数,释放 CUStorage 对象占用的资源
        virtual ~CUStorage();
    
        // 销毁函数,用于在析构对象时执行额外的清理操作
        virtual void Destroy();
    
        // 声明 CUFile 为友元类,使其可以访问 CUStorage 的私有成员
        friend class CUFile;
    
        // 将 CU 数据写入存储
        void SaveCU(_in_ char* write_buf, _in_ uint64 offset, _in_ int size, bool direct_flag, bool for_extension = false);
    
        // 重写 CU 数据到存储,通常用于扩展操作
        void OverwriteCU(
            _in_ char* write_buf, _in_ uint64 offset, _in_ int size, bool direct_flag, bool for_extension = false);
    
        // 从存储加载 CU 数据
        void LoadCU(_in_ CU* cuPtr, _in_ uint64 offset, _in_ int size, bool direct_flag, bool inCUCache);
    
        // 远程加载 CU 数据,通常用于分布式存储
        void RemoteLoadCU(_in_ CU* cuPtr, _in_ uint64 offset, _in_ int size, bool direct_flag, bool inCUCache);
    
        // 从文件加载数据到缓冲区
        void Load(_in_ uint64 offset, _in_ int size, __inout char* outbuf, bool direct_flag);
    
        // 在写时加载数据到缓冲区
        int WSLoad(_in_ uint64 offset, _in_ int size, __inout char* outbuf, bool direct_flag);
    
        // 获取文件名
        void GetFileName(_out_ char* fileName, _in_ const size_t capacity, _in_ const int fileId) const;
    
        // 检查数据文件是否存在
        bool IsDataFileExist(int fileId) const;
    
        // 获取 BCM 文件名
        void GetBcmFileName(_out_ char* bcmfile, _in_ int fileId) const;
    
        // 检查 BCM 文件是否存在
        bool IsBcmFileExist(_in_ int fileId) const;
    
        // 获取列文件名
        const char* GetColumnFileName() const;
    
        // 分配指定大小的空间
        uint64 AllocSpace(_in_ int size);
    
        // 刷新数据文件
        void FlushDataFile() const;
    
        // 设置分配策略
        void SetAllocateStrategy(CStoreAllocateStrategy strategy)
        {
            m_strategy = strategy;
        };
    
        // 设置自由空间管理器
        void SetFreeSpace(CStoreFreeSpace* fspace)
        {
            Assert(fspace != NULL);
            m_freespace = fspace;
        };
    
        // 获取自由空间管理器
        FORCE_INLINE CStoreFreeSpace* GetFreeSpace()
        {
            return m_freespace;
        };
    
        // 创建存储
        void CreateStorage(int fileId, bool isRedo) const;
    
        // 获取 CU 文件的文件描述符
        File GetCUFileFd(uint64 offset);
    
        // 获取 CU 在文件中的偏移量
        uint64 GetCUOffsetInFile(uint64 offset) const;
    
        // 检查 CU 是否存储在同一文件中
        bool IsCUStoreInOneFile(uint64 offset, int size) const;
    
        // 获取对齐的 CU 偏移量
        uint64 GetAlignCUOffset(uint64 offset) const;
    
        // 获取对齐的 CU 大小
        int GetAlignCUSize(int size) const;
    
        // 快速扩展文件大小
        void FastExtendFile(uint64 extend_offset, uint32 size, bool keep_size);
    
        // 截断数据文件
        void TruncateDataFile();
    
        // 截断 BCM 文件
        void TruncateBcmFile();
    
        // 设置是否启用 2 字节对齐
        void Set2ByteAlign(bool is_2byte_align);
    
        // 检查是否启用 2 字节对齐
        bool Is2ByteAlign();
    
    private:
        // 初始化文件名前缀
        void InitFileNamePrefix(_in_ const CFileNode& cFileNode);
    
        // 创建文件
        File CreateFile(_in_ char* file_name, _in_ int fileId, bool isRedo) const;
    
        // 打开文件
        File OpenFile(_in_ char* file_name, _in_ int fileId, bool direct_flag);
    
        // 在写时打开文件
        File WSOpenFile(_in_ char* file_name, _in_ int fileId, bool direct_flag);
    
        // 初始化自由空间管理器
        void InitCstoreFreeSpace(CStoreAllocateStrategy strategy);
    
        // 关闭文件
        void CloseFile(_in_ File fd) const;
    
    public:
        // 存储文件节点信息
        CFileNode m_cnode;
    
    private:
        // 列文件名的通用前缀
        char m_fileNamePrefix[MAXPGPATH];
    
        // 列文件名
        char m_fileName[MAXPGPATH];
    
        // 自由空间管理器
        CStoreFreeSpace* m_freespace;
    
        // 当前读写文件描述符
        File m_fd;
    
        // 分配策略:追加、重用
        CStoreAllocateStrategy m_strategy;
    
        // 追加模式标志
        bool append_only;
    
        // 是否启用 2 字节对齐
        bool is_2byte_align;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151

      本文将首先围绕 CUStorage 类中的部分成员函数展开学习。

    CUStorage::SaveCU 函数

      CUStorage::SaveCU 函数的主要作用是将指定的 CU 数据保存到存储中支持大文件存储,通过循环写入实现对超过单个文件大小限制CU 数据的存储。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    /*
     * 将 CU 数据保存到存储中,支持大文件存储。
     * 参数:
     *   - write_buf: 待写入的数据缓冲区指针
     *   - offset: 写入的起始偏移量
     *   - size: 待写入的数据大小
     *   - direct_flag: 是否使用直接 I/O,通常用于绕过系统缓存
     *   - for_extension: 是否为扩展操作,用于判断是否刷新数据文件
     */
    void CUStorage::SaveCU(char* write_buf, _in_ uint64 offset, _in_ int size, bool direct_flag, bool for_extension)
    {
        // 根据偏移量计算写入的文件 ID 和在文件内的偏移量
        int writeFileId = offset / MAX_FILE_SIZE;
        uint64 writeOffset = offset % MAX_FILE_SIZE;
        // 计算当前写入的大小,不超过文件剩余大小
        int write_size = std::min(size, (int)(MAX_FILE_SIZE - writeOffset));
        int left_size = size - write_size;
        // 获取表空间 OID
        Oid tableSpaceOid = m_cnode.m_rnode.spcNode;
        // 临时文件名缓冲区
        char tmpFileName[MAXPGPATH] = {0};
        errno_t rc = 0;
    
        // 如果是追加模式,检查表空间使用是否超过最大值
        if (append_only)
            TableSpaceUsageManager::IsExceedMaxsize(tableSpaceOid, size);
    
        // 循环写入数据,直至全部数据写入完成
        while (write_size > 0) {
            // 获取当前写入的文件名
            GetFileName(tmpFileName, MAXPGPATH, writeFileId);
            // 如果文件名发生变化,关闭之前的文件并打开新的文件
            if (strcmp(tmpFileName, m_fileName) != 0) {
                if (m_fd != FILE_INVALID) {
                    /*
                     * 如果切换数据文件,刷新数据。在文件扩展期间不执行此操作,因为很快就会 fsync 实际数据。
                     */
                    if (!for_extension)
                        FlushDataFile();
                    FileClose(m_fd);
                }
    
                // 打开新文件,并更新当前文件名
                m_fd = OpenFile(tmpFileName, writeFileId, direct_flag);
                Assert(m_fd != FILE_INVALID);
                rc = strcpy_s(m_fileName, MAXPGPATH, tmpFileName);
                securec_check_c(rc, "\0", "\0");
            }
            Assert(m_fd != FILE_INVALID);
    
            // 将数据写入文件
            int writtenBytes = FilePWrite(m_fd, write_buf, write_size, writeOffset);
            // 检查写入是否成功
            if (writtenBytes != write_size) {
                int align_size = is_2byte_align ? ALIGNOF_TIMESERIES_CUSIZE : ALIGNOF_CUSIZE;
                // 报告 I/O 错误
                SaveCUReportIOError(tmpFileName, writeOffset, writtenBytes, write_size, size, align_size);
            }
    
            // 更新文件 ID、偏移量和数据缓冲区指针
            ++writeFileId;
            writeOffset = 0;
            write_buf += write_size;
            // 计算下一轮写入的大小
            write_size = (((unsigned int)left_size > MAX_FILE_SIZE) ? MAX_FILE_SIZE : left_size);
            left_size -= write_size;
        }
    
        // 检查是否所有数据均已写入
        if (left_size != 0) {
            ereport(ERROR, (errcode_for_file_access(),
                            errmsg("write file  \"%s\" failed in savecu!", tmpFileName)));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    CUStorage::GetFileName 函数

      CUStorage::GetFileName 函数用于获取 CU 文件的文件名。它将文件名构造为以给定前缀 m_fileNamePrefix 开头,后跟文件 ID 的形式。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    void CUStorage::GetFileName(_out_ char* fileName, _in_ const size_t capacity, _in_ const int fileId) const
    {
        Assert(fileId >= 0);  // 断言文件ID应为非负数
    
        // 表示一个关系的一个列的CU文件。
        // 与bcm文件名不同,其文件列表为:
        //   16385_C1.0 16385_C1.1 16385_C1.2 ...
        int rc = snprintf_s(fileName, capacity, capacity - 1, "%s.%d", m_fileNamePrefix, fileId);
        securec_check_ss(rc, "", "");
        fileName[capacity - 1] = '\0';  // 确保文件名以 null 结尾
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    CUStorage::OverwriteCU 函数

      此函数的主要作用是远程读取 CU 数据后,需要覆盖本地 CU 数据。它通过循环写入实现对超过单个文件大小限制的 CU 数据的覆盖。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    /*
     * 在远程读取 CU 数据后,需要覆盖本地 CU 数据。
     * 参数:
     *   - write_buf: 待写入的数据缓冲区指针
     *   - offset: 写入的起始偏移量
     *   - size: 待写入的数据大小
     *   - direct_flag: 是否使用直接 I/O,通常用于绕过系统缓存
     *   - for_extension: 是否为扩展操作,用于判断是否刷新数据文件
     */
    void CUStorage::OverwriteCU(
        _in_ char* write_buf, _in_ uint64 offset, _in_ int size, bool direct_flag, bool for_extension)
    {
        // 检查偏移量和大小
        int writeFileId = offset / MAX_FILE_SIZE;
        uint64 writeOffset = offset % MAX_FILE_SIZE;
        // 计算当前写入的大小,不超过文件剩余大小
        int write_size = std::min(size, (int)(MAX_FILE_SIZE - writeOffset));
        int left_size = size - write_size;
        // 临时文件名缓冲区
        char tmpFileName[MAXPGPATH] = {0};
        errno_t rc = 0;
    
        // 覆盖 CU,不增加最大大小
        while (write_size > 0) {
            // 获取当前写入的文件名
            GetFileName(tmpFileName, MAXPGPATH, writeFileId);
            // 如果文件名发生变化,关闭之前的文件并打开新的文件
            if (strcmp(tmpFileName, m_fileName) != 0) {
                if (m_fd != FILE_INVALID) {
                    /*
                     * 如果切换数据文件,刷新数据。在文件扩展期间不执行此操作,因为很快就会 fsync 实际数据。
                     */
                    if (!for_extension)
                        FlushDataFile();
                    FileClose(m_fd);
                }
    
                // 打开新文件,并更新当前文件名
                m_fd = OpenFile(tmpFileName, writeFileId, direct_flag);
                Assert(m_fd != FILE_INVALID);
                rc = strcpy_s(m_fileName, MAXPGPATH, tmpFileName);
                securec_check(rc, "\0", "\0");
            }
            // 如果文件句柄无效,报告文件访问错误
            if (m_fd == FILE_INVALID) {
                ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", tmpFileName)));
            }
    
            // 将数据写入文件
            int nbytes = 0;
            if ((nbytes = FilePWrite(m_fd, write_buf, write_size, writeOffset)) != write_size) {
                // 仅报告警告,不中断执行
                ereport(WARNING,
                        (errcode_for_file_access(),
                         errmsg("Overwrite CU failed. file \"%s\" , offset(%lu), size(%d), expect_write_size(%d), "
                                "acture_write_size(%d): %m",
                                tmpFileName,
                                writeOffset,
                                size,
                                write_size,
                                nbytes),
                         handle_in_client(true)));
            }
    
            // 更新文件 ID、偏移量和数据缓冲区指针
            ++writeFileId;
            writeOffset = 0;
            write_buf += write_size;
            // 计算下一轮写入的大小
            write_size = (((unsigned int)left_size > MAX_FILE_SIZE) ? MAX_FILE_SIZE : left_size);
            left_size -= write_size;
        }
    
        // 检查是否所有数据均已写入
        if (left_size != 0) {
            ereport(ERROR, (errcode_for_file_access(),
                            errmsg("write file \"%s\" failed in OverwriteCU!", tmpFileName)));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    注释:远程读取 CU 数据” 通常指的是从一个远程的存储位置或节点上获取 CUColumn Unit,列存储中的基本数据单元)的数据。在数据库系统中,特别是在分布式集群环境中,可能存在将数据存储在不同的节点上的情况。当需要在一个节点上执行操作,但数据实际存储在另一个节点上时,就需要进行远程读取

    CUStorage::OverwriteCU 函数

      CUStorage::OverwriteCU 函数主要目的是CU 文件中加载 CU 数据,并将其存储在相应的数据结构中,以便后续使用。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    /*
     * @Description: 从 CU 文件中加载 CU 数据
     * @Param[IN/OUT] cuPtr: 待加载数据的 CU 对象
     * @Param[IN] direct_flag: 如果启用 ADIO 特性,使用 DIO(Direct I/O)
     * @Param[IN] inCUCache: 指示 cuPtr 是否在 CU 缓存中
     * @Param[IN] offset: CU 数据在逻辑文件中的逻辑偏移量
     * @Param[IN] size: CU 数据的大小
     * @See also: 有关更多信息,请参阅...
     */
    void CUStorage::LoadCU(_in_ CU* cuPtr, _in_ uint64 offset, _in_ int size, bool direct_flag, bool inCUCache)
    {
        // 检查参数的有效性
        if (size < 0 || (uint64)size > MAX_FILE_SIZE) {
            ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
                            errmsg("CUStorage::LoadCU 中的无效大小(%u)", size)));
        }
    
        // 如果 size 为 0,则直接报错并返回
        if (size == 0) {
            cuPtr->m_compressedBufSize = 0;
            ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
                            errmsg("CUStorage::LoadCU 中的大小为 0")));
            return;
        }
    
        uint64 load_offset;
        int load_size;
    
        // 计算对齐后的偏移量和大小
        load_offset = GetAlignCUOffset(offset);
        cuPtr->m_head_padding_size = offset - load_offset;
        load_size = GetAlignCUSize(cuPtr->m_head_padding_size + size);
    
        // 分配加载缓冲区内存,并加载数据
        // 注意:为了避免在 readData 函数中越界读取,多分配 8 字节内存。
        cuPtr->m_compressedLoadBuf = (char*)CStoreMemAlloc::Palloc(load_size + 8, !inCUCache);
        Load(load_offset, load_size, cuPtr->m_compressedLoadBuf, direct_flag);
    
        // 设置 CU 数据的指针和大小
        cuPtr->m_compressedBuf = cuPtr->m_compressedLoadBuf + cuPtr->m_head_padding_size;
        cuPtr->SetCUSize(size);
        cuPtr->m_compressedBufSize = size;
        cuPtr->m_cache_compressed = true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    CUStorage::RemoteLoadCU 函数

      CUStorage::RemoteLoadCU 函数的主要目的是远程节点加载 CU 数据,并将其存储在相应的数据结构中,以供后续使用。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    /*
     * @Description: 从远程节点加载 CU 数据
     * @Param[IN] cuPtr: 待加载数据的 CU 对象
     * @Param[IN] direct_flag: 如果启用 ADIO 特性,使用 DIO(Direct I/O)
     * @Param[IN] inCUCache: 指示 cuPtr 是否在 CU 缓存中
     * @Param[IN] offset: CU 数据在逻辑文件中的逻辑偏移量
     * @Param[IN] size: CU 数据的大小
     * @See also: 有关更多信息,请参阅...
     */
    void CUStorage::RemoteLoadCU(_in_ CU* cuPtr, _in_ uint64 offset, _in_ int size, bool direct_flag, bool inCUCache)
    {
        /* 调用方应该为 m_compressedLoadBuf 分配内存 */
        Assert(cuPtr->m_compressedLoadBuf != NULL);
    
        /* 获取偏移量和大小 */
        uint64 load_offset = GetAlignCUOffset(offset);
        cuPtr->m_head_padding_size = offset - load_offset;
        int load_size = GetAlignCUSize(cuPtr->m_head_padding_size + size);
    
        /* 获取当前 XLog 插入位置 */
        XLogRecPtr cur_lsn = GetInsertRecPtr();
    
        /* 获取远程地址 */
        char remote_address1[MAXPGPATH] = {0}; /* remote_address1[0] = '\0'; */
        char remote_address2[MAXPGPATH] = {0}; /* remote_address2[0] = '\0'; */
        GetRemoteReadAddress(remote_address1, remote_address2, MAXPGPATH);
    
        const char* remote_address = remote_address1;
        int retry_times = 0;
    
    retry:
        if (remote_address[0] == '\0' || remote_address[0] == ':')
            ereport(ERROR, (errcode(ERRCODE_IO_ERROR), (errmodule(MOD_REMOTE), errmsg("远程不可用"))));
    
        ereport(LOG,
                (errmodule(MOD_REMOTE),
                 errmsg("从远程节点读取 CU 文件,%s 偏移 %lu 大小 %d,源地址:%s",
                        GetColumnFileName(),
                        offset,
                        size,
                        remote_address)));
    
        PROFILING_REMOTE_START();
    
        int ret_code = ::RemoteGetCU(remote_address,
                                     m_cnode.m_rnode.spcNode,
                                     m_cnode.m_rnode.dbNode,
                                     m_cnode.m_rnode.relNode,
                                     m_cnode.m_attid,
                                     load_offset,
                                     load_size,
                                     cur_lsn,
                                     cuPtr->m_compressedLoadBuf);
    
        PROFILING_REMOTE_END_READ(size, (ret_code == REMOTE_READ_OK));
    
        if (ret_code != REMOTE_READ_OK) {
            if (IS_DN_DUMMY_STANDYS_MODE() || retry_times >= 1) {
                ereport(ERROR,
                        (errcode(ERRCODE_IO_ERROR),
                         (errmodule(MOD_REMOTE),
                          errmsg("从 %s 读取失败,%s", remote_address, RemoteReadErrMsg(ret_code)))));
            } else {
                ereport(WARNING,
                        (errmodule(MOD_REMOTE),
                         errmsg("从 %s 读取失败,%s,尝试另一个地址", remote_address, RemoteReadErrMsg(ret_code)),
                         handle_in_client(true)));
    
                /* 检查中断 */
                CHECK_FOR_INTERRUPTS();
    
                remote_address = remote_address2;
                ++retry_times;
                goto retry; /* 跳出 retry_times >= 1 */
            }
        }
    
        // CU 数据已加载完成,因此设置 CU 大小。
        // 我们将在解压缩 CU 数据时检查此值。
        cuPtr->m_compressedBuf = cuPtr->m_compressedLoadBuf + cuPtr->m_head_padding_size;
        cuPtr->SetCUSize(size);
        cuPtr->m_compressedBufSize = size;
        cuPtr->m_cache_compressed = true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    CUStorage::Load 函数

      CUStorage::RemoteLoadCU 函数的目的是从文件中读取数据,然后存储到指定的缓冲区 outbuf。函数会根据给定的偏移量和大小,从相应的文件中读取数据块。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    void CUStorage::Load(_in_ uint64 offset, _in_ int size, __inout char* outbuf, bool direct_flag)
    {
        int readFileId = CU_FILE_ID(offset);        // 获取读取文件的文件ID
        uint64 readOffset = CU_FILE_OFFSET(offset); // 获取在文件中的偏移量
        int read_size = min(size, (int)(MAX_FILE_SIZE - readOffset)); // 计算读取的大小,不超过文件剩余大小
        int left_size = size - read_size; // 剩余需要读取的大小
    
        char* read_buf = outbuf; // 读取缓冲区指针
        char tmpFileName[MAXPGPATH]; // 临时文件名
        errno_t rc = 0;
    
        while (read_size > 0) {
            GetFileName(tmpFileName, MAXPGPATH, readFileId); // 获取当前文件名
            if (strcmp(tmpFileName, m_fileName) != 0) {
                if (m_fd != FILE_INVALID)
                    FileClose(m_fd); // 关闭之前打开的文件
                m_fd = OpenFile(tmpFileName, readFileId, direct_flag); // 打开新文件
    
                if (m_fd == FILE_INVALID) {
                    ereport(ERROR, (errcode_for_file_access(), errmsg("无法打开文件 \"%s\"", tmpFileName)));
                }
    
                rc = strcpy_s(m_fileName, MAXPGPATH, tmpFileName); // 更新当前文件名
                securec_check_c(rc, "\0", "\0");
            }
    
            int nbytes = FilePRead(m_fd, read_buf, read_size, readOffset); // 从文件读取数据
            if (nbytes != read_size) {
                LoadCUReportIOError(tmpFileName, readOffset, nbytes, read_size, size); // 报告读取错误
            }
    
            ++readFileId;
            readOffset = 0;
            read_buf += read_size;
            read_size = (((unsigned int)left_size > MAX_FILE_SIZE) ? MAX_FILE_SIZE : left_size);
            left_size -= read_size;
        }
        if (left_size != 0) {
            ereport(ERROR, (errcode_for_file_access(),
                            errmsg("在加载数据时读取文件 \"%s\" 失败!", tmpFileName)));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    CUStorage::WSLoad 函数

      CUStorage::WSLoad 函数是用于工作空间WS)中加载数据。它会读取指定偏移量和大小的数据块,并将其存储到指定的缓冲区 outbuf 中。函数会检查读取的大小是否有效,以及是否符合对齐要求。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    int CUStorage::WSLoad(_in_ uint64 offset, _in_ int size, __inout char* outbuf, bool direct_flag)
    {
        int readFileId = CU_FILE_ID(offset);          // 获取读取文件的文件ID
        uint64 readOffset = CU_FILE_OFFSET(offset);   // 获取在文件中的偏移量
        int read_size = min(size, (int)(MAX_FILE_SIZE - readOffset)); // 计算读取的大小,不超过文件剩余大小
        int left_size = size - read_size;             // 剩余需要读取的大小
        errno_t rc = 0;
    
        char* read_buf = outbuf;  // 读取缓冲区指针
        char tmpFileName[MAXPGPATH];  // 临时文件名
        bool isCUReadSizeValid = false;  // 检查读取的大小是否有效
    
        const int CUALIGNSIZE = is_2byte_align ? ALIGNOF_TIMESERIES_CUSIZE : ALIGNOF_CUSIZE;
        isCUReadSizeValid = (read_size > 0 && 0 == read_size % CUALIGNSIZE);
    
        if (!isCUReadSizeValid) {
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("意外的CU文件读取信息: 偏移(%lu), 大小(%d), 文件ID(%d), 文件偏移(%lu), 期望读取大小(%d).",
                            offset,
                            size,
                            readFileId,
                            readOffset,
                            read_size)));
    
            return -1;
        }
    
        while (read_size > 0) {
            GetFileName(tmpFileName, MAXPGPATH, readFileId);  // 获取当前文件名
            if (strcmp(tmpFileName, m_fileName) != 0) {
                if (m_fd != FILE_INVALID)
                    FileClose(m_fd);  // 关闭之前打开的文件
                m_fd = WSOpenFile(tmpFileName, readFileId, direct_flag);  // 打开新文件
    
                if (FILE_INVALID == m_fd)
                    return 0;
    
                rc = strcpy_s(m_fileName, MAXPGPATH, tmpFileName);  // 更新当前文件名
                securec_check(rc, "\0", "\0");
            }
    
            int nbytes = 0;
    
            /* IO collector and IO scheduler for cstore insert */
            if (ENABLE_WORKLOAD_CONTROL)
                IOSchedulerAndUpdate(IO_TYPE_READ, 1, IO_TYPE_COLUMN);
    
            if ((nbytes = FilePRead(m_fd, read_buf, read_size, readOffset)) != read_size) {
                if (0 == nbytes) {
                    if (u_sess->attr.attr_storage.HaModuleDebug)
                        ereport(NOTICE,
                                (errcode_for_file_access(),
                                 errmsg("HA-WSLoad: 读取文件 \"%s\" 获取了0字节,请检查相应的CU文件。",
                                        tmpFileName)));
                    return 0;
                }
    
                if (nbytes % CUALIGNSIZE != 0) {
                    ereport(ERROR,
                            (errcode_for_file_access(),
                             errmsg("读取文件 \"%s\" 失败, 偏移(%lu), 大小(%d), 期望读取大小(%d), "
                                    "实际读取大小(%d), 可能需要首先升级cstore数据文件",
                                    tmpFileName,
                                    offset,
                                    size,
                                    read_size,
                                    nbytes)));
                } else {
                    ereport(ERROR,
                            (errcode_for_file_access(),
                             errmsg("无法读取文件 \"%s\", 偏移(%lu), 大小(%d), 期望读取大小(%d), "
                                    "实际读取大小(%d): %m",
                                    tmpFileName,
                                    offset,
                                    size,
                                    read_size,
                                    nbytes)));
                }
            }
    
            ++readFileId;
            readOffset = 0;
            read_buf += read_size;
            read_size = (((unsigned int)left_size > MAX_FILE_SIZE) ? MAX_FILE_SIZE : left_size);
            left_size -= read_size;
        }
    
        if (left_size != 0) {
            ereport(ERROR, (errcode_for_file_access(),
                            errmsg("在WSLoad中读取文件 \"%s\" 失败!", tmpFileName)));
        }
    
        return size;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95

    CUStorage::TruncateDataFile 函数

      CUStorage::TruncateDataFile 函数的目的是在同一个事务块XACT block)中,当关系创建截断操作同一个事务中发生时截断列数据文件。函数会循环处理所有的数据文件,打开每个文件,截断其内容,然后关闭文件。如果截断操作失败,会发出警告。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    /*
     * @Description:  在同一个XACT块中创建和截断关系时,截断列数据文件
     */
    void CUStorage::TruncateDataFile()
    {
        int fileId = 0;  // 文件ID初始化为0
        char tmpFileName[MAXPGPATH];  // 临时文件名的缓冲区
    
        while (1) {  // 无限循环,直到没有更多的数据文件
            if (!IsDataFileExist(fileId))  // 如果数据文件不存在,跳出循环
                break;
    
            GetFileName(tmpFileName, MAXPGPATH, fileId);  // 获取数据文件名
            File vfd = OpenFile(tmpFileName, fileId, false);  // 打开数据文件
            if (FileTruncate(vfd, 0)) {  // 截断文件内容
                ereport(WARNING, (errmsg("could not ftruncate file \"%s\": %m", tmpFileName)));  // 如果截断失败,发出警告
            }
            CloseFile(vfd);  // 关闭文件
    
            ++fileId;  // 增加文件ID,处理下一个文件
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    CUStorage::IsDataFileExist 函数

      CUStorage::IsDataFileExist 函数的目的是检查给定文件 ID 对应的数据文件是否存在。函数会构造数据文件名,然后使用 lstat 函数获取文件的状态信息。如果获取失败,说明文件不存在,返回 false;如果获取成功,说明文件存在,返回 true。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    bool CUStorage::IsDataFileExist(int fileId) const
    {
        char tmpFileName[MAXPGPATH];  // 临时文件名的缓冲区
        GetFileName(tmpFileName, MAXPGPATH, fileId);  // 获取数据文件名
    
        struct stat st;  // 用于保存文件状态信息的结构
        if (lstat((const char*)tmpFileName, &st) == -1)  // 获取文件状态信息
            return false;  // 如果获取失败,文件不存在
    
        return true;  // 如果获取成功,文件存在
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    CUStorage::GetBcmFileName 函数

      CUStorage::GetBcmFileName 函数的目的是构造与给定文件 ID 相关的 BCM 文件名。函数首先使用 snprintf_s 构造文件名,格式为 “%s_%s.%d” 或 “%s_%s”,其中 %s 会被替换为文件名前缀和 BCM fork 的名称%d 会被替换为文件 ID。如果文件 ID 大于0,表示有序列号,会构造带有序列号的文件名;如果文件 ID0,表示无序列号,会构造不带序列号的文件名。函数最后确保字符串以 null 结尾。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    void CUStorage::GetBcmFileName(_out_ char* bcmfile, _in_ int fileId) const
    {
        Assert(fileId >= 0);  // 断言文件ID必须大于等于0
    
        // bcm file list: 16385_C1_bcm  16385_C1_bcm.1  16385_C1_bcm.2 ....
        int rc = 0;  // 用于保存 `snprintf_s` 函数的返回值
        if (fileId > 0) {  // 如果文件ID大于0,表示有序列号
            rc = snprintf_s(bcmfile, MAXPGPATH, MAXPGPATH - 1, "%s_%s.%d", m_fileNamePrefix, forkNames[BCM_FORKNUM], fileId);
        } else {  // 如果文件ID为0,表示无序列号
            rc = snprintf_s(bcmfile, MAXPGPATH, MAXPGPATH - 1, "%s_%s", m_fileNamePrefix, forkNames[BCM_FORKNUM]);
        }
        securec_check_ss(rc, "", "");  // 检查 `snprintf_s` 的返回值
        bcmfile[MAXPGPATH - 1] = '\0';  // 确保字符串以 null 结尾
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    CUStorage::TruncateBcmFile 函数

      CUStorage::TruncateBcmFile 函数的目的是==截断与给定文件 ID 相关的 BCM 文件==。函数首先初始化文件 ID 为 0,然后进入一个无限循环。在每次循环中,它检查与当前文件 ID 相关的 BCM 文件是否存在,如果不存在则退出循环。如果文件存在,函数获取 BCM 文件名,然后打开文件,尝试截断文件大小为 0。如果截断文件失败,函数会发出警告。最后,函数关闭文件,然后增加文件 ID,以便处理下一个 BCM 文件。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    /*
     * @Description:  truncate column bcm files which relation CREATE and TRUNCATE in same XACT block
     */
    void CUStorage::TruncateBcmFile()
    {
        int fileId = 0;  // 初始化文件ID为0
        char tmpFileName[MAXPGPATH];  // 用于保存文件名的缓冲区
    
        while (1) {  // 无限循环,直到找不到更多的BCM文件为止
            if (!IsBcmFileExist(fileId))  // 如果BCM文件不存在,退出循环
                break;
    
            GetBcmFileName(tmpFileName, fileId);  // 获取BCM文件名
            File vfd = OpenFile(tmpFileName, fileId, false);  // 打开BCM文件
            if (FileTruncate(vfd, 0)) {  // 如果截断文件失败,发出警告
                ereport(WARNING, (errmsg("could not ftruncate file \"%s\": %m", tmpFileName)));
            }
            CloseFile(vfd);  // 关闭文件
    
            ++fileId;  // 增加文件ID,处理下一个BCM文件
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    CUStorage::IsBcmFileExist 函数

      CUStorage::IsBcmFileExist 函数的目的是检查与给定文件 ID 相关的 BCM 文件是否存在。函数首先获取 BCM 文件名,然后使用 lstat 函数检查文件是否存在。如果 lstat 返回 -1,说明文件不存在,函数返回 false;否则,说明文件存在,函数返回 true。函数源码如下所示:(路径:src/gausskernel/storage/cstore/custorage.cpp

    bool CUStorage::IsBcmFileExist(_in_ int fileId) const
    {
        char tmpFileName[MAXPGPATH];  // 用于保存文件名的缓冲区
        GetBcmFileName(tmpFileName, fileId);  // 获取BCM文件名
    
        struct stat st;  // 用于存储文件状态信息的结构体
        if (lstat((const char*)tmpFileName, &st) == -1)  // 使用lstat检查文件是否存在
            return false;  // 如果文件不存在,返回false
    
        return true;  // 如果文件存在,返回true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    总结

      CUStorage 类相关成员函数操作较多,这里不一一列举了,感兴趣的读者可自行阅读源码。

  • 相关阅读:
    Java怎么实现word转PDF?
    HBase入门至进阶以及开发等知识梳理
    大四开始学前端|Javascript
    用Vuex做共享,但Echarts不同步更新,如何更新dom
    Vue生命周期与自定义指令
    Go 单元测试之HTTP请求与API测试
    VauditDemo靶场代码审计
    Kubernetes 系统化学习之 持久存储篇(五)
    SimpleChannelInboundHandler使用总结
    气象台卫星监测vr交互教学增强学生的学习兴趣和动力
  • 原文地址:https://blog.csdn.net/qq_43899283/article/details/134404259