From f5878f43ff48e0fa2cb54b7bd3f4b6c6824c07fd Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Sat, 12 Aug 2023 02:06:54 +0200 Subject: [PATCH] Added datafile and handling of MQTT publish message. --- aqhome/data/0BUILD | 3 + aqhome/data/README | 17 ++ aqhome/data/datafile.c | 397 ++++++++++++++++++++++++++++++++++++++ aqhome/data/datafile.h | 44 +++++ aqhome/data/datafile_p.h | 39 ++++ aqhome/data/datapoint.t2d | 10 +- aqhome/data/storage.c | 277 +++++++++++++++++++++++++- aqhome/data/storage.h | 5 + aqhome/data/storage_p.h | 4 + 9 files changed, 783 insertions(+), 13 deletions(-) create mode 100644 aqhome/data/datafile.c create mode 100644 aqhome/data/datafile.h create mode 100644 aqhome/data/datafile_p.h diff --git a/aqhome/data/0BUILD b/aqhome/data/0BUILD index ed915d8..c7f5840 100644 --- a/aqhome/data/0BUILD +++ b/aqhome/data/0BUILD @@ -68,6 +68,7 @@ storage.h + datafile.h @@ -75,6 +76,7 @@ storage_p.h storage_readxml.h storage_writexml.h + datafile_p.h @@ -84,6 +86,7 @@ storage.c storage_readxml.c storage_writexml.c + datafile.c diff --git a/aqhome/data/README b/aqhome/data/README index 7cee608..a601e5e 100644 --- a/aqhome/data/README +++ b/aqhome/data/README @@ -77,3 +77,20 @@ Storage API: +datafile: +- valueId +- filename +- number of entries +- fd +- getNumOfEntries() +- readRecord(id, &record) +- addRecord(record) + +datarecord in file: +- 8 bytes timestamp +- 8 bytes value + + + + + diff --git a/aqhome/data/datafile.c b/aqhome/data/datafile.c new file mode 100644 index 0000000..15b6ae3 --- /dev/null +++ b/aqhome/data/datafile.c @@ -0,0 +1,397 @@ +/**************************************************************************** + * This file is part of the project Gwenhywfar. + * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./datafile_p.h" + +#include +#include + +#include + + +GWEN_LIST_FUNCTIONS(AQH_DATAFILE, AQH_DataFile); + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + + +static int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue); +static int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +AQH_DATAFILE *AQH_DataFile_new(const char *fileName, uint64_t valueId) +{ + AQH_DATAFILE *df; + + GWEN_NEW_OBJECT(AQH_DATAFILE, df); + GWEN_LIST_INIT(AQH_DATAFILE, df); + df->valueId=valueId; + df->fileName=fileName?strdup(fileName):NULL; + + return df; +} + + + +void AQH_DataFile_free(AQH_DATAFILE *df) +{ + if (df) { + GWEN_LIST_FINI(AQH_DATAFILE, df); + GWEN_SyncIo_free(df->sio); + free(df->fileName); + GWEN_FREE_OBJECT(df); + } +} + + + +const char *AQH_DataFile_GetFileName(const AQH_DATAFILE *df) +{ + return df?df->fileName:NULL; +} + + + +uint64_t AQH_DataFile_GetValueId(const AQH_DATAFILE *df) +{ + return df?df->valueId:0; +} + + + +uint64_t AQH_DataFile_GetNumberOfEntries(const AQH_DATAFILE *df) +{ + return df?df->numberOfEntries:0; +} + + + + +uint32_t AQH_DataFile_GetRuntimeFlags(const AQH_DATAFILE *df) +{ + return df?df->runtimeFlags:0; +} + + + +int AQH_DataFile_Create(AQH_DATAFILE *df) +{ + if (df) { + GWEN_SYNCIO *sio; + int rv; + + if (df->sio) { + DBG_ERROR(AQH_LOGDOMAIN, "File already open"); + return GWEN_ERROR_INVALID; + } + + sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew); + rv=GWEN_SyncIo_Connect(sio); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv); + GWEN_SyncIo_free(sio); + return rv; + } + rv=_writeRecord(sio, 0, 0, 0.0); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error adding first record to file \"%s\" (%d)", df->fileName, rv); + GWEN_SyncIo_free(sio); + return rv; + } + df->sio=sio; + df->numberOfEntries=1; /* we just added the 0-record */ + + DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" created.", df->fileName); + return 0; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); + return GWEN_ERROR_GENERIC; + } +} + + + +int AQH_DataFile_Open(AQH_DATAFILE *df) +{ + if (df) { + GWEN_SYNCIO *sio; + int rv; + int64_t len; + + if (df->sio) { + DBG_ERROR(AQH_LOGDOMAIN, "File already open"); + return GWEN_ERROR_INVALID; + } + + sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew); + rv=GWEN_SyncIo_Connect(sio); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv); + GWEN_SyncIo_free(sio); + return rv; + } + + len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_End); + if (len<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error seeking to end of file \"%s\" (%d)", df->fileName, rv); + GWEN_SyncIo_free(sio); + return rv; + } + if (len % AQH_DATAFILE_RECORDSIZE) { + DBG_INFO(AQH_LOGDOMAIN, "Invalid file size of file \"%s\" ((not a multiple of current record size))", df->fileName); + GWEN_SyncIo_Disconnect(sio); + GWEN_SyncIo_free(sio); + unlink(df->fileName); + return GWEN_ERROR_BAD_DATA; + } + df->numberOfEntries=len/AQH_DATAFILE_RECORDSIZE; + + len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_Set); + if (len<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error seeking to begin of file \"%s\" (%d)", df->fileName, rv); + GWEN_SyncIo_free(sio); + return rv; + } + df->sio=sio; + return 0; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); + return GWEN_ERROR_GENERIC; + } +} + + + +int AQH_DataFile_Close(AQH_DATAFILE *df) +{ + if (df) { + int rv; + + if (df->sio==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "File not open"); + return GWEN_ERROR_INVALID; + } + + rv=GWEN_SyncIo_Disconnect(df->sio); + if (rv<0) { + DBG_ERROR(AQH_LOGDOMAIN, "Error closing file \"%s\" (%d)", df->fileName, rv); + GWEN_SyncIo_free(df->sio); + df->sio=NULL; + df->numberOfEntries=0; + return rv; + } + GWEN_SyncIo_free(df->sio); + df->sio=NULL; + df->numberOfEntries=0; + return 0; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); + return GWEN_ERROR_GENERIC; + } +} + + + + +int AQH_DataFile_ReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue) +{ + if (df) { + int rv; + + if (df->sio==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "File not open"); + return GWEN_ERROR_INVALID; + } + + rv=_readRecord(df->sio, idx, pTimestamp, pValue);; + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error writing to file \"%s\" (%d)", df->fileName, rv); + return rv; + } + + return 0; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); + return GWEN_ERROR_GENERIC; + } +} + + + +int AQH_DataFile_AppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value) +{ + if (df) { + int rv; + uint64_t filePos; + + if (df->sio==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "File not open"); + return GWEN_ERROR_INVALID; + } + + filePos=(df->numberOfEntries)*AQH_DATAFILE_RECORDSIZE; + rv=_writeRecord(df->sio, filePos, timestamp, value); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error writing to file \"%s\" (%d)", df->fileName, rv); + return rv; + } + + df->numberOfEntries++; + DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" has now %lu entries", df->fileName, (unsigned long int) (df->numberOfEntries)); + return 0; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); + return GWEN_ERROR_GENERIC; + } +} + + + +AQH_DATAFILE *AQH_DataFile_List_GetByValueId(AQH_DATAFILE_LIST *fileList, uint64_t id) +{ + if (fileList) { + AQH_DATAFILE *df; + + df=AQH_DataFile_List_First(fileList); + while(df) { + if (df->valueId==id) + break; + df=AQH_DataFile_List_Next(df); + } + } + return NULL; +} + + + +int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue) +{ + uint8_t record[AQH_DATAFILE_RECORDSIZE]; + union {double f; uint64_t i;} u; + uint64_t filePos; + int64_t retPos; + const uint8_t *ptr; + uint64_t v; + int rv; + + filePos=idx*AQH_DATAFILE_RECORDSIZE; + retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set); + if (retPos<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos); + return (int) retPos; + } + + rv=GWEN_SyncIo_ReadForced(sio, record, AQH_DATAFILE_RECORDSIZE); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error reading at pos %lu (%d)", (unsigned long int) filePos, rv); + return rv; + } + + ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP; + v=0; + v|=(uint64_t)(*(ptr++)); + v|=(uint64_t)(*(ptr++))<<8; + v|=(uint64_t)(*(ptr++))<<16; + v|=(uint64_t)(*(ptr++))<<24; + v|=(uint64_t)(*(ptr++))<<32; + v|=(uint64_t)(*(ptr++))<<40; + v|=(uint64_t)(*(ptr++))<<48; + v|=(uint64_t)(*(ptr++))<<56; + *pTimestamp=v; + + ptr=record+AQH_DATAFILE_OFFSET_VALUE; + v=0; + v|=(uint64_t)(*(ptr++)); + v|=(uint64_t)(*(ptr++))<<8; + v|=(uint64_t)(*(ptr++))<<16; + v|=(uint64_t)(*(ptr++))<<24; + v|=(uint64_t)(*(ptr++))<<32; + v|=(uint64_t)(*(ptr++))<<40; + v|=(uint64_t)(*(ptr++))<<48; + v|=(uint64_t)(*(ptr++))<<56; + u.i=v; + *pValue=u.f; + + return 0; +} + + + +int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value) +{ + uint8_t record[AQH_DATAFILE_RECORDSIZE]; + union {double f; uint64_t i;} u; + uint8_t *ptr; + int64_t retPos; + uint64_t v; + int rv; + + ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP; + v=timestamp; + *(ptr++)=(v) & 0xff; + *(ptr++)=(v>>8) & 0xff; + *(ptr++)=(v>>16) & 0xff; + *(ptr++)=(v>>24) & 0xff; + *(ptr++)=(v>>32) & 0xff; + *(ptr++)=(v>>40) & 0xff; + *(ptr++)=(v>>48) & 0xff; + *(ptr++)=(v>>56) & 0xff; + + u.f=value; + v=u.i; + ptr=record+AQH_DATAFILE_OFFSET_VALUE; + *(ptr++)=(v) & 0xff; + *(ptr++)=(v>>8) & 0xff; + *(ptr++)=(v>>16) & 0xff; + *(ptr++)=(v>>24) & 0xff; + *(ptr++)=(v>>32) & 0xff; + *(ptr++)=(v>>40) & 0xff; + *(ptr++)=(v>>48) & 0xff; + *(ptr++)=(v>>56) & 0xff; + + retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set); + if (retPos<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos); + return (int) retPos; + } + rv=GWEN_SyncIo_WriteForced(sio, record, AQH_DATAFILE_RECORDSIZE); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error writing to file (%d)", rv); + return rv; + } + return 0; +} + + + + + + + + + + diff --git a/aqhome/data/datafile.h b/aqhome/data/datafile.h new file mode 100644 index 0000000..3a1b589 --- /dev/null +++ b/aqhome/data/datafile.h @@ -0,0 +1,44 @@ +/**************************************************************************** + * This file is part of the project Gwenhywfar. + * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_DATAFILE_H +#define AQH_DATAFILE_H + + +#include "aqhome/data/storage.h" + +#include + + +typedef struct AQH_DATAFILE AQH_DATAFILE; +GWEN_LIST_FUNCTION_LIB_DEFS(AQH_DATAFILE, AQH_DataFile, AQHOME_API) + + +AQHOME_API AQH_DATAFILE *AQH_DataFile_new(const char *fileName, uint64_t valueId); +AQHOME_API void AQH_DataFile_free(AQH_DATAFILE *df); + +AQHOME_API int AQH_DataFile_Create(AQH_DATAFILE *df); +AQHOME_API int AQH_DataFile_Open(AQH_DATAFILE *df); +AQHOME_API int AQH_DataFile_Close(AQH_DATAFILE *df); + +AQHOME_API const char *AQH_DataFile_GetFileName(const AQH_DATAFILE *df); +AQHOME_API uint64_t AQH_DataFile_GetValueId(const AQH_DATAFILE *df); + +AQHOME_API uint64_t AQH_DataFile_GetNumberOfEntries(const AQH_DATAFILE *df); + +AQHOME_API uint32_t AQH_DataFile_GetRuntimeFlags(const AQH_DATAFILE *df); + +AQHOME_API int AQH_DataFile_ReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue); +AQHOME_API int AQH_DataFile_AppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value); + +AQHOME_API AQH_DATAFILE *AQH_DataFile_List_GetByValueId(AQH_DATAFILE_LIST *fileList, uint64_t id); + + +#endif + + diff --git a/aqhome/data/datafile_p.h b/aqhome/data/datafile_p.h new file mode 100644 index 0000000..bb71eba --- /dev/null +++ b/aqhome/data/datafile_p.h @@ -0,0 +1,39 @@ +/**************************************************************************** + * This file is part of the project Gwenhywfar. + * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_DATAFILE_P_H +#define AQH_DATAFILE_P_H + + +#include "./datafile.h" + +#include + + +#define AQH_DATAFILE_RECORDSIZE 16 +#define AQH_DATAFILE_OFFSET_TIMESTAMP 0 +#define AQH_DATAFILE_OFFSET_VALUE 8 + + +struct AQH_DATAFILE { + GWEN_LIST_ELEMENT(AQH_DATAFILE); + + char *fileName; + uint64_t valueId; + GWEN_SYNCIO *sio; + uint64_t numberOfEntries; + uint32_t runtimeFlags; +}; + + + + + +#endif + + diff --git a/aqhome/data/datapoint.t2d b/aqhome/data/datapoint.t2d index 31233a7..00de5b7 100644 --- a/aqhome/data/datapoint.t2d +++ b/aqhome/data/datapoint.t2d @@ -34,7 +34,7 @@ public - + 0 0 @@ -48,13 +48,11 @@ 0 - + public own with_sortbymember - const dup - const - NULL - NULL + 0.0 + 0.0 diff --git a/aqhome/data/storage.c b/aqhome/data/storage.c index d6dae45..1aa9a59 100644 --- a/aqhome/data/storage.c +++ b/aqhome/data/storage.c @@ -17,6 +17,8 @@ #include #include #include +#include +#include @@ -26,6 +28,17 @@ */ +static void _handleJsonTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue); +static void _handleNumTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue); +static void _handleValueForJsonElement(AQH_STORAGE *sto, + const AQH_MQTT_TOPIC *topic, + GWEN_JSON_ELEM *json, + time_t timestamp, + const AQH_VALUE *value); +static AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId); +static AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId); +static AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId); +static GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId); @@ -44,6 +57,7 @@ AQH_STORAGE *AQH_Storage_new(void) sto->deviceList=AQH_Device_List_new(); sto->mqttTopicList=AQH_MqttTopic_List_new(); sto->valueList=AQH_Value_List_new(); + sto->dataFileList=AQH_DataFile_List_new(); return sto; } @@ -53,10 +67,12 @@ AQH_STORAGE *AQH_Storage_new(void) void AQH_Storage_free(AQH_STORAGE *sto) { if (sto) { + AQH_DataFile_List_free(sto->dataFileList); AQH_Value_List_free(sto->valueList); AQH_MqttTopic_List_free(sto->mqttTopicList); AQH_Device_List_free(sto->deviceList); AQH_Room_List_free(sto->roomList); + free(sto->dataFileFolder); free(sto->stateFile); GWEN_FREE_OBJECT(sto); @@ -249,13 +265,6 @@ void AQH_Storage_SubRuntimeFlags(AQH_STORAGE *sto, uint32_t flags) -void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *topic, const char *value) -{ - /* TODO */ -} - - - int AQH_Storage_Init(AQH_STORAGE *sto) { int rv; @@ -281,6 +290,31 @@ int AQH_Storage_Init(AQH_STORAGE *sto) +int AQH_Storage_Fini(AQH_STORAGE *sto) +{ + int errors=0; + + if (sto) { + AQH_DATAFILE *df; + + while( (df=AQH_DataFile_List_First(sto->dataFileList)) ) { + int rv; + + AQH_DataFile_List_Del(df); + rv=AQH_DataFile_Close(df); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error closing file \"%s\"", AQH_DataFile_GetFileName(df)); + errors++; + } + AQH_DataFile_free(df); + } + } + + return (errors==0)?0:GWEN_ERROR_GENERIC; +} + + + int AQH_Storage_WriteState(AQH_STORAGE *sto) { int rv; @@ -297,5 +331,234 @@ int AQH_Storage_WriteState(AQH_STORAGE *sto) +const char *AQH_Storage_GetDataFileFolder(const AQH_STORAGE *sto) +{ + return sto?sto->dataFileFolder:NULL; +} + + + +void AQH_Storage_SetDataFileFolder(AQH_STORAGE *sto, const char *s) +{ + if (sto) { + free(sto->dataFileFolder); + sto->dataFileFolder=s?strdup(s):NULL; + } +} + + + + +void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *sTopic, const char *sValue) +{ + if (sto) { + const AQH_MQTT_TOPIC *topic; + + topic=AQH_Storage_GetMqttTopicByTopic(sto, sTopic); + if (topic) { + if (AQH_MqttTopic_GetDataType(topic)==AQH_MqttTopicType_Json) + _handleJsonTopic(sto, topic, sValue); + else + _handleNumTopic(sto, topic, sValue); + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Unknown MQTT topic \"%s\"", sTopic); + } + } +} + + + +void _handleJsonTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue) +{ + time_t now; + GWEN_JSON_ELEM *json; + + now=time(NULL); + json=GWEN_JsonElement_fromString(sValue); + if (json) { + AQH_VALUE *value; + uint64_t topicId; + + topicId=AQH_MqttTopic_GetId(topic); + value=AQH_Value_List_First(sto->valueList); + while(value) { + if (AQH_Value_GetTopicId(value)==topicId) { + _handleValueForJsonElement(sto, topic, json, now, value); + } + value=AQH_Value_List_Next(value); + } + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Error parsing JSON data [%s]", sValue); + } +} + + + +void _handleValueForJsonElement(AQH_STORAGE *sto, + const AQH_MQTT_TOPIC *topic, + GWEN_JSON_ELEM *json, + time_t timestamp, + const AQH_VALUE *value) +{ + const char *dataPath; + + dataPath=AQH_Value_GetDataPath(value); + if (dataPath) { + GWEN_JSON_ELEM *jsonElem; + + jsonElem=GWEN_JsonElement_GetElementByPath(json, dataPath, GWEN_PATH_FLAGS_PATHMUSTEXIST); + if (jsonElem) { + const char *s; + + s=GWEN_JsonElement_GetData(jsonElem); + if (s && *s) { + int rv; + double v; + + rv=GWEN_Text_StringToDouble(s, &v); + if (rv<0) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid value \"%s\" for topic %s:%s", s, AQH_MqttTopic_GetTopic(topic), dataPath); + } + else { + AQH_DATAFILE *df; + + df=_getDataFileByValueId(sto, AQH_Value_GetId(value)); + if (df) { + DBG_INFO(AQH_LOGDOMAIN, "Appending record to datafile"); + AQH_DataFile_AppendRecord(df, timestamp, v); + } + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Empty JSON element \"%s\"", dataPath); + } + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "No value for topic %s:%s", AQH_MqttTopic_GetTopic(topic), dataPath); + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "No datapath in value \"%s\"", AQH_Value_GetName(value)); + } +} + + + +void _handleNumTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue) +{ + int rv; + double v; + time_t now; + + now=time(NULL); + + rv=GWEN_Text_StringToDouble(sValue, &v); + if (rv<0) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid value \"%s\" for topic %s, ignoring", sValue, AQH_MqttTopic_GetTopic(topic)); + } + else { + AQH_VALUE *value; + uint64_t topicId; + + topicId=AQH_MqttTopic_GetId(topic); + value=AQH_Value_List_First(sto->valueList); + while(value) { + if (AQH_Value_GetTopicId(value)==topicId) + break; + value=AQH_Value_List_Next(value); + } + if (value) { + AQH_DATAFILE *df; + + df=_getDataFileByValueId(sto, AQH_Value_GetId(value)); + if (df) { + DBG_INFO(AQH_LOGDOMAIN, "Appending record to datafile"); + AQH_DataFile_AppendRecord(df, now, v); + } + } + } +} + + + + +AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId) +{ + AQH_DATAFILE *df; + + df=_findDataFileByValueId(sto, valueId); + if (df==NULL) { + df=_openOrCreateDataFileByValueId(sto, valueId); + AQH_DataFile_List_Add(df, sto->dataFileList); + } + + return df; +} + + + +AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId) +{ + if (sto && sto->dataFileList) + return AQH_DataFile_List_GetByValueId(sto->dataFileList, valueId); + return NULL; +} + + + +AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId) +{ + AQH_DATAFILE *df; + GWEN_BUFFER *nameBuf; + int rv; + + nameBuf=_getDataFilePathForValueId(sto, valueId); + df=AQH_DataFile_new(GWEN_Buffer_GetStart(nameBuf), valueId); + + rv=GWEN_Directory_GetPath(GWEN_Buffer_GetStart(nameBuf), + GWEN_PATH_FLAGS_CHECKROOT | + GWEN_PATH_FLAGS_PATHMUSTEXIST | + GWEN_PATH_FLAGS_NAMEMUSTEXIST | + GWEN_PATH_FLAGS_VARIABLE); + if (rv==0) { + DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" exists, trying to open", GWEN_Buffer_GetStart(nameBuf)); + rv=AQH_DataFile_Open(df); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error opening data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv); + GWEN_Buffer_free(nameBuf); + return NULL; + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Creating file \"%s\"", GWEN_Buffer_GetStart(nameBuf)); + rv=AQH_DataFile_Create(df); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Error creating data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv); + GWEN_Buffer_free(nameBuf); + return NULL; + } + } + GWEN_Buffer_free(nameBuf); + + return df; +} + + + +GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId) +{ + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + if (sto->dataFileFolder) { + GWEN_Buffer_AppendString(buf, sto->dataFileFolder); + GWEN_Buffer_AppendString(buf, GWEN_DIR_SEPARATOR_S); + } + GWEN_Buffer_AppendArgs(buf, "%" PRIu64 ".data", valueId); + return buf; +} + diff --git a/aqhome/data/storage.h b/aqhome/data/storage.h index 7679949..7122c20 100644 --- a/aqhome/data/storage.h +++ b/aqhome/data/storage.h @@ -72,6 +72,9 @@ AQHOME_API AQH_VALUE *AQH_Storage_GetValueByName(const AQH_STORAGE *sto, const c AQHOME_API const char *AQH_Storage_GetStateFile(const AQH_STORAGE *sto); AQHOME_API void AQH_Storage_SetStateFile(AQH_STORAGE *sto, const char *s); +AQHOME_API const char *AQH_Storage_GetDataFileFolder(const AQH_STORAGE *sto); +AQHOME_API void AQH_Storage_SetDataFileFolder(AQH_STORAGE *sto, const char *s); + AQHOME_API uint32_t AQH_Storage_GetRuntimeFlags(const AQH_STORAGE *sto); AQHOME_API void AQH_Storage_SetRuntimeFlags(AQH_STORAGE *sto, uint32_t flags); AQHOME_API void AQH_Storage_AddRuntimeFlags(AQH_STORAGE *sto, uint32_t flags); @@ -79,6 +82,8 @@ AQHOME_API void AQH_Storage_SubRuntimeFlags(AQH_STORAGE *sto, uint32_t flags); AQHOME_API int AQH_Storage_Init(AQH_STORAGE *sto); +AQHOME_API int AQH_Storage_Fini(AQH_STORAGE *sto); + AQHOME_API int AQH_Storage_WriteState(AQH_STORAGE *sto); diff --git a/aqhome/data/storage_p.h b/aqhome/data/storage_p.h index c5f0fed..0327dee 100644 --- a/aqhome/data/storage_p.h +++ b/aqhome/data/storage_p.h @@ -11,6 +11,7 @@ #include "aqhome/data/storage.h" +#include "aqhome/data/datafile.h" #define AQH_STORAGE_XML_ELEMENTNAME_LASTIDS "lastIds" @@ -41,6 +42,9 @@ struct AQH_STORAGE { uint64_t lastValueId; char *stateFile; + char *dataFileFolder; + + AQH_DATAFILE_LIST *dataFileList; uint32_t runtimeFlags; };