向量存储是检索增强生成(RAG)系统中的关键部分。本质上,“向量存储”只是一个数据库,用于存储向量、文本、元数据、ID以及其他应用特定的数据。通用关系型数据库引擎与向量存储之间的区别在于,后者经过专门设计,以优化高维度数据的检索效率。在大规模应用中,这种差异更加明显;想象一个需要扫描成千上万份文档的应用程序,每份文档包含数千页内容,这些文档存储在多个不同的向量存储中。
存在一些用例不需要最大化效率。有些应用程序仅引用几个私人文档,或者开发人员正在构建一个概念验证(Proof of Concept,简称PoC)来展示功能。对于这些应用程序,很容易陷入无底洞,即评估所有可用工具、比较它们并做出决策(如果你陷入其中,我建议点击这篇文章来了解各种向量数据库的评估标准)。另一种途径是直接使用可用的工具。如果团队有一个 Google Cloud 环境,这些应用程序的开发选择就变得相对简单。我们可以使用 Vertex AI 来构建模型,同时使用 BigQuery 作为向量数据库和检索引擎。幸运的是,LangChain 框架提供了 Python 库,使得开发变得非常简单。
我们将要开发一个简单的Python程序,使用Streamlit前端、FastAPI后端、Langchain和Google Cloud服务来针对2024 NFL规则手册中的问题进行提问。我们将使用Docker容器来运行这个应用。(可在此处下载:2024 NFL规则手册)
此应用程序的完整版本位于GitHub,但在本教程中我们将逐步构建它。需要一个启用了BigQuery和Vertex AI API功能的Google Cloud项目。注意:这将产生数据存储和模型推理相关的费用,建议在教程结束后删除该项目。
以下是期望的完整应用程序结构;但是,本教程主要关注的是支持RAG应用程序的后端代码。
以下为文件和目录结构:
└── bq_vector_store └── client └── app.py └── Dockerfile └── requirements.txt └── server └── app.py └── Dockerfile └── requirements.txt └── .env └── .env.example └── .gitignore └── docker-compose.yaml └── README.md
首先,我们需要安装所有必要的Python库。这些库可以一个一个地安装,也可以通过一个需求文件进行批量安装。
/server/requirements.txt
fastapi langchain langchain-google-vertexai langchain-google-community[featurestore] langchain_community uvicorn pdfplumber
app.py
文件将是 FastAPI 应用程序。这里发生所有事情。我们将创建一个 BigQuery 向量存储,设置模型接口,接受上传至向量存储的文件,并根据向量存储中的文件提问。运行应用程序只需要这两条路径;/upload_pdf
和 /ask_question
。
在我们开始讨论或查看那些API路径之前,我们需要配置我们的模型和BigQuery向量存储。为此,我们可以使用LangChain为托管在Vertex AI上的模型创建客户端对象。请注意,PROJECT_ID
将专门针对您的Google Cloud环境。
服务器上的应用程序文件路径 /服务器/app.py
(服务器端的应用程序文件) (服务器上的 app.py
文件)(服务器上的应用程序文件)
从langchain_google_vertexai导入VertexAIEmbeddings, VertexAI embedding = VertexAIEmbeddings(model_name='textembedding-gecko@latest', project=PROJECT_ID) llm = VertexAI(model_name='gemini-1.5-flash-002')
接下来,我们可以创建一个利用 BigQuery 的 Vector Store 对象。这些常量将根据应用程序的具体情况而定。创建完 BigQueryVectorStore 对象后,我们可以将其实例化为检索器。这样我们就可以调用检索器来获取基于查询的上下文。search_kwargs
参数定义了从向量存储中返回多少文本片段。
/server/app.py
从 langchain_google_community 导入 BigQueryVectorStore 作为 BigQueryVectorStore store = BigQueryVectorStore( project_id=PROJECT_ID, # 项目ID dataset_name=DATASET, # 数据集名称 table_name=TABLE, # 表名称 location=REGION, # 地域 embedding=embedding, # 嵌入 ) retriever = store.as_retriever( search_kwargs={ # 搜索参数 "k": 10 } )
现在基础已经设置好(嵌入模型、大型语言模型和向量存储),我们现在可以开始添加聊天记录和一些标准的模型训练提示。这个“指导”是提示工程中的一个环节,它指示大型语言模型使用我们向量存储中的信息。我们将分步骤来解释每个提示和链,进一步解释每个提示和链。
/server/app.py
from langchain_core.prompts import MessagesPlaceholder from langchain_core.prompts import ChatPromptTemplate from langchain.chains import create_history_aware_retriever system_prompt = ( "你是一个用于问答任务的助手。" "使用以下检索到的信息来回答问题。" "如果不知道答案,就说不知道。" "\n\n" "{context}" ) contextualize_q_system_prompt = ( "给定一个聊天历史和最新的用户问题,该问题可能引用了聊天历史中的上下文," "制定一个独立且自包含的问题,这个问题可以在没有聊天历史的情况下被理解。" "不要回答问题,如果需要的话,重新制定问题,否则就按原样返回。" ) contextualize_q_prompt = ChatPromptTemplate.from_messages( [ ("系统", contextualize_q_system_prompt), MessagesPlaceholder("chat_history"), ("用户", "{input}"), ] ) history_aware_retriever = create_history_aware_retriever( llm, retriever, contextualize_q_prompt )
上面的 system_prompt
是一个基本提示,告诉模型如何表现。稍后的代码中会插入从向量存储中检索到的 {context}
。
contextualize_q_system_prompt
将作为我们在聊天历史中的虚拟代理。通过查看提示,我们让模型如果认为我们是在问后续问题的话,重新表述这个问题。
这些提示随后被整合到一个更广泛的提示contextualize_q_prompt
中。可以将其视为一系列逐步的指令。首先,我们使用contextualize_q_system_prompt
提示模型利用聊天历史,然后我们用MessagesPlaceholder("chat_history")
获取之前的聊天消息,最后,("human", "{input}")
这一部分将是我们要提问的关于NFL比赛规则的问题的位置。
上述代码块最终以我们的 history_aware_retriever
结束,它负责从向量存储中检索上下文,并用此信息回答 LLM 的问题,使表达更自然流畅。
我们需要采取另外几步,让程序能用我们想要的方式回答问题。
/server/app.py
from langchain.chains 导入 create_retrieval_chain from langchain.chains.combine_documents 导入 create_stuff_documents_chain qa_prompt = ChatPromptTemplate.from_messages( [ ("system", system_prompt), MessagesPlaceholder("chat_history"), ("human", "{input}"), ] ) # 空聊天记录 chat_history = [] # 创建RAG链(相关性感知生成) question_answer_chain = create_stuff_documents_chain(llm, qa_prompt) rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
注意,qa_prompt
与 contextualize_qa_prompt
非常相似,唯一的区别在于 qa_prompt
中我们使用 system_prompt
作为 "system"
的值,而在 contextualize_qa_prompt
中使用的是 contextualize_system_prompt
。这是由于我们的应用程序有两个决策点。决策在于询问模型是否需要参考聊天记录来检索上下文,还是只需检索上下文。这些步骤在 question_answer_chain
和 rag_chain
中有详细说明。此外,我们设置了一个空的 chat_history
列表,作为我们的内存中的聊天记录。注:在实际应用中,我们也希望将聊天记录存储在独立的数据存储中(例如可以使用 Redis 或 BigQuery)。
现在我们可以将上面的代码添加到我们的FastAPI应用中。我们可以使用我们的store
对象将一个pdf文件添加到向量数据库中,如下:
/server/app.py
从langchain_text_splitters导入RecursiveCharacterTextSplitter 从langchain_community.document_loaders导入PDFPlumberLoader @app.post("/upload_pdf") async def upload_pdf(request: Request): """ 用于将PDF上传到BigQuery向量存储的函数 输入必须是一个url。 参数说明: request (Request): 客户端发送的POST请求,必须包含带有'pdf_url'键的json数据。 从请求中获取数据如下: json_data = await request.json() pdf_url = json_data['pdf_url'] loader = PDFPlumberLoader(pdf_url) docs = loader.load() text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200) splits = text_splitter.split_documents(docs) store.add(splits) return {"消息": "PDF上传成功"}
这个代码块添加了几个新组件。一个是PDFPlumberLoader
类,它非常擅长从PDF文件中提取文本。这个类用来创建docs
,从而获取我们想要上传文件的文本版本。然后,RecursiveCharacterTextSplitter
接手,将文本分割成每段1000个字符(chunk_size=1000
)并且每段之间有200字符的重叠(chunk_overlap=200
)的片段。最后,这些片段(splits
)被加入到我们的向量存储(store
)中。
可以使用下面的 curl 命令访问这个API
curl -X POST \ -H "Content-Type: application/json" \ -d '{"pdf_url": "https://operations.nfl.com/media/24emxacq/2024-nfl-rulebook.pdf"}' \ http://localhost:8000/upload_pdf
此处使用 curl
命令向指定的 URL 发送一个 POST 请求,上传一个包含 PDF 链接的 JSON 数据。
一旦文件上传后,我们希望对这个文件向向量存储库提出一些问题。该端点的定义如下:
/server/app.py
从 langchain_core.messages 导入 AIMessage, HumanMessage @app.post("/ask_question") async def ask_question(request: Request): """ 允许向上面定义的链提出问题。 参数: request (Request): 客户端的POST请求必须包含一个带有'question'键的JSON数据。 返回: answer (str): 对用户问题的回答 """ # 从请求中获取JSON数据 json_data = await request.json() question = json_data['question'] # 调用RAG链 response = rag_chain.invoke( { "input": question, "chat_history": chat_history } ) answer = response['answer'] # 扩展聊天历史记录 chat_history.extend( [ HumanMessage(content=question), AIMessage(content=answer), ] ) return answer
在这里,我们从POST请求中抓取问题,并将其添加到我们的rag_chain
中。同时,我们将这个问题加入到我们的chat_history
中。rag_chain.invoke()
方法将通过模型获取响应。我们只需解析答案(response['answer']
),并用用户的提问及回答来扩充我们的chat_history
。这个聊天记录会成为后续问题的参考依据。
下面我会包含Streamlit应用的代码,但不会过多关注代码组织和功能。该应用允许输入PDF文件的URL,并包含一个聊天机器人组件。Streamlit比较容易操作,通常的挑战是在熟悉用户界面。客户端的唯一要求是streamlit
。
/client/requirements.txt
streamlit
关于代码的几点说明;chat_url
和 backend_url
专用于 Docker 部署,可能需要修改为 localhost。另外,聊天记录存储在客户端 (st.session_state.messages
)。我们这么做是为了方便显示消息,同时减少客户端与服务器之间的数据传输。
客户端app.py文件
import streamlit as st import requests chat_url = "http://server:8000/ask_question" backend_url = "http://server:8000/upload_pdf" # 设置侧边栏输入PDF网址 st.sidebar.title("输入PDF网址") pdf_url = st.sidebar.text_input("PDF网址:", "") # 初始化聊天历史 if "messages" not in st.session_state: st.session_state.messages = [] # 上传PDF到后端按钮 if st.sidebar.button("上传PDF"): if pdf_url: # 发送PDF URL到后端API response = requests.post(backend_url, json={ "pdf_url": pdf_url } ) if response.status_code == 200: st.sidebar.success("PDF上传成功!") else: st.sidebar.error("上传PDF失败,请稍后再试。") else: st.sidebar.warning("请输入PDF网址。") # 设置主页面聊天机器人界面 st.title("关于您的PDF聊天") st.write("请在侧边栏输入PDF网址以开始聊天。") # 在重新加载应用程序时显示聊天历史 for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) # 对用户输入作出反应 if prompt := st.chat_input("提问"): # 显示用户消息 st.chat_message("user").markdown(prompt) # 添加用户消息到聊天历史 st.session_state.messages.append({"role": "user", "content": prompt}) response = requests.post(chat_url, json={"question": prompt}) answer = response.json() # 显示助手回复 with st.chat_message("assistant"): st.markdown(answer) # 添加助手回复到聊天历史 st.session_state.messages.append({"role": "assistant", "content": answer})
我们只需要再找到四个文件就可以运行应用程序了。这些文件的定义及注释如下:
/docker-compose.yaml
服务: 后端: image: server build: context: ./server ports: - "8000:8000" volumes: - ${GCLOUD_CONFIG}:/root/.config/gcloud/application_default_credentials.json 环境变量: - GOOGLE_APPLICATION_CREDENTIALS=/root/.config/gcloud/application_default_credentials.json - PROJECT_ID=${PROJECT_ID} - REGION=${REGION} - DATASET=${DATASET} - TABLE=${TABLE} 前端: build: context: ./client ports: - "8501:8501" volumes: - ${GCLOUD_CONFIG}:/root/.config/gcloud/application_default_credentials.json 环境变量: - GOOGLE_APPLICATION_CREDENTIALS=/root/.config/gcloud/application_default_credentials.json
卷:
这将设置所有必要的环境变量,并让我们能够将 Google Cloud 凭证挂载到容器中。
所有的 ${VAR}
环境变量都是从 .env
文件中拉取的。
/.env
PROJECT_ID = your_project_id REGION = your_region DATASET = your_dataset TABLE = nfl_playbook # Linux, macOS: `~/.config/gcloud/application_default_credentials.json` # Windows: `%用户目录%\AppData\gcloud\application_default_credentials.json` GCLOUD_CONFIG = gcloud应用默认凭证.json的路径
这些变量将包括您的 Google Cloud 账户 ID 和路径。请注意,GCLOUD_CONFIG
变量的注释会因操作系统而有所不同。
我们还需要为每个服务准备Dockerfile。这些文件用来指引容器的创建并启动它们的服务器。
/client/Dockerfile
# 使用官方 Python 运行时镜像 FROM python:3.11-slim # 设置工作目录为 /app WORKDIR /app # 复制 requirements 文件到容器 COPY requirements.txt . # 安装所需依赖 RUN pip install --no-cache-dir -r requirements.txt # 复制 Streamlit 应用代码到容器 COPY . . # 暴露应用端口 EXPOSE 8501 # 启动 Streamlit 应用命令 CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"]
/server/Dockerfile
# 基于官方的 Python 镜像 FROM python:3.11-slim # 设置工作目录为 /app WORKDIR /app # 复制 requirements 文件到容器 COPY requirements.txt . # 安装依赖项 RUN pip install --no-cache-dir -r requirements.txt # 复制 FastAPI 应用代码到容器 COPY . . # 暴露端口 8000 EXPOSE 8000 # 运行 FastAPI 应用使用 uvicorn 的命令 CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
现在所有文件都已经到位,进入 /
目录并执行相关命令。
docker compose up --build
打开浏览器,访问 http://localhost:8051 查看前端页面或访问 http://localhost:8000/docs 查看 API 文档页面。