使用libcurl库创建的资源(主要是curl句柄和headers列表)需要在出错时主动清理,所以greenplum利用了resource owner机制(当释放ResourceOwner时,自定义的回调函数可以用来清理libcurl库创建的资源)。greenplum使用curlhandle_t结构体将curl句柄和headers列表等curl相关资源封装在一起,并利用open_curl_handles双向链表来组织当前正在使用的libcurl库创建的资源。

libcurl库创建的资源使用双向链表进行组织,那么创建销毁curlhandle_t就需要对应的函数来操作该双向链表,其代码如下所示(链表操作非常简单,这里就不赘述)。需要知道的是curlhandle_t是在TopMemoryContext内存上下文中申请的,而不是在子内存上下文中;owner指向CurrentResourceOwner。
/*
 * Head of the doubly-linked list of open curl handles. Nodes are allocated
 * in TopMemoryContext (see create_curlhandle) and each node records the
 * resource owner that was current when it was created, so that the
 * resource-release callback can find and destroy the nodes it owns.
 */
static curlhandle_t *open_curl_handles;
/*
 * create_curlhandle
 *
 * Allocate a new, empty curlhandle_t in TopMemoryContext, tag it with
 * CurrentResourceOwner and push it onto the front of open_curl_handles.
 *
 * The returned node has no libcurl objects attached yet: 'handle' and
 * 'x_httpheader' are NULL and 'in_multi_handle' is false.
 */
static curlhandle_t *create_curlhandle(void) {
    curlhandle_t *old_head = open_curl_handles;
    curlhandle_t *node;

    node = MemoryContextAlloc(TopMemoryContext, sizeof(curlhandle_t));

    /* The allocation is not zeroed, so every field is set explicitly. */
    node->handle = NULL;
    node->x_httpheader = NULL;
    node->in_multi_handle = false;
    node->owner = CurrentResourceOwner;

    /* Insert at the head of the doubly-linked list. */
    node->prev = NULL;
    node->next = old_head;
    if (old_head != NULL)
        old_head->prev = node;
    open_curl_handles = node;

    return node;
}
/*
 * destroy_curlhandle
 *
 * Unlink 'h' from open_curl_handles, release the libcurl objects it holds
 * (the header list and the easy handle, detaching the latter from
 * multi_handle first if it was registered there), and finally pfree 'h'.
 *
 * 'h' must not be used after this call.
 */
static void destroy_curlhandle(curlhandle_t *h) {
    /*
     * Unlink from the linked list first. When 'h' has no predecessor it is
     * the list head, so the new head is simply h->next; using h->next rather
     * than open_curl_handles->next avoids dereferencing the global head.
     */
    if (h->prev)
        h->prev->next = h->next;
    else
        open_curl_handles = h->next;
    if (h->next)
        h->next->prev = h->prev;

    /* Free the HTTP header list, if any. */
    if (h->x_httpheader) {
        curl_slist_free_all(h->x_httpheader);
        h->x_httpheader = NULL;
    }

    if (h->handle) {
        /* If this handle was registered in the multi-handle, remove it */
        if (h->in_multi_handle) {
            CURLMcode e = curl_multi_remove_handle(multi_handle, h->handle);

            /*
             * 'e' is a CURLMcode, so it must be decoded with
             * curl_multi_strerror(); curl_easy_strerror() interprets the
             * value as a CURLcode and would print an unrelated message.
             * Only logged: this runs during cleanup, so we keep going.
             */
            if (CURLM_OK != e)
                elog(LOG, "internal error curl_multi_remove_handle (%d - %s)", e, curl_multi_strerror(e));
            h->in_multi_handle = false;
        }
        curl_easy_cleanup(h->handle); /* cleanup */
        h->handle = NULL;
    }

    pfree(h);
}
ResourceOwner自定义的回调函数用于清理open_curl_handles链表上属于CurrentResourceOwner的libcurl资源,链表操作也很简单,这里就不再赘述。
/*
 * url_curl_abort_callback
 *
 * Resource-release callback: destroy every curlhandle_t on
 * open_curl_handles whose owner is CurrentResourceOwner. Does nothing
 * unless 'phase' is RESOURCE_RELEASE_AFTER_LOCKS.
 *
 * Only the low-level curl objects held in the curlhandle_t struct are
 * released here. The URL_CURL_FILE struct itself is allocated in a memory
 * context and goes away with that context.
 *
 * When called for a commit, a handle that is still open is reported in the
 * log as a reference leak before being destroyed. 'isTopLevel' and 'arg'
 * are not used.
 */
static void url_curl_abort_callback(ResourceReleasePhase phase, bool isCommit, bool isTopLevel, void *arg) {
    curlhandle_t *node;
    curlhandle_t *following;

    if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
        return;

    for (node = open_curl_handles; node != NULL; node = following) {
        /* Remember the successor now: destroy_curlhandle() frees 'node'. */
        following = node->next;

        if (node->owner != CurrentResourceOwner)
            continue;

        if (isCommit)
            elog(LOG, "url_curl reference leak: %p still referenced", node);
        destroy_curlhandle(node);
    }
}
url_curl_fopen函数用于初始化libcurl例程,并建立和服务端的连接。其主要流程如下:只注册一次回调函数url_curl_abort_callback;对url输入参数进行校验和转换【是否是ipv6、是否使用SSL、将gpfdist://替换为http://或将gpfdists://替换为https://(即用'http'覆盖'dist',replace gpfdist:// with http:// or gpfdists:// with https:// by overriding 'dist' with 'http')】;申请URL_CURL_FILE结构体【设置类型为CFTYPE_CURL,确定是否需要写数据(利用POST)、调用create_curlhandle创建curlhandle_t】;调用curl_easy_init函数创建curl句柄,调用CURL_EASY_SETOPT设置属性【设置header数据获取回调函数header_callback、设置data数据获取回调函数write_callback】;设置httpheader【对于写出数据,httpheader需要额外设置X-GP-PROTO为0、X-GP-SEQ为1、Content-Type为text/xml;对于读取数据,httpheader需要额外设置X-GP-PROTO为1和数据格式位置相关参数】;为输入输出数据设置缓冲区;最后检查连接情况【start the fetch if we're SELECTing (GET request), or write an empty message if we're INSERTing (POST request)】。
/* Set to true once url_curl_abort_callback has been registered, so that url_curl_fopen registers it only once. */
static bool url_curl_resowner_callback_registered;
/*
 * url_curl_fopen
 *
 * Open 'url' through libcurl and return the resulting URL_CURL_FILE cast to
 * URL_FILE *.
 *
 * Steps, in order:
 *   1. register url_curl_abort_callback (first call only);
 *   2. validate the URL with make_url() and build the URL handed to libcurl
 *      (gpfdist:// becomes http://, gpfdists:// becomes https://);
 *   3. create the tracked curlhandle_t and the libcurl easy handle, and
 *      install header_callback / write_callback;
 *   4. build the HTTP request headers from 'ev';
 *   5. for gpfdists URLs, configure client certificate, key, CA file and
 *      verification options;
 *   6. allocate the input buffer (and the output buffer when writing);
 *   7. contact the server: start the fetch for a read, or POST an empty
 *      message for a write.
 *
 * Parameters:
 *   url      - the external table location.
 *   forwrite - true when the file is opened for writing (POST), false for
 *              reading (GET).
 *   ev       - values copied into the X-GP-* request headers.
 *   pstate   - copy state; its header_line is reset to 0 for gpfdist and
 *              gpfdists URLs.
 *
 * Failures are reported with elog(ERROR)/ereport(ERROR). The curlhandle_t is
 * registered on open_curl_handles before any libcurl object is created, so
 * url_curl_abort_callback can release them.
 *
 * NOTE(review): numeric libcurl options below are passed as 'long', as the
 * libcurl API requires. This assumes CURL_EASY_SETOPT forwards its value
 * argument unchanged to curl_easy_setopt() - confirm against the macro.
 */
URL_FILE *url_curl_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pstate) {
    int e;

    curl_Error_Buffer[0] = '\0'; /* Reset curl_Error_Buffer */

    /* A static flag ensures url_curl_abort_callback is registered only once. */
    if (!url_curl_resowner_callback_registered) {
        RegisterResourceReleaseCallback(url_curl_abort_callback, NULL);
        url_curl_resowner_callback_registered = true;
    }

    bool is_ipv6 = url_has_ipv6_format(url);
    /* make sure that the URL string contains a numerical IP address */
    int sz = make_url(url, NULL, is_ipv6);
    if (sz < 0)
        elog(ERROR, "illegal URL: %s", url);

    URL_CURL_FILE *file = (URL_CURL_FILE *) palloc0(sizeof(URL_CURL_FILE));
    file->common.type = CFTYPE_CURL;
    file->common.url = pstrdup(url);
    file->for_write = forwrite;
    file->curl = create_curlhandle();

    if (!IS_GPFDISTS_URI(url)) {
        file->curl_url = (char *) palloc0(sz + 1);
        make_url(file->common.url, file->curl_url, is_ipv6);
        /* We need to call is_url_ipv6 for the case where inside make_url function a domain name was transformed to an IPv6 address. */
        if (!is_ipv6)
            is_ipv6 = url_has_ipv6_format(file->curl_url);
    } else {
        /* SSL support addition negotiation will fail if verifyhost is on, so we *must* not resolve the hostname in this case. I have decided to not resolve it anyway and let libcurl do the work. */
        file->curl_url = pstrdup(file->common.url);
        /* The resolved form is only used to decide between IPv4 and IPv6. */
        char *tmp_resolved = palloc0(sz + 1);
        make_url(file->common.url, tmp_resolved, is_ipv6);
        /* keep the same ipv6 logic here */
        if (!is_ipv6)
            is_ipv6 = url_has_ipv6_format(tmp_resolved);
    }

    if (IS_GPFDIST_URI(file->curl_url) || IS_GPFDISTS_URI(file->curl_url)) {
        /*
         * replace gpfdist:// with http:// or gpfdists:// with https:// by
         * overriding 'dist' with 'http': shift the string left by 3 bytes
         * (dropping "gpf", terminator included), then overwrite "dist".
         */
        unsigned int tmp_len = strlen(file->curl_url) + 1;
        memmove(file->curl_url, file->curl_url + 3, tmp_len - 3);
        memcpy(file->curl_url, "http", 4);
        pstate->header_line = 0;
    }

    /* initialize a curl session and get a libcurl handle for it */
    if (!(file->curl->handle = curl_easy_init()))
        elog(ERROR, "internal error: curl_easy_init failed");

    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_URL, file->curl_url);
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_VERBOSE, 0L /* FALSE */);
    /* set callback for each header received from server */
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_HEADERFUNCTION, header_callback);
    /* 'file' is the application variable that gets passed to header_callback */
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_WRITEHEADER, file);
    /* set callback for each data block arriving from server to be written to application */
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_WRITEFUNCTION, write_callback);
    /* 'file' is the application variable that gets passed to write_callback */
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_WRITEDATA, file);

    /* CURLOPT_IPRESOLVE takes a long. */
    long ip_mode = is_ipv6 ? CURL_IPRESOLVE_V6 : CURL_IPRESOLVE_V4;
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_IPRESOLVE, ip_mode);

    if (IS_HTTP_URI(url)) { /* support multihomed http use cases. see MPP-11874 */
        char domain[HOST_NAME_SIZE] = {0};
        extract_http_domain(file->common.url, domain, HOST_NAME_SIZE);
        set_httpheader(file, "Host", domain);
    }

    /* Headers sent for both reads and writes. */
    set_httpheader(file, "X-GP-XID", ev->GP_XID);
    set_httpheader(file, "X-GP-CID", ev->GP_CID);
    set_httpheader(file, "X-GP-SN", ev->GP_SN);
    set_httpheader(file, "X-GP-SEGMENT-ID", ev->GP_SEGMENT_ID);
    set_httpheader(file, "X-GP-SEGMENT-COUNT", ev->GP_SEGMENT_COUNT);
    set_httpheader(file, "X-GP-LINE-DELIM-STR", ev->GP_LINE_DELIM_STR);
    set_httpheader(file, "X-GP-LINE-DELIM-LENGTH", ev->GP_LINE_DELIM_LENGTH);

    if (forwrite) {
        /* TIMEOUT for POST only, GET is single HTTP request, probablity take long time. */
        elog(LOG, "gpfdist_retry_timeout = %d", gpfdist_retry_timeout);
        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_TIMEOUT, (long) gpfdist_retry_timeout);
        file->seq_number = 1; /* init sequence number */
        set_httpheader(file, "X-GP-PROTO", "0"); /* write specific headers */
        set_httpheader(file, "X-GP-SEQ", "1");
        set_httpheader(file, "Content-Type", "text/xml");
    } else {
        /* read specific - (TODO: unclear why some of these are needed) */
        set_httpheader(file, "X-GP-PROTO", "1");
        set_httpheader(file, "X-GP-MASTER_HOST", ev->GP_MASTER_HOST);
        set_httpheader(file, "X-GP-MASTER_PORT", ev->GP_MASTER_PORT);
        set_httpheader(file, "X-GP-CSVOPT", ev->GP_CSVOPT);
        set_httpheader(file, "X-GP_SEG_PG_CONF", ev->GP_SEG_PG_CONF);
        set_httpheader(file, "X-GP_SEG_DATADIR", ev->GP_SEG_DATADIR);
        set_httpheader(file, "X-GP-DATABASE", ev->GP_DATABASE);
        set_httpheader(file, "X-GP-USER", ev->GP_USER);
        set_httpheader(file, "X-GP-SEG-PORT", ev->GP_SEG_PORT);
        set_httpheader(file, "X-GP-SESSION-ID", ev->GP_SESSION_ID);
    }

    {
        /* MPP-13031 copy #transform fragment, if present, into X-GP-TRANSFORM header */
        char *p = local_strstr(file->common.url, "#transform=");
        /* 11 == strlen("#transform="); skip an empty fragment value */
        if (p && p[11])
            set_httpheader(file, "X-GP-TRANSFORM", p + 11);
    }

    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_HTTPHEADER, file->curl->x_httpheader);

    /* The multi handle is shared and created on first use. */
    if (!multi_handle) {
        if (!(multi_handle = curl_multi_init()))
            elog(ERROR, "internal error: curl_multi_init failed");
    }

    /* SSL configuration */
    if (IS_GPFDISTS_URI(url)) {
        Insist(PointerIsValid(DataDir));
        elog(LOG, "trying to load certificates from %s", DataDir);

        /* curl will save its last error in curlErrorBuffer */
        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_ERRORBUFFER, curl_Error_Buffer);
        /* cert is stored PEM coded in file... */
        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLCERTTYPE, "PEM");

        if (extssl_cert != NULL) { /* set the cert for client authentication */
            memset(extssl_cer_full, 0, MAXPGPATH);
            snprintf(extssl_cer_full, MAXPGPATH, "%s/%s", DataDir, extssl_cert);
            if (!is_file_exists(extssl_cer_full))
                ereport(ERROR, (errcode(errcode_for_file_access()), errmsg("could not open certificate file \"%s\": %m", extssl_cer_full)));
            CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLCERT, extssl_cer_full);
        }

        if (extssl_pass != NULL)
            CURL_EASY_SETOPT(file->curl->handle, CURLOPT_KEYPASSWD, extssl_pass); /* set the key passphrase */

        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLKEYTYPE, "PEM");

        if (extssl_key != NULL) { /* set the private key (file or ID in engine) */
            memset(extssl_key_full, 0, MAXPGPATH);
            snprintf(extssl_key_full, MAXPGPATH, "%s/%s", DataDir, extssl_key);
            if (!is_file_exists(extssl_key_full))
                ereport(ERROR, (errcode(errcode_for_file_access()), errmsg("could not open private key file \"%s\": %m", extssl_key_full)));
            CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLKEY, extssl_key_full);
        }

        if (extssl_ca != NULL) { /* set the file with the CA certificates, for validating the server */
            memset(extssl_cas_full, 0, MAXPGPATH);
            snprintf(extssl_cas_full, MAXPGPATH, "%s/%s", DataDir, extssl_ca);
            /* This is the CA bundle, not the private key: say so in the error. */
            if (!is_file_exists(extssl_cas_full))
                ereport(ERROR, (errcode(errcode_for_file_access()), errmsg("could not open CA certificate file \"%s\": %m", extssl_cas_full)));
            CURL_EASY_SETOPT(file->curl->handle, CURLOPT_CAINFO, extssl_cas_full);
        }

        /* set cert verification */
        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSL_VERIFYPEER, (long) (verify_gpfdists_cert ? extssl_verifycert : extssl_no_verifycert));
        /* set host verification */
        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSL_VERIFYHOST, (long) (verify_gpfdists_cert ? extssl_verifyhost : extssl_no_verifyhost));
        /* set protocol */
        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLVERSION, (long) extssl_protocol);
        /* disable session ID cache */
        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSL_SESSIONID_CACHE, 0L);

        /* set debug; a failure here is only reported, and only when debugging was requested */
        if (CURLE_OK != (e = curl_easy_setopt(file->curl->handle, CURLOPT_VERBOSE, (long) extssl_libcurldebug))) {
            if (extssl_libcurldebug) {
                elog(INFO, "internal error: curl_easy_setopt CURLOPT_VERBOSE error (%d - %s)", e, curl_easy_strerror(e));
            }
        }
    }

    /* Allocate input and output buffers. */
    file->in.ptr = palloc(1024); /* 1 kB buffer initially */
    file->in.max = 1024;
    file->in.bot = file->in.top = 0;
    if (forwrite) {
        /* writable_external_table_bufsize is multiplied by 1024 to get bytes */
        int bufsize = writable_external_table_bufsize * 1024;
        file->out.ptr = (char *) palloc(bufsize);
        file->out.max = bufsize;
        file->out.bot = file->out.top = 0;
    }

    /* lets check our connection. start the fetch if we're SELECTing (GET request), or write an empty message if we're INSERTing (POST request) */
    if (!forwrite) {
        gp_perform_backoff_and_check_response(file, multi_perform_work);
    } else {
        /* use empty message */
        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDS, "");
        CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDSIZE, 0L);
        /* post away and check response, retry if failed (timeout or connect error) */
        gp_perform_backoff_and_check_response(file, easy_perform_work);
        file->seq_number++;
    }

    return (URL_FILE *) file;
}
当数据包从服务端返回时,header_callback函数会将header从数据包中提取出来设置到URL_CURL_FILE的http_response中。当数据包从服务端返回时,write_callback函数会将data从数据包中提取出来设置到URL_CURL_FILE的in缓冲区中。
读取PROTO 0协议时,仅期望数据包中为数据本身,而不包含任何类型的元数据。对于PROTO 1协议,每个数据块都带有如下元数据标记:byte 0: type (can be 'F'ilename, 'O'ffset, 'D'ata, 'E'rror, 'L'inenumber)、byte 1-4: length. # bytes of following data block. in network-order、byte 5-X: the block itself。
/*
 * gp_proto0_read
 *
 * Read data from the server according to PROTO 0, in which the stream is the
 * plain file content without any kind of meta info.
 *
 * Copies at most 'bufsz' bytes from the input buffer of 'file' into 'buf',
 * advances the buffer's read position and returns the number of bytes
 * copied. Sets file->eof when nothing is buffered and the transfer is no
 * longer running.
 */
static size_t gp_proto0_read(char *buf, int bufsz, URL_CURL_FILE *file) {
    int available;
    int to_copy;

    fill_buffer(file, bufsz);

    /*
     * If the buffer is empty, fill_buffer() either errored or hit EOF. For
     * proto0, we cannot distinguish between error and EOF.
     */
    available = file->in.top - file->in.bot;
    if (available == 0 && !file->still_running)
        file->eof = 1;

    /* Hand over no more than the caller asked for. */
    to_copy = (available > bufsz) ? bufsz : available;
    memcpy(buf, file->in.ptr + file->in.bot, to_copy);
    file->in.bot += to_copy;

    return to_copy;
}
/*
 * gp_proto1_read
 *
 * Get data from the server and handle it according to PROTO 1. In this protocol each data block is tagged by meta info like this:
 * byte 0: type (can be 'F'ilename, 'O'ffset, 'D'ata, 'E'rror, 'L'inenumber)
 * byte 1-4: length. # bytes of following data block. in network-order.
 * byte 5-X: the block itself.
 *
 * Meta blocks are consumed until a 'D'ata block header is seen (or a data
 * block is already in progress, i.e. file->block.datalen != 0). Then at most
 * bufsz bytes of that data block are copied into buf, the input buffer
 * position and file->block.datalen are advanced, and the number of bytes
 * copied is returned.
 *
 * Returns 0 without consuming anything when a 'F'ilename block is seen while
 * buf != buf2 (the 5 header bytes are pushed back).
 *
 * Protocol violations, a closed connection, and 'E'rror blocks are reported
 * with ereport(ERROR)/elog(ERROR).
 */
static size_t gp_proto1_read(char *buf, int bufsz, URL_CURL_FILE *file, CopyState pstate, char *buf2) {
char type;
int n, len;
/* Loop through and get all types of messages, until we get actual data, or until there's no more data. Then quit the loop to process it and return it. */
while (file->block.datalen == 0 && !file->eof){
/* need 5 bytes, 1 byte type + 4 bytes length */
fill_buffer(file, 5);
n = file->in.top - file->in.bot;
if (n == 0) ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE),errmsg("gpfdist error: server closed connection")));
if (n < 5) ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE),errmsg("gpfdist error: incomplete packet - packet len %d", n)));
type = file->in.ptr[file->in.bot++]; /* read type */
/* memcpy rather than a pointer cast: the length field may be unaligned in the buffer */
memcpy(&len, &file->in.ptr[file->in.bot], 4); /* read len */
len = ntohl(len); /* change order */
file->in.bot += 4;
if (len < 0) elog(ERROR, "gpfdist error: bad packet type %d len %d", type, len);
/* Error */
if (type == 'E'){
fill_buffer(file, len);
n = file->in.top - file->in.bot;
if (n > len) n = len;
if (n > 0){
/* cheat a little. swap last char and NUL-terminator. then print string (without last char) and print last char artificially */
char x = file->in.ptr[file->in.bot + n - 1];
file->in.ptr[file->in.bot + n - 1] = 0;
ereport(ERROR,(errcode(ERRCODE_DATA_EXCEPTION), errmsg("gpfdist error - %s%c", &file->in.ptr[file->in.bot], x)));
}
/* no message text was available; report a generic error instead */
elog(ERROR, "gpfdist error: please check gpfdist log messages.");
/* NOTE(review): presumably unreachable because elog(ERROR) does not return - confirm */
return -1;
}
/* Filename */
if (type == 'F'){
/* NOTE(review): presumably buf2 is the start of the caller's buffer, so buf != buf2 means data was already handed over in this call - confirm against caller */
if (buf != buf2){
/* push the 5 header bytes (type + length) back so the block is re-read on the next call */
file->in.bot -= 5;
return 0;
}
/* 256 is the most that fits in fname[257] below, leaving room for the terminator */
if (len > 256) elog(ERROR, "gpfdist error: filename too long (%d)", len);
if (-1 == fill_buffer(file, len)) elog(ERROR, "gpfdist error: stream ends suddenly");
/* If SREH is used we now update it with the actual file that the gpfdist server is reading. This is because SREH (or the client in general) doesn't know which file gpfdist is reading, since the original URL may include a wildcard or a directory listing. */
if (pstate->cdbsreh){
char fname[257];
memcpy(fname, file->in.ptr + file->in.bot, len);
fname[len] = 0;
snprintf(pstate->cdbsreh->filename, sizeof pstate->cdbsreh->filename, "%s [%s]", file->common.url, fname);
}
file->in.bot += len;
Assert(file->in.bot <= file->in.top);
continue;
}
/* Offset */
if (type == 'O'){
if (len != 8) elog(ERROR, "gpfdist error: offset not of length 8 (%d)", len);
if (-1 == fill_buffer(file, len)) elog(ERROR, "gpfdist error: stream ends suddenly");
/* the offset value itself is skipped, not used */
file->in.bot += 8;
Assert(file->in.bot <= file->in.top);
continue;
}
/* Line number */
if (type == 'L'){
int64 line_number;
if (len != 8) elog(ERROR, "gpfdist error: line number not of length 8 (%d)", len);
if (-1 == fill_buffer(file, len)) elog(ERROR, "gpfdist error: stream ends suddenly");
/* update the line number of the first line we're about to get from gpfdist. pstate will update the following lines when processing the data */
memcpy(&line_number, file->in.ptr + file->in.bot, len);
line_number = local_ntohll(line_number);
/* NOTE(review): a received line number of 0 is stored as INT64_MIN; presumably a "line number unknown" marker - confirm */
pstate->cur_lineno = line_number ? line_number : INT64_MIN;
file->in.bot += 8;
Assert(file->in.bot <= file->in.top);
continue;
}
/* Data */
if (type == 'D'){
file->block.datalen = len;
/* a zero-length data block marks end of data */
file->eof = (len == 0);
break;
}
elog(ERROR, "gpfdist error: unknown meta type %d", type);
}
/* read data block */
/* never read past the end of the current data block */
if (bufsz > file->block.datalen) bufsz = file->block.datalen;
fill_buffer(file, bufsz);
n = file->in.top - file->in.bot;
/* if gpfdist closed connection prematurely or died catch it here */
if (n == 0 && !file->eof){
file->error = 1;
/* the error is raised only when the transfer is no longer running; otherwise fall through and return 0 */
if (!file->still_running)
ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE),errmsg("gpfdist server closed connection"), errhint("The root cause is likely to be an overload of the ETL host or a temporary network glitch between the database and the ETL host causing the connection between the gpfdist and database to disconnect.")));
}
if (n > bufsz) n = bufsz;
memcpy(buf, file->in.ptr + file->in.bot, n);
file->in.bot += n;
file->block.datalen -= n;
return n;
}
gp_proto0_write使用curl向服务端写数据,使用POST请求实现push model。gp_proto0_write_done发送空POST请求,添加X-GP-DONE header。
/*
 * gp_proto0_write
 *
 * Use curl to write the contents of the output buffer of 'file' to the
 * remote gpfdist server. We use a push model with a POST request.
 *
 * Does nothing when the output buffer is empty. Otherwise the buffered bytes
 * are posted with the current sequence number in the X-GP-SEQ header, and
 * the sequence number is incremented afterwards.
 *
 * 'pstate' is not used.
 */
static void gp_proto0_write(URL_CURL_FILE *file, CopyState pstate){
    char *buf = file->out.ptr;  /* data is sent from the output buffer */
    int nbytes = file->out.top;

    if (nbytes == 0)
        return;

    /* post binary data */
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDS, buf);
    /*
     * set the size of the postfields data. libcurl reads this option as a
     * 'long', so the int must be converted explicitly.
     * NOTE(review): assumes CURL_EASY_SETOPT forwards the value unchanged to
     * curl_easy_setopt() - confirm against the macro.
     */
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDSIZE, (long) nbytes);

    /* set sequence number */
    char seq[128] = {0};
    snprintf(seq, sizeof(seq), INT64_FORMAT, file->seq_number);
    replace_httpheader(file, "X-GP-SEQ", seq);

    gp_perform_backoff_and_check_response(file, easy_perform_work);
    file->seq_number++;
}
/*
 * gp_proto0_write_done
 *
 * Send an empty POST request, with an added X-GP-DONE header.
 */
static void gp_proto0_write_done(URL_CURL_FILE *file){
    set_httpheader(file, "X-GP-DONE", "1");

    /*
     * use empty message. The size is passed as 0L because libcurl reads
     * CURLOPT_POSTFIELDSIZE as a 'long'.
     * NOTE(review): assumes CURL_EASY_SETOPT forwards the value unchanged to
     * curl_easy_setopt() - confirm against the macro.
     */
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDS, "");
    CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDSIZE, 0L);

    /* post away! */
    gp_perform_backoff_and_check_response(file, easy_perform_work);
}
