From d7ac2c74fbf20e4aba7c761fec76c033a3c4e49d Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Wed, 26 Apr 2023 17:33:44 +0200 Subject: [PATCH] aqhome: added plugin which writes received values into files. used by munin. --- apps/aqhomed/main.c | 42 ++++++ aqhome/msg/0BUILD | 4 +- aqhome/msg/endpoint_write.c | 263 ++++++++++++++++++++++++++++++++++ aqhome/msg/endpoint_write.h | 23 +++ aqhome/msg/endpoint_write_p.h | 29 ++++ 5 files changed, 360 insertions(+), 1 deletion(-) create mode 100644 aqhome/msg/endpoint_write.c create mode 100644 aqhome/msg/endpoint_write.h create mode 100644 aqhome/msg/endpoint_write_p.h diff --git a/apps/aqhomed/main.c b/apps/aqhomed/main.c index 61b594c..041d8a2 100644 --- a/apps/aqhomed/main.c +++ b/apps/aqhomed/main.c @@ -15,9 +15,11 @@ #include #include +#include #include #include #include +#include #include #include @@ -57,6 +59,7 @@ static GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs); static int _setupTty(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); static int _setupLog(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); +static int _setupWriter(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); static int _setupIpc(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); static int _setupMqtt(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs); @@ -217,6 +220,13 @@ GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs) return NULL; } + rv=_setupWriter(emgr, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + GWEN_MsgEndpointMgr_free(emgr); + return NULL; + } + rv=_setupIpc(emgr, dbArgs); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); @@ -283,6 +293,27 @@ int _setupLog(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs) +int _setupWriter(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs) +{ + const char *writeToFolder; + + writeToFolder=GWEN_DB_GetCharValue(dbArgs, "writeToFolder", 0, NULL); + if (writeToFolder && *writeToFolder) { + GWEN_MSG_ENDPOINT *epWrite; + + epWrite=AQH_WriteEndpoint_new(writeToFolder, AQH_MSGMGR_ENDPOINTGROUP_NODE); + if (epWrite==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint WRITE"); + return GWEN_ERROR_GENERIC; + } + GWEN_MsgEndpoint_SetAcceptedGroupIds(epWrite, AQH_MSG_TYPEGROUP_INFO | AQH_MSG_TYPEGROUP_VALUES); + GWEN_MsgEndpointMgr_AddEndpoint(emgr, epWrite); + } + return 0; +} + + + int _setupIpc(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs) { const char *tcpAddress; @@ -491,6 +522,17 @@ int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) I18S("Specify DB file to read/write node database"), I18S("Specify DB file to read/write node database") }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "writeToFolder", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "W", /* short option */ + NULL, /* long option */ + I18S("Specify folder to write received values to"), + I18S("Specify folder to write received values to") + }, { GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ GWEN_ArgsType_Char, /* type */ diff --git a/aqhome/msg/0BUILD b/aqhome/msg/0BUILD index 1c9e706..39d361c 100644 --- a/aqhome/msg/0BUILD +++ b/aqhome/msg/0BUILD @@ -48,6 +48,7 @@ endpointmgr.h endpoint_node.h endpoint_log.h + endpoint_write.h endpoint_tty.h msg_node.h msg_ping.h @@ -77,7 +78,7 @@ endpoint_node_p.h endpoint_log_p.h endpoint_tty_p.h - msg_node_p.h + endpoint_write_p.h @@ -88,6 +89,7 @@ endpoint_node.c endpoint_log.c endpoint_tty.c + endpoint_write.c msg_node.c msg_ping.c msg_pong.c diff --git a/aqhome/msg/endpoint_write.c b/aqhome/msg/endpoint_write.c new file mode 100644 index 0000000..bf75dfd --- /dev/null +++ b/aqhome/msg/endpoint_write.c @@ -0,0 +1,263 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/msg/endpoint_write_p.h" +#include "aqhome/msg/endpoint_node.h" + +#include "aqhome/msg/msg_value2.h" +#include "aqhome/msg/msg_sendstats.h" +#include "aqhome/msg/msg_recvstats.h" +#include "aqhome/msg/msg_memstats.h" +#include "aqhome/msg/msg_sysstats.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + + + +#define AQH_MSG_ENDPOINT_WRITE_NAME "write" + + + +GWEN_INHERIT(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_WRITE) + + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); +static void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); +static void _processValue2Message(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); +static void _processSendStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); +static void _processRecvStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg); +static const char *_valueTypeToString(int t); +static void _writeDouble(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, double v); +static void _writeInt(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, int v); +static void _writeString(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, const char *v); +static void _writeToFile(const char *filename, const char *txt); + + + + + +GWEN_MSG_ENDPOINT *AQH_WriteEndpoint_new(const char *folder, int groupId) +{ + GWEN_MSG_ENDPOINT *ep; + AQH_MSG_ENDPOINT_WRITE *xep; + + ep=AQH_NodeEndpoint_new(AQH_MSG_ENDPOINT_WRITE_NAME, groupId); + GWEN_NEW_OBJECT(AQH_MSG_ENDPOINT_WRITE, xep); + xep->folder=strdup(folder); + GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_WRITE, ep, xep, _freeData); + AQH_NodeEndpoint_SetAcceptedMsgGroups(ep, AQH_MSG_TYPEGROUP_INFO | AQH_MSG_TYPEGROUP_VALUES); + GWEN_MsgEndpoint_AddFlags(ep, GWEN_MSG_ENDPOINT_FLAGS_NOIO); + GWEN_MsgEndpoint_SetProcessOutMsgFn(ep, _processOutMessage); + + return ep; +} + + + +void _freeData(void *bp, void *p) +{ + AQH_MSG_ENDPOINT_WRITE *xep; + + xep=(AQH_MSG_ENDPOINT_WRITE*) p; + free(xep->folder); + GWEN_FREE_OBJECT(xep); +} + + + +void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) +{ + DBG_DEBUG(AQH_LOGDOMAIN, "Processing output message"); + switch(AQH_NodeMsg_GetMsgType(nodeMsg)) { + case AQH_MSG_TYPE_VALUE2: + _processValue2Message(ep, nodeMsg); + break; + case AQH_MSG_TYPE_COMSENDSTATS: + _processSendStatsMessage(ep, nodeMsg); + break; + case AQH_MSG_TYPE_COMRECVSTATS: + _processRecvStatsMessage(ep, nodeMsg); + break; + default: + break; + } + GWEN_Msg_free(nodeMsg); +} + + + + +void _processValue2Message(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) +{ + _writeDouble(ep, + AQH_Value2Msg_GetUid(nodeMsg), + AQH_Value2Msg_GetValueId(nodeMsg), + _valueTypeToString(AQH_Value2Msg_GetValueId(nodeMsg)), + AQH_Value2Msg_GetValue(nodeMsg)); +} + + + +void _processSendStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) +{ + uint16_t packetsOutInt; + + packetsOutInt=AQH_SendStatsMsg_GetPacketsOut(nodeMsg); + if (packetsOutInt) { + double packetsOut; + double collisions; + double busy; + double collisionsPercentage=0.0; + double busyPercentage=0.0; + + packetsOut=(double) packetsOutInt; + collisions=(double)AQH_SendStatsMsg_GetCollisions(nodeMsg); + busy=(double)AQH_SendStatsMsg_GetBusyErrors(nodeMsg); + + collisionsPercentage=collisions*100.0/packetsOut; + busyPercentage=busy*100.0/packetsOut; + + _writeInt(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/packetsOut", packetsOutInt); + _writeInt(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/collisions", (int) AQH_SendStatsMsg_GetCollisions(nodeMsg)); + _writeDouble(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/collisionsPercent", collisionsPercentage); + _writeDouble(ep, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "net/busyPercent", busyPercentage); + } +} + + + +void _processRecvStatsMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) +{ + uint16_t packetsInInt; + + packetsInInt=AQH_RecvStatsMsg_GetPacketsIn(nodeMsg); + if (packetsInInt) { + double packetsIn; + double crcErrors; + double ioErrors; + double crcErrorsPercentage=0.0; + double ioErrorsPercentage=0.0; + + packetsIn=(double) packetsInInt; + crcErrors=(double)AQH_RecvStatsMsg_GetCrcErrors(nodeMsg); + ioErrors=(double)AQH_RecvStatsMsg_GetIoErrors(nodeMsg); + + crcErrorsPercentage=crcErrors*100.0/packetsIn; + ioErrorsPercentage=ioErrors*100.0/packetsIn; + + _writeInt(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/packetsIn", packetsInInt); + _writeInt(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/crcerrors", (int) AQH_RecvStatsMsg_GetCrcErrors(nodeMsg)); + _writeInt(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/ioerrors", (int) AQH_RecvStatsMsg_GetIoErrors(nodeMsg)); + _writeDouble(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/crcerrorsPercent", crcErrorsPercentage); + _writeDouble(ep, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "net/ioerrorsPercent", ioErrorsPercentage); + } +} + + + +const char *_valueTypeToString(int t) +{ + switch(t) { + case AQH_MSG_VALUE2_TYPE_TEMP: return "temperature"; + case AQH_MSG_VALUE2_TYPE_HUMIDITY: return "humidity"; + default: return "unknown"; + } +} + + + +void _writeDouble(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, double v) +{ + char numBuf[16]; + + snprintf(numBuf, sizeof(numBuf)-1, "%f", v); + numBuf[sizeof(numBuf)-1]=0; + _writeString(ep, uid, valueId, valuePath, numBuf); +} + + + +void _writeInt(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, int v) +{ + char numBuf[16]; + + snprintf(numBuf, sizeof(numBuf)-1, "%d", v); + numBuf[sizeof(numBuf)-1]=0; + _writeString(ep, uid, valueId, valuePath, numBuf); +} + + + +void _writeString(GWEN_MSG_ENDPOINT *ep, uint32_t uid, int valueId, const char *valuePath, const char *v) +{ + AQH_MSG_ENDPOINT_WRITE *xep; + GWEN_BUFFER *bufFilename; + + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_WRITE, ep); + + bufFilename=GWEN_Buffer_new(0, 64, 0, 1); + if (valueId>0) + GWEN_Buffer_AppendArgs(bufFilename, "%s/%08x/%d/%s", xep->folder, uid, valueId, valuePath); + else + GWEN_Buffer_AppendArgs(bufFilename, "%s/%08x/%s", xep->folder, uid, valuePath); + _writeToFile(GWEN_Buffer_GetStart(bufFilename), v); + GWEN_Buffer_free(bufFilename); +} + + + +void _writeToFile(const char *filename, const char *txt) +{ + if (txt && *txt) { + GWEN_BUFFER *tmpNameBuf; + int rv; + + tmpNameBuf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Buffer_AppendString(tmpNameBuf, filename); + GWEN_Buffer_AppendString(tmpNameBuf, ".tmp"); + + rv=GWEN_Directory_GetPath(GWEN_Buffer_GetStart(tmpNameBuf), GWEN_PATH_FLAGS_VARIABLE); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error getting path for %s (%d)", GWEN_Buffer_GetStart(tmpNameBuf), rv); + } + else { + FILE *f; + + f=fopen(GWEN_Buffer_GetStart(tmpNameBuf), "w"); + if (f) { + if (1!=fwrite(txt, strlen(txt), 1, f)) { + DBG_ERROR(AQH_LOGDOMAIN, "Error writing."); + fclose(f); + } + else { + fclose(f); + rename(GWEN_Buffer_GetStart(tmpNameBuf), filename); + } + } + } + GWEN_Buffer_free(tmpNameBuf); + } +} + + + + + diff --git a/aqhome/msg/endpoint_write.h b/aqhome/msg/endpoint_write.h new file mode 100644 index 0000000..26288d0 --- /dev/null +++ b/aqhome/msg/endpoint_write.h @@ -0,0 +1,23 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_ENDPOINT_WRITE_H +#define AQH_ENDPOINT_WRITE_H + + +#include "aqhome/msg/endpoint_node.h" + + + +AQHOME_API GWEN_MSG_ENDPOINT *AQH_WriteEndpoint_new(const char *folder, int groupId); + + + + +#endif + diff --git a/aqhome/msg/endpoint_write_p.h b/aqhome/msg/endpoint_write_p.h new file mode 100644 index 0000000..57639cd --- /dev/null +++ b/aqhome/msg/endpoint_write_p.h @@ -0,0 +1,29 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_ENDPOINT_WRITE_P_H +#define AQH_ENDPOINT_WRITE_P_H + + +#include + +#include "aqhome/msg/endpoint_write.h" + +#include +#include + + +typedef struct AQH_MSG_ENDPOINT_WRITE AQH_MSG_ENDPOINT_WRITE; +struct AQH_MSG_ENDPOINT_WRITE { + char *folder; +}; + + + +#endif +