乐于分享
好东西不私藏

OpenClaw.NET 技术解读(第六篇):Home Assistant 与 MQTT 事件桥

OpenClaw.NET 技术解读(第六篇):Home Assistant 与 MQTT 事件桥

摘要:除聊天与 Webhook 入站外,OpenClaw 还支持把 Home Assistant 实时事件 与 MQTT 消息 转成 InboundMessage 写入核心 MessagePipeline,从而触发固定 ChannelId / SessionId 上的 Agent 回合。实现位于 OpenClaw.Agent/Integrations,由网关在启动时以 BackgroundService 方式拉起。


一、与「工具」的区别

HomeAssistantTool / MqttTool 等是 Agent 被动响应时由模型调用的 ITool(查状态、发指令等)。事件桥则是 主动向管道塞入一条系统消息SenderId = "system",文本由模板渲染,走与渠道消息相同的 MessagePipeline → 中间件 → Agent 路径。

因此:工具解决 「用户/模型想操作时怎么调用 HA/MQTT」;事件桥解决 「外部世界变化时怎么叫醒 Agent」


二、网关如何挂载

StartNativeEventBridges 在运行时初始化后调用:若 原生 Home Assistant 插件开启 且 Events.Enabled,则构造 HomeAssistantEventBridge;若 MQTT 开启 且 Events.Enabled,则构造 MqttEventBridge。二者都接收 MessagePipeline.InboundWriter,与 Worker 消费的是同一有界 Channel。

    private static void StartNativeEventBridges(
        GatewayConfig config,
        ILoggerFactory loggerFactory,
        MessagePipeline pipeline,
        CancellationToken stoppingToken)
    {
        if (config.Plugins.Native.HomeAssistant.Enabled && config.Plugins.Native.HomeAssistant.Events.Enabled)
        {
            var haLogger = loggerFactory.CreateLogger<HomeAssistantEventBridge>();
            var haBridge = new HomeAssistantEventBridge(
                config.Plugins.Native.HomeAssistant,
                haLogger,
                pipeline.InboundWriter);
            _ = haBridge.StartAsync(stoppingToken).ContinueWith(
                t => haLogger.LogError(t.Exception!.InnerException, "HomeAssistant event bridge failed to start"),
                TaskContinuationOptions.OnlyOnFaulted);
        }

        if (config.Plugins.Native.Mqtt.Enabled && config.Plugins.Native.Mqtt.Events.Enabled)
        {
            var mqttLogger = loggerFactory.CreateLogger<MqttEventBridge>();
            var mqttBridge = new MqttEventBridge(
                config.Plugins.Native.Mqtt,
                mqttLogger,
                pipeline.InboundWriter);
            _ = mqttBridge.StartAsync(stoppingToken).ContinueWith(
                t => mqttLogger.LogError(t.Exception!.InnerException, "MQTT event bridge failed to start"),
                TaskContinuationOptions.OnlyOnFaulted);
        }
    }

启动采用 StartAsync + ContinueWith(OnlyOnFaulted) 记录异常,桥内部自带 断线重连与指数退避(见下文)。


三、Home Assistant 事件桥

HomeAssistantEventBridge 继承 BackgroundService。未启用时直接打日志返回。

连接与鉴权:用 ClientWebSocket 连到 BaseUrl 推导的 ws(s)://…/api/websocket,完成 HA 要求的 auth_required → auth(access_token)→ auth_ok 握手,再对每个配置的 SubscribeEventTypes 发送 subscribe_events 并等待 result.success

事件处理:只处理 type == "event" 的帧;从 event.data 解析 entity_idold_state/new_statefriendly_name 等。先按 Events.AllowEntityIdGlobs / DenyEntityIdGlobs 做实体过滤。

规则与模板HomeAssistantRuleEngine.SelectRule 在 Events.Rules 中按 实体 glob、from/to 状态、本地时间段、星期 匹配;无匹配时若 EmitAllMatchingEvents 为 false 则丢弃。命中后 Render 用规则或全局 PromptTemplate 替换 {event_type}{entity_id} 等占位符。

冷却GlobalCooldownSeconds 全局限流;每条规则可有 CooldownSeconds,避免同一自动化在短时间刷屏。

出站:组装 InboundMessageChannelId / SessionId 来自配置,SenderId 固定 system),WriteAsync 到 InboundWriter

        var msg = new InboundMessage
        {
            ChannelId = _config.Events.ChannelId,
            SessionId = _config.Events.SessionId,
            SenderId = "system",
            Text = text
        };

        await _inbound.WriteAsync(msg, ct);

配置模型见 Core HomeAssistantConfig / HomeAssistantEventsConfigGatewayConfig 的 Plugins.Native.HomeAssistant 下),字段如 SubscribeEventTypesPromptTemplateRules 等与 USER_GUIDE.md 中 Home Assistant 章节一致。


四、MQTT 事件桥

MqttEventBridge 同样为 BackgroundService,依赖 MQTTnet。启用条件:Mqtt.EnabledEvents.Enabled 且 Events.Subscriptions 非空。

连接:通过 OpenClawMqttClientFactory 创建客户端与连接选项(与 MQTT 工具共用工厂,保证策略一致)。

订阅:对每个 MqttSubscriptionConfig.Topic(支持通配符)校验 Policy.AllowSubscribeTopicGlobs / DenySubscribeTopicGlobs,再 SubscribeAsync

消息处理:收到消息后再次对 实际 topic做策略校验(纵深防御);载荷截断到MaxPayloadBytes,UTF-8 解码后写入MqttMessageCache(供MqttTool等读取最近载荷)。用MqttTopicFilterComparer找到匹配的第一条订阅配置,按CooldownSeconds限流,再用PromptTemplate或默认"MQTT message on {topic}: {payload}"生成文本,封装为InboundMessage 写入管道。

        var msg = new InboundMessage
        {
            ChannelId = _config.Events.ChannelId,
            SessionId = _config.Events.SessionId,
            SenderId = "system",
            Text = text
        };

        await _inbound.WriteAsync(msg, ct);

五、可靠性行为(两处共通)

  • 外层循环
    ExecuteAsync 内 while (!stoppingToken.IsCancellationRequested),异常后 Task.Delay 退避(MQTT/HA 均类似,HA 在 HomeAssistantEventBridge 中上限约 30s 倍增)。
  • 单桥实例
    :每种桥各一个 BackgroundService,与网关进程同生命周期;不单独占端口,只消费上游 WS/MQTT。

六、运维提示

  • SessionId / ChannelId
     必须指向网关已注册、且策略允许处理 system 发送者 的会话/渠道,否则消息可能无法进入预期 Agent 路径(需结合渠道与 allowlist 配置排查)。
  • 事件风暴时优先调 GlobalCooldownSecondsEmitAllMatchingEvents(HA)、CooldownSeconds(MQTT 按订阅)与 规则收窄
  • 与 第五篇 的关系:事件桥写入的 InboundMessage 最终仍由 MessagePipeline 与 AgentRuntime 消费;工具层 OpenClawToolExecutor 不参与「入站」阶段,只参与模型发起工具调用之后。

延伸阅读

  • src/OpenClaw.Agent/Integrations/HomeAssistantEventBridge.cs
  • src/OpenClaw.Agent/Integrations/MqttEventBridge.cs
  • src/OpenClaw.Agent/Integrations/HomeAssistantRuleEngine.cs
  • docs/TOOLS_GUIDE.md
    (Home Assistant / MQTT 工具与策略)
  • docs/USER_GUIDE.md
    (Home Assistant 事件与 MQTT 配置说明)

版权声明:基于开源仓库 OpenClaw.NET 整理,仅供技术交流。