RAG系统有时候会失败,不是因为 LLM 缺乏智能,而是因为其架构过于简单 。人们试图用线性、一次性的方法来处理循环的、多步骤的问题。
许多复杂的查询需要推理、反思,以及关于何时采取行动的明智决策 ,就像我们面对问题时检索信息的方式一样。这正是Agentic RAG Pipeline用武之地。让我们来看看一个典型的深度思考 RAG Pipeline是什么样的。
-
计划(Plan):首先,代理将复杂的用户查询分解为结构化的、多步骤的研究计划,并决定每个步骤需要哪种工具(内部文档搜索或网络搜索)。
-
检索(Retrieve):对于每个步骤,supervisor会动态选择最佳搜索策略(向量、关键字或混合检索)来执行一个自适应的多阶段检索漏斗。
-
细化(Refine):然后使用高精度cross-encoder对初始结果进行重新排序,并使用代理提取最佳块并将其压缩到简洁的上下文中。
-
反思(Reflect):每一步之后,Agent都会总结目前所有内容并更新其研究历史,从而对问题建立累积的理解。
-
批评(Critique):然后,策略agent会检查历史信息,做出战略决策:要么继续下一步研究,要么在遇到死胡同时修改计划,要么结束研究。
-
综合(Synthesize):研究完成后,最终agent将从所有来源收集的所有证据综合成一个单一、全面且可引用的答案。
在这篇博客中,我们将实现整个深度思考 RAG 管道,并将其与基本的 RAG 管道进行比较,以演示它如何解决复杂的多跳查询 。
目录
- 配置环境
- 获取知识库
- 理解我们的多源、多跳查询
- 构建一个将会失败的浅层 RAG 管道
- 为中央代理系统定义 RAG 状态
- 战略规划和查询制定
- 使用工具感知规划器分解问题
- 使用查询重写agent优化检索
- 元数据感知分块的精度
- 创建多阶段检索漏斗
- 使用 Supervisor 动态选择策略
- 关键词和语义搜索的混合召回
- 使用cross-encoder重排序器实现高精度
- 使用上下文蒸馏进行合成
- 利用网络搜索增强知识
- 自我批评和控制流政策
- 更新并反映累积研究历史
- 构建控制流策略代理
-
定义图形节点
-
定义条件边
-
连接深度思考 RAG 系统
-
编译和可视化迭代工作流
-
运行深度思考管道
-
分析最终的高质量答案
-
比较结果
-
评估框架与分析结果
-
总结整个流程
-
利用马尔可夫决策过程(MDP)学习策略
一、配置环境
在开始编写深度 RAG Pipeline代码之前,我们需要配置环境。当我们开始开发管道并对其进行反复试验时,最好以简单的字典格式定义我们的配置,因为稍后当管道变得复杂时,我们可以简单地参考该字典来更改配置并查看其对整体性能的影响。
# Central Configuration Dictionary to manage all system parameters
config = {
"data_dir": "./data", # Directory to store raw and cleaned data
"vector_store_dir": "./vector_store", # Directory to persist our vector store
"llm_provider": "openai", # The LLM provider we are using
"reasoning_llm": "gpt-4o", # The powerful model for planning and synthesis
"fast_llm": "gpt-4o-mini", # A faster, cheaper model for simpler tasks like the baseline RAG
"embedding_model": "text-embedding-3-small", # The model for creating document embeddings
"reranker_model": "cross-encoder/ms-marco-MiniLM-L-6-v2", # The model for precision reranking
"max_reasoning_iterations": 7, # A safeguard to prevent the agent from getting into an infinite loop
"top_k_retrieval": 10, # Number of documents for initial broad recall
"top_n_rerank": 3, # Number of documents to keep after precision reranking
}
这些配置参数很容易理解,但有三个配置参数值得一提:
-
llm_provider :这是我们使用的 LLM 提供商,在本例中是 OpenAI。我使用 OpenAI 是因为我们可以轻松地在 LangChain 中切换模型和提供商,但您也可以选择任何适合您需求的提供商,例如 Ollama 。
-
reasoning_llm :这必须是我们整个设置中最强大的,因为它将用于规划和综合。
-
fast_llm :这应该是一个更快、更便宜的模型,因为它将用于更简单的任务,如baseline RAG。
现在需要导入管道使用的所需库,并将 api 密钥设置为环境变量,以避免在代码块中暴露它。
import os # For interacting with the operating system (e.g., managing environment variables)
import re # For regular expression operations, useful for text cleaning
import json # For working with JSON data
from getpass import getpass # To securely prompt for user input like API keys without echoing to the screen
from pprint import pprint # For pretty-printing Python objects, making them more readable
import uuid # To generate unique identifiers
from typing import List, Dict, TypedDict, Literal, Optional # For type hinting to create clean, readable, and maintainable code
# Helper function to securely set environment variables if they are not already present
def _set_env(var: str):
# Check if the environment variable is not already set
if not os.environ.get(var):
# If not, prompt the user to enter it securely
os.environ[var] = getpass(f"Enter your {var}: ")
# Set the API keys for the services we will use_set
_env("OPENAI_API_KEY") # For accessing OpenAI models (GPT-4o, embeddings)
_set_env("LANGSMITH_API_KEY") # For tracing and debugging with LangSmith
_set_env("TAVILY_API_KEY") # For the web search tool
# Enable LangSmith tracing to get detailed logs and visualizations of our agent's execution
os.environ["LANGSMITH_TRACING"] = "true"
# Define a project name in LangSmith to organize our runs
os.environ["LANGSMITH_PROJECT"] = "Advanced-Deep-Thinking-RAG"
这里还启用了 LangSmith 的追踪功能。当您使用复杂、循环工作流的代理系统时, 它的追踪功能可以帮助您直观地了解正在发生的事情,并使调试代理的思维过程变得更加容易。
二、获取知识库
生产级 RAG 系统需要复杂且严苛的知识库才能真正展现其有效性。为此,我们将使用 NVIDIA 2023 年的 10-K 文件(https://www.sec.gov/Archives/edgar/data/1045810/000104581023000017/nvda-20230129.htm),这是一份超过一百页的综合性文件,详细说明了公司的业务运营、财务业绩和披露的风险因素。
首先,我们自定义一个函数直接从 SEC EDGAR 数据库下载 10-K 文件,解析原始 HTML,并将其转换为适合我们的 RAG 管道提取的干净结构化文本格式。函数代码如下所示:
import requests # For making HTTP requests to download the document
from bs4 import BeautifulSoup # A powerful library for parsing HTML and XML documents
from langchain.docstore.document import Document # LangChain's standard data structure for a piece of text
def download_and_parse_10k(url, doc_path_raw, doc_path_clean):
# Check if the cleaned file already exists to avoid re-downloading
if os.path.exists(doc_path_clean):
print(f"Cleaned 10-K file already exists at: {doc_path_clean}")
return
print(f"Downloading 10-K filing from {url}...")
# Set a User-Agent header to mimic a browser, as some servers block scripts
headers = {'User-Agent': 'Mozilla/5.0'}
# Make the GET request to the URL
response = requests.get(url, headers=headers)
# Raise an error if the download fails (e.g., 404 Not Found)
response.raise_for_status()
# Save the raw HTML content to a file for inspection
with open(doc_path_raw, 'w', encoding='utf-8') as f:
f.write(response.text)
print(f"Raw document saved to {doc_path_raw}")
# Use BeautifulSoup to parse and clean the HTML content
soup = BeautifulSoup(response.content, 'html.parser')
# Extract text from common HTML tags, attempting to preserve paragraph structure
text = ''
for p in soup.find_all(['p', 'div', 'span']):
# Get the text from each tag, stripping extra whitespace, and add newlines
text += p.get_text(strip=True) + '\n\n'
# Use regex to clean up excessive newlines and spaces for a cleaner final text
clean_text = re.sub(r'\n{3,}', '\n\n', text).strip() # Collapse 3+ newlines into 2
clean_text = re.sub(r'\s{2,}', ' ', clean_text).strip() # Collapse 2+ spaces into 1
# Save the final cleaned text to a .txt file
with open(doc_path_clean, 'w', encoding='utf-8') as f:
f.write(clean_text)
print(f"Cleaned text content extracted and saved to {doc_path_clean}")
代码很容易理解,使用 beautifulsoup4 来解析 HTML 内容并提取文本。它将帮助我们轻松浏览 HTML 结构并检索相关信息,同时忽略任何不必要的元素,例如脚本或样式。
现在,让我们执行它并看看它是如何工作的。
print("Downloading and parsing NVIDIA's 2023 10-K filing...")
# Execute the download and parsing function
download_and_parse_10k(url_10k, doc_path_raw, doc_path_clean)
# Open the cleaned file and print a sample to verify the result
with open(doc_path_clean, 'r', encoding='utf-8') as f:
print("\n--- Sample content from cleaned 10-K ---")
print(f.read(1000) + "...")
#### OUTPUT ####
Downloading and parsing NVIDIA 2023 10-K filing...
Successfully downloaded 10-K filing from https://www.sec.gov/Archives/edgar/data/1045810/000104581023000017/nvda-20230129.htm
Raw document saved to ./data/nvda_10k_2023_raw.html
Cleaned text content extracted and saved to ./data/nvda_10k_2023_clean.txt
# --- Sample content from cleaned 10-K ---
Item 1. Business.
OVERVIEW
NVIDIA is the pioneer of accelerated computing. We are a full-stack computing company with a platform strategy that brings together hardware, systems, software, algorithms, libraries, and services to create unique value for the markets we serve. Our work in accelerated computing and AI is reshaping the worlds largest industries and profoundly impacting society.
Founded in 1993, we started as a PC graphics chip company, inventing the graphics processing unit, or GPU. The GPU was essential for the growth of the PC gaming market and has since been repurposed to revolutionize computer graphics, high performance computing, or HPC, and AI.
The programmability of our GPUs made them ...
我们只是调用这个函数将所有内容存储在一个 txt 文件中,该文件将作为我们的 rag 管道的上下文。
当我们运行上述代码时,您可以看到它开始为我们下载报告,并且我们可以看到下载内容的样本是什么样的。
三、理解我们的多源、多跳查询
为了测试我们实施的管道并将其与基本 RAG 进行比较,我们需要使用一个非常复杂的查询,涵盖我们知识库文档的不同方面。
Our Complex Query:
"Based on NVIDIA's 2023 10-K filing, identify their key risks related to
competition. Then, find recent news (post-filing, from 2024) about AMD's
AI chip strategy and explain how this new strategy directly addresses or
exacerbates one of NVIDIA's stated risks."
让我们分析一下为什么这个查询对于标准 RAG 管道来说如此困难:
-
多跳推理:系统必须首先识别风险,然后找到 AMD 消息,最后将两者综合起来。
-
多源知识: 所需信息存在于两个完全不同的地方。风险信息存在于我们静态的内部文档(10-K)中,而 AMD 新闻则位于外部,需要访问实时网络。
-
综合与分析: 该查询并非要求简单地列出事实。它要求解释一组事实如何使另一组事实恶化,这项任务需要真正的综合能力。
在下一节中,我们将实现基本的 RAG 管道,实际上看看简单的 RAG 是如何失败的。
四、构建一个将会失败的浅层 RAG 管道
现在我们已经配置好了环境,并且准备好了具有挑战性的知识库,下一步就是构建一个标准的原生 RAG 流水线。
首先构建最简单的解决方案,我们可以对其运行复杂的查询,并观察它失败的原因和方式 。
-
加载和分块文档: 我们将提取已清理的 10-K 文件并将其拆分成小的、固定大小的块,这是一种常见但语义上简单的方法。
-
创建向量存储: 然后我们将嵌入这些块并将它们索引到 ChromaDB 向量存储中,以实现基本的语义搜索。
-
组装 RAG 链: 我们将使用 LangChain 表达语言 ( LCEL )将我们的检索器、提示模板和 LLM 连接成一个线性管道。
-
演示复杂查询: 我们将针对这个简单的系统执行多跳、多源查询,并分析其不充分的响应。
首先,我们需要加载已清理的文档并将其拆分。我们将使用 LangChain 生态系统中的标准工具 RecursiveCharacterTextSplitter 。
from langchain_community.document_loaders import TextLoader # A simple loader for .txt files
from langchain.text_splitter import RecursiveCharacterTextSplitter # A standard text splitter
print("Loading and chunking the document...")
# Initialize the loader with the path to our cleaned 10-K file
loader = TextLoader(doc_path_clean, encoding='utf-8')
# Load the document into memory
documents = loader.load()
# Initialize the text splitter with a defined chunk size and overlap
# chunk_size=1000: Each chunk will be approximately 1000 characters long.
# chunk_overlap=150: Each chunk will share 150 characters with the previous one to maintain some context.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
# Split the loaded document into smaller, manageable chunks
doc_chunks = text_splitter.split_documents(documents)
print(f"Document loaded and split into {len(doc_chunks)} chunks.")
#### OUTPUT ####
Loading and chunking the document...
Document loaded and split into 378 chunks.
我们的主文档中有 378 数据块,下一步创建向量嵌入并将其存储在数据库中。我们将使用 ChromaDB (一种流行的内存向量存储)和配置中定义的 text-embedding-3-small 模型。
from langchain_community.vectorstores import Chroma # The vector store we will use
from langchain_openai import OpenAIEmbeddings # The function to create embeddings
print("Creating baseline vector store...")
# Initialize the embedding function using the model specified in our config
embedding_function = OpenAIEmbeddings(model=config['embedding_model'])
# Create the Chroma vector store from our document chunks
# This process takes each chunk, creates an embedding for it, and indexes it.
baseline_vector_store = Chroma.from_documents(
documents=doc_chunks,
embedding=embedding_function
)
# Create a retriever from the vector store
# The retriever is the component that will actually perform the search.
# search_kwargs={"k": 3}: This tells the retriever to return the top 3 most relevant chunks for any given query.
baseline_retriever = baseline_vector_store.as_retriever(search_kwargs={"k": 3})
print(f"Vector store created with {baseline_vector_store._collection.count()} embeddings.")
#### OUTPUT ####
Creating baseline vector store...
Vector store created with 378 embeddings.
Chroma.from\_documents 将所有向量存储在一个可搜索的索引中。最后一步是使用 LangChain 表达式语言 (LCEL) 将它们组装成一个可运行的 RAG 链。
该链将定义数据的线性流:从用户的问题到检索器,再到提示,最后到 LLM。
from langchain_core.prompts import ChatPromptTemplate # For creating prompt templates
from langchain_openai import ChatOpenAI # The OpenAI chat model interface
from langchain_core.runnable import RunnablePassthrough # A tool to pass inputs through the chain
from langchain_core.output_parsers import StrOutputParser # To parse the LLM's output as a simple string
# This template instructs the LLM on how to behave.
# {context}: This is where we will inject the content from our retrieved documents.
# {question}: This is where the user's original question will go.
template = """You are an AI financial analyst. Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
# We use our 'fast_llm' for this simple task, as defined in our config
llm = ChatOpenAI(model=config["fast_llm"], temperature=0)
# A helper function to format the list of retrieved documents into a single string
def format_docs(docs):
return "\n\n---\n\n".join(doc.page_content for doc in docs)
# The complete RAG chain defined using LCEL's pipe (|) syntax
baseline_rag_chain = (
# The first step is a dictionary that defines the inputs to our prompt
{"context": baseline_retriever | format_docs, "question": RunnablePassthrough()}
# The context is generated by taking the question, passing it to the retriever, and formatting the result
# The original question is passed through unchanged
| prompt # The dictionary is then passed to the prompt template
| llm # The formatted prompt is passed to the language model
| StrOutputParser() # The LLM's output message is parsed into a string
)
首先,定义了提示词模板,它的 context 键由子链填充,输入问题来自baseline\_retriever ,其输出( Document 对象列表)是通过format\_docs函数格式化为单个字符串。question 键只需使用 RunnablePassthrough 传递原始输入即可。
让我们运行这个简单的管道并了解它在哪里失败了。
from rich.console import Console # For pretty-printing output with markdown
from rich.markdown import Markdown
# Initialize the rich console for better output formatting
console = Console()
# Our complex, multi-hop, multi-source query
complex_query_adv = "Based on NVIDIA's 2023 10-K filing, identify their key risks related to competition. Then, find recent news (post-filing, from 2024) about AMD's AI chip strategy and explain how this new strategy directly addresses or exacerbates one of NVIDIA's stated risks."
print("Executing complex query on the baseline RAG chain...")
# Invoke the chain with our challenging query
baseline_result = baseline_rag_chain.invoke(complex_query_adv)
console.print("\n--- BASELINE RAG FAILED OUTPUT ---")
# Print the result using markdown formatting for readability
console.print(Markdown(baseline_result))
当您运行上述代码时,我们会得到以下输出。
#### OUTPUT ####
Executing complex query on the baseline RAG chain...
--- BASELINE RAG FAILED OUTPUT ---
Based on the provided context, NVIDIA operates in an intensely competitive semiconductor
industry and faces competition from companies like AMD. The context mentions
that the industry is characterized by rapid technological change. However, the provided documents do not contain any specific information about AMD's recent AI chip strategy from 2024 or how it might impact NVIDIA's stated risks.
分析一下该RAG 管道失败的三个方面:
-
不相关的上下文:Retriever 抓取了 “NVIDIA” 、 “competition” 和 “AMD” 的一般信息,但没有抓取到 2024 年 AMD 战略的具体细节。
-
信息缺失:关键缺陷在于 2023 年的数据无法涵盖 2024 年的事件。系统没有意识到自己缺少关键信息。
-
缺乏规划或工具使用:将复杂的查询视为简单的查询,无法将其分解为多个步骤,也无法使用网页搜索等工具来填补空白。
该系统的失败并非因为LLM太笨,而是因为其架构过于简单 。它试图用一个线性的、一次性的过程来解决一个循环的、多步骤的问题。我们已经了解了基本 RAG 管道的问题,接下来让我们看看深度思考RAG是如何解决好复杂查询的。
五、为Agent系统定义RAGState
要构建推理agnent,我们首先需要一种方法来管理它的状态。在我们简单的 RAG 链中,每个步骤都是无状态的,但是智能agent需要记忆。它需要记住最初的问题、制定的计划以及迄今为止收集的证据。
RAGState 将充当中央内存,在 LangGraph 工作流程中的每个节点之间传递。为了构建它,我们将定义一系列结构化数据类,从最基本的构建块开始:研究计划中的单个步骤。
我们希望定义智能体计划的原子单元。每个 Step 不仅要包含一个需要回答的问题,还要包含其背后的推理,以及至关重要的是,智能体应该使用的具体工具。这迫使智能体的规划过程变得清晰明确且结构化。
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import BaseModel, Field
# Pydantic model for a single step in the agent's reasoning plan
class Step(BaseModel):
# A specific, answerable sub-question for this research step
sub_question: str = Field(description="A specific, answerable question for this step.")
# The agent's justification for why this step is necessary
justification: str = Field(description="A brief explanation of why this step is necessary to answer the main query.")
# The specific tool to use for this step: either internal document search or external web search
tool: Literal["search_10k", "search_web"] = Field(description="The tool to use for this step.")
# A list of critical keywords to improve the accuracy of the search
keywords: List[str] = Field(description="A list of critical keywords for searching relevant document sections.")
# (Optional) A likely document section to perform a more targeted, filtered search within
document_section: Optional[str] = Field(description="A likely document section title (e.g., 'Item 1A. Risk Factors') to search within. Only for 'search_10k' tool.")
Step类使用了 Pydantic BaseModel,定义了Planner Agent严格执行的规范。tool tool: Literal[...] 字段定义 LLM 需要做出具体的决策,使用我们的内部知识( search\_10k )还是寻求外部信息( search\_web )。
这种结构化的输出比尝试解析自然语言计划要可靠得多。
现在我们已经定义了一个 Step ,我们需要一个容器来保存整个步骤序列。我们将创建一个 Plan 类,它只是一个 Step 对象的列表。这代表了代理完整的端到端研究策略。
# Pydantic model for the overall plan, which is a list of individual steps
class Plan(BaseModel):
# A list of Step objects that outlines the full research plan
steps: List[Step] = Field(description="A detailed, multi-step plan to answer the user's query.")
我们编写了一个 Plan 类,它将为整个研究过程提供结构。当我们调用 Planner Agent 时,我们会要求它返回一个符合此模式的 JSON 对象。这确保了在执行任何检索操作之前,代理策略清晰、有序且机器可读。
接下来,当我们的智能体执行其计划时,它需要一种方法来记住它所学到的知识。我们将定义一个 PastStep 字典来存储每个已完成步骤的结果。这将构成智能体的研究历史或实验室笔记 。
# A TypedDict to store the results of a completed step in our research history
class PastStep(TypedDict):
step_index: int # The index of the completed step (e.g., 1, 2, 3)
sub_question: str # The sub-question that was addressed in this step
retrieved_docs: List[Document] # The precise documents retrieved and reranked for this step
summary: str # The agent's one-sentence summary of the findings from this step
PastStep 结构对于智能体的自我批评循环至关重要。每完成一步,我们都会填充其中一个字典并将其添加到状态中。然后,智能体将能够查看这个不断增长的摘要列表,以了解其已知信息,并确定是否拥有足够的信息来完成其任务。
最后,我们将所有这些部分整合到主 RAGState 字典中。它是贯穿整个图的核心对象,包含原始查询、完整计划、过去步骤的历史记录以及当前执行步骤的所有中间数据。
# The main state dictionary that will be passed between all nodes in our LangGraph agent
class RAGState(TypedDict):
original_question: str # The initial, complex query from the user that starts the process
plan: Plan # The multi-step plan generated by the Planner Agent
past_steps: List[PastStep] # A cumulative history of completed research steps and their findings
current_step_index: int # The index of the current step in the plan being executed
retrieved_docs: List[Document] # Documents retrieved in the current step (results of broad recall)
reranked_docs: List[Document] # Documents after precision reranking in the current step
synthesized_context: str # The concise, distilled context generated from the reranked docs
final_answer: str # The final, synthesized answer to the user's original question
这个 RAGState(TypedDict) 是我们agent的完整大脑。图中的每个节点都会接收这个字典作为输入,并返回其更新版本作为输出。
例如,plan\_node将填充 plan 字段,retrieval\_node将填充 retrieved\_docs 字段,依此类推。这种共享的持久状态使得我们简单的 RAG 链能够进行复杂的迭代推理,而这正是我们简单的 RAG 链所缺乏的。
现在,我们已经定义了agent的记忆蓝图,可以构建系统的第一个认知组件:填充此状态的规划agent。
六、战略规划和查询制定
定义好 RAGState 后,我们现在可以构建agent的第一个、可以说是最关键的认知组件:规划能力,这正是我们的系统从简单的数据获取器跃升为真正的推理引擎的地方。我们的agent不会简单地将用户的复杂查询视为单一搜索,而是会先暂停、思考,然后构建一个详细的、循序渐进的研究策略。
本节分为三个关键工程步骤:
- 工具感知规划器: 我们将构建一个由 LLM 驱动的agent,其唯一工作是将用户的查询分解为结构化的 Plan 对象,并决定每个步骤使用哪种工具。
- 查询重写器: 我们将创建一个专门的agent,将规划器的简单子问题转换为高效、优化的搜索查询。
- 元数据感知分块: 我们将重新处理源文档以添加部分级元数据,这是实现高精度、过滤检索的关键步骤。
6.1 使用工具感知规划器分解问题
我们要构建我们的行动大脑,当这个大脑遇到一个复杂的问题时,它需要做的第一件事就是制定一个任务规划计划。
我们不能直接把整个问题扔到数据库中,然后指望它能给出最好的结果。我们需要教会代理如何将问题分解成更小、更易于处理的部分。
为此,我们将创建一个专用的规划agent,我们需要给它一组非常清晰的指令或提示,明确地告诉它它的工作是什么。
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from rich.pretty import pprint as rprint
# The system prompt that instructs the LLM how to behave as a planner
planner_prompt = ChatPromptTemplate.from_messages([
("system", """You are an expert research planner. Your task is to create a clear, multi-step plan to answer a complex user query by retrieving information from multiple sources.
You have two tools available:
1. `search_10k`: Use this to search for information within NVIDIA's 2023 10-K financial filing. This is best for historical facts, financial data, and stated company policies or risks from that specific time period.
2. `search_web`: Use this to search the public internet for recent news, competitor information, or any topic that is not specific to NVIDIA's 2023 10-K.
Decompose the user's query into a series of simple, sequential sub-questions. For each step, decide which tool is more appropriate.
For `search_10k` steps, also identify the most likely section of the 10-K (e.g., 'Item 1A. Risk Factors', 'Item 7. Management's Discussion and Analysis...').
It is critical to use the exact section titles found in a 10-K filing where possible."""),
("human", "User Query: {question}") # The user's original, complex query
])
我们实际上是在赋予 LLM 一个新的角色:一位专家级的研究规划师 。我们明确地告诉它可以使用的两个工具( search\_10k 和 search\_web ),并指导它何时使用它们。这就是“工具感知”的部分。
我们不仅要求它制定计划,还要求它创建一个直接映射到我们已经构建的能力的计划。
现在我们可以启动推理模型并将其与提示链接在一起,这里非常重要的一步是告诉 LLM 它的最终输出必须采用 Pydantic Plan 类的格式,使得输出结构化且可预测。
# Initialize our powerful reasoning model, as defined in the config
reasoning_llm = ChatOpenAI(model=config["reasoning_llm"], temperature=0)
# Create the planner agent by piping the prompt to the LLM and instructing it to use our structured 'Plan' output
planner_agent = planner_prompt | reasoning_llm.with_structured_output(Plan)
print("Tool-Aware Planner Agent created successfully.")
# Let's test the planner agent with our complex query to see its output
print("\n--- Testing Planner Agent ---")
test_plan = planner_agent.invoke({"question": complex_query_adv})
s# Use rich's pretty print for a clean, readable display of the Pydantic object
rprint(test_plan)
我们将 planner\_prompt 传递给强大的 reasoning\_llm ,然后使用 .with\_structured\_output(Plan) 方法。这会告诉 LangChain 使用模型函数调用功能将其响应格式化为与 Plan Pydantic 模式完美匹配的 JSON 对象。这比尝试解析纯文本响应要可靠得多。
让我们看看使用挑战查询进行测试时的输出。
#### OUTPUT ####
Tool-Aware Planner Agent created successfully.
--- Testing Planner Agent ---
Plan(
│ steps=[ │ │ Step( │ │ │ sub_question="What are the key risks related to competition as stated in NVIDIA's 2023 10-K filing?", │ │ │ justification="This step is necessary to extract the foundational information about competitive risks directly from the source document as requested by the user.", │ │ │ tool='search_10k', │ │ │ keywords=['competition', 'risk factors', 'semiconductor industry', 'competitors'],
│ │ │ document_section='Item 1A. Risk Factors'
│ │ ),
│ │ Step(
│ │ │ sub_question="What are the recent news and developments in AMD's AI chip strategy in 2024?",
│ │ │ justification="This step requires finding up-to-date, external information that is not available in the 2023 10-K filing. A web search is necessary to get the latest details on AMD's strategy.",
│ │ │ tool='search_web',
│ │ │ keywords=['AMD', 'AI chip strategy', '2024', 'MI300X', 'Instinct accelerator'],
│ │ │ document_section=None
│ │ )
│ ]
)
如果我们查看输出,就会发现agent不仅仅给出了一个模糊的计划,它还生成了一个结构化的 Plan 对象。它正确地识别出该查询包含两个部分。
- 对于第一部分,它知道答案在 10-K 中并选择了 search_10k 工具,甚至正确猜出了正确的文档部分。
- 对于第二部分,它知道“2024 年的新闻”不可能出现在 2023 年的文档中,并正确地选择了 search_web 工具。这标志着我们的管道至少在思考过程中能够给出令人满意的结果。
6.2 使用查询重写代理优化检索
所以,基本上我们有一个包含良好子问题的计划。
但像“风险是什么?”这样的问题并不是一个很好的搜索查询。它太笼统了。搜索引擎,无论是矢量数据库还是网页搜索,最适合处理具体的、关键词丰富的查询。
为了解决这个问题,我们将构建另一个小型的专用agent:查询重写器 。它的唯一工作就是获取当前步骤的子问题,并通过从我们已经学到的知识中添加相关的关键字和上下文,使其更适合搜索。
首先,让我们为这个新代理设计提示。
from langchain_core.output_parsers import StrOutputParser # To parse the LLM's output as a simple string
# The prompt for our query rewriter, instructing it to act as a search expert
query_rewriter_prompt = ChatPromptTemplate.from_messages([
("system", """You are a search query optimization expert. Your task is to rewrite a given sub-question into a highly effective search query for a vector database or web search engine, using keywords and context from the research plan.
The rewritten query should be specific, use terminology likely to be found in the target source (a financial 10-K or news articles), and be structured to retrieve the most relevant text snippets."""),
("human", "Current sub-question: {sub_question}\n\nRelevant keywords from plan: {keywords}\n\nContext from past steps:\n{past_context}")
])
我们基本上是在告诉这个agent,让它像一个搜索查询优化专家一样工作。我们给它提供了三条信息:简单的 sub\_question 、我们的规划器已经识别出的 keywords ,以及来自任何先前研究步骤的 past\_context 。这为它提供了构建更优查询所需的所有原始材料。
现在我们可以启动这个agent了。这是一个简单的链,因为我们只需要一个字符串作为输出。
# Create the agent by piping the prompt to our reasoning LLM and a string output parser
query_rewriter_agent = query_rewriter_prompt | reasoning_llm | StrOutputParser()
print("Query Rewriter Agent created successfully.")
# Let's test the rewriter agent. We'll pretend we've already completed the first two steps of our plan.
print("\n--- Testing Query Rewriter Agent ---")
# Let's imagine we are at a final synthesis step that needs context from the first two.
test_sub_q = "How does AMD's 2024 AI chip strategy potentially exacerbate the competitive risks identified in NVIDIA's 10-K?"
test_keywords = ['impact', 'threaten', 'competitive pressure', 'market share', 'technological change']
# We create some mock "past context" to simulate what the agent would know at this point in a real run.
test_past_context = "Step 1 Summary: NVIDIA's 10-K lists intense competition and rapid technological change as key risks. Step 2 Summary: AMD launched its MI300X AI accelerator in 2024 to directly compete with NVIDIA's H100."
# Invoke the agent with our test data
rewritten_q = query_rewriter_agent.invoke({
"sub_question": test_sub_q,
"keywords": test_keywords,
"past_context": test_past_context
})
print(f"Original sub-question: {test_sub_q}")
print(f"Rewritten Search Query: {rewritten_q}")
为了正确测试这一点,我们必须模拟一个真实的场景。我们创建一个 test\_past\_context 字符串,它代表agent在其计划的前两个步骤中已经生成的摘要。然后,我们将它和下一个子问题一起输入到 query\_rewriter\_agent 中。
我们来看看结果。
#### OUTPUT ####
Query Rewriter Agent created successfully.
--- Testing Query Rewriter Agent ---
Original sub-question: How does AMD 2024 AI chip strategy potentially exacerbate the competitive risks identified in NVIDIA 10-K?
Rewritten Search Query: analysis of how AMD 2024 AI chip strategy, including products like the MI300X, exacerbates NVIDIA's stated competitive risks such as rapid technological change and market share erosion in the data center and AI semiconductor industry
最初的问题是针对分析师的,他重写了查询语句,用于搜索引擎。它被分配了特定的术语,例如 “MI300X” 、 “market share erosion” 和 “data center”, 所有这些术语都是根据关键词和过去的上下文综合而成的。
这样的查询更有可能检索到正确的文档,从而使我们的整个系统更加准确和高效。这个重写步骤将成为我们主代理循环的关键部分。
6.3 元数据感知分块的精度
规划agent给了我们一个很好的机会。它不仅仅是告诉我们 “发现风险” ,还给了我们一个提示: 在项目 1A 的“风险因素”部分中查找风险 。
但目前,我们的检索器无法使用该提示。我们的向量存储只是一个包含 378 个文本块的扁平大列表。它根本不知道“section”是什么。
我们需要解决这个问题。我们将从头开始重建文档块。这次,对于我们创建的每个块,我们将添加一个标签或标记,用于标记其元数据 ,以便系统准确地告知它来自 10-K 文档的哪个部分。这将使我们的代理稍后能够执行高度精确的过滤搜索。
首先,我们需要一种编程方式来查找原始文本文件中每个部分的起始位置。查看文档,我们可以看到一个清晰的模式:每个主要部分都以单词 “ITEM” 开头,后跟一个数字,例如 “ITEM 1A” 或 “ITEM 7” 。这对于正则表达式来说非常完美。
# This regex is designed to find section titles like 'ITEM 1A.' or 'ITEM 7.' in the 10-K text.
# It looks for the word 'ITEM', followed by a space, a number, an optional letter, a period, and then captures the title text.
# The `re.IGNORECASE | re.DOTALL` flags make the search case-insensitive and allow '.' to match newlines.
section_pattern = r"(ITEM\\s+\\d[A-Z]?\\.\\s*.*?)(?=\\nITEM\\s+\\d[A-Z]?\\.|$)"
我们基本上是在创建一个模式,作为我们的文本段检测器 。它应该被设计得足够灵活,能够捕捉不同的格式,同时又足够具体,不会抓取错误的文本。
现在我们可以使用此模式将文档分成两个单独的列表:一个仅包含章节标题,另一个包含每个章节内的内容。
# We'll work with the raw text loaded earlier from our Document object
raw_text = documents[0].page_content
# Use re.findall to apply our pattern and extract all section titles into a list
section_titles = re.findall(section_pattern, raw_text, re.IGNORECASE | re.DOTALL)
# A quick cleanup step to remove any extra whitespace or newlines from the titles
section_titles = [title.strip().replace('\\n', ' ') for title in section_titles]
# Now, use re.split to break the document apart at each point where a section title occurs
sections_content = re.split(section_pattern, raw_text, flags=re.IGNORECASE | re.DOTALL)
# The split results in a list with titles and content mixed, so we filter it to get only the content parts
sections_content = [content.strip() for content in sections_content if content.strip() and not content.strip().lower().startswith('item ')]
print(f"Identified {len(section_titles)} document sections.")
# This is a crucial sanity check: if the number of titles doesn't match the number of content blocks, something went wrong.
assert len(section_titles) == len(sections_content), "Mismatch between titles and content sections"
这是一种解析半结构化文档的非常有效的方法。我们使用了两次正则表达式:一次是获取所有章节标题的清晰列表,另一次是将正文拆分为内容块列表。 assert 语句让我们确信解析逻辑是合理的。
好了,现在我们有了这些部分:一个标题列表和一个对应的内容列表。现在我们可以循环遍历它们,并创建最终的、包含丰富元数据的块。
import uuid # We'll use this to give each chunk a unique ID, which is good practice
# This list will hold our new, metadata-rich document chunks
doc_chunks_with_metadata = []
# Loop through each section's content along with its title using enumerate
for i, content in enumerate(sections_content):
# Get the corresponding title for the current content block
section_title = section_titles[i]
# Use the same text splitter as before, but this time, we run it ONLY on the content of the current section
section_chunks = text_splitter.split_text(content)
# Now, loop through the smaller chunks created from this one section
for chunk in section_chunks:
# Generate a unique ID for this specific chunk
chunk_id = str(uuid.uuid4())
# Create a new LangChain Document object for the chunk
doc_chunks_with_metadata.append(
Document(
page_content=chunk,
# This is the most important part: we attach the metadata
metadata={
"section": section_title, # The section this chunk belongs to
"source_doc": doc_path_clean, # Where the document came from
"id": chunk_id # The unique ID for this chunk
}
)
)
print(f"Created {len(doc_chunks_with_metadata)} chunks with section metadata.")
print("\n--- Sample Chunk with Metadata ---")
# To prove it worked, let's find a chunk that we know should be in the 'Risk Factors' section and print it
sample_chunk = next(c for c in doc_chunks_with_metadata if "Risk Factors" in c.metadata.get("section", ""))
print(sample_chunk)
这是我们升级的核心。我们逐一迭代每个部分。对于每个部分,我们创建文本块。但在将它们添加到最终列表之前,我们会创建一个 metadata 字典并附加 section\_title 。这有效地为每个块标记了其来源。
让我们看一下输出并看看有什么区别。
#### OUTPUT ####
Processing document and adding metadata...
Identified 22 document sections.
Created 381 chunks with section metadata.
--- Sample Chunk with Metadata ---
Document(
│ page_content='Our industry is intensely competitive. We operate in the semiconductor\\nindustry, which is intensely competitive and characterized by rapid\\ntechnological change and evolving industry standards. We compete with a number of\\ncompanies that have different business models and different combinations of\\nhardware, software, and systems expertise, many of which have substantially\\ngreater resources than we have. We expect competition to increase from existing\\ncompetitors, as well as new and emerging companies. Our competitors include\\nIntel, AMD, and Qualcomm; cloud service providers, or CSPs, such as Amazon Web\\nServices, or AWS, Google Cloud, and Microsoft Azure; and various companies\\ndeveloping or that may develop processors or systems for the AI, HPC, data\\ncenter, gaming, professional visualization, and automotive markets. Some of our\\ncustomers are also our competitors. Our business could be materially and\\nadversely affected if our competitors announce or introduce new products, services,\\nor technologies that have better performance or features, are less expensive, or\\nthat gain market acceptance.',
│ metadata={
│ │ 'section': 'Item 1A. Risk Factors.',
│ │ 'source_doc': './data/nvda_10k_2023_clean.txt',
│ │ 'id': '...'
│ }
)
看看那个 metadata 块。我们之前提到的那段文本现在附加了一段上下文: 'section': 'Item 1A. Risk Factors.' 。
现在,当我们的agent需要查找风险时,它可以告诉检索器: “嘿,不要搜索所有 381 个块。只需搜索部分元数据为‘项目 1A. 风险因素’的块即可。”
这个简单的改变将我们的检索器从钝器转变为手术工具,这是构建真正生产级 RAG 系统的关键原则。
七、创建多阶段检索漏斗
到目前为止,我们已经设计了一个智能规划器,并用元数据丰富了我们的文档。现在,我们准备构建系统的核心:一个复杂的检索管道。
简单的一次性语义搜索已经不够了。对于生产级代理,我们需要一个自适应且多阶段的检索过程。
我们将把检索过程设计成一个漏斗,其中每个阶段都会细化前一个阶段的结果:
- 检索Supervisor:构建一个新的Supervisor代理 ,充当动态路由器,分析每个子问题并选择最佳搜索策略(向量、关键字或混合)。
- 第 1 阶段(广泛回忆):Supervisor可以选择的不同检索策略,重点是广泛收集所有可能相关的文档。
- 第 2 阶段(高精度):使用 Cross-Encoder 模型对初始结果进行重新排序,丢弃噪音并将最相关的文档排序到最前面。
- 第 3 阶段(综合):最后,我们将创建一个 Distiller Agent ,将排名靠前的文档压缩成一个简洁的段落,供我们的下游agent使用。
7.1 使用 Supervisor 动态选择策略
并非所有搜索查询都是一样的。像“计算和网络”部分的收入是多少?这样的问题包含具体、精确的术语。基于关键词的搜索非常适合这种情况。
公司对市场竞争的看法是什么?这还是个概念问题。基于语义向量的搜索会更好。
我们不会采用硬编码一个策略,而是构建一个小型智能代理—— 检索主管 (Retrieval Supervisor) ,来为我们做出这个决定。它唯一的工作就是查看搜索查询,并决定哪种检索方法最合适。
首先,我们需要定义主管可能做出的决策。我们将使用 Pydantic BaseModel 来构建其输出。
class RetrievalDecision(BaseModel):
# The chosen retrieval strategy. Must be one of these three options.
strategy: Literal["vector_search", "keyword_search", "hybrid_search"]
# The agent's justification for its choice.
justification: str
监管者必须从这三种策略中选择一种,并解释其理由。这使得其决策过程透明可靠。
现在,让我们创建指导该代理行为的提示。
retrieval_supervisor_prompt = ChatPromptTemplate.from_messages([
("system", """You are a retrieval strategy expert. Based on the user's query, you must decide the best retrieval strategy.
You have three options:
1. `vector_search`: Best for conceptual, semantic, or similarity-based queries.
2. `keyword_search`: Best for queries with specific, exact terms, names, or codes (e.g., 'Item 1A', 'Hopper architecture').
3. `hybrid_search`: A good default that combines both, but may be less precise than a targeted strategy."""),
("human", "User Query: {sub_question}") # The rewritten search query will be passed here.
])
我们在这里创建了一个非常直接的提示,告诉 LLM 它的角色是检索策略专家 ,并清楚地解释其每个可用策略何时最有效。
最后,我们可以组装我们的主管代理。
# Create the agent by piping our prompt to the reasoning LLM and structuring its output with our Pydantic class
retrieval_supervisor_agent = retrieval_supervisor_prompt | reasoning_llm.with_structured_output(RetrievalDecision)
print("Retrieval Supervisor Agent created.")
# Let's test it with two different types of queries to see how it behaves
print("\n--- Testing Retrieval Supervisor Agent ---")
query1 = "revenue growth for the Compute & Networking segment in fiscal year 2023"
decision1 = retrieval_supervisor_agent.invoke({"sub_question": query1})
print(f"Query: '{query1}'")
print(f"Decision: {decision1.strategy}, Justification: {decision1.justification}")
query2 = "general sentiment about market competition and technological innovation"
decision2 = retrieval_supervisor_agent.invoke({"sub_question": query2})
print(f"\nQuery: '{query2}'")
print(f"Decision: {decision2.strategy}, Justification: {decision2.justification}")
使用.with\_structured\_output(RetrievalDecision)把所有东西连在一起,确保我们从 LLM 返回一个干净、可预测的 RetrievalDecision 对象。让我们看看测试结果。
#### OUTPUT ####
Retrieval Supervisor Agent created.
# --- Testing Retrieval Supervisor Agent ---
Query: 'revenue growth for the Compute & Networking segment in fiscal year 2023'
Decision: keyword_search, Justification: The query contains specific keywords like 'revenue growth', 'Compute & Networking', and 'fiscal year 2023' which are ideal for a keyword-based search to find exact financial figures.
Query: 'general sentiment about market competition and technological innovation'
Decision: vector_search, Justification: This query is conceptual and seeks to understand sentiment and broader themes. Vector search is better suited to capture the semantic meaning of 'market competition' and 'technological innovation' rather than relying on exact keywords.
我们可以看到,它正确识别出第一个充满特定术语的查询并选择了 keyword\_search。
对于第二个概念性抽象的查询,正确地选择了 vector\_search 。这种在检索漏斗开始时就进行动态决策的做法,比起千篇一律的做法,无疑是一次很好的升级。
7.2 通过混合搜索、关键字和语义搜索进行广泛召回
现在我们有了主管来选择策略,我们需要构建检索策略本身。我们漏斗的第一阶段是关于召回的,我们的目标是广撒网,捕获所有可能相关的文档,即使我们在此过程中会发现一些噪音。
为此,我们将实现可以调用的三个不同的搜索功能:
-
矢量搜索:我们的标准语义搜索,但现在升级为使用元数据过滤器。
-
关键词搜索(BM25):一种经典、强大的算法,擅长查找具有特定、精确术语的文档。
-
混合搜索:两种方法中最好的一种是使用称为倒数秩融合(RRF)的技术将向量和关键字搜索的结果结合起来。
首先,我们需要使用上一节中创建的富含元数据的块来创建一个新的高级矢量存储。
import numpy as np # A fundamental library for numerical operations in Python
from rank_bm25 import BM25Okapi # The library for implementing the BM25 keyword search algorithm
print("Creating advanced vector store with metadata...")
# We create a new Chroma vector store, this time using our metadata-rich chunks
advanced_vector_store = Chroma.from_documents(
documents=doc_chunks_with_metadata,
embedding=embedding_function
)
print(f"Advanced vector store created with {advanced_vector_store._collection.count()} embeddings.")
这是一个简单但关键的步骤。现在, advanced\_vector\_store 包含与基准相同的文本,但每个嵌入的块都带有其章节标题的标签,从而解锁了我们执行过滤搜索的能力。
接下来,我们需要为关键词搜索做准备。BM25 算法的工作原理是分析文档中单词的频率。为了实现这一点,我们需要对语料库进行预处理,将每个文档的内容拆分成一个单词列表(token)。
print("\nBuilding BM25 index for keyword search...")
# Create a list where each element is a list of words from a document
tokenized_corpus = [doc.page_content.split(" ") for doc in doc_chunks_with_metadata]
# Create a list of all unique document IDs
doc_ids = [doc.metadata["id"] for doc in doc_chunks_with_metadata]
# Create a mapping from a document's ID back to the full Document object for easy lookup
doc_map = {doc.metadata["id"]: doc for doc in doc_chunks_with_metadata}
# Initialize the BM25Okapi index with our tokenized corpus
bm25 = BM25Okapi(tokenized_corpus)
我们基本上是在为 BM25 索引创建必要的数据结构。tokenized\_corpus 是 tokenized\_corpus 要搜索的内容,而 doc\_map 则允许我们在搜索完成后快速检索完整的 Document 对象。
现在我们可以定义三个检索函数。
# Strategy 1: Pure Vector Search with Metadata Filtering
def vector_search_only(query: str, section_filter: str = None, k: int = 10):
# This dictionary defines the metadata filter. ChromaDB will only search documents that match this.
filter_dict = {"section": section_filter} if section_filter and "Unknown" not in section_filter else None
# Perform the similarity search with the optional filter
return advanced_vector_store.similarity_search(query, k=k, filter=filter_dict)
# Strategy 2: Pure Keyword Search (BM25)
def bm25_search_only(query: str, k: int = 10):
# Tokenize the incoming query
tokenized_query = query.split(" ")
# Get the BM25 scores for the query against all documents in the corpus
bm25_scores = bm25.get_scores(tokenized_query)
# Get the indices of the top k documents
top_k_indices = np.argsort(bm25_scores)[::-1][:k]
# Use our doc_map to return the full Document objects for the top results
return [doc_map[doc_ids[i]] for i in top_k_indices]
# Strategy 3: Hybrid Search with Reciprocal Rank Fusion (RRF)
def hybrid_search(query: str, section_filter: str = None, k: int = 10):
# 1. Perform a keyword search
bm25_docs = bm25_search_only(query, k=k)
# 2. Perform a semantic search with the metadata filter
semantic_docs = vector_search_only(query, section_filter=section_filter, k=k)
# 3. Combine and re-rank the results using Reciprocal Rank Fusion (RRF)
# Get a unique set of all documents found by either search method
all_docs = {doc.metadata["id"]: doc for doc in bm25_docs + semantic_docs}.values()
# Create lists of just the document IDs from each search result
ranked_lists = [[doc.metadata["id"] for doc in bm25_docs], [doc.metadata["id"] for doc in semantic_docs]]
# Initialize a dictionary to store the RRF scores for each document
rrf_scores = {}
# Loop through each ranked list (BM25 and Semantic)
for doc_list in ranked_lists:
# Loop through each document ID in the list with its rank (i)
for i, doc_id in enumerate(doc_list):
if doc_id not in rrf_scores:
rrf_scores[doc_id] = 0
# The RRF formula: add 1 / (rank + k) to the score. We use k=61 as a standard default.
rrf_scores[doc_id] += 1 / (i + 61)
# Sort the document IDs based on their final RRF scores in descending order
sorted_doc_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
# Return the top k Document objects based on the fused ranking
final_docs = [doc_map[doc_id] for doc_id in sorted_doc_ids[:k]]
return final_docs
print("\nAll retrieval strategy functions ready.")
我们现在已经实现了自适应检索系统的核心。
-
vector_search_only函数是我们升级的语义搜索函数。关键新增功能是 filter=filter_dict 参数,它允许我们从规划器的 Step 中传递 document_section ,并强制搜索仅考虑包含该元数据的块。
-
bm25_search_only函数是我们的纯关键词检索器。它能够快速有效地查找语义搜索可能遗漏的特定术语。
-
hybrid_search函数并行运行两个搜索,然后使用 RRF 智能地合并结果。RRF 是一种简单但功能强大的算法,它根据文档在每个列表中的位置对其进行排序,从而有效地赋予在两个搜索结果中排名靠前的文档更高的权重。
让我们快速测试一下关键词搜索的效果。我们将搜索规划师确定的确切章节标题。
# Test Keyword Search to see if it can precisely find a specific section
print("\n--- Testing Keyword Search ---")
test_query = "Item 1A. Risk Factors"
test_results = bm25_search_only(test_query)
print(f"Query: {test_query}")
print(f"Found {len(test_results)} documents. Top result section: {test_results[0].metadata['section']}")
#### OUTPUT ####
Creating advanced vector store with metadata...
Advanced vector store created with 381 embeddings.
Building BM25 index for keyword search...
All retrieval strategy functions ready.
# --- Testing Keyword Search ---
Query: Item 1A. Risk Factors
Found 10 documents. Top result section: Item 1A. Risk Factors.
输出结果正是我们想要的。BM25 搜索以关键词为中心,只需搜索标题,就能完美且快速地检索到 “Item 1A. Risk Factors” 部分的文档。
当查询包含特定关键字(例如章节标题)时,我们的主管现在可以选择这个精确的工具。
随着我们广泛的召回阶段的构建,我们拥有了一个强大的机制来查找所有可能相关的文档。然而,这张广袤的网络也可能带来不相关的噪音。我们漏斗的下一阶段将专注于高精度地过滤这些噪音。
7.3 使用Cross-Encoder重排序器实现高精度
我们的第一阶段检索在召回率方面做得很好,提取了 10 篇可能与我们的子问题相关的文档。
但问题是,它们只是潜在相关的。将这 10 个文档直接输入到我们的推理 LLM ,效率低下且风险很大。它会增加令牌成本,更重要的是,它会用嘈杂的、半相关的信息混淆模型 。
我们现在需要的是 “精准”阶段 。我们需要一种方法来检查这 10 篇候选文档,并挑选出绝对最佳的文档。这时,重排器就派上用场了。
关键的区别在于这些模型的工作方式。
-
我们的初始检索使用双编码器 (嵌入模型),它分别针对查询和文档创建向量。它速度很快,非常适合搜索数百万个项目。
-
重排序的交叉编码器将查询和单个文档作为一对,进行更深入、更细致的比较。它速度较慢,但准确率更高。
所以,我们构建一个函数来接收检索到的 10 个文档,并使用跨编码器模型为每个文档提供精确的相关性得分。然后,我们只保留前 3 个,正如我们在 config 中定义的一样。
首先,让我们初始化我们的交叉编码器模型。我们将使用 sentence-transformers 库中一个小型但高效的模型,正如我们在配置中指定的一样。
from sentence_transformers import CrossEncoder # The library for using cross-encoder models
print("Initializing CrossEncoder reranker...")
# Initialize the CrossEncoder model using the name from our central config dictionary.
# The library will automatically download the model from the Hugging Face Hub if it's not cached.
reranker = CrossEncoder(config["reranker_model"])
我们将预先训练好的重排序模型加载到内存中,这只需执行一次。我们选择的模型 ms-marco-MiniLM-L-6-v2 非常适合这项任务,因为它在速度和准确率之间实现了良好的平衡。
现在我们可以创建执行重新排序的函数。
def rerank_documents_function(query: str, documents: List[Document]) -> List[Document]:
# If we have no documents to rerank, return an empty list immediately.
if not documents:
return []
# Create the pairs of [query, document_content] that the cross-encoder needs.
pairs = [(query, doc.page_content) for doc in documents]
# Use the reranker to predict a relevance score for each pair. This returns a list of scores.
scores = reranker.predict(pairs)
# Combine the original documents with their new scores.
doc_scores = list(zip(documents, scores))
# Sort the list of (document, score) tuples in descending order based on the score.
doc_scores.sort(key=lambda x: x[1], reverse=True)
# Extract just the Document objects from the top N sorted results.
# The number of documents to keep is controlled by 'top_n_rerank' in our config.
reranked_docs = [doc for doc, score in doc_scores[:config["top_n_rerank"]]]
return reranked_docs
这个函数(rerank\_documents\_function)是我们精确度阶段的主要部分。它接收 query 和来自召回阶段的 10 个 documents 列表。最重要的步骤是 reranker.predict(pairs) 。
在这里,模型不会创建嵌入,而是对查询与每个文档内容进行全面比较,并为每个文档生成相关性分数。获得分数后,我们只需对文档进行排序并对列表进行切片以仅保留前 3 个。此功能的输出将是一个简短、干净且高度相关的文档列表,这是我们下游代理的完美上下文。
这种漏斗式方法,从高召回率的第一阶段过渡到高精度的第二阶段,是生产级 RAG 系统的一个组成部分。 它确保我们获得最佳证据,同时最大限度地降低噪音和成本。
7.4 使用上下文提炼进行合成
现在,我们的检索漏斗运行得非常好。我们先进行大范围搜索,找到了 10 篇可能相关的文档。然后,我们的高精度重排序器筛选出最相关的前 3 篇文档,但在将这些信息交给主要推理代理之前,我们仍然可以做最后的改进。目前,我们有三个独立的文本块。
虽然它们都相关,但可能包含冗余信息或重叠句子。将它们呈现为三个不同的区块,对于LLM来说,处理起来仍然有些笨拙。
我们检索漏斗的最后一个阶段是语境提炼。目标很简单:提取最相关的三个文档块,并将它们提炼成一个简洁明了的段落。这将消除所有冗余信息,并为我们的下游代理提供完美合成的证据。
这个提炼步骤充当了最后的压缩层。它确保输入到我们更昂贵的推理代理中的上下文尽可能密集且信息丰富,从而最大化信号并最小化噪声。为此,我们将创建另一个小型的专用代理,我们将其称为 Distiller Agent 。
首先,我们需要设计引导其行为的提示。
# The prompt for our distiller agent, instructing it to synthesize and be concise
distiller_prompt = ChatPromptTemplate.from_messages([
("system", """You are a helpful assistant. Your task is to synthesize the following retrieved document snippets into a single, concise paragraph.
The goal is to provide a clear and coherent context that directly answers the question: '{question}'.
Focus on removing redundant information and organizing the content logically. Answer only with the synthesized context."""),
("human", "Retrieved Documents:\n{context}") # The content of our top 3 reranked documents will be passed here
])
我们给这个代理一个非常专注的任务,告诉它:“这里有一些文本片段。你唯一的任务就是把它们合并成一个连贯的段落来回答这个特定的问题。” “只用合成的上下文来回答” 这个指令很重要,它可以防止代理添加任何对话废话或试图回答问题本身。它只是一个纯粹的文本处理工具。
现在,我们可以组装简单的 distiller\_agent 。
# Create the agent by piping our prompt to the reasoning LLM and a string output parser
distiller_agent = distiller_prompt | reasoning_llm | StrOutputParser()
print("Contextual Distiller Agent created.")
这是另一个简单的 LCEL 链。我们获取 distiller\_prompt ,将其通过管道传输到强大的 reasoning\_llm 进行合成,然后使用 StrOutputParser 获取最终的、干净的文本段落。
创建好这个 distiller\_agent 后,我们的多阶段检索漏斗就完成了。在我们的主代理循环中,每个研究步骤的流程如下:
-
主管:选择一种检索策略( vector 、 keyword 或 hybrid )。
-
召回阶段:执行选定的策略以获取前 10 个文档。
-
精确阶段:使用 rerank_documents_function 获取前 3 个文档。
-
蒸馏阶段:使用 distiller_agent 将前 3 个文档压缩成一个干净的段落。
这个多阶段流程确保我们的agent所处理的证据具有最高的质量。下一步是赋予我们的agent超越其内部知识的能力,并搜索网络。
八、利用网络搜索增强知识
现在,我们的检索漏斗现在非常强大,但它有一个巨大的盲点,它只能看到2023 年的 10-K 文件的内容。为了解决我们的挑战查询,我们的agent需要查找有关 AMD AI 芯片战略的最新消息(提交文件后,从 2024 年开始)。这些信息在我们的静态知识库中根本不存在。
要真正打造一个 “深度思考”的智能体,它需要能够认识到自身知识的局限性,并在其他地方寻找答案。我们需要给它一扇通往外部世界的窗户。
在这一步,我们用一个新工具 ——Web Search—— 来增强我们agent的功能。这将使我们的系统从一个针对特定文档的问答机器人转变为一个真正的多源研究助手。
为此,我们将使用 Tavily Search API 。这是一个专为 LLM 打造的搜索引擎,提供简洁、无广告且相关的搜索结果,非常适合 RAG 流程。它还能与 LangChain 无缝集成。
所以,我们需要做的第一件事就是初始化 Tavily 搜索工具本身。
from langchain_community.tools.tavily_search import TavilySearchResults
# Initialize the Tavily search tool.
# k=3: This parameter instructs the tool to return the top 3 most relevant search results for a given query.
web_search_tool = TavilySearchResults(k=3)
创建一个 Tavily 搜索工具的实例,供我们的agent调用。k=3 参数是一个很好的起点,它提供了一些高质量的来源,而不会让代理承受过多的信息。
现在,原始的 API 响应并非我们真正需要的。我们的下游组件,重新排序器和提取器,都设计为与特定的数据结构协同工作:一个 LangChain Document 对象列表。为了确保无缝集成,我们需要创建一个简单的包装函数。该函数将接受查询,调用 Tavily 工具,然后将原始结果格式化为标准的 Document 结构。
def web_search_function(query: str) -> List[Document]:
# Invoke the Tavily search tool with the provided query.
results = web_search_tool.invoke({"query": query})
# Format the results into a list of LangChain Document objects.
# We use a list comprehension for a concise and readable implementation.
return [
Document(
# The main content of the search result goes into 'page_content'.
page_content=res["content"],
# We store the source URL in the 'metadata' dictionary for citations.
metadata={"source": res["url"]}
) for res in results
]
这个 web\_search\_function 充当着一个至关重要的适配器,它调用 web\_search\_tool.invoke返回一个字典列表,每个字典都包含诸如 "content" 和 "url" 之类的键。
然后,使用列表推导循环遍历这些结果并将它们整齐地重新打包到我们的管道所需的 Document 对象中。
page\_content 获取主要文本,重要的是,我们将 url 存储在 metadata 中,这确保了当我们的代理生成最终答案时,它可以正确引用其网络来源。
这样,我们的外部知识源看起来和感觉起来与我们的内部知识源完全一样,从而允许我们对两者使用相同的处理管道。
函数准备就绪后,我们来快速测试一下,确保它能按预期工作。我们将使用与主要挑战第二部分相关的查询。
# Test the web search function with a query about AMD's 2024 strategy
print("\n--- Testing Web Search Tool ---")
test_query_web = "AMD AI chip strategy 2024"
test_results_web = web_search_function(test_query_web)
print(f"Found {len(test_results_web)} results for query: '{test_query_web}'")
# Print a snippet from the first result to see what we got back
if test_results_web:
print(f"Top result snippet: {test_results_web[0].page_content[:250]}...")
#### OUTPUT ####
Web search tool (Tavily) initialized.
--- Testing Web Search Tool ---
Found 3 results for query: 'AMD AI chip strategy 2024'
Top result snippet: AMD has intensified its battle with Nvidia in the AI chip market with the release of the Instinct MI300X accelerator, a powerful GPU designed to challenge Nvidia's H100 in training and inference for large language models. Major cloud providers like Microsoft Azure and Oracle Cloud are adopting the MI300X, indicating strong market interest...
输出结果证实了我们的工具运行正常。它找到了 3 个与我们的查询相关的网页。顶部结果中的片段正是我们的代理所缺少的那种最新的外部信息。其中提到了 AMD“Instinct MI300X”及其与 NVIDIA “H100” 的竞争,这正是解决我们问题后半部分所需的证据。
我们的智能体现在拥有了一扇通往外部世界的窗户,它的规划器可以智能地决定何时打开这扇窗户。最后一个难题是赋予智能体反思其发现的能力,并决定其研究何时完成。
九、自我批评和控制流政策
到目前为止,我们已经构建了一台强大的研究机器。我们的agent可以制定计划,选择合适的工具,并执行复杂的检索漏斗。但它缺少一个关键要素: 思考自身进展的能力。一个盲目地、一步步遵循计划的智能体并非真正的智能。它需要一种自我批评的机制。
这就是我们构建agent自主认知核心的地方。在每个研究步骤之后,我们的agent都会暂停并进行反思。它会查看刚刚发现的新信息,并将其与已知的信息进行比较,然后做出战略决策:我的研究完成了吗?还是我需要继续?
这种自我批评循环使我们的系统从脚本化的工作流程提升为自主的代理。正是这种机制让它能够判断何时收集到足够的证据,从而自信地回答用户的问题。
我们将使用两个新的专门代理来实现这一点:
- Reflection Agent:该代理将从已完成的步骤中提取上下文,并创建一个简洁的单句摘要,该摘要随后会被添加到我们代理的“研究历史”中。
- Policy Agent:这是战略大师。经过反思,它将根据原计划审查整个研究历史,并做出一个关键决定: CONTINUE_PLAN 还是 FINISH 。
9.1 更新并反映累积研究历史
当我们的代理完成一个研究步骤(例如,检索并提取有关 NVIDIA 风险的信息)后,我们不想就此打住。我们需要将这些新知识整合到代理的记忆中。
我们将构建一个反射代理,它将从当前步骤中提取丰富、精炼的上下文,并将其总结成一个单一、真实的句子。然后,这个总结会被添加到 RAGState 中的 past\_steps 列表中。
首先,让我们为该代理创建提示。
# The prompt for our reflection agent, instructing it to be concise and factual
reflection_prompt = ChatPromptTemplate.from_messages([
("system", """You are a research assistant. Based on the retrieved context for the current sub-question, write a concise, one-sentence summary of the key findings.
This summary will be added to our research history. Be factual and to the point."""),
("human", "Current sub-question: {sub_question}\n\nDistilled context:\n{context}")
])
我们让这个智能体像一个勤奋的研究助理一样工作。它的任务不是发挥创造力,而是做好笔记。它会阅读 context 并撰写 summary 。现在我们可以组装智能体本身了。
# Create the agent by piping our prompt to the reasoning LLM and a string output parser
reflection_agent = reflection_prompt | reasoning_llm | StrOutputParser()
print("Reflection Agent created.")
这个 reflection\_agent 是我们认知循环的一部分。通过创建这些简洁的摘要,它构建了一个清晰易读的研究历史记录。这些历史记录将成为我们下一个也是最重要的代理的输入:决定何时停止的代理。
9.2 构建控制流策略代理
这是我们自主agent的大脑。在 reflection\_agent 更新了研究历史记录后, 策略代理便开始发挥作用。它充当整个操作的监督者。
它的工作是查看代理所知道的一切——原始问题、初步计划以及已完成步骤的完整摘要历史记录,并做出高层战略决策。
我们将首先使用 Pydantic 模型定义其决策的结构。
class Decision(BaseModel):
# The decision must be one of these two actions.
next_action: Literal["CONTINUE_PLAN", "FINISH"]
# The agent must justify its decision.
justification: str
这个 Decision 类强制我们的策略代理做出明确的二元选择,并解释其推理,这使得它的行为透明且易于调试。
接下来,我们设计指导其决策过程的提示。
# The prompt for our policy agent, instructing it to act as a master strategist
policy_prompt = ChatPromptTemplate.from_messages([
("system", """You are a master strategist. Your role is to analyze the research progress and decide the next action.
You have the original question, the initial plan, and a log of completed steps with their summaries.
- If the collected information in the Research History is sufficient to comprehensively answer the Original Question, decide to FINISH.
- Otherwise, if the plan is not yet complete, decide to CONTINUE_PLAN."""),
("human", "Original Question: {question}\n\nInitial Plan:\n{plan}\n\nResearch History (Completed Steps):\n{history}")
])
我们要求LLM进行元分析,它不是回答问题本身,而是对研究过程的状态进行推理。它将现有信息( history )与所需信息( plan 和 question )进行比较,并做出判断。
现在,我们可以组装 policy\_agent 。
# Create the agent by piping our prompt to the reasoning LLM and structuring its output with our Decision class
policy_agent = policy_prompt | reasoning_llm.with_structured_output(Decision)
print("Policy Agent created.")
# Now, let's test the policy agent with two different states of our research process
print("\n--- Testing Policy Agent (Incomplete State) ---")
# First, a state where only Step 1 is complete.
plan_str = json.dumps([s.dict() for s in test_plan.steps])
incomplete_history = "Step 1 Summary: NVIDIA's 10-K states that the semiconductor industry is intensely competitive and subject to rapid technological change."
decision1 = policy_agent.invoke({"question": complex_query_adv, "plan": plan_str, "history": incomplete_history})
print(f"Decision: {decision1.next_action}, Justification: {decision1.justification}")
print("\n--- Testing Policy Agent (Complete State) ---")
# Second, a state where both Step 1 and Step 2 are complete.
complete_history = incomplete_history + "\nStep 2 Summary: In 2024, AMD launched its MI300X accelerator to directly compete with NVIDIA in the AI chip market, gaining adoption from major cloud providers."
decision2 = policy_agent.invoke({"question": complex_query_adv, "plan": plan_str, "history": complete_history})
print(f"Decision: {decision2.next_action}, Justification: {decision2.justification}")
为了正确测试我们的 policy\_agent ,我们模拟了代理生命周期中的两个不同时刻。在第一个测试中,我们为其提供仅包含步骤 1 摘要的历史记录。在第二个测试中,我们为其提供步骤 1 和步骤 2 的摘要。
让我们来研究一下它在每种情况下做出的决定。
#### OUTPUT ####
Policy Agent created.
--- Testing Policy Agent (Incomplete State) ---
Decision: CONTINUE_PLAN, Justification: The research has only identified NVIDIA's competitive risks from the 10-K. It has not yet gathered the required external information about AMD's 2024 strategy, which is the next step in the plan.
--- Testing Policy Agent (Complete State) ---
Decision: FINISH, Justification: The research history now contains comprehensive summaries of both NVIDIA's stated competitive risks and AMD's recent AI chip strategy. All necessary information has been gathered to perform the final synthesis and answer the user's question.
让我们了解一下输出……
在不完整状态下,agent正确识别出缺少有关 AMD 策略的信息。它查看了计划,发现下一步是使用网络搜索,并正确决定执行 CONTINUE\_PLAN。
在完整阶段, 在获得网络搜索的摘要后,它再次分析了其历史记录。这一次,它意识到自己已经掌握了 NVIDIA 风险和 AMD 战略的所有信息。它正确地决定,研究已经完成,是时候 FINISH 了。
通过这个 policy\_agent,我们构建了自主系统的大脑。最后一步是使用 LangGraph 将所有这些组件连接成一个完整的、可执行的工作流。
十、定义图形节点
我们已经设计了所有这些酷炫的专业agent。现在是时候将它们转化为我们工作流程的实际构建块了。在 LangGraph 中,这些构建块被称为节点 。节点只是一个执行特定任务的 Python 函数。它以代理的当前内存( RAGState )作为输入,执行其任务,然后返回一个包含该内存任何更新的字典。
我们将为agent需要采取的每个主要步骤创建一个节点。
首先,我们需要一个简单的辅助函数。由于我们的agent经常需要查看研究历史记录,因此我们需要一种简洁的方法将 past\_steps 列表格式化为可读的字符串。
# A helper function to format the research history for prompts
def get_past_context_str(past_steps: List[PastStep]) -> str:
# This takes the list of PastStep dictionaries and joins them into a single string.
# Each step is clearly labeled for the LLM to understand the context.
return "\\n\\n".join([f"Step {s['step_index']}: {s['sub_question']}\\nSummary: {s['summary']}" for s in past_steps])
我们创建一个实用程序,它将在我们的几个节点内使用,为我们的提示提供历史背景。
现在来看看我们的第一个真实节点: plan\_node 。这是我们agent推理的起点。它唯一的任务就是调用我们的 planner\_agent 并填充 RAGState 中的 plan 字段。
# Node 1: The Planner
def plan_node(state: RAGState) -> Dict:
console.print("--- 🧠: Generating Plan ---")
# We call the planner_agent we created earlier, passing in the user's original question.
plan = planner_agent.invoke({"question": state["original_question"]})
rprint(plan)
# We return a dictionary with the updates for our RAGState.
# LangGraph will automatically merge this into the main state.
return {"plan": plan, "current_step_index": 0, "past_steps": []}
此节点启动所有操作。它从状态中获取 original\_question ,获取 plan,然后将 current\_step\_index 初始化为 0(从第一步开始),并清除 past\_steps 历史记录,以便进行新的运行。
接下来,我们需要实际查找信息的节点。由于我们的规划器可以在两种工具之间进行选择,因此我们需要两个独立的检索节点。让我们从用于搜索内部 10K 文档的 retrieval\_node 开始。
# Node 2a: Retrieval from the 10-K document
def retrieval_node(state: RAGState) -> Dict:
# First, get the details for the current step in the plan.
current_step_index = state["current_step_index"]
current_step = state["plan"].steps[current_step_index]
console.print(f"--- 🔍: Retrieving from 10-K (Step {current_step_index + 1}: {current_step.sub_question}) ---")
# Use our query rewriter to optimize the sub-question for search.
past_context = get_past_context_str(state['past_steps'])
rewritten_query = query_rewriter_agent.invoke({
"sub_question": current_step.sub_question,
"keywords": current_step.keywords,
"past_context": past_context
})
console.print(f" Rewritten Query: {rewritten_query}")
# Get the supervisor's decision on which retrieval strategy is best.
retrieval_decision = retrieval_supervisor_agent.invoke({"sub_question": rewritten_query})
console.print(f" Supervisor Decision: Use `{retrieval_decision.strategy}`. Justification: {retrieval_decision.justification}")
# Based on the decision, execute the correct retrieval function.
if retrieval_decision.strategy == 'vector_search':
retrieved_docs = vector_search_only(rewritten_query, section_filter=current_step.document_section, k=config['top_k_retrieval'])
elif retrieval_decision.strategy == 'keyword_search':
retrieved_docs = bm25_search_only(rewritten_query, k=config['top_k_retrieval'])
else: # hybrid_search
retrieved_docs = hybrid_search(rewritten_query, section_filter=current_step.document_section, k=config['top_k_retrieval'])
# Return the retrieved documents to be added to the state.
return {"retrieved_docs": retrieved_docs}
这个节点可以执行大量智能工作,不仅仅是一个简单的检索器。它编排了一个微型管道:它重写查询,向主管请求最佳策略,然后执行该策略。
现在,我们需要另一个工具的对应节点:网络搜索。
# Node 2b: Retrieval from the Web
def web_search_node(state: RAGState) -> Dict:
# Get the details for the current step.
current_step_index = state["current_step_index"]
current_step = state["plan"].steps[current_step_index]
console.print(f"--- 🌐: Searching Web (Step {current_step_index + 1}: {current_step.sub_question}) ---")
# Rewrite the sub-question for a web search engine.
past_context = get_past_context_str(state['past_steps'])
rewritten_query = query_rewriter_agent.invoke({
"sub_question": current_step.sub_question,
"keywords": current_step.keywords,
"past_context": past_context
})
console.print(f" Rewritten Query: {rewritten_query}")
# Call our web search function.
retrieved_docs = web_search_function(rewritten_query)
# Return the results.
return {"retrieved_docs": retrieved_docs}
这个 web\_search\_node 更简单,因为它不需要supervisor,只有一种搜索网络的方法。但它仍然使用我们强大的查询重写器来确保搜索尽可能高效。
检索文档(无论来自哪个来源)后,我们需要运行精度和综合漏斗。我们将为每个阶段创建一个节点。首先是 rerank\_node 。
# Node 3: The Reranker
def rerank_node(state: RAGState) -> Dict:
console.print("--- 🎯: Reranking Documents ---")
# Get the current step's details.
current_step_index = state["current_step_index"]
current_step = state["plan"].steps[current_step_index]
# Call our reranking function on the documents we just retrieved.
reranked_docs = rerank_documents_function(current_step.sub_question, state["retrieved_docs"])
console.print(f" Reranked to top {len(reranked_docs)} documents.")
# Update the state with the high-precision documents.
return {"reranked_docs": reranked_docs}
该节点获取 retrieved\_docs (我们对 10 个文档的广泛回忆)并使用交叉编码器将它们过滤到前 3 个,并将结果放在 reranked\_docs 中。
接下来, compression\_node 将获取前 3 个文档并对其进行提炼。
# Node 4: The Compressor / Distiller
def compression_node(state: RAGState) -> Dict:
console.print("--- ✂️: Distilling Context ---")
# Get the current step's details.
current_step_index = state["current_step_index"]
current_step = state["plan"].steps[current_step_index]
# Format the top 3 documents into a single string.
context = format_docs(state["reranked_docs"])
# Call our distiller agent to synthesize them into one paragraph.
synthesized_context = distiller_agent.invoke({"question": current_step.sub_question, "context": context})
console.print(f" Distilled Context Snippet: {synthesized_context[:200]}...")
# Update the state with the final, clean context.
return {"synthesized_context": synthesized_context}
此节点是我们检索漏斗的最后一步。它获取 reranked\_docs 并生成一个干净的 synthesized\_context 段落。
现在我们有了证据,我们需要反思它并更新我们的研究历史。这就是 reflection\_node 的工作。
# Node 5: The Reflection / Update Step
def reflection_node(state: RAGState) -> Dict:
console.print("--- : Reflecting on Findings ---")
# Get the current step's details.
current_step_index = state["current_step_index"]
current_step = state["plan"].steps[current_step_index]
# Call our reflection agent to summarize the findings.
summary = reflection_agent.invoke({"sub_question": current_step.sub_question, "context": state['synthesized_context']})
console.print(f" Summary: {summary}")
# Create a new PastStep dictionary with all the results from this step.
new_past_step = {
"step_index": current_step_index + 1,
"sub_question": current_step.sub_question,
"retrieved_docs": state['reranked_docs'], # We save the reranked docs for final citation
"summary": summary
}
# Append the new step to our history and increment the step index to move to the next step.
return {"past_steps": state["past_steps"] + [new_past_step], "current_step_index": current_step_index + 1}
这个节点是我们agent的簿记员。它调用 reflection\_agent 创建摘要,然后将当前研究周期的所有结果整齐地打包到 new\_past\_step 对象中。然后,它将这个对象添加到 past\_steps 列表中,并增加 current\_step\_index 值,使代理为下一个循环做好准备。
最后,当研究完成时,我们需要最后一个节点来生成最终答案。
# Node 6: The Final Answer Generator
def final_answer_node(state: RAGState) -> Dict:
console.print("--- ✅: Generating Final Answer with Citations ---")
# First, we need to gather all the evidence we've collected from ALL past steps.
final_context = ""
for i, step in enumerate(state['past_steps']):
final_context += f"\\n--- Findings from Research Step {i+1} ---\\n"
# We include the source metadata (section or URL) for each document to enable citations.
for doc in step['retrieved_docs']:
source = doc.metadata.get('section') or doc.metadata.get('source')
final_context += f"Source: {source}\\nContent: {doc.page_content}\\n\\n"
# We create a new prompt specifically for generating the final, citable answer.
final_answer_prompt = ChatPromptTemplate.from_messages([
("system", """You are an expert financial analyst. Synthesize the research findings from internal documents and web searches into a comprehensive, multi-paragraph answer for the user's original question.
Your answer must be grounded in the provided context. At the end of any sentence that relies on specific information, you MUST add a citation. For 10-K documents, use [Source: <section title>]. For web results, use [Source: <URL>]."""),
("human", "Original Question: {question}\n\nResearch History and Context:\n{context}")
])
# We create a temporary agent for this final task and invoke it.
final_answer_agent = final_answer_prompt | reasoning_llm | StrOutputParser()
final_answer = final_answer_agent.invoke({"question": state['original_question'], "context": final_context})
# Update the state with the final answer.
return {"final_answer": final_answer}
这个 final\_answer\_node 是我们最终的收官之作。它将 past\_steps 历史记录中每一步的所有高质量、重新排序的文档整合成一个庞大的上下文。然后,它使用专用的提示来指示我们强大的 reasoning\_llm 将这些信息综合成一个包含引文的、全面的、多段落的答案,从而让我们的研究过程圆满结束。
定义好所有节点后,我们现在有了agent的所有构建块。下一步是定义连接它们并控制图的流程的“线路”。
十一、定义条件边
至此,我们已经构建好了所有节点。我们有一个规划器、检索器、一个重排序器、一个提取器和一个反射器。可以把它们想象成一个房间里的一群专家。现在我们需要定义对话规则。谁在什么时候发言?我们如何决定下一步做什么?
这就是 LangGraph 中边的作用。简单的边很简单, “在节点 A 之后,总是指向节点 B” 。但真正的智能来自条件边 。
条件边是一种函数,它查看代理的当前内存( RAGState )并做出决策,根据情况将工作流程路由到不同的路径。
我们的agent需要两个关键的决策功能:
- 工具路由器( route_by_tool ): 制定计划后,此功能将查看计划的当前步骤并决定是否将工作流发送到 retrieve_10k 节点或 retrieve_web 节点。
- 主控制循环( should_continue_node ): 这是最重要的一个。在每个研究步骤完成并反思之后,此函数将调用我们的 policy_agent 来决定是继续执行计划的下一步,还是完成研究并生成最终答案。
首先,让我们构建简单的工具路由器。
# Conditional Edge 1: The Tool Router
def route_by_tool(state: RAGState) -> str:
# Get the index of the current step we are on.
current_step_index = state["current_step_index"]
# Get the full details of the current step from the plan.
current_step = state["plan"].steps[current_step_index]
# Return the name of the tool specified for this step.
# LangGraph will use this string to decide which node to go to next.
return current_step.tool
这个函数非常简单,但至关重要。它就像一个总机,从状态中读取 current\_step\_index ,在 plan 中找到对应的 Step ,并返回其 tool 字段的值(可能是 "search\_10k" 或 "search\_web" )。当我们连接图时,我们会告诉它使用这个函数的输出来选择下一个节点。
现在我们需要创建一个函数来控制代理的主要推理循环。这就是我们的 policy\_agent 发挥作用的地方。
# Conditional Edge 2: The Main Control Loop
def should_continue_node(state: RAGState) -> str:
console.print("--- 🚦: Evaluating Policy ---")
# Get the index of the step we are about to start.
current_step_index = state["current_step_index"]
# First, check our basic stopping conditions.
# Condition 1: Have we completed all the steps in the plan?
if current_step_index >= len(state["plan"].steps):
console.print(" -> Plan complete. Finishing.")
return "finish"
# Condition 2: Have we exceeded our safety limit for the number of iterations?
if current_step_index >= config["max_reasoning_iterations"]:
console.print(" -> Max iterations reached. Finishing.")
return "finish"
# A special case: If the last retrieval step failed to find any documents,
# there's no point in reflecting. It's better to just move on to the next step.
if state.get("reranked_docs") is not None and not state["reranked_docs"]:
console.print(" -> Retrieval failed for the last step. Continuing with next step in plan.")
return "continue"
# If none of the basic conditions are met, it's time to ask our Policy Agent.
# We format the history and plan into strings for the prompt.
history = get_past_context_str(state['past_steps'])
plan_str = json.dumps([s.dict() for s in state['plan'].steps])
# Invoke the policy agent to get its strategic decision.
decision = policy_agent.invoke({"question": state["original_question"], "plan": plan_str, "history": history})
console.print(f" -> Decision: {decision.next_action} | Justification: {decision.justification}")
# Based on the agent's decision, return the appropriate signal.
if decision.next_action == "FINISH":
return "finish"
else: # CONTINUE_PLAN
return "continue"
这个 should\_continue\_node 函数是我们代理控制流的认知核心。它在每个 reflection\_node 之后运行。
它首先检查简单的、硬编码的停止条件 。计划是否用完了所有步骤?是否达到了 max\_reasoning\_iterations 的安全限制?这些条件可以防止智能体永远运行下去。
如果这些检查通过,它就会调用我们强大的 policy\_agent 。它为策略代理提供所需的所有上下文:原始目标( question )、完整 plan 以及迄今为止已完成工作的 history 。
最后,它获取 policy\_agent 的结构化输出( CONTINUE\_PLAN 或 FINISH ),并返回简单字符串 "continue" 或 "finish" 。LangGraph 将使用此字符串来循环回另一个研究周期或继续执行 final\_answer\_node 。
现在,我们已经定义了节点(专家)和条件边(对话规则),并且拥有了所需的一切。
现在是时候将所有这些部分组装成一个完整的、可运行的 StateGraph 了。
十二、连接深度思考 RAG 机器
我们已经准备好所有单独的组件:
- 我们的节点( 工作者 )
- 我们的条件边缘( 经理 )。
现在是时候将它们全部连接起来形成一个单一的、有凝聚力的系统了。
我们将使用 LangGraph 的 StateGraph 来定义智能体的完整认知架构。在这里,我们规划了agent思维过程的蓝图,并精确定义了信息如何从一个步骤流向下一个步骤。
我们要做的第一件事是创建一个 StateGraph 实例。我们会告诉它,它将要传递的 “状态” 是我们的 RAGState 字典。
from langgraph.graph import StateGraph, END # Import the main graph components
# Instantiate the graph, telling it to use our RAGState TypedDict as its state schema.
graph = StateGraph(RAGState)
现在我们有了一个空图。下一步是添加我们之前定义的所有节点。 .add\_node() 方法接受两个参数:节点的唯一字符串名称,以及该节点将执行的 Python 函数。
# Add all of our Python functions as nodes in the graph
graph.add_node("plan", plan_node) # The node that creates the initial plan
graph.add_node("retrieve_10k", retrieval_node) # The node for internal document retrieval
graph.add_node("retrieve_web", web_search_node) # The node for external web search
graph.add_node("rerank", rerank_node) # The node that performs precision reranking
graph.add_node("compress", compression_node) # The node that distills the context
graph.add_node("reflect", reflection_node) # The node that summarizes findings and updates history
graph.add_node("generate_final_answer", final_answer_node) # The node that synthesizes the final answer
现在所有专家都到齐了。最后也是最关键的一步是定义连接他们的“线路”。在这里,我们使用 .add\_edge() 和 .add\_conditional\_edges() 方法来定义控制流。
# The entry point of our graph is the "plan" node. Every run starts here.
graph.set_entry_point("plan")
# After the "plan" node, we use our first conditional edge to decide which tool to use.
graph.add_conditional_edges(
"plan", # The source node
route_by_tool, # The function that makes the decision
{ # A dictionary mapping the function's output string to the destination node
"search_10k": "retrieve_10k",
"search_web": "retrieve_web",
},
)
# After retrieving from either the 10-K or the web, the flow is linear for a bit.
graph.add_edge("retrieve_10k", "rerank") # After internal retrieval, always go to rerank.
graph.add_edge("retrieve_web", "rerank") # After web retrieval, also always go to rerank.
graph.add_edge("rerank", "compress") # After reranking, always go to compress.
graph.add_edge("compress", "reflect") # After compressing, always go to reflect.
# After the "reflect" node, we hit our main conditional edge, which controls the reasoning loop.
graph.add_conditional_edges(
"reflect", # The source node
should_continue_node, # The function that calls our Policy Agent
{ # A dictionary mapping the decision to the next step
"continue": "plan", # If the decision is "continue", we loop back to the "plan" node to route the next step.
"finish": "generate_final_answer", # If the decision is "finish", we proceed to generate the final answer.
},
)
# The "generate_final_answer" node is the last step before the end.
graph.add_edge("generate_final_answer", END) # After generating the answer, the graph concludes.
print("StateGraph constructed successfully.")
这是我们的agent大脑的蓝图。让我们来追踪一下它的流程:
-
一切总是按 plan 开始。
-
然后, route_by_tool 条件边充当开关,将流引导至 retrieve_10k 或 retrieve_web 。
-
无论运行哪个检索器,输出总是通过 rerank -> compress -> reflect 管道进行传输。
-
这引出了最重要的部分: should_continue_node 条件边。这是我们循环推理的核心。
-
如果策略代理说 CONTINUE_PLAN ,边缘节点会将工作流一路发送回 plan 节点。我们返回到 plan (而不是直接发送到下一个检索器),以便 route_by_tool 能够正确地路由计划中的下一步 。
-
如果策略代理说 FINISH ,边缘将中断循环并将工作流发送到 generate_final_answer 节点。
-
最后,在生成答案后,图表在 END 处终止。
我们已经成功定义了深度思考agent的完整、复杂且循环的架构。剩下要做的就是将这个蓝图编译成一个可运行的应用程序,并将其可视化,以查看我们构建的内容。
十三、编译和可视化迭代工作流
当我们的图完全连接好后,组装过程的最后一步就是编译它。.compile .compile() 方法将我们对节点和边的抽象定义转换为具体的可执行应用程序。
然后,我们可以使用内置的 LangGraph 实用程序生成图的示意图。可视化工作流程对于理解和调试复杂的代理系统非常有帮助。它将我们的代码转换为直观的流程图,清晰地显示代理可能的推理路径。
所以,基本上,我们正在将我们的蓝图变成一台真正的机器。
# The .compile() method takes our graph definition and creates a runnable object.
deep_thinking_rag_graph = graph.compile()
print("Graph compiled successfully.")
# Now, let's visualize the architecture we've built.
try:
from IPython.display import Image, display
# We can get a PNG image of the graph's structure.
png_image = deep_thinking_rag_graph.get_graph().draw_png()
# Display the image in our notebook.
display(Image(png_image))
except Exception as e:
# This can fail if pygraphviz and its system dependencies are not installed.
print(f"Graph visualization failed: {e}. Please ensure pygraphviz is installed.")
deep\_thinking\_rag\_graph 对象现在已经成为我们功能齐全的agent了。可视化代码随后调用 .get\_graph().draw\_png() 来生成我们构建的状态机的可视化表示。
我们可以清晰的看到:
-
初始分支逻辑是 route_by_tool 在 retrieve_10k 和 retrieve_web 之间进行选择。
-
每个研究步骤的线性处理管道( rerank - compress - reflect )。
-
关键的反馈循环 ,其中 should_continue 边缘将工作流发送回 plan 节点以开始下一个研究周期。
-
研究完成后,最后的“出口坡道”将通向 generate_final_answer 。
这就是一个能够思考的系统的架构。现在,让我们来测试一下。
十四、运行深度思考管道
我们已经设计出一个推理引擎。现在是时候看看它能否在我们基线系统彻底失败的地方取得成功。
我们将使用完全相同的多跳、多源挑战查询来调用已编译的 deep\_thinking\_rag\_graph 。我们将使用 .stream() 方法实时、逐步地跟踪代理的执行情况,观察其解决问题时的“思考过程”。
这是本节的计划:
-
调用图表: 我们将运行我们的代理并观察它如何执行其计划、在工具之间切换以及构建其研究历史。
-
分析最终输出: 我们将检查最终的综合答案,看看它是否成功整合了来自 10-K 和网络的信息。
-
比较结果: 我们将进行最后的并列比较,以明确突出我们的深度思考代理的架构优势。
我们将设置初始输入,它只是一个包含 original_question 的字典,然后调用 stream .stream() 方法。stream 方法非常适合调试和观察,因为它会在每个节点完成工作后返回图的状态。
# This will hold the final state of the graph after the run is complete.
final_state = None
# The initial input for our graph, containing the original user query.
graph_input = {"original_question": complex_query_adv}
print("--- Invoking Deep Thinking RAG Graph ---")
# We use .stream() to watch the agent's process in real-time.
# "values" mode means we get the full RAGState object after each step.
for chunk in deep_thinking_rag_graph.stream(graph_input, stream_mode="values"):
# The final chunk in the stream will be the terminal state of the graph.
final_state = chunk
print("\n--- Graph Stream Finished ---")
这个循环就是我们的agent开始运作的地方。每次迭代时,LangGraph 都会执行工作流中的下一个节点,更新 RAGState ,并将新的状态返回给我们。我们嵌入在节点中的 rich 库 console.print 语句将为我们提供代理操作和决策的持续评论。
#### OUTPUT ####
--- Invoking Deep Thinking RAG Graph ---
--- 🧠: Generating Plan ---
plan:
steps:
- sub_question: What are the key risks related to competition as stated in NVIDIA's 2023 10-K filing?
tool: search_10k
...
- sub_question: What are the recent news and developments in AMD's AI chip strategy in 2024?
tool: search_web
...
--- 🔍: Retrieving from 10-K (Step 1: ...) ---
Rewritten Query: key competitive risks for NVIDIA in the semiconductor industry...
Supervisor Decision: Use `hybrid_search`. ...
--- 🎯: Reranking Documents ---
Reranked to top 3 documents.
--- ✂️: Distilling Context ---
Distilled Context Snippet: NVIDIA operates in the intensely competitive semiconductor industry...
--- 🤔: Reflecting on Findings ---
Summary: According to its 2023 10-K, NVIDIA operates in an intensely competitive semiconductor industry...
--- 🚦: Evaluating Policy ---
-> Decision: CONTINUE_PLAN | Justification: The first step...has been completed. The next step...is still pending...
--- 🌐: Searching Web (Step 2: ...) ---
Rewritten Query: AMD AI chip strategy news and developments 2024...
--- 🎯: Reranking Documents ---
Reranked to top 3 documents.
--- ✂️: Distilling Context ---
Distilled Context Snippet: AMD has ramped up its challenge to Nvidia in the AI accelerator market with its Instinct MI300 series...
--- 🤔: Reflecting on Findings ---
Summary: In 2024, AMD is aggressively competing with NVIDIA in the AI chip market through its Instinct MI300X accelerator...
--- 🚦: Evaluating Policy ---
-> Decision: FINISH | Justification: The research history now contains comprehensive summaries of both NVIDIA's stated risks and AMD's recent strategy...
--- ✅: Generating Final Answer with Citations ---
--- Graph Stream Finished ---
您可以看到我们设计的执行情况:
-
计划: 它创建了正确的两步、多工具计划。
-
执行步骤 1: 使用 search_10k ,通过完整的检索漏斗运行,并反映结果。
-
自我批评: 政策agent发现计划尚未完成,并决定 CONTINUE_PLAN 。
-
执行步骤 2: 它正确切换到 search_web 工具,通过相同的漏斗运行它,然后再次反映。
-
再次自我批评: 这一次,政策agent看到所有必要的信息都已收集,并正确地决定 FINISH。
-
综合: 工作流程随后进入 generate_final_answer 节点。
agent已成功处理完复杂的查询。现在,让我们来检查一下它生成的最终答案。
十五、分析最终的高质量答案
agent已完成研究。 final\_state 变量现在保存了完整的 RAGState ,包括 final\_answer 。让我们将其打印出来,看看它是否成功地将来自两个来源的信息合成为一个完整的、带有引用的分析性响应。
console.print("--- DEEP THINKING RAG FINAL ANSWER ---")
console.print(Markdown(final_state['final_answer']))
#### OUTPUT ####
--- DEEP THINKING RAG FINAL ANSWER ---
Based on an analysis of NVIDIA's 2023 10-K filing and recent news from 2024 regarding AMD's AI chip strategy, the following synthesis can be made:
**NVIDIA's Stated Competitive Risks:**
In its 2023 10-K filing, NVIDIA identifies its operating environment as the "intensely competitive" semiconductor industry, which is characterized by rapid technological change. A primary risk is that competitors, including AMD, could introduce new products with better performance or lower costs that gain significant market acceptance, which could materially and adversely affect its business [Source: Item 1A. Risk Factors.].
**AMD's 2024 AI Chip Strategy:**
In 2024, AMD has moved aggressively to challenge NVIDIA's dominance in the AI hardware market with its Instinct MI300 series of accelerators, particularly the MI300X. This product is designed to compete directly with NVIDIA's H100 GPU. AMD's strategy has gained significant traction, with major cloud providers such as Microsoft Azure and Oracle announcing plans to use the new chips [Source: https://www.reuters.com/technology/amd-forecasts-35-billion-ai-chip-revenue-2024-2024-01-30/].
**Synthesis and Impact:**
AMD's 2024 AI chip strategy directly exacerbates the competitive risks outlined in NVIDIA's 10-K. The successful launch and adoption of the MI300X is a materialization of the specific risk that a competitor could introduce a product with comparable performance. The adoption of AMD's chips by major cloud providers signifies a direct challenge to NVIDIA's market share in the lucrative data center segment, validating NVIDIA's stated concerns about rapid technological change [Source: Item 1A. Risk Factors. and https://www.cnbc.com/2023/12/06/amd-launches-new-mi300x-ai-chip-to-compete-with-nvidias-h100.html].
这是一次圆满的成功。答案是一份深入的分析清单。
- 正确地总结了 10-K 中的风险。
- 正确地总结了来自网络搜索的 AMD 新闻。
- 至关重要的是,在“综合与影响”部分,它执行原始查询所需的多跳推理,解释后者如何加剧前者。
- 最后,它提供了正确的出处 ,其中的引用指向内部文档部分和外部网址。
十六、并列比较
让我们将这两个结果并列放在一起,以便清楚地看到差异。
| Feature | Vanilla RAG (Failed) | Deep Thinking RAG (Success) | | --- | --- | --- | | Thinking Style | One-shot, no memory. | Multi-step, memory-based reasoning. | | Planning | No planning treats whole query as one search. | Breaks query into steps, chooses best tools (internal or web) for each. | | Search Method | Basic semantic search on one source. | Smart, adaptive search using the best method for each step. | | Sources Used | Only one static document. | Mixes internal docs with live web data. | | Answer Quality | Failed no synthesis. | Success—clear, well cited answer from multiple sources. |
这次比较提供了明确的结论。架构转变为一个循环的、工具感知的、自我批评的agent,使得在复杂的现实世界查询上的性能得到了显著且可衡量的提升。
十七、评估框架与分析结果
我们已经见证了我们的高级agent在一次非常困难的查询中取得了成功。但在生产环境中,我们需要的不仅仅是一个成功案例。我们需要客观、量化且自动化的验证。
为了实现这一目标,我们现在将使用 RAGA (RAG 评估)库构建一个严格的评估框架。我们将重点关注 RAGA 提供的四个关键指标:
-
上下文精确度和召回率:这衡量了我们检索流程的质量。 精确度指的是:“在我们检索到的文档中,有多少是真正相关的?”(信号与噪声)。 召回率指的是:“在所有存在的相关文档中,我们实际找到了多少?”(完整性)。
-
答案忠实度:这衡量生成的答案是否基于所提供的上下文,作为我们对 LLM 幻觉的主要检查。
-
答案正确性:这是质量的最终衡量标准。它将生成的答案与人工编制的“基本事实”答案进行比较,以评估其事实准确性和完整性。
要运行 RAGA 评估,我们需要准备一个数据集。该数据集将包含我们的挑战查询、由我们的基线和高级流程生成的答案、它们各自使用的上下文,以及我们自己编写的作为理想响应的“基本事实”答案。
from datasets import Dataset # From the Hugging Face datasets library, which RAGAs uses
from ragas import evaluate
from ragas.metrics import (
context_precision,
context_recall,
faithfulness,
answer_correctness,
)
import pandas as pd
print("Preparing evaluation dataset...")
# This is our manually crafted, ideal answer to the complex query.
ground_truth_answer_adv = "NVIDIA's 2023 10-K lists intense competition and rapid technological change as key risks. This risk is exacerbated by AMD's 2024 strategy, specifically the launch of the MI300X AI accelerator, which directly competes with NVIDIA's H100 and has been adopted by major cloud providers, threatening NVIDIA's market share in the data center segment."
# We need to re-run the retriever for the baseline model to get its context for the evaluation.
retrieved_docs_for_baseline_adv = baseline_retriever.invoke(complex_query_adv)
baseline_contexts = [[doc.page_content for doc in retrieved_docs_for_baseline_adv]]
# For the advanced agent, we'll consolidate all the documents it retrieved across all research steps.
advanced_contexts_flat = []
for step in final_state['past_steps']:
advanced_contexts_flat.extend([doc.page_content for doc in step['retrieved_docs']])
# We use a set to remove any duplicate documents for a cleaner evaluation.
advanced_contexts = [list(set(advanced_contexts_flat))]
# Now, we construct the dictionary that will be turned into our evaluation dataset.
eval_data = {
'question': [complex_query_adv, complex_query_adv], # The same question for both systems
'answer': [baseline_result, final_state['final_answer']], # The answers from each system
'contexts': baseline_contexts + advanced_contexts, # The contexts each system used
'ground_truth': [ground_truth_answer_adv, ground_truth_answer_adv] # The ideal answer
}
# Create the Hugging Face Dataset object.
eval_dataset = Dataset.from_dict(eval_data)
# Define the list of metrics we want to compute.
metrics = [
context_precision,
context_recall,
faithfulness,
answer_correctness,
]
print("Running RAGAs evaluation...")
# Run the evaluation. RAGAs will call an LLM to perform the scoring for each metric.
result = evaluate(eval_dataset, metrics=metrics, is_async=False)
print("Evaluation complete.")
# Format the results into a clean pandas DataFrame for easy comparison.
results_df = result.to_pandas()
results_df.index = ['baseline_rag', 'deep_thinking_rag']
print("\n--- RAGAs Evaluation Results ---")
print(results_df[['context_precision', 'context_recall', 'faithfulness', 'answer_correctness']].T)
开始进行评估实验。我们收集了单个硬查询所需的所有必要数据:问题、两个不同的答案、两组不同的上下文以及我们理想的基本事实。然后,我们将这个打包好的 eval\_dataset 提供给 ragas.evaluate 函数。
在后台,RAGA 会进行一系列 LLM 调用,要求其充当判断者。例如,对于 faithfulness ”,它会询问:“这个答案是否完全符合上下文?”对于 answer\_correctness ”,它会询问……
这个答案与这个基本事实答案在事实上有多相似?
我们可以看看数字分数……
#### OUTPUT ####
Preparing evaluation dataset...
Running RAGAs evaluation...
Evaluation complete.
--- RAGAs Evaluation Results ---
baseline_rag deep_thinking_rag
context_precision 0.500000 0.890000
context_recall 0.333333 1.000000
faithfulness 1.000000 1.000000
answer_correctness 0.395112 0.991458
定量结果对深度思考架构的优越性提供了明确而客观的判断。
-
语境准确率(0.50 vs 0.89):baseline agent的语境仅有一半相关性,因为它只能检索关于比赛的一般信息。高级智能体通过多步骤、多工具的检索获得了完美的准确率。
-
上下文召回率(0.33 vs 1.00):baseline检索器完全遗漏了来自网络的关键信息,导致召回率得分非常低。高级智能体通过规划和工具运用,确保找到所有必要信息,实现了完美的召回率。
-
忠诚度(1.00 vs 1.00):两个系统都高度忠诚。baseline系统正确地指出它没有这些信息,而高级agent正确地使用了它找到的信息。这对两者来说都是一个好兆头,但缺乏正确性的忠诚度是没有意义的。
-
答案正确率(0.40 vs 0.99):这是质量的最终衡量标准。baseline agent的答案正确率不到 40%,因为它缺少所需分析的整个后半部分。高级agent的答案几乎完美。
十八、总结整个流程
在本文中,我们完成了从简单、脆弱的 RAG 管道到复杂的自主推理代理的完整架构。
-
我们首先构建了一个原始的 RAG 系统,并展示了它在复杂的多源查询中可预测的失败。
-
然后,我们系统地设计了一个深度思考代理,使其具有规划、使用多种工具和调整检索策略的能力。
-
我们构建了一个多阶段检索漏斗,从广泛召回(使用混合搜索)到高精度(使用跨编码器重新排序器),最后到合成(使用蒸馏器代理)。
-
我们使用 LangGraph 来协调整个认知架构,创建一个循环的、有状态的工作流程,实现真正的多步骤推理。
-
我们实施了一个自我批评循环 ,允许代理识别失败,修改自己的计划,并在找不到答案时优雅地退出。
-
最后,我们通过生产级评估验证了我们的成功,使用 RAGA 为高级代理的优越性提供客观、定量的证明。
十九、利用马尔可夫决策过程(MDP)学习策略
我们的agent有一个策略agent,负责决定 CONTINUE 还是 FINISH 。目前,每个决策都依赖于像 GPT-4o 这样昂贵的通用LLM。虽然这种方法有效,但在生产环境中可能速度慢且成本高昂。学术前沿提供了一条更优化的思路:
- RAG 作为决策过程:我们可以将agent的推理循环构建为马尔可夫决策过程 (MDP) 。在这个模型中,每个 RAGState 都是一个“状态”,每个动作( CONTINUE 、 REVISE 、 FINISH )都会引导我们进入一个具有特定奖励(例如,找到正确答案)的新状态。
- 从经验中学习:我们在 LangSmith 中记录的数千条成功和失败的推理轨迹是宝贵的训练数据。每条轨迹都是代理导航此 MDP 的一个示例。
- 训练策略模型:利用这些数据,我们可以应用强化学习来训练一个更小、更专业的策略模型。
- 目标:速度与效率:目标是将 GPT-4o 等模型的复杂推理提炼为一个紧凑且经过微调的模型(例如,一个 7B 参数模型)。这种学习到的策略可以更快、更经济地完成 CONTINUE / FINISH 的决策,同时针对特定领域进行高度优化。这是 DeepRAG 等高级研究论文背后的核心思想,代表了自主 RAG 系统优化的更高水平。
