深入解析LlamaIndex Workflows【下篇】:实现ReAct模式AI智能体的新方法

大模型向量数据库NoSQL数据库

picture.image

picture.image

点击上方蓝字关注我们

之前我们介绍了来自LLM开发框架LlamaIndex的新特性:Workflows,一种事件驱动、用于构建复杂AI工作流应用的新方法(参考: 深入解析LlamaIndex Workflows:构建复杂RAG与智能体工作流的新利器【上篇】 )。在本篇中,我们将继续学习如何基于Workflows来构建一个ReAct模式的AI智能体。尽管在LlamaIndex框架中已经提供了开箱即用的ReActAgent组件,但通过Workflows来从零构建ReAct智能体,可以更深入的了解ReAct智能体的内部原理,在未来帮助实现更底层、更灵活的控制能力。

01

ReAct Agent再回顾

很多人都对ReAct智能体有所了解,在LlamaIndex与LangChain框架中也都有现成的ReActAgent封装组件,可以开箱即用的构建ReAct模式的AI智能体。

ReAct模式的AI智能体采用迭代式的推理(Reasoning)到行动(Acting)的工作流程,旨在应对更复杂的人工任务和问题。它通过将推理步骤与实际行动相结合,使得智能体可以逐步理解任务、采取行动,并观察行动获得的新信息以推理后续步骤。过程大致如下:

  1. 推理 :智能体会分析任务与环境、推理步骤、决定下一步行动
  2. 行动 :调用外部工具,如搜索、执行计算、与外部API交互等
  3. 观察并循环 :观察行动结果,推理后续步骤,调整策略,直至任务完成

ReAct模式具备很好的动态性,使得AI能够应对复杂和未知情况,适用于更开放性的问题和探索性的任务,展现出更高的自主决策智能。

picture.image

ReAct Agent基本构成

02

设计ReAct Agent工作流

根据ReAct智能体的基本思想,其工作流中最核心的步骤(step)应该包括:

  1. 将输入问题(或任务)、已有对话、工具信息、已经获得的信息(即已调用工具的返回内容)等输入LLM,让LLM推理下一步动作
  2. 如果此时LLM可以回答,则直接给出答案,结束流程
  3. 如果此时LLM无法回答,则给出使用工具的信息(工具名、输入参数等)
  4. 如果需要使用工具,则根据第3步给出的信息进行工具调用,并获得返回
  5. 循环 到第一步,进行迭代,直到在第2步能够完成任务;

基于LlamaIndex Workflows开发ReAct Agent的工作流程图如下:

picture.image

现在可以参考这个工作流来实现ReAct智能体,这里基于官方的样例进行讲解。

03

基于Workflows实现ReAct Agent

【定义Event】

参考上面的工作流图,定义几个需要的Event类型:


        
            

          from llama\_index.core.llms import ChatMessage  
from llama\_index.core.tools import ToolSelection, ToolOutput  
from llama\_index.core.workflow import Event  
import os  
  
#通知事件  
class PrepEvent(Event):  
    pass  
  
#LLM输入事件:包含输入LLM的历史消息  
class InputEvent(Event):  
    input: list[ChatMessage]  
  
#工具调用事件:包含工具调用信息  
class ToolCallEvent(Event):  
    tool\_calls: list[ToolSelection]  
  
#工具输出事件:包含工具输出信息  
class FunctionOutputEvent(Event):  
    output: ToolOutput
        
      

【ReAct Agent初始化】

工作流初始化,主要是为了给智能体准备必备的“工具”,最重要的就是智能体需要的几大件: LLM大模型、Memory记忆、以及可以使用的Tools工具


        
            

          from typing import Any, List  
from llama\_index.core.agent.react import ReActChatFormatter, ReActOutputParser  
from llama\_index.core.agent.react.types import ActionReasoningStep,ObservationReasoningStep  
from llama\_index.core.llms.llm import LLM  
from llama\_index.core.memory import ChatMemoryBuffer  
from llama\_index.core.tools.types import BaseTool  
from llama\_index.core.workflow import Context,Workflow,StartEvent,StopEvent,step  
from llama\_index.llms.openai import OpenAI  
  
class ReActAgent(Workflow):  
    def \_\_init\_\_(  
        self,  
        *args: Any,  
        llm: LLM | None = None,  
        tools: list[BaseTool] | None = None,  
        extra\_context: str | None = None,  
        **kwargs: Any,  
    ) -> None:  
        super().\_\_init\_\_(*args, **kwargs)  
  
        #可用的工具tools  
        self.tools = tools or []  
  
        #使用的LLM(大模型)  
        self.llm = llm or OpenAI()  
  
        #持久记忆  
        self.memory = ChatMemoryBuffer.from\_defaults(llm=llm)  
  
        #用来把历史对话、已有的推理历史格式化成下一次LLM的输入消息历史  
        self.formatter = ReActChatFormatter(context=extra\_context or "")  
  
        #解析LLM的输出(直接回答、使用工具、使用工具后回答)  
        self.output\_parser = ReActOutputParser()  
  
        #保存工具调用输出  
        self.sources = []
        
      

这里有两个辅助工具:

  • formatter :用来把保存在memory中的对话历史以及推理历史,格式化成LLM输入的消息格式(通常是一个包含role与content属性的对象列表);还要附加上引导LLM进行思考的系统指令。
  • out_parser :解析LLM输出的解析器。在ReAct模式下,LLM的输出可能是类似Thought...Action...Action Input...这样的推理结果,需要对这样的输出进行解析,以决定下一步是使用工具还是输出答案。

【用户输入消息处理:new_user_msg】

这是一次性的步骤,简单的把输入问题/任务放入memory即可:


        
            

              @step  
    async def new\_user\_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:  
  
        """  
        流程入口: 接受用户输入, 并放置到Memory中; 并触发下一步  
        """  
  
        self.sources = []  
  
        user\_input = ev.input  
        user\_msg = ChatMessage(role="user", content=user\_input)  
        self.memory.put(user\_msg)  
  
        await ctx.set("current\_reasoning", [])  
        return PrepEvent()
        
      

【LLM输入准备:prepare_chat_history

在这个步骤中,利用上面初始化的formatter,把对话历史与推理历史格式化,用来输入给LLM做推理。注意 在一次任务中,这个步骤有可能会被多次循环调用,除非输入问题被LLM直接回答(无需借助工具)。


        
            

             @step  
    async def prepare\_chat\_history(  
        self, ctx: Context, ev: PrepEvent  
    ) -> InputEvent:  
          
        """  
        将对话与推理历史组装成LLM的输入消息列表(通常是角色+内容)。  
        推理历史包括:  
        1. LLM输出的推理结果(直接回答问题、需要工具调用、观察工具调用结果后可以回答)  
        2. 工具调用的结果  
        """  
  
        #获取历史消息  
        chat\_history = self.memory.get()  
  
        print(f'\n------------当前消息历史------------')  
        for idx, message in enumerate(chat\_history, start=1):  
            print(f'\n{idx}. {message}')  
  
        current\_reasoning = await ctx.get("current\_reasoning", default=[])  
        print('\n-------------当前推理历史------------')  
        for idx, reasoning in enumerate(current\_reasoning, start=1):  
            print(f'\n{idx}. {reasoning}')  
  
        #将历史用户消息与推理历史组装成列表  
        llm\_input = self.formatter.format(  
            self.tools, chat\_history, current\_reasoning=current\_reasoning  
        )  
  
        return InputEvent(input=llm\_input)
        
      

【LLM调用:handle_llm_input

使用上一步骤准备的输入内容,调用LLM,并解析结果。以决定下一步动作(返回不同的事件),具体可以参考下面的代码及注释:


        
            

            
          @step  
    async def handle\_llm\_input(  
        self, ctx: Context, ev: InputEvent  
    ) -> ToolCallEvent | StopEvent:  
          
        """  
        调用LLM;   
        解析输出结果, 获得推理结果;  
        判断是结束(可以回答问题), 还是需要调用工具;  
        """  
  
        chat\_history = ev.input  
  
        #调用LLM  
        response = await self.llm.achat(chat\_history)  
  
        try:  
  
            #解析输出的推理结果  
            reasoning\_step = self.output\_parser.parse(response.message.content)  
            (await ctx.get("current\_reasoning", default=[])).append(  
                reasoning\_step  
            )  
  
            #如果已经结束:输出结果,流程结束(可立即回答,或者观察工具调用结果后可以回答)  
            if reasoning\_step.is\_done:  
                self.memory.put(  
                    ChatMessage(  
                        role="assistant", content=reasoning\_step.response  
                    )  
                )  
                return StopEvent(  
                    result={  
                        "response": reasoning\_step.response,  
                        "sources": [*self.sources],  
                        "reasoning": await ctx.get(  
                            "current\_reasoning", default=[]  
                        ),  
                    }  
                )  
              
            #如果无法回答,需要调用工具  
            elif isinstance(reasoning\_step, ActionReasoningStep):  
                tool\_name = reasoning\_step.action  
                tool\_args = reasoning\_step.action\_input  
                return ToolCallEvent(  
                    tool\_calls=[  
                        ToolSelection(  
                            tool\_id="",  
                            tool\_name=tool\_name,  
                            tool\_kwargs=tool\_args,  
                        )  
                    ]  
                )  
        except Exception as e:  
            (await ctx.get("current\_reasoning", default=[])).append(  
                ObservationReasoningStep(  
                    observation=f"There was an error in parsing my reasoning: {e}"  
                )  
            )  
  
        # 其他情况则进行下一次迭代,继续尝试  
        return PrepEvent()
        
      

【工具调用:handle_tool_calls

这是智能体使用外部工具(Tools)的关键步骤。根据上一步骤LLM输出的工具调用需求,调用外部工具(可能有多次调用),并把返回结果放在推理历史中,用于下一次迭代。


        
            

             @step  
    async def handle\_tool\_calls(  
        self, ctx: Context, ev: ToolCallEvent  
    ) -> PrepEvent:  
          
        """  
        工具调用,将调用结果作为LLM的观察对象;  
        并将观察内容添加到推理历史  
        """  
  
        tool\_calls = ev.tool\_calls  
        tools\_by\_name = {tool.metadata.get\_name(): tool for tool in self.tools}  
  
        # 工具调用  
        for tool\_call in tool\_calls:  
            tool = tools\_by\_name.get(tool\_call.tool\_name)  
            if not tool:  
                (await ctx.get("current\_reasoning", default=[])).append(  
                    ObservationReasoningStep(  
                        observation=f"Tool {tool\_call.tool\_name} does not exist"  
                    )  
                )  
                continue  
  
            try:  
  
                #调用工具,并将工具调用结果作为观察对象,添加到推理历史  
                tool\_output = tool(**tool\_call.tool\_kwargs)  
                self.sources.append(tool\_output)  
                (await ctx.get("current\_reasoning", default=[])).append(  
                    ObservationReasoningStep(observation=tool\_output.content)  
                )  
            except Exception as e:  
                (await ctx.get("current\_reasoning", default=[])).append(  
                    ObservationReasoningStep(  
                        observation=f"Error calling tool {tool.metadata.get\_name()}: {e}"  
                    )  
                )  
  
        # 进入下一次迭代  
        return PrepEvent()
        
      

【测试实现的ReAct Agent

现在,整个ReAct Agent的工作流就完成了,过程还是比较简单清晰的。当然这里也会利用到一些LlamaIndex提供的组件,比如用来封装LLM推理结果的xxxReasoningStep组件等。

我们来测试这个ReAct Agent组件,准备两个模拟工具(利用LlamaIndex中的FunctionTool快速构造基于函数的工具),然后创建一个Agent


        
            

          from llama\_index.core.tools import BaseTool, FunctionTool   
  
#模拟发送邮件  
def send\_email(subject: str, message: str, email: str) -> None:  
    """用于发送电子邮件"""  
    print(f"邮件已发送至 {email},主题为 {subject},内容为 {message}")  
  
tool\_send\_mail = FunctionTool.from\_defaults(fn=send\_email,name='tool\_send\_mail',description='用于发送电子邮件')  
  
#模拟客户查询  
def query\_customer(phone: str) -> str:  
    """用于查询客户信息"""  
    result = f"该客户信息为:\n姓名: 张三\n积分: 50000分\n邮件: test@gmail.com"  
    return result  
  
tool\_customer = FunctionTool.from\_defaults(fn=query\_customer,name='tool\_customer',description='用于查询客户信息,包括姓名、积分与邮件')  
  
agent = ReActAgent(  
    llm=OpenAI(model="gpt-4o-mini"), tools=[tool\_send\_mail,tool\_customer], timeout=120, verbose=True  
)
        
      

现在调用这个Agent,我们发出一个比较复杂的请求,来看看会发生什么:


        
            

          async def main():  
    ret = await agent.run(input="给客户13688888888发电子邮件,通知他最新的积分")  
    print(ret["response"])  
  
if \_\_name\_\_ == "\_\_main\_\_":   
    import asyncio  
    asyncio.run(main())
        
      

这里的任务很显然需要借助两次工具调用,一次查询客户信息,一次发送邮件,我们从输出中来观察最后一次的迭代信息:

picture.image

注意到这里的推理历史,完整的反应了LLM的“思考”过程:首先需要查询客户信息(调用tool_customer);然后观察到客户信息(工具调用结果);判断需要发送邮件(调用tool_send_mail);最后观察返回结果后结束流程。这是一个完整的符合ReAct模式(推理-行动-观察)的工作流,也证明了这里基于Workflows构建的ReAct Agent的可用性。

04

结束语

至此我们对LlamaIndex所推出的新特性Workflows已经有了较为全面的认识,很显然,这是一个与LangChain的LangGraph相似的另一种智能体开发底层框架,两者都面向复杂的智能体/RAG应用工作流,但又采取了不同的设计思想,至于哪个更好或许是见仁见智的问题,但对于大量LLM应用的开发者来说,的确又多了一个强大的工具,相信随着后续的迭代,Workflows也会越来越强大。

picture.image

END

点击下方关注我,不迷路

交流请识别以下名片并说明来源

picture.image

0
0
0
0
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论