diff --git a/aqhome/mqtt/endpoint_mqttc.c b/aqhome/mqtt/endpoint_mqttc.c index 5a3b7fb..a2dd7c8 100644 --- a/aqhome/mqtt/endpoint_mqttc.c +++ b/aqhome/mqtt/endpoint_mqttc.c @@ -474,6 +474,7 @@ int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg) if (remainingBytesInBuffer>1) { const uint8_t *ptr; int idx; + int shift=0; ptr=GWEN_Msg_GetConstBuffer(msg); idx=1; @@ -482,8 +483,7 @@ int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg) uint8_t len; len=ptr[idx]; - mqttRemainingLength<<=8; - mqttRemainingLength+=(len & 0x7f); + mqttRemainingLength+=(len & 0x7f)< +#include + // maximum remaining length: fff ffff @@ -205,3 +207,53 @@ int AQH_MqttMsg_DumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf) +char *AQH_MqttMsg_ExtractStringAt(const uint8_t *ptr, uint32_t len) +{ + if (len>1) { + int slen; + + slen=(ptr[0]<<8)+ptr[1]; + if (slen) { + char *result; + + if (slen>(len-2)) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", + (unsigned long int) slen, (unsigned long int) len); + return NULL; + } + + result=(char*) malloc(slen+1); + if (result==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error on malloc"); + return NULL; + } + memmove(result, ptr+2, slen); + result[slen]=0; + return result; + } + } + return NULL; +} + + + +int AQH_MqttMsg_SkipStringAt(const uint8_t *ptr, uint32_t len) +{ + if (len>1) { + int slen; + + slen=(ptr[0]<<8)+ptr[1]; + if (slen) { + if (slen>(len-2)) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", + (unsigned long int) slen, (unsigned long int) len); + return GWEN_ERROR_BAD_DATA; + } + } + return slen+2; + } + return GWEN_ERROR_BAD_DATA; +} + + + diff --git a/aqhome/mqtt/msg_mqtt.h b/aqhome/mqtt/msg_mqtt.h index e1faa27..45ed922 100644 --- a/aqhome/mqtt/msg_mqtt.h +++ b/aqhome/mqtt/msg_mqtt.h @@ -61,6 +61,9 @@ AQHOME_API uint8_t AQH_MqttMsg_GetMsgTypeAndFlags(const GWEN_MSG *msg); AQHOME_API void AQH_MqttMsg_AppendStringWithLen(GWEN_BUFFER *buf, const char *s); AQHOME_API int AQH_MqttMsg_DumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf); +AQHOME_API int AQH_MqttMsg_SkipStringAt(const uint8_t *ptr, uint32_t len); +AQHOME_API char *AQH_MqttMsg_ExtractStringAt(const uint8_t *ptr, uint32_t len); + #endif diff --git a/aqhome/mqtt/msg_mqtt_publish.c b/aqhome/mqtt/msg_mqtt_publish.c index 35e35c5..28afac7 100644 --- a/aqhome/mqtt/msg_mqtt_publish.c +++ b/aqhome/mqtt/msg_mqtt_publish.c @@ -35,9 +35,8 @@ GWEN_MSG *GWEN_PublishMqttMsg_new(uint8_t flags, uint16_t packetId, const char * GWEN_Buffer_AppendByte(buf, (packetId>>8) & 0xff); GWEN_Buffer_AppendByte(buf, packetId & 0xff); } - GWEN_Buffer_AppendByte(buf, (messageLen>>8) & 0xff); - GWEN_Buffer_AppendByte(buf, messageLen & 0xff); - GWEN_Buffer_AppendBytes(buf, (const char*) messagePtr, messageLen); + if (messagePtr && messageLen) + GWEN_Buffer_AppendBytes(buf, (const char*) messagePtr, messageLen); msg=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PUBLISH | flags, GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); GWEN_Buffer_free(buf); @@ -86,6 +85,90 @@ void AQH_PublishMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, con +char *AQH_PublishMqttMsg_ExtractTopic(const GWEN_MSG *msg) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + + if (msgLen>1) { + uint32_t payloadLen; + const uint8_t *payloadPtr; + char *result; + + payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); + payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); + + result=AQH_MqttMsg_ExtractStringAt(payloadPtr, payloadLen); + if (result==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "here"); + return NULL; + } + return result; + } + + return NULL; +} + + + +char *AQH_PublishMqttMsg_ExtractValue(const GWEN_MSG *msg) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + + if (msgLen>1) { + uint8_t flags; + uint32_t payloadLen; + const uint8_t *payloadPtr; + int rv; + + flags=msgPtr[0] & 0x0f; + payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); + payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); + + rv=AQH_MqttMsg_SkipStringAt(payloadPtr, payloadLen); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return NULL; + } + payloadLen-=rv; + payloadPtr+=rv; + + if (flags & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + if (payloadLen<2) { + DBG_ERROR(AQH_LOGDOMAIN, "Msg too small"); + return NULL; + } + else { + payloadLen-=2; + payloadPtr+=2; + } + } + + if (payloadLen) { + char *result; + + result=(char*) malloc(payloadLen+1); + memmove(result, payloadPtr, payloadLen); + result[payloadLen]=0; + return result; + } + else { + DBG_INFO(AQH_LOGDOMAIN, "No message"); + } + } + + return NULL; +} + + + int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, GWEN_BUFFER *dbuf) { int rv; @@ -118,14 +201,10 @@ int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, } } - GWEN_Buffer_AppendString(dbuf, " message: "); - rv=AQH_MqttMsg_DumpString(payloadPtr, payloadLen, dbuf); - if (rv<0) { - DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); - return rv; + if (payloadLen) { + GWEN_Buffer_AppendString(dbuf, " message: "); + GWEN_Buffer_AppendBytes(dbuf, (const char*) payloadPtr, payloadLen); } - payloadLen-=rv; - payloadPtr+=rv; return 0; } diff --git a/aqhome/mqtt/msg_mqtt_publish.h b/aqhome/mqtt/msg_mqtt_publish.h index fddfcef..14c5328 100644 --- a/aqhome/mqtt/msg_mqtt_publish.h +++ b/aqhome/mqtt/msg_mqtt_publish.h @@ -24,6 +24,9 @@ AQHOME_API GWEN_MSG *GWEN_PublishMqttMsg_new(uint8_t flags, uint16_t packetId, AQHOME_API void AQH_PublishMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); +AQHOME_API char *AQH_PublishMqttMsg_ExtractTopic(const GWEN_MSG *msg); +AQHOME_API char *AQH_PublishMqttMsg_ExtractValue(const GWEN_MSG *msg); +