使用 fastapi-limiter 插件实现多种限流策略
你的 API 被人疯狂调用了吗?
上线第一天,接口响应正常。第二天,突然多了几百个请求。第三天,服务器开始报警,数据库连接池耗尽,用户投诉页面卡顿…
这就是没有限流的后果。限流(Rate Limiting)就像高速公路的收费站——不是不让车过,而是控制车流量,避免所有人都堵在一起。
今天咱们聊聊 fastapi-limiter —— 一个让你轻松给 FastAPI 接口”安装收费站”的插件。
读完本文,你将学会:
- • 什么是限流,为什么要限流
- • 用 fastapi-limiter 实现多种限流策略
- • 全局、路由级、用户级限流的实战写法
- • 生产环境的最佳实践
一、为什么要限流?
限流是保护自己的最后一道防线
想象一下你的 API 是一扇玻璃门:
- • 正常用户:轻轻推开,优雅地进出
- • 爬虫脚本:疯狂撞击,每分钟几千次
- • 恶意攻击:直接拿锤子砸(DDoS)
没有限流,你的玻璃门迟早碎成渣。
限流的常见场景
| 场景 | 为什么要限流 |
|---|---|
| 登录接口 | 防止暴力破解密码 |
| 短信验证码 | 防止短信轰炸(被人薅短信费) |
| 文件下载 | 防止带宽被占满 |
| 数据查询 | 防止数据库被拖垮 |
| 第三方 API 转发 | 防止超出上游配额 |
二、fastapi-limiter 基础用法
项目结构
limiter-demo/
├── main.py # 入口文件
├── deps.py # 限流依赖
├── redis_client.py # Redis 连接
└── requirements.txt # 依赖
依赖安装
fastapi==0.104.1
uvicorn[standard]==0.24.0
fastapi-limiter==0.1.6
redis==5.0.1
错误写法(新手常见)
from fastapi import FastAPI
from fastapi_limiter import FastAPILimiter
app = FastAPI()
# ❌ 错误:没有初始化 Redis
@app.on_event("startup")
async def startup():
pass # 忘了初始化 limiter
# ❌ 错误:直接在装饰器里写字符串
@app.get("/items")
async def get_items():
return {"items": []}
# 限流逻辑写在了函数内部,性能差、不统一
问题在哪?
- • 没有初始化
FastAPILimiter,限流根本不生效 - • 限流逻辑和业务逻辑混在一起,维护困难
正确写法(基础版本)
import redis.asyncio as redis
from fastapi import FastAPI
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
app = FastAPI(title="FastAPI 限流示例")
@app.on_event("startup")
async def startup():
# ✅ 初始化 Redis 连接
redis_instance = redis.from_url(
"redis://localhost:6379",
encoding="utf-8",
decode_responses=True
)
await FastAPILimiter.init(redis_instance)
@app.on_event("shutdown")
async def shutdown():
await FastAPILimiter.close()
# ✅ 使用 RateLimiter 依赖,每秒最多 5 次
@app.get("/items", dependencies=[Depends(RateLimiter(times=5, seconds=1))])
async def get_items():
return {"items": ["苹果", "香蕉", "橙子"]}
优化写法(生产版本)
import logging
from typing import Callable, Optional
import redis.asyncio as redis
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 配置参数
class Config:
"""应用配置"""
REDIS_URL = "redis://localhost:6379/0"
HOST = "0.0.0.0"
PORT = 8000
app = FastAPI(title="FastAPI 限流实战", version="1.0.0")
@app.on_event("startup")
async def startup():
"""启动时初始化 Redis 和限流器"""
try:
redis_instance = redis.from_url(
Config.REDIS_URL,
encoding="utf-8",
decode_responses=True
)
await FastAPILimiter.init(redis_instance)
logger.info("✅ 限流器初始化成功")
except Exception as e:
logger.error(f"❌ Redis 连接失败: {e}")
raise
@app.on_event("shutdown")
async def shutdown():
"""关闭时清理资源"""
try:
await FastAPILimiter.close()
logger.info("✅ 限流器已关闭")
except Exception as e:
logger.error(f"❌ 关闭限流器失败: {e}")
# ==================== 限流策略定义 ====================
# 1. 全局通用限流:每秒 10 次
limit_10_per_second = RateLimiter(times=10, seconds=1)
# 2. 严格限流:每分钟 5 次(适合敏感接口)
limit_5_per_minute = RateLimiter(times=5, seconds=60)
# 3. 宽松限流:每小时 100 次
limit_100_per_hour = RateLimiter(times=100, seconds=3600)
# ==================== 路由定义 ====================
@app.get("/", dependencies=[Depends(limit_10_per_second)])
async def root():
"""首页 - 普通限流"""
return {"message": "欢迎来到限流演示 API"}
@app.get("/items", dependencies=[Depends(limit_10_per_second)])
async def get_items():
"""获取商品列表 - 普通限流"""
try:
items = ["苹果", "香蕉", "橙子", "葡萄"]
return {"code": 0, "data": items, "msg": "success"}
except Exception as e:
logger.error(f"获取商品列表失败: {e}")
raise HTTPException(500, "服务器内部错误")
@app.post("/login", dependencies=[Depends(limit_5_per_minute)])
async def login(request: Request):
"""
用户登录 - 严格限流
防止暴力破解,每分钟最多尝试 5 次
"""
try:
# 模拟登录逻辑
logger.info(f"登录请求来自: {request.client.host}")
return {"code": 0, "msg": "登录成功", "token": "fake-jwt-token"}
except Exception as e:
logger.error(f"登录失败: {e}")
raise HTTPException(500, "登录服务异常")
@app.get("/download", dependencies=[Depends(limit_100_per_hour)])
async def download_file():
"""
文件下载 - 宽松限流
每小时最多下载 100 次,防止带宽耗尽
"""
try:
return {"code": 0, "msg": "下载链接已生成", "url": "https://example.com/file.zip"}
except Exception as e:
logger.error(f"下载失败: {e}")
raise HTTPException(500, "下载服务异常")
@app.get("/health")
async def health_check():
"""健康检查 - 不限流"""
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host=Config.HOST, port=Config.PORT)
三、用户级限流(按用户区分)
上面的限流是全局的——所有人共享同一个配额。但很多时候,我们需要按用户限流。
比如:普通用户每分钟 10 次,VIP 用户每分钟 100 次。
按用户 ID 限流
from fastapi import Request
from fastapi_limiter.depends import RateLimiter
async def get_user_id(request: Request) -> str:
"""
从请求中提取用户 ID
实际项目中可以从 token 或 session 获取
"""
token = request.headers.get("Authorization", "anonymous")
# 简化处理,直接用 token 前 8 位作为用户标识
return token[:8] if len(token) > 8 else token
# 自定义限流键函数
async def user_id_identifier(request: Request):
"""
生成基于用户 ID 的限流键
格式: limiter:user_id:route
"""
user_id = await get_user_id(request)
return user_id
# ✅ 按用户限流:每个用户每分钟 10 次
@app.get("/user-data", dependencies=[
Depends(RateLimiter(times=10, seconds=60, identifier=user_id_identifier))
])
async def get_user_data():
"""获取用户数据 - 按用户限流"""
return {"code": 0, "data": "你的专属数据"}
按 IP 限流
async def ip_identifier(request: Request):
"""
基于 IP 地址的限流键
适用于未登录用户的限流
"""
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
# 获取真实 IP(经过反向代理时)
ip = forwarded.split(",")[0].strip()
else:
ip = request.client.host
return ip
# ✅ 按 IP 限流:每个 IP 每分钟 20 次
@app.get("/search", dependencies=[
Depends(RateLimiter(times=20, seconds=60, identifier=ip_identifier))
])
async def search(keyword: str):
"""搜索接口 - 按 IP 限流"""
return {"code": 0, "results": [f"关于 {keyword} 的结果"]}
四、进阶:多种限流策略组合
策略 1:多维度限流
同时限制每秒请求数和每日请求数:
from fastapi import Depends
from fastapi_limiter.depends import RateLimiter
# 组合限流:每秒 5 次 AND 每天 1000 次
@app.get("/api/data", dependencies=[
Depends(RateLimiter(times=5, seconds=1)), # 瞬时爆发限制
Depends(RateLimiter(times=1000, seconds=86400)) # 日总量限制
])
async def get_api_data():
"""API 数据查询 - 多维度限流"""
return {"code": 0, "data": "重要数据"}
策略 2:动态限流(根据用户等级)
from fastapi import Request, HTTPException
async def dynamic_rate_limit(request: Request):
"""
根据用户等级动态设置限流
VIP 用户更宽松,普通用户更严格
"""
user_level = request.headers.get("X-User-Level", "normal")
if user_level == "vip":
# VIP: 每秒 50 次
limiter = RateLimiter(times=50, seconds=1)
elif user_level == "premium":
# 高级用户: 每秒 20 次
limiter = RateLimiter(times=20, seconds=1)
else:
# 普通用户: 每秒 5 次
limiter = RateLimiter(times=5, seconds=1)
# 执行限流检查
await limiter(request)
@app.get("/premium-api", dependencies=[Depends(dynamic_rate_limit)])
async def premium_api(request: Request):
"""高级 API - 根据用户等级动态限流"""
user_level = request.headers.get("X-User-Level", "normal")
return {"code": 0, "level": user_level, "data": "高级功能"}
策略 3:自定义限流响应
from fastapi import Request
from fastapi.responses import JSONResponse
from fastapi_limiter import FastAPILimiter
from starlette.middleware.base import BaseHTTPMiddleware
class RateLimitResponseMiddleware(BaseHTTPMiddleware):
"""
自定义限流响应格式
默认返回 429,但格式可能不符合你的接口规范
"""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
if response.status_code == 429:
return JSONResponse(
status_code=429,
content={
"code": 429,
"msg": "请求太频繁,请稍后再试",
"retry_after": response.headers.get("Retry-After", "60")
}
)
return response
# 注册中间件
app.add_middleware(RateLimitResponseMiddleware)
五、测试限流效果
使用 wrk 或 ab 压测
# 安装 wrk(如果还没有)
# Ubuntu: sudo apt-get install wrk
# macOS: brew install wrk
# 压测 10 秒,12 个线程,400 个连接
wrk -t12 -c400 -d10s http://localhost:8000/items
# 你应该看到:
# - 前 5 秒大量请求成功
# - 之后部分请求返回 429(Too Many Requests)
使用 Python 脚本测试
import asyncio
import aiohttp
async def test_rate_limit():
"""测试限流效果"""
url = "http://localhost:8000/items"
async with aiohttp.ClientSession() as session:
# 连续发送 20 个请求
tasks = [session.get(url) for _ in range(20)]
responses = await asyncio.gather(*tasks)
success = sum(1 for r in responses if r.status == 200)
limited = sum(1 for r in responses if r.status == 429)
print(f"✅ 成功: {success}")
print(f"⛔ 限流: {limited}")
asyncio.run(test_rate_limit())
六、生产环境注意事项
1. Redis 高可用
# 使用 Redis Sentinel 保证高可用
from redis.asyncio.sentinel import Sentinel
sentinel = Sentinel([
("sentinel1", 26379),
("sentinel2", 26379),
("sentinel3", 26379)
])
redis_master = sentinel.master_for("mymaster", decode_responses=True)
await FastAPILimiter.init(redis_master)
2. 限流Key设计
async def smart_identifier(request: Request):
"""
智能限流键:组合用户ID + 接口路径
好处:不同接口独立计数,互不干扰
"""
user_id = request.headers.get("Authorization", "anon")[:8]
route = request.url.path.replace("/", "_")
return f"{user_id}:{route}"
3. 监控和报警
@app.middleware("http")
async def rate_limit_monitor(request: Request, call_next):
"""监控限流触发情况"""
response = await call_next(request)
if response.status_code == 429:
logger.warning(
f"限流触发: {request.client.host} -> {request.url.path}",
extra={"event": "rate_limit_triggered"}
)
# 这里可以接入 Prometheus / 钉钉报警
return response
总结
今天我们一起学习了 FastAPI 限流实战:
- • 限流的本质:控制请求速率,保护系统资源不被耗尽
- • fastapi-limiter 核心:基于 Redis 的分布式计数器,配合
RateLimiter依赖注入 - • 限流粒度:全局限流 → IP 限流 → 用户级限流,越精细越灵活
- • 组合策略:瞬时限制 + 日总量限制,多维度防护
- • 生产要点:Redis 高可用、智能限流键、监控报警
限流不是惩罚正常用户,而是拒绝恶意流量,保障大多数人能正常使用。
每日踩一坑:
RateLimiter 的 seconds 参数是整个时间窗口,不是冷却时间。比如 times=5, seconds=60 表示60 秒内最多 5 次,而不是”每次请求后等待 60 秒”。搞混了就会发现明明等了很久还是 429!
本期分享就到这里啦,祝君在测开之路越走越远,越走越顺。
夜雨聆风