In the AI era, RAG (Retrieval-Augmented Generation) systems have become a core technology for enterprise knowledge management. Yet faced with massive document collections, how do you pinpoint the information users actually need? How do you stop the AI from answering the wrong question? This is the "needle in a haystack" challenge every technical team is up against.
What problems will this article try to solve?
We take financial documents as the running example. A disclaimer: I am not an AI specialist and write here as an AI enthusiast, so corrections are welcome 😊. The article is fairly long, and the code is included only to illustrate the logic (focus on the comments and method names) — real implementations will be more complex.
Retrieval challenges in financial documents
- How do you quickly locate key information in annual reports, quarterly reports, and audit reports?
- How do you process and intelligently retrieve the three core financial statements in a unified way?
- How do you manage information spanning multiple related entities (issuer, guarantor, subsidiaries)?
Retrieval-quality challenges
- What if user queries don't match the domain terminology?
- How do you raise recall without sacrificing precision?
- How do you diagnose and visually validate the quality of vector-search results?
System architecture questions
- Should vector-database collections be unified or separated?
- How do you design efficient indexes and metadata structures?
- How do you implement query optimization and re-ranking?
RAG system overview
RAG (Retrieval-Augmented Generation) is an AI architecture that combines retrieval with generation: it retrieves relevant information from an external knowledge base to strengthen the answers of a large language model.
Analyzing the needle-in-a-haystack problem
The nature of the problem
The "needle in a haystack" problem is finding the exact text fragments relevant to a query within a massive document collection. The main challenges:
- Semantic matching: user queries and document content phrase the same thing differently
- Low information density: relevant information is sparsely scattered among large amounts of irrelevant content
- Context dependence: key information may be spread across multiple document fragments
- Limited query expression: users struggle to state their needs in precise domain terminology
Solution approach
Building the vector-database index
Analyzing the collection architecture decision
In a RAG system, how to organize vector-database collections is a key architectural decision. We must choose between a unified collection and separated collections, and the choice directly affects performance, maintainability, and extensibility.
Unified vs. separated collections
Core architecture of the unified-collection design
We chose the unified-collection design, based on the following analysis:
1. Simpler architecture
- A single collection reduces system complexity
- One indexing and caching strategy
- Simpler backup and maintenance
- Lower operational cost
2. Cross-document-type retrieval
- Retrieve relevant information across several document types in one query
- Easier comprehensive analysis and comparison
- No need to query multiple collections separately
- Richer context for downstream generation
3. Flexible business logic
- Business-level document classification via metadata
- New document types can be added dynamically
- Complex business query logic is easy to express
- Adapts as business requirements change
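As a minimal sketch of how a unified collection supports business-level filtering, the helper below builds a Milvus-style boolean filter expression from metadata key/value pairs. The function name and field names are illustrative assumptions, not a fixed API:

```python
def build_filter_expr(filters: dict) -> str:
    """Build a Milvus-style boolean filter expression from metadata filters.

    Hypothetical helper: the field names (doc_type, company_name, ...) follow
    the metadata design used in this article, not any library-defined schema.
    """
    clauses = []
    for field, value in filters.items():
        if isinstance(value, (list, tuple)):
            # Multiple allowed values become an "in" clause
            quoted = ", ".join(f'"{v}"' for v in value)
            clauses.append(f"{field} in [{quoted}]")
        else:
            clauses.append(f'{field} == "{value}"')
    return " and ".join(clauses)

# One collection, many document types: filter instead of switching collections
expr = build_filter_expr({
    "company_name": "ACME Holdings",
    "doc_type": ["annual_reports", "audit_reports"],
})
```

The resulting expression string would be passed as the scalar filter of a vector search, so one query can span several document types at once.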
Implementation strategy for the unified collection
1. Smart partitioning

```python
# Time-based partitioning strategy
time_partitions = [
    "2020_2021", "2022", "2023", "2024"
]

# Document-type-based partitioning strategy
doc_type_partitions = [
    "annual_reports",      # annual reports
    "quarterly_reports",   # quarterly reports
    "audit_reports",       # audit reports
    "announcements"        # announcements
]
```
2. A rich metadata field design
- doc_type: document type identifier
- company_name: company name
- industry_category: industry classification
- publish_date: publication date
- report_period: reporting period
- section_type: section type
- financial_indicators: financial indicators
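To make the field design concrete, here is a hypothetical chunk-level metadata record following those fields, with a small validation sketch; all values are illustrative:

```python
# Hypothetical example of one chunk's metadata following the field design
# above; the values are made up for illustration.
chunk_metadata = {
    "doc_type": "annual_report",
    "company_name": "ACME Holdings",
    "industry_category": "manufacturing",
    "publish_date": "2024-04-28",
    "report_period": "FY2023",
    "section_type": "financial_statements",
    "financial_indicators": ["total_assets", "net_profit"],
}

# Minimum fields that downstream filters depend on (an assumption)
REQUIRED_FIELDS = {"doc_type", "company_name", "publish_date"}

def validate_metadata(meta: dict) -> bool:
    """Check that the minimum filterable fields are present and non-empty."""
    return all(meta.get(f) for f in REQUIRED_FIELDS)
```

Validating records at ingestion time keeps the unified collection filterable: a chunk missing `doc_type` or `publish_date` would silently escape every pre-filter.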
3. Composite-index optimization

```python
# Composite indexes for common query combinations
composite_indexes = [
    ["company_name", "doc_type", "publish_date"],
    ["industry_category", "report_period"],
    ["doc_type", "section_type", "confidence_score"],
    ["publish_date", "source_reliability"]
]
```
Performance optimization mechanisms
1. Pre-filtering
- Pre-filter on metadata to cut the amount of vector computation
- Route queries intelligently to the relevant partitions
- Cache hot query results
2. Query optimization flow
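The routing idea can be sketched as a keyword-to-partition mapper that picks partitions and a year filter before any vector search runs. The keyword table and function are assumptions for illustration:

```python
import re

# Hypothetical pre-filter router: maps query keywords to the partitions
# defined earlier so that vector search only scans relevant partitions.
DOC_TYPE_KEYWORDS = {
    "annual_reports": ["年报", "annual report"],
    "quarterly_reports": ["季报", "quarterly"],
    "audit_reports": ["审计", "audit"],
}

def route_query(query: str) -> dict:
    """Return the partitions and year filters implied by the query text."""
    partitions = [
        p for p, kws in DOC_TYPE_KEYWORDS.items()
        if any(kw in query for kw in kws)
    ]
    years = re.findall(r"20\d{2}", query)
    return {
        # If no keyword matched, fall back to searching all partitions
        "partitions": partitions or list(DOC_TYPE_KEYWORDS),
        "years": years,
    }
```

For a query like "2023年报中的净利润" this would restrict the search to the `annual_reports` partition with a 2023 filter, skipping the bulk of the collection entirely.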
When each design fits
Scenarios where a unified collection is recommended:
✅ Comprehensive retrieval across document types is needed
✅ Document types are fairly stable and share a similar metadata structure
✅ You want a simpler architecture and lower maintenance cost
✅ Business logic needs flexible document classification and filtering
✅ The team is small and needs low operational complexity
Scenarios where separated collections are worth considering:
❌ Vector dimensions differ greatly across document types
❌ Access patterns differ completely between document types
❌ Independent indexing and tuning strategies are required
❌ Data volume is extreme and performance requirements are strict
❌ Strict data-isolation requirements apply
Design decision summary
Why we chose the unified-collection design:
- Business fit: financial-document retrieval frequently crosses document types
- Simpler architecture: less system complexity, better maintainability
- Balanced performance: partitioning and indexing keep queries fast
- Extensibility: new document types and business logic are easy to add
- Cost effectiveness: lower development and operations cost
With unified collection + smart partitioning + rich metadata, this design keeps the architecture simple while still delivering efficient queries — a good fit for a financial-document RAG system.
Financial document structure analysis and design strategy
Financial documents have complex hierarchical structures and varied content organization. Understanding and handling these structures correctly is critical to retrieval quality in a RAG system.
Structural analysis of the main financial document types
1. Annual/quarterly financial report structure
2. Audit report structure
3. Bond prospectus structure
A unified strategy for the three core financial statements
The three core statements (balance sheet, income statement, cash flow statement) recur across different documents and need a unified processing strategy.
Financial statement processing pipeline
Data structure design
1. Standardized data structures
The following code defines standardized data structures for financial statements: enum types enforce data consistency, and dataclasses simplify metadata management:
```python
import re
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum

class FinancialStatementType(Enum):
    """Financial statement type"""
    BALANCE_SHEET = "balance_sheet"        # balance sheet
    INCOME_STATEMENT = "income_statement"  # income statement
    CASH_FLOW = "cash_flow"                # cash flow statement
    EQUITY_CHANGE = "equity_change"        # statement of changes in equity

class ReportScope(Enum):
    """Report scope"""
    CONSOLIDATED = "consolidated"  # consolidated statements
    PARENT_ONLY = "parent_only"    # parent-company statements

@dataclass
class FinancialStatementMetadata:
    """Financial statement metadata"""
    statement_type: FinancialStatementType
    report_scope: ReportScope
    period_end_date: str
    period_type: str  # annual / semi-annual / quarterly
    currency: str = "CNY"
    unit: str = "元"  # yuan / thousand yuan / 10k yuan / million yuan
    # Source document information (defaults added: dataclasses forbid
    # required fields after defaulted ones)
    source_document_type: str = ""  # annual report / quarterly report / prospectus / audit report
    source_page: Optional[int] = None
    source_section: Optional[str] = None
    # Entity information
    entity_name: str = ""
    entity_code: str = ""
    entity_type: str = ""  # issuer / guarantor / subsidiary
    # Data quality
    is_audited: bool = False
    auditor_name: Optional[str] = None
    audit_opinion: Optional[str] = None

class FinancialStatementProcessor:
    """Financial statement processor"""

    def __init__(self):
        self.statement_templates = self._load_statement_templates()
        self.account_mapping = self._load_account_mapping()

    def extract_financial_statements(self, text: str, doc_metadata: Dict) -> List[Dict]:
        """Extract financial statement data"""
        statements = []
        # 1. Identify statement types and locations
        statement_sections = self._identify_statement_sections(text)
        for section in statement_sections:
            # 2. Extract statement metadata
            stmt_metadata = self._extract_statement_metadata(section, doc_metadata)
            # 3. Extract financial data
            financial_data = self._extract_financial_data(section, stmt_metadata)
            # 4. Normalize account names
            normalized_data = self._normalize_account_names(financial_data)
            # 5. Build the complete statement record
            statement_record = {
                "metadata": stmt_metadata,
                "financial_data": normalized_data,
                "raw_text": section["text"],
                "embedding_chunks": self._create_embedding_chunks(section, stmt_metadata)
            }
            statements.append(statement_record)
        return statements

    def _identify_statement_sections(self, text: str) -> List[Dict]:
        """Identify financial statement sections"""
        sections = []
        # Balance sheet patterns
        balance_sheet_patterns = [
            r"(合并)?资产负债表.*?(?=利润表|现金流量表|所有者权益|$)",
            r"(合并)?资产负债表.*?(?=\n\s*\n.*?表|$)"
        ]
        # Income statement patterns
        income_patterns = [
            r"(合并)?利润表.*?(?=现金流量表|资产负债表|所有者权益|$)",
            r"(合并)?利润及利润分配表.*?(?=现金流量表|资产负债表|$)"
        ]
        # Cash flow statement patterns
        cashflow_patterns = [
            r"(合并)?现金流量表.*?(?=资产负债表|利润表|所有者权益|$)",
            r"(合并)?现金流量表.*?(?=\n\s*\n.*?表|$)"
        ]
        # Run the pattern matching
        for pattern_type, patterns in [
            ("balance_sheet", balance_sheet_patterns),
            ("income_statement", income_patterns),
            ("cash_flow", cashflow_patterns)
        ]:
            for pattern in patterns:
                matches = re.finditer(pattern, text, re.DOTALL | re.IGNORECASE)
                for match in matches:
                    sections.append({
                        "type": pattern_type,
                        "text": match.group(),
                        "start_pos": match.start(),
                        "end_pos": match.end()
                    })
        return sections

    def _normalize_account_names(self, financial_data: Dict) -> Dict:
        """Normalize accounting line-item names"""
        normalized = {}
        for account, value in financial_data.items():
            # Use the predefined account mapping table
            standard_name = self.account_mapping.get(account, account)
            normalized[standard_name] = value
        return normalized

    def _create_embedding_chunks(self, section: Dict, metadata: FinancialStatementMetadata) -> List[Dict]:
        """Create embedding chunks for a financial statement"""
        chunks = []
        # Chunk by statement line item
        text_lines = section["text"].split("\n")
        current_chunk = []
        for line in text_lines:
            if self._is_account_line(line):
                if current_chunk:
                    # Close out the current chunk
                    chunk_text = "\n".join(current_chunk)
                    chunks.append({
                        "text": chunk_text,
                        "metadata": {
                            "statement_type": metadata.statement_type.value,
                            "report_scope": metadata.report_scope.value,
                            "entity_name": metadata.entity_name,
                            "period_end_date": metadata.period_end_date,
                            "chunk_type": "financial_statement_item"
                        }
                    })
                    current_chunk = []
            current_chunk.append(line)
        # Handle the final chunk
        if current_chunk:
            chunk_text = "\n".join(current_chunk)
            chunks.append({
                "text": chunk_text,
                "metadata": {
                    "statement_type": metadata.statement_type.value,
                    "report_scope": metadata.report_scope.value,
                    "entity_name": metadata.entity_name,
                    "period_end_date": metadata.period_end_date,
                    "chunk_type": "financial_statement_item"
                }
            })
        return chunks
```
Multi-entity association design
A bond prospectus involves multiple entities (issuer, guarantor, subsidiaries, etc.), which require an effective association mechanism.
Multi-entity association architecture
Entity relationship data model
1. Entity relationship modeling
The following code extracts and links multi-entity information; enum types define entity and relationship types to keep the data consistent and complete:
```python
import re
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional

class EntityType(Enum):
    """Entity type"""
    ISSUER = "issuer"                                    # issuer
    GUARANTOR = "guarantor"                              # guarantor
    CONTROLLING_SHAREHOLDER = "controlling_shareholder"  # controlling shareholder
    ACTUAL_CONTROLLER = "actual_controller"              # actual controller
    SUBSIDIARY = "subsidiary"                            # subsidiary
    ASSOCIATE = "associate"                              # associate company
    JOINT_VENTURE = "joint_venture"                      # joint venture

class RelationshipType(Enum):
    """Relationship type"""
    PARENT_SUBSIDIARY = "parent_subsidiary"          # parent-subsidiary
    GUARANTEE = "guarantee"                          # guarantee relationship
    CONTROL = "control"                              # control relationship
    SIGNIFICANT_INFLUENCE = "significant_influence"  # significant influence
    RELATED_PARTY = "related_party"                  # related party

@dataclass
class EntityInfo:
    """Entity information"""
    entity_id: str
    entity_name: str
    entity_type: EntityType
    # entity_code moved after entity_type and defaulted, so it can be
    # omitted when constructing entities below
    entity_code: Optional[str] = None
    # Basic information
    legal_representative: Optional[str] = None
    registered_capital: Optional[str] = None
    business_scope: Optional[str] = None
    registration_address: Optional[str] = None
    # Position information within the document
    mention_positions: List[Dict] = None  # positions where the entity is mentioned
    detail_sections: List[Dict] = None    # sections holding its detailed information

    def __post_init__(self):
        if self.mention_positions is None:
            self.mention_positions = []
        if self.detail_sections is None:
            self.detail_sections = []

@dataclass
class EntityRelationship:
    """Entity relationship"""
    from_entity_id: str
    to_entity_id: str
    relationship_type: RelationshipType
    relationship_description: str
    # Relationship strength and distance
    relationship_strength: float = 1.0  # between 0 and 1; 1 is the strongest link
    text_distance: int = 0              # textual distance within the document
    # Relationship validity
    is_active: bool = True
    effective_date: Optional[str] = None
    expiry_date: Optional[str] = None

class MultiEntityProcessor:
    """Multi-entity processor"""

    def __init__(self):
        self.entity_patterns = self._load_entity_patterns()
        self.relationship_patterns = self._load_relationship_patterns()

    def extract_entities_and_relationships(self, text: str, doc_metadata: Dict) -> Dict:
        """Extract entity and relationship information"""
        result = {
            "entities": [],
            "relationships": [],
            "entity_chunks": []
        }
        # 1. Identify all entities
        entities = self._identify_entities(text, doc_metadata)
        # 2. Build entity relationships
        relationships = self._extract_relationships(text, entities)
        # 3. Create entity-centric text chunks
        entity_chunks = self._create_entity_chunks(text, entities, relationships)
        result["entities"] = entities
        result["relationships"] = relationships
        result["entity_chunks"] = entity_chunks
        return result

    def _identify_entities(self, text: str, doc_metadata: Dict) -> List[EntityInfo]:
        """Identify all entities in the document"""
        entities = []
        # Issuer patterns
        issuer_patterns = [
            r"发行人[::](.*?)(?=\n|,|。)",
            r"本公司[::]?(.*?)(?=\n|,|。)",
            r"(.*?)(?:股份)?(?:有限)?公司(?=作为发行人|发行)"
        ]
        # Guarantor patterns
        guarantor_patterns = [
            r"担保人[::](.*?)(?=\n|,|。)",
            r"保证人[::](.*?)(?=\n|,|。)",
            r"(.*?)(?:股份)?(?:有限)?公司(?=提供担保|作为担保人)"
        ]
        # Subsidiary patterns
        subsidiary_patterns = [
            r"子公司[::](.*?)(?=\n|,|。)",
            r"控股子公司[::](.*?)(?=\n|,|。)",
            r"全资子公司[::](.*?)(?=\n|,|。)"
        ]
        # Run entity recognition
        for entity_type, patterns in [
            (EntityType.ISSUER, issuer_patterns),
            (EntityType.GUARANTOR, guarantor_patterns),
            (EntityType.SUBSIDIARY, subsidiary_patterns)
        ]:
            for pattern in patterns:
                matches = re.finditer(pattern, text, re.IGNORECASE)
                for match in matches:
                    entity_name = match.group(1).strip() if match.groups() else match.group().strip()
                    entity = EntityInfo(
                        entity_id=f"{entity_type.value}_{len(entities)}",
                        entity_name=entity_name,
                        entity_type=entity_type
                    )
                    # Record the position within the document
                    entity.mention_positions.append({
                        "start_pos": match.start(),
                        "end_pos": match.end(),
                        "context": text[max(0, match.start() - 100):match.end() + 100]
                    })
                    entities.append(entity)
        return entities

    def _extract_relationships(self, text: str, entities: List[EntityInfo]) -> List[EntityRelationship]:
        """Extract relationships between entities"""
        relationships = []
        # Guarantee relationship patterns
        guarantee_patterns = [
            r"(.*?)(?:为|对)(.*?)(?:提供担保|承担保证责任)",
            r"(.*?)(?:担保|保证)(.*?)(?:债券|债务)"
        ]
        # Control relationship patterns
        control_patterns = [
            r"(.*?)(?:控股|持有)(.*?)(?:%|股份|股权)",
            r"(.*?)(?:为|是)(.*?)(?:的)?(?:控股股东|实际控制人)"
        ]
        for pattern_type, patterns in [
            (RelationshipType.GUARANTEE, guarantee_patterns),
            (RelationshipType.CONTROL, control_patterns)
        ]:
            for pattern in patterns:
                matches = re.finditer(pattern, text, re.IGNORECASE)
                for match in matches:
                    # Try to resolve the match to concrete entities
                    from_entity = self._match_entity_by_name(match.group(1), entities)
                    to_entity = self._match_entity_by_name(match.group(2), entities)
                    if from_entity and to_entity:
                        relationship = EntityRelationship(
                            from_entity_id=from_entity.entity_id,
                            to_entity_id=to_entity.entity_id,
                            relationship_type=pattern_type,
                            relationship_description=match.group(),
                            text_distance=abs(match.start() - match.end())
                        )
                        relationships.append(relationship)
        return relationships

    def _create_entity_chunks(self, text: str, entities: List[EntityInfo],
                              relationships: List[EntityRelationship]) -> List[Dict]:
        """Create entity-centric text chunks"""
        chunks = []
        for entity in entities:
            # Build dedicated chunks for each entity
            entity_sections = self._find_entity_sections(text, entity)
            for section in entity_sections:
                # Compute association strength with other entities
                related_entities = self._find_related_entities_in_section(
                    section, entity, entities, relationships
                )
                chunk = {
                    "text": section["text"],
                    "metadata": {
                        "primary_entity_id": entity.entity_id,
                        "primary_entity_name": entity.entity_name,
                        "primary_entity_type": entity.entity_type.value,
                        "related_entities": related_entities,
                        "section_type": section["section_type"],
                        "chunk_type": "entity_information"
                    },
                    "embedding": None  # generated in a later step
                }
                chunks.append(chunk)
        return chunks

    def _find_entity_sections(self, text: str, entity: EntityInfo) -> List[Dict]:
        """Find document sections related to an entity"""
        sections = []
        # Search by entity name
        entity_name_pattern = re.escape(entity.entity_name)
        # Find paragraphs that mention the entity
        paragraphs = text.split("\n\n")
        for i, paragraph in enumerate(paragraphs):
            if re.search(entity_name_pattern, paragraph, re.IGNORECASE):
                # Expand the context window
                start_idx = max(0, i - 1)
                end_idx = min(len(paragraphs), i + 2)
                extended_text = "\n\n".join(paragraphs[start_idx:end_idx])
                section = {
                    "text": extended_text,
                    "section_type": self._classify_section_type(extended_text),
                    "paragraph_index": i,
                    "relevance_score": self._calculate_relevance_score(paragraph, entity)
                }
                sections.append(section)
        return sections

    def _calculate_relevance_score(self, text: str, entity: EntityInfo) -> float:
        """Score the relevance between a text span and an entity"""
        score = 0.0
        # Based on mention frequency of the entity name
        entity_mentions = len(re.findall(re.escape(entity.entity_name), text, re.IGNORECASE))
        score += entity_mentions * 0.3
        # Based on keywords associated with the entity type
        type_keywords = {
            EntityType.ISSUER: ["发行", "募集", "债券", "资金"],
            EntityType.GUARANTOR: ["担保", "保证", "责任", "承担"],
            EntityType.SUBSIDIARY: ["子公司", "控股", "投资", "股权"]
        }
        keywords = type_keywords.get(entity.entity_type, [])
        for keyword in keywords:
            if keyword in text:
                score += 0.2
        return min(score, 1.0)
```
Bridging long-distance associations
When an entity and its data sit far apart in the document, we use the following strategies.
Long-distance association architecture
Context expansion strategy
1. Context window expansion
The following code implements a context-window expansion strategy: it widens each chunk's context so that relevant information farther away is still captured:
```python
class ContextualChunkingStrategy:
    """Contextual chunking strategy"""

    def __init__(self, window_size: int = 3):
        self.window_size = window_size

    def create_contextual_chunks(self, text: str, entities: List[EntityInfo]) -> List[Dict]:
        """Create chunks that carry surrounding context"""
        chunks = []
        paragraphs = text.split("\n\n")
        for entity in entities:
            entity_paragraphs = self._find_entity_paragraphs(paragraphs, entity)
            for para_idx in entity_paragraphs:
                # Expand the context window around the focus paragraph
                start_idx = max(0, para_idx - self.window_size)
                end_idx = min(len(paragraphs), para_idx + self.window_size + 1)
                context_text = "\n\n".join(paragraphs[start_idx:end_idx])
                chunk = {
                    "text": context_text,
                    "primary_entity": entity.entity_name,
                    "context_range": (start_idx, end_idx),
                    "focus_paragraph": para_idx
                }
                chunks.append(chunk)
        return chunks
```
2. Reference-chain tracking

```python
class ReferenceTracker:
    """Reference-chain tracker"""

    def track_entity_references(self, text: str, entities: List[EntityInfo]) -> Dict:
        """Track the reference chain of each entity"""
        reference_chains = {}
        for entity in entities:
            # Find direct references
            direct_refs = self._find_direct_references(text, entity)
            # Find indirect references (pronouns, abbreviations, etc.)
            indirect_refs = self._find_indirect_references(text, entity, direct_refs)
            # Build the reference chain, sorted by position
            all_refs = direct_refs + indirect_refs
            all_refs.sort(key=lambda x: x["position"])
            reference_chains[entity.entity_id] = all_refs
        return reference_chains

    def _find_indirect_references(self, text: str, entity: EntityInfo,
                                  direct_refs: List[Dict]) -> List[Dict]:
        """Find indirect references"""
        indirect_refs = []
        # Pronoun-style reference patterns
        pronoun_patterns = ["本公司", "该公司", "发行人", "担保人", "公司"]
        for pattern in pronoun_patterns:
            matches = re.finditer(pattern, text)
            for match in matches:
                # Check whether the match sits within an entity-mention context
                if self._is_in_entity_context(match.start(), direct_refs):
                    indirect_refs.append({
                        "text": match.group(),
                        "position": match.start(),
                        "type": "pronoun_reference",
                        "confidence": 0.7
                    })
        return indirect_refs
```
3. 语义关联增强
classSemantic
Association
Enhancer:
"""语义关联增强器"""
defenhance\_entity\_
association
s(self, chunks: List[Dict],
entities: List[EntityInfo]) -> List[Dict]:
"""增强主体关联"""
enhanced\_chunks = []
for chunk in chunks:
# 为每个分块添加语义关联信息
association
s = self.\_calculate\_semantic\_associations(
chunk["text"], entities
)
enhanced\_chunk = chunk.copy()
enhanced\_chunk["semantic\_associations"] = associations
enhanced\_chunk["association\_summary"] = self.\_create\_
association
\_summary(
association
s)
enhanced\_chunks.append(enhanced\_chunk)
return enhanced\_chunks
def\_calculate\_semantic\_
association
s(self, text: str,
entities: List[EntityInfo]) -> Dict:
"""计算语义关联度"""
associations = {}
for entity in entities:
# 计算文本与主体的语义相似度
similarity\_score = self.\_calculate\_semantic\_similarity(text, entity)
# 计算关键词匹配度
keyword\_score = self.\_calculate\_keyword\_relevance(text, entity)
# 综合评分
final\_score = (similarity\_score * 0.6 + keyword\_score * 0.4)
if final\_score > 0.3: # 阈值过滤
associations[entity.entity\_id] = {
"entity\_name": entity.entity\_name,
"entity\_type": entity.entity\_type.value,
"relevance\_score": final\_score,
"association\_type": self.\_classify\_association\_type(text, entity)
}
return
association
s
Through structured document analysis, standardized data processing, intelligent entity association, and context enhancement, this design addresses the complex structures and long-distance associations found in financial documents.
Building table indexes and optimizing table queries
Financial documents contain many tables (financial statements, statistical tables, etc.), which call for dedicated indexing strategies and query optimization to retrieve efficiently.
Structuring table data
Table processing pipeline
Table data model design
1. Table detection and parsing
The following code detects tables and parses them into structured data using pattern matching and structural analysis:
```python
import re
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from enum import Enum
import pandas as pd

class TableType(Enum):
    """Table type"""
    FINANCIAL_STATEMENT = "financial_statement"  # financial statements
    DATA_STATISTICS = "data_statistics"          # statistical tables
    COMPARISON_TABLE = "comparison_table"        # comparison tables
    SCHEDULE_TABLE = "schedule_table"            # schedules / detail tables
    RATIO_ANALYSIS = "ratio_analysis"            # ratio analysis tables
    CASH_FLOW_DETAIL = "cash_flow_detail"        # cash flow details

class TableStructure(Enum):
    """Table structure type"""
    SIMPLE_GRID = "simple_grid"    # simple grid
    HIERARCHICAL = "hierarchical"  # hierarchical structure
    MULTI_HEADER = "multi_header"  # multi-level header
    PIVOT_TABLE = "pivot_table"    # pivot table
    CROSS_TAB = "cross_tab"        # cross tabulation

@dataclass
class TableMetadata:
    """Table metadata"""
    table_id: str
    table_type: TableType
    table_structure: TableStructure
    title: str
    # Position information
    page_number: Optional[int] = None
    section_name: Optional[str] = None
    start_position: Optional[int] = None
    end_position: Optional[int] = None
    # Table attributes
    row_count: int = 0
    column_count: int = 0
    has_header: bool = True
    has_index: bool = True
    # Data attributes
    period_info: Optional[str] = None  # reporting period
    currency: Optional[str] = None     # currency
    unit: Optional[str] = None         # numeric unit
    # Associations
    related_entities: List[str] = None  # related entities
    related_tables: List[str] = None    # related tables

    def __post_init__(self):
        if self.related_entities is None:
            self.related_entities = []
        if self.related_tables is None:
            self.related_tables = []

class TableExtractor:
    """Table extractor"""

    def __init__(self):
        self.table_patterns = self._load_table_patterns()
        self.header_patterns = self._load_header_patterns()

    def extract_tables_from_text(self, text: str, doc_metadata: Dict) -> List[Dict]:
        """Extract tables from raw text"""
        tables = []
        # 1. Identify table boundaries
        table_boundaries = self._identify_table_boundaries(text)
        for boundary in table_boundaries:
            # 2. Extract the table content
            table_content = text[boundary['start']:boundary['end']]
            # 3. Parse the table structure
            parsed_table = self._parse_table_structure(table_content)
            # 4. Generate table metadata
            table_metadata = self._generate_table_metadata(
                parsed_table, boundary, doc_metadata
            )
            # 5. Build the structured table object
            structured_table = {
                "metadata": table_metadata,
                "raw_content": table_content,
                "structured_data": parsed_table,
                "index_data": self._create_index_data(parsed_table, table_metadata)
            }
            tables.append(structured_table)
        return tables

    def _identify_table_boundaries(self, text: str) -> List[Dict]:
        """Identify table boundaries"""
        boundaries = []
        # Table start patterns
        table_start_patterns = [
            r"表\s*\d+[::].*?\n",                  # "Table 1: title"
            r"(资产负债表|利润表|现金流量表).*?\n",  # financial statement titles
            r"单位[::].*?\n.*?\n",                 # unit declarations
            r"\|.*?\|.*?\|.*?\n"                   # pipe-delimited rows
        ]
        # Table end patterns
        table_end_patterns = [
            r"\n\s*\n(?!\s*\|)",                  # blank line followed by non-table content
            r"\n(?=表\s*\d+)",                    # the next table starts
            r"\n(?=[一二三四五六七八九十]+[、.])",  # a new section starts
            r"\n(?=\d+[、.])",                    # a numbered list starts
        ]
        # Run boundary detection
        for start_pattern in table_start_patterns:
            start_matches = list(re.finditer(start_pattern, text, re.MULTILINE))
            for start_match in start_matches:
                start_pos = start_match.start()
                # Find where the table ends
                end_pos = len(text)
                remaining_text = text[start_match.end():]
                for end_pattern in table_end_patterns:
                    end_match = re.search(end_pattern, remaining_text)
                    if end_match:
                        end_pos = start_match.end() + end_match.start()
                        break
                boundaries.append({
                    "start": start_pos,
                    "end": end_pos,
                    "confidence": self._calculate_table_confidence(
                        text[start_pos:end_pos]
                    )
                })
        # Sort by confidence and remove overlapping candidates
        boundaries = sorted(boundaries, key=lambda x: x['confidence'], reverse=True)
        return self._remove_overlapping_boundaries(boundaries)

    def _parse_table_structure(self, table_content: str) -> pd.DataFrame:
        """Parse the table structure"""
        lines = table_content.strip().split('\n')
        # Identify header lines
        header_lines = self._identify_header_lines(lines)
        data_lines = lines[len(header_lines):]
        # Parse the column layout
        columns = self._parse_columns(header_lines)
        # Parse data rows
        data_rows = []
        for line in data_lines:
            if self._is_data_line(line):
                row_data = self._parse_data_row(line, columns)
                if row_data:
                    data_rows.append(row_data)
        # Build the DataFrame
        if data_rows and columns:
            df = pd.DataFrame(data_rows, columns=columns)
            return self._clean_dataframe(df)
        return pd.DataFrame()

    def _create_index_data(self, df: pd.DataFrame, metadata: TableMetadata) -> Dict:
        """Create index data"""
        index_data = {
            "row_index": {},
            "column_index": {},
            "value_index": {},
            "semantic_index": {}
        }
        if df.empty:
            return index_data
        # Row index: keyed by the row label or first column
        for idx, row in df.iterrows():
            row_key = str(row.iloc[0]) if len(row) > 0 else str(idx)
            index_data["row_index"][row_key] = {
                "row_number": idx,
                "row_data": row.to_dict(),
                "row_text": " ".join([str(v) for v in row.values if pd.notna(v)])
            }
        # Column index: keyed by column name
        for col in df.columns:
            col_values = df[col].dropna().tolist()
            index_data["column_index"][str(col)] = {
                "column_name": str(col),
                "data_type": str(df[col].dtype),
                "value_count": len(col_values),
                "sample_values": col_values[:5],
                "column_text": " ".join([str(v) for v in col_values[:10]])
            }
        # Value index: extract all numeric data
        numeric_columns = df.select_dtypes(include=['number']).columns
        for col in numeric_columns:
            values = df[col].dropna()
            if not values.empty:
                index_data["value_index"][str(col)] = {
                    "min_value": float(values.min()),
                    "max_value": float(values.max()),
                    "mean_value": float(values.mean()),
                    "value_range": [float(values.min()), float(values.max())],
                    "sample_values": values.head(10).tolist()
                }
        # Semantic index: semantic signals derived from table content
        index_data["semantic_index"] = {
            "table_summary": self._generate_table_summary(df, metadata),
            "key_metrics": self._extract_key_metrics(df, metadata),
            "data_patterns": self._identify_data_patterns(df),
            "content_keywords": self._extract_content_keywords(df)
        }
        return index_data
```
Multi-dimensional index construction
1. Composite index design
```python
import pandas as pd
from typing import Dict, List

class TableIndexBuilder:
    """Table index builder"""

    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.index_strategies = {
            "content_index": self._build_content_index,
            "structure_index": self._build_structure_index,
            "semantic_index": self._build_semantic_index,
            "numerical_index": self._build_numerical_index,
            "temporal_index": self._build_temporal_index
        }

    def build_comprehensive_index(self, tables: List[Dict]) -> Dict:
        """Build the combined index"""
        comprehensive_index = {
            "content_index": {},
            "structure_index": {},
            "semantic_index": {},
            "numerical_index": {},
            "temporal_index": {},
            "cross_reference_index": {}
        }
        for table in tables:
            table_id = table["metadata"].table_id
            # Build each index type for this table
            for index_type, builder_func in self.index_strategies.items():
                index_data = builder_func(table)
                comprehensive_index[index_type][table_id] = index_data
        # Build the cross-reference index across tables
        comprehensive_index["cross_reference_index"] = self._build_cross_reference_index(tables)
        return comprehensive_index

    def _build_content_index(self, table: Dict) -> Dict:
        """Build the content index"""
        df = table["structured_data"]
        metadata = table["metadata"]
        content_index = {
            "cell_content": {},
            "row_content": {},
            "column_content": {},
            "full_text": ""
        }
        # Cell-level index
        for row_idx, row in df.iterrows():
            for col_idx, value in enumerate(row):
                if pd.notna(value):
                    cell_key = f"{row_idx}_{col_idx}"
                    content_index["cell_content"][cell_key] = {
                        "value": str(value),
                        "row_index": row_idx,
                        "column_index": col_idx,
                        "column_name": df.columns[col_idx],
                        "data_type": type(value).__name__
                    }
        # Row-level index
        for row_idx, row in df.iterrows():
            row_text = " ".join([str(v) for v in row.values if pd.notna(v)])
            content_index["row_content"][str(row_idx)] = {
                "text": row_text,
                "values": row.to_dict(),
                "key_value": str(row.iloc[0]) if len(row) > 0 else ""
            }
        # Column-level index
        for col in df.columns:
            col_text = " ".join([str(v) for v in df[col].dropna().values])
            content_index["column_content"][str(col)] = {
                "text": col_text,
                "header": str(col),
                "data_type": str(df[col].dtype),
                "unique_values": df[col].nunique()
            }
        # Full-text index
        all_text_parts = [metadata.title]
        all_text_parts.extend([str(col) for col in df.columns])
        all_text_parts.extend([str(v) for v in df.values.flatten() if pd.notna(v)])
        content_index["full_text"] = " ".join(all_text_parts)
        return content_index

    def _build_numerical_index(self, table: Dict) -> Dict:
        """Build the numerical index"""
        df = table["structured_data"]
        numerical_index = {
            "ranges": {},
            "statistics": {},
            "patterns": {},
            "comparisons": {}
        }
        numeric_columns = df.select_dtypes(include=['number']).columns
        for col in numeric_columns:
            values = df[col].dropna()
            if not values.empty:
                # Value ranges
                numerical_index["ranges"][str(col)] = {
                    "min": float(values.min()),
                    "max": float(values.max()),
                    "range": float(values.max() - values.min()),
                    "quartiles": values.quantile([0.25, 0.5, 0.75]).tolist()
                }
                # Descriptive statistics
                numerical_index["statistics"][str(col)] = {
                    "mean": float(values.mean()),
                    "median": float(values.median()),
                    "std": float(values.std()),
                    "count": int(values.count()),
                    "sum": float(values.sum())
                }
                # Data patterns
                numerical_index["patterns"][str(col)] = {
                    "trend": self._analyze_trend(values),
                    "seasonality": self._detect_seasonality(values),
                    "outliers": self._detect_outliers(values),
                    "growth_rate": self._calculate_growth_rate(values)
                }
        # Pairwise column comparisons
        if len(numeric_columns) > 1:
            for i, col1 in enumerate(numeric_columns):
                for col2 in numeric_columns[i + 1:]:
                    correlation = df[col1].corr(df[col2])
                    if not pd.isna(correlation):
                        numerical_index["comparisons"][f"{col1}_vs_{col2}"] = {
                            "correlation": float(correlation),
                            "ratio_mean": float(df[col1].mean() / df[col2].mean()) if df[col2].mean() != 0 else None
                        }
        return numerical_index

    def _build_temporal_index(self, table: Dict) -> Dict:
        """Build the temporal index"""
        df = table["structured_data"]
        metadata = table["metadata"]
        temporal_index = {
            "time_columns": {},
            "period_info": {},
            "time_series": {},
            "temporal_patterns": {}
        }
        # Identify time columns
        time_columns = self._identify_time_columns(df)
        for col in time_columns:
            temporal_index["time_columns"][str(col)] = {
                "column_name": str(col),
                "time_format": self._detect_time_format(df[col]),
                "time_range": self._get_time_range(df[col]),
                "frequency": self._detect_frequency(df[col])
            }
        # Pull period info from the metadata
        if metadata.period_info:
            temporal_index["period_info"] = {
                "period": metadata.period_info,
                "period_type": self._classify_period_type(metadata.period_info),
                "fiscal_year": self._extract_fiscal_year(metadata.period_info)
            }
        return temporal_index
```
The intelligent query engine
1. Multi-modal query processing
```python
import re
from typing import Dict, List

class TableQueryEngine:
    """Table query engine"""

    def __init__(self, index_data: Dict, vector_store):
        self.index_data = index_data
        self.vector_store = vector_store
        self.query_processors = {
            "semantic_query": self._process_semantic_query,
            "numerical_query": self._process_numerical_query,
            "structural_query": self._process_structural_query,
            "temporal_query": self._process_temporal_query,
            "comparative_query": self._process_comparative_query
        }

    def execute_query(self, query: str, query_type: str = "auto") -> Dict:
        """Execute a query"""
        # Auto-detect the query type
        if query_type == "auto":
            query_type = self._classify_query_type(query)
        # Parse the query intent
        query_intent = self._parse_query_intent(query)
        # Dispatch to the matching query processor
        if query_type in self.query_processors:
            results = self.query_processors[query_type](query, query_intent)
        else:
            # Fall back to hybrid query processing
            results = self._process_hybrid_query(query, query_intent)
        # Post-process and rank
        final_results = self._post_process_results(results, query_intent)
        return {
            "query": query,
            "query_type": query_type,
            "query_intent": query_intent,
            "results": final_results,
            "execution_info": {
                "total_matches": len(final_results),
                "processing_time": self._get_processing_time(),
                "confidence_scores": [r.get("confidence", 0) for r in final_results]
            }
        }

    def _process_numerical_query(self, query: str, intent: Dict) -> List[Dict]:
        """Process a numerical query"""
        results = []
        # Extract numerical conditions from the query text
        numerical_conditions = self._extract_numerical_conditions(query)
        for table_id, num_index in self.index_data["numerical_index"].items():
            for condition in numerical_conditions:
                matches = self._match_numerical_condition(num_index, condition)
                for match in matches:
                    results.append({
                        "table_id": table_id,
                        "match_type": "numerical",
                        "condition": condition,
                        "match_details": match,
                        "confidence": self._calculate_numerical_confidence(match, condition)
                    })
        return results

    def _process_semantic_query(self, query: str, intent: Dict) -> List[Dict]:
        """Process a semantic query"""
        results = []
        # Vector similarity search
        query_embedding = self.vector_store.embed_query(query)
        for table_id, content_index in self.index_data["content_index"].items():
            # Full-text matching
            full_text = content_index["full_text"]
            text_embedding = self.vector_store.embed_query(full_text)
            similarity = self._calculate_cosine_similarity(query_embedding, text_embedding)
            if similarity > 0.3:  # threshold filter
                results.append({
                    "table_id": table_id,
                    "match_type": "semantic",
                    "similarity_score": similarity,
                    "matched_content": full_text[:200],
                    "confidence": similarity
                })
            # Fine-grained matching (rows, columns, cells)
            for content_type in ["row_content", "column_content", "cell_content"]:
                for key, content_data in content_index[content_type].items():
                    if "text" in content_data:
                        content_embedding = self.vector_store.embed_query(content_data["text"])
                        similarity = self._calculate_cosine_similarity(query_embedding, content_embedding)
                        if similarity > 0.4:
                            results.append({
                                "table_id": table_id,
                                "match_type": f"semantic_{content_type}",
                                "content_key": key,
                                "similarity_score": similarity,
                                "matched_content": content_data["text"],
                                "confidence": similarity
                            })
        return results

    def _process_comparative_query(self, query: str, intent: Dict) -> List[Dict]:
        """Process a comparative query"""
        results = []
        # Identify comparison targets and dimensions
        comparison_entities = self._extract_comparison_entities(query)
        comparison_metrics = self._extract_comparison_metrics(query)
        # Cross-table comparison
        for metric in comparison_metrics:
            metric_data = {}
            # Collect the metric from every table that has it
            for table_id, num_index in self.index_data["numerical_index"].items():
                if metric in num_index["statistics"]:
                    metric_data[table_id] = num_index["statistics"][metric]
            # Run the comparison analysis
            if len(metric_data) > 1:
                comparison_result = self._perform_comparison_analysis(metric_data, metric)
                results.append({
                    "match_type": "comparative",
                    "metric": metric,
                    "comparison_data": metric_data,
                    "analysis_result": comparison_result,
                    "confidence": 0.8
                })
        return results

    def _extract_numerical_conditions(self, query: str) -> List[Dict]:
        """Extract numerical conditions"""
        conditions = []
        # Numeric comparison patterns
        patterns = [
            r"(大于|>)\s*(\d+(?:\.\d+)?)",
            r"(小于|<)\s*(\d+(?:\.\d+)?)",
            r"(等于|=)\s*(\d+(?:\.\d+)?)",
            r"(\d+(?:\.\d+)?)\s*到\s*(\d+(?:\.\d+)?)",
            r"(\d+(?:\.\d+)?)\s*-\s*(\d+(?:\.\d+)?)"
        ]
        for pattern in patterns:
            matches = re.finditer(pattern, query)
            for match in matches:
                if "到" in match.group() or "-" in match.group():
                    # Range condition
                    values = re.findall(r"\d+(?:\.\d+)?", match.group())
                    if len(values) >= 2:
                        conditions.append({
                            "type": "range",
                            "min_value": float(values[0]),
                            "max_value": float(values[1]),
                            "original_text": match.group()
                        })
                else:
                    # Comparison condition
                    operator = match.group(1)
                    value = float(match.group(2))
                    conditions.append({
                        "type": "comparison",
                        "operator": operator,
                        "value": value,
                        "original_text": match.group()
                    })
        return conditions
```
Query optimization strategies
1. Query performance optimization
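One common performance tactic is to apply the most selective metadata filter first, so each later filter scans fewer candidates. A minimal sketch of this idea follows; the selectivity estimates and function name are illustrative assumptions, not measured values:

```python
# Estimated fraction of chunks each field's filter keeps (assumed values;
# in practice these would come from collection statistics).
FIELD_SELECTIVITY = {
    "company_name": 0.01,   # matches ~1% of chunks
    "report_period": 0.05,
    "doc_type": 0.25,
}

def order_filters(filters: dict) -> list:
    """Return (field, value) pairs ordered from most to least selective.

    Unknown fields get selectivity 1.0 and are applied last.
    """
    return sorted(
        filters.items(),
        key=lambda kv: FIELD_SELECTIVITY.get(kv[0], 1.0),
    )

plan = order_filters({"doc_type": "annual_report", "company_name": "ACME"})
```

Here the `company_name` filter would run first, shrinking the candidate set by roughly 100x before the cheaper but less selective `doc_type` filter is evaluated.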
2. Cache strategy implementation
class QueryCacheManager:
    """Query cache manager with LRU eviction."""
    def __init__(self, cache_size: int = 1000):
        self.cache_size = cache_size
        self.query_cache = {}
        self.access_times = {}
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "evictions": 0
        }

    def get_cached_result(self, query_hash: str) -> Optional[Dict]:
        """Return a cached result, or None on a miss."""
        if query_hash in self.query_cache:
            self.access_times[query_hash] = time.time()
            self.cache_stats["hits"] += 1
            return self.query_cache[query_hash]
        self.cache_stats["misses"] += 1
        return None

    def cache_result(self, query_hash: str, result: Dict) -> None:
        """Cache a query result, evicting if the cache is full."""
        if len(self.query_cache) >= self.cache_size:
            self._evict_least_recently_used()
        self.query_cache[query_hash] = result
        self.access_times[query_hash] = time.time()

    def _evict_least_recently_used(self) -> None:
        """Evict the least recently used entry."""
        if not self.access_times:
            return
        lru_key = min(self.access_times.items(), key=lambda x: x[1])[0]
        del self.query_cache[lru_key]
        del self.access_times[lru_key]
        self.cache_stats["evictions"] += 1
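A quick walk-through of the eviction behavior, using a trimmed-down re-implementation with a tiny cache size. One deliberate tweak for the sketch: a logical clock replaces `time.time()`, so LRU order is deterministic even when two operations land on the same wall-clock tick:

```python
from typing import Dict, Optional

class QueryCacheManager:
    """Minimal LRU query cache mirroring the manager above (logical clock added)."""
    def __init__(self, cache_size: int = 1000):
        self.cache_size = cache_size
        self.query_cache: Dict[str, Dict] = {}
        self.access_times: Dict[str, int] = {}
        self.cache_stats = {"hits": 0, "misses": 0, "evictions": 0}
        self._tick = 0  # logical clock for deterministic LRU ordering

    def _now(self) -> int:
        self._tick += 1
        return self._tick

    def get_cached_result(self, query_hash: str) -> Optional[Dict]:
        if query_hash in self.query_cache:
            self.access_times[query_hash] = self._now()
            self.cache_stats["hits"] += 1
            return self.query_cache[query_hash]
        self.cache_stats["misses"] += 1
        return None

    def cache_result(self, query_hash: str, result: Dict) -> None:
        if len(self.query_cache) >= self.cache_size:
            # Evict the entry with the oldest access time
            lru = min(self.access_times, key=self.access_times.get)
            del self.query_cache[lru]
            del self.access_times[lru]
            self.cache_stats["evictions"] += 1
        self.query_cache[query_hash] = result
        self.access_times[query_hash] = self._now()

cache = QueryCacheManager(cache_size=2)
cache.cache_result("q1", {"answer": 1})
cache.cache_result("q2", {"answer": 2})
cache.get_cached_result("q1")            # refresh q1, so q2 is now the LRU entry
cache.cache_result("q3", {"answer": 3})  # cache full: q2 is evicted
```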
class OptimizedTableQueryEngine(TableQueryEngine):
    """Table query engine with caching and query optimization."""
    def __init__(self, index_data: Dict, vector_store):
        super().__init__(index_data, vector_store)
        self.cache_manager = QueryCacheManager()
        self.query_optimizer = QueryOptimizer()

    def execute_optimized_query(self, query: str, query_type: str = "auto") -> Dict:
        """Run a query through the cache and the optimizer."""
        # Hash the query for cache lookup
        query_hash = self._generate_query_hash(query, query_type)
        # Check the cache first
        cached_result = self.cache_manager.get_cached_result(query_hash)
        if cached_result:
            return cached_result
        # Optimize, then execute
        optimized_query = self.query_optimizer.optimize_query(query, self.index_data)
        result = self.execute_query(optimized_query, query_type)
        # Cache the result
        self.cache_manager.cache_result(query_hash, result)
        return result
This table-data indexing and query-optimization scheme combines multi-dimensional indexes, a smart query engine, performance-tuning strategies, and caching to retrieve table data from financial documents quickly and precisely.
Document preprocessing strategy
Special handling for financial documents such as financial statements, audit reports, and bond prospectuses:
import re
from typing import List, Dict, Any
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from pymilvus import Collection, connections, FieldSchema, CollectionSchema, DataType

class FinancialDocumentProcessor:
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", "。", ";", ",", " "]
        )

    def extract_financial_metadata(self, text: str, doc_type: str) -> Dict[str, Any]:
        """Extract structured metadata from a financial document."""
        metadata = {
            "doc_type": doc_type,
            "financial_indicators": [],
            "time_period": None,
            "company_name": None,
            "section_type": None
        }
        # Financial indicator terms (kept in Chinese: the corpus is Chinese)
        financial_patterns = [
            r"营业收入|净利润|总资产|净资产|资产负债率|毛利率|净资产收益率",
            r"现金流量|经营活动|投资活动|筹资活动",
            r"应收账款|存货|固定资产|无形资产"
        ]
        for pattern in financial_patterns:
            matches = re.findall(pattern, text)
            metadata["financial_indicators"].extend(matches)
        # Time information
        time_pattern = r"(\d{4})年|(\d{4})-(\d{2})-(\d{2})"
        time_matches = re.findall(time_pattern, text)
        if time_matches:
            metadata["time_period"] = time_matches[0]
        # Company name
        company_pattern = r"([\u4e00-\u9fa5]+(?:股份)?(?:有限)?公司)"
        company_matches = re.findall(company_pattern, text)
        if company_matches:
            metadata["company_name"] = company_matches[0]
        return metadata

    def intelligent_chunking(self, text: str, doc_type: str) -> List[Dict[str, Any]]:
        """Structure-aware chunking strategy."""
        chunks = []
        if doc_type == "财报":
            # Split along the financial statement structure
            sections = self._split_by_financial_sections(text)
        elif doc_type == "审计报告":
            # Split by audit opinion, management recommendations, etc.
            sections = self._split_by_audit_sections(text)
        elif doc_type == "债券募集说明书":
            # Split by use of proceeds, risk factors, etc.
            sections = self._split_by_bond_sections(text)
        else:
            sections = [text]
        for section in sections:
            sub_chunks = self.text_splitter.split_text(section)
            for chunk in sub_chunks:
                metadata = self.extract_financial_metadata(chunk, doc_type)
                chunks.append({
                    "text": chunk,
                    "metadata": metadata,
                    "embedding": self.embeddings.embed_query(chunk)
                })
        return chunks

    def _split_by_financial_sections(self, text: str) -> List[str]:
        """Split along the three financial statements plus the notes."""
        section_patterns = [
            r"资产负债表.*?(?=利润表|现金流量表|$)",
            r"利润表.*?(?=现金流量表|资产负债表|$)",
            r"现金流量表.*?(?=资产负债表|利润表|$)",
            r"财务报表附注.*?(?=$)"
        ]
        sections = []
        for pattern in section_patterns:
            matches = re.findall(pattern, text, re.DOTALL)
            sections.extend(matches)
        return sections if sections else [text]
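The section-splitting regexes above can be sanity-checked on a toy document. A standalone sketch with a fabricated one-line report (each statement heading acts as a section boundary via the lookaheads):

```python
import re

def split_by_financial_sections(text):
    """Carve a report into balance-sheet / income-statement / cash-flow sections."""
    section_patterns = [
        r"资产负债表.*?(?=利润表|现金流量表|$)",
        r"利润表.*?(?=现金流量表|资产负债表|$)",
        r"现金流量表.*?(?=资产负债表|利润表|$)",
    ]
    sections = []
    for pattern in section_patterns:
        # Lazy match up to the next statement heading (or end of text)
        sections.extend(re.findall(pattern, text, re.DOTALL))
    return sections if sections else [text]

doc = "资产负债表 总资产100亿 利润表 净利润10亿 现金流量表 经营现金流8亿"
parts = split_by_financial_sections(doc)
# parts → three sections, one per statement
```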
Metadata-assisted query design and implementation
Metadata architecture design principles
Metadata extraction and processing pipeline
Metadata query architecture
Metadata data-model hierarchy
Enhanced Milvus vector database configuration
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Union, List, Dict, Any
import json
from datetime import datetime

class DocumentType(Enum):
    """Document type."""
    ANNUAL_REPORT = "annual_report"
    QUARTERLY_REPORT = "quarterly_report"
    AUDIT_REPORT = "audit_report"
    BOND_PROSPECTUS = "bond_prospectus"
    ANNOUNCEMENT = "announcement"
    RESEARCH_REPORT = "research_report"
    REGULATORY_FILING = "regulatory_filing"

class IndustryCategory(Enum):
    """Industry category."""
    BANKING = "banking"
    INSURANCE = "insurance"
    SECURITIES = "securities"
    REAL_ESTATE = "real_estate"
    MANUFACTURING = "manufacturing"
    TECHNOLOGY = "technology"
    HEALTHCARE = "healthcare"
    ENERGY = "energy"

@dataclass
class DocumentMetadata:
    """Document metadata model."""
    # Basic info
    document_id: str
    title: str
    doc_type: DocumentType
    file_path: str
    file_size: int
    # Business info
    company_name: str
    company_code: Optional[str] = None
    industry_category: Optional[IndustryCategory] = None
    market_type: Optional[str] = None  # main board / ChiNext / STAR Market
    # Time info
    publish_date: Optional[datetime] = None
    report_period: Optional[str] = None  # e.g. 2023Q1, 2023年度
    created_at: datetime = None
    updated_at: Optional[datetime] = None
    # Content info
    section_type: Optional[str] = None
    page_number: Optional[int] = None
    paragraph_index: Optional[int] = None
    financial_indicators: List[str] = None
    # Quality info
    confidence_score: float = 1.0
    source_reliability: str = "high"  # high / medium / low
    data_version: str = "1.0"
    # Geography
    region: Optional[str] = None
    country: str = "CN"
    # Custom tags and fields
    tags: List[str] = None
    custom_fields: Dict[str, Any] = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()
        if self.financial_indicators is None:
            self.financial_indicators = []
        if self.tags is None:
            self.tags = []
        if self.custom_fields is None:
            self.custom_fields = {}
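The `None`-then-fill pattern in `__post_init__` exists because mutable defaults (lists, dicts) must not be shared across dataclass instances. A trimmed-down illustration with only a few of the fields kept:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

@dataclass
class DocumentMetadata:
    """Trimmed illustration: None defaults are filled per-instance in __post_init__."""
    document_id: str
    title: str
    company_name: str
    publish_date: Optional[datetime] = None
    financial_indicators: List[str] = None
    tags: List[str] = None
    created_at: datetime = None

    def __post_init__(self):
        # Each instance gets its own fresh list/dict, never a shared default
        if self.created_at is None:
            self.created_at = datetime.now()
        if self.financial_indicators is None:
            self.financial_indicators = []
        if self.tags is None:
            self.tags = []

meta = DocumentMetadata(document_id="doc-001", title="2023年度报告", company_name="示例公司")
```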
class EnhancedMilvusVectorStore:
    def __init__(self, collection_name: str = "financial_docs_enhanced"):
        self.collection_name = collection_name
        self.connect_to_milvus()
        self.create_enhanced_collection()
        self.create_metadata_indexes()

    def connect_to_milvus(self):
        """Connect to the Milvus server."""
        connections.connect(
            alias="default",
            host="localhost",
            port="19530"
        )

    def create_enhanced_collection(self):
        """Create the collection with the enhanced schema."""
        fields = [
            # Primary key and vector
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=100),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
            # Basic info
            FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=500),
            FieldSchema(name="doc_type", dtype=DataType.VARCHAR, max_length=50),
            FieldSchema(name="file_path", dtype=DataType.VARCHAR, max_length=1000),
            FieldSchema(name="file_size", dtype=DataType.INT64),
            # Business info
            FieldSchema(name="company_name", dtype=DataType.VARCHAR, max_length=200),
            FieldSchema(name="company_code", dtype=DataType.VARCHAR, max_length=20),
            FieldSchema(name="industry_category", dtype=DataType.VARCHAR, max_length=50),
            FieldSchema(name="market_type", dtype=DataType.VARCHAR, max_length=20),
            # Time info
            FieldSchema(name="publish_date", dtype=DataType.VARCHAR, max_length=20),
            FieldSchema(name="report_period", dtype=DataType.VARCHAR, max_length=20),
            FieldSchema(name="created_at", dtype=DataType.VARCHAR, max_length=30),
            FieldSchema(name="updated_at", dtype=DataType.VARCHAR, max_length=30),
            # Content info
            FieldSchema(name="section_type", dtype=DataType.VARCHAR, max_length=100),
            FieldSchema(name="page_number", dtype=DataType.INT32),
            FieldSchema(name="paragraph_index", dtype=DataType.INT32),
            FieldSchema(name="financial_indicators", dtype=DataType.VARCHAR, max_length=2000),
            # Quality info
            FieldSchema(name="confidence_score", dtype=DataType.FLOAT),
            FieldSchema(name="source_reliability", dtype=DataType.VARCHAR, max_length=20),
            FieldSchema(name="data_version", dtype=DataType.VARCHAR, max_length=20),
            # Geography
            FieldSchema(name="region", dtype=DataType.VARCHAR, max_length=50),
            FieldSchema(name="country", dtype=DataType.VARCHAR, max_length=10),
            # Tags and custom fields
            FieldSchema(name="tags", dtype=DataType.VARCHAR, max_length=1000),
            FieldSchema(name="custom_fields", dtype=DataType.VARCHAR, max_length=5000)
        ]
        schema = CollectionSchema(fields, "Enhanced financial documents collection with metadata")
        self.collection = Collection(self.collection_name, schema)
        # Vector index
        vector_index_params = {
            "metric_type": "COSINE",
            "index_type": "IVF_FLAT",
            "params": {"nlist": 1024}
        }
        self.collection.create_index("embedding", vector_index_params)

    def create_metadata_indexes(self):
        """Create scalar indexes on metadata fields to speed up filtered queries."""
        # Index the fields most often used in filters
        index_fields = [
            "document_id", "doc_type", "company_name", "company_code",
            "industry_category", "market_type", "publish_date", "report_period",
            "section_type", "source_reliability", "region", "country"
        ]
        for field in index_fields:
            try:
                # Scalar (TRIE) index on VARCHAR fields
                self.collection.create_index(
                    field_name=field,
                    index_params={"index_type": "TRIE"}
                )
            except Exception as e:
                print(f"Warning: could not create index for {field}: {e}")

    def insert_documents_with_metadata(self, chunks: List[Dict[str, Any]]):
        """Batch-insert documents with enriched metadata (row-based insert)."""
        rows = []
        for chunk in chunks:
            metadata = chunk.get("metadata", {})
            # Normalize datetime fields to strings
            publish_date = metadata.get("publish_date")
            if isinstance(publish_date, datetime):
                publish_date = publish_date.isoformat()[:10]
            created_at = metadata.get("created_at")
            if isinstance(created_at, datetime):
                created_at = created_at.isoformat()
            updated_at = metadata.get("updated_at")
            if isinstance(updated_at, datetime):
                updated_at = updated_at.isoformat()
            # Row dict keyed by field name (auto_id primary key is omitted)
            rows.append({
                "document_id": metadata.get("document_id", ""),
                "text": chunk.get("text", ""),
                "embedding": chunk.get("embedding", []),
                "title": metadata.get("title", ""),
                "doc_type": metadata.get("doc_type", ""),
                "file_path": metadata.get("file_path", ""),
                "file_size": metadata.get("file_size", 0),
                "company_name": metadata.get("company_name", ""),
                "company_code": metadata.get("company_code", ""),
                "industry_category": metadata.get("industry_category", ""),
                "market_type": metadata.get("market_type", ""),
                "publish_date": publish_date or "",
                "report_period": metadata.get("report_period", ""),
                "created_at": created_at or "",
                "updated_at": updated_at or "",
                "section_type": metadata.get("section_type", ""),
                "page_number": metadata.get("page_number", -1),
                "paragraph_index": metadata.get("paragraph_index", -1),
                "financial_indicators": ",".join(metadata.get("financial_indicators", [])),
                "confidence_score": metadata.get("confidence_score", 1.0),
                "source_reliability": metadata.get("source_reliability", "high"),
                "data_version": metadata.get("data_version", "1.0"),
                "region": metadata.get("region", ""),
                "country": metadata.get("country", "CN"),
                "tags": ",".join(metadata.get("tags", [])),
                "custom_fields": json.dumps(metadata.get("custom_fields", {}), ensure_ascii=False)
            })
        self.collection.insert(rows)
        self.collection.flush()
        print(f"Inserted {len(rows)} documents with enhanced metadata")
Metadata-assisted query implementation
class MetadataQueryBuilder:
    """Fluent builder for metadata-filtered vector queries."""
    def __init__(self, vector_store: EnhancedMilvusVectorStore):
        self.vector_store = vector_store
        self.filters = []
        self.vector_query = None
        self.limit = 10
        self.output_fields = ["*"]

    def filter_by_company(self, company_names: Union[str, List[str]]) -> 'MetadataQueryBuilder':
        """Filter by company name."""
        if isinstance(company_names, str):
            company_names = [company_names]
        self.filters.append(f"company_name in {company_names}")
        return self

    def filter_by_doc_type(self, doc_types: Union[str, List[str]]) -> 'MetadataQueryBuilder':
        """Filter by document type."""
        if isinstance(doc_types, str):
            doc_types = [doc_types]
        self.filters.append(f"doc_type in {doc_types}")
        return self

    def filter_by_industry(self, industries: Union[str, List[str]]) -> 'MetadataQueryBuilder':
        """Filter by industry category."""
        if isinstance(industries, str):
            industries = [industries]
        self.filters.append(f"industry_category in {industries}")
        return self

    def filter_by_date_range(self, start_date: str, end_date: str) -> 'MetadataQueryBuilder':
        """Filter by publish-date range."""
        self.filters.append(f"publish_date >= '{start_date}' and publish_date <= '{end_date}'")
        return self

    def filter_by_report_period(self, periods: Union[str, List[str]]) -> 'MetadataQueryBuilder':
        """Filter by report period."""
        if isinstance(periods, str):
            periods = [periods]
        self.filters.append(f"report_period in {periods}")
        return self

    def filter_by_section(self, sections: Union[str, List[str]]) -> 'MetadataQueryBuilder':
        """Filter by section type."""
        if isinstance(sections, str):
            sections = [sections]
        self.filters.append(f"section_type in {sections}")
        return self

    def filter_by_confidence(self, min_confidence: float) -> 'MetadataQueryBuilder':
        """Filter by minimum confidence score."""
        self.filters.append(f"confidence_score >= {min_confidence}")
        return self

    def filter_by_tags(self, tags: Union[str, List[str]], match_all: bool = False) -> 'MetadataQueryBuilder':
        """Filter by tags: all tags must match, or any one of them."""
        if isinstance(tags, str):
            tags = [tags]
        tag_filters = [f"tags like '%{tag}%'" for tag in tags]
        joiner = " and " if match_all else " or "
        self.filters.append(f"({joiner.join(tag_filters)})")
        return self

    def filter_by_custom_field(self, field_name: str, field_value: Any) -> 'MetadataQueryBuilder':
        """Filter by a custom field stored inside the JSON string."""
        self.filters.append(f"custom_fields like '%\"{field_name}\":\"{field_value}\"%'")
        return self

    def with_vector_similarity(self, query_vector: List[float], top_k: int = 10) -> 'MetadataQueryBuilder':
        """Add a vector-similarity component to the query."""
        self.vector_query = query_vector
        self.limit = top_k
        return self

    def select_fields(self, fields: List[str]) -> 'MetadataQueryBuilder':
        """Choose which fields to return."""
        self.output_fields = fields
        return self

    def execute(self) -> List[Dict[str, Any]]:
        """Run the query."""
        self.vector_store.collection.load()
        # Combine all filters into a single boolean expression
        filter_expr = " and ".join(self.filters) if self.filters else None
        if self.vector_query:
            # Vector similarity search + metadata filtering
            search_params = {
                "metric_type": "COSINE",
                "params": {"nprobe": 10}
            }
            results = self.vector_store.collection.search(
                data=[self.vector_query],
                anns_field="embedding",
                param=search_params,
                limit=self.limit,
                expr=filter_expr,
                output_fields=self.output_fields
            )
            # Format the hits
            formatted_results = []
            for hit in results[0]:
                result = {
                    "id": hit.id,
                    "score": hit.score,
                    "distance": hit.distance
                }
                for field in self.output_fields:
                    if field != "*" and hasattr(hit.entity, field):
                        result[field] = hit.entity.get(field)
                    elif field == "*":
                        # Copy every returned field
                        for entity_field in hit.entity.fields:
                            result[entity_field] = hit.entity.get(entity_field)
                formatted_results.append(result)
            return formatted_results
        else:
            # Pure metadata query
            if not filter_expr:
                raise ValueError("Must provide either vector query or metadata filters")
            results = self.vector_store.collection.query(
                expr=filter_expr,
                output_fields=self.output_fields,
                limit=self.limit
            )
            return results
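Stripped of the Milvus calls, the builder's core job is accumulating filter strings and AND-ing them into one `expr`. A standalone sketch of just that part (class and method names are mine, shortened from the builder above):

```python
from typing import List, Union

class FilterBuilder:
    """Filter-expression half of the query builder, without the vector store."""
    def __init__(self):
        self.filters: List[str] = []

    def filter_by_company(self, names: Union[str, List[str]]) -> 'FilterBuilder':
        names = [names] if isinstance(names, str) else names
        self.filters.append(f"company_name in {names}")
        return self

    def filter_by_confidence(self, min_confidence: float) -> 'FilterBuilder':
        self.filters.append(f"confidence_score >= {min_confidence}")
        return self

    def build(self) -> str:
        # Milvus `expr` string: all filters AND-ed together
        return " and ".join(self.filters)

expr = (FilterBuilder()
        .filter_by_company("腾讯控股")
        .filter_by_confidence(0.8)
        .build())
# expr → "company_name in ['腾讯控股'] and confidence_score >= 0.8"
```

Returning `self` from every filter method is what makes the fluent chaining in the usage examples below possible.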
class SmartMetadataExtractor:
    """LLM-assisted metadata extractor."""
    def __init__(self, llm_client):
        self.llm_client = llm_client

    def extract_comprehensive_metadata(self, text: str, doc_info: Dict[str, Any]) -> DocumentMetadata:
        """Extract the full metadata record for a chunk of text."""
        # Basic info
        document_id = doc_info.get("document_id", self._generate_document_id())
        title = doc_info.get("title", self._extract_title(text))
        doc_type = self._classify_document_type(text, doc_info)
        # Business info via the LLM
        business_info = self._extract_business_info(text)
        # Time info
        time_info = self._extract_time_info(text)
        # Financial indicators
        financial_indicators = self._extract_financial_indicators(text)
        # Tags
        tags = self._generate_tags(text, business_info)
        # Confidence score
        confidence_score = self._calculate_confidence_score(text, business_info)
        return DocumentMetadata(
            document_id=document_id,
            title=title,
            doc_type=doc_type,
            file_path=doc_info.get("file_path", ""),
            file_size=doc_info.get("file_size", 0),
            company_name=business_info.get("company_name", ""),
            company_code=business_info.get("company_code"),
            industry_category=business_info.get("industry_category"),
            market_type=business_info.get("market_type"),
            publish_date=time_info.get("publish_date"),
            report_period=time_info.get("report_period"),
            section_type=self._identify_section_type(text),
            financial_indicators=financial_indicators,
            confidence_score=confidence_score,
            source_reliability=self._assess_source_reliability(doc_info),
            region=business_info.get("region"),
            tags=tags,
            custom_fields=doc_info.get("custom_fields", {})
        )

    def _extract_business_info(self, text: str) -> Dict[str, Any]:
        """Extract business info with the LLM (prompt kept in Chinese: the corpus is Chinese)."""
        prompt = f"""
        从以下文本中提取业务信息,返回JSON格式:
        文本:{text[:1000]}...
        请提取:
        1. company_name: 公司名称
        2. company_code: 股票代码
        3. industry_category: 行业分类(banking/insurance/securities/real_estate/manufacturing/technology/healthcare/energy)
        4. market_type: 市场类型(主板/创业板/科创板/新三板)
        5. region: 地区
        返回JSON格式,如果信息不存在则返回null。
        """
        try:
            response = self.llm_client.generate(prompt)
            return json.loads(response)
        except Exception:
            return {}

    def _extract_time_info(self, text: str) -> Dict[str, Any]:
        """Extract the publish date and report period."""
        import re
        time_info = {}
        # Publish date
        date_patterns = [
            r"(\d{4})年(\d{1,2})月(\d{1,2})日",
            r"(\d{4})-(\d{2})-(\d{2})",
            r"(\d{4})\.(\d{2})\.(\d{2})"
        ]
        for pattern in date_patterns:
            matches = re.findall(pattern, text)
            if matches:
                year, month, day = matches[0]
                time_info["publish_date"] = datetime(int(year), int(month), int(day))
                break
        # Report period
        cn_quarter = {"一": "1", "二": "2", "三": "3", "四": "4"}
        period_patterns = [
            r"(\d{4})年度",
            r"(\d{4})年第([一二三四])季度",
            r"(\d{4})Q([1-4])"
        ]
        for pattern in period_patterns:
            matches = re.findall(pattern, text)
            if matches:
                if "年度" in pattern:
                    time_info["report_period"] = f"{matches[0]}年度"
                else:
                    year, quarter = matches[0]
                    # Normalize Chinese quarter numerals ("三" -> "3")
                    time_info["report_period"] = f"{year}Q{cn_quarter.get(quarter, quarter)}"
                break
        return time_info

    def _extract_financial_indicators(self, text: str) -> List[str]:
        """Match known financial indicator terms."""
        indicators = [
            "营业收入", "净利润", "总资产", "净资产", "资产负债率",
            "毛利率", "净资产收益率", "每股收益", "现金流量",
            "应收账款", "存货", "固定资产", "无形资产", "短期借款",
            "长期借款", "股东权益", "营业成本", "管理费用", "财务费用"
        ]
        found_indicators = []
        for indicator in indicators:
            if indicator in text:
                found_indicators.append(indicator)
        return found_indicators

    def _generate_tags(self, text: str, business_info: Dict[str, Any]) -> List[str]:
        """Generate tags from the industry and content keywords."""
        tags = []
        # Industry-based tag
        industry = business_info.get("industry_category")
        if industry:
            tags.append(f"行业_{industry}")
        # Content-based tags
        if "风险" in text:
            tags.append("风险分析")
        if "财务" in text:
            tags.append("财务信息")
        if "审计" in text:
            tags.append("审计报告")
        if "投资" in text:
            tags.append("投资相关")
        return tags

    def _calculate_confidence_score(self, text: str, business_info: Dict[str, Any]) -> float:
        """Score extraction confidence from text length and field completeness."""
        score = 0.5  # base score
        # Text length
        if len(text) > 500:
            score += 0.2
        # Completeness of the extracted info
        if business_info.get("company_name"):
            score += 0.1
        if business_info.get("company_code"):
            score += 0.1
        if business_info.get("industry_category"):
            score += 0.1
        return min(1.0, score)

    def _generate_document_id(self) -> str:
        """Generate a document ID."""
        import uuid
        return str(uuid.uuid4())

    def _extract_title(self, text: str) -> str:
        """Take the first plausible line as the title."""
        lines = text.split('\n')
        for line in lines[:5]:  # check the first 5 lines
            if 10 < len(line.strip()) < 100:
                return line.strip()
        return text[:50] + "..."

    def _classify_document_type(self, text: str, doc_info: Dict[str, Any]) -> DocumentType:
        """Classify the document type by keywords."""
        text_lower = text.lower()
        if "年报" in text or "annual report" in text_lower:
            return DocumentType.ANNUAL_REPORT
        elif "季报" in text or "quarterly" in text_lower:
            return DocumentType.QUARTERLY_REPORT
        elif "审计" in text or "audit" in text_lower:
            return DocumentType.AUDIT_REPORT
        elif "债券" in text or "bond" in text_lower:
            return DocumentType.BOND_PROSPECTUS
        elif "公告" in text or "announcement" in text_lower:
            return DocumentType.ANNOUNCEMENT
        else:
            return DocumentType.RESEARCH_REPORT

    def _identify_section_type(self, text: str) -> str:
        """Identify the section type via keyword lists."""
        section_keywords = {
            "财务状况": ["资产负债表", "财务状况", "资产", "负债"],
            "经营成果": ["利润表", "收入", "利润", "经营成果"],
            "现金流量": ["现金流量表", "现金流", "经营活动", "投资活动", "筹资活动"],
            "风险因素": ["风险", "不确定性", "风险因素"],
            "管理层讨论": ["管理层讨论", "MD&A", "经营分析"],
            "审计意见": ["审计意见", "审计报告", "注册会计师"]
        }
        for section, keywords in section_keywords.items():
            if any(keyword in text for keyword in keywords):
                return section
        return "其他"

    def _assess_source_reliability(self, doc_info: Dict[str, Any]) -> str:
        """Judge source reliability from the file path."""
        file_path = doc_info.get("file_path", "")
        if "official" in file_path.lower() or "年报" in file_path:
            return "high"
        elif "research" in file_path.lower():
            return "medium"
        else:
            return "low"
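The report-period normalization above, including the Chinese-numeral quarter mapping, works in isolation too. A standalone sketch (function name is mine; the inputs are made-up snippets):

```python
import re

CN_QUARTER = {"一": "1", "二": "2", "三": "3", "四": "4"}

def extract_report_period(text):
    """Normalize '2023年度' / '2023年第三季度' / '2023Q3' to one canonical label."""
    m = re.search(r"(\d{4})年度", text)
    if m:
        return f"{m.group(1)}年度"
    m = re.search(r"(\d{4})年第([一二三四])季度", text)
    if m:
        # Map the Chinese quarter numeral to a digit
        return f"{m.group(1)}Q{CN_QUARTER[m.group(2)]}"
    m = re.search(r"(\d{4})Q([1-4])", text)
    if m:
        return f"{m.group(1)}Q{m.group(2)}"
    return None

extract_report_period("本公司2023年第三季度报告")  # → "2023Q3"
```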
Metadata-assisted queries in practice
class MetadataAssistedRAG:
    """RAG system with metadata-assisted retrieval."""
    def __init__(self, vector_store: EnhancedMilvusVectorStore, llm_client, embedding_client):
        self.vector_store = vector_store
        self.llm_client = llm_client
        self.embedding_client = embedding_client
        self.metadata_extractor = SmartMetadataExtractor(llm_client)

    def smart_query(self, user_query: str, query_context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Smart query: detect the intent, then apply the matching metadata filters."""
        # 1. Analyze the query intent
        query_intent = self._analyze_query_intent(user_query)
        # 2. Embed the query
        query_vector = self.embedding_client.embed_query(user_query)
        # 3. Build the metadata query
        query_builder = MetadataQueryBuilder(self.vector_store)
        query_builder = self._apply_intent_filters(query_builder, query_intent, query_context)
        # 4. Run the hybrid query
        results = query_builder.with_vector_similarity(query_vector, top_k=20).execute()
        # 5. Post-process and rerank
        processed_results = self._post_process_results(results, user_query, query_intent)
        return {
            "query": user_query,
            "intent": query_intent,
            "results": processed_results[:10],
            "metadata_filters_applied": query_builder.filters,
            "total_matches": len(results)
        }

    def _analyze_query_intent(self, query: str) -> Dict[str, Any]:
        """Analyze the query intent with the LLM (prompt kept in Chinese)."""
        intent_prompt = f"""
        分析以下查询的意图,提取关键信息:
        查询:{query}
        请识别:
        1. 目标公司(如果有)
        2. 时间范围(如果有)
        3. 文档类型偏好(年报/季报/审计报告等)
        4. 行业类别(如果有)
        5. 财务指标关注点
        6. 查询类型(对比分析/趋势分析/风险评估/财务分析等)
        返回JSON格式。
        """
        try:
            response = self.llm_client.generate(intent_prompt)
            return json.loads(response)
        except Exception:
            return {}

    def _apply_intent_filters(self, query_builder: MetadataQueryBuilder,
                              intent: Dict[str, Any], context: Dict[str, Any] = None) -> MetadataQueryBuilder:
        """Apply metadata filters derived from the query intent."""
        # Company filter
        if intent.get("target_companies"):
            query_builder.filter_by_company(intent["target_companies"])
        # Time filter
        if intent.get("time_range"):
            time_range = intent["time_range"]
            if "start_date" in time_range and "end_date" in time_range:
                query_builder.filter_by_date_range(time_range["start_date"], time_range["end_date"])
        # Document type filter
        if intent.get("document_types"):
            query_builder.filter_by_doc_type(intent["document_types"])
        # Industry filter
        if intent.get("industries"):
            query_builder.filter_by_industry(intent["industries"])
        # Report period filter
        if intent.get("report_periods"):
            query_builder.filter_by_report_period(intent["report_periods"])
        # Section type filter
        if intent.get("section_preferences"):
            query_builder.filter_by_section(intent["section_preferences"])
        # Confidence filter
        min_confidence = context.get("min_confidence", 0.7) if context else 0.7
        query_builder.filter_by_confidence(min_confidence)
        # Tag filter
        if intent.get("tags"):
            query_builder.filter_by_tags(intent["tags"])
        return query_builder

    def _post_process_results(self, results: List[Dict[str, Any]],
                              query: str, intent: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Post-process and rerank the results."""
        # 1. Intent-aware relevance scoring
        for result in results:
            result["relevance_score"] = self._calculate_relevance_score(result, query, intent)
        # 2. Deduplicate on (document_id, section_type)
        unique_results = self._deduplicate_results(results)
        # 3. Rerank
        sorted_results = sorted(unique_results,
                                key=lambda x: (x["relevance_score"], x["score"]),
                                reverse=True)
        return sorted_results

    def _calculate_relevance_score(self, result: Dict[str, Any],
                                   query: str, intent: Dict[str, Any]) -> float:
        """Boost the similarity score with metadata signals."""
        score = result.get("score", 0.0)
        # Boost matching document types
        doc_type = result.get("doc_type", "")
        if intent.get("document_types") and doc_type in intent["document_types"]:
            score += 0.1
        # Boost recent documents
        publish_date = result.get("publish_date", "")
        if publish_date:
            try:
                pub_date = datetime.fromisoformat(publish_date)
                days_ago = (datetime.now() - pub_date).days
                if days_ago < 365:  # published within the last year
                    score += 0.05
            except ValueError:
                pass
        # Boost by extraction confidence
        confidence = result.get("confidence_score", 0.0)
        score += confidence * 0.1
        return score

    def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Drop duplicate (document_id, section_type) hits."""
        seen = set()
        unique_results = []
        for result in results:
            key = (result.get("document_id", ""), result.get("section_type", ""))
            if key not in seen:
                seen.add(key)
                unique_results.append(result)
        return unique_results
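The dedup step keys on `(document_id, section_type)` and keeps the first (highest-ranked) hit per key. A standalone check with fabricated hits:

```python
def deduplicate_results(results):
    """Keep the first hit per (document_id, section_type) pair."""
    seen = set()
    unique = []
    for r in results:
        key = (r.get("document_id", ""), r.get("section_type", ""))
        if key not in seen:
            seen.add(key)
            unique.append(r)
    return unique

hits = [
    {"document_id": "d1", "section_type": "财务状况", "score": 0.9},
    {"document_id": "d1", "section_type": "财务状况", "score": 0.85},  # duplicate chunk
    {"document_id": "d1", "section_type": "风险因素", "score": 0.8},
]
unique = deduplicate_results(hits)  # the lower-scored duplicate is dropped
```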
# Usage example
def demonstrate_metadata_assisted_queries():
    """Walk through several metadata-assisted query scenarios."""
    # Initialize the system (llm_client / embedding_client are assumed to be set up elsewhere)
    vector_store = EnhancedMilvusVectorStore()
    rag_system = MetadataAssistedRAG(vector_store, llm_client, embedding_client)
    # Scenario 1: company-specific financial analysis
    print("=== Scenario 1: company-specific financial analysis ===")
    query1 = "分析腾讯控股2023年的盈利能力"
    result1 = rag_system.smart_query(query1)
    print(f"Query: {result1['query']}")
    print(f"Filters applied: {result1['metadata_filters_applied']}")
    print(f"Matches: {result1['total_matches']}")
    # Scenario 2: cross-industry comparison
    print("\n=== Scenario 2: cross-industry comparison ===")
    query2 = "比较银行业和保险业的风险管理策略"
    context2 = {"min_confidence": 0.8}
    result2 = rag_system.smart_query(query2, context2)
    # Scenario 3: time-series analysis
    print("\n=== Scenario 3: time-series analysis ===")
    query3 = "分析2022-2023年科技行业的研发投入趋势"
    result3 = rag_system.smart_query(query3)
    # Scenario 4: document-type-specific query
    print("\n=== Scenario 4: audit-report-only query ===")
    query4 = "查找最近的审计意见中提到的重大风险"
    result4 = rag_system.smart_query(query4)
    return [result1, result2, result3, result4]
# Advanced query examples
def advanced_metadata_queries():
    """Advanced metadata query combinations."""
    vector_store = EnhancedMilvusVectorStore()
    query_builder = MetadataQueryBuilder(vector_store)
    # Combined query 1: one industry, a date range, high confidence
    print("=== Combined query 1: high-quality 2023 banking data ===")
    results1 = (query_builder
                .filter_by_industry(["banking"])
                .filter_by_date_range("2023-01-01", "2023-12-31")
                .filter_by_confidence(0.9)
                .filter_by_doc_type(["annual_report", "quarterly_report"])
                .select_fields(["company_name", "title", "publish_date", "confidence_score"])
                .execute())
    # Combined query 2: multi-company comparison
    print("\n=== Combined query 2: BATJ comparison ===")
    query_builder = MetadataQueryBuilder(vector_store)  # fresh builder
    results2 = (query_builder
                .filter_by_company(["百度", "阿里巴巴", "腾讯", "京东"])
                .filter_by_section(["财务状况", "经营成果"])
                .filter_by_tags(["财务信息"], match_all=True)
                .execute())
    # Combined query 3: risk-related documents
    print("\n=== Combined query 3: risk-management deep dive ===")
    query_builder = MetadataQueryBuilder(vector_store)  # fresh builder
    results3 = (query_builder
                .filter_by_tags(["风险分析", "风险管理"], match_all=False)
                .filter_by_section(["风险因素", "管理层讨论"])
                .filter_by_confidence(0.8)
                .filter_by_custom_field("risk_level", "high")
                .execute())
    return results1, results2, results3
Metadata query performance optimization strategies
class MetadataQueryOptimizer:
    """Metadata query performance optimizer."""
    def __init__(self, vector_store: EnhancedMilvusVectorStore):
        self.vector_store = vector_store
        self.query_cache = {}
        self.metadata_cache = {}
        self.performance_stats = {
            "cache_hits": 0,
            "cache_misses": 0,
            "avg_query_time": 0.0
        }

    def optimize_collection_structure(self):
        """Optimize the collection structure."""
        # 1. Composite indexes
        self._create_composite_indexes()
        # 2. Partitioning strategy
        self._setup_partitioning()
        # 3. Memory configuration
        self._optimize_memory_settings()

    def _create_composite_indexes(self):
        """Create composite indexes for common multi-field queries."""
        # Composite indexes for frequent filter combinations
        composite_indexes = [
            ["company_name", "doc_type", "publish_date"],
            ["industry_category", "report_period"],
            ["doc_type", "section_type", "confidence_score"],
            ["publish_date", "source_reliability"]
        ]
        for fields in composite_indexes:
            try:
                index_name = "_".join(fields)
                print(f"Creating composite index: {index_name}")
                # Note: actual composite index creation depends on the Milvus version
            except Exception as e:
                print(f"Failed to create composite index for {fields}: {e}")

    def _setup_partitioning(self):
        """Set up the partitioning strategy."""
        # Time-based partitions
        time_partitions = [
            "2020_2021", "2022", "2023", "2024"
        ]
        # Document-type partitions
        doc_type_partitions = [
            "annual_reports", "quarterly_reports",
            "audit_reports", "announcements"
        ]
        print("Setting up partitioning strategy...")
        # Actual partition creation goes here

    def _optimize_memory_settings(self):
        """Tune memory settings."""
        # Reasonable cache sizes
        cache_config = {
            "cache.cache_size": "2GB",
            "cache.insert_buffer_size": "256MB",
            "cache.preload_collection": True
        }
        print("Optimizing memory settings...")
        # Apply the memory configuration

    def execute_optimized_query(self, query_builder: MetadataQueryBuilder) -> List[Dict[str, Any]]:
        """Execute a query with caching and timing."""
        import time
        import hashlib
        start_time = time.time()
        # 1. Derive the cache key for this query
        query_key = self._generate_cache_key(query_builder)
        # 2. Cache lookup
        if query_key in self.query_cache:
            self.performance_stats["cache_hits"] += 1
            return self.query_cache[query_key]
        # 3. Execute
        self.performance_stats["cache_misses"] += 1
        results = query_builder.execute()
        # 4. Cache the results
        self.query_cache[query_key] = results
        # 5. Update the running stats
        query_time = time.time() - start_time
        self._update_performance_stats(query_time)
        return results

    def _generate_cache_key(self, query_builder: MetadataQueryBuilder) -> str:
        """Derive a stable cache key from the query definition."""
        import hashlib
        cache_data = {
            "filters": sorted(query_builder.filters),
            "limit": query_builder.limit,
            "output_fields": sorted(query_builder.output_fields),
            "has_vector_query": query_builder.vector_query is not None
        }
        cache_str = json.dumps(cache_data, sort_keys=True)
        return hashlib.md5(cache_str.encode()).hexdigest()

    def _update_performance_stats(self, query_time: float):
        """Update the running average query time."""
        total_queries = self.performance_stats["cache_hits"] + self.performance_stats["cache_misses"]
        if total_queries == 1:
            self.performance_stats["avg_query_time"] = query_time
        else:
            # Incremental moving average
            current_avg = self.performance_stats["avg_query_time"]
            self.performance_stats["avg_query_time"] = (
                (current_avg * (total_queries - 1) + query_time) / total_queries
            )

    def get_performance_report(self) -> Dict[str, Any]:
        """Return a cache/latency summary."""
        total_queries = self.performance_stats["cache_hits"] + self.performance_stats["cache_misses"]
        cache_hit_rate = self.performance_stats["cache_hits"] / total_queries if total_queries > 0 else 0
        return {
            "total_queries": total_queries,
            "cache_hit_rate": f"{cache_hit_rate:.2%}",
            "avg_query_time": f"{self.performance_stats['avg_query_time']:.3f}s",
            "cache_hits": self.performance_stats["cache_hits"],
            "cache_misses": self.performance_stats["cache_misses"]
        }

    def clear_cache(self):
        """Clear all caches."""
        self.query_cache.clear()
        self.metadata_cache.clear()
        print("Cache cleared")
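The cache-key derivation above is order-insensitive by construction: filters and output fields are sorted before hashing, so logically identical queries share one cache entry. A minimal standalone check (the function name is mine):

```python
import hashlib
import json

def make_cache_key(filters, limit, output_fields, has_vector_query):
    """Order-insensitive cache key: same logical query → same MD5 digest."""
    payload = {
        "filters": sorted(filters),
        "limit": limit,
        "output_fields": sorted(output_fields),
        "has_vector_query": has_vector_query,
    }
    # sort_keys=True makes the JSON serialization itself deterministic
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

k1 = make_cache_key(["doc_type == 'annual_report'", "country == 'CN'"], 10, ["*"], False)
k2 = make_cache_key(["country == 'CN'", "doc_type == 'annual_report'"], 10, ["*"], False)
# k1 == k2: filter order does not change the key
```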
