• spark在什么情况下会使用mapPartition


    spark中,map算子可以说是我们使用最频繁的一个基础算子,功能也非常简单,将一条数据经过变换形成另外一条数据。那么还有一个算子和它非常相似:mapPartition。
    很显然,这个算子多了一个Partition,所以表示的含义就是:对于每一个分区的数据,整体进行数据的变换。
    有人可能会问,难道map就不是对分区进行的?也是。不过map没有分区的概念,就是一条数据一条处理,也是分布式的处理,只是在map无法看到分区而已。
    那么对于mapPartition算子,什么情况下会使用呢?这里例举两种最频繁的应用场景:
    (1)数据频繁进行与外部交互的时候。
    最典型的比如需要将分布式的数据data每条进行与数据库的增删改查操作。很自然的想法就是:

    sc.parallelize(Seq(1, 2, 3, 4, 5))
    .map{x =>
    	XXX // 连接数据库操作
    	XXX.insert(x) // 增删改查该数据到数据库
    	XXX// 关闭数据库
    x
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    很显然这里采用直白的map操作,那么每条数据都会进行一次数据库的连接与关闭操作,显然是没有必要的。试想一下,其实每个分区的数据都在同一台机器上,那么对这个机器上的数据,其实只需要做一次数据库的连接与关闭操作不就行了?

    这就自然引出了mapPartition算子。mapPartition本身会将一个分区的数据打包成一个迭代器统一操作。
    还是刚才的例子,那么伪代码就是:

    sc.parallelize(Seq(1, 2, 3, 4, 5))
    .mapPartitions{iter =>
    	XXX // 连接数据库操作
    	var res = List[Int]()
    	while(iter.hasNext) {
    	    val next = iter.next()
    	    XXX.insert(next) // 增删改查该数据到数据库
    	    res = res :+ next
    	}
    	XXX// 关闭数据库
    	res.toIterator
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    可以看到,对于每一个分区的数据,只需要建立一次连接数据库,然后数据库变量可以对整个分区数据使用。mapPartitions传入的是整个分区数据,并且是个迭代器(Iterator),并且输出也得是迭代器,要不然会报错。
    所以,如果某个函数需要对整个分区都生效,并且不需要频繁调用,都可以考虑mapPartitions。其他情况下,如果没有此类操作,那么多数时候还是map更好。

    (2)数据需要在分区内先进行预处理的操作
    简而言之就是:分区间需要各自独立计算(如扩充数据、删减数据、组合变换形成新数据等操作)然后输出

    这种情况下通常是在优化任务性能的时候、或者分区间进行各自独立处理然后把结果汇总的时候会用到。

    比如你有一个(k,v)形式的海量数据需要进行各种聚合等操作,但是k太多了你有不想要这么多的k数据,那么可以在聚合后的分区里面单独处理,使用filter过滤掉分区中的某些数据,也就是对分区自己进行某种形式的瘦身;

    好了对于mapPartitions我觉得大多是情况下会在这两类应用场景使用较多。

  • 相关阅读:
    Llama改进之——分组查询注意力
    谈谈我们是如何实践 OKR 的(多年宝贵实践经验逼坑指南)
    【Linux】多线程
    AcWing102. 最佳牛围栏
    Pytorch与tensorboard观察Loss变化
    8086汇编笔记
    LeetCode39. Combination Sum
    python基础1
    Java中Date实现比较大小
    有效 TCP RST
  • 原文地址:https://blog.csdn.net/on2way/article/details/127658358