• GreenPlum的gpfdist使用与原理流程分析


    一、简介

    GreenPlum 的数据导入功能作为对数据源的一种扩充,数据导入的方式有:

    • 1、insert 该方式通过 sql 语句,把数据一条一条插入至表中。这种方式,不仅读取数据慢(一条一条读取),且数据需要经过 master 节点后再分发给所有 segment,所以 master 制约着导入性能。
    • 2、copy 该方式实现了数据的批量读取,但数据依然需要通过 master 节点,所以 master 制约着导入性能,无法实现并行、高效的数据加载。
    • 3、gpfdist 该方式使用 gpfdist 协议,segment 与 数据源直连,数据读取后直接发送给每个 segment。这种方式,数据不再通过 master,真正实现了数据加载的并行、高效。

    gpfdist 是 Greenplum 数据库并行文件分发程序。
    它可以被外部表和 gpload 用来并行地将外部表文件提供给所有的 Greenplum 数据库 Segment。
    它也可以被可写外部表使用,并行接受来自 Greenplum 数据库 Segment 的输出流,并将它们写出到文件中。
    总的来说,可以并行读文件数据,通过 segment 将数据读取至 master 中, 可以并行写文件数据,通过 segment 将数据写入文件中。 gpfdist 本身是单进程单线程程序,所以如果需要实现服务端的并行,需要启动多个 gpfdist 服务。

    二、架构部署


    在这里插入图片描述

    三、配置与使用

    命令格式:

    gpfdist [-d ] [-p ] [-l ] [-t ] 
    [-S] [-w 
    • 1
    • 2
    • 3
    • 4

    例如:gpfdist -p 9000

    参数:

    -d     可以指定工作目录,如果没指定,则为当前目录
    -l      指定 log 文件,如果没指定,则直接输出到屏幕中
    -p     指定服务端口,默认是 8080
    -m    指定最大一行数据的大小,单位是 byte,默认是 32768, 即 32K,可配范围是 32K ~ 256M
    -S    写入文件的时候,同步等待数据写入至存盘后再返回
    -v                      显示详细信息
    -V                显示更详细信息,当使用这个 V 时,上面的 v 也会被显示出来
    -s                    不显示头信息   (这个在 --help 中没有显示,但代码里面是支持的,可以用)
    -c                指定一个配置文件,用来执行数据转换的 (这个在 --help 中没有显示,但代码是支持的)
    --ssl    指定 ssl 加密
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    使用示例

    示例中主要使用的是 gpfdist 协议,使用的文件格式主要是 csv。

    1、创建只读外部表基本语法

    只读外部表

    CREATE [READABLE] EXTERNAL [TEMPORARY | TEMP] TABLE table_name     
        ( column_name data_type [, ...] | LIKE other_table )
         LOCATION ('file://seghost[:port]/path/file' [, ...])
           | ('gpfdist://filehost[:port]/file_pattern[#transform=trans_name]'
               [, ...]
           | ('gpfdists://filehost[:port]/file_pattern[#transform=trans_name]'
               [, ...])
           | ('pxf://path-to-data?PROFILE=profile_name[&SERVER=server_name][&custom-option=value[...]]'))
           | ('s3://S3_endpoint[:port]/bucket_name/[S3_prefix] [region=S3-region] [config=config_file]')
         [ON MASTER]
         FORMAT 'TEXT' 
               [( [HEADER]
                  [DELIMITER [AS] 'delimiter' | 'OFF']
                  [NULL [AS] 'null string']
                  [ESCAPE [AS] 'escape' | 'OFF']
                  [NEWLINE [ AS ] 'LF' | 'CR' | 'CRLF']
                  [FILL MISSING FIELDS] )]
              | 'CSV'
               [( [HEADER]
                  [QUOTE [AS] 'quote'] 
                  [DELIMITER [AS] 'delimiter']
                  [NULL [AS] 'null string']
                  [FORCE NOT NULL column [, ...]]
                  [ESCAPE [AS] 'escape']
                  [NEWLINE [ AS ] 'LF' | 'CR' | 'CRLF']
                  [FILL MISSING FIELDS] )]
              | 'CUSTOM' (Formatter=<formatter_specifications>)
        [ ENCODING 'encoding' ]
          [ [LOG ERRORS [PERSISTENTLY]] SEGMENT REJECT LIMIT count
          [ROWS | PERCENT] ]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    1.1、 使用csv 文件创建只读外部表示例

    只读外部表

    指定了csv格式的时候,默认的 分隔符(DELIMITER)为 ‘,’ ,默认的引号值(QUOTE)为 ‘"’,默认的换行符(NEWLINE)为\n。

    create external table ext (id int, name char(20)) 
        location ('gpfdist://gp_init:9000/data.csv') 
        format 'csv';
    
       insert into a select * from ext;
    
    create external table ext2 (id int, name char(20)) 
        location ('gpfdist://gp_init:9000/data.csv', 
                  'gpfdist://gp_init:9000/data2.csv') 
        format 'csv';
    
    create external table ext6 (id int, name char(20)) 
        location ('gpfdist://gp_init:9000/data*.csv') 
        format 'csv';
    
    create external table ext7 (id int, name char(20)) 
        location ('gpfdist://gp_init:9000/data.csv', 
                  'gpfdist://gp_init:9001/data2.csv') 
        format 'csv';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    查看数据文件与外部表

    数据文件的内容

    [root@gp_init gpfdist]# cat data.csv 
    0, asdfghjkl;
    1, asdfghjkl;
    [root@gp_init gpfdist]# cat data2.csv 
    0, asdfghjkl;
    1, asdfghjkl;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    查询外部表
    postgres=# select * from ext;
     id |         name         
    ----+----------------------
      0 |  asdfghjkl;         
      1 |  asdfghjkl;         
    (2 rows)
    
    postgres=# select * from ext2;
     id |         name         
    ----+----------------------
      0 |  asdfghjkl;         
      1 |  asdfghjkl;         
      0 |  asdfghjkl;         
      1 |  asdfghjkl;         
    (4 rows)
    
    postgres=# select * from ext6;
     id |         name         
    ----+----------------------
      0 |  asdfghjkl;         
      1 |  asdfghjkl;         
      0 |  asdfghjkl;         
      1 |  asdfghjkl;         
    (4 rows)
    
    postgres=# select * from ext7;
     id |         name         
    ----+----------------------
      0 |  asdfghjkl;         
      1 |  asdfghjkl;         
      0 |  asdfghjkl;         
      1 |  asdfghjkl;         
    (4 rows)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    1.2、使用 pipe管道创建只读外部表示例

    只读外部表
    gpfdist 支持从管道中读取数据流。

    创建一个管道
    [root@gp_init gpfdist]# mkfifo gpfdist_pipe
    
    • 1
    往管道里填写数据
    cat data.csv  >  gpfdist_pipe
    
    • 1
    创建外部表
    create external table ext_pipe(id int, name varchar(30))
    location ('gpfdist://gp_init:9000/gpfdist_pipe')
    format 'csv';
    
    • 1
    • 2
    • 3
    查询外部表
    postgres=# select * from ext_pipe ;
     id |    name     
    ----+-------------
      0 |  asdfghjkl;
      1 |  asdfghjkl;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、创建可写外部表

    2.1、使用csv 文件创建可写外部表示例

    目前可写的外部表只支持 gpfdist 协议。

    CREATE WRITABLE EXTERNAL [TEMPORARY | TEMP] TABLE table_name
        ( column_name data_type [, ...] | LIKE other_table )
         LOCATION('gpfdist://outputhost[:port]/filename[#transform=trans_name]'
              [, ...])
          | ('gpfdists://outputhost[:port]/file_pattern[#transform=trans_name]'
              [, ...])
          FORMAT 'TEXT' 
                   [( [DELIMITER [AS] 'delimiter']
                   [NULL [AS] 'null string']
                   [ESCAPE [AS] 'escape' | 'OFF'] )]
              | 'CSV'
                   [([QUOTE [AS] 'quote'] 
                   [DELIMITER [AS] 'delimiter']
                   [NULL [AS] 'null string']
                   [FORCE QUOTE column [, ...]] | * ]
                   [ESCAPE [AS] 'escape'] )]
    
               | 'CUSTOM' (Formatter=<formatter specifications>)
        [ ENCODING 'write_encoding' ]
        [ DISTRIBUTED BY ({column [opclass]}, [ ... ] ) | DISTRIBUTED RANDOMLY ]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    创建可写外部表
    create writable external table extw (id int, name char(20)) 
        location ('gpfdist://gp_init:9000/data.csv')
        format 'csv';
    
    create writable external table extw2 (id int, name char(20)) 
        location ('gpfdist://gp_init:9000/data.csv', 
                  'gpfdist://gp_init:9000/data2.csv') 
        format 'csv';
    
     create writable external table extw3 (id int, name char(20)) 
        location ('gpfdist://gp_init:9000/data.csv', 
                  'gpfdist://gp_init:9001/data2.csv') 
        format 'csv';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    插入数据
    postgres=# create table abc(id int, name char(20));
    postgres=# insert INTO abc select * from ext;
    postgres=# select count(*) from abc;
     count  
    --------
     100000
    (1 row)
    
    -- 写入文件
    -- 基本上是会把数据平局分给各个文件里。
    postgres=# insert INTO extw select * from abc ;
    INSERT 0 100000
    [root@gp_init gpfdist]# wc -l data.csv 
    100000 data.csv
    
    
    postgres=# insert INTO extw2 select * from abc ;
    INSERT 0 100000
    [root@gp_init gpfdist]# wc -l data.csv  data2.csv 
      50171 data.csv
      49829 data2.csv
     100000 total
    
    
    postgres=# insert INTO extw3 select * from abc ;
    INSERT 0 100000
    [root@gp_init gpfdist]# wc -l data.csv  data2.csv 
      50135 data.csv
      49865 data2.csv
     100000 total
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2.2、使用 pipe管道创建可写外部表示例

    创建可写外部表
    create writable external table extw4 (id int, name char(20)) 
        location ('gpfdist://gp_init:9000/gpfdist_pipe') 
        format 'csv';
    
    • 1
    • 2
    • 3
    -- 执行写入操作
    postgres=# insert INTO extw4 select * from abc;
    [root@gp_init opt]# bash read.sh gpfdist_pipe 
    100000
    
    • 1
    • 2
    • 3
    • 4

    主要函数代码解析

    1、main

    • gpfdist_init :所有的处理都是在 gpfdist_init 函数中。
    • gpfdist_run:只有一个 event_dispatch。

    2、gpfdist_init

    • 命令行输入参数解析:parse_command_line。
    • 注册信号处理signal_register:对信号的处理。捕获 SIGTERM 信号,设置最高优先级,注册 process_term_signal 回调函数,在回调函数中,关闭所有 socket 后,退出。
    • http服务配置http_setup:
      • 先根据配置的端口,获取地址信息,然后对地址进行端口绑定。这里获取的地址信息包含两个:ipv4 地址 和 ipv6 地址。gpfdist 优先使用ipv6进行绑定,因为IPv6的监听套接字能够同时监听v4客户端和v6客户端。
      • 遍历所有可用的套接字,设置监听事件:EV_READ | EV_PERSIST(持久属性),设置优先级,并绑定回调函数:do_accept。可用套接字个数保存在 gcb.listen_sock_count 中,可用的 socket 保存在 gcb.listen_socks 数组中。
      • do_accept:对请求进行 accept,记录了自己 accept 产生的 socket(r->sock),client 的 port(r->port),并对socket设置一些属性,例如:keepalive、reuseaeddr 等。
      • 并调用 setup_read 设置read事件。
        • setup_read设置可读事件的回调函数:do_read_request,并判断该请求中是否带 timeout 参数:
          • 如果没带 timeout 参数,则是普通请求,不需要设置 timeout。
          • 如果带 timeout 参数,则是超时事件,需要重新设置超时参数。
    • 看门狗初始化:watchdog_thread。

    3、do_read_request
    如果是超时事件,则直接发送http_end,后续会关闭连接。
    读取socket,如果是返回0,发送 http_end。

    • gnet_parse_request :解析出请求的方法,以及后面跟的参数
    • request_validate :检测请求参数是否正确:必须是GET或POST,且使用协议必须是 HTTP/1.xx。
    • percent_encoding_to_char :检测path中的字符是否存在 %XX 的格式,如果有,则转化为 char 类型的字符。
      如果 path 为 “/gpfdist/status”,则直接返回 send_gpfdist_status。(该请求作为 debug 使用)
    • request_set_path: 将 path 相对路径转换为绝对路径。
    • request_parse_gp_headers:解析 header 中的信息,比如 xid / cid / sn / segcount / csvopt 等。
    • request_set_transform : 执行 transform 转换。
    • session_attach:根据文件名和 TID (由 xid/cid/sn 生成) 创建一个会话,对文件的处理都在会话中处理。根据 tid 和 path 组成 key,从 hash 表中获取该 session,如果没有则进行创建。如果 session 中没获取到 fstream,则直接返回错误。
      如果是 GET,则通过 handle_get_request 函数增加写事件监听(被监听的 fd 是在 do_accept 函数中 accept的 fd),如果是 POST,则调用 handle_post_request 函数。
    • do_write:
      • 1、如果上次读取的数据,都已经发送完成了(通过 block_t 里面的 bot == top 来判断),则再通过 session_get_block 读取一个 block。
      • 2、如果 gp_proto 的协议号为 1,则调用 local_send:发送头部数据,即 blockhdr_t 中的 hbyte
      • 3、调用 local_send:再发送真正的数据,即 block_t 中的 data
      • 4、通过 setup_write,再次添加当前 fd 的写事件
    数据读取示意图

    在这里插入图片描述
    学习记录,原文

  • 相关阅读:
    三分类网络的物理意义是什么?
    服务器中间件的种类和日常使用
    黑客(网络安全)技术自学30天
    vue 打包性能优化总结
    LocalDate、LocalTime、LocalDateTime常用方法
    Linux(Centos7)服务器中配置Mysql主从数据库,以及数据库的安装,防火墙操作
    039:vue中数字货币化快速显示
    CSS核心使用
    信息学奥赛一本通:1139:整理药名
    阿里云天池大赛赛题(机器学习)——阿里云安全恶意程序检测(完整代码)
  • 原文地址:https://blog.csdn.net/qq_37432174/article/details/132578849