• Spark - 第13章 高级RDD


    • 聚合和key-value RDD
    • 自定义分区
    • RDD连接

    Key-Value基础(Key-Value RDD)

            基于RDD的许多方法要求数据是key-value格式,这种方法都有形如<some-operation>ByKey的API名称,只要在方法名称中看到ByKey,就意味着只能以PairRDD类型执行此操作。最简单的方法就是将当前的RDD映射到基本的key-value结构,也就是说在RDD的每个记录中都有两个值。

    keyBy

            也可以使用keyBy函数,它是根据当前value创建key的函数。

    对值进行映射

            在有一组键值对之后,你可以开始对它们进行操作。如果我们有一个元组,Spark将假设第一个元素是key,第二个是value。在这种格式中,你可以显式选择映射value(并忽略key)。当然,可以手动执行此操作,但当你知道只是要修改value时,这可以帮助防止错误。

    提取key和value

            当数据是键值对这种格式时,我们可以使用方法来提取特定的key或value。

    lookup

            在RDD上很常见的任务就是查找某个key对应的value。请注意,并不强制规定每一个键入都只有一个键,所以如果当我们查找“s”时,我们将获得与该key相关的两个value,即“Spark”和“Simple”。

    sampleByKey

            有两种方法可以通过一组key来采样RDD,这可以是近似的方法也可以是精确的方法。这两种操作都可以使用或不适用替换策略,以及根据给定键值对数据集部分采样。这是通过对RDD的一次遍历来简单随机采样,采样数量大约是所有key-value对数量的math.ceil(numItems * samplingRate)这么多。
            使用sampleByKeyExact的方法不同于sampleByKey,因为他需要额外遍历RDD,以99.99%的置信度构造大小等于所有key-value对数量的math.ceil(numItems * samplingRate)这么多的样本集合。若设置不替换,则要一次额外遍历RDD以保证样本集大小。若设置替换取样,需要另外两次额外遍历。

    聚合操作

            可以在纯RDD或PairRDD上执行聚合操作,具体取决于所使用的方法。

    countByKey

            可以计算每个key对应的数据项的数量,并将结果写到本地Map中。你还可以近似地执行此操作,在Scala或Java中指定超时时间和置信度。

    了解聚合操作的实现

            有几种方法可以创建key-value PairRDD,但是实现方法对任务的稳定性非常重要。我们比较两个基本的实现方法:groupBy和reduceBy。

    groupByKey

            根本问题是每个执行器再执行函数之前必须在内存中保存一个key对应的所有value。 这会有什么问题吗?如果有严重的key负载倾斜现象,则某些分组可能由于key对应着太多的value而导致超载问题,进而出现OutOfMemoryErrors错误。当前的小数据集显然不会出现这种问题,但它可能会在处理大规模数据时爆发严重问题。这不一定会发生,但它可能会发生。
            groupByKey在某些情况下是可以的。如果每个key的value数量都差不多,并且知道它们能够被执行器的内存容纳那就可以了。对于其他情况,有一种首选的方法,就是使用reduceBuKey。

    reduceByKey

            因为我们是执行一个简单的计数,一个更稳妥的方法是同样执行flatMap,然后执行map将每个字母实例映射为数字,然后执行reduceByKey配以求和函数以将结果存储到数组。这种方法更加稳定,因为reduce发生在每个分组,并且不需要将所有内容放在内存中。此外,此操作不会导致shuffle过程,在执行最后的reduce之前所有的任务都在每个工作节点单独执行。着大大提高了执行速度以及该操作的稳定性。
            reduceByKey方法返回一个组(对应一个RDD)的RDD和不保证有序的元素序列。因此,当我们的任务操作满足结合律时,这种方法是完全可行的,而如果元素的顺序很重要时就不适合了。我理解的就是不需要将所有相同的key发送至一个节点。

    其他聚合方法

            还有许多高级聚合方法,使用它们主要取决于具体工作负载,而我们发现在当今Spark作业中,用户极少遇到这种工作负载(或需要执行这种操作)。因为使用结构化API执行更简单的聚合时,很少会使用这些非常低级的工具。这些函数允许你具体地控制在集群上执行某种聚合操作。

    aggregate

            此函数需要一个null值和一个起始值,并需要你指定两个不同的函数,第一个函数执行分区内聚合,第二个执行分区间聚合。起始值在两个聚合级别都使用。
            aggregate确实有一些性能问题,因为它在驱动器上执行最终聚合。如果执行器的结果太大,则会导致驱动器出现OutOfMemoryError错误并最终让程序崩溃。还有另一个方法treeAggregate,它基于不同的实现方法可以得到与aggregate相同的结果。它基本上是以“下推”方式完成一些子聚合(创建从执行器到执行器传输聚合结果的树),最后在执行最终聚合。多层级的形式确保驱动器在聚合过程中不会耗尽内存,这些基于树的实现通常会提高某些操作的稳定性,

    aggregateByKey

            此函数与aggregate基本相同,但是基于key聚合而非基于分区聚合。起始值和函数的属性配置也都相同。

    combineByKey

            不但可以指定聚合函数,还可以指定一个合并函数。该合并函数针对某个key进行操作,并根据某个函数对value合并,然后合并各个合并器的输出结果并得出最终结果。我们还可以按照自定义输出分区程序指定输出分区的数量。

    foldByKey

            foldByKey使用满足结合律函数和中性的“零值”合并每个key的value,支持多次累计到结果,并且不能更改结果(例如,0为加法,或1为乘法)

    CoGroups

            CoGroups在Scala中允许将三个key-value RDD一起分组,在Python中允许将两个key-value RDD一起分组。它基于key连接value,这实际上等效于基于组的RDD连接操作。执行此操作时,还可以指定多个输出分区或自定义分区函数,以精确控制此数据在整个集群上的分布情况。
            结果是一组kay-value对,key在一边,其所有的value在另一边。

    连接操作

            RDD的连接与结构化API中的连接有很多相同之处,它们都遵循相同的基本格式,包括执行连接操作的两个RDD,以及输出分区数或自定义分区函数。

    内连接

            全外连接,左外连接,右外连接,笛卡尔连接都遵循相同的基本格式。

    zip

            zip其实并不是一个连接操作,但它将两个RDD组合在一起,因此我们暂将它归类为连接操作。zip把两个RDD的元素对应的匹配在一起,要求两个RDD的元素个数相同,同时也要求两个RDD的分区数也相同,结果会生成一个PairRDD。

    控制分区

            使用RDD,你可以控制数据在整个集群上的物理分布,其中一些方法与结构化API中的基本相同,但是最关键的区别(在结构化API中不支持的)在于,它可以指定一个数据分区函数(自定义Partitioner)

    coalesce

            coalesce有效地折叠(collapse)同一工作节点上的分区,以便在重新分区时避免数据洗牌(shuffle)

    repartition

            Repartition操作将对数据进行重新分布,跨界点的分区会执行shuffle操作。对于map和filter操作,增加分区可以提高并行度。

    repartitionAndSortWithinPartitions

            此操作将对数据重新分区,并指定每个输出分区的顺序。使用repartitionAndSortWithinPartition,分区和key比较函数可由用户定义。

    自定义分区

            自定义分区是使用RDD的主要原因之一,而结构化API不支持自定义数据分区,RDD包含影响任务能否成功运行的低级实现细节。自定义分区的典型示例是PageRank实现,你需要控制集群上数据的分布并避免shuffle操作,而在我们的shopping数据集中,可能需要我们根据客户ID对数据进行分区。
            简而言之,自定义分区的唯一目标是将数据均匀地分布到整个集群中,以避免诸如数据倾斜之类的问题。如果要使用自定义分区,则应从结构化API定义的数据降级为RDD,应用自定义分区程序,然后再将RDD转换回DataFrame或Dataset。只有真正需要时,才会使用RDD自定义分区,这样就可以利用两方面的优势。
            要执行自定义分区,你需要实现Partitioner的子类。只有当你很了解特定领域知识时,你才需要这样做。如果你只是想对一个值或一组值(列)进行分区,那么用DataFrame API实现就可以了。
            Spark有两个内置的分区器,你可以在RDD API调用,它们是用于离散值划分的HashPartitioner(基于哈希的分区)以及RangePartitioner(根据数值范围分区),这两个分区器分别针对离散值和连续值。Saprk的结构化API已经包含了它们,也可以在RDD中使用它们。虽然哈希分区和范围分区程序都很有用,但它们是最基本的分区方法。有时,因为数据量很大并且存在严重的数据倾斜(由于某些key对应的value项比其他key对应的value项多很多导致的数据倾斜),你将需要实现一些非常底层的分区方法。你希望尽可能多地拆分这些key以提高并行性,并在执行过程中防止OutOfMemoryError错误发生。
            一个典型情况是,(当且仅当某个key有特定形式时)由于某个key对应的value太多,需要把这个key拆分成很多key。例如,数据集中可能对某两个客户的数据处理总是会使分析过程崩溃,我们需要对这两个客户进行细分,就是说比其他客户ID更细粒度地分解它们。由于这两个key倾斜的情况很严重,所以需要特别处理,而其他的key可以被集中到大组中。自定义key分发的逻辑仅在RDD级别适用,展示了以任意逻辑在集群中部署数据的能力。

    自定义序列化

            Kryo序列化问题,任何你希望并行处理(或函数操作)的对象都必须是可序列化的。默认序列化的方式可能很慢,Spark可以适用Kryo库更快地序列化对象。Kryo序列化的速度比Java序列化更快,压缩也更紧凑(通常是10倍),但并不是支持所有序列化类型的,并且要求你先注册在程序中使用的类。
            你可以借助于SparkConf使用Kryo初始化你的任务,并设置“spark.serializer”为“org.apache.spark.serializer.KyroSerializer”。此配置用于在工作节点之间数据传输时或将RDD写到磁盘上时,Spark所采用序列化工具。Spark没有选择Kryo作为默认序列化工具的原因式它要求自定义注册,但我们建议在网络传输量大的应用程序中尝试使用它。自Spark 2.0.0后,我们在对简单类型、简单类型数组或字符串类型的RDD进行shuffle操作时,已经默认采用Kryo序列化。

    小结

            讨论了有关RDD的许多更高级的主题。特别需要注意的部分是自定义分区,它允许你以特定的函数来划分数据。

  • 相关阅读:
    shardingJdbc分库分表实战
    nvm的下载,安装与使用详解
    面试 — 快手(后端开发)
    20221205英语学习
    linux下tomcat怎么部署war包
    csv和excel文件操作
    N32学习笔记9-串口dma方式收发数据+printf的代码版本
    【JavaEE】 饿汉模式与懒汉模式详解与实现
    HTML5文旅文化旅游网站模板源码
    删除类及其对象的属性:delattr()函数
  • 原文地址:https://blog.csdn.net/weixin_44556968/article/details/125533706