🚀 Day 9:成果落地 —— Act 阶段战报生成与大屏数据落盘
今日目标:完成整个 PEAK 框架的最核心闭环:Act with Knowledge(基于知识采取行动)。 此时,Day 8 的代码已经执行完毕,我们的 Python 内存中拥有了一个名为 all_hunt_evidence 的列表,里面装满了诸如 round_1_hit_count: 150 这样的硬核数据。接下来的任务是:将这些数据转化为 LLM 能看懂的 Prompt,触发二次推理,并将最终的 JSON 战报连同前面的证据一起,打包成一个巨大的“企业级 Payload”写入 Splunk 索引。
💻 Step 1: Act 阶段 (喂入证据、二次触发 LLM 与防御性解析)
动作解析: 这一步的代码必须严格粘贴在 Day 8 的 for 循环结束之后。
1. 数据序列化:把 Python 里的字典数组转成 JSON 字符串,因为大模型只能读懂文本。
2. 构建 Prompt:赋予 AI “安全总监”的人设,要求它根据命中数(Hit Counts)的逻辑关系(如 R1 很高但 R2 为 0,说明是误报;R1 和 R2 都很高,说明是真实攻击),给出一个 0-100 的确切风险评分。
3. 防弹解析:通过 try...except json.JSONDecodeError,防止大模型因为抽风输出非 JSON 格式而导致整个 5 分钟的调度任务崩溃。
📝 注入核心逻辑代码 (接在 Day 8 代码下方):
#==========================================
#STEP 4: The Act Phase (Day 9 AI Final Qualification)
#==========================================
helper.log_info("InitiatingAct Phase: Triggering LLM API for Final Assessment...")
#1. Serialize the collected evidence list into a JSON string to feed the LLM
evidence_payload= json.dumps(all_hunt_evidence, ensure_ascii=False)
#2. In a production environment, you would construct your messages here
#and execute the second API call. For testing the pipeline, we mock the response.
#This mock represents the LLM analyzing the 'evidence_payload' and making a decision.
mock_act_response= """
{
"executive_summary":"通过两轮下钻分析,假设1(暴力破解)在R2深度关联中发现连续高频请求,证实存在凭证填充攻击风险。",
"threat_qualification":"Confirmed Threat",
"risk_score":92,
"recommended_alert_spl":"search index=main CIp=* | bin _time span=1s | stats count by _time, CIp | where count > 10"
}
"""
#3. Defensive parsing block to handle potential AI format hallucinations
final_report= {}
try:
#Strip whitespace to prevent JSONDecodeError from trailing newlines
final_report= json.loads(mock_act_response.strip())
helper.log_info(f"AIAssessment Complete. Final Risk Score calculated: {final_report.get('risk_score')}")
exceptjson.JSONDecodeError as e:
helper.log_error(f"Criticalerror: Failed to parse final report JSON. Detail: {str(e)}")
#Fallback mechanism to ensure the data pipeline does not break
final_report= {
"executive_summary":"Error parsing AI response during Act Phase.",
"threat_qualification":"Unknown",
"risk_score":0,
"recommended_alert_spl":""
}
💻 Step 2: 数据落盘 (组装上帝视角 Payload 并写入 Splunk)
动作解析: 很多新手写 Splunk 插件,习惯用 print() 或者 helper.log_info() 输出结果,这是大错特错的!打印在后台的日志(_internal 索引)根本无法用于企业级大屏展示。
1. 组装 Master Payload:我们将 Day 7 的蓝图(ai_hunting_plan)、Day 8 的证据(execution_metrics)和刚才 Day 9 得到的终极定性(final_assessment)合并到一个字典中。这就相当于一份包含了起因、经过、结果的完整卷宗。
2. 强制指定 Sourcetype:使用 helper.new_event() 时,必须将 sourcetype 显式声明为 _json。这会触发 Splunk 底层的自动字段提取机制(Auto KV Extraction),让复杂的 JSON 树在前端直接变成可被搜索的独立字段。
3. 提交写入:使用 ew.write_event(event) 将这条史诗级事件永久打入业务索引(如 main 索引)。
📝 注入落盘逻辑代码:
#==========================================
#STEP 5: Data Ingestion (Writing the Master Payload to Splunk)
#==========================================
helper.log_info("AssemblingMaster Hunt Report for index ingestion...")
#1. Assemble the ultimate JSON document containing all three phases of PEAK
master_payload= {
"event_type":"PEAK_Hunting_Report",# CRITICAL: Anchor field for Dashboard searches
"timestamp":datetime.datetime.utcnow().isoformat(),
"target_index":target_index,
"hunting_plan":ai_hunting_plan,# The original blueprint from Prepare Phase
"execution_metrics":all_hunt_evidence, # The hits and duration from Execute Phase
"final_assessment":final_report# The ultimate AI qualification from Act Phase
}
#2. Create a new Splunk event object using the Add-on Builder helper
#Setting sourcetype to "_json" enables native Splunk JSON syntax highlighting and field extraction
event= helper.new_event(
source=helper.get_input_type(),
index=target_index,
sourcetype="_json",
data=json.dumps(master_payload,ensure_ascii=False)
)
#3. Commit the event to the user-specified Splunk data store
ew.write_event(event)
#Stop the master stopwatch and calculate total execution time
total_cycle_time= round(time.time() - cycle_start_time, 2)
helper.log_info(f"SUCCESS:Master Hunt Report completely written to Splunk! Total cycle time: {total_cycle_time} seconds.")
#4. Catch-all exception block for the entire Agentic Workflow
exceptException as e:
helper.log_error(f"CriticalFailure during Agentic Execution Workflow: {str(e)}")
🔍 Step 3: 终极极客验证 (全自动闭环的结果确认)
代码写完,真正的极客绝不相信“理论上能跑”,我们必须用 SPL 在前端验证数据是否已完美结构化落地。
详细验证操作:
1. 在 AOB 代码编辑器的右上角点击绿色的 Test 按钮。
2. 观察底部 Output 面板,等待打印出绿色的 Done。
3. 点击右上角 Save 按钮保存代码! (不点 Save 代码不会生效)
4. 打开一个新的浏览器标签页,进入 Splunk 的 Search & Reporting (搜索与报表) 应用。
5. 在搜索框中输入以下极客指令(将时间范围调整为 Last 15 minutes):
index=main sourcetype="_json" event_type="PEAK_Hunting_Report"
| rename final_assessment.risk_score as Risk_Score,
final_assessment.threat_qualificationas Threat_Level,
final_assessment.executive_summaryas Summary
| table _time, target_index, Threat_Level, Risk_Score, Summary
| sort - Risk_Score
🎉 终极成功标志与意义:
• 你将看到什么:表格中清晰展现出 Risk_Score = 92, Threat_Level = Confirmed Threat,以及详细的中文战报摘要。
• 背后的架构意义:由于我们在 Step 5 中设置了 sourcetype="_json",Splunk 极其聪明地自动解析了深达三层嵌套的 JSON 结构(它自动识别出了 final_assessment.risk_score 这个字段路径)。这种无缝的结构化数据落盘,是 Day 16 我们能够用几行 SPL 就画出极其绚丽的高管安全态势大屏的核心保障!
👇 全套教程多平台同步更新 👇
https://blog.csdn.net/thewindrider/category_13144991.html
https://gitcode.com/Chang_feng_Po/Splunk-AI-PEAK-Tutorial
夜雨聆风