• Spark 广播变量和累加器


    广播变量

    • 举个例子,2个Executor处理四个分区。当本地对象被发送到每个分区的处理线程上使用,也就是一个Executor内。每个Executor相当于就发送了两份数据,Executor进程内资源共享,所以就造成内存资源的浪费。
    • 而如果将本地对象标记成广播变量,则给每个Executor发送一份数据,而不像给每个Executor里的每个分区进程都发送一份数据,从而大大地减少了内存资源浪费。

    广播变量的代码演示

    • 广播变量API介绍
    # 将本地list数据标记为广播变量
    bc = sc.broadcast(list)
    # 使用广播变量
    value = bc.value
    
    • 1
    • 2
    • 3
    • 4
    • 代码演示
    # coding: utf8
    from pyspark import SparkConf, SparkContext
    from pyspark.storagelevel import StorageLevel
    if __name__ == '__main__':
        conf  = SparkConf().setMaster('local[*]').setAppName('test')
        sc = SparkContext(conf=conf)
        # 定义本地python list对象
        list_stu = [(1, '马云', 1001),
                         (2, '马化腾', 1002),
                         (3, '刘强东', 1003),
                         (4, '李彦宏', 1004)]
        # 1. 将本地Python List对象标记为广播变量
        bc = sc.broadcast(list_stu)
        rdd_score = sc.parallelize([
            (1, '语文', 90),
            (1, '数学', 95),
            (1, '英语', 98),
            (1, '计算机', 85),
            (2, '语文', 90),
            (2, '数学', 97),
            (2, '英语', 93),
            (2, '计算机', 96),
            (3, '语文', 90),
            (3, '数学', 90),
            (3, '英语', 80),
            (3, '计算机', 85),
            (4, '语文', 90),
            (4, '数学', 98),
            (4, '英语', 93),
            (4, '计算机', 99),
        ])
    
        # 匹配本地list和分布式rdd中的学生ID  匹配成功后 即可获得当前学生的姓名
        def data_func(data):
            id =data[0]
            name = ""
            # 2. 在使用到本地集合对象的地方, 从广播变量中取出来用即可
            for stu in bc.value:
                stu_id = stu[0]
                if  id == stu_id:
                      name = stu[1]
            return (name,data[1],data[2])
    
        rdd_map = rdd_score.map(data_func)
        rdd_rs = rdd_map.collect()
        print(rdd_rs)
    # 输出结果
    [('马云', '语文', 90), ('马云', '数学', 95), ('马云', '英语', 98), ('马云', '计算机', 85), ('马化腾', '语文', 90), ('马化腾', '数学', 97), ('马化腾', '英语', 93), ('马化腾', '计算机', 96), ('刘强东', '语文', 90), ('刘强东', '数学', 90), ('刘强东', '英语', 80), ('刘强东', '计算机', 85), ('李彦宏', '语文', 90), ('李彦宏', '数学', 98), ('李彦宏', '英语', 93), ('李彦宏', '计算机', 99)]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    广播变量的使用场景

    • 使用场景:本地集合对象和分布式集合对象(RDD) 进行关联的时候使用广播变量。需要将本地集合对象封装为广播变量
    • 为什么不能把本地对象转换为RDD进行分布式计算呢?
    • 对于这个问题,如果是两个RDD进行分布式计算,按照上述代码需求,难免会使用Join算子,从而会产生大量的shuffle,导致性能降低。所以对于本地数据量不是特别大的时候(KB,MB级别),没有必要转换为RDD进行分布式计算。如果本地集合过大,Driver内存明显不足的情况下,还是转换为RDD进行join计算。
    • 使用广播变量的优点:
      • 减少网络IO次数。
      • 减少Executor内存占用。

    累加器

    • 累加器是用来把Executor端变量信息聚合到Driver端,在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
    • 以下段代码为例:
    # coding: utf8
    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
        conf  = SparkConf().setMaster('local[*]').setAppName('test')
        sc = SparkContext(conf=conf)
    
        rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,0],2)
        count = 0
        def func(data):
            global count
            count += 1
            print(count)
    
        rdd_rs = rdd1.map(func).collect()
        print(count)
    # 最终结果是0,而不是10?
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • count来自Driver,在分布式算子需要count对象时,Driver会将count对象发送给每个Executor,在最后执行print的时候。这个被print的count还是driver的那个,并没有被聚合得到10。所以不管累加多少,这个值始终是0。所以此时我们需要使用累加器。

    累加器的代码演示

    • 累加器API介绍:
    # 设置累加器,累加器里的数字指定初始值
    acmlt = sc.accumulator(0)
    
    • 1
    • 2
    # coding: utf8
    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
        conf  = SparkConf().setMaster('local[*]').setAppName('test')
        sc = SparkContext(conf=conf)
    
        rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,0],2)
        acmlt = sc.accumulator(0)
    
        def func(data):
            global acmlt
            acmlt += 1
            print(acmlt)
    
        rdd_rs = rdd1.map(func).collect()
        print(acmlt)
    # 最终数据结果为10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    使用累加器的注意事项

    • 以下一段代码为例
    # coding: utf8
    from pyspark import SparkConf, SparkContext
    if __name__ == '__main__':
        conf  = SparkConf().setMaster('local[*]').setAppName('test')
        sc = SparkContext(conf=conf)
    
        rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,0],2)
        acmlt = sc.accumulator(0)
    
        def func(data):
            global acmlt
            acmlt += 1
            print(acmlt)
    
        rdd2 = rdd1.map(func)
        rdd_rs1 = rdd2.collect()
        rdd3 = rdd2.map(lambda x:x)
        rdd_rs2 = rdd3.collect()
    
        print(acmlt)
    # 最终输出结果是20,为什么不是10?
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 在执行rdd3的时候,rdd2此时数据已经没有了,所有又要重新构建了一次rdd2,所以导致累加器累加的数据再次被执行,结果就是20了。
    • 我们需要注意:使用累加器的时候,因为RDD是过程数据,如果RDD被多次使用,可能会重新构建此RDD。如果是累加器累加代码,存在重新构建中,累加器的累加代码被多次执行。
    • 我们可以重视cache()或是CheckPoint避免此类事情的发生。
    rdd2 = rdd1.map(func)
    # 添加cache()
    rdd2.cache()
    rdd_rs1 = rdd2.collect()
    rdd3 = rdd2.map(lambda x:x)
    rdd_rs2 = rdd3.collect()
    # 结果 10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    【提问募集】向世界级软件开发大师“Bob 大叔”Robert C. Martin 提出你的疑虑!
    【AI】《动手学-深度学习-PyTorch版》笔记(二十二):单发多框检测(SSD)
    python+django协同过滤算法的音乐推荐系统研究vue
    LeetCode每日一练 —— OR36 链表的回文结构
    《你不知道的javaScript》中卷——第一部分——第一章——类型
    【LeetCode力扣】86. 分隔链表
    【ES-实战入门】
    Java 中的参数传递方式
    超螺旋滑模控制详细介绍(全网独家)
    【Leetcode】水题-面试题 01.03. URL化
  • 原文地址:https://blog.csdn.net/sinat_31854967/article/details/127982159