从零开始学 Dify - Dify 的 RAG 系统如何有效地处理和检索大量文档?

技术

一、整体架构

1.1 架构概述

Dify 的 RAG(检索增强生成)架构是一个完整的文档处理、索引和检索系统,旨在提高大语言模型生成内容的准确性和相关性。该架构由三个主要模块组成:文档处理模块、向量化与索引模块、检索与重排模块。

picture.image

整个系统的数据流如下:

  1. 用户上传文档或提供URL
  2. 文档处理模块提取文本内容并进行清洗
  3. 文本被分割成适当大小的段落(chunks)
  4. 向量化模块将这些段落转换为向量表示并存储在向量数据库中
  5. 同时创建关键词索引以支持混合搜索
  6. 用户查询时,检索模块根据配置的检索方法找到相关段落
  7. 重排模块对检索结果进行排序优化
  8. 最终将优化后的上下文提供给大语言模型生成回答

picture.image

1.2 数据模型关系

Dify RAG 系统的核心数据模型包括:

  • Dataset :知识库,包含多个文档,是 RAG 的基本单位
  • Document :文档,包含元数据和处理规则
  • DocumentSegment :文档分段,是实际被索引和检索的最小单位
  • DatasetKeywordTable :关键词表,用于支持关键词搜索

这些实体之间的关系是:一个 Dataset 包含多个 Document,一个 Document 包含多个 DocumentSegment。Dataset 通过index_struct 字段存储向量数据库的配置信息。

picture.image

二、文档处理模块

2.1 设计思路

文档处理模块负责将各种格式的文档转换为可被索引的文本段落。该模块采用 ETL(提取-转换-加载)模式设计,具有高度的可扩展性和灵活性。

主要功能包括:

  • 支持多种文档格式的文本提取
  • 文本清洗和预处理
  • 文本分段(chunking)
  • 元数据提取和管理

2.1 文档处理架构

picture.image

2.3 核心组件

ExtractProcessor

ExtractProcessor是文档处理的入口,负责根据文档类型选择合适的提取器进行文本提取。它支持多种文档格式,包括:

  • 文本文件(TXT)
  • PDF文档
  • Word文档(DOCX、DOC)
  • Excel表格(XLSX、XLS)
  • PowerPoint演示文稿(PPTX、PPT)
  • HTML网页
  • Markdown文档
  • 电子邮件(EML、MSG)
  • 电子书(EPUB)
  • XML文件
  • CSV数据
  • Notion导出文件

提取器采用策略模式设计,可以轻松扩展以支持新的文档格式。

ExtractProcessor 类实现
  
class ExtractProcessor:  
    @classmethod  
    def extract(cls, extract\_setting: ExtractSetting, is\_automatic: bool = False,  
                file\_path: str = None) -> list[Document]:  
        # 根据数据源类型和文件类型选择合适的提取器  
        if extract\_setting.datasource\_type == DatasourceType.FILE.value:  
            # 根据文件扩展名选择不同的提取器  
            if file\_extension == '.pdf':  
                extractor = PdfExtractor(file\_path)  
            elif file\_extension in ['.md', '.markdown']:  
                extractor = UnstructuredMarkdownExtractor(file\_path) \  
                    if is\_automatic else MarkdownExtractor(file\_path)  
            # ... 其他文件类型的处理  
              
            return extractor.extract()  

文档提取流程

picture.image

TextSplitter

TextSplitter负责将长文本分割成适当大小的段落,是 RAG 系统性能的关键组件。它提供以下功能:

  • 基于不同分隔符的文本分割
  • 支持设置最大 token 数量
  • 支持段落重叠以保持上下文连贯性
  • 支持不同的分词器(Tiktoken、HuggingFace)进行准确的 token 计算

分割策略可以通过处理规则进行配置,允许用户根据不同类型的文档调整分割参数。

TextSplitter 实现类
  
class TextSplitter(BaseDocument
 
 Transformer
 , ABC):  
    def \_\_init\_\_(  
            self,  
            chunk\_size: int = 4000,  
            chunk\_overlap: int = 200,  
            length\_function: Callable[[str], int] = len,  
            keep\_
 
 separator
 : bool = False,  
            add\_start\_index: bool = False,  
    ) -> None:  
        # 初始化分割器参数  
        self.\_chunk\_size = chunk\_size  
        self.\_chunk\_overlap = chunk\_overlap  
        self.\_length\_function = length\_function  
        self.\_keep\_separator = keep\_separator  
        self.\_add\_start\_index = add\_start\_index  
  
    @abstractmethod  
    def split\_text(self, text: str) -> list[str]:  
        """Split text into multiple components."""  
          
    def create\_documents(  
            self, texts: list[str], metadatas: Optional[list[dict]] = None  
    ) -> list[Document]:  
        # 从分割后的文本创建文档对象  

文档切块是RAG中的关键步骤,Dify 使用TextSplitter类及其子类来实现不同的切块策略:

  • FixedRecursiveCharacterTextSplitter :固定大小的递归字符分割
  • EnhanceRecursiveCharacterTextSplitter :增强的递归字符分割

切块后的文档片段会保留原始文档的元数据,并添加唯一标识符(doc_id)和内容哈希值(doc_hash)。

文本分割流程

picture.image

TextCleaner

TextCleaner负责文本清洗,去除不必要的格式和噪声,提高索引和检索质量。清洗操作包括:

  • 移除多余空格
  • 去除URL和电子邮件(可选)
  • 标准化文本格式
  • 处理特殊字符

三、向量化与索引模块

3.1 设计思路

向量化与索引模块负责将文本段落转换为向量表示并存储在向量数据库中,同时创建关键词索引以支持混合搜索。该模块采用工厂模式和适配器模式设计,支持多种向量数据库和嵌入模型。

主要功能包括:

  • 文本向量化(嵌入)
  • 向量存储和索引
  • 关键词提取和索引
  • 缓存管理

3.2 向量化流程图

picture.image

3.3 核心组件

嵌入缓存(CacheEmbedding)

Dify使用CacheEmbedding类来管理文本嵌入过程,它具有缓存功能,可以避免重复计算嵌入向量:

  1. 对于每个文本块,首先计算其哈希值
  2. 检查数据库中是否已存在该哈希值对应的嵌入向量
  3. 如果存在,直接使用缓存的向量
  4. 如果不存在,调用嵌入模型生成向量,并将结果存入缓存
  
class CacheEmbedding(Embeddings):  
    def \_\_init\_\_(self, model\_instance: ModelInstance, user: Optional[str] = None) -> None:  
        self.\_model\_instance = model\_instance  
        self.\_user = user  
  
    def embed\_documents(self, texts: list[str]) -> list[list[
 
 float
 ]]:  
        # 使用文档嵌入缓存或存储(如果不存在)  
        text\_embeddings = [Nonefor \_ in range(len(texts))]  
        embedding\_queue\_indices = []  
          
        # 检查缓存中是否存在嵌入  
        for i, text in enumerate(texts):  
            hash = helper.generate\_text\_hash(text)  
            embedding = db.session.query(Embedding).filter\_by(  
                model\_name=self.\_model\_instance.model,  
                hash=hash,  
                provider\_name=self.\_model\_instance.provider  
            ).first()  
              
            if embedding:  
                text\_embeddings[i] = embedding.get\_embedding()  
            else:  
                embedding\_queue\_indices.append(i)  
                  
        # 处理未缓存的嵌入  
        if embedding\_queue\_indices:  
            # 生成嵌入并缓存  

嵌入缓存流程

picture.image

向量工厂(VectorFactory)

Dify 支持多种向量数据库,通过工厂模式实现:

  
class Vector:  
    def \_\_init\_\_(self, dataset: Dataset, attributes: list = None):  
        if attributes isNone:  
            attributes = ['doc\_id', 'dataset\_id', 'document\_id', 'doc\_hash']  
        self.\_dataset = dataset  
        self.\_embeddings = self.\_get\_embeddings()  
        self.\_attributes = attributes  
        self.\_vector\_processor = self.\_init\_vector()  
  
    def \_init\_vector(self) -> BaseVector:  
        vector\_type = dify\_config.VECTOR\_STORE  
        if self.\_dataset.index\_struct\_dict:  
            vector\_type = self.\_dataset.index\_struct\_dict['type']  
  
        vector\_factory\_cls = self.get\_vector\_factory(vector\_type)  
        return vector\_factory\_cls().init\_vector(self.\_dataset, self.\_attributes, self.\_embeddings)  
  
    @staticmethod  
    def get\_vector\_factory(vector\_type: str) -> type[AbstractVectorFactory]:  
        match vector\_type:  
            case VectorType.CHROMA:  
                
 
 from
  core.rag.datasource.vdb.chroma.chroma\_vector import ChromaVectorFactory  
                return ChromaVectorFactory  
            # ... 其他向量数据库的支持  

向量工厂模式

picture.image

支持的向量数据库包括:

  • Chroma
  • Milvus
  • MyScale
  • PGVector
  • Qdrant
  • Relyt
  • Elasticsearch
  • TiDB Vector
  • Weaviate
  • Tencent
  • Oracle
  • OpenSearch
  • AnalyticDB

索引处理器(IndexingRunner)

IndexingRunner负责协调整个索引过程,包括文档提取、转换和加载。它实现了完整的 ETL 流程:

  • Extract:从原始文档中提取文本
  • Transform:清洗和分割文本
  • Load:将处理后的段落加载到向量数据库和关键词索引中
  
class IndexingRunner:  
    def \_\_init\_\_(self, dataset, document\_id, document\_model, document\_content, document\_metadata, tenant\_id, user\_id):  
        # 初始化  
          
    def run(self):  
        # 1. 提取文本  
        documents = self.extract()  
          
        # 2. 转换(分割和清洗)  
        
   
 segments
  = self.transform(documents)  
          
        # 3. 保存段落  
        self.save\_segments(
   
 segments
 )  
          
        # 4. 加载到索引  
        self.load()  

索引处理流程

picture.image

段落索引处理器(ParagraphIndexProcessor)

段落索引处理器实现了提取、转换、加载和检索方法:

  
class ParagraphIndexProcessor(IndexProcessorBase):  
    def extract(self, extract\_setting: ExtractSetting) -> list[Document]:  
        # 提取文档  
        return ExtractProcessor.extract(extract\_setting)  
          
    def transform(self, documents: list[Document]) -> list[Document]:  
        # 清洗和分割文本  
        cleaned\_documents = self.clean(documents)  
        return self.split(cleaned\_documents)  
          
    def load(self, dataset\_id: str, 
 
 segments
 : list[Document]) -> None:  
        # 创建向量索引和关键词索引  
        self.create\_vector\_index(dataset\_id, segments)  
        self.create\_keyword\_index(dataset\_id, segments)  
          
    def retrieve(self, dataset\_id: str, query: str, **kwargs) -> list[Document]:  
        # 调用检索服务  
        return
   
 Retrieval
 Service.retrieve(  
            retrival\_method=kwargs.get('
 
 retrieval
 \_method', 'semantic\_search'),  
            dataset\_id=dataset\_id,  
            query=query,  
            top\_k=kwargs.get('top\_k', 2),  
            score\_threshold=kwargs.get('score\_threshold', 0.0),  
            reranking\_model=kwargs.get('reranking\_model'),  
            reranking\_mode=kwargs.get('reranking\_mode', 'reranking\_model'),  
            weights=kwargs.get('weights')  
        )  

picture.image

Jieba关键词索引

Jieba组件负责从文本中提取关键词并建立索引,支持关键词搜索和混合搜索。它提供:

  • 关键词提取
  • TF-IDF权重计算
  • 关键词索引创建和更新

四、检索与重排模块

4.1 设计思路

检索与重排模块负责根据用户查询找到最相关的文本段落,并对结果进行优化排序。该模块支持多种检索方法和重排策略,可以根据不同场景进行配置。

主要功能包括:

  • 语义检索(向量相似度搜索)
  • 关键词检索(基于关键词匹配)
  • 全文检索(基于倒排索引)
  • 混合检索(结合多种检索方法)
  • 结果重排序(基于多种因素)

4.2 检索与重排流程图

picture.image

4.3 核心组件

检索方法(RetrievalMethod)

RetrievalMethod是一个枚举类,定义了系统支持的检索方法:

  • SEMANTIC_SEARCH:语义搜索,基于向量相似度
  • FULL_TEXT_SEARCH:全文搜索,基于倒排索引
  • HYBRID_SEARCH:混合搜索,结合语义和关键词
  
class 
 
 Retrieval
 Method(Enum):  
    SEMANTIC\_SEARCH = 'semantic\_search'  
    FULL\_TEXT\_SEARCH = 'full\_text\_search'  
    HYBRID\_SEARCH = 'hybrid\_search'  
  
    @staticmethod  
    def is\_support\_semantic\_search(retrieval\_method: str) -> bool:  
        return retrieval\_method in {
   
 Retrieval
 Method.SEMANTIC\_SEARCH.value, RetrievalMethod.HYBRID\_SEARCH.value}  
  
    @staticmethod  
    def is\_support\_fulltext\_search(
 
 retrieval
 \_method: str) -> bool:  
        return retrieval\_method in {RetrievalMethod.FULL\_TEXT\_SEARCH.value, RetrievalMethod.HYBRID\_SEARCH.value}  

检索方法流程图

picture.image

检索服务(RetrievalService)

Retrieval Service是检索功能的核心实现,负责根据配置的检索方法执行搜索并返回结果。它提供以下功能:

  • 支持多种检索方法
  • 多线程并行检索
  • 结果合并和排序
  • 支持重排序

主要方法包括:

  • retrieve :主检索方法,根据检索方法执行搜索
  • keyword\_search :关键词搜索实现
  • embedding\_search :向量相似度搜索实现
  • full\_text\_index\_search :全文索引搜索实现

检索服务协调不同的检索方法并处理结果:

  
class 
 
 Retrieval
 Service:  
    @classmethod  
    def retrieve(cls, retrival\_method: str, dataset\_id: str, query: str,  
                 top\_k: int, score\_threshold: Optional[
 
 float
 ] = .0,  
                 reranking\_model: Optional[dict] = None, reranking\_mode: Optional[str] = 'reranking\_model',  
                 weights: Optional[dict] = None):  
        # 获取数据集  
        dataset = db.session.query(Dataset).filter(Dataset.id == dataset\_id).first()  
        ifnot dataset or dataset.
   
 available
 \_document\_count == 0or dataset.
   
 available
 \_segment\_count == 0:  
            return []  
              
        all\_documents = []  
        threads = []  
        exceptions = []  
          
        # 关键词搜索  
        if retrival\_method == 'keyword\_search':  
            keyword\_thread = threading.Thread(target=
   
 Retrieval
 Service.keyword\_search, kwargs={...})  
            threads.append(keyword\_thread)  
            keyword\_thread.start()  
              
        # 语义搜索  
        if
   
 Retrieval
 Method.is\_support\_semantic\_search(retrival\_method):  
            embedding\_thread = threading.Thread(target=
   
 Retrieval
 Service.embedding\_search, kwargs={...})  
            threads.append(embedding\_thread)  
            embedding\_thread.start()  
              
        # 全文搜索  
        if
   
 Retrieval
 Method.is\_support\_fulltext\_search(retrival\_method):  
            full\_text\_index\_thread = threading.Thread(target=RetrievalService.full\_text\_index\_search, kwargs={...})  
            threads.append(full\_text\_index\_thread)  
            full\_text\_index\_thread.start()  
              
        # 等待所有线程完成  
        for thread in threads:  
            thread.join()  
              
        # 混合搜索需要后处理  
        if retrival\_method == 
   
 Retrieval
 Method.HYBRID\_SEARCH.value:  
            data\_post\_processor = DataPostProcessor(str(dataset.tenant\_id), reranking\_mode,  
                                                    reranking\_model, weights, False)  
            all\_documents = data\_post\_processor.invoke(  
                query=query,  
                documents=all\_documents,  
                score\_threshold=score\_threshold,  
                top\_n=top\_k  
            )  
              
        return all\_documents  

检索服务时序图

picture.image

权重重排(WeightRerankRunner)

WeightRerankRunner实现了基于权重的重排序策略,可以根据不同因素对检索结果进行优化排序。它考虑以下因素:

  • 向量相似度得分
  • 关键词匹配得分
  • 自定义权重配置

通过调整不同因素的权重,可以优化检索结果的相关性和质量。

权重重排结合了关键词得分和向量相似度得分:

  
class WeightRerankRunner:  
    def \_\_init\_\_(self, tenant\_id: str, weights: Weights) -> None:  
        self.tenant\_id = tenant\_id  
        self.weights = weights  
  
    def run(self, query: str, documents: list[Document], score\_threshold: Optional[float] = None,  
            top\_n: Optional[int] = None, user: Optional[str] = None) -> list[Document]:  
        # 去重  
        docs = []  
        doc\_id = []  
        unique\_documents = []  
        for document in documents:  
            if document.metadata['doc\_id'] notin doc\_id:  
                doc\_id.append(document.metadata['doc\_id'])  
                docs.append(document.page\_content)  
                unique\_documents.append(document)  
  
        documents = unique\_documents  
  
        # 计算关键词得分和向量得分  
        rerank\_documents = []  
        query\_scores = self.\_calculate\_keyword\_score(query, documents)  
        query\_vector\_scores = self.\_calculate\_cosine(self.tenant\_id, query, documents, self.weights.vector\_setting)  
          
        # 合并得分  
        for document, query\_score, query\_vector\_score in zip(documents, query\_scores, query\_vector\_scores):  
            score = self.weights.vector\_setting.vector\_weight * query\_vector\_score + \  
                    self.weights.keyword\_setting.keyword\_weight * query\_score  
            if score\_threshold and score < score\_threshold:  
                continue  
            document.metadata['score'] = score  
            rerank\_documents.append(document)  
              
        # 排序并返回结果  
        rerank\_documents = sorted(rerank\_documents, key=lambda x: x.metadata['score'], reverse=True)  
        return rerank\_documents[:top\_n] if top\_n else rerank\_documents  

权重重排流程图

picture.image

数据后处理器(DataPostProcessor)

DataPostProcessor负责对检索结果进行后处理,包括重排序、去重和格式化。它支持多种重排序策略,可以根据不同场景进行配置。

数据后处理器负责重排序和结果处理:

  
class DataPostProcessor:  
    def \_\_init\_\_(self, tenant\_id: str, reranking\_mode: str, reranking\_model: Optional[dict],  
                 weights: Optional[dict], enable\_reranking: bool = True):  
        # 初始化  
          
    def invoke(self, query: str, documents: list[Document], score\_threshold: Optional[float] = None,  
               top\_n: Optional[int] = None) -> list[Document]:  
        # 根据模式选择重排序方法  
        if self.enable\_reranking and self.reranking\_mode == RerankMode.RERANKING\_MODEL.value:  
            # 使用模型重排序  
            return self.rerank\_model\_runner.run(query, documents, score\_threshold, top\_n)  
        elif self.enable\_reranking and self.reranking\_mode == RerankMode.WEIGHT.value:  
            # 使用权重重排序  
            return self.weight\_rerank\_runner.run(query, documents, score\_threshold, top\_n)  
        else:  
            # 不重排序,直接返回  
            return documents[:top\_n] if top\_n else documents  

数据后处理流程图

picture.image

五、总结

Dify 的 RAG 系统是一个功能完整、灵活可扩展的检索增强生成框架,具有以下特点:

  1. 多格式文档支持 :支持 PDF、Word、Excel、Markdown、HTML、CSV 等多种文档格式。
  2. 灵活的文本分块 :支持多种分块策略,包括固定大小、基于分隔符和基于语义的分块。
  3. 高效的向量化 :实现了嵌入缓存机制,提高了向量化效率。
  4. 多样的存储选项 :支持 Chroma、Milvus、PGVector、Qdrant 等多种向量数据库。
  5. 多种检索方法 :支持语义搜索、全文搜索、关键词搜索和混合搜索。
  6. 高级重排序 :支持基于模型的重排序和基于权重的重排序。
  7. 并行处理 :使用多线程并行执行不同的检索方法,提高效率。
  8. 可扩展架构 :采用工厂模式和抽象基类,便于扩展新的功能。

通过这些组件的协同工作,Dify 的 RAG 系统能够有效地处理和检索大量文档,为大语言模型提供准确的上下文信息,从而生成更加准确和相关的回答。

参考资料

https://github.com/langgenius/dify (v0.6.3)

推荐阅读

👆👆👆欢迎关注,一起进步👆👆👆

欢迎留言讨论哈

🧐点赞、分享、推荐 ,一键三连,养成习惯👍‍

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
火山引擎AB测试总体经济影响
为充分了解火山引擎A/B测试平台为企业带来的潜在投资回报,火山引擎委托Forrester Consulting使用总 体经济影响(TEI)模型进行对其A/B测试产品潜在的投资回报率(ROI)进行评估分析。该研究的目的是为了给读者提供火山引擎A/B测试产品带来潜在财务影响评估的参考。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论