aqhome: use new type GWEN_ConnectableMsgEndpoint.
This allows for reconnect of endpoints if necessary.
This commit is contained in:
@@ -189,11 +189,6 @@ int testMqttConnection()
|
|||||||
return 2;
|
return 2;
|
||||||
}
|
}
|
||||||
GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp);
|
GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp);
|
||||||
rv=GWEN_TcpcEndpoint_StartConnect(epTcp);
|
|
||||||
if (rv<0) {
|
|
||||||
DBG_ERROR(NULL, "Error starting connect (%d)", rv);
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
fprintf(stdout, "Sending CONNECT\n");
|
fprintf(stdout, "Sending CONNECT\n");
|
||||||
msgOut=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, 10, "CLIENTID123", NULL, NULL);
|
msgOut=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, 10, "CLIENTID123", NULL, NULL);
|
||||||
|
|||||||
@@ -22,6 +22,7 @@
|
|||||||
#include "aqhome/msg/msg_recvstats.h"
|
#include "aqhome/msg/msg_recvstats.h"
|
||||||
|
|
||||||
#include <gwenhywfar/endpoint_tcpc.h>
|
#include <gwenhywfar/endpoint_tcpc.h>
|
||||||
|
#include <gwenhywfar/endpoint_connectable.h>
|
||||||
#include <gwenhywfar/debug.h>
|
#include <gwenhywfar/debug.h>
|
||||||
|
|
||||||
|
|
||||||
@@ -198,27 +199,17 @@ void _run(GWEN_MSG_ENDPOINT *ep)
|
|||||||
if (xep->previousRunFn)
|
if (xep->previousRunFn)
|
||||||
xep->previousRunFn(ep);
|
xep->previousRunFn(ep);
|
||||||
|
|
||||||
state=GWEN_TcpcEndpoint_GetState(ep);
|
state=GWEN_ConnectableMsgEndpoint_GetState(ep);
|
||||||
if (state==GWEN_MSG_ENDPOINT_TCPC_STATE_UNCONNECTED) {
|
if (state==GWEN_MSG_ENDPOINT_CONN_STATE_CONNECTED)
|
||||||
int rv;
|
|
||||||
|
|
||||||
rv=GWEN_TcpcEndpoint_StartConnect(ep);
|
|
||||||
if (rv<0) {
|
|
||||||
DBG_INFO(AQH_LOGDOMAIN, "Error starting to connect (%d)", rv);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (state==GWEN_MSG_ENDPOINT_TCPC_STATE_CONNECTING) {
|
|
||||||
DBG_DEBUG(AQH_LOGDOMAIN, "Still connecting");
|
|
||||||
}
|
|
||||||
else if (state==GWEN_MSG_ENDPOINT_TCPC_STATE_CONNECTED)
|
|
||||||
_sendConnectMsg(ep);
|
_sendConnectMsg(ep);
|
||||||
else if (state==GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK)
|
else if (state==GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK)
|
||||||
_checkForConnAckMsg(ep);
|
_checkForConnAckMsg(ep);
|
||||||
else if (state==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED){
|
else if (state==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED ||
|
||||||
|
state<GWEN_MSG_ENDPOINT_CONN_STATE_CONNECTED){
|
||||||
/* nothing to do */
|
/* nothing to do */
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
DBG_ERROR(AQH_LOGDOMAIN, "Unhandled connection status %d", state);
|
DBG_INFO(AQH_LOGDOMAIN, "Unhandled connection status %d", state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -236,7 +227,7 @@ void _sendConnectMsg(GWEN_MSG_ENDPOINT *ep)
|
|||||||
if (msg) {
|
if (msg) {
|
||||||
DBG_INFO(AQH_LOGDOMAIN, "Sending MQTT CONNECT request.");
|
DBG_INFO(AQH_LOGDOMAIN, "Sending MQTT CONNECT request.");
|
||||||
GWEN_MsgEndpoint_AddSendMessage(ep, msg);
|
GWEN_MsgEndpoint_AddSendMessage(ep, msg);
|
||||||
GWEN_TcpcEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK);
|
GWEN_ConnectableMsgEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -253,7 +244,7 @@ void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep)
|
|||||||
msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0;
|
msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0;
|
||||||
if (msgType==AQH_MQTTMSG_MSGTYPE_CONNACK) {
|
if (msgType==AQH_MQTTMSG_MSGTYPE_CONNACK) {
|
||||||
DBG_INFO(AQH_LOGDOMAIN, "MQTT CONNACK received, logical connection established.");
|
DBG_INFO(AQH_LOGDOMAIN, "MQTT CONNACK received, logical connection established.");
|
||||||
GWEN_TcpcEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED);
|
GWEN_ConnectableMsgEndpoint_SetState(ep, GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
DBG_ERROR(AQH_LOGDOMAIN, "Unexpected message received (%s)", AQH_MqttMsg_MsgTypeToString(msgType));
|
DBG_ERROR(AQH_LOGDOMAIN, "Unexpected message received (%s)", AQH_MqttMsg_MsgTypeToString(msgType));
|
||||||
@@ -266,7 +257,7 @@ void _checkForConnAckMsg(GWEN_MSG_ENDPOINT *ep)
|
|||||||
|
|
||||||
void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg)
|
void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg)
|
||||||
{
|
{
|
||||||
if (GWEN_TcpcEndpoint_GetState(ep)==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED) {
|
if (GWEN_ConnectableMsgEndpoint_GetState(ep)==GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED) {
|
||||||
DBG_DEBUG(AQH_LOGDOMAIN, "Processing output message");
|
DBG_DEBUG(AQH_LOGDOMAIN, "Processing output message");
|
||||||
switch(AQH_NodeMsg_GetMsgType(nodeMsg)) {
|
switch(AQH_NodeMsg_GetMsgType(nodeMsg)) {
|
||||||
case AQH_MSG_TYPE_VALUE2:
|
case AQH_MSG_TYPE_VALUE2:
|
||||||
|
|||||||
@@ -14,8 +14,8 @@
|
|||||||
|
|
||||||
#include <gwenhywfar/endpoint.h>
|
#include <gwenhywfar/endpoint.h>
|
||||||
|
|
||||||
#define GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK (GWEN_MSG_ENDPOINT_TCPC_STATE_NEXTFREE+0)
|
#define GWEN_ENDPOINT_MQTTC_STATE_WAITFORCONNACK (GWEN_MSG_ENDPOINT_CONN_STATE_NEXTFREE+0)
|
||||||
#define GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED (GWEN_MSG_ENDPOINT_TCPC_STATE_NEXTFREE+1)
|
#define GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED (GWEN_MSG_ENDPOINT_CONN_STATE_NEXTFREE+1)
|
||||||
|
|
||||||
|
|
||||||
AQHOME_API GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const char *name, int groupId);
|
AQHOME_API GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *host, int port, const char *name, int groupId);
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
#include "aqhome/msg/endpoint_node.h"
|
#include "aqhome/msg/endpoint_node.h"
|
||||||
#include "aqhome/msg/msg_node.h"
|
#include "aqhome/msg/msg_node.h"
|
||||||
|
|
||||||
|
#include <gwenhywfar/endpoint_connectable.h>
|
||||||
#include <gwenhywfar/inherit.h>
|
#include <gwenhywfar/inherit.h>
|
||||||
#include <gwenhywfar/debug.h>
|
#include <gwenhywfar/debug.h>
|
||||||
#include <gwenhywfar/text.h>
|
#include <gwenhywfar/text.h>
|
||||||
@@ -40,13 +41,12 @@ GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_TTY)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
static int _getReadFd(GWEN_MSG_ENDPOINT *ep);
|
|
||||||
static int _getWriteFd(GWEN_MSG_ENDPOINT *ep);
|
|
||||||
static int _handleReadable(GWEN_MSG_ENDPOINT *ep, GWEN_UNUSED GWEN_MSG_ENDPOINT_MGR *emgr);
|
static int _handleReadable(GWEN_MSG_ENDPOINT *ep, GWEN_UNUSED GWEN_MSG_ENDPOINT_MGR *emgr);
|
||||||
static int _handleWritable(GWEN_MSG_ENDPOINT *ep, GWEN_UNUSED GWEN_MSG_ENDPOINT_MGR *emgr);
|
static int _handleWritable(GWEN_MSG_ENDPOINT *ep, GWEN_UNUSED GWEN_MSG_ENDPOINT_MGR *emgr);
|
||||||
|
|
||||||
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
|
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
|
||||||
|
|
||||||
|
static int _connect(GWEN_MSG_ENDPOINT *ep);
|
||||||
static int _startMsg(GWEN_MSG_ENDPOINT *ep);
|
static int _startMsg(GWEN_MSG_ENDPOINT *ep);
|
||||||
static int _endMsg(GWEN_MSG_ENDPOINT *ep);
|
static int _endMsg(GWEN_MSG_ENDPOINT *ep);
|
||||||
static int _isLineBusy(GWEN_MSG_ENDPOINT *ep);
|
static int _isLineBusy(GWEN_MSG_ENDPOINT *ep);
|
||||||
@@ -64,19 +64,23 @@ GWEN_MSG_ENDPOINT *AQH_TtyNodeEndpoint_new(const char *devicePath, int groupId)
|
|||||||
{
|
{
|
||||||
GWEN_MSG_ENDPOINT *ep;
|
GWEN_MSG_ENDPOINT *ep;
|
||||||
AQH_MSG_ENDPOINT_TTY *xep;
|
AQH_MSG_ENDPOINT_TTY *xep;
|
||||||
int fd;
|
// int fd;
|
||||||
|
|
||||||
ep=AQH_NodeEndpoint_new(AQH_MSG_ENDPOINT_TTY_NAME, groupId);
|
ep=AQH_NodeEndpoint_new(AQH_MSG_ENDPOINT_TTY_NAME, groupId);
|
||||||
|
|
||||||
|
GWEN_ConnectableMsgEndpoint_Extend(ep);
|
||||||
|
GWEN_ConnectableMsgEndpoint_SetReconnectWaitTime(ep, 5);
|
||||||
|
|
||||||
GWEN_NEW_OBJECT(AQH_MSG_ENDPOINT_TTY, xep);
|
GWEN_NEW_OBJECT(AQH_MSG_ENDPOINT_TTY, xep);
|
||||||
GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_TTY, ep, xep, _freeData);
|
GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_TTY, ep, xep, _freeData);
|
||||||
|
|
||||||
GWEN_MsgEndpoint_SetHandleReadableFn(ep, _handleReadable);
|
GWEN_MsgEndpoint_SetHandleReadableFn(ep, _handleReadable);
|
||||||
GWEN_MsgEndpoint_SetHandleWritableFn(ep, _handleWritable);
|
GWEN_MsgEndpoint_SetHandleWritableFn(ep, _handleWritable);
|
||||||
GWEN_MsgEndpoint_SetGetReadFdFn(ep, _getReadFd);
|
GWEN_ConnectableMsgEndpoint_SetConnectFn(ep, _connect);
|
||||||
GWEN_MsgEndpoint_SetGetWriteFdFn(ep, _getWriteFd);
|
|
||||||
|
|
||||||
xep->deviceName=strdup(devicePath);
|
xep->deviceName=strdup(devicePath);
|
||||||
|
|
||||||
|
#if 0
|
||||||
fd=_openDevice(ep);
|
fd=_openDevice(ep);
|
||||||
if (fd<0) {
|
if (fd<0) {
|
||||||
DBG_INFO(NULL, "here (%d)", fd);
|
DBG_INFO(NULL, "here (%d)", fd);
|
||||||
@@ -85,6 +89,7 @@ GWEN_MSG_ENDPOINT *AQH_TtyNodeEndpoint_new(const char *devicePath, int groupId)
|
|||||||
}
|
}
|
||||||
GWEN_MsgEndpoint_SetFd(ep, fd);
|
GWEN_MsgEndpoint_SetFd(ep, fd);
|
||||||
_attnHigh(ep);
|
_attnHigh(ep);
|
||||||
|
#endif
|
||||||
|
|
||||||
return ep;
|
return ep;
|
||||||
}
|
}
|
||||||
@@ -101,16 +106,31 @@ void _freeData(void *bp, void *p)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
int _getReadFd(GWEN_MSG_ENDPOINT *ep)
|
int _connect(GWEN_MSG_ENDPOINT *ep)
|
||||||
{
|
{
|
||||||
return GWEN_MsgEndpoint_GetFd(ep);
|
AQH_MSG_ENDPOINT_TTY *xep;
|
||||||
}
|
|
||||||
|
|
||||||
|
xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_TTY, ep);
|
||||||
|
if (xep) {
|
||||||
|
int state;
|
||||||
|
|
||||||
|
state=GWEN_ConnectableMsgEndpoint_GetState(ep);
|
||||||
|
if (state<GWEN_MSG_ENDPOINT_CONN_STATE_CONNECTED) {
|
||||||
|
int fd;
|
||||||
|
|
||||||
|
DBG_INFO(AQH_LOGDOMAIN, "Opening device %s", xep->deviceName);
|
||||||
|
fd=_openDevice(ep);
|
||||||
|
if (fd<0) {
|
||||||
|
DBG_INFO(NULL, "here (%d)", fd);
|
||||||
|
return fd;
|
||||||
|
}
|
||||||
|
GWEN_MsgEndpoint_SetFd(ep, fd);
|
||||||
|
_attnHigh(ep);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int _getWriteFd(GWEN_MSG_ENDPOINT *ep)
|
return GWEN_ERROR_GENERIC;
|
||||||
{
|
|
||||||
return GWEN_MsgEndpoint_HaveMessageToSend(ep)?GWEN_MsgEndpoint_GetFd(ep):GWEN_ERROR_NO_DATA;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -24,6 +24,7 @@
|
|||||||
#include "aqhome/mqtt/endpoint_mqttc.h"
|
#include "aqhome/mqtt/endpoint_mqttc.h"
|
||||||
|
|
||||||
#include <gwenhywfar/endpoint_tcpc.h>
|
#include <gwenhywfar/endpoint_tcpc.h>
|
||||||
|
#include <gwenhywfar/endpoint_connectable.h>
|
||||||
#include <gwenhywfar/misc.h>
|
#include <gwenhywfar/misc.h>
|
||||||
#include <gwenhywfar/debug.h>
|
#include <gwenhywfar/debug.h>
|
||||||
|
|
||||||
@@ -140,7 +141,7 @@ void _loopOnceOverEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr)
|
|||||||
ep=GWEN_MsgEndpoint_List_First(endpointList);
|
ep=GWEN_MsgEndpoint_List_First(endpointList);
|
||||||
while(ep) {
|
while(ep) {
|
||||||
if (GWEN_MsgEndpoint_GetGroupId(ep) & AQH_MSGMGR_ENDPOINTGROUP_MQTT) {
|
if (GWEN_MsgEndpoint_GetGroupId(ep) & AQH_MSGMGR_ENDPOINTGROUP_MQTT) {
|
||||||
if (GWEN_TcpcEndpoint_GetState(ep)>=GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED)
|
if (GWEN_ConnectableMsgEndpoint_GetState(ep)>=GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED)
|
||||||
_handleEndpoint(emgr, ep);
|
_handleEndpoint(emgr, ep);
|
||||||
else {
|
else {
|
||||||
DBG_INFO(AQH_LOGDOMAIN, "Not handling MQTT endpoint right now (not fully connected)");
|
DBG_INFO(AQH_LOGDOMAIN, "Not handling MQTT endpoint right now (not fully connected)");
|
||||||
|
|||||||
Reference in New Issue
Block a user