A任务在凌晨1点到3点,平均耗时1.5h,且是核心公共任务,急需优化。
整体代码逻辑示意:
// 从tableA读取一次数据,放到临时表t1
DROP TABLE IF EXISTS temp.tmp_xx_$date_1;
CREATE TABLE IF NOT EXISTS temp.tmp_xxx_$date_1
as
select
xxx
from tableA
where xxxx;
// 从临时表t1读取和转换数据,得临时表t2
DROP TABLE IF EXISTS temp.tmp_xx_$date_2;
CREATE TABLE IF NOT EXISTS temp.tmp_xxx_$date_2
as
select
xxx
from temp.tmp_xx_$date_1
where xxxx;
// 从临时表t1读取和转换数据,得临时表t3
DROP TABLE IF EXISTS temp.tmp_xx_$date_3;
CREATE TABLE IF NOT EXISTS temp.tmp_xxx_$date_3
as
select
xxx
from temp.tmp_xx_$date_1
where xxxx;
//合并t2,t3结果写入最终结果表
INSERT OVERWRITE TABLE biads.xxxx
PARTITION (pt_d='$date')
select
xxx
from temp.tmp_xx_$date_2
union all
select
xxx
from temp.tmp_xx_$date_3
问题1: 读取tableA耗时20min , 读取时间较长
问题2: 写入临时表t1耗时20min,写入临时表时间较长
问题3:创建和写入临时表t2,t3 耗时近20min,临时表冗余,
问题4: executor中task分布不均,存在部分exectuor运行了20-30个task,而其余只运行了1个task
问题1,2,4—参数优化
// 增大读取task数量
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize 67108864
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 1
// 减小合并小文件的大小,注意自研spark的合并小文件大小参数
spark.sql.mergefile.maxSize 134217728
// 增大driver资源,减轻gc
spark.driver.memory 8G
spark.driver.cores 4
// 避免executor中task倾斜
spark.locality.wait.process 200
spark.locality.wait.node 200
spark.locality.wait.rack 200
问题3-- 逻辑优化
// 从tableA读取一次数据,放到临时表t1
DROP TABLE IF EXISTS temp.tmp_xx_$date_1;
CREATE TABLE IF NOT EXISTS temp.tmp_xxx_$date_1
as
select
xxx
from tableA
where xxxx;
//消除中间临时表,直接读取t1, 写入最终结果表
INSERT OVERWRITE TABLE biads.xxxx
PARTITION (pt_d='$date')
select
xxx
from temp.tmp_xx_$date_1
where xxxx;
union all
select
xxx
from temp.tmp_xx_$date_1
where xxxx;
问题1,2耗时分布降低至10min左右;
问题3耗时直接消除。
问题4 task倾斜缓解。
总体耗时从100min减少为50min