started working on mqtt support in aqhome-nodes
This commit is contained in:
@@ -44,6 +44,7 @@
|
|||||||
r_forward.h
|
r_forward.h
|
||||||
r_setaccmsggrps.h
|
r_setaccmsggrps.h
|
||||||
r_getnodes.h
|
r_getnodes.h
|
||||||
|
r_publish.h
|
||||||
server.h
|
server.h
|
||||||
server_p.h
|
server_p.h
|
||||||
</headers>
|
</headers>
|
||||||
@@ -59,6 +60,7 @@
|
|||||||
r_forward.c
|
r_forward.c
|
||||||
r_setaccmsggrps.c
|
r_setaccmsggrps.c
|
||||||
r_getnodes.c
|
r_getnodes.c
|
||||||
|
r_publish.c
|
||||||
main.c
|
main.c
|
||||||
</sources>
|
</sources>
|
||||||
|
|
||||||
|
|||||||
@@ -36,6 +36,7 @@
|
|||||||
|
|
||||||
#define CONNCLEAN_INTERVAL_IN_SECS 2
|
#define CONNCLEAN_INTERVAL_IN_SECS 2
|
||||||
#define CONNCHECK_INTERVAL_IN_SECS 10
|
#define CONNCHECK_INTERVAL_IN_SECS 10
|
||||||
|
#define PING_INTERVAL_IN_SECS 120
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -133,11 +134,13 @@ void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop)
|
|||||||
int timeout;
|
int timeout;
|
||||||
time_t timeLastConnectionCleanup;
|
time_t timeLastConnectionCleanup;
|
||||||
time_t timeLastConnCheck;
|
time_t timeLastConnCheck;
|
||||||
|
time_t timeLastPingSend;
|
||||||
|
|
||||||
timeout=AQH_NodeServer_GetTimeout(aqh);
|
timeout=AQH_NodeServer_GetTimeout(aqh);
|
||||||
timeStart=time(NULL);
|
timeStart=time(NULL);
|
||||||
timeLastConnectionCleanup=time(NULL);
|
timeLastConnectionCleanup=time(NULL);
|
||||||
timeLastConnCheck=time(NULL);
|
timeLastConnCheck=time(NULL);
|
||||||
|
timeLastPingSend=time(NULL);
|
||||||
|
|
||||||
while(!stopService) {
|
while(!stopService) {
|
||||||
time_t now;
|
time_t now;
|
||||||
@@ -146,6 +149,7 @@ void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop)
|
|||||||
AQH_NodeServer_HandleTtyMsgs(aqh);
|
AQH_NodeServer_HandleTtyMsgs(aqh);
|
||||||
AQH_NodeServer_HandleClientMsgs(aqh);
|
AQH_NodeServer_HandleClientMsgs(aqh);
|
||||||
AQH_NodeServer_HandleBrokerMsgs(aqh);
|
AQH_NodeServer_HandleBrokerMsgs(aqh);
|
||||||
|
AQH_NodeServer_HandleMqttMsgs(aqh);
|
||||||
|
|
||||||
now=time(NULL);
|
now=time(NULL);
|
||||||
|
|
||||||
@@ -159,9 +163,20 @@ void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop)
|
|||||||
DBG_INFO(NULL, "Check connections");
|
DBG_INFO(NULL, "Check connections");
|
||||||
AQH_NodeServer_CheckBrokerConnection(aqh);
|
AQH_NodeServer_CheckBrokerConnection(aqh);
|
||||||
AQH_NodeServer_CheckTtyConnection(aqh);
|
AQH_NodeServer_CheckTtyConnection(aqh);
|
||||||
|
AQH_NodeServer_CheckMqttConnection(aqh);
|
||||||
timeLastConnCheck=now;
|
timeLastConnCheck=now;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (_diffInSeconds(now, timeLastPingSend)>PING_INTERVAL_IN_SECS) {
|
||||||
|
int rv;
|
||||||
|
|
||||||
|
rv=AQH_NodeServer_SendPing(aqh);
|
||||||
|
if (rv<0) {
|
||||||
|
DBG_INFO(NULL, "Error sending PING");
|
||||||
|
}
|
||||||
|
timeLastPingSend=time(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
if (timeout && (_diffInSeconds(now, timeStart)>timeout)) {
|
if (timeout && (_diffInSeconds(now, timeStart)>timeout)) {
|
||||||
DBG_INFO(NULL, "Timeout");
|
DBG_INFO(NULL, "Timeout");
|
||||||
break;
|
break;
|
||||||
|
|||||||
34
apps/aqhome-nodes/r_publish.c
Normal file
34
apps/aqhome-nodes/r_publish.c
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
/****************************************************************************
|
||||||
|
* This file is part of the project AqHome.
|
||||||
|
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
|
||||||
|
*
|
||||||
|
* The license for this file can be found in the file COPYING which you
|
||||||
|
* should have received along with this file.
|
||||||
|
****************************************************************************/
|
||||||
|
|
||||||
|
#ifdef HAVE_CONFIG_H
|
||||||
|
# include <config.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
#include "./r_publish.h"
|
||||||
|
#include "./server_p.h"
|
||||||
|
#include "aqhome/ipc2/endpoint.h"
|
||||||
|
#include <aqhome/msg/mqtt/m_mqtt_publish.h>
|
||||||
|
|
||||||
|
#include <gwenhywfar/debug.h>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/* ------------------------------------------------------------------------------------------------
|
||||||
|
* code
|
||||||
|
* ------------------------------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
void AQH_NodeServer_HandlePublishMsg(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
28
apps/aqhome-nodes/r_publish.h
Normal file
28
apps/aqhome-nodes/r_publish.h
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
/****************************************************************************
|
||||||
|
* This file is part of the project AqHome.
|
||||||
|
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
|
||||||
|
*
|
||||||
|
* The license for this file can be found in the file COPYING which you
|
||||||
|
* should have received along with this file.
|
||||||
|
****************************************************************************/
|
||||||
|
|
||||||
|
#ifndef AQHOMED_R_PUBLISH_H
|
||||||
|
#define AQHOMED_R_PUBLISH_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "./server.h"
|
||||||
|
|
||||||
|
#include <aqhome/events2/object.h>
|
||||||
|
#include <aqhome/ipc2/message.h>
|
||||||
|
|
||||||
|
|
||||||
|
void AQH_NodeServer_HandlePublishMsg(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
#include "./r_forward.h"
|
#include "./r_forward.h"
|
||||||
#include "./r_setaccmsggrps.h"
|
#include "./r_setaccmsggrps.h"
|
||||||
#include "./r_getnodes.h"
|
#include "./r_getnodes.h"
|
||||||
|
#include "./r_publish.h"
|
||||||
|
|
||||||
#include <aqhome/aqhome.h>
|
#include <aqhome/aqhome.h>
|
||||||
#include <aqhome/ipc2/ipc_endpoint.h>
|
#include <aqhome/ipc2/ipc_endpoint.h>
|
||||||
@@ -41,6 +42,14 @@
|
|||||||
#include <aqhome/msg/node/m_value.h>
|
#include <aqhome/msg/node/m_value.h>
|
||||||
#include <aqhome/msg/node/m_recvstats.h>
|
#include <aqhome/msg/node/m_recvstats.h>
|
||||||
#include <aqhome/msg/node/m_sendstats.h>
|
#include <aqhome/msg/node/m_sendstats.h>
|
||||||
|
#include <aqhome/ipc2/mqtt_endpoint.h>
|
||||||
|
#include <aqhome/ipc2/mqtt_client.h>
|
||||||
|
#include <aqhome/msg/mqtt/m_mqtt.h>
|
||||||
|
#include <aqhome/msg/mqtt/m_mqtt_connect.h>
|
||||||
|
#include <aqhome/msg/mqtt/m_mqtt_connack.h>
|
||||||
|
#include <aqhome/msg/mqtt/m_mqtt_subscribe.h>
|
||||||
|
#include <aqhome/msg/mqtt/m_mqtt_suback.h>
|
||||||
|
#include <aqhome/msg/mqtt/m_mqtt_publish.h>
|
||||||
#include <aqhome/data/value.h>
|
#include <aqhome/data/value.h>
|
||||||
|
|
||||||
#include <gwenhywfar/args.h>
|
#include <gwenhywfar/args.h>
|
||||||
@@ -70,13 +79,15 @@
|
|||||||
|
|
||||||
#define AQH_NODE_SERVER_BROKER_RESTARTTIME 10
|
#define AQH_NODE_SERVER_BROKER_RESTARTTIME 10
|
||||||
#define AQH_NODE_SERVER_TTY_RESTARTTIME 10
|
#define AQH_NODE_SERVER_TTY_RESTARTTIME 10
|
||||||
|
#define AQH_NODE_SERVER_MQTT_RESTARTTIME 10
|
||||||
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
AQH_NODE_SERVER_SLOT_NEWCLIENT=1,
|
AQH_NODE_SERVER_SLOT_NEWCLIENT=1,
|
||||||
AQH_NODE_SERVER_SLOT_CLIENTCLOSED,
|
AQH_NODE_SERVER_SLOT_CLIENTCLOSED,
|
||||||
AQH_NODE_SERVER_SLOT_BROKERCLOSED,
|
AQH_NODE_SERVER_SLOT_BROKERCLOSED,
|
||||||
AQH_NODE_SERVER_SLOT_TTYCLOSED
|
AQH_NODE_SERVER_SLOT_TTYCLOSED,
|
||||||
|
AQH_NODE_SERVER_SLOT_MQTTCLOSED,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@@ -97,13 +108,17 @@ GWEN_INHERIT(AQH_OBJECT, AQH_NODE_SERVER)
|
|||||||
|
|
||||||
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
|
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
|
||||||
static void _readConfig(AQH_OBJECT *o, AQH_NODE_SERVER *xo, GWEN_DB_NODE *dbArgs);
|
static void _readConfig(AQH_OBJECT *o, AQH_NODE_SERVER *xo, GWEN_DB_NODE *dbArgs);
|
||||||
static const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue);
|
static const char *_readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue);
|
||||||
static int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue);
|
static int _readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue);
|
||||||
|
|
||||||
static int _startIpc(AQH_OBJECT *o, AQH_NODE_SERVER *xo);
|
static int _startIpc(AQH_OBJECT *o, AQH_NODE_SERVER *xo);
|
||||||
static int _startTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo);
|
static int _startTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo);
|
||||||
static int _startBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo);
|
static int _startBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo);
|
||||||
static int _exchangeConnect(AQH_NODE_SERVER *xo, uint32_t flags);
|
static int _exchangeConnect(AQH_NODE_SERVER *xo, uint32_t flags);
|
||||||
|
static int _startMqtt(AQH_OBJECT *o, AQH_NODE_SERVER *xo);
|
||||||
|
static int _exchangeMqttConnect(AQH_NODE_SERVER *xo);
|
||||||
|
static int _exchangeMqttSubscribe(AQH_NODE_SERVER *xo);
|
||||||
|
|
||||||
static void _setupDb(AQH_NODE_SERVER *xo);
|
static void _setupDb(AQH_NODE_SERVER *xo);
|
||||||
static int _loadDeviceList(AQH_NODE_SERVER *xo);
|
static int _loadDeviceList(AQH_NODE_SERVER *xo);
|
||||||
|
|
||||||
@@ -112,11 +127,15 @@ static int _handleNewIpcClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *c
|
|||||||
static int _handleIpcClientDown(AQH_OBJECT *clientEndpoint);
|
static int _handleIpcClientDown(AQH_OBJECT *clientEndpoint);
|
||||||
static int _handleBrokerDown(AQH_NODE_SERVER *xo);
|
static int _handleBrokerDown(AQH_NODE_SERVER *xo);
|
||||||
static int _handleTtyDown(AQH_NODE_SERVER *xo);
|
static int _handleTtyDown(AQH_NODE_SERVER *xo);
|
||||||
|
static int _handleMqttDown(AQH_NODE_SERVER *xo);
|
||||||
|
|
||||||
static void _handleMsgsFromClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *ep);
|
static void _handleMsgsFromClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *ep);
|
||||||
static void _handleMsgFromClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
|
static void _handleMsgFromClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
|
||||||
|
static void _handleMsgFromMqtt(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
|
||||||
static void _handleMsgFromTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg);
|
static void _handleMsgFromTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg);
|
||||||
|
static void _handleMqttMsgPingRsp(void);
|
||||||
|
|
||||||
|
|
||||||
static void _writeTtyMsgToLogFile(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg);
|
static void _writeTtyMsgToLogFile(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg);
|
||||||
static void _forwardTtyMsgToBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg);
|
static void _forwardTtyMsgToBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg);
|
||||||
static void _forwardValueMessageToBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg);
|
static void _forwardValueMessageToBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg);
|
||||||
@@ -178,6 +197,7 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
|
|||||||
AQH_Object_free(xo->ipcEndpoint);
|
AQH_Object_free(xo->ipcEndpoint);
|
||||||
AQH_Object_free(xo->ttyEndpoint);
|
AQH_Object_free(xo->ttyEndpoint);
|
||||||
AQH_Object_free(xo->brokerEndpoint);
|
AQH_Object_free(xo->brokerEndpoint);
|
||||||
|
AQH_Object_free(xo->mqttEndpoint);
|
||||||
AQHNODE_Device_List_free(xo->deviceDefList);
|
AQHNODE_Device_List_free(xo->deviceDefList);
|
||||||
GWEN_DB_Group_free(xo->dbArgs);
|
GWEN_DB_Group_free(xo->dbArgs);
|
||||||
AQH_MsgRequest_free(xo->requestTree);
|
AQH_MsgRequest_free(xo->requestTree);
|
||||||
@@ -190,6 +210,9 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
|
|||||||
free(xo->tcpAddress);
|
free(xo->tcpAddress);
|
||||||
free(xo->brokerAddress);
|
free(xo->brokerAddress);
|
||||||
free(xo->brokerClientId);
|
free(xo->brokerClientId);
|
||||||
|
free(xo->mqttAddress);
|
||||||
|
free(xo->mqttClientId);
|
||||||
|
free(xo->mqttDiscoveryPrefix);
|
||||||
|
|
||||||
GWEN_FREE_OBJECT(xo);
|
GWEN_FREE_OBJECT(xo);
|
||||||
}
|
}
|
||||||
@@ -361,6 +384,81 @@ void AQH_NodeServer_SetBrokerClientId(AQH_OBJECT *o, const char *s)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void AQH_NodeServer_SetMqttAddress(AQH_OBJECT *o, const char *s)
|
||||||
|
{
|
||||||
|
if (o) {
|
||||||
|
AQH_NODE_SERVER *xo;
|
||||||
|
|
||||||
|
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
||||||
|
if (xo) {
|
||||||
|
free(xo->mqttAddress);
|
||||||
|
xo->mqttAddress=s?strdup(s):NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void AQH_NodeServer_SetMqttPort(AQH_OBJECT *o, int i)
|
||||||
|
{
|
||||||
|
if (o) {
|
||||||
|
AQH_NODE_SERVER *xo;
|
||||||
|
|
||||||
|
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
||||||
|
if (xo) {
|
||||||
|
xo->mqttPort=i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void AQH_NodeServer_SetMqttClientId(AQH_OBJECT *o, const char *s)
|
||||||
|
{
|
||||||
|
if (o) {
|
||||||
|
AQH_NODE_SERVER *xo;
|
||||||
|
|
||||||
|
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
||||||
|
if (xo) {
|
||||||
|
free(xo->mqttClientId);
|
||||||
|
xo->mqttClientId=s?strdup(s):NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void AQH_NodeServer_SetMqttKeepAlive(AQH_OBJECT *o, int i)
|
||||||
|
{
|
||||||
|
if (o) {
|
||||||
|
AQH_NODE_SERVER *xo;
|
||||||
|
|
||||||
|
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
||||||
|
if (xo) {
|
||||||
|
xo->mqttKeepAlive=i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void AQH_NodeServer_SetMqttDiscoveryPrefix(AQH_OBJECT *o, const char *s)
|
||||||
|
{
|
||||||
|
if (o) {
|
||||||
|
AQH_NODE_SERVER *xo;
|
||||||
|
|
||||||
|
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
||||||
|
if (xo) {
|
||||||
|
free(xo->mqttDiscoveryPrefix);
|
||||||
|
xo->mqttDiscoveryPrefix=s?strdup(s):NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||||
@@ -427,12 +525,25 @@ int AQH_NodeServer_Init(AQH_OBJECT *o, int argc, char **argv)
|
|||||||
DBG_INFO(NULL, "here (%d)", rv);
|
DBG_INFO(NULL, "here (%d)", rv);
|
||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
DBG_INFO(NULL, "Starting Broker Connection");
|
|
||||||
rv=_startBroker(o, xo);
|
if (xo->flags & AQHOMED_FLAGS_START_BROKER) {
|
||||||
if (rv<0) {
|
DBG_INFO(NULL, "Starting Broker Connection");
|
||||||
DBG_INFO(NULL, "here (%d)", rv);
|
rv=_startBroker(o, xo);
|
||||||
return rv;
|
if (rv<0) {
|
||||||
|
DBG_INFO(NULL, "here (%d)", rv);
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (xo->flags & AQHOMED_FLAGS_START_MQTT) {
|
||||||
|
DBG_INFO(NULL, "Starting MQTT Connection");
|
||||||
|
rv=_startMqtt(o, xo);
|
||||||
|
if (rv<0) {
|
||||||
|
DBG_INFO(NULL, "here (%d)", rv);
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@@ -450,23 +561,31 @@ void _readConfig(AQH_OBJECT *o, AQH_NODE_SERVER *xo, GWEN_DB_NODE *dbArgs)
|
|||||||
xo->timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0);
|
xo->timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0);
|
||||||
xo->noAttn=GWEN_DB_GetIntValue(dbArgs, "noAttn", 0, 0);
|
xo->noAttn=GWEN_DB_GetIntValue(dbArgs, "noAttn", 0, 0);
|
||||||
xo->nodeAddress=GWEN_DB_GetIntValue(dbArgs, "nodeAddress", 0, AQHOMED_DEFAULT_NODEADDR);
|
xo->nodeAddress=GWEN_DB_GetIntValue(dbArgs, "nodeAddress", 0, AQHOMED_DEFAULT_NODEADDR);
|
||||||
|
xo->flags|=GWEN_DB_GetIntValue(dbArgs, "startBroker", 0, 1)?AQHOMED_FLAGS_START_BROKER:0;
|
||||||
|
xo->flags|=GWEN_DB_GetIntValue(dbArgs, "startMqtt", 0, 0)?AQHOMED_FLAGS_START_MQTT:0;
|
||||||
|
|
||||||
AQH_NodeServer_SetDbFile(o, GWEN_DB_GetCharValue(dbArgs, "dbfile", 0, NULL));
|
AQH_NodeServer_SetDbFile(o, GWEN_DB_GetCharValue(dbArgs, "dbfile", 0, NULL));
|
||||||
AQH_NodeServer_SetLogFile(o, GWEN_DB_GetCharValue(dbArgs, "logfile", 0, NULL));
|
AQH_NodeServer_SetLogFile(o, GWEN_DB_GetCharValue(dbArgs, "logfile", 0, NULL));
|
||||||
|
|
||||||
AQH_NodeServer_SetDevicePath(o, GWEN_DB_GetCharValue(dbArgs, "device", 0, AQHOMED_DEFAULT_DEVICE));
|
AQH_NodeServer_SetDevicePath(o, GWEN_DB_GetCharValue(dbArgs, "device", 0, AQHOMED_DEFAULT_DEVICE));
|
||||||
AQH_NodeServer_SetTpcAddress(o, readCharConfigWithAlt(dbArgs, "tcpAddress", "ConfigFile/nodesAddress", NULL));
|
AQH_NodeServer_SetTpcAddress(o, _readCharConfigWithAlt(dbArgs, "tcpAddress", "ConfigFile/nodesAddress", NULL));
|
||||||
AQH_NodeServer_SetTcpPort(o, readIntConfigWithAlt(dbArgs, "tcpPort", "ConfigFile/nodesPort", AQHOMED_DEFAULT_IPC_PORT, -1));
|
AQH_NodeServer_SetTcpPort(o, _readIntConfigWithAlt(dbArgs, "tcpPort", "ConfigFile/nodesPort", AQHOMED_DEFAULT_IPC_PORT, -1));
|
||||||
|
|
||||||
AQH_NodeServer_SetBrokerAddress(o, readCharConfigWithAlt(dbArgs, "brokerAddress", "ConfigFile/brokerAddress", "127.0.0.1"));
|
AQH_NodeServer_SetBrokerAddress(o, _readCharConfigWithAlt(dbArgs, "brokerAddress", "ConfigFile/brokerAddress", "127.0.0.1"));
|
||||||
AQH_NodeServer_SetBrokerPort(o, readIntConfigWithAlt(dbArgs, "brokerPort", "ConfigFile/brokerPort", AQHOMED_DEFAULT_BROKER_PORT, -1));
|
AQH_NodeServer_SetBrokerPort(o, _readIntConfigWithAlt(dbArgs, "brokerPort", "ConfigFile/brokerPort", AQHOMED_DEFAULT_BROKER_PORT, -1));
|
||||||
AQH_NodeServer_SetBrokerClientId(o, GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, AQHOMED_DEFAULT_BROKER_CLIENTID));
|
AQH_NodeServer_SetBrokerClientId(o, GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, AQHOMED_DEFAULT_BROKER_CLIENTID));
|
||||||
|
|
||||||
|
AQH_NodeServer_SetMqttAddress(o, _readCharConfigWithAlt(dbArgs, "mqttAddress", "ConfigFile/mqttAddr", "127.0.0.1"));
|
||||||
|
AQH_NodeServer_SetMqttPort(o, _readIntConfigWithAlt(dbArgs, "mqttPort", "ConfigFile/mqttPort", 1883, -1));
|
||||||
|
AQH_NodeServer_SetMqttClientId(o, _readCharConfigWithAlt(dbArgs, "mqttClientId", "ConfigFile/mqttClientId", "aqhome-mqtt"));
|
||||||
|
AQH_NodeServer_SetMqttKeepAlive(o, _readIntConfigWithAlt(dbArgs, "mqttKeepAlive", "ConfigFile/mqttKeepAlive", 600, -1));
|
||||||
|
AQH_NodeServer_SetMqttDiscoveryPrefix(o, _readCharConfigWithAlt(dbArgs, "mqttDiscovery", "ConfigFile/mqttDiscovery", "homeassistant"));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue)
|
const char *_readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue)
|
||||||
{
|
{
|
||||||
const char *s;
|
const char *s;
|
||||||
|
|
||||||
@@ -478,7 +597,7 @@ const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, con
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue)
|
int _readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
@@ -615,6 +734,132 @@ int _exchangeConnect(AQH_NODE_SERVER *xo, uint32_t flags)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int _startMqtt(AQH_OBJECT *o, AQH_NODE_SERVER *xo)
|
||||||
|
{
|
||||||
|
if (xo->mqttEndpoint) {
|
||||||
|
AQH_Object_Disable(xo->mqttEndpoint);
|
||||||
|
AQH_Object_free(xo->mqttEndpoint);
|
||||||
|
xo->mqttEndpoint=NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (xo->mqttAddress && *(xo->mqttAddress) && xo->mqttPort) {
|
||||||
|
AQH_OBJECT *ep;
|
||||||
|
int fd;
|
||||||
|
int rv;
|
||||||
|
|
||||||
|
fd=AQH_TcpObject_CreateConnectedSocket(xo->mqttAddress, xo->mqttPort);
|
||||||
|
if (fd<0) {
|
||||||
|
DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d", xo->mqttAddress, xo->mqttPort);
|
||||||
|
return GWEN_ERROR_IO;
|
||||||
|
}
|
||||||
|
DBG_INFO(NULL, "Physically connected to MQTT server %s:%d", xo->mqttAddress, xo->mqttPort);
|
||||||
|
|
||||||
|
ep=AQH_MqttClientObject_new(AQH_Object_GetEventLoop(o), fd);
|
||||||
|
assert(ep);
|
||||||
|
AQH_Endpoint_SetServiceName(ep, xo->mqttClientId);
|
||||||
|
AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_NODE_SERVER_SLOT_MQTTCLOSED, o);
|
||||||
|
AQH_Object_Enable(ep);
|
||||||
|
xo->mqttEndpoint=ep;
|
||||||
|
|
||||||
|
rv=_exchangeMqttConnect(xo);
|
||||||
|
if (rv!=0) {
|
||||||
|
DBG_ERROR(NULL, "MQTT: Error exchanging CONNECT request (%d)", rv);
|
||||||
|
return (rv<0)?rv:GWEN_ERROR_PERMISSIONS;
|
||||||
|
}
|
||||||
|
|
||||||
|
rv=_exchangeMqttSubscribe(xo);
|
||||||
|
if (rv!=0) {
|
||||||
|
DBG_ERROR(NULL, "MQTT: Error exchanging SUBSCRIBE request (%d)", rv);
|
||||||
|
return (rv<0)?rv:GWEN_ERROR_PERMISSIONS;
|
||||||
|
}
|
||||||
|
|
||||||
|
DBG_NOTICE(NULL, "Connected to MQTT at %s:%d", xo->mqttAddress, xo->mqttPort);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DBG_ERROR(NULL, "No MQTT server settings");
|
||||||
|
return GWEN_ERROR_BAD_DATA;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int _exchangeMqttConnect(AQH_NODE_SERVER *xo)
|
||||||
|
{
|
||||||
|
AQH_MESSAGE *msg;
|
||||||
|
|
||||||
|
msg=AQH_MqttMessageConnect_new("MQTT", 0x04, 0, xo->mqttKeepAlive, xo->mqttClientId, NULL, NULL);
|
||||||
|
AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msg);
|
||||||
|
|
||||||
|
msg=AQH_MqttEndpoint_WaitForConnAckMsg(xo->mqttEndpoint, xo->timeoutInSeconds);
|
||||||
|
if (msg) {
|
||||||
|
int resultCode;
|
||||||
|
|
||||||
|
resultCode=AQH_MqttMessageConnAck_GetResultCode(msg);
|
||||||
|
AQH_Message_free(msg);
|
||||||
|
if (resultCode==AQH_MQTTMSG_CONNACK_RESULT_ACCEPTED) {
|
||||||
|
DBG_INFO(AQH_LOGDOMAIN, "Positive CONNACK response");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DBG_ERROR(NULL, "Negative CONNACK response: %d", resultCode);
|
||||||
|
return GWEN_ERROR_GENERIC;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DBG_ERROR(NULL, "No CONNACK message received.");
|
||||||
|
return GWEN_ERROR_GENERIC;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int _exchangeMqttSubscribe(AQH_NODE_SERVER *xo)
|
||||||
|
{
|
||||||
|
uint16_t pckId;
|
||||||
|
AQH_MESSAGE *msg;
|
||||||
|
|
||||||
|
pckId=AQH_Endpoint_GetNextMessageId(xo->mqttEndpoint);
|
||||||
|
msg=AQH_MqttMessageSubscribe_new(0, pckId, "#", 0);
|
||||||
|
AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msg);
|
||||||
|
|
||||||
|
msg=AQH_MqttEndpoint_WaitForMsg(xo->mqttEndpoint, AQH_MQTTMSG_MSGTYPE_SUBACK, xo->timeoutInSeconds);
|
||||||
|
if (msg) {
|
||||||
|
int resultCode;
|
||||||
|
|
||||||
|
resultCode=AQH_MqttMessageSubAck_GetResultCode(msg);
|
||||||
|
AQH_Message_free(msg);
|
||||||
|
if (resultCode!=128) {
|
||||||
|
DBG_INFO(AQH_LOGDOMAIN, "Positive SUBACK response");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DBG_ERROR(NULL, "Negative SUBACK response: %d", resultCode);
|
||||||
|
return GWEN_ERROR_GENERIC;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DBG_ERROR(NULL, "No SUBACK message received.");
|
||||||
|
return GWEN_ERROR_GENERIC;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void _setupDb(AQH_NODE_SERVER *xo)
|
void _setupDb(AQH_NODE_SERVER *xo)
|
||||||
{
|
{
|
||||||
if (xo->dbFile) {
|
if (xo->dbFile) {
|
||||||
@@ -1139,7 +1384,7 @@ void AQH_NodeServer_CheckBrokerConnection(AQH_OBJECT *o)
|
|||||||
AQH_NODE_SERVER *xo;
|
AQH_NODE_SERVER *xo;
|
||||||
|
|
||||||
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
||||||
if (xo && xo->dbArgs) {
|
if (xo && xo->dbArgs && (xo->flags & AQHOMED_FLAGS_START_BROKER)) {
|
||||||
|
|
||||||
if (xo->brokerEndpoint) {
|
if (xo->brokerEndpoint) {
|
||||||
if (AQH_Object_GetFlags(xo->brokerEndpoint) & AQH_OBJECT_FLAGS_DELETE) {
|
if (AQH_Object_GetFlags(xo->brokerEndpoint) & AQH_OBJECT_FLAGS_DELETE) {
|
||||||
@@ -1170,6 +1415,106 @@ void AQH_NodeServer_CheckBrokerConnection(AQH_OBJECT *o)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||||
|
* MQTT management functions
|
||||||
|
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
void AQH_NodeServer_HandleMqttMsgs(AQH_OBJECT *o)
|
||||||
|
{
|
||||||
|
AQH_NODE_SERVER *xo;
|
||||||
|
|
||||||
|
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
||||||
|
if (xo && xo->mqttEndpoint) {
|
||||||
|
AQH_MESSAGE *msg;
|
||||||
|
|
||||||
|
while( (msg=AQH_Endpoint_GetNextMsgIn(xo->mqttEndpoint)) ) {
|
||||||
|
AQH_Message_SetObject(msg, xo->mqttEndpoint);
|
||||||
|
_handleMsgFromMqtt(o, xo->mqttEndpoint, msg);
|
||||||
|
AQH_Message_free(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void _handleMsgFromMqtt(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg)
|
||||||
|
{
|
||||||
|
uint8_t code;
|
||||||
|
|
||||||
|
/* exec IPC message */
|
||||||
|
code=AQH_MqttMessage_GetTypeAndFlags(msg);
|
||||||
|
switch(code & 0xf0) {
|
||||||
|
case (AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0): AQH_NodeServer_HandlePublishMsg(o, ep, msg); break;
|
||||||
|
case (AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0): _handleMqttMsgPingRsp(); break;
|
||||||
|
default: break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void _handleMqttMsgPingRsp(void)
|
||||||
|
{
|
||||||
|
DBG_INFO(NULL, "PING response received");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void AQH_NodeServer_CheckMqttConnection(AQH_OBJECT *o)
|
||||||
|
{
|
||||||
|
AQH_NODE_SERVER *xo;
|
||||||
|
|
||||||
|
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
||||||
|
if (xo && (xo->flags & AQHOMED_FLAGS_START_MQTT)) {
|
||||||
|
if (xo->mqttEndpoint) {
|
||||||
|
if (AQH_Object_GetFlags(xo->mqttEndpoint) & AQH_OBJECT_FLAGS_DELETE) {
|
||||||
|
DBG_INFO(NULL, "Deleting mqtt connection");
|
||||||
|
AQH_Object_Disable(xo->mqttEndpoint);
|
||||||
|
AQH_Object_free(xo->mqttEndpoint);
|
||||||
|
xo->mqttEndpoint=NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (xo->mqttEndpoint==NULL) {
|
||||||
|
time_t now;
|
||||||
|
|
||||||
|
now=time(NULL);
|
||||||
|
if (_diffInSeconds(now, xo->timestampMqttDown)>AQH_NODE_SERVER_MQTT_RESTARTTIME) {
|
||||||
|
int rv;
|
||||||
|
|
||||||
|
DBG_INFO(NULL, "Restarting MQTT connection");
|
||||||
|
rv=_startMqtt(o, xo);
|
||||||
|
if (rv<0) {
|
||||||
|
DBG_ERROR(NULL, "here (%d)", rv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int AQH_NodeServer_SendPing(AQH_OBJECT *o)
|
||||||
|
{
|
||||||
|
AQH_NODE_SERVER *xo;
|
||||||
|
|
||||||
|
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o);
|
||||||
|
if (xo && xo->mqttEndpoint) {
|
||||||
|
AQH_MESSAGE *msgOut;
|
||||||
|
|
||||||
|
DBG_INFO(NULL, "Sending PING");
|
||||||
|
msgOut=AQH_MqttMessage_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL);
|
||||||
|
AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msgOut);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return GWEN_ERROR_INVALID;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||||
* request management functions
|
* request management functions
|
||||||
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||||
@@ -1231,6 +1576,7 @@ int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, GWEN
|
|||||||
case AQH_NODE_SERVER_SLOT_CLIENTCLOSED: return _handleIpcClientDown(senderObject);
|
case AQH_NODE_SERVER_SLOT_CLIENTCLOSED: return _handleIpcClientDown(senderObject);
|
||||||
case AQH_NODE_SERVER_SLOT_BROKERCLOSED: return _handleBrokerDown(xo);
|
case AQH_NODE_SERVER_SLOT_BROKERCLOSED: return _handleBrokerDown(xo);
|
||||||
case AQH_NODE_SERVER_SLOT_TTYCLOSED: return _handleTtyDown(xo);
|
case AQH_NODE_SERVER_SLOT_TTYCLOSED: return _handleTtyDown(xo);
|
||||||
|
case AQH_NODE_SERVER_SLOT_MQTTCLOSED: return _handleMqttDown(xo);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -1284,6 +1630,18 @@ int _handleTtyDown(AQH_NODE_SERVER *xo)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int _handleMqttDown(AQH_NODE_SERVER *xo)
|
||||||
|
{
|
||||||
|
if (xo->mqttEndpoint) {
|
||||||
|
DBG_WARN(NULL, "MQTT connection down");
|
||||||
|
AQH_Object_AddFlags(xo->mqttEndpoint, AQH_OBJECT_FLAGS_DELETE);
|
||||||
|
xo->timestampMqttDown=time(NULL);
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||||
@@ -1411,10 +1769,16 @@ int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs)
|
|||||||
{ A_ARG, A_CHAR, "brokerAddress", 0, 1, "ba", "brokerddress", I18S("Broker address [127.0.0.1]"), NULL},
|
{ A_ARG, A_CHAR, "brokerAddress", 0, 1, "ba", "brokerddress", I18S("Broker address [127.0.0.1]"), NULL},
|
||||||
{ A_ARG, A_INT, "brokerPort", 0, 1, "bp", "brokerport", I18S("Broker port [1899]"), NULL},
|
{ A_ARG, A_INT, "brokerPort", 0, 1, "bp", "brokerport", I18S("Broker port [1899]"), NULL},
|
||||||
{ A_ARG, A_CHAR, "brokerClientId", 0, 1, NULL, "brokerclientid", I18S("Broker client id"), NULL},
|
{ A_ARG, A_CHAR, "brokerClientId", 0, 1, NULL, "brokerclientid", I18S("Broker client id"), NULL},
|
||||||
|
{ A_ARG, A_CHAR, "mqttAddress", 0, 1, "ma", "mqttaddress", I18S("Address of MQTT server"), NULL},
|
||||||
|
{ A_ARG, A_INT, "mqttPort", 0, 1, "mp", "mqttport", I18S("Port of MQTT server (default: 1883)"), NULL},
|
||||||
|
{ A_ARG, A_CHAR, "mqttClientId", 0, 1, NULL, "mqttclientid", I18S("MQTT client id"), NULL},
|
||||||
|
{ A_ARG, A_INT, "mqttKeepAlive", 0, 1, "P", "mqttkeepalive", I18S("MQTT keep-alive time"), NULL},
|
||||||
{ A_ARG, A_CHAR, "dbfile", 0, 1, "db", "dbfile", I18S("DB file to read/write node database"), NULL},
|
{ A_ARG, A_CHAR, "dbfile", 0, 1, "db", "dbfile", I18S("DB file to read/write node database"), NULL},
|
||||||
{ A_ARG, A_CHAR, "pidfile", 0, 1, "p", "pidfile", I18S("PID file"), NULL},
|
{ A_ARG, A_CHAR, "pidfile", 0, 1, "p", "pidfile", I18S("PID file"), NULL},
|
||||||
{ A_ARG, A_INT, "timeout", 0, 1, "T", NULL, I18S("timeout in seconds [0]"), NULL},
|
{ A_ARG, A_INT, "timeout", 0, 1, "T", NULL, I18S("timeout in seconds [0]"), NULL},
|
||||||
{ 0, A_INT, "noAttn", 0, 1, "N", "noattn", I18S("Don't use ATTN line (for T03 or newer)"), NULL},
|
{ 0, A_INT, "noAttn", 0, 1, "N", "noattn", I18S("Don't use ATTN line (for T03 or newer)"), NULL},
|
||||||
|
{ 0, A_INT, "startMqtt", 0, 1, "M", "startMqtt", I18S("Start MQTT connection"), NULL},
|
||||||
|
{ 0, A_INT, "startBroker", 0, 1, "M", "startBroker", I18S("Start AqHome Broker connection"), NULL},
|
||||||
{ A_END, A_INT, "help", 0, 0, "h", "help", I18S("Show this help screen"), NULL}
|
{ A_END, A_INT, "help", 0, 0, "h", "help", I18S("Show this help screen"), NULL}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -57,6 +57,9 @@ void AQH_NodeServer_HandleClientMsgs(AQH_OBJECT *o);
|
|||||||
void AQH_NodeServer_HandleBrokerMsgs(AQH_OBJECT *o);
|
void AQH_NodeServer_HandleBrokerMsgs(AQH_OBJECT *o);
|
||||||
void AQH_NodeServer_CheckBrokerConnection(AQH_OBJECT *o);
|
void AQH_NodeServer_CheckBrokerConnection(AQH_OBJECT *o);
|
||||||
void AQH_NodeServer_CheckTtyConnection(AQH_OBJECT *o);
|
void AQH_NodeServer_CheckTtyConnection(AQH_OBJECT *o);
|
||||||
|
void AQH_NodeServer_HandleMqttMsgs(AQH_OBJECT *o);
|
||||||
|
void AQH_NodeServer_CheckMqttConnection(AQH_OBJECT *o);
|
||||||
|
int AQH_NodeServer_SendPing(AQH_OBJECT *o);
|
||||||
|
|
||||||
|
|
||||||
/* getters and setters */
|
/* getters and setters */
|
||||||
@@ -72,6 +75,12 @@ void AQH_NodeServer_SetBrokerAddress(AQH_OBJECT *o, const char *s);
|
|||||||
void AQH_NodeServer_SetBrokerPort(AQH_OBJECT *o, int i);
|
void AQH_NodeServer_SetBrokerPort(AQH_OBJECT *o, int i);
|
||||||
void AQH_NodeServer_SetBrokerClientId(AQH_OBJECT *o, const char *s);
|
void AQH_NodeServer_SetBrokerClientId(AQH_OBJECT *o, const char *s);
|
||||||
|
|
||||||
|
void AQH_NodeServer_SetMqttAddress(AQH_OBJECT *o, const char *s);
|
||||||
|
void AQH_NodeServer_SetMqttPort(AQH_OBJECT *o, int i);
|
||||||
|
void AQH_NodeServer_SetMqttClientId(AQH_OBJECT *o, const char *s);
|
||||||
|
void AQH_NodeServer_SetMqttKeepAlive(AQH_OBJECT *o, int i);
|
||||||
|
void AQH_NodeServer_SetMqttDiscoveryPrefix(AQH_OBJECT *o, const char *s);
|
||||||
|
|
||||||
|
|
||||||
/* device management */
|
/* device management */
|
||||||
const AQHNODE_DEVICE_LIST *AQH_NodeServer_GetDeviceDefList(const AQH_OBJECT *o);
|
const AQHNODE_DEVICE_LIST *AQH_NodeServer_GetDeviceDefList(const AQH_OBJECT *o);
|
||||||
|
|||||||
@@ -27,12 +27,16 @@
|
|||||||
#define AQHOMED_DEFAULT_BROKER_PORT 1899
|
#define AQHOMED_DEFAULT_BROKER_PORT 1899
|
||||||
#define AQHOMED_DEFAULT_BROKER_CLIENTID "nodes"
|
#define AQHOMED_DEFAULT_BROKER_CLIENTID "nodes"
|
||||||
|
|
||||||
|
#define AQHOMED_FLAGS_START_BROKER 0x00000001
|
||||||
|
#define AQHOMED_FLAGS_START_MQTT 0x00000002
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
typedef struct AQH_NODE_SERVER AQH_NODE_SERVER;
|
typedef struct AQH_NODE_SERVER AQH_NODE_SERVER;
|
||||||
struct AQH_NODE_SERVER {
|
struct AQH_NODE_SERVER {
|
||||||
AQH_OBJECT *ttyEndpoint;
|
AQH_OBJECT *ttyEndpoint;
|
||||||
AQH_OBJECT *brokerEndpoint;
|
AQH_OBJECT *brokerEndpoint;
|
||||||
|
AQH_OBJECT *mqttEndpoint;
|
||||||
|
|
||||||
AQH_OBJECT *ipcEndpoint;
|
AQH_OBJECT *ipcEndpoint;
|
||||||
AQH_OBJECT_LIST *ipcClientList;
|
AQH_OBJECT_LIST *ipcClientList;
|
||||||
@@ -44,6 +48,8 @@ struct AQH_NODE_SERVER {
|
|||||||
|
|
||||||
GWEN_DB_NODE *dbArgs;
|
GWEN_DB_NODE *dbArgs;
|
||||||
|
|
||||||
|
uint32_t flags;
|
||||||
|
|
||||||
char *dbFile;
|
char *dbFile;
|
||||||
char *logFile;
|
char *logFile;
|
||||||
char *pidFile;
|
char *pidFile;
|
||||||
@@ -58,8 +64,15 @@ struct AQH_NODE_SERVER {
|
|||||||
int brokerPort;
|
int brokerPort;
|
||||||
char *brokerClientId;
|
char *brokerClientId;
|
||||||
|
|
||||||
|
char *mqttAddress;
|
||||||
|
int mqttPort;
|
||||||
|
char *mqttClientId;
|
||||||
|
char *mqttDiscoveryPrefix;
|
||||||
|
int mqttKeepAlive;
|
||||||
|
|
||||||
time_t timestampTtyDown;
|
time_t timestampTtyDown;
|
||||||
time_t timestampBrokerDown;
|
time_t timestampBrokerDown;
|
||||||
|
time_t timestampMqttDown;
|
||||||
|
|
||||||
int nodeAddress;
|
int nodeAddress;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user