FastAPI 完整学习文档
目标:学完本教程能独立开发完整的后端项目
目录
-
1. 环境与工具链 -
2. FastAPI 基础 -
3. Pydantic v2 数据验证 -
4. 请求与响应 -
5. 依赖注入 -
6. 异常处理 -
7. 中间件 -
8. SQLAlchemy 2.0 数据库 -
9. Alembic 数据库迁移 -
10. 认证与授权(JWT) -
11. 文件上传 -
12. 后台任务 -
13. CORS 与跨域 -
14. 日志配置 -
15. WebSocket -
16. 测试 -
17. 部署 -
18. 最佳实践与项目结构 -
19. 常见错误排查 -
20. 参考链接
1. 环境与工具链
1.1 Python 版本
使用 Python >= 3.12,支持最新的类型注解语法。
1.2 uv 包管理
# 安装 uvcurl -LsSf https://astral.sh/uv/install.sh | sh# 初始化项目uv init --appuv add fastapi uvicorn sqlalchemy alembic pydantic-settingsuv add --dev ruff pytest httpx# 运行uv run uvicorn app.main:app --reload# 安装依赖uv sync# 添加/移除依赖uv add <package>uv remove <package>
1.3 依赖速查表
|
|
|
|---|---|
fastapi[standard] |
|
uvicorn[standard] |
|
pydantic |
|
pydantic-settings |
|
sqlalchemy[asyncio] |
|
aiomysql
asyncpg/ aiosqlite |
|
alembic |
|
python-jose[cryptography] |
|
passlib[bcrypt] |
|
python-multipart |
|
httpx |
|
ruff |
|
pytest
pytest-asyncio |
|
2. FastAPI 基础
2.1 核心概念:ASGI 与异步
在学习代码之前,先理解 FastAPI 的根基——ASGI。
什么是 ASGI?
ASGI(Asynchronous Server Gateway Interface)是 Python 的异步 Web 服务器接口标准。它解决了传统 WSGI(如 Flask/Django 用的)不能处理 WebSocket、长连接、并发请求的问题。
传统 WSGI: 请求 → 处理 → 返回(同步,一次一个)ASGI: 请求 → 处理 → 返回(异步,可同时处理多个)FastAPI → 基于 ASGI → 支持异步 → 高并发 + WebSocketFlask → 基于 WSGI → 同步 → 简单但并发弱
为什么用异步?
# 同步方式:2 个请求依次处理,共 4 秒# 请求1:开始 → 查数据库(2秒) → 返回# 请求2: 开始 → 查数据库(2秒) → 返回# 异步方式:2 个请求同时处理,共 2 秒# 请求1:开始 → 查数据库(2秒,等待时不阻塞) → 返回# 请求2:开始 → 查数据库(2秒,等待时不阻塞) → 返回
异步编程的核心思想:等待 I/O(数据库查询、HTTP 请求、文件读写)时,CPU 不空闲等着,而是去处理其他请求。这就是 FastAPI 能支撑高并发的根本原因。
请求在 FastAPI 中的完整流程:
客户端 → Uvicorn(ASGI服务器) → 中间件1 → 中间件2 → 路由匹配→ 依赖注入解析 → 参数校验(Pydantic) → 路由函数执行→ 响应模型转换 → 中间件2 → 中间件1 → 响应给客户端
2.2 应用实例
from fastapi import FastAPIapp = FastAPI(title="My API", version="1.0.0")@app.get("/")async def root(): return {"message": "Hello World"}
只有 app实例是必须的,title/version等参数用于生成 OpenAPI 文档。
async def定义异步路由。如果不需要在函数内 await异步操作,也可以用 def:
@app.get("/sync")def sync_route(): # 纯计算、无 I/O 等待的操作,用 def 即可 return {"message": "sync"}@app.get("/async")async def async_route(): # 涉及数据库查询、HTTP 请求等 I/O 操作,用 async def result = await db_query() return {"message": "async"}
FastAPI 会自动在独立线程中运行 def路由,不会阻塞主循环。
2.3 lifespan 生命周期
控制应用启动和关闭时的行为:
from contextlib import asynccontextmanagerfrom collections.abc import AsyncGeneratorfrom fastapi import FastAPI@asynccontextmanagerasync def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # 启动时执行 print("Starting up...") yield # 关闭时执行 print("Shutting down...")app = FastAPI(lifespan=lifespan)
常见用途:创建数据库连接池、加载模型、初始化缓存。
2.3 路由装饰器
@app.get("/")@app.post("/")@app.put("/")@app.delete("/")@app.patch("/")@app.options("/")@app.head("/")
2.4 路径参数
@app.get("/items/{item_id}")async def get_item(item_id: int): # 自动类型转换和验证 return {"item_id": item_id}
路径参数按声明顺序匹配,类型注解让 FastAPI 自动做类型转换和验证。如果传入非数字,返回 422。
2.5 查询参数
@app.get("/items/")async def list_items( skip: int = 0, limit: int = 10, q: str | None = None, # 可选参数): return {"skip": skip, "limit": limit, "q": q}
未给默认值的参数是必填的,有默认值的参数是可选的。
2.6 路径参数和查询参数混用
@app.get("/users/{user_id}/items/")async def get_user_items( user_id: int, skip: int = 0, limit: int = 10,): pass
3. Pydantic v2 数据验证
3.1 基础模型
from pydantic import BaseModelclass Item(BaseModel): name: str price: float is_offer: bool = False # 有默认值,可选 description: str | None = None # 可为空
3.2 字段验证
from pydantic import BaseModel, Fieldclass Item(BaseModel): name: str = Field(min_length=1, max_length=100) price: float = Field(gt=0, le=10000) # greater than, less or equal quantity: int = Field(ge=0, default=0) # greater or equal email: str = Field(pattern=r"^[\w\.-]+@[\w\.-]+\.\w+$")
3.3 自定义验证器
from pydantic import BaseModel, field_validatorclass Item(BaseModel): name: str password: str @field_validator("name") @classmethod def name_must_be_meaningful(cls, v: str) -> str: if len(v) < 2: raise ValueError("name too short") return v.strip() @field_validator("password") @classmethod def password_strength(cls, v: str) -> str: if len(v) < 8: raise ValueError("password must be at least 8 characters") return v
3.4 模型配置
from pydantic import BaseModelfrom datetime import datetimeclass UserResponse(BaseModel): id: int username: str created_at: datetime model_config = {"from_attributes": True} # 允许从 ORM 模型创建:UserResponse.model_validate(db_user)
3.5 序列化与反序列化
data = {"name": "foo", "price": 9.99}item = Item(**data) # 字典 → Pydanticitem.model_dump() # Pydantic → 字典item.model_dump_json() # Pydantic → JSON 字符串item.model_dump(exclude_unset=True) # 只返回显式设置的字段
3.6 嵌套模型
class Image(BaseModel): url: str name: strclass Item(BaseModel): name: str image: Image | None = None tags: list[str] = []
3.7 Pydantic v2 进阶
from pydantic import BaseModel, field_validator, model_validator, field_serializer, computed_fieldfrom datetime import datetime# ── 模型级验证器 ──────────────────────────────────class Registration(BaseModel): username: str password: str confirm_password: str @model_validator(mode="after") def passwords_match(self): if self.password != self.confirm_password: raise ValueError("passwords do not match") return self# ── 计算字段(不存数据,由其他字段运算得出) ──────class Rectangle(BaseModel): width: float height: float @computed_field @property def area(self) -> float: return self.width * self.height @computed_field @property def perimeter(self) -> float: return 2 * (self.width + self.height)# ── 自定义序列化器 ────────────────────────────────class User(BaseModel): name: str joined_at: datetime @field_serializer("joined_at") def serialize_datetime(self, value: datetime) -> str: return value.strftime("%Y-%m-%d %H:%M:%S")# ── 联合模型(同一接口返回多类型) ────────────────from typing import Unionfrom pydantic import BaseModelclass Cat(BaseModel): pet_type: str = "cat" meow_volume: intclass Dog(BaseModel): pet_type: str = "dog" bark_pitch: floatPet = Union[Cat, Dog]@app.get("/pets/{pet_id}", response_model=Pet)async def get_pet(pet_id: int): # FastAPI 根据响应数据自动判断返回 Cat 还是 Dog ...# ── 严格模式(禁止额外字段) ──────────────────────class StrictItem(BaseModel): name: str model_config = {"extra": "forbid"} # 请求体中多传字段 → 422 错误
4. 请求与响应
4.1 请求体
from pydantic import BaseModelclass ItemCreate(BaseModel): name: str price: float@app.post("/items/")async def create_item(item: ItemCreate): # FastAPI 自动从 JSON 解析 return item
4.2 请求体 + 路径参数 + 查询参数
@app.put("/items/{item_id}")async def update_item( item_id: int, # 路径参数 item: ItemCreate, # 请求体(JSON) q: str | None = None, # 查询参数): return {"item_id": item_id, **item.model_dump(), "q": q}
4.3 表单数据
from fastapi import Form@app.post("/login/")async def login( username: str = Form(), password: str = Form(),): return {"username": username}
需要安装 python-multipart。
4.4 响应模型
from pydantic import BaseModelfrom datetime import datetimeclass ItemResponse(BaseModel): id: int name: str price: float created_at: datetime model_config = {"from_attributes": True}@app.post("/items/", response_model=ItemResponse, status_code=201)async def create_item(item: ItemCreate): # 返回的 dict/ORM 对象会被自动转换为 ItemResponse return db_item@app.get("/items/", response_model=list[ItemResponse])async def list_items(): return db_items
response_model的作用:
-
• 限制输出字段(过滤敏感信息) -
• 自动序列化(datetime → ISO 字符串) -
• 生成正确的 OpenAPI 文档
4.5 OpenAPI 文档自定义
from fastapi import APIRouterfrom pydantic import BaseModelclass ItemResponse(BaseModel): id: int name: strrouter = APIRouter(prefix="/items", tags=["items"])@router.get( "/", summary="列出所有物品", description="支持分页和关键词搜索的物品列表接口", response_description="物品列表", deprecated=False,)async def list_items(): """函数文档字符串也会被 FastAPI 提取到 OpenAPI 文档""" pass@router.get( "/{item_id}", response_model=ItemResponse, responses={ 404: {"description": "物品不存在"}, 403: {"description": "无权限访问"}, },)async def get_item(item_id: int): pass
参数说明:
-
• summary— 接口短名称 -
• description— 接口详细说明 -
• response_description— 响应说明 -
• deprecated— 标记为已弃用 -
• responses— 自定义错误响应文档 -
• 函数 docstring 也会被自动提取
4.6 响应类型
from fastapi.responses import ( JSONResponse, HTMLResponse, PlainTextResponse, RedirectResponse, FileResponse, StreamingResponse,)import json@app.get("/json")async def json_response(): return JSONResponse(content={"message": "ok"})@app.get("/html")async def html_response(): return HTMLResponse(content="<h1>Hello</h1>")@app.get("/redirect")async def redirect(): return RedirectResponse(url="/login")@app.get("/file")async def file_response(): return FileResponse(path="static/photo.jpg", media_type="image/jpeg")@app.get("/stream")async def stream_response(): async def generate(): for i in range(10): yield f"data: {i}\n\n" return StreamingResponse(generate(), media_type="text/event-stream")
默认情况下 FastAPI 会自动将 dict/list/Pydantic 转为 JSONResponse。其他响应类型用于特定场景。
4.8 状态码
from fastapi import status@app.post("/items/", status_code=status.HTTP_201_CREATED)@app.delete("/items/{id}", status_code=status.HTTP_204_NO_CONTENT)
4.9 Header 和 Cookie
from fastapi import Header, Cookie@app.get("/")async def read_root( user_agent: str | None = Header(default=None), session_id: str | None = Cookie(default=None),): return {"user_agent": user_agent}
5. 依赖注入
5.1 为什么需要依赖注入?
依赖注入的目的是解耦和复用。看一个反面例子:
# ❌ 不好的做法:每个路由自己创建数据库连接@app.get("/items/")async def list_items(): db = create_connection() # 重复代码 items = db.query(...) db.close() return items@app.get("/users/")async def list_users(): db = create_connection() # 重复代码 users = db.query(...) db.close() return users# ✅ 依赖注入:由 FastAPI 统一管理@app.get("/items/")async def list_items(db: Session = Depends(get_db)): # 注入 return db.query(...)@app.get("/users/")async def list_users(db: Session = Depends(get_db)): # 复用 return db.query(...)
依赖注入的好处:
-
• 复用:相同的逻辑(数据库连接、认证、分页)只写一次 -
• 测试:注入 mock 对象,无需修改路由代码 -
• 可替换:修改数据库驱动时只需改依赖函数,不用改所有路由 -
• 生命周期管理:FastAPI 自动处理资源的创建和释放(如关闭数据库连接)
5.2 基础用法
5.3 类作为依赖
from fastapi import Dependsfrom pydantic import BaseModelclass Pagination: def __init__(self, skip: int = 0, limit: int = 10): self.skip = skip self.limit = limit@app.get("/items/")async def list_items(pagination: Pagination = Depends()): return {"skip": pagination.skip, "limit": pagination.limit}
当依赖是类时,Depends()会自动实例化。
5.4 可调用对象作为依赖
from fastapi import Dependsclass DatabaseSession: def __call__(self): return get_db_session()db_session = DatabaseSession()@app.get("/items/")async def list_items(db=Depends(db_session)): pass
5.5 依赖链
async def get_db(): db = SessionLocal() try: yield db finally: db.close()async def get_current_user(db=Depends(get_db)): user = db.query(User).first() return user@app.get("/users/me")async def read_current_user(current_user=Depends(get_current_user)): return current_user
依赖可以嵌套依赖,FastAPI 会自动解析整个依赖树。
5.6 全局依赖
app = FastAPI(dependencies=[Depends(verify_token)])
所有路由都会先执行 verify_token。
5.7 路由级依赖(APIRouter)
from fastapi import APIRouter, Depends# 该路由器下所有路由都会执行 get_current_userrouter = APIRouter( prefix="/api/v1/admin", tags=["admin"], dependencies=[Depends(get_current_user)], # 路由级依赖)# 子路由可以额外叠加依赖@router.get("/dashboard", dependencies=[Depends(require_admin)])async def dashboard(): return {"message": "Admin dashboard"}# 不同路由器的依赖互不影响public_router = APIRouter(prefix="/api/v1/public") # 无认证admin_router = APIRouter(prefix="/api/v1/admin", dependencies=[Depends(verify_admin)]) # 需认证
执行顺序:全局依赖 → 路由级依赖 → 路由函数依赖 → 路由函数。
6. 异常处理
6.1 HTTPException
from fastapi import HTTPException@app.get("/items/{item_id}")async def get_item(item_id: int): if item_id < 1: raise HTTPException( status_code=400, detail="Invalid item ID", ) item = find_item(item_id) if not item: raise HTTPException( status_code=404, detail="Item not found", ) return item
6.2 自定义异常
from fastapi import HTTPExceptionclass CredentialsException(HTTPException): def __init__(self): super().__init__( status_code=401, detail="Invalid credentials", headers={"WWW-Authenticate": "Bearer"}, )# 使用raise CredentialsException()
6.3 全局异常处理器
from fastapi import Requestfrom fastapi.responses import JSONResponse@app.exception_handler(ValueError)async def value_error_handler(request: Request, exc: ValueError): return JSONResponse( status_code=400, content={"message": str(exc)}, )@app.exception_handler(Exception)async def general_error_handler(request: Request, exc: Exception): return JSONResponse( status_code=500, content={"message": "Internal server error"}, )
7. 中间件
7.1 基础中间件
from fastapi import Requestimport time@app.middleware("http")async def add_process_time_header(request: Request, call_next): start_time = time.perf_counter() response = await call_next(request) process_time = time.perf_counter() - start_time response.headers["X-Process-Time"] = str(process_time) return response
中间件接收请求 → 执行路由处理 → 接收响应 → 返回响应。
7.2 CORS 中间件
from fastapi.middleware.cors import CORSMiddlewareapp.add_middleware( CORSMiddleware, allow_origins=["http://localhost:3000"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"],)
7.3 中间件执行顺序
中间件按添加顺序包裹路由,执行时是从外到内再到外:
Middleware1 (request) → Middleware2 (request) → Route → Middleware2 (response) → Middleware1 (response)
7.4 常用内置中间件
from fastapi.middleware.gzip import GZipMiddlewarefrom fastapi.middleware.trustedhost import TrustedHostMiddlewarefrom fastapi.middleware.httpsredirect import HTTPSRedirectMiddlewareimport uuid# ── Gzip 压缩 ────────────────────────────────────────# 压缩响应体,减少传输大小app.add_middleware(GZipMiddleware, minimum_size=1000) # 大于 1KB 的响应才压缩# ── TrustedHost 安全限制 ─────────────────────────────# 只允许指定 Host 的请求访问,防止 Host 头攻击app.add_middleware( TrustedHostMiddleware, allowed_hosts=["localhost", "*.example.com"],)# ── 请求 ID 追踪 ─────────────────────────────────────# 每个请求分配唯一 ID,方便串联日志和排查问题@app.middleware("http")async def request_id_middleware(request: Request, call_next): request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())) response = await call_next(request) response.headers["X-Request-ID"] = request_id return response
8. SQLAlchemy 2.0 数据库
8.1 配置引擎和会话
from sqlalchemy.ext.asyncio import ( AsyncSession, async_sessionmaker, create_async_engine,)from sqlalchemy.orm import DeclarativeBaseDATABASE_URL = "mysql+aiomysql://root:root@localhost:3306/app_db"engine = create_async_engine( DATABASE_URL, echo=False, pool_size=10, # 连接池大小 max_overflow=20, # 超出 pool_size 后允许的最大连接数 pool_pre_ping=True, # 每次取连接前检查是否有效(避免使用已断开的连接) pool_recycle=3600, # 连接最大存活时间(秒),MySQL 默认 8 小时断开空闲连接)async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)class Base(DeclarativeBase): passasync def get_db(): async with async_session() as session: try: yield session await session.commit() except Exception: await session.rollback() raise
8.2 定义模型
from sqlalchemy import String, Integer, Boolean, Text, Float, ForeignKey, DateTimefrom sqlalchemy.orm import Mapped, mapped_column, relationshipfrom datetime import datetime, timezonedef _utcnow() -> datetime: return datetime.now(timezone.utc).replace(tzinfo=None)class User(Base): __tablename__ = "users" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False) email: Mapped[str] = mapped_column(String(100), unique=True, nullable=False) hashed_password: Mapped[str] = mapped_column(String(255), nullable=False) is_active: Mapped[bool] = mapped_column(Boolean, default=True) created_at: Mapped[datetime] = mapped_column(default=_utcnow) updated_at: Mapped[datetime] = mapped_column(default=_utcnow, onupdate=_utcnow) items: Mapped[list["Item"]] = relationship(back_populates="owner")class Item(Base): __tablename__ = "items" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) title: Mapped[str] = mapped_column(String(100), nullable=False) description: Mapped[str | None] = mapped_column(Text, nullable=True) price: Mapped[float] = mapped_column(Float, default=0) is_done: Mapped[bool] = mapped_column(Boolean, default=False) owner_id: Mapped[int] = mapped_column(ForeignKey("users.id")) created_at: Mapped[datetime] = mapped_column(default=_utcnow) updated_at: Mapped[datetime] = mapped_column(default=_utcnow, onupdate=_utcnow) owner: Mapped["User"] = relationship(back_populates="items")
8.3 字段类型速查
|
|
|
|
|---|---|---|
int |
|
|
str |
|
|
str |
String(100) |
|
str |
Text |
|
float |
|
|
bool |
Boolean |
|
datetime |
DateTime
|
|
bytes |
LargeBinary |
|
int |
ForeignKey("t.id") |
|
8.4 数据库索引
合理使用索引大幅提升查询性能:
from sqlalchemy import Index, UniqueConstraintclass User(Base): __tablename__ = "users" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) username: Mapped[str] = mapped_column(String(50), unique=True, index=True) # 单列索引 + 唯一 email: Mapped[str] = mapped_column(String(100), unique=True) is_active: Mapped[bool] = mapped_column(Boolean, default=False) __table_args__ = ( Index("idx_user_active_created", "is_active", "created_at"), # 联合索引 UniqueConstraint("username", "email", name="uq_username_email"), # 联合唯一 )# 常见索引策略:## 经常查询的字段 → index=True# 经常排序的字段 → index=True# 外键字段 → 自动创建索引或手动加 index=True# 唯一约束字段 → unique=True(自动创建索引)# 组合查询条件 → 联合索引 Index("name", "col1", "col2")# 文本搜索 → 使用数据库全文索引,而非 LIKE## 避免:# 每个字段都加索引(写入变慢、占用空间)# 在低选择度字段上建索引(如 boolean 字段单独建索引意义不大)# Alembic 迁移文件中也需添加:# def upgrade():# op.create_index("idx_user_active_created", "users", ["is_active", "created_at"])
8.5 增删改查
from sqlalchemy import select, delete, func# 查询列表result = await db.execute(select(Item).where(Item.is_done == False).order_by(Item.id))items = result.scalars().all()# 分页result = await db.execute( select(Item).offset(skip).limit(limit).order_by(Item.id))items = result.scalars().all()# 单条查询item = await db.get(Item, item_id)# 条件查询result = await db.execute( select(Item).where(Item.title.ilike(f"%{keyword}%")))items = result.scalars().all()# 统计result = await db.execute(select(func.count()).select_from(Item))total = result.scalar()# 创建item = Item(title="foo", owner_id=1)db.add(item)await db.flush()await db.refresh(item) # 获取自增 id# 更新item = await db.get(Item, item_id)item.title = "new title"# 自动在 get_db 的 commit 时保存# 删除item = await db.get(Item, item_id)await db.delete(item)# 关联查询result = await db.execute( select(Item).where(Item.owner.has(User.username == "admin")))
8.6 分页查询模式
from pydantic import BaseModelfrom sqlalchemy import select, funcclass PaginationParams: def __init__(self, skip: int = 0, limit: int = 20): self.skip = skip self.limit = min(limit, 100) # 防止恶意大查询class Page(BaseModel): items: list total: int skip: int limit: intasync def paginate( db: AsyncSession, query, pagination: PaginationParams,) -> Page: # 统计总数 count_query = select(func.count()).select_from(query.subquery()) total_result = await db.execute(count_query) total = total_result.scalar() # 分页查询 result = await db.execute( query.offset(pagination.skip).limit(pagination.limit) ) items = result.scalars().all() return Page(items=items, total=total, skip=pagination.skip, limit=pagination.limit)# 使用@app.get("/items/", response_model=Page)async def list_items( pagination: PaginationParams = Depends(), db: AsyncSession = Depends(get_db),): query = select(Item).order_by(Item.id) return await paginate(db, query, pagination)
8.7 同步方式(SQLite 不需要外部数据库)
# 适用于开发阶段from sqlalchemy import create_enginefrom sqlalchemy.orm import Sessionengine = create_engine("sqlite:///./app.db")SessionLocal = sessionmaker(engine)def get_db(): db = SessionLocal() try: yield db db.commit() except Exception: db.rollback() raise finally: db.close()
8.8 事务控制
# get_db 已经自动 commit/rollback# 但可以手动控制:async def create_order(user_id: int, item_ids: list[int], db: AsyncSession): order = Order(user_id=user_id) db.add(order) await db.flush() for item_id in item_ids: item = await db.get(Item, item_id) if not item: await db.rollback() raise HTTPException(404, f"Item {item_id} not found") order.items.append(item) # get_db 会自动 commit
8.9 N+1 查询与关系加载策略
N+1 问题是新手最容易踩的性能陷阱:
# ── 错误:N+1 查询 ─────────────────────────────────# 假设有 100 篇文章,每篇需要查作者posts = (await db.execute(select(Post))).scalars().all()for post in posts: print(post.author.name) # 每次循环都发一条 SQL!共 1+100 条# ── 正确:预先加载关系 ─────────────────────────────from sqlalchemy.orm import selectinload, joinedload# 方式一:selectinload(发额外 SELECT 一次性加载所有关联)query = select(Post).options(selectinload(Post.author))posts = (await db.execute(query)).scalars().all()for post in posts: print(post.author.name) # 仅 2 条 SQL# 方式二:joinedload(JOIN 在同一 SQL 中加载) query = select(Post).options(joinedload(Post.author))posts = (await db.execute(query)).scalars().all()# ── 加载多级关系 ───────────────────────────────────from sqlalchemy.orm import selectinload# 文章 + 作者 + 作者的所有文章(嵌套)query = select(Post).options( selectinload(Post.author).selectinload(User.posts))# ── 只加载部分字段 ─────────────────────────────────from sqlalchemy.orm import load_onlyquery = select(User).options(load_only(User.id, User.username))# ── 关系加载策略速查 ───────────────────────────────# selectinload() → 发额外 SELECT ... WHERE id IN (...),适用于 to-many# joinedload() → LEFT JOIN 在同一 SQL 中加载,适用于 to-one# subqueryload() → 子查询方式,不常用# lazy=True → 默认,按需加载(导致 N+1)# lazy="selectin" → 自动使用 selectinload# lazy="joined" → 自动使用 joinedload# lazy="subquery" → 自动使用 subqueryload## 建议:在 query 时用 options() 显式指定,而不是修改模型默认 lazy
8.10 批量插入与更新
from sqlalchemy import insert, update# ── 批量插入(比逐条 insert 快 10-100 倍) ──────────# 方式一:逐条 add(适合少量数据,< 100 条)for i in range(10): db.add(Item(title=f"item {i}", owner_id=1))# 方式二:bulk_insert_mappings(适合大量数据,100-10000 条)await db.execute( insert(Item), [ {"title": f"item {i}", "owner_id": 1} for i in range(1000) ])# 注意:bulk insert 不会触发 ORM 事件,也不会自动设置 created_at# 方式三:批量更新await db.execute( update(Item) .where(Item.owner_id == 1) .values(is_done=True))# ── upsert(存在则更新,不存在则插入,MySQL 语法)──from sqlalchemy.dialects.mysql import insert as mysql_insertstmt = mysql_insert(Item).values( id=1, title="upserted", owner_id=1)stmt = stmt.on_duplicate_key_update(title=stmt.inserted.title)await db.execute(stmt)
8.11 并发控制
from sqlalchemy import select# ── 问题场景 ────────────────────────────────────────# 用户 A 和 B 同时读取库存 = 1,各自下单,最终库存变成 -1# ── 方案一:悲观锁(适合写多读少) ──────────────────# 查询时锁定该行,其他事务必须等待result = await db.execute( select(Item).where(Item.id == item_id).with_for_update())item = result.scalar_one_or_none()# 在当前事务提交前,其他事务无法修改这条记录if item.stock < quantity: raise HTTPException(400, "Insufficient stock")item.stock -= quantity# 事务提交后解锁# ── 方案二:乐观锁(适合读多写少) ──────────────────# 在模型中加版本号字段,更新时检查版本号class Product(Base): __tablename__ = "products" id: Mapped[int] = mapped_column(primary_key=True) stock: Mapped[int] version: Mapped[int] = mapped_column(default=1) # 版本号# 更新时验证版本号result = await db.execute( update(Product) .where( Product.id == product_id, Product.version == current_version, # 版本号匹配才更新 ) .values(stock=new_stock, version=current_version + 1))if result.rowcount == 0: # 版本号不匹配,说明被其他事务修改了 raise HTTPException(409, "Conflict: product was modified by another user")# ── 选择策略 ─────────────────────────────────────────## 悲观锁 with_for_update():# - 适合写冲突频繁的场景# - 会降低并发性能(事务排队)## 乐观锁 version 字段:# - 适合写冲突少的场景# - 不阻塞读操作,性能好# - 冲突时需重试(客户端重新读取并重试)
9. Alembic 数据库迁移
9.1 初始化
# 异步项目alembic init -t async alembic# 同步项目alembic init alembic
9.2 配置
修改 alembic.ini:
# 注释掉硬编码的 URL# sqlalchemy.url = driver://user:pass@localhost/dbname
修改 alembic/env.py:
from app.config import get_settingsfrom app.database import Basefrom app.models import User, Item # 导入所有模型config = context.configsettings = get_settings()config.set_main_option("sqlalchemy.url", settings.database_url)target_metadata = Base.metadata
9.3 常用命令
# 自动生成迁移文件alembic revision --autogenerate -m "描述变更"# 查看状态alembic historyalembic current# 执行迁移alembic upgrade head # 升级到最新alembic upgrade +2 # 升级两步alembic downgrade -1 # 回退一步alembic downgrade <hash> # 回退到指定版本# 手动创建空迁移alembic revision -m "description"
在空迁移文件中编写 SQL:
def upgrade(): op.create_table( "my_table", sa.Column("id", sa.Integer, primary_key=True), sa.Column("name", sa.String(50), nullable=False), )def downgrade(): op.drop_table("my_table")
10. 认证与授权(JWT)
10.1 密码哈希
from passlib.context import CryptContextpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def hash_password(password: str) -> str: return pwd_context.hash(password)def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed)
10.2 JWT 令牌
from jose import JWTError, jwtfrom datetime import datetime, timedelta, timezoneSECRET_KEY = "your-secret-key"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30def create_access_token(data: dict) -> str: to_encode = data.copy() expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire, "type": "access"}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)def decode_token(token: str) -> dict | None: try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except JWTError: return None
10.3 登录接口
from fastapi import APIRouter, Depends, HTTPExceptionfrom fastapi.security import OAuth2PasswordRequestFormfrom sqlalchemy.ext.asyncio import AsyncSessionfrom pydantic import BaseModelrouter = APIRouter(prefix="/auth", tags=["auth"])class TokenResponse(BaseModel): access_token: str token_type: str = "bearer"@router.post("/login", response_model=TokenResponse)async def login( form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db),): # 查询用户 result = await db.execute( select(User).where(User.username == form_data.username) ) user = result.scalar_one_or_none() # 验证密码 if not user or not verify_password(form_data.password, user.hashed_password): raise HTTPException(status_code=401, detail="Invalid credentials") # 生成令牌 token = create_access_token({"sub": str(user.id)}) return TokenResponse(access_token=token)
10.4 认证依赖
from fastapi import Depends, HTTPExceptionfrom fastapi.security import HTTPBearer, HTTPAuthorizationCredentialssecurity = HTTPBearer()async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: AsyncSession = Depends(get_db),): token = credentials.credentials payload = decode_token(token) if payload is None or payload.get("type") != "access": raise HTTPException(status_code=401, detail="Invalid token") user_id = int(payload.get("sub")) user = await db.get(User, user_id) if not user: raise HTTPException(status_code=401, detail="User not found") return user
10.5 保护路由
@app.get("/users/me")async def read_current_user(current_user: User = Depends(get_current_user)): return current_user@app.get("/admin/dashboard")async def admin_dashboard(current_user: User = Depends(get_current_user)): if not current_user.is_admin: raise HTTPException(status_code=403, detail="Admin only") return {"message": "Welcome admin"}
10.6 刷新令牌
REFRESH_TOKEN_EXPIRE_DAYS = 7def create_refresh_token(data: dict) -> str: to_encode = data.copy() expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire, "type": "refresh"}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)@router.post("/refresh")async def refresh_token( token_data: RefreshTokenRequest,): payload = decode_token(token_data.refresh_token) if payload is None or payload.get("type") != "refresh": raise HTTPException(status_code=401, detail="Invalid refresh token") user_id = int(payload.get("sub")) new_access = create_access_token({"sub": str(user_id)}) return TokenResponse(access_token=new_access)
11. 文件上传
from fastapi import APIRouter, UploadFile, Fileimport osrouter = APIRouter(prefix="/upload", tags=["upload"])UPLOAD_DIR = "uploads"os.makedirs(UPLOAD_DIR, exist_ok=True)@router.post("/")async def upload_file(file: UploadFile = File()): # 读取文件内容 contents = await file.read() # 保存到磁盘 file_path = os.path.join(UPLOAD_DIR, file.filename) with open(file_path, "wb") as f: f.write(contents) return { "filename": file.filename, "content_type": file.content_type, "size": len(contents), }@router.post("/multiple/")async def upload_multiple(files: list[UploadFile] = File()): return [{"filename": f.filename} for f in files]
12. 后台任务
from fastapi import APIRouter, BackgroundTasksrouter = APIRouter(prefix="/tasks", tags=["tasks"])def write_log(message: str): with open("log.txt", "a") as f: f.write(f"{message}\n")async def send_email(to: str, subject: str): # 模拟发送邮件 await asyncio.sleep(5) print(f"Email sent to {to}: {subject}")@router.post("/send-notification")async def send_notification( email: str, background_tasks: BackgroundTasks,): background_tasks.add_task(send_email, email, "Welcome!") return {"message": "Email will be sent in background"}
后台任务在返回响应后执行,适用于非关键操作(日志、通知、缓存清理)。关键操作应使用消息队列(Celery、RabbitMQ)。
13. CORS 与跨域
from fastapi.middleware.cors import CORSMiddlewareapp.add_middleware( CORSMiddleware, allow_origins=[ "http://localhost:3000", "https://myapp.com", ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], expose_headers=["X-Process-Time"], # 暴露自定义响应头)
开发阶段建议:
# 开发环境放行所有来源allow_origins=["*"]# 生产环境严格限制allow_origins=["https://your-frontend.com"]
14. 日志配置
14.1 基础日志
import loggingimport sys# 配置根日志logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", stream=sys.stdout, # 用 stdout 而非 stderr,避免日志顺序错乱)logger = logging.getLogger("app") # 应用级 logger# 在代码中使用@app.get("/items/{item_id}")async def get_item(item_id: int): logger.info(f"Fetching item {item_id}") try: item = await db.get(Item, item_id) if not item: logger.warning(f"Item {item_id} not found") raise HTTPException(404) logger.debug(f"Item found: {item.title}") # debug 级别默认不输出 return item except Exception: logger.exception(f"Error fetching item {item_id}") # 自动记录异常堆栈 raise
14.2 结构化日志(生产推荐)
uv add structlog
import structlogimport loggingstructlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.JSONRenderer(), # JSON 格式,直接输送到日志系统 ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), cache_logger_on_first_use=True,)logger = structlog.get_logger("app")@app.get("/items/{item_id}")async def get_item(item_id: int, request: Request): logger.info("fetching_item", item_id=item_id, path=str(request.url)) return item
输出示例(JSON 格式,可直接输入到 ELK/Datadog):
{"event": "fetching_item", "item_id": 42, "path": "/items/42", "level": "info", "timestamp": "2024-01-01T12:00:00Z", "logger": "app"}
14.3 日志分级
|
|
|
|
|---|---|---|
DEBUG |
logger.debug() |
|
INFO |
logger.info() |
|
WARNING |
logger.warning() |
|
ERROR |
logger.error() |
|
CRITICAL |
logger.critical() |
|
14.4 请求日志中间件
import timeimport logginglogger = logging.getLogger("app.access")@app.middleware("http")async def access_log_middleware(request: Request, call_next): start = time.perf_counter() response = await call_next(request) duration = time.perf_counter() - start logger.info( "%s %s → %s (%.0fms)", request.method, request.url.path, response.status_code, duration * 1000, ) return response
15. WebSocket
15.1 基础 WebSocket
from fastapi import WebSocket, WebSocketDisconnect@app.websocket("/ws")async def websocket_endpoint(websocket: WebSocket): await websocket.accept() try: while True: data = await websocket.receive_text() await websocket.send_text(f"Echo: {data}") except WebSocketDisconnect: print("Client disconnected")
15.2 WebSocket 连接管理器
from fastapi import WebSocketimport jsonclass ConnectionManager: def __init__(self): self.active: dict[int, list[WebSocket]] = {} # room_id → [ws, ws, ...] async def connect(self, websocket: WebSocket, room_id: int): await websocket.accept() if room_id not in self.active: self.active[room_id] = [] self.active[room_id].append(websocket) def disconnect(self, websocket: WebSocket, room_id: int): self.active[room_id].remove(websocket) if not self.active[room_id]: del self.active[room_id] async def broadcast(self, room_id: int, message: dict): for ws in self.active.get(room_id, []): try: await ws.send_json(message) except Exception: pass @property def stats(self) -> dict: return { "rooms": len(self.active), "connections": sum(len(v) for v in self.active.values()), }manager = ConnectionManager()@router.websocket("/ws/chat/{room_id}")async def chat_websocket(websocket: WebSocket, room_id: int): await manager.connect(websocket, room_id) try: while True: data = await websocket.receive_text() message = json.loads(data) await manager.broadcast(room_id, { "user": message.get("user", "anonymous"), "text": message.get("text", ""), "room_id": room_id, }) except WebSocketDisconnect: manager.disconnect(websocket, room_id) await manager.broadcast(room_id, { "type": "system", "text": f"User left room {room_id}", })@router.get("/ws/stats")async def ws_stats(): return manager.stats
WebSocket 客户端示例:
// 浏览器端const ws = new WebSocket("ws://localhost:8000/api/v1/ws/chat/1");ws.onopen = () => ws.send(JSON.stringify({user: "alice", text: "hello"}));ws.onmessage = (e) => console.log(JSON.parse(e.data));
15.3 WebSocket 鉴权
from fastapi import WebSocket, statusfrom app.core.security import decode_token@router.websocket("/ws/auth")async def auth_websocket(websocket: WebSocket): token = websocket.query_params.get("token") if not token or not decode_token(token): await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return await websocket.accept() # ... 正常处理
15.4 SSE(Server-Sent Events)
from fastapi.responses import StreamingResponseimport asyncio@app.get("/events")async def event_stream(): async def event_generator(): while True: yield f"data: {{\"time\": \"{datetime.now(timezone.utc).isoformat()}\"}}\n\n" await asyncio.sleep(1) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", }, )
16. 测试
16.1 配置
uv add --dev pytest httpx pytest-asyncio
pyproject.toml:
[tool.pytest.ini_options]asyncio_mode = "auto"
16.2 编写测试
import pytestfrom httpx import AsyncClient, ASGITransportfrom app.main import app@pytest.fixturedef client(): transport = ASGITransport(app=app) with AsyncClient(transport=transport, base_url="http://test") as ac: yield ac@pytest.mark.asyncioasync def test_health_check(client: AsyncClient): response = await client.get("/health") assert response.status_code == 200 assert response.json() == {"status": "ok"}@pytest.mark.asyncioasync def test_create_item(client: AsyncClient): response = await client.post( "/api/v1/items/", json={"title": "test item"}, ) assert response.status_code == 201 data = response.json() assert data["title"] == "test item" assert "id" in data
16.3 运行测试
uv run pytest -vuv run pytest -v -k "item" # 按名称筛选uv run pytest -v --cov=app # 覆盖率
16.4 测试数据库
import pytestfrom httpx import AsyncClient, ASGITransportfrom app.main import appfrom app.database import engine, Base, async_session@pytest.fixture(autouse=True)async def setup_database(): """每个测试函数前重建表结构,保证测试隔离""" async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) yield async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all)@pytest.fixtureasync def client(): transport = ASGITransport(app=app) with AsyncClient(transport=transport, base_url="http://test") as ac: yield ac@pytest.fixtureasync def db(): async with async_session() as session: yield session@pytest.fixtureasync def auth_headers(client: AsyncClient, db: AsyncSession): """注册用户并返回认证头""" await client.post("/api/v1/auth/register", json={ "username": "testuser", "email": "test@example.com", "password": "testpass123", }) resp = await client.post("/api/v1/auth/login", data={ "username": "testuser", "password": "testpass123", }) token = resp.json()["access_token"] return {"Authorization": f"Bearer {token}"}# 使用@pytest.mark.asyncioasync def test_create_item(client: AsyncClient, auth_headers: dict): response = await client.post( "/api/v1/items/", json={"title": "test"}, headers=auth_headers, ) assert response.status_code == 201
16.5 Mock 外部服务
from unittest.mock import patch, AsyncMock@pytest.mark.asyncioasync def test_send_email(): """Mock 邮件服务,不真实发送""" with patch("app.services.email.EmailService.send", new_callable=AsyncMock) as mock_send: mock_send.return_value = True response = await client.post("/auth/forgot-password", json={"email": "test@test.com"}) assert response.status_code == 200 mock_send.assert_called_once()
17. 部署
17.1 多阶段 Dockerfile
FROM python:3.12-slim AS builderWORKDIR /appCOPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uvCOPY pyproject.toml uv.lock* ./RUN uv sync --no-dev --frozenFROM python:3.12-slim AS runnerWORKDIR /appCOPY --from=builder /app/.venv /app/.venvCOPY . .ENV PATH="/app/.venv/bin:$PATH"EXPOSE 8000CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
17.2 Docker Compose
services: backend: build: ./backend ports: - "8000:8000" environment: - DATABASE_URL=mysql+aiomysql://root:root@db:3306/app_db depends_on: db: condition: service_healthy db: image: mysql:8 environment: - MYSQL_ROOT_PASSWORD=root - MYSQL_DATABASE=app_db healthcheck: test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
17.3 环境变量管理
from pydantic_settings import BaseSettingsclass Settings(BaseSettings): database_url: str jwt_secret_key: str jwt_algorithm: str = "HS256" access_token_expire_minutes: int = 30 cors_origins: str = "http://localhost:3000" debug: bool = False @property def cors_origin_list(self) -> list[str]: return [o.strip() for o in self.cors_origins.split(",")] model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}@lru_cachedef get_settings() -> Settings: return Settings()
多环境配置(dev/staging/prod)
# ── .env.dev(开发环境) ─────────────────────────────# DATABASE_URL=mysql+aiomysql://root:root@localhost:3306/app_db# DEBUG=true# ── .env.prod(生产环境) ────────────────────────────# DATABASE_URL=mysql+aiomysql://user:pass@prod-host:3306/app_db# DEBUG=false# JWT_SECRET_KEY=real-secret-key# ── config.py 中按环境加载不同的文件 ────────────────import osclass Settings(BaseSettings): environment: str = "development" debug: bool = False database_url: str jwt_secret_key: str ... model_config = { "env_file": f".env.{os.getenv('APP_ENV', 'dev')}", # 根据 APP_ENV 加载不同文件 "env_file_encoding": "utf-8", "extra": "ignore", # 忽略多余的环境变量 }# 运行方式:# APP_ENV=prod uv run uvicorn app.main:app# 或者使用 Docker 环境变量覆盖(推荐):# docker run -e DATABASE_URL=... -e JWT_SECRET_KEY=... myapp
17.4 Sentry 错误监控
uv add sentry-sdk
import sentry_sdkfrom sentry_sdk.integrations.fastapi import FastApiIntegrationfrom sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegrationfrom app.config import get_settingssettings = get_settings()sentry_sdk.init( dsn=settings.sentry_dsn, # 从 Sentry 项目设置中获取 integrations=[ FastApiIntegration(), SqlalchemyIntegration(), ], traces_sample_rate=0.1, # 性能追踪采样率 10% environment=settings.environment, # "development" / "staging" / "production")# 手动捕获异常try: risky_operation()except Exception as e: sentry_sdk.capture_exception(e)# 设置用户上下文(方便定位问题用户)from sentry_sdk import set_userset_user({"id": current_user.id, "username": current_user.username})
17.5 GitHub Actions CI/CD
.github/workflows/ci.yml:
name: CIon: push: branches: [main, develop] pull_request: branches: [main]env: DATABASE_URL: sqlite+aiosqlite:///./test.db JWT_SECRET_KEY: test-secret-keyjobs: test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Install uv uses: astral-sh/setup-uv@v4 with: version: latest - name: Set up Python run: uv python install 3.12 - name: Install dependencies run: uv sync - name: Lint run: uv run ruff check . - name: Type check run: uv run mypy app/ - name: Test run: uv run pytest -v --cov=app --cov-report=xml - name: Upload coverage uses: codecov/codecov-action@v4 with: file: ./coverage.xml build: needs: test runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' steps: - uses: actions/checkout@v4 - name: Build Docker image run: docker build -t myapp:latest . - name: Push to registry env: DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} run: | echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin docker tag myapp:latest $DOCKER_USERNAME/myapp:latest docker push $DOCKER_USERNAME/myapp:latest
18. 最佳实践与项目结构
18.1 推荐项目结构
backend/├── pyproject.toml├── uv.lock├── .env.example├── Dockerfile├── docker-compose.yml├── alembic.ini├── alembic/│ ├── env.py│ └── versions/└── app/ ├── __init__.py ├── main.py # 应用入口 ├── config.py # 配置 ├── database.py # 数据库引擎 ├── dependencies.py # 全局依赖 ├── api/ │ ├── __init__.py │ ├── router.py # 汇总所有路由 │ └── v1/ │ ├── __init__.py │ ├── auth.py # 认证接口 │ ├── users.py # 用户接口 │ └── items.py # 资源接口 ├── core/ │ ├── __init__.py │ ├── security.py # JWT/密码工具 │ └── exceptions.py # 自定义异常 ├── crud/ │ ├── __init__.py │ ├── user.py # 用户数据库操作 │ └── item.py # 资源数据库操作 ├── models/ │ ├── __init__.py │ ├── user.py │ └── item.py └── schemas/ ├── __init__.py ├── user.py └── item.py
18.2 分层职责
|
|
|
|
|---|---|---|
api/ |
|
schemas/
crud/, dependencies |
schemas/ |
|
|
crud/ |
|
models/
database |
models/ |
|
database |
core/ |
|
|
18.3 编码规范
# 1. 异步优先async def get_user(db: AsyncSession, user_id: int): ...# 2. 类型注解完整def create_item(data: ItemCreate, db: AsyncSession) -> Item: ...# 3. 显式状态码@app.post("/items/", status_code=201)@app.delete("/items/{id}", status_code=204)# 4. 路由按资源组织prefix="/api/v1/items", tags=["items"]# 5. 使用依赖注入管理会话def get_current_user(db=Depends(get_db)): ...# 6. 环境变量分离settings = get_settings() # 从 .env 读取# 7. 敏感信息不过滤# UserResponse 不包含 hashed_password 字段# 8. 错误信息有含义raise HTTPException(404, detail="User not found")
18.4 API 版本管理
# 方式一:URL 路径版本(推荐)# /api/v1/posts → v1# /api/v2/posts → v2# app/api/router.pyfrom fastapi import APIRouterrouter_v1 = APIRouter(prefix="/api/v1")router_v1.include_router(posts_router)# 新版本变化大时,创建 v2 路由器router_v2 = APIRouter(prefix="/api/v2")router_v2.include_router(posts_v2_router)# 方式二:Header 版本from fastapi import Header@app.get("/posts")async def list_posts( api_version: str = Header(default="v1", alias="X-API-Version"),): if api_version == "v2": # v2 逻辑 ... else: # v1 逻辑 ...# 方式三:兼容策略(新增字段用 optional,不破坏旧接口)class PostV1(BaseModel): id: int title: strclass PostV2(PostV1): content: str | None = None # 新增字段设为可选
18.5 常见陷阱
|
|
|
|---|---|
datetime.utcnow() |
datetime.now(timezone.utc) |
default=datetime.now()
|
default=function_ref
|
|
|
|
|
|
|
|
|
|
response_model
|
|
* |
|
|
|
|
18.6 学习路径总结
第一阶段:基础├── 路由、参数、请求体├── Pydantic 验证└── uvicorn 启动第二阶段:数据库├── SQLAlchemy 2.0 ORM├── Alembic 迁移└── CRUD 操作第三阶段:进阶├── 依赖注入├── 认证(JWT)├── 错误处理└── 中间件第四阶段:完善├── 文件上传├── 后台任务├── 测试└── 部署(Docker)第五阶段:扩展├── WebSocket├── 消息队列(Celery / RabbitMQ)├── 缓存(Redis)├── 监控与日志├── CI/CD└── GraphQL(Strawberry)
19. 常见错误排查
19.1 启动时报错
|
|
|
|
|---|---|---|
ModuleNotFoundError: No module named 'app' |
|
backend/目录下执行 uvicorn app.main:app |
Error loading ASGI app. Could not import module "app.main" |
main.py
|
main.py
app/main.py,不是项目根目录 |
sqlalchemy.exc.OperationalError: Can't connect to MySQL server |
|
.env中 DATABASE_URL和 MySQL 服务状态 |
ModuleNotFoundError: No module named 'aiomysql' |
|
uv add aiomysql |
pydantic_core.ValidationError: 1 validation error for Settings |
.env
|
.env是否包含所有必填变量 |
ImportError: cannot import name 'AsyncGenerator' from 'collections.abc' |
|
|
19.2 运行时错误
# ── AttributeError: 'AsyncSession' object has no attribute 'query'# SQLAlchemy 2.0 异步会话使用 execute(),不是 query()# ❌ 错误items = db.query(Item).all()# ✅ 正确result = await db.execute(select(Item))items = result.scalars().all()# ── RuntimeError: You cannot use AsyncToSync in the same process# 在异步函数中调用了同步数据库操作# ✅ 要么全部异步,要么全部同步,不要混用# ── pydantic.ValidationError: Field required (type=missing)# 请求体缺少必填字段# ✅ 检查 JSON 请求体是否包含了所有必填字段# ── sqlalchemy.exc.IntegrityError: (1062, "Duplicate entry 'admin' for key 'users.username'")# 唯一约束冲突(用户名/邮箱重复)# ✅ 操作前先检查唯一字段是否存在# ── sqlalchemy.exc.OperationalError: (2006, "MySQL server has gone away")# 数据库连接超时断开# ✅ 设置 pool_recycle=3600(小于 MySQL 的 wait_timeout)
19.3 Alembic 迁移问题
|
|
|
|
|---|---|---|
Target database is not up to date |
|
alembic upgrade head |
No changes detected |
|
alembic/env.py中 import 所有模型 |
FAILED: No such revision |
|
alembic/versions/中手动删除的文件,或用 alembic stamp head |
Can't locate revision identified by |
|
alembic stamp head
|
19.4 Docker 问题
# ── port is already allocated# 端口被占用# 方案一:停掉占用端口的服务lsof -i :8000kill -9 <PID># 方案二:在 docker-compose.yml 中更换端口ports: - "8001:8000"# ── container name already in use# 容器未清理docker compose down# ── ERROR: failed to solve: failed to read dockerfile# Dockerfile 不存在ls Dockerfile*# ── Module not found in Docker container# .dockerignore 排除了源码 或 Dockerfile 未 COPY 源码# 检查 Dockerfile 中是否有 COPY . . 或 COPY app/ app/
20. 参考链接
官方文档
|
|
|
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
学习资源
|
|
|
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
第三方扩展库
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
夜雨聆风