AI应用中基于语音驱动的RAG如何做?

向量数据库大模型智能语音交互

介绍

在当今快节奏的世界中,信息触手可及,我们经常需要寻找复杂问题的答案。但是,如果您只需对设备提问,就能通过语音获得简洁准确的回答呢?

这就是语音驱动的(音频)RAG的出现,它是一种将语音命令的便利性与问答智能相结合的创新系统。

在这里,用户可以通过语音命令与知识库进行互动。通过利用语音识别和自然语言处理的最新进展,Audio RAG 能够无缝地将您的语音问题转换为文本,进而从庞大的知识库中检索相关信息。

Audio RAG 的工作原理

1.录制音频 :系统首先使用麦克风录制您的语音。只需提出问题,Audio RAG 就会捕捉您的音频输入。 2.语音转换为文本 :一旦音频被录制,它将通过语音识别引擎进行处理。这个强大的工具会分析您的语音声学特性,并将其转换为文本格式,同时保留问题的含义和上下文。 3.执行问答 :在获得转录文本后,Audio RAG 利用先进的语言模型来理解问题背后的含义和意图。然后,它会搜索庞大的知识库,检索最相关的信息,为您提供简洁准确的答案。 4.文本转换为语音 :LLM 生成的回答将被转换为音频文件。

使用的技术栈

1.GROQ :这是一个快速的AI推理技术,由LPU™ AI推理技术提供支持,能够提供快速、经济且高效的AI。 2.gTTS :gTTS(Google Text-to-Speech)是一个Python库和CLI工具,允许您与Google翻译的文本转语音API进行接口操作。它使您能够将文本转换为语音并保存为MP3文件。 3.pydub :pydub是一个Python库,它提供了一个简单易用的高级接口,用于操作音频文件。它允许您对音频数据执行各种操作。 4.LangChain :这是一个用于大语言模型应用开发的框架。 5.HuggingFace嵌入模型 :Hugging Face 是自然语言处理(NLP)和人工智能(AI)领域的一个著名平台,以其广泛的预训练模型和用于构建利用这些模型的应用程序的工具而闻名。Hugging Face 提供的一个关键功能是生成嵌入,这些嵌入是文本数据的密集向量表示。它们可用于各种任务,包括语义搜索、文本分类和聚类。 6.Streamlit :Streamlit是一个开源Python框架,旨在快速轻松地构建和分享交互式数据应用程序。 7.ChromaDB :ChromaDB 是一个开源的向量数据库,旨在促进向量嵌入的存储和检索,使其特别适用于涉及大语言模型(LLMs)和语义搜索的应用程序。

picture.image

启用音频的RAG

代码实现

安装所需依赖


            
!pip install langchain langchain_community langchain_groq chromadb sentence_transformers
            
!pip install -U langchain-huggingface
            
!pip install -U langchain-chroma
            
!pip install python-dotenv
        

创建 .env 文件并设置您的 Groq API 密钥


        
            

          GROQ\_API\_KEY=<你的API密钥>
        
      

设置 Groq API 密钥


          
import os
          
from dotenv import load_dotenv
          

          
# 从 .env 文件加载环境变量
          
load_dotenv()
      

设置 LLM


          
llm = ChatGroq(
          
    model_name="llama3-70b-8192",
          
    temperature=0.1,
          
    max_tokens=1000,
          
)
      

设置嵌入模型


          
from langchain_huggingface import HuggingFaceEmbeddings
          

          
model_name = "BAAI/bge-small-en-v1.5"
          
model_kwargs = {"device": "cpu"}
          
encode_kwargs = {"normalize_embeddings": False}
          
embeddings = HuggingFaceEmbeddings(
          
    model_name=model_name,
          
    model_kwargs=model_kwargs,
          
    encode_kwargs=encode_kwargs
          
)
      

设置文本分割辅助函数


          
# 文本分割器
          
def text_splitter():
          
    text_splitter = RecursiveCharacterTextSplitter(
          
        chunk_size=512,
          
        chunk_overlap=20,
          
        length_function=len,
          
    )
          
    return text_splitter
      

设置 RetrievalQA 辅助函数


          
# RetrievalQA
          
def answer_question(question):
          
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
          
    qa = RetrievalQA.from_chain_type(
          
        llm=llm,
          
        chain_type="stuff",
          
        retriever=retriever,
          
        return_source_documents=True
          
    )
          
    result = qa.invoke({"query": question})
          
    return result['result']
      

使用 Groq distil-whisper-large-v3-en 转录音频的辅助函数

distil-whisper-large-v3-en:Distil-Whisper English 是 OpenAI Whisper 模型的一个压缩版本,旨在提供更快、成本更低的英语语音识别,同时保持相当的准确性。

•支持语言:仅限英语


          
import os
          
from groq import Groq
          

          
# 初始化 Groq 客户端
          
client = Groq()
          

          
# 指定音频文件的路径
          
filename = "/content/recorded_audio.wav"
          

          
def transcribe_audio(filename):
          
    # 打开音频文件
          
    with open(filename, "rb") as file:
          
        # 创建音频文件的转录
          
        transcription = client.audio.transcriptions.create(
          
            file=(filename, file.read()),  # 必需的音频文件
          
            model="distil-whisper-large-v3-en",  # 必需的用于转录的模型
          
            prompt="Specify context or spelling",  # 可选的上下文或拼写提示
          
            response_format="json",  # 可选的响应格式
          
            language="en",  # 可选的语言
          
            temperature=0.0  # 可选的温度参数
          
        )
          
        # 打印转录文本
          
        print(transcription.text)
          
    return transcription.text
      

将文本转换为语音的辅助函数


          
from gtts import gTTS
          

          
def text_to_audio(text):
          
    # 将文本转换为语音
          
    tts = gTTS(text=text, lang='en', slow=False)
          
    
          
    # 将音频保存为MP3文件
          
    mp3_file = "temp_audio.mp3"
          
    tts.save(mp3_file)
          
    
          
    return mp3_file
      

为 Streamlit 应用程序准备的完整代码实现.

请编写一个名为 audio_rag.py 的 Python 脚本。


          
import streamlit as st
          
from time import sleep
          
#from st_audiorec import st_audiorec
          
from streamlit_mic_recorder import mic_recorder
          
from streamlit_chat import message
          
import os
          
from groq import Groq
          
from langchain_groq import ChatGroq
          
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
          
from langchain.text_splitter import RecursiveCharacterTextSplitter
          
from langchain.document_loaders import PyPDFLoader
          
from langchain_chroma import Chroma
          
from langchain.chains import RetrievalQA
          
from chromadb.config import Settings
          
import chromadb
          
from gtts import gTTS
          
from pydub import AudioSegment
          
import os
          
from dotenv import load_dotenv
          

          
# 从 .env 文件加载环境变量
          
load_dotenv()
          
设置 Chroma 配置
          

          
chroma_setting = Settings(anonymized_telemetry=False)
          
获取最新的文件路径
          

          
def newest(path):
          
    files = os.listdir(path)
          
    paths = [os.path.join(path, basename) for basename in files]
          
    newest_file_path = max(paths, key=os.path.getctime)
          
    return os.path.basename(newest_file_path)
          
文本转换为音频的辅助函数
          

          
def text_to_audio(text):
          
    # 将文本转换为语音
          
    tts = gTTS(text=text, lang='en', slow=False)
          
    
          
    # 将音频保存为MP3文件
          
    mp3_file = "temp_audio.mp3"
          
    tts.save(mp3_file)
          
    
          
    return mp3_file
          
保存上传的文件
          

          
def save_uploaded_file(uploaded_file, directory):
          
    try:
          
        with open(os.path.join(directory, uploaded_file.name), "wb") as f:
          
            f.write(uploaded_file.getbuffer())
          
        return st.success(f"已保存文件:{uploaded_file.name} 到 {directory}")
          
    except Exception as e:
          
        return st.error(f"保存文件出错:{e}")
          
创建用于保存上传文件的目录
          

          
upload_dir = "uploaded_files"
          
os.makedirs(upload_dir, exist_ok=True)
          
设置 LLM
          

          
llm = ChatGroq(model_name="llama3-70b-8192",
          
    temperature=0.1,
          
    max_tokens=1000,
          
)
          
设置嵌入模型
          

          
model_name ="BAAI/bge-small-en-v1.5"
          
model_kwargs ={"device":"cpu"}
          
encode_kwargs ={"normalize_embeddings":False}
          
embeddings = HuggingFaceBgeEmbeddings(model_name=model_name,
          
                                   model_kwargs=model_kwargs,
          
                                   encode_kwargs=encode_kwargs)
          
设置文本分割器
          

          
def text_splitter():
          
  text_splitter = RecursiveCharacterTextSplitter(
          
      chunk_size=512,
          
      chunk_overlap=20,
          
      length_function=len,
          
  )
          
  return text_splitter
          
设置 RetrievalQA
          

          
def answer_question(question,vectorstore):
          
  retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
          
  qa = RetrievalQA.from_chain_type(llm=llm,
          
                                   chain_type="stuff",
          
                                   retriever=retriever,
          
                                   return_source_documents=True)
          
  result = qa.invoke({"query": question})
          
  return result['result']
          
初始化 Groq 客户端
          

          
groq_client = Groq()
          
指定音频文件的路径
          

          
filename = "recorded_audio.wav"
          
转录音频的辅助函数
          

          
def transcribe_audio(filename):
          
  # 打开音频文件
          
  with open(filename, "rb") as file:
          
      # 创建音频文件的转录
          
      transcription = groq_client.audio.transcriptions.create(
          
        file=(filename, file.read()), # 必需的音频文件
          
        model="distil-whisper-large-v3-en", # 必需的用于转录的模型
          
        prompt="指定上下文或拼写",  # 可选
          
        response_format="json",  # 可选
          
        language="en",  # 可选
          
        temperature=0.0  # 可选
          
      )
          
      # 打印转录文本
          
      print(transcription.text)
          
  return transcription.text
          
初始化 session state 变量
          

          
if 'stop' not in st.session_state:
          
    st.session_state.stop = False
          
设置页面配置
          

          
st.set_page_config(
          
    page_title="音频和书籍应用程序",
          
    page_icon="📚",  # 您可以使用表情符号或图片URL
          
    layout="wide"
          
)
          
创建两列布局
          

          
col1, col2 = st.columns([1, 2])  # 调整比例控制列的宽度
          
在左列显示内容
          

          
with col1:
          
    st.markdown(
          
        """
          
        <h1 style='text-align: center;'>
          
            🎧 启用音频的 📚 知识应用程序
          
        </h1>
          
        <h5 style='text-align: center;'>
          
            您的音频问答系统一站式解决方案!
          
        </h5>
          
        """,
          
        unsafe_allow_html=True
          
    )
          
    st.write("欢迎使用启用音频的 RAG 应用程序!")
          
    st.image("audio.jpeg", caption="音频驱动的 RAG", output_format="auto")
          
    
          
    if st.button("停止进程"):
          
        st.session_state.stop = True  # 设置停止标志为 True
          

          
    if st.session_state.stop:
          
        st.write("进程已停止。您可以刷新页面重新开始。")
          
在右列显示 PDF 上传和阅读器
          

          
with col2:
          
    st.title("PDF 上传和阅读器")
          
    uploaded_file = st.file_uploader("选择一个 PDF 文件", type="pdf")
          
    
          
    persist_directory_path = "chromanew"
          
    if uploaded_file is not None:
          
        save_uploaded_file(uploaded_file, upload_dir)
          
        file_name = uploaded_file.name
          
        loader = PyPDFLoader(f"uploaded_files/{file_name}")
          
        pages = loader.load_and_split(text_splitter())
          
        persist_directory = persist_directory_path + "_" + file_name.split(".")[0]
          
        if os.path.exists(persist_directory):
          
            client = chromadb.PersistentClient(path=persist_directory, settings=chroma_setting)
          
            vectorstore = Chroma(embedding_function=embeddings,
          
                                 client=client,
          
                                 persist_directory=persist_directory,
          
                                 collection_name=file_name.split(".")[0],
          
                                 client_settings=chroma_setting,
          
                                )
          
            print(f"向量存储中加载的文档数量:{len(vectorstore.get()['documents'])}")
          
        else:
          
            client = chromadb.PersistentClient(path=persist_directory, settings=chroma_setting)
          
            vectorstore = Chroma(embedding_function=embeddings,
          
                                 client=client,
          
                                 persist_directory=persist_directory,
          
                                 collection_name=file_name.split(".")[0],
          
                                 client_settings=chroma_setting
          
                                )
          
            MAX_BATCH_SIZE = 100
          
            for i in range(0, len(pages), MAX_BATCH_SIZE):
          
                i_end = min(len(pages), i+MAX_BATCH_SIZE)
          
                batch = pages[i:i_end]
          
                vectorstore.add_documents(batch)
          
            print(f"向量存储中加载的文档数量:{len(vectorstore.get()['documents'])}")
          
创建一个按钮来启动进程
          

          
if 'start_process' not in st.session_state:
          
    st.session_state.start_process = False
          

          
if st.button("启动进程"):
          
    st.session_state.start_process = True
          

          
if st.session_state.start_process:
          
    options = os.listdir("uploaded_files")
          
    none_list = ["none"]
          
    options += none_list
          
    selected_option = st.selectbox("选择一个选项:", options)
          
    file_name = newest("uploaded_files") if selected_option == "none" else selected_option
          
    st.write(f"您选择了:{selected_option}")
          
    st.title("音频录制 - 根据所选选项提问")
          

          
    with st.spinner("正在进行音频录制..."):
          
        audio = mic_recorder(
          
            start_prompt="开始录制",
          
            stop_prompt="停止录制",
          
            just_once=False,
          
            key='recorder'
          
        )
          
        if audio:
          
            st.audio(audio['bytes'], format='audio/wav')
          
            with open("recorded_audio.wav", "wb") as f:
          
                f.write(audio['bytes'])
          
            st.success("音频录制完成!")
          
        
          
    with st.spinner("正在进行音频转录..."):
          
        text = transcribe_audio(filename)
          
        transcription = text
          
        st.markdown(text)
          
    
          
    if "chat_history" not in st.session_state:
          
        st.session_state.chat_history = []
          
    
          
    for i, chat in enumerate(st.session_state.chat_history):
          
        message(chat["question"], is_user=True, key=f"question_{i}")
          
        message(chat["response"], is_user=False, key=f"response_{i}")
          
    
          
    if transcription:
          
        with st.spinner("正在生成回应..."):
          
            persist_directory = persist_directory_path + "_" + file_name.split(".")[0]
          
            client = chromadb.PersistentClient(path=persist_directory, settings=chroma_setting)
          
            vectorstore = Chroma(embedding_function=embeddings,
          
                                client=client,
          
                                persist_directory=persist_directory,
          
                                collection_name=file_name.split(".")[0],
          
                                client_settings=chroma_setting
          
                                )
          
            response = answer_question(transcription, vectorstore)
          
            st.success("回应已生成")
          
        
          
        aud_file = text_to_audio(response)
          
        st.session_state.chat_history.append({"question": transcription, "response": response})
          
        message(transcription, is_user=True)
          
        message(response, is_user=False)
          
        
          
        st.title("音频播放")
          
        st.audio(aud_file, format='audio/wav', start_time=0)
      

运行 Streamlit 应用程序


        
            

          streamlit run audio\_rag.py
        
      

应用程序处理快照

1.浏览并上传 PDF 文件

picture.image

picture.image

picture.image

picture.image

2.现在向量存储已经加载,点击“启动进程”按钮。

picture.image

3.开始录制您的问题。一旦问题录制完成,RAG 过程将被触发。应用程序屏幕上会显示相应的回答以及对应的音频。

picture.image

4.如果您想在已上传的文档上进行问答,请选择知识来源选择框中的选项。

picture.image

picture.image

5.上传另一个文档。如果您想在当前上传的文档中进行问答,请选择“无”选项。

picture.image

picture.image

6.按下停止按钮以停止进程。

picture.image

7.刷新屏幕以重新开始。

picture.image

结论

在本文中,我们探讨了使用Python开发音频驱动的问答系统(Audio RAG)。我介绍了其中的关键步骤,包括录制音频、将语音转换为文本以及使用RAG模型进行问答。通过结合这些技术,我们可以创建一个强大的系统,使用户能够使用语音命令与知识库进行交互。

欢迎尝试使用不同的库和模型,进一步增强Audio RAG系统的功能。祝您编码愉快!

更多信息

本文由山行翻译整理自: https://medium.com/the-ai-forum/voice-driven-rag-a962754db488,如对您有帮助,请帮忙点赞、转发、关注,谢谢!

0
0
0
0
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论