从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

Dify 是一个开源的 LLM 应用开发平台,提供了直观的界面,结合了 AI 工作流、RAG、Agent、模型管理、可观测性功能等,让用户可以快速从原型到生产。

本文将深入分析 Dify 聊天助手应用的实现机制。

👆👆👆欢迎关注,一起进步👆👆👆

一、聊天助手应用创建流程图

以下流程图展示了在 Dify 中创建聊天助手应用的过程:

picture.image

二、应用创建代码分析

2.1 控制器层

应用创建流程从控制器层开始,主要在api/controllers/console/app/app.py文件中的AppListApi.post方法:

  1. 接收前端请求,解析参数: namedescriptionmodeicon\_typeiconicon\_background
  2. 验证用户权限(要求用户为 editor 角色)
  3. 验证必要参数,特别是 mode 参数(可选值: chatagent-chatadvanced-chatworkflowcompletion
  4. 调用 AppService.create\_app 方法创建应用
  5. 返回应用信息和201状态码

2.2 服务层

服务层的实现在api/services/app\_service.py文件中的create\_app方法:

  1. 根据应用模式( AppMode )从 default\_app\_templates 获取默认应用模板
  2. 处理默认模型实例的获取和设置
  3. 创建 App 实例并设置其属性(名称、描述、模式、图标、租户ID等)
  4. 创建 AppModelConfig 实例并设置其属性
  5. 将应用和关联的模型配置保存到数据库
  6. 触发 app\_was\_created 事件
  7. 返回创建的应用实例

2.3 模型定义

应用创建涉及的主要数据模型定义在api/models/model.py文件中:

  1. App 模型:应用的核心数据结构,包含应用的基本信息和配置
  2. AppModelConfig 模型:应用的模型配置,包含模型提供商、模型ID、参数等
  3. InstalledApp 模型:记录应用的安装信息,包含租户ID、应用ID、位置等
  4. Site 模型:记录应用的站点信息,包含标题、图标、默认语言等

2.4 默认应用模板

默认应用模板定义在`api/

constants /model_template.py文件中的default_app_templates`字典:

  1. 为不同的应用模式( WORKFLOWCOMPLETIONCHATADVANCED\_CHATAGENT\_CHAT )定义默认配置
  2. 包含 enable\_siteenable\_api 等默认设置

2.5 事件处理

应用创建后触发的事件处理在以下文件中实现:

  1. api/events/event\_handlers/create\_installed\_app\_when\_app\_created.py :创建 InstalledApp 记录
  2. api/events/event\_handlers/create\_site\_record\_when\_app\_created.py :创建 Site 记录

这些事件处理器通过app\_was\_created信号连接到应用创建过程,确保在应用创建后自动创建相关记录。

三、系统数据流图

接下来将分析 Dify 项目中“聊天应用”的实现机制,涵盖消息的收发、会话管理、消息流式/阻塞返回、上下文与多轮对话、文件与反馈等核心能力。该机制是 Dify 支持 AI 聊天、客服、知识问答等场景的基础,直接影响用户体验、系统扩展性与业务创新能力。

3.1 以"会话-消息"为核心的流程图

以下是 Dify 系统中主要数据流的示意图:

picture.image

3.2 相关源码定位

| 角色/功能 | 主要模块/文件 | 主要职责说明 | | --- | --- | --- | | API 入口 | api/controllers/service\_api/app/conversation.py | 聊天消息的 API 接口,参数解析、鉴权、调用生成逻辑 | | 消息生成主流程 | api/core/app/apps/chat/app\_generator.py | 聊天消息生成主流程,队列与 worker 管理 | | 消息队列管理 | api/core/app/apps/message\_based\_app\_queue\_manager.py | 消息事件推送、流式/阻塞返回 | | 会话与消息服务 | services/conversation\_service.py | 会话、消息的增删查改,历史消息查询 | | 数据模型 | models/model.py

models/account.py | Conversation、Message、Account 等 ORM 定义 | | 数据库扩展 | extensions/ext\_database.py | 数据库会话管理 | | 前端接口文档 | web/app/components/develop/template/template\_chat.zh.mdx | API 说明、参数与返回格式 |

3.3 代码示例

3.3.1 API 入口

文件:api/controllers/service\_api/app/conversation.py

  
class ChatMessageApi(Resource):  
    def post(self):  
        args = request.get\_json()  
        user = get\_user\_from\_token(request)  
        result = ChatAppGenerator().generate(app\_model, user, args, invoke\_from=InvokeFrom.SERVICE\_API)  
        return result  

  • 解析请求参数,鉴权,调用核心生成逻辑。

3.3.2 消息生成主流程

文件:api/core/app/apps/chat/app\_generator.py

  
class ChatAppGenerator(MessageBasedAppGenerator):  
    def generate(self, app\_model, user, args, invoke\_from, streaming=True):  
        # 参数校验  
        query = args["query"]  
        # 获取/新建会话  
        conversation = ConversationService.get\_conversation(app\_model, args.get("conversation\_id"), user)  
        # 创建消息  
        message = Message(...)  
        db.session.add(message)  
        db.session.commit()  
        # 初始化队列与worker  
        queue\_manager = MessageBasedAppQueueManager(...)  
        worker\_thread = threading.Thread(target=worker\_with\_context)  
        worker\_thread.start()  
        # 返回流式/阻塞响应  
        return self.\_handle\_response(...)  

  • 负责会话与消息的创建、队列与 worker 的初始化、响应的返回。

3.3.3 Worker 线程与消息推送

  
def \_generate\_worker(self, flask\_app, application\_generate\_entity, queue\_manager, conversation\_id, message\_id):  
    with flask\_app.app\_context():  
        conversation = self.\_get\_conversation(conversation\_id)  
        message = self.\_get\_message(message\_id)  
        runner = ChatAppRunner()  
        runner.run(  
            application\_generate\_entity=application\_generate\_entity,  
            queue\_manager=queue\_manager,  
            conversation=conversation,  
            message=message,  
        )  

  • Worker 线程负责实际的 LLM 调用与消息推送,支持异步处理。

3.3.4 消息队列与流式返回

文件:api/core/app/apps/message\_based\_app\_queue\_manager.py

  • 负责消息事件的推送,支持 SSE(Server-Sent Events)流式返回,提升用户体验。

四、数据模型设计 (ER图)

以下是Dify聊天助手应用的核心数据模型关系图:

picture.image

4.1 数据模型详细解释

4.1.1 核心实体说明

  1. Tenant(租户)
  • id :租户唯一标识符

  • name :租户名称

  • plan :订阅计划类型

  • created\_at :创建时间

  • 代表使用 Dify 平台的组织或团队

  • 每个租户可以拥有多个账户(Account)

  • 关键字段:

  • Account(账户)
  • id :账户唯一标识符

  • name :用户名

  • email :电子邮箱,用于登录

  • password :加密存储的密码

  • status :账户状态(如活跃、禁用等)

  • last\_login\_at :最后登录时间

  • 代表可以登录和使用 Dify 平台的用户

  • 每个账户属于一个租户,可以创建和管理多个应用(App)

  • 关键字段:

  • App(应用)
  • id :应用唯一标识符

  • tenant\_id :所属租户ID

  • name :应用名称

  • description :应用描述

  • mode :应用模式(如聊天模式、完成模式等)

  • app\_model\_config\_id :关联的模型配置ID

  • workflow\_id :关联的工作流ID

  • enable\_site :是否启用网站访问

  • enable\_api :是否启用API访问

  • created\_at :创建时间

  • 代表在 Dify 平台上创建的 AI 应用,如聊天助手

  • 每个应用属于一个租户,由账户创建和管理

  • 关键字段:

  • AppModelConfig(应用模型配置)
  • id :配置唯一标识符

  • app\_id :关联的应用ID

  • provider :模型提供商(如OpenAI、Anthropic等)

  • model\_id :使用的模型ID

  • configs :模型参数配置(JSON格式)

  • pre\_prompt :系统提示词

  • opening\_statement :对话开场白

  • agent\_mode :Agent模式设置

  • 定义应用使用的 AI 模型和相关配置

  • 每个应用有一个对应的模型配置

  • 关键字段:

  • Conversation(对话)
  • id :对话唯一标识符

  • app\_id :关联的应用ID

  • name :对话名称

  • inputs :对话初始输入

  • from\_end\_user\_id :发起对话的终端用户ID

  • from\_account\_id :发起对话的账户ID

  • status :对话状态

  • created\_at :创建时间

  • updated\_at :更新时间

  • 代表用户与应用之间的一次完整对话

  • 每个对话属于一个应用,包含多条消息

  • 关键字段:

  • Message(消息)
  • id :消息唯一标识符

  • app\_id :关联的应用ID

  • conversation\_id :关联的对话ID

  • query :用户查询内容

  • message :完整消息内容(JSON格式)

  • answer :系统回复内容

  • status :消息处理状态

  • from\_source :消息来源

  • created\_at :创建时间

  • 代表对话中的一条消息,包括用户输入和系统回复

  • 每条消息属于一个对话

  • 关键字段:

  • EndUser(终端用户)
  • id :用户唯一标识符

  • session\_id :会话ID

  • type :用户类型

  • created\_at :创建时间

  • 代表使用应用的最终用户

  • 一个终端用户可以发起多个对话

  • 关键字段:

  • Workflow(工作流)
  • id :工作流唯一标识符

  • tenant\_id :所属租户ID

  • app\_id :关联的应用ID

  • type :工作流类型

  • version :工作流版本

  • graph :工作流图定义(文本格式)

  • 定义应用的处理流程

  • 每个工作流关联到一个应用

  • 关键字段:

  • ConversationVariable(对话变量)
  • id :变量唯一标识符

  • conversation\_id :关联的对话ID

  • key :变量名

  • value :变量值

  • 存储对话过程中产生的变量

  • 每个变量属于一个对话

  • 关键字段:

4.1.2 实体关系说明

  1. Tenant 与 Account 关系
  • 一对多关系:一个租户可以包含多个账户
  • 关系描述:"包含"
  • 实现方式:Account 表中的 tenant_id 外键关联到 Tenant 表的 id
  • Account 与 App 关系
  • 一对多关系:一个账户可以创建和管理多个应用
  • 关系描述:"创建/管理"
  • 实现方式:通过中间表或权限表实现
  • App 与 AppModelConfig 关系
  • 一对一关系:一个应用有一个对应的模型配置
  • 关系描述:"配置"
  • 实现方式:App 表中的 app_model_config_id 外键关联到 AppModelConfig 表的 id,同时 AppModelConfig 表中的 app_id 外键关联到 App 表的 id
  • App 与 Conversation 关系
  • 一对多关系:一个应用可以包含多个对话
  • 关系描述:"包含"
  • 实现方式:Conversation 表中的 app_id 外键关联到 App 表的 id
  • Conversation 与 Message 关系
  • 一对多关系:一个对话包含多条消息
  • 关系描述:"包含"
  • 实现方式:Message 表中的 conversation_id 外键关联到 Conversation 表的 id
  • Conversation 与 ConversationVariable 关系
  • 一对多关系:一个对话可以包含多个变量
  • 关系描述:"包含"
  • 实现方式:ConversationVariable 表中的 conversation_id 外键关联到 Conversation 表的 id
  • App 与 Workflow 关系
  • 一对多关系:一个应用可以关联多个工作流(不同版本)
  • 关系描述:"关联"
  • 实现方式:Workflow 表中的 app_id 外键关联到 App 表的 id,同时 App 表中的 workflow_id 外键关联到当前使用的 Workflow 表的 id
  • EndUser 与 Conversation 关系
  • 一对多关系:一个终端用户可以发起多个对话
  • 关系描述:"发起"
  • 实现方式:Conversation 表中的 from_end_user_id 外键关联到 EndUser 表的 id

4.1.3 数据模型设计特点

  1. 多租户架构 :通过 Tenant 实体实现多租户隔离,每个租户拥有自己的账户、应用和数据
  2. 灵活的应用配置 :通过 AppModelConfig 实现对不同模型和参数的灵活配置
  3. 完整的对话历史 :通过 Conversation 和 Message 实体存储完整的对话历史和上下文
  4. 工作流支持 :通过 Workflow 实体支持复杂的 AI 处理流程定义
  5. 变量存储 :通过 ConversationVariable 支持对话过程中的变量存储和使用
  6. 用户区分 :区分平台用户(Account)和应用终端用户(EndUser)

五、关键流程

5.1 消息处理流程图

以下流程图展示了 Dify 聊天助手应用处理用户消息的过程:

picture.image

5.2 消息处理源代码实现

消息处理的入口点在控制器层,主要由ChatApi类处理:

  
# api/controllers/web/completion.py  
class ChatApi(WebApiResource):  
    def post(self, app\_model, end\_user):  
        app\_mode = AppMode.value\_of(app\_model.mode)  
        if app\_mode notin {AppMode.CHAT, AppMode.AGENT\_CHAT, AppMode.ADVANCED\_CHAT}:  
            raise NotChatAppError()  
  
        parser = reqparse.RequestParser()  
        parser.add\_argument("inputs", type=dict, required=True, 
   
 location
 ="json")  
        parser.add\_argument("query", type=str, required=True, 
   
 location
 ="json")  
        parser.add\_argument("files", type=list, required=False, 
   
 location
 ="json")  
        parser.add\_argument("response\_mode", type=str, choices=["blocking", "streaming"], 
   
 location
 ="json")  
        parser.add\_argument("conversation\_id", type=uuid\_value, 
   
 location
 ="json")  
        parser.add\_argument("parent\_message\_id", type=uuid\_value, required=False, location="json")  
        parser.add\_argument("retriever\_from", type=str, required=False, default="web\_app", location="json")  
  
        args = parser.parse\_args()  
  
        streaming = args["response\_mode"] == "streaming"  
        args["auto\_generate\_name"] = False  
  
        try:  
            
   
 response
  = AppGenerateService.generate(  
                app\_model=app\_model, user=end\_user, args=args, invoke\_from=InvokeFrom.WEB\_APP, streaming=streaming  
            )  
  
            return helper.compact\_generate\_response(
   
 response
 )  
        
 
 except
  services.errors.conversation.ConversationNotExistsError:  
            raise NotFound("Conversation Not Exists.")  
        # 异常处理...  

消息生成的核心服务在AppGenerateService类中:

  
# api/services/app\_generate\_service.py  
class AppGenerateService:  
    @classmethod  
    def generate(  
        cls,  
        app\_model: App,  
        user: Union[Account, EndUser],  
        args: Mapping[str, Any],  
        invoke\_from: InvokeFrom,  
        streaming: bool = True,  
    ):  
        # 系统级别限流检查  
        if dify\_config.BILLING\_ENABLED:  
            # 检查是否是免费计划  
            # ...  
  
        # 应用级别限流  
        max\_active\_request = AppGenerateService.\_get\_max\_active\_requests(app\_model)  
        rate\_limit = RateLimit(app\_model.id, max\_active\_request)  
        request\_id = RateLimit.gen\_request\_key()  
        try:  
            request\_id = rate\_limit.enter(request\_id)  
            # 根据应用模式选择不同的生成器  
            if app\_model.mode == AppMode.COMPLETION.value:  
                return rate\_limit.generate(  
                    CompletionAppGenerator.convert\_to\_event\_stream(  
                        CompletionAppGenerator().generate(  
                            app\_model=app\_model, user=user, args=args, invoke\_from=invoke\_from, streaming=streaming  
                        ),  
                    ),  
                    request\_id=request\_id,  
                )  
            elif app\_model.mode == AppMode.AGENT\_CHAT.value or app\_model.is\_agent:  
                # Agent聊天模式  
                # ...  
            elif app\_model.mode == AppMode.CHAT.value:  
                # 普通聊天模式  
                # ...  
            elif app\_model.mode == AppMode.ADVANCED\_CHAT.value:  
                # 高级聊天模式  
                # ...  
            # ...  
        except RateLimitError as e:  
            raise InvokeRateLimitError(str(e))  
        # ...  

具体的消息生成逻辑在各个 AppGenerator 类中实现,以 ChatAppGenerator 为例:

  
# api/core/app/apps/chat/app\_generator.py  
class ChatAppGenerator(MessageBasedAppGenerator):  
    def generate(  
        self,  
        app\_model: App,  
        user: Union[Account, EndUser],  
        args: Mapping[str, Any],  
        invoke\_from: InvokeFrom,  
        streaming: bool = True,  
    ) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:  
        # 验证查询参数  
        ifnot args.get("query"):  
            raise ValueError("query is required")  
  
        query = args["query"]  
        inputs = args["inputs"]  
        extras = {"auto\_generate\_conversation\_name": args.get("auto\_generate\_name", True)}  
  
        # 获取或创建对话  
        conversation = None  
        conversation\_id = args.get("conversation\_id")  
        if conversation\_id:  
            conversation = ConversationService.get\_conversation(  
                app\_model=app\_model, conversation\_id=conversation\_id, user=user  
            )  
        # 获取应用模型配置  
        app\_model\_config = self.\_get\_app\_model\_config(app\_model=app\_model, conversation=conversation)  
  
        # 验证覆盖模型配置  
        # ...  
  
        # 解析文件  
        # ...  
  
        # 转换为应用配置  
        # ...  
  
        # 初始化应用生成实体  
        application\_generate\_entity = ChatAppGenerateEntity(  
            task\_id=str(uuid.uuid4()),  
            app\_config=app\_config,  
            model\_conf=ModelConfigConverter.convert(app\_config),  
            file\_upload\_config=file\_extra\_config,  
            conversation\_id=conversation.id if conversation elseNone,  
            inputs=self.\_prepare\_user\_inputs(  
                user\_inputs=inputs, variables=app\_config.variables, tenant\_id=app\_model.tenant\_id  
            ),  
            query=query,  
            files=list(file\_objs),  
            parent\_message\_id=args.get("parent\_message\_id") if invoke\_from != InvokeFrom.SERVICE\_API else UUID\_NIL,  
            user\_id=user.id,  
            invoke\_from=invoke\_from,  
            extras=extras,  
            trace\_manager=trace\_manager,  
            stream=streaming,  
        )  
  
        # 初始化生成记录  
        (conversation, message) = self.\_init\_generate\_records(application\_generate\_entity, conversation)  
  
        # 初始化队列管理器  
        queue\_manager = MessageBasedAppQueueManager(  
            task\_id=application\_generate\_entity.task\_id,  
            user\_id=application\_generate\_entity.user\_id,  
            invoke\_from=application\_generate\_entity.invoke\_from,  
            conversation\_id=conversation.id,  
            app\_mode=conversation.mode,  
            message\_id=message.id,  
        )  
  
        # 在新线程中执行生成任务  
        worker\_thread = threading.Thread(target=worker\_with\_context)  
        worker\_thread.start()  
  
        # 返回响应或流生成器  
        response = self.\_handle\_response(  
            application\_generate\_entity=application\_generate\_entity,  
            queue\_manager=queue\_manager,  
            conversation=conversation,  
            message=message,  
            user=user,  
            stream=streaming,  
        )  
  
        return ChatAppGenerateResponseConverter.convert(response=response, invoke\_from=invoke\_from)  

5.3 Conversation 管理流程

以下流程图详细展示了 Conversation 的管理过程:

picture.image

5.4 Conversation 管理源代码实现

Conversation 管理的核心实现在ConversationService类中:

  
# api/services/conversation\_service.py  
class ConversationService:  
    @classmethod  
    def get\_conversation(cls, app\_model: App, conversation\_id: str, user: Optional[Union[Account, EndUser]]):  
        conversation = (  
            db.session.query(Conversation)  
            .filter(  
                Conversation.id == conversation\_id,  
                Conversation.app\_id == app\_model.id,  
                Conversation.from\_source == ("api"if isinstance(user, EndUser) else"console"),  
                Conversation.from\_end\_user\_id == (user.id if isinstance(user, EndUser) elseNone),  
                Conversation.from\_account\_id == (user.id if isinstance(user, Account) elseNone),  
                Conversation.is\_deleted == False,  
            )  
            .first()  
        )  
  
        ifnot conversation:  
            raise ConversationNotExistsError()  
  
        return conversation  
      
    @classmethod  
    def rename(cls, app\_model: App, conversation\_id: str, user: Optional[Union[Account, EndUser]], name: str, auto\_generate: bool):  
        conversation = cls.get\_conversation(app\_model, conversation\_id, user)  
  
        if auto\_generate:  
            return cls.auto\_generate\_name(app\_model, conversation)  
        else:  
            conversation.name = name  
            conversation.updated\_at = datetime.now(UTC).
   
 replace
 (tzinfo=None)  
            db.session.commit()  
  
        return conversation  
      
    @classmethod  
    def auto\_generate\_name(cls, app\_model: App, conversation: Conversation):  
        # 获取对话的第一条消息  
        message = (  
            db.session.query(Message)  
            .filter(Message.app\_id == app\_model.id, Message.conversation\_id == conversation.id)  
            .order\_by(Message.created\_at.asc())  
            .first()  
        )  
  
        ifnot message:  
            raise MessageNotExistsError()  
  
        # 生成对话名称  
        try:  
            name = LLMGenerator.generate\_conversation\_name(  
                app\_model.tenant\_id, message.query, conversation.id, app\_model.id  
            )  
            conversation.name = name  
        
 
 except
 :  
            pass  
  
        db.session.commit()  
  
        return conversation  
      
    @classmethod  
    def delete(cls, app\_model: App, conversation\_id: str, user: Optional[Union[Account, EndUser]]):  
        conversation = cls.get\_conversation(app\_model, conversation\_id, user)  
  
        conversation.is\_deleted = True  
        conversation.updated\_at = datetime.now(UTC).
   
 replace
 (tzinfo=None)  
        db.session.commit()  

在消息生成过程中,Conversation 的创建和更新主要在 ChatAppGenerator 类的 \_init\_generate\_records 方法中实现:

  
# api/core/app/apps/chat/app\_generator.py (简化示例)  
def \_init\_generate\_records(self, application\_generate\_entity: ChatAppGenerateEntity, conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:  
    # 获取或创建对话  
    ifnot conversation:  
        # 创建新对话  
        conversation = Conversation(  
            id=str(uuid.uuid4()),  
            app\_id=application\_generate\_entity.app\_config.app\_id,  
            app\_model\_config\_id=application\_generate\_entity.app\_config.app\_model\_config\_id,  
            model\_provider=application\_generate\_entity.model\_conf.model\_provider,  
            model\_id=application\_generate\_entity.model\_conf.model\_id,  
            mode=application\_generate\_entity.app\_config.mode,  
            name="",  # 初始名称为空  
            inputs=application\_generate\_entity.inputs,  
            status="created",  
            from\_source=("api"if application\_generate\_entity.invoke\_from == InvokeFrom.API else"console"),  
            from\_end\_user\_id=(application\_generate\_entity.user\_id if application\_generate\_entity.invoke\_from == InvokeFrom.API elseNone),  
            from\_account\_id=(application\_generate\_entity.user\_id if application\_generate\_entity.invoke\_from != InvokeFrom.API elseNone),  
            invoke\_from=application\_generate\_entity.invoke\_from.value,  
        )  
        db.session.add(conversation)  
    else:  
        # 更新现有对话  
        conversation.status = "processing"  
        conversation.updated\_at = datetime.now(UTC).replace(tzinfo=None)  
      
    # 创建消息记录  
    message = Message(  
        id=str(uuid.uuid4()),  
        conversation\_id=conversation.id,  
        app\_id=application\_generate\_entity.app\_config.app\_id,  
        model\_provider=application\_generate\_entity.model\_conf.model\_provider,  
        model\_id=application\_generate\_entity.model\_conf.model\_id,  
        query=application\_generate\_entity.query,  
        inputs=application\_generate\_entity.inputs,  
        message\_files=[],  
        answer="",  
        parent\_message\_id=application\_generate\_entity.parent\_message\_id,  
        user\_id=application\_generate\_entity.user\_id,  
        from\_source=("api"if application\_generate\_entity.invoke\_from == InvokeFrom.API else"console"),  
        from\_end\_user\_id=(application\_generate\_entity.user\_id if application\_generate\_entity.invoke\_from == InvokeFrom.API elseNone),  
        from\_account\_id=(application\_generate\_entity.user\_id if application\_generate\_entity.invoke\_from != InvokeFrom.API elseNone),  
    )  
    db.session.add(message)  
    db.session.commit()  
      
    return conversation, message  

对话变量的管理在消息处理完成后进行:

  
# api/core/app/apps/chat/app\_runner.py (简化示例)  
def \_handle\_variables(self, conversation: Conversation, variables: dict):  
    # 处理对话变量  
    for var\_name, var\_value in variables.items():  
        # 检查变量是否已存在  
        variable = (  
            db.session.query(ConversationVariable)  
            .filter(  
                ConversationVariable.conversation\_id == conversation.id,  
                ConversationVariable.name == var\_name,  
            )  
            .first()  
        )  
          
        if variable:  
            # 更新现有变量  
            variable.value = var\_value  
        else:  
            # 创建新变量  
            variable = ConversationVariable(  
                id=str(uuid.uuid4()),  
                conversation\_id=conversation.id,  
                app\_id=conversation.app\_id,  
                name=var\_name,  
                value=var\_value,  
            )  
            db.session.add(variable)  
      
    db.session.commit()  

六、对话状态图

以下状态图展示了 Dify 聊天助手应用中对话的状态变化:

picture.image

6.1 对话状态源代码实现

对话状态在数据库模型中的定义:

  
# api/models/model.py  
class Conversation(Base):  
    \_\_tablename\_\_ = "conversations"  
    # ...  
    status = db.Column(db.
   
 String
 (255), nullable=False)  
    # ...  

对话状态的初始化在\_init\_generate\_records方法中实现:

  
# api/core/app/apps/message\_based\_app\_generator.py  
def \_init\_generate\_records(self, application\_generate\_entity: Union[ChatAppGenerateEntity, ...], conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:  
    # ...  
    ifnot conversation:  
        conversation = Conversation(  
            # ...  
            status="normal",  # 初始状态为normal  
            # ...  
        )  
        db.session.add(conversation)  
        db.session.commit()  
        db.session.refresh(conversation)  
    else:  
        conversation.updated\_at = datetime.now(UTC).replace(tzinfo=None)  
        db.session.commit()  
    # ...  

在 ChatAppGenerator 中,对话状态在处理消息前更新为 "processing":

  
# api/core/app/apps/chat/app\_generator.py  
def \_init\_generate\_records(self, application\_generate\_entity: ChatAppGenerateEntity, conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:  
    # 获取或创建对话  
    ifnot conversation:  
        # 创建新对话  
        conversation = Conversation(  
            # ...  
            status="created",  
            # ...  
        )  
        db.session.add(conversation)  
    else:  
        # 更新现有对话  
        conversation.status = "processing"  
        conversation.updated\_at = datetime.now(UTC).replace(tzinfo=None)  
    # ...  

在消息处理完成后,对话状态更新为 "completed":

  
# api/core/app/apps/chat/app\_runner.py  
def run(self, application\_generate\_entity: ChatAppGenerateEntity, queue\_manager: AppQueueManager, conversation: Conversation, message: Message) -> None:  
    # ...  
    # 处理完成后更新对话状态  
    conversation.status = "completed"  
    db.session.commit()  

如果处理过程中出现错误,对话状态会更新为 "

error ":

  
# 错误处理  
try:  
    # 处理消息  
    # ...  
except Exception as e:  
    # 更新对话状态为错误  
    conversation.status = "error"  
    db.session.commit()  
    # 记录错误日志  
    logger.exception("Error when processing message")  

七、消息处理时序图

以下时序图展示了用户与 Dify 聊天助手应用交互的详细过程:

picture.image

7.1 消息处理时序源代码实现

消息处理的时序在代码中主要体现在以下几个关键部分:

  1. 控制器层接收请求并调用服务层:
  
# api/controllers/web/completion.py  
class ChatApi(WebApiResource):  
    def post(self, app\_model, end\_user):  
        # 解析请求参数  
        parser = reqparse.RequestParser()  
        # ...  
        args = parser.parse\_args()  
          
        try:  
            # 调用AppGenerateService生成回复  
            
   
 response
  = AppGenerateService.generate(  
                app\_model=app\_model, user=end\_user, args=args, invoke\_from=InvokeFrom.WEB\_APP, streaming=streaming  
            )  
            return helper.compact\_generate\_response(
   
 response
 )  
        
 
 except
  services.errors.conversation.ConversationNotExistsError:  
            raise NotFound("Conversation Not Exists.")  
        # 异常处理...  

  1. 服务层根据应用模式选择不同的生成器:
  
# api/services/app\_generate\_service.py  
class AppGenerateService:  
    @classmethod  
    def generate(cls, app\_model: App, user: Union[Account, EndUser], args: Mapping[str, Any], invoke\_from: InvokeFrom, streaming: bool = True):  
        # 限流检查  
        # ...  
          
        # 根据应用模式选择不同的生成器  
        if app\_model.mode == AppMode.CHAT.value:  
            return rate\_limit.generate(  
                ChatAppGenerator.convert\_to\_event\_stream(  
                    ChatAppGenerator().generate(  
                        app\_model=app\_model, user=user, args=args, invoke\_from=invoke\_from, streaming=streaming  
                    ),  
                ),  
                request\_id=request\_id,  
            )  
        # 其他模式...  

  1. 生成器初始化对话和消息记录:
  
# api/core/app/apps/chat/app\_generator.py  
def generate(self, app\_model: App, user: Union[Account, EndUser], args: Mapping[str, Any], invoke\_from: InvokeFrom, streaming: bool = True):  
    # 获取或创建对话  
    conversation = None  
    conversation\_id = args.get("conversation\_id")  
    if conversation\_id:  
        conversation = ConversationService.get\_conversation(  
            app\_model=app\_model, conversation\_id=conversation\_id, user=user  
        )  
      
    # 初始化应用生成实体  
    application\_generate\_entity = ChatAppGenerateEntity(  
        # 参数设置  
        # ...  
    )  
      
    # 初始化生成记录  
    (conversation, message) = self.\_init\_generate\_records(application\_generate\_entity, conversation)  
      
    # 在新线程中执行生成任务  
    worker\_thread = threading.Thread(target=worker\_with\_context)  
    worker\_thread.start()  
      
    # 返回响应或流生成器  
    response = self.\_handle\_response(  
        application\_generate\_entity=application\_generate\_entity,  
        queue\_manager=queue\_manager,  
        conversation=conversation,  
        message=message,  
        user=user,  
        stream=streaming,  
    )  
      
    return ChatAppGenerateResponseConverter.convert(response=response, invoke\_from=invoke\_from)  

  1. 工作线程中执行实际的生成任务:
  
# api/core/app/apps/chat/app\_generator.py  
def \_generate\_worker(self, flask\_app: Flask, application\_generate\_entity: ChatAppGenerateEntity, queue\_manager: AppQueueManager, conversation\_id: str, message\_id: str) -> None:  
    with flask\_app.app\_context():  
        try:  
            # 获取对话和消息  
            conversation = self.\_get\_conversation(conversation\_id)  
            message = self.\_get\_message(message\_id)  
            if message isNone:  
                raise MessageNotExistsError("Message not exists")  
  
            # 运行聊天应用  
            runner = ChatAppRunner()  
            runner.run(  
                application\_generate\_entity=application\_generate\_entity,  
                queue\_manager=queue\_manager,  
                conversation=conversation,  
                message=message,  
            )  
        except GenerateTaskStoppedError:  
            pass  
        # 异常处理...  
        finally:  
            db.session.close()  

  1. 消息处理完成后更新状态:
  
# api/core/app/apps/chat/app\_runner.py (简化示例)  
def run(self, application\_generate\_entity, queue\_manager, conversation, message):  
    # 执行RAG检索(如果启用)  
    # 准备模型输入  
    # 调用LLM生成回复  
    # 保存回复到消息  
      
    # 更新对话状态  
    conversation.status = "completed"  
    conversation.updated\_at = datetime.now(UTC).replace(tzinfo=None)  
    db.session.commit()  
      
    # 处理对话变量(如果有)  
    if new\_variables:  
        for var\_name, var\_value in new\_variables.items():  
            # 创建或更新对话变量  
            # ...  

八、数据生命周期

为了更好地理解 Dify 系统中数据的完整生命周期,展示主要数据实体在系统中的创建、使用、更新和删除过程。

8.1 App 数据生命周期

以下图表展示了 App 实体的完整生命周期,以及与其他数据模型的关联:

picture.image

8.2 Conversation 数据生命周期

以下图表展示了 Conversation 实体的完整生命周期,以及与 Message 和 ConversationVariable 的关联:

picture.image

九、总结

Dify 提供了一个全面的 LLM 应用开发平台,通过直观的界面和强大的后端支持,使开发者能够快速构建和部署AI应用。

Dify 聊天应用机制以“会话-消息”为核心,结合异步队列、流式返回、丰富的扩展表设计,实现了高效、可扩展的智能对话服务。其与认证、插件、知识库等模块协作,形成了强大的智能应用生态。

参考资料

https://github.com/langgenius/dify (v1.4.1)

十、推荐阅读

👆👆👆欢迎关注,一起进步👆👆👆

欢迎留言讨论哈

🧐点赞、分享、推荐 ,一键三连,养成习惯👍

0
0
0
0
评论
未登录
暂无评论