乐于分享
好东西不私藏

从0到1:我用AI搭了一套MySQL慢查询自动分析系统

从0到1:我用AI搭了一套MySQL慢查询自动分析系统


上周五下午,DBA在群里发了一条消息:

“谁负责的 order-service?你们有条SQL跑了47秒,把主库CPU打到了92%,差点把整个库拖挂。”

然后甩了一条200多字符的SQL过来。

开发一脸懵:”这SQL线上跑了半年了,之前没问题啊?”

DBA:”数据量涨了,之前10万行现在200万行,全表扫描能不慢吗?”

然后就是熟悉的流程——开发改SQL、DBA审核、测试、上线,折腾了两天。

问题是:这条慢查询其实一周前就出现了,只是没人注意slow log里的告警。等到CPU飙升才发现,已经是事故了。

这种场景每个月都在重复。于是我决定搞一套自动化的慢查询分析系统

  • 自动采集MySQL慢查询日志
  • 用AI分析每条慢SQL的问题和优化方案
  • 自动生成优化报告推送到企业微信/钉钉
  • 开发自己就能看懂,不用每次都找DBA

从零开始搭,一共花了两个周末。 今天把完整过程写出来,你照着做就行。


先看最终效果

每天早上9点,企业微信群自动收到一条消息:

📊 MySQL慢查询日报 — 年-月-日

⚠️ 发现 12 条慢查询,其中 3 条高危

━━━━━━━━━━━━━━━━━━━━━━━

🔴 高危 #1
库名:order_db
SQL摘要:SELECT * FROM orders WHERE user_id = ? AND status = ?
执行次数:1,283次/天
平均耗时:3.2秒
最大耗时:47秒
扫描行数:2,180,000(全表扫描)

AI诊断:
  问题:user_id + status 组合查询缺少联合索引,导致全表扫描
  影响:高峰期可能导致主库CPU飙升,影响所有依赖order_db的服务
  建议:
    ALTER TABLE orders ADD INDEX idx_user_status (user_id, status);
  预期效果:扫描行数从218万降至约50行,查询耗时<10ms
  风险评估:该表当前218万行,加索引预计耗时约15秒,建议在低峰期执行

🔴 高危 #2
...

🟡 中危(2条)
...

🟢 低危(7条)
[点击查看完整报告]

━━━━━━━━━━━━━━━━━━━━━━━
报告由 AI慢查询分析系统 自动生成

开发看到这个消息,自己就知道该怎么改了。DBA只需要审核一下AI给的加索引建议是否合理,不用再手动分析每一条慢SQL。

目标效果看完了,下面开始搭。


整体架构

┌─────────────┐     ┌──────────────┐     ┌──────────────┐     ┌────────────┐
│  MySQL      │────▶│  采集模块     │────▶│  AI分析模块   │────▶│  推送模块   │
│  Slow Log   │     │  (Python)    │     │  (LLM API)   │     │ (企微/钉钉) │
└─────────────┘     └──────────────┘     └──────────────┘     └────────────┘
                           │                    │
                           ▼                    ▼
                    ┌──────────────┐     ┌──────────────┐
                    │  SQLite/     │     │  HTML报告     │
                    │  本地存储     │     │  (可选)       │
                    └──────────────┘     └──────────────┘

四个模块:

  1. 采集模块 —— 从MySQL拉取慢查询日志,做去重和聚合
  2. AI分析模块 —— 把慢SQL + 表结构 + 执行计划喂给大模型分析
  3. 推送模块 —— 把分析结果格式化后推送到企业微信/钉钉
  4. 存储 —— 历史记录存SQLite,方便趋势对比

技术栈: Python 3.10+ / 任意LLM API(OpenAI兼容接口、Claude API、或国产大模型都行)/ SQLite / 企业微信Webhook

不需要额外的中间件,一台普通Linux服务器就能跑。


Step 1:确保MySQL开启了慢查询日志

先确认你的MySQL开启了慢查询日志:

-- 查看慢查询日志状态
SHOWVARIABLESLIKE'slow_query%';
SHOWVARIABLESLIKE'long_query_time';

如果没有开启:

-- 动态开启(不需要重启MySQL)
SETGLOBAL slow_query_log = 'ON';
SETGLOBAL long_query_time = 1;  -- 超过1秒的SQL记录
SETGLOBAL log_queries_not_using_indexes = ON;  -- 没用索引的也记录

-- 要持久化的话,写到 my.cnf
-- [mysqld]
-- slow_query_log = 1
-- slow_query_log_file = /var/log/mysql/slow.log
-- long_query_time = 1
-- log_queries_not_using_indexes = 1

另一种方式:从 performance_schema 采集(推荐,不用解析日志文件)

-- MySQL 5.7+ 支持
-- 从 performance_schema.events_statements_summary_by_digest 获取慢查询摘要
SELECT
    SCHEMA_NAME as db_name,
    DIGEST_TEXT as sql_text,
    COUNT_STAR as exec_count,
ROUND(AVG_TIMER_WAIT/10000000000003as avg_time_sec,
ROUND(MAX_TIMER_WAIT/10000000000003as max_time_sec,
    SUM_ROWS_EXAMINED as total_rows_examined,
    SUM_ROWS_SENT as total_rows_sent,
    FIRST_SEEN as first_seen,
    LAST_SEEN as last_seen
FROM performance_schema.events_statements_summary_by_digest
WHERE AVG_TIMER_WAIT > 1000000000000-- 平均耗时 > 1秒
AND SCHEMA_NAME ISNOTNULL
AND DIGEST_TEXT NOTLIKE'COMMIT%'
AND DIGEST_TEXT NOTLIKE'SET%'
ORDERBY AVG_TIMER_WAIT DESC
LIMIT50;

我们用 performance_schema 方案,更干净。


Step 2:采集模块

创建项目目录:

mkdir -p slow-query-analyzer/{config,modules,reports,templates}
cd slow-query-analyzer

先装依赖:

pip install pymysql openai requests jinja2 python-dotenv

配置文件 config/settings.py

"""
慢查询分析系统配置
"""

import os
from dotenv import load_dotenv

load_dotenv()

# ========== MySQL 配置 ==========
MYSQL_CONFIG = {
"host": os.getenv("MYSQL_HOST""127.0.0.1"),
"port": int(os.getenv("MYSQL_PORT"3306)),
"user": os.getenv("MYSQL_USER""monitor"),
"password": os.getenv("MYSQL_PASSWORD"""),
"charset""utf8mb4",
}

# 要监控的数据库列表(空列表表示监控所有)
MONITOR_DATABASES = os.getenv("MONITOR_DATABASES""").split(","if os.getenv("MONITOR_DATABASES"else []

# 慢查询阈值(秒)
SLOW_QUERY_THRESHOLD = float(os.getenv("SLOW_QUERY_THRESHOLD""1.0"))

# 每次最多分析的SQL数量
MAX_QUERIES_PER_RUN = int(os.getenv("MAX_QUERIES_PER_RUN""20"))

# ========== AI 配置 ==========
AI_CONFIG = {
"api_key": os.getenv("AI_API_KEY"""),
"base_url": os.getenv("AI_BASE_URL""https://api.openai.com/v1"),  # 兼容OpenAI接口
"model": os.getenv("AI_MODEL""gpt-4o"),  # 可换成 claude/qwen/deepseek
"max_tokens"2000,
"temperature"0.1,  # 分析型任务用低temperature
}

# ========== 推送配置 ==========
# 企业微信 Webhook
WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK""")

# 钉钉 Webhook(二选一或都配)
DINGTALK_WEBHOOK = os.getenv("DINGTALK_WEBHOOK""")

# ========== 存储配置 ==========
SQLITE_DB_PATH = os.getenv("SQLITE_DB_PATH""data/slow_queries.db")

# ========== 风险等级定义 ==========
RISK_LEVELS = {
"high": {
"avg_time_threshold"5.0,       # 平均耗时>5秒
"scan_rows_threshold"500000,    # 扫描行数>50万
"exec_count_threshold"100,      # 日执行次数>100
"emoji""🔴",
"label""高危"
    },
"medium": {
"avg_time_threshold"2.0,
"scan_rows_threshold"100000,
"exec_count_threshold"50,
"emoji""🟡",
"label""中危"
    },
"low": {
"avg_time_threshold"1.0,
"scan_rows_threshold"10000,
"exec_count_threshold"10,
"emoji""🟢",
"label""低危"
    }
}

环境变量文件 .env

# MySQL
MYSQL_HOST=10.0.1.100
MYSQL_PORT=3306
MYSQL_USER=monitor
MYSQL_PASSWORD=your_password_here

# 监控的数据库(逗号分隔,留空表示全部)
MONITOR_DATABASES=order_db,user_db,payment_db

# AI API(以下示例用DeepSeek,便宜好用)
AI_API_KEY=sk-xxxxxxxxxxxxx
AI_BASE_URL=https://api.deepseek.com/v1
AI_MODEL=deepseek-chat

# 企业微信 Webhook
WECHAT_WEBHOOK=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxx

# 慢查询阈值
SLOW_QUERY_THRESHOLD=1.0
MAX_QUERIES_PER_RUN=20

安全提醒:.env 文件不要提交到Git,在 .gitignore 里加上。

采集模块 modules/collector.py

"""
慢查询采集模块
从 performance_schema 采集慢查询,同时获取表结构和执行计划
"""

import pymysql
import logging
from config.settings import MYSQL_CONFIG, SLOW_QUERY_THRESHOLD, MAX_QUERIES_PER_RUN, MONITOR_DATABASES

logger = logging.getLogger(__name__)


classSlowQueryCollector:
def__init__(self):
        self.conn = None

defconnect(self):
"""建立MySQL连接"""
try:
            self.conn = pymysql.connect(
                **MYSQL_CONFIG,
                cursorclass=pymysql.cursors.DictCursor,
                connect_timeout=10,
                read_timeout=30,
            )
            logger.info("MySQL连接成功")
except Exception as e:
            logger.error(f"MySQL连接失败: {e}")
raise

defclose(self):
"""关闭连接"""
if self.conn:
            self.conn.close()

defcollect_slow_queries(self) -> list:
"""
        从 performance_schema 采集慢查询摘要
        返回慢查询列表
        """

        threshold_ps = SLOW_QUERY_THRESHOLD * 1000000000000# 转为皮秒

        sql = """
        SELECT
            SCHEMA_NAME as db_name,
            DIGEST as digest,
            DIGEST_TEXT as sql_text,
            COUNT_STAR as exec_count,
            ROUND(AVG_TIMER_WAIT/1000000000000, 3) as avg_time_sec,
            ROUND(MAX_TIMER_WAIT/1000000000000, 3) as max_time_sec,
            ROUND(SUM_TIMER_WAIT/1000000000000, 3) as total_time_sec,
            SUM_ROWS_EXAMINED as total_rows_examined,
            SUM_ROWS_SENT as total_rows_sent,
            ROUND(SUM_ROWS_EXAMINED/NULLIF(COUNT_STAR,0), 0) as avg_rows_examined,
            ROUND(SUM_ROWS_SENT/NULLIF(COUNT_STAR,0), 0) as avg_rows_sent,
            FIRST_SEEN as first_seen,
            LAST_SEEN as last_seen
        FROM performance_schema.events_statements_summary_by_digest
        WHERE AVG_TIMER_WAIT > %s
          AND SCHEMA_NAME IS NOT NULL
          AND DIGEST_TEXT NOT LIKE 'COMMIT%%'
          AND DIGEST_TEXT NOT LIKE 'ROLLBACK%%'
          AND DIGEST_TEXT NOT LIKE 'SET %%'
          AND DIGEST_TEXT NOT LIKE 'SHOW %%'
          AND DIGEST_TEXT NOT LIKE 'SELECT @@%%'
        ORDER BY AVG_TIMER_WAIT DESC
        LIMIT %s
        """


with self.conn.cursor() as cursor:
            cursor.execute(sql, (threshold_ps, MAX_QUERIES_PER_RUN))
            queries = cursor.fetchall()

# 如果配置了指定数据库,过滤
if MONITOR_DATABASES:
            queries = [q for q in queries if q["db_name"in MONITOR_DATABASES]

        logger.info(f"采集到 {len(queries)} 条慢查询")
return queries

defget_table_schema(self, db_name: str, sql_text: str) -> str:
"""
        根据SQL中出现的表名,获取表结构
        简单实现:用正则提取FROM/JOIN后面的表名
        """

import re

# 提取表名(简单正则,覆盖常见场景)
        table_pattern = r'(?:FROM|JOIN|UPDATE|INTO)\s+`?(\w+)`?'
        tables = list(set(re.findall(table_pattern, sql_text, re.IGNORECASE)))

ifnot tables:
return"无法解析表名"

        schemas = []
for table in tables[:5]:  # 最多取5个表,防止太长
try:
with self.conn.cursor() as cursor:
                    cursor.execute(f"USE `{db_name}`")
                    cursor.execute(f"SHOW CREATE TABLE `{table}`")
                    result = cursor.fetchone()
if result:
                        schemas.append(result.get("Create Table"""))
except Exception as e:
                schemas.append(f"-- 获取表 {table} 结构失败: {e}")

return"\n\n".join(schemas)

defget_explain_result(self, db_name: str, sql_text: str) -> str:
"""
        对SQL执行EXPLAIN,获取执行计划
        注意:performance_schema中的SQL是参数化的(用?占位),
        EXPLAIN不能直接执行,这里尝试简单替换
        """

try:
# 将 ? 替换为 1(简单处理,用于获取大致的执行计划)
            explain_sql = sql_text.replace("?""1")

# 去掉末尾的 LIMIT(EXPLAIN不需要)
# explain_sql = re.sub(r'LIMIT\s+\d+', '', explain_sql, flags=re.IGNORECASE)

with self.conn.cursor() as cursor:
                cursor.execute(f"USE `{db_name}`")
                cursor.execute(f"EXPLAIN {explain_sql}")
                rows = cursor.fetchall()

ifnot rows:
return"EXPLAIN无结果"

# 格式化为可读的文本
            lines = []
for row in rows:
                line = (
f"  table: {row.get('table''N/A')}, "
f"type: {row.get('type''N/A')}, "
f"possible_keys: {row.get('possible_keys''N/A')}, "
f"key: {row.get('key''N/A')}, "
f"rows: {row.get('rows''N/A')}, "
f"Extra: {row.get('Extra''N/A')}"
                )
                lines.append(line)

return"\n".join(lines)

except Exception as e:
returnf"EXPLAIN执行失败: {e}"

defenrich_queries(self, queries: list) -> list:
"""
        为每条慢查询补充表结构和执行计划
        """

        enriched = []
for q in queries:
            db_name = q["db_name"]
            sql_text = q["sql_text"]

            q["table_schema"] = self.get_table_schema(db_name, sql_text)
            q["explain_result"] = self.get_explain_result(db_name, sql_text)

            enriched.append(q)
            logger.info(f"已补充信息: {db_name} - {sql_text[:60]}...")

return enriched

Step 3:AI分析模块

这是核心模块——把慢查询的SQL + 表结构 + 执行计划 + 统计数据一起喂给大模型分析。

modules/analyzer.py

"""
AI分析模块
用大模型分析每条慢查询,给出诊断和优化建议
"""

import json
import logging
from openai import OpenAI
from config.settings import AI_CONFIG, RISK_LEVELS

logger = logging.getLogger(__name__)

# 初始化AI客户端(OpenAI兼容接口)
client = OpenAI(
    api_key=AI_CONFIG["api_key"],
    base_url=AI_CONFIG["base_url"],
)

# 系统提示词
SYSTEM_PROMPT = """你是一位资深MySQL DBA,擅长SQL性能优化和索引设计。
你需要分析慢查询SQL,给出专业的诊断和优化建议。

分析原则:
1. 先看执行计划(EXPLAIN),判断是否全表扫描、索引使用情况
2. 再看表结构,判断索引设计是否合理
3. 综合执行频率和影响面给出风险评估
4. 优化建议要具体可执行,直接给出ALTER TABLE或改写后的SQL
5. 要考虑加索引对写入性能的影响
6. 如果SQL本身写法有问题(如SELECT *、子查询可改JOIN等),也要指出

输出必须是严格的JSON格式(不要包含markdown代码块标记),包含以下字段:
{
    "diagnosis": "问题诊断(一句话说清楚根因)",
    "impact": "影响分析(这个慢查询会造成什么后果)",
    "suggestions": [
        {
            "type": "add_index | rewrite_sql | adjust_config | other",
            "description": "优化建议描述",
            "sql": "具体的SQL语句(如ALTER TABLE或改写后的查询)",
            "expected_improvement": "预期改善效果"
        }
    ],
    "risk_of_fix": "执行优化操作的风险提示(如加索引锁表时间)",
    "priority": "high | medium | low"
}
"""



defanalyze_single_query(query: dict) -> dict:
"""
    用AI分析单条慢查询
    """

# 构造用户消息
    user_message = f"""请分析以下MySQL慢查询:

## SQL语句
```sql
{query['sql_text']}

统计信息

  • 数据库:{query[‘db_name’]}
  • 日均执行次数:{query[‘exec_count’]}
  • 平均耗时:{query[‘avg_time_sec’]}秒
  • 最大耗时:{query[‘max_time_sec’]}秒
  • 平均扫描行数:{query.get(‘avg_rows_examined’, ‘N/A’)}
  • 平均返回行数:{query.get(‘avg_rows_sent’, ‘N/A’)}
  • 首次出现:{query.get(‘first_seen’, ‘N/A’)}

表结构

{query.get('table_schema', '无法获取')}

EXPLAIN执行计划

{query.get('explain_result''无法获取')}

请按要求的JSON格式输出分析结果。

try:
    response = client.chat.completions.create(
        model=AI_CONFIG["model"],
        messages=[
            {"role""system""content": SYSTEM_PROMPT},
            {"role""user""content": user_message},
        ],
        max_tokens=AI_CONFIG["max_tokens"],
        temperature=AI_CONFIG["temperature"],
    )

    content = response.choices[0].message.content.strip()

# 清理可能的markdown代码块标记
if content.startswith("```"):
        content = content.split("\n", 1)[1]  # 去掉第一行
if content.endswith("```"):
        content = content.rsplit("```", 1)[0]  # 去掉最后的```
    content = content.strip()

    analysis = json.loads(content)
    logger.info(f"AI分析完成: {query['sql_text'][:50]}...")
return analysis

except json.JSONDecodeError as e:
    logger.error(f"AI返回JSON解析失败: {e}\n原始内容: {content}")
return {
"diagnosis""AI分析结果解析失败,请人工检查",
"impact""未知",
"suggestions": [],
"risk_of_fix""未知",
"priority""medium"
    }
except Exception as e:
    logger.error(f"AI分析异常: {e}")
return {
"diagnosis": f"AI分析异常: {str(e)}",
"impact""未知",
"suggestions": [],
"risk_of_fix""未知",
"priority""medium"
    }
def classify_risk(query: dict) -> str:
"""
    根据规则对慢查询进行风险分级
    AI给出的priority作为参考,结合硬指标做最终判断
    "
""
    avg_time = float(query.get("avg_time_sec", 0))
    avg_rows = int(query.get("avg_rows_examined", 0))
    exec_count = int(query.get("exec_count", 0))

for level in ["high""medium""low"]:
        thresholds = RISK_LEVELS[level]
if (avg_time >= thresholds["avg_time_threshold"]
                or avg_rows >= thresholds["scan_rows_threshold"]
                or (avg_time >= thresholds["avg_time_threshold"] * 0.8
                    and exec_count >= thresholds["exec_count_threshold"])):
return level

return"low"


def analyze_all_queries(queries: list) -> list:
"""
    分析所有慢查询,返回分析结果列表
    "
""
    results = []

for query in queries:
# AI分析
        analysis = analyze_single_query(query)

# 风险分级
        risk_level = classify_risk(query)

# 如果AI给的priority更高,用AI的
        ai_priority = analysis.get("priority""low")
        priority_rank = {"high": 3, "medium": 2, "low": 1}
        final_risk = risk_level if priority_rank.get(risk_level, 0) >= priority_rank.get(ai_priority, 0) else ai_priority

        result = {
            **query,
"analysis": analysis,
"risk_level": final_risk,
"risk_emoji": RISK_LEVELS.get(final_risk, RISK_LEVELS["low"])["emoji"],
"risk_label": RISK_LEVELS.get(final_risk, RISK_LEVELS["low"])["label"],
        }
        results.append(result)

# 按风险等级排序(高危在前)
    results.sort(key=lambda x: priority_rank.get(x["risk_level"], 0), reverse=True)

    logger.info(f"分析完成,共 {len(results)} 条,"
                f"高危 {sum(1 for r in results if r['risk_level'] == 'high')} 条,"
                f"中危 {sum(1 for r in results if r['risk_level'] == 'medium')} 条,"
                f"低危 {sum(1 for r in results if r['risk_level'] == 'low')} 条")

return results

Step 4:推送模块

modules/notifier.py

"""
推送模块
支持企业微信 / 钉钉 Webhook
"""

import requests
import logging
from datetime import datetime
from config.settings import WECHAT_WEBHOOK, DINGTALK_WEBHOOK

logger = logging.getLogger(__name__)


defformat_report(results: list) -> str:
"""
    把分析结果格式化为可读的文本报告
    """

    now = datetime.now().strftime("%Y-%m-%d")

    total = len(results)
    high_count = sum(1for r in results if r["risk_level"] == "high")
    medium_count = sum(1for r in results if r["risk_level"] == "medium")
    low_count = sum(1for r in results if r["risk_level"] == "low")

    lines = []
    lines.append(f"📊 MySQL慢查询日报 — {now}")
    lines.append("")

if high_count > 0:
        lines.append(f"⚠️ 发现 {total} 条慢查询,其中 {high_count} 条高危")
else:
        lines.append(f"✅ 发现 {total} 条慢查询,无高危")

    lines.append("━" * 30)

# 详细列出高危和中危
    detail_count = 0
for r in results:
if r["risk_level"in ("high""medium"):
            detail_count += 1
            analysis = r.get("analysis", {})

            lines.append("")
            lines.append(f"{r['risk_emoji']}{r['risk_label']} #{detail_count}")
            lines.append(f"库名:{r['db_name']}")

# SQL摘要(截断到120字符)
            sql_summary = r["sql_text"][:120]
if len(r["sql_text"]) > 120:
                sql_summary += "..."
            lines.append(f"SQL:{sql_summary}")

            lines.append(f"执行次数:{r['exec_count']}次")
            lines.append(f"平均耗时:{r['avg_time_sec']}秒")
            lines.append(f"最大耗时:{r['max_time_sec']}秒")
            lines.append(f"平均扫描行数:{r.get('avg_rows_examined''N/A')}")

            lines.append("")
            lines.append(f"AI诊断:{analysis.get('diagnosis''无')}")

            suggestions = analysis.get("suggestions", [])
if suggestions:
                lines.append("优化建议:")
for i, s in enumerate(suggestions, 1):
                    lines.append(f"  {i}{s.get('description''')}")
if s.get("sql"):
                        lines.append(f"     {s['sql']}")
if s.get("expected_improvement"):
                        lines.append(f"     预期效果:{s['expected_improvement']}")

            risk_of_fix = analysis.get("risk_of_fix""")
if risk_of_fix:
                lines.append(f"⚠️ 执行风险:{risk_of_fix}")

            lines.append("━" * 30)

# 低危只列摘要
if low_count > 0:
        lines.append("")
        lines.append(f"🟢 低危({low_count}条):")
for r in results:
if r["risk_level"] == "low":
                lines.append(f"  · [{r['db_name']}{r['sql_text'][:60]}... "
f"(avg: {r['avg_time_sec']}s)")

    lines.append("")
    lines.append("报告由 AI慢查询分析系统 自动生成")

return"\n".join(lines)


defsend_wechat(content: str):
"""发送到企业微信"""
ifnot WECHAT_WEBHOOK:
        logger.warning("未配置企业微信Webhook,跳过推送")
return

# 企业微信消息最大4096字节,超过需要截断
if len(content.encode("utf-8")) > 4000:
        content = content[:1500] + "\n\n... (报告过长已截断,完整报告请查看HTML版本)"

    payload = {
"msgtype""text",
"text": {
"content": content
        }
    }

try:
        resp = requests.post(WECHAT_WEBHOOK, json=payload, timeout=10)
        resp.raise_for_status()
        result = resp.json()
if result.get("errcode") == 0:
            logger.info("企业微信推送成功")
else:
            logger.error(f"企业微信推送失败: {result}")
except Exception as e:
        logger.error(f"企业微信推送异常: {e}")


defsend_dingtalk(content: str):
"""发送到钉钉"""
ifnot DINGTALK_WEBHOOK:
        logger.warning("未配置钉钉Webhook,跳过推送")
return

    payload = {
"msgtype""text",
"text": {
"content": content
        }
    }

try:
        resp = requests.post(DINGTALK_WEBHOOK, json=payload, timeout=10)
        resp.raise_for_status()
        result = resp.json()
if result.get("errcode") == 0:
            logger.info("钉钉推送成功")
else:
            logger.error(f"钉钉推送失败: {result}")
except Exception as e:
        logger.error(f"钉钉推送异常: {e}")


defsend_report(results: list):
"""
    格式化并推送报告
    """

ifnot results:
        logger.info("没有慢查询需要报告")
return

    content = format_report(results)

# 同时打印到控制台(方便调试)
    print(content)

# 推送
    send_wechat(content)
    send_dingtalk(content)

Step 5:本地存储模块(可选但推荐)

用SQLite记录历史数据,可以做趋势对比(”这条SQL上周还是0.5秒,这周变成了3秒”)。

modules/storage.py

"""
本地存储模块
用SQLite记录慢查询历史,支持趋势对比
"""

import sqlite3
import json
import os
import logging
from datetime import datetime
from config.settings import SQLITE_DB_PATH

logger = logging.getLogger(__name__)


definit_db():
"""初始化数据库"""
    os.makedirs(os.path.dirname(SQLITE_DB_PATH), exist_ok=True)

    conn = sqlite3.connect(SQLITE_DB_PATH)
    cursor = conn.cursor()

    cursor.execute("""
    CREATE TABLE IF NOT EXISTS slow_query_history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        report_date TEXT NOT NULL,
        db_name TEXT,
        digest TEXT,
        sql_text TEXT,
        exec_count INTEGER,
        avg_time_sec REAL,
        max_time_sec REAL,
        avg_rows_examined INTEGER,
        risk_level TEXT,
        analysis_json TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """
)

    cursor.execute("""
    CREATE INDEX IF NOT EXISTS idx_report_date ON slow_query_history(report_date)
    """
)

    cursor.execute("""
    CREATE INDEX IF NOT EXISTS idx_digest ON slow_query_history(digest)
    """
)

    conn.commit()
    conn.close()
    logger.info("数据库初始化完成")


defsave_results(results: list):
"""保存分析结果到SQLite"""
    init_db()

    conn = sqlite3.connect(SQLITE_DB_PATH)
    cursor = conn.cursor()

    report_date = datetime.now().strftime("%Y-%m-%d")

for r in results:
        cursor.execute("""
        INSERT INTO slow_query_history
        (report_date, db_name, digest, sql_text, exec_count,
         avg_time_sec, max_time_sec, avg_rows_examined,
         risk_level, analysis_json)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
, (
            report_date,
            r.get("db_name"),
            r.get("digest"),
            r.get("sql_text"),
            r.get("exec_count"),
            r.get("avg_time_sec"),
            r.get("max_time_sec"),
            r.get("avg_rows_examined"),
            r.get("risk_level"),
            json.dumps(r.get("analysis", {}), ensure_ascii=False),
        ))

    conn.commit()
    conn.close()
    logger.info(f"已保存 {len(results)} 条分析结果")


defget_trend(digest: str, days: int = 7) -> list:
"""
    获取某条SQL的历史趋势
    用于在报告中展示"这条SQL越来越慢了"
    """

    init_db()

    conn = sqlite3.connect(SQLITE_DB_PATH)
    cursor = conn.cursor()

    cursor.execute("""
    SELECT report_date, avg_time_sec, exec_count, avg_rows_examined
    FROM slow_query_history
    WHERE digest = ?
    ORDER BY report_date DESC
    LIMIT ?
    """
, (digest, days))

    rows = cursor.fetchall()
    conn.close()

return [
        {
"date": row[0],
"avg_time": row[1],
"exec_count": row[2],
"avg_rows": row[3],
        }
for row in rows
    ]

Step 6:主程序入口

main.py

#!/usr/bin/env python3
"""
MySQL慢查询AI分析系统 - 主入口
Usage:
    python main.py              # 正常运行:采集→分析→推送
    python main.py --dry-run    # 试运行:只采集和分析,不推送
    python main.py --json       # 输出JSON格式结果
"""

import sys
import logging
import argparse
import json
from datetime import datetime

from modules.collector import SlowQueryCollector
from modules.analyzer import analyze_all_queries
from modules.notifier import send_report
from modules.storage import save_results

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(f"logs/run_{datetime.now().strftime('%Y%m%d')}.log"),
    ]
)
logger = logging.getLogger(__name__)


defmain():
    parser = argparse.ArgumentParser(description="MySQL慢查询AI分析系统")
    parser.add_argument("--dry-run", action="store_true", help="试运行,不推送消息")
    parser.add_argument("--json", action="store_true", help="输出JSON格式结果")
    args = parser.parse_args()

import os
    os.makedirs("logs", exist_ok=True)
    os.makedirs("data", exist_ok=True)

    logger.info("=" * 50)
    logger.info("MySQL慢查询AI分析系统 启动")
    logger.info("=" * 50)

# Step 1: 采集
    logger.info("Step 1/4: 采集慢查询...")
    collector = SlowQueryCollector()
try:
        collector.connect()
        queries = collector.collect_slow_queries()

ifnot queries:
            logger.info("未发现慢查询,结束运行")
return

# Step 2: 补充表结构和执行计划
        logger.info("Step 2/4: 获取表结构和执行计划...")
        enriched_queries = collector.enrich_queries(queries)

finally:
        collector.close()

# Step 3: AI分析
    logger.info("Step 3/4: AI分析中...")
    results = analyze_all_queries(enriched_queries)

# 输出JSON(如果指定了--json)
if args.json:
        print(json.dumps(results, ensure_ascii=False, indent=2, default=str))
return

# Step 4: 保存 + 推送
    logger.info("Step 4/4: 保存结果并推送...")
    save_results(results)

if args.dry_run:
        logger.info("[DRY RUN] 跳过推送,只打印报告:")
from modules.notifier import format_report
        print(format_report(results))
else:
        send_report(results)

    logger.info("运行完成!")


if __name__ == "__main__":
    main()

Step 7:配置定时任务

Crontab方式(最简单)

# 每天早上8:30执行,9:00前团队就能看到报告
30 8 * * * cd /opt/slow-query-analyzer && /usr/bin/python3 main.py >> logs/cron.log 2>&1

Systemd Timer方式(更可靠)

# /etc/systemd/system/slow-query-analyzer.service
[Unit]
Description=MySQL Slow Query AI Analyzer
After=network.target

[Service]
Type=oneshot
User=monitor
WorkingDirectory=/opt/slow-query-analyzer
ExecStart=/usr/bin/python3 main.py
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
# /etc/systemd/system/slow-query-analyzer.timer
[Unit]
Description=Run slow query analyzer daily

[Timer]
OnCalendar=*-*-* 08:30:00
Persistent=true

[Install]
WantedBy=timers.target
systemctl enable slow-query-analyzer.timer
systemctl start slow-query-analyzer.timer

# 查看下次执行时间
systemctl list-timers | grep slow-query

Step 8:先跑一次试试

cd /opt/slow-query-analyzer

# 第一次运行,用 --dry-run 测试,不发消息
python3 main.py --dry-run

# 确认没问题后,正式运行
python3 main.py

# 如果想看JSON格式(方便调试AI输出)
python3 main.py --json | python3 -m json.tool

完整项目目录

slow-query-analyzer/
├── config/
│   ├── __init__.py
│   └── settings.py          # 配置文件
├── modules/
│   ├── __init__.py
│   ├── collector.py          # 采集模块
│   ├── analyzer.py           # AI分析模块
│   ├── notifier.py           # 推送模块
│   └── storage.py            # 存储模块
├── data/
│   └── slow_queries.db       # SQLite数据库(自动创建)
├── logs/                     # 日志目录(自动创建)
├── .env                      # 环境变量(不提交Git)
├── .gitignore
├── requirements.txt
├── main.py                   # 主入口
└── README.md

requirements.txt

pymysql>=1.1.0
openai>=1.0.0
requests>=2.31.0
jinja2>=3.1.0
python-dotenv>=1.0.0

.gitignore

.env
data/
logs/
__pycache__/
*.pyc

实际使用效果和踩坑经验

跑了两周后的真实数据:

效果数据

指标
之前
之后
慢查询发现时间
出事后才发现(小时级)
每天早上主动推送(分钟级)
慢查询处理周期
3-5天(找DBA→分析→给方案→开发改)
1天(开发看报告自己改,DBA只需审核)
每月因慢查询导致的故障
2-3次
0次(连续两个月)
DBA分析慢查询的时间投入
每周4-5小时
每周30分钟(审核AI建议)

踩过的坑

坑1:performance_schema数据是累计的

performance_schema 的数据从MySQL启动后一直累积。如果你的MySQL跑了半年没重启,采集到的 exec_count 是半年的总量,不是今天的。

解决方案:

-- 每天采集前先记录当前值,第二天对比差值
-- 或者在每次采集后重置统计
CALL sys.ps_truncate_all_tables(FALSE);
-- 注意:这会清空所有performance_schema统计数据,
-- 确认不影响其他监控后再执行

更好的方案: 在storage模块里做差值计算,不清空原始数据。

坑2:AI分析有时候会”幻觉”

大模型偶尔会建议创建一个”已经存在的索引”,或者给出语法有误的SQL。

解决方案:

  • 在Prompt里强调”请仔细查看已有索引,不要建议重复创建”
  • 分析结果标注”AI建议仅供参考,执行前请DBA审核”
  • 可以加一层校验:用Python检查建议的索引是否已存在

坑3:有些SQL太长,超过大模型Token限制

有的ORM生成的SQL几千字符,加上表结构和执行计划可能超Token限制。

解决方案:

# 在collector里加上SQL截断
MAX_SQL_LENGTH = 2000

sql_text = query["sql_text"]
if len(sql_text) > MAX_SQL_LENGTH:
    sql_text = sql_text[:MAX_SQL_LENGTH] + "... (已截断)"

坑4:API费用控制

每条慢查询调一次API,如果每天50条,一个月就是1500次调用。

费用参考(2026年价格):

模型
每条分析约Token
每条约费用
每月50条/天
GPT-4o
~3000 tokens
¥0.15
¥225/月
DeepSeek-V3
~3000 tokens
¥0.01
¥15/月
Qwen-Plus
~3000 tokens
¥0.02
¥30/月

推荐用DeepSeek或Qwen,SQL分析这个场景国产模型完全够用,费用只有GPT的十分之一。


进阶玩法

搭完基础版之后,还可以继续扩展:

1. 趋势告警——SQL在变慢

# 在analyzer.py中增加趋势检测
from modules.storage import get_trend

defcheck_degradation(query: dict) -> str:
"""检查SQL是否在恶化"""
    trend = get_trend(query["digest"], days=7)
if len(trend) < 3:
return""

    recent_avg = trend[0]["avg_time"]
    week_ago_avg = trend[-1]["avg_time"]

if recent_avg > week_ago_avg * 2:
returnf"⚠️ 趋势恶化:该SQL一周内平均耗时从{week_ago_avg}s增长到{recent_avg}s(翻倍),可能是数据量增长导致"
return""

2. 对接Grafana Dashboard

把SQLite的数据接入Grafana,做一个慢查询趋势面板:

  • 每日慢查询数量趋势
  • Top 10 慢SQL排行
  • 各数据库的慢查询分布
  • 优化前后的耗时对比

3. 自动生成Jira工单

把高危慢查询自动创建Jira工单,指派给对应的开发团队:

# 根据数据库名映射到开发团队
DB_OWNER_MAP = {
"order_db""订单团队",
"user_db""用户团队",
"payment_db""支付团队",
}

4. 接入多个MySQL实例

真实环境不会只有一个MySQL。改造方法很简单——配置文件里写多个MySQL连接,循环采集:

MYSQL_INSTANCES = [
    {"name""主库-订单""host""10.0.1.100""port"3306},
    {"name""主库-用户""host""10.0.1.101""port"3306},
    {"name""主库-支付""host""10.0.1.102""port"3306},
]

最后

这套系统的本质是什么?

把DBA的专业经验,通过AI”平民化”了。

以前分析慢查询是DBA的专属技能——要看执行计划、要懂索引设计、要权衡各种方案。普通开发和运维做不来。

现在AI可以做到DBA 80%的分析工作。DBA从”每条SQL都要我看”变成了”我只审核AI的建议”,效率提升了10倍。

这就是AI时代运维的工作方式——不是取代人,而是让每个人都拥有专家级的分析能力。

这套系统的代码不复杂(核心不到500行),但它解决的是一个真实的、高频的、痛点明确的问题。如果你想在简历上加一个”AI+运维”的项目经验,这个就很合适。


觉得有用?分享给更多运维同行看到 👇

转发给你身边还在手动分析慢查询的DBA和运维同事,这篇能帮他们省掉每周好几个小时的重复劳动。

我是「运维AI进化论」,一个用AI武装自己的运维工程师。分享运维+AI实战干货。关注我,一起进化 🚀