字节跳动 EMR 产品在 Spark SQL 的优化实践

数据中台大数据数据湖仓

本文重点介绍了字节跳动 EMR 产品在 SparkSQL 的优化实践。

picture.image 文 | 惊帆 来自 字节跳动数据平台EMR团队

数据湖引擎集成

Hudi、Iceberg等数据湖引擎目前使用的越来越广泛,很多B端客户在使用Spark SQL的时候也存在需要使用数据湖引擎的需求,因此字节EMR产品需要将数据湖引擎集成到Spark SQL中,在这个过程碰到非常多的问题。

首先在与Iceberg集成的时候,对体验和易用的问题进行了优化,用户在使用Spark SQL过程中,需要手动输入很多指令,并且需要找到对应的spark-iceberg 依赖包,这个也是目前集成Iceberg最常用的方案。我们的解决方式是在预先安装的过程中,提前把iceberg的相关jar包放到spark jars目录下,这样用户只需要指定catalog即可,无需再手动输出很多指令。

其次在Spark与Hive跨引擎分析场景下使用Iceberg,Spark正常创建表,Presto/Trono可以正常读写,但Hive无法正常读写,这个问题官方的文档也没有清晰的描述,解决方案是需要修改Spark的配置文件或者修改Hive的hive-site-spark override配置,确保初始化出来的Spark Session中的配置项iceberg.engine.hive.enable的值为true,Hive才能正常的读取Spark创建的表。 

picture.image

问题上本质上是由于Iceberg为了支持Hive引擎,在整体的设计上做了一些妥协,使用了Storage Handler的方式去实现Hive对Iceberg格式的表的读写,需要显式的指定Hive的Input/Output Format实现,而Presto/Trono则可以基于Hive的format_type自动识别表的格式进行识别。

在兼容性上,由于Iceberg 0.12版本不支持Spark 3.2,由于升级Spark的影响范围非常大,于是更新了Iceberg,使用了社区的一个master的snapshot版本进行编译,与Spark 3.2进行集成。

Spark SQL 服务器

虽然行业针对Spark SQL 提供一个SQL 服务器已经有Spark Thrift Server或者Kyuubi这样的工具,但是在某些B端客户的业务的背景下,这些工具并不能完全满足要求,因此字节跳动EMR团队自己设计实现了Spark SQL Server,主要聚焦解决的是如下场景:

  • 兼容Hive语义: 由于大部分B端客户早期是基于Hive构建的数据仓库,后续逐步全部替换为Spark SQL,中间必然面临大量的系统迁移,而由于Hive与Spark SQL语义不尽相同,重写SQL实现的工作量非常大,因此在字节EMR产品中的Spark SQL Server中实现Hive 语义和Spark SQL语义的兼容,在实现方案上采用的时候讲Hive SQL解析注入到Spark 引擎中,形成一个SQL Parser Chain,最终会匹配到某一个解析器,实现对SQL的解析,从而达到对整个SQL语义的兼容。
  • 提前初始化Spark SQL引擎: 在业务请求到达前提前在YARN上提交Spark任务,初始化资源信息,让整个引擎处于等待的状态,可以减少任务提交消耗的时间,在用户较多的情况下可以提示整体的任务执行时间。
  • 跨Yarn队列的任务提交: 用户可以指定Yarn队列执行任务。 

picture.image

如上图所示,SQL服务器是一个实现了Thrift 接口的服务器,提供标准的JDBC访问接口,Spark SQL引擎同样实现了Thrift 接口,Spark SQL引擎在服务启动的时候便已经被提交至Yarn,处于等待状态。当业务任务到达的时候,由SQL服务器实现引擎的筛选,匹配一个已经存在的引擎,或者重新提交一个全新的引擎用来执行任务。

SQL 服务器支持OpenLDAP,Kerberos等常用的权限认证,同时支持多种不同的隔离级别,例如Session级别则每一个业务SQL都会初始化一个Spark SQL引擎用来接收任务,任务执行结束后,引擎从Yarn中销毁。而User级别则针对用户会初始性0-N个引擎,常驻于Yarn中,处于交替执行任务。

这样的服务器设计打破了Spark Thrift Server的单Driver所带来的局限,解耦了SQL服务和任务执行,也就支持更细粒度的资源管理和跨队列的任务提交。

同时也兼容了Hive的接口,用户可以通过如下方式访问服务器:

  • HA访问链接:
 ./bin/beeline -u  "jdbc:hive2://emr-5fqkwudj144d2gc1k8hi-master-1/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=midas/ha;auth=LDAP"  -n emr_dev -pEMR123456emr
  • 非HA访问链接:
./bin/beeline -u  "jdbc:hive2://emr-master-2:10005/default;auth=LDAP”  -n test_sub -pEMR123456emr

HA模式下的信息被记录在Zookeeper中,保存的内容格式与HiveServer2的内容一致,能确保使用Hive的客户端可以直接访问HA模式下的服务器。 

Spark SQL 多租户

picture.image

在Hive任务执行过程中,HiveServer2服务承担了提供SQL服务器进行用户身份认证,权限判断,以及解析SQL生成最终的执行计划,再由MR引擎执行具体的分布式任务。

在这个过程中HiveServer2承担了非常重的职责,因此需要消耗非常大的资源,因此会很大程度的影响用户的并发。对于分布式任务运行来说,它的资源约束来自于Yarn作为资源管理器所分配的资源,但是在Hive架构下却受限于HiveServer2的影响,导致用户并发的数量无法随着Yarn资源的提升进行提升。

而在Spark SQL引擎中,SQL解析是下推到引擎内部,与具体的分布式任务执行合为一体,不需要单独的服务器去做SQL解析。也正因为Spark SQL与Hive在解析模块的架构存在差异,Hive On Spark的模式会变得非常难。

针对如上的场景,字节跳动EMR团队重新设计的SQL服务器只负责任务的接收,进行用户资源,权限和身份的判断,然后将任务发送给运行在Yarn中的Spark SQL服务器。 打破了Hive这种并发受制于HiveServer2和Yarn两层约束的局面,只由Yarn的资源决定用户的并发程度,从而极大的降低了Spark SQL服务器的资源需求,增强了其稳定性,在用户并发上有了非常大的提升。

其次通过引擎预热的功能减少任务执行的时间,提升整体速度,引擎预热指的是在服务启动的时候便向Yarn提交Spark SQL引擎,处于等待的状态,当业务请求到达的时候,基于业务类型从已经处于就绪的引擎中选择一个引擎来执行任务。 

picture.image

并不是每一个预热提交的引擎都会被选择执行,在SQL服务器中存在如下三种引擎隔离级别:

  • Session: 基于每一个connection都会全新提交Spark SQL引擎,在链接断开后,引擎从Yarn上销毁。
  • User: 同一个用户可以共享多个Spark SQL引擎,具体的Spark SQL引擎个数由该用户提交的任务资源需求决定,引擎在连接断开后不会销毁,直到引擎空闲时长到达上限。
  • Open: 所有用户都可共享的Spark SQL引擎,通常是用来应对大账号,或者集群不需要做权限管理的场景。

由此可见只有在User,Open的级别下引擎预热才会产生价值,预热省去了Spark Submit的时间,当用户数量足够多,群体为统计单位所节省的时间越多。 

SparkSQL 事务支持

Hive的事务力度是基于HiveServer2实现的,例如通过如下语法:

CREATE TABLE tm (a int, b int) stored as orc TBLPROPERTIES
('transactional'='true', 'transactional_properties'='insert_only')

可开启事务,但是由于对事务的管理是在服务器上,因此需要开启ACID的时候受影响的是整个HiveServer2的所有请求,而Spark SQL很好的集成和支持了Hudi,Iceberg等数据湖格式,因此在Spark SQL服务器中不需要实现类似HiveServer2的事务机制,只需要在最终读取处理数据的时候,采用Hudi,Iceberg等特性便可达到支持事务的效果。

例如对于Icdberg数据格式的表已支持update、delete操作:

MERGE INTO prod.nyc.taxis ptUSING (SELECT * FROM staging.nyc.taxis) stON pt.id = st.idWHEN NOT MATCHED THEN INSERT *;

因此当Spark SQL集成了Iceberg后,便具有了事务能力,再配合SQL服务器,便可在很低的成本上具有和Hive事务能力同等的效益,同时也没有Hive下的一些约束,从这样的架构设计上来看,能够完整的替换Hive。另外一个方面当用户数量变多,同时数据会发生修改,更新等操作,很容易造大量的小广播传输,从而引起Driver的OOM。虽然大广播也会存在OOM的问题,但是大广播可以通过阈值控制,而小广播阈值对其不生效,一旦说数量变多,很容易引起Driver的OOM。

字节跳动数据平台EMR团队通过对小广播进行合并广播,解决大量小广播进行传播,导致打爆Driver的情况出现。

尾声

随着企业的业务发展越来越复杂,需要更加灵活,更加高效的数仓架构,在这样的业务驱动背景下,Hive 的局限变得越来越明显,而基于 Spark SQL 灵活构建数仓的方案将会变得越来越主流。所以企业在考虑数据仓库构建体系的时候,可以考虑如何基于Spark SQL 构建自身数据体系,Spark 完善和开放的生态在未来必然会有更多优秀的服务会围绕 Spark 形成强大的优势。

55
0
0
0
关于作者
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论