• 【PostgreSQL内核学习(十四)—— (PortalRunMulti 和 PortalRunUtility)】


    声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
    本文主要参考了《PostgresSQL数据库内核分析》一书,OpenGauss1.1.0 的开源代码和《OpenGauss数据库源码解析》一书以及OpenGauss社区学习文档

    概述

      在【PostgreSQL内核学习(八)—— 查询执行(查询执行策略)】中我们了解到,Portal 的执行过程为:CreatePortal —> PortalDefineQuery —> PortalStart —> PortalRun —> PortalDrop 。随后,我们在在【PostgreSQL内核学习(十一)—— (CreatePortal)】一文中介绍了 CreatePortal 函数的执行过程。又在【PostgreSQL内核学习(十二)—— (PortalStart)】一文中学习了 PortalStart 的执行过程。其次在【PostgreSQL内核学习(十三)—— (PortalRun)】一文中学习了 PortalRun 的执行过程。其中,在 PortalRun 中我们提到, FillPortalStore 函数用于执行查询并将其结果加载到 portal 的元组存储区域中。而在函数中,PortalRunMultiPortalRunUtility 是两个执行函数,用于处理不同类型的 SQL 查询和命令。本文则分别来学习 PortalRunMultiPortalRunUtility 函数的执行过程。

    PortalRunMulti 函数

      PortalRunMulti 函数的主要作用执行一个 Portal 中的多个查询或非 SELECT 类型的查询。它首先检查是否需要为每个查询创建快照,然后循环处理每个查询,执行它们并返回结果,最后生成适当的完成标签

    函数执行过程的详细解释如下

    1. 首先,函数初始化了一些变量,包括标志是否已设置活动快照迭代查询语句列表的列表项、以及统计时间记录等。
    2. 接下来,函数检查目标接收器的类型,如果目标接收器DestRemoteExecute,则将其更改为 DestNone,因为客户端不会期望接收元组的结果。
    3. 函数检查 SQL 活动功能和仪器化选项,根据一些条件设置仪器化标志位。
    4. 接下来,函数通过遍历查询语句列表处理每个查询。对于每个查询,它执行以下步骤:
    • 如果查询是可计划查询PlannedStmt)且不是实用程序语句,则开始记录查询执行的跟踪信息,并可能重置执行器统计信息。
    • 函数检查是否需要为查询创建快照,如果需要,则创建或更新快照。
    • 如果是分布式环境PGXC_COORDINATOR)或单节点环境SINGLE_NODE),设置查询的仪器化选项
    • 根据查询是否可以设置标签,执行查询并生成相应的完成标签,将结果发送到目标接收器。
    • 如果查询是 INSERT,特殊处理以生成合适的完成标签
    • 记录查询执行完成的统计信息,包括执行时间执行器统计信息
    • 在查询之间递增命令计数器,但在最后一个查询之后不递增。
    • 清除子上下文以回收临时内存
    1. 最后,函数弹出之前压入的快照(如果有的话),并根据情况生成完成标签,如果未提供完成标签,则使用 portalcommandTag 作为默认完成标签

    以下是函数 PortalRunMulti 的入参解释:

    1. Portal portal: 表示要执行的 Portal,一个 Portal 是一个命名的查询执行上下文,包含一个或多个查询语句。
    2. bool isTopLevel: 一个布尔值,表示是否处于最顶层的执行环境。通常,如果这是最顶层的执行,它为 true,否则为 false
    3. DestReceiver* dest: 表示结果输出的目标接收器,用于接收执行查询的结果。可以是不同的类型,如表格文件等。在代码中,会根据目标接收器的类型来决定如何发送结果。
    4. DestReceiver* altdest: 表示备用结果输出的目标接收器。这通常用于处理特殊情况下的查询,例如实用程序语句或 NOTIFY 命令。
    5. char* completionTag: 一个指向字符数组的指针,用于存储执行完成时的标签completion tag)。这个标签通常包括有关执行结果的信息,如受影响的行数等。

      PortalRunMulti 函数源码如下:(路径:src/gausskernel/process/tcop/pquery.cpp

    /*
     * PortalRunMulti
     *		执行一个 portal 中的查询,通常是多个查询或非 SELECT 类型的查询
     */
    static void PortalRunMulti(
        Portal portal, bool isTopLevel, DestReceiver* dest, DestReceiver* altdest, char* completionTag)
    {
        bool active_snapshot_set = false; // 标志当前是否设置了活动快照
        ListCell* stmtlist_item = NULL;   // 用于遍历查询语句列表的列表项指针
        PGSTAT_INIT_TIME_RECORD();        // 初始化统计时间记录
    
    #ifdef PGXC
        CombineTag combine; // 用于组合标签信息的结构,在分布式环境下使用
        combine.cmdType = CMD_UNKNOWN;
        combine.data[0] = '\0';
    #endif
    
        bool force_local_snapshot = false; // 是否强制使用本地快照
    
        if ((portal != NULL) && (portal->cplan != NULL)) {
            /* 将 single_shard_stmt 复制到本地变量 force_local_snapshot */
            force_local_snapshot = portal->cplan->single_shard_stmt;
        }
    
        /*
         * 如果目标接收器是 DestRemoteExecute,则将其更改为 DestNone。
         * 原因是客户端不会期望任何元组,实际上也没有办法知道它们是什么,
         * 因为当这种 portal 执行策略生效时,没有提供描述消息来传递 RowDescription 信息。
         * 这目前只会影响由重写规则添加到非 SELECT 查询中的 SELECT 命令:
         * 这些命令将被执行,但结果将被丢弃,除非使用 "simple Query" 协议。
         */
        if (dest->mydest == DestRemoteExecute)
            dest = None_Receiver;
        if (altdest->mydest == DestRemoteExecute)
            altdest = None_Receiver;
    
        /* SQL 活动功能:处理 CREATE TABLE AS 类型的情况 */
        uint32 instrument_option = 0; // 用于仪器化选项的标志位
        if (IS_PGXC_COORDINATOR && u_sess->attr.attr_resource.resource_track_level == RESOURCE_TRACK_OPERATOR &&
            IS_STREAM && u_sess->attr.attr_resource.use_workload_manager &&
            t_thrd.wlm_cxt.collect_info->status != WLM_STATUS_RUNNING && IsSupportExplain(portal->commandTag) &&
            !u_sess->attr.attr_sql.enable_cluster_resize) {
            instrument_option |= INSTRUMENT_TIMER;
            instrument_option |= INSTRUMENT_BUFFERS;
        }
    
        /*
         * 循环处理由分析和重写生成的单个解析树产生的各个查询
         */
        foreach (stmtlist_item, portal->stmts) {
            Node* stmt = (Node*)lfirst(stmtlist_item);
    
    #ifdef ENABLE_MOT
            bool isMOTTable = false;           // 是否为 MOT 表
            JitExec::JitContext* mot_jit_context = nullptr; // MOT JIT 上下文
    #endif
    
            /*
             * 如果在前一个命令中接收到了取消信号,则退出循环
             */
            CHECK_FOR_INTERRUPTS();
    
            if (IsA(stmt, PlannedStmt) && ((PlannedStmt*)stmt)->utilityStmt == NULL) {
                /*
                 * 处理可计划查询。
                 */
                PlannedStmt* pstmt = (PlannedStmt*)stmt;
    
                TRACE_POSTGRESQL_QUERY_EXECUTE_START(); // 记录查询执行开始的跟踪信息
    
                if (u_sess->attr.attr_common.log_executor_stats)
                    ResetUsage(); // 重置统计信息
    
                PGSTAT_START_TIME_RECORD(); // 开始统计时间
    
                /*
                 * 对于可计划查询,必须始终有一个快照,除非是 MOT 查询。
                 * 第一次执行时,获取一个新的快照;对于后续的查询,只需更新快照的命令计数器。
                 */
    #ifdef ENABLE_MOT
                if ((portal->cplan != NULL && portal->cplan->storageEngineType == SE_TYPE_MOT)) {
                    isMOTTable = true;
                    mot_jit_context = portal->cplan->mot_jit_context;
                }
    
                if (!isMOTTable) {
    #endif
                    if (!active_snapshot_set) {
                        PushActiveSnapshot(GetTransactionSnapshot(force_local_snapshot));
                        active_snapshot_set = true;
                    } else
                        UpdateActiveSnapshotCommandId();
    #ifdef ENABLE_MOT
                }
    #endif
    
                if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE)
                    pstmt->instrument_option = instrument_option;
    
                if (pstmt->canSetTag) {
                    /* 查询可以设置标签字符串 */
    #ifdef ENABLE_MOT
                    ProcessQuery(pstmt, portal->sourceText, portal->portalParams,
                        isMOTTable, mot_jit_context, dest, completionTag);
    #else
                    ProcessQuery(pstmt, portal->sourceText, portal->portalParams, dest, completionTag);
    #endif
    #ifdef PGXC
                    /* 对于 INSERT,需要特殊处理 */
                    if (IS_PGXC_COORDINATOR && pstmt->commandType == CMD_INSERT)
                        HandleCmdComplete(pstmt->commandType, &combine, completionTag, strlen(completionTag));
    #endif
                } else {
                    /* 由重写添加的语句不能设置标签 */
    #ifdef ENABLE_MOT
                    ProcessQuery(pstmt, portal->sourceText, portal->portalParams,
                        isMOTTable, mot_jit_context, altdest, NULL);
    #else
                    ProcessQuery(pstmt, portal->sourceText, portal->portalParams, altdest, NULL);
    #endif
                }
    
                PGSTAT_END_TIME_RECORD(EXECUTION_TIME); // 结束统计时间
    
                if (u_sess->attr.attr_common.log_executor_stats)
                    ShowUsage("EXECUTOR STATISTICS"); // 打印执行器统计信息
    
                TRACE_POSTGRESQL_QUERY_EXECUTE_DONE(); // 记录查询执行完成的跟踪信息
            } else {
                /*
                 * 处理实用程序函数(例如 CREATE、DESTROY 等)
                 *
                 * 如果查询列表中只有一个语句,则假定该语句可以设置标签。
                 *
                 * 我们不能在此处为实用程序命令设置快照(如果需要快照,PortalRunUtility 会处理)。
                 * 如果一个实用程序命令独立存在于一个 portal 中,那么一切都没问题。
                 * 实用程序命令可以成为较长列表的一部分的唯一情况是允许规则包含 NotifyStmt。
                 * NotifyStmt 不关心是否有快照,所以如果有一个快照,我们就让当前快照保持不变。
                 */
                if (list_length(portal->stmts) == 1) {
                    AssertEreport(!active_snapshot_set, MOD_EXECUTOR, "No active snapshot for utility commands");
                    /* 查询可以设置标签字符串 */
                    PortalRunUtility(portal, stmt, isTopLevel, dest, completionTag);
                } else if (IsA(stmt, AlterTableStmt) || IsA(stmt, ViewStmt) || IsA(stmt, RuleStmt)) {
                    AssertEreport(!active_snapshot_set, MOD_EXECUTOR, "No active snapshot for utility commands");
                    /* 查询可以设置标签字符串 */
                    PortalRunUtility(portal, stmt, isTopLevel, dest, NULL);
                } else {
                    AssertEreport(IsA(stmt, NotifyStmt), MOD_EXECUTOR, "Not a NotifyStmt");
                    /* 由重写添加的语句不能设置标签 */
                    PortalRunUtility(portal, stmt, isTopLevel, altdest, NULL);
                }
            }
    
            /*
             * 在查询之间递增命令计数器,但在最后一个查询之后不递增。
             */
            if (lnext(stmtlist_item) != NULL)
                CommandCounterIncrement();
    
            /*
             * 清除子上下文以回收临时内存。
             */
            AssertEreport(
                PortalGetHeapMemory(portal) == CurrentMemoryContext, MOD_EXECUTOR, "Memory context is not consistant");
            MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
        }
    
        /* 弹出快照(如果已经压入) */
        if (active_snapshot_set)
            PopActiveSnapshot();
    
        /*
         * 如果提供了命令完成标签,则使用它。
         * 否则,使用 portal 的 commandTag 作为默认的完成标签。
         *
         * 例外情况:客户端期望 INSERT/UPDATE/DELETE 标签具有计数值,因此
         * 我们使用零来伪造它们。这可以发生在没有与原始查询类型相匹配的
         * 替代查询的情况下,例如,如果 DO INSTEAD 规则中没有替代查询。
         * 在这种情况下,我们在这里打印 "0 0",因为技术上没有匹配标签
         * 类型的查询,如果对于不同查询类型打印非零计数值似乎不正确,
         * 例如,如果 INSERT 执行了 UPDATE,则不应该打印 "0 1",表示更新了一行。
         * 有关详细信息,请参阅 QueryRewrite() 的第 3 步。
         */
        errno_t errorno = EOK;
    #ifdef PGXC
        if (IS_PGXC_COORDINATOR && completionTag != NULL && combine.data[0] != '\0') {
            errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, combine.data);
            securec_check(errorno, "\0", "\0");
        }
    #endif
    
        if (completionTag != NULL && completionTag[0] == '\0') {
            if (portal->commandTag) {
                errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, portal->commandTag);
                securec_check(errorno, "\0", "\0");
            }
            if (strcmp(completionTag, "SELECT") == 0) {
                errorno = sprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, "SELECT 0 0");
                securec_check_ss(errorno, "\0", "\0");
            } else if (strcmp(completionTag, "INSERT") == 0) {
                errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, "INSERT 0 0");
                securec_check(errorno, "\0", "\0");
            } else if (strcmp(completionTag, "UPDATE") == 0) {
                errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, "UPDATE 0");
                securec_check(errorno, "\0", "\0");
            } else if (strcmp(completionTag, "DELETE") == 0) {
                errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, "DELETE 0");
                securec_check(errorno, "\0", "\0");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212

    ProcessQuery 函数

      ProcessQuery 函数是处理一个查询的执行,包括准备执行环境执行查询生成命令完成标签等。以下是函数的主要执行过程:

    1. 首先,函数声明了一个 QueryDesc 类型的指针加粗样式 queryDesc 用于存储查询描述信息
    2. 接着,使用 elog(DEBUG3, "ProcessQuery"); 记录调试信息,表示进入了 ProcessQuery 函数。
    3. 针对启用了 MOT 存储引擎的情况进行了一些特殊处理,包括检查是否需要执行 MOT 的 JIT 编译查询,如果需要则调用 ProcessMotJitQuery 函数执行,否则继续执行下面的步骤。
    4. 创建 queryDesc 对象,该对象包含了执行查询所需的各种信息,包括查询计划源文本快照等。
    5. 如果启用了工作负载管理Workload Manager),则调用 WLMTopSQLReady 函数,准备跟踪顶级 SQL 查询。
    6. 如果是协调节点PGXC_COORDINATOR)或单节点环境SINGLE_NODE),则根据需要设置查询的仪器化选项
    7. 调用 ExecutorStart 函数,准备执行查询计划。这个过程包括初始化查询执行状态准备执行计划中的节点等。
    8. 行触发器(Row Trigger)可运输性信息传递给执行上下文estate)。
    9. 如果启用了工作负载管理,进行一些初始化操作并调用 dywlm_client_manager 函数,用于处理工作负载管理相关的操作。
    10. 调用 ExecutorRun 函数,执行查询计划,生成结果
    11. 根据查询的操作类型(如 SELECT、INSERT、UPDATE、DELETE、MERGE)构建命令完成标签completion tag),并将其存储在 completionTag 中。这个标签包括了有关执行结果的信息,如受影响的行数。
    12. 最后,调用 ExecutorFinishExecutorEnd 函数,关闭所有扫描并释放分配的资源,完成查询的执行。
    13. 最后,释放 queryDesc 对象的内存

      ProcessQuery 函数源码如下:(路径:src/gausskernel/process/tcop/pquery.cpp

    static void ProcessQuery(
        PlannedStmt* plan, const char* sourceText, ParamListInfo params, DestReceiver* dest, char* completionTag)
    {
        QueryDesc* queryDesc = NULL; // 声明一个查询描述结构体的指针,用于存储查询的执行信息
    
        elog(DEBUG3, "ProcessQuery"); // 记录调试信息,表示进入了 ProcessQuery 函数
    
    #ifdef ENABLE_MOT
        Snapshot snap = InvalidSnapshot;
    
        // 如果是 MOT 存储引擎的表,并且 JIT 编译可用,执行 MOT 的 JIT 编译查询
        if (isMOTTable && motJitContext && !IS_PGXC_COORDINATOR && JitExec::IsMotCodegenEnabled()) {
            ProcessMotJitQuery(plan, sourceText, params, motJitContext, completionTag);
            return; // 函数结束,不执行后续步骤
        }
    
        if (!isMOTTable) {
            snap = GetActiveSnapshot(); // 获取当前活动的快照
        }
    #endif
    
        /*
         * 创建 QueryDesc 对象,该对象包含了执行查询所需的各种信息,包括查询计划、源文本、快照等
         */
        queryDesc = CreateQueryDesc(plan, sourceText, snap, InvalidSnapshot, dest, params, 0, motJitContext);
    
        if (ENABLE_WORKLOAD_CONTROL && (IS_PGXC_COORDINATOR || IS_SINGLE_NODE)) {
            WLMTopSQLReady(queryDesc); // 启用工作负载管理时,准备跟踪顶级 SQL 查询
        }
    
        if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) {
            // 如果需要跟踪资源使用情况,设置查询的仪器化选项
            if (u_sess->exec_cxt.need_track_resource) {
                queryDesc->instrument_options = plan->instrument_option;
                queryDesc->plannedstmt->instrument_option = plan->instrument_option;
            } else {
                queryDesc->plannedstmt->instrument_option = 0;
            }
        }
    
        /*
         * 调用 ExecutorStart 函数,准备执行查询计划,包括初始化查询执行状态、准备执行计划中的节点等
         */
        ExecutorStart(queryDesc, 0);
    
        /* 传递行触发器(Row Trigger)的可运输性信息给执行上下文 */
        queryDesc->estate->isRowTriggerShippable = plan->isRowTriggerShippable;
    
        /* workload client manager */
        if (ENABLE_WORKLOAD_CONTROL) {
            WLMInitQueryPlan(queryDesc); // 初始化工作负载管理相关信息
            dywlm_client_manager(queryDesc); // 处理工作负载管理相关操作
        }
    
        /*
         * 运行查询计划到完成
         */
        ExecutorRun(queryDesc, ForwardScanDirection, 0L);
    
        /*
         * 构建命令完成状态字符串,如果需要的话
         */
        if (completionTag != NULL) {
            Oid lastOid;
            errno_t ret = EOK;
    
            switch (queryDesc->operation) {
                case CMD_SELECT:
                    ret = snprintf_s(completionTag,
                        COMPLETION_TAG_BUFSIZE,
                        COMPLETION_TAG_BUFSIZE - 1,
                        "SELECT %lu",
                        queryDesc->estate->es_processed); // 构建 SELECT 命令的完成标签
                    securec_check_ss(ret, "\0", "\0");
                    break;
                case CMD_INSERT:
                    if (queryDesc->estate->es_processed == 1)
                        lastOid = queryDesc->estate->es_lastoid; // 获取最后一个 OID
                    else
                        lastOid = InvalidOid;
                    ret = snprintf_s(completionTag,
                        COMPLETION_TAG_BUFSIZE,
                        COMPLETION_TAG_BUFSIZE - 1,
                        "INSERT %u %lu",
                        lastOid,
                        queryDesc->estate->es_processed); // 构建 INSERT 命令的完成标签
                    securec_check_ss(ret, "\0", "\0");
                    break;
                case CMD_UPDATE:
                    ret = snprintf_s(completionTag,
                        COMPLETION_TAG_BUFSIZE,
                        COMPLETION_TAG_BUFSIZE - 1,
                        "UPDATE %lu",
                        queryDesc->estate->es_processed); // 构建 UPDATE 命令的完成标签
                    securec_check_ss(ret, "\0", "\0");
                    break;
                case CMD_DELETE:
                    ret = snprintf_s(completionTag,
                        COMPLETION_TAG_BUFSIZE,
                        COMPLETION_TAG_BUFSIZE - 1,
                        "DELETE %lu",
                        queryDesc->estate->es_processed); // 构建 DELETE 命令的完成标签
                    securec_check_ss(ret, "\0", "\0");
                    break;
                case CMD_MERGE:
                    ret = snprintf_s(completionTag,
                        COMPLETION_TAG_BUFSIZE,
                        COMPLETION_TAG_BUFSIZE - 1,
                        "MERGE %lu",
                        queryDesc->estate->es_processed); // 构建 MERGE 命令的完成标签
                    securec_check_ss(ret, "\0", "\0");
                   break;
                default:
                    ret = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, "?\?\?"); // 未知命令类型
                    securec_check(ret, "\0", "\0");
                    break;
            }
        }
    
        /*
         * 现在,关闭所有扫描并释放分配的资源
         */
        ExecutorFinish(queryDesc);
        ExecutorEnd(queryDesc);
    
        FreeQueryDesc(queryDesc); // 释放 QueryDesc 对象的内存
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127

    PortalRunUtility 函数

      PortalRunUtility 函数用于执行 Portal 中的实用程序语句,它设置适当的快照,执行语句的逻辑,然后处理内存上下文和快照的弹出。该函数与 PortalRunMulti 函数最大的区别是 PortalRunMulti 用于执行一系列查询语句,通常用于包含多个查询的 Portal。而 PortalRunUtility 用于执行单个实用程序语句,通常用于包含单个实用程序语句的 Portal

    函数 PortalRunUtility 的执行过程如下

    1. 首先,函数内部声明了一个布尔变量 active_snapshot_set,用于标志是否设置了活动快照
    2. 执行 elog(DEBUG3, "ProcessUtility") 语句,记录调试信息,表示进入了 ProcessUtility 函数。
    3. 接下来,根据实用程序语句的类型和一些条件判断,决定是否需要设置快照。大部分语句都需要设置快照,但有一些特例情况不需要,例如事务控制语句锁操作变量设置SHOW语句等。
    4. 如果需要设置快照,则调用PushActiveSnapshot(GetTransactionSnapshot())设置当前事务的快照为活动快照
    5. 如果启用了工作负载控制ENABLE_WORKLOAD_CONTROL),则调用WLMSetExecutorStartTime()记录执行开始时间
    6. 调用ProcessUtility函数,执行实用程序语句的逻辑。这个函数负责处理各种实用程序语句,如事务控制锁操作变量设置等。
    7. 在执行实用程序语句期间,可能会更改内存上下文(MemoryContext),因此在执行结束后,使用MemoryContextSwitchTo(PortalGetHeapMemory(portal))来切换回 Portal 的内存上下文。
    8. 一些实用程序语句执行后可能会弹出 u_sess->utils_cxt.ActiveSnapshot 栈,因此在此处检查是否需要弹出栈。这是为了确保不会弹出不属于当前操作的快照。
    9. 最后,如果需要生成命令完成标签completionTag),则根据不同类型的实用程序语句设置不同的标签,反映实用程序语句的执行结果

      PortalRunUtility 函数源码如下:(路径:src/gausskernel/process/tcop/pquery.cpp

    /*
     * PortalRunUtility
     *		执行一个 Portal 中的实用程序语句(utility statement)。
     */
    static void PortalRunUtility(Portal portal, Node* utilityStmt, bool isTopLevel, DestReceiver* dest, char* completionTag)
    {
        bool active_snapshot_set = false; // 标志是否设置了活动快照
    
        elog(DEBUG3, "ProcessUtility"); // 记录调试信息,表示进入了 ProcessUtility 函数
    
        /*
         * 如果实用程序语句需要快照,则设置快照。不需要快照的语句列表有限,因此我们列举不需要快照的情况。
         * 事务控制语句、LOCK 语句和 SET 语句不能设置快照,因为它们需要在事务快照模式事务的开始时执行,而不能冻结快照。
         * 我们还允许 SHOW 语句不设置快照。其他语句都需要设置快照,因为它们可能修改数据库。
         */
    #ifdef ENABLE_MOT
        if (!(portal->cplan != NULL && portal->cplan->storageEngineType == SE_TYPE_MOT) &&
                !(IsA(utilityStmt, TransactionStmt) || IsA(utilityStmt, LockStmt) || IsA(utilityStmt, VariableSetStmt) ||
    #else
        if (!(IsA(utilityStmt, TransactionStmt) || IsA(utilityStmt, LockStmt) || IsA(utilityStmt, VariableSetStmt) ||
    #endif
                IsA(utilityStmt, VariableShowStmt) || IsA(utilityStmt, ConstraintsSetStmt) ||
                /* 以下是一些效率优化的特例情况 */
                IsA(utilityStmt, FetchStmt) || IsA(utilityStmt, ListenStmt) || IsA(utilityStmt, NotifyStmt) ||
                IsA(utilityStmt, UnlistenStmt) ||
    #ifdef PGXC
            (IsA(utilityStmt, RefreshMatViewStmt) && IS_PGXC_COORDINATOR) ||
                (IsA(utilityStmt, CheckPointStmt) && IS_PGXC_DATANODE)))
    #else
                IsA(utilityStmt, CheckPointStmt)))
    #endif
        {
            PushActiveSnapshot(GetTransactionSnapshot()); // 设置当前事务的快照为活动快照
            active_snapshot_set = true;
        } else
            active_snapshot_set = false;
    
        /* 如果启用工作负载控制,记录执行开始时间 */
        if (ENABLE_WORKLOAD_CONTROL)
            WLMSetExecutorStartTime();
    
        /*
         * 调用 ProcessUtility 函数,执行实用程序语句。
         * 这将执行语句的实际逻辑,如事务控制、锁操作、变量设置等。
         */
        ProcessUtility(utilityStmt,
            portal->sourceText,
            portal->portalParams,
            isTopLevel,
            dest,
    #ifdef PGXC
            false, // PGXC 相关标志,此处不适用
    #endif /* PGXC */
            completionTag);
    
        /* 一些实用程序语句可能会更改内存上下文 */
        MemoryContextSwitchTo(PortalGetHeapMemory(portal));
    
        /*
         * 一些实用程序语句可能会在我们下面弹出 u_sess->utils_cxt.ActiveSnapshot 栈,
         * 所以我们只在实际设置了快照的情况下弹出栈。
         * 请注意,可以在事务内部运行的实用程序语句的集合必须与不允许在事务内运行的集合相同;
         * 否则,我们可能会弹出属于其他操作的快照。
         */
        if (active_snapshot_set && ActiveSnapshotSet())
            PopActiveSnapshot();
    
        perm_space_value_reset(); // 重置永久内存空间的值
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
  • 相关阅读:
    中国电子云数据库 Mesh 项目 DBPack 的实践
    Window系统安装 bee
    Java开源工具库使用之高性能内存数据查找库CQengine
    查找内轮廓(孔洞)
    【MySQL教程】| (1-1) 2023MySQL-8.1.0 安装教程
    自动增加 Android App 的版本号
    干货 | 如何快速实现 BitSail Connector?
    windows 调用 static lib
    360测试开发技术面试题目
    Maven项目管理(一)
  • 原文地址:https://blog.csdn.net/qq_43899283/article/details/133080541