字节跳动基于 Apache Hudi 构建实时数仓的实践

数据中台大数据数据湖仓

picture.image

picture.image

分享嘉宾:张友军 字节跳动

编辑整理:王宇翔

出品平台:DataFunTalk

导读: 今天很高兴能与大家分享字节数据平台在实时数仓中的一些实践。目前在数据湖和 Hudi 相关的一些基本技术原理方面社区已有较多的介绍,所以我们今天的分享主要聚焦于实践部分的内容。

今天我们的分享主要分为四个部分:

  • 实时数仓场景介绍
  • 数据湖在实时数仓场景初探
  • 数据湖在实时数仓典型场景实践以及深度优化
  • 未来规划
1. 实时数仓场景介绍

picture.image 为了数据湖更好的落地,我们在落地之前与业务做了一些深入的沟通,并根据不同业务的特点主要分为了三个场景:

①场景一典型的业务主要是短视频和直播,它的数据量级一般都比较大,例如大流量的日志数据,其计算周期一般是自然的天、小时或者分钟级别的,实时性的要求一般是五分钟内,主要诉求是批流的复用,可以容忍少量数据的不一致。

②场景二一般是直播或者电商的部分场景,数据量一般是中等体量,为长周期计算,对于实时性的要求一般是一分钟以内,主要诉求是低成本的数据回溯以及冷启动。

③场景三主要是电商和教育的一些场景,一般都是小规模的业务数据,会对数据做全量计算,其实时性要求是秒级的,主要诉求是强一致性以及高 QPS。

我们结合这些特点基于数据湖做了一些成套的解决方案,接下来我们会基于实际的一些场景和案例一一去了解。

2. 实时数仓场景初探

本节我们讨论的是字节实时数仓场景的初探以及遇到的问题和解决方案

坦白地讲,在最初落地时大家对数据湖能支持线上生产的态度都是存疑的,我们开始的方案也就比较保守。我们首先挑选一些对比现有解决方案,数据湖具有凸显的优势的场景,针对其中的一些痛点问题尝试小规模的落地。

picture.image

离线数仓有两个比较大的问题,一个是时效性问题,现状一般是天或小时级;第二个比较大的问题是更新问题,例如需要更新某个小时内的部分数据,现状需要将分区内数据全部重刷,这样的更新效率是很低的。对于这样的场景,数据湖兼具时效性和高效更新能力。同时相对于实时数仓来说,数据湖可以一份存储,批流两用,从而直接进行高效的数据分析。

基于以上对业务的分析,我们会按照以下步骤来做一线的落地。

picture.image

2.1 基于视频元数据的落地方案

看上图我们原有的方案有三个 Hive 表,Hive Table 1,2,3。对于整个链路来说我们会把左边 MySQL 数据源的数据导到 Table 1 中,右边 Redis 的数据导到 Table 2 中,然后将两个表做 Join。这里存在两个比较大的问题,一个是高峰期的资源占用率较高,因为天级 Dump 数据量较大,且都集中在凌晨;二是就绪时间比较长,因为存在去重逻辑,会将 T-1 天分区的数据和当天分区的数据合并去重计算后落到当天(T 天)的分区。

picture.image

我们通过引入 Hudi 把天级的 Dump 分摊到每个小时进行 Upsert。由于Hudi自身可以支持去重的逻辑,我们可以将 Table 1 看成一个实时的全量数据,当小时级别(例如 23 点)的数据一旦 Upsert 完成之后,我们就可以直接进行下游的 Join 逻辑,这样的话我们可以将数据的就绪时间提前 3.5 个小时左右,高峰期的资源消耗可以减少 40%。

picture.image

2.2 近实时数据校验方案

对于实时场景来说,当实时任务进行一个比较频繁的变更,比如优化或者新增指标的改动,一般需要校验实时任务的产出是否符合预期。我们当前的方案是会跑一个小时级别的 Job,将一个小时的数据从 Kafka Dump 到 Hive 之后再校验全量数据是否符合预期。在一些比较紧急的场景下,我们只能抽查部分数据,这时候就对时效性的要求就比较高。在使用基于的 Hudi 方案后,我们可以通过 Flink 将数据直接 Upsert 到 Hudi 表中,之后直接通过 Presto 查询数据从而做到全量数据近实时的可见可测。从线上效果来看可以极大提高实时任务的开发效率,同时保证数据质量。

picture.image

在以上探索过程中遇到了比较多的问题,第一个问题就是易用性比较差,运维成本和解释成本比较高。对于易用性这一部分,我们起初是通过脚本来提交 SQL,可以看到 SQL 中的参数是比较多的,并且包含 DDL 的 Schema,这在当列数比较多的情况下是比较麻烦的,会导致易用性较差,并且对业务侧来说也是不可接受的。

picture.image

对于以上问题我们做了一个针对性的解决方案,首先我们对之前的任务提交方式替换为了纯 SQL 化提交,并且通过接入统一的 Catalog 自动化读取 Schema 和必要参数,入湖的 SQL 就可以简化为如图的形式。

3. 典型场景实践

picture.image

接下来让我们看字节目前基于 Hudi 的实时数仓整体链路

可以看到,我们支持数据的实时入湖,例如 MySQL,Kafka 通过 Flink 可以直接落到 Hudi;也支持进行一定的湖内计算,比如图中左下将 MySQL 数据通过 Flink 导入 Hudi 进一步通过 Flink 做一些计算后再落到 Hudi。在数据分析方面,我们可以使用 Spark 和 Presto 连接看板 BI 进行一些交互式查询。当我们需要接到其他在线系统,尤其是 QPS 较高的场景,我们会先接入到 KV 存储,再接入业务系统。

让我们来看具体场景。

picture.image

3.1 实时多维汇总

对于一个实时多维汇总的场景,我们可以把 Kafka 数据增量写入到 Hudi 的轻度汇总层中。对于分析场景,可以基于 Presto 按需进行多维度的重度汇总计算,并可以直接构建对应的可视化看板。这个场景对 QPS 和延迟要求都不是很高,所以可以直接构建,但是对于高 QPS 和低延迟诉求的数据产品场景,目前的一个解决方案是通过 Presto 进行多维度预计算,然后导入到 KV 系统,进一步对接数据产品。从中长期来看我们会采取基于物化视图的方式,这样就可以进一步去简化业务侧的一些操作。

picture.image

在以上链路中,我们也遇到了比较多的 问题

①写入稳定性差。第一点就是 Flink 在入湖的过程中任务占用资源比较大,第二点是任务频繁重启很容易导致失败,第三点是 Compaction 没有办法及时执行从而影响到查询。

②更新性能差。会导致任务的反压比较严重。

③并发度难提升。会对 Hudi Metastore Service(目前字节内部自主研发的 Hudi 元数据服务,兼容 Hive 接口,准备贡献到社区)稳定性产生比较大的影响。

④查询性能比较差。有十分钟的延迟甚至经常查询失败。

面对这些问题,我接下来简单介绍一下针对性的一些解决方案:

picture.image

①写入稳定性治理

这一块我们通过异步的 Compaction + Compaction Service 的方案去解决这个问题。我们之前 Flink 入湖默认是在 Flink 内部去做 Compaction,发现这一步是暴露以上一系列问题的关键。经过优化,Flink 入湖任务只负责增量数据的写入,以及 Schedule Compaction 逻辑,而 Compaction 执行则由 Compaction Service 负责。具体而言,Compaction Service 会从 Hudi Metastore 异步拉取 Pending Compaction Plan,并提交 Spark 批任务完成实际的 Compact。Compaction 执行任务与 Flink 写入任务完全异步隔离,从而对稳定性有较大提升。

picture.image

②高效更新索引

支持数据量级的大幅提升。简单来说,我们可以基于哈希计算快速定位目标文件,提升写入性能;同时可以进行哈希过滤,从而也可以进行查询分析侧的优化。

picture.image

③请求模型的优化

当前的 Hudi 社区版的 WriteTask 会轮询 Timeline,导致持续访问 Hudi Metastore,从而造成拓展能力受限的问题。我们将 WriteTask 的轮询请求从 Hudi Metastore 转移到了对 JobManager 缓存的拉取,这样就能大幅降低对 Hudi Metastore 的影响。经过这个优化可以让我们从几十万量级的 RPS(Request Per Sec)提升到近千万的量级。

接下来我们来讲一下查询相关的优化。

picture.image

④MergeOnRead 列裁剪

对于原生的 MergeOnRead 来说,我们会在全量读取 LogFile 和 BaseFile 之后做合并,这在只查询部分列的时候会造成性能损耗,尤其是列比较多的情况。我们所做的优化是把列的读取下推到 Scan 层,同时在进行 log 文件合并时,会使用 map 结构存储 K,V(K 是主键,V 是行记录),之后对行记录做列裁剪,最后再进行 Log Merge 的操作。这样会对序列化和反序列化开销以及内存使用率都有极大降低。

picture.image

⑤并行读优化

一般引擎层在读 Hudi 时,一个 Filegroup 只对应一个 Task,这样当单个 FileGroup 数据量较大时就极易造成性能瓶颈。我们对此的优化方案是对 BaseFile 进行切分,每个切分的文件对应一个 Task 从而提高读并行度。

picture.image

⑥Combine Engine

Hudi 社区版目前在内存中对数据的合并和传输的实现完全是基于 Avro 格式,这会造成与具体引擎对接时有大量的序列化与反序列化计算,从而导致比较大的性能问题。对于这个问题我们与社区合作做了 Combine Engine 的优化,具体做法就是将接口深入到了引擎层的数据结构。例如在读取 FileGroup 时我们直接读取的就是 Spark 的 InternalRow 或是 Flink 的 RowData,从而尽量减少对 Avro 格式的依赖。这样的优化可以极大地提高 MergeOnRead 和 Compaction 的性能。

接下来两个优化由于时间原因就不做详细介绍了。

picture.image

⑦实时数据分析

这个场景我们可以把明细数据直接通过 Flink 导入到 Hudi 中,还会根据 DIM 表做一个宽表的处理从而落到 Hudi 表。这个场景的诉求主要有两点,一个是日志型数据的高效入湖,另一个是实时数据的关联。对于这两个场景的诉求,我们针对性的进行了一些优化。

picture.image

⑧日志型数据高效入湖

对于日志型数据,我们支持了 NonIndex 的索引。Hudi 社区版主要支持是基于有主键的索引,比如 Bloom Filter 或者是我们给社区提供的 Bucket Index。生成基于主键的索引方式主要会有两个步骤,第一个步骤是数据在写进来的时候会先对数据做定位,查询是否有历史数据存在,如果有的话就 Update,没有的话就 Insert,之后会定位到对应的文件把数据 Append 到 Log 中。然后在 Merge 或者在 Compaction 的过程中要在内存中做合并与去重处理,这两个操作也是比较耗时的。对于 NonIndex 来说,是不存在主键的概念的,所以支持的也是没有主键的日志型数据入湖。这样对于日志型数据在写入时可以直接 Append 到 Log File 中,在合并的过程中,我们可以不做去重处理,直接将增量数据数据 Append 到 Base File 中。这样就对入湖的效率有了很大的提升。

picture.image

⑨实时数据关联

针对目前实时 Join 出现的一系列问题,我们基于 Hudi 支持了存储层的关联。对 Hudi 来说不同的流可以完成其所对应列的写入,并在 Merge 的时候做拼接,这样对于外界查询来说就是一个完整的宽表。具体来说,在实时数据写入的过程中有一个比较大的问题是怎么处理多个流的写入冲突问题。我们主要是基于 Hudi Metastore 来做冲突检测。

picture.image

对于读的流程,我们会先将多个 LogFile 读入内存进行 Merge,然后再与 BaseFile 进行最终 Merge,最后输出查询结果,Merge 和 Compaction 都会使用到这个优化。

4. 未来规划

picture.image

4.1 弹性可扩展的索引系统

我们刚刚介绍了 Bucket Index 支持大数据量场景下的更新,Bucket Index 也可以对数据进行分桶存储,但是对于桶数的计算是需要根据当前数据量的大小进行评估的,如果后续需要 re-hash 的话成本也会比较高。在这里我们预计通过建立Extensible Hash Index 来提高哈希索引的可扩展能力。

picture.image

4.2 自适应的表优化服务

为了降低用户的理解和使用成本,我们会与社区深度合作推出 Table Management Service 来托管 Compaction,Clean,Clustering 以及 Index Building 的作业。这样对用户来说相关的优化都是透明的,从而降低用户的使用成本。

picture.image

4.3 元数据服务增强

目前我们内部已经使用 Hudi Metastore 稳定支持了一些线上业务,但是也有更多需求随之而来,预计增强的元数据服务如下:

①Schema Evolution:支持业务对 Hudi Schema 变更的诉求。

②Concurrency Control:在 Hudi Metastore 中支持批流并发写入。

picture.image

4.4 批流一体

对于流批一体处理,我们的规划如下:

①Unified SQL:做到批流统一的 SQL 层,Runtime 由 Flink/Spark/Presto 多引擎协同计算。

②Unified Storage:基于 Hudi 的实时数据湖存储,由 Hudi 来做统一的存储。

③Unified Catalog:统一元数据的构建以及接入。

5. 问答环节

Q1:MergeOnRead 列裁剪的文件格式是列式的还是行式的?

A1:在没有优化前是基于Avro的行式存储我们目前已经支持了Parquet Log的列存格式,能够带来存储以及查询性能上的提升。


Q2:Async Compaction的调度是在Flink内部访问的Hudi Metastore吗?

A2:Flink之前默认是做了三件事,第一件事是Hudi增量写,第二件事是在几次增量写之后会Schedule一个Compaction计划,第三件事是执行Compaction Plan。我们目前所做的事情是只把Compaction Plan执行的这一步拆分了出来使用Compaction Service拉取Hudi Metastore元数据来执行Compaction。(图见写入稳定性治理)


Q3:Hudi 表是如何管理的?

A3:通过Hudi Metastore。目前我们使用的Hudi Metastore主要是部署在MySQL上面,支持文件与库表元数据的管理,Snapshot 服务,Hudi自有的Timeline 服务以及一些并发控制的处理。上层接口我们完全兼容Hive Metastore的接口标准,然后基于Hudi特性,我们拓展了Hudi特有的例如Timeline 相关的接口。同时我们支持流批应用统一接入到Hudi Metastore去做数据处理。(图见元数据服务增强)


Q4:可以深入介绍一下多流写Hudi的流程以及冲突解决吗?

A4:多流Hudi增量写入,不同的流可以写到不同的Log文件中去,这一点是不冲突的,会产生冲突的点是二阶段的提交冲突,如果涉及到写的是同一个列的话是会产生列冲突的,这时我们会在Hudi Metastore中进行列级别的冲突检测,如果有冲突的话会直接拒绝提交,没有的话我们会认为是写两个流,就可以正常并发写入。(图见实时数据关联 — 写流程)


Q5:实时数仓里的Kafka流表和Hudi流表是什么关系,以后是否会用Hudi表来代替Kafka的流表?

A5:我们今天介绍主要是Hudi支持的一些近实时(分钟级别)场景的落地和尝试,在某些场景我们需要秒级的响应,这样就需要我们做一些流批一体的规划和尝试。长期来看我们会使用Hudi 流式能力来替换Kafka的流式能力。(图见未来规划 – 流批一体)


Q6:实时数仓是否每一层都用Hudi?

A6:湖内计算还在小范围的推广阶段,某些场景我们正在做POC,个别场景也在准备上线的过程中。


Q7:为何会使用Bucket Index?

A7:在使用Bucket Index前我们使用的是Bloom Filter Index,布隆过滤器在小数据量场景使用是没有问题的,但在百TB级别的数据下会有突出的假阳性的问题,当数据不存在的时候会扫描很多非必须的文件造成资源浪费。通过Bucket Index 我们可以直接通过hash值的计算能更加快速的定位数据所在的文件。


火山引擎 湖仓一体分析服务 LASLakehouse Analytics Service)是面向湖仓一体架构的
Serverless 数据处理分析服务,提供字节跳动最佳实践的一站式 EB 级海量数据存储计算和交
互分析能力,兼容 SparkPrestoFlink 生态,帮助企业轻松构建智能实时湖仓。

image.png

飞书扫码,沟通交流,1v1 咨询

679
0
0
0
关于作者
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论