From 1751170940c8a0db6dca7afca663fdabc8f7754f Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Sun, 14 May 2023 22:24:55 +0200 Subject: [PATCH] aqhome/mqtt: added messages regarding subscription. --- aqhome/libtest.c | 170 +++++++++++++++++++++++++------ aqhome/mqtt/0BUILD | 4 + aqhome/mqtt/endpoint_mqttc.c | 29 +++--- aqhome/mqtt/msg_mqtt.c | 34 +++++++ aqhome/mqtt/msg_mqtt.h | 5 + aqhome/mqtt/msg_mqtt_publish.c | 42 +------- aqhome/mqtt/msg_mqtt_suback.c | 68 +++++++++++++ aqhome/mqtt/msg_mqtt_suback.h | 27 +++++ aqhome/mqtt/msg_mqtt_subscribe.c | 127 +++++++++++++++++++++++ aqhome/mqtt/msg_mqtt_subscribe.h | 27 +++++ 10 files changed, 444 insertions(+), 89 deletions(-) create mode 100644 aqhome/mqtt/msg_mqtt_suback.c create mode 100644 aqhome/mqtt/msg_mqtt_suback.h create mode 100644 aqhome/mqtt/msg_mqtt_subscribe.c create mode 100644 aqhome/mqtt/msg_mqtt_subscribe.h diff --git a/aqhome/libtest.c b/aqhome/libtest.c index 0163cb8..3cac040 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -18,6 +18,8 @@ #include "aqhome/mqtt/msg_mqtt_connack.h" #include "aqhome/mqtt/msg_mqtt_publish.h" #include "aqhome/mqtt/msg_mqtt_pubresponse.h" +#include "aqhome/mqtt/msg_mqtt_subscribe.h" +#include "aqhome/mqtt/msg_mqtt_suback.h" #include "aqhome/msgmanager.h" #include "aqhome/hexfile/hexfile.h" #include "aqhome/hexfile/flashrecord.h" @@ -30,11 +32,16 @@ #include #include #include +#include #include #include +static int _mqttConnect(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp); +static GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType); + + GWEN_MSG *createPingMsg(uint8_t destAddr, uint8_t srcAddr) { @@ -186,7 +193,6 @@ int testMqttConnection() if (rv<0) { } - //emgr=AQH_MsgManager_new(0xc0); emgr=GWEN_MsgEndpointMgr_new(); epTcp=AQH_MqttClientEndpoint_new("127.0.0.1", 1883, "MQTTClient", 0); @@ -196,39 +202,11 @@ int testMqttConnection() } GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp); - fprintf(stdout, "Sending CONNECT\n"); - msgOut=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, 10, "CLIENTID123", NULL, NULL); - if (msgOut==NULL) { - DBG_ERROR(NULL, "Error creating message"); + rv=_mqttConnect(emgr, epTcp); + if (rv!=0) { + DBG_ERROR(NULL, "Error connecting"); return 2; } - GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); - - fprintf(stdout, "Waiting for response\n"); - for (;;) { - GWEN_MSG *msg; - - DBG_DEBUG(AQH_LOGDOMAIN, "Next loop"); - //GWEN_MsgManager_LoopOnce(emgr); - GWEN_MsgEndpointMgr_IoLoopOnce(emgr); - msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); - if (msg) { - if (AQH_MqttMsg_GetMsgTypeAndFlags(msg)==AQH_MQTTMSG_MSGTYPE_CONNACK) { - GWEN_BUFFER *buf; - - buf=GWEN_Buffer_new(0, 256, 0, 1); - AQH_ConnAckMqttMsg_DumpToBuffer(msg, buf, "received"); - fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); - GWEN_Buffer_free(buf); - } - else { - DBG_ERROR(NULL, "Received this message:"); - GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); - } - GWEN_Msg_free(msg); - break; - } - } fprintf(stdout, "Sending PUBLISH\n"); //msgOut=GWEN_PublishMqttMsg_new(AQH_MQTTMSG_FLAGS_QOS1, 1, "test/subject1", (const uint8_t*) "29.9", 4); @@ -271,6 +249,131 @@ int testMqttConnection() +int testMqttSubscribe(int argc, char **argv) +{ + int rv; + GWEN_MSG_ENDPOINT_MGR *emgr; + GWEN_MSG_ENDPOINT *epTcp; + GWEN_MSG *msgOut; + GWEN_MSG *msgIn; + uint16_t pckId; + const char *host="127.0.0.1"; + + if (argc>1) + host=argv[1]; + + rv=AQH_Init(); + if (rv<0) { + } + + //emgr=AQH_MsgManager_new(0xc0); + emgr=GWEN_MsgEndpointMgr_new(); + + epTcp=AQH_MqttClientEndpoint_new(host, 1883, "MQTTClient", 0); + if (epTcp==NULL) { + DBG_ERROR(NULL, "Error creating endpoint TCPc"); + return 2; + } + AQH_MqttClientEndpoint_SetClientId(epTcp, "CLIENTID123"); + GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp); + + rv=_mqttConnect(emgr, epTcp); + if (rv!=0) { + DBG_ERROR(NULL, "Error connecting"); + return 2; + } + + fprintf(stdout, "Sending SUBSCRIBE\n"); + pckId=AQH_MqttClientEndpoint_GetNextPacketId(epTcp); + //msgOut=GWEN_PublishMqttMsg_new(AQH_MQTTMSG_FLAGS_QOS1, 1, "test/subject1", (const uint8_t*) "29.9", 4); + msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, "aqhome/#", 0); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return 2; + } + DBG_ERROR(NULL, "Sending this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msgOut), GWEN_Msg_GetBytesInBuffer(msgOut), 2); + + GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); + + fprintf(stdout, "Waiting for response\n"); + msgIn=_awaitPacket(emgr, epTcp, AQH_MQTTMSG_MSGTYPE_SUBACK); + if (msgIn) { + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_SubAckMqttMsg_DumpToBuffer(msgIn, buf, "received"); + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + GWEN_Msg_free(msgIn); + } + + for (;;) { + GWEN_MSG *msg; + + //GWEN_MsgManager_LoopOnce(emgr); + GWEN_MsgEndpointMgr_IoLoopOnce(emgr); + msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); + if (msg) { + if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) { + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_PublishMqttMsg_DumpToBuffer(msg, buf, "received"); + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + } + else { + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + } + GWEN_Msg_free(msg); + } + } + + return 0; +} + + + +int _mqttConnect(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp) +{ + while(GWEN_ConnectableMsgEndpoint_GetState(epTcp) @@ -68,6 +70,8 @@ msg_mqtt_connack.c msg_mqtt_publish.c msg_mqtt_pubresponse.c + msg_mqtt_subscribe.c + msg_mqtt_suback.c diff --git a/aqhome/mqtt/endpoint_mqttc.c b/aqhome/mqtt/endpoint_mqttc.c index 50a6a2d..5a3b7fb 100644 --- a/aqhome/mqtt/endpoint_mqttc.c +++ b/aqhome/mqtt/endpoint_mqttc.c @@ -45,7 +45,6 @@ static void _processRecvStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); static void _publishDouble(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, double v); static void _publishInt(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, int v); static void _publishString(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, const char *v); -static const char *_valueTypeToString(int t); static int _isMsgComplete(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg); static int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg); @@ -318,11 +317,18 @@ void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) void _processValue2Message(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) { - _publishDouble(ep, - AQH_Value2Msg_GetUid(nodeMsg), - AQH_Value2Msg_GetValueId(nodeMsg), - _valueTypeToString(AQH_Value2Msg_GetValueId(nodeMsg)), - AQH_Value2Msg_GetValue(nodeMsg)); + if (AQH_Value2Msg_GetValueType(nodeMsg)==AQH_MSG_VALUE2_TYPE_DOOR) + _publishString(ep, + AQH_Value2Msg_GetUid(nodeMsg), + AQH_Value2Msg_GetValueId(nodeMsg), + AQH_Value2Msg_GetValueTypeName(nodeMsg), + AQH_Value2Msg_GetValueAsWindowStateString(nodeMsg)); + else + _publishDouble(ep, + AQH_Value2Msg_GetUid(nodeMsg), + AQH_Value2Msg_GetValueId(nodeMsg), + AQH_Value2Msg_GetValueTypeName(nodeMsg), + AQH_Value2Msg_GetValue(nodeMsg)); } @@ -437,17 +443,6 @@ void _publishString(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char -const char *_valueTypeToString(int t) -{ - switch(t) { - case AQH_MSG_VALUE2_TYPE_TEMP: return "temperature"; - case AQH_MSG_VALUE2_TYPE_HUMIDITY: return "humidity"; - default: return "unknown"; - } -} - - - int _isMsgComplete(GWEN_UNUSED GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg) { int rv; diff --git a/aqhome/mqtt/msg_mqtt.c b/aqhome/mqtt/msg_mqtt.c index daa8fcb..4f275b7 100644 --- a/aqhome/mqtt/msg_mqtt.c +++ b/aqhome/mqtt/msg_mqtt.c @@ -168,6 +168,40 @@ void _flagsToDb(uint8_t flags, GWEN_DB_NODE *dbDest, const char *varNameFlags, c +/* helper functions */ + + +void AQH_MqttMsg_AppendStringWithLen(GWEN_BUFFER *buf, const char *s) +{ + unsigned int len; + + len=strlen(s); + GWEN_Buffer_AppendByte(buf, (len>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, len & 0xff); + if (s && *s) + GWEN_Buffer_AppendString(buf, s); +} + + + +int AQH_MqttMsg_DumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf) +{ + if (len>1) { + int slen; + + slen=(ptr[0]<<8)+ptr[1]; + if (slen) { + if (slen>(len-2)) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", + (unsigned long int) slen, (unsigned long int) len); + return GWEN_ERROR_BAD_DATA; + } + GWEN_Buffer_AppendBytes(buf, (const char*) ptr+2, slen); + } + return slen+2; + } + return GWEN_ERROR_BAD_DATA; +} diff --git a/aqhome/mqtt/msg_mqtt.h b/aqhome/mqtt/msg_mqtt.h index 81ba619..e1faa27 100644 --- a/aqhome/mqtt/msg_mqtt.h +++ b/aqhome/mqtt/msg_mqtt.h @@ -57,5 +57,10 @@ AQHOME_API const char *AQH_MqttMsg_MsgTypeToString(uint8_t t); AQHOME_API uint8_t AQH_MqttMsg_GetMsgTypeAndFlags(const GWEN_MSG *msg); +/* helper functions */ +AQHOME_API void AQH_MqttMsg_AppendStringWithLen(GWEN_BUFFER *buf, const char *s); +AQHOME_API int AQH_MqttMsg_DumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf); + + #endif diff --git a/aqhome/mqtt/msg_mqtt_publish.c b/aqhome/mqtt/msg_mqtt_publish.c index aa6d66d..35e35c5 100644 --- a/aqhome/mqtt/msg_mqtt_publish.c +++ b/aqhome/mqtt/msg_mqtt_publish.c @@ -18,8 +18,6 @@ static int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, GWEN_BUFFER *dbuf); -static void _appendStringWithLen(GWEN_BUFFER *buf, const char *s); -static int _dumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf); @@ -32,7 +30,7 @@ GWEN_MSG *GWEN_PublishMqttMsg_new(uint8_t flags, uint16_t packetId, const char * GWEN_BUFFER *buf; buf=GWEN_Buffer_new(0, 64, 0, 1); - _appendStringWithLen(buf, sTopic); + AQH_MqttMsg_AppendStringWithLen(buf, sTopic); if (flags & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { GWEN_Buffer_AppendByte(buf, (packetId>>8) & 0xff); GWEN_Buffer_AppendByte(buf, packetId & 0xff); @@ -100,7 +98,7 @@ int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, GWEN_Buffer_AppendArgs(dbuf, " QOS%d", (flags>>1) & 0x03); GWEN_Buffer_AppendString(dbuf, " topic: "); - rv=_dumpString(payloadPtr, payloadLen, dbuf); + rv=AQH_MqttMsg_DumpString(payloadPtr, payloadLen, dbuf); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; @@ -121,7 +119,7 @@ int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, } GWEN_Buffer_AppendString(dbuf, " message: "); - rv=_dumpString(payloadPtr, payloadLen, dbuf); + rv=AQH_MqttMsg_DumpString(payloadPtr, payloadLen, dbuf); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; @@ -134,37 +132,3 @@ int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, -void _appendStringWithLen(GWEN_BUFFER *buf, const char *s) -{ - unsigned int len; - - len=strlen(s); - GWEN_Buffer_AppendByte(buf, (len>>8) & 0xff); - GWEN_Buffer_AppendByte(buf, len & 0xff); - if (s && *s) - GWEN_Buffer_AppendString(buf, s); -} - - - -int _dumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf) -{ - if (len>1) { - int slen; - - slen=(ptr[0]<<8)+ptr[1]; - if (slen) { - if (slen>(len-2)) { - DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", - (unsigned long int) slen, (unsigned long int) len); - return GWEN_ERROR_BAD_DATA; - } - GWEN_Buffer_AppendBytes(buf, (const char*) ptr+2, slen); - } - return slen+2; - } - return GWEN_ERROR_BAD_DATA; -} - - - diff --git a/aqhome/mqtt/msg_mqtt_suback.c b/aqhome/mqtt/msg_mqtt_suback.c new file mode 100644 index 0000000..fb2c180 --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_suback.c @@ -0,0 +1,68 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/mqtt/msg_mqtt_suback.h" + +#include + + + +GWEN_MSG *GWEN_SubAckMqttMsg_new(uint8_t flags, uint16_t packetId, uint8_t result) +{ + GWEN_MSG *msg; + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + if (flags & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + GWEN_Buffer_AppendByte(buf, (packetId>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, packetId & 0xff); + } + /* payload */ + GWEN_Buffer_AppendByte(buf, result); + + msg=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBACK | flags, GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + if (msg==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "here"); + return NULL; + } + return msg; +} + + + +void AQH_SubAckMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + + if (msgLen>1) { + const uint8_t *payloadPtr; + uint32_t payloadLen; + + GWEN_Buffer_AppendArgs(dbuf, "%s %s", AQH_MqttMsg_MsgTypeToString(msgPtr[0] & 0xf0), sText); + payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); + payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); + if (payloadLen>=1) + GWEN_Buffer_AppendArgs(dbuf, "(result=%d)", payloadPtr[0]); + } +} + + + + + + diff --git a/aqhome/mqtt/msg_mqtt_suback.h b/aqhome/mqtt/msg_mqtt_suback.h new file mode 100644 index 0000000..ecc4da1 --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_suback.h @@ -0,0 +1,27 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_MQTT_SUBACK_H +#define AQH_MSG_MQTT_SUBACK_H + + +#include +#include + +#include +#include + + + +AQHOME_API GWEN_MSG *GWEN_SubAckMqttMsg_new(uint8_t flags, uint16_t packetId, uint8_t result); +AQHOME_API void AQH_SubAckMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); + + + + +#endif diff --git a/aqhome/mqtt/msg_mqtt_subscribe.c b/aqhome/mqtt/msg_mqtt_subscribe.c new file mode 100644 index 0000000..78170f8 --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_subscribe.c @@ -0,0 +1,127 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/mqtt/msg_mqtt_subscribe.h" + +#include + + + +static int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, GWEN_BUFFER *dbuf); + + + + + +GWEN_MSG *GWEN_SubscribeMqttMsg_new(uint8_t flags, uint16_t packetId, const char *sTopic, uint8_t requestedQos) +{ + if (sTopic && *sTopic) { + GWEN_MSG *msg; + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + GWEN_Buffer_AppendByte(buf, (packetId>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, packetId & 0xff); + + /* add topic filter / qos pair */ + AQH_MqttMsg_AppendStringWithLen(buf, sTopic); + GWEN_Buffer_AppendByte(buf, requestedQos); + + msg=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE | flags, + GWEN_Buffer_GetUsedBytes(buf), + (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + if (msg==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "here"); + return NULL; + } + return msg; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Missing topic or message"); + return NULL; + } +} + + + +void AQH_SubscribeMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + + if (msgLen>1) { + uint32_t payloadLen; + const uint8_t *payloadPtr; + int rv; + + payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); + payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); + + GWEN_Buffer_AppendArgs(dbuf, "%s %s", AQH_MqttMsg_MsgTypeToString(msgPtr[0] & 0xf0), sText); + + GWEN_Buffer_AppendString(dbuf, "("); + rv=_dumpPayload((msgPtr[0] & 0x0f), payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + } + else { + GWEN_Buffer_AppendString(dbuf, ")"); + } + } +} + + + +int _dumpPayload(uint8_t flags, const uint8_t *payloadPtr, uint32_t payloadLen, GWEN_BUFFER *dbuf) +{ + int rv; + + GWEN_Buffer_AppendString(dbuf, "flags:"); + if (flags & AQH_MQTTMSG_FLAGS_DUP) + GWEN_Buffer_AppendString(dbuf, " DUP"); + if (flags & AQH_MQTTMSG_FLAGS_RETAIN) + GWEN_Buffer_AppendString(dbuf, " RETAIN"); + GWEN_Buffer_AppendArgs(dbuf, " QOS%d", (flags>>1) & 0x03); + + if (flags & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + if (payloadLen<2) { + DBG_ERROR(AQH_LOGDOMAIN, "Msg too small"); + return GWEN_ERROR_BAD_DATA; + } + else { + GWEN_Buffer_AppendArgs(dbuf, " packet id: %d", (payloadPtr[0]<<8)+payloadPtr[1]); + payloadLen-=2; + payloadPtr+=2; + } + } + + while(payloadLen) { + GWEN_Buffer_AppendString(dbuf, " topic: "); + rv=AQH_MqttMsg_DumpString(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + payloadLen-=rv; + payloadPtr+=rv; + } + + return 0; +} + + + diff --git a/aqhome/mqtt/msg_mqtt_subscribe.h b/aqhome/mqtt/msg_mqtt_subscribe.h new file mode 100644 index 0000000..9ec42e0 --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_subscribe.h @@ -0,0 +1,27 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_MQTT_SUBSCRIBE_H +#define AQH_MSG_MQTT_SUBSCRIBE_H + + +#include +#include + +#include +#include + + + +AQHOME_API GWEN_MSG *GWEN_SubscribeMqttMsg_new(uint8_t flags, uint16_t packetId, const char *sTopic, uint8_t requestedQos); +AQHOME_API void AQH_SubscribeMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); + + + + +#endif