From 40c3a3ee4e0481bc29c2719390f871184027b299 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Wed, 29 Mar 2023 00:42:49 +0200 Subject: [PATCH] mqtt: added more message types, added test for them. --- aqhome/libtest.c | 56 +++++++++- aqhome/mqtt/0BUILD | 6 + aqhome/mqtt/msg_mqtt.c | 20 ++++ aqhome/mqtt/msg_mqtt.h | 2 + aqhome/mqtt/msg_mqtt_connack.c | 62 +++++++++++ aqhome/mqtt/msg_mqtt_connack.h | 41 +++++++ aqhome/mqtt/msg_mqtt_publish.c | 170 +++++++++++++++++++++++++++++ aqhome/mqtt/msg_mqtt_publish.h | 30 +++++ aqhome/mqtt/msg_mqtt_pubresponse.c | 62 +++++++++++ aqhome/mqtt/msg_mqtt_pubresponse.h | 30 +++++ 10 files changed, 477 insertions(+), 2 deletions(-) create mode 100644 aqhome/mqtt/msg_mqtt_connack.c create mode 100644 aqhome/mqtt/msg_mqtt_connack.h create mode 100644 aqhome/mqtt/msg_mqtt_publish.c create mode 100644 aqhome/mqtt/msg_mqtt_publish.h create mode 100644 aqhome/mqtt/msg_mqtt_pubresponse.c create mode 100644 aqhome/mqtt/msg_mqtt_pubresponse.h diff --git a/aqhome/libtest.c b/aqhome/libtest.c index 96c4cd9..ecb8154 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -11,6 +11,9 @@ #include "aqhome/ipc/endpoint_node_ipc_tcp.h" #include "aqhome/mqtt/endpoint_mqttc.h" #include "aqhome/mqtt/msg_mqtt_connect.h" +#include "aqhome/mqtt/msg_mqtt_connack.h" +#include "aqhome/mqtt/msg_mqtt_publish.h" +#include "aqhome/mqtt/msg_mqtt_pubresponse.h" #include "aqhome/msgmanager.h" #include "aqhome/aqhome.h" @@ -192,6 +195,7 @@ int testMqttConnection() return 2; } + fprintf(stdout, "Sending CONNECT\n"); msgOut=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, 10, "CLIENTID123", NULL, NULL); if (msgOut==NULL) { DBG_ERROR(NULL, "Error creating message"); @@ -199,6 +203,7 @@ int testMqttConnection() } GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); + fprintf(stdout, "Waiting for response\n"); for (;;) { GWEN_MSG *msg; @@ -207,12 +212,59 @@ int testMqttConnection() GWEN_MsgEndpointMgr_IoLoopOnce(emgr); msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); if (msg) { - DBG_ERROR(NULL, "Received this message:"); - GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + if (AQH_MqttMsg_GetMsgTypeAndFlags(msg)==AQH_MQTTMSG_MSGTYPE_CONNACK) { + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_ConnAckMqttMsg_DumpToBuffer(msg, buf, "received"); + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + } + else { + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + } GWEN_Msg_free(msg); break; } } + + fprintf(stdout, "Sending PUBLISH\n"); + //msgOut=GWEN_PublishMqttMsg_new(AQH_MQTTMSG_FLAGS_QOS1, 1, "test/subject1", (const uint8_t*) "29.9", 4); + msgOut=GWEN_PublishMqttMsg_new(0, 0, "test/subject1", (const uint8_t*) "29.9", 4); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return 2; + } + GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); + + fprintf(stdout, "Waiting for response\n"); + for (;;) { + GWEN_MSG *msg; + + DBG_DEBUG(AQH_LOGDOMAIN, "Next loop"); + //GWEN_MsgManager_LoopOnce(emgr); + GWEN_MsgEndpointMgr_IoLoopOnce(emgr); + msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); + if (msg) { + if (AQH_MqttMsg_GetMsgTypeAndFlags(msg)==AQH_MQTTMSG_MSGTYPE_PUBACK) { + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_PubResponseMqttMsg_DumpToBuffer(msg, buf, "received"); + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + } + else { + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + } + GWEN_Msg_free(msg); + break; + } + } + + return 0; } diff --git a/aqhome/mqtt/0BUILD b/aqhome/mqtt/0BUILD index 123680b..e89f119 100644 --- a/aqhome/mqtt/0BUILD +++ b/aqhome/mqtt/0BUILD @@ -48,6 +48,9 @@ endpoint_mqttc.h msg_mqtt.h msg_mqtt_connect.h + msg_mqtt_connack.h + msg_mqtt_publish.h + msg_mqtt_pubresponse.h @@ -61,6 +64,9 @@ endpoint_mqttc.c msg_mqtt.c msg_mqtt_connect.c + msg_mqtt_connack.c + msg_mqtt_publish.c + msg_mqtt_pubresponse.c diff --git a/aqhome/mqtt/msg_mqtt.c b/aqhome/mqtt/msg_mqtt.c index 7022cea..daa8fcb 100644 --- a/aqhome/mqtt/msg_mqtt.c +++ b/aqhome/mqtt/msg_mqtt.c @@ -133,6 +133,26 @@ const char *AQH_MqttMsg_MsgTypeToString(uint8_t t) +uint8_t AQH_MqttMsg_GetMsgTypeAndFlags(const GWEN_MSG *msg) +{ + uint32_t msgLen; + + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + if (msgLen>0) { + const uint8_t *msgPtr; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + return msgPtr[0]; + } + + return 0; +} + + + + + + void _flagsToDb(uint8_t flags, GWEN_DB_NODE *dbDest, const char *varNameFlags, const char *varNameQos) { GWEN_DB_DeleteVar(dbDest, varNameFlags); diff --git a/aqhome/mqtt/msg_mqtt.h b/aqhome/mqtt/msg_mqtt.h index bacd63d..81ba619 100644 --- a/aqhome/mqtt/msg_mqtt.h +++ b/aqhome/mqtt/msg_mqtt.h @@ -54,6 +54,8 @@ AQHOME_API void AQH_MqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, AQHOME_API const char *AQH_MqttMsg_MsgTypeToString(uint8_t t); +AQHOME_API uint8_t AQH_MqttMsg_GetMsgTypeAndFlags(const GWEN_MSG *msg); + #endif diff --git a/aqhome/mqtt/msg_mqtt_connack.c b/aqhome/mqtt/msg_mqtt_connack.c new file mode 100644 index 0000000..e19e2bd --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_connack.c @@ -0,0 +1,62 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/mqtt/msg_mqtt_connack.h" + +#include + + + +GWEN_MSG *GWEN_ConnAckMqttMsg_new(uint8_t flags, uint8_t result) +{ + uint8_t data[2]; + GWEN_MSG *msg; + + data[0]=flags; + data[1]=result; + + msg=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_CONNACK, 2, data); + if (msg==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "here"); + return NULL; + } + return msg; +} + + + +void AQH_ConnAckMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + + if (msgLen>1) { + const uint8_t *payloadPtr; + uint32_t payloadLen; + + GWEN_Buffer_AppendArgs(dbuf, "%s %s", AQH_MqttMsg_MsgTypeToString(msgPtr[0] & 0xf0), sText); + payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); + payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); + if (payloadLen>=2) + GWEN_Buffer_AppendArgs(dbuf, "(flags=%02x, result=%d)", payloadPtr[0], payloadPtr[1]); + } +} + + + + + + diff --git a/aqhome/mqtt/msg_mqtt_connack.h b/aqhome/mqtt/msg_mqtt_connack.h new file mode 100644 index 0000000..7f4858c --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_connack.h @@ -0,0 +1,41 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_MQTT_CONNACK_H +#define AQH_MSG_MQTT_CONNACK_H + + +#include +#include + +#include +#include + + + +#define AQH_MQTTMSG_CONNACK_FLAGS_HAVE_SESSION 0x01u + +#define AQH_MQTTMSG_CONNACK_RESULT_ACCEPTED 0x00u +#define AQH_MQTTMSG_CONNACK_RESULT_BAD_PROTO 0x01u +#define AQH_MQTTMSG_CONNACK_RESULT_BAD_CLIENTID 0x02u +#define AQH_MQTTMSG_CONNACK_RESULT_UNAVAILABLE 0x03u +#define AQH_MQTTMSG_CONNACK_RESULT_BAD_CREDENTIALS 0x04u +#define AQH_MQTTMSG_CONNACK_RESULT_UNAUTHORIZED 0x05u + + + + + +AQHOME_API GWEN_MSG *GWEN_ConnAckMqttMsg_new(uint8_t flags, uint8_t result); + +AQHOME_API void AQH_ConnAckMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); + + + + +#endif diff --git a/aqhome/mqtt/msg_mqtt_publish.c b/aqhome/mqtt/msg_mqtt_publish.c new file mode 100644 index 0000000..aa6d66d --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_publish.c @@ -0,0 +1,170 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/mqtt/msg_mqtt_publish.h" + +#include + + + +static int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, GWEN_BUFFER *dbuf); +static void _appendStringWithLen(GWEN_BUFFER *buf, const char *s); +static int _dumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf); + + + + + +GWEN_MSG *GWEN_PublishMqttMsg_new(uint8_t flags, uint16_t packetId, const char *sTopic, const uint8_t *messagePtr, uint32_t messageLen) +{ + if (sTopic && *sTopic && messagePtr && messageLen) { + GWEN_MSG *msg; + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + _appendStringWithLen(buf, sTopic); + if (flags & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + GWEN_Buffer_AppendByte(buf, (packetId>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, packetId & 0xff); + } + GWEN_Buffer_AppendByte(buf, (messageLen>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, messageLen & 0xff); + GWEN_Buffer_AppendBytes(buf, (const char*) messagePtr, messageLen); + + msg=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PUBLISH | flags, GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + if (msg==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "here"); + return NULL; + } + return msg; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Missing topic or message"); + return NULL; + } +} + + + +void AQH_PublishMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + + if (msgLen>1) { + uint32_t payloadLen; + const uint8_t *payloadPtr; + int rv; + + payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); + payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); + + GWEN_Buffer_AppendArgs(dbuf, "%s %s", AQH_MqttMsg_MsgTypeToString(msgPtr[0] & 0xf0), sText); + + GWEN_Buffer_AppendString(dbuf, "("); + rv=_dumpPayload((msgPtr[0] & 0x0f), payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + } + else { + GWEN_Buffer_AppendString(dbuf, ")"); + } + } +} + + + +int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, GWEN_BUFFER *dbuf) +{ + int rv; + + GWEN_Buffer_AppendString(dbuf, "flags:"); + if (flags & AQH_MQTTMSG_FLAGS_DUP) + GWEN_Buffer_AppendString(dbuf, " DUP"); + if (flags & AQH_MQTTMSG_FLAGS_RETAIN) + GWEN_Buffer_AppendString(dbuf, " RETAIN"); + GWEN_Buffer_AppendArgs(dbuf, " QOS%d", (flags>>1) & 0x03); + + GWEN_Buffer_AppendString(dbuf, " topic: "); + rv=_dumpString(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + payloadLen-=rv; + payloadPtr+=rv; + + if (flags & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + if (payloadLen<2) { + DBG_ERROR(AQH_LOGDOMAIN, "Msg too small"); + return GWEN_ERROR_BAD_DATA; + } + else { + GWEN_Buffer_AppendArgs(dbuf, " packet id: %d", (payloadPtr[0]<<8)+payloadPtr[1]); + payloadLen-=2; + payloadPtr+=2; + } + } + + GWEN_Buffer_AppendString(dbuf, " message: "); + rv=_dumpString(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + payloadLen-=rv; + payloadPtr+=rv; + + return 0; +} + + + +void _appendStringWithLen(GWEN_BUFFER *buf, const char *s) +{ + unsigned int len; + + len=strlen(s); + GWEN_Buffer_AppendByte(buf, (len>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, len & 0xff); + if (s && *s) + GWEN_Buffer_AppendString(buf, s); +} + + + +int _dumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf) +{ + if (len>1) { + int slen; + + slen=(ptr[0]<<8)+ptr[1]; + if (slen) { + if (slen>(len-2)) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", + (unsigned long int) slen, (unsigned long int) len); + return GWEN_ERROR_BAD_DATA; + } + GWEN_Buffer_AppendBytes(buf, (const char*) ptr+2, slen); + } + return slen+2; + } + return GWEN_ERROR_BAD_DATA; +} + + + diff --git a/aqhome/mqtt/msg_mqtt_publish.h b/aqhome/mqtt/msg_mqtt_publish.h new file mode 100644 index 0000000..fddfcef --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_publish.h @@ -0,0 +1,30 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_MQTT_PUBLISH_H +#define AQH_MSG_MQTT_PUBLISH_H + + +#include +#include + +#include +#include + + + +AQHOME_API GWEN_MSG *GWEN_PublishMqttMsg_new(uint8_t flags, uint16_t packetId, + const char *sTopic, + const uint8_t *messagePtr, uint32_t messageLen); + +AQHOME_API void AQH_PublishMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); + + + + +#endif diff --git a/aqhome/mqtt/msg_mqtt_pubresponse.c b/aqhome/mqtt/msg_mqtt_pubresponse.c new file mode 100644 index 0000000..3b7129c --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_pubresponse.c @@ -0,0 +1,62 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/mqtt/msg_mqtt_pubresponse.h" + +#include + + + +GWEN_MSG *GWEN_PubResponseMqttMsg_new(uint8_t typeAndFlags, uint16_t pkgId) +{ + uint8_t data[2]; + GWEN_MSG *msg; + + data[0]=pkgId>>8 & 0xff; + data[1]=pkgId & 0xff; + + msg=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_CONNACK, 2, data); + if (msg==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "here"); + return NULL; + } + return msg; +} + + + +void AQH_PubResponseMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + + if (msgLen>1) { + const uint8_t *payloadPtr; + uint32_t payloadLen; + + GWEN_Buffer_AppendArgs(dbuf, "%s %s", AQH_MqttMsg_MsgTypeToString(msgPtr[0] & 0xf0), sText); + payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); + payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); + if (payloadLen>=2) + GWEN_Buffer_AppendArgs(dbuf, "(packet id=%04x)", (payloadPtr[0]<<8)+payloadPtr[1]); + } +} + + + + + + diff --git a/aqhome/mqtt/msg_mqtt_pubresponse.h b/aqhome/mqtt/msg_mqtt_pubresponse.h new file mode 100644 index 0000000..597b3ea --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_pubresponse.h @@ -0,0 +1,30 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_MQTT_PUBRESPONSE_H +#define AQH_MSG_MQTT_PUBRESPONSE_H + + +#include +#include + +#include +#include + + + +/** + * Use for PUBACK, PUBREC, PUBREL and PUBCOMP. + */ +AQHOME_API GWEN_MSG *GWEN_PubResponseMqttMsg_new(uint8_t typeAndFlags, uint16_t pkgId); +AQHOME_API void AQH_PubResponseMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); + + + + +#endif