From d766a3635a4a636b5792df9c7c5a1cf80f1bbde1 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Sat, 8 Jul 2023 01:58:43 +0200 Subject: [PATCH] mqtt: started working on 2nd generation msgio implementation. --- aqhome/mqtt/0BUILD | 2 + aqhome/mqtt/endpoint2_mqtt.c | 107 +++++++++++++++++++++++++++++++++++ aqhome/mqtt/endpoint2_mqtt.h | 40 +++++++++++++ 3 files changed, 149 insertions(+) create mode 100644 aqhome/mqtt/endpoint2_mqtt.c create mode 100644 aqhome/mqtt/endpoint2_mqtt.h diff --git a/aqhome/mqtt/0BUILD b/aqhome/mqtt/0BUILD index acca051..70d304b 100644 --- a/aqhome/mqtt/0BUILD +++ b/aqhome/mqtt/0BUILD @@ -46,6 +46,7 @@ endpoint_mqttc.h + endpoint2_mqtt.h msg_mqtt.h msg_mqtt_connect.h msg_mqtt_connack.h @@ -65,6 +66,7 @@ $(local/typefiles) endpoint_mqttc.c + endpoint2_mqtt.c msg_mqtt.c msg_mqtt_connect.c msg_mqtt_connack.c diff --git a/aqhome/mqtt/endpoint2_mqtt.c b/aqhome/mqtt/endpoint2_mqtt.c new file mode 100644 index 0000000..7020fa2 --- /dev/null +++ b/aqhome/mqtt/endpoint2_mqtt.c @@ -0,0 +1,107 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./endpoint2_mqtt.h" + +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static int _getBytesNeededForMessage(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg); +static int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + + +void AQH_MqttEndpoint2_Extend(GWEN_MSG_ENDPOINT2 *ep) +{ + if (ep) { + GWEN_MsgIoEndpoint2_SetGetNeededBytesFn(ep, _getBytesNeededForMessage); + } +} + + + +int _getBytesNeededForMessage(GWEN_UNUSED GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg) +{ + uint32_t bytesInMsg; + + bytesInMsg=GWEN_Msg_GetBytesInBuffer(msg); + if (bytesInMsg<2) { + DBG_INFO(AQH_LOGDOMAIN, "Header not yet complete"); + return (int) (2-bytesInMsg); + } + if (!(GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET)) + _calcAndSetPayloadSizeAndOffset(msg); + + if (GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET) { + uint32_t msgLength; + + msgLength=GWEN_Msg_GetParsedPayloadOffset(msg)+GWEN_Msg_GetParsedPayloadSize(msg); + return (int)(msgLength-bytesInMsg); + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Size field not complete, requesting another byte"); + return 1; + } +} + + + +int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg) +{ + int remainingBytesInBuffer; + uint32_t mqttRemainingLength=0; + + remainingBytesInBuffer=GWEN_Msg_GetBytesInBuffer(msg); + if (remainingBytesInBuffer>1) { + const uint8_t *ptr; + int idx; + int shift=0; + + ptr=GWEN_Msg_GetConstBuffer(msg); + idx=1; + remainingBytesInBuffer--; + while(remainingBytesInBuffer) { + uint8_t len; + + len=ptr[idx]; + mqttRemainingLength+=(len & 0x7f)< + +#include + + +#ifdef __cplusplus +extern "C" { +#endif + + +/** + * Extends the given endpoint to support MQTT messages. It expects the function GWEN_MsgIoEndpoint2_Extend() to have been called + * beforehand. + */ +AQHOME_API void AQH_MqttEndpoint2_Extend(GWEN_MSG_ENDPOINT2 *ep); + + + + + +#ifdef __cplusplus +} +#endif + + +#endif + +