点击上方蓝字关注我们
之前我们介绍了来自LLM开发框架LlamaIndex的新特性:Workflows,一种事件驱动、用于构建复杂AI工作流应用的新方法(参考: 深入解析LlamaIndex Workflows:构建复杂RAG与智能体工作流的新利器【上篇】 )。在本篇中,我们将继续学习如何基于Workflows来构建一个ReAct模式的AI智能体。尽管在LlamaIndex框架中已经提供了开箱即用的ReActAgent组件,但通过Workflows来从零构建ReAct智能体,可以更深入的了解ReAct智能体的内部原理,在未来帮助实现更底层、更灵活的控制能力。
01
ReAct Agent再回顾
很多人都对ReAct智能体有所了解,在LlamaIndex与LangChain框架中也都有现成的ReActAgent封装组件,可以开箱即用的构建ReAct模式的AI智能体。
ReAct模式的AI智能体采用迭代式的推理(Reasoning)到行动(Acting)的工作流程,旨在应对更复杂的人工任务和问题。它通过将推理步骤与实际行动相结合,使得智能体可以逐步理解任务、采取行动,并观察行动获得的新信息以推理后续步骤。过程大致如下:
- 推理 :智能体会分析任务与环境、推理步骤、决定下一步行动
- 行动 :调用外部工具,如搜索、执行计算、与外部API交互等
- 观察并循环 :观察行动结果,推理后续步骤,调整策略,直至任务完成
ReAct模式具备很好的动态性,使得AI能够应对复杂和未知情况,适用于更开放性的问题和探索性的任务,展现出更高的自主决策智能。
ReAct Agent基本构成
02
设计ReAct Agent工作流
根据ReAct智能体的基本思想,其工作流中最核心的步骤(step)应该包括:
- 将输入问题(或任务)、已有对话、工具信息、已经获得的信息(即已调用工具的返回内容)等输入LLM,让LLM推理下一步动作
- 如果此时LLM可以回答,则直接给出答案,结束流程
- 如果此时LLM无法回答,则给出使用工具的信息(工具名、输入参数等)
- 如果需要使用工具,则根据第3步给出的信息进行工具调用,并获得返回
- 循环 到第一步,进行迭代,直到在第2步能够完成任务;
基于LlamaIndex Workflows开发ReAct Agent的工作流程图如下:
现在可以参考这个工作流来实现ReAct智能体,这里基于官方的样例进行讲解。
03
基于Workflows实现ReAct Agent
【定义Event】
参考上面的工作流图,定义几个需要的Event类型:
from llama\_index.core.llms import ChatMessage
from llama\_index.core.tools import ToolSelection, ToolOutput
from llama\_index.core.workflow import Event
import os
#通知事件
class PrepEvent(Event):
pass
#LLM输入事件:包含输入LLM的历史消息
class InputEvent(Event):
input: list[ChatMessage]
#工具调用事件:包含工具调用信息
class ToolCallEvent(Event):
tool\_calls: list[ToolSelection]
#工具输出事件:包含工具输出信息
class FunctionOutputEvent(Event):
output: ToolOutput
【ReAct Agent初始化】
工作流初始化,主要是为了给智能体准备必备的“工具”,最重要的就是智能体需要的几大件: LLM大模型、Memory记忆、以及可以使用的Tools工具 。
from typing import Any, List
from llama\_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama\_index.core.agent.react.types import ActionReasoningStep,ObservationReasoningStep
from llama\_index.core.llms.llm import LLM
from llama\_index.core.memory import ChatMemoryBuffer
from llama\_index.core.tools.types import BaseTool
from llama\_index.core.workflow import Context,Workflow,StartEvent,StopEvent,step
from llama\_index.llms.openai import OpenAI
class ReActAgent(Workflow):
def \_\_init\_\_(
self,
*args: Any,
llm: LLM | None = None,
tools: list[BaseTool] | None = None,
extra\_context: str | None = None,
**kwargs: Any,
) -> None:
super().\_\_init\_\_(*args, **kwargs)
#可用的工具tools
self.tools = tools or []
#使用的LLM(大模型)
self.llm = llm or OpenAI()
#持久记忆
self.memory = ChatMemoryBuffer.from\_defaults(llm=llm)
#用来把历史对话、已有的推理历史格式化成下一次LLM的输入消息历史
self.formatter = ReActChatFormatter(context=extra\_context or "")
#解析LLM的输出(直接回答、使用工具、使用工具后回答)
self.output\_parser = ReActOutputParser()
#保存工具调用输出
self.sources = []
这里有两个辅助工具:
- formatter :用来把保存在memory中的对话历史以及推理历史,格式化成LLM输入的消息格式(通常是一个包含role与content属性的对象列表);还要附加上引导LLM进行思考的系统指令。
- out_parser :解析LLM输出的解析器。在ReAct模式下,LLM的输出可能是类似Thought...Action...Action Input...这样的推理结果,需要对这样的输出进行解析,以决定下一步是使用工具还是输出答案。
【用户输入消息处理:new_user_msg】
这是一次性的步骤,简单的把输入问题/任务放入memory即可:
@step
async def new\_user\_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
"""
流程入口: 接受用户输入, 并放置到Memory中; 并触发下一步
"""
self.sources = []
user\_input = ev.input
user\_msg = ChatMessage(role="user", content=user\_input)
self.memory.put(user\_msg)
await ctx.set("current\_reasoning", [])
return PrepEvent()
【LLM输入准备:prepare_chat_history 】
在这个步骤中,利用上面初始化的formatter,把对话历史与推理历史格式化,用来输入给LLM做推理。注意 在一次任务中,这个步骤有可能会被多次循环调用,除非输入问题被LLM直接回答(无需借助工具)。
@step
async def prepare\_chat\_history(
self, ctx: Context, ev: PrepEvent
) -> InputEvent:
"""
将对话与推理历史组装成LLM的输入消息列表(通常是角色+内容)。
推理历史包括:
1. LLM输出的推理结果(直接回答问题、需要工具调用、观察工具调用结果后可以回答)
2. 工具调用的结果
"""
#获取历史消息
chat\_history = self.memory.get()
print(f'\n------------当前消息历史------------')
for idx, message in enumerate(chat\_history, start=1):
print(f'\n{idx}. {message}')
current\_reasoning = await ctx.get("current\_reasoning", default=[])
print('\n-------------当前推理历史------------')
for idx, reasoning in enumerate(current\_reasoning, start=1):
print(f'\n{idx}. {reasoning}')
#将历史用户消息与推理历史组装成列表
llm\_input = self.formatter.format(
self.tools, chat\_history, current\_reasoning=current\_reasoning
)
return InputEvent(input=llm\_input)
【LLM调用:handle_llm_input 】
使用上一步骤准备的输入内容,调用LLM,并解析结果。以决定下一步动作(返回不同的事件),具体可以参考下面的代码及注释:
@step
async def handle\_llm\_input(
self, ctx: Context, ev: InputEvent
) -> ToolCallEvent | StopEvent:
"""
调用LLM;
解析输出结果, 获得推理结果;
判断是结束(可以回答问题), 还是需要调用工具;
"""
chat\_history = ev.input
#调用LLM
response = await self.llm.achat(chat\_history)
try:
#解析输出的推理结果
reasoning\_step = self.output\_parser.parse(response.message.content)
(await ctx.get("current\_reasoning", default=[])).append(
reasoning\_step
)
#如果已经结束:输出结果,流程结束(可立即回答,或者观察工具调用结果后可以回答)
if reasoning\_step.is\_done:
self.memory.put(
ChatMessage(
role="assistant", content=reasoning\_step.response
)
)
return StopEvent(
result={
"response": reasoning\_step.response,
"sources": [*self.sources],
"reasoning": await ctx.get(
"current\_reasoning", default=[]
),
}
)
#如果无法回答,需要调用工具
elif isinstance(reasoning\_step, ActionReasoningStep):
tool\_name = reasoning\_step.action
tool\_args = reasoning\_step.action\_input
return ToolCallEvent(
tool\_calls=[
ToolSelection(
tool\_id="",
tool\_name=tool\_name,
tool\_kwargs=tool\_args,
)
]
)
except Exception as e:
(await ctx.get("current\_reasoning", default=[])).append(
ObservationReasoningStep(
observation=f"There was an error in parsing my reasoning: {e}"
)
)
# 其他情况则进行下一次迭代,继续尝试
return PrepEvent()
【工具调用:handle_tool_calls 】
这是智能体使用外部工具(Tools)的关键步骤。根据上一步骤LLM输出的工具调用需求,调用外部工具(可能有多次调用),并把返回结果放在推理历史中,用于下一次迭代。
@step
async def handle\_tool\_calls(
self, ctx: Context, ev: ToolCallEvent
) -> PrepEvent:
"""
工具调用,将调用结果作为LLM的观察对象;
并将观察内容添加到推理历史
"""
tool\_calls = ev.tool\_calls
tools\_by\_name = {tool.metadata.get\_name(): tool for tool in self.tools}
# 工具调用
for tool\_call in tool\_calls:
tool = tools\_by\_name.get(tool\_call.tool\_name)
if not tool:
(await ctx.get("current\_reasoning", default=[])).append(
ObservationReasoningStep(
observation=f"Tool {tool\_call.tool\_name} does not exist"
)
)
continue
try:
#调用工具,并将工具调用结果作为观察对象,添加到推理历史
tool\_output = tool(**tool\_call.tool\_kwargs)
self.sources.append(tool\_output)
(await ctx.get("current\_reasoning", default=[])).append(
ObservationReasoningStep(observation=tool\_output.content)
)
except Exception as e:
(await ctx.get("current\_reasoning", default=[])).append(
ObservationReasoningStep(
observation=f"Error calling tool {tool.metadata.get\_name()}: {e}"
)
)
# 进入下一次迭代
return PrepEvent()
【测试实现的ReAct Agent 】
现在,整个ReAct Agent的工作流就完成了,过程还是比较简单清晰的。当然这里也会利用到一些LlamaIndex提供的组件,比如用来封装LLM推理结果的xxxReasoningStep组件等。
我们来测试这个ReAct Agent组件,准备两个模拟工具(利用LlamaIndex中的FunctionTool快速构造基于函数的工具),然后创建一个Agent :
from llama\_index.core.tools import BaseTool, FunctionTool
#模拟发送邮件
def send\_email(subject: str, message: str, email: str) -> None:
"""用于发送电子邮件"""
print(f"邮件已发送至 {email},主题为 {subject},内容为 {message}")
tool\_send\_mail = FunctionTool.from\_defaults(fn=send\_email,name='tool\_send\_mail',description='用于发送电子邮件')
#模拟客户查询
def query\_customer(phone: str) -> str:
"""用于查询客户信息"""
result = f"该客户信息为:\n姓名: 张三\n积分: 50000分\n邮件: test@gmail.com"
return result
tool\_customer = FunctionTool.from\_defaults(fn=query\_customer,name='tool\_customer',description='用于查询客户信息,包括姓名、积分与邮件')
agent = ReActAgent(
llm=OpenAI(model="gpt-4o-mini"), tools=[tool\_send\_mail,tool\_customer], timeout=120, verbose=True
)
现在调用这个Agent,我们发出一个比较复杂的请求,来看看会发生什么:
async def main():
ret = await agent.run(input="给客户13688888888发电子邮件,通知他最新的积分")
print(ret["response"])
if \_\_name\_\_ == "\_\_main\_\_":
import asyncio
asyncio.run(main())
这里的任务很显然需要借助两次工具调用,一次查询客户信息,一次发送邮件,我们从输出中来观察最后一次的迭代信息:
注意到这里的推理历史,完整的反应了LLM的“思考”过程:首先需要查询客户信息(调用tool_customer);然后观察到客户信息(工具调用结果);判断需要发送邮件(调用tool_send_mail);最后观察返回结果后结束流程。这是一个完整的符合ReAct模式(推理-行动-观察)的工作流,也证明了这里基于Workflows构建的ReAct Agent的可用性。
04
结束语
至此我们对LlamaIndex所推出的新特性Workflows已经有了较为全面的认识,很显然,这是一个与LangChain的LangGraph相似的另一种智能体开发底层框架,两者都面向复杂的智能体/RAG应用工作流,但又采取了不同的设计思想,至于哪个更好或许是见仁见智的问题,但对于大量LLM应用的开发者来说,的确又多了一个强大的工具,相信随着后续的迭代,Workflows也会越来越强大。
END
点击下方关注我,不迷路
交流请识别以下名片并说明来源