系列「企业级 AI Agent 实现拆解」第六篇。上一篇讲了 SSE 推流,这篇看安全护栏怎么以插件形式嵌进 ReAct 循环。
为什么需要 Hook
ReAct 循环跑起来后,我遇到了几个没法在循环本身里解决的问题:
某些租户有内部合规规定,LLM 的输出不能含 PII(姓名、电话、身份证号) 高危工具(执行 SQL、发 Email)需要人工审批,但不同租户的审批规则不同 需要记录每次工具调用的详情,但不想把审计逻辑嵌进循环代码
这些需求有个共同特点:它们是横切关注点,跟循环的主逻辑无关,但需要在循环的特定时机插入执行。
做法有两种:把这些逻辑硬编码进循环,或者设计成可配置的钩子。我们选了后者——7 个阶段 × 三种实现类型 × 四种决定,组合成一个插件系统。
7 个 Phase,覆盖完整生命周期
type Phase stringconst (PhasePreSession Phase = "pre_session" // session 建立前PhasePreModel Phase = "pre_model_call" // 调 LLM 前PhasePostModel Phase = "post_model_call"// LLM 返回后PhasePreTool Phase = "pre_tool_use" // 工具调用前(HITL 在这里)PhasePostTool Phase = "post_tool_use" // 工具返回后(PII 脱敏在这里)PhasePostSession Phase = "post_session" // session 结束时PhaseOnError Phase = "on_error" // 任意阶段报错时)
循环里每到一个阶段,就调对应的 HookRunner 方法。application 层只知道接口,不知道背后跑的是什么 hook。
3 种实现:从平台内置到租户自定义
type Impl stringconst (ImplBuiltin Impl = "builtin" // 内置 Go funcImplWASM Impl = "wasm" // 租户上传 WASM,gVisor 沙箱跑ImplHTTP Impl = "http" // 远程 webhook(对接审批系统、风控))
Builtin:同进程 Go 函数,零网络开销,适合平台级规则(PII 脱敏、SQL 危险关键字检测)。
WASM:租户上传 .wasm 文件,在 gVisor 沙箱里执行,有 CPU/内存限制,租户自定义逻辑但无法逃逸到宿主系统。
HTTP:对接已有的审批系统或风控平台。一个 webhook URL,hook 框架发 POST,等响应里的 decision 字段。
4 种决定,主流程根据结果分支
type Decision stringconst (DecAllow Decision = "allow" // 放行DecDeny Decision = "deny" // 拒绝DecModify Decision = "modify" // 改写 payload 后放行DecRequire Decision = "require_approval" // 触发 HITL)
DecRequire 是 Hook 系统和 HITL 中断的连接点之一。在实际实现中,HITL 主要通过 Eino 图级的 WithInterruptBeforeNodes(["tools"]) 实现——当 ReAct 循环即将执行工具时,Eino 框架自动暂停图执行(详见第 25 篇)。Hook 的 DecRequire 则是另一种触发方式:PreToolUse hook 检测到高危操作时返回 require_approval,应用层收到后同样调 sess.Pause() 进入等待。两条路径最终汇聚到同一个 Session 状态机。
Dispatch:按优先级串行执行
Hook BC 接受 agent 的 Dispatch(phase, payload) 请求,找出同 phase 的所有 hook,按 priority 升序执行:
// dispatch.go — Handle(简化)func(h *DispatchHandler) Handle(ctx context.Context, in DispatchInput) (*DispatchResult, error) {list, _ := h.hooks.ListByPhase(ctx, in.TenantID, in.Phase)payload := in.Payloadfor _, hk := range list {if !hk.Matches(payload) { continue }// 找到对应的 runner(builtin / wasm / http 三种实现)runner, ok := h.runners[hk.Impl()]if !ok { continue }// 带超时执行hctx, cancel := context.WithTimeout(ctx, time.Duration(hk.TimeoutMs())*time.Millisecond)start := time.Now()decision, modified, reason, runErr := runner.Run(hctx, hk, payload)cancel()// 发布执行事件(outbox pattern → NATS → audit consumer 落库)h.bus.Publish(ctx, in.SessionID, []model.DomainEvent{model.EventHookExecuted{HookName: hk.Name(), Decision: decision,LatencyMs: time.Since(start).Milliseconds(),},})// hook 报错:看 fail_open 决定放行还是 denyif runErr != nil {if hk.FailOpen() { continue }return &DispatchResult{Decision: model.DecDeny, DenyReason: runErr.Error()}, nil}switch decision {case model.DecDeny:// 额外发一条 EventHookDenied,审计链需要h.bus.Publish(ctx, in.SessionID, []model.DomainEvent{model.EventHookDenied{...}})return &DispatchResult{Decision: model.DecDeny, DenyReason: reason}, nilcase model.DecRequire:return &DispatchResult{Decision: model.DecRequire, DenyReason: reason}, nilcase model.DecModify:payload = modified // 改写 payload,后续 hook 看到新的}}return &DispatchResult{Decision: model.DecAllow, ModifiedPayload: payload}, nil}
三个设计决定值得说明:
deny/require_approval 遇到立即返回:一个 hook 说拒绝了,就没必要再问下一个。
modify 累积:payload 一层层改,PostToolUse 阶段用这个做 PII 脱敏——第一个 hook 把姓名脱敏,第二个把电话脱敏,最终拼进消息历史的是脱敏后的版本。
执行事件走 outbox pattern:每次 hook 执行完,先发一条 EventHookExecuted 领域事件到 outbox 表,再由 relay 异步推 NATS,audit consumer 消费后落 hook_executions 表。比 goroutine 直写更可靠:DB 写入成功即不丢失,NATS 不可用时 relay 会重试。
fail_open:不同场景不同容忍度
Hook.failOpen 决定 hook 本身报错时的处理(默认 true):
fail_open=true(默认):hook 挂了就跳过继续。适合审计告警——告警系统挂了不能阻塞业务。 fail_open=false:hook 挂了按 deny 处理。适合 PII 脱敏——合规系统挂了就不该让数据出去。
每次 hook 执行都会发布 EventHookExecuted 领域事件,经过 outbox → NATS → audit consumer 链路,最终写入 hook_executions 表。审计系统能直接查哪个 hook 在什么时候做了什么决定、耗时多少。
Hook 怎么接入 ReAct 循环:Eino Callback 桥接
上面定义了 Hook 的模型和 Dispatch 逻辑,但 ReAct 循环是 Eino 在跑——Eino 不知道什么是 Hook、什么是 Phase。两者之间的桥梁是 einoadapter/callback.go。
Eino 框架提供了一个 Callback 机制:在图的节点执行前后,框架会调用注册的回调函数。我们的 AgentCallback 就是利用这个机制,把 Eino 的回调翻译成 HookRunner 的方法调用:
// callback.go(简化)funcNewAgentCallback(runner port.HookRunner, streamer port.Streamer, sid model.SessionID) callbacks.Handler {b := callbacks.NewHandlerBuilder()// Eino 节点开始执行时b.OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input ...) context.Context {switch info.Component {case "ChatModel": // LLM 节点runner.BeforeModelCall(ctx, sid, ...) // → pre_model_call hookcase "Tool": // 工具节点runner.BeforeToolUse(ctx, sid, ...) // → pre_tool_use hookstreamer.Emit(ctx, sid, StreamEvent{Type: "turn.tool_call", ...}) // → SSE}return ctx})// Eino 节点执行完成后b.OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output ...) context.Context {switch info.Component {case "ChatModel":runner.AfterModelCall(ctx, sid, ...) // → post_model_call hookcase "Tool":runner.AfterToolUse(ctx, sid, ...) // → post_tool_use hookstreamer.Emit(ctx, sid, StreamEvent{Type: "turn.tool_result", ...}) // → SSE}return ctx})// 节点报错时b.OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {runner.OnError(ctx, sid, err) // → on_error hookreturn ctx})return b.Build()}
OnStart 对应 “调用前”(BeforeModelCall、BeforeToolUse),OnEnd 对应 “调用后”(AfterModelCall、AfterToolUse),OnError 对应 “报错时”。Eino 告诉你"哪个组件在干什么",callback 把它翻译成具体的 hook 方法调用。
注意 callback 还顺便做了 SSE 推流——BeforeToolUse 时发 turn.tool_call,AfterToolUse 时发 turn.tool_result。因为这两个时机和 SSE 推流的时机完全重合,放在同一个 callback 里最自然,不用再加一层。
小结
Hook 系统的价值在于把安全策略从循环代码里解耦出来。加一条 PII 脱敏规则,不需要改 ReAct 循环,在数据库里插一行 hook 配置,下一次循环就生效。
解耦的关键是一层桥接:Eino Callback 负责在节点执行前后通知我们,AgentCallback 把通知翻译成 HookRunner 方法调用,DispatchHandler 按 priority 串行执行匹配的 hook。三层各管各的——Eino 管图的执行,Callback 管时机翻译,Hook 管业务策略。
下一篇:工具调用 —— Agent 的手和眼
夜雨聆风