aqhome-storage now checks and parses mqtt messages and stores values in datafiles.

This commit is contained in:
Martin Preuss
2023-08-12 16:55:06 +02:00
parent bcd3e3325c
commit a4c0f2e6fd
18 changed files with 517 additions and 53 deletions

View File

@@ -39,9 +39,11 @@
aqhomestorage.h aqhomestorage.h
init.h init.h
init_http.h init_http.h
init_mqtt.h
fini.h fini.h
loop.h loop.h
loop_http.h loop_http.h
loop_mqtt.h
cleanup.h cleanup.h
u_base.h u_base.h
u_login.h u_login.h
@@ -63,9 +65,11 @@
aqhomestorage.c aqhomestorage.c
init.c init.c
init_http.c init_http.c
init_mqtt.c
fini.c fini.c
loop.c loop.c
loop_http.c loop_http.c
loop_mqtt.c
cleanup.c cleanup.c
u_base.c u_base.c
u_login.c u_login.c

View File

@@ -0,0 +1,30 @@
TODO
- isolate storage service:
- remove http service from here
- add ipc service
- admin:
- add room/device/MQTT topic/value
- edit room/device/MQTT topic/value
- del room/device/MQTT topic/value
- get received topics
- list rooms/devices/MQTT topics/values
- getValues(valueId, timeFrom, timeUntil)
- addValue(valueId/valueName, timeStamp, value)
- aqhome-tool
- add ipc admin code to connect to ipc service
- create http service as stand-alone app or create PHP code which uses aqhome-tool
- connect to storage service for information/admin
- move http service into own folder
- isolate functions:
- getRoomList
- getDeviceList
- getTopicList
- addRoom/Device/Topic/Value
- delRoom/Device/Topic/Value
- editRoom/Device/Topic/Value

View File

@@ -24,6 +24,7 @@
#define AQHOME_STORAGE_DEFAULT_CONFIGDIR "/var/lib/aqhomestorage/config" #define AQHOME_STORAGE_DEFAULT_CONFIGDIR "/var/lib/aqhomestorage/config"
#define AQHOME_STORAGE_DEFAULT_HTTP_SOURCEDIR "/var/lib/aqhomestorage/html" #define AQHOME_STORAGE_DEFAULT_HTTP_SOURCEDIR "/var/lib/aqhomestorage/html"
#define AQHOME_STORAGE_DEFAULT_DATADIR "/var/lib/aqhomestorage/data"
#define AQHOME_STORAGE_DEFAULT_STATEFILE "/var/lib/aqhomestorage/config/statefile" #define AQHOME_STORAGE_DEFAULT_STATEFILE "/var/lib/aqhomestorage/config/statefile"

View File

@@ -13,6 +13,7 @@
#include "./init.h" #include "./init.h"
#include "./init_http.h" #include "./init_http.h"
#include "./init_mqtt.h"
#include "./aqhomestorage_p.h" #include "./aqhomestorage_p.h"
#include "./aqhomehttp.h" #include "./aqhomehttp.h"
@@ -68,7 +69,6 @@ static int _setupFolders(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs);
static int _setupStorage(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs); static int _setupStorage(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs);
static void _setupIpc(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs); static void _setupIpc(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs);
static void _setupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs);
static GWEN_MSG_ENDPOINT *_acceptIpcFn(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKET *sk, const GWEN_INETADDRESS *addr, void *data); static GWEN_MSG_ENDPOINT *_acceptIpcFn(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKET *sk, const GWEN_INETADDRESS *addr, void *data);
@@ -122,7 +122,13 @@ int AqHomeStorage_Init(AQHOME_STORAGE *aqh, int argc, char **argv)
} }
_setupIpc(aqh, dbArgs); _setupIpc(aqh, dbArgs);
_setupMqtt(aqh, dbArgs);
rv=AqHomeStorage_SetupMqtt(aqh, dbArgs);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
return rv;
}
rv=AqHomeStorage_SetupHttp(aqh, dbArgs); rv=AqHomeStorage_SetupHttp(aqh, dbArgs);
if (rv<0) { if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv); DBG_INFO(NULL, "here (%d)", rv);
@@ -184,8 +190,10 @@ int _setupFolders(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs)
int _setupStorage(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs) int _setupStorage(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs)
{ {
const char *dataFolder;
const char *stateFile; const char *stateFile;
dataFolder=GWEN_DB_GetCharValue(dbArgs, "dataFolder", 0, AQHOME_STORAGE_DEFAULT_DATADIR);
stateFile=GWEN_DB_GetCharValue(dbArgs, "stateFile", 0, NULL); stateFile=GWEN_DB_GetCharValue(dbArgs, "stateFile", 0, NULL);
if (stateFile && *stateFile) { if (stateFile && *stateFile) {
AQH_STORAGE *sto; AQH_STORAGE *sto;
@@ -194,6 +202,8 @@ int _setupStorage(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs)
sto=AQH_Storage_new(); sto=AQH_Storage_new();
AQH_Storage_SetStateFile(sto, stateFile); AQH_Storage_SetStateFile(sto, stateFile);
AQH_Storage_SetDataFileFolder(sto, (dataFolder && *dataFolder)?dataFolder:NULL);
rv=AQH_Storage_Init(sto); rv=AQH_Storage_Init(sto);
if (rv<0) { if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv); DBG_INFO(NULL, "here (%d)", rv);
@@ -232,37 +242,6 @@ void _setupIpc(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs)
void _setupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs)
{
const char *mqttAddress;
int mqttPort;
const char *mqttClientId;
int mqttKeepAlive;
mqttAddress=GWEN_DB_GetCharValue(dbArgs, "mqttAddress", 0, NULL);
mqttPort=GWEN_DB_GetIntValue(dbArgs, "mqttPort", 0, AQHOME_STORAGE_DEFAULT_MQTT_PORT);
mqttClientId=GWEN_DB_GetCharValue(dbArgs, "mqttClientId", 0, AQHOME_STORAGE_DEFAULT_MQTT_CLIENTID);
mqttKeepAlive=GWEN_DB_GetIntValue(dbArgs, "mqttKeepAlive", 0, AQHOME_STORAGE_DEFAULT_MQTT_KEEPALIVE);
if (mqttAddress && *mqttAddress && mqttPort) {
GWEN_MSG_ENDPOINT *ep;
int rv;
ep=AQH_MqttClientEndpoint_new(mqttClientId, mqttAddress, mqttPort, NULL, 0);
AQH_MqttClientEndpoint_SetKeepAliveTime(ep, mqttKeepAlive);
GWEN_MsgEndpoint_Tree2_AddChild(aqh->rootEndpoint, ep);
aqh->mqttEndpoint=ep;
rv=AQH_MqttClientEndpoint_StartConnect(ep);
if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) {
DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d (%d), will retry later", mqttAddress, mqttPort, rv);
}
}
}
GWEN_MSG_ENDPOINT *_acceptIpcFn(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *_acceptIpcFn(GWEN_MSG_ENDPOINT *ep,
GWEN_SOCKET *sk, GWEN_SOCKET *sk,
const GWEN_INETADDRESS *addr, const GWEN_INETADDRESS *addr,
@@ -440,6 +419,17 @@ int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs)
I18S("Folder where static HTML source files are stored"), I18S("Folder where static HTML source files are stored"),
I18S("Folder where static HTML source files are stored") I18S("Folder where static HTML source files are stored")
}, },
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"datafolder", /* name */
0, /* minnum */
1, /* maxnum */
NULL, /* short option */
"datafolder", /* long option */
I18S("Folder where data files are stored"),
I18S("Folder where data files are stored")
},
{ {
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */ GWEN_ArgsType_Char, /* type */

View File

@@ -0,0 +1,212 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./init_mqtt.h"
#include "./aqhomestorage_p.h"
#include "./aqhomehttp.h"
#include "./u_login.h"
#include "./u_rooms.h"
#include "./u_devices.h"
#include "./u_mqtttopics.h"
#include "./u_values.h"
#include "./u_static.h"
#include "aqhome/msg/endpoint_tty.h"
#include "aqhome/ipc/endpoint_ipc.h"
#include "aqhome/mqtt/endpoint_mqttc.h"
#include <aqhome/mqtt/msg_mqtt_subscribe.h>
#include <aqhome/mqtt/msg_mqtt_suback.h>
#include "aqhome/http/endpoint_http.h"
#include "aqhome/http/httpservice_conf.h"
#include "aqhome/http/httpservice_http.h"
#include "aqhome/http/httpservice.h"
#include "aqhome/http/content_files.h"
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/args.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endpoint_tcpd.h>
#include <gwenhywfar/endpoint_msgio.h>
#include <gwenhywfar/directory.h>
#include <gwenhywfar/text.h>
#ifdef HAVE_SYS_TYPES_H
# include <sys/types.h>
#endif
#ifdef HAVE_SYS_STAT_H
# include <sys/stat.h>
#endif
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <time.h>
/* ------------------------------------------------------------------------------------------------
* defines
* ------------------------------------------------------------------------------------------------
*/
//#define I18N(msg) msg
#define I18S(msg) msg
#define AQHOME_STORAGE_DEFAULT_CMDTIMEOUT 10000
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static int _mqttConnect(GWEN_MSG_ENDPOINT *epTcp);
static int _subscribe(AQHOME_STORAGE *aqh, const char *topicFilter);
static GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
int AqHomeStorage_SetupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs)
{
const char *mqttAddress;
int mqttPort;
const char *mqttClientId;
int mqttKeepAlive;
mqttAddress=GWEN_DB_GetCharValue(dbArgs, "mqttAddress", 0, NULL);
mqttPort=GWEN_DB_GetIntValue(dbArgs, "mqttPort", 0, AQHOME_STORAGE_DEFAULT_MQTT_PORT);
mqttClientId=GWEN_DB_GetCharValue(dbArgs, "mqttClientId", 0, AQHOME_STORAGE_DEFAULT_MQTT_CLIENTID);
mqttKeepAlive=GWEN_DB_GetIntValue(dbArgs, "mqttKeepAlive", 0, AQHOME_STORAGE_DEFAULT_MQTT_KEEPALIVE);
if (mqttAddress && *mqttAddress && mqttPort) {
GWEN_MSG_ENDPOINT *ep;
int rv;
ep=AQH_MqttClientEndpoint_new(mqttClientId, mqttAddress, mqttPort, NULL, 0);
AQH_MqttClientEndpoint_SetKeepAliveTime(ep, mqttKeepAlive);
GWEN_MsgEndpoint_Tree2_AddChild(aqh->rootEndpoint, ep);
aqh->mqttEndpoint=ep;
rv=_mqttConnect(ep);
if (rv<0) {
DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d (%d)", mqttAddress, mqttPort, rv);
return rv;
}
rv=_subscribe(aqh, "#");
if (rv<0) {
DBG_ERROR(NULL, "Error subscribingconnecting to MQTT server %s:%d (%d)", mqttAddress, mqttPort, rv);
return rv;
}
}
return 0;
}
int _mqttConnect(GWEN_MSG_ENDPOINT *epTcp)
{
if (GWEN_MsgEndpoint_GetState(epTcp)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) {
int rv;
rv=AQH_MqttClientEndpoint_StartConnect(epTcp);
if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) {
DBG_ERROR(NULL, "Error starting to connect (%d)", rv);
return rv;
}
}
while(GWEN_MsgEndpoint_GetState(epTcp)!=GWEN_MSG_ENDPOINT_STATE_CONNECTED) {
DBG_DEBUG(NULL, "Next loop");
GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
}
return 0;
}
int _subscribe(AQHOME_STORAGE *aqh, const char *topicFilter)
{
uint16_t pckId;
GWEN_MSG *msgOut;
GWEN_MSG *msgIn;
DBG_INFO(NULL, "Sending SUBSCRIBE %s", topicFilter);
pckId=AQH_MqttClientEndpoint_GetNextPacketId(aqh->mqttEndpoint);
msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, topicFilter, 0);
if (msgOut==NULL) {
DBG_ERROR(NULL, "Error creating message");
return GWEN_ERROR_INTERNAL;
}
GWEN_MsgEndpoint_AddSendMessage(aqh->mqttEndpoint, msgOut);
DBG_INFO(NULL, "Waiting for response");
msgIn=_awaitPacket(aqh->mqttEndpoint, AQH_MQTTMSG_MSGTYPE_SUBACK, AQHOME_STORAGE_DEFAULT_CMDTIMEOUT);
if (msgIn) {
GWEN_BUFFER *buf;
buf=GWEN_Buffer_new(0, 256, 0, 1);
AQH_SubAckMqttMsg_DumpToBuffer(msgIn, buf, "received");
DBG_INFO(NULL, "%s", GWEN_Buffer_GetStart(buf));
GWEN_Buffer_free(buf);
GWEN_Msg_free(msgIn);
}
return 0;
}
GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds)
{
time_t startTime;
startTime=time(NULL);
for (;;) {
GWEN_MSG *msg;
time_t now;
GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp);
if (msg) {
if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(expectedPacketType & 0xf0)) {
return msg;
}
else {
DBG_ERROR(NULL, "Received this message:");
GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2);
}
GWEN_Msg_free(msg);
}
now=time(NULL);
if (now-startTime>timeoutInSeconds) {
DBG_INFO(NULL, "Timeout");
break;
}
}
return NULL;
}

View File

@@ -0,0 +1,25 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQHOME_STORAGE_INIT_MQTT_H
#define AQHOME_STORAGE_INIT_MQTT_H
#include "./aqhomestorage.h"
int AqHomeStorage_SetupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs);
#endif

View File

@@ -13,6 +13,7 @@
#include "./loop.h" #include "./loop.h"
#include "./loop_http.h" #include "./loop_http.h"
#include "./loop_mqtt.h"
#include "./aqhomehttp.h" #include "./aqhomehttp.h"
#include "./aqhomestorage_p.h" #include "./aqhomestorage_p.h"
#include "aqhome/http/httpservice_conf.h" #include "aqhome/http/httpservice_conf.h"
@@ -50,6 +51,8 @@ void AqHomeStorage_Loop(AQHOME_STORAGE *aqh, int timeoutInMsecs)
if (aqh) { if (aqh) {
GWEN_MsgEndpoint_ChildrenIoLoop(aqh->rootEndpoint, timeoutInMsecs); GWEN_MsgEndpoint_ChildrenIoLoop(aqh->rootEndpoint, timeoutInMsecs);
AqHomeStorage_ReadAndHandleHttpMessages(aqh); AqHomeStorage_ReadAndHandleHttpMessages(aqh);
AqHomeStorage_ReadAndHandleMqttMessages(aqh);
// AqHomeStorage_ReadAndHandleIpcMessages(aqh); // AqHomeStorage_ReadAndHandleIpcMessages(aqh);
} }
} }

View File

@@ -0,0 +1,110 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./loop_mqtt.h"
#include "./aqhomestorage_p.h"
#include "aqhome/mqtt/msg_mqtt_publish.h"
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/debug.h>
/* ------------------------------------------------------------------------------------------------
* defines
* ------------------------------------------------------------------------------------------------
*/
//#define I18N(msg) msg
#define I18S(msg) msg
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static void _handlePublishMsg(AQHOME_STORAGE *aqh, GWEN_MSG *msg);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
void AqHomeStorage_ReadAndHandleMqttMessages(AQHOME_STORAGE *aqh)
{
GWEN_MSG *msg;
while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(aqh->mqttEndpoint)) ) {
if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) {
_handlePublishMsg(aqh, msg);
}
else if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) {
DBG_INFO(AQH_LOGDOMAIN, "PING response received");
}
else {
DBG_INFO(NULL, "Received unexpected MQTT message %02x", AQH_MqttMsg_GetMsgTypeAndFlags(msg));
}
GWEN_Msg_free(msg);
}
}
int AqHomeStorage_MqttPing(AQHOME_STORAGE *aqh)
{
GWEN_MSG *msgOut;
DBG_INFO(AQH_LOGDOMAIN, "Sending PING");
msgOut=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL);
if (msgOut==NULL) {
DBG_ERROR(NULL, "Error creating message");
return GWEN_ERROR_INTERNAL;
}
GWEN_MsgEndpoint_AddSendMessage(aqh->mqttEndpoint, msgOut);
return 0;
}
void _handlePublishMsg(AQHOME_STORAGE *aqh, GWEN_MSG *msg)
{
char *topic;
char *value;
topic=AQH_PublishMqttMsg_ExtractTopic(msg);
value=AQH_PublishMqttMsg_ExtractValue(msg);
if (topic && value)
AQH_Storage_HandleMqttPublish(aqh->storage, topic, value);
else {
DBG_ERROR(NULL, "Either topic or value missing in PUBLISH msg");
}
free(value);
free(topic);
}

View File

@@ -0,0 +1,23 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQHOME_STORAGE_MQTT_H
#define AQHOME_STORAGE_MQTT_H
#include "./aqhomestorage.h"
void AqHomeStorage_ReadAndHandleMqttMessages(AQHOME_STORAGE *aqh);
int AqHomeStorage_MqttPing(AQHOME_STORAGE *aqh);
#endif

View File

@@ -17,6 +17,7 @@
#include "./init.h" #include "./init.h"
#include "./fini.h" #include "./fini.h"
#include "./loop.h" #include "./loop.h"
#include "./loop_mqtt.h"
#include "./cleanup.h" #include "./cleanup.h"
#include <gwenhywfar/gwenhywfar.h> #include <gwenhywfar/gwenhywfar.h>
@@ -35,6 +36,7 @@
#define CLEANUP_INTERVAL_IN_SECS (60) #define CLEANUP_INTERVAL_IN_SECS (60)
#define WRITE_INTERVAL_IN_SECS (60) #define WRITE_INTERVAL_IN_SECS (60)
#define PING_INTERVAL 120
@@ -136,12 +138,14 @@ void _runService(AQHOME_STORAGE *aqh)
time_t timeStart; time_t timeStart;
time_t timeLastCleanup; time_t timeLastCleanup;
time_t timeLastWrite; time_t timeLastWrite;
time_t timeLastPing;
int timeout; int timeout;
timeout=AqHomeStorage_GetTimeout(aqh); timeout=AqHomeStorage_GetTimeout(aqh);
timeStart=time(NULL); timeStart=time(NULL);
timeLastCleanup=time(NULL); timeLastCleanup=time(NULL);
timeLastWrite=time(NULL); timeLastWrite=time(NULL);
timeLastPing=time(NULL);
while(!stopService) { while(!stopService) {
time_t now; time_t now;
@@ -163,6 +167,12 @@ void _runService(AQHOME_STORAGE *aqh)
timeLastWrite=now; timeLastWrite=now;
} }
if (timeout && ((int)difftime(now, timeLastPing))>timeout) {
DBG_INFO(NULL, "Sending ping");
AqHomeStorage_MqttPing(aqh);
timeLastPing=now;
}
if (timeout && ((int)difftime(now, timeStart))>timeout) { if (timeout && ((int)difftime(now, timeStart))>timeout) {
DBG_INFO(NULL, "Timeout"); DBG_INFO(NULL, "Timeout");

View File

@@ -204,6 +204,7 @@ void _writeEditingTable(AQH_HTTP_URLHANDLER *uh, GWEN_DB_NODE *dbValues, GWEN_BU
{ {
AQH_SERVICE *sv; AQH_SERVICE *sv;
AQH_STORAGE *sto; AQH_STORAGE *sto;
const GWEN_STRINGLIST *seenTopicsList;
const AQH_DEVICE_LIST *deviceList; const AQH_DEVICE_LIST *deviceList;
unsigned long int selectedDeviceId=0; unsigned long int selectedDeviceId=0;
int dataType; int dataType;
@@ -223,18 +224,37 @@ void _writeEditingTable(AQH_HTTP_URLHANDLER *uh, GWEN_DB_NODE *dbValues, GWEN_BU
"<table>" "<table>"
"<tr>" "<tr>"
" <td><label for=\"topic\">%s: </label></td>" " <td><label for=\"topic\">%s: </label></td>"
" <td><input type=\"text\" name=\"topic\" value=\"%s\" required></td>" " <td>"
" </tr>", " <input list=\"seenTopics\" type=\"text\" name=\"topic\" id=\"topic\" value=\"%s\" size=\"64\" required>\n"
" <datalist id=\"seenTopics\">",
I18N("MQTT Topic"), I18N("MQTT Topic"),
dbValues?GWEN_DB_GetCharValue(dbValues, "topic", 0, ""):""); dbValues?GWEN_DB_GetCharValue(dbValues, "topic", 0, ""):"");
seenTopicsList=AQH_Storage_GetRecvdTopicList(sto);
if (seenTopicsList && GWEN_StringList_Count(seenTopicsList)) {
GWEN_STRINGLISTENTRY *se;
se=GWEN_StringList_FirstEntry(seenTopicsList);
while(se) {
const char *s=GWEN_StringListEntry_Data(se);
if (s && *s) {
DBG_INFO(NULL, "Adding MQTT string %s", s);
GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%s\">\n", s);
}
se=GWEN_StringListEntry_Next(se);
}
}
else {
}
GWEN_Buffer_AppendString(pageBuf, "</datalist></td></tr>");
/* device */
GWEN_Buffer_AppendArgs(pageBuf, GWEN_Buffer_AppendArgs(pageBuf,
"<tr><td><label for=\"deviceId\">%s: </label></td>" "<tr><td><label for=\"deviceId\">%s: </label></td>"
"<td><select id=\"deviceId\" name=\"deviceId\">", "<td><select id=\"deviceId\" name=\"deviceId\">",
I18N("Device")); I18N("Device"));
GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"0\">%s</option>", I18N("-- select device --")); GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"0\">%s</option>\n", I18N("-- select device --"));
/* device */
if (deviceList) { if (deviceList) {
const AQH_DEVICE *device; const AQH_DEVICE *device;
@@ -247,14 +267,14 @@ void _writeEditingTable(AQH_HTTP_URLHANDLER *uh, GWEN_DB_NODE *dbValues, GWEN_BU
snprintf(numbuf, sizeof(numbuf)-1, "%lu", (unsigned long int) AQH_Device_GetId(device)); snprintf(numbuf, sizeof(numbuf)-1, "%lu", (unsigned long int) AQH_Device_GetId(device));
numbuf[sizeof(numbuf)-1]=0; numbuf[sizeof(numbuf)-1]=0;
if (selectedDeviceId && AQH_Device_GetId(device)==selectedDeviceId) if (selectedDeviceId && AQH_Device_GetId(device)==selectedDeviceId)
GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%s\" selected>%s</option>", numbuf, deviceName); GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%s\" selected>%s</option>\n", numbuf, deviceName);
else else
GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%s\">%s</option>", numbuf, deviceName); GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%s\">%s</option>\n", numbuf, deviceName);
} }
device=AQH_Device_List_Next(device); device=AQH_Device_List_Next(device);
} }
} }
GWEN_Buffer_AppendString(pageBuf, "</select></td></tr>"); GWEN_Buffer_AppendString(pageBuf, "</select></td></tr>\n");
/* data type */ /* data type */
GWEN_Buffer_AppendArgs(pageBuf, GWEN_Buffer_AppendArgs(pageBuf,
@@ -262,16 +282,35 @@ void _writeEditingTable(AQH_HTTP_URLHANDLER *uh, GWEN_DB_NODE *dbValues, GWEN_BU
"<td><select id=\"dataType\" name=\"dataType\">", "<td><select id=\"dataType\" name=\"dataType\">",
I18N("Data Type")); I18N("Data Type"));
if (dataType==AQH_MqttTopicType_Num) if (dataType==AQH_MqttTopicType_Num)
GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%d\" selected>%s</option>", dataType, I18N("numeric")); GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%d\" selected>%s</option>", AQH_MqttTopicType_Num, I18N("numeric"));
else else
GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%d\" >%s</option>", dataType, I18N("numeric")); GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%d\" >%s</option>", AQH_MqttTopicType_Num, I18N("numeric"));
if (dataType==AQH_MqttTopicType_Json) if (dataType==AQH_MqttTopicType_Json)
GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%d\" selected>%s</option>", dataType, I18N("JSON")); GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%d\" selected>%s</option>", AQH_MqttTopicType_Json, I18N("JSON"));
else else
GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%d\" >%s</option>", dataType, I18N("JSON")); GWEN_Buffer_AppendArgs(pageBuf, "<option value=\"%d\" >%s</option>", AQH_MqttTopicType_Json, I18N("JSON"));
GWEN_Buffer_AppendString(pageBuf, "</select></td></tr>"); GWEN_Buffer_AppendString(pageBuf, "</select></td></tr>\n");
GWEN_Buffer_AppendString(pageBuf, "</tbody></table>");
if (seenTopicsList && GWEN_StringList_Count(seenTopicsList)) {
GWEN_STRINGLISTENTRY *se;
GWEN_Buffer_AppendArgs(pageBuf, "<tr><td valign=\"top\"><label for=\"receivedTopicsList\">%s: </label></td>", I18N("Received Topics"));
GWEN_Buffer_AppendString(pageBuf, "<td><output name=\"receivedTopicsList\" id=\"receivedTopicsList\">");
se=GWEN_StringList_FirstEntry(seenTopicsList);
while(se) {
const char *s=GWEN_StringListEntry_Data(se);
if (s && *s)
GWEN_Buffer_AppendArgs(pageBuf, "%s<br>\n", s);
se=GWEN_StringListEntry_Next(se);
}
GWEN_Buffer_AppendString(pageBuf, "</output></td></tr>");
}
GWEN_Buffer_AppendString(pageBuf, "</tbody></table>\n");
} }

View File

@@ -7,6 +7,7 @@ export LD_LIBRARY_PATH="0-build/aqhome/:$LD_LIBRARY_PATH"
0-build/apps/aqhome-storage/aqhome-storage \ 0-build/apps/aqhome-storage/aqhome-storage \
--sourcefolder=apps/aqhome-storage/test/html \ --sourcefolder=apps/aqhome-storage/test/html \
--datafolder=apps/aqhome-storage/test/data \
-D apps/aqhome-storage/test/config \ -D apps/aqhome-storage/test/config \
--statefile=apps/aqhome-storage/test/config/state \ --statefile=apps/aqhome-storage/test/config/state \
-ma 192.168.117.192 -mp 1883 --mqttclientid=AQHOMESTORAGETEST \ -ma 192.168.117.192 -mp 1883 --mqttclientid=AQHOMESTORAGETEST \

View File

@@ -105,6 +105,9 @@ int AQH_DataFile_Create(AQH_DATAFILE *df)
} }
sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew); sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew);
GWEN_SyncIo_SetFlags(sio,
GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ |
GWEN_SYNCIO_FILE_FLAGS_UREAD | GWEN_SYNCIO_FILE_FLAGS_UWRITE);
rv=GWEN_SyncIo_Connect(sio); rv=GWEN_SyncIo_Connect(sio);
if (rv<0) { if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv); DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv);
@@ -143,7 +146,8 @@ int AQH_DataFile_Open(AQH_DATAFILE *df)
return GWEN_ERROR_INVALID; return GWEN_ERROR_INVALID;
} }
sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew); sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_OpenExisting);
GWEN_SyncIo_SetFlags(sio, GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ);
rv=GWEN_SyncIo_Connect(sio); rv=GWEN_SyncIo_Connect(sio);
if (rv<0) { if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv); DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv);
@@ -279,7 +283,7 @@ AQH_DATAFILE *AQH_DataFile_List_GetByValueId(AQH_DATAFILE_LIST *fileList, uint64
df=AQH_DataFile_List_First(fileList); df=AQH_DataFile_List_First(fileList);
while(df) { while(df) {
if (df->valueId==id) if (df->valueId==id)
break; return df;
df=AQH_DataFile_List_Next(df); df=AQH_DataFile_List_Next(df);
} }
} }

View File

@@ -367,6 +367,7 @@ void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *sTopic, const c
topic=AQH_Storage_GetMqttTopicByTopic(sto, sTopic); topic=AQH_Storage_GetMqttTopicByTopic(sto, sTopic);
if (topic) { if (topic) {
DBG_INFO(AQH_LOGDOMAIN, "Handling MQTT topic \"%s\"", sTopic);
if (AQH_MqttTopic_GetDataType(topic)==AQH_MqttTopicType_Json) if (AQH_MqttTopic_GetDataType(topic)==AQH_MqttTopicType_Json)
_handleJsonTopic(sto, topic, sValue); _handleJsonTopic(sto, topic, sValue);
else else
@@ -502,7 +503,13 @@ AQH_DATAFILE *_getDataFileByValueId(AQH_STORAGE *sto, uint64_t valueId)
df=_findDataFileByValueId(sto, valueId); df=_findDataFileByValueId(sto, valueId);
if (df==NULL) { if (df==NULL) {
DBG_INFO(AQH_LOGDOMAIN, "Datafile for valueId \"%lu\" not in list, loading", (unsigned long int) valueId);
df=_openOrCreateDataFileByValueId(sto, valueId); df=_openOrCreateDataFileByValueId(sto, valueId);
if (df==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "Error opening/creating datafile for valueId \"%lu\"", (unsigned long int) valueId);
return NULL;
}
DBG_INFO(AQH_LOGDOMAIN, "Adding datafile for valueId \"%lu\" to list", (unsigned long int) valueId);
AQH_DataFile_List_Add(df, sto->dataFileList); AQH_DataFile_List_Add(df, sto->dataFileList);
} }

View File

@@ -522,6 +522,7 @@ void _abortMessage(GWEN_MSG_ENDPOINT *ep)
DBG_DEBUG(AQH_LOGDOMAIN, "Aborting message (if any)."); DBG_DEBUG(AQH_LOGDOMAIN, "Aborting message (if any).");
xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_HTTP, ep); xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_HTTP, ep);
if (xep->currentReadBuffer)
GWEN_Buffer_Reset(xep->currentReadBuffer); GWEN_Buffer_Reset(xep->currentReadBuffer);
xep->currentHeaderPos=0; xep->currentHeaderPos=0;
xep->currentBodyPos=0; xep->currentBodyPos=0;

View File

@@ -657,7 +657,9 @@ int _unescapeUrlEncoded(const char *src, unsigned int srclen, char *buffer, unsi
x=='.' || x=='.' ||
x=='*' || x=='*' ||
x=='?' || x=='?' ||
x=='+' x=='+' ||
x=='-' ||
x=='_'
) { ) {
if (size<(maxsize-1)) { if (size<(maxsize-1)) {
buffer[size++]=(x=='+')?' ':x; buffer[size++]=(x=='+')?' ':x;

View File

@@ -9,6 +9,7 @@ valgrind --tool=callgrind --trace-children=yes --dump-instr=yes --collect-jumps=
0-build/apps/aqhome-storage/aqhome-storage \ 0-build/apps/aqhome-storage/aqhome-storage \
--sourcefolder=apps/aqhome-storage/test/html \ --sourcefolder=apps/aqhome-storage/test/html \
-D apps/aqhome-storage/test/config \ -D apps/aqhome-storage/test/config \
--datafolder=apps/aqhome-storage/test/data \
--statefile=apps/aqhome-storage/test/config/state \ --statefile=apps/aqhome-storage/test/config/state \
-ma 192.168.117.192 -mp 1883 --mqttclientid=AQHOMESTORAGEVG \ -ma 192.168.117.192 -mp 1883 --mqttclientid=AQHOMESTORAGEVG \
-ha 127.0.0.1 -hp 1884 \ -ha 127.0.0.1 -hp 1884 \

View File

@@ -12,6 +12,7 @@ valgrind \
--track-origins=yes --num-callers=50 --keep-stacktraces=alloc-and-free \ --track-origins=yes --num-callers=50 --keep-stacktraces=alloc-and-free \
0-build/apps/aqhome-storage/aqhome-storage \ 0-build/apps/aqhome-storage/aqhome-storage \
--sourcefolder=apps/aqhome-storage/test/html \ --sourcefolder=apps/aqhome-storage/test/html \
--datafolder=apps/aqhome-storage/test/data \
-D apps/aqhome-storage/test/config \ -D apps/aqhome-storage/test/config \
--statefile=apps/aqhome-storage/test/config/state \ --statefile=apps/aqhome-storage/test/config/state \
-ma 192.168.117.192 -mp 1883 --mqttclientid=AQHOMESTORAGEVG \ -ma 192.168.117.192 -mp 1883 --mqttclientid=AQHOMESTORAGEVG \