/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ /* NOTE(review): this copy of the file has lost its line breaks, and text that stood between angle brackets appears to be missing (the names of the angle-bracket includes below, part of _handleGetValues and the head of the for-loop in _handleAddDataPoints). Restore those parts from version control before building. */ #ifdef HAVE_CONFIG_H # include #endif #include "./loop.h" #include "./aqhome_data_p.h" #include "aqhome/ipc/data/ipc_data.h" #include "aqhome/ipc/data/msg_data_values.h" #include "aqhome/ipc/data/msg_data_datapoints.h" #include "aqhome/ipc/msg_ipc_result.h" #include #include #include #include #include /* ------------------------------------------------------------------------------------------------ * defines * ------------------------------------------------------------------------------------------------ */ /* Chunk size used by _handleGetValues: the temporary value list is sent and cleared as soon as it holds this many entries. */ #define AQHOMEDATA_VALUESPERMSG 10 /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void _readAndHandleIpcMessages(AQHOME_DATA *aqh); static void _handleIpcEndpoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep); static void _handleIpcMsg(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleGetValues(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _sendValueList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE_LIST *vl, uint32_t flags); static void _handleAddValue(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleEditValues(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleAddDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleGetLastDataPoint(AQHOME_DATA 
*aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ /* Calls GWEN_MsgEndpoint_IoLoop on aqh->ipcdEndpoint with the given timeout (named as milliseconds), then processes the messages received by its child endpoints. Does nothing when aqh is NULL. */ void AqHomeData_Loop(AQHOME_DATA *aqh, int timeoutInMsecs) { if (aqh) { GWEN_MsgEndpoint_IoLoop(aqh->ipcdEndpoint, timeoutInMsecs); _readAndHandleIpcMessages(aqh); } } /* If the storage's runtime flags contain AQH_STORAGE_RTFLAGS_MODIFIED: lock the storage, write its state and unlock it again. Returns 0 when nothing had to be written or when all three steps succeeded, otherwise the negative code returned by the failing step. NOTE(review): unlike AqHomeData_Loop, aqh is not checked for NULL here. NOTE(review): the MODIFIED flag is not cleared in this function; presumably AQH_Storage_WriteState does that - confirm. */ int AqHomeData_WriteStorageIfChanged(AQHOME_DATA *aqh) { if (AQH_Storage_GetRuntimeFlags(aqh->storage) & AQH_STORAGE_RTFLAGS_MODIFIED) { int rv; DBG_INFO(NULL, "Storage modified, writing statefile"); rv=AqHomeData_LockStorage(aqh); if (rv<0) { DBG_INFO(NULL, "Error locking storage (%d)", rv); return rv; } rv=AQH_Storage_WriteState(aqh->storage); if (rv<0) { DBG_INFO(NULL, "Error writing state file (%d)", rv); /* release the lock on the error path; the write error is what gets returned, the unlock result is ignored */ AqHomeData_UnlockStorage(aqh); return rv; } rv=AqHomeData_UnlockStorage(aqh); if (rv<0) { DBG_INFO(NULL, "Error unlocking storage (%d)", rv); return rv; } } return 0; } /* Walks the direct children of aqh->ipcdEndpoint (first child, then next sibling) and handles the received messages of each. No-op when ipcdEndpoint is NULL. */ void _readAndHandleIpcMessages(AQHOME_DATA *aqh) { if (aqh->ipcdEndpoint) { GWEN_MSG_ENDPOINT *ep; ep=GWEN_MsgEndpoint_Tree2_GetFirstChild(aqh->ipcdEndpoint); while(ep) { _handleIpcEndpoint(aqh, ep); ep=GWEN_MsgEndpoint_Tree2_GetNext(ep); } } } /* Takes received messages off the endpoint one by one until none is left; each message is dispatched and then freed here. */ void _handleIpcEndpoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep) { GWEN_MSG *msg; while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep)) ) { _handleIpcMsg(aqh, ep, msg); GWEN_Msg_free(msg); } } /* Dispatches a received IPC message to the matching request handler according to its message code. Codes without a case are ignored and no reply is queued for them. NOTE(review): every received packet is logged through DBG_ERROR although receiving one is not an error - confirm the intended log level. */ void _handleIpcMsg(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { uint16_t code; /* exec IPC message */ code=GWEN_IpcMsg_GetCode(msg); DBG_ERROR(AQH_LOGDOMAIN, "Received IPC packet %d", (int) code); switch(code) { case AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ: _handleGetValues(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_ADDVALUES_REQ: _handleAddValue(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_EDITVALUE_REQ: _handleEditValues(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_ADDDATAPOINTS_REQ: 
_handleAddDataPoints(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_GETDATAPOINTS_REQ: _handleGetDataPoints(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_GETLASTDATAPOINT_REQ: _handleGetLastDataPoint(aqh, ep, msg); break; default: /* unknown code: ignored */ break; } } /* Answers a GETVALUES request with the storage's value list. From the visible code: values are collected in a temporary list that is sent and cleared whenever it holds AQHOMEDATA_VALUESPERMSG entries; the chunk sent when no further value follows, and any remaining partial chunk, carry AQH_MSGDATA_VALUES_FLAGS_LASTMSG. When the storage has no value list, a single message with a NULL list and the LASTMSG flag is sent. The request message itself is not inspected in the visible code. */ void _handleGetValues(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { const AQH_VALUE_LIST *origValueList; origValueList=AQH_Storage_GetValueList(aqh->storage); if (origValueList) { /* NOTE(review): the following condition is truncated in this copy - the comparison operator, the first branch and the start of the chunking loop (declarations of tmpValueList, v and next) are missing; restore from version control. */ if (AQH_Value_List_GetCount(origValueList)=AQHOMEDATA_VALUESPERMSG) { /* chunk is full: flags are 0 while another value follows, LASTMSG otherwise */ _sendValueList(aqh, ep, tmpValueList, next?0:AQH_MSGDATA_VALUES_FLAGS_LASTMSG); AQH_Value_List_Clear(tmpValueList); } v=next; } if (AQH_Value_List_GetCount(tmpValueList)) _sendValueList(aqh, ep, tmpValueList, AQH_MSGDATA_VALUES_FLAGS_LASTMSG); /* send remaining */ AQH_Value_List_free(tmpValueList); } } else { /* empty list */ _sendValueList(aqh, ep, NULL, AQH_MSGDATA_VALUES_FLAGS_LASTMSG); } } /* Builds an AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP message from the given flags and value list (vl may be NULL, see _handleGetValues) and queues it for sending on ep. aqh is not used. NOTE(review): the result of AQH_ValuesDataIpcMsg_new is not checked before it is queued. */ void _sendValueList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE_LIST *vl, uint32_t flags) { GWEN_MSG *msg; msg=AQH_ValuesDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP, flags, vl); GWEN_MsgEndpoint_AddSendMessage(ep, msg); } /* Handles an ADDVALUES request. The message must be valid and contain exactly one value with a non-empty name that is not yet known to the storage; then a new value with that name (and the units, if non-empty) is added and the storage is flagged as modified. In every case an AQH_MSGTYPE_IPC_DATA_RESULT message is queued on ep carrying one of: AQH_MSG_IPC_SUCCESS, AQH_MSG_IPC_ERROR_EXISTS (name already present), AQH_MSG_IPC_ERROR_INVALID (no name or value count other than 1), AQH_MSG_IPC_ERROR_BADDATA (message failed validation). */ void _handleAddValue(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *recvdMsg) { GWEN_MSG *outMsg; int resultCode=0; if (AQH_ValuesDataIpcMsg_IsValid(recvdMsg)) { uint32_t numValues; numValues=AQH_ValuesDataIpcMsg_GetNumValues(recvdMsg); if (numValues==1) { const char *s; s=AQH_ValuesDataIpcMsg_GetValueName(recvdMsg, 0); if (s && *s) { if (AQH_Storage_GetValueByName(aqh->storage, s)==NULL) { AQH_VALUE *v; v=AQH_Value_new(); AQH_Value_SetName(v, s); /* s is reused: from here on it holds the units string, not the name */ s=AQH_ValuesDataIpcMsg_GetValueUnits(recvdMsg, 0); if (s && *s) AQH_Value_SetValueUnits(v, s); /* NOTE(review): when no units were set, AQH_Value_GetValueUnits(v) may return NULL, which must not be passed to a string conversion - confirm. */ DBG_INFO(NULL, "Adding value \"%s\" (%s)", AQH_Value_GetName(v), AQH_Value_GetValueUnits(v)); /* v is not freed here; presumably the storage takes ownership - confirm */ AQH_Storage_AddValue(aqh->storage, v); AQH_Storage_AddRuntimeFlags(aqh->storage, AQH_STORAGE_RTFLAGS_MODIFIED); resultCode=AQH_MSG_IPC_SUCCESS; } else { DBG_INFO(NULL, "Value \"%s\" already 
exists", s); resultCode=AQH_MSG_IPC_ERROR_EXISTS; } } else { DBG_INFO(NULL, "Value without name "); resultCode=AQH_MSG_IPC_ERROR_INVALID; } } else { /* NOTE(review): numValues is uint32_t but is printed with a signed int conversion */ DBG_INFO(NULL, "Invalid number of values in message (%d)", numValues); resultCode=AQH_MSG_IPC_ERROR_INVALID; } } else { DBG_INFO(NULL, "Invalid message received"); resultCode=AQH_MSG_IPC_ERROR_BADDATA; } outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode); GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); } /* Handles an EDITVALUE request. The message must be valid and contain exactly one value with a non-empty name that already exists in the storage; only the units of that value are updated (and only when the message carries non-empty units), after which the storage is flagged as modified. An AQH_MSGTYPE_IPC_DATA_RESULT message with the result code is always queued on ep. NOTE(review): AQH_MSG_IPC_ERROR_EXISTS is reported when the value does NOT exist - confirm whether a dedicated not-found code is available. */ void _handleEditValues(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *recvdMsg) { GWEN_MSG *outMsg; int resultCode=0; if (AQH_ValuesDataIpcMsg_IsValid(recvdMsg)) { uint32_t numValues; numValues=AQH_ValuesDataIpcMsg_GetNumValues(recvdMsg); if (numValues==1) { const char *s; s=AQH_ValuesDataIpcMsg_GetValueName(recvdMsg, 0); if (s && *s) { AQH_VALUE *v; v=AQH_Storage_GetValueByName(aqh->storage, s); if (v==NULL) { DBG_INFO(NULL, "Value \"%s\" doesn't not exist", s); resultCode=AQH_MSG_IPC_ERROR_EXISTS; } else { /* logged before the update, so the units shown are the old ones; NOTE(review): they may be NULL if never set - confirm */ DBG_INFO(NULL, "Updating value \"%s\" (%s)", AQH_Value_GetName(v), AQH_Value_GetValueUnits(v)); s=AQH_ValuesDataIpcMsg_GetValueUnits(recvdMsg, 0); if (s && *s) AQH_Value_SetValueUnits(v, s); /* the modified flag is set even when the message carried no units */ AQH_Storage_AddRuntimeFlags(aqh->storage, AQH_STORAGE_RTFLAGS_MODIFIED); resultCode=AQH_MSG_IPC_SUCCESS; } } else { DBG_INFO(NULL, "Value without name "); resultCode=AQH_MSG_IPC_ERROR_INVALID; } } else { /* NOTE(review): numValues is uint32_t but is printed with a signed int conversion */ DBG_INFO(NULL, "Invalid number of values in message (%d)", numValues); resultCode=AQH_MSG_IPC_ERROR_INVALID; } } else { DBG_INFO(NULL, "Invalid message received"); resultCode=AQH_MSG_IPC_ERROR_BADDATA; } outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode); GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); } /* Handles an ADDDATAPOINTS request. The message must be valid, carry at least one data point and name an existing value; the data points (an array of uint64_t) are then stored one by one for that value's id. An AQH_MSGTYPE_IPC_DATA_RESULT message with the result code is always queued on ep. NOTE(review): AQH_MSG_IPC_ERROR_EXISTS is reported when the value does NOT exist. NOTE(review): resultCode is assigned on every loop iteration, so a failure for an earlier data point is overwritten by a later success and only the last data point decides the reply - confirm whether that is intended. */ void _handleAddDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *recvdMsg) { GWEN_MSG *outMsg; int resultCode=0; if (AQH_DataPointsDataIpcMsg_IsValid(recvdMsg)) { uint32_t numValues; numValues=AQH_DataPointsDataIpcMsg_GetNumValues(recvdMsg); if (numValues) { const char 
*s; s=AQH_DataPointsDataIpcMsg_GetValueName(recvdMsg); if (s && *s) { AQH_VALUE *v; v=AQH_Storage_GetValueByName(aqh->storage, s); if (v==NULL) { /* NOTE(review): in this collapsed copy the line comment that follows swallows the rest of the physical line; also the head of the for-loop further on (loop condition, decoding of timestamp and u from dataPoints, and the name of the storage call) is missing - restore from version control. */ // TODO: maybe create the value on the fly DBG_INFO(NULL, "Value \"%s\" doesn't not exist", s); resultCode=AQH_MSG_IPC_ERROR_EXISTS; } else { const uint64_t *dataPoints; dataPoints=AQH_DataPointsDataIpcMsg_GetDataPoints(recvdMsg); if (dataPoints) { uint32_t i; for(i=0; istorage, AQH_Value_GetId(v), timestamp, u.f); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); resultCode=AQH_MSG_IPC_ERROR_GENERIC; } else { DBG_INFO(NULL, "Datapoint added for value \"%s\"", s); resultCode=0; } } /* for */ } /* if datapoints */ else { DBG_INFO(NULL, "No datapoints"); resultCode=AQH_MSG_IPC_ERROR_BADDATA; } } } else { DBG_INFO(NULL, "Value without name "); resultCode=AQH_MSG_IPC_ERROR_INVALID; } } else { DBG_INFO(NULL, "No datapoints"); resultCode=AQH_MSG_IPC_ERROR_BADDATA; } } else { DBG_INFO(NULL, "Invalid message received"); resultCode=AQH_MSG_IPC_ERROR_BADDATA; } outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode); GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); } /* Not implemented: empty stub, so a GETDATAPOINTS request gets no reply queued. */ void _handleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { } /* Not implemented: empty stub, so a GETLASTDATAPOINT request gets no reply queued. */ void _handleGetLastDataPoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { }