From bb3d194b5a1586b6a909308492831e81d5c9cd32 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Wed, 29 Mar 2023 16:46:32 +0200 Subject: [PATCH] aqhomed, aqhome: added MQTT endpoint, publish value changes via MQTT. --- apps/aqhomed/main.c | 180 ++++++++++++++-- aqhome/mqtt/endpoint_mqttc.c | 321 +++++++++++++++++++++++++++++ aqhome/mqtt/endpoint_mqttc.h | 11 + aqhome/mqtt/endpoint_mqttc_p.h | 7 +- aqhome/mqtt/msg_mqtt_pubresponse.c | 20 +- aqhome/mqtt/msg_mqtt_pubresponse.h | 1 + aqhome/msgmanager.c | 21 +- aqhome/msgmanager.h | 1 + 8 files changed, 538 insertions(+), 24 deletions(-) diff --git a/apps/aqhomed/main.c b/apps/aqhomed/main.c index 19dcc4c..bdfda83 100644 --- a/apps/aqhomed/main.c +++ b/apps/aqhomed/main.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -52,6 +53,12 @@ static int _serve(GWEN_DB_NODE *dbArgs); static GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs); + +static int _setupTty(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); +static int _setupLog(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); +static int _setupIpc(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); +static int _setupMqtt(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); + static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); static int _createPidFile(const char *pidFilename); @@ -67,6 +74,7 @@ static int stopService=0; + int main(int argc, char **argv) { GWEN_DB_NODE *dbArgs; @@ -162,47 +170,96 @@ GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs) { GWEN_MSG_ENDPOINT_MGR *emgr; int nodeAddress; - const char *devicePath; - const char *logFile; - const char *tcpAddress; - int tcpPort; + int rv; nodeAddress=GWEN_DB_GetIntValue(dbArgs, "nodeAddress", 0, 240); - logFile=GWEN_DB_GetCharValue(dbArgs, "logfile", 0, NULL); - devicePath=GWEN_DB_GetCharValue(dbArgs, "device", 0, "/dev/ttyUSB0"); - tcpAddress=GWEN_DB_GetCharValue(dbArgs, "tcpAddress", 0, NULL); - tcpPort=GWEN_DB_GetIntValue(dbArgs, "tcpPort", 0, 45454); emgr=AQH_MsgManager_new(nodeAddress & 0xff); + rv=_setupTty(emgr, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + GWEN_MsgEndpointMgr_free(emgr); + return NULL; + } + rv=_setupLog(emgr, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + GWEN_MsgEndpointMgr_free(emgr); + return NULL; + } + + rv=_setupIpc(emgr, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + GWEN_MsgEndpointMgr_free(emgr); + return NULL; + } + + rv=_setupMqtt(emgr, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + GWEN_MsgEndpointMgr_free(emgr); + return NULL; + } + + return emgr; +} + + + +int _setupTty(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs) +{ + const char *devicePath; + + devicePath=GWEN_DB_GetCharValue(dbArgs, "device", 0, "/dev/ttyUSB0"); if (devicePath && *devicePath) { GWEN_MSG_ENDPOINT *epTty; epTty=AQH_TtyNodeEndpoint_new(devicePath, AQH_MSGMGR_ENDPOINTGROUP_NODE); if (epTty==NULL) { DBG_ERROR(NULL, "Error creating endpoint TTY"); - GWEN_MsgEndpointMgr_free(emgr); - return NULL; + return GWEN_ERROR_GENERIC; } GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTty); } else { DBG_ERROR(NULL, "Missing device path"); - GWEN_MsgEndpointMgr_free(emgr); - return NULL; + return GWEN_ERROR_GENERIC; } + return 0; +} + + + +int _setupLog(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs) +{ + const char *logFile; + + logFile=GWEN_DB_GetCharValue(dbArgs, "logfile", 0, NULL); if (logFile && *logFile) { GWEN_MSG_ENDPOINT *epLog; epLog=AQH_LogEndpoint_new(logFile, AQH_MSGMGR_ENDPOINTGROUP_NODE); if (epLog==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint LOG"); - GWEN_MsgEndpointMgr_free(emgr); - return NULL; + return GWEN_ERROR_GENERIC; } GWEN_MsgEndpointMgr_AddEndpoint(emgr, epLog); } + return 0; +} + + + +int _setupIpc(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs) +{ + const char *tcpAddress; + int tcpPort; + + tcpAddress=GWEN_DB_GetCharValue(dbArgs, "tcpAddress", 0, NULL); + tcpPort=GWEN_DB_GetIntValue(dbArgs, "tcpPort", 0, 45454); if (tcpAddress && *tcpAddress && tcpPort) { GWEN_MSG_ENDPOINT *epTcp; @@ -210,17 +267,49 @@ GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs) epTcp=AQH_TcpdIpcNodeEndpoint_new(tcpAddress, tcpPort, NULL, AQH_MSGMGR_ENDPOINTGROUP_IPC); if (epTcp==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint TCP"); - GWEN_MsgEndpointMgr_free(emgr); - return NULL; + return GWEN_ERROR_GENERIC; } GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp); } - - return emgr; + return 0; } +int _setupMqtt(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs) +{ + const char *mqttAddress; + int mqttPort; + const char *mqttClientId; + const char *mqttTopicPrefix; + int mqttKeepAlive; + + mqttAddress=GWEN_DB_GetCharValue(dbArgs, "mqttAddress", 0, NULL); + mqttPort=GWEN_DB_GetIntValue(dbArgs, "mqttPort", 0, 1883); + mqttClientId=GWEN_DB_GetCharValue(dbArgs, "mqttClientId", 0, "aqhomed"); + mqttTopicPrefix=GWEN_DB_GetCharValue(dbArgs, "mqttTopicPrefix", 0, "aqhome/sensors"); + mqttKeepAlive=GWEN_DB_GetIntValue(dbArgs, "mqttKeepAlive", 0, 600); + + if (mqttAddress && *mqttAddress && mqttPort) { + GWEN_MSG_ENDPOINT *epMqtt; + + epMqtt=AQH_MqttClientEndpoint_new(mqttAddress, mqttPort, NULL, AQH_MSGMGR_ENDPOINTGROUP_NODE|AQH_MSGMGR_ENDPOINTGROUP_MQTT); + if (epMqtt==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint TCP"); + return GWEN_ERROR_GENERIC; + } + if (mqttClientId && *mqttClientId) + AQH_MqttClientEndpoint_SetClientId(epMqtt, mqttClientId); + if (mqttTopicPrefix && *mqttTopicPrefix) + AQH_MqttClientEndpoint_SetTopicPrefix(epMqtt, mqttTopicPrefix); + AQH_MqttClientEndpoint_SetKeepAliveTime(epMqtt, mqttKeepAlive); + + GWEN_MsgEndpointMgr_AddEndpoint(emgr, epMqtt); + } + return 0; +} + + int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) { @@ -303,6 +392,61 @@ int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) I18S("Specify the TCP port to listen on"), I18S("Specify the TCP port to listen on") }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "mqttAddress", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "ma", /* short option */ + "mqttaddress", /* long option */ + I18S("Specify the address of the MQTT server to connect to (disabled if missing)"), + I18S("Specify the address of the MQTT server to connect to (disabled if missing)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "mqttPort", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "mp", /* short option */ + "mqttport", /* long option */ + I18S("Specify the port of the MQTT server (default: 1883)"), + I18S("Specify the port of the MQTT server (default: 1883)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "mqttClientId", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + NULL, /* short option */ + "mqttclientid", /* long option */ + I18S("Specify client id for the MQTT server (default: \"aqhomed\")"), + I18S("Specify client id for the MQTT server (default: \"aqhomed\")") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "mqttTopicPrefix", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "mt", /* short option */ + "mqtttopicprefix", /* long option */ + I18S("Specify prefix of MQTT topics when publishing (defaults to \"aqhome/sensors\")"), + I18S("Specify prefix of MQTT topics when publishing (defaults to \"aqhome/sensors\")") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "mqttKeepAlive", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "mk", /* short option */ + "mqttkeepalive", /* long option */ + I18S("Specify keepalive time in seconds (defaults: 600)"), + I18S("Specify keepalive time in seconds (defaults: 600)") + }, { GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ GWEN_ArgsType_Char, /* type */ diff --git a/aqhome/mqtt/endpoint_mqttc.c b/aqhome/mqtt/endpoint_mqttc.c index 2fa3b6e..599c458 100644 --- a/aqhome/mqtt/endpoint_mqttc.c +++ b/aqhome/mqtt/endpoint_mqttc.c @@ -13,6 +13,12 @@ #include "aqhome/mqtt/endpoint_mqttc_p.h" #include "aqhome/mqtt/msg_mqtt.h" +#include "aqhome/mqtt/msg_mqtt_connect.h" +#include "aqhome/mqtt/msg_mqtt_publish.h" +#include "aqhome/msg/endpoint_node.h" +#include "aqhome/msg/msg_node.h" +#include "aqhome/msg/msg_value2.h" +#include "aqhome/msg/msg_sendstats.h" #include #include @@ -23,10 +29,21 @@ #define GWEN_ENDPOINT_MQTTC_BUFFERSIZE 1024 + GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC) static void GWENHYWFAR_CB _freeData(void *bp, void *p); +static void _run(GWEN_MSG_ENDPOINT *ep); +static void _sendConnectMsg(GWEN_MSG_ENDPOINT *ep); +static void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep); +static void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); +static void _processValue2Message(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); +static void _processSendStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); +static void _publishDouble(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, double v); +static void _publishInt(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, int v); +static void _publishString(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, const char *v); +static const char *_valueTypeToString(int t); static int _isMsgComplete(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg); static int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg); @@ -43,11 +60,16 @@ GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const DBG_INFO(AQH_LOGDOMAIN, "here"); return NULL; } + AQH_NodeEndpoint_Extend(ep); + AQH_NodeEndpoint_SetAcceptedMsgGroups(ep, AQH_MSG_TYPEGROUP_ALL); + GWEN_NEW_OBJECT(AQH_ENDPOINT_MQTTC, xep); GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep, xep, _freeData); GWEN_MsgEndpoint_SetDefaultBufferSize(ep, GWEN_ENDPOINT_MQTTC_BUFFERSIZE); GWEN_MsgEndpoint_SetIsMsgCompleteFn(ep, _isMsgComplete); + xep->previousRunFn=GWEN_MsgEndpoint_SetRunFn(ep, _run); + GWEN_MsgEndpoint_SetProcessOutMsgFn(ep, _processOutMessage); return ep; } @@ -64,6 +86,305 @@ void _freeData(void *bp, void *p) +uint16_t AQH_MqttClientEndpoint_GetNextPacketId(GWEN_MSG_ENDPOINT *ep) +{ + if (ep) { + AQH_ENDPOINT_MQTTC *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + if (xep) + return ++(xep->lastPacketId); + } + return 0; +} + + + +uint16_t AQH_MqttClientEndpoint_GetKeepAliveTime(const GWEN_MSG_ENDPOINT *ep) +{ + if (ep) { + AQH_ENDPOINT_MQTTC *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + if (xep) + return xep->keepAliveTime; + } + return 0; +} + + + +void AQH_MqttClientEndpoint_SetKeepAliveTime(GWEN_MSG_ENDPOINT *ep, uint16_t i) +{ + if (ep) { + AQH_ENDPOINT_MQTTC *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + if (xep) + xep->keepAliveTime=i; + } +} + + + +const char *AQH_MqttClientEndpoint_GetClientId(const GWEN_MSG_ENDPOINT *ep) +{ + if (ep) { + AQH_ENDPOINT_MQTTC *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + if (xep) + return xep->clientId; + } + return NULL; +} + + + +void AQH_MqttClientEndpoint_SetClientId(GWEN_MSG_ENDPOINT *ep, const char *s) +{ + if (ep) { + AQH_ENDPOINT_MQTTC *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + if (xep) { + free(xep->clientId); + xep->clientId=s?strdup(s):NULL; + } + } +} + + + +const char *AQH_MqttClientEndpoint_GetTopicPrefix(const GWEN_MSG_ENDPOINT *ep) +{ + if (ep) { + AQH_ENDPOINT_MQTTC *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + if (xep) + return xep->topicPrefix; + } + return NULL; +} + + + +void AQH_MqttClientEndpoint_SetTopicPrefix(GWEN_MSG_ENDPOINT *ep, const char *s) +{ + if (ep) { + AQH_ENDPOINT_MQTTC *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + if (xep) { + free(xep->topicPrefix); + xep->topicPrefix=s?strdup(s):NULL; + } + } +} + + + +void _run(GWEN_MSG_ENDPOINT *ep) +{ + AQH_ENDPOINT_MQTTC *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + if (xep) { + int state; + + if (xep->previousRunFn) + xep->previousRunFn(ep); + + state=GWEN_TcpcEndpoint_GetState(ep); + if (state==GWEN_MSG_ENDPOINT_TCPC_STATE_UNCONNECTED) { + int rv; + + rv=GWEN_TcpcEndpoint_StartConnect(ep); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error starting to connect (%d)", rv); + } + } + else if (state==GWEN_MSG_ENDPOINT_TCPC_STATE_CONNECTING) { + DBG_DEBUG(AQH_LOGDOMAIN, "Still connecting"); + } + else if (state==GWEN_MSG_ENDPOINT_TCPC_STATE_CONNECTED) + _sendConnectMsg(ep); + else if (state==GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK) + _checkForConnAckMsg(ep); + else if (state==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED){ + /* nothing to do */ + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Unhandled connection status %d", state); + } + } +} + + + +void _sendConnectMsg(GWEN_MSG_ENDPOINT *ep) +{ + AQH_ENDPOINT_MQTTC *xep; + GWEN_MSG *msg; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + /* send CONNECT */ + msg=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, xep->keepAliveTime, xep->clientId, NULL, NULL); + if (msg) { + DBG_INFO(AQH_LOGDOMAIN, "Sending MQTT CONNECT request."); + GWEN_MsgEndpoint_AddSendMessage(ep, msg); + GWEN_TcpcEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK); + } +} + + + +void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep) +{ + GWEN_MSG *msg; + + msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep); + if (msg) { + uint8_t msgType; + + msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0; + if (msgType==AQH_MQTTMSG_MSGTYPE_CONNACK) { + DBG_INFO(AQH_LOGDOMAIN, "MQTT CONNACK received, logical connection established."); + GWEN_TcpcEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED); + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Unexpected message received (%s)", AQH_MqttMsg_MsgTypeToString(msgType)); + } + GWEN_Msg_free(msg); + } +} + + + +void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) +{ + if (GWEN_TcpcEndpoint_GetState(ep)==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED) { + DBG_DEBUG(AQH_LOGDOMAIN, "Processing output message"); + switch(AQH_NodeMsg_GetMsgType(nodeMsg)) { + case AQH_MSG_TYPE_VALUE2: + _processValue2Message(ep, nodeMsg); + break; + case AQH_MSG_TYPE_COMSENDSTATS: + _processSendStatsMessage(ep, nodeMsg); + break; + default: + break; + } + GWEN_Msg_free(nodeMsg); + } +} + + + +void _processValue2Message(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) +{ + _publishDouble(ep, + AQH_Value2Msg_GetUid(nodeMsg), + AQH_Value2Msg_GetValueId(nodeMsg), + _valueTypeToString(AQH_Value2Msg_GetValueId(nodeMsg)), + AQH_Value2Msg_GetValue(nodeMsg)); +} + + + +void _processSendStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) +{ + uint16_t packetsOutInt; + + packetsOutInt=AQH_SendStatsMsg_GetPacketsOut(nodeMsg); + if (packetsOutInt) { + double packetsOut; + double collisions; + double aborted; + double collisionsPercentage=0.0; + double abortedPercentage=0.0; + + packetsOut=(double) packetsOutInt; + collisions=(double)AQH_SendStatsMsg_GetCollisions(nodeMsg); + aborted=(double)AQH_SendStatsMsg_GetAborted(nodeMsg); + + collisionsPercentage=collisions*100.0/packetsOut; + abortedPercentage=aborted*100.0/packetsOut; + + _publishInt(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/packetsOut", packetsOutInt); + _publishInt(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/collisions", (int) AQH_SendStatsMsg_GetCollisions(nodeMsg)); + _publishDouble(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/collisionsPercent", collisionsPercentage); + _publishDouble(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/abortedPercent", abortedPercentage); + } +} + + + +void _publishDouble(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, double v) +{ + char numBuf[16]; + + snprintf(numBuf, sizeof(numBuf)-1, "%f", v); + numBuf[sizeof(numBuf)-1]=0; + _publishString(ep, uid, valueId, valuePath, numBuf); +} + + + +void _publishInt(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, int v) +{ + char numBuf[16]; + + snprintf(numBuf, sizeof(numBuf)-1, "%d", v); + numBuf[sizeof(numBuf)-1]=0; + _publishString(ep, uid, valueId, valuePath, numBuf); +} + + + +void _publishString(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, const char *v) +{ + AQH_ENDPOINT_MQTTC *xep; + GWEN_BUFFER *bufTopic; + GWEN_MSG *pubMsg; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + + bufTopic=GWEN_Buffer_new(0, 64, 0, 1); + if (valueId>0) + GWEN_Buffer_AppendArgs(bufTopic, "%s/%08x/%d/%s", + xep->topicPrefix, + uid, + valueId, + valuePath); + else + GWEN_Buffer_AppendArgs(bufTopic, "%s/%08x/%s", + xep->topicPrefix, + uid, + valuePath); + + pubMsg=GWEN_PublishMqttMsg_new(0, 0, GWEN_Buffer_GetStart(bufTopic), (const uint8_t*) v, strlen(v)); + if (pubMsg) { + DBG_INFO(AQH_LOGDOMAIN, "MQTT PUBLISH %s: %s", GWEN_Buffer_GetStart(bufTopic), v); + GWEN_MsgEndpoint_AddSendMessage(ep, pubMsg); + } + GWEN_Buffer_free(bufTopic); +} + + + +const char *_valueTypeToString(int t) +{ + switch(t) { + case AQH_MSG_VALUE2_TYPE_TEMP: return "temperature"; + case AQH_MSG_VALUE2_TYPE_HUMIDITY: return "humidity"; + default: return "unknown"; + } +} + + + int _isMsgComplete(GWEN_UNUSED GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg) { int rv; diff --git a/aqhome/mqtt/endpoint_mqttc.h b/aqhome/mqtt/endpoint_mqttc.h index 6d65641..ed9df18 100644 --- a/aqhome/mqtt/endpoint_mqttc.h +++ b/aqhome/mqtt/endpoint_mqttc.h @@ -14,10 +14,21 @@ #include +#define GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK (GWEN_MSG_ENDPOINT_TCPC_STATE_NEXTFREE+0) +#define GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED (GWEN_MSG_ENDPOINT_TCPC_STATE_NEXTFREE+1) AQHOME_API GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const char *name, int groupId); +AQHOME_API uint16_t AQH_MqttClientEndpoint_GetNextPacketId(GWEN_MSG_ENDPOINT *ep); +AQHOME_API uint16_t AQH_MqttClientEndpoint_GetKeepAliveTime(const GWEN_MSG_ENDPOINT *ep); +AQHOME_API void AQH_MqttClientEndpoint_SetKeepAliveTime(GWEN_MSG_ENDPOINT *ep, uint16_t i); + +AQHOME_API const char *AQH_MqttClientEndpoint_GetClientId(const GWEN_MSG_ENDPOINT *ep); +AQHOME_API void AQH_MqttClientEndpoint_SetClientId(GWEN_MSG_ENDPOINT *ep, const char *s); + +AQHOME_API const char *AQH_MqttClientEndpoint_GetTopicPrefix(const GWEN_MSG_ENDPOINT *ep); +AQHOME_API void AQH_MqttClientEndpoint_SetTopicPrefix(GWEN_MSG_ENDPOINT *ep, const char *s); diff --git a/aqhome/mqtt/endpoint_mqttc_p.h b/aqhome/mqtt/endpoint_mqttc_p.h index 9d7b341..7433c00 100644 --- a/aqhome/mqtt/endpoint_mqttc_p.h +++ b/aqhome/mqtt/endpoint_mqttc_p.h @@ -15,7 +15,12 @@ typedef struct AQH_ENDPOINT_MQTTC AQH_ENDPOINT_MQTTC; struct AQH_ENDPOINT_MQTTC { - int dummy; + char *clientId; + char *topicPrefix; + uint16_t lastPacketId; + uint16_t keepAliveTime; + + GWEN_MSG_ENDPOINT_RUN_FN previousRunFn; }; diff --git a/aqhome/mqtt/msg_mqtt_pubresponse.c b/aqhome/mqtt/msg_mqtt_pubresponse.c index 3b7129c..2967576 100644 --- a/aqhome/mqtt/msg_mqtt_pubresponse.c +++ b/aqhome/mqtt/msg_mqtt_pubresponse.c @@ -35,7 +35,7 @@ GWEN_MSG *GWEN_PubResponseMqttMsg_new(uint8_t typeAndFlags, uint16_t pkgId) -void AQH_PubResponseMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +uint16_t AQH_PubResponseMqttMsg_GetPacketId(const GWEN_MSG *msg) { const uint8_t *msgPtr; uint32_t msgLen; @@ -47,12 +47,24 @@ void AQH_PubResponseMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const uint8_t *payloadPtr; uint32_t payloadLen; - GWEN_Buffer_AppendArgs(dbuf, "%s %s", AQH_MqttMsg_MsgTypeToString(msgPtr[0] & 0xf0), sText); payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); - if (payloadLen>=2) - GWEN_Buffer_AppendArgs(dbuf, "(packet id=%04x)", (payloadPtr[0]<<8)+payloadPtr[1]); + if (payloadPtr && payloadLen>=2) + return (payloadPtr[0]<<8)+payloadPtr[1]; } + return 0; +} + + + +void AQH_PubResponseMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +{ + uint8_t msgType; + uint16_t packetId; + + msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg); + packetId=AQH_PubResponseMqttMsg_GetPacketId(msg); + GWEN_Buffer_AppendArgs(dbuf, "%s %s (packet id: %04x)", AQH_MqttMsg_MsgTypeToString(msgType & 0xf0), sText, packetId); } diff --git a/aqhome/mqtt/msg_mqtt_pubresponse.h b/aqhome/mqtt/msg_mqtt_pubresponse.h index 597b3ea..2b336d0 100644 --- a/aqhome/mqtt/msg_mqtt_pubresponse.h +++ b/aqhome/mqtt/msg_mqtt_pubresponse.h @@ -22,6 +22,7 @@ * Use for PUBACK, PUBREC, PUBREL and PUBCOMP. */ AQHOME_API GWEN_MSG *GWEN_PubResponseMqttMsg_new(uint8_t typeAndFlags, uint16_t pkgId); +AQHOME_API uint16_t AQH_PubResponseMqttMsg_GetPacketId(const GWEN_MSG *msg); AQHOME_API void AQH_PubResponseMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); diff --git a/aqhome/msgmanager.c b/aqhome/msgmanager.c index 853d51e..a438771 100644 --- a/aqhome/msgmanager.c +++ b/aqhome/msgmanager.c @@ -21,7 +21,9 @@ #include "aqhome/msg/msg_claimaddr.h" #include "aqhome/msg/msg_haveaddr.h" #include "aqhome/msg/msg_device.h" +#include "aqhome/mqtt/endpoint_mqttc.h" +#include #include #include @@ -35,6 +37,7 @@ static void _loopOnceOverEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr); static void _handleEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep); static void _handleNodeMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleIpcMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); +static void _handleMqttMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgValue2(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgNeedAddress(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgClaimAddress(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); @@ -99,7 +102,15 @@ void _loopOnceOverEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr) ep=GWEN_MsgEndpoint_List_First(endpointList); while(ep) { - _handleEndpoint(emgr, ep); + if (GWEN_MsgEndpoint_GetGroupId(ep) & AQH_MSGMGR_ENDPOINTGROUP_MQTT) { + if (GWEN_TcpcEndpoint_GetState(ep)>=GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED) + _handleEndpoint(emgr, ep); + else { + DBG_INFO(AQH_LOGDOMAIN, "Not handling MQTT endpoint right now (not fully connected)"); + } + } + else + _handleEndpoint(emgr, ep); ep=GWEN_MsgEndpoint_List_Next(ep); } /* while */ } @@ -127,6 +138,7 @@ void _handleEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep) switch(groupId) { case AQH_MSGMGR_ENDPOINTGROUP_NODE: _handleNodeMsg(emgr, ep, msg); break; case AQH_MSGMGR_ENDPOINTGROUP_IPC: _handleIpcMsg(emgr, ep, msg); break; + case AQH_MSGMGR_ENDPOINTGROUP_MQTT: _handleMqttMsg(emgr, ep, msg); break; default: DBG_ERROR(AQH_LOGDOMAIN, "unhandled groupId %d (%02x), ignoring message", groupId, groupId); break; @@ -176,6 +188,13 @@ void _handleIpcMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWE +void _handleMqttMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) +{ + /* exec MQTT message */ +} + + + void _handleMsgValue2(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { AQH_MSG_MANAGER *xmgr; diff --git a/aqhome/msgmanager.h b/aqhome/msgmanager.h index 3d89a58..d78c98b 100644 --- a/aqhome/msgmanager.h +++ b/aqhome/msgmanager.h @@ -18,6 +18,7 @@ #define AQH_MSGMGR_ENDPOINTGROUP_NODE 1 #define AQH_MSGMGR_ENDPOINTGROUP_IPC 2 +#define AQH_MSGMGR_ENDPOINTGROUP_MQTT 4