从"答非所问"到"精准回答":30+架构图解密RAG系统优化的核心秘籍

In the AI era, RAG (Retrieval-Augmented Generation) systems have become a core technology for enterprise knowledge management. Yet when facing massive document collections, how do you pinpoint the information a user actually needs? How do you stop the AI from answering the wrong question? This is the "needle in a haystack" challenge every technical team faces.

What problems does this article try to solve?

We use financial documents as the running example. A caveat: I write as an AI enthusiast rather than a professional, so corrections are welcome 😊. The article is fairly long, so bookmarking it is recommended. The code samples only illustrate the logic (focus on the comments and method names); real implementations are more complex.


Financial document retrieval challenges

  • How do you quickly locate key information in annual reports, quarterly reports, and audit reports?
  • How do you process and intelligently retrieve the three primary financial statements in a unified way?
  • How do you manage information about multiple related entities (issuer, guarantor, subsidiaries)?

Retrieval quality challenges

  • What if user queries don't match the domain terminology?
  • How do you improve recall without sacrificing precision?
  • How do you diagnose and visually validate the quality of vector search results?

System architecture questions

  • Should vector database collections be unified or separated?
  • How do you design efficient indexes and metadata structures?
  • How do you implement query optimization and re-ranking?

RAG System Overview

RAG (Retrieval-Augmented Generation) is an AI architecture that combines retrieval with generation: it retrieves relevant information from an external knowledge base to strengthen a large language model's answers.
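
The loop can be sketched in a few lines (a toy, framework-free illustration with hand-made two-dimensional "embeddings"; the `DOCS`, `retrieve`, and `build_prompt` names are ours, and a real system would use an embedding model, a vector database, and an LLM call):

```python
from math import sqrt

# Toy "embeddings": in practice these come from an embedding model.
DOCS = {
    "doc_a": ([1.0, 0.0], "2023年度营业收入为120亿元"),
    "doc_b": ([0.0, 1.0], "公司控股股东为XX集团"),
}

def cosine(a, b):
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))

def retrieve(query_vec, top_k=1):
    """Retrieval step: rank documents by similarity to the query vector."""
    ranked = sorted(DOCS.values(), key=lambda d: cosine(query_vec, d[0]), reverse=True)
    return [text for _, text in ranked[:top_k]]

def build_prompt(query, query_vec):
    """Augmentation step: prepend the retrieved context to the LLM prompt."""
    context = "\n".join(retrieve(query_vec))
    return f"参考资料:\n{context}\n\n问题:{query}"
```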


The Needle-in-a-Haystack Problem

The essence of the problem

The "needle in a haystack" problem is finding the exact passages relevant to a query within a massive document corpus. The main challenges:

  1. Semantic matching difficulty: user queries and document content are phrased differently
  2. Low information density: relevant information is sparsely scattered among large amounts of irrelevant content
  3. Context dependence: key information may be spread across multiple document fragments
  4. Limited query expressiveness: users struggle to state their needs in precise domain terminology

Solution approach


Building the Vector Database Index

Collection architecture: the design decision

In a RAG system, the collection design of the vector database is a key architectural decision. We must choose between a unified collection and separate collections, and the choice directly affects performance, maintainability, and scalability.

Unified vs. separate collections


Core architecture of the unified collection design

We chose the unified collection design, based mainly on the following analysis:

1. Architectural simplicity

  • A single collection reduces system complexity
  • Unified indexing and caching strategies
  • Simpler backup and maintenance
  • Lower operations cost

2. Cross-document-type retrieval

  • Relevant information can be retrieved across multiple document types at once
  • Supports combined analysis and comparison
  • Avoids the complexity of querying several collections
  • Provides richer context

3. Flexible business logic

  • Business-level document classification via metadata
  • New document types can be added dynamically
  • Complex business queries are easier to express
  • Adapts to changing business requirements
Technical implementation of the unified collection

1. Partitioning strategy

  
# Time-based partitioning
time_partitions = [
    "2020_2021", "2022", "2023", "2024"
]

# Document-type-based partitioning
doc_type_partitions = [
    "annual_reports",     # annual reports
    "quarterly_reports",  # quarterly reports
    "audit_reports",      # audit reports
    "announcements"       # announcements
]

2. Rich metadata field design

  • doc_type: document type
  • company_name: company name
  • industry_category: industry classification
  • publish_date: publication date
  • report_period: reporting period
  • section_type: section type
  • financial_indicators: financial indicators
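
To illustrate how such fields support business-level filtering, here is a minimal in-memory sketch (the chunk records and the `filter_chunks` helper are made up for illustration; a real system pushes these conditions down to the vector database as a metadata filter):

```python
chunks = [
    {"doc_type": "annual_reports", "company_name": "公司A", "publish_date": "2023-04-28"},
    {"doc_type": "quarterly_reports", "company_name": "公司A", "publish_date": "2024-04-25"},
    {"doc_type": "annual_reports", "company_name": "公司B", "publish_date": "2023-04-20"},
]

def filter_chunks(chunks, **conditions):
    """Keep only chunks whose metadata matches every condition."""
    return [c for c in chunks if all(c.get(k) == v for k, v in conditions.items())]

# Only the surviving candidates go on to the vector-similarity stage.
candidates = filter_chunks(chunks, doc_type="annual_reports", company_name="公司A")
```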

3. Composite index optimization

  
# Composite indexes for common query combinations
composite_indexes = [
    ["company_name", "doc_type", "publish_date"],
    ["industry_category", "report_period"],
    ["doc_type", "section_type", "confidence_score"],
    ["publish_date", "source_reliability"]
]

Performance optimization

1. Pre-filtering

  • Metadata pre-filtering cuts down the vector computation
  • Queries are routed to the relevant partitions
  • Hot query results are cached
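
The partition-routing step can be sketched as a simple rule that maps years mentioned in the query onto the time partitions defined earlier (the `route_time_partitions` helper is a hypothetical illustration; production routing would combine several signals):

```python
import re

TIME_PARTITIONS = ["2020_2021", "2022", "2023", "2024"]

def route_time_partitions(query: str) -> list:
    """Pick the time partitions mentioned in the query; fall back to all of them."""
    hits = []
    for year in re.findall(r"20\d{2}", query):
        for partition in TIME_PARTITIONS:
            if year in partition and partition not in hits:
                hits.append(partition)
    return hits or TIME_PARTITIONS
```

A query such as「公司A 2023年年报的营业收入」is then searched only against the "2023" partition, while a query that mentions no year falls back to all partitions.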

2. Query optimization flow


When to use which design

When a unified collection is recommended

✅ You need combined retrieval across document types

✅ Document types are relatively stable and share similar metadata structures

✅ You want to simplify the architecture and keep maintenance cost low

✅ Business logic needs flexible document classification and filtering

✅ The team is small and operational complexity must stay low

When to consider separate collections

❌ Vector dimensions differ greatly across document types

❌ Access patterns differ completely across document types

❌ Each type needs its own indexing and tuning strategy

❌ Data volume is extreme and performance requirements are strict

❌ Strict data isolation is required

Design decision summary

Why we chose the unified collection design:

  1. Business fit: financial document retrieval frequently spans document types
  2. Simpler architecture: less system complexity, better maintainability
  3. Balanced performance: partitioning and indexing strategies keep queries fast
  4. Extensibility: new document types and business logic are easy to add
  5. Cost effectiveness: lower development and operations cost

The combination of a unified collection, smart partitioning, and rich metadata keeps the architecture simple while still delivering efficient queries, making it the best fit for this financial-document RAG system.

Financial Document Structure Analysis and Design Strategy

Financial documents have complex hierarchical structures and varied content organization. Understanding and handling these structures correctly is critical to the RAG system's retrieval quality.

Structures of the main financial document types

1. Annual/quarterly financial report structure


2. Audit report structure


3. Bond prospectus structure


Unified Handling of the Three Primary Financial Statements

The three primary statements (balance sheet, income statement, cash flow statement) recur across documents and need a unified processing strategy.

Statement processing pipeline


Data structure design layers


1. Standardized data structures

The following code defines standardized data structures for financial statements: enums keep the data consistent, and dataclasses simplify metadata management:

  
import re
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum

class FinancialStatementType(Enum):
    """Financial statement type"""
    BALANCE_SHEET = "balance_sheet"          # balance sheet
    INCOME_STATEMENT = "income_statement"    # income statement
    CASH_FLOW = "cash_flow"                  # cash flow statement
    EQUITY_CHANGE = "equity_change"          # statement of changes in equity

class ReportScope(Enum):
    """Report scope"""
    CONSOLIDATED = "consolidated"  # consolidated statements
    PARENT_ONLY = "parent_only"    # parent-company statements

@dataclass
class FinancialStatementMetadata:
    """Financial statement metadata"""
    statement_type: FinancialStatementType
    report_scope: ReportScope
    period_end_date: str
    period_type: str  # annual / semi-annual / quarterly

    # Source document info (required fields must precede defaulted ones)
    source_document_type: str  # annual report / quarterly report / prospectus / audit report

    # Entity info
    entity_name: str
    entity_code: str
    entity_type: str  # issuer / guarantor / subsidiary

    currency: str = "CNY"
    unit: str = "元"  # 元 / 千元 / 万元 / 百万元
    source_page: Optional[int] = None
    source_section: Optional[str] = None

    # Data quality
    is_audited: bool = False
    auditor_name: Optional[str] = None
    audit_opinion: Optional[str] = None

class FinancialStatementProcessor:
    """Financial statement processor"""

    def __init__(self):
        self.statement_templates = self._load_statement_templates()
        self.account_mapping = self._load_account_mapping()

    def extract_financial_statements(self, text: str, doc_metadata: Dict) -> List[Dict]:
        """Extract financial statement data"""
        statements = []

        # 1. Identify statement types and locations
        statement_sections = self._identify_statement_sections(text)

        for section in statement_sections:
            # 2. Extract statement metadata
            stmt_metadata = self._extract_statement_metadata(section, doc_metadata)

            # 3. Extract the financial data
            financial_data = self._extract_financial_data(section, stmt_metadata)

            # 4. Normalize account names
            normalized_data = self._normalize_account_names(financial_data)

            # 5. Assemble the complete statement record
            statement_record = {
                "metadata": stmt_metadata,
                "financial_data": normalized_data,
                "raw_text": section["text"],
                "embedding_chunks": self._create_embedding_chunks(section, stmt_metadata)
            }

            statements.append(statement_record)

        return statements

    def _identify_statement_sections(self, text: str) -> List[Dict]:
        """Identify financial statement sections"""
        sections = []

        # Balance sheet patterns
        balance_sheet_patterns = [
            r"(合并)?资产负债表.*?(?=利润表|现金流量表|所有者权益|$)",
            r"(合并)?资产负债表.*?(?=\n\s*\n.*?表|$)"
        ]

        # Income statement patterns
        income_patterns = [
            r"(合并)?利润表.*?(?=现金流量表|资产负债表|所有者权益|$)",
            r"(合并)?利润及利润分配表.*?(?=现金流量表|资产负债表|$)"
        ]

        # Cash flow statement patterns
        cashflow_patterns = [
            r"(合并)?现金流量表.*?(?=资产负债表|利润表|所有者权益|$)",
            r"(合并)?现金流量表.*?(?=\n\s*\n.*?表|$)"
        ]

        # Run the pattern matching
        for pattern_type, patterns in [
            ("balance_sheet", balance_sheet_patterns),
            ("income_statement", income_patterns),
            ("cash_flow", cashflow_patterns)
        ]:
            for pattern in patterns:
                matches = re.finditer(pattern, text, re.DOTALL | re.IGNORECASE)
                for match in matches:
                    sections.append({
                        "type": pattern_type,
                        "text": match.group(),
                        "start_pos": match.start(),
                        "end_pos": match.end()
                    })

        return sections

    def _normalize_account_names(self, financial_data: Dict) -> Dict:
        """Normalize accounting line-item names"""
        normalized = {}

        for account, value in financial_data.items():
            # Use the predefined account mapping table
            standard_name = self.account_mapping.get(account, account)
            normalized[standard_name] = value

        return normalized

    def _create_embedding_chunks(self, section: Dict, metadata: FinancialStatementMetadata) -> List[Dict]:
        """Create embedding chunks for a financial statement"""
        chunks = []

        # Chunk by statement line item
        text_lines = section["text"].split("\n")
        current_chunk = []

        for line in text_lines:
            if self._is_account_line(line):
                if current_chunk:
                    # Close out the current chunk
                    chunk_text = "\n".join(current_chunk)
                    chunks.append({
                        "text": chunk_text,
                        "metadata": {
                            "statement_type": metadata.statement_type.value,
                            "report_scope": metadata.report_scope.value,
                            "entity_name": metadata.entity_name,
                            "period_end_date": metadata.period_end_date,
                            "chunk_type": "financial_statement_item"
                        }
                    })
                    current_chunk = []

            current_chunk.append(line)

        # Handle the final chunk
        if current_chunk:
            chunk_text = "\n".join(current_chunk)
            chunks.append({
                "text": chunk_text,
                "metadata": {
                    "statement_type": metadata.statement_type.value,
                    "report_scope": metadata.report_scope.value,
                    "entity_name": metadata.entity_name,
                    "period_end_date": metadata.period_end_date,
                    "chunk_type": "financial_statement_item"
                }
            })

        return chunks
Multi-Entity Association Design

A bond prospectus involves multiple entities (issuer, guarantor, subsidiaries, etc.), which calls for an effective association mechanism.

Multi-entity association architecture


Entity relationship data model


1. Entity relationship modeling

The following code extracts multi-entity information and their associations; enums for entity and relationship types keep the data consistent and complete:

  
import re
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional

class EntityType(Enum):
    """Entity type"""
    ISSUER = "issuer"                  # issuer
    GUARANTOR = "guarantor"            # guarantor
    CONTROLLING_SHAREHOLDER = "controlling_shareholder"  # controlling shareholder
    ACTUAL_CONTROLLER = "actual_controller"              # actual controller
    SUBSIDIARY = "subsidiary"          # subsidiary
    ASSOCIATE = "associate"            # associate company
    JOINT_VENTURE = "joint_venture"    # joint venture

class RelationshipType(Enum):
    """Relationship type"""
    PARENT_SUBSIDIARY = "parent_subsidiary"          # parent-subsidiary
    GUARANTEE = "guarantee"                          # guarantee
    CONTROL = "control"                              # control
    SIGNIFICANT_INFLUENCE = "significant_influence"  # significant influence
    RELATED_PARTY = "related_party"                  # related party

@dataclass
class EntityInfo:
    """Entity information"""
    entity_id: str
    entity_name: str
    entity_code: Optional[str]
    entity_type: EntityType

    # Basic info
    legal_representative: Optional[str] = None
    registered_capital: Optional[str] = None
    business_scope: Optional[str] = None
    registration_address: Optional[str] = None

    # Position info within the document
    mention_positions: List[Dict] = None  # positions where the entity is mentioned
    detail_sections: List[Dict] = None    # sections holding its detailed information

    def __post_init__(self):
        if self.mention_positions is None:
            self.mention_positions = []
        if self.detail_sections is None:
            self.detail_sections = []

@dataclass
class EntityRelationship:
    """Entity relationship"""
    from_entity_id: str
    to_entity_id: str
    relationship_type: RelationshipType
    relationship_description: str

    # Relationship strength and distance
    relationship_strength: float = 1.0  # 0-1, where 1 is the strongest relationship
    text_distance: int = 0              # textual distance within the document

    # Validity of the relationship
    is_active: bool = True
    effective_date: Optional[str] = None
    expiry_date: Optional[str] = None
  
class MultiEntityProcessor:
    """Multi-entity processor"""

    def __init__(self):
        self.entity_patterns = self._load_entity_patterns()
        self.relationship_patterns = self._load_relationship_patterns()

    def extract_entities_and_relationships(self, text: str, doc_metadata: Dict) -> Dict:
        """Extract entities and their relationships"""
        result = {
            "entities": [],
            "relationships": [],
            "entity_chunks": []
        }

        # 1. Identify all entities
        entities = self._identify_entities(text, doc_metadata)

        # 2. Establish relationships between entities
        relationships = self._extract_relationships(text, entities)

        # 3. Create entity-centric text chunks
        entity_chunks = self._create_entity_chunks(text, entities, relationships)

        result["entities"] = entities
        result["relationships"] = relationships
        result["entity_chunks"] = entity_chunks

        return result

    def _identify_entities(self, text: str, doc_metadata: Dict) -> List[EntityInfo]:
        """Identify all entities in the document"""
        entities = []

        # Issuer patterns
        issuer_patterns = [
            r"发行人[::](.*?)(?=\n|,|。)",
            r"本公司[::]?(.*?)(?=\n|,|。)",
            r"(.*?)(?:股份)?(?:有限)?公司(?=作为发行人|发行)"
        ]

        # Guarantor patterns
        guarantor_patterns = [
            r"担保人[::](.*?)(?=\n|,|。)",
            r"保证人[::](.*?)(?=\n|,|。)",
            r"(.*?)(?:股份)?(?:有限)?公司(?=提供担保|作为担保人)"
        ]

        # Subsidiary patterns
        subsidiary_patterns = [
            r"子公司[::](.*?)(?=\n|,|。)",
            r"控股子公司[::](.*?)(?=\n|,|。)",
            r"全资子公司[::](.*?)(?=\n|,|。)"
        ]

        # Run entity recognition
        for entity_type, patterns in [
            (EntityType.ISSUER, issuer_patterns),
            (EntityType.GUARANTOR, guarantor_patterns),
            (EntityType.SUBSIDIARY, subsidiary_patterns)
        ]:
            for pattern in patterns:
                matches = re.finditer(pattern, text, re.IGNORECASE)
                for match in matches:
                    entity_name = match.group(1).strip() if match.groups() else match.group().strip()

                    entity = EntityInfo(
                        entity_id=f"{entity_type.value}_{len(entities)}",
                        entity_name=entity_name,
                        entity_code=None,
                        entity_type=entity_type
                    )

                    # Record the position within the document
                    entity.mention_positions.append({
                        "start_pos": match.start(),
                        "end_pos": match.end(),
                        "context": text[max(0, match.start()-100):match.end()+100]
                    })

                    entities.append(entity)

        return entities

    def _extract_relationships(self, text: str, entities: List[EntityInfo]) -> List[EntityRelationship]:
        """Extract relationships between entities"""
        relationships = []

        # Guarantee relationship patterns
        guarantee_patterns = [
            r"(.*?)(?:为|对)(.*?)(?:提供担保|承担保证责任)",
            r"(.*?)(?:担保|保证)(.*?)(?:债券|债务)"
        ]

        # Control relationship patterns
        control_patterns = [
            r"(.*?)(?:控股|持有)(.*?)(?:%|股份|股权)",
            r"(.*?)(?:为|是)(.*?)(?:的)?(?:控股股东|实际控制人)"
        ]

        for pattern_type, patterns in [
            (RelationshipType.GUARANTEE, guarantee_patterns),
            (RelationshipType.CONTROL, control_patterns)
        ]:
            for pattern in patterns:
                matches = re.finditer(pattern, text, re.IGNORECASE)
                for match in matches:
                    # Try to resolve the match to known entities
                    from_entity = self._match_entity_by_name(match.group(1), entities)
                    to_entity = self._match_entity_by_name(match.group(2), entities)

                    if from_entity and to_entity:
                        relationship = EntityRelationship(
                            from_entity_id=from_entity.entity_id,
                            to_entity_id=to_entity.entity_id,
                            relationship_type=pattern_type,
                            relationship_description=match.group(),
                            text_distance=match.end() - match.start()
                        )
                        relationships.append(relationship)

        return relationships

    def _create_entity_chunks(self, text: str, entities: List[EntityInfo],
                              relationships: List[EntityRelationship]) -> List[Dict]:
        """Create entity-centric text chunks"""
        chunks = []

        for entity in entities:
            # Build dedicated chunks for each entity
            entity_sections = self._find_entity_sections(text, entity)

            for section in entity_sections:
                # Compute association strength with other entities
                related_entities = self._find_related_entities_in_section(
                    section, entity, entities, relationships
                )

                chunk = {
                    "text": section["text"],
                    "metadata": {
                        "primary_entity_id": entity.entity_id,
                        "primary_entity_name": entity.entity_name,
                        "primary_entity_type": entity.entity_type.value,
                        "related_entities": related_entities,
                        "section_type": section["section_type"],
                        "chunk_type": "entity_information"
                    },
                    "embedding": None  # generated in a later step
                }

                chunks.append(chunk)

        return chunks

    def _find_entity_sections(self, text: str, entity: EntityInfo) -> List[Dict]:
        """Find document sections related to an entity"""
        sections = []

        # Search by entity name
        entity_name_pattern = re.escape(entity.entity_name)

        # Find paragraphs mentioning the entity
        paragraphs = text.split("\n\n")

        for i, paragraph in enumerate(paragraphs):
            if re.search(entity_name_pattern, paragraph, re.IGNORECASE):
                # Expand the context
                start_idx = max(0, i - 1)
                end_idx = min(len(paragraphs), i + 2)

                extended_text = "\n\n".join(paragraphs[start_idx:end_idx])

                section = {
                    "text": extended_text,
                    "section_type": self._classify_section_type(extended_text),
                    "paragraph_index": i,
                    "relevance_score": self._calculate_relevance_score(paragraph, entity)
                }

                sections.append(section)

        return sections

    def _calculate_relevance_score(self, text: str, entity: EntityInfo) -> float:
        """Score how relevant a text span is to an entity"""
        score = 0.0

        # Based on how often the entity name appears
        entity_mentions = len(re.findall(re.escape(entity.entity_name), text, re.IGNORECASE))
        score += entity_mentions * 0.3

        # Based on keywords typical for the entity type
        type_keywords = {
            EntityType.ISSUER: ["发行", "募集", "债券", "资金"],
            EntityType.GUARANTOR: ["担保", "保证", "责任", "承担"],
            EntityType.SUBSIDIARY: ["子公司", "控股", "投资", "股权"]
        }

        keywords = type_keywords.get(entity.entity_type, [])
        for keyword in keywords:
            if keyword in text:
                score += 0.2

        return min(score, 1.0)

Handling Long-Distance Associations

When an entity and its data sit far apart in a document, we apply the following strategies.

Solution architecture


Context expansion strategy


1. Context window expansion

The following code implements the context window expansion strategy, widening each chunk's context so that related information sitting farther away is still captured:

  
class ContextualChunkingStrategy:
    """Contextual chunking strategy"""

    def __init__(self, window_size: int = 3):
        self.window_size = window_size

    def create_contextual_chunks(self, text: str, entities: List[EntityInfo]) -> List[Dict]:
        """Create chunks that carry surrounding context"""
        chunks = []
        paragraphs = text.split("\n\n")

        for entity in entities:
            entity_paragraphs = self._find_entity_paragraphs(paragraphs, entity)

            for para_idx in entity_paragraphs:
                # Expand the context window
                start_idx = max(0, para_idx - self.window_size)
                end_idx = min(len(paragraphs), para_idx + self.window_size + 1)

                context_text = "\n\n".join(paragraphs[start_idx:end_idx])

                chunk = {
                    "text": context_text,
                    "primary_entity": entity.entity_name,
                    "context_range": (start_idx, end_idx),
                    "focus_paragraph": para_idx
                }

                chunks.append(chunk)

        return chunks

2. Reference chain tracking

  
class ReferenceTracker:
    """Reference chain tracker"""

    def track_entity_references(self, text: str, entities: List[EntityInfo]) -> Dict:
        """Track reference chains for each entity"""
        reference_chains = {}

        for entity in entities:
            # Find direct references
            direct_refs = self._find_direct_references(text, entity)

            # Find indirect references (pronouns, abbreviations, etc.)
            indirect_refs = self._find_indirect_references(text, entity, direct_refs)

            # Build the reference chain
            all_refs = direct_refs + indirect_refs
            all_refs.sort(key=lambda x: x["position"])

            reference_chains[entity.entity_id] = all_refs

        return reference_chains

    def _find_indirect_references(self, text: str, entity: EntityInfo,
                                  direct_refs: List[Dict]) -> List[Dict]:
        """Find indirect references"""
        indirect_refs = []

        # Pronoun-style reference patterns
        pronoun_patterns = ["本公司", "该公司", "发行人", "担保人", "公司"]

        for pattern in pronoun_patterns:
            matches = re.finditer(pattern, text)
            for match in matches:
                # Check whether it falls within an entity mention's context
                if self._is_in_entity_context(match.start(), direct_refs):
                    indirect_refs.append({
                        "text": match.group(),
                        "position": match.start(),
                        "type": "pronoun_reference",
                        "confidence": 0.7
                    })

        return indirect_refs

3. Semantic association enhancement

  
class SemanticAssociationEnhancer:
    """Semantic association enhancer"""

    def enhance_entity_associations(self, chunks: List[Dict],
                                    entities: List[EntityInfo]) -> List[Dict]:
        """Enhance entity associations"""
        enhanced_chunks = []

        for chunk in chunks:
            # Attach semantic association info to each chunk
            associations = self._calculate_semantic_associations(
                chunk["text"], entities
            )

            enhanced_chunk = chunk.copy()
            enhanced_chunk["semantic_associations"] = associations
            enhanced_chunk["association_summary"] = self._create_association_summary(associations)

            enhanced_chunks.append(enhanced_chunk)

        return enhanced_chunks

    def _calculate_semantic_associations(self, text: str,
                                         entities: List[EntityInfo]) -> Dict:
        """Compute semantic association scores"""
        associations = {}

        for entity in entities:
            # Semantic similarity between the text and the entity
            similarity_score = self._calculate_semantic_similarity(text, entity)

            # Keyword match score
            keyword_score = self._calculate_keyword_relevance(text, entity)

            # Combined score
            final_score = (similarity_score * 0.6 + keyword_score * 0.4)

            if final_score > 0.3:  # threshold filter
                associations[entity.entity_id] = {
                    "entity_name": entity.entity_name,
                    "entity_type": entity.entity_type.value,
                    "relevance_score": final_score,
                    "association_type": self._classify_association_type(text, entity)
                }

        return associations

Together, structured document analysis, standardized data processing, intelligent entity association, and context enhancement address the complex structures and long-distance associations found in financial documents.

Table Data Indexing and Query Optimization

Financial documents contain many tables (financial statements, statistics tables, etc.) that need dedicated indexing strategies and query optimizations to retrieve efficiently.

Structured table processing

Table processing pipeline


Table data model design


1. Table recognition and parsing

The following code recognizes tables automatically and parses them into structured form, using pattern matching and structural analysis:

  
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from enum import Enum
import pandas as pd
import re

class TableType(Enum):
    """Table type"""
    FINANCIAL_STATEMENT = "financial_statement"  # financial statement
    DATA_STATISTICS = "data_statistics"          # statistics table
    COMPARISON_TABLE = "comparison_table"        # comparison table
    SCHEDULE_TABLE = "schedule_table"            # schedule / breakdown table
    RATIO_ANALYSIS = "ratio_analysis"            # ratio analysis table
    CASH_FLOW_DETAIL = "cash_flow_detail"        # cash flow detail

class TableStructure(Enum):
    """Table structure type"""
    SIMPLE_GRID = "simple_grid"    # simple grid
    HIERARCHICAL = "hierarchical"  # hierarchical
    MULTI_HEADER = "multi_header"  # multi-level header
    PIVOT_TABLE = "pivot_table"    # pivot table
    CROSS_TAB = "cross_tab"        # cross tabulation

@dataclass
class TableMetadata:
    """Table metadata"""
    table_id: str
    table_type: TableType
    table_structure: TableStructure
    title: str

    # Location info
    page_number: Optional[int] = None
    section_name: Optional[str] = None
    start_position: Optional[int] = None
    end_position: Optional[int] = None

    # Table attributes
    row_count: int = 0
    column_count: int = 0
    has_header: bool = True
    has_index: bool = True

    # Data attributes
    period_info: Optional[str] = None  # reporting period
    currency: Optional[str] = None     # currency
    unit: Optional[str] = None         # numeric unit

    # Associations
    related_entities: List[str] = None  # related entities
    related_tables: List[str] = None    # related tables

    def __post_init__(self):
        if self.related_entities is None:
            self.related_entities = []
        if self.related_tables is None:
            self.related_tables = []
  
classTableExtractor:  
    """表格提取器"""  
      
    def\_\_init\_\_(self):  
        self.table\_patterns = self.\_load\_table\_patterns()  
        self.header\_patterns = self.\_load\_header\_patterns()  
      
    defextract\_tables\_from\_text(self, text: str, doc\_metadata: Dict) -> List[Dict]:  
        """从文本中提取表格"""  
        tables = []  
          
        # 1. 识别表格边界  
        table\_boundaries = self.\_identify\_table\_boundaries(text)  
          
        for boundary in table\_boundaries:  
            # 2. 提取表格内容  
            table\_content = text[boundary['start']:boundary['end']]  
              
            # 3. 解析表格结构  
            parsed\_table = self.\_parse\_table\_structure(table\_content)  
              
            # 4. 生成表格元数据  
            table\_metadata = self.\_generate\_table\_metadata(  
                parsed\_table, boundary, doc\_metadata  
            )  
              
            # 5. 创建结构化表格对象  
            structured\_table = {  
                "metadata": table\_metadata,  
                "raw\_content": table\_content,  
                "structured\_data": parsed\_table,  
                "index\_data": self.\_create\_index\_data(parsed\_table, table\_metadata)  
            }  
              
            tables.append(structured\_table)  
          
        return tables  
      
    def\_identify\_table\_boundaries(self, text: str) -> List[Dict]:  
        """识别表格边界"""  
        boundaries = []  
          
        # 表格标识模式  
        table\_start\_patterns = [  
            r"表\s*\d+[::].*?\n",                    # 表1:标题  
            r"(资产负债表|利润表|现金流量表).*?\n",      # 财务报表标题  
            r"单位[::].*?\n.*?\n",                   # 单位说明  
            r"\|.*?\|.*?\|.*?\n"                     # 表格分隔符  
        ]  
          
        # 表格结束模式  
        table\_end\_patterns = [  
            r"\n\s*\n(?!\s*\|)",                     # 空行后非表格内容  
            r"\n(?=表\s*\d+)",                       # 下一个表格开始  
            r"\n(?=[一二三四五六七八九十]+[、.])",        # 章节开始  
            r"\n(?=\d+[、.])",                       # 编号列表  
        ]  
          
        # 执行边界识别  
        for start\_pattern in table\_start\_patterns:  
            start\_matches = list(re.finditer(start\_pattern, text, re.MULTILINE))  
              
            for start\_match in start\_matches:  
                start\_pos = start\_match.start()  
                  
                # 查找表格结束位置  
                end\_pos = len(text)  
                remaining\_text = text[start\_match.end():]  
                  
                for end\_pattern in table\_end\_patterns:  
                    end\_match = re.search(end\_pattern, remaining\_text)  
                    if end\_match:  
                        end\_pos = start\_match.end() + end\_match.start()  
                        break  
                  
                boundaries.append({  
                    "start": start\_pos,  
                    "end": end\_pos,  
                    "confidence": self.\_calculate\_table\_confidence(  
                        text[start\_pos:end\_pos]  
                    )  
                })  
          
        # 按置信度排序并去重  
        boundaries = sorted(boundaries, key=lambda x: x['confidence'], reverse=True)  
        return self.\_remove\_overlapping\_boundaries(boundaries)  
      
    def\_parse\_table\_structure(self, table\_content: str) -> pd.DataFrame:  
        """解析表格结构"""  
        lines = table\_content.strip().split('\n')  
          
        # 识别表头  
        header\_lines = self.\_identify\_header\_lines(lines)  
        data\_lines = lines[len(header\_lines):]  
          
        # 解析列结构  
        columns = self.\_parse\_columns(header\_lines)  
          
        # 解析数据行  
        data\_rows = []  
        for line in data\_lines:  
            if self.\_is\_data\_line(line):  
                row\_data = self.\_parse\_data\_row(line, columns)  
                if row\_data:  
                    data\_rows.append(row\_data)  
          
        # 创建DataFrame  
        if data\_rows and columns:  
            df = pd.DataFrame(data\_rows, columns=columns)  
            return self.\_clean\_dataframe(df)  
          
        return pd.DataFrame()  
      
    def\_create\_index\_data(self, df: pd.DataFrame, metadata: TableMetadata) -> Dict:  
        """创建索引数据"""  
        index\_data = {  
            "row\_index": {},  
            "column\_index": {},  
            "value\_index": {},  
            "semantic\_index": {}  
        }  
          
        if df.empty:  
            return index\_data  
          
        # 行索引:基于行标签或第一列  
        for idx, row in df.iterrows():  
            row\_key = str(row.iloc[0]) if len(row) > 0else str(idx)  
            index\_data["row\_index"][row\_key] = {  
                "row\_number": idx,  
                "row\_data": row.to\_dict(),  
                "row\_text": " ".join([str(v) for v in row.values if pd.notna(v)])  
            }  
          
        # 列索引:基于列名  
        for col in df.columns:  
            col\_values = df[col].dropna().tolist()  
            index\_data["column\_index"][str(col)] = {  
                "column\_name": str(col),  
                "data\_type": str(df[col].dtype),  
                "value\_count": len(col\_values),  
                "sample\_values": col\_values[:5],  
                "column\_text": " ".join([str(v) for v in col\_values[:10]])  
            }  
          
        # 数值索引:提取所有数值数据  
        numeric\_columns = df.select\_dtypes(include=['number']).columns  
        for col in numeric\_columns:  
            values = df[col].dropna()  
            if not values.empty:  
                index\_data["value\_index"][str(col)] = {  
                    "min\_value": float(values.min()),  
                    "max\_value": float(values.max()),  
                    "mean\_value": float(values.mean()),  
                    "value\_range": [float(values.min()), float(values.max())],  
                    "sample\_values": values.head(10).tolist()  
                }  
          
        # 语义索引:基于表格内容的语义信息  
        index\_data["semantic\_index"] = {  
            "table\_summary": self.\_generate\_table\_summary(df, metadata),  
            "key\_metrics": self.\_extract\_key\_metrics(df, metadata),  
            "data\_patterns": self.\_identify\_data\_patterns(df),  
            "content\_keywords": self.\_extract\_content\_keywords(df)  
        }  
          
        return index\_data  
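上面 \_create\_index\_data 中的行/列索引逻辑,可以脱离类用一个小 DataFrame 单独验证。下面是一个只保留 row\_index 与 column\_index 两部分的简化示意:

```python
import pandas as pd

def build_row_column_index(df: pd.DataFrame) -> dict:
    """仿照上文 _create_index_data 构建行索引与列索引(简化示意)"""
    index_data = {"row_index": {}, "column_index": {}}
    # 行索引:以首列取值为键,便于按"科目名"直接定位整行
    for idx, row in df.iterrows():
        row_key = str(row.iloc[0]) if len(row) > 0 else str(idx)
        index_data["row_index"][row_key] = {
            "row_number": idx,
            "row_text": " ".join(str(v) for v in row.values if pd.notna(v)),
        }
    # 列索引:记录列的数据类型与非空值数量
    for col in df.columns:
        col_values = df[col].dropna().tolist()
        index_data["column_index"][str(col)] = {
            "data_type": str(df[col].dtype),
            "value_count": len(col_values),
        }
    return index_data

df = pd.DataFrame({"指标": ["营业收入", "净利润"], "2023年": [1200.5, 300.2]})
index_data = build_row_column_index(df)
print(index_data["row_index"]["营业收入"]["row_text"])  # 营业收入 1200.5
```

行索引以首列取值为键,这样"净利润是多少"这类查询可以不经向量检索直接命中整行数据。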

多维索引构建策略

1. 复合索引设计

  
class TableIndexBuilder:  
    """表格索引构建器"""  
      
    def \_\_init\_\_(self, vector\_store):  
        self.vector\_store = vector\_store  
        self.index\_strategies = {  
            "content\_index": self.\_build\_content\_index,  
            "structure\_index": self.\_build\_structure\_index,  
            "semantic\_index": self.\_build\_semantic\_index,  
            "numerical\_index": self.\_build\_numerical\_index,  
            "temporal\_index": self.\_build\_temporal\_index  
        }  
      
    def build\_comprehensive\_index(self, tables: List[Dict]) -> Dict:  
        """构建综合索引"""  
        comprehensive\_index = {  
            "content\_index": {},  
            "structure\_index": {},  
            "semantic\_index": {},  
            "numerical\_index": {},  
            "temporal\_index": {},  
            "cross\_reference\_index": {}  
        }  
          
        for table in tables:  
            table\_id = table["metadata"].table\_id  
              
            # 构建各类索引  
            for index\_type, builder\_func in self.index\_strategies.items():  
                index\_data = builder\_func(table)  
                comprehensive\_index[index\_type][table\_id] = index\_data  
          
        # 构建交叉引用索引  
        comprehensive\_index["cross\_reference\_index"] = self.\_build\_cross\_reference\_index(tables)  
          
        return comprehensive\_index  
      
    def \_build\_content\_index(self, table: Dict) -> Dict:  
        """构建内容索引"""  
        df = table["structured\_data"]  
        metadata = table["metadata"]  
          
        content\_index = {  
            "cell\_content": {},  
            "row\_content": {},  
            "column\_content": {},  
            "full\_text": ""  
        }  
          
        # 单元格级别索引  
        for row\_idx, row in df.iterrows():  
            for col\_idx, value in enumerate(row):  
                if pd.notna(value):  
                    cell\_key = f"{row\_idx}\_{col\_idx}"  
                    content\_index["cell\_content"][cell\_key] = {  
                        "value": str(value),  
                        "row\_index": row\_idx,  
                        "column\_index": col\_idx,  
                        "column\_name": df.columns[col\_idx],  
                        "data\_type": type(value).\_\_name\_\_  
                    }  
          
        # 行级别索引  
        for row\_idx, row in df.iterrows():  
            row\_text = " ".join([str(v) for v in row.values if pd.notna(v)])  
            content\_index["row\_content"][str(row\_idx)] = {  
                "text": row\_text,  
                "values": row.to\_dict(),  
                "key\_value": str(row.iloc[0]) if len(row) > 0 else ""  
            }  
          
        # 列级别索引  
        for col in df.columns:  
            col\_text = " ".join([str(v) for v in df[col].dropna().values])  
            content\_index["column\_content"][str(col)] = {  
                "text": col\_text,  
                "header": str(col),  
                "data\_type": str(df[col].dtype),  
                "unique\_values": df[col].nunique()  
            }  
          
        # 全文索引  
        all\_text\_parts = []  
        all\_text\_parts.append(metadata.title)  
        all\_text\_parts.extend([str(col) for col in df.columns])  
        all\_text\_parts.extend([str(v) for v in df.values.flatten() if pd.notna(v)])  
        content\_index["full\_text"] = " ".join(all\_text\_parts)  
          
        return content\_index  
      
    def \_build\_numerical\_index(self, table: Dict) -> Dict:  
        """构建数值索引"""  
        df = table["structured\_data"]  
          
        numerical\_index = {  
            "ranges": {},  
            "statistics": {},  
            "patterns": {},  
            "comparisons": {}  
        }  
          
        numeric\_columns = df.select\_dtypes(include=['number']).columns  
          
        for col in numeric\_columns:  
            values = df[col].dropna()  
            if not values.empty:  
                # 数值范围  
                numerical\_index["ranges"][str(col)] = {  
                    "min": float(values.min()),  
                    "max": float(values.max()),  
                    "range": float(values.max() - values.min()),  
                    "quartiles": values.quantile([0.25, 0.5, 0.75]).tolist()  
                }  
                  
                # 统计信息  
                numerical\_index["statistics"][str(col)] = {  
                    "mean": float(values.mean()),  
                    "median": float(values.median()),  
                    "std": float(values.std()),  
                    "count": int(values.count()),  
                    "sum": float(values.sum())  
                }  
                  
                # 数据模式  
                numerical\_index["patterns"][str(col)] = {  
                    "trend": self.\_analyze\_trend(values),  
                    "seasonality": self.\_detect\_seasonality(values),  
                    "outliers": self.\_detect\_outliers(values),  
                    "growth\_rate": self.\_calculate\_growth\_rate(values)  
                }  
          
        # 列间比较  
        if len(numeric\_columns) > 1:  
            for i, col1 in enumerate(numeric\_columns):  
                for col2 in numeric\_columns[i+1:]:  
                    correlation = df[col1].corr(df[col2])  
                    if not pd.isna(correlation):  
                        numerical\_index["comparisons"][f"{col1}\_vs\_{col2}"] = {  
                            "correlation": float(correlation),  
                            "ratio\_mean": float(df[col1].mean() / df[col2].mean()) if df[col2].mean() != 0 else None  
                        }  
          
        return numerical\_index  
      
    def \_build\_temporal\_index(self, table: Dict) -> Dict:  
        """构建时间索引"""  
        df = table["structured\_data"]  
        metadata = table["metadata"]  
          
        temporal\_index = {  
            "time\_columns": {},  
            "period\_info": {},  
            "time\_series": {},  
            "temporal\_patterns": {}  
        }  
          
        # 识别时间列  
        time\_columns = self.\_identify\_time\_columns(df)  
          
        for col in time\_columns:  
            temporal\_index["time\_columns"][str(col)] = {  
                "column\_name": str(col),  
                "time\_format": self.\_detect\_time\_format(df[col]),  
                "time\_range": self.\_get\_time\_range(df[col]),  
                "frequency": self.\_detect\_frequency(df[col])  
            }  
          
        # 从元数据提取期间信息  
        if metadata.period\_info:  
            temporal\_index["period\_info"] = {  
                "period": metadata.period\_info,  
                "period\_type": self.\_classify\_period\_type(metadata.period\_info),  
                "fiscal\_year": self.\_extract\_fiscal\_year(metadata.period\_info)  
            }  
          
        return temporal\_index  
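上文调用的 \_identify\_time\_columns 等辅助方法未在文中展开,下面是一个基于 pd.to\_datetime 的示意实现(0.8 的"可解析比例"阈值是假设值,可按数据质量调整):

```python
import pandas as pd

def identify_time_columns(df: pd.DataFrame, threshold: float = 0.8) -> list:
    """若某列可解析为日期的取值比例超过阈值,则视为时间列(示意实现)"""
    time_cols = []
    for col in df.columns:
        series = df[col].dropna().astype(str)
        if series.empty:
            continue
        # errors="coerce" 会把无法解析的值置为 NaT,便于统计可解析比例
        parsed = pd.to_datetime(series, errors="coerce")
        if parsed.notna().mean() >= threshold:
            time_cols.append(col)
    return time_cols

df = pd.DataFrame({
    "报告期": ["2023-03-31", "2023-06-30", "2023-09-30"],
    "科目": ["营业收入", "净利润", "总资产"],
})
print(identify_time_columns(df))  # ['报告期']
```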

智能查询引擎

1. 多模态查询处理

  
class TableQueryEngine:  
    """表格查询引擎"""  
      
    def \_\_init\_\_(self, index\_data: Dict, vector\_store):  
        self.index\_data = index\_data  
        self.vector\_store = vector\_store  
        self.query\_processors = {  
            "semantic\_query": self.\_process\_semantic\_query,  
            "numerical\_query": self.\_process\_numerical\_query,  
            "structural\_query": self.\_process\_structural\_query,  
            "temporal\_query": self.\_process\_temporal\_query,  
            "comparative\_query": self.\_process\_comparative\_query  
        }  
      
    def execute\_query(self, query: str, query\_type: str = "auto") -> Dict:  
        """执行查询"""  
        # 自动识别查询类型  
        if query\_type == "auto":  
            query\_type = self.\_classify\_query\_type(query)  
          
        # 解析查询意图  
        query\_intent = self.\_parse\_query\_intent(query)  
          
        # 执行相应的查询处理  
        if query\_type in self.query\_processors:  
            results = self.query\_processors[query\_type](query, query\_intent)  
        else:  
            # 混合查询处理  
            results = self.\_process\_hybrid\_query(query, query\_intent)  
          
        # 后处理和排序  
        final\_results = self.\_post\_process\_results(results, query\_intent)  
          
        return {  
            "query": query,  
            "query\_type": query\_type,  
            "query\_intent": query\_intent,  
            "results": final\_results,  
            "execution\_info": {  
                "total\_matches": len(final\_results),  
                "processing\_time": self.\_get\_processing\_time(),  
                "confidence\_scores": [r.get("confidence", 0) for r in final\_results]  
            }  
        }  
      
    def \_process\_numerical\_query(self, query: str, intent: Dict) -> List[Dict]:  
        """处理数值查询"""  
        results = []  
          
        # 提取数值条件  
        numerical\_conditions = self.\_extract\_numerical\_conditions(query)  
          
        for table\_id, num\_index in self.index\_data["numerical\_index"].items():  
            for condition in numerical\_conditions:  
                matches = self.\_match\_numerical\_condition(num\_index, condition)  
                  
                for match in matches:  
                    results.append({  
                        "table\_id": table\_id,  
                        "match\_type": "numerical",  
                        "condition": condition,  
                        "match\_details": match,  
                        "confidence": self.\_calculate\_numerical\_confidence(match, condition)  
                    })  
          
        return results  
      
    def \_process\_semantic\_query(self, query: str, intent: Dict) -> List[Dict]:  
        """处理语义查询"""  
        results = []  
          
        # 向量相似度搜索  
        query\_embedding = self.vector\_store.embed\_query(query)  
          
        for table\_id, content\_index in self.index\_data["content\_index"].items():  
            # 全文匹配  
            full\_text = content\_index["full\_text"]  
            text\_embedding = self.vector\_store.embed\_query(full\_text)  
            similarity = self.\_calculate\_cosine\_similarity(query\_embedding, text\_embedding)  
              
            if similarity > 0.3:  # 阈值过滤  
                results.append({  
                    "table\_id": table\_id,  
                    "match\_type": "semantic",  
                    "similarity\_score": similarity,  
                    "matched\_content": full\_text[:200],  
                    "confidence": similarity  
                })  
              
            # 细粒度匹配(行、列、单元格)  
            for content\_type in ["row\_content", "column\_content", "cell\_content"]:  
                for key, content\_data in content\_index[content\_type].items():  
                    if "text" in content\_data:  
                        content\_embedding = self.vector\_store.embed\_query(content\_data["text"])  
                        similarity = self.\_calculate\_cosine\_similarity(query\_embedding, content\_embedding)  
                          
                        if similarity > 0.4:  
                            results.append({  
                                "table\_id": table\_id,  
                                "match\_type": f"semantic\_{content\_type}",  
                                "content\_key": key,  
                                "similarity\_score": similarity,  
                                "matched\_content": content\_data["text"],  
                                "confidence": similarity  
                            })  
          
        return results  
      
    def \_process\_comparative\_query(self, query: str, intent: Dict) -> List[Dict]:  
        """处理比较查询"""  
        results = []  
          
        # 识别比较对象和维度  
        comparison\_entities = self.\_extract\_comparison\_entities(query)  
        comparison\_metrics = self.\_extract\_comparison\_metrics(query)  
          
        # 跨表比较  
        for metric in comparison\_metrics:  
            metric\_data = {}  
              
            # 收集各表中的相关数据  
            for table\_id, num\_index in self.index\_data["numerical\_index"].items():  
                if metric in num\_index["statistics"]:  
                    metric\_data[table\_id] = num\_index["statistics"][metric]  
              
            # 执行比较分析  
            if len(metric\_data) > 1:  
                comparison\_result = self.\_perform\_comparison\_analysis(metric\_data, metric)  
                results.append({  
                    "match\_type": "comparative",  
                    "metric": metric,  
                    "comparison\_data": metric\_data,  
                    "analysis\_result": comparison\_result,  
                    "confidence": 0.8  
                })  
          
        return results  
      
    def \_extract\_numerical\_conditions(self, query: str) -> List[Dict]:  
        """提取数值条件"""  
        conditions = []  
          
        # 数值比较模式  
        patterns = [  
            r"(大于|>)\s*(\d+(?:\.\d+)?)",  
            r"(小于|<)\s*(\d+(?:\.\d+)?)",  
            r"(等于|=)\s*(\d+(?:\.\d+)?)",  
            r"(\d+(?:\.\d+)?)\s*到\s*(\d+(?:\.\d+)?)",  
            r"(\d+(?:\.\d+)?)\s*-\s*(\d+(?:\.\d+)?)"  
        ]  
          
        for pattern in patterns:  
            matches = re.finditer(pattern, query)  
            for match in matches:  
                if "到" in match.group() or "-" in match.group():  
                    # 范围条件  
                    values = re.findall(r"\d+(?:\.\d+)?", match.group())  
                    if len(values) >= 2:  
                        conditions.append({  
                            "type": "range",  
                            "min\_value": float(values[0]),  
                            "max\_value": float(values[1]),  
                            "original\_text": match.group()  
                        })  
                else:  
                    # 比较条件  
                    operator = match.group(1)  
                    value = float(match.group(2))  
                    conditions.append({  
                        "type": "comparison",  
                        "operator": operator,  
                        "value": value,  
                        "original\_text": match.group()  
                    })  
          
        return conditions  
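上文多处调用的 \_calculate\_cosine\_similarity 也未给出实现,纯 Python 的示意版本如下(生产环境建议直接用 numpy 做向量化计算):

```python
import math

def cosine_similarity(vec_a: list, vec_b: list) -> float:
    """余弦相似度:点积除以两向量模长之积;零向量按 0.0 处理"""
    dot = sum(a * b for a, b in zip(vec_a, vec_b))
    norm_a = math.sqrt(sum(a * a for a in vec_a))
    norm_b = math.sqrt(sum(b * b for b in vec_b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0(方向相同)
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0(正交,不相关)
```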

查询优化策略

1. 查询性能优化

picture.image
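下文 OptimizedTableQueryEngine 用到的 QueryOptimizer 在文中未展开,这里给一个极简示意:对查询做归一化并剔除噪声词,减少无效的向量比对(停用词表为假设示例,实际应结合分词器与业务词典):

```python
class QueryOptimizer:
    """查询优化器示意:归一化 + 剔除噪声词,降低后续匹配成本"""

    STOP_WORDS = ("请问", "一下", "帮我", "查询")  # 假设的停用词表

    def optimize_query(self, query: str, index_data: dict = None) -> str:
        # 归一化:去首尾空白,全角空格转半角
        normalized = query.strip().replace("\u3000", " ")
        # 剔除停用词(简单字符串替换,仅作演示)
        for word in self.STOP_WORDS:
            normalized = normalized.replace(word, "")
        return normalized.strip()

optimizer = QueryOptimizer()
print(optimizer.optimize_query("请问帮我查询 2023年净利润"))  # 2023年净利润
```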

2. 缓存策略实现

  
class QueryCacheManager:  
    """查询缓存管理器"""  
      
    def \_\_init\_\_(self, cache\_size: int = 1000):  
        self.cache\_size = cache\_size  
        self.query\_cache = {}  
        self.access\_times = {}  
        self.cache\_stats = {  
            "hits": 0,  
            "misses": 0,  
            "evictions": 0  
        }  
      
    def get\_cached\_result(self, query\_hash: str) -> Optional[Dict]:  
        """获取缓存结果"""  
        if query\_hash in self.query\_cache:  
            self.access\_times[query\_hash] = time.time()  
            self.cache\_stats["hits"] += 1  
            return self.query\_cache[query\_hash]  
          
        self.cache\_stats["misses"] += 1  
        return None  
      
    def cache\_result(self, query\_hash: str, result: Dict) -> None:  
        """缓存查询结果"""  
        if len(self.query\_cache) >= self.cache\_size:  
            self.\_evict\_least\_recently\_used()  
          
        self.query\_cache[query\_hash] = result  
        self.access\_times[query\_hash] = time.time()  
      
    def \_evict\_least\_recently\_used(self) -> None:  
        """淘汰最近最少使用的缓存"""  
        if not self.access\_times:  
            return  
          
        lru\_key = min(self.access\_times.items(), key=lambda x: x[1])[0]  
        del self.query\_cache[lru\_key]  
        del self.access\_times[lru\_key]  
        self.cache\_stats["evictions"] += 1  
  
class OptimizedTableQueryEngine(TableQueryEngine):  
    """优化的表格查询引擎"""  
      
    def \_\_init\_\_(self, index\_data: Dict, vector\_store):  
        super().\_\_init\_\_(index\_data, vector\_store)  
        self.cache\_manager = QueryCacheManager()  
        self.query\_optimizer = QueryOptimizer()  
      
    def execute\_optimized\_query(self, query: str, query\_type: str = "auto") -> Dict:  
        """执行优化查询"""  
        # 生成查询哈希  
        query\_hash = self.\_generate\_query\_hash(query, query\_type)  
          
        # 检查缓存  
        cached\_result = self.cache\_manager.get\_cached\_result(query\_hash)  
        if cached\_result:  
            return cached\_result  
          
        # 查询优化  
        optimized\_query = self.query\_optimizer.optimize\_query(query, self.index\_data)  
          
        # 执行查询  
        result = self.execute\_query(optimized\_query, query\_type)  
          
        # 缓存结果  
        self.cache\_manager.cache\_result(query\_hash, result)  
          
        return result  
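上文调用的 \_generate\_query\_hash 未给出实现,一个基于 hashlib 的示意版本如下:对查询文本归一化后与查询类型拼接,取 SHA-256 十六进制串作为缓存键:

```python
import hashlib

def generate_query_hash(query: str, query_type: str = "auto") -> str:
    """归一化查询文本并拼接查询类型,取 SHA-256 作为缓存键(示意实现)"""
    normalized = f"{query.strip().lower()}|{query_type}"
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

h1 = generate_query_hash("2023年营业收入是多少", "numerical_query")
h2 = generate_query_hash(" 2023年营业收入是多少 ", "numerical_query")
print(h1 == h2)  # True:首尾空白不影响缓存命中
```

先做归一化再取哈希,可以让语义等价但书写略有差异的查询落到同一个缓存键上,提高缓存命中率。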

这套表格数据索引构建与查询优化方案,通过多维索引、智能查询引擎、性能优化策略与缓存机制,实现了对金融文档中表格数据的高效检索和精准查询。

文档预处理策略

针对财报、审计报告、债券募集说明书等金融文档的特殊处理:

  
import re  
from typing import List, Dict, Any  
from langchain.text\_splitter import RecursiveCharacterTextSplitter  
from langchain.embeddings import OpenAIEmbeddings  
from pymilvus import Collection, connections, FieldSchema, CollectionSchema, DataType  
  
class FinancialDocumentProcessor:  
    def \_\_init\_\_(self):  
        self.embeddings = OpenAIEmbeddings()  
        self.text\_splitter = RecursiveCharacterTextSplitter(  
            chunk\_size=1000,  
            chunk\_overlap=200,  
            separators=["\n\n", "\n", "。", ";", ",", " "]  
        )  
      
    def extract\_financial\_metadata(self, text: str, doc\_type: str) -> Dict[str, Any]:  
        """提取金融文档的结构化元数据"""  
        metadata = {  
            "doc\_type": doc\_type,  
            "financial\_indicators": [],  
            "time\_period": None,  
            "company\_name": None,  
            "section\_type": None  
        }  
          
        # 提取财务指标  
        financial\_patterns = [  
            r"营业收入|净利润|总资产|净资产|资产负债率|毛利率|净资产收益率",  
            r"现金流量|经营活动|投资活动|筹资活动",  
            r"应收账款|存货|固定资产|无形资产"  
        ]  
          
        for pattern in financial\_patterns:  
            matches = re.findall(pattern, text)  
            metadata["financial\_indicators"].extend(matches)  
          
        # 提取时间信息  
        time\_pattern = r"(\d{4})年|(\d{4})-(\d{2})-(\d{2})"  
        time\_matches = re.findall(time\_pattern, text)  
        if time\_matches:  
            metadata["time\_period"] = time\_matches[0]  
          
        # 提取公司名称  
        company\_pattern = r"([\u4e00-\u9fa5]+(?:股份)?(?:有限)?公司)"  
        company\_matches = re.findall(company\_pattern, text)  
        if company\_matches:  
            metadata["company\_name"] = company\_matches[0]  
          
        return metadata  
      
    def intelligent\_chunking(self, text: str, doc\_type: str) -> List[Dict[str, Any]]:  
        """智能分块策略"""  
        chunks = []  
          
        if doc\_type == "财报":  
            # 按财务报表结构分块  
            sections = self.\_split\_by\_financial\_sections(text)  
        elif doc\_type == "审计报告":  
            # 按审计意见、管理建议等分块  
            sections = self.\_split\_by\_audit\_sections(text)  
        elif doc\_type == "债券募集说明书":  
            # 按募集资金用途、风险因素等分块  
            sections = self.\_split\_by\_bond\_sections(text)  
        else:  
            sections = [text]  
          
        for section in sections:  
            sub\_chunks = self.text\_splitter.split\_text(section)  
            for chunk in sub\_chunks:  
                metadata = self.extract\_financial\_metadata(chunk, doc\_type)  
                chunks.append({  
                    "text": chunk,  
                    "metadata": metadata,  
                    "embedding": self.embeddings.embed\_query(chunk)  
                })  
          
        return chunks  
      
    def \_split\_by\_financial\_sections(self, text: str) -> List[str]:  
        """按财务报表结构分割"""  
        section\_patterns = [  
            r"资产负债表.*?(?=利润表|现金流量表|$)",  
            r"利润表.*?(?=现金流量表|资产负债表|$)",  
            r"现金流量表.*?(?=资产负债表|利润表|$)",  
            r"财务报表附注.*?(?=$)"  
        ]  
          
        sections = []  
        for pattern in section\_patterns:  
            matches = re.findall(pattern, text, re.DOTALL)  
            sections.extend(matches)  
          
        return sections if sections else [text]  
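\_split\_by\_audit\_sections 与 \_split\_by\_bond\_sections 在文中未展开,按同样的正则思路可以这样示意(章节标题模式为假设示例,需按实际审计报告模板调整;这里用 ^ 锚定行首,避免"形成审计意见的基础"中嵌套的"审计意见"被误匹配):

```python
import re

def split_by_audit_sections(text: str) -> list:
    """按审计报告常见章节切分(示意实现,标题模式需按实际模板调整)"""
    section_patterns = [
        r"^审计意见.*?(?=^形成审计意见的基础|^关键审计事项|\Z)",
        r"^形成审计意见的基础.*?(?=^关键审计事项|\Z)",
        r"^关键审计事项.*?(?=\Z)",
    ]
    sections = []
    for pattern in section_patterns:
        # DOTALL 让 . 跨行匹配,MULTILINE 让 ^ 匹配每行行首
        sections.extend(re.findall(pattern, text, re.DOTALL | re.MULTILINE))
    return sections if sections else [text]

sample = (
    "审计意见\n我们认为财务报表在所有重大方面公允反映。\n"
    "形成审计意见的基础\n我们按照审计准则的规定执行了审计工作。\n"
    "关键审计事项\n收入确认。"
)
parts = split_by_audit_sections(sample)
print(len(parts))  # 3
```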

Metadata辅助查询设计与实现

Metadata架构设计原则

Metadata提取与处理流程

picture.image

Metadata查询架构设计

picture.image

Metadata数据模型层次结构

picture.image

增强的Milvus向量数据库配置

  
from enum import Enum  
from dataclasses import dataclass  
from typing import Optional, Union, List, Dict, Any  
import json  
from datetime import datetime  
  
class DocumentType(Enum):  
    """文档类型枚举"""  
    ANNUAL\_REPORT = "annual\_report"  
    QUARTERLY\_REPORT = "quarterly\_report"  
    AUDIT\_REPORT = "audit\_report"  
    BOND\_PROSPECTUS = "bond\_prospectus"  
    ANNOUNCEMENT = "announcement"  
    RESEARCH\_REPORT = "research\_report"  
    REGULATORY\_FILING = "regulatory\_filing"  
  
class IndustryCategory(Enum):  
    """行业分类枚举"""  
    BANKING = "banking"  
    INSURANCE = "insurance"  
    SECURITIES = "securities"  
    REAL\_ESTATE = "real\_estate"  
    MANUFACTURING = "manufacturing"  
    TECHNOLOGY = "technology"  
    HEALTHCARE = "healthcare"  
    ENERGY = "energy"  
  
@dataclass  
class DocumentMetadata:  
    """文档元数据结构"""  
    # 基础信息  
    document\_id: str  
    title: str  
    doc\_type: DocumentType  
    file\_path: str  
    file\_size: int  
      
    # 业务信息  
    company\_name: str  
    company\_code: Optional[str] = None  
    industry\_category: Optional[IndustryCategory] = None  
    market\_type: Optional[str] = None  # 主板/创业板/科创板  
      
    # 时间信息  
    publish\_date: Optional[datetime] = None  
    report\_period: Optional[str] = None  # 2023Q1, 2023年度  
    created\_at: Optional[datetime] = None  
    updated\_at: Optional[datetime] = None  
      
    # 内容信息  
    section\_type: Optional[str] = None  
    page\_number: Optional[int] = None  
    paragraph\_index: Optional[int] = None  
    financial\_indicators: List[str] = None  
      
    # 质量信息  
    confidence\_score: float = 1.0  
    source\_reliability: str = "high"  # high/medium/low  
    data\_version: str = "1.0"  
      
    # 地理信息  
    region: Optional[str] = None  
    country: str = "CN"  
      
    # 自定义标签  
    tags: List[str] = None  
    custom\_fields: Dict[str, Any] = None  
      
    def \_\_post\_init\_\_(self):  
        if self.created\_at is None:  
            self.created\_at = datetime.now()  
        if self.financial\_indicators is None:  
            self.financial\_indicators = []  
        if self.tags is None:  
            self.tags = []  
        if self.custom\_fields is None:  
            self.custom\_fields = {}  
  
class EnhancedMilvusVectorStore:  
    def \_\_init\_\_(self, collection\_name: str = "financial\_docs\_enhanced"):  
        self.collection\_name = collection\_name  
        self.connect\_to\_milvus()  
        self.create\_enhanced\_collection()  
        self.create\_metadata\_indexes()  
      
    def connect\_to\_milvus(self):  
        """连接Milvus数据库"""  
        connections.connect(  
            alias="default",  
            host="localhost",  
            port="19530"  
        )  
      
    def create\_enhanced\_collection(self):  
        """创建增强的集合schema"""  
        fields = [  
            # 主键和向量  
            FieldSchema(name="id", dtype=DataType.INT64, is\_primary=True, auto\_id=True),  
            FieldSchema(name="document\_id", dtype=DataType.VARCHAR, max\_length=100),  
            FieldSchema(name="text", dtype=DataType.VARCHAR, max\_length=65535),  
            FieldSchema(name="embedding", dtype=DataType.FLOAT\_VECTOR, dim=1536),  
              
            # 基础信息  
            FieldSchema(name="title", dtype=DataType.VARCHAR, max\_length=500),  
            FieldSchema(name="doc\_type", dtype=DataType.VARCHAR, max\_length=50),  
            FieldSchema(name="file\_path", dtype=DataType.VARCHAR, max\_length=1000),  
            FieldSchema(name="file\_size", dtype=DataType.INT64),  
              
            # 业务信息  
            FieldSchema(name="company\_name", dtype=DataType.VARCHAR, max\_length=200),  
            FieldSchema(name="company\_code", dtype=DataType.VARCHAR, max\_length=20),  
            FieldSchema(name="industry\_category", dtype=DataType.VARCHAR, max\_length=50),  
            FieldSchema(name="market\_type", dtype=DataType.VARCHAR, max\_length=20),  
              
            # 时间信息  
            FieldSchema(name="publish\_date", dtype=DataType.VARCHAR, max\_length=20),  
            FieldSchema(name="report\_period", dtype=DataType.VARCHAR, max\_length=20),  
            FieldSchema(name="created\_at", dtype=DataType.VARCHAR, max\_length=30),  
            FieldSchema(name="updated\_at", dtype=DataType.VARCHAR, max\_length=30),  
              
            # 内容信息  
            FieldSchema(name="section\_type", dtype=DataType.VARCHAR, max\_length=100),  
            FieldSchema(name="page\_number", dtype=DataType.INT32),  
            FieldSchema(name="paragraph\_index", dtype=DataType.INT32),  
            FieldSchema(name="financial\_indicators", dtype=DataType.VARCHAR, max\_length=2000),  
              
            # 质量信息  
            FieldSchema(name="confidence\_score", dtype=DataType.FLOAT),  
            FieldSchema(name="source\_reliability", dtype=DataType.VARCHAR, max\_length=20),  
            FieldSchema(name="data\_version", dtype=DataType.VARCHAR, max\_length=20),  
              
            # 地理信息  
            FieldSchema(name="region", dtype=DataType.VARCHAR, max\_length=50),  
            FieldSchema(name="country", dtype=DataType.VARCHAR, max\_length=10),  
              
            # 标签和自定义字段  
            FieldSchema(name="tags", dtype=DataType.VARCHAR, max\_length=1000),  
            FieldSchema(name="custom\_fields", dtype=DataType.VARCHAR, max\_length=5000)  
        ]  
          
        schema = CollectionSchema(fields, "Enhanced financial documents collection with metadata")  
        self.collection = Collection(self.collection\_name, schema)  
          
        # 创建向量索引  
        vector\_index\_params = {  
            "metric\_type": "COSINE",  
            "index\_type": "IVF\_FLAT",  
            "params": {"nlist": 1024}  
        }  
        self.collection.create\_index("embedding", vector\_index\_params)  
      
    def create\_metadata\_indexes(self):  
        """为metadata字段创建索引以提高查询性能"""  
        # 为常用查询字段创建索引  
        index\_fields = [  
            "document\_id", "doc\_type", "company\_name", "company\_code",  
            "industry\_category", "market\_type", "publish\_date", "report\_period",  
            "section\_type", "source\_reliability", "region", "country"  
        ]  
          
        for field in index\_fields:  
            try:  
                # 创建标量索引  
                self.collection.create\_index(  
                    field\_name=field,  
                    index\_params={"index\_type": "TRIE"}  
                )  
            except Exception as e:  
                print(f"Warning: Could not create index for {field}: {e}")  
      
    def insert\_documents\_with\_metadata(self, chunks: List[Dict[str, Any]]):  
        """批量插入带有增强metadata的文档"""  
        entities = []  
          
        for chunk in chunks:  
            metadata = chunk.get("metadata", {})  
              
            # 处理时间字段  
            publish\_date = metadata.get("publish\_date")  
            if isinstance(publish\_date, datetime):  
                publish\_date = publish\_date.isoformat()[:10]  
              
            created\_at = metadata.get("created\_at")  
            if isinstance(created\_at, datetime):  
                created\_at = created\_at.isoformat()  
              
            updated\_at = metadata.get("updated\_at")  
            if isinstance(updated\_at, datetime):  
                updated\_at = updated\_at.isoformat()  
              
            entity = [  
                metadata.get("document\_id", ""),  
                chunk.get("text", ""),  
                chunk.get("embedding", []),  
                metadata.get("title", ""),  
                metadata.get("doc\_type", ""),  
                metadata.get("file\_path", ""),  
                metadata.get("file\_size", 0),  
                metadata.get("company\_name", ""),  
                metadata.get("company\_code", ""),  
                metadata.get("industry\_category", ""),  
                metadata.get("market\_type", ""),  
                publish\_date or "",  
                metadata.get("report\_period", ""),  
                created\_at or "",  
                updated\_at or "",  
                metadata.get("section\_type", ""),  
                metadata.get("page\_number", -1),  
                metadata.get("paragraph\_index", -1),  
                ",".join(metadata.get("financial\_indicators", [])),  
                metadata.get("confidence\_score", 1.0),  
                metadata.get("source\_reliability", "high"),  
                metadata.get("data\_version", "1.0"),  
                metadata.get("region", ""),  
                metadata.get("country", "CN"),  
                ",".join(metadata.get("tags", [])),  
                json.dumps(metadata.get("custom\_fields", {}), ensure\_ascii=False)  
            ]  
            entities.append(entity)  
          
        self.collection.insert(entities)  
        self.collection.flush()  
        print(f"Inserted {len(entities)} documents with enhanced metadata")  
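insert\_documents\_with\_metadata 中的字段序列化逻辑可以抽成纯函数单独验证(不依赖 Milvus 连接;与 schema 的字段顺序对应关系仍需自行保证):

```python
import json
from datetime import datetime

def serialize_metadata_fields(metadata: dict) -> dict:
    """与上文插入逻辑一致:datetime 转字符串,列表/字典字段压平为字符串(示意)"""
    result = dict(metadata)
    publish_date = result.get("publish_date")
    if isinstance(publish_date, datetime):
        result["publish_date"] = publish_date.isoformat()[:10]  # 只保留日期部分
    for field in ("created_at", "updated_at"):
        value = result.get(field)
        if isinstance(value, datetime):
            result[field] = value.isoformat()
    # 列表/字典字段按 VARCHAR schema 要求转为字符串
    result["financial_indicators"] = ",".join(result.get("financial_indicators", []))
    result["tags"] = ",".join(result.get("tags", []))
    result["custom_fields"] = json.dumps(result.get("custom_fields", {}), ensure_ascii=False)
    return result

meta = {
    "publish_date": datetime(2024, 3, 29, 10, 30),
    "financial_indicators": ["营业收入", "净利润"],
    "tags": ["年报"],
    "custom_fields": {"审计机构": "某会计师事务所"},
}
flat = serialize_metadata_fields(meta)
print(flat["publish_date"])  # 2024-03-29
```

把序列化逻辑与数据库写入解耦,便于在没有 Milvus 环境的情况下对元数据清洗做单元测试。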

Metadata辅助查询实现

  
class MetadataQueryBuilder:  
    """Metadata查询构建器"""  
      
    def \_\_init\_\_(self, vector\_store: EnhancedMilvusVectorStore):  
        self.vector\_store = vector\_store  
        self.filters = []  
        self.vector\_query = None  
        self.limit = 10  
        self.output\_fields = ["*"]  
      
    def filter\_by\_company(self, company\_names: Union[str, List[str]]) -> 'MetadataQueryBuilder':  
        """按公司名称过滤"""  
        if isinstance(company\_names, str):  
            company\_names = [company\_names]  
          
        company\_filter = f"company\_name in {company\_names}"  
        self.filters.append(company\_filter)  
        return self  
      
    def filter\_by\_doc\_type(self, doc\_types: Union[str, List[str]]) -> 'MetadataQueryBuilder':  
        """按文档类型过滤"""  
        if isinstance(doc\_types, str):  
            doc\_types = [doc\_types]  
          
        doc\_type\_filter = f"doc\_type in {doc\_types}"  
        self.filters.append(doc\_type\_filter)  
        return self  
      
    def filter\_by\_industry(self, industries: Union[str, List[str]]) -> 'MetadataQueryBuilder':  
        """按行业分类过滤"""  
        if isinstance(industries, str):  
            industries = [industries]  
          
        industry\_filter = f"industry\_category in {industries}"  
        self.filters.append(industry\_filter)  
        return self  
      
    def filter\_by\_date\_range(self, start\_date: str, end\_date: str) -> 'MetadataQueryBuilder':  
        """按日期范围过滤"""  
        date\_filter = f"publish\_date >= '{start\_date}' and publish\_date <= '{end\_date}'"  
        self.filters.append(date\_filter)  
        return self  
      
    def filter\_by\_report\_period(self, periods: Union[str, List[str]]) -> 'MetadataQueryBuilder':  
        """按报告期过滤"""  
        if isinstance(periods, str):  
            periods = [periods]  
          
        period\_filter = f"report\_period in {periods}"  
        self.filters.append(period\_filter)  
        return self  
      
    def filter\_by\_section(self, sections: Union[str, List[str]]) -> 'MetadataQueryBuilder':  
        """按章节类型过滤"""  
        if isinstance(sections, str):  
            sections = [sections]  
          
        section\_filter = f"section\_type in {sections}"  
        self.filters.append(section\_filter)  
        return self  
      
    def filter\_by\_confidence(self, min\_confidence: float) -> 'MetadataQueryBuilder':  
        """按置信度过滤"""  
        confidence\_filter = f"confidence\_score >= {min\_confidence}"  
        self.filters.append(confidence\_filter)  
        return self  
      
    def filter\_by\_tags(self, tags: Union[str, List[str]], match\_all: bool = False) -> 'MetadataQueryBuilder':  
        """按标签过滤"""  
        if isinstance(tags, str):  
            tags = [tags]  
          
        if match\_all:  
            # 所有标签都必须匹配  
            tag\_filters = [f"tags like '%{tag}%'" for tag in tags]  
            tag\_filter = " and ".join(tag\_filters)  
        else:  
            # 任一标签匹配即可  
            tag\_filters = [f"tags like '%{tag}%'" for tag in tags]  
            tag\_filter = " or ".join(tag\_filters)  
          
        self.filters.append(f"({tag\_filter})")  
        return self  
      
    def filter\_by\_custom\_field(self, field\_name: str, field\_value: Any) -> 'MetadataQueryBuilder':  
        """按自定义字段过滤"""  
        custom\_filter = f"custom\_fields like '%\"{field\_name}\":\"{field\_value}\"%'"  
        self.filters.append(custom\_filter)  
        return self  
      
    def with\_vector\_similarity(self, query\_vector: List[float], top\_k: int = 10) -> 'MetadataQueryBuilder':  
        """添加向量相似度查询"""  
        self.vector\_query = query\_vector  
        self.limit = top\_k  
        return self  
      
    def select\_fields(self, fields: List[str]) -> 'MetadataQueryBuilder':  
        """选择输出字段"""  
        self.output\_fields = fields  
        return self  
      
    def execute(self) -> List[Dict[str, Any]]:  
        """执行查询"""  
        self.vector\_store.collection.load()  
          
        # 构建过滤表达式  
        filter\_expr = " and ".join(self.filters) if self.filters else None  
          
        if self.vector\_query:  
            # 向量相似度查询 + metadata过滤  
            search\_params = {  
                "metric\_type": "COSINE",  
                "params": {"nprobe": 10}  
            }  
              
            results = self.vector\_store.collection.search(  
                data=[self.vector\_query],  
                anns\_field="embedding",  
                param=search\_params,  
                limit=self.limit,  
                expr=filter\_expr,  
                output\_fields=self.output\_fields  
            )  
              
            # 格式化结果  
            formatted\_results = []  
            for hit in results[0]:  
                result = {  
                    "id": hit.id,  
                    "score": hit.score,  
                    "distance": hit.distance  
                }  
                  
                # 添加所有输出字段  
                for field in self.output\_fields:  
                    if field != "*" and hasattr(hit.entity, field):  
                        result[field] = hit.entity.get(field)  
                    elif field == "*":  
                        # 获取所有字段  
                        for entity\_field in hit.entity.fields:  
                            result[entity\_field] = hit.entity.get(entity\_field)  
                  
                formatted\_results.append(result)  
              
            return formatted\_results  
          
        else:  
            # 纯metadata查询  
            if not filter\_expr:  
                raise ValueError("Must provide either vector query or metadata filters")  
              
            results = self.vector\_store.collection.query(  
                expr=filter\_expr,  
                output\_fields=self.output\_fields,  
                limit=self.limit  
            )  
              
            return results  
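Each `filter\_by\_*` call above only appends one boolean clause; `execute()` then joins the accumulated clauses with `and` into a single expression string that Milvus applies before the vector search. A minimal, standalone sketch of that composition step (the clause values are illustrative, not real data):

```python
# A simplified stand-in for MetadataQueryBuilder.execute()'s expression
# building step (clause strings here are illustrative examples).
def build_filter_expr(filters):
    """Join accumulated filter clauses with 'and'; None means no filtering."""
    return " and ".join(filters) if filters else None

clauses = [
    'company_name in ["腾讯控股"]',   # appended by filter_by_company
    'doc_type in ["annual_report"]',  # appended by filter_by_doc_type
    "confidence_score >= 0.8",        # appended by filter_by_confidence
]
expr = build_filter_expr(clauses)
# The resulting string is what gets passed as `expr=` to collection.search().
```

Because the clauses are plain strings, values that originate from user input should be validated before interpolation; the f-string style above is acceptable for trusted, internal metadata values.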
  
class SmartMetadataExtractor:  
    """智能metadata提取器"""  
      
    def \_\_init\_\_(self, llm\_client):  
        self.llm\_client = llm\_client  
      
    def extract\_comprehensive\_metadata(self, text: str, doc\_info: Dict[str, Any]) -> DocumentMetadata:  
        """提取全面的metadata"""  
          
        # 基础信息  
        document\_id = doc\_info.get("document\_id", self.\_generate\_document\_id())  
        title = doc\_info.get("title", self.\_extract\_title(text))  
        doc\_type = self.\_classify\_document\_type(text, doc\_info)  
          
        # 使用LLM提取业务信息  
        business\_info = self.\_extract\_business\_info(text)  
          
        # 提取时间信息  
        time\_info = self.\_extract\_time\_info(text)  
          
        # 提取财务指标  
        financial\_indicators = self.\_extract\_financial\_indicators(text)  
          
        # 生成标签  
        tags = self.\_generate\_tags(text, business\_info)  
          
        # 计算置信度  
        confidence\_score = self.\_calculate\_confidence\_score(text, business\_info)  
          
        return DocumentMetadata(  
            document\_id=document\_id,  
            title=title,  
            doc\_type=doc\_type,  
            file\_path=doc\_info.get("file\_path", ""),  
            file\_size=doc\_info.get("file\_size", 0),  
            company\_name=business\_info.get("company\_name", ""),  
            company\_code=business\_info.get("company\_code"),  
            industry\_category=business\_info.get("industry\_category"),  
            market\_type=business\_info.get("market\_type"),  
            publish\_date=time\_info.get("publish\_date"),  
            report\_period=time\_info.get("report\_period"),  
            section\_type=self.\_identify\_section\_type(text),  
            financial\_indicators=financial\_indicators,  
            confidence\_score=confidence\_score,  
            source\_reliability=self.\_assess\_source\_reliability(doc\_info),  
            region=business\_info.get("region"),  
            tags=tags,  
            custom\_fields=doc\_info.get("custom\_fields", {})  
        )  
      
    def\_extract\_business\_info(self, text: str) -> Dict[str, Any]:  
        """使用LLM提取业务信息"""  
        prompt = f"""  
        从以下文本中提取业务信息,返回JSON格式:  
          
        文本:{text[:1000]}...  
          
        请提取:  
        1. company\_name: 公司名称  
        2. company\_code: 股票代码  
        3. industry\_category: 行业分类(banking/insurance/securities/real\_estate/manufacturing/technology/healthcare/energy)  
        4. market\_type: 市场类型(主板/创业板/科创板/新三板)  
        5. region: 地区  
          
        返回JSON格式,如果信息不存在则返回null。  
        """  
          
        try:  
            response = self.llm\_client.generate(prompt)  
            return json.loads(response)  
        except Exception:  
            return {}  
      
    def\_extract\_time\_info(self, text: str) -> Dict[str, Any]:  
        """提取时间信息"""  
        import re  
          
        time\_info = {}  
          
        # 提取发布日期  
        date\_patterns = [  
            r"(\d{4})年(\d{1,2})月(\d{1,2})日",  
            r"(\d{4})-(\d{2})-(\d{2})",  
            r"(\d{4})\.(\d{2})\.(\d{2})"  
        ]  
          
        for pattern in date\_patterns:  
            matches = re.findall(pattern, text)  
            if matches:  
                year, month, day = matches[0]  
                time\_info["publish\_date"] = datetime(int(year), int(month), int(day))  
                break  
          
        # 提取报告期  
        period\_patterns = [  
            r"(\d{4})年度",  
            r"(\d{4})年第([一二三四])季度",  
            r"(\d{4})Q([1-4])"  
        ]  
          
        for pattern in period\_patterns:  
            matches = re.findall(pattern, text)  
            if matches:  
                if "年度" in pattern:  
                    time\_info["report\_period"] = f"{matches[0]}年度"  
                else:  
                    year, quarter = matches[0]  
                    time\_info["report\_period"] = f"{year}Q{quarter}"  
                break  
          
        return time\_info  
      
    def\_extract\_financial\_indicators(self, text: str) -> List[str]:  
        """提取财务指标"""  
        indicators = [  
            "营业收入", "净利润", "总资产", "净资产", "资产负债率",  
            "毛利率", "净资产收益率", "每股收益", "现金流量",  
            "应收账款", "存货", "固定资产", "无形资产", "短期借款",  
            "长期借款", "股东权益", "营业成本", "管理费用", "财务费用"  
        ]  
          
        found\_indicators = []  
        for indicator in indicators:  
            if indicator in text:  
                found\_indicators.append(indicator)  
          
        return found\_indicators  
      
    def\_generate\_tags(self, text: str, business\_info: Dict[str, Any]) -> List[str]:  
        """生成标签"""  
        tags = []  
          
        # 基于行业的标签  
        industry = business\_info.get("industry\_category")  
        if industry:  
            tags.append(f"行业\_{industry}")  
          
        # 基于内容的标签  
        if "风险" in text:  
            tags.append("风险分析")  
        if "财务" in text:  
            tags.append("财务信息")  
        if "审计" in text:  
            tags.append("审计报告")  
        if "投资" in text:  
            tags.append("投资相关")  
          
        return tags  
      
    def\_calculate\_confidence\_score(self, text: str, business\_info: Dict[str, Any]) -> float:  
        """计算置信度分数"""  
        score = 0.5  # 基础分数  
          
        # 基于文本长度  
        if len(text) > 500:  
            score += 0.2  
          
        # 基于提取信息的完整性  
        if business\_info.get("company\_name"):  
            score += 0.1  
        if business\_info.get("company\_code"):  
            score += 0.1  
        if business\_info.get("industry\_category"):  
            score += 0.1  
          
        return min(1.0, score)  
      
    def\_generate\_document\_id(self) -> str:  
        """生成文档ID"""  
        import uuid  
        return str(uuid.uuid4())  
      
    def\_extract\_title(self, text: str) -> str:  
        """提取标题"""  
        lines = text.split('\n')  
        for line in lines[:5]:  # 检查前5行  
            if len(line.strip()) > 10 and len(line.strip()) < 100:  
                return line.strip()  
        return text[:50] + "..."  
      
    def\_classify\_document\_type(self, text: str, doc\_info: Dict[str, Any]) -> DocumentType:  
        """分类文档类型"""  
        text\_lower = text.lower()  
          
        if "年报" in text or "annual report" in text\_lower:  
            return DocumentType.ANNUAL\_REPORT  
        elif "季报" in text or "quarterly" in text\_lower:  
            return DocumentType.QUARTERLY\_REPORT  
        elif "审计" in text or "audit" in text\_lower:  
            return DocumentType.AUDIT\_REPORT  
        elif "债券" in text or "bond" in text\_lower:  
            return DocumentType.BOND\_PROSPECTUS  
        elif "公告" in text or "announcement" in text\_lower:  
            return DocumentType.ANNOUNCEMENT  
        else:  
            return DocumentType.RESEARCH\_REPORT  
      
    def\_identify\_section\_type(self, text: str) -> str:  
        """识别章节类型"""  
        section\_keywords = {  
            "财务状况": ["资产负债表", "财务状况", "资产", "负债"],  
            "经营成果": ["利润表", "收入", "利润", "经营成果"],  
            "现金流量": ["现金流量表", "现金流", "经营活动", "投资活动", "筹资活动"],  
            "风险因素": ["风险", "不确定性", "风险因素"],  
            "管理层讨论": ["管理层讨论", "MD&A", "经营分析"],  
            "审计意见": ["审计意见", "审计报告", "注册会计师"]  
        }  
          
        for section, keywords in section\_keywords.items():  
            if any(keyword in text for keyword in keywords):  
                return section  
          
        return "其他"  
      
    def\_assess\_source\_reliability(self, doc\_info: Dict[str, Any]) -> str:  
        """评估来源可靠性"""  
        file\_path = doc\_info.get("file\_path", "")  
          
        if "official" in file\_path.lower() or "年报" in file\_path:  
            return "high"  
        elif "research" in file\_path.lower():  
            return "medium"  
        else:  
            return "low"  
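The regex-driven period extraction in `\_extract\_time\_info` is easy to exercise in isolation. The sketch below mirrors its three report-period patterns, with one deliberate refinement: the Chinese quarter numeral is normalized to a digit, so `2024年第三季度` and `2024Q3` map to the same key (the class version above would store `2024Q三`):

```python
import re

# Standalone version of the report-period patterns from _extract_time_info.
# Chinese quarter numerals are normalized to digits for a uniform period key.
CN_QUARTER = {"一": "1", "二": "2", "三": "3", "四": "4"}

def extract_report_period(text):
    m = re.search(r"(\d{4})年度", text)
    if m:
        return f"{m.group(1)}年度"
    m = re.search(r"(\d{4})年第([一二三四])季度", text)
    if m:
        return f"{m.group(1)}Q{CN_QUARTER[m.group(2)]}"
    m = re.search(r"(\d{4})Q([1-4])", text)
    if m:
        return f"{m.group(1)}Q{m.group(2)}"
    return None  # no recognizable report period
```

Normalizing the period key at extraction time keeps the `report\_period` metadata field filterable with a single `in` clause instead of having to match both spellings at query time.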

Applying metadata-assisted queries in practice

  
class MetadataAssistedRAG:  
    """基于Metadata辅助的RAG系统"""  
      
    def \_\_init\_\_(self, vector\_store: EnhancedMilvusVectorStore, llm\_client, embedding\_client):  
        self.vector\_store = vector\_store  
        self.llm\_client = llm\_client  
        self.embedding\_client = embedding\_client  
        self.metadata\_extractor = SmartMetadataExtractor(llm\_client)  
      
    def smart\_query(self, user\_query: str, query\_context: Dict[str, Any] = None) -> Dict[str, Any]:  
        """智能查询,自动识别查询意图并应用相应的metadata过滤"""  
          
        # 1. 分析查询意图  
        query\_intent = self.\_analyze\_query\_intent(user\_query)  
          
        # 2. 生成查询向量  
        query\_vector = self.embedding\_client.embed\_query(user\_query)  
          
        # 3. 构建metadata查询  
        query\_builder = MetadataQueryBuilder(self.vector\_store)  
        query\_builder = self.\_apply\_intent\_filters(query\_builder, query\_intent, query\_context)  
          
        # 4. 执行混合查询  
        results = query\_builder.with\_vector\_similarity(query\_vector, top\_k=20).execute()  
          
        # 5. 后处理和重排序  
        processed\_results = self.\_post\_process\_results(results, user\_query, query\_intent)  
          
        return {  
            "query": user\_query,  
            "intent": query\_intent,  
            "results": processed\_results[:10],  
            "metadata\_filters\_applied": query\_builder.filters,  
            "total\_matches": len(results)  
        }  
      
    def \_analyze\_query\_intent(self, query: str) -> Dict[str, Any]:  
        """分析查询意图"""  
        intent\_prompt = f"""  
        分析以下查询的意图,提取关键信息:  
          
        查询:{query}  
          
        请识别:  
        1. 目标公司(如果有)  
        2. 时间范围(如果有)  
        3. 文档类型偏好(年报/季报/审计报告等)  
        4. 行业类别(如果有)  
        5. 财务指标关注点  
        6. 查询类型(对比分析/趋势分析/风险评估/财务分析等)  
          
        返回JSON格式。  
        """  
          
        try:  
            response = self.llm\_client.generate(intent\_prompt)  
            return json.loads(response)  
        except Exception:  
            return {}  
      
    def \_apply\_intent\_filters(self, query\_builder: MetadataQueryBuilder,   
                            intent: Dict[str, Any], context: Dict[str, Any] = None) -> MetadataQueryBuilder:  
        """根据查询意图应用metadata过滤器"""  
          
        # 公司过滤  
        if intent.get("target\_companies"):  
            query\_builder.filter\_by\_company(intent["target\_companies"])  
          
        # 时间过滤  
        if intent.get("time\_range"):  
            time\_range = intent["time\_range"]  
            if "start\_date" in time\_range and "end\_date" in time\_range:  
                query\_builder.filter\_by\_date\_range(time\_range["start\_date"], time\_range["end\_date"])  
          
        # 文档类型过滤  
        if intent.get("document\_types"):  
            query\_builder.filter\_by\_doc\_type(intent["document\_types"])  
          
        # 行业过滤  
        if intent.get("industries"):  
            query\_builder.filter\_by\_industry(intent["industries"])  
          
        # 报告期过滤  
        if intent.get("report\_periods"):  
            query\_builder.filter\_by\_report\_period(intent["report\_periods"])  
          
        # 章节类型过滤  
        if intent.get("section\_preferences"):  
            query\_builder.filter\_by\_section(intent["section\_preferences"])  
          
        # 置信度过滤  
        min\_confidence = context.get("min\_confidence", 0.7) if context else 0.7  
        query\_builder.filter\_by\_confidence(min\_confidence)  
          
        # 标签过滤  
        if intent.get("tags"):  
            query\_builder.filter\_by\_tags(intent["tags"])  
          
        return query\_builder  
      
    def \_post\_process\_results(self, results: List[Dict[str, Any]],   
                            query: str, intent: Dict[str, Any]) -> List[Dict[str, Any]]:  
        """后处理和重排序结果"""  
          
        # 1. 基于查询意图的相关性评分  
        for result in results:  
            result["relevance\_score"] = self.\_calculate\_relevance\_score(result, query, intent)  
          
        # 2. 去重(基于document\_id和section\_type)  
        unique\_results = self.\_deduplicate\_results(results)  
          
        # 3. 重排序  
        sorted\_results = sorted(unique\_results,   
                              key=lambda x: (x["relevance\_score"], x["score"]),   
                              reverse=True)  
          
        return sorted\_results  
      
    def \_calculate\_relevance\_score(self, result: Dict[str, Any],   
                                 query: str, intent: Dict[str, Any]) -> float:  
        """计算相关性评分"""  
        score = result.get("score", 0.0)  
          
        # 基于文档类型的加权  
        doc\_type = result.get("doc\_type", "")  
        if intent.get("document\_types") and doc\_type in intent["document\_types"]:  
            score += 0.1  
          
        # 基于时间新近性的加权  
        publish\_date = result.get("publish\_date", "")  
        if publish\_date:  
            try:  
                pub\_date = datetime.fromisoformat(publish\_date)  
                days\_ago = (datetime.now() - pub\_date).days  
                if days\_ago < 365:  # 一年内的文档  
                    score += 0.05  
            except (ValueError, TypeError):  
                pass  
          
        # 基于置信度的加权  
        confidence = result.get("confidence\_score", 0.0)  
        score += confidence * 0.1  
          
        return score  
      
    def \_deduplicate\_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:  
        """去重结果"""  
        seen = set()  
        unique\_results = []  
          
        for result in results:  
            key = (result.get("document\_id", ""), result.get("section\_type", ""))  
            if key not in seen:  
                seen.add(key)  
                unique\_results.append(result)  
          
        return unique\_results  
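The post-processing step above boils down to "first hit per (document\_id, section\_type) pair wins, then sort by the combined score". A self-contained sketch with toy hits (field names follow the metadata schema used in this article; the scores are made up):

```python
# Dedup + rerank, mirroring _deduplicate_results and the final sort
# in _post_process_results.
def dedup_and_rerank(results):
    seen = set()
    unique = []
    for r in results:
        key = (r.get("document_id", ""), r.get("section_type", ""))
        if key not in seen:          # first occurrence of this doc/section wins
            seen.add(key)
            unique.append(r)
    # Sort by (relevance_score, score) descending, as in the class above.
    return sorted(unique, key=lambda x: (x["relevance_score"], x["score"]), reverse=True)

hits = [
    {"document_id": "d1", "section_type": "财务状况", "relevance_score": 0.8, "score": 0.7},
    {"document_id": "d1", "section_type": "财务状况", "relevance_score": 0.9, "score": 0.9},  # duplicate, dropped
    {"document_id": "d2", "section_type": "风险因素", "relevance_score": 0.9, "score": 0.6},
]
top = dedup_and_rerank(hits)
```

Note the order dependency: because the first occurrence wins, deduplication should run on the vector-similarity-ordered list (as the class does), otherwise a lower-quality chunk could shadow a better one from the same section.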
  
# 使用示例  
def demonstrate\_metadata\_assisted\_queries():  
    """演示metadata辅助查询的各种场景"""  
      
    # 初始化系统  
    vector\_store = EnhancedMilvusVectorStore()  
    rag\_system = MetadataAssistedRAG(vector\_store, llm\_client, embedding\_client)  
      
    # 场景1:特定公司的财务分析  
    print("=== 场景1:特定公司财务分析 ===")  
    query1 = "分析腾讯控股2023年的盈利能力"  
    result1 = rag\_system.smart\_query(query1)  
    print(f"查询:{result1['query']}")  
    print(f"应用的过滤器:{result1['metadata\_filters\_applied']}")  
    print(f"匹配结果数:{result1['total\_matches']}")  
      
    # 场景2:行业对比分析  
    print("\n=== 场景2:行业对比分析 ===")  
    query2 = "比较银行业和保险业的风险管理策略"  
    context2 = {"min\_confidence": 0.8}  
    result2 = rag\_system.smart\_query(query2, context2)  
      
    # 场景3:时间序列分析  
    print("\n=== 场景3:时间序列分析 ===")  
    query3 = "分析2022-2023年科技行业的研发投入趋势"  
    result3 = rag\_system.smart\_query(query3)  
      
    # 场景4:特定文档类型查询  
    print("\n=== 场景4:审计报告专项查询 ===")  
    query4 = "查找最近的审计意见中提到的重大风险"  
    result4 = rag\_system.smart\_query(query4)  
      
    return [result1, result2, result3, result4]  
  
# 高级查询示例  
def advanced\_metadata\_queries():  
    """高级metadata查询示例"""  
      
    vector\_store = EnhancedMilvusVectorStore()  
    query\_builder = MetadataQueryBuilder(vector\_store)  
      
    # 复杂组合查询1:特定行业、时间范围、高置信度  
    print("=== 复杂查询1:银行业2023年高质量财务数据 ===")  
    results1 = (query\_builder  
                .filter\_by\_industry(["banking"])  
                .filter\_by\_date\_range("2023-01-01", "2023-12-31")  
                .filter\_by\_confidence(0.9)  
                .filter\_by\_doc\_type(["annual\_report", "quarterly\_report"])  
                .select\_fields(["company\_name", "title", "publish\_date", "confidence\_score"])  
                .execute())  
      
    # 复杂组合查询2:多公司对比  
    print("\n=== 复杂查询2:BATJ公司对比分析 ===")  
    query\_builder = MetadataQueryBuilder(vector\_store)  # 重新初始化  
    results2 = (query\_builder  
                .filter\_by\_company(["百度", "阿里巴巴", "腾讯", "京东"])  
                .filter\_by\_section(["财务状况", "经营成果"])  
                .filter\_by\_tags(["财务信息"], match\_all=True)  
                .execute())  
      
    # 复杂组合查询3:风险相关文档  
    print("\n=== 复杂查询3:风险管理专项分析 ===")  
    query\_builder = MetadataQueryBuilder(vector\_store)  # 重新初始化  
    results3 = (query\_builder  
                .filter\_by\_tags(["风险分析", "风险管理"], match\_all=False)  
                .filter\_by\_section(["风险因素", "管理层讨论"])  
                .filter\_by\_confidence(0.8)  
                .filter\_by\_custom\_field("risk\_level", "high")  
                .execute())  
      
    return results1, results2, results3  

Metadata query performance optimization strategies

picture.image

  
class MetadataQueryOptimizer:  
    """Metadata查询性能优化器"""  
      
    def \_\_init\_\_(self, vector\_store: EnhancedMilvusVectorStore):  
        self.vector\_store = vector\_store  
        self.query\_cache = {}  
        self.metadata\_cache = {}  
        self.performance\_stats = {  
            "cache\_hits": 0,  
            "cache\_misses": 0,  
            "avg\_query\_time": 0.0  
        }  
      
    def optimize\_collection\_structure(self):  
        """优化集合结构"""  
          
        # 1. 创建复合索引  
        self.\_create\_composite\_indexes()  
          
        # 2. 设置分区策略  
        self.\_setup\_partitioning()  
          
        # 3. 优化内存配置  
        self.\_optimize\_memory\_settings()  
      
    def \_create\_composite\_indexes(self):  
        """创建复合索引以提高多字段查询性能"""  
          
        # 常用查询组合的复合索引  
        composite\_indexes = [  
            ["company\_name", "doc\_type", "publish\_date"],  
            ["industry\_category", "report\_period"],  
            ["doc\_type", "section\_type", "confidence\_score"],  
            ["publish\_date", "source\_reliability"]  
        ]  
          
        for fields in composite\_indexes:  
            try:  
                # Milvus中的复合索引实现  
                index\_name = "\_".join(fields)  
                print(f"Creating composite index: {index\_name}")  
                # 注意:实际的复合索引创建需要根据Milvus版本调整  
            except Exception as e:  
                print(f"Failed to create composite index for {fields}: {e}")  
      
    def \_setup\_partitioning(self):  
        """设置分区策略"""  
          
        # 基于时间的分区策略  
        time\_partitions = [  
            "2020\_2021", "2022", "2023", "2024"  
        ]  
          
        # 基于文档类型的分区策略  
        doc\_type\_partitions = [  
            "annual\_reports", "quarterly\_reports",   
            "audit\_reports", "announcements"  
        ]  
          
        print("Setting up partitioning strategy...")  
        # 实际分区创建逻辑  
      
    def \_optimize\_memory\_settings(self):  
        """优化内存设置"""  
          
        # 设置合适的缓存大小  
        cache\_config = {  
            "cache.cache\_size": "2GB",  
            "cache.insert\_buffer\_size": "256MB",  
            "cache.preload\_collection": True  
        }  
          
        print("Optimizing memory settings...")  
        # 应用内存配置  
      
    def execute\_optimized\_query(self, query\_builder: MetadataQueryBuilder) -> List[Dict[str, Any]]:  
        """执行优化的查询"""  
        import time  
        import hashlib  
          
        start\_time = time.time()  
          
        # 1. 生成查询缓存键  
        query\_key = self.\_generate\_cache\_key(query\_builder)  
          
        # 2. 检查缓存  
        if query\_key in self.query\_cache:  
            self.performance\_stats["cache\_hits"] += 1  
            return self.query\_cache[query\_key]  
          
        # 3. 执行查询  
        self.performance\_stats["cache\_misses"] += 1  
        results = query\_builder.execute()  
          
        # 4. 缓存结果  
        self.query\_cache[query\_key] = results  
          
        # 5. 更新性能统计  
        query\_time = time.time() - start\_time  
        self.\_update\_performance\_stats(query\_time)  
          
        return results  
      
    def \_generate\_cache\_key(self, query\_builder: MetadataQueryBuilder) -> str:  
        """生成查询缓存键"""  
          
        cache\_data = {  
            "filters": sorted(query\_builder.filters),  
            "limit": query\_builder.limit,  
            "output\_fields": sorted(query\_builder.output\_fields),  
            "has\_vector\_query": query\_builder.vector\_query is not None  
        }  
          
        cache\_str = json.dumps(cache\_data, sort\_keys=True)  
        return hashlib.md5(cache\_str.encode()).hexdigest()  
      
    def \_update\_performance\_stats(self, query\_time: float):  
        """更新性能统计"""  
          
        total\_queries = self.performance\_stats["cache\_hits"] + self.performance\_stats["cache\_misses"]  
          
        if total\_queries == 1:  
            self.performance\_stats["avg\_query\_time"] = query\_time  
        else:  
            # 计算移动平均  
            current\_avg = self.performance\_stats["avg\_query\_time"]  
            self.performance\_stats["avg\_query\_time"] = (  
                (current\_avg * (total\_queries - 1) + query\_time) / total\_queries  
            )  
      
    def get\_performance\_report(self) -> Dict[str, Any]:  
        """获取性能报告"""  
          
        total\_queries = self.performance\_stats["cache\_hits"] + self.performance\_stats["cache\_misses"]  
        cache\_hit\_rate = self.performance\_stats["cache\_hits"] / total\_queries if total\_queries > 0 else 0  
          
        return {  
            "total\_queries": total\_queries,  
            "cache\_hit\_rate": f"{cache\_hit\_rate:.2%}",  
            "avg\_query\_time": f"{self.performance\_stats['avg\_query\_time']:.3f}s",  
            "cache\_hits": self.performance\_stats["cache\_hits"],  
            "cache\_misses": self.performance\_stats["cache\_misses"]  
        }  
      
    def clear\_cache(self):  
        """清空缓存"""  
        self.query\_cache.clear()  
        self.metadata\_cache.clear()  
        print("Cache cleared")  
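The cache key above depends only on the normalized shape of a query: sorting the filter list and field list before hashing means two logically identical queries built in a different order share one cache entry. A standalone sketch of the same scheme:

```python
import hashlib
import json

# Same keying scheme as MetadataQueryOptimizer._generate_cache_key:
# normalize the order-sensitive parts, serialize deterministically, then hash.
def make_cache_key(filters, limit, output_fields, has_vector_query):
    cache_data = {
        "filters": sorted(filters),
        "limit": limit,
        "output_fields": sorted(output_fields),
        "has_vector_query": has_vector_query,
    }
    cache_str = json.dumps(cache_data, sort_keys=True)
    return hashlib.md5(cache_str.encode()).hexdigest()

k1 = make_cache_key(["a > 1", "b == 2"], 10, ["*"], False)
k2 = make_cache_key(["b == 2", "a > 1"], 10, ["*"], False)  # reordered filters
```

MD5 is acceptable here because the key only needs to be unique for cache lookup, not cryptographically strong. One caveat this sketch inherits from the class above: the key records only *whether* a vector query is present, not the vector itself, so two different query vectors with identical filters would collide; a production cache should also hash (a digest of) the query vector.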

Building the financial-document knowledge base

Knowledge base architecture design
