本文整理自字节跳动基础架构的大数据开发工程师魏中佳在 ApacheCon Aisa 2022 「大数据」议题下的演讲,主要介绍 Cloud Shuffle Service(CSS) 在字节跳动 Spark 场景下的设计与实现。
作者|字节跳动基础架构的大数据开发工程师-魏中佳
在大数据场景下,数据 Shuffle 表示了不同分区数据交换的过程,Shuffle 的性能往往会成为作业甚至整个集群的性能瓶颈。特别是在字节跳动每日上百 PB Shuffle 数据的场景下,Shuffle 过程暴露出来了很多问题,本文会逐个展开此类问题并介绍在字节跳动的优化实践。
External Shuffle Service
首先来看,在 Spark 3.0 及最新的 Spark 3.3 中,External Shuffle Service(以下简称 ESS)是如何完成 Shuffle 任务的?
如下图,每一个 Map Task,从 Mapper 1 到 Mapper M 都会在本地生成属于自己的 Shuffle 文件。这个 Shuffle 文件内部由 R 个连续的数据片段组成。每一个 Reduce Task 运行时都会分别连接所有的 Task,从 Mapper 1 一直到 Mapper M 。连接成功后,Reduce Task 会读取每个文件中属于自己的数据片段。
上述方式带来的问题是显而易见的:
- 由于每次读取的都是这个 Shuffle 文件的 1/R,通常情况下这个数据量是非常非常小的,大概是 KB 级别(从几百 KB 到几 KB 不等),这样会给磁盘(尤其是 HDD )带来大量随机的读请求。
- 同时,大家可以看到,Reduce 进行的 Shuffle Fetch 请求整体看是一个网状结构,也就是说会存在大量的网络请求,量级大概是 M 乘以 R,这个请求的数量级也是非常大的。
这两个问题随着作业规模的扩大,会带来越来越严重的 Shuffle Failure 问题。Shuffle Failure 意味着超时,Shuffle Failure 本身还有可能导致 Stage 重算,甚至导致作业失败,严重影响批式作业的稳定性,同时还会浪费大量的计算资源(因为 Fetch 等待超时的时候,CPU 是空闲的)。
Spark 在字节跳动的应用
在字节跳动内部,Spark 作业规模较大:
- 日均 100 万左右个作业
- 日均 300 PB Shuffle 数据
- 大量作业签署 SLA,对稳定性要求非常高,超时严重还会严重影响下游
- 大量 HDD 机器和少量 SSD 机器
- 大量在线业务低峰出让的资源,可用磁盘空间非常小,需要把存储拉远
下图是字节跳动内部一个 Spark 作业的 Shuffle Chunk Size 情况。这个作业只有 400 兆的 Shuffle 数据,但是它的 M 乘以 R 量级是 4 万乘 4 万。简单算一下,在这个例子中,平均的 Fetch Chunk 大小甚至远远小于 1K ,量级是非常非常小的。
再看一个混部集群中 Spark 作业的 Shuffle Fetch-Failure 的实时监控。下图监控中每个点的含义是——在这个时刻处于 Running 状态的 Application 的 Fetch-Failure 次数的总和。
上文提到,每一个 Fetch-Failure 都可能意味着一定时间的超时等待和计算资源空跑,同时还可能意味着触发 Stage 重算,甚至作业的失败。
所以,解决这个问题对于提升 Spark 的资源利用率和稳定性都具有重要意义。
问题总结
综上所述,ESS 在字节跳动业务场景下面临如下问题:
- Chunk Size 过小导致磁盘产生大量随机 IO,降低磁盘的吞吐,引发 Chunk Fetch 请求的堆积、超时甚至引发 Stage Retry;
- 磁盘 IOPS 无法在操作系统层面进行隔离,Shuffle 过程中不同 Application 作业会互相影响;
- 在离线混部场景下,我们希望利用在线服务业务低峰期的 CPU,但缺少对应的磁盘资源。
针对上述问题和需求,我们先对 ESS 进行了优化。
参数调优
首先是参数调优。为了实现参数调优,我们研发了一个旁路系统,如下图。
- 首先,采集 Spark、Yarn 运行时的 Event Log 作为数据源;
- 其次,使用 Flink 对原始数据进行 Join 和计算,得到作业某个 Stage 的 Shuffle 量、Task 数量等指标;
-
针对上述指标,
- 一方面,在计算过程使用可插拔的启发式规则对单个作业进行诊断;
- 另一方面,同时存在着大量的周期作业重复运行生成该作业的历史画像;
- 最终,结合历史画像与特征诊断信息对特定作业进行自动调参。
下面是一个自动调参的例子。经过若干次调参的迭代后,最终调整了两个参数并达到稳定状态:
- spark.sql.adaptive.shuffle.targetPostShuffleInputSize:64M->512M
- spark.sql.files.maxPartitionBytes:1G->40G
最终效果如下图,
因为我们增大了单个 Task 处理的数据量,恰好这个作业又使用了 Combine 算子,所以它整体的 Shuffle 量有所降低,从 300G 降低到了 68G。
因为增大了这个 Chunk Size,也就是降低了这个作业的并发度,从而减小了整个 Shuffle 过程中的 IOPS,避免了长时间的 Blocked Time。如截图所示,大家可以看到就是在截图的指标里边, Shuffle Read Blocked Time 最大从 21 分钟降到了 79 毫秒,整体这个作业的端到端时间也降低为原来的一半,从 40 多分钟降到了 20 分钟。
以上是参数调优对这一个作业的影响,实际上这一个作业的调优还会影响其他作业。在调参之前,21 分钟的 Shuffle Read Blocked Time 意味着磁盘是忙碌状态,在这个磁盘上的其他作业都会受到影响。当前在线上,我们针对 Shuffle 进行自动调参的作业大概有 2 万个,大量数据的验证表明,调参优化的效果是很不错的。
Shuffle 限流
Shuffle 限流主要解决的是磁盘的 IOPS 不易隔离的问题。我们通过对低优但高负载的作业进行限流,来减轻对同节点上高优作业的影响。整体的思路是
当我们发现 ESS 响应请求的 Letency (延迟)升高到一定程度时,比如 10 秒或 15 秒,我们就认为这个节点当前处于异常状态,这时 ESS 就会针对内部正在排队的 Fetch 请求,按照 Application 分类进行分析,综合当前堆积的排队长度和作业的优先级,给每个作业划定一个合适的长度范围,超过范围的作业会被 ESS 告知对应的 Shuffle Client 进行休眠,暂停数据请求,通常暂停1~2分钟,这时该作业的客户端就进入休眠状态,进行等待,同时原本分配给它的 ESS 的服务能力提供给更高优或其他不受影响的作业。
通过 Shuffle 限流,我们实现了以下目标:
- 正常任务打开限流没有影响,不会触发流量限制;
- 异常任务开启限流,不会让任务变慢或失败,大概率会使得任务变快 (限流减少重试,减轻 Server 压力);
此处有必要解释一下,为什么任务会变得更快呢?原因在于当 Latency 升高时,Chunkr Fetch 开始堆积,大量排队,此时往往容易形成恶性循环,请求过来-开始排队-超时-超时后重试-重试后继续排队-继续超时,Fetch 请求可能永远都得不到正常响应。
但当我们开启限流之后,我们主动地让客户端等待,而非发一个请求过来在服务端排队,由此就可以避免大量无效的 Fetch 请求。也正因如此,大概率即便是被限流的作业也会变得更快。
- 不同优先级的任务,在限流情况下,高优先级任务允许更高的流量;
上文提到,我们是根据排队的数量,及作业的优先级综合地划定一个合适的范围。在划定这个范围的时候,更高优的作业大概率是不会被限流的。
- 异常节点快速恢复,2min~5min 能恢复正常。
结合第二点,因为我们让一部分发送大量 Fetch 请求的作业的客户端进行了等待休眠,所以异常节点会得到一个非常快速的恢复,大概 2~5 分钟就能恢复正常,恢复正常后,就可以给所有的 Fetch 继续提供服务。
我们针对 ESS 存在的问题进行了上述优化,但是 ESS 的 Fetch Based 的整体思路决定了其存在不可避免的性能瓶颈(随机读、写放大)。针对这些问题,我们自研了 Cloud Shuffle Service(以下简称CSS),接下来从基本思路、整体架构、读写过程、性能分析四个方面阐述 CSS 的设计与实现。
基本思路
Cloud Shuffle Service 的整体思路是 Push Based Shuffle,在 Shuffle Write 阶段,直接把相同 Partition 的数据通过网络写入到远端的一个 Buffer 并最终 Dump 到文件中,在 Shuffle Read 阶段,可以通过连续读的方式直接读取已经合并好的文件。对该思路进行拆解,我们可以概括为以下三个方面:
第一个问题是备份。为了解决我们在背景中提到的大量随机读请求的问题,我们需要在 Reduce 读取前使用 Push Shuffle 的方式将数据聚合到一起。由于是远程聚合,所以还可以顺便解决本地磁盘空间不足的问题。
然而,聚合虽然可以解决随机 Shuffle 的问题,但也会带来一个新的问题——数据丢失的成本比原来更高。原因在于,以前每个 Task 生成自己的文件虽然没有备份,但这个文件丢失的成本是非常低的,只需要单个 Task 重算即可。但当我们把所有 Map Task 同一个环节的数据都聚合到一起时,一旦发生数据丢失,就需要重算整个 Stage。
因此我们需要对这些数据进行备份。备份的时候,我们发现 HDFS 太重了,它的写入速度满足不了我们的需求,随后我们就采用了双磁盘副本的方式,通过自己管理两个客户端双写来解决这个问题。
第二个问题是 IO 聚合。IO 聚合对于读提升是显而易见的,因为它将大量的随机 IO 变成了极少数的连续 IO,但是在写入速度上就有可能会受影响。因为写入的时候原来是直接写本地盘,现在变成需要通过网络请求来写数据。
同时因为可能需要多个 Mapper 去写一个 Buffer,这个时候就有可能在写 Buffer 的时候会产生锁的争抢,这些都是写入时的代价。这就需要我们去花更多的时间在写入时去做优化。
所以面临的第三个问题是写入速度。在写入速度的优化上,我们选择了主从 InMemory 副本,全部都是异步刷盘。即在数据写入到服务端的内存后就快速返回主从,写入到内存中的数据通过异步的方式去刷到磁盘里面。这其中有一个风险,即如果主从同时刷盘失败,就会造成数据丢失。主从只有一个刷完失败的话,有一个磁盘的文件数据丢失,另外一个磁盘的文件是没有丢失,但因为可能后续可能继续运行一段时间,可能将来完整的文件都会丢失,虽然不是同时丢失,但可能会在不同的时间丢失数据,这样的话就会造成整个 Stage 重算。但我们认为这个概率是非常非常低的,我们以极小的失败几率换取更高速的写入速度是完全值得的。事实也证明,这个思路是正确的,在整个 CSS 的应用过程中,到目前我们还没有在线上观察到任何一起数据丢失的问题。
整体架构
CSS 整体架构
上图右侧是 CSS 的整体架构,主要分为4个部分:
- Zookeeper WorkerList:我们使用 zookeeper 来提供服务发现的功能;
- CSS Worker [Partitions / Disk | HDFS ] :管理磁盘并提供 Shuffle Push 服务节点。每一个机器上都会启动 Worker 进程,当收到启动指令时,它就会向 Zookeeper 进行注册,并定时更新上报信息;
-
Spark Driver:集成启动 CSS Master 和 ClusterName + ZK
- CSS Master 的作用是规划和统计,Master 从 Zookeeper 中拉取所有 Worker 的信息,并对 Worker 进行分配,然后把 Worker 和 Shuffle 以及每个 Partition 的对应关系通知到 Executor
- ClusterName + ZK:通过配置的 ClusterName 在 ZK 中寻找对应的 Workerlist
-
CSS ShuffleClient:Writer 和 Read 的集合,负责跟 Worker通信,读取数据或写入数据。
读写过程
下面我们来看读写过程,下图是完整的写入过程。
写入过程
首先 Mapper 从 Master 中获取分配好的 Worker List 及它们与 Partition 的对应关系,也就是上图中 P0 对应的 Worker 0 和 Worker 1。
随后 Mapper 开始写数据,正常的话它会把数据写入到内存,然后返回,由 Worker 异步地把数据刷到磁盘中。
直到某一次 Worker 刷数据的时候发生异常,数据没有写到磁盘中,比如说此时磁盘突然坏了。此时,实际上这个请求已经返回给了 Mapper,Mapper 会认为它的两次写都是成功的,直到 Mapper 下一次写的时候,因为 Worker 已经把异常记录到了内存里,等 Mapper 下次写的时候,Worker 就会向 Mapper 返回上次写入失败的信息。
这时 Mapper 意识到它上次写入的数据是失败,这时他就会向 Master 再申请新的一个 Worker 就是我们看到的 Worker 3,再继续进行写入请求。
大家可以注意到,在第一个文件也就是 P0-0 里,实际上它保存了失败前所有的数据,因此这个过程中实际上并没有数据丢失,最终生成的成功的完整文件就是 P0-0、P0-1 和 P0'-1 三个文件。
此处有必要提到,实际上 P0-0 里是包含了一份多余的信息,即 P0-1 的第一条数据。下面我们说读取过程的时候也会提到。
读取过程
上文提到,正确的文件有三个,P0-0 是唯一一个正确的文件,P0-1 和 P0'-1 可以任选其一。
这些 Mata 信息其实都记录在 Driver 的 Master 里,然后 Reduce 会根据这些文件的 Banner 信息选择合适的文件来读取。
值得一提的是去重,除了写失败可能导致的数据重复之外,因为 Spark 支持推测执行,所以还可能存在其他的重复问题,所以我们最终使用了 Mapld、Attemptld 和 Batchld 来共同进行数据去重。
性能分析
1TB 级别 TPS-DS 测试结果
CSS 开发完成后,我们用 TPS-DS 进行了测试。上图是 1TB 级别的 TPS-DS 的测试结果。
通过上图可以看到,相比原生的 ESS,使用 CSS 在查询时间上有整体30%的提升。在个别 Query,如 q38 和 q35,提升是非常明显的,大概有 60% 到 70%。
上面是从线上作业中选取的一个具体案例。可以看到,在使用原生的 ESS 时,读取时间是 20 分钟左右。使用 CSS 后,因为 CSS 使用了更高压缩比的压缩算法,所以整体的 Shuffle 数据量减少了很多。同时因为 IO 聚合读取的时间也非常快,降低到了秒级,三个 Stage 加一起可能都不到一分钟,相比是原来读取时间的 1/20。
上文分析了 Cloud Shuffle Service 的设计和实现,下面讲一下 Cloud Shuffle Service 的应用实践。
CSS 在字节内部已经推广,最新的数据显示:
- CSS Worker 数量 1000+,对应1000多台机器
- 部署模式灵活:Shell、Yarn、K8S
- 支持作业类型众多:Spark、MR、Flink Batch
- 接入作业数 6w+
- 单日 Shuffle 量 9PB+
集群部署&作业接入
构建运维接入管理平台(CSS-Coordinator)
为了降低接入门槛,我们构建了一个运维接入管理平台,叫作 CSS-Coordinator,他提供了如下功能:
- 提供用户作业无感知接入功能:直接帮用户注入 CSS 相关的参数;
- 提供 Cluster|Queue|Job等维度的灰度模式:支持以各种纬度接入作业,用户仅需配置对应的接入纬度,该维度下的所有作业都会接入到 CSS 中;
- 异常作业的监控告警:作业运行结果会上报到 Coordinator 平台,对于运行失败的作业会进行报警
- 历史 Shuffle 作业的 HBO 优化:平台在作业接入过程中会针对作业历史的 Shuffle 数据量进行评估,当 CSS 集群资源不足时会拒绝大 Shuffle 的作业接入 CSS;
此外,我们设计了作业 Shuffle FallBack 机制:
- 设置 spark.yarn.maxAppAttempts=2
- 保留用户原始配置
- 作业 CSS 失败自动 FallBack 到原生 Shuffle
踩坑记录
在实践的过程中,我们也踩了很多坑:
CSS 服务相关
-
超大 Register Shuffle 启动缓慢
- 在最初的设计中,Register Shuffle 会对所有 Worker 进行初始化工作。因此,在规模比较大的 Shuffle 的场景下,Register 就会非常慢,用户启动一个 Stage 可能需要 2-3 分钟。
- 后来,我们对 Register Shuffle 进行了精简,把 Worker 的初始化动作改成了 Lazy 模式,即只有第一次数据 Push 过来的时候,Worker 才针对这一个作业的 Partition 进行对应的初始化工作。在 Register Shuffle 的时候,只进行 Worker 和 Partition 之间的分配,大大缓解了超大 Register Shuffle 启动缓慢的问题。
-
Client 发送速率过快
- 因为我们是一个有状态的服务,无法把 QPS 通过负载均衡的方式降下来,只能通过一些负反馈的方式让 Client 降速,即当 Server 的服务能力无法满足请求时,就让请求在客户端等待。
- 后续我们尝试了很多方法,包括 Spark 原生的 Max Inflight 等,但效果都不太好,最终我们选择了 Netflix 的一个三方库。
- 大致原理是,针对最近一段时间的 RTT 做一个 Smoth 处理,得到一个理论的 RTT,然后拿当前的 RTT 与理论 RTT 做比较,如果小于这个值的话,就在 QPS 上做爬坡。如果大于这个值的话,系统就认为现在的 Server 有排队现象,然后就启动限流。
-
服务热上线,用户如何不感知
- 在 CSS beta 的过程中,每天都会有新的 Commit 合到主分支,每天也会产生新的问题。但是公司内部的 Spark 发展周期是比较长,跟 CSS 的迭代周期无法 Match。
- 最终,我们在 Spark 里只集成了一个最简单的接口,其他的实现都放到 HDFS 上,这样就把公司内 Spark 版本的周期与 CSS 的版本周期做了解耦,CSS 就可以做到小步快跑。在小步快跑的过程中,那我们解决了大量的问题。
Spark 集成相关
接下来看2个与 Spark 集成相关的问题:
-
AQE Skew-Join 读放大问题
- AQE Skew-Join 原理图
- 上图是 Spark 社区提供的 AQE Skew-Join 原理图,根据这个原理,当 Spark 发现某一个 Partition 数据非常大,远超其他 Partition 的时候,它会主动把该 Partition 的数据拆分成多份数据,然后分别去做 Join。这样最终每个 Task 处理的数据量就会更平均,整体作业的运营时间也会变短。
- 设想一下,当我们把 Map 的数据全部聚合起来后会发生什么?一个文件会读很多遍,每次读的时候还会 Skip 很多无效数据。举个例子,一个倾斜的 Partition 上有 1T 数据,Spark 想把它拆成十份去读,这时会发现什么呢?就是这个被聚合后 1T 的文件要读 10 遍,且每次有 1/9 读到的数据都是 Skip 的。
- 面对这个问题,我们的解决办法也非常朴素,就是不再盲目地追求生成一个非常大的连续文件。实际上我们要解决的就是随机读的问题,所以只要文件足够大就可以。因此,我们把文件默认按照 512G 的大小进行切分,一个大的 Partition 数据最终会被切分成若干个小文件。
- 比如上文的例子,1T 的数据会被切分成很多份 512G 的文件,当 AQE Skew-Join 触发时,就不必把一个超大文件读很多遍,只需把这些 512G 的文件按需分配给不同的 Task 进行 Join 就可以。
-
Task Huge Partition 导致 Executor 内存占用过大
-
在最初的设计中,基于 Push 的特性,我们是不想做排序的。最初的思路类似于 By Pass 的实现思路,给每一个 Partition 准备一个 64k 的 Buffer,一旦这个 Partition 的 Buffer 写满,就发送出去。后来发现当 Partition 数量非常大的时候,Buffer 就会占非常大的空间。
-
假设一个极端的场景,当有 10 万个Partition 时,如果一个 Partition 的 Buffer 是 64k,那占用的内存还是非常大的。所以最终我们还是回到了 Sort 的路线,即把数据整体在内存里写满之后,再进行 Source Build, 那么 Spill 也不会再写到磁盘里,Spill 之后也不需要 Merge 把 Spill 的数据发送出去。
-
这样做还可以降低 Push 的请求数,同一个 Worker 不同的 Partition push 数据的时候,就可以把它们放到一起放到 Push Request 里。
-
收益分析
下面是我们线上的一些实际收益。第一个例子是某业务某个小时级的任务,这个任务的规模很大,有1.2 万 Cores,在混部队列上平均需要 2.5 小时。使用 CSS 之后,平均速度提升到了 1.3 个小时,提升 50%。
第二个例子是某业务 3小时周期调度任务整体稳定性的提升。在使用 CSS 之前,因为它的 Shuffle 经常触发 Fetch-Failure 异常,造成作业频繁重试,有时可能需要重试 8 次才能最终成功。接入 CSS 后,所有作业都可以一次性跑完,整体的稳定性提升了 70%。
下面是 CSS 未来的规划和展望。
第一是服务分级,即如何满足 Quota & Shuffle 优先级,对不同的业务承诺不同的 SLA,未来我们希望 CSS 能以更有力的方式保证高优业务。
第二是 CSS 作业构建 Shuffle 元仓,进行更好的 HBO 优化。 当前 CSS Shuffle 数据元仓的 HBO 优化只有比较简单的 Yes 和 No 的功能,即用户根据历史的 Shuffle 数据量来允许或者拒绝提供服务。后续我们希望可以根据元仓的数据加强调度,比如把数据量大的作业更广泛地打散,让大的 Shuffle 作业和小的 Shuffle 作业同时分配在一台机器上或一块磁盘上,避免把很多大的作业同时调度到一块磁盘上,从而让负载更加均匀。
第三是 CSS Worker 支持异构机器,自动调节负载,降低运维成本。 因为我们收到的节点的型号、磁盘数量、网卡数量都不一样。目前我们分配的算法会考虑负载能力,但是对相对比较静态的负载,负载能力的这种差异还无法完全地自适应异构机器,自动地调节负载。在缺失这些能力的情况下,如果我们一个集群里使用了异构机器,就会导致某些相对来说性能比较差的机器,影响整个作业的性能。但是如果我们把不同类型的机器拆分出来,做成不同的集群,又会提高运维成本。所以支持异构机器是我们将来一个非常重要的目标。
此前,Cloud Shuffle Service 已在 Github 上开源,基于字节跳动大规模实践的火山引擎批式计算 Spark 版也已经上线火山引擎,支持公有云、混合云及多云部署,全面贴合企业上云策略,欢迎了解:https://www.volcengine.com/product/spark
更多资讯欢迎关注公众号【字节跳动云原生计算】