• Clickhouse 从S3/Hive导入数据


    背景

            我们的埋点数据上传到S3,大概是每天10亿条的数据量级别。最近花了一些时间思考和学习如何将每天如此大量的数据从S3导入到Clickhouse,为后续的实时查询做准备。

    方案一

            1. 先将S3的数据导入到hive,这一步操作比较简单,创建一个外部表即可,按日期字段进行分区。

    1. CREATE TABLE `s3_to_hive_test`(
    2. id ,
    3. aaa ,
    4. bbb ,
    5. ccc ,
    6. ddd ,
    7. …… )
    8. PARTITIONED BY (
    9. `ingestion_date` string)
    10. ROW FORMAT SERDE
    11. 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    12. STORED AS INPUTFORMAT
    13. 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    14. OUTPUTFORMAT
    15. 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    16. LOCATION
    17. 's3://host/bucket/path'
    18. TBLPROPERTIES (
    19. 'last_modified_by'='hadoop',
    20. 'last_modified_time'='1625729438',
    21. 'parquet.column.index.access'='true',
    22. 'spark.sql.create.version'='2.2 or prior',
    23. 'spark.sql.sources.schema.numPartCols'='1',
    24. 'spark.sql.sources.schema.numParts'='1',
    25. 'spark.sql.sources.schema.part.0'='{……}',
    26. 'spark.sql.sources.schema.partCol.0'='ingestion_date')

            2. 每天定时从hive把前一天的数据导入到Clickhouse,这里可以借助waterdrop(现改名为seatunnel)工具进行导入,定时调度可以自己写一个shell脚本,如果导入后还需要进行数据清洗、聚合等,推荐DolphinScheduler

    1. #waterdrop的config文件
    2. spark {
    3. spark.app.name = "Waterdrop"
    4. spark.executor.cores = 1
    5. spark.executor.memory = "2g" // 这个配置必需填写,否则会使用 sparksql 内置元数据库
    6. spark.default.parallelism = 12
    7. spark.driver.memory = "64g"
    8. spark.sql.catalogImplementation = "hive"
    9. }
    10. input {
    11. hive {
    12. pre_sql = "select `id`,`aaa`,`bbb`,`ccc`,`ddd`,…… from default.s3_to_hive_test where ……"
    13. table_name = "waterdrop_tmp"
    14. }
    15. }
    16. filter {
    17. }
    18. output {
    19. clickhouse {
    20. host = "ch_host:ch_port"
    21. database = "default"
    22. username = "***"
    23. password = "******"
    24. table = "ch_mergetree_test"
    25. bulk_size = 200000
    26. retry = 3
    27. fields =
    28. [ id ,
    29. aaa ,
    30. bbb ,
    31. ccc ,
    32. ddd ,
    33. …… ]
    34. }
    35. }

            3. 导入后的清洗、聚合等操作。

    适用场景:

            绝大多数的场景都适用,之前我们项目也是采用此方案。目前由于引入K8S进行管理,hive、Spark、waterdrop、DolphinScheduler分属不同容器,要处理太多网络通信的问题,故改用更为简单的方案三。

    方案二

            1. 如方案一,先将S3的数据以外部表的方式导入到hive中并按日期分区。

    1. CREATE TABLE `s3_to_hive_test`(
    2. id ,
    3. aaa ,
    4. bbb ,
    5. ccc ,
    6. ddd ,
    7. …… )
    8. PARTITIONED BY (
    9. `ingestion_date` string)
    10. ROW FORMAT SERDE
    11. 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    12. STORED AS INPUTFORMAT
    13. 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    14. OUTPUTFORMAT
    15. 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    16. LOCATION
    17. 's3://host/bucket/path'
    18. TBLPROPERTIES (
    19. 'last_modified_by'='hadoop',
    20. 'last_modified_time'='1625729438',
    21. 'parquet.column.index.access'='true',
    22. 'spark.sql.create.version'='2.2 or prior',
    23. 'spark.sql.sources.schema.numPartCols'='1',
    24. 'spark.sql.sources.schema.numParts'='1',
    25. 'spark.sql.sources.schema.part.0'='{……}',
    26. 'spark.sql.sources.schema.partCol.0'='ingestion_date')

            2. Clickhouse建立hive集成表

    1. CREATE TABLE IF NOT EXISTS hive_to_ch_test
    2. (
    3. `id` String NULL,
    4. `aaa` String NULL,
    5. `bbb` String NULL,
    6. `ccc` String NULL,
    7. `ddd` String NULL,
    8. ……
    9. ) ENGINE = Hive('thrift://host:port', 'database', 'table');
    10. PARTITION BY ingestion_date

              3. Clickhouse内部将hive集成表的数据导入到MergeTree表

    1. insert into ch_mergetree_test(
    2. id ,
    3. aaa ,
    4. bbb ,
    5. ccc ,
    6. ddd ,
    7. …… )
    8. SELECT
    9. id,
    10. ifNull(aaa, ''),
    11. ifNull(bbb, ''),
    12. ifNull(ccc, ''),
    13. ifNull(ddd, ''),
    14. ……
    15. FROM hive_to_ch_test
    16. WHERE ……

            4. 后续的数据清洗、聚合等操作

            在实测中,Clickhouse到22.4版本为止似乎还不支持hive集成表的底层存储为S3这种形式。具体表现为能建立hive集成表,但查询的时候报以下错误:

    1. Query id: bfeb2774-eb2b-4b2c-9230-64bd6d35acfe
    2. 0 rows in set. Elapsed: 0.013 sec.
    3. Received exception from server (version 22.4.5):
    4. Code: 210. DB::Exception: Received from localhost:9000. DB::Exception: Unable to connect to HDFS: InvalidParameter: Cannot parse URI: hdfs://****, missing port or invalid HA configuration Caused by: HdfsConfigNotFound: Config key: dfs.ha.namenodes.**** not found. (NETWORK_ERROR)

    但支持底层为HDFS,如下图所示。(注意:如HDFS是HA模式,需要参考官方文档进行一些额外的配置,否则也会报以上错误) 适用场景:

            Clickhouse版本必须高于22.1,因为hive集成表引擎是在22.1版本才发布的。详见Clickhouse ChangeLog

    方案三

     大道至简,直接省略S3导入到Hive这个步骤。

            1. 建立S3集成表

    1. DROP TABLE IF EXISTS s3_to_ch_test ;
    2. CREATE TABLE s3_to_ch_test (
    3. `id` String NULL,
    4. `aaa` String NULL,
    5. `bbb` String NULL,
    6. `ccc` String NULL,
    7. `ddd` String NULL,
    8. …… )
    9. ENGINE=S3(concat('https://s3_host/bucket/path/*' ),'accessKey','secretKey', 'Parquet')
    10. SETTINGS input_format_parquet_allow_missing_columns=true ;

            2. Clickhouse内部将数据从S3集成表导入MergeTree表

    1. insert into ch_mergetree_test(
    2. id ,
    3. aaa ,
    4. bbb ,
    5. ccc ,
    6. ddd ,
    7. …… )
    8. SELECT
    9. id,
    10. ifNull(aaa, ''),
    11. ifNull(bbb, ''),
    12. ifNull(ccc, ''),
    13. ifNull(ddd, ''),
    14. ……
    15. FROM s3_to_ch_test
    16. WHERE ……

            3. 后续的数据清洗、集成等操作

            当然,在具体实施过程,还是会遇到一些坑。

    坑一、超出内存限制

            由于我们在S3中存储的文件格式是Parquet类型,Parquet是面向分析型业务的列式存储格式,Clickhouse在处理Parquet文件是内存密集型的。我尝试将output_format_parquet_row_group_size 参数调小,但没有任何作用,仍报上述异常。猜测Clickhouse在查询数据时并不是按文件一个个读取并插入的,而是将所有文件的列为单位装到内存中。因为我单个文件最大也才10G,分配了50G内存仍超出内存。

    解决方案:调大内存限制的值。

            方式1. 临时设置(仅对当前session有效):

    1. set max_memory_usage=150000000000;
    2. select * from system.settings where name='max_memory_usage';

            方式2. 修改/etc/clickhouse-server/users.xml文件(长期有效)

    <max_memory_usage>150000000000</max_memory_usage>

    坑二、效率问题

    解决方案1:加大内存

            内存100G的情况下,实测10亿条数据导入到MergeTree表消耗时间4300+ Sec,加到150G消耗时间3800+ Sec。

    解决方案2:增加线程数

            当内存150G,max_insert_threads=1时,耗时3837s

             当内存150G,max_insert_threads=2时,耗时1939s

            当内存150G,max_insert_threads=3时,耗时1407s

            当内存150G,max_insert_threads=4时,超出内存限制!!

            虽然增加线程数对导入效率会有明显提升,但不意味着线程数越多越好,因为高线程数是以高内存为代价的,需要根据服务器内存和导入数据量的情况,平衡好max_memory_usage和max_insert_threads的关系。 

  • 相关阅读:
    深入理解独占锁ReentrantLock类锁
    01 `Linux`基础
    处理数据中不必要的部分,并将处理过的数据写入新的文件中
    用 Jmeter 工具做个小型压力测试
    使用舵机和超声波模块实现小车自动避障
    双向链表的实现
    Invalid prop: custom validator check failed for prop “percentage
    油气管道系统安全状态监测技术研究进展
    Spring的copy属性
    无涯教程-JavaScript - ADDRESS函数
  • 原文地址:https://blog.csdn.net/m0_37795099/article/details/125382402