工作流系统(Workflow System)是 Dify 的核心组件,它支持通过可视化编程界面创建复杂的 AI 应用程序。它允许用户通过将不同的功能块连接在一起来设计工作流,以处理数据、与 AI 模型交互、管理条件并执行各种操作。
- 从零开始学 Dify
- 从零开始学 Dify-系统架构
- 从零开始学 Dify- API 后端架构原来是这样设计的
- 从零开始学 Dify- 帐户与租户管理系统设计揭秘
- 从零开始学 Dify- 模型提供者系统设计实现模型统一调用接口
- 从零开始学 Dify- RAG 知识库系统设计详解
- 从零开始学 Dify- 对话系统的关键功能
- 从零开始学 Dify- 工作流(Workflow)系统架构
- 从零开始学 Dify-扫描、加载和管理模型提供者的详细过程
- 从零开始学 Dify-详细介绍 Dify 模型运行时的核心架构
- 从零开始学 Dify - 详细介绍 Dify 工具(Tool)系统核心架构设计
- 从零开始学 Dify - Dify 的 RAG 系统如何有效地处理和检索大量文档?
- 从零开始学 Dify - 万字详解RAG父子分段模式的原理与实现
- 从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误
- 从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制
接下来将详细介绍 Dify 工作流的实现机制,通过分析代码实现、数据流动和执行过程,充分理解工作流的实现原理。
👆👆👆欢迎关注,一起进步👆👆👆
一、工作流系统概述
1.1 核心概念
Dify 工作流系统是一个基于图(Graph)的执行引擎,允许用户通过可视化界面设计和执行复杂的 AI 工作流。工作流由多种类型的节点(Node)组成,这些节点通过边(Edge)连接,形成有向图结构。
1.2 系统架构
工作流系统主要由以下几个部分组成:
- 图引擎 :负责解析工作流配置,构建执行图,并控制节点的执行顺序
- 节点实现 :各种类型节点的具体实现,如 LLM、知识检索、条件分支等
- 变量管理 :管理工作流执行过程中的变量传递和存储
- 执行记录 :记录工作流和节点的执行状态、输入输出和性能指标
二、数据模型设计
2.1 工作流数据模型
Dify 使用多个模型来表示工作流及其执行状态:
- WorkflowModel :工作流的基本信息,包括 ID、名称、描述、配置等
- WorkflowRunModel :工作流的执行记录,包括执行状态、开始时间、结束时间等
- WorkflowNodeExecutionModel :节点的执行记录,包括节点类型、输入、输出、状态等
- ConversationVariable :存储会话变量,包括名称、值类型、值等
- WorkflowDraftVariable :存储草稿工作流中的变量,包括会话变量、系统变量和节点变量
2.2 工作流节点类型
Dify 工作流支持多种类型的节点,每种节点有不同的功能和配置:
Dify 支持多种类型的节点,包括:
- START :工作流的起始节点
- END :工作流的结束节点
- LLM :大语言模型节点,用于生成文本
- **KNOWLEDGE_
RETRIEVAL** :知识检索节点,用于从知识库中检索信息
- IF_ELSE :条件分支节点,根据条件选择执行路径
- CODE :代码执行节点,执行自定义代码
- HTTP_REQUEST :HTTP 请求节点,与外部 API 交互
- TOOL :工具节点,调用预定义的工具
- AGENT :代理节点,执行复杂的任务
三、工作流执行机制
3.1 工作流执行流程
- 初始化工作流运行记录
- 解析工作流配置,构建执行图
- 从起始节点开始执行
- 根据图的边定义,确定下一个要执行的节点
- 执行节点,记录执行结果
- 重复步骤 4-5,直到达到结束节点或出现错误
- 完成工作流执行,更新运行记录
3.2 图引擎执行机制
图引擎是工作流执行的核心,负责:
- 解析节点和边配置
- 构建边映射和反向边映射
- 识别根节点和叶子节点
- 检查节点连接性和循环
- 管理并行执行
- 控制执行流程
四、变量管理机制
4.1 变量池设计
Dify 工作流使用变量池(VariablePool)管理工作流执行过程中的变量。变量池包含以下几类变量:
- 系统变量
:以
sys.
为前缀,如sys.query
(用户输入)、sys.files
(用户上传文件) - 环境变量 :工作流级别的配置变量
- 会话变量 :在会话中持久化的变量
- 节点变量 :各节点的输入输出变量
4.2 变量传递机制
节点之间通过变量池传递数据。每个节点执行时:
- 节点执行后,将输出添加到变量池中
- 下一个节点从变量池中获取所需的输入变量
- 支持通过选择器和模板字符串引用变量
- 支持文件类型变量的传递
变量的引用使用 {{#node\_id.variable\_name#}}
的模板语法。
五、节点实现机制
5.1 基础节点结构
所有节点都继承自 BaseNode
抽象类,实现自己的 \_run
方法:
所有节点都继承自 BaseNode
类,实现以下方法:
- _run :节点的具体执行逻辑
- _get_inputs :获取节点的输入变量
- _get_outputs :处理节点的输出变量
以下是 BaseNode
类的核心实现:
class BaseNode(Generic[GenericNodeData]):
\_node\_data\_cls: type[GenericNodeData]
\_node\_type: NodeType
def \_\_init\_\_(
self,
id: str,
config: Mapping[str, Any],
graph\_init\_params: "GraphInitParams",
graph: "Graph",
graph\_runtime\_state: "GraphRuntimeState",
previous\_node\_id: Optional[str] = None,
thread\_pool\_id: Optional[str] = None,
) -> None:
self.id = id
self.tenant\_id = graph\_init\_params.tenant\_id
self.app\_id = graph\_init\_params.app\_id
self.workflow\_type = graph\_init\_params.workflow\_type
self.workflow\_id = graph\_init\_params.workflow\_id
self.graph\_config = graph\_init\_params.graph\_config
self.user\_id = graph\_init\_params.user\_id
self.user\_from = graph\_init\_params.user\_from
self.invoke\_from = graph\_init\_params.invoke\_from
self.workflow\_call\_depth = graph\_init\_params.call\_depth
self.graph = graph
self.graph\_runtime\_state = graph\_runtime\_state
self.previous\_node\_id = previous\_node\_id
self.thread\_pool\_id = thread\_pool\_id
node\_id = config.get("id")
ifnot node\_id:
raise ValueError("Node ID is required.")
self.node\_id = node\_id
node\_data = self.\_node\_data\_cls.model\_validate(config.get("data", {}))
self.node\_data = node\_data
@abstractmethod
def \_run(self) -> NodeRunResult | Generator[Union[NodeEvent, "InNodeEvent"], None, None]:
"""
Run node
:return:
"""
raise NotImplementedError
def run(self) -> Generator[Union[NodeEvent, "InNodeEvent"], None, None]:
try:
result = self.\_run()
except
Exception as e:
logger.exception(f"Node {self.node\_id} failed to run")
result = NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
error
=str(e),
error\_type="WorkflowNodeError",
)
if isinstance(result, NodeRunResult):
yield RunCompletedEvent(run\_result=result)
else:
yieldfrom result
BaseNode
类是所有节点的基类,它定义了节点的基本属性和方法:
- 初始化方法 :接收节点 ID、配置、图引擎参数等,初始化节点的基本属性
- 抽象方法 _run :子类必须实现的方法,包含节点的具体执行逻辑
- run 方法 :调用 _run 方法并处理异常,将结果包装为事件返回
5.2 节点类型实现
5.2.1 StartNode 实现
StartNode
是工作流的起始节点,负责将用户输入和系统变量作为节点的输出:
class StartNode(BaseNode[StartNodeData]):
\_node\_data\_cls = StartNodeData
\_node\_type = NodeType.START
def \_run(self) -> NodeRunResult:
node\_inputs = dict(self.graph\_runtime\_state.variable\_pool.user\_inputs)
system\_inputs = self.graph\_runtime\_state.variable\_pool.system\_variables
# TODO: System variables should be directly accessible, no need for special handling
# Set system variables as node outputs.
for var in system\_inputs:
node\_inputs[SYSTEM\_VARIABLE\_NODE\_ID + "." + var] = system\_inputs[var]
return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=node\_inputs, outputs=node\_inputs)
StartNode
的实现非常简单,它主要完成以下工作:
- 从变量池中获取用户输入和系统变量
- 将系统变量添加到节点输入中,以
SYSTEM\_VARIABLE\_NODE\_ID.var
的形式作为键 - 返回包含这些输入和输出的
NodeRunResult
,状态为成功
5.2.2 IfElseNode 实现
IfElseNode
是条件分支节点,根据条件选择执行路径:
class IfElseNode(BaseNode[IfElseNodeData]):
\_node\_data\_cls = IfElseNodeData
\_node\_type = NodeType.IF\_ELSE
def \_run(self) -> NodeRunResult:
"""
Run node
:return:
"""
node\_inputs: dict[str, list] = {"conditions": []}
process\_data: dict[str, list] = {"condition\_results": []}
input\_conditions = []
final\_result = False
selected\_case\_id = None
condition\_processor = ConditionProcessor()
try:
# Check if the new cases structure is used
if self.node\_data.cases:
for case in self.node\_data.cases:
input\_conditions, group\_result, final\_result = condition\_processor.process\_conditions(
variable\_pool=self.graph\_runtime\_state.variable\_pool,
conditions=case.conditions,
operator=case.logical\_operator,
)
process\_data["condition\_results"].append(
{
"group": case.model\_dump(),
"results": group\_result,
"final\_result": final\_result,
}
)
# Break if a case passes (logical short-circuit)
if final\_result:
selected\_case\_id = case.case\_id # Capture the ID of the passing case
break
else:
# Fallback to old structure if cases are not defined
input\_conditions, group\_result, final\_result = \_should\_not\_use\_old\_function(
condition\_processor=condition\_processor,
variable\_pool=self.graph\_runtime\_state.variable\_pool,
conditions=self.node\_data.conditions or [],
operator=self.node\_data.logical\_operator or"and",
)
selected\_case\_id = "true"if final\_result else"false"
process\_data["condition\_results"].append(
{"group": "default", "results": group\_result, "final\_result": final\_result}
)
node\_inputs["conditions"] = input\_conditions
except
Exception as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED, inputs=node\_inputs, process\_data=process\_data,
error
=str(e)
)
outputs = {"result": final\_result, "selected\_case\_id": selected\_case\_id}
data = NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs=node\_inputs,
process\_data=process\_data,
edge\_source\_handle=selected\_case\_id or"false", # Use case ID or 'default'
outputs=outputs,
)
return data
IfElseNode
的实现主要完成以下工作:
- 使用
ConditionProcessor
处理条件逻辑 - 遍历
cases
结构中的条件组,并根据结果确定selected\_case\_id
- 如果使用旧的结构,则调用
\_should\_not\_use\_old\_function
进行兼容处理 - 返回包含条件结果的
NodeRunResult
,并设置edge\_source\_handle
以指示下一个要执行的节点
5.2.3 LLM 节点实现
LLM 节点是工作流中最核心的节点之一,它负责调用大语言模型生成文本。LLM 节点的执行流程:
以下是 LLMNode
类的部分实现:
class LLMNode(BaseNode[LLMNodeData]):
\_node\_data\_cls = LLMNodeData
\_node\_type = NodeType.LLM
# Instance attributes specific to LLMNode.
# Output variable for file
\_file\_outputs: list["File"]
\_llm\_file\_saver: LLMFileSaver
def \_\_init\_\_(
self,
id: str,
config: Mapping[str, Any],
graph\_init\_params: "GraphInitParams",
graph: "Graph",
graph\_runtime\_state: "GraphRuntimeState",
previous\_node\_id: Optional[str] = None,
thread\_pool\_id: Optional[str] = None,
*,
llm\_file\_saver: LLMFileSaver | None = None,
) -> None:
super().\_\_init\_\_(
id=id,
config=config,
graph\_init\_params=graph\_init\_params,
graph=graph,
graph\_runtime\_state=graph\_runtime\_state,
previous\_node\_id=previous\_node\_id,
thread\_pool\_id=thread\_pool\_id,
)
# LLM file outputs, used for MultiModal outputs.
self.\_file\_outputs: list[File] = []
if llm\_file\_saver isNone:
llm\_file\_saver = FileSaverImpl(
user\_id=graph\_init\_params.user\_id,
tenant\_id=graph\_init\_params.tenant\_id,
)
self.\_llm\_file\_saver = llm\_file\_saver
def \_run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
def process\_structured\_output(text: str) -> Optional[dict[str, Any]]:
"""Process structured output if enabled"""
ifnot self.node\_data.structured\_output\_enabled ornot self.node\_data.structured\_output:
returnNone
return self.\_parse\_structured\_output(text)
node\_inputs: Optional[dict[str, Any]] = None
process\_data = None
result\_text = ""
usage = LLMUsage.empty\_usage()
finish\_reason = None
try:
# init messages template
self.node\_data.prompt\_template = self.\_transform\_chat\_messages(self.node\_data.prompt\_template)
# fetch variables and fetch values from variable pool
inputs = self.\_fetch\_inputs(node\_data=self.node\_data)
# fetch jinja2 inputs
jinja\_inputs = self.\_fetch\_jinja\_inputs(node\_data=self.node\_data)
# merge inputs
inputs.update(jinja\_inputs)
node\_inputs = {}
# fetch files
files = (
self.\_fetch\_files(selector=self.node\_data.vision.configs.variable\_selector)
if self.node\_data.vision.enabled
else []
)
if files:
node\_inputs["#files#"] = [file.to\_dict() for file in files]
# fetch context value
generator = self.\_fetch\_context(node\_data=self.node\_data)
context = None
for event in generator:
if isinstance(event, RunRetrieverResourceEvent):
context = event.context
yield event
if context:
node\_inputs["#context#"] = context
# fetch model config
model\_instance, model\_config = self.\_fetch\_model\_config(self.node\_data.model)
# ... 更多代码 ...
LLMNode
是一个典型的节点实现,负责调用大语言模型:
- 初始化节点参数和模型配置
- 处理输入变量和文件
- 构建提示消息
- 调用 LLM 模型
- 处理模型返回的结果
- 生成节点执行结果
5.2.4 ToolNode 实现
ToolNode
是工具节点,负责调用预定义的工具:
class ToolNode(BaseNode[ToolNodeData]):
"""
Tool Node
"""
\_node\_data\_cls = ToolNodeData
\_node\_type = NodeType.TOOL
def \_run(self) -> Generator:
"""
Run the tool node
"""
node\_data =
cast
(ToolNodeData, self.node\_data)
# fetch tool icon
tool\_info = {
"provider\_type": node\_data.provider\_type.value,
"provider\_id": node\_data.provider\_id,
"plugin\_unique\_identifier": node\_data.plugin\_unique\_identifier,
}
# get tool runtime
try:
from core.tools.tool\_manager import ToolManager
tool\_runtime = ToolManager.get\_workflow\_tool\_runtime(
self.tenant\_id, self.app\_id, self.node\_id, self.node\_data, self.invoke\_from
)
except
ToolNodeError as e:
yield RunCompletedEvent(
run\_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs={},
metadata={WorkflowNodeExecutionMetadataKey.TOOL\_INFO: tool\_info},
error
=f"Failed to get tool runtime: {str(e)}",
error\_type=type(e).\_\_name\_\_,
)
)
return
# get parameters
tool\_parameters = tool\_runtime.get\_merged\_runtime\_parameters() or []
parameters = self.\_generate\_parameters(
tool\_parameters=tool\_parameters,
variable\_pool=self.graph\_runtime\_state.variable\_pool,
node\_data=self.node\_data,
)
# get conversation id
conversation\_id = self.graph\_runtime\_state.variable\_pool.get\_system\_variable("conversation\_id")
# invoke tool
try:
from core.tools.entities.tool\_entities import ToolInvokeMessage
from core.tools.tool\_engine import ToolEngine
# invoke tool
tool\_invoke\_message = ToolInvokeMessage(
conversation\_id=conversation\_id,
tool\_parameters=parameters,
)
# invoke tool
tool\_response = ToolEngine.generic\_invoke(
tenant\_id=self.tenant\_id,
app\_id=self.app\_id,
tool\_runtime=tool\_runtime,
tool\_invoke\_message=tool\_invoke\_message,
user\_id=self.user\_id,
invoke\_from=self.invoke\_from,
)
# ... 处理工具响应 ...
except
Exception as e:
# ... 处理异常 ...
ToolNode
的实现主要完成以下工作:
- 获取工具信息和工具运行时
- 生成工具参数
- 获取会话 ID
- 通过
ToolEngine.generic\_invoke
调用工具 - 处理工具返回的结果
- 生成节点执行结果
5.2.5 Knowledge
Retrieval Node 实现
`Knowledge
Retrieval Node` 是知识检索节点,负责从知识库中检索相关信息:
class Knowledge
Retrieval
Node(LLMNode):
"""
Knowledge
Retrieval
Node
"""
\_node\_data\_cls = Knowledge
Retrieval
NodeData
\_node\_type = NodeType.KNOWLEDGE\_
RETRIEVAL
def \_run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
"""
Run node
"""
node\_data =
cast
(Knowledge
Retrieval
NodeData, self.node\_data)
# get query variable
query\_variable = node\_data.query\_variable
ifnot query\_variable:
yield RunCompletedEvent(
run\_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs={},
error
="Query variable is not set",
)
)
return
# get query from variable pool
query = self.graph\_runtime\_state.variable\_pool.get\_variable(query\_variable)
ifnot query:
yield RunCompletedEvent(
run\_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs={},
error
=f"Query variable {query\_variable} is empty",
)
)
return
# check rate limit
ifnot self.\_check\_rate\_limit():
yield RunCompletedEvent(
run\_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs={},
error
="Rate limit exceeded",
)
)
return
# get
retrieval
model config
retrieval\_model\_config = {
"search\_method": node\_data.search\_method,
"reranking\_enable": node\_data.reranking\_enable,
"reranking\_model": node\_data.reranking\_model,
"top\_k": node\_data.top\_k,
"score\_threshold": node\_data.score\_threshold,
}
# ... 执行知识检索逻辑 ...
`Knowledge
Retrieval Node` 的实现主要完成以下工作:
- 从变量池中提取查询变量
- 检查查询是否为空
- 进行速率限制检查
- 定义检索模型配置
- 执行知识检索
- 处理检索结果
- 生成节点执行结果
5.2.6 CodeNode 实现
CodeNode
是代码执行节点,负责执行用户定义的代码:
class CodeNode(BaseNode[CodeNodeData]):
"""
Code Node
"""
\_node\_data\_cls = CodeNodeData
\_node\_type = NodeType.CODE
def \_run(self) -> NodeRunResult:
"""
Run node
"""
node\_data =
cast
(CodeNodeData, self.node\_data)
# get code language and content
code\_language = node\_data.code\_language
code\_content = node\_data.code\_content
# get input variables
input\_variables = {}
for input\_variable in node\_data.input\_variables:
variable\_name = input\_variable.variable\_name
variable\_value = self.graph\_runtime\_state.variable\_pool.get\_variable(input\_variable.variable\_selector)
input\_variables[variable\_name] = variable\_value
# execute code
try:
from core.workflow.nodes.code.code\_executor import CodeExecutor
result = CodeExecutor.execute\_workflow\_code\_template(
code\_language=code\_language,
code\_content=code\_content,
input\_variables=input\_variables,
)
# check output variables
outputs = {}
for output\_variable in node\_data.output\_variables:
variable\_name = output\_variable.variable\_name
if variable\_name notin result:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=input\_variables,
error
=f"Output variable {variable\_name} not found in code execution result",
)
variable\_value = result[variable\_name]
variable\_type = output\_variable.variable\_type
# check variable type
if variable\_type == "
string
":
ifnot self.\_check\_string(variable\_value, output\_variable.max\_length):
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=input\_variables,
error
=f"Output variable {variable\_name} is not a valid
string
or exceeds max length",
)
elif variable\_type == "number":
ifnot self.\_check\_number(variable\_value):
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=input\_variables,
error
=f"Output variable {variable\_name} is not a valid number",
)
# ... 其他类型检查 ...
outputs[variable\_name] = variable\_value
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs=input\_variables,
outputs=outputs,
)
except
(CodeExecutionError, CodeNodeError) as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=input\_variables,
error
=str(e),
)
CodeNode
的实现主要完成以下工作:
- 获取代码语言和代码内容
- 从变量池中获取输入变量
- 通过
CodeExecutor.execute\_workflow\_code\_template
执行代码 - 检查输出变量的类型和长度
- 处理执行结果和潜在的异常
- 生成节点执行结果
5.2.7 AgentNode 实现
AgentNode
是代理节点,负责调用 AI 代理执行复杂任务:
class AgentNode(ToolNode):
"""
Agent Node
"""
\_node\_data\_cls = AgentNodeData
\_node\_type = NodeType.AGENT
def \_run(self) -> Generator:
"""
Run the agent node
"""
node\_data =
cast
(AgentNodeData, self.node\_data)
# get agent strategy
try:
from core.agent.strategy.strategy\_factory import StrategyFactory
strategy = StrategyFactory.create\_strategy(
tenant\_id=self.tenant\_id,
app\_id=self.app\_id,
strategy\_mode=node\_data.strategy\_mode,
strategy\_config=node\_data.strategy\_config,
user\_id=self.user\_id,
invoke\_from=self.invoke\_from,
)
except
Exception as e:
yield RunCompletedEvent(
run\_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs={},
error
=f"Failed to create agent strategy: {str(e)}",
)
)
return
# generate agent parameters
agent\_parameters = self.\_generate\_parameters(
tool\_parameters=node\_data.parameters,
variable\_pool=self.graph\_runtime\_state.variable\_pool,
node\_data=node\_data,
)
# get conversation id
conversation\_id = self.graph\_runtime\_state.variable\_pool.get\_system\_variable("conversation\_id")
# invoke agent
try:
agent\_response = strategy.invoke(
conversation\_id=conversation\_id,
inputs=agent\_parameters,
files=[],
)
# ... 处理代理响应 ...
except
Exception as e:
# ... 处理异常 ...
AgentNode
的实现主要完成以下工作:
- 获取代理策略
- 生成代理参数
- 获取会话 ID
- 通过
strategy.invoke
调用代理 - 处理代理返回的结果
- 生成节点执行结果
5.2.8 HttpRequestNode 实现
HttpRequestNode
是 HTTP 请求节点,负责发送 HTTP 请求并处理响应:
class HttpRequestNode(BaseNode[HttpRequestNodeData]):
"""
Http Request Node
"""
\_node\_data\_cls = HttpRequestNodeData
\_node\_type = NodeType.HTTP\_REQUEST
def \_run(self) -> NodeRunResult:
"""
Run node
"""
node\_data =
cast
(HttpRequestNodeData, self.node\_data)
# get default config
default\_config = {
"method": node\_data.method,
"url": node\_data.url,
"headers": node\_data.headers,
"params": node\_data.params,
"body": node\_data.body,
"timeout": node\_data.timeout,
"retry\_count": node\_data.retry\_count,
"retry\_interval": node\_data.retry\_interval,
}
# init executor
executor = HttpRequestExecutor(
tenant\_id=self.tenant\_id,
app\_id=self.app\_id,
user\_id=self.user\_id,
variable\_pool=self.graph\_runtime\_state.variable\_pool,
default\_config=default\_config,
)
# execute http request
try:
response
= executor.execute()
# extract files
files = []
if
response
.files:
for file in
response
.files:
files.append(file.to\_dict())
# success
if
response
.success:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs=
response
.request\_info,
outputs={
"status\_code":
response
.status\_code,
"response\_body":
response
.response\_body,
"response\_headers":
response
.response\_headers,
"files": files,
},
)
# failed
else:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=
response
.request\_info,
error
=
response
.
error
,
)
except
Exception as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs={},
error
=str(e),
)
HttpRequestNode
的实现主要完成以下工作:
- 获取默认配置
- 初始化
HttpRequestExecutor
- 执行 HTTP 请求
- 处理响应(包括成功和失败情况)
- 提取文件
- 生成节点执行结果
六、工作流数据流动
6.1 工作流创建和发布
- 用户在界面上设计工作流,定义节点和连接
- 系统将设计转换为工作流配置
- 创建工作流模型和草稿变量
- 发布工作流,使其可被调用
6.2 工作流调试和执行
- 用户触发工作流执行
- 系统创建工作流运行记录
- 图引擎解析工作流配置,构建执行图
- 按照图的定义执行节点
- 记录每个节点的执行状态和结果
- 完成工作流执行,更新运行记录
七、图引擎机制
图引擎是工作流执行的核心,负责解析工作流图结构并执行节点。
7.1 图引擎实现
以下是 GraphEngine
类的部分实现:
class GraphEngine:
"""
Graph Engine
"""
def \_\_init\_\_(
self,
tenant\_id: str,
app\_id: str,
workflow\_type: WorkflowType,
workflow\_id: str,
user\_id: str,
invoke\_from: InvokeFrom,
) -> None:
"""
Initialize graph engine
"""
self.tenant\_id = tenant\_id
self.app\_id = app\_id
self.workflow\_type = workflow\_type
self.workflow\_id = workflow\_id
self.user\_id = user\_id
self.invoke\_from = invoke\_from
def execute(
self,
workflow\_run\_id: str,
workflow\_config: dict[str, Any],
user\_inputs: dict[str, Any],
system\_variables: dict[str, Any],
environment\_variables: dict[str, Any],
session\_variables: dict[str, Any],
*,
event\_handler: Optional[Callable[[WorkflowEvent], None]] = None,
) -> Generator[WorkflowEvent, None, None]:
"""
Execute workflow
"""
# Create graph init params
graph\_init\_params = GraphInitParams(
tenant\_id=self.tenant\_id,
app\_id=self.app\_id,
workflow\_id=self.workflow\_id,
workflow\_run\_id=workflow\_run\_id,
user\_id=self.user\_id,
invoke\_from=self.invoke\_from,
)
# Create variable pool
variable\_pool = VariablePool(
user\_inputs=user\_inputs,
system\_variables=system\_variables,
environment\_variables=environment\_variables,
session\_variables=session\_variables,
)
# Create graph runtime state
graph\_runtime\_state = GraphRuntimeState(
variable\_pool=variable\_pool,
)
# Create graph
graph = Graph(
workflow\_config=workflow\_config,
graph\_init\_params=graph\_init\_params,
graph\_runtime\_state=graph\_runtime\_state,
)
# Create thread pool
thread\_pool = GraphEngineThreadPool(max\_workers=10)
# Execute graph
try:
# Yield workflow started event
yield WorkflowStartedEvent(
workflow\_run\_id=workflow\_run\_id,
)
# Execute graph
for event in graph.execute(thread\_pool=thread\_pool):
# Handle event
if event\_handler:
event\_handler(event)
# Yield event
yield event
# Check if workflow is completed
if isinstance(event, WorkflowCompletedEvent):
break
except
Exception as e:
# Yield workflow failed event
yield WorkflowFailedEvent(
workflow\_run\_id=workflow\_run\_id,
error
=str(e),
)
finally:
# Shutdown thread pool
thread\_pool.shutdown(wait=True)
GraphEngineThreadPool
是一个继承自 ThreadPoolExecutor
的线程池,用于管理工作流的并行执行:
class GraphEngineThreadPool(ThreadPoolExecutor):
"""
Graph Engine Thread Pool
"""
def \_\_init\_\_(self, max\_workers: Optional[int] = None) -> None:
"""
Initialize graph engine thread pool
"""
super().\_\_init\_\_(max\_workers=max\_workers)
self.\_futures: dict[str,
Future
] = {}
def submit\_task(
self, thread\_pool\_id: str, fn: Callable, *args: Any, **kwargs: Any
) ->
Future
:
"""
Submit
task
to thread pool
"""
future
= self.submit(fn, *args, **kwargs)
self.\_futures[thread\_pool\_id] =
future
future
.add\_done\_callback(lambda \_: self.\_futures.pop(thread\_pool\_id, None))
return
future
def is\_full(self) -> bool:
"""
Check if thread pool is full
"""
return len(self.\_futures) >= self.\_max\_workers
def get\_future(self, thread\_pool\_id: str) -> Optional[
Future
]:
"""
Get
future
by thread pool id
"""
return self.\_futures.get(thread\_pool\_id)
7.2 图结构解析
图引擎首先解析工作流的图结构,包括:
- 解析节点 :解析工作流中的所有节点,包括节点类型、配置等
- 解析边 :解析节点之间的连接关系,包括源节点、目标节点、源端口、目标端口等
- 构建节点映射 :构建节点ID到节点对象的映射
- 构建边映射 :构建边ID到边对象的映射
7.3 图结构实现
以下是 Graph
类的部分实现,它负责解析工作流配置并执行节点:
class Graph:
"""
Graph
"""
def \_\_init\_\_(
self,
workflow\_config: dict[str, Any],
graph\_init\_params: GraphInitParams,
graph\_runtime\_state: GraphRuntimeState,
) -> None:
"""
Initialize graph
"""
self.workflow\_config = workflow\_config
self.graph\_init\_params = graph\_init\_params
self.graph\_runtime\_state = graph\_runtime\_state
# Parse workflow config
self.nodes = self.\_parse\_nodes(workflow\_config.get("nodes", {}))
self.edges = self.\_parse\_edges(workflow\_config.get("edges", {}))
# Build node and edge mappings
self.node\_mapping = self.\_build\_node\_mapping(self.nodes)
self.edge\_mapping = self.\_build\_edge\_mapping(self.edges)
# Build source and target node mappings
self.source\_node\_mapping = self.\_build\_source\_node\_mapping(self.edges)
self.target\_node\_mapping = self.\_build\_target\_node\_mapping(self.edges)
def execute(self, thread\_pool: GraphEngineThreadPool) -> Generator[WorkflowEvent, None, None]:
"""
Execute graph
"""
# Find start node
start\_node = self.\_find\_start\_node()
ifnot start\_node:
yield WorkflowFailedEvent(
workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,
error
="Start node not found",
)
return
# Execute start node
for event in self.\_execute\_node(start\_node, thread\_pool=thread\_pool):
yield event
# Yield workflow completed event
yield WorkflowCompletedEvent(
workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,
)
def \_execute\_node(
self, node: BaseNode, thread\_pool: GraphEngineThreadPool, previous\_node\_id: Optional[str] = None
) -> Generator[WorkflowEvent, None, None]:
"""
Execute node
"""
# Yield node started event
yield NodeStartedEvent(
workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,
node\_id=node.node\_id,
node\_type=node.node\_type,
)
# Run node
try:
for event in node.run():
# Handle node event
if isinstance(event, RunCompletedEvent):
# Get node run result
node\_run\_result = event.run\_result
# Update variable pool
if node\_run\_result.outputs:
for variable\_name, variable\_value in node\_run\_result.outputs.items():
self.graph\_runtime\_state.variable\_pool.add(
node\_id=node.node\_id,
variable\_name=variable\_name,
variable\_value=variable\_value,
)
# Yield node completed event
yield NodeCompletedEvent(
workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,
node\_id=node.node\_id,
node\_type=node.node\_type,
status=node\_run\_result.status,
inputs=node\_run\_result.inputs,
outputs=node\_run\_result.outputs,
process\_data=node\_run\_result.process\_data,
error
=node\_run\_result.
error
,
)
# Find next nodes
next\_nodes = self.\_find\_next\_nodes(
node\_id=node.node\_id,
edge\_source\_handle=node\_run\_result.edge\_source\_handle,
)
# Execute next nodes
for next\_node in next\_nodes:
# Check if thread pool is full
if thread\_pool.is\_full():
# Execute next node in current thread
for event in self.\_execute\_node(
node=next\_node,
thread\_pool=thread\_pool,
previous\_node\_id=node.node\_id,
):
yield event
else:
# Execute next node in new thread
thread\_pool\_id = str(uuid.uuid4())
thread\_pool.submit\_task(
thread\_pool\_id=thread\_pool\_id,
fn=self.\_execute\_node\_in\_thread,
node=next\_node,
thread\_pool=thread\_pool,
previous\_node\_id=node.node\_id,
thread\_pool\_id=thread\_pool\_id,
)
else:
# Yield other events
yield event
except
Exception as e:
# Yield node failed event
yield NodeFailedEvent(
workflow\_run\_id=self.graph\_init\_params.workflow\_run\_id,
node\_id=node.node\_id,
node\_type=node.node\_type,
error
=str(e),
)
7.4 节点执行
图引擎根据图结构执行节点:
- 确定起始节点 :通常是START节点
- 执行节点 :调用节点的run方法
- 处理节点结果 :根据节点执行结果确定下一个要执行的节点
- 处理并行执行 :如果有多个分支,可以并行执行
7.4.1 节点执行的主要流程
节点执行的主要流程如下:
- 发出节点开始事件
:触发
NodeRunStartedEvent
,通知系统节点开始执行 - 调用节点的
run
方法 :执行节点的具体逻辑 - 处理节点事件 :
- 处理
RunCompletedEvent
:获取节点执行结果 - 处理
RunStreamChunkEvent
:处理流式输出 - 处理
RunRetrieverResourceEvent
:处理检索资源
-
处理重试逻辑 :如果节点执行失败且配置了重试,则进行重试
-
更新变量池 :将节点输出变量添加到变量池中
-
发出节点完成事件 :根据执行结果触发相应事件
- 成功:触发
NodeRunSucceededEvent
- 失败:触发
NodeRunFailedEvent
- 异常但继续:触发
NodeRunExceptionEvent
-
查找下一个要执行的节点 :根据边映射和条件确定下一个节点
-
执行下一个节点 :可能是串行执行或并行执行
7.4.2 查找下一个节点的机制
在工作流执行过程中,确定下一个要执行的节点是关键步骤。GraphEngine
类的 \_run
方法实现了这一机制:
- 获取边映射 :通过
self.graph.edge\_mapping.get(next\_node\_id)
获取当前节点的所有出边 - 单边处理 :如果只有一条出边,直接获取目标节点ID
if len(edge\_mappings) == 1:
edge = edge\_mappings[0]
# 检查是否有运行条件
if edge.run\_condition:
result = ConditionManager.get\_condition\_handler(...).check(...)
if not result:
break # 条件不满足,停止执行
next\_node\_id = edge.target\_node\_id
- 多边处理 :如果有多条出边,需要根据条件或并行策略确定下一个节点
- 条件分支 :如果边有运行条件,根据条件结果确定要执行的分支
if any(edge.run\_condition for edge in edge\_mappings):
# 按条件分组
condition\_edge\_mappings: dict[str, list[GraphEdge]] = {}
# 检查每个条件组
for \_, sub\_edge\_mappings in condition\_edge\_mappings.items():
# 检查条件是否满足
result = ConditionManager.get\_condition\_handler(...).check(...)
if result:
# 条件满足,确定下一个节点
if len(sub\_edge\_mappings) == 1:
final\_node\_id = edge.target\_node\_id
else:
# 并行执行多个分支
parallel\_generator = self.\_run\_parallel\_branches(...)
- 并行分支 :如果没有条件或条件满足,可能需要并行执行多个分支
else:
parallel\_generator = self.\_run\_parallel\_branches(
edge\_mappings=edge\_mappings,
in\_parallel\_id=in\_parallel\_id,
parallel\_start\_node\_id=parallel\_start\_node\_id,
handle\_exceptions=handle\_exceptions,
)
- 并行分支执行 :通过
\_run\_parallel\_branches
方法处理并行分支
- 创建线程池和队列管理并行执行
- 为每个分支创建一个线程执行
- 收集并处理所有分支的执行结果
- 检查节点是否在当前并行分支内 :确保节点执行不会跨越并行分支边界
if in\_parallel\_id and self.graph.node\_parallel\_mapping.get(next\_node\_id, "") != in\_parallel\_id:
break
通过这种机制,工作流系统能够灵活地处理各种复杂的执行路径,包括条件分支和并行执行,确保工作流按照设计的逻辑正确执行。
八、图引擎与节点执行的通信机制
图引擎与节点执行之间的通信是通过事件驱动机制实现的,这种机制使得工作流执行过程中的各个组件能够松耦合地交互,提高了系统的可扩展性和可维护性。
8.1 事件驱动架构
工作流系统采用事件驱动架构,通过定义和传递各种事件来实现图引擎与节点之间的通信。这种架构具有以下特点:
- 松耦合 :图引擎和节点之间通过事件进行通信,而不是直接调用,降低了组件间的依赖
- 可扩展 :新的节点类型和事件类型可以轻松添加到系统中,而不需要修改现有代码
- 异步处理 :事件可以异步处理,提高系统的响应性和吞吐量
- 状态追踪 :通过事件可以追踪工作流的执行状态和历史
8.2 核心事件类型
工作流系统定义了多种事件类型,用于表示工作流执行过程中的不同状态和操作:
8.2.1 图级事件
- GraphRunStartedEvent :工作流开始执行
- GraphRunSucceededEvent :工作流成功完成
- GraphRunFailedEvent :工作流执行失败
- GraphRunPartialSucceededEvent :工作流部分成功(有些节点失败但不影响整体结果)
8.2.2 节点级事件
- NodeRunStartedEvent :节点开始执行
- NodeRunSucceededEvent :节点执行成功
- NodeRunFailedEvent :节点执行失败
- NodeRunExceptionEvent :节点执行异常但继续执行
- NodeRunRetryEvent :节点重试执行
- NodeRunStreamChunkEvent :节点产生流式输出
- NodeRunRetrieverResourceEvent :节点检索资源
8.2.3 并行分支事件
- ParallelBranchRunStartedEvent :并行分支开始执行
- ParallelBranchRunSucceededEvent :并行分支执行成功
- ParallelBranchRunFailedEvent :并行分支执行失败
8.2.4 迭代和循环事件
- IterationRunStartedEvent :迭代开始
- IterationRunNextEvent :迭代下一步
- IterationRunSucceededEvent :迭代成功完成
- IterationRunFailedEvent :迭代失败
- LoopRunStartedEvent :循环开始
- LoopRunNextEvent :循环下一步
- LoopRunSucceededEvent :循环成功完成
- LoopRunFailedEvent :循环失败
8.3 事件传递流程
事件在工作流系统中的传递流程如下:
- 事件生成 :图引擎或节点执行器生成事件
yield NodeRunStartedEvent(
id=node\_instance.id,
node\_id=node\_instance.node\_id,
node\_type=node\_instance.node\_type,
node\_data=node\_instance.node\_data,
route\_node\_state=route\_node\_state,
predecessor\_node\_id=node\_instance.previous\_node\_id,
# 其他参数...
)
- 事件传递 :通过 Python 生成器(Generator)机制传递事件
def run(self) -> Generator[GraphEngineEvent, None, None]:
# ...
generator = graph\_engine.run()
for event in generator:
# 处理事件
yield event
- 事件处理 :工作流入口点(WorkflowEntry)接收事件并分发给回调处理器
for event in generator:
if callbacks:
for callback in callbacks:
callback.on\_event(event=event)
yield event
- 回调处理 :回调处理器根据事件类型执行相应的操作
def on\_event(self, event: GraphEngineEvent) -> None:
if isinstance(event, NodeRunStartedEvent):
self.on\_workflow\_node\_execute\_started(event=event)
elif isinstance(event, NodeRunSucceededEvent):
self.on\_workflow\_node\_execute\_succeeded(event=event)
# 处理其他事件类型...
8.4 事件处理回调
工作流系统定义了回调接口,允许外部系统注册回调函数来处理工作流事件:
class WorkflowCallback(ABC):
@abstractmethod
def on\_event(self, event: GraphEngineEvent) -> None:
"""处理工作流事件"""
raise NotImplementedError
系统提供了多种内置回调实现,如:
- WorkflowLoggingCallback :记录工作流执行日志
- WorkflowAppRunnerCallback :处理应用级别的工作流事件
8.5 事件与状态管理
事件不仅用于通信,还用于管理工作流的状态:
- 节点状态追踪 :通过事件记录节点的执行状态和结果
# 节点开始执行
yield NodeRunStartedEvent(...)
# 节点执行成功
yield NodeRunSucceededEvent(...)
- 变量传递 :事件携带节点的输入和输出变量
# 节点执行成功事件包含输出变量
yield NodeRunSucceededEvent(
# ...
outputs=run\_result.outputs,
# ...
)
- 错误处理 :事件携带错误信息,用于错误处理和重试
# 节点执行失败事件包含错误信息
yield NodeRunFailedEvent(
error=route\_node\_state.failed\_reason or "Unknown error.",
# ...
)
8.6 事件转换与应用集成
工作流应用运行器(WorkflowAppRunner)将工作流事件转换为应用级别的队列事件,实现与应用系统的集成:
def \_handle\_event(self, workflow\_entry: WorkflowEntry, event: GraphEngineEvent) -> None:
if isinstance(event, NodeRunSucceededEvent):
self.\_publish\_event(
QueueNodeSucceededEvent(
node\_execution\_id=event.id,
node\_id=event.node\_id,
node\_type=event.node\_type,
node\_data=event.node\_data,
inputs=inputs,
process\_data=process\_data,
outputs=outputs,
execution\_metadata=execution\_metadata,
# 其他参数...
)
)
# 处理其他事件类型...
这种转换机制使得工作流系统能够与外部应用系统无缝集成,同时保持内部实现的独立性。
8.7 事件通信示例
以下是一个完整的事件通信流程示例,展示了从节点执行到事件处理的整个过程:
8.7.1 节点执行与事件生成
当图引擎执行一个节点时,会生成一系列事件:
# 1. 节点开始执行事件
yield NodeRunStartedEvent(
id=node\_instance.id,
node\_id=node\_instance.node\_id,
node\_type=node\_instance.node\_type,
node\_data=node\_instance.node\_data,
route\_node\_state=route\_node\_state,
# 其他参数...
)
# 2. 执行节点的run方法
generator = node\_instance.run()
for item in generator:
# 传递节点产生的事件
yield item
# 3. 节点执行成功事件
yield NodeRunSucceededEvent(
id=node\_instance.id,
node\_id=node\_instance.node\_id,
node\_type=node\_instance.node\_type,
node\_data=node\_instance.node\_data,
route\_node\_state=route\_node\_state,
# 其他参数...
)
8.7.2 事件传递与处理
事件通过工作流入口点传递给回调处理器:
# WorkflowEntry.run方法
def run(self, *, callbacks: Sequence[WorkflowCallback]) -> Generator[GraphEngineEvent, None, None]:
generator = graph\_engine.run()
for event in generator:
# 分发事件给回调处理器
for callback in callbacks:
callback.on\_event(event=event)
# 继续传递事件
yield event
8.7.3 回调处理器处理事件
回调处理器根据事件类型执行相应的操作:
# WorkflowLoggingCallback.on\_event方法
def on\_event(self, event: GraphEngineEvent) -> None:
if isinstance(event, NodeRunStartedEvent):
self.print\_text("\n[NodeRunStartedEvent]", color="yellow")
self.print\_text(f"Node ID: {event.node\_id}", color="yellow")
self.print\_text(f"Node Title: {event.node\_data.title}", color="yellow")
self.print\_text(f"Type: {event.node\_type.value}", color="yellow")
elif isinstance(event, NodeRunSucceededEvent):
self.print\_text("\n[NodeRunSucceededEvent]", color="green")
# 打印节点执行结果
if event.route\_node\_state.node\_run\_result:
node\_run\_result = event.route\_node\_state.node\_run\_result
self.print\_text(f"Outputs: {jsonable\_encoder(node\_run\_result.outputs)}", color="green")
# 处理其他事件类型...
8.7.4 应用运行器处理事件
应用运行器将工作流事件转换为应用级别的队列事件:
# WorkflowAppRunner.\_handle\_event方法
def \_handle\_event(self, workflow\_entry: WorkflowEntry, event: GraphEngineEvent) -> None:
if isinstance(event, GraphRunStartedEvent):
self.\_publish\_event(QueueWorkflowStartedEvent(...))
elif isinstance(event, NodeRunSucceededEvent):
self.\_publish\_event(QueueNodeSucceededEvent(...))
elif isinstance(event, NodeRunFailedEvent):
self.\_publish\_event(QueueNodeFailedEvent(...))
# 处理其他事件类型...
8.8 事件通信的优势
图引擎与节点执行之间基于事件的通信机制具有以下优势:
- 解耦组件 :图引擎和节点执行器通过事件进行通信,而不是直接调用,降低了组件间的耦合度
- 简化调试 :事件包含完整的上下文信息,便于调试和问题排查
- 支持异步执行 :事件可以异步处理,支持并行执行和分布式部署
- 可扩展性 :新的节点类型和事件类型可以轻松添加到系统中,而不需要修改现有代码
- 状态追踪 :通过事件可以完整记录工作流的执行状态和历史,便于监控和审计
- 错误处理 :事件携带错误信息,支持灵活的错误处理策略和重试机制
九、错误处理机制
工作流系统提供了完善的错误处理机制,包括错误策略、重试机制和异常处理,确保工作流在面对各种异常情况时能够灵活应对。
9.1 错误处理策略
工作流系统提供了两种主要的错误处理策略:
- FAIL_BRANCH :当节点执行失败时,沿着失败分支继续执行
- 将错误信息和类型添加到变量池
- 设置
edge\_source\_handle
为FAILED
,使工作流可以沿着专门处理失败情况的分支继续执行 - 适用于需要针对失败情况执行特定逻辑的场景
- DEFAULT_VALUE :当节点执行失败时,使用预定义的默认值继续执行
- 将错误信息和类型添加到变量池
- 使用节点配置中预定义的默认值作为节点输出
- 适用于即使失败也需要提供某种结果的场景
9.2 节点重试机制
对于某些类型的节点,系统支持在执行失败时进行重试:
- 重试配置 :
max\_retries
:最大重试次数retry\_interval\_seconds
:重试间隔时间(秒)
- 重试流程 :
- 节点执行失败后,检查是否配置了重试
- 如果当前重试次数小于最大重试次数,触发
NodeRunRetryEvent
事件 - 等待指定的重试间隔时间
- 重新执行节点
- 重试事件 :
- 系统触发
NodeRunRetryEvent
事件,包含重试索引、开始时间等信息 - 事件可用于监控和记录重试情况
if node\_instance.should\_retry and retries < max\_retries:
retries += 1
route\_node\_state.node\_run\_result = run\_result
yield NodeRunRetryEvent(
id=str(uuid.uuid4()),
node\_id=node\_instance.node\_id,
node\_type=node\_instance.node\_type,
node\_data=node\_instance.node\_data,
route\_node\_state=route\_node\_state,
error=run\_result.error or "Unknown
error
",
retry\_index=retries,
start\_at=retry\_start\_at,
)
time.sleep(retry\_interval)
8.3 可继续执行和可重试的节点类型
系统定义了特定类型的节点,它们在错误处理方面有特殊行为:
- 可继续执行的节点类型 (
CONTINUE\_ON\_ERROR\_NODE\_TYPE
):
- 即使执行失败,工作流也可以继续执行
- 例如:HTTP请求节点、LLM节点等
- 这些节点可以配置错误策略(FAIL_BRANCH或DEFAULT_VALUE)
- 可重试的节点类型 (
RETRY\_ON\_ERROR\_NODE\_TYPE
):
- 执行失败时可以自动重试
- 例如:HTTP请求节点、数据库操作节点等
- 这些节点可以配置最大重试次数和重试间隔
通过这些机制,工作流系统能够灵活处理各种错误情况,提高工作流的健壮性和可靠性。
- 变量管理机制
变量管理是工作流执行的重要组成部分,负责管理工作流中的变量。
8.1 变量池
变量池是工作流中所有变量的集合,包括:
- 用户输入变量 :用户提供的输入
- 系统变量 :系统提供的变量,如时间戳、会话ID等
- 环境变量 :环境相关的变量
- 会话变量 :会话相关的变量
- 节点输出变量 :节点执行后的输出变量
以下是 VariablePool
类的部分实现:
class VariablePool:
"""
Variable Pool
"""
def \_\_init\_\_(
self,
user\_inputs: dict[str, Any],
system\_variables: dict[str, Any],
environment\_variables: dict[str, Any],
session\_variables: dict[str, Any],
) -> None:
"""
Initialize variable pool
"""
self.user\_inputs = user\_inputs
self.system\_variables = system\_variables
self.environment\_variables = environment\_variables
self.session\_variables = session\_variables
# Initialize variable dictionary
self.variable\_dictionary: dict[str, Any] = {}
def add(self, node\_id: str, variable\_name: str, variable\_value: Any) -> None:
"""
Add variable to variable pool
"""
# Check if variable value is File
if isinstance(variable\_value, File):
# Convert File to dict
variable\_value = variable\_value.to\_dict()
# Add variable to variable dictionary
self.variable\_dictionary[f"{node\_id}.{variable\_name}"] = variable\_value
def get\_variable(self, variable\_selector: str) -> Any:
"""
Get variable from variable pool
"""
# Check if variable selector is empty
ifnot variable\_selector:
returnNone
# Check if variable selector is system variable
if variable\_selector.startswith(SYSTEM\_VARIABLE\_NODE\_ID):
# Get system variable
variable\_name = variable\_selector.split(".", 1)[1]
return self.get\_system\_variable(variable\_name)
# Check if variable selector is user input
if variable\_selector.startswith(USER\_INPUT\_NODE\_ID):
# Get user input
variable\_name = variable\_selector.split(".", 1)[1]
return self.get\_user\_input(variable\_name)
# Check if variable selector is environment variable
if variable\_selector.startswith(ENVIRONMENT\_VARIABLE\_NODE\_ID):
# Get environment variable
variable\_name = variable\_selector.split(".", 1)[1]
return self.get\_environment\_variable(variable\_name)
# Check if variable selector is session variable
if variable\_selector.startswith(SESSION\_VARIABLE\_NODE\_ID):
# Get session variable
variable\_name = variable\_selector.split(".", 1)[1]
return self.get\_session\_variable(variable\_name)
# Get variable from variable dictionary
return self.variable\_dictionary.get(variable\_selector)
8.2 变量传递
变量在节点之间的传递遵循以下规则:
- 变量选择器 :通过变量选择器指定要使用的变量
- 变量作用域 :变量的作用域为整个工作流
- 变量覆盖 :后执行的节点可以覆盖先执行的节点的变量
变量选择器的格式为 node\_id.variable\_name
,例如:
system.conversation\_id
:系统变量中的会话IDuser\_input.query
:用户输入中的查询node\_1.result
:节点1的输出变量 result
- 并行执行机制
工作流支持并行执行多个分支,通过 GraphEngineThreadPool
实现:
Dify 工作流支持并行执行多个分支:
- 通过
GraphParallel
模型定义并行分支 - 使用
parallel\_mapping
和node\_parallel\_mapping
管理并行关系 - 支持条件分支,根据条件选择执行路径
- 限制并行层级,避免过度复杂的执行图
十、总结
Dify 工作流系统是一个功能强大的可视化 AI 工作流引擎,通过图结构组织节点执行,使用变量池管理数据流动,支持多种节点类型、错误处理和并行执行。系统的核心组件包括:
- 工作流服务 :管理工作流的生命周期
- 工作流入口 :工作流执行的入口点
- 图引擎 :负责节点的调度和执行
- 变量池 :管理工作流中的变量
- 节点实现 :各类节点的具体实现
通过这些组件的协同工作,Dify 工作流系统能够支持从简单到复杂的 AI 应用场景,为用户提供灵活且强大的工作流设计和执行能力。
参考资料
https://github.com/langgenius/dify (v1.4.1)
十一、推荐阅读
- 从零开始学 Dify
- 从零开始学 Dify-系统架构
- 从零开始学 Dify- API 后端架构原来是这样设计的
- 从零开始学 Dify- 帐户与租户管理系统设计揭秘
- 从零开始学 Dify- 模型提供者系统设计实现模型统一调用接口
- 从零开始学 Dify- RAG 知识库系统设计详解
- 从零开始学 Dify- 对话系统的关键功能
- 从零开始学 Dify- 工作流(Workflow)系统架构
- 从零开始学 Dify-扫描、加载和管理模型提供者的详细过程
- 从零开始学 Dify-详细介绍 Dify 模型运行时的核心架构
- 从零开始学 Dify - 详细介绍 Dify 工具(Tool)系统核心架构设计
- 从零开始学 Dify - Dify 的 RAG 系统如何有效地处理和检索大量文档?
- 从零开始学 Dify - 万字详解RAG父子分段模式的原理与实现
- 从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误
- 从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制
👆👆👆欢迎关注,一起进步👆👆👆 欢迎留言讨论哈