• Hive、SparkSQL是如何决定写文件的数量的?


    Hive自身和Spark都提供了对Hive的SQL支持,用SQL的交互方式操作Hive底层的HDFS文件,两种方式在写文件的时候有一些区别:

    1. Hive

    1.1 without shuffle

    Hive在通过SQL写文件是通过MapReduce任务完成的,如下面这个例子:

    hive> insert into table temp.czc_hive_test_write values ('col1_value'1),('col1_value'2);
    

    在表中插入数据后,可以hdfs对应路径下找到存储的文件

    1. hadoop fs -ls /user/hive/warehouse/temp.db/czc_hive_test_write/part_date=2018-12-12           
    2. Found 2 items
    3. -rwxrwxrwx   3 hadoop supergroup         26 2019-12-20 15:56 hdfs://sdg/user/hive/warehouse/temp.db/czc_hive_test_write/part_date=2018-12-12/000000_0

    可以看到插入生成了1个文件,这是因为每一条插入语句都会单独启动一个MapReduce任务,一个MapReduce任务对应一个结果文件。

    1.2 with shuffle

    当插入过程有shuffle时:

    1. hive> insert into table temp.czc_hive_game select count(*), game_id from temp.source_table group by game_id;
    2. ...
    3. Hadoop job information for Stage-1: number of mappers: 62; number of reducers: 1
    4. ...

    由Hive实现group by的过程可知,group by的时候会以group by的字段为key进行shuffle,即上例中的game_id字段。从执行日志中可以看到整个任务启用了62个mapper和1个reducer,由于最终写数据的过程是在reducer中完成,所以最终写数据的文件数量也应该只有1个。

    1. $ hadoop fs -ls  /user/hive/warehouse/temp.db/czc_hive_game
    2. Found 1 items
    3. -rwxrwxrwx   3 hadoop supergroup        268 2019-12-20 16:31 /user/hive/warehouse/temp.db/czc_hive_game/000000_0

    注:Hive控制reducer数量的规则如下:

    Hive自己如何确定reduce数:
    reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:
    hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1G)
    hive.exec.reducers.max
    即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;

    Spark SQL

    2.1 without shuffle

    Spark SQL也可以在hive中操作文件,执行命令

    spark.sql("insert into table temp.czc_spark_test_write values ('col1_value', 1),('col1_value', 2)")
    

    Hdfs中文件的存储如下:

    1. $ hadoop fs -ls  /user/hive/warehouse/temp.db/czc_spark_test_write
    2. Found 2 items
    3. -rwxrwxrwx   3 hadoop supergroup         13 2019-12-20 17:01 /user/hive/warehouse/temp.db/czc_spark_test_write/part-00000-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
    4. -rwxrwxrwx   3 hadoop supergroup         13 2019-12-20 17:01 /user/hive/warehouse/temp.db/czc_spark_test_write/part-00001-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
    5. $ hadoop fs -cat /user/hive/warehouse/temp.db/czc_spark_test_write/part-00000-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
    6. col1_value      1
    7. $ hadoop fs -cat /user/hive/warehouse/temp.db/czc_spark_test_write/part-00001-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
    8. col1_value      2

    可以发现即使是同一条语句,spark也会启动两个任务区并行的写文件,最终产生了两个文件结果。

    2.2 with shuffle

    Spark中同样以类似的SQL为例:

    1. scala>  spark.sql("insert into table temp.czc_spark_game select count(*), game_id from temp.source_table  group by game_id");
    2. res1: org.apache.spark.sql.DataFrame = []

    与Hive不同的是,Spark在执行shuffle过程的时候,会为每一个shuffle的key启动一个任务来写数据,上例中的key game_id在源数据source_table的分布情况是共有26个不同的key。

    1. hive> select count(distinct game_id) from temp.source_table;
    2. OK
    3. 26

    因此spark会启动26个任务来写数据,在最终的结果文件中也应该有26个文件:

    1. hadoop fs -ls hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game
    2. Found 26 items
    3. -rwxrwxrwx   3 hadoop supergroup          0 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00000-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    4. -rwxrwxrwx   3 hadoop supergroup          8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00007-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    5. -rwxrwxrwx   3 hadoop supergroup          7 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00010-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    6. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00011-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    7. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00012-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    8. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00032-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    9. -rwxrwxrwx   3 hadoop supergroup         14 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00036-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    10. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00043-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    11. -rwxrwxrwx   3 hadoop supergroup          8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00048-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    12. -rwxrwxrwx   3 hadoop supergroup         24 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00065-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    13. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00066-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    14. -rwxrwxrwx   3 hadoop supergroup         16 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00083-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    15. -rwxrwxrwx   3 hadoop supergroup          8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00086-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    16. -rwxrwxrwx   3 hadoop supergroup         16 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00101-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    17. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00102-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    18. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00105-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    19. -rwxrwxrwx   3 hadoop supergroup         14 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00111-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    20. -rwxrwxrwx   3 hadoop supergroup         12 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00123-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    21. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00124-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    22. -rwxrwxrwx   3 hadoop supergroup          8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00136-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    23. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00162-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    24. -rwxrwxrwx   3 hadoop supergroup          8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00163-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    25. -rwxrwxrwx   3 hadoop supergroup         10 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00165-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    26. -rwxrwxrwx   3 hadoop supergroup          8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00174-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    27. -rwxrwxrwx   3 hadoop supergroup         17 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00176-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
    28. -rwxrwxrwx   3 hadoop supergroup          9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00199-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000

    2.3 解决小文件问题

    由于spark的写文件方式,会导致产生很多小文件,会对NameNode造成压力,读写性能变差,为了解决这种小文件问题,spark新的版本(笔者使用2.4.0.cloudera2版本)中支持了动态规划shuffle过程,需要配置spark.sql.adaptive.enabled属性。

    1. scala> spark.sql("set spark.sql.adaptive.enabled=true")
    2. scala> spark.sql("insert into table temp.czc_spark_game select count(*), game_id from temp.source_table group by game_id")

    在将spark.sql.adaptive.enabled属性设置为true后,spark写文件的结果为

    1. hadoop fs -ls hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game
    2. Found 1 items
    3. -rwxr-xr-x   3 hadoop supergroup        268 2019-12-20 20:55 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00000-a293e3b3-3136-4f57-bf66-f0ee2d4f8dbb-c000

    从结果可以看到只有一个文件,这是由于动态规划的作用,在写文件的时候只启动了一个任务。动态规划的细节请参考Adaptive Execution 让 Spark SQL 更高效更智能。

  • 相关阅读:
    C语言-变量与数据类型
    【多智能体强化学习】
    K8s 集成 Jenkins 部署Go Web项目
    研发主管接私活被辞退,法院判决公司赔偿20.7万元
    QT页面布局方法大全
    企业实践开源的动机
    springboot框架中生成一个md5文件校验类,md5文件校验类必须包括传入的一个key值秘钥,还有上传内容是byte[]类型
    新手如何入门Web3?
    [apue] 文件中的空洞
    docker安装mysql并挂载配置文件和修改密码
  • 原文地址:https://blog.csdn.net/ytp552200ytp/article/details/126364208