本文整理自字节跳动基础架构周伊莎的演讲内容。Flink SQL 作为实时数仓建设中重要的工具,能够帮助用户快速开发流式任务,支持实时数据处理的场景和需求,本文将分享 SQL 作业迭代中状态的保持——状态迁移相关的现状、问题解决及未来规划。
作者|字节跳动基础架构工程师-周伊莎
Flink SQL 作为实时数仓建设中重要的工具,能够帮助用户快速开发流式任务,支持实时数据处理的场景和需求。相比 DataStream 作业,SQL 作业在开发成本和维护成本上都具有非常大的优势,无需掌握复杂的开发语言,编程环境等等,无需经历打包,部署等耗时的流程,简单地编辑 SQL 语句即可创建拥有复杂逻辑的流式任务。然而,对用户屏蔽掉底层细节,意味着 SQL 作业会丧失一些代码层面的灵活度。
其中一个非常重要的话题就是 SQL 作业迭代中状态的保持——状态迁移。
现状
首先,为什么要迁移旧状态呢?
除了一些简单的 ETL 任务,很多流式任务承载着复杂的业务逻辑,例如:计算每分钟的订单总额。这些计算逻辑的中间结果在 Flink 内部会作为状态被保存,方便在 Failover 或迭代后基于上一个状态继续计算。
当前,如果我们无法迁移状态时,旧的状态会被丢弃,然后回拨作业 Offset 去重跑任务,以达到计算的连续性(通常会保证 At Least Once)。
那么这样做有什么问题呢?
- 重跑会带来计算资源的浪费;
- 对于时延性要求比较高的作业来说,重跑带来的数据 Delay 是用户无法接受的;
- 如果有一些长周期的任务,譬如说计算月粒度窗口的聚合,而输入的数据只保存了 7 天或者更短的时间,那么这样的任务就会因为输入数据的缺失而无法重跑;
- 在某些场景下可能会导致计算出错,例如,将 Offset 回拨到某个窗口的起始时间戳,则上一个窗口的迟到数据可能会导致错误的输出。
因此,在流式作业的迭代时,需要尽量迁移旧状态,来保证计算的连续性和正确性。
SQL 作业与状态
状态的恢复有两个充分必要条件,其一是 OperatorID 的一致性,OperatorID 与算子的状态是强绑定的——算子状态的 Namespace 以其 OperatorID 命名;其二是算子 State Serializer 的兼容性。当 OperatorID 保持不变且算子新旧 State Serializer 相互兼容时,才能成功从 Checkpoint 中恢复作业的状态。
在 DataStream 作业中,可以通过为有状态算子设置 UID / UID Hash 来保证 OperatorID 的一致性,通过自定义 State Serializer 来解决 Serializer 的兼容问题,因此,即使作业进行迭代,逻辑改变,也很容易在作业版本间平滑地迁移状态。
但是在 SQL 作业中,用户直观可见的只有 SQL 这一层,SQL 层往下的 Table 层, Datastream API 层 以及 Runtime 层,用户都是无法直接控制的。因此 SQL 作业的状态对用户来说是完全黑盒的,意味着 SQL 作业的用户是无法通过 API 来完成与状态的交互的,同时,迭代中对 SQL 的修改,也很容易使得前文的两个条件被打破,从而导致状态无法迁移。
问题分类
由此,我们可以把 SQL 作业状态迁移的问题分为两大类:
- DAG 极易发生变更;
- State serializer 不可兼容。
首先来看看问题一,SQL 作业的 DAG 是极易随着用户的修改发生变更的。包括两种修改:
- 第一种是隐式修改:例如,在上图的 SQL 中,Bigint Field 后面增加了一个加 2000 这样的逻辑,导致 DAG 图里新增一个 Calc 节点;打开了 Mini-batch 优化或者为 Source 新增了Watermark,也会导致作业的 DAG 中新增 Mini-batch Assigner 或者 Watermark Assigner 节点。
- 另一种是显式修改:例如,新增维表,输入的 Source,输出的 Sink 等等,这些都是比较直观的导致 DAG 图新增节点的情况。
DAG 发生变更之后,OperatorID 基本都会发生变化,导致状态无法恢复。这是本文计划解决的问题类型。
问题二是 State serializer 不可兼容。在 SQL 任务中,Flink 版本不变的情况下,相同的算子使用的 State 类型是一致的,例如,GroupAggregate 算子里会存一个 ValueState,这个 valueState 里面存的是一个由所有 Accumulator 组成的 Row。但随着 SQL 中相关逻辑的修改,State 里实际存储的数据类型会发生变化,导致新旧 State serializer 无法兼容。
例如上图中,我们在第四行新增了一个 Last value 聚合,GroupAggregate 算子的存储的 ValueState 从一个4列 Row 的变成一个5列的 Row,因此导致新旧 Serializer 不兼容,状态无法被正常读取,从而恢复失败。
这类问题的解决方案不在本文的探讨范围内,将在未来展望一节中简要介绍字节目前的探索方案。
解决思路
在 DataStream API 层,Flink 已经提供了一种帮助用户在 DAG 变更时进行状态迁移的能力,即为算子设置 UID 或者 UID Hash 来保证 Operator ID 不变。
那么在 SQL 作业中怎么去使用这样的能力呢?主要分三步:
- 先为 SQL 作业提供 DAG 的可视化预览。
- 允许用户对 DAG 中算子的属性进行编辑。
- 将用户编辑的 UID 和 UID Hash 传递到运行时。
DAG 预览
运行时我们能在 Flink Web UI 上看到一个 Task 粒度的 DAG 图,它对应的内部抽象是 JobGraph。而在我们的场景下需要一个算子粒度的 DAG 图,内部也有一个对应的抽象是 StreamGraph。但为了隔离外部存储的 DAG 和 StreamGraph 的实现,此处提出一个独立的抽象,叫 PlanGraph,将 StreamGraph 里的一些属性映射上去。
那么 StreamGraph 中的算子和 PlanGraph 中的节点如何形成稳定的映射呢?我们复用了Job Graph Generator 中使用的 Stream Graph Hasher V2 来为每个算子生成确定性的 ID。
上图右侧是 PlanGraph 抽象的一些核心 Field,第一个是上文提到的确定性 ID;第二个是 Generated OperatorID 这个是与 JobGraph 中算子的 OperatorID 一一对应的。第三个是 User Provided Hash,用户可以通过这个字段来为每一个算子指定他的 UID 和 UID Hash。另外还有一些其他的 StreamNode 的属性和一些展示相关的属性。
上图展示的是一个 SQL 任务初始的可视化效果,左侧是一个简单的 SQL,它的逻辑是从 Source 读数据,做一次全局聚合后写出到 Sink 。右侧是对应的可视化效果,它展示了所有 Task 粒度的节点,展开每个 Task 节点,可以看到各个 Task 包含的算子链。点击算子或 Task 节点,下方的属性 Tab 会展示节点相关属性,如算子粒度会展示:算子 ID、算子名称、并行度等等。
注意这里的一个小 Tip,为了减少用户的理解复杂度,对外暴露的属性只有算子 Hash 一个,而实际上这个值会被同时设置成算子的 UID 和 UID Hash。
另外,为了减少用户的配置工作量,字节内部版本在检查 Checkpoint 中各算子 State 的元信息时,会跳过没有实际存储状态的部分,这意味着用户无需为无状态的算子去配置 UID。
当用户对任务做了一些迭代修改导致 DAG 发生变更后,会展现出上图所示的 DAG Diff 面板。图中这个例子里,我们为任务打开了 Mini-batch 优化(注意,为了举例方便,我们暂时关闭了 Local-global 优化),可以看到右侧的新 DAG 相较于左侧的旧 DAG 新增了一个 Mini-batch Assigner 节点。
显然,此时 GroupAggregate 算子的 OperatorID 会发生变化,导致下一次重启时,它的状态恢复失败。
那么如果要做状态的迁移该怎么操作呢?
首先在左右两张图上都选中我们需要迁移状态的 Group Aggregate 算子,从左侧把旧的算子 ID,复制到右侧的 Hash 属性中即可,至此我们就完成了基本的编辑步骤,只要将这些信息提交到运行时,我们就能将旧任务的状态迁移至新任务中了。
整体使用流程
下面介绍一下整体的使用流程:
- 对于每一个作业版本,包括它的 SQL 跟 Normal configs ,系统都会为其生成一个 PlanGraph ,然后存储到外部系统;
- 当用户对作业做了一些迭代和修改之后,会产生新版本的 SQL 跟 Normal configs,和与此对应的 PlanGraph;
- 把旧的 PlanGraph 与新的 PlanGraph 进行 diff 对比后,由用户手工地修改或者采用自动映射来复用旧图中的算子 ID;
- 修改后的PlanGraph会和SQL及Normal configs一起提交给Flink API, PlanGraph 中的算子 ID 会被映射到实际生成的 JobGraph 中去,最终,包含这些信息的 JobGraph 会被提交到运行时。
到此为止,为用户提供了基础的 SQL 作业状态迁移能力。
易用性问题
在实际使用中,上述方案会遇到非常多的易用性问题。用户的 SQL DAG 远不止几个算子那么简单,对于复杂的 DAG,为它所有的节点去手动配置 UID 或者 UIDHash 的成本是非常高的。即使真的要手动地去配置,我们也很难快速地去定位到底哪一些节点是有状态的。
针对这些易用性问题,我们提供了以下解决方案:
- 提供 Best Effort 的自动映射,把旧图中的算子 ID 自动地映射到新图上;
- 高亮使用状态的节点;
- 除图形化的 DAG Diff 外,额外提供 DAG 对应的 JSON 的代码对比。
Best Effort 的自动映射
本功能旨在减少用户手动配置工作量,自动为在新旧图中相同的节点完成算子 UID / UID Hash 的映射。Best Effort 意味着尽量地进行映射,但不保证所有的节点都可以完成映射。
在阐述具体的算法之前,需要先了解一个前提:算子的 Description (即 RelDetailed Description,包含对应 RelNode 的 Plan 级别的属性)是描述算子的一个强有力的信息。当两个算子 Description 完全相等的时候,它在新旧图中大概率是相同的节点。
以下是这个算法的基本流程:
- 分别在新图和旧图里去收集具有相同 Description 的算子;
- 为每一对这样的新旧算子计算它们的相似度,并放入最大堆。相似度的计算 Tips:主要是去比较它所有的出入节点的属性, 每有一个相同的出或入节点时,都会被加权后累加到最终的相似度中。
- 轮询这个最大堆,直到新图或旧图中的所有节点都完成匹配。每个节点仅会被匹配一次,每发现一对匹配的节点,从旧节点中取出它的 Generated OperatorID 填入到新节点的 User Provided Hash 中。
至此,一次 Best Effort 的自动映射就已经完成。在实际应用中,这种算法效果良好,对于简单的 DAG 改动,能够完成 100% 的映射,无需用户再进行手动配置。
JSON 代码比较
第二个提高易用性的功能是提供算子 JSON 代码的比较。当 DAG 图十分复杂且自动映射功能无法完成全部映射时,仍然需要用户为剩余的有状态节点手动设置 UID / UID Hash,对于这部分节点的定位,我们可以通过 JSON 代码的形式来呈现。
按照拓扑排序的顺序以 JSON 的形式呈现算子属性列表,当新增或者删除节点时,通过 JSON 代码的比较,可以非常快速的定位到两张图的 Diff,而在新旧图中相同的节点,除了 OperatorID 发生变化,其余属性是完全一致的,只需简单的从左侧复制 OperatorID 到右侧即可。
其他优化
第三个是一些比较小的优化点,包括把有状态节点打上特殊的标记,来提示用户去重点关注。
另外还有一个图搜索的功能,用户可以通过它的一些节点属性例如 Description 等来进行节点的搜索,这样可以方便用户在图模式下进行手工修改时,快速的定位需要修改的节点。
在整体的规划中,我们的目标是构建全面的 SQL 作业状态恢复能力图谱——划分能力边界, 明确可恢复场景,提供解决方案,对不可恢复场景做到提前发现。
因此计划从以下两个方面着手:
- 提供完善的状态不可恢复事前检查能力;
- 持续增强 SQL State 恢复能力,覆盖两类典型问题的典型场景。
对于第一个方向,目前我们正在探索 FLIP-22 中提到的 Eager State Declaration, 来将获取 State 元信息的时机提前,允许在运行时之前就获取相关元信息来判断状态是否可成功迁移。
对于第二个方向,上文介绍的工作为 SQL 作业在 DAG 变更导致的状态恢复问题提供了一套较为完整的解决方案,但我们仍在思考这个场景是否有更易用的解决方案,例如,利用 SQL Hint 来为各个 SQL 代码段来设置更为易读的 UID。
除此之外,我们还在探索基于列血缘信息的 State Schema Evolution 方案,为 State 中存储的RowData 的每一个字段提供血缘 Digest,以期解决典型场景下 State Serializer 不兼容导致的状态无法迁移的问题。
目前,字节跳动流式计算团队同步支持的火山引擎流式计算 Flink 版正在公测中,支持云中立模式,支持公共云、混合云及多云部署,全面贴合企业上云策略,欢迎申请试用:流式计算 Flink 版-火山引擎
欢迎关注「字节跳动云原生计算」公众号,后台回复加入技术交流群,参与技术交流,了解更多信息!