• Spark rdd之mappartition妙用


    mapPartitions(func)源码

    def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope {
        val cleanedF = sc.clean(f)
        new MapPartitionsRDD(this,(_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),preservesPartitioning)
      }
    
    • 1
    • 2
    • 3
    • 4

    类似于 Map 算子,但是不是基于每一条数据,而是基于一个 partition 来计算的,func 将接受一个迭代器,可以从迭代器中获取每一条数据进行操作,返回一个迭代器。形成一个新的 RDD。

    该算子一般用于优化 Map 算子,如下面这个例子:

    sc.parallelize(Seq(1, 2, 3, 4, 5),1)
    .mapPartitions(iter => {
        var res = List[Int]()
        //创建 mysql 客户端
        println("连接数据库")
        while (iter.hasNext) {
            val next = iter.next()
            println("向数据库写入数据:" + next)
            res = res :+ next
        }
        res.toIterator
      })
      .foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    输出如下,我们可以发现,通过一次连接我们就将一个 partition的数据都写入了数据库,
    如果使用的是 Map 算子,那么每写入一条数据都需要一次数据库连接,很明显是不划算的

    #一次加载一个分区的数据
    连接数据库
    向数据库写入数据:1
    向数据库写入数据:2
    向数据库写入数据:3
    向数据库写入数据:4
    向数据库写入数据:5
    1
    2
    3
    4
    5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    上面的写法并非最优写法,我们可以这样写:

     sc.parallelize(Seq(1, 2, 3, 4, 5),1)
      .mapPartitions(iter => {
        var res = List[Int]()
        println("连接数据库")
        iter.map(next=>{
          println("向数据库写入数据:" + next)
          next
        })
      })
      .foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    输出如下,其中的差异你可以细细体会,
    不但代码更简单, 而且可以防止partition数据过大导致的 OOM 等问题:

    #加载一条数据 写一条数据
    连接数据库
    向数据库写入数据:1
    1
    向数据库写入数据:2
    2
    向数据库写入数据:3
    3
    向数据库写入数据:4
    4
    向数据库写入数据:5
    5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    不过这种写法无法关闭数据库,更好的是自定义一个迭代器。迭代器一对一

    case class CustomIterator22(iterator: Iterator[Int]) extends Iterator[Int] {
      println("开启数据库")
    
      override def hasNext: Boolean = {
        val hasNext: Boolean = iterator.hasNext
        if (!hasNext) {
          println("关闭数据库")
        }
        hasNext
      }
    
      override def next(): Int = {
        val next: Int = iterator.next()
        println("写入数据"+next)
        next
      }
    }
    
    main{
    		//优化2:自定义迭代器
        rdd1.mapPartitions(iter=>{CustomIterator22(iter)}).foreach(println)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    开启数据库
    写入数据1
    1
    写入数据2
    2
    写入数据3
    3
    写入数据4
    4
    写入数据5
    5
    关闭数据库
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    迭代器一对多

    mappartition + flatMap + iterator

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
        val sc: SparkContext = SparkContext.getOrCreate(conf)
    
        val list2: RDD[List[Int]] = sc.parallelize(List(List(1,2,3,4,5,6),List(2,3,4)))
    
        list2.mapPartitions(f=>CustomIter(f))
          .foreach(println)
      }
    
      //自定义迭代器 一对多
      case class CustomIter(iter:Iterator[List[Int]]) extends Iterator[Int] {
    
        //自定义处理函数
        def myF(list: List[Int]): Iterator[Int] ={
          list.iterator
        }
    
        //创建空的迭代器
        private var cur: Iterator[Int] = Iterator.empty
    
        override def hasNext: Boolean = {
          cur.hasNext || iter.hasNext && {
            cur = myF(iter.next())
            hasNext
          }
        }
    
        override def next(): Int = {
          (if (hasNext) cur else Iterator.empty).next()
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
  • 相关阅读:
    GaussDB数据库SQL系列:DROP & TRUNCATE & DELETE
    【栈的应用】数据结构之栈的几种常见题目(数制转换、括号匹配、汉诺塔、迷宫求解)
    Mysql8创建用户以及赋权操作
    Crypto(1) 攻防世界Caesar
    【第三方库】Windows编译libtorrent
    gin框架常见用法大全
    [UE]常见C++类继承关系
    java 泛型返回
    交换机与路由器技术-08-路由器上配置DHCP
    2.15 这样的小红书图片内容,最容易“踩雷”!【玩赚小红书】
  • 原文地址:https://blog.csdn.net/Lzx116/article/details/126978492