原因:map 端过程产生大量对象导致内存溢出:这种溢出的原因是在单个 map 中产生了大量的 对象导致的针对这种问题。
解决方案:
如rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。 shuffle 内存溢出的情况可以说都是 shuffle 后,单个文件过大导致的。在 shuffle 的使用,需要传入一个 partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是 HashPatitioner,默认值是父 RDD 中最大的分区数.这个参数 spark.default.parallelism 只对 HashPartitioner 有效.如果是别的 partitioner 导致的 shuffle 内存溢出就需要重写 partitioner 代码了.
- //自定义的Hash分区器
- class CustomPartitioner(numPar: Int) extends Partitioner {
- assert(numPar > 0)
-
- // 返回分区数, 必须要大于0.
- override def numPartitions: Int = numPar
-
- //返回指定键的分区编号(0到numPartitions-1)
- override def getPartition(key: Any): Int = key match {
- case null => 0
- case _ => key.hashCode().abs % numPar
- }
- }
- object MyCustomPartitioner {
- def main(args: Array[String]): Unit = {
- val conf: SparkConf = new SparkConf().setAppName("MyCustomPartitioner").setMaster("local[2]")
- val sc: SparkContext = new SparkContext(conf)
- sc.setLogLevel("WARN")
-
- val rdd1 = sc.parallelize(Array((10, "a"), (20, "b"), (30, "c"), (40, "d"), (50, "e"), (60, "f")))
- println(rdd1.partitioner)
-
- //使用HashPartitioner重新分区
- val rdd2 = rdd1.partitionBy(new CustomPartitioner(2))
- println(rdd2.partitioner)
- rdd2.glom().map(_.toList).collect().foreach(println)
- }
- }
这是我最近才遇到的一个问题,因为hdfs中不适合存小问题,所以Spark计算后如果产生的文件太小,我们会调用coalesce合并文件再存入hdfs中。但是这会导致一个问题,例如在coalesce之前有100个文件,这也意味着能够有100个Task,现在调用coalesce(10),最后只产生10个文件,因为coalesce并不是shuffle操作,这意味着coalesce并不是按照我原本想的那样先执行100个Task,再将Task的执行结果合并成10个,而是从头到尾只有10个Task在执行,原本100个文件是分开执行的,现在每个Task同时一次读取10个文件,使用的内存是原来的10倍,这导致了OOM。解决这个问题的方法是令程序按照我们想的先执行100个Task再将结果合并成10个文件,这个问题同样可以通过repartition解决,调用repartition(10),因为这就有一个shuffle的过程,shuffle前后是两个Stage,一个100个分区,一个是10个分区,就能按照我们的想法执行。
类似这样rdd.flatMap(x=>for(i <- 1 to 1000) yield (“key”,”value”))导致OOM,但是在同样的情况下,使用rdd.flatMap(x=>for(i <- 1 to 1000) yield “key”+”value”)就不会有OOM的问题,这是因为每次(“key”,”value”)都产生一个Tuple对象,而”key”+”value”,不管多少个,都只有一个对象,指向常量池,于是乎,如果RDD中有大量的重复数据,或者Array中需要存大量重复数据的时候我们都可以将重复数据转化为String,能够有效的减少内存使用.