From 6f5a26b0cf74693fc32f8d870f6737f02bdd0329 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Sun, 9 Jul 2023 20:47:47 +0200 Subject: [PATCH] aqhome: Started rewriting endpoints for version 2 of the msgio interface. This interface is much simpler. First rewritten endpoint is that for MQTT. --- aqhome/libtest.c | 27 ++- aqhome/mqtt/0BUILD | 4 +- aqhome/mqtt/endpoint2_mqtt.c | 216 +++++++++++++++++++- aqhome/mqtt/endpoint2_mqtt.h | 34 +++- aqhome/mqtt/endpoint2_mqtt_p.h | 31 +++ aqhome/mqtt/endpoint2_mqttc.c | 346 +++++++++++++++++++++++++++++++++ aqhome/mqtt/endpoint2_mqttc.h | 44 +++++ 7 files changed, 697 insertions(+), 5 deletions(-) create mode 100644 aqhome/mqtt/endpoint2_mqtt_p.h create mode 100644 aqhome/mqtt/endpoint2_mqttc.c create mode 100644 aqhome/mqtt/endpoint2_mqttc.h diff --git a/aqhome/libtest.c b/aqhome/libtest.c index 3cac040..23850c1 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -14,6 +14,7 @@ #include "aqhome/ipc/msg_ipc_forward.h" #include "aqhome/ipc/msg_ipc_setaccmsggrps.h" #include "aqhome/mqtt/endpoint_mqttc.h" +#include "aqhome/mqtt/endpoint2_mqttc.h" #include "aqhome/mqtt/msg_mqtt_connect.h" #include "aqhome/mqtt/msg_mqtt_connack.h" #include "aqhome/mqtt/msg_mqtt_publish.h" @@ -249,6 +250,29 @@ int testMqttConnection() +int testMqttConnection2() +{ + GWEN_MSG_ENDPOINT2 *epClient; + int loop; + + AQH_Init(); + + epClient=AQH_MqttClientEndpoint2_new("TESTCLIENT1234", "127.0.0.1", 1883, NULL, 1); + + for (loop=0;; loop++) { + DBG_INFO(GWEN_LOGDOMAIN, "Loop %d:", loop); + GWEN_MsgEndpoint2_IoLoop(epClient, 2000); /* 2000 ms */ + + if (GWEN_MsgEndpoint2_GetState(epClient)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) { + DBG_INFO(AQH_LOGDOMAIN, "Connected."); + break; + } + } + return 0; +} + + + int testMqttSubscribe(int argc, char **argv) { int rv; @@ -557,7 +581,8 @@ int main(int argc, char **argv) //return testIpcConnection(); //return testHexfile(argc, argv); //return testFlashRecords(argc, argv); - return testMqttSubscribe(argc, argv); + //return testMqttSubscribe(argc, argv); + return testMqttConnection2(); } diff --git a/aqhome/mqtt/0BUILD b/aqhome/mqtt/0BUILD index 70d304b..f5480a6 100644 --- a/aqhome/mqtt/0BUILD +++ b/aqhome/mqtt/0BUILD @@ -47,6 +47,7 @@ endpoint_mqttc.h endpoint2_mqtt.h + endpoint2_mqttc.h msg_mqtt.h msg_mqtt_connect.h msg_mqtt_connack.h @@ -58,7 +59,7 @@ - endpoint_mqttc_p.h + endpoint2_mqtt_p.h @@ -67,6 +68,7 @@ endpoint_mqttc.c endpoint2_mqtt.c + endpoint2_mqttc.c msg_mqtt.c msg_mqtt_connect.c msg_mqtt_connack.c diff --git a/aqhome/mqtt/endpoint2_mqtt.c b/aqhome/mqtt/endpoint2_mqtt.c index 7020fa2..ae334a3 100644 --- a/aqhome/mqtt/endpoint2_mqtt.c +++ b/aqhome/mqtt/endpoint2_mqtt.c @@ -10,10 +10,22 @@ # include #endif -#include "./endpoint2_mqtt.h" +#include "aqhome/mqtt/endpoint2_mqtt_p.h" + +#include "aqhome/mqtt/msg_mqtt_connect.h" +#include "aqhome/mqtt/msg_mqtt_publish.h" #include #include +#include + + +#define AQH_ENDPOINT2_MQTT_DEFAULT_KEEPALIVE 600 + + + +GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_ENDPOINT2_MQTT) + @@ -22,6 +34,7 @@ * ------------------------------------------------------------------------------------------------ */ +static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _getBytesNeededForMessage(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg); static int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg); @@ -36,12 +49,134 @@ static int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg); void AQH_MqttEndpoint2_Extend(GWEN_MSG_ENDPOINT2 *ep) { if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + GWEN_NEW_OBJECT(AQH_ENDPOINT2_MQTT, xep); + GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep, xep, _freeData); + xep->keepAliveTime=AQH_ENDPOINT2_MQTT_DEFAULT_KEEPALIVE; + GWEN_MsgIoEndpoint2_SetGetNeededBytesFn(ep, _getBytesNeededForMessage); } } +void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +{ + AQH_ENDPOINT2_MQTT *xep; + + xep=(AQH_ENDPOINT2_MQTT*) p; + free(xep->clientId); + free(xep->topicPrefix); + GWEN_FREE_OBJECT(xep); +} + + + +const char *AQH_MqttEndpoint2_GetClientId(const GWEN_MSG_ENDPOINT2 *ep) +{ + if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); + if (xep) { + return xep->clientId; + } + } + return NULL; +} + + + +void AQH_MqttEndpoint2_SetClientId(GWEN_MSG_ENDPOINT2 *ep, const char *s) +{ + if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); + if (xep) { + free(xep->clientId); + xep->clientId=s?strdup(s):NULL; + } + } +} + + + +const char *AQH_MqttEndpoint2_GetTopicPrefix(const GWEN_MSG_ENDPOINT2 *ep) +{ + if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); + if (xep) { + return xep->topicPrefix; + } + } + return NULL; +} + + + +void AQH_MqttEndpoint2_SetTopicPrefix(GWEN_MSG_ENDPOINT2 *ep, const char *s) +{ + if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); + if (xep) { + free(xep->topicPrefix); + xep->topicPrefix=s?strdup(s):NULL; + } + } +} + + + +uint16_t AQH_MqttEndpoint2_GetNextPacketId(const GWEN_MSG_ENDPOINT2 *ep) +{ + if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); + if (xep) { + return ++(xep->lastPacketId); + } + } + return 0; +} + + + +uint16_t AQH_MqttEndpoint2_GetKeepAliveTime(const GWEN_MSG_ENDPOINT2 *ep) +{ + if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); + if (xep) { + return xep->keepAliveTime; + } + } + return 0; +} + + + +void AQH_MqttEndpoint2_SetKeepAliveTime(GWEN_MSG_ENDPOINT2 *ep, uint16_t i) +{ + if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); + if (xep) { + xep->keepAliveTime=i; + } + } +} + + + int _getBytesNeededForMessage(GWEN_UNUSED GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg) { uint32_t bytesInMsg; @@ -104,4 +239,83 @@ int _calcAndSetPayloadSizeAndOffset(GWEN_MSG *msg) +GWEN_MSG *AQH_MqttEndpoint2_CreateMsgConnect(GWEN_MSG_ENDPOINT2 *ep) +{ + if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + DBG_INFO(AQH_LOGDOMAIN, "Sending connect msg"); + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); + if (xep) { + return GWEN_ConnectMqttMsg_new("MQTT", 4, 0, xep->keepAliveTime, xep->clientId, NULL, NULL); + } + } + return NULL; +} + + + +GWEN_MSG *AQH_MqttEndpoint2_CreateMsgPublishDouble(GWEN_MSG_ENDPOINT2 *ep, uint32_t uid, int valueId, const char *valuePath, double v) +{ + char numBuf[16]; + + snprintf(numBuf, sizeof(numBuf)-1, "%f", v); + numBuf[sizeof(numBuf)-1]=0; + return AQH_MqttEndpoint2_CreateMsgPublishString(ep, uid, valueId, valuePath, numBuf); +} + + + +GWEN_MSG *AQH_MqttEndpoint2_CreateMsgPublishInt(GWEN_MSG_ENDPOINT2 *ep, uint32_t uid, int valueId, const char *valuePath, int v) +{ + char numBuf[16]; + + snprintf(numBuf, sizeof(numBuf)-1, "%d", v); + numBuf[sizeof(numBuf)-1]=0; + return AQH_MqttEndpoint2_CreateMsgPublishString(ep, uid, valueId, valuePath, numBuf); +} + + + +GWEN_MSG *AQH_MqttEndpoint2_CreateMsgPublishString(GWEN_MSG_ENDPOINT2 *ep, + uint32_t uid, + int valueId, + const char *valuePath, + const char *v) +{ + if (ep) { + AQH_ENDPOINT2_MQTT *xep; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT2, AQH_ENDPOINT2_MQTT, ep); + if (xep) { + GWEN_BUFFER *bufTopic; + GWEN_MSG *pubMsg; + + bufTopic=GWEN_Buffer_new(0, 64, 0, 1); + if (valueId>0) + GWEN_Buffer_AppendArgs(bufTopic, "%s/%08x/%d/%s", + xep->topicPrefix, + uid, + valueId, + valuePath); + else + GWEN_Buffer_AppendArgs(bufTopic, "%s/%08x/%s", + xep->topicPrefix, + uid, + valuePath); + + DBG_INFO(AQH_LOGDOMAIN, "MQTT PUBLISH %s: %s", GWEN_Buffer_GetStart(bufTopic), v); + pubMsg=GWEN_PublishMqttMsg_new(0, 0, GWEN_Buffer_GetStart(bufTopic), (const uint8_t*) v, strlen(v)); + GWEN_Buffer_free(bufTopic); + if (pubMsg==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "here"); + } + return pubMsg; + } + } + return NULL; +} + + + diff --git a/aqhome/mqtt/endpoint2_mqtt.h b/aqhome/mqtt/endpoint2_mqtt.h index e8fa9c2..388af8f 100644 --- a/aqhome/mqtt/endpoint2_mqtt.h +++ b/aqhome/mqtt/endpoint2_mqtt.h @@ -6,8 +6,8 @@ * should have received along with this file. ****************************************************************************/ -#ifndef AQH_ENDPOINT2_MQTTC_H -#define AQH_ENDPOINT2_MQTTC_H +#ifndef AQH_ENDPOINT2_MQTT_H +#define AQH_ENDPOINT2_MQTT_H #include @@ -27,6 +27,36 @@ extern "C" { AQHOME_API void AQH_MqttEndpoint2_Extend(GWEN_MSG_ENDPOINT2 *ep); +AQHOME_API const char *AQH_MqttEndpoint2_GetClientId(const GWEN_MSG_ENDPOINT2 *ep); +AQHOME_API void AQH_MqttEndpoint2_SetClientId(GWEN_MSG_ENDPOINT2 *ep, const char *s); + +AQHOME_API const char *AQH_MqttEndpoint2_GetTopicPrefix(const GWEN_MSG_ENDPOINT2 *ep); +AQHOME_API void AQH_MqttEndpoint2_SetTopicPrefix(GWEN_MSG_ENDPOINT2 *ep, const char *s); + +AQHOME_API uint16_t AQH_MqttEndpoint2_GetNextPacketId(const GWEN_MSG_ENDPOINT2 *ep); + +AQHOME_API uint16_t AQH_MqttEndpoint2_GetKeepAliveTime(const GWEN_MSG_ENDPOINT2 *ep); +AQHOME_API void AQH_MqttEndpoint2_SetKeepAliveTime(GWEN_MSG_ENDPOINT2 *ep, uint16_t i); + + + +AQHOME_API GWEN_MSG *AQH_MqttEndpoint2_CreateMsgConnect(GWEN_MSG_ENDPOINT2 *ep); +AQHOME_API GWEN_MSG *AQH_MqttEndpoint2_CreateMsgPublishString(GWEN_MSG_ENDPOINT2 *ep, + uint32_t uid, + int valueId, + const char *valuePath, + const char *v); +AQHOME_API GWEN_MSG *AQH_MqttEndpoint2_CreateMsgPublishDouble(GWEN_MSG_ENDPOINT2 *ep, + uint32_t uid, + int valueId, + const char *valuePath, + double v); +AQHOME_API GWEN_MSG *AQH_MqttEndpoint2_CreateMsgPublishInt(GWEN_MSG_ENDPOINT2 *ep, + uint32_t uid, + int valueId, + const char *valuePath, + int v); + diff --git a/aqhome/mqtt/endpoint2_mqtt_p.h b/aqhome/mqtt/endpoint2_mqtt_p.h new file mode 100644 index 0000000..85c0c3d --- /dev/null +++ b/aqhome/mqtt/endpoint2_mqtt_p.h @@ -0,0 +1,31 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_ENDPOINT2_MQTT_P_H +#define AQH_ENDPOINT2_MQTT_P_H + + +#include "aqhome/mqtt/endpoint2_mqtt.h" + +#include + + +typedef struct AQH_ENDPOINT2_MQTT AQH_ENDPOINT2_MQTT; +struct AQH_ENDPOINT2_MQTT { + char *clientId; + char *topicPrefix; + uint16_t lastPacketId; + uint16_t keepAliveTime; +}; + + + + + +#endif + diff --git a/aqhome/mqtt/endpoint2_mqttc.c b/aqhome/mqtt/endpoint2_mqttc.c new file mode 100644 index 0000000..dc96c46 --- /dev/null +++ b/aqhome/mqtt/endpoint2_mqttc.c @@ -0,0 +1,346 @@ +/**************************************************************************** + * This file is part of the project Gwenhywfar. + * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "aqhome/mqtt/endpoint2_mqttc.h" + +#include "aqhome/mqtt/endpoint2_mqtt.h" +#include "aqhome/mqtt/msg_mqtt_connect.h" +#include "aqhome/mqtt/msg_mqtt_connack.h" +#include "aqhome/mqtt/msg_mqtt_publish.h" + +#include +#include +#include +#include + + +#define AQH_ENDPOINT2_MQTT_NAME "mqtt-client" +#define AQH_ENDPOINT2_MQTTC_RECONNECT_TIME 5 +#define AQH_ENDPOINT2_MQTTC_CONNECT_TIMEOUT 10 + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void _addSockets(GWEN_MSG_ENDPOINT2 *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_UNUSED GWEN_SOCKETSET *xSet); +static void _checkSockets(GWEN_MSG_ENDPOINT2 *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); +static int _startConnect(GWEN_MSG_ENDPOINT2 *ep); +static void _moveMessagesBetweenLists(GWEN_MSG_LIST *srcList, GWEN_MSG_LIST *dstList); + +static void _addSocketsWhenUnconnected(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); +static void _addSocketsWhenConnecting(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); +static void _addSocketsWhenConnected(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); + +static void _checkSocketsWhenConnecting(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); +static void _checkSocketsWhenConnected(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + + + +GWEN_MSG_ENDPOINT2 *AQH_MqttClientEndpoint2_new(const char *clientId, + const char *host, int port, + const char *name, int groupId) +{ + GWEN_MSG_ENDPOINT2 *ep; + GWEN_MSG_ENDPOINT2 *epChild; + + ep=GWEN_MsgEndpoint2_new(name?name:AQH_ENDPOINT2_MQTT_NAME, groupId); + GWEN_MsgEndpoint2_SetAddSocketsFn(ep, _addSockets); + GWEN_MsgEndpoint2_SetCheckSocketsFn(ep, _checkSockets); + + epChild=GWEN_TcpcEndpoint2_new(host, port, NULL, groupId); + GWEN_MsgIoEndpoint2_Extend(epChild); + AQH_MqttEndpoint2_Extend(epChild); + AQH_MqttEndpoint2_SetClientId(epChild, clientId); + GWEN_MsgEndpoint2_Tree2_AddChild(ep, epChild); + + return ep; +} + + + +int AQH_MqttClientEndpoint2_StartConnect(GWEN_MSG_ENDPOINT2 *ep) +{ + if (ep) { + if (GWEN_MsgEndpoint2_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { + int rv; + + /* connect, set state */ + rv=_startConnect(ep); + if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { + DBG_INFO(AQH_LOGDOMAIN, "Endpoint %s: Error connecting (%d)", GWEN_MsgEndpoint2_GetName(ep), rv); + GWEN_MsgEndpoint2_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTING); + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Endpoint %s: Connecting.", GWEN_MsgEndpoint2_GetName(ep)); + GWEN_MsgEndpoint2_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTING); + } + return rv; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Endpoint %s: Not unconnected", GWEN_MsgEndpoint2_GetName(ep)); + } + } + else { + DBG_ERROR(GWEN_LOGDOMAIN, "No endpoint"); + } + return GWEN_ERROR_GENERIC; +} + + + +uint16_t AQH_MqttClientEndpoint2_GetKeepAliveTime(const GWEN_MSG_ENDPOINT2 *ep) +{ + if (ep) { + GWEN_MSG_ENDPOINT2 *epChild; + + epChild=GWEN_MsgEndpoint2_Tree2_GetFirstChild(ep); + if (epChild) + return AQH_MqttEndpoint2_GetKeepAliveTime(epChild); + } + return 0; +} + + + +void AQH_MqttClientEndpoint2_SetKeepAliveTime(GWEN_MSG_ENDPOINT2 *ep, uint16_t i) +{ + if (ep) { + GWEN_MSG_ENDPOINT2 *epChild; + + epChild=GWEN_MsgEndpoint2_Tree2_GetFirstChild(ep); + if (epChild) { + AQH_MqttEndpoint2_SetKeepAliveTime(epChild, i); + } + } +} + + + +void _addSockets(GWEN_MSG_ENDPOINT2 *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) +{ + if (ep) { + GWEN_MSG_ENDPOINT2 *epChild; + + epChild=GWEN_MsgEndpoint2_Tree2_GetFirstChild(ep); + if (epChild) { + if (GWEN_MsgEndpoint2_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) + _addSocketsWhenUnconnected(ep, epChild, readSet, writeSet, xSet); + else { + if (GWEN_MsgEndpoint2_GetState(epChild)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { + DBG_ERROR(AQH_LOGDOMAIN, "Error on tcp layer, disconnecting"); + GWEN_MsgEndpoint2_Disconnect(epChild); + GWEN_MsgEndpoint2_Disconnect(ep); + } + else { + if (GWEN_MsgEndpoint2_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_CONNECTING) + _addSocketsWhenConnecting(ep, epChild, readSet, writeSet, xSet); + if (GWEN_MsgEndpoint2_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) + _addSocketsWhenConnected(ep, epChild, readSet, writeSet, xSet); + } + } + } /* if (epChild) */ + } /* if (ep) */ +} + + + +void _checkSockets(GWEN_MSG_ENDPOINT2 *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) +{ + if (ep) { + GWEN_MSG_ENDPOINT2 *epChild; + + epChild=GWEN_MsgEndpoint2_Tree2_GetFirstChild(ep); + if (epChild) { + if (GWEN_MsgEndpoint2_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { + /* nothing to do here */ + } /* if GWEN_MSG_ENDPOINT_STATE_UNCONNECTED */ + else { + if (GWEN_MsgEndpoint2_GetState(epChild)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { + DBG_ERROR(AQH_LOGDOMAIN, "Error on tcp layer, disconnecting"); + GWEN_MsgEndpoint2_Disconnect(epChild); + GWEN_MsgEndpoint2_Disconnect(ep); + } + else { + if (GWEN_MsgEndpoint2_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_CONNECTING) + _checkSocketsWhenConnecting(ep, epChild, readSet, writeSet, xSet); + else if (GWEN_MsgEndpoint2_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) + _checkSocketsWhenConnected(ep, epChild, readSet, writeSet, xSet); + } + } + } + } +} + + + +void _addSocketsWhenUnconnected(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) +{ + time_t now; + + now=time(NULL); + if ((now-GWEN_MsgEndpoint2_GetTimeOfLastStateChange(ep))>=AQH_ENDPOINT2_MQTTC_RECONNECT_TIME) { + int rv; + + /* (re)connect, set state */ + DBG_INFO(AQH_LOGDOMAIN, "Starting to (re-)connect"); + rv=AQH_MqttClientEndpoint2_StartConnect(ep); + if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { + DBG_INFO(GWEN_LOGDOMAIN, "here (%d)", rv); + } + } +} + + + +void _addSocketsWhenConnecting(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) +{ + time_t now; + + now=time(NULL); + if ((now-GWEN_MsgEndpoint2_GetTimeOfLastStateChange(ep))>=AQH_ENDPOINT2_MQTTC_CONNECT_TIMEOUT || + GWEN_MsgEndpoint2_GetState(epChild)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { + DBG_ERROR(AQH_LOGDOMAIN, "Timeout on connect"); + GWEN_MsgEndpoint2_Disconnect(epChild); + GWEN_MsgEndpoint2_Disconnect(ep); + } + else + GWEN_MsgEndpoint2_AddSockets(epChild, readSet, writeSet, xSet); +} + + + +void _addSocketsWhenConnected(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) +{ + if (GWEN_MsgEndpoint2_GetState(epChild)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { + DBG_ERROR(AQH_LOGDOMAIN, "Error on tcp layer, disconnecting"); + GWEN_MsgEndpoint2_Disconnect(epChild); + GWEN_MsgEndpoint2_Disconnect(ep); + } + else { + /* move to-send messages to child */ + _moveMessagesBetweenLists(GWEN_MsgEndpoint2_GetSendMessageList(ep), GWEN_MsgEndpoint2_GetSendMessageList(epChild)); + GWEN_MsgEndpoint2_AddSockets(epChild, readSet, writeSet, xSet); + } +} + + + +void _checkSocketsWhenConnecting(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) +{ + GWEN_MSG *msg; + + GWEN_MsgEndpoint2_CheckSockets(epChild, readSet, writeSet, xSet); /* let base layer work */ + + msg=GWEN_MsgEndpoint2_GetFirstReceivedMessage(epChild); + while(msg) { + GWEN_MSG *msgNext; + uint8_t msgType; + + msgNext=GWEN_Msg_List_Next(msg); + msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0; + if (msgType==AQH_MQTTMSG_MSGTYPE_CONNACK) { + int code; + + GWEN_Msg_List_Del(msg); /* remove from list */ + code=AQH_ConnAckMqttMsg_GetResultCode(msg); + if (code==AQH_MQTTMSG_CONNACK_RESULT_ACCEPTED) { + DBG_INFO(AQH_LOGDOMAIN, "Positive CONNACK response, connected"); + GWEN_MsgEndpoint2_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTED); + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Negative CONNACK response (%d)", code); + GWEN_MsgEndpoint2_Disconnect(epChild); + GWEN_MsgEndpoint2_Disconnect(ep); + } + GWEN_Msg_free(msg); + break; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Ignoring response (%d)", msgType); + } + msg=msgNext; + } /* while */ +} + + + +void _checkSocketsWhenConnected(GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG_ENDPOINT2 *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) +{ + _moveMessagesBetweenLists(GWEN_MsgEndpoint2_GetSendMessageList(ep), GWEN_MsgEndpoint2_GetSendMessageList(epChild)); + GWEN_MsgEndpoint2_CheckSockets(epChild, readSet, writeSet, xSet); + _moveMessagesBetweenLists(GWEN_MsgEndpoint2_GetReceivedMessageList(epChild), GWEN_MsgEndpoint2_GetReceivedMessageList(ep)); +} + + + +int _startConnect(GWEN_MSG_ENDPOINT2 *ep) +{ + GWEN_MSG_ENDPOINT2 *epChild; + + epChild=GWEN_MsgEndpoint2_Tree2_GetFirstChild(ep); + if (epChild) { + int rv; + GWEN_MSG *msg; + + rv=GWEN_TcpcEndpoint2_StartConnect(epChild); + if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { + DBG_INFO(AQH_LOGDOMAIN, "Error starting to connect child layer (%d)", rv); + return rv; + } + msg=AQH_MqttEndpoint2_CreateMsgConnect(epChild); + if (msg) { + GWEN_MsgEndpoint2_AddSendMessage(epChild, msg); + GWEN_MsgEndpoint2_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTING); + return rv; /* result from GWEN_TcpcEndpoint2_StartConnect() above */ + } + } + return GWEN_ERROR_GENERIC; +} + + + + +void _moveMessagesBetweenLists(GWEN_MSG_LIST *srcList, GWEN_MSG_LIST *dstList) +{ + GWEN_MSG *msg; + + while( (msg=GWEN_Msg_List_First(srcList)) ) { + GWEN_Msg_List_Del(msg); + GWEN_Msg_List_Add(msg, dstList); + } +} + + + + + + diff --git a/aqhome/mqtt/endpoint2_mqttc.h b/aqhome/mqtt/endpoint2_mqttc.h new file mode 100644 index 0000000..502d53d --- /dev/null +++ b/aqhome/mqtt/endpoint2_mqttc.h @@ -0,0 +1,44 @@ +/**************************************************************************** + * This file is part of the project Gwenhywfar. + * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_ENDPOINT2_MQTTC_H +#define AQH_ENDPOINT2_MQTTC_H + + +#include + +#include + + +#ifdef __cplusplus +extern "C" { +#endif + + +/** + */ +AQHOME_API GWEN_MSG_ENDPOINT2 *AQH_MqttClientEndpoint2_new(const char *clientId, + const char *host, int port, + const char *name, int groupId); + +AQHOME_API uint16_t AQH_MqttClientEndpoint2_GetKeepAliveTime(const GWEN_MSG_ENDPOINT2 *ep); +AQHOME_API void AQH_MqttClientEndpoint2_SetKeepAliveTime(GWEN_MSG_ENDPOINT2 *ep, uint16_t i); + + +AQHOME_API int AQH_MqttClientEndpoint2_StartConnect(GWEN_MSG_ENDPOINT2 *ep); + + + +#ifdef __cplusplus +} +#endif + + +#endif + +