• Small-table join order and broadcast issues (Spark SQL)


    1. Background

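    The data reported by the CRM platform has recently kept running into problems during DI ingestion, so we needed a basic reconciliation check between the data the CRM platform reports and the statistics kept by the backend. The steps are roughly: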

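    1. CRM data is ingested into a near-real-time log table with Flink + Hive
    2. From the backend, collect all tasks entered yesterday and the user count of each task
    3. For those tasks, count each task's users in the log table for yesterday
    4. Compare each task's user count from the backend with the count from the log table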

    At step 3, i.e. when counting yesterday's users per task, the Spark job ran out of memory (OOM).

    2. Analysis

    Sample code

    explain
    with be as (
      select
        task_tab.region as region,
        journey_tab.journey_name as journey_name,
        task_tab.journey_id as journey_id,
        task_tab.version_id as version_id,
        task_tab.id as task_id,
        task_tab.total_user_count as be_user_count
      from table_xx1 as task_tab
      left join table_xx2 as journey_tab on journey_tab.id = task_tab.journey_id and journey_tab.region = task_tab.region
      left join table_xx3 as version_tab on version_tab.id = task_tab.version_id and version_tab.region = task_tab.region
      where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
        and task_tab.create_time < unix_timestamp('2022-11-04', 'yyyy-MM-dd')
        and get_json_object(version_tab.schedule_setting,'$.repeatSetting.frequency') = 1
    ),
    de as (
      select
        be.region as region,
        be.journey_name as journey_name,
        be.journey_id as journey_id,
        be.version_id as version_id,
        be.task_id as task_id,
        count(distinct user_id) as de_user_count
      from be
      left join table_xx4 as tmp on tmp.grass_region = be.region
        and tmp.journey_id = be.journey_id
        and tmp.task_id = be.task_id
        and tmp.grass_date = '2022-11-03'
      group by 1,2,3,4,5
    )
    select
      be.region,
      be.journey_name,
      be.journey_id,
      be.version_id,
      be.task_id,
      be.be_user_count,
      de.de_user_count
    from be
    left join de on de.region = be.region and de.task_id = be.task_id
    ;

    The job kept getting stuck on this part of the SQL:

    select
      be.region as region,
      be.journey_name as journey_name,
      be.journey_id as journey_id,
      be.version_id as version_id,
      be.task_id as task_id,
      count(distinct user_id) as de_user_count
    from be
    left join table_xx4 as tmp on tmp.grass_region = be.region
      and tmp.journey_id = be.journey_id
      and tmp.task_id = be.task_id
      and tmp.grass_date = '2022-11-03'
    group by 1,2,3,4,5

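    Looking at the numbers, the temporary table be has only a few dozen rows, while ods_crm_di_user_task_rt_live still has about 2 billion rows even after filtering. In theory Spark should apply the small-table broadcast optimization and the query should not be this slow, but the YARN UI showed large amounts of disk spill and shuffle.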

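    A first analysis pointed to two problems: the join order of the small and big tables, and the fact that physical planning cannot tell how big the temporary table is. Because be is a CTE built from joins and filters over larger tables, Spark most likely estimates its size from those inputs rather than from the few dozen rows it actually returns. As a result be is not recognized as a small table, it is not broadcast, and the join runs as an ordinary left join. The details follow.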
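    One way to check the size Spark actually estimates for be is EXPLAIN COST (available from Spark 3.0), which prints Statistics(sizeInBytes=...) for each node of the optimized logical plan. The sketch below simply wraps the be CTE from the query above. If the sizeInBytes reported for be is far above spark.sql.autoBroadcastJoinThreshold (10 MB by default), that would explain why be is not broadcast automatically.

```sql
-- Print the optimizer's size estimate for the be CTE (EXPLAIN COST requires Spark 3.0+).
-- Compare the sizeInBytes of the top node with spark.sql.autoBroadcastJoinThreshold.
explain cost
with be as (
    select
        task_tab.region as region,
        journey_tab.journey_name as journey_name,
        task_tab.journey_id as journey_id,
        task_tab.version_id as version_id,
        task_tab.id as task_id,
        task_tab.total_user_count as be_user_count
    from table_xx1 as task_tab
    left join table_xx2 as journey_tab on journey_tab.id = task_tab.journey_id and journey_tab.region = task_tab.region
    left join table_xx3 as version_tab on version_tab.id = task_tab.version_id and version_tab.region = task_tab.region
    where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
        and task_tab.create_time < unix_timestamp('2022-11-04', 'yyyy-MM-dd')
        and get_json_object(version_tab.schedule_setting,'$.repeatSetting.frequency') = 1
)
select * from be;

-- Show the current automatic broadcast threshold for this session (in bytes)
set spark.sql.autoBroadcastJoinThreshold;
```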

    2.1. Temporary small table joined with the big table, and broadcast

    2.1.1. Temporary small table left join big table: no broadcast

    Running explain on the original SQL shows that the left join is executed as a SortMergeJoin. The small table is not broadcast as expected, and the expected BroadcastHashJoin is not used.
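    The rule behind this can be reproduced without the production tables. The following is a minimal sketch that assumes two hypothetical views. big_t is a 100-million-row range() that EXPLAIN never actually scans. small_t holds two literal rows, so Spark knows it is tiny. For a LEFT JOIN, Spark can only use the right-hand side as the broadcast (build) side. With default settings, the first query is therefore expected to show a SortMergeJoin even though small_t is tiny. The second query, with small_t on the right, is expected to show a BroadcastHashJoin.

```sql
-- Hypothetical toy views, not from the original post
create or replace temporary view big_t as
select id % 1000 as task_id, id as user_id from range(100000000);

create or replace temporary view small_t as
select * from values (1, 'a'), (2, 'b') as t(task_id, task_name);

-- small LEFT JOIN big: the left (preserved) side cannot be the build side -> SortMergeJoin
explain
select s.task_id, count(distinct b.user_id) as user_cnt
from small_t as s
left join big_t as b on b.task_id = s.task_id
group by s.task_id;

-- big LEFT JOIN small: small_t is now the right side and is under the threshold,
-- so it can be broadcast -> BroadcastHashJoin
-- (this keeps every big_t row, so the result differs from the query above)
explain
select s.task_id, count(distinct b.user_id) as user_cnt
from big_t as b
left join small_t as s on b.task_id = s.task_id
group by s.task_id;
```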

    2.1.2. Temporary small table inner join big table: no broadcast

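    Changing the left join to an inner join does not change the result we need. A task with no matching rows is still kept by the final be left join de; it just gets a NULL count instead of 0. The physical plan, however, still uses a SortMergeJoin for the inner join, and it is still not optimized into a BroadcastHashJoin as expected.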
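    Here the obstacle is the size estimate rather than the join type, since an inner join may broadcast either side. The following minimal sketch uses a hypothetical view big_t. small_cte returns only 30 rows at run time. With default settings (CBO off), however, Spark assumes a Filter keeps all of its input rows, so small_cte's estimated size is roughly that of big_t. Neither side then qualifies for an automatic broadcast, and the plan is expected to show a SortMergeJoin. On Spark 3.x with adaptive query execution enabled, the join may still be converted to a broadcast join at run time once the real size is known; EXPLAIN shows only the initial plan.

```sql
-- Hypothetical toy view standing in for the big log table
create or replace temporary view big_t as
select id % 1000 as task_id, id as user_id from range(100000000);

-- small_cte is tiny at run time (30 rows), but its estimated size is inherited from big_t
explain
with small_cte as (
    select task_id from big_t where user_id < 30
)
select s.task_id, count(distinct b.user_id) as user_cnt
from small_cte as s
inner join big_t as b on b.task_id = s.task_id
group by s.task_id;
```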

    2.1.3. Big table left join temporary small table: no broadcast

    Swapping the join order so that the big table comes first and the small table second still does not turn the SortMergeJoin into a BroadcastHashJoin.
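    A minimal sketch of this case with a hypothetical view big_t and a CTE small_cte that is tiny at run time but, without CBO, estimated at roughly big_t's size. small_cte now sits on the right, which is the side a LEFT JOIN is allowed to broadcast. Because its estimated size is still inherited from big_t, the plan is expected to remain a SortMergeJoin without a hint.

```sql
-- Hypothetical toy view standing in for the big log table
create or replace temporary view big_t as
select id % 1000 as task_id, id as user_id from range(100000000);

-- small_cte is on the broadcastable (right) side, but its estimate is still large -> SortMergeJoin
explain
with small_cte as (
    select task_id from big_t where user_id < 30
)
select s.task_id, count(distinct b.user_id) as user_cnt
from big_t as b
left join small_cte as s on b.task_id = s.task_id
group by s.task_id;
```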

    2.2. BroadcastHashJoin and the broadcast hint

    2.2.1. Temporary small table left join big table + broadcast hint: no broadcast

    Sample code

    explain
    with be as (
        select
            task_tab.region as region,
            journey_tab.journey_name as journey_name,
            task_tab.journey_id as journey_id,
            task_tab.version_id as version_id,
            task_tab.id as task_id,
            task_tab.total_user_count as be_user_count
        from table_xx1 as task_tab
        left join table_xx2 as journey_tab on journey_tab.id = task_tab.journey_id and journey_tab.region = task_tab.region
        left join table_xx3 as version_tab on version_tab.id = task_tab.version_id and version_tab.region = task_tab.region
        where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
            and task_tab.create_time < unix_timestamp('2022-11-04', 'yyyy-MM-dd')
            and get_json_object(version_tab.schedule_setting,'$.repeatSetting.frequency') = 1
    ),
    de as (
        select
            /*+ BROADCAST(be) */
            be.region as region,
            be.journey_name as journey_name,
            be.journey_id as journey_id,
            be.version_id as version_id,
            be.task_id as task_id,
            count(distinct user_id) as de_user_count
        from be
        left join table_xx4 as tmp on tmp.grass_region = be.region
            and tmp.journey_id = be.journey_id
            and tmp.task_id = be.task_id
            and tmp.grass_date = '2022-11-03'
        group by 1,2,3,4,5
    )
    select
        be.region,
        be.journey_name,
        be.journey_id,
        be.version_id,
        be.task_id,
        be.be_user_count,
        de.de_user_count
    from be
    left join de on de.region = be.region and de.task_id = be.task_id
    ;
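    The hint is ignored here because of the build-side rule. A BROADCAST hint can only be honoured on a side that the join type allows to be broadcast, and for a LEFT JOIN that is only the right-hand side. Below is a minimal reproduction with a hypothetical view big_t and CTE small_cte. With default settings the plan is expected to stay a SortMergeJoin. Recent Spark versions may also log a warning that the hint could not be applied.

```sql
-- Hypothetical toy view standing in for the big log table
create or replace temporary view big_t as
select id % 1000 as task_id, id as user_id from range(100000000);

-- The hint targets the left side of a LEFT JOIN, which cannot be the build side,
-- so it has no effect and the join should remain a SortMergeJoin
explain
with small_cte as (
    select task_id from big_t where user_id < 30
)
select /*+ BROADCAST(s) */ s.task_id, count(distinct b.user_id) as user_cnt
from small_cte as s
left join big_t as b on b.task_id = s.task_id
group by s.task_id;
```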

    2.2.2. Temporary small table inner join big table + broadcast hint: broadcast

    Sample code

    explain
    with be as (
        select
            task_tab.region as region,
            journey_tab.journey_name as journey_name,
            task_tab.journey_id as journey_id,
            task_tab.version_id as version_id,
            task_tab.id as task_id,
            task_tab.total_user_count as be_user_count
        from table_xx1 as task_tab
        left join table_xx2 as journey_tab on journey_tab.id = task_tab.journey_id and journey_tab.region = task_tab.region
        left join table_xx3 as version_tab on version_tab.id = task_tab.version_id and version_tab.region = task_tab.region
        where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
            and task_tab.create_time < unix_timestamp('2022-11-04', 'yyyy-MM-dd')
            and get_json_object(version_tab.schedule_setting,'$.repeatSetting.frequency') = 1
    ),
    de as (
        select
            /*+ BROADCAST(be) */
            be.region as region,
            be.journey_name as journey_name,
            be.journey_id as journey_id,
            be.version_id as version_id,
            be.task_id as task_id,
            count(distinct user_id) as de_user_count
        from be
        inner join table_xx4 as tmp on tmp.grass_region = be.region
            and tmp.journey_id = be.journey_id
            and tmp.task_id = be.task_id
            and tmp.grass_date = '2022-11-03'
        group by 1,2,3,4,5
    )
    select
        be.region,
        be.journey_name,
        be.journey_id,
        be.version_id,
        be.task_id,
        be.be_user_count,
        de.de_user_count
    from be
    left join de on de.region = be.region and de.task_id = be.task_id
    ;
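    This is the same toy setup as a minimal sketch, but with an INNER JOIN. An inner join may broadcast either side, so the hint is honoured even though small_cte's estimated size is large, and the plan is expected to switch to a BroadcastHashJoin. big_t and small_cte are hypothetical. Spark also accepts BROADCASTJOIN and MAPJOIN as aliases of BROADCAST.

```sql
-- Hypothetical toy view standing in for the big log table
create or replace temporary view big_t as
select id % 1000 as task_id, id as user_id from range(100000000);

-- The hint overrides the size estimate; an INNER JOIN may use either side as the build side
explain
with small_cte as (
    select task_id from big_t where user_id < 30
)
select /*+ BROADCAST(s) */ s.task_id, count(distinct b.user_id) as user_cnt
from small_cte as s
inner join big_t as b on b.task_id = s.task_id
group by s.task_id;
```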

    2.2.3. Big table left join temporary small table + broadcast hint: broadcast

    Sample code

    explain
    with be as (
        select
            task_tab.region as region,
            journey_tab.journey_name as journey_name,
            task_tab.journey_id as journey_id,
            task_tab.version_id as version_id,
            task_tab.id as task_id,
            task_tab.total_user_count as be_user_count
        from table_xx1 as task_tab
        left join table_xx2 as journey_tab on journey_tab.id = task_tab.journey_id and journey_tab.region = task_tab.region
        left join table_xx3 as version_tab on version_tab.id = task_tab.version_id and version_tab.region = task_tab.region
        where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
            and task_tab.create_time < unix_timestamp('2022-11-04', 'yyyy-MM-dd')
            and get_json_object(version_tab.schedule_setting,'$.repeatSetting.frequency') = 1
    ),
    de as (
        select
            /*+ BROADCAST(be) */
            be.region as region,
            be.journey_name as journey_name,
            be.journey_id as journey_id,
            be.version_id as version_id,
            be.task_id as task_id,
            count(distinct user_id) as de_user_count
        from table_xx4 as tmp
        inner join be on tmp.grass_region = be.region
            and tmp.journey_id = be.journey_id
            and tmp.task_id = be.task_id
            and tmp.grass_date = '2022-11-03'
        group by 1,2,3,4,5
    )
    select
        be.region,
        be.journey_name,
        be.journey_id,
        be.version_id,
        be.task_id,
        be.be_user_count,
        de.de_user_count
    from be
    left join de on de.region = be.region and de.task_id = be.task_id
    ;
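    As written, the query above joins table_xx4 and be with an inner join. Suppose the big table really has to be the preserved side of a LEFT JOIN. The hint still works because be is then on the right-hand side. However, a condition on the big table such as tmp.grass_date = '2022-11-03' no longer removes any rows while it sits in the ON clause, so it should move to WHERE. Unmatched rows also end up in a NULL group unless they are filtered out. The following minimal sketch uses hypothetical toy objects, with day_id standing in for grass_date. With default settings the plan is expected to show a BroadcastHashJoin that broadcasts small_cte.

```sql
-- Hypothetical toy view; day_id stands in for grass_date
create or replace temporary view big_t as
select id % 1000 as task_id, id % 7 as day_id, id as user_id from range(100000000);

explain
with small_cte as (
    select task_id from big_t where user_id < 30
)
select /*+ BROADCAST(s) */ s.task_id, count(distinct b.user_id) as user_cnt
from big_t as b
left join small_cte as s on b.task_id = s.task_id
-- filter the preserved (left) side in WHERE; inside ON it would not drop any b rows
where b.day_id = 3
group by s.task_id;
```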

    2.3. Intermediate small table joined with the big table, and broadcast

    Materialize the temporary small table as an intermediate table:

    drop table if exists temp;
    create table temp as
    select
        task_tab.region as region,
        journey_tab.journey_name as journey_name,
        task_tab.journey_id as journey_id,
        task_tab.version_id as version_id,
        task_tab.id as task_id,
        task_tab.total_user_count as be_user_count
    from table_xx1 as task_tab
    left join table_xx2 as journey_tab on journey_tab.id = task_tab.journey_id and journey_tab.region = task_tab.region
    left join table_xx3 as version_tab on version_tab.id = task_tab.version_id and version_tab.region = task_tab.region
    where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
        and task_tab.create_time < unix_timestamp('2022-11-04', 'yyyy-MM-dd')
        and get_json_object(version_tab.schedule_setting,'$.repeatSetting.frequency') = 1
    ;
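    Once the result is a real table, Spark can read its size from the catalog. If those statistics are missing or stale, the following sketch refreshes and checks them for temp, the table created above. ANALYZE TABLE ... COMPUTE STATISTICS records the row count and size, and DESCRIBE TABLE EXTENDED shows them in its Statistics row. An alternative that skips the physical table is CACHE TABLE ... AS, which materializes the query eagerly. After that, the optimizer should use the cached data's actual in-memory size. be_cached is a hypothetical name.

```sql
-- Record table-level statistics (size in bytes and row count) for the intermediate table
analyze table temp compute statistics;

-- The Statistics row of this output is what gets compared with spark.sql.autoBroadcastJoinThreshold
describe table extended temp;

-- Alternative without a physical table: cache the small result eagerly (hypothetical name be_cached)
cache table be_cached as
select
    task_tab.region as region,
    journey_tab.journey_name as journey_name,
    task_tab.journey_id as journey_id,
    task_tab.version_id as version_id,
    task_tab.id as task_id,
    task_tab.total_user_count as be_user_count
from table_xx1 as task_tab
left join table_xx2 as journey_tab on journey_tab.id = task_tab.journey_id and journey_tab.region = task_tab.region
left join table_xx3 as version_tab on version_tab.id = task_tab.version_id and version_tab.region = task_tab.region
where task_tab.create_time >= unix_timestamp('2022-11-03', 'yyyy-MM-dd')
    and task_tab.create_time < unix_timestamp('2022-11-04', 'yyyy-MM-dd')
    and get_json_object(version_tab.schedule_setting,'$.repeatSetting.frequency') = 1;
```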

    2.3.1. Intermediate small table left join big table: no broadcast

    Sample code

    explain
    select
        tmp.grass_region as region,
        tmp.journey_name as journey_name,
        tmp.journey_id as journey_id,
        tmp.version_id as version_id,
        tmp.task_id as task_id,
        count(distinct user_id) as de_user_count
    from temp as be
    left join table_xx4 as tmp on tmp.grass_region = be.region
        and tmp.journey_id = be.journey_id
        and tmp.task_id = be.task_id
        and tmp.grass_date = '2022-11-03'
        and tmp.grass_region != 'ID'
    group by 1,2,3,4,5;

    2.3.2. Intermediate small table inner join big table: broadcast

    Sample code

    explain
    select
        tmp.grass_region as region,
        tmp.journey_name as journey_name,
        tmp.journey_id as journey_id,
        tmp.version_id as version_id,
        tmp.task_id as task_id,
        count(distinct user_id) as de_user_count
    from temp as be
    inner join table_xx4 as tmp on tmp.grass_region = be.region
        and tmp.journey_id = be.journey_id
        and tmp.task_id = be.task_id
        and tmp.grass_date = '2022-11-03'
        and tmp.grass_region != 'ID'
    group by 1,2,3,4,5;

    2.3.3. Big table left join intermediate small table: broadcast

    Sample code

    explain
    select
        tmp.grass_region as region,
        tmp.journey_name as journey_name,
        tmp.journey_id as journey_id,
        tmp.version_id as version_id,
        tmp.task_id as task_id,
        count(distinct user_id) as de_user_count
    from table_xx4 as tmp
    inner join temp as be on tmp.grass_region = be.region
        and tmp.journey_id = be.journey_id
        and tmp.task_id = be.task_id
        and tmp.grass_date = '2022-11-03'
        and be.task_id is not null
    group by 1,2,3,4,5;
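    The automatic choice in 2.3 comes from comparing the table's size with spark.sql.autoBroadcastJoinThreshold, whose default is 10485760 bytes (10 MB). Below is a configuration sketch for a single session. The value is in bytes, -1 disables automatic broadcasting, and explicit BROADCAST hints are still honoured either way. The 50 MB figure is only an illustration. A broadcast table is collected on the driver and shipped to every executor, so the threshold should stay well below the available memory.

```sql
-- Illustrative value: allow tables up to 50 MB (52428800 bytes) to be broadcast automatically
set spark.sql.autoBroadcastJoinThreshold=52428800;

-- Disable automatic broadcasting entirely; /*+ BROADCAST(...) */ hints still apply
set spark.sql.autoBroadcastJoinThreshold=-1;
```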

    3. Summary

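    1. Temporary tables: a temporary table produced by complex SQL may hold only a small amount of data, but spark-sql still does not recognize its real size, so it neither optimizes nor broadcasts it.
    2. left join order: with small table left join big table, the small table is never broadcast, not even with a hint, because for a left join Spark can only broadcast the right-hand side. To broadcast the small table, write big table left join small table.
    3. Explicit broadcast hints: for temporary small table inner join big table and for big table left join temporary small table, a hint can explicitly request the broadcast and turn the SortMergeJoin into a BroadcastHashJoin.
    4. Replacing the temporary table with an intermediate table: after the temporary small table is materialized as an intermediate table, spark-sql automatically recognizes and optimizes both intermediate small table inner join big table and big table left join intermediate small table, without a hint, turning the SortMergeJoin into a BroadcastHashJoin.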
  • Original article: https://blog.csdn.net/L13763338360/article/details/127734970