从0到1:我用AI搭了一套MySQL慢查询自动分析系统
上周五下午,DBA在群里发了一条消息:
“谁负责的 order-service?你们有条SQL跑了47秒,把主库CPU打到了92%,差点把整个库拖挂。”
然后甩了一条200多字符的SQL过来。
开发一脸懵:”这SQL线上跑了半年了,之前没问题啊?”
DBA:”数据量涨了,之前10万行现在200万行,全表扫描能不慢吗?”
然后就是熟悉的流程——开发改SQL、DBA审核、测试、上线,折腾了两天。
问题是:这条慢查询其实一周前就出现了,只是没人注意slow log里的告警。等到CPU飙升才发现,已经是事故了。
这种场景每个月都在重复。于是我决定搞一套自动化的慢查询分析系统:
-
自动采集MySQL慢查询日志 -
用AI分析每条慢SQL的问题和优化方案 -
自动生成优化报告推送到企业微信/钉钉 -
开发自己就能看懂,不用每次都找DBA
从零开始搭,一共花了两个周末。 今天把完整过程写出来,你照着做就行。
先看最终效果
每天早上9点,企业微信群自动收到一条消息:
📊 MySQL慢查询日报 — 年-月-日
⚠️ 发现 12 条慢查询,其中 3 条高危
━━━━━━━━━━━━━━━━━━━━━━━
🔴 高危 #1
库名:order_db
SQL摘要:SELECT * FROM orders WHERE user_id = ? AND status = ?
执行次数:1,283次/天
平均耗时:3.2秒
最大耗时:47秒
扫描行数:2,180,000(全表扫描)
AI诊断:
问题:user_id + status 组合查询缺少联合索引,导致全表扫描
影响:高峰期可能导致主库CPU飙升,影响所有依赖order_db的服务
建议:
ALTER TABLE orders ADD INDEX idx_user_status (user_id, status);
预期效果:扫描行数从218万降至约50行,查询耗时<10ms
风险评估:该表当前218万行,加索引预计耗时约15秒,建议在低峰期执行
🔴 高危 #2
...
🟡 中危(2条)
...
🟢 低危(7条)
[点击查看完整报告]
━━━━━━━━━━━━━━━━━━━━━━━
报告由 AI慢查询分析系统 自动生成
开发看到这个消息,自己就知道该怎么改了。DBA只需要审核一下AI给的加索引建议是否合理,不用再手动分析每一条慢SQL。
目标效果看完了,下面开始搭。
整体架构
┌─────────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────────┐
│ MySQL │────▶│ 采集模块 │────▶│ AI分析模块 │────▶│ 推送模块 │
│ Slow Log │ │ (Python) │ │ (LLM API) │ │ (企微/钉钉) │
└─────────────┘ └──────────────┘ └──────────────┘ └────────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ SQLite/ │ │ HTML报告 │
│ 本地存储 │ │ (可选) │
└──────────────┘ └──────────────┘
四个模块:
-
采集模块 —— 从MySQL拉取慢查询日志,做去重和聚合 -
AI分析模块 —— 把慢SQL + 表结构 + 执行计划喂给大模型分析 -
推送模块 —— 把分析结果格式化后推送到企业微信/钉钉 -
存储 —— 历史记录存SQLite,方便趋势对比
技术栈: Python 3.10+ / 任意LLM API(OpenAI兼容接口、Claude API、或国产大模型都行)/ SQLite / 企业微信Webhook
不需要额外的中间件,一台普通Linux服务器就能跑。
Step 1:确保MySQL开启了慢查询日志
先确认你的MySQL开启了慢查询日志:
-- 查看慢查询日志状态
SHOWVARIABLESLIKE'slow_query%';
SHOWVARIABLESLIKE'long_query_time';
如果没有开启:
-- 动态开启(不需要重启MySQL)
SETGLOBAL slow_query_log = 'ON';
SETGLOBAL long_query_time = 1; -- 超过1秒的SQL记录
SETGLOBAL log_queries_not_using_indexes = ON; -- 没用索引的也记录
-- 要持久化的话,写到 my.cnf
-- [mysqld]
-- slow_query_log = 1
-- slow_query_log_file = /var/log/mysql/slow.log
-- long_query_time = 1
-- log_queries_not_using_indexes = 1
另一种方式:从 performance_schema 采集(推荐,不用解析日志文件)
-- MySQL 5.7+ 支持
-- 从 performance_schema.events_statements_summary_by_digest 获取慢查询摘要
SELECT
SCHEMA_NAME as db_name,
DIGEST_TEXT as sql_text,
COUNT_STAR as exec_count,
ROUND(AVG_TIMER_WAIT/1000000000000, 3) as avg_time_sec,
ROUND(MAX_TIMER_WAIT/1000000000000, 3) as max_time_sec,
SUM_ROWS_EXAMINED as total_rows_examined,
SUM_ROWS_SENT as total_rows_sent,
FIRST_SEEN as first_seen,
LAST_SEEN as last_seen
FROM performance_schema.events_statements_summary_by_digest
WHERE AVG_TIMER_WAIT > 1000000000000-- 平均耗时 > 1秒
AND SCHEMA_NAME ISNOTNULL
AND DIGEST_TEXT NOTLIKE'COMMIT%'
AND DIGEST_TEXT NOTLIKE'SET%'
ORDERBY AVG_TIMER_WAIT DESC
LIMIT50;
我们用 performance_schema 方案,更干净。
Step 2:采集模块
创建项目目录:
mkdir -p slow-query-analyzer/{config,modules,reports,templates}
cd slow-query-analyzer
先装依赖:
pip install pymysql openai requests jinja2 python-dotenv
配置文件 config/settings.py
"""
慢查询分析系统配置
"""
import os
from dotenv import load_dotenv
load_dotenv()
# ========== MySQL 配置 ==========
MYSQL_CONFIG = {
"host": os.getenv("MYSQL_HOST", "127.0.0.1"),
"port": int(os.getenv("MYSQL_PORT", 3306)),
"user": os.getenv("MYSQL_USER", "monitor"),
"password": os.getenv("MYSQL_PASSWORD", ""),
"charset": "utf8mb4",
}
# 要监控的数据库列表(空列表表示监控所有)
MONITOR_DATABASES = os.getenv("MONITOR_DATABASES", "").split(",") if os.getenv("MONITOR_DATABASES") else []
# 慢查询阈值(秒)
SLOW_QUERY_THRESHOLD = float(os.getenv("SLOW_QUERY_THRESHOLD", "1.0"))
# 每次最多分析的SQL数量
MAX_QUERIES_PER_RUN = int(os.getenv("MAX_QUERIES_PER_RUN", "20"))
# ========== AI 配置 ==========
AI_CONFIG = {
"api_key": os.getenv("AI_API_KEY", ""),
"base_url": os.getenv("AI_BASE_URL", "https://api.openai.com/v1"), # 兼容OpenAI接口
"model": os.getenv("AI_MODEL", "gpt-4o"), # 可换成 claude/qwen/deepseek
"max_tokens": 2000,
"temperature": 0.1, # 分析型任务用低temperature
}
# ========== 推送配置 ==========
# 企业微信 Webhook
WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "")
# 钉钉 Webhook(二选一或都配)
DINGTALK_WEBHOOK = os.getenv("DINGTALK_WEBHOOK", "")
# ========== 存储配置 ==========
SQLITE_DB_PATH = os.getenv("SQLITE_DB_PATH", "data/slow_queries.db")
# ========== 风险等级定义 ==========
RISK_LEVELS = {
"high": {
"avg_time_threshold": 5.0, # 平均耗时>5秒
"scan_rows_threshold": 500000, # 扫描行数>50万
"exec_count_threshold": 100, # 日执行次数>100
"emoji": "🔴",
"label": "高危"
},
"medium": {
"avg_time_threshold": 2.0,
"scan_rows_threshold": 100000,
"exec_count_threshold": 50,
"emoji": "🟡",
"label": "中危"
},
"low": {
"avg_time_threshold": 1.0,
"scan_rows_threshold": 10000,
"exec_count_threshold": 10,
"emoji": "🟢",
"label": "低危"
}
}
环境变量文件 .env
# MySQL
MYSQL_HOST=10.0.1.100
MYSQL_PORT=3306
MYSQL_USER=monitor
MYSQL_PASSWORD=your_password_here
# 监控的数据库(逗号分隔,留空表示全部)
MONITOR_DATABASES=order_db,user_db,payment_db
# AI API(以下示例用DeepSeek,便宜好用)
AI_API_KEY=sk-xxxxxxxxxxxxx
AI_BASE_URL=https://api.deepseek.com/v1
AI_MODEL=deepseek-chat
# 企业微信 Webhook
WECHAT_WEBHOOK=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxx
# 慢查询阈值
SLOW_QUERY_THRESHOLD=1.0
MAX_QUERIES_PER_RUN=20
安全提醒:.env 文件不要提交到Git,在 .gitignore 里加上。
采集模块 modules/collector.py
"""
慢查询采集模块
从 performance_schema 采集慢查询,同时获取表结构和执行计划
"""
import pymysql
import logging
from config.settings import MYSQL_CONFIG, SLOW_QUERY_THRESHOLD, MAX_QUERIES_PER_RUN, MONITOR_DATABASES
logger = logging.getLogger(__name__)
classSlowQueryCollector:
def__init__(self):
self.conn = None
defconnect(self):
"""建立MySQL连接"""
try:
self.conn = pymysql.connect(
**MYSQL_CONFIG,
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=10,
read_timeout=30,
)
logger.info("MySQL连接成功")
except Exception as e:
logger.error(f"MySQL连接失败: {e}")
raise
defclose(self):
"""关闭连接"""
if self.conn:
self.conn.close()
defcollect_slow_queries(self) -> list:
"""
从 performance_schema 采集慢查询摘要
返回慢查询列表
"""
threshold_ps = SLOW_QUERY_THRESHOLD * 1000000000000# 转为皮秒
sql = """
SELECT
SCHEMA_NAME as db_name,
DIGEST as digest,
DIGEST_TEXT as sql_text,
COUNT_STAR as exec_count,
ROUND(AVG_TIMER_WAIT/1000000000000, 3) as avg_time_sec,
ROUND(MAX_TIMER_WAIT/1000000000000, 3) as max_time_sec,
ROUND(SUM_TIMER_WAIT/1000000000000, 3) as total_time_sec,
SUM_ROWS_EXAMINED as total_rows_examined,
SUM_ROWS_SENT as total_rows_sent,
ROUND(SUM_ROWS_EXAMINED/NULLIF(COUNT_STAR,0), 0) as avg_rows_examined,
ROUND(SUM_ROWS_SENT/NULLIF(COUNT_STAR,0), 0) as avg_rows_sent,
FIRST_SEEN as first_seen,
LAST_SEEN as last_seen
FROM performance_schema.events_statements_summary_by_digest
WHERE AVG_TIMER_WAIT > %s
AND SCHEMA_NAME IS NOT NULL
AND DIGEST_TEXT NOT LIKE 'COMMIT%%'
AND DIGEST_TEXT NOT LIKE 'ROLLBACK%%'
AND DIGEST_TEXT NOT LIKE 'SET %%'
AND DIGEST_TEXT NOT LIKE 'SHOW %%'
AND DIGEST_TEXT NOT LIKE 'SELECT @@%%'
ORDER BY AVG_TIMER_WAIT DESC
LIMIT %s
"""
with self.conn.cursor() as cursor:
cursor.execute(sql, (threshold_ps, MAX_QUERIES_PER_RUN))
queries = cursor.fetchall()
# 如果配置了指定数据库,过滤
if MONITOR_DATABASES:
queries = [q for q in queries if q["db_name"] in MONITOR_DATABASES]
logger.info(f"采集到 {len(queries)} 条慢查询")
return queries
defget_table_schema(self, db_name: str, sql_text: str) -> str:
"""
根据SQL中出现的表名,获取表结构
简单实现:用正则提取FROM/JOIN后面的表名
"""
import re
# 提取表名(简单正则,覆盖常见场景)
table_pattern = r'(?:FROM|JOIN|UPDATE|INTO)\s+`?(\w+)`?'
tables = list(set(re.findall(table_pattern, sql_text, re.IGNORECASE)))
ifnot tables:
return"无法解析表名"
schemas = []
for table in tables[:5]: # 最多取5个表,防止太长
try:
with self.conn.cursor() as cursor:
cursor.execute(f"USE `{db_name}`")
cursor.execute(f"SHOW CREATE TABLE `{table}`")
result = cursor.fetchone()
if result:
schemas.append(result.get("Create Table", ""))
except Exception as e:
schemas.append(f"-- 获取表 {table} 结构失败: {e}")
return"\n\n".join(schemas)
defget_explain_result(self, db_name: str, sql_text: str) -> str:
"""
对SQL执行EXPLAIN,获取执行计划
注意:performance_schema中的SQL是参数化的(用?占位),
EXPLAIN不能直接执行,这里尝试简单替换
"""
try:
# 将 ? 替换为 1(简单处理,用于获取大致的执行计划)
explain_sql = sql_text.replace("?", "1")
# 去掉末尾的 LIMIT(EXPLAIN不需要)
# explain_sql = re.sub(r'LIMIT\s+\d+', '', explain_sql, flags=re.IGNORECASE)
with self.conn.cursor() as cursor:
cursor.execute(f"USE `{db_name}`")
cursor.execute(f"EXPLAIN {explain_sql}")
rows = cursor.fetchall()
ifnot rows:
return"EXPLAIN无结果"
# 格式化为可读的文本
lines = []
for row in rows:
line = (
f" table: {row.get('table', 'N/A')}, "
f"type: {row.get('type', 'N/A')}, "
f"possible_keys: {row.get('possible_keys', 'N/A')}, "
f"key: {row.get('key', 'N/A')}, "
f"rows: {row.get('rows', 'N/A')}, "
f"Extra: {row.get('Extra', 'N/A')}"
)
lines.append(line)
return"\n".join(lines)
except Exception as e:
returnf"EXPLAIN执行失败: {e}"
defenrich_queries(self, queries: list) -> list:
"""
为每条慢查询补充表结构和执行计划
"""
enriched = []
for q in queries:
db_name = q["db_name"]
sql_text = q["sql_text"]
q["table_schema"] = self.get_table_schema(db_name, sql_text)
q["explain_result"] = self.get_explain_result(db_name, sql_text)
enriched.append(q)
logger.info(f"已补充信息: {db_name} - {sql_text[:60]}...")
return enriched
Step 3:AI分析模块
这是核心模块——把慢查询的SQL + 表结构 + 执行计划 + 统计数据一起喂给大模型分析。
modules/analyzer.py
"""
AI分析模块
用大模型分析每条慢查询,给出诊断和优化建议
"""
import json
import logging
from openai import OpenAI
from config.settings import AI_CONFIG, RISK_LEVELS
logger = logging.getLogger(__name__)
# 初始化AI客户端(OpenAI兼容接口)
client = OpenAI(
api_key=AI_CONFIG["api_key"],
base_url=AI_CONFIG["base_url"],
)
# 系统提示词
SYSTEM_PROMPT = """你是一位资深MySQL DBA,擅长SQL性能优化和索引设计。
你需要分析慢查询SQL,给出专业的诊断和优化建议。
分析原则:
1. 先看执行计划(EXPLAIN),判断是否全表扫描、索引使用情况
2. 再看表结构,判断索引设计是否合理
3. 综合执行频率和影响面给出风险评估
4. 优化建议要具体可执行,直接给出ALTER TABLE或改写后的SQL
5. 要考虑加索引对写入性能的影响
6. 如果SQL本身写法有问题(如SELECT *、子查询可改JOIN等),也要指出
输出必须是严格的JSON格式(不要包含markdown代码块标记),包含以下字段:
{
"diagnosis": "问题诊断(一句话说清楚根因)",
"impact": "影响分析(这个慢查询会造成什么后果)",
"suggestions": [
{
"type": "add_index | rewrite_sql | adjust_config | other",
"description": "优化建议描述",
"sql": "具体的SQL语句(如ALTER TABLE或改写后的查询)",
"expected_improvement": "预期改善效果"
}
],
"risk_of_fix": "执行优化操作的风险提示(如加索引锁表时间)",
"priority": "high | medium | low"
}
"""
defanalyze_single_query(query: dict) -> dict:
"""
用AI分析单条慢查询
"""
# 构造用户消息
user_message = f"""请分析以下MySQL慢查询:
## SQL语句
```sql
{query['sql_text']}
统计信息
-
数据库:{query[‘db_name’]} -
日均执行次数:{query[‘exec_count’]} -
平均耗时:{query[‘avg_time_sec’]}秒 -
最大耗时:{query[‘max_time_sec’]}秒 -
平均扫描行数:{query.get(‘avg_rows_examined’, ‘N/A’)} -
平均返回行数:{query.get(‘avg_rows_sent’, ‘N/A’)} -
首次出现:{query.get(‘first_seen’, ‘N/A’)}
表结构
{query.get('table_schema', '无法获取')}
EXPLAIN执行计划
{query.get('explain_result', '无法获取')}
请按要求的JSON格式输出分析结果。
try:
response = client.chat.completions.create(
model=AI_CONFIG["model"],
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_message},
],
max_tokens=AI_CONFIG["max_tokens"],
temperature=AI_CONFIG["temperature"],
)
content = response.choices[0].message.content.strip()
# 清理可能的markdown代码块标记
if content.startswith("```"):
content = content.split("\n", 1)[1] # 去掉第一行
if content.endswith("```"):
content = content.rsplit("```", 1)[0] # 去掉最后的```
content = content.strip()
analysis = json.loads(content)
logger.info(f"AI分析完成: {query['sql_text'][:50]}...")
return analysis
except json.JSONDecodeError as e:
logger.error(f"AI返回JSON解析失败: {e}\n原始内容: {content}")
return {
"diagnosis": "AI分析结果解析失败,请人工检查",
"impact": "未知",
"suggestions": [],
"risk_of_fix": "未知",
"priority": "medium"
}
except Exception as e:
logger.error(f"AI分析异常: {e}")
return {
"diagnosis": f"AI分析异常: {str(e)}",
"impact": "未知",
"suggestions": [],
"risk_of_fix": "未知",
"priority": "medium"
}
def classify_risk(query: dict) -> str:
"""
根据规则对慢查询进行风险分级
AI给出的priority作为参考,结合硬指标做最终判断
"""
avg_time = float(query.get("avg_time_sec", 0))
avg_rows = int(query.get("avg_rows_examined", 0))
exec_count = int(query.get("exec_count", 0))
for level in ["high", "medium", "low"]:
thresholds = RISK_LEVELS[level]
if (avg_time >= thresholds["avg_time_threshold"]
or avg_rows >= thresholds["scan_rows_threshold"]
or (avg_time >= thresholds["avg_time_threshold"] * 0.8
and exec_count >= thresholds["exec_count_threshold"])):
return level
return"low"
def analyze_all_queries(queries: list) -> list:
"""
分析所有慢查询,返回分析结果列表
"""
results = []
for query in queries:
# AI分析
analysis = analyze_single_query(query)
# 风险分级
risk_level = classify_risk(query)
# 如果AI给的priority更高,用AI的
ai_priority = analysis.get("priority", "low")
priority_rank = {"high": 3, "medium": 2, "low": 1}
final_risk = risk_level if priority_rank.get(risk_level, 0) >= priority_rank.get(ai_priority, 0) else ai_priority
result = {
**query,
"analysis": analysis,
"risk_level": final_risk,
"risk_emoji": RISK_LEVELS.get(final_risk, RISK_LEVELS["low"])["emoji"],
"risk_label": RISK_LEVELS.get(final_risk, RISK_LEVELS["low"])["label"],
}
results.append(result)
# 按风险等级排序(高危在前)
results.sort(key=lambda x: priority_rank.get(x["risk_level"], 0), reverse=True)
logger.info(f"分析完成,共 {len(results)} 条,"
f"高危 {sum(1 for r in results if r['risk_level'] == 'high')} 条,"
f"中危 {sum(1 for r in results if r['risk_level'] == 'medium')} 条,"
f"低危 {sum(1 for r in results if r['risk_level'] == 'low')} 条")
return results
Step 4:推送模块
modules/notifier.py
"""
推送模块
支持企业微信 / 钉钉 Webhook
"""
import requests
import logging
from datetime import datetime
from config.settings import WECHAT_WEBHOOK, DINGTALK_WEBHOOK
logger = logging.getLogger(__name__)
defformat_report(results: list) -> str:
"""
把分析结果格式化为可读的文本报告
"""
now = datetime.now().strftime("%Y-%m-%d")
total = len(results)
high_count = sum(1for r in results if r["risk_level"] == "high")
medium_count = sum(1for r in results if r["risk_level"] == "medium")
low_count = sum(1for r in results if r["risk_level"] == "low")
lines = []
lines.append(f"📊 MySQL慢查询日报 — {now}")
lines.append("")
if high_count > 0:
lines.append(f"⚠️ 发现 {total} 条慢查询,其中 {high_count} 条高危")
else:
lines.append(f"✅ 发现 {total} 条慢查询,无高危")
lines.append("━" * 30)
# 详细列出高危和中危
detail_count = 0
for r in results:
if r["risk_level"] in ("high", "medium"):
detail_count += 1
analysis = r.get("analysis", {})
lines.append("")
lines.append(f"{r['risk_emoji']}{r['risk_label']} #{detail_count}")
lines.append(f"库名:{r['db_name']}")
# SQL摘要(截断到120字符)
sql_summary = r["sql_text"][:120]
if len(r["sql_text"]) > 120:
sql_summary += "..."
lines.append(f"SQL:{sql_summary}")
lines.append(f"执行次数:{r['exec_count']}次")
lines.append(f"平均耗时:{r['avg_time_sec']}秒")
lines.append(f"最大耗时:{r['max_time_sec']}秒")
lines.append(f"平均扫描行数:{r.get('avg_rows_examined', 'N/A')}")
lines.append("")
lines.append(f"AI诊断:{analysis.get('diagnosis', '无')}")
suggestions = analysis.get("suggestions", [])
if suggestions:
lines.append("优化建议:")
for i, s in enumerate(suggestions, 1):
lines.append(f" {i}. {s.get('description', '')}")
if s.get("sql"):
lines.append(f" {s['sql']}")
if s.get("expected_improvement"):
lines.append(f" 预期效果:{s['expected_improvement']}")
risk_of_fix = analysis.get("risk_of_fix", "")
if risk_of_fix:
lines.append(f"⚠️ 执行风险:{risk_of_fix}")
lines.append("━" * 30)
# 低危只列摘要
if low_count > 0:
lines.append("")
lines.append(f"🟢 低危({low_count}条):")
for r in results:
if r["risk_level"] == "low":
lines.append(f" · [{r['db_name']}] {r['sql_text'][:60]}... "
f"(avg: {r['avg_time_sec']}s)")
lines.append("")
lines.append("报告由 AI慢查询分析系统 自动生成")
return"\n".join(lines)
defsend_wechat(content: str):
"""发送到企业微信"""
ifnot WECHAT_WEBHOOK:
logger.warning("未配置企业微信Webhook,跳过推送")
return
# 企业微信消息最大4096字节,超过需要截断
if len(content.encode("utf-8")) > 4000:
content = content[:1500] + "\n\n... (报告过长已截断,完整报告请查看HTML版本)"
payload = {
"msgtype": "text",
"text": {
"content": content
}
}
try:
resp = requests.post(WECHAT_WEBHOOK, json=payload, timeout=10)
resp.raise_for_status()
result = resp.json()
if result.get("errcode") == 0:
logger.info("企业微信推送成功")
else:
logger.error(f"企业微信推送失败: {result}")
except Exception as e:
logger.error(f"企业微信推送异常: {e}")
defsend_dingtalk(content: str):
"""发送到钉钉"""
ifnot DINGTALK_WEBHOOK:
logger.warning("未配置钉钉Webhook,跳过推送")
return
payload = {
"msgtype": "text",
"text": {
"content": content
}
}
try:
resp = requests.post(DINGTALK_WEBHOOK, json=payload, timeout=10)
resp.raise_for_status()
result = resp.json()
if result.get("errcode") == 0:
logger.info("钉钉推送成功")
else:
logger.error(f"钉钉推送失败: {result}")
except Exception as e:
logger.error(f"钉钉推送异常: {e}")
defsend_report(results: list):
"""
格式化并推送报告
"""
ifnot results:
logger.info("没有慢查询需要报告")
return
content = format_report(results)
# 同时打印到控制台(方便调试)
print(content)
# 推送
send_wechat(content)
send_dingtalk(content)
Step 5:本地存储模块(可选但推荐)
用SQLite记录历史数据,可以做趋势对比(”这条SQL上周还是0.5秒,这周变成了3秒”)。
modules/storage.py
"""
本地存储模块
用SQLite记录慢查询历史,支持趋势对比
"""
import sqlite3
import json
import os
import logging
from datetime import datetime
from config.settings import SQLITE_DB_PATH
logger = logging.getLogger(__name__)
definit_db():
"""初始化数据库"""
os.makedirs(os.path.dirname(SQLITE_DB_PATH), exist_ok=True)
conn = sqlite3.connect(SQLITE_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS slow_query_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
report_date TEXT NOT NULL,
db_name TEXT,
digest TEXT,
sql_text TEXT,
exec_count INTEGER,
avg_time_sec REAL,
max_time_sec REAL,
avg_rows_examined INTEGER,
risk_level TEXT,
analysis_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_report_date ON slow_query_history(report_date)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_digest ON slow_query_history(digest)
""")
conn.commit()
conn.close()
logger.info("数据库初始化完成")
defsave_results(results: list):
"""保存分析结果到SQLite"""
init_db()
conn = sqlite3.connect(SQLITE_DB_PATH)
cursor = conn.cursor()
report_date = datetime.now().strftime("%Y-%m-%d")
for r in results:
cursor.execute("""
INSERT INTO slow_query_history
(report_date, db_name, digest, sql_text, exec_count,
avg_time_sec, max_time_sec, avg_rows_examined,
risk_level, analysis_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
report_date,
r.get("db_name"),
r.get("digest"),
r.get("sql_text"),
r.get("exec_count"),
r.get("avg_time_sec"),
r.get("max_time_sec"),
r.get("avg_rows_examined"),
r.get("risk_level"),
json.dumps(r.get("analysis", {}), ensure_ascii=False),
))
conn.commit()
conn.close()
logger.info(f"已保存 {len(results)} 条分析结果")
defget_trend(digest: str, days: int = 7) -> list:
"""
获取某条SQL的历史趋势
用于在报告中展示"这条SQL越来越慢了"
"""
init_db()
conn = sqlite3.connect(SQLITE_DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT report_date, avg_time_sec, exec_count, avg_rows_examined
FROM slow_query_history
WHERE digest = ?
ORDER BY report_date DESC
LIMIT ?
""", (digest, days))
rows = cursor.fetchall()
conn.close()
return [
{
"date": row[0],
"avg_time": row[1],
"exec_count": row[2],
"avg_rows": row[3],
}
for row in rows
]
Step 6:主程序入口
main.py
#!/usr/bin/env python3
"""
MySQL慢查询AI分析系统 - 主入口
Usage:
python main.py # 正常运行:采集→分析→推送
python main.py --dry-run # 试运行:只采集和分析,不推送
python main.py --json # 输出JSON格式结果
"""
import sys
import logging
import argparse
import json
from datetime import datetime
from modules.collector import SlowQueryCollector
from modules.analyzer import analyze_all_queries
from modules.notifier import send_report
from modules.storage import save_results
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(f"logs/run_{datetime.now().strftime('%Y%m%d')}.log"),
]
)
logger = logging.getLogger(__name__)
defmain():
parser = argparse.ArgumentParser(description="MySQL慢查询AI分析系统")
parser.add_argument("--dry-run", action="store_true", help="试运行,不推送消息")
parser.add_argument("--json", action="store_true", help="输出JSON格式结果")
args = parser.parse_args()
import os
os.makedirs("logs", exist_ok=True)
os.makedirs("data", exist_ok=True)
logger.info("=" * 50)
logger.info("MySQL慢查询AI分析系统 启动")
logger.info("=" * 50)
# Step 1: 采集
logger.info("Step 1/4: 采集慢查询...")
collector = SlowQueryCollector()
try:
collector.connect()
queries = collector.collect_slow_queries()
ifnot queries:
logger.info("未发现慢查询,结束运行")
return
# Step 2: 补充表结构和执行计划
logger.info("Step 2/4: 获取表结构和执行计划...")
enriched_queries = collector.enrich_queries(queries)
finally:
collector.close()
# Step 3: AI分析
logger.info("Step 3/4: AI分析中...")
results = analyze_all_queries(enriched_queries)
# 输出JSON(如果指定了--json)
if args.json:
print(json.dumps(results, ensure_ascii=False, indent=2, default=str))
return
# Step 4: 保存 + 推送
logger.info("Step 4/4: 保存结果并推送...")
save_results(results)
if args.dry_run:
logger.info("[DRY RUN] 跳过推送,只打印报告:")
from modules.notifier import format_report
print(format_report(results))
else:
send_report(results)
logger.info("运行完成!")
if __name__ == "__main__":
main()
Step 7:配置定时任务
Crontab方式(最简单)
# 每天早上8:30执行,9:00前团队就能看到报告
30 8 * * * cd /opt/slow-query-analyzer && /usr/bin/python3 main.py >> logs/cron.log 2>&1
Systemd Timer方式(更可靠)
# /etc/systemd/system/slow-query-analyzer.service
[Unit]
Description=MySQL Slow Query AI Analyzer
After=network.target
[Service]
Type=oneshot
User=monitor
WorkingDirectory=/opt/slow-query-analyzer
ExecStart=/usr/bin/python3 main.py
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/slow-query-analyzer.timer
[Unit]
Description=Run slow query analyzer daily
[Timer]
OnCalendar=*-*-* 08:30:00
Persistent=true
[Install]
WantedBy=timers.target
systemctl enable slow-query-analyzer.timer
systemctl start slow-query-analyzer.timer
# 查看下次执行时间
systemctl list-timers | grep slow-query
Step 8:先跑一次试试
cd /opt/slow-query-analyzer
# 第一次运行,用 --dry-run 测试,不发消息
python3 main.py --dry-run
# 确认没问题后,正式运行
python3 main.py
# 如果想看JSON格式(方便调试AI输出)
python3 main.py --json | python3 -m json.tool
完整项目目录
slow-query-analyzer/
├── config/
│ ├── __init__.py
│ └── settings.py # 配置文件
├── modules/
│ ├── __init__.py
│ ├── collector.py # 采集模块
│ ├── analyzer.py # AI分析模块
│ ├── notifier.py # 推送模块
│ └── storage.py # 存储模块
├── data/
│ └── slow_queries.db # SQLite数据库(自动创建)
├── logs/ # 日志目录(自动创建)
├── .env # 环境变量(不提交Git)
├── .gitignore
├── requirements.txt
├── main.py # 主入口
└── README.md
requirements.txt
pymysql>=1.1.0
openai>=1.0.0
requests>=2.31.0
jinja2>=3.1.0
python-dotenv>=1.0.0
.gitignore
.env
data/
logs/
__pycache__/
*.pyc
实际使用效果和踩坑经验
跑了两周后的真实数据:
效果数据
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
踩过的坑
坑1:performance_schema数据是累计的
performance_schema 的数据从MySQL启动后一直累积。如果你的MySQL跑了半年没重启,采集到的 exec_count 是半年的总量,不是今天的。
解决方案:
-- 每天采集前先记录当前值,第二天对比差值
-- 或者在每次采集后重置统计
CALL sys.ps_truncate_all_tables(FALSE);
-- 注意:这会清空所有performance_schema统计数据,
-- 确认不影响其他监控后再执行
更好的方案: 在storage模块里做差值计算,不清空原始数据。
坑2:AI分析有时候会”幻觉”
大模型偶尔会建议创建一个”已经存在的索引”,或者给出语法有误的SQL。
解决方案:
-
在Prompt里强调”请仔细查看已有索引,不要建议重复创建” -
分析结果标注”AI建议仅供参考,执行前请DBA审核” -
可以加一层校验:用Python检查建议的索引是否已存在
坑3:有些SQL太长,超过大模型Token限制
有的ORM生成的SQL几千字符,加上表结构和执行计划可能超Token限制。
解决方案:
# 在collector里加上SQL截断
MAX_SQL_LENGTH = 2000
sql_text = query["sql_text"]
if len(sql_text) > MAX_SQL_LENGTH:
sql_text = sql_text[:MAX_SQL_LENGTH] + "... (已截断)"
坑4:API费用控制
每条慢查询调一次API,如果每天50条,一个月就是1500次调用。
费用参考(2026年价格):
|
|
|
|
|
|---|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
推荐用DeepSeek或Qwen,SQL分析这个场景国产模型完全够用,费用只有GPT的十分之一。
进阶玩法
搭完基础版之后,还可以继续扩展:
1. 趋势告警——SQL在变慢
# 在analyzer.py中增加趋势检测
from modules.storage import get_trend
defcheck_degradation(query: dict) -> str:
"""检查SQL是否在恶化"""
trend = get_trend(query["digest"], days=7)
if len(trend) < 3:
return""
recent_avg = trend[0]["avg_time"]
week_ago_avg = trend[-1]["avg_time"]
if recent_avg > week_ago_avg * 2:
returnf"⚠️ 趋势恶化:该SQL一周内平均耗时从{week_ago_avg}s增长到{recent_avg}s(翻倍),可能是数据量增长导致"
return""
2. 对接Grafana Dashboard
把SQLite的数据接入Grafana,做一个慢查询趋势面板:
-
每日慢查询数量趋势 -
Top 10 慢SQL排行 -
各数据库的慢查询分布 -
优化前后的耗时对比
3. 自动生成Jira工单
把高危慢查询自动创建Jira工单,指派给对应的开发团队:
# 根据数据库名映射到开发团队
DB_OWNER_MAP = {
"order_db": "订单团队",
"user_db": "用户团队",
"payment_db": "支付团队",
}
4. 接入多个MySQL实例
真实环境不会只有一个MySQL。改造方法很简单——配置文件里写多个MySQL连接,循环采集:
MYSQL_INSTANCES = [
{"name": "主库-订单", "host": "10.0.1.100", "port": 3306},
{"name": "主库-用户", "host": "10.0.1.101", "port": 3306},
{"name": "主库-支付", "host": "10.0.1.102", "port": 3306},
]
最后
这套系统的本质是什么?
把DBA的专业经验,通过AI”平民化”了。
以前分析慢查询是DBA的专属技能——要看执行计划、要懂索引设计、要权衡各种方案。普通开发和运维做不来。
现在AI可以做到DBA 80%的分析工作。DBA从”每条SQL都要我看”变成了”我只审核AI的建议”,效率提升了10倍。
这就是AI时代运维的工作方式——不是取代人,而是让每个人都拥有专家级的分析能力。
这套系统的代码不复杂(核心不到500行),但它解决的是一个真实的、高频的、痛点明确的问题。如果你想在简历上加一个”AI+运维”的项目经验,这个就很合适。
觉得有用?分享给更多运维同行看到 👇
转发给你身边还在手动分析慢查询的DBA和运维同事,这篇能帮他们省掉每周好几个小时的重复劳动。
我是「运维AI进化论」,一个用AI武装自己的运维工程师。分享运维+AI实战干货。关注我,一起进化 🚀
夜雨聆风