/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./loop_tty_broker.h" #include "./aqhomed_p.h" #include "./tty_log.h" #include "./db.h" #include "aqhome/msg/endpoint_tty.h" #include "aqhome/msg/msg_node.h" #include "aqhome/msg/msg_value2.h" #include "aqhome/msg/msg_sendstats.h" #include "aqhome/msg/msg_recvstats.h" #include "aqhome/ipc/endpoint_ipc.h" #include "aqhome/ipc/data/msg_data_multidata.h" #include "aqhome/ipc/data/ipc_data.h" #include #include #include #include /* ------------------------------------------------------------------------------------------------ * defines * ------------------------------------------------------------------------------------------------ */ #define I18N(msg) msg #define I18S(msg) msg /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void _processValue2Message(AQHOMED *aqh, const GWEN_MSG *nodeMsg); static void _processSendStatsMessage(AQHOMED *aqh, const GWEN_MSG *nodeMsg); static void _processRecvStatsMessage(AQHOMED *aqh, const GWEN_MSG *nodeMsg); static void _publishInt(AQHOMED *aqh, uint32_t uid, int valueId, const char *valueUnits, const char *valuePath, int v); static void _publishDouble(AQHOMED *aqh, uint32_t uid, int valueId, const char *valueUnits, const char *valuePath, double v); static void _setDeviceName(AQH_VALUE *value, uint32_t uid); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ void AqHomed_ForwardTtyMsgToBroker(AQHOMED *aqh, const GWEN_MSG *nodeMsg) { if (GWEN_MsgEndpoint_GetState(aqh->brokerEndpoint)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) { DBG_DEBUG(AQH_LOGDOMAIN, "Processing output message"); switch(AQH_NodeMsg_GetMsgType(nodeMsg)) { case AQH_MSG_TYPE_VALUE2: _processValue2Message(aqh, nodeMsg); break; case AQH_MSG_TYPE_COMSENDSTATS: _processSendStatsMessage(aqh, nodeMsg); break; case AQH_MSG_TYPE_COMRECVSTATS: _processRecvStatsMessage(aqh, nodeMsg); break; default: break; } } } void _processValue2Message(AQHOMED *aqh, const GWEN_MSG *nodeMsg) { _publishDouble(aqh, AQH_Value2Msg_GetUid(nodeMsg), AQH_Value2Msg_GetValueId(nodeMsg), AQH_Value2Msg_GetValueTypeUnits(nodeMsg), AQH_Value2Msg_GetValueTypeName(nodeMsg), AQH_Value2Msg_GetValue(nodeMsg)); } void _processSendStatsMessage(AQHOMED *aqh, const GWEN_MSG *nodeMsg) { uint16_t packetsOutInt; packetsOutInt=AQH_SendStatsMsg_GetPacketsOut(nodeMsg); if (packetsOutInt) { double packetsOut; double collisions; double busy; double collisionsPercentage=0.0; double busyPercentage=0.0; packetsOut=(double) packetsOutInt; collisions=(double)AQH_SendStatsMsg_GetCollisions(nodeMsg); busy=(double)AQH_SendStatsMsg_GetBusyErrors(nodeMsg); collisionsPercentage=collisions*100.0/packetsOut; busyPercentage=busy*100.0/packetsOut; _publishInt(aqh, AQH_SendStatsMsg_GetUid(nodeMsg), 0, NULL, "net/packetsOut", packetsOutInt); _publishInt(aqh, AQH_SendStatsMsg_GetUid(nodeMsg), 0, NULL, "net/collisions", (int) AQH_SendStatsMsg_GetCollisions(nodeMsg)); _publishDouble(aqh, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "%", "net/collisionsPercent", collisionsPercentage); _publishDouble(aqh, AQH_SendStatsMsg_GetUid(nodeMsg), 0, "%", "net/busyPercent", busyPercentage); } } void _processRecvStatsMessage(AQHOMED *aqh, const GWEN_MSG *nodeMsg) { uint16_t packetsInInt; packetsInInt=AQH_RecvStatsMsg_GetPacketsIn(nodeMsg); if (packetsInInt) { double packetsIn; double crcErrors; double ioErrors; double crcErrorsPercentage=0.0; double ioErrorsPercentage=0.0; packetsIn=(double) packetsInInt; crcErrors=(double)AQH_RecvStatsMsg_GetCrcErrors(nodeMsg); ioErrors=(double)AQH_RecvStatsMsg_GetIoErrors(nodeMsg); crcErrorsPercentage=crcErrors*100.0/packetsIn; ioErrorsPercentage=ioErrors*100.0/packetsIn; _publishInt(aqh, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, NULL, "net/packetsIn", packetsInInt); _publishInt(aqh, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, NULL, "net/crcerrors", (int) AQH_RecvStatsMsg_GetCrcErrors(nodeMsg)); _publishInt(aqh, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, NULL, "net/ioerrors", (int) AQH_RecvStatsMsg_GetIoErrors(nodeMsg)); _publishDouble(aqh, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "%", "net/crcerrorsPercent", crcErrorsPercentage); _publishDouble(aqh, AQH_RecvStatsMsg_GetUid(nodeMsg), 0, "%", "net/ioerrorsPercent", ioErrorsPercentage); } } void _publishInt(AQHOMED *aqh, uint32_t uid, int valueId, const char *valueUnits, const char *valuePath, int v) { _publishDouble(aqh, uid, valueId, valueUnits, valuePath, (double) v); } void _publishDouble(AQHOMED *aqh, uint32_t uid, int valueId, const char *valueUnits, const char *valuePath, double v) { GWEN_MSG *pubMsg; union {double f; uint64_t i;} u; uint64_t arrayToSend[2]; AQH_VALUE *value; u.f=v; arrayToSend[0]=(uint64_t) time(NULL); arrayToSend[1]=u.i; value=AQH_Value_new(); AQH_Value_SetName(value, valuePath); AQH_Value_SetValueUnits(value, valueUnits); AQH_Value_SetValueType(value, 0); _setDeviceName(value, uid); pubMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, value, arrayToSend, 1); if (pubMsg) { DBG_INFO(AQH_LOGDOMAIN, "BROKER PUBLISH %s: %f", AQH_Value_GetName(value), v); GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg); } AQH_Value_free(value); } void _setDeviceName(AQH_VALUE *value, uint32_t uid) { GWEN_BUFFER *buf; buf=GWEN_Buffer_new(0, 64, 0, 1); GWEN_Buffer_AppendArgs(buf, "%08x", uid); AQH_Value_SetDeviceName(value, GWEN_Buffer_GetStart(buf)); GWEN_Buffer_free(buf); }