Files
aqhomecontrol/apps/aqhome-react/loop.c
Martin Preuss 1050ee1c75 aqhome-react: major rebuild of unit handling.
now nested networks are allowed to allow for complex networks.
2024-04-17 22:26:17 +02:00

136 lines
3.4 KiB
C

/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2024 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./loop.h"
#include "./aqhome_react_p.h"
#include "aqhome-react/units/u_varchanges.h"
#include "aqhome/ipc/data/ipc_data.h"
#include "aqhome/ipc/data/msg_data_multidata.h"
#include "aqhome/ipc/msg_ipc_tag16.h"
#include <gwenhywfar/debug.h>
#include <string.h>
#include <time.h>
/* ------------------------------------------------------------------------------------------------
* defines
* ------------------------------------------------------------------------------------------------
*/
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
/* Forwards every data point carried by an IPC message to the given var-change unit. */
static void _handleDataResponse(AQHREACT_UNIT *varChangeUnit, GWEN_MSG *msg);
/* Runs one processing pass over the unit list; returns 1 if any unit reported a positive result, 0 otherwise. */
static int _processAllUnits(AQHOME_REACT *aqh);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
/**
 * Run one I/O cycle on the broker endpoint and drain its receive queue.
 *
 * Every received message is dispatched by its IPC code:
 *  - AQH_MSGTYPE_IPC_DATA_DATACHANGED: handed to _handleDataResponse() together
 *    with the var-change unit stored in @c aqh
 *  - AQH_MSGTYPE_IPC_DATA_RESULT: logged and ignored
 *  - anything else: logged as unexpected
 * Each message is freed here after it has been handled.
 *
 * @param aqh application object providing the broker endpoint and the var-change unit
 * @param timeoutInMilliSecs timeout passed through to GWEN_MsgEndpoint_IoLoop()
 */
void AqHomeReact_IoLoop(AQHOME_REACT *aqh, int timeoutInMilliSecs)
{
  GWEN_MsgEndpoint_IoLoop(aqh->brokerEndpoint, timeoutInMilliSecs);

  for (;;) {
    GWEN_MSG *incoming;
    uint16_t msgCode;

    /* stop as soon as the endpoint has no more received messages */
    incoming=GWEN_MsgEndpoint_TakeFirstReceivedMessage(aqh->brokerEndpoint);
    if (!incoming)
      break;

    msgCode=GWEN_IpcMsg_GetCode(incoming);
    if (msgCode==AQH_MSGTYPE_IPC_DATA_DATACHANGED) {
      DBG_DEBUG(NULL, "Received expected IPC message");
      _handleDataResponse(aqh->varChangeUnit, incoming);
    }
    else if (msgCode==AQH_MSGTYPE_IPC_DATA_RESULT) {
      DBG_INFO(NULL, "Received IPC result message, ignoring");
    }
    else {
      DBG_INFO(NULL, "Received unexpected message %d (%x)", msgCode, msgCode);
    }

    /* the message was taken from the endpoint, so it is released here */
    GWEN_Msg_free(incoming);
  }
}
/**
 * Repeatedly run processing passes over all units.
 *
 * At least one pass is always performed; further passes follow for as long as
 * the previous pass returned 1.
 *
 * @param aqh application object passed on to each pass
 */
void AqHomeReact_ProcessAllUnits(AQHOME_REACT *aqh)
{
  while (_processAllUnits(aqh)==1) {
    /* the pass itself is the work; loop again because it asked for another round */
  }
}
/**
 * Perform a single processing pass over every unit in the unit list.
 *
 * @param aqh application object holding the unit list
 * @return 1 if AQHREACT_Unit_Process() returned a value greater than zero for
 *         at least one unit, 0 otherwise
 */
int _processAllUnits(AQHOME_REACT *aqh)
{
  AQHREACT_UNIT *current;
  int anyPositive=0;

  for (current=AQHREACT_Unit_List_First(aqh->unitList);
       current;
       current=AQHREACT_Unit_List_Next(current)) {
    /* every unit is processed, even after a positive result has been seen */
    if (AQHREACT_Unit_Process(current)>0)
      anyPositive=1;
  }

  return anyPositive;
}
/**
 * Extract the data points from a multi-data IPC message and report each of
 * them to the var-change unit.
 *
 * The payload of the AQH_MSGDATA_MULTIDATA_TAGS_DATA tag is read as a sequence
 * of pairs, each consisting of two uint64_t-sized fields: a timestamp followed
 * by the bit pattern of a double. Trailing bytes that do not form a complete
 * pair are ignored. No byte-order conversion is performed here.
 *
 * The value object read from the message is freed before returning; the
 * message itself is not freed by this function.
 *
 * @param varChangeUnit unit receiving the updates via AqHomeReact_UnitVarChanges_ValueUpdated()
 * @param msg message to parse and read from
 */
void _handleDataResponse(AQHREACT_UNIT *varChangeUnit, GWEN_MSG *msg)
{
  AQH_VALUE *value;
  const GWEN_TAG16 *tag;
  unsigned int numberOfPoints;
  const uint8_t *cursor;

  AQH_MultiDataDataIpcMsg_Parse(msg, 0);
  value=AQH_MultiDataDataIpcMsg_ReadValue(msg);
  tag=AQH_Tag16IpcMsg_FindFirstTagByType(msg, AQH_MSGDATA_MULTIDATA_TAGS_DATA);
  numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t));
  cursor=tag?((const uint8_t*) GWEN_Tag16_GetTagData(tag)):NULL;

  if (numberOfPoints>0 && cursor) {
    unsigned int i;

    for (i=0; i<numberOfPoints; i++) {
      uint64_t timestamp;
      union {double f; uint64_t i;} u;

      /* The payload sits inside the message buffer and is not guaranteed to be
       * 8-byte aligned, so the fields are copied out bytewise instead of being
       * dereferenced through a uint64_t pointer. */
      memcpy(&timestamp, cursor, sizeof(timestamp));
      cursor+=sizeof(timestamp);
      memcpy(&u.i, cursor, sizeof(u.i));
      cursor+=sizeof(u.i);

      /* u.f reinterprets the transmitted 64 bits as a double */
      AqHomeReact_UnitVarChanges_ValueUpdated(varChangeUnit, value, timestamp, u.f);
    }
  }

  AQH_Value_free(value);
}