• Spark读取Hive表后会有多少个Task?


    Hive 作为大数据中数仓的重要框架,从速度贼慢的MR引擎,再到Tez,到如今的Spark,速度一直在提升。虽然一条Hive SQL会转换成Spark的几个job,以及会生成多少Stage,我们还不好判断,「但是Spark如何读取Hive表后会有多少个Task呢?」

    我们知道「Spark的Task数由partitions决定」,那么又如何决定呢?

    1. Hive在读取不可切片文件的时候只能由单个节点来读入所有数据,即使自己手动设置分区都不行

    2. 如果Hive表的每个分区的文件都是几M的可切片的小文件,那么spark在读取的时候,每个Task只处理这么小的文件不仅浪费资源还浪费时间,如何优化?

    那我们从spark读取文件的源码开始分析:

    1. //简单写个读取文件的语句
    2. val wordsRDD[String] = sc.textFile("xxxx"3)

    我们从textfile()方法进入,

    1.   def textFile(
    2.       path: String,
    3.       //注意看这里的最小分区,如果textfile()方法中传了分区参数的话就会以传入的为准,否则就会使用默认值
    4.       //def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
    5.       //我们通过源码发现,defaultMinPartitions 是默认并行度和2的最小值,而默认并行度=自己的cpu核数,所以分区最小值一般等于2
    6.       minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    7.     assertNotStopped()
    8.     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    9.     //把上面的minPartitions传到这个hadoopFile()方法中。
    10.       minPartitions).map(pair => pair._2.toString).setName(path)
    11.   }

    于是我们从hadoopFile()进入

    发现读取文件将调用的是HadoopRDD,而这里的inputFormatClass是Hive创建时指定的,默认不指定为 org.apache.hadoop.mapred.TextInputFormat,同时注意这里的参数minPartitions,它是我们刚刚上面方法传过来的值。这次,继续从HadoopRDD进入,然后检索minPartitions,看一下这个参数被哪个方法使用了。经过检索,发现了以下的方法

    1. //getSplits()获取切片数
    2. val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)

    那么继续从getSplits方法进入,然后通过Ctrl+h找到实现类,我们这里选择FileInputFormat然后继续检索getSplits,然后找到了下面的这个方法我们来看看它的源码:

    1.   public InputSplit[] getSplits(JobConf job, int numSplits)
    2.     throws IOException {
    3.     Stopwatch sw = new Stopwatch().start();
    4.     FileStatus[] files = listStatus(job);
    5.     
    6.     // Save the number of input files for metrics/loadgen
    7.     job.setLong(NUM_INPUT_FILES, files.length);
    8.     long totalSize = 0;                           // compute total size
    9.     for (FileStatus file: files) {                // check we have valid files
    10.       if (file.isDirectory()) {
    11.         throw new IOException("Not a file: "+ file.getPath());
    12.       }
    13.       totalSize += file.getLen();
    14.     }
    15. //注意看这里,文件的总大小,直接除以之前获取到切片数,为2
    16.     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    17.     long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    18.       FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
    19.       //而上面的minSplitSize通过看源码发现private long minSplitSize = 1;
    20. //从这里得到的minSize 也等于1
    21.     // generate splits
    22.     ArrayList splits = new ArrayList(numSplits);
    23.     NetworkTopology clusterMap = new NetworkTopology();
    24.      files是上面扫描的分区目录下的part-*****文件
    25.     for (FileStatus file: files) {
    26.       Path path = file.getPath();
    27.       long length = file.getLen();
    28.       if (length != 0) {
    29.         FileSystem fs = path.getFileSystem(job);
    30.         BlockLocation[] blkLocations;
    31.         if (file instanceof LocatedFileStatus) {
    32.           blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    33.         } else {
    34.           blkLocations = fs.getFileBlockLocations(file, 0, length);
    35.         }
    36.         //判断文件是否可切割
    37.         if (isSplitable(fs, path)) {
    38.         // 这里获取的不是文件本身的大小,它的大小从上面的length就可以知道,这里获取的是HDFS文件块(跟文件本身没有关系)的大小
    39.       // HDFS文件块的大小由两个参数决定,分别是 dfs.block.size 和 fs.local.block.size
    40.       // 在HDFS集群模式下,由 dfs.block.size 决定,对于Hadoop2.0来说,默认值是128MB
    41.       // 在HDFS的local模式下,由 fs.local.block.size 决定,默认值是32MB
    42.           long blockSize = file.getBlockSize();// 128MB
    43.           // 这里计算splitSize,goalSize是textfile()方法中指定路径下的文件总大小,minSize为1
    44.           long splitSize = computeSplitSize(goalSize, minSize, blockSize);
    45. //而这里computeSplitSize = Math.max(minSize, Math.min(goalSize, blockSize))
    46. //所以如果文件大小>128M,那么splitSize 就等于128M,否则就等于文件大小
    47.           long bytesRemaining = length;
    48.           // 如果文件大小大于splitSize,就按照splitSize对它进行分块
    49.       // 由此可以看出,这里是为了并行化更好,所以按照splitSize会对文件分的更细,因而split会更多
    50.       //SPLIT_SLOP 为1.1,也就是说,如果文件大小是切片大小的1.1倍以下时,也会分到一个切片,而不会分为2个
    51.           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    52.             String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
    53.                 length-bytesRemaining, splitSize, clusterMap);
    54.             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
    55.                 splitHosts[0], splitHosts[1]));
    56.             bytesRemaining -= splitSize;
    57.           }
    58. //而当切到最后一个切片<1.1倍时,就会再追加一个切片。
    59. //举个例子,假如文件大小为160M,因为160/128>1.1,所以切了一个之后,还剩32M
    60. //32M/128<1.1,但是32M != 0 ,所以就会为这32M生成一个切片。
    61.           if (bytesRemaining != 0) {
    62.             String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
    63.                 - bytesRemaining, bytesRemaining, clusterMap);
    64.             splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
    65.                 splitHosts[0], splitHosts[1]));
    66.           }
    67.         } else {
    68.         //这里指的是文件不可分割
    69.           String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
    70.           //在这里就makeSplit = new FileSplit(file, start, length, hosts);
    71.           //所以就是1个分区直接读取,所以假如这个文件的大小是500G不可分割的文件,
    72.           //那么只能是一个节点去读,只能用Spark的一个Task,容易数据倾斜。
    73.           splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
    74.         }
    75.       } else { 
    76.         //Create empty hosts array for zero length files
    77.         splits.add(makeSplit(path, 0, length, new String[0]));
    78.       }
    79.     }
    80.     sw.stop();
    81.     if (LOG.isDebugEnabled()) {
    82.       LOG.debug("Total # of splits generated by getSplits: " + splits.size()
    83.           + ", TimeTaken: " + sw.elapsedMillis());
    84.     }
    85.     return splits.toArray(new FileSplit[splits.size()]);
    86.   }

    所以如果Hive表分区下如果有200个小文件,大小假如为5M,那么每个小文件就是一个split了,从而对应了一个Spark的partition,所以当有多个小文件分区时,Spark Task的数量也将直线上升,而自指定的partition数对小文件来说并不能解决问题,所以无法改变读取Hive表Task数多的情况。

    而如果是大小是500G(远大于128M块大小的不可分割文件)不可分割的文件,那么只能是一个节点去读,只能用Spark的一个Task,容易数据倾斜。

    那么再回到刚开始的问题?这两种场景如何优化:

    1. 根据实际的业务场景,公司的数据量比较大,每天会有若干G的数据,那么再存储时就不要使用不可分割的压缩方式,可以使用Lzo,或者bzip2

    2. 产生了多个小文件,就要在小文件的源头,也就是思考自己目前的Hive表的分区方式是否合理?到Hive的日志文件的滚动配置是否合理?最后就是要合并小文件。

    最后我把源码的追溯过程放在了一张图上,方便大家查看。

  • 相关阅读:
    什么是B1级阻燃电缆
    HashMap底层分析
    【工作总结】工作为什么总是手忙脚乱
    Elasticsearch:使用你的 RAG 来进行聊天
    ChatGPT 学习笔记 - 1
    (标签-机器学习|关键词-set)
    财务管理-报支单提交中和付款申请被驳回解决措施
    forEach和map区别
    Node.js学习(二)
    TC8:SOMEIPSRV_SD_BEHAVIOR_01-04
  • 原文地址:https://blog.csdn.net/ytp552200ytp/article/details/126155510