DeepSeek Smallpond 在火山引擎 AI 数据湖的探索实践

向量数据库大模型机器学习

点击上方👆蓝字关注我们!

picture.image

picture.image

DeepSeek Smallpond 介绍

Smallpond 是一套由 DeepSeek 推出的 、针对 AI 领域,基于 Ray 和 DuckDB 实现的轻量级数据处理引擎,具有以下优点:

  1. 轻量级
  2. 高性能
  3. 支持规模大
  4. 无需运维
  5. Per Job 的资源调度

快速开始

Smallpond 提供了两套 API(具体介绍见下文),一套是 High-level 的 Dataframe API,一套是 Low-level 的Logicalplan API。前者简单、易理解,使用上非常类似 Pandas、PySpark 等引擎;后者灵活度高,可以实现更加复杂的数据处理逻辑。

  • Dataframe API

import smallpond

sp = smallpond.init()

df = sp.read_parquet("path/to/dataset/*.parquet")
df = df.repartition(10)
df = df.map("x + 1")
df.write_parquet("path/to/output")

当前 Dataframe API 功能还比较薄弱,针对一些高级场景,比如定义 Ray 运行参数、GPU等尚无法设置。

  • LogicalPlan API

from smallpond.logical.dataset import ParquetDataSet
from smallpond.logical.node import Context, DataSourceNode, DataSetPartitionNode, SqlEngineNode, LogicalPlan
from smallpond.execution.driver import Driver

def my_pipeline(input_paths: List[str], npartitions: int):
ctx = Context()
dataset = ParquetDataSet(input_paths)
node = DataSourceNode(ctx, dataset)
node = DataSetPartitionNode(ctx, (node,), npartitions=npartitions)
node = SqlEngineNode(ctx, (node,), "SELECT * FROM {0}")
return LogicalPlan(ctx, node)

if __name__ == "__main__":
driver = Driver()
driver.add_argument("-i", "--input_paths", nargs="+")
driver.add_argument("-n", "--npartitions", type=int, default=10)

plan = my_pipeline(**driver.get_arguments())
driver.run(plan)

python script.py -i "path/to/.parquet" -n 10 Ray # Ray 引擎
python script.py -i "path/to/
.parquet" -n 10 scheduler # built-in 引擎

注意,Smallpond 支持两种分布式引擎(具体介绍见下文),一种是 Ray 引擎,一种是 Built-in 引擎。使用方式见上文脚本所示。

架构介绍

下图为 Smallpond 架构:

picture.image

整体架构类似于 Spark 的架构,其组件 Dataframe、Logicalplan、Physicalplan、Scheduler 等 Spark 都有对应,是一个典型的 批处理形式 SQL 内核架构

  1. DataFrame 的接口目前只能支持 Ray Engine。
  2. 最底层是 存储层 。这个存储有两个作用:
  • 作为源数据和中间执行数据的存储,可以被 mount 到本地路径;
  • 如果选用 Built-in 执行引擎,这个存储还是 task 的序列化存储用于从 driver 节点向 executor 节点派发任务。除了 3FS 存储,Smallpond 还支持 fsspec 接口 ,从而对接其他存储。
  1. 引擎层 。这里有两个选项,一个是 Ray,一个是 Built-in (run driver 的时候通过 mode 来指定,如果选项是 Ray,走 Ray 引擎,如果选项是 Scheduler,走 Built-in 引擎)。官方说两套引擎是历史原因,未来会逐渐合并。
  2. 执行层 。完全类似 Spark 的实现,有 Logicalplan,如果选用 Dataframe 接口,还有优化器支持。最后的物理计划生成 task,会被调度器扔到远端的 worker 计算。task 的执行有两种选择:DuckDB 和 Arrow(官方文档未给出)。
  3. API 层 。支持 High-level 与 Low-level 的 API。

主要特点

特点 1:使用 Ray 做分布式调度和执行

Smallpond 使用 Ray 作为其分布式执行引擎之一,另一种为 Built-in 引擎。根据笔者测试,Ray 引擎相较于 Built-in 引擎有明显的性能方面的优势。一个可能重要的原因是,在 Built-in 引擎中,driver 向 executor 发送序列化任务不是通过 RPC 进行,而是通过共享存储方式进行,这个过程 task 序列化需要落共享存储。另外 Ray 相比有非常高效的 task 调度能力。

特点 2:MPI 的支持与 numa 绑定

这是一个可能容易被忽视的创新点:借助 MPI 框架,用户可以自定义任务,使用 MPI 做高效的集合通信。上文已经说到,用户完全可以自定义自己的 Python 脚本,而用户可以在自己的 Python 脚本里写 MPI 程序,从而使用 MPI 做高效的集合通信。

同时,worker 也做了 NUMA 绑定,做到更加高效的内存存取。

另外,代码中设置了 openmp 的环境变量。用户可以使用单核多线程来加速程序。

picture.image

特点 3:极具灵活性的 Low Level API

通过 Low Level API,用户不但可以自己定义 map、filter 等典型的 SQL 类型算子,也可以定义非 SQL 算子,例如可以定义 PythonScriptNode,用于执行 Python 脚本。

这样做的好处在于, 极大地增强了数据处理的灵活性 :某些数据处理需求可能不方便使用类似 Dataframe 的 map 算子来处理,就可以写 Python 代码自由地处理这些任务了。

从这个角度看,这个 Logicalplan 已经超出了 Spark Logicalplan 的范畴 ,兼具了一些类似于“ 工作流调度 ”的能力,可以调度进程。

特点 4:与 3FS 的结合

Smallpond 将 3FS 挂载到本地,可以利用 3FS 的性能优势,结合 DuckDB 的优秀处理能力,达到很高的处理效率。

不过,笔者认为与 3FS 的结合不能作为一个创新点来看待,因为两者是松耦合的(非深度结合),只是说运行在 3FS 上会使得 Smallpond 运行地更快,而挂载 filesystem 到本地实现分布式计算也是一个常规的行为。

火山引擎 AI 数据湖 LAS 介绍

总体架构

随着 LLM 和多模态 AI 技术的飞速发展,非结构化数据量呈指数级增长,这极大地增加了数据管理、计算和存储的复杂性。传统的数据湖解决方案已难以适应 AI 场景下对数据的新需求。为了应对这一挑战,新一代数据湖必须解决以下多模态数据带来的关键问题:

  • 数据管理 :传统数据管理侧重于库表结构,而面对多模态非结构化数据,如何实现高效管理。
  • 数据计算 :如何从非结构化数据中挖掘潜在价值,如何提高CPU和GPU利用率,如何使用模型来处理数据。
  • 数据存储 :传统数据湖格式在非结构化数据存储方面存在局限,是否可实现全模态数据的统一湖格式存储。
  • AI 场景支撑 :多模态数据湖如何支撑 预训练、后训练、知识库、AI 搜索、智能体、安全合规 等场景的智能化应用。

火山引擎基于内外部客户的实践,推出了一款面向 AI 场景的多模态数据湖服务 。总体架构如下

picture.image

功能介绍

LAS 提供了如下功能:

  1. 数据集管理 。用户可以根据数据的使用场景创建不同类型的数据集。比如,针对大规模预训练场景的大规模数据集,可以使用 LAS 的分布式处理能力;对于后训练阶段的 SFT 场景,LAS 推出了数据洞察以及细粒度的数据编辑功能。此外,LAS 还支持数据集多版本,满足算法人员在不同数据版本之间做对比实验的需求。

  2. 统一 Catalog 。用户可以将自己的数据注册为 catalog table,即能够使用平台提供的针对格式化数据的计算与分析工具。

  3. 丰富的算子支持 。LAS 提供了针对文本、图片、视频、音频、文档等类型的 100+ 算子,用户可以一键调用,标准化自己的数据处理流程。

  4. 工作流支持 。通过工作流,用户可以提交各种类型的数据处理作业,比如除了内置的算子标准化作业,还支持用户提交 Python 作业、Spark 作业、Ray 作业等等。

  5. 多数据湖格式/数据源 。支持 lance、Iceberg、Parquet、Json、CSV、VikingDB、Opensearch 等,满足各种场景需求。例如,针对训练或者微调过程,需要有高性能的点查需求,用户可以选择 lance;针对线上业务数据回流场景,可以选择 Iceberg;针对 RAG 场景,可以选择 VikingDB 作为数据 sink。

  6. 存算分离架构与分布式数据缓存 。LAS 推荐使用存算分离架构,以减少存储成本,提升计算的可扩展性。同时,LAS 针对存算分离场景提供了 Proton 缓存服务,以加速对 TOS 数据的访问。

Smallpond 与 LAS 融合实践

Smallpond DataFrame + LAS Ray 计算资源组

当前,有很多客户在云上运行他们的计算解决方案的同时,也希望能够在云上用上 Smallpond。为此,LAS 提出了基于 Ray 的云上方案,如下图所示:

picture.image

该云上方案具备五大优点:

  • 环境准备简单:无需用户需手工添加节点,打通 SSH,构建 MPI 集群。
  • 资源隔离:支持对 IO/网络/内存等更加严格的资源条件。
  • 认证鉴权:对资源的申请做用户鉴权。
  • 资源统一管理:用户无需手动管理计算资源,开箱即用。
  • K8s 调度:完全交由平台运维解决,支持排队,抢占等。

在该方案中,LAS 中的集群能够无缝的与 Smallpond 融合,只需要在云上开通资源 ,将 ray_address 设置成已开通的资源队里,其余逻辑无需改造,就可以完成数据预处理。

sp = smallpond.init(ray_address= "ray://192.xxxx:10001")

Smallpond 基于 Proton/TOS-FS 对接云存储

picture.image

Smallpond 不仅支持 3FS 协议的存储,还支持 fuse 和 fsspec 的接口。因此,我们可以针对大规模数据处理的场景,将数据存储在 TOS 对象存储上(LAS 支持 TOS 的 fsspec 协议访问),而将训练场景的数据放置到 vePFS 存储中。

扩展 Datasource-Lance 多模态数据湖

Lance 是新一代的列式存储结构,它被设计用来存储视频,图像,音频以及普通列式数据。它可以被存储在任何 POSIX 文件系统以及像 S3,TOS 等云存储上。Lance 允许数据被随机访问,在随机访问场景下它比 Parquet 性能快 100 倍。同时它具备向量检索,零拷贝的能力,并且与 Pyarrow,DuckDB 生态紧密结合。

Lance 的主要能力:

  1. 多版本管理 : Lance 是数据湖, 提供了 多版本的能力 , 能够快速的实现增删查改以及结构变更的需求, 也提供 time travel 的能力。
  2. 多维分析 : Lance 能够对接分布式计算引擎, 例如 Spark/Ray, 完成大规模数据分析需求。
  3. 随机检索 : Lance 构建了 主键索引和二级索引 , 能够实现快速的随机检索。
  4. 向量检索 : Lance 上实现了 IVF-PQ 和 IVF-HNSW 向量索引, 以及全文索引, 具有 混合搜索能力
  5. 多模数据 : Lance 自定义了底层文件格式, 能够写入大宽表和大宽列, 直接在表字段中存储多模数据, 例如文本/图像。
  6. 开放生态 : Lance 支持 Python/Java 客户端, 内存采用 arrow 格式, 适配了很多AI生态的引擎和大数据计算引擎。

LAS 中提供了完整的产品化的 Lance 湖服务能力,包括元数据管理,小文件合并服务, 而 Smallpond 也是能够无缝的接入lance的数据源。

以下是 Smallpond支持 Lance 的实践样例:

import lance
import arrow
from smallpond.logical.dataset import ArrowTableDataSet

从 Lance 格式的文件中读取数据

lance_ds = lance.dataset("example.lance")

将数据转换为 Arrow 的 Table

arrow_table = lance_ds.to_table()

将arrow_table转换成smallpond的dataset

smallpond_dataset = ArrowTableDataSet()

集成 LAS 的算子

Smallpond 支持 map/map_batches 的并行算子逻辑,其接口方式跟 Ray类似。而火山引擎 LAS 的算子服务能力接口是可以同时兼容 Spark/Ray。因此 LAS Built-in 的算子也都能够直接跑在 Smallpond 上。

picture.image

实践示例

场景描述:在 RAG 架构的离线入库场景,通过 LAS 产品提供的 分布式计算能力 Smallpond,实现从对象存储到向量数据库的全流程优化。在该链路中,读取数据后,利用 Smallpond 高效完成数据的切分(chunk)和向量化处理,并最终将向量数据批量入库至向量数据库。

同时,LAS 提供有 Chunk 和 Embedding 的算子,平台产品界面中,有对算子进行详细和介绍和 Demo 示例,便于用户快速搭建该链路。此外,也支持自定义算子。

以下按照自定义算子示例:

picture.image

Step 1:创建 LAS 计算资源

在 LAS 平台中提供有 CPU 和 GPU 的计算资源队列。由于 Embedding 消耗的算力较大,建议采用 GPU 计算资源。

Step 2:实现该流程的代码

定义三种处理器:Chunk、Embedding、写向量数据库。由于SmallPond未提供写入向量数据库的接口,可以利用map_batches方法实现。

import copy
import logging

import pyarrow as pa
import smallpond
from FlagEmbedding import FlagModel
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
from volcengine.viking_db import Data, VikingDBService

class ChunkProcessor:
"""将文本切分成chunk片段,用于文本检索等场景。"""

def \_\_init\_\_(  
    self,  
    input\_col\_name: str,  
    output\_col\_name: str = "chunk",  
    chunk\_size: int = 500,  
    chunk\_overlap: int = 50,  
) -> None:  
    self.input\_col\_name = input\_col\_name  
    self.output\_col\_name = output\_col\_name  
    self.chunk\_size = chunk\_size  
    self.chunk\_overlap = chunk\_overlap  

    self.\_sentence\_splitter = SentenceSplitter(  
        chunk\_size=self.chunk\_size,  
        chunk\_overlap=self.chunk\_overlap,  
    )  

def \_\_call\_\_(self, row: dict, **kwargs) -> list[dict]:  
    text\_list = [row[self.input\_col\_name]]  
    documents = [Document(text=t) for t in text\_list]  
    sentence\_split\_nodes = self.\_sentence\_splitter.get\_nodes\_from\_documents(documents)  
    chunk\_result = []  

    for node in sentence\_split\_nodes:  
        node\_text = node.text  
        if len(node\_text) == 0:  
            continue  
        row\_new = copy.deepcopy(row)  
        row\_new[self.output\_col\_name] = node\_text  
        chunk\_result.append(row\_new)  
    return chunk\_result  

class EmbeddingProcessor:
"""使用 BGE 系列模型计算文本的 embedding。"""

def \_\_init\_\_(  
    self,  
    input\_col\_name: str,  
    output\_col\_name: str = "embedding",  
) -> None:  
    self.input\_col\_name = input\_col\_name  
    self.output\_col\_name = output\_col\_name  
    self.\_embedding\_model  = FlagModel("BAAI/bge-m3", use\_fp16=True)  

def \_\_call\_\_(self, table: pa.Table) -> pa.Table:  
    rows = table.to\_pandas()  
    content\_list = rows[self.input\_col\_name].tolist()  
    embeddings = self.\_embedding\_model.encode(content\_list)  
    embeddings = [embedding.tolist() for embedding in embeddings]  
    rows[self.output\_col\_name] = embeddings  
    return pa.Table.from\_pandas(rows, preserve\_index=False)  

class VikingdbSinkProcessor:
""" 将数据写入到火山的向量数据库VikingDB中 """

def \_\_init\_\_(  
    self,  
    collection\_name: str,  
    **kwargs,  
) -> None:  
    self.collection\_name = collection\_name  
    vikingdb\_service = VikingDBService(  
        host=vikingdb\_endpoint,  
        region=vikingdb\_region,  
        scheme="http",  
    )  
    vikingdb\_service.set\_ak(vikingdb\_ak)  
    vikingdb\_service.set\_sk(vikingdb\_sk)  
    self.collection =  vikingdb\_service.get\_collection(self.collection\_name)  

def \_upsert\_data(self, df):  
    datas = []  
    for \_, row in df.iterrows():  
        new\_row = row.to\_dict()  
        datas.append(Data(new\_row))  

    batch\_size = 10  
    for i in range(0, len(datas), batch\_size):  
        batch = datas[i : i + batch\_size]  
        self.collection.upsert\_data(batch, async\_upsert=True)  

def \_\_call\_\_(self, table:pa.Table):  
    self.\_upsert\_data(table.to\_pandas())  
    return table  
  

if __name__ == "__main__":
# 初始化
sp = smallpond.init()

# 数据处理流水线  
sp.read\_csv(paths, schema)\  
.flat\_map(ChunkProcessor(input\_col\_name = "index"))\  
.map\_batches(EmbeddingProcessor(input\_col\_name = "chunk"))\  
.map\_batches(VikingdbSinkProcessor(collection\_name = vikingdb\_dataset))\  
.take\_all()  

Step 3:在 LAS 平台提交该示例代码

LAS 平台提供直接运行 Python 脚本的能力。

  1. 在 LAS 平台中,算子管理菜单中 上传上述代码,以及代码依赖的镜像。也可以使用 LAS 平台提供的镜像。LAS 平台提供的镜像有 LAS 算子执行的镜像,也提供含 PyTorch 基础镜像等等。

  2. LAS 平台中,工作流中通过拖拽式方式,将该算子拖到画布中点击执行按钮,便可启动任务。界面上可以查看到执行日志和进度。

小结与规划

Smallpond 是 DeepSeek 开源的一个优秀的轻量级、高性能 AI 场景数据处理框架,一经推出便引起了业界的关注,项目 Star 数快速增长。其优点以及创新点上文已经有了详细的介绍,但由于项目处于开源初期,仍有很多问题有待解决,比如:

  1. 支持已有 Ray Cluster的接入方式;
  2. 数据源需要支持 S3 协议、TOS 等其他对象存储的协议;
  3. 适配更多的数据格式,尤其是面向多模的数据格式,例如 Lance,LMDB,Webdataset,Pickle 等。

相信随着时间的推移,上述这些问题都能得到很好的解决。

根据项目介绍,Smallpond 的目标是解决 AI 场景灵活的数据处理需求,这与火山引擎 LAS 多模数据湖的目标是相同的。LAS 数据湖配套有自己的数据处理框架,以及大量的用于多模数据处理的算子,用户可以开箱使用。而如果用户选择使用类似 Smallpond 的数据处理框架,通过 LAS 也能很好的支持,同时也能很好的发挥云的优势。

在未来,火山引擎 LAS会考虑 结合 Smallpond 优秀的架构能力与云低成本、易运维、以及生态协同的优势 ,为用户提供更加强大的 AI 数据处理功能。

点击【 阅读原文 】填写申请,抢先试用火山引擎 AI 数据湖!

0
0
0
0
关于作者

文章

0

获赞

0

收藏

0

相关资源
火山引擎大规模机器学习平台架构设计与应用实践
围绕数据加速、模型分布式训练框架建设、大规模异构集群调度、模型开发过程标准化等AI工程化实践,全面分享如何以开发者的极致体验为核心,进行机器学习平台的设计与实现。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论