火山引擎 EMR 作为一款云原生开源大数据平台产品,集成了包括 Hadoop、Spark、Flink 等引擎,并做到100%开源兼容。Doris 作为 OLAP 领域中一款极具代表性的开源组件,也被集成到了火山引擎 EMR 产品生态中。
本文主要介绍 Apache Doris 设计和开发数据湖联邦分析特性的思考和实践。全文分为三部分,首先介绍数据湖相关技术的演进,其次介绍 Apache Doris 数据湖联邦分析的整体设计和相关特性,最后介绍 Apache Doris 在数据湖联邦分析上的未来规划。
回顾湖仓一体的发展史,主要经历了三个阶段。第一个阶段是数据仓库,第二个阶段是数据湖,第三个阶段是湖仓一体。
数据仓库阶段
数据仓库是在上个世纪80年代兴起的一项技术。随着企业业务发展和大规模计算技术的发展,越来越多的企业使用数据仓库来处理企业产生的数据,发现数据的商业价值。
在这个时期,主要是将来自业务系统的多种结构化数据聚合到数据仓库中,利用 MPP 等大规模并发技术对企业的数据进行分析,支撑上层的商业分析和决策。
数据湖阶段
数仓的主要特点是只能处理结构化数据。随着数据科学和人工智能的发展,产生了越来越多的非结构化数据,但非结构化数据在数仓中处理中相对麻烦,于是数据湖技术出现了。
数据湖可以被定义为一种存储各类原始数据的存储库,原始数据包含结构化、半结构化以及非结构化数据。一部分原始数据会经过 ETL 同步到数据集市中,支撑商业分析和决策类应用,另一部分数据将被机器学习和数据科学类应用直接访问。
湖仓一体阶段
数据湖模式缺乏一些关键特性,如不支持事务、数据缺乏一致性、缺乏隔离性、无法保证数据质量等,导致数据湖管理复杂,如果管理不善,数据湖将会退化成数据沼泽。
于是,2020年湖仓一体的概念被提出,主要指在数据湖中建设存储、湖上建仓。
湖仓一体的优势特性包括:
● 支持事务。 在企业中,数据往往由业务系统提供、并发读取和写入,对事务性要求高。由于一部分业务在读取数据,同时另一部分业务在写入数据,需要保证在并发过程中数据的一致性和正确性。
● 支持数据模型化和治理, 并在数据湖上建设数仓模型,如星型、雪花模型都可以在数据湖上构建,进一步支持上层商业智能类应用,并对接多种BI类工具。
● 支持存算分离, 数据湖中有海量数据,如果存储在数仓等系统中会非常昂贵,因此需要存储在对象存储等较便宜的存储系统中。利用湖仓一体这种架构,实现存算分离模式。
● 更好的开放性。 支持 Parquet、ORC 等常见的大数据存储格式,也支持 Hudi、Iceberg、DeltaLake 等表格管理存储格式,支持结构化、半结构化和非结构化等数据类型,支持不同类型的工作负载等。
● 生态工具与组件丰富。 围绕数据湖也出现了很多相关工具和组件,如数据目录、开发工具、隐私计算、元数据管理等,其中以 Hudi、Iceberg、DeltaLake 这三种数据湖存储格式最为流行。
湖仓一体技术也存在一些缺点,其中比较突出的是对实时性支持不足。如果我们把数据湖和实时数仓进行融合,利用实时数仓的快速分析能力去查询数据湖中的海量数据,势必将会给企业带来更高的价值。
数据湖和实时数仓具备不同特点:
● 数据湖: 提供多模存储引擎,如 S3、HDFS 等,也支持多计算引擎,如 Hive、Spark、Flink 等。在事务性方面,数据湖支持 ACID 和 snapshot 等方式。同时,数据湖提供了 Hudi、Iceberg、DeltaLake 等表格式的定义,也支持结构化、半结构化和非结构化数据。
● 实时数仓: 提供实时指标的聚合,数据可以秒级入库。实时数仓的分析能力也较强,支持秒级和亚秒级的数据分析,支持多维分析和联合分析。对外可以提供高并发数据服务,如 Doris 可以提供万级 QPS 的数据服务,也提供数据更新能力。
通过结合数据湖和 Doris 两方的特性,既可以利用数据湖中存储的海量数据,又可以利用 Doris 向量化分析能力加速海量数据的洞察效率,利用 Doris 提供高并发数据服务和数据更新能力,那必将事半功倍,这也是字节跳动进行数据湖联邦分析特性的初衷。
Multi-Catalog的架构设计
目前字节跳动的数据湖联邦分析架构采用 multi-catalog模式,由外表的方式转为catalog的方式,如下图所示:
原本 Doris 采用两层架构,一层是 Database、一层是 Table。先创建 Database,再创建各种 Table。
除了 OLAP 内表模式外,还支持创建各种类型的外表,如 Hive 外表、Iceberg 外表、JDBC 外表和 ElasticSearch 外表等。
基于 Doris 原生外表模式,也可以访问数据湖中的数据源,但存在如下缺点:
● 首先需要在 Doris 中创建外表,创建时还需要制定 Schema。如果外部数据源多,一个一个在 Doris 中进行创建就显得非常繁琐和不便。
● 如果外部数据源,如 Hive 中的 Schema 发生了变更,那 Doris 中对应的表就需要重建,否则查询就会失败。
针对以上问题,我们参考数据库的设计理念,增加了 Catalog 一层,将原有的 Database 和 Table 挂在 Internal Catalog 下,目前已经实现了 Hive Catalog、JDBC Catalog 和 ElasticSearch Catalog。
在该架构下,增加新的 Catalog 会非常便捷。在 Catalog 下,通过 Create Catalog 可以方便地创建 Hive Catalog。创建完成后,即可切换到 Hive Catalog 中,通过 Show Database、Show Table 来查看 Hive 对应的 Database 和 Table。
在 Show Database 时查看的 Database,即 Hive Catalog 下的 Database,也就是 Hive Metastore 中的 Database 列表。我们在某个 DB 下 Show Table,也可以看到该 DB 下的 Table,同样和 Hive Metastore 保持一致,无需创建外表。其他类型的 Catalog 也类似。
Multi-Catalog的元数据技术原理
那么, Catalog 如何与外部元数据对接?
以 Hive MetaStore举例。元数据架构设计如下图所示,设计思路包括几方面:
我们已经添加了 Hive MetaStore这一类型的 Catalog,可以动态添加、删除和切换 Catalog。通过 Create Catalog,将 Type 指定为 Hive,指定 Hive Catalog 的地址,即可完成创建。
通过 Drop 和 Switch 命令也可以很容易地进行删除和切换。在 Doris 中无需创建外表,执行 Show Database 和 Table 的时候,FE 会连接至对应的 Hive MetaStore,来查询其中的 DB 和 Table。获取到 DB 和 Table 之后,再由 FE 返回客户端。
当制定 Select 查询操作时,FE 会连接到 Hive MetaStore 来获取该 Table 下的元数据信息,包含它的 Schema、Location、格式等信息,完成查询规划,进而完成查询。
我们也会对从 Hive MetaStore 中获取的元数据进行缓存,来加速查询。JDBC Catalog 和 ES Catalog 也是类似的方式,会分别连接到外部的 JDBC Server 和 ES Server 来进行元数据获取。
在这种统一的数据查询框架下,我们要新增一种新的数据源非常方便的,大概一人周就可以完成一种新的数据源的开发。
当前我们已经内置提供了 Hive、JDBC、ES等数据源。添加新数据源时,只需关心数据源自身的访问相关操作,增加新的 ScanNode。例如,在 Hive、JDBC、ES 的设计中,分别内置了 FileScanNode、JDBCScanNode 和 ESScanNode。
在统一的调度框架下 Scanner Scheduler 下,我们会将 ScanNode 产生的 Scanner 提交到 Scanner Thread Pool 进行扫描查询。
对于 Hive 的 FileScanNode 来说,大多数情况是读取外部存储系统的文件,我们提供了 Parquet Reader、ORC Reader 和 TEXT Reader,支持对Parquer、ORC、 JSON 和 CSV 进行读取。
对于 Scan 之上的操作,我们完全无需关心,因为 Scan 产生的这种 Block 数据可以直接被上层应用进行向量化查询。在 Scan 层面,我们也增强了基于代价的查询优化器,可以根据统计信息进行查询优化。
在算子优化方面,我们也针对 Predicate Pushdown,Join Runtime Filter 和 Streaming Aggregation 等进行了优化。
Multi-Catalog的查询框架
那么,整个查询流程是怎么做的呢?
举个例子,比如执行一个查询,Select * from Hive Catalog 中的 DB1 下的 Table1 的流程。
第一步,对于这样的查询,在 FE 中会首先连接到 Hive MetaStore ,获取 Table 相应的元数据。元数据中包含Schema 信息。
第二步,如果分区表,也会获取相应的分区。如果过滤条件中包含分区过滤条件,也会将过滤条件传递到 Hive MetaStore 中,减少返回的分区大小。
第三步,对返回的元数据信息进行分区裁剪和计划生成。分区裁剪完之后,我们会根据元数据信息链接到 HDFS 或 S3 中获取文件列表。获取到文件列表后,会进行计划生成,该生成逻辑和原有的逻辑类似。
第四步,生成完之后,我们会对任务进行拆分和下发,下发到 BE 中执行。对于下发的任务,BE 会基于 FE 下发的信息,直连 HDFS 和 S3 进行文件读取,读取效率非常高。
以上就是一个完整的查询流程。
除此之外,字节跳动也对查询过程进行了很多优化。
● 在缓存方面,我们在 FE 中实现元数据缓存, 主要缓存 Db/Table 信息、Table 分区信息、Table 分区值信息和 File list 信息。Schema Cache 主要是指 Get Table 的时候,如果此时 FE 不存在对应 Table 信息,需要连接到 Hive MetaStore 获取 Table 的元数据信息,包括 Schema 和 格式。
● 相关信息获取完后,我们会把信息维护在 FE 的内存中。 当再度访问相同 Table 时,可直接使用内存中的元数据信息,减少多次 RPC 调用,提高查询效率。
● 如果是分区表,我们也提供 Partition Cache 、Partition Value Cache以及 File Cache, 其中File Cache 是指 File list,并不是文件的数据内容。
● 在查询规划方面,我们会连接存储系统, 获取到相应的 File list,并将该信息维护在 FE 缓存中,进行查询加速。
以上是 FE 中缓存的相应元数据信息。
在 BE 端,我们也实现了一部分缓存信息:
● 第一, Prefetch Buffer 功能。在 BE 去查询 HDFS 和 S3 数据时,如 Parquet 或者 ORC 格式,会进行跳跃式读取。读完当前 Block ,读下一个 Block 时,我们会对 IO 做合并,一次读取多个 Block 信息,减少 RPC 调用。读取完数据,后续查询可以直接利用已读取的数据。
● 第二, 维护File Block Cache。读取完 Parquet 文件中数据后,我们会对 Block 数据进行本次缓存,下次再查询相同文件时,可以充分利用本地这份缓存,减少和远端存储系统交互,提高查询效率。
此外,字节跳动也对 FileReader 进行重构。
FileReader 和外表交互的原始方式是使用 Arrow Parquet Reader,主要存在以下问题:
● Arrow Parquet Reader 读取数据时会先做一层内存转换,把 Parquet 文件数据转换成 Arrow 这种内存格式。然后,Doris 会再做一次转化,将其转为 Doris 使用的向量化的 Block 内存格式。这就造成了多一次的内存转换。
● 无法使用 Parquet 中的 Page Index,导致读取效率较低。
● 无法使用 Parquet 中的 Bloom Filter,在部分查询有过滤条件时,将导致读取效率不高。
● 不支持字典编码。
针对以上问题,我们对 Reader 进行了重构,重写了 Native Parquet Reader,具备以下特点:
● Native Parquet Reader 读取数据会直接转化为 Doris 使用的向量化的 Block 内存格式。下游进行计算时可以直接利用这种格式,避免二次转换。
● 可以利用 Parquet 中的 Page Index,能够更精确地过滤掉无用数据,降低数据 IO。
● 可以利用 Parquet 中的 Bloom Filter 来过滤数据,提高查询效率。
● 支持字典编码和延迟物化,比如性别男和女在文件存储中使用0和1。
我们设置过滤条件只查询性别为男的数据,常规的读取方式会先把文件存储中的0和1数据用字典解码为性别男和女。然后,再将男和女的字符串和过滤条件进行比较,保留性别为男的数据。
这种模式因为有字符串的参与,效率会非常低。在我们这次重构中对此进行了优化,在设置过滤条件时可以通过字典知道对应数据是0,所以查询中直接可以使用 0 这个 int 类型数据来进行读取和延迟物化,从而提供查询效率。
Compute-Node计算节点
最后,我们增加了弹性计算节点。
原始的 Doris 中,BE 节点是存算一体的,提供 Local Storage 这种存储引擎,如 OLAP 表就存储在本地。但在数据湖联邦分析场景中,我们查询远端数据湖数据时是不需要 Local Storage 这种本地存储的,因为数据湖中数据量较大,会造成 BE 节点扩容。
因此,我们增加了无状态的 BE 节点,即 BE Compute Node,可以快速进行扩容,增加计算负载,提高数据湖查询效率。在弹性场景中,这个特性会非常有用。
数据湖联邦分析性能提升
针对以上优化,我们也进行了性能测试。在同资源规格下查询 Iceberg TPCH 100G 数据集,相比于 Trino (372),实现了3~5倍性能提升。
数据湖index增强
首先,我们将对 Iceberg/Hudi Index 进行增强。
Hive、Spark、Presto、Flink等引擎已经针对 Iceberg/Hudi Index 做了很多相关优化,这些引擎与数据湖已经有比较紧密的结合,各种优化与加速手段相对比较完善。
但对于 Doris 这种新接入的引擎来说,对于 Iceberg/Hudi Index 的支持还不是很完善,我们计划对其进行增强。在 FE 查询规划阶段,充分利用 Iceberg/Hudi metadata 中的 Index 信息进行查询过滤,减少数据扫描,提升查询效率。
数据湖写入能力
其次,我们计划增强数据湖写入功能, DataLake Sink。
当前,针对 Hive、Iceberg、Hudi 仅支持查询,计划做以下增强:
● 针对 Hive 表, 增加 Parquet/Orc Writer 和 CSV/JSON Writer,支持对 Hive 表的 Insert 操作。
● 针对 Iceberg 表, 增加 Parquet/Orc Writer 和 Iceberg metadata commiter。
● 针对 Hudi 表, 增加 Parquet Writer 和 Hudi metadata sync。
Iceberg metadata center
此外,我们计划增加 Doris Iceberg metadata center,期望 Doris 可以作为 Iceberg 的元数据维护中心,也希望这个功能可以提供给 Doris 之外的引擎。
目前 Iceberg 元数据维护在 HMS 或 HDFS 中,这种模式下操作效率较低。我们期望由 Doris FE 维护 Iceberg 的元数据信息,接收变更、读取、查询等。
在这个模式下,首先 Doris 可以完成Iceberg 的数据查询加速,另外其他引擎的变更也可以统一提交至 Doris 维护的 Iceberg 的元数据维护中心,实现统计信息的全面维护和查询效率的提升。
本文提及的 Doris 数据湖联邦分析特性均已在火山引擎EMR产品中对外提供服务,欢迎大家试用体验。