第
2
章 资源调优
2.1
资源规划
2.1.1
资源设定考虑
1
、总体原则
以单台服务器
128G
内存,
32
线程为例。
先设定单个
Executor
核数,根据
Yarn
配置得出每个节点最多的
Executor
数量,每个节
点的
yarn
内存
/
每个节点数量
=
单个节点的数量
总的
executor
数
=
单节点数量
*
节点数。
2
、具体提交参数
1
)
executor-cores
每个
executor
的最大核数。根据经验实践,设定在
3~6
之间比较合理。
2
)
num-executors
该参数值
=
每个节点的
executor
数
* work
节点数
每个
node
的
executor
数
=
单节点
yarn
总核数
/
每个
executor
的最大
cpu
核数
考虑到系统基础服务和
HDFS
等组件的余量,
yarn.nodemanager.resource.cpu-vcores
配
置为:
28
,参数
executor-cores
的值为:
4
,那么每个
node
的
executor
数
= 28/4 = 7,
假设集
群节点为
10
,那么
num-executors = 7 * 10 = 70
3
)
executor-memory
该参数值
=yarn-nodemanager.resource.memory-mb /
每个节点的
executor
数量
如果
yarn
的参数配置为
100G
,那么每个
Executor
大概就是
100G/7
≈
14G,
同时要注意
yarn
配置中每个容器允许的最大内存是否匹配。
2.1.2
内存估算
➢
估算
Other
内存
=
自定义数据结构
*
每个
Executor
核数
➢
估算
Storage
内存
=
广播变量
+ cache/Executor
数量
➢
估算
Executor
内存
=
每个
Executor
核数
*
(数据集大小
/
并行度)
2.1.3
调整内存配置项
一般情况下,各个区域的内存比例保持默认值即可。如果需要更加精确的控制内存分
配,可以按照如下思路:
spark.memory.fraction=
(估算
storage
内存
+
估算
Execution
内存)
/
(估算
storage
内存
+
估算
Execution
内存
+
估算
Other
内存)得到
spark.memory.storageFraction =
(估算
storage
内存)
/
(估算
storage
内存
+
估算
Execution
内存)
代入公式计算:
Storage
堆内内存
=(spark.executor.memory
–
300MB)*spark.memory.fraction*spark.memory.storageFraction
Execution
堆内内存
=
(spark.executor.memory
–
300MB)*spark.memory.fraction*(1-spark.memory.storageFraction)
2.1
持久化和序列化
2.1.1 RDD
1
、
cache
打成
jar
,提交
yarn
任务
,
并在
yarn
界面查看
spark ui
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num
executors 3 --executor-cores 2 --executor-memory 6g --class
com.atguigu.sparktuning.cache.
RddCacheDemo
spark-tuning-1.0-SNAPSHOT-jar
with-dependencies.jar
通过
spark ui
看到,
rdd
使用默认
cache
缓存级别,占用内存
2.5GB,
并且
storage
内存
还不够,只缓存了
29%
。
2
、
kryo+
序列化缓存
使用
kryo
序列化并且使用
rdd
序列化缓存级别。使用
kryo
序列化需要修改
spark
的序
列化模式,并且需要进程注册类操作。
打成
jar
包在
yarn
上运行。
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num
executors 3 --executor-cores 2 --executor-memory 6g --class
com.atguigu.sparktuning.cache.
RddCacheKryoDemo
spark-tuning-1.0-SNAPSHOT
jar-with-dependencies.jar
查看
storage
所占内存,内存占用减少了
1083.6mb
并且缓存了
100%
。使用序列化缓
存配合
kryo
序列化,可以优化存储内存占用。
根据官网的描述,那么可以推断出,如果
yarn
内存资源充足情况下,使用默认级别
MEMORY_ONLY
是对
CPU
的支持最好的。但是序列化缓存可以让体积更小,那么当
yarn
内
存资源不充足情况下可以考虑使用
MEMORY_ONLY_SER
配合
kryo
使用序列化缓存。
2.1.2 DataFrame
、
DataSet
1
、
cache
提交任务,在
yarn
上查看
spark ui
,查看
storage
内存占用。内存使用
612.3mb
。
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num
executors 3 --executor-cores 2 --executor-memory 6g --class
com.atguigu.sparktuning.cache.
DatasetCacheDemo
spark-tuning-1.0-SNAPSHOT
jar-with-dependencies.jar
DataSet
的
cache
默认缓存级别与
RDD
不一样,是
MEMORY_AND_DISK
。
源码:
Dataset.cache() -> Dataset.persist() -> CacheManager.cacheQuery()
2
、序列化缓存
DataSet
类似
RDD
,但是并不使用
JAVA
序列化也不使用
Kryo
序列化,而是使用一种特
有的编码器进行序列化对象。
打成
jar
包,提交
yarn
。查看
spark ui,storage
占用内存
646.2mb
。
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num
executors 3 --executor-cores 2 --executor-memory 6g --class
com.atguigu.sparktuning.cache.
DatasetCacheSerDemo
spark-tuning-1.0-
SNAPSHOT-jar-with-dependencies.jar
和默认
cache
缓存级别差别不大。所以
DataSet
可以直接使用
cache
。
从性能上来讲,
DataSet,DataFrame
大于
RDD
,建议开发中使用
DataSet
、
DataFrame
。
2.2 CPU
优化
2.2.1 CPU
低效原因
1
、概念理解
1
)并行度
➢
spark.default.parallelism
设置
RDD
的默认并行度,没有设置时,由
join
、
reduceByKey
和
parallelize
等转换决定。
➢
spark.sql.shuffle.partitions
适用
SparkSQL
时,
Shuffle Reduce
阶段默认的并行度,默认
200
。此参数只能控制
Spark sql
、
DataFrame
、
DataSet
分区个数。不能控制
RDD
分区个数
2
)并发度:同时执行的
task
数
2
、
CPU
低效原因
1
)并行度较低、数据分片较大容易导致
CPU
线程挂起
2
)并行度过高、数据过于分散会让调度开销更多
Executor
接收到
TaskDescription
之后,首先需要对
TaskDescription
反序列化才能读取任
务信息,然后将任务代码再反序列化得到可执行代码,最后再结合其他任务信息创建
TaskRunner
。当数据过于分散,分布式任务数量会大幅增加,但每个任务需要处理的数据
量却少之又少,就
CPU
消耗来说,相比花在数据处理上的比例,任务调度上的开销几乎与
之分庭抗礼。显然,在这种情况下,
CPU
的有效利用率也是极低的。
2.2.2
合理利用
CPU
资源
每个并行度的数据量(总数据量
/
并行度) 在(
Executor
内存
/core
数
/2, Executor
内存
/core
数)区间
提交执行:
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num
executors 3 --executor-cores 4 --executor-memory 6g --class
com.atguigu.sparktuning.partition.PartitionDemo spark-tuning-1.0-
SNAPSHOT-jar-with-dependencies.jar
去向
yarn
申请的
executor vcore
资源个数为
12
个(
num-executors*executor-cores
)
,
如
果不修改
spark sql
分区个数,那么就会像上图所展示存在
cpu
空转的情况。这个时候需要
合理控制
shuffle
分区个数。如果想要让任务运行的最快当然是一个
task
对应一个
vcore,
但
是一般不会这样设置,为了合理利用资源,一般会将并行度(
task
数)设置成并发度
(
vcore
数)的
2
倍到
3
倍。
修改参数
spark.sql.shuffle.partitions
(默认
200
)
,
根据我们当前任务的提交参数有
12
个
vcore
,将此参数设置为
24
或
36
为最优效果:
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num
executors 3 --executor-cores 4 --executor-memory 6g --class
com.atguigu.sparktuning.partition.PartitionTuning spark-tuning-1.0-
SNAPSHOT-jar-with-dependencies.jar