diff --git a/apps/0BUILD b/apps/0BUILD index 4fc63a3..3adc6a8 100644 --- a/apps/0BUILD +++ b/apps/0BUILD @@ -5,6 +5,7 @@ aqhomed aqhome-tool + aqhome-mqttlog diff --git a/apps/aqhome-mqttlog/0BUILD b/apps/aqhome-mqttlog/0BUILD new file mode 100644 index 0000000..519c03f --- /dev/null +++ b/apps/aqhome-mqttlog/0BUILD @@ -0,0 +1,69 @@ + + + + + + + + $(gwenhywfar_cflags) + -I$(topsrcdir) + -I$(topbuilddir) + -I$(builddir) + -I$(srcdir) + + + + --include=$(builddir) + --include=$(srcdir) + + + $(visibility_cflags) + + + + + + itemvar.t2d + item.t2d + + + + + + + + + + + + + + + + + $(local/typefiles) + main.c + + + + aqhome + + + + $(gwenhywfar_libs) + + + + + + + + + + + + + + + + diff --git a/apps/aqhome-mqttlog/item.t2d b/apps/aqhome-mqttlog/item.t2d new file mode 100644 index 0000000..d4f0fbc --- /dev/null +++ b/apps/aqhome-mqttlog/item.t2d @@ -0,0 +1,75 @@ + + + + + + + + ITEM + Item + item + + + with_db + with_list1 + + + +
aqhome/api.h
+
itemvar.h
+
+ + + + + + +
+ + + + + + public + own + const dup + const + + + + public + own + const dup + const + + + + public + own + const dup + const + + + + public + own + const dup + const + + + + NULL + ItemVar_List_new() + public + own + nodup + none + + + + + +
+ +
+ diff --git a/apps/aqhome-mqttlog/itemvar.t2d b/apps/aqhome-mqttlog/itemvar.t2d new file mode 100644 index 0000000..a202e81 --- /dev/null +++ b/apps/aqhome-mqttlog/itemvar.t2d @@ -0,0 +1,50 @@ + + + + + + + + ITEMVAR + ItemVar + itemvar + + + with_db + with_list1 + + + +
aqhome/api.h
+
+ + + + + + +
+ + + + + + public + own + const dup + const + + + + public + own + const dup + const + + + + +
+ +
+ diff --git a/apps/aqhome-mqttlog/main.c b/apps/aqhome-mqttlog/main.c new file mode 100644 index 0000000..bfc870e --- /dev/null +++ b/apps/aqhome-mqttlog/main.c @@ -0,0 +1,875 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./item.h" + +#include +#include + +#include +#include +#include +#include "aqhome/mqtt/endpoint_mqttc.h" +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef HAVE_SIGNAL_H +# include +#endif + +#ifdef HAVE_SYS_TYPES_H +# include +#endif + +#ifdef HAVE_SYS_STAT_H +# include +#endif + +#include +#include +#include +#include +#include +#include + + +#define I18N(msg) msg +#define I18S(msg) msg + +//#define FULL_DEBUG + + +static int _serve(GWEN_DB_NODE *dbArgs); +static GWEN_MSG_ENDPOINT *_createMqttEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); +static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); +static int _createPidFile(const char *pidFilename); +static ITEM_LIST *_readItems(GWEN_DB_NODE *dbArgs); + +static const ITEM *_getItemForTopic(const ITEM_LIST *itemList, const char *topic); +int _subscribe(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp, const char *topicFilter); +int _mqttConnect(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp); +GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType); + +void _handlePublishMsg(const char *baseFolder, const ITEM_LIST *itemList, const GWEN_MSG *msg); +void _handlePublish(const char *baseFolder, const ITEM_LIST *itemList, const char *topic, const char *value); +void _handleRawMsgForItem(const char *baseFolder, const ITEM *item, const char *value); +void _handleJsonMsgForItem(const char *baseFolder, const ITEM *item, const char *value); +void _handleJsonItemVar(const char *baseFolder, const ITEM *item, const ITEMVAR *itemVar, GWEN_JSON_ELEM *jeRoot); +void _writeValueAccordingToItem(const char *baseFolder, const ITEM *item, const ITEMVAR *itemVar, const char *value); +void _writeToFile(const char *filename, const char *txt); + + +#ifdef HAVE_SIGNAL_H +static int _setSignalHandlers(void); +static int _setupSigAction(struct sigaction *sa, int sig); +static void _signalHandler(int s); +static struct sigaction saINT,saTERM, saHUP, saTSTP, saCONT; +#endif + +static int stopService=0; + + + + +int main(int argc, char **argv) +{ + GWEN_DB_NODE *dbArgs; + int rv; + GWEN_GUI *gui; + const char *s; + + rv=GWEN_Init(); + if (rv) { + fprintf(stderr, "ERROR: Unable to init Gwen.\n"); + return 2; + } + + GWEN_Logger_Open(0, "aqhome-mqttlog", 0, GWEN_LoggerType_Console, GWEN_LoggerFacility_User); + GWEN_Logger_SetLevel(0, GWEN_LoggerLevel_Warning); + /*GWEN_Logger_SetLevel(0, GWEN_LoggerLevel_Info);*/ + + rv=AQH_Init(); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return 2; + } + + dbArgs=GWEN_DB_Group_new("arguments"); + rv=_readArgs(argc, argv, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return 2; + } + else if (rv==1) { + DBG_INFO(NULL, "Help printed, done"); + return 0; + } + + gui=GWEN_Gui_CGui_new(); + s=GWEN_DB_GetCharValue(dbArgs, "charset", 0, NULL); + if (s && *s) + GWEN_Gui_SetCharSet(gui, s); + GWEN_Gui_SetGui(gui); + + rv=_serve(dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return 2; + } + + GWEN_DB_Group_free(dbArgs); + GWEN_Gui_SetGui(NULL); + GWEN_Gui_free(gui); + + return 0; +} + + + +int _serve(GWEN_DB_NODE *dbArgs) +{ + const char *pidFile; + GWEN_MSG_ENDPOINT_MGR *emgr; + GWEN_MSG_ENDPOINT *epTcp; + ITEM_LIST *itemList; + int rv; + int timeout; + time_t startTime; + const char *baseFolder; + + startTime=time(NULL); + + itemList=_readItems(dbArgs); + if (itemList==NULL) { + DBG_ERROR(NULL, "No items to listen to, aborting."); + return GWEN_ERROR_GENERIC; + } + + rv=_setSignalHandlers(); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + + baseFolder=GWEN_DB_GetCharValue(dbArgs, "writeToFolder", 0, "/tmp/aqhome"); + timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0); + pidFile=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, "aqhome-mqttlog.pid"); + if (pidFile && *pidFile) { + rv=_createPidFile(pidFile); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + } + + emgr=GWEN_MsgEndpointMgr_new(); + epTcp=_createMqttEndpoint(emgr, dbArgs); + if (epTcp==NULL) { + DBG_INFO(NULL, "here"); + GWEN_MsgEndpointMgr_free(emgr); + return GWEN_ERROR_GENERIC; + } + + rv=_mqttConnect(emgr, epTcp); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + GWEN_MsgEndpointMgr_free(emgr); + return rv; + } + + rv=_subscribe(emgr, epTcp, "#"); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + GWEN_MsgEndpointMgr_free(emgr); + return rv; + } + + while(!stopService) { + DBG_DEBUG(NULL, "Next loop"); + GWEN_MsgEndpointMgr_IoLoopOnce(emgr); + GWEN_MsgEndpointMgr_RunAllEndpoints(emgr); + if (GWEN_ConnectableMsgEndpoint_GetState(epTcp)timeout) { + DBG_INFO(NULL, "Timeout, stopping service"); + break; + } + } + } + + if (pidFile && *pidFile) + remove(pidFile); + + GWEN_MsgEndpointMgr_free(emgr); + + return 0; +} + + + +void _handlePublishMsg(const char *baseFolder, const ITEM_LIST *itemList, const GWEN_MSG *msg) +{ + char *topic; + char *value; + + topic=AQH_PublishMqttMsg_ExtractTopic(msg); + value=AQH_PublishMqttMsg_ExtractValue(msg); + + if (topic && value) + _handlePublish(baseFolder, itemList, topic, value); + else { + DBG_ERROR(NULL, "Either topic or value missing in PUBLISH msg"); + } + free(value); + free(topic); +} + + + +void _handlePublish(const char *baseFolder, const ITEM_LIST *itemList, const char *topic, const char *value) +{ + const ITEM *item; + + item=_getItemForTopic(itemList, topic); + if (item) { + const char *t; + + DBG_INFO(NULL, "HANDLING topic \"%s\"", topic); + t=Item_GetDataType(item); + if (t && strcasecmp(t, "json")==0) + _handleJsonMsgForItem(baseFolder, item, value); + else + _handleRawMsgForItem(baseFolder, item, value); + } + else { + DBG_INFO(NULL, "ignoring topic \"%s\"", topic); + } +} + + + +void _handleJsonMsgForItem(const char *baseFolder, const ITEM *item, const char *value) +{ + GWEN_JSON_ELEM *jeRoot; + + jeRoot=GWEN_JsonElement_fromString(value); + if (jeRoot==NULL) { + DBG_INFO(NULL, "Could not parse JSON value: %s", value); + } + else { + const ITEMVAR_LIST *itemVarList; + + itemVarList=Item_GetVarList(item); + if (itemVarList) { + ITEMVAR *itemVar; + + itemVar=ItemVar_List_First(itemVarList); + while(itemVar) { + _handleJsonItemVar(baseFolder, item, itemVar, jeRoot); + itemVar=ItemVar_List_Next(itemVar); + } + } + GWEN_JsonElement_free(jeRoot); + } +} + + + +void _handleJsonItemVar(const char *baseFolder, const ITEM *item, const ITEMVAR *itemVar, GWEN_JSON_ELEM *jeRoot) +{ + const char *path; + + path=ItemVar_GetPath(itemVar); + if (path) { + GWEN_JSON_ELEM *je; + + je=GWEN_JsonElement_GetElementByPath(jeRoot, path, 0); + if (je) { + const char *s; + + s=GWEN_JsonElement_GetData(je); + if (s && *s) + _writeValueAccordingToItem(baseFolder, item, itemVar, s); + else { + DBG_ERROR(NULL, "Path \"%s\" in JSON data contains no data", path); + } + } + else { + DBG_ERROR(NULL, "Path \"%s\" not found in JSON data", path); + } + } +} + + + +void _handleRawMsgForItem(const char *baseFolder, const ITEM *item, const char *value) +{ + const ITEMVAR_LIST *itemVarList; + + itemVarList=Item_GetVarList(item); + if (itemVarList) { + ITEMVAR *itemVar; + + itemVar=ItemVar_List_First(itemVarList); + while(itemVar) { + _writeValueAccordingToItem(baseFolder, item, itemVar, value); + itemVar=ItemVar_List_Next(itemVar); + } + } +} + + + +void _writeValueAccordingToItem(const char *baseFolder, const ITEM *item, const ITEMVAR *itemVar, const char *value) +{ + const char *id; + const char *name; + + id=Item_GetId(item); + name=ItemVar_GetName(itemVar); + if (id && *id && name && *name) { + GWEN_BUFFER *fnameBuf; + + fnameBuf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Buffer_AppendString(fnameBuf, baseFolder); + GWEN_Buffer_AppendString(fnameBuf, GWEN_DIR_SEPARATOR_S); + GWEN_Buffer_AppendArgs(fnameBuf, "%s_%s", id, name); + _writeToFile(GWEN_Buffer_GetStart(fnameBuf), value); + GWEN_Buffer_free(fnameBuf); + } + else { + DBG_ERROR(NULL, "Either id or name missing in item list file"); + } +} + + + +void _writeToFile(const char *filename, const char *txt) +{ + if (txt && *txt) { + GWEN_BUFFER *tmpNameBuf; + int rv; + + tmpNameBuf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Buffer_AppendString(tmpNameBuf, filename); + GWEN_Buffer_AppendString(tmpNameBuf, ".tmp"); + + rv=GWEN_Directory_GetPath(GWEN_Buffer_GetStart(tmpNameBuf), GWEN_PATH_FLAGS_VARIABLE); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error getting path for %s (%d)", GWEN_Buffer_GetStart(tmpNameBuf), rv); + } + else { + FILE *f; + + f=fopen(GWEN_Buffer_GetStart(tmpNameBuf), "w"); + if (f) { + if (1!=fwrite(txt, strlen(txt), 1, f)) { + DBG_ERROR(AQH_LOGDOMAIN, "Error writing."); + fclose(f); + } + else { + fclose(f); + rename(GWEN_Buffer_GetStart(tmpNameBuf), filename); + } + } + } + GWEN_Buffer_free(tmpNameBuf); + } +} + + + +const ITEM *_getItemForTopic(const ITEM_LIST *itemList, const char *topic) +{ + const ITEM *item; + + item=Item_List_First(itemList); + while(item) { + const char *s; + + s=Item_GetTopic(item); + if (s && GWEN_Text_ComparePattern(topic, s, 0)!=-1) + return item; + item=Item_List_Next(item); + } + + return NULL; +} + + + +int _subscribe(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp, const char *topicFilter) +{ + uint16_t pckId; + GWEN_MSG *msgOut; + GWEN_MSG *msgIn; + + DBG_INFO(NULL, "Sending SUBSCRIBE %s", topicFilter); + pckId=AQH_MqttClientEndpoint_GetNextPacketId(epTcp); + msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, topicFilter, 0); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return GWEN_ERROR_INTERNAL; + } + DBG_ERROR(NULL, "Sending this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msgOut), GWEN_Msg_GetBytesInBuffer(msgOut), 2); + + GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); + + DBG_INFO(NULL, "Waiting for response"); + msgIn=_awaitPacket(emgr, epTcp, AQH_MQTTMSG_MSGTYPE_SUBACK); + if (msgIn) { + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_SubAckMqttMsg_DumpToBuffer(msgIn, buf, "received"); + DBG_INFO(NULL, "%s", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + GWEN_Msg_free(msgIn); + } + + return 0; +} + + + +GWEN_MSG_ENDPOINT *_createMqttEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs) +{ + const char *mqttAddress; + int mqttPort; + const char *mqttClientId; + const char *mqttTopicPrefix; + int mqttKeepAlive; + + mqttAddress=GWEN_DB_GetCharValue(dbArgs, "mqttAddress", 0, NULL); + mqttPort=GWEN_DB_GetIntValue(dbArgs, "mqttPort", 0, 1883); + mqttClientId=GWEN_DB_GetCharValue(dbArgs, "mqttClientId", 0, "aqhome-mqttlog"); + mqttTopicPrefix=GWEN_DB_GetCharValue(dbArgs, "mqttTopicPrefix", 0, "aqhome/sensors"); + mqttKeepAlive=GWEN_DB_GetIntValue(dbArgs, "mqttKeepAlive", 0, 600); + + if (mqttAddress && *mqttAddress && mqttPort) { + GWEN_MSG_ENDPOINT *epMqtt; + + DBG_INFO(AQH_LOGDOMAIN, "Connecting to %s (port %d)", mqttAddress, mqttPort); + epMqtt=AQH_MqttClientEndpoint_new(mqttAddress, mqttPort, NULL, AQH_MSGMGR_ENDPOINTGROUP_MQTT); + if (epMqtt==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint TCP"); + return NULL; + } + GWEN_MsgEndpoint_SetAcceptedGroupIds(epMqtt, AQH_MSGMGR_ENDPOINTGROUP_NODE | AQH_MSGMGR_ENDPOINTGROUP_MQTT); + if (mqttClientId && *mqttClientId) + AQH_MqttClientEndpoint_SetClientId(epMqtt, mqttClientId); + if (mqttTopicPrefix && *mqttTopicPrefix) + AQH_MqttClientEndpoint_SetTopicPrefix(epMqtt, mqttTopicPrefix); + AQH_MqttClientEndpoint_SetKeepAliveTime(epMqtt, mqttKeepAlive); + + GWEN_MsgEndpointMgr_AddEndpoint(emgr, epMqtt); + return epMqtt; + } + + return NULL; +} + + + +int _setSignalHandlers(void) +{ +#ifdef HAVE_SIGNAL_H + int rv; + + rv=_setupSigAction(&saINT, SIGINT); + if (rv) + return rv; + + rv=_setupSigAction(&saTERM, SIGTERM); + if (rv) + return rv; + + rv=_setupSigAction(&saHUP, SIGHUP); + if (rv) + return rv; + +# ifdef SIGTSTP + rv=_setupSigAction(&saTSTP, SIGTSTP); + if (rv) + return rv; +# endif + +# ifdef SIGCONT + rv=_setupSigAction(&saCONT, SIGCONT); + if (rv) + return rv; +# endif +#endif + return 0; +} + + + +int _setupSigAction(struct sigaction *sa, int sig) +{ + sa->sa_handler=_signalHandler; + sigemptyset(&sa->sa_mask); + sa->sa_flags=0; + if (sigaction(sig, sa, 0)) { + DBG_ERROR(NULL, "Could not setup signal handler for signal %d", sig); + return GWEN_ERROR_IO; + } + + return 0; +} + + + +void _signalHandler(int s) +{ + switch(s) { + case SIGINT: + case SIGTERM: + case SIGHUP: + DBG_WARN(0, "Received signal %d, stopping service in next loop.",s); + stopService=1; + break; + default: + DBG_WARN(0, "Unknown signal %d",s); + break; + } +} + + + +int _createPidFile(const char *pidFilename) +{ + FILE *f; + int pidfd; + + if (remove(pidFilename)==0) { + DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)"); + } + +#ifdef HAVE_SYS_STAT_H + pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + if (pidfd < 0) { + DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + + f = fdopen(pidfd, "w"); +#else /* HAVE_STAT_H */ + f=fopen(pidFilename,"w+"); +#endif /* HAVE_STAT_H */ + + /* write pid */ +#ifdef HAVE_GETPID + fprintf(f,"%d\n",getpid()); +#else + fprintf(f,"-1\n"); +#endif + if (fclose(f)) { + DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + return 0; +} + + + +int _mqttConnect(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp) +{ + while(GWEN_ConnectableMsgEndpoint_GetState(epTcp)