diff --git a/0BUILD b/0BUILD index 87c74f9..402ba7a 100644 --- a/0BUILD +++ b/0BUILD @@ -44,6 +44,7 @@ $(option_prefix)/include $(option_prefix)/share $(option_prefix)/share + $(option_prefix)/var/lib $(option_prefix)/share/locale $(libdir)/$(package) @@ -87,6 +88,7 @@ etc share/locale share" + var/lib @@ -96,11 +98,13 @@ etc share/locale share + var/lib $(sysconfdir) $(datadir)/locale $(datadir) + $(rtdatadir) diff --git a/apps/aqhome-mqttlog/0BUILD b/apps/aqhome-mqttlog/0BUILD index 226bfe9..e280f12 100644 --- a/apps/aqhome-mqttlog/0BUILD +++ b/apps/aqhome-mqttlog/0BUILD @@ -8,6 +8,8 @@ $(gwenhywfar_cflags) -I$(topsrcdir) -I$(topbuilddir) + -I$(topsrcdir)/apps + -I$(topbuilddir)/apps -I$(builddir) -I$(srcdir) @@ -47,10 +49,14 @@ init.h fini.h + loop.h + loop_ipc.h + loop_mqtt.h aqhome_mqtt.h aqhome_mqtt_p.h mqtt.h messages.h + xmlread.h @@ -58,13 +64,18 @@ aqhome_mqtt.c init.c fini.c + loop.c + loop_ipc.c + loop_mqtt.c main.c mqtt.c messages.c + xmlread.c aqhome + aqhmqtt_types @@ -72,6 +83,7 @@ + types diff --git a/apps/aqhome-mqttlog/aqhome_mqtt.c b/apps/aqhome-mqttlog/aqhome_mqtt.c index 293f929..c6b967f 100644 --- a/apps/aqhome-mqttlog/aqhome_mqtt.c +++ b/apps/aqhome-mqttlog/aqhome_mqtt.c @@ -24,7 +24,7 @@ -AQHOME_MQTT *AqHomeMqtt_new() +AQHOME_MQTT *AqHomeMqtt_new(void) { AQHOME_MQTT *aqh; @@ -38,6 +38,8 @@ AQHOME_MQTT *AqHomeMqtt_new() void AqHomeMqtt_free(AQHOME_MQTT *aqh) { if (aqh) { + AQHMQTT_Device_List_free(aqh->availableDeviceList); + AQHMQTT_Device_List_free(aqh->registeredDeviceList); GWEN_MsgEndpoint_free(aqh->rootEndpoint); GWEN_DB_Group_free(aqh->dbArgs); free(aqh->pidFile); @@ -90,3 +92,27 @@ int AqHomeMqtt_GetTimeout(const AQHOME_MQTT *aqh) { return aqh?aqh->timeout:0; } + + + +AQHMQTT_DEVICE_LIST *AqHomeMqtt_GetAvailableDeviceList(const AQHOME_MQTT *aqh) +{ + return aqh?aqh->availableDeviceList:NULL; +} + + + +void AqHomeMqtt_SetAvailableDeviceList(AQHOME_MQTT *aqh, AQHMQTT_DEVICE_LIST *dl) +{ + if (aqh) { + AQHMQTT_Device_List_free(aqh->availableDeviceList); + aqh->availableDeviceList=dl; + } +} + + + + + + + diff --git a/apps/aqhome-mqttlog/aqhome_mqtt.h b/apps/aqhome-mqttlog/aqhome_mqtt.h index 1a4b1e9..1357be5 100644 --- a/apps/aqhome-mqttlog/aqhome_mqtt.h +++ b/apps/aqhome-mqttlog/aqhome_mqtt.h @@ -12,6 +12,7 @@ #include "./mqttvalue.h" #include "./mqtttopic.h" +#include "aqhome-mqttlog/types/device.h" #include @@ -21,7 +22,7 @@ typedef struct AQHOME_MQTT AQHOME_MQTT; -AQHOME_MQTT *AqHomeMqtt_new(); +AQHOME_MQTT *AqHomeMqtt_new(void); void AqHomeMqtt_free(AQHOME_MQTT *aqh); GWEN_MSG_ENDPOINT *AqHomeMqtt_GetBrokerEndpoint(const AQHOME_MQTT *aqh); @@ -35,6 +36,8 @@ void AqHomeMqtt_SetPidFile(AQHOME_MQTT *aqh, const char *s); int AqHomeMqtt_GetTimeout(const AQHOME_MQTT *aqh); +AQHMQTT_DEVICE_LIST *AqHomeMqtt_GetAvailableDeviceList(const AQHOME_MQTT *aqh); +void AqHomeMqtt_SetAvailableDeviceList(AQHOME_MQTT *aqh, AQHMQTT_DEVICE_LIST *dl); #endif diff --git a/apps/aqhome-mqttlog/aqhome_mqtt_p.h b/apps/aqhome-mqttlog/aqhome_mqtt_p.h index e40f3e9..7aa29d6 100644 --- a/apps/aqhome-mqttlog/aqhome_mqtt_p.h +++ b/apps/aqhome-mqttlog/aqhome_mqtt_p.h @@ -16,7 +16,7 @@ #define AQHOME_MQTT_DEFAULT_PIDFILE "/var/run/aqhome-mqtt.pid" -#define AQHOME_MQTT_DEFAULT_DATADIR "/var/lib/aqhome-mqtt/data" +#define AQHOME_MQTT_DEFAULT_DATADIR "/var/lib/aqhome-mqtt" #define AQHOME_MQTT_DEFAULT_BROKER_PORT 1899 #define AQHOME_MQTT_DEFAULT_BROKER_CLIENTID "mqtt" @@ -35,6 +35,10 @@ struct AQHOME_MQTT { AQH_MQTT_VALUE *mqttValueList; AQH_MQTT_TOPIC *mqttTopicList; + + AQHMQTT_DEVICE_LIST *availableDeviceList; + AQHMQTT_DEVICE_LIST *registeredDeviceList; + }; diff --git a/apps/aqhome-mqttlog/init.c b/apps/aqhome-mqttlog/init.c index ce20ccb..d3bc53c 100644 --- a/apps/aqhome-mqttlog/init.c +++ b/apps/aqhome-mqttlog/init.c @@ -14,6 +14,7 @@ #include "./init.h" #include "./mqtt.h" #include "./aqhome_mqtt_p.h" +#include "./xmlread.h" #include "aqhome/ipc/endpoint_ipc.h" #include "aqhome/ipc/endpoint_ipcclient.h" @@ -104,6 +105,17 @@ int AqHomeMqtt_Init(AQHOME_MQTT *aqh, int argc, char **argv) +void AqHomeMqtt_ReloadDeviceFiles(AQHOME_MQTT *aqh) +{ + AQHMQTT_DEVICE_LIST *deviceList; + + deviceList=AqHomeMqttLog_ReadSysconfDeviceFiles(aqh); + if (deviceList) + AqHomeMqtt_SetAvailableDeviceList(aqh, deviceList); +} + + + int _createPidFile(const char *pidFilename) { FILE *f; diff --git a/apps/aqhome-mqttlog/loop.c b/apps/aqhome-mqttlog/loop.c new file mode 100644 index 0000000..ffa0dcc --- /dev/null +++ b/apps/aqhome-mqttlog/loop.c @@ -0,0 +1,56 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./loop.h" +#include "./loop_ipc.h" +#include "./loop_mqtt.h" +#include "./aqhome_mqtt_p.h" + +#include +#include +#include +#include +#include +#include + + +//#define FULL_DEBUG + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +void AqHomeMqttLog_Loop(AQHOME_MQTT *aqh, int timeoutInMsecs) +{ + if (aqh) { + GWEN_MsgEndpoint_ChildrenIoLoop(aqh->rootEndpoint, timeoutInMsecs); + AqHomeMqttLog_ReadAndHandleMqttMessages(aqh); + AqHomeMqttLog_ReadAndHandleIpcMessages(aqh); + } +} + + + + + + + + diff --git a/apps/aqhome-mqttlog/loop.h b/apps/aqhome-mqttlog/loop.h new file mode 100644 index 0000000..28a6679 --- /dev/null +++ b/apps/aqhome-mqttlog/loop.h @@ -0,0 +1,26 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOMEMQTT_LOOP_H +#define AQHOMEMQTT_LOOP_H + + +#include "./aqhome_mqtt.h" + +#include +#include + + + +void AqHomeMqttLog_Loop(AQHOME_MQTT *aqh, int timeoutInMsecs); + + + +#endif + + diff --git a/apps/aqhome-mqttlog/loop_ipc.c b/apps/aqhome-mqttlog/loop_ipc.c new file mode 100644 index 0000000..3f3a9e2 --- /dev/null +++ b/apps/aqhome-mqttlog/loop_ipc.c @@ -0,0 +1,58 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./loop_ipc.h" +#include "./aqhome_mqtt_p.h" +#include "aqhome/ipc/data/ipc_data.h" + +#include +#include +#include +#include +#include +#include + + +#define FULL_DEBUG + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + + +void AqHomeMqttLog_ReadAndHandleIpcMessages(AQHOME_MQTT *aqh) +{ + GWEN_MSG_ENDPOINT *epTcp; + GWEN_MSG *msg; + + epTcp=aqh->brokerEndpoint; + while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) { + GWEN_Msg_free(msg); + } +} + + + + + + + + diff --git a/apps/aqhome-mqttlog/loop_ipc.h b/apps/aqhome-mqttlog/loop_ipc.h new file mode 100644 index 0000000..657bd65 --- /dev/null +++ b/apps/aqhome-mqttlog/loop_ipc.h @@ -0,0 +1,26 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOMEMQTT_LOOP_IPC_H +#define AQHOMEMQTT_LOOP_IPC_H + + +#include "./aqhome_mqtt.h" + +#include +#include + + + +void AqHomeMqttLog_ReadAndHandleIpcMessages(AQHOME_MQTT *aqh); + + + +#endif + + diff --git a/apps/aqhome-mqttlog/loop_mqtt.c b/apps/aqhome-mqttlog/loop_mqtt.c new file mode 100644 index 0000000..e2c5d9a --- /dev/null +++ b/apps/aqhome-mqttlog/loop_mqtt.c @@ -0,0 +1,405 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./loop_mqtt.h" +#include "./aqhome_mqtt_p.h" +#include "aqhome/mqtt/msg_mqtt_publish.h" +#include "aqhome/ipc/data/msg_data_multidata.h" +#include "aqhome/ipc/data/ipc_data.h" + +#include +#include +#include +#include +#include +#include + + +#define FULL_DEBUG + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void _handleMqttMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); +static void _handlePublishMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); +static int _handlePublish(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *topic, const char *value); +static void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue); +static void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue); +static void _sendMessage(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_VALUE *value, const char *rcvdValue); +static int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue); +static AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir); +static AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir); +static GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +void AqHomeMqttLog_ReadAndHandleMqttMessages(AQHOME_MQTT *aqh) +{ + GWEN_MSG_ENDPOINT *epTcp; + GWEN_MSG *msg; + + epTcp=aqh->mqttEndpoint; + while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) { +#ifdef FULL_DEBUG + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); +#endif + _handleMqttMsg(aqh, epTcp, msg); + GWEN_Msg_free(msg); + } +} + + + +int AqHomeMqttLog_SendPing(AQHOME_MQTT *aqh) +{ + GWEN_MSG_ENDPOINT *epTcp; + GWEN_MSG *msgOut; + + DBG_INFO(AQH_LOGDOMAIN, "Sending PING"); + epTcp=aqh->mqttEndpoint; + msgOut=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return GWEN_ERROR_INTERNAL; + } + GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); + + return 0; +} + + + +void _handleMqttMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) +{ + if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) { +#ifdef FULL_DEBUG + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_PublishMqttMsg_DumpToBuffer(msg, buf, "received"); + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); +#endif + _handlePublishMsg(aqh, ep, msg); + } + else if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) { + DBG_INFO(AQH_LOGDOMAIN, "PING response received"); + } + else { +#ifdef FULL_DEBUG + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); +#endif + } +} + + + +void _handlePublishMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) +{ + char *topic; + char *value; + + topic=AQH_PublishMqttMsg_ExtractTopic(msg); + value=AQH_PublishMqttMsg_ExtractValue(msg); + + if (topic && value) { + int rv; + + rv=_handlePublish(aqh, ep, topic, value); + if (rv!=1) { + DBG_INFO(NULL, "New topic \"%s\", trying to register", topic); + rv=_registerNewDeviceForTopic(aqh, ep, topic, value); + if (rv==1) { + rv=_handlePublish(aqh, ep, topic, value); + if (rv!=1) { + DBG_ERROR(NULL, "Topic \"%s\" still not handled, SNH!", topic); + } + } + } + } + else { + DBG_ERROR(NULL, "Either topic or value missing in PUBLISH msg"); + } + free(value); + free(topic); +} + + + +int _handlePublish(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue) +{ + if (rcvdTopic && *rcvdTopic) { + if (aqh->registeredDeviceList) { + AQHMQTT_DEVICE *device; + + device=AQHMQTT_Device_List_First(aqh->registeredDeviceList); + while(device) { + AQHMQTT_TOPIC_LIST *topicList; + + topicList=AQHMQTT_Device_GetTopicList(device); + if (topicList) { + AQHMQTT_TOPIC *topic; + + topic=_findTopicMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); + if (topic==NULL) { + topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); + if (topic) + AQHMQTT_Topic_SetTopic(topic, rcvdTopic); + } + if (topic) { + if (AQHMQTT_Topic_GetTopicType(topic)==AQHMQTT_TopicType_Json) + _handleJsonTopic(aqh, ep, device, topic, rcvdValue); + else + _handleNumTopic(aqh, ep, device, topic, rcvdValue); + return 1; + } + } + + device=AQHMQTT_Device_List_Next(device); + } + } + DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic); + } + return 0; +} + + + +void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue) +{ + AQHMQTT_VALUE_LIST *valueList; + + valueList=AQHMQTT_Topic_GetValueList(topic); + if (valueList) + _sendMessage(aqh, ep, device, AQHMQTT_Value_List_First(valueList), rcvdValue); + else { + DBG_INFO(NULL, "No value list in device \"%s\"", AQHMQTT_Device_GetId(device)); + } +} + + + +void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue) +{ + GWEN_JSON_ELEM *jeRoot; + + jeRoot=GWEN_JsonElement_fromString(rcvdValue); + if (jeRoot==NULL) { + DBG_INFO(NULL, "Could not parse JSON value: %s", rcvdValue); + } + else { + AQHMQTT_VALUE_LIST *valueList; + + valueList=AQHMQTT_Topic_GetValueList(topic); + if (valueList) { + AQHMQTT_VALUE *value; + + value=AQHMQTT_Value_List_First(valueList); + while(value) { + const char *path; + + path=AQHMQTT_Value_GetPath(value); + if (path && *path) { + GWEN_JSON_ELEM *je; + + je=GWEN_JsonElement_GetElementByPath(jeRoot, path, 0); + if (je) { + const char *s; + + s=GWEN_JsonElement_GetData(je); + if (s && *s) + _sendMessage(aqh, ep, device, value, s); + } + } + value=AQHMQTT_Value_List_Next(value); + } /* while */ + } + GWEN_JsonElement_free(jeRoot); + } +} + + + +void _sendMessage(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_VALUE *value, const char *rcvdValue) +{ + int rv; + union {double f; uint64_t i;} u; + + rv=GWEN_Text_StringToDouble(rcvdValue, &(u.f)); + if (rv<0) { + DBG_ERROR(NULL, "Invalid value received from MQTT server (%s)", rcvdValue?rcvdValue:""); + } + else { + GWEN_MSG *pubMsg; + uint64_t arrayToSend[2]; + AQH_VALUE *msgValue; + + arrayToSend[0]=(uint64_t) time(NULL); + arrayToSend[1]=u.i; + + msgValue=AQH_Value_new(); + AQH_Value_SetDeviceName(msgValue, AQHMQTT_Device_GetId(device)); + AQH_Value_SetName(msgValue, AQHMQTT_Value_GetName(value)); + AQH_Value_SetValueUnits(msgValue, AQHMQTT_Value_GetValueUnits(value)); + AQH_Value_SetValueType(msgValue, 0); + + pubMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgValue, arrayToSend, 1); + if (pubMsg) { + DBG_INFO(AQH_LOGDOMAIN, "BROKER PUBLISH %s: %f", AQH_Value_GetName(msgValue), u.f); + GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg); + } + AQH_Value_free(msgValue); + } +} + + + +int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue) +{ + if (rcvdTopic && *rcvdTopic) { + if (aqh->availableDeviceList) { + AQHMQTT_DEVICE *device; + + device=AQHMQTT_Device_List_First(aqh->availableDeviceList); + while(device) { + AQHMQTT_TOPIC_LIST *topicList; + + topicList=AQHMQTT_Device_GetTopicList(device); + if (topicList) { + AQHMQTT_TOPIC *topic; + + topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); + if (topic) { + GWEN_BUFFER *buf; + + buf=_extractDeviceId(topic, rcvdTopic); + if (buf) { + AQHMQTT_DEVICE *newDevice; + + newDevice=AQHMQTT_Device_dup(device); + AQHMQTT_Device_SetId(newDevice, GWEN_Buffer_GetStart(buf)); + topic=_findMaskMatchingTopic(AQHMQTT_Device_GetTopicList(newDevice), rcvdTopic, AQHMQTT_TopicDir_In); + AQHMQTT_Topic_SetTopic(topic, rcvdTopic); + if (aqh->registeredDeviceList==NULL) + aqh->registeredDeviceList=AQHMQTT_Device_List_new(); + DBG_ERROR(NULL, "Registered device \"%s\" (%s)", AQHMQTT_Device_GetId(newDevice), AQHMQTT_Device_GetName(newDevice)); + AQHMQTT_Device_List_Add(newDevice, aqh->registeredDeviceList); + GWEN_Buffer_free(buf); + return 1; + } + + } + } + + device=AQHMQTT_Device_List_Next(device); + } + } + DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic); + } + return 0; + +} + + + +AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir) +{ + if (topicList) { + AQHMQTT_TOPIC *topic; + + topic=AQHMQTT_Topic_List_First(topicList); + while(topic) { + if (AQHMQTT_Topic_GetDirection(topic)==dir) { + const char *sMask; + + sMask=AQHMQTT_Topic_GetMask(topic); + if (sMask && *sMask && GWEN_Text_ComparePattern(rcvdTopic, sMask, 0)!=-1) { + /* found a matching topic */ + return topic; + } + } + + topic=AQHMQTT_Topic_List_Next(topic); + } + } + return NULL; +} + + + +AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir) +{ + if (topicList) { + AQHMQTT_TOPIC *topic; + + topic=AQHMQTT_Topic_List_First(topicList); + while(topic) { + if (AQHMQTT_Topic_GetDirection(topic)==dir) { + const char *sTopic; + + sTopic=AQHMQTT_Topic_GetTopic(topic); + if (sTopic && *sTopic && strcasecmp(rcvdTopic, sTopic)==0) { + return topic; + } + } + topic=AQHMQTT_Topic_List_Next(topic); + } + } + return NULL; +} + + + +GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic) +{ + const char *sBefore; + const char *sAfter; + + sBefore=AQHMQTT_Topic_GetBeforeId(topic); + sAfter=AQHMQTT_Topic_GetAfterId(topic); + + if (sBefore && *sBefore && sAfter && *sAfter) { + GWEN_BUFFER *buf; + int rv; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Buffer_AppendString(buf, rcvdTopic); + rv=GWEN_Buffer_KeepTextBetweenStrings(buf, sBefore, sAfter, 1); + if (rv<0) { + DBG_INFO(NULL, "Could not extract id from [%s] (beforeId=%s, afterId=%s)", rcvdTopic, sBefore, sAfter); + GWEN_Buffer_free(buf); + return NULL; + } + + return buf; + } + + return NULL; +} + + + + + + + + diff --git a/apps/aqhome-mqttlog/loop_mqtt.h b/apps/aqhome-mqttlog/loop_mqtt.h new file mode 100644 index 0000000..4a04337 --- /dev/null +++ b/apps/aqhome-mqttlog/loop_mqtt.h @@ -0,0 +1,27 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOMEMQTT_LOOP_MQTT_H +#define AQHOMEMQTT_LOOP_MQTT_H + + +#include "./aqhome_mqtt.h" + +#include +#include + + + +void AqHomeMqttLog_ReadAndHandleMqttMessages(AQHOME_MQTT *aqh); +int AqHomeMqttLog_SendPing(AQHOME_MQTT *aqh); + + + +#endif + + diff --git a/apps/aqhome-mqttlog/main.c b/apps/aqhome-mqttlog/main.c index 09c6318..78fccef 100644 --- a/apps/aqhome-mqttlog/main.c +++ b/apps/aqhome-mqttlog/main.c @@ -11,12 +11,12 @@ #endif #include "./item.h" -#include "./mqtt.h" -#include "./messages.h" +#include "./init.h" +#include "./fini.h" +#include "./loop.h" +#include "./loop_mqtt.h" #include "aqhome/aqhome.h" -#include "aqhome/mqtt/msg_mqtt.h" -#include "aqhome/mqtt/msg_mqtt_publish.h" #include #include @@ -49,7 +49,7 @@ #define AQHOME_MQTTLOG_PING_INTERVAL 120 -//#define FULL_DEBUG +#define FULL_DEBUG @@ -58,9 +58,7 @@ * ------------------------------------------------------------------------------------------------ */ -static int _serve(GWEN_DB_NODE *dbArgs); -static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); -static int _createPidFile(const char *pidFilename); +static int _serve(AQHOME_MQTT *aqh); #ifdef HAVE_SIGNAL_H static int _setSignalHandlers(void); @@ -87,6 +85,7 @@ static int stopService=0; int main(int argc, char **argv) { + AQHOME_MQTT *aqh; GWEN_DB_NODE *dbArgs; int rv; GWEN_GUI *gui; @@ -108,16 +107,16 @@ int main(int argc, char **argv) return 2; } - dbArgs=GWEN_DB_Group_new("arguments"); - rv=_readArgs(argc, argv, dbArgs); + aqh=AqHomeMqtt_new(); + rv=AqHomeMqtt_Init(aqh, argc, argv); if (rv<0) { + if (rv==GWEN_ERROR_CLOSE) + return 1; DBG_INFO(NULL, "here (%d)", rv); return 2; } - else if (rv==1) { - DBG_INFO(NULL, "Help printed, done"); - return 0; - } + + dbArgs=AqHomeMqtt_GetDbArgs(aqh); gui=GWEN_Gui_CGui_new(); s=GWEN_DB_GetCharValue(dbArgs, "charset", 0, NULL); @@ -125,13 +124,15 @@ int main(int argc, char **argv) GWEN_Gui_SetCharSet(gui, s); GWEN_Gui_SetGui(gui); - rv=_serve(dbArgs); + rv=_serve(aqh); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return 2; } - GWEN_DB_Group_free(dbArgs); + AqHomeMqtt_Fini(aqh); + AqHomeMqtt_free(aqh); + GWEN_Gui_SetGui(NULL); GWEN_Gui_free(gui); @@ -140,24 +141,18 @@ int main(int argc, char **argv) -int _serve(GWEN_DB_NODE *dbArgs) +int _serve(AQHOME_MQTT *aqh) { - const char *pidFile; GWEN_MSG_ENDPOINT *epTcp; - ITEM_LIST *itemList; int rv; int timeout; time_t startTime; time_t lastPingSendTime; - const char *baseFolder; + GWEN_DB_NODE *dbArgs; startTime=time(NULL); - itemList=AqHomeMqttLog_ReadItems(dbArgs); - if (itemList==NULL) { - DBG_ERROR(NULL, "No items to listen to, aborting."); - return GWEN_ERROR_GENERIC; - } + dbArgs=AqHomeMqtt_GetDbArgs(aqh); rv=_setSignalHandlers(); if (rv<0) { @@ -165,80 +160,15 @@ int _serve(GWEN_DB_NODE *dbArgs) return rv; } - baseFolder=GWEN_DB_GetCharValue(dbArgs, "writeToFolder", 0, "/tmp/aqhome"); timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0); - pidFile=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, "aqhome-mqttlog.pid"); - if (pidFile && *pidFile) { - rv=_createPidFile(pidFile); - if (rv<0) { - DBG_INFO(NULL, "here (%d)", rv); - return rv; - } - } - - epTcp=AqHomeMqttLog_CreateMqttEndpoint(dbArgs); - if (epTcp==NULL) { - DBG_INFO(NULL, "here"); - Item_List_free(itemList); - return GWEN_ERROR_GENERIC; - } - - rv=AqHomeMqttLog_MqttConnect(epTcp); - if (rv<0) { - DBG_INFO(NULL, "here (%d)", rv); - GWEN_MsgEndpoint_free(epTcp); - Item_List_free(itemList); - return rv; - } - - rv=AqHomeMqttLog_Subscribe(epTcp, "#"); - if (rv<0) { - DBG_INFO(NULL, "here (%d)", rv); - GWEN_MsgEndpoint_free(epTcp); - Item_List_free(itemList); - return rv; - } lastPingSendTime=time(NULL); + epTcp=AqHomeMqtt_GetMqttEndpoint(aqh); + while(!stopService) { DBG_DEBUG(NULL, "Next loop"); - GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ - if (GWEN_MsgEndpoint_GetState(epTcp)!=GWEN_MSG_ENDPOINT_STATE_CONNECTED) { - DBG_INFO(NULL, "Not connected..."); - } - else { - GWEN_MSG *msg; - - while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) { -#ifdef FULL_DEBUG - DBG_ERROR(NULL, "Received this message:"); - GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); -#endif - - if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) { -#ifdef FULL_DEBUG - GWEN_BUFFER *buf; - - buf=GWEN_Buffer_new(0, 256, 0, 1); - AQH_PublishMqttMsg_DumpToBuffer(msg, buf, "received"); - fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); - GWEN_Buffer_free(buf); -#endif - AqHomeMqttLog_HandlePublishMsg(baseFolder, itemList, msg); - } - else if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) { - DBG_INFO(AQH_LOGDOMAIN, "PING response received"); - } - else { -#ifdef FULL_DEBUG - DBG_ERROR(NULL, "Received this message:"); - GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); -#endif - } - GWEN_Msg_free(msg); - } - } + AqHomeMqttLog_Loop(aqh, 2000); if (timeout) { time_t now; @@ -254,7 +184,7 @@ int _serve(GWEN_DB_NODE *dbArgs) now=time(NULL); if (now-lastPingSendTime>AQHOME_MQTTLOG_PING_INTERVAL) { - rv=AqHomeMqttLog_Ping(epTcp); + rv=AqHomeMqttLog_SendPing(aqh); if (rv<0) { DBG_INFO(NULL, "Error sending PING"); } @@ -263,11 +193,7 @@ int _serve(GWEN_DB_NODE *dbArgs) } } - if (pidFile && *pidFile) - remove(pidFile); - GWEN_MsgEndpoint_free(epTcp); - Item_List_free(itemList); return 0; } @@ -339,195 +265,3 @@ void _signalHandler(int s) -int _createPidFile(const char *pidFilename) -{ - FILE *f; - int pidfd; - - if (remove(pidFilename)==0) { - DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)"); - } - -#ifdef HAVE_SYS_STAT_H - pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); - if (pidfd < 0) { - DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); - return GWEN_ERROR_IO; - } - - f = fdopen(pidfd, "w"); -#else /* HAVE_STAT_H */ - f=fopen(pidFilename,"w+"); -#endif /* HAVE_STAT_H */ - - /* write pid */ -#ifdef HAVE_GETPID - fprintf(f,"%d\n",getpid()); -#else - fprintf(f,"-1\n"); -#endif - if (fclose(f)) { - DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); - return GWEN_ERROR_IO; - } - return 0; -} - - - -int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) -{ - int rv; - const GWEN_ARGS args[]= { - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Char, /* type */ - "cfgdir", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - "D", /* short option */ - "cfgdir", /* long option */ - I18S("Specify the configuration folder"), - I18S("Specify the configuration folder") - }, - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Char, /* type */ - "charset", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - 0, /* short option */ - "charset", /* long option */ - I18S("Specify the output character set"), /* short description */ - I18S("Specify the output character set") /* long description */ - }, - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Char, /* type */ - "mqttAddress", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - "ma", /* short option */ - "mqttaddress", /* long option */ - I18S("Specify the address of the MQTT server to connect to (disabled if missing)"), - I18S("Specify the address of the MQTT server to connect to (disabled if missing)") - }, - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Int, /* type */ - "mqttPort", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - "mp", /* short option */ - "mqttport", /* long option */ - I18S("Specify the port of the MQTT server (default: 1883)"), - I18S("Specify the port of the MQTT server (default: 1883)") - }, - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Char, /* type */ - "mqttClientId", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - NULL, /* short option */ - "mqttclientid", /* long option */ - I18S("Specify client id for the MQTT server (default: \"aqhomed\")"), - I18S("Specify client id for the MQTT server (default: \"aqhomed\")") - }, - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Int, /* type */ - "mqttKeepAlive", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - "mk", /* short option */ - "mqttkeepalive", /* long option */ - I18S("Specify keepalive time in seconds (defaults: 600)"), - I18S("Specify keepalive time in seconds (defaults: 600)") - }, - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Char, /* type */ - "writeToFolder", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - "W", /* short option */ - NULL, /* long option */ - I18S("Specify folder to write received values to"), - I18S("Specify folder to write received values to") - }, - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Char, /* type */ - "pidfile", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - "p", /* short option */ - "pidfile", /* long option */ - I18S("Specify the PID file"), - I18S("Specify the PID file") - }, - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Char, /* type */ - "itemfile", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - "i", /* short option */ - "itemfile", /* long option */ - I18S("Specify the item definitions file"), - I18S("Specify the item definitions file") - }, - { - GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ - GWEN_ArgsType_Int, /* type */ - "timeout", /* name */ - 0, /* minnum */ - 1, /* maxnum */ - "T", /* short option */ - "timeout", /* long option */ - I18S("Specify timeout in second (default: no timeout)"), - I18S("Specify timeout in second (default: no timeout)") - }, - { - GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST, /* flags */ - GWEN_ArgsType_Int, /* type */ - "help", /* name */ - 0, /* minnum */ - 0, /* maxnum */ - "h", /* short option */ - "help", - I18S("Show this help screen."), - I18S("Show this help screen.") - } - }; - - rv=GWEN_Args_Check(argc, argv, 1, 0, args, dbArgs); - if (rv==GWEN_ARGS_RESULT_ERROR) { - fprintf(stderr, "ERROR: Could not parse arguments main\n"); - return GWEN_ERROR_INVALID; - } - else if (rv==GWEN_ARGS_RESULT_HELP) { - GWEN_BUFFER *ubuf; - - ubuf=GWEN_Buffer_new(0, 1024, 0, 1); - GWEN_Buffer_AppendArgs(ubuf, - I18N("This is version %s.\nUsage: %s [OPTIONS]\n\nOptions:\n"), - AQHOME_VERSION_STRING, - argv[0]); - if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) { - fprintf(stderr, "ERROR: Could not create help string\n"); - return 1; - } - GWEN_Buffer_AppendString(ubuf, "\n"); - - fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(ubuf)); - GWEN_Buffer_free(ubuf); - return GWEN_ERROR_CLOSE; - } - return 0; -} - - - - diff --git a/apps/aqhome-mqttlog/mqtt.c b/apps/aqhome-mqttlog/mqtt.c index fc69bea..cae8c43 100644 --- a/apps/aqhome-mqttlog/mqtt.c +++ b/apps/aqhome-mqttlog/mqtt.c @@ -85,7 +85,7 @@ GWEN_MSG_ENDPOINT *AqHomeMqttLog_CreateMqttEndpoint(GWEN_DB_NODE *dbArgs) } - +#if 0 int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT *epTcp) { if (GWEN_MsgEndpoint_GetState(epTcp)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { @@ -136,6 +136,7 @@ int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT *epTcp, const char *topicFilter) return 0; } +#endif @@ -156,6 +157,7 @@ int AqHomeMqttLog_Ping(GWEN_MSG_ENDPOINT *epTcp) +#if 0 GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds) { time_t startTime; @@ -187,5 +189,7 @@ GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int return NULL; } +#endif + diff --git a/apps/aqhome-mqttlog/mqtt.h b/apps/aqhome-mqttlog/mqtt.h index e7bf6b2..5dcad78 100644 --- a/apps/aqhome-mqttlog/mqtt.h +++ b/apps/aqhome-mqttlog/mqtt.h @@ -17,8 +17,8 @@ GWEN_MSG_ENDPOINT *AqHomeMqttLog_CreateMqttEndpoint(GWEN_DB_NODE *dbArgs); -int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT *epTcp); -int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT *epTcp, const char *topicFilter); +/*int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT *epTcp); */ +/* int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT *epTcp, const char *topicFilter); */ int AqHomeMqttLog_Ping(GWEN_MSG_ENDPOINT *epTcp); diff --git a/apps/aqhome-mqttlog/types/0BUILD b/apps/aqhome-mqttlog/types/0BUILD new file mode 100644 index 0000000..08dd080 --- /dev/null +++ b/apps/aqhome-mqttlog/types/0BUILD @@ -0,0 +1,81 @@ + + + + + + + + $(gwenhywfar_cflags) + -I$(topsrcdir) + -I$(topbuilddir) + -I$(topsrcdir)/apps + -I$(topbuilddir)/apps + -I$(builddir) + -I$(srcdir) + + + + --include=$(builddir) + --include=$(srcdir) + + + $(visibility_cflags) + + + + + + device.t2d + value.t2d + topic.t2d + translation.t2d + + + + device.c + value.c + topic.c + translation.c + + + + + + + device.h + device_p.h + value.h + value_p.h + topic.h + topic_p.h + translation.h + translation_p.h + + + + + + + $(local/typefiles) + + + + + + + + + + + + + + + + + + + + + + diff --git a/apps/aqhome-mqttlog/types/device.t2d b/apps/aqhome-mqttlog/types/device.t2d new file mode 100644 index 0000000..0d17bdf --- /dev/null +++ b/apps/aqhome-mqttlog/types/device.t2d @@ -0,0 +1,74 @@ + + + + + + This object and its objects are used to store registered devices and definitions for possible new devices. + + + AQHMQTT_DEVICE + AQHMQTT_Device + device + + + with_list1 + with_list2 + + + +
aqhome/api.h
+
aqhome-mqttlog/types/topic.h
+
+ + + + + + +
+ + + + + + + + + + Only set for registered devices + NULL + NULL + public + own with_getbymember + + + + NULL + NULL + public + own + + + + NULL + NULL + public + own + + + + + NULL + NULL + public + own noconst + none + + + + + +
+ +
+ diff --git a/apps/aqhome-mqttlog/types/topic.t2d b/apps/aqhome-mqttlog/types/topic.t2d new file mode 100644 index 0000000..871d571 --- /dev/null +++ b/apps/aqhome-mqttlog/types/topic.t2d @@ -0,0 +1,106 @@ + + + + + + + + AQHMQTT_TOPIC + AQHMQTT_Topic + topic + + + with_list1 + + + +
aqhome/api.h
+
aqhome-mqttlog/types/value.h
+
+ + + + + + +
+ + + + + + + numeric type + + + JSON type + + + + + + + + + + + + + + + Only set for registered devices + NULL + NULL + public + own + + + + 0 + 0 + public + + + + + 0 + 0 + public + + + + + NULL + NULL + public + own + + + + NULL + NULL + public + own + + + + NULL + NULL + public + own + + + + NULL + NULL + public + own + none + + + + +
+ +
+ diff --git a/apps/aqhome-mqttlog/types/translation.t2d b/apps/aqhome-mqttlog/types/translation.t2d new file mode 100644 index 0000000..ce1d4dc --- /dev/null +++ b/apps/aqhome-mqttlog/types/translation.t2d @@ -0,0 +1,55 @@ + + + + + + + + AQHMQTT_TRANSLATION + AQHMQTT_Translation + translation + + + with_db + with_list1 + + + +
aqhome/api.h
+
+ + + + + + +
+ + + + + + + + + + 0 + 0 + public + own + + + + 0 + 0 + public + own + + + + + +
+ +
+ diff --git a/apps/aqhome-mqttlog/types/value.t2d b/apps/aqhome-mqttlog/types/value.t2d new file mode 100644 index 0000000..64154f6 --- /dev/null +++ b/apps/aqhome-mqttlog/types/value.t2d @@ -0,0 +1,91 @@ + + + + + + + + AQHMQTT_VALUE + AQHMQTT_Value + value + + + with_list1 + with_list2 + + + +
aqhome/api.h
+
aqhome-mqttlog/types/translation.h
+
+ + + + + + +
+ + + + + + + sensor + + + actor + + + + + + + + + + + 0 + 0 + public + own + + + + 0 + 0 + public + + + + + 0 + 0 + public + own + + + + + 0 + 0 + public + own + + + + + NULL + NULL + public + own + none + + + + + +
+ +
+ diff --git a/apps/aqhome-mqttlog/xmlread.c b/apps/aqhome-mqttlog/xmlread.c new file mode 100644 index 0000000..a844987 --- /dev/null +++ b/apps/aqhome-mqttlog/xmlread.c @@ -0,0 +1,346 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./xmlread.h" +#include "./aqhome_mqtt_p.h" +#include "aqhome-mqttlog/types/topic.h" +#include "aqhome-mqttlog/types/value.h" + +#include +#include + +#include +#include +#include +#include + + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static AQHMQTT_DEVICE_LIST *_readDeviceFiles(AQHOME_MQTT *aqh, const GWEN_STRINGLIST *sl); +static AQHMQTT_DEVICE *_readDeviceFile(AQHOME_MQTT *aqh, const char *sFilename); +static AQHMQTT_DEVICE *_readXmlDevice(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceNode); +static AQHMQTT_TOPIC_LIST *_readXmlTopicList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode); +static AQHMQTT_TOPIC *_readXmlTopic(AQHOME_MQTT *aqh, GWEN_XMLNODE *topicNode); +static AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode); +static AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode); + + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + + +AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadSysconfDeviceFiles(AQHOME_MQTT *aqh) +{ + GWEN_STRINGLIST *sl; + + sl=AQH_GetListOfMatchingSysconfFiles("devices", "*.xml"); + if (sl) { + AQHMQTT_DEVICE_LIST *deviceList; + + deviceList=_readDeviceFiles(aqh, sl); + GWEN_StringList_free(sl); + if (deviceList==NULL) { + DBG_INFO(NULL, "Error reading sysconf device files"); + return NULL; + } + return deviceList; + } + else { + DBG_INFO(NULL, "No sysconf device files"); + return NULL; + } +} + + + +AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadRuntimeDataDeviceFiles(AQHOME_MQTT *aqh) +{ + GWEN_STRINGLIST *sl; + + sl=AQH_GetListOfMatchingRuntimeDataFiles("aqhome-mqtt/devices", "*.xml"); + if (sl) { + AQHMQTT_DEVICE_LIST *deviceList; + + deviceList=_readDeviceFiles(aqh, sl); + GWEN_StringList_free(sl); + if (deviceList==NULL) { + DBG_INFO(NULL, "Error reading sysconf device files"); + return NULL; + } + return deviceList; + } + else { + DBG_INFO(NULL, "No sysconf device files"); + return NULL; + } +} + + + +AQHMQTT_DEVICE_LIST *_readDeviceFiles(AQHOME_MQTT *aqh, const GWEN_STRINGLIST *sl) +{ + GWEN_STRINGLISTENTRY *se; + AQHMQTT_DEVICE_LIST *deviceList; + + deviceList=AQHMQTT_Device_List_new(); + se=GWEN_StringList_FirstEntry(sl); + while(se) { + const char *s; + + s=GWEN_StringListEntry_Data(se); + if (s && *s) { + AQHMQTT_DEVICE *device; + + device=_readDeviceFile(aqh, s); + if (device) + AQHMQTT_Device_List_Add(device, deviceList); + } + se=GWEN_StringListEntry_Next(se); + } + + if (AQHMQTT_Device_List_GetCount(deviceList)<1) { + AQHMQTT_Device_List_free(deviceList); + return NULL; + } + + return deviceList; +} + + + + +AQHMQTT_DEVICE *_readDeviceFile(AQHOME_MQTT *aqh, const char *sFilename) +{ + GWEN_XMLNODE *rootNode; + GWEN_XMLNODE *deviceNode; + int rv; + + rootNode=GWEN_XMLNode_new(GWEN_XMLNodeTypeTag, NULL); + rv=GWEN_XML_ReadFile(rootNode, sFilename, GWEN_XML_FLAGS_DEFAULT); + if (rv<0) { + DBG_ERROR(NULL, "Error reading XML file \"%s\": %d", sFilename, rv); + GWEN_XMLNode_free(rootNode); + return NULL; + } + deviceNode=GWEN_XMLNode_FindFirstTag(rootNode, "device", NULL, NULL); + if (deviceNode) { + AQHMQTT_DEVICE *device; + + device=_readXmlDevice(aqh, deviceNode); + GWEN_XMLNode_free(rootNode); + if (device==NULL) { + DBG_INFO(NULL, "Error reading device from XML file \"%s\" (%d)", sFilename, rv); + return NULL; + } + return device; + } + else { + DBG_INFO(NULL, "XML file \"%s\" does not contain a element", sFilename); + GWEN_XMLNode_free(rootNode); + return NULL; + } + + GWEN_XMLNode_free(rootNode); + return NULL; +} + + + +AQHMQTT_DEVICE *_readXmlDevice(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceNode) +{ + AQHMQTT_DEVICE *device; + GWEN_XMLNODE *topicsNode; + + device=AQHMQTT_Device_new(); + AQHMQTT_Device_SetId(device, GWEN_XMLNode_GetProperty(deviceNode, "id", NULL)); + AQHMQTT_Device_SetName(device, GWEN_XMLNode_GetProperty(deviceNode, "name", NULL)); + AQHMQTT_Device_SetDriver(device, GWEN_XMLNode_GetProperty(deviceNode, "driver", NULL)); + + topicsNode=GWEN_XMLNode_FindFirstTag(deviceNode, "mqtttopics", NULL, NULL); + if (topicsNode) { + AQHMQTT_TOPIC_LIST *topicList; + + topicList=_readXmlTopicList(aqh, topicsNode); + if (topicList) + AQHMQTT_Device_SetTopicList(device, topicList); + else { + DBG_INFO(NULL, "No mqtt topics read"); + AQHMQTT_Device_free(device); + return NULL; + } + } + else { + DBG_INFO(NULL, "No element"); + AQHMQTT_Device_free(device); + return NULL; + } + + return device; +} + + + +AQHMQTT_TOPIC_LIST *_readXmlTopicList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode) +{ + AQHMQTT_TOPIC_LIST *topicList; + GWEN_XMLNODE *topicNode; + + topicList=AQHMQTT_Topic_List_new(); + topicNode=GWEN_XMLNode_FindFirstTag(parentNode, "mqtttopic", NULL, NULL); + while(topicNode) { + AQHMQTT_TOPIC *topic=_readXmlTopic(aqh, topicNode); + if (topic) + AQHMQTT_Topic_List_Add(topic, topicList); + else { + DBG_INFO(NULL, "Error reading element"); + AQHMQTT_Topic_List_free(topicList); + return NULL; + } + topicNode=GWEN_XMLNode_FindNextTag(topicNode, "mqtttopic", NULL, NULL); + } + if (AQHMQTT_Topic_List_GetCount(topicList)<1) { + AQHMQTT_Topic_List_free(topicList); + return NULL; + } + + return topicList; +} + + + +AQHMQTT_TOPIC *_readXmlTopic(AQHOME_MQTT *aqh, GWEN_XMLNODE *topicNode) +{ + AQHMQTT_TOPIC *topic; + GWEN_XMLNODE *valuesNode; + int i; + const char *s; + + topic=AQHMQTT_Topic_new(); + s=GWEN_XMLNode_GetProperty(topicNode, "type", NULL); + i=AQHMQTT_TopicType_fromString(s); + if (i==AQHMQTT_TopicType_Unknown) { + DBG_ERROR(NULL, "Invalid topic type \"%s\"", s?s:""); + AQHMQTT_Topic_free(topic); + return NULL; + } + AQHMQTT_Topic_SetTopicType(topic, i); + + s=GWEN_XMLNode_GetProperty(topicNode, "direction", NULL); + i=AQHMQTT_TopicDir_fromString(s); + if (i==AQHMQTT_TopicDir_Unknown) { + DBG_ERROR(NULL, "Invalid topic direction \"%s\"", s?s:""); + AQHMQTT_Topic_free(topic); + return NULL; + } + AQHMQTT_Topic_SetDirection(topic, i); + + AQHMQTT_Topic_SetTopic(topic, GWEN_XMLNode_GetCharValue(topicNode, "topic", NULL)); + AQHMQTT_Topic_SetMask(topic, GWEN_XMLNode_GetCharValue(topicNode, "mask", NULL)); + AQHMQTT_Topic_SetBeforeId(topic, GWEN_XMLNode_GetCharValue(topicNode, "beforeId", NULL)); + AQHMQTT_Topic_SetAfterId(topic, GWEN_XMLNode_GetCharValue(topicNode, "afterId", NULL)); + + valuesNode=GWEN_XMLNode_FindFirstTag(topicNode, "values", NULL, NULL); + if (valuesNode) { + AQHMQTT_VALUE_LIST *valueList; + + valueList=_readXmlValueList(aqh, valuesNode); + if (valueList) + AQHMQTT_Topic_SetValueList(topic, valueList); + else { + DBG_INFO(NULL, "No values read"); + AQHMQTT_Topic_free(topic); + return NULL; + } + } + else { + DBG_INFO(NULL, "No element"); + AQHMQTT_Topic_free(topic); + return NULL; + } + + return topic; +} + + + +AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode) +{ + AQHMQTT_VALUE_LIST *valueList; + GWEN_XMLNODE *valueNode; + + valueList=AQHMQTT_Value_List_new(); + valueNode=GWEN_XMLNode_FindFirstTag(parentNode, "value", NULL, NULL); + while(valueNode) { + AQHMQTT_VALUE *value=_readXmlValue(aqh, valueNode); + if (value) + AQHMQTT_Value_List_Add(value, valueList); + else { + DBG_INFO(NULL, "Error reading element"); + AQHMQTT_Value_List_free(valueList); + return NULL; + } + valueNode=GWEN_XMLNode_FindNextTag(valueNode, "value", NULL, NULL); + } + if (AQHMQTT_Value_List_GetCount(valueList)<1) { + AQHMQTT_Value_List_free(valueList); + return NULL; + } + + return valueList; +} + + + +AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode) +{ + AQHMQTT_VALUE *value; + const char *s; + int i; + + value=AQHMQTT_Value_new(); + AQHMQTT_Value_SetName(value, GWEN_XMLNode_GetProperty(valueNode, "name", NULL)); + AQHMQTT_Value_SetValueUnits(value, GWEN_XMLNode_GetProperty(valueNode, "units", NULL)); + AQHMQTT_Value_SetPath(value, GWEN_XMLNode_GetProperty(valueNode, "path", NULL)); + + s=GWEN_XMLNode_GetProperty(valueNode, "type", NULL); + i=AQHMQTT_ValueType_fromString(s); + if (i==AQHMQTT_ValueType_Unknown) { + DBG_ERROR(NULL, "Invalid value type \"%s\"", s?s:""); + AQHMQTT_Value_free(value); + return NULL; + } + AQHMQTT_Value_SetValueType(value, i); + + /* TODO: read translationList */ + return value; +} + + + diff --git a/apps/aqhome-mqttlog/xmlread.h b/apps/aqhome-mqttlog/xmlread.h new file mode 100644 index 0000000..b94f721 --- /dev/null +++ b/apps/aqhome-mqttlog/xmlread.h @@ -0,0 +1,26 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_MQTTLOG_XMLREAD_H +#define AQHOME_MQTTLOG_XMLREAD_H + + +#include "aqhome-mqttlog/aqhome_mqtt.h" +#include "aqhome-mqttlog/types/device.h" + + + +AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadSysconfDeviceFiles(AQHOME_MQTT *aqh); +AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadRuntimeDataDeviceFiles(AQHOME_MQTT *aqh); + + + + +#endif + + diff --git a/apps/aqhome-nodes/loop_tty_broker.c b/apps/aqhome-nodes/loop_tty_broker.c index 24c8825..3baa70a 100644 --- a/apps/aqhome-nodes/loop_tty_broker.c +++ b/apps/aqhome-nodes/loop_tty_broker.c @@ -62,8 +62,6 @@ static void _setDeviceName(AQH_VALUE *value, uint32_t uid); * ------------------------------------------------------------------------------------------------ */ - - void AqHomed_ForwardTtyMsgToBroker(AQHOMED *aqh, const GWEN_MSG *nodeMsg) { if (GWEN_MsgEndpoint_GetState(aqh->brokerEndpoint)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) { diff --git a/aqhome/0BUILD b/aqhome/0BUILD index dc8d720..f70c046 100644 --- a/aqhome/0BUILD +++ b/aqhome/0BUILD @@ -21,9 +21,10 @@ - + - + + $(visibility_cflags) diff --git a/aqhome/aqhome.c b/aqhome/aqhome.c index 3b7158c..1832f82 100644 --- a/aqhome/aqhome.c +++ b/aqhome/aqhome.c @@ -25,6 +25,7 @@ #define AQHOME_PM_LIBNAME "aqhome" #define AQHOME_PM_SYSCONFDIR "sysconfdir" #define AQHOME_PM_DATADIR "datadir" +#define AQHOME_PM_RTDATADIR "rtdatadir" #define AQHOME_PM_LOCALEDIR "localedir" #define AQHOME_SYSCONFIG_FILE "aqhome.conf" @@ -37,6 +38,7 @@ static void _initPathManager(void); static void _finiPathManager(void); static void _initI18n(void); static void _definePath(const char *pathName, const char *pathValue); +static GWEN_STRINGLIST *_getListOfMatchingFiles(const char *pathName, const char *subFolder, const char *mask); @@ -83,6 +85,28 @@ GWEN_STRINGLIST *AQH_GetGlobalSysconfDirs(void) +GWEN_STRINGLIST *AQH_GetListOfMatchingDataFiles(const char *subFolder, const char *mask) +{ + return _getListOfMatchingFiles(AQHOME_PM_DATADIR, subFolder, mask); +} + + + +GWEN_STRINGLIST *AQH_GetListOfMatchingRuntimeDataFiles(const char *subFolder, const char *mask) +{ + return _getListOfMatchingFiles(AQHOME_PM_RTDATADIR, subFolder, mask); +} + + + +GWEN_STRINGLIST *AQH_GetListOfMatchingSysconfFiles(const char *subFolder, const char *mask) +{ + return _getListOfMatchingFiles(AQHOME_PM_SYSCONFDIR, subFolder, mask); +} + + + + GWEN_DB_NODE *AQH_LoadConfigFile(void) { GWEN_BUFFER *fbuf; @@ -161,6 +185,7 @@ void _initPathManager(void) _definePath(AQHOME_PM_SYSCONFDIR, AQHOME_SYSCONF_DIR); _definePath(AQHOME_PM_LOCALEDIR, AQHOME_SYSCONF_DIR); _definePath(AQHOME_PM_DATADIR, AQHOME_DATA_DIR); + _definePath(AQHOME_PM_RTDATADIR, AQHOME_RTDATA_DIR); } @@ -168,6 +193,7 @@ void _initPathManager(void) void _finiPathManager(void) { GWEN_PathManager_UndefinePath(AQHOME_PM_LIBNAME, AQHOME_PM_LOCALEDIR); + GWEN_PathManager_UndefinePath(AQHOME_PM_LIBNAME, AQHOME_PM_RTDATADIR); GWEN_PathManager_UndefinePath(AQHOME_PM_LIBNAME, AQHOME_PM_DATADIR); GWEN_PathManager_UndefinePath(AQHOME_PM_LIBNAME, AQHOME_PM_SYSCONFDIR); GWEN_PathManager_RemovePaths(AQHOME_PM_LIBNAME); @@ -218,3 +244,22 @@ void _definePath(const char *pathName, const char *pathValue) +GWEN_STRINGLIST *_getListOfMatchingFiles(const char *pathName, const char *subFolder, const char *mask) +{ + int rv; + GWEN_STRINGLIST *sl; + + sl=GWEN_StringList_new(); + rv=GWEN_PathManager_GetMatchingFilesRecursively(AQHOME_PM_LIBNAME, AQHOME_PM_DATADIR, subFolder, sl, mask); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, + "Error listing matching data files (folder=%s, mask=%s)", + subFolder?subFolder:"", mask?mask:""); + GWEN_StringList_free(sl); + return NULL; + } + return sl; +} + + + diff --git a/aqhome/aqhome.h b/aqhome/aqhome.h index 7ee1b00..0438dec 100644 --- a/aqhome/aqhome.h +++ b/aqhome/aqhome.h @@ -23,6 +23,10 @@ AQHOME_API void AQH_Fini(void); AQHOME_API GWEN_DB_NODE *AQH_LoadConfigFile(void); AQHOME_API void AQH_MergeConfigFileIntoConfig(GWEN_DB_NODE *dbArgs, const char *destDbGroupName); +AQHOME_API GWEN_STRINGLIST *AQH_GetListOfMatchingDataFiles(const char *subFolder, const char *mask); +AQHOME_API GWEN_STRINGLIST *AQH_GetListOfMatchingRuntimeDataFiles(const char *subFolder, const char *mask); +AQHOME_API GWEN_STRINGLIST *AQH_GetListOfMatchingSysconfFiles(const char *subFolder, const char *mask); + AQHOME_API GWEN_STRINGLIST *AQH_GetGlobalDataDirs(void); AQHOME_API GWEN_STRINGLIST *AQH_GetGlobalSysconfDirs(void); diff --git a/etc/aqhome.conf b/etc/aqhome.conf index fcded1b..1cd500b 100644 --- a/etc/aqhome.conf +++ b/etc/aqhome.conf @@ -4,3 +4,6 @@ int brokerPort=1899 char nodesAddr="127.0.0.1" int nodesPort=45454 +char mqttAddr="192.168.117.192" +int mqttPort=1883 +