Coze Studio 源码解读(六):知识库与 RAG 实现
Coze Studio 源码解读(六):知识库与 RAG 实现
深入理解检索增强生成的核心实现
一、知识库概述
1.1 知识库的作用
知识库是 Agent 获取外部知识的重要途径,通过 RAG(检索增强生成)技术,让 Agent 能够访问和理解大量私有数据。
知识库核心价值
扩展知识边界
├── 访问企业私有数据
├── 使用最新信息
├── 领域专业知识
└── 实时数据更新
提升回答质量
├── 减少幻觉
├── 提供准确引用
├── 支持溯源
└── 增强可信度
应用场景
├── 企业知识问答
├── 产品文档助手
├── 法律法规查询
├── 医疗知识库
└── 技术文档检索
1.2 RAG 架构
RAG 系统架构
┌─────────────────────────────────────────────┐
│ 文档处理流程 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 上传 │ │ 解析 │ │ 分块 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 向量化 │ │ 索引 │ │ 存储 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 检索流程 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 查询 │ │ 向量化 │ │ 检索 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 重排序 │ │ 过滤 │ │ 返回 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 生成流程 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 构建提示│ │ LLM生成 │ │ 返回答案│ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────┘
二、文档处理
2.1 知识库数据结构
// 知识库定义
type Knowledge struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
// 配置
EmbeddingModel string `json:"embedding_model"` // Embedding 模型
ChunkSize int `json:"chunk_size"` // 分块大小
ChunkOverlap int `json:"chunk_overlap"` // 分块重叠
// 统计
DocCount int `json:"doc_count"` // 文档数量
ChunkCount int `json:"chunk_count"` // 分块数量
// 状态
Status int `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
// 文档定义
type Document struct {
ID string `json:"id"`
KnowledgeID string `json:"knowledge_id"`
Name string `json:"name"`
Source string `json:"source"` // 来源 URL/文件路径
Type string `json:"type"` // pdf/word/md/txt
// 内容
Content string `json:"content"` // 原始内容
Chunks []Chunk `json:"chunks"` // 分块
// 元数据
Metadata map[string]interface{} `json:"metadata"`
// 状态
Status int `json:"status"`
}
// 分块定义
type Chunk struct {
ID string `json:"id"`
DocID string `json:"doc_id"`
Content string `json:"content"`
Embedding []float32 `json:"embedding"` // 向量
// 元数据
Position int `json:"position"` // 在文档中的位置
Metadata map[string]interface{} `json:"metadata"`
}
2.2 文档解析器
// 文档解析器接口
type DocumentParser interface {
Parse(content []byte) (*ParsedDocument, error)
SupportedTypes() []string
}
// PDF 解析器
type PDFParser struct{}
func (p *PDFParser) Parse(content []byte) (*ParsedDocument, error) {
reader := bytes.NewReader(content)
pdfReader, err := pdf.NewPdfReader(reader)
if err != nil {
return nil, err
}
// 提取文本
var textBuilder strings.Builder
numPages, _ := pdfReader.GetNumPages()
for i := 1; i <= numPages; i++ {
page, _ := pdfReader.GetPage(i)
text, _ := page.ExtractText()
textBuilder.WriteString(text)
textBuilder.WriteString("\n")
}
return &ParsedDocument{
Content: textBuilder.String(),
Metadata: map[string]interface{}{
"pages": numPages,
},
}, nil
}
// Markdown 解析器
type MarkdownParser struct{}
func (p *MarkdownParser) Parse(content []byte) (*ParsedDocument, error) {
// 解析 Markdown 结构
md := goldmark.New()
var buf bytes.Buffer
if err := md.Convert(content, &buf); err != nil {
return nil, err
}
return &ParsedDocument{
Content: string(content),
HTML: buf.String(),
}, nil
}
2.3 文本分块
// 分块器
type Chunker struct {
chunkSize int
chunkOverlap int
}
// 分块
func (c *Chunker) Chunk(text string) []Chunk {
chunks := []Chunk{}
// 按段落分割
paragraphs := strings.Split(text, "\n\n")
currentChunk := ""
position := 0
for _, para := range paragraphs {
if len(currentChunk)+len(para) > c.chunkSize {
// 保存当前分块
if currentChunk != "" {
chunks = append(chunks, Chunk{
ID: uuid.New().String(),
Content: strings.TrimSpace(currentChunk),
Position: position,
})
position++
// 保留重叠部分
overlap := c.getOverlap(currentChunk)
currentChunk = overlap + para
}
} else {
currentChunk += para + "\n\n"
}
}
// 最后一个分块
if currentChunk != "" {
chunks = append(chunks, Chunk{
ID: uuid.New().String(),
Content: strings.TrimSpace(currentChunk),
Position: position,
})
}
return chunks
}
// 获取重叠部分
func (c *Chunker) getOverlap(text string) string {
if len(text) <= c.chunkOverlap {
return text
}
// 找到最后一个完整句子
overlap := text[len(text)-c.chunkOverlap:]
if idx := strings.Index(overlap, "。"); idx != -1 {
overlap = overlap[idx+1:]
}
return overlap
}
三、向量化存储
3.1 Embedding 服务
// Embedding 服务
type EmbeddingService struct {
client EmbeddingClient
model string
}
// 生成向量
func (s *EmbeddingService) Embed(ctx context.Context, texts []string) ([][]float32, error) {
// 调用 Embedding 模型
response, err := s.client.Embed(ctx, &EmbedRequest{
Model: s.model,
Input: texts,
})
if err != nil {
return nil, err
}
return response.Embeddings, nil
}
// 批量生成向量
func (s *EmbeddingService) BatchEmbed(ctx context.Context, chunks []Chunk) error {
batchSize := 100
for i := 0; i < len(chunks); i += batchSize {
end := i + batchSize
if end > len(chunks) {
end = len(chunks)
}
batch := chunks[i:end]
texts := make([]string, len(batch))
for j, chunk := range batch {
texts[j] = chunk.Content
}
embeddings, err := s.Embed(ctx, texts)
if err != nil {
return err
}
for j, embedding := range embeddings {
batch[j].Embedding = embedding
}
}
return nil
}
3.2 向量存储
// 向量存储接口
type VectorStore interface {
// 存储
Store(ctx context.Context, chunks []Chunk) error
// 检索
Search(ctx context.Context, query []float32, k int) ([]SearchResult, error)
// 删除
Delete(ctx context.Context, docID string) error
}
// Milvus 向量存储
type MilvusStore struct {
client *milvus.Client
}
func (s *MilvusStore) Store(ctx context.Context, chunks []Chunk) error {
// 准备数据
ids := make([]string, len(chunks))
vectors := make([][]float32, len(chunks))
contents := make([]string, len(chunks))
for i, chunk := range chunks {
ids[i] = chunk.ID
vectors[i] = chunk.Embedding
contents[i] = chunk.Content
}
// 插入数据
return s.client.Insert(ctx, &milvus.InsertRequest{
CollectionName: "knowledge_chunks",
Fields: []milvus.Field{
{Name: "id", Values: ids},
{Name: "vector", Values: vectors},
{Name: "content", Values: contents},
},
})
}
func (s *MilvusStore) Search(ctx context.Context, query []float32, k int) ([]SearchResult, error) {
// 向量检索
results, err := s.client.Search(ctx, &milvus.SearchRequest{
CollectionName: "knowledge_chunks",
Vectors: [][]float32{query},
TopK: int64(k),
MetricType: milvus.COSINE,
})
if err != nil {
return nil, err
}
// 转换结果
searchResults := make([]SearchResult, 0, k)
for _, result := range results[0] {
searchResults = append(searchResults, SearchResult{
ID: result.ID.(string),
Content: result.Fields["content"].(string),
Score: result.Score,
})
}
return searchResults, nil
}
四、检索策略
4.1 语义检索
// 语义检索器
type SemanticRetriever struct {
embeddingService *EmbeddingService
vectorStore VectorStore
}
// 检索
func (r *SemanticRetriever) Retrieve(ctx context.Context, query string, k int) ([]SearchResult, error) {
// 向量化查询
embeddings, err := r.embeddingService.Embed(ctx, []string{query})
if err != nil {
return nil, err
}
// 向量检索
results, err := r.vectorStore.Search(ctx, embeddings[0], k)
if err != nil {
return nil, err
}
return results, nil
}
4.2 关键词检索
// 关键词检索器
type KeywordRetriever struct {
esClient *elasticsearch.Client
}
// 检索
func (r *KeywordRetriever) Retrieve(ctx context.Context, query string, k int) ([]SearchResult, error) {
// 构建查询
searchQuery := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
"content": query,
},
},
"size": k,
}
// 执行搜索
response, err := r.esClient.Search(
r.esClient.Search.WithContext(ctx),
r.esClient.Search.WithBody(bytes.NewReader(searchQuery)),
)
if err != nil {
return nil, err
}
// 解析结果
return r.parseResults(response), nil
}
4.3 混合检索
// 混合检索器
type HybridRetriever struct {
semanticRetriever *SemanticRetriever
keywordRetriever *KeywordRetriever
reranker *Reranker
}
// 检索
func (r *HybridRetriever) Retrieve(ctx context.Context, query string, k int) ([]SearchResult, error) {
// 并行执行语义和关键词检索
var wg sync.WaitGroup
var semanticResults, keywordResults []SearchResult
var semanticErr, keywordErr error
wg.Add(2)
go func() {
defer wg.Done()
semanticResults, semanticErr = r.semanticRetriever.Retrieve(ctx, query, k)
}()
go func() {
defer wg.Done()
keywordResults, keywordErr = r.keywordRetriever.Retrieve(ctx, query, k)
}()
wg.Wait()
// 合并结果
merged := r.mergeResults(semanticResults, keywordResults)
// 重排序
reranked := r.reranker.Rerank(query, merged, k)
return reranked, nil
}
// 合并结果
func (r *HybridRetriever) mergeResults(semantic, keyword []SearchResult) []SearchResult {
seen := make(map[string]bool)
merged := []SearchResult{}
// 交错合并
maxLen := max(len(semantic), len(keyword))
for i := 0; i < maxLen; i++ {
if i < len(semantic) {
if !seen[semantic[i].ID] {
merged = append(merged, semantic[i])
seen[semantic[i].ID] = true
}
}
if i < len(keyword) {
if !seen[keyword[i].ID] {
merged = append(merged, keyword[i])
seen[keyword[i].ID] = true
}
}
}
return merged
}
五、RAG 实现
5.1 RAG 服务
// RAG 服务
type RAGService struct {
retriever Retriever
llmService LLMService
promptBuilder *PromptBuilder
}
// 问答
func (s *RAGService) Query(ctx context.Context, query string, knowledgeID string) (*Answer, error) {
// 1. 检索相关文档
results, err := s.retriever.Retrieve(ctx, query, knowledgeID, 5)
if err != nil {
return nil, err
}
// 2. 构建上下文
context := s.buildContext(results)
// 3. 构建提示词
prompt := s.promptBuilder.Build(query, context)
// 4. 调用 LLM
answer, err := s.llmService.Chat(ctx, prompt)
if err != nil {
return nil, err
}
return &Answer{
Content: answer,
Sources: results,
}, nil
}
// 构建上下文
func (s *RAGService) buildContext(results []SearchResult) string {
var builder strings.Builder
for i, result := range results {
builder.WriteString(fmt.Sprintf("[%d] %s\n\n", i+1, result.Content))
}
return builder.String()
}
5.2 提示词模板
# RAG 提示词模板
你是一个智能助手,请根据提供的参考资料回答用户问题。
## 参考资料
{{context}}
## 用户问题
{{query}}
## 回答要求
1. 只使用参考资料中的信息回答
2. 如果参考资料中没有相关信息,请明确说明
3. 在回答末尾标注引用来源,格式为 [1]、[2] 等
4. 回答要准确、简洁、有条理
## 回答
六、本章小结
6.1 知识点回顾
本章要点
1. 知识库概念
├── 作用与价值
└── RAG 架构
2. 文档处理
├── 文档解析
├── 文本分块
└── 元数据管理
3. 向量存储
├── Embedding 服务
├── 向量数据库
└── 索引管理
4. 检索策略
├── 语义检索
├── 关键词检索
└── 混合检索
5. RAG 实现
├── 检索增强
└── 提示词构建
下一节预告
下一节我们将学习:前端架构与组件设计
内容包括:
-
Monorepo 架构 -
工作流画布 -
状态管理 -
组件设计
软件定制开发,微信联系:rustgopy
夜雨聆风