从零开始学 Dify - 万字详解RAG父子分段模式的原理与实现

向量数据库大模型NoSQL数据库

在知识库检索系统中,文档分段策略对检索效果有着至关重要的影响。Dify 作为一个强大的 RAG(检索增强生成)平台,提供了多种文档分段策略,其中父子分段模式(Parent-Child Chunking)是一种特别高级的处理方式,能够显著提升检索质量。接下来将深入解析Dify中父子分段模式的原理和实现细节。

👆👆👆欢迎关注,一起进步👆👆👆

一、父子分段模式原理

1.1 基本概念

父子分段模式是一种层次化的文档处理策略,它将文档分为两个层次:

  • 父段落(Parent Chunk) :较大的文本块,通常是一个完整的段落或多个相关段落的集合,提供上下文信息。
  • 子段落(Child Chunk) :从父段落中进一步分割出的较小文本块,是实际用于检索的单位。

这种分段模式的核心思想是:用小的子段落进行精确检索,用大的父段落提供完整上下文

其基本机制包括:

  • 子分段匹配查询
  • 将文档拆分为较小、集中的信息单元(例如一句话),更加精准的匹配用户所输入的问题。
  • 子分段能快速提供与用户需求最相关的初步结果。
  • 父分段提供上下文
  • 将包含匹配子分段的更大部分(如段落、章节甚至整个文档)视作父分段并提供给大语言模型(LLM)。
  • 父分段能为 LLM 提供完整的背景信息,避免遗漏重要细节,帮助 LLM 输出更贴合知识库内容的回答。

1.2 工作流程

1.2.1 父分段

父分段设置提供以下分段选项:

  • 段落 :根据预设的分隔符规则和最大块长度将文本拆分为段落。每个段落视为父分段,适用于文本量较大,内容清晰且段落相对独立的文档。支持以下设置项:
  • 分段标识符 :默认值为 \n ,即按照文本段落分段。你可以遵循正则表达式语法自定义分块规则,系统将在文本出现分段标识符时自动执行分段。
  • 分段最大长度 :指定分段内的文本字符数最大上限,超出该长度时将强制分段。默认值为 500 Tokens,分段长度的最大上限为 4000 Tokens;
  • 全文 :不进行段落分段,而是直接将全文视为单一父分段。出于性能原因,仅保留文本内的前 10000 Tokens 字符,适用于文本量较小,但段落间互有关联,需要完整检索全文的场景。

picture.image

图片来自 Dify 官网

1.2.2 子分段

子分段文本是在父文本分段基础上,由分隔符规则切分而成,用于查找和匹配与问题关键词最相关和直接的信息。如果使用默认的子分段规则,通常呈现以下分段效果:

  • 当父分段为段落时,子分段对应各个段落中的单个句子。
  • 父分段为全文时,子分段对应全文中各个单独的句子。

在子分段内填写以下分段设置:

  • 分段标识符 :默认值为 ,即按照句子进行分段。你可以遵循正则表达式语法自定义分块规则,系统将在文本出现分段标识符时自动执行分段。
  • 分段最大长度 :指定分段内的文本字符数最大上限,超出该长度时将强制分段。默认值为 200 Tokens,分段长度的最大上限为 4000 Tokens;

还可以使用文本预处理规则 过滤知识库内部分无意义的内容:

  • 替换连续的空格、换行符和制表符
  • 删除所有 URL 和电子邮件地址

配置完成后,点击”预览区块”即可查看分段后的效果。你可以查看父分段的整体字符数。背景标蓝的字符为子分块,同时显示当前子段的字符数。

picture.image

图片来自 Dify 官网

1.2.3 父子分段模式的工作流程

picture.image

父子分段模式的工作流程

1.3 优势分析

父子分段模式相比传统的单一分段模式有以下优势:

  1. 检索精度更高 :子段落粒度小,能够更精确地匹配用户查询。
  2. 上下文更完整 :返回父段落作为上下文,提供更全面的信息。
  3. 减少语义断裂 :避免了单一分段可能导致的语义断裂问题。
  4. 灵活性更强 :可以根据不同类型的文档选择不同的父子分段策略。

二、系统架构设计

2.1 整体架构

picture.image

整体架构

2.2 配置界面

picture.image

图片来自 Dify 官网

2.3 数据模型设计

2.3.1 实体模型关系

picture.image

实体模型关系

2.3.1 实体关系说明

  1. Dataset(数据集) :
  • 是知识库的顶层容器
  • 包含多个 Document(文档)
  • 定义了索引技术、嵌入模型等全局设置
  • Document(文档) :
  • 属于一个 Dataset
  • 包含多个 DocumentSegment(文档分段)
  • 关联一个 DatasetProcessRule(处理规则)
  • 定义了文档的语言、分段模式等属性
  • DocumentSegment(文档分段) :
  • 属于一个 Document
  • 在父子分段模式下,包含多个 ChildChunk(子分段)
  • 有自己的索引节点 ID 和哈希值
  • 存储分段内容、位置、词数等信息
  • ChildChunk(子分段) :
  • 属于一个 DocumentSegment
  • 有自己的索引节点 ID 和哈希值
  • 存储子分段内容、位置、词数等信息
  • 类型可以是自动生成或自定义
  • DatasetProcessRule(数据集处理规则) :
  • 关联到 Dataset 和 Document
  • 包含处理模式和规则配置
  • 规则中定义了父子分段的处理方式
  • Rule(规则) :
  • 定义父模式(全文档或段落)
  • 包含分段和子分段的配置
  • Segmentation(分段配置) :
  • 定义分隔符、最大标记数和重叠设置

三、核心代码实现

3.1 数据模型实现

在 Dify 中,父子分段模式的数据模型主要由DocumentSegmentChildChunk两个类实现:

3.1.1 DocumentSegment 模型

DocumentSegment模型定义在api/models/dataset.py文件中,代表父段落:

  
classDocumentSegment(db.Model):  
    \_\_tablename\_\_ = "document\_segments"  
      
    id = db.Column(db.
   
 String
 (36), primary\_key=True)  
    tenant\_id = db.Column(db.
   
 String
 (36), nullable=False)  
    dataset\_id = db.Column(db.
   
 String
 (36), nullable=False)  
    document\_id = db.Column(db.
   
 String
 (36), nullable=False)  
    position = db.Column(db.Integer, nullable=False, default=0)  
    content = db.Column(db.Text, nullable=False)  
    # 其他字段省略...  
      
    @property  
    defchild\_chunks(self):  
        # 获取子段落的属性方法  
        dataset = db.session.query(Dataset).filter(Dataset.id == self.dataset\_id).first()  
        ifnot dataset or dataset.process\_rule.mode != "hierarchical"or dataset.process\_rule.rules.get("parent\_mode") == "FULL\_DOC":  
            return []  
        child\_chunks = (  
            db.session.query(ChildChunk)  
            .filter(  
                ChildChunk.tenant\_id == self.tenant\_id,  
                ChildChunk.dataset\_id == self.dataset\_id,  
                ChildChunk.document\_id == self.document\_id,  
                ChildChunk.segment\_id == self.id,  
            )  
            .all()  
        )  
        return child\_chunks or []  
      
    defget\_child\_chunks(self):  
        # 获取子段落的方法  
        child\_chunks = (  
            db.session.query(ChildChunk)  
            .filter(  
                ChildChunk.tenant\_id == self.tenant\_id,  
                ChildChunk.dataset\_id == self.dataset\_id,  
                ChildChunk.document\_id == self.document\_id,  
                ChildChunk.segment\_id == self.id,  
            )  
            .all()  
        )  
        return child\_chunks or []  

3.1.2 ChildChunk 模型

ChildChunk模型同样定义在api/models/dataset.py文件中,代表子段落:

  
classChildChunk(db.Model):  
    \_\_tablename\_\_ = "child\_chunks"  
      
    \_\_table\_args\_\_ = (  
        db.PrimaryKeyConstraint("id", 
   
 name
 ="child\_chunk\_pkey"),  
        db.
   
 Index
 ("child\_chunk\_dataset\_id\_idx", "tenant\_id", "dataset\_id", "document\_id", "segment\_id", "index\_node\_id"),  
        db.
   
 Index
 ("child\_chunks\_node\_idx", "index\_node\_id", "dataset\_id"),  
        db.
   
 Index
 ("child\_chunks\_segment\_idx", "segment\_id"),  
    )  
      
    id = db.Column(db.
   
 String
 (36), primary\_key=True, default=lambda: str(uuid.uuid4()))  
    tenant\_id = db.Column(db.
   
 String
 (36), nullable=False)  
    dataset\_id = db.Column(db.
   
 String
 (36), nullable=False)  
    document\_id = db.Column(db.
   
 String
 (36), nullable=False)  
    segment\_id = db.Column(db.
   
 String
 (36), nullable=False)  
    position = db.Column(db.Integer, nullable=False, default=0)  
    content = db.Column(db.Text, nullable=False)  
    word\_count = db.Column(db.Integer, nullable=False, default=0)  
    index\_node\_id = db.Column(db.
   
 String
 (36))  
    index\_node\_hash = db.Column(db.
   
 String
 (64))  
    type = db.Column(db.
   
 String
 (16), nullable=False, default="automatic")  
    # 其他字段省略...  

3.2 索引处理器实现

父子分段模式的索引处理由ParentChildIndexProcessor类实现,定义在api/core/rag/index\_processor/processor/parent\_child\_index\_processor.py文件中:

  
classParentChildIndexProcessor(BaseIndexProcessor):  
    defextract(self, extract\_setting: ExtractSetting, **kwargs) -> list[Document]:  
        # 从原始文档中提取文本  
        text\_docs = ExtractProcessor.extract(  
            extract\_setting=extract\_setting,  
            is\_automatic=(  
                kwargs.get("process\_rule\_mode") == "automatic"or kwargs.get("process\_rule\_mode") == "hierarchical"  
            ),  
        )  
        return text\_docs  
  
    deftransform(self, documents: list[Document], **kwargs) -> list[Document]:  
        # 将文本转换为父子段落结构  
        process\_rule = kwargs.get("process\_rule")  
        ifnot process\_rule:  
            raise ValueError("No process rule found.")  
        ifnot process\_rule.get("rules"):  
            raise ValueError("No rules found in process rule.")  
        rules = Rule(**process\_rule.get("rules"))  
        all\_documents = []  # type: ignore  
          
        if rules.parent\_mode == ParentMode.PARAGRAPH:  
            # 段落模式:将文档分割为多个父段落  
            # 代码省略...  
              
        elif rules.parent\_mode == ParentMode.FULL\_DOC:  
            # 全文模式:将整个文档作为一个父段落  
            page\_content = "\n".join([document.page\_content for document in documents])  
            document = Document(page\_content=page\_content, metadata=documents[0].metadata)  
            # 解析文档为子节点  
            child\_nodes = self.\_split\_child\_nodes(  
                document, rules, process\_rule.get("mode"), kwargs.get("embedding\_model\_instance")  
            )  
            if kwargs.get("preview"):  
                if len(child\_nodes) > dify\_config.CHILD\_CHUNKS\_PREVIEW\_NUMBER:  
                    child\_nodes = child\_nodes[: dify\_config.CHILD\_CHUNKS\_PREVIEW\_NUMBER]  
  
            document.children = child\_nodes  
            doc\_id = str(uuid.uuid4())  
            hash = helper.generate\_text\_hash(document.page\_content)  
            document.metadata["doc\_id"] = doc\_id  
            document.metadata["doc\_hash"] = hash  
            all\_documents.append(document)  
  
        return all\_documents  
  
    defload(self, dataset: Dataset, documents: list[Document], with\_keywords: bool = True, **kwargs):  
        # 将父子段落加载到向量数据库  
        if dataset.indexing\_technique == "high\_quality":  
            vector = Vector(dataset)  
            for document in documents:  
                child\_documents = document.children  
                if child\_documents:  
                    formatted\_child\_documents = [  
                        Document(**child\_document.model\_dump()) for child\_document in child\_documents  
                    ]  
                    vector.create(formatted\_child\_documents)  
  
    def\_split\_child\_nodes(  
        self,  
        document\_node: Document,  
        rules: Rule,  
        process\_rule\_mode: str,  
        embedding\_model\_instance: Optional[ModelInstance],  
    ) -> list[ChildDocument]:  
        # 将父段落分割为子段落  
        ifnot rules.subchunk\_segmentation:  
            raise ValueError("No subchunk segmentation found in rules.")  
        child\_splitter = self.\_get\_splitter(  
            processing\_rule\_mode=process\_rule\_mode,  
            max\_tokens=rules.subchunk\_segmentation.max\_tokens,  
            chunk\_overlap=rules.subchunk\_segmentation.chunk\_overlap,  
            
   
 separator
 =rules.subchunk\_segmentation.
   
 separator
 ,  
            embedding\_model\_instance=embedding\_model\_instance,  
        )  
        # 解析文档为子节点  
        child\_nodes = []  
        child\_documents = child\_splitter.split\_documents([document\_node])  
        for child\_document\_node in child\_documents:  
            if child\_document\_node.page\_content.strip():  
                doc\_id = str(uuid.uuid4())  
                hash = helper.generate\_text\_hash(child\_document\_node.page\_content)  
                child\_document = ChildDocument(  
                    page\_content=child\_document\_node.page\_content, metadata=document\_node.metadata  
                )  
                child\_document.metadata["doc\_id"] = doc\_id  
                child\_document.metadata["doc\_hash"] = hash  
                child\_page\_content = child\_document.page\_content  
                if child\_page\_content.startswith(".") or child\_page\_content.startswith("。"):  
                    child\_page\_content = child\_page\_content[1:].strip()  
                if len(child\_page\_content) > 0:  
                    child\_document.page\_content = child\_page\_content  
                    child\_nodes.append(child\_document)  
        return child\_nodes  

3.3 API 控制器实现

父子分段的API控制器主要包括ChildChunkAddApiChildChunkUpdateApi,定义在api/controllers/console/datasets/datasets\_segments.py文件中:

  
classChildChunkAddApi(Resource):  
    @setup\_required  
    @login\_required  
    @account\_initialization\_required  
    @cloud\_edition\_billing\_resource\_check("vector\_space")  
    @cloud\_edition\_billing\_knowledge\_limit\_check("add\_segment")  
    @cloud\_edition\_billing\_rate\_limit\_check("knowledge")  
    defpost(self, dataset\_id, document\_id, segment\_id):  
        # 添加子段落  
        # 代码省略...  
        try:  
            child\_chunk = SegmentService.create\_child\_chunk(args.get("content"), 
   
 segment
 , document, dataset)  
        
 
 except
  ChildChunkIndexingServiceError as e:  
            raise ChildChunkIndexingError(str(e))  
        return {"
 
 data
 ": marshal(child\_chunk, child\_chunk\_fields)}, 200  
  
    @setup\_required  
    @login\_required  
    @account\_initialization\_required  
    defget(self, dataset\_id, document\_id, segment\_id):  
        # 获取子段落列表  
        # 代码省略...  
        child\_chunks = SegmentService.get\_child\_chunks(segment\_id, document\_id, dataset\_id, page, limit, 
   
 keyword
 )  
        return {  
            "
 
 data
 ": marshal(child\_chunks.items, child\_chunk\_fields),  
            "total": child\_chunks.total,  
            "total\_pages": child\_chunks.pages,  
            "page": page,  
            "limit": limit,  
        }, 200  
  
classChildChunkUpdateApi(Resource):  
    @setup\_required  
    @login\_required  
    @account\_initialization\_required  
    @cloud\_edition\_billing\_rate\_limit\_check("knowledge")  
    defdelete(self, dataset\_id, document\_id, segment\_id, child\_chunk\_id):  
        # 删除子段落  
        # 代码省略...  
        try:  
            SegmentService.delete\_child\_chunk(child\_chunk, dataset)  
        
 
 except
  ChildChunkDeleteIndexServiceError as e:  
            raise ChildChunkDeleteIndexError(str(e))  
        return {"result": "success"}, 204  
  
    @setup\_required  
    @login\_required  
    @account\_initialization\_required  
    @cloud\_edition\_billing\_resource\_check("vector\_space")  
    @cloud\_edition\_billing\_rate\_limit\_check("knowledge")  
    defpatch(self, dataset\_id, document\_id, segment\_id, child\_chunk\_id):  
        # 更新子段落  
        # 代码省略...  
        try:  
            child\_chunk = SegmentService.update\_child\_chunk(  
                args.get("content"), child\_chunk, 
   
 segment
 , document, dataset  
            )  
        
 
 except
  ChildChunkIndexingServiceError as e:  
            raise ChildChunkIndexingError(str(e))  
        return {"
 
 data
 ": marshal(child\_chunk, child\_chunk\_fields)}, 200  

3.4 服务层实现

子段落的服务层主要在SegmentService类中实现,定义在api/services/dataset\_service.py文件中:

  
@classmethod  
defcreate\_child\_chunk(  
    cls, content: str, 
 
 segment
 : DocumentSegment, document: Document, dataset: Dataset  
) -> ChildChunk:  
    lock\_name = "add\_child\_lock\_{}".format(
   
 segment
 .id)  
    with redis\_client.lock(lock\_name, timeout=20):  
        index\_node\_id = str(uuid.uuid4())  
        index\_node\_hash = helper.generate\_text\_hash(content)  
        child\_chunk\_count = (  
            db.session.query(ChildChunk)  
            .filter(  
                ChildChunk.tenant\_id == current\_user.current\_tenant\_id,  
                ChildChunk.dataset\_id == dataset.id,  
                ChildChunk.document\_id == document.id,  
                ChildChunk.segment\_id == 
   
 segment
 .id,  
            )  
            .count()  
        )  
        max\_position = (  
            db.session.query(func.max(ChildChunk.position))  
            .filter(  
                ChildChunk.tenant\_id == current\_user.current\_tenant\_id,  
                ChildChunk.dataset\_id == dataset.id,  
                ChildChunk.document\_id == document.id,  
                ChildChunk.segment\_id == 
   
 segment
 .id,  
            )  
            .
   
 scalar
 ()  
        )  
        child\_chunk = ChildChunk(  
            tenant\_id=current\_user.current\_tenant\_id,  
            dataset\_id=dataset.id,  
            document\_id=document.id,  
            segment\_id=
   
 segment
 .id,  
            position=max\_position + 1if max\_position else1,  
            index\_node\_id=index\_node\_id,  
            index\_node\_hash=index\_node\_hash,  
            content=content,  
            word\_count=len(content),  
            type="customized",  
            created\_by=current\_user.id,  
        )  
        db.session.add(child\_chunk)  
        # 保存向量索引  
        try:  
            VectorService.create\_child\_chunk\_vector(child\_chunk, dataset)  
        
 
 except
  Exception as e:  
            logging.exception("create child chunk 
 
 index
  failed")  
            db.session.rollback()  
            raise ChildChunkIndexingError(str(e))  
        db.session.commit()  
  
        return child\_chunk  

3.5 向量服务实现

子段落的向量服务主要在VectorService类中实现,定义在api/services/vector\_service.py文件中:

  
@classmethod  
defcreate\_child\_chunk\_vector(cls, child\_segment: ChildChunk, dataset: Dataset):  
    child\_document = Document(  
        page\_content=child\_segment.content,  
        metadata={  
            "doc\_id": child\_segment.index\_node\_id,  
            "doc\_hash": child\_segment.index\_node\_hash,  
            "document\_id": child\_segment.document\_id,  
            "dataset\_id": child\_segment.dataset\_id,  
        },  
    )  
    if dataset.indexing\_technique == "high\_quality":  
        # 保存向量索引  
        vector = Vector(dataset=dataset)  
        vector.add\_texts([child\_document], duplicate\_check=True)  

四、使用场景与最佳实践

4.1 适用场景

父子分段模式特别适合以下场景:

  1. 长文档处理 :对于长篇文档,父子分段可以保持上下文的完整性。
  2. 结构化文档 :对于有明确段落结构的文档,如学术论文、技术文档等。
  3. 需要精确检索的场景 :当用户查询需要精确匹配特定内容,同时又需要提供足够上下文的场景。
  4. 多语言文档 :对于包含多种语言的文档,父子分段可以更好地处理语言边界。

4.2 配置建议

在 Dify 中配置父子分段模式时,可以参考以下建议:

  1. 父段落大小 :建议设置为 300-500 个token,这样可以包含足够的上下文信息。
  2. 子段落大小 :建议设置为 100-200 个token,这样可以保证检索的精确性。
  3. 分隔符选择
  • 父段落分隔符:建议使用 \n\n (两个换行符),适合分割段落。
  • 子段落分隔符:建议使用 \n (单个换行符),适合在段落内部进一步分割。

4.3 性能优化

为了优化父子分段模式的性能,可以考虑以下几点:

  1. 合理设置子段落数量 :过多的子段落会增加索引大小和检索时间,建议根据文档特性合理设置。
  2. 使用高质量的嵌入模型 :选择性能更好的嵌入模型可以提高检索质量。
  3. 定期重新生成子段落 :当文档内容有重大更新时,可以考虑重新生成子段落。

五、实际应用示例

5.1 技术文档检索

假设我们有一份技术文档,需要实现精确的检索功能。使用父子分段模式的步骤如下:

  1. 上传文档 :将技术文档上传到 Dify 平台。
  2. 选择分段模式 :在数据集设置中选择"Parent-Child"分段模式。
  3. 配置分段参数
  • 父段落模式:选择"段落模式"
  • 父段落分隔符: \n\n
  • 父段落大小:400 tokens
  • 子段落分隔符: \n
  • 子段落大小:150 tokens
  • 处理文档 :系统会自动将文档分割为父段落和子段落,并创建向量索引。

  • 检索测试 :使用不同的查询测试检索效果,观察返回的父段落是否提供了足够的上下文信息。

5.2 自定义子段落

在某些情况下,自动生成的子段落可能不够理想,这时可以手动添加或编辑子段落:

  1. 查看父段落 :在文档详情页面查看已生成的父段落。
  2. 添加子段落 :点击"添加子段落"按钮,输入自定义的子段落内容。
  3. 编辑子段落 :选择已有的子段落,点击编辑按钮修改内容。
  4. 删除子段落 :选择不需要的子段落,点击删除按钮移除。
  5. 重新生成 :如果需要重新生成所有子段落,可以点击"重新生成子段落"按钮。

六、总结与展望

6.1 技术总结

Dify 的父子分段模式是一种高级的文档处理策略,通过将文档分为父段落和子段落两个层次,实现了精确检索和完整上下文的平衡,为用户提供了更精确、更全面的文档检索体验。其核心实现包括:

  1. 数据模型设计 :使用 DocumentSegmentChildChunk 两个模型分别表示父段落和子段落。
  2. 索引处理器 :通过 ParentChildIndexProcessor 实现文档的提取、转换和加载。
  3. API控制器 :提供了添加、更新、删除子段落的API接口。
  4. 向量服务 :负责创建和管理子段落的向量表示。
  5. 前端实现 :提供了友好的用户界面,方便用户管理子段落。

参考资料

https://github.com/langgenius/dify (v1.4.1)

https://docs.dify.ai/zh-hans/guides/knowledge-base/create-knowledge-and-upload-documents/chunking-and-cleaning-text

七、推荐阅读

👆👆👆欢迎关注,一起进步👆👆👆

欢迎留言讨论哈

🧐点赞、分享、推荐 ,一键三连,养成习惯👍

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
云原生机器学习系统落地和实践
机器学习在字节跳动有着丰富业务场景:推广搜、CV/NLP/Speech 等。业务规模的不断增大对机器学习系统从用户体验、训练效率、编排调度、资源利用等方面也提出了新的挑战,而 Kubernetes 云原生理念的提出正是为了应对这些挑战。本次分享将主要介绍字节跳动机器学习系统云原生化的落地和实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论