AdvancedChatAppRunner 是 Dify 多轮对话功能的核心执行器,它采用了「编排式」设计理念,将复杂的对话流程抽象为可视化工作流,使用户能够通过图形界面设计对话流程,而无需编写复杂的代码。
阅读本文前,最好了解以下内容:
从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误
从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制
从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制
从零开始学 Dify - 万字详解 Dify 工作流图引擎(GraphEngine)的实现机制
👆👆👆欢迎关注,一起进步👆👆👆
一、多轮对话概述
Dify 是一个强大的 AI 应用开发平台,其多轮对话功能是核心特性之一。多轮对话允许用户与 AI 进行连续的交互,系统能够记住对话历史并在此基础上生成连贯的回复。
二、核心组件
2.1 对话模型(Conversation)
Dify 使用 Conversation 模型来表示一个完整的对话会话:
Conversation实体存储对话的基本信息,如 ID、应用 ID、模型配置等- 每个对话包含多个
Message,代表用户输入和 AI 回复 Message可以关联MessageFile,支持多模态对话ConversationVariable存储会话级别的变量,支持跨轮对话状态保持WorkflowRun记录工作流执行实例,每次对话会创建一个工作流运行记录NodeRun记录节点执行状态,跟踪工作流中每个节点的执行情况
2.2 工作流引擎
Dify 的多轮对话基于工作流引擎实现,工作流引擎是一个基于图(Graph)的执行系统:
工作流引擎的主要组件:(详细内容可参考:从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制 和 从零开始学 Dify - 万字详解 Dify 工作流图引擎(GraphEngine)的实现机制)
- 工作流入口(WorkflowEntry) :工作流执行的起点,负责初始化执行环境
- 图引擎(GraphEngine) :解析工作流配置,构建执行图,控制节点执行顺序
- 节点(Node) :工作流的基本执行单元,如 LLM 节点、条件节点等
- 变量池(VariablePool) :管理工作流执行过程中的变量
2.3 变量池(VariablePool)
变量池是多轮对话中的关键组件,负责管理对话过程中的各类变量:
变量池中的变量类型:
- 系统变量
:以
sys.为前缀,如sys.query(用户输入)、sys.files(用户上传文件) - 环境变量 :工作流级别的配置变量
- 会话变量 :在会话中持久化的变量,可以跨多轮对话使用
- 节点变量 :各节点的输入输出变量
2.3.1 VariablePool 类实现
以下是 VariablePool 类的核心实现,位于 api/core/workflow/entities/variable\_pool.py:
classVariablePool(BaseModel):
# Variable dictionary is a dictionary for looking up variables by their selector.
# The first element of the selector is the node id, it's the first-level key in the dictionary.
# Other elements of the selector are the keys in the second-level dictionary. To get the key, we hash the
# elements of the selector
except
the first one.
variable\_dictionary: dict[str, dict[int,
Segment
]] = Field(
description="Variables mapping",
default=defaultdict(dict),
)
# TODO: This user inputs is not used for pool.
user\_inputs: Mapping[str, Any] = Field(
description="User inputs",
)
system\_variables: Mapping[SystemVariableKey, Any] = Field(
description="System variables",
)
environment\_variables: Sequence[Variable] = Field(
description="Environment variables.",
default\_factory=list,
)
conversation\_variables: Sequence[Variable] = Field(
description="Conversation variables.",
default\_factory=list,
)
def\_\_init\_\_(
self,
*,
system\_variables: Mapping[SystemVariableKey, Any] | None = None,
user\_inputs: Mapping[str, Any] | None = None,
environment\_variables: Sequence[Variable] | None = None,
conversation\_variables: Sequence[Variable] | None = None,
**kwargs,
):
environment\_variables = environment\_variables or []
conversation\_variables = conversation\_variables or []
user\_inputs = user\_inputs or {}
system\_variables = system\_variables or {}
super().\_\_init\_\_(
system\_variables=system\_variables,
user\_inputs=user\_inputs,
environment\_variables=environment\_variables,
conversation\_variables=conversation\_variables,
**kwargs,
)
for key, value in self.system\_variables.items():
self.add((SYSTEM\_VARIABLE\_NODE\_ID, key.value), value)
# Add environment variables to the variable pool
for var in self.environment\_variables:
self.add((ENVIRONMENT\_VARIABLE\_NODE\_ID, var.name), var)
# Add conversation variables to the variable pool
for var in self.conversation\_variables:
self.add((CONVERSATION\_VARIABLE\_NODE\_ID, var.name), var)
defadd(self, selector: Sequence[str], value: Any, /) -> None:
"""添加变量到变量池"""
if len(selector) < 2:
raise ValueError("Invalid selector")
if isinstance(value, Variable):
variable = value
if isinstance(value,
Segment
):
variable = variable\_factory.segment\_to\_variable(
segment
=value, selector=selector)
else:
segment
= variable\_factory.build\_segment(value)
variable = variable\_factory.segment\_to\_variable(segment=segment, selector=selector)
hash\_key = hash(tuple(selector[1:]))
self.variable\_dictionary[selector[0]][hash\_key] = variable
defget(self, selector: Sequence[str], /) -> Segment | None:
"""从变量池中获取变量值"""
if len(selector) < 2:
returnNone
hash\_key = hash(tuple(selector[1:]))
value = self.variable\_dictionary[selector[0]].get(hash\_key)
if value isNone:
selector, attr = selector[:-1], selector[-1]
# Python support `attr in FileAttribute` after 3.12
if attr notin {item.value for item in FileAttribute}:
returnNone
value = self.get(selector)
ifnot isinstance(value, FileSegment | NoneSegment):
returnNone
if isinstance(value, FileSegment):
attr = FileAttribute(attr)
attr\_value = file\_manager.get\_attr(file=value.value, attr=attr)
return variable\_factory.build\_segment(attr\_value)
return value
return value
defremove(self, selector: Sequence[str], /):
"""从变量池中移除变量"""
ifnot selector:
return
if len(selector) == 1:
self.variable\_dictionary[selector[0]] = {}
return
hash\_key = hash(tuple(selector[1:]))
self.variable\_dictionary[selector[0]].pop(hash\_key, None)
2.3.2 ConversationVariable 类实现
ConversationVariable 类负责会话变量的持久化,位于 api/models/workflow.py:
classConversationVariable(Base):
\_\_tablename\_\_ = "workflow\_conversation\_variables"
id: Mapped[str] = mapped\_column(StringUUID, primary\_key=True)
conversation\_id: Mapped[str] = mapped\_column(StringUUID, nullable=False, primary\_key=True,
index
=True)
app\_id: Mapped[str] = mapped\_column(StringUUID, nullable=False,
index
=True)
data: Mapped[str] = mapped\_column(db.Text, nullable=False)
created\_at: Mapped[datetime] = mapped\_column(
db.DateTime, nullable=False, server\_default=func.current\_timestamp(),
index
=True
)
updated\_at: Mapped[datetime] = mapped\_column(
db.DateTime, nullable=False, server\_default=func.current\_timestamp(), onupdate=func.current\_timestamp()
)
def\_\_init\_\_(self, *, id: str, app\_id: str, conversation\_id: str, data: str) -> None:
self.id = id
self.app\_id = app\_id
self.conversation\_id = conversation\_id
self.data = data
@classmethod
deffrom\_variable(cls, *, app\_id: str, conversation\_id: str, variable: Variable) -> "ConversationVariable":
obj = cls(
id=variable.id,
app\_id=app\_id,
conversation\_id=conversation\_id,
data=variable.model\_dump\_json(),
)
return obj
defto\_variable(self) -> Variable:
mapping = json.loads(self.data)
return variable\_factory.build\_conversation\_variable\_from\_mapping(mapping)
三、多轮对话执行流程
3.1 对话初始化
3.2 消息处理流程
3.3 AdvancedChatAppRunner:多轮对话的核心执行器
3.3.1 设计理念与架构
AdvancedChatAppRunner 是 Dify 多轮对话功能的核心执行器,它采用了「编排式」设计理念,将复杂的对话流程抽象为可视化工作流,使开发者能够通过图形界面设计对话流程,而无需编写复杂的代码。
核心设计理念:
- 工作流驱动 :通过工作流引擎驱动对话流程,支持条件分支、循环等复杂逻辑
- 状态持久化 :通过会话变量机制实现对话状态的持久化,支持跨轮次的状态记忆
- 组件化设计 :将对话流程拆分为多个功能节点,如LLM节点、知识检索节点、条件节点等
- 事件驱动 :采用事件驱动架构,通过消息队列实现异步处理和流式响应
3.3.2 工作原理解析
当用户发送消息时,系统会创建一个 AdvancedChatAppRunner 实例,并调用其 run 方法开始处理用户输入。整个处理流程可以分为以下几个关键步骤:
- 初始化阶段
- 加载应用配置和工作流定义
- 确定用户身份,用于权限控制和个性化处理
- 处理输入内容审核(如敏感内容过滤)
- 处理特殊情况(如标注回复)
- 会话变量处理
- 从数据库查询已有会话变量
- 如果是首次对话,创建初始会话变量
- 将数据库实体转换为变量对象
- 变量池构建
- 创建系统变量(用户输入、文件、会话ID等)
- 添加用户输入变量(表单输入)
- 添加环境变量(应用级配置)
- 添加会话变量(持久化状态)
- 工作流执行
- 初始化工作流入口(WorkflowEntry)
- 运行工作流,获取事件生成器
- 处理工作流生成的事件(如消息块、完成事件等)
这种设计使得 AdvancedChatAppRunner 能够灵活处理各种复杂的对话场景,同时保持代码的可维护性和可扩展性。
3.3.3 代码实现详解
AdvancedChatAppRunner 类位于 api/core/app/apps/advanced\_chat/app\_runner.py,以下是其核心实现(简化版):
classAdvancedChatAppRunner(WorkflowBasedAppRunner):
"""AdvancedChat Application Runner"""
def\_\_init\_\_(
self,
application\_generate\_entity: AdvancedChatAppGenerateEntity,
queue\_manager: AppQueueManager,
conversation: Conversation,
message: Message,
dialogue\_count: int,
) -> None:
# 初始化父类
super().\_\_init\_\_(queue\_manager)
# 保存关键引用
self.application\_generate\_entity = application\_generate\_entity # 应用生成实体
self.conversation = conversation # 当前对话
self.message = message # 当前消息
self.\_dialogue\_count = dialogue\_count # 对话轮次计数
defrun(self) -> None:
# 1. 加载应用配置和工作流
app\_config = self.application\_generate\_entity.app\_config
app\_record = db.session.query(App).filter(App.id == app\_config.app\_id).first()
workflow = self.get\_workflow(app\_model=app\_record, workflow\_id=app\_config.workflow\_id)
# 2. 处理用户身份
user\_id = self.\_get\_user\_id()
# 3. 处理特殊运行模式(单次迭代或单次循环)
if self.application\_generate\_entity.single\_iteration\_run or self.application\_generate\_entity.single\_loop\_run:
# 处理单次迭代或单次循环运行的情况
graph, variable\_pool = self.\_handle\_special\_run\_mode()
else:
# 4. 处理常规运行模式
inputs = self.application\_generate\_entity.inputs
query = self.application\_generate\_entity.query
files = self.application\_generate\_entity.files
# 5. 输入内容审核和标注回复处理
if self.\_handle\_special\_cases(app\_record, inputs, query):
return
# 6. 初始化会话变量
conversation\_variables = self.\_init\_conversation\_variables(workflow)
# 7. 创建变量池
system\_inputs = self.\_create\_system\_inputs(query, files, user\_id, app\_config)
variable\_pool = VariablePool(
system\_variables=system\_inputs,
user\_inputs=inputs,
environment\_variables=workflow.environment\_variables,
conversation\_variables=conversation\_variables,
)
# 8. 初始化图结构
graph = self.\_init\_graph(graph\_config=workflow.graph\_dict)
# 9. 运行工作流
workflow\_entry = self.\_create\_workflow\_entry(workflow, graph, variable\_pool)
generator = workflow\_entry.run()
# 10. 处理工作流生成的事件
for event in generator:
self.\_handle\_event(workflow\_entry, event)
注意 :上面的代码是简化版,实际实现更为复杂。简化版主要展示了核心流程,省略了一些细节和错误处理。
3.3.4 开发者实现指南
想要实现类似的多轮对话功能,可以参考以下步骤:
- 定义数据模型
- 创建对话(Conversation)和消息(Message)模型
- 设计会话变量(ConversationVariable)模型,用于状态持久化
- 实现变量池
- 创建变量池(VariablePool)类,用于管理各类变量
- 实现变量的添加、获取和移除方法
- 设计工作流引擎
- 实现基于图的工作流执行引擎
- 支持各类节点(LLM、知识检索、条件等)
- 实现对话执行器
- 创建类似 AdvancedChatAppRunner 的执行器类
- 实现初始化、变量处理和工作流执行逻辑
- 添加事件处理
- 实现事件驱动机制,处理工作流生成的事件
- 支持流式响应,提升用户体验
3.3.5 预期效果
AdvancedChatAppRunner 是一个「对话编排引擎」,它使你能够:
- 可视化设计对话流程 :通过拖拽节点和连线来设计对话流程,无需编写代码
- 定义对话状态 :通过会话变量来定义和管理对话状态,实现复杂的对话逻辑
- 组合多种 AI 能力 :将 LLM、知识库、条件判断等能力组合起来,创建功能丰富的对话应用
例如,你可以设计一个客服机器人,它能够:
- 使用知识检索节点查询产品信息
- 使用条件节点判断用户意图
- 使用变量操作节点记录用户偏好
- 使用 LLM 节点生成个性化回复
所有这些功能都可以通过可视化界面配置,无需编写代码。
3.4 会话变量持久化
会话变量的更新和持久化是通过 update\_conversation\_variable 方法实现的,位于 api/core/workflow/nodes/variable\_assigner/common/helpers.py:
defupdate\_conversation\_variable(conversation\_id: str, variable: Variable):
stmt = select(ConversationVariable).where(
ConversationVariable.id == variable.id, ConversationVariable.conversation\_id == conversation\_id
)
with Session(db.engine) as session:
row = session.
scalar
(stmt)
ifnot row:
raise VariableOperatorNodeError("conversation variable not found in the database")
row.data = variable.model\_dump\_json()
session.commit()
四、对话历史管理
关于如何避免大模型 Token 超长限制的问题,请阅读从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误,以下为主要的流程。
4.1
Token 限制检查
4.2 对话历史裁剪策略
- 保留最新消息 :优先保留最近的对话轮次
- 保留系统提示 :确保系统提示始终包含在上下文中
- 动态调整 :根据模型的最大上下文长度动态调整历史消息数量
五、会话变量初始化流程
会话变量初始化的代码流程:
- 查询数据库中是否存在会话变量
- 如果不存在,则创建初始会话变量
- 将会话变量加载到变量池中
- 工作流执行过程中更新变量
- 通过
update\_conversation\_variable方法持久化变量
六、从用户输入到回复的完整流程
当用户输入一个问题时,系统会经历以下状态变化和数据更新:
6.1.1 API层处理
- 请求接收
- API服务接收用户的HTTP请求
- 解析请求参数(会话ID、用户输入、文件等)
- 验证用户权限和输入合法性
- 会话管理
- 如果是新会话,创建
Conversation记录 - 如果是已有会话,查询会话信息和历史消息
- 更新会话的
updated\_at时间戳
- 消息创建
- 创建新的
Message记录,设置query字段为用户输入 - 如果有文件,创建
MessageFile关联 - 设置消息状态为
processing
6.1.2 应用运行器初始化
- AppRunner创建
- 创建
AdvancedChatAppRunner实例 - 加载应用配置和工作流定义
- 设置会话和消息引用
- 变量初始化
- 创建系统变量(
sys.query、sys.files等) - 从数据库加载会话变量
- 如果是首次对话,创建初始会话变量
- 队列事件发布
- 创建
QueueAdvancedChatMessageEvent事件 - 将事件发布到消息队列
- 返回流式响应通道给客户端
6.2 工作流执行过程
从零开始学 Dify- 工作流(Workflow)系统架构
从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制
从零开始学 Dify - 万字详解 Dify 工作流图引擎(GraphEngine)的实现机制
6.2.1 任务管道处理
- 事件接收
AdvancedChatAppGenerateTaskPipeline接收队列事件- 初始化任务状态和上下文
- 发布
QueueAdvancedChatMessageStartEvent事件
- 工作流初始化
- 创建
WorkflowEntry实例 - 初始化
GraphEngine和GraphRuntimeState - 在数据库中创建
WorkflowRun记录,状态为running
6.2.2 节点执行过程
- 节点调度
- 图引擎确定起始节点
- 按照工作流定义的边(edge)顺序执行节点
- 每个节点执行前后更新
NodeRuntimeState
- LLM节点执行
- 从变量池获取输入变量(用户查询、对话历史等)
- 构建提示词模板并填充变量
- 调用LLM服务生成回复
- 将生成结果写入变量池
- 更新节点状态为
completed
- 知识检索节点执行
- 从变量池获取查询文本
- 执行向量检索获取相关文档片段
- 将检索结果写入变量池
- 更新节点状态为
completed
- 条件节点执行
- 从变量池获取条件变量
- 执行条件表达式求值
- 根据结果确定下一个执行路径
- 更新节点状态为
completed
- 变量操作节点执行
- 从变量池获取目标变量
- 执行变量操作(赋值、追加、数学运算等)
- 更新变量池中的变量值
- 对于会话变量,调用
update\_conversation\_variable持久化到数据库 - 更新节点状态为
completed
6.2.3 数据状态变化
- 变量池状态变化
- 系统变量:保存用户输入、文件等信息
- 环境变量:保持不变
- 会话变量:根据工作流执行过程更新
- 节点变量:随节点执行创建和更新
- 数据库状态变化
WorkflowRun记录状态更新NodeRun记录创建和状态更新ConversationVariable记录更新Message记录的answer字段逐步更新
6.3 响应生成与返回
6.3.1 流式响应处理
- 消息块生成
- LLM节点生成文本块
- 发布
QueueAdvancedChatMessageChunkEvent事件 - 任务管道接收事件并通过WebSocket发送给客户端
- 客户端逐步显示回复内容
- 工作流完成处理
- 所有节点执行完成后,工作流状态变为
completed - 发布
QueueAdvancedChatMessageEndEvent事件 - 任务管道接收事件并执行完成处理
6.3.2 数据持久化
- 消息保存
- 调用
\_save\_message方法保存完整消息 - 更新
Message记录的answer字段为完整回复 - 更新消息元数据(token使用量、延迟等)
- 会话变量持久化
- 确保所有会话变量已持久化到数据库
- 更新
ConversationVariable记录
- 工作流状态更新
- 更新
WorkflowRun记录状态为completed - 记录工作流执行统计信息(执行时间、节点数等)
- 会话状态更新
- 更新
Conversation记录的updated\_at时间 - 如果是首次对话,可能更新会话名称和摘要
6.4 客户端状态更新
- UI状态变化
- 显示用户输入的消息
- 显示"正在输入"状态
- 逐步显示AI回复内容
- 完成后更新消息状态
- 会话列表更新
- 更新会话列表中的最新消息预览
- 更新会话的时间戳
- 如果是新会话,添加到会话列表
通过这一系列的状态变化和数据更新,Dify实现了从用户输入到生成回复的完整流程,确保了多轮对话的连贯性和状态持久化。
七、总结
Dify 的多轮对话功能基于工作流引擎和变量池实现,通过以下关键机制支持复杂的对话场景:
- 对话管理 :使用 Conversation 和 Message 模型管理对话和消息
- 工作流执行 :基于图的工作流引擎控制对话流程
- 变量管理 :变量池管理系统变量、环境变量和会话变量
- 对话历史 :TokenBufferMemory 管理对话历史,支持上下文感知
- 会话变量持久化 :通过 ConversationVariable 持久化会话变量,支持跨轮对话状态保持
- 流式响应 :通过消息队列和事件机制实现流式响应,提升用户体验
Dify 能够支持复杂的多轮对话场景,如上下文感知、状态追踪、条件分支等,为用户提供了强大而灵活的 AI 对话应用开发能力。
参考资料
https://github.com/langgenius/dify (v1.4.1)
八、推荐阅读
从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误
从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制
从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制
从零开始学 Dify - 万字详解 Dify 工作流图引擎(GraphEngine)的实现机制
👆👆👆欢迎关注,一起进步👆👆👆
欢迎留言讨论哈
🧐点赞、分享、推荐 ,一键三连,养成习惯👍
