diff --git a/aqhome/data/0BUILD b/aqhome/data/0BUILD
index ed915d8..c7f5840 100644
--- a/aqhome/data/0BUILD
+++ b/aqhome/data/0BUILD
@@ -68,6 +68,7 @@
storage.h
+ datafile.h
@@ -75,6 +76,7 @@
storage_p.h
storage_readxml.h
storage_writexml.h
+ datafile_p.h
@@ -84,6 +86,7 @@
storage.c
storage_readxml.c
storage_writexml.c
+ datafile.c
diff --git a/aqhome/data/README b/aqhome/data/README
index 7cee608..a601e5e 100644
--- a/aqhome/data/README
+++ b/aqhome/data/README
@@ -77,3 +77,20 @@ Storage API:
+datafile:
+- valueId
+- filename
+- number of entries
+- fd
+- getNumOfEntries()
+- readRecord(id, &record)
+- addRecord(record)
+
+datarecord in file:
+- 8 bytes timestamp
+- 8 bytes value
+
+
+
+
+
diff --git a/aqhome/data/datafile.c b/aqhome/data/datafile.c
new file mode 100644
index 0000000..15b6ae3
--- /dev/null
+++ b/aqhome/data/datafile.c
@@ -0,0 +1,397 @@
+/****************************************************************************
+ * This file is part of the project Gwenhywfar.
+ * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
+ *
+ * The license for this file can be found in the file COPYING which you
+ * should have received along with this file.
+ ****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include
+#endif
+
+#include "./datafile_p.h"
+
+#include
+#include
+
+#include
+
+
+GWEN_LIST_FUNCTIONS(AQH_DATAFILE, AQH_DataFile);
+
+
+
+/* ------------------------------------------------------------------------------------------------
+ * forward declarations
+ * ------------------------------------------------------------------------------------------------
+ */
+
+
+static int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue);
+static int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value);
+
+
+
+/* ------------------------------------------------------------------------------------------------
+ * implementations
+ * ------------------------------------------------------------------------------------------------
+ */
+
+AQH_DATAFILE *AQH_DataFile_new(const char *fileName, uint64_t valueId)
+{
+ AQH_DATAFILE *df;
+
+ GWEN_NEW_OBJECT(AQH_DATAFILE, df);
+ GWEN_LIST_INIT(AQH_DATAFILE, df);
+ df->valueId=valueId;
+ df->fileName=fileName?strdup(fileName):NULL;
+
+ return df;
+}
+
+
+
+void AQH_DataFile_free(AQH_DATAFILE *df)
+{
+ if (df) {
+ GWEN_LIST_FINI(AQH_DATAFILE, df);
+ GWEN_SyncIo_free(df->sio);
+ free(df->fileName);
+ GWEN_FREE_OBJECT(df);
+ }
+}
+
+
+
+const char *AQH_DataFile_GetFileName(const AQH_DATAFILE *df)
+{
+ return df?df->fileName:NULL;
+}
+
+
+
+uint64_t AQH_DataFile_GetValueId(const AQH_DATAFILE *df)
+{
+ return df?df->valueId:0;
+}
+
+
+
+uint64_t AQH_DataFile_GetNumberOfEntries(const AQH_DATAFILE *df)
+{
+ return df?df->numberOfEntries:0;
+}
+
+
+
+
+uint32_t AQH_DataFile_GetRuntimeFlags(const AQH_DATAFILE *df)
+{
+ return df?df->runtimeFlags:0;
+}
+
+
+
+int AQH_DataFile_Create(AQH_DATAFILE *df)
+{
+ if (df) {
+ GWEN_SYNCIO *sio;
+ int rv;
+
+ if (df->sio) {
+ DBG_ERROR(AQH_LOGDOMAIN, "File already open");
+ return GWEN_ERROR_INVALID;
+ }
+
+ sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew);
+ rv=GWEN_SyncIo_Connect(sio);
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv);
+ GWEN_SyncIo_free(sio);
+ return rv;
+ }
+ rv=_writeRecord(sio, 0, 0, 0.0);
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error adding first record to file \"%s\" (%d)", df->fileName, rv);
+ GWEN_SyncIo_free(sio);
+ return rv;
+ }
+ df->sio=sio;
+ df->numberOfEntries=1; /* we just added the 0-record */
+
+ DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" created.", df->fileName);
+ return 0;
+ }
+ else {
+ DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
+ return GWEN_ERROR_GENERIC;
+ }
+}
+
+
+
+int AQH_DataFile_Open(AQH_DATAFILE *df)
+{
+ if (df) {
+ GWEN_SYNCIO *sio;
+ int rv;
+ int64_t len;
+
+ if (df->sio) {
+ DBG_ERROR(AQH_LOGDOMAIN, "File already open");
+ return GWEN_ERROR_INVALID;
+ }
+
+ sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew);
+ rv=GWEN_SyncIo_Connect(sio);
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv);
+ GWEN_SyncIo_free(sio);
+ return rv;
+ }
+
+ len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_End);
+ if (len<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error seeking to end of file \"%s\" (%d)", df->fileName, rv);
+ GWEN_SyncIo_free(sio);
+ return rv;
+ }
+ if (len % AQH_DATAFILE_RECORDSIZE) {
+ DBG_INFO(AQH_LOGDOMAIN, "Invalid file size of file \"%s\" ((not a multiple of current record size))", df->fileName);
+ GWEN_SyncIo_Disconnect(sio);
+ GWEN_SyncIo_free(sio);
+ unlink(df->fileName);
+ return GWEN_ERROR_BAD_DATA;
+ }
+ df->numberOfEntries=len/AQH_DATAFILE_RECORDSIZE;
+
+ len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_Set);
+ if (len<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error seeking to begin of file \"%s\" (%d)", df->fileName, rv);
+ GWEN_SyncIo_free(sio);
+ return rv;
+ }
+ df->sio=sio;
+ return 0;
+ }
+ else {
+ DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
+ return GWEN_ERROR_GENERIC;
+ }
+}
+
+
+
+int AQH_DataFile_Close(AQH_DATAFILE *df)
+{
+ if (df) {
+ int rv;
+
+ if (df->sio==NULL) {
+ DBG_ERROR(AQH_LOGDOMAIN, "File not open");
+ return GWEN_ERROR_INVALID;
+ }
+
+ rv=GWEN_SyncIo_Disconnect(df->sio);
+ if (rv<0) {
+ DBG_ERROR(AQH_LOGDOMAIN, "Error closing file \"%s\" (%d)", df->fileName, rv);
+ GWEN_SyncIo_free(df->sio);
+ df->sio=NULL;
+ df->numberOfEntries=0;
+ return rv;
+ }
+ GWEN_SyncIo_free(df->sio);
+ df->sio=NULL;
+ df->numberOfEntries=0;
+ return 0;
+ }
+ else {
+ DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
+ return GWEN_ERROR_GENERIC;
+ }
+}
+
+
+
+
+int AQH_DataFile_ReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue)
+{
+ if (df) {
+ int rv;
+
+ if (df->sio==NULL) {
+ DBG_ERROR(AQH_LOGDOMAIN, "File not open");
+ return GWEN_ERROR_INVALID;
+ }
+
+ rv=_readRecord(df->sio, idx, pTimestamp, pValue);;
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error writing to file \"%s\" (%d)", df->fileName, rv);
+ return rv;
+ }
+
+ return 0;
+ }
+ else {
+ DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
+ return GWEN_ERROR_GENERIC;
+ }
+}
+
+
+
+int AQH_DataFile_AppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value)
+{
+ if (df) {
+ int rv;
+ uint64_t filePos;
+
+ if (df->sio==NULL) {
+ DBG_ERROR(AQH_LOGDOMAIN, "File not open");
+ return GWEN_ERROR_INVALID;
+ }
+
+ filePos=(df->numberOfEntries)*AQH_DATAFILE_RECORDSIZE;
+ rv=_writeRecord(df->sio, filePos, timestamp, value);
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error writing to file \"%s\" (%d)", df->fileName, rv);
+ return rv;
+ }
+
+ df->numberOfEntries++;
+ DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" has now %lu entries", df->fileName, (unsigned long int) (df->numberOfEntries));
+ return 0;
+ }
+ else {
+ DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
+ return GWEN_ERROR_GENERIC;
+ }
+}
+
+
+
+AQH_DATAFILE *AQH_DataFile_List_GetByValueId(AQH_DATAFILE_LIST *fileList, uint64_t id)
+{
+ if (fileList) {
+ AQH_DATAFILE *df;
+
+ df=AQH_DataFile_List_First(fileList);
+ while(df) {
+ if (df->valueId==id)
+ break;
+ df=AQH_DataFile_List_Next(df);
+ }
+ }
+ return NULL;
+}
+
+
+
+int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue)
+{
+ uint8_t record[AQH_DATAFILE_RECORDSIZE];
+ union {double f; uint64_t i;} u;
+ uint64_t filePos;
+ int64_t retPos;
+ const uint8_t *ptr;
+ uint64_t v;
+ int rv;
+
+ filePos=idx*AQH_DATAFILE_RECORDSIZE;
+ retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set);
+ if (retPos<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos);
+ return (int) retPos;
+ }
+
+ rv=GWEN_SyncIo_ReadForced(sio, record, AQH_DATAFILE_RECORDSIZE);
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error reading at pos %lu (%d)", (unsigned long int) filePos, rv);
+ return rv;
+ }
+
+ ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP;
+ v=0;
+ v|=(uint64_t)(*(ptr++));
+ v|=(uint64_t)(*(ptr++))<<8;
+ v|=(uint64_t)(*(ptr++))<<16;
+ v|=(uint64_t)(*(ptr++))<<24;
+ v|=(uint64_t)(*(ptr++))<<32;
+ v|=(uint64_t)(*(ptr++))<<40;
+ v|=(uint64_t)(*(ptr++))<<48;
+ v|=(uint64_t)(*(ptr++))<<56;
+ *pTimestamp=v;
+
+ ptr=record+AQH_DATAFILE_OFFSET_VALUE;
+ v=0;
+ v|=(uint64_t)(*(ptr++));
+ v|=(uint64_t)(*(ptr++))<<8;
+ v|=(uint64_t)(*(ptr++))<<16;
+ v|=(uint64_t)(*(ptr++))<<24;
+ v|=(uint64_t)(*(ptr++))<<32;
+ v|=(uint64_t)(*(ptr++))<<40;
+ v|=(uint64_t)(*(ptr++))<<48;
+ v|=(uint64_t)(*(ptr++))<<56;
+ u.i=v;
+ *pValue=u.f;
+
+ return 0;
+}
+
+
+
+int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value)
+{
+ uint8_t record[AQH_DATAFILE_RECORDSIZE];
+ union {double f; uint64_t i;} u;
+ uint8_t *ptr;
+ int64_t retPos;
+ uint64_t v;
+ int rv;
+
+ ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP;
+ v=timestamp;
+ *(ptr++)=(v) & 0xff;
+ *(ptr++)=(v>>8) & 0xff;
+ *(ptr++)=(v>>16) & 0xff;
+ *(ptr++)=(v>>24) & 0xff;
+ *(ptr++)=(v>>32) & 0xff;
+ *(ptr++)=(v>>40) & 0xff;
+ *(ptr++)=(v>>48) & 0xff;
+ *(ptr++)=(v>>56) & 0xff;
+
+ u.f=value;
+ v=u.i;
+ ptr=record+AQH_DATAFILE_OFFSET_VALUE;
+ *(ptr++)=(v) & 0xff;
+ *(ptr++)=(v>>8) & 0xff;
+ *(ptr++)=(v>>16) & 0xff;
+ *(ptr++)=(v>>24) & 0xff;
+ *(ptr++)=(v>>32) & 0xff;
+ *(ptr++)=(v>>40) & 0xff;
+ *(ptr++)=(v>>48) & 0xff;
+ *(ptr++)=(v>>56) & 0xff;
+
+ retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set);
+ if (retPos<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos);
+ return (int) retPos;
+ }
+ rv=GWEN_SyncIo_WriteForced(sio, record, AQH_DATAFILE_RECORDSIZE);
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error writing to file (%d)", rv);
+ return rv;
+ }
+ return 0;
+}
+
+
+
+
+
+
+
+
+
+
diff --git a/aqhome/data/datafile.h b/aqhome/data/datafile.h
new file mode 100644
index 0000000..3a1b589
--- /dev/null
+++ b/aqhome/data/datafile.h
@@ -0,0 +1,44 @@
+/****************************************************************************
+ * This file is part of the project Gwenhywfar.
+ * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
+ *
+ * The license for this file can be found in the file COPYING which you
+ * should have received along with this file.
+ ****************************************************************************/
+
+#ifndef AQH_DATAFILE_H
+#define AQH_DATAFILE_H
+
+
+#include "aqhome/data/storage.h"
+
+#include
+
+
+typedef struct AQH_DATAFILE AQH_DATAFILE;
+GWEN_LIST_FUNCTION_LIB_DEFS(AQH_DATAFILE, AQH_DataFile, AQHOME_API)
+
+
+AQHOME_API AQH_DATAFILE *AQH_DataFile_new(const char *fileName, uint64_t valueId);
+AQHOME_API void AQH_DataFile_free(AQH_DATAFILE *df);
+
+AQHOME_API int AQH_DataFile_Create(AQH_DATAFILE *df);
+AQHOME_API int AQH_DataFile_Open(AQH_DATAFILE *df);
+AQHOME_API int AQH_DataFile_Close(AQH_DATAFILE *df);
+
+AQHOME_API const char *AQH_DataFile_GetFileName(const AQH_DATAFILE *df);
+AQHOME_API uint64_t AQH_DataFile_GetValueId(const AQH_DATAFILE *df);
+
+AQHOME_API uint64_t AQH_DataFile_GetNumberOfEntries(const AQH_DATAFILE *df);
+
+AQHOME_API uint32_t AQH_DataFile_GetRuntimeFlags(const AQH_DATAFILE *df);
+
+AQHOME_API int AQH_DataFile_ReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue);
+AQHOME_API int AQH_DataFile_AppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value);
+
+AQHOME_API AQH_DATAFILE *AQH_DataFile_List_GetByValueId(AQH_DATAFILE_LIST *fileList, uint64_t id);
+
+
+#endif
+
+
diff --git a/aqhome/data/datafile_p.h b/aqhome/data/datafile_p.h
new file mode 100644
index 0000000..bb71eba
--- /dev/null
+++ b/aqhome/data/datafile_p.h
@@ -0,0 +1,39 @@
+/****************************************************************************
+ * This file is part of the project Gwenhywfar.
+ * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
+ *
+ * The license for this file can be found in the file COPYING which you
+ * should have received along with this file.
+ ****************************************************************************/
+
+#ifndef AQH_DATAFILE_P_H
+#define AQH_DATAFILE_P_H
+
+
+#include "./datafile.h"
+
+#include
+
+
+#define AQH_DATAFILE_RECORDSIZE 16
+#define AQH_DATAFILE_OFFSET_TIMESTAMP 0
+#define AQH_DATAFILE_OFFSET_VALUE 8
+
+
+struct AQH_DATAFILE {
+ GWEN_LIST_ELEMENT(AQH_DATAFILE);
+
+ char *fileName;
+ uint64_t valueId;
+ GWEN_SYNCIO *sio;
+ uint64_t numberOfEntries;
+ uint32_t runtimeFlags;
+};
+
+
+
+
+
+#endif
+
+
diff --git a/aqhome/data/datapoint.t2d b/aqhome/data/datapoint.t2d
index 31233a7..00de5b7 100644
--- a/aqhome/data/datapoint.t2d
+++ b/aqhome/data/datapoint.t2d
@@ -34,7 +34,7 @@
public
-
+
0
0
@@ -48,13 +48,11 @@
0
-
+
public
own with_sortbymember
- const dup
- const
- NULL
- NULL
+ 0.0
+ 0.0
diff --git a/aqhome/data/storage.c b/aqhome/data/storage.c
index d6dae45..1aa9a59 100644
--- a/aqhome/data/storage.c
+++ b/aqhome/data/storage.c
@@ -17,6 +17,8 @@
#include
#include
#include
+#include
+#include
@@ -26,6 +28,17 @@
*/
+static void _handleJsonTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue);
+static void _handleNumTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue);
+static void _handleValueForJsonElement(AQH_STORAGE *sto,
+ const AQH_MQTT_TOPIC *topic,
+ GWEN_JSON_ELEM *json,
+ time_t timestamp,
+ const AQH_VALUE *value);
+static AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId);
+static AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId);
+static AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId);
+static GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId);
@@ -44,6 +57,7 @@ AQH_STORAGE *AQH_Storage_new(void)
sto->deviceList=AQH_Device_List_new();
sto->mqttTopicList=AQH_MqttTopic_List_new();
sto->valueList=AQH_Value_List_new();
+ sto->dataFileList=AQH_DataFile_List_new();
return sto;
}
@@ -53,10 +67,12 @@ AQH_STORAGE *AQH_Storage_new(void)
void AQH_Storage_free(AQH_STORAGE *sto)
{
if (sto) {
+ AQH_DataFile_List_free(sto->dataFileList);
AQH_Value_List_free(sto->valueList);
AQH_MqttTopic_List_free(sto->mqttTopicList);
AQH_Device_List_free(sto->deviceList);
AQH_Room_List_free(sto->roomList);
+ free(sto->dataFileFolder);
free(sto->stateFile);
GWEN_FREE_OBJECT(sto);
@@ -249,13 +265,6 @@ void AQH_Storage_SubRuntimeFlags(AQH_STORAGE *sto, uint32_t flags)
-void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *topic, const char *value)
-{
- /* TODO */
-}
-
-
-
int AQH_Storage_Init(AQH_STORAGE *sto)
{
int rv;
@@ -281,6 +290,31 @@ int AQH_Storage_Init(AQH_STORAGE *sto)
+int AQH_Storage_Fini(AQH_STORAGE *sto)
+{
+ int errors=0;
+
+ if (sto) {
+ AQH_DATAFILE *df;
+
+ while( (df=AQH_DataFile_List_First(sto->dataFileList)) ) {
+ int rv;
+
+ AQH_DataFile_List_Del(df);
+ rv=AQH_DataFile_Close(df);
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error closing file \"%s\"", AQH_DataFile_GetFileName(df));
+ errors++;
+ }
+ AQH_DataFile_free(df);
+ }
+ }
+
+ return (errors==0)?0:GWEN_ERROR_GENERIC;
+}
+
+
+
int AQH_Storage_WriteState(AQH_STORAGE *sto)
{
int rv;
@@ -297,5 +331,234 @@ int AQH_Storage_WriteState(AQH_STORAGE *sto)
+const char *AQH_Storage_GetDataFileFolder(const AQH_STORAGE *sto)
+{
+ return sto?sto->dataFileFolder:NULL;
+}
+
+
+
+void AQH_Storage_SetDataFileFolder(AQH_STORAGE *sto, const char *s)
+{
+ if (sto) {
+ free(sto->dataFileFolder);
+ sto->dataFileFolder=s?strdup(s):NULL;
+ }
+}
+
+
+
+
+void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *sTopic, const char *sValue)
+{
+ if (sto) {
+ const AQH_MQTT_TOPIC *topic;
+
+ topic=AQH_Storage_GetMqttTopicByTopic(sto, sTopic);
+ if (topic) {
+ if (AQH_MqttTopic_GetDataType(topic)==AQH_MqttTopicType_Json)
+ _handleJsonTopic(sto, topic, sValue);
+ else
+ _handleNumTopic(sto, topic, sValue);
+ }
+ else {
+ DBG_INFO(AQH_LOGDOMAIN, "Unknown MQTT topic \"%s\"", sTopic);
+ }
+ }
+}
+
+
+
+void _handleJsonTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue)
+{
+ time_t now;
+ GWEN_JSON_ELEM *json;
+
+ now=time(NULL);
+ json=GWEN_JsonElement_fromString(sValue);
+ if (json) {
+ AQH_VALUE *value;
+ uint64_t topicId;
+
+ topicId=AQH_MqttTopic_GetId(topic);
+ value=AQH_Value_List_First(sto->valueList);
+ while(value) {
+ if (AQH_Value_GetTopicId(value)==topicId) {
+ _handleValueForJsonElement(sto, topic, json, now, value);
+ }
+ value=AQH_Value_List_Next(value);
+ }
+ }
+ else {
+ DBG_ERROR(AQH_LOGDOMAIN, "Error parsing JSON data [%s]", sValue);
+ }
+}
+
+
+
+void _handleValueForJsonElement(AQH_STORAGE *sto,
+ const AQH_MQTT_TOPIC *topic,
+ GWEN_JSON_ELEM *json,
+ time_t timestamp,
+ const AQH_VALUE *value)
+{
+ const char *dataPath;
+
+ dataPath=AQH_Value_GetDataPath(value);
+ if (dataPath) {
+ GWEN_JSON_ELEM *jsonElem;
+
+ jsonElem=GWEN_JsonElement_GetElementByPath(json, dataPath, GWEN_PATH_FLAGS_PATHMUSTEXIST);
+ if (jsonElem) {
+ const char *s;
+
+ s=GWEN_JsonElement_GetData(jsonElem);
+ if (s && *s) {
+ int rv;
+ double v;
+
+ rv=GWEN_Text_StringToDouble(s, &v);
+ if (rv<0) {
+ DBG_ERROR(AQH_LOGDOMAIN, "Invalid value \"%s\" for topic %s:%s", s, AQH_MqttTopic_GetTopic(topic), dataPath);
+ }
+ else {
+ AQH_DATAFILE *df;
+
+ df=_getDataFileByValueId(sto, AQH_Value_GetId(value));
+ if (df) {
+ DBG_INFO(AQH_LOGDOMAIN, "Appending record to datafile");
+ AQH_DataFile_AppendRecord(df, timestamp, v);
+ }
+ }
+ }
+ else {
+ DBG_INFO(AQH_LOGDOMAIN, "Empty JSON element \"%s\"", dataPath);
+ }
+ }
+ else {
+ DBG_ERROR(AQH_LOGDOMAIN, "No value for topic %s:%s", AQH_MqttTopic_GetTopic(topic), dataPath);
+ }
+ }
+ else {
+ DBG_INFO(AQH_LOGDOMAIN, "No datapath in value \"%s\"", AQH_Value_GetName(value));
+ }
+}
+
+
+
+void _handleNumTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue)
+{
+ int rv;
+ double v;
+ time_t now;
+
+ now=time(NULL);
+
+ rv=GWEN_Text_StringToDouble(sValue, &v);
+ if (rv<0) {
+ DBG_ERROR(AQH_LOGDOMAIN, "Invalid value \"%s\" for topic %s, ignoring", sValue, AQH_MqttTopic_GetTopic(topic));
+ }
+ else {
+ AQH_VALUE *value;
+ uint64_t topicId;
+
+ topicId=AQH_MqttTopic_GetId(topic);
+ value=AQH_Value_List_First(sto->valueList);
+ while(value) {
+ if (AQH_Value_GetTopicId(value)==topicId)
+ break;
+ value=AQH_Value_List_Next(value);
+ }
+ if (value) {
+ AQH_DATAFILE *df;
+
+ df=_getDataFileByValueId(sto, AQH_Value_GetId(value));
+ if (df) {
+ DBG_INFO(AQH_LOGDOMAIN, "Appending record to datafile");
+ AQH_DataFile_AppendRecord(df, now, v);
+ }
+ }
+ }
+}
+
+
+
+
+AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId)
+{
+ AQH_DATAFILE *df;
+
+ df=_findDataFileByValueId(sto, valueId);
+ if (df==NULL) {
+ df=_openOrCreateDataFileByValueId(sto, valueId);
+ AQH_DataFile_List_Add(df, sto->dataFileList);
+ }
+
+ return df;
+}
+
+
+
+AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId)
+{
+ if (sto && sto->dataFileList)
+ return AQH_DataFile_List_GetByValueId(sto->dataFileList, valueId);
+ return NULL;
+}
+
+
+
+AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId)
+{
+ AQH_DATAFILE *df;
+ GWEN_BUFFER *nameBuf;
+ int rv;
+
+ nameBuf=_getDataFilePathForValueId(sto, valueId);
+ df=AQH_DataFile_new(GWEN_Buffer_GetStart(nameBuf), valueId);
+
+ rv=GWEN_Directory_GetPath(GWEN_Buffer_GetStart(nameBuf),
+ GWEN_PATH_FLAGS_CHECKROOT |
+ GWEN_PATH_FLAGS_PATHMUSTEXIST |
+ GWEN_PATH_FLAGS_NAMEMUSTEXIST |
+ GWEN_PATH_FLAGS_VARIABLE);
+ if (rv==0) {
+ DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" exists, trying to open", GWEN_Buffer_GetStart(nameBuf));
+ rv=AQH_DataFile_Open(df);
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error opening data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv);
+ GWEN_Buffer_free(nameBuf);
+ return NULL;
+ }
+ }
+ else {
+ DBG_INFO(AQH_LOGDOMAIN, "Creating file \"%s\"", GWEN_Buffer_GetStart(nameBuf));
+ rv=AQH_DataFile_Create(df);
+ if (rv<0) {
+ DBG_INFO(AQH_LOGDOMAIN, "Error creating data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv);
+ GWEN_Buffer_free(nameBuf);
+ return NULL;
+ }
+ }
+ GWEN_Buffer_free(nameBuf);
+
+ return df;
+}
+
+
+
+GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId)
+{
+ GWEN_BUFFER *buf;
+
+ buf=GWEN_Buffer_new(0, 256, 0, 1);
+ if (sto->dataFileFolder) {
+ GWEN_Buffer_AppendString(buf, sto->dataFileFolder);
+ GWEN_Buffer_AppendString(buf, GWEN_DIR_SEPARATOR_S);
+ }
+ GWEN_Buffer_AppendArgs(buf, "%" PRIu64 ".data", valueId);
+ return buf;
+}
+
diff --git a/aqhome/data/storage.h b/aqhome/data/storage.h
index 7679949..7122c20 100644
--- a/aqhome/data/storage.h
+++ b/aqhome/data/storage.h
@@ -72,6 +72,9 @@ AQHOME_API AQH_VALUE *AQH_Storage_GetValueByName(const AQH_STORAGE *sto, const c
AQHOME_API const char *AQH_Storage_GetStateFile(const AQH_STORAGE *sto);
AQHOME_API void AQH_Storage_SetStateFile(AQH_STORAGE *sto, const char *s);
+AQHOME_API const char *AQH_Storage_GetDataFileFolder(const AQH_STORAGE *sto);
+AQHOME_API void AQH_Storage_SetDataFileFolder(AQH_STORAGE *sto, const char *s);
+
AQHOME_API uint32_t AQH_Storage_GetRuntimeFlags(const AQH_STORAGE *sto);
AQHOME_API void AQH_Storage_SetRuntimeFlags(AQH_STORAGE *sto, uint32_t flags);
AQHOME_API void AQH_Storage_AddRuntimeFlags(AQH_STORAGE *sto, uint32_t flags);
@@ -79,6 +82,8 @@ AQHOME_API void AQH_Storage_SubRuntimeFlags(AQH_STORAGE *sto, uint32_t flags);
AQHOME_API int AQH_Storage_Init(AQH_STORAGE *sto);
+AQHOME_API int AQH_Storage_Fini(AQH_STORAGE *sto);
+
AQHOME_API int AQH_Storage_WriteState(AQH_STORAGE *sto);
diff --git a/aqhome/data/storage_p.h b/aqhome/data/storage_p.h
index c5f0fed..0327dee 100644
--- a/aqhome/data/storage_p.h
+++ b/aqhome/data/storage_p.h
@@ -11,6 +11,7 @@
#include "aqhome/data/storage.h"
+#include "aqhome/data/datafile.h"
#define AQH_STORAGE_XML_ELEMENTNAME_LASTIDS "lastIds"
@@ -41,6 +42,9 @@ struct AQH_STORAGE {
uint64_t lastValueId;
char *stateFile;
+ char *dataFileFolder;
+
+ AQH_DATAFILE_LIST *dataFileList;
uint32_t runtimeFlags;
};