Smallpond 是一套由 DeepSeek 推出的、针对 AI 领域,基于 Ray 和 DuckDB 实现的轻量级数据处理引擎。它具有以下优点:
- 轻量级
- 高性能
- 支持规模大
- 无需运维
- Per Job 的资源调度
快速开始
Smallpond 提供了两套 API(具体介绍见下文),一套是 High-level 的 Dataframe API,一套是 Low-level 的Logicalplan API。前者简单、易理解,使用上非常类似 Pandas、PySpark 等引擎;后者灵活度高,可以实现更加复杂的数据处理逻辑。
- Dataframe API
import smallpond
sp = smallpond.init()
df = sp.read_parquet("path/to/dataset/*.parquet")
df = df.repartition(10)
df = df.map("x + 1")
df.write_parquet("path/to/output")
当前 Dataframe API 功能还比较薄弱,针对一些高级场景,比如定义 Ray 运行参数、GPU等尚无法设置。
- LogicalPlan API
from smallpond.logical.dataset import ParquetDataSet
from smallpond.logical.node import Context, DataSourceNode, DataSetPartitionNode, SqlEngineNode, LogicalPlan
from smallpond.execution.driver import Driver
def my_pipeline(input_paths: List[str], npartitions: int):
ctx = Context()
dataset = ParquetDataSet(input_paths)
node = DataSourceNode(ctx, dataset)
node = DataSetPartitionNode(ctx, (node,), npartitions=npartitions)
node = SqlEngineNode(ctx, (node,), "SELECT * FROM {0}")
return LogicalPlan(ctx, node)
if __name__ == "__main__":
driver = Driver()
driver.add_argument("-i", "--input_paths", nargs="+")
driver.add_argument("-n", "--npartitions", type=int, default=10)
plan = my_pipeline(**driver.get_arguments())
driver.run(plan)
python script.py -i "path/to/*.parquet" -n 10 Ray # Ray 引擎
python script.py -i "path/to/*.parquet" -n 10 scheduler # built-in 引擎
注意,Smallpond 支持两种分布式引擎(具体介绍见下文),一种是 Ray 引擎,一种是 Built-in 引擎。使用方式见上文脚本所示。
架构介绍
下图为 Smallpond 架构
整体架构类似于 Spark 的架构,其组件 Dataframe、Logicalplan、Physicalplan、Scheduler 等 Spark 都有对应,是一个典型的批处理 形式 SQL内核 架构。
- DataFrame的接口目前只能支持Ray Engine
- 最底层是存储层。这个存储有两个作用:1. 作为源数据和中间执行数据的存储,可以被 mount 到本地路径;2. 如果选用 Built-in 执行引擎,这个存储还是 task 的序列化存储用于从 driver 节点向 executor 节点派发任务。除了 3FS 存储,Smallpond 还支持 fsspec 接口,从而对接其他存储。
- 引擎层。这里有两个选项,一个是 Ray,一个是 Built-in (run driver 的时候通过 mode 来指定,如果选项是 Ray,走 Ray 引擎,如果选项是 Scheduler,走 Built-in 引擎)。官方说两套引擎是历史原因,未来会逐渐合并。
- 执行层。完全类似 Spark 的实现,有 Logicalplan,如果选用 Dataframe 接口,还有优化器支持。最后的物理计划生成 task,会被调度器扔到远端的 worker 计算。task 的执行有两种选择:DuckDB 和 Arrow(官方文档未给出)。
- API 层。支持 High-level 与 Low-level 的 API。
主要特点
特点 1:使用 Ray 做分布式调度和执行
Smallpond 使用 Ray 作为其分布式执行引擎之一,另一种为 Built-in 引擎。根据笔者测试,Ray 引擎相较于 Built-in 引擎有明显的性能方面的优势。一个可能重要的原因是,在 Built-in 引擎中,driver 向 executor 发送序列化任务不是通过 RPC 进行,而是通过共享存储方式进行,这个过程 task 序列化需要落共享存储。另外 Ray 相比有非常高效的 task 调度能力。
特点 2:MPI 的支持与 numa 绑定
这是一个可能容易被忽视的创新点:借助 MPI 框架,用户可以自定义任务,使用 MPI 做高效的集合通信。上文已经说到,用户完全可以自定义自己的 Python 脚本,而用户可以在自己的 Python 脚本里写 MPI 程序,从而使用 MPI 做高效的集合通信。
同时,worker 也做了 NUMA 绑定,做到更加高效的内存存取。
另外,代码中设置了 openmp 的环境变量。用户可以使用单核多线程来加速程序。
特点 3:极具灵活性的 Low Level API
通过 Low Level API,用户不但可以自己定义 map、filter 等典型的 SQL 类型算子,也可以定义非 SQL 算子,例如可以定义 PythonScriptNode
,用于执行 Python 脚本。
这样做的好处在于,极大地增强了 数据处理 的灵活性:某些数据处理需求可能不方便使用类似 Dataframe 的 map 算子来处理,就可以写 Python 代码自由地处理这些任务了。
从这个角度看,这个 L ogicalplan 已经超出了 Spark L ogicalplan 的范畴,兼具了一些类似于“工作流调度”的能力,可以调度进程。
特点 4:与 3FS 的结合
Smallpond 将 3FS 挂载到本地,可以利用 3FS 的性能优势,结合 DuckDB 的优秀处理能力,达到很高的处理效率。
不过,笔者认为与 3FS 的结合不能作为一个创新点来看待,因为两者是松耦合的(非深度结合),只是说运行在 3FS 上会使得 Smallpond 运行地更快,而挂载 filesystem 到本地实现分布式计算也是一个常规的行为。
总体架构
随着LLM和多模态AI技术的飞速发展,非结构化数据量呈指数级增长,这极大地增加了数据管理、计算和存储的复杂性。传统的数据湖解决方案已难以适应AI场景下对数据的新需求。
为了应对这一挑战,新一代数据湖必须解决以下多模态数据带来的关键问题:
-
数据管理:传统数据管理侧重于库表结构,而面对多模态非结构化数据,如何实现高效管理
-
数据计算:如何从非结构化数据中挖掘潜在价值,如何提高CPU和GPU利用率,如何使用模型来处理数据
-
数据存储:传统数据湖格式在非结构化数据存储方面存在局限,是否可实现全模态数据的统一湖格式存储
-
AI 场景支撑:多模态数据湖如何支撑 预训练、后训练、知识库、AI 搜索、智能体、安全合规 等场景的智能化应用
火山引擎 基于内外部客户的实践,推出了一款面向 AI场景的多模态数据湖服务。总体架构如下:
功能介绍
LAS 提供了如下功能:
- 数据集管理。用户可以根据数据的使用场景创建不同类型的数据集。比如,针对大规模预训练场景的大规模数据集,可以使用 LAS 的分布式处理能力;对于后训练阶段的 SFT 场景,LAS 推出了数据洞察以及细粒度的数据编辑功能。此外,LAS 还支持数据集多版本,满足算法人员在不同数据版本之间做对比实验的需求。
- 统一 Catalog。用户可以将自己的数据注册为 catalog table,即能够使用平台提供的针对格式化数据的计算与分析工具。
- 丰富的算子支持。LAS 提供了针对文本、图片、视频、音频、文档等类型的 100+ 算子,用户可以一键调用,标准化自己的数据处理流程。
- 工作流支持。通过工作流,用户可以提交各种类型的数据处理作业,比如除了内置的算子标准化作业,还支持用户提交 Python 作业、Spark 作业、Ray 作业等等。
- 多数据湖格式/数据源。支持 lance、Iceberg、Parquet、Json、CSV、VikingDB、Opensearch 等,满足各种场景需求。例如,针对训练或者微调过程,需要有高性能的点查需求,用户可以选择 lance;针对线上业务数据回流场景,可以选择 Iceberg;针对 RAG 场景,可以选择 VikingDB 作为数据 sink。
- 存算分离架构与分布式数据缓存。LAS 推荐使用存算分离架构,以减少存储成本,提升计算的可扩展性。同时,LAS 针对存算分离场景提供了 Proton 缓存服务,以加速对 TOS 数据的访问。
Smallpond DataFrame + LAS Ray 计算资源组
当前,有很多客户在云上运行他们的计算解决方案的同时,也希望能够在云上用上 Smallpond。为此,LAS 提出了基于 Ray 的云上方案,如下图所示:
该云上方案具备五大优点:
- 环境准备简单:无需用户需手工添加节点,打通 SSH,构建MPI集群
- 资源隔离:支持对IO/网络/内存等更加严格的资源条件
- 认证鉴权:对资源的申请做用户鉴权
- 资源统一管理:用户无需手动管理计算资源,开箱即用
- K8S调度:完全交由平台运维解决,支持排队,抢占等
在该方案中,LAS 中的集群能够无缝的与 Smallpond 融合,只需要在云上开通资源 ,将 ray_address 设置成已开通的资源队里,其余逻辑无需改造,就可以完成数据预处理。
sp = smallpond.init(ray_address= "ray://192.xxxx:10001")
Smallpond基于Proton/TOS-FS对接云存储
Smallpond 不仅支持 3FS 协议的存储,还支持 fuse 和 fsspec 的接口。因此,我们可以针对大规模数据处理的场景,将数据存储在 TOS 对象存储上(LAS 支持 TOS 的 fsspec 协议访问),而将训练场景的数据放置到 vePFS 存储中。
扩展 Datasource-Lance 多模态数据湖
Lance 是新一代的列式存储结构,它被设计用来存储视频,图像,音频以及普通列式数据。它可以被存储在任何 POSIX 文件系统以及像 S3,TOS 等云存储上。Lance 允许数据被随机访问,在随机访问场景下它比 Parquet 性能快 100 倍。同时它具备向量检索,零拷贝的能力,并且与 Pyarrow,DuckDB 生态紧密结合。
Lance 的主要能力:
- 多版本管理: Lance 是数据湖, 提供了多版本的能力, 能够快速的实现增删查改以及结构变更的需求, 也提供 time travel 的能力.
- 多维分析: Lance 能够对接分布式计算引擎, 例如 Spark/Ray, 完成大规模数据分析需求.
- 随机检索: Lance 构建了主键 索引 和 二级索引, 能够实现快速的随机检索
- 向量检索: Lance 上实现了 IVF-PQ 和 IVF-HNSW 向量索引, 以及全文索引, 具有混合搜索能力.
- 多模数据: Lance 自定义了底层文件格式, 能够写入大宽表和大宽列, 直接在表字段中存储多模数据, 例如文本/图像
- 开放生态: Lance 支持 Python/Java 客户端, 内存采用 arrow 格式, 适配了很多AI生态的引擎和大数据计算引擎.
LAS 中提供了完整的产品化的 Lance 湖服务能力,包括元数据管理,小文件合并服务, 而 Smallpond 也是能够无缝的接入lance的数据源。
以下是 Smallpond支持 Lance 的实践样例:
import lance
import arrow
from smallpond.logical.dataset import ArrowTableDataSet
# 从 Lance 格式的文件中读取数据
lance_ds = lance.dataset("example.lance")
# 将数据转换为 Arrow 的 Table
arrow_table = lance_ds.to_table()
# 将arrow_table转换成smallpond的dataset
smallpond_dataset = ArrowTableDataSet()
集成 LAS 的算子
Smallpond 支持 map/map_batches 的并行算子逻辑,其接口方式跟 Ray类似。而火山引擎 LAS 的算子服务能力接口是可以同时兼容 Spark/Ray。因此 LAS Built-in 的算子也都能够直接跑在 Smallpond 上。
实践示例
场景描述:在RAG架构的离线入库场景,通过LAS 产品提供的 分布式计算能力 Smallpond,实现从对象存储到向量数据库的全流程优化。在该链路中,读取数据后,利用 Smallpond 高效完成数据的切分(chunk)和向量化处理,并最终将向量数据批量入库至向量数据库。
同时,LAS 提供有 Chunk 和 Embedding 的算子,平台产品界面中,有对算子进行详细和介绍和 Demo 示例,便于用户快速搭建该链路。此外,也支持自定义算子。
以下按照自定义算子示例:
Step 1: 创建 LAS 计算资源
在 LAS 平台中提供有 CPU 和 GPU 的计算资源队列。由于 Embedding 消耗的算力较大,建议采用 GPU 计算资源。
Step 2: 实现该流程的代码
定义三种处理器:Chunk、Embedding、写向量数据库。由于SmallPond未提供写入向量数据库的接口,可以利用map_batches方法实现。
import copy
import logging
import pyarrow as pa
import smallpond
from FlagEmbedding import FlagModel
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
from volcengine.viking_db import Data, VikingDBService
class ChunkProcessor:
"""将文本切分成chunk片段,用于文本检索等场景。"""
def __init__(
self,
input_col_name: str,
output_col_name: str = "chunk",
chunk_size: int = 500,
chunk_overlap: int = 50,
) -> None:
self.input_col_name = input_col_name
self.output_col_name = output_col_name
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self._sentence_splitter = SentenceSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
)
def __call__(self, row: dict, **kwargs) -> list[dict]:
text_list = [row[self.input_col_name]]
documents = [Document(text=t) for t in text_list]
sentence_split_nodes = self._sentence_splitter.get_nodes_from_documents(documents)
chunk_result = []
for node in sentence_split_nodes:
node_text = node.text
if len(node_text) == 0:
continue
row_new = copy.deepcopy(row)
row_new[self.output_col_name] = node_text
chunk_result.append(row_new)
return chunk_result
class EmbeddingProcessor:
"""使用 BGE 系列模型计算文本的 embedding。"""
def __init__(
self,
input_col_name: str,
output_col_name: str = "embedding",
) -> None:
self.input_col_name = input_col_name
self.output_col_name = output_col_name
self._embedding_model = FlagModel("BAAI/bge-m3", use_fp16=True)
def __call__(self, table: pa.Table) -> pa.Table:
rows = table.to_pandas()
content_list = rows[self.input_col_name].tolist()
embeddings = self._embedding_model.encode(content_list)
embeddings = [embedding.tolist() for embedding in embeddings]
rows[self.output_col_name] = embeddings
return pa.Table.from_pandas(rows, preserve_index=False)
class VikingdbSinkProcessor:
""" 将数据写入到火山的向量数据库VikingDB中 """
def __init__(
self,
collection_name: str,
**kwargs,
) -> None:
self.collection_name = collection_name
vikingdb_service = VikingDBService(
host=vikingdb_endpoint,
region=vikingdb_region,
scheme="http",
)
vikingdb_service.set_ak(vikingdb_ak)
vikingdb_service.set_sk(vikingdb_sk)
self.collection = vikingdb_service.get_collection(self.collection_name)
def _upsert_data(self, df):
datas = []
for _, row in df.iterrows():
new_row = row.to_dict()
datas.append(Data(new_row))
batch_size = 10
for i in range(0, len(datas), batch_size):
batch = datas[i : i + batch_size]
self.collection.upsert_data(batch, async_upsert=True)
def __call__(self, table:pa.Table):
self._upsert_data(table.to_pandas())
return table
if __name__ == "__main__":
# 初始化
sp = smallpond.init()
# 数据处理流水线
sp.read_csv(paths, schema)\
.flat_map(ChunkProcessor(input_col_name = "index"))\
.map_batches(EmbeddingProcessor(input_col_name = "chunk"))\
.map_batches(VikingdbSinkProcessor(collection_name = vikingdb_dataset))\
.take_all()
Step 3: 在 LAS 平台提交该示例代码
LAS 平台提供直接运行 Python 脚本的能力。
- 在 LAS 平台中,算子管理菜单中 上传上述代码,以及代码依赖的镜像。也可以使用 LAS 平台提供的镜像。LAS 平台提供的镜像有 LAS 算子执行的镜像,也提供含 PyTorch 基础镜像等等。
- LAS 平台中,工作流中通过拖拽式方式,将该算子拖到画布中点击执行按钮,便可启动任务。界面上可以查看到执行日志和进度。
Smallpond 是 DeepSeek 开源的一个优秀的轻量级、高性能 AI 场景数据处理框架,一经推出便引起了业界的关注,项目 Star 数快速增长。其优点以及创新点上文已经有了详细的介绍,但由于项目处于开源初期,仍有很多问题有待解决,比如:
- 支持已有 Ray Cluster的接入方式
- 数据源需要支持 S3 协议、TOS 等其他对象存储的协议
- 适配更多的数据格式,尤其是面向多模的数据格式,例如 Lance,LMDB,Webdataset,Pickle 等
相信随着时间的推移,上述这些问题都能得到很好的解决。
根据项目介绍,Smallpond 的目标是解决 AI 场景灵活的数据处理需求,这与火山引擎 LAS 多模数据湖的目标是相同的。LAS 数据湖配套有自己的数据处理框架,以及大量的用于多模数据处理的算子,用户可以开箱使用。而如果用户选择使用类似 Smallpond 的数据处理框架,通过 LAS 也能很好的支持,同时也能很好的发挥云的优势。
在未来,火山引擎 LAS会考虑结合 Smallpond 优秀的架构能力与云低成本、易运维、以及生态协同的优势,为用户提供更加强大的 AI 数据处理功能。