RAG-文档切分存储查询完全指南,让AI再也不会胡说八道
RAG 系统:文档切分、存储与查询完整指南
概述
RAG(Retrieval-Augmented Generation)是一种结合检索和生成的 AI 技术方案,可以让语言模型基于知识库中的信息生成更准确、更贴切的回答。本文档详细说明工程中 RAG 系统的完整实现,包括文档处理、向量存储、相似度检索等核心环节。
适用场景:
-
企业知识库问答 -
文档智能检索 -
专业领域 AI 助手 -
AIOps 故障诊断
核心概念
1. 向量(Vector)
将文本转换为数学向量形式,使机器能够理解文本的语义。
-
维度:1024 维(本工程) -
生成方式:调用 Embedding 模型 API -
作用:用于计算文本相似度
示例:
文本: "CPU 使用率高怎么排查"
↓ Embedding Model
向量: [0.021, -0.043, 0.083, ..., 0.067] // 1024 个浮点数
2. Chunk(文档块)
将大文档分割成小块便于处理和存储的单位。
-
字符数:~1600 字符(含重叠) -
包含内容:文本内容 + 元数据(标题、来源等) -
标识方式:UUID4(全局唯一)
3. Embedding 模型
一个专门的 AI 模型,用于将文本转换为向量。
-
本工程使用: nvidia/nv-embedqa-e5-v5 -
输入类型: -
passage:文档块转向量(存储时) -
query:用户问题转向量(查询时) -
API 端点:NVIDIA NIM( https://integrate.api.nvidia.com/v1)
4. L2 距离
衡量两个向量相似度的数学指标(欧氏距离)。
-
距离小 → 语义相近 -
距离大 → 语义不同
系统架构
整体架构图
┌─────────────────────────────────────────────────────────────┐
│ 用户界面 (FastAPI) │
│ localhost:9900 │
└─────────────────┬───────────────────────────────────────────┘
│
┌─────────┴─────────┐
▼ ▼
┌──────────┐ ┌──────────┐
│ /chat │ │ /upload │
│ 查询接口 │ │ 上传接口 │
└──────────┘ └──────────┘
│ │
├─────────┬─────────┤
▼ ▼ ▼
┌──────────────────────────────────────┐
│ 核心服务层(Python) │
│ - RAG Agent Service │
│ - Vector Store Manager │
│ - Document Splitter Service │
│ - Vector Index Service │
└──────────┬──────────────────────────┘
│
┌─────────┼─────────┬──────────┐
▼ ▼ ▼ ▼
┌────┐ ┌────────┐ ┌───────┐ ┌──────┐
│LLM │ │Embedding│ │Milvus │ │MCP │
│NIM │ │ API │ │ DB │ │Tools │
└────┘ └────────┘ └───────┘ └──────┘
存储架构
Milvus Collection (biz) 的表结构:
|
|
|
|
|
|---|---|---|---|
| id |
|
|
a1b2c3d4-e5f6-47g8-h9i0-j1k2l3m4n5o6 |
| vector |
|
|
[0.021, -0.043, ..., 0.067] |
| content |
|
|
"## 排查步骤\n### 步骤1..." |
| metadata |
|
|
{"h1": "CPU告警", "_file_name": "..."} |
完整工作流程
文档存储流程
上传文件
↓
POST /upload
↓
读取文件内容 (UTF-8)
↓
删除旧数据 (若同文件已存在)
↓
文档切分 (3阶段)
├─ 第1阶段:按Markdown标题切分 (h1/h2)
├─ 第2阶段:按字符数递归切分 (1600字符/块)
└─ 第3阶段:合并微小块 (<300字符)
↓
为每个块生成UUID4标识符
↓
批量调用 Embedding API
├─ 入参:chunk 原文本
├─ 参数:input_type="passage"
└─ 返回:[1024维向量, ...]
↓
组装 Milvus 记录
├─ id: UUID4
├─ vector: 1024维浮点数组
├─ content: 原文本
└─ metadata: JSON 元数据
↓
插入 Milvus Collection
↓
返回成功 (插入的chunk数)
用户查询流程
用户问题
↓
POST /chat { "question": "...", "id": "session_id" }
↓
LangGraph Agent 启动
├─ 加载系统提示词
└─ 将问题转成 HumanMessage
↓
LLM 推理
├─ 分析问题
└─ 决定是否调用工具
↓
是否需要知识库?
/ \
是 否
↓ ↓
调用知识库工具 直接回答
↓
embed_query(问题)
├─ 参数:input_type="query"
└─ 返回:query_vector [1024维]
↓
Milvus ANN 检索
├─ 输入:query_vector
├─ 索引:IVF_FLAT + L2距离
├─ 距离计算:$d = \sqrt{\sum (V_{query_i} - V_{doc_i})^2}$
└─ 返回:距离最小的 top-3 记录
↓
组装参考资料
├─ 提取 content(正文)
├─ 提取 metadata(标题、来源)
└─ 格式化为人类可读文本
↓
LLM 生成最终答案
├─ 输入:参考资料 + 原问题
└─ 输出:解答文字
↓
返回给用户
组件详解
1. 文档分割服务 (DocumentSplitterService)
文件位置:app/services/document_splitter_service.py
3阶段切分策略:
第1阶段:MarkdownHeaderTextSplitter
按 Markdown 标题结构切分(h1/h2),保留标题信息。
# 输入
content = """
# CPU 告警处理
## 排查步骤
内容...
## 解决方案
内容...
"""
# 输出(第1阶段)
[
Document(content="# CPU 告警处理\n## 排查步骤\n内容...", metadata={"h1": "CPU 告警处理", "h2": "排查步骤"}),
Document(content="## 解决方案\n内容...", metadata={"h1": "CPU 告警处理", "h2": "解决方案"}),
]
第2阶段:RecursiveCharacterTextSplitter
按字符数进一步切分,防止单块过大。
chunk_size = 1600# 目标块大小
chunk_overlap = 100# 相邻块重叠字符数
目的: 避免文本被突兀切割,保留语境连贯性。
第3阶段:合并微小块
合并小于 300 字符的相邻块,避免碎片化。
# 输入:[chunk1(200字符), chunk2(800字符), chunk3(150字符), ...]
# 输出:[chunk1+chunk2(1000字符), chunk3+..., ...]
2. 向量嵌入服务 (DashScopeEmbeddings)
文件位置:app/services/vector_embedding_service.py
两个关键方法:
# 方法1:批量嵌入文档(存储时)
defembed_documents(texts: List[str]) -> List[List[float]]:
response = client.embeddings.create(
model="nvidia/nv-embedqa-e5-v5",
input=texts,
extra_body={"input_type": "passage"} # ← passage 模式
)
return [item.embedding for item in response.data]
# 方法2:单个查询嵌入(查询时)
defembed_query(text: str) -> List[float]:
response = client.embeddings.create(
model="nvidia/nv-embedqa-e5-v5",
input=text,
extra_body={"input_type": "query"} # ← query 模式
)
return response.data[0].embedding
两种模式的区别:
|
|
|
|
|
|---|---|---|---|
|
|
|
|
|
|
|
|
|
|
3. 向量存储管理 (VectorStoreManager)
文件位置:app/services/vector_store_manager.py
核心操作:
# 初始化 Milvus VectorStore
self.vector_store = Milvus(
embedding_function=vector_embedding_service, # 自动调用 embed_documents
collection_name="biz",
text_field="content", # 文本字段
vector_field="vector", # 向量字段
primary_field="id", # 主键字段
metadata_field="metadata"# 元数据字段
)
# 添加文档(自动向量化)
result_ids = self.vector_store.add_documents(
documents, # List[Document],包含 page_content 和 metadata
ids=[str(uuid.uuid4()) for _ in documents] # UUID4 主键
)
# 相似度搜索
docs = self.vector_store.similarity_search(query, k=3)
# 内部流程:
# 1. embed_query(query) → query_vector
# 2. Milvus.search(query_vector, top_k=3) → 最相近的3条记录
# 3. 转换为 Document 对象返回
4. Milvus 客户端配置 (MilvusClientManager)
文件位置:app/core/milvus_client.py
Collection Schema 定义:
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=8000),
FieldSchema(name="metadata", dtype=DataType.JSON),
]
索引配置:
index_params = {
"metric_type": "L2", # 欧氏距离
"index_type": "IVF_FLAT", # 倒排文件 + 平坦量化
"params": {"nlist": 128}, # 128 个聚类中心
}
参数说明:
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
5. 知识检索工具 (retrieve_knowledge)
文件位置:app/tools/knowledge_tool.py
工具定义:
@tool(response_format="content_and_artifact")
defretrieve_knowledge(query: str) -> Tuple[str, List[Document]]:
"""从知识库检索相关信息"""
vector_store = vector_store_manager.get_vector_store()
retriever = vector_store.as_retriever(search_kwargs={"k": 3})
docs = retriever.invoke(query)
# 格式化为人类可读的上下文
context = format_docs(docs)
return context, docs
格式化输出示例:
【参考资料 1】
标题: CPU使用率过高告警处理方案 > 排查步骤
来源: cpu_high_usage.md
内容:
## 排查步骤
### 步骤1: 获取当前时间
...
【参考资料 2】
...
6. RAG Agent 服务 (RagAgentService)
文件位置:app/services/rag_agent_service.py
两阶段初始化:
# 第1阶段:__init__(同步)
def__init__(self):
self.model = ChatOpenAI(...) # LLM 客户端
self.tools = [retrieve_knowledge, get_current_time] # 本地工具
self.checkpointer = MemorySaver() # 会话记忆
self._agent_initialized = False
# 第2阶段:_initialize_agent(异步)
asyncdef_initialize_agent(self):
mcp_client = await get_mcp_client_with_retry() # 加载 MCP 工具
mcp_tools = await mcp_client.get_tools()
all_tools = self.tools + mcp_tools
self.agent = create_agent(
self.model,
tools=all_tools,
checkpointer=self.checkpointer, # LangGraph 状态管理
)
Agent 执行流程:
asyncdefquery(self, question: str, session_id: str) -> str:
await self._initialize_agent()
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=question)
]
result = await self.agent.ainvoke(
input={"messages": messages},
config={"configurable": {"thread_id": session_id}}
)
return result["messages"][-1].content
部署配置
环境变量配置 (.env)
# 应用配置
APP_NAME=SuperBizAgent
PORT=9900
# LLM 配置 (NVIDIA NIM)
DASHSCOPE_API_KEY=nvapi-xxx...xxx
DASHSCOPE_API_BASE=https://integrate.api.nvidia.com/v1
DASHSCOPE_MODEL=nvidia/nemotron-3-super-120b-a12b
DASHSCOPE_EMBEDDING_MODEL=nvidia/nv-embedqa-e5-v5
# Milvus 配置
MILVUS_HOST=localhost
MILVUS_PORT=19530
MILVUS_TIMEOUT=10000
# RAG 配置
RAG_TOP_K=3 # 检索返回的文档数
CHUNK_MAX_SIZE=800 # 文档块最大字符数
CHUNK_OVERLAP=100 # 块间重叠字符数
# MCP 服务配置
MCP_CLS_TRANSPORT=streamable-http
MCP_CLS_URL=http://localhost:8003/mcp
MCP_MONITOR_TRANSPORT=streamable-http
MCP_MONITOR_URL=http://localhost:8004/mcp
启动服务
# 1. 启动 Milvus(Docker)
docker-compose -f vector-database.yml up -d
# 2. 启动 MCP 服务
python mcp_servers/cls_server.py &
python mcp_servers/monitor_server.py &
# 3. 启动主应用
python app/main.py
生产环境部署
只需修改 .env 中的 MCP 服务地址,将本地服务改为真实云服务:
# 将本地地址改为真实服务
MCP_CLS_URL=https://your-cls-mcp-server.company.com/mcp
MCP_MONITOR_URL=https://your-monitor-mcp-server.company.com/mcp
# 业务代码无需改动,因为工具通过 MCP 协议动态发现
关键代码解析
代码示例1:文档上传与存储完整流程
# 文件:app/api/file.py
@router.post("/upload")
asyncdefupload_file(file: UploadFile):
# 1. 保存文件到磁盘
file_path = f"./uploads/{file.filename}"
with open(file_path, "wb") as f:
f.write(await file.read())
# 2. 触发索引流程(内部调用)
from app.services.vector_index_service import vector_index_service
result = vector_index_service.index_single_file(file_path)
return {
"success": result.success,
"chunks": result.success_count,
"message": f"成功索引 {result.success_count} 个文档块"
}
internal 流程:
# 文件:app/services/vector_index_service.py
defindex_single_file(self, file_path: str):
# 第1步:读取文件
content = Path(file_path).read_text(encoding="utf-8")
# 第2步:删除旧数据
vector_store_manager.delete_by_source(file_path)
# 第3步:分割文档
documents = document_splitter_service.split_document(content, file_path)
# 返回值:List[Document]
# 每个 Document 包含:
# - page_content: 文本块
# - metadata: {"h1": "...", "h2": "...", "_file_name": "...", ...}
# 第4步:向量存储(自动向量化)
if documents:
vector_store_manager.add_documents(documents)
# 内部流程:
# 1. 生成 UUID4 作为 id
# 2. 调用 embed_documents(texts, input_type="passage")
# 3. 组装 Milvus 记录:{id, vector, content, metadata}
# 4. 批量插入数据库
代码示例2:查询流程完整解析
# 文件:app/api/chat.py - POST /chat
@router.post("/chat")
asyncdefchat(request: ChatRequest):
question = request.question
session_id = request.id
# 调用 RAG Agent
answer = await rag_agent_service.query(question, session_id)
return {
"code": 200,
"data": {
"success": True,
"answer": answer,
"errorMessage": None
}
}
RAG Agent 内部执行过程:
# 文件:app/services/rag_agent_service.py
asyncdefquery(self, question: str, session_id: str) -> str:
# 1. 构建消息列表
messages = [
SystemMessage(content="你是专业AI助手..."),
HumanMessage(content=question)
]
# 2. 调用 Agent(LangGraph)
result = await self.agent.ainvoke(
input={"messages": messages},
config={"configurable": {"thread_id": session_id}}
)
# agent.ainvoke 内部流程:
# (1) LLM 收到消息,分析问题
# (2) LLM 决定:需要调用 retrieve_knowledge?
# (3) 若需要,LLM 自动调用工具:
# query_str = "CPU 使用率高怎么排查"
# tool_result = retrieve_knowledge(query_str)
# (4) retrieve_knowledge 内部:
# a. embed_query(query_str, input_type="query")
# → query_vector = [0.019, -0.040, ..., 0.069]
# b. Milvus.search(data=[query_vector], metric_type="L2", limit=3)
# → 计算 query_vector 与所有 doc_vector 的 L2 距离
# → 返回距离最小的前3条记录
# c. format_docs() 组装成可读文本
# (5) LLM 根据工具结果生成最终答案
# (6) 返回答案
return result["messages"][-1].content
代码示例3:Milvus 向量检索原理
# 当执行 retriever.invoke(query) 时的底层过程
# Step 1:向量化查询
query_vector = embedding_function.embed_query(
query="CPU 使用率高怎么排查",
input_type="query"
)
# query_vector 的值:
# [-0.042, 0.017, 0.083, -0.061, ..., 0.031] # 1024维
# Step 2:Milvus 检索
# 伪代码展示内部计算
for each_doc in collection:
# 计算 L2 距离
distance = sqrt(sum((query_vector[i] - each_doc.vector[i])^2for i in0..1024))
# 示例:
# doc1: vector = [-0.041, 0.016, 0.082, ...] → distance = 0.003 ✓ 最小
# doc2: vector = [-0.038, 0.019, 0.079, ...] → distance = 0.012 ✓ 次小
# doc3: vector = [0.520, 0.310, -0.220, ...] → distance = 1.847 ✗ 太大
# Step 3:返回 top-3
results = [doc1, doc2, doc3] # 按距离从小到大排序
常见问题
Q1: 为什么存储时用 input_type=passage,查询时用 input_type=query?
A: 同一个 Embedding 模型对”文档”和”查询”有不同的优化:
-
passage 模式:对长文本和细节保留优化,适合文档块转向量 -
query 模式:对短句和语义意图优化,适合用户问题转向量
两种模式的向量在语义空间中方向更对齐,提高检索准确度。
Q2: chunk 大小 1600 字符是怎么确定的?
A: 这是经验值权衡:
-
太小(<800):上下文不足,语义断裂,检索结果碎片化 -
太大(>2400):包含多个独立语义单元,相似度搜索不精准 -
1600:平衡了上下文完整性和语义聚焦性
Q3: L2 距离和余弦距离有什么区别?
A:
|
|
|
|
|
|---|---|---|---|
|
|
|
|
|
|
|
|
|
L2 对向量的绝对位置敏感,余弦对方向敏感。本工程选用 L2 是因为 Embedding 模型输出已规范化,L2 距离足以反映语义相似度。
Q4: UUID4 每次都不同吗?会重复吗?
A: UUID4 基于随机数生成(128位),理论上重复概率极低。
-
生成 10 亿个 UUID4,碰撞概率 < 0.00000001% -
本工程用于唯一标识 chunk,足以满足需求
如需自增 ID,可改 Milvus schema 的 auto_id=True。
Q5: 查询时返回的 top-3 一定是最相关的吗?
A: 不一定,取决于:
-
Embedding 模型质量:模型训练数据决定语义理解能力 -
问题表述:问题措辞与文档风格接近度 -
向量质量:存储的文档向量是否准确
如果检索效果差,可以:
-
增加 top_k(返回更多候选) -
调整 chunk 大小 -
优化文档质量
Q6: 生产环境怎么切换 MCP 服务?
A: 只需修改 .env 文件中的 MCP 地址:
# 开发环境(本地 mock)
MCP_CLS_URL=http://localhost:8003/mcp
# 生产环境(真实云服务)
MCP_CLS_URL=https://your-real-cls-service.com/mcp
业务代码零改动,因为所有工具都通过 MCP 协议动态发现和调用。
Q7: Milvus 的 IVF_FLAT 索引 nlist=128 什么意思?
A:
-
IVF_FLAT:将 1024 维向量空间通过 K-means 聚类分成多个”桶” -
nlist=128:总共分成 128 个桶 -
查询时:先找到 query_vector 最近的桶,再在桶内精确搜索
这是一个速度与精度的权衡:
-
nlist越大:精度越高,速度越慢 -
nlist越小:速度越快,精度越低
128 是适合数万级数据量的平衡值。
Q8: 如何删除某个文件的所有 chunks?
A:
# 会自动删除该文件的所有 chunks
vector_store_manager.delete_by_source("path/to/file.md")
# 内部原理:
# 1. 查询所有 metadata._source == "path/to/file.md" 的记录
# 2. 根据这些记录的 id 执行批量删除
Q9: RAG 和普通 LLM 对话的区别是什么?
A:
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Q10: 为什么要用 LangChain + LangGraph?不能直接用 API?
A: LangChain 和 LangGraph 的价值:
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
astream()
|
|
|
|
|
LangChain/LangGraph 让你专注业务逻辑,而不是基础设施。
总结
核心流程三句话
-
存储:文件 → 切分 → 向量化 → Milvus 表格(id, vector, content, metadata) -
查询:问题 → 向量化 → L2 距离检索 → 返回 top-3 chunks -
生成:参考资料 + 问题 → LLM → 最终答案
关键技术点
-
向量维度:1024(NVIDIA NIM Embedding 模型) -
距离度量:L2(欧氏距离) -
索引结构:IVF_FLAT(聚类 + 精确搜索) -
文本分割:3阶段(标题 + 字符 + 合并) -
会话管理:LangGraph MemorySaver -
工具扩展:MCP 协议(可插拔)
下一步优化方向
-
增加 chunk 重排(re-ranking) -
实现多语言支持 -
集成真实监控系统(Prometheus 等) -
添加反馈循环用于持续改进
夜雨聆风