/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2024 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./loop.h" #include "./aqhome_react_p.h" #include "aqhome-react/units/u_varchanges.h" #include "aqhome/ipc/data/ipc_data.h" #include "aqhome/ipc/data/msg_data_multidata.h" #include "aqhome/ipc/msg_ipc_tag16.h" #include #include /* ------------------------------------------------------------------------------------------------ * defines * ------------------------------------------------------------------------------------------------ */ /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void _handleDataResponse(AQHREACT_UNIT *varChangeUnit, GWEN_MSG *msg); static int _processAllNets(AQHOME_REACT *aqh); static int _processNet(AQHREACT_UNIT_NET *unitNet); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ void AqHomeReact_IoLoop(AQHOME_REACT *aqh, int timeoutInMilliSecs) { GWEN_MSG *msg; GWEN_MsgEndpoint_IoLoop(aqh->brokerEndpoint, timeoutInMilliSecs); while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(aqh->brokerEndpoint)) ) { uint16_t code; code=GWEN_IpcMsg_GetCode(msg); if (code==AQH_MSGTYPE_IPC_DATA_DATACHANGED) { DBG_INFO(NULL, "Received expected IPC message"); _handleDataResponse(aqh->varChangeUnit, msg); } else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { DBG_INFO(NULL, "Received IPC result message, ignoring"); } else { DBG_INFO(NULL, "Received unexpected message %d (%x)", code, code); } GWEN_Msg_free(msg); } /* while */ } void AqHomeReact_ProcessAllUnits(AQHOME_REACT *aqh) { int rv; do { rv=_processAllNets(aqh); } while (rv==1); } int _processAllNets(AQHOME_REACT *aqh) { int result=0; int rv; AQHREACT_UNIT_NET *unitNet; rv=AQHREACT_Unit_Process(aqh->varChangeUnit); if (rv>0) result=1; rv=AQHREACT_Unit_Process(aqh->timerUnit); if (rv>0) result=1; unitNet=AQHREACT_UnitNet_List_First(aqh->unitNetList); while(unitNet) { rv=_processNet(unitNet); if (rv>0) result=1; unitNet=AQHREACT_UnitNet_List_Next(unitNet); } return result; } int _processNet(AQHREACT_UNIT_NET *unitNet) { AQHREACT_UNIT_LIST *unitList; const char *netName; netName=AQHREACT_UnitNet_GetName(unitNet); DBG_INFO(NULL, "Processing net \"%s\"", netName?netName:""); unitList=AQHREACT_UnitNet_GetUnitList(unitNet); if (unitList) { int result=0; AQHREACT_UNIT *unit; unit=AQHREACT_Unit_List_First(unitList); while(unit) { int rv; rv=AQHREACT_Unit_Process(unit); if (rv>0) result=1; unit=AQHREACT_Unit_List_Next(unit); } return result; } return 0; } void _handleDataResponse(AQHREACT_UNIT *varChangeUnit, GWEN_MSG *msg) { AQH_VALUE *value; const GWEN_TAG16 *tag; unsigned int numberOfPoints; const uint64_t *dataPoints; AQH_MultiDataDataIpcMsg_Parse(msg, 0); value=AQH_MultiDataDataIpcMsg_ReadValue(msg); tag=AQH_Tag16IpcMsg_FindFirstTagByType(msg, AQH_MSGDATA_MULTIDATA_TAGS_DATA); numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t)); dataPoints=tag?((const uint64_t*) GWEN_Tag16_GetTagData(tag)):NULL; if (numberOfPoints>0 && dataPoints) { uint32_t i; for(i=0; i