从零开始学 Dify - 万字详解 Dify 工作流图引擎(GraphEngine)的实现机制

技术

一、概述

Dify 工作流引擎是一个基于事件驱动的图执行引擎,它能够处理复杂的业务逻辑流程,支持条件分支、并行执行、循环和错误处理等高级功能。接下来将深入理解 Dify 工作流引擎的工作原理和核心设计。

👆👆👆欢迎关注,一起进步👆👆👆

提示 :文中包含大量流程图、时序图和状态图,这些图表能辅助更直观地理解工作流引擎的各个组件和执行过程。建议先收藏、转发、分享

1.1 什么是工作流引擎?

工作流引擎是一种软件系统,用于定义、执行、监控和管理工作流程。在 Dify 中,工作流引擎负责协调各种 AI 组件(如 LLM 调用、工具使用、数据处理等)的执行,确保它们按照预定义的顺序和条件运行,从而实现复杂的AI应用逻辑。在 Dify 中使用图结构表示工作流。

1.1.1 工作流引擎的实际应用场景

为了更好地理解工作流引擎的作用,以下是几个在 Dify 中的实际应用场景:

  1. 多轮对话处理 :用户提问 → LLM 理解意图 → 调用相关工具 → 整合结果 → 生成回复
  2. 数据处理流程 :接收数据 → 清洗数据 → 分析数据 → 生成报告
  3. 条件决策流程 :分析用户输入 → 根据不同条件执行不同分支 → 合并结果
  4. 迭代处理 :对列表中的每个元素执行相同的操作,如批量翻译、批量分析等

1.2 为什么需要图执行引擎?

图是一种直观表达复杂关系的数据结构,由节点(代表操作)和边(代表节点间的连接关系)组成。使用图来表示工作流有以下优势:

  • 灵活性 :可以表达各种复杂的执行路径和条件分支
  • 例如:根据用户意图分类,将请求路由到不同的处理节点
  • 可视化 :便于直观理解和设计工作流程
  • 例如:通过可视化编辑器拖拽节点和连线,无需编写代码
  • 模块化 :节点可以独立开发和复用
  • 例如:创建一个通用的"数据验证"节点,在多个工作流中重复使用
  • 并行处理 :自然支持并行执行多个任务
  • 例如:同时调用多个外部 API 获取数据,提高响应速度

1.2.1 图执行引擎与传统顺序执行的对比

| 特性 | 传统顺序执行 | 图执行引擎 | | --- | --- | --- | | 执行路径 | 固定、线性 | 动态、多分支 | | 并行能力 | 需要额外编码 | 原生支持 | | 错误处理 | 通常全局处理 | 可针对节点定制 | | 可视化 | 难以直观表示 | 天然可视化 | | 扩展性 | 修改成本高 | 易于添加新节点 |

1.3 事件驱动架构的优势

事件驱动架构是一种软件设计模式,其中系统组件通过事件(如状态变化、用户操作等)相互通信。在工作流引擎中采用事件驱动架构有以下好处:

  • 松耦合 :组件之间通过事件通信,减少直接依赖
  • 例如:节点执行完成后发送事件,而不是直接调用下一个节点
  • 实时响应 :能够立即对状态变化做出反应
  • 例如:当 LLM 生成流式输出时,立即将内容传递给前端显示
  • 可扩展性 :易于添加新的事件处理器和监听器
  • 例如:添加新的监控组件,无需修改现有代码
  • 异步处理 :支持非阻塞操作,提高系统响应性
  • 例如:长时间运行的节点不会阻塞整个工作流的执行

1.3.1 事件驱动架构在 Dify 中的应用

在 Dify 工作流引擎中,事件是信息传递的核心机制。以下是一些关键事件类型及其作用:

| 事件类型 | 触发时机 | 作用 | | --- | --- | --- | | GraphRunStartedEvent | 工作流开始执行 | 通知客户端工作流已启动 | | NodeRunStartedEvent | 节点开始执行 | 更新节点状态,准备接收输出 | | NodeRunStreamChunkEvent | 节点产生流式输出 | 实时传递生成内容到前端 | | NodeRunSucceededEvent | 节点成功完成 | 更新状态,触发后续节点执行 | | NodeRunFailedEvent | 节点执行失败 | 触发错误处理逻辑 |

通过对Dify工作流引擎代码的分析,我们可以看到它具有以下核心组件和特性:

二、核心组件

2.1 图结构(Graph)

图结构是工作流引擎的基础,它定义了工作流中各个节点之间的连接关系和执行逻辑。简单来说,图结构就像是一张路线图,告诉引擎从起点到终点应该如何行进。

2.1.1 核心概念

  • 节点(Node) :工作流中的基本执行单元,代表一个具体的操作
  • 例如:LLM 节点(生成文本)、工具节点(调用API)、条件节点(判断分支)
  • 每个节点都有唯一的 ID、类型和配置参数
  • 边(Edge) :连接两个节点,表示执行流程的方向和条件
  • 例如:从"用户输入"节点到"LLM处理"节点的边
  • 边可以包含条件,决定是否沿着这条路径执行
  • 并行分支(Parallel) :允许多个节点同时执行的机制
  • 例如:同时调用多个不同的 API 获取数据

2.2.2 主要组件

  • Graph 类:定义了整个工作流的结构
  • 核心属性: root\_node\_id (起始节点)、 node\_ids (所有节点ID列表)、 edge\_mapping (节点间连接关系)
  • 主要方法: init (初始化图结构)、 get\_next\_nodes (获取下一步要执行的节点)
  • GraphEdge 类:定义了节点之间的连接关系
  • 核心属性: source\_node\_id (源节点ID)、 target\_node\_id (目标节点ID)、 run\_condition (执行条件)
  • GraphParallel 类:定义了并行执行的分支
  • 核心属性: id (并行分支ID)、 start\_from\_node\_id (起始节点)、 end\_to\_node\_id (结束节点)

picture.image

2.2.3 工作原理

当工作流执行时,引擎会从根节点开始,根据边的定义和条件,决定下一步执行哪些节点。如果遇到并行分支,则会同时执行多个节点,提高处理效率。

2.2.3.1 图结构的 JSON 表示示例
  
{  
  "nodes": [  
    { "id": "start", "data": { "type": "start" } },  
    { "id": "llm1", "data": { "type": "llm", "prompt": "分析以下内容..." } },  
    { "id": "condition", "data": { "type": "condition", "condition": "{{#llm1.sentiment#}} == '
 
 positive
 '" } },  
    { "id": "tool1", "data": { "type": "tool", "tool\_name": "search\_database" } },  
    { "id": "answer", "data": { "type": "answer", "template": "分析结果: {{#llm1.text#}}" } }  
  ],  
"edges": [  
    { "id": "e1", "source": "start", "target": "llm1" },  
    { "id": "e2", "source": "llm1", "target": "condition" },  
    { "id": "e3", "source": "condition", "target": "tool1", "sourceHandle": "true" },  
    { "id": "e4", "source": "condition", "target": "answer", "sourceHandle": "false" },  
    { "id": "e5", "source": "tool1", "target": "answer" }  
  ]  
}  

2.2.3.2 流程图:图结构构建与解析流程

picture.image

2.2.3.3 状态图:图结构状态转换

picture.image

2.2.4 源码实现

  
class Graph(BaseModel):  
    root\_node\_id: str = Field(..., description="root node id of the graph")  
    node\_ids: list[str] = Field(default\_factory=list, description="graph node ids")  
    node\_id\_config\_mapping: dict[str, dict] = Field(  
        default\_factory=dict, description="node configs mapping (node id: node config)"  
    )  
    edge\_mapping: dict[str, list[GraphEdge]] = Field(  
        default\_factory=dict, description="graph edge mapping (source node id: edges)"  
    )  
    reverse\_edge\_mapping: dict[str, list[GraphEdge]] = Field(  
        default\_factory=dict, description="reverse graph edge mapping (target node id: edges)"  
    )  
    parallel\_mapping: dict[str, GraphParallel] = Field(  
        default\_factory=dict, description="graph parallel mapping (parallel id: parallel)"  
    )  
    node\_parallel\_mapping: dict[str, str] = Field(  
        default\_factory=dict, description="graph node parallel mapping (node id: parallel id)"  
    )  

从源码可以看出,Graph 类使用了 Pydantic 的 BaseModel 作为基类,提供了数据验证和序列化功能。核心属性包括:

  • root\_node\_id :图的根节点 ID,工作流执行的起点
  • node\_ids :图中所有节点的 ID 列表
  • node\_id\_config\_mapping :节点 ID 到节点配置的映射
  • edge\_mapping :源节点 ID 到边列表的映射,定义了节点的出边
  • reverse\_edge\_mapping :目标节点 ID 到边列表的映射,定义了节点的入边
  • parallel\_mapping :并行 ID 到并行配置的映射
  • node\_parallel\_mapping :节点 ID 到并行 ID 的映射,标识节点属于哪个并行分支

2.2 图引擎(GraphEngine)

图引擎是工作流的核心执行器,负责协调和控制整个工作流的运行过程。它就像是一个指挥官,根据图结构(路线图)指挥各个节点(执行单元)按照正确的顺序和逻辑执行。

2.2.1 核心功能

  • 工作流执行 :从根节点开始,按照图结构定义的路径执行各个节点
  • 例如:先执行用户输入节点,再执行 LLM 处理节点,然后根据条件决定后续路径
  • 事件处理 :生成和处理各类事件,驱动工作流状态变化
  • 例如:当 LLM 节点开始执行时,触发 NodeRunStartedEvent 事件,更新 UI 显示节点状态为"运行中"
  • 并行处理 :管理并行分支的创建、执行和同步
  • 例如:同时调用多个外部 API 获取数据,然后等待所有 API 返回结果后继续执行
  • 错误处理 :处理节点执行过程中的异常,支持重试和继续执行策略
  • 例如:当 API 调用失败时,可以选择重试、跳过或终止整个工作流
  • 资源管理 :控制执行步数和时间,防止无限循环和资源耗尽
  • 例如:限制最大执行步数为100,防止无限循环消耗过多资源

2.2.2 主要组件

  • GraphEngine 类:工作流引擎的主类,负责整体协调和控制
  • 核心方法: run (启动工作流)、 \_run\_node (执行单个节点)、 \_handle\_parallel (处理并行分支)
  • 重要属性: graph (图结构)、 variable\_pool (变量池)、 event\_handler (事件处理器)
  • GraphEngineThreadPool 类:管理并行任务的线程池,控制并发数量
  • 核心方法: submit (提交任务)、 wait (等待任务完成)
  • 重要属性: max\_workers (最大工作线程数)、 timeout (超时时间)

picture.image

2.2.3 工作原理

  1. 初始化 :创建图引擎实例,加载图结构和初始变量
  
engine = GraphEngine(  
    graph=graph,                      # 图结构  
    variable\_pool=variable\_pool,      # 初始变量  
    event\_handler=event\_handler       # 事件处理器  
)  

  1. 启动 :调用 run 方法,触发 GraphRunStartedEvent 事件
  
result = engine.run()  

  1. 执行 :从根节点开始,递归执行各个节点
  • 对每个节点,先处理输入变量,然后执行节点逻辑,最后处理输出变量
  • 根据节点执行结果和边的条件,决定下一步执行哪些节点
  • 并行处理 :遇到并行分支时,创建多个任务并提交到线程池
  
# 伪代码示例  
for branch in parallel\_branches:  
    thread\_pool.submit(run\_branch, branch)  
# 等待所有分支完成  
thread\_pool.wait()  

  • 事件传递 :节点执行过程中产生的事件会传递给客户端,用于状态更新和UI反馈
  
# 伪代码示例  
event = NodeRunStartedEvent(node\_id=node\_id)  
event\_handler.handle\_event(event)  

  • 完成 :所有节点执行完毕后,触发 GraphRunSucceededEvent 或相应的失败事件

2.2.4 图引擎执行流程示意图

picture.image

2.2.4.1 流程图:图引擎执行流程

picture.image

2.2.4.2 时序图:图引擎与节点交互

picture.image

2.2.4.3 状态图:节点执行状态转换

picture.image

2.2.4.4 源码实现

  
class GraphEngine:  
    workflow\_thread\_pool\_mapping: dict[str, GraphEngineThreadPool] = {}  
  
    def \_\_init\_\_(  
        self,  
        tenant\_id: str,  
        app\_id: str,  
        workflow\_type: WorkflowType,  
        workflow\_id: str,  
        user\_id: str,  
        user\_from: UserFrom,  
        invoke\_from: InvokeFrom,  
        call\_depth: int,  
        graph: Graph,  
        graph\_config: Mapping[str, Any],  
        variable\_pool: VariablePool,  
        max\_execution\_steps: int,  
        max\_execution\_time: int,  
        thread\_pool\_id: Optional[str] = None,  
    ) -> None:  
        # 初始化线程池和其他属性  
        pass  
          
    def run(self) -> Generator[GraphEngineEvent, None, None]:  
        # 触发图运行开始事件  
        yield GraphRunStartedEvent()  
        handle\_exceptions: list[str] = []  
          
        try:  
            # 初始化流处理器  
            if self.init\_params.workflow\_type == WorkflowType.CHAT:  
                stream\_processor = AnswerStreamProcessor(  
                    graph=self.graph, variable\_pool=self.graph\_runtime\_state.variable\_pool  
                )  
            else:  
                stream\_processor = EndStreamProcessor(  
                    graph=self.graph, variable\_pool=self.graph\_runtime\_state.variable\_pool  
                )  
  
            # 运行图  
            # ...  

从源码可以看出,GraphEngine 类是工作流执行的核心,主要功能包括:

  1. 初始化 :通过构造函数接收各种参数,包括租户ID、应用ID、工作流类型、图结构、变量池等
  2. 线程池管理 :使用静态字典 workflow\_thread\_pool\_mapping 管理工作流线程池
  3. 事件生成run() 方法返回一个生成器,用于生成和传递各种事件
  4. 流处理器 :根据工作流类型(聊天或其他)选择不同的流处理器

run() 方法是执行的入口点,它首先生成一个 GraphRunStartedEvent 事件,然后根据工作流类型初始化相应的流处理器,最后执行图中的节点。整个执行过程是基于事件驱动的,通过生成器机制将事件传递给调用者。

2.3 节点(BaseNode)

节点是工作流中的基本执行单元,每个节点代表一个具体的操作或功能。可以将节点想象为工作流中的"积木块",通过组合不同类型的节点,可以构建出复杂的应用逻辑。

2.3.1 主要组件

图引擎的节点系统由以下主要组件构成:

  • BaseNode :所有节点类型的抽象基类,定义了通用接口和行为
  • 核心方法: run()\_run() (抽象方法)、 should\_retry()should\_continue\_on\_error()
  • 职责:提供统一的节点执行接口,处理错误和事件生成
  • NodeFactory :负责创建不同类型的节点实例
  • 核心方法: create\_node()
  • 职责:根据节点类型和配置创建对应的节点实例
  • NodeRunResult :表示节点执行结果的数据结构
  • 核心属性: statusoutputerrorerror\_type
  • 职责:封装节点执行的结果信息
  • NodeType :节点类型的枚举定义
  • 值: LLMTOOLITERATIONLOOPCONDITIONCODE
  • 职责:标识不同类型的节点
  • 具体节点类 :继承自BaseNode的各种节点实现
  • LLMNode:调用大语言模型
  • ToolNode:执行工具调用
  • IterationNode:迭代处理数据集合
  • LoopNode:条件循环执行
  • ConditionNode:条件分支
  • CodeNode:执行自定义代码

picture.image

2.3.2 节点类型

在 Dify 工作流中,常见的节点类型包括:

  • LLM节点 :调用大语言模型进行文本生成
  • 例如:使用 GPT-4 生成回答、创建内容摘要、翻译文本
  • 配置项:模型选择、系统提示词、温度参数等
  • 工具节点 :执行特定的工具或 API 调用
  • 例如:搜索网络、查询数据库、调用第三方服务
  • 配置项:工具名称、参数映射、超时设置等
  • 迭代节点 :对数据集合进行迭代处理
  • 例如:处理用户上传的多个文件、分析多条数据记录
  • 配置项:迭代数据源、最大迭代次数、并行设置等
  • 循环节点 :根据条件重复执行某些操作
  • 例如:持续提问直到获得满意答案、分批处理大量数据
  • 配置项:循环条件、最大循环次数、退出条件等
  • 条件节点 :根据条件决定执行路径
  • 例如:根据情感分析结果选择不同回复策略、根据用户输入类型选择处理方式
  • 配置项:条件表达式、分支映射等
  • 代码节点 :执行自定义代码逻辑
  • 例如:数据转换、复杂计算、自定义规则处理
  • 配置项:代码内容、语言选择、超时设置等

2.3.3 核心功能

  • 执行逻辑 :每种节点类型都有自己特定的执行逻辑,通过 \_run 方法实现
  • 基类定义通用接口,子类实现具体功能
  • 例如:LLM 节点实现模型调用,工具节点实现API调用
  • 变量处理 :从变量池获取输入,并将执行结果存入变量池
  • 输入变量:通过变量引用(如 {{#previous\_node.output#}} )获取其他节点的输出
  • 输出变量:将执行结果存入变量池,供后续节点使用
  • 事件生成 :执行过程中生成各种事件,用于状态更新和流程控制
  • 例如: NodeRunStartedEventNodeRunStreamChunkEventNodeRunSucceededEvent
  • 错误处理 :提供错误处理策略,包括重试和继续执行
  • 配置项:错误处理策略、最大重试次数、重试间隔等

2.3.4 错误处理策略

  • 继续执行(should\_continue\_on\_error :当节点执行失败时,记录错误但不中断工作流
  • 适用场景:非关键节点失败时,希望工作流继续执行
  • 例如:当网络搜索工具失败时,仍然可以使用LLM生成基于已有知识的回答
  • 配置示例: {"error\_strategy": "continue", "error\_message\_template": "搜索失败,但继续执行"}
  • 重试机制(should\_retry :当节点执行失败时,根据配置进行重试
  • 适用场景:临时性错误,如网络波动、服务暂时不可用
  • 例如:API调用超时、LLM服务暂时过载
  • 配置示例: {"retry": {"max\_retries": 3, "retry\_interval": 1000}}

2.3.5 工作原理

  1. 初始化 :创建节点实例,加载配置和上下文
  
node = NodeFactory.create\_node(  
    node\_type="llm",  
    node\_id="node\_1",  
    config={"model": "gpt-4", "prompt": "分析以下内容..."},  
    variable\_pool=variable\_pool  
)  

  1. 执行 :调用 run 方法,内部会调用子类实现的 \_run 方法
  
# 基类中的通用执行逻辑  
def run(self):  
    # 处理输入变量  
    self.\_process\_input\_variables()  
    # 执行节点特定逻辑  
    result = self.\_run()  
    # 处理输出变量  
    self.\_process\_output\_variables(result)  
    return result  

  1. 事件生成 :执行过程中生成各种事件(如开始、流式输出、成功、失败等)
  
# 节点执行开始事件  
yield NodeRunStartedEvent(node\_id=self.node\_id)  
# 节点流式输出事件  
yield NodeRunStreamChunkEvent(node\_id=self.node\_id, chunk=chunk)  
# 节点执行成功事件  
yield NodeRunSucceededEvent(node\_id=self.node\_id, output=output)  

  1. 错误处理 :遇到错误时,根据配置决定是重试、继续还是失败
  
try:  
    result = self.\_run()  

 
 except
  Exception as e:  
    if self.should\_retry(e):  
        # 重试逻辑  
        yield NodeRunRetryEvent(node\_id=self.node\_id, error=str(e))  
        result = self.\_run()  # 重试  
    elif self.should\_continue\_on\_error(e):  
        # 继续执行逻辑  
        yield NodeRunExceptionEvent(node\_id=self.node\_id, error=str(e))  
        returnNone# 返回空结果但不中断工作流  
    else:  
        # 失败逻辑  
        yield NodeRunFailedEvent(node\_id=self.node\_id, error=str(e))  
        raise# 向上传递异常  

  1. 结果返回 :执行完成后,返回结果或生成完成事件
2.3.5.1 流程图:节点执行流程

picture.image

2.3.5.2 时序图:节点内部执行流程

picture.image

2.3.5.3 状态图:不同类型节点的状态转换

picture.image

2.3.6源码实现

  
class BaseNode(Generic[GenericNodeData]):  
    \_node\_data\_cls: type[GenericNodeData]  
    \_node\_type: NodeType  
  
    def \_\_init\_\_(  
        self,  
        id: str,  
        config: Mapping[str, Any],  
        graph\_init\_params: "GraphInitParams",  
        graph: "Graph",  
        graph\_runtime\_state: "GraphRuntimeState",  
        previous\_node\_id: Optional[str] = None,  
        thread\_pool\_id: Optional[str] = None,  
    ) -> None:  
        # 初始化节点属性  
        pass  
          
    @abstractmethod  
    def \_run(self) -> NodeRunResult | Generator[Union[NodeEvent, "InNodeEvent"], None, None]:  
        """节点的具体运行逻辑,由子类实现"""  
        raise NotImplementedError  
          
    def run(self) -> Generator[Union[NodeEvent, "InNodeEvent"], None, None]:  
        try:  
            result = self.\_run()  
        
 
 except
  Exception as e:  
            logger.exception(f"Node {self.node\_id} failed to run")  
            result = NodeRunResult(  
                status=WorkflowNodeExecutionStatus.FAILED,  
                error=str(e),  
                error\_type="WorkflowNodeError",  
            )  
  
        if isinstance(result, NodeRunResult):  
            yield RunCompletedEvent(run\_result=result)  
        else:  
            yieldfrom result  

从源码可以看出,BaseNode 是所有节点类型的基类,采用了泛型设计,可以适应不同类型的节点数据。主要特点包括:

  1. 抽象方法设计\_run() 是一个抽象方法,必须由子类实现,包含节点的具体执行逻辑
  2. 错误处理run() 方法包装了 \_run() 方法,添加了异常捕获和处理
  3. 事件生成 :执行结果可以是单个 NodeRunResult 或者是事件生成器
  4. 统一接口 :所有节点类型都通过相同的接口进行调用,便于图引擎统一处理

节点执行的核心流程是:

  1. 图引擎调用节点的 run() 方法
  2. run() 方法调用子类实现的 \_run() 方法
  3. 如果执行成功,返回结果或生成事件
  4. 如果执行失败,捕获异常并生成失败结果

2.4 运行时状态(GraphRuntimeState)

运行时状态负责维护工作流执行过程中的各种状态信息,是工作流执行的上下文环境。

2.4.1 核心功能

  • 状态管理 :记录每个节点的执行状态(未开始、运行中、成功、失败等)
  • 变量存储 :通过变量池存储和管理工作流中的变量
  • 资源跟踪 :记录令牌使用量、LLM调用情况等资源消耗
  • 结果收集 :收集和整合工作流的输出结果

2.4.2 主要组件

  • GraphRuntimeState 类:工作流运行时的主状态容器
  • RuntimeRouteState 类:管理节点执行路径和状态
  • RouteNodeState 类:记录单个节点的执行状态和结果
  • VariablePool 类:管理工作流中的变量

picture.image

2.4.3 工作原理

  1. 初始化:工作流启动时创建运行时状态实例
  2. 状态更新:节点执行过程中不断更新状态信息
  3. 变量管理:节点执行结果存入变量池,供后续节点使用
  4. 资源统计:累计记录令牌使用量和LLM调用情况
  5. 结果汇总:工作流完成时,收集最终输出结果

2.5 变量池(VariablePool)

变量池是工作流中的数据中心,负责存储和管理工作流执行过程中的各种变量。它就像是工作流的"内存系统",使得不同节点之间可以共享数据和传递信息。

2.5.1 主要组件

变量池系统由以下主要组件构成:

  • VariablePool :变量池的核心类,负责变量的存储、访问和管理
  • 核心方法: add()get()remove()convert\_template()add\_file()get\_file()
  • 职责:提供统一的变量存储和访问接口
  • 变量字典(variable_dictionary) :内部数据结构,按命名空间组织变量
  • 核心命名空间: systemenvconversationnode\_outputstemp
  • 职责:按类型分类存储不同来源的变量
  • 文件存储(files) :专门用于存储文件类型变量的字典
  • 职责:管理工作流中的文件资源
  • 变量选择器(selector) :用于定位和访问变量的字符串表达式
  • 格式: namespace.variable\_namenode\_id.output\_name
  • 职责:提供统一的变量访问语法
  • 模板引擎 :处理包含变量引用的文本模板
  • 语法: {{#selector#}}
  • 职责:将模板中的变量引用替换为实际值

picture.image

2.5.2 变量类型

  • 系统变量 :由系统提供的内置变量,如当前时间、工作流ID等
  • 例如: system.current\_timesystem.workflow\_idsystem.user\_id
  • 特点:只读,由系统自动维护
  • 环境变量 :应用级别的配置变量,如 API密钥、服务 URL 等
  • 例如: env.openai\_api\_keyenv.search\_api\_urlenv.max\_tokens
  • 特点:全局可用,通常在应用配置中设置
  • 会话变量 :当前会话中的变量,如用户输入、对话历史等
  • 例如: conversation.messagesconversation.user\_inputconversation.context
  • 特点:会话级别持久化,跨多轮对话保持
  • 节点输出变量 :各节点执行后产生的结果变量
  • 例如: node\_1.outputllm\_node.texttool\_node.result
  • 特点:动态生成,节点执行后可用

2.5.3 核心功能

  • 变量存储 :提供统一的变量存储机制
  • 内部实现:使用嵌套字典结构存储不同类型的变量
  • 示例: variable\_dictionary = {"system": {...}, "env": {...}, "node\_outputs": {...}}
  • 变量访问 :支持通过选择器(selector)获取变量值
  • 选择器语法: namespace.variable\_namenode\_id.output\_name
  • 示例: variable\_pool.get("llm\_node.text") 返回LLM节点生成的文本
  • 变量修改 :允许添加、更新和删除变量
  • 方法: add(selector, value)remove(selector)
  • 示例: variable\_pool.add("temp.processed\_data", processed\_result)
  • 模板处理 :支持在文本中引用变量,并自动替换为实际值
  • 模板语法: {{#selector#}}
  • 示例: "你好,{{#user.name#}},今天是{{#system.current\_date#}}""你好,张三,今天是2023-05-20"
  • 文件处理 :支持存储和获取文件类型的变量
  • 方法: add\_file(selector, file)get\_file(selector)
  • 示例:存储用户上传的图片,供后续节点处理

2.5.4 工作原理

  1. 初始化 :工作流启动时创建变量池,加载系统变量、环境变量和会话变量
  
variable\_pool = VariablePool(  
    system\_variables={"current\_time": "2023-05-20T10:30:00Z", "workflow\_id": "wf\_123"},  
    environment\_variables={"openai\_api\_key": "sk-***", "max\_tokens": 1000},  
    conversation\_variables={"user\_input": "如何使用工作流?", "history": [...]}  
)  

  1. 变量引用 :使用特定语法在模板中引用变量
  
# 在LLM提示词中引用变量  
prompt = """用户问题:{{#conversation.user\_input#}}  
历史对话:{{#conversation.history#}}  
请回答上述问题。"""  

  1. 变量更新 :节点执行过程中不断更新变量值
  
# LLM节点执行后,将结果存入变量池  
variable\_pool.add("llm\_node.text", "工作流是一种自动化处理流程的方式...")  
variable\_pool.add("llm\_node.tokens", {"prompt\_tokens": 120, "completion\_tokens": 85})  

  1. 模板转换 :将包含变量引用的模板转换为实际值
  
template = "LLM的回答是:{{#llm\_node.text#}}"  
actual\_text = variable\_pool.convert\_template(template)  
# 结果:"LLM的回答是:工作流是一种自动化处理流程的方式..."  

  1. 变量传递 :节点之间通过变量池共享数据
  
# 工具节点获取LLM节点的输出作为输入  
llm\_output = variable\_pool.get("llm\_node.text")  
# 处理后将结果存回变量池  
variable\_pool.add("tool\_node.result", process\_data(llm\_output))  

2.5.5 变量选择器示例

| 变量选择器 | 描述 | 示例值 | | --- | --- | --- | | system.current\_time | 当前系统时间 | "2023-05-20T10:30:00Z" | | env.openai\_api\_key | OpenAI API密钥 | "sk-***" | | conversation.user\_input | 用户当前输入 | "如何使用工作流?" | | llm\_node.text | LLM节点生成的文本 | "工作流是一种..." | | tool\_node.result | 工具节点的执行结果 | {"data": [...]} |

2.5.5.1 流程图:变量处理流程

picture.image

2.5.5.2 时序图:变量池与节点交互

picture.image

2.5.6 源码实现

  
class VariablePool:  
    """变量池类,负责存储和管理工作流执行过程中的各种变量"""  
      
    def \_\_init\_\_(self, system\_variables=None, environment\_variables=None, conversation\_variables=None):  
        """初始化变量池  
          
        Args:  
            system\_variables: 系统变量字典  
            environment\_variables: 环境变量字典  
            conversation\_variables: 会话变量字典  
        """  
        # 初始化变量字典,包含不同类型的变量  
        self.variable\_dictionary = {  
            "system": system\_variables or {},  
            "env": environment\_variables or {},  
            "conversation": conversation\_variables or {},  
            "node\_outputs": {},  # 存储节点输出变量  
            "temp": {}  # 临时变量  
        }  
          
        # 文件存储字典,用于存储文件类型的变量  
        self.files = {}  
      
    def add(self, selector, value):  
        """添加或更新变量  
          
        Args:  
            selector: 变量选择器,格式为 "namespace.variable\_name" 或 "node\_id.output\_name"  
            value: 变量值  
              
        Returns:  
            bool: 操作是否成功  
        """  
        namespace, variable\_name = self.parse\_selector(selector)  
          
        # 系统变量是只读的,不允许修改  
        if namespace == "system":  
            returnFalse  
              
        # 确保命名空间存在  
        if namespace notin self.variable\_dictionary:  
            self.variable\_dictionary[namespace] = {}  
              
        # 添加或更新变量  
        self.variable\_dictionary[namespace][variable\_name] = value  
        returnTrue  
      
    def get(self, selector):  
        """获取变量值  
          
        Args:  
            selector: 变量选择器  
              
        Returns:  
            变量值,如果变量不存在则返回None  
        """  
        return self.\_get\_variable\_by\_selector(selector)  
      
    def remove(self, selector):  
        """删除变量  
          
        Args:  
            selector: 变量选择器  
              
        Returns:  
            bool: 操作是否成功  
        """  
        namespace, variable\_name = self.parse\_selector(selector)  
          
        # 系统变量是只读的,不允许删除  
        if namespace == "system":  
            returnFalse  
              
        # 检查命名空间和变量是否存在  
        if namespace in self.variable\_dictionary and variable\_name in self.variable\_dictionary[namespace]:  
            del self.variable\_dictionary[namespace][variable\_name]  
            returnTrue  
              
        returnFalse  
      
    def convert\_template(self, template):  
        """处理模板中的变量引用  
          
        Args:  
            template: 包含变量引用的模板字符串  
              
        Returns:  
            str: 替换变量引用后的字符串  
        """  
        ifnot template ornot isinstance(template, str):  
            return template  
              
        # 使用正则表达式查找所有变量引用  
        pattern = r"\{\{#([^#]+)#\}\}"  
        matches = re.findall(pattern, template)  
          
        result = template  
        for selector in matches:  
            # 获取变量值  
            value = self.get(selector.strip())  
              
            # 将变量值转换为字符串并替换到模板中  
            if value isnotNone:  
                if isinstance(value, (dict, list)):  
                    value\_str = json.dumps(value, ensure\_ascii=False)  
                else:  
                    value\_str = str(value)  
                      
                result = result.
   
 replace
 (f"{{{{#{selector}#}}}}", value\_str)  
                  
        return result  
      
    def add\_file(self, selector, file):  
        """添加文件类型的变量  
          
        Args:  
            selector: 变量选择器  
            file: 文件对象  
              
        Returns:  
            bool: 操作是否成功  
        """  
        self.files[selector] = file  
        returnTrue  
      
    def get\_file(self, selector):  
        """获取文件类型的变量  
          
        Args:  
            selector: 变量选择器  
              
        Returns:  
            文件对象,如果不存在则返回None  
        """  
        return self.files.get(selector)  
      
    def parse\_selector(self, selector):  
        """解析变量选择器  
          
        Args:  
            selector: 变量选择器字符串  
              
        Returns:  
            tuple: (命名空间, 变量名)  
        """  
        parts = selector.split(".", 1)  
          
        if len(parts) == 2:  
            return parts[0], parts[1]  
        else:  
            # 默认使用临时变量命名空间  
            return"temp", parts[0]  
      
    def \_get\_variable\_by\_selector(self, selector):  
        """根据选择器获取变量值  
          
        Args:  
            selector: 变量选择器  
              
        Returns:  
            变量值,如果变量不存在则返回None  
        """  
        namespace, variable\_name = self.parse\_selector(selector)  
          
        # 处理特殊情况:节点输出变量  
        if namespace notin self.variable\_dictionary and namespace.startswith("node\_"):  
            namespace = "node\_outputs"  
            variable\_name = f"{selector}"  
          
        # 检查命名空间和变量是否存在  
        if namespace in self.variable\_dictionary and variable\_name in self.variable\_dictionary[namespace]:  
            return self.variable\_dictionary[namespace][variable\_name]  
              
        returnNone  

2.5.7 变量池与事件系统的交互

变量池与事件系统紧密协作,共同支持工作流的执行。事件系统负责传递状态变化和控制流程,而变量池则负责存储和管理数据。

2.5.7.1 交互方式
  1. 事件携带变量更新 :某些事件(如 NodeRunSucceededEvent)会携带节点执行结果,图引擎接收到这些事件后,会将结果存入变量池
  
# 处理节点成功事件  
def handle\_node\_run\_succeeded\_event(self, event):  
    # 将节点输出结果存入变量池  
    node\_id = event.node\_id  
    output = event.run\_result.output  
    self.variable\_pool.add(f"{node\_id}.output", output)  

  1. 变量池支持事件生成 :节点在执行过程中,会从变量池获取所需的输入数据,处理后生成新的事件
  
# LLM节点执行  
def \_run(self, variable\_pool):  
    # 从变量池获取提示词模板  
    prompt\_template = self.node\_data.prompt\_template  
    # 处理模板中的变量引用  
    prompt = variable\_pool.convert\_template(prompt\_template)  
    # 调用LLM服务  
    
      
 response
  = self.llm\_service.generate(prompt)  
    # 生成流式输出事件  
    yield RunStreamChunkEvent(chunk\_content=
      
 response
 .text)  
    # 返回执行结果  
    return NodeRunResult(output=
      
 response
 .text)  

  1. 事件触发变量更新 :某些事件会触发变量池的更新操作,如用户输入事件会更新会话变量
  
# 处理用户输入事件  
def handle\_user\_input\_event(self, event):  
    # 更新会话变量  
    self.variable\_pool.add("conversation.user\_input", event.content)  
    # 更新对话历史  
    history = self.variable\_pool.get("conversation.history") or []  
    history.append({"role": "user", "content": event.content})  
    self.variable\_pool.add("conversation.history", history)  

2.5.8 变量池在实际应用中的使用示例

2.5.8.1 多轮对话应用
  
# 初始化变量池  
var\_pool = VariablePool(  
    system\_variables={"current\_time": "2023-05-20T10:30:00Z"},  
    conversation\_variables={"history": []}  
)  
  
# 第一轮对话  
var\_pool.add("conversation.user\_input", "你好,我想了解一下工作流")  
# LLM节点处理后  
var\_pool.add("llm\_node.text", "你好!工作流是一种自动化处理流程的方式...")  
# 更新对话历史  
history = var\_pool.get("conversation.history")  
history.append({"role": "user", "content": var\_pool.get("conversation.user\_input")})  
history.append({"role": "assistant", "content": var\_pool.get("llm\_node.text")})  
var\_pool.add("conversation.history", history)  
  
# 第二轮对话  
var\_pool.add("conversation.user\_input", "工作流有哪些节点类型?")  
# 构建带有历史上下文的提示词  
template = """历史对话:  
{{#conversation.history#}}  
  
用户问题:{{#conversation.user\_input#}}  
请回答上述问题。"""  
full\_prompt = var\_pool.convert\_template(template)  
# 结果:包含完整历史对话和当前问题的提示词  

2.5.8.2 工具调用应用
  
# 初始化变量池  
var\_pool = VariablePool(  
    system\_variables={"current\_time": "2023-05-20T10:30:00Z"},  
    environment\_variables={"weather\_api\_key": "abc123"}  
)  
  
# 用户输入  
var\_pool.add("conversation.user\_input", "今天北京的天气怎么样?")  
  
# LLM节点处理后,生成工具调用请求  
var\_pool.add("llm\_node.tool\_calls", [{  
    "name": "get\_weather",  
    "arguments": {"
 
 location
 ": "北京", "date": "today"}  
}])  
  
# 工具节点执行  
tool\_calls = var\_pool.get("llm\_node.tool\_calls")  
for call in tool\_calls:  
    if call["name"] == "get\_weather":  
        # 获取API密钥  
        api\_key = var\_pool.get("env.weather\_api\_key")  
        # 调用天气API  
        weather\_data = weather\_api.get\_weather(  
            location=call["arguments"]["location"],  
            date=call["arguments"]["date"],  
            api\_key=api\_key  
        )  
        # 存储结果  
        var\_pool.add("tool\_node.weather\_data", weather\_data)  
  
# 最终响应生成  
response\_template = "根据天气数据,{{#tool\_node.weather\_data.location#}}今天的天气是{{#tool\_node.weather\_data.description#}},温度{{#tool\_node.weather\_data.temperature#}}度。"  
final\_response = var\_pool.convert\_template(response\_template)  
# 结果:"根据天气数据,北京今天的天气是晴朗,温度25度。"  

三、事件系统

3.1 事件驱动架构概述

工作流引擎采用事件驱动架构,通过不同类型的事件传递状态和数据。事件是工作流引擎中信息传递的基本单位,它们记录了工作流执行过程中的各种状态变化和数据流转。

3.1.1 事件的作用

  • 状态通知 :通知客户端工作流和节点的执行状态(开始、成功、失败等)
  • 数据传递 :在不同组件之间传递数据(如LLM生成的文本、检索的资源等)
  • 流程控制 :触发下一步操作或处理异常情况
  • 异步通信 :支持异步操作和非阻塞执行

3.1.2 事件处理流程

  1. 事件生成:工作流引擎或节点在执行过程中生成各种事件
  2. 事件传递:事件通过生成器(Generator)机制传递给事件处理器
  3. 事件处理:事件处理器根据事件类型执行相应的处理逻辑
  4. 状态更新:根据事件更新工作流和节点的状态
  5. 客户端通知:将事件传递给客户端,用于UI更新和用户反馈
3.1.2.1 流程图:事件生命周期

picture.image

3.1.2.2 时序图:事件传递流程

picture.image

3.1.3 源码实现

  
class GraphEngineEvent(BaseModel):  
    pass  
  
class BaseGraphEvent(GraphEngineEvent):  
    pass  
  
class BaseNodeEvent(GraphEngineEvent):  
    id: str = Field(..., description="node execution id")  
    node\_id: str = Field(..., description="node id")  
    node\_type: NodeType = Field(..., description="node type")  
    node\_data: BaseNodeData = Field(..., description="node data")  
    route\_node\_state: RouteNodeState = Field(..., description="route node state")  
    # 其他属性...  
  
# 节点内部事件定义  
class RunCompletedEvent(BaseModel):  
    run\_result: NodeRunResult = Field(..., description="run result")  
  
class RunStreamChunkEvent(BaseModel):  
    chunk\_content: str = Field(..., description="chunk content")  
    from\_variable\_selector: list[str] = Field(..., description="from variable selector")  
  
# 节点事件类型定义  
NodeEvent = RunCompletedEvent | RunStreamChunkEvent | RunRetrieverResourceEvent | ModelInvokeCompletedEvent  
  
# 内部节点事件类型定义  
InNodeEvent = BaseNodeEvent | BaseParallelBranchEvent | BaseIterationEvent | BaseAgentEvent | BaseLoopEvent  

从源码可以看出,事件系统采用了继承结构设计:

  1. 基础事件类GraphEngineEvent 是所有事件的基类,基于 Pydantic 的 BaseModel
  2. 事件类型分类 :根据不同的功能划分为图事件、节点事件、并行分支事件等
  3. 内部事件 :节点内部事件如 RunCompletedEvent 用于节点内部状态传递
  4. 联合类型 :使用 Python 的联合类型(Union)定义事件类型集合

事件系统的设计使得工作流引擎可以通过统一的接口处理不同类型的事件,同时保持类型安全和数据验证。

3.2 事件类型层次结构

picture.image

3.3 图事件(Graph Events)

图事件是与整个工作流执行相关的事件,反映了工作流的整体状态变化。

3.3.1 主要图事件类型


GraphRunStartedEvent **:工作流开始执行时触发,标志着工作流的启动


GraphRunSucceededEvent **:工作流成功完成时触发,包含最终输出结果


GraphRunFailedEvent **:工作流执行失败时触发,包含错误信息和异常计数


GraphRunPartialSucceededEvent **:工作流部分成功时触发,表示有些节点失败但不影响整体结果

picture.image

3.3.2 图事件的应用场景

  • 工作流状态监控 :通过监听图事件,了解工作流的执行状态
  • 结果获取 :从成功事件中获取工作流的最终输出结果
  • 错误诊断 :从失败事件中获取错误信息,进行问题诊断
  • 性能分析 :记录工作流的开始和结束时间,分析执行性能

3.4 节点事件(Node Events)

节点事件是与单个节点执行相关的事件,反映了节点的状态变化和数据输出。

3.4.1 主要节点事件类型


NodeRunStartedEvent **:节点开始执行时触发,包含前置节点ID和并行模式信息


NodeRunStreamChunkEvent **:节点产生流式输出时触发,用于实时传递生成内容


NodeRunRetrieverResourceEvent **:节点检索到资源时触发,包含检索结果和上下文


NodeRunSucceededEvent **:节点成功执行完成时触发


NodeRunFailedEvent **:节点执行失败时触发,包含错误信息


NodeRunExceptionEvent **:节点执行过程中发生异常时触发,但允许工作流继续执行


NodeRunRetryEvent **:节点执行失败后进行重试时触发,包含重试索引和开始时间

picture.image

3.4.2 节点事件的应用场景

  • 节点状态监控 :跟踪每个节点的执行状态和进度
  • 流式输出处理 :实时处理和显示LLM生成的内容
  • 资源展示 :展示检索到的参考资料和上下文
  • 错误处理 :根据节点失败事件进行错误处理和重试
  • 性能优化 :分析节点执行时间,识别性能瓶颈

3.5 节点内部事件(Node Internal Events)

节点内部事件是节点在执行过程中产生的内部事件,用于节点内部状态传递和数据流转。这些事件通常不会直接传递给客户端,而是由节点内部处理或转换为节点事件后再传递。

3.5.1 主要节点内部事件类型

  • RunCompletedEvent :节点执行完成时产生,包含执行结果(成功或失败)
  • RunStreamChunkEvent :节点产生流式输出时产生,包含输出内容片段
  • RunRetrieverResourceEvent :节点检索到资源时产生,包含检索结果
  • ModelInvokeCompletedEvent :LLM模型调用完成时产生,包含生成文本、使用情况和完成原因

picture.image

3.5.2 节点内部事件的处理流程

  1. 事件生成:节点执行过程中生成内部事件
  2. 事件处理:节点内部处理这些事件,如更新状态、记录结果等
  3. 事件转换:将内部事件转换为节点事件(如将 RunStreamChunkEvent 转换为 NodeRunStreamChunkEvent
  4. 事件传递:将转换后的节点事件传递给图引擎

3.5.3 节点内部事件与节点事件的区别

  • 作用域不同 :内部事件仅在节点内部使用,节点事件在整个工作流中使用
  • 信息粒度不同 :内部事件通常包含更详细的原始信息,节点事件可能经过处理和聚合
  • 传递方式不同 :内部事件通过节点的 \_run 方法返回,节点事件通过图引擎的事件机制传递

3.6 并行分支事件(Parallel Branch Events)

并行分支事件是与并行执行相关的事件,用于管理和监控多个分支同时执行的状态。

3.6.1 并行执行的工作原理

在工作流中,有时需要同时执行多个独立的任务,这就需要并行分支。并行分支允许工作流从一个起始节点分叉出多个执行路径,这些路径可以同时执行,然后在某个汇合点合并结果。

3.6.2 主要并行分支事件类型


ParallelBranchRunStartedEvent **:并行分支开始执行时触发,标志着一个新分支的启动


ParallelBranchRunSucceededEvent **:并行分支成功完成时触发,表示该分支的所有节点都已成功执行


ParallelBranchRunFailedEvent **:并行分支执行失败时触发,包含错误信息

picture.image

3.6.3 并行分支事件的应用场景

  • 并行任务管理 :管理多个同时执行的任务分支
  • 资源协调 :协调并行分支间的资源使用
  • 结果汇总 :收集并汇总各个并行分支的执行结果
  • 错误隔离 :一个分支的失败不会立即影响其他分支的执行

3.7 迭代和循环事件(Iteration and Loop Events)

迭代和循环事件是与重复执行相关的事件,用于管理和监控对数据集合的迭代处理或条件循环执行。

3.7.1 迭代与循环的区别

  • 迭代(Iteration) :对一个已知的数据集合(如数组、列表)中的每个元素依次执行相同的操作
  • 循环(Loop) :根据条件重复执行某些操作,直到条件不满足为止

3.7.2 主要迭代事件类型

  • IterationRunStartedEvent :迭代开始时触发,包含输入数据和元数据
  • IterationRunNextEvent :处理下一个迭代项时触发
  • IterationRunSucceededEvent :迭代成功完成时触发,包含所有迭代的输出结果
  • IterationRunFailedEvent :迭代执行失败时触发,包含错误信息

picture.image

3.7.3 主要循环事件类型

  • LoopRunStartedEvent :循环开始时触发,包含初始条件和输入数据
  • LoopRunNextEvent :开始下一次循环时触发
  • LoopRunSucceededEvent :循环成功完成时触发,包含最终结果
  • LoopRunFailedEvent :循环执行失败时触发,包含错误信息

picture.image

3.7.4 迭代和循环事件的应用场景

  • 批量处理 :对多个数据项进行批量处理,如批量翻译、批量分析等
  • 递进处理 :通过多次循环逐步完善结果,如多轮对话、迭代优化等
  • 条件执行 :根据条件决定是否继续执行,如搜索直到找到满足条件的结果
  • 进度监控 :跟踪迭代或循环的进度,提供实时反馈

四、执行流程(Execution Flow)

工作流执行是Dify图引擎的核心功能,下面详细介绍整个执行流程。

4.1 执行流程概述

  1. 初始化阶段
  • 客户端发起工作流执行请求
  • 图引擎加载图结构和初始化变量池
  • 图引擎向客户端发送图执行开始事件
  • 节点执行阶段
  • 图引擎根据图结构确定起始节点
  • 循环执行节点,直到所有路径执行完毕
  • 每个节点执行时会产生各种事件,图引擎将这些事件转发给客户端
  • 完成阶段
  • 所有节点执行完毕后,图引擎向客户端发送图执行完成事件
  • 客户端接收最终结果和执行状态

4.2 详细执行流程图

picture.image

4.3 关键执行阶段说明

4.3.1 变量处理

在执行过程中,变量的处理是关键环节:

  • 输入变量 :从客户端传入,存储在变量池中
  • 节点间变量传递 :前一个节点的输出可以作为后续节点的输入
  • 模板变量替换 :支持在节点配置中使用模板语法引用变量

4.3.2 事件处理

事件是工作流执行过程中的重要通信机制:

  • 事件生成 :节点执行过程中生成各种事件
  • 事件传递 :图引擎接收节点事件,并转发给客户端
  • 事件处理 :客户端根据接收到的事件更新UI或执行其他操作

4.3.3 错误处理

错误处理策略决定了工作流在遇到错误时的行为:

  • 继续执行 :跳过失败节点,继续执行后续节点
  • 重试机制 :重新执行失败的节点,可设置重试次数和间隔
  • 终止执行 :停止整个工作流的执行,返回错误信息

五、错误处理机制

Dify 图引擎提供了灵活的错误处理机制,确保工作流在遇到问题时能够按照预期的方式处理。

5.1 错误处理策略

图引擎支持三种主要的错误处理策略:

  1. 继续执行(Continue)
  • 当节点执行失败时,记录错误信息但不中断工作流
  • 跳过失败的节点,继续执行后续节点
  • 适用于非关键节点,即使失败也不影响整体流程的情况
  • 重试机制(Retry)
  • 当节点执行失败时,自动重新执行该节点
  • 可配置最大重试次数和重试间隔
  • 适用于可能因临时问题(如网络波动、资源暂时不可用)导致失败的节点
  • 终止执行(Terminate)
  • 当节点执行失败时,立即停止整个工作流的执行
  • 返回详细的错误信息
  • 适用于关键节点,其失败会导致整个流程无法继续的情况

5.2 错误处理流程图

picture.image

5.3 错误处理时序图

picture.image

5.4 错误处理状态图

picture.image

5.5 错误类型与处理方式

图引擎处理的错误类型包括:

  1. 节点执行错误
  • 节点内部逻辑执行失败
  • 输入参数不符合要求
  • 依赖服务不可用
  • 变量处理错误
  • 必要变量缺失
  • 变量类型不匹配
  • 变量模板解析失败
  • 资源限制错误
  • 执行超时
  • 内存限制
  • API调用限制

5.6 错误信息传递

错误信息通过事件系统传递:

  1. 节点执行失败时,生成 NodeRunFailedEventNodeRunExceptionEvent
  2. 图引擎根据错误处理策略决定后续行为
  3. 如果策略是重试,则生成 NodeRunRetryEvent
  4. 如果策略是终止执行,则生成 GraphRunFailedEvent
  5. 如果策略是继续执行,则记录错误并继续,最终生成 GraphRunPartialSucceededEvent

5.7 错误处理最佳实践

  1. 为关键节点设置适当的错误处理策略
  • 对于影响整个流程的关键节点,建议使用终止执行策略
  • 对于可能因外部因素临时失败的节点,建议使用重试策略
  • 对于非关键节点,可以使用继续执行策略
  • 提供清晰的错误信息
  • 在节点实现中,确保抛出的异常包含详细的错误信息
  • 使用结构化的错误格式,便于客户端解析和展示
  • 实现错误恢复机制
  • 对于长时间运行的工作流,考虑实现检查点和恢复机制
  • 保存中间状态,以便在失败后可以从断点继续执行

六、总结与最佳实践

6.1 Dify 图引擎的核心优势

  1. 灵活的工作流定义
  • 支持多种节点类型(LLM、工具、条件、循环等)
  • 可自由组合节点构建复杂工作流
  • 支持并行执行、条件分支和循环迭代
  • 强大的事件驱动机制
  • 实时反馈执行状态和进度
  • 支持流式输出和资源展示
  • 便于前端实现交互式界面
  • 完善的错误处理
  • 多种错误处理策略
  • 细粒度的错误信息
  • 支持重试和部分成功
  • 高效的变量管理
  • 统一的变量存储和访问机制
  • 支持多种变量类型和作用域
  • 模板变量替换简化配置

6.2 使用建议

  1. 工作流设计
  • 将复杂任务拆分为小型、专注的节点
  • 利用并行分支提高执行效率
  • 使用条件节点处理不同场景
  • 合理设置错误处理策略
  • 事件处理
  • 实现完整的事件监听机制
  • 根据事件类型更新 UI 状态
  • 利用流式事件提供实时反馈
  • 变量使用
  • 合理规划变量命名和作用域
  • 利用模板语法简化配置
  • 注意变量类型的一致性
  • 性能优化
  • 避免创建过于复杂的工作流
  • 合理使用并行执行
  • 优化节点内部逻辑

这种基于事件驱动的图执行引擎设计使 Dify 工作流能够灵活处理各种复杂的业务逻辑,包括条件分支、并行执行、循环和错误处理,同时保持代码的模块化和可扩展性。

下一篇尝试实现一个图引擎的 MVP。

参考资料

https://github.com/langgenius/dify (v1.4.1)

七、推荐阅读

👆👆👆欢迎关注,一起进步👆👆👆

欢迎留言讨论哈

🧐点赞、分享、推荐 ,一键三连,养成习惯👍

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
在火山引擎云搜索服务上构建混合搜索的设计与实现
本次演讲将重点介绍字节跳动在混合搜索领域的探索,并探讨如何在多模态数据场景下进行海量数据搜索。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论