从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制

技术

工作流系统(Workflow System)是 Dify 的核心组件,它支持通过可视化编程界面创建复杂的 AI 应用程序。它允许用户通过将不同的功能块连接在一起来设计工作流,以处理数据、与 AI 模型交互、管理条件并执行各种操作。

接下来将详细介绍 Dify 工作流的实现机制,通过分析代码实现、数据流动和执行过程,充分理解工作流的实现原理。

👆👆👆欢迎关注,一起进步👆👆👆

一、工作流系统概述

1.1 核心概念

Dify 工作流系统是一个基于图(Graph)的执行引擎,允许用户通过可视化界面设计和执行复杂的 AI 工作流。工作流由多种类型的节点(Node)组成,这些节点通过边(Edge)连接,形成有向图结构。

1.2 系统架构

picture.image

工作流系统主要由以下几个部分组成:

  • 图引擎 :负责解析工作流配置,构建执行图,并控制节点的执行顺序
  • 节点实现 :各种类型节点的具体实现,如 LLM、知识检索、条件分支等
  • 变量管理 :管理工作流执行过程中的变量传递和存储
  • 执行记录 :记录工作流和节点的执行状态、输入输出和性能指标

二、数据模型设计

2.1 工作流数据模型

picture.image

Dify 使用多个模型来表示工作流及其执行状态:

  • WorkflowModel :工作流的基本信息,包括 ID、名称、描述、配置等
  • WorkflowRunModel :工作流的执行记录,包括执行状态、开始时间、结束时间等
  • WorkflowNodeExecutionModel :节点的执行记录,包括节点类型、输入、输出、状态等
  • ConversationVariable :存储会话变量,包括名称、值类型、值等
  • WorkflowDraftVariable :存储草稿工作流中的变量,包括会话变量、系统变量和节点变量

2.2 工作流节点类型

Dify 工作流支持多种类型的节点,每种节点有不同的功能和配置:

picture.image

Dify 支持多种类型的节点,包括:

  • START :工作流的起始节点
  • END :工作流的结束节点
  • LLM :大语言模型节点,用于生成文本
  • **KNOWLEDGE_

RETRIEVAL** :知识检索节点,用于从知识库中检索信息

  • IF_ELSE :条件分支节点,根据条件选择执行路径
  • CODE :代码执行节点,执行自定义代码
  • HTTP_REQUEST :HTTP 请求节点,与外部 API 交互
  • TOOL :工具节点,调用预定义的工具
  • AGENT :代理节点,执行复杂的任务

三、工作流执行机制

3.1 工作流执行流程

picture.image

  1. 初始化工作流运行记录
  2. 解析工作流配置,构建执行图
  3. 从起始节点开始执行
  4. 根据图的边定义,确定下一个要执行的节点
  5. 执行节点,记录执行结果
  6. 重复步骤 4-5,直到达到结束节点或出现错误
  7. 完成工作流执行,更新运行记录

3.2 图引擎执行机制

picture.image

图引擎是工作流执行的核心,负责:

  • 解析节点和边配置
  • 构建边映射和反向边映射
  • 识别根节点和叶子节点
  • 检查节点连接性和循环
  • 管理并行执行
  • 控制执行流程

四、变量管理机制

4.1 变量池设计

Dify 工作流使用变量池(VariablePool)管理工作流执行过程中的变量。变量池包含以下几类变量:

  1. 系统变量 :以 sys. 为前缀,如 sys.query (用户输入)、 sys.files (用户上传文件)
  2. 环境变量 :工作流级别的配置变量
  3. 会话变量 :在会话中持久化的变量
  4. 节点变量 :各节点的输入输出变量

picture.image

4.2 变量传递机制

节点之间通过变量池传递数据。每个节点执行时:

  1. 节点执行后,将输出添加到变量池中
  2. 下一个节点从变量池中获取所需的输入变量
  3. 支持通过选择器和模板字符串引用变量
  4. 支持文件类型变量的传递

变量的引用使用 {{#node\_id.variable\_name#}} 的模板语法。

五、节点实现机制

5.1 基础节点结构

所有节点都继承自 BaseNode 抽象类,实现自己的 \_run 方法:

picture.image

所有节点都继承自 BaseNode 类,实现以下方法:

  • _run :节点的具体执行逻辑
  • _get_inputs :获取节点的输入变量
  • _get_outputs :处理节点的输出变量

以下是 BaseNode 类的核心实现:

  
class BaseNode(Generic[GenericNodeData]):  
    \_node\_data\_cls: type[GenericNodeData]  
    \_node\_type: NodeType  
  
    def \_\_init\_\_(  
        self,  
        id: str,  
        config: Mapping[str, Any],  
        graph\_init\_params: "GraphInitParams",  
        graph: "Graph",  
        graph\_runtime\_state: "GraphRuntimeState",  
        previous\_node\_id: Optional[str] = None,  
        thread\_pool\_id: Optional[str] = None,  
    ) -> None:  
        self.id = id  
        self.tenant\_id = graph\_init\_params.tenant\_id  
        self.app\_id = graph\_init\_params.app\_id  
        self.workflow\_type = graph\_init\_params.workflow\_type  
        self.workflow\_id = graph\_init\_params.workflow\_id  
        self.graph\_config = graph\_init\_params.graph\_config  
        self.user\_id = graph\_init\_params.user\_id  
        self.user\_from = graph\_init\_params.user\_from  
        self.invoke\_from = graph\_init\_params.invoke\_from  
        self.workflow\_call\_depth = graph\_init\_params.call\_depth  
        self.graph = graph  
        self.graph\_runtime\_state = graph\_runtime\_state  
        self.previous\_node\_id = previous\_node\_id  
        self.thread\_pool\_id = thread\_pool\_id  
  
        node\_id = config.get("id")  
        ifnot node\_id:  
            raise ValueError("Node ID is required.")  
  
        self.node\_id = node\_id  
  
        node\_data = self.\_node\_data\_cls.model\_validate(config.get("data", {}))  
        self.node\_data = node\_data  
  
    @abstractmethod  
    def \_run(self) -> NodeRunResult | Generator[Union[NodeEvent, "InNodeEvent"], None, None]:  
        """  
        Run node  
        :return:  
        """  
        raise NotImplementedError  
  
    def run(self) -> Generator[Union[NodeEvent, "InNodeEvent"], None, None]:  
        try:  
            result = self.\_run()  
        
 
 except
  Exception as e:  
            logger.exception(f"Node {self.node\_id} failed to run")  
            result = NodeRunResult(  
                status=WorkflowNodeExecutionStatus.FAILED,  
                
   
 error
 =str(e),  
                error\_type="WorkflowNodeError",  
            )  
  
        if isinstance(result, NodeRunResult):  
            yield RunCompletedEvent(run\_result=result)  
        else:  
            yieldfrom result  

BaseNode 类是所有节点的基类,它定义了节点的基本属性和方法:

  1. 初始化方法 :接收节点 ID、配置、图引擎参数等,初始化节点的基本属性
  2. 抽象方法 _run :子类必须实现的方法,包含节点的具体执行逻辑
  3. run 方法 :调用 _run 方法并处理异常,将结果包装为事件返回

5.2 节点类型实现

5.2.1 StartNode 实现

StartNode 是工作流的起始节点,负责将用户输入和系统变量作为节点的输出:

  
class StartNode(BaseNode[StartNodeData]):  
    \_node\_data\_cls = StartNodeData  
    \_node\_type = NodeType.START  
  
    def \_run(self) -> NodeRunResult:  
        node\_inputs = dict(self.graph\_runtime\_state.variable\_pool.user\_inputs)  
        system\_inputs = self.graph\_runtime\_state.variable\_pool.system\_variables  
  
        # TODO: System variables should be directly accessible, no need for special handling  
        # Set system variables as node outputs.  
        for var in system\_inputs:  
            node\_inputs[SYSTEM\_VARIABLE\_NODE\_ID + "." + var] = system\_inputs[var]  
  
        return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=node\_inputs, outputs=node\_inputs)  

StartNode 的实现非常简单,它主要完成以下工作:

  1. 从变量池中获取用户输入和系统变量
  2. 将系统变量添加到节点输入中,以 SYSTEM\_VARIABLE\_NODE\_ID.var 的形式作为键
  3. 返回包含这些输入和输出的 NodeRunResult ,状态为成功

5.2.2 IfElseNode 实现

IfElseNode 是条件分支节点,根据条件选择执行路径:

  
class IfElseNode(BaseNode[IfElseNodeData]):  
    \_node\_data\_cls = IfElseNodeData  
    \_node\_type = NodeType.IF\_ELSE  
  
    def \_run(self) -> NodeRunResult:  
        """  
        Run node  
        :return:  
        """  
        node\_inputs: dict[str, list] = {"conditions": []}  
  
        process\_data: dict[str, list] = {"condition\_results": []}  
  
        input\_conditions = []  
        final\_result = False  
        selected\_case\_id = None  
        condition\_processor = ConditionProcessor()  
        try:  
            # Check if the new cases structure is used  
            if self.node\_data.cases:  
                for case in self.node\_data.cases:  
                    input\_conditions, group\_result, final\_result = condition\_processor.process\_conditions(  
                        variable\_pool=self.graph\_runtime\_state.variable\_pool,  
                        conditions=case.conditions,  
                        operator=case.logical\_operator,  
                    )  
  
                    process\_data["condition\_results"].append(  
                        {  
                            "group": case.model\_dump(),  
                            "results": group\_result,  
                            "final\_result": final\_result,  
                        }  
                    )  
  
                    # Break if a case passes (logical short-circuit)  
                    if final\_result:  
                        selected\_case\_id = case.case\_id  # Capture the ID of the passing case  
                        break  
  
            else:  
                # Fallback to old structure if cases are not defined  
                input\_conditions, group\_result, final\_result = \_should\_not\_use\_old\_function(  
                    condition\_processor=condition\_processor,  
                    variable\_pool=self.graph\_runtime\_state.variable\_pool,  
                    conditions=self.node\_data.conditions or [],  
                    operator=self.node\_data.logical\_operator or"and",  
                )  
  
                selected\_case\_id = "true"if final\_result else"false"  
  
                process\_data["condition\_results"].append(  
                    {"group": "default", "results": group\_result, "final\_result": final\_result}  
                )  
  
            node\_inputs["conditions"] = input\_conditions  
  
        
 
 except
  Exception as e:  
            return NodeRunResult(  
                status=WorkflowNodeExecutionStatus.FAILED, inputs=node\_inputs, process\_data=process\_data, 
   
 error
 =str(e)  
            )  
  
        outputs = {"result": final\_result, "selected\_case\_id": selected\_case\_id}  
  
        data = NodeRunResult(  
            status=WorkflowNodeExecutionStatus.SUCCEEDED,  
            inputs=node\_inputs,  
            process\_data=process\_data,  
            edge\_source\_handle=selected\_case\_id or"false",  # Use case ID or 'default'  
            outputs=outputs,  
        )  
  
        return data  

IfElseNode 的实现主要完成以下工作:

  1. 使用 ConditionProcessor 处理条件逻辑
  2. 遍历 cases 结构中的条件组,并根据结果确定 selected\_case\_id
  3. 如果使用旧的结构,则调用 \_should\_not\_use\_old\_function 进行兼容处理
  4. 返回包含条件结果的 NodeRunResult ,并设置 edge\_source\_handle 以指示下一个要执行的节点

5.2.3 LLM 节点实现

LLM 节点是工作流中最核心的节点之一,它负责调用大语言模型生成文本。LLM 节点的执行流程:

picture.image

以下是 LLMNode 类的部分实现:

  
class LLMNode(BaseNode[LLMNodeData]):  
    \_node\_data\_cls = LLMNodeData  
    \_node\_type = NodeType.LLM  
  
    # Instance attributes specific to LLMNode.  
    # Output variable for file  
    \_file\_outputs: list["File"]  
    \_llm\_file\_saver: LLMFileSaver  
  
    def \_\_init\_\_(  
        self,  
        id: str,  
        config: Mapping[str, Any],  
        graph\_init\_params: "GraphInitParams",  
        graph: "Graph",  
        graph\_runtime\_state: "GraphRuntimeState",  
        previous\_node\_id: Optional[str] = None,  
        thread\_pool\_id: Optional[str] = None,  
        *,  
        llm\_file\_saver: LLMFileSaver | None = None,  
    ) -> None:  
        super().\_\_init\_\_(  
            id=id,  
            config=config,  
            graph\_init\_params=graph\_init\_params,  
            graph=graph,  
            graph\_runtime\_state=graph\_runtime\_state,  
            previous\_node\_id=previous\_node\_id,  
            thread\_pool\_id=thread\_pool\_id,  
        )  
        # LLM file outputs, used for MultiModal outputs.  
        self.\_file\_outputs: list[File] = []  
  
        if llm\_file\_saver isNone:  
            llm\_file\_saver = FileSaverImpl(  
                user\_id=graph\_init\_params.user\_id,  
                tenant\_id=graph\_init\_params.tenant\_id,  
            )  
        self.\_llm\_file\_saver = llm\_file\_saver  
  
    def \_run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:  
        def process\_structured\_output(text: str) -> Optional[dict[str, Any]]:  
            """Process structured output if enabled"""  
            ifnot self.node\_data.structured\_output\_enabled ornot self.node\_data.structured\_output:  
                returnNone  
            return self.\_parse\_structured\_output(text)  
  
        node\_inputs: Optional[dict[str, Any]] = None  
        process\_data = None  
        result\_text = ""  
        usage = LLMUsage.empty\_usage()  
        finish\_reason = None  
  
        try:  
            # init messages template  
            self.node\_data.prompt\_template = self.\_transform\_chat\_messages(self.node\_data.prompt\_template)  
  
            # fetch variables and fetch values from variable pool  
            inputs = self.\_fetch\_inputs(node\_data=self.node\_data)  
  
            # fetch jinja2 inputs  
            jinja\_inputs = self.\_fetch\_jinja\_inputs(node\_data=self.node\_data)  
  
            # merge inputs  
            inputs.update(jinja\_inputs)  
  
            node\_inputs = {}  
  
            # fetch files  
            files = (  
                self.\_fetch\_files(selector=self.node\_data.vision.configs.variable\_selector)  
                if self.node\_data.vision.enabled  
                else []  
            )  
  
            if files:  
                node\_inputs["#files#"] = [file.to\_dict() for file in files]  
  
            # fetch context value  
            generator = self.\_fetch\_context(node\_data=self.node\_data)  
            context = None  
            for event in generator:  
                if isinstance(event, RunRetrieverResourceEvent):  
                    context = event.context  
                    yield event  
            if context:  
                node\_inputs["#context#"] = context  
  
            # fetch model config  
            model\_instance, model\_config = self.\_fetch\_model\_config(self.node\_data.model)  
              
            # ... 更多代码 ...  

LLMNode 是一个典型的节点实现,负责调用大语言模型:

  1. 初始化节点参数和模型配置
  2. 处理输入变量和文件
  3. 构建提示消息
  4. 调用 LLM 模型
  5. 处理模型返回的结果
  6. 生成节点执行结果

5.2.4 ToolNode 实现

ToolNode 是工具节点,负责调用预定义的工具:

  
class ToolNode(BaseNode[ToolNodeData]):  
    """  
    Tool Node  
    """  
  
    \_node\_data\_cls = ToolNodeData  
    \_node\_type = NodeType.TOOL  
  
    def \_run(self) -> Generator:  
        """  
        Run the tool node  
        """  
  
        node\_data = 
   
 cast
 (ToolNodeData, self.node\_data)  
  
        # fetch tool icon  
        tool\_info = {  
            "provider\_type": node\_data.provider\_type.value,  
            "provider\_id": node\_data.provider\_id,  
            "plugin\_unique\_identifier": node\_data.plugin\_unique\_identifier,  
        }  
  
        # get tool runtime  
        try:  
            from core.tools.tool\_manager import ToolManager  
  
            tool\_runtime = ToolManager.get\_workflow\_tool\_runtime(  
                self.tenant\_id, self.app\_id, self.node\_id, self.node\_data, self.invoke\_from  
            )  
        
 
 except
  ToolNodeError as e:  
            yield RunCompletedEvent(  
                run\_result=NodeRunResult(  
                    status=WorkflowNodeExecutionStatus.FAILED,  
                    inputs={},  
                    metadata={WorkflowNodeExecutionMetadataKey.TOOL\_INFO: tool\_info},  
                    
   
 error
 =f"Failed to get tool runtime: {str(e)}",  
                    error\_type=type(e).\_\_name\_\_,  
                )  
            )  
            return  
  
        # get parameters  
        tool\_parameters = tool\_runtime.get\_merged\_runtime\_parameters() or []  
        parameters = self.\_generate\_parameters(  
            tool\_parameters=tool\_parameters,  
            variable\_pool=self.graph\_runtime\_state.variable\_pool,  
            node\_data=self.node\_data,  
        )  
          
        # get conversation id  
        conversation\_id = self.graph\_runtime\_state.variable\_pool.get\_system\_variable("conversation\_id")  
  
        # invoke tool  
        try:  
            from core.tools.entities.tool\_entities import ToolInvokeMessage  
            from core.tools.tool\_engine import ToolEngine  
  
            # invoke tool  
            tool\_invoke\_message = ToolInvokeMessage(  
                conversation\_id=conversation\_id,  
                tool\_parameters=parameters,  
            )  
  
            # invoke tool  
            tool\_response = ToolEngine.generic\_invoke(  
                tenant\_id=self.tenant\_id,  
                app\_id=self.app\_id,  
                tool\_runtime=tool\_runtime,  
                tool\_invoke\_message=tool\_invoke\_message,  
                user\_id=self.user\_id,  
                invoke\_from=self.invoke\_from,  
            )  
  
            # ... 处理工具响应 ...  
        
 
 except
  Exception as e:  
            # ... 处理异常 ...  

ToolNode 的实现主要完成以下工作:

  1. 获取工具信息和工具运行时
  2. 生成工具参数
  3. 获取会话 ID
  4. 通过 ToolEngine.generic\_invoke 调用工具
  5. 处理工具返回的结果
  6. 生成节点执行结果

5.2.5 Knowledge

Retrieval Node 实现

`Knowledge

Retrieval Node` 是知识检索节点,负责从知识库中检索相关信息:

  
class Knowledge
 
 Retrieval
 Node(LLMNode):  
    """  
    Knowledge 
 
 Retrieval
  Node  
    """  
  
    \_node\_data\_cls = Knowledge
   
 Retrieval
 NodeData  
    \_node\_type = NodeType.KNOWLEDGE\_
   
 RETRIEVAL
   
  
    def \_run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:  
        """  
        Run node  
        """  
        node\_data = 
   
 cast
 (Knowledge
   
 Retrieval
 NodeData, self.node\_data)  
  
        # get query variable  
        query\_variable = node\_data.query\_variable  
        ifnot query\_variable:  
            yield RunCompletedEvent(  
                run\_result=NodeRunResult(  
                    status=WorkflowNodeExecutionStatus.FAILED,  
                    inputs={},  
                    
   
 error
 ="Query variable is not set",  
                )  
            )  
            return  
  
        # get query from variable pool  
        query = self.graph\_runtime\_state.variable\_pool.get\_variable(query\_variable)  
        ifnot query:  
            yield RunCompletedEvent(  
                run\_result=NodeRunResult(  
                    status=WorkflowNodeExecutionStatus.FAILED,  
                    inputs={},  
                    
   
 error
 =f"Query variable {query\_variable} is empty",  
                )  
            )  
            return  
  
        # check rate limit  
        ifnot self.\_check\_rate\_limit():  
            yield RunCompletedEvent(  
                run\_result=NodeRunResult(  
                    status=WorkflowNodeExecutionStatus.FAILED,  
                    inputs={},  
                    
   
 error
 ="Rate limit exceeded",  
                )  
            )  
            return  
  
        # get 
 
 retrieval
  model config  
        retrieval\_model\_config = {  
            "search\_method": node\_data.search\_method,  
            "reranking\_enable": node\_data.reranking\_enable,  
            "reranking\_model": node\_data.reranking\_model,  
            "top\_k": node\_data.top\_k,  
            "score\_threshold": node\_data.score\_threshold,  
        }  
  
        # ... 执行知识检索逻辑 ...  

`Knowledge

Retrieval Node` 的实现主要完成以下工作:

  1. 从变量池中提取查询变量
  2. 检查查询是否为空
  3. 进行速率限制检查
  4. 定义检索模型配置
  5. 执行知识检索
  6. 处理检索结果
  7. 生成节点执行结果

5.2.6 CodeNode 实现

CodeNode 是代码执行节点,负责执行用户定义的代码:

  
class CodeNode(BaseNode[CodeNodeData]):  
    """  
    Code Node  
    """  
  
    \_node\_data\_cls = CodeNodeData  
    \_node\_type = NodeType.CODE  
  
    def \_run(self) -> NodeRunResult:  
        """  
        Run node  
        """  
        node\_data = 
   
 cast
 (CodeNodeData, self.node\_data)  
  
        # get code language and content  
        code\_language = node\_data.code\_language  
        code\_content = node\_data.code\_content  
  
        # get input variables  
        input\_variables = {}  
        for input\_variable in node\_data.input\_variables:  
            variable\_name = input\_variable.variable\_name  
            variable\_value = self.graph\_runtime\_state.variable\_pool.get\_variable(input\_variable.variable\_selector)  
            input\_variables[variable\_name] = variable\_value  
  
        # execute code  
        try:  
            from core.workflow.nodes.code.code\_executor import CodeExecutor  
  
            result = CodeExecutor.execute\_workflow\_code\_template(  
                code\_language=code\_language,  
                code\_content=code\_content,  
                input\_variables=input\_variables,  
            )  
  
            # check output variables  
            outputs = {}  
            for output\_variable in node\_data.output\_variables:  
                variable\_name = output\_variable.variable\_name  
                if variable\_name notin result:  
                    return NodeRunResult(  
                        status=WorkflowNodeExecutionStatus.FAILED,  
                        inputs=input\_variables,  
                        
   
 error
 =f"Output variable {variable\_name} not found in code execution result",  
                    )  
  
                variable\_value = result[variable\_name]  
                variable\_type = output\_variable.variable\_type  
  
                # check variable type  
                if variable\_type == "
 
 string
 ":  
                    ifnot self.\_check\_string(variable\_value, output\_variable.max\_length):  
                        return NodeRunResult(  
                            status=WorkflowNodeExecutionStatus.FAILED,  
                            inputs=input\_variables,  
                            
   
 error
 =f"Output variable {variable\_name} is not a valid 
 
 string
  or exceeds max length",  
                        )  
                elif variable\_type == "number":  
                    ifnot self.\_check\_number(variable\_value):  
                        return NodeRunResult(  
                            status=WorkflowNodeExecutionStatus.FAILED,  
                            inputs=input\_variables,  
                            
   
 error
 =f"Output variable {variable\_name} is not a valid number",  
                        )  
                # ... 其他类型检查 ...  
  
                outputs[variable\_name] = variable\_value  
  
            return NodeRunResult(  
                status=WorkflowNodeExecutionStatus.SUCCEEDED,  
                inputs=input\_variables,  
                outputs=outputs,  
            )  
  
        
 
 except
  (CodeExecutionError, CodeNodeError) as e:  
            return NodeRunResult(  
                status=WorkflowNodeExecutionStatus.FAILED,  
                inputs=input\_variables,  
                
   
 error
 =str(e),  
            )  

CodeNode 的实现主要完成以下工作:

  1. 获取代码语言和代码内容
  2. 从变量池中获取输入变量
  3. 通过 CodeExecutor.execute\_workflow\_code\_template 执行代码
  4. 检查输出变量的类型和长度
  5. 处理执行结果和潜在的异常
  6. 生成节点执行结果

5.2.7 AgentNode 实现

AgentNode 是代理节点,负责调用 AI 代理执行复杂任务:

  
class AgentNode(ToolNode):  
    """  
    Agent Node  
    """  
  
    \_node\_data\_cls = AgentNodeData  
    \_node\_type = NodeType.AGENT  
  
    def \_run(self) -> Generator:  
        """  
        Run the agent node  
        """  
        node\_data = 
   
 cast
 (AgentNodeData, self.node\_data)  
  
        # get agent strategy  
        try:  
            from core.agent.strategy.strategy\_factory import StrategyFactory  
  
            strategy = StrategyFactory.create\_strategy(  
                tenant\_id=self.tenant\_id,  
                app\_id=self.app\_id,  
                strategy\_mode=node\_data.strategy\_mode,  
                strategy\_config=node\_data.strategy\_config,  
                user\_id=self.user\_id,  
                invoke\_from=self.invoke\_from,  
            )  
        
 
 except
  Exception as e:  
            yield RunCompletedEvent(  
                run\_result=NodeRunResult(  
                    status=WorkflowNodeExecutionStatus.FAILED,  
                    inputs={},  
                    
   
 error
 =f"Failed to create agent strategy: {str(e)}",  
                )  
            )  
            return  
  
        # generate agent parameters  
        agent\_parameters = self.\_generate\_parameters(  
            tool\_parameters=node\_data.parameters,  
            variable\_pool=self.graph\_runtime\_state.variable\_pool,  
            node\_data=node\_data,  
        )  
  
        # get conversation id  
        conversation\_id = self.graph\_runtime\_state.variable\_pool.get\_system\_variable("conversation\_id")  
  
        # invoke agent  
        try:  
            agent\_response = strategy.invoke(  
                conversation\_id=conversation\_id,  
                inputs=agent\_parameters,  
                files=[],  
            )  
  
            # ... 处理代理响应 ...  
        
 
 except
  Exception as e:  
            # ... 处理异常 ...  

AgentNode 的实现主要完成以下工作:

  1. 获取代理策略
  2. 生成代理参数
  3. 获取会话 ID
  4. 通过 strategy.invoke 调用代理
  5. 处理代理返回的结果
  6. 生成节点执行结果

5.2.8 HttpRequestNode 实现

HttpRequestNode 是 HTTP 请求节点,负责发送 HTTP 请求并处理响应:

  
class HttpRequestNode(BaseNode[HttpRequestNodeData]):  
    """  
    Http Request Node  
    """  
  
    \_node\_data\_cls = HttpRequestNodeData  
    \_node\_type = NodeType.HTTP\_REQUEST  
  
    def \_run(self) -> NodeRunResult:  
        """  
        Run node  
        """  
        node\_data = 
   
 cast
 (HttpRequestNodeData, self.node\_data)  
  
        # get default config  
        default\_config = {  
            "method": node\_data.method,  
            "url": node\_data.url,  
            "headers": node\_data.headers,  
            "params": node\_data.params,  
            "body": node\_data.body,  
            "timeout": node\_data.timeout,  
            "retry\_count": node\_data.retry\_count,  
            "retry\_interval": node\_data.retry\_interval,  
        }  
  
        # init executor  
        executor = HttpRequestExecutor(  
            tenant\_id=self.tenant\_id,  
            app\_id=self.app\_id,  
            user\_id=self.user\_id,  
            variable\_pool=self.graph\_runtime\_state.variable\_pool,  
            default\_config=default\_config,  
        )  
  
        # execute http request  
        try:  
            
   
 response
  = executor.execute()  
  
            # extract files  
            files = []  
            if
   
 response
 .files:  
                for file in
   
 response
 .files:  
                    files.append(file.to\_dict())  
  
            # success  
            if
   
 response
 .success:  
                return NodeRunResult(  
                    status=WorkflowNodeExecutionStatus.SUCCEEDED,  
                    inputs=
   
 response
 .request\_info,  
                    outputs={  
                        "status\_code": 
   
 response
 .status\_code,  
                        "response\_body": 
   
 response
 .response\_body,  
                        "response\_headers": 
   
 response
 .response\_headers,  
                        "files": files,  
                    },  
                )  
            # failed  
            else:  
                return NodeRunResult(  
                    status=WorkflowNodeExecutionStatus.FAILED,  
                    inputs=
   
 response
 .request\_info,  
                    
   
 error
 =
   
 response
 .
   
 error
 ,  
                )  
        
 
 except
  Exception as e:  
            return NodeRunResult(  
                status=WorkflowNodeExecutionStatus.FAILED,  
                inputs={},  
                
   
 error
 =str(e),  
            )  

HttpRequestNode 的实现主要完成以下工作:

  1. 获取默认配置
  2. 初始化 HttpRequestExecutor
  3. 执行 HTTP 请求
  4. 处理响应(包括成功和失败情况)
  5. 提取文件
  6. 生成节点执行结果

六、工作流数据流动

6.1 工作流创建和发布

picture.image

  1. 用户在界面上设计工作流,定义节点和连接
  2. 系统将设计转换为工作流配置
  3. 创建工作流模型和草稿变量
  4. 发布工作流,使其可被调用

6.2 工作流调试和执行

picture.image

  1. 用户触发工作流执行
  2. 系统创建工作流运行记录
  3. 图引擎解析工作流配置,构建执行图
  4. 按照图的定义执行节点
  5. 记录每个节点的执行状态和结果
  6. 完成工作流执行,更新运行记录

七、图引擎机制

图引擎是工作流执行的核心,负责解析工作流图结构并执行节点。

7.1 图引擎实现

以下是 GraphEngine 类的部分实现:

  
class GraphEngine:  
    """  
    Graph Engine  
    """  
  
    def \_\_init\_\_(  
        self,  
        tenant\_id: str,  
        app\_id: str,  
        workflow\_type: WorkflowType,  
        workflow\_id: str,  
        user\_id: str,  
        invoke\_from: InvokeFrom,  
    ) -> None:  
        """  
        Initialize graph engine  
        """  
        self.tenant\_id = tenant\_id  
        self.app\_id = app\_id  
        self.workflow\_type = workflow\_type  
        self.workflow\_id = workflow\_id  
        self.user\_id = user\_id  
        self.invoke\_from = invoke\_from  
  
    def execute(  
        self,  
        workflow\_run\_id: str,  
        workflow\_config: dict[str, Any],  
        user\_inputs: dict[str, Any],  
        system\_variables: dict[str, Any],  
        environment\_variables: dict[str, Any],  
        session\_variables: dict[str, Any],  
        *,  
        event\_handler: Optional[Callable[[WorkflowEvent], None]] = None,  
    ) -> Generator[WorkflowEvent, None, None]:  
        """  
        Execute workflow  
        """  
        # Create graph init params  
        graph\_init\_params = GraphInitParams(  
            tenant\_id=self.tenant\_id,  
            app\_id=self.app\_id,  
            workflow\_id=self.workflow\_id,  
            workflow\_run\_id=workflow\_run\_id,  
            user\_id=self.user\_id,  
            invoke\_from=self.invoke\_from,  
        )  
  
        # Create variable pool  
        variable\_pool = VariablePool(  
            user\_inputs=user\_inputs,  
            system\_variables=system\_variables,  
            environment\_variables=environment\_variables,  
            session\_variables=session\_variables,  
        )  
  
        # Create graph runtime state  
        graph\_runtime\_state = GraphRuntimeState(  
            variable\_pool=variable\_pool,  
        )  
  
        # Create graph  
        graph = Graph(  
            workflow\_config=workflow\_config,  
            graph\_init\_params=graph\_init\_params,  
            graph\_runtime\_state=graph\_runtime\_state,  
        )  
  
        # Create thread pool  
        thread\_pool = GraphEngineThreadPool(max\_workers=10)  
  
        # Execute graph  
        try:  
            # Yield workflow started event  
            yield WorkflowStartedEvent(  
                workflow\_run\_id=workflow\_run\_id,  
            )  
  
            # Execute graph  
            for event in graph.execute(thread\_pool=thread\_pool):  
                # Handle event  
                if event\_handler:  
                    event\_handler(event)  
  
                # Yield event  
                yield event  
  
                # Check if workflow is completed  
                if isinstance(event, WorkflowCompletedEvent):  
                    break  
  
        
 
 except
  Exception as e:  
            # Yield workflow failed event  
            yield WorkflowFailedEvent(  
                workflow\_run\_id=workflow\_run\_id,  
                
   
 error
 =str(e),  
            )  
  
        finally:  
            # Shutdown thread pool  
            thread\_pool.shutdown(wait=True)  

GraphEngineThreadPool 是一个继承自 ThreadPoolExecutor 的线程池,用于管理工作流的并行执行:

  
class GraphEngineThreadPool(ThreadPoolExecutor):  
    """  
    Graph Engine Thread Pool  
    """  
  
    def \_\_init\_\_(self, max\_workers: Optional[int] = None) -> None:  
        """  
        Initialize graph engine thread pool  
        """  
        super().\_\_init\_\_(max\_workers=max\_workers)  
        self.\_futures: dict[str, 
   
 Future
 ] = {}  
  
    def submit\_task(  
        self, thread\_pool\_id: str, fn: Callable, *args: Any, **kwargs: Any  
    ) -> 
 
 Future
 :  
        """  
        Submit 
 
 task
  to thread pool  
        """  
        
   
 future
  = self.submit(fn, *args, **kwargs)  
        self.\_futures[thread\_pool\_id] = 
   
 future
   
        
   
 future
 .add\_done\_callback(lambda \_: self.\_futures.pop(thread\_pool\_id, None))  
        return
   
 future
   
  
    def is\_full(self) -> bool:  
        """  
        Check if thread pool is full  
        """  
        return len(self.\_futures) >= self.\_max\_workers  
  
    def get\_future(self, thread\_pool\_id: str) -> Optional[
 
 Future
 ]:  
        """  
        Get 
 
 future
  by thread pool id  
        """  
        return self.\_futures.get(thread\_pool\_id)  

7.2 图结构解析

图引擎首先解析工作流的图结构,包括:

  1. 解析节点 :解析工作流中的所有节点,包括节点类型、配置等
  2. 解析边 :解析节点之间的连接关系,包括源节点、目标节点、源端口、目标端口等
  3. 构建节点映射 :构建节点ID到节点对象的映射
  4. 构建边映射 :构建边ID到边对象的映射

7.3 图结构实现

以下是 Graph 类的部分实现,它负责解析工作流配置并执行节点:

  
class Graph:  
    """  
    Graph  
    """  
  
    def \_\_init\_\_(  
        self,  
        workflow\_config: dict[str, Any],  
        graph\_init\_params: GraphInitParams,  
        graph\_runtime\_state: GraphRuntimeState,  
    ) -> None:  
        """  
        Initialize graph  
        """  
        self.workflow\_config = workflow\_config  
        self.graph\_init\_params = graph\_init\_params  
        self.graph\_runtime\_state = graph\_runtime\_state  
  
        # Parse workflow config  
        self.nodes = self.\_parse\_nodes(workflow\_config.get("nodes", {}))  
        self.edges = self.\_parse\_edges(workflow\_config.get("edges", {}))  
  
        # Build node and edge mappings  
        self.node\_mapping = self.\_build\_node\_mapping(self.nodes)  
        self.edge\_mapping = self.\_build\_edge\_mapping(self.edges)  
  
        # Build source and target node mappings  
        self.source\_node\_mapping = self.\_build\_source\_node\_mapping(self.edges)  
        self.target\_node\_mapping = self.\_build\_target\_node\_mapping(self.edges)  
  
    def execute(self, thread\_pool: GraphEngineThreadPool) -> Generator[WorkflowEvent, None, None]:  
        """  
        Execute graph  
        """  
        # Find start node  
        start\_node = self.\_find\_start\_node()  
        ifnot start\_node:  
            yield WorkflowFailedEvent(  
                workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,  
                
   
 error
 ="Start node not found",  
            )  
            return  
  
        # Execute start node  
        for event in self.\_execute\_node(start\_node, thread\_pool=thread\_pool):  
            yield event  
  
        # Yield workflow completed event  
        yield WorkflowCompletedEvent(  
            workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,  
        )  
  
    def \_execute\_node(  
        self, node: BaseNode, thread\_pool: GraphEngineThreadPool, previous\_node\_id: Optional[str] = None  
    ) -> Generator[WorkflowEvent, None, None]:  
        """  
        Execute node  
        """  
        # Yield node started event  
        yield NodeStartedEvent(  
            workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,  
            node\_id=node.node\_id,  
            node\_type=node.node\_type,  
        )  
  
        # Run node  
        try:  
            for event in node.run():  
                # Handle node event  
                if isinstance(event, RunCompletedEvent):  
                    # Get node run result  
                    node\_run\_result = event.run\_result  
  
                    # Update variable pool  
                    if node\_run\_result.outputs:  
                        for variable\_name, variable\_value in node\_run\_result.outputs.items():  
                            self.graph\_runtime\_state.variable\_pool.add(  
                                node\_id=node.node\_id,  
                                variable\_name=variable\_name,  
                                variable\_value=variable\_value,  
                            )  
  
                    # Yield node completed event  
                    yield NodeCompletedEvent(  
                        workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,  
                        node\_id=node.node\_id,  
                        node\_type=node.node\_type,  
                        status=node\_run\_result.status,  
                        inputs=node\_run\_result.inputs,  
                        outputs=node\_run\_result.outputs,  
                        process\_data=node\_run\_result.process\_data,  
                        
   
 error
 =node\_run\_result.
   
 error
 ,  
                    )  
  
                    # Find next nodes  
                    next\_nodes = self.\_find\_next\_nodes(  
                        node\_id=node.node\_id,  
                        edge\_source\_handle=node\_run\_result.edge\_source\_handle,  
                    )  
  
                    # Execute next nodes  
                    for next\_node in next\_nodes:  
                        # Check if thread pool is full  
                        if thread\_pool.is\_full():  
                            # Execute next node in current thread  
                            for event in self.\_execute\_node(  
                                node=next\_node,  
                                thread\_pool=thread\_pool,  
                                previous\_node\_id=node.node\_id,  
                            ):  
                                yield event  
                        else:  
                            # Execute next node in new thread  
                            thread\_pool\_id = str(uuid.uuid4())  
                            thread\_pool.submit\_task(  
                                thread\_pool\_id=thread\_pool\_id,  
                                fn=self.\_execute\_node\_in\_thread,  
                                node=next\_node,  
                                thread\_pool=thread\_pool,  
                                previous\_node\_id=node.node\_id,  
                                thread\_pool\_id=thread\_pool\_id,  
                            )  
                else:  
                    # Yield other events  
                    yield event  
  
        
 
 except
  Exception as e:  
            # Yield node failed event  
            yield NodeFailedEvent(  
                workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,  
                node\_id=node.node\_id,  
                node\_type=node.node\_type,  
                
   
 error
 =str(e),  
            )  

7.4 节点执行

图引擎根据图结构执行节点:

  1. 确定起始节点 :通常是START节点
  2. 执行节点 :调用节点的run方法
  3. 处理节点结果 :根据节点执行结果确定下一个要执行的节点
  4. 处理并行执行 :如果有多个分支,可以并行执行

7.4.1 节点执行的主要流程

节点执行的主要流程如下:

  1. 发出节点开始事件 :触发 NodeRunStartedEvent ,通知系统节点开始执行
  2. 调用节点的 run 方法 :执行节点的具体逻辑
  3. 处理节点事件
  • 处理 RunCompletedEvent :获取节点执行结果
  • 处理 RunStreamChunkEvent :处理流式输出
  • 处理 RunRetrieverResourceEvent :处理检索资源
  • 处理重试逻辑 :如果节点执行失败且配置了重试,则进行重试

  • 更新变量池 :将节点输出变量添加到变量池中

  • 发出节点完成事件 :根据执行结果触发相应事件

  • 成功:触发 NodeRunSucceededEvent
  • 失败:触发 NodeRunFailedEvent
  • 异常但继续:触发 NodeRunExceptionEvent
  • 查找下一个要执行的节点 :根据边映射和条件确定下一个节点

  • 执行下一个节点 :可能是串行执行或并行执行

picture.image

7.4.2 查找下一个节点的机制

在工作流执行过程中,确定下一个要执行的节点是关键步骤。GraphEngine 类的 \_run 方法实现了这一机制:

  1. 获取边映射 :通过 self.graph.edge\_mapping.get(next\_node\_id) 获取当前节点的所有出边
  2. 单边处理 :如果只有一条出边,直接获取目标节点ID
  
if len(edge\_mappings) == 1:  
    edge = edge\_mappings[0]  
    # 检查是否有运行条件  
    if edge.run\_condition:  
        result = ConditionManager.get\_condition\_handler(...).check(...)  
        if not result:  
            break  # 条件不满足,停止执行  
    next\_node\_id = edge.target\_node\_id  

  1. 多边处理 :如果有多条出边,需要根据条件或并行策略确定下一个节点
  • 条件分支 :如果边有运行条件,根据条件结果确定要执行的分支
  
if any(edge.run\_condition for edge in edge\_mappings):  
    # 按条件分组  
    condition\_edge\_mappings: dict[str, list[GraphEdge]] = {}  
    # 检查每个条件组  
    for \_, sub\_edge\_mappings in condition\_edge\_mappings.items():  
        # 检查条件是否满足  
        result = ConditionManager.get\_condition\_handler(...).check(...)  
        if result:  
            # 条件满足,确定下一个节点  
            if len(sub\_edge\_mappings) == 1:  
                final\_node\_id = edge.target\_node\_id  
            else:  
                # 并行执行多个分支  
                parallel\_generator = self.\_run\_parallel\_branches(...)  

  • 并行分支 :如果没有条件或条件满足,可能需要并行执行多个分支
  
else:  
    parallel\_generator = self.\_run\_parallel\_branches(  
        edge\_mappings=edge\_mappings,  
        in\_parallel\_id=in\_parallel\_id,  
        parallel\_start\_node\_id=parallel\_start\_node\_id,  
        handle\_exceptions=handle\_exceptions,  
    )  

  • 并行分支执行 :通过 \_run\_parallel\_branches 方法处理并行分支
  • 创建线程池和队列管理并行执行
  • 为每个分支创建一个线程执行
  • 收集并处理所有分支的执行结果
  • 检查节点是否在当前并行分支内 :确保节点执行不会跨越并行分支边界
  
if in\_parallel\_id and self.graph.node\_parallel\_mapping.get(next\_node\_id, "") != in\_parallel\_id:  
    break  

通过这种机制,工作流系统能够灵活地处理各种复杂的执行路径,包括条件分支和并行执行,确保工作流按照设计的逻辑正确执行。

八、图引擎与节点执行的通信机制

图引擎与节点执行之间的通信是通过事件驱动机制实现的,这种机制使得工作流执行过程中的各个组件能够松耦合地交互,提高了系统的可扩展性和可维护性。

8.1 事件驱动架构

工作流系统采用事件驱动架构,通过定义和传递各种事件来实现图引擎与节点之间的通信。这种架构具有以下特点:

  1. 松耦合 :图引擎和节点之间通过事件进行通信,而不是直接调用,降低了组件间的依赖
  2. 可扩展 :新的节点类型和事件类型可以轻松添加到系统中,而不需要修改现有代码
  3. 异步处理 :事件可以异步处理,提高系统的响应性和吞吐量
  4. 状态追踪 :通过事件可以追踪工作流的执行状态和历史

picture.image

8.2 核心事件类型

工作流系统定义了多种事件类型,用于表示工作流执行过程中的不同状态和操作:

8.2.1 图级事件

  • GraphRunStartedEvent :工作流开始执行
  • GraphRunSucceededEvent :工作流成功完成
  • GraphRunFailedEvent :工作流执行失败
  • GraphRunPartialSucceededEvent :工作流部分成功(有些节点失败但不影响整体结果)

8.2.2 节点级事件

  • NodeRunStartedEvent :节点开始执行
  • NodeRunSucceededEvent :节点执行成功
  • NodeRunFailedEvent :节点执行失败
  • NodeRunExceptionEvent :节点执行异常但继续执行
  • NodeRunRetryEvent :节点重试执行
  • NodeRunStreamChunkEvent :节点产生流式输出
  • NodeRunRetrieverResourceEvent :节点检索资源

8.2.3 并行分支事件

  • ParallelBranchRunStartedEvent :并行分支开始执行
  • ParallelBranchRunSucceededEvent :并行分支执行成功
  • ParallelBranchRunFailedEvent :并行分支执行失败

8.2.4 迭代和循环事件

  • IterationRunStartedEvent :迭代开始
  • IterationRunNextEvent :迭代下一步
  • IterationRunSucceededEvent :迭代成功完成
  • IterationRunFailedEvent :迭代失败
  • LoopRunStartedEvent :循环开始
  • LoopRunNextEvent :循环下一步
  • LoopRunSucceededEvent :循环成功完成
  • LoopRunFailedEvent :循环失败

8.3 事件传递流程

事件在工作流系统中的传递流程如下:

  1. 事件生成 :图引擎或节点执行器生成事件
  
yield NodeRunStartedEvent(  
    id=node\_instance.id,  
    node\_id=node\_instance.node\_id,  
    node\_type=node\_instance.node\_type,  
    node\_data=node\_instance.node\_data,  
    route\_node\_state=route\_node\_state,  
    predecessor\_node\_id=node\_instance.previous\_node\_id,  
    # 其他参数...  
)  

  1. 事件传递 :通过 Python 生成器(Generator)机制传递事件
  
def run(self) -> Generator[GraphEngineEvent, None, None]:  
    # ...  
    generator = graph\_engine.run()  
    for event in generator:  
        # 处理事件  
        yield event  

  1. 事件处理 :工作流入口点(WorkflowEntry)接收事件并分发给回调处理器
  
for event in generator:  
    if callbacks:  
        for callback in callbacks:  
            callback.on\_event(event=event)  
    yield event  

  1. 回调处理 :回调处理器根据事件类型执行相应的操作
  
def on\_event(self, event: GraphEngineEvent) -> None:  
    if isinstance(event, NodeRunStartedEvent):  
        self.on\_workflow\_node\_execute\_started(event=event)  
    elif isinstance(event, NodeRunSucceededEvent):  
        self.on\_workflow\_node\_execute\_succeeded(event=event)  
    # 处理其他事件类型...  

8.4 事件处理回调

工作流系统定义了回调接口,允许外部系统注册回调函数来处理工作流事件:

  
class WorkflowCallback(ABC):  
    @abstractmethod  
    def on\_event(self, event: GraphEngineEvent) -> None:  
        """处理工作流事件"""  
        raise NotImplementedError  

系统提供了多种内置回调实现,如:

  1. WorkflowLoggingCallback :记录工作流执行日志
  2. WorkflowAppRunnerCallback :处理应用级别的工作流事件

8.5 事件与状态管理

事件不仅用于通信,还用于管理工作流的状态:

  1. 节点状态追踪 :通过事件记录节点的执行状态和结果
  
# 节点开始执行  
yield NodeRunStartedEvent(...)  
  
# 节点执行成功  
yield NodeRunSucceededEvent(...)  

  1. 变量传递 :事件携带节点的输入和输出变量
  
# 节点执行成功事件包含输出变量  
yield NodeRunSucceededEvent(  
    # ...  
    outputs=run\_result.outputs,  
    # ...  
)  

  1. 错误处理 :事件携带错误信息,用于错误处理和重试
  
# 节点执行失败事件包含错误信息  
yield NodeRunFailedEvent(  
    error=route\_node\_state.failed\_reason or "Unknown error.",  
    # ...  
)  

8.6 事件转换与应用集成

工作流应用运行器(WorkflowAppRunner)将工作流事件转换为应用级别的队列事件,实现与应用系统的集成:

  
def \_handle\_event(self, workflow\_entry: WorkflowEntry, event: GraphEngineEvent) -> None:  
    if isinstance(event, NodeRunSucceededEvent):  
        self.\_publish\_event(  
            QueueNodeSucceededEvent(  
                node\_execution\_id=event.id,  
                node\_id=event.node\_id,  
                node\_type=event.node\_type,  
                node\_data=event.node\_data,  
                inputs=inputs,  
                process\_data=process\_data,  
                outputs=outputs,  
                execution\_metadata=execution\_metadata,  
                # 其他参数...  
            )  
        )  
    # 处理其他事件类型...  

这种转换机制使得工作流系统能够与外部应用系统无缝集成,同时保持内部实现的独立性。

8.7 事件通信示例

以下是一个完整的事件通信流程示例,展示了从节点执行到事件处理的整个过程:

8.7.1 节点执行与事件生成

当图引擎执行一个节点时,会生成一系列事件:

  
# 1. 节点开始执行事件  
yield NodeRunStartedEvent(  
    id=node\_instance.id,  
    node\_id=node\_instance.node\_id,  
    node\_type=node\_instance.node\_type,  
    node\_data=node\_instance.node\_data,  
    route\_node\_state=route\_node\_state,  
    # 其他参数...  
)  
  
# 2. 执行节点的run方法  
generator = node\_instance.run()  
for item in generator:  
    # 传递节点产生的事件  
    yield item  
  
# 3. 节点执行成功事件  
yield NodeRunSucceededEvent(  
    id=node\_instance.id,  
    node\_id=node\_instance.node\_id,  
    node\_type=node\_instance.node\_type,  
    node\_data=node\_instance.node\_data,  
    route\_node\_state=route\_node\_state,  
    # 其他参数...  
)  

8.7.2 事件传递与处理

事件通过工作流入口点传递给回调处理器:

  
# WorkflowEntry.run方法  
def run(self, *, callbacks: Sequence[WorkflowCallback]) -> Generator[GraphEngineEvent, None, None]:  
    generator = graph\_engine.run()  
    for event in generator:  
        # 分发事件给回调处理器  
        for callback in callbacks:  
            callback.on\_event(event=event)  
        # 继续传递事件  
        yield event  

8.7.3 回调处理器处理事件

回调处理器根据事件类型执行相应的操作:

  
# WorkflowLoggingCallback.on\_event方法  
def on\_event(self, event: GraphEngineEvent) -> None:  
    if isinstance(event, NodeRunStartedEvent):  
        self.print\_text("\n[NodeRunStartedEvent]", color="yellow")  
        self.print\_text(f"Node ID: {event.node\_id}", color="yellow")  
        self.print\_text(f"Node Title: {event.node\_data.title}", color="yellow")  
        self.print\_text(f"Type: {event.node\_type.value}", color="yellow")  
    elif isinstance(event, NodeRunSucceededEvent):  
        self.print\_text("\n[NodeRunSucceededEvent]", color="green")  
        # 打印节点执行结果  
        if event.route\_node\_state.node\_run\_result:  
            node\_run\_result = event.route\_node\_state.node\_run\_result  
            self.print\_text(f"Outputs: {jsonable\_encoder(node\_run\_result.outputs)}", color="green")  
    # 处理其他事件类型...  

8.7.4 应用运行器处理事件

应用运行器将工作流事件转换为应用级别的队列事件:

  
# WorkflowAppRunner.\_handle\_event方法  
def \_handle\_event(self, workflow\_entry: WorkflowEntry, event: GraphEngineEvent) -> None:  
    if isinstance(event, GraphRunStartedEvent):  
        self.\_publish\_event(QueueWorkflowStartedEvent(...))  
    elif isinstance(event, NodeRunSucceededEvent):  
        self.\_publish\_event(QueueNodeSucceededEvent(...))  
    elif isinstance(event, NodeRunFailedEvent):  
        self.\_publish\_event(QueueNodeFailedEvent(...))  
    # 处理其他事件类型...  

8.8 事件通信的优势

图引擎与节点执行之间基于事件的通信机制具有以下优势:

  1. 解耦组件 :图引擎和节点执行器通过事件进行通信,而不是直接调用,降低了组件间的耦合度
  2. 简化调试 :事件包含完整的上下文信息,便于调试和问题排查
  3. 支持异步执行 :事件可以异步处理,支持并行执行和分布式部署
  4. 可扩展性 :新的节点类型和事件类型可以轻松添加到系统中,而不需要修改现有代码
  5. 状态追踪 :通过事件可以完整记录工作流的执行状态和历史,便于监控和审计
  6. 错误处理 :事件携带错误信息,支持灵活的错误处理策略和重试机制

九、错误处理机制

工作流系统提供了完善的错误处理机制,包括错误策略、重试机制和异常处理,确保工作流在面对各种异常情况时能够灵活应对。

9.1 错误处理策略

工作流系统提供了两种主要的错误处理策略:

  1. FAIL_BRANCH :当节点执行失败时,沿着失败分支继续执行
  • 将错误信息和类型添加到变量池
  • 设置 edge\_source\_handleFAILED ,使工作流可以沿着专门处理失败情况的分支继续执行
  • 适用于需要针对失败情况执行特定逻辑的场景
  • DEFAULT_VALUE :当节点执行失败时,使用预定义的默认值继续执行
  • 将错误信息和类型添加到变量池
  • 使用节点配置中预定义的默认值作为节点输出
  • 适用于即使失败也需要提供某种结果的场景

picture.image

9.2 节点重试机制

对于某些类型的节点,系统支持在执行失败时进行重试:

  1. 重试配置
  • max\_retries :最大重试次数
  • retry\_interval\_seconds :重试间隔时间(秒)
  • 重试流程
  • 节点执行失败后,检查是否配置了重试
  • 如果当前重试次数小于最大重试次数,触发 NodeRunRetryEvent 事件
  • 等待指定的重试间隔时间
  • 重新执行节点
  • 重试事件
  • 系统触发 NodeRunRetryEvent 事件,包含重试索引、开始时间等信息
  • 事件可用于监控和记录重试情况
  
if node\_instance.should\_retry and retries < max\_retries:  
    retries += 1  
    route\_node\_state.node\_run\_result = run\_result  
    yield NodeRunRetryEvent(  
        id=str(uuid.uuid4()),  
        node\_id=node\_instance.node\_id,  
        node\_type=node\_instance.node\_type,  
        node\_data=node\_instance.node\_data,  
        route\_node\_state=route\_node\_state,  
        error=run\_result.error or "Unknown 
 
 error
 ",  
        retry\_index=retries,  
        start\_at=retry\_start\_at,  
    )  
    time.sleep(retry\_interval)  

8.3 可继续执行和可重试的节点类型

系统定义了特定类型的节点,它们在错误处理方面有特殊行为:

  1. 可继续执行的节点类型CONTINUE\_ON\_ERROR\_NODE\_TYPE):
  • 即使执行失败,工作流也可以继续执行
  • 例如:HTTP请求节点、LLM节点等
  • 这些节点可以配置错误策略(FAIL_BRANCH或DEFAULT_VALUE)
  • 可重试的节点类型RETRY\_ON\_ERROR\_NODE\_TYPE):
  • 执行失败时可以自动重试
  • 例如:HTTP请求节点、数据库操作节点等
  • 这些节点可以配置最大重试次数和重试间隔

通过这些机制,工作流系统能够灵活处理各种错误情况,提高工作流的健壮性和可靠性。

  1. 变量管理机制

变量管理是工作流执行的重要组成部分,负责管理工作流中的变量。

8.1 变量池

变量池是工作流中所有变量的集合,包括:

  1. 用户输入变量 :用户提供的输入
  2. 系统变量 :系统提供的变量,如时间戳、会话ID等
  3. 环境变量 :环境相关的变量
  4. 会话变量 :会话相关的变量
  5. 节点输出变量 :节点执行后的输出变量

以下是 VariablePool 类的部分实现:

  
class VariablePool:  
    """  
    Variable Pool  
    """  
  
    def \_\_init\_\_(  
        self,  
        user\_inputs: dict[str, Any],  
        system\_variables: dict[str, Any],  
        environment\_variables: dict[str, Any],  
        session\_variables: dict[str, Any],  
    ) -> None:  
        """  
        Initialize variable pool  
        """  
        self.user\_inputs = user\_inputs  
        self.system\_variables = system\_variables  
        self.environment\_variables = environment\_variables  
        self.session\_variables = session\_variables  
  
        # Initialize variable dictionary  
        self.variable\_dictionary: dict[str, Any] = {}  
  
    def add(self, node\_id: str, variable\_name: str, variable\_value: Any) -> None:  
        """  
        Add variable to variable pool  
        """  
        # Check if variable value is File  
        if isinstance(variable\_value, File):  
            # Convert File to dict  
            variable\_value = variable\_value.to\_dict()  
  
        # Add variable to variable dictionary  
        self.variable\_dictionary[f"{node\_id}.{variable\_name}"] = variable\_value  
  
    def get\_variable(self, variable\_selector: str) -> Any:  
        """  
        Get variable from variable pool  
        """  
        # Check if variable selector is empty  
        ifnot variable\_selector:  
            returnNone  
  
        # Check if variable selector is system variable  
        if variable\_selector.startswith(SYSTEM\_VARIABLE\_NODE\_ID):  
            # Get system variable  
            variable\_name = variable\_selector.split(".", 1)[1]  
            return self.get\_system\_variable(variable\_name)  
  
        # Check if variable selector is user input  
        if variable\_selector.startswith(USER\_INPUT\_NODE\_ID):  
            # Get user input  
            variable\_name = variable\_selector.split(".", 1)[1]  
            return self.get\_user\_input(variable\_name)  
  
        # Check if variable selector is environment variable  
        if variable\_selector.startswith(ENVIRONMENT\_VARIABLE\_NODE\_ID):  
            # Get environment variable  
            variable\_name = variable\_selector.split(".", 1)[1]  
            return self.get\_environment\_variable(variable\_name)  
  
        # Check if variable selector is session variable  
        if variable\_selector.startswith(SESSION\_VARIABLE\_NODE\_ID):  
            # Get session variable  
            variable\_name = variable\_selector.split(".", 1)[1]  
            return self.get\_session\_variable(variable\_name)  
  
        # Get variable from variable dictionary  
        return self.variable\_dictionary.get(variable\_selector)  

8.2 变量传递

变量在节点之间的传递遵循以下规则:

  1. 变量选择器 :通过变量选择器指定要使用的变量
  2. 变量作用域 :变量的作用域为整个工作流
  3. 变量覆盖 :后执行的节点可以覆盖先执行的节点的变量

变量选择器的格式为 node\_id.variable\_name,例如:

  • system.conversation\_id :系统变量中的会话ID
  • user\_input.query :用户输入中的查询
  • node\_1.result :节点1的输出变量 result
  1. 并行执行机制

工作流支持并行执行多个分支,通过 GraphEngineThreadPool 实现:

picture.image

Dify 工作流支持并行执行多个分支:

  1. 通过 GraphParallel 模型定义并行分支
  2. 使用 parallel\_mappingnode\_parallel\_mapping 管理并行关系
  3. 支持条件分支,根据条件选择执行路径
  4. 限制并行层级,避免过度复杂的执行图

十、总结

Dify 工作流系统是一个功能强大的可视化 AI 工作流引擎,通过图结构组织节点执行,使用变量池管理数据流动,支持多种节点类型、错误处理和并行执行。系统的核心组件包括:

  1. 工作流服务 :管理工作流的生命周期
  2. 工作流入口 :工作流执行的入口点
  3. 图引擎 :负责节点的调度和执行
  4. 变量池 :管理工作流中的变量
  5. 节点实现 :各类节点的具体实现

通过这些组件的协同工作,Dify 工作流系统能够支持从简单到复杂的 AI 应用场景,为用户提供灵活且强大的工作流设计和执行能力。

参考资料

https://github.com/langgenius/dify (v1.4.1)

十一、推荐阅读

👆👆👆欢迎关注,一起进步👆👆👆 欢迎留言讨论哈

🧐点赞、分享、推荐 ,一键三连,养成习惯👍

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
火山引擎HTTPDNS边缘云原生技术实践
《火山引擎HTTPDNS边缘云原生技术实践》 赵彦奇 | 火山引擎边缘云网络研发工程师
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论