摘要:本文整理自网易游戏资深开发工程师林小铂在 Flink Forward Asia 2021 平台建设专场的演讲。主要内容包括:
- 网易游戏 Flink SQL 发展历程
- 基于模板 jar 的 StreamflySQL v1
- 基于 SQL Gateway 的 StreamflySQL v2
- 未来工作
网易游戏实时计算平台叫做 Streamfly,这个名字取名自电影《驯龙高手》中的 Stormfly。由于我们已经在从 Storm 迁移到 Flink,所以将 Stormfly 中的 Storm 替换成了更为通用的 Stream。
Streamfly 前身是离线作业平台 Omega 下的名为 Lambda 的子系统,它负责了所有实时作业的调度,最开始开始支持 Storm 和 Spark Streaming,后来改为只支持 Flink。在 2019 年的时候我们将 Lambda 独立出来以此为基础建立了 Streamfly 计算平台。随后,我们在 2019 年底开发并上线了第一个版本 Flink SQL 平台 StreamflySQL。这个版本基于模板 jar 提供了基本 Flink SQL 的功能,但是用户体验还有待提升,因此我们在 2021 年年初从零开始重新建设了第二个版本的 StreamflySQL,而第二个版本是基于 SQL Gateway。
要了解这两个版本的不同,我们需要先回顾下 Flink SQL 的基本工作流程。
用户提交的 SQL 首先会被 Parser 解析为逻辑执行计划;逻辑执行计划经过 Planner Optimizer 优化,会生成物理执行计划;物理执行计划再通过 Planner CodeGen 代码生成,翻译为 DataStream API 常见的 Transformation;最后 StreamGraphGenerator 会将这些 Transformation 转换为 Flink 作业的最终表示 JobGraph 提交到 Flink 集群。
上述一系列过程都发生在 TableEnvironment 里面。取决于部署模式的不同,TableEnvironment 可能运行在 Flink Client 或者 JobManager 里。Flink 现在支持 3 种集群部署模式,包括 Application、 Per-Job 和 Session 模式。在 Application 模式下,TableEnvironment 会在 JobManager 端运行,而在其余两种模式下,TableEnvironment 都运行在 Client 端。不过这三种模式都有一个共同的特点,TableEnvironment 都是一次性的,会在提交 JobGraph 之后自动退出。
为了更好地复用 TableEnvironment 提高效率和提供有状态的操作,有的项目会将 TableEnvironment 放到一个新的独立 Server 端进程里面去运行,由此产生了一种新的架构,我们称之为 Server 端 SQL 编译。相对地,还有 Client 端 SQL 编译。
有同学可能会问,为什么没有 JobManager 端 SQL 编译,这是因为 JobManager 是相对封闭的组件,不适合拓展,而且即使做了达到的效果跟 Client 端编译效果基本一样。所以总体来看,一般就有 Client 和 Server 两种常见的 Flink SQL 平台架构。
Client 端 SQL 编译,顾名思义就是 SQL 的解析翻译优化都在 Client 端里进行(这里的 Client 是广义的 Client,并不一定是 Flink Client)。典型的案例就是通用模板 jar 和 Flink 的 SQL Client。这种架构的优点是开箱即用,开发成本低,而且使用的是 Flink public 的 API,版本升级比较容易;缺点是难以支持高级的功能,而且每次都要先启动一个比较重的 TableEnvironment 所以性能比较差。
然后是 Server 端 SQL 编辑。这种架构将 SQL 解析翻译优化逻辑放到一个独立的 Server 进程去进行,让 Client 变得非常轻,比较接近于传统数据库的架构。典型的案例是 Ververica 的 SQL Gateway。这种架构的优点是可拓展性好,可以支持很多定制化功能,而且性能好;缺点则是现在开源界没有成熟的解决方案,像上面提到 SQL Gateway 只是一个比较初期的原型系统,缺乏很多企业级特性,如果用到生产环境需要经过一定的改造,而且这些改造涉及比较多 Flink 内部 API,需要比较多 Flink 的背景知识,总体来说开发成本比较高,而且后续版本升级工作量也比较大。
编者按:Apache Flink 社区目前正在开发 SQL Gateway 组件,将原生提供 Flink SQL 服务化的能力,并兼容 HiveServer2 协议,计划于 1.16 版本中发布,敬请期待。感兴趣的同学可以关注 FLIP-91 [1] 和 FLIP-223 [2] 了解更多,也非常欢迎大家参与贡献。
回到我们 Flink SQL 平台,我们 StreamflySQL v1 是基于 Client 端 SQL 编译,而 v2 是基于 Server 端的 SQL 编译。下面就让我逐个介绍一下。
StreamflySQL v1 选择 Client 端 SQL 编译的主要原因有三个:
上图是 v1 版本的整体架构图。我们在主要在 Lambda 作业平台的基础上新增了 StreamflySQL 后端作为配置中心,负责根据用户提交的 SQL 和作业运行配置加上通用的模板 jar 来生成一个 Lambda 作业。
总体的作业提交流程如下:
StreamflySQL v1 实现了 Flink SQL 平台从零到一的建设,满足了部分业务需求,但仍有不少痛点。
第一个痛点是响应慢。
以一个比较典型的 SQL 来说,以模板 jar 的方式启动作业需要准备 TableEnviroment,这可能会花费 5 秒钟,然后执行 SQL 的编译优化包括与 Catalog 交互去获取元数据,也可能会花费 5 秒钟;编译得到jobgraph之后还需要准备 per-job cluster,一般来说也会花费 20 秒以上;最后还需要等待 Flink job的调度,也就是作业从 scheduled 变成 running 的状态,这个可能也需要 10 秒钟。
总体来说,v1 版本启动一个 Flink SQL 作业至少需要 40 秒的时间,这样的耗时相对来说是比较长的。但是仔细分析这些步骤,只有 SQL的编译优化和 job 调度是不可避免的,其他的比如 TableEnvironment 和 Flink cluster 其实都可以提前准备,这里的慢就慢在资源是懒初始化的,而且几乎没有复用。
第二个痛点是调试难。
我们对 SQL 调试的需求有以下几点:
在 v1 版本中,我们对上述需求设计了如下解决方案:
调试模式的架构如上图所示,比起一般的 SQL 提交流程,主要区别在于作业不会提交到 YARN 上,而是在 Lambda 服务器的本地执行,从而节省了准备 Flink 集群的开销,并且更容易管控资源和获取结果。
上述调试解决方案基本可用,但是实际使用过程中依然存在不少问题。
第三个痛点是 v1 版本只允许单条 DML。
相比传统的数据库,我们支持的 SQL 语句是很有限的,比如,MySQL 的 SQL 可以分成 DML、DQL、DDL 和 DCL。
综合来看,v1 版本只支持了单条 DML,这让我们很漂亮的 SQL 编辑器变得空有其表。基于以上这些痛点,我们在今年调研并开发了 StreamflySQL v2。v2 采用的是 Server 端 SQL 编译的架构。
我们的核心需求是解决 v1 版本的几个痛点,包括改善用户体验和提供更完整的 SQL 支持。总体的思路是采用 Server 端的 SQL 编译的架构,提高可拓展性和性能。此外,我们的集群部署模式也改成 Session Cluster,预先准备好集群资源,省去启动 YARN application 的时间。
这里会有两个关键问题。
这是 StreamflySQL v2 的架构图。我们将 SQL Gateway 内嵌到 SpringBoot 应用中,开发了新的后端。总体看起来比 v1 版本要复杂,原因是原本的一级调度变成了会话和作业的两级调度。
首先用户需要创建一个 SQL 会话,StreamflySQL 后端会生成一个会话作业。在 Lambda 看来会话作业是一种特殊作业,启动时会使用 yarn-session 的脚本来启动一个 Flink Session Cluster。在 Session Cluster 初始化之后,用户就可以在会话内去提交 SQL。StreamflySQL 后端会给每个会话开启一个 TableEnvironment,负责执行 SQL 语句。如果是只涉及元数据的 SQL,会直接调用 Catalog 接口完成,如果是作业类型的 SQL,会编译成 JobGraph 提交到 Session Cluster 去执行。
v2 版本很大程度上解决了 v1 版本的几个痛点:
不过 SQL Gateway 虽然有不错的核心功能,但我们使用起来并不是一帆风顺,也遇到一些挑战。
首先最为重要的是元数据的持久化。
SQL Gateway 本身的元数据只保存在内存中,如果进程重启或是遇到异常崩溃,就会导致元数据丢失,这在企业的生产环境里面是不可接受的。因此我们将 SQL Gateway 集成到 SpringBoot 程序之后,很自然地就将元数据保存到了数据库。
元数据主要是会话元数据,包括会话的 Catalog、Function、Table 和作业等等。这些元数据按照作用范围可以分为 4 层。底下的两层是全局的配置,以配置文件的形式存在;上面两层是运行时动态生成的元数据,存在数据库中。上层的配置项优先级更高,可以用于覆盖下层的配置。
我们从下往上看这些元数据:
这样比较灵活的设计除了解决了元数据持久化的问题,也为我们的多租户特性奠定了基础。
第二个挑战是多租户。
多租户分为资源和认证两个方面:
第三个挑战是水平拓展。
为了高可用和拓展服务能力,StreamflySQL 很自然需要以多实例的架构部署。因为我们已经将主要的状态元数据存到数据库,我们可以随时从数据库构建出一个新的 TableEnvironment,所以 StreamflySQL 实例类似普通 Web 服务一样非常轻,可以很容易地扩容缩容。
但是并不是所有状态都可以持久化的,另外有些状态我们故意会不持久化。比如用户使用 SET 命令来改变 TableEnvironment 的属性,比如开启 Table Hints,这些属于临时属性,会在重建 TableEnvironment 后被重置。这是符合预期的。再比如用户提交 select 查询做调试预览时,TaskManager 会与 StreamflySQL 后端建立 socket 链接,而 socket 链接显然也是不可持久化的。因此我们在 StreamflySQL 的多实例前加了亲和性的负载均衡,按照 Session ID 来调度流量,让在正常情况下同一个用户的请求都落到同一个实例上,确保用户使用体验的连续性。
第四个挑战是作业状态管理。
其实这里的状态一词是双关,有两个含义:
这里我可以分享下我们的算法。其实自动查找最新 checkpoint 的功能 Lambda 也有提供,但是 Lambda 假设作业都是 Per-Job Cluster,因此只要查找集群 checkpoint 目录里最新的一个 checkpoint 就可以了。但这样的算法对 StreamflySQL 却不适用,因为 Session Cluster 有多个作业,最新的 checkpoint 并不一定是我们目标作业的。因此,我们改为了使用类似 JobManager HA 的查找方式,先读取作业归档目录元数据,从里面提取最新的一个 checkpoint。