Dify 是一个开源的 LLM 应用开发平台,提供了直观的界面,结合了 AI 工作流、RAG、Agent、模型管理、可观测性功能等,让用户可以快速从原型到生产。
- 从零开始学 Dify
- 从零开始学 Dify-系统架构
- 从零开始学 Dify- 帐户与租户管理系统设计揭秘
- 从零开始学 Dify- 模型提供者系统设计实现模型统一调用接口
- 从零开始学 Dify- RAG 知识库系统设计详解
- 从零开始学 Dify- 对话系统的关键功能
- 从零开始学 Dify- 工作流(Workflow)系统架构
- 从零开始学 Dify-扫描、加载和管理模型提供者的详细过程
- 从零开始学 Dify-详细介绍 Dify 模型运行时的核心架构
- 从零开始学 Dify - Dify 的 RAG 系统如何有效地处理和检索大量文档?
- 从零开始学 Dify - 详细介绍 Dify 工具(Tool)系统核心架构设计
- 从零开始学 Dify - 万字详解RAG父子分段模式的原理与实现
- 从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误
本文将深入分析 Dify 聊天助手应用的实现机制。
👆👆👆欢迎关注,一起进步👆👆👆
一、聊天助手应用创建流程图
以下流程图展示了在 Dify 中创建聊天助手应用的过程:
二、应用创建代码分析
2.1 控制器层
应用创建流程从控制器层开始,主要在api/controllers/console/app/app.py文件中的AppListApi.post方法:
- 接收前端请求,解析参数:
name、description、mode、icon\_type、icon和icon\_background - 验证用户权限(要求用户为
editor角色) - 验证必要参数,特别是
mode参数(可选值:chat、agent-chat、advanced-chat、workflow和completion) - 调用
AppService.create\_app方法创建应用 - 返回应用信息和201状态码
2.2 服务层
服务层的实现在api/services/app\_service.py文件中的create\_app方法:
- 根据应用模式(
AppMode)从default\_app\_templates获取默认应用模板 - 处理默认模型实例的获取和设置
- 创建
App实例并设置其属性(名称、描述、模式、图标、租户ID等) - 创建
AppModelConfig实例并设置其属性 - 将应用和关联的模型配置保存到数据库
- 触发
app\_was\_created事件 - 返回创建的应用实例
2.3 模型定义
应用创建涉及的主要数据模型定义在api/models/model.py文件中:
App模型:应用的核心数据结构,包含应用的基本信息和配置AppModelConfig模型:应用的模型配置,包含模型提供商、模型ID、参数等InstalledApp模型:记录应用的安装信息,包含租户ID、应用ID、位置等Site模型:记录应用的站点信息,包含标题、图标、默认语言等
2.4 默认应用模板
默认应用模板定义在`api/
constants
/model_template.py文件中的default_app_templates`字典:
- 为不同的应用模式(
WORKFLOW、COMPLETION、CHAT、ADVANCED\_CHAT、AGENT\_CHAT)定义默认配置 - 包含
enable\_site和enable\_api等默认设置
2.5 事件处理
应用创建后触发的事件处理在以下文件中实现:
api/events/event\_handlers/create\_installed\_app\_when\_app\_created.py:创建InstalledApp记录api/events/event\_handlers/create\_site\_record\_when\_app\_created.py:创建Site记录
这些事件处理器通过app\_was\_created信号连接到应用创建过程,确保在应用创建后自动创建相关记录。
三、系统数据流图
接下来将分析 Dify 项目中“聊天应用”的实现机制,涵盖消息的收发、会话管理、消息流式/阻塞返回、上下文与多轮对话、文件与反馈等核心能力。该机制是 Dify 支持 AI 聊天、客服、知识问答等场景的基础,直接影响用户体验、系统扩展性与业务创新能力。
3.1 以"会话-消息"为核心的流程图
以下是 Dify 系统中主要数据流的示意图:
3.2 相关源码定位
|
角色/功能
|
主要模块/文件
|
主要职责说明
|
| --- | --- | --- |
|
API 入口
| api/controllers/service\_api/app/conversation.py |
聊天消息的 API 接口,参数解析、鉴权、调用生成逻辑
|
|
消息生成主流程
| api/core/app/apps/chat/app\_generator.py |
聊天消息生成主流程,队列与 worker 管理
|
|
消息队列管理
| api/core/app/apps/message\_based\_app\_queue\_manager.py |
消息事件推送、流式/阻塞返回
|
|
会话与消息服务
| services/conversation\_service.py |
会话、消息的增删查改,历史消息查询
|
|
数据模型
| models/model.py
、
models/account.py
|
Conversation、Message、Account 等 ORM 定义
|
|
数据库扩展
| extensions/ext\_database.py |
数据库会话管理
|
|
前端接口文档
| web/app/components/develop/template/template\_chat.zh.mdx |
API 说明、参数与返回格式
|
3.3 代码示例
3.3.1 API 入口
文件:api/controllers/service\_api/app/conversation.py
class ChatMessageApi(Resource):
def post(self):
args = request.get\_json()
user = get\_user\_from\_token(request)
result = ChatAppGenerator().generate(app\_model, user, args, invoke\_from=InvokeFrom.SERVICE\_API)
return result
- 解析请求参数,鉴权,调用核心生成逻辑。
3.3.2 消息生成主流程
文件:api/core/app/apps/chat/app\_generator.py
class ChatAppGenerator(MessageBasedAppGenerator):
def generate(self, app\_model, user, args, invoke\_from, streaming=True):
# 参数校验
query = args["query"]
# 获取/新建会话
conversation = ConversationService.get\_conversation(app\_model, args.get("conversation\_id"), user)
# 创建消息
message = Message(...)
db.session.add(message)
db.session.commit()
# 初始化队列与worker
queue\_manager = MessageBasedAppQueueManager(...)
worker\_thread = threading.Thread(target=worker\_with\_context)
worker\_thread.start()
# 返回流式/阻塞响应
return self.\_handle\_response(...)
- 负责会话与消息的创建、队列与 worker 的初始化、响应的返回。
3.3.3 Worker 线程与消息推送
def \_generate\_worker(self, flask\_app, application\_generate\_entity, queue\_manager, conversation\_id, message\_id):
with flask\_app.app\_context():
conversation = self.\_get\_conversation(conversation\_id)
message = self.\_get\_message(message\_id)
runner = ChatAppRunner()
runner.run(
application\_generate\_entity=application\_generate\_entity,
queue\_manager=queue\_manager,
conversation=conversation,
message=message,
)
- Worker 线程负责实际的 LLM 调用与消息推送,支持异步处理。
3.3.4 消息队列与流式返回
文件:api/core/app/apps/message\_based\_app\_queue\_manager.py
- 负责消息事件的推送,支持 SSE(Server-Sent Events)流式返回,提升用户体验。
四、数据模型设计 (ER图)
以下是Dify聊天助手应用的核心数据模型关系图:
4.1 数据模型详细解释
4.1.1 核心实体说明
- Tenant(租户)
-
id:租户唯一标识符 -
name:租户名称 -
plan:订阅计划类型 -
created\_at:创建时间 -
代表使用 Dify 平台的组织或团队
-
每个租户可以拥有多个账户(Account)
-
关键字段:
- Account(账户)
-
id:账户唯一标识符 -
name:用户名 -
email:电子邮箱,用于登录 -
password:加密存储的密码 -
status:账户状态(如活跃、禁用等) -
last\_login\_at:最后登录时间 -
代表可以登录和使用 Dify 平台的用户
-
每个账户属于一个租户,可以创建和管理多个应用(App)
-
关键字段:
- App(应用)
-
id:应用唯一标识符 -
tenant\_id:所属租户ID -
name:应用名称 -
description:应用描述 -
mode:应用模式(如聊天模式、完成模式等) -
app\_model\_config\_id:关联的模型配置ID -
workflow\_id:关联的工作流ID -
enable\_site:是否启用网站访问 -
enable\_api:是否启用API访问 -
created\_at:创建时间 -
代表在 Dify 平台上创建的 AI 应用,如聊天助手
-
每个应用属于一个租户,由账户创建和管理
-
关键字段:
- AppModelConfig(应用模型配置)
-
id:配置唯一标识符 -
app\_id:关联的应用ID -
provider:模型提供商(如OpenAI、Anthropic等) -
model\_id:使用的模型ID -
configs:模型参数配置(JSON格式) -
pre\_prompt:系统提示词 -
opening\_statement:对话开场白 -
agent\_mode:Agent模式设置 -
定义应用使用的 AI 模型和相关配置
-
每个应用有一个对应的模型配置
-
关键字段:
- Conversation(对话)
-
id:对话唯一标识符 -
app\_id:关联的应用ID -
name:对话名称 -
inputs:对话初始输入 -
from\_end\_user\_id:发起对话的终端用户ID -
from\_account\_id:发起对话的账户ID -
status:对话状态 -
created\_at:创建时间 -
updated\_at:更新时间 -
代表用户与应用之间的一次完整对话
-
每个对话属于一个应用,包含多条消息
-
关键字段:
- Message(消息)
-
id:消息唯一标识符 -
app\_id:关联的应用ID -
conversation\_id:关联的对话ID -
query:用户查询内容 -
message:完整消息内容(JSON格式) -
answer:系统回复内容 -
status:消息处理状态 -
from\_source:消息来源 -
created\_at:创建时间 -
代表对话中的一条消息,包括用户输入和系统回复
-
每条消息属于一个对话
-
关键字段:
- EndUser(终端用户)
-
id:用户唯一标识符 -
session\_id:会话ID -
type:用户类型 -
created\_at:创建时间 -
代表使用应用的最终用户
-
一个终端用户可以发起多个对话
-
关键字段:
- Workflow(工作流)
-
id:工作流唯一标识符 -
tenant\_id:所属租户ID -
app\_id:关联的应用ID -
type:工作流类型 -
version:工作流版本 -
graph:工作流图定义(文本格式) -
定义应用的处理流程
-
每个工作流关联到一个应用
-
关键字段:
- ConversationVariable(对话变量)
-
id:变量唯一标识符 -
conversation\_id:关联的对话ID -
key:变量名 -
value:变量值 -
存储对话过程中产生的变量
-
每个变量属于一个对话
-
关键字段:
4.1.2 实体关系说明
- Tenant 与 Account 关系
- 一对多关系:一个租户可以包含多个账户
- 关系描述:"包含"
- 实现方式:Account 表中的 tenant_id 外键关联到 Tenant 表的 id
- Account 与 App 关系
- 一对多关系:一个账户可以创建和管理多个应用
- 关系描述:"创建/管理"
- 实现方式:通过中间表或权限表实现
- App 与 AppModelConfig 关系
- 一对一关系:一个应用有一个对应的模型配置
- 关系描述:"配置"
- 实现方式:App 表中的 app_model_config_id 外键关联到 AppModelConfig 表的 id,同时 AppModelConfig 表中的 app_id 外键关联到 App 表的 id
- App 与 Conversation 关系
- 一对多关系:一个应用可以包含多个对话
- 关系描述:"包含"
- 实现方式:Conversation 表中的 app_id 外键关联到 App 表的 id
- Conversation 与 Message 关系
- 一对多关系:一个对话包含多条消息
- 关系描述:"包含"
- 实现方式:Message 表中的 conversation_id 外键关联到 Conversation 表的 id
- Conversation 与 ConversationVariable 关系
- 一对多关系:一个对话可以包含多个变量
- 关系描述:"包含"
- 实现方式:ConversationVariable 表中的 conversation_id 外键关联到 Conversation 表的 id
- App 与 Workflow 关系
- 一对多关系:一个应用可以关联多个工作流(不同版本)
- 关系描述:"关联"
- 实现方式:Workflow 表中的 app_id 外键关联到 App 表的 id,同时 App 表中的 workflow_id 外键关联到当前使用的 Workflow 表的 id
- EndUser 与 Conversation 关系
- 一对多关系:一个终端用户可以发起多个对话
- 关系描述:"发起"
- 实现方式:Conversation 表中的 from_end_user_id 外键关联到 EndUser 表的 id
4.1.3 数据模型设计特点
- 多租户架构 :通过 Tenant 实体实现多租户隔离,每个租户拥有自己的账户、应用和数据
- 灵活的应用配置 :通过 AppModelConfig 实现对不同模型和参数的灵活配置
- 完整的对话历史 :通过 Conversation 和 Message 实体存储完整的对话历史和上下文
- 工作流支持 :通过 Workflow 实体支持复杂的 AI 处理流程定义
- 变量存储 :通过 ConversationVariable 支持对话过程中的变量存储和使用
- 用户区分 :区分平台用户(Account)和应用终端用户(EndUser)
五、关键流程
5.1 消息处理流程图
以下流程图展示了 Dify 聊天助手应用处理用户消息的过程:
5.2 消息处理源代码实现
消息处理的入口点在控制器层,主要由ChatApi类处理:
# api/controllers/web/completion.py
class ChatApi(WebApiResource):
def post(self, app\_model, end\_user):
app\_mode = AppMode.value\_of(app\_model.mode)
if app\_mode notin {AppMode.CHAT, AppMode.AGENT\_CHAT, AppMode.ADVANCED\_CHAT}:
raise NotChatAppError()
parser = reqparse.RequestParser()
parser.add\_argument("inputs", type=dict, required=True,
location
="json")
parser.add\_argument("query", type=str, required=True,
location
="json")
parser.add\_argument("files", type=list, required=False,
location
="json")
parser.add\_argument("response\_mode", type=str, choices=["blocking", "streaming"],
location
="json")
parser.add\_argument("conversation\_id", type=uuid\_value,
location
="json")
parser.add\_argument("parent\_message\_id", type=uuid\_value, required=False, location="json")
parser.add\_argument("retriever\_from", type=str, required=False, default="web\_app", location="json")
args = parser.parse\_args()
streaming = args["response\_mode"] == "streaming"
args["auto\_generate\_name"] = False
try:
response
= AppGenerateService.generate(
app\_model=app\_model, user=end\_user, args=args, invoke\_from=InvokeFrom.WEB\_APP, streaming=streaming
)
return helper.compact\_generate\_response(
response
)
except
services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
# 异常处理...
消息生成的核心服务在AppGenerateService类中:
# api/services/app\_generate\_service.py
class AppGenerateService:
@classmethod
def generate(
cls,
app\_model: App,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke\_from: InvokeFrom,
streaming: bool = True,
):
# 系统级别限流检查
if dify\_config.BILLING\_ENABLED:
# 检查是否是免费计划
# ...
# 应用级别限流
max\_active\_request = AppGenerateService.\_get\_max\_active\_requests(app\_model)
rate\_limit = RateLimit(app\_model.id, max\_active\_request)
request\_id = RateLimit.gen\_request\_key()
try:
request\_id = rate\_limit.enter(request\_id)
# 根据应用模式选择不同的生成器
if app\_model.mode == AppMode.COMPLETION.value:
return rate\_limit.generate(
CompletionAppGenerator.convert\_to\_event\_stream(
CompletionAppGenerator().generate(
app\_model=app\_model, user=user, args=args, invoke\_from=invoke\_from, streaming=streaming
),
),
request\_id=request\_id,
)
elif app\_model.mode == AppMode.AGENT\_CHAT.value or app\_model.is\_agent:
# Agent聊天模式
# ...
elif app\_model.mode == AppMode.CHAT.value:
# 普通聊天模式
# ...
elif app\_model.mode == AppMode.ADVANCED\_CHAT.value:
# 高级聊天模式
# ...
# ...
except RateLimitError as e:
raise InvokeRateLimitError(str(e))
# ...
具体的消息生成逻辑在各个 AppGenerator 类中实现,以 ChatAppGenerator 为例:
# api/core/app/apps/chat/app\_generator.py
class ChatAppGenerator(MessageBasedAppGenerator):
def generate(
self,
app\_model: App,
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke\_from: InvokeFrom,
streaming: bool = True,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
# 验证查询参数
ifnot args.get("query"):
raise ValueError("query is required")
query = args["query"]
inputs = args["inputs"]
extras = {"auto\_generate\_conversation\_name": args.get("auto\_generate\_name", True)}
# 获取或创建对话
conversation = None
conversation\_id = args.get("conversation\_id")
if conversation\_id:
conversation = ConversationService.get\_conversation(
app\_model=app\_model, conversation\_id=conversation\_id, user=user
)
# 获取应用模型配置
app\_model\_config = self.\_get\_app\_model\_config(app\_model=app\_model, conversation=conversation)
# 验证覆盖模型配置
# ...
# 解析文件
# ...
# 转换为应用配置
# ...
# 初始化应用生成实体
application\_generate\_entity = ChatAppGenerateEntity(
task\_id=str(uuid.uuid4()),
app\_config=app\_config,
model\_conf=ModelConfigConverter.convert(app\_config),
file\_upload\_config=file\_extra\_config,
conversation\_id=conversation.id if conversation elseNone,
inputs=self.\_prepare\_user\_inputs(
user\_inputs=inputs, variables=app\_config.variables, tenant\_id=app\_model.tenant\_id
),
query=query,
files=list(file\_objs),
parent\_message\_id=args.get("parent\_message\_id") if invoke\_from != InvokeFrom.SERVICE\_API else UUID\_NIL,
user\_id=user.id,
invoke\_from=invoke\_from,
extras=extras,
trace\_manager=trace\_manager,
stream=streaming,
)
# 初始化生成记录
(conversation, message) = self.\_init\_generate\_records(application\_generate\_entity, conversation)
# 初始化队列管理器
queue\_manager = MessageBasedAppQueueManager(
task\_id=application\_generate\_entity.task\_id,
user\_id=application\_generate\_entity.user\_id,
invoke\_from=application\_generate\_entity.invoke\_from,
conversation\_id=conversation.id,
app\_mode=conversation.mode,
message\_id=message.id,
)
# 在新线程中执行生成任务
worker\_thread = threading.Thread(target=worker\_with\_context)
worker\_thread.start()
# 返回响应或流生成器
response = self.\_handle\_response(
application\_generate\_entity=application\_generate\_entity,
queue\_manager=queue\_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
return ChatAppGenerateResponseConverter.convert(response=response, invoke\_from=invoke\_from)
5.3 Conversation 管理流程
以下流程图详细展示了 Conversation 的管理过程:
5.4 Conversation 管理源代码实现
Conversation 管理的核心实现在ConversationService类中:
# api/services/conversation\_service.py
class ConversationService:
@classmethod
def get\_conversation(cls, app\_model: App, conversation\_id: str, user: Optional[Union[Account, EndUser]]):
conversation = (
db.session.query(Conversation)
.filter(
Conversation.id == conversation\_id,
Conversation.app\_id == app\_model.id,
Conversation.from\_source == ("api"if isinstance(user, EndUser) else"console"),
Conversation.from\_end\_user\_id == (user.id if isinstance(user, EndUser) elseNone),
Conversation.from\_account\_id == (user.id if isinstance(user, Account) elseNone),
Conversation.is\_deleted == False,
)
.first()
)
ifnot conversation:
raise ConversationNotExistsError()
return conversation
@classmethod
def rename(cls, app\_model: App, conversation\_id: str, user: Optional[Union[Account, EndUser]], name: str, auto\_generate: bool):
conversation = cls.get\_conversation(app\_model, conversation\_id, user)
if auto\_generate:
return cls.auto\_generate\_name(app\_model, conversation)
else:
conversation.name = name
conversation.updated\_at = datetime.now(UTC).
replace
(tzinfo=None)
db.session.commit()
return conversation
@classmethod
def auto\_generate\_name(cls, app\_model: App, conversation: Conversation):
# 获取对话的第一条消息
message = (
db.session.query(Message)
.filter(Message.app\_id == app\_model.id, Message.conversation\_id == conversation.id)
.order\_by(Message.created\_at.asc())
.first()
)
ifnot message:
raise MessageNotExistsError()
# 生成对话名称
try:
name = LLMGenerator.generate\_conversation\_name(
app\_model.tenant\_id, message.query, conversation.id, app\_model.id
)
conversation.name = name
except
:
pass
db.session.commit()
return conversation
@classmethod
def delete(cls, app\_model: App, conversation\_id: str, user: Optional[Union[Account, EndUser]]):
conversation = cls.get\_conversation(app\_model, conversation\_id, user)
conversation.is\_deleted = True
conversation.updated\_at = datetime.now(UTC).
replace
(tzinfo=None)
db.session.commit()
在消息生成过程中,Conversation 的创建和更新主要在 ChatAppGenerator 类的 \_init\_generate\_records 方法中实现:
# api/core/app/apps/chat/app\_generator.py (简化示例)
def \_init\_generate\_records(self, application\_generate\_entity: ChatAppGenerateEntity, conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:
# 获取或创建对话
ifnot conversation:
# 创建新对话
conversation = Conversation(
id=str(uuid.uuid4()),
app\_id=application\_generate\_entity.app\_config.app\_id,
app\_model\_config\_id=application\_generate\_entity.app\_config.app\_model\_config\_id,
model\_provider=application\_generate\_entity.model\_conf.model\_provider,
model\_id=application\_generate\_entity.model\_conf.model\_id,
mode=application\_generate\_entity.app\_config.mode,
name="", # 初始名称为空
inputs=application\_generate\_entity.inputs,
status="created",
from\_source=("api"if application\_generate\_entity.invoke\_from == InvokeFrom.API else"console"),
from\_end\_user\_id=(application\_generate\_entity.user\_id if application\_generate\_entity.invoke\_from == InvokeFrom.API elseNone),
from\_account\_id=(application\_generate\_entity.user\_id if application\_generate\_entity.invoke\_from != InvokeFrom.API elseNone),
invoke\_from=application\_generate\_entity.invoke\_from.value,
)
db.session.add(conversation)
else:
# 更新现有对话
conversation.status = "processing"
conversation.updated\_at = datetime.now(UTC).replace(tzinfo=None)
# 创建消息记录
message = Message(
id=str(uuid.uuid4()),
conversation\_id=conversation.id,
app\_id=application\_generate\_entity.app\_config.app\_id,
model\_provider=application\_generate\_entity.model\_conf.model\_provider,
model\_id=application\_generate\_entity.model\_conf.model\_id,
query=application\_generate\_entity.query,
inputs=application\_generate\_entity.inputs,
message\_files=[],
answer="",
parent\_message\_id=application\_generate\_entity.parent\_message\_id,
user\_id=application\_generate\_entity.user\_id,
from\_source=("api"if application\_generate\_entity.invoke\_from == InvokeFrom.API else"console"),
from\_end\_user\_id=(application\_generate\_entity.user\_id if application\_generate\_entity.invoke\_from == InvokeFrom.API elseNone),
from\_account\_id=(application\_generate\_entity.user\_id if application\_generate\_entity.invoke\_from != InvokeFrom.API elseNone),
)
db.session.add(message)
db.session.commit()
return conversation, message
对话变量的管理在消息处理完成后进行:
# api/core/app/apps/chat/app\_runner.py (简化示例)
def \_handle\_variables(self, conversation: Conversation, variables: dict):
# 处理对话变量
for var\_name, var\_value in variables.items():
# 检查变量是否已存在
variable = (
db.session.query(ConversationVariable)
.filter(
ConversationVariable.conversation\_id == conversation.id,
ConversationVariable.name == var\_name,
)
.first()
)
if variable:
# 更新现有变量
variable.value = var\_value
else:
# 创建新变量
variable = ConversationVariable(
id=str(uuid.uuid4()),
conversation\_id=conversation.id,
app\_id=conversation.app\_id,
name=var\_name,
value=var\_value,
)
db.session.add(variable)
db.session.commit()
六、对话状态图
以下状态图展示了 Dify 聊天助手应用中对话的状态变化:
6.1 对话状态源代码实现
对话状态在数据库模型中的定义:
# api/models/model.py
class Conversation(Base):
\_\_tablename\_\_ = "conversations"
# ...
status = db.Column(db.
String
(255), nullable=False)
# ...
对话状态的初始化在\_init\_generate\_records方法中实现:
# api/core/app/apps/message\_based\_app\_generator.py
def \_init\_generate\_records(self, application\_generate\_entity: Union[ChatAppGenerateEntity, ...], conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:
# ...
ifnot conversation:
conversation = Conversation(
# ...
status="normal", # 初始状态为normal
# ...
)
db.session.add(conversation)
db.session.commit()
db.session.refresh(conversation)
else:
conversation.updated\_at = datetime.now(UTC).replace(tzinfo=None)
db.session.commit()
# ...
在 ChatAppGenerator 中,对话状态在处理消息前更新为 "processing":
# api/core/app/apps/chat/app\_generator.py
def \_init\_generate\_records(self, application\_generate\_entity: ChatAppGenerateEntity, conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:
# 获取或创建对话
ifnot conversation:
# 创建新对话
conversation = Conversation(
# ...
status="created",
# ...
)
db.session.add(conversation)
else:
# 更新现有对话
conversation.status = "processing"
conversation.updated\_at = datetime.now(UTC).replace(tzinfo=None)
# ...
在消息处理完成后,对话状态更新为 "completed":
# api/core/app/apps/chat/app\_runner.py
def run(self, application\_generate\_entity: ChatAppGenerateEntity, queue\_manager: AppQueueManager, conversation: Conversation, message: Message) -> None:
# ...
# 处理完成后更新对话状态
conversation.status = "completed"
db.session.commit()
如果处理过程中出现错误,对话状态会更新为 "
error ":
# 错误处理
try:
# 处理消息
# ...
except Exception as e:
# 更新对话状态为错误
conversation.status = "error"
db.session.commit()
# 记录错误日志
logger.exception("Error when processing message")
七、消息处理时序图
以下时序图展示了用户与 Dify 聊天助手应用交互的详细过程:
7.1 消息处理时序源代码实现
消息处理的时序在代码中主要体现在以下几个关键部分:
- 控制器层接收请求并调用服务层:
# api/controllers/web/completion.py
class ChatApi(WebApiResource):
def post(self, app\_model, end\_user):
# 解析请求参数
parser = reqparse.RequestParser()
# ...
args = parser.parse\_args()
try:
# 调用AppGenerateService生成回复
response
= AppGenerateService.generate(
app\_model=app\_model, user=end\_user, args=args, invoke\_from=InvokeFrom.WEB\_APP, streaming=streaming
)
return helper.compact\_generate\_response(
response
)
except
services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
# 异常处理...
- 服务层根据应用模式选择不同的生成器:
# api/services/app\_generate\_service.py
class AppGenerateService:
@classmethod
def generate(cls, app\_model: App, user: Union[Account, EndUser], args: Mapping[str, Any], invoke\_from: InvokeFrom, streaming: bool = True):
# 限流检查
# ...
# 根据应用模式选择不同的生成器
if app\_model.mode == AppMode.CHAT.value:
return rate\_limit.generate(
ChatAppGenerator.convert\_to\_event\_stream(
ChatAppGenerator().generate(
app\_model=app\_model, user=user, args=args, invoke\_from=invoke\_from, streaming=streaming
),
),
request\_id=request\_id,
)
# 其他模式...
- 生成器初始化对话和消息记录:
# api/core/app/apps/chat/app\_generator.py
def generate(self, app\_model: App, user: Union[Account, EndUser], args: Mapping[str, Any], invoke\_from: InvokeFrom, streaming: bool = True):
# 获取或创建对话
conversation = None
conversation\_id = args.get("conversation\_id")
if conversation\_id:
conversation = ConversationService.get\_conversation(
app\_model=app\_model, conversation\_id=conversation\_id, user=user
)
# 初始化应用生成实体
application\_generate\_entity = ChatAppGenerateEntity(
# 参数设置
# ...
)
# 初始化生成记录
(conversation, message) = self.\_init\_generate\_records(application\_generate\_entity, conversation)
# 在新线程中执行生成任务
worker\_thread = threading.Thread(target=worker\_with\_context)
worker\_thread.start()
# 返回响应或流生成器
response = self.\_handle\_response(
application\_generate\_entity=application\_generate\_entity,
queue\_manager=queue\_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
return ChatAppGenerateResponseConverter.convert(response=response, invoke\_from=invoke\_from)
- 工作线程中执行实际的生成任务:
# api/core/app/apps/chat/app\_generator.py
def \_generate\_worker(self, flask\_app: Flask, application\_generate\_entity: ChatAppGenerateEntity, queue\_manager: AppQueueManager, conversation\_id: str, message\_id: str) -> None:
with flask\_app.app\_context():
try:
# 获取对话和消息
conversation = self.\_get\_conversation(conversation\_id)
message = self.\_get\_message(message\_id)
if message isNone:
raise MessageNotExistsError("Message not exists")
# 运行聊天应用
runner = ChatAppRunner()
runner.run(
application\_generate\_entity=application\_generate\_entity,
queue\_manager=queue\_manager,
conversation=conversation,
message=message,
)
except GenerateTaskStoppedError:
pass
# 异常处理...
finally:
db.session.close()
- 消息处理完成后更新状态:
# api/core/app/apps/chat/app\_runner.py (简化示例)
def run(self, application\_generate\_entity, queue\_manager, conversation, message):
# 执行RAG检索(如果启用)
# 准备模型输入
# 调用LLM生成回复
# 保存回复到消息
# 更新对话状态
conversation.status = "completed"
conversation.updated\_at = datetime.now(UTC).replace(tzinfo=None)
db.session.commit()
# 处理对话变量(如果有)
if new\_variables:
for var\_name, var\_value in new\_variables.items():
# 创建或更新对话变量
# ...
八、数据生命周期
为了更好地理解 Dify 系统中数据的完整生命周期,展示主要数据实体在系统中的创建、使用、更新和删除过程。
8.1 App 数据生命周期
以下图表展示了 App 实体的完整生命周期,以及与其他数据模型的关联:
8.2 Conversation 数据生命周期
以下图表展示了 Conversation 实体的完整生命周期,以及与 Message 和 ConversationVariable 的关联:
九、总结
Dify 提供了一个全面的 LLM 应用开发平台,通过直观的界面和强大的后端支持,使开发者能够快速构建和部署AI应用。
Dify 聊天应用机制以“会话-消息”为核心,结合异步队列、流式返回、丰富的扩展表设计,实现了高效、可扩展的智能对话服务。其与认证、插件、知识库等模块协作,形成了强大的智能应用生态。
参考资料
https://github.com/langgenius/dify (v1.4.1)
十、推荐阅读
- 从零开始学 Dify
- 从零开始学 Dify-系统架构
- 从零开始学 Dify- 帐户与租户管理系统设计揭秘
- 从零开始学 Dify- 模型提供者系统设计实现模型统一调用接口
- 从零开始学 Dify- RAG 知识库系统设计详解
- 从零开始学 Dify- 对话系统的关键功能
- 从零开始学 Dify- 工作流(Workflow)系统架构
- 从零开始学 Dify-扫描、加载和管理模型提供者的详细过程
- 从零开始学 Dify-详细介绍 Dify 模型运行时的核心架构
- 从零开始学 Dify - Dify 的 RAG 系统如何有效地处理和检索大量文档?
- 从零开始学 Dify - 详细介绍 Dify 工具(Tool)系统核心架构设计
- 从零开始学 Dify - 万字详解RAG父子分段模式的原理与实现
- 从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误
👆👆👆欢迎关注,一起进步👆👆👆
欢迎留言讨论哈
🧐点赞、分享、推荐 ,一键三连,养成习惯👍
