/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/mqtt/msg_mqtt.h" #include #include #include // maximum remaining length: fff ffff static void _flagsToDb(uint8_t flags, GWEN_DB_NODE *dbDest, const char *varNameFlags, const char *varNameQos); GWEN_MSG *GWEN_MqttMsg_new(uint8_t typeAndFlags, uint32_t payloadLen, const uint8_t *payload) { if (payloadLen>0xfffffffu) { DBG_ERROR(AQH_LOGDOMAIN, "Too many bytes in payload, can't encode into MQTT message"); return NULL; } else { GWEN_MSG *msg; uint8_t *ptr; uint32_t i; uint32_t len; msg=GWEN_Msg_new(payloadLen+1+4); ptr=GWEN_Msg_GetBuffer(msg); *(ptr++)=typeAndFlags; len=payloadLen; i=0; do { uint8_t b; b=len & 0x7f; len>>=7; if (len) b|=0x80; *(ptr++)=b; } while(len && i<4); if (payloadLen) { GWEN_Msg_SetParsedPayloadSize(msg, payloadLen); GWEN_Msg_SetParsedPayloadOffset(msg, ptr-GWEN_Msg_GetBuffer(msg)); memmove(ptr, payload, payloadLen); ptr+=payloadLen; } i=ptr-GWEN_Msg_GetBuffer(msg); GWEN_Msg_SetBytesInBuffer(msg, i); return msg; } } int AQH_MqttMsg_IsMsgComplete(const GWEN_MSG *msg) { if (msg && (GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET)) { uint32_t msgLength; msgLength=GWEN_Msg_GetParsedPayloadOffset(msg)+GWEN_Msg_GetParsedPayloadSize(msg); if (GWEN_Msg_GetBytesInBuffer(msg)>=msgLength) return 1; } return 0; } void AQH_MqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) { GWEN_Text_DumpString2Buffer((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), dbuf, 2); } int AQH_MqttMsg_toDb(const GWEN_MSG *msg, GWEN_DB_NODE *dbDest) { const uint8_t *msgPtr; uint32_t msgLen; uint8_t b; const char *s; msgPtr=GWEN_Msg_GetConstBuffer(msg); msgLen=GWEN_Msg_GetBytesInBuffer(msg); if (msgLen<2) { DBG_ERROR(AQH_LOGDOMAIN, "Message too small (%d bytes)", msgLen); return GWEN_ERROR_BAD_DATA; } b=msgPtr[0]; s=AQH_MqttMsg_MsgTypeToString(b); GWEN_DB_SetCharValue(dbDest, GWEN_DB_FLAGS_OVERWRITE_VARS, "cmdString", s); GWEN_DB_SetIntValue(dbDest, GWEN_DB_FLAGS_OVERWRITE_VARS, "cmdAndFlags", b); _flagsToDb(b & 0x0f, dbDest, "flags", "qos"); return 0; } const char *AQH_MqttMsg_MsgTypeToString(uint8_t t) { switch(t & 0xf0) { case (AQH_MQTTMSG_MSGTYPE_CONNECT & 0xf0): return "CONNECT"; case (AQH_MQTTMSG_MSGTYPE_CONNACK & 0xf0): return "CONACK"; case (AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0): return "PUBLISH"; case (AQH_MQTTMSG_MSGTYPE_PUBACK & 0xf0): return "PUBACK"; case (AQH_MQTTMSG_MSGTYPE_PUBREC & 0xf0): return "PUBREC"; case (AQH_MQTTMSG_MSGTYPE_PUBREL & 0xf0): return "PUBREL"; case (AQH_MQTTMSG_MSGTYPE_PUBCOMP & 0xf0): return "PUBCOMP"; case (AQH_MQTTMSG_MSGTYPE_SUBSCRIBE & 0xf0): return "SUBSCRIBE"; case (AQH_MQTTMSG_MSGTYPE_SUBACK & 0xf0): return "SUBACK"; case (AQH_MQTTMSG_MSGTYPE_UNSUBSCRIBE & 0xf0): return "UNSUBSCRIBE"; case (AQH_MQTTMSG_MSGTYPE_UNSUBACK & 0xf0): return "UNSUBACK"; case (AQH_MQTTMSG_MSGTYPE_PINGREQ & 0xf0): return "PINGREQ"; case (AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0): return "PINGRESP"; case (AQH_MQTTMSG_MSGTYPE_DISCONNECT & 0xf0): return "DISCONNECT"; default: return "(unknown)"; } } uint8_t AQH_MqttMsg_GetMsgTypeAndFlags(const GWEN_MSG *msg) { uint32_t msgLen; msgLen=GWEN_Msg_GetBytesInBuffer(msg); if (msgLen>0) { const uint8_t *msgPtr; msgPtr=GWEN_Msg_GetConstBuffer(msg); return msgPtr[0]; } return 0; } void _flagsToDb(uint8_t flags, GWEN_DB_NODE *dbDest, const char *varNameFlags, const char *varNameQos) { GWEN_DB_DeleteVar(dbDest, varNameFlags); GWEN_DB_DeleteVar(dbDest, varNameQos); if (flags & AQH_MQTTMSG_FLAGS_DUP) GWEN_DB_SetCharValue(dbDest, GWEN_DB_FLAGS_DEFAULT, varNameFlags, "dup"); if (flags & AQH_MQTTMSG_FLAGS_RETAIN) GWEN_DB_SetCharValue(dbDest, GWEN_DB_FLAGS_DEFAULT, varNameFlags, "retain"); GWEN_DB_SetIntValue(dbDest, GWEN_DB_FLAGS_DEFAULT, varNameQos, (flags>>1) & 0x3); } /* helper functions */ void AQH_MqttMsg_AppendStringWithLen(GWEN_BUFFER *buf, const char *s) { unsigned int len; len=strlen(s); GWEN_Buffer_AppendByte(buf, (len>>8) & 0xff); GWEN_Buffer_AppendByte(buf, len & 0xff); if (s && *s) GWEN_Buffer_AppendString(buf, s); } int AQH_MqttMsg_DumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf) { if (len>1) { int slen; slen=(ptr[0]<<8)+ptr[1]; if (slen) { if (slen>(len-2)) { DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", (unsigned long int) slen, (unsigned long int) len); return GWEN_ERROR_BAD_DATA; } GWEN_Buffer_AppendBytes(buf, (const char*) ptr+2, slen); } return slen+2; } return GWEN_ERROR_BAD_DATA; } char *AQH_MqttMsg_ExtractStringAt(const uint8_t *ptr, uint32_t len) { if (len>1) { int slen; slen=(ptr[0]<<8)+ptr[1]; if (slen) { char *result; if (slen>(len-2)) { DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", (unsigned long int) slen, (unsigned long int) len); return NULL; } result=(char*) malloc(slen+1); if (result==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Error on malloc"); return NULL; } memmove(result, ptr+2, slen); result[slen]=0; return result; } } return NULL; } int AQH_MqttMsg_SkipStringAt(const uint8_t *ptr, uint32_t len) { if (len>1) { int slen; slen=(ptr[0]<<8)+ptr[1]; if (slen) { if (slen>(len-2)) { DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", (unsigned long int) slen, (unsigned long int) len); return GWEN_ERROR_BAD_DATA; } } return slen+2; } return GWEN_ERROR_BAD_DATA; }