• Greenplum Database external protocols: the GPHDFS protocol implementation


    Greenplum Database (GPDB for short) mainly provides two ways to access HDFS: PXF and GPHDFS. Both build on GPDB's external table feature, but PXF requires installing and deploying an extra PXF server process, which makes the workflow cumbersome and error-prone in complex IT environments and leads to a poor end-user experience. GPHDFS instead adds an external table protocol for accessing HDFS that lets every segment connect to the HDFS cluster directly, without going through any intermediate node or system. The GPHDFS protocol implementation lives in the gpAux/extensions/gphdfs directory, which contains gphdfsprotocol.c, gphdfsformatter.c, gphdfs.sql and the src folder.

    CREATE OR REPLACE FUNCTION pg_catalog.gphdfs_export(record) RETURNS bytea
    AS '$libdir/gphdfs.so', 'gphdfsformatter_export'
    LANGUAGE C STABLE;
    CREATE OR REPLACE FUNCTION pg_catalog.gphdfs_import() RETURNS record
    AS '$libdir/gphdfs.so', 'gphdfsformatter_import'
    LANGUAGE C STABLE;
    
    CREATE OR REPLACE FUNCTION pg_catalog.gphdfs_read() RETURNS integer
    AS '$libdir/gphdfs.so', 'gphdfsprotocol_import'
    LANGUAGE C STABLE;
    CREATE OR REPLACE FUNCTION pg_catalog.gphdfs_write() RETURNS integer
    AS '$libdir/gphdfs.so', 'gphdfsprotocol_export'
    LANGUAGE C STABLE;
    CREATE OR REPLACE FUNCTION pg_catalog.gphdfs_validate() RETURNS void
    AS '$libdir/gphdfs.so', 'gphdfsprotocol_validate_urls'
    LANGUAGE C STABLE;
    CREATE TRUSTED PROTOCOL gphdfs (writefunc     = gphdfs_write, readfunc      = gphdfs_read, validatorfunc = gphdfs_validate);
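
    On the C side, each of these SQL-level functions maps to a C entry point exported from gphdfs.so with the standard fmgr "version 1" declarations. A minimal sketch of the declarations the module needs for the functions named above (whether they all sit in one file, and where PG_MODULE_MAGIC lives, is an assumption made for the sketch):

    /* Minimal sketch: fmgr V1 declarations for the entry points registered by gphdfs.sql */
    #include "postgres.h"
    #include "fmgr.h"
    
    PG_MODULE_MAGIC;
    
    PG_FUNCTION_INFO_V1(gphdfsprotocol_import);
    PG_FUNCTION_INFO_V1(gphdfsprotocol_export);
    PG_FUNCTION_INFO_V1(gphdfsprotocol_validate_urls);
    PG_FUNCTION_INFO_V1(gphdfsformatter_import);
    PG_FUNCTION_INFO_V1(gphdfsformatter_export);
    
    Datum gphdfsprotocol_import(PG_FUNCTION_ARGS);
    Datum gphdfsprotocol_export(PG_FUNCTION_ARGS);
    Datum gphdfsprotocol_validate_urls(PG_FUNCTION_ARGS);
    Datum gphdfsformatter_import(PG_FUNCTION_ARGS);
    Datum gphdfsformatter_export(PG_FUNCTION_ARGS);

    Once the protocol is registered, an external table simply references a gphdfs:// URL in its LOCATION clause and the external table framework dispatches reads and writes to these functions.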
    

    read, write, and validate

    The gphdfs_read function is backed by gphdfsprotocol_import, gphdfs_write by gphdfsprotocol_export, and gphdfs_validate by gphdfsprotocol_validate_urls. The gphdfs_import_t handle struct has two members: importFile (a URL_FILE describing the location and type of the URL being read) and importDone.

    gphdfsprotocol_validate_urls

    gphdfsprotocol_validate_urls allows only a single URL and checks that the URL is legal (i.e. contains no illegal characters).

    /* Validate the URLs */
    Datum gphdfsprotocol_validate_urls(PG_FUNCTION_ARGS){	
    	if (!CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo)) /* Must be called via the external table format manager */
    		elog(ERROR, "cannot execute gphdfsprotocol_validate_urls outside protocol manager");	
    	if (EXTPROTOCOL_VALIDATOR_GET_NUM_URLS(fcinfo) != 1) /* Condition 1: there must be only ONE url. */
                ereport(ERROR,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("number of URLs must be one")));	
    	char* url_user = EXTPROTOCOL_VALIDATOR_GET_NTH_URL(fcinfo, 1); /* Check for illegal characters. */
    	if (hasIllegalCharacters(url_user)) {
    		ereport(ERROR, (0, errmsg("illegal char in url")));
    	}
    
    	PG_RETURN_VOID();
    }
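
    The illegal-character check guards the URL, presumably because it is later embedded in the command line handed to the Hadoop connector. A hedged sketch of what a helper like hasIllegalCharacters may look like (the exact set of rejected characters is an assumption made for illustration):

    /* Hedged sketch of hasIllegalCharacters(); the rejected set is an assumption. */
    #include <stdbool.h>
    #include <stddef.h>
    
    static bool hasIllegalCharacters(char *str) {
    	if (str == NULL)
    		return false;
    	for (char *c = str; *c != '\0'; c++) {
    		/* refuse characters that could corrupt the command line built from the URL */
    		if (*c == '\\' || *c == '\'' || *c == '<' || *c == '>' || *c == ';')
    			return true;
    	}
    	return false;
    }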
    

    gphdfsprotocol_import

    typedef struct {
    	URL_FILE *importFile;
    	bool      importDone;
    } gphdfs_import_t;
    

    gphdfsprotocol_import first fetches the gphdfs_import_t context attached to fcinfo and checks whether this is the last call (in which case it closes myData->importFile with url_fclose, unless the import has already finished, and returns 0); otherwise, on the first call it allocates and initializes the gphdfs_import_t context and opens the source with gphdfs_fopen. It then calls url_execute_fread to read data into the protocol buffer; if zero bytes are read, it marks the import as done and closes the file with url_fclose.

    Datum gphdfsprotocol_import(PG_FUNCTION_ARGS) {	
    	if (!CALLED_AS_EXTPROTOCOL(fcinfo)) /* Must be called via the external table format manager */
    		elog(ERROR, "cannot execute gphdfsprotocol_import outside protocol manager");
    	/* Get our internal description of the protocol */
    	gphdfs_import_t *myData = (gphdfs_import_t*) EXTPROTOCOL_GET_USER_CTX(fcinfo);
    
    	/* =======================================================================
    	 *                            DO LAST CALL
    	 *                            Nothing to be done if it has already been closed
    	 * ======================================================================= */
    	if (EXTPROTOCOL_IS_LAST_CALL(fcinfo)){
    		if (myData != NULL && !myData->importDone)
    			url_fclose(myData->importFile, false, "gphdfs protocol");
    		PG_RETURN_INT32(0);
    	}
    
    	/* =======================================================================
    	 *                            DO OPEN
    	 * ======================================================================= */
    	if (myData == NULL){
    		myData = palloc(sizeof(gphdfs_import_t));
    		myData->importFile = gphdfs_fopen(fcinfo, false);
    		myData->importDone = false;
    		EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
    	}
    
    	/* =======================================================================
    	 *                            DO THE IMPORT
    	 * ======================================================================= */
    	char   *data   = EXTPROTOCOL_GET_DATABUF(fcinfo);
    	int     datlen = EXTPROTOCOL_GET_DATALEN(fcinfo);
    	size_t  nread  = 0;
    
    	if (datlen > 0 && !myData->importDone)
    		nread = url_execute_fread(data, datlen, myData->importFile, NULL);
    
    	/* =======================================================================
    	 *                            DO CLOSE
    	 *                            close early to raise error early
    	 * ======================================================================= */
    	if (nread == 0){
    		myData->importDone = true;
    		url_fclose(myData->importFile, true, "gphdfs protocol");
    	}
    
    	PG_RETURN_INT32((int)nread);
    }
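
    The EXTPROTOCOL_* accessors used above are macros over the protocol call context that the external table framework passes in fcinfo->context. A rough sketch of what they wrap, with field names that should be treated as assumptions made for illustration (the authoritative definition lives in the extprotocol header):

    /* Rough sketch of the extprotocol call context; field names are assumptions. */
    typedef struct ExtProtocolData
    {
    	NodeTag   type;            /* T_ExtProtocolData                         */
    	char     *prot_url;        /* the gphdfs:// URL being read or written   */
    	char     *prot_databuf;    /* buffer to fill (import) or drain (export) */
    	int       prot_maxbytes;   /* usable size of prot_databuf               */
    	void     *prot_user_ctx;   /* per-scan state, e.g. a gphdfs_import_t    */
    	bool      prot_last_call;  /* true on the final clean-up invocation     */
    } ExtProtocolData;
    
    #define EXTPROTOCOL_GET_USER_CTX(fcinfo)    (((ExtProtocolData *) (fcinfo)->context)->prot_user_ctx)
    #define EXTPROTOCOL_SET_USER_CTX(fcinfo, x) (((ExtProtocolData *) (fcinfo)->context)->prot_user_ctx = (x))
    #define EXTPROTOCOL_GET_DATABUF(fcinfo)     (((ExtProtocolData *) (fcinfo)->context)->prot_databuf)
    #define EXTPROTOCOL_GET_DATALEN(fcinfo)     (((ExtProtocolData *) (fcinfo)->context)->prot_maxbytes)
    #define EXTPROTOCOL_IS_LAST_CALL(fcinfo)    (((ExtProtocolData *) (fcinfo)->context)->prot_last_call)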
    

    url_execute_fread reads data from the pipe connected to the spawned external command.

    // src/backend/access/external/url_execute.c
    size_t url_execute_fread(void *ptr, size_t size, URL_FILE *file, CopyState pstate) {
    	URL_EXECUTE_FILE *efile = (URL_EXECUTE_FILE *) file;
    	return piperead(efile->handle->pipes[EXEC_DATA_P], ptr, size);
    }
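
    On non-Windows builds, piperead (and the pipewrite used on the export path) are, roughly, thin macros over the plain read()/write() system calls, along the lines of the definitions in src/include/port.h:

    /* Hedged sketch of the non-Windows pipe wrappers (see src/include/port.h) */
    #ifndef WIN32
    #define pgpipe(a)         pipe(a)
    #define piperead(a,b,c)   read(a,b,c)
    #define pipewrite(a,b,c)  write(a,b,c)
    #endif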
    

    gphdfs_fopen

    Both the import and export paths open their URL_FILE through gphdfs_fopen; it is discussed below in the section on the execute external table mechanism.

    gphdfsprotocol_export

    gphdfsprotocol_export writes data out of GPDB to HDFS. It fetches the URL_FILE context attached to fcinfo and first checks whether this is the last call (in which case it closes the URL_FILE with url_fclose and returns); otherwise, on the first call it opens the target with gphdfs_fopen and, for Avro and Parquet formats, writes the external table's schema information to the URL_FILE (url_execute_fwrite(schema_head->data, schema_head->len, myData, NULL) followed by url_execute_fwrite(schema_data->data, schema_data->len, myData, NULL)). It then writes out the row data with url_execute_fwrite(data, datlen, myData, NULL).

    /* Export data out of GPDB. */
    Datum gphdfsprotocol_export(PG_FUNCTION_ARGS) {
    	size_t    wrote = 0;
    	static char	ebuf[512] = {0};
    	int	    	ebuflen = 512;
    	
    	if (!CALLED_AS_EXTPROTOCOL(fcinfo)) /* Must be called via the external table format manager */
    		elog(ERROR, "cannot execute gphdfsprotocol_export outside protocol manager");
    	/* Get our internal description of the protocol */
    	URL_FILE *myData = (URL_FILE *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
    	/* =======================================================================
    	 *                            DO CLOSE
    	 * ======================================================================= */
    	if (EXTPROTOCOL_IS_LAST_CALL(fcinfo)){
    		if (myData) url_fclose(myData, true, "gphdfs protocol");
    		PG_RETURN_INT32(0);
    	}
    
    	/* =======================================================================
    	 *                            DO OPEN
    	 * ======================================================================= */
    	if (myData == NULL){
    		myData = gphdfs_fopen(fcinfo, true);
    		EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
    		
    		StringInfo schema_data = makeStringInfo(); /* add schema info to pipe */
    		Relation relation = FORMATTER_GET_RELATION(fcinfo);
    		ExtTableEntry *exttbl = GetExtTableEntry(relation->rd_id);
    		if (fmttype_is_avro(exttbl->fmtcode) || fmttype_is_parquet(exttbl->fmtcode) ){
    			int relNameLen = strlen(relation->rd_rel->relname.data);
    			appendIntToBuffer(schema_data, relNameLen);
    			appendBinaryStringInfo(schema_data, relation->rd_rel->relname.data, relNameLen);
    			int ncolumns = relation->rd_att->natts;
    			appendIntToBuffer(schema_data, ncolumns);	
    			for (int i = 0; i< ncolumns; i++){
    				Oid type = relation->rd_att->attrs[i]->atttypid;
    				/* add attname,atttypid,attnotnull,attndims to schema_data filed */
    				int attNameLen = strlen(relation->rd_att->attrs[i]->attname.data);
    				appendIntToBuffer(schema_data, attNameLen);
    				appendBinaryStringInfo(schema_data, relation->rd_att->attrs[i]->attname.data, attNameLen);
    				appendIntToBuffer(schema_data, type);
    				bool notNull = relation->rd_att->attrs[i]->attnotnull;
    				appendInt1ToBuffer(schema_data, notNull?1:0);
    				appendIntToBuffer(schema_data, relation->rd_att->attrs[i]->attndims);			
    				char delim = 0; /* add type delimiter, for udt, it can be anychar */
    				int16 typlen;
    				bool typbyval;
    				char typalien;
    				Oid typioparam;
    				Oid func;
    				get_type_io_data(type, IOFunc_input, &typlen, &typbyval, &typalien, &delim, &typioparam, &func);
    				appendInt1ToBuffer(schema_data, delim);
    			}
    			StringInfo schema_head = makeStringInfo();
    			appendIntToBuffer(schema_head, schema_data->len + 2);
    			appendInt2ToBuffer(schema_head, 2);
    			url_execute_fwrite(schema_head->data, schema_head->len, myData, NULL);
    			url_execute_fwrite(schema_data->data, schema_data->len, myData, NULL);
    			pfree(schema_head->data);
    			pfree(schema_data->data);
    		}
    	}
    
    
    	/* =======================================================================
    	 *                            DO THE EXPORT
    	 * ======================================================================= */
    	char     *data   = EXTPROTOCOL_GET_DATABUF(fcinfo);
    	int datlen = EXTPROTOCOL_GET_DATALEN(fcinfo);
    	if (datlen > 0) wrote = url_execute_fwrite(data, datlen, myData, NULL);
    
    	if (url_ferror(myData, wrote, ebuf, ebuflen)){
    		ereport(ERROR,(errcode_for_file_access(),strlen(ebuf) > 0 ? errmsg("could not write to external resource:\n%s",ebuf) :errmsg("could not write to external resource: %m")));
    	}
    
    	PG_RETURN_INT32((int)wrote);
    }
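
    The schema header written above is simply a 4-byte length followed by a 2-byte marker, built with the appendIntToBuffer/appendInt2ToBuffer helpers. A hedged sketch of such a helper, assuming big-endian byte order so the Java side (GPDBWritable over DataInputStream) can decode it directly:

    /* Illustrative sketch of appendIntToBuffer(); big-endian byte order is an
     * assumption based on the Java GPDBWritable consumer. */
    #include "postgres.h"
    #include "lib/stringinfo.h"
    
    static void appendIntToBuffer(StringInfo buf, int val) {
    	unsigned char b[4];
    
    	b[0] = (val >> 24) & 0xFF;   /* most significant byte first */
    	b[1] = (val >> 16) & 0xFF;
    	b[2] = (val >> 8)  & 0xFF;
    	b[3] = val & 0xFF;
    	appendBinaryStringInfo(buf, (char *) b, 4);
    }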
    

    export and import

    gphdfs_export and gphdfs_import are defined in gphdfsformatter.c; they convert rows between GPDB's internal tuple format and the GPDBWritable byte layout exchanged with the Java connector (the export path mirrors GPDBWritable.toBytes(), as the comments below note).

    Datum gphdfsformatter_export(PG_FUNCTION_ARGS) {
    	HeapTupleHeader		rec	= PG_GETARG_HEAPTUPLEHEADER(0);
    	HeapTupleData		tuple;
    	int                  nullBitLen;
    	
    	if (!CALLED_AS_FORMATTER(fcinfo)) /* Must be called via the external table format manager */
    		ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("cannot execute gphdfsformatter_export outside format manager")));
    
    	TupleDesc tupdesc = FORMATTER_GET_TUPDESC(fcinfo);	
    	AttrNumber ncolumns = tupdesc->natts;
    	/* Get our internal description of the formatter */
    	format_t           *myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo);	
    	if (myData == NULL){ /* Initialize the context structure */
    		myData          = palloc(sizeof(format_t));
    		myData->values  = palloc(sizeof(Datum) * ncolumns);
    		myData->outval  = palloc(sizeof(char*) * ncolumns);
    		myData->nulls   = palloc(sizeof(bool) * ncolumns);
    		myData->outlen  = palloc(sizeof(int) * ncolumns);
    		myData->outpadlen  = palloc(sizeof(int) * ncolumns);
    		myData->io_functions  = palloc(sizeof(FmgrInfo) * ncolumns);
    		myData->export_format_tuple = makeStringInfo();	
    		for (AttrNumber i = 0; i < ncolumns; i++){ /* set up the text/binary output function */
    			Oid type = tupdesc->attrs[i]->atttypid;
    			bool isvarlena;
    			Oid functionId;
    			/* External table do not support dropped columns; error out now */
    			if (tupdesc->attrs[i]->attisdropped) ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("cannot handle external table with dropped columns")));			
    			if (isBinaryFormatType(type)) /* Get the text/binary "send" function */
    				getTypeBinaryOutputInfo(type, &(functionId), &isvarlena);
    			else
    				getTypeOutputInfo(type, &(functionId), &isvarlena);
    			fmgr_info(functionId, &(myData->io_functions[i]));
    		}
    		FORMATTER_SET_USER_CTX(fcinfo, myData);
    	}
    
    	MemoryContext per_row_ctx = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
    	MemoryContext oldcontext  = MemoryContextSwitchTo(per_row_ctx);
    
    	/* break the input tuple into fields */
    	tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
    	ItemPointerSetInvalid(&(tuple.t_self));
    	tuple.t_data = rec;
    	heap_deform_tuple(&tuple, tupdesc, myData->values, myData->nulls); 
    
    	/* Starting from here. The conversion to bytes is  exactly the same as GPDBWritable.toBytes() */
    	/* Now, compute the total payload and header length header = total length (4 byte), Version (2 byte), #col (2 byte) col type array = #col * 1 byte null bit array = ceil(#col/8) */
    	int datlen = sizeof(int32)+sizeof(int16)+sizeof(int16);
    	datlen += ncolumns;
    	datlen += getNullByteArraySize(ncolumns);
    
    	/* We need to know the total length of the tuple. So, we've to transformed each column so that we know the transformed size and the alignment padding. Since we're computing the conversion function, we use per-row memory context inside the loop. */
    	for (AttrNumber i = 0; i < ncolumns; i++){
    		Oid   type       = tupdesc->attrs[i]->atttypid;
    		Datum val        = myData->values[i];
    		bool  nul        = myData->nulls[i];
    		FmgrInfo *iofunc = &(myData->io_functions[i]);
    		int alignpadlen   = 0;
    		if (nul) myData->outlen[i]=0;
    		else{
    			if (isBinaryFormatType(type)){
    				bytea* tmpval = SendFunctionCall(iofunc, val);
    				/* NOTE: exclude the header length */
    				myData->outval[i] = VARDATA(tmpval);
    				myData->outlen[i] = VARSIZE_ANY_EXHDR(tmpval);
    			}else{/* NOTE: include the "\0" in the length for text format */
    				myData->outval[i] = OutputFunctionCall(iofunc, val);
    				myData->outlen[i] = strlen(myData->outval[i])+1;
    			}
    			/* For variable length type, we added a 4 byte length header. So, it'll be aligned int4. For fixed length type, we'll use the type aligment. */
    			if (isVariableLength(type)){
    				alignpadlen = INTALIGN(datlen) - datlen;
    				datlen += sizeof(int32);
    			}else alignpadlen = att_align_nominal(datlen, tupdesc->attrs[i]->attalign) - datlen;
    			myData->outpadlen[i] = alignpadlen;
    			datlen += alignpadlen;
    		}
    		datlen += myData->outlen[i];
    	}
    
    	/* Add the final alignment padding for the next record */
    	int endpadding = DOUBLEALIGN(datlen) - datlen;
    	datlen += endpadding;
    	/* Now, we're done with per-row computation. Switch back to the old memory context */
    	MemoryContextSwitchTo(oldcontext);
    
    	/* Resize buffer, if needed The new size includes the 4 byte VARHDSZ, the entire payload and 1 more byte for '\0' that StringInfo always ends with. */
    	if (myData->export_format_tuple->maxlen < VARHDRSZ+datlen+1){
    		pfree(myData->export_format_tuple->data);
    		initStringInfoOfSize(myData->export_format_tuple, VARHDRSZ+datlen+1);
    	}
    
    	/* Reset the export format buffer */
    	resetStringInfo(myData->export_format_tuple);
    	/* Reserve VARHDRSZ bytes for the bytea length word */
    	appendStringInfoFill(myData->export_format_tuple, VARHDRSZ, '\0');
    	/* Construct the packet header */
    	appendIntToBuffer(myData->export_format_tuple, datlen);
    	appendInt2ToBuffer(myData->export_format_tuple, GPDBWRITABLE_VERSION);
    	appendInt2ToBuffer(myData->export_format_tuple, ncolumns);
    	/* Write col type */
    	for (AttrNumber i = 0; i < ncolumns; i++)
    		appendInt1ToBuffer(myData->export_format_tuple,getJavaEnumOrdinal(tupdesc->attrs[i]->atttypid));
    
    	/* Write Nullness */
    	bits8               *nullBit = boolArrayToByteArray(myData->nulls, ncolumns, &nullBitLen);
    	appendBinaryStringInfo(myData->export_format_tuple, nullBit, nullBitLen);
    
    	/* Column Value */
    	for (AttrNumber i = 0; i < ncolumns; i++){
    		if (!myData->nulls[i]){
    			/* Pad the alignment byte first */
    			appendStringInfoFill(myData->export_format_tuple, myData->outpadlen[i], '\0');
    			/* For variable length type, we added a 4 byte length header */
    			if (isVariableLength(tupdesc->attrs[i]->atttypid))
    				appendIntToBuffer(myData->export_format_tuple, myData->outlen[i]);
    			/* Now, write the actual column value */
    			appendBinaryStringInfo(myData->export_format_tuple,myData->outval[i], myData->outlen[i]);
    		}
    	}
    	/* End padding */
    	appendStringInfoFill(myData->export_format_tuple, endpadding, '\0');
    	
    	Insist(myData->export_format_tuple->len == datlen + VARHDRSZ);
    	SET_VARSIZE(myData->export_format_tuple->data, datlen + VARHDRSZ);
    	PG_RETURN_BYTEA_P(myData->export_format_tuple->data);
    }
    
    Datum gphdfsformatter_import(PG_FUNCTION_ARGS) {
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_FORMATTER(fcinfo)) ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("cannot execute gphdfsformatter_import outside format manager")));
    
    	TupleDesc tupdesc = FORMATTER_GET_TUPDESC(fcinfo);	
    	AttrNumber ncolumns = tupdesc->natts;
    	format_t           *myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo); /* Get our internal description of the formatter */
    	/* Initialize the context structure */
    	if (myData == NULL){
    		myData          = palloc(sizeof(format_t));
    		myData->values  = palloc(sizeof(Datum) * ncolumns);
    		myData->nulls   = palloc(sizeof(bool) * ncolumns);
    		myData->outlen  = palloc(sizeof(int) * ncolumns);
    		myData->typioparams = (Oid *) palloc(ncolumns * sizeof(Oid));
    		myData->io_functions = palloc(sizeof(FmgrInfo) * ncolumns);
    		for (AttrNumber i = 0; i < ncolumns; i++){
    			Oid type = tupdesc->attrs[i]->atttypid;
    			Oid functionId;
    			/* External table do not support dropped columns; error out now */
    			if (tupdesc->attrs[i]->attisdropped) ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("cannot handle external table with dropped columns")));
    			/* Get the text/binary "receive" function */
    			if (isBinaryFormatType(type)) getTypeBinaryInputInfo(type, &(functionId), &myData->typioparams[i]);
    			else getTypeInputInfo(type, &(functionId), &myData->typioparams[i]);
    			fmgr_info(functionId, &(myData->io_functions[i]));
    		}
    		FORMATTER_SET_USER_CTX(fcinfo, myData);
    	}
    
    	/* get our input data buf and number of valid bytes in it */
    	char               *data_buf = FORMATTER_GET_DATABUF(fcinfo);
    	int data_len = FORMATTER_GET_DATALEN(fcinfo);
    	int data_cur = FORMATTER_GET_DATACURSOR(fcinfo);
    
    	/* =======================================================================
    	 *                            MAIN FORMATTING CODE
    	 *
    	 *
    	 * ======================================================================= */
    	/* Get the first 4 byte; That's the length of the entire packet */
    	int remaining = data_len - data_cur;
    	int bufidx    = data_cur;
    	/* NOTE: Unexpected EOF Error Handling
    	 * The first time we noticed an unexpected EOF, we'll set the datacursor
    	 * forward and then raise the error. But then, the framework will still
    	 * call the formatter the function again. Now, the formatter function will
    	 * be provided with a zero length data buffer. In this case, we should not
    	 * raise an error again, but simply returns "NEED MORE DATA". This is how
    	 * the formatter framework works.
    	 */
    	if (remaining == 0 && FORMATTER_GET_SAW_EOF(fcinfo))
    		FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
    
    	if (remaining < 4){
    		if (FORMATTER_GET_SAW_EOF(fcinfo)){
    			FORMATTER_SET_BAD_ROW_DATA(fcinfo, data_buf+data_cur, remaining);
    			ereport(ERROR,(errcode(ERRCODE_DATA_EXCEPTION),errmsg("unexpected end of file")));
    		}
    		FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
    	}
    	int tuplelen = readIntFromBuffer(data_buf, &bufidx);
    
    	/* Now, make sure we've received the entire tuple */
    	if (remaining < tuplelen){
    		if (FORMATTER_GET_SAW_EOF(fcinfo)){
    			FORMATTER_SET_BAD_ROW_DATA(fcinfo, data_buf+data_cur, remaining);
    			ereport(ERROR,(errcode(ERRCODE_DATA_EXCEPTION),errmsg("unexpected end of file")));
    		}
    		FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
    	}
    	/* We got here. So, we've the ENTIRE tuple in the buffer */
    	FORMATTER_SET_BAD_ROW_DATA(fcinfo, data_buf+data_cur, tuplelen);
    	/* start clean */
    	MemSet(myData->values, 0,    ncolumns * sizeof(Datum));
    	MemSet(myData->nulls,  true, ncolumns * sizeof(bool));
    	MemoryContext per_row_ctx = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
    	MemoryContext oldcontext  = MemoryContextSwitchTo(per_row_ctx);
    	/* extract the version and column count */
    	int16 version = readInt2FromBuffer(data_buf, &bufidx);
    	int16 colcnt  = readInt2FromBuffer(data_buf, &bufidx);
    	if (version != GPDBWRITABLE_VERSION) ereport(ERROR,(errcode(ERRCODE_DATA_EXCEPTION),errmsg("cannot import data version %d", version)));
    	if (colcnt != ncolumns) ereport(ERROR,(errcode(ERRCODE_DATA_EXCEPTION),errmsg("input data column count (%d) did not match the external table definition",colcnt)));
    
    	/* Extract Column Type and check against External Table definition */
    	for (AttrNumber i = 0; i < ncolumns; i++){
    		Oid   input_type = 0;
    		Oid   defined_type = tupdesc->attrs[i]->atttypid;
    		int8  enumType   = readInt1FromBuffer(data_buf, &bufidx);
    		/* Convert enumType to type oid */
    		input_type = getTypeOidFromJavaEnumOrdinal(enumType);
    		if ((isBinaryFormatType(defined_type) || isBinaryFormatType(input_type)) && input_type != defined_type){
    			char	   *intype;
    			intype = get_type_name(input_type);
    			Insist(intype);
    			ereport(ERROR,(errcode(ERRCODE_DATA_EXCEPTION), errmsg("input data column %d of type \"%s\" did not match the external table definition",i+1, intype)));
    		}
    	}
    	/* Extract null bit array */
    	{
    		int nullByteLen = getNullByteArraySize(ncolumns);
    		bits8* nullByteArray = (bits8*)(data_buf+bufidx);
    		bufidx += nullByteLen;
    		byteArrayToBoolArray(nullByteArray, nullByteLen, &myData->nulls, ncolumns);
    	}
    
    	/* extract column value */
    	for (AttrNumber i = 0; i < ncolumns; i++){
    		if (!myData->nulls[i]){
    			FmgrInfo *iofunc = &(myData->io_functions[i]);
    			/* Skip the aligment padding For fixed length type, use the type aligment. For var length type, always align int4 because we'reading a length header. */
    			if (isVariableLength(tupdesc->attrs[i]->atttypid)) bufidx = INTALIGN(bufidx);
    			else bufidx = att_align_nominal(bufidx, tupdesc->attrs[i]->attalign);
    			/* For fixed length type, we can use the type length attribute. For variable length type, we'll get the payload length from the first 4 byte. */
    			if (isVariableLength(tupdesc->attrs[i]->atttypid)) myData->outlen[i] = readIntFromBuffer(data_buf, &bufidx);
    			else myData->outlen[i] = tupdesc->attrs[i]->attlen;
    
    			if (isBinaryFormatType(tupdesc->attrs[i]->atttypid)){
    				StringInfoData tmpbuf;
    				tmpbuf.data   = data_buf+bufidx;
    				tmpbuf.maxlen = myData->outlen[i];
    				tmpbuf.len    = myData->outlen[i];
    				tmpbuf.cursor = 0;
    				myData->values[i] = ReceiveFunctionCall(iofunc,&tmpbuf,myData->typioparams[i],tupdesc->attrs[i]->atttypmod);
    			}else{
    				myData->values[i] = InputFunctionCall(iofunc,data_buf+bufidx,myData->typioparams[i],tupdesc->attrs[i]->atttypmod);
    			}
    			bufidx += myData->outlen[i];
    		}
    	}
    	bufidx = DOUBLEALIGN(bufidx);
    	if(data_cur + tuplelen != bufidx) ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("Tuplelen != bufidx: %d:%d:%d", tuplelen, bufidx, data_cur)));
    
    	data_cur += tuplelen;
    	MemoryContextSwitchTo(oldcontext);
    	FORMATTER_SET_DATACURSOR(fcinfo, data_cur);
    	HeapTuple tuple = heap_form_tuple(tupdesc, myData->values, myData->nulls);
    	FORMATTER_SET_TUPLE(fcinfo, tuple);
    	FORMATTER_RETURN_TUPLE(tuple);
    }
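
    To make the length computation in gphdfsformatter_export concrete, the standalone example below (not GPDB code) reproduces the same arithmetic for a hypothetical two-column row: an int4 followed by the text value 'hi'. The alignment macros are redefined locally so the example compiles outside the backend; the column set and values are assumptions made for the example.

    #include <stdio.h>
    #include <string.h>
    
    #define TYPEALIGN(a, len)  (((len) + (a) - 1) & ~((a) - 1))
    #define INTALIGN(len)      TYPEALIGN(4, (len))
    #define DOUBLEALIGN(len)   TYPEALIGN(8, (len))
    
    int main(void) {
    	int ncolumns = 2;
    	int datlen = 4 + 2 + 2;              /* total length + version + #col       */
    	datlen += ncolumns;                  /* one type byte per column            */
    	datlen += (ncolumns + 7) / 8;        /* null bit array: ceil(#col/8) bytes  */
    
    	/* column 1: int4, fixed length, 4-byte alignment, binary send output */
    	datlen += INTALIGN(datlen) - datlen; /* alignment padding (1 byte here)     */
    	datlen += 4;                         /* int4send payload                    */
    
    	/* column 2: text, variable length -> int4-aligned 4-byte length header */
    	datlen += INTALIGN(datlen) - datlen;
    	datlen += 4;                         /* length header                       */
    	datlen += strlen("hi") + 1;          /* text output incl. trailing '\0'     */
    
    	datlen = DOUBLEALIGN(datlen);        /* end padding for the next record     */
    	printf("serialized tuple length = %d\n", datlen);   /* prints 24 */
    	return 0;
    }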
    
    

    Implemented via the execute external table mechanism

    gphdfs_fopen

    gphdfs_fopen (not shown here) builds the command line that launches the Hadoop connector and opens it through the same machinery that backs execute-type external tables, so each segment exchanges data with a spawned external process over pipes. url_execute_fread, shown earlier, drains that pipe on the read path; url_execute_fwrite below pushes data into it on the write path.

    size_t url_execute_fwrite(void *ptr, size_t size, URL_FILE *file, CopyState pstate){
        URL_EXECUTE_FILE *efile = (URL_EXECUTE_FILE *) file;
        int fd = efile->handle->pipes[EXEC_DATA_P];
        size_t offset = 0;
        const char* p = (const char* ) ptr;   
        while(size > offset){ /* ensure all data in buffer is send out to pipe*/
            size_t n = pipewrite(fd,p,size - offset);
            if(n == -1) return -1;
            if(n == 0) break;
            offset += n;
            p = (const char*)ptr + offset;
        }
        if(offset < size) elog(WARNING,"partial write, expected %lu, written %lu", size, offset);
    
        return offset;
    }
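
    To make the pipe plumbing concrete, here is a standalone sketch (not GPDB code) of the mechanism the execute-URL machinery relies on: spawn an external command, wire its stdout to a pipe, and read the data back in the parent, which is conceptually what the gphdfs read path does with the Hadoop connector command.

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/wait.h>
    
    int main(void) {
    	int     pipefd[2];
    	char    buf[256];
    	ssize_t n;
    
    	if (pipe(pipefd) != 0)
    		exit(1);
    
    	pid_t pid = fork();
    	if (pid == 0) {                      /* child: becomes the external command */
    		dup2(pipefd[1], STDOUT_FILENO);  /* stdout -> write end of the pipe     */
    		close(pipefd[0]);
    		close(pipefd[1]);
    		execlp("echo", "echo", "hello from the external command", (char *) NULL);
    		_exit(127);                      /* only reached if exec fails          */
    	}
    
    	close(pipefd[1]);                    /* parent keeps only the read end      */
    	while ((n = read(pipefd[0], buf, sizeof(buf) - 1)) > 0) {
    		buf[n] = '\0';
    		fputs(buf, stdout);              /* the "url_execute_fread" side        */
    	}
    	close(pipefd[0]);
    	waitpid(pid, NULL, 0);
    	return 0;
    }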
    
  • Original article: https://blog.csdn.net/asmartkiller/article/details/127825108