/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/mqtt/endpoint2_mqtt_p.h" #include "aqhome/mqtt/msg_mqtt_connect.h" #include "aqhome/mqtt/msg_mqtt_publish.h" #include #include #include #define AQH_ENDPOINT2_MQTT_DEFAULT_KEEPALIVE 600 GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_ENDPOINT2_MQTT) /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _getBytesNeededForMessage(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg); static int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ void AQH_MqttEndpoint2_Extend(GWEN_MSG_ENDPOINT2 *ep) { if (ep) { AQH_ENDPOINT2_MQTT *xep; GWEN_NEW_OBJECT(AQH_ENDPOINT2_MQTT, xep); GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep, xep, _freeData); xep->keepAliveTime=AQH_ENDPOINT2_MQTT_DEFAULT_KEEPALIVE; GWEN_MsgIoEndpoint2_SetGetNeededBytesFn(ep, _getBytesNeededForMessage); } } void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) { AQH_ENDPOINT2_MQTT *xep; xep=(AQH_ENDPOINT2_MQTT*) p; free(xep->clientId); free(xep->topicPrefix); GWEN_FREE_OBJECT(xep); } const char *AQH_MqttEndpoint2_GetClientId(const GWEN_MSG_ENDPOINT2 *ep) { if (ep) { AQH_ENDPOINT2_MQTT *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); if (xep) { return xep->clientId; } } return NULL; } void AQH_MqttEndpoint2_SetClientId(GWEN_MSG_ENDPOINT2 *ep, const char *s) { if (ep) { AQH_ENDPOINT2_MQTT *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); if (xep) { free(xep->clientId); xep->clientId=s?strdup(s):NULL; } } } const char *AQH_MqttEndpoint2_GetTopicPrefix(const GWEN_MSG_ENDPOINT2 *ep) { if (ep) { AQH_ENDPOINT2_MQTT *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); if (xep) { return xep->topicPrefix; } } return NULL; } void AQH_MqttEndpoint2_SetTopicPrefix(GWEN_MSG_ENDPOINT2 *ep, const char *s) { if (ep) { AQH_ENDPOINT2_MQTT *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); if (xep) { free(xep->topicPrefix); xep->topicPrefix=s?strdup(s):NULL; } } } uint16_t AQH_MqttEndpoint2_GetNextPacketId(const GWEN_MSG_ENDPOINT2 *ep) { if (ep) { AQH_ENDPOINT2_MQTT *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); if (xep) { return ++(xep->lastPacketId); } } return 0; } uint16_t AQH_MqttEndpoint2_GetKeepAliveTime(const GWEN_MSG_ENDPOINT2 *ep) { if (ep) { AQH_ENDPOINT2_MQTT *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); if (xep) { return xep->keepAliveTime; } } return 0; } void AQH_MqttEndpoint2_SetKeepAliveTime(GWEN_MSG_ENDPOINT2 *ep, uint16_t i) { if (ep) { AQH_ENDPOINT2_MQTT *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); if (xep) { xep->keepAliveTime=i; } } } int _getBytesNeededForMessage(GWEN_UNUSED GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg) { uint32_t bytesInMsg; bytesInMsg=GWEN_Msg_GetBytesInBuffer(msg); if (bytesInMsg<2) { DBG_INFO(AQH_LOGDOMAIN, "Header not yet complete"); return (int) (2-bytesInMsg); } if (!(GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET)) _calcAndSetPayloadSizeAndOffset(msg); if (GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET) { uint32_t msgLength; msgLength=GWEN_Msg_GetParsedPayloadOffset(msg)+GWEN_Msg_GetParsedPayloadSize(msg); return (int)(msgLength-bytesInMsg); } else { DBG_INFO(AQH_LOGDOMAIN, "Size field not complete, requesting another byte"); return 1; } } int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg) { int remainingBytesInBuffer; uint32_t mqttRemainingLength=0; remainingBytesInBuffer=GWEN_Msg_GetBytesInBuffer(msg); if (remainingBytesInBuffer>1) { const uint8_t *ptr; int idx; int shift=0; ptr=GWEN_Msg_GetConstBuffer(msg); idx=1; remainingBytesInBuffer--; while(remainingBytesInBuffer) { uint8_t len; len=ptr[idx]; mqttRemainingLength+=(len & 0x7f)<keepAliveTime, xep->clientId, NULL, NULL); } } return NULL; } GWEN_MSG *AQH_MqttEndpoint2_CreateMsgPublishDouble(GWEN_MSG_ENDPOINT2 *ep, uint32_t uid, int valueId, const char *valuePath, double v) { char numBuf[16]; snprintf(numBuf, sizeof(numBuf)-1, "%f", v); numBuf[sizeof(numBuf)-1]=0; return AQH_MqttEndpoint2_CreateMsgPublishString(ep, uid, valueId, valuePath, numBuf); } GWEN_MSG *AQH_MqttEndpoint2_CreateMsgPublishInt(GWEN_MSG_ENDPOINT2 *ep, uint32_t uid, int valueId, const char *valuePath, int v) { char numBuf[16]; snprintf(numBuf, sizeof(numBuf)-1, "%d", v); numBuf[sizeof(numBuf)-1]=0; return AQH_MqttEndpoint2_CreateMsgPublishString(ep, uid, valueId, valuePath, numBuf); } GWEN_MSG *AQH_MqttEndpoint2_CreateMsgPublishString(GWEN_MSG_ENDPOINT2 *ep, uint32_t uid, int valueId, const char *valuePath, const char *v) { if (ep) { AQH_ENDPOINT2_MQTT *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); if (xep) { GWEN_BUFFER *bufTopic; GWEN_MSG *pubMsg; bufTopic=GWEN_Buffer_new(0, 64, 0, 1); if (valueId>0) GWEN_Buffer_AppendArgs(bufTopic, "%s/%08x/%d/%s", xep->topicPrefix, uid, valueId, valuePath); else GWEN_Buffer_AppendArgs(bufTopic, "%s/%08x/%s", xep->topicPrefix, uid, valuePath); DBG_INFO(AQH_LOGDOMAIN, "MQTT PUBLISH %s: %s", GWEN_Buffer_GetStart(bufTopic), v); pubMsg=AQH_PublishMqttMsg_new(0, 0, GWEN_Buffer_GetStart(bufTopic), (const uint8_t*) v, strlen(v)); GWEN_Buffer_free(bufTopic); if (pubMsg==NULL) { DBG_INFO(AQH_LOGDOMAIN, "here"); } return pubMsg; } } return NULL; }