/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2024 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/data/storage_p.h" #include "aqhome/data/storage_readxml.h" #include "aqhome/data/storage_writexml.h" #include "aqhome/data/datafile/df_direct/df_direct.h" #include #include #include #include #include #define AQH_STORAGE_DATAPOINTS_STEPS 128 /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId); static AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId); static AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId); static GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId); /* ------------------------------------------------------------------------------------------------ * GWEN macros * ------------------------------------------------------------------------------------------------ */ GWEN_INHERIT_FUNCTIONS(AQH_STORAGE) /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ AQH_STORAGE *AQH_Storage_new(void) { AQH_STORAGE *sto; GWEN_NEW_OBJECT(AQH_STORAGE, sto); GWEN_INHERIT_INIT(AQH_STORAGE, sto); sto->deviceList=AQH_Device_List_new(); sto->valueList=AQH_Value_List_new(); sto->dataFileList=AQH_DataFile_List_new(); return sto; } void AQH_Storage_free(AQH_STORAGE *sto) { if (sto) { GWEN_INHERIT_FINI(AQH_STORAGE, sto); AQH_DataFile_List_free(sto->dataFileList); AQH_Value_List_free(sto->valueList); AQH_Device_List_free(sto->deviceList); free(sto->dataFileFolder); free(sto->stateFile); GWEN_FREE_OBJECT(sto); } } const char *AQH_Storage_GetStateFile(const AQH_STORAGE *sto) { return sto?(sto->stateFile):NULL; } void AQH_Storage_SetStateFile(AQH_STORAGE *sto, const char *s) { if (sto) { free(sto->stateFile); sto->stateFile=s?strdup(s):NULL; } } void AQH_Storage_AddValue(AQH_STORAGE *sto, AQH_VALUE *value) { if (sto && value) { uint64_t id; id=++(sto->lastValueId); AQH_Value_SetId(value, id); AQH_Value_List_Add(value, sto->valueList); AQH_Storage_AddRuntimeFlags(sto, AQH_STORAGE_RTFLAGS_MODIFIED); } } AQH_VALUE_LIST *AQH_Storage_GetValueList(const AQH_STORAGE *sto) { return sto?sto->valueList:NULL; } AQH_VALUE *AQH_Storage_GetValueById(const AQH_STORAGE *sto, uint64_t id) { return sto?AQH_Value_List_GetById(sto->valueList, id):NULL; } AQH_VALUE *AQH_Storage_GetValueByNameForSystem(const AQH_STORAGE *sto, const char *s) { return sto?AQH_Value_List_GetByNameForSystem(sto->valueList, s):NULL; } void AQH_Storage_AddDevice(AQH_STORAGE *sto, AQH_DEVICE *device) { if (sto && device) { uint64_t id; id=++(sto->lastDeviceId); AQH_Device_SetId(device, id); AQH_Device_List_Add(device, sto->deviceList); AQH_Storage_AddRuntimeFlags(sto, AQH_STORAGE_RTFLAGS_MODIFIED); } } AQH_DEVICE_LIST *AQH_Storage_GetDeviceList(const AQH_STORAGE *sto) { return sto?sto->deviceList:NULL; } AQH_DEVICE *AQH_Storage_GetDeviceById(const AQH_STORAGE *sto, uint64_t id) { return sto?AQH_Device_List_GetById(sto->deviceList, id):NULL; } AQH_DEVICE *AQH_Storage_GetDeviceByNameForSystem(const AQH_STORAGE *sto, const char *s) { return sto?AQH_Device_List_GetByNameForSystem(sto->deviceList, s):NULL; } uint32_t AQH_Storage_GetRuntimeFlags(const AQH_STORAGE *sto) { return sto?sto->runtimeFlags:0; } void AQH_Storage_SetRuntimeFlags(AQH_STORAGE *sto, uint32_t flags) { if (sto) sto->runtimeFlags=flags; } void AQH_Storage_AddRuntimeFlags(AQH_STORAGE *sto, uint32_t flags) { if (sto) sto->runtimeFlags|=flags; } void AQH_Storage_SubRuntimeFlags(AQH_STORAGE *sto, uint32_t flags) { if (sto) sto->runtimeFlags&=~flags; } int AQH_Storage_WriteState(AQH_STORAGE *sto) { int rv; rv=AQH_Storage_WriteStateFile(sto, sto->stateFile); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } sto->runtimeFlags&=~AQH_STORAGE_RTFLAGS_MODIFIED; return 0; } const char *AQH_Storage_GetDataFileFolder(const AQH_STORAGE *sto) { return sto?sto->dataFileFolder:NULL; } void AQH_Storage_SetDataFileFolder(AQH_STORAGE *sto, const char *s) { if (sto) { free(sto->dataFileFolder); sto->dataFileFolder=s?strdup(s):NULL; } } AQH_STORAGE_INIT_FN AQH_Storage_SetInitFn(AQH_STORAGE *sto, AQH_STORAGE_INIT_FN fn) { AQH_STORAGE_INIT_FN oldFn; oldFn=sto->initFn; sto->initFn=fn; return oldFn; } AQH_STORAGE_FINI_FN AQH_Storage_SetFiniFn(AQH_STORAGE *sto, AQH_STORAGE_FINI_FN fn) { AQH_STORAGE_FINI_FN oldFn; oldFn=sto->finiFn; sto->finiFn=fn; return oldFn; } AQH_STORAGE_ADDDATAPOINT_FN AQH_Storage_SetAddDatapointFn(AQH_STORAGE *sto, AQH_STORAGE_ADDDATAPOINT_FN fn) { AQH_STORAGE_ADDDATAPOINT_FN oldFn; oldFn=sto->addDatapointFn; sto->addDatapointFn=fn; return oldFn; } AQH_STORAGE_GETDATAPOINTS_FN AQH_Storage_SetGetDatapointsFn(AQH_STORAGE *sto, AQH_STORAGE_GETDATAPOINTS_FN fn) { AQH_STORAGE_GETDATAPOINTS_FN oldFn; oldFn=sto->getDatapointsFn; sto->getDatapointsFn=fn; return oldFn; } AQH_STORAGE_GETFIRSTDATAPOINT_FN AQH_Storage_SetGetFirstDatapointFn(AQH_STORAGE *sto, AQH_STORAGE_GETFIRSTDATAPOINT_FN fn) { AQH_STORAGE_GETFIRSTDATAPOINT_FN oldFn; oldFn=sto->getFirstDatapointFn; sto->getFirstDatapointFn=fn; return oldFn; } AQH_STORAGE_GETLASTDATAPOINT_FN AQH_Storage_SetGetLastDatapointFn(AQH_STORAGE *sto, AQH_STORAGE_GETLASTDATAPOINT_FN fn) { AQH_STORAGE_GETLASTDATAPOINT_FN oldFn; oldFn=sto->getLastDatapointFn; sto->getLastDatapointFn=fn; return oldFn; } AQH_STORAGE_GETFIRSTNDATAPOINTS_FN AQH_Storage_SetGetFirstNDatapointsFn(AQH_STORAGE *sto, AQH_STORAGE_GETFIRSTNDATAPOINTS_FN fn) { AQH_STORAGE_GETFIRSTNDATAPOINTS_FN oldFn; oldFn=sto->getFirstNDatapointsFn; sto->getFirstNDatapointsFn=fn; return oldFn; } AQH_STORAGE_GETLASTNDATAPOINTS_FN AQH_Storage_SetGetLastNDatapointsFn(AQH_STORAGE *sto, AQH_STORAGE_GETLASTNDATAPOINTS_FN fn) { AQH_STORAGE_GETLASTNDATAPOINTS_FN oldFn; oldFn=sto->getLastNDatapointsFn; sto->getLastNDatapointsFn=fn; return oldFn; } int AQH_Storage_Init(AQH_STORAGE *sto) { int rv; rv=GWEN_Directory_GetPath(sto->stateFile, GWEN_PATH_FLAGS_CHECKROOT | GWEN_PATH_FLAGS_PATHMUSTEXIST | GWEN_PATH_FLAGS_NAMEMUSTEXIST | GWEN_PATH_FLAGS_VARIABLE); if (rv==0) { rv=AQH_Storage_ReadStateFile(sto, sto->stateFile); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } } else { DBG_WARN(AQH_LOGDOMAIN, "State file \"%s\" not available, will try to create it later (%d)", sto->stateFile, rv); } return 0; } int AQH_Storage_Fini(AQH_STORAGE *sto) { int errors=0; if (sto) { AQH_DATAFILE *df; while( (df=AQH_DataFile_List_First(sto->dataFileList)) ) { int rv; AQH_DataFile_List_Del(df); rv=AQH_DataFile_Close(df); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error closing file \"%lu\"", AQH_DataFile_GetValueId(df)); errors++; } AQH_DataFile_free(df); } } return (errors==0)?0:GWEN_ERROR_GENERIC; } int AQH_Storage_AddDatapoint(AQH_STORAGE *sto, uint64_t valueId, uint64_t timestamp, double dataPoint) { AQH_DATAFILE *df; df=_getDataFileByValueId(sto, valueId); if (df) { DBG_DEBUG(AQH_LOGDOMAIN, "Appending record to datafile"); AQH_DataFile_AppendRecord(df, timestamp, dataPoint); return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Error getting data file for value %lu", (unsigned long int) valueId); return GWEN_ERROR_GENERIC; } } uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t fromTime, uint64_t toTime, uint64_t maxDataPointsRequested) { AQH_DATAFILE *df; uint64_t numEntries; uint64_t arrayLen; uint64_t arrayPos; uint64_t *arrayPtr; uint64_t i; df=_getDataFileByValueId(sto, valueId); if (df==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "No file for value id %lu", (unsigned long int) valueId); return NULL; } numEntries=AQH_DataFile_GetNumberOfEntries(df); if (maxDataPointsRequested>numEntries) maxDataPointsRequested=numEntries; arrayLen=(maxDataPointsRequested*2)+1; arrayPtr=(uint64_t*) malloc(arrayLen*sizeof(uint64_t)); if (arrayPtr==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Not enough memory for %lu entries", (unsigned long int) arrayLen); free(arrayPtr); return NULL; } arrayPos=1; for (i=1; i=fromTime) && (toTime==0 || ts<=toTime)) { if ((arrayPos+1)>arrayLen) { DBG_INFO(AQH_LOGDOMAIN, "Limit for number of returned entries reached"); break; } arrayPtr[arrayPos++]=ts; arrayPtr[arrayPos++]=u.i; } } /* for */ if (arrayPos<=1) { DBG_INFO(AQH_LOGDOMAIN, "No matching records"); free(arrayPtr); return NULL; } arrayPtr[0]=arrayPos-1; return arrayPtr; } uint64_t *AQH_Storage_GetFirstNDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t maxDataPointsRequested) { AQH_DATAFILE *df; uint64_t numEntries; uint64_t numOfDataEntries; uint64_t arrayLen; uint64_t arrayPos; uint64_t *arrayPtr; uint64_t firstRecord; uint64_t i; df=_getDataFileByValueId(sto, valueId); if (df==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "No file for value id %lu", (unsigned long int) valueId); return NULL; } numEntries=AQH_DataFile_GetNumberOfEntries(df); numOfDataEntries=numEntries-1; /* first entry is reserved, don't count it here */ if (numOfDataEntries<1) { DBG_INFO(AQH_LOGDOMAIN, "No data records for value id %lu", (unsigned long int) valueId); return NULL; } firstRecord=1; if (numOfDataEntries>maxDataPointsRequested) /* more entries in file than requested */ arrayLen=(maxDataPointsRequested*2)+1; /* +1 because the first array entry contains the number of entries */ else arrayLen=(numOfDataEntries*2)+1; arrayPtr=(uint64_t*) malloc(arrayLen*sizeof(uint64_t)); if (arrayPtr==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Not enough memory for %lu entries", (unsigned long int) arrayLen); return NULL; } arrayPos=1; for (i=firstRecord; i=arrayLen) { DBG_INFO(AQH_LOGDOMAIN, "Requested number of entries reached"); break; } arrayPtr[arrayPos++]=ts; arrayPtr[arrayPos++]=u.i; } /* for */ arrayPtr[0]=arrayPos-1; return arrayPtr; } uint64_t *AQH_Storage_GetLastNDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t maxDataPointsRequested) { AQH_DATAFILE *df; uint64_t numEntries; uint64_t numOfDataEntries; uint64_t arrayLen; uint64_t arrayPos; uint64_t *arrayPtr; uint64_t firstRecord; uint64_t i; df=_getDataFileByValueId(sto, valueId); if (df==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "No file for value id %lu", (unsigned long int) valueId); return NULL; } numEntries=AQH_DataFile_GetNumberOfEntries(df); numOfDataEntries=numEntries-1; /* first entry is reserved, don't count it here */ if (numOfDataEntries<1) { DBG_INFO(AQH_LOGDOMAIN, "No data records for value id %lu", (unsigned long int) valueId); return NULL; } if (numOfDataEntries>maxDataPointsRequested) { /* more entries in file than requested */ arrayLen=(maxDataPointsRequested*2)+1; /* +1 because the first array entry contains the number of entries */ firstRecord=1+(numOfDataEntries-maxDataPointsRequested); } else { arrayLen=(numOfDataEntries*2)+1; firstRecord=1; } arrayPtr=(uint64_t*) malloc(arrayLen*sizeof(uint64_t)); if (arrayPtr==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Not enough memory for %lu entries", (unsigned long int) arrayLen); return NULL; } arrayPos=1; for (i=firstRecord; i=arrayLen) { DBG_INFO(AQH_LOGDOMAIN, "Requested number of entries reached"); break; } arrayPtr[arrayPos++]=ts; arrayPtr[arrayPos++]=u.i; } /* for */ arrayPtr[0]=arrayPos-1; return arrayPtr; } int AQH_Storage_GetLastDataPoint(AQH_STORAGE *sto, uint64_t valueId, uint64_t *pTimestamp, double *pValue) { AQH_DATAFILE *df; uint64_t numEntries; int rv; df=_getDataFileByValueId(sto, valueId); if (df==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "No file for value id %lu", (unsigned long int) valueId); return GWEN_ERROR_INVALID; } numEntries=AQH_DataFile_GetNumberOfEntries(df); if (numEntries<2) { DBG_INFO(AQH_LOGDOMAIN, "No data entries in file"); return GWEN_ERROR_NO_DATA; } rv=AQH_DataFile_ReadRecord(df, numEntries-1, pTimestamp, pValue); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } return 0; } int AQH_Storage_GetFirstDataPoint(AQH_STORAGE *sto, uint64_t valueId, uint64_t *pTimestamp, double *pValue) { AQH_DATAFILE *df; uint64_t numEntries; int rv; df=_getDataFileByValueId(sto, valueId); if (df==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "No file for value id %lu", (unsigned long int) valueId); return GWEN_ERROR_INVALID; } numEntries=AQH_DataFile_GetNumberOfEntries(df); if (numEntries<2) { DBG_INFO(AQH_LOGDOMAIN, "No data entries in file"); return GWEN_ERROR_NO_DATA; } rv=AQH_DataFile_ReadRecord(df, 1, pTimestamp, pValue); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } return 0; } AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId) { AQH_DATAFILE *df; df=_findDataFileByValueId(sto, valueId); if (df==NULL) { DBG_DEBUG(AQH_LOGDOMAIN, "Datafile for valueId \"%lu\" not in list, loading", (unsigned long int) valueId); df=_openOrCreateDataFileByValueId(sto, valueId); if (df==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Error opening/creating datafile for valueId \"%lu\"", (unsigned long int) valueId); return NULL; } DBG_DEBUG(AQH_LOGDOMAIN, "Adding datafile for valueId \"%lu\" to list", (unsigned long int) valueId); AQH_DataFile_List_Add(df, sto->dataFileList); } return df; } AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId) { if (sto && sto->dataFileList) return AQH_DataFile_List_GetByValueId(sto->dataFileList, valueId); return NULL; } AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId) { AQH_DATAFILE *df; GWEN_BUFFER *nameBuf; int rv; nameBuf=_getDataFilePathForValueId(sto, valueId); df=AQH_DataFileDirect_new(GWEN_Buffer_GetStart(nameBuf), valueId); rv=GWEN_Directory_GetPath(GWEN_Buffer_GetStart(nameBuf), GWEN_PATH_FLAGS_CHECKROOT | GWEN_PATH_FLAGS_PATHMUSTEXIST | GWEN_PATH_FLAGS_NAMEMUSTEXIST | GWEN_PATH_FLAGS_VARIABLE); if (rv==0) { DBG_DEBUG(AQH_LOGDOMAIN, "File \"%s\" exists, trying to open", GWEN_Buffer_GetStart(nameBuf)); rv=AQH_DataFile_Open(df); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error opening data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv); GWEN_Buffer_free(nameBuf); return NULL; } } else { DBG_INFO(AQH_LOGDOMAIN, "Creating file \"%s\"", GWEN_Buffer_GetStart(nameBuf)); rv=AQH_DataFile_Create(df); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error creating data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv); GWEN_Buffer_free(nameBuf); return NULL; } } GWEN_Buffer_free(nameBuf); return df; } GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId) { GWEN_BUFFER *buf; buf=GWEN_Buffer_new(0, 256, 0, 1); if (sto->dataFileFolder) { GWEN_Buffer_AppendString(buf, sto->dataFileFolder); GWEN_Buffer_AppendString(buf, GWEN_DIR_SEPARATOR_S); } GWEN_Buffer_AppendArgs(buf, "%" PRIu64 ".data", valueId); return buf; }