diff --git a/apps/aqhome-mqttlog/0BUILD b/apps/aqhome-mqttlog/0BUILD index 0373131..2fae7ed 100644 --- a/apps/aqhome-mqttlog/0BUILD +++ b/apps/aqhome-mqttlog/0BUILD @@ -23,26 +23,39 @@ - itemvar.t2d item.t2d + itemvar.t2d + mqttvalue.t2d + mqtttopic.t2d + mqttvalue.c + mqtttopic.c + mqttvalue.h + mqttvalue_p.h + mqtttopic.h + mqtttopic_p.h + init.h + aqhome_mqtt.h + aqhome_mqtt_p.h mqtt.h messages.h $(local/typefiles) + aqhome_mqtt.c + init.c main.c mqtt.c messages.c diff --git a/apps/aqhome-mqttlog/aqhome_mqtt.c b/apps/aqhome-mqttlog/aqhome_mqtt.c new file mode 100644 index 0000000..bb5a418 --- /dev/null +++ b/apps/aqhome-mqttlog/aqhome_mqtt.c @@ -0,0 +1,94 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./aqhome_mqtt_p.h" +#include "aqhome/ipc/endpoint_ipc.h" +#include "aqhome/ipc/endpoint_ipcclient.h" +#include "aqhome/mqtt/endpoint_mqttc.h" +#include "aqhome/mqtt/msg_mqtt_publish.h" + +#include +#include + + + + + +AQHOME_MQTT *AqHomeMqtt_new() +{ + AQHOME_MQTT *aqh; + + GWEN_NEW_OBJECT(AQHOME_MQTT, aqh); + + aqh->rootEndpoint=GWEN_MsgEndpoint_new("root", 0); + + return aqh; +} + + + +void AqHomeMqtt_free(AQHOME_MQTT *aqh) +{ + if (aqh) { + GWEN_MsgEndpoint_free(aqh->rootEndpoint); + GWEN_DB_Group_free(aqh->dbArgs); + free(aqh->pidFile); + + GWEN_FREE_OBJECT(aqh); + } +} + + + +GWEN_MSG_ENDPOINT *AqHomeMqtt_GetBrokerEndpoint(const AQHOME_MQTT *aqh) +{ + return aqh?(aqh->brokerEndpoint):NULL; +} + + + +GWEN_MSG_ENDPOINT *AqHomeMqtt_GetMqttEndpoint(const AQHOME_MQTT *aqh) +{ + return aqh?(aqh->mqttEndpoint):NULL; +} + + + +GWEN_DB_NODE *AqHomeMqtt_GetDbArgs(const AQHOME_MQTT *aqh) +{ + return aqh?(aqh->dbArgs):NULL; +} + + + +const char *AqHomeMqtt_GetPidFile(const AQHOME_MQTT *aqh) +{ + return aqh?aqh->pidFile:NULL; +} + + + +void AqHomeMqtt_SetPidFile(AQHOME_MQTT *aqh, const char *s) +{ + if (aqh) { + free(aqh->pidFile); + aqh->pidFile=s?strdup(s):NULL; + } +} + + + +int AqHomeMqtt_GetTimeout(const AQHOME_MQTT *aqh) +{ + return aqh?aqh->timeout:0; +} diff --git a/apps/aqhome-mqttlog/aqhome_mqtt.h b/apps/aqhome-mqttlog/aqhome_mqtt.h new file mode 100644 index 0000000..1a4b1e9 --- /dev/null +++ b/apps/aqhome-mqttlog/aqhome_mqtt.h @@ -0,0 +1,41 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_MQTT_H +#define AQHOME_MQTT_H + + +#include "./mqttvalue.h" +#include "./mqtttopic.h" + + +#include + + + +typedef struct AQHOME_MQTT AQHOME_MQTT; + + +AQHOME_MQTT *AqHomeMqtt_new(); +void AqHomeMqtt_free(AQHOME_MQTT *aqh); + +GWEN_MSG_ENDPOINT *AqHomeMqtt_GetBrokerEndpoint(const AQHOME_MQTT *aqh); +GWEN_MSG_ENDPOINT *AqHomeMqtt_GetMqttEndpoint(const AQHOME_MQTT *aqh); + +GWEN_DB_NODE *AqHomeMqtt_GetDbArgs(const AQHOME_MQTT *aqh); + +const char *AqHomeMqtt_GetPidFile(const AQHOME_MQTT *aqh); +void AqHomeMqtt_SetPidFile(AQHOME_MQTT *aqh, const char *s); + +int AqHomeMqtt_GetTimeout(const AQHOME_MQTT *aqh); + + + + +#endif + diff --git a/apps/aqhome-mqttlog/aqhome_mqtt_p.h b/apps/aqhome-mqttlog/aqhome_mqtt_p.h new file mode 100644 index 0000000..e40f3e9 --- /dev/null +++ b/apps/aqhome-mqttlog/aqhome_mqtt_p.h @@ -0,0 +1,42 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_MQTT_P_H +#define AQHOME_MQTT_P_H + + +#include "./aqhome_mqtt.h" + +#include + + +#define AQHOME_MQTT_DEFAULT_PIDFILE "/var/run/aqhome-mqtt.pid" +#define AQHOME_MQTT_DEFAULT_DATADIR "/var/lib/aqhome-mqtt/data" + +#define AQHOME_MQTT_DEFAULT_BROKER_PORT 1899 +#define AQHOME_MQTT_DEFAULT_BROKER_CLIENTID "mqtt" + + + + +struct AQHOME_MQTT { + GWEN_MSG_ENDPOINT *rootEndpoint; + GWEN_MSG_ENDPOINT *brokerEndpoint; /* do not free (is part of tree pointed to by rootEndpoint) */ + GWEN_MSG_ENDPOINT *mqttEndpoint; /* do not free (is part of tree pointed to by rootEndpoint) */ + + GWEN_DB_NODE *dbArgs; + char *pidFile; + int timeout; /* timeout for run e.g. inside valgrind */ + + AQH_MQTT_VALUE *mqttValueList; + AQH_MQTT_TOPIC *mqttTopicList; +}; + + +#endif + diff --git a/apps/aqhome-mqttlog/init.c b/apps/aqhome-mqttlog/init.c new file mode 100644 index 0000000..c3ee1ce --- /dev/null +++ b/apps/aqhome-mqttlog/init.c @@ -0,0 +1,359 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./init.h" +#include "./mqtt.h" +#include "./aqhome_mqtt_p.h" + +#include "aqhome/ipc/endpoint_ipc.h" +#include "aqhome/ipc/endpoint_ipcclient.h" +#include "aqhome/mqtt/endpoint_mqttc.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + +#define I18N(msg) msg +#define I18S(msg) msg + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static int _createPidFile(const char *pidFilename); +static int _setupBroker(AQHOME_MQTT *aqh, GWEN_DB_NODE *dbArgs); +static int _setupMqtt(AQHOME_MQTT *aqh, GWEN_DB_NODE *dbArgs); +static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +int AqHomeMqtt_Init(AQHOME_MQTT *aqh, int argc, char **argv) +{ + int rv; + GWEN_DB_NODE *dbArgs; + const char *s; + + dbArgs=GWEN_DB_Group_new("args"); + rv=_readArgs(argc, argv, dbArgs); + if (rv<0) { + DBG_ERROR(NULL, "Error reading args (%d)", rv); + return rv; + } + aqh->dbArgs=dbArgs; + + s=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, AQHOME_MQTT_DEFAULT_PIDFILE); + if (s && *s) { + AqHomeMqtt_SetPidFile(aqh, s); + rv=_createPidFile(s); + if (rv<0) { + DBG_ERROR(NULL, "Error creating PID file (%d)", rv); + return rv; + } + } + + rv=_setupMqtt(aqh, dbArgs); + if (rv<0) { + DBG_ERROR(NULL, "Error setting up connection to broker (%d)", rv); + return rv; + } + + rv=_setupBroker(aqh, dbArgs); + if (rv<0) { + DBG_ERROR(NULL, "Error setting up connection to broker (%d)", rv); + return rv; + } + + return 0; +} + + + +int _createPidFile(const char *pidFilename) +{ + FILE *f; + int pidfd; + + if (remove(pidFilename)==0) { + DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)"); + } + +#ifdef HAVE_SYS_STAT_H + pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + if (pidfd < 0) { + DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + + f = fdopen(pidfd, "w"); +#else /* HAVE_STAT_H */ + f=fopen(pidFilename,"w+"); +#endif /* HAVE_STAT_H */ + + /* write pid */ +#ifdef HAVE_GETPID + fprintf(f,"%d\n",getpid()); +#else + fprintf(f,"-1\n"); +#endif + if (fclose(f)) { + DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + return 0; +} + + + +int _setupBroker(AQHOME_MQTT *aqh, GWEN_DB_NODE *dbArgs) +{ + const char *brokerAddress; + int brokerPort; + const char *brokerClientId; + + brokerAddress=GWEN_DB_GetCharValue(dbArgs, "brokerAddress", 0, NULL); + brokerPort=GWEN_DB_GetIntValue(dbArgs, "brokerPort", 0, AQHOME_MQTT_DEFAULT_BROKER_PORT); + brokerClientId=GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, AQHOME_MQTT_DEFAULT_BROKER_CLIENTID); + + if (brokerAddress && *brokerAddress && brokerPort) { + GWEN_MSG_ENDPOINT *ep; + GWEN_MSG_ENDPOINT *ipcBaseEndpoint; + int rv; + + ep=AQH_ClientIpcEndpoint_new("brokerIpcClient", 0); + ipcBaseEndpoint=AQH_IpcEndpoint_CreateIpcTcpClient(brokerAddress, brokerPort, "brokerPhysEndpoint", 0); + AQH_IpcEndpoint_SetServiceName(ipcBaseEndpoint, brokerClientId); + GWEN_MsgEndpoint_Tree2_AddChild(ep, ipcBaseEndpoint); + + GWEN_MsgEndpoint_Tree2_AddChild(aqh->rootEndpoint, ep); + aqh->brokerEndpoint=ep; + + rv=GWEN_MultilayerEndpoint_StartConnect(ep); + if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { + DBG_ERROR(NULL, "Error connecting to broker server %s:%d (%d), will retry later", brokerAddress, brokerPort, rv); + return rv; + } + } + + return 0; +} + + + +int _setupMqtt(AQHOME_MQTT *aqh, GWEN_DB_NODE *dbArgs) +{ + GWEN_MSG_ENDPOINT *ep; + + ep=AqHomeMqttLog_CreateMqttEndpoint(dbArgs); + if (ep==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint for MQTT"); + return GWEN_ERROR_GENERIC; + } + + GWEN_MsgEndpoint_Tree2_AddChild(aqh->rootEndpoint, ep); + aqh->mqttEndpoint=ep; + + return 0; +} + + + +int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) +{ + int rv; + const GWEN_ARGS args[]= { + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "cfgdir", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "D", /* short option */ + "cfgdir", /* long option */ + I18S("Specify the configuration folder"), + I18S("Specify the configuration folder") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "charset", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + 0, /* short option */ + "charset", /* long option */ + I18S("Specify the output character set"), /* short description */ + I18S("Specify the output character set") /* long description */ + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "brokerAddress", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "ba", /* short option */ + "brokeraddress", /* long option */ + I18S("Specify the address of the broker server to connect to (disabled if missing)"), + I18S("Specify the address of the broker server to connect to (disabled if missing)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "brokerPort", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "bp", /* short option */ + "brokerport", /* long option */ + I18S("Specify the port of the broker server (default: 1899)"), + I18S("Specify the port of the broker server (default: 1899)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "brokerClientId", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + NULL, /* short option */ + "brokerclientid", /* long option */ + I18S("Specify client id for the broker server (default: \"nodes\")"), + I18S("Specify client id for the broker server (default: \"nodes\")") + }, + + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "mqttAddress", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "ma", /* short option */ + "mqttaddress", /* long option */ + I18S("Specify the address of the MQTT server to connect to (disabled if missing)"), + I18S("Specify the address of the MQTT server to connect to (disabled if missing)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "mqttPort", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "mp", /* short option */ + "mqttport", /* long option */ + I18S("Specify the port of the MQTT server (default: 1883)"), + I18S("Specify the port of the MQTT server (default: 1883)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "mqttClientId", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + NULL, /* short option */ + "mqttclientid", /* long option */ + I18S("Specify client id for the MQTT server (default: \"aqhomed\")"), + I18S("Specify client id for the MQTT server (default: \"aqhomed\")") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "mqttKeepAlive", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "mk", /* short option */ + "mqttkeepalive", /* long option */ + I18S("Specify keepalive time in seconds (defaults: 600)"), + I18S("Specify keepalive time in seconds (defaults: 600)") + }, + + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "pidfile", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "p", /* short option */ + "pidfile", /* long option */ + I18S("Specify the PID file"), + I18S("Specify the PID file") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "timeout", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "T", /* short option */ + "timeout", /* long option */ + I18S("Specify timeout in second (default: no timeout)"), + I18S("Specify timeout in second (default: no timeout)") + }, + { + GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST, /* flags */ + GWEN_ArgsType_Int, /* type */ + "help", /* name */ + 0, /* minnum */ + 0, /* maxnum */ + "h", /* short option */ + "help", + I18S("Show this help screen."), + I18S("Show this help screen.") + } + }; + + rv=GWEN_Args_Check(argc, argv, 1, 0, args, dbArgs); + if (rv==GWEN_ARGS_RESULT_ERROR) { + fprintf(stderr, "ERROR: Could not parse arguments main\n"); + return GWEN_ERROR_INVALID; + } + else if (rv==GWEN_ARGS_RESULT_HELP) { + GWEN_BUFFER *ubuf; + + ubuf=GWEN_Buffer_new(0, 1024, 0, 1); + GWEN_Buffer_AppendArgs(ubuf, + I18N("This is version %s.\nUsage: %s [OPTIONS]\n\nOptions:\n"), + AQHOME_VERSION_STRING, + argv[0]); + if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) { + fprintf(stderr, "ERROR: Could not create help string\n"); + return 1; + } + GWEN_Buffer_AppendString(ubuf, "\n"); + + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(ubuf)); + GWEN_Buffer_free(ubuf); + return GWEN_ERROR_CLOSE; + } + return 0; +} + + + diff --git a/apps/aqhome-mqttlog/init.h b/apps/aqhome-mqttlog/init.h new file mode 100644 index 0000000..ec9dd24 --- /dev/null +++ b/apps/aqhome-mqttlog/init.h @@ -0,0 +1,23 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOMEMQTT_INIT_H +#define AQHOMEMQTT_INIT_H + + +#include "./aqhome_mqtt.h" + + + +int AqHomeMqtt_Init(AQHOME_MQTT *aqh, int argc, char **argv); + + + +#endif + + diff --git a/apps/aqhome-mqttlog/mqtttopic.t2d b/apps/aqhome-mqttlog/mqtttopic.t2d new file mode 100644 index 0000000..426b600 --- /dev/null +++ b/apps/aqhome-mqttlog/mqtttopic.t2d @@ -0,0 +1,79 @@ + + + + + + + + AQH_MQTT_TOPIC + AQH_MqttTopic + mqtttopic + + + with_xml + with_db + with_list1 + + + +
aqhome/api.h
+
+ + + + +
+ + + + + + + numeric type + + + + json data + + + + + + + + + + + 0 + 0 + public + with_getbymember + + + + 0 + 0 + public + + + + + 0 + 0 + public + own with_getbymember + + + + 0 + 0 + public + + + + + +
+ +
+ diff --git a/apps/aqhome-mqttlog/mqttvalue.t2d b/apps/aqhome-mqttlog/mqttvalue.t2d new file mode 100644 index 0000000..8b0ea26 --- /dev/null +++ b/apps/aqhome-mqttlog/mqttvalue.t2d @@ -0,0 +1,94 @@ + + + + + + + + AQH_MQTT_VALUE + AQH_MqttValue + mqttvalue + + + with_xml + with_db + with_list1 + + + +
aqhome/api.h
+
+ + + + + + +
+ + + + + + + + + + + 0 + 0 + public + with_getbymember + + + + 0 + 0 + public + own + + + + + 0 + 0 + public + own + + + + 0 + 0 + public + + + + + 0 + 0 + public + own + + + + + 0 + 0 + public + + + + + 0 + 0 + public + own + + + + + +
+ +
+