广播变量
- 举个例子,2个Executor处理四个分区。当本地对象被发送到每个分区的处理线程上使用,也就是一个Executor内。每个Executor相当于就发送了两份数据,Executor进程内资源共享,所以就造成内存资源的浪费。
- 而如果将本地对象标记成广播变量,则给每个Executor发送一份数据,而不像给每个Executor里的每个分区进程都发送一份数据,从而大大地减少了内存资源浪费。
广播变量的代码演示
bc = sc.broadcast(list)
value = bc.value
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('test')
sc = SparkContext(conf=conf)
list_stu = [(1, '马云', 1001),
(2, '马化腾', 1002),
(3, '刘强东', 1003),
(4, '李彦宏', 1004)]
bc = sc.broadcast(list_stu)
rdd_score = sc.parallelize([
(1, '语文', 90),
(1, '数学', 95),
(1, '英语', 98),
(1, '计算机', 85),
(2, '语文', 90),
(2, '数学', 97),
(2, '英语', 93),
(2, '计算机', 96),
(3, '语文', 90),
(3, '数学', 90),
(3, '英语', 80),
(3, '计算机', 85),
(4, '语文', 90),
(4, '数学', 98),
(4, '英语', 93),
(4, '计算机', 99),
])
def data_func(data):
id =data[0]
name = ""
for stu in bc.value:
stu_id = stu[0]
if id == stu_id:
name = stu[1]
return (name,data[1],data[2])
rdd_map = rdd_score.map(data_func)
rdd_rs = rdd_map.collect()
print(rdd_rs)
[('马云', '语文', 90), ('马云', '数学', 95), ('马云', '英语', 98), ('马云', '计算机', 85), ('马化腾', '语文', 90), ('马化腾', '数学', 97), ('马化腾', '英语', 93), ('马化腾', '计算机', 96), ('刘强东', '语文', 90), ('刘强东', '数学', 90), ('刘强东', '英语', 80), ('刘强东', '计算机', 85), ('李彦宏', '语文', 90), ('李彦宏', '数学', 98), ('李彦宏', '英语', 93), ('李彦宏', '计算机', 99)]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
广播变量的使用场景
- 使用场景:本地集合对象和分布式集合对象(RDD) 进行关联的时候使用广播变量。需要将本地集合对象封装为广播变量
- 为什么不能把本地对象转换为RDD进行分布式计算呢?
- 对于这个问题,如果是两个RDD进行分布式计算,按照上述代码需求,难免会使用Join算子,从而会产生大量的shuffle,导致性能降低。所以对于本地数据量不是特别大的时候(KB,MB级别),没有必要转换为RDD进行分布式计算。如果本地集合过大,Driver内存明显不足的情况下,还是转换为RDD进行join计算。
- 使用广播变量的优点:
- 累加器是用来把Executor端变量信息聚合到Driver端,在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
- 以下段代码为例:
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('test')
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,0],2)
count = 0
def func(data):
global count
count += 1
print(count)
rdd_rs = rdd1.map(func).collect()
print(count)
- count来自Driver,在分布式算子需要count对象时,Driver会将count对象发送给每个Executor,在最后执行print的时候。这个被print的count还是driver的那个,并没有被聚合得到10。所以不管累加多少,这个值始终是0。所以此时我们需要使用累加器。
累加器的代码演示
acmlt = sc.accumulator(0)
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('test')
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,0],2)
acmlt = sc.accumulator(0)
def func(data):
global acmlt
acmlt += 1
print(acmlt)
rdd_rs = rdd1.map(func).collect()
print(acmlt)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
使用累加器的注意事项
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('test')
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,0],2)
acmlt = sc.accumulator(0)
def func(data):
global acmlt
acmlt += 1
print(acmlt)
rdd2 = rdd1.map(func)
rdd_rs1 = rdd2.collect()
rdd3 = rdd2.map(lambda x:x)
rdd_rs2 = rdd3.collect()
print(acmlt)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 在执行rdd3的时候,rdd2此时数据已经没有了,所有又要重新构建了一次rdd2,所以导致累加器累加的数据再次被执行,结果就是20了。
- 我们需要注意:使用累加器的时候,因为RDD是过程数据,如果RDD被多次使用,可能会重新构建此RDD。如果是累加器累加代码,存在重新构建中,累加器的累加代码被多次执行。
- 我们可以重视cache()或是CheckPoint避免此类事情的发生。
rdd2 = rdd1.map(func)
rdd2.cache()
rdd_rs1 = rdd2.collect()
rdd3 = rdd2.map(lambda x:x)
rdd_rs2 = rdd3.collect()