• Greenplum Database External Tables: the fileam Wrapper


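    This post focuses on src/backend/access/external/fileam.c, which encapsulates the low-level details of fetching data and flow control, and exposes an abstract API (SCAN and INSERT) to the executor nodes above it.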

    SCAN

    ExecInitExternalScan
     | -- external_beginscan
    
    ExecExternalScan 
     | -- ExternalNext
       | -- external_getnext_init
       | -- external_getnext
           
    ExecReScanExternal
     | -- external_rescan
          | -- external_stopscan
    
    ExecSquelchExternalScan
     | -- external_stopscan
    
    ExecEndExternalScan/ExternalNext/ExecSquelchExternalScan
     | -- ExecEagerFreeExternalScan
           | -- external_endscan
    

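    The overall flow of ExecInitExternalScan was covered in the earlier post "Greenplum Database External Tables: the Scan Executor Node". Here we focus on the call currentScanDesc = external_beginscan(currentRelation, node->scancounter, node->uriList, node->fmtOptString, node->fmtType, node->isMasterOnly, node->rejLimit, node->rejLimitInRows, node->logErrors, node->encoding). Its main job is to create a FileScanDesc struct and store it in the ExternalScanState.ess_ScanDesc member (the yellow box in the original post's figure).

    The function first allocates memory for FileScanDescData and fills in the basic fields: fs_inited is set to false to indicate that the FileScanDesc is not fully initialized yet, fs_rd is set to the relation argument, fs_scancounter to the scancounter argument, and fs_file to NULL. The fs_noop member indicates that this segment has no data to fetch (no operation), and raw_buf_done is used only for custom external tables. If relation->rd_att->constr != NULL && relation->rd_att->constr->num_check > 0 holds, the external table has constraints, and the constraint expressions are stored in the fs_constraintExprs member.

    Next comes role-specific initialization. On a QE, the uri for this segment is looked up in uriList by segindex and stored in fs_uri. On the QD with isMasterOnly set, the table is an ON MASTER table, and the uri is likewise stored in fs_uri.

    The input function and element type of every column of the external table are then fetched from the system catalogs into the in_functions and typioparams arrays. For the custom format, custom_formatter_name and custom_formatter_params are extracted from the fmtOptString argument; otherwise parseCopyFormatString parses fmtOptString into copyFmtOpts.

    BeginCopyFrom(relation, NULL, false, external_getdata_callback, (void *) scan, NIL, copyFmtOpts, NIL) initializes the fs_pstate member. The table's entry is then read from the pg_exttable catalog, and InitParseState(scan->fs_pstate, relation, false, fmtType, scan->fs_uri, rejLimit, rejLimitInRows, logErrors, extentry->options) completes the initialization of fs_pstate. Finally, for a custom external table, the formatter function's oid is looked up by custom_formatter_name and stored in fs_custom_formatter_func, and a FormatterData is allocated for fs_formatter.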
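
The role-specific step is the easiest part of external_beginscan to get wrong, so here is a minimal sketch of it in isolation. It is not the Greenplum code: ExecRole, ScanSketch and begin_scan_sketch are hypothetical names, and the sketch assumes that uriList holds one slot per segment, that a NULL slot means "nothing to read here", and that an ON MASTER table uses the first slot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the dispatcher (QD) / executor (QE) role. */
typedef enum { ROLE_QD, ROLE_QE } ExecRole;

/* Simplified model of the few FileScanDesc fields discussed above. */
typedef struct {
    bool        fs_inited;  /* false until the scan is actually opened */
    bool        fs_noop;    /* true when this process has nothing to read */
    const char *fs_uri;     /* URI this process reads from, or NULL */
} ScanSketch;

/*
 * Pick the URI for the current process.
 * Assumption: uri_list has one slot per segment, and a NULL slot means
 * that segment has no data to fetch.
 */
ScanSketch begin_scan_sketch(ExecRole role, int segindex,
                             const char *const *uri_list, int nuris,
                             bool is_master_only)
{
    ScanSketch scan = { false, false, NULL };

    assert(nuris == 0 || uri_list != NULL);

    if (role == ROLE_QE) {
        /* a segment looks up its own slot by segindex */
        if (segindex >= 0 && segindex < nuris)
            scan.fs_uri = uri_list[segindex];
    } else if (is_master_only && nuris > 0) {
        /* ON MASTER table: assumed here to use the first URI */
        scan.fs_uri = uri_list[0];
    }

    /* no URI means this process only pretends to scan */
    scan.fs_noop = (scan.fs_uri == NULL);
    return scan;
}
```

With a two-slot list whose second slot is NULL, segment 0 gets a real URI while segment 1 ends up with fs_noop set, which is how a segment without data stays idle in this model.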

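    ExternalNext first calls external_getnext_init to create an ExternalSelectDescData struct. If node->ss.ps is not null, its ps_ProjInfo is stored in the projInfo member of the ExternalSelectDesc (projInfo and filter_quals, that is, projection and filtering, can be pushed down and handled inside external_getnext). Tuples are fetched with external_getnext, described in the post "Greenplum Database External Tables: Fetching Tuples with external_getnext". The ExternalSelectDescData struct is passed down through that same function, which is why only custom external tables can take advantage of predicate pushdown.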

    static TupleTableSlot *ExternalNext(ExternalScanState *node){
    	bool		scanNext = true;	
    	EState	   *estate = node->ss.ps.state; /* get information from the estate and scan state */
    	FileScanDesc scandesc = node->ess_ScanDesc;
    	ScanDirection direction = estate->es_direction;
    	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
    	
    	ExternalSelectDesc externalSelectDesc = external_getnext_init(&(node->ss.ps));
    	if (gp_external_enable_filter_pushdown)
    		externalSelectDesc->filter_quals = node->ss.ps.plan->qual;
    		
    	/* get the next tuple from the file access methods */
    	while(scanNext){
    		HeapTuple tuple = external_getnext(scandesc, direction, externalSelectDesc);
    		if (tuple){
    			ExecStoreHeapTuple(tuple, slot, InvalidBuffer, true);
    			if (node->ess_ScanDesc->fs_hasConstraints && !ExternalConstraintCheck(slot, node)){
    				ExecClearTuple(slot);
    				continue;
    			}
    		    /* CDB: Label each row with a synthetic ctid if needed for subquery dedup.  */
    		    if (node->cdb_want_ctid && !TupIsNull(slot)){ slot_set_ctid_from_fake(slot, &node->cdb_fake_ctid); }
    		}else{
    			ExecClearTuple(slot);
    			if (!node->delayEagerFree){ ExecEagerFreeExternalScan(node); }
    		}
    		scanNext = false;
    	}
    	pfree(externalSelectDesc);
    	return slot;
    }
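
The loop above is compact, so its control flow is easy to misread: a row that fails the constraint check is dropped and the loop fetches again, while any other outcome ends the loop. The following is a minimal sketch of just that control flow over an in-memory array. RowSource, source_getnext, constraint_ok and scan_next are hypothetical stand-ins for the scan descriptor, external_getnext, ExternalConstraintCheck and ExternalNext; the CHECK condition (value >= 0) is made up for the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical in-memory "external source": a cursor over an int array. */
typedef struct {
    const int *rows;
    size_t     nrows;
    size_t     pos;
} RowSource;

/* Stand-in for external_getnext: returns false once the source is exhausted. */
bool source_getnext(RowSource *src, int *row)
{
    if (src->pos >= src->nrows)
        return false;
    *row = src->rows[src->pos++];
    return true;
}

/* Stand-in for ExternalConstraintCheck: an assumed CHECK (value >= 0). */
bool constraint_ok(int row)
{
    return row >= 0;
}

/*
 * Mirrors the control flow of ExternalNext: keep fetching until a row
 * passes the constraints or the source runs dry.
 */
bool scan_next(RowSource *src, bool has_constraints, int *out)
{
    int row;

    assert(src != NULL && out != NULL);

    while (source_getnext(src, &row)) {
        if (has_constraints && !constraint_ok(row))
            continue;       /* drop this row and fetch the next one */
        *out = row;
        return true;
    }
    /* end of data: the real node clears the slot and may free eagerly here */
    return false;
}
```

Scanning the rows {3, -1, -2, 7} with constraints enabled yields 3 and then 7; the two negative rows never reach the caller.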
    

    The flow of ExecReScanExternal is shown below: it simply calls external_rescan, which in turn ends up calling external_stopscan.

    ExecReScanExternal
     | -- FileScanDesc fileScan = node->ess_ScanDesc
     | -- external_rescan
          | -- external_stopscan
    

    ExecSquelchExternalScan performs identically to ExecEndExternalScan except that closure errors are ignored. It is called for normal termination when the external data source is NOT exhausted (such as for a LIMIT clause). The call path is ExecSquelchNode --> ExecSquelchExternalScan --> external_stopscan.
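
The difference between the two shutdown paths comes down to one flag: whether a failed close is reported or swallowed. Below is a minimal sketch of that idea, assuming a source whose close can fail. SourceSketch, close_source, stop_scan_sketch and end_scan_sketch are hypothetical names, not the fileam.c functions, and the error is reported with a return code rather than with Greenplum's error machinery.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical external source handle. */
typedef struct {
    bool is_open;
    int  close_status;  /* assumed: non-zero means the close failed */
} SourceSketch;

/*
 * Close the source. Returns 0 on success or when the error is ignored,
 * -1 when the close failed and the caller asked for errors to be reported.
 */
int close_source(SourceSketch *src, bool fail_on_error)
{
    assert(src != NULL);

    if (!src->is_open)
        return 0;               /* already closed: safe to call twice */
    src->is_open = false;

    if (src->close_status != 0 && fail_on_error) {
        fprintf(stderr, "external source close failed (status %d)\n",
                src->close_status);
        return -1;
    }
    return 0;
}

/* Early termination (for example under LIMIT): closure errors are ignored. */
int stop_scan_sketch(SourceSketch *src)
{
    return close_source(src, false);
}

/* Normal end of scan: closure errors are reported. */
int end_scan_sketch(SourceSketch *src)
{
    return close_source(src, true);
}
```

In both cases the source ends up closed; only the handling of the failure differs, which matches the "identical except that closure errors are ignored" description.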

    ExecEndExternalScan mainly frees any storage allocated in the flow above, such as node->ss.ps, node->ss.ps.ps_ResultTupleSlot, node->ss.ss_ScanTupleSlot and node->ess_ScanDesc. It also closes the relation and calls ExecEagerFreeExternalScan.

    ExecEndExternalScan/ExternalNext/ExecSquelchExternalScan
     | -- ExecEagerFreeExternalScan
           | -- external_endscan
    

    INSERT

    An INSERT into an external table consists of three parts: external_insert_init, external_insert and external_insert_finish. All three functions operate on the ExternalInsertDescData struct.

    CopyFrom/ExecInsert
     | -- external_insert_init
    CopyFrom/ExecInsert
     | -- external_insert
    CopyFrom/ExecEndPlan
     | -- CloseResultRelInfo 
           | -- external_insert_finish
    

    ExternalInsertDesc external_insert_init(Relation rel) first allocates memory for the ExternalInsertDesc and sets ext_rel to the rel argument. On the QD, ext_noop is set to true; on a QE, it is set to false. For an EXECUTE external table, ext_uri is set to execute:extentry->command. Otherwise the table is gpfdist or custom, and the segment picks its own uri from extentry->urilocations (int my_url = segindex % num_urls) and stores it in extInsertDesc->ext_uri.

    Memory is then allocated for ext_pstate, ext_tupDesc, ext_values and ext_nulls. If fmtcode is custom, fmtopts is parsed with parseCustomFormatString(extentry->fmtopts, &custom_formatter_name, &custom_formatter_params); otherwise copyFmtOpts = parseCopyFormatString(rel, extentry->fmtopts, extentry->fmtcode) is used. BeginCopyToForeignTable(rel, copyFmtOpts) initializes extInsertDesc->ext_pstate, and InitParseState(extInsertDesc->ext_pstate, rel, true, extentry->fmtcode, extInsertDesc->ext_uri, extentry->rejectlimit, (extentry->rejectlimittype == 'r'), extentry->logerrors, extentry->options) completes the initialization of ext_pstate. Finally, for a custom external table, the formatter function's oid is looked up by custom_formatter_name and stored in ext_custom_formatter_func, and a FormatterData is allocated for ext_formatter_data.
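
The URI choice for a writable external table can be sketched on its own. The only parts taken from the text are the execute: prefix and the formula segindex % num_urls; the function name choose_insert_uri, its signature and the fixed output buffer are assumptions made for this example.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Fill `out` with the URI a segment should write to.
 * Returns 0 on success, -1 if there is no usable location or the
 * buffer is too small. All names here are hypothetical.
 */
int choose_insert_uri(const char *command,
                      const char *const *urilocations, int num_urls,
                      int segindex, char *out, size_t outlen)
{
    int n;

    assert(out != NULL && outlen > 0);

    if (command != NULL && strlen(command) > 0) {
        /* EXECUTE table: the "URI" is the command with an execute: prefix */
        n = snprintf(out, outlen, "execute:%s", command);
    } else {
        if (urilocations == NULL || num_urls <= 0 || segindex < 0)
            return -1;
        /* segments are spread over the locations round-robin */
        int my_url = segindex % num_urls;
        n = snprintf(out, outlen, "%s", urilocations[my_url]);
    }
    return (n >= 0 && (size_t) n < outlen) ? 0 : -1;
}
```

With two locations and four segments, segments 0 and 2 share the first location and segments 1 and 3 share the second, so several segments may write to the same target.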

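    Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup) formats a tuple and writes it to the external data source. It differs from heap_insert in three ways: WAL is bypassed; transaction information is of no interest; and the tuple is always sent to the destination (a local file or a remote target). Its main flow is: open the external data source; deconstruct the tuple and format it into the data format of the external source; call external_senddata to send the data.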

    Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup){
    	TupleDesc	tupDesc = extInsertDesc->ext_tupDesc;
    	Datum	   *values = extInsertDesc->ext_values;
    	bool	   *nulls = extInsertDesc->ext_nulls;
    	CopyStateData *pstate = extInsertDesc->ext_pstate;
    	bool		customFormat = (extInsertDesc->ext_custom_formatter_func != NULL);
    
    	if (extInsertDesc->ext_noop) return InvalidOid;
    	/* Open our output file or output stream if not yet open */
    	if (!extInsertDesc->ext_file && !extInsertDesc->ext_noop) open_external_writable_source(extInsertDesc);
    
    	/* deconstruct the tuple and format it into text */
    	if (!customFormat){/* TEXT or CSV */
    		heap_deform_tuple(instup, tupDesc, values, nulls);
    		CopyOneRowTo(pstate, HeapTupleGetOid(instup), values, nulls);
    		CopySendEndOfRow(pstate);
    	}else{ /* custom format. convert tuple using user formatter */
    		FunctionCallInfoData fcinfo;
    		/* There is some redundancy between FormatterData and ExternalInsertDesc we may be able to consolidate data structures a little. */
    		FormatterData *formatter = extInsertDesc->ext_formatter_data;
    		/* per call formatter prep */
    		FunctionCallPrepareFormatter(&fcinfo,1,pstate,extInsertDesc->ext_custom_formatter_func,extInsertDesc->ext_custom_formatter_params,formatter,extInsertDesc->ext_rel,extInsertDesc->ext_tupDesc,pstate->out_functions,NULL);
    		/* Mark the correct record type in the passed tuple */
    		HeapTupleHeaderSetDatumLength(instup->t_data, instup->t_len);
    		HeapTupleHeaderSetTypeId(instup->t_data, tupDesc->tdtypeid);
    		HeapTupleHeaderSetTypMod(instup->t_data, tupDesc->tdtypmod);
    		fcinfo.arg[0] = HeapTupleGetDatum(instup);
    		fcinfo.argnull[0] = false;
    		Datum d = FunctionCallInvoke(&fcinfo);
    		MemoryContextReset(formatter->fmt_perrow_ctx);
    
    		/* We do not expect a null result */
    		if (fcinfo.isnull) elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
    
    		bytea	   *b = DatumGetByteaP(d);
    		CopyOneCustomRowTo(pstate, b);
    	}
    	
    	external_senddata(extInsertDesc->ext_file, pstate); /* Write the data into the external source */
    
    	/* Reset our buffer to start clean next round */
    	pstate->fe_msgbuf->len = 0;
    	pstate->fe_msgbuf->data[0] = '\0';
    
    	return HeapTupleGetOid(instup);
    }
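
The per-row cycle at the end of external_insert (format into the message buffer, send, reset the buffer) can be shown without any Greenplum types. This is a minimal sketch under stated assumptions: MsgBuf is a fixed-size stand-in for fe_msgbuf, the row format is a bare comma-separated line with no quoting or escaping, NULL is rendered as an empty field, and a plain FILE * plays the role of the external destination. None of the function names come from fileam.c.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical fixed-size stand-in for the fe_msgbuf string buffer. */
typedef struct {
    char   data[256];
    size_t len;
} MsgBuf;

/* Append text; anything that does not fit is silently truncated. */
void buf_append(MsgBuf *buf, const char *s)
{
    size_t room = sizeof(buf->data) - 1 - buf->len;
    size_t n = strlen(s);

    if (n > room)
        n = room;
    memcpy(buf->data + buf->len, s, n);
    buf->len += n;
    buf->data[buf->len] = '\0';
}

/* Format one row as a comma-separated line; NULL becomes an empty field. */
void format_row(MsgBuf *buf, const char *const *values, int nfields)
{
    for (int i = 0; i < nfields; i++) {
        if (i > 0)
            buf_append(buf, ",");
        if (values[i] != NULL)
            buf_append(buf, values[i]);
    }
    buf_append(buf, "\n");
}

/* Format, send, then reset the buffer. Returns the number of bytes sent. */
size_t insert_row(FILE *dest, MsgBuf *buf,
                  const char *const *values, int nfields)
{
    assert(dest != NULL && buf != NULL);

    format_row(buf, values, nfields);
    size_t sent = fwrite(buf->data, 1, buf->len, dest);

    /* reset so the next row starts from a clean buffer */
    buf->len = 0;
    buf->data[0] = '\0';
    return sent;
}
```

Resetting len and the first byte after every send is what lets one buffer be reused for the whole INSERT; without it, each row would be sent again together with all the rows before it.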
    

    void external_insert_finish(ExternalInsertDesc extInsertDesc) flushes whatever data remains in the buffer to the file, closes the external source, and frees the descriptor.

    void external_insert_finish(ExternalInsertDesc extInsertDesc){
    	/* Close the external source */
    	if (extInsertDesc->ext_file){
    		char	   *relname = RelationGetRelationName(extInsertDesc->ext_rel);
    		url_fflush(extInsertDesc->ext_file, extInsertDesc->ext_pstate);
    		url_fclose(extInsertDesc->ext_file, true, relname);
    	}
    	if (extInsertDesc->ext_formatter_data) pfree(extInsertDesc->ext_formatter_data);
    	pfree(extInsertDesc);
    }
    
    The pg_exttable catalog, from which the extentry used above is read, is declared as follows:

    #define ExtTableRelationId	6040
    CATALOG(pg_exttable,6040) BKI_WITHOUT_OIDS
    {
    	Oid		reloid;				/* refers to this relation's oid in pg_class  */
    	text	urilocation[1];		/* array of URI strings */
    	text	execlocation[1];	/* array of ON locations */
    	char	fmttype;			/* 't' (text) or 'c' (csv) */
    	text	fmtopts;			/* the data format options */
    	text	options[1];			/* the array of external table options */
    	text	command;			/* the command string to EXECUTE */
    	int32	rejectlimit;		/* error count reject limit per segment */
    	char	rejectlimittype;	/* 'r' (rows) or 'p' (percent) */
    	bool	logerrors;			/* 't' to log errors into file */
    	int32	encoding;			/* character encoding of this external table */
    	bool	writable;			/* 't' if writable, 'f' if readable */
    } FormData_pg_exttable;
    

    GPHDFS
    gpAux/extensions/gphdfs/gphdfsprotocol.c

  • Original post: https://blog.csdn.net/asmartkiller/article/details/127814612