diff --git a/apps/aqhome-nodes/0BUILD b/apps/aqhome-nodes/0BUILD index 243ce46..402448b 100644 --- a/apps/aqhome-nodes/0BUILD +++ b/apps/aqhome-nodes/0BUILD @@ -44,6 +44,7 @@ r_forward.h r_setaccmsggrps.h r_getnodes.h + r_publish.h server.h server_p.h @@ -59,6 +60,7 @@ r_forward.c r_setaccmsggrps.c r_getnodes.c + r_publish.c main.c diff --git a/apps/aqhome-nodes/main.c b/apps/aqhome-nodes/main.c index 0d6fc9e..0c919d7 100644 --- a/apps/aqhome-nodes/main.c +++ b/apps/aqhome-nodes/main.c @@ -36,6 +36,7 @@ #define CONNCLEAN_INTERVAL_IN_SECS 2 #define CONNCHECK_INTERVAL_IN_SECS 10 +#define PING_INTERVAL_IN_SECS 120 @@ -133,11 +134,13 @@ void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop) int timeout; time_t timeLastConnectionCleanup; time_t timeLastConnCheck; + time_t timeLastPingSend; timeout=AQH_NodeServer_GetTimeout(aqh); timeStart=time(NULL); timeLastConnectionCleanup=time(NULL); timeLastConnCheck=time(NULL); + timeLastPingSend=time(NULL); while(!stopService) { time_t now; @@ -146,6 +149,7 @@ void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop) AQH_NodeServer_HandleTtyMsgs(aqh); AQH_NodeServer_HandleClientMsgs(aqh); AQH_NodeServer_HandleBrokerMsgs(aqh); + AQH_NodeServer_HandleMqttMsgs(aqh); now=time(NULL); @@ -159,9 +163,20 @@ void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop) DBG_INFO(NULL, "Check connections"); AQH_NodeServer_CheckBrokerConnection(aqh); AQH_NodeServer_CheckTtyConnection(aqh); + AQH_NodeServer_CheckMqttConnection(aqh); timeLastConnCheck=now; } + if (_diffInSeconds(now, timeLastPingSend)>PING_INTERVAL_IN_SECS) { + int rv; + + rv=AQH_NodeServer_SendPing(aqh); + if (rv<0) { + DBG_INFO(NULL, "Error sending PING"); + } + timeLastPingSend=time(NULL); + } + if (timeout && (_diffInSeconds(now, timeStart)>timeout)) { DBG_INFO(NULL, "Timeout"); break; diff --git a/apps/aqhome-nodes/r_publish.c b/apps/aqhome-nodes/r_publish.c new file mode 100644 index 0000000..897fac4 --- /dev/null +++ b/apps/aqhome-nodes/r_publish.c @@ -0,0 +1,34 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./r_publish.h" +#include "./server_p.h" +#include "aqhome/ipc2/endpoint.h" +#include + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + +void AQH_NodeServer_HandlePublishMsg(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + +} + + + diff --git a/apps/aqhome-nodes/r_publish.h b/apps/aqhome-nodes/r_publish.h new file mode 100644 index 0000000..b68046e --- /dev/null +++ b/apps/aqhome-nodes/r_publish.h @@ -0,0 +1,28 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOMED_R_PUBLISH_H +#define AQHOMED_R_PUBLISH_H + + +#include "./server.h" + +#include +#include + + +void AQH_NodeServer_HandlePublishMsg(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); + + + + + + +#endif + + diff --git a/apps/aqhome-nodes/server.c b/apps/aqhome-nodes/server.c index 320f612..8a3f3a7 100644 --- a/apps/aqhome-nodes/server.c +++ b/apps/aqhome-nodes/server.c @@ -18,6 +18,7 @@ #include "./r_forward.h" #include "./r_setaccmsggrps.h" #include "./r_getnodes.h" +#include "./r_publish.h" #include #include @@ -41,6 +42,14 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include #include #include @@ -70,13 +79,15 @@ #define AQH_NODE_SERVER_BROKER_RESTARTTIME 10 #define AQH_NODE_SERVER_TTY_RESTARTTIME 10 +#define AQH_NODE_SERVER_MQTT_RESTARTTIME 10 enum { AQH_NODE_SERVER_SLOT_NEWCLIENT=1, AQH_NODE_SERVER_SLOT_CLIENTCLOSED, AQH_NODE_SERVER_SLOT_BROKERCLOSED, - AQH_NODE_SERVER_SLOT_TTYCLOSED + AQH_NODE_SERVER_SLOT_TTYCLOSED, + AQH_NODE_SERVER_SLOT_MQTTCLOSED, }; @@ -97,13 +108,17 @@ GWEN_INHERIT(AQH_OBJECT, AQH_NODE_SERVER) static void GWENHYWFAR_CB _freeData(void *bp, void *p); static void _readConfig(AQH_OBJECT *o, AQH_NODE_SERVER *xo, GWEN_DB_NODE *dbArgs); -static const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue); -static int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue); +static const char *_readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue); +static int _readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue); static int _startIpc(AQH_OBJECT *o, AQH_NODE_SERVER *xo); static int _startTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo); static int _startBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo); static int _exchangeConnect(AQH_NODE_SERVER *xo, uint32_t flags); +static int _startMqtt(AQH_OBJECT *o, AQH_NODE_SERVER *xo); +static int _exchangeMqttConnect(AQH_NODE_SERVER *xo); +static int _exchangeMqttSubscribe(AQH_NODE_SERVER *xo); + static void _setupDb(AQH_NODE_SERVER *xo); static int _loadDeviceList(AQH_NODE_SERVER *xo); @@ -112,11 +127,15 @@ static int _handleNewIpcClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *c static int _handleIpcClientDown(AQH_OBJECT *clientEndpoint); static int _handleBrokerDown(AQH_NODE_SERVER *xo); static int _handleTtyDown(AQH_NODE_SERVER *xo); +static int _handleMqttDown(AQH_NODE_SERVER *xo); static void _handleMsgsFromClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *ep); static void _handleMsgFromClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *ep, const AQH_MESSAGE *msg); - +static void _handleMsgFromMqtt(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); static void _handleMsgFromTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _handleMqttMsgPingRsp(void); + + static void _writeTtyMsgToLogFile(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); static void _forwardTtyMsgToBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); static void _forwardValueMessageToBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); @@ -178,6 +197,7 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) AQH_Object_free(xo->ipcEndpoint); AQH_Object_free(xo->ttyEndpoint); AQH_Object_free(xo->brokerEndpoint); + AQH_Object_free(xo->mqttEndpoint); AQHNODE_Device_List_free(xo->deviceDefList); GWEN_DB_Group_free(xo->dbArgs); AQH_MsgRequest_free(xo->requestTree); @@ -190,6 +210,9 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) free(xo->tcpAddress); free(xo->brokerAddress); free(xo->brokerClientId); + free(xo->mqttAddress); + free(xo->mqttClientId); + free(xo->mqttDiscoveryPrefix); GWEN_FREE_OBJECT(xo); } @@ -361,6 +384,81 @@ void AQH_NodeServer_SetBrokerClientId(AQH_OBJECT *o, const char *s) +void AQH_NodeServer_SetMqttAddress(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->mqttAddress); + xo->mqttAddress=s?strdup(s):NULL; + } + } +} + + +void AQH_NodeServer_SetMqttPort(AQH_OBJECT *o, int i) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + xo->mqttPort=i; + } + } +} + + + +void AQH_NodeServer_SetMqttClientId(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->mqttClientId); + xo->mqttClientId=s?strdup(s):NULL; + } + } +} + + + +void AQH_NodeServer_SetMqttKeepAlive(AQH_OBJECT *o, int i) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + xo->mqttKeepAlive=i; + } + } +} + + + +void AQH_NodeServer_SetMqttDiscoveryPrefix(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->mqttDiscoveryPrefix); + xo->mqttDiscoveryPrefix=s?strdup(s):NULL; + } + } +} + + + + + + /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx @@ -427,12 +525,25 @@ int AQH_NodeServer_Init(AQH_OBJECT *o, int argc, char **argv) DBG_INFO(NULL, "here (%d)", rv); return rv; } - DBG_INFO(NULL, "Starting Broker Connection"); - rv=_startBroker(o, xo); - if (rv<0) { - DBG_INFO(NULL, "here (%d)", rv); - return rv; + + if (xo->flags & AQHOMED_FLAGS_START_BROKER) { + DBG_INFO(NULL, "Starting Broker Connection"); + rv=_startBroker(o, xo); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } } + + if (xo->flags & AQHOMED_FLAGS_START_MQTT) { + DBG_INFO(NULL, "Starting MQTT Connection"); + rv=_startMqtt(o, xo); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + } + return 0; } else { @@ -450,23 +561,31 @@ void _readConfig(AQH_OBJECT *o, AQH_NODE_SERVER *xo, GWEN_DB_NODE *dbArgs) xo->timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0); xo->noAttn=GWEN_DB_GetIntValue(dbArgs, "noAttn", 0, 0); xo->nodeAddress=GWEN_DB_GetIntValue(dbArgs, "nodeAddress", 0, AQHOMED_DEFAULT_NODEADDR); + xo->flags|=GWEN_DB_GetIntValue(dbArgs, "startBroker", 0, 1)?AQHOMED_FLAGS_START_BROKER:0; + xo->flags|=GWEN_DB_GetIntValue(dbArgs, "startMqtt", 0, 0)?AQHOMED_FLAGS_START_MQTT:0; AQH_NodeServer_SetDbFile(o, GWEN_DB_GetCharValue(dbArgs, "dbfile", 0, NULL)); AQH_NodeServer_SetLogFile(o, GWEN_DB_GetCharValue(dbArgs, "logfile", 0, NULL)); AQH_NodeServer_SetDevicePath(o, GWEN_DB_GetCharValue(dbArgs, "device", 0, AQHOMED_DEFAULT_DEVICE)); - AQH_NodeServer_SetTpcAddress(o, readCharConfigWithAlt(dbArgs, "tcpAddress", "ConfigFile/nodesAddress", NULL)); - AQH_NodeServer_SetTcpPort(o, readIntConfigWithAlt(dbArgs, "tcpPort", "ConfigFile/nodesPort", AQHOMED_DEFAULT_IPC_PORT, -1)); + AQH_NodeServer_SetTpcAddress(o, _readCharConfigWithAlt(dbArgs, "tcpAddress", "ConfigFile/nodesAddress", NULL)); + AQH_NodeServer_SetTcpPort(o, _readIntConfigWithAlt(dbArgs, "tcpPort", "ConfigFile/nodesPort", AQHOMED_DEFAULT_IPC_PORT, -1)); - AQH_NodeServer_SetBrokerAddress(o, readCharConfigWithAlt(dbArgs, "brokerAddress", "ConfigFile/brokerAddress", "127.0.0.1")); - AQH_NodeServer_SetBrokerPort(o, readIntConfigWithAlt(dbArgs, "brokerPort", "ConfigFile/brokerPort", AQHOMED_DEFAULT_BROKER_PORT, -1)); + AQH_NodeServer_SetBrokerAddress(o, _readCharConfigWithAlt(dbArgs, "brokerAddress", "ConfigFile/brokerAddress", "127.0.0.1")); + AQH_NodeServer_SetBrokerPort(o, _readIntConfigWithAlt(dbArgs, "brokerPort", "ConfigFile/brokerPort", AQHOMED_DEFAULT_BROKER_PORT, -1)); AQH_NodeServer_SetBrokerClientId(o, GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, AQHOMED_DEFAULT_BROKER_CLIENTID)); + AQH_NodeServer_SetMqttAddress(o, _readCharConfigWithAlt(dbArgs, "mqttAddress", "ConfigFile/mqttAddr", "127.0.0.1")); + AQH_NodeServer_SetMqttPort(o, _readIntConfigWithAlt(dbArgs, "mqttPort", "ConfigFile/mqttPort", 1883, -1)); + AQH_NodeServer_SetMqttClientId(o, _readCharConfigWithAlt(dbArgs, "mqttClientId", "ConfigFile/mqttClientId", "aqhome-mqtt")); + AQH_NodeServer_SetMqttKeepAlive(o, _readIntConfigWithAlt(dbArgs, "mqttKeepAlive", "ConfigFile/mqttKeepAlive", 600, -1)); + AQH_NodeServer_SetMqttDiscoveryPrefix(o, _readCharConfigWithAlt(dbArgs, "mqttDiscovery", "ConfigFile/mqttDiscovery", "homeassistant")); + } -const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue) +const char *_readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue) { const char *s; @@ -478,7 +597,7 @@ const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, con -int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue) +int _readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue) { int i; @@ -615,6 +734,132 @@ int _exchangeConnect(AQH_NODE_SERVER *xo, uint32_t flags) + + + +int _startMqtt(AQH_OBJECT *o, AQH_NODE_SERVER *xo) +{ + if (xo->mqttEndpoint) { + AQH_Object_Disable(xo->mqttEndpoint); + AQH_Object_free(xo->mqttEndpoint); + xo->mqttEndpoint=NULL; + } + + if (xo->mqttAddress && *(xo->mqttAddress) && xo->mqttPort) { + AQH_OBJECT *ep; + int fd; + int rv; + + fd=AQH_TcpObject_CreateConnectedSocket(xo->mqttAddress, xo->mqttPort); + if (fd<0) { + DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d", xo->mqttAddress, xo->mqttPort); + return GWEN_ERROR_IO; + } + DBG_INFO(NULL, "Physically connected to MQTT server %s:%d", xo->mqttAddress, xo->mqttPort); + + ep=AQH_MqttClientObject_new(AQH_Object_GetEventLoop(o), fd); + assert(ep); + AQH_Endpoint_SetServiceName(ep, xo->mqttClientId); + AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_NODE_SERVER_SLOT_MQTTCLOSED, o); + AQH_Object_Enable(ep); + xo->mqttEndpoint=ep; + + rv=_exchangeMqttConnect(xo); + if (rv!=0) { + DBG_ERROR(NULL, "MQTT: Error exchanging CONNECT request (%d)", rv); + return (rv<0)?rv:GWEN_ERROR_PERMISSIONS; + } + + rv=_exchangeMqttSubscribe(xo); + if (rv!=0) { + DBG_ERROR(NULL, "MQTT: Error exchanging SUBSCRIBE request (%d)", rv); + return (rv<0)?rv:GWEN_ERROR_PERMISSIONS; + } + + DBG_NOTICE(NULL, "Connected to MQTT at %s:%d", xo->mqttAddress, xo->mqttPort); + return 0; + } + else { + DBG_ERROR(NULL, "No MQTT server settings"); + return GWEN_ERROR_BAD_DATA; + } + + return 0; +} + + + +int _exchangeMqttConnect(AQH_NODE_SERVER *xo) +{ + AQH_MESSAGE *msg; + + msg=AQH_MqttMessageConnect_new("MQTT", 0x04, 0, xo->mqttKeepAlive, xo->mqttClientId, NULL, NULL); + AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msg); + + msg=AQH_MqttEndpoint_WaitForConnAckMsg(xo->mqttEndpoint, xo->timeoutInSeconds); + if (msg) { + int resultCode; + + resultCode=AQH_MqttMessageConnAck_GetResultCode(msg); + AQH_Message_free(msg); + if (resultCode==AQH_MQTTMSG_CONNACK_RESULT_ACCEPTED) { + DBG_INFO(AQH_LOGDOMAIN, "Positive CONNACK response"); + return 0; + } + else { + DBG_ERROR(NULL, "Negative CONNACK response: %d", resultCode); + return GWEN_ERROR_GENERIC; + } + } + else { + DBG_ERROR(NULL, "No CONNACK message received."); + return GWEN_ERROR_GENERIC; + } +} + + + +int _exchangeMqttSubscribe(AQH_NODE_SERVER *xo) +{ + uint16_t pckId; + AQH_MESSAGE *msg; + + pckId=AQH_Endpoint_GetNextMessageId(xo->mqttEndpoint); + msg=AQH_MqttMessageSubscribe_new(0, pckId, "#", 0); + AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msg); + + msg=AQH_MqttEndpoint_WaitForMsg(xo->mqttEndpoint, AQH_MQTTMSG_MSGTYPE_SUBACK, xo->timeoutInSeconds); + if (msg) { + int resultCode; + + resultCode=AQH_MqttMessageSubAck_GetResultCode(msg); + AQH_Message_free(msg); + if (resultCode!=128) { + DBG_INFO(AQH_LOGDOMAIN, "Positive SUBACK response"); + return 0; + } + else { + DBG_ERROR(NULL, "Negative SUBACK response: %d", resultCode); + return GWEN_ERROR_GENERIC; + } + } + else { + DBG_ERROR(NULL, "No SUBACK message received."); + return GWEN_ERROR_GENERIC; + } +} + + + + + + + + + + + + void _setupDb(AQH_NODE_SERVER *xo) { if (xo->dbFile) { @@ -1139,7 +1384,7 @@ void AQH_NodeServer_CheckBrokerConnection(AQH_OBJECT *o) AQH_NODE_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); - if (xo && xo->dbArgs) { + if (xo && xo->dbArgs && (xo->flags & AQHOMED_FLAGS_START_BROKER)) { if (xo->brokerEndpoint) { if (AQH_Object_GetFlags(xo->brokerEndpoint) & AQH_OBJECT_FLAGS_DELETE) { @@ -1170,6 +1415,106 @@ void AQH_NodeServer_CheckBrokerConnection(AQH_OBJECT *o) + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * MQTT management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + + +void AQH_NodeServer_HandleMqttMsgs(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && xo->mqttEndpoint) { + AQH_MESSAGE *msg; + + while( (msg=AQH_Endpoint_GetNextMsgIn(xo->mqttEndpoint)) ) { + AQH_Message_SetObject(msg, xo->mqttEndpoint); + _handleMsgFromMqtt(o, xo->mqttEndpoint, msg); + AQH_Message_free(msg); + } + } +} + + + +void _handleMsgFromMqtt(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + uint8_t code; + + /* exec IPC message */ + code=AQH_MqttMessage_GetTypeAndFlags(msg); + switch(code & 0xf0) { + case (AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0): AQH_NodeServer_HandlePublishMsg(o, ep, msg); break; + case (AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0): _handleMqttMsgPingRsp(); break; + default: break; + } +} + + + +void _handleMqttMsgPingRsp(void) +{ + DBG_INFO(NULL, "PING response received"); +} + + + +void AQH_NodeServer_CheckMqttConnection(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && (xo->flags & AQHOMED_FLAGS_START_MQTT)) { + if (xo->mqttEndpoint) { + if (AQH_Object_GetFlags(xo->mqttEndpoint) & AQH_OBJECT_FLAGS_DELETE) { + DBG_INFO(NULL, "Deleting mqtt connection"); + AQH_Object_Disable(xo->mqttEndpoint); + AQH_Object_free(xo->mqttEndpoint); + xo->mqttEndpoint=NULL; + } + } + + if (xo->mqttEndpoint==NULL) { + time_t now; + + now=time(NULL); + if (_diffInSeconds(now, xo->timestampMqttDown)>AQH_NODE_SERVER_MQTT_RESTARTTIME) { + int rv; + + DBG_INFO(NULL, "Restarting MQTT connection"); + rv=_startMqtt(o, xo); + if (rv<0) { + DBG_ERROR(NULL, "here (%d)", rv); + } + } + } + } +} + + + +int AQH_NodeServer_SendPing(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && xo->mqttEndpoint) { + AQH_MESSAGE *msgOut; + + DBG_INFO(NULL, "Sending PING"); + msgOut=AQH_MqttMessage_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL); + AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msgOut); + return 0; + } + return GWEN_ERROR_INVALID; +} + + + + /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * request management functions * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx @@ -1231,6 +1576,7 @@ int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, GWEN case AQH_NODE_SERVER_SLOT_CLIENTCLOSED: return _handleIpcClientDown(senderObject); case AQH_NODE_SERVER_SLOT_BROKERCLOSED: return _handleBrokerDown(xo); case AQH_NODE_SERVER_SLOT_TTYCLOSED: return _handleTtyDown(xo); + case AQH_NODE_SERVER_SLOT_MQTTCLOSED: return _handleMqttDown(xo); default: break; } @@ -1284,6 +1630,18 @@ int _handleTtyDown(AQH_NODE_SERVER *xo) +int _handleMqttDown(AQH_NODE_SERVER *xo) +{ + if (xo->mqttEndpoint) { + DBG_WARN(NULL, "MQTT connection down"); + AQH_Object_AddFlags(xo->mqttEndpoint, AQH_OBJECT_FLAGS_DELETE); + xo->timestampMqttDown=time(NULL); + } + return 1; +} + + + /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx @@ -1411,10 +1769,16 @@ int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) { A_ARG, A_CHAR, "brokerAddress", 0, 1, "ba", "brokerddress", I18S("Broker address [127.0.0.1]"), NULL}, { A_ARG, A_INT, "brokerPort", 0, 1, "bp", "brokerport", I18S("Broker port [1899]"), NULL}, { A_ARG, A_CHAR, "brokerClientId", 0, 1, NULL, "brokerclientid", I18S("Broker client id"), NULL}, + { A_ARG, A_CHAR, "mqttAddress", 0, 1, "ma", "mqttaddress", I18S("Address of MQTT server"), NULL}, + { A_ARG, A_INT, "mqttPort", 0, 1, "mp", "mqttport", I18S("Port of MQTT server (default: 1883)"), NULL}, + { A_ARG, A_CHAR, "mqttClientId", 0, 1, NULL, "mqttclientid", I18S("MQTT client id"), NULL}, + { A_ARG, A_INT, "mqttKeepAlive", 0, 1, "P", "mqttkeepalive", I18S("MQTT keep-alive time"), NULL}, { A_ARG, A_CHAR, "dbfile", 0, 1, "db", "dbfile", I18S("DB file to read/write node database"), NULL}, { A_ARG, A_CHAR, "pidfile", 0, 1, "p", "pidfile", I18S("PID file"), NULL}, { A_ARG, A_INT, "timeout", 0, 1, "T", NULL, I18S("timeout in seconds [0]"), NULL}, { 0, A_INT, "noAttn", 0, 1, "N", "noattn", I18S("Don't use ATTN line (for T03 or newer)"), NULL}, + { 0, A_INT, "startMqtt", 0, 1, "M", "startMqtt", I18S("Start MQTT connection"), NULL}, + { 0, A_INT, "startBroker", 0, 1, "M", "startBroker", I18S("Start AqHome Broker connection"), NULL}, { A_END, A_INT, "help", 0, 0, "h", "help", I18S("Show this help screen"), NULL} }; diff --git a/apps/aqhome-nodes/server.h b/apps/aqhome-nodes/server.h index a59b7f9..523132d 100644 --- a/apps/aqhome-nodes/server.h +++ b/apps/aqhome-nodes/server.h @@ -57,6 +57,9 @@ void AQH_NodeServer_HandleClientMsgs(AQH_OBJECT *o); void AQH_NodeServer_HandleBrokerMsgs(AQH_OBJECT *o); void AQH_NodeServer_CheckBrokerConnection(AQH_OBJECT *o); void AQH_NodeServer_CheckTtyConnection(AQH_OBJECT *o); +void AQH_NodeServer_HandleMqttMsgs(AQH_OBJECT *o); +void AQH_NodeServer_CheckMqttConnection(AQH_OBJECT *o); +int AQH_NodeServer_SendPing(AQH_OBJECT *o); /* getters and setters */ @@ -72,6 +75,12 @@ void AQH_NodeServer_SetBrokerAddress(AQH_OBJECT *o, const char *s); void AQH_NodeServer_SetBrokerPort(AQH_OBJECT *o, int i); void AQH_NodeServer_SetBrokerClientId(AQH_OBJECT *o, const char *s); +void AQH_NodeServer_SetMqttAddress(AQH_OBJECT *o, const char *s); +void AQH_NodeServer_SetMqttPort(AQH_OBJECT *o, int i); +void AQH_NodeServer_SetMqttClientId(AQH_OBJECT *o, const char *s); +void AQH_NodeServer_SetMqttKeepAlive(AQH_OBJECT *o, int i); +void AQH_NodeServer_SetMqttDiscoveryPrefix(AQH_OBJECT *o, const char *s); + /* device management */ const AQHNODE_DEVICE_LIST *AQH_NodeServer_GetDeviceDefList(const AQH_OBJECT *o); diff --git a/apps/aqhome-nodes/server_p.h b/apps/aqhome-nodes/server_p.h index 8b65a9a..280cc35 100644 --- a/apps/aqhome-nodes/server_p.h +++ b/apps/aqhome-nodes/server_p.h @@ -27,12 +27,16 @@ #define AQHOMED_DEFAULT_BROKER_PORT 1899 #define AQHOMED_DEFAULT_BROKER_CLIENTID "nodes" +#define AQHOMED_FLAGS_START_BROKER 0x00000001 +#define AQHOMED_FLAGS_START_MQTT 0x00000002 + typedef struct AQH_NODE_SERVER AQH_NODE_SERVER; struct AQH_NODE_SERVER { AQH_OBJECT *ttyEndpoint; AQH_OBJECT *brokerEndpoint; + AQH_OBJECT *mqttEndpoint; AQH_OBJECT *ipcEndpoint; AQH_OBJECT_LIST *ipcClientList; @@ -44,6 +48,8 @@ struct AQH_NODE_SERVER { GWEN_DB_NODE *dbArgs; + uint32_t flags; + char *dbFile; char *logFile; char *pidFile; @@ -58,8 +64,15 @@ struct AQH_NODE_SERVER { int brokerPort; char *brokerClientId; + char *mqttAddress; + int mqttPort; + char *mqttClientId; + char *mqttDiscoveryPrefix; + int mqttKeepAlive; + time_t timestampTtyDown; time_t timestampBrokerDown; + time_t timestampMqttDown; int nodeAddress;