本文档介绍的各种flink sql的语法基于flink-1.13.x
,flink版本低于1.13.x
的用户,在sql运行出错误时,需要自行去flink官网查看对应版本的语法支持。
另外,flink新版本支持的语法,文档中会进行特殊标注,说明对应语法在 flink 哪个版本开始支持,但凡是没有特殊标注的,均支持flink-1.13.x
及以上版本。
以下所有的配置均可通过 flink sql 的SET
语句进行设置。
以下选项可用于调优查询执行的性能。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
table.exec.async-lookup.buffer-capacity Batch Streaming | 100 | Integer | 异步查找join触发的最大异步i/o操作的数量。 |
table.exec.async-lookup.timeout Batch Streaming | 3 min | Duration | 异步操作完成的异步超时时间。 |
1.15.x开始支持 table.exec.deduplicate.insert-update-after-sensitive-enabled Streaming | true | Boolean | 设置任务(尤其是 sink )是否对 INSERT 消息和 UPDATE_AFTER 消息敏感。如果为 false,有时(比如删除最后一行)flink 会将第一行消息设置为 UPDATE_AFTER 而不是 INSERT 。 |
1.15.x开始支持 table.exec.deduplicate.mini-batch.compact-changes-enabled Streaming | false | Boolean | 设置在开启 mini-batch 时,是否压缩发送到下游的数据变更。 如果设置为 true,flink 将会压缩更改,并且只发送最新的变更到下游。注意,如果下游需要每个版本的数据,该优化将会失效。 如果设置为 false,在没有开启 mini-batch 时,flink 将会发送所有的变更到下游。 |
table.exec.disabled-operators Batch | (none) | String | 主要为了测试。用逗号分隔的算子名称列表,买个名称代表一类禁止操作的算子。可以被禁止的算子包括:“NestedLoopJoin”, “ShuffleHashJoin”, “BroadcastHashJoin”, “SortMergeJoin”, “HashAgg”, “SortAgg”。默认不禁止任何算子。 |
1.15.x开始支持 table.exec.legacy-cast-behaviour Batch Streaming | DISABLED | 枚举 可用值: ENABLED DISABLED | 确定 CAST 操作是按照之前的方式执行,还是按照新的修复了很多问题,并且改进过的新方式进行。 可选值有: ENABLED:CAST 将会按照之前的方式执行。 DISABLED:CAST 将会按照新的正确的方式执行。 |
table.exec.mini-batch.allow-latency Streaming | 0 ms | Duration | MiniBatch 缓存输入数据的最大延迟时间。MiniBatch 可以优化数据缓存,以减少state状态访问。MiniBatch 在允许的时间间隔内收到最大的缓存数据量时触发。注意:如果table.exec.mini-batch.enabled设置为true,该值必须大于0。 |
table.exec.mini-batch.enabled Streaming | false | Boolean | 是否开启MiniBatch 优化。MiniBatch 可以优化数据缓存,以减少state状态访问。默认false禁用。可以设置为true来开启。注意:如果开启mini-batch,则必须设置’table.exec.mini-batch.allow-latency’和’table.exec.mini-batch.size’。 |
table.exec.mini-batch.size Streaming | -1 | Long | MiniBatch 可以缓存的最大输入数据数量。MiniBatch 可以优化数据缓存,以减少state状态访问。MiniBatch 在允许的时间间隔内收到最大的缓存数据量时触发。注意:如果table.exec.mini-batch.enabled设置为true,该值必须为正值。 |
1.15.x开始支持 table.exec.rank.topn-cache-size Streaming | 10000 | Long | Rank 操作会缓存部分状态数据以减少对状态数据的访问。设置的缓存大小为每个 rank 任务中数据的数量。 |
table.exec.resource.default-parallelism Batch Streaming | -1 | Integer | 设置所有算子的默认并行度(比如aggregate,join,filter)。该配置的优先级高于StreamExecutionEnvironment (实际上,该配置会覆盖StreamExecutionEnvironment 设置的并行度)。-1表示不设置默认并行度,然后使用StreamExecutionEnvironment设置的并行度。 |
只支持1.13.x版本,在1.14.x中被删除 table.exec.shuffle-mode Batch | ALL_EDGES_BLOCKING | String | 设置执行的shuffle模式。 可用值有: ALL_EDGES_BLOCKING: 所有edges使用阻塞shuffle. FORWARD_EDGES_PIPELINED: Forward edges will use pipelined shuffle, others blocking. POINTWISE_EDGES_PIPELINED: Pointwise edges will use pipelined shuffle, others blocking. Pointwise edges include forward and rescale edges. ALL_EDGES_PIPELINED: All edges will use pipelined shuffle. batch: 和 ALL_EDGES_BLOCKING 一样,过期值。 pipelined:和 ALL_EDGES_PIPELINED 一样,过期值。 注意:阻塞shuffle表好似数据在被发送到消费者之前,将会全部产生,pipelined shuffle表示数据一旦产生,就会立即被发送给消费者。 |
1.15.x开始支持 table.exec.simplify-operator-name-enabled Batch Streaming | true | Boolean | 当设置为 true 时,优化器将使用 id 和 ExecNode 的返回类型来简化展示算子的名称,并且将详细信息放到描述中。默认值为 true。 |
1.15.x开始支持 table.exec.sink.keyed-shuffle Streaming | AUTO | 枚举 可用值: NONE AUTO FORCE | 为了最小化很多用户在向有主键的表中写入数据时遇到的分布式乱序问题,在 sink 的并行和上游算子的并行度不同,并且上游为只追加类型时, flink 会自动增加一个默认的 keyed shuffle。 该优化只会在上游能够确保多个记录主键有序时生效,否则,增加的 shuffle 并不能解决问题(这种情况下,更有效的做法是刚开始就在 source 端对数据执行去重操作,或者是使用定义了主键的 upsert source ,以此来影响数据的评估) 默认情况下, keyed shuffle 将会在 sink 的并行度和上游并行度不同时被添加,也可以设置不使用 shuffle(NONE),或者是强制使用 shuffle(FORCE)。 可用值如下:NONE、AUTO、FORCE。 |
table.exec.sink.not-null-enforcer Batch Streaming | ERROR | 枚举 可用值: ERROR DROP | 决定当 NOT NULL 字段遇到 null 值时,flink 怎么处理。 可用值: ERROR:NOT NULL 字段遇到 null 值时抛出运行时异常。 DROP:NOT NULL 字段遇到 null 值时直接丢弃数据。 |
1.15.x开始支持 table.exec.sink.type-length-enforcer Batch Streaming | IGNORE | 枚举 可用值: IGNORE TRIM_PAD | 是否截取 CHAR(length)/VARCHAR(length)/BINARY(length)/VARBINARY(length) 或填充 CHAR(length)/BINARY(length) 类型字段的值,以此来让他们的长度和 CHAR/VARCHAR/BINARY/VARBINARY 类型字段定义的长度一样。 可选值有: IGNORE:不截取或填充,忽略 CHAR/VARCHAR/BINARY/VARBINARY 长度的定义。 TRIM_PAD:截取并填充字符串和 binary 值,以匹配 CHAR/VARCHAR/BINARY/VARBINARY 类型定义的长度 |
table.exec.sink.upsert-materialize Streaming | AUTO | 枚举 可用值: NONE AUTO FORCE | 由于分布式系统中的 shuffle 会造成 ChalgeLog 数据的乱序,所以 sink 接收到的数据可能在全局的 upsert 中乱序,所以要在 upsert sink 之前添加一个 upsert 物化算子。该算子接收上游 changelog 数据,并且给下游生成一个 upsert 视图。 默认情况下,在唯一 key 遇到分布式乱序时,该物化算子会被添加,也可以选择不物化(NONE),或者是强制物化(FORCE)。 可选值有:NONE、AUTO、FORCE。 |
table.exec.sort.async-merge-enabled Batch | true | Boolean | 是否异步合并排序的溢出文件。 |
table.exec.sort.default-limit Batch | -1 | Integer | 在使用order by语句后,用户没有使用limit语句,则默认使用该设置limit值。-1表示忽略该限制。 |
table.exec.sort.max-num-file-handles Batch | 128 | Integer | 外部归并排序的最大扇入文件数。该配置限制每个算子操作的文件数量。如果该值设置过小,可能会导致中间合并。但是如果设置过大,则会导致被同时打开的文件数太多,占用内存,并导致随机读取。 |
table.exec.source.cdc-events-duplicate Streaming | false | Boolean | 指定任务中的CDC(更改数据获取)source产生重复更改事件时,框架是否需要进行去重,获取一致性结果。CDC source会产生所有的更改事件,包括:INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE。比如:kafka source使用Debezium 格式化。该配置默认值为false。 然而,有重复更改事件是一种常见的情况。因为CDC工具(比如Debezium),在遇到失败时,会使用至少一次语义,因此,在异常情况下,Debezium 会交付重复的更改事件到kafka,然后flink将获取到重复的时间。这可能会导致flink查询产生错误的结果,或者是不期望遇到的异常。 因此,如果CDC工具设置的至少一次语义,则要求更改此配置。开启该配置要求CDC cource定义PRIMARY KEY主键。主键将用于对更改事件去重,并且生成有状态的changelog流。 |
table.exec.source.idle-timeout Streaming | 0 ms | Duration | 当一个source在超时时间内没有接收到任何数据时,它将被标记为临时空闲。这允许下游任务在其空闲时不需要等待来自该source的水印而发送其水印。缺省值为0,表示不开启source空闲检测。 |
table.exec.spill-compression.block-size Batch | 64 kb | MemorySize | 溢出数据时用于压缩的内存大小。内存越大,压缩比越高,但是作业消耗的内存资源也更多。 |
table.exec.spill-compression.enabled Batch | true | Boolean | 是否压缩溢出数据。目前,我们只支持对sort、hash-agg和hash-join算子压缩溢出数据。 |
table.exec.window-agg.buffer-size-limit Batch | 100000 | Integer | 设置group window agg算子中使用的窗口元素缓冲区大小限制。 |
以下配置可用于调整查询优化器,以获得更好的执行计划。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
table.optimizer.agg-phase-strategy Batch Streaming | AUTO | String | AUTO:不指定聚合策略。根据情况选择两阶段聚合或者是一阶段聚合。 TWO_PHASE:指定使用两阶段聚合,两阶段包括:localAggregate和globalAggregate。如果聚合不支持两阶段聚合优化,则会采用一阶段聚合。 ONE_PHASE:指定使用一阶段聚合,只包括:CompleteGlobalAggregate。 |
table.optimizer.distinct-agg.split.bucket-num Streaming | 1024 | Integer | 配置切分distinct聚合时的bucket桶的总数。该数字用于第一阶段聚合,其用来通过“hash_code(distinct_key)%BUCKET_NUM”计算出额外的分组key,以将数据打散到不同子任务。 |
table.optimizer.join-reorder-enabled Batch Streaming | false | Boolean | 在优化器中启用join重新排序。默认为禁用。 |
table.optimizer.multiple-input-enabled Batch | true | Boolean | 当设置为true时,优化器将会合并pipelined shuff输入为多个算子,以减少shuff,优化性能。默认值为true。 |
table.optimizer.reuse-source-enabled Batch Streaming | true | Boolean | 当设置为true时,优化器将尝试发现重复的表source,然后重用他们。要启用该设置,必须设置table.optimizer.reuse-sub-plan-enabled为true。 |
table.optimizer.reuse-sub-plan-enabled Batch Streaming | true | Boolean | 当设置为true时,优化器将尝试发现重复的子任务,然后重用他们。 |
1.15.x开始支持 table.optimizer.source.aggregate-pushdown-enabled Batch | true | Boolean | 当设置为 true 时,如果 TableSource 实现了 SupportsAggregatePushDown ,优化器就会将本地聚合进行下推。 |
table.optimizer.source.predicate-pushdown-enabled Batch Streaming | true | Boolean | 当设置为true时,优化器将谓词下推为 FilterableTableSource,默认为true。 |
以下选项可用于调整表计划器的行为。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
1.15.x开始支持 table.builtin-catalog-name Batch Streaming | default_catalog | String | 指定实例化 TableEnvironment 对象时初始化的 catalog 名称。 |
1.15.x开始支持 table.builtin-database-name Batch Streaming | default_database | String | 指定实例化 TableEnvironment 对象时在初始化 catalog 中的默认数据库名称。 |
table.dml-sync Batch Streaming | false | Boolean | 指定DML任务(比如插入操作)为异步/同步执行。默认为异步执行,因此可以同时提交多个DML任务。如果设置为true,则插入操作会等待任务完成才会结束。 |
table.dynamic-table-options.enabled Batch Streaming | 1.13.x:false 1.14.x:true | Boolean | 是否启用用于动态表的OPTIONS提示,如果禁用,则指定OPTIONS之后会抛出异常。 |
table.generated-code.max-length Batch Streaming | 1.13.x:64000 1.14.x:4000 | Integer | 1.13.x:指定一个阈值,将生成的代码拆分为子函数调用。Java的最大方法长度为64kb。如果有必要,则可以通过该参数设置更细的粒度。 1.14.x:指定一个阈值,将生成的代码拆分为子函数调用。Java的最大方法长度为64kb。如果有必要,则可以通过该参数设置更细的粒度。默认值为 4000 ,代替 64KB,JIT 会拒绝代码超过 8K 字节的方法执行。 |
table.local-time-zone Batch Streaming | default | String | 定义当前会话的本地时间时区id。该值用于转化或转化为TIMESTAMP WITH LOCAL TIME ZONE时间类型。在内部实现中,timestamps with local time zone通常表示UTC时区(0时区)。然而,当将该类型转化为不包含时区的数据类型(比如TIMESTAMP、TIME、简单的STRING)时,将会用到会话时区设置。该值可以使用完全的名称(比如:“America/Los_Angeles”),也可以使用自定义的时区ID(比如:“GMT+08:00”)。 |
只支持1.13.x版本,在1.14.x中被删除 table.planner Batch Streaming | BLINK | 枚举 可用值: BLINK、OLD | 使用“blink”或“old”计划器,默认为blink计划器。对于TableEnvironment来说,该设置用于创建TableEnvironment对象,而且对象创建之后无法修改该设置。注意:old计划器将会在flink 1.14版本中移除,因此该配置将会被废弃。 |
1.15.x开始支持 table.plan.compile.catalog-objects Batch Streaming | ALL | 枚举 可用值: ALL SCHEMA IDENTIFIER | 指定计划器编译时存储 catalog 对象,比如:表、函数、数据类型等的策略,该策略会影响在恢复算子期间需要提供的 catalog 元数据,并且影响计划占用空间的大小。 该配置选项不会影响匿名、内置或临时对象。如果可能的话,匿名或内置对象将会被完全持久化,包括元数据和选项配置,否则会编译失败。临时对象只会持久化他们的标识符,并且在恢复时提供给会话上下文。 可选值: ALL:所有的元数据,包括 catalog 表、函数或数据类型,都会在计划编译期间被持久化到计划中。对于 catalog 中的表来说,包括表的标识符、schema信息和选项配置。对于 catalog 中的函数,包括函数的标识符和 class 类。对于 catalog 中的数据类型,包括标识符和整个数据结构。使用该策略,在恢复算子期间,catalog 的元数据不必再可用。 SCHEMA:除了标识符之外,catalog 表、函数或数据类型的SCHEMA信息将在编译期间持久化到计划中。SCHEMA 允许在计划恢复算子期间检测目录中的不兼容更改。但是,所有其他元数据仍将从 catalog 中检索。 IDENTIFIER:只有 catalog 表、函数或数据类型会在编译期间被持久化到计划中。在算子恢复期间,所有的元数据都需要从 catalog 中检索。使用该策略,计划将会有更少的信息冗余。 |
1.15.x开始支持 table.plan.force-recompile Streaming | false | Boolean | 当该值为 false 时, 如果计划输出文件已经存在,则 COMPILE PLAN 语句会运行失败,除非使用了 IF NOT EXISTS 。当该值为 true 时,COMPILE PLAN 会覆盖已存在的计划输出文件。我们建议只在 debug 时启用该特性。 |
1.15.x开始支持 table.plan.restore.catalog-objects Batch Streaming | All | 枚举 可用值: ALL ALL_ENFORCED IDENTIFIER | 指定通过给定的计划恢复 catalog 对象,比如表、函数或者是数据类型,并在必要时执行 catalog 查找的策略。他会影响提供 catalog 的需要,并且充实部分计划信息。 可用值: ALL:读取所有持久化到计划中的元数据,包括 catalog 表、函数和数据类型。该策略会根据标识符从 catalog 中查找,以填充缺失的信息或丰富可变选项。当原始对象在 catalog 中不可用时,如果计划中存在所有必要的信息, pipeline 仍然可以恢复。 ALL_ENFORCED:要求所有已经持久化到计划中的元数据,包括 catalog 表、函数和数据类型。该策略既不会根据标识符在 catalog 中查找,也不会使用 catalog 信息充实可变选项。如果计划中没有包含所有必要的信息,恢复就会失败。 IDENTIFIER:只使用 catalog 表、函数或数据类型的标识符,并始终在 catalog 中查找。如果原始对象在目录中不再可用,恢复就会失败。计划中可能包含的其他元数据将被忽略。 |
table.sql-dialect Batch Streaming | default | String | 定义转化SQL查询的方言。不同的方言支持不同的SQL语法,目前支持default和hive方言。 |
SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。
此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。但是并不是所有的优化都是默认启用的,所以对于某些查询任务,可以通过开启一些配置来提高性能。
下面我们将介绍一些有用的优化选项和流聚合的内部结构,这些配置在某些情况下会带来很大的性能优化。
下面提到的流聚合优化现在都支持分组聚合和窗口TVF聚合。
默认情况下,分组聚合算子会逐个处理输入记录,即:
state
状态读取累加器
累加/撤回
到累加器
累加器
写回状态
这种处理模式可能会增加StateBackend
的开销(特别是RocksDB StateBackend
)。此外,生产中常见的数据倾斜会使问题更加严重,使任务更容易处于反压状态。
MiniBatch
微批处理聚合的核心思想是将大量输入缓存到聚合算子内部的缓冲区中。当输入记录集合被触发进行处理时,每个key只需要访问一次状态。这可以显著减少状态开销并获得更好的吞吐量。
但这可能会增加一些延迟,因为它会先缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。
下图解释了MiniBatch处理聚合如何减少状态操作。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wVCFikOp-1659430842750)(E:\ProgramData\wps\WPS Cloud Files\笔记总结\flink\官网整理\flink-sql查询配置与性能优化参数详解.assets\mini-batch-16594307294771.png)]
解释:上面是一个记录读取一次状态,写入一次状态。下面是多个相同key的记录缓存之后,访问一次状态,写入一次状态。
默认情况下,分组聚合会禁用MiniBatch
优化。
为了启用此优化,需要设置table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency、table.exec.mini-batch.size。
详情请参阅查询配置页面。
无论上述配置如何,窗口TVF聚合始终启用MiniBatch优化。窗口TVF聚合缓冲区记录在托管内存中,而不是JVM堆中,因此没有过载GC或OOM问题的风险。
下面的示例展示如何启用这些选项。
set 'table.exec.mini-batch.enabled' = 'true'; -- 启用mini-batch
set 'table.exec.mini-batch.allow-latency' = '5 s'; -- 使用5s时间去缓存输入记录
set 'table.exec.mini-batch.size' = '5000'; -- 每个聚合算子任务最多可以缓存的最大记录数量
local-global
算法通过将分组聚合分为两个阶段来解决数据倾斜问题,即先在上游进行局部聚合,然后在下游进行全局聚合,类似于MapReduce
中的Combine + Reduce
模式。例如有以下SQL:
SELECT color, sum(id)
FROM T
GROUP BY color;
数据流中的记录可能是倾斜的,因此一些聚合算子的实例必须处理比其他实例多得多的记录,这就导致了热点问题。
本地聚合可以在上游先将具有相同键的一定数量的输入积累到单个累加器中,全局聚合将只接收少量的累加器,而不是大量的原始输入。
这可以显著降低网络shuffle和状态访问的成本。本地聚合每次累积的输入记录数量基于微批聚合的时间间隔。这意味着本地聚合依赖于启用微批聚合。
下图显示本地-全局聚合如何提高性能。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iIH4asvf-1659430842752)(E:\ProgramData\wps\WPS Cloud Files\笔记总结\flink\官网整理\flink-sql查询配置与性能优化参数详解.assets\local-global-agg-16594307595215.png)]
解释:左边聚合,聚合算子会收集所有输入,因此上面的聚合算子收到很多原始记录,造成了热点问题。
右边聚合,上游的本地聚合会先将输入在进行和聚合算子相同的操作,将输入根据key来进行聚合,下游的聚合算子只需要接收上游本地聚合之后的累加器即可,因此可以显著减少下游聚合算子的输入数据量。
下面的示例说明如何启用本地-全局聚合。
set 'table.exec.mini-batch.enabled' = 'true'; -- 本地-全局聚合依赖于开启微批聚合
set 'table.exec.mini-batch.allow-latency' = '5 s'; -- 使用5s时间去缓存输入记录
set 'table.exec.mini-batch.size' = '5000'; -- 每个聚合算子任务最多可以缓存的最大记录数量
set 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE'; -- 启用两阶段聚合策略,比如:本地-全局聚合
本地-全局优化对于一般聚合(SUM
、COUNT
、MAX
、MIN
、AVG
)的数据倾斜是有效的,但在处理distinct聚合
时性能并不理想。
例如,如果我们想要分析今天有多少独立用户登录。我们可能会进行以下查询:
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day;
COUNT DISTINCT
不擅长于减少记录,如果DISTINCT
键(即user_id)的值是稀疏的,即使启用了本地-全局
优化,也没有多大帮助。
因为累加器仍然包含几乎所有的原始记录,全局聚合将成为瓶颈(大多数重量级累加器都由一个任务处理,即在同一天)。
切分distinct聚合优化
的思想是将不同的聚合(例如COUNT(distinct col))分解为两个层次。第一个聚合按分组键
和附加的bucket
总数进行shuffle
。
bucket键
使用HASH_CODE(distinct_key) % BUCKET_NUM
计算。默认情况下,BUCKET_NUM
是1024
,可以通过table.optimizer.distinct-agg.split.bucket-num
配置。
第二个聚合按原始分组键进行shuffle
,并使用SUM
聚合来自不同bucket
的COUNT DISTINCT
值。因为相同的distinct字段值只会在相同的bucket中计算,所以转换是等价的。
bucket键
作为一个额外的分组键,分担分组键中热点的负担。bucket键使任务具有可伸缩性,以解决distinct聚合中的数据倾斜/热点问题。
拆分不同的聚合后,上面的查询将被自动重写为下面的查询:
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day;
下图显示分割distinct聚合如何提高性能(假设颜色代表天数,字母代表user_id)。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0bVxklTy-1659430842753)(E:\ProgramData\wps\WPS Cloud Files\笔记总结\flink\官网整理\flink-sql查询配置与性能优化参数详解.assets\split-distinct-16594307777257.png)]
解释:左图聚合,本地聚合会先对相同键进行聚合,以减少数据量,全局聚合的一个算子也还是会收到所有他所应该聚合的所有同一天的累加器。
右图聚合,agg1
设置bucket
为4,然后将map
的输入值通过天的hash
和bucket
取余,放到不同的agg1
并行度,agg1
接收到数据后,进行聚合。
agg2
只需要接收每个agg1
里不同颜色中user_id
的数量即可(一个颜色中有两个user_id,就传递数字2),然后对接收到的数量进行累加即可。
注意:上例只是一个简单的示例。除此之外,Flink还支持分割更复杂的聚合查询,例如,多个distinct聚合
具有不同的distinct键
(例如COUNT(distinct a), SUM(distinct b)),
与其他非不同的聚合(例如SUM
, MAX
, MIN
, COUNT
)一起使用。
目前,分割优化不支持包含用户自定义的AggregateFunction
的聚合。
下面的示例演示如何启用分割distinct聚合优化。
set 'table.optimizer.distinct-agg.split.enabled' = 'true' -- 启用distinct聚合分割
在某些情况下,用户可能需要计算来自不同维度的UV(唯一访问者)的数量,例如来自Android的UV,来自iPhone的UV,来自Web的UV和总UV。很多用户会选择CASE WHEN
来实现这个需求,例如:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day;
建议使用FILTER语法
而不是CASE WHEN
。因为FILTER
更符合SQL标准,且能获得更大的性能优化。FILTER
是用于聚合函数的修饰符,用于限制聚合中使用的值。将上面的示例替换为FILTER
修饰符,如下所示:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
Flink SQL优化器可以识别相同distinct
键上的不同筛选器参数。例如,在上面的示例中,所有三个COUNT DISTINCT
都在user_id
列上。
这样,Flink就可以只使用一个共享状态实例而不是三个状态实例来减少状态访问次数和状态大小。在某些任务中可以获得显著的性能优化。