/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/mqtt/endpoint_mqttc_p.h" #include "aqhome/mqtt/msg_mqtt.h" #include "aqhome/mqtt/msg_mqtt_connect.h" #include "aqhome/mqtt/msg_mqtt_publish.h" #include "aqhome/msg/endpoint_node.h" #include "aqhome/msg/msg_node.h" #include "aqhome/msg/msg_value2.h" #include "aqhome/msg/msg_sendstats.h" #include "aqhome/msg/msg_recvstats.h" #include #include #define GWEN_MSG_ENDPOINT_MQTTC_NAME "mqttc" #define GWEN_ENDPOINT_MQTTC_BUFFERSIZE 1024 GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC) static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _connect(GWEN_MSG_ENDPOINT *ep); static void _sendConnectMsg(GWEN_MSG_ENDPOINT *ep); static void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep); static void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); static void _processValue2Message(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); static void _processSendStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); static void _processRecvStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); static void _publishDouble(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, double v); static void _publishInt(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, int v); static void _publishString(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, const char *v); static int _isMsgComplete(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg); static int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg); GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const char *name, int groupId) { GWEN_MSG_ENDPOINT *ep; AQH_ENDPOINT_MQTTC *xep; ep=GWEN_TcpcEndpoint_new(host, port, name?name:GWEN_MSG_ENDPOINT_MQTTC_NAME, groupId); if (ep==NULL) { DBG_INFO(AQH_LOGDOMAIN, "here"); return NULL; } AQH_NodeEndpoint_Extend(ep); AQH_NodeEndpoint_SetAcceptedMsgGroups(ep, AQH_MSG_TYPEGROUP_ALL); GWEN_NEW_OBJECT(AQH_ENDPOINT_MQTTC, xep); GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep, xep, _freeData); GWEN_MsgEndpoint_SetDefaultBufferSize(ep, GWEN_ENDPOINT_MQTTC_BUFFERSIZE); GWEN_MsgEndpoint_SetIsMsgCompleteFn(ep, _isMsgComplete); GWEN_MsgEndpoint_SetProcessOutMsgFn(ep, _processOutMessage); xep->previousConnectFn=GWEN_ConnectableMsgEndpoint_SetConnectFn(ep, _connect); GWEN_ConnectableMsgEndpoint_SetFullyConnectedState(ep, GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED); return ep; } void _freeData(void *bp, void *p) { AQH_ENDPOINT_MQTTC *xep; xep=(AQH_ENDPOINT_MQTTC*) p; free(xep->clientId); free(xep->topicPrefix); GWEN_FREE_OBJECT(xep); } uint16_t AQH_MqttClientEndpoint_GetNextPacketId(GWEN_MSG_ENDPOINT *ep) { if (ep) { AQH_ENDPOINT_MQTTC *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); if (xep) return ++(xep->lastPacketId); } return 0; } uint16_t AQH_MqttClientEndpoint_GetKeepAliveTime(const GWEN_MSG_ENDPOINT *ep) { if (ep) { AQH_ENDPOINT_MQTTC *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); if (xep) return xep->keepAliveTime; } return 0; } void AQH_MqttClientEndpoint_SetKeepAliveTime(GWEN_MSG_ENDPOINT *ep, uint16_t i) { if (ep) { AQH_ENDPOINT_MQTTC *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); if (xep) xep->keepAliveTime=i; } } const char *AQH_MqttClientEndpoint_GetClientId(const GWEN_MSG_ENDPOINT *ep) { if (ep) { AQH_ENDPOINT_MQTTC *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); if (xep) return xep->clientId; } return NULL; } void AQH_MqttClientEndpoint_SetClientId(GWEN_MSG_ENDPOINT *ep, const char *s) { if (ep) { AQH_ENDPOINT_MQTTC *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); if (xep) { free(xep->clientId); xep->clientId=s?strdup(s):NULL; } } } const char *AQH_MqttClientEndpoint_GetTopicPrefix(const GWEN_MSG_ENDPOINT *ep) { if (ep) { AQH_ENDPOINT_MQTTC *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); if (xep) return xep->topicPrefix; } return NULL; } void AQH_MqttClientEndpoint_SetTopicPrefix(GWEN_MSG_ENDPOINT *ep, const char *s) { if (ep) { AQH_ENDPOINT_MQTTC *xep; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); if (xep) { free(xep->topicPrefix); xep->topicPrefix=s?strdup(s):NULL; } } } int _connect(GWEN_MSG_ENDPOINT *ep) { AQH_ENDPOINT_MQTTC *xep; DBG_DEBUG(AQH_LOGDOMAIN, "Called CONNECT"); xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); if (xep) { int state; state=GWEN_ConnectableMsgEndpoint_GetState(ep); if (statepreviousConnectFn) { int rv; rv=xep->previousConnectFn(ep); if (rv<0) { if (rv==GWEN_ERROR_TRY_AGAIN) { DBG_DEBUG(AQH_LOGDOMAIN, "Still connecting..."); } else { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); } return rv; } state=GWEN_ConnectableMsgEndpoint_GetState(ep); } } else { DBG_DEBUG(AQH_LOGDOMAIN, "Physically connected, checking further."); } if (statekeepAliveTime, xep->clientId, NULL, NULL); if (msg) { DBG_DEBUG(AQH_LOGDOMAIN, "Sending MQTT CONNECT request."); GWEN_MsgEndpoint_AddSendMessage(ep, msg); GWEN_ConnectableMsgEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK); } } void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep) { GWEN_MSG *msg; DBG_DEBUG(AQH_LOGDOMAIN, "Checking for connect response"); msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep); if (msg) { uint8_t msgType; msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0; if (msgType==AQH_MQTTMSG_MSGTYPE_CONNACK) { DBG_INFO(AQH_LOGDOMAIN, "MQTT CONNACK received, logical connection established."); GWEN_ConnectableMsgEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED); } else { DBG_ERROR(AQH_LOGDOMAIN, "Unexpected message received (%s)", AQH_MqttMsg_MsgTypeToString(msgType)); } GWEN_Msg_free(msg); } } void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) { if (GWEN_ConnectableMsgEndpoint_GetState(ep)==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED) { DBG_DEBUG(AQH_LOGDOMAIN, "Processing output message"); switch(AQH_NodeMsg_GetMsgType(nodeMsg)) { case AQH_MSG_TYPE_VALUE2: _processValue2Message(ep, nodeMsg); break; case AQH_MSG_TYPE_COMSENDSTATS: _processSendStatsMessage(ep, nodeMsg); break; case AQH_MSG_TYPE_COMRECVSTATS: _processRecvStatsMessage(ep, nodeMsg); break; default: break; } GWEN_Msg_free(nodeMsg); } } void _processValue2Message(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) { if (AQH_Value2Msg_GetValueType(nodeMsg)==AQH_MSG_VALUE2_TYPE_DOOR) _publishString(ep, AQH_Value2Msg_GetUid(nodeMsg), AQH_Value2Msg_GetValueId(nodeMsg), AQH_Value2Msg_GetValueTypeName(nodeMsg), AQH_Value2Msg_GetValueAsWindowStateString(nodeMsg)); else _publishDouble(ep, AQH_Value2Msg_GetUid(nodeMsg), AQH_Value2Msg_GetValueId(nodeMsg), AQH_Value2Msg_GetValueTypeName(nodeMsg), AQH_Value2Msg_GetValue(nodeMsg)); } void _processSendStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) { uint16_t packetsOutInt; packetsOutInt=AQH_SendStatsMsg_GetPacketsOut(nodeMsg); if (packetsOutInt) { double packetsOut; double collisions; double busy; double collisionsPercentage=0.0; double busyPercentage=0.0; packetsOut=(double) packetsOutInt; collisions=(double)AQH_SendStatsMsg_GetCollisions(nodeMsg); busy=(double)AQH_SendStatsMsg_GetBusyErrors(nodeMsg); collisionsPercentage=collisions*100.0/packetsOut; busyPercentage=busy*100.0/packetsOut; _publishInt(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/packetsOut", packetsOutInt); _publishInt(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/collisions", (int) AQH_SendStatsMsg_GetCollisions(nodeMsg)); _publishDouble(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/collisionsPercent", collisionsPercentage); _publishDouble(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/busyPercent", busyPercentage); } } void _processRecvStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) { uint16_t packetsInInt; packetsInInt=AQH_RecvStatsMsg_GetPacketsIn(nodeMsg); if (packetsInInt) { double packetsIn; double crcErrors; double ioErrors; double crcErrorsPercentage=0.0; double ioErrorsPercentage=0.0; packetsIn=(double) packetsInInt; crcErrors=(double)AQH_RecvStatsMsg_GetCrcErrors(nodeMsg); ioErrors=(double)AQH_RecvStatsMsg_GetIoErrors(nodeMsg); crcErrorsPercentage=crcErrors*100.0/packetsIn; ioErrorsPercentage=ioErrors*100.0/packetsIn; _publishInt(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/packetsIn", packetsInInt); _publishInt(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/crcerrors", (int) AQH_RecvStatsMsg_GetCrcErrors(nodeMsg)); _publishInt(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/ioerrors", (int) AQH_RecvStatsMsg_GetIoErrors(nodeMsg)); _publishDouble(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/crcerrorsPercent", crcErrorsPercentage); _publishDouble(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/ioerrorsPercent", ioErrorsPercentage); } } void _publishDouble(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, double v) { char numBuf[16]; snprintf(numBuf, sizeof(numBuf)-1, "%f", v); numBuf[sizeof(numBuf)-1]=0; _publishString(ep, uid, valueId, valuePath, numBuf); } void _publishInt(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, int v) { char numBuf[16]; snprintf(numBuf, sizeof(numBuf)-1, "%d", v); numBuf[sizeof(numBuf)-1]=0; _publishString(ep, uid, valueId, valuePath, numBuf); } void _publishString(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, const char *v) { AQH_ENDPOINT_MQTTC *xep; GWEN_BUFFER *bufTopic; GWEN_MSG *pubMsg; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); bufTopic=GWEN_Buffer_new(0, 64, 0, 1); if (valueId>0) GWEN_Buffer_AppendArgs(bufTopic, "%s/%08x/%d/%s", xep->topicPrefix, uid, valueId, valuePath); else GWEN_Buffer_AppendArgs(bufTopic, "%s/%08x/%s", xep->topicPrefix, uid, valuePath); pubMsg=GWEN_PublishMqttMsg_new(0, 0, GWEN_Buffer_GetStart(bufTopic), (const uint8_t*) v, strlen(v)); if (pubMsg) { DBG_INFO(AQH_LOGDOMAIN, "MQTT PUBLISH %s: %s", GWEN_Buffer_GetStart(bufTopic), v); GWEN_MsgEndpoint_AddSendMessage(ep, pubMsg); } GWEN_Buffer_free(bufTopic); } int _isMsgComplete(GWEN_UNUSED GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg) { int rv; if (!(GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET)) { rv=_calcAndSetPayloadSizeAndOffset(msg); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } } rv=AQH_MqttMsg_IsMsgComplete(msg); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } return rv; } int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg) { int remainingBytesInBuffer; uint32_t mqttRemainingLength=0; remainingBytesInBuffer=GWEN_Msg_GetBytesInBuffer(msg); if (remainingBytesInBuffer>1) { const uint8_t *ptr; int idx; int shift=0; ptr=GWEN_Msg_GetConstBuffer(msg); idx=1; remainingBytesInBuffer--; while(remainingBytesInBuffer) { uint8_t len; len=ptr[idx]; mqttRemainingLength+=(len & 0x7f)<