From 27a4993f5cab85384c233c85d19e6af3c9c5f028 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Sun, 9 Apr 2023 00:35:17 +0200 Subject: [PATCH] aqhome: use new type GWEN_ConnectableMsgEndpoint. This allows for reconnect of endpoints if necessary. --- aqhome/libtest.c | 5 ----- aqhome/mqtt/endpoint_mqttc.c | 27 ++++++++--------------- aqhome/mqtt/endpoint_mqttc.h | 4 ++-- aqhome/msg/endpoint_tty.c | 42 ++++++++++++++++++++++++++---------- aqhome/msgmanager.c | 3 ++- 5 files changed, 44 insertions(+), 37 deletions(-) diff --git a/aqhome/libtest.c b/aqhome/libtest.c index ecb8154..c58d397 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -189,11 +189,6 @@ int testMqttConnection() return 2; } GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp); - rv=GWEN_TcpcEndpoint_StartConnect(epTcp); - if (rv<0) { - DBG_ERROR(NULL, "Error starting connect (%d)", rv); - return 2; - } fprintf(stdout, "Sending CONNECT\n"); msgOut=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, 10, "CLIENTID123", NULL, NULL); diff --git a/aqhome/mqtt/endpoint_mqttc.c b/aqhome/mqtt/endpoint_mqttc.c index 9f9512e..c30853a 100644 --- a/aqhome/mqtt/endpoint_mqttc.c +++ b/aqhome/mqtt/endpoint_mqttc.c @@ -22,6 +22,7 @@ #include "aqhome/msg/msg_recvstats.h" #include +#include #include @@ -198,27 +199,17 @@ void _run(GWEN_MSG_ENDPOINT *ep) if (xep->previousRunFn) xep->previousRunFn(ep); - state=GWEN_TcpcEndpoint_GetState(ep); - if (state==GWEN_MSG_ENDPOINT_TCPC_STATE_UNCONNECTED) { - int rv; - - rv=GWEN_TcpcEndpoint_StartConnect(ep); - if (rv<0) { - DBG_INFO(AQH_LOGDOMAIN, "Error starting to connect (%d)", rv); - } - } - else if (state==GWEN_MSG_ENDPOINT_TCPC_STATE_CONNECTING) { - DBG_DEBUG(AQH_LOGDOMAIN, "Still connecting"); - } - else if (state==GWEN_MSG_ENDPOINT_TCPC_STATE_CONNECTED) + state=GWEN_ConnectableMsgEndpoint_GetState(ep); + if (state==GWEN_MSG_ENDPOINT_CONN_STATE_CONNECTED) _sendConnectMsg(ep); else if (state==GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK) _checkForConnAckMsg(ep); - else if (state==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED){ + else if (state==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED || + state -#define GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK (GWEN_MSG_ENDPOINT_TCPC_STATE_NEXTFREE+0) -#define GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED (GWEN_MSG_ENDPOINT_TCPC_STATE_NEXTFREE+1) +#define GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK (GWEN_MSG_ENDPOINT_CONN_STATE_NEXTFREE+0) +#define GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED (GWEN_MSG_ENDPOINT_CONN_STATE_NEXTFREE+1) AQHOME_API GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const char *name, int groupId); diff --git a/aqhome/msg/endpoint_tty.c b/aqhome/msg/endpoint_tty.c index 29d27bc..c8bf3b1 100644 --- a/aqhome/msg/endpoint_tty.c +++ b/aqhome/msg/endpoint_tty.c @@ -15,6 +15,7 @@ #include "aqhome/msg/endpoint_node.h" #include "aqhome/msg/msg_node.h" +#include #include #include #include @@ -40,13 +41,12 @@ GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_TTY) -static int _getReadFd(GWEN_MSG_ENDPOINT *ep); -static int _getWriteFd(GWEN_MSG_ENDPOINT *ep); static int _handleReadable(GWEN_MSG_ENDPOINT *ep, GWEN_UNUSED GWEN_MSG_ENDPOINT_MGR *emgr); static int _handleWritable(GWEN_MSG_ENDPOINT *ep, GWEN_UNUSED GWEN_MSG_ENDPOINT_MGR *emgr); static void GWENHYWFAR_CB _freeData(void *bp, void *p); +static int _connect(GWEN_MSG_ENDPOINT *ep); static int _startMsg(GWEN_MSG_ENDPOINT *ep); static int _endMsg(GWEN_MSG_ENDPOINT *ep); static int _isLineBusy(GWEN_MSG_ENDPOINT *ep); @@ -64,19 +64,23 @@ GWEN_MSG_ENDPOINT *AQH_TtyNodeEndpoint_new(const char *devicePath, int groupId) { GWEN_MSG_ENDPOINT *ep; AQH_MSG_ENDPOINT_TTY *xep; - int fd; +// int fd; ep=AQH_NodeEndpoint_new(AQH_MSG_ENDPOINT_TTY_NAME, groupId); + + GWEN_ConnectableMsgEndpoint_Extend(ep); + GWEN_ConnectableMsgEndpoint_SetReconnectWaitTime(ep, 5); + GWEN_NEW_OBJECT(AQH_MSG_ENDPOINT_TTY, xep); GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_TTY, ep, xep, _freeData); GWEN_MsgEndpoint_SetHandleReadableFn(ep, _handleReadable); GWEN_MsgEndpoint_SetHandleWritableFn(ep, _handleWritable); - GWEN_MsgEndpoint_SetGetReadFdFn(ep, _getReadFd); - GWEN_MsgEndpoint_SetGetWriteFdFn(ep, _getWriteFd); + GWEN_ConnectableMsgEndpoint_SetConnectFn(ep, _connect); xep->deviceName=strdup(devicePath); +#if 0 fd=_openDevice(ep); if (fd<0) { DBG_INFO(NULL, "here (%d)", fd); @@ -85,6 +89,7 @@ GWEN_MSG_ENDPOINT *AQH_TtyNodeEndpoint_new(const char *devicePath, int groupId) } GWEN_MsgEndpoint_SetFd(ep, fd); _attnHigh(ep); +#endif return ep; } @@ -101,16 +106,31 @@ void _freeData(void *bp, void *p) -int _getReadFd(GWEN_MSG_ENDPOINT *ep) +int _connect(GWEN_MSG_ENDPOINT *ep) { - return GWEN_MsgEndpoint_GetFd(ep); -} + AQH_MSG_ENDPOINT_TTY *xep; + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_TTY, ep); + if (xep) { + int state; + + state=GWEN_ConnectableMsgEndpoint_GetState(ep); + if (statedeviceName); + fd=_openDevice(ep); + if (fd<0) { + DBG_INFO(NULL, "here (%d)", fd); + return fd; + } + GWEN_MsgEndpoint_SetFd(ep, fd); + _attnHigh(ep); + } + return 0; + } -int _getWriteFd(GWEN_MSG_ENDPOINT *ep) -{ - return GWEN_MsgEndpoint_HaveMessageToSend(ep)?GWEN_MsgEndpoint_GetFd(ep):GWEN_ERROR_NO_DATA; + return GWEN_ERROR_GENERIC; } diff --git a/aqhome/msgmanager.c b/aqhome/msgmanager.c index 5e28b6d..e6ba3d2 100644 --- a/aqhome/msgmanager.c +++ b/aqhome/msgmanager.c @@ -24,6 +24,7 @@ #include "aqhome/mqtt/endpoint_mqttc.h" #include +#include #include #include @@ -140,7 +141,7 @@ void _loopOnceOverEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr) ep=GWEN_MsgEndpoint_List_First(endpointList); while(ep) { if (GWEN_MsgEndpoint_GetGroupId(ep) & AQH_MSGMGR_ENDPOINTGROUP_MQTT) { - if (GWEN_TcpcEndpoint_GetState(ep)>=GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED) + if (GWEN_ConnectableMsgEndpoint_GetState(ep)>=GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED) _handleEndpoint(emgr, ep); else { DBG_INFO(AQH_LOGDOMAIN, "Not handling MQTT endpoint right now (not fully connected)");