OpenClaw.NET 技术解读(第六篇):Home Assistant 与 MQTT 事件桥
摘要:除聊天与 Webhook 入站外,OpenClaw 还支持把 Home Assistant 实时事件 与 MQTT 消息 转成 InboundMessage 写入核心 MessagePipeline,从而触发固定 ChannelId / SessionId 上的 Agent 回合。实现位于 OpenClaw.Agent/Integrations,由网关在启动时以 BackgroundService 方式拉起。

一、与「工具」的区别
HomeAssistantTool / MqttTool 等是 Agent 被动响应时由模型调用的 ITool(查状态、发指令等)。事件桥则是 主动向管道塞入一条系统消息:SenderId = "system",文本由模板渲染,走与渠道消息相同的 MessagePipeline → 中间件 → Agent 路径。
因此:工具解决 「用户/模型想操作时怎么调用 HA/MQTT」;事件桥解决 「外部世界变化时怎么叫醒 Agent」。
二、网关如何挂载
StartNativeEventBridges 在运行时初始化后调用:若 原生 Home Assistant 插件开启 且 Events.Enabled,则构造 HomeAssistantEventBridge;若 MQTT 开启 且 Events.Enabled,则构造 MqttEventBridge。二者都接收 MessagePipeline.InboundWriter,与 Worker 消费的是同一有界 Channel。
private static void StartNativeEventBridges(
GatewayConfig config,
ILoggerFactory loggerFactory,
MessagePipeline pipeline,
CancellationToken stoppingToken)
{
if (config.Plugins.Native.HomeAssistant.Enabled && config.Plugins.Native.HomeAssistant.Events.Enabled)
{
var haLogger = loggerFactory.CreateLogger<HomeAssistantEventBridge>();
var haBridge = new HomeAssistantEventBridge(
config.Plugins.Native.HomeAssistant,
haLogger,
pipeline.InboundWriter);
_ = haBridge.StartAsync(stoppingToken).ContinueWith(
t => haLogger.LogError(t.Exception!.InnerException, "HomeAssistant event bridge failed to start"),
TaskContinuationOptions.OnlyOnFaulted);
}
if (config.Plugins.Native.Mqtt.Enabled && config.Plugins.Native.Mqtt.Events.Enabled)
{
var mqttLogger = loggerFactory.CreateLogger<MqttEventBridge>();
var mqttBridge = new MqttEventBridge(
config.Plugins.Native.Mqtt,
mqttLogger,
pipeline.InboundWriter);
_ = mqttBridge.StartAsync(stoppingToken).ContinueWith(
t => mqttLogger.LogError(t.Exception!.InnerException, "MQTT event bridge failed to start"),
TaskContinuationOptions.OnlyOnFaulted);
}
}
启动采用 StartAsync + ContinueWith(OnlyOnFaulted) 记录异常,桥内部自带 断线重连与指数退避(见下文)。
三、Home Assistant 事件桥
HomeAssistantEventBridge 继承 BackgroundService。未启用时直接打日志返回。
连接与鉴权:用 ClientWebSocket 连到 BaseUrl 推导的 ws(s)://…/api/websocket,完成 HA 要求的 auth_required → auth(access_token)→ auth_ok 握手,再对每个配置的 SubscribeEventTypes 发送 subscribe_events 并等待 result.success。
事件处理:只处理 type == "event" 的帧;从 event.data 解析 entity_id、old_state/new_state、friendly_name 等。先按 Events.AllowEntityIdGlobs / DenyEntityIdGlobs 做实体过滤。
规则与模板:HomeAssistantRuleEngine.SelectRule 在 Events.Rules 中按 实体 glob、from/to 状态、本地时间段、星期 匹配;无匹配时若 EmitAllMatchingEvents 为 false 则丢弃。命中后 Render 用规则或全局 PromptTemplate 替换 {event_type}、{entity_id} 等占位符。
冷却:GlobalCooldownSeconds 全局限流;每条规则可有 CooldownSeconds,避免同一自动化在短时间刷屏。
出站:组装 InboundMessage(ChannelId / SessionId 来自配置,SenderId 固定 system),WriteAsync 到 InboundWriter。
var msg = new InboundMessage
{
ChannelId = _config.Events.ChannelId,
SessionId = _config.Events.SessionId,
SenderId = "system",
Text = text
};
await _inbound.WriteAsync(msg, ct);
配置模型见 Core HomeAssistantConfig / HomeAssistantEventsConfig(GatewayConfig 的 Plugins.Native.HomeAssistant 下),字段如 SubscribeEventTypes、PromptTemplate、Rules 等与 USER_GUIDE.md 中 Home Assistant 章节一致。

四、MQTT 事件桥
MqttEventBridge 同样为 BackgroundService,依赖 MQTTnet。启用条件:Mqtt.Enabled、Events.Enabled 且 Events.Subscriptions 非空。
连接:通过 OpenClawMqttClientFactory 创建客户端与连接选项(与 MQTT 工具共用工厂,保证策略一致)。
订阅:对每个 MqttSubscriptionConfig.Topic(支持通配符)校验 Policy.AllowSubscribeTopicGlobs / DenySubscribeTopicGlobs,再 SubscribeAsync。
消息处理:收到消息后再次对 实际 topic做策略校验(纵深防御);载荷截断到MaxPayloadBytes,UTF-8 解码后写入MqttMessageCache(供MqttTool等读取最近载荷)。用MqttTopicFilterComparer找到匹配的第一条订阅配置,按CooldownSeconds限流,再用PromptTemplate或默认"MQTT message on {topic}: {payload}"生成文本,封装为InboundMessage 写入管道。
var msg = new InboundMessage
{
ChannelId = _config.Events.ChannelId,
SessionId = _config.Events.SessionId,
SenderId = "system",
Text = text
};
await _inbound.WriteAsync(msg, ct);

五、可靠性行为(两处共通)
- 外层循环
: ExecuteAsync内while (!stoppingToken.IsCancellationRequested),异常后Task.Delay退避(MQTT/HA 均类似,HA 在HomeAssistantEventBridge中上限约 30s 倍增)。 - 单桥实例
:每种桥各一个 BackgroundService,与网关进程同生命周期;不单独占端口,只消费上游 WS/MQTT。
六、运维提示
SessionId/ChannelId
必须指向网关已注册、且策略允许处理 system发送者 的会话/渠道,否则消息可能无法进入预期 Agent 路径(需结合渠道与 allowlist 配置排查)。-
事件风暴时优先调 GlobalCooldownSeconds、EmitAllMatchingEvents(HA)、CooldownSeconds(MQTT 按订阅)与 规则收窄。 -
与 第五篇 的关系:事件桥写入的 InboundMessage最终仍由MessagePipeline与AgentRuntime消费;工具层OpenClawToolExecutor不参与「入站」阶段,只参与模型发起工具调用之后。
延伸阅读
src/OpenClaw.Agent/Integrations/HomeAssistantEventBridge.cssrc/OpenClaw.Agent/Integrations/MqttEventBridge.cssrc/OpenClaw.Agent/Integrations/HomeAssistantRuleEngine.csdocs/TOOLS_GUIDE.md
(Home Assistant / MQTT 工具与策略) docs/USER_GUIDE.md
(Home Assistant 事件与 MQTT 配置说明)
版权声明:基于开源仓库 OpenClaw.NET 整理,仅供技术交流。
夜雨聆风