• Spark Join类型和适用的场景


    本文档描述了 Join 类型和适用的场景。示例仅从语法上进行描述,并不一定具有实际意义。

    1. BroadcastJoin

    BroadcastJoin 至少有一方的数据量小于指定值。如果两方都小于,则 broad cast 数据量小的一方。有 BroadcastHashJoin 和 BroadcastNestedJoin 两种。

    employee 表的定义如下:

    Emp_ Iddept_Idsalary
    e1d1100
    e2d1200
    e3d2300
    e4d4300

    dept 表的定义如下

    dept_Iddept_nameavg_salarycategory
    d1行政部1001
    d2财务部2501
    d3销售部02

    1.1 BroadcastHashJoin

    假设 dept 表的数据量小于阈值,employee 的数据量交大。适用于按键值=的处理。

    select employee.emp_id, dept.dept_name from employee join dept on employee.dept_id = dept.dept_id;
    
    • 1

    以上的语句,由于 dept 表的数据量比较小,广播到所有 employee 分区所在的 worker 中,构建 hash 表。计算时,把本分区的 employee 记录依次取出,判断 hash 表是否有对于的 数据。

    找到 category = 1 的所有员工和部门名称

    select * from employee where exist (select * from dept where employee.dept_id = dept.dept_id and category=1)
    
    • 1

    1.2 BroadcastNestedJoin

    指可以用广播的方法,但是不能用 hash 计算,一般关联的条件不是 =,或者多个 or 条件。
    假设 dept 表的数据量小于阈值,employee 的数据量交大。
    SQL1:关联的条件不是 =

    select * from employee join dept on employee.salary > dept.avg_salary
    
    • 1

    SQL2: 多个 or 条件

    select * from employee join dept on employee.dept_id= dept.dept_id or employee.salary=dept.avg_salary
    
    • 1

    SQL3: full outer join

    select * from employee full outer join dept on employee.dept_id= dept.dept_id or employee.salary=dept.avg_salary
    
    • 1

    2. Sort Merge Join

    分为可以按 join 的 keys 进行排序,或者数据本来有序两种情况。

    2.1 可以按 join 的 keys 进行排序

    如果可以排序,在Map 端对每个 output partition 的数据按照 join 的 key 进行排序,那么在 reduce 端,直接 merge 就可以。使用于连接的两张表都比较大的情况。需要连接的条件是 =,多个条件用 and 连接。

    select employee.emp_id, dept.dept_name from employee join dept on employee.dept_id = dept.dept_id;
    
    • 1

    2.2 数据本来有序(Hive 支持)

    还是执行此SQL

    select employee.emp_id, dept.dept_name from employee join dept on employee.dept_id = dept.dept_id;
    
    • 1

    如果 employee 和 dept 表在创建的时候, clustered by 和 order by 的字段就是关联的字段,并且 bucket 的数量相等。如果数据本来有序,则在 Map 端直接可以 join。

    3. Shuffled Hash join

    关联表的数据都大于 broadcast 的阈值。
    需要按 按 join 的 keys 都是 = 并且用 and 连接。需要按照关联的 keys 进行 hash 计算,使 key 相同的数据进入同一个 reduce。在 reduce 端,用数据量小的一方的表构建 hash 表。
    如SQL:

    select employee.emp_id, dept.dept_name from employee join dept on employee.dept_id = dept.dept_id;
    
    • 1

    条件:一方的数据是另一方的至少3倍,并且 spark.sql.join.preferSortMergeJoin 是 false,或者spark.sql.jion.forceApplyShuffledHashJoin = true。

    4. CartesianProduct 迪卡尔积

    不符合 Shuffled Hash join 的内连接用 CartesianProduct 计算。用此方法,如 C = A join B,则 C 的分区数是 A的分区数 * B 的分区数。

    select employee.emp_id, dept.dept_name from employee join dept on employee.salary < dept.avg_salary
    
    • 1

    5. BroatcastNLJoin

    用于两张表都很大,外连接。需要把数据量小的一方完全 broadcast,容易 OOM,但是没有其他更好方案。

    select employee.emp_id, dept.dept_name from employee full outer join dept on employee.dept_id = dept.dept_id;
    
    • 1
  • 相关阅读:
    父子盒子边距塌陷之为什么设置margin-top父子盒子会一起移动
    小程序npm包--API Promise化
    SpringBoot分页其实很简单
    【Netty 的设计与应用】
    我的AIGC部署实践03
    [ vulhub漏洞复现篇 ] ActiveMQ反序列化漏洞复现(CVE-2015-5254)
    mac系统如何安装nacos(window系统通用)?详细教程一文解决
    数据结构与算法 树 课后习题 选择题部分
    2.3.C++项目:网络版五子棋对战之实用工具类模块的设计
    MySQL主键类型使用int、bigint、varchar区别
  • 原文地址:https://blog.csdn.net/houzhizhen/article/details/126361977