From 2bc17525978227e1c324e882f0756e6351d4d5c0 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Tue, 25 Apr 2023 00:36:40 +0200 Subject: [PATCH] mqtt: fixed connect issues. no works with connectable endpoint code. --- aqhome/mqtt/endpoint_mqttc.c | 74 ++++++++++++++++++++++++++++++++-- aqhome/mqtt/endpoint_mqttc_p.h | 3 ++ 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/aqhome/mqtt/endpoint_mqttc.c b/aqhome/mqtt/endpoint_mqttc.c index 221a0bf..9a980cf 100644 --- a/aqhome/mqtt/endpoint_mqttc.c +++ b/aqhome/mqtt/endpoint_mqttc.c @@ -22,7 +22,6 @@ #include "aqhome/msg/msg_recvstats.h" #include -#include #include @@ -36,7 +35,8 @@ GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC) static void GWENHYWFAR_CB _freeData(void *bp, void *p); -static void _run(GWEN_MSG_ENDPOINT *ep); +//static void _run(GWEN_MSG_ENDPOINT *ep); +static int _connect(GWEN_MSG_ENDPOINT *ep); static void _sendConnectMsg(GWEN_MSG_ENDPOINT *ep); static void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep); static void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); @@ -71,8 +71,10 @@ GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const GWEN_MsgEndpoint_SetDefaultBufferSize(ep, GWEN_ENDPOINT_MQTTC_BUFFERSIZE); GWEN_MsgEndpoint_SetIsMsgCompleteFn(ep, _isMsgComplete); - xep->previousRunFn=GWEN_MsgEndpoint_SetRunFn(ep, _run); + //xep->previousRunFn=GWEN_MsgEndpoint_SetRunFn(ep, _run); GWEN_MsgEndpoint_SetProcessOutMsgFn(ep, _processOutMessage); + xep->previousConnectFn=GWEN_ConnectableMsgEndpoint_SetConnectFn(ep, _connect); + GWEN_ConnectableMsgEndpoint_SetFullyConnectedState(ep, GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED); return ep; } @@ -216,16 +218,79 @@ void _run(GWEN_MSG_ENDPOINT *ep) +int _connect(GWEN_MSG_ENDPOINT *ep) +{ + AQH_ENDPOINT_MQTTC *xep; + + DBG_DEBUG(AQH_LOGDOMAIN, "Called CONNECT"); + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep); + if (xep) { + int state; + + state=GWEN_ConnectableMsgEndpoint_GetState(ep); + if (statepreviousConnectFn) { + int rv; + + rv=xep->previousConnectFn(ep); + if (rv<0) { + if (rv==GWEN_ERROR_TRY_AGAIN) { + DBG_DEBUG(AQH_LOGDOMAIN, "Still connecting..."); + } + else { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + } + return rv; + } + state=GWEN_ConnectableMsgEndpoint_GetState(ep); + } + } + else { + DBG_DEBUG(AQH_LOGDOMAIN, "Physically connected, checking further."); + } + + if (statekeepAliveTime, xep->clientId, NULL, NULL); if (msg) { - DBG_INFO(AQH_LOGDOMAIN, "Sending MQTT CONNECT request."); + DBG_DEBUG(AQH_LOGDOMAIN, "Sending MQTT CONNECT request."); GWEN_MsgEndpoint_AddSendMessage(ep, msg); GWEN_ConnectableMsgEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK); } @@ -237,6 +302,7 @@ void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep) { GWEN_MSG *msg; + DBG_DEBUG(AQH_LOGDOMAIN, "Checking for connect response"); msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep); if (msg) { uint8_t msgType; diff --git a/aqhome/mqtt/endpoint_mqttc_p.h b/aqhome/mqtt/endpoint_mqttc_p.h index 7433c00..1d34b01 100644 --- a/aqhome/mqtt/endpoint_mqttc_p.h +++ b/aqhome/mqtt/endpoint_mqttc_p.h @@ -12,6 +12,8 @@ #include "aqhome/mqtt/endpoint_mqttc.h" +#include + typedef struct AQH_ENDPOINT_MQTTC AQH_ENDPOINT_MQTTC; struct AQH_ENDPOINT_MQTTC { @@ -21,6 +23,7 @@ struct AQH_ENDPOINT_MQTTC { uint16_t keepAliveTime; GWEN_MSG_ENDPOINT_RUN_FN previousRunFn; + GWEN_CONN_ENDPOINT_CONNECT_FN previousConnectFn; };