干货|揭秘字节跳动对Apache Doris 数据湖联邦分析的升级和优化

数据中台大数据数据湖仓

火山引擎 EMR 作为一款云原生开源大数据平台产品,集成了包括 Hadoop、Spark、Flink 等引擎,并做到100%开源兼容。Doris 作为 OLAP 领域中一款极具代表性的开源组件,也被集成到了火山引擎 EMR 产品生态中。

本文主要介绍 Apache Doris 设计和开发数据湖联邦分析特性的思考和实践。全文分为三部分,首先介绍数据湖相关技术的演进,其次介绍 Apache Doris 数据湖联邦分析的整体设计和相关特性,最后介绍 Apache Doris 在数据湖联邦分析上的未来规划。

1. 湖仓一体架构演进

回顾湖仓一体的发展史,主要经历了三个阶段。第一个阶段是数据仓库,第二个阶段是数据湖,第三个阶段是湖仓一体。

数据仓库阶段

数据仓库是在上个世纪80年代兴起的一项技术。随着企业业务发展和大规模计算技术的发展,越来越多的企业使用数据仓库来处理企业产生的数据,发现数据的商业价值。
在这个时期,主要是将来自业务系统的多种结构化数据聚合到数据仓库中,利用 MPP 等大规模并发技术对企业的数据进行分析,支撑上层的商业分析和决策。

数据湖阶段

数仓的主要特点是只能处理结构化数据。随着数据科学和人工智能的发展,产生了越来越多的非结构化数据,但非结构化数据在数仓中处理中相对麻烦,于是数据湖技术出现了。
数据湖可以被定义为一种存储各类原始数据的存储库,原始数据包含结构化、半结构化以及非结构化数据。一部分原始数据会经过 ETL 同步到数据集市中,支撑商业分析和决策类应用,另一部分数据将被机器学习和数据科学类应用直接访问。

湖仓一体阶段

数据湖模式缺乏一些关键特性,如不支持事务、数据缺乏一致性、缺乏隔离性、无法保证数据质量等,导致数据湖管理复杂,如果管理不善,数据湖将会退化成数据沼泽。
于是,2020年湖仓一体的概念被提出,主要指在数据湖中建设存储、湖上建仓。
湖仓一体的优势特性包括:
● 支持事务。 在企业中,数据往往由业务系统提供、并发读取和写入,对事务性要求高。由于一部分业务在读取数据,同时另一部分业务在写入数据,需要保证在并发过程中数据的一致性和正确性。
● 支持数据模型化和治理, 并在数据湖上建设数仓模型,如星型、雪花模型都可以在数据湖上构建,进一步支持上层商业智能类应用,并对接多种BI类工具。
● 支持存算分离, 数据湖中有海量数据,如果存储在数仓等系统中会非常昂贵,因此需要存储在对象存储等较便宜的存储系统中。利用湖仓一体这种架构,实现存算分离模式。
● 更好的开放性。 支持 Parquet、ORC 等常见的大数据存储格式,也支持 Hudi、Iceberg、DeltaLake 等表格管理存储格式,支持结构化、半结构化和非结构化等数据类型,支持不同类型的工作负载等。
● 生态工具与组件丰富。 围绕数据湖也出现了很多相关工具和组件,如数据目录、开发工具、隐私计算、元数据管理等,其中以 Hudi、Iceberg、DeltaLake 这三种数据湖存储格式最为流行。

picture.image

湖仓一体技术也存在一些缺点,其中比较突出的是对实时性支持不足。如果我们把数据湖和实时数仓进行融合,利用实时数仓的快速分析能力去查询数据湖中的海量数据,势必将会给企业带来更高的价值。
数据湖和实时数仓具备不同特点:
● 数据湖: 提供多模存储引擎,如 S3、HDFS 等,也支持多计算引擎,如 Hive、Spark、Flink 等。在事务性方面,数据湖支持 ACID 和 snapshot 等方式。同时,数据湖提供了 Hudi、Iceberg、DeltaLake 等表格式的定义,也支持结构化、半结构化和非结构化数据。
● 实时数仓: 提供实时指标的聚合,数据可以秒级入库。实时数仓的分析能力也较强,支持秒级和亚秒级的数据分析,支持多维分析和联合分析。对外可以提供高并发数据服务,如 Doris 可以提供万级 QPS 的数据服务,也提供数据更新能力。

picture.image

通过结合数据湖和 Doris 两方的特性,既可以利用数据湖中存储的海量数据,又可以利用 Doris 向量化分析能力加速海量数据的洞察效率,利用 Doris 提供高并发数据服务和数据更新能力,那必将事半功倍,这也是字节跳动进行数据湖联邦分析特性的初衷。

picture.image

2. Apache Doris 数据湖联邦分析特性揭解密

Multi-Catalog的架构设计

目前字节跳动的数据湖联邦分析架构采用 multi-catalog模式,由外表的方式转为catalog的方式,如下图所示:

picture.image

原本 Doris 采用两层架构,一层是 Database、一层是 Table。先创建 Database,再创建各种 Table。
除了 OLAP 内表模式外,还支持创建各种类型的外表,如 Hive 外表、Iceberg 外表、JDBC 外表和 ElasticSearch 外表等。
基于 Doris 原生外表模式,也可以访问数据湖中的数据源,但存在如下缺点:
首先需要在 Doris 中创建外表,创建时还需要制定 Schema。如果外部数据源多,一个一个在 Doris 中进行创建就显得非常繁琐和不便。
如果外部数据源,如 Hive 中的 Schema 发生了变更,那 Doris 中对应的表就需要重建,否则查询就会失败。
针对以上问题,我们参考数据库的设计理念,增加了 Catalog 一层,将原有的 Database 和 Table 挂在 Internal Catalog 下,目前已经实现了 Hive Catalog、JDBC Catalog 和 ElasticSearch Catalog。
在该架构下,增加新的 Catalog 会非常便捷。在 Catalog 下,通过 Create Catalog 可以方便地创建 Hive Catalog。创建完成后,即可切换到 Hive Catalog 中,通过 Show Database、Show Table 来查看 Hive 对应的 Database 和 Table。
在 Show Database 时查看的 Database,即 Hive Catalog 下的 Database,也就是 Hive Metastore 中的 Database 列表。我们在某个 DB 下 Show Table,也可以看到该 DB 下的 Table,同样和 Hive Metastore 保持一致,无需创建外表。其他类型的 Catalog 也类似。

Multi-Catalog的元数据技术原理

那么, Catalog 如何与外部元数据对接?
以 Hive MetaStore举例。元数据架构设计如下图所示,设计思路包括几方面:
我们已经添加了 Hive MetaStore这一类型的 Catalog,可以动态添加、删除和切换 Catalog。通过 Create Catalog,将 Type 指定为 Hive,指定 Hive Catalog 的地址,即可完成创建。
通过 Drop 和 Switch 命令也可以很容易地进行删除和切换。在 Doris 中无需创建外表,执行 Show Database 和 Table 的时候,FE 会连接至对应的 Hive MetaStore,来查询其中的 DB 和 Table。获取到 DB 和 Table 之后,再由 FE 返回客户端。
当制定 Select 查询操作时,FE 会连接到 Hive MetaStore 来获取该 Table 下的元数据信息,包含它的 Schema、Location、格式等信息,完成查询规划,进而完成查询。
我们也会对从 Hive MetaStore 中获取的元数据进行缓存,来加速查询。JDBC Catalog 和 ES Catalog 也是类似的方式,会分别连接到外部的 JDBC Server 和 ES Server 来进行元数据获取。

picture.image

在这种统一的数据查询框架下,我们要新增一种新的数据源非常方便的,大概一人周就可以完成一种新的数据源的开发。
当前我们已经内置提供了 Hive、JDBC、ES等数据源。添加新数据源时,只需关心数据源自身的访问相关操作,增加新的 ScanNode。例如,在 Hive、JDBC、ES 的设计中,分别内置了 FileScanNode、JDBCScanNode 和 ESScanNode。
在统一的调度框架下 Scanner Scheduler 下,我们会将 ScanNode 产生的 Scanner 提交到 Scanner Thread Pool 进行扫描查询。
对于 Hive 的 FileScanNode 来说,大多数情况是读取外部存储系统的文件,我们提供了 Parquet Reader、ORC Reader 和 TEXT Reader,支持对Parquer、ORC、 JSON 和 CSV 进行读取。
对于 Scan 之上的操作,我们完全无需关心,因为 Scan 产生的这种 Block 数据可以直接被上层应用进行向量化查询。在 Scan 层面,我们也增强了基于代价的查询优化器,可以根据统计信息进行查询优化。
在算子优化方面,我们也针对 Predicate Pushdown,Join Runtime Filter 和 Streaming Aggregation 等进行了优化。

Multi-Catalog的查询框架

picture.image

那么,整个查询流程是怎么做的呢?
举个例子,比如执行一个查询,Select * from Hive Catalog 中的 DB1 下的 Table1 的流程。
第一步,对于这样的查询,在 FE 中会首先连接到 Hive MetaStore ,获取 Table 相应的元数据。元数据中包含Schema 信息。
第二步,如果分区表,也会获取相应的分区。如果过滤条件中包含分区过滤条件,也会将过滤条件传递到 Hive MetaStore 中,减少返回的分区大小。
第三步,对返回的元数据信息进行分区裁剪和计划生成。分区裁剪完之后,我们会根据元数据信息链接到 HDFS 或 S3 中获取文件列表。获取到文件列表后,会进行计划生成,该生成逻辑和原有的逻辑类似。
第四步,生成完之后,我们会对任务进行拆分和下发,下发到 BE 中执行。对于下发的任务,BE 会基于 FE 下发的信息,直连 HDFS 和 S3 进行文件读取,读取效率非常高。
以上就是一个完整的查询流程。

picture.image

除此之外,字节跳动也对查询过程进行了很多优化。
● 在缓存方面,我们在 FE 中实现元数据缓存, 主要缓存 Db/Table 信息、Table 分区信息、Table 分区值信息和 File list 信息。Schema Cache 主要是指 Get Table 的时候,如果此时 FE 不存在对应 Table 信息,需要连接到 Hive MetaStore 获取 Table 的元数据信息,包括 Schema 和 格式。
 相关信息获取完后,我们会把信息维护在 FE 的内存中。 当再度访问相同 Table 时,可直接使用内存中的元数据信息,减少多次 RPC 调用,提高查询效率。
如果是分区表,我们也提供 Partition Cache 、Partition Value Cache以及 File Cache, 其中File Cache 是指 File list,并不是文件的数据内容。
在查询规划方面,我们会连接存储系统, 获取到相应的 File list,并将该信息维护在 FE 缓存中,进行查询加速。
以上是 FE 中缓存的相应元数据信息。
在 BE 端,我们也实现了一部分缓存信息:
第一, Prefetch Buffer 功能。在 BE 去查询 HDFS 和 S3 数据时,如 Parquet 或者 ORC 格式,会进行跳跃式读取。读完当前 Block ,读下一个 Block 时,我们会对 IO 做合并,一次读取多个 Block 信息,减少 RPC 调用。读取完数据,后续查询可以直接利用已读取的数据。
 第二, 维护File Block Cache。读取完 Parquet 文件中数据后,我们会对 Block 数据进行本次缓存,下次再查询相同文件时,可以充分利用本地这份缓存,减少和远端存储系统交互,提高查询效率。

picture.image

此外,字节跳动也对 FileReader 进行重构。
FileReader 和外表交互的原始方式是使用 Arrow Parquet Reader,主要存在以下问题:
● Arrow Parquet Reader 读取数据时会先做一层内存转换,把 Parquet 文件数据转换成 Arrow 这种内存格式。然后,Doris 会再做一次转化,将其转为 Doris 使用的向量化的 Block 内存格式。这就造成了多一次的内存转换。
● 无法使用 Parquet 中的 Page Index,导致读取效率较低。
● 无法使用 Parquet 中的 Bloom Filter,在部分查询有过滤条件时,将导致读取效率不高。
● 不支持字典编码。
针对以上问题,我们对 Reader 进行了重构,重写了 Native Parquet Reader,具备以下特点:
● Native Parquet Reader 读取数据会直接转化为 Doris 使用的向量化的 Block 内存格式。下游进行计算时可以直接利用这种格式,避免二次转换。
● 可以利用 Parquet 中的 Page Index,能够更精确地过滤掉无用数据,降低数据 IO。
● 可以利用 Parquet 中的 Bloom Filter 来过滤数据,提高查询效率。
● 支持字典编码和延迟物化,比如性别男和女在文件存储中使用0和1。
我们设置过滤条件只查询性别为男的数据,常规的读取方式会先把文件存储中的0和1数据用字典解码为性别男和女。然后,再将男和女的字符串和过滤条件进行比较,保留性别为男的数据。
这种模式因为有字符串的参与,效率会非常低。在我们这次重构中对此进行了优化,在设置过滤条件时可以通过字典知道对应数据是0,所以查询中直接可以使用 0 这个 int 类型数据来进行读取和延迟物化,从而提供查询效率。

picture.image

Compute-Node计算节点

最后,我们增加了弹性计算节点。
原始的 Doris 中,BE 节点是存算一体的,提供 Local Storage 这种存储引擎,如 OLAP 表就存储在本地。但在数据湖联邦分析场景中,我们查询远端数据湖数据时是不需要 Local Storage 这种本地存储的,因为数据湖中数据量较大,会造成 BE 节点扩容。
因此,我们增加了无状态的 BE 节点,即 BE Compute Node,可以快速进行扩容,增加计算负载,提高数据湖查询效率。在弹性场景中,这个特性会非常有用。

picture.image

数据湖联邦分析性能提升

针对以上优化,我们也进行了性能测试。在同资源规格下查询 Iceberg TPCH 100G 数据集,相比于 Trino (372),实现了3~5倍性能提升。

picture.image

3. Apache Doris 数据湖联邦分析未来规划

数据湖index增强

首先,我们将对 Iceberg/Hudi Index 进行增强。
Hive、Spark、Presto、Flink等引擎已经针对 Iceberg/Hudi Index 做了很多相关优化,这些引擎与数据湖已经有比较紧密的结合,各种优化与加速手段相对比较完善。
但对于 Doris 这种新接入的引擎来说,对于 Iceberg/Hudi Index 的支持还不是很完善,我们计划对其进行增强。在 FE 查询规划阶段,充分利用 Iceberg/Hudi metadata 中的 Index 信息进行查询过滤,减少数据扫描,提升查询效率。

picture.image

数据湖写入能力

其次,我们计划增强数据湖写入功能, DataLake Sink。
当前,针对 Hive、Iceberg、Hudi 仅支持查询,计划做以下增强:

针对 Hive 表, 增加 Parquet/Orc Writer 和 CSV/JSON Writer,支持对 Hive 表的 Insert 操作。
针对 Iceberg 表, 增加 Parquet/Orc Writer 和 Iceberg metadata commiter。
针对 Hudi 表, 增加 Parquet Writer 和 Hudi metadata sync。

picture.image

picture.image

Iceberg metadata center

此外,我们计划增加 Doris Iceberg metadata center,期望 Doris 可以作为 Iceberg 的元数据维护中心,也希望这个功能可以提供给 Doris 之外的引擎。
目前 Iceberg 元数据维护在 HMS 或 HDFS 中,这种模式下操作效率较低。我们期望由 Doris FE 维护 Iceberg 的元数据信息,接收变更、读取、查询等。
在这个模式下,首先 Doris 可以完成Iceberg 的数据查询加速,另外其他引擎的变更也可以统一提交至 Doris 维护的 Iceberg 的元数据维护中心,实现统计信息的全面维护和查询效率的提升。

picture.image

本文提及的 Doris 数据湖联邦分析特性均已在火山引擎EMR产品中对外提供服务,欢迎大家试用体验。

0
0
0
0
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论