解决多Agent信息传递难题:AgentScope实现分布式架构下的高效协作与优化跨机器通信与任务调度,扩展性提升

大模型向量数据库云通信
1.AgentScope 简介

AgentScope是通义实验室开源的multi-agent编程框架,专为开发人员设计,提供了丰富的组件, 全面的文档和广泛的兼容性。同时,AgentScope Workstation提供了在线拖拉拽编程和在线小助手(copilot)功能,帮助开发者迅速上手!支持自定义的容错控制和重试机制,以提高应用程序的稳定性,支持以中心化的方式构建分布式多智能体应用程序。

picture.image

AgentScope 的多智能体平台,由阿里巴巴团队开发。随着大语言模型(LLMs)的快速发展,多智能体应用(即多个智能体协作完成任务)取得了显著进展。然而,开发健壮、高效的多智能体应用面临诸多挑战,例如:

  • 协调复杂性

: 管理多个智能体的协作、通信和工作流程非常复杂。

  • LLM 不稳定性

: LLMs 可能产生幻觉或错误,这些错误会在多智能体系统中传播。

  • 集成难度

: 将多模态数据(图像、音频等)、外部工具(如搜索、代码执行)和知识库(RAG)集成到多智能体应用中是系统性的挑战。

  • 分布式部署

: 在不同机器上部署和管理智能体,并保证效率和可靠性,对开发者要求很高。

picture.image

2.Agent 应用构建

2.1 Agent 内逻辑

picture.image

2.2 Agnet通信

Agnet通信四种模式:顺序、循环、条件、复合

picture.image

  • 挑战一:用户需指定每一次信息交换(怎么传)

picture.image

  • 挑战二:信息传递的目标不容易控制(传给谁)

为了简化广播消息的操作,AgentScope 提供了一个 msghub 模块。 具体来说,在同一个 msghub 中的智能体会自动接收其它参与者发送的消息。 我们只需要组织发言的顺序,而不需要显式地将消息传递给其它智能体。

picture.image

  • 例子展示
  
from agentscope.agents import DialogAgent, UserAgent  
from agentscope.message import Msg  
from agentscope import msghub  
import agentscope  
  
agentscope.init(model\_configs="agentscope/agentscope/examples/test/model\_configs.json")  
  
  
alice = DialogAgent(  
    name="Alice",  
    sys\_prompt="你是一个名叫Alice的助手。",  
    model\_config\_name="my-qwen-max",  
)  
  
bob = DialogAgent(  
    name="Bob",  
    sys\_prompt="你是一个名叫Bob的助手。",  
    model\_config\_name="my-qwen-max",  
)  
  
charlie = DialogAgent(  
    name="Charlie",  
    sys\_prompt="你是一个名叫Charlie的助手。",  
    model\_config\_name="my-qwen-max",  
)  
  
# 介绍对话规则  
greeting = Msg(  
    name="user",  
    content="现在开始从1开始逐个报数,每次只产生一个数字,绝对不能产生其他任何信息。",  
    role="user",  
)  
  
with msghub(  
    participants=[alice, bob, charlie],  #参与者  
    announcement=greeting,  # 开始时,通知消息将广播给所有参与者--公告  
) as hub:  
# 对话的第一轮  
    alice()  
    bob()  
    charlie()  
  
# 我们可以动态管理参与者,例如从对话中删除一个智能体  
    hub.delete(charlie)  
  
# 向所有参与者广播一条消息  
    hub.broadcast(  
        Msg(  
"user",  
"Charlie已离开对话。",  
"user",  
        ),  
    )  
  
# 对话的第二轮  
    alice()  
    bob()  
    charlie()  
  
print("Alice的记忆:")  
for msg in alice.memory.get\_memory():  
    print(f"{msg.name}:{msg.content}")  
  
print("\nCharlie的记忆:")  
for msg in charlie.memory.get\_memory():  
    print(f"{msg.name}:{msg.content}")  

  
[93mAlice[0m: 1  
[91mBob[0m: 2  
[90mCharlie[0m: 3  
[93mAlice[0m: 4  
[91mBob[0m: 5  
[90mCharlie[0m: 4  
  
  
Alice的记忆:  
user:现在开始从1开始逐个报数,每次只产生一个数字,绝对不能产生其他任何信息。  
Alice:1  
Bob:2  
Charlie:3  
user:Charlie已离开对话。  
Alice:4  
Bob:5  
  
Charlie的记忆:  
user:现在开始从1开始逐个报数,每次只产生一个数字,绝对不能产生其他任何信息。  
Alice:1  
Bob:2  
Charlie:3  
Charlie:4  

2.3 Agent 二开

AgentScope 内置若干智能体类,以支持不同使用场景,同时展示如何使用 AgentScope 构建智能体

更多内容参考:https://doc.agentscope.io/zh*CN/build*tutorial/builtin\_agent.html

picture.image

描述
UserAgent允许用户参与对话的智能体。
DialogAgent使用自然语言交谈的智能体。
DictDialogAgent支持结构化输出的智能体。
ReActAgent以 reasoning-acting 循环的方式使用工具的智能体。
LlamaIndexAgent检索增强型生成 (RAG) 智能体。
  
import agentscope  
  
for module in agentscope.agents.\_\_all\_\_:  
if module.endswith("Agent"):  
        print(module)  
  
DialogAgent  
DictDialogAgent  
UserAgent  
ReActAgent  
LlamaIndexAgent  

为了使同一个智能体类能够支持不同的大语言模型 API,所有内置智能体类都通过模型配置名 modelconfigname 来进行初始化。如果你构建的智能体不打算多个不同的模型,推荐可以显式地进行模型初始化,而不是使用模型配置名。

  
import agentscope  
  
agentscope.init(  
    model\_configs=[  
        {  
"config\_name": "gpt-4o\_temperature-0.5",  
"model\_type": "openai\_chat",  
"model\_name": "gpt-4o",  
"api\_key": "xxx",  
"temperature": 0.5,  
        },  
        {  
"config\_name": "my-qwen-max",  
"model\_type": "dashscope\_chat",  
"model\_name": "qwen-max",  
        },  
    ],  
)  

picture.image

  1. AgentScope 分布式 =================

现实生活中存在众多Agent分布在不同机器上的场景

  • Agent数量超过单机负载
  • 大规模仿真
  • 有多个用户参与
  • 多人在线游戏
  • 涉及到隐私或机密数据
  • 私人Al助理
  • 分布式能够提供更好的性能
  • 更好的扩展性
  • 借助并行加速

3.1分布式挑战

  • 通信机制设计
  • 并行任务调度
  • 降低编程门槛
  • 避免额外开发

主要特性

  • HTTP/2

:gRPC默认使用HTTP/2作为传输协议,相比HTTP/1.1,提供了多项性能提升,如多路复用、头部压缩和服务器推送等。

  • Protocol Buffers

:用于定义服务接口和数据类型。Protocol Buffers是Google开发的一种轻量、高效的结构化数据存储格式,类似于XML或JSON,但更小、更快、更简单。

  • 双向流

:gRPC支持四种模式的数据交换方式:简单RPC(Unary RPC)、服务器流式RPC(Server streaming RPC)、客户端流式RPC(Client streaming RPC)以及双向流式RPC(Bidirectional streaming RPC),这为开发者提供了极大的灵活性。

  • 强大的IDL(Interface Definition Language)

:通过.proto文件定义服务接口和消息格式,gRPC自动生成客户端和服务端代码,大大简化了开发流程。

  • 跨语言支持

:gRPC的一大亮点是其对多种编程语言的支持,允许使用不同编程语言编写的服务之间进行通信。picture.image

3.2 AgentScope 分布式demo

AgentScope 原生提供了基于 gRPC 的分布式模式, 在这种模式下,一个应用程序中的多个智能体可以部署到不同的进程或者甚至不同的机器上,从而充分利用计算资源,提高效率。

与传统模式相比,AgentScope 的分布式模式不需要修改主进程代码。只需在初始化智能体时调用 to_dist 函数。为了展示 AgentScope 分布式模式带来的加速效果,示例中将自定义一个 WebAgent 类,在该类型将等待5秒来模拟抓取网页和从中寻找答案的过程。

执行搜索的过程是 run 函数。传统模式和分布式模式之间唯一的区别在于初始化阶段,即 initwithoutdist 和 initwithdist。 在分布式模式下,您需要调用 to_dist 函数,将原始智能体转换为对应的分布式版本。

  
import time  
import agentscope  
from agentscope.agents import AgentBase  
from agentscope.message import Msg  
  
classWebAgent(AgentBase):  
  
def\_\_init\_\_(self, name):  
        super().\_\_init\_\_(name)  
  
defget\_answer(self, url: str, query: str):  
        time.sleep(5)  
returnf"来自 {self.name} 的答案"  
  
defreply(self, x: dict = None) -> dict:  
return Msg(  
            name=self.name,  
            content=self.get\_answer(x.content["url"], x.content["query"])  
        )  
  
  
QUERY = "示例查询"  
URLS = ["页面\_1", "页面\_2", "页面\_3", "页面\_4", "页面\_5"]  
  
definit\_without\_dist():  
return [WebAgent(f"W{i}") for i in range(len(URLS))]  
  
  
definit\_with\_dist():  
return [WebAgent(f"W{i}").to\_dist() for i in range(len(URLS))]  
  
  
defrun(agents):  
    start = time.time()  
    results = []  
for i, url in enumerate(URLS):  
        results.append(agents[i].reply(  
            Msg(  
                name="system",  
                role="system",  
                content={  
"url": url,  
"query": QUERY  
                }  
            )  
        ))  
for result in results:  
        print(result.content)  
    end = time.time()  
return end - start  
  
  
if \_\_name\_\_ == "\_\_main\_\_":  
    agentscope.init()  
    start = time.time()  
    simple\_agents = init\_without\_dist()  
    dist\_agents = init\_with\_dist()  
    end = time.time()  
    print(f"初始化时间:{end - start}")  
    print(f"无分布式模式下的运行时间:{run(simple\_agents)}")  
    print(f"分布式模式下的运行时间:{run(dist\_agents)}")  

  
来自 W0 的答案  
来自 W1 的答案  
来自 W2 的答案  
来自 W3 的答案  
来自 W4 的答案  
无分布式模式下的运行时间:25.0039165019989  
  
来自 W0 的答案  
来自 W1 的答案  
来自 W2 的答案  
来自 W3 的答案  
来自 W4 的答案  
分布式模式下的运行时间:5.011251211166382  

  • 主进程:AgentScope应用程序所在的进程被称为主进程。例如,上例中的 run 函数就是在主进程中运行的。每个 AgentScope 应用程序只有一个主进程。

  • 智能体服务器进程:AgentScope智能体服务器进程是智能体在分布式模式下运行的进程。例如,在上例中,dist_agents 中的所有智能体都在智能体服务器进程中运行。可以有多个AgentScope智能体服务器进程。这些进程可以运行在任何可网络访问的机器上,每个智能体服务器进程中可以同时运行多个智能体。

  • 子进程模式:在子进程模式下,智能体服务器进程由主进程启动为子进程。例如,在上例中,distagents 中的每个智能体实际上都是主进程的一个子进程。这是默认模式,也就是说,如果您直接调用 todist 函数而不传入任何参数,它将使用此模式。

  • 独立进程模式:在独立进程模式下,智能体服务器与主进程无关,需要预先在机器上启动。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
CV 技术在视频创作中的应用
本次演讲将介绍在拍摄、编辑等场景,我们如何利用 AI 技术赋能创作者;以及基于这些场景,字节跳动积累的领先技术能力。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论