在知识库检索系统中,文档分段策略对检索效果有着至关重要的影响。Dify 作为一个强大的 RAG(检索增强生成)平台,提供了多种文档分段策略,其中父子分段模式(Parent-Child Chunking)是一种特别高级的处理方式,能够显著提升检索质量。接下来将深入解析Dify中父子分段模式的原理和实现细节。
👆👆👆欢迎关注,一起进步👆👆👆
一、父子分段模式原理
1.1 基本概念
父子分段模式是一种层次化的文档处理策略,它将文档分为两个层次:
- 父段落(Parent Chunk) :较大的文本块,通常是一个完整的段落或多个相关段落的集合,提供上下文信息。
- 子段落(Child Chunk) :从父段落中进一步分割出的较小文本块,是实际用于检索的单位。
这种分段模式的核心思想是:用小的子段落进行精确检索,用大的父段落提供完整上下文 。
其基本机制包括:
- 子分段匹配查询 :
- 将文档拆分为较小、集中的信息单元(例如一句话),更加精准的匹配用户所输入的问题。
- 子分段能快速提供与用户需求最相关的初步结果。
- 父分段提供上下文 :
- 将包含匹配子分段的更大部分(如段落、章节甚至整个文档)视作父分段并提供给大语言模型(LLM)。
- 父分段能为 LLM 提供完整的背景信息,避免遗漏重要细节,帮助 LLM 输出更贴合知识库内容的回答。
1.2 工作流程
1.2.1 父分段
父分段设置提供以下分段选项:
- 段落 :根据预设的分隔符规则和最大块长度将文本拆分为段落。每个段落视为父分段,适用于文本量较大,内容清晰且段落相对独立的文档。支持以下设置项:
- 分段标识符
:默认值为
\n
,即按照文本段落分段。你可以遵循正则表达式语法自定义分块规则,系统将在文本出现分段标识符时自动执行分段。 - 分段最大长度 :指定分段内的文本字符数最大上限,超出该长度时将强制分段。默认值为 500 Tokens,分段长度的最大上限为 4000 Tokens;
- 全文 :不进行段落分段,而是直接将全文视为单一父分段。出于性能原因,仅保留文本内的前 10000 Tokens 字符,适用于文本量较小,但段落间互有关联,需要完整检索全文的场景。
图片来自 Dify 官网
1.2.2 子分段
子分段文本是在父文本分段基础上,由分隔符规则切分而成,用于查找和匹配与问题关键词最相关和直接的信息。如果使用默认的子分段规则,通常呈现以下分段效果:
- 当父分段为段落时,子分段对应各个段落中的单个句子。
- 父分段为全文时,子分段对应全文中各个单独的句子。
在子分段内填写以下分段设置:
- 分段标识符 :默认值为 ,即按照句子进行分段。你可以遵循正则表达式语法自定义分块规则,系统将在文本出现分段标识符时自动执行分段。
- 分段最大长度 :指定分段内的文本字符数最大上限,超出该长度时将强制分段。默认值为 200 Tokens,分段长度的最大上限为 4000 Tokens;
还可以使用文本预处理规则 过滤知识库内部分无意义的内容:
- 替换连续的空格、换行符和制表符
- 删除所有 URL 和电子邮件地址
配置完成后,点击”预览区块”即可查看分段后的效果。你可以查看父分段的整体字符数。背景标蓝的字符为子分块,同时显示当前子段的字符数。
图片来自 Dify 官网
1.2.3 父子分段模式的工作流程
父子分段模式的工作流程
1.3 优势分析
父子分段模式相比传统的单一分段模式有以下优势:
- 检索精度更高 :子段落粒度小,能够更精确地匹配用户查询。
- 上下文更完整 :返回父段落作为上下文,提供更全面的信息。
- 减少语义断裂 :避免了单一分段可能导致的语义断裂问题。
- 灵活性更强 :可以根据不同类型的文档选择不同的父子分段策略。
二、系统架构设计
2.1 整体架构
整体架构
2.2 配置界面
图片来自 Dify 官网
2.3 数据模型设计
2.3.1 实体模型关系
实体模型关系
2.3.1 实体关系说明
- Dataset(数据集) :
- 是知识库的顶层容器
- 包含多个 Document(文档)
- 定义了索引技术、嵌入模型等全局设置
- Document(文档) :
- 属于一个 Dataset
- 包含多个 DocumentSegment(文档分段)
- 关联一个 DatasetProcessRule(处理规则)
- 定义了文档的语言、分段模式等属性
- DocumentSegment(文档分段) :
- 属于一个 Document
- 在父子分段模式下,包含多个 ChildChunk(子分段)
- 有自己的索引节点 ID 和哈希值
- 存储分段内容、位置、词数等信息
- ChildChunk(子分段) :
- 属于一个 DocumentSegment
- 有自己的索引节点 ID 和哈希值
- 存储子分段内容、位置、词数等信息
- 类型可以是自动生成或自定义
- DatasetProcessRule(数据集处理规则) :
- 关联到 Dataset 和 Document
- 包含处理模式和规则配置
- 规则中定义了父子分段的处理方式
- Rule(规则) :
- 定义父模式(全文档或段落)
- 包含分段和子分段的配置
- Segmentation(分段配置) :
- 定义分隔符、最大标记数和重叠设置
三、核心代码实现
3.1 数据模型实现
在 Dify 中,父子分段模式的数据模型主要由DocumentSegment
和ChildChunk
两个类实现:
3.1.1 DocumentSegment 模型
DocumentSegment
模型定义在api/models/dataset.py
文件中,代表父段落:
classDocumentSegment(db.Model):
\_\_tablename\_\_ = "document\_segments"
id = db.Column(db.
String
(36), primary\_key=True)
tenant\_id = db.Column(db.
String
(36), nullable=False)
dataset\_id = db.Column(db.
String
(36), nullable=False)
document\_id = db.Column(db.
String
(36), nullable=False)
position = db.Column(db.Integer, nullable=False, default=0)
content = db.Column(db.Text, nullable=False)
# 其他字段省略...
@property
defchild\_chunks(self):
# 获取子段落的属性方法
dataset = db.session.query(Dataset).filter(Dataset.id == self.dataset\_id).first()
ifnot dataset or dataset.process\_rule.mode != "hierarchical"or dataset.process\_rule.rules.get("parent\_mode") == "FULL\_DOC":
return []
child\_chunks = (
db.session.query(ChildChunk)
.filter(
ChildChunk.tenant\_id == self.tenant\_id,
ChildChunk.dataset\_id == self.dataset\_id,
ChildChunk.document\_id == self.document\_id,
ChildChunk.segment\_id == self.id,
)
.all()
)
return child\_chunks or []
defget\_child\_chunks(self):
# 获取子段落的方法
child\_chunks = (
db.session.query(ChildChunk)
.filter(
ChildChunk.tenant\_id == self.tenant\_id,
ChildChunk.dataset\_id == self.dataset\_id,
ChildChunk.document\_id == self.document\_id,
ChildChunk.segment\_id == self.id,
)
.all()
)
return child\_chunks or []
3.1.2 ChildChunk 模型
ChildChunk
模型同样定义在api/models/dataset.py
文件中,代表子段落:
classChildChunk(db.Model):
\_\_tablename\_\_ = "child\_chunks"
\_\_table\_args\_\_ = (
db.PrimaryKeyConstraint("id",
name
="child\_chunk\_pkey"),
db.
Index
("child\_chunk\_dataset\_id\_idx", "tenant\_id", "dataset\_id", "document\_id", "segment\_id", "index\_node\_id"),
db.
Index
("child\_chunks\_node\_idx", "index\_node\_id", "dataset\_id"),
db.
Index
("child\_chunks\_segment\_idx", "segment\_id"),
)
id = db.Column(db.
String
(36), primary\_key=True, default=lambda: str(uuid.uuid4()))
tenant\_id = db.Column(db.
String
(36), nullable=False)
dataset\_id = db.Column(db.
String
(36), nullable=False)
document\_id = db.Column(db.
String
(36), nullable=False)
segment\_id = db.Column(db.
String
(36), nullable=False)
position = db.Column(db.Integer, nullable=False, default=0)
content = db.Column(db.Text, nullable=False)
word\_count = db.Column(db.Integer, nullable=False, default=0)
index\_node\_id = db.Column(db.
String
(36))
index\_node\_hash = db.Column(db.
String
(64))
type = db.Column(db.
String
(16), nullable=False, default="automatic")
# 其他字段省略...
3.2 索引处理器实现
父子分段模式的索引处理由ParentChildIndexProcessor
类实现,定义在api/core/rag/index\_processor/processor/parent\_child\_index\_processor.py
文件中:
classParentChildIndexProcessor(BaseIndexProcessor):
defextract(self, extract\_setting: ExtractSetting, **kwargs) -> list[Document]:
# 从原始文档中提取文本
text\_docs = ExtractProcessor.extract(
extract\_setting=extract\_setting,
is\_automatic=(
kwargs.get("process\_rule\_mode") == "automatic"or kwargs.get("process\_rule\_mode") == "hierarchical"
),
)
return text\_docs
deftransform(self, documents: list[Document], **kwargs) -> list[Document]:
# 将文本转换为父子段落结构
process\_rule = kwargs.get("process\_rule")
ifnot process\_rule:
raise ValueError("No process rule found.")
ifnot process\_rule.get("rules"):
raise ValueError("No rules found in process rule.")
rules = Rule(**process\_rule.get("rules"))
all\_documents = [] # type: ignore
if rules.parent\_mode == ParentMode.PARAGRAPH:
# 段落模式:将文档分割为多个父段落
# 代码省略...
elif rules.parent\_mode == ParentMode.FULL\_DOC:
# 全文模式:将整个文档作为一个父段落
page\_content = "\n".join([document.page\_content for document in documents])
document = Document(page\_content=page\_content, metadata=documents[0].metadata)
# 解析文档为子节点
child\_nodes = self.\_split\_child\_nodes(
document, rules, process\_rule.get("mode"), kwargs.get("embedding\_model\_instance")
)
if kwargs.get("preview"):
if len(child\_nodes) > dify\_config.CHILD\_CHUNKS\_PREVIEW\_NUMBER:
child\_nodes = child\_nodes[: dify\_config.CHILD\_CHUNKS\_PREVIEW\_NUMBER]
document.children = child\_nodes
doc\_id = str(uuid.uuid4())
hash = helper.generate\_text\_hash(document.page\_content)
document.metadata["doc\_id"] = doc\_id
document.metadata["doc\_hash"] = hash
all\_documents.append(document)
return all\_documents
defload(self, dataset: Dataset, documents: list[Document], with\_keywords: bool = True, **kwargs):
# 将父子段落加载到向量数据库
if dataset.indexing\_technique == "high\_quality":
vector = Vector(dataset)
for document in documents:
child\_documents = document.children
if child\_documents:
formatted\_child\_documents = [
Document(**child\_document.model\_dump()) for child\_document in child\_documents
]
vector.create(formatted\_child\_documents)
def\_split\_child\_nodes(
self,
document\_node: Document,
rules: Rule,
process\_rule\_mode: str,
embedding\_model\_instance: Optional[ModelInstance],
) -> list[ChildDocument]:
# 将父段落分割为子段落
ifnot rules.subchunk\_segmentation:
raise ValueError("No subchunk segmentation found in rules.")
child\_splitter = self.\_get\_splitter(
processing\_rule\_mode=process\_rule\_mode,
max\_tokens=rules.subchunk\_segmentation.max\_tokens,
chunk\_overlap=rules.subchunk\_segmentation.chunk\_overlap,
separator
=rules.subchunk\_segmentation.
separator
,
embedding\_model\_instance=embedding\_model\_instance,
)
# 解析文档为子节点
child\_nodes = []
child\_documents = child\_splitter.split\_documents([document\_node])
for child\_document\_node in child\_documents:
if child\_document\_node.page\_content.strip():
doc\_id = str(uuid.uuid4())
hash = helper.generate\_text\_hash(child\_document\_node.page\_content)
child\_document = ChildDocument(
page\_content=child\_document\_node.page\_content, metadata=document\_node.metadata
)
child\_document.metadata["doc\_id"] = doc\_id
child\_document.metadata["doc\_hash"] = hash
child\_page\_content = child\_document.page\_content
if child\_page\_content.startswith(".") or child\_page\_content.startswith("。"):
child\_page\_content = child\_page\_content[1:].strip()
if len(child\_page\_content) > 0:
child\_document.page\_content = child\_page\_content
child\_nodes.append(child\_document)
return child\_nodes
3.3 API 控制器实现
父子分段的API控制器主要包括ChildChunkAddApi
和ChildChunkUpdateApi
,定义在api/controllers/console/datasets/datasets\_segments.py
文件中:
classChildChunkAddApi(Resource):
@setup\_required
@login\_required
@account\_initialization\_required
@cloud\_edition\_billing\_resource\_check("vector\_space")
@cloud\_edition\_billing\_knowledge\_limit\_check("add\_segment")
@cloud\_edition\_billing\_rate\_limit\_check("knowledge")
defpost(self, dataset\_id, document\_id, segment\_id):
# 添加子段落
# 代码省略...
try:
child\_chunk = SegmentService.create\_child\_chunk(args.get("content"),
segment
, document, dataset)
except
ChildChunkIndexingServiceError as e:
raise ChildChunkIndexingError(str(e))
return {"
data
": marshal(child\_chunk, child\_chunk\_fields)}, 200
@setup\_required
@login\_required
@account\_initialization\_required
defget(self, dataset\_id, document\_id, segment\_id):
# 获取子段落列表
# 代码省略...
child\_chunks = SegmentService.get\_child\_chunks(segment\_id, document\_id, dataset\_id, page, limit,
keyword
)
return {
"
data
": marshal(child\_chunks.items, child\_chunk\_fields),
"total": child\_chunks.total,
"total\_pages": child\_chunks.pages,
"page": page,
"limit": limit,
}, 200
classChildChunkUpdateApi(Resource):
@setup\_required
@login\_required
@account\_initialization\_required
@cloud\_edition\_billing\_rate\_limit\_check("knowledge")
defdelete(self, dataset\_id, document\_id, segment\_id, child\_chunk\_id):
# 删除子段落
# 代码省略...
try:
SegmentService.delete\_child\_chunk(child\_chunk, dataset)
except
ChildChunkDeleteIndexServiceError as e:
raise ChildChunkDeleteIndexError(str(e))
return {"result": "success"}, 204
@setup\_required
@login\_required
@account\_initialization\_required
@cloud\_edition\_billing\_resource\_check("vector\_space")
@cloud\_edition\_billing\_rate\_limit\_check("knowledge")
defpatch(self, dataset\_id, document\_id, segment\_id, child\_chunk\_id):
# 更新子段落
# 代码省略...
try:
child\_chunk = SegmentService.update\_child\_chunk(
args.get("content"), child\_chunk,
segment
, document, dataset
)
except
ChildChunkIndexingServiceError as e:
raise ChildChunkIndexingError(str(e))
return {"
data
": marshal(child\_chunk, child\_chunk\_fields)}, 200
3.4 服务层实现
子段落的服务层主要在SegmentService
类中实现,定义在api/services/dataset\_service.py
文件中:
@classmethod
defcreate\_child\_chunk(
cls, content: str,
segment
: DocumentSegment, document: Document, dataset: Dataset
) -> ChildChunk:
lock\_name = "add\_child\_lock\_{}".format(
segment
.id)
with redis\_client.lock(lock\_name, timeout=20):
index\_node\_id = str(uuid.uuid4())
index\_node\_hash = helper.generate\_text\_hash(content)
child\_chunk\_count = (
db.session.query(ChildChunk)
.filter(
ChildChunk.tenant\_id == current\_user.current\_tenant\_id,
ChildChunk.dataset\_id == dataset.id,
ChildChunk.document\_id == document.id,
ChildChunk.segment\_id ==
segment
.id,
)
.count()
)
max\_position = (
db.session.query(func.max(ChildChunk.position))
.filter(
ChildChunk.tenant\_id == current\_user.current\_tenant\_id,
ChildChunk.dataset\_id == dataset.id,
ChildChunk.document\_id == document.id,
ChildChunk.segment\_id ==
segment
.id,
)
.
scalar
()
)
child\_chunk = ChildChunk(
tenant\_id=current\_user.current\_tenant\_id,
dataset\_id=dataset.id,
document\_id=document.id,
segment\_id=
segment
.id,
position=max\_position + 1if max\_position else1,
index\_node\_id=index\_node\_id,
index\_node\_hash=index\_node\_hash,
content=content,
word\_count=len(content),
type="customized",
created\_by=current\_user.id,
)
db.session.add(child\_chunk)
# 保存向量索引
try:
VectorService.create\_child\_chunk\_vector(child\_chunk, dataset)
except
Exception as e:
logging.exception("create child chunk
index
failed")
db.session.rollback()
raise ChildChunkIndexingError(str(e))
db.session.commit()
return child\_chunk
3.5 向量服务实现
子段落的向量服务主要在VectorService
类中实现,定义在api/services/vector\_service.py
文件中:
@classmethod
defcreate\_child\_chunk\_vector(cls, child\_segment: ChildChunk, dataset: Dataset):
child\_document = Document(
page\_content=child\_segment.content,
metadata={
"doc\_id": child\_segment.index\_node\_id,
"doc\_hash": child\_segment.index\_node\_hash,
"document\_id": child\_segment.document\_id,
"dataset\_id": child\_segment.dataset\_id,
},
)
if dataset.indexing\_technique == "high\_quality":
# 保存向量索引
vector = Vector(dataset=dataset)
vector.add\_texts([child\_document], duplicate\_check=True)
四、使用场景与最佳实践
4.1 适用场景
父子分段模式特别适合以下场景:
- 长文档处理 :对于长篇文档,父子分段可以保持上下文的完整性。
- 结构化文档 :对于有明确段落结构的文档,如学术论文、技术文档等。
- 需要精确检索的场景 :当用户查询需要精确匹配特定内容,同时又需要提供足够上下文的场景。
- 多语言文档 :对于包含多种语言的文档,父子分段可以更好地处理语言边界。
4.2 配置建议
在 Dify 中配置父子分段模式时,可以参考以下建议:
- 父段落大小 :建议设置为 300-500 个token,这样可以包含足够的上下文信息。
- 子段落大小 :建议设置为 100-200 个token,这样可以保证检索的精确性。
- 分隔符选择 :
- 父段落分隔符:建议使用
\n\n
(两个换行符),适合分割段落。 - 子段落分隔符:建议使用
\n
(单个换行符),适合在段落内部进一步分割。
4.3 性能优化
为了优化父子分段模式的性能,可以考虑以下几点:
- 合理设置子段落数量 :过多的子段落会增加索引大小和检索时间,建议根据文档特性合理设置。
- 使用高质量的嵌入模型 :选择性能更好的嵌入模型可以提高检索质量。
- 定期重新生成子段落 :当文档内容有重大更新时,可以考虑重新生成子段落。
五、实际应用示例
5.1 技术文档检索
假设我们有一份技术文档,需要实现精确的检索功能。使用父子分段模式的步骤如下:
- 上传文档 :将技术文档上传到 Dify 平台。
- 选择分段模式 :在数据集设置中选择"Parent-Child"分段模式。
- 配置分段参数 :
- 父段落模式:选择"段落模式"
- 父段落分隔符:
\n\n
- 父段落大小:400 tokens
- 子段落分隔符:
\n
- 子段落大小:150 tokens
-
处理文档 :系统会自动将文档分割为父段落和子段落,并创建向量索引。
-
检索测试 :使用不同的查询测试检索效果,观察返回的父段落是否提供了足够的上下文信息。
5.2 自定义子段落
在某些情况下,自动生成的子段落可能不够理想,这时可以手动添加或编辑子段落:
- 查看父段落 :在文档详情页面查看已生成的父段落。
- 添加子段落 :点击"添加子段落"按钮,输入自定义的子段落内容。
- 编辑子段落 :选择已有的子段落,点击编辑按钮修改内容。
- 删除子段落 :选择不需要的子段落,点击删除按钮移除。
- 重新生成 :如果需要重新生成所有子段落,可以点击"重新生成子段落"按钮。
六、总结与展望
6.1 技术总结
Dify 的父子分段模式是一种高级的文档处理策略,通过将文档分为父段落和子段落两个层次,实现了精确检索和完整上下文的平衡,为用户提供了更精确、更全面的文档检索体验。其核心实现包括:
- 数据模型设计
:使用
DocumentSegment
和ChildChunk
两个模型分别表示父段落和子段落。 - 索引处理器
:通过
ParentChildIndexProcessor
实现文档的提取、转换和加载。 - API控制器 :提供了添加、更新、删除子段落的API接口。
- 向量服务 :负责创建和管理子段落的向量表示。
- 前端实现 :提供了友好的用户界面,方便用户管理子段落。
参考资料
https://github.com/langgenius/dify (v1.4.1)
七、推荐阅读
- 从零开始学 Dify
- 从零开始学 Dify- RAG 知识库系统设计详解
- 从零开始学 Dify- 对话系统的关键功能
- 从零开始学 Dify- 工作流(Workflow)系统架构
- 从零开始学 Dify-扫描、加载和管理模型提供者的详细过程
- 从零开始学 Dify-详细介绍 Dify 模型运行时的核心架构
- 从零开始学 Dify - 详细介绍 Dify 工具(Tool)系统核心架构设计
- 从零开始学 Dify - Dify 的 RAG 系统如何有效地处理和检索大量文档?
- 智能问数:从模板到指标服务
- 替代 NL2SQL,Agent+业务语义的创新产品设计
👆👆👆欢迎关注,一起进步👆👆👆
欢迎留言讨论哈
🧐点赞、分享、推荐 ,一键三连,养成习惯👍