所有文章 > AI驱动 > 基于 RAG 的 LLM 应用程序构建(一):详述数据源准备工作

基于 RAG 的 LLM 应用程序构建(一):详述数据源准备工作

TLDR

本指南将通过“Ray 技术小助手”的构建过程,为大家展示如何构建一个基于 RAGLLM 应用程序,并分享我们在此过程中遇到的挑战以及我们如何克服这些挑战。通过阅读本文,你将学习到如何:

  • 💻 从头开始开发基于检索增强生成 (RAG) 的 LLM 应用程序。
  • 🚀 在具有不同计算资源的多个工作器之间扩展主要工作负载(加载、分块、嵌入、索引、服务等)。
  • ✅ 评估我们应用程序的不同配置,以优化每个组件(例如 retrieval_score)和整体性能(quality_score)。
  • 🔀 在 OSS 和封闭 LLM 之间实现混合代理路由方法,以创建性能最高且最具成本效益的应用程序。
  • 📦 以高扩展性和高可用性的方式为应用程序提供服务。
  • 💡 了解微调、提示工程、词汇搜索、重新排名、数据飞轮等方法如何影响我们应用程序的性能。

本指南将分为3大部分:

  • 「准备工作」:这部分介绍了如何准备数据源,包括数据加载、内容处理、块数据创建、嵌入数据制作、向量数据库建立等步骤。
  • 「预处理」:这部分重点介绍了如何使用基于检索增强生成(RAG)的 LLM 来扩展应用程序的功能。具体来说,它描述了如何使用RAG技术来查询嵌入查询向量,并使用向量数据库检索前 k 个相关上下文。然后,将查询文本和检索到的上下文文本传递给 LLM,以生成响应。
  • 「跑模型」:这部分讨论了在生产环境中部署服务的注意事项,如应用程序的可扩展性和性能评估。它还提到了一些优化应用程序性能的方法,如微调、提示工程、词汇搜索、重新排名等。最后,它强调了评估和性能优化是开发此类应用程序的关键步骤,并分享了在实践中遇到的挑战及解决办法。

首先,本文将详细介绍该指南的第一部分——「准备工作」

概述

大型语言模型 (LLM) 无疑改变了我们与信息交互的方式。然而,它们在我们可以向其提出要求方面也存在相当大的局限性。

基础 LLM(例如 Llama-2-70b、gpt-4 等)仅知道它们训练过的信息,当我们要求它们知道超出此范围的信息时,它们就会力不从心。基于检索增强生成 (RAG) 的 LLM 应用程序解决了这一确切问题,并将 LLM 的实用性扩展到我们的特定数据源。

在本指南中,我们将构建一个基于 RAGLLM 应用程序,我们将在其中整合外部数据源以增强 LLM 的功能。具体来说,我们将构建一个可以回答有关 Ray (一个用于生产和扩展 ML 工作负载的 Python 框架)问题的助手。

这里的目标是让开发人员更容易采用 Ray,而且正如我们将在本指南中看到的那样,帮助改进我们的 Ray 文档本身并为其他 LLM 应用程序提供基础。我们还将分享我们在此过程中遇到的挑战以及我们如何克服这些挑战。

注意:我们已经概括了整个指南,以便可以轻松扩展它以在您自己的数据之上构建基于 RAGLLM 应用程序。

  1. 将查询传递给嵌入模型,以将其在语义上表示为嵌入查询向量。
  2. 将嵌入查询向量传递给我们的向量数据库。
  3. 检索前 k 个相关上下文 – 通过查询嵌入与我们知识库中所有嵌入块之间的距离来衡量。
  4. 将查询文本和检索到的上下文文本传递给我们的 LLM
  5. LLM 将使用提供的内容生成响应。

除了构建我们的 LLM 应用程序之外,我们还将专注于扩展和在生产中提供服务。与传统机器学习甚至监督式深度学习不同,从一开始,规模就是 LLM 应用程序的瓶颈。大型数据集、模型、计算密集型工作负载、服务要求等。随着我们周围的世界不断发展,我们将开发能够处理任何规模的应用程序。

我们还将专注于评估和性能。我们的应用程序涉及许多可变的部分:嵌入模型、分块逻辑、LLM 本身等,因此,重要的是我们要尝试不同的配置以优化最佳质量响应。但是,评估和定量比较生成任务的不同配置并非易事。我们将分解应用程序各个部分的评估(给定查询的检索、给定源的生成),还评估整体性能(端到端生成)并分享优化配置的结果。

注意:在本指南中,我们将尝试使用不同的 LLM(OpenAI、Llama 等)。您需要 OpenAI 凭据才能访问 ChatGPT 模型和 Anyscale Endpoints(可用的公共和私有终端)来提供 + 微调 OSS LLM

OpenAI 凭据:https://platform.openai.com/account/api-keysAnyscale Endpoints:https://www.anyscale.com/

向量数据库创建

在开始构建 RAG 应用程序之前,我们需要首先创建包含已处理数据源的向量数据库。

数据加载

我们将首先将 Ray 文档从网站加载到本地目录:Ray 文档:https://docs.ray.io/en/master/?

export EFS_DIR=/desired/output/directory
wget -e robots=off --recursive --no-clobber --page-requisites \
--html-extension --convert-links --restrict-file-names=windows \
--domains docs.ray.io --no-parent --accept=html \
-P $EFS_DIR https://docs.ray.io/en/master/

然后,我们将把文档内容加载到 Ray 数据集中,以便可以对它们执行大规模操作(例如嵌入、索引等)。对于大型数据源、模型和应用程序服务需求,规模是 LLM 应用程序的首要任务。我们希望以这样的方式构建我们的应用程序,使它们能够随着我们的需求增长而扩展,而无需我们稍后更改代码。Ray 数据集:https://docs.ray.io/en/latest/data/data.html?

# Ray dataset
DOCS_DIR = Path(EFS_DIR, "docs.ray.io/en/master/")
ds = ray.data.from_items([{"path": path} for path in DOCS_DIR.rglob("*.html")
if not path.is_dir()])
print(f"{ds.count()} documents")

章节

现在我们有了包含所有 html 文件路径的数据集,我们将开发一些可以适当地从这些文件中提取内容的函数。我们希望以一种通用的方式来执行此操作,以便我们可以在所有文档页面中执行此提取(这样您就可以将其用于您自己的数据源)。我们的流程是首先识别 html 页面中的章节,然后提取它们之间的文本。我们将所有这些保存到一个字典列表中,该字典将章节内的文本映射到具有章节锚点 ID 的特定 url。

sample_html_fp = Path(EFS_DIR, "docs.ray.io/en/master/rllib/rllib-env.html")
extract_sections({"path": sample_html_fp})[0]
{'source': 'https://docs.ray.io/en/master/rllib/rllib-env.html#environments', 'text': '\nEnvironments#\nRLlib works with several different types of environments, including Farama-Foundation Gymnasium, user-defined, multi-agent, and also batched environments.\nTip\nNot all environments work with all algorithms. Check out the algorithm overview for more information.\n'}

我们可以使用 Ray Data 的 flat_map 仅用一行代码将此提取过程(extract_section)并行应用于数据集中的所有文件路径。 flat_map:https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.flat_map.html?

# Extract sections
sections_ds = ds.flat_map(extract_sections)
sections = sections_ds.take_all()
print (len(sections))

块数据

我们现在有了一个章节列表(包含每个章节的文本和来源),但我们现在还不应该直接将其用作 RAG 应用程序的上下文。每个章节的文本长度各不相同,而且很多都是相当大的块。

如果我们使用这些大段文本,那么我们就会插入大量嘈杂/不需要的上下文,而且由于所有 LLM 都有最大上下文长度,我们无法容纳太多其他相关上下文。因此,我们将把每个部分中的文本拆分成较小的块。直观地说,较小的块将封装单个/几个概念,与较大的块相比,噪声较少。我们现在将选择一些典型的文本拆分值(例如,chunk_size=300)来创建我们的块,但稍后我们将尝试使用更广泛的值。

from langchain.document_loaders import ReadTheDocsLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Text splitter
chunk_size = 300
chunk_overlap = 50
text_splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", " ", ""],
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
)

# Chunk a sample section
sample_section = sections_ds.take(1)[0]
chunks = text_splitter.create_documents(
texts=[sample_section["text"]],
metadatas=[{"source": sample_section["source"]}])
print (chunks[0])
page_content='ray.tune.TuneConfig.search_alg#\nTuneConfig.search_alg: Optional[Union[ray.tune.search.searcher.Searcher, ray.tune.search.search_algorithm.SearchAlgorithm]] = None#' metadata={'source': 'https://docs.ray.io/en/master/tune/api/doc/ray.tune.TuneConfig.search_alg.html#ray-tune-tuneconfig-search-alg'}

虽然对数据集进行分块相对较快,但让我们将分块逻辑包装到一个函数中,以便我们可以大规模应用工作负载,从而使分块速度与数据源的增长一样快:

def chunk_section(section, chunk_size, chunk_overlap):
text_splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", " ", ""],
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len)
chunks = text_splitter.create_documents(
texts=[sample_section["text"]],
metadatas=[{"source": sample_section["source"]}])
return [{"text": chunk.page_content, "source": chunk.metadata["source"]} for chunk in chunks]

# Scale chunking
chunks_ds = sections_ds.flat_map(partial(
chunk_section,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap))
print(f"{chunks_ds.count()} chunks")
chunks_ds.show(1)
5727 chunks
{'text': 'ray.tune.TuneConfig.search_alg#\nTuneConfig.search_alg: Optional[Union[ray.tune.search.searcher.Searcher, ray.tune.search.search_algorithm.SearchAlgorithm]] = None#', 'source': 'https://docs.ray.io/en/master/tune/api/doc/ray.tune.TuneConfig.search_alg.html#ray-tune-tuneconfig-search-alg'}

嵌入数据

现在我们已经从各个部分创建了小块,我们需要一种方法来识别与给定查询最相关的块。一种非常有效且快速的方法是使用预训练模型嵌入我们的数据,并使用相同的模型嵌入查询。然后,我们可以计算所有块嵌入和我们的查询嵌入之间的距离,以确定前 k 个块。有许多不同的预训练模型可供选择来嵌入我们的数据,但最受欢迎的模型可以通过 HuggingFace 的海量文本嵌入基准 (MTEB) 排行榜发现。这些模型通过诸如下一个/掩码标记预测等任务在非常大的文本语料库上进行了预训练,这使它们能够学习在 N 维中表示子标记并捕获语义关系。我们可以利用这一点来表示我们的数据并确定用于回答给定查询的最相关上下文。我们使用 Langchain 的嵌入包装器(HuggingFaceEmbeddings 和 OpenAIEmbeddings)轻松加载模型并嵌入我们的文档块。

注意:嵌入并不是确定更相关块的唯一方法。我们也可以使用 LLM 来决定!但是,由于 LLM 比这些嵌入模型大得多,并且具有最大上下文长度,因此最好使用嵌入来检索前 k 个块。然后,我们可以在较少的 k 个块上使用 LLM 来确定要用作上下文来回答查询的 <k 个块。我们还可以使用重新排名(例如 Cohere Rerank)来进一步确定要使用的最相关块。我们还可以将嵌入与传统的信息检索方法(例如关键字匹配)相结合,这对于匹配嵌入子标记时可能丢失的唯一标记很有用。

HuggingFace 的海量文本嵌入基准:

https://huggingface.co/spaces/mteb/leaderboard

HuggingFaceEmbeddings:

https://api.python.langchain.com/en/latest/embeddings/langchain.embeddings.huggingface.HuggingFaceEmbeddings.html

OpenAIEmbeddings:

https://api.python.langchain.com/en/latest/embeddings/langchain.embeddings.openai.OpenAIEmbeddings.html

 Cohere Rerank:

https://txt.cohere.com/rerank/

from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import numpy as np
from ray.data import ActorPoolStrategy

class EmbedChunks:
def __init__(self, model_name):
if model_name == "text-embedding-ada-002":
self.embedding_model = OpenAIEmbeddings(
model=model_name,
openai_api_base=os.environ["OPENAI_API_BASE"],
openai_api_key=os.environ["OPENAI_API_KEY"])
else:
self.embedding_model = HuggingFaceEmbeddings(
model_name=model_name,
model_kwargs={"device": "cuda"},
encode_kwargs={"device": "cuda", "batch_size": 100})

def __call__(self, batch):
embeddings = self.embedding_model.embed_documents(batch["text"])
return {"text": batch["text"], "source": batch["source"], "embeddings":
embeddings}

在这里,我们能够使用 map_batches 按比例嵌入我们的块。我们所要做的就是定义 batch_size 和计算(我们使用两个工作器,每个工作器有 1 个 GPU)。

# Embed chunks
embedding_model_name = "thenlper/gte-base"
embedded_chunks = chunks_ds.map_batches(
EmbedChunks,
fn_constructor_kwargs={"model_name": embedding_model_name},
batch_size=100,
num_gpus=1,
compute=ActorPoolStrategy(size=2))
# Sample (text, source, embedding) triplet
[{'text': 'External library integrations for Ray Tune#',
'source': 'https://docs.ray.io/en/master/tune/api/integration.html#external-library-integrations-for-ray-tune',
'embeddings': [
0.012108353897929192,
0.009078810922801495,
0.030281754210591316,
-0.0029687234200537205,
…]
}

索引数据

现在我们有了嵌入的块,我们需要将它们索引(存储)到某个地方,以便我们可以快速检索它们进行推理。虽然有许多流行的向量数据库选项,但我们将使用 Postgres 和 pgvector,因为它简单且性能好。我们将创建一个表(文档)并为每个嵌入的块写入(文本、源、嵌入)三元组。Postgres 和 pgvector:https://github.com/pgvector/pgvector

class StoreResults:
def __call__(self, batch):
with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
register_vector(conn)
with conn.cursor() as cur:
for text, source, embedding in zip
(batch["text"], batch["source"], batch["embeddings"]):
cur.execute("INSERT INTO document (text, source, embedding)
VALUES (%s, %s, %s)", (text, source, embedding,),)
return {}

再次,我们可以利用 Ray Data map_batches 来并行执行此索引:

map_batches :

https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html?

# Index data
embedded_chunks.map_batches(
StoreResults,
batch_size=128,
num_cpus=1,
compute=ActorPoolStrategy(size=28),
).count()

查询检索

在我们的向量数据库中索引了嵌入的块后,我们就可以针对给定的查询执行检索了。首先,我们将使用与嵌入文本块相同的嵌入模型来嵌入传入的查询。

# Embed query
embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)
query = "What is the default batch size for map_batches?"

embedding = np.array(embedding_model.embed_query(query))
len(embedding)
768

然后,我们将通过提取与我们的嵌入式查询最接近的嵌入块来检索前 k 个最相关的块。我们使用余弦距离 (<=>),但有很多选项(https://github.com/pgvector/pgvector#vector-operators)可供选择。一旦我们检索到前 num_chunks,我们就可以收集每个块的文本并将其用作上下文来生成响应。

# Get context
num_chunks = 5
with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
register_vector(conn)
with conn.cursor() as cur:
cur.execute("SELECT * FROM document ORDER BY embedding <-> %s LIMIT %s", (embedding, num_chunks))
rows = cur.fetchall()
context = [{"text": row[1]} for row in rows]
sources = [row[2] for row in rows]
https://docs.ray.io/en/master/data/api/doc/ray.data.Dataset.map_batches.html#ray-data-dataset-map-batches
entire blocks as batches (blocks may contain different numbers of rows).
The actual size of the batch provided to fn may be smaller than
batch_size if batch_size doesn’t evenly divide the block(s) sent
to a given map task. Default batch_size is 4096 with “default”.

https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-size
The default batch size depends on your resource type. If you’re using CPUs,
the default batch size is 4096. If you’re using GPUs, you must specify an explicit batch size.

(cont…)

我们可以将所有这些组合成一个方便的函数:

def semantic_search(query, embedding_model, k):
embedding = np.array(embedding_model.embed_query(query))
with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
register_vector(conn)
with conn.cursor() as cur:
cur.execute("SELECT * FROM document ORDER BY embedding <=> %s LIMIT %s", (embedding, k),)
rows = cur.fetchall()
semantic_context = [{"id": row[0], "text": row[1], "source": row[2]} for row in rows]
return semantic_context

响应生成

现在,我们可以使用上下文从 LLM 生成响应。如果没有检索到的相关上下文,LLM 可能无法准确回答我们的问题。随着数据的增长,我们可以轻松地嵌入和索引任何新数据,并能够检索它来回答问题。

from rag.generate import prepare_response
from rag.utils import get_client

def generate_response(
llm, temperature=0.0, stream=True,
system_content="", assistant_content="", user_content="",
max_retries=1, retry_interval=60):
"""Generate response from an LLM."""
retry_count = 0
client = get_client(llm=llm)
messages = [{"role": role, "content": content} for role, content in [
("system", system_content),
("assistant", assistant_content),
("user", user_content)] if content]
while retry_count <= max_retries:
try:
chat_completion = client.chat.completions.create(
model=llm,
temperature=temperature,
stream=stream,
messages=messages,
)
return prepare_response(chat_completion, stream=stream)

except Exception as e:
print(f"Exception: {e}")
time.sleep(retry_interval) # default is per-minute rate limits
retry_count += 1
return ""

注意:我们使用 0.0 的温度来启用可重复的实验,但您应该根据您的用例进行调整。对于需要始终以事实为依据的用例,我们建议使用非常低的温度值,而更具创造性的任务可以从更高的温度下受益。

# Generate response
query = "What is the default batch size for map_batches?"
response = generate_response(
llm="meta-llama/Llama-2-70b-chat-hf",
temperature=0.0,
stream=True,
system_content="Answer the query using the context provided. Be succinct.",
user_content=f"query: {query}, context: {context}")
The default batch size for map_batches is 4096.

让我们将上下文检索和响应生成结合在一起,形成一个方便的查询代理,我们可以使用它轻松生成响应。这将负责设置我们的代理(嵌入和 LLM 模型)以及上下文检索,并将其传递给我们的 LLM 以生成响应。

class QueryAgent:
def __init__(self, embedding_model_name="thenlper/gte-base",
llm="meta-llama/Llama-2-70b-chat-hf", temperature=0.0,
max_context_length=4096, system_content="", assistant_content=""):

# Embedding model
self.embedding_model = get_embedding_model(
embedding_model_name=embedding_model_name,
model_kwargs={"device": "cuda"},
encode_kwargs={"device": "cuda", "batch_size": 100})

# Context length (restrict input length to 50% of total length)
max_context_length = int(0.5*max_context_length)

# LLM
self.llm = llm
self.temperature = temperature
self.context_length = max_context_length - get_num_tokens(system_content + assistant_content)
self.system_content = system_content
self.assistant_content = assistant_content

def __call__(self, query, num_chunks=5, stream=True):
# Get sources and context
context_results = semantic_search(
query=query,
embedding_model=self.embedding_model,
k=num_chunks)

# Generate response
context = [item["text"] for item in context_results]
sources = [item["source"] for item in context_results]
user_content = f"query: {query}, context: {context}"

answer = generate_response(
llm=self.llm,
temperature=self.temperature,
stream=stream,
system_content=self.system_content,
assistant_content=self.assistant_content,
user_content=trim(user_content, self.context_length))

# Result
result = {
"question": query,
"sources": sources,
"answer": answer,
"llm": self.llm,
}
return result

有了这个,我们只需几行就可以使用我们的 RAG 应用程序:

llm = "meta-llama/Llama-2-7b-chat-hf"
agent = QueryAgent(
embedding_model_name="thenlper/gte-base",
llm=llm,
max_context_length=MAX_CONTEXT_LENGTHS[llm],
system_content="Answer the query using the context provided. Be succinct.")
result = agent(query="What is the default batch size for map_batches?")
print("\n\n", json.dumps(result, indent=2))
The default batch size for `map_batches` is 4096

{
"question": "What is the default batch size for map_batches?",
"sources": [
"ray.data.Dataset.map_batches — Ray 2.7.1",
"Transforming Data — Ray 2.7.1",
"Ray Data Internals — Ray 2.7.1",
"Dynamic Request Batching — Ray 2.7.1",
"Image Classification Batch Inference with PyTorch — Ray 2.7.1"
],
"answer": "The default batch size for `map_batches` is 4096",
"llm": "meta-llama/Llama-2-7b-chat-hf"
}

评估

到目前为止,我们已经为 RAG 应用程序的各个部分选择了典型/任意值。但是,如果我们要更改某些内容,例如分块逻辑、嵌入模型、LLM 等,我们如何知道我们拥有比以前更好的配置?像这样的生成任务很难进行定量评估,因此我们需要开发可靠的方法来进行评估。由于我们的应用程序中有许多可变组件,因此我们需要执行单元/组件和端到端评估。组件评估可能涉及单独评估我们的检索(是我们检索到的一组块中的最佳来源)和评估我们的 LLM 响应(给定最佳来源,LLM 是否能够产生高质量的答案)。对于端到端评估,我们可以评估整个系统的质量(给定数据源,响应的质量如何)。我们将要求我们的评估员 LLM 使用上下文对回答的质量进行 1-5 之间的评分,但是,我们也可以让它为其他维度生成分数,例如幻觉(仅使用提供的上下文中的信息生成的答案)、毒性等。注意:我们可以将分数限制为二进制(0/1),这可能更容易解释(例如,回答要么正确要么不正确)。但是,我们在分数中引入了更高的方差,以便更深入、更细致地了解 LLM 如何对回答进行评分(例如,LLM 对回答的偏见)。

检索系统和 LLM 的组件评估(左),总体评估(右)。

评估器我们将从确定评估器开始。给定查询的响应和相关上下文,我们的评估器应该是一种值得信赖的方法来评分/评估响应的质量。但在确定评估器之前,我们需要一个问题数据集和答案的来源。我们可以使用此数据集要求不同的评估者提供答案,然后对他们的答案进行评分(例如,分数在 1-5 之间)。然后,我们可以检查此数据集以确定我们的评估者是否公正,并且对分配的分数有合理的推理。注意:我们正在评估我们的 LLM 在给定相关上下文的情况下生成响应的能力。这是一个组件级评估(quality_score (LLM)),因为我们没有使用检索来获取相关上下文。我们将从手动创建数据集开始(如果您无法手动创建数据集,请继续阅读)。我们有一个用户查询列表和回答查询的理想来源 datasets/eval-dataset-v1.jsonl。我们将使用上面的 LLM 应用程序通过 GPT-4 为每个查询/源对生成参考答案。

datasets/eval-dataset-v1.jsonl:

https://github.com/ray-project/llm-applications/blob/main/datasets/eval-dataset-v1.jsonl

with open(Path(ROOT_DIR, "datasets/eval-dataset-v1.jsonl"), "r") as f:
data = [json.loads(item) for item in list(f)]
[{'question': 'I’m struggling a bit with Ray Data type conversions when I do map_batches. Any advice?',
'source': 'https://docs.ray.io/en/master/data/transforming-data.html'},

{'question': 'Is Ray integrated with DeepSpeed?',
'source': 'https://docs.ray.io/en/master/ray-air/examples/gptj_deepspeed_fine_tuning.html#fine-tuning-the-model-with-ray-air-a-name-train-a'}]

每个数据点都有一个问题,并且标记的源具有与问题答案相关的精确上下文:

# Sample
uri = "https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-format"
fetch_text(uri=uri)
'\nConfiguring batch format#\nRay Data represents batches as dicts of NumPy ndarrays or pandas DataFrames. …'
我们可以从此上下文中提取文本并将其传递给我们的 LLM 以生成问题的答案。我们还将要求它对查询的响应质量进行评分。为此,我们定义了一个继承自 QueryAgent 的 QueryAgentWithContext,不同之处在于我们提供上下文,它不需要检索它。
class QueryAgentWithContext(QueryAgent):
def __call__(self, query, context):
user_content = f"query: {query}, context: {context}"
response = generate_response(
llm=self.llm,
temperature=self.temperature,
stream=True,
system_content=self.system_content,
assistant_content=self.assistant_content,
user_content=user_content[: self.context_length])
return response

现在,我们可以创建一个包含问题、来源、答案、分数和推理的数据集。我们可以检查它以确定我们的评估器是否高质量。

  • 问题:“当我执行 map_batches 时,我对 Ray Data 类型转换有点困惑。有什么建议吗?”
  • 来源:“https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-format”
  • 答案:“您可以通过在 ‘map_batches()’ 函数中指定 ‘batch_format’ 来配置 Ray Data 中的批处理类型。如果您使用的是 NumPy ndarray,则您的函数应该返回 ndarrays 的字典。如果您使用的是 pandas DataFrames,则您的函数应该返回 DataFrame。确保您的函数根据您指定的 ‘batch_format’ 返回正确的类型。”
  • 得分:5
  • 理由:“上下文提供了有关如何在 Ray Data 中配置批处理类型以及如何使用 ‘map_batches()’ 函数的明确说明。它还提供了 NumPy 和 pandas 的示例,可直接回答查询。”

根据它提供的分数和推理,我们发现 GPT-4 是一款高质量的评估器。我们对其他 LLM(例如 Llama-2-70b)进行了同样的评估,发现它们缺乏适当的推理,并且非常慷慨地给出了自己的答案。

EVALUATOR = "gpt-4"

注意:更彻底的评估还会通过要求评估者比较以下不同 LLM 的回答来测试以下内容:

  • 位置(我们首先显示哪些回答)
  • 冗长程度(较长的回答更受青睐)
  • 裙带关系(例如 GPT4 更喜欢 GPT 3.5,等等)

冷启动我们可能并不总是有准备好的问题数据集和随时可用的最佳来源来回答该问题。为了解决这个冷启动问题,我们可以使用 LLM 查看我们的文本块并生成特定块将回答的问题。这为我们提供了高质量的问题和答案的确切来源。但是,这种数据集生成方法可能会有点嘈杂。生成的问题可能并不总是与用户可能提出的问题高度一致。我们所说的最佳来源的特定块也可能在其他块中具有该确切信息。尽管如此,在我们收集 + 手动标记高质量数据集的同时,这仍然是开始我们的开发过程的好方法。

# Prompt
num_questions = 3
system_content = f"""
Create {num_questions} questions using only the context provided.
End each question with a '?' character and then in a newline write the answer to that question using only the context provided.
Separate each question/answer pair by a newline.
"""

# Generate questions
synthetic_data = []
for chunk in chunks[:1]: # small samples
response = generate_response(
llm="gpt-4",
temperature=0.0,
system_content=system_content,
user_content=f"context: {chunk.page_content}")
entries = response.split("\n\n")
for entry in entries:
question, answer = entry.split("\n")
synthetic_data.append({"question": question, "source": chunk.metadata["source"], "answer": answer})
synthetic_data[:3]
[{'question': 'What can you use to monitor and debug your Ray applications and clusters?',
'source': 'https://docs.ray.io/en/master/ray-observability/reference/index.html#reference',
'answer': 'You can use the API and CLI documented in the references to monitor and debug your Ray applications and clusters.'},
{'question': 'What are the guides included in the references?',
'source': 'https://docs.ray.io/en/master/ray-observability/reference/index.html#reference',
'answer': 'The guides included in the references are State API, State CLI, and System Metrics.'},
{'question': 'What are the two types of interfaces mentioned for monitoring and debugging Ray applications and clusters?',
'source': 'https://docs.ray.io/en/master/ray-observability/reference/index.html#reference',
'answer': 'The two types of interfaces mentioned for monitoring and debugging Ray applications and clusters are API and CLI.'}]

本文章转载微信公众号@Ray中文社区

#你可能也喜欢这些API文章!