字节跳动湖平台在批计算和特征场景的实践

计算

本文整理自火山引擎云原生计算研发工程师刘纬在 DataFunCon 2022 上的演讲。随着业务的发展,字节跳动特征存储已到达 EB 级别,日均增量 PB 级别,每天训练资源量级为百万 Core。随之而来的是内部业务方对原始数据存储、特征回填需求、降低成本、提升速度等需求的期待。本次分享将围绕问题背景、选型& Iceberg 简介、基于 Iceberg 的实践及未来规划展开。

作者:刘纬

整理:王吉东,于惠

问题背景

用户使用流程

如我们所知,字节跳动是一家擅长做 A/B test 的公司。以特征工程调研场景为例,流程如下:

  • 首先由算法工程师进行在线特征抽取;
  • 将抽取到的特征,使用 Protobuf 的格式按行存至 HDFS;出于存储成本的考量,一般只存储抽取后的特征,而不存储原始特征
  • 将 HDFS 存储的特征交由字节自研的分布式框架( Primus )进行并发读取,并进行编码和解码操作,进而发送给训练器。
  • 由训练器对模型进行高效训练如果模型训练效果符合算法工程师的预期,说明该调研特征生效,进而算法工程师对调研特征进行回溯,通过 Spark 作业将特征回填到历史数据中,分享给其他算法工程师,进而迭代更多的优质模型如果模型训练效果不符合算法工程师的预期,则调研特征不对原有特征集合产生影响

picture.image

业务规模

公司庞大的业务规模,带来了巨大的计算和存储体量:

  • 特征存储总量达 EB 级;
  • 单表特征最大可达百 PB 级(如广告业务);
  • 单日特征存储增量达 PB 级;
  • 单日训练资源开销达 PB 级。

picture.image

遇到的问题

当特征调研场景叠加巨大的数据体量,将会遇到以下困难:

  • 特征存储空间占用较大

  • 样本读放大,不能列裁剪,很难落特征进样本;

  • 样本写放大,COW 很难做特征回溯调研;

  • 不支持特征 Schema 校验;

  • 平台端到端体验差,用户使用成本高

选型& Iceberg简介

在特征调研场景下,行存储是个低效的存储方式;因此,我们选择 Iceberg 存储方式来解决上述问题。

整体分层

picture.image

Apache Iceberg 是由 Netflix 公司推出的一种用于大型分析表的高性能通用表格式实现方案。

如上图所示,系统分成引擎层、表格式层、文件格式层、缓存加速层、对象存储层。图中可以看出,Iceberg 所处的层级和 Hudi,DeltaLake 等工具一样,都是表格式层:

  • 向上提供统一的操作 API
  • Iceberg 定义表元数据信息以及 API 接口,包括表字段信息、表文件组织形式、表索引信息、表统计信息以及上层查询引擎读取、表写入文件接口等,使得 Spark, Flink 等计算引擎能够同时高效使用相同的表。
  • 下层有 parquet、orc、avro 等文件格式可供选择
  • 下接缓存加速层,包括开源的 Alluxio、火山引擎自研的 CFS 等;CFS 全称是Cloud File System, 是面向火山引擎和专有云场景下的大数据统一存储服务,支持高性能的缓存和带宽加速,提供兼容 HDFS API 的访问接口。
  • 最底层的实际物理存储,可以选择对象存储,比如 AWS S3,火山引擎的 TOS,或者可以直接使用 HDFS。

通过上图可以比较清晰地了解到,Iceberg 这个抽象层最大的优势在于:将底层文件的细节对用户屏蔽,将上层的计算与下层的存储进行分离,从而在存储和计算的选择上更为灵活,用户可以通过表的方式去访问,无需关心底层文件的信息。

Iceberg简介

Iceberg 架构

picture.image

Iceberg 的本质是一种文件的组织形式。如上图所示,包括多级的结构:

  • Iceberg Catalog:用于保存表和存储路径的映射关系,其核心信息是保存 Version 文件所在的目录。Iceberg Catalog 共有8种实现方式,包括 HadoopCatalog,HiveCatalog,JDBCCatalog,RestCatalog 等不同的实现方式,其底层存储信息会略有不同;RestCatalog 方式无需对接任何一种具体的存储,而是通过提供 Restful API 接口,借助 Web 服务实现 Catalog,进一步实现了底层存储的解耦。

  • Metadata File:用来存储表的元数据信息,包括表的 Schema、分区信息、快照信息( Snapshot )等。Snapshot 是快照信息,表示表在某一时刻的状态;用户每次对 Table 进行一次写操作,均会生成一个新的 SnapShot。 Manifestlist 是清单文件列表,用于存储单个快照的清单文件。Manifestfile 是存储的每个数据文件对应的清单文件,用来追踪这个数据文件的位置、分区信息、列的最大最小值、是否存在 Null 值等统计信息。

  • Data File 是存储的数据,数据将以 Parquet、Orc、Avro 等文件格式进行存储。

Iceberg 特点

  • SchemaEvolution:Iceberg 表结构的更新,本质是内在元信息的更新,因此无需进行数据迁移或数据重写。Iceberg 保证模式的演化( Schema Evolution )是个独立的、没有副作用的操作流程,不会涉及到重写数据文件等操作。

  • Time travel:用户可任意读取历史时刻的相关数据,并使用完全相同的快照进行重复查询。

  • MVCC:Iceberg 通过 MVCC 来支持事务,解决读写冲突的问题;

  • 开放标准:Iceberg 不绑定任何计算引擎,拥有完全独立开放的标准,易于拓展。

Iceberg 读写流程和提交流程

picture.image

1.读写

  • 每次 Iceberg 的写操作,只有在发生 Commit 之后,才是可读的;如有多个线程同时在读,一部分线程在写,就只有在 Commit 全部数据之后,对用户进行的读操作才能被用户的读线程所看到,从而实现读写分离。
  • 例如上图中,在对 S3 进行写操作的时候,S2、S1 的读操作是不受影响的;此时 S3 无法被读到,只有Commit 之后 S3 才会被读到。此时 Current Snapshot 会指向 S3。
  • Iceberg 默认从最新 Current Snapshot 读取数据;如果读更早的数据,可通过指定对应的 Snapshot ID ,实现数据回溯。

2.事务性提交

  • 写操作:记录当前元数据的版本——Base Version,创建新的元数据以及 Manifest 文件,原子性将 Base Version 替换为新的版本。

  • 原子性替换:原子性替换保证了线性历史,通过元数据管理器所提供的能力,以及 HDFS 或本地文件系统所提供的原子化 Rename 能力实现。

  • 冲突解决:基于乐观锁实现,每一个 Writer 假定当前没有其他的写操作,即对表的 Write 进行原子性的 Commit,若遇到冲突则基于当前最新的元数据进行重试。

分区裁剪

  • 直接定位到 Parquet 文件,无需调用文件系统的 List 操作;

  • Partition 的存储方式对用户透明,用户在修改 Partition 定义时,Iceberg 可以自动地修改存储布局,无需用户重复操作。

谓词下推

Iceberg 会在两个层面实现谓词下推:

  • 在 Snapshot 层面,过滤掉不满足条件的 Data File;
  • 在 Data File 层面,过滤掉不满足条件的数据。

其中,Snapshot 层面的过滤操作为 Iceberg 所特有,正是利用到 Manifest 文件中的元数据信息,逐字段实现文件的筛选,大大地减少了文件的扫描量。而同为Table Format 产品、在字节其他业务产线已投入使用的 Hudi,虽然同样具备分区剪枝功能,但是尚不具备谓词下推功能。

基于 Iceberg 的实践

Hudi、Iceberg、DeltaLake 这三款 TableFormat 产品各有优劣,然而并没有任何一款产品能够直接满足我们的使用场景需求;考虑到 Iceberg 具备良好的 Schema Evolution 能力、支持下推,且无需绑定计算引擎等优点,因此字节选择使用 Iceberg 作为数据湖工具。

整体架构

picture.image

  • 在字节的整体架构中,最上层是业务层,包含抖音,头条,小说等字节绝大部分业务线,以及火山引擎云原生计算等相关 ToB 产品(如 Seveless Spark 等);
  • 在平台层,使用 Global Lake Service 给业务方提供简单易用的 UI 和访问控制等功能;
  • 在框架层,使用 Spark 作为特征处理框架(包含预处理和特征调研等),使用字节自研的 Primus 分布式框架作为训练框架,使用 Flink 实现流式训练;
  • 在格式层,选择 Parquet 作为文件格式,使用 Iceberg 作为表格式;
  • 最下层是调度器层和存储层。选择 Yarn 和 K8S 作为调度器;存储层一般选择 HDFS 进行存储,对于 ToB 产品,则使用 CFS 进行存储。

Data-Parquet

picture.image

结合上图可以看出,列存储在特征调研场景存在以下优势:

  • 可选择指定列进行读取:有效减少读放大问题,同时易于增列,即新增一列的时候,只需单独写入一列即可,元数据信息会记录每一列所在的磁盘位置;
  • 压缩:同一列的数据格式相同,因此具有更好的压缩比;同一列的数据名称相同,因此无需进行冗余字符串存储;
  • 谓词下推:对每一列数据记录相应的统计信息(如 Min,Max 等),因此可以实现部分的谓词下推。

为了解决业务方的痛点问题,我们改成使用 Parquet 列存储格式,以降低数据的存储成本;同时由于 Parquet 选列具备下推到存储层的特性,在训练时只需读取模型所需要的特征即可,从而降低训练时序列化、反序列化的成本,提升训练的速度。

然而使用 Parquet 列存储,带来优点的同时也相应地带来了一些问题:

  • 原来的行存储方式是基于 Protobuf 定义的半结构化数据,无需预先定义 Schema;然而使用 Parquet 之后,需要预先指定 Schema 才能进行数据的存取;这样在特征新增和淘汰的时候,Schema 的更新将会变成一个棘手的问题。
  • 此外,Parquet 不支持数据回填;如果需要要回填比较长的数据,就需要将数据全量读取,增加新列,再全量写回。这样一方面会造成大量计算资源的浪费,另一方面会带来 Overwrite 操作,导致正在进行训练的任务由于文件被替换而失败。

为了解决以上两个问题,我们引入了Iceberg 来支持 SchemaEvolution,特征回填以及并发读写。

特征回填

COW

picture.image

从上图可以看出,使用 Iceberg COW( Copy on Write )方式进行特征回填,通过 BackFill 任务将原快照中的数据全部读出,然后添加新列写出到新的 Data File 中,并生成新的快照。这种方式的缺点在于,仅仅新增一列数据的写入,却需要整体数据全部读出后再全部写回,浪费了大量的计算资源和存储资源;因此,我们基于开源的 Iceberg 自研了一种 MOR( Merge on Read )的 BackFill 方案。

MOR

picture.image

从上图可以看出,在 MOR 方案中,我们仍然需要一个 BackFill 任务来读取原始的 Data File 文件;所不同的是,我们只需读取少数需要的字段。例如对 A 列通过一些计算逻辑生成 C 列,那么 BackFill 任务只需从 Snapshot1 中读取 A 列的数据,且只需将 C 列的 Update 文件写入 Snapshot2 即可。

随着新增列的增多,需要将 Update 文件合并到 Data File 里面;为此,可以进一步提供一种 Compaction 逻辑,即通过读取旧的 Data File 和 Update File,合并生成新的 Data File。实现细节如下:

  • 旧 Data File 和 Update File 增加一个主键,每个文件按照主键排序;
  • 读取旧 Data File 时根据用户选择的列,分析具体需要哪些 Update File 和 Data File;
  • 根据旧 Data File 中 Min-Max 值去选择对应的 Update File。

由此可以看出,MOR 的本质是对多个 Data File 文件和 Update File 文件进行多路归并,归并的顺序由 SEQ 决定,SEQ 大的数据(表明数据越新)会覆盖 SEQ 小的数据。

两种特征回填方式对比

  • COW:读写放大严重、存储空间浪费、读取逻辑简单、写入耗费更多资源、读取无需额外计算资源;
  • MOR:没有读写放大、节省存储空间、读取逻辑复杂、写入耗费较少资源、绝大多数场景,不需要额外资源;

相比于 COW 方式的全量读取和写入,MOR 的优势在于只读取需要的列,同样也只写入更新的列,因此避免了读写放大的问题,节省大量计算资源,并大大降低读写 I/O;相比 COW 方式每次 COW 翻倍的情况,MOR 只需存储新增列,大量节省了存储资源。

对于模型训练任务而言,大多数模型训练只需要用到少量的列,因此大量的线上模型都无需 MOR 操作,涉及开销可忽略不计;对于少数的特征调研模型,只需读取模型对应的 Update File 即可,因此带来的读取资源增加也非常有限。

其他

picture.image

除了上面提到的借助 Compaction 提高读性能以及分析特征删除场景外,还提供了以下几个服务:

  • ExpirationSnapshot Expiration: 用于处理过期的 Snapshots。过期 Snapshots 不及时清理,会导致元数据文件堆积,从而带来文件膨胀问题,会给算法工程师带来困扰,因此需要服务定期做一些清理。我们通过平台化改造实现 Snapshots 文件的统一维护和清理;Data Expiration: 大部分数据是有新鲜度和时效性的,因此用户可设置数据保存多久后被清理。

  • CleanUp:由于一些事务的失败,或者一些快照的过期,导致文件在元数据文件中已经不再被引用,需要定期清理掉。

  • Roll-Back:对于一些在 Table 中非预期数据或者 Schema 变更,希望将其回滚到之前稳定的 Snapshot;结合平台的事件管理器,可以比较容易的实现这一功能。

  • Statistics: 用来实现一些湖平台可视化信息的展示,以及后端服务给业务带来的价值归纳。

平台化改造

picture.image

这里分享下自字节内部实现的平台化工作。上图是批式特征存储的列表,借助站内实现的湖平台化工作,业务部门可以轻松实现特征的可视化操作,以及信息概览的获取。

下图是一张特征表样例,通过这张表可以直观地看到存储空间的使用、文件数的统计、记录数统计、特征统计等信息。

picture.image

未来规划

规划重点

在未来规划中,计划逐步支持以下功能:

  • 湖冷热分层:在成本优化方面,可以通过湖冷热分层实现。前文提到对于保存超过一定时间的数据,可以直接删除;然而在某些特定的场景下,这些数据还会被使用,只是访问频率较低;因此未来考虑增加数据湖冷热分层功能,帮助用户降低成本。

  • 物化视图:在查询优化方面,通过物化视图提升查询性能。该功能是源于 ToB 客户的真实场景需求,目前这部分的优化工作正处于商业化交付流程中,大家可以后续在火山引擎官网相关的产品上进行体验。

  • Self-Optimize:在体验优化方面,实现 Self-Optimize,例如前文提到的一些数据维护的优化等。

  • 支持更多引擎:为了增加生态的丰富度, Iceberg 在未来也会逐渐更多的引擎。

整体平台架构总览

picture.image

整体平台架构以计算引擎产品为核心,包含两部分服务:

  • 云原生管理控制:Quota 服务、租户管理服务、运行时管理、生态整合服务、交付部署服务、网关服务;
  • 云原生运维平台:组件服务生命周期管理、Helm Chart 管理、日志&审计、监控报警、容灾&高可用;

如前文所述,该平台不仅支持公司内部的业务,还会支持一定的 ToB 的业务,以上在字节内部实现的功能,以及未来规划的能力也会基于内外一致的思路进行演进;最终都会落地到上图中涉及到的几款云原生计算产品中,如流式计算 Flink 版,云原生消息引擎 BMQ,云搜索服务 OpenSearch,大数据文件存储 CloudFS 等。以上均为 Serverless 的全托管产品,让用户更聚焦于自己的业务逻辑,减少数据运维带来的困扰。

如有需求,欢迎填写问卷,参与技术交流「链接」

268
0
1
0
关于作者
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论