字节跳动实时数据湖构建的探索和实践

数据湖仓

本文是字节跳动数据平台开发套件团队在Flink Forward Asia 2021: Flink Forward 峰会上的演讲,着重分享了字节跳动数据湖技术上的选型思考和探索实践。

image.png

文 | Gary Li  字节跳动数据平台开发套件团队高级研发工程师,数据湖开源项目Apache Hudi PMC Member

随着Flink社区的不断发展,越来越多的公司将Flink作为首选的大数据计算引擎。字节跳动也在持续探索Flink,作为众多Flink用户中的一员,对于Flink的投入也是逐年增加。

字节跳动数据集成的现状

在2018年,我们基于Flink构造了异构数据源之间批式同步通道,主要用于将在线数据库导入到离线数仓,和不同数据源之间的批式传输。

在2020年,我们基于Flink构造了MQ-Hive的实时数据集成通道,主要用于将消息队列中的数据实时写入到Hive和HDFS,在计算引擎上做到了流批统一。

到了2021年,我们基于Flink构造了实时数据湖集成通道,从而完成了湖仓一体的数据集成系统的构建。

image.png

字节跳动数据集成系统目前支持了几十条不同的数据传输管道,涵盖了线上数据库,例如Mysql Oracle和MangoDB;消息队列,例如Kafka RocketMQ;大数据生态系统的各种组件,例如HDFS、HIVE和ClickHouse。

在字节跳动内部,数据集成系统服务了几乎所有的业务线,包括抖音、今日头条等大家耳熟能详的应用。

整个系统主要分成3种模式——批式集成、流式集成和增量集成。****

  • 批式集成模式基于Flink Batch模式打造,将数据以批的形式在不同系统中传输,目前支持了20多种不同数据源类型。
  • 流式集成模式主要是从MQ将数据导入到Hive和HDFS,任务的稳定性和实时性都受到了用户广泛的认可。
  • 增量模式即CDC模式,用于支持通过数据库变更日志Binlog,将数据变更同步到外部组件的数据库。这种模式目前支持5种数据源,虽然数据源不多,但是任务数量非常庞大,其中包含了很多核心链路,例如各个业务线的计费、结算等,对数据准确性要求非常高。在CDC链路的整体链路比较长。首先,首次导入为批式导入,我们通过Flink Batch模式直连Mysql库拉取全量数据写入到Hive,增量Binlog数据通过流式任务导入到HDFS。由于Hive不支持更新操作,我们依旧使用了一条基于Spark的批处理链路,通过T-1增量合并的方式,将前一天的Hive表和新增的Binlog进行合并从而产出当天的Hive表。

随着业务的快速发展,这条链路暴露出来的问题也越来越多。

  • 首先,这条基于Spark的离线链路资源消耗严重,每次产出新数据都会涉及到一次全量数据Shuffle以及一份全量数据落盘,中间所消耗的储存以及计算资源都比较严重。
  • 同时,随着字节跳动业务的快速发展,近实时分析的需求也越来越多。
  • 最后,整条链路流程太长,涉及到Spark和Flink两个计算引擎,以及3个不同的任务类型,用户使用成本和学习成本都比较高,并且带来了不小的运维成本。

为了解决这些问题,我们希望对增量模式做一次彻底的架构升级,将增量模式合并到流式集成中,从而可以摆脱对Spark的依赖,在计算引擎层面做到统一。

改造完成后,基于Flink的数据集成引擎就能同时支持批式、流式和增量模式,几乎可以覆盖所有的数据集成场景。

同时,在增量模式上,提供和流式通道相当的数据延迟,赋予用户近实时分析能力。在达到这些目标的同时,还可以进一步降低计算成本、提高效率。

image.png

经过一番探索,我们关注到了正在兴起的数据湖技术。

关于数据湖技术选型的思考

我们的目光集中在了Apache软件基金会旗下的两款开源数据湖框架Iceberg和Hudi中。Iceberg和Hudi两款数据湖框架都非常优秀。但两个项目被创建的目的是为了解决不同的问题,所以在功能上的侧重点也有所不同。

  • Iceberg:核心抽象对接新的计算引擎的成本比较低,并且提供先进的查询优化功能和完全的schema变更。
  • Hudi:更注重于高效率的Upsert和近实时更新,提供了Merge On Read文件格式,以及便于搭建增量ETL管道的增量查询功能。

一番对比下来,两个框架各有千秋,并且离我们想象中的数据湖最终形态都有一定距离,于是我们的核心问题便集中在了以下两个问题:

  • 哪个框架可以更好的支持我们CDC数据处理的核心诉求?
  • 哪个框架可以更快速补齐另一个框架的功能,从而成长为一个通用并且成熟的数据湖框架?

经过多次的内部讨论,我们认为:Hudi在处理CDC数据上更为成熟,并且社区迭代速度非常快,特别是最近一年补齐了很多重要的功能,与Flink的集成也愈发成熟,最终我们选择了Hudi作为我们的数据湖底座。

01 - 索引系统

我们选择Hudi,最为看重的就是Hudi的索引系统。

image.png

这张图是一个有索引和没有索引的对比。

在CDC数据写入的过程中,为了让新增的Update数据作用在底表上,我们需要明确知道这条数据是否出现过、出现在哪里,从而把数据写到正确的地方。在合并的时候,我们就可以只合并单个文件,而不需要去管全局数据。

如果没有索引,合并的操作只能通过合并全局数据,带来的就是全局的shuffle。

在图中的例子中,没有索引的合并开销是有索引的两倍,并且如果随着底表数据量的增大,这个性能差距会呈指数型上升。

所以,在字节跳动的业务数据量级下,索引带来的性能收益是非常巨大的。

Hudi提供了多种索引来适配不同的场景,每种索引都有不同的优缺点,索引的选择需要根据具体的数据分布来进行取舍,从而达到写入和查询的最优解。

下面举两个不同场景的例子。

日志数据去重场景

在日志数据去重的场景中,数据通常会有一个create_time的时间戳,底表的分布也是按照这个时间戳进行分区,最近几小时或者几天的数据会有比较频繁的更新,但是更老的数据则不会有太多的变化。

冷热分区的场景就比较适合布隆索引、带TTL的State索引和哈希索引

CDC场景

第二个例子是一个数据库导出的例子,也就是CDC场景。这个场景更新数据会随机分布,没有什么规律可言,并且底表的数据量会比较大,新增的数据量通常相比底表会比较小。

在这种场景下,我们可以选用哈希索引、State索引和Hbase索引来做到高效率的全局索引

这两个例子说明了不同场景下,索引的选择也会决定了整个表读写性能。Hudi提供多种开箱即用的索引,已经覆盖了绝大部分场景,用户使用成本非常低。

02 - Merge On Read表格式

除了索引系统之外,Hudi的Merge On Read表格式也是一个我们看重的核心功能之一。这种表格式让实时写入、近实时查询成为了可能。在大数据体系的建设中,写入引擎和查询引擎存在着天然的冲突:

  • 写入引擎更倾向于写小文件,以行存的数据格式写入,尽可能避免在写入过程中有过多的计算包袱,最好是来一条写一条。
  • 查询引擎则更倾向于读大文件,以列存的文件格式储存数据,比如说parquet和orc,数据以某种规则严格分布,比如根据某个常用字段进行排序,从而做到可以在查询的时候,跳过扫描无用的数据,来减少计算开销。

为了在这种天然的冲突下找到最佳的取舍,Hudi支持了Merge On Read的文件格式。

image.png

MOR格式中包含两种文件:一种是基于行存Avro格式的log文件,一种是基于列存格式的base文件,包括Parquet或者ORC。log文件通常体积较小,包含了新增的更新数据。base文件体积较大,包含了所有的历史数据。

  • 写入引擎可以低延迟的将更新的数据写入到log文件中。
  • 查询引擎在读的时候将log文件与base文件进行合并,从而可以读到最新的视图;compaction任务定时触发合并base文件和log文件,避免log文件持续膨胀。在这个机制下,Merge On Read文件格式做到了实时写入和近实时查询。

03 - 增量计算

索引系统Merge On Read格式给实时数据湖打下了非常坚实的基础,增量计算则是这个基础之上的Hudi的又一个亮眼功能:

增量计算赋予了Hudi类似于消息队列的能力。用户可以通过类似于offset的时间戳,在Hudi的时间线上拉取一段时间内的新增数据。

在一些数据延迟容忍度在分钟级别的场景中,基于Hudi可以统一Lambda架构,同时服务于实时场景和离线场景,在储存上做到流批一体。

在选择了基于Hudi的数据湖框架后,我们基于字节跳动内部的场景,打造定制化落地方案。我们的目标是通过Hudi来支持所有带Update的数据链路:

  • 需要高效率且低成本的Upsert
  • 支持高吞吐
  • 端到端的数据可见性控制在5-10分钟以内

目标明确后,我们开始了对Hudi Flink Writer进行了测试。这个图是Hudi on Flink Writer的架构:一条新的数据进来之后,首先会经过一个索引层,从而找到它需要去的地方。

image.png

  • State索引中保存了所有主键和文件ID的一一映射关系,对于Update数据,会找到其所存在的文件ID,对于Insert数据,索引层会给他指定一个新的文件ID,或者是历史文件中的小文件,让其填充到小文件中,从而避免小文件问题。
  • 经过索引层之后,每条数据都会带有一个文件ID,Flink会根据文件ID进行一次shuffle,将相同文件ID的数据导入到同一个子任务中,同时可以避免多个任务写入同一个文件的问题。
  • 写入子任务中有一个内存缓冲区,用于储存当前批次的所有数据,当Checkpoint触发时,子任务缓冲区的数据会被传入Hudi Client中,Client会去执行一些微批模式的计算操作,比如Insert/Upsert/Insert overwrite等,每种操作的计算逻辑不同,比如说Insert操作,会生成一个新的文件,Upsert操作可能会和历史文件做一次合并,
  • 待计算完成后,将处理好的数据写入到HDFS中,并同时收集元数据。
  • Compaction任务为流任务的一部分,会定时的去轮训Hudi的时间线,查看是否有Compaction计划存在,如果有Compaction计划,会通过额外的Compaction算子来执行。

在测试过程中,我们遇到了以下几个问题:

  • 在数据量比较大的场景下,所有的主键和文件ID的映射关系都会存在State中,State的体积膨胀的非常快,带来了额外的储存开销,并且有时会造成Checkpoint超时的问题。
  • 第二个问题是,由于Checkpoint期间,Hudi Client操作比较重,比如说和底层的base文件进行合并,这种操作涉及到了历史文件的读取,去重,以及写入新的文件,如果遇到HDFS的抖动,很容易出现Checkpoint超时的问题
  • 第三个问题是,Compaction任务作为流式任务的一部分,任务启动后资源就不可调节,如果需要调节,只能重启整个任务,开销比较大,如果不能灵活调节Compaction任务,就可能会出现Compaction算子空跑导致资源浪费,或者资源不足导致任务失败的情况

为了解决这些问题,我们开始针对我们的场景进行了定制化的优化

技术方案

01 - 索引层

索引的目的就是找到当前这条数据所在的文件地点,存在State中的话每条数据都涉及到一次State的读和写,在数据量大的场景下,所带来的计算和储存开销都是比较大的。

字节跳动内部开发了一种基于哈希的索引,可以通过直接对主键的哈希操作来找到文件所在的位置,这种方式在非分区表下可以做到全局索引,绕过了对State的依赖,改造过后,索引层变成了一层简单的哈希操作。

image.png

02 - 写入层

早期的Hudi写入和Spark强绑定,在2020年底,Hudi社区对底层的Hudi Client进行了拆分,并且支持了Flink引擎,这种改造方式是将Spark RDD的操作变成了一个List的操作,所以底层还是一个批式操作,对于Flink来说,每一次Checkpoint期间所需要做的计算逻辑是类似于Spark RDD的,相当于是执行了一次批式的操作,计算包袱是比较大的。

写入层的具体流程是:一条数据经过索引层后,来到了写入层,数据首先会在Flink的内存缓冲区积攒,同时通过内存监控来避免内存超出限制导致任务失败,到了Checkpoint的时候,数据会被导入到Hudi Client,然后Hudi Client会通过Insert,Append,Merge等操作计算最终的写入数据,计算完成后将新的文件写入到HDFS并同时回收元数据。

image.png

我们的核心目标在于如何让这种微批的写入模式更加的流式化,从而降低Checkpoint期间的计算负担。

  • 在表结构上, 我们选择了与流式写入更加匹配的Merge on Read格式,写入的算子只负责对于log文件的追加写入,不做任何别的额外的操作,例如和base文件进行合并。
  • 在内存上,我们将第一层Flink的内存缓冲区去掉,直接把内存缓冲区建立在了hudi client中,在数据写入的同时进行内存监控避免内存超出限制的情况,
  • 我们将写入hdfs的操作和Checkpoint进行了解耦,任务运行过程中,每一小批数据就会写入HDFS一次,由于HDFS支持追加写操作,这种形式也不会带来小文件的问题,从而将Checkpoint尽可能的轻量化,避免HDFS抖动和计算量过大带来的Checkpoint超时的问题。

03- Compaction层

Compaction任务本质上是一个批任务,所以需要和流式写入进行拆分,目前Hudi on Flink支持了异步执行Compaction的操作,我们的线上任务全部使用了这种模式。

在这种模式下,流式任务可以专注于写入,提升吞吐能力和提高写入的稳定性,批式的Compaction任务可以流式任务解耦,弹性伸缩高效的利用计算资源,专注于资源利用率和节约成本。

在这一系列的优化过后,我们在一个2百万rps的Kafka数据源上进行了测试,使用了200个并发导入到Hudi。和之前相比,Checkpoint耗时从3-5分钟降低到了1分钟以内,HDFS抖动带来的任务失败率也大幅度下降由于Checkpoint耗时降低,实际用于数据处理的时间变得更多了,数据吞吐量翻了一倍,同时State的存储开销也降到了最低。

image.png

这是最终的CDC数据导入流程图。

首先,不同的数据库会将Binlog发送到消息队列中,Flink任务会将所有数据转换成HoodieRecord格式,然后通过哈希索引找到对应的文件ID,通过一层对文件ID的shuffle后,数据到达了写入层,写入算子以追加写的形式将数据频繁的写入到HDFS中,Checkpoint触发后,Flink会将所有的元数据收集到一起,并写入到hudi的元数据系统中,这里就标志了一个Commit提交完成,一个新的Commit会随之开始。

用户可以通过Flink Spark Presto等查询引擎,近实时的查询已经提交完成的数据。

数据湖平台侧托管的Compaction服务会定时提交Flink Batch模式的Compaction任务,对Hudi表进行压缩操作,这个过程对用户无感知并且不影响写入任务。

image.png

我们这一整套解决方案也会贡献给社区,感兴趣的同学可以关注Hudi社区最新的进展。

流式数据湖集成框架的典型落地场景

流式数据湖集成框架改造完成后,我们找到了一些典型的落地场景:

应用最普遍的就是将线上数据库导入到离线数仓进行分析的场景,和之前的Spark离线链路相比:端到端的数据延迟从一个小时以上降低到了5-10分钟,用户可以进行近实时的数据分析操作。

在资源利用率方面,我们模拟了一个Mysql导入离线数仓进行分析的场景,将Flink流式导入Hudi和Spark离线合并的方案进行了对比,在用户小时级查询的场景下,端到端的计算资源大约节约了70%左右

在字节跳动EB级数据量的数仓场景下,这种资源利用率的提升所带来的收益是非常巨大的。

image.png

对于基于消息队列和Flink构建实时数仓的用户来说,他们可以把不同数仓层级的实时数据导入到Hudi,这类数据update的情况很多,所以相较于Hive,Hudi可以提供高效率且低成本的Upsert操作,从而用户可以对于全量数据进行近实时查询,避免了一次去重的操作。

image.png

这是一个Flink双流Join的场景,很多Flink的用户会使用双流Join来进行实时的字段拼接,在使用这个功能的时候,用户通常会开一个时间窗口,然后将这个时间窗口中来自不同数据源的数据拼接起来,这个字段拼接功能也可以在Hudi的层面实现。

我们正在探索一个功能,在Flink中只将不同Topic的数据Union在一起,然后通过Hudi的索引机制,将相同主键的数据都写入到同一个文件当中,然后通过Compaction的操作,将数据进行拼接。

这种方式的优点在于,我们可以通过Hudi的索引机制来进行全局字段拼接,不会受到一个窗口的限制。

整个拼接逻辑通过HoodiePayload实现,用户可以简单的继承HoodiePayload,然后来开发自己的自定义的拼接逻辑,拼接的时机可以是Compaction任务,也可以是Merge on Read近实时查询,用户可以根据需求场景,灵活的使用计算资源。但是相比Flink双流Join,这种模式会有一个缺点,就是实时性和易用性上要差一些。

image.png

结论

在这一系列的工作过后,我们对数据湖的未来满怀期待,同时也设立的明确的目标。

首先,我们希望将Hudi作为所有CDC数据源的底层存储,完全替换掉基于Spark的离线合并方案,通过数据集成引擎流式导入,将近实时离线分析的能力带给所有的在线数据库。

image.png

接着,增量ETL场景也是一个重要的落地场景,对于数据延迟容忍度在分钟级的场景,Hudi可以作为统一存储同时服务于实时链路和离线链路,从而将传统的数仓Lambda架构升级到真正意义上的流批一体。

image.png

最后,我们希望建设一个智能数据湖平台,这个平台会托管所有数据湖的运维管理,达到自我治理的一个状态,用户则不需要再为运维而烦恼。

同时,我们希望提供自动化调优的功能,基于数据的分布找到最佳的配置参数,例如之前提到的不同索引之间的性能取舍问题,我们希望通过算法来找到最佳的配置,从而提高资源利用率,并降低用户的使用门槛。

极佳的用户体验也是我们的追求之一,我们希望在平台侧做到一键入湖入仓,大大降低用户的开发成本。

image.png

数据湖集成技术也已经通过火山引擎大数据研发治理套件DataLeap对外开放。

火山引擎大数据研发治理套件DataLeap: 一站式数据中台套件,帮助用户快速完成数据集成、开发、运维、治理、资产、安全等全套数据中台建设,帮助数据团队有效的降低工作成本和数据维护成本、挖掘数据价值、为企业决策提供数据支撑。

268
0
0
0
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论