• 【Spark】Pyspark RDD


    1 SparkContext 执行环境入口

    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setAppName('test')\
        .setMaster('local[*]')
    sc = SparkContext(conf=conf)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. 构建RDD对象

    2.1 集合 -> RDD

    # 集合对象 -> rdd (集合对象,分区数默认cpu核数)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    print(rdd.glom().collect(), rdd.getNumPartitions())
    # [[1, 2], [3, 4], [5, 6]] 3
    
    • 1
    • 2
    • 3
    • 4

    2.2 文件 -> RDD

    # 文件 -> rdd
    rdd = sc.textFile("./data.csv")
    print(rdd.collect())
    # ['1, 2, 3, 4, 5, 6']
    
    • 1
    • 2
    • 3
    • 4

    2.3 RDD -> 文件

    # rdd -> 文件
    rdd = sc.parallelize([1, 2, 3], 3)
    rdd.saveAsTextFile('./output')
    '''  
    生成output文件夹
    里面有按分区存储的多个文件
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3 RDD 算子

    3.1 map、foreach、mapPartitions、foreach Partitions

    # map函数
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    rdd2 = rdd.map(lambda x: (x, 1))
    print(rdd2.map(lambda x: x[0] + x[1]).collect())
    # [2, 3, 4, 5, 6, 7]
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    # foreach 
    # 同map,但无返回值,且不改变原元素
    # 另有foreachPartitions
    rdd = sc.parallelize([1, 2, 3])
    rdd.foreach(lambda x: print(x))
    # 1 3 2
    rdd.foreach(lambda x: -x)
    rdd.collect()
    # [1, 2, 3]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    
    # mapPartitions
    '''  
    map 一次调出一个元素进行计算,io次数多
    mapPartitions 一次将一个分区的所有元素调出计算s
    '''
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    
    def func(iter):
        # 相较于map时间复杂度没优化,空间复杂度优化
        res = list()
        for it in iter:
            res.append(it * 10)
        return res
    
    rdd.mapPartitions(func).collect()
    # [10, 20, 30, 40, 50, 60]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3.2 flatMap 先map再解除嵌套

    # flatMap 先执行map操作,再解除嵌套(降维 softmax前flatten)
    rdd = sc.textFile("./data.csv")
    print(rdd.collect())
    
    rdd.flatMap(lambda x: x.split(' ')).collect()
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.3 reduceByKey、reduce、fold 分组聚合

    # reduceByKey 按照key分组,再对组内value完成聚合逻辑
    # key-value型(二元元组)rdd
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
    print(rdd.reduceByKey(lambda a, b: a + b).collect())
    # [('b', 3), ('a', 3)]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    # reduce 只聚合
    # 不返回rdd 
    rdd = sc.parallelize(range(1, 3))
    print(rdd.reduce(lambda a, b: a + b))
    # 3
    print(sc.parallelize([('a', 1), ('a', 1)]).reduce(lambda a, b: a + b))
    # ('a', 1, 'a', 1)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # fold 带初值的reduce
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    print(rdd.fold(10, lambda a, b: a + b))
    '''   
    [[1, 2], [3, 4], [5, 6]]
    10 + 1 + 2 = 13
    10 + 3 + 4 = 17
    10 + 5 + 6 = 21
    10 + 13 + 17 + 21 = 61
    > 61
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.4 mapValue 二元组value进行map操作

    # mapValues 对二元组内的value执行map操作, 没有分组操作
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
    rdd.mapValues(lambda x: x * 10).collect()
    
    • 1
    • 2
    • 3

    3.5 groupBy、groupByKey

    • groupBy、groupByKey、reduceByKey区别
    # groupBy 多元组皆可进行分组,可选择按哪一个值分组
    # reduceByKey 分组后(ByKey)对value进行聚合(reduce),二元组第一个值为key
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])
    
    # 分组 按第一个值
    rdd2 = rdd.groupBy(lambda x: x[0])
    print(rdd2.collect())
    '''
    返回的是迭代器,需进一步转换
    [('a', ), 
    ('b', )]
    '''
    rdd3 = rdd2.map(lambda x: (x[0], list(x[1])))
    print(rdd3.collect())
    '''  
    [('a', [('a', 1), ('a', 2)]), 
    ('b', [('b', 3), ('b', 4)])]
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    # groupByKey
    # 自动按照key分组,分组后没有聚合操作,只允许二元组
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])
    
    rdd2 = rdd.groupByKey()
    rdd2.map(lambda x: (x[0], list(x[1]))).collect()
    # [('a', [1, 2]), ('b', [3, 4])]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.6 filter、distinct 过滤筛选

    # filter 过滤器
    # 过滤条件True则保留
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    rdd.filter(lambda x: x > 3).collect()
    # [4, 5]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    # distinct 去重
    rdd = sc.parallelize([1, 1, 1, 1, 2, 3, 'a', 'a'])
    rdd.distinct().collect()
    # [[1, 'a', 2, 3]
    
    • 1
    • 2
    • 3
    • 4

    3.7 union 合并

    # union 合并两个rdd
    # 元素凑在一起,不考虑重复
    rdd_a = sc.parallelize([1, 1, 2, 3])
    rdd_b = sc.parallelize([2, 3, ('a', 1), ('b', 2)])
    
    rdd_a.union(rdd_b).collect()
    # [1, 1, 2, 3, 2, 3, ('a', 1), ('b', 2)]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.8 join、leftOuterJoin、rightOuterJoin 连接

    # join JOIN操作
    # 只用于二元组,相同key进行关联
    rdd_a = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
    rdd_b = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
    
    print(rdd_a.join(rdd_b).collect())
    '''   
    内连接 取交集
    [('b', (3, 2)), 
    ('a', (1, 1)), 
    ('a', (2, 1))]
    '''
    print(rdd_a.leftOuterJoin(rdd_b).collect())
    '''   
    左连接 取交集和左边全部
    [('b', (3, 2)), 
    ('a', (1, 1)), 
    ('a', (2, 1))]
    '''
    print(rdd_a.rightOuterJoin(rdd_b).collect())
    '''   
    右连接 取交集和右边全部
    [('b', (3, 2)), 
    ('c', (None, 3)), 
    ('a', (1, 1)), 
    ('a', (2, 1))]
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    3.9 intersection 交集

    # intersection 取交集
    # 区别于join,没有按key连接的操作
    rdd_a = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
    rdd_b = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
    
    rdd_a.intersection(rdd_b).collect()
    # [('a', 1)]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.10 sortBy、sortByKey 排序

    # sortBy
    # func 指定排序元素的方法
    # ascending True生序,False降序
    # numPartitions 用多少分区排序
    rdd = sc.parallelize([[1, 2, 3], 
                          [7, 8, 9],
                          [4, 5, 6]])
    rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect()
    '''  
    [[1, 2, 3], 
    [4, 5, 6], 
    [7, 8, 9]]
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    # sortByKey 针对kv型rdd
    '''   
    ascending True升序,False降序
    numPartitions 全局有序要设为1,否则只能保证分区内有序
    keyfunc 对key进行处理,再排序
    '''
    rdd = sc.parallelize([('a', 1), ('c', 2), ('B', 3)])
    print(rdd.sortByKey(ascending=True, numPartitions=1).collect())
    '''   
    [('B', 3), ('a', 1), ('c', 2)]
    '''
    print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda k: str(k).lower()).collect())
    '''  
    [('a', 1), ('B', 3), ('c', 2)]
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.11 countByKey 统计key出现次数

    # countByKey 统计key出现次数,可多元元组
    # 返回dict 不是rdd
    rdd = sc.parallelize([('a', 1, 2), ('a'), ('b', 1)])
    rdd.countByKey()
    
    • 1
    • 2
    • 3
    • 4

    3.12 first、take、top、count 取元素

    # first 取第一个元素
    rdd = sc.parallelize([('a', 1, 2), ('a'), ('b', 1)])
    print(rdd.first() )
    # ('a', 1, 2)
    
    # take 取前n个元素
    print(rdd.take(2))
    # [('a', 1, 2), 'a']
    
    # count 返回元素个数
    print(rdd.count())
    
    # top 降序排序取前n个
    rdd = sc.parallelize([2, 4, 1, 6])
    print(rdd.top(2))
    # [6, 4]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3.13 takeOrdered 排序取前n个

    # takeOrdered 排序取前n个
    '''   
    param1: n
    param2: func取数前更改元素,不更改元素本身,
    不传func,默认升序(取前n最小值)
    func = lambda x: -x 变为降序,取前n最大值,和top相同
    '''
    rdd = sc.parallelize([2, 4, 1, 6])
    rdd.takeOrdered(2) # [1, 2]
    rdd.takeOrdered(2, lambda x: -x) # [6, 4]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3.14 takeSample 随机抽取

    # takeSample 随机抽取元素
    '''  
    param1: True随机有放回抽样,Fasle不放回抽样 
    param2: 抽样个数
    param3: 随机数种子
    '''
    rdd = sc.parallelize([1])
    rdd.takeSample(True, 2)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4 RDD 缓存

    '''   
    执行完rdd2_2后,rdd1、rdd2都消失;
    执行到2_2后,需要重新执行rdd1、rdd2,再生成rdd2_2
    '''
    rdd1 = sc.parallelize([1, 2, 3])
    rdd2 = rdd1.map(lambda x: x + 1)
    rdd2_1 = rdd2.map(lambda x: x * 10)
    
    rdd2_2 = rdd2.map(lambda x: x * 20)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    from pyspark.storagelevel import StorageLevel
    
    '''   
    缓存rdd2,后续不用再算一遍rdd1、rdd2
    '''
    rdd2.cache() # 缓存到内存
    rdd2.persist(StorageLevel.MEMORY_ONLY) # 仅缓存到内存,内存不够会丢数据
    rdd2.persist(StorageLevel.MEMORY_ONLY_2) # 仅缓存到内存,两个版本,内存不够会丢数据
    rdd2.persist(StorageLevel.DISK_ONLY) # 仅缓存到硬盘 _2,_3
    rdd2.persist(StorageLevel.MEMORY_AND_DISK) # 优先放内存,不够放硬盘 _2
    rdd2.persist(StorageLevel.OFF_HEAP) # 堆外内存(系统内存)
    
    
    rdd2.unpersist() # 清理缓存
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    springboot景区寄存管理系统(源码+sql+论文报告)
    文本生成高精准3D模型,北京智源AI研究院等出品—3D-GPT
    Kafka入门到精通-阶段一(入门&实战&监控)
    learnOpenGl
    Windows服务器TLS协议
    驱动LSM6DS3TR-C实现高效运动检测与数据采集(4)----上报匿名上位机实现可视化
    通过python 的selenium 操作shadow前端页面实现自动点击上传图片
    Django render()函数页面渲染
    九、从0开始卷出一个新项目之瑞萨RZN2L生产烧录固件(jflash擦写读外挂flash)
    leetcode算法每天一题029:两数相除
  • 原文地址:https://blog.csdn.net/qq_45249685/article/details/132641736