Added datafile and handling of MQTT publish message.

This commit is contained in:
Martin Preuss
2023-08-12 02:06:54 +02:00
parent edcac1f2b9
commit f5878f43ff
9 changed files with 783 additions and 13 deletions

View File

@@ -68,6 +68,7 @@
<headers dist="true" install="$(pkgincludedir)/data" > <headers dist="true" install="$(pkgincludedir)/data" >
storage.h storage.h
datafile.h
</headers> </headers>
@@ -75,6 +76,7 @@
storage_p.h storage_p.h
storage_readxml.h storage_readxml.h
storage_writexml.h storage_writexml.h
datafile_p.h
</headers> </headers>
@@ -84,6 +86,7 @@
storage.c storage.c
storage_readxml.c storage_readxml.c
storage_writexml.c storage_writexml.c
datafile.c
</sources> </sources>

View File

@@ -77,3 +77,20 @@ Storage API:
datafile:
- valueId
- filename
- number of entries
- fd
- getNumOfEntries()
- readRecord(id, &record)
- addRecord(record)
datarecord in file:
- 8 bytes timestamp
- 8 bytes value

397
aqhome/data/datafile.c Normal file
View File

@@ -0,0 +1,397 @@
/****************************************************************************
* This file is part of the project Gwenhywfar.
* Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./datafile_p.h"
#include <gwenhywfar/debug.h>
#include <gwenhywfar/syncio_file.h>
#include <unistd.h>
GWEN_LIST_FUNCTIONS(AQH_DATAFILE, AQH_DataFile);
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue);
static int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
AQH_DATAFILE *AQH_DataFile_new(const char *fileName, uint64_t valueId)
{
AQH_DATAFILE *df;
GWEN_NEW_OBJECT(AQH_DATAFILE, df);
GWEN_LIST_INIT(AQH_DATAFILE, df);
df->valueId=valueId;
df->fileName=fileName?strdup(fileName):NULL;
return df;
}
void AQH_DataFile_free(AQH_DATAFILE *df)
{
if (df) {
GWEN_LIST_FINI(AQH_DATAFILE, df);
GWEN_SyncIo_free(df->sio);
free(df->fileName);
GWEN_FREE_OBJECT(df);
}
}
const char *AQH_DataFile_GetFileName(const AQH_DATAFILE *df)
{
return df?df->fileName:NULL;
}
uint64_t AQH_DataFile_GetValueId(const AQH_DATAFILE *df)
{
return df?df->valueId:0;
}
uint64_t AQH_DataFile_GetNumberOfEntries(const AQH_DATAFILE *df)
{
return df?df->numberOfEntries:0;
}
uint32_t AQH_DataFile_GetRuntimeFlags(const AQH_DATAFILE *df)
{
return df?df->runtimeFlags:0;
}
int AQH_DataFile_Create(AQH_DATAFILE *df)
{
if (df) {
GWEN_SYNCIO *sio;
int rv;
if (df->sio) {
DBG_ERROR(AQH_LOGDOMAIN, "File already open");
return GWEN_ERROR_INVALID;
}
sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew);
rv=GWEN_SyncIo_Connect(sio);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv);
GWEN_SyncIo_free(sio);
return rv;
}
rv=_writeRecord(sio, 0, 0, 0.0);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error adding first record to file \"%s\" (%d)", df->fileName, rv);
GWEN_SyncIo_free(sio);
return rv;
}
df->sio=sio;
df->numberOfEntries=1; /* we just added the 0-record */
DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" created.", df->fileName);
return 0;
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
return GWEN_ERROR_GENERIC;
}
}
int AQH_DataFile_Open(AQH_DATAFILE *df)
{
if (df) {
GWEN_SYNCIO *sio;
int rv;
int64_t len;
if (df->sio) {
DBG_ERROR(AQH_LOGDOMAIN, "File already open");
return GWEN_ERROR_INVALID;
}
sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew);
rv=GWEN_SyncIo_Connect(sio);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv);
GWEN_SyncIo_free(sio);
return rv;
}
len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_End);
if (len<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error seeking to end of file \"%s\" (%d)", df->fileName, rv);
GWEN_SyncIo_free(sio);
return rv;
}
if (len % AQH_DATAFILE_RECORDSIZE) {
DBG_INFO(AQH_LOGDOMAIN, "Invalid file size of file \"%s\" ((not a multiple of current record size))", df->fileName);
GWEN_SyncIo_Disconnect(sio);
GWEN_SyncIo_free(sio);
unlink(df->fileName);
return GWEN_ERROR_BAD_DATA;
}
df->numberOfEntries=len/AQH_DATAFILE_RECORDSIZE;
len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_Set);
if (len<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error seeking to begin of file \"%s\" (%d)", df->fileName, rv);
GWEN_SyncIo_free(sio);
return rv;
}
df->sio=sio;
return 0;
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
return GWEN_ERROR_GENERIC;
}
}
int AQH_DataFile_Close(AQH_DATAFILE *df)
{
if (df) {
int rv;
if (df->sio==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "File not open");
return GWEN_ERROR_INVALID;
}
rv=GWEN_SyncIo_Disconnect(df->sio);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Error closing file \"%s\" (%d)", df->fileName, rv);
GWEN_SyncIo_free(df->sio);
df->sio=NULL;
df->numberOfEntries=0;
return rv;
}
GWEN_SyncIo_free(df->sio);
df->sio=NULL;
df->numberOfEntries=0;
return 0;
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
return GWEN_ERROR_GENERIC;
}
}
int AQH_DataFile_ReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue)
{
if (df) {
int rv;
if (df->sio==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "File not open");
return GWEN_ERROR_INVALID;
}
rv=_readRecord(df->sio, idx, pTimestamp, pValue);;
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error writing to file \"%s\" (%d)", df->fileName, rv);
return rv;
}
return 0;
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
return GWEN_ERROR_GENERIC;
}
}
int AQH_DataFile_AppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value)
{
if (df) {
int rv;
uint64_t filePos;
if (df->sio==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "File not open");
return GWEN_ERROR_INVALID;
}
filePos=(df->numberOfEntries)*AQH_DATAFILE_RECORDSIZE;
rv=_writeRecord(df->sio, filePos, timestamp, value);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error writing to file \"%s\" (%d)", df->fileName, rv);
return rv;
}
df->numberOfEntries++;
DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" has now %lu entries", df->fileName, (unsigned long int) (df->numberOfEntries));
return 0;
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
return GWEN_ERROR_GENERIC;
}
}
AQH_DATAFILE *AQH_DataFile_List_GetByValueId(AQH_DATAFILE_LIST *fileList, uint64_t id)
{
if (fileList) {
AQH_DATAFILE *df;
df=AQH_DataFile_List_First(fileList);
while(df) {
if (df->valueId==id)
break;
df=AQH_DataFile_List_Next(df);
}
}
return NULL;
}
int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue)
{
uint8_t record[AQH_DATAFILE_RECORDSIZE];
union {double f; uint64_t i;} u;
uint64_t filePos;
int64_t retPos;
const uint8_t *ptr;
uint64_t v;
int rv;
filePos=idx*AQH_DATAFILE_RECORDSIZE;
retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set);
if (retPos<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos);
return (int) retPos;
}
rv=GWEN_SyncIo_ReadForced(sio, record, AQH_DATAFILE_RECORDSIZE);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error reading at pos %lu (%d)", (unsigned long int) filePos, rv);
return rv;
}
ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP;
v=0;
v|=(uint64_t)(*(ptr++));
v|=(uint64_t)(*(ptr++))<<8;
v|=(uint64_t)(*(ptr++))<<16;
v|=(uint64_t)(*(ptr++))<<24;
v|=(uint64_t)(*(ptr++))<<32;
v|=(uint64_t)(*(ptr++))<<40;
v|=(uint64_t)(*(ptr++))<<48;
v|=(uint64_t)(*(ptr++))<<56;
*pTimestamp=v;
ptr=record+AQH_DATAFILE_OFFSET_VALUE;
v=0;
v|=(uint64_t)(*(ptr++));
v|=(uint64_t)(*(ptr++))<<8;
v|=(uint64_t)(*(ptr++))<<16;
v|=(uint64_t)(*(ptr++))<<24;
v|=(uint64_t)(*(ptr++))<<32;
v|=(uint64_t)(*(ptr++))<<40;
v|=(uint64_t)(*(ptr++))<<48;
v|=(uint64_t)(*(ptr++))<<56;
u.i=v;
*pValue=u.f;
return 0;
}
int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value)
{
uint8_t record[AQH_DATAFILE_RECORDSIZE];
union {double f; uint64_t i;} u;
uint8_t *ptr;
int64_t retPos;
uint64_t v;
int rv;
ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP;
v=timestamp;
*(ptr++)=(v) & 0xff;
*(ptr++)=(v>>8) & 0xff;
*(ptr++)=(v>>16) & 0xff;
*(ptr++)=(v>>24) & 0xff;
*(ptr++)=(v>>32) & 0xff;
*(ptr++)=(v>>40) & 0xff;
*(ptr++)=(v>>48) & 0xff;
*(ptr++)=(v>>56) & 0xff;
u.f=value;
v=u.i;
ptr=record+AQH_DATAFILE_OFFSET_VALUE;
*(ptr++)=(v) & 0xff;
*(ptr++)=(v>>8) & 0xff;
*(ptr++)=(v>>16) & 0xff;
*(ptr++)=(v>>24) & 0xff;
*(ptr++)=(v>>32) & 0xff;
*(ptr++)=(v>>40) & 0xff;
*(ptr++)=(v>>48) & 0xff;
*(ptr++)=(v>>56) & 0xff;
retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set);
if (retPos<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos);
return (int) retPos;
}
rv=GWEN_SyncIo_WriteForced(sio, record, AQH_DATAFILE_RECORDSIZE);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error writing to file (%d)", rv);
return rv;
}
return 0;
}

44
aqhome/data/datafile.h Normal file
View File

@@ -0,0 +1,44 @@
/****************************************************************************
* This file is part of the project Gwenhywfar.
* Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_DATAFILE_H
#define AQH_DATAFILE_H
#include "aqhome/data/storage.h"
#include <gwenhywfar/list.h>
typedef struct AQH_DATAFILE AQH_DATAFILE;
GWEN_LIST_FUNCTION_LIB_DEFS(AQH_DATAFILE, AQH_DataFile, AQHOME_API)
AQHOME_API AQH_DATAFILE *AQH_DataFile_new(const char *fileName, uint64_t valueId);
AQHOME_API void AQH_DataFile_free(AQH_DATAFILE *df);
AQHOME_API int AQH_DataFile_Create(AQH_DATAFILE *df);
AQHOME_API int AQH_DataFile_Open(AQH_DATAFILE *df);
AQHOME_API int AQH_DataFile_Close(AQH_DATAFILE *df);
AQHOME_API const char *AQH_DataFile_GetFileName(const AQH_DATAFILE *df);
AQHOME_API uint64_t AQH_DataFile_GetValueId(const AQH_DATAFILE *df);
AQHOME_API uint64_t AQH_DataFile_GetNumberOfEntries(const AQH_DATAFILE *df);
AQHOME_API uint32_t AQH_DataFile_GetRuntimeFlags(const AQH_DATAFILE *df);
AQHOME_API int AQH_DataFile_ReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue);
AQHOME_API int AQH_DataFile_AppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value);
AQHOME_API AQH_DATAFILE *AQH_DataFile_List_GetByValueId(AQH_DATAFILE_LIST *fileList, uint64_t id);
#endif

39
aqhome/data/datafile_p.h Normal file
View File

@@ -0,0 +1,39 @@
/****************************************************************************
* This file is part of the project Gwenhywfar.
* Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_DATAFILE_P_H
#define AQH_DATAFILE_P_H
#include "./datafile.h"
#include <gwenhywfar/syncio.h>
#define AQH_DATAFILE_RECORDSIZE 16
#define AQH_DATAFILE_OFFSET_TIMESTAMP 0
#define AQH_DATAFILE_OFFSET_VALUE 8
struct AQH_DATAFILE {
GWEN_LIST_ELEMENT(AQH_DATAFILE);
char *fileName;
uint64_t valueId;
GWEN_SYNCIO *sio;
uint64_t numberOfEntries;
uint32_t runtimeFlags;
};
#endif

View File

@@ -34,7 +34,7 @@
<access>public</access> <access>public</access>
</member> </member>
<member name="valueId" type="int" maxlen="8"> <member name="valueId" type="uint64_t" maxlen="8">
<default>0</default> <default>0</default>
<preset>0</preset> <preset>0</preset>
<flags></flags> <flags></flags>
@@ -48,13 +48,11 @@
<preset>0</preset> <preset>0</preset>
</member> </member>
<member name="value" type="AQDB_VALUE" maxlen="256" > <member name="value" type="double" maxlen="8" >
<access>public</access> <access>public</access>
<flags>own with_sortbymember</flags> <flags>own with_sortbymember</flags>
<setflags>const dup</setflags> <default>0.0</default>
<getflags>const</getflags> <preset>0.0</preset>
<default>NULL</default>
<preset>NULL</preset>
</member> </member>

View File

@@ -17,6 +17,8 @@
#include <gwenhywfar/debug.h> #include <gwenhywfar/debug.h>
#include <gwenhywfar/xml.h> #include <gwenhywfar/xml.h>
#include <gwenhywfar/directory.h> #include <gwenhywfar/directory.h>
#include <gwenhywfar/text.h>
#include <gwenhywfar/json_read.h>
@@ -26,6 +28,17 @@
*/ */
static void _handleJsonTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue);
static void _handleNumTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue);
static void _handleValueForJsonElement(AQH_STORAGE *sto,
const AQH_MQTT_TOPIC *topic,
GWEN_JSON_ELEM *json,
time_t timestamp,
const AQH_VALUE *value);
static AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId);
static AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId);
static AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId);
static GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId);
@@ -44,6 +57,7 @@ AQH_STORAGE *AQH_Storage_new(void)
sto->deviceList=AQH_Device_List_new(); sto->deviceList=AQH_Device_List_new();
sto->mqttTopicList=AQH_MqttTopic_List_new(); sto->mqttTopicList=AQH_MqttTopic_List_new();
sto->valueList=AQH_Value_List_new(); sto->valueList=AQH_Value_List_new();
sto->dataFileList=AQH_DataFile_List_new();
return sto; return sto;
} }
@@ -53,10 +67,12 @@ AQH_STORAGE *AQH_Storage_new(void)
void AQH_Storage_free(AQH_STORAGE *sto) void AQH_Storage_free(AQH_STORAGE *sto)
{ {
if (sto) { if (sto) {
AQH_DataFile_List_free(sto->dataFileList);
AQH_Value_List_free(sto->valueList); AQH_Value_List_free(sto->valueList);
AQH_MqttTopic_List_free(sto->mqttTopicList); AQH_MqttTopic_List_free(sto->mqttTopicList);
AQH_Device_List_free(sto->deviceList); AQH_Device_List_free(sto->deviceList);
AQH_Room_List_free(sto->roomList); AQH_Room_List_free(sto->roomList);
free(sto->dataFileFolder);
free(sto->stateFile); free(sto->stateFile);
GWEN_FREE_OBJECT(sto); GWEN_FREE_OBJECT(sto);
@@ -249,13 +265,6 @@ void AQH_Storage_SubRuntimeFlags(AQH_STORAGE *sto, uint32_t flags)
void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *topic, const char *value)
{
/* TODO */
}
int AQH_Storage_Init(AQH_STORAGE *sto) int AQH_Storage_Init(AQH_STORAGE *sto)
{ {
int rv; int rv;
@@ -281,6 +290,31 @@ int AQH_Storage_Init(AQH_STORAGE *sto)
int AQH_Storage_Fini(AQH_STORAGE *sto)
{
int errors=0;
if (sto) {
AQH_DATAFILE *df;
while( (df=AQH_DataFile_List_First(sto->dataFileList)) ) {
int rv;
AQH_DataFile_List_Del(df);
rv=AQH_DataFile_Close(df);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error closing file \"%s\"", AQH_DataFile_GetFileName(df));
errors++;
}
AQH_DataFile_free(df);
}
}
return (errors==0)?0:GWEN_ERROR_GENERIC;
}
int AQH_Storage_WriteState(AQH_STORAGE *sto) int AQH_Storage_WriteState(AQH_STORAGE *sto)
{ {
int rv; int rv;
@@ -297,5 +331,234 @@ int AQH_Storage_WriteState(AQH_STORAGE *sto)
const char *AQH_Storage_GetDataFileFolder(const AQH_STORAGE *sto)
{
return sto?sto->dataFileFolder:NULL;
}
void AQH_Storage_SetDataFileFolder(AQH_STORAGE *sto, const char *s)
{
if (sto) {
free(sto->dataFileFolder);
sto->dataFileFolder=s?strdup(s):NULL;
}
}
void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *sTopic, const char *sValue)
{
if (sto) {
const AQH_MQTT_TOPIC *topic;
topic=AQH_Storage_GetMqttTopicByTopic(sto, sTopic);
if (topic) {
if (AQH_MqttTopic_GetDataType(topic)==AQH_MqttTopicType_Json)
_handleJsonTopic(sto, topic, sValue);
else
_handleNumTopic(sto, topic, sValue);
}
else {
DBG_INFO(AQH_LOGDOMAIN, "Unknown MQTT topic \"%s\"", sTopic);
}
}
}
void _handleJsonTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue)
{
time_t now;
GWEN_JSON_ELEM *json;
now=time(NULL);
json=GWEN_JsonElement_fromString(sValue);
if (json) {
AQH_VALUE *value;
uint64_t topicId;
topicId=AQH_MqttTopic_GetId(topic);
value=AQH_Value_List_First(sto->valueList);
while(value) {
if (AQH_Value_GetTopicId(value)==topicId) {
_handleValueForJsonElement(sto, topic, json, now, value);
}
value=AQH_Value_List_Next(value);
}
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Error parsing JSON data [%s]", sValue);
}
}
void _handleValueForJsonElement(AQH_STORAGE *sto,
const AQH_MQTT_TOPIC *topic,
GWEN_JSON_ELEM *json,
time_t timestamp,
const AQH_VALUE *value)
{
const char *dataPath;
dataPath=AQH_Value_GetDataPath(value);
if (dataPath) {
GWEN_JSON_ELEM *jsonElem;
jsonElem=GWEN_JsonElement_GetElementByPath(json, dataPath, GWEN_PATH_FLAGS_PATHMUSTEXIST);
if (jsonElem) {
const char *s;
s=GWEN_JsonElement_GetData(jsonElem);
if (s && *s) {
int rv;
double v;
rv=GWEN_Text_StringToDouble(s, &v);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Invalid value \"%s\" for topic %s:%s", s, AQH_MqttTopic_GetTopic(topic), dataPath);
}
else {
AQH_DATAFILE *df;
df=_getDataFileByValueId(sto, AQH_Value_GetId(value));
if (df) {
DBG_INFO(AQH_LOGDOMAIN, "Appending record to datafile");
AQH_DataFile_AppendRecord(df, timestamp, v);
}
}
}
else {
DBG_INFO(AQH_LOGDOMAIN, "Empty JSON element \"%s\"", dataPath);
}
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "No value for topic %s:%s", AQH_MqttTopic_GetTopic(topic), dataPath);
}
}
else {
DBG_INFO(AQH_LOGDOMAIN, "No datapath in value \"%s\"", AQH_Value_GetName(value));
}
}
void _handleNumTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue)
{
int rv;
double v;
time_t now;
now=time(NULL);
rv=GWEN_Text_StringToDouble(sValue, &v);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Invalid value \"%s\" for topic %s, ignoring", sValue, AQH_MqttTopic_GetTopic(topic));
}
else {
AQH_VALUE *value;
uint64_t topicId;
topicId=AQH_MqttTopic_GetId(topic);
value=AQH_Value_List_First(sto->valueList);
while(value) {
if (AQH_Value_GetTopicId(value)==topicId)
break;
value=AQH_Value_List_Next(value);
}
if (value) {
AQH_DATAFILE *df;
df=_getDataFileByValueId(sto, AQH_Value_GetId(value));
if (df) {
DBG_INFO(AQH_LOGDOMAIN, "Appending record to datafile");
AQH_DataFile_AppendRecord(df, now, v);
}
}
}
}
AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId)
{
AQH_DATAFILE *df;
df=_findDataFileByValueId(sto, valueId);
if (df==NULL) {
df=_openOrCreateDataFileByValueId(sto, valueId);
AQH_DataFile_List_Add(df, sto->dataFileList);
}
return df;
}
AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId)
{
if (sto && sto->dataFileList)
return AQH_DataFile_List_GetByValueId(sto->dataFileList, valueId);
return NULL;
}
AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId)
{
AQH_DATAFILE *df;
GWEN_BUFFER *nameBuf;
int rv;
nameBuf=_getDataFilePathForValueId(sto, valueId);
df=AQH_DataFile_new(GWEN_Buffer_GetStart(nameBuf), valueId);
rv=GWEN_Directory_GetPath(GWEN_Buffer_GetStart(nameBuf),
GWEN_PATH_FLAGS_CHECKROOT |
GWEN_PATH_FLAGS_PATHMUSTEXIST |
GWEN_PATH_FLAGS_NAMEMUSTEXIST |
GWEN_PATH_FLAGS_VARIABLE);
if (rv==0) {
DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" exists, trying to open", GWEN_Buffer_GetStart(nameBuf));
rv=AQH_DataFile_Open(df);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error opening data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv);
GWEN_Buffer_free(nameBuf);
return NULL;
}
}
else {
DBG_INFO(AQH_LOGDOMAIN, "Creating file \"%s\"", GWEN_Buffer_GetStart(nameBuf));
rv=AQH_DataFile_Create(df);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error creating data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv);
GWEN_Buffer_free(nameBuf);
return NULL;
}
}
GWEN_Buffer_free(nameBuf);
return df;
}
GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId)
{
GWEN_BUFFER *buf;
buf=GWEN_Buffer_new(0, 256, 0, 1);
if (sto->dataFileFolder) {
GWEN_Buffer_AppendString(buf, sto->dataFileFolder);
GWEN_Buffer_AppendString(buf, GWEN_DIR_SEPARATOR_S);
}
GWEN_Buffer_AppendArgs(buf, "%" PRIu64 ".data", valueId);
return buf;
}

View File

@@ -72,6 +72,9 @@ AQHOME_API AQH_VALUE *AQH_Storage_GetValueByName(const AQH_STORAGE *sto, const c
AQHOME_API const char *AQH_Storage_GetStateFile(const AQH_STORAGE *sto); AQHOME_API const char *AQH_Storage_GetStateFile(const AQH_STORAGE *sto);
AQHOME_API void AQH_Storage_SetStateFile(AQH_STORAGE *sto, const char *s); AQHOME_API void AQH_Storage_SetStateFile(AQH_STORAGE *sto, const char *s);
AQHOME_API const char *AQH_Storage_GetDataFileFolder(const AQH_STORAGE *sto);
AQHOME_API void AQH_Storage_SetDataFileFolder(AQH_STORAGE *sto, const char *s);
AQHOME_API uint32_t AQH_Storage_GetRuntimeFlags(const AQH_STORAGE *sto); AQHOME_API uint32_t AQH_Storage_GetRuntimeFlags(const AQH_STORAGE *sto);
AQHOME_API void AQH_Storage_SetRuntimeFlags(AQH_STORAGE *sto, uint32_t flags); AQHOME_API void AQH_Storage_SetRuntimeFlags(AQH_STORAGE *sto, uint32_t flags);
AQHOME_API void AQH_Storage_AddRuntimeFlags(AQH_STORAGE *sto, uint32_t flags); AQHOME_API void AQH_Storage_AddRuntimeFlags(AQH_STORAGE *sto, uint32_t flags);
@@ -79,6 +82,8 @@ AQHOME_API void AQH_Storage_SubRuntimeFlags(AQH_STORAGE *sto, uint32_t flags);
AQHOME_API int AQH_Storage_Init(AQH_STORAGE *sto); AQHOME_API int AQH_Storage_Init(AQH_STORAGE *sto);
AQHOME_API int AQH_Storage_Fini(AQH_STORAGE *sto);
AQHOME_API int AQH_Storage_WriteState(AQH_STORAGE *sto); AQHOME_API int AQH_Storage_WriteState(AQH_STORAGE *sto);

View File

@@ -11,6 +11,7 @@
#include "aqhome/data/storage.h" #include "aqhome/data/storage.h"
#include "aqhome/data/datafile.h"
#define AQH_STORAGE_XML_ELEMENTNAME_LASTIDS "lastIds" #define AQH_STORAGE_XML_ELEMENTNAME_LASTIDS "lastIds"
@@ -41,6 +42,9 @@ struct AQH_STORAGE {
uint64_t lastValueId; uint64_t lastValueId;
char *stateFile; char *stateFile;
char *dataFileFolder;
AQH_DATAFILE_LIST *dataFileList;
uint32_t runtimeFlags; uint32_t runtimeFlags;
}; };