Apache Iceberg 是一种开源数据 LakeHouse 表格式,提供强大的功能和开放的生态系统,如:Time travel,ACID事务,partition evolution,schema evolution等功能。
本文将讨论火山引擎EMR团队针对Iceberg组件的优化思路,通过引入索引来提高查询性能。
文丨火山引擎EMR团队-蕾蕾
/ 采用Iceberg构建数据湖仓 /
火山引擎 E-MapReduce(简称EMR)是火山引擎数智平台(VeDI)旗下的云原生开源大数据平台产品, 提供了企业级的 Hadoop、Spark、Flink、Hive、Presto、Kafka、StarRocks、Doris、Hudi、Iceberg 等大数据生态组件 , 100%开源兼容,可以帮助企业快速构建企业级大数据平台,降低运维门槛。
秉承业界领先的 EMR Stateless 理念,火山引擎 EMR 可以实现集群级别的弹性伸缩,即无业务需求时释放集群,有业务需求时再拉起集群,配合智能化的冷热数据分层存储能力,助力企业在大数据基建领域进一步降本提效。
基于火山引擎EMR产品,可以构建 数据湖仓、近实时数仓、实时数仓 等场景。
例如,使用Iceberg构建数据湖仓,从ODS到DWD等不同的分层进行建模,将数据HFDS或TOS(火山引擎对象存储产品)上,然后采用Trino或者Spark去做分析。
如何加速查询性能,使其尽可能接近专门的分布式数仓(如ClickHouse等),是需要思考和探究的问题。
索引是业界常用的提高查询性能的手段之一,针对Iceberg我们也采用了增加索引的方式。 对常用的列字段构建Index,在进行table scan时利用Index只返回匹配的数据,降低匹配数据量,从而大大提高查询性能。
/ Iceberg介绍 /
介绍Iceberg Index功能之前,我们先简单介绍下Iceberg的架构。Iceberg具有分层的元数据架构,如下如所示。
Spark、Presto、Flink等多种引擎读取Iceberg的数据,就是利用分层的元数据找到data file列表。例如,Spark引擎解析SQL语句,然后调用Iceberg的接口,获取data file并进行task切分。
在Manifest file中记录了data file中字段的最大值和最小值。
`"data_file": {`
`"content": 0,`
`"file_path": "hdfs://emr-cluster/warehouse/hive/db.db/sample/data/ts_day=2020-12-31/category=diamond/00000-0-220aa9a6-4530-499f-9450-da946d667624-00001.parquet",`
`"file_format": "PARQUET",`
`......`
`"lower_bounds": {`
`"array": [{`
`"key": 1,`
`"value": "\u0006\u0000\u0000\u0000"`
`}, {`
`"key": 2,`
`"value": "diamond"`
`}, {`
`"key": 3,`
`"value": "\u0000\u0004ÜÅ·\u0005\u0000"`
`}]`
`},`
`"upper_bounds": {`
`"array": [{`
`"key": 1,`
`"value": "\u0007\u0000\u0000\u0000"`
`}, {`
`"key": 2,`
`"value": "diamond"`
`}, {`
`"key": 3,`
`"value": "\u0000¨odÆ·\u0005\u0000"`
`}]`
`},`
`......`
`}`
利用这些信息,可以进行data file级别的初步过滤,把不符合条件的data file过滤掉,进而减少一部分数据的读取。
/ 实现索引的必要性 /
既然Iceberg已经提供data file级别的过滤。为什么我们还需要引入索引呢?以下面例子进行介绍,前两个表格分别是data file文件里面的内容,最下面表格是data file对应的manifest file。
| data file 1: | | id | name | age | | 001 | Adelina | 10 | | 002 | Virginia | 40 | | 003 | LiLy | 25 |
| data file 2: | | id | name | age | | 004 | LiLy | 20 | | 005 | Eleanor | 39 | | 006 | Willa | 70 |
| manifest file : | | datafile | Colunm name | min | max | | data file 1 | id | 001 | 003 | | data file 1 | name | Adelina | Virginia | | data file 1 | age | 10 | 40 | | data file 2 | id | 004 | 006 | | data file 2 | name | Eleanor | Willa | | data file 2 | age | 20 | 70 |
针对
SELECT * FROM table WHERE age > 50
,
利用min-max统计信息,很容易发现data file 1中没有满足条件的数据,因此data file 1就不会参与计算。
但是针对多维分析,如
name = 'LiLy' AND age > 30
,
利用
name
和
age
的min-max的统计信息分别对条件
name = 'LiLy'
和
age > 30
进行判断,得到data file 1和data file 2都满足条件。
然而,仔细分析data file 1和data file 2的数据,并不存在符合条件的数据,因此min-max过滤效果不太理想。所以通过引入合适的索引功能,可以提高data skipping的概率,提高查询性能。
- 首先探究索引类型
索引类型有多种,如 BloomFilter、Ribbon Filter、Dictionary Index、BitMap等 。为了满足多维分析场景,我们选择了 Range-Encoded BitMap ( Base-2, Bit-sliced Index),可适用于高基数场景,满足=、<、>、IN、BETWEEN等操作的多维分析 。
例如,对上面的name和age两列分别计算索引信息。由于name属于字符串类型,需要先进行字典编码再进行计算索引信息。采用Range-Encoded技术,根据数据的二进制相关信息以及对应的pos信息生成索引数据。
利用索引数据分析得到,同时满足
name = 'LiLy'
和
age > 30
的数据不在同一行,恰好可利用Range-Encoded的交并运算将数据进行过滤掉,因此data file 1不用参与计算。
也就是说,BitMap的交并运算可以更好地在复杂过滤条件的情况下过滤掉更多的数据文件。
- 接下来探究索引的粒度。
Iceberg提供的min-max,也是一种文件级别的索引。 文件级别的索引就是根据filter条件过滤掉不符合条件的data file 。文件级别的索引可适用于多种文件类型,但这种粒度比较粗,只要data file中有一条数据符合条件,该data file中的数据就会全部读取出来参与计算,从而影响SQL的查询性能。
对于Parquet、ORC的文件格式,提供有file chunk的概念(row group or stripe),我们完全可以按照row group / stripe粒度,对数据进行过滤。(为了方便描述,我们将row group和stripe统称split。)
如:SQL语句:
SELECT * FROM table WHERE col\_1> v1 AND col\_2 = v2
,其中对col_1字段和col_2字段已构建Index信息。现在利用索引对SQL语句作用。
SQL语句解析后,将符合条件的data file列表进行切分后,得到很多split的列表。利用索引,分析split中数据是否满足条件,如果不满足则跳过。如上图data file列表切分后,得到数万级别数量的split列表。
将索引数据作用在split1,发现split1中没有同时
col\_1> v1 AND col\_2 = v2
满足条件的数据,该split1中的数据就不会参与计算。最后处理后,只得到了少量的split列表,数据过滤度达到10%以上,查询性能有明显提升。
因此,采用row group / stripe级别的细粒度索引,可以过滤大部分数据。
/ 细粒度 索引实现逻辑 /
Iceberg元数据中manifest file中除了提供min-max等统计信息,还提供有split相关信息:
"split\_offsets":{"array":[4,...]}
,极大方便我们实现row group / stripe级别的细粒度索引。
1.提供索引的构建API
Iceberg中提供构建索引的API,引擎端调用该API即可实现索引构建功能。对于Spark 3.3及以上版本,已经提供有索引的SQL语句,在Iceberg的Spark模块实现Spark提供的索引接口即可。
2.构建索引
我们采用异步构建索引,不影响主线任务。也提供了增量构建索引功能,只对append数据进行构建索引。调用TableScan读取数据,按照data file的split offset切分数据,进行构建索引,并保存索引数据和对应的元数据信息。为了避免出现小文件存在,我们会进行索引数据合并。
3.索引文件存储
索引文件格式采用puffin格式,这是一种二进制格式。
`Magic Blob₁ Blob₂ ... Blobₙ Footer`
在Footer中保存每个blob的元数据信息。索引构建成功后,会生成类似于下面内容的文件。
/ 索引带来的收益 /
Range-Encoded BitMap适用于多维分析场景,且Ranger范围较小时,效果非常明显。下面我们基于Spark引擎性能测试。
- 构造1TB的SSB测试数据,分别在构建Index前后,对以下用例进行测试。
`Q1: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 19665277`
`Q2: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 19665277 AND lo_revenue = 2141624`
`Q3: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 19665277 AND lo_revenue >=10304000`
`Q4: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 21877827 AND lo_revenue >= 83800 AND lo_revenue <= 103800`
`Q5: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice > 21877827 AND lo_revenue >= 83800 AND lo_revenue <= 93800`
`Q6: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice >= 93565 AND lo_ordtotalprice < 93909`
`Q7: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice >= 93565 AND`
`lo_ordtotalprice < 91003562 AND lo_revenue >=904300 AND lo_revenue`
`<= 9904300`
上图展示了7条SQL语句分别在没有Index和采用Index情况下的执行时间。下图展示采用Index后,7 条 SQL 语句读数据的split数量。很明显读数据的split数量越少,Index效果越好。最糟糕的情况,所有的split都参数计算,这时和没有构建索引的效果类似。
- 采用SSB基准测试
由于SSB提供的测试场景,和Range-Encoded有利的场景,不太匹配,所以Index的效果并没有明显的效果。但也不会比不采用Index的效果差。如下面左图,分别是构建索引前后,SQL语句的执行时间,构建索引的优势并没有体现出来。右图中,可以看到所有的split都参与了计算。
/ 总结 /
根据上面的介绍,这里总结下Iceberg中索引实现的一些特征:
- 细粒度索引级别:提供RowGroup/Stripe级别的索引,可以更加精确的定位数据的查询范围,减少不必要数据输入,从而提高查询性能;
- 索引作用于执行端:查询任务被分配多个执行端,每个执行端只判断该节点上的RowGroup/Stripe数据是否符合即可;
- 适配多种引擎:索引构建后,可用于多种引擎;
- 提供异步构建Index,从而不影响主业务的进行;
- 适用于高基数&低基数场景,且占有存储空间小。满足范围查询、等值查询等场景。且范围越小,收益效果越明显。
本文提及的Iceberg索引功能已在火山引擎 EMR 产品中对外提供服务,欢迎大家试用体验。
--推荐阅读--