使用 Eino 框架实现 DeerFlow 系统

点击上方👆蓝字关注我们!

picture.image

简介

开源地址在:eino-deer-flow-go:https://github.com/cloudwego/eino-examples/tree/main/flow/agent/deer-go

DeerFlow(Deep Exploration and Efficient Research Flow)是字节跳动开源的一个深度研究框架,旨在将大语言模型(LLM)与专业工具(如网络搜索、爬虫和 Python 代码执行)相结合,用于自动化研究和内容创作。

据 DeerFlow 作者所述,DeerFlow 内部实现非常简单,很适合小白。所以我尝试着用 Go 语言进行复现。事实上,基于 Eino 在短期内就可以实现一整套 DeerFlow,我称之为 Deer-Go。Deer-Go 的代码已经开源,可以随时在 eino-deer-flow-go: https://github.com/cloudwego/eino-examples/tree/main/flow/agent/deer-go 查阅,欢迎大家一起提交贡献,共建 DeerFlow 和 Eino 社区。

当然,在用 Eino 实现 Deer-Go 的过程中,遇到一些比较常见的问题,本文重点帮助大家在实战中解决以下问题:

  • Eino 中的 Multi-Agent 架构要如何设计与实现?

  • Deer-Go 中子 Agent 的内部功能如何实现?

  • Eino 中如何实现 Interrupt?如何在 Interrupt 后进行恢复?

  • 如何用 Eino + Hertz 实现前后端 SSE 流式输出,最佳实践是什么?

Eino 中实现 Multi-Agent

picture.image

Deer-Go 的架构图完全参考 DeerFlow。DeerFlow 中,Agent 通信有两类:

  • 控制权的转移:比如 Coordinator 将用户的信息传达给 Planner,让 Planner 制定研究规划。
  • 数据状态的共享:比如 Planner 生成的规划传达给 Research Team;Research Team 将调研结果交给 Reporter 进行总结。

但是在具体实现中,由于 Eino 要求 flow 中每个节点的输入必须来自上一个节点的输出,且二者的类型需要保持一致。所以在 Deer-Go 中,控制权的转移用 Eino 的 flow 中节点的输入输出来实现,而数据状态的共享用全局状态(State)来实现。

具体实现是:

每个子 Agent 都是一个子图节点,子图内部是一条完整的 flow,每个节点的输入来自上一个节点的输出。主要分为三块功能区:

  • load节点:用于加载该节点需要的 prompt、工具等信息
  • llm 节点:用于将 load 节点的输入放入 llm 节点中
  • router 节点:用于处理 llm 节点的输出,并根据全局状态和输出来决策下一个 Agent 的 name

子图之间,由Eino提供的分支节点(Branch Node)来转移控制权,如下图所示:

picture.image

具体实现代码如下:

  
// 在主图中新增Coordinator和Planner节点  
\_ = g.AddGraphNode(consts.Coordinator, coordinatorGraph, compose.WithNodeName(consts.Coordinator))  
\_ = g.AddGraphNode(consts.Planner, plannerGraph, compose.WithNodeName(consts.Planner))  
  
// 在Coordinator和Planner节点后添加分支节点  
\_ = g.AddBranch(consts.Coordinator, compose.NewGraphBranch(agentHandOff, outMap))  
\_ = g.AddBranch(consts.Planner, compose.NewGraphBranch(agentHandOff, outMap))  

而分支节点与子 Agent 内部的 router 节点配合完成控制权的转移,代码如下,router 节点中,根据配置和大模型的输出来决定控制权要转移到哪个结点,然后将该节点 name 写入全局状态的 Goto 变量中。而 agentHandOff 获取该变量,将控制权交给对应的 Agent。

  
/ Coordinator的router节点  
funcrouter(ctx context.Context, input *schema.Message, opts ...any)(output string, err error) {  
    err = compose.ProcessState[*model.State](ctx, func(\_ context.Context, state *model.State)error {  
       deferfunc() {  
          output = state.Goto  
       }()  
       state.Goto = compose.END // 默认转移到结束节点  
       iflen(input.ToolCalls) > 0 {  
          ...  
          if state.EnableBackgroundInvestigation {  
             state.Goto = consts.BackgroundInvestigator // 转移到背调agent  
          } else {  
             state.Goto = consts.Planner // 转移到规划器agent  
          }  
       }  
       returnnil  
    })  
    return output, nil  
}  
  
// 子图流转函数,由上一个子图决定接下来流转到哪个agent  
// 并将其name写入 state.Goto ,该函数读取 state.Goto 并将控制权交给对应agent  
funcagentHandOff(ctx context.Context, input string)(next string, err error) {  
    deferfunc() {  
       ilog.EventInfo(ctx, "agent\_hand\_off", "input", input, "next", next)  
    }()  
    \_ = compose.ProcessState[*model.State](ctx, func(\_ context.Context, state *model.State)error {  
       next = state.Goto  
       returnnil  
    })  
    return next, nil  
}  

子 Agent 的内部实现

多智能体设计中,最重要的是控制权如何转移。所以每个子 Agent 都必须存在 router 节点。

picture.image

Interrupt&CheckPoint

在 DeerFlow 中,Planner 在完成规划后,会将规划展示给用户,由用户决定是继续执行该规划,还是要调整该规划。引入 Human-in-the-loop 节点,用户可以轻松地与 AI 协作,确保最终结果完全符合预期。

结合了 Human-in-the-loop 的大模型应用虽然效果更好,但是对应用本身的实现提出更大的挑战。一个最直接的问题:大模型等待用户输入的时候,后端状态是怎样的?

将整个图都存储在内存中并阻塞不失为一个选择,但这在工程部署上,是个最差的选择——如果你服务重启,那么这个会话就会丢失,就算用户审核通过,也无法再继续了。或者类似的请求增多,你的服务器内存就会被这样的请求占满,进而崩溃。

更好的一个方式是,当发生 Interrupt 时,后端将 Graph 的所有状态都持久化存储,等用户交互完成,再将 Graph 进行恢复。所以 Interrupt 总是和 CheckPoint 一起出现。

除了 Interrupt 时,对用户的请求进行分析、排错、定位等情况,我们需要也需要将图的一些状态存入后端数据库。所以这就更需要对全局状态进行持久化操作了。

Interrupt

使用 Interrupt & CheckPoint 功能,可以实现在指定位置暂停 Graph 执行并在之后断点续传,如果是 StateGraph,还可以在断点续传前修改 State。

断点续传仅能复原输入和运行时各节点产生的数据,需要确保 Graph 编排和 CallOptions 完全相同。

Eino 中的 Interrupt 是一种特殊的报错信息。可以参考 Human 节点的 router 子节点代码,发出一个特殊的报错 compose.InterruptAndRerun。

  
funcrouterHuman(ctx context.Context, input string, opts ...any)(output string, err error) {  
    err = compose.ProcessState[*model.State](ctx, func(\_ context.Context, state *model.State)error {  
       deferfunc() {  
          output = state.Goto  
          state.InterruptFeedback = ""  
       }()  
       state.Goto = consts.ResearchTeam  
       if !state.AutoAcceptedPlan {  
          switch state.InterruptFeedback {  
          case consts.AcceptPlan:  
             returnnil  
          case consts.EditPlan:  
             state.Goto = consts.Planner  
             returnnil  
          default:  
             return compose.InterruptAndRerun // 发出Interrupt  
          }  
       }  
       state.Goto = consts.ResearchTeam  
       returnnil  
    })  
    return output, err  
}  

可以用 compose.ExtractInterruptInfo 从运行返回的 error 中获得本次运行是否 Interrupt 以及 Interrupt 信息:

  
result, err := runner.Invoke(ctx, input)  
if info, ok := compose.ExtractInterruptInfo(err); ok {  
    ilog.EventDebug(ctx, "ChatStream\_interrupt", "info", info)  
    data := &model.ChatResp{  
       ThreadID:     req.ThreadID,  
       ID:           "human\_feedback:" + util.RandStr(20),  
       Role:         "assistant",  
       Content:      "检查计划",  
       FinishReason: "interrupt",  
       Options: []map[string]any{  
          {  
             "text":  "编辑计划",  
             "value": "edit\_plan",  
          },  
          {  
             "text":  "开始执行",  
             "value": "accepted",  
          },  
       },  
    }  
    dB, \_ := json.Marshal(data)  
    w.WriteEvent("", "interrupt", dB)  
}  

CheckPoint

在发生 Interrupt 的时候,需要将当前图的状态存储起来,等完成用户交互,再将图状态加载出来并继续运行。这个过程就涉及到对图的存取问题。

CheckPointStore 接口

Eino 中对图进行存取操作的接口是 CheckPointStore 接口,CheckPointStore 是一个 key 类型为 string、value 类型为 []byte 的 KV 存储接口,Eino 没有提供封装和默认实现,需要用户自行实现,用来存储 checkpoint。以下是 Deer-Go 的 DeerCheckPoint 实现,非常简单,就是使用一个单实例 deerCheckPoint 变量完成图的存储,变量内部实际是一个 map。在实际的工程中,Get 与 Set 接口应当与后端数据库连接,实现持久化读写。

  
// DeerCheckPoint DeerGo的全局状态存储点,  
// 实现CheckPointStore接口,用checkPointID进行索引  
// 此处粗略使用map实现,工程上可以用工业存储组件实现  
type DeerCheckPoint struct {  
    buf map[string][]byte  
}  
  
func(dc *DeerCheckPoint)Get(ctx context.Context, checkPointID string)([]byte, bool, error) {  
    data, ok := dc.buf[checkPointID]  
    return data, ok, nil  
}  
  
func(dc *DeerCheckPoint)Set(ctx context.Context, checkPointID string, checkPoint []byte)error {  
    dc.buf[checkPointID] = checkPoint  
    returnnil  
}  
  
// 创建一个全局状态存储点实例并返回  
var deerCheckPoint = DeerCheckPoint{  
    buf: make(map[string][]byte),  
}  
  
funcNewDeerCheckPoint(ctx context.Context)compose.CheckPointStore {  
    return &deerCheckPoint  
}  

Eino 中可以在图编译时使用 compose.WithCheckPointStore 配置,将 CheckPointStore 与图连接起来。

  
r, err := g.Compile(ctx,  
    compose.WithGraphName("EinoDeer"),  
    compose.WithCheckPointStore(model.NewDeerCheckPoint(ctx)),  
)  

在图运行时使用 compose.WithCheckPointID 配置,输入 CheckPointID,有相同 CheckPointID 的请求,被认为是同一张图,且共享同样的图状态。

  
\_, err = r.Invoke(ctx, consts.Coordinator,  
    compose.WithCheckPointID(req.ThreadID),  
)  

注册序列化方法

CheckPoint 的保存和读取涉及对 Graph 节点输入输出以及 State 的序列化和反序列化,在仅使用简单类型或 Eino 内置类型(比如 Message 或 Document)时,用户无需额外操作;当引入自定义 struct 时,需要提前注册类型。

Deer-Go 中对 State 进行注册的代码如下,同时如果注册的类型实现了 json Marshaler 和 Unmarshaler,此类型的序列化和反序列化会使用自定义方法。

  
funcinit() {  
    err := compose.RegisterSerializableType[State]("DeerState")  
    if err != nil {  
       panic(err)  
    }  
}  
  
  
...  
  
  
func(s *State)MarshalJSON()([]byte, error) {  
    b, err := json.Marshal(*s)  
    if err != nil {  
       returnnil, err  
    }  
    return b, nil  
}  
  
func(s *State)UnmarshalJSON(b []byte)error {  
    type Alias State  
    var tmp Alias  
    if err := json.Unmarshal(b, &tmp); err != nil {  
       return err  
    }  
    *s = State(tmp)  
    returnnil  
}  

与前端交互

与 Hertz 框架结合

Deer-Go 采用 Hertz 框架实现,hertz 官方提供的 sse 包 github.com/cloudwego/hertz/pkg/protocol/sse 也对可以直接使用。以下是 Deer-Go 的 handler 层的逻辑和对应的代码,Eino 中的每个 Node 都包含一些切面,其中常见的切面有 onStart 和 onEnd,分别在 Node 执行前由 Eino 框架进行调用,主要用于输出该 Node 的输入与输出,这样只需要将这些节点的输出写入到 sse writer 中,前端即可收到 sse 流式返回。

picture.image

callback 机制

Eino 中的 Callback 机制在固定的时机 (Callback Timing),回调用户提供的 function (Callback Handler),并把自己是谁 (RunInfo),以及当时发生了什么 (Callback Input & Output) 传出去。

具体来说,Deer-Go 的 LoggerCallback 实现了以下几个 Handler,OnStart 和 OnEnd 分别是在 Eino 中节点在输入输出时调用,OnStartWithStreamInput 和 OnEndWithStreamOutput 是在流式输入和输出时调用,OnError 在节点报错时调用。

本文的重点是 OnEndWithStreamOutput 函数,因为 Deer-Go 是采用流式输出的,每个流式节点(包括大模型节点)的输出,都需要传递给用户。

  
type Handler interface {  
    OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context  
    OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context  
    OnError(ctx context.Context, info *RunInfo, err error) context.Context  
    OnStartWithStreamInput(ctx context.Context, info *RunInfo,  
       input *schema.StreamReader[CallbackInput]) context.Context  
    OnEndWithStreamOutput(ctx context.Context, info *RunInfo,  
       output *schema.StreamReader[CallbackOutput]) context.Context  
}  

上述 5 个接口分别对应了 5 个 callback 时机 (Callback Timing)

  
const (  
    TimingOnStart CallbackTiming = iota// 进入并开始执行  
    TimingOnEnd // 成功完成即将 return  
    TimingOnError // 失败并即将 return err   
    TimingOnStartWithStreamInput // OnStart,但是输入是 StreamReader  
    TimingOnEndWithStreamOutput // OnEnd,但是输出是 StreamReader)  

具体实现

在 Deer-Go 运行 Graph 之前,使用 compose.WithCallbacksLoggerCallback 进行初始化,将前后端的 sse 通道传入自定义的 LoggerCallback 中。

  
type LoggerCallback struct {  
    callbacks.HandlerBuilder  
  
    ID  string  
    SSE *sse.Writer  // 与前端交流的sse通道  
}  
\_, err = r.Stream(ctx, consts.Coordinator,  
    compose.WithCallbacks(&infra.LoggerCallback{  
       SSE: w,  
    }),  
)  

OnEndWithStreamOutput 中,对 output 流进行类型断言,并对类型进行拆解分析。在 Deer-Go 中,output 流有以下类型:

  • 如果是普通的 msg,则触发 message_chunk 事件
  • 如果是工具调用,则触发 tool_calls 事件
  • 如果是 string,则说明是子 Agent 在交接控制权
  
func(cb *LoggerCallback)OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo,  
    output *schema.StreamReader[callbacks.CallbackOutput])context.Context {  
    msgID := util.RandStr(20)  
    gofunc() {  
       defer output.Close() // remember to close the stream in defer  
       deferfunc() {  
          if err := recover(); err != nil {  
             ilog.EventFatal(ctx, "[OnEndStream]panic\_recover", "msgID", msgID, "err", err)  
          }  
       }()  
       for {  
          frame, err := output.Recv()  
          if errors.Is(err, io.EOF) {  
             break  
          }  
          if err != nil {  
             ilog.EventError(ctx, err, "[OnEndStream] recv\_error")  
             return  
          }  
  
          switch v := frame.(type) {  
          case *schema.Message:  
             \_ = cb.pushMsg(ctx, msgID, v)  
          case *ec\_model.CallbackOutput:  
             \_ = cb.pushMsg(ctx, msgID, v.Message)  
          case []*schema.Message:  
             for \_, m := range v {  
                \_ = cb.pushMsg(ctx, msgID, m)  
             }  
          casestring:  
             //ilog.EventInfo(ctx, "frame\_type", "type", "str", "v", v)  
          default:  
             ilog.EventInfo(ctx, "frame\_type", "type", "unknown", "v", v)  
          }  
       }  
  
    }()  
    return ctx  
}  

运行 Deer-Go

推荐使用 Go 1.23.0 环境运行 Deer-Go:

  
git clone https://github.com/cloudwego/eino-examples.git  
cd eino-examples/flow/agent/deer-go  
go mod tidy  

在运行 DeerFlow 的前端同时,按照以下命令操作,即可运行 Deer-Go:

  1. 进入 conf 文件夹中,复制演示配置文件,并填入配置 key,具体配置方法在下方展示
  
cp ./conf/deer-go.yaml.1 ./conf/deer-go.yaml  

  1. 运行 run.sh,编译并执行。
  
./run.sh  

  1. 如果想要用 DeerFlow 的前端来演示 Deer-Go,也是可以的。Deer-Go 只实现了 DeerFlow 的后端部分,与 DeerFlow 共用同一个前端。目前与 DeerFlow 的 2a79f7e 版本的前端对齐。运行 DeerFlow 的前端后,在 Deer-Go 目录下运行 run.sh,携带 -s 参数即可以 server 的方式运行 Deer-Go
  
./run.sh -s  

0
0
0
0
评论
未登录
暂无评论