• Spark RDD编程模型及算子介绍(二)


    常见的Action算子

    • countByKey算子:统计Key出现的次数,部分代码如下:
    rdd_file = sc.textFile("../Data/input/words.txt")
    rdd_map = rdd_file.flatMap(lambda line: line.split(" ")).map(lambda x:(x, 1))
    rdd_count = rdd_map.countByKey()
    print(rdd_count)
    print(type(rdd_count))
    # 返回结果为字典
    # defaultdict(, {'Apple': 4, 'Banana': 5, 'Orange': 4, 'Peach': 2})
    # 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • collect算子:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。RDD是分布式对象,数据量可以很大,所以用这个算子之前需要知道如果数据集结果很大,就会把driver内存撑爆,出现oom。

    • reduce算子:对RDD数据集按照传入的逻辑进行聚合操作,部分代码如下:

    rdd = sc.parallelize(range(1,10))
    rdd_reduce = rdd.reduce(lambda a,b : a+b)
    print(rdd_reduce)
    # 45
    
    • 1
    • 2
    • 3
    • 4
    • fold算子:和reduce一样接收传入逻辑进行聚合,聚合是带有初始值的。这个初始值既要作用在分区内,也要作用在分区间,部分代码如下:
    rdd = sc.parallelize(range(1,10),3)
    rdd_reduce = rdd.fold(10,lambda a,b : a+b)
    print(rdd_reduce)
    # 1 分为[1,2,3] [4,5,6] [7,8,9]
    # 2 每个分区+10
    # 3 最后汇总再+10 得到结果85
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • first算子:取出RDD第一个元素
    sc.parallelize([1,2,3,4]).first()
    # 1
    
    • 1
    • 2
    • take算子:取出RDD的前N个元素
    sc.parallelize([1,2,3,4],3).take(2)
    # [1,2]
    
    • 1
    • 2
    • top算子:对RDD元素进行降序排序,取前N个
    sc.parallelize([1,2,3,4],3).top(2)
    # [4, 3]
    
    • 1
    • 2
    • count算子:计算RDD有多少条数据,返回值为一个数字
    sc.parallelize([1,2,3,4],3).count()
    # 4
    
    • 1
    • 2
    • takeSample算子:随机抽样RDD的数据,部分代码如下:
    rdd = sc.parallelize([1,2,3,4,5,6,7,6,5,4,3,2,1],1)
    rdd_takeSample1 = rdd.takeSample(True, 18)
    print(rdd_takeSample1)
    rdd_takeSample2 = rdd.takeSample(False, 18)
    print(rdd_takeSample2)
    
    # [1, 1, 1, 4, 6, 4, 1, 1, 5, 4, 6, 7, 5, 1, 6, 6, 6, 2]
    # [2, 4, 2, 5, 5, 6, 3, 7, 4, 1, 6, 3, 1]
    # 参数一:bool型,True表示运行取同一个数据,False表示不允许取同一个数据,与数据内容无关,是否重复表示的是同一个位置的数据。
    # 参数二:抽样的数目(设置为false则无法超越RDD总数)
    # 参数三:随机种子(一般不需要传参)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • takeOrdered算子:对RDD排序取前N个,部分代码如下:
    rdd = sc.parallelize([1,2,3,4,5,6,7])
    #升序
    rdd_takeOrdered1 = rdd.takeOrdered(4)
    #降序
    rdd_takeOrdered2 = rdd.takeOrdered(4,lambda x : -x)
    
    print(rdd_takeOrdered1)
    print(rdd_takeOrdered2)
    # [1, 2, 3, 4]
    # [7, 6, 5, 4]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • foreach算子:对RDD的每个元素,执行逻辑操作与map类似,但是这个方法没有返回值。如果想显示值,只能在里面自行打印(无需经过Driver,直接在Executor打印效率更高)。
    rdd = sc.parallelize([1,2,3,4,5,6,7],1)
    rdd1 = rdd.foreach(lambda x : 2*x +1)
    rdd2 = rdd.foreach(lambda x : print(2*x +1))
    print(rdd1)
    3
    5
    7
    9
    11
    13
    15
    None
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • saveAsTextFile算子:保存文件API,分布式执行,不经过Driver,每个分区所在的Executor直接控制数据写出到目标文件系统中,每个分区产生1个结果文件。
    #设置为三个分区
    rdd_file = sc.textFile("hdfs://node1:8020/Test/WordCount.txt",3)
    rdd_words = rdd_file.flatMap(lambda line: line.split(" "))
    rdd_map = rdd_words.map(lambda x:(x, 1))
    rdd_total = rdd_map.reduceByKey(lambda a,b: a + b)
    rdd_rs = rdd_total.saveAsTextFile("hdfs://node1:8020/Test/word_rs1")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    结果如下图所示在HDFS WebUI上查看
    在这里插入图片描述

    常见分区操作算子

    • mapPartitions算子:与map相似,只是一次被传递的是一整个分区的数据,虽然在执行次数上与map相同,但是可以因为减少了网络io的传输次数,效率会大大的提高。部分代码如下:
    rdd = sc.parallelize([1,2,3,4,5,6],3)
    def func(iter):
        rs = list()
        for it  in iter:
            rs.append(2 * it + 1)
        return rs
    rdd_part = rdd.mapPartitions(func)
    rdd_rs = rdd_part.collect()
    print(rdd_rs)
    
    # [3, 5, 7, 9, 11, 13]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • foreachPartition算子:与普通foreach一样,只是一次被传递的是一整个分区的数据,部分代码如下:
    rdd = sc.parallelize([1,2,3,4,5,6],3)
    # 因为没有返回值所以不需要return
    def func(iter):
        rs = list()
        for it  in iter:
            rs.append(2 * it + 1)
        print(rs)
    
    rdd_part = rdd.foreachPartition(func)
    
    # [3, 5]
    # [7, 9]
    # [11, 13]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • partitionBy算子:对RDD进行自定义分区操作,部分代码如下
    # 参数1 重新分区后有几个分区
    # 参数2 自定义分区规则,函数传入(返回编号为int类型,分区编号从0开始,不要超过分区数)
    rdd = sc.parallelize([('a',1),('b',2),('c',3),('d',4),('e',5),('f',6)])
    
    def func(key):
        if key == 'a' or key == 'b' : return 0
        if key == 'c' or key == 'd' : return 1
        return 2
    
    rdd_part = rdd.partitionBy(3,func)
    rdd_rs = rdd_part.glom().collect()
    print(rdd_rs)
    
    # [[('a', 1), ('b', 2)], [('c', 3), ('d', 4)], [('e', 5), ('f', 6)]]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • repartition算子:对RDD的分区执行重新分区。不建议使用此算子,除非做全局排序的时候,将其设置为1。如果修改尽量减少,不要增加,增加会导致shuffle。不管是增加还是减少都会影响并行计算(内存迭代并行的管道数量),部分代码如下:
    rdd = sc.parallelize([1,2,3,4,5,6],3)
    rdd_re1 = rdd.getNumPartitions()
    print(rdd_re1)
    rdd_re2 = rdd.repartition(1).getNumPartitions()
    print(rdd_re2)
    rdd_re3 = rdd.repartition(5).getNumPartitions()
    print(rdd_re3)
    # 3
    # 1
    # 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • coalesce算子:对分区数量进行增减,部分代码如下:
    # 参数1:分区数
    # 参数2:Bool True表示允许shuffle,False表示不允许(默认)。
    rdd_re4 = rdd.coalesce(1).getNumPartitions()
    print(rdd_re4)
    rdd_re5 = rdd.coalesce(5).getNumPartitions()
    print(rdd_re5)
    rdd_re6 = rdd.coalesce(5,shuffle=True).getNumPartitions()
    print(rdd_re6)
    # 1
    # 3 没有加shuffle=True这里有个API安全机制,分区不会增加
    # 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 在源码中我们可以发现reparation算子底层调用的就是coalesce算子,只不过shuffle定义为true。源码如下:
    def repartition(self, numPartitions):
        return self.coalesce(numPartitions, shuffle=True)
    
    • 1
    • 2
  • 相关阅读:
    【AI视野·今日Robot 机器人论文速览 第三十九期】Fri, 22 Sep 2023
    Redis Lua沙盒绕过命令执行(CVE-2022-0543)
    PMP备考|通关宝典
    react-渲染过程--新节点挂载
    【JAVA案例】作业管理系统(控制台版本)
    马斯克热搜体质无疑,称已将大脑上传云端,却遭网友热议!
    pbootcms 二级域名 迁移网站后 出现 内页 404
    Log4Qt 使用
    第四章字符串_反转字符串里的单词
    CSP赛前复习总结
  • 原文地址:https://blog.csdn.net/sinat_31854967/article/details/127905759