点击上方👆蓝字关注我们!
简介
开源地址在:eino-deer-flow-go:https://github.com/cloudwego/eino-examples/tree/main/flow/agent/deer-go
DeerFlow(Deep Exploration and Efficient Research Flow)是字节跳动开源的一个深度研究框架,旨在将大语言模型(LLM)与专业工具(如网络搜索、爬虫和 Python 代码执行)相结合,用于自动化研究和内容创作。
据 DeerFlow 作者所述,DeerFlow 内部实现非常简单,很适合小白。所以我尝试着用 Go 语言进行复现。事实上,基于 Eino 在短期内就可以实现一整套 DeerFlow,我称之为 Deer-Go。Deer-Go 的代码已经开源,可以随时在 eino-deer-flow-go: https://github.com/cloudwego/eino-examples/tree/main/flow/agent/deer-go 查阅,欢迎大家一起提交贡献,共建 DeerFlow 和 Eino 社区。
当然,在用 Eino 实现 Deer-Go 的过程中,遇到一些比较常见的问题,本文重点帮助大家在实战中解决以下问题:
-
Eino 中的 Multi-Agent 架构要如何设计与实现?
-
Deer-Go 中子 Agent 的内部功能如何实现?
-
Eino 中如何实现 Interrupt?如何在 Interrupt 后进行恢复?
-
如何用 Eino + Hertz 实现前后端 SSE 流式输出,最佳实践是什么?
Eino 中实现 Multi-Agent
Deer-Go 的架构图完全参考 DeerFlow。DeerFlow 中,Agent 通信有两类:
- 控制权的转移:比如 Coordinator 将用户的信息传达给 Planner,让 Planner 制定研究规划。
- 数据状态的共享:比如 Planner 生成的规划传达给 Research Team;Research Team 将调研结果交给 Reporter 进行总结。
但是在具体实现中,由于 Eino 要求 flow 中每个节点的输入必须来自上一个节点的输出,且二者的类型需要保持一致。所以在 Deer-Go 中,控制权的转移用 Eino 的 flow 中节点的输入输出来实现,而数据状态的共享用全局状态(State)来实现。
具体实现是:
每个子 Agent 都是一个子图节点,子图内部是一条完整的 flow,每个节点的输入来自上一个节点的输出。主要分为三块功能区:
- load节点:用于加载该节点需要的 prompt、工具等信息
- llm 节点:用于将 load 节点的输入放入 llm 节点中
- router 节点:用于处理 llm 节点的输出,并根据全局状态和输出来决策下一个 Agent 的 name
子图之间,由Eino提供的分支节点(Branch Node)来转移控制权,如下图所示:
具体实现代码如下:
// 在主图中新增Coordinator和Planner节点
\_ = g.AddGraphNode(consts.Coordinator, coordinatorGraph, compose.WithNodeName(consts.Coordinator))
\_ = g.AddGraphNode(consts.Planner, plannerGraph, compose.WithNodeName(consts.Planner))
// 在Coordinator和Planner节点后添加分支节点
\_ = g.AddBranch(consts.Coordinator, compose.NewGraphBranch(agentHandOff, outMap))
\_ = g.AddBranch(consts.Planner, compose.NewGraphBranch(agentHandOff, outMap))
而分支节点与子 Agent 内部的 router 节点配合完成控制权的转移,代码如下,router 节点中,根据配置和大模型的输出来决定控制权要转移到哪个结点,然后将该节点 name 写入全局状态的 Goto 变量中。而 agentHandOff 获取该变量,将控制权交给对应的 Agent。
/ Coordinator的router节点
funcrouter(ctx context.Context, input *schema.Message, opts ...any)(output string, err error) {
err = compose.ProcessState[*model.State](ctx, func(\_ context.Context, state *model.State)error {
deferfunc() {
output = state.Goto
}()
state.Goto = compose.END // 默认转移到结束节点
iflen(input.ToolCalls) > 0 {
...
if state.EnableBackgroundInvestigation {
state.Goto = consts.BackgroundInvestigator // 转移到背调agent
} else {
state.Goto = consts.Planner // 转移到规划器agent
}
}
returnnil
})
return output, nil
}
// 子图流转函数,由上一个子图决定接下来流转到哪个agent
// 并将其name写入 state.Goto ,该函数读取 state.Goto 并将控制权交给对应agent
funcagentHandOff(ctx context.Context, input string)(next string, err error) {
deferfunc() {
ilog.EventInfo(ctx, "agent\_hand\_off", "input", input, "next", next)
}()
\_ = compose.ProcessState[*model.State](ctx, func(\_ context.Context, state *model.State)error {
next = state.Goto
returnnil
})
return next, nil
}
子 Agent 的内部实现
多智能体设计中,最重要的是控制权如何转移。所以每个子 Agent 都必须存在 router 节点。
Interrupt&CheckPoint
在 DeerFlow 中,Planner 在完成规划后,会将规划展示给用户,由用户决定是继续执行该规划,还是要调整该规划。引入 Human-in-the-loop 节点,用户可以轻松地与 AI 协作,确保最终结果完全符合预期。
结合了 Human-in-the-loop 的大模型应用虽然效果更好,但是对应用本身的实现提出更大的挑战。一个最直接的问题:大模型等待用户输入的时候,后端状态是怎样的?
将整个图都存储在内存中并阻塞不失为一个选择,但这在工程部署上,是个最差的选择——如果你服务重启,那么这个会话就会丢失,就算用户审核通过,也无法再继续了。或者类似的请求增多,你的服务器内存就会被这样的请求占满,进而崩溃。
更好的一个方式是,当发生 Interrupt 时,后端将 Graph 的所有状态都持久化存储,等用户交互完成,再将 Graph 进行恢复。所以 Interrupt 总是和 CheckPoint 一起出现。
除了 Interrupt 时,对用户的请求进行分析、排错、定位等情况,我们需要也需要将图的一些状态存入后端数据库。所以这就更需要对全局状态进行持久化操作了。
Interrupt
使用 Interrupt & CheckPoint 功能,可以实现在指定位置暂停 Graph 执行并在之后断点续传,如果是 StateGraph,还可以在断点续传前修改 State。
断点续传仅能复原输入和运行时各节点产生的数据,需要确保 Graph 编排和 CallOptions 完全相同。
Eino 中的 Interrupt 是一种特殊的报错信息。可以参考 Human 节点的 router 子节点代码,发出一个特殊的报错 compose.InterruptAndRerun。
funcrouterHuman(ctx context.Context, input string, opts ...any)(output string, err error) {
err = compose.ProcessState[*model.State](ctx, func(\_ context.Context, state *model.State)error {
deferfunc() {
output = state.Goto
state.InterruptFeedback = ""
}()
state.Goto = consts.ResearchTeam
if !state.AutoAcceptedPlan {
switch state.InterruptFeedback {
case consts.AcceptPlan:
returnnil
case consts.EditPlan:
state.Goto = consts.Planner
returnnil
default:
return compose.InterruptAndRerun // 发出Interrupt
}
}
state.Goto = consts.ResearchTeam
returnnil
})
return output, err
}
可以用 compose.ExtractInterruptInfo 从运行返回的 error 中获得本次运行是否 Interrupt 以及 Interrupt 信息:
result, err := runner.Invoke(ctx, input)
if info, ok := compose.ExtractInterruptInfo(err); ok {
ilog.EventDebug(ctx, "ChatStream\_interrupt", "info", info)
data := &model.ChatResp{
ThreadID: req.ThreadID,
ID: "human\_feedback:" + util.RandStr(20),
Role: "assistant",
Content: "检查计划",
FinishReason: "interrupt",
Options: []map[string]any{
{
"text": "编辑计划",
"value": "edit\_plan",
},
{
"text": "开始执行",
"value": "accepted",
},
},
}
dB, \_ := json.Marshal(data)
w.WriteEvent("", "interrupt", dB)
}
CheckPoint
在发生 Interrupt 的时候,需要将当前图的状态存储起来,等完成用户交互,再将图状态加载出来并继续运行。这个过程就涉及到对图的存取问题。
CheckPointStore 接口
Eino 中对图进行存取操作的接口是 CheckPointStore 接口,CheckPointStore 是一个 key 类型为 string、value 类型为 []byte 的 KV 存储接口,Eino 没有提供封装和默认实现,需要用户自行实现,用来存储 checkpoint。以下是 Deer-Go 的 DeerCheckPoint 实现,非常简单,就是使用一个单实例 deerCheckPoint 变量完成图的存储,变量内部实际是一个 map。在实际的工程中,Get 与 Set 接口应当与后端数据库连接,实现持久化读写。
// DeerCheckPoint DeerGo的全局状态存储点,
// 实现CheckPointStore接口,用checkPointID进行索引
// 此处粗略使用map实现,工程上可以用工业存储组件实现
type DeerCheckPoint struct {
buf map[string][]byte
}
func(dc *DeerCheckPoint)Get(ctx context.Context, checkPointID string)([]byte, bool, error) {
data, ok := dc.buf[checkPointID]
return data, ok, nil
}
func(dc *DeerCheckPoint)Set(ctx context.Context, checkPointID string, checkPoint []byte)error {
dc.buf[checkPointID] = checkPoint
returnnil
}
// 创建一个全局状态存储点实例并返回
var deerCheckPoint = DeerCheckPoint{
buf: make(map[string][]byte),
}
funcNewDeerCheckPoint(ctx context.Context)compose.CheckPointStore {
return &deerCheckPoint
}
Eino 中可以在图编译时使用 compose.WithCheckPointStore 配置,将 CheckPointStore 与图连接起来。
r, err := g.Compile(ctx,
compose.WithGraphName("EinoDeer"),
compose.WithCheckPointStore(model.NewDeerCheckPoint(ctx)),
)
在图运行时使用 compose.WithCheckPointID 配置,输入 CheckPointID,有相同 CheckPointID 的请求,被认为是同一张图,且共享同样的图状态。
\_, err = r.Invoke(ctx, consts.Coordinator,
compose.WithCheckPointID(req.ThreadID),
)
注册序列化方法
CheckPoint 的保存和读取涉及对 Graph 节点输入输出以及 State 的序列化和反序列化,在仅使用简单类型或 Eino 内置类型(比如 Message 或 Document)时,用户无需额外操作;当引入自定义 struct 时,需要提前注册类型。
Deer-Go 中对 State 进行注册的代码如下,同时如果注册的类型实现了 json Marshaler 和 Unmarshaler,此类型的序列化和反序列化会使用自定义方法。
funcinit() {
err := compose.RegisterSerializableType[State]("DeerState")
if err != nil {
panic(err)
}
}
...
func(s *State)MarshalJSON()([]byte, error) {
b, err := json.Marshal(*s)
if err != nil {
returnnil, err
}
return b, nil
}
func(s *State)UnmarshalJSON(b []byte)error {
type Alias State
var tmp Alias
if err := json.Unmarshal(b, &tmp); err != nil {
return err
}
*s = State(tmp)
returnnil
}
与前端交互
与 Hertz 框架结合
Deer-Go 采用 Hertz 框架实现,hertz 官方提供的 sse 包 github.com/cloudwego/hertz/pkg/protocol/sse 也对可以直接使用。以下是 Deer-Go 的 handler 层的逻辑和对应的代码,Eino 中的每个 Node 都包含一些切面,其中常见的切面有 onStart 和 onEnd,分别在 Node 执行前由 Eino 框架进行调用,主要用于输出该 Node 的输入与输出,这样只需要将这些节点的输出写入到 sse writer 中,前端即可收到 sse 流式返回。
callback 机制
Eino 中的 Callback 机制在固定的时机 (Callback Timing),回调用户提供的 function (Callback Handler),并把自己是谁 (RunInfo),以及当时发生了什么 (Callback Input & Output) 传出去。
具体来说,Deer-Go 的 LoggerCallback 实现了以下几个 Handler,OnStart 和 OnEnd 分别是在 Eino 中节点在输入输出时调用,OnStartWithStreamInput 和 OnEndWithStreamOutput 是在流式输入和输出时调用,OnError 在节点报错时调用。
本文的重点是 OnEndWithStreamOutput 函数,因为 Deer-Go 是采用流式输出的,每个流式节点(包括大模型节点)的输出,都需要传递给用户。
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
OnStartWithStreamInput(ctx context.Context, info *RunInfo,
input *schema.StreamReader[CallbackInput]) context.Context
OnEndWithStreamOutput(ctx context.Context, info *RunInfo,
output *schema.StreamReader[CallbackOutput]) context.Context
}
上述 5 个接口分别对应了 5 个 callback 时机 (Callback Timing)
const (
TimingOnStart CallbackTiming = iota// 进入并开始执行
TimingOnEnd // 成功完成即将 return
TimingOnError // 失败并即将 return err
TimingOnStartWithStreamInput // OnStart,但是输入是 StreamReader
TimingOnEndWithStreamOutput // OnEnd,但是输出是 StreamReader)
具体实现
在 Deer-Go 运行 Graph 之前,使用 compose.WithCallbacks 将 LoggerCallback 进行初始化,将前后端的 sse 通道传入自定义的 LoggerCallback 中。
type LoggerCallback struct {
callbacks.HandlerBuilder
ID string
SSE *sse.Writer // 与前端交流的sse通道
}
\_, err = r.Stream(ctx, consts.Coordinator,
compose.WithCallbacks(&infra.LoggerCallback{
SSE: w,
}),
)
OnEndWithStreamOutput 中,对 output 流进行类型断言,并对类型进行拆解分析。在 Deer-Go 中,output 流有以下类型:
- 如果是普通的 msg,则触发 message_chunk 事件
- 如果是工具调用,则触发 tool_calls 事件
- 如果是 string,则说明是子 Agent 在交接控制权
func(cb *LoggerCallback)OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo,
output *schema.StreamReader[callbacks.CallbackOutput])context.Context {
msgID := util.RandStr(20)
gofunc() {
defer output.Close() // remember to close the stream in defer
deferfunc() {
if err := recover(); err != nil {
ilog.EventFatal(ctx, "[OnEndStream]panic\_recover", "msgID", msgID, "err", err)
}
}()
for {
frame, err := output.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
ilog.EventError(ctx, err, "[OnEndStream] recv\_error")
return
}
switch v := frame.(type) {
case *schema.Message:
\_ = cb.pushMsg(ctx, msgID, v)
case *ec\_model.CallbackOutput:
\_ = cb.pushMsg(ctx, msgID, v.Message)
case []*schema.Message:
for \_, m := range v {
\_ = cb.pushMsg(ctx, msgID, m)
}
casestring:
//ilog.EventInfo(ctx, "frame\_type", "type", "str", "v", v)
default:
ilog.EventInfo(ctx, "frame\_type", "type", "unknown", "v", v)
}
}
}()
return ctx
}
运行 Deer-Go
推荐使用 Go 1.23.0 环境运行 Deer-Go:
git clone https://github.com/cloudwego/eino-examples.git
cd eino-examples/flow/agent/deer-go
go mod tidy
在运行 DeerFlow 的前端同时,按照以下命令操作,即可运行 Deer-Go:
- 进入
conf文件夹中,复制演示配置文件,并填入配置 key,具体配置方法在下方展示
cp ./conf/deer-go.yaml.1 ./conf/deer-go.yaml
- 运行
run.sh,编译并执行。
./run.sh
- 如果想要用 DeerFlow 的前端来演示 Deer-Go,也是可以的。Deer-Go 只实现了 DeerFlow 的后端部分,与 DeerFlow 共用同一个前端。目前与 DeerFlow 的 2a79f7e 版本的前端对齐。运行 DeerFlow 的前端后,在 Deer-Go 目录下运行
run.sh,携带-s参数即可以 server 的方式运行 Deer-Go
./run.sh -s
