干货 | 在字节跳动,一个更好的企业级SparkSQL Server这么做

数据湖仓

SparkSQL是Spark生态系统中非常重要的组件。面向企业级服务时,SparkSQL存在易用性较差的问题,导致难满足日常的业务开发需求。本文将详细解读,如何通过构建SparkSQL服务器实现使用效率提升和使用门槛降低。

前言

Spark 组件由于其较好的容错与故障恢复机制,在企业的长时作业中使用的非常广泛,而SparkSQL又是使用Spark组件中最为常用的一种方式。

相比直接使用编程式的方式操作Spark的RDD或者DataFrame的API,SparkSQL可直接输入SQL对数据进行ETL等工作的处理,极大提升了易用度。但是相比Hive等引擎来说,由于SparkSQL缺乏一个类似Hive Server2的SQL服务器,导致SparkSQL在易用性上比不上Hive。

很多时候,SparkSQL只能将自身SQL作业打包成一个Jar,进行spark-submit命令提交,因而大大降低Spark的易用性。除此之外,还可使用周边工具,如Livy,但Livy更像一个Spark 服务器,而不是SparkSQL服务器,因此无法支持类似BI工具或者JDBC这样的标准接口进行访问。

虽然Spark 提供Spark Thrift Server,但是Spark Thrift Server的局限非常多,几乎很难满足日常的业务开发需求,具体的分析请查看:干货 | 看 SparkSQL 如何支撑企业级数仓

标准的JDBC接口

Java.sql包下定义了使用Java访问存储介质的所有接口,但是并没有具体的实现,也就是说JavaEE里面仅仅定义了使用Java访问存储介质的标准流程,具体的实现需要依靠周边的第三方服务实现。

例如,访问MySQL的mysql-connector-java启动包,即基于java.sql包下定义的接口,实现了如何去连接MySQL的流程,在代码中只需要通过如下的代码方式:

Class.forName("com.mysql.cj.jdbc.Driver");
Connection connection= DriverManager.getConnection(DB_URL,USER,PASS);
//操作
connection.close();

第一,初始化驱动、创建连接,第二,基于连接进行对数据的操作,例如增删改查。可以看到在Java定义的标准接口访问中,先创建一个connection完成存储介质,然后完成connection后续操作。

性能问题导致单次请求实时创建connection的性能较差。因此我们往往通过维护一个存有多个connection的连接池,将connection的创建与使用分开以提升性能,因而也衍生出很多数据库连接池,例如C3P0,DBCP等。

Hive的JDBC实现

构建SparkSQL服务器最好的方式是用如上Java接口,且大数据生态下行业已有标杆例子,即Hive Server2。Hive Server2在遵循Java JDBC接口规范上,通过对数据操作的方式,实现了访问Hive服务。除此之外,Hive Server2在实现上,与MySQL等关系型数据稍有不同。

首先,Hive Server2本身是提供了一系列RPC接口,具体的接口定义在org.apache.hive.service.rpc.thrift包下的TCLIService.Iface中,部分接口如下:

public TOpenSessionResp OpenSession(TOpenSessionReq req) throws org.apache.thrift.TException;

public TCloseSessionResp CloseSession(TCloseSessionReq req) throws org.apache.thrift.TException;

public TGetInfoResp GetInfo(TGetInfoReq req) throws org.apache.thrift.TException;

public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws org.apache.thrift.TException;

public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws org.apache.thrift.TException;

public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws org.apache.thrift.TException;

public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws org.apache.thrift.TException;

public TGetTablesResp GetTables(TGetTablesReq req) throws org.apache.thrift.TException;

public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws org.apache.thrift.TException;

public TGetColumnsResp GetColumns(TGetColumnsReq req) throws org.apache.thrift.TException;

也就是说,Hive Server2的每一个请求都是独立的,并且是通过参数的方式将操作和认证信息传递。Hive 提供了一个JDBC的驱动实现,通过如下的依赖便可引入:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>version/version>
</dependency>

在HiveConnection类中实现了将Java中定义的SQL访问接口转化为调用Hive Server2的RPC接口的实现,并且扩充了一部分Java定义中没有的能力,例如实时的日志获取,但是使用这个能力的时候需要将对应的实现类转换为Hive的实现类,例如:

HiveStatement hiveStatement = (HiveStatement) connection.createStatement();
List<String> logs = hiveStatement.getQueryLog();

对于Log的获取获取也是调用FetchResult接口,通过不同的参数来区分是获取Log信息还是获取内容信息,所以Hive JDBC背后封装的调用Hive Server2的RPC接口的的流程是:

image.png

如果该流程触发获取MetaData、获取Functions等操作,则会调用其他接口,其中身份信息即token,是用THandleIdentifier类进行封装。在OpenSession时,由Hive Server2生成并且返回,后续所有接口都会附带传递这个信息,此信息是一次Connection连接的唯一标志。

但是,Hive Server2在FetchResults方法中存在bug。由于Hive Server2没有很好处理hasMoreRows字段,导致Hive JDBC 客户端并未通过hasMoreRows字段去判断是否还有下一页,而是通过返回的List是否为空来判断。因此,相比Mysql Driver等驱动,Hive会多发起一次请求,直到返回List为空则停止获取下一页,对应的客户端的JDBC代码是:

ResultSet rs = hiveStatement.executeQuery(sql);
while (rs.next()) {
    // 
}

即Hive JDBC实现next方法是通过返回的List是否为空来退出while循环。

构建SparkSQL服务器

介绍完前面的JDBC接口知识与Hive的JDBC知识后,如果要构建一个SparkSQL服务器,那么这个服务器需要有以下几个特点:

  1. 支持JDBC接口,也就是可以通过Java 的JDBC标准进行访问,这样可以很好的与周边生态进行集成。并且降低使用门槛。
  1. 兼容Hive协议,如果要支持JDBC接口,那么就需要提供一个SparkSQL的JDBC Driver,而目前大数据领域Hive Server2提供的Hive-JDBC-Driver已经被广泛使用,从迁移成本来说最好的方式就是保持Hive的使用方式不变,只需要换个端口就行,也就是可以通过Hive的JDBC Driver直接访问SparkSQL服务器。
  1. 支持多租户,以及类似用户名+密码和Kerberos这类常见的用户认证能力。
  1. 支持跨队列提交,同时支持在JDBC的参数里面配置Spark的相关作业参数,例如Driver Memory,Execute Number等。

这里还有一个问题需要考虑,那就是用户通过SparkSQL服务器提交过来的是一段SQL代码,而SparkSQL在执行的时候需要向Yarn提交一个Jar去执行,如何实现这个SQL到Jar提交的转换?

一个最简单的方式是用户每来一个SQL就执行一次spark-submit命令,将结果保存下来再缓存下来,再提供给客户端,但是更好的方式是提交一个常驻的Spark 作业,这个作业是一个常驻任务,这个作业会开启一个端口,用来接收用户的SQL进行执行,并且进行保存。

但是为了解决类似Spark Thrift Server的问题,这个作业需要和用户进行绑定,而不是随着Spark的组件启动进行绑定,也就是作业的提交,以及接收哪个用户的请求,均来自于用户的行为触发。

image.png

有了这样几个大的方向后,便可以开始开发这个SparkSQL服务器了,首先需要实现TCLIService.Iface下的所有接口,下面用代码+注释的方式来讲述这些Thrift接口的含义,以及如果实现一个SparkSQL服务器,需要在这些接口做什么内容:

public class SparkSQLThriftServer implements TCLIService.Iface {
    @Override
    public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
        //Hive JDBC Driver在执行创建Connection的时候会调用此接口,在这里维护一个用户与Spark 作业的对应关系。
        //来判断是需要复用一个已经存在的Spark作业,还是全新执行一次spark-submt。
        //用户与是否需要spark-submit的关联关系均在这里实现。
        //同时需要生成THandleIdentifier对象,并且和用户身份进行关联,后续其他方法调用均需要使用这个对象关联出用户的信息。
        return null;
    }

    @Override
    public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
        //客户端调用connection.close()方法后会进入到这里,在这里进行用户状态的清除,同时需要基于用户的情况判断是否需要停止用来执行该用户SQL的Spark 作业引擎。
        return null;
    }

    @Override
    public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
        //获取服务器的元数据信息,例如使用BI工具,在命令会列出所连接的服务的版本号等信息,均由此方法提供。
        return null;
    }

    @Override
    public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
        //执行SQL任务,这里传递过来的是用户在客户端提交的SQL作业,接收到用户SQL后,将该SQL发送给常驻的Spark作业,这个常驻的作业在OpenSession的时候已经确定。
        return null;
    }

    @Override
    public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
        //获取数据库支持的类型信息,使用BI工具,例如beeline的时候会调用到这里。
        return null;
    }

    @Override
    public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
        //获取Catalog,使用BI工具,例如beeline的时候会调用到这里。
        return null;
    }


    @Override
    public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
        //返回查询结果,基于THandleIdentifier对象查询到用户的SQL执行的情况,将请求转发至常驻的Spark 实例,获取结果。
        //参数中通过TFetchResultsReq的getFetchType来区分是获取日志数据还是查询结果数据,getFetchType == 1为获取Log,为0是查询数据查询结果。
        return null;
    }

    
}

可以看到我们需要实现一个兼容Hive JDBC Driver的服务器,这是复用当前生态最好的方式,有了上面的Thrift接口实现后,则需要启动一个Thrift服务,例如:

TThreadPoolServer.Args thriftArgs = new TThreadPoolServer.Args(serverTransport)
        .processorFactory(new TProcessorFactory(this))
        .transportFactory(new TSaslServerTransport.Factory())
        .protocolFactory(new TBinaryProtocol.Factory())
        .inputProtocolFactory(
                new TBinaryProtocol.Factory(
                        true,
                        true,
                        10000,
                        10000

                )
        )
        .requestTimeout(1000L)
        .requestTimeoutUnit(TimeUnit.MILLISECONDS)
        .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
        .executorService(executorService);
thriftArgs
        .executorService(
                new ThreadPoolExecutor(
                        config.getMinWorkerThreads(),
                        config.getMaxWorkerThreads(),
                        config.getKeepAliveTime(),
        TimeUnit.SECONDS, new SynchronousQueue<>()));
        
TThreadPoolServer server = new TThreadPoolServer(thriftArgs);
server.serve();

至此便开发了一个支持Hive JDBC Driver访问的服务器,并且在这个服务器的方法中,实现了对Spark 作业的管理,因此我们还需要开发一个预设的Spark Jar,这个Jar同样实现了如上的接口,只是这个作业的实现是实际的去执行用户的SQL。

经过前面的流程,已经开发了一个可以工作的SparkSQL服务器,可以接收用户的请求,执行SQL,并且返回结果,但是还有一些细粒度的工作需要考虑,例如如何实现跨队列的提交,如何实现用户的细粒度的资源管理,以及如何维护多个Spark 作业的连接池。

image.png

由于对于Spark作业在Yarn上的提交,运行,停止均由SparkSQL服务器管理,对用户是不可见的,用户只需要编写标准的JDBC代码即可,因此可以基于用户的参数信息来匹配合适的引擎去执行,同时还可以限制一个Spark 常驻作业的任务个数,实现更加灵活的SparkSQL作业的管理,同时也可以实现类似C3P0连接池的思想,维护一个用户信息到Spark常驻作业的关联池。

SparkSQL服务器的HA

Hive Server2在启动的时候会将自己的服务器信息写入Zookeeper中,结构体如下所示:

[zk: localhost:2181(CONNECTED) 1] ls /hiveserver2\
[serverUri=127.0.01:10000;version=3.1.2;sequence=0000000000]

当连接HA模式下的服务器的时候,Hive JDBC Driver的URL需要切换成zookeeper的地址,Hive JDBC Driver会从多个地址中随机选择一个,作为该Connection的地址,在整个Connection中均会使用该地址。

因此对于我们实现的SparkSQL服务器,只需要在服务器启动的时候,保持与Hive一致的数据格式,将自己的服务器的地址信息写入到Zookeeper中即可,便可通过标准的zk地址进行访问,例如:

./bin/beeline -u  "jdbc:hive2://127.0.01/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=自定义的节点;auth=LDAP"  -n 用户名 -p密码

由于服务器的选择基于Connection级别的,也就是在Connection被生成新的之前,整个服务器的地址是不会发生变化的,在发生错误的时候服务端可以进行重试,进行地址的切换,因此HA的力度是在Connection级别而非请求级别。

对接生态工具

经过前面的开发,对于大部分的场景下,使用标准的Hive驱动,只需要切换一个端口号均可直接使用,特别需要提到的一个工具是Hue,Hue由于和Hive的集成并未使用标准的JDBC接口,而是直接分开调用的Hive Server2的Thrift接口,也就是Hue自行维护来如何访问Thrift的接口的顺序问题。

可以发现在这样的情况会有一个问题就是对于Hue来说,并没有Connection的概念,正常的SparkSQL在JDBC的交互方式下处理流程是:

image.png

但是在Hue下由于没有Connection的概念,意味着Hue的请求并不会先到OpenSession,而是会直接发起ExecuteStatement,而正常流程下ExecuteStatement处接收到Hue的请求会发现这个请求是非法的,因为没有上下文信息。所以OpenSession便不能作为连接的起点,需要在每一个Thrift接口处均需要实现OpenSession的能力,作为全新连接的上下文初始化。

尾声

SparkSQL在企业中的使用比重越来越大,而有一个更好用的SQL服务器,则会大大提升使用效率和降低使用门槛,而SparkSQL在服务器这方面的能力显然不如Hive Server2提供的更加标准,所以各个企业均可基于自身情况,选择是否需要开发一个合适于自身的SparkSQL服务器。

字节跳动的EMR团队在给企业提供Spark组件的时候也考虑到这个问题,结合字节跳动内部以及客户外部的需求情况,EMR团队研发了Ksana for SparkSQL提供一个生产可用的SparkSQL服务器,并且在Spark 性能方面也做了较大的优化,本文主要围绕技术实现的角度来阐述如何实现一个SparkSQL服务,后续会有更多文章讲述其他相关的优化。

欢迎关注并使用 火山引擎E-MapReduce

0
0
0
0
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论