大家好,我是老 J。
经过前八期,我们已经掌握了一个完整 Agent 的核心能力。但还有一个问题没解决:如何把 Agent 从原型变成生产级服务?
这期不讲具体代码,而是讲架构设计——高可用、异步、缓存、成本优化、安全防护。
一、生产级 Agent 架构全景
┌─────────────────────────────────────────────────────────────────┐
│                        生产级 Agent 架构                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│     客户端 ──► API Gateway ──► 负载均衡 ──► Agent 服务集群      │
│                                                    │            │
│            ┌───────────────────┬───────────────────┤            │
│            │                   │                   │            │
│            ▼                   ▼                   ▼            │
│      ┌───────────┐       ┌───────────┐       ┌───────────┐      │
│      │  Agent 1  │       │  Agent 2  │       │  Agent N  │      │
│      └─────┬─────┘       └─────┬─────┘       └─────┬─────┘      │
│            │                   │                   │            │
│            └───────────────────┼───────────────────┘            │
│                                │                                │
│            ┌───────────────────┼───────────────────┐            │
│            │                   │                   │            │
│            ▼                   ▼                   ▼            │
│      ┌───────────┐       ┌───────────┐       ┌───────────┐      │
│      │   Redis   │       │    MQ     │       │    DB     │      │
│      │   缓存    │       │   队列    │       │  持久化   │      │
│      └───────────┘       └───────────┘       └───────────┘      │
│                                │                                │
│                                ▼                                │
│                         ┌─────────────┐                         │
│                         │  LLM 服务   │                         │
│                         │(多模型降级) │                         │
│                         └─────────────┘                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
二、高可用架构
2.1 无状态设计
# stateless_agent.py
"""
无状态 Agent 设计 - 支持水平扩展
"""
class StatelessAgent:
    """Chat agent that keeps no conversation state on the instance.

    The per-session history is loaded from and written back to Redis on every
    call, so the instance itself carries nothing but its two clients.
    """

    # Seconds an idle session is kept in Redis before it expires.
    SESSION_TTL_SECONDS = 3600

    def __init__(self, redis_client):
        """Store the Redis client and create the LLM client.

        Args:
            redis_client: Object exposing ``get(key)`` and
                ``setex(key, ttl, value)``.
        """
        self.redis = redis_client
        self.llm = ChatOpenAI()

    async def chat(self, session_id: str, message: str) -> str:
        """Answer ``message`` within the conversation ``session_id``.

        The history is stored as a JSON list of ``{"role", "content"}``
        entries under ``session:<session_id>``; a missing key means a new
        conversation.

        Args:
            session_id: Identifier of the conversation.
            message: The user's new message.

        Returns:
            The value returned by the LLM call.
        """
        # Local import: this snippet has no import section of its own.
        import json

        key = f"session:{session_id}"

        # NOTE(review): these Redis calls are made without ``await``, exactly
        # as in the original; with a synchronous client they block the event
        # loop - confirm which client type is injected.
        raw_history = self.redis.get(key)
        history = json.loads(raw_history) if raw_history else []

        response = await self.llm.acall(message, history)

        # The original wrote an undefined ``new_history``; build it from the
        # loaded history plus this turn.
        new_history = history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response},
        ]
        # NOTE(review): read-modify-write is not atomic; two concurrent calls
        # for the same session can overwrite each other's turn.
        self.redis.setex(
            key,
            self.SESSION_TTL_SECONDS,
            json.dumps(new_history, ensure_ascii=False),
        )
        return response


# 2.2 健康检查与优雅停机
# health_check.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio
import signal
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the agent on startup and shut it down on exit.

    The ``try``/``finally`` makes sure ``shutdown_agent`` still runs when the
    application exits because of an error.
    """
    await init_agent()
    try:
        yield
    finally:
        await shutdown_agent()


# The lifespan handler only runs if it is handed to the application; the
# original created ``FastAPI()`` without it, so neither hook ever executed.
app = FastAPI(lifespan=lifespan)


@app.get("/health/liveness")
async def liveness():
    """Liveness probe: report that the process is up."""
    return {"status": "alive"}


@app.get("/health/readiness")
async def readiness():
    """Readiness probe: report whether the dependencies are healthy.

    Returns:
        A 200 response with ``status: ready`` when every check is truthy,
        otherwise a 503 response with ``status: not_ready``. Both carry the
        individual check results.
    """
    # Local import keeps this block self-contained.
    from fastapi.responses import JSONResponse

    checks = {
        "llm": await check_llm_health(),
        "redis": await check_redis_health()
    }
    if all(checks.values()):
        return {"status": "ready", "checks": checks}
    # FastAPI does not interpret a ``(body, status)`` tuple the way Flask
    # does; an explicit response object is needed to send 503.
    return JSONResponse(
        status_code=503,
        content={"status": "not_ready", "checks": checks},
    )


# 三、异步处理与队列
3.1 同步 vs 异步
3.2 异步任务队列
# async_queue.py
"""
使用 Celery + Redis 实现异步任务队列
"""
from celery import Celery
from typing import Any, Dict
# 配置 Celery
app = Celery('agent_tasks', broker='redis://localhost:6379/0')


@app.task(bind=True, max_retries=3, soft_time_limit=60)
def process_agent_task(self, task_id: str, user_input: str) -> Dict[str, Any]:
    """Run the agent for one request and publish the result to Redis.

    Args:
        task_id: Identifier under which the result is stored
            (``task_result:<task_id>``, kept for one hour).
        user_input: Text handed to ``Agent.invoke``.

    Returns:
        ``{"status": "completed", "result": ...}`` on success, or
        ``{"status": "failed", "error": ...}`` once all retries are used up.

    Raises:
        celery.exceptions.Retry: While retries remain, to reschedule the task
            60 seconds later.
    """
    try:
        agent = Agent()
        result = agent.invoke(user_input)
        # NOTE(review): ``result`` is written as-is; if it is not a
        # str/bytes/number it must be serialised first - confirm its type.
        redis_client.setex(f"task_result:{task_id}", 3600, result)
        return {"status": "completed", "result": result}
    except Exception as e:
        # ``self.retry`` never returns normally (it raises Retry, or the
        # original exception once retries are exhausted), so the original
        # trailing "failed" return was unreachable. Decide explicitly.
        if self.request.retries >= self.max_retries:
            return {"status": "failed", "error": str(e)}
        raise self.retry(exc=e, countdown=60)
# API 层
from fastapi import BackgroundTasks
# NOTE(review): in this snippet ``app`` was last bound to the Celery
# application, which has no ``post``/``get`` decorators; the HTTP routes need
# a FastAPI instance under a distinct name - confirm the intended wiring.
@app.post("/agent/async")
async def agent_async(request: Request, background_tasks: BackgroundTasks):
    """Queue an agent task and immediately return its id.

    Args:
        request: Incoming request; its ``message`` attribute is the user
            input (presumably a request model rather than the raw HTTP
            request - TODO confirm).
        background_tasks: Kept for interface compatibility; no longer used.

    Returns:
        ``{"task_id": <uuid4 string>, "status": "pending"}``.
    """
    # Local import: ``uuid`` is not imported anywhere in this snippet.
    import uuid

    task_id = str(uuid.uuid4())
    # Hand the work to the Celery queue. The original passed the task
    # function to ``BackgroundTasks``, which runs it inside the web process
    # and bypasses the broker, the retries and the time limit.
    process_agent_task.delay(task_id, request.message)
    return {"task_id": task_id, "status": "pending"}
@app.get("/agent/result/{task_id}")
async def get_result(task_id: str):
    """Return the stored result of a task, or report it as still pending.

    Args:
        task_id: Identifier returned when the task was queued.

    Returns:
        ``{"status": "completed", "result": ...}`` when the key
        ``task_result:<task_id>`` exists in Redis, otherwise
        ``{"status": "pending"}``.
    """
    result = redis_client.get(f"task_result:{task_id}")
    # Compare against None: an empty result is still a finished task and must
    # not be reported as pending forever.
    if result is not None:
        return {"status": "completed", "result": result}
    # NOTE(review): a failed or expired task also ends up here, because only
    # successful results are written to Redis.
    return {"status": "pending"}


# 四、缓存策略
4.1 多级缓存
# cache_strategy.py
"""
多级缓存策略
本地缓存 → Redis 缓存 → LLM 调用
"""
from functools import lru_cache
import hashlib
import redis
from typing import Optional
class MultiLevelCache:
    """Two-level response cache: an in-process dict in front of Redis.

    Lookups try the local dict first, then Redis. The local level is bounded
    in size and its entries expire, so it cannot grow without limit or keep
    serving a value long after the Redis entry has gone.
    """

    def __init__(self, local_max_size: int = 1024, local_ttl: int = 60):
        """Create the Redis client and an empty local cache.

        Args:
            local_max_size: Maximum number of entries kept in process.
            local_ttl: Maximum age in seconds of a local entry.
        """
        self.redis_client = redis.Redis(decode_responses=True)
        self.local_cache = {}      # query -> response
        self._local_expiry = {}    # query -> monotonic deadline
        self.local_max_size = local_max_size
        self.local_ttl = local_ttl

    @staticmethod
    def _now() -> float:
        """Return a monotonic timestamp in seconds."""
        import time  # local import: the snippet does not import ``time``
        return time.monotonic()

    def _get_key(self, query: str) -> str:
        """Return the Redis key for ``query`` (MD5 of its UTF-8 bytes)."""
        return f"agent:cache:{hashlib.md5(query.encode()).hexdigest()}"

    def _remember(self, query: str, response: str, ttl: int) -> None:
        """Store ``response`` locally and evict the oldest entries if full."""
        # Re-insert so the dict's insertion order reflects recency.
        self.local_cache.pop(query, None)
        self.local_cache[query] = response
        self._local_expiry[query] = self._now() + min(ttl, self.local_ttl)
        while len(self.local_cache) > self.local_max_size:
            oldest = next(iter(self.local_cache))
            del self.local_cache[oldest]
            self._local_expiry.pop(oldest, None)

    def get(self, query: str) -> Optional[str]:
        """Return the cached response for ``query``, or ``None`` on a miss."""
        # L1: local cache
        if query in self.local_cache:
            deadline = self._local_expiry.get(query)
            # An entry without a deadline is treated as valid.
            if deadline is None or deadline > self._now():
                return self.local_cache[query]
            del self.local_cache[query]
            self._local_expiry.pop(query, None)
        # L2: Redis cache
        key = self._get_key(query)
        cached = self.redis_client.get(key)
        # ``is not None`` so that a cached empty string counts as a hit.
        if cached is not None:
            self._remember(query, cached, self.local_ttl)
            return cached
        return None

    def set(self, query: str, response: str, ttl: int = 3600):
        """Cache ``response`` for ``query`` locally and in Redis.

        Args:
            query: The request text used as the cache key.
            response: The value to cache.
            ttl: Redis expiry in seconds; the local entry lives for
                ``min(ttl, local_ttl)``.
        """
        key = self._get_key(query)
        self._remember(query, response, ttl)
        self.redis_client.setex(key, ttl, response)
class SemanticCache:
    """Cache keyed by meaning: a query hits when a stored query is similar.

    NOTE(review): assumes ``OpenAIEmbeddings`` and ``Chroma`` are the
    LangChain classes of those names; their imports are not shown - confirm.
    """

    def __init__(self, threshold: float = 0.95):
        """Create the embedding model and the vector store.

        Args:
            threshold: A stored entry is returned only when its relevance
                score is strictly greater than this value.
        """
        self.embeddings = OpenAIEmbeddings()
        # Give the store the same embedding model that queries use, so that
        # stored texts and lookups live in the same vector space.
        self.vectorstore = Chroma(embedding_function=self.embeddings)
        self.threshold = threshold

    def get(self, query: str) -> Optional[str]:
        """Return the response cached for the most similar query, if any.

        Returns:
            The stored response when the best match scores above
            ``threshold``, otherwise ``None``.
        """
        # The search results themselves carry no ``score`` attribute; the
        # ``*_with_relevance_scores`` variant returns (document, score) pairs.
        hits = self.vectorstore.similarity_search_with_relevance_scores(
            query, k=1
        )
        if not hits:
            return None
        document, score = hits[0]
        if score > self.threshold:
            return document.metadata["response"]
        return None

    def set(self, query: str, response: str):
        """Store ``response`` under the embedding of ``query``."""
        # The store embeds the text itself; the raw vector is not put into
        # the metadata because metadata values have to be scalars.
        self.vectorstore.add_texts(
            [query],
            metadatas=[{"response": response}]
        )


# 4.2 缓存策略选择
五、成本优化
5.1 Token 优化策略
# cost_optimizer.py
"""
成本优化器
"""
class CostOptimizer:
    """Helpers that keep token consumption, and therefore cost, in check."""

    # USD per 1000 tokens, by model.
    _RATES_PER_1K = {
        "gpt-4": {"prompt": 0.03, "completion": 0.06},
        "gpt-3.5-turbo": {"prompt": 0.001, "completion": 0.002}
    }

    def __init__(self, max_tokens_per_request=2000):
        """Remember the per-request token budget.

        Args:
            max_tokens_per_request: Budget stored on the instance; none of
                the methods below reads it.
        """
        self.max_tokens_per_request = max_tokens_per_request

    def truncate_context(self, context: str, max_chars: int = 2000) -> str:
        """Cut ``context`` down to ``max_chars`` characters.

        Returns:
            ``context`` unchanged when it is short enough, otherwise its
            first ``max_chars`` characters followed by a truncation marker
            (so the result is longer than ``max_chars`` by the marker).
        """
        if len(context) <= max_chars:
            return context
        return context[:max_chars] + "...(已截断)"

    def smart_prompt(self, user_input: str, history: list,
                     max_history: int = 3, max_input_chars: int = 500) -> str:
        """Build a prompt from a shortened input and the latest history.

        Args:
            user_input: Raw user text; only its first ``max_input_chars``
                characters are used.
            history: Earlier conversation entries; only the last
                ``max_history`` are used. May be empty or ``None``.
            max_history: Number of trailing history entries to keep.
            max_input_chars: Maximum number of input characters to keep.

        Returns:
            Whatever ``_build_prompt`` returns for the reduced inputs.
        """
        recent_history = history[-max_history:] if history else []
        compressed_input = user_input[:max_input_chars]
        return self._build_prompt(compressed_input, recent_history)

    def _build_prompt(self, user_input: str, history: list) -> str:
        """Assemble the final prompt; must be provided by a subclass.

        The original class called this method without defining it, which
        failed with an ``AttributeError``; the hook makes that explicit.
        """
        raise NotImplementedError("_build_prompt must be implemented")

    def estimate_cost(self, prompt_tokens: int, completion_tokens: int, model: str = "gpt-4") -> float:
        """Estimate the cost in USD of one call.

        Args:
            prompt_tokens: Number of prompt tokens.
            completion_tokens: Number of completion tokens.
            model: Model name; an unknown name is priced at the gpt-4 rate.

        Returns:
            The estimated cost in USD.
        """
        rates = self._RATES_PER_1K
        rate = rates.get(model, rates["gpt-4"])
        cost = (prompt_tokens / 1000) * rate["prompt"] + \
               (completion_tokens / 1000) * rate["completion"]
        return cost


# 5.2 模型降级策略
# model_fallback.py
"""
多模型降级策略
"""
class ModelRouter:
    """Pick a model per request and degrade to cheaper ones on failure."""

    def __init__(self):
        """Set up the model tiers and their relative costs."""
        self.models = {
            "primary": "gpt-4",
            "secondary": "gpt-3.5-turbo",
            "fallback": "claude-instant"
        }
        self.costs = {
            "gpt-4": 1.0,
            "gpt-3.5-turbo": 0.1,
            "claude-instant": 0.05
        }

    async def route(self, user_input: str, context: dict) -> str:
        """Answer with the most suitable model, degrading when it fails.

        Inputs that need reasoning go to the primary model, everything else
        to the secondary one (the original three-way branch reduced to
        exactly this). If the chosen model raises, the remaining cheaper
        models are tried in order instead of propagating the error.

        Returns:
            The first successful model response, or the busy message when
            every candidate fails.
        """
        if self._needs_reasoning(user_input):
            preferred = self.models["primary"]
        else:
            preferred = self.models["secondary"]
        degraded = [self.models["secondary"], self.models["fallback"]]
        candidates = [preferred] + [m for m in degraded if m != preferred]
        return await self._try_models(candidates, user_input, context)

    async def fallback(self, user_input: str, context: dict) -> str:
        """Answer using the secondary model, then the fallback model.

        Returns:
            The first successful response, or the busy message when both
            models fail.
        """
        return await self._try_models(
            [self.models["secondary"], self.models["fallback"]],
            user_input,
            context,
        )

    async def _try_models(self, models: list, user_input: str, context: dict) -> str:
        """Call ``models`` in order and return the first successful answer."""
        # Local import: the snippet has no import section of its own.
        import logging

        logger = logging.getLogger(__name__)
        for model in models:
            try:
                return await self._call_model(model, user_input, context)
            except Exception:
                # Best-effort degradation, but leave a trace of the failure
                # instead of swallowing it silently.
                logger.warning("model %s failed, trying next", model, exc_info=True)
        return "服务繁忙,请稍后重试"


# 六、安全防护
6.1 输入/输出过滤
# security.py
"""
安全防护
"""
import re
from typing import List
class ContentFilter:
    """Filter user input before the model and model output after it."""

    # Words that cause the input to be rejected.
    SENSITIVE_WORDS = ["暴力", "色情", "政治敏感词"]
    # Prompt-injection patterns removed from the input (case-insensitive).
    INJECTION_PATTERNS = [
        r"ignore previous instructions",
        r"system\s*:",
        # Non-greedy, so only the phrase itself is removed rather than
        # everything up to the last occurrence of the closing word.
        r"你是一个.*?助手",
    ]
    # Maximum number of characters of input passed on.
    MAX_INPUT_CHARS = 2000

    def filter_input(self, user_input: str) -> str:
        """Strip injection attempts and reject input with sensitive words.

        Args:
            user_input: Raw text from the user.

        Returns:
            The cleaned text, stripped of surrounding whitespace and cut to
            ``MAX_INPUT_CHARS`` characters.

        Raises:
            ValueError: If the cleaned text contains a sensitive word.
        """
        # Repeat until nothing changes: removing one match can join the text
        # around it into a new match, which a single pass would let through.
        previous = None
        while previous != user_input:
            previous = user_input
            for pattern in self.INJECTION_PATTERNS:
                user_input = re.sub(pattern, "", user_input, flags=re.IGNORECASE)
        for word in self.SENSITIVE_WORDS:
            if word in user_input:
                raise ValueError(f"输入包含敏感词: {word}")
        return user_input.strip()[:self.MAX_INPUT_CHARS]

    def filter_output(self, output: str) -> str:
        """Redact personal data and mask sensitive words in model output."""
        output = self._redact_pii(output)
        output = self._check_sensitive(output)
        return output

    def _check_sensitive(self, text: str) -> str:
        """Mask every sensitive word with asterisks of the same length.

        The original class called this method without defining it, so
        ``filter_output`` always failed with an ``AttributeError``.
        """
        for word in self.SENSITIVE_WORDS:
            text = text.replace(word, "*" * len(word))
        return text

    def _redact_pii(self, text: str) -> str:
        """Replace mobile numbers and e-mail addresses with placeholders."""
        # Mobile number: exactly 11 digits, not part of a longer digit run
        # (order numbers, ID numbers and the like are left alone).
        text = re.sub(r'(?<!\d)1[3-9]\d{9}(?!\d)', '138****0000', text)
        # E-mail: allow dots, plus and hyphen in the local part and
        # multi-label domains, so that the whole address is replaced.
        text = re.sub(
            r'[A-Za-z0-9._%+-]+@[A-Za-z0-9-]+(?:\.[A-Za-z0-9-]+)+',
            'user@example.com',
            text,
        )
        return text


# 6.2 限流与配额
# rate_limit.py
"""
限流与配额
"""
import time
from collections import defaultdict
from typing import Dict
class TokenBucket:
    """Token-bucket rate limiter.

    The bucket starts full and is refilled continuously at ``refill_rate``
    tokens per second, never exceeding ``capacity``.
    """

    def __init__(self, capacity: int, refill_rate: float):
        """Create a full bucket.

        Args:
            capacity: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens added per second.
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        # Monotonic clock: a wall-clock adjustment must neither drain nor
        # flood the bucket.
        self.last_refill = time.monotonic()

    def consume(self, tokens: int = 1) -> bool:
        """Try to take ``tokens`` from the bucket.

        Args:
            tokens: Number of tokens requested; must not be negative.

        Returns:
            True if the tokens were available and have been taken, False
            otherwise (the bucket is then left untouched apart from refill).

        Raises:
            ValueError: If ``tokens`` is negative, which would otherwise add
                tokens to the bucket.
        """
        if tokens < 0:
            raise ValueError("tokens must not be negative")
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
class RateLimiter:
    """Rate limiter that applies a per-user and a per-API-key limit."""

    # Requests allowed per minute at each level.
    USER_LIMIT_PER_MINUTE = 100
    API_KEY_LIMIT_PER_MINUTE = 1000

    def __init__(self):
        """Start with no buckets; they are created on first use."""
        # NOTE(review): buckets are never removed, so this dict grows with
        # the number of distinct users and keys seen.
        self.limiters: Dict[str, TokenBucket] = {}

    def _bucket(self, key: str, per_minute: int):
        """Return the bucket stored under ``key``, creating it if needed."""
        if key not in self.limiters:
            self.limiters[key] = TokenBucket(per_minute, per_minute / 60)
        return self.limiters[key]

    def check_limit(self, user_id: str, api_key: str) -> bool:
        """Check both limits and count the request if it is allowed.

        Args:
            user_id: Identifier of the calling user.
            api_key: API key used for the call.

        Returns:
            True if the request is within both limits, False otherwise.
        """
        user_bucket = self._bucket(f"user:{user_id}", self.USER_LIMIT_PER_MINUTE)
        key_bucket = self._bucket(f"api:{api_key}", self.API_KEY_LIMIT_PER_MINUTE)
        if not user_bucket.consume():
            return False
        if key_bucket.consume():
            return True
        # The request is rejected at the API-key level, so give the user
        # token back; otherwise rejected requests eat into the user quota.
        user_bucket.tokens = min(user_bucket.capacity, user_bucket.tokens + 1)
        return False


# 七、监控与告警
# monitoring.py
"""
监控与告警
"""
from dataclasses import dataclass
from typing import List, Dict
import logging
@dataclass
class Alert:
    """A single alert raised by the monitor."""

    name: str         # machine-readable alert identifier
    level: str        # info, warning, critical
    message: str      # human-readable description
    value: float      # observed value that triggered the alert
    threshold: float  # limit the value was compared against
class AgentMonitor:
    """Collect request metrics and raise alerts when they look unhealthy."""

    # Mapping from alert level to logging level.
    _LOG_LEVELS = {
        "info": logging.INFO,
        "warning": logging.WARNING,
        "critical": logging.CRITICAL,
    }

    def __init__(self, min_requests_for_error_alert: int = 20):
        """Start with zeroed metrics.

        Args:
            min_requests_for_error_alert: Number of recorded requests needed
                before the error-rate alert may fire, so that one early
                failure (1 of 1 = 100%) does not page anyone.
        """
        self.logger = logging.getLogger(__name__)
        self.metrics = {
            "requests_total": 0,
            "requests_success": 0,
            "requests_failed": 0,
            "avg_latency_ms": 0,
            "token_usage": 0
        }
        self.min_requests_for_error_alert = min_requests_for_error_alert
        # Names of the alerts that were firing after the previous request.
        self._active_alerts = set()

    def record_request(self, success: bool, latency_ms: float, tokens: int):
        """Record one finished request and evaluate the alert rules.

        Args:
            success: Whether the request succeeded.
            latency_ms: Request latency in milliseconds.
            tokens: Tokens consumed by the request.
        """
        is_first = self.metrics["requests_total"] == 0
        self.metrics["requests_total"] += 1
        if success:
            self.metrics["requests_success"] += 1
        else:
            self.metrics["requests_failed"] += 1
        if is_first:
            # Seed the moving average with the first sample; starting from 0
            # under-reports latency for the first few dozen requests.
            self.metrics["avg_latency_ms"] = latency_ms
        else:
            # Exponential moving average, weight 0.1 on the new sample.
            self.metrics["avg_latency_ms"] = (
                self.metrics["avg_latency_ms"] * 0.9 + latency_ms * 0.1
            )
        self.metrics["token_usage"] += tokens
        self._check_alerts()

    def _check_alerts(self):
        """Evaluate the alert rules and send alerts that newly started firing."""
        firing = {}
        total = self.metrics["requests_total"]
        # Error rate over all requests recorded so far.
        error_rate = self.metrics["requests_failed"] / max(1, total)
        if total >= self.min_requests_for_error_alert and error_rate > 0.05:  # >5%
            firing["high_error_rate"] = Alert(
                name="high_error_rate",
                level="critical",
                message="错误率过高",
                value=error_rate,
                threshold=0.05
            )
        if self.metrics["avg_latency_ms"] > 5000:  # >5 seconds
            firing["high_latency"] = Alert(
                name="high_latency",
                level="warning",
                message="响应延迟过高",
                value=self.metrics["avg_latency_ms"],
                threshold=5000
            )
        # Send only on the transition into the firing state; otherwise every
        # request during an incident would emit the same alert again.
        for name, alert in firing.items():
            if name not in self._active_alerts:
                self._send_alert(alert)
        self._active_alerts = set(firing)

    def _send_alert(self, alert: "Alert"):
        """Emit ``alert`` through the logger at the level it declares."""
        log_level = self._LOG_LEVELS.get(alert.level, logging.WARNING)
        self.logger.log(log_level, "[%s] %s: %s", alert.level, alert.name, alert.message)
        # Extension point: forward to DingTalk / WeCom / e-mail here.


# 八、成本估算公式
# cost_estimate.py
"""
成本估算工具
"""
def estimate_monthly_cost(
    daily_requests: int,
    avg_tokens_per_request: int,
    model: str = "gpt-4",
    cache_hit_rate: float = 0.3
) -> dict:
    """Estimate the monthly LLM bill in USD.

    A month is taken as 30 days, cached requests cost nothing, and the tokens
    of a request are assumed to split prompt:completion = 2:1.

    Args:
        daily_requests: Requests per day; must not be negative.
        avg_tokens_per_request: Average total tokens per request; must not be
            negative.
        model: Model name; an unknown name is priced as gpt-3.5-turbo, and
            the model actually used for pricing is reported as
            ``pricing_model``.
        cache_hit_rate: Fraction of requests served from cache, in [0, 1].

    Returns:
        A dict with the inputs echoed back plus ``pricing_model``,
        ``llm_calls``, ``cost_per_request_usd`` (4 decimals) and
        ``estimated_monthly_cost_usd`` (2 decimals).

    Raises:
        ValueError: If ``cache_hit_rate`` is outside [0, 1] or a count is
            negative; such inputs would yield a negative or meaningless cost.
    """
    if not 0.0 <= cache_hit_rate <= 1.0:
        raise ValueError("cache_hit_rate must be between 0 and 1")
    if daily_requests < 0 or avg_tokens_per_request < 0:
        raise ValueError("request and token counts must not be negative")

    # USD per 1000 tokens.
    rates = {
        "gpt-4": {"prompt": 0.03, "completion": 0.06},
        "gpt-4-turbo": {"prompt": 0.01, "completion": 0.03},
        "gpt-3.5-turbo": {"prompt": 0.001, "completion": 0.002}
    }
    pricing_model = model if model in rates else "gpt-3.5-turbo"
    rate = rates[pricing_model]

    monthly_requests = daily_requests * 30
    cached_requests = monthly_requests * cache_hit_rate
    llm_requests = monthly_requests - cached_requests

    # Assumed split prompt:completion = 2:1.
    prompt_tokens = avg_tokens_per_request * 2/3
    completion_tokens = avg_tokens_per_request * 1/3
    cost_per_request = (prompt_tokens / 1000) * rate["prompt"] + \
                       (completion_tokens / 1000) * rate["completion"]
    total_cost = llm_requests * cost_per_request
    return {
        "model": model,
        "pricing_model": pricing_model,
        "monthly_requests": monthly_requests,
        "cache_hit_rate": cache_hit_rate,
        "llm_calls": llm_requests,
        "cost_per_request_usd": round(cost_per_request, 4),
        "estimated_monthly_cost_usd": round(total_cost, 2)
    }
# 示例:估算
# estimate_monthly_cost(10000, 500, "gpt-3.5-turbo", 0.3)
# → {"estimated_monthly_cost_usd": 140.0}
九、部署架构建议
十、下期预告
AI Agent 智能体从入门到实战(十):智能编程助手实战
• 代码生成与补全 • 代码解释与重构 • Bug 定位与修复 • 单元测试生成
我是老 J,下期见。
夜雨聆风