乐于分享
好东西不私藏

网络设备一键巡检(五):“AI巡检系统”终极进化 →自动推送到企微+安全脱敏–以h3c+Fortinet为例(附完整源码)

网络设备一键巡检(五):“AI巡检系统”终极进化 →自动推送到企微+安全脱敏–以h3c+Fortinet为例(附完整源码)

点击关注,一起学习更多IT知识。
 前言:
  1. 上期我们做了“网络设备一键巡检(四):我把网络设备巡检->升级成“会思考的AI系统”–以h3c+Fortinet为例(附源码)”,但是:
    1.出问题时,人不在电脑前告警太多,看不过来。
    2.只知道“哪里坏了”,不知道“为什么坏”。
    3.推送消息存在安全隐患。
  2. 这期我们进行升级重点升级了4个核心能力:Webhook告警 + AI分析建议 + 安全脱敏 + 告警降噪。
  3. 整个流程:


  • 设备巡检:
    → 数据采集(CPU/内存/接口)
    → 异常识别(drop / CRC / flap 等)
    → 告警聚合(按设备合并后,涉及到设备信息也做脱敏处理。)
    → 程序将采集、处理好的错误信息交给AI
    → AI根因分析(为什么出问题)
     AI生成处理建议
    → 安全脱敏(对外数据保护)
    → Webhook推送
    → 前端展示 / 报告输出。
  • 注:有条件的还是建议接入本地部署的大模型。最大程度确保数据安全。(现在很多公司都有自己的AI)
一、我们还是先来看最终效果:
二、我们根据上一版,我们先把安装环境+ai_api等都部署上,再在上一版基础上加获取webhook,然后更新代码,请看以下为上一版链接:
网络设备一键巡检(四):我把网络设备巡检->升级成“会思考的AI系统”–以h3c+Fortinet为例(附源码)
三、企业微信webhook获取:
  1. 获取方式:打开企微 → 群聊 → 消息推送 → 获取Webhook地址。
  2. 请看以下是webhook获取截图:
四、上代码,以下是这一版完整版部署。请看以下步骤:
  •    1、建立文件目录,请看以下截图详细介绍:
  •    2、设备信息–“device.json”,请看以下代码(注:除了设备类型是要根据设备来填,其它信息根据实际情况填)
[  {    "name": "FortiGate",    "device_type": "fortinet",    "host": "192.168.31.254",    "username": "admin",    "password": "Password"  },  {    "name": "H3C-Core",    "device_type": "hp_comware",    "host": "192.168.31.253",    "username": "admin",    "password": "Password"  }]
  •    3、存放ai_api代码–“config.py”(注:以阿里云ai_api为例):
API_KEY = "你的ai_api"BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"MODEL = "qwen-plus"
  •    4、主程序–“app.py”代码(注:由于代码过长,我放以下链接里了,请复制以下链接到浏览器后,直接可打开代码。将webhook地址改成你获取的就行。):
https://gitee.com/IT_pangge/it-explorer-fat-brother/raw/master/AI%20%E7%BD%91%E7%BB%9C%E4%B8%80%E9%94%AE%E5%B7%A1%E6%A3%80/app.py
  •    注:以下是“app.py”代码展示:
# -*- coding: utf-8 -*-from flask import Flask, render_template, jsonify, requestimport json, time, re, webbrowser, requests, osfrom threading import Threadfrom netmiko import ConnectHandlerfrom functools import wrapsapp = Flask(__name__)# ================= 配置 =================DEBUG_MODE = TrueAPI_TOKEN = "default_token_123"WEBHOOK_URL = "你的webhook地址"REPORT_DIR = "reports"device_stats = []event_log = []last_alerts = {}IGNORE_PORTS = ["port2""port3"]# ================= ✅ 设备别名(脱敏但可识别) =================DEVICE_ALIAS = {"H3C-Core""Core-SW","H3C-Access1""Access-SW1","H3C-Access2""Access-SW2","FortiGate""FW"}defmask_device(name):return DEVICE_ALIAS.get(name, name)# ================= 初始化 =================definit():ifnot os.path.exists(REPORT_DIR):        os.makedirs(REPORT_DIR)# ================= Token =================defrequire_token(f):    @wraps(f)defwrapper(*args, **kwargs):if DEBUG_MODE:return f(*args, **kwargs)        token = request.headers.get("X-Token")if token != API_TOKEN:return jsonify({"error""Unauthorized"}), 403return f(*args, **kwargs)return wrapper# ================= 工具 =================defshort_list(lst):return"正常"ifnot lst else", ".join(lst[:3]) + ("..."if len(lst) > 3else"")defgen_issue_key(name):return name  # 聚合到设备# ================= H3C =================defparse_h3c_cpu(text):    m = re.search(r'(\d+)%\s+in last 5 seconds', text)return int(m.group(1)) if m elseNonedefparse_h3c_mem(text):    total_total, total_used = 00for line in text.splitlines():        parts = line.split()if len(parts) >= 7:try:                total_total += int(parts[2])                total_used += int(parts[3])except:continuereturn round(total_used / total_total * 1001if total_total elseNonedefparse_h3c_interface(text):    issues, port = [], ""for line in text.splitlines():if re.match(r'^\S+Ethernet', line):            port = line.split()[0]continueif"Last link flapping"in line:            m = re.search(r'(\d+)', line)if m:                minutes = int(m.group(1))if minutes < 1:                    issues.append(f"{port}(flap-critical)")elif minutes < 5:                    issues.append(f"{port}(flap-warn)")if"CRC"in line:            m = re.search(r'(\d+)', line)if m and int(m.group(1)) > 1000:                issues.append(f"{port}(crc)")if"drops"in line.lower():            m = re.search(r'(\d+)', line)if m and int(m.group(1)) > 1000:                issues.append(f"{port}(drop)")return list(set(issues))# ================= Fortinet =================defparse_forti_cpu(text):    m = re.search(r'(\d+)%\s*idle', text)return100 - int(m.group(1)) if m elseNonedefparse_forti_mem(text):    m = re.search(r'used \((\d+(?:\.\d+)?)%\)', text)return int(float(m.group(1))) if m elseNonedefparse_forti_interface(text):    issues, port = [], ""for line in text.splitlines():if line.startswith("name:"):            port = line.split(":")[1].strip()if"link:"in line.lower():            status = line.split(":")[1].strip().lower()if port in IGNORE_PORTS:continueif status == "down":                issues.append(f"{port}(down)")return list(set(issues))# ================= 状态 =================defevaluate(cpu, mem, iface):if cpu isNoneor mem isNone:return"异常"if cpu > 90or mem > 90:return"异常"if any("(drop)"in i or"(crc)"in i for i in iface):return"异常"if any("flap"in i for i in iface):return"警告"return"正常"# ================= AI根因分析 =================defanalyze_root_cause(device):    iface_list = device['iface_list']    cpu = device['cpu']    mem = device['mem']    drop = [i for i in iface_list if"(drop)"in i]    crc = [i for i in iface_list if"(crc)"in i]    flap = [i for i in iface_list if"flap"in i]if drop:return"链路拥塞/带宽不足""检查带宽/流量"if crc:return"物理层问题(光模块/网线)""更换光模块或网线"if len(flap) >= 5:return"疑似环路/STP震荡""检查二层环路/STP"if flap:return"链路不稳定""检查对端设备/协商"if isinstance(cpu, (int, float)) and cpu > 90:return"CPU过高""检查流量/攻击"if isinstance(mem, (int, float)) and mem > 90:return"内存异常""检查进程"returnNoneNone# ================= 🚨 告警(最终版) =================defbuild_alert(device):    name = device['name']    safe_name = mask_device(name)    iface_list = device['iface_list']    reason, suggestion = analyze_root_cause(device)ifnot reason:returnNone# 👉 接口展示(关键)    iface_show = []for i in iface_list[:5]:        iface_show.append(i.split("(")[0].replace("GigabitEthernet""Gi"))    iface_str = ",".join(iface_show)if len(iface_list) > 5:        iface_str += f"...共{len(iface_list)}个接口异常"returnf"""🚨AI巡检告警设备:{name}{safe_name}接口:{iface_str}原因:{reason}建议:{suggestion}"""# ================= Webhook =================defsend_webhook(msg):ifnot WEBHOOK_URL ornot msg:returntry:        print("📤 Webhook:", msg)        requests.post(WEBHOOK_URL, json={"msgtype""text","text": {"content": msg}        }, timeout=5)except Exception as e:        print("Webhook失败:", e)# ================= HTML =================defgenerate_html():    html = "<html><body><h2>AI巡检报告</h2><table border='1'>"    html += "<tr><th>设备</th><th>CPU</th><th>内存</th><th>接口</th><th>状态</th></tr>"for d in device_stats:        html += f"<tr><td>{d['name']}</td><td>{d['cpu']}%</td><td>{d['mem']}%</td><td>{d['iface']}</td><td>{d['status']}</td></tr>"    html += "</table></body></html>"return html# ================= 主循环 =================defcollect():global device_statswhileTrue:        new_stats = []        alerts = []        devices = json.load(open("devices.json", encoding="utf-8"))for d in devices:            dev = d.copy()            name = dev.pop("name", dev.get("host"))try:                conn = ConnectHandler(**dev)if dev["device_type"] == "hp_comware":                    cpu = parse_h3c_cpu(conn.send_command("display cpu-usage"))                    mem = parse_h3c_mem(conn.send_command("display memory summary"))                    iface = parse_h3c_interface(conn.send_command("display interface"))elif dev["device_type"] == "fortinet":                    perf = conn.send_command("get system performance status")                    cpu = parse_forti_cpu(perf)                    mem = parse_forti_mem(perf)                    iface = parse_forti_interface(conn.send_command("get system interface physical"))else:                    cpu, mem, iface = NoneNone, []                conn.disconnect()                status = evaluate(cpu, mem, iface)                device = {"name": name,"cpu": cpu if cpu isnotNoneelse"-","mem": mem if mem isnotNoneelse"-","iface": short_list(iface),"iface_list": iface,"status": status                }                new_stats.append(device)if status != "正常":                    alerts.append(device)except Exception as e:                error_msg = str(e)                new_stats.append({"name": name,"cpu""-","mem""-","iface"f"错误:{error_msg}","iface_list": [],"status""异常"                })                print(f"❌ {name} 出错:", error_msg)        device_stats[:] = new_stats        now = time.time()for device in alerts:            key = gen_issue_key(device['name'])if key in last_alerts and now - last_alerts[key] < 1800:continue            alert_text = build_alert(device)ifnot alert_text:continue            send_webhook(alert_text)            event_log.append({"time": time.strftime("%H:%M:%S"),"event": alert_text            })            print("🔥 告警:", alert_text)            last_alerts[key] = now        event_log[:] = event_log[-10:]        time.sleep(5)# ================= API =================@app.route("/")defindex():return render_template("index.html")@app.route("/api/data")@require_tokendefdata():return jsonify({"devices": device_stats})@app.route("/api/events")@require_tokendefevents():return jsonify(event_log)# ================= 启动 =================if __name__ == "__main__":    init()    Thread(target=collect, daemon=True).start()defopen_browser():        time.sleep(2)        webbrowser.open("http://127.0.0.1:5001")    Thread(target=open_browser).start()    app.run(host="127.0.0.1", port=5001)
五、可以开始进行测试了,cmd进入文件目录,运行app.py文件,当有告警时,企微里同步收到,说明测试成功。请看以下截图:
  • 注:测试完就可以建立bat文件一键运行,以下是示例代码:
@echo off cd /d E:\python_test_2026\ai 诊断网络\webhook版本\test6python app.py
六、对这版功能升级的总结与启示:
  • 这一版实际上主要升级解决了4个问题:

  1. 不可能一直盯系统-> 自动推送到企微。

  2. 告警降噪: 之前:一个设备 → 十几条接口告警。现在:一个设备 → 一条总结告警。告警减少80%,不刷屏,更聚焦问题。

  3. “可控脱敏”(比上一版增强):内部人员能识别、外部不会暴露架构、可接Webhook / AI。(注:能接入本地部署的AI最稳)

  4. AI自动分析原因 + 建议。注:文中部分代码由AI辅助生成。

  5. 启示:当然这自建的版本有很多地方待完善优化;也没法和市面上专业网络监控相比。但希望可以给也在探索与尝试的朋友们,提供更多思路。后续将做更多优化分享,感谢关注