导读: 字节跳动数据湖架构是在 Apache Hudi 开源版本基础上的再次迭代,在 Hudi 落地的过程中,字节跳动也遇到了各种类型的问题,尤其是在索引使用的效率上。针对自身实际的业务场景和数据规模,字节也提出了新的索引实现方式并且贡献到了社区。本次分享主要介绍字节跳动数据湖索引演进。
全文包括以下四部分:
- Hudi 索引介绍
- 问题与挑战
- 字节数据湖索引演进
- 未来规划
分享嘉宾|耿筱喻 字节跳动 火山引擎LAS研发工程师
编辑整理|吕宴全 浙江大学
出品平台|DataFunTalk
1.1 传统数仓数据更新
传统数据仓库的数据更新方法主要是将增量数据与历史的全量数据进行关联,生成最新的全量数据,再重新写入对应的分区。对整个过程进行拆解可以分成 三个主要耗时的操作,分别是:① 读历史的全量数据② 将全量历史数据与增量数据进行关联生成新的全量数据③ 重新写入全量数据
以一个极端的场景为例:假设用户只需要更新 1 条数据,但是历史的全量数据有 100 个文件,每个文件大小有 1G,那么更新这 1 条数据就至少需要 200G 的数据 IO,这样的开销太大了。这就引入了一个值得思考的问题:数据更新的场景下,是否有必要每次读写全部文件?
1.2 Hudi 索引作用
Hudi 为了支持高效的数据更新,减少更新过程中的 IO 操作,引入了索引的概念:索引将数据的主键与文件名进行映射,可以快速找到未更新数据所在的文件,有效地减少读取和写入文件的数量。
1.3 Hudi 索引类型
当前 Hudi 社区支持的索引类型主要包括以下四个:① Bloom Filter Index。这是默认的索引方案,基于布隆过滤器实现,索引信息存储在 Parquet 文件的 Footer 当中。② Hbase Index。索引信息存储在 Hbase 数据库上。③ Bucket Index。字节提出的一种基于哈希的实现,不需要额外存储索引信息,可以直接根据文件名映射构建索引。④ Flink State。Flink 数据入湖的默认实现方式,索引信息存储在 Flink 的State 中。
字节跳动中数据入湖的典型业务场景主要有两种,一是通过 Flink SQL 的实时 Upsert,二是通过 Spark 的离线批量更新。随着数据湖中数据规模的增加,单个分区的 File Group 达到四万,这时更新的速度非常缓慢。这是因为默认的布隆过滤器需要读取全部的文件的 Footer,涉及到了大量文件的 Open/Close 操作,并且布隆过滤器的假阳性问题会随着数据规模的增加而变得严重。“假阳性”是指布隆过滤器只能判断数据一定不在某个文件中,但不能保证数据一定在某个文件,因此会出现多个文件都可能存在某条数据,需要读取所有的 File Group 才能进行准确判断。在超大规模的数据场景下,这种方式几乎是不可用的。
其他索引类型存在的问题: ① HBase Index。业务方不希望引入额外的依赖组件,并且 HBase 集群的维护也需要成本。② State Index。只支持 Flink 类型任务,不支持跨引擎共享;多个 Flink 作业之间不能共享 State,不支持并发。
3.1 Bucket Index
在超大规模数据的场景下,我们期望一种足够轻量并且高效,能够保证更新的时效性,经过实践探索中,字节提出了一种 基于哈希的索引实现,即 Bucket Index。Bucket Index 将所有的分区分成一定数量的桶,每个桶对应一个 File Group,桶标识和 File Group 标识一一对应,通过哈希函数决定某条数据分配到某个桶里,相同标识 Key 的记录一定会落入到相同的桶里。
3.2 写入流程
写入操作可分为 修改已有数据 和 写入新数据 两种。首先需要根据写入数据的索引键计算哈希,将哈希值对分桶数进行取模运算快速定位到对应分桶。如果这个分桶对应的 File Group 是存在的,那么就直接写入或者更新数据,否则创建新的 File Group 再进行写入。
随着数据规模的增长,还需要 增加某个分区下分桶的数量。但是分桶数是一个表级别的参数,所有分区的分桶数都按照建表时的参数进行约束,增大分桶数就需要重写全部的历史数据。但是全量数据重写也是代价高昂的,并且对于历史分区的数据量可能比较小,采用较大的新分桶数可能会带来小文件问题。如下图所示,字节跳动引入了分区级分桶数,历史分区仍然按照第一次写入时的分桶数,而新增的分区则按照当前的分桶数构建。每个分区的分桶数信息会被写入到 Hudi Metastore 上。Hudi Metastore 是字节提出的针对数据湖的统一元数据存储方案。
3.3 查询优化
由于分桶是 Hive 中常用的分区优化方案,各个查询引擎都对 Bucket 表查询进行了不同程度的优化。以 Spark 为例,支持简单的查询优化,例如两个按照 A 列分桶的表进行 Join,可以对 Join 之前的 Shuffle 进行消除。在此基础上,字节进行了更多模式的优化。
第一个查询优化是对 Join 列是分桶列的超集的情况,可以在 Join 时消除或者减少 Shuffle。
第二个查询优化是在分桶数和 Shuffle 并行度成倍数的情况下:
① 当分桶数小于 Shuffle 并行度时,可以通过 Coalesce 减少 Shuffle
② 当分桶数大于 Shuffle 并行度时,可以通过并行读取,相当于用 HDFS 的IO 置换 Shuffle IO。
第三个查询优化是在查询语句里有过滤条件操作时 ,例如有按值查找的情况,由于相同的索引值映射到同一个分桶里,可以优化成只查询一个 File Group。
3.4 可拓展哈希
前面提到了修改分桶数需要重写数据文件,字节调研了几种可拓展哈希的方式,包括了 Extensible Hash、Consistent Hash 和 Linear Hash 等,最后选用了 Extensible Hash 的方案。① Consistent Hash 针对修改分桶数之后的数据重分布,利用一个首尾相接的哈希环移动节点数据完成对桶增加和删除。能够支持灵活的分桶数修改,但是不能根据数据分布进行查询优化,因为计算引擎不能根据数据找到对应的 File Group。② Linear Hash 适用于大部分桶数据溢出较多的场景,利用是 Round-Robin 增加新桶,必须按照顺序拆分数据桶,在最坏的情况下需要等待前面全部的桶都拆分之后才能分割当前桶。③ Extensible Hash 解决的是单个分桶写满之后的拓展问题,核心思路是对单个桶的拆分和合并,只改变部分分桶的物理分布,但是逻辑层的映射依然保持不变,可以复用计算引擎的查询优化。
Extensible Hash 的原理是划分全局分桶数和局部分桶数。对数据量比较大的桶进行拆分时,按照全局分桶数的 2 的倍数进行扩张;对于合并时,也是按照 2 的倍数进行合并。定位文件时,先通过全局分桶数定位到具体的分桶,再通过局部分桶数定位到特定的 File Group。
3.5 非主键索引
非主键索引不需要对原有的数据进行更新,每次都是在文件末尾追加数据,因此相比于桶索引,不需要进行去重操作,少了根据索引值去找对应 File Group 的操作,数据重分布也不是必须的,删去这两个操作可以提升数据入湖的时效性。非主键索引在入湖的过程中,会找到一个数据量较小的 File Group 追加写,如果写入数据量达到阈值,会重新找到一个 File Group 进行写入。
在 Hudi 0.11 版本中提出了一种 Multi-Modal Index 的特性,这是因为Bloom Filter 是在 Parquet 文件的 Footer 中保存索引信息的,每次构建索引都需要读取全量文件,社区中考虑基于 Hudi Table 来存储索引信息,会支持异步构建初始的索引表,在表的索引信息更新时也会取更新这张 Hudi Table 表。基于这个特性,我们可以在上面存储二级索引相关的信息,提升非主键列的点查性能。
另外一个规划的研究方向是 Range Index,字节希望能够借此提升点查和范围查询的效率。一种实现思路是对 Hudi 的 Base File 按照 Range Key 进行排序,然后存储每个文件的 Min/Max 值,可以基于前述的 Multi-Modal Index 去存储,根据存储的 Min/Max 值进行查询判断,过滤不必要的文件。
最后,列举下字节在 Hudi 社区的已有工作,希望在未来能够做出更多的开源贡献。
Q1:分桶列和关联列不相同的情况下,支持查询优化吗?
A1:不支持,所以在设计分桶列的时候需要尽可能的考虑将会频繁使用作为关联的列,选择最为常用的关联列作为分桶列。
Q2:可以支持多个列作为分桶列吗?
A2:支持的。
Q3:Hudi 在字节的应用场景有哪些?
A3:Hudi 在字节的主要应用是基于 COW 的特性替换了存量 Hive 表,第二个应用场景是在实时入湖基于 MOR 特性替换了 ODS 层,第三是基于 Hudi 存储和 Flink 连接构建了完整的实时数仓,整体的实效性达到分钟级别。
Q4:全部 Hive 表都替换成 Hudi 表了吗,有什么优劣?
A4:在某些场景下做到了 80%~90%。主要将 Hive 表替换成了 Hudi 的 COW表,大部分都是优势,比如更新代价低等等。劣势的话,文件数相比原先 Hive 表有增加。
Q5:Hudi Schema 自动更新有什么解决方案吗?
A5:Hudi 目前已经支持了 Schema Evolution,字节在元数据服务上也做了一些适配工作。
Q6:在字节里对 Hudi 的 OLAP 查询,主要使用哪些执行引擎?
A6:主要用 Spark 和 Presto。
Q7:对 Hudi 和 Iceberg 的技术调研是怎么样的?
A7:字节跳动对数据湖应用起步比较早,当时选择 Hudi 的主要原因是支持MOR 特性,并且 Iceberg 对各种计算引擎的支持还不完善。
Q8:在落地 Hudi 过程中遇到最大的问题是什么?
A8:实现起来比较困难的是数据湖的元数据管理,Hudi 通过 Hive Sync 的方式将元数据一致性暴露给计算引擎,但是 Hive Sync 可能失败,存在元数据一致性的问题,并且 Hudi 依赖 File Listing 构建 Snapshot 元数据信息,但是在文件数量很多的情况下这会很耗时,在解决这些问题上花费了较多时间。
Q9:生产环境升级 Hudi 和 Flink 版本升级会影响业务吗?
A9:升级 Hudi 版本很少遇到需要重启 Flink 作业的情况,大部分问题都可以正常迭代解决,对业务影响较少。
Q10:Hive 切换 Hudi 需要用户修改作业吗?
A10:在 Hive 和 Hudi 上使用的 SQL 语义是一致的,用户层面是无感知的。
Q11:字节的 TimeLine 最长会维护多久呢?
A11:字节的 TimeLine 是维护在 MySQL 中,理论上是没有限制的。我们会根据用户的需求返回对应 TimeLine 长度,但是如果用户指定的 TimeLine 过长,查询效率也会降低。
Q12:支持秒级别事务吗?
A12:目前不支持,但是未来计划会支持。
火山引擎 湖仓一体分析服务 LAS(Lakehouse Analytics Service)是面向湖仓一体架构的
Serverless 数据处理分析服务,提供字节跳动最佳实践的一站式 EB 级海量数据存储计算和交
互分析能力,兼容 Spark、Presto、Flink 生态,帮助企业轻松构建智能实时湖仓。
飞书扫码,沟通交流,1v1 咨询