More work on mqtt tool.

This commit is contained in:
Martin Preuss
2023-10-04 16:02:02 +02:00
parent 4730943931
commit bfed937950
27 changed files with 1527 additions and 300 deletions

4
0BUILD
View File

@@ -44,6 +44,7 @@
<setVar name="includedir">$(option_prefix)/include</setVar>
<setVar name="datarootdir">$(option_prefix)/share</setVar>
<setVar name="datadir">$(option_prefix)/share</setVar>
<setVar name="rtdatadir">$(option_prefix)/var/lib</setVar>
<setVar name="localedir">$(option_prefix)/share/locale</setVar>
<setVar name="pkglibdir">$(libdir)/$(package)</setVar>
@@ -87,6 +88,7 @@
<setVar name="aqhome_cfg_searchdir">etc</setVar>
<setVar name="aqhome_locale_searchdir">share/locale</setVar>
<setVar name="aqhome_data_searchdir">share"</setVar>
<setVar name="aqhome_rtdata_searchdir">var/lib</setVar>
</then>
<else>
<define name="OS_POSIX" value="1" />
@@ -96,11 +98,13 @@
<setVar name="aqhome_cfg_searchdir">etc</setVar>
<setVar name="aqhome_locale_searchdir">share/locale</setVar>
<setVar name="aqhome_data_searchdir">share</setVar>
<setVar name="aqhome_rtdata_searchdir">var/lib</setVar>
</then>
<else>
<setVar name="aqhome_cfg_searchdir">$(sysconfdir)</setVar>
<setVar name="aqhome_locale_searchdir">$(datadir)/locale</setVar>
<setVar name="aqhome_data_searchdir">$(datadir)</setVar>
<setVar name="aqhome_rtdata_searchdir">$(rtdatadir)</setVar>
</else>
</ifVarMatches>
</else>

View File

@@ -8,6 +8,8 @@
$(gwenhywfar_cflags)
-I$(topsrcdir)
-I$(topbuilddir)
-I$(topsrcdir)/apps
-I$(topbuilddir)/apps
-I$(builddir)
-I$(srcdir)
</includes>
@@ -47,10 +49,14 @@
<headers dist="true" >
init.h
fini.h
loop.h
loop_ipc.h
loop_mqtt.h
aqhome_mqtt.h
aqhome_mqtt_p.h
mqtt.h
messages.h
xmlread.h
</headers>
<sources>
@@ -58,13 +64,18 @@
aqhome_mqtt.c
init.c
fini.c
loop.c
loop_ipc.c
loop_mqtt.c
main.c
mqtt.c
messages.c
xmlread.c
</sources>
<useTargets>
aqhome
aqhmqtt_types
</useTargets>
<libraries>
@@ -72,6 +83,7 @@
</libraries>
<subdirs>
types
</subdirs>

View File

@@ -24,7 +24,7 @@
AQHOME_MQTT *AqHomeMqtt_new()
AQHOME_MQTT *AqHomeMqtt_new(void)
{
AQHOME_MQTT *aqh;
@@ -38,6 +38,8 @@ AQHOME_MQTT *AqHomeMqtt_new()
void AqHomeMqtt_free(AQHOME_MQTT *aqh)
{
if (aqh) {
AQHMQTT_Device_List_free(aqh->availableDeviceList);
AQHMQTT_Device_List_free(aqh->registeredDeviceList);
GWEN_MsgEndpoint_free(aqh->rootEndpoint);
GWEN_DB_Group_free(aqh->dbArgs);
free(aqh->pidFile);
@@ -90,3 +92,27 @@ int AqHomeMqtt_GetTimeout(const AQHOME_MQTT *aqh)
{
return aqh?aqh->timeout:0;
}
AQHMQTT_DEVICE_LIST *AqHomeMqtt_GetAvailableDeviceList(const AQHOME_MQTT *aqh)
{
return aqh?aqh->availableDeviceList:NULL;
}
void AqHomeMqtt_SetAvailableDeviceList(AQHOME_MQTT *aqh, AQHMQTT_DEVICE_LIST *dl)
{
if (aqh) {
AQHMQTT_Device_List_free(aqh->availableDeviceList);
aqh->availableDeviceList=dl;
}
}

View File

@@ -12,6 +12,7 @@
#include "./mqttvalue.h"
#include "./mqtttopic.h"
#include "aqhome-mqttlog/types/device.h"
#include <gwenhywfar/endpoint.h>
@@ -21,7 +22,7 @@
typedef struct AQHOME_MQTT AQHOME_MQTT;
AQHOME_MQTT *AqHomeMqtt_new();
AQHOME_MQTT *AqHomeMqtt_new(void);
void AqHomeMqtt_free(AQHOME_MQTT *aqh);
GWEN_MSG_ENDPOINT *AqHomeMqtt_GetBrokerEndpoint(const AQHOME_MQTT *aqh);
@@ -35,6 +36,8 @@ void AqHomeMqtt_SetPidFile(AQHOME_MQTT *aqh, const char *s);
int AqHomeMqtt_GetTimeout(const AQHOME_MQTT *aqh);
AQHMQTT_DEVICE_LIST *AqHomeMqtt_GetAvailableDeviceList(const AQHOME_MQTT *aqh);
void AqHomeMqtt_SetAvailableDeviceList(AQHOME_MQTT *aqh, AQHMQTT_DEVICE_LIST *dl);
#endif

View File

@@ -16,7 +16,7 @@
#define AQHOME_MQTT_DEFAULT_PIDFILE "/var/run/aqhome-mqtt.pid"
#define AQHOME_MQTT_DEFAULT_DATADIR "/var/lib/aqhome-mqtt/data"
#define AQHOME_MQTT_DEFAULT_DATADIR "/var/lib/aqhome-mqtt"
#define AQHOME_MQTT_DEFAULT_BROKER_PORT 1899
#define AQHOME_MQTT_DEFAULT_BROKER_CLIENTID "mqtt"
@@ -35,6 +35,10 @@ struct AQHOME_MQTT {
AQH_MQTT_VALUE *mqttValueList;
AQH_MQTT_TOPIC *mqttTopicList;
AQHMQTT_DEVICE_LIST *availableDeviceList;
AQHMQTT_DEVICE_LIST *registeredDeviceList;
};

View File

@@ -14,6 +14,7 @@
#include "./init.h"
#include "./mqtt.h"
#include "./aqhome_mqtt_p.h"
#include "./xmlread.h"
#include "aqhome/ipc/endpoint_ipc.h"
#include "aqhome/ipc/endpoint_ipcclient.h"
@@ -104,6 +105,17 @@ int AqHomeMqtt_Init(AQHOME_MQTT *aqh, int argc, char **argv)
void AqHomeMqtt_ReloadDeviceFiles(AQHOME_MQTT *aqh)
{
AQHMQTT_DEVICE_LIST *deviceList;
deviceList=AqHomeMqttLog_ReadSysconfDeviceFiles(aqh);
if (deviceList)
AqHomeMqtt_SetAvailableDeviceList(aqh, deviceList);
}
int _createPidFile(const char *pidFilename)
{
FILE *f;

View File

@@ -0,0 +1,56 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./loop.h"
#include "./loop_ipc.h"
#include "./loop_mqtt.h"
#include "./aqhome_mqtt_p.h"
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endpoint.h>
#include <gwenhywfar/text.h>
#include <gwenhywfar/json_read.h>
#include <gwenhywfar/db.h>
//#define FULL_DEBUG
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
void AqHomeMqttLog_Loop(AQHOME_MQTT *aqh, int timeoutInMsecs)
{
if (aqh) {
GWEN_MsgEndpoint_ChildrenIoLoop(aqh->rootEndpoint, timeoutInMsecs);
AqHomeMqttLog_ReadAndHandleMqttMessages(aqh);
AqHomeMqttLog_ReadAndHandleIpcMessages(aqh);
}
}

View File

@@ -0,0 +1,26 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQHOMEMQTT_LOOP_H
#define AQHOMEMQTT_LOOP_H
#include "./aqhome_mqtt.h"
#include <gwenhywfar/endpoint.h>
#include <gwenhywfar/msg.h>
void AqHomeMqttLog_Loop(AQHOME_MQTT *aqh, int timeoutInMsecs);
#endif

View File

@@ -0,0 +1,58 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./loop_ipc.h"
#include "./aqhome_mqtt_p.h"
#include "aqhome/ipc/data/ipc_data.h"
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endpoint.h>
#include <gwenhywfar/text.h>
#include <gwenhywfar/json_read.h>
#include <gwenhywfar/db.h>
#define FULL_DEBUG
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
void AqHomeMqttLog_ReadAndHandleIpcMessages(AQHOME_MQTT *aqh)
{
GWEN_MSG_ENDPOINT *epTcp;
GWEN_MSG *msg;
epTcp=aqh->brokerEndpoint;
while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) {
GWEN_Msg_free(msg);
}
}

View File

@@ -0,0 +1,26 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQHOMEMQTT_LOOP_IPC_H
#define AQHOMEMQTT_LOOP_IPC_H
#include "./aqhome_mqtt.h"
#include <gwenhywfar/endpoint.h>
#include <gwenhywfar/msg.h>
void AqHomeMqttLog_ReadAndHandleIpcMessages(AQHOME_MQTT *aqh);
#endif

View File

@@ -0,0 +1,405 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./loop_mqtt.h"
#include "./aqhome_mqtt_p.h"
#include "aqhome/mqtt/msg_mqtt_publish.h"
#include "aqhome/ipc/data/msg_data_multidata.h"
#include "aqhome/ipc/data/ipc_data.h"
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endpoint.h>
#include <gwenhywfar/text.h>
#include <gwenhywfar/json_read.h>
#include <gwenhywfar/db.h>
#define FULL_DEBUG
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static void _handleMqttMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg);
static void _handlePublishMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg);
static int _handlePublish(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *topic, const char *value);
static void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue);
static void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue);
static void _sendMessage(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_VALUE *value, const char *rcvdValue);
static int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue);
static AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir);
static AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir);
static GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
void AqHomeMqttLog_ReadAndHandleMqttMessages(AQHOME_MQTT *aqh)
{
GWEN_MSG_ENDPOINT *epTcp;
GWEN_MSG *msg;
epTcp=aqh->mqttEndpoint;
while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) {
#ifdef FULL_DEBUG
DBG_ERROR(NULL, "Received this message:");
GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2);
#endif
_handleMqttMsg(aqh, epTcp, msg);
GWEN_Msg_free(msg);
}
}
int AqHomeMqttLog_SendPing(AQHOME_MQTT *aqh)
{
GWEN_MSG_ENDPOINT *epTcp;
GWEN_MSG *msgOut;
DBG_INFO(AQH_LOGDOMAIN, "Sending PING");
epTcp=aqh->mqttEndpoint;
msgOut=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL);
if (msgOut==NULL) {
DBG_ERROR(NULL, "Error creating message");
return GWEN_ERROR_INTERNAL;
}
GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut);
return 0;
}
void _handleMqttMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg)
{
if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) {
#ifdef FULL_DEBUG
GWEN_BUFFER *buf;
buf=GWEN_Buffer_new(0, 256, 0, 1);
AQH_PublishMqttMsg_DumpToBuffer(msg, buf, "received");
fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf));
GWEN_Buffer_free(buf);
#endif
_handlePublishMsg(aqh, ep, msg);
}
else if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) {
DBG_INFO(AQH_LOGDOMAIN, "PING response received");
}
else {
#ifdef FULL_DEBUG
DBG_ERROR(NULL, "Received this message:");
GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2);
#endif
}
}
void _handlePublishMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg)
{
char *topic;
char *value;
topic=AQH_PublishMqttMsg_ExtractTopic(msg);
value=AQH_PublishMqttMsg_ExtractValue(msg);
if (topic && value) {
int rv;
rv=_handlePublish(aqh, ep, topic, value);
if (rv!=1) {
DBG_INFO(NULL, "New topic \"%s\", trying to register", topic);
rv=_registerNewDeviceForTopic(aqh, ep, topic, value);
if (rv==1) {
rv=_handlePublish(aqh, ep, topic, value);
if (rv!=1) {
DBG_ERROR(NULL, "Topic \"%s\" still not handled, SNH!", topic);
}
}
}
}
else {
DBG_ERROR(NULL, "Either topic or value missing in PUBLISH msg");
}
free(value);
free(topic);
}
int _handlePublish(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue)
{
if (rcvdTopic && *rcvdTopic) {
if (aqh->registeredDeviceList) {
AQHMQTT_DEVICE *device;
device=AQHMQTT_Device_List_First(aqh->registeredDeviceList);
while(device) {
AQHMQTT_TOPIC_LIST *topicList;
topicList=AQHMQTT_Device_GetTopicList(device);
if (topicList) {
AQHMQTT_TOPIC *topic;
topic=_findTopicMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In);
if (topic==NULL) {
topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In);
if (topic)
AQHMQTT_Topic_SetTopic(topic, rcvdTopic);
}
if (topic) {
if (AQHMQTT_Topic_GetTopicType(topic)==AQHMQTT_TopicType_Json)
_handleJsonTopic(aqh, ep, device, topic, rcvdValue);
else
_handleNumTopic(aqh, ep, device, topic, rcvdValue);
return 1;
}
}
device=AQHMQTT_Device_List_Next(device);
}
}
DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic);
}
return 0;
}
void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue)
{
AQHMQTT_VALUE_LIST *valueList;
valueList=AQHMQTT_Topic_GetValueList(topic);
if (valueList)
_sendMessage(aqh, ep, device, AQHMQTT_Value_List_First(valueList), rcvdValue);
else {
DBG_INFO(NULL, "No value list in device \"%s\"", AQHMQTT_Device_GetId(device));
}
}
void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue)
{
GWEN_JSON_ELEM *jeRoot;
jeRoot=GWEN_JsonElement_fromString(rcvdValue);
if (jeRoot==NULL) {
DBG_INFO(NULL, "Could not parse JSON value: %s", rcvdValue);
}
else {
AQHMQTT_VALUE_LIST *valueList;
valueList=AQHMQTT_Topic_GetValueList(topic);
if (valueList) {
AQHMQTT_VALUE *value;
value=AQHMQTT_Value_List_First(valueList);
while(value) {
const char *path;
path=AQHMQTT_Value_GetPath(value);
if (path && *path) {
GWEN_JSON_ELEM *je;
je=GWEN_JsonElement_GetElementByPath(jeRoot, path, 0);
if (je) {
const char *s;
s=GWEN_JsonElement_GetData(je);
if (s && *s)
_sendMessage(aqh, ep, device, value, s);
}
}
value=AQHMQTT_Value_List_Next(value);
} /* while */
}
GWEN_JsonElement_free(jeRoot);
}
}
void _sendMessage(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_VALUE *value, const char *rcvdValue)
{
int rv;
union {double f; uint64_t i;} u;
rv=GWEN_Text_StringToDouble(rcvdValue, &(u.f));
if (rv<0) {
DBG_ERROR(NULL, "Invalid value received from MQTT server (%s)", rcvdValue?rcvdValue:"<empty>");
}
else {
GWEN_MSG *pubMsg;
uint64_t arrayToSend[2];
AQH_VALUE *msgValue;
arrayToSend[0]=(uint64_t) time(NULL);
arrayToSend[1]=u.i;
msgValue=AQH_Value_new();
AQH_Value_SetDeviceName(msgValue, AQHMQTT_Device_GetId(device));
AQH_Value_SetName(msgValue, AQHMQTT_Value_GetName(value));
AQH_Value_SetValueUnits(msgValue, AQHMQTT_Value_GetValueUnits(value));
AQH_Value_SetValueType(msgValue, 0);
pubMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgValue, arrayToSend, 1);
if (pubMsg) {
DBG_INFO(AQH_LOGDOMAIN, "BROKER PUBLISH %s: %f", AQH_Value_GetName(msgValue), u.f);
GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg);
}
AQH_Value_free(msgValue);
}
}
int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue)
{
if (rcvdTopic && *rcvdTopic) {
if (aqh->availableDeviceList) {
AQHMQTT_DEVICE *device;
device=AQHMQTT_Device_List_First(aqh->availableDeviceList);
while(device) {
AQHMQTT_TOPIC_LIST *topicList;
topicList=AQHMQTT_Device_GetTopicList(device);
if (topicList) {
AQHMQTT_TOPIC *topic;
topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In);
if (topic) {
GWEN_BUFFER *buf;
buf=_extractDeviceId(topic, rcvdTopic);
if (buf) {
AQHMQTT_DEVICE *newDevice;
newDevice=AQHMQTT_Device_dup(device);
AQHMQTT_Device_SetId(newDevice, GWEN_Buffer_GetStart(buf));
topic=_findMaskMatchingTopic(AQHMQTT_Device_GetTopicList(newDevice), rcvdTopic, AQHMQTT_TopicDir_In);
AQHMQTT_Topic_SetTopic(topic, rcvdTopic);
if (aqh->registeredDeviceList==NULL)
aqh->registeredDeviceList=AQHMQTT_Device_List_new();
DBG_ERROR(NULL, "Registered device \"%s\" (%s)", AQHMQTT_Device_GetId(newDevice), AQHMQTT_Device_GetName(newDevice));
AQHMQTT_Device_List_Add(newDevice, aqh->registeredDeviceList);
GWEN_Buffer_free(buf);
return 1;
}
}
}
device=AQHMQTT_Device_List_Next(device);
}
}
DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic);
}
return 0;
}
AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir)
{
if (topicList) {
AQHMQTT_TOPIC *topic;
topic=AQHMQTT_Topic_List_First(topicList);
while(topic) {
if (AQHMQTT_Topic_GetDirection(topic)==dir) {
const char *sMask;
sMask=AQHMQTT_Topic_GetMask(topic);
if (sMask && *sMask && GWEN_Text_ComparePattern(rcvdTopic, sMask, 0)!=-1) {
/* found a matching topic */
return topic;
}
}
topic=AQHMQTT_Topic_List_Next(topic);
}
}
return NULL;
}
AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir)
{
if (topicList) {
AQHMQTT_TOPIC *topic;
topic=AQHMQTT_Topic_List_First(topicList);
while(topic) {
if (AQHMQTT_Topic_GetDirection(topic)==dir) {
const char *sTopic;
sTopic=AQHMQTT_Topic_GetTopic(topic);
if (sTopic && *sTopic && strcasecmp(rcvdTopic, sTopic)==0) {
return topic;
}
}
topic=AQHMQTT_Topic_List_Next(topic);
}
}
return NULL;
}
GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic)
{
const char *sBefore;
const char *sAfter;
sBefore=AQHMQTT_Topic_GetBeforeId(topic);
sAfter=AQHMQTT_Topic_GetAfterId(topic);
if (sBefore && *sBefore && sAfter && *sAfter) {
GWEN_BUFFER *buf;
int rv;
buf=GWEN_Buffer_new(0, 256, 0, 1);
GWEN_Buffer_AppendString(buf, rcvdTopic);
rv=GWEN_Buffer_KeepTextBetweenStrings(buf, sBefore, sAfter, 1);
if (rv<0) {
DBG_INFO(NULL, "Could not extract id from [%s] (beforeId=%s, afterId=%s)", rcvdTopic, sBefore, sAfter);
GWEN_Buffer_free(buf);
return NULL;
}
return buf;
}
return NULL;
}

View File

@@ -0,0 +1,27 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQHOMEMQTT_LOOP_MQTT_H
#define AQHOMEMQTT_LOOP_MQTT_H
#include "./aqhome_mqtt.h"
#include <gwenhywfar/endpoint.h>
#include <gwenhywfar/msg.h>
void AqHomeMqttLog_ReadAndHandleMqttMessages(AQHOME_MQTT *aqh);
int AqHomeMqttLog_SendPing(AQHOME_MQTT *aqh);
#endif

View File

@@ -11,12 +11,12 @@
#endif
#include "./item.h"
#include "./mqtt.h"
#include "./messages.h"
#include "./init.h"
#include "./fini.h"
#include "./loop.h"
#include "./loop_mqtt.h"
#include "aqhome/aqhome.h"
#include "aqhome/mqtt/msg_mqtt.h"
#include "aqhome/mqtt/msg_mqtt_publish.h"
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/args.h>
@@ -49,7 +49,7 @@
#define AQHOME_MQTTLOG_PING_INTERVAL 120
//#define FULL_DEBUG
#define FULL_DEBUG
@@ -58,9 +58,7 @@
* ------------------------------------------------------------------------------------------------
*/
static int _serve(GWEN_DB_NODE *dbArgs);
static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs);
static int _createPidFile(const char *pidFilename);
static int _serve(AQHOME_MQTT *aqh);
#ifdef HAVE_SIGNAL_H
static int _setSignalHandlers(void);
@@ -87,6 +85,7 @@ static int stopService=0;
int main(int argc, char **argv)
{
AQHOME_MQTT *aqh;
GWEN_DB_NODE *dbArgs;
int rv;
GWEN_GUI *gui;
@@ -108,16 +107,16 @@ int main(int argc, char **argv)
return 2;
}
dbArgs=GWEN_DB_Group_new("arguments");
rv=_readArgs(argc, argv, dbArgs);
aqh=AqHomeMqtt_new();
rv=AqHomeMqtt_Init(aqh, argc, argv);
if (rv<0) {
if (rv==GWEN_ERROR_CLOSE)
return 1;
DBG_INFO(NULL, "here (%d)", rv);
return 2;
}
else if (rv==1) {
DBG_INFO(NULL, "Help printed, done");
return 0;
}
dbArgs=AqHomeMqtt_GetDbArgs(aqh);
gui=GWEN_Gui_CGui_new();
s=GWEN_DB_GetCharValue(dbArgs, "charset", 0, NULL);
@@ -125,13 +124,15 @@ int main(int argc, char **argv)
GWEN_Gui_SetCharSet(gui, s);
GWEN_Gui_SetGui(gui);
rv=_serve(dbArgs);
rv=_serve(aqh);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
return 2;
}
GWEN_DB_Group_free(dbArgs);
AqHomeMqtt_Fini(aqh);
AqHomeMqtt_free(aqh);
GWEN_Gui_SetGui(NULL);
GWEN_Gui_free(gui);
@@ -140,24 +141,18 @@ int main(int argc, char **argv)
int _serve(GWEN_DB_NODE *dbArgs)
int _serve(AQHOME_MQTT *aqh)
{
const char *pidFile;
GWEN_MSG_ENDPOINT *epTcp;
ITEM_LIST *itemList;
int rv;
int timeout;
time_t startTime;
time_t lastPingSendTime;
const char *baseFolder;
GWEN_DB_NODE *dbArgs;
startTime=time(NULL);
itemList=AqHomeMqttLog_ReadItems(dbArgs);
if (itemList==NULL) {
DBG_ERROR(NULL, "No items to listen to, aborting.");
return GWEN_ERROR_GENERIC;
}
dbArgs=AqHomeMqtt_GetDbArgs(aqh);
rv=_setSignalHandlers();
if (rv<0) {
@@ -165,80 +160,15 @@ int _serve(GWEN_DB_NODE *dbArgs)
return rv;
}
baseFolder=GWEN_DB_GetCharValue(dbArgs, "writeToFolder", 0, "/tmp/aqhome");
timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0);
pidFile=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, "aqhome-mqttlog.pid");
if (pidFile && *pidFile) {
rv=_createPidFile(pidFile);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
return rv;
}
}
epTcp=AqHomeMqttLog_CreateMqttEndpoint(dbArgs);
if (epTcp==NULL) {
DBG_INFO(NULL, "here");
Item_List_free(itemList);
return GWEN_ERROR_GENERIC;
}
rv=AqHomeMqttLog_MqttConnect(epTcp);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
GWEN_MsgEndpoint_free(epTcp);
Item_List_free(itemList);
return rv;
}
rv=AqHomeMqttLog_Subscribe(epTcp, "#");
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
GWEN_MsgEndpoint_free(epTcp);
Item_List_free(itemList);
return rv;
}
lastPingSendTime=time(NULL);
epTcp=AqHomeMqtt_GetMqttEndpoint(aqh);
while(!stopService) {
DBG_DEBUG(NULL, "Next loop");
GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
if (GWEN_MsgEndpoint_GetState(epTcp)!=GWEN_MSG_ENDPOINT_STATE_CONNECTED) {
DBG_INFO(NULL, "Not connected...");
}
else {
GWEN_MSG *msg;
while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) {
#ifdef FULL_DEBUG
DBG_ERROR(NULL, "Received this message:");
GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2);
#endif
if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) {
#ifdef FULL_DEBUG
GWEN_BUFFER *buf;
buf=GWEN_Buffer_new(0, 256, 0, 1);
AQH_PublishMqttMsg_DumpToBuffer(msg, buf, "received");
fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf));
GWEN_Buffer_free(buf);
#endif
AqHomeMqttLog_HandlePublishMsg(baseFolder, itemList, msg);
}
else if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) {
DBG_INFO(AQH_LOGDOMAIN, "PING response received");
}
else {
#ifdef FULL_DEBUG
DBG_ERROR(NULL, "Received this message:");
GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2);
#endif
}
GWEN_Msg_free(msg);
}
}
AqHomeMqttLog_Loop(aqh, 2000);
if (timeout) {
time_t now;
@@ -254,7 +184,7 @@ int _serve(GWEN_DB_NODE *dbArgs)
now=time(NULL);
if (now-lastPingSendTime>AQHOME_MQTTLOG_PING_INTERVAL) {
rv=AqHomeMqttLog_Ping(epTcp);
rv=AqHomeMqttLog_SendPing(aqh);
if (rv<0) {
DBG_INFO(NULL, "Error sending PING");
}
@@ -263,11 +193,7 @@ int _serve(GWEN_DB_NODE *dbArgs)
}
}
if (pidFile && *pidFile)
remove(pidFile);
GWEN_MsgEndpoint_free(epTcp);
Item_List_free(itemList);
return 0;
}
@@ -339,195 +265,3 @@ void _signalHandler(int s)
int _createPidFile(const char *pidFilename)
{
FILE *f;
int pidfd;
if (remove(pidFilename)==0) {
DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)");
}
#ifdef HAVE_SYS_STAT_H
pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
if (pidfd < 0) {
DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno));
return GWEN_ERROR_IO;
}
f = fdopen(pidfd, "w");
#else /* HAVE_STAT_H */
f=fopen(pidFilename,"w+");
#endif /* HAVE_STAT_H */
/* write pid */
#ifdef HAVE_GETPID
fprintf(f,"%d\n",getpid());
#else
fprintf(f,"-1\n");
#endif
if (fclose(f)) {
DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno));
return GWEN_ERROR_IO;
}
return 0;
}
int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs)
{
int rv;
const GWEN_ARGS args[]= {
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"cfgdir", /* name */
0, /* minnum */
1, /* maxnum */
"D", /* short option */
"cfgdir", /* long option */
I18S("Specify the configuration folder"),
I18S("Specify the configuration folder")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"charset", /* name */
0, /* minnum */
1, /* maxnum */
0, /* short option */
"charset", /* long option */
I18S("Specify the output character set"), /* short description */
I18S("Specify the output character set") /* long description */
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"mqttAddress", /* name */
0, /* minnum */
1, /* maxnum */
"ma", /* short option */
"mqttaddress", /* long option */
I18S("Specify the address of the MQTT server to connect to (disabled if missing)"),
I18S("Specify the address of the MQTT server to connect to (disabled if missing)")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Int, /* type */
"mqttPort", /* name */
0, /* minnum */
1, /* maxnum */
"mp", /* short option */
"mqttport", /* long option */
I18S("Specify the port of the MQTT server (default: 1883)"),
I18S("Specify the port of the MQTT server (default: 1883)")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"mqttClientId", /* name */
0, /* minnum */
1, /* maxnum */
NULL, /* short option */
"mqttclientid", /* long option */
I18S("Specify client id for the MQTT server (default: \"aqhomed\")"),
I18S("Specify client id for the MQTT server (default: \"aqhomed\")")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Int, /* type */
"mqttKeepAlive", /* name */
0, /* minnum */
1, /* maxnum */
"mk", /* short option */
"mqttkeepalive", /* long option */
I18S("Specify keepalive time in seconds (defaults: 600)"),
I18S("Specify keepalive time in seconds (defaults: 600)")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"writeToFolder", /* name */
0, /* minnum */
1, /* maxnum */
"W", /* short option */
NULL, /* long option */
I18S("Specify folder to write received values to"),
I18S("Specify folder to write received values to")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"pidfile", /* name */
0, /* minnum */
1, /* maxnum */
"p", /* short option */
"pidfile", /* long option */
I18S("Specify the PID file"),
I18S("Specify the PID file")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"itemfile", /* name */
0, /* minnum */
1, /* maxnum */
"i", /* short option */
"itemfile", /* long option */
I18S("Specify the item definitions file"),
I18S("Specify the item definitions file")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Int, /* type */
"timeout", /* name */
0, /* minnum */
1, /* maxnum */
"T", /* short option */
"timeout", /* long option */
I18S("Specify timeout in second (default: no timeout)"),
I18S("Specify timeout in second (default: no timeout)")
},
{
GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST, /* flags */
GWEN_ArgsType_Int, /* type */
"help", /* name */
0, /* minnum */
0, /* maxnum */
"h", /* short option */
"help",
I18S("Show this help screen."),
I18S("Show this help screen.")
}
};
rv=GWEN_Args_Check(argc, argv, 1, 0, args, dbArgs);
if (rv==GWEN_ARGS_RESULT_ERROR) {
fprintf(stderr, "ERROR: Could not parse arguments main\n");
return GWEN_ERROR_INVALID;
}
else if (rv==GWEN_ARGS_RESULT_HELP) {
GWEN_BUFFER *ubuf;
ubuf=GWEN_Buffer_new(0, 1024, 0, 1);
GWEN_Buffer_AppendArgs(ubuf,
I18N("This is version %s.\nUsage: %s [OPTIONS]\n\nOptions:\n"),
AQHOME_VERSION_STRING,
argv[0]);
if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) {
fprintf(stderr, "ERROR: Could not create help string\n");
return 1;
}
GWEN_Buffer_AppendString(ubuf, "\n");
fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(ubuf));
GWEN_Buffer_free(ubuf);
return GWEN_ERROR_CLOSE;
}
return 0;
}

View File

@@ -85,7 +85,7 @@ GWEN_MSG_ENDPOINT *AqHomeMqttLog_CreateMqttEndpoint(GWEN_DB_NODE *dbArgs)
}
#if 0
int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT *epTcp)
{
if (GWEN_MsgEndpoint_GetState(epTcp)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) {
@@ -136,6 +136,7 @@ int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT *epTcp, const char *topicFilter)
return 0;
}
#endif
@@ -156,6 +157,7 @@ int AqHomeMqttLog_Ping(GWEN_MSG_ENDPOINT *epTcp)
#if 0
GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds)
{
time_t startTime;
@@ -187,5 +189,7 @@ GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int
return NULL;
}
#endif

View File

@@ -17,8 +17,8 @@
GWEN_MSG_ENDPOINT *AqHomeMqttLog_CreateMqttEndpoint(GWEN_DB_NODE *dbArgs);
int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT *epTcp);
int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT *epTcp, const char *topicFilter);
/*int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT *epTcp); */
/* int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT *epTcp, const char *topicFilter); */
int AqHomeMqttLog_Ping(GWEN_MSG_ENDPOINT *epTcp);

View File

@@ -0,0 +1,81 @@
<?xml?>
<gwbuild>
<target type="ConvenienceLibrary" name="aqhmqtt_types" >
<includes type="c" >
$(gwenhywfar_cflags)
-I$(topsrcdir)
-I$(topbuilddir)
-I$(topsrcdir)/apps
-I$(topbuilddir)/apps
-I$(builddir)
-I$(srcdir)
</includes>
<includes type="tm2" >
--include=$(builddir)
--include=$(srcdir)
</includes>
<setVar name="local/cflags">$(visibility_cflags)</setVar>
<setVar name="tm2flags" >
</setVar>
<setVar name="local/typefiles" >
device.t2d
value.t2d
topic.t2d
translation.t2d
</setVar>
<setVar name="local/built_sources" >
device.c
value.c
topic.c
translation.c
</setVar>
<setVar name="local/built_headers_pub">
</setVar>
<setVar name="local/built_headers_priv" >
device.h
device_p.h
value.h
value_p.h
topic.h
topic_p.h
translation.h
translation_p.h
</setVar>
<headers dist="true" >
</headers>
<sources>
$(local/typefiles)
</sources>
<useTargets>
</useTargets>
<libraries>
</libraries>
<subdirs>
</subdirs>
<extradist>
</extradist>
</target>
</gwbuild>

View File

@@ -0,0 +1,74 @@
<?xml?>
<tm2>
<type id="AQHMQTT_DEVICE" type="pointer">
<descr>
This object and its objects are used to store registered devices and definitions for possible new devices.
</descr>
<lang id="c">
<identifier>AQHMQTT_DEVICE</identifier>
<prefix>AQHMQTT_Device</prefix>
<baseFileName>device</baseFileName>
<flags>
with_list1
with_list2
</flags>
<headers>
<header type="sys" loc="pre">aqhome/api.h</header>
<header type="sys" loc="pre">aqhome-mqttlog/types/topic.h</header>
</headers>
<inlines>
</inlines>
</lang>
<enums>
</enums>
<members>
<member name="id" type="char_ptr" maxlen="128">
<descr>Only set for registered devices</descr>
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own with_getbymember</flags>
</member>
<member name="name" type="char_ptr" maxlen="128">
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="driver" type="char_ptr" maxlen="64">
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="topicList" type="AQHMQTT_TOPIC_LIST" >
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own noconst</flags>
<getflags>none</getflags>
</member>
</members>
</type>
</tm2>

View File

@@ -0,0 +1,106 @@
<?xml?>
<tm2>
<type id="AQHMQTT_TOPIC" type="pointer">
<descr>
</descr>
<lang id="c">
<identifier>AQHMQTT_TOPIC</identifier>
<prefix>AQHMQTT_Topic</prefix>
<baseFileName>topic</baseFileName>
<flags>
with_list1
</flags>
<headers>
<header type="sys" loc="pre">aqhome/api.h</header>
<header type="sys" loc="pre">aqhome-mqttlog/types/value.h</header>
</headers>
<inlines>
</inlines>
</lang>
<enums>
<enum id="AQHMQTT_TOPIC_TYPE" prefix="AQHMQTT_TopicType_">
<item name="num" value="0">
<descr>numeric type</descr>
</item>
<item name="json" >
<descr>JSON type</descr>
</item>
</enum>
<enum id="AQHMQTT_TOPIC_DIR" prefix="AQHMQTT_TopicDir_">
<item name="in" value="0"/>
<item name="out" />
</enum>
</enums>
<members>
<member name="topic" type="char_ptr" maxlen="128">
<descr>Only set for registered devices</descr>
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="topicType" type="int" maxlen="8">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags></flags>
</member>
<member name="direction" type="int" maxlen="8">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags></flags>
</member>
<member name="mask" type="char_ptr" maxlen="128">
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="beforeId" type="char_ptr" maxlen="128">
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="afterId" type="char_ptr" maxlen="128">
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="valueList" type="AQHMQTT_VALUE_LIST" >
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own</flags>
<getflags>none</getflags>
</member>
</members>
</type>
</tm2>

View File

@@ -0,0 +1,55 @@
<?xml?>
<tm2>
<type id="AQHMQTT_TRANSLATION" type="pointer">
<descr>
</descr>
<lang id="c">
<identifier>AQHMQTT_TRANSLATION</identifier>
<prefix>AQHMQTT_Translation</prefix>
<baseFileName>translation</baseFileName>
<flags>
with_db
with_list1
</flags>
<headers>
<header type="sys" loc="pre">aqhome/api.h</header>
</headers>
<inlines>
</inlines>
</lang>
<enums>
</enums>
<members>
<member name="aqhValue" type="char_ptr" maxlen="128">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="driverValue" type="char_ptr" maxlen="128">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags>own</flags>
</member>
</members>
</type>
</tm2>

View File

@@ -0,0 +1,91 @@
<?xml?>
<tm2>
<type id="AQHMQTT_VALUE" type="pointer">
<descr>
</descr>
<lang id="c">
<identifier>AQHMQTT_VALUE</identifier>
<prefix>AQHMQTT_Value</prefix>
<baseFileName>value</baseFileName>
<flags>
with_list1
with_list2
</flags>
<headers>
<header type="sys" loc="pre">aqhome/api.h</header>
<header type="sys" loc="pre">aqhome-mqttlog/types/translation.h</header>
</headers>
<inlines>
</inlines>
</lang>
<enums>
<enum id="AQHMQTT_VALUE_TYPE" prefix="AQHMQTT_ValueType_">
<item name="sensor" value="0">
<descr>sensor</descr>
</item>
<item name="actor" >
<descr>actor</descr>
</item>
</enum>
</enums>
<members>
<member name="name" type="char_ptr" maxlen="128">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="valueType" type="int" maxlen="8">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags></flags>
</member>
<member name="valueUnits" type="char_ptr" maxlen="32">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="path" type="char_ptr" maxlen="256">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="translationList" type="AQHMQTT_TRANSLATION_LIST" >
<default>NULL</default>
<preset>NULL</preset>
<access>public</access>
<flags>own</flags>
<getflags>none</getflags>
</member>
</members>
</type>
</tm2>

View File

@@ -0,0 +1,346 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./xmlread.h"
#include "./aqhome_mqtt_p.h"
#include "aqhome-mqttlog/types/topic.h"
#include "aqhome-mqttlog/types/value.h"
#include <aqhome/api.h>
#include <aqhome/aqhome.h>
#include <gwenhywfar/endpoint_multilayer.h>
#include <gwenhywfar/text.h>
#include <gwenhywfar/xml.h>
#include <gwenhywfar/debug.h>
/* ------------------------------------------------------------------------------------------------
* defines
* ------------------------------------------------------------------------------------------------
*/
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static AQHMQTT_DEVICE_LIST *_readDeviceFiles(AQHOME_MQTT *aqh, const GWEN_STRINGLIST *sl);
static AQHMQTT_DEVICE *_readDeviceFile(AQHOME_MQTT *aqh, const char *sFilename);
static AQHMQTT_DEVICE *_readXmlDevice(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceNode);
static AQHMQTT_TOPIC_LIST *_readXmlTopicList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode);
static AQHMQTT_TOPIC *_readXmlTopic(AQHOME_MQTT *aqh, GWEN_XMLNODE *topicNode);
static AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode);
static AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadSysconfDeviceFiles(AQHOME_MQTT *aqh)
{
GWEN_STRINGLIST *sl;
sl=AQH_GetListOfMatchingSysconfFiles("devices", "*.xml");
if (sl) {
AQHMQTT_DEVICE_LIST *deviceList;
deviceList=_readDeviceFiles(aqh, sl);
GWEN_StringList_free(sl);
if (deviceList==NULL) {
DBG_INFO(NULL, "Error reading sysconf device files");
return NULL;
}
return deviceList;
}
else {
DBG_INFO(NULL, "No sysconf device files");
return NULL;
}
}
AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadRuntimeDataDeviceFiles(AQHOME_MQTT *aqh)
{
GWEN_STRINGLIST *sl;
sl=AQH_GetListOfMatchingRuntimeDataFiles("aqhome-mqtt/devices", "*.xml");
if (sl) {
AQHMQTT_DEVICE_LIST *deviceList;
deviceList=_readDeviceFiles(aqh, sl);
GWEN_StringList_free(sl);
if (deviceList==NULL) {
DBG_INFO(NULL, "Error reading sysconf device files");
return NULL;
}
return deviceList;
}
else {
DBG_INFO(NULL, "No sysconf device files");
return NULL;
}
}
AQHMQTT_DEVICE_LIST *_readDeviceFiles(AQHOME_MQTT *aqh, const GWEN_STRINGLIST *sl)
{
GWEN_STRINGLISTENTRY *se;
AQHMQTT_DEVICE_LIST *deviceList;
deviceList=AQHMQTT_Device_List_new();
se=GWEN_StringList_FirstEntry(sl);
while(se) {
const char *s;
s=GWEN_StringListEntry_Data(se);
if (s && *s) {
AQHMQTT_DEVICE *device;
device=_readDeviceFile(aqh, s);
if (device)
AQHMQTT_Device_List_Add(device, deviceList);
}
se=GWEN_StringListEntry_Next(se);
}
if (AQHMQTT_Device_List_GetCount(deviceList)<1) {
AQHMQTT_Device_List_free(deviceList);
return NULL;
}
return deviceList;
}
AQHMQTT_DEVICE *_readDeviceFile(AQHOME_MQTT *aqh, const char *sFilename)
{
GWEN_XMLNODE *rootNode;
GWEN_XMLNODE *deviceNode;
int rv;
rootNode=GWEN_XMLNode_new(GWEN_XMLNodeTypeTag, NULL);
rv=GWEN_XML_ReadFile(rootNode, sFilename, GWEN_XML_FLAGS_DEFAULT);
if (rv<0) {
DBG_ERROR(NULL, "Error reading XML file \"%s\": %d", sFilename, rv);
GWEN_XMLNode_free(rootNode);
return NULL;
}
deviceNode=GWEN_XMLNode_FindFirstTag(rootNode, "device", NULL, NULL);
if (deviceNode) {
AQHMQTT_DEVICE *device;
device=_readXmlDevice(aqh, deviceNode);
GWEN_XMLNode_free(rootNode);
if (device==NULL) {
DBG_INFO(NULL, "Error reading device from XML file \"%s\" (%d)", sFilename, rv);
return NULL;
}
return device;
}
else {
DBG_INFO(NULL, "XML file \"%s\" does not contain a <device> element", sFilename);
GWEN_XMLNode_free(rootNode);
return NULL;
}
GWEN_XMLNode_free(rootNode);
return NULL;
}
AQHMQTT_DEVICE *_readXmlDevice(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceNode)
{
AQHMQTT_DEVICE *device;
GWEN_XMLNODE *topicsNode;
device=AQHMQTT_Device_new();
AQHMQTT_Device_SetId(device, GWEN_XMLNode_GetProperty(deviceNode, "id", NULL));
AQHMQTT_Device_SetName(device, GWEN_XMLNode_GetProperty(deviceNode, "name", NULL));
AQHMQTT_Device_SetDriver(device, GWEN_XMLNode_GetProperty(deviceNode, "driver", NULL));
topicsNode=GWEN_XMLNode_FindFirstTag(deviceNode, "mqtttopics", NULL, NULL);
if (topicsNode) {
AQHMQTT_TOPIC_LIST *topicList;
topicList=_readXmlTopicList(aqh, topicsNode);
if (topicList)
AQHMQTT_Device_SetTopicList(device, topicList);
else {
DBG_INFO(NULL, "No mqtt topics read");
AQHMQTT_Device_free(device);
return NULL;
}
}
else {
DBG_INFO(NULL, "No <mqtttopics> element");
AQHMQTT_Device_free(device);
return NULL;
}
return device;
}
AQHMQTT_TOPIC_LIST *_readXmlTopicList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode)
{
AQHMQTT_TOPIC_LIST *topicList;
GWEN_XMLNODE *topicNode;
topicList=AQHMQTT_Topic_List_new();
topicNode=GWEN_XMLNode_FindFirstTag(parentNode, "mqtttopic", NULL, NULL);
while(topicNode) {
AQHMQTT_TOPIC *topic=_readXmlTopic(aqh, topicNode);
if (topic)
AQHMQTT_Topic_List_Add(topic, topicList);
else {
DBG_INFO(NULL, "Error reading <mqtttopic> element");
AQHMQTT_Topic_List_free(topicList);
return NULL;
}
topicNode=GWEN_XMLNode_FindNextTag(topicNode, "mqtttopic", NULL, NULL);
}
if (AQHMQTT_Topic_List_GetCount(topicList)<1) {
AQHMQTT_Topic_List_free(topicList);
return NULL;
}
return topicList;
}
AQHMQTT_TOPIC *_readXmlTopic(AQHOME_MQTT *aqh, GWEN_XMLNODE *topicNode)
{
AQHMQTT_TOPIC *topic;
GWEN_XMLNODE *valuesNode;
int i;
const char *s;
topic=AQHMQTT_Topic_new();
s=GWEN_XMLNode_GetProperty(topicNode, "type", NULL);
i=AQHMQTT_TopicType_fromString(s);
if (i==AQHMQTT_TopicType_Unknown) {
DBG_ERROR(NULL, "Invalid topic type \"%s\"", s?s:"<empty>");
AQHMQTT_Topic_free(topic);
return NULL;
}
AQHMQTT_Topic_SetTopicType(topic, i);
s=GWEN_XMLNode_GetProperty(topicNode, "direction", NULL);
i=AQHMQTT_TopicDir_fromString(s);
if (i==AQHMQTT_TopicDir_Unknown) {
DBG_ERROR(NULL, "Invalid topic direction \"%s\"", s?s:"<empty>");
AQHMQTT_Topic_free(topic);
return NULL;
}
AQHMQTT_Topic_SetDirection(topic, i);
AQHMQTT_Topic_SetTopic(topic, GWEN_XMLNode_GetCharValue(topicNode, "topic", NULL));
AQHMQTT_Topic_SetMask(topic, GWEN_XMLNode_GetCharValue(topicNode, "mask", NULL));
AQHMQTT_Topic_SetBeforeId(topic, GWEN_XMLNode_GetCharValue(topicNode, "beforeId", NULL));
AQHMQTT_Topic_SetAfterId(topic, GWEN_XMLNode_GetCharValue(topicNode, "afterId", NULL));
valuesNode=GWEN_XMLNode_FindFirstTag(topicNode, "values", NULL, NULL);
if (valuesNode) {
AQHMQTT_VALUE_LIST *valueList;
valueList=_readXmlValueList(aqh, valuesNode);
if (valueList)
AQHMQTT_Topic_SetValueList(topic, valueList);
else {
DBG_INFO(NULL, "No values read");
AQHMQTT_Topic_free(topic);
return NULL;
}
}
else {
DBG_INFO(NULL, "No <values> element");
AQHMQTT_Topic_free(topic);
return NULL;
}
return topic;
}
AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode)
{
AQHMQTT_VALUE_LIST *valueList;
GWEN_XMLNODE *valueNode;
valueList=AQHMQTT_Value_List_new();
valueNode=GWEN_XMLNode_FindFirstTag(parentNode, "value", NULL, NULL);
while(valueNode) {
AQHMQTT_VALUE *value=_readXmlValue(aqh, valueNode);
if (value)
AQHMQTT_Value_List_Add(value, valueList);
else {
DBG_INFO(NULL, "Error reading <value> element");
AQHMQTT_Value_List_free(valueList);
return NULL;
}
valueNode=GWEN_XMLNode_FindNextTag(valueNode, "value", NULL, NULL);
}
if (AQHMQTT_Value_List_GetCount(valueList)<1) {
AQHMQTT_Value_List_free(valueList);
return NULL;
}
return valueList;
}
AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode)
{
AQHMQTT_VALUE *value;
const char *s;
int i;
value=AQHMQTT_Value_new();
AQHMQTT_Value_SetName(value, GWEN_XMLNode_GetProperty(valueNode, "name", NULL));
AQHMQTT_Value_SetValueUnits(value, GWEN_XMLNode_GetProperty(valueNode, "units", NULL));
AQHMQTT_Value_SetPath(value, GWEN_XMLNode_GetProperty(valueNode, "path", NULL));
s=GWEN_XMLNode_GetProperty(valueNode, "type", NULL);
i=AQHMQTT_ValueType_fromString(s);
if (i==AQHMQTT_ValueType_Unknown) {
DBG_ERROR(NULL, "Invalid value type \"%s\"", s?s:"<empty>");
AQHMQTT_Value_free(value);
return NULL;
}
AQHMQTT_Value_SetValueType(value, i);
/* TODO: read translationList */
return value;
}

View File

@@ -0,0 +1,26 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQHOME_MQTTLOG_XMLREAD_H
#define AQHOME_MQTTLOG_XMLREAD_H
#include "aqhome-mqttlog/aqhome_mqtt.h"
#include "aqhome-mqttlog/types/device.h"
AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadSysconfDeviceFiles(AQHOME_MQTT *aqh);
AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadRuntimeDataDeviceFiles(AQHOME_MQTT *aqh);
#endif

View File

@@ -62,8 +62,6 @@ static void _setDeviceName(AQH_VALUE *value, uint32_t uid);
* ------------------------------------------------------------------------------------------------
*/
void AqHomed_ForwardTtyMsgToBroker(AQHOMED *aqh, const GWEN_MSG *nodeMsg)
{
if (GWEN_MsgEndpoint_GetState(aqh->brokerEndpoint)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) {

View File

@@ -24,6 +24,7 @@
<define name="AQHOME_SYSCONF_DIR" value="$(aqhome_cfg_searchdir)" quoted="TRUE" />
<define name="AQHOME_LOCALEF_DIR" value="$(aqhome_locale_searchdir)" quoted="TRUE" />
<define name="AQHOME_DATA_DIR" value="$(aqhome_data_searchdir)" quoted="TRUE" />
<define name="AQHOME_RTDATA_DIR" value="$(aqhome_rtdata_searchdir)" quoted="TRUE" />
<setVar name="local/cflags">$(visibility_cflags)</setVar>

View File

@@ -25,6 +25,7 @@
#define AQHOME_PM_LIBNAME "aqhome"
#define AQHOME_PM_SYSCONFDIR "sysconfdir"
#define AQHOME_PM_DATADIR "datadir"
#define AQHOME_PM_RTDATADIR "rtdatadir"
#define AQHOME_PM_LOCALEDIR "localedir"
#define AQHOME_SYSCONFIG_FILE "aqhome.conf"
@@ -37,6 +38,7 @@ static void _initPathManager(void);
static void _finiPathManager(void);
static void _initI18n(void);
static void _definePath(const char *pathName, const char *pathValue);
static GWEN_STRINGLIST *_getListOfMatchingFiles(const char *pathName, const char *subFolder, const char *mask);
@@ -83,6 +85,28 @@ GWEN_STRINGLIST *AQH_GetGlobalSysconfDirs(void)
GWEN_STRINGLIST *AQH_GetListOfMatchingDataFiles(const char *subFolder, const char *mask)
{
return _getListOfMatchingFiles(AQHOME_PM_DATADIR, subFolder, mask);
}
GWEN_STRINGLIST *AQH_GetListOfMatchingRuntimeDataFiles(const char *subFolder, const char *mask)
{
return _getListOfMatchingFiles(AQHOME_PM_RTDATADIR, subFolder, mask);
}
GWEN_STRINGLIST *AQH_GetListOfMatchingSysconfFiles(const char *subFolder, const char *mask)
{
return _getListOfMatchingFiles(AQHOME_PM_SYSCONFDIR, subFolder, mask);
}
GWEN_DB_NODE *AQH_LoadConfigFile(void)
{
GWEN_BUFFER *fbuf;
@@ -161,6 +185,7 @@ void _initPathManager(void)
_definePath(AQHOME_PM_SYSCONFDIR, AQHOME_SYSCONF_DIR);
_definePath(AQHOME_PM_LOCALEDIR, AQHOME_SYSCONF_DIR);
_definePath(AQHOME_PM_DATADIR, AQHOME_DATA_DIR);
_definePath(AQHOME_PM_RTDATADIR, AQHOME_RTDATA_DIR);
}
@@ -168,6 +193,7 @@ void _initPathManager(void)
void _finiPathManager(void)
{
GWEN_PathManager_UndefinePath(AQHOME_PM_LIBNAME, AQHOME_PM_LOCALEDIR);
GWEN_PathManager_UndefinePath(AQHOME_PM_LIBNAME, AQHOME_PM_RTDATADIR);
GWEN_PathManager_UndefinePath(AQHOME_PM_LIBNAME, AQHOME_PM_DATADIR);
GWEN_PathManager_UndefinePath(AQHOME_PM_LIBNAME, AQHOME_PM_SYSCONFDIR);
GWEN_PathManager_RemovePaths(AQHOME_PM_LIBNAME);
@@ -218,3 +244,22 @@ void _definePath(const char *pathName, const char *pathValue)
GWEN_STRINGLIST *_getListOfMatchingFiles(const char *pathName, const char *subFolder, const char *mask)
{
int rv;
GWEN_STRINGLIST *sl;
sl=GWEN_StringList_new();
rv=GWEN_PathManager_GetMatchingFilesRecursively(AQHOME_PM_LIBNAME, AQHOME_PM_DATADIR, subFolder, sl, mask);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN,
"Error listing matching data files (folder=%s, mask=%s)",
subFolder?subFolder:"<empty>", mask?mask:"<empty>");
GWEN_StringList_free(sl);
return NULL;
}
return sl;
}

View File

@@ -23,6 +23,10 @@ AQHOME_API void AQH_Fini(void);
AQHOME_API GWEN_DB_NODE *AQH_LoadConfigFile(void);
AQHOME_API void AQH_MergeConfigFileIntoConfig(GWEN_DB_NODE *dbArgs, const char *destDbGroupName);
AQHOME_API GWEN_STRINGLIST *AQH_GetListOfMatchingDataFiles(const char *subFolder, const char *mask);
AQHOME_API GWEN_STRINGLIST *AQH_GetListOfMatchingRuntimeDataFiles(const char *subFolder, const char *mask);
AQHOME_API GWEN_STRINGLIST *AQH_GetListOfMatchingSysconfFiles(const char *subFolder, const char *mask);
AQHOME_API GWEN_STRINGLIST *AQH_GetGlobalDataDirs(void);
AQHOME_API GWEN_STRINGLIST *AQH_GetGlobalSysconfDirs(void);

View File

@@ -4,3 +4,6 @@ int brokerPort=1899
char nodesAddr="127.0.0.1"
int nodesPort=45454
char mqttAddr="192.168.117.192"
int mqttPort=1883