/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/mqtt/endpoint_mqttc_p.h" #include "aqhome/mqtt/msg_mqtt.h" #include #include #define GWEN_MSG_ENDPOINT_MQTTC_NAME "mqttc" #define GWEN_ENDPOINT_MQTTC_BUFFERSIZE 1024 GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC) static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _isMsgComplete(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg); static int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg); GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const char *name, int groupId) { GWEN_MSG_ENDPOINT *ep; AQH_ENDPOINT_MQTTC *xep; ep=GWEN_TcpcEndpoint_new(host, port, name?name:GWEN_MSG_ENDPOINT_MQTTC_NAME, groupId); if (ep==NULL) { DBG_INFO(AQH_LOGDOMAIN, "here"); return NULL; } GWEN_NEW_OBJECT(AQH_ENDPOINT_MQTTC, xep); GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep, xep, _freeData); GWEN_MsgEndpoint_SetDefaultBufferSize(ep, GWEN_ENDPOINT_MQTTC_BUFFERSIZE); GWEN_MsgEndpoint_SetIsMsgCompleteFn(ep, _isMsgComplete); return ep; } void _freeData(void *bp, void *p) { AQH_ENDPOINT_MQTTC *xep; xep=(AQH_ENDPOINT_MQTTC*) p; GWEN_FREE_OBJECT(xep); } int _isMsgComplete(GWEN_UNUSED GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg) { int rv; if (!(GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET)) { rv=_calcAndSetPayloadSizeAndOffset(msg); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } } rv=AQH_MqttMsg_IsMsgComplete(msg); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } return rv; } int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg) { int remainingBytesInBuffer; uint32_t mqttRemainingLength=0; remainingBytesInBuffer=GWEN_Msg_GetBytesInBuffer(msg); if (remainingBytesInBuffer>1) { const uint8_t *ptr; int idx; ptr=GWEN_Msg_GetConstBuffer(msg); idx=1; remainingBytesInBuffer--; while(remainingBytesInBuffer) { uint8_t len; len=ptr[idx]; mqttRemainingLength<<=8; mqttRemainingLength+=(len & 0x7f); if (!(len & 0x80)) { /* last byte of size */ GWEN_Msg_SetParsedPayloadSize(msg, mqttRemainingLength); GWEN_Msg_SetParsedPayloadOffset(msg, idx+1); /* payload follows at next byte */ GWEN_Msg_AddFlags(msg, GWEN_MSG_FLAGS_PAYLOADINFO_SET); return 1; /* size successfully determined */ } } } return 0; }