干货|湖仓一体架构在火山引擎LAS的探索与实践

技术

picture.image

火山引擎湖仓一体分析服务LAS(Lakehouse Analytics Service),是面向湖仓一体架构的 Serverless 数据处理分析服务,提供字节跳动最佳实践的一站式 EB 级海量数据存储计算和交互分析能力,兼容 Spark、Presto、Flink 生态,帮助企业轻松构建智能实时湖仓。

LAS服务是什么?LAS有哪些优化特性?本文将从基础概念、数据库内核特性优化、数据服务化、业务实践等角度全方位介绍湖仓一体架构在LAS的探索与实践。 文末可下载本文对应的PPT材料。

picture.image

picture.image

在了解Las服务是什么之前,先来了解一下数据平台整体行业的发展趋势,大概分为三个阶段。

picture.image

第一阶段,一般被称为传统数仓,一种从1980年开始的基于传统数据库技术来做的BI分析场景。在这种架构下,通常计算和存储是高度一体的。整体系统能支撑的计算能力,依赖于服务提供商的硬件配置,整体成本高,存在物理上限,扩展起来比较麻烦。

第二阶段,随着技术的演进, 2010年开始出现了以 Hadoop 技术体系为主流的传统数据湖。在以 Hadoop 技术为主的数据平台架构下,通常可以支持服务在普通硬件上面去部署,整体的计算和存储的扩展性都得到了解决。基于开源技术生态,多个大型公司也参与到数据湖技术发展中来,整体生态繁荣度也在逐步提升。

但在这一阶段凸显出了一个问题,随着生态技术的发展,越来越多的开源组件开始累积。对于一个企业来说,为了解决不同领域的问题,需要运维多个开源的组件,来满足不同领域的数据需求,就导致整个企业的技术运维成本逐步提升。

基于这个问题,随着技术的进一步发展,在2020年,湖仓一体的架构开始被提出。

相比起传统数据湖,湖仓一体架构支持原生的ACID 能力,支持像BI分析、报表分析,机器学习和流式分析多种类型的计算范式,以及云上的对象存储和弹性计算能力。以上能力,让湖仓一体架构能够有效地去解决企业的对数据规模,以及对计算能力的弹性伸缩需求。同时,湖仓一体可以在很大程度上规避传统Lambda架构存在的多个计算组件,或者多种架构范式导致的架构负担,让企业能够更专注地去解决他们的业务价值。

picture.image

LAS就是基于湖仓一体的架构进行设计的。从上图来看,LAS架构整体上分为三个部分。最上层是开发工具层,开发工具层会通过计算层提供的统一 SQL 访问服务去访问计算层,根据用户的 SQL 类型自动做SQL解析。所有引擎计算能力统一由弹性容器服务来提供,可以支持弹性伸缩,按需使用。

再往下就是湖仓一体的存储层。首先,湖仓一体存储会通过统一的元数据服务,向计算层提供统一的元数据视图,屏蔽底层的具体元数据实现细节,可以使多个引擎无缝对接到统一的元数据服务。

接下来是湖仓存储引擎,它主要提供了事务管理能力,也就是 ACID的能力,以及对数据批流一体的读写能力。

再往下就是 LAS基于火山引擎对象存储服务TOS和CloudFS ,来提供EB级的数据存储能力和数据访问的缓存加速能力。

以上就是 LAS整体的技术架构。

picture.image

这一版块将向大家呈现LAS数据湖内核的特性及优化。

picture.image

LAS 数据湖 内核 —— ByteLake ,它是什么?

首先,ByteLake是基于开源Apache Hudi进行内部增强的湖仓一体存储引擎,提供湖仓一体的存储能力。

它的第一个主要能力是提供了湖仓统一的元数据服务,完全兼容开源的Hive Metastore,可以无缝对接多种计算引擎。第二个主要能力是可以支持对海量数据的Insert,完全兼容Hive SQL,可以平迁传统数仓场景下的Hive任务。第三,ByteLake支持对大规模历史数据的Update和Delete,以及对新增数据的Upsert和Append能力。最后,ByteLake支持流批一体的读写能力,提供流式读写的 source 和sink,支持近实时分析。

ByteLake 又是怎么做到这些能力的呢?接下来从以下几个特性来展开阐述。

picture.image

如何实现高效数据更新?

第一个场景是流式写入更新场景。在这种场景下,最明显的特点就是小批量数据频繁写入更新。但主要的问题是如何去定位要写入的记录呢?是做 update 操作还是 insert 操作?

在这样的背景下,ByteLake提供了一种Bucket Index的索引实现方案。

这是基于哈希的一种索引实现方案。它可以快速地去定位一条记录所对应的Fail Group,从而快速定位当前记录是否已经存在,来判断这一条记录是做Update还是做Insert操作,从而可以快速地将这种小规模的数据去添加到Append Log。在读取时,通过Compaction就可以将LogFile和BaseFile里边的数据进行Merge去重,从而达到数据更新的效果。

针对日志数据入湖,通常来说是不需要主键的,这种基于Hash索引的实现方式,是需要有Shuffle操作的。因为在基于Hash的索引实现中,当一批数据过来之后,会根据这一批数据去找分别对应的File Group,再基于File Group 去聚合要更新的这些数据,通过同一个Task,去更新同一个File Group来实现原子写入。

在数据Shuffle的过程,其实对于数据湖日志写入是有额外的开销的,但ByteLake提供了一种Non index的实现方案,去掉了索引的约束,可以减少数据Shuffle的过程,从而达到快速入湖的能力。

picture.image

存量数据如何高效更新?

存量数据,一大特点就是数据量大,单表的规模可能有几百 TB ,甚至到 PB 的级别。针对于这种大规模的历史数据的更新场景,如何去提升更新性能?其实最主要的就是要如何去降低数据更新的规模。

基于此,ByteLake提出了一种实现方案——Column Family,将单表多列的场景分别存储到不同列簇。不同的文件可以基于Row Number进行聚合,合并后就是一个完整的行。如果要更新历史数据,只需要去找到要更新的那些列对应的Column Family对应的文件,把这些文件做一些局部更新,就可以达到整体更新的效果。从而在很大程度上减少这些非必要数据的扫描,提升存量历史数据更新场景的性能。

picture.image

如何提升并发性能?

谈到并发,通常会有两部分内容。比如有很多个任务同时去往ByteLake引擎里边写数据,这就意味着有大批量的任务去访问ByteLake的MetaStore Service。在这种场景下,ByteLake MetaStore Service就会成为一个性能瓶颈。

为了突破这个瓶颈,除了无限的堆加资源之外,另一个比较有效的方案就是增加缓存。通过元数据服务端去缓存比较热点的数据,比如Commit Metadata和Table Metadata,来达到服务端的性能提升。

另外一块,是在引擎侧做优化。比如在Flink引擎层面将Timeline的读取优化到 JobManager 端。同一个任务下,只要JobManager去访问 Hive ByteLake MetaStore Service,缓存到JobManager的本地之后,所有的TaskManager只要去访问JobManager本身缓存的 Timeline 信息就可以了。

picture.image

从单个任务的视角来看,比如多个任务要同时去更新同一张表,这种情况下要保证数据的正确性,同时又能保证并发性能,应该如何来做?ByteLake提供的解决方案——基于乐观锁的一个并发控制。

针对多任务写同一个表的场景,ByteLake可以支持多种并发策略的设置。业务可以根据对数据一致性的要求,以及对数据并发性能的要求,选择灵活的并发策略,来达到它的数据并发写入的性能指标。

picture.image

这个版块将向大家呈现ByteLake服务化过程中的一些设计实践。

picture.image

● CatalogService:统一的元数据视图

CatalogService主要提供了与HMS的兼容接口,同时为所有的查询引擎提供了统一的元数据视图,解决了异构数据源的元数据管理问题。

CatalogService 整体分三层,第一层是Catalog Federation,提供统一的视图和跨地域的数据访问能力。以及提供了对源数据请求的路由能力,可以根据元数据请求的类型,支持通过Mapping的方式,来路由不同的服务请求对应的底层元数据服务实例。

第二层是CatalogService下层的具体元数据服务的实现,比如Hive MetaStore Service以及ByteLake MetaStore Service等。可能还有不同的元数据服务对接到CatalogService,来统一向上层引擎提供这种元数据服务。

最后一层是MetaStore的存储层,它通过插件式的方式来提供不同的存储引擎,来满足上层不同元数据服务实例的存储要求。

/ BMS详解 /

1. 湖仓一体元数据管理服务

Bytelake MetaStore Service,简称BMS,它是一个湖仓一体的元数据管理服务,整体的架构分为以下几个部分。首先第一个就是Catalog,Catalog是对单表的元数据访问的抽象。主要逻辑是通过MetaStore Client来访问Meta Server,同时它会去缓存单表的Schema信息以及属性等信息。

另外一部分就是Meta Server,也就是BMS里边最核心的部分。它主要是包含两大部分服务层,第一是Bytelake MetaStore元数据服务模型,比如Table Service,Timeline Service,Partition Service和Snapshot Service。存储层提供了MetaStore所有元数据的存储能力。最后一部分就是Eventbus, Eventbus主要目的是为了将元数据的CUD事件发送给监听者,来达到元数据信息的分发和同步。

picture.image

2. 元数据写入流程

关于元数据写入流程,简单来讲,当有一个Client去提交了Instant 之后,Bytelake Catalog会去访问Bytelake Meta Store 的接口,会将Instance改成Completed,然后将请求发到Bytelake的MetaStore,之后Bytelake MetaStore Server 会做一个原子提交。

在此之后,Timeline Service会把提交的状态更新到数据库里边。接下来这些分区信息将再被提交给Partition Service,同步到对应的分区存储表里去。最后一步,把这些所有的变更作为一个快照,同步到 Snapshot Service 里,它会把文件层面的变更存储到数据库里,做持久化存储。

picture.image

3. 元数据读取流程

对于源数据的读取流程,举个例子,有一个计算引擎它读取了一个SQL,通过 SQL 解析拿到一张表,这张表会通过Bytelake Catalog Service去请求Bytelake MetaStore,最终会路由到Table Service 拿到这些表的信息。

拿到表的信息做SQL Plan优化的时候,会做一些分区的下推或裁剪。这个时候会去请求到Bytelake的Partition Service做过滤,接着会根据分区信息去扫描文件,在此过程中会去请求Timeline Service获取对应的Timeline信息。接下来,基Timeline的信息时间去Snapshot Service拿到对应文件,再通过 SQL 执行器来实现数据文件的读取。

picture.image

4. 元数据变更通知

元数据变更通知具体的实现流程主要依托于两个部分。

一是Eventbus,二是listener。所有的元数据请求都会发送到Eventbus,由Eventbus分发事件到所有已经注册的 Listener上面。listener再根据下游系统的需求,去订阅Eventbus里边的对应事件类型进行响应,从而达到让上下游的组件感知到元数据的变化,实现元数据的同步。

picture.image

/ TMS详解 /

1. 统一表管理服务

LAS的另外一个服务——TMS,全称是 Table Management Service。它主要解决的问题是异步任务的托管优化。为什么会做异步任务的托管优化?因为正常来讲,Flinker SQL 任务写ByteLake表的过程,其实就是把批量的数据写入下游表里边去。随着时间的推移,一个是Commit的日志非常多,另外一个是小文件非常多。通常的Flink引擎层面的实现方案,是在数据写了一定的次数后,追加一个Compaction操作,把之前写入的文件做一个压缩。

但针对流式任务去做Compaction,对正常的流式任务稳定性有很大影响,因为压缩本身是一个开销比较大的动作,对流式计算资源的消耗是很难去评估的,会导致整个流式写入任务的波动,从而影响流式写入任务的稳定性。

基于此,LAS提供了一个统一的表管理服务,异步托管这些本身内置到引擎内部的任务,统一由Table Management Service来托管。它整体的架构是一个主从架构,主要包含的组件一个是Event Receiver,用来接收Metastore下发的一个Event。PlanGenerator就是根据Meta store Server下发的Event信息,来触发Action Plan的生成。

什么是Action Plan?简单讲,就是这一次要做哪些事情,比如你要做一个压缩任务,还是做一次历史文件的清理,还是做一些小文件的合并,都称为Action Plan。Job Scheduler就是去调度需要被执行的Acting Plan。

什么是Job Manager?它主要用于和集群交互,比如Yarn或K8S,管理Action Plan对应的执行任务,做一些任务运维层面的工作。

picture.image

2. 执行计划生成

picture.image

就执行计划生成展开来讲,Plan Generator会接收Metastore下发的一些事件,根据用户在表的DDL里的配置策略,来决定是否要生成执行计划。

这个策略通常会有几种,比如,一种基于它 Delta Commit 的数量,连续提交了多次达到了一定的阈值,就会触发一个Action Plan 的生成,来做一次数据的压缩。另外一种,是根据Log File的大小,来判断Compaction操作是否需要执行。PlanGenerator策略会根据当前 Log File的 Meta 信息,来决定是否要触发Action Plan的生成。

3****. 执行计划调度管理********

picture.image

执行计划生成结束之后,最后一步就是怎么去调度管理执行计划。执行计划调度的核心流程主要由Job Scheduler来做,Job Scheduler会定时地去轮询已经生成的Action Plan,再分发给Job Manager。Job Manager拿到了Action Plan之后,会到集群上提交一个任务,同时不断去轮询任务的状态,更新任务的状态到数据库,保证Action Plan执行的可靠性和稳定性。通常JobScheduler一般会有先进先出的调度策略,来保证Action Plan达到预期调度效果。

picture.image

/ 抖音电商在湖仓 /

picture.image

抖音电商的业务场景,主要是营销大促、流量诊断以及物流状态的监控。他们的业务痛点是什么?数据量大,计算逻辑复杂,同质数据源也比较多,宽表的构建成本比较高,包括一些其他的技术问题。还有一个痛点就是计算周期长,增量计算成本比较高。

基于LAS湖仓一体架构下,可以解决哪些问题呢?

首先,通过LAS快数据入湖能力,可以解决多数据源的快速入湖。把外部的业务系统和业务日志,通过LAS这种实时入湖能力快速导入到ODS层。通过离线数仓可以直接引用ODS层的准实时入库数据,来达到离线数仓的日增量数据,同步提升数据的时效性。

其次,实时数仓中DW层的一些明细数据,也可以通过流式入湖的能力,直接导入到ByteLake,达到数据复用的目的。当把这些数据导到了ByteLake之后,针对大宽表场景,就可以基于ByteLake的多流拼接能力,直接在底层的存储引擎层,实现宽表的构建。

从而解决在常规场景下,通过Flink SQL做多源或多流join,导致的任务状态比较大,或者任务处理复杂度比较高的这种稳定性问题,从而更好地去保障业务数据的及时性和稳定性。

/ 消费行业传统数仓架构升级 /

picture.image

消费行业的客户场景,实际就是在零售场景下的财务管理、库存管理相关的一些计算场景。客户的实现方案基于传统的数据库,业务和离线分析的请求都是统一在一个传统数据库上边来做的。

在这种场景下,其实整个RDBMS要同时承接业务处理逻辑和离线ETL分析逻辑。随着业务数据量的增长,很快就会发现传统数据库的计算能力和存储支撑能力达到了上限,导致计算能力不足,扩展性比较差,无法在满足后续的业务数据规模的上量。

LAS针对这种场景的解决方案,是将客户的离线ETL的分析场景,通过实时集成的方式直接导入到LAS里边,通过LAS的弹性计算能力,为用户的ETL分析场景提供有效的算力保障。

在满足客户低成本约束的情况下,达到客户预期的计算效果,和对数据产出的及时性的要求。同时会通过云上的ByteHouse服务来解决客户自建的CK的运维成本以及性能调优的问题。优化了原有的基于RDBMS的数据链路,保证业务数据量快速增长的同时,满足它的底层的算力要求。

/ 湖仓一体架构下的批流融合计算 /

picture.image

典型场景就是数据实时入湖,客户的数据源会通过 Flink SQL 持续地去写入到LAS的Bytelake表里。但下游如果是一个离线任务,其实用户没办法很便利地去判断数据写到了哪个位置,或者分区数据现在是不是已经完备的。

如果仅依赖系统时间来实现,比如在上游的这种Flink SQL任务,在写入过程正常时倒没有特别大的问题。但是一旦上游Flink SQL任务出现一些数据积压或者任务异常的场景,下游依赖系统时间去调度,就会存在某些分区会出现数据空洞或数据偏移的问题。例如本来数据应该落在7点的分区,因为上游的这些 SQL 任务的消费延迟,导致7点的数据并没有准时地落下来, 导致下游去消费7点的数据的时候,拿到的是一个不完整的数据,导致出现数据空洞或数据偏移的问题。

针对这种场景,LAS提供了一种叫归档的能力,也就是在Flink SQL写入的过程中,会基于业务事件时间实时写入对应的数据分区。通过ByteLake提供归档能力,分区数据就绪后,可自动生成一个归档标签。下游的spark SQL 任务可以根据分区是否有归档标签,来判断对应分区的数据是否就绪,来决定当前离线任务是不是要调度起来。

这项能力的实现逻辑,其实就是Flink SQL每次去提交一个Commit的时候,会去判断当前提交的业务的事件时间,是否比当前的未提交分区的时间超过了某一个阈值。比如当前分区的时间是7点,Flink SQL在持续提交微批数据的时候,它判断出来当前的最小的业务时间已经到 7 点半了,而业务定义的可容忍的延迟间隔是 15 分钟, ByteLake认为这个数据其实已经写完了,就会把7点的分区数据打上一个归档标签,来标示数据已经完成了。下游就可以去正常地去消费7点的分区数据,从而保证数据的完整性。

在提供了这种归档能力的情况下,LAS的整体计算链路就可以实现批流融合。比如ODS的ByteLake表是一个准实时的表,下层的Spark SQL任务可以直接通过Spark ETL去做处理,产出一个离线表。可能后边还会有一些SQL场景依赖离线表做数据的准实时消费。在这种情况下,Flink SQL会再生成一张ByteLake表,这张表同样可以被下游的Spark SQL的离线任务依赖,从而达到在整个Pipeline里,做到批流计算相互融合的状态。

产品介绍

火山引擎湖仓一体分析服务 LAS

湖仓一体架构 Serverless 数据平台,孵化自字节跳动最佳实践,提供一站式 EB 级海量数据存 储、计算和交互分析能力,兼容Spark、Presto、Hudi生态,助力企业构建云原生智能实时湖仓。 后台回复数字“4”了解产品

——相关阅读——

picture.image

picture.image

picture.image

picture.image picture.image

0
0
0
0
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论