mqtt: fixed connect issues.
no works with connectable endpoint code.
This commit is contained in:
@@ -22,7 +22,6 @@
|
|||||||
#include "aqhome/msg/msg_recvstats.h"
|
#include "aqhome/msg/msg_recvstats.h"
|
||||||
|
|
||||||
#include <gwenhywfar/endpoint_tcpc.h>
|
#include <gwenhywfar/endpoint_tcpc.h>
|
||||||
#include <gwenhywfar/endpoint_connectable.h>
|
|
||||||
#include <gwenhywfar/debug.h>
|
#include <gwenhywfar/debug.h>
|
||||||
|
|
||||||
|
|
||||||
@@ -36,7 +35,8 @@ GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC)
|
|||||||
|
|
||||||
|
|
||||||
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
|
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
|
||||||
static void _run(GWEN_MSG_ENDPOINT *ep);
|
//static void _run(GWEN_MSG_ENDPOINT *ep);
|
||||||
|
static int _connect(GWEN_MSG_ENDPOINT *ep);
|
||||||
static void _sendConnectMsg(GWEN_MSG_ENDPOINT *ep);
|
static void _sendConnectMsg(GWEN_MSG_ENDPOINT *ep);
|
||||||
static void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep);
|
static void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep);
|
||||||
static void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg);
|
static void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg);
|
||||||
@@ -71,8 +71,10 @@ GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const
|
|||||||
|
|
||||||
GWEN_MsgEndpoint_SetDefaultBufferSize(ep, GWEN_ENDPOINT_MQTTC_BUFFERSIZE);
|
GWEN_MsgEndpoint_SetDefaultBufferSize(ep, GWEN_ENDPOINT_MQTTC_BUFFERSIZE);
|
||||||
GWEN_MsgEndpoint_SetIsMsgCompleteFn(ep, _isMsgComplete);
|
GWEN_MsgEndpoint_SetIsMsgCompleteFn(ep, _isMsgComplete);
|
||||||
xep->previousRunFn=GWEN_MsgEndpoint_SetRunFn(ep, _run);
|
//xep->previousRunFn=GWEN_MsgEndpoint_SetRunFn(ep, _run);
|
||||||
GWEN_MsgEndpoint_SetProcessOutMsgFn(ep, _processOutMessage);
|
GWEN_MsgEndpoint_SetProcessOutMsgFn(ep, _processOutMessage);
|
||||||
|
xep->previousConnectFn=GWEN_ConnectableMsgEndpoint_SetConnectFn(ep, _connect);
|
||||||
|
GWEN_ConnectableMsgEndpoint_SetFullyConnectedState(ep, GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED);
|
||||||
|
|
||||||
return ep;
|
return ep;
|
||||||
}
|
}
|
||||||
@@ -216,16 +218,79 @@ void _run(GWEN_MSG_ENDPOINT *ep)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int _connect(GWEN_MSG_ENDPOINT *ep)
|
||||||
|
{
|
||||||
|
AQH_ENDPOINT_MQTTC *xep;
|
||||||
|
|
||||||
|
DBG_DEBUG(AQH_LOGDOMAIN, "Called CONNECT");
|
||||||
|
xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep);
|
||||||
|
if (xep) {
|
||||||
|
int state;
|
||||||
|
|
||||||
|
state=GWEN_ConnectableMsgEndpoint_GetState(ep);
|
||||||
|
if (state<GWEN_MSG_ENDPOINT_CONN_STATE_CONNECTED) {
|
||||||
|
DBG_DEBUG(AQH_LOGDOMAIN, "Not yet physically connected.");
|
||||||
|
if (xep->previousConnectFn) {
|
||||||
|
int rv;
|
||||||
|
|
||||||
|
rv=xep->previousConnectFn(ep);
|
||||||
|
if (rv<0) {
|
||||||
|
if (rv==GWEN_ERROR_TRY_AGAIN) {
|
||||||
|
DBG_DEBUG(AQH_LOGDOMAIN, "Still connecting...");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
|
||||||
|
}
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
state=GWEN_ConnectableMsgEndpoint_GetState(ep);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DBG_DEBUG(AQH_LOGDOMAIN, "Physically connected, checking further.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state<GWEN_MSG_ENDPOINT_CONN_STATE_CONNECTED) {
|
||||||
|
/* still not connected */
|
||||||
|
DBG_DEBUG(AQH_LOGDOMAIN, "Still not physically connected.");
|
||||||
|
}
|
||||||
|
else if (state==GWEN_MSG_ENDPOINT_CONN_STATE_CONNECTED) {
|
||||||
|
DBG_DEBUG(AQH_LOGDOMAIN, "Physically connected, sending connect msg.");
|
||||||
|
/* freshly connected, initiate logical connection */
|
||||||
|
_sendConnectMsg(ep);
|
||||||
|
return GWEN_ERROR_TRY_AGAIN;
|
||||||
|
}
|
||||||
|
else if (state==GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK) {
|
||||||
|
DBG_DEBUG(AQH_LOGDOMAIN, "Waiting for response to connect msg.");
|
||||||
|
/* connect message sent, check for response */
|
||||||
|
_checkForConnAckMsg(ep);
|
||||||
|
}
|
||||||
|
else if (state==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED ||
|
||||||
|
state<GWEN_MSG_ENDPOINT_CONN_STATE_CONNECTED){
|
||||||
|
/* nothing to do */
|
||||||
|
DBG_DEBUG(AQH_LOGDOMAIN, "Fully connected.");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DBG_INFO(AQH_LOGDOMAIN, "Unhandled connection status %d", state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void _sendConnectMsg(GWEN_MSG_ENDPOINT *ep)
|
void _sendConnectMsg(GWEN_MSG_ENDPOINT *ep)
|
||||||
{
|
{
|
||||||
AQH_ENDPOINT_MQTTC *xep;
|
AQH_ENDPOINT_MQTTC *xep;
|
||||||
GWEN_MSG *msg;
|
GWEN_MSG *msg;
|
||||||
|
|
||||||
|
DBG_INFO(AQH_LOGDOMAIN, "Sending connect msg");
|
||||||
xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep);
|
xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_MQTTC, ep);
|
||||||
/* send CONNECT */
|
/* send CONNECT */
|
||||||
msg=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, xep->keepAliveTime, xep->clientId, NULL, NULL);
|
msg=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, xep->keepAliveTime, xep->clientId, NULL, NULL);
|
||||||
if (msg) {
|
if (msg) {
|
||||||
DBG_INFO(AQH_LOGDOMAIN, "Sending MQTT CONNECT request.");
|
DBG_DEBUG(AQH_LOGDOMAIN, "Sending MQTT CONNECT request.");
|
||||||
GWEN_MsgEndpoint_AddSendMessage(ep, msg);
|
GWEN_MsgEndpoint_AddSendMessage(ep, msg);
|
||||||
GWEN_ConnectableMsgEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK);
|
GWEN_ConnectableMsgEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK);
|
||||||
}
|
}
|
||||||
@@ -237,6 +302,7 @@ void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep)
|
|||||||
{
|
{
|
||||||
GWEN_MSG *msg;
|
GWEN_MSG *msg;
|
||||||
|
|
||||||
|
DBG_DEBUG(AQH_LOGDOMAIN, "Checking for connect response");
|
||||||
msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep);
|
msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep);
|
||||||
if (msg) {
|
if (msg) {
|
||||||
uint8_t msgType;
|
uint8_t msgType;
|
||||||
|
|||||||
@@ -12,6 +12,8 @@
|
|||||||
|
|
||||||
#include "aqhome/mqtt/endpoint_mqttc.h"
|
#include "aqhome/mqtt/endpoint_mqttc.h"
|
||||||
|
|
||||||
|
#include <gwenhywfar/endpoint_connectable.h>
|
||||||
|
|
||||||
|
|
||||||
typedef struct AQH_ENDPOINT_MQTTC AQH_ENDPOINT_MQTTC;
|
typedef struct AQH_ENDPOINT_MQTTC AQH_ENDPOINT_MQTTC;
|
||||||
struct AQH_ENDPOINT_MQTTC {
|
struct AQH_ENDPOINT_MQTTC {
|
||||||
@@ -21,6 +23,7 @@ struct AQH_ENDPOINT_MQTTC {
|
|||||||
uint16_t keepAliveTime;
|
uint16_t keepAliveTime;
|
||||||
|
|
||||||
GWEN_MSG_ENDPOINT_RUN_FN previousRunFn;
|
GWEN_MSG_ENDPOINT_RUN_FN previousRunFn;
|
||||||
|
GWEN_CONN_ENDPOINT_CONNECT_FN previousConnectFn;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user