diff --git a/aqhome/0BUILD b/aqhome/0BUILD index 08d6eaf..72785e0 100644 --- a/aqhome/0BUILD +++ b/aqhome/0BUILD @@ -60,6 +60,7 @@ msg ipc nodes + mqtt @@ -67,6 +68,7 @@ aqhmsg aqhipc aqhnodes + aqhmqtt diff --git a/aqhome/libtest.c b/aqhome/libtest.c index 320ff96..96c4cd9 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -9,6 +9,8 @@ #include "aqhome/msg/endpoint_log.h" #include "aqhome/msg/endpoint_tty.h" #include "aqhome/ipc/endpoint_node_ipc_tcp.h" +#include "aqhome/mqtt/endpoint_mqttc.h" +#include "aqhome/mqtt/msg_mqtt_connect.h" #include "aqhome/msgmanager.h" #include "aqhome/aqhome.h" @@ -17,7 +19,8 @@ #include #include #include -#include +#include +#include #include #include @@ -120,7 +123,7 @@ int testEndpoints() } GWEN_MsgEndpointMgr_AddEndpoint(emgr, epLog); - epTcp=AQH_TcpIpcNodeEndpoint_new(NULL, "127.0.0.1", 45454, AQH_MSGMGR_ENDPOINTGROUP_IPC); + epTcp=AQH_TcpdIpcNodeEndpoint_new("127.0.0.1", 45454, NULL, AQH_MSGMGR_ENDPOINTGROUP_IPC); if (epTcp==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint TCP"); return 2; @@ -136,11 +139,92 @@ int testEndpoints() +int testMsgMqttConnect() +{ + GWEN_MSG *msg; + GWEN_BUFFER *dbuf; + + msg=GWEN_ConnectMqttMsg_new("MQTT", 4, + AQH_MQTTMSG_CONNECT_FLAGS_USERNAME | AQH_MQTTMSG_CONNECT_FLAGS_PASSWD, + 512, + "CLIENTID123", + "USER123", + "PASSWD123"); + if (msg==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return 2; + } + dbuf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_ConnectMqttMsg_DumpToBuffer(msg, dbuf, "created"); + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(dbuf)); + GWEN_Buffer_free(dbuf); + + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + + return 0; +} + + + +int testMqttConnection() +{ + int rv; + GWEN_MSG_ENDPOINT_MGR *emgr; + GWEN_MSG_ENDPOINT *epTcp; + GWEN_MSG *msgOut; + + rv=AQH_Init(); + if (rv<0) { + } + + //emgr=AQH_MsgManager_new(0xc0); + emgr=GWEN_MsgEndpointMgr_new(); + + epTcp=AQH_MqttClientEndpoint_new("127.0.0.1", 1883, "MQTTClient", 0); + if (epTcp==NULL) { + DBG_ERROR(NULL, "Error creating endpoint TCPc"); + return 2; + } + GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp); + rv=GWEN_TcpcEndpoint_StartConnect(epTcp); + if (rv<0) { + DBG_ERROR(NULL, "Error starting connect (%d)", rv); + return 2; + } + + msgOut=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, 10, "CLIENTID123", NULL, NULL); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return 2; + } + GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); + + for (;;) { + GWEN_MSG *msg; + + DBG_DEBUG(AQH_LOGDOMAIN, "Next loop"); + //GWEN_MsgManager_LoopOnce(emgr); + GWEN_MsgEndpointMgr_IoLoopOnce(emgr); + msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); + if (msg) { + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + GWEN_Msg_free(msg); + break; + } + } + return 0; +} + + + int main(int argc, char **argv) { - return testEndpoints(); + //return testEndpoints(); + //return testMsgMqttConnect(); + return testMqttConnection(); } diff --git a/aqhome/mqtt/0BUILD b/aqhome/mqtt/0BUILD new file mode 100644 index 0000000..123680b --- /dev/null +++ b/aqhome/mqtt/0BUILD @@ -0,0 +1,82 @@ + + + + + + + + $(gwenhywfar_cflags) + -I$(topsrcdir) + -I$(topbuilddir) + + + + --include=$(builddir) + --include=$(srcdir) + + + + + + $(visibility_cflags) + + + + --api=AQHOME_API + + + + + + + + + + + + + + + + + + $(local/built_headers_pub) + + + + + endpoint_mqttc.h + msg_mqtt.h + msg_mqtt_connect.h + + + + + + + + + $(local/typefiles) + + endpoint_mqttc.c + msg_mqtt.c + msg_mqtt_connect.c + + + + + + + + + + + + + + + + + + + diff --git a/aqhome/mqtt/endpoint_mqttc.c b/aqhome/mqtt/endpoint_mqttc.c new file mode 100644 index 0000000..2fa3b6e --- /dev/null +++ b/aqhome/mqtt/endpoint_mqttc.c @@ -0,0 +1,121 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/mqtt/endpoint_mqttc_p.h" +#include "aqhome/mqtt/msg_mqtt.h" + +#include +#include + + + +#define GWEN_MSG_ENDPOINT_MQTTC_NAME "mqttc" +#define GWEN_ENDPOINT_MQTTC_BUFFERSIZE 1024 + + +GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC) + + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); +static int _isMsgComplete(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg); +static int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg); + + + + +GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const char *name, int groupId) +{ + GWEN_MSG_ENDPOINT *ep; + AQH_ENDPOINT_MQTTC *xep; + + ep=GWEN_TcpcEndpoint_new(host, port, name?name:GWEN_MSG_ENDPOINT_MQTTC_NAME, groupId); + if (ep==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "here"); + return NULL; + } + GWEN_NEW_OBJECT(AQH_ENDPOINT_MQTTC, xep); + GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep, xep, _freeData); + + GWEN_MsgEndpoint_SetDefaultBufferSize(ep, GWEN_ENDPOINT_MQTTC_BUFFERSIZE); + GWEN_MsgEndpoint_SetIsMsgCompleteFn(ep, _isMsgComplete); + + return ep; +} + + + +void _freeData(void *bp, void *p) +{ + AQH_ENDPOINT_MQTTC *xep; + + xep=(AQH_ENDPOINT_MQTTC*) p; + GWEN_FREE_OBJECT(xep); +} + + + +int _isMsgComplete(GWEN_UNUSED GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg) +{ + int rv; + + if (!(GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET)) { + rv=_calcAndSetPayloadSizeAndOffset(msg); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + } + rv=AQH_MqttMsg_IsMsgComplete(msg); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + + return rv; +} + + + +int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg) +{ + int remainingBytesInBuffer; + uint32_t mqttRemainingLength=0; + + remainingBytesInBuffer=GWEN_Msg_GetBytesInBuffer(msg); + if (remainingBytesInBuffer>1) { + const uint8_t *ptr; + int idx; + + ptr=GWEN_Msg_GetConstBuffer(msg); + idx=1; + remainingBytesInBuffer--; + while(remainingBytesInBuffer) { + uint8_t len; + + len=ptr[idx]; + mqttRemainingLength<<=8; + mqttRemainingLength+=(len & 0x7f); + if (!(len & 0x80)) { + /* last byte of size */ + GWEN_Msg_SetParsedPayloadSize(msg, mqttRemainingLength); + GWEN_Msg_SetParsedPayloadOffset(msg, idx+1); /* payload follows at next byte */ + GWEN_Msg_AddFlags(msg, GWEN_MSG_FLAGS_PAYLOADINFO_SET); + return 1; /* size successfully determined */ + } + } + } + return 0; +} + + + diff --git a/aqhome/mqtt/endpoint_mqttc.h b/aqhome/mqtt/endpoint_mqttc.h new file mode 100644 index 0000000..6d65641 --- /dev/null +++ b/aqhome/mqtt/endpoint_mqttc.h @@ -0,0 +1,25 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_ENDPOINT_MQTTC_H +#define AQH_ENDPOINT_MQTTC_H + + +#include + +#include + + + +AQHOME_API GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const char *name, int groupId); + + + + +#endif + diff --git a/aqhome/mqtt/endpoint_mqttc_p.h b/aqhome/mqtt/endpoint_mqttc_p.h new file mode 100644 index 0000000..9d7b341 --- /dev/null +++ b/aqhome/mqtt/endpoint_mqttc_p.h @@ -0,0 +1,26 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_ENDPOINT_MQTTC_P_H +#define AQH_ENDPOINT_MQTTC_P_H + + +#include "aqhome/mqtt/endpoint_mqttc.h" + + +typedef struct AQH_ENDPOINT_MQTTC AQH_ENDPOINT_MQTTC; +struct AQH_ENDPOINT_MQTTC { + int dummy; +}; + + + + + +#endif + diff --git a/aqhome/mqtt/msg_mqtt.c b/aqhome/mqtt/msg_mqtt.c new file mode 100644 index 0000000..7022cea --- /dev/null +++ b/aqhome/mqtt/msg_mqtt.c @@ -0,0 +1,153 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/mqtt/msg_mqtt.h" + +#include + +// maximum remaining length: fff ffff + + +static void _flagsToDb(uint8_t flags, GWEN_DB_NODE *dbDest, const char *varNameFlags, const char *varNameQos); + + + + +GWEN_MSG *GWEN_MqttMsg_new(uint8_t typeAndFlags, uint32_t payloadLen, const uint8_t *payload) +{ + if (payloadLen>0xfffffffu) { + DBG_ERROR(AQH_LOGDOMAIN, "Too many bytes in payload, can't encode into MQTT message"); + return NULL; + } + else { + GWEN_MSG *msg; + uint8_t *ptr; + uint32_t i; + uint32_t len; + + msg=GWEN_Msg_new(payloadLen+1+4); + ptr=GWEN_Msg_GetBuffer(msg); + *(ptr++)=typeAndFlags; + len=payloadLen; + i=0; + + do { + uint8_t b; + + b=len & 0x7f; + len>>=7; + if (len) + b|=0x80; + *(ptr++)=b; + } while(len && i<4); + + if (payloadLen) { + GWEN_Msg_SetParsedPayloadSize(msg, payloadLen); + GWEN_Msg_SetParsedPayloadOffset(msg, ptr-GWEN_Msg_GetBuffer(msg)); + memmove(ptr, payload, payloadLen); + ptr+=payloadLen; + } + i=ptr-GWEN_Msg_GetBuffer(msg); + GWEN_Msg_SetBytesInBuffer(msg, i); + + return msg; + } +} + + + +int AQH_MqttMsg_IsMsgComplete(const GWEN_MSG *msg) +{ + if (msg && (GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET)) { + uint32_t msgLength; + + msgLength=GWEN_Msg_GetParsedPayloadOffset(msg)+GWEN_Msg_GetParsedPayloadSize(msg); + if (GWEN_Msg_GetBytesInBuffer(msg)>=msgLength) + return 1; + } + return 0; +} + + + +void AQH_MqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +{ +} + + + +int AQH_MqttMsg_toDb(const GWEN_MSG *msg, GWEN_DB_NODE *dbDest) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + uint8_t b; + const char *s; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + if (msgLen<2) { + DBG_ERROR(AQH_LOGDOMAIN, "Message too small (%d bytes)", msgLen); + return GWEN_ERROR_BAD_DATA; + } + b=msgPtr[0]; + s=AQH_MqttMsg_MsgTypeToString(b); + GWEN_DB_SetCharValue(dbDest, GWEN_DB_FLAGS_OVERWRITE_VARS, "cmdString", s); + GWEN_DB_SetIntValue(dbDest, GWEN_DB_FLAGS_OVERWRITE_VARS, "cmdAndFlags", b); + + _flagsToDb(b & 0x0f, dbDest, "flags", "qos"); + return 0; +} + + + +const char *AQH_MqttMsg_MsgTypeToString(uint8_t t) +{ + switch(t & 0xf0) { + case (AQH_MQTTMSG_MSGTYPE_CONNECT & 0xf0): return "CONNECT"; + case (AQH_MQTTMSG_MSGTYPE_CONNACK & 0xf0): return "CONACK"; + case (AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0): return "PUBLISH"; + case (AQH_MQTTMSG_MSGTYPE_PUBACK & 0xf0): return "PUBACK"; + case (AQH_MQTTMSG_MSGTYPE_PUBREC & 0xf0): return "PUBREC"; + case (AQH_MQTTMSG_MSGTYPE_PUBREL & 0xf0): return "PUBREL"; + case (AQH_MQTTMSG_MSGTYPE_PUBCOMP & 0xf0): return "PUBCOMP"; + case (AQH_MQTTMSG_MSGTYPE_SUBSCRIBE & 0xf0): return "SUBSCRIBE"; + case (AQH_MQTTMSG_MSGTYPE_SUBACK & 0xf0): return "SUBACK"; + case (AQH_MQTTMSG_MSGTYPE_UNSUBSCRIBE & 0xf0): return "UNSUBSCRIBE"; + case (AQH_MQTTMSG_MSGTYPE_UNSUBACK & 0xf0): return "UNSUBACK"; + case (AQH_MQTTMSG_MSGTYPE_PINGREQ & 0xf0): return "PINGREQ"; + case (AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0): return "PINGRESP"; + case (AQH_MQTTMSG_MSGTYPE_DISCONNECT & 0xf0): return "DISCONNECT"; + default: return "(unknown)"; + } +} + + + +void _flagsToDb(uint8_t flags, GWEN_DB_NODE *dbDest, const char *varNameFlags, const char *varNameQos) +{ + GWEN_DB_DeleteVar(dbDest, varNameFlags); + GWEN_DB_DeleteVar(dbDest, varNameQos); + if (flags & AQH_MQTTMSG_FLAGS_DUP) + GWEN_DB_SetCharValue(dbDest, GWEN_DB_FLAGS_DEFAULT, varNameFlags, "dup"); + if (flags & AQH_MQTTMSG_FLAGS_RETAIN) + GWEN_DB_SetCharValue(dbDest, GWEN_DB_FLAGS_DEFAULT, varNameFlags, "retain"); + + GWEN_DB_SetIntValue(dbDest, GWEN_DB_FLAGS_DEFAULT, varNameQos, (flags>>1) & 0x3); + +} + + + + + + diff --git a/aqhome/mqtt/msg_mqtt.h b/aqhome/mqtt/msg_mqtt.h new file mode 100644 index 0000000..bacd63d --- /dev/null +++ b/aqhome/mqtt/msg_mqtt.h @@ -0,0 +1,59 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_MQTT_H +#define AQH_MSG_MQTT_H + + +#include + +#include +#include +#include + + + +#define AQH_MQTTMSG_OFFS_CONTROL 0 +#define AQH_MQTTMSG_OFFS_REMAINING_LENGTH 1 + + +/* from https://docs.solace.com/API/MQTT-311-Prtl-Conformance-Spec/MQTT%20Control%20Packet%20format.htm + */ +#define AQH_MQTTMSG_MSGTYPE_CONNECT 0x10u +#define AQH_MQTTMSG_MSGTYPE_CONNACK 0x20u +#define AQH_MQTTMSG_MSGTYPE_PUBLISH 0x30u +#define AQH_MQTTMSG_MSGTYPE_PUBACK 0x40u +#define AQH_MQTTMSG_MSGTYPE_PUBREC 0x50u /* assured delivery part 1 */ +#define AQH_MQTTMSG_MSGTYPE_PUBREL 0x62u /* assured delivery part 2 */ +#define AQH_MQTTMSG_MSGTYPE_PUBCOMP 0x70u /* assured delivery part 3 */ +#define AQH_MQTTMSG_MSGTYPE_SUBSCRIBE 0x82u +#define AQH_MQTTMSG_MSGTYPE_SUBACK 0x90u +#define AQH_MQTTMSG_MSGTYPE_UNSUBSCRIBE 0xa2u +#define AQH_MQTTMSG_MSGTYPE_UNSUBACK 0xb0u +#define AQH_MQTTMSG_MSGTYPE_PINGREQ 0xc0u +#define AQH_MQTTMSG_MSGTYPE_PINGRESP 0xd0u +#define AQH_MQTTMSG_MSGTYPE_DISCONNECT 0xe0u + + +#define AQH_MQTTMSG_FLAGS_DUP 0x08u +#define AQH_MQTTMSG_FLAGS_QOS2 0x04u +#define AQH_MQTTMSG_FLAGS_QOS1 0x02u +#define AQH_MQTTMSG_FLAGS_RETAIN 0x01u + + + +AQHOME_API GWEN_MSG *GWEN_MqttMsg_new(uint8_t typeAndFlags, uint32_t payloadLen, const uint8_t *payload); + +AQHOME_API int AQH_MqttMsg_IsMsgComplete(const GWEN_MSG *msg); +AQHOME_API void AQH_MqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); + +AQHOME_API const char *AQH_MqttMsg_MsgTypeToString(uint8_t t); + + + +#endif diff --git a/aqhome/mqtt/msg_mqtt_connect.c b/aqhome/mqtt/msg_mqtt_connect.c new file mode 100644 index 0000000..09b3505 --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_connect.c @@ -0,0 +1,223 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/mqtt/msg_mqtt_connect.h" + +#include + + + +static int _dumpPayload(const uint8_t *payloadPtr, uint32_t payloadLen, GWEN_BUFFER *dbuf); +static void _appendStringWithLen(GWEN_BUFFER *buf, const char *s); +static int _dumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf); + + + + + +GWEN_MSG *GWEN_ConnectMqttMsg_new(const char *protoName, + uint8_t protoLevel, + uint8_t connectFlags, + uint16_t keepAliveTime, + const char *clientId, + const char *userName, + const char *password) +{ + GWEN_MSG *msg; + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + _appendStringWithLen(buf, protoName?protoName:"MQTT"); + GWEN_Buffer_AppendByte(buf, protoLevel?protoLevel:4); + GWEN_Buffer_AppendByte(buf, connectFlags); + GWEN_Buffer_AppendByte(buf, (keepAliveTime>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, keepAliveTime & 0xff); + + _appendStringWithLen(buf, clientId); + /* here could be inserted: will topic, will message */ + if (connectFlags & AQH_MQTTMSG_CONNECT_FLAGS_USERNAME) + _appendStringWithLen(buf, userName); + if (connectFlags & AQH_MQTTMSG_CONNECT_FLAGS_PASSWD) + _appendStringWithLen(buf, password); + + msg=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_CONNECT, GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + if (msg==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "here"); + return NULL; + } + + return msg; +} + + + +void AQH_ConnectMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + + if (msgLen>1) { + uint32_t payloadLen; + const uint8_t *payloadPtr; + int rv; + + payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); + payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); + + GWEN_Buffer_AppendArgs(dbuf, "%s %s", AQH_MqttMsg_MsgTypeToString(msgPtr[0] & 0xf0), sText); + + GWEN_Buffer_AppendString(dbuf, "("); + rv=_dumpPayload(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + } + else { + GWEN_Buffer_AppendString(dbuf, ")"); + } + } +} + + + +int _dumpPayload(const uint8_t *payloadPtr, uint32_t payloadLen, GWEN_BUFFER *dbuf) +{ + int rv; + uint8_t connectFlags; + uint16_t keepAliveTime; + + GWEN_Buffer_AppendString(dbuf, "proto: "); + rv=_dumpString(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + payloadLen-=rv; + payloadPtr+=rv; + if (payloadLen<4) { /* expect at least proto ver(1), flags(1) and keepAlive(2) bytes */ + DBG_ERROR(AQH_LOGDOMAIN, "Msg too small"); + return GWEN_ERROR_BAD_DATA; + } + else { + GWEN_Buffer_AppendArgs(dbuf, " proto ver: %d", payloadPtr[0]); + payloadLen--; + payloadPtr++; + + connectFlags=payloadPtr[0]; + GWEN_Buffer_AppendArgs(dbuf, " flags: %02x", connectFlags); + payloadLen--; + payloadPtr++; + + keepAliveTime=(payloadPtr[0]<<8)+payloadPtr[1]; + GWEN_Buffer_AppendArgs(dbuf, " keep alive: %d", keepAliveTime); + payloadLen-=2; + payloadPtr+=2; + } + + GWEN_Buffer_AppendString(dbuf, " client id: "); + rv=_dumpString(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + payloadLen-=rv; + payloadPtr+=rv; + + if (connectFlags & AQH_MQTTMSG_CONNECT_FLAGS_WILL_FLAG) { + GWEN_Buffer_AppendString(dbuf, " will topic: "); + rv=_dumpString(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + payloadLen-=rv; + payloadPtr+=rv; + + GWEN_Buffer_AppendString(dbuf, " will msg: "); + rv=_dumpString(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + payloadLen-=rv; + payloadPtr+=rv; + } + + if (connectFlags & AQH_MQTTMSG_CONNECT_FLAGS_USERNAME) { + GWEN_Buffer_AppendString(dbuf, " username: "); + rv=_dumpString(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + payloadLen-=rv; + payloadPtr+=rv; + } + if (connectFlags & AQH_MQTTMSG_CONNECT_FLAGS_PASSWD) { + GWEN_Buffer_AppendString(dbuf, " password: "); + rv=_dumpString(payloadPtr, payloadLen, dbuf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + payloadLen-=rv; + payloadPtr+=rv; + } + + return 0; +} + + + +void _appendStringWithLen(GWEN_BUFFER *buf, const char *s) +{ + unsigned int len; + + len=strlen(s); + GWEN_Buffer_AppendByte(buf, (len>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, len & 0xff); + if (s && *s) + GWEN_Buffer_AppendString(buf, s); +} + + + +int _dumpString(const uint8_t *ptr, uint32_t len, GWEN_BUFFER *buf) +{ + if (len>1) { + int slen; + + slen=(ptr[0]<<8)+ptr[1]; + if (slen) { + if (slen>(len-2)) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", + (unsigned long int) slen, (unsigned long int) len); + return GWEN_ERROR_BAD_DATA; + } + GWEN_Buffer_AppendBytes(buf, (const char*) ptr+2, slen); + } + return slen+2; + } + return GWEN_ERROR_BAD_DATA; +} + + + + + + + + diff --git a/aqhome/mqtt/msg_mqtt_connect.h b/aqhome/mqtt/msg_mqtt_connect.h new file mode 100644 index 0000000..dd56f7f --- /dev/null +++ b/aqhome/mqtt/msg_mqtt_connect.h @@ -0,0 +1,53 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_MQTT_CONNECT_H +#define AQH_MSG_MQTT_CONNECT_H + + +#include +#include + +#include +#include + + + +#define AQH_MQTTMSG_CONNECT_FLAGS_USERNAME 0x80u +#define AQH_MQTTMSG_CONNECT_FLAGS_PASSWD 0x40u +#define AQH_MQTTMSG_CONNECT_FLAGS_WILL_RETAIN 0x20u +#define AQH_MQTTMSG_CONNECT_FLAGS_WILL_QOS2 0x10u +#define AQH_MQTTMSG_CONNECT_FLAGS_WILL_QOS1 0x08u +#define AQH_MQTTMSG_CONNECT_FLAGS_WILL_FLAG 0x04u +#define AQH_MQTTMSG_CONNECT_FLAGS_CLEAN_SESSION 0x01u + + + + +/** + * order in payload: + * - client id + * - will topic + * - will message + * - user name + * - password + */ +AQHOME_API GWEN_MSG *GWEN_ConnectMqttMsg_new(const char *protoName, /* "MQTT" */ + uint8_t protoLevel, /* 0x04 for MQTT 3.1.1 */ + uint8_t connectFlags, + uint16_t keepAliveTime, + const char *clientId, + const char *userName, + const char *password); + +AQHOME_API void AQH_ConnectMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); + + + + +#endif