从零开始学 Dify - 万字详解 Dify 多轮对话的实现机制

AdvancedChatAppRunner 是 Dify 多轮对话功能的核心执行器,它采用了「编排式」设计理念,将复杂的对话流程抽象为可视化工作流,使用户能够通过图形界面设计对话流程,而无需编写复杂的代码。

阅读本文前,最好了解以下内容:

从零开始学 Dify- 对话系统的关键功能

从零开始学 Dify- 工作流(Workflow)系统架构

从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制

从零开始学 Dify - 万字详解 Dify 工作流图引擎(GraphEngine)的实现机制

👆👆👆欢迎关注,一起进步👆👆👆

一、多轮对话概述

Dify 是一个强大的 AI 应用开发平台,其多轮对话功能是核心特性之一。多轮对话允许用户与 AI 进行连续的交互,系统能够记住对话历史并在此基础上生成连贯的回复。

picture.image

二、核心组件

2.1 对话模型(Conversation)

Dify 使用 Conversation 模型来表示一个完整的对话会话:

picture.image

  • Conversation 实体存储对话的基本信息,如 ID、应用 ID、模型配置等
  • 每个对话包含多个 Message ,代表用户输入和 AI 回复
  • Message 可以关联 MessageFile ,支持多模态对话
  • ConversationVariable 存储会话级别的变量,支持跨轮对话状态保持
  • WorkflowRun 记录工作流执行实例,每次对话会创建一个工作流运行记录
  • NodeRun 记录节点执行状态,跟踪工作流中每个节点的执行情况

2.2 工作流引擎

Dify 的多轮对话基于工作流引擎实现,工作流引擎是一个基于图(Graph)的执行系统:

picture.image

工作流引擎的主要组件:(详细内容可参考:从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制从零开始学 Dify - 万字详解 Dify 工作流图引擎(GraphEngine)的实现机制

  • 工作流入口(WorkflowEntry) :工作流执行的起点,负责初始化执行环境
  • 图引擎(GraphEngine) :解析工作流配置,构建执行图,控制节点执行顺序
  • 节点(Node) :工作流的基本执行单元,如 LLM 节点、条件节点等
  • 变量池(VariablePool) :管理工作流执行过程中的变量

2.3 变量池(VariablePool)

变量池是多轮对话中的关键组件,负责管理对话过程中的各类变量:

picture.image

变量池中的变量类型:

  1. 系统变量 :以 sys. 为前缀,如 sys.query (用户输入)、 sys.files (用户上传文件)
  2. 环境变量 :工作流级别的配置变量
  3. 会话变量 :在会话中持久化的变量,可以跨多轮对话使用
  4. 节点变量 :各节点的输入输出变量

2.3.1 VariablePool 类实现

以下是 VariablePool 类的核心实现,位于 api/core/workflow/entities/variable\_pool.py

  
classVariablePool(BaseModel):  
    # Variable dictionary is a dictionary for looking up variables by their selector.  
    # The first element of the selector is the node id, it's the first-level key in the dictionary.  
    # Other elements of the selector are the keys in the second-level dictionary. To get the key, we hash the  
    # elements of the selector 
 
 except
  the first one.  
    variable\_dictionary: dict[str, dict[int, 
   
 Segment
 ]] = Field(  
        description="Variables mapping",  
        default=defaultdict(dict),  
    )  
    # TODO: This user inputs is not used for pool.  
    user\_inputs: Mapping[str, Any] = Field(  
        description="User inputs",  
    )  
    system\_variables: Mapping[SystemVariableKey, Any] = Field(  
        description="System variables",  
    )  
    environment\_variables: Sequence[Variable] = Field(  
        description="Environment variables.",  
        default\_factory=list,  
    )  
    conversation\_variables: Sequence[Variable] = Field(  
        description="Conversation variables.",  
        default\_factory=list,  
    )  
  
    def\_\_init\_\_(  
        self,  
        *,  
        system\_variables: Mapping[SystemVariableKey, Any] | None = None,  
        user\_inputs: Mapping[str, Any] | None = None,  
        environment\_variables: Sequence[Variable] | None = None,  
        conversation\_variables: Sequence[Variable] | None = None,  
        **kwargs,  
    ):  
        environment\_variables = environment\_variables or []  
        conversation\_variables = conversation\_variables or []  
        user\_inputs = user\_inputs or {}  
        system\_variables = system\_variables or {}  
  
        super().\_\_init\_\_(  
            system\_variables=system\_variables,  
            user\_inputs=user\_inputs,  
            environment\_variables=environment\_variables,  
            conversation\_variables=conversation\_variables,  
            **kwargs,  
        )  
  
        for key, value in self.system\_variables.items():  
            self.add((SYSTEM\_VARIABLE\_NODE\_ID, key.value), value)  
        # Add environment variables to the variable pool  
        for var in self.environment\_variables:  
            self.add((ENVIRONMENT\_VARIABLE\_NODE\_ID, var.name), var)  
        # Add conversation variables to the variable pool  
        for var in self.conversation\_variables:  
            self.add((CONVERSATION\_VARIABLE\_NODE\_ID, var.name), var)  
  
    defadd(self, selector: Sequence[str], value: Any, /) -> None:  
        """添加变量到变量池"""  
        if len(selector) < 2:  
            raise ValueError("Invalid selector")  
  
        if isinstance(value, Variable):  
            variable = value  
        if isinstance(value, 
   
 Segment
 ):  
            variable = variable\_factory.segment\_to\_variable(
   
 segment
 =value, selector=selector)  
        else:  
            
   
 segment
  = variable\_factory.build\_segment(value)  
            variable = variable\_factory.segment\_to\_variable(segment=segment, selector=selector)  
  
        hash\_key = hash(tuple(selector[1:]))  
        self.variable\_dictionary[selector[0]][hash\_key] = variable  
  
    defget(self, selector: Sequence[str], /) -> Segment | None:  
        """从变量池中获取变量值"""  
        if len(selector) < 2:  
            returnNone  
  
        hash\_key = hash(tuple(selector[1:]))  
        value = self.variable\_dictionary[selector[0]].get(hash\_key)  
  
        if value isNone:  
            selector, attr = selector[:-1], selector[-1]  
            # Python support `attr in FileAttribute` after 3.12  
            if attr notin {item.value for item in FileAttribute}:  
                returnNone  
            value = self.get(selector)  
            ifnot isinstance(value, FileSegment | NoneSegment):  
                returnNone  
            if isinstance(value, FileSegment):  
                attr = FileAttribute(attr)  
                attr\_value = file\_manager.get\_attr(file=value.value, attr=attr)  
                return variable\_factory.build\_segment(attr\_value)  
            return value  
  
        return value  
  
    defremove(self, selector: Sequence[str], /):  
        """从变量池中移除变量"""  
        ifnot selector:  
            return  
        if len(selector) == 1:  
            self.variable\_dictionary[selector[0]] = {}  
            return  
        hash\_key = hash(tuple(selector[1:]))  
        self.variable\_dictionary[selector[0]].pop(hash\_key, None)  

2.3.2 ConversationVariable 类实现

ConversationVariable 类负责会话变量的持久化,位于 api/models/workflow.py

  
classConversationVariable(Base):  
    \_\_tablename\_\_ = "workflow\_conversation\_variables"  
  
    id: Mapped[str] = mapped\_column(StringUUID, primary\_key=True)  
    conversation\_id: Mapped[str] = mapped\_column(StringUUID, nullable=False, primary\_key=True, 
   
 index
 =True)  
    app\_id: Mapped[str] = mapped\_column(StringUUID, nullable=False, 
   
 index
 =True)  
    data: Mapped[str] = mapped\_column(db.Text, nullable=False)  
    created\_at: Mapped[datetime] = mapped\_column(  
        db.DateTime, nullable=False, server\_default=func.current\_timestamp(), 
   
 index
 =True  
    )  
    updated\_at: Mapped[datetime] = mapped\_column(  
        db.DateTime, nullable=False, server\_default=func.current\_timestamp(), onupdate=func.current\_timestamp()  
    )  
  
    def\_\_init\_\_(self, *, id: str, app\_id: str, conversation\_id: str, data: str) -> None:  
        self.id = id  
        self.app\_id = app\_id  
        self.conversation\_id = conversation\_id  
        self.data = data  
  
    @classmethod  
    deffrom\_variable(cls, *, app\_id: str, conversation\_id: str, variable: Variable) -> "ConversationVariable":  
        obj = cls(  
            id=variable.id,  
            app\_id=app\_id,  
            conversation\_id=conversation\_id,  
            data=variable.model\_dump\_json(),  
        )  
        return obj  
  
    defto\_variable(self) -> Variable:  
        mapping = json.loads(self.data)  
        return variable\_factory.build\_conversation\_variable\_from\_mapping(mapping)  

三、多轮对话执行流程

3.1 对话初始化

picture.image

3.2 消息处理流程

picture.image

3.3 AdvancedChatAppRunner:多轮对话的核心执行器

3.3.1 设计理念与架构

AdvancedChatAppRunner 是 Dify 多轮对话功能的核心执行器,它采用了「编排式」设计理念,将复杂的对话流程抽象为可视化工作流,使开发者能够通过图形界面设计对话流程,而无需编写复杂的代码。

核心设计理念:

  • 工作流驱动 :通过工作流引擎驱动对话流程,支持条件分支、循环等复杂逻辑
  • 状态持久化 :通过会话变量机制实现对话状态的持久化,支持跨轮次的状态记忆
  • 组件化设计 :将对话流程拆分为多个功能节点,如LLM节点、知识检索节点、条件节点等
  • 事件驱动 :采用事件驱动架构,通过消息队列实现异步处理和流式响应

picture.image

3.3.2 工作原理解析

当用户发送消息时,系统会创建一个 AdvancedChatAppRunner 实例,并调用其 run 方法开始处理用户输入。整个处理流程可以分为以下几个关键步骤:

  1. 初始化阶段
  • 加载应用配置和工作流定义
  • 确定用户身份,用于权限控制和个性化处理
  • 处理输入内容审核(如敏感内容过滤)
  • 处理特殊情况(如标注回复)
  • 会话变量处理
  • 从数据库查询已有会话变量
  • 如果是首次对话,创建初始会话变量
  • 将数据库实体转换为变量对象
  • 变量池构建
  • 创建系统变量(用户输入、文件、会话ID等)
  • 添加用户输入变量(表单输入)
  • 添加环境变量(应用级配置)
  • 添加会话变量(持久化状态)
  • 工作流执行
  • 初始化工作流入口(WorkflowEntry)
  • 运行工作流,获取事件生成器
  • 处理工作流生成的事件(如消息块、完成事件等)

这种设计使得 AdvancedChatAppRunner 能够灵活处理各种复杂的对话场景,同时保持代码的可维护性和可扩展性。

3.3.3 代码实现详解

AdvancedChatAppRunner 类位于 api/core/app/apps/advanced\_chat/app\_runner.py,以下是其核心实现(简化版):

  
classAdvancedChatAppRunner(WorkflowBasedAppRunner):  
    """AdvancedChat Application Runner"""  
  
    def\_\_init\_\_(  
        self,  
        application\_generate\_entity: AdvancedChatAppGenerateEntity,  
        queue\_manager: AppQueueManager,  
        conversation: Conversation,  
        message: Message,  
        dialogue\_count: int,  
    ) -> None:  
        # 初始化父类  
        super().\_\_init\_\_(queue\_manager)  
  
        # 保存关键引用  
        self.application\_generate\_entity = application\_generate\_entity  # 应用生成实体  
        self.conversation = conversation  # 当前对话  
        self.message = message  # 当前消息  
        self.\_dialogue\_count = dialogue\_count  # 对话轮次计数  
  
    defrun(self) -> None:  
        # 1. 加载应用配置和工作流  
        app\_config = self.application\_generate\_entity.app\_config  
        app\_record = db.session.query(App).filter(App.id == app\_config.app\_id).first()  
        workflow = self.get\_workflow(app\_model=app\_record, workflow\_id=app\_config.workflow\_id)  
          
        # 2. 处理用户身份  
        user\_id = self.\_get\_user\_id()  
          
        # 3. 处理特殊运行模式(单次迭代或单次循环)  
        if self.application\_generate\_entity.single\_iteration\_run or self.application\_generate\_entity.single\_loop\_run:  
            # 处理单次迭代或单次循环运行的情况  
            graph, variable\_pool = self.\_handle\_special\_run\_mode()  
        else:  
            # 4. 处理常规运行模式  
            inputs = self.application\_generate\_entity.inputs  
            query = self.application\_generate\_entity.query  
            files = self.application\_generate\_entity.files  
  
            # 5. 输入内容审核和标注回复处理  
            if self.\_handle\_special\_cases(app\_record, inputs, query):  
                return  
  
            # 6. 初始化会话变量  
            conversation\_variables = self.\_init\_conversation\_variables(workflow)  
  
            # 7. 创建变量池  
            system\_inputs = self.\_create\_system\_inputs(query, files, user\_id, app\_config)  
            variable\_pool = VariablePool(  
                system\_variables=system\_inputs,  
                user\_inputs=inputs,  
                environment\_variables=workflow.environment\_variables,  
                conversation\_variables=conversation\_variables,  
            )  
  
            # 8. 初始化图结构  
            graph = self.\_init\_graph(graph\_config=workflow.graph\_dict)  
  
        # 9. 运行工作流  
        workflow\_entry = self.\_create\_workflow\_entry(workflow, graph, variable\_pool)  
        generator = workflow\_entry.run()  
  
        # 10. 处理工作流生成的事件  
        for event in generator:  
            self.\_handle\_event(workflow\_entry, event)  

注意 :上面的代码是简化版,实际实现更为复杂。简化版主要展示了核心流程,省略了一些细节和错误处理。

3.3.4 开发者实现指南

想要实现类似的多轮对话功能,可以参考以下步骤:

  1. 定义数据模型
  • 创建对话(Conversation)和消息(Message)模型
  • 设计会话变量(ConversationVariable)模型,用于状态持久化
  • 实现变量池
  • 创建变量池(VariablePool)类,用于管理各类变量
  • 实现变量的添加、获取和移除方法
  • 设计工作流引擎
  • 实现基于图的工作流执行引擎
  • 支持各类节点(LLM、知识检索、条件等)
  • 实现对话执行器
  • 创建类似 AdvancedChatAppRunner 的执行器类
  • 实现初始化、变量处理和工作流执行逻辑
  • 添加事件处理
  • 实现事件驱动机制,处理工作流生成的事件
  • 支持流式响应,提升用户体验

3.3.5 预期效果

AdvancedChatAppRunner 是一个「对话编排引擎」,它使你能够:

  1. 可视化设计对话流程 :通过拖拽节点和连线来设计对话流程,无需编写代码
  2. 定义对话状态 :通过会话变量来定义和管理对话状态,实现复杂的对话逻辑
  3. 组合多种 AI 能力 :将 LLM、知识库、条件判断等能力组合起来,创建功能丰富的对话应用

例如,你可以设计一个客服机器人,它能够:

  • 使用知识检索节点查询产品信息
  • 使用条件节点判断用户意图
  • 使用变量操作节点记录用户偏好
  • 使用 LLM 节点生成个性化回复

所有这些功能都可以通过可视化界面配置,无需编写代码。

3.4 会话变量持久化

会话变量的更新和持久化是通过 update\_conversation\_variable 方法实现的,位于 api/core/workflow/nodes/variable\_assigner/common/helpers.py

  
defupdate\_conversation\_variable(conversation\_id: str, variable: Variable):  
    stmt = select(ConversationVariable).where(  
        ConversationVariable.id == variable.id, ConversationVariable.conversation\_id == conversation\_id  
    )  
    with Session(db.engine) as session:  
        row = session.
   
 scalar
 (stmt)  
        ifnot row:  
            raise VariableOperatorNodeError("conversation variable not found in the database")  
        row.data = variable.model\_dump\_json()  
        session.commit()  

四、对话历史管理

关于如何避免大模型 Token 超长限制的问题,请阅读从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误,以下为主要的流程。

4.1

Token 限制检查

picture.image

4.2 对话历史裁剪策略

  1. 保留最新消息 :优先保留最近的对话轮次
  2. 保留系统提示 :确保系统提示始终包含在上下文中
  3. 动态调整 :根据模型的最大上下文长度动态调整历史消息数量

五、会话变量初始化流程

picture.image

会话变量初始化的代码流程:

  1. 查询数据库中是否存在会话变量
  2. 如果不存在,则创建初始会话变量
  3. 将会话变量加载到变量池中
  4. 工作流执行过程中更新变量
  5. 通过 update\_conversation\_variable 方法持久化变量

六、从用户输入到回复的完整流程

picture.image

当用户输入一个问题时,系统会经历以下状态变化和数据更新:

6.1.1 API层处理

  1. 请求接收
  • API服务接收用户的HTTP请求
  • 解析请求参数(会话ID、用户输入、文件等)
  • 验证用户权限和输入合法性
  • 会话管理
  • 如果是新会话,创建 Conversation 记录
  • 如果是已有会话,查询会话信息和历史消息
  • 更新会话的 updated\_at 时间戳
  • 消息创建
  • 创建新的 Message 记录,设置 query 字段为用户输入
  • 如果有文件,创建 MessageFile 关联
  • 设置消息状态为 processing

6.1.2 应用运行器初始化

  1. AppRunner创建
  • 创建 AdvancedChatAppRunner 实例
  • 加载应用配置和工作流定义
  • 设置会话和消息引用
  • 变量初始化
  • 创建系统变量( sys.querysys.files 等)
  • 从数据库加载会话变量
  • 如果是首次对话,创建初始会话变量
  • 队列事件发布
  • 创建 QueueAdvancedChatMessageEvent 事件
  • 将事件发布到消息队列
  • 返回流式响应通道给客户端

6.2 工作流执行过程

从零开始学 Dify- 工作流(Workflow)系统架构

从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制

从零开始学 Dify - 万字详解 Dify 工作流图引擎(GraphEngine)的实现机制

picture.image

6.2.1 任务管道处理

  1. 事件接收
  • AdvancedChatAppGenerateTaskPipeline 接收队列事件
  • 初始化任务状态和上下文
  • 发布 QueueAdvancedChatMessageStartEvent 事件
  • 工作流初始化
  • 创建 WorkflowEntry 实例
  • 初始化 GraphEngineGraphRuntimeState
  • 在数据库中创建 WorkflowRun 记录,状态为 running

6.2.2 节点执行过程

  1. 节点调度
  • 图引擎确定起始节点
  • 按照工作流定义的边(edge)顺序执行节点
  • 每个节点执行前后更新 NodeRuntimeState
  • LLM节点执行
  • 从变量池获取输入变量(用户查询、对话历史等)
  • 构建提示词模板并填充变量
  • 调用LLM服务生成回复
  • 将生成结果写入变量池
  • 更新节点状态为 completed
  • 知识检索节点执行
  • 从变量池获取查询文本
  • 执行向量检索获取相关文档片段
  • 将检索结果写入变量池
  • 更新节点状态为 completed
  • 条件节点执行
  • 从变量池获取条件变量
  • 执行条件表达式求值
  • 根据结果确定下一个执行路径
  • 更新节点状态为 completed
  • 变量操作节点执行
  • 从变量池获取目标变量
  • 执行变量操作(赋值、追加、数学运算等)
  • 更新变量池中的变量值
  • 对于会话变量,调用 update\_conversation\_variable 持久化到数据库
  • 更新节点状态为 completed

6.2.3 数据状态变化

  1. 变量池状态变化
  • 系统变量:保存用户输入、文件等信息
  • 环境变量:保持不变
  • 会话变量:根据工作流执行过程更新
  • 节点变量:随节点执行创建和更新
  • 数据库状态变化
  • WorkflowRun 记录状态更新
  • NodeRun 记录创建和状态更新
  • ConversationVariable 记录更新
  • Message 记录的 answer 字段逐步更新

6.3 响应生成与返回

picture.image

6.3.1 流式响应处理

  1. 消息块生成
  • LLM节点生成文本块
  • 发布 QueueAdvancedChatMessageChunkEvent 事件
  • 任务管道接收事件并通过WebSocket发送给客户端
  • 客户端逐步显示回复内容
  • 工作流完成处理
  • 所有节点执行完成后,工作流状态变为 completed
  • 发布 QueueAdvancedChatMessageEndEvent 事件
  • 任务管道接收事件并执行完成处理

6.3.2 数据持久化

  1. 消息保存
  • 调用 \_save\_message 方法保存完整消息
  • 更新 Message 记录的 answer 字段为完整回复
  • 更新消息元数据(token使用量、延迟等)
  • 会话变量持久化
  • 确保所有会话变量已持久化到数据库
  • 更新 ConversationVariable 记录
  • 工作流状态更新
  • 更新 WorkflowRun 记录状态为 completed
  • 记录工作流执行统计信息(执行时间、节点数等)
  • 会话状态更新
  • 更新 Conversation 记录的 updated\_at 时间
  • 如果是首次对话,可能更新会话名称和摘要

6.4 客户端状态更新

  1. UI状态变化
  • 显示用户输入的消息
  • 显示"正在输入"状态
  • 逐步显示AI回复内容
  • 完成后更新消息状态
  • 会话列表更新
  • 更新会话列表中的最新消息预览
  • 更新会话的时间戳
  • 如果是新会话,添加到会话列表

通过这一系列的状态变化和数据更新,Dify实现了从用户输入到生成回复的完整流程,确保了多轮对话的连贯性和状态持久化。

七、总结

Dify 的多轮对话功能基于工作流引擎和变量池实现,通过以下关键机制支持复杂的对话场景:

  1. 对话管理 :使用 Conversation 和 Message 模型管理对话和消息
  2. 工作流执行 :基于图的工作流引擎控制对话流程
  3. 变量管理 :变量池管理系统变量、环境变量和会话变量
  4. 对话历史 :TokenBufferMemory 管理对话历史,支持上下文感知
  5. 会话变量持久化 :通过 ConversationVariable 持久化会话变量,支持跨轮对话状态保持
  6. 流式响应 :通过消息队列和事件机制实现流式响应,提升用户体验

Dify 能够支持复杂的多轮对话场景,如上下文感知、状态追踪、条件分支等,为用户提供了强大而灵活的 AI 对话应用开发能力。

参考资料

https://github.com/langgenius/dify (v1.4.1)

八、推荐阅读

从零开始学 Dify- 对话系统的关键功能

从零开始学 Dify- 工作流(Workflow)系统架构

从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

从零开始学 Dify - 万字详解 Dify 工作流(workflow)的实现机制

从零开始学 Dify - 万字详解 Dify 工作流图引擎(GraphEngine)的实现机制

👆👆👆欢迎关注,一起进步👆👆👆

欢迎留言讨论哈

🧐点赞、分享、推荐 ,一键三连,养成习惯👍

0
0
0
0
评论
未登录
暂无评论