乐于分享
好东西不私藏

使用 fastapi-limiter 插件实现多种限流策略

使用 fastapi-limiter 插件实现多种限流策略

你的 API 被人疯狂调用了吗?

上线第一天,接口响应正常。第二天,突然多了几百个请求。第三天,服务器开始报警,数据库连接池耗尽,用户投诉页面卡顿…

这就是没有限流的后果。限流(Rate Limiting)就像高速公路的收费站——不是不让车过,而是控制车流量,避免所有人都堵在一起。

今天咱们聊聊 fastapi-limiter —— 一个让你轻松给 FastAPI 接口”安装收费站”的插件。

读完本文,你将学会:

  • • 什么是限流,为什么要限流
  • • 用 fastapi-limiter 实现多种限流策略
  • • 全局、路由级、用户级限流的实战写法
  • • 生产环境的最佳实践

一、为什么要限流?

限流是保护自己的最后一道防线

想象一下你的 API 是一扇玻璃门:

  • 正常用户:轻轻推开,优雅地进出
  • 爬虫脚本:疯狂撞击,每分钟几千次
  • 恶意攻击:直接拿锤子砸(DDoS)

没有限流,你的玻璃门迟早碎成渣。

限流的常见场景

       

         
           
           
         

场景 为什么要限流
登录接口 防止暴力破解密码
短信验证码 防止短信轰炸(被人薅短信费)
文件下载 防止带宽被占满
数据查询 防止数据库被拖垮
第三方 API 转发 防止超出上游配额

       

     

二、fastapi-limiter 基础用法

项目结构

limiter-demo/
├── main.py          # 入口文件
├── deps.py          # 限流依赖
├── redis_client.py  # Redis 连接
└── requirements.txt # 依赖

依赖安装

fastapi==0.104.1
uvicorn[standard]==0.24.0
fastapi-limiter==0.1.6
redis==5.0.1

错误写法(新手常见)

from fastapi import FastAPI
from
 fastapi_limiter import FastAPILimiter

app = FastAPI()

# ❌ 错误:没有初始化 Redis

@app.on_event("startup")

async
 def startup():
    pass
  # 忘了初始化 limiter

# ❌ 错误:直接在装饰器里写字符串

@app.get("/items")

async
 def get_items():
    return
 {"items": []}
    # 限流逻辑写在了函数内部,性能差、不统一

问题在哪?

  • • 没有初始化 FastAPILimiter,限流根本不生效
  • • 限流逻辑和业务逻辑混在一起,维护困难

正确写法(基础版本)

import redis.asyncio as redis
from
 fastapi import FastAPI
from
 fastapi_limiter import FastAPILimiter
from
 fastapi_limiter.depends import RateLimiter

app = FastAPI(title="FastAPI 限流示例")

@app.on_event("startup")

async
 def startup():
    # ✅ 初始化 Redis 连接

    redis_instance = redis.from_url(
        "redis://localhost:6379"
,
        encoding="utf-8",
        decode_responses=True
    )
    await
 FastAPILimiter.init(redis_instance)

@app.on_event("shutdown")

async
 def shutdown():
    await
 FastAPILimiter.close()

# ✅ 使用 RateLimiter 依赖,每秒最多 5 次

@app.get("/items", dependencies=[Depends(RateLimiter(times=5, seconds=1))])

async
 def get_items():
    return
 {"items": ["苹果", "香蕉", "橙子"]}

优化写法(生产版本)

import logging
from
 typing import Callable, Optional
import
 redis.asyncio as redis
from
 fastapi import FastAPI, Request, HTTPException, Depends
from
 fastapi_limiter import FastAPILimiter
from
 fastapi_limiter.depends import RateLimiter

# 配置日志

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 配置参数

class
 Config:
    """应用配置"""

    REDIS_URL = "redis://localhost:6379/0"
    HOST = "0.0.0.0"
    PORT = 8000

app = FastAPI(title="FastAPI 限流实战", version="1.0.0")

@app.on_event("startup")

async
 def startup():
    """启动时初始化 Redis 和限流器"""

    try
:
        redis_instance = redis.from_url(
            Config.REDIS_URL,
            encoding="utf-8",
            decode_responses=True
        )
        await
 FastAPILimiter.init(redis_instance)
        logger.info("✅ 限流器初始化成功")
    except
 Exception as e:
        logger.error(f"❌ Redis 连接失败: {e}")
        raise


@app.on_event("shutdown")

async
 def shutdown():
    """关闭时清理资源"""

    try
:
        await
 FastAPILimiter.close()
        logger.info("✅ 限流器已关闭")
    except
 Exception as e:
        logger.error(f"❌ 关闭限流器失败: {e}")

# ==================== 限流策略定义 ====================


# 1. 全局通用限流:每秒 10 次

limit_10_per_second = RateLimiter(times=10, seconds=1)

# 2. 严格限流:每分钟 5 次(适合敏感接口)

limit_5_per_minute = RateLimiter(times=5, seconds=60)

# 3. 宽松限流:每小时 100 次

limit_100_per_hour = RateLimiter(times=100, seconds=3600)

# ==================== 路由定义 ====================


@app.get("/", dependencies=[Depends(limit_10_per_second)])

async
 def root():
    """首页 - 普通限流"""

    return
 {"message": "欢迎来到限流演示 API"}

@app.get("/items", dependencies=[Depends(limit_10_per_second)])

async
 def get_items():
    """获取商品列表 - 普通限流"""

    try
:
        items = ["苹果", "香蕉", "橙子", "葡萄"]
        return
 {"code": 0, "data": items, "msg": "success"}
    except
 Exception as e:
        logger.error(f"获取商品列表失败: {e}")
        raise
 HTTPException(500, "服务器内部错误")

@app.post("/login", dependencies=[Depends(limit_5_per_minute)])

async
 def login(request: Request):
    """
    用户登录 - 严格限流
    
    防止暴力破解,每分钟最多尝试 5 次
    """

    try
:
        # 模拟登录逻辑

        logger.info(f"登录请求来自: {request.client.host}")
        return
 {"code": 0, "msg": "登录成功", "token": "fake-jwt-token"}
    except
 Exception as e:
        logger.error(f"登录失败: {e}")
        raise
 HTTPException(500, "登录服务异常")

@app.get("/download", dependencies=[Depends(limit_100_per_hour)])

async
 def download_file():
    """
    文件下载 - 宽松限流
    
    每小时最多下载 100 次,防止带宽耗尽
    """

    try
:
        return
 {"code": 0, "msg": "下载链接已生成", "url": "https://example.com/file.zip"}
    except
 Exception as e:
        logger.error(f"下载失败: {e}")
        raise
 HTTPException(500, "下载服务异常")

@app.get("/health")

async
 def health_check():
    """健康检查 - 不限流"""

    return
 {"status": "healthy"}

if
 __name__ == "__main__":
    import
 uvicorn
    uvicorn.run(app, host=Config.HOST, port=Config.PORT)

三、用户级限流(按用户区分)

上面的限流是全局的——所有人共享同一个配额。但很多时候,我们需要按用户限流

比如:普通用户每分钟 10 次,VIP 用户每分钟 100 次。

按用户 ID 限流

from fastapi import Request
from
 fastapi_limiter.depends import RateLimiter

async
 def get_user_id(request: Request) -> str:
    """
    从请求中提取用户 ID
    
    实际项目中可以从 token 或 session 获取
    """

    token = request.headers.get("Authorization", "anonymous")
    # 简化处理,直接用 token 前 8 位作为用户标识

    return
 token[:8] if len(token) > 8 else token

# 自定义限流键函数

async
 def user_id_identifier(request: Request):
    """
    生成基于用户 ID 的限流键
    
    格式: limiter:user_id:route
    """

    user_id = await get_user_id(request)
    return
 user_id

# ✅ 按用户限流:每个用户每分钟 10 次

@app.get("/user-data", dependencies=[
    Depends(RateLimiter(times=10, seconds=60, identifier=user_id_identifier))
]
)

async
 def get_user_data():
    """获取用户数据 - 按用户限流"""

    return
 {"code": 0, "data": "你的专属数据"}

按 IP 限流

async def ip_identifier(request: Request):
    """
    基于 IP 地址的限流键
    
    适用于未登录用户的限流
    """

    forwarded = request.headers.get("X-Forwarded-For")
    if
 forwarded:
        # 获取真实 IP(经过反向代理时)

        ip = forwarded.split(",")[0].strip()
    else
:
        ip = request.client.host
    return
 ip

# ✅ 按 IP 限流:每个 IP 每分钟 20 次

@app.get("/search", dependencies=[
    Depends(RateLimiter(times=20, seconds=60, identifier=ip_identifier))
]
)

async
 def search(keyword: str):
    """搜索接口 - 按 IP 限流"""

    return
 {"code": 0, "results": [f"关于 {keyword} 的结果"]}

四、进阶:多种限流策略组合

策略 1:多维度限流

同时限制每秒请求数每日请求数

from fastapi import Depends
from
 fastapi_limiter.depends import RateLimiter

# 组合限流:每秒 5 次 AND 每天 1000 次

@app.get("/api/data", dependencies=[
    Depends(RateLimiter(times=5, seconds=1)),    # 瞬时爆发限制
    Depends(RateLimiter(times=1000, seconds=86400))  # 日总量限制
]
)

async
 def get_api_data():
    """API 数据查询 - 多维度限流"""

    return
 {"code": 0, "data": "重要数据"}

策略 2:动态限流(根据用户等级)

from fastapi import Request, HTTPException

async
 def dynamic_rate_limit(request: Request):
    """
    根据用户等级动态设置限流
    
    VIP 用户更宽松,普通用户更严格
    """

    user_level = request.headers.get("X-User-Level", "normal")
    
    if
 user_level == "vip":
        # VIP: 每秒 50 次

        limiter = RateLimiter(times=50, seconds=1)
    elif
 user_level == "premium":
        # 高级用户: 每秒 20 次

        limiter = RateLimiter(times=20, seconds=1)
    else
:
        # 普通用户: 每秒 5 次

        limiter = RateLimiter(times=5, seconds=1)
    
    # 执行限流检查

    await
 limiter(request)

@app.get("/premium-api", dependencies=[Depends(dynamic_rate_limit)])

async
 def premium_api(request: Request):
    """高级 API - 根据用户等级动态限流"""

    user_level = request.headers.get("X-User-Level", "normal")
    return
 {"code": 0, "level": user_level, "data": "高级功能"}

策略 3:自定义限流响应

from fastapi import Request
from
 fastapi.responses import JSONResponse
from
 fastapi_limiter import FastAPILimiter
from
 starlette.middleware.base import BaseHTTPMiddleware

class
 RateLimitResponseMiddleware(BaseHTTPMiddleware):
    """
    自定义限流响应格式
    
    默认返回 429,但格式可能不符合你的接口规范
    """

    async
 def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        
        if
 response.status_code == 429:
            return
 JSONResponse(
                status_code=429,
                content={
                    "code"
: 429,
                    "msg"
: "请求太频繁,请稍后再试",
                    "retry_after"
: response.headers.get("Retry-After", "60")
                }
            )
        
        return
 response

# 注册中间件

app.add_middleware(RateLimitResponseMiddleware)

五、测试限流效果

使用 wrk 或 ab 压测

# 安装 wrk(如果还没有)
# Ubuntu: sudo apt-get install wrk

# macOS: brew install wrk


# 压测 10 秒,12 个线程,400 个连接

wrk -t12 -c400 -d10s http://localhost:8000/items

# 你应该看到:

# - 前 5 秒大量请求成功

# - 之后部分请求返回 429(Too Many Requests)

使用 Python 脚本测试

import asyncio
import
 aiohttp

async
 def test_rate_limit():
    """测试限流效果"""

    url = "http://localhost:8000/items"
    
    async
 with aiohttp.ClientSession() as session:
        # 连续发送 20 个请求

        tasks = [session.get(url) for _ in range(20)]
        responses = await asyncio.gather(*tasks)
        
        success = sum(1 for r in responses if r.status == 200)
        limited = sum(1 for r in responses if r.status == 429)
        
        print
(f"✅ 成功: {success}")
        print
(f"⛔ 限流: {limited}")

asyncio.run(test_rate_limit())

六、生产环境注意事项

1. Redis 高可用

# 使用 Redis Sentinel 保证高可用
from
 redis.asyncio.sentinel import Sentinel

sentinel = Sentinel([
    ("sentinel1", 26379),
    ("sentinel2", 26379),
    ("sentinel3", 26379)
])

redis_master = sentinel.master_for("mymaster", decode_responses=True)
await
 FastAPILimiter.init(redis_master)

2. 限流Key设计

async def smart_identifier(request: Request):
    """
    智能限流键:组合用户ID + 接口路径
    
    好处:不同接口独立计数,互不干扰
    """

    user_id = request.headers.get("Authorization", "anon")[:8]
    route = request.url.path.replace("/", "_")
    return
 f"{user_id}:{route}"

3. 监控和报警

@app.middleware("http")
async
 def rate_limit_monitor(request: Request, call_next):
    """监控限流触发情况"""

    response = await call_next(request)
    
    if
 response.status_code == 429:
        logger.warning(
            f"限流触发: {request.client.host} -> {request.url.path}"
,
            extra={"event": "rate_limit_triggered"}
        )
        # 这里可以接入 Prometheus / 钉钉报警

    
    return
 response

总结

今天我们一起学习了 FastAPI 限流实战:

  • 限流的本质:控制请求速率,保护系统资源不被耗尽
  • fastapi-limiter 核心:基于 Redis 的分布式计数器,配合 RateLimiter 依赖注入
  • 限流粒度:全局限流 → IP 限流 → 用户级限流,越精细越灵活
  • 组合策略:瞬时限制 + 日总量限制,多维度防护
  • 生产要点:Redis 高可用、智能限流键、监控报警

限流不是惩罚正常用户,而是拒绝恶意流量,保障大多数人能正常使用


每日踩一坑

RateLimiterseconds 参数是整个时间窗口,不是冷却时间。比如 times=5, seconds=60 表示60 秒内最多 5 次,而不是”每次请求后等待 60 秒”。搞混了就会发现明明等了很久还是 429!

本期分享就到这里啦,祝君在测开之路越走越远,越走越顺。