AI工程化实战——从原型到生产的完整路径
开篇
在AI技术快速发展的今天,很多团队都能快速做出一个令人惊艳的原型演示,但真正能把AI应用落地到生产环境并稳定运行的团队却少之又少。从原型到生产,中间隔着一道巨大的鸿沟——这就是AI工程化要解决的问题。
从代码规范到测试策略,从CI/CD到监控运维,从性能优化到安全合规,AI工程化涵盖了从原型到生产的完整生命周期。但AI工程化究竟是什么?它与传统软件工程有何不同?如何构建一套完整的AI工程化体系?又该如何把你的AI项目从原型部署到生产环境?
本文将带你深入理解AI工程化实战,从原型验证到生产部署的完整流程,涵盖代码规范、测试策略、CI/CD、监控运维、性能优化、安全合规等关键环节,帮你把AI项目从原型落地到生产环境。全文约30000字。
一、AI工程化概述
1.1 从原型到生产的鸿沟
1.1.1 原型与生产的差异
原型阶段的特点:
- 快速验证想法
- 数据规模小
- 没有用户压力
- 可以接受错误
- 代码质量要求低
- 没有监控和运维
生产阶段的要求:
- 稳定可靠运行
- 处理大规模数据
- 高并发用户访问
- 高可用性要求
- 代码质量高
- 完善的监控和运维
典型的原型到生产的鸿沟:
原型:
- Jupyter Notebook代码
- 硬编码的API密钥
- 没有错误处理
- 没有测试
- 手动运行
生产:
- 模块化的代码库
- 配置管理
- 完善的错误处理
- 自动化测试
- 自动化部署
- 监控和告警
1.1.2 AI工程化的特殊挑战
AI相比传统软件的额外挑战:
| 维度 | 传统软件 | AI软件 |
|---|---|---|
| 行为确定性 | 确定 | 概率性 |
| 测试方法 | 单元测试、集成测试 | 额外需要数据测试、模型评估 |
| 部署方式 | 代码部署 | 代码+模型+数据部署 |
| 监控内容 | 系统指标 | 系统指标+模型指标+数据指标 |
| 更新频率 | 定期更新 | 模型需要持续重新训练 |
| 数据依赖 | 较低 | 极高 |
| 性能瓶颈 | CPU/IO | GPU/内存 |
1.2 AI工程化的核心目标
1.2.1 可靠性(Reliability)
什么是可靠性:
- 系统在预期时间内持续正常运行
- 出现问题时能够快速恢复
- 数据完整性得到保障
如何实现可靠性:
- 容错设计:优雅降级、重试机制、熔断机制
- 冗余设计:多实例部署、数据备份、灾备方案
- 故障恢复:自动重启、快速回滚、应急预案
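上述重试机制可以用一个简单的装饰器来示意。下面是一个纯Python的最小示例(`retry`、`flaky`等名称均为示意,并非特定框架的API),展示"失败后指数退避再重试、次数耗尽才向上抛错"的容错思路:

```python
import functools
import time

def retry(max_attempts: int = 3, base_delay: float = 0.1, backoff: float = 2.0):
    """带指数退避的重试装饰器:第n次失败后等待 base_delay * backoff**n 再重试"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # 重试次数耗尽,向上抛出,交给上层降级逻辑
                    time.sleep(delay)
                    delay *= backoff
        return wrapper
    return decorator

# 使用示例:模拟前两次调用失败、第三次成功的不稳定依赖
calls = {"n": 0}

@retry(max_attempts=3, base_delay=0.01)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"

print(flaky())  # ok
```

生产环境中通常还会限定只对可重试的异常类型(如超时、连接错误)重试,并叠加熔断器避免对持续故障的依赖反复施压。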
1.2.2 可扩展性(Scalability)
什么是可扩展性:
- 随着负载增加,系统能够通过增加资源来提升性能
- 支持水平扩展和垂直扩展
- 资源利用率高
如何实现可扩展性:
- 水平扩展:无状态设计、负载均衡、消息队列
- 垂直扩展:资源优化、性能调优、硬件升级
- 弹性伸缩:自动扩缩容、按需分配资源、成本优化
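自动扩缩容的核心是根据负载指标计算目标副本数。下面用纯Python示意Kubernetes HPA采用的目标跟踪公式 desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric),函数名与上下限参数为示意用的假设:

```python
import math

def desired_replicas(current_replicas: int, current_metric: float,
                     target_metric: float, min_replicas: int = 1,
                     max_replicas: int = 10) -> int:
    """按目标跟踪公式计算副本数,并裁剪到[min_replicas, max_replicas]区间"""
    raw = math.ceil(current_replicas * current_metric / target_metric)
    return max(min_replicas, min(max_replicas, raw))

# 当前3个副本,平均CPU利用率90%,目标60%:ceil(3 * 0.9 / 0.6) = 5
print(desired_replicas(3, 0.9, 0.6))  # 5
# 负载下降时自动缩容:ceil(5 * 0.2 / 0.6) = 2
print(desired_replicas(5, 0.2, 0.6))  # 2
```

实际的HPA还会加入容忍区间和冷却窗口,避免指标小幅波动导致副本数频繁抖动。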
1.2.3 可维护性(Maintainability)
什么是可维护性:
- 代码易于理解和修改
- 问题易于定位和调试
- 系统易于升级和扩展
如何实现可维护性:
- 代码质量:代码规范、模块化设计、文档完善
- 可观测性:日志记录、指标监控、链路追踪
- 开发效率:自动化测试、CI/CD流水线、开发工具链
1.2.4 安全性(Security)
什么是安全性:
- 数据安全(加密、权限控制)
- 模型安全(对抗攻击、模型窃取)
- 系统安全(漏洞防护、访问控制)
如何实现安全性:
- 数据安全:数据加密、访问控制、数据脱敏
- 模型安全:对抗样本防护、模型水印、推理防护
- 系统安全:漏洞扫描、安全审计、合规检查
1.3 AI工程化的完整生命周期
1.3.1 阶段一:原型验证(Prototype)
目标:快速验证想法的可行性
关键活动:数据探索和分析、模型选型和实验、快速原型开发、效果验证
产出:Jupyter Notebook、实验报告、验证结论
工具:Jupyter Notebook、Python脚本、MLflow(实验跟踪)
1.3.2 阶段二:开发(Development)
目标:把原型代码变成可维护的产品代码
关键活动:代码重构和模块化、编写单元测试、代码审查、文档编写
产出:模块化代码库、测试套件、技术文档
工具:Git(版本控制)、pytest(测试)、Black/flake8(代码规范)、Sphinx(文档)
1.3.3 阶段三:测试(Testing)
目标:确保系统质量达到生产标准
关键活动:自动化测试、性能测试、安全测试、验收测试
产出:测试报告、性能基准、安全报告
工具:pytest(单元测试)、Locust(性能测试)、OWASP ZAP(安全测试)
1.3.4 阶段四:部署(Deployment)
目标:把系统部署到生产环境
关键活动:环境准备、自动化部署、灰度发布、回滚预案
产出:生产环境、部署流水线、运维手册
工具:Docker/Kubernetes、GitHub Actions/GitLab CI、Argo CD(GitOps)
1.3.5 阶段五:运维(Operations)
目标:保障系统在生产环境稳定运行
关键活动:监控和告警、故障处理、性能优化、容量规划
产出:监控大盘、运维报告、优化方案
工具:Prometheus/Grafana、ELK(日志)、Jaeger(追踪)
二、代码规范与工程实践
2.1 从Notebook到生产代码
2.1.1 Notebook的局限性
Notebook适合的场景:
- 数据探索
- 快速实验
- 教学演示
- 可视化展示
Notebook不适合生产的原因:
- 难以版本控制:单元格执行顺序不明确、输出结果污染代码、合并冲突难以解决
- 缺乏模块化:代码重复、难以复用、难以测试
- 错误处理不完善:没有异常处理、没有日志记录、难以调试
- 没有自动化:手动执行、没有CI/CD、难以部署
2.1.2 重构Notebook代码
重构步骤:
步骤1:提取函数和类
# Notebook代码(不好)
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
# 读取数据
df = pd.read_csv('data.csv')
# 预处理
df = df.dropna()
X = df.drop('target', axis=1)
y = df['target']
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# 训练模型
model = RandomForestClassifier()
model.fit(X_train, y_train)
# 评估
y_pred = model.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
# 生产代码(好)
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_data(file_path: str) -> pd.DataFrame:
"""加载数据"""
logger.info(f"Loading data from {file_path}")
return pd.read_csv(file_path)
def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
"""预处理数据"""
logger.info("Preprocessing data")
df = df.dropna()
return df
def split_features_target(df: pd.DataFrame, target_col: str):
"""划分特征和目标"""
X = df.drop(target_col, axis=1)
y = df[target_col]
return X, y
def train_model(X_train, y_train, **kwargs):
"""训练模型"""
logger.info("Training model")
model = RandomForestClassifier(**kwargs)
model.fit(X_train, y_train)
return model
def evaluate_model(model, X_test, y_test) -> float:
"""评估模型"""
logger.info("Evaluating model")
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
logger.info(f"Accuracy: {accuracy:.4f}")
return accuracy
def main():
"""主函数"""
# 加载数据
df = load_data('data.csv')
# 预处理
df = preprocess_data(df)
# 划分特征和目标
X, y = split_features_target(df, 'target')
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 训练模型
model = train_model(X_train, y_train, n_estimators=100)
# 评估
accuracy = evaluate_model(model, X_test, y_test)
return model, accuracy
if __name__ == "__main__":
main()
步骤2:模块化组织
my_project/
├── data/ # 数据
│ ├── raw/
│ ├── processed/
│ └── external/
├── models/ # 模型
│ ├── trained/
│ └── checkpoints/
├── src/ # 源代码
│ ├── __init__.py
│ ├── data/ # 数据处理
│ │ ├── __init__.py
│ │ ├── loading.py
│ │ └── preprocessing.py
│ ├── features/ # 特征工程
│ │ ├── __init__.py
│ │ └── build_features.py
│ ├── models/ # 模型定义
│ │ ├── __init__.py
│ │ ├── train_model.py
│ │ └── predict_model.py
│ ├── visualization/ # 可视化
│ │ ├── __init__.py
│ │ └── visualize.py
│ └── utils/ # 工具函数
│ ├── __init__.py
│ └── helpers.py
├── notebooks/ # Notebook
│ ├── 01_data_exploration.ipynb
│ └── 02_model_training.ipynb
├── tests/ # 测试
│ ├── __init__.py
│ ├── test_data.py
│ ├── test_features.py
│ └── test_models.py
├── scripts/ # 脚本
│ ├── train.py
│ ├── evaluate.py
│ └── deploy.py
├── configs/ # 配置
│ ├── default.yaml
│ └── production.yaml
├── logs/ # 日志
├── docs/ # 文档
│ ├── README.md
│ ├── API.md
│ └── deployment.md
├── .gitignore
├── requirements.txt
├── setup.py
└── README.md
2.2 代码规范
2.2.1 Python代码规范
使用Black格式化代码:
# 安装
pip install black
# 格式化
black src/
# 检查
black --check src/
使用flake8检查代码:
# 安装
pip install flake8
# 检查
flake8 src/
使用isort组织导入:
# 安装
pip install isort
# 排序导入
isort src/
配置pyproject.toml:
[tool.black]
line-length = 100
target-version = ['py39', 'py310', 'py311']
[tool.isort]
profile = "black"
line_length = 100
配置.flake8(flake8本身不读取pyproject.toml,需放在单独的 .flake8 或 setup.cfg 文件中):
[flake8]
max-line-length = 100
extend-ignore = E203, W503
2.2.2 类型提示
使用类型提示:
from typing import Dict, Optional, Tuple, Union
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator
def load_data(file_path: str) -> pd.DataFrame:
"""加载数据"""
return pd.read_csv(file_path)
def preprocess_data(
df: pd.DataFrame,
drop_na: bool = True,
fill_value: Optional[Union[int, float]] = None
) -> pd.DataFrame:
"""预处理数据"""
df = df.copy()
if drop_na:
df = df.dropna()
elif fill_value is not None:
df = df.fillna(fill_value)
return df
def train_model(
X_train: pd.DataFrame,
y_train: pd.Series,
model_params: Optional[Dict] = None
) -> BaseEstimator:
"""训练模型"""
from sklearn.ensemble import RandomForestClassifier
params = model_params or {}
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
return model
def predict_model(
model: BaseEstimator,
X: pd.DataFrame
) -> Tuple[np.ndarray, np.ndarray]:
"""预测"""
y_pred = model.predict(X)
y_proba = model.predict_proba(X)
return y_pred, y_proba
2.3 配置管理
2.3.1 使用配置文件
YAML配置示例:
# configs/default.yaml
data:
raw_path: "data/raw/"
processed_path: "data/processed/"
test_size: 0.2
random_state: 42
model:
type: "RandomForestClassifier"
params:
n_estimators: 100
max_depth: 10
random_state: 42
training:
batch_size: 32
epochs: 100
learning_rate: 0.001
logging:
level: "INFO"
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
file: "logs/app.log"
加载配置:
# src/config.py
import yaml
from pathlib import Path
from typing import Any, Dict, Optional
import os
class Config:
"""配置管理"""
def __init__(self, config_path: Optional[str] = None):
if config_path is None:
# 从环境变量读取配置路径
config_path = os.environ.get("CONFIG_PATH", "configs/default.yaml")
self.config_path = Path(config_path)
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""加载配置文件"""
if not self.config_path.exists():
raise FileNotFoundError(f"Config file not found: {self.config_path}")
with open(self.config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
# 环境变量覆盖
config = self._override_with_env(config)
return config
def _override_with_env(self, config: Dict[str, Any], prefix: str = "APP") -> Dict[str, Any]:
"""用环境变量覆盖配置"""
for key, value in config.items():
env_key = f"{prefix}_{key.upper()}"
if isinstance(value, dict):
config[key] = self._override_with_env(value, env_key)
elif env_key in os.environ:
                # 类型转换(注意:bool是int的子类,必须先判断bool,否则布尔配置会被当成整数解析)
                if isinstance(value, bool):
                    config[key] = os.environ[env_key].lower() in ('true', '1', 'yes')
                elif isinstance(value, int):
                    config[key] = int(os.environ[env_key])
                elif isinstance(value, float):
                    config[key] = float(os.environ[env_key])
                else:
                    config[key] = os.environ[env_key]
return config
def get(self, key: str, default: Any = None) -> Any:
"""获取配置值"""
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
def __getitem__(self, key: str) -> Any:
"""下标访问"""
return self.get(key)
# 全局配置实例
config = Config()
2.3.2 多环境配置
目录结构:
configs/
├── base.yaml # 基础配置
├── development.yaml # 开发环境
├── staging.yaml # 预发布环境
└── production.yaml # 生产环境
base.yaml:
# configs/base.yaml
data:
test_size: 0.2
random_state: 42
model:
type: "RandomForestClassifier"
params:
n_estimators: 100
random_state: 42
logging:
level: "INFO"
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
development.yaml:
# configs/development.yaml
# 继承base.yaml
_base_: base.yaml
data:
raw_path: "data/raw/"
processed_path: "data/processed/"
logging:
level: "DEBUG"
file: "logs/dev.log"
production.yaml:
# configs/production.yaml
# 继承base.yaml
_base_: base.yaml
data:
raw_path: "/data/raw/"
processed_path: "/data/processed/"
model:
params:
n_estimators: 200
max_depth: 20
logging:
level: "WARNING"
file: "/var/log/app/prod.log"
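需要注意,`_base_` 继承并不是YAML的内置语法,配置加载器需要自己实现"先加载基础配置、再递归覆盖"的合并逻辑。下面是这一合并逻辑的最小示意(`deep_merge`为示意用的辅助函数;实际使用时先用 `yaml.safe_load` 把两个文件解析成字典再合并):

```python
from typing import Any, Dict

def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """递归合并:override中的嵌套字典与base逐层合并,其余值直接覆盖"""
    merged = dict(base)
    for key, value in override.items():
        if key == "_base_":
            continue  # 继承标记本身不进入最终配置
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

# 模拟 base.yaml 与 production.yaml 解析后的字典
base = {"model": {"params": {"n_estimators": 100, "random_state": 42}},
        "logging": {"level": "INFO"}}
prod = {"_base_": "base.yaml",
        "model": {"params": {"n_estimators": 200, "max_depth": 20}},
        "logging": {"level": "WARNING"}}

config = deep_merge(base, prod)
print(config["model"]["params"])
print(config["logging"]["level"])
```

合并后 `n_estimators` 被生产配置覆盖为200,而 `random_state` 从基础配置继承,体现了"子配置只写差异项"的意图。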
三、测试策略
3.1 测试金字塔
3.1.1 AI项目的测试层次
传统测试金字塔:
/\
/E2E\ 端到端测试(少量)
/------\
/集成测试\ 集成测试(适中)
/----------\
/ 单元测试 \ 单元测试(大量)
/--------------\
AI项目的测试金字塔:
/\
/ E2E \ 端到端测试(少量)
/-------\
/ 模型评估 \ 模型评估(适中)
/-----------\
/ 数据测试 \ 数据测试(适中)
/---------------\
/ 单元测试 \ 单元测试(大量)
/-------------------\
3.2 单元测试
3.2.1 使用pytest
安装pytest:
pip install pytest pytest-cov
基础测试示例:
# tests/test_data.py
import pandas as pd
import numpy as np
from src.data.preprocessing import preprocess_data
def test_preprocess_data_drop_na():
"""测试预处理数据 - 删除缺失值"""
# 准备测试数据
df = pd.DataFrame({
'A': [1, 2, np.nan, 4],
'B': [5, np.nan, 7, 8]
})
# 执行
result = preprocess_data(df, drop_na=True)
# 验证
assert len(result) == 2
assert not result.isna().any().any()
def test_preprocess_data_fill_value():
"""测试预处理数据 - 填充缺失值"""
# 准备测试数据
df = pd.DataFrame({
'A': [1, 2, np.nan, 4],
'B': [5, np.nan, 7, 8]
})
# 执行
result = preprocess_data(df, drop_na=False, fill_value=0)
# 验证
assert len(result) == 4
assert not result.isna().any().any()
assert (result['A'] == [1, 2, 0, 4]).all()
使用fixture:
# tests/conftest.py
import pytest
import pandas as pd
import numpy as np
@pytest.fixture
def sample_data():
"""示例数据fixture"""
return pd.DataFrame({
'feature1': [1, 2, 3, 4, 5],
'feature2': [10, 20, 30, 40, 50],
'target': [0, 1, 0, 1, 0]
})
@pytest.fixture
def sample_data_with_na():
"""包含缺失值的示例数据"""
return pd.DataFrame({
'feature1': [1, np.nan, 3, np.nan, 5],
'feature2': [10, 20, np.nan, 40, 50],
'target': [0, 1, np.nan, 1, 0]
})
@pytest.fixture
def trained_model(sample_data):
"""训练好的模型fixture"""
from sklearn.ensemble import RandomForestClassifier
X = sample_data[['feature1', 'feature2']]
y = sample_data['target']
model = RandomForestClassifier(random_state=42)
model.fit(X, y)
return model
使用fixture的测试:
# tests/test_model.py
import numpy as np
def test_model_predict(trained_model, sample_data):
"""测试模型预测"""
X = sample_data[['feature1', 'feature2']]
# 预测
y_pred = trained_model.predict(X)
# 验证
assert len(y_pred) == len(X)
assert set(y_pred).issubset({0, 1})
def test_model_predict_proba(trained_model, sample_data):
"""测试模型预测概率"""
X = sample_data[['feature1', 'feature2']]
# 预测概率
y_proba = trained_model.predict_proba(X)
# 验证
assert y_proba.shape == (len(X), 2)
assert np.all((y_proba >= 0) & (y_proba <= 1))
assert np.allclose(y_proba.sum(axis=1), 1)
3.2.2 测试覆盖率
运行测试并生成覆盖率报告:
# 运行测试
pytest tests/
# 生成覆盖率报告
pytest tests/ --cov=src --cov-report=html --cov-report=term
# 查看HTML报告
open htmlcov/index.html
配置pytest:
# pytest.ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
--strict-markers
--cov=src
--cov-report=term-missing
--cov-fail-under=80
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
integration: marks tests as integration tests
3.3 数据测试
3.3.1 数据质量测试
数据测试示例:
# tests/test_data_quality.py
import pandas as pd
import numpy as np
import pytest
def test_data_schema(sample_data):
"""测试数据schema"""
expected_columns = {'feature1', 'feature2', 'target'}
assert set(sample_data.columns) == expected_columns
def test_data_types(sample_data):
"""测试数据类型"""
assert sample_data['feature1'].dtype in (np.int64, np.float64)
assert sample_data['feature2'].dtype in (np.int64, np.float64)
assert sample_data['target'].dtype in (np.int64, np.float64)
def test_no_missing_values(sample_data):
"""测试没有缺失值"""
assert not sample_data.isna().any().any()
def test_target_values(sample_data):
"""测试目标值范围"""
assert set(sample_data['target'].unique()).issubset({0, 1})
def test_feature_ranges(sample_data):
"""测试特征范围"""
assert sample_data['feature1'].between(0, 100).all()
assert sample_data['feature2'].between(0, 1000).all()
def test_data_size(sample_data):
    """测试数据量(阈值应按真实数据集要求设定;示例fixture只有5行)"""
    assert len(sample_data) >= 5  # 真实场景中通常要求至少100个样本
    assert len(sample_data.columns) >= 2  # 至少2个特征
3.3.2 使用Great Expectations
安装Great Expectations:
pip install great-expectations
创建Expectations:
# tests/test_great_expectations.py
import great_expectations as ge
import pandas as pd
def test_data_with_great_expectations(sample_data):
"""使用Great Expectations测试数据"""
    # 转换为Great Expectations DataFrame(旧版GE的API;新版本需改用Validator接口)
    df_ge = ge.from_pandas(sample_data)
# 检查列是否存在
df_ge.expect_table_columns_to_match_set(
column_set=['feature1', 'feature2', 'target']
)
# 检查没有缺失值
df_ge.expect_column_values_to_not_be_null('feature1')
df_ge.expect_column_values_to_not_be_null('feature2')
df_ge.expect_column_values_to_not_be_null('target')
# 检查值范围
df_ge.expect_column_values_to_be_between('feature1', min_value=0, max_value=100)
df_ge.expect_column_values_to_be_between('feature2', min_value=0, max_value=1000)
# 检查目标值
df_ge.expect_column_values_to_be_in_set('target', value_set=[0, 1])
# 验证
validation_result = df_ge.validate()
# 输出结果
print(validation_result)
# 断言所有expectations都通过
assert validation_result['success']
3.4 模型测试
3.4.1 模型性能测试
模型评估测试:
# tests/test_model_performance.py
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np
def test_model_accuracy(trained_model, sample_data):
"""测试模型准确率"""
X = sample_data[['feature1', 'feature2']]
y_true = sample_data['target']
y_pred = trained_model.predict(X)
accuracy = accuracy_score(y_true, y_pred)
assert accuracy >= 0.7 # 准确率至少70%
def test_model_precision(trained_model, sample_data):
"""测试模型精确率"""
X = sample_data[['feature1', 'feature2']]
y_true = sample_data['target']
y_pred = trained_model.predict(X)
precision = precision_score(y_true, y_pred, zero_division=0)
assert precision >= 0.6 # 精确率至少60%
def test_model_recall(trained_model, sample_data):
"""测试模型召回率"""
X = sample_data[['feature1', 'feature2']]
y_true = sample_data['target']
y_pred = trained_model.predict(X)
recall = recall_score(y_true, y_pred, zero_division=0)
assert recall >= 0.6 # 召回率至少60%
def test_model_f1(trained_model, sample_data):
"""测试模型F1分数"""
X = sample_data[['feature1', 'feature2']]
y_true = sample_data['target']
y_pred = trained_model.predict(X)
f1 = f1_score(y_true, y_pred, zero_division=0)
assert f1 >= 0.65 # F1分数至少65%
def test_prediction_latency(trained_model, sample_data):
"""测试预测延迟"""
import time
X = sample_data[['feature1', 'feature2']]
# 预热
trained_model.predict(X.iloc[:1])
# 测试
start_time = time.time()
for _ in range(100):
trained_model.predict(X.iloc[:1])
end_time = time.time()
avg_latency = (end_time - start_time) / 100
assert avg_latency < 0.1 # 平均延迟小于100ms
3.4.2 模型鲁棒性测试
对抗样本测试:
# tests/test_model_robustness.py
import numpy as np
def test_model_noise_robustness(trained_model, sample_data):
"""测试模型对噪声的鲁棒性"""
X = sample_data[['feature1', 'feature2']].copy()
y_true = sample_data['target']
# 原始预测
y_pred_original = trained_model.predict(X)
accuracy_original = np.mean(y_pred_original == y_true)
    # 添加噪声(固定随机种子,保证测试结果可重复)
    rng = np.random.default_rng(42)
    X_noisy = X.copy()
    noise_level = 0.1  # 10%噪声
    for col in X.columns:
        noise = rng.normal(0, X[col].std() * noise_level, size=len(X))
        X_noisy[col] = X[col] + noise
# 噪声预测
y_pred_noisy = trained_model.predict(X_noisy)
accuracy_noisy = np.mean(y_pred_noisy == y_true)
# 准确率下降不应超过20%
accuracy_drop = accuracy_original - accuracy_noisy
assert accuracy_drop < 0.2
def test_model_missing_value_robustness(trained_model, sample_data):
"""测试模型对缺失值的鲁棒性"""
    import pandas as pd
    from sklearn.impute import SimpleImputer
    X = sample_data[['feature1', 'feature2']].copy()
    y_true = sample_data['target']
    # 用完整数据拟合缺失值填充器
    # (注意:未拟合的imputer不能和已训练模型组成Pipeline后直接predict,Pipeline未fit会报错)
    imputer = SimpleImputer(strategy='mean')
    imputer.fit(X)
    # 原始数据预测
    y_pred_original = trained_model.predict(X)
    accuracy_original = np.mean(y_pred_original == y_true)
    # 制造缺失值(固定随机种子,保证测试结果可重复)
    rng = np.random.default_rng(42)
    X_missing = X.copy()
    missing_rate = 0.2  # 20%缺失值
    for col in X.columns:
        mask = rng.random(len(X)) < missing_rate
        X_missing.loc[mask, col] = np.nan
    # 填充缺失值后预测
    X_filled = pd.DataFrame(imputer.transform(X_missing), columns=X.columns, index=X.index)
    y_pred_missing = trained_model.predict(X_filled)
accuracy_missing = np.mean(y_pred_missing == y_true)
# 准确率下降不应超过30%
accuracy_drop = accuracy_original - accuracy_missing
assert accuracy_drop < 0.3
四、CI/CD流水线
4.1 CI/CD概述
4.1.1 什么是CI/CD
CI(Continuous Integration,持续集成):
- 频繁地将代码合并到主干
- 自动化构建和测试
- 快速发现问题
CD(Continuous Delivery,持续交付):
- 自动化部署到预发布环境
- 手动确认后发布到生产
- 确保代码随时可发布
CD(Continuous Deployment,持续部署):
- 自动化部署到生产
- 无需人工干预
- 快速迭代
4.2 GitHub Actions
4.2.1 基础CI流水线
配置文件:
# .github/workflows/ci.yml
name: CI
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
name: Run Tests
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.9', '3.10', '3.11']
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Cache dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest pytest-cov black flake8
- name: Check code formatting
run: black --check src/ tests/
- name: Lint code
run: flake8 src/ tests/
- name: Run tests
run: pytest tests/ --cov=src --cov-report=xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
4.2.2 完整CI/CD流水线
# .github/workflows/cicd.yml
name: CI/CD Pipeline
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
release:
types: [ created ]
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
jobs:
test:
name: Run Tests
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.9', '3.10', '3.11']
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest pytest-cov black flake8
- name: Format check
run: black --check src/ tests/
- name: Lint
run: flake8 src/ tests/
- name: Test
run: pytest tests/ --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
build-and-push:
name: Build and Push Docker Image
needs: test
runs-on: ubuntu-latest
if: github.event_name == 'push' || github.event_name == 'release'
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=sha
- name: Build and push
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
deploy-staging:
name: Deploy to Staging
needs: build-and-push
runs-on: ubuntu-latest
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
environment:
name: staging
url: https://staging.example.com
steps:
- name: Deploy to staging
run: |
echo "Deploying to staging..."
# 这里添加部署到staging的脚本
# 例如:kubectl apply -f k8s/staging/
deploy-production:
name: Deploy to Production
needs: build-and-push
runs-on: ubuntu-latest
if: github.event_name == 'release'
environment:
name: production
url: https://example.com
steps:
- name: Deploy to production
run: |
echo "Deploying to production..."
# 这里添加部署到production的脚本
五、监控与运维
5.1 可观测性三大支柱
5.1.1 日志(Logs)
使用Python logging:
# src/utils/logger.py
import logging
import sys
from pathlib import Path
from typing import Optional
def setup_logger(
name: str = "app",
log_file: Optional[str] = None,
level: str = "INFO",
json_format: bool = False
) -> logging.Logger:
"""设置日志"""
logger = logging.getLogger(name)
logger.setLevel(getattr(logging, level.upper()))
logger.handlers.clear() # 清除已有handler
# 格式
if json_format:
from pythonjsonlogger import jsonlogger
formatter = jsonlogger.JsonFormatter(
"%(asctime)s %(name)s %(levelname)s %(message)s"
)
else:
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 控制台输出
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 文件输出
if log_file:
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
# 使用示例
logger = setup_logger(
name="myapp",
log_file="logs/app.log",
level="INFO"
)
logger.debug("调试信息")
logger.info("普通信息")
logger.warning("警告信息")
logger.error("错误信息")
logger.critical("严重错误")
# 带上下文信息(extra中的字段需配合json_format=True的JSON格式化器才会出现在日志输出中)
logger.info(
"用户登录",
extra={
"user_id": "123",
"ip": "192.168.1.1"
}
)
结构化日志:
# 使用structlog
import structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
logger.info(
"request_processed",
path="/api/v1/predict",
method="POST",
status_code=200,
duration_ms=123.45,
model_version="v1.0.0"
)
5.1.2 指标(Metrics)
使用Prometheus:
# src/utils/metrics.py
from prometheus_client import (
    Counter, Gauge, Histogram,
    start_http_server
)
import time
import functools
# 定义指标
REQUESTS = Counter(
'api_requests_total',
'Total number of API requests',
['endpoint', 'method', 'status_code']
)
INFERENCE_TIME = Histogram(
'model_inference_seconds',
'Time spent in model inference',
['model_version']
)
ACTIVE_REQUESTS = Gauge(
'api_active_requests',
'Number of active API requests'
)
MODEL_ACCURACY = Gauge(
'model_accuracy',
'Current model accuracy',
['model_version']
)
def start_metrics_server(port: int = 8000):
"""启动指标服务器"""
start_http_server(port)
print(f"Metrics server started on port {port}")
def track_request(endpoint: str):
"""装饰器:追踪请求"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
ACTIVE_REQUESTS.inc()
method = kwargs.get('method', 'GET')
try:
result = func(*args, **kwargs)
status_code = 200
return result
except Exception as e:
status_code = 500
raise
finally:
REQUESTS.labels(endpoint, method, status_code).inc()
ACTIVE_REQUESTS.dec()
return wrapper
return decorator
def track_inference(model_version: str):
"""装饰器:追踪推理时间"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.time() - start_time
INFERENCE_TIME.labels(model_version).observe(duration)
return wrapper
return decorator
# 使用示例
@track_request("/predict")
@track_inference("v1.0.0")
def predict(data):
"""预测函数"""
# 模型推理逻辑
time.sleep(0.1) # 模拟推理
return {"result": "prediction"}
# 更新模型准确率
MODEL_ACCURACY.labels("v1.0.0").set(0.895)
5.1.3 链路追踪(Tracing)
使用OpenTelemetry:
# src/utils/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter
)
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
import functools
def setup_tracing(service_name: str, jaeger_host: str = "localhost"):
"""设置链路追踪"""
# 创建TracerProvider
provider = TracerProvider()
trace.set_tracer_provider(provider)
# Jaeger exporter
jaeger_exporter = JaegerExporter(
agent_host_name=jaeger_host,
agent_port=6831
)
# 添加处理器
provider.add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# 也输出到控制台(调试用)
provider.add_span_processor(
BatchSpanProcessor(ConsoleSpanExporter())
)
# instrument requests库
RequestsInstrumentor().instrument()
return trace.get_tracer(service_name)
def trace_with_span(span_name: str):
"""装饰器:创建span"""
tracer = trace.get_tracer(__name__)
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with tracer.start_as_current_span(span_name) as span:
# 添加属性
span.set_attribute("function", func.__name__)
try:
result = func(*args, **kwargs)
span.set_status(trace.StatusCode.OK)
return result
except Exception as e:
span.set_status(trace.StatusCode.ERROR)
span.record_exception(e)
raise
return wrapper
return decorator
# 使用示例
tracer = setup_tracing("my-ai-service")
@trace_with_span("data_preprocessing")
def preprocess_data(data):
"""数据预处理"""
with tracer.start_as_current_span("cleaning"):
# 清洗逻辑
pass
with tracer.start_as_current_span("feature_engineering"):
# 特征工程
pass
return data
@trace_with_span("model_inference")
def predict(data):
"""模型推理"""
data = preprocess_data(data)
    with tracer.start_as_current_span("inference") as span:
        span.set_attribute("model_version", "v1.0.0")
        result = {"prediction": None}  # 推理逻辑占位,实际应在此调用模型
    return result
5.2 监控系统搭建
5.2.1 Prometheus + Grafana
docker-compose.yml:
version: '3'
services:
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # UI
- "6831:6831/udp" # Thrift
loki:
image: grafana/loki:latest
ports:
- "3100:3100"
promtail:
image: grafana/promtail:latest
volumes:
- ./logs:/var/log
- ./promtail-config.yml:/etc/promtail/config.yml
volumes:
prometheus_data:
grafana_data:
prometheus.yml:
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
- job_name: 'ai-service'
static_configs:
- targets: ['ai-service:8000']
metrics_path: '/metrics'
六、性能优化
6.1 模型推理优化
6.1.1 模型量化
使用ONNX Runtime量化:
# src/optimization/quantization.py
from onnxruntime.quantization import quantize_dynamic, QuantType
def quantize_model(model_path: str, output_path: str):
"""动态量化模型"""
quantize_dynamic(
model_input=model_path,
model_output=output_path,
weight_type=QuantType.QUInt8
)
print(f"Model quantized and saved to {output_path}")
# 使用示例
quantize_model("model.onnx", "model_quantized.onnx")
6.1.2 模型剪枝
使用PyTorch内置的剪枝API(torch.nn.utils.prune):
# src/optimization/pruning.py
import torch
import torch.nn as nn
import torch.nn.utils.prune as prune
def prune_model(model: nn.Module, amount: float = 0.3):
"""剪枝模型"""
# 对所有线性层进行剪枝
for name, module in model.named_modules():
if isinstance(module, nn.Linear):
prune.l1_unstructured(module, name='weight', amount=amount)
prune.remove(module, 'weight')
return model
6.2 系统性能优化
6.2.1 批处理
批处理推理:
# src/optimization/batching.py
import queue
import threading
import time
from typing import List, Any
class BatchProcessor:
"""批处理器"""
    def __init__(self, process_func, batch_size: int = 32, timeout: float = 0.1):
        self.process_func = process_func
        self.batch_size = batch_size
        self.timeout = timeout
        self.queue = queue.Queue()
        self.results = {}
        self.lock = threading.Lock()
        self._next_id = 0  # 自增请求ID,避免复用id()导致不同请求的结果串号
        # 启动处理线程
        self.thread = threading.Thread(target=self._process_loop, daemon=True)
        self.thread.start()
    def submit(self, item: Any) -> Any:
        """提交单个项目并阻塞等待结果"""
        with self.lock:
            self._next_id += 1
            request_id = self._next_id
        self.queue.put((request_id, item))
        # 轮询等待结果(生产实现可为每个请求配一个Event以避免忙等)
        while True:
            with self.lock:
                if request_id in self.results:
                    return self.results.pop(request_id)
            time.sleep(0.001)
def _process_loop(self):
"""处理循环"""
while True:
batch = []
# 收集批次
start_time = time.time()
while len(batch) < self.batch_size and (time.time() - start_time) < self.timeout:
try:
item = self.queue.get(timeout=0.01)
batch.append(item)
except queue.Empty:
pass
# 处理批次
if batch:
request_ids = [item[0] for item in batch]
items = [item[1] for item in batch]
# 批量处理
results = self.process_func(items)
# 返回结果
with self.lock:
for req_id, result in zip(request_ids, results):
self.results[req_id] = result
# 使用示例
def batch_predict(items: List):
"""批量预测"""
print(f"Processing batch of {len(items)} items")
return [f"result_{item}" for item in items]
# 创建批处理器
processor = BatchProcessor(batch_predict, batch_size=32, timeout=0.1)
# 提交请求
for i in range(100):
result = processor.submit(i)
print(f"Got result: {result}")
6.2.2 缓存
使用Redis缓存:
# src/optimization/caching.py
import redis
import hashlib
import json
import time
from functools import wraps
from typing import Any, Optional
class Cache:
"""缓存管理器"""
def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
self.redis = redis.Redis(host=host, port=port, db=db)
def _get_key(self, prefix: str, data: Any) -> str:
"""生成缓存键"""
data_str = json.dumps(data, sort_keys=True)
data_hash = hashlib.md5(data_str.encode()).hexdigest()
return f"{prefix}:{data_hash}"
def get(self, prefix: str, data: Any) -> Optional[Any]:
"""获取缓存"""
key = self._get_key(prefix, data)
value = self.redis.get(key)
if value:
return json.loads(value)
return None
def set(self, prefix: str, data: Any, value: Any, ttl: int = 3600):
"""设置缓存"""
key = self._get_key(prefix, data)
self.redis.setex(key, ttl, json.dumps(value))
def cached(self, prefix: str, ttl: int = 3600):
"""缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 构建缓存键数据
cache_data = {
'args': args,
'kwargs': kwargs
}
# 尝试获取缓存
cached_result = self.get(prefix, cache_data)
if cached_result is not None:
return cached_result
# 执行函数
result = func(*args, **kwargs)
# 缓存结果
self.set(prefix, cache_data, result, ttl)
return result
return wrapper
return decorator
# 使用示例
cache = Cache()
@cache.cached(prefix="predictions", ttl=3600)
def predict(data):
"""预测函数(带缓存)"""
print("Running prediction...")
time.sleep(0.5) # 模拟耗时
return {"result": data * 2}
# 第一次调用会执行,第二次会从缓存返回
print(predict(10)) # 执行
print(predict(10)) # 缓存
七、安全与合规
7.1 数据安全
7.1.1 数据加密
加密数据:
# src/security/encryption.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
def generate_key(password: str, salt: bytes) -> bytes:
"""生成加密密钥"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=480000
)
return base64.urlsafe_b64encode(kdf.derive(password.encode()))
def encrypt_data(data: str, key: bytes) -> str:
"""加密数据"""
fernet = Fernet(key)
return fernet.encrypt(data.encode()).decode()
def decrypt_data(encrypted_data: str, key: bytes) -> str:
"""解密数据"""
fernet = Fernet(key)
return fernet.decrypt(encrypted_data.encode()).decode()
# 使用示例
password = "my-secret-password"
salt = os.urandom(16)
key = generate_key(password, salt)
# 加密
original = "sensitive data"
encrypted = encrypt_data(original, key)
print(f"Encrypted: {encrypted}")
# 解密
decrypted = decrypt_data(encrypted, key)
print(f"Decrypted: {decrypted}")
7.1.2 数据脱敏
数据脱敏:
# src/security/anonymization.py
import re
def anonymize_text(text: str) -> str:
"""文本脱敏"""
# 手机号
text = re.sub(r'1[3-9]\d{9}', lambda m: m.group(0)[:3] + '****' + m.group(0)[7:], text)
# 身份证号
text = re.sub(r'\d{17}[\dXx]', lambda m: m.group(0)[:6] + '********' + m.group(0)[14:], text)
# 邮箱
text = re.sub(r'(\w+)@(\w+\.\w+)', r'***@\2', text)
# 银行卡号
text = re.sub(r'\d{16,19}', lambda m: m.group(0)[:4] + '****' + '****' + m.group(0)[-4:], text)
return text
# 使用示例
text = "手机号:13812345678,身份证:110101199001011234,邮箱:test@example.com"
anonymized = anonymize_text(text)
print(anonymized)
# 输出:手机号:138****5678,身份证:110101********1234,邮箱:***@example.com
7.2 模型安全
7.2.1 对抗样本防护
输入验证:
# src/security/adversarial_defense.py
import numpy as np
def validate_input(data: np.ndarray, min_val: float = 0.0, max_val: float = 1.0) -> bool:
"""验证输入数据"""
# 检查范围
if np.any(data < min_val) or np.any(data > max_val):
return False
# 检查异常值
mean = np.mean(data)
std = np.std(data)
if np.any(np.abs(data - mean) > 5 * std):
return False
return True
def add_input_noise(data: np.ndarray, noise_level: float = 0.01) -> np.ndarray:
"""添加随机噪声(增强鲁棒性)"""
noise = np.random.normal(0, noise_level, data.shape)
return np.clip(data + noise, 0, 1)
def smooth_predictions(predictions: np.ndarray, temperature: float = 1.0) -> np.ndarray:
"""平滑预测结果"""
predictions = predictions / temperature
exp_preds = np.exp(predictions - np.max(predictions))
return exp_preds / np.sum(exp_preds)
八、总结与展望
8.1 核心要点回顾
- 从原型到生产:了解原型与生产的差异,以及AI工程化的特殊挑战
- 代码规范:从Notebook到生产代码的重构,代码规范与最佳实践
- 测试策略:单元测试、数据测试、模型测试的完整测试体系
- CI/CD流水线:自动化构建、测试、部署的完整流程
- 监控运维:日志、指标、链路追踪三大支柱,以及完整的监控体系
- 性能优化:模型量化、剪枝、批处理、缓存等优化技术
- 安全合规:数据加密、脱敏,模型安全防护
8.2 AI工程化的未来趋势
短期趋势(6-12个月):
- MLOps工具成熟:更多自动化、智能化的MLOps平台
- 大模型工程化:针对大模型的部署、优化、监控技术
- GPU/TPU优化:更高效的硬件利用和调度
长期趋势(1-2年):
- 自动化AI工程:从数据到部署的全流程自动化
- 自优化系统:自动监控、自动调优、自动更新
- AI工程标准化:行业标准、最佳实践的形成
互动与延伸
思考问题:
- 你的AI项目在哪些环节最需要工程化改进?
- 如何平衡快速迭代和工程质量?
- AI工程化中最大的挑战是什么?
推荐阅读:
- 《Machine Learning Engineering》
- 《Designing Data-Intensive Applications》
- 第一篇:《Agent架构深度解析》
- 第二篇:《MCP协议详解》
- 第三篇:《Skill系统设计》
- 第四篇:《AI CLI工具链》
- 第五篇:《记忆系统与RAG》
本文为「AI新堆栈」系列第六篇,全文约30000字。至此,「AI新堆栈」系列已完整覆盖从Agent架构到工程化实战的核心主题。后续将推出更多AI技术深度文章,敬请关注。
夜雨聆风