diff --git a/apps/aqhome-storage/0BUILD b/apps/aqhome-storage/0BUILD index 2d39297..a11e87e 100644 --- a/apps/aqhome-storage/0BUILD +++ b/apps/aqhome-storage/0BUILD @@ -39,9 +39,11 @@ aqhomestorage.h init.h init_http.h + init_mqtt.h fini.h loop.h loop_http.h + loop_mqtt.h cleanup.h u_base.h u_login.h @@ -63,9 +65,11 @@ aqhomestorage.c init.c init_http.c + init_mqtt.c fini.c loop.c loop_http.c + loop_mqtt.c cleanup.c u_base.c u_login.c diff --git a/apps/aqhome-storage/README b/apps/aqhome-storage/README new file mode 100644 index 0000000..a834ceb --- /dev/null +++ b/apps/aqhome-storage/README @@ -0,0 +1,30 @@ + +TODO + +- isolate storage service: + - remove http service from here + - add ipc service + - admin: + - add room/device/MQTT topic/value + - edit room/device/MQTT topic/value + - del room/device/MQTT topic/value + - get received topics + - list rooms/devices/MQTT topics/values + - getValues(valueId, timeFrom, timeUntil) + - addValue(valueId/valueName, timeStamp, value) + - aqhome-tool + - add ipc admin code to connect to ipc service +- create http service as stand-alone app or create PHP code which uses aqhome-tool + - connect to storage service for information/admin + + + +- move http service into own folder +- isolate functions: + - getRoomList + - getDeviceList + - getTopicList + - addRoom/Device/Topic/Value + - delRoom/Device/Topic/Value + - editRoom/Device/Topic/Value + diff --git a/apps/aqhome-storage/aqhomestorage_p.h b/apps/aqhome-storage/aqhomestorage_p.h index 76a2f2f..44cb278 100644 --- a/apps/aqhome-storage/aqhomestorage_p.h +++ b/apps/aqhome-storage/aqhomestorage_p.h @@ -24,6 +24,7 @@ #define AQHOME_STORAGE_DEFAULT_CONFIGDIR "/var/lib/aqhomestorage/config" #define AQHOME_STORAGE_DEFAULT_HTTP_SOURCEDIR "/var/lib/aqhomestorage/html" +#define AQHOME_STORAGE_DEFAULT_DATADIR "/var/lib/aqhomestorage/data" #define AQHOME_STORAGE_DEFAULT_STATEFILE "/var/lib/aqhomestorage/config/statefile" diff --git a/apps/aqhome-storage/init.c b/apps/aqhome-storage/init.c index d656862..da796eb 100644 --- a/apps/aqhome-storage/init.c +++ b/apps/aqhome-storage/init.c @@ -13,6 +13,7 @@ #include "./init.h" #include "./init_http.h" +#include "./init_mqtt.h" #include "./aqhomestorage_p.h" #include "./aqhomehttp.h" @@ -68,7 +69,6 @@ static int _setupFolders(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs); static int _setupStorage(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs); static void _setupIpc(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs); -static void _setupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs); static GWEN_MSG_ENDPOINT *_acceptIpcFn(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKET *sk, const GWEN_INETADDRESS *addr, void *data); @@ -122,7 +122,13 @@ int AqHomeStorage_Init(AQHOME_STORAGE *aqh, int argc, char **argv) } _setupIpc(aqh, dbArgs); - _setupMqtt(aqh, dbArgs); + + rv=AqHomeStorage_SetupMqtt(aqh, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + rv=AqHomeStorage_SetupHttp(aqh, dbArgs); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); @@ -184,8 +190,10 @@ int _setupFolders(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs) int _setupStorage(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs) { + const char *dataFolder; const char *stateFile; + dataFolder=GWEN_DB_GetCharValue(dbArgs, "dataFolder", 0, AQHOME_STORAGE_DEFAULT_DATADIR); stateFile=GWEN_DB_GetCharValue(dbArgs, "stateFile", 0, NULL); if (stateFile && *stateFile) { AQH_STORAGE *sto; @@ -194,6 +202,8 @@ int _setupStorage(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs) sto=AQH_Storage_new(); AQH_Storage_SetStateFile(sto, stateFile); + AQH_Storage_SetDataFileFolder(sto, (dataFolder && *dataFolder)?dataFolder:NULL); + rv=AQH_Storage_Init(sto); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); @@ -232,37 +242,6 @@ void _setupIpc(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs) -void _setupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs) -{ - const char *mqttAddress; - int mqttPort; - const char *mqttClientId; - int mqttKeepAlive; - - mqttAddress=GWEN_DB_GetCharValue(dbArgs, "mqttAddress", 0, NULL); - mqttPort=GWEN_DB_GetIntValue(dbArgs, "mqttPort", 0, AQHOME_STORAGE_DEFAULT_MQTT_PORT); - mqttClientId=GWEN_DB_GetCharValue(dbArgs, "mqttClientId", 0, AQHOME_STORAGE_DEFAULT_MQTT_CLIENTID); - mqttKeepAlive=GWEN_DB_GetIntValue(dbArgs, "mqttKeepAlive", 0, AQHOME_STORAGE_DEFAULT_MQTT_KEEPALIVE); - - if (mqttAddress && *mqttAddress && mqttPort) { - GWEN_MSG_ENDPOINT *ep; - int rv; - - ep=AQH_MqttClientEndpoint_new(mqttClientId, mqttAddress, mqttPort, NULL, 0); - AQH_MqttClientEndpoint_SetKeepAliveTime(ep, mqttKeepAlive); - - GWEN_MsgEndpoint_Tree2_AddChild(aqh->rootEndpoint, ep); - aqh->mqttEndpoint=ep; - - rv=AQH_MqttClientEndpoint_StartConnect(ep); - if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { - DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d (%d), will retry later", mqttAddress, mqttPort, rv); - } - } -} - - - GWEN_MSG_ENDPOINT *_acceptIpcFn(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKET *sk, const GWEN_INETADDRESS *addr, @@ -440,6 +419,17 @@ int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) I18S("Folder where static HTML source files are stored"), I18S("Folder where static HTML source files are stored") }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "datafolder", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + NULL, /* short option */ + "datafolder", /* long option */ + I18S("Folder where data files are stored"), + I18S("Folder where data files are stored") + }, { GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ GWEN_ArgsType_Char, /* type */ diff --git a/apps/aqhome-storage/init_mqtt.c b/apps/aqhome-storage/init_mqtt.c new file mode 100644 index 0000000..acf0465 --- /dev/null +++ b/apps/aqhome-storage/init_mqtt.c @@ -0,0 +1,212 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./init_mqtt.h" +#include "./aqhomestorage_p.h" +#include "./aqhomehttp.h" +#include "./u_login.h" +#include "./u_rooms.h" +#include "./u_devices.h" +#include "./u_mqtttopics.h" +#include "./u_values.h" +#include "./u_static.h" + +#include "aqhome/msg/endpoint_tty.h" +#include "aqhome/ipc/endpoint_ipc.h" +#include "aqhome/mqtt/endpoint_mqttc.h" +#include +#include +#include "aqhome/http/endpoint_http.h" +#include "aqhome/http/httpservice_conf.h" +#include "aqhome/http/httpservice_http.h" +#include "aqhome/http/httpservice.h" +#include "aqhome/http/content_files.h" + +#include +#include +#include +#include +#include +#include +#include + +#ifdef HAVE_SYS_TYPES_H +# include +#endif + +#ifdef HAVE_SYS_STAT_H +# include +#endif + +#include +#include +#include +#include +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + +//#define I18N(msg) msg +#define I18S(msg) msg + +#define AQHOME_STORAGE_DEFAULT_CMDTIMEOUT 10000 + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static int _mqttConnect(GWEN_MSG_ENDPOINT *epTcp); +static int _subscribe(AQHOME_STORAGE *aqh, const char *topicFilter); +static GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + + +int AqHomeStorage_SetupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs) +{ + const char *mqttAddress; + int mqttPort; + const char *mqttClientId; + int mqttKeepAlive; + + mqttAddress=GWEN_DB_GetCharValue(dbArgs, "mqttAddress", 0, NULL); + mqttPort=GWEN_DB_GetIntValue(dbArgs, "mqttPort", 0, AQHOME_STORAGE_DEFAULT_MQTT_PORT); + mqttClientId=GWEN_DB_GetCharValue(dbArgs, "mqttClientId", 0, AQHOME_STORAGE_DEFAULT_MQTT_CLIENTID); + mqttKeepAlive=GWEN_DB_GetIntValue(dbArgs, "mqttKeepAlive", 0, AQHOME_STORAGE_DEFAULT_MQTT_KEEPALIVE); + + if (mqttAddress && *mqttAddress && mqttPort) { + GWEN_MSG_ENDPOINT *ep; + int rv; + + ep=AQH_MqttClientEndpoint_new(mqttClientId, mqttAddress, mqttPort, NULL, 0); + AQH_MqttClientEndpoint_SetKeepAliveTime(ep, mqttKeepAlive); + + GWEN_MsgEndpoint_Tree2_AddChild(aqh->rootEndpoint, ep); + aqh->mqttEndpoint=ep; + + rv=_mqttConnect(ep); + if (rv<0) { + DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d (%d)", mqttAddress, mqttPort, rv); + return rv; + } + + rv=_subscribe(aqh, "#"); + if (rv<0) { + DBG_ERROR(NULL, "Error subscribingconnecting to MQTT server %s:%d (%d)", mqttAddress, mqttPort, rv); + return rv; + } + } + return 0; +} + + + +int _mqttConnect(GWEN_MSG_ENDPOINT *epTcp) +{ + if (GWEN_MsgEndpoint_GetState(epTcp)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { + int rv; + + rv=AQH_MqttClientEndpoint_StartConnect(epTcp); + if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { + DBG_ERROR(NULL, "Error starting to connect (%d)", rv); + return rv; + } + } + + while(GWEN_MsgEndpoint_GetState(epTcp)!=GWEN_MSG_ENDPOINT_STATE_CONNECTED) { + DBG_DEBUG(NULL, "Next loop"); + GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ + } + return 0; +} + + + +int _subscribe(AQHOME_STORAGE *aqh, const char *topicFilter) +{ + uint16_t pckId; + GWEN_MSG *msgOut; + GWEN_MSG *msgIn; + + DBG_INFO(NULL, "Sending SUBSCRIBE %s", topicFilter); + pckId=AQH_MqttClientEndpoint_GetNextPacketId(aqh->mqttEndpoint); + msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, topicFilter, 0); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return GWEN_ERROR_INTERNAL; + } + GWEN_MsgEndpoint_AddSendMessage(aqh->mqttEndpoint, msgOut); + + DBG_INFO(NULL, "Waiting for response"); + msgIn=_awaitPacket(aqh->mqttEndpoint, AQH_MQTTMSG_MSGTYPE_SUBACK, AQHOME_STORAGE_DEFAULT_CMDTIMEOUT); + if (msgIn) { + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_SubAckMqttMsg_DumpToBuffer(msgIn, buf, "received"); + DBG_INFO(NULL, "%s", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + GWEN_Msg_free(msgIn); + } + + return 0; +} + + + +GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds) +{ + time_t startTime; + + startTime=time(NULL); + + for (;;) { + GWEN_MSG *msg; + time_t now; + + GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ + msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); + if (msg) { + if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(expectedPacketType & 0xf0)) { + return msg; + } + else { + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + } + GWEN_Msg_free(msg); + } + now=time(NULL); + if (now-startTime>timeoutInSeconds) { + DBG_INFO(NULL, "Timeout"); + break; + } + } + + return NULL; +} + diff --git a/apps/aqhome-storage/init_mqtt.h b/apps/aqhome-storage/init_mqtt.h new file mode 100644 index 0000000..5924fb7 --- /dev/null +++ b/apps/aqhome-storage/init_mqtt.h @@ -0,0 +1,25 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_STORAGE_INIT_MQTT_H +#define AQHOME_STORAGE_INIT_MQTT_H + + +#include "./aqhomestorage.h" + + + +int AqHomeStorage_SetupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs); + + + + + +#endif + + diff --git a/apps/aqhome-storage/loop.c b/apps/aqhome-storage/loop.c index aba35b9..3db6210 100644 --- a/apps/aqhome-storage/loop.c +++ b/apps/aqhome-storage/loop.c @@ -13,6 +13,7 @@ #include "./loop.h" #include "./loop_http.h" +#include "./loop_mqtt.h" #include "./aqhomehttp.h" #include "./aqhomestorage_p.h" #include "aqhome/http/httpservice_conf.h" @@ -50,6 +51,8 @@ void AqHomeStorage_Loop(AQHOME_STORAGE *aqh, int timeoutInMsecs) if (aqh) { GWEN_MsgEndpoint_ChildrenIoLoop(aqh->rootEndpoint, timeoutInMsecs); AqHomeStorage_ReadAndHandleHttpMessages(aqh); + AqHomeStorage_ReadAndHandleMqttMessages(aqh); + // AqHomeStorage_ReadAndHandleIpcMessages(aqh); } } diff --git a/apps/aqhome-storage/loop_mqtt.c b/apps/aqhome-storage/loop_mqtt.c new file mode 100644 index 0000000..2f6d378 --- /dev/null +++ b/apps/aqhome-storage/loop_mqtt.c @@ -0,0 +1,110 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./loop_mqtt.h" +#include "./aqhomestorage_p.h" + +#include "aqhome/mqtt/msg_mqtt_publish.h" + +#include +#include + + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + +//#define I18N(msg) msg +#define I18S(msg) msg + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void _handlePublishMsg(AQHOME_STORAGE *aqh, GWEN_MSG *msg); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + + + +void AqHomeStorage_ReadAndHandleMqttMessages(AQHOME_STORAGE *aqh) +{ + GWEN_MSG *msg; + + while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(aqh->mqttEndpoint)) ) { + if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) { + _handlePublishMsg(aqh, msg); + } + else if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) { + DBG_INFO(AQH_LOGDOMAIN, "PING response received"); + } + else { + DBG_INFO(NULL, "Received unexpected MQTT message %02x", AQH_MqttMsg_GetMsgTypeAndFlags(msg)); + } + GWEN_Msg_free(msg); + } +} + + + +int AqHomeStorage_MqttPing(AQHOME_STORAGE *aqh) +{ + GWEN_MSG *msgOut; + + DBG_INFO(AQH_LOGDOMAIN, "Sending PING"); + msgOut=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return GWEN_ERROR_INTERNAL; + } + GWEN_MsgEndpoint_AddSendMessage(aqh->mqttEndpoint, msgOut); + + return 0; +} + + + + + + +void _handlePublishMsg(AQHOME_STORAGE *aqh, GWEN_MSG *msg) +{ + char *topic; + char *value; + + topic=AQH_PublishMqttMsg_ExtractTopic(msg); + value=AQH_PublishMqttMsg_ExtractValue(msg); + + if (topic && value) + AQH_Storage_HandleMqttPublish(aqh->storage, topic, value); + else { + DBG_ERROR(NULL, "Either topic or value missing in PUBLISH msg"); + } + free(value); + free(topic); +} + + + + diff --git a/apps/aqhome-storage/loop_mqtt.h b/apps/aqhome-storage/loop_mqtt.h new file mode 100644 index 0000000..420bfe8 --- /dev/null +++ b/apps/aqhome-storage/loop_mqtt.h @@ -0,0 +1,23 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_STORAGE_MQTT_H +#define AQHOME_STORAGE_MQTT_H + + +#include "./aqhomestorage.h" + + +void AqHomeStorage_ReadAndHandleMqttMessages(AQHOME_STORAGE *aqh); + +int AqHomeStorage_MqttPing(AQHOME_STORAGE *aqh); + + +#endif + + diff --git a/apps/aqhome-storage/main.c b/apps/aqhome-storage/main.c index d241c03..0c5b9ec 100644 --- a/apps/aqhome-storage/main.c +++ b/apps/aqhome-storage/main.c @@ -17,6 +17,7 @@ #include "./init.h" #include "./fini.h" #include "./loop.h" +#include "./loop_mqtt.h" #include "./cleanup.h" #include @@ -35,6 +36,7 @@ #define CLEANUP_INTERVAL_IN_SECS (60) #define WRITE_INTERVAL_IN_SECS (60) +#define PING_INTERVAL 120 @@ -136,12 +138,14 @@ void _runService(AQHOME_STORAGE *aqh) time_t timeStart; time_t timeLastCleanup; time_t timeLastWrite; + time_t timeLastPing; int timeout; timeout=AqHomeStorage_GetTimeout(aqh); timeStart=time(NULL); timeLastCleanup=time(NULL); timeLastWrite=time(NULL); + timeLastPing=time(NULL); while(!stopService) { time_t now; @@ -163,6 +167,12 @@ void _runService(AQHOME_STORAGE *aqh) timeLastWrite=now; } + if (timeout && ((int)difftime(now, timeLastPing))>timeout) { + DBG_INFO(NULL, "Sending ping"); + AqHomeStorage_MqttPing(aqh); + timeLastPing=now; + } + if (timeout && ((int)difftime(now, timeStart))>timeout) { DBG_INFO(NULL, "Timeout"); diff --git a/apps/aqhome-storage/u_mqtttopics.c b/apps/aqhome-storage/u_mqtttopics.c index 4c0fe2b..34cb5ca 100644 --- a/apps/aqhome-storage/u_mqtttopics.c +++ b/apps/aqhome-storage/u_mqtttopics.c @@ -204,6 +204,7 @@ void _writeEditingTable(AQH_HTTP_URLHANDLER *uh, GWEN_DB_NODE *dbValues, GWEN_BU { AQH_SERVICE *sv; AQH_STORAGE *sto; + const GWEN_STRINGLIST *seenTopicsList; const AQH_DEVICE_LIST *deviceList; unsigned long int selectedDeviceId=0; int dataType; @@ -220,21 +221,40 @@ void _writeEditingTable(AQH_HTTP_URLHANDLER *uh, GWEN_DB_NODE *dbValues, GWEN_BU /* topic */ GWEN_Buffer_AppendArgs(pageBuf, - " " - " " - " " - " " - " ", + "
" + "" + " " + " "); + + + /* device */ GWEN_Buffer_AppendArgs(pageBuf, "" ""); + GWEN_Buffer_AppendString(pageBuf, "\n"); /* data type */ GWEN_Buffer_AppendArgs(pageBuf, @@ -262,16 +282,35 @@ void _writeEditingTable(AQH_HTTP_URLHANDLER *uh, GWEN_DB_NODE *dbValues, GWEN_BU ""); + GWEN_Buffer_AppendArgs(pageBuf, "", AQH_MqttTopicType_Json, I18N("JSON")); + GWEN_Buffer_AppendString(pageBuf, "\n"); - GWEN_Buffer_AppendString(pageBuf, "
" + " \n" + " ", I18N("MQTT Topic"), dbValues?GWEN_DB_GetCharValue(dbValues, "topic", 0, ""):""); + seenTopicsList=AQH_Storage_GetRecvdTopicList(sto); + if (seenTopicsList && GWEN_StringList_Count(seenTopicsList)) { + GWEN_STRINGLISTENTRY *se; + se=GWEN_StringList_FirstEntry(seenTopicsList); + while(se) { + const char *s=GWEN_StringListEntry_Data(se); + if (s && *s) { + DBG_INFO(NULL, "Adding MQTT string %s", s); + GWEN_Buffer_AppendArgs(pageBuf, "
"); + + if (seenTopicsList && GWEN_StringList_Count(seenTopicsList)) { + GWEN_STRINGLISTENTRY *se; + + GWEN_Buffer_AppendArgs(pageBuf, "", I18N("Received Topics")); + GWEN_Buffer_AppendString(pageBuf, ""); + + se=GWEN_StringList_FirstEntry(seenTopicsList); + while(se) { + const char *s=GWEN_StringListEntry_Data(se); + if (s && *s) + GWEN_Buffer_AppendArgs(pageBuf, "%s
\n", s); + se=GWEN_StringListEntry_Next(se); + } + GWEN_Buffer_AppendString(pageBuf, "
"); + } + + + + GWEN_Buffer_AppendString(pageBuf, "\n"); } diff --git a/aqhome-storage.sh b/aqhome-storage.sh index 35003b2..fd53e3f 100755 --- a/aqhome-storage.sh +++ b/aqhome-storage.sh @@ -7,6 +7,7 @@ export LD_LIBRARY_PATH="0-build/aqhome/:$LD_LIBRARY_PATH" 0-build/apps/aqhome-storage/aqhome-storage \ --sourcefolder=apps/aqhome-storage/test/html \ + --datafolder=apps/aqhome-storage/test/data \ -D apps/aqhome-storage/test/config \ --statefile=apps/aqhome-storage/test/config/state \ -ma 192.168.117.192 -mp 1883 --mqttclientid=AQHOMESTORAGETEST \ diff --git a/aqhome/data/datafile.c b/aqhome/data/datafile.c index 15b6ae3..a66d465 100644 --- a/aqhome/data/datafile.c +++ b/aqhome/data/datafile.c @@ -105,6 +105,9 @@ int AQH_DataFile_Create(AQH_DATAFILE *df) } sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew); + GWEN_SyncIo_SetFlags(sio, + GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ | + GWEN_SYNCIO_FILE_FLAGS_UREAD | GWEN_SYNCIO_FILE_FLAGS_UWRITE); rv=GWEN_SyncIo_Connect(sio); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv); @@ -143,7 +146,8 @@ int AQH_DataFile_Open(AQH_DATAFILE *df) return GWEN_ERROR_INVALID; } - sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew); + sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_OpenExisting); + GWEN_SyncIo_SetFlags(sio, GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ); rv=GWEN_SyncIo_Connect(sio); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv); @@ -279,7 +283,7 @@ AQH_DATAFILE *AQH_DataFile_List_GetByValueId(AQH_DATAFILE_LIST *fileList, uint64 df=AQH_DataFile_List_First(fileList); while(df) { if (df->valueId==id) - break; + return df; df=AQH_DataFile_List_Next(df); } } diff --git a/aqhome/data/storage.c b/aqhome/data/storage.c index e702512..b1f11fd 100644 --- a/aqhome/data/storage.c +++ b/aqhome/data/storage.c @@ -367,6 +367,7 @@ void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *sTopic, const c topic=AQH_Storage_GetMqttTopicByTopic(sto, sTopic); if (topic) { + DBG_INFO(AQH_LOGDOMAIN, "Handling MQTT topic \"%s\"", sTopic); if (AQH_MqttTopic_GetDataType(topic)==AQH_MqttTopicType_Json) _handleJsonTopic(sto, topic, sValue); else @@ -502,7 +503,13 @@ AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId) df=_findDataFileByValueId(sto, valueId); if (df==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "Datafile for valueId \"%lu\" not in list, loading", (unsigned long int) valueId); df=_openOrCreateDataFileByValueId(sto, valueId); + if (df==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error opening/creating datafile for valueId \"%lu\"", (unsigned long int) valueId); + return NULL; + } + DBG_INFO(AQH_LOGDOMAIN, "Adding datafile for valueId \"%lu\" to list", (unsigned long int) valueId); AQH_DataFile_List_Add(df, sto->dataFileList); } diff --git a/aqhome/http/endpoint_http.c b/aqhome/http/endpoint_http.c index 0671ac4..3cdd296 100644 --- a/aqhome/http/endpoint_http.c +++ b/aqhome/http/endpoint_http.c @@ -522,7 +522,8 @@ void _abortMessage(GWEN_MSG_ENDPOINT *ep) DBG_DEBUG(AQH_LOGDOMAIN, "Aborting message (if any)."); xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_HTTP, ep); - GWEN_Buffer_Reset(xep->currentReadBuffer); + if (xep->currentReadBuffer) + GWEN_Buffer_Reset(xep->currentReadBuffer); xep->currentHeaderPos=0; xep->currentBodyPos=0; xep->currentBodySize=0; diff --git a/aqhome/http/httprequest.c b/aqhome/http/httprequest.c index e878236..ed4537b 100644 --- a/aqhome/http/httprequest.c +++ b/aqhome/http/httprequest.c @@ -657,7 +657,9 @@ int _unescapeUrlEncoded(const char *src, unsigned int srclen, char *buffer, unsi x=='.' || x=='*' || x=='?' || - x=='+' + x=='+' || + x=='-' || + x=='_' ) { if (size<(maxsize-1)) { buffer[size++]=(x=='+')?' ':x; diff --git a/vg_calls.sh b/vg_calls.sh index 65ae566..4b32eb7 100755 --- a/vg_calls.sh +++ b/vg_calls.sh @@ -9,6 +9,7 @@ valgrind --tool=callgrind --trace-children=yes --dump-instr=yes --collect-jumps= 0-build/apps/aqhome-storage/aqhome-storage \ --sourcefolder=apps/aqhome-storage/test/html \ -D apps/aqhome-storage/test/config \ + --datafolder=apps/aqhome-storage/test/data \ --statefile=apps/aqhome-storage/test/config/state \ -ma 192.168.117.192 -mp 1883 --mqttclientid=AQHOMESTORAGEVG \ -ha 127.0.0.1 -hp 1884 \ diff --git a/vg_storage.sh b/vg_storage.sh index 68de835..f95e259 100755 --- a/vg_storage.sh +++ b/vg_storage.sh @@ -12,6 +12,7 @@ valgrind \ --track-origins=yes --num-callers=50 --keep-stacktraces=alloc-and-free \ 0-build/apps/aqhome-storage/aqhome-storage \ --sourcefolder=apps/aqhome-storage/test/html \ + --datafolder=apps/aqhome-storage/test/data \ -D apps/aqhome-storage/test/config \ --statefile=apps/aqhome-storage/test/config/state \ -ma 192.168.117.192 -mp 1883 --mqttclientid=AQHOMESTORAGEVG \