本文将详细介绍如何在金融、证券领域构建智能Agent系统,实现复杂问题的自动化任务分解、依赖管理和并行执行。通过大模型、意图识别、工具使用的协同配合,为用户提供高效、准确的金融数据分析和决策支持。
👆👆👆欢迎关注,一起进步👆👆👆
代码以逻辑为主,并非完整可运行,其中的 RAG检索 和 NL2SQL 可以为独立的系统。因个人知识有限,难免会出现错误,欢迎批评指正哈,文章略长,建议先收藏,如果喜欢,请多多转发,谢谢😊
- 系统架构概览
1.1 整体架构设计
1.2 核心组件说明
1.2.1 意图识别模块
- 功能 :识别用户查询的业务意图和数据需求
- 输入 :自然语言查询(如"分析平安银行2023年ROE变化趋势")
- 输出 :结构化意图信息(查询类型、目标实体、时间范围、指标类型等)
1.2.2 任务分解器
- 功能 :将复杂金融问题分解为可执行的子任务
- 策略 :基于金融业务场景的专业分解模式
- 输出 :子任务列表及其执行要求
1.2.3 依赖关系分析器
- 功能 :分析子任务间的数据依赖和逻辑依赖
- 输出 :任务依赖图和执行约束
1.2.4 执行引擎
- 并行执行 :独立子任务同时执行,提高效率
- 串行执行 :有依赖关系的任务按序执行,保证正确性
- 大模型与Agent协同架构
2.1 大模型在金融Agent中的核心作用
2.1.1 大模型的多层次应用
1. 理解层(Understanding Layer)
- 语义理解 :解析复杂金融术语和业务逻辑
- 意图识别 :识别用户的真实需求和查询目标
- 上下文感知 :理解对话历史和业务背景
2. 推理层(Reasoning Layer)
- 逻辑推理 :基于金融知识进行逻辑推断
- 因果分析 :分析金融指标间的因果关系
- 趋势预测 :基于历史数据预测未来趋势
3. 生成层(Generation Layer)
- 代码生成 :自动生成SQL查询和数据处理代码
- 报告生成 :生成专业的金融分析报告
- 解释生成 :为分析结果提供可理解的解释
class FinancialLLMEngine:
    """Large-language-model engine for the financial agent system.

    Bundles the raw LLM with a financial knowledge base and a library of
    domain prompt templates, and exposes the three core LLM operations:
    query understanding, task planning and analysis generation.

    NOTE(review): the three public methods are declared ``async`` because
    every caller in this file awaits them (e.g.
    ``await self.llm_engine.understand_query(...)``). The underlying
    ``self.llm_model.generate`` call looks synchronous — confirm against the
    real model client and switch to an async client if one exists.
    """

    def __init__(self):
        # _initialize_llm, FinancialKnowledgeBase and FinancialPromptTemplates
        # are provided elsewhere in the project.
        self.llm_model = self._initialize_llm()
        self.financial_knowledge_base = FinancialKnowledgeBase()
        self.prompt_templates = FinancialPromptTemplates()

    async def understand_query(self, user_input: str, context: dict = None) -> dict:
        """Understand a user query with the LLM.

        Args:
            user_input: raw natural-language query.
            context: optional conversation/session context.

        Returns:
            Structured understanding (intent, entities, complexity, ...).
        """
        # Build the understanding prompt, enriched with knowledge-base context.
        understanding_prompt = self.prompt_templates.get_understanding_prompt(
            user_input=user_input,
            context=context,
            financial_context=self.financial_knowledge_base.get_relevant_context(user_input),
        )
        # Low temperature: understanding should be near-deterministic.
        understanding_result = self.llm_model.generate(
            prompt=understanding_prompt,
            max_tokens=1000,
            temperature=0.1,
        )
        return self._parse_understanding_result(understanding_result)

    async def generate_task_plan(self, intent_analysis: dict) -> List[dict]:
        """Generate an execution plan (list of sub-task dicts) for an intent."""
        planning_prompt = self.prompt_templates.get_planning_prompt(
            intent=intent_analysis["primary_intent"],
            entities=intent_analysis["entities"],
            complexity=intent_analysis["complexity"],
            available_tools=self._get_available_tools(),
        )
        plan_result = self.llm_model.generate(
            prompt=planning_prompt,
            max_tokens=2000,
            temperature=0.2,
        )
        return self._parse_task_plan(plan_result)

    async def generate_financial_analysis(self, data: dict, analysis_type: str) -> str:
        """Generate a financial analysis narrative for the given data."""
        analysis_prompt = self.prompt_templates.get_analysis_prompt(
            data=data,
            analysis_type=analysis_type,
            financial_principles=self.financial_knowledge_base.get_analysis_principles(analysis_type),
        )
        # Slightly higher temperature for more natural report prose.
        analysis_result = self.llm_model.generate(
            prompt=analysis_prompt,
            max_tokens=3000,
            temperature=0.3,
        )
        return analysis_result
2.2 Agent架构设计模式
2.2.1 多Agent协作架构
2.2.2 Agent实现框架
import asyncio
import json
import re
import time
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List, Any
class BaseFinancialAgent(ABC):
    """Abstract base class for all financial agents.

    Subclasses must implement :meth:`process` (the async work unit) and
    :meth:`get_capabilities` (a list of capability names).
    """

    def __init__(self, agent_id: str, llm_engine: FinancialLLMEngine):
        self.agent_id = agent_id
        self.llm_engine = llm_engine
        self.state = "idle"
        # NOTE(review): the helpers below require AgentMemory to expose
        # update()/get(); verify AgentMemory provides them.
        self.memory = AgentMemory()
        self.tools = {}

    @abstractmethod
    async def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process one unit of input and return this agent's result."""
        pass

    @abstractmethod
    def get_capabilities(self) -> List[str]:
        """Return the list of capability names this agent provides."""
        pass

    def update_memory(self, key: str, value: Any):
        """Store *value* under *key* in the agent's memory."""
        self.memory.update(key, value)

    def get_memory(self, key: str) -> Any:
        """Retrieve the value stored under *key* from the agent's memory."""
        return self.memory.get(key)
class IntentRecognitionAgent(BaseFinancialAgent):
    """Agent that recognizes the business intent of a user query.

    Fuses two signals: deep LLM-based understanding and fast rule/keyword
    classification from ``FinancialIntentClassifier``.
    """

    def __init__(self, agent_id: str, llm_engine: FinancialLLMEngine):
        super().__init__(agent_id, llm_engine)
        self.intent_classifier = FinancialIntentClassifier()

    async def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Recognize the query's intent and hand off to task decomposition."""
        user_query = input_data.get("user_query", "")
        context = input_data.get("context", {})
        # Deep semantic understanding via the LLM.
        llm_understanding = await self.llm_engine.understand_query(user_query, context)
        # Fast keyword/pattern-based classification.
        rule_classification = self.intent_classifier.classify_intent(user_query)
        # Fuse both signals into a single intent result.
        final_intent = self._merge_intent_results(llm_understanding, rule_classification)
        # Remember the latest intent for downstream agents.
        self.update_memory("last_intent", final_intent)
        return {
            "intent_result": final_intent,
            "confidence": final_intent.get("confidence", 0.0),
            "next_agent": "task_decomposition",
        }

    def get_capabilities(self) -> List[str]:
        return ["intent_recognition", "entity_extraction", "context_understanding"]

    def _merge_intent_results(self, llm_result: dict, rule_result: dict) -> dict:
        """Fuse LLM and rule results.

        LLM values win where present; entity dicts are merged with LLM
        entries overriding rule entries; confidence is the max of the two.
        """
        return {
            "primary_intent": llm_result.get("intent", rule_result.get("primary_intent")),
            "entities": {**rule_result.get("entities", {}), **llm_result.get("entities", {})},
            "confidence": max(llm_result.get("confidence", 0), rule_result.get("confidence", 0)),
            "complexity": llm_result.get("complexity", rule_result.get("complexity", "medium")),
            "reasoning": llm_result.get("reasoning", ""),
        }
class TaskDecompositionAgent(BaseFinancialAgent):
    """Agent that decomposes a recognized intent into executable sub-tasks."""

    def __init__(self, agent_id: str, llm_engine: FinancialLLMEngine):
        super().__init__(agent_id, llm_engine)
        self.decomposer = FinancialTaskDecomposer()

    async def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Produce an optimized task plan plus its dependency analysis.

        NOTE(review): _optimize_task_plan, _analyze_dependencies and
        _determine_execution_strategy are not defined in this class here —
        presumably inherited or defined elsewhere; confirm.
        """
        intent_result = input_data.get("intent_result", {})
        # LLM-generated candidate plan.
        llm_plan = await self.llm_engine.generate_task_plan(intent_result)
        # Rule-based decomposition as a second opinion.
        rule_subtasks = self.decomposer.decompose_complex_query(
            intent_result.get("original_query", ""),
            intent_result,
        )
        # Fuse and optimize both plans.
        optimized_plan = self._optimize_task_plan(llm_plan, rule_subtasks)
        # Determine data/logic dependencies between sub-tasks.
        dependency_analysis = self._analyze_dependencies(optimized_plan)
        return {
            "task_plan": optimized_plan,
            "dependency_analysis": dependency_analysis,
            "execution_strategy": self._determine_execution_strategy(dependency_analysis),
            "next_agent": "data_extraction",
        }

    def get_capabilities(self) -> List[str]:
        return ["task_decomposition", "dependency_analysis", "execution_planning"]
class MasterCoordinatorAgent(BaseFinancialAgent):
    """Coordinator agent that drives the full pipeline.

    Flow: intent recognition -> task decomposition -> plan execution
    (parallel or sequential) -> result integration.
    """

    def __init__(self, agent_id: str, llm_engine: FinancialLLMEngine):
        super().__init__(agent_id, llm_engine)
        self.agents = {}
        self.execution_queue = asyncio.Queue()
        self.results_store = {}

    def register_agent(self, agent: BaseFinancialAgent):
        """Register a sub-agent under its agent_id."""
        self.agents[agent.agent_id] = agent

    async def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Coordinate the end-to-end handling of one user query."""
        user_query = input_data.get("user_query", "")
        # 1. Intent recognition.
        intent_result = await self.agents["intent_recognition"].process({
            "user_query": user_query,
            "context": input_data.get("context", {}),
        })
        # 2. Task decomposition.
        decomposition_result = await self.agents["task_decomposition"].process({
            "intent_result": intent_result["intent_result"],
            "original_query": user_query,
        })
        # 3. Execute the task plan.
        execution_results = await self._execute_task_plan(
            decomposition_result["task_plan"],
            decomposition_result["execution_strategy"],
        )
        # 4. Integrate results into the final answer.
        final_result = await self._integrate_results(execution_results, intent_result["intent_result"])
        return final_result

    async def _execute_task_plan(self, task_plan: List[dict], strategy: dict) -> dict:
        """Execute the plan; concurrently for independent tasks, else in order."""
        results = {}
        if strategy["type"] == "parallel":
            # Only dependency-free tasks are eligible for parallel execution.
            runnable = [task for task in task_plan if not task.get("dependencies")]
            coros = [self._execute_single_task(task) for task in runnable]
            parallel_results = await asyncio.gather(*coros)
            # BUGFIX: key results by the tasks actually gathered. The original
            # indexed task_plan[i], which misaligned task ids whenever a task
            # with dependencies was skipped from the gather list.
            for task, result in zip(runnable, parallel_results):
                results[task["task_id"]] = result
        elif strategy["type"] == "sequential":
            for task in task_plan:
                result = await self._execute_single_task(task)
                results[task["task_id"]] = result
        return results

    async def _execute_single_task(self, task: dict) -> dict:
        """Dispatch one task to the specialized agent for its task_type.

        NOTE(review): dispatches to agent id "calculation_analysis", but
        InvestmentResearchAgent registers the analysis agent as
        "financial_analysis" — one of the two ids is wrong; confirm.
        """
        task_type = task.get("task_type")
        if task_type == "data_retrieval":
            return await self.agents["data_extraction"].process(task)
        elif task_type == "calculation":
            return await self.agents["calculation_analysis"].process(task)
        elif task_type == "report_generation":
            return await self.agents["report_generation"].process(task)
        # Unknown task types are reported rather than raised so one bad task
        # does not abort the whole plan.
        return {"status": "unknown_task_type", "task_id": task.get("task_id")}

    def get_capabilities(self) -> List[str]:
        return ["coordination", "workflow_management", "result_integration"]
2.3 大模型Prompt工程
2.3.1 金融领域专用Prompt模板
class FinancialPromptTemplates:
    """Prompt template library for the financial domain.

    Templates are plain ``str.format`` templates (literal JSON braces are
    escaped as ``{{ }}``). The ``get_*_prompt`` accessors fill a template
    with keyword arguments and are the interface used by FinancialLLMEngine.
    """

    def __init__(self):
        self.templates = {
            "understanding": self._get_understanding_template(),
            "planning": self._get_planning_template(),
            "analysis": self._get_analysis_template(),
            "sql_generation": self._get_sql_template(),
            "report_generation": self._get_report_template(),
        }

    def get_understanding_prompt(self, **kwargs) -> str:
        """Fill the understanding template (user_input, context, financial_context)."""
        return self.templates["understanding"].format(**kwargs)

    def get_planning_prompt(self, **kwargs) -> str:
        """Fill the planning template (intent, entities, complexity, available_tools)."""
        return self.templates["planning"].format(**kwargs)

    def get_analysis_prompt(self, **kwargs) -> str:
        """Fill the analysis template (data, analysis_type, financial_principles)."""
        return self.templates["analysis"].format(**kwargs)

    def _get_understanding_template(self) -> str:
        return """
你是一个专业的金融分析师和AI助手,具备深厚的金融知识和数据分析能力。
用户查询:{user_input}
上下文信息:{context}
金融背景:{financial_context}
请分析用户的查询意图,包括:
1. 主要意图类型(指标查询/对比分析/筛选过滤/计算分析/报告生成)
2. 涉及的金融实体(公司、指标、时间等)
3. 查询的复杂程度
4. 需要的数据源类型
5. 预期的输出格式
请以JSON格式返回分析结果:
{{
    "intent": "主要意图类型",
    "entities": {{
        "companies": ["公司列表"],
        "metrics": ["指标列表"],
        "time_period": "时间范围",
        "other": {{}}
    }},
    "complexity": "simple/medium/complex",
    "data_sources": ["需要的数据源"],
    "output_format": "预期输出格式",
    "confidence": 0.95,
    "reasoning": "分析推理过程"
}}
"""

    def _get_planning_template(self) -> str:
        return """
作为金融数据分析专家,请为以下查询制定详细的执行计划。
查询意图:{intent}
实体信息:{entities}
复杂程度:{complexity}
可用工具:{available_tools}
请将复杂查询分解为具体的子任务,每个子任务应该:
1. 有明确的目标和输出
2. 指定需要使用的工具
3. 明确数据依赖关系
4. 设置优先级
请以JSON格式返回任务计划:
{{
    "tasks": [
        {{
            "task_id": "唯一标识",
            "description": "任务描述",
            "task_type": "任务类型",
            "tool_required": "需要的工具",
            "data_sources": ["数据源列表"],
            "expected_output": "预期输出",
            "dependencies": ["依赖的任务ID"],
            "priority": "high/medium/low",
            "estimated_time": "预估时间(秒)"
        }}
    ],
    "execution_strategy": "parallel/sequential/hybrid",
    "total_estimated_time": "总预估时间",
    "risk_factors": ["潜在风险"]
}}
"""

    def _get_analysis_template(self) -> str:
        return """
作为资深金融分析师,请对以下数据进行专业分析。
分析类型:{analysis_type}
数据内容:{data}
分析原则:{financial_principles}
请提供:
1. 数据概况总结
2. 关键指标分析
3. 趋势变化解读
4. 风险因素识别
5. 投资建议(如适用)
分析要求:
- 使用专业的金融术语
- 提供量化的分析结果
- 给出明确的结论和建议
- 注明分析的局限性
请以结构化的方式组织分析报告。
"""

    def _get_sql_template(self) -> str:
        # TODO(review): the original SQL-generation template was missing from
        # the source (its absence made __init__ raise AttributeError); this
        # minimal placeholder keeps the class constructible.
        return """
请根据以下需求生成SQL查询:
查询需求:{nl_query}
数据库Schema:{schema_info}
请只返回可执行的SQL语句。
"""

    def _get_report_template(self) -> str:
        # TODO(review): the original report-generation template was missing
        # from the source; minimal placeholder, same rationale as above.
        return """
请根据以下分析结果生成专业的金融研究报告:
分析结果:{analysis_results}
报告类型:{report_type}
请以结构化的方式组织报告内容。
"""
2.3.2 Agent记忆与学习机制
class AgentMemory:
    """Layered memory for an agent.

    Layers: short-term (current session), long-term (persisted),
    episodic (interaction history, capped at 1000) and semantic
    (learned success/failure patterns).
    """

    def __init__(self):
        self.short_term_memory = {}   # current-session memory
        self.long_term_memory = {}    # persisted memory
        self.episodic_memory = []     # interaction history
        self.semantic_memory = {}     # knowledge-base memory

    def update(self, key: str, value: Any):
        """Store *value* under *key*.

        BUGFIX: BaseFinancialAgent.update_memory calls ``memory.update`` but
        no such method existed — alias it to the short-term store.
        """
        self.update_short_term(key, value)

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value for *key*, checking short-term then long-term.

        BUGFIX: BaseFinancialAgent.get_memory calls ``memory.get`` which
        did not exist previously.
        """
        item = self.short_term_memory.get(key)
        if item is None:
            item = self.long_term_memory.get(key)
        return item["value"] if item is not None else default

    def update_short_term(self, key: str, value: Any):
        """Write *value* into short-term memory, tracking write frequency."""
        self.short_term_memory[key] = {
            "value": value,
            "timestamp": datetime.now(),
            # Count how often this key is written; used for consolidation.
            "access_count": self.short_term_memory.get(key, {}).get("access_count", 0) + 1,
        }

    def consolidate_to_long_term(self, threshold: int = 5):
        """Promote frequently-used short-term entries to long-term memory."""
        for key, memory_item in self.short_term_memory.items():
            if memory_item["access_count"] >= threshold:
                self.long_term_memory[key] = memory_item

    def add_episode(self, interaction: dict):
        """Append one interaction record to episodic memory (bounded)."""
        episode = {
            "timestamp": datetime.now(),
            "user_query": interaction.get("user_query"),
            "intent": interaction.get("intent"),
            "task_plan": interaction.get("task_plan"),
            "execution_result": interaction.get("execution_result"),
            "user_feedback": interaction.get("user_feedback"),
            "success_rate": interaction.get("success_rate"),
        }
        self.episodic_memory.append(episode)
        # Keep only the most recent 1000 episodes.
        if len(self.episodic_memory) > 1000:
            self.episodic_memory = self.episodic_memory[-1000:]

    def learn_from_feedback(self, feedback: dict):
        """Record success (rating >= 4) or failure (rating <= 2) patterns."""
        rating = feedback.get("rating", 0)
        if rating >= 4:
            # Reinforce the successful pattern.
            successful_pattern = {
                "query_pattern": feedback.get("query_pattern"),
                "task_decomposition": feedback.get("task_decomposition"),
                "tool_selection": feedback.get("tool_selection"),
                "success_score": rating,
            }
            self.semantic_memory["successful_patterns"] = \
                self.semantic_memory.get("successful_patterns", []) + [successful_pattern]
        elif rating <= 2:
            # Record the failure pattern for later avoidance.
            failure_pattern = {
                "query_pattern": feedback.get("query_pattern"),
                "error_type": feedback.get("error_type"),
                "improvement_suggestion": feedback.get("suggestion"),
            }
            self.semantic_memory["failure_patterns"] = \
                self.semantic_memory.get("failure_patterns", []) + [failure_pattern]
class AdaptiveLearningAgent(BaseFinancialAgent):
    """Agent that tunes its own parameters from observed performance.

    NOTE(review): this class does not implement the abstract process() /
    get_capabilities() methods, so it cannot be instantiated directly —
    presumably it is meant as a mixin/base; confirm intended use.
    """

    def __init__(self, agent_id: str, llm_engine: FinancialLLMEngine):
        super().__init__(agent_id, llm_engine)
        self.learning_rate = 0.1
        self.performance_history = []

    def adapt_strategy(self, performance_metrics: dict):
        """Adjust strategy based on the rolling average of recent runs."""
        self.performance_history.append(performance_metrics)
        # Need at least 10 samples before reacting to a trend.
        if len(self.performance_history) >= 10:
            recent_performance = self.performance_history[-10:]
            avg_success_rate = sum(p["success_rate"] for p in recent_performance) / 10
            if avg_success_rate < 0.8:
                # Performance degrading: simplify the workload.
                self._adjust_parameters("decrease_complexity")
            elif avg_success_rate > 0.95:
                # Performance excellent: push for more throughput.
                self._adjust_parameters("increase_efficiency")

    def _adjust_parameters(self, adjustment_type: str):
        """Write tuned limits into short-term memory for other components."""
        if adjustment_type == "decrease_complexity":
            # Reduce decomposition complexity.
            self.memory.update_short_term("max_subtasks", 5)
            self.memory.update_short_term("parallel_threshold", 2)
        elif adjustment_type == "increase_efficiency":
            # Allow more sub-tasks and wider parallelism.
            self.memory.update_short_term("max_subtasks", 10)
            self.memory.update_short_term("parallel_threshold", 4)
2.4 大模型与Agent的实际应用场景
2.4.1 智能投资研究助手
2.4.2 实际应用代码示例
class InvestmentResearchAgent(MasterCoordinatorAgent):
    """Top-level coordinator specialized for investment research."""

    def __init__(self, llm_engine: FinancialLLMEngine):
        super().__init__("investment_research", llm_engine)
        self._initialize_specialized_agents()

    def _initialize_specialized_agents(self):
        """Register the specialized sub-agents of the research pipeline.

        NOTE(review): the analysis agent is registered as
        "financial_analysis", but MasterCoordinatorAgent._execute_single_task
        dispatches calculation tasks to "calculation_analysis" — one of the
        two ids is wrong; confirm and align.
        """
        self.register_agent(IntentRecognitionAgent("intent_recognition", self.llm_engine))
        self.register_agent(TaskDecompositionAgent("task_decomposition", self.llm_engine))
        self.register_agent(FinancialDataAgent("data_extraction", self.llm_engine))
        self.register_agent(FinancialAnalysisAgent("financial_analysis", self.llm_engine))
        self.register_agent(ReportGenerationAgent("report_generation", self.llm_engine))

    async def conduct_investment_research(self, research_query: str) -> dict:
        """Run the full research workflow for one query.

        Returns the pipeline result augmented with execution metadata; on
        failure delegates to the error-recovery handler.
        """
        start_time = time.time()
        try:
            # Run the full coordinated pipeline.
            result = await self.process({
                "user_query": research_query,
                "context": {
                    "research_type": "investment_analysis",
                    "output_format": "comprehensive_report",
                    "urgency": "normal",
                },
            })
            execution_time = time.time() - start_time
            # Attach run metadata for auditing.
            result["metadata"] = {
                "execution_time": execution_time,
                "agent_version": "v2.0",
                "llm_model": self.llm_engine.model_name,
                "timestamp": datetime.now().isoformat(),
            }
            # Feed the outcome back into the learning loop.
            await self._learn_from_execution(research_query, result)
            return result
        except Exception as e:
            # Error handling and recovery path.
            return await self._handle_research_error(research_query, str(e))

    async def _learn_from_execution(self, query: str, result: dict):
        """Derive performance metrics from a run and propagate them.

        NOTE(review): _assess_data_quality is not defined in the visible
        source — presumably implemented elsewhere; confirm.
        """
        performance_metrics = {
            "success_rate": 1.0 if result.get("status") == "success" else 0.0,
            "execution_time": result.get("metadata", {}).get("execution_time", 0),
            "data_quality": self._assess_data_quality(result),
            "user_satisfaction": result.get("user_feedback", {}).get("rating", 3.0),
        }
        # Let adaptive sub-agents tune themselves.
        for agent in self.agents.values():
            if hasattr(agent, 'adapt_strategy'):
                agent.adapt_strategy(performance_metrics)
        # Remember successful runs as reusable episodes.
        if performance_metrics["success_rate"] > 0.8:
            self.memory.add_episode({
                "user_query": query,
                "task_plan": result.get("task_plan"),
                "execution_result": result,
                "success_rate": performance_metrics["success_rate"],
            })
class FinancialAnalysisAgent(BaseFinancialAgent):
    """Specialist agent for financial analysis.

    Runs a set of analysis tools over the supplied data, then asks the LLM
    to integrate their outputs into one narrative.
    """

    def __init__(self, agent_id: str, llm_engine: FinancialLLMEngine):
        super().__init__(agent_id, llm_engine)
        # Tool registry; the concrete tool classes live elsewhere.
        self.analysis_tools = {
            "ratio_analysis": RatioAnalysisTool(),
            "trend_analysis": TrendAnalysisTool(),
            "peer_comparison": PeerComparisonTool(),
            "valuation_analysis": ValuationAnalysisTool(),
        }

    async def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the selected tools and produce an integrated analysis.

        NOTE(review): _calculate_confidence and _generate_recommendations
        are not defined in the visible source; confirm they exist.
        """
        analysis_type = input_data.get("analysis_type", "comprehensive")
        financial_data = input_data.get("financial_data", {})
        # Pick the tool set appropriate for this analysis type.
        selected_tools = self._select_analysis_tools(analysis_type, financial_data)
        # Run each tool in turn.
        analysis_results = {}
        for tool_name in selected_tools:
            tool = self.analysis_tools[tool_name]
            result = await tool.analyze(financial_data)
            analysis_results[tool_name] = result
        # Let the LLM weave the per-tool results into one narrative.
        integrated_analysis = await self.llm_engine.generate_financial_analysis(
            data=analysis_results,
            analysis_type=analysis_type,
        )
        return {
            "analysis_results": analysis_results,
            "integrated_analysis": integrated_analysis,
            "confidence_score": self._calculate_confidence(analysis_results),
            "recommendations": self._generate_recommendations(analysis_results),
        }

    def _select_analysis_tools(self, analysis_type: str, data: dict) -> List[str]:
        """Map an analysis type to the tool names to run (ratio by default)."""
        tool_selection = {
            "comprehensive": ["ratio_analysis", "trend_analysis", "peer_comparison"],
            "valuation": ["valuation_analysis", "ratio_analysis"],
            "performance": ["ratio_analysis", "trend_analysis"],
            "comparison": ["peer_comparison", "ratio_analysis"],
        }
        return tool_selection.get(analysis_type, ["ratio_analysis"])

    def get_capabilities(self) -> List[str]:
        return ["financial_analysis", "ratio_calculation", "trend_analysis", "peer_comparison"]
2.5 大模型在Agent系统中的核心价值
2.5.1 认知能力增强
大模型为Agent系统提供了强大的认知能力,主要体现在以下几个方面:
class CognitiveCapabilities:
    """Wrapper exposing the LLM's cognitive operations to agents.

    NOTE(review): these methods call ``self.llm_engine.generate(...)``, but
    FinancialLLMEngine does not define a ``generate`` method in the visible
    source — confirm the engine's actual API.
    """

    def __init__(self, llm_engine: FinancialLLMEngine):
        self.llm_engine = llm_engine
        self.knowledge_base = FinancialKnowledgeBase()

    async def understand_context(self, query: str, context: dict) -> dict:
        """Deeply analyze a query in its context; returns the parsed JSON.

        Raises:
            json.JSONDecodeError: if the model reply is not valid JSON.
        """
        understanding_prompt = f"""
作为金融领域专家,请深度分析以下查询:
用户查询:{query}
上下文信息:{json.dumps(context, ensure_ascii=False, indent=2)}
请从以下维度进行分析:
1. 查询的核心意图和隐含需求
2. 涉及的金融概念和专业术语
3. 需要的数据类型和分析方法
4. 预期的输出格式和详细程度
5. 潜在的风险点和注意事项
请以JSON格式返回分析结果。
"""
        response = await self.llm_engine.generate(
            prompt=understanding_prompt,
            temperature=0.1,
            max_tokens=1000,
        )
        return json.loads(response)

    async def reason_about_task(self, task_description: str, available_tools: List[str]) -> dict:
        """Reason about a task's difficulty, strategy and risks."""
        reasoning_prompt = f"""
任务描述:{task_description}
可用工具:{', '.join(available_tools)}
请进行以下推理:
6. 分析任务的复杂度和执行难度
7. 确定最优的执行策略(串行/并行)
8. 选择合适的工具组合
9. 预估执行时间和资源需求
10. 识别潜在的执行风险
返回详细的推理结果和执行建议。
"""
        response = await self.llm_engine.generate(
            prompt=reasoning_prompt,
            temperature=0.2,
            max_tokens=800,
        )
        # Confidence is a fixed heuristic here, not model-derived.
        return {"reasoning_result": response, "confidence": 0.85}

    async def synthesize_information(self, data_sources: List[dict]) -> dict:
        """Synthesize multiple data sources into one insight report."""
        synthesis_prompt = f"""
请综合分析以下多个数据源的信息:
{json.dumps(data_sources, ensure_ascii=False, indent=2)}
请进行以下分析:
11. 识别数据间的关联性和一致性
12. 发现潜在的矛盾或异常
13. 提取关键洞察和趋势
14. 生成综合性结论
15. 评估信息的可靠性
返回综合分析报告。
"""
        response = await self.llm_engine.generate(
            prompt=synthesis_prompt,
            temperature=0.3,
            max_tokens=1200,
        )
        # data_quality_score is a fixed heuristic, not computed.
        return {"synthesis_report": response, "data_quality_score": 0.9}
2.5.2 动态决策支持
classDynamicDecisionSupport:
"""动态决策支持系统"""
def\_\_init\_\_(self, llm\_engine: FinancialLLMEngine):
self.llm\_engine = llm\_engine
self.decision\_history = []
asyncdefmake\_strategic\_decision(self, situation: dict, options: List[dict]) -> dict:
"""战略决策制定"""
decision\_prompt = f"""
当前情况:{json.dumps(situation, ensure\_ascii=False, indent=2)}
可选方案:
{json.dumps(options, ensure\_ascii=False, indent=2)}
作为金融专家,请进行决策分析:
1. 评估每个方案的优缺点
2. 分析风险和收益
3. 考虑市场环境和监管要求
4. 推荐最优方案并说明理由
5. 提供备选方案和应急预案
请提供详细的决策分析报告。
"""
decision\_analysis = await self.llm\_engine.generate(
prompt=decision\_prompt,
temperature=0.2,
max\_tokens=1500
)
# 记录决策历史
decision\_record = {
"timestamp": datetime.now(),
"situation": situation,
"options": options,
"analysis": decision\_analysis,
"decision\_id": str(uuid.uuid4())
}
self.decision\_history.append(decision\_record)
return decision\_record
asyncdefadapt\_to\_feedback(self, decision\_id: str, feedback: dict) -> dict:
"""根据反馈调整决策"""
# 找到原始决策
original\_decision = next(
(d for d in self.decision\_history if d["decision\_id"] == decision\_id),
None
)
ifnot original\_decision:
return {"error": "Decision not found"}
adaptation\_prompt = f"""
原始决策:{json.dumps(original\_decision, ensure\_ascii=False, indent=2)}
反馈信息:{json.dumps(feedback, ensure\_ascii=False, indent=2)}
请基于反馈调整决策:
6. 分析反馈的有效性和重要性
7. 识别原决策的不足之处
8. 提出改进建议
9. 更新决策方案
10. 制定实施计划
返回调整后的决策方案。
"""
adapted\_decision = await self.llm\_engine.generate(
prompt=adaptation\_prompt,
temperature=0.25,
max\_tokens=1200
)
return {
"original\_decision\_id": decision\_id,
"adapted\_decision": adapted\_decision,
"adaptation\_timestamp": datetime.now(),
"feedback\_incorporated": feedback
}
2.5.3 知识图谱增强
class KnowledgeGraphEnhancement:
    """Knowledge-graph helpers: entity extraction, relation inference and
    agent knowledge enrichment, all driven by LLM prompts.

    NOTE(review): calls ``self.llm_engine.generate(...)`` and
    ``agent.update_knowledge_base(...)`` — neither is defined in the visible
    source; confirm both APIs.
    """

    def __init__(self, llm_engine: FinancialLLMEngine):
        self.llm_engine = llm_engine
        self.knowledge_graph = FinancialKnowledgeGraph()

    async def extract_financial_entities(self, text: str) -> List[dict]:
        """Extract financial entities from text; returns the parsed JSON list.

        Raises:
            json.JSONDecodeError: if the model reply is not valid JSON.
        """
        extraction_prompt = f"""
请从以下文本中提取金融相关实体:
文本:{text}
请识别以下类型的实体:
1. 公司名称(包括简称和全称)
2. 金融指标(如ROE、PE、营收等)
3. 时间期间(如2023年、Q1等)
4. 金融产品(如股票、债券、基金等)
5. 市场名称(如A股、港股等)
6. 监管机构(如证监会、银保监会等)
返回JSON格式的实体列表,包含实体名称、类型、置信度。
"""
        response = await self.llm_engine.generate(
            prompt=extraction_prompt,
            temperature=0.1,
            max_tokens=800,
        )
        return json.loads(response)

    async def infer_relationships(self, entities: List[dict], context: str) -> List[dict]:
        """Infer relations between entities; returns parsed relation triples."""
        relationship_prompt = f"""
实体列表:{json.dumps(entities, ensure_ascii=False, indent=2)}
上下文:{context}
请推理实体之间的关系:
7. 所属关系(如公司-行业)
8. 比较关系(如公司A vs 公司B)
9. 时间关系(如2022年 vs 2023年)
10. 因果关系(如政策-市场影响)
11. 层级关系(如集团-子公司)
返回关系三元组列表:[主体, 关系, 客体, 置信度]
"""
        response = await self.llm_engine.generate(
            prompt=relationship_prompt,
            temperature=0.15,
            max_tokens=1000,
        )
        return json.loads(response)

    async def enhance_agent_knowledge(self, agent: BaseFinancialAgent, domain: str) -> dict:
        """Generate domain knowledge and install it into the agent."""
        # Pull existing knowledge for the domain from the graph.
        domain_knowledge = await self.knowledge_graph.get_domain_knowledge(domain)
        enhancement_prompt = f"""
领域:{domain}
现有知识:{json.dumps(domain_knowledge, ensure_ascii=False, indent=2)}
请为Agent生成增强知识:
12. 关键概念和定义
13. 常见分析方法
14. 重要的计算公式
15. 行业最佳实践
16. 风险控制要点
返回结构化的知识增强包。
"""
        enhanced_knowledge = await self.llm_engine.generate(
            prompt=enhancement_prompt,
            temperature=0.2,
            max_tokens=1500,
        )
        # Push the parsed knowledge package into the agent.
        agent.update_knowledge_base(json.loads(enhanced_knowledge))
        return {"status": "success", "enhanced_knowledge": enhanced_knowledge}
- 金融领域意图识别
3.1 金融查询意图分类
3.2 意图识别实现
class FinancialIntentClassifier:
    """Rule-based intent classifier for financial queries.

    Matches keyword/regex patterns per intent type and extracts known
    financial entities by dictionary lookup.
    """

    def __init__(self):
        # Per-intent keyword/pattern/entity configuration.
        self.intent_patterns = {
            "indicator_query": {
                "keywords": ["ROE", "净利润", "营业收入", "资产", "负债", "现金流", "毛利率"],
                "patterns": [r".*查询.*指标.*", r".*的.*是多少", r".*指标.*情况"],
                "entities": ["financial_metrics", "companies", "time_period"],
            },
            "comparison_analysis": {
                "keywords": ["对比", "比较", "排名", "同比", "环比", "增长"],
                "patterns": [r".*对比.*", r".*比较.*", r".*排名.*", r".*同比.*"],
                "entities": ["comparison_targets", "comparison_dimensions"],
            },
            "filtering_screening": {
                "keywords": ["筛选", "过滤", "条件", "满足", "符合", "大于", "小于"],
                "patterns": [r".*筛选.*", r".*条件.*", r".*满足.*"],
                "entities": ["filter_conditions", "target_universe"],
            },
            "calculation_analysis": {
                "keywords": ["计算", "分析", "评估", "测算", "预测"],
                "patterns": [r".*计算.*", r".*分析.*", r".*评估.*"],
                "entities": ["calculation_type", "input_data"],
            },
            "report_generation": {
                "keywords": ["报告", "总结", "分析", "研究", "建议"],
                "patterns": [r".*报告.*", r".*总结.*", r".*研究.*"],
                "entities": ["report_type", "analysis_scope"],
            },
        }
        # Known-entity dictionaries for substring matching.
        self.financial_entities = {
            "companies": ["平安银行", "招商银行", "工商银行", "建设银行", "中国银行"],
            "financial_metrics": ["ROE", "ROA", "净利润", "营业收入", "总资产", "净资产"],
            "time_periods": ["2023年", "2022年", "Q1", "Q2", "Q3", "Q4", "上半年", "全年"],
            "industries": ["银行业", "证券业", "保险业", "房地产", "制造业"],
            "regions": ["北京", "上海", "深圳", "广州", "杭州"],
        }

    def classify_intent(self, user_query: str) -> dict:
        """Classify the query's primary intent.

        NOTE(review): _calculate_intent_score and _determine_query_type are
        not defined in the visible source — presumably implemented elsewhere;
        confirm before relying on this method.
        """
        intent_scores = {}
        # Score the query against each intent's keywords/patterns.
        for intent_type, patterns in self.intent_patterns.items():
            score = self._calculate_intent_score(user_query, patterns)
            intent_scores[intent_type] = score
        # Highest-scoring intent wins.
        primary_intent = max(intent_scores, key=intent_scores.get)
        confidence = intent_scores[primary_intent]
        entities = self._extract_financial_entities(user_query)
        complexity = self._analyze_query_complexity(user_query, entities)
        return {
            "primary_intent": primary_intent,
            "confidence": confidence,
            "entities": entities,
            "complexity": complexity,
            "query_type": self._determine_query_type(primary_intent, entities),
        }

    def _extract_financial_entities(self, query: str) -> dict:
        """Extract known entities (substring match) plus numbers and dates."""
        entities = {}
        for entity_type, entity_list in self.financial_entities.items():
            found_entities = [entity for entity in entity_list if entity in query]
            if found_entities:
                entities[entity_type] = found_entities
        # Numeric and temporal tokens via regex.
        import re
        entities["numbers"] = re.findall(r'\d+(?:\.\d+)?', query)
        entities["years"] = re.findall(r'\d{4}年', query)
        entities["quarters"] = re.findall(r'Q[1-4]|[一二三四]季度', query)
        return entities

    def _analyze_query_complexity(self, query: str, entities: dict) -> str:
        """Bucket the query as simple/medium/complex by entity count and length."""
        if len(entities) <= 2 and len(query) < 50:
            return "simple"
        if 2 < len(entities) <= 4 and 50 <= len(query) < 100:
            return "medium"
        if len(entities) > 4 or len(query) >= 100:
            return "complex"
        # Fallback when no bucket matches (e.g. few entities but long query).
        return "medium"
- 任务分解策略
4.1 金融业务任务分解模式
4.2 具体分解示例
4.2.1 复杂查询示例:"分析平安银行2023年ROE变化趋势,与同业对比,并给出投资建议"
任务分解结果:
class FinancialTaskDecomposer:
    """Rule-based decomposer turning a complex financial query into SubTasks."""

    def decompose_complex_query(self, query: str, intent_result: dict) -> List[SubTask]:
        """Decompose a complex query into ordered sub-tasks.

        NOTE(review): the body is a hard-coded worked example for
        "分析平安银行2023年ROE变化趋势,与同业对比,并给出投资建议" — the
        real implementation should derive sub-tasks from *query* and
        *intent_result* rather than returning this fixed plan.
        """
        subtasks = [
            # Stage 1: two independent data-collection tasks (parallelizable).
            SubTask(
                task_id="data_collection_1",
                description="收集平安银行2023年各季度ROE数据",
                task_type="data_retrieval",
                tool_required="rag_search",
                data_sources=["财务报告库", "Wind数据库"],
                expected_output="平安银行2023年Q1-Q4 ROE数据",
                dependencies=[],
                priority="high",
            ),
            SubTask(
                task_id="data_collection_2",
                description="收集同业银行2023年ROE数据",
                task_type="data_retrieval",
                tool_required="nl2sql",
                data_sources=["Wind数据库"],
                expected_output="招商银行、工商银行、建设银行等ROE数据",
                dependencies=[],
                priority="high",
            ),
            # Stage 2: analyses depending on the collected data.
            SubTask(
                task_id="trend_analysis",
                description="分析平安银行ROE变化趋势",
                task_type="calculation",
                tool_required="data_analysis",
                expected_output="ROE趋势分析结果(增长率、波动性等)",
                dependencies=["data_collection_1"],
                priority="medium",
            ),
            SubTask(
                task_id="peer_comparison",
                description="进行同业ROE对比分析",
                task_type="comparison",
                tool_required="data_analysis",
                expected_output="平安银行与同业ROE对比结果",
                dependencies=["data_collection_1", "data_collection_2"],
                priority="medium",
            ),
            # Stage 3: final recommendation built on both analyses.
            SubTask(
                task_id="investment_recommendation",
                description="基于分析结果生成投资建议",
                task_type="report_generation",
                tool_required="llm_analysis",
                expected_output="投资建议报告",
                dependencies=["trend_analysis", "peer_comparison"],
                priority="low",
            ),
        ]
        return subtasks
4.3 依赖关系管理
依赖关系分析器实现:
classDependencyAnalyzer:
def\_\_init\_\_(self):
self.dependency\_graph = {}
self.execution\_groups = []
defanalyze\_dependencies(self, subtasks: List[SubTask]) -> dict:
"""分析任务依赖关系"""
# 构建依赖图
self.dependency\_graph = self.\_build\_dependency\_graph(subtasks)
# 检测循环依赖
if self.\_has\_circular\_dependency():
raise ValueError("检测到循环依赖,请检查任务设计")
# 生成执行组(拓扑排序)
self.execution\_groups = self.\_generate\_execution\_groups()
# 识别并行执行机会
parallel\_groups = self.\_identify\_parallel\_groups()
return {
"dependency\_graph": self.dependency\_graph,
"execution\_groups": self.execution\_groups,
"parallel\_groups": parallel\_groups,
"total\_stages": len(self.execution\_groups)
}
def\_build\_dependency\_graph(self, subtasks: List[SubTask]) -> dict:
"""构建依赖关系图"""
graph = {}
for task in subtasks:
graph[task.task\_id] = {
"task": task,
"dependencies": task.dependencies,
"dependents": []
}
# 建立反向依赖关系
for task\_id, task\_info in graph.items():
for dep in task\_info["dependencies"]:
if dep in graph:
graph[dep]["dependents"].append(task\_id)
return graph
def\_generate\_execution\_groups(self) -> List[List[str]]:
"""生成执行组(拓扑排序)"""
groups = []
remaining\_tasks = set(self.dependency\_graph.keys())
while remaining\_tasks:
# 找到当前可执行的任务(无未完成依赖)
current\_group = []
for task\_id in remaining\_tasks:
dependencies = self.dependency\_graph[task\_id]["dependencies"]
if all(dep notin remaining\_tasks for dep in dependencies):
current\_group.append(task\_id)
ifnot current\_group:
raise ValueError("无法解析依赖关系,可能存在循环依赖")
groups.append(current\_group)
remaining\_tasks -= set(current\_group)
return groups
def\_identify\_parallel\_groups(self) -> List[dict]:
"""识别可并行执行的任务组"""
parallel\_groups = []
for i, group in enumerate(self.execution\_groups):
if len(group) > 1:
# 分析并行任务的资源需求
resource\_analysis = self.\_analyze\_resource\_requirements(group)
parallel\_groups.append({
"stage": i + 1,
"tasks": group,
"parallelizable": True,
"resource\_requirements": resource\_analysis,
"estimated\_time\_saving": self.\_estimate\_time\_saving(group)
})
return parallel\_groups
def\_analyze\_resource\_requirements(self, task\_group: List[str]) -> dict:
"""分析资源需求"""
resource\_usage = {
"database\_connections": 0,
"api\_calls": 0,
"memory\_intensive": False,
"cpu\_intensive": False
}
for task\_id in task\_group:
task = self.dependency\_graph[task\_id]["task"]
if task.tool\_required in ["nl2sql", "database\_query"]:
resource\_usage["database\_connections"] += 1
if task.tool\_required in ["rag\_search", "api\_call"]:
resource\_usage["api\_calls"] += 1
if task.task\_type in ["data\_analysis", "calculation"]:
resource\_usage["cpu\_intensive"] = True
if"large\_dataset"in task.expected\_output.lower():
resource\_usage["memory\_intensive"] = True
return resource\_usage
- 工具使用与数据源集成
5.1 工具架构设计
5.2 RAG检索工具实现
class FinancialRAGTool:
    """Retrieval-augmented search over financial documents.

    Combines an embedding model with a vector database to find relevant
    filings (audit reports, prospectuses, ...), then extracts numeric
    financial metrics from their text with regular expressions.

    NOTE(review): VectorDatabase, DocumentParser and EmbeddingModel are
    project components defined elsewhere; `_rerank_results` is called but
    not defined in this class — presumably provided elsewhere; verify.
    """

    def __init__(self):
        self.vector_db = VectorDatabase()
        self.document_parser = DocumentParser()
        self.embedding_model = EmbeddingModel()
        # Supported document categories: internal key -> Chinese display name.
        self.document_types = {
            "audit_report": "审计报告",
            "bond_prospectus": "债券募集说明书",
            "financial_report": "财务报告",
            "ipo_document": "IPO文档",
        }

    def search_financial_documents(self, query: str, doc_types: List[str] = None,
                                   companies: List[str] = None,
                                   time_range: dict = None) -> List[dict]:
        """Search financial documents semantically related to *query*.

        Args:
            query: natural-language search text.
            doc_types: optional document-type keys restricting the search.
            companies: optional company names restricting the search.
            time_range: optional {"start_date", "end_date"} bounds on report_date.

        Returns:
            Re-ranked list of matching document dicts.
        """
        # Embed the query once; filtering is delegated to the vector store.
        query_embedding = self.embedding_model.encode(query)
        filters = self._build_search_filters(doc_types, companies, time_range)
        search_results = self.vector_db.search(
            query_vector=query_embedding,
            filters=filters,
            top_k=20,
            similarity_threshold=0.7,
        )
        # Post-process: re-rank raw vector hits against the original query.
        return self._rerank_results(search_results, query)

    def _build_search_filters(self, doc_types: List[str], companies: List[str],
                              time_range: dict) -> dict:
        """Build a Mongo-style filter dict; empty inputs add no constraint."""
        filters = {}
        if doc_types:
            filters["document_type"] = {"$in": doc_types}
        if companies:
            filters["company_name"] = {"$in": companies}
        if time_range:
            filters["report_date"] = {
                "$gte": time_range.get("start_date"),
                "$lte": time_range.get("end_date"),
            }
        return filters

    def extract_financial_metrics(self, documents: List[dict],
                                  metrics: List[str]) -> dict:
        """Extract *metrics* from each document, grouped by company and date.

        Returns:
            {company_name: {report_date: {metric: value}}}
        """
        extracted_data = {}
        for doc in documents:
            doc_content = doc["content"]
            company = doc["company_name"]
            report_date = doc["report_date"]
            # NER/rule-based extraction on the raw document text.
            metrics_data = self._extract_metrics_from_text(doc_content, metrics)
            # setdefault keeps exactly one nested dict per company.
            extracted_data.setdefault(company, {})[report_date] = metrics_data
        return extracted_data

    def _extract_metrics_from_text(self, text: str, metrics: List[str]) -> dict:
        """Pull known metric values out of free text via regex patterns.

        Only metrics with a registered pattern are attempted; the first
        match found in the text wins.
        """
        import re
        extracted = {}
        # Pattern registry: metric name -> capture regex (value in group 1).
        metric_patterns = {
            "ROE": r"净资产收益率[::]*\s*([\d.]+)%?",
            "ROA": r"总资产收益率[::]*\s*([\d.]+)%?",
            "净利润": r"净利润[::]*\s*([\d,.]+)\s*[万亿]?元",
            "营业收入": r"营业收入[::]*\s*([\d,.]+)\s*[万亿]?元",
            "总资产": r"总资产[::]*\s*([\d,.]+)\s*[万亿]?元",
        }
        for metric in metrics:
            pattern = metric_patterns.get(metric)
            if pattern is None:
                continue
            matches = re.findall(pattern, text)
            if matches:
                extracted[metric] = self._normalize_number(matches[0])
        return extracted

    def _normalize_number(self, number_str: str) -> float:
        """Convert a captured number string to float; None if unparseable."""
        # BUG FIX: `re` was only imported inside _extract_metrics_from_text,
        # so re.sub here raised NameError; import it locally as well.
        import re
        # Strip ASCII and full-width thousands separators.
        cleaned = re.sub(r'[,,]', '', number_str)
        try:
            return float(cleaned)
        except ValueError:
            return None
5.3 NL2SQL工具实现
class FinancialNL2SQLTool:
    """Translate natural-language financial questions into SQL and run them.

    NOTE(review): SQLGenerator, QueryOptimizer and DatabaseConnector are
    project components defined elsewhere; `_execute_query` and
    `_get_execution_time` are called but not defined in this class —
    presumably provided elsewhere; verify.
    """

    def __init__(self):
        self.sql_generator = SQLGenerator()
        self.query_optimizer = QueryOptimizer()
        self.database_connector = DatabaseConnector()
        # Static schema description used for table selection and joins.
        self.schema_info = {
            "wind_database": {
                "tables": {
                    "stock_basic_info": ["stock_code", "stock_name", "industry", "list_date"],
                    "financial_indicators": ["stock_code", "report_date", "roe", "roa", "net_profit"],
                    "market_data": ["stock_code", "trade_date", "close_price", "volume", "market_cap"],
                },
                "relationships": [
                    ("stock_basic_info.stock_code", "financial_indicators.stock_code"),
                    ("stock_basic_info.stock_code", "market_data.stock_code"),
                ],
            }
        }

    def natural_language_to_sql(self, nl_query: str, database: str = "wind_database") -> dict:
        """Parse *nl_query*, generate + optimize SQL, execute it, and report."""
        parsed_query = self._parse_nl_query(nl_query)
        sql_query = self._generate_sql(parsed_query, database)
        optimized_sql = self.query_optimizer.optimize(sql_query)
        results = self._execute_query(optimized_sql, database)
        return {
            "original_query": nl_query,
            "parsed_query": parsed_query,
            "sql_query": optimized_sql,
            "results": results,
            "execution_time": self._get_execution_time(),
        }

    def _parse_nl_query(self, nl_query: str) -> dict:
        """Rule-based parse of a Chinese finance query into SQL fragments."""
        import re
        parsed = {
            "select_fields": [],
            "tables": [],
            "conditions": [],
            "aggregations": [],
            "order_by": [],
            "limit": None,
        }
        # Map Chinese/abbreviated metric mentions to column names.
        field_patterns = {
            "ROE": "roe",
            "净资产收益率": "roe",
            "ROA": "roa",
            "总资产收益率": "roa",
            "净利润": "net_profit",
            "股票代码": "stock_code",
            "股票名称": "stock_name",
            "行业": "industry",
        }
        for pattern, field in field_patterns.items():
            # BUG FIX: "ROE" and "净资产收益率" both map to `roe`; skip
            # duplicates so the SELECT clause does not repeat a column.
            if pattern in nl_query and field not in parsed["select_fields"]:
                parsed["select_fields"].append(field)
        # Condition extractors: regex -> WHERE-fragment builder.
        # BUG FIX: the ROE pattern used the character class [>大于], which
        # matches a single character and therefore never matched "大于";
        # use an explicit alternation instead.
        condition_patterns = {
            r"(\d{4})年": lambda m: f"report_date LIKE '{m.group(1)}%'",
            r"ROE\s*(?:>|大于)\s*([\d.]+)": lambda m: f"roe > {m.group(1)}",
            r"行业\s*[=是]\s*([\u4e00-\u9fa5]+)": lambda m: f"industry = '{m.group(1)}'",
        }
        for pattern, condition_func in condition_patterns.items():
            for match in re.finditer(pattern, nl_query):
                parsed["conditions"].append(condition_func(match))
        # Ranking questions imply an ORDER BY. "roe DESC" is a deliberate
        # simplification: the sort column is not inferred from the query.
        if "排名" in nl_query or "前" in nl_query:
            parsed["order_by"].append("roe DESC")
        # "前N" -> LIMIT N.
        limit_match = re.search(r"前(\d+)", nl_query)
        if limit_match:
            parsed["limit"] = int(limit_match.group(1))
        return parsed

    def _generate_sql(self, parsed_query: dict, database: str) -> str:
        """Assemble a SQL statement from the parsed fragments."""
        required_tables = self._determine_required_tables(parsed_query["select_fields"])
        select_clause = "SELECT " + ", ".join(parsed_query["select_fields"])
        # BUG FIX: tables were previously glued together with bare JOINs
        # and no ON clause, producing invalid SQL; join via the schema's
        # declared relationships instead.
        from_clause = "FROM " + self._build_from_clause(required_tables, database)
        where_clause = ""
        if parsed_query["conditions"]:
            where_clause = "WHERE " + " AND ".join(parsed_query["conditions"])
        order_clause = ""
        if parsed_query["order_by"]:
            order_clause = "ORDER BY " + ", ".join(parsed_query["order_by"])
        # BUG FIX: explicit None check so a legitimate LIMIT 0 is not dropped.
        limit_clause = ""
        if parsed_query["limit"] is not None:
            limit_clause = f"LIMIT {parsed_query['limit']}"
        sql_parts = [select_clause, from_clause, where_clause, order_clause, limit_clause]
        return " ".join(part for part in sql_parts if part)

    def _build_from_clause(self, tables: List[str], database: str) -> str:
        """Join *tables* using the schema's declared key relationships.

        Falls back to a bare JOIN (no ON) when no relationship links a
        table to those already joined — surfacing the gap rather than
        guessing a join key.
        """
        if not tables:
            return ""
        relationships = self.schema_info.get(database, {}).get("relationships", [])
        clause = tables[0]
        joined = {tables[0]}
        for table in tables[1:]:
            on_condition = None
            for left, right in relationships:
                left_table = left.split(".", 1)[0]
                right_table = right.split(".", 1)[0]
                if table == left_table and right_table in joined:
                    on_condition = f"{left} = {right}"
                    break
                if table == right_table and left_table in joined:
                    on_condition = f"{left} = {right}"
                    break
            clause += f" JOIN {table} ON {on_condition}" if on_condition else f" JOIN {table}"
            joined.add(table)
        return clause

    def _determine_required_tables(self, fields: List[str]) -> List[str]:
        """Map selected columns to their tables, de-duplicated in order.

        BUG FIX: list(set(...)) produced a table order that varies across
        interpreter runs; dict.fromkeys de-duplicates while preserving
        first-seen order, so generated SQL is deterministic.
        """
        field_table_mapping = {
            "stock_code": "stock_basic_info",
            "stock_name": "stock_basic_info",
            "industry": "stock_basic_info",
            "roe": "financial_indicators",
            "roa": "financial_indicators",
            "net_profit": "financial_indicators",
            "close_price": "market_data",
            "market_cap": "market_data",
        }
        tables = [field_table_mapping[field] for field in fields if field in field_table_mapping]
        return list(dict.fromkeys(tables))
- 执行引擎与调度
6.1 任务调度器设计
6.2 调度器实现
class FinancialTaskScheduler:
    """Schedules and executes decomposed financial sub-tasks.

    The dependency manager partitions tasks into ordered execution
    groups; tasks inside a group are independent and run in parallel,
    while groups run sequentially so downstream tasks can read their
    dependencies' results.

    NOTE(review): TaskQueue, DependencyManager, ExecutionEngine,
    ResultManager, ErrorHandler, SubTask, FinancialCalculator and the
    helpers _has_critical_failures, _handle_critical_failure,
    _generate_execution_summary, _calculate_performance_metrics,
    _execute_comparison_task and _execute_report_generation_task are
    referenced but not defined in this excerpt — presumably provided
    elsewhere; `datetime` and `time` are assumed imported at file level.
    """

    def __init__(self):
        self.task_queue = TaskQueue()
        self.dependency_manager = DependencyManager()
        self.execution_engine = ExecutionEngine()
        self.result_manager = ResultManager()
        self.error_handler = ErrorHandler()
        # Execution-state tracking: task_id -> "pending"/"executing"/"completed"/"failed".
        self.task_status = {}
        # Chronological log of completed/failed task records.
        self.execution_history = []

    def schedule_and_execute(self, subtasks: List[SubTask]) -> dict:
        """Run *subtasks* respecting dependencies; return integrated results.

        Returns a dict with status, integrated results, an execution
        summary and performance metrics; aborts early (via
        _handle_critical_failure) when a group contains critical failures.
        """
        # Register every task as pending before any execution starts.
        for task in subtasks:
            self.task_status[task.task_id] = "pending"
            self.task_queue.add_task(task)
        # Partition tasks into dependency-ordered execution groups.
        dependency_analysis = self.dependency_manager.analyze_dependencies(subtasks)
        execution_groups = dependency_analysis["execution_groups"]
        # Execute group by group; later groups may depend on earlier results.
        overall_results = {}
        for group_index, task_group in enumerate(execution_groups):
            group_results = self._execute_task_group(task_group, group_index)
            overall_results.update(group_results)
            # Abort the pipeline if a critical task in this group failed.
            if self._has_critical_failures(group_results):
                return self._handle_critical_failure(overall_results)
        # Merge per-task outputs into the final answer.
        final_result = self.result_manager.integrate_results(overall_results)
        return {
            "status": "completed",
            "results": final_result,
            "execution_summary": self._generate_execution_summary(),
            "performance_metrics": self._calculate_performance_metrics()
        }

    def _execute_task_group(self, task_group: List[str], group_index: int) -> dict:
        """Execute one dependency group: serially if it has a single task,
        otherwise in parallel."""
        group_results = {}
        if len(task_group) == 1:
            # Single task: no need for thread-pool overhead.
            task_id = task_group[0]
            result = self._execute_single_task(task_id)
            group_results[task_id] = result
        else:
            # Multiple independent tasks: fan out to threads.
            group_results = self._execute_parallel_tasks(task_group)
        return group_results

    def _execute_parallel_tasks(self, task_group: List[str]) -> dict:
        """Run all tasks of *task_group* concurrently; collect per-task results.

        Each failed/timed-out future is recorded as a failed result rather
        than aborting the whole group.
        """
        import concurrent.futures
        import threading  # NOTE(review): imported but unused here.
        results = {}
        # One worker per task: group sizes are expected to be small.
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(task_group)) as executor:
            future_to_task = {
                executor.submit(self._execute_single_task, task_id): task_id
                for task_id in task_group
            }
            # Collect in completion order, not submission order.
            for future in concurrent.futures.as_completed(future_to_task):
                task_id = future_to_task[future]
                try:
                    result = future.result(timeout=300)  # 5-minute timeout per task
                    results[task_id] = result
                    self.task_status[task_id] = "completed"
                except Exception as e:
                    results[task_id] = {"status": "failed", "error": str(e)}
                    self.task_status[task_id] = "failed"
        return results

    def _execute_single_task(self, task_id: str) -> dict:
        """Dispatch one task to its type-specific executor and record history.

        Raises nothing to the caller: failures are converted into an
        error result by the error handler.
        """
        task = self.task_queue.get_task(task_id)
        try:
            self.task_status[task_id] = "executing"
            # Dispatch on the task's declared type.
            if task.task_type == "data_retrieval":
                result = self._execute_data_retrieval_task(task)
            elif task.task_type == "calculation":
                result = self._execute_calculation_task(task)
            elif task.task_type == "comparison":
                result = self._execute_comparison_task(task)
            elif task.task_type == "report_generation":
                result = self._execute_report_generation_task(task)
            else:
                raise ValueError(f"未知任务类型: {task.task_type}")
            self.task_status[task_id] = "completed"
            # Append a success record to the execution log.
            self.execution_history.append({
                "task_id": task_id,
                "status": "completed",
                "execution_time": result.get("execution_time", 0),
                "timestamp": datetime.now()
            })
            return result
        except Exception as e:
            self.task_status[task_id] = "failed"
            error_result = self.error_handler.handle_task_error(task, e)
            # Append a failure record to the execution log.
            self.execution_history.append({
                "task_id": task_id,
                "status": "failed",
                "error": str(e),
                "timestamp": datetime.now()
            })
            return error_result

    def _execute_data_retrieval_task(self, task: SubTask) -> dict:
        """Run a retrieval task with the tool named in task.tool_required.

        Raises:
            ValueError: when the requested tool is not supported.
        """
        start_time = time.time()
        if task.tool_required == "rag_search":
            # Document search via the RAG tool.
            rag_tool = FinancialRAGTool()
            results = rag_tool.search_financial_documents(
                query=task.description,
                doc_types=task.data_sources
            )
        elif task.tool_required == "nl2sql":
            # Structured query via the NL2SQL tool.
            nl2sql_tool = FinancialNL2SQLTool()
            results = nl2sql_tool.natural_language_to_sql(task.description)
        else:
            raise ValueError(f"不支持的工具: {task.tool_required}")
        execution_time = time.time() - start_time
        return {
            "status": "completed",
            "data": results,
            "execution_time": execution_time,
            "task_type": task.task_type
        }

    def _execute_calculation_task(self, task: SubTask) -> dict:
        """Run a calculation task on the results of its dependency tasks.

        The calculation routine is picked by keyword-matching the task
        description (trend / growth-rate / volatility), falling back to a
        general calculation.
        """
        start_time = time.time()
        # Inputs come from already-completed dependency tasks.
        input_data = self._get_dependency_results(task.dependencies)
        calculator = FinancialCalculator()
        if "趋势分析" in task.description:
            results = calculator.trend_analysis(input_data)
        elif "增长率" in task.description:
            results = calculator.growth_rate_calculation(input_data)
        elif "波动性" in task.description:
            results = calculator.volatility_analysis(input_data)
        else:
            results = calculator.general_calculation(task.description, input_data)
        execution_time = time.time() - start_time
        return {
            "status": "completed",
            "data": results,
            "execution_time": execution_time,
            "task_type": task.task_type
        }

    def _get_dependency_results(self, dependencies: List[str]) -> dict:
        """Collect stored results for each dependency task id.

        Raises:
            ValueError: when a dependency's result has not been recorded.
        """
        dependency_data = {}
        for dep_task_id in dependencies:
            if dep_task_id in self.result_manager.results:
                dependency_data[dep_task_id] = self.result_manager.results[dep_task_id]
            else:
                raise ValueError(f"依赖任务 {dep_task_id} 的结果不可用")
        return dependency_data
- 常见业务场景实现
7.1 场景分类与处理策略
7.2 具体场景实现
7.2.1 场景一:"查询平安银行2023年ROE,并计算同比增长率"
class ROEQueryScenario:
    """Scenario: query Ping An Bank's 2023 ROE and compute YoY growth.

    Decomposes the question into two independent data-retrieval tasks
    (2023 and 2022 ROE) plus one dependent calculation task, executes
    them through the scheduler, and formats the combined answer.
    """

    def __init__(self):
        self.scenario_name = "ROE查询与同比计算"
        self.complexity = "medium"

    def decompose_task(self, query: str) -> "List[SubTask]":
        """Split the scenario into scheduler sub-tasks.

        The two retrieval tasks have no dependencies and may run in
        parallel; the YoY calculation depends on both.
        """
        return [
            SubTask(
                task_id="roe_current_year",
                description="查询平安银行2023年ROE数据",
                task_type="data_retrieval",
                tool_required="nl2sql",
                data_sources=["Wind数据库"],
                sql_template="SELECT roe FROM financial_indicators WHERE stock_name='平安银行' AND report_date LIKE '2023%'",
                expected_output="2023年各季度ROE数据",
                dependencies=[],
                priority="high"
            ),
            SubTask(
                task_id="roe_previous_year",
                description="查询平安银行2022年ROE数据",
                task_type="data_retrieval",
                tool_required="nl2sql",
                data_sources=["Wind数据库"],
                sql_template="SELECT roe FROM financial_indicators WHERE stock_name='平安银行' AND report_date LIKE '2022%'",
                expected_output="2022年各季度ROE数据",
                dependencies=[],
                priority="high"
            ),
            SubTask(
                task_id="yoy_calculation",
                description="计算ROE同比增长率",
                task_type="calculation",
                tool_required="data_analysis",
                calculation_formula="(ROE_2023 - ROE_2022) / ROE_2022 * 100",
                expected_output="ROE同比增长率",
                dependencies=["roe_current_year", "roe_previous_year"],
                priority="medium"
            )
        ]

    def execute_scenario(self, query: str) -> dict:
        """Decompose, schedule and execute the scenario; return the formatted result."""
        subtasks = self.decompose_task(query)
        scheduler = FinancialTaskScheduler()
        results = scheduler.schedule_and_execute(subtasks)
        return self._format_roe_result(results)

    def _format_roe_result(self, results: dict) -> dict:
        """Shape raw scheduler output into the user-facing answer structure."""
        roe_2023 = results["results"]["roe_current_year"]["data"]
        roe_2022 = results["results"]["roe_previous_year"]["data"]
        yoy_growth = results["results"]["yoy_calculation"]["data"]
        return {
            "company": "平安银行",
            "metric": "ROE",
            "current_year": {
                "year": 2023,
                "value": roe_2023,
                "unit": "%"
            },
            "previous_year": {
                "year": 2022,
                "value": roe_2022,
                "unit": "%"
            },
            "yoy_growth": {
                "value": yoy_growth,
                "unit": "%",
                "interpretation": self._interpret_growth(yoy_growth)
            },
            "analysis_summary": self._generate_summary(roe_2023, roe_2022, yoy_growth)
        }

    def _generate_summary(self, roe_current, roe_previous, yoy_growth) -> str:
        """One-sentence Chinese summary of the ROE change.

        BUG FIX: _format_roe_result called this helper, but it was never
        defined, so formatting raised AttributeError at runtime.
        """
        trend = self._interpret_growth(yoy_growth)
        return (f"平安银行ROE由2022年的{roe_previous}%变动至2023年的{roe_current}%,"
                f"同比增长率为{yoy_growth}%,表现为{trend}。")

    def _interpret_growth(self, growth_rate: float) -> str:
        """Map a YoY growth percentage to a qualitative Chinese label."""
        if growth_rate > 10:
            return "显著增长"
        elif growth_rate > 0:
            return "正增长"
        elif growth_rate > -10:
            return "轻微下降"
        else:
            return "显著下降"
7.2.2 场景二:"筛选ROE大于15%的银行股,按市值排序"
class BankStockScreeningScenario:
    """Scenario: screen bank stocks with ROE above 15% and rank by market cap.

    A five-step pipeline where each step is a SubTask chained to its
    predecessor: list banks -> fetch ROE -> filter -> fetch market cap ->
    sort descending by market cap.
    """

    def __init__(self):
        self.scenario_name = "银行股筛选排序"
        self.complexity = "medium"

    def decompose_task(self, query: str) -> List[SubTask]:
        """Decompose the screening request into ordered sub-tasks."""
        # Step 1: universe — every listed bank.
        list_banks = SubTask(
            task_id="get_bank_list",
            description="获取所有银行股列表",
            task_type="data_retrieval",
            tool_required="nl2sql",
            sql_template="SELECT stock_code, stock_name FROM stock_basic_info WHERE industry='银行业'",
            expected_output="银行股票列表",
            dependencies=[],
            priority="high"
        )
        # Step 2: latest annual ROE for those banks.
        fetch_roe = SubTask(
            task_id="get_roe_data",
            description="获取银行股ROE数据",
            task_type="data_retrieval",
            tool_required="nl2sql",
            sql_template="SELECT stock_code, roe FROM financial_indicators WHERE stock_code IN (银行股代码) AND report_date='2023-12-31'",
            expected_output="银行股ROE数据",
            dependencies=["get_bank_list"],
            priority="high"
        )
        # Step 3: keep only banks whose ROE exceeds 15%.
        screen_roe = SubTask(
            task_id="filter_by_roe",
            description="筛选ROE大于15%的银行股",
            task_type="filtering",
            tool_required="data_analysis",
            filter_condition="roe > 15",
            expected_output="符合ROE条件的银行股",
            dependencies=["get_roe_data"],
            priority="medium"
        )
        # Step 4: market caps for the surviving banks.
        fetch_market_cap = SubTask(
            task_id="get_market_cap",
            description="获取筛选后银行股的市值数据",
            task_type="data_retrieval",
            tool_required="nl2sql",
            sql_template="SELECT stock_code, market_cap FROM market_data WHERE stock_code IN (筛选后股票) AND trade_date='2023-12-31'",
            expected_output="银行股市值数据",
            dependencies=["filter_by_roe"],
            priority="medium"
        )
        # Step 5: final ranking, largest market cap first.
        rank_by_cap = SubTask(
            task_id="sort_by_market_cap",
            description="按市值排序",
            task_type="sorting",
            tool_required="data_analysis",
            sort_criteria="market_cap DESC",
            expected_output="按市值排序的银行股列表",
            dependencies=["get_market_cap"],
            priority="low"
        )
        return [list_banks, fetch_roe, screen_roe, fetch_market_cap, rank_by_cap]
7.2.3 场景三:"生成银行业2023年度分析报告"
class BankingIndustryReportScenario:
    """Scenario: generate the 2023 banking-industry annual analysis report.

    Three-stage pipeline:
      1. data collection — four independent retrieval tasks (parallelizable);
      2. analysis — computations that depend on the collected data;
      3. report generation — sections that depend on the analyses,
         assembled into a final PDF.
    """

    def __init__(self):
        self.scenario_name = "银行业年度分析报告"
        self.complexity = "high"

    def decompose_task(self, query: str) -> List[SubTask]:
        """Build the full task graph for the annual report."""
        # Stage 1 — data collection (no dependencies, may run in parallel).
        collection_tasks = [
            SubTask(
                task_id="collect_industry_overview",
                description="收集银行业整体概况数据",
                task_type="data_retrieval",
                tool_required="rag_search",
                data_sources=["行业报告库", "监管公告库"],
                search_keywords=["银行业", "2023年", "行业概况", "发展趋势"],
                expected_output="银行业整体发展情况",
                dependencies=[],
                priority="high"
            ),
            SubTask(
                task_id="collect_financial_data",
                description="收集主要银行财务数据",
                task_type="data_retrieval",
                tool_required="nl2sql",
                sql_template="SELECT * FROM financial_indicators WHERE industry='银行业' AND report_date LIKE '2023%'",
                expected_output="银行业财务指标数据",
                dependencies=[],
                priority="high"
            ),
            SubTask(
                task_id="collect_market_data",
                description="收集银行股市场表现数据",
                task_type="data_retrieval",
                tool_required="nl2sql",
                sql_template="SELECT * FROM market_data WHERE industry='银行业' AND trade_date BETWEEN '2023-01-01' AND '2023-12-31'",
                expected_output="银行股市场表现数据",
                dependencies=[],
                priority="high"
            ),
            SubTask(
                task_id="collect_regulatory_data",
                description="收集监管政策和合规数据",
                task_type="data_retrieval",
                tool_required="rag_search",
                data_sources=["监管文件库", "政策公告库"],
                search_keywords=["银行监管", "2023年", "政策变化", "合规要求"],
                expected_output="监管政策变化情况",
                dependencies=[],
                priority="medium"
            ),
        ]
        # Stage 2 — analysis (each task depends on its collected inputs).
        analysis_tasks = [
            SubTask(
                task_id="financial_analysis",
                description="进行财务指标分析",
                task_type="calculation",
                tool_required="data_analysis",
                analysis_types=["盈利能力分析", "资产质量分析", "资本充足率分析"],
                expected_output="财务分析结果",
                dependencies=["collect_financial_data"],
                priority="medium"
            ),
            SubTask(
                task_id="market_performance_analysis",
                description="进行市场表现分析",
                task_type="calculation",
                tool_required="data_analysis",
                analysis_types=["股价表现", "估值水平", "投资者情绪"],
                expected_output="市场分析结果",
                dependencies=["collect_market_data"],
                priority="medium"
            ),
            SubTask(
                task_id="competitive_analysis",
                description="进行竞争格局分析",
                task_type="comparison",
                tool_required="data_analysis",
                comparison_dimensions=["市场份额", "业务结构", "创新能力"],
                expected_output="竞争分析结果",
                dependencies=["collect_financial_data", "collect_market_data"],
                priority="medium"
            ),
        ]
        # Stage 3 — report generation (depends on the analyses above).
        report_tasks = [
            SubTask(
                task_id="generate_executive_summary",
                description="生成执行摘要",
                task_type="report_generation",
                tool_required="llm_analysis",
                report_section="executive_summary",
                expected_output="执行摘要内容",
                dependencies=["financial_analysis", "market_performance_analysis"],
                priority="low"
            ),
            SubTask(
                task_id="generate_detailed_analysis",
                description="生成详细分析章节",
                task_type="report_generation",
                tool_required="llm_analysis",
                report_section="detailed_analysis",
                expected_output="详细分析内容",
                dependencies=["financial_analysis", "market_performance_analysis", "competitive_analysis"],
                priority="low"
            ),
            SubTask(
                task_id="generate_conclusions",
                description="生成结论和建议",
                task_type="report_generation",
                tool_required="llm_analysis",
                report_section="conclusions_recommendations",
                expected_output="结论和投资建议",
                dependencies=["generate_detailed_analysis"],
                priority="low"
            ),
            SubTask(
                task_id="format_final_report",
                description="格式化最终报告",
                task_type="report_generation",
                tool_required="report_formatter",
                output_format="pdf",
                expected_output="完整的银行业分析报告",
                dependencies=["generate_executive_summary", "generate_detailed_analysis", "generate_conclusions"],
                priority="low"
            ),
        ]
        return collection_tasks + analysis_tasks + report_tasks
- 性能优化与最佳实践
8.1 性能优化策略
8.2 缓存策略实现
class FinancialDataCache:
    """Redis-backed cache for task results, with per-data-type TTLs.

    NOTE(review): assumes a Redis server on localhost:6379 and that the
    `redis` module, `json` and `logger` are available at file level.
    """

    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        # TTL (seconds) per cached data category.
        self.cache_ttl = {
            "financial_data": 3600,       # 1 hour
            "market_data": 300,           # 5 minutes
            "calculation_results": 1800,  # 30 minutes
            "report_content": 7200        # 2 hours
        }

    def get_cached_result(self, cache_key: str, data_type: str) -> dict:
        """Return the cached dict for *cache_key*, or None on miss/error.

        *data_type* is currently unused here; it is kept for interface
        symmetry with cache_result.
        """
        try:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
        except Exception as e:
            # Cache failures are non-fatal: log and fall through to a miss.
            logger.warning(f"缓存读取失败: {e}")
        return None

    def cache_result(self, cache_key: str, data: dict, data_type: str):
        """Store *data* under *cache_key* with the TTL configured for *data_type*.

        Unknown data types default to a one-hour TTL; write failures are
        logged and swallowed (best-effort cache).
        """
        try:
            ttl = self.cache_ttl.get(data_type, 3600)
            self.redis_client.setex(
                cache_key,
                ttl,
                json.dumps(data, ensure_ascii=False)
            )
        except Exception as e:
            logger.warning(f"缓存写入失败: {e}")

    def generate_cache_key(self, task: "SubTask") -> str:
        """Build a deterministic Redis key for *task*.

        BUG FIX: the original key used the builtin hash(), whose value
        changes between interpreter processes (PYTHONHASHSEED), so a
        shared Redis cache would essentially never hit across restarts.
        A stable SHA-1 digest fixes that. The free-text description is
        digested rather than embedded, since it may contain the ':' key
        separator.
        """
        import hashlib
        payload = "|".join([task.task_type, task.description, repr(task.dependencies)])
        digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()
        return f"financial_agent:{task.task_type}:{digest}"
- 错误处理与容错机制
9.1 错误分类与处理策略
9.2 容错机制实现
class FinancialErrorHandler:
    """Error handling for task execution: retry, then fallback, then fail.

    NOTE(review): _classify_error, _should_retry, _log_error,
    _execute_task_with_retry and the four _fallback_to_* strategies are
    referenced but not defined in this excerpt — presumably provided
    elsewhere (or by a subclass); verify before use. `time` and the
    SubTask type are assumed to be imported at file level.
    """

    def __init__(self):
        # Exponential-backoff retry policy.
        self.retry_config = {
            "max_retries": 3,
            "base_delay": 1.0,
            "max_delay": 60.0,
            "backoff_factor": 2.0
        }
        # Error category -> degraded-mode handler.
        self.fallback_strategies = {
            "data_source_unavailable": self._fallback_to_alternative_source,
            "calculation_error": self._fallback_to_simplified_calculation,
            "api_rate_limit": self._fallback_to_cached_data,
            "network_timeout": self._fallback_to_local_data
        }

    def handle_task_error(self, task: SubTask, error: Exception) -> dict:
        """Handle a failed task: retry if retryable, else fall back, else
        log and return a failure dict."""
        error_type = self._classify_error(error)
        # 1) Retry transient errors with backoff.
        if self._should_retry(error_type):
            return self._retry_task(task, error)
        # 2) Degrade gracefully when a fallback strategy exists for this category.
        if error_type in self.fallback_strategies:
            return self.fallback_strategies[error_type](task, error)
        # 3) Otherwise record the error and surface the failure to the caller.
        self._log_error(task, error)
        return {
            "status": "failed",
            "error_type": error_type,
            "error_message": str(error),
            "fallback_available": False
        }

    def _retry_task(self, task: SubTask, error: Exception) -> dict:
        """Retry *task* with exponential backoff (capped at max_delay).

        Returns the first successful result; a "failed_after_retry" dict
        when the final attempt raises; or "retry_exhausted" when every
        attempt finishes without success and without raising.
        """
        for attempt in range(self.retry_config["max_retries"]):
            try:
                # delay = base_delay * backoff_factor^attempt, capped at max_delay.
                delay = min(
                    self.retry_config["base_delay"] * (self.retry_config["backoff_factor"] ** attempt),
                    self.retry_config["max_delay"]
                )
                time.sleep(delay)
                # Re-run the task.
                result = self._execute_task_with_retry(task)
                if result["status"] == "completed":
                    return result
            except Exception as retry_error:
                # Only the final attempt's exception is reported; earlier
                # ones are swallowed and the loop continues.
                if attempt == self.retry_config["max_retries"] - 1:
                    return {
                        "status": "failed_after_retry",
                        "original_error": str(error),
                        "final_error": str(retry_error),
                        "retry_attempts": attempt + 1
                    }
        return {"status": "retry_exhausted"}
- 监控与日志
10.1 监控指标体系
class FinancialAgentMonitor:
    """Runtime monitoring: per-task metrics, alerting hooks and reporting.

    NOTE(review): MetricsCollector, AlertManager and the private helpers
    (_assess_data_quality, _check_alerts, _calculate_success_rate,
    _calculate_avg_time, _calculate_error_rate, _analyze_by_task_type,
    _analyze_trends, _generate_recommendations) are not defined in this
    excerpt — presumably elsewhere; verify. `datetime` and the SubTask
    type are assumed to be imported at file level.
    """

    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.alert_manager = AlertManager()
        # Key performance indicators, initialised to 0.0; how/where they
        # are updated is not shown in this excerpt.
        self.kpi_metrics = {
            "task_success_rate": 0.0,
            "average_execution_time": 0.0,
            "data_quality_score": 0.0,
            "user_satisfaction_score": 0.0,
            "system_availability": 0.0
        }

    def track_task_execution(self, task: SubTask, result: dict):
        """Record execution metrics for one finished task and run alert checks."""
        execution_metrics = {
            "task_id": task.task_id,
            "task_type": task.task_type,
            "execution_time": result.get("execution_time", 0),
            "status": result.get("status"),
            "data_quality": self._assess_data_quality(result),
            "timestamp": datetime.now()
        }
        self.metrics_collector.record_metrics(execution_metrics)
        # Trigger alerts if the recorded metrics warrant them.
        self._check_alerts(execution_metrics)

    def generate_performance_report(self, time_range: dict) -> dict:
        """Summarise metrics collected in *time_range* into a report dict
        (summary stats, per-task-type breakdown, trends, recommendations)."""
        metrics_data = self.metrics_collector.get_metrics(time_range)
        return {
            "summary": {
                "total_tasks": len(metrics_data),
                "success_rate": self._calculate_success_rate(metrics_data),
                "average_execution_time": self._calculate_avg_time(metrics_data),
                "error_rate": self._calculate_error_rate(metrics_data)
            },
            "task_type_breakdown": self._analyze_by_task_type(metrics_data),
            "performance_trends": self._analyze_trends(metrics_data),
            "recommendations": self._generate_recommendations(metrics_data)
        }
- 总结与展望
11.1 系统优势
- 智能化任务分解 :基于意图识别的自动任务分解,提高处理效率
- 灵活的执行策略 :支持并行和串行执行,优化资源利用
- 丰富的工具集成 :RAG检索、NL2SQL、数据计算等多种工具协同
- 强大的容错能力 :多层次错误处理和降级策略
- 全面的监控体系 :实时性能监控和质量评估
11.2 应用价值
- 提升分析效率 :自动化复杂金融分析流程,减少人工干预
- 保证数据质量 :多源数据整合和质量检查机制
- 降低技术门槛 :自然语言交互,降低专业技能要求
- 增强决策支持 :提供全面、准确的分析结果和建议
- 推荐阅读
从"答非所问"到"精准回答":30+架构图解密RAG系统优化的核心秘籍
AI Agent 的数据架构:数据库、语料库、知识库与 LLM 的关系和协作
从零开始学 Dify - 万字详解 Dify 循环和迭代的实现机制
从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制
从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制
👆👆👆欢迎关注,一起进步👆👆👆
欢迎留言讨论哈
🧐点赞、分享、推荐 ,一键三连,养成习惯👍
