源码加更06_接收分发、QoS 事务和诊断闭环
[!abstract] 这一篇完整公开接收分发、QoS ACK、Inflight 事务、QoS2 入站表、Topic Alias 和消息记录方法。这是 MQTT Client 现场稳定性的收口层。
适合谁收藏
正在排查 QoS1/QoS2 超时、重发、ACK 对不上的工程师。 需要理解为什么 M_ProcessReceive 不能只等单一响应的读者。 想把事务表、诊断和现场观测量连起来的 PLC 开发者。
本篇核心图

读图重点:先看源码对象之间的职责边界,再看数据、状态和错误如何沿着调用链流动。源码加更不是把文件名列出来,而是把完整代码、工程意图和验证口径一起讲清楚。
先给结论
高频 MQTT 通信的难点不是能不能收到报文,而是接收后能不能正确分发、ACK、记录和超时收口。这层一乱,状态机就会被在途事务拖死。
从理论到代码实现链路
MQTT 标准给的是报文类型、固定头、可变头、载荷、QoS 交互和会话语义;PLC 工程真正要解决的是周期扫描、缓冲区长度、错误锁存、在线变量、连接重入和现场可诊断性。
所以这套开源实现不能只按协议章节拆,也不能只按文件名拆。正确读法是把标准约束翻译成程序对象:入口程序负责给命令和观测点,GVL 和 DUT 定义容量与数据模型,主功能块负责调度状态机,构建方法负责出站报文,处理方法负责入站报文,辅助方法负责长度、队列、事务、主题和诊断边界。
本篇完整公开接收分发、QoS ACK、Inflight、RxQoS2、Topic Alias 和消息记录相关方法。
再往下一层看,这里其实有两条线同时存在。第一条是协议线:固定头、Remaining Length、PacketId、QoS、Topic、Payload 和 Reason Code 必须能按 MQTT 规则组合起来。第二条是 PLC 工程线:每个周期只能推进有限步骤,所有中间状态都要能被在线变量观察,所有错误都要能被锁存并归类,所有缓冲区长度都要在写入前被检查。
这就是源码加更必须完整公开的原因。只给几段核心片段,读者最多能看懂某个判断;把完整对象放出来,读者才能看到对象之间如何传递状态、长度、错误和诊断信息。完整源码讲解不是为了堆代码,而是为了让读者能从标准约束一路追到可运行的 ST 对象,再从现场现象反向定位到具体边界。
本篇公开的完整源码范围
M_ProcessReceive.st | ||
M_ProcessPendingFrames.st | ||
M_BuildPubAckPacket.st | ||
M_BuildPubRecPacket.st | ||
M_BuildPubRelPacket.st | ||
M_BuildPubCompPacket.st | ||
M_HandlePubAck.st | ||
M_HandlePubRec.st | ||
M_HandlePubRel.st | ||
M_HandlePubComp.st | ||
M_InflightAdd.st | ||
M_InflightCheckTimeout.st | ||
M_InflightClear.st | ||
M_InflightFind.st | ||
M_InflightRemove.st | ||
M_InflightUpdateState.st | ||
M_RxQoS2Add.st | ||
M_RxQoS2Find.st | ||
M_RxQoS2Remove.st | ||
M_TopicAliasClear.st | ||
M_TopicAliasLookup.st | ||
M_TopicAliasRegister.st | ||
M_RecordReceivedMessage.st | ||
FB_MqttPropertyCodec.st | ||
FB_Random.st |
怎么读这些源码
第一遍只看对象职责:这个文件解决哪一层问题,是入口、模型、状态、构建、接收、事务,还是诊断。
第二遍看边界变量:长度、索引、PacketId、QoS、状态枚举、错误码、缓冲区水位和在线观测量。PLC 通信代码最怕的是“能跑但不可诊断”,所以每个关键对象都要问一句:现场出问题时,我能不能从它留下的变量看出原因。
第三遍再看具体语句。源码全部公开,不等于读者要从第一行顺序读到最后一行。更稳的方式是用图和表先建立地图,再回到完整代码里确认每个边界确实落地。
工程验证路径
验证时看 Inflight 数量、PacketId 状态、QoS2 入站记录、Topic Alias 映射和最近接收消息。这些量能对上,才说明 QoS 闭环真的站住了。
本篇完整开源代码
完整代码 1:M_ProcessReceive.st
这一段完整公开 M_ProcessReceive.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_ProcessReceive/// 功能 : 统一处理接收到的 MQTT 报文/// 说明 : 校验完整帧后,按报文类型执行解析、确认和状态更新。/// 编程人员 : ControlRookie/// 时间 : 2026-05-07/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_ProcessReceive : BOOLVARbyHeader : BYTE; // 固定报头首字节byMsgType : BYTE; // 报文类型byFlags : BYTE; // 固定报头标志byReasonCode : BYTE; // 原因码bySubAckCode : BYTE; // SUBACK 原因码uiFixedHeaderLen : UINT; // 固定报头总长度uiRemainingLen : UINT; // 当前报文固定报头中携带的 Remaining Length 值[byte]uiFrameEndPos : UINT; // 当前帧结束位置uiTopicPos : UINT; // 主题起始偏移uiTopicLen : UINT; // 主题长度uiPayloadPos : UINT; // 载荷起始偏移uiPayloadLen : UINT; // 当前报文载荷区实际长度[byte]uiPacketId : UINT; // 当前接收报文中解析出的 Packet IdentifieruiAliasId : UINT; // 主题别名udiSubIdentifier : UDINT; // 订阅标识符uiPropsLen : UINT; // 属性总长度uiPropsHeaderLen : UINT; // 属性长度这个 VBI 字段本身占用的字节数[byte]uiPropsEnd : UINT; // 属性区结束偏移uiPropsPos : UINT; // 属性读取偏移uiIndex : UINT; // 通用扫描或历史列表搬移时使用的索引uiRxQoS2Index : UINT; // 入站 QoS2 去重表中命中的槽位索引i : UINT; // 循环索引sAliasTopic : STRING(GVL_Mqtt.cnMaxTopicLen); // 别名映射主题END_VAR// === IMPLEMENTATION ===/// 至少要先拿到 1 字节固定报头 + 1 字节 Remaining Length,/// 否则连“这是不是一帧完整 MQTT 报文”都还无法判断。IF uiRxLength < 2 THENM_ProcessReceive := FALSE;RETURN;END_IF/// 先解析固定报头和 Remaining Length,得到“当前完整帧”在接收缓冲区中的边界。byHeader := aRxBuf[0];byMsgType := byHeader AND GVL_Mqtt.cnHdrTypeMask;byFlags := byHeader AND GVL_Mqtt.cnHdrFlagsMask;uiFixedHeaderLen := 1 + M_DecodeRemainingLength(pBuffer := ADR(aRxBuf[1]),uiLength => uiRemainingLen);IF uiFixedHeaderLen = 0 THENM_ProcessReceive := FALSE;RETURN;END_IFuiFrameEndPos := uiFixedHeaderLen + uiRemainingLen;IF uiRxLength < uiFrameEndPos THENM_ProcessReceive := FALSE;RETURN;END_IF/// 当前方法一次只消费一帧。/// 如果缓冲区里还有后续报文,最后会把余下字节整体前移,留给下一次继续处理。CASE byMsgType OFE_MqttPacketType.byConnAck:/// 正常 CONNACK 只允许在连接建立阶段进入专用等待态处理。/// 一旦在通用接收路径中再次看到 CONNACK,说明会话时序已经错乱。M_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Unexpected CONNACK packet');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;E_MqttPacketType.byPublish:/// 先从固定报头里提取 QoS / DUP / Retain,/// 后续再结合主题、属性和载荷拼出一条完整的入站业务消息。byReceivedQoS := SHR(byFlags AND GVL_Mqtt.cnHdrQoSMask, 1);uiAliasId := 0;udiSubIdentifier := 0;bReceivedRetain := (byFlags AND GVL_Mqtt.cnHdrRetainFlag) <> 0;IF (byReceivedQoS > TO_BYTE(E_MqttQoS.byQoS2)) OR((byReceivedQoS = TO_BYTE(E_MqttQoS.byQoS0)) AND ((byFlags AND GVL_Mqtt.cnHdrDupFlag) <> 0)) THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Invalid PUBLISH header flags');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFuiTopicPos := uiFixedHeaderLen;IF uiTopicPos + 1 >= uiFrameEndPos THENM_ProcessReceive := FALSE;RETURN;END_IFuiTopicLen := SHL(BYTE_TO_UINT(aRxBuf[uiTopicPos]), 8) OR BYTE_TO_UINT(aRxBuf[uiTopicPos + 1]);uiTopicPos := uiTopicPos + 2;uiPayloadPos := uiTopicPos + uiTopicLen;/// MQTT 允许 PUBLISH 主题字符串为空,但那只在 Topic Alias 续用场景才合法。/// 因此这里先尝试读主题,后面再结合 Alias 规则决定最终主题是否有效。sRecTopic := '';IF uiTopicLen > 0 THENIF uiPayloadPos > uiFrameEndPos THENM_ProcessReceive := FALSE;RETURN;END_IFIF NOT M_CopyBytesToString(pSource := ADR(aRxBuf[uiTopicPos]),uiByteCount := uiTopicLen,sTarget := sRecTopic) THENM_ProcessReceive := FALSE;RETURN;END_IFEND_IFuiPacketId := 0;IF byReceivedQoS > TO_BYTE(E_MqttQoS.byQoS0) THEN/// QoS1 / QoS2 的入站 PUBLISH 必须带 Packet Identifier,/// 后续 ACK / 去重 / 四步握手都依赖这个编号闭环。IF uiPayloadPos + 1 >= uiFrameEndPos THENM_ProcessReceive := FALSE;RETURN;END_IFuiPacketId := SHL(BYTE_TO_UINT(aRxBuf[uiPayloadPos]), 8) OR BYTE_TO_UINT(aRxBuf[uiPayloadPos + 1]);IF uiPacketId = 0 THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Packet ID must not be zero');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFuiPayloadPos := uiPayloadPos + 2;END_IFIF eVersion = E_MqttVersion.byMqttVersion50 THEN/// MQTT 5.0 的 PUBLISH 还要先跳过属性区,/// 才能准确落到真正业务 payload 的起点。IF uiPayloadPos >= uiFrameEndPos THENM_ProcessReceive := FALSE;RETURN;END_IFuiPropsHeaderLen := M_DecodeRemainingLength(pBuffer := ADR(aRxBuf[uiPayloadPos]),uiLength => uiPropsLen);IF uiPropsHeaderLen = 0 THENM_ProcessReceive := FALSE;RETURN;END_IFuiPropsPos := uiPayloadPos + uiPropsHeaderLen;uiPropsEnd := uiPropsPos + uiPropsLen;IF uiPropsEnd > uiFrameEndPos THENM_ProcessReceive := FALSE;RETURN;END_IFWHILE uiPropsPos < uiPropsEnd DO/// 这里只处理当前客户端已经实现并确实会影响运行行为的属性。/// 其余未支持属性直接按协议错误处理,避免“悄悄忽略后跑偏”。CASE aRxBuf[uiPropsPos] OFGVL_Mqtt.cnPropTopicAlias:uiPropsPos := uiPropsPos + 1;IF uiPropsPos + 1 >= uiPropsEnd THENM_ProcessReceive := FALSE;RETURN;END_IFuiAliasId := SHL(BYTE_TO_UINT(aRxBuf[uiPropsPos]), 8) OR BYTE_TO_UINT(aRxBuf[uiPropsPos + 1]);uiPropsPos := uiPropsPos + 2;GVL_Mqtt.cnPropSubscriptionId:uiPropsPos := uiPropsPos + 1;uiIndex := 0;uiPropsHeaderLen := M_DecodeRemainingLength(pBuffer := ADR(aRxBuf[uiPropsPos]),uiLength => uiIndex);IF uiPropsHeaderLen = 0 THENM_ProcessReceive := FALSE;RETURN;END_IFudiSubIdentifier := TO_UDINT(uiIndex);uiPropsPos := uiPropsPos + uiPropsHeaderLen;GVL_Mqtt.cnPropPayloadFormat,GVL_Mqtt.cnPropRetainAvailable,GVL_Mqtt.cnPropRequestProblemInfo,GVL_Mqtt.cnPropRequestResponseInfo,GVL_Mqtt.cnPropWildcardSubAvail,GVL_Mqtt.cnPropSubIdAvail,GVL_Mqtt.cnPropSharedSubAvail:uiPropsPos := uiPropsPos + 2;GVL_Mqtt.cnPropReceiveMaximum,GVL_Mqtt.cnPropTopicAliasMax,GVL_Mqtt.cnPropServerKeepAlive:uiPropsPos := uiPropsPos + 3;GVL_Mqtt.cnPropMessageExpiry,GVL_Mqtt.cnPropSessionExpiry,GVL_Mqtt.cnPropMaxPacketSize:uiPropsPos := uiPropsPos + 5;GVL_Mqtt.cnPropContentType,GVL_Mqtt.cnPropResponseTopic,GVL_Mqtt.cnPropCorrelationData,GVL_Mqtt.cnPropAssignedClientId,GVL_Mqtt.cnPropAuthMethod,GVL_Mqtt.cnPropAuthData,GVL_Mqtt.cnPropResponseInfo,GVL_Mqtt.cnPropServerReference,GVL_Mqtt.cnPropReasonString:uiPropsPos := uiPropsPos + 1;IF uiPropsPos + 1 >= uiPropsEnd THENM_ProcessReceive := FALSE;RETURN;END_IFuiPropsPos := uiPropsPos + 2 + (SHL(BYTE_TO_UINT(aRxBuf[uiPropsPos]), 8) ORBYTE_TO_UINT(aRxBuf[uiPropsPos + 1]));GVL_Mqtt.cnPropUserProperty:uiPropsPos := uiPropsPos + 1;FOR i := 1 TO 2 DOIF uiPropsPos + 1 >= uiPropsEnd THENM_ProcessReceive := FALSE;RETURN;END_IFuiPropsPos := uiPropsPos + 2 + (SHL(BYTE_TO_UINT(aRxBuf[uiPropsPos]), 8) ORBYTE_TO_UINT(aRxBuf[uiPropsPos + 1]));END_FORELSEM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Unsupported PUBLISH property');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_CASEIF uiPropsPos > uiPropsEnd THENM_ProcessReceive := FALSE;RETURN;END_IFEND_WHILEuiPayloadPos := uiPropsEnd;IF uiAliasId > 0 THEN/// Topic Alias 有两种合法用法:/// 1. 主题字符串非空:表示“登记或覆盖别名映射”;/// 2. 主题字符串为空:表示“直接复用之前登记过的别名主题”。IF uiAliasId > GVL_Mqtt.cnMaxTopicAlias THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrTopicAliasInvalid),sMessage := 'Topic Alias exceeds client maximum');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFIF sRecTopic = '' THENsAliasTopic := M_TopicAliasLookup(uiAliasId := uiAliasId);IF sAliasTopic = '' THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrTopicAliasInvalid),sMessage := 'Unknown Topic Alias');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFsRecTopic := sAliasTopic;ELSEIF NOT M_TopicAliasRegister(uiAliasId := uiAliasId, sTopic := sRecTopic) THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrTopicAliasInvalid),sMessage := 'Topic Alias registration failed');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFEND_IFEND_IFEND_IFuiPayloadLen := uiFrameEndPos - uiPayloadPos;sRecPayload := '';IF uiPayloadLen > 0 THENIF NOT M_CopyBytesToString(pSource := ADR(aRxBuf[uiPayloadPos]),uiByteCount := uiPayloadLen,sTarget := sRecPayload) THENM_ProcessReceive := FALSE;RETURN;END_IFEND_IFIF byReceivedQoS = TO_BYTE(E_MqttQoS.byQoS1) THEN/// QoS1 入站消息要先登记“占用了一个接收侧未完成配额”,/// 再构建 PUBACK,确保高压场景下不会无限接收入站 QoS>0 消息。IF uiRxInFlightQosCount >= uiReceiveMax THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrReceiveMaxExceeded),sMessage := 'Receive Maximum exceeded');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFuiRxInFlightQosCount := uiRxInFlightQosCount + 1;IF NOT M_BuildPubAckPacket(uiPacketId := uiPacketId) THENIF uiRxInFlightQosCount > 0 THENuiRxInFlightQosCount := uiRxInFlightQosCount - 1;END_IFM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Build PUBACK failed');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFxPendingImmediateTx := TRUE;M_RecordReceivedMessage();ELSIF byReceivedQoS = TO_BYTE(E_MqttQoS.byQoS2) THEN/// QoS2 先查去重表:/// - 第一次看到该 Packet ID:登记、记录业务消息、回 PUBREC/// - 重复看到该 Packet ID:不重复投递业务消息,只重发握手响应uiRxQoS2Index := M_RxQoS2Find(uiPacketId := uiPacketId);IF uiRxQoS2Index = 0 THENIF uiRxInFlightQosCount >= uiReceiveMax THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrReceiveMaxExceeded),sMessage := 'Receive Maximum exceeded');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFIF NOT M_RxQoS2Add(uiPacketId := uiPacketId) THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrReceiveMaxExceeded),sMessage := 'QoS2 dedup queue is full');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFM_RecordReceivedMessage();END_IFIF NOT M_BuildPubRecPacket(uiPacketId := uiPacketId) THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Build PUBREC failed');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFxPendingImmediateTx := TRUE;ELSE/// QoS0 没有后续握手,记录消息后即可视为完成。M_RecordReceivedMessage();END_IFE_MqttPacketType.byPubAck:/// 出站 QoS1 发布的终点:匹配到期望 Packet ID 后关闭等待态并释放 inflight。IF (byFlags <> 0) OR (uiFrameEndPos < 4) THENM_ProcessReceive := FALSE;RETURN;END_IFuiPacketId := SHL(BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen]), 8) OR BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen + 1]);IF uiPacketId = uiExpectedPacketId THENbyReasonCode := 0;IF (eVersion = E_MqttVersion.byMqttVersion50) AND (uiFrameEndPos > uiFixedHeaderLen + 2) THENbyReasonCode := aRxBuf[uiFixedHeaderLen + 2];END_IFxWaitingForAck := FALSE;bDup := FALSE;IF byReasonCode >= 16#80 THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrPubAckRefused),sMessage := 'PUBACK rejected');ELSExPublishedEvent := TRUE;END_IFM_InflightRemove(uiPacketId := uiPacketId);ELSEM_ProcessReceive := FALSE;RETURN;END_IFE_MqttPacketType.byPubRec:/// 出站 QoS2 发布第 2 步:收到 PUBREC 后,把 inflight 状态推进到“可发 PUBREL”。IF (byFlags <> 0) OR (uiFrameEndPos < 4) THENM_ProcessReceive := FALSE;RETURN;END_IFuiPacketId := SHL(BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen]), 8) OR BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen + 1]);IF uiPacketId = uiExpectedPacketId THENbyReasonCode := 0;IF (eVersion = E_MqttVersion.byMqttVersion50) AND (uiFrameEndPos >= 5) THENbyReasonCode := aRxBuf[uiFixedHeaderLen + 2];IF byReasonCode >= 16#80 THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrPubRecRefused),sMessage := 'PUBREC rejected');M_InflightRemove(uiPacketId := uiPacketId);xWaitingForAck := FALSE;END_IFEND_IFIF NOT bError THENM_InflightUpdateState(uiPacketId := uiPacketId,eNewState := E_MqttInflightState.iPubRecReceived);xWaitingForAck := FALSE;uiQoS2PacketId := uiPacketId;END_IFELSEM_ProcessReceive := FALSE;RETURN;END_IF16#60:/// PUBREL 固定报头类型是 16#60,且标志必须固定为 16#02。/// 这里代表“入站 QoS2 对端开始请求我们完成第 3 / 4 步握手”。IF (byFlags <> 2) OR (uiFrameEndPos < 4) THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Invalid PUBREL flags');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFuiPacketId := SHL(BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen]), 8) OR BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen + 1]);M_RxQoS2Remove(uiPacketId := uiPacketId);IF NOT M_BuildPubCompPacket(uiPacketId := uiPacketId) THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Build PUBCOMP failed');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFxPendingImmediateTx := TRUE;E_MqttPacketType.byPubComp:/// 出站 QoS2 发布终点:收到 PUBCOMP 后,整条 inflight 事务才真正完成。IF (byFlags <> 0) OR (uiFrameEndPos < 4) THENM_ProcessReceive := FALSE;RETURN;END_IFuiPacketId := SHL(BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen]), 8) OR BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen + 1]);IF uiPacketId = uiQoS2PacketId THENbyReasonCode := 0;IF (eVersion = E_MqttVersion.byMqttVersion50) AND (uiFrameEndPos > uiFixedHeaderLen + 2) THENbyReasonCode := aRxBuf[uiFixedHeaderLen + 2];END_IFxWaitingForAck := FALSE;bDup := FALSE;IF byReasonCode >= 16#80 THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrPubCompRefused),sMessage := 'PUBCOMP rejected');ELSExPublishedEvent := TRUE;END_IFM_InflightRemove(uiPacketId := uiPacketId);ELSEM_ProcessReceive := FALSE;RETURN;END_IFE_MqttPacketType.bySubAck:/// SUBACK 收到后既要结束等待态,也要把“当前激活订阅请求”写回本地订阅表。IF (byFlags <> 0) OR (uiFrameEndPos < 5) THENM_ProcessReceive := FALSE;RETURN;END_IFuiPacketId := SHL(BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen]), 8) OR BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen + 1]);IF uiPacketId = uiPendingSubPacketId THENuiPayloadPos := uiFixedHeaderLen + 2;IF eVersion = E_MqttVersion.byMqttVersion50 THENuiPropsHeaderLen := M_DecodeRemainingLength(pBuffer := ADR(aRxBuf[uiPayloadPos]),uiLength => uiPropsLen);IF uiPropsHeaderLen = 0 THENM_ProcessReceive := FALSE;RETURN;END_IFIF uiPayloadPos + uiPropsHeaderLen + uiPropsLen > uiFrameEndPos THENM_ProcessReceive := FALSE;RETURN;END_IFuiPayloadPos := uiPayloadPos + uiPropsHeaderLen + uiPropsLen;END_IFIF uiPayloadPos >= uiFrameEndPos THENM_ProcessReceive := FALSE;RETURN;END_IFbySubAckCode := aRxBuf[uiPayloadPos];xWaitingForSubAck := FALSE;IF bySubAckCode < 16#80 THENIF bySubAckCode > TO_BYTE(E_MqttQoS.byQoS2) THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'SUBACK reason code is invalid');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFxSubscribedEvent := TRUE;M_SubListAdd(sTopic := sActiveSubTopic,eQos := eActiveSubQoS,udiSubscriptionId := udiActiveSubscriptionId);IF bActiveSubscribeRestore THENIF uiRestoreSubscriptionIndex >= GVL_Mqtt.cnMaxSubscriptions THENbRestoreSubscriptions := FALSE;uiRestoreSubscriptionIndex := 0;END_IFEND_IFsActiveSubTopic := '';eActiveSubQoS := E_MqttQoS.byQoS0;udiActiveSubscriptionId := 0;bActiveSubscribeRestore := FALSE;ELSEM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrSubscriptionFailed),sMessage := 'Subscription rejected');bRestoreSubscriptions := FALSE;uiRestoreSubscriptionIndex := 0;sActiveSubTopic := '';eActiveSubQoS := E_MqttQoS.byQoS0;udiActiveSubscriptionId := 0;bActiveSubscribeRestore := FALSE;END_IFELSEM_ProcessReceive := FALSE;RETURN;END_IFE_MqttPacketType.byUnsubAck:/// UNSUBACK 成功后要同步移除本地订阅表中的主题意图;/// 若服务端返回“不存在该订阅”,按 MQTT 5.0 允许值处理,不视为协议错。IF byFlags <> 0 THENM_ProcessReceive := FALSE;RETURN;END_IFIF ((eVersion = E_MqttVersion.byMqttVersion50) AND (uiFrameEndPos < 5)) OR((eVersion <> E_MqttVersion.byMqttVersion50) AND (uiFrameEndPos < 4)) THENM_ProcessReceive := FALSE;RETURN;END_IFuiPacketId := SHL(BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen]), 8) OR BYTE_TO_UINT(aRxBuf[uiFixedHeaderLen + 1]);IF uiPacketId = uiPendingUnsubPacketId THENxWaitingForUnsubAck := FALSE;byReasonCode := 0;IF eVersion = E_MqttVersion.byMqttVersion50 THENuiPayloadPos := uiFixedHeaderLen + 2;uiPropsHeaderLen := M_DecodeRemainingLength(pBuffer := ADR(aRxBuf[uiPayloadPos]),uiLength => uiPropsLen);IF uiPropsHeaderLen = 0 THENM_ProcessReceive := FALSE;RETURN;END_IFIF uiPayloadPos + uiPropsHeaderLen + uiPropsLen > uiFrameEndPos THENM_ProcessReceive := FALSE;RETURN;END_IFuiPayloadPos := uiPayloadPos + uiPropsHeaderLen + uiPropsLen;IF uiPayloadPos < uiFrameEndPos THENbyReasonCode := aRxBuf[uiPayloadPos];END_IFEND_IFIF (byReasonCode <> 0) AND (byReasonCode <> GVL_Mqtt.cnRcNoSubscriptionExisted) AND (byReasonCode < 16#80) THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'UNSUBACK reason code is invalid');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFIF byReasonCode >= 16#80 THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrSubscriptionFailed),sMessage := 'Unsubscribe rejected');ELSExUnsubscribedEvent := TRUE;M_SubListRemove(sTopic := sUnsubTopic);END_IFELSEM_ProcessReceive := FALSE;RETURN;END_IFE_MqttPacketType.byPingResp:/// PINGRESP 只是把“当前还有一笔心跳等待”清掉,不携带业务数据。IF (byFlags <> 0) OR (uiRemainingLen <> 0) THENM_ProcessReceive := FALSE;RETURN;END_IFbPingPending := FALSE;E_MqttPacketType.byDisconnect:/// 服务端主动发 DISCONNECT 时,客户端必须退出当前会话,/// 同时把原因码转成可见诊断,便于现场判断是协议拒绝、管理断开还是迁移提示。byReasonCode := 0;IF eVersion = E_MqttVersion.byMqttVersion50 THENIF byFlags <> 0 THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Invalid DISCONNECT flags');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFIF uiRemainingLen >= 1 THENbyReasonCode := aRxBuf[uiFixedHeaderLen];END_IFELSIF (byFlags <> 0) OR (uiRemainingLen <> 0) THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Invalid MQTT 3.1.1 DISCONNECT');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_IFM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrNotConnected),sMessage := CONCAT('Server sent DISCONNECT rc=', BYTE_TO_STRING(byReasonCode)));eState := E_MqttState.iTcpDisconnect;E_MqttPacketType.byAuth:/// V2.0 当前不实现增强认证交换。/// 一旦服务端进入 AUTH 流程,直接以明确诊断退出,避免假装兼容。IF eVersion <> E_MqttVersion.byMqttVersion50 THENM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Unexpected AUTH packet');eState := E_MqttState.iTcpDisconnect;ELSEM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrAuthFailed),sMessage := 'AUTH exchange is not supported in MqttClient_V2_0');eState := E_MqttState.iTcpDisconnect;END_IFELSEM_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrProtocolError),sMessage := 'Unsupported packet type');eState := E_MqttState.iTcpDisconnect;M_ProcessReceive := FALSE;RETURN;END_CASE/// 如果接收缓冲区里还粘着后续报文,把未消费字节整体前移,/// 这样下一次扫描就能从新帧起点继续解析。IF uiRxLength > uiFrameEndPos THENuiPayloadLen := uiRxLength - uiFrameEndPos;IF uiPayloadLen > 0 THENFOR i := 1 TO uiPayloadLen DOaRxBuf[i - 1] := aRxBuf[uiFrameEndPos + i - 1];END_FOREND_IFuiRxLength := uiPayloadLen;ELSEuiRxLength := 0;END_IFM_ProcessReceive := TRUE;
完整代码 2:M_ProcessPendingFrames.st
这一段完整公开 M_ProcessPendingFrames.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_ProcessPendingFrames/// 功能 : 连续处理缓冲区中的完整报文/// 说明 : 只要存在完整报文就循环调用 M_ProcessReceive/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_ProcessPendingFrames : BOOLVARbProcessed : BOOL; // 是否处理过至少一帧END_VAR// === IMPLEMENTATION ===bProcessed := FALSE;WHILE uiRxLength > 0 DOIF NOT M_ProcessReceive() THENEXIT;END_IFbProcessed := TRUE;// 生成即时协议响应后,先交还主状态机发送 ACK,再继续处理后续入站帧。IF xPendingImmediateTx THENEXIT;END_IFEND_WHILEM_ProcessPendingFrames := bProcessed;
完整代码 3:M_BuildPubAckPacket.st
这一段完整公开 M_BuildPubAckPacket.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_BuildPubAckPacket/// 功能 : 构建 PUBACK 发送报文/// 说明 : 根据 MQTT 版本组装 QoS1 发布确认报文并写入发送缓冲区。/// 编程人员 : ControlRookie/// 时间 : 2026-05-05/// 版本 : V1.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_BuildPubAckPacket : BOOLVAR_INPUTuiPacketId : UINT; // 需要回给 Broker 的 QoS1 发布报文标识符END_VARVARuiPos : UINT := 0; // 当前写入发送缓冲区的位置偏移[byte]uiRemainingLen : UINT; // 写入固定报头中的 Remaining Length 值[byte]i : DINT; // 清空发送缓冲区时使用的循环索引END_VAR// === IMPLEMENTATION ===// BUG-10: 缓冲区溢出保护(PUBACK最大6字节)IF SIZEOF(aTxBuf) < 6 THENM_BuildPubAckPacket := FALSE;RETURN;END_IF// 清空发送缓冲区FOR i := LOWER_BOUND(aTxBuf, 1) TO UPPER_BOUND(aTxBuf, 1) DOaTxBuf[i] := 0;END_FOR/// =======================================================================/// 创建报文/// =======================================================================uiPos := 0;IF eVersion = E_MqttVersion.byMqttVersion50 THEN// MQTT 5.0: Packet ID(2) + Reason Code(1) + Properties Length(1)uiRemainingLen := 4;aTxBuf[0] := E_MqttPacketType.byPubAck;uiPos := 1;uiPos := uiPos + M_EncodeRemainingLength(uiRemainingLen, ADR(aTxBuf[uiPos]));// Packet IDaTxBuf[uiPos] := UINT_TO_BYTE(SHR(uiPacketId, 8)); uiPos := uiPos + 1;aTxBuf[uiPos] := UINT_TO_BYTE(uiPacketId AND 16#FF); uiPos := uiPos + 1;// Reason Code: 0x00 = SuccessaTxBuf[uiPos] := 16#00; uiPos := uiPos + 1;// Properties Length: 0aTxBuf[uiPos] := 0; uiPos := uiPos + 1;ELSE// MQTT 3.1.1: Packet ID(2) only, BUG-03修复: remaining length = 2uiRemainingLen := 2;aTxBuf[0] := E_MqttPacketType.byPubAck;uiPos := 1;uiPos := uiPos + M_EncodeRemainingLength(uiRemainingLen, ADR(aTxBuf[uiPos]));aTxBuf[uiPos] := UINT_TO_BYTE(SHR(uiPacketId, 8)); uiPos := uiPos + 1;aTxBuf[uiPos] := UINT_TO_BYTE(uiPacketId AND 16#FF); uiPos := uiPos + 1;END_IFuiTxLength := uiPos;M_BuildPubAckPacket := TRUE;;
完整代码 4:M_BuildPubRecPacket.st
这一段完整公开 M_BuildPubRecPacket.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_BuildPubRecPacket/// 功能 : 构建 PUBREC 发送报文/// 说明 : 根据 MQTT 版本组装 QoS2 第一步确认报文并更新发送长度。/// 编程人员 : ControlRookie/// 时间 : 2026-05-05/// 版本 : V1.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_BuildPubRecPacket : BOOLVAR_INPUTuiPacketId : UINT; // 需要回给对端的 QoS2 第一步确认报文标识符END_VARVARuiPos : UINT := 0; // 当前写入发送缓冲区的位置偏移[byte]uiRemainingLen : UINT; // 写入固定报头中的 Remaining Length 值[byte]i : DINT; // 清空发送缓冲区时使用的循环索引END_VAR// === IMPLEMENTATION ===// BUG-10: 缓冲区溢出保护(PUBREC最大6字节)IF SIZEOF(aTxBuf) < 6 THENM_BuildPubRecPacket := FALSE;RETURN;END_IF// 清空发送缓冲区FOR i := LOWER_BOUND(aTxBuf, 1) TO UPPER_BOUND(aTxBuf, 1) DOaTxBuf[i] := 0;END_FORuiPos := 0;IF eVersion = E_MqttVersion.byMqttVersion50 THEN// MQTT 5.0: Packet ID(2) + Reason Code(1) + Properties Length(1)uiRemainingLen := 4;aTxBuf[0] := E_MqttPacketType.byPubRec;uiPos := 1;uiPos := uiPos + M_EncodeRemainingLength(uiRemainingLen, ADR(aTxBuf[uiPos]));aTxBuf[uiPos] := UINT_TO_BYTE(SHR(uiPacketId, 8)); uiPos := uiPos + 1;aTxBuf[uiPos] := UINT_TO_BYTE(uiPacketId AND 16#FF); uiPos := uiPos + 1;// Reason Code: 0x00 = SuccessaTxBuf[uiPos] := 16#00; uiPos := uiPos + 1;// Properties Length: 0aTxBuf[uiPos] := 0; uiPos := uiPos + 1;ELSE// MQTT 3.1.1: Packet ID(2) only, remaining length = 2uiRemainingLen := 2;aTxBuf[0] := E_MqttPacketType.byPubRec;uiPos := 1;uiPos := uiPos + M_EncodeRemainingLength(uiRemainingLen, ADR(aTxBuf[uiPos]));aTxBuf[uiPos] := UINT_TO_BYTE(SHR(uiPacketId, 8)); uiPos := uiPos + 1;aTxBuf[uiPos] := UINT_TO_BYTE(uiPacketId AND 16#FF); uiPos := uiPos + 1;END_IFuiTxLength := uiPos;M_BuildPubRecPacket := TRUE;;
完整代码 5:M_BuildPubRelPacket.st
这一段完整公开 M_BuildPubRelPacket.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_BuildPubRelPacket/// 功能 : 构建 PUBREL 发送报文/// 说明 : 根据 MQTT 版本组装 QoS2 第二步释放报文并更新发送长度。/// 编程人员 : ControlRookie/// 时间 : 2026-05-05/// 版本 : V1.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_BuildPubRelPacket : BOOLVARuiPos : UINT := 0; // 当前写入发送缓冲区的位置偏移[byte]uiRemainingLen : UINT; // 写入固定报头中的 Remaining Length 值[byte]i : DINT; // 清空发送缓冲区时使用的循环索引END_VAR// === IMPLEMENTATION ===// BUG-10: 缓冲区溢出保护(PUBREL最大6字节)IF SIZEOF(aTxBuf) < 6 THENM_BuildPubRelPacket := FALSE;RETURN;END_IF// 清空发送缓冲区FOR i := LOWER_BOUND(aTxBuf, 1) TO UPPER_BOUND(aTxBuf, 1) DOaTxBuf[i] := 0;END_FORuiPos := 0;IF eVersion = E_MqttVersion.byMqttVersion50 THEN// MQTT 5.0: Packet ID(2) + Reason Code(1) + Properties Length(1)uiRemainingLen := 4;aTxBuf[0] := E_MqttPacketType.byPubRel;uiPos := 1;uiPos := uiPos + M_EncodeRemainingLength(uiRemainingLen, ADR(aTxBuf[uiPos]));aTxBuf[uiPos] := UINT_TO_BYTE(SHR(uiQoS2PacketId, 8)); uiPos := uiPos + 1;aTxBuf[uiPos] := UINT_TO_BYTE(uiQoS2PacketId AND 16#FF); uiPos := uiPos + 1;// Reason Code: 0x00 = SuccessaTxBuf[uiPos] := 16#00; uiPos := uiPos + 1;// Properties Length: 0aTxBuf[uiPos] := 0; uiPos := uiPos + 1;ELSE// MQTT 3.1.1: Packet ID(2) only, remaining length = 2uiRemainingLen := 2;aTxBuf[0] := E_MqttPacketType.byPubRel;uiPos := 1;uiPos := uiPos + M_EncodeRemainingLength(uiRemainingLen, ADR(aTxBuf[uiPos]));aTxBuf[uiPos] := UINT_TO_BYTE(SHR(uiQoS2PacketId, 8)); uiPos := uiPos + 1;aTxBuf[uiPos] := UINT_TO_BYTE(uiQoS2PacketId AND 16#FF); uiPos := uiPos + 1;END_IFuiTxLength := uiPos;M_BuildPubRelPacket := TRUE;;
完整代码 6:M_BuildPubCompPacket.st
这一段完整公开 M_BuildPubCompPacket.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_BuildPubCompPacket/// 功能 : 构建 PUBCOMP 发送报文/// 说明 : 根据 MQTT 版本组装 QoS2 完成确认报文并更新发送长度。/// 编程人员 : ControlRookie/// 时间 : 2026-05-05/// 版本 : V1.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_BuildPubCompPacket : BOOLVAR_INPUTuiPacketId : UINT; // 需要回给对端的 QoS2 完成阶段报文标识符END_VARVARuiPos : UINT := 0; // 当前写入发送缓冲区的位置偏移[byte]uiRemainingLen : UINT; // 写入固定报头中的 Remaining Length 值[byte]i : DINT; // 清空发送缓冲区时使用的循环索引END_VAR// === IMPLEMENTATION ===// BUG-10: 缓冲区溢出保护(PUBCOMP最大6字节)IF SIZEOF(aTxBuf) < 6 THENM_BuildPubCompPacket := FALSE;RETURN;END_IF// 清空发送缓冲区FOR i := LOWER_BOUND(aTxBuf, 1) TO UPPER_BOUND(aTxBuf, 1) DOaTxBuf[i] := 0;END_FORuiPos := 0;IF eVersion = E_MqttVersion.byMqttVersion50 THEN// MQTT 5.0: Packet ID(2) + Reason Code(1) + Properties Length(1)uiRemainingLen := 4;aTxBuf[0] := E_MqttPacketType.byPubComp;uiPos := 1;uiPos := uiPos + M_EncodeRemainingLength(uiRemainingLen, ADR(aTxBuf[uiPos]));aTxBuf[uiPos] := UINT_TO_BYTE(SHR(uiPacketId, 8)); uiPos := uiPos + 1;aTxBuf[uiPos] := UINT_TO_BYTE(uiPacketId AND 16#FF); uiPos := uiPos + 1;// Reason Code: 0x00 = SuccessaTxBuf[uiPos] := 16#00; uiPos := uiPos + 1;// Properties Length: 0aTxBuf[uiPos] := 0; uiPos := uiPos + 1;ELSE// MQTT 3.1.1: BUG-04修复, remaining length = 2uiRemainingLen := 2;aTxBuf[0] := E_MqttPacketType.byPubComp;uiPos := 1;uiPos := uiPos + M_EncodeRemainingLength(uiRemainingLen, ADR(aTxBuf[uiPos]));aTxBuf[uiPos] := UINT_TO_BYTE(SHR(uiPacketId, 8)); uiPos := uiPos + 1;aTxBuf[uiPos] := UINT_TO_BYTE(uiPacketId AND 16#FF); uiPos := uiPos + 1;END_IFuiTxLength := uiPos;M_BuildPubCompPacket := TRUE;;
完整代码 7:M_HandlePubAck.st
这一段完整公开 M_HandlePubAck.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_HandlePubAck/// 功能 : 兼容桩方法,保留 PUBACK 旧接口/// 说明 : 该方法已废弃,实际处理逻辑已迁移至 M_ProcessReceive,仅为兼容旧调用保留。/// 编程人员 : ControlRookie/// 时间 : 2026-05-05/// 版本 : V1.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_HandlePubAck : BOOLVAR_INPUTEND_VAR// === IMPLEMENTATION ===// 逻辑已移至M_ProcessReceiveM_HandlePubAck := FALSE;
完整代码 8:M_HandlePubRec.st
这一段完整公开 M_HandlePubRec.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_HandlePubRec/// 功能 : 兼容桩方法,保留 PUBREC 旧接口/// 说明 : 该方法已废弃,实际处理逻辑已迁移至 M_ProcessReceive,仅为兼容旧调用保留。/// 编程人员 : ControlRookie/// 时间 : 2026-05-05/// 版本 : V1.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_HandlePubRec : BOOLVAR_INPUTEND_VAR// === IMPLEMENTATION ===// 逻辑已移至M_ProcessReceiveM_HandlePubRec := FALSE;
完整代码 9:M_HandlePubRel.st
这一段完整公开 M_HandlePubRel.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_HandlePubRel/// 功能 : 兼容桩方法,保留 PUBREL 旧接口/// 说明 : 该方法已废弃,实际处理逻辑已迁移至 M_ProcessReceive,仅为兼容旧调用保留。/// 编程人员 : ControlRookie/// 时间 : 2026-05-05/// 版本 : V1.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_HandlePubRel : BOOLVAR_INPUTEND_VAR// === IMPLEMENTATION ===// 逻辑已移至M_ProcessReceiveM_HandlePubRel := FALSE;
完整代码 10:M_HandlePubComp.st
这一段完整公开 M_HandlePubComp.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_HandlePubComp/// 功能 : 兼容桩方法,保留 PUBCOMP 旧接口/// 说明 : 该方法已废弃,实际处理逻辑已迁移至 M_ProcessReceive,仅为兼容旧调用保留。/// 编程人员 : ControlRookie/// 时间 : 2026-05-05/// 版本 : V1.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_HandlePubComp : BOOLVAR_INPUTEND_VAR// === IMPLEMENTATION ===// 逻辑已移至M_ProcessReceiveM_HandlePubComp := FALSE;
完整代码 11:M_InflightAdd.st
这一段完整公开 M_InflightAdd.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_InflightAdd/// 功能 : 将出站消息加入在途队列/// 说明 : 为 QoS1 / QoS2 发布消息创建在途记录并返回索引/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_InflightAdd : UINTVAR_INPUTuiPacketId : UINT; // 需要登记到出站在途队列的 Packet IdentifiereQoS : E_MqttQoS; // 消息 QoS 等级sTopic : STRING(GVL_Mqtt.cnMaxTopicLen); // 发布主题sPayload : STRING(GVL_Mqtt.cnMaxPayloadSize); // 发布载荷uiPayloadLen : UINT; // 发布载荷长度[byte]bRetain : BOOL; // Retain 标志END_VARVARi : UINT; // 循环索引END_VAR// === IMPLEMENTATION ===// 先做容量保护,避免超出本地在途窗口上限后继续塞入新消息。IF uiInflightCount >= GVL_Mqtt.cnMaxInflight THENM_InflightAdd := 0;RETURN;END_IF// 在线性数组里找第一条空槽位,用最简单直接的方式维护 QoS>0 出站上下文。FOR i := 1 TO GVL_Mqtt.cnMaxInflight DOIF NOT aInflight[i].bUsed THENaInflight[i].bUsed := TRUE;aInflight[i].uiPacketId := uiPacketId;aInflight[i].eQoS := eQoS;aInflight[i].sTopic := sTopic;aInflight[i].sPayload := sPayload;aInflight[i].uiPayloadLen := uiPayloadLen;aInflight[i].bRetain := bRetain;aInflight[i].bDup := FALSE;aInflight[i].uiRetryCount := 0;aInflight[i].tLastSend := TIME();aInflight[i].eState := E_MqttInflightState.iPublishSent;uiInflightCount := uiInflightCount + 1;// MQTT 5.0 下,发送一条新的 QoS>0 出站消息就要消耗一份服务器授予的发送配额。IF (eVersion = E_MqttVersion.byMqttVersion50) AND (uiSendQuota > 0) THENuiSendQuota := uiSendQuota - 1;END_IFM_InflightAdd := i;RETURN;END_IFEND_FORM_InflightAdd := 0;
完整代码 12:M_InflightCheckTimeout.st
这一段完整公开 M_InflightCheckTimeout.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_InflightCheckTimeout/// 功能 : 扫描在途消息超时/// 说明 : 找到需要重发的首个在途消息并返回其索引/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_InflightCheckTimeout : UINTVARi : UINT; // 循环索引tCurrent : TIME; // 当前时间END_VAR// === IMPLEMENTATION ===// 逐条扫描在途表,找到第一条已经超时且仍允许重发的消息。tCurrent := TIME();FOR i := 1 TO GVL_Mqtt.cnMaxInflight DOIF aInflight[i].bUsed THENIF (tCurrent - aInflight[i].tLastSend) >= GVL_Mqtt.cnInflightTimeout THENIF aInflight[i].uiRetryCount < GVL_Mqtt.cnMaxRetries THEN// 进入重发前先置 DUP,并刷新“上次发送时间”和重试计数。aInflight[i].bDup := TRUE;aInflight[i].uiRetryCount := aInflight[i].uiRetryCount + 1;aInflight[i].tLastSend := tCurrent;M_InflightCheckTimeout := i;RETURN;ELSE// 超过最大重试次数后,说明这条消息的交付闭环已经不可恢复,记录错误并释放槽位。M_SetError(uiErrorCode := TO_UINT(E_ReasonCode.uiErrTimeout),sMessage := 'Inflight publish retry exceeded');M_InflightRemove(uiPacketId := aInflight[i].uiPacketId);END_IFEND_IFEND_IFEND_FORM_InflightCheckTimeout := 0;
完整代码 13:M_InflightClear.st
这一段完整公开 M_InflightClear.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_InflightClear/// 功能 : 清空出站在途队列/// 说明 : 在禁用、断开或新会话开始时重置在途消息/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_InflightClear : BOOLVARi : UINT; // 循环索引END_VAR// === IMPLEMENTATION ===// 清空所有出站在途槽位,同时一并清掉接收侧 QoS2 去重表。FOR i := 1 TO GVL_Mqtt.cnMaxInflight DOaInflight[i].bUsed := FALSE;aInflight[i].uiPacketId := 0;aInflight[i].sTopic := '';aInflight[i].sPayload := '';aInflight[i].uiPayloadLen := 0;aInflight[i].bRetain := FALSE;aInflight[i].bDup := FALSE;aInflight[i].uiRetryCount := 0;aInflight[i].tLastSend := T#0S;aInflight[i].eState := E_MqttInflightState.iIdle;aRxQoS2PacketIds[i] := 0;END_FORuiInflightCount := 0;uiRxInFlightQosCount := 0;uiRetryInflightIndex := 0;// 新会话开始时,发送窗口恢复到服务端允许的 Receive Maximum;// 如果服务端还没明确给过值,就退回本地默认窗口。IF uiServerReceiveMax = 0 THENuiSendQuota := GVL_Mqtt.cnDefaultReceiveMax;ELSEuiSendQuota := uiServerReceiveMax;END_IFM_InflightClear := TRUE;
完整代码 14:M_InflightFind.st
这一段完整公开 M_InflightFind.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_InflightFind/// 功能 : 按 Packet ID 查找在途消息/// 说明 : 返回队列索引,0 表示未找到/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_InflightFind : UINTVAR_INPUTuiPacketId : UINT; // 需要在出站在途队列中查找的 Packet IdentifierEND_VARVARi : UINT; // 循环索引END_VAR// === IMPLEMENTATION ===FOR i := 1 TO GVL_Mqtt.cnMaxInflight DOIF aInflight[i].bUsed AND aInflight[i].uiPacketId = uiPacketId THENM_InflightFind := i;RETURN;END_IFEND_FORM_InflightFind := 0;
完整代码 15:M_InflightRemove.st
这一段完整公开 M_InflightRemove.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_InflightRemove/// 功能 : 从在途队列删除消息/// 说明 : 在 QoS1 / QoS2 流程完成后释放在途槽位/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_InflightRemove : BOOLVAR_INPUTuiPacketId : UINT; // 需要从出站在途队列移除的 Packet IdentifierEND_VARVARuiIndex : UINT; // 命中的出站在途队列槽位索引END_VAR// === IMPLEMENTATION ===// 先按 Packet Identifier 找到对应在途槽位;找不到说明当前没有需要释放的上下文。uiIndex := M_InflightFind(uiPacketId := uiPacketId);IF uiIndex = 0 THENM_InflightRemove := FALSE;RETURN;END_IF// 释放槽位时把所有和这条消息相关的缓存一起清掉,避免后续误复用旧数据。aInflight[uiIndex].bUsed := FALSE;aInflight[uiIndex].uiPacketId := 0;aInflight[uiIndex].sTopic := '';aInflight[uiIndex].sPayload := '';aInflight[uiIndex].uiPayloadLen := 0;aInflight[uiIndex].bRetain := FALSE;aInflight[uiIndex].bDup := FALSE;aInflight[uiIndex].uiRetryCount := 0;aInflight[uiIndex].tLastSend := T#0S;aInflight[uiIndex].eState := E_MqttInflightState.iIdle;IF uiInflightCount > 0 THENuiInflightCount := uiInflightCount - 1;END_IF// MQTT 5.0 下,消息完成后需要把一份发送配额还给本地发送窗口。IF eVersion = E_MqttVersion.byMqttVersion50 THENIF uiSendQuota < uiServerReceiveMax THENuiSendQuota := uiSendQuota + 1;END_IFEND_IFM_InflightRemove := TRUE;
完整代码 16:M_InflightUpdateState.st
这一段完整公开 M_InflightUpdateState.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_InflightUpdateState/// 功能 : 更新在途消息状态/// 说明 : 对 QoS1 / QoS2 流程的状态迁移执行最小合法性约束/// 编程人员 : ControlRookie/// 时间 : 2026-05-05/// 版本 : V1.1/// ======================================================================={attribute 'hide_all_locals'}METHOD M_InflightUpdateState : BOOLVAR_INPUTuiPacketId : UINT; // 需要更新状态的出站在途 Packet IdentifiereNewState : E_MqttInflightState; // 新状态END_VARVARuiIndex : UINT; // 命中的出站在途队列槽位索引bTransitionOk : BOOL; // 状态迁移是否合法END_VAR// === IMPLEMENTATION ===// 只有已经存在于在途表中的 Packet Identifier 才允许更新状态。uiIndex := M_InflightFind(uiPacketId := uiPacketId);IF uiIndex = 0 THENM_InflightUpdateState := FALSE;RETURN;END_IF// 这里只允许最小必要的合法迁移,避免状态机被异常报文拉进不合理状态。bTransitionOk := FALSE;CASE aInflight[uiIndex].eState OFE_MqttInflightState.iPublishSent:IF (eNewState = E_MqttInflightState.iPublishSent) OR(eNewState = E_MqttInflightState.iPubRecReceived) OR(eNewState = E_MqttInflightState.iCompleted) THENbTransitionOk := TRUE;END_IFE_MqttInflightState.iPubRecReceived:IF (eNewState = E_MqttInflightState.iPubRecReceived) OR(eNewState = E_MqttInflightState.iPubRelSent) THENbTransitionOk := TRUE;END_IFE_MqttInflightState.iPubRelSent:IF (eNewState = E_MqttInflightState.iPubRelSent) OR(eNewState = E_MqttInflightState.iCompleted) THENbTransitionOk := TRUE;END_IFE_MqttInflightState.iCompleted:IF eNewState = E_MqttInflightState.iCompleted THENbTransitionOk := TRUE;END_IFEND_CASEIF NOT bTransitionOk THENM_InflightUpdateState := FALSE;RETURN;END_IF// 合法迁移才真正落盘,供后续超时重发和 ACK 匹配继续使用。aInflight[uiIndex].eState := eNewState;M_InflightUpdateState := TRUE;
完整代码 17:M_RxQoS2Add.st
这一段完整公开 M_RxQoS2Add.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_RxQoS2Add/// 功能 : 记录入站 QoS2 报文标识符/// 说明 : 用于 QoS2 消息去重,成功返回 TRUE/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_RxQoS2Add : BOOLVAR_INPUTuiPacketId : UINT; // 需要登记到入站 QoS2 去重表的 Packet IdentifierEND_VARVARi : UINT; // 循环索引END_VAR// === IMPLEMENTATION ===// QoS2 的 Packet Identifier 不能为 0。IF uiPacketId = 0 THENM_RxQoS2Add := FALSE;RETURN;END_IF// 已经存在则直接视为登记成功,避免重复占用去重槽位。IF M_RxQoS2Find(uiPacketId := uiPacketId) > 0 THENM_RxQoS2Add := TRUE;RETURN;END_IF// 找第一条空槽位登记,用于后续识别重复到达的 QoS2 PUBLISH。FOR i := 1 TO GVL_Mqtt.cnMaxInflight DOIF aRxQoS2PacketIds[i] = 0 THENaRxQoS2PacketIds[i] := uiPacketId;uiRxInFlightQosCount := uiRxInFlightQosCount + 1;M_RxQoS2Add := TRUE;RETURN;END_IFEND_FORM_RxQoS2Add := FALSE;
完整代码 18:M_RxQoS2Find.st
这一段完整公开 M_RxQoS2Find.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_RxQoS2Find/// 功能 : 查找入站 QoS2 去重记录/// 说明 : 返回索引,0 表示未找到/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_RxQoS2Find : UINTVAR_INPUTuiPacketId : UINT; // 需要在入站 QoS2 去重表中查找的 Packet IdentifierEND_VARVARi : UINT; // 循环索引END_VAR// === IMPLEMENTATION ===FOR i := 1 TO GVL_Mqtt.cnMaxInflight DOIF aRxQoS2PacketIds[i] = uiPacketId THENM_RxQoS2Find := i;RETURN;END_IFEND_FORM_RxQoS2Find := 0;
完整代码 19:M_RxQoS2Remove.st
这一段完整公开 M_RxQoS2Remove.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_RxQoS2Remove/// 功能 : 删除入站 QoS2 去重记录/// 说明 : 在收到对应 PUBREL 后释放记录/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_RxQoS2Remove : BOOLVAR_INPUTuiPacketId : UINT; // 需要从入站 QoS2 去重表中移除的 Packet IdentifierEND_VARVARuiIndex : UINT; // 命中的入站 QoS2 去重表槽位索引END_VAR// === IMPLEMENTATION ===// 先找到对应 Packet Identifier 的去重记录;没有就说明当前无需释放。uiIndex := M_RxQoS2Find(uiPacketId := uiPacketId);IF uiIndex = 0 THENM_RxQoS2Remove := FALSE;RETURN;END_IF// PUBREL 完成后,这条入站 QoS2 消息的去重上下文就可以释放。aRxQoS2PacketIds[uiIndex] := 0;IF uiRxInFlightQosCount > 0 THENuiRxInFlightQosCount := uiRxInFlightQosCount - 1;END_IFM_RxQoS2Remove := TRUE;
完整代码 20:M_TopicAliasClear.st
这一段完整公开 M_TopicAliasClear.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_TopicAliasClear/// 功能 : 清空主题别名表/// 说明 : 在新连接建立时清理旧连接的 Topic Alias 映射/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_TopicAliasClear : BOOLVARi : UINT; // 循环索引END_VAR// === IMPLEMENTATION ===// Topic Alias 属于单连接上下文;新连接建立前必须把旧映射全部清掉。FOR i := 1 TO GVL_Mqtt.cnMaxTopicAlias DOaTopicAlias[i].bUsed := FALSE;aTopicAlias[i].uiAliasId := 0;aTopicAlias[i].sTopic := '';END_FORuiTopicAliasCount := 0;uiNextTopicAlias := 1;M_TopicAliasClear := TRUE;
完整代码 21:M_TopicAliasLookup.st
这一段完整公开 M_TopicAliasLookup.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_TopicAliasLookup/// 功能 : 查找主题别名对应的主题/// 说明 : 返回主题字符串,未找到时返回空字符串/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_TopicAliasLookup : STRING(GVL_Mqtt.cnMaxTopicLen)VAR_INPUTuiAliasId : UINT; // 主题别名END_VARVARi : UINT; // 循环索引END_VAR// === IMPLEMENTATION ===// Topic Alias 0 无效,直接返回空字符串。IF uiAliasId = 0 THENM_TopicAliasLookup := '';RETURN;END_IF// 在本地别名表里按编号查找,命中后返回缓存的原始主题。FOR i := 1 TO GVL_Mqtt.cnMaxTopicAlias DOIF aTopicAlias[i].bUsed AND aTopicAlias[i].uiAliasId = uiAliasId THENM_TopicAliasLookup := aTopicAlias[i].sTopic;RETURN;END_IFEND_FORM_TopicAliasLookup := '';
完整代码 22:M_TopicAliasRegister.st
这一段完整公开 M_TopicAliasRegister.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_TopicAliasRegister/// 功能 : 注册主题别名映射/// 说明 : 保存接收方向 Topic Alias 与主题的映射关系/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_TopicAliasRegister : BOOLVAR_INPUTuiAliasId : UINT; // 主题别名sTopic : STRING(GVL_Mqtt.cnMaxTopicLen); // 主题名称END_VARVARi : UINT; // 循环索引END_VAR// === IMPLEMENTATION ===// Topic Alias 0 在 MQTT 5.0 中非法,直接拒绝。IF uiAliasId = 0 THENM_TopicAliasRegister := FALSE;RETURN;END_IF// 本地别名表也受固定容量限制,超过后不再继续登记。IF uiAliasId > GVL_Mqtt.cnMaxTopicAlias THENM_TopicAliasRegister := FALSE;RETURN;END_IF// 如果别名已存在,则按协议语义覆盖为最新主题映射。FOR i := 1 TO GVL_Mqtt.cnMaxTopicAlias DOIF aTopicAlias[i].bUsed AND aTopicAlias[i].uiAliasId = uiAliasId THENaTopicAlias[i].sTopic := sTopic;M_TopicAliasRegister := TRUE;RETURN;END_IFEND_FOR// 否则找空槽位插入一条新映射,供后续“仅带 Topic Alias 不带 Topic Name”的报文复原主题。FOR i := 1 TO GVL_Mqtt.cnMaxTopicAlias DOIF NOT aTopicAlias[i].bUsed THENaTopicAlias[i].bUsed := TRUE;aTopicAlias[i].uiAliasId := uiAliasId;aTopicAlias[i].sTopic := sTopic;uiTopicAliasCount := uiTopicAliasCount + 1;M_TopicAliasRegister := TRUE;RETURN;END_IFEND_FORM_TopicAliasRegister := FALSE;
完整代码 23:M_RecordReceivedMessage.st
这一段完整公开 M_RecordReceivedMessage.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : M_RecordReceivedMessage/// 功能 : 记录最新接收消息/// 说明 : 统一维护接收历史、单次事件标志和统计量,避免多处分支重复写入。/// 编程人员 : ControlRookie/// 时间 : 2026-05-07/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}METHOD M_RecordReceivedMessage : BOOLVARj : DINT; // 历史数组循环索引END_VAR// === IMPLEMENTATION ===FOR j := UPPER_BOUND(aRecTopicList, 1) TO LOWER_BOUND(aRecTopicList, 1) + 1 BY -1 DOaRecTopicList[j] := aRecTopicList[j - 1];aRecPayloadList[j] := aRecPayloadList[j - 1];END_FORaRecTopicList[0] := sRecTopic;aRecPayloadList[0] := sRecPayload;bMessageReceived := TRUE;uiMessagesReceived := uiMessagesReceived + 1;dtLastMessageTime := ULINT_TO_DT(uliSysTime / 1000);M_RecordReceivedMessage := TRUE;
完整代码 24:FB_MqttPropertyCodec.st
这一段完整公开 FB_MqttPropertyCodec.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : FB_MqttPropertyCodec/// 功能 : MQTT 5.0 属性编解码器/// 说明 : 提供属性的编码(写入缓冲区)和解码(从缓冲区读取)功能,/// 供 CONNECT、CONNACK、PUBLISH、SUBSCRIBE 等报文复用/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}FUNCTION_BLOCK FB_MqttPropertyCodecVAR// 编码缓冲区(由 Encode 方法写入 MQTT 5.0 属性时复用)aPropBuf : ARRAY[0..511] OF BYTE; // MQTT 5.0 属性编码临时缓冲区[byte]uiPropPos : UINT; // 当前属性缓冲区写入位置索引[byte]END_VAR// === IMPLEMENTATION ===// 本 FB 自身不直接执行业务逻辑,// 真实的属性读写行为全部由其下属方法承担。
完整代码 25:FB_Random.st
这一段完整公开 FB_Random.st。读代码时先看对象职责,再看状态、长度、错误和返回值,不要只抄几行赋值。
/// =======================================================================/// 名称 : FB_Random/// 功能 : 简单伪随机数发生器/// 说明 : 根据 3 个输入种子生成 ASCII 可打印区间内的伪随机值,/// 主要用于示例或轻量场景,不作为密码学随机源/// 编程人员 : ControlRookie/// 时间 : 2026-05-08/// 版本 : V2.0/// ======================================================================={attribute 'hide_all_locals'}FUNCTION_BLOCK FB_RandomVAR_INPUTdwSeedA : DWORD; // 输入种子 AdwSeedB : DWORD; // 输入种子 BdwSeedV : DWORD; // 输入种子 VEND_VARVAR_OUTPUTdwRandom : DWORD; // 输出的伪随机 ASCII 码值END_VARVARdwModulus : DWORD; // 取模基数dwSequence : DWORD; // 内部序列值END_VAR// === IMPLEMENTATION ===// 用一个很轻量的递增序列扰动输入种子,产出 A..Y 区间的可打印 ASCII 值。dwSequence := dwSequence + 2;dwModulus := 25;dwRandom := (dwSeedA + dwSeedV + dwSeedB + dwSequence) MOD dwModulus + 65;
这一篇你最该记住的几句话
源码加更不是片段展示,而是完整源码对象公开讲解。 先建立对象地图,再读状态、报文和事务,现场调试才不会迷路。 判断源码成熟度,不只看功能是否实现,还要看边界、错误和在线观测量是否闭环。
系列导航
系列定位:MqttClient 系列教程,源码加更阶段,第 16 篇 / 共 16 篇 上一篇:源码加更05 下一篇:系列收官,无下一篇
夜雨聆风