作为新一代数据湖平台,Apache Hudi在实时场景中广泛使用。但在应用过程中也存在同步执行、异步执行等问题。本文将从表服务管理角度,详细解读字节跳动基于Apache Hudi的优化方案和最佳实践。关注字节跳动数据平台公众号,回复【0222】获得此次分享PPT。
Hudi基本概念
Apache HUDI 作为数据湖框架的一种开源实现,提供了事务、高效的更新和删除、高级索引、 流式集成、小文件合并、log文件合并优化和并发支持等多种能力,支持实时消费增量数据、离线批量更新数据,并且可通过 Spark、Flink、Presto 等计算引擎进行写入和查询。
Hudi 表由 timeline 和 file group 两大项构成。Timeline 由一个个 commit 构成,一次写入过程对应时间线中的一个 commit,记录本次操作修改的文件。
相较于传统数仓,Hudi 要求每条记录必须有唯一的主键,并且同分区内,相同主键只存在在一个 file group 中。底层存储由多个 file group 构成,有其特定的 file ID。File group 内的文件分为 base file 和 log file,其中 log file 记录对 base file 的修改,通过 compaction 合并成新的 base file,多个版本的 base file 会同时存在。
Hudi表类型
Hudi 表分为 COW 和 MOR两种类型:
- COW 表适用于离线批量更新场景,对于更新数据,会先读取旧的 base file,然后合并更新数据,生成新的 base file。
- MOR 表适用于实时高频更新场景,更新数据会直接写入 log file 中,读时再进行合并。为了减少读放大的问题,会定期合并 log file 到 base file 中。
Hudi表服务介绍
Hudi 表中的数据可能存在 Base File 和 Log File 中,需要使用 Compaction 进行合并,而且会分布在多个 File Group 中,在部分 File Group 数据量过小或着过大的时候,为了保证读取性能需要进行 File Group 的重分布。所以为了维护 Hudi 表写入了读取性能,文件数目等多种需求,Hudi 提供了多种重要的表服务,它们是:
- Compaction:用于合并 Base File 和 Log File 的,同时会生成一个新版本文件。通过这种预先合并的方式就可以提升读取效率。如果不进行 Compaction,需要在读取的过程中对 Base File 和 Log File 进行合并,在完成 Compaction 之后会生成新版本文件,从而提升读取效率。
- Clustering:用于重分布 File Group, 最主要的用处是用于合并小文件。并且在合并过程中,可以提供多种排序能力,使得读取时可以有更好的 data skipping 的能力。
- Clean:用于清理版本过期的文件,会将多余的版本自动清理掉,防止历史文件过多的存留。
- Rollback:用于回滚未完成的 instant 所写入的文件及元数据。如果有一次写入没有完成就失败了,在这种情况下,它会存留下一些未完成的文件,比如还有一些元数据的记录,需要用 Rollback 来回滚清理掉这次 instant 对应的数据文件和元数据记录。
- Indexing:用于查询时提升读取性能。如果提前构建索引读取,读取的时候能够更快定位到文件。
Hudi表服务的问题
下面介绍表服务(Table Service)的几种执行模式:
- 同步执行:在每次执行之后,都会产生一个 commit。在每次产生 commit 之后,会根据配置来判断一下是否需要进行 Table Service,比如之前提到的 Compaction 和 Clean,会依次把这些需要执行的 Table Service 都执行一遍,之后继续下一次的写入过程。这种方式结构是最简单的,但也会带来一些问题,比如执行表服务时会阻塞写入。
- 异步执行,会共享写入任务资源,导致任务资源占用较大,对于 Flink 实时入湖作业来说,增量导入的数据所需要的资源与存量数据 Compaction 所需要的资源其实往往是不太一样的。同时会因为执行表服务而影响写入任务的稳定性。比如两个任务并发写入同一张表,哪怕写入的数据是完全不冲突的。由于各自在进行 Compaction,导致任务资源占用较大。
- 独立任务执行,需要手动启动一个任务来进行表服务,同时缺少管理功能,导致维护成本较高。此模式会有一个任务来进行数据的写入,同时再起另外一个任务来进行 Table Service 的执行,和现有任务完全隔离。但是这样会带来一些新的问题,第一个就是它的随着生产任务增长,这些 Table Service 任务本身管理就是一个问题;第二个是一个任务绑定的一个 Table Service 任务,每次做调整的时候,可能需要多个任务一起调整,导致维护成本较高。
Hudi 表服务在字节的主要落地场景是 LAS 湖仓一体分析服务,下面介绍一下在 LAS 在落地过程中所面临的挑战。
LAS数据湖落地挑战
LAS 数据湖落地的挑战有以下几点:
- 实时入湖稳定性差
- 用户理解成本高
- 任务占用资源高
- 任务管理成本高
针对上述问题,我们设计了一个表管理服务,也就是 Table Management Service(TMS),通过表服务托管的方式来对现状做一些优化。
为了方便大家理解整个场景,下面主要介绍一下整个 湖仓一体分析服务 LAS 。
LAS 全称是 Lakehouse Analysis Service,湖仓一体分析服务。融合湖与仓的优势,既能够利用湖的优势,将所有数据存储到廉价存储中,供机器学习、数据分析等场景使用。又能够基于数据湖,构建数仓,供 BI、报表等业务场景使用。
LAS 的整体架构,第一层是湖仓开发工具,然后是分析引擎,支持批流一体 SQL,一套 SQL 既能用于流作业又能用于批作业。并且我们支持引擎智能选择及加速,根据 SQL 特点自动路由到 Spark、Presto、Flink 执行。再往下是统一元数据层。第四层是批流一体存储。
LAS 整体架构存算分离,计算存储可以按需扩展,避免资源浪费。因为存算分离,所以一份数据可以被多个引擎分析,相较存算一体,TCO 可以下降 5-30%。并且我们还支持动态的弹性扩缩容,进一步降低用户成本。
那么如何基于 LAS 构建企业级的实时湖仓?无论是离线数据还是实时数据都可以直接放到 LAS 的统一批流一体存储中。需要实时处理的数据,可以利用 LAS 的 streaming 能力,流读流写,流式的写入下一层表中,层层构建 ODS、DWD等层级关系。那如果需要进行离线回溯,不需要换存储,直接通过批流一体 SQL 运行离线任务。
LAS 的批流一体存储层是基于开源的 Apache Hudi 构建,在整个的落地过程中,我们遇到了一些问题。Apache Hudi 仅支持单表的元数据管理,缺乏统一的全局视图,会存在数据孤岛。Hudi 选择通过同步分区、表信息到 Hive Metastore Server 的方式,提供全局的元数据访问。但是,两个系统之间的同步无法保证原子性,会有一致性问题。因此,缺乏一个全局的、可靠的视图。
整体介绍
TMS (Table Management Service) 是全托管的 Hudi 表优化服务,提供高可用,可扩展,高性能的表服务管理。主要负责表服务异步调度、执行和监控在内的全生命周期管理,提高 Hudi 写入稳定性及查询性能,对用户屏蔽数据湖底层技术细节专注聚焦业务,让用户可以不用去关心各种 Table Service,比如Compaction、Clean 等等这些操作。
技术架构
下面介绍 Table Management Service 的服务架构,主要包括以下 6 个模块。
- Event Receiver / Handler:事件接收器,同时处理一些外部请求。
- Scheduler:负责任务的调度(一个表操作对应一个tms 任务),将符合执行条件的任务根据一系列分配规则投递执行。
- Executor:任务执行器,提交任务的同时并管理运行时任务。
- Resource Manage:资源管理器,负责任务执行的资源。
- Monitor:监控组件,监控队列资源使用和任务执行情况,同时还会去检查 Table Service 任务的执行情况。例如执行失败或者执行超时等等,会将对应的 event 发到监控系统里面,进行一些报警和面板展示。
- Restore / Retry Service,服务重启 / 任务重试的处理器,分别负责服务重启状态恢复和失败任务重试。
执行计划生成流程
我们先看 Plan Generator 和 Meta Server之间的交互逻辑。当 TMS 监听到 MetaServer 侧传递的 instant commit 事件后, Plan Generator 决定本次是否需要生成新的 action plan,若需要,则向 metaServer 提交一个 requested 状态对应异步操作的 instant,表示该 action 后续需要被执行。提交成功后记录本次 action.requested 相关信息,如表名,时间戳,状态等,等待调度执行。
异步生成执行计划
刚刚既然说到了 TMS,支持在系统外去生成一个计划。那在什么情况下并发?在并发场景下,为什么会存在数据不一致的问题?我们在下方的图来大概解释一下。
- 时间节点描述:
-
- t1:Client1 提交dc1,开始写入数据
- t2:Client2 提交dc2,开始写入数据
- t3:调度 Compaction Plan 并提交(此时 dc1 和 dc2 写入未完成,存在数据丢失)
- t4:Client3 提交dc3,开始写入数据(此时以 t3 作为 basetime ,符合预期)
- t5:Client1 写入完成,Commit dc1
- t6:Client2 写入完成,Commit dc2
- t6':补偿 t3 时刻生成 Plan 缺少的数据 (此刻以 t3 前一个版本作为 basetime 的所有写入完成)
如上时间线所述,若 client1 和 client2 分别在 t1 和 t2 提交 dc, t3 执行 cc,则该时刻生成的 Compaction Plan 不包含 dc1 和 dc2,可能会因 Compaction Plan 不完整而引起数据丢失;为解决该问题,我们在 t6'(以 t3 前一个版本作为 basetime 的 dc commit 全部完成) 引入一个 Sync 逻辑,修改 Plan,填补丢失的数据
既然把 schedule plan 放到外面,我们不希望 schedule plan 过程它本身会导致写入任务失败,因此解决方式是生成一个补偿计划。在 Data commit 1 和 Data commit 2 完成之后,Compaction 会把这个 dc3 进行补偿。
核心功能
资源管理
表管理服务的资源管理主要由资源调度和资源优化这 2 个方面组成。资源调度以负载均衡为主,其它调度策略为辅,多层分配方式:
- 负载均衡调度:根据任务可使用的队列列表、队列资源使用情况以及任务申请资源进行分配队列
- 优先级调度:根据表保障级别限制可使用队列,高优任务会优先调度到高优保障的独占队列,低优任务会调度到混部的非保障非高优保障队列
- 有限使用调度:根据库/表服务的类型,规定可使用的公共队列并或者专用队列。
资源优化则通过自适应调度、动态计算资源、资源限制来实现。以 Compaction 为例,不同的 Compaction 任务需要的资源相差较大,另外对于数据倾斜严重的任务,同一个Compaction Plan 中的 FileGroup 之间数据量可能存在很大差异,上述两种情况可能会引起资源浪费和任务执行稳定性差。所以我们会根据表服务任务实际需要申请资源,从而达到自适应温度;同时以 Compaction Plan 中最大的 File Group Size * 1.4 作为使用内存;最后限制最大并发度,避免资源被异常任务不受限制的侵占达到资源限制的目的。
监控管理
表管理服务的监控管理主要由服务监控和任务监控这 2 个方面组成。服务监控主要有以下 4 种:
- 服务存活监控:定期进行服务探活
- 组件职能监控:调度器是否正常调度任务、API接口是否正常响应等
- 组件性能监控:任务调度速率、Pending 任务堆积情况等,通过一些 metrics 接口对外进行了暴露
- 服务资源使用监控:监控 CPU 、内存等指标,了解服务的整体负载情况
任务监控主要为表任务类型结合告警类型,并且辅以黑名单机制和智能探测,避免无效告警的监控方式。其中告警类型包含这些:运行失败并且重试次数超过阈值、运行超时、长时间未调度执行。另外黑名单机制主要由告警类型和黑名单类型两部分组成,包括表级别黑名单、库级别黑名单、用户级别黑名单和队列级别黑名单。
数据备份与恢复
数据备份通过表级别注册的方式,Table Management Service 每天定时生成数据备份和数据清理任务,实现数据备份和历史数据清理,通过读取最新历史备份版本中的稳定数据和增量数据构建新的备份版本来实现数据备份,通过设置保留历史版本数,数据清理任务删除过期的版本来进行数据清理。
另一方面,我们通过使用最新的备份版本和增量消费源数据的方式进行恢复,从而大大降低恢复成本,彻底解决由于源数据不存在而导致的不可恢复问题。
多实例 / 高可用
Table Management Service 的模型分为三层:
- Service
-
- 负责响应外部请求并进行处理
- Worker
-
- 负责任务调度以及定期的失败重试等操作
- Manager
-
- 主要负责维护心跳,实例管理、监控告警
其中 Manager 为 Master / Slave 模型,同一时间只有一个实例会进行工作,Service 和 Worker 则为 Serverless 模型。
我们可以通过上图了解主从选举维护的一个简单流程。首先 Manager 会启动,接着它会检查是否存在 Manager Master,如果存在,会检查心跳是否过期。心跳如果没有过期,竞争 Master 失败,回来重新检查心跳是否过期。如果过期或者不存在,它会写入数据,在 database 存储中写入数据, 如果写入成功,它就成为 Master,并且维护心跳。如果失败,他就会继续查看租约内是否有心跳。如果没有,它会继续尝试写入 Master,如果存在,就竞争 Master 失败,直接去定期检查是心跳是否过期。它就成为 worker 节点。
其余功能
- 运维 Ops:提供了丰富的任务管理工具,支持手动重跑,挂起任务等运维操作
- Compaction TTL:对于不按照时间分区的数据,根据指定时间字段进行 TTL 操作
- 灰度升级:根据队列、库、表灰度升级执行任务所依赖的 Engine (Spark / Flink) 资源
- 异步的索引构建:提升读取性能,目前这部分就是索引构建的一部分,还没有集成到整个 TMS。
- 物化视图:通过 TMS 做个管理,物化视图其实主要是通过预计算的方式来解决高频子查询、重复计算的问题。假设同一个查询,如果没有物化视图,需要每次查询的时候去重新计算一遍,但如果有了物化视图,就相当于是可以通过一个查询构建出一个临时视图。以后每次查询都可以从视图直接去读取。并且对于快速变化指标,可以用物化视图的方式来进行构建,无需用户维护数据加工的链路,从而通过 TMS 来维护物化制作管理。
- 智能探测:通过队列使用资源,同时结合多个参数,提前判断任务可能执行超时或者执行失败的一些预警,而不是在真正超时或者失败之后才收到报警,从而提升响应时间。
以上就是字节跳动在 Apache Hudi 数据湖表优化管理服务上的实践,目前均已通过火山引擎 湖仓一体分析服务 LAS 产品对外服务,欢迎对这方面有需求、感兴趣的用户都可以积极地来体验一下我们的 LAS 湖仓一体分析服务 。
火山引擎 湖仓一体分析服务 LAS(Lakehouse Analytics Service)是面向湖仓一体架构的
Serverless 数据处理分析服务,提供字节跳动最佳实践的一站式 EB 级海量数据存储计算和交
互分析能力,兼容 Spark、Presto、Flink 生态,帮助企业轻松构建智能实时湖仓。
飞书扫码,沟通交流,1v1 咨询