• Spark Join类型和适用的场景


    本文档描述了 Join 类型和适用的场景。示例仅从语法上进行描述,并不一定具有实际意义。

    1. BroadcastJoin

    BroadcastJoin 至少有一方的数据量小于指定值。如果两方都小于,则 broad cast 数据量小的一方。有 BroadcastHashJoin 和 BroadcastNestedJoin 两种。

    employee 表的定义如下:

    Emp_ Iddept_Idsalary
    e1d1100
    e2d1200
    e3d2300
    e4d4300

    dept 表的定义如下

    dept_Iddept_nameavg_salarycategory
    d1行政部1001
    d2财务部2501
    d3销售部02

    1.1 BroadcastHashJoin

    假设 dept 表的数据量小于阈值,employee 的数据量交大。适用于按键值=的处理。

    select employee.emp_id, dept.dept_name from employee join dept on employee.dept_id = dept.dept_id;
    
    • 1

    以上的语句,由于 dept 表的数据量比较小,广播到所有 employee 分区所在的 worker 中,构建 hash 表。计算时,把本分区的 employee 记录依次取出,判断 hash 表是否有对于的 数据。

    找到 category = 1 的所有员工和部门名称

    select * from employee where exist (select * from dept where employee.dept_id = dept.dept_id and category=1)
    
    • 1

    1.2 BroadcastNestedJoin

    指可以用广播的方法,但是不能用 hash 计算,一般关联的条件不是 =,或者多个 or 条件。
    假设 dept 表的数据量小于阈值,employee 的数据量交大。
    SQL1:关联的条件不是 =

    select * from employee join dept on employee.salary > dept.avg_salary
    
    • 1

    SQL2: 多个 or 条件

    select * from employee join dept on employee.dept_id= dept.dept_id or employee.salary=dept.avg_salary
    
    • 1

    SQL3: full outer join

    select * from employee full outer join dept on employee.dept_id= dept.dept_id or employee.salary=dept.avg_salary
    
    • 1

    2. Sort Merge Join

    分为可以按 join 的 keys 进行排序,或者数据本来有序两种情况。

    2.1 可以按 join 的 keys 进行排序

    如果可以排序,在Map 端对每个 output partition 的数据按照 join 的 key 进行排序,那么在 reduce 端,直接 merge 就可以。使用于连接的两张表都比较大的情况。需要连接的条件是 =,多个条件用 and 连接。

    select employee.emp_id, dept.dept_name from employee join dept on employee.dept_id = dept.dept_id;
    
    • 1

    2.2 数据本来有序(Hive 支持)

    还是执行此SQL

    select employee.emp_id, dept.dept_name from employee join dept on employee.dept_id = dept.dept_id;
    
    • 1

    如果 employee 和 dept 表在创建的时候, clustered by 和 order by 的字段就是关联的字段,并且 bucket 的数量相等。如果数据本来有序,则在 Map 端直接可以 join。

    3. Shuffled Hash join

    关联表的数据都大于 broadcast 的阈值。
    需要按 按 join 的 keys 都是 = 并且用 and 连接。需要按照关联的 keys 进行 hash 计算,使 key 相同的数据进入同一个 reduce。在 reduce 端,用数据量小的一方的表构建 hash 表。
    如SQL:

    select employee.emp_id, dept.dept_name from employee join dept on employee.dept_id = dept.dept_id;
    
    • 1

    条件:一方的数据是另一方的至少3倍,并且 spark.sql.join.preferSortMergeJoin 是 false,或者spark.sql.jion.forceApplyShuffledHashJoin = true。

    4. CartesianProduct 迪卡尔积

    不符合 Shuffled Hash join 的内连接用 CartesianProduct 计算。用此方法,如 C = A join B,则 C 的分区数是 A的分区数 * B 的分区数。

    select employee.emp_id, dept.dept_name from employee join dept on employee.salary < dept.avg_salary
    
    • 1

    5. BroatcastNLJoin

    用于两张表都很大,外连接。需要把数据量小的一方完全 broadcast,容易 OOM,但是没有其他更好方案。

    select employee.emp_id, dept.dept_name from employee full outer join dept on employee.dept_id = dept.dept_id;
    
    • 1
  • 相关阅读:
    Java#17(static)
    分享一个网上搜不到的「Redis」实现「聊天回合制」的方案
    使用Julia语言和R语言实现K-均值
    2023中国(深圳)国际设备维护、状态监测及故障诊断维修展览会
    农业信息化技术导论886笔记复习
    从8连挂到面面offer,我只用了一个月,最后定薪25K,分享面经血泪史...
    els 兼容性DC、传递图片到窗口
    企业出海打造爆款游戏,需要什么样的云服务?
    长尾关键词优化 网站是卖大闸蟹的 怎么用长尾关键词优化网站
    QT_字符串相关操作_QString
  • 原文地址:https://blog.csdn.net/houzhizhen/article/details/126361977