
全网最详细的Spring入门教程
在上一篇文章中,我们深入探讨了使用 LangChain 进行检索增强生成 (RAG) 的基础知识。我们探索了核心概念,构建了一个基本的 RAG 系统,并在 Jupyter 笔记本环境中演示了其功能。虽然这种方法非常适合原型设计和理解底层机制,但它还没有完全准备好用于实际应用。(但它并不完全适用于实际应用。)
今天,我们采取了下一个关键步骤:将我们的 RAG 原型转变为生产就绪型 API。我们将使用 FastAPI,这是一个现代、快速(高性能)的 Web 框架,用于使用 Python 构建 API。FastAPI 特别适合我们的需求,因为它的速度、易用性和对异步编程的内置支持。(FastAPI因其速度、易用性和对异步编程的内置支持而特别适合我们的需求。)
在本教程中,我们将创建一个强大的 API,它提供以下功能:
我们将以模块化、可维护的方式构建我们的应用程序,使其易于在生产环境中扩展和部署。
在我们深入研究之前,请确保您拥有以下内容:
首先,让我们设置我们的项目环境。为您的项目创建一个新目录并导航到该目录:
mkdir rag-fastapi-project
cd rag-fastapi-project
现在,让我们安装必要的软件包。创建包含以下内容的文件:requirements.txt
langchain
langchain-openai
langchain-core
langchain_community
docx2txt
pypdf
langchain_chroma
python-multipart
fastapi
uvicorn
使用 pip 安装这些软件包:
pip install -r requirements.txt
设置好环境后,我们就可以开始构建生产就绪的 RAG 聊天机器人 API。在下一节中,我们将深入研究项目结构并开始实现我们的 FastAPI 应用程序。
当然!让我们继续下一部分,在那里我们将讨论项目结构概述。本节将帮助读者了解我们如何组织代码以提高可维护性和可伸缩性。(当然!让我们进入下一部分,我们将讨论项目结构概述。这一节将帮助读者了解我们如何组织代码以实现更好的可维护性和可扩展性。)
在从原型向生产就绪型应用过渡的过程中,合理的代码组织变得至关重要。结构良好的项目更易于维护、测试和扩展。对于我们的RAG聊天机器人API,我们将采用模块化结构,以分离关注点并促进代码重用。
以下是我们项目结构的概述:
rag-fastapi-project/
│
├── main.py
├── chroma_utils.py
├── db_utils.py
├── langchain_utils.py
├── pydantic_models.py
├── requirements.txt
└── chroma_db/ (directory for Chroma persistence)
让我们分解每个文件的用途:
此结构遵循 FastAPI 应用程序的最佳实践,并为构建我们的 RAG 聊天机器人 API 提供了坚实的基础。在本教程中,我们将深入研究这些文件,解释它们的内容以及它们如何协同工作以创建我们的生产就绪系统。
main.py 文件是我们 FastAPI 应用程序的核心。它定义了我们的 API 端点并编排了我们系统的不同组件之间的交互。让我们分解此文件的关键元素:(main.py文件是我们FastAPI应用的核心。它定义了我们的API端点,并协调我们系统不同组件之间的交互。让我们分解此文件的关键元素:)
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic_models import QueryInput, QueryResponse, DocumentInfo, DeleteFileRequest
from langchain_utils import get_rag_chain
from db_utils import insert_application_logs, get_chat_history, get_all_documents, insert_document_record, delete_document_record
from chroma_utils import index_document_to_chroma, delete_doc_from_chroma
import os
import uuid
import logging
import shutil
# Set up logging
logging.basicConfig(filename='app.log', level=logging.INFO)
# Initialize FastAPI app
app = FastAPI()
在这里,我们导入必要的模块并初始化我们的 FastAPI 应用程序。我们还设置了基本日志记录来跟踪应用程序中的重要事件。
现在,让我们看看我们的主要 API 端点:
@app.post("/chat", response_model=QueryResponse)
def chat(query_input: QueryInput):
session_id = query_input.session_id or str(uuid.uuid4())
logging.info(f"Session ID: {session_id}, User Query: {query_input.question}, Model: {query_input.model.value}")
chat_history = get_chat_history(session_id)
rag_chain = get_rag_chain(query_input.model.value)
answer = rag_chain.invoke({
"input": query_input.question,
"chat_history": chat_history
})['answer']
insert_application_logs(session_id, query_input.question, answer, query_input.model.value)
logging.info(f"Session ID: {session_id}, AI Response: {answer}")
return QueryResponse(answer=answer, session_id=session_id, model=query_input.model)
此终端节点处理聊天交互。如果未提供,它会生成会话 ID,检索聊天记录,调用 RAG 链以生成响应,记录交互并返回响应。(此端点处理聊天交互。如果未提供会话ID,则生成一个会话ID,检索聊天历史,调用RAG链生成响应,记录交互,并返回响应。)
@app.post("/upload-doc")
def upload_and_index_document(file: UploadFile = File(...)):
allowed_extensions = ['.pdf', '.docx', '.html']
file_extension = os.path.splitext(file.filename)[1].lower()
if file_extension not in allowed_extensions:
raise HTTPException(status_code=400, detail=f"Unsupported file type. Allowed types are: {', '.join(allowed_extensions)}")
temp_file_path = f"temp_{file.filename}"
try:
# Save the uploaded file to a temporary file
with open(temp_file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
file_id = insert_document_record(file.filename)
success = index_document_to_chroma(temp_file_path, file_id)
if success:
return {"message": f"File {file.filename} has been successfully uploaded and indexed.", "file_id": file_id}
else:
delete_document_record(file_id)
raise HTTPException(status_code=500, detail=f"Failed to index {file.filename}.")
finally:
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
此终端节点处理文档上传。它会检查允许的文件类型,临时保存文件,在 Chroma 中为其编制索引,并更新数据库中的文档记录。
@app.get("/list-docs", response_model=list[DocumentInfo])
def list_documents():
return get_all_documents()
这个简单的端点返回所有索引文档的列表。
@app.post("/delete-doc")
def delete_document(request: DeleteFileRequest):
chroma_delete_success = delete_doc_from_chroma(request.file_id)
if chroma_delete_success:
db_delete_success = delete_document_record(request.file_id)
if db_delete_success:
return {"message": f"Successfully deleted document with file_id {request.file_id} from the system."}
else:
return {"error": f"Deleted from Chroma but failed to delete document with file_id {request.file_id} from the database."}
else:
return {"error": f"Failed to delete document with file_id {request.file_id} from Chroma."}
此端点处理文档删除,从 Chroma 和数据库中删除文档。
Pydantic是一个数据验证库,它使用Python类型注解来定义数据模式。在我们的FastAPI应用中,我们使用Pydantic模型来定义请求和响应数据的结构。让我们分解在models.py中定义的模型:
from pydantic import BaseModel, Field
from enum import Enum
from datetime import datetime
class ModelName(str, Enum):
GPT4_O = "gpt-4o"
GPT4_O_MINI = "gpt-4o-mini"
class QueryInput(BaseModel):
question: str
session_id: str = Field(default=None)
model: ModelName = Field(default=ModelName.GPT4_O_MINI)
class QueryResponse(BaseModel):
answer: str
session_id: str
model: ModelName
class DocumentInfo(BaseModel):
id: int
filename: str
upload_timestamp: datetime
class DeleteFileRequest(BaseModel):
file_id: int
让我们看看每个模型及其用途:
ModelName
(枚举):
QueryInput
:
question
:用户的问题 (必填)。session_id
:可选会话 ID。如果未提供,将生成一个。model
:要使用的语言模型,默认为 GPT4_O_MINI。QueryResponse
:
answer
:生成的答案。session_id
:会话 ID(用于继续对话)。model
:用于生成响应的模型。DocumentInfo
:
id
:文档的唯一标识符。filename
:上传文件的名称。upload_timestamp
:上传文档并为其编制索引的时间。DeleteFileRequest
:
file_id
:要删除的文档的 ID。在我们的 main.py 中,我们使用这些模型来定义请求和响应数据的形状。例如:
@app.post("/chat", response_model=QueryResponse)
def chat(query_input: QueryInput):
# Function implementation
在这里,FastAPI 用于验证传入的请求数据,以及验证和序列化响应。这可确保我们的 API 行为一致,并在提供无效数据时提供清晰的错误消息。(在这里,FastAPI使用Pydantic模型来验证传入的请求数据,并验证和序列化响应。这确保了我们的API行为一致,并在提供无效数据时提供清晰的错误消息,QueryInput和QueryResponse)QueryInputQueryResponse
随着我们的API不断发展,我们可以轻松地扩展这些模型。例如,如果我们想为我们的文档信息添加更多元数据,只需向模型中添加字段即可:DocumentInfo
class DocumentInfo(BaseModel):
id: int
filename: str
upload_timestamp: datetime
file_size: int # New field
content_type: str # New field
FastAPI 和 Pydantic 将自动处理新字段,提供验证和文档,而无需对我们的端点逻辑进行任何更改。
通过使用 Pydantic 模型,我们为 API 创建了强大的基础,确保了数据的完整性,并为我们的端点提供了清晰的契约。这种方法显著减少了我们需要编写的手动验证代码的数量,并有助于防止与错误数据处理相关的漏洞。
(utils.py
文件包含用于与我们的 SQLite 数据库交互的函数。我们使用 SQLite 是因为它的简单性和易于设置,使其非常适合原型设计和中小型应用程序。让我们分解此文件的关键组件:db_
utils.py
文件包含与SQLite数据库交互的函数。我们选择SQLite是因为其简单性和易于设置的特性,使其非常适合原型设计和中小型应用程序。让我们分解该文件的关键组件:)
import sqlite3
from datetime import datetime
DB_NAME = "rag_app.db"
def get_db_connection():
conn = sqlite3.connect(DB_NAME)
conn.row_factory = sqlite3.Row
return conn
我们首先导入必要的模块并定义数据库名称。get_db_connection()
函数用于创建与SQLite数据库的连接,并将行工厂设置为sqlite3.Row
,以便更容易地访问数据。
def create_application_logs():
conn = get_db_connection()
conn.execute('''CREATE TABLE IF NOT EXISTS application_logs
(id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT,
user_query TEXT,
gpt_response TEXT,
model TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
conn.close()
def create_document_store():
conn = get_db_connection()
conn.execute('''CREATE TABLE IF NOT EXISTS document_store
(id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT,
upload_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
conn.close()
这些函数创建了我们的两个主要表:
application_logs
:存储聊天历史记录和模型响应。document_store
:跟踪上传的文档。def insert_application_logs(session_id, user_query, gpt_response, model):
conn = get_db_connection()
conn.execute('INSERT INTO application_logs (session_id, user_query, gpt_response, model) VALUES (?, ?, ?, ?)',
(session_id, user_query, gpt_response, model))
conn.commit()
conn.close()
def get_chat_history(session_id):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT user_query, gpt_response FROM application_logs WHERE session_id = ? ORDER BY created_at', (session_id,))
messages = []
for row in cursor.fetchall():
messages.extend([
{"role": "human", "content": row['user_query']},
{"role": "ai", "content": row['gpt_response']}
])
conn.close()
return messages
这些函数处理插入新的聊天日志和检索给定会话的聊天历史记录。聊天记录的格式设置为我们的 RAG 系统易于使用。
def insert_document_record(filename):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('INSERT INTO document_store (filename) VALUES (?)', (filename,))
file_id = cursor.lastrowid
conn.commit()
conn.close()
return file_id
def delete_document_record(file_id):
conn = get_db_connection()
conn.execute('DELETE FROM document_store WHERE id = ?', (file_id,))
conn.commit()
conn.close()
return True
def get_all_documents():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT id, filename, upload_timestamp FROM document_store ORDER BY upload_timestamp DESC')
documents = cursor.fetchall()
conn.close()
return [dict(doc) for doc in documents]
这些函数处理文档记录的 CRUD 操作:
在文件末尾,我们初始化我们的数据库表:
# Initialize the database tables
create_application_logs()
create_document_store()
这可确保在应用程序启动时创建我们的表(如果它们尚不存在)。(这确保了当应用程序启动时(如果表尚不存在),我们的表会被创建。)
通过将数据库操作集中在utils.py
中,我们保持了关注点的清晰分离。我们的主要应用程序逻辑无需担心数据库交互的细节,从而使代码更加模块化且易于维护。
在生产环境中,您可以考虑使用像 SQLAlchemy 这样的 ORM(对象关系映射)库来实现更复杂的数据库操作和更好的可扩展性。但是,对于我们当前的需求,这种简单的 SQLite 实现效果很好。(在生产环境中,对于更复杂的数据库操作和更好的可扩展性,您可能会考虑使用像SQLAlchemy这样的ORM(对象关系映射)库。然而,对于我们当前的需求,这种直接的SQLite实现已经足够好了。)
utils.py
文件包含与Chroma向量存储交互的函数,这对于我们的检索增强型生成(RAG)系统的检索功能至关重要。让我们分解该文件的关键组件:
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, UnstructuredHTMLLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from typing import List
from langchain_core.documents import Document
import os
# Initialize text splitter and embedding function
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200, length_function=len)
embedding_function = OpenAIEmbeddings()
# Initialize Chroma vector store
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embedding_function)
在这里,我们导入了必要的模块,并初始化了文本分割器、嵌入函数和Chroma向量存储。RecursiveCharacterTextSplitter
用于将文档分割成可管理的块,而OpenAIEmbeddings
则提供了我们的文档嵌入函数。
def load_and_split_document(file_path: str) -> List[Document]:
if file_path.endswith('.pdf'):
loader = PyPDFLoader(file_path)
elif file_path.endswith('.docx'):
loader = Docx2txtLoader(file_path)
elif file_path.endswith('.html'):
loader = UnstructuredHTMLLoader(file_path)
else:
raise ValueError(f"Unsupported file type: {file_path}")
documents = loader.load()
return text_splitter.split_documents(documents)
此函数处理加载不同的文档类型(PDF、DOCX、HTML)并将它们拆分为块。它根据文件扩展名使用适当的加载器,然后应用我们的文本拆分器来创建可管理的文档块。
def index_document_to_chroma(file_path: str, file_id: int) -> bool:
try:
splits = load_and_split_document(file_path)
# Add metadata to each split
for split in splits:
split.metadata['file_id'] = file_id
vectorstore.add_documents(splits)
return True
except Exception as e:
print(f"Error indexing document: {e}")
return False
此函数采用文件路径和文件 ID,加载并拆分文档,将元数据(文件 ID)添加到每个拆分中,然后将这些文档块添加到我们的 Chroma 矢量存储中。元数据允许我们将 vector store 条目链接回我们的数据库记录。(此函数接受文件路径和文件ID,加载并分割文档,为每个分割添加元数据(文件ID),然后将这些文档块添加到我们的Chroma向量存储中。元数据使我们能够将向量存储条目链接回我们的数据库记录。)
def delete_doc_from_chroma(file_id: int):
try:
docs = vectorstore.get(where={"file_id": file_id})
print(f"Found {len(docs['ids'])} document chunks for file_id {file_id}")
vectorstore._collection.delete(where={"file_id": file_id})
print(f"Deleted all documents with file_id {file_id}")
return True
except Exception as e:
print(f"Error deleting document with file_id {file_id} from Chroma: {str(e)}")
return False
此函数从 Chroma 矢量存储中删除与给定文件 ID 关联的所有文档块。它首先检索文档以确认其存在,然后执行删除。
尽管在utils.py
文件中没有明确显示,但Chroma向量存储对于我们的RAG系统的检索步骤至关重要。在utils.py
中,我们使用此向量存储来创建一个检索器:
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
然后,在我们的 RAG 链中使用此检索器,以根据用户的查询获取相关的文档块。
通过将我们的向量存储操作集中在utils.py
中,我们保持了关注点的清晰分离,并在未来需要时更容易替换或升级我们的向量存储实现。
utils.py
文件是我们使用LangChain实现检索增强型生成(RAG)系统核心的地方。此文件设置了语言模型、检索器和RAG链。让我们分解其关键组件:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from typing import List
from langchain_core.documents import Document
import os
from chroma_utils import vectorstore
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
output_parser = StrOutputParser()
在这里,我们导入必要的 LangChain 组件,并使用我们之前创建的 Chroma vectorstore 设置我们的检索器。我们还初始化了一个字符串输出解析器,用于处理语言模型的输出。
contextualize_q_system_prompt = (
"Given a chat history and the latest user question "
"which might reference context in the chat history, "
"formulate a standalone question which can be understood "
"without the chat history. Do NOT answer the question, "
"just reformulate it if needed and otherwise return it as is."
)
contextualize_q_prompt = ChatPromptTemplate.from_messages([
("system", contextualize_q_system_prompt),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
qa_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful AI assistant. Use the following context to answer the user's question."),
("system", "Context: {context}"),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{input}")
])
我们定义了两个主要提示:
contextualize_q_prompt
:用于根据聊天记录重新构建用户的问题。qa_prompt
:用于根据检索到的上下文和聊天历史记录生成最终答案。def get_rag_chain(model="gpt-4o-mini"):
llm = ChatOpenAI(model=model)
history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_q_prompt)
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
return rag_chain
此函数创建我们的 RAG 链:
ChatOpenAI
在我们的main.py中,我们在聊天端点使用了这个RAG链:
@app.post("/chat", response_model=QueryResponse)
def chat(query_input: QueryInput):
# ... (other code)
rag_chain = get_rag_chain(query_input.model.value)
answer = rag_chain.invoke({
"input": query_input.question,
"chat_history": chat_history
})['answer']
# ... (rest of the function)
这显示了如何使用用户指定的模型实例化 RAG 链,以及如何使用用户的问题和聊天记录调用。
通过将我们的 LangChain 逻辑集中在 (通过将LangChain逻辑集中在utils.py中,我们保持了关注点的清晰分离,并使得未来修改或扩展RAG系统变得更加容易。这种模块化方法使我们能够轻松尝试不同的模型、检索器或链结构,而不会影响应用程序的其他部分。)utils.py
中,我们保持了清晰的关注点分离,并使得将来更容易修改或扩展我们的 RAG 系统。这种模块化方法使我们能够轻松地试验不同的模型、检索器或链结构,而不会影响应用程序的其余部分。langchain_
在本教程中,我们逐步了解了如何使用FastAPI和LangChain构建一个生产就绪的检索增强生成(RAG)聊天机器人。让我们回顾一下我们所完成的工作,并讨论一些关键收获和可能的下一步行动。
此架构允许可在生产环境中部署可扩展、可维护和可扩展的 RAG 系统。
构建生产就绪型 RAG 聊天机器人涉及的不仅仅是将语言模型连接到文档存储。它需要仔细考虑数据流、错误处理、可扩展性和用户体验。我们构建的系统提供了坚实的基础,可以进行调整和扩展以满足特定的业务需求。(我们所构建的系统提供了一个坚实的基础,可以根据特定的业务需求进行适应和扩展。)
随着AI和自然语言处理技术的不断发展,像这样的系统对于创建智能的、支持上下文感知的应用将变得越来越重要。通过了解RAG系统的原理和组件,您将在自己的项目中构建和改进这项技术方面做好准备。
请记住,成功的RAG系统的关键不仅在于各个组件本身,还在于它们如何协同工作以创建无缝、智能的交互。基于实际使用的持续测试、监控和精炼对于确保您的RAG聊天机器人的长期成功和有效性至关重要。
为了帮助您进一步了解和实施这个 RAG 聊天机器人系统,我准备了一些额外的资源:
在 FutureSmart AI,我们专注于根据您的特定需求构建定制的自然语言处理 (NLP) 解决方案。我们的专业知识不仅限于 RAG 系统,还包括:
我们已经成功地为各个行业实施了这些技术,帮助企业利用 AI 的强大功能来增强其运营和用户体验。
无论您是希望实施像我们在本教程中构建的 RAG 系统,还是有更具体的 NLP 需求,我们 FutureSmart AI 的团队都可以帮助您将 AI 愿望变为现实。(无论您是想实施像本教程中构建的RAG系统,还是有更具体的NLP需求,FutureSmart AI的团队都在这里帮助您将AI愿景变为现实。)
原文链接:https://blog.futuresmart.ai/building-a-production-ready-rag-chatbot-with-fastapi-and-langchain