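Building an AI Agent Knowledge Base from Scratch: From Theory to Engineering Practice
AI is moving quickly, and agents have become the main bridge between large models and concrete application scenarios. Most AI agents, however, have only "short-term memory" and cannot accumulate knowledge over time. This article walks through how to build a complete knowledge base system for an AI agent, from the underlying rationale to a working implementation, and offers a solution that can be put into practice.
1. Why Does an AI Agent Need a Knowledge Base?
1.1 Limitations of Large Models
Large language models (LLMs) are remarkably capable, but they have several key limitations:
• Limited context: a model can only process a bounded context length (for example, 128K tokens)
• Stale knowledge: training data stops at a cutoff date, so the model cannot see newer information
• No personalization: the model does not remember a user's specific preferences or past interactions
• High cost: baking large amounts of knowledge into the model itself is neither economical nor efficient
1.2 Core Value of a Knowledge Base
A well-designed knowledge base addresses these problems:
• Long-term memory: stores past conversations, decision processes, and user preferences
• Personalized service: tailors suggestions to the interaction history
• Dynamic updates: picks up and integrates new information as it arrives
• Cost optimization: reduces reliance on the large model and lowers call costs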
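2. Knowledge Base System Architecture
2.1 Three-Layer Model
A complete AI agent knowledge base system should contain three layers:
┌────────────────────────────────────────────────────────────────────────┐
│ Application layer                                                      │
│ • Dialogue management  • Task scheduling  • Tool invocation            │
├────────────────────────────────────────────────────────────────────────┤
│ Intelligence layer                                                     │
│ • Intent recognition  • Knowledge retrieval  • Reasoning and decisions │
├────────────────────────────────────────────────────────────────────────┤
│ Storage layer                                                          │
│ • Vector database  • Relational database  • File system                │
└────────────────────────────────────────────────────────────────────────┘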
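2.2 Key Technical Components
2.2.1 Vector Database
Role: stores vector representations of text and supports semantic search.
Selection guide:
• Production-grade (managed or self-hosted): Pinecone, Weaviate, Qdrant
• Open-source options: Chroma, Milvus, and FAISS (a similarity-search library rather than a database server)
• Lightweight: LanceDB, SQLite-VSS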
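To make "semantic search" concrete, here is a minimal sketch of what a vector database does at its core: score stored vectors against a query vector and return the closest ones. It assumes cosine similarity and a brute-force scan; the three-dimensional vectors and the document IDs are made up for illustration, and real systems use learned embeddings plus approximate indexes.

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: List[float], index: Dict[str, List[float]],
          k: int = 2) -> List[Tuple[str, float]]:
    """Brute-force nearest neighbours: score every stored vector, keep the k best."""
    scored = [(doc_id, cosine_similarity(query, vec)) for doc_id, vec in index.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Hypothetical 3-dimensional "embeddings"; real models emit hundreds of dimensions.
toy_index = {
    "doc_refund": [0.9, 0.1, 0.0],
    "doc_shipping": [0.1, 0.9, 0.0],
    "doc_api": [0.0, 0.1, 0.9],
}
results = top_k([0.8, 0.2, 0.0], toy_index, k=2)
```

The products listed above replace the linear scan with an index so that lookups stay fast as the collection grows.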
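2.2.2 Relational Database
Role: stores structured data and metadata.
Selection guide:
• General purpose: PostgreSQL, MySQL
• Embedded: SQLite
• Time-series data: TimescaleDB
2.2.3 Cache System
Role: speeds up frequent lookups and takes load off the database.
Selection guide:
• In-memory cache: Redis, Memcached
• Distributed cache: Redis Cluster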
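The caching idea can be sketched without any external service. The class below is an in-process stand-in, not Redis: `TTLCache` and `cached_search` are hypothetical names, and the cache-aside pattern shown is one common choice among several.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """In-process cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so tests can control time
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]  # evict expired entries lazily, on read
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (self.clock() + self.ttl, value)


def cached_search(cache: TTLCache, query: str,
                  search_fn: Callable[[str], Any]) -> Any:
    """Cache-aside: return a cached result, or compute it once and store it."""
    hit = cache.get(query)
    if hit is not None:
        return hit
    result = search_fn(query)
    cache.set(query, result)
    return result
```

With Redis the same shape applies: `get` and `set` become network calls and the expiry is handled by the server.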
3. Complete Implementation
3.1 Python Implementation: A Lightweight Knowledge Base on SQLite and Chroma
import sqlite3
import chromadb
from typing import List, Dict, Any
import json
from datetime import datetime
import hashlib


class KnowledgeBase:
    """Core knowledge base class for an AI agent."""

    def __init__(self, db_path: str = "knowledge.db"):
        # Initialize the SQLite database (structured data and metadata)
        self.conn = sqlite3.connect(db_path)
        self._init_sqlite_tables()
        # Initialize the Chroma vector database (persisted on local disk)
        self.chroma_client = chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.chroma_client.get_or_create_collection(
            name="knowledge_embeddings"
        )

    def _init_sqlite_tables(self):
        """Create the SQLite tables if they do not exist yet."""
        cursor = self.conn.cursor()
        # Knowledge items
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS knowledge_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                content_type VARCHAR(50) NOT NULL,
                content TEXT NOT NULL,
                metadata JSON,
                tags TEXT,
                importance_score FLOAT DEFAULT 1.0,
                access_count INTEGER DEFAULT 0,
                last_accessed TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Conversation history
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS conversation_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id VARCHAR(100) NOT NULL,
                user_message TEXT NOT NULL,
                agent_response TEXT NOT NULL,
                context JSON,
                user_sentiment FLOAT,
                agent_confidence FLOAT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # User preferences
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS user_preferences (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id VARCHAR(100) NOT NULL,
                preference_type VARCHAR(50) NOT NULL,
                preference_value TEXT NOT NULL,
                confidence_score FLOAT DEFAULT 1.0,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

    def add_knowledge(self, content: str, content_type: str = "text",
                      metadata: Dict = None, tags: List[str] = None):
        """Add a knowledge item to SQLite and index it in Chroma."""
        cursor = self.conn.cursor()
        cursor.execute('''
            INSERT INTO knowledge_items
            (content_type, content, metadata, tags, importance_score)
            VALUES (?, ?, ?, ?, ?)
        ''', (content_type, content,
              json.dumps(metadata or {}),
              ','.join(tags or []),
              1.0))
        # The autoincrement row ID is the item's unique identifier
        knowledge_id = cursor.lastrowid
        # Index the same text in the vector store under a matching ID.
        # Some Chroma versions reject an empty metadata dict, so pass None
        # when there is no metadata. Chroma metadata values must be scalars.
        self.collection.add(
            documents=[content],
            metadatas=[metadata] if metadata else None,
            ids=[f"knowledge_{knowledge_id}"]
        )
        # Commit only after the vector insert succeeded
        self.conn.commit()
        return knowledge_id

    def search_knowledge(self, query: str, limit: int = 5):
        """Search knowledge by combining semantic and keyword matches."""
        # Semantic search over the vector store
        vector_results = self.collection.query(
            query_texts=[query],
            n_results=limit
        )
        # Keyword search: a plain LIKE substring match, not SQLite full-text search
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT id, content, metadata, tags,
                   importance_score, access_count
            FROM knowledge_items
            WHERE content LIKE ? OR tags LIKE ?
            ORDER BY importance_score DESC, access_count DESC
            LIMIT ?
        ''', (f"%{query}%", f"%{query}%", limit))
        text_results = cursor.fetchall()
        # Merge both result sets, dropping duplicates by knowledge ID
        all_results = []
        seen_ids = set()
        # Vector hits go first
        for i, doc in enumerate(vector_results['documents'][0]):
            metadata = vector_results['metadatas'][0][i]
            knowledge_id = int(vector_results['ids'][0][i].split('_')[1])
            if knowledge_id not in seen_ids:
                all_results.append({
                    'id': knowledge_id,
                    'content': doc,
                    'metadata': metadata,
                    # Chroma returns a distance here: smaller means more similar
                    'score': vector_results['distances'][0][i],
                    'source': 'vector'
                })
                seen_ids.add(knowledge_id)
        # Keyword hits that the vector search did not already return
        for row in text_results:
            if row[0] not in seen_ids:
                all_results.append({
                    'id': row[0],
                    'content': row[1],
                    'metadata': json.loads(row[2]),
                    'tags': row[3].split(',') if row[3] else [],
                    'importance_score': row[4],
                    'access_count': row[5],
                    'source': 'text'
                })
                seen_ids.add(row[0])
        return all_results

    def record_conversation(self, session_id: str, user_message: str,
                            agent_response: str, context: Dict = None):
        """Store one user/agent exchange."""
        cursor = self.conn.cursor()
        cursor.execute('''
            INSERT INTO conversation_history
            (session_id, user_message, agent_response, context)
            VALUES (?, ?, ?, ?)
        ''', (session_id, user_message, agent_response,
              json.dumps(context or {})))
        self.conn.commit()
        return cursor.lastrowid

    def get_conversation_context(self, session_id: str, limit: int = 10):
        """Return the most recent exchanges of a session, newest first."""
        cursor = self.conn.cursor()
        # created_at has one-second resolution, so id breaks ties within a second
        cursor.execute('''
            SELECT user_message, agent_response, context
            FROM conversation_history
            WHERE session_id = ?
            ORDER BY created_at DESC, id DESC
            LIMIT ?
        ''', (session_id, limit))
        history = cursor.fetchall()
        return [
            {
                'user': row[0],
                'agent': row[1],
                'context': json.loads(row[2]) if row[2] else {}
            }
            for row in history
        ]

    def update_user_preference(self, user_id: str,
                               preference_type: str,
                               preference_value: Any,
                               confidence_score: float = 1.0):
        """Insert or update one preference for a user."""
        cursor = self.conn.cursor()
        # Check whether this (user, preference type) pair already exists
        cursor.execute('''
            SELECT id FROM user_preferences
            WHERE user_id = ? AND preference_type = ?
        ''', (user_id, preference_type))
        existing = cursor.fetchone()
        if existing:
            cursor.execute('''
                UPDATE user_preferences
                SET preference_value = ?,
                    confidence_score = ?,
                    last_updated = CURRENT_TIMESTAMP
                WHERE id = ?
            ''', (json.dumps(preference_value),
                  confidence_score, existing[0]))
        else:
            cursor.execute('''
                INSERT INTO user_preferences
                (user_id, preference_type, preference_value, confidence_score)
                VALUES (?, ?, ?, ?)
            ''', (user_id, preference_type,
                  json.dumps(preference_value), confidence_score))
        self.conn.commit()

    def get_user_preferences(self, user_id: str):
        """Return all preferences of a user, keyed by preference type."""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT preference_type, preference_value, confidence_score
            FROM user_preferences
            WHERE user_id = ?
            ORDER BY confidence_score DESC, last_updated DESC
        ''', (user_id,))
        preferences = {}
        for row in cursor.fetchall():
            preferences[row[0]] = {
                'value': json.loads(row[1]),
                'confidence': row[2]
            }
        return preferences

    def increment_access_count(self, knowledge_id: int):
        """Increment the access counter of a knowledge item."""
        cursor = self.conn.cursor()
        cursor.execute('''
            UPDATE knowledge_items
            SET access_count = access_count + 1,
                last_accessed = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (knowledge_id,))
        self.conn.commit()

    def update_importance_score(self, knowledge_id: int,
                                delta: float = 0.1):
        """Adjust the importance score of a knowledge item by delta."""
        cursor = self.conn.cursor()
        cursor.execute('''
            UPDATE knowledge_items
            SET importance_score = importance_score + ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (delta, knowledge_id))
        self.conn.commit()

    def cleanup_old_conversations(self, days: int = 30):
        """Delete conversation records older than the given number of days."""
        cursor = self.conn.cursor()
        # SQLite date modifier, for example '-30 days'
        cursor.execute('''
            DELETE FROM conversation_history
            WHERE created_at < datetime('now', ?)
        ''', (f'-{days} days',))
        deleted_count = cursor.rowcount
        self.conn.commit()
        return deleted_count
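The merge step in `search_knowledge` above simply appends keyword hits after vector hits. One commonly used alternative is reciprocal rank fusion (RRF), which rewards items that rank well in both lists. The sketch below is not part of the class; the function name is hypothetical and `k = 60` is a conventional default rather than a tuned value.

```python
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[int]], k: int = 60) -> List[int]:
    """Fuse several ranked ID lists; each appearance adds 1 / (k + rank)."""
    scores: Dict[int, float] = {}
    for ranking in rankings:
        for rank, item_id in enumerate(ranking, start=1):
            scores[item_id] = scores.get(item_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties are broken by the smaller ID
    return sorted(scores, key=lambda item_id: (-scores[item_id], item_id))


vector_ids = [3, 1, 7]    # IDs in vector-search order (best first)
keyword_ids = [1, 9, 3]   # IDs in keyword-search order (best first)
fused = reciprocal_rank_fusion([vector_ids, keyword_ids])
# fused == [1, 3, 9, 7]: items found by both searches rise to the top
```

Because RRF only looks at ranks, it sidesteps the problem that Chroma distances and SQLite importance scores are not on a comparable scale.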
3.2 Integrating the Knowledge Base into an AI Agent
import openai
from typing import List, Dict, Any


class KnowledgeEnhancedAgent:
    """AI agent backed by the knowledge base."""

    def __init__(self, knowledge_base: KnowledgeBase,
                 api_key: str = None):
        self.kb = knowledge_base
        # With api_key=None the OpenAI client reads OPENAI_API_KEY from the environment
        self.client = openai.OpenAI(api_key=api_key)
        self.current_session = None

    def set_session(self, session_id: str):
        """Set the current session ID."""
        self.current_session = session_id

    def generate_response(self, user_message: str,
                          context: Dict = None) -> str:
        """Generate a reply grounded in the knowledge base."""
        # 1. Retrieve relevant knowledge
        relevant_knowledge = self.kb.search_knowledge(user_message, limit=3)
        # 2. Load the conversation history
        conversation_context = []
        if self.current_session:
            conversation_context = self.kb.get_conversation_context(
                self.current_session, limit=5
            )
        # 3. Load user preferences (this example uses the session ID as the user ID)
        user_preferences = {}
        if self.current_session:
            user_preferences = self.kb.get_user_preferences(self.current_session)
        # 4. Build the system prompt
        system_prompt = self._build_system_prompt(
            relevant_knowledge, conversation_context, user_preferences
        )
        # 5. Call the large model
        response = self._call_llm(system_prompt, user_message, context)
        # 6. Record the exchange
        if self.current_session:
            self.kb.record_conversation(
                self.current_session, user_message, response, context
            )
        # 7. Update access statistics for the retrieved knowledge
        for knowledge in relevant_knowledge:
            self.kb.increment_access_count(knowledge['id'])
            # Retrieved knowledge gains a little importance
            self.kb.update_importance_score(knowledge['id'], 0.05)
        return response

    def _build_system_prompt(self, knowledge: List[Dict],
                             conversation: List[Dict],
                             preferences: Dict) -> str:
        """Build the system prompt from knowledge, history, and preferences."""
        prompt_parts = []
        # Knowledge section
        if knowledge:
            prompt_parts.append("## Relevant knowledge base entries:")
            for i, item in enumerate(knowledge[:3], 1):
                prompt_parts.append(f"{i}. {item['content'][:200]}...")
        # Conversation section: history arrives newest first, so take the
        # three latest exchanges and replay them in chronological order
        if conversation:
            prompt_parts.append("\n## Recent conversation history:")
            for conv in reversed(conversation[:3]):
                prompt_parts.append(f"User: {conv['user'][:100]}")
                prompt_parts.append(f"You: {conv['agent'][:100]}")
        # Preference section
        if preferences:
            prompt_parts.append("\n## User preferences:")
            for pref_type, pref_data in list(preferences.items())[:5]:
                value_str = str(pref_data['value'])[:50]
                prompt_parts.append(f"- {pref_type}: {value_str}")
        # Instructions
        prompt_parts.append("\n## Instructions:")
        prompt_parts.append("1. Give accurate answers based on the relevant knowledge")
        prompt_parts.append("2. Keep the conversation coherent")
        prompt_parts.append("3. Respect the user's preferences")
        prompt_parts.append("4. If the knowledge base has nothing relevant, say so honestly")
        return "\n".join(prompt_parts)

    def _call_llm(self, system_prompt: str, user_message: str,
                  context: Dict = None) -> str:
        """Call the large language model (context is accepted but not used here)."""
        try:
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_message}
                ],
                temperature=0.7,
                max_tokens=1000
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Sorry, an error occurred while processing the request: {str(e)}"

    def learn_from_feedback(self, feedback: str, relevance_score: float):
        """Store user feedback as a preference for the current session."""
        if self.current_session:
            # Extract keywords from the feedback text
            keywords = self._extract_keywords(feedback)
            # Update the user's preferences
            self.kb.update_user_preference(
                self.current_session,
                "feedback_preferences",
                {
                    "recent_feedback": feedback,
                    "keywords": keywords,
                    "relevance_score": relevance_score
                },
                confidence_score=relevance_score
            )

    def _extract_keywords(self, text: str) -> List[str]:
        """Naive keyword extraction."""
        # Whitespace splitting does not segment Chinese text; a real
        # application would use an NLP library for tokenization
        words = text.lower().split()
        # Filter stop words (common Chinese function words)
        stop_words = {"的", "了", "和", "是", "在", "有", "我", "你"}
        return [word for word in words if word not in stop_words]
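`_build_system_prompt` above truncates each part to a fixed length (200, 100, and 50 characters). Another way to keep the prompt bounded is a shared budget: sections are added in priority order until the budget runs out. This is a sketch under simplifying assumptions: `fit_to_budget` is a hypothetical helper, and it counts characters, whereas a real limit would be counted in model tokens.

```python
from typing import List


def fit_to_budget(sections: List[str], max_chars: int) -> List[str]:
    """Keep sections in priority order until the budget is spent.

    The section that overflows the budget is truncated; later ones are dropped.
    """
    kept: List[str] = []
    remaining = max_chars
    for section in sections:
        if remaining <= 0:
            break
        if len(section) <= remaining:
            kept.append(section)
            remaining -= len(section)
        else:
            kept.append(section[:remaining])
            remaining = 0
    return kept


# Highest-priority section first: instructions, then knowledge, then history
parts = fit_to_budget(["aaaa", "bbbb", "cccc"], max_chars=6)
# parts == ["aaaa", "bb"]
```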
3.3 Web API
from datetime import datetime
from typing import Dict, List, Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# KnowledgeBase and KnowledgeEnhancedAgent are the classes from sections 3.1
# and 3.2; import them here if they live in a separate module.

app = FastAPI(title="AI Agent Knowledge Base API")

# Initialize the knowledge base and the agent
kb = KnowledgeBase("agent_knowledge.db")
agent = KnowledgeEnhancedAgent(kb)


class MessageRequest(BaseModel):
    session_id: str
    message: str
    context: Optional[Dict] = None


class KnowledgeRequest(BaseModel):
    content: str
    content_type: str = "text"
    metadata: Optional[Dict] = None
    tags: Optional[List[str]] = None


class SearchRequest(BaseModel):
    query: str
    limit: int = 5


@app.post("/chat")
async def chat(request: MessageRequest):
    """Chat endpoint."""
    agent.set_session(request.session_id)
    response = agent.generate_response(request.message, request.context)
    return {
        "session_id": request.session_id,
        "response": response,
        "timestamp": datetime.now().isoformat()
    }


@app.post("/knowledge/add")
async def add_knowledge(request: KnowledgeRequest):
    """Add a knowledge item."""
    knowledge_id = kb.add_knowledge(
        request.content,
        request.content_type,
        request.metadata,
        request.tags
    )
    return {
        "knowledge_id": knowledge_id,
        "status": "success"
    }


@app.post("/knowledge/search")
async def search_knowledge(request: SearchRequest):
    """Search the knowledge base."""
    results = kb.search_knowledge(request.query, request.limit)
    return {
        "query": request.query,
        "results": results,
        "count": len(results)
    }


@app.get("/conversation/{session_id}")
async def get_conversation(session_id: str, limit: int = 10):
    """Return the conversation history of a session."""
    history = kb.get_conversation_context(session_id, limit)
    return {
        "session_id": session_id,
        "history": history,
        "count": len(history)
    }


@app.post("/preferences/{session_id}")
async def update_preference(
    session_id: str,
    preference_type: str,
    preference_value: Dict
):
    """Update a user preference (preference_type is a query parameter)."""
    kb.update_user_preference(
        session_id,
        preference_type,
        preference_value
    )
    return {"status": "success"}


@app.get("/health")
async def health_check():
    """Health check."""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "database": "connected"
    }
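The API above shares one module-level SQLite connection. A `sqlite3` connection refuses use from a thread other than the one that created it by default, and FastAPI runs plain `def` endpoints in a thread pool, so changing the handlers from `async def` to `def` would break that setup. One way around it, sketched here with a hypothetical `open_db` helper, is to open a short-lived connection per operation.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def open_db(db_path: str) -> Iterator[sqlite3.Connection]:
    """Open a short-lived connection: commit on success, roll back on error."""
    conn = sqlite3.connect(db_path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def count_rows(db_path: str, table: str) -> int:
    """Example caller; the table name must come from trusted code, not user input."""
    with open_db(db_path) as conn:
        return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

Opening a connection per call is cheap for a local SQLite file; a connection pool becomes worthwhile once the service moves to a networked database.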
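4. Advanced Features
4.1 Intelligent Knowledge Management
from typing import List


class IntelligentKnowledgeManager:
    """Intelligent knowledge manager."""

    def __init__(self, knowledge_base: KnowledgeBase):
        self.kb = knowledge_base

    def auto_categorize(self, content: str) -> List[str]:
        """Assign coarse categories with simple keyword rules."""
        # Simple rules here; an NLP model could be called instead
        categories = []
        # Technical keywords: code, programming, algorithm, API, database, server
        tech_keywords = ["代码", "编程", "算法", "API", "数据库", "服务器"]
        # Business keywords: customer, order, sales, market, finance
        business_keywords = ["客户", "订单", "销售", "市场", "财务"]
        content_lower = content.lower()
        # Lowercase the keywords too; otherwise "API" never matches lowercased content
        if any(keyword.lower() in content_lower for keyword in tech_keywords):
            categories.append("technical")
        if any(keyword.lower() in content_lower for keyword in business_keywords):
            categories.append("business")
        if not categories:
            categories.append("general")
        return categories

    def calculate_relevance_score(self, query: str, content: str) -> float:
        """Share of query words that also occur in the content (0.0 to 1.0)."""
        # Simple word overlap; whitespace splitting does not segment Chinese text
        query_words = set(query.lower().split())
        content_words = set(content.lower().split())
        if not query_words:
            return 0.0
        intersection = query_words.intersection(content_words)
        return len(intersection) / len(query_words)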

    def deduplicate_knowledge(self, similarity_threshold: float = 0.8):
        """Delete near-duplicate items and return how many were removed."""
        cursor = self.kb.conn.cursor()
        cursor.execute('SELECT id, content, importance_score FROM knowledge_items')
        all_items = cursor.fetchall()
        deleted = set()
        # Pairwise comparison is O(n^2): acceptable only for small collections
        for i in range(len(all_items)):
            id1, content1, score1 = all_items[i]
            if id1 in deleted:
                continue
            for j in range(i + 1, len(all_items)):
                id2, content2, score2 = all_items[j]
                if id2 in deleted:
                    continue
                similarity = self._calculate_text_similarity(content1, content2)
                if similarity > similarity_threshold:
                    # Keep the more important item of the pair
                    to_delete = id2 if score1 > score2 else id1
                    deleted.add(to_delete)
                    if to_delete == id1:
                        break  # id1 is gone; stop comparing it
        for item_id in deleted:
            cursor.execute('DELETE FROM knowledge_items WHERE id = ?', (item_id,))
        if deleted:
            # Remove the matching vectors so search does not return deleted items
            self.kb.collection.delete(
                ids=[f"knowledge_{item_id}" for item_id in deleted])
        self.kb.conn.commit()
        return len(deleted)

    def _calculate_text_similarity(self, text1: str, text2: str) -> float:
        """Jaccard similarity over whitespace-separated words (simplified)."""
        # A real application would use a stronger measure; whitespace
        # splitting treats an unspaced Chinese sentence as a single word
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        if not words1 or not words2:
            return 0.0
        intersection = words1.intersection(words2)
        union = words1.union(words2)
        return len(intersection) / len(union)
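Word-level Jaccard similarity relies on whitespace between words, which Chinese text does not have. A common workaround is to compare sets of overlapping character n-grams instead. The sketch below assumes bigrams (`n = 2`); the function names are hypothetical and this is not the method used by the class above.

```python
from typing import Set


def char_ngrams(text: str, n: int = 2) -> Set[str]:
    """Set of overlapping character n-grams, ignoring whitespace and case."""
    compact = "".join(text.lower().split())
    if len(compact) < n:
        return {compact} if compact else set()
    return {compact[i:i + n] for i in range(len(compact) - n + 1)}


def ngram_jaccard(text1: str, text2: str, n: int = 2) -> float:
    """Jaccard similarity of the two texts' character n-gram sets."""
    grams1, grams2 = char_ngrams(text1, n), char_ngrams(text2, n)
    if not grams1 or not grams2:
        return 0.0
    return len(grams1 & grams2) / len(grams1 | grams2)


# "知识库设计" and "知识库系统" share the bigrams 知识 and 识库
# out of six distinct bigrams, so the similarity is 2 / 6
similarity = ngram_jaccard("知识库设计", "知识库系统")
```

Whitespace-based Jaccard would score the same pair as 0.0, because each string counts as one distinct "word".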
4.2 Knowledge Updates and Maintenance
class KnowledgeMaintenance:
    """Knowledge maintenance routines."""

    def __init__(self, knowledge_base: KnowledgeBase):
        self.kb = knowledge_base

    def update_importance_scores(self):
        """Recompute importance scores from access counts and age."""
        cursor = self.kb.conn.cursor()
        # Base score from access frequency. This overwrites any incremental
        # adjustments made through update_importance_score().
        cursor.execute('''
            UPDATE knowledge_items
            SET importance_score =
                CASE
                    WHEN access_count > 100 THEN 1.5
                    WHEN access_count > 50 THEN 1.2
                    WHEN access_count > 10 THEN 1.0
                    ELSE 0.8
                END
        ''')
        # Time decay based on the item's age in days
        cursor.execute('''
            UPDATE knowledge_items
            SET importance_score = importance_score *
                CASE
                    WHEN julianday('now') - julianday(created_at) > 365 THEN 0.5
                    WHEN julianday('now') - julianday(created_at) > 180 THEN 0.7
                    WHEN julianday('now') - julianday(created_at) > 90 THEN 0.9
                    ELSE 1.0
                END
        ''')
        self.kb.conn.commit()
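
    def archive_inactive_knowledge(self, days_inactive: int = 180):
        """Move inactive, low-importance items to an archive table."""
        cursor = self.kb.conn.cursor()
        # Create the archive table with the same columns if it does not exist yet
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS archived_knowledge AS
            SELECT * FROM knowledge_items WHERE 1=0
        ''')
        # Select the items once so the copy and the delete hit the same rows
        cursor.execute('''
            SELECT id FROM knowledge_items
            WHERE (last_accessed IS NULL OR
                   julianday('now') - julianday(last_accessed) > ?)
              AND importance_score < 0.5
        ''', (days_inactive,))
        ids = [row[0] for row in cursor.fetchall()]
        for item_id in ids:
            cursor.execute('''
                INSERT INTO archived_knowledge
                SELECT * FROM knowledge_items WHERE id = ?
            ''', (item_id,))
            cursor.execute('DELETE FROM knowledge_items WHERE id = ?', (item_id,))
        if ids:
            # Drop the matching vectors so semantic search stops returning them
            self.kb.collection.delete(
                ids=[f"knowledge_{item_id}" for item_id in ids])
        self.kb.conn.commit()
        return len(ids)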

    def export_knowledge(self, format: str = "json") -> str:
        """Export the knowledge base as a JSON or CSV string."""
        cursor = self.kb.conn.cursor()
        cursor.execute('''
            SELECT id, content_type, content, metadata, tags,
                   importance_score, access_count, created_at
            FROM knowledge_items
            ORDER BY importance_score DESC
        ''')
        knowledge_items = []
        for row in cursor.fetchall():
            knowledge_items.append({
                'id': row[0],
                'content_type': row[1],
                'content': row[2],
                'metadata': json.loads(row[3]),
                'tags': row[4].split(',') if row[4] else [],
                'importance_score': row[5],
                'access_count': row[6],
                'created_at': row[7]
            })
        if format == "json":
            return json.dumps(knowledge_items, ensure_ascii=False, indent=2)
        elif format == "csv":
            # CSV export (content is shortened to a 100-character preview)
            import csv
            import io
            output = io.StringIO()
            writer = csv.writer(output)
            # Header row
            writer.writerow(['ID', 'Content Type', 'Content', 'Tags',
                             'Importance', 'Access Count', 'Created At'])
            # Data rows
            for item in knowledge_items:
                writer.writerow([
                    item['id'],
                    item['content_type'],
                    item['content'][:100] + "...",
                    ','.join(item['tags']),
                    item['importance_score'],
                    item['access_count'],
                    item['created_at']
                ])
            return output.getvalue()
        # Fail loudly instead of returning an error message as if it were data
        raise ValueError(f"Unsupported format: {format}")
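The time decay in `update_importance_scores` uses step thresholds (90, 180, and 365 days), so a score drops abruptly when an item crosses a boundary. A smooth alternative is exponential decay with a half-life. The function below is a sketch, not the article's method; the 180-day default half-life is an assumption chosen only to be in the same range as the thresholds above.

```python
import math


def decayed_importance(base_score: float, age_days: float,
                       half_life_days: float = 180.0) -> float:
    """Exponential decay: the score halves every half_life_days."""
    if half_life_days <= 0:
        raise ValueError("half_life_days must be positive")
    if age_days <= 0:
        return base_score
    return base_score * math.pow(0.5, age_days / half_life_days)


fresh = decayed_importance(1.0, age_days=0)       # 1.0
half = decayed_importance(1.0, age_days=180)      # 0.5
quarter = decayed_importance(1.0, age_days=360)   # 0.25
```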
5. Deployment and Operations Guide
5.1 Deployment Architecture
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    Frontend     │────▶│   API gateway   │────▶│ Knowledge base  │
│    (Web/App)    │     │     (Nginx)     │     │service (FastAPI)│
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                                         │
                                                         ▼
                                                ┌─────────────────┐
                                                │   Data layer    │
                                                │ • SQLite        │
                                                │ • ChromaDB      │
                                                │ • Redis cache   │
                                                └─────────────────┘
5.2 Docker Deployment Configuration
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code
COPY . .
# Create the data directory
RUN mkdir -p /data/knowledge
# Expose the service port
EXPOSE 8000
# Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
  knowledge-base:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./data:/data/knowledge
    environment:
      - DATABASE_PATH=/data/knowledge/knowledge.db
      - CHROMA_DB_PATH=/data/knowledge/chroma_db
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - knowledge-base
    restart: unless-stopped
volumes:
  redis_data:
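The compose file sets `DATABASE_PATH`, `CHROMA_DB_PATH`, and `REDIS_URL`, but the Python code shown earlier never reads them. A small settings loader is one way to connect the two. This is a sketch: the `Settings` class and `load_settings` function are hypothetical, and the fallback values are assumptions for local development.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    database_path: str
    chroma_db_path: str
    redis_url: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read the variables set in docker-compose.yml, with local fallbacks."""
    source = os.environ if env is None else env
    return Settings(
        database_path=source.get("DATABASE_PATH", "knowledge.db"),
        chroma_db_path=source.get("CHROMA_DB_PATH", "./chroma_db"),
        redis_url=source.get("REDIS_URL", "redis://localhost:6379/0"),
    )


settings = load_settings({"DATABASE_PATH": "/data/knowledge/knowledge.db"})
```

`settings.database_path` can then be passed to `KnowledgeBase(...)`, and the Chroma path needs to reach `chromadb.PersistentClient(path=...)` in the same way.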
5.3 Monitoring and Logging
# monitoring.py
import logging
from datetime import datetime
from typing import Dict
import psutil
import json


class KnowledgeBaseMonitor:
    """Knowledge base monitoring."""

    def __init__(self, log_file: str = "knowledge_monitor.log"):
        self.logger = self._setup_logger(log_file)

    def _setup_logger(self, log_file: str):
        """Configure the logger."""
        logger = logging.getLogger("knowledge_monitor")
        logger.setLevel(logging.INFO)
        if logger.handlers:
            # Named loggers are shared process-wide; do not attach handlers twice
            return logger
        # File handler
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.WARNING)
        # Formatting
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        return logger

    def log_operation(self, operation: str, details: Dict):
        """Log an operation."""
        self.logger.info(f"Operation: {operation} - {json.dumps(details)}")

    def log_error(self, error: str, context: Dict = None):
        """Log an error."""
        self.logger.error(f"Error: {error} - Context: {context or {}}")

    def get_system_metrics(self) -> Dict:
        """Collect system metrics."""
        return {
            "timestamp": datetime.now().isoformat(),
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage('/').percent
        }

    def check_database_health(self, kb: "KnowledgeBase") -> Dict:
        """Check the health of the SQLite database."""
        try:
            cursor = kb.conn.cursor()
            # List the tables whose names start with "knowledge"
            cursor.execute('''
                SELECT name FROM sqlite_master
                WHERE type='table' AND name LIKE 'knowledge_%'
            ''')
            tables = [row[0] for row in cursor.fetchall()]
            # Count the stored rows
            cursor.execute('SELECT COUNT(*) FROM knowledge_items')
            knowledge_count = cursor.fetchone()[0]
            cursor.execute('SELECT COUNT(*) FROM conversation_history')
            conversation_count = cursor.fetchone()[0]
            return {
                "status": "healthy",
                "tables": tables,
                "knowledge_count": knowledge_count,
                "conversation_count": conversation_count,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def generate_report(self, period_days: int = 7) -> Dict:
        """Generate a statistics report."""
        # The summary values are placeholders; query the database to fill them in
        report = {
            "period_days": period_days,
            "generated_at": datetime.now().isoformat(),
            "metrics": self.get_system_metrics(),
            "summary": {
                "total_knowledge_items": 0,
                "total_conversations": 0,
                "avg_response_time": 0,
                "most_accessed_knowledge": []
            }
        }
        return report
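The report above leaves `avg_response_time` at 0. One way to obtain a real value is to time each operation and keep the samples in memory, as in this sketch. `LatencyTracker` is a hypothetical helper, not part of the monitor class, and it keeps every sample, which is only reasonable for short-lived processes.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator, List


class LatencyTracker:
    """Collects wall-clock durations per operation name."""

    def __init__(self) -> None:
        self._samples: Dict[str, List[float]] = {}

    @contextmanager
    def measure(self, operation: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the duration even if the wrapped code raised
            elapsed = time.perf_counter() - start
            self._samples.setdefault(operation, []).append(elapsed)

    def average(self, operation: str) -> float:
        """Mean duration in seconds, or 0.0 when nothing was recorded."""
        samples = self._samples.get(operation, [])
        return sum(samples) / len(samples) if samples else 0.0


tracker = LatencyTracker()
with tracker.measure("search"):
    sum(range(1000))  # stand-in for kb.search_knowledge(...)
avg_seconds = tracker.average("search")
```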
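6. Best Practices and Optimization Tips
6.1 Performance Optimization
Caching strategy:
• Cache frequently accessed knowledge in Redis
• Set sensible expiration times
• Warm the cache ahead of demand
Database optimization:
• Index the columns used by common queries
• Purge historical data regularly
• Manage database connections with a connection pool
Search optimization:
• Combine vector search with keyword search
• Rank search results intelligently
• Allow search criteria to be adjusted dynamically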
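As an illustration of the indexing advice, the sketch below adds a composite index for the conversation lookup by session and time, then asks SQLite for its query plan. The index name is made up, the table is a trimmed copy of the schema from section 3.1, and the exact plan wording varies between SQLite versions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE conversation_history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id VARCHAR(100) NOT NULL,
        user_message TEXT NOT NULL,
        agent_response TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")
# Composite index matching "WHERE session_id = ? ORDER BY created_at DESC"
conn.execute("""
    CREATE INDEX IF NOT EXISTS idx_conversation_session_time
    ON conversation_history (session_id, created_at)
""")
plan_rows = conn.execute("""
    EXPLAIN QUERY PLAN
    SELECT user_message, agent_response
    FROM conversation_history
    WHERE session_id = ?
    ORDER BY created_at DESC
    LIMIT 10
""", ("demo-session",)).fetchall()
# The last column of each plan row is the human-readable detail
plan_text = " ".join(str(row[-1]) for row in plan_rows)
print(plan_text)
```

If the plan mentions the index name, the lookup no longer scans the whole table.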
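6.2 Security Considerations
Data security:
• Encrypt sensitive data at rest
• Enforce access control
• Back up data regularly
API security:
• Apply API rate limiting
• Validate and filter requests
• Guard against SQL injection and XSS attacks
Privacy protection:
• Anonymize user data
• Follow a compliant data retention policy
• Publish a clear privacy policy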
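Rate limiting is often implemented with a token bucket: each request spends one token, and tokens refill at a fixed rate up to a cap. The sketch below is a single-process illustration with a hypothetical `TokenBucket` class; a deployment with several service instances would need shared state, for example in Redis.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows bursts up to `capacity`, then `rate_per_sec` requests per second."""

    def __init__(self, rate_per_sec: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate_per_sec
        self.capacity = float(capacity)
        self.clock = clock  # injectable so tests can control time
        self.tokens = float(capacity)
        self.last_refill = clock()

    def allow(self) -> bool:
        """Refill based on elapsed time, then try to spend one token."""
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

In the API from section 3.3, one bucket per session ID or client address could be consulted at the top of each endpoint, returning HTTP 429 when `allow()` is false.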
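6.3 Designing for Extensibility
Modular design:
• Keep components loosely coupled
• Support plugin-style extensions
• Drive behavior through configuration
Horizontal scaling:
• Design services to be stateless
• Support multi-instance deployment
• Define a load-balancing strategy
Data sharding:
• Shard by user or by business domain
• Support distributed databases
• Plan a data migration strategy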
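Sharding by user can be as simple as hashing the user ID to pick a database file. The sketch below assumes SQLite files named by shard index; the naming scheme and function names are hypothetical. It uses SHA-256 rather than Python's built-in `hash()`, which is salted per process for strings and would route the same user differently after a restart.

```python
import hashlib


def shard_for(user_id: str, shard_count: int) -> int:
    """Stable shard index derived from a SHA-256 digest of the user ID."""
    if shard_count <= 0:
        raise ValueError("shard_count must be positive")
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


def shard_db_path(user_id: str, shard_count: int = 4) -> str:
    """Map a user to a per-shard SQLite file (hypothetical naming scheme)."""
    return f"knowledge_shard_{shard_for(user_id, shard_count)}.db"


path = shard_db_path("user-42")
```

Plain modulo sharding moves most users to a different shard when `shard_count` changes, which is why the migration strategy in the list above has to be planned up front.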
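7. Future Directions
7.1 Technical Evolution
Multimodal knowledge bases:
• Support image, audio, and video knowledge
• Cross-modal retrieval
• Multimodal content generation
Real-time learning:
• Online incremental learning
• Active knowledge acquisition
• Adaptive optimization
Federated learning:
• Distributed knowledge sharing
• Privacy-preserving learning
• Collaborative knowledge building
7.2 Broader Application Scenarios
Enterprise knowledge management:
• Intelligent document retrieval
• Passing on expert experience
• Better team collaboration
Education:
• Personalized learning paths
• Intelligent question answering
• Learning outcome assessment
Customer service:
• Intelligent Q&A bots
• Sentiment-aware responses
• Service workflow optimization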
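8. Summary
Building a knowledge base for an AI agent is a systems engineering task that has to balance technology choices, architecture, performance, and real requirements. The solution presented here has the following characteristics:
• Lightweight and efficient: built on SQLite and ChromaDB, suited to small and medium-sized applications
• Functionally complete: covers the full cycle of knowledge storage, retrieval, updating, and maintenance
• Easy to integrate: exposes clear API endpoints that plug into existing systems
• Extensible: the modular design leaves room for new features and performance work
As AI technology continues to develop, the knowledge base will become a core component of agent systems. With sound architecture and continuous optimization, we can build AI agent systems that are smarter, more reliable, and more practical.
Source code: all code in this article is open source, and the complete implementation is available in the GitHub repository.
About the author: focuses on AI system architecture and is committed to building the next generation of intelligent assistant systems. Questions and suggestions are welcome through the WeChat official account.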
夜雨聆风