LLM之RAG实战(五十)| FastAPI:构建基于LLM的WEB接口界面

向量数据库大模型关系型数据库

picture.image

   FastAPI是WEB UI接口,随着LLM的蓬勃发展,FastAPI的生态也迎来了新的机遇本文将围绕FastAPIOpenAI的API以及FastCRUD,来创建一个个性化的电子邮件写作助手,以展示如何结合这些技术来构建强大的应用程序

下面我们开始分步骤操作:

一、安装环境

首先,我们创建一个文件夹


        
            

          mkdir email-assistant-api
        
      

进入到该文件夹


        
            

          cd email-assistant-api
        
      

我们使用poetry来管理python包,首先需要先安装poetry


        
            

          pip install poetry
        
      

进入到email-assistant-api文件夹中,并初始化


        
            

          poetry init
        
      

写下所要求的内容(默认为空),然后按 Enter 键

picture.image

对交互式依赖项键入 no,直到您获得如下内容:

picture.image

   然后只需按 enter 键确认生成。您应该注意到在您的文件夹中创建了一个 pyproject.toml 文件,这是 poetry 用来管理依赖项的文件。


    让我们从添加依赖项开始

          
poetry add fastapi fastcrud sqlmodel openai aiosqlite greenlet python-jose bcrypt
          

      
  现在你应该还注意到一个 poetry.lock 文件,这是 poetry 保存已安装包的实际版本的方式。

二、项目结构

   对于 FastAPI 应用程序,我们有三个主要内容:模型、架构和端点。由于这个 API 很简单,我们可以像这样创建我们的结构:

          
email_assistant_api/
          
│
          
├── app/
          
│   ├── __init__.py
          
│   ├── main.py        # The main application file
          
│   ├── routes.py      # Contains API route definitions and endpoint logic
          
│   ├── database.py    # Database setup and session management
          
│   ├── models.py      # SQLModel models for the application
          
│   ├── crud.py        # CRUD operation implementations using FastCRUD
          
│   ├── schemas.py     # Schemas for request and response models
          
│   └── .env           # Environment variables
          
│
          
├── pyproject.toml     # Project configuration and dependencies
          
├── README.md          # Provides an overview and documentation
          
└── .gitignore         # Files to be ignored by version control
      
  • models.py 中定义我们的模型(数据库表的抽象);
  • schemas.py

用于验证和序列化数据;

  • routes.py定义端点;
  • database.py定义数据库相关信息;
  • crud.py定义与数据库交互的crud操作;
  • .env定义环境变量,比如API Key;
  • main.py定义 FastAPI 应用程序和 API 的入口点;

请注意,此结构适用于小型应用程序,但如果您想为大型应用程序提供更强大的模板,请参考:https://github.com/igorbenav/FastAPI-boilerplate

2.1 对数据库进行建模

对于数据库,我们有一个简单的模型

  • 用户有一个用户名、一个名字、一个电子邮件,我们存储一个哈希密码。
  • 电子邮件日志包含我们为输入定义的内容,以及与此日志关联的用户的时间戳、generated_email 和 ID。
  • 用户可能有多个电子邮件日志。

picture.image

models.py代码示例:


          
# app/models.py
          

          
from sqlmodel import SQLModel, Field
          
from typing import Optional
          

          
class User(SQLModel, table=True):
          
    id: Optional[int] = Field(default=None, primary_key=True)
          
    name: str = Field(..., min_length=2, max_length=30)
          
    username: str = Field(..., min_length=2, max_length=20)
          
    email: str
          
    hashed_password: str
          

          
class EmailLog(SQLModel, table=True):
          
    id: Optional[int] = Field(default=None, primary_key=True)
          
    user_id: int = Field(foreign_key="user.id")
          
    user_input: str
          
    reply_to: Optional[str] = Field(default=None)
          
    context: Optional[str] = Field(default=None)
          
    length: Optional[int] = Field(default=None)
          
    tone: str
          
    generated_email: str
          
    timestamp: str
      
   为了与我们的数据库交互,我们将在 crud.py 中为每个模型实例化 FastCRUD[1]

          
# app/crud.py
          

          
from fastcrud import FastCRUD
          
from .models import User, EmailLog
          
crud_user = FastCRUD(User)
          
crud_email_log = FastCRUD(EmailLog)
      

2.2 创建schemas

schemas.py 中创建我们的 schemas


          
# app/schemas.py
          

          
from datetime import datetime
          
from typing import Optional
          
from sqlmodel import SQLModel, Field
          

          

          
# ------- user -------
          
class UserCreate(SQLModel):
          
    name: str
          
    username: str
          
    email: str
          
    password: str
          

          
class UserRead(SQLModel):
          
    id: int
          
    name: str
          
    username: str
          
    email: str
          

          
class UserCreateInternal(SQLModel):
          
    name: str
          
    username: str
          
    email: str
          
    hashed_password: str
          

          

          
# ------- email -------
          
class EmailRequest(SQLModel):
          
    user_input: str
          
    reply_to: Optional[str] = None
          
    context: Optional[str] = None
          
    length: int = 120
          
    tone: str = "formal"
          

          
class EmailResponse(SQLModel):
          
    generated_email: str
          

          

          
# ------- email log -------
          
class EmailLogCreate(SQLModel):
          
    user_id: int
          
    user_input: str
          
    reply_to: Optional[str] = None
          
    context: Optional[str] = None
          
    length: Optional[int] = None
          
    tone: Optional[str] = None
          
    generated_email: str
          
    timestamp: datetime = Field(
          
      default_factory=lambda: datetime.now(UTC)
          
    )
          

          
class EmailLogRead(SQLModel):
          
    user_id: int
          
    user_input: str
          
    reply_to: Optional[str]
          
    context: Optional[str]
          
    length: Optional[int]
          
    tone: Optional[str]
          
    generated_email: str
          
    timestamp: datetime
      
  • 要创建用户,我们要求提供姓名、用户名、电子邮件和密码(我们将存储哈希值);

  • 我们将默认长度设置为 120,默认tone设置为 “正式”;

  • 我们自动生成 EmailLog 的时间戳

2.3 创建我们的应用程序并设置数据库

   尽管我们已经有了模型和架构,但实际上我们既没有为终端节点提供服务的应用程序,也没有用于创建表的数据库。

下面看一下database.py:


          
# app/database.py
          

          
from sqlmodel import SQLModel, create_engine, AsyncSession
          
from sqlalchemy.ext.asyncio import create_async_engine
          
from sqlalchemy.orm import sessionmaker
          

          
DATABASE_URL = "sqlite+aiosqlite:///./emailassistant.db"
          
engine = create_async_engine(DATABASE_URL, echo=True)
          

          
async_session = sessionmaker(
          
    engine, class_=AsyncSession, expire_on_commit=False
          
)
          

          
async def create_db_and_tables():
          
    async with engine.begin() as conn:
          
        await conn.run_sync(SQLModel.metadata.create_all)
          

          
async def get_session() -> AsyncSession:
          
    async with async_session() as session:
          
        yield session
      
  在这里,我们连接到了一个 SQLite 数据库,创建了一个函数来创建我们的数据库和表,以及一个允许我们与该数据库交互的会话。

现在让我们最终创建我们的 FastAPI 应用程序:


          
# app/main.py
          

          
from fastapi import FastAPI
          
from .database import create_db_and_tables
          

          
async def lifespan(app):
          
    await create_db_and_tables()
          
    yield
          

          
app = FastAPI(lifespan=lifespan)
      

我们定义lifespan,以便在启动时创建 db 和 tables。

让我们运行一下代码来测试一下:


        
            

          poetry run fastapi run
        
      

结果如下所示:

picture.image

在浏览器登录如下地址:


        
            

          
 127.0.0.1:8000/docs
 
        
      

可以看到如下界面:

picture.image

可以在终端中按 Ctrl C 暂时关闭应用程序。

2.4 创建端点

   接下来,创建端点来生成电子邮件。



   首先在 .env 中输入 OpenAI API 密钥(这将被 .gitignore 忽略,并且不会出现在我们的存储库中):

          
# app/.env
          

          
OPENAI_API_KEY="my_openai_api_key"
      

将其写入 .gitignore 以确保不会提交此 API 密钥:


          
# .gitignore
          

          
.env
          
.venv
          
env/
          
venv/
          
ENV/
          
env.bak/
          
venv.bak/
      

现在从 .env 中获取 OpenAI API 密钥并实例化客户端:


          
# app/routes.py
          

          
import os
          

          
from starlette.config import Config
          
from openai import OpenAI
          

          
current_file_dir = os.path.dirname(os.path.realpath(__file__))
          
env_path = os.path.join(current_file_dir, ".env")
          

          
config = Config(env_path)
          

          
OPENAI_API_KEY = config("OPENAI_API_KEY")
          
open_ai_client = OpenAI(api_key=OPENAI_API_KEY)
      

为电子邮件终端节点创建一个路由器,并实际为电子邮件创建终端节点:

  • 我们将创建一个系统提示符,使输出适应我们想要的结果;
  • 我们将创建一个基本提示,用于格式化传递的信息;
  • 然后我们将此信息传递给 OpenAI 客户端;
  • 最后,我们将在数据库中创建一个日志条目并返回生成的电子邮件

          
# app/routes.py
          

          
...
          

          
from openai import OpenAI
          
from fastapi import APIRouter, Depends, HTTPException
          

          
from .schemas import EmailRequest, EmailResponse
          
from .database import get_session
          

          
...
          

          
# ------- email -------
          
email_router = APIRouter()
          
@email_router.post("/", response_model=EmailResponse)
          
async def generate_email(
          
    request: EmailRequest, 
          
    db: AsyncSession = Depends(get_session)
          
):
          
    try:
          
        system_prompt = f"""
          
        You are a helpful email assistant. 
          
        You get a prompt to write an email,
          
        you reply with the email and nothing else.
          
        """
          
        
          
        prompt = f"""
          
        Write an email based on the following input:
          
        - User Input: {request.user_input}
          
        - Reply To: {request.reply_to if request.reply_to else 'N/A'}
          
        - Context: {request.context if request.context else 'N/A'}
          
        - Length: {request.length if request.length else 'N/A'} characters
          
        - Tone: {request.tone if request.tone else 'N/A'}
          
        """
          
        
          
        response = await open_ai_client.chat.completions.create(
          
            model="gpt-3.5-turbo",
          
            messages=[
          
                {"role": "system", "content": system_prompt},
          
                {"role": "user", "content": prompt},
          
            ],
          
            max_tokens=request.length
          
        )
          
        
          
        generated_email = response.choices[0].message['content'].strip()
          
        log_entry = EmailLogCreate(
          
            user_id=request.user_id,
          
            user_input=request.user_input,
          
            reply_to=request.reply_to,
          
            context=request.context,
          
            length=request.length,
          
            tone=request.tone,
          
            generated_email=generated_email,
          
        )
          
        await crud_email_logs.create(db, log_entry)
          

          
        return EmailResponse(generated_email=generated_email)
          
    except Exception as e:
          
        raise HTTPException(status_code=500, detail=str(e))
      
  现在定义 app/main.py ,将这个电子邮件路由器包含到我们的 FastAPI 应用程序中:

          
# app/main.py
          

          
from fastapi import FastAPI
          

          
from .database import create_db_and_tables
          
from .routes import email_router
          

          
async def lifespan(app):
          
    await create_db_and_tables()
          
    yield
          

          
app = FastAPI(lifespan=lifespan)
          
app.include_router(email_router, prefix="/generate", tags=["Email"])
      
   再次保存并运行 FastAPI 应用程序 (127.0.0.1:8000/docs),会看到如下界面:

picture.image

  点击这个新创建的 post 端点,传递一些信息并单击 execute

picture.image

可以得到如下内容:

picture.image

   结果是我们所希望的回应,但是目前还无法通过查看日志来判断系统是否正常工作,因此让我们也创建电子邮件日志端点:

          
# app/routes.py
          

          
...
          

          
from fastapi import APIRouter, Depends, HTTPException
          
from sqlalchemy.ext.asyncio.session import AsyncSession
          

          
from .schemas import EmailLogCreate, EmailLogRead
          

          
...
          

          
# ------- email log -------
          
log_router = APIRouter()
          

          
@log_router.get("/")
          
async def read_logs(db: AsyncSession = Depends(get_session)):
          
    logs = await crud_email_logs.get_multi(db)
          
    return logs
          

          
@log_router.get("/{log_id}", response_model=EmailLogRead)
          
async def read_log(log_id: int, db: AsyncSession = Depends(get_session)):
          
    log = await crud_email_logs.get(db, id=log_id)
          
    if not log:
          
        raise HTTPException(status_code=404, detail="Log not found")
          
    return log
      
   我们可以按其 ID 查看多个日志或一个日志。让我们也将这个路由器包含在我们的应用程序中:

          
# app/main.py
          

          
from fastapi import FastAPI
          

          
from .database import create_db_and_tables
          
from .routes import email_router, log_router
          

          
async def lifespan(app):
          
    await create_db_and_tables()
          
    yield
          

          
app = FastAPI(lifespan=lifespan)
          

          
app.include_router(email_router, prefix="/generate", tags=["Email"])
          
app.include_router(log_router, prefix="/logs", tags=["Logs"])
      

三、用户功能、身份验证和安全性

现在,让我们添加实际的用户创建功能。首先在终端上运行:


        
            

          openssl rand -hex 32
        
      

然后将结果写入 .env 作为SECRET_KEY


          
# app/.env
          

          
OPENAI_API_KEY="my_openai_api_key"
          
SECRET_KEY="my_secret_key"
      

首先创建一个文件 helper.py 并将以下代码粘贴到其中:


          
# app/helper.py
          

          
import os
          
from datetime import UTC, datetime, timedelta
          
from typing import Any, Annotated
          

          
import bcrypt
          
from jose import JWTError, jwt
          
from fastapi import Depends, HTTPException
          
from fastapi.security import OAuth2PasswordBearer
          
from sqlalchemy.ext.asyncio import AsyncSession
          
from sqlmodel import SQLModel
          
from starlette.config import Config
          

          
from .database import get_session
          
from .crud import crud_users
          

          
current_file_dir = os.path.dirname(os.path.realpath(__file__))
          
env_path = os.path.join(current_file_dir, ".env")
          
config = Config(env_path)
          

          

          
# Security settings
          
SECRET_KEY = config("SECRET_KEY")
          
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/login")
          

          

          
# Token models
          
class Token(SQLModel):
          
    access_token: str
          
    token_type: str
          
class TokenData(SQLModel):
          
    username_or_email: str
          

          

          
# Utility functions
          
async def verify_password(plain_password: str, hashed_password: str) -> bool:
          
    """Verify a plain password against a hashed password."""
          
    return bcrypt.checkpw(plain_password.encode(), hashed_password.encode())
          

          
def get_password_hash(password: str) -> str:
          
    """Hash a password."""
          
    return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
          

          
async def create_access_token(
          
    data: dict[str, Any], 
          
    expires_delta: timedelta | None = None
          
) -> str:
          
    """Create a JWT access token."""
          
    to_encode = data.copy()
          
    if expires_delta:
          
        expire = datetime.now(UTC).replace(tzinfo=None) + expires_delta
          
    else:
          
        expire = datetime.now(UTC).replace(tzinfo=None) + timedelta(minutes=15)
          
    to_encode.update({"exp": expire})
          
    return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
          

          
async def verify_token(token: str, db: AsyncSession) -> TokenData | None:
          
    """Verify a JWT token and extract the user data."""
          
    try:
          
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
          
        username_or_email: str = payload.get("sub")
          
        if username_or_email is None:
          
            return None
          
        return TokenData(username_or_email=username_or_email)
          
    except JWTError:
          
        return None
          

          
async def authenticate_user(username_or_email: str, password: str, db: AsyncSession):
          
    if "@" in username_or_email:
          
        db_user: dict | None = await crud_users.get(db=db, email=username_or_email, is_deleted=False)
          
    else:
          
        db_user = await crud_users.get(db=db, username=username_or_email, is_deleted=False)
          
    if not db_user:
          
        return False
          
    elif not await verify_password(password, db_user["hashed_password"]):
          
        return False
          
    return db_user
          

          

          
# Dependency
          
async def get_current_user(
          
    token: Annotated[str, Depends(oauth2_scheme)], 
          
    db: Annotated[AsyncSession, Depends(get_session)]
          
) -> dict[str, Any] | None:
          
    """Get the current authenticated user."""
          
    token_data = await verify_token(token, db)
          
    if token_data is None:
          
        raise HTTPException(status_code=401, detail="User not authenticated.")
          

          
    if "@" in token_data.username_or_email:
          
        user = await crud_users.get(
          
            db=db, email=token_data.username_or_email, is_deleted=False
          
        )
          
    else:
          
        user = await crud_users.get(
          
            db=db, username=token_data.username_or_email, is_deleted=False
          
        )
          

          
    if user:
          
        return user
          
    raise HTTPException(status_code=401, detail="User not authenticated.")
      
  • verify_password : 根据哈希密码验证普通密码。它用于检查用户提供的密码是否与存储的哈希密码匹配。
  • get_password_hash : 在将用户提供的密码存储到数据库之前对其进行哈希处理。
  • create_access_token : 用于为经过身份验证的用户生成 JWT 类型的令牌。
  • verify_token : 验证 JWT 令牌并提取用户数据。
  • authenticate_user : 负责根据用户的用户名或电子邮件和密码对用户进行身份验证。
  • get_current_user : 是一种依赖项,它根据提供的令牌检索当前经过身份验证的用户。

现在,让我们使用这些实用程序函数来创建用户路由。


          
# app/routes.py
          

          
from datetime import timedelta
          
from fastapi import APIRouter, Depends, HTTPException
          

          
from .database import get_session
          
from .schemas import UserCreate, UserRead
          
from .helper import (
          
    get_password_hash, 
          
    authenticate_user, 
          
    create_access_token, 
          
    get_current_user, 
          
    Token
          
)
          

          
# ------- user -------
          
user_router = APIRouter()
          

          
@user_router.post("/register", response_model=UserRead)
          
async def register_user(
          
    user: UserCreate, 
          
    db: AsyncSession = Depends(get_session)
          
):
          
    hashed_password = get_password_hash(user.password)
          
    user_data = user.dict()
          
    user_data["hashed_password"] = hashed_password
          
    del user_data["password"]
          
    
          
    new_user = await crud_users.create(
          
        db, 
          
        object=UserCreateInternal(**user_data)
          
    )
          
    return new_user
          

          
@user_router.post("/login", response_model=Token)
          
async def login_user(user: UserCreate, db: AsyncSession = Depends(get_session)):
          
    db_user = await crud_users.get(db, email=user.email)
          
    password_verified = await verify_password(
          
      user.password, db_user.hashed_password
          
    )
          
    if not db_user or not password_verified:
          
        raise HTTPException(status_code=400, detail="Invalid credentials")
          
    
          
    access_token_expires = timedelta(minutes=30)
          
    access_token = await create_access_token(
          
        data={"sub": user["username"]}, 
          
        expires_delta=access_token_expires
          
    )
          
    return {"access_token": access_token, "token_type": "bearer"}
      

并将路由器包含在我们的应用程序中:


          
# app/main.py
          

          
from fastapi import FastAPI
          

          
from .database import create_db_and_tables
          
from .routes import user_router, email_router, log_router
          

          
async def lifespan(app):
          
    await create_db_and_tables()
          
    yield
          

          
app = FastAPI(lifespan=lifespan)
          

          
app.include_router(user_router, prefix="/users", tags=["Users"])
          
app.include_router(email_router, prefix="/generate", tags=["Email"])
          
app.include_router(log_router, prefix="/logs", tags=["Logs"])
      
  最后,让我们在 generate\_email 端点中注入 get\_current\_user 依赖项,添加用户登录以生成电子邮件的需求,此外,还会自动将用户的 ID 存储在日志中:

          
# app/routes.py
          

          
...
          

          
@email_router.post("/", response_model=EmailResponse)
          
async def generate_email(
          
    request: EmailRequest, 
          
    db: AsyncSession = Depends(get_session),
          
    current_user: dict = Depends(get_current_user)
          
):
          
    try:
          
        prompt = f"""
          
        Write an email based on the following input:
          
        - User Input: {request.user_input}
          
        - Reply To: {request.reply_to if request.reply_to else 'N/A'}
          
        - Context: {request.context if request.context else 'N/A'}
          
        - Length: {request.length if request.length else 'N/A'} characters
          
        - Tone: {request.tone if request.tone else 'N/A'}
          
        """
          
        
          
        response = open_ai_client.chat.completions.create(
          
            model="gpt-3.5-turbo",
          
            messages=[
          
                {"role": "system", "content": "You are a helpful email assistant."},
          
                {"role": "user", "content": prompt}
          
            ],
          
            max_tokens=request.length
          
        )
          
        generated_email = response.choices[0].message.content
          

          
        log_entry = EmailLogCreate(
          
            user_id=current_user['id'],
          
            user_input=request.user_input,
          
            reply_to=request.reply_to,
          
            context=request.context,
          
            length=request.length,
          
            tone=request.tone,
          
            generated_email=generated_email,
          
        )
          
        await crud_email_logs.create(db, log_entry)
          

          
        return EmailResponse(generated_email=generated_email)
          
    except Exception as e:
          
        raise HTTPException(status_code=500, detail=str(e))
      

如果现在检查终端节点,您将在右侧看到一个小锁。

picture.image

  现在运行程序,需要进行身份验证,您可以通过单击锁并在此处传递有效的用户名和密码(您创建的用户)来完成:

picture.image

 现在,我们还将此依赖项设置为日志端点,此外,让我们使用 FastCRUD 仅过滤当前用户 ID 的日志。

我们可以通过注入 get_current_user 依赖项并将 user_id=current_user[“id”] 传递给 FastCRUD 来实现这一点(当前用户是 get_current_user 返回的)。


          
...
          

          
# ------- email log -------
          
log_router = APIRouter()
          

          
@log_router.get("/")
          
async def read_logs(
          
    db: AsyncSession = Depends(get_session),
          
    current_user: dict[str, Any] = Depends(get_current_user)
          
):
          
    logs = await crud_email_logs.get_multi(db, user_id=current_user["id"])
          
    return logs
          

          
@log_router.get("/{log_id}", response_model=EmailLogRead)
          
async def read_log(
          
    log_id: int, 
          
    db: AsyncSession = Depends(get_session),
          
    current_user: dict[str, Any] = Depends(get_current_user)
          
):
          
    log = await crud_email_logs.get(db, id=log_id, user_id=current_user["id"])
          
    if not log:
          
        raise HTTPException(status_code=404, detail="Log not found")
          
    return log
      
  现在,我们实际上只能读取我们自己的日志,而且,只有在登录时才能读取。

最终的 routes 文件:


          
# app/routes.py
          

          
import os
          
from typing import Annotated, Any
          
from datetime import timedelta
          

          
from fastapi import APIRouter, Depends, HTTPException
          
from fastapi.security import OAuth2PasswordRequestForm
          
from sqlalchemy.ext.asyncio.session import AsyncSession
          
from starlette.config import Config
          
from openai import OpenAI
          

          
from .crud import crud_email_logs, crud_users
          
from .database import get_session
          
from .schemas import (
          
    EmailRequest, 
          
    EmailResponse, 
          
    EmailLogCreate, 
          
    EmailLogRead,
          
    UserCreate, 
          
    UserRead, 
          
    UserCreateInternal, 
          
)
          
from .helper import (
          
    get_password_hash, 
          
    authenticate_user, 
          
    create_access_token, 
          
    get_current_user, 
          
    Token
          
)
          

          
current_file_dir = os.path.dirname(os.path.realpath(__file__))
          
env_path = os.path.join(current_file_dir, ".env")
          
config = Config(env_path)
          

          
OPENAI_API_KEY = config("OPENAI_API_KEY")
          

          
open_ai_client = OpenAI(api_key=OPENAI_API_KEY)
          

          

          
# ------- user -------
          
user_router = APIRouter()
          

          
@user_router.post("/register", response_model=UserRead)
          
async def register_user(
          
    user: UserCreate, 
          
    db: AsyncSession = Depends(get_session)
          
):
          
    hashed_password = get_password_hash(user.password)
          
    user_data = user.dict()
          
    user_data["hashed_password"] = hashed_password
          
    del user_data["password"]
          
    
          
    new_user = await crud_users.create(
          
        db, 
          
        object=UserCreateInternal(**user_data)
          
    )
          
    return new_user
          

          
@user_router.post("/login", response_model=Token)
          
async def login_user(
          
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
          
    db: AsyncSession = Depends(get_session)
          
):
          
    user = await authenticate_user(
          
        username_or_email=form_data.username, 
          
        password=form_data.password, 
          
        db=db
          
    )
          
    if not user:
          
        raise HTTPException(status_code=400, detail="Invalid credentials")
          
    
          
    access_token_expires = timedelta(minutes=30)
          
    access_token = await create_access_token(
          
        data={"sub": user["username"]}, 
          
        expires_delta=access_token_expires
          
    )
          
    return {"access_token": access_token, "token_type": "bearer"}
          

          

          
# ------- email -------
          
email_router = APIRouter()
          

          
@email_router.post("/", response_model=EmailResponse)
          
async def generate_email(
          
    request: EmailRequest, 
          
    db: AsyncSession = Depends(get_session),
          
    current_user: dict = Depends(get_current_user)
          
):
          
    try:
          
        system_prompt = f"""
          
        You are a helpful email assistant. 
          
        You get a prompt to write an email,
          
        you reply with the email and nothing else.
          
        """
          
        prompt = f"""
          
        Write an email based on the following input:
          
        - User Input: {request.user_input}
          
        - Reply To: {request.reply_to if request.reply_to else 'N/A'}
          
        - Context: {request.context if request.context else 'N/A'}
          
        - Length: {request.length if request.length else 'N/A'} characters
          
        - Tone: {request.tone if request.tone else 'N/A'}
          
        """
          
        
          
        response = open_ai_client.chat.completions.create(
          
            model="gpt-3.5-turbo",
          
            messages=[
          
                {"role": "system", "content": system_prompt},
          
                {"role": "user", "content": prompt}
          
            ],
          
            max_tokens=request.length
          
        )
          
        generated_email = response.choices[0].message.content
          
        log_entry = EmailLogCreate(
          
            user_id=current_user['id'],
          
            user_input=request.user_input,
          
            reply_to=request.reply_to,
          
            context=request.context,
          
            length=request.length,
          
            tone=request.tone,
          
            generated_email=generated_email,
          
        )
          
        await crud_email_logs.create(db, log_entry)
          
        return EmailResponse(generated_email=generated_email)
          
    except Exception as e:
          
        raise HTTPException(status_code=500, detail=str(e))
          

          

          
# ------- email log -------
          
log_router = APIRouter()
          

          
@log_router.get("/")
          
async def read_logs(
          
    db: AsyncSession = Depends(get_session),
          
    current_user: dict[str, Any] = Depends(get_current_user)
          
):
          
    logs = await crud_email_logs.get_multi(db, user_id=current_user["id"])
          
    return logs
          

          
@log_router.get("/{log_id}", response_model=EmailLogRead)
          
async def read_log(
          
    log_id: int, 
          
    db: AsyncSession = Depends(get_session),
          
    current_user: dict[str, Any] = Depends(get_current_user)
          
):
          
    log = await crud_email_logs.get(db, id=log_id, user_id=current_user["id"])
          
    if not log:
          
        raise HTTPException(status_code=404, detail="Log not found")
          
    return log
      

参考文献

[1] https://github.com/igorbenav/fastcrud?tab=readme-ov-file

0
0
0
0
关于作者
相关资源
DataSail CDC 数据整库实时入仓入湖实践
在线数据库数据导入到数仓分析的链路已经存在多年,随着近年来实时计算的发展,业务希望有延迟更低、运维更便捷、效率更高的CDC同步通道。本次分享主要介绍DataSail实现CDC整库实时同步的技术方案和业务实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论