• 大数据之路读书笔记-13计算管理


    大数据之路读书笔记-13计算管理

    目前内部 MaxCompute 集群上有 200 多万个任务,每天存储资源、计算资源消耗都很大。 如何降低计算资源的消耗,提高任务执行的性能,提升任务产出 的时间,是计算平台和 ETL 开发工程师孜孜追求的目标。本章分别从系统优化和任务优化 面介绍计算优化。


    13.1 系统优化

    Hadoop 等分布式计算系统评估资源的方式,一般是根据输入数据量进行静态评估, Map任务用于处理输入,对于普通的Map 任务,评估一般符合预期 :而对于 Reduce 任务,其输入来自于 Map 的输出,但一般只能根据 Map 任务的输入进行评估,经常和实际需要的资源数相差很大 ,所以在任务稳定的情况下,可以考虑基于任务的历史执行情况进行资源评估, 采用 HBO (History-Based Optimizer, 基于历史的优化器)。

    提到 CBO (Cost-Based Optimizer 基于代价的优化器),首先会想到Oracle CBO Oracle 会根据收集到的表、分区、索引等统计信息来计算每种执行方式的代价( Cost ),进而选择其中最优的执行方式。一般来说,对于更多的、更准确的统计信息, CBO 则可能生成代价更小的执行计划。但对表和列上统计信息的收集也是有代价的,尤其是在大数据环境下,表的体量巨大,消耗大量资源收集的统计信息利用率很低。 MaxCompute 采用各种抽样统计算法,通过较少的资源获得大量的统计信息,基于先进的优化模型,具备了完善的 CBO 能力,与传统的大数据计算系统相比,性能提升明显。

    13.1.1 HBO

    HBO 是根据任务历史执行情况为任务分配更合理的资源,包括内存、 CPU 以及 Instance 个数。 HBO 是对集群资源分配的 种优化,概括起来就是:任务执行历史+集群状态信息+优化规则→更优的执行配置。
    1 背景
    ( 1) MaxCompute 原资源分配策略
    首先看一下 MaxCompute 最初是如何分配 MR 执行过程的Instance个数的。默认的 Instance 分配算法如 13.1 示。在这里插入图片描述
    在这个分配算法的基础上,根据历史数据统计,各个 Instance 处理的数据量分布如下:

    ● Map Instance
    在这里插入图片描述
    ● Reduce Instance在这里插入图片描述
    ● Join Instance
    在这里插入图片描述

    注解: fivenum是R语言中统计数据分布的函数,统计值有5个,分别是最小值、下四分位数、 中位数、上四分位数、最大值。$map input bytes $reduce_input_ bytes $j in_input_bytes 分别表示Map Reduce Join 三种 Task 的输入数据量(bytes )。

    从上面内容可以看出,大部分 Instance 处理的数据量远远没有达到预期,即一个 Instance 处理 256MB 的数据。同时有些 Instance 处理的数据量很大,很容易导致任务长尾。

    假如把处理数据量小的任务称作小任务,处理数据量大的任务称作大任务,总结 :在默认的 Instance 算法下,小任务存在资源浪费,而大任务却资源不足。综上所述,需要有更合理的方法来进行资源分配,HBO 应运而生。

    (2) HBO 的提出
    通过数据分析,发现在系统中存在大量的周期性调度的脚本(物理计划稳定),且这些脚本的输入一般比较稳定,如果能对这部分脚本进行优化,那么对整个集群的计算资源的使用率将会得到显著提升。由此,我们想到了 HBO ,根据任务的执行历史为其分配更合理的计算源。HBO一般通过自适应调整系统参数来达到控制计算资源的目的。
    2. HBO原理
    HBO 分配资源的步骤如下:

    ● 前提 :最近7天内任务代码没有发生变更且任务运行4次。

    ● Instance 分配逻辑:基础资源估算值+加权资源估算值。
    (1)基础资源数量的逻辑 ·对于 MapTask ,系统需要初始化不同的输入数据 ,根据期望的每个Map能处理的数据量,再结合用户提交任务的输入数据量,就可以估算出用户提交的任务所需要的 Map量。为了保证集群上任务的整体吞吐量,保证集群的资源不会被一些超大任务占有,我们采用分层的方式,提供平均每个 Map 能处理的数据量。

    ● 对于 Reduce Task ,比较 Hive 使用 Map 输入数据量, MaxCompute 使用最近 Reduce 对应 Map 的平均输出数据量作为 Reduce 的输入数据 ,用于计算 Instance 的数量。对于 Reduce 个数的估算与 Map 估算基本相同,不再赘述

    (2)加权资源数量的逻辑
    ● 对于 Map Task ,系统需要初始化期望的每个 Map 能处理的数据量。通过该 Map 在最近一段时间内的平均处理速度与系统设定的期望值做比较,如果平均处理速度小于期望值,则按照同等比例对基础资源数量进行加权,估算出该 Map 的加权资源数量。
    ● 对于 ReduceTask ,方法同上。

    最终的 Instance 个数为:基础资源估算值+加权资源估算值。
    ● CPU/内存分配逻辑:类似于 Instance 分配逻辑,也是采用基础资源估算值+加权资源估算值的方法。

    3. HBO 效果
    (1)提高 CPU 利用率
    通过适当降低每个Instance CPU 资源数,集群利用率从 40% 提升到 80% 。其中早上 4:00-7 :00 节省的 CPU 资源可以供6万个 Instance 并发使用。

    (2)提高内存利用率
    在保障并行度,同时又能提高执行效率的基础上,合理分配内存, 早上 4:00 7:00 节省的内存资源可以供4万个 Instance 并发使用。

    (3)提高 Instance 并发数
    合理设置 Task的Instance 个数, Instance 峰值并发数提升了 57%。

    (4)降低执行时长
    在某机器上测试效果很明显。该集群有 700 台机器,任务数约 16万个 ,总执行时长减少 4472 小时(没有开启 HBO 时总执行时长是 8356小时,开启 HBO 后总执行时长为 3884 小时)。

    4. HBO 改进与优化
    HBO 是基于执行历史来设置计划的,对于日常来说,数据量波动不大,工作良好。但是某些任务在特定场合下依旧有数据量暴涨的情况发生,尤其是在大促“双11”和“双 12 ”期间,这个日常生成的 HBO计划就不适用了。针对这个问题, HBO 也增加了根据数据量动态调整Instance 数的功能,主要依据 Map 的数据量增长情况进行调整。

    13.1.2 CBO

    MaxCompute 2.0 引人了基于代价的优化器( CBO ),根据收集的统计信息来计算每种执行方式的代价,进而选择最优的执行方式。该优化器对性能提升做出了卓越改进。通过性能评测, MaxCompute 2.0 离线计算比同类产品 Hive 2.0 on Tez 90% 以上。

    1 .优化器原理
    优化器( Optimizer )引人了 Volcano 模型(请参考论文 The Volcano Optimizer Gener tor: Extensibility and fficient Search ,该模型是基于代价的优化器( CBO ),并且引人了重新排序 Join (Join Reorder )和MapJoin (Auto MapJoin )优化规则等,同时基于 Volcano 模型的优化器会尽最大的搜索宽度来获取最优计划。

    优化器有多个模块相互组合协调工作,包括 Meta Manager (元数据)、 Statistics (统计信息)、 Rule Set (优化规则集)、 Volcano Planner Core(核心计划器)等,如图 13.1 所示。在这里插入图片描述
    ( 1 ) Meta Manager
    Meta 模块主要提供元数据信息,包括表的元数据、统计信息元数据等。当优化器在选择计划时,需要根据元数据的一些信息进行优化。比如表分区裁剪( TableScan Part on Prunning )优化时, 需要通过 Meta信息获取表数据有哪些分区,然后根据过滤条件来裁剪分区。同时还有一些基本的元数据,如表是否是分区表、表有哪些列等。

    对于 Meta 的管理, MaxCompute 提供了 Meta Manager 与优化器行交互。 Meta Manager 与底层的 Meta 部分对接 ,提供了优化器所需要的信息。

    ( 2) Statistics
    Statistics 主要是帮助优化器选择计划时,提供准确的统计信息,如表的 Count 值、列的 Distinct 值、 TopN 值等。优化器只有拥有准确的统计信息,才能计算出真正最优的计划。比如 Join 是选择 Hash Join还是Merge Join ,优化器会根据 Join 的输入数据量( Count 值)来进行选择。
    优化器提供了 UDF 来收集统计信息,包括 Distinct 值、 TopN 等,而Count 值等统计信息是由底层 Meta 直接提供的。

    ( 3) Rule Set
    优化规则是根据不同情况选择不同的优化点,然后由优化器根据代价模型( Cost Model )来选择启用哪些优化规则。比如工程合并规则(Project Merge Rule ), 将临近的两个 Project 合并成一个 Project ;过滤条件下推规则( Filter Push Down ),将过滤条件尽量下推,使得数据先进行过滤,然后再进行其他计算,以减少其他操作的数据量。这些所有的优化都放置在优化规则集中。

    Max Compute 优化器提供了大量的优化规则,用户也可以通过 set等命令来控制所使用的规则。规则被分为 Substitute Rule (被认为是优化了肯定好的规则)、 Explore Rule (优化后需要考虑多种优化结果)、Build Rule (可以认为优化后的结果不能再次使用规则进行优化)。

    ( 4) Volcano Planner Core
    Volcano Planner 是整个优化器的灵魂,它会将所有信息( Meta息、统计信息、规则)统一起来处理,然后根据代价模型的计算,获得一个最优计划。

    ① 代价模型。代价模型会根据不同操作符(如 Join Project 等)计算不同的代价,然后计算出整个计划中最小代价的计划。 MaxCompter代价模型目前提供的 Cost 由三个维度组成 ,即行数、 1/0 开销、 CPU开销,通过这三个维度来衡量每一个操作符的代价。

    ②工作原理。假设 Planner 的输入是一棵由 ompiler 解析好的计划树,简称 RelNode 树,每个节点简称 Re!Node

    ● Volcanno Planner创建
    Planner 的创建主要是将 Planner 在优化过程中要用到的信息传递给执行计划器,比如规则集,用户指定要使用的规划 Meta Provider ,每个ReINode Meta 计算,如 RowCount 值计算、 Distinct 值计算等;代价模型,计算每个 RelNode 的代价等。这些都是为以后 Planner 提供的必要信息。

    ● Planner 优化
    规则匹配( Rule Match ):是指 RelNode 满足规则的优化条件而建立的一种匹配关系。 Planner 首先将整个 RelNode 树的每一个 ReIN ode 注册到 Planner 内部,同时在注册过程中,会在规则集中找到与每个RelNode 匹配的规则,然后加入到规则应用( Rule Apply )的队列中。所以整个注册过程处理结束后,所有与 RelNode 可以匹配的规则全部加入到队列中,以后应用时只要从队列中取出来就可以了。

    规则应用( Rule Apply ):是指从规则队列( Rule Queue )中弹出( Pop)一个已经匹配成功的规则进行优化。当获取到一个已经匹配的规则进行处理时,如果规则优化成功,则肯定会产生至少一个新的 RelNode ,因为进行了优化,所以与之前未优化时的 ReINode 有差异。这时需要再次进行注册以及规则匹配操作,把与新产生的 RelNode 匹配的规则加入到规则队列中,然后接着下次规则应用。

    Planner 会一直应用所有的规则,包括后来叠加的规则,直到不会有新的规则匹配到。至此,整个优化结束,这时就可以找到一个最优计划。

    代价计算( Cost Compute ): 每当规则应用之后,如果规则优化成功,则会产生新的 ReINode 。在新的 ReINode 注册过程中,有 个步骤是计算 RelNode 的代价。

    代价计算由代价模型对每个 RelNode 的代价进行估算。
    ▷ 如果不存在代价,或者 Child 的代价还没有估算(默认是最大值),则忽略。
    ▷ 如果存在代价,则会将本身的代价和 Child (即输入的所有RelNode )的代价进行累加, 若小于 Best ,则认为优化后的ReINode 是当前最优的 。并且会对其 Parent 进行递归估算代价,即传播代价计算( Propagate Calculate Cost )。

    比如计划: Project->TableScan ,当 TableScan 计算代价为1时,则会继续估算 Project 的代价,假设为1,则整个 Project 的代价就是 1+1=2

    也就是说,当 ReINode 本身的代价估算出来后,会递归地对 Parent进行代价估算,这样就可以对整条链路的计划进行估算。在这个估算过程中借助了 Meta Manager Statistics 提供的信息(见图 13.2 )。在这里插入图片描述
    2. 优化器新特性
    优化器具有一些新特性,主要是重新排序 Join (Join Reorder )和自动MapJoin (Auto MapJoin )。

    ( 1 )重新排序Join
    Join 可以认为是关系数据库中最重要的操作符之一。 Join 的性能也直接关系到整个 SQL 的性能。
    重新排序 Join 可以认为是将 Join 的所有不同输入进行一个全排列,然后找到代价最小的一个排列。之前仅仅保持了用户书写 SQL 语句的Join 顺序,这样的 Join 顺序不一定是最优的,所以通过重新排序 Join规则可以实现更好的选择,提供更优的性能。、

    (2 )自动 MapJoin
    Join 实现算法有多种,目前主要有 Merge Join和MapJoin 。对于小数据量, MapJoin和Merge Join 性能更优。之前是通过 Hint方式来指定
    是否使用 MapJoin ,这样对用户不是很友好,且使用不方便。自动MapJoin 充分利用优化器的代价模型进行估算,获得更优的 MapJoin方式,而不是通过 Hint 方式来进行处理。

    3.优化器使用
    Max Compute 优化器具有一些新特性 ,也提供了许多优化规则,将内部已经实现的优化规则进行分类,并且提供 set 等命令方便用户使用。一些基础优化规则都会默认打开,用户也可以自己任意搭配使用
    优化器提供的 Flag 有:
    规则白名单一一odps.optimizer.cbo.rule. filter.white
    规则黑名单一-odps.optimizer.cbo.rule.filter.black
    使用方法很简单,如果用户需要使用哪些优化规则,只要将规则的缩写名称加人白名单即可;反之,需要关闭哪些优化规则,只要将名称加入黑名单即可。比如 set odps.optimizer.cbo.rule.filter.black=xxx,yyy;, 就表示将 xxx,yyy 对应的优化规则关闭。

    对于重新排序 Join 和自动 MapJoin ,对应的标记分别是 porj和hj即如果想使用上述优化,则可以进行如下设置:

    set odps. optimizer . cbo. rule.filter.white = pojr,hj;

    下面列举 TPC-H 些测试结果,如图 13.3 示。
    在这里插入图片描述

    优化前: 5129s ,优化后: 3814s ,性能提升 25.7%。

    4.注意事项
    由于用户书写 SQL 语句时可能存在一些不确定因素,所以应尽量避免这些因素带来 的性能影响,甚至结果非预期。

    Optimizer 会提供谓词下推( Predicate Push Down )优化 ,主要目的是尽量早地进行谓词过滤,以减少后续操作的数据量,提高性能。但需要注意的是:
    (1) UDF
    对于 UDF 是否下推,优化器做了限制,不会任意下推这种带有用户意图的函数,主要是因为不同用户书写的函数含义不一样,不可以一概而论。如果用户需要下推 UDF ,则要自己改动SQL ,这样做主要的好处是用户自己控制 UDF 执行的逻辑,最了解自己的 UDF 使用在 SQL的哪个部分,而不是优化器任意下推等。例如UDF 可能和数据顺序有关,下推和不下推会导致出现不同的结果。理论上,过滤条件可以下推Exchange 之后,但不是所有U’DF 都是这样的 否则会导致结果违背了用户书写 SQL 语句的本意。

    (2 )不确定函数
    对于不确定函数,优化器也不会任意下推,比如 sample 函数,如果用户将其写在 where 子句中,同时语句存在 Join ,则优化器是不会下推到 TableScan 的,因为这样可能改变了原意。

    SELECT * 
    FROM tl 
    JOIN t2 
    ON tl. cl=t2 . dl 
    WHERE sample(4, 1) =true ;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    则sample 函数在 Join 之后执行,而不会直接在 TableScan 后执行。
    如果用户需要对 TableScan 进行抽样,则需要自己修改 SQL 来达到目的;否则优化器进行下推可能会错误地理解用户的意图。
    对上述 SQL 语句修改如下:

    SELECT * 
    FROM (
    SELECT * 
    FROM t1 
    WHERE sample(4 , 1) =true 
    ) t1 
    JOIN t2 
    ON tl. cl=t2 . dl ;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    (3 )隐式类型转换
    书写 SQL 语句时 应尽量避免 Join Key 存在隐式类型转换。例如String = Bigint ,在这种情况下会转换为 ToDouble(String) ToDouble(Bigint ),这是不是用户原本的意图,数据库本身不得而知。这样可能引发的后果有两种:一种是转换失败,报错:另一种是虽然执行成功了,但结果与用户期望的不一致。

    13.2 任务优化

    本节主要从数据倾斜方面讨论数据优化。下面给出 SQL/MR从提交到最后执行在 MaxCompute 中的细分步骤,如图 13.4 所示。在这里插入图片描述
    SQL/MR 作业一般会生成 MapReduce 任务,在 MaxCompute 中则会生成 MaxCompute Instance ,通过唯一ID 进行标识。

    ● Fuxi Job :对于 MaxCompute Instance ,则会生成一个或多个由Fuxi Task 组成的有向无环图,即 Fuxi Job MaxCompute Instance 和Fuxi Job 类似于 Hive Job 的概念。
    ● Fuxi Task :主要包含三种类型,分别是 Map Reduce Join类似于 Hive Task 的概念。
    ● Fuxi Instance 真正的计算单元,和 Hive 中的概念类似,一般和槽位( slot )对应。

    13.2.1 Map 倾斜

    1. 背景
    Map 端是 MR 任务的起始阶段, Map 端的主要功能是从磁盘中将数据读人内存, Map 端的两个主要过程如图 13.5 所示。在这里插入图片描述
    ● 每个输入分片会让一个 Map Instance 来处理,在默认情况下,以Pangu 文件系统的一个文件块的大小(默认为 256MB )为一个分片。 Map Instance 输出的结果会暂时放在一个环形内存缓冲区中,当该缓冲区快要溢出时会在本地文件系统中创建一个溢出文件,即Write Dump Map 读数据阶段,可以通过“ set odps.mapper.split.size=256 ”来调节 Map Instance 的个数,提高数据读入的效率,同时也可以通过“ set odps.mapper.merge. limit.size=64 ”来控制 Map Instance 读取文件的个数。如果输数据的文件大小差异比较大,那么每个 Map Instance 读取的数据量和读取时间差异也会很大。

    ● 在写人磁盘之前,线程首先根据 Reduce Instance 的个数划分分区,数据将会根据 Key Hash 到不同的分区上,一个 ReduceInstance 对应一个分区的数据。 Map 端也会做部分聚合操作,以减少输入 Reduce 端的数据量。由于数据是根据 Hash 分配的,因此也会导致有些 Reduce Instance 会分配到大量数据,而有些Reduce Instance 却分配到很少数据,甚至没有分配到数据。

    在Map 端读数据时,由于读入数据的文件大小分布不均匀,因此会导致有些 Map Instance 读取并且处理的数据特别多,而有些 MapInstance 处理的数据特别少,造成 Map 端长尾。以下两种情况可能会导Map 端长尾:

    ● 上游表文件的大小特别不均匀,并且小文件特别多,导致当前表Map 端读取的数据分布不均匀,引起长尾。
    ● Map 端做聚合时,由于某些 Map Instance 读取文件的某个值特别多而引起长尾,主要是指 Count Distinct 操作

    2 . 方案
    第一种情况导致的 Map 端长尾,可通过对上游合并小文件,同时调节本节点的小文件的参数来进行优化,即通过设置“ set odps.sql. mapper.merge.limit.size 64 ”和“ set odps .s ql.mapper.s plit.size=256两个参数来调节,其中第一个参数用于调节 Map 任务的 Map Instance的个数:第二个参数用于调节单个 Map Instance 读取的小文件个数,防止由于小文件过多导致 Map Instance 读取的数据量很不均匀;两个参数配合调整。下面主要讨论第二种情况的处理方式。

    如下代码的作用是获取手机 APP 日志明细中的前一个页面的页面组信息,其中pre_page 是前一个页面的页面标识, page_ut 表是存储的手机APP 的页面组, pre_page 只能通过匹配正则或者所属的页面组信息,进行笛卡儿积 Join。

    原始代码:
    在这里插入图片描述
    运行上述代码, LogView 日志如图 13.6 所示。
    在这里插入图片描述
    Ll_Stg4 MapJoin 表的分发阶段: M3_Stg 是读取明细日志表Map 阶段,与 MapJoin 小表的 Join 操作也发生在这个阶段: R5 Stg2是进行分组排序的阶段。

    通过日志发现, M3_Stgl 阶段单个 Instance 的处理时间达到了 40分钟,而且长尾的 Instance 个数比较固定,应是不同的 Map 读入的文件块分布不均匀导致的,文件块大的 Map 数据量比较大,在与小表进行笛卡儿积操作时,非常耗时,造成 Map 端长尾。针对这种情况,可以使用“ distribute by rand ()”来打乱数据分布,使数据尽可能分布均匀。
    修改后代码如下:
    在这里插入图片描述
    执行上述代码, LogView 志如图 13 .7 所示。
    在这里插入图片描述
    通过“distribute by rand ()”会将 Map 端分发后的数据重新按照随机值再进行一次分发。原先不加随机分发函数时, Map 阶段需要与使用MapJoin 的小表进行笛卡儿积操作, Map 端完成了大小表的分发和笛卡儿积操作。使用随机分布函数后, Map 端只负责数据的分发,不再有复杂的聚合或者笛卡儿积操作,因此不会导致 Map 端长尾。

    3. 思考
    Map 长尾的根本原因是由于读入的文件块的数据分布不均匀,再加上UDF 函数性能、 Join 、聚合操作等,导致读人数据量大的 Maplnstance 耗时较长。在开发过程中如果遇到 Map 端长尾的情况,首先考虑如何让 Map Instance 读取的数据量足够均匀,然后判断是哪些操作导Map Instance 比较慢 ,最后考虑这些操作是否必须在 Map 完成在其他阶段是否会做得更好。

    13.2.2 Join 倾斜

    1.背景
    Join 操作需要参与 Map Reduce 的整个阶段。首先通过一段 JoinSQL 来看整个 Map Reduce 阶段的执行过程以及数据的变化,进而对 Join 的执行原理有所了解。
    假设有下面一段 Join SQL:
    在这里插入图片描述
    通过上面的执行过程可以看出, MaxCompute SQL Join 执行阶段会将 Join Key 相同的数据分发到同一个执行 Instance 上处理 。如果某个Key 上的数据量比较大,则会导致该 Instance 执行时间较长。其表现为:在执行日志中该 Join Task 的大部分 Instance 都已执行完成,但少数几Instance 一直处于执行中(这种现象称之为长尾)。在这里插入图片描述
    因为数据倾斜导致长尾的现象比较普遍,严重影响任务的执行时间,尤其是在“双 l l ”等大型活动期间,长尾程度比平时更严重。比如某些大型店铺的 PV 远远超过一般店铺的 PV ,当用浏览日志数据和卖家维表关联时,会按照卖家 ID 进行分发,导致某些大卖家所在Instance 处理的数据量远远超过其他 Instance ,而整个任务会因为这个长尾的 Instance 迟迟无法结束。

    本文的目的就是对 MaxCompute SQL 执行中 Join 阶段的数据倾斜情况进行分析总结,根据不同的倾斜原因给出对应的解决方案。这里主要讲述三种常见的倾斜场景。

    • Join 的某路输入比较小,可以采用 MapJoin ,避免分发引起长尾。
    • Join 的每路输入都较大,且长尾是空值导致的,可以将空值处理成随机值,避免聚集。
    • Join 的每路输入都较大,且长尾是热点值导致的,可以对热点值和非热点值分别进行处理,再合并数据。

    下面会针对这三种场景给出具体的解决方案。首先我们了解 下如何确认 Join 是否发生数据倾斜。
    打开 MaxCompute SQL 执行时产生的 LogView 日志,点开日志会看到每个 Fuxi Task 的详细执行信息,如图 13.9 所示。在这里插入图片描述
    从图 13.9 中可以看到每一个 Map Join、 Reduce的Fuxi Task 任务,点击其中一个 Join 任务,可以看到有 115 Instance 长尾:再点击Std Out ,可以查看 Instance 读人的数据量,如图 13.10 所示。在这里插入图片描述
    图13. 10 中显示 Join 路输入读取的数据量是 1389941257 行。如果 Long-Tails Instance 读人的数据量远超过其他 Instance 读取的数据量,则表示某个 Instance 处理的数据量超大导致长尾。
    2. 方案
    针对上面提到的三种倾斜场景,给出以下三种对应的解决方案。
    (1) MapJoin 方案
    Join 倾斜时,如果某路输入比较小,则可以采用 MapJoin 避免倾斜MapJoin 的原理是将 Join 操作提前到 Map 端执行, 将小表读人内存,
    顺序扫描大表完成 Join 。这样可以避免因为分发 key 不均匀导致数据倾斜。但是 MapJoin 的使用有限制,必须是 Join 中的从表比较小才可用。所谓从表,即左外连接中的右表,或者右外连接中的左表。

    MapJoin 的使用方法非常简单,在代码中 select 后加上“/*mapjoin(a) */”即可,其中a代表小表(或者子查询)的别名。现在Max Compute 已经可以自动选择是否使用 MapJoin ,可以不使用显式Hint 。例如:在这里插入图片描述
    另外,使用 MapJoin 时对小表的大小有限制,默认小表读人内存后的大小不能超过 512MB ,但是用户可以通过设置“ set odps.sql.mapjoin. memory.max=2048 ”加大内存,最大为 2048MB

    (2) Join 因为空值导致长尾
    另外,数据表中经常出现空值的数据,如果关联 key 为空值且数据量比较大, join 时就会因为空值的聚集导致长尾 ,针对这种情况可以将空值处理成随机值。因为空值无法关联上,只是分发到一处 因此处理成随机值既不会影响关联结果,也能很好地避免聚焦导致长尾。例如
    在这里插入图片描述
    ( 3) Join 因为热点值导致长尾
    如果是因为热点值导致的长尾,并且 Join 的输入比较大无法使用MapJoin ,则可以先将热点 key 取出,对于主表数据用热点 key 切分成热点数据和非热点数据两部分分别处理,最后合并。这里以淘宝的 PV日志表关联商品维表取商品属性为例进行介绍。
    ● 取热点 key :将 PV 大于 50000 的商品 ID 取出到临时表中。
    在这里插入图片描述
    ● 取出非热点数据。
    将主表( pv 表)和热点 key 表( topk_item 表)外关联后,通过条件“ bl .item_id is null "取关联不到的数据即非热点商品的日志数据 ,此时需要使用 MapJoin 。再用非热点数据关联商品维表,因为已经排除了热点数据,所以不会存在长尾。在这里插入图片描述
    ● 取出热点数据。
    将主表( pv 表)和热点 key 表( topk_item 表)内关联,此时需要使用 MapJoin ,取到热点商品的日志数据。同时,需要将商品维表( item表)和热点 key topk item )内联,取到热点商品的维表数据然后将第一部分数据外联第二部分数据,因为第二部分数据只有热点商品的维表,数据量小,可以使用 MapJoin 避免长尾。

    ●将上面取到的非热点数据和热点数据通过“union all ”合并后即得到完整的日志数据,并且关联了商品信息。针对倾斜问题, MaxCompute 系统也提供了专门的参数用来解决长尾问题,如下所示。
    ·开启功能

    set odps.sql .skewjoin=true/false

    ·设置倾斜的 key 及对应的值:

    set odps . sql . skewinfo=skewed_src: (skewed_key) [(” skewed value " )]

    其中 skewe _key 代表倾斜的列, skewed_value 代表倾斜列上的倾斜值。
    设置参数的好处很明显,简单方便 坏处是如果倾斜值发生变化需要修改代码 而且一般无法提前知道变化。另外,如果倾斜值比较多也不方便在参数中设置。需要根据实际情况选择拆分代码或者设置参数。
    3.思考
    当大表和大表 Join 因为热点值发生倾斜时,虽然可以通过修改代码来解决,但是修改起来很麻烦,代码改动也很大,且影响阅读。而Max Compute 现有的参数设置使用不够灵活,倾斜值多的时候不可能将所有值都列在参数中,且倾斜值可能经常变动。因此,我们还一直在探索和思考,期望有更好的、更智能的解决方案,如生成统计信息,Max Compute 内部根据统计信息来自动生成解决倾斜的代码,避免投过多的人力。

    13.2.3 Reduce 倾斜

    1.背景
    Reduce 端负责的是对 Map 端梳理后的有 key-value 键值对进行聚合,即进行 Count Sum Avg 等聚合操作,得到最终聚合的结果。
    Distinct是MaxCompute SQL 中支持的语法,用于对字段去重。比如计算在某个时间段内支付买家数、访问 UV 等,都是需要用 Distinct进行去重的。 MaxCompute Distinct 的执行原理是将需要去重的字段以及 Gro up By 宇段联合作为 key 将数据分发到 Reduce 端。

    因为 Distinct 操作,数据无法在 Map 端的 Shuffle 阶段根据 Group By 先做一次聚合操作,以减少传输的数据量,而是将所有的数据都传输到Reduce 端,当 key 的数据分发不均匀时,就会导致 Reduce 端长尾。

    Reduce 端产生长尾的主要原因就是 key 的数据分布不均匀。比如有些 Reduce 任务 Instance 处理的数据记录多,有些处理的数据记录少,造成 Reduce 端长尾 。如下几种情况会造成 Reduce 端长尾:
    ● 对同一个表按照维度对不同的列进行 Count Distinct 操作,造成Map 端数据膨胀,从而使得下游的 Join Reduce 出现链路上的长尾。
    ● Map 端直接做聚合时出现 key 值分布不均匀,造成 Reduce 端长尾。
    ● 动态分区数过多时可能造成小文件过多,从而引起 Reduce 端长尾。
    ● 多个 Distinct 同时出现在 SQL 代码中时,数据会被分发多次,不仅会造成数据膨胀N倍,还会把长尾现象放大N倍。

    2. 方案
    对于上面提到的第二种情况,可以对热点 key 进行单独处理,然后通过“union All ”合并。这种解决方案已经在“Join 倾斜”一节中介绍过。

    对于上面提到的第三种情况,可以把符合不同条件的数据放到不同的分区,避免通过多次“Insert Overwrite”写人表中,特别是分区数比较多时,能够很好地简化代码。但是动态分区也有可能会带来小文件过多的困扰。以最简 SQL 为例

    INSERT OVERWRITE TABLE part test PARTITION(ds) 
    SELECT * 
    FROM part test;
    
    • 1
    • 2
    • 3

    假设有K个Map Instance,N个目标分区:
    那么在最坏的情况下,可能产生 KxN 个小文件,而过多的小文件会对文件系统造成巨大的管理压力,因此 MaxCompute 对动态分区的处理是引人额外一级的 Reduce Task ,把相同的目标分区交由同一个(或少量几个) Reduce Instance 来写人,避免小文件过多,并且这个 Reduce肯定是最后一个 Reduce Task 操作。 MaxCompute 是默认开启这个功能的,也就是将下面参数设置为 true

    set odps.sql reshuffle.dynam cpt=true

    设置这个参数引人额外一级的 Reduce Task 的初衷是为了解决小件过多的问题,那么如果目标分区数比较少,根本就不会造成小文件过多,这时候默认开启这个功能不仅浪费了计算资源,而且还降低了性能。因此,在此种情况下关闭这个功能:

    set odps.sql . reshuffle .dynamicpt=false

    上面几种情况相对比较简单,这里重点介绍第四种情况。
    如图 13.1 所示这段代码是在7天、 30 天等时间范围内,分PC 端、无线端、所有终端,计算支付买家数和支付商品数,其中支付买家数和支付商品数指标需要去重。因为需要根据日期、终端等多种条件组合对买家和商品进行去重计算,因此有 12 Count Distinct 计算。在计算过程中会根据 12 个组合 key 分发数据来统计支付买家数和支付商品数。这样做使得节点运行效率变低。在这里插入图片描述
    如图 13.12 所示是该代码的运行 LogView 日志,节点运行时长为lh14min ,数据膨胀。
    针对上面的问题,可以先分别进行查询,执行 Gro up By 原表粒度+ buyer_id ,计算出 PC 端、无线端、所有终端以及在7天、 30 天等统计口径下的 buyer_id (这里可以理解为买家支付的次数),然后在子查询外 Group By 原表粒度,当上一步的 Count 值大于1时,说明这一买家在这个统计口径下有过支付 ,计入支付买家数,否则不计入。计算支付商品数采用同样的处理方式。最后对支付商品数和支付买家数进行Join 操作。
    在这里插入图片描述
    按上述方案修改后的代码如下(仅示例支付买家数的计算):
    在这里插入图片描述
    经测试,修改后的运行时间为 13min 后的效果还是非常明显的。整体运行的LogView 日志 13.13 所示,可以看到和 Count Distinct 计算方式相比数据没有膨胀,约为原方式的 1/ 10.
    在这里插入图片描述
    3.思考
    对 Multi DIstinct 的思考如下:
    ●上述方案中如果出现多个需要去重的指标,那么在把不同指Join在一起之前, 一定要确保指标的粒度是原始表的数据粒度。比如支付买家数和支付商品数,在子查询中指标粒度分别是:原始表的数据粒度+ buyer_id 和原始表的数据粒度 item_id ,这时两个指标不是同一数据粒度,所以不能 Join ,需要再套一层代码,分别把指标 Group By 到“原始表的数据粒度”,然后再进行 Join操作。

    ● 修改前 Multi Distinct 代码的可读性比较强,代码简洁,便于维护;修改后的代码较为复杂。当出现的 Distinct 个数不多、表的数据量也不是很大、表的数据分布较均匀时,不使用 MultiDistinct 的计算效果也是可以接受的。所以,在性能和代码简洁、可维护之间需要根据具体情况进行权衡。另外,这种代码改动还是比较大的,需要投入一定的时间成本,因此可以考虑做成自动化,通过检测代码、优化代码自动生成将会更加方便。

    ● 当代码比较膝肿时,也可以将上述子查询落到中间表里,这样数据模型更合理、复用性更强、层次更清晰。当需要去除类似的多Distinct 时,也可以查一下是否有更细粒度的表可用,避免重复计算。

    目前 Reduce 端数据倾斜很多是由 Count Distinct 问题引起的,因ETL 开发工作中应该予以重视 Count Distinct 问题,避免数据膨胀。对于一些表的 Join 阶段的 Null 值问题,应该对表的数据分布要有清楚的认识,在开发时解决这个问题。

  • 相关阅读:
    vue使用xlsx、xlsx-style和fileSaver导出excel表格
    HTTPS原理(证书验证+数据传输)
    LeetCode 练习——剑指 Offer 66. 构建乘积数组
    数据结构-作业11
    11.springboot监控
    MQTT基础--MQTT 客户端和代理以及 MQTT 服务器和连接建立说明:第 3 部分
    2022牛客多校六 M-Z-Game on grid(动态规划)
    Python问题:树的镜面映射
    剑指 Offer II 098. 路径的数目 / 剑指 Offer II 099. 最小路径之和
    船用低速发动机缸压在线监测系统
  • 原文地址:https://blog.csdn.net/weixin_43850384/article/details/125796447