乐于分享
好东西不私藏

RAG-文档切分存储查询完全指南,让AI再也不会胡说八道

RAG-文档切分存储查询完全指南,让AI再也不会胡说八道

RAG 系统:文档切分、存储与查询完整指南


概述

RAG(Retrieval-Augmented Generation)是一种结合检索和生成的 AI 技术方案,可以让语言模型基于知识库中的信息生成更准确、更贴切的回答。本文档详细说明工程中 RAG 系统的完整实现,包括文档处理、向量存储、相似度检索等核心环节。

适用场景:

  • 企业知识库问答
  • 文档智能检索
  • 专业领域 AI 助手
  • AIOps 故障诊断

核心概念

1. 向量(Vector)

将文本转换为数学向量形式,使机器能够理解文本的语义。

  • 维度:1024 维(本工程)
  • 生成方式:调用 Embedding 模型 API
  • 作用:用于计算文本相似度

示例:

文本:      "CPU 使用率高怎么排查"
      ↓ Embedding Model
向量:      [0.021, -0.043, 0.083, ..., 0.067]  // 1024 个浮点数

2. Chunk(文档块)

将大文档分割成小块便于处理和存储的单位。

  • 字符数:~1600 字符(含重叠)
  • 包含内容:文本内容 + 元数据(标题、来源等)
  • 标识方式:UUID4(全局唯一)

3. Embedding 模型

一个专门的 AI 模型,用于将文本转换为向量。

  • 本工程使用nvidia/nv-embedqa-e5-v5
  • 输入类型
    • passage:文档块转向量(存储时)
    • query:用户问题转向量(查询时)
  • API 端点:NVIDIA NIM(https://integrate.api.nvidia.com/v1

4. L2 距离

衡量两个向量相似度的数学指标(欧氏距离)。

  • 距离小 → 语义相近
  • 距离大 → 语义不同

系统架构

整体架构图

┌─────────────────────────────────────────────────────────────┐
│                        用户界面 (FastAPI)                    │
│                        localhost:9900                        │
└─────────────────┬───────────────────────────────────────────┘
                  │
        ┌─────────┴─────────┐
        ▼                   ▼
   ┌──────────┐        ┌──────────┐
   │ /chat    │        │ /upload  │
   │ 查询接口 │        │ 上传接口 │
   └──────────┘        └──────────┘
        │                   │
        ├─────────┬─────────┤
        ▼         ▼         ▼
   ┌──────────────────────────────────────┐
   │       核心服务层(Python)            │
   │  - RAG Agent Service                 │
   │  - Vector Store Manager              │
   │  - Document Splitter Service         │
   │  - Vector Index Service              │
   └──────────┬──────────────────────────┘
              │
    ┌─────────┼─────────┬──────────┐
    ▼         ▼         ▼          ▼
 ┌────┐  ┌────────┐  ┌───────┐  ┌──────┐
 │LLM │  │Embedding│ │Milvus │  │MCP   │
 │NIM │  │ API    │  │ DB    │  │Tools │
 └────┘  └────────┘  └───────┘  └──────┘

存储架构

Milvus Collection (biz) 的表结构:

字段名
数据类型
含义
示例
id
VARCHAR
唯一标识符 (主键)
a1b2c3d4-e5f6-47g8-h9i0-j1k2l3m4n5o6
vector
FLOAT_VECTOR
1024 维向量(IVF_FLAT索引)
[0.021, -0.043, ..., 0.067]
content
VARCHAR
文本内容
"## 排查步骤\n### 步骤1..."
metadata
JSON
元数据
{"h1": "CPU告警", "_file_name": "..."}

完整工作流程

文档存储流程

上传文件
   ↓
POST /upload
   ↓
读取文件内容 (UTF-8)
   ↓
删除旧数据 (若同文件已存在)
   ↓
文档切分 (3阶段)
   ├─ 第1阶段:按Markdown标题切分 (h1/h2)
   ├─ 第2阶段:按字符数递归切分 (1600字符/块)
   └─ 第3阶段:合并微小块 (<300字符)
   ↓
为每个块生成UUID4标识符
   ↓
批量调用 Embedding API
   ├─ 入参:chunk 原文本
   ├─ 参数:input_type="passage"
   └─ 返回:[1024维向量, ...]
   ↓
组装 Milvus 记录
   ├─ id: UUID4
   ├─ vector: 1024维浮点数组
   ├─ content: 原文本
   └─ metadata: JSON 元数据
   ↓
插入 Milvus Collection
   ↓
返回成功 (插入的chunk数)

用户查询流程

用户问题
   ↓
POST /chat { "question""...""id""session_id" }
   ↓
LangGraph Agent 启动
   ├─ 加载系统提示词
   └─ 将问题转成 HumanMessage
   ↓
LLM 推理
   ├─ 分析问题
   └─ 决定是否调用工具
   ↓
     是否需要知识库?
   /              \
  是              否
  ↓              ↓
调用知识库工具   直接回答
  ↓
embed_query(问题)
   ├─ 参数:input_type="query"
   └─ 返回:query_vector [1024维]
   ↓
Milvus ANN 检索
   ├─ 输入:query_vector
   ├─ 索引:IVF_FLAT + L2距离
   ├─ 距离计算:$d = \sqrt{\sum (V_{query_i} - V_{doc_i})^2}$
   └─ 返回:距离最小的 top-3 记录
   ↓
组装参考资料
   ├─ 提取 content(正文)
   ├─ 提取 metadata(标题、来源)
   └─ 格式化为人类可读文本
   ↓
LLM 生成最终答案
   ├─ 输入:参考资料 + 原问题
   └─ 输出:解答文字
   ↓
返回给用户

组件详解

1. 文档分割服务 (DocumentSplitterService)

文件位置:app/services/document_splitter_service.py

3阶段切分策略:

第1阶段:MarkdownHeaderTextSplitter

按 Markdown 标题结构切分(h1/h2),保留标题信息。

# 输入
content = """
# CPU 告警处理
## 排查步骤
内容...
## 解决方案
内容...
"""


# 输出(第1阶段)
[
    Document(content="# CPU 告警处理\n## 排查步骤\n内容...", metadata={"h1""CPU 告警处理""h2""排查步骤"}),
    Document(content="## 解决方案\n内容...", metadata={"h1""CPU 告警处理""h2""解决方案"}),
]

第2阶段:RecursiveCharacterTextSplitter

按字符数进一步切分,防止单块过大。

chunk_size = 1600# 目标块大小
chunk_overlap = 100# 相邻块重叠字符数

目的: 避免文本被突兀切割,保留语境连贯性。

第3阶段:合并微小块

合并小于 300 字符的相邻块,避免碎片化。

# 输入:[chunk1(200字符), chunk2(800字符), chunk3(150字符), ...]
# 输出:[chunk1+chunk2(1000字符), chunk3+..., ...]

2. 向量嵌入服务 (DashScopeEmbeddings)

文件位置:app/services/vector_embedding_service.py

两个关键方法:

# 方法1:批量嵌入文档(存储时)
defembed_documents(texts: List[str]) -> List[List[float]]:
    response = client.embeddings.create(
        model="nvidia/nv-embedqa-e5-v5",
        input=texts,
        extra_body={"input_type""passage"}  # ← passage 模式
    )
return [item.embedding for item in response.data]

# 方法2:单个查询嵌入(查询时)
defembed_query(text: str) -> List[float]:
    response = client.embeddings.create(
        model="nvidia/nv-embedqa-e5-v5",
        input=text,
        extra_body={"input_type""query"}  # ← query 模式
    )
return response.data[0].embedding

两种模式的区别:

模式
用途
优化方向
示例
passage
文档块转向量
保留细节内容
入库时的 chunk
query
用户问题转向量
强调语义意图
“怎么排查”

3. 向量存储管理 (VectorStoreManager)

文件位置:app/services/vector_store_manager.py

核心操作:

# 初始化 Milvus VectorStore
self.vector_store = Milvus(
    embedding_function=vector_embedding_service,  # 自动调用 embed_documents
    collection_name="biz",
    text_field="content",      # 文本字段
    vector_field="vector",     # 向量字段
    primary_field="id",        # 主键字段
    metadata_field="metadata"# 元数据字段
)

# 添加文档(自动向量化)
result_ids = self.vector_store.add_documents(
    documents,  # List[Document],包含 page_content 和 metadata
    ids=[str(uuid.uuid4()) for _ in documents]  # UUID4 主键
)

# 相似度搜索
docs = self.vector_store.similarity_search(query, k=3)
# 内部流程:
# 1. embed_query(query) → query_vector
# 2. Milvus.search(query_vector, top_k=3) → 最相近的3条记录
# 3. 转换为 Document 对象返回

4. Milvus 客户端配置 (MilvusClientManager)

文件位置:app/core/milvus_client.py

Collection Schema 定义:

fields = [
    FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=8000),
    FieldSchema(name="metadata", dtype=DataType.JSON),
]

索引配置:

index_params = {
"metric_type""L2",           # 欧氏距离
"index_type""IVF_FLAT",      # 倒排文件 + 平坦量化
"params": {"nlist"128},      # 128 个聚类中心
}

参数说明:

参数
含义
作用
metric_type: L2
欧氏距离
计算向量间相似度(距离小=相似)
index_type: IVF_FLAT
倒排文件
快速定位相似向量所在聚类
nlist: 128
聚类中心数
将1024维空间分成128个桶

5. 知识检索工具 (retrieve_knowledge)

文件位置:app/tools/knowledge_tool.py

工具定义:

@tool(response_format="content_and_artifact")
defretrieve_knowledge(query: str) -> Tuple[str, List[Document]]:
"""从知识库检索相关信息"""
    vector_store = vector_store_manager.get_vector_store()
    retriever = vector_store.as_retriever(search_kwargs={"k"3})
    docs = retriever.invoke(query)

# 格式化为人类可读的上下文
    context = format_docs(docs)
return context, docs

格式化输出示例:

【参考资料 1】
标题: CPU使用率过高告警处理方案 > 排查步骤
来源: cpu_high_usage.md
内容:
## 排查步骤
### 步骤1: 获取当前时间
...

【参考资料 2】
...

6. RAG Agent 服务 (RagAgentService)

文件位置:app/services/rag_agent_service.py

两阶段初始化:

# 第1阶段:__init__(同步)
def__init__(self):
    self.model = ChatOpenAI(...)           # LLM 客户端
    self.tools = [retrieve_knowledge, get_current_time]  # 本地工具
    self.checkpointer = MemorySaver()      # 会话记忆
    self._agent_initialized = False

# 第2阶段:_initialize_agent(异步)
asyncdef_initialize_agent(self):
    mcp_client = await get_mcp_client_with_retry()  # 加载 MCP 工具
    mcp_tools = await mcp_client.get_tools()
    all_tools = self.tools + mcp_tools

    self.agent = create_agent(
        self.model,
        tools=all_tools,
        checkpointer=self.checkpointer,  # LangGraph 状态管理
    )

Agent 执行流程:

asyncdefquery(self, question: str, session_id: str) -> str:
await self._initialize_agent()

    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=question)
    ]

    result = await self.agent.ainvoke(
        input={"messages": messages},
        config={"configurable": {"thread_id": session_id}}
    )

return result["messages"][-1].content

部署配置

环境变量配置 (.env)

# 应用配置
APP_NAME=SuperBizAgent
PORT=9900

# LLM 配置 (NVIDIA NIM)
DASHSCOPE_API_KEY=nvapi-xxx...xxx
DASHSCOPE_API_BASE=https://integrate.api.nvidia.com/v1
DASHSCOPE_MODEL=nvidia/nemotron-3-super-120b-a12b
DASHSCOPE_EMBEDDING_MODEL=nvidia/nv-embedqa-e5-v5

# Milvus 配置
MILVUS_HOST=localhost
MILVUS_PORT=19530
MILVUS_TIMEOUT=10000

# RAG 配置
RAG_TOP_K=3                    # 检索返回的文档数
CHUNK_MAX_SIZE=800             # 文档块最大字符数
CHUNK_OVERLAP=100              # 块间重叠字符数

# MCP 服务配置
MCP_CLS_TRANSPORT=streamable-http
MCP_CLS_URL=http://localhost:8003/mcp
MCP_MONITOR_TRANSPORT=streamable-http
MCP_MONITOR_URL=http://localhost:8004/mcp

启动服务

# 1. 启动 Milvus(Docker)
docker-compose -f vector-database.yml up -d

# 2. 启动 MCP 服务
python mcp_servers/cls_server.py &
python mcp_servers/monitor_server.py &

# 3. 启动主应用
python app/main.py

生产环境部署

只需修改 .env 中的 MCP 服务地址,将本地服务改为真实云服务:

# 将本地地址改为真实服务
MCP_CLS_URL=https://your-cls-mcp-server.company.com/mcp
MCP_MONITOR_URL=https://your-monitor-mcp-server.company.com/mcp

# 业务代码无需改动,因为工具通过 MCP 协议动态发现

关键代码解析

代码示例1:文档上传与存储完整流程

# 文件:app/api/file.py
@router.post("/upload")
asyncdefupload_file(file: UploadFile):
# 1. 保存文件到磁盘
    file_path = f"./uploads/{file.filename}"
with open(file_path, "wb"as f:
        f.write(await file.read())

# 2. 触发索引流程(内部调用)
from app.services.vector_index_service import vector_index_service
    result = vector_index_service.index_single_file(file_path)

return {
"success": result.success,
"chunks": result.success_count,
"message"f"成功索引 {result.success_count} 个文档块"
    }

internal 流程:

# 文件:app/services/vector_index_service.py
defindex_single_file(self, file_path: str):
# 第1步:读取文件
    content = Path(file_path).read_text(encoding="utf-8")

# 第2步:删除旧数据
    vector_store_manager.delete_by_source(file_path)

# 第3步:分割文档
    documents = document_splitter_service.split_document(content, file_path)
# 返回值:List[Document]
# 每个 Document 包含:
# - page_content: 文本块
# - metadata: {"h1": "...", "h2": "...", "_file_name": "...", ...}

# 第4步:向量存储(自动向量化)
if documents:
        vector_store_manager.add_documents(documents)
# 内部流程:
# 1. 生成 UUID4 作为 id
# 2. 调用 embed_documents(texts, input_type="passage")
# 3. 组装 Milvus 记录:{id, vector, content, metadata}
# 4. 批量插入数据库

代码示例2:查询流程完整解析

# 文件:app/api/chat.py - POST /chat
@router.post("/chat")
asyncdefchat(request: ChatRequest):
    question = request.question
    session_id = request.id

# 调用 RAG Agent
    answer = await rag_agent_service.query(question, session_id)

return {
"code"200,
"data": {
"success"True,
"answer": answer,
"errorMessage"None
        }
    }

RAG Agent 内部执行过程:

# 文件:app/services/rag_agent_service.py

asyncdefquery(self, question: str, session_id: str) -> str:
# 1. 构建消息列表
    messages = [
        SystemMessage(content="你是专业AI助手..."),
        HumanMessage(content=question)
    ]

# 2. 调用 Agent(LangGraph)
    result = await self.agent.ainvoke(
        input={"messages": messages},
        config={"configurable": {"thread_id": session_id}}
    )

# agent.ainvoke 内部流程:
# (1) LLM 收到消息,分析问题
# (2) LLM 决定:需要调用 retrieve_knowledge?
# (3) 若需要,LLM 自动调用工具:
#     query_str = "CPU 使用率高怎么排查"
#     tool_result = retrieve_knowledge(query_str)
# (4) retrieve_knowledge 内部:
#     a. embed_query(query_str, input_type="query")
#        → query_vector = [0.019, -0.040, ..., 0.069]
#     b. Milvus.search(data=[query_vector], metric_type="L2", limit=3)
#        → 计算 query_vector 与所有 doc_vector 的 L2 距离
#        → 返回距离最小的前3条记录
#     c. format_docs() 组装成可读文本
# (5) LLM 根据工具结果生成最终答案
# (6) 返回答案

return result["messages"][-1].content

代码示例3:Milvus 向量检索原理

# 当执行 retriever.invoke(query) 时的底层过程

# Step 1:向量化查询
query_vector = embedding_function.embed_query(
    query="CPU 使用率高怎么排查",
    input_type="query"
)
# query_vector 的值:
# [-0.042, 0.017, 0.083, -0.061, ..., 0.031]  # 1024维

# Step 2:Milvus 检索
# 伪代码展示内部计算
for each_doc in collection:
# 计算 L2 距离
    distance = sqrt(sum((query_vector[i] - each_doc.vector[i])^2for i in0..1024))

# 示例:
# doc1: vector = [-0.041, 0.016, 0.082, ...] → distance = 0.003 ✓ 最小
# doc2: vector = [-0.038, 0.019, 0.079, ...] → distance = 0.012 ✓ 次小
# doc3: vector = [0.520, 0.310, -0.220, ...] → distance = 1.847 ✗ 太大

# Step 3:返回 top-3
results = [doc1, doc2, doc3]  # 按距离从小到大排序

常见问题

Q1: 为什么存储时用 input_type=passage,查询时用 input_type=query

A: 同一个 Embedding 模型对”文档”和”查询”有不同的优化:

  • passage 模式:对长文本和细节保留优化,适合文档块转向量
  • query 模式:对短句和语义意图优化,适合用户问题转向量

两种模式的向量在语义空间中方向更对齐,提高检索准确度。

Q2: chunk 大小 1600 字符是怎么确定的?

A: 这是经验值权衡:

  • 太小(<800):上下文不足,语义断裂,检索结果碎片化
  • 太大(>2400):包含多个独立语义单元,相似度搜索不精准
  • 1600:平衡了上下文完整性和语义聚焦性

Q3: L2 距离和余弦距离有什么区别?

A:

指标
公式
含义
本工程
L2距离
向量间的欧氏距离
✓ 使用
余弦距离
向量夹角的补集

L2 对向量的绝对位置敏感,余弦对方向敏感。本工程选用 L2 是因为 Embedding 模型输出已规范化,L2 距离足以反映语义相似度。

Q4: UUID4 每次都不同吗?会重复吗?

A: UUID4 基于随机数生成(128位),理论上重复概率极低。

  • 生成 10 亿个 UUID4,碰撞概率 < 0.00000001%
  • 本工程用于唯一标识 chunk,足以满足需求

如需自增 ID,可改 Milvus schema 的 auto_id=True

Q5: 查询时返回的 top-3 一定是最相关的吗?

A: 不一定,取决于:

  1. Embedding 模型质量:模型训练数据决定语义理解能力
  2. 问题表述:问题措辞与文档风格接近度
  3. 向量质量:存储的文档向量是否准确

如果检索效果差,可以:

  • 增加 top_k(返回更多候选)
  • 调整 chunk 大小
  • 优化文档质量

Q6: 生产环境怎么切换 MCP 服务?

A: 只需修改 .env 文件中的 MCP 地址:

# 开发环境(本地 mock)
MCP_CLS_URL=http://localhost:8003/mcp

# 生产环境(真实云服务)
MCP_CLS_URL=https://your-real-cls-service.com/mcp

业务代码零改动,因为所有工具都通过 MCP 协议动态发现和调用。

Q7: Milvus 的 IVF_FLAT 索引 nlist=128 什么意思?

A:

  • IVF_FLAT:将 1024 维向量空间通过 K-means 聚类分成多个”桶”
  • nlist=128:总共分成 128 个桶
  • 查询时:先找到 query_vector 最近的桶,再在桶内精确搜索

这是一个速度与精度的权衡:

  • nlist 越大:精度越高,速度越慢
  • nlist 越小:速度越快,精度越低

128 是适合数万级数据量的平衡值。

Q8: 如何删除某个文件的所有 chunks?

A:

# 会自动删除该文件的所有 chunks
vector_store_manager.delete_by_source("path/to/file.md")

# 内部原理:
# 1. 查询所有 metadata._source == "path/to/file.md" 的记录
# 2. 根据这些记录的 id 执行批量删除

Q9: RAG 和普通 LLM 对话的区别是什么?

A:

方面
普通 LLM
RAG Agent
信息来源
训练数据(固定)
知识库(动态)
实时性
差(训练数据过时)
好(知识库可实时更新)
准确性
中等(可能幻觉)
高(基于真实数据)
可追溯性
无(无来源)
有(返回参考资料)
成本
低(纯推理)
中等(需检索 + 推理)

Q10: 为什么要用 LangChain + LangGraph?不能直接用 API?

A: LangChain 和 LangGraph 的价值:

方面
直接 API
LangChain/LangGraph
工具集成
手工拼接
统一接口
流程编排
自己写循环
自动编排、状态管理
会话记忆
自己实现
MemorySaver 开箱即用
模型切换
改一堆代码
改配置
流式输出
手工处理
astream()

 一行搞定
错误恢复
自己处理
内置重试机制

LangChain/LangGraph 让你专注业务逻辑,而不是基础设施。


总结

核心流程三句话

  1. 存储:文件 → 切分 → 向量化 → Milvus 表格(id, vector, content, metadata)
  2. 查询:问题 → 向量化 → L2 距离检索 → 返回 top-3 chunks
  3. 生成:参考资料 + 问题 → LLM → 最终答案

关键技术点

  • 向量维度:1024(NVIDIA NIM Embedding 模型)
  • 距离度量:L2(欧氏距离)
  • 索引结构:IVF_FLAT(聚类 + 精确搜索)
  • 文本分割:3阶段(标题 + 字符 + 合并)
  • 会话管理:LangGraph MemorySaver
  • 工具扩展:MCP 协议(可插拔)

下一步优化方向

  1. 增加 chunk 重排(re-ranking)
  2. 实现多语言支持
  3. 集成真实监控系统(Prometheus 等)
  4. 添加反馈循环用于持续改进