最近CRM平台上报数据的DI ingestion频繁出问题,因此需要实现一个基础对账功能,核对CRM平台的上报数据与后台统计数据。大概步骤如下:
执行到第3步,即统计昨天每个task的去重用户数时,Spark任务发生了OOM。
示例代码
-- Original reconciliation query (shown with EXPLAIN).
-- be: tasks created on 2022-11-03 whose version has
--     repeatSetting.frequency = 1, with the user count stored on the task row.
-- de: number of distinct user_id values per task in table_xx4 for 2022-11-03.
-- The final SELECT puts both counts side by side for every task in be.
explain
with be as (
    select
        task_tab.region           as region,
        journey_tab.journey_name  as journey_name,
        task_tab.journey_id       as journey_id,
        task_tab.version_id       as version_id,
        task_tab.id               as task_id,
        task_tab.total_user_count as be_user_count
    from table_xx1 as task_tab
    left join table_xx2 as journey_tab
        on  journey_tab.id     = task_tab.journey_id
        and journey_tab.region = task_tab.region
    -- The WHERE predicate on version_tab.schedule_setting rejects NULLs,
    -- so rows without a matching version are dropped: this LEFT JOIN
    -- effectively behaves as an INNER JOIN.
    left join table_xx3 as version_tab
        on  version_tab.id     = task_tab.version_id
        and version_tab.region = task_tab.region
    where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
      and task_tab.create_time <  unix_timestamp('2022-11-04', 'yyyy-MM-dd')
      and get_json_object(version_tab.schedule_setting, '$.repeatSetting.frequency') = 1
),
de as (
    select
        be.region                   as region,
        be.journey_name             as journey_name,
        be.journey_id               as journey_id,
        be.version_id               as version_id,
        be.task_id                  as task_id,
        -- tasks without matching rows get 0 (a NULL user_id is not counted)
        count(distinct tmp.user_id) as de_user_count
    from be
    -- be is the preserved (left) side of this LEFT OUTER JOIN. Spark can only
    -- use the right side as the broadcast build side of a left outer join, so
    -- be cannot be broadcast and the plan falls back to SortMergeJoin against
    -- table_xx4 (roughly 2 billion rows after the date filter).
    left join table_xx4 as tmp
        on  tmp.grass_region = be.region
        and tmp.journey_id   = be.journey_id
        and tmp.task_id      = be.task_id
        and tmp.grass_date   = '2022-11-03'
    group by be.region, be.journey_name, be.journey_id, be.version_id, be.task_id
)
select
    be.region,
    be.journey_name,
    be.journey_id,
    be.version_id,
    be.task_id,
    be.be_user_count,
    de.de_user_count
from be
left join de
    on  de.region  = be.region
    and de.task_id = be.task_id
;
排查发现,任务一直卡在下面这段SQL(即CTE de)上:
-- Excerpt: the body of CTE de, where the job gets stuck.
-- be is the small CTE (a few dozen rows) from the full query.
select
    be.region                   as region,
    be.journey_name             as journey_name,
    be.journey_id               as journey_id,
    be.version_id               as version_id,
    be.task_id                  as task_id,
    -- user_id can only come from table_xx4 (be has no such column)
    count(distinct tmp.user_id) as de_user_count
from be
-- be is the preserved (left) side of a LEFT OUTER JOIN, which Spark cannot
-- broadcast, so this is executed as a SortMergeJoin with shuffles on both sides.
left join table_xx4 as tmp
    on  tmp.grass_region = be.region
    and tmp.journey_id   = be.journey_id
    and tmp.task_id      = be.task_id
    and tmp.grass_date   = '2022-11-03'
group by be.region, be.journey_name, be.journey_id, be.version_id, be.task_id
统计发现,临时表be只有几十条数据;大表ods_crm_di_user_task_rt_live(示例代码中脱敏为table_xx4)按日期过滤后仍有约20亿条数据。理论上这种大小表join会走小表广播优化,不应该这么慢,但在YARN UI上却看到了大量的shuffle和spill to disk。
初步分析有两个问题:一是大小表的join顺序(以及join类型)问题;二是生成物理计划时无法估算临时表(CTE)的大小。临时表be没有被识别为小表,因此没有被优化成广播,而是按普通的left join执行。详细分析见下文。
用EXPLAIN查看原SQL的物理计划,发现这个left join底层实际使用的是SortMergeJoin,而不是预期的BroadcastHashJoin,小表并没有被广播。需要注意:对于LEFT OUTER JOIN,Spark只能广播右表(非保留侧)。be位于左侧(保留侧),本身就不可能被广播,即使加上BROADCAST(be) hint也会被忽略。
把left join改成inner join基本不影响最终想要的结果。没有匹配用户的task会从de中消失,但最外层的left join仍会保留这些task,只是de_user_count由0变为NULL。然而查看物理执行计划,inner join底层仍然是SortMergeJoin,还是没有按预期优化成BroadcastHashJoin。
下面依次给出三个示例,都带有BROADCAST(be) hint:left join;改成inner join;以及调整大小表join顺序(大表在前、小表在后)的inner join。结果都还是无法将SortMergeJoin优化成BroadcastHashJoin!
示例代码
-- Attempt 1: keep the LEFT JOIN and add a BROADCAST(be) hint inside de.
explain
with be as (
    select
        task_tab.region           as region,
        journey_tab.journey_name  as journey_name,
        task_tab.journey_id       as journey_id,
        task_tab.version_id       as version_id,
        task_tab.id               as task_id,
        task_tab.total_user_count as be_user_count
    from table_xx1 as task_tab
    left join table_xx2 as journey_tab
        on  journey_tab.id     = task_tab.journey_id
        and journey_tab.region = task_tab.region
    -- effectively an INNER JOIN: the WHERE predicate on version_tab rejects NULLs
    left join table_xx3 as version_tab
        on  version_tab.id     = task_tab.version_id
        and version_tab.region = task_tab.region
    where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
      and task_tab.create_time <  unix_timestamp('2022-11-04', 'yyyy-MM-dd')
      and get_json_object(version_tab.schedule_setting, '$.repeatSetting.frequency') = 1
),
de as (
    -- This hint cannot take effect: in a LEFT OUTER JOIN only the right side
    -- may be the broadcast build side, and be is the left (preserved) side.
    -- Spark ignores hints it cannot honor, so SortMergeJoin remains.
    select /*+ BROADCAST(be) */
        be.region                   as region,
        be.journey_name             as journey_name,
        be.journey_id               as journey_id,
        be.version_id               as version_id,
        be.task_id                  as task_id,
        count(distinct tmp.user_id) as de_user_count
    from be
    left join table_xx4 as tmp
        on  tmp.grass_region = be.region
        and tmp.journey_id   = be.journey_id
        and tmp.task_id      = be.task_id
        and tmp.grass_date   = '2022-11-03'
    group by be.region, be.journey_name, be.journey_id, be.version_id, be.task_id
)
select
    be.region,
    be.journey_name,
    be.journey_id,
    be.version_id,
    be.task_id,
    be.be_user_count,
    de.de_user_count
from be
left join de
    on  de.region  = be.region
    and de.task_id = be.task_id
;
示例代码
-- Attempt 2: INNER JOIN in de plus a BROADCAST(be) hint.
explain
with be as (
    select
        task_tab.region           as region,
        journey_tab.journey_name  as journey_name,
        task_tab.journey_id       as journey_id,
        task_tab.version_id       as version_id,
        task_tab.id               as task_id,
        task_tab.total_user_count as be_user_count
    from table_xx1 as task_tab
    left join table_xx2 as journey_tab
        on  journey_tab.id     = task_tab.journey_id
        and journey_tab.region = task_tab.region
    -- effectively an INNER JOIN: the WHERE predicate on version_tab rejects NULLs
    left join table_xx3 as version_tab
        on  version_tab.id     = task_tab.version_id
        and version_tab.region = task_tab.region
    where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
      and task_tab.create_time <  unix_timestamp('2022-11-04', 'yyyy-MM-dd')
      and get_json_object(version_tab.schedule_setting, '$.repeatSetting.frequency') = 1
),
de as (
    -- With an INNER JOIN either side may be broadcast, so the hint on be is
    -- at least eligible here. NOTE(review): the write-up reports EXPLAIN still
    -- showed SortMergeJoin for this variant - confirm against the actual plan.
    select /*+ BROADCAST(be) */
        be.region                   as region,
        be.journey_name             as journey_name,
        be.journey_id               as journey_id,
        be.version_id               as version_id,
        be.task_id                  as task_id,
        count(distinct tmp.user_id) as de_user_count
    from be
    -- Tasks with no rows in table_xx4 now drop out of de entirely.
    inner join table_xx4 as tmp
        on  tmp.grass_region = be.region
        and tmp.journey_id   = be.journey_id
        and tmp.task_id      = be.task_id
        and tmp.grass_date   = '2022-11-03'
    group by be.region, be.journey_name, be.journey_id, be.version_id, be.task_id
)
-- Tasks missing from de still appear here, with de_user_count NULL (not 0).
select
    be.region,
    be.journey_name,
    be.journey_id,
    be.version_id,
    be.task_id,
    be.be_user_count,
    de.de_user_count
from be
left join de
    on  de.region  = be.region
    and de.task_id = be.task_id
;
示例代码
-- Attempt 3: INNER JOIN with the big table listed first, plus BROADCAST(be).
explain
with be as (
    select
        task_tab.region           as region,
        journey_tab.journey_name  as journey_name,
        task_tab.journey_id       as journey_id,
        task_tab.version_id       as version_id,
        task_tab.id               as task_id,
        task_tab.total_user_count as be_user_count
    from table_xx1 as task_tab
    left join table_xx2 as journey_tab
        on  journey_tab.id     = task_tab.journey_id
        and journey_tab.region = task_tab.region
    -- effectively an INNER JOIN: the WHERE predicate on version_tab rejects NULLs
    left join table_xx3 as version_tab
        on  version_tab.id     = task_tab.version_id
        and version_tab.region = task_tab.region
    where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
      and task_tab.create_time <  unix_timestamp('2022-11-04', 'yyyy-MM-dd')
      and get_json_object(version_tab.schedule_setting, '$.repeatSetting.frequency') = 1
),
de as (
    select /*+ BROADCAST(be) */
        be.region                   as region,
        be.journey_name             as journey_name,
        be.journey_id               as journey_id,
        be.version_id               as version_id,
        be.task_id                  as task_id,
        count(distinct tmp.user_id) as de_user_count
    -- For an INNER JOIN Spark may broadcast either side, so listing table_xx4
    -- first is not by itself expected to change the chosen join strategy.
    from table_xx4 as tmp
    inner join be
        on  tmp.grass_region = be.region
        and tmp.journey_id   = be.journey_id
        and tmp.task_id      = be.task_id
        and tmp.grass_date   = '2022-11-03'
    group by be.region, be.journey_name, be.journey_id, be.version_id, be.task_id
)
select
    be.region,
    be.journey_name,
    be.journey_id,
    be.version_id,
    be.task_id,
    be.be_user_count,
    de.de_user_count
from be
left join de
    on  de.region  = be.region
    and de.task_id = be.task_id
;
将临时小表be落地为一张中间表temp,让优化器能够拿到它的真实大小:
-- Materialize the small task set (the former CTE be) as a real table, temp,
-- so that its size is known when planning the join with table_xx4.
-- IF EXISTS keeps the script re-runnable when temp does not exist yet
-- (a plain DROP TABLE fails on the first run).
drop table if exists temp;
create table temp as
select
    task_tab.region           as region,
    journey_tab.journey_name  as journey_name,
    task_tab.journey_id       as journey_id,
    task_tab.version_id       as version_id,
    task_tab.id               as task_id,
    task_tab.total_user_count as be_user_count
from table_xx1 as task_tab
left join table_xx2 as journey_tab
    on  journey_tab.id     = task_tab.journey_id
    and journey_tab.region = task_tab.region
-- INNER JOIN states explicitly what the original LEFT JOIN did in effect:
-- the WHERE predicate on version_tab.schedule_setting already drops every
-- row without a matching version.
inner join table_xx3 as version_tab
    on  version_tab.id     = task_tab.version_id
    and version_tab.region = task_tab.region
where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
  and task_tab.create_time <  unix_timestamp('2022-11-04', 'yyyy-MM-dd')
  and get_json_object(version_tab.schedule_setting, '$.repeatSetting.frequency') = 1
;
-- Record table-level statistics (size in bytes, row count) in the catalog
-- so the optimizer can compare temp against the broadcast threshold.
-- NOTE(review): depending on Spark version and table format the size may
-- already be available right after CTAS - confirm with EXPLAIN COST.
analyze table temp compute statistics;
示例代码
-- Distinct users per task, joining the materialized temp (aliased be) with
-- table_xx4 via LEFT JOIN so that every task in temp is kept.
-- Grouping keys come from be: with tmp.* keys, every task without a
-- matching row would collapse into a single all-NULL group.
explain
select
    be.region                   as region,
    be.journey_name             as journey_name,
    be.journey_id               as journey_id,
    be.version_id               as version_id,
    be.task_id                  as task_id,
    -- user_id can only come from table_xx4 (temp has no such column)
    count(distinct tmp.user_id) as de_user_count
from temp as be
left join table_xx4 as tmp
    on  tmp.grass_region = be.region
    and tmp.journey_id   = be.journey_id
    and tmp.task_id      = be.task_id
    and tmp.grass_date   = '2022-11-03'
    -- In a LEFT JOIN this predicate only prevents ID-region rows of table_xx4
    -- from matching. ID tasks in temp are still returned, with de_user_count 0.
    -- NOTE(review): filter with WHERE be.region != 'ID' if they must be excluded.
    and tmp.grass_region != 'ID'
group by be.region, be.journey_name, be.journey_id, be.version_id, be.task_id;
示例代码
-- Distinct users per task, INNER JOIN of the materialized temp (aliased be)
-- with table_xx4: only tasks with at least one matching row are returned.
-- NOTE(review): journey_name and version_id are read from table_xx4 here -
-- confirm it has these columns (temp has them as well).
explain
select
    tmp.grass_region            as region,
    tmp.journey_name            as journey_name,
    tmp.journey_id              as journey_id,
    tmp.version_id              as version_id,
    tmp.task_id                 as task_id,
    -- user_id can only come from table_xx4 (temp has no such column)
    count(distinct tmp.user_id) as de_user_count
from temp as be
inner join table_xx4 as tmp
    on  tmp.grass_region = be.region
    and tmp.journey_id   = be.journey_id
    and tmp.task_id      = be.task_id
    and tmp.grass_date   = '2022-11-03'
    and tmp.grass_region != 'ID'
group by tmp.grass_region, tmp.journey_name, tmp.journey_id, tmp.version_id, tmp.task_id;
示例代码
-- Distinct users per task, INNER JOIN with table_xx4 listed first and the
-- materialized temp (aliased be) second.
-- NOTE(review): unlike the other temp variants, this one has no
-- grass_region != 'ID' filter, so its output is not directly comparable.
explain
select
    tmp.grass_region            as region,
    tmp.journey_name            as journey_name,
    tmp.journey_id              as journey_id,
    tmp.version_id              as version_id,
    tmp.task_id                 as task_id,
    -- user_id can only come from table_xx4 (temp has no such column)
    count(distinct tmp.user_id) as de_user_count
from table_xx4 as tmp
inner join temp as be
    on  tmp.grass_region = be.region
    and tmp.journey_id   = be.journey_id
    and tmp.task_id      = be.task_id
    and tmp.grass_date   = '2022-11-03'
    -- Redundant: the equality on task_id above already rejects NULL task_id.
    and be.task_id is not null
group by tmp.grass_region, tmp.journey_name, tmp.journey_id, tmp.version_id, tmp.task_id;