/**************************************************************************** * This file is part of the project Gwenhywfar. * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/data/storage_p.h" #include "aqhome/data/storage_readxml.h" #include "aqhome/data/storage_writexml.h" #include #include #include #include #include /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void _handleJsonTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue); static void _handleNumTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue); static void _handleValueForJsonElement(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, GWEN_JSON_ELEM *json, time_t timestamp, const AQH_VALUE *value); static AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId); static AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId); static AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId); static GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ AQH_STORAGE *AQH_Storage_new(void) { AQH_STORAGE *sto; GWEN_NEW_OBJECT(AQH_STORAGE, sto); sto->roomList=AQH_Room_List_new(); sto->deviceList=AQH_Device_List_new(); sto->mqttTopicList=AQH_MqttTopic_List_new(); sto->valueList=AQH_Value_List_new(); sto->dataFileList=AQH_DataFile_List_new(); sto->recvdTopicList=GWEN_StringList_new(); return sto; } void AQH_Storage_free(AQH_STORAGE *sto) { if (sto) { AQH_DataFile_List_free(sto->dataFileList); AQH_Value_List_free(sto->valueList); AQH_MqttTopic_List_free(sto->mqttTopicList); AQH_Device_List_free(sto->deviceList); AQH_Room_List_free(sto->roomList); GWEN_StringList_free(sto->recvdTopicList); free(sto->dataFileFolder); free(sto->stateFile); GWEN_FREE_OBJECT(sto); } } const char *AQH_Storage_GetStateFile(const AQH_STORAGE *sto) { return sto?(sto->stateFile):NULL; } void AQH_Storage_SetStateFile(AQH_STORAGE *sto, const char *s) { if (sto) { free(sto->stateFile); sto->stateFile=s?strdup(s):NULL; } } GWEN_STRINGLIST *AQH_Storage_GetRecvdTopicList(const AQH_STORAGE *sto) { return sto?sto->recvdTopicList:NULL; } void AQH_Storage_AddRoom(AQH_STORAGE *sto, AQH_ROOM *r) { if (sto && r) { uint64_t id; id=++(sto->lastRoomId); AQH_Room_SetId(r, id); AQH_Room_List_Add(r, sto->roomList); } } AQH_ROOM_LIST *AQH_Storage_GetRoomList(const AQH_STORAGE *sto) { return sto?sto->roomList:NULL; } AQH_ROOM *AQH_Storage_GetRoomById(const AQH_STORAGE *sto, uint64_t id) { return sto?AQH_Room_List_GetById(sto->roomList, id):NULL; } AQH_ROOM *AQH_Storage_GetRoomByName(const AQH_STORAGE *sto, const char *s) { return sto?AQH_Room_List_GetByName(sto->roomList, s):NULL; } void AQH_Storage_AddDevice(AQH_STORAGE *sto, AQH_DEVICE *dev) { if (sto && dev) { uint64_t id; id=++(sto->lastDeviceId); AQH_Device_SetId(dev, id); AQH_Device_List_Add(dev, sto->deviceList); } } AQH_DEVICE_LIST *AQH_Storage_GetDeviceList(const AQH_STORAGE *sto) { return sto?sto->deviceList:NULL; } AQH_DEVICE *AQH_Storage_GetDeviceById(const AQH_STORAGE *sto, uint64_t id) { return sto?AQH_Device_List_GetById(sto->deviceList, id):NULL; } AQH_DEVICE *AQH_Storage_GetDeviceByName(const AQH_STORAGE *sto, const char *s) { return sto?AQH_Device_List_GetByName(sto->deviceList, s):NULL; } void AQH_Storage_AddMqttTopic(AQH_STORAGE *sto, AQH_MQTT_TOPIC *t) { if (sto && t) { uint64_t id; id=++(sto->lastTopicId); AQH_MqttTopic_SetId(t, id); AQH_MqttTopic_List_Add(t, sto->mqttTopicList); } } AQH_MQTT_TOPIC_LIST *AQH_Storage_GetMqttTopicList(const AQH_STORAGE *sto) { return sto?sto->mqttTopicList:NULL; } AQH_MQTT_TOPIC *AQH_Storage_GetMqttTopicById(const AQH_STORAGE *sto, uint64_t id) { return sto?AQH_MqttTopic_List_GetById(sto->mqttTopicList, id):NULL; } AQH_MQTT_TOPIC *AQH_Storage_GetMqttTopicByTopic(const AQH_STORAGE *sto, const char *topic) { return sto?AQH_MqttTopic_List_GetByTopic(sto->mqttTopicList, topic):NULL; } void AQH_Storage_AddValue(AQH_STORAGE *sto, AQH_VALUE *value) { if (sto && value) { uint64_t id; id=++(sto->lastValueId); AQH_Value_SetId(value, id); AQH_Value_List_Add(value, sto->valueList); } } AQH_VALUE_LIST *AQH_Storage_GetValueList(const AQH_STORAGE *sto) { return sto?sto->valueList:NULL; } AQH_VALUE *AQH_Storage_GetValueById(const AQH_STORAGE *sto, uint64_t id) { return sto?AQH_Value_List_GetById(sto->valueList, id):NULL; } AQH_VALUE *AQH_Storage_GetValueByName(const AQH_STORAGE *sto, const char *s) { return sto?AQH_Value_List_GetByName(sto->valueList, s):NULL; } uint32_t AQH_Storage_GetRuntimeFlags(const AQH_STORAGE *sto) { return sto?sto->runtimeFlags:0; } void AQH_Storage_SetRuntimeFlags(AQH_STORAGE *sto, uint32_t flags) { if (sto) sto->runtimeFlags=flags; } void AQH_Storage_AddRuntimeFlags(AQH_STORAGE *sto, uint32_t flags) { if (sto) sto->runtimeFlags|=flags; } void AQH_Storage_SubRuntimeFlags(AQH_STORAGE *sto, uint32_t flags) { if (sto) sto->runtimeFlags&=~flags; } int AQH_Storage_Init(AQH_STORAGE *sto) { int rv; rv=GWEN_Directory_GetPath(sto->stateFile, GWEN_PATH_FLAGS_CHECKROOT | GWEN_PATH_FLAGS_PATHMUSTEXIST | GWEN_PATH_FLAGS_NAMEMUSTEXIST | GWEN_PATH_FLAGS_VARIABLE); if (rv==0) { rv=AQH_Storage_ReadStateFile(sto, sto->stateFile); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } } else { DBG_WARN(AQH_LOGDOMAIN, "State file \"%s\" not available, will try to create it later (%d)", sto->stateFile, rv); } return 0; } int AQH_Storage_Fini(AQH_STORAGE *sto) { int errors=0; if (sto) { AQH_DATAFILE *df; while( (df=AQH_DataFile_List_First(sto->dataFileList)) ) { int rv; AQH_DataFile_List_Del(df); rv=AQH_DataFile_Close(df); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error closing file \"%s\"", AQH_DataFile_GetFileName(df)); errors++; } AQH_DataFile_free(df); } } return (errors==0)?0:GWEN_ERROR_GENERIC; } int AQH_Storage_WriteState(AQH_STORAGE *sto) { int rv; rv=AQH_Storage_WriteStateFile(sto, sto->stateFile); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } sto->runtimeFlags&=~AQH_STORAGE_RTFLAGS_MODIFIED; return 0; } const char *AQH_Storage_GetDataFileFolder(const AQH_STORAGE *sto) { return sto?sto->dataFileFolder:NULL; } void AQH_Storage_SetDataFileFolder(AQH_STORAGE *sto, const char *s) { if (sto) { free(sto->dataFileFolder); sto->dataFileFolder=s?strdup(s):NULL; } } void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *sTopic, const char *sValue) { if (sto) { const AQH_MQTT_TOPIC *topic; topic=AQH_Storage_GetMqttTopicByTopic(sto, sTopic); if (topic) { DBG_INFO(AQH_LOGDOMAIN, "Handling MQTT topic \"%s\"", sTopic); if (AQH_MqttTopic_GetDataType(topic)==AQH_MqttTopicType_Json) _handleJsonTopic(sto, topic, sValue); else _handleNumTopic(sto, topic, sValue); } else { DBG_INFO(AQH_LOGDOMAIN, "Unknown MQTT topic \"%s\"", sTopic); GWEN_StringList_AppendString(sto->recvdTopicList, sTopic, 0, 1); } } } void _handleJsonTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue) { time_t now; GWEN_JSON_ELEM *json; now=time(NULL); json=GWEN_JsonElement_fromString(sValue); if (json) { AQH_VALUE *value; uint64_t topicId; topicId=AQH_MqttTopic_GetId(topic); value=AQH_Value_List_First(sto->valueList); while(value) { if (AQH_Value_GetTopicId(value)==topicId) { _handleValueForJsonElement(sto, topic, json, now, value); } value=AQH_Value_List_Next(value); } } else { DBG_ERROR(AQH_LOGDOMAIN, "Error parsing JSON data [%s]", sValue); } } void _handleValueForJsonElement(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, GWEN_JSON_ELEM *json, time_t timestamp, const AQH_VALUE *value) { const char *dataPath; dataPath=AQH_Value_GetDataPath(value); if (dataPath) { GWEN_JSON_ELEM *jsonElem; jsonElem=GWEN_JsonElement_GetElementByPath(json, dataPath, GWEN_PATH_FLAGS_PATHMUSTEXIST); if (jsonElem) { const char *s; s=GWEN_JsonElement_GetData(jsonElem); if (s && *s) { int rv; double v; rv=GWEN_Text_StringToDouble(s, &v); if (rv<0) { DBG_ERROR(AQH_LOGDOMAIN, "Invalid value \"%s\" for topic %s:%s", s, AQH_MqttTopic_GetTopic(topic), dataPath); } else { AQH_DATAFILE *df; df=_getDataFileByValueId(sto, AQH_Value_GetId(value)); if (df) { DBG_INFO(AQH_LOGDOMAIN, "Appending record to datafile"); AQH_DataFile_AppendRecord(df, timestamp, v); } } } else { DBG_INFO(AQH_LOGDOMAIN, "Empty JSON element \"%s\"", dataPath); } } else { DBG_ERROR(AQH_LOGDOMAIN, "No value for topic %s:%s", AQH_MqttTopic_GetTopic(topic), dataPath); } } else { DBG_INFO(AQH_LOGDOMAIN, "No datapath in value \"%s\"", AQH_Value_GetName(value)); } } void _handleNumTopic(AQH_STORAGE *sto, const AQH_MQTT_TOPIC *topic, const char *sValue) { int rv; double v; time_t now; now=time(NULL); rv=GWEN_Text_StringToDouble(sValue, &v); if (rv<0) { DBG_ERROR(AQH_LOGDOMAIN, "Invalid value \"%s\" for topic %s, ignoring", sValue, AQH_MqttTopic_GetTopic(topic)); } else { AQH_VALUE *value; uint64_t topicId; topicId=AQH_MqttTopic_GetId(topic); value=AQH_Value_List_First(sto->valueList); while(value) { if (AQH_Value_GetTopicId(value)==topicId) break; value=AQH_Value_List_Next(value); } if (value) { AQH_DATAFILE *df; df=_getDataFileByValueId(sto, AQH_Value_GetId(value)); if (df) { DBG_INFO(AQH_LOGDOMAIN, "Appending record to datafile"); AQH_DataFile_AppendRecord(df, now, v); } } } } AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId) { AQH_DATAFILE *df; df=_findDataFileByValueId(sto, valueId); if (df==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Datafile for valueId \"%lu\" not in list, loading", (unsigned long int) valueId); df=_openOrCreateDataFileByValueId(sto, valueId); if (df==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Error opening/creating datafile for valueId \"%lu\"", (unsigned long int) valueId); return NULL; } DBG_INFO(AQH_LOGDOMAIN, "Adding datafile for valueId \"%lu\" to list", (unsigned long int) valueId); AQH_DataFile_List_Add(df, sto->dataFileList); } return df; } AQH_DATAFILE *_findDataFileByValueId(const AQH_STORAGE *sto, uint64_t valueId) { if (sto && sto->dataFileList) return AQH_DataFile_List_GetByValueId(sto->dataFileList, valueId); return NULL; } AQH_DATAFILE *_openOrCreateDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId) { AQH_DATAFILE *df; GWEN_BUFFER *nameBuf; int rv; nameBuf=_getDataFilePathForValueId(sto, valueId); df=AQH_DataFile_new(GWEN_Buffer_GetStart(nameBuf), valueId); rv=GWEN_Directory_GetPath(GWEN_Buffer_GetStart(nameBuf), GWEN_PATH_FLAGS_CHECKROOT | GWEN_PATH_FLAGS_PATHMUSTEXIST | GWEN_PATH_FLAGS_NAMEMUSTEXIST | GWEN_PATH_FLAGS_VARIABLE); if (rv==0) { DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" exists, trying to open", GWEN_Buffer_GetStart(nameBuf)); rv=AQH_DataFile_Open(df); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error opening data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv); GWEN_Buffer_free(nameBuf); return NULL; } } else { DBG_INFO(AQH_LOGDOMAIN, "Creating file \"%s\"", GWEN_Buffer_GetStart(nameBuf)); rv=AQH_DataFile_Create(df); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error creating data file \"%s\" (%d)", GWEN_Buffer_GetStart(nameBuf), rv); GWEN_Buffer_free(nameBuf); return NULL; } } GWEN_Buffer_free(nameBuf); return df; } GWEN_BUFFER *_getDataFilePathForValueId(const AQH_STORAGE *sto, uint64_t valueId) { GWEN_BUFFER *buf; buf=GWEN_Buffer_new(0, 256, 0, 1); if (sto->dataFileFolder) { GWEN_Buffer_AppendString(buf, sto->dataFileFolder); GWEN_Buffer_AppendString(buf, GWEN_DIR_SEPARATOR_S); } GWEN_Buffer_AppendArgs(buf, "%" PRIu64 ".data", valueId); return buf; }