本文为 火山引擎湖仓一体分析服务LAS 嘉宾分享文章,文章会为大家讲解字节跳动 在Spark技术上的实践 ——LAS Spark的基本原理,分析该技术相较于社区版本如何实现性能更高、功能更多,为大家揭秘该技术做到极致优化的内幕,同时,还会为大家带来团队关于LAS Spark技术的未来规划。
本篇文章将分为四个部分呈现:
文 | 友军 火山引擎LAS团队
/ 整体架构 /
火山引擎LAS (LakeHouse Analysis Service)湖仓一体分析服务,包含批流一体 SQL,以及Spark/Presto多个计算引擎,其中LAS Spark作为高效的批式计算引擎,字节内部日均处理EB级数据,全覆盖离线ETL场景。
LAS Spark架构图如下所示, 整体基于Spark On K8S的云原生架构,底层容器服务为VCI,支持极致高效的弹性伸缩能力, 并且可按需付费,减少非必要开销,降低成本。
LAS Spark紧跟技术前沿, 目前已经从Spark2.3全面升级到了Spark3.0, 并且接下来会进一步升级到Spark3.2。 从社区的TPC-DS Benchmark口径来看,Spark2.3 到Spark3.0的版本升级,性能可提升36%,Spark3.0到Spark 3.2的版本升级,性能可提升15%。 从字节内部的实测来看,也分别可以带来16%和7%的性能提升。 因此,版本升级所带来的整体收益比较可观。
LAS Spark基于社区版本进一步进行了系列深度优化,目前在TPC-DS 10T Benchmark上来看, 性能为开源版本的2.76倍, 后续将展开介绍我们所做的相关工作。
/ 基本概念 /
为后续更清晰的介绍我们在Spark上做的系列优化,此处简单说明一些相关的基本概念。
首先,结合下面的示例图,一个SQL会被Spark引擎经过SQL语法解析、元数据绑定、执行计划优化等多个过程,最终生成右边的执行计划,其中包含TableScan、Filter、Exchange、Sort、Join、Exchange、Aggregate、InsertInto等多个算子。后续,执行计划会被分配到多个Task上并行执行。
●
Spark任务由一个Driver和多个Executor构成,其中Driver负责管理Executor及其内部的Task,整个SQL的解析过程也都在Driver中完成。Spark会将解析后的执行计划拆分成多个Task,并调度到Executor上进行实际计算,多个Task并行执行。
●
如下图所示,数据主要按照Table/Partition/File分级存储,其中Parquet File内部由多个RowGroup和一个Footer组成,RowGroup负责实际数据的存储,Footer存储每个RowGroup的min/max等索引信息。
上文向大家介绍了LAS Spark整体架构和基本概念, 那么LAS Spark如何在技术上实现性能的高精尖、功能的丰富度呢?
接下来将通过 算得更少、智能计算、算得更快、预先计算 4个方向讲解性能上的优化,另外从 自研UIMeta、深度融合数据湖 来说明功能的多样性。
/ 如何算得更少?/
Spark计算过程中,读取的数据量越少,整体的计算也会越快。大多数情况下,可以直接跳过一些没必要的数据, 即Data Skipping。
Data Skipping核心思路主要分为三个层面:
1. Range Partition
Partition Skipping是Data Skipping三种策略中效果最好的一种, 但在实际场景中分区表会遇到一个比较大的问题,即分区数据分布不均匀,对元数据服务和文件系统造成比较大的压力。
从下图可以看到,业务场景可能会按date和app做分区,但不同app的数据量是不一样的,同时app的枚举值可能会比较多。如图中的分区app=A和app=B的数据较多,但其他分区app=C、D、E、F、G数据量较少。
为解决分区数据不均的问题, 我们引入了Range Partition,本质上是将数据量较小的分区自动合并成一个物理分区。
比如我们对于A、B分区来说,本身数据量较大,则还是放到各自单独的物理分区。但对剩余的分区,我们将根据指定的规则进行分区合并。其中C和D分区的数据合并到 app=~D分区,E、F、G三个分区合并到app=~分区。
2. LocalSort
RowGroup Skipping作为细粒度的Data Skipping策略,数据分布对于RowGroup Skipping的影响较大。 数据分布越紧凑,min/max索引越精确,RowGroup Skipping效果越好。
如下左图,数据分散存储,RowGroup1中的a列分布在[2, 78],RowGroup2中的a列分布在[1, 99],对于过滤条件a=10,无法过滤任何一个RowGroup,需要读取整个文件数据。
为此,我们引入LocalSort。Spark引擎会在数据写入Parquet文件之前基于指定字段做一次本地排序,这样能将数据分布更加紧凑,最大发挥出Parquet Footer中 min/max等索引的。如下右图,经过LocalSort处理之后,数据会基于a列进行排序,RowGroup1中的a列分布在[1, 12],RowGroup2中的a列分布在 [65, 99],对于过滤条件a=10,可以过滤掉RowGroup2,仅需要读取RowGroup1即可。
LocalSort在提升RowGroup Skipping效率的同时,因为数据的紧凑分布, 压缩率更高,可减少40%的存储。
3. 合并小文件
从数据分布的角度继续挖掘,LocalSort更多的是针对文件内部数据分布做调整。 但如果存在小文件问题, 数据分布在多个文件中,每个文件可能最多只存在单个较小的 RowGroup, 此时LocalSort也收效甚微。
如下左图,数据存储在5个Parquet文件,每个文件中仅存在单个RowGroup,每个RowGroup的a列分布均包含10这个值,无法做RowGroup Skipping,因此需要读取全部5个文件的所有RowGroup数据。
为此,我们需要进行小文件合并。如下右图,5个小文件被合并成了一个大文件,此时LocalSort又可以很好的工作。同时, 可以解决小文件带来的其他问题,尤其是可以降低文件系统的压力。
合并小文件主要是两种思路: MergeFile和FragPartitionCompaction, 使用场景和具体实现均不同。
4. Prewhere
到目前为止,RowGroup Skipping是最小粒度的Data Skipping策略, 我们做了系列优化,效果显著。
此时,我们不得不开始进行RowGroup的读取。Spark在读取RowGroup的时候,实际会按照batch的方式读取,即每次会读取一批数据。在这个过程中,我们引入了 Prewhere优化,其在RowGroup Skipping的基础上进一步基于batch粒度的过滤。
具体而言,如下图所示,我们会拆分FilterReader和NonFilterReader两个 Reader,首先会基于FilterReader读取a列的一批值,并判断是否能够Match上Data Filter,如果能够Match则进一步使用NonFilterReader读取其他列,最终将两部分数据拼接成完整的batch返回给上层。如果没有Match,则直接Skip这个batch。
通过Prewhere,我们实际将DataSkipping效率进一步提升,最终可带来30%左右的性能提升。
5. Dynamic BloomFilterJoin
以上介绍的各种DataSkipping优化其实都需要基于Data Filter,通常是SQL中显示指定的Filter,比如where a=10, 但对于没有指定Filter的场景,这些 DataSkipping优化手段都将无用武之地。
Dynamic BloomFilterJoin 主要思路是在已有Data Filter基础之上动态构造 Filter, 进一步做DataSkipping,以此提升查询性能。当然为了避免引入额外损耗,仅适用于部分Join场景。
如下图所示,两表Join,左表数据量较大,右表数据量较少,则可以提前将右表join key读取出来,在左表动态生成一个Filter算子,其效果相当于:where id in (select event_id from table_2)。在接下来的Join阶段,左表实际参与Join的数据量将会减少。
/ 如何智能计算?/
Shuffle作为Spark计算过程中开销最大的一个阶段,同时也是查询优化需要关注的重点。 常见的Join、Aggregate算子均需要要引入Shuffle阶段。
如下图所示,对于Join,数据首先按照join keys (id, event_id)将相同的记录划分到同一个partition(task)中,然后完成每个partition内部的join,最终即可获得全局的join结果。
Join阶段存在几个比较常见的问题:
1. Bucket
优化Shuffle最直接的方式就是消除Shuffle。 我们引入Bucket特性,其核心思路就是一次Shuffle,多次消除。数据在写入过程进行Shuffle和Sort,即数据会按照指定的列进行数据分布。
查询时,Spark引擎会检测到当前的数据分布,直接消除不必要的Shuffle和Sort阶段,直接后续的计算,从而避免Shuffle带来的一系列问题。
如下图所示,写入阶段,左表会按照id列进行Shuffle + Sort,右表按照event_id列进行Shuffle+Sort;查询阶段,左表id Join右表event_id,正常会两边均存在 Shuffle+Sort进行数据重分布,但数据其实已经提前按照预期方式分布好了,因此可以直接消除。
Bucket所能带来的收益是显而易见的, 为了覆盖更多场景,我们做了较多努力,其中包括支持倍数Bucket Join、SparkSQL Bucket Join与Hive Bucket Join完全兼容、Bucket Join支持超集等。
2. AQE Skewed Join
当然,Bucket所适用的场景有限,并且即使引入Bucket也只能消除查询时的 Shuffle,写入阶段的Shuffle依然无法避免。 为此,我们需直面Shuffle相关问题,数据倾斜是Shuffle的痛点问题,尤其是Join场景更为常见。
以下图为例,数据倾斜情况的下的Join,Join key往往存在个别固定key的记录条数过多,单个task会被分配到大量数据,导致其运行时间远超其他task,即长尾 task,从而拖慢整个作业的运行。
如下图所示,A表inner joinB表,并且A表中第0个partition(A0)是一个倾斜的 partition(id=10的记录有10w条),正常情况下,A0会和B表的第0个partition(B0)发生join,由于此时A0倾斜,task0就会成为长尾task。
为解决数据倾斜问题, 常规方法是人工发现倾斜数据,然后手工改动SQL逻辑去处理倾斜数据,开发成本较高,且效果不明显。 为此,智能计算尤为重要。
Spark AQE(Adaptive Query Execution)SkewedJoin能利用运行阶段的实时统计数据自动识别并处理数据倾斜,在用户无需感知和介入的情况下,实现数据倾斜的自动发现和处理,大大降低了用户处理数据倾斜的成本。 实际场景测试,性能可提升 35%。
Spark AQE在执行A Join B之前,通过收集上游stage的统计信息,发现partition A0明显超过平均值的数倍,即判断A Join B发生了数据倾斜,且倾斜分区为 partition A0。
Spark AQE会将A0的数据拆成N份,使用N个task去处理该partition,每个task只读取若干个MapTask的shuffle输出文件,如下图所示,A0-0只会读取 Stage0#MapTask0中属于A0的数据。这N个Task然后都读取B表partition 0的数据做join。这N个task执行的结果和A表的A0 join B0的结果是等价的。
不难看出,在这样的处理中,B表的partition0会被读取N次,虽然这增加了一定的额外成本,但是通过N个任务处理倾斜数据带来的收益仍然大于这样的成本。
社区版本Spark从3.0版本开始支持了AQE以及对Join数据倾斜问题的自动优化功能,虽然已经能够解决一部分问题,但在我们的实践中,还是遇到了几个问题:
一是统计信息不准确, 导致无法识别数据倾斜; 二是切分不均匀导致处理效果不理想 ; 三是不支持复杂场景, 例如同一个字段发生连续join。不管是AQE SkewedJoin不生效,或者倾斜处理效果不理想,都会导致作业整体耗时异常甚至失败。
为此,我们做了以下四方面的优化, 极大提升覆盖范围,在字节内部90%+存在数据倾斜的线上作业,在用户无感知的情况下被自动处理。
3. AQE ShuffleHashJoin
Spark Join主要有三种算法实现: BroadcastHashJoin、ShuffleHashJoin以及 SortMergeJoin。
其中BroadcastHashJoin性能最高,但仅适用于小表场景,要求右表默认<10M;ShuffleHashJoin其次,覆盖场景较BroadcastHashJoin更多,性能较SortMergeJoin更好,通过Shuffle + HashJoin的方式;SortMergeJoin耗时最为严重,需要Join两边先做Shuffle + Sort,然后进行Join,一般作为Join兜底策略。
当全部满足如下两个硬性条件的情况下, 才可能会走ShuffleHashJoin逻辑,但多数执行计划生成阶段很难满足,因此会走到SortMergeJoin逻辑。
Spark AQE ShuffleHashJoin可利用运行阶段的实时统计数据自动调整更合适的Join类型,将SortMergeJoin转换成ShuffleHashJoin。
如下图所示,右表经过filter之后,数据量大幅减少,在Join之前进行Spark AQE处理,即可满足ShuffleHashJoin条件。 基于我们对Spark AQE ShuffleHashJoin 的增强,字节内部可以覆盖14%的线上作业,性能提升15%。
/ 如何算得更快?/
1. 语言层面
开源Spark在写入数据阶段,会通过Parquet Writer将数据写到Parquet文件内,内部视角来看,主要分为三个部分:Encode、Compress、Sync,整个过程均在JVM 内部完成,存在较多性能损耗,比如需要额外的内存拷贝、内存管理等。
为此,我们支持了C++语言版本的Parquet Writer,将整个执行过程放在JVM外部。InertInto算子直接通过GNI调用将数据直接写到Parquet文件内。 经过实际场景测试,性能提升35%左右。
2. 贴近硬件
基于我们的调研,simdjson使用了SIMD指令来优化JSON解析操作,号称比已知的其他JSON解析快至少2.5倍,并且已经在Microsoft FishStore, Yandex ClickHouse, Clang Build Analyzer中使用。
而对于字节内部的场景中,有大量的JSON解析操作。因此,我们决定引入SIMD替换 Spark使用的Jackson,以此提升查询性能。 最终通过引入simdjson,Spark查询性能提升了15%。
3.算子层面
算子层面的优化较多,此处简单仅列举几个优化实例:
Spark Join的实际执行主要有三种算法策略:BroadcastHashJoin、ShuffleHashJoin以及SortMergeJoin,分别覆盖不同的Join场景。但其中,开源版本的ShuffleHashJoin不支持Codegen。
Codegen是Spark Runtime优化性能的关键技术,核心在于动态生成Java代码、即时Compile和加载,把解释执行转化为编译执行。Spark Codegen分为Expression级别和WholeStage级别,分别针对表达式计算和全Stage计算做代码生成,都取得了数量级的性能提升。 为此我们拓展ShuffleHashJoin支持了Codegen能力,从而将ShuffleHashJoin的性能进一步提升12%。
对于Limit和Ordered Limit下推我们已经做了一些优化,并且产生了不错的价值,比如: 1.将Ordered Limit转换成TakeOrderedAndProject / 2.将Window + Rank转换成TopK Per Window/Group
通过分析,我们可以对Ordered Limit做进一步优化,将其下推至Aggregation。 对于下面的AGG+ORDER+Limit场景的queries,可以将Ordered Limit限制下推到Aggregation中:
`select a, b, c, agg_f0, agg_f1, agg_f2`
`from t`
`group by a, b, c`
`order by c, b, [agg_f0]...`
`limit 100`
`-- 限制条件: order by 的前缀字段需要是 group by 字段的子集`
一般来讲,上述的Query会生成Agg+Sort+Limit算子,其中Sort+Limit算子会被优化成Top,也即Agg+TopK,其中Agg算子不会感知到任何limit或者order信息。
但仔细观察上述查询特征,order by中的最前面几个字段是group by字段的子集,这些字段在Partial聚合过程已经确定,因此我们可以利用Orderd Limit信息,在Partitial聚合阶段就应用这部分信息,减少数据聚合。通过PushedOrderLimit优化,其命中性能可提升2.5%。
/ 如何预先计算 /
在计算的过程中,如果一些耗时的过程已经被提前计算好了,引擎便可拿到计算结果直接返回,性能将得到大幅提升,这便是预计算。
预计算的优化思路,本质上是以空间换时间, 通过将一些重复pattern(如子查询、表达式)提前计算,并将结果存储到文件系统,由计算引擎在查询时自动路由到这些计算结果,并直接返回。一次计算,多次复用。
以下主要介绍两种方式:物化列和物化视图。
1. 物化列
物化列主要通过预计算的方式,解决高频表达式重复计算的问题。
原生Spark在查询嵌套类型(Map/Array/Struct/Json)列中的某一子列时,首先会读取整个列的数据,然后在内存中提取出所要查询的子列。例如下图中的普通读取流程,people列是Map类型,用户在查询people.age子列时,需要将整个people列的数据完整读取到内存中,然后提取age子列的值。
这种实现方式会导致如下问题:
2. 物化视图
物化视图主要通过预计算的方式,解决高频子查询重复计算的问题。
物化视图主要分为两个部分:构建和查询。 其中构建如下图SQL所示,以创建普通视图的方式创建物化视图,不同的点在于,物化视图创建之后会自动计算SELECT结果并存储到物化视图实体表中。
查询时,Spark引擎会自动判断是否命中物化视图,并将执行计划rewrite到直接查询物化视图实体表,以此大幅提升查询性能。 通过复用重复子查询的方式,字节内部可覆盖20%+的ad-hoc查询,平均性能提升240%。
/ 自研 UIMeta /
原生Spark History Server的架构简图如下左图,Spark任务运行期间会不断产生包含运行信息的SparkListenerEvent,Event会被所有注册在ListenerBus中的Listener监听,其中EventLoggingListener会将Event序列化为Json格式不断追加写到EventLog HDFS文件中。
Spark History Server会扫描指定的HDFS路径中的EventLog,逐行Replay整个文件中的Event,将其中包含的运行信息变化反映到内存中的KVStore中,而WebUI会和KVStore交互,从而实现页面的渲染。
原生Spark History Server架构存在两个比较大的痛点问题:
其一,EventLog存储开销大, Spark History Server依赖Spark引擎将运行中的Event信息全部记录到FileSystem中,对于复杂作业和长作业而言,EventLog量较大,甚至可以达到单任务GB级别;
其二,EventLog解析性能差, Spark HistoryServer采用后台Replay EventLog的方式还原Spark UI,相当于把Spark引擎的事件全部重放一遍,延迟较高,特别是作业较多或者较复杂的情况下,延迟可达分钟甚至十分钟级别。
为此,我们完全自研了UIMeta用于替换原生的Spark History Server。 全新的架构简单来说,就是不再存储中间运行信息,直接将Spark任务的最终状态信息序列化存储到UIMetaFile中,History Server读取UIMetaFile经反序列化得到Spark任务状态可直接用于页面渲染。
从效果上来看,基于自研的UIMeta架构,平均文件大小减少90%,解析时间缩短75%。
/ 深度融合数据湖 /
LAS为湖仓一体分析服务,为了降低上层使用数据湖的使用成本,LAS Spark融合了数据湖Apache HUDI能力,并且进行了深度封装。
主要体现为三个层面的能力:
1. 完全覆盖离线数仓场景, 包括对海量数据的高效Insert/Overwrite能力,离线场景可与Hive写入性能完全对齐,同时完全兼容原有的HSQL;
2. 具备CRUD能力, 可通过SQL对历史数据进行Delete/Update等操作;
3. 具备Upsert和Append能力, 支持事务型数据(主键)导入、以及日志型数据(非主键)导入。
LAS Spark未来规划主要分为三个方向:
第一,LAS Spark将从现在的Spark3.0升级到Spark3.2。 从社区的TPC-DS Benchmark来看,版本升级的性能提升在15%左右,同时,从我们在字节内部实际场景测试的结果来看,也能至少带来7%左右的性能提升,因此这个升级所能带来的收益是比较明确的。
第二,LAS Spark将继续进行极致的优化,进一步提升Spark性能。 我们过往对Spark优化带来的收益是比较明显的,目前在TPC-DS 10T Benchmark上,LAS Spark性能是开源版本的2.76倍。后续我们将从多个方面尝试开展优化工作,比如:增加文件索引,以此增强File Skipping的能力;拓展C++向量化引擎,全面覆盖Spark的各个阶段;从硬件层面探索Spark On GPU的可能性等。
第三,LAS Spark将发力智能数仓, 主要基于智能参数配置(PBO)、智能物化列/物化视图构建、智能数据分布(LocalSort、Bucket、MergeFile)等手段,让用户上手更快、查询更快、成本更低。
以上就是字节跳动在Spark的实践, 目前均已通过火山引擎湖仓一体分析服务LAS产品对外服务 ,欢迎对这方面有需求、感兴趣的用户都可以积极地来体验一下我们的LAS湖仓一体分析服务。
产品介绍
火山引擎湖仓一体分析服务 LAS
是面向湖仓一体架构的Serverless数据处理分析服务,提供字节跳动最佳实践的一站式 EB 级海量数据存储计算和交互分析能力,兼容Spark、Presto生态,帮助企业轻松构建智能实时湖仓。
「 点击文末“阅读原文”,可顺滑体验,或后台回复数字“4”了解产品」