本文整理自网易互娱资深开发工程师、Apache Kyuubi Committer 林小铂的《基于 Kyuubi 实现分布式 Flink SQL 网关》分享,内容主要分为以下四部分:
- Kyuubi 是什么
- Kyuubi 架构设计
- Flink x Kyuubi 优势
- 未来展望
简单来说,Kyuubi 是一个分布式、多租户的 SQL 网关。相信大家先会想到的问题是,Flink 已经提供 SQL Gateway,为什么还需要 Kyuubi 呢?其实答案就是分布式和多租户两个关键词,两者相辅相成,Kyuubi 很多高级功能都是从分布式和多租户衍生而来的。


Kyuubi 在 2018 年立项,最初目的主要为解决 Hive 用户迁移到 Spark SQL (Spark Thrift Server) 的痛点,比如租户间隔离和服务稳定性的问题。2018-2020 年是 Kyuubi 的 0.x 版本阶段,这个时期主要定位为 Spark Thrift Server 的升级版,用户主要来自 Spark 社区。
在 Kyuubi 发展过程中,我们逐渐意识到 Kyuubi 解决的问题并不是 Spark 独有,而是不同引擎都会遇到的共性问题。于是 2021 年 Kyuubi 进行了架构升级,将通用解决方案抽象成通用的 Kyuubi Server,然后对接不同引擎逻辑则成为 Kyuubi Engine。这是 Kyuubi 的 1.x 版本阶段,定位演变为通用的分布式多租户 SQL 网关。
2021 年 9 月 Kyuubi 进入 Apache 孵化器,在 Flink 1.16 引入 SQL Gateway 后,Kyuubi Flink Engine 进行重构将底层改为基于 Flink SQL Gateway。
2023 年初 Kyuubi 从孵化器顺利毕业成为 Apache 顶级项目。

Kyuubi 的发展历程和开源大数据的计算引擎发展趋势是密不可分的。
业界最早的开源计算引擎是 MapReduce,这时编程 API 是 Java 不支持 SQL,所以也没有 SQL 网关一说。
在 2014 年左右,伴随着 SQL on Hadoop 潮流,Hive 代替 MapReduce 成为主流引擎。Hive 提供 Hive Server2 负责将 SQL 翻译成底层的 MapReduce,可以说是 SQL 网关的鼻祖。
后续 Spark 出现,其内存计算性能比起 Hive 大大提升,不少用户从 Hive 迁移至 Spark。Spark 提供了SparkSQL 编程 API,然后对应的 SQL 服务组件为 Spark Thrift Server。
在最近几年,Flink 在实时计算和流批一体上的优势又吸引不少用户从 Spark 或者 Hive 迁移过来。Flink 提供 FlinkSQL,对应的 SQL 服务组件为 Flink SQL Gateway。
在这几轮技术浪潮中,每个引擎有自己的 SQL 服务组件(我们可以统称他们为 SQL Gateway),但它们的功能其实并没有完全对齐,很多企业级的特性是缺失的,而 Kyuubi 的目标则是作为统一标准化的 SQL 网关来屏蔽掉这些差异。

上图是不同 SQL 网关的关键特性对比。
多租户包括认证和资源隔离两个关键标准。
水平拓展和高可用这两个特性通常和分布式的架构绑定,因此分布式架构的 Hive Server2 和 Kyuubi 都是支持的,而 Spark Thrift Server 和 Flink SQL Gateway 暂不支持。
SQL 网关可以接受一些低版本的 Client 的请求,但通常要求和集群的计算引擎的版本是保持一致的。这会导致升级集群版本的时候可能要开多个 SQL 网关的实例,Kyuubi 对这方面做了兼容处理。Kyuubi 的一个实例,是可以兼容多个版本的计算引擎。比如 Kyuubi 的最新版本 Flink Engine ,它兼容 Flink 1.16 到 1.18 三个版本。

Kyuubi 从部署架构上分为 Kyuubi Client、Kyuubi Server 和 Kyuubi Engine 三层:

在 Kyuubi 架构中 Client 与 Server、Server 与 Engine 之间的通信均有基于 Zookeeper 的服务发现和负载均衡,这意味着不同模块之间是非常松耦合的。在同一个 Zookeeper namespace 下,Client 可以连到任意一个 Server 上, Server 也能访问任意一个 Engine。
一个请求最终落到哪个 Engine 上由 Kyuubi Server 自动控制。Kyuubi Server 会根据多租户隔离策略的自动选择一个合适的 Engine,如果满足要求的 Engine 不存在,那么 Kyuubi Server 会即时启动一个。相对地,当一个 Engine 闲置一段时间后,Engine 会自动退出来节省资源,当然也可以通过 Kyuubi Server 主动关闭 Engine。可以说 Kyuubi Engine 是按需启动的,然后生命周期也是由 Server 管理。

那么 Kyuubi Server 如何决定是否启动一个新的 Engine 呢?这很大程度上是由 Kyuubi 引擎隔离级别决定的。
Kyuubi 提供 4 种引擎隔离级别:
在引擎隔离级别之上,Kyuubi 还提供细粒度的路由设定,也就是 Subdomain。比如一个用户可能希望用一个 Flink Engine 做实时计算,另外一个 Flink Engine 做 OLAP 查询。针对这种情况,Kyuubi 提供 subdomain 配置项。subdomain 允许在一个 namespace 再做划分。比如上述场景,用户可以分别在连接时指定 kyuubi.engine.share.level.subdomain=realtime 或 kyuubi.engine.share.level.subdomain=olap ,这样 Kyuubi Server 就会启动两个 Flink Engine。

大部分情况下 Kyuubi Server 是没有状态的,但也会有例外。例如当用户批量提交大量的 SQL 到 Kyuubi,Kyuubi Server 会将请求放入队列,但这个队列是在内存的,其他 Kyuubi Server 实例并不知道这批请求。如果用户前后两个请求被路由到不同的 Kyuubi Server,那么可能会产生状态不一致的问题。用户可能会说我刚刚提交了一批作业,为什么现在查不到。
为此,Kyuubi 针对不同场景进行了优化。首先取决于 Client 类型,如果是基于 JDBC,JDBC 是长连接比较稳定,所以一般不会出问题。但如果是基于 REST,那么都是 HTTP 短链接,负载均衡器很可能把请求路由到不同实例上。
Kyuubi 的解决方案是引入了数据库来持久化 Server 的状态,包括请求的内容、Server ID 和 Client ID。如果其他 Kyuubi Server 接收到不属于自己的请求,可以通过 REST Fronted 将请求转发给正确的 Kyuubi 实例。

这是 Kyuubi 的一个标准优势:水平拓展、多租户、高可用、多版本和动态资源配置。
值得一提的是,在以前主打实时计算的背景下,其实 Flink 对 SQL 网关的需求不是很高,通常我们可能单实例的 SQL 网关就能解决问题。
但是随着 Flink 往流批一体和 OLAP 的方向发展,我们对 SQL 网关的需求越来越高,无论是连接数还是请求量都有可能翻一个数量级,甚至两个数量级。那么这时 Kyuubi 分布式的解决方案就会成为一个必需品。

Kyuubi 的 Flink Application 模式其实和标准的 Application 模式是有一定差别的。Flink 是提供了四种部署的模式,可以分成两类:
Session 和 Standalone 模式。这类是单集群可以运行多作业,但是集群的生命周期需要手动管控,需要在请求之前启动一个集群,不用了再手动关掉。
Application 和 PerJob 模式。这类是单集群只允许单作业,但是优点是集群的生命周期可以直接绑定作业的生命周期,作业结束集群就会退出,释放资源。
以上这两类的部署模式是互有优缺点的。而 Kyuubi 的 Application 模式结合了两者的优点。它允许单集群多作业,而集群的生命周期由 Kyuubi 管控,用户是不用去操心的。为了解这点,我们下面先简单回顾一下 Flink SQL 的翻译流程。

从用户提交的 Flink SQL 到变成 Flink Runtime 可以去执行的作业,会有一个 SQL 翻译的过程。这个过程可以分成四步。
通常来说,这四步都会在一个 TableEnvironment里面去完成。

在 PerJob 架构下 Flink SQL Gateway 会在内部开启 TableEnvironment , SQL 翻译的四步全部做完后,生成 JobGraph 提交给 JobManager 。
这是很标准的一个使用制式,当然是没什么问题,当下 PerJob 的部署模式已经过时了,我们要迁移到 Application 的模式下。

在标准的一个 Application 模式下,会要求用户的 main 函数在 JobManager 上去执行,那么这意味着SQL 翻译至少最后一步的部署,会在 JobManager 上去完成。
针对这个问题,社区提出一个方案 FLIP-316,就是 Flink SQL Gateway去支持 Application 的一个模式。在这个议案里面 Flink SQL Gateway 会在本地内部的 TableEnvironment 完成 SQL 翻译的前两步,也就是解析优化。生成中间的物理执行计划可以被序列化为 JsonPlan ,然后与其他资源文件一起提交给 JobManager ,JobManager 会启一个main 函数去完成编译和部署这两步。通过这种方式 Flink SQL Gateway 就可以去支持 Application 模式。
但是这个方案有一个缺点,就是 JsonPlan 目前是只支持 DML ,这意味着在一些 OLAP 场景,常用的 Select 查询没有办法直接通过 Application 来支持。

Kyuubi 的 Flink Application 架构模式是怎么解决这些问题的。
首先, Kyuubi 的 Flink Engine 是运行在 JobManager 里面的,它可以通过 RPC 的方式去接收到用户提交的 SQL ,他在内部的 Table Environment ,就可以像Flink SQL Gateway 里的 PerJob 模式,一次性完成上述四步工作。
最后得到的作业,通过 JobManager 内部的 Dispatcher API 去提交。通过这种方式就可以绕开 Application 模式下 JobManager 要求每个集群只有一个作业的限制,也没有 JsonPlan 带来的问题。
对于 JobManager 来说, Kyuubi Flink Engine相当于是一直在运行,没有退出的用户main函数。 Flink Engine 什么时候退出, JobManager 就会 Shutdown 把资源给释放掉。

这可以分成统一入口和引擎插件两个方面。
另一方面,Kyuubi 提供灵活的插件机制支持满足平台管理需求,例如 SQL 血缘提供、集成 Ranger 提供列级的访问控制或者监听事件。部分插件会依赖于引擎本身的支持,例如 Kyuubi Spark Engine 的血缘提取是通过 Spark 提供的 QueryExecutionListener 接口完成,而 Flink 的血缘支持也在 1.18 版本有了接口,后续 Kyuubi 这边也会跟进。

Kyuubi 的 Metric 系统是很完善的。
同时,Kyuubi Server 提供内嵌的 Web UI,可以很直观的看到各项指标。

这是 Kyuubi 的 Flink Engine 的一些 Roadmap ,其中很多工作依赖 Flink 本身暴露的能力,需要两个社区共同推进。