diff --git a/apps/aqhome-mqttlog/0BUILD b/apps/aqhome-mqttlog/0BUILD index d155903..5775c86 100644 --- a/apps/aqhome-mqttlog/0BUILD +++ b/apps/aqhome-mqttlog/0BUILD @@ -37,30 +37,23 @@ - init.h - fini.h - loop.h - loop_ipc.h - loop_mqtt.h - aqhome_mqtt.h - aqhome_mqtt_p.h xmlread.h xmlwrite.h c_setdata.h + server.h + server_p.h + s_publish.h $(local/typefiles) aqhome_mqtt.c - init.c - fini.c - loop.c - loop_ipc.c - loop_mqtt.c main.c xmlread.c xmlwrite.c c_setdata.c + server.c + s_publish.c diff --git a/apps/aqhome-mqttlog/main.c b/apps/aqhome-mqttlog/main.c index 7bacfd0..f692151 100644 --- a/apps/aqhome-mqttlog/main.c +++ b/apps/aqhome-mqttlog/main.c @@ -10,10 +10,7 @@ # include #endif -#include "./init.h" -#include "./fini.h" -#include "./loop.h" -#include "./loop_mqtt.h" +#include "./server.h" #include "aqhome/aqhome.h" @@ -46,8 +43,9 @@ #define I18N(msg) msg #define I18S(msg) msg -#define AQHOME_MQTTLOG_PING_INTERVAL 120 -#define AQHOME_MQTTLOG_SAVE_INTERVAL 60 +#define CONNCHECK_INTERVAL_IN_SECS 10 +#define PING_INTERVAL_IN_SECS 120 +#define SAVE_INTERVAL_IN_SECS 60 #define FULL_DEBUG @@ -58,7 +56,8 @@ * ------------------------------------------------------------------------------------------------ */ -static void _serve(AQHOME_MQTT *aqh); +static void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop); +static int _diffInSeconds(time_t t1, time_t t0); #ifdef HAVE_SIGNAL_H static int _setSignalHandlers(void); @@ -85,11 +84,10 @@ static int stopService=0; int main(int argc, char **argv) { - AQHOME_MQTT *aqh; - GWEN_DB_NODE *dbArgs; + AQH_EVENT_LOOP *eventLoop; + AQH_OBJECT *aqh; int rv; GWEN_GUI *gui; - const char *s; rv=GWEN_Init(); if (rv) { @@ -100,14 +98,25 @@ int main(int argc, char **argv) GWEN_Logger_Open(0, "aqhome-mqttlog", 0, GWEN_LoggerType_Console, GWEN_LoggerFacility_User); GWEN_Logger_SetLevel(0, GWEN_LoggerLevel_Warning); + rv=_setSignalHandlers(); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + rv=AQH_Init(); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return 2; } - aqh=AqHomeMqtt_new(); - rv=AqHomeMqtt_Init(aqh, argc, argv); + gui=GWEN_Gui_CGui_new(); + GWEN_Gui_SetGui(gui); + + eventLoop=AQH_EventLoop_new(); + + aqh=AQH_MqttLogServer_new(eventLoop); + rv=AQH_MqttLogServer_Init(aqh, argc, argv); if (rv<0) { if (rv==GWEN_ERROR_CLOSE) return 1; @@ -115,18 +124,11 @@ int main(int argc, char **argv) return 2; } - dbArgs=AqHomeMqtt_GetDbArgs(aqh); + _runService(aqh, eventLoop); - gui=GWEN_Gui_CGui_new(); - s=GWEN_DB_GetCharValue(dbArgs, "charset", 0, NULL); - if (s && *s) - GWEN_Gui_SetCharSet(gui, s); - GWEN_Gui_SetGui(gui); - _serve(aqh); - - AqHomeMqtt_Fini(aqh); - AqHomeMqtt_free(aqh); + AQH_MqttLogServer_Fini(aqh); + AQH_Object_free(aqh); GWEN_Gui_SetGui(NULL); GWEN_Gui_free(gui); @@ -136,77 +138,65 @@ int main(int argc, char **argv) -void _serve(AQHOME_MQTT *aqh) +void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop) { - int rv; + time_t timeStart; int timeout; - time_t startTime; - time_t lastPingSendTime; - time_t lastSaveTime; - GWEN_DB_NODE *dbArgs; + time_t timeLastConnCheck; + time_t timeLastSave; + time_t timeLastPingSend; + int rv; - startTime=time(NULL); - lastSaveTime=time(NULL); - - dbArgs=AqHomeMqtt_GetDbArgs(aqh); - - rv=_setSignalHandlers(); - if (rv<0) { - DBG_ERROR(NULL, "Error setting signal handlers (%d)", rv); - return; - } - - timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0); - - lastPingSendTime=time(NULL); + timeout=AQH_MqttLogServer_GetTimeout(aqh); + timeStart=time(NULL); + timeLastConnCheck=time(NULL); + timeLastSave=time(NULL); + timeLastPingSend=time(NULL); while(!stopService) { - DBG_DEBUG(NULL, "Next loop"); - AqHomeMqttLog_Loop(aqh, 2000); + time_t now; - if (timeout) { - time_t now; + AQH_EventLoop_Run(eventLoop, 2000); + AQH_MqttLogServer_HandleMqttMsgs(aqh); + AQH_MqttLogServer_HandleBrokerMsgs(aqh); - now=time(NULL); - if ((now-startTime)>timeout) { - DBG_ERROR(NULL, "Timeout, stopping service"); - break; - } - } - if (1){ - time_t now; + now=time(NULL); - now=time(NULL); - if (now-lastPingSendTime>AQHOME_MQTTLOG_PING_INTERVAL) { - rv=AqHomeMqttLog_SendPing(aqh); - if (rv<0) { - DBG_INFO(NULL, "Error sending PING"); - } - lastPingSendTime=time(NULL); - } + if (_diffInSeconds(now, timeLastConnCheck)>CONNCHECK_INTERVAL_IN_SECS) { + DBG_ERROR(NULL, "Check connections"); + AQH_MqttLogServer_CheckBrokerConnection(aqh); + AQH_MqttLogServer_CheckMqttConnection(aqh); + timeLastConnCheck=now; } - if (1){ - time_t now; - - now=time(NULL); - if (now-lastSaveTime>AQHOME_MQTTLOG_SAVE_INTERVAL) { - DBG_ERROR(NULL, "Writing device files"); - rv=AqHomeMqtt_SaveRuntimeDeviceFiles(aqh); - if (rv<0) { - DBG_INFO(NULL, "Error writing runtime data"); - } - lastSaveTime=time(NULL); + if (_diffInSeconds(now, timeLastPingSend)>PING_INTERVAL_IN_SECS) { + rv=AQH_MqttLogServer_SendPing(aqh); + if (rv<0) { + DBG_INFO(NULL, "Error sending PING"); } + timeLastPingSend=time(NULL); } + if (_diffInSeconds(now, timeLastSave)>SAVE_INTERVAL_IN_SECS) { + DBG_ERROR(NULL, "Writing device files"); + rv=AQH_MqttLogServer_SaveRuntimeDeviceFiles(aqh); + if (rv<0) { + DBG_INFO(NULL, "Error writing device file"); + } + timeLastSave=time(NULL); + } + + if (timeout && (_diffInSeconds(now, timeStart)>timeout)) { + DBG_INFO(NULL, "Timeout"); + break; + } } /* while */ - rv=AqHomeMqtt_SaveRuntimeDeviceFiles(aqh); + + rv=AQH_MqttLogServer_SaveRuntimeDeviceFiles(aqh); if (rv<0) { DBG_INFO(NULL, "Error writing runtime data"); } - } @@ -277,3 +267,8 @@ void _signalHandler(int s) +int _diffInSeconds(time_t t1, time_t t0) +{ + return t1-t0; +} + diff --git a/apps/aqhome-mqttlog/s_publish.c b/apps/aqhome-mqttlog/s_publish.c new file mode 100644 index 0000000..cf55aff --- /dev/null +++ b/apps/aqhome-mqttlog/s_publish.c @@ -0,0 +1,436 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./s_publish.h" +#include "./server_p.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static int _handlePublish(AQH_OBJECT *o, const char *rcvdTopic, const char *rcvdValue); +static void _handleNumTopic(AQH_MQTTLOG_SERVER *xo, AQHMQTT_DEVICE *device, + AQHMQTT_TOPIC *topic, const char *rcvdValue); +static void _handleJsonTopic(AQH_MQTTLOG_SERVER *xo, AQHMQTT_DEVICE *device, + AQHMQTT_TOPIC *topic, const char *rcvdValue); +static void _sendMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value, const char *rcvdValue); +static void _announceDeviceToBroker(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device); +static void _sendAnnounceValueMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value); +static AQH_VALUE *_mkMessageValue(const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value); +static int _mqttValueTypeMessageValueType(int t); +static int _registerNewDeviceForTopic(AQH_MQTTLOG_SERVER *xo, const char *rcvdTopic); +static AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir); +static AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir); +static GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic); + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + +void AQH_MqttLogServer_HandlePublishMsg(AQH_OBJECT *o, GWEN_UNUSED AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + if (o && msg) { + AQH_MQTTLOG_SERVER *xo; + + xo=AQH_MqttLogServer_GetServerData(o); + if (xo && xo->registeredDeviceList) { + char *topic; + char *value; + + topic=AQH_MqttMessagePublish_ExtractTopic(msg); + value=AQH_MqttMessagePublish_ExtractValue(msg); + + if (topic && value) { + int rv; + + rv=_handlePublish(o, topic, value); + if (rv!=1) { + DBG_INFO(NULL, "New topic \"%s\", trying to register", topic); + rv=_registerNewDeviceForTopic(xo, topic); + if (rv==1) { + rv=_handlePublish(o, topic, value); + if (rv!=1) { + DBG_ERROR(NULL, "Topic \"%s\" still not handled, SNH!", topic); + } + } + } + } + else { + DBG_ERROR(NULL, "Either topic or value missing in PUBLISH msg"); + } + free(value); + free(topic); + } + } +} + + + +int _handlePublish(AQH_OBJECT *o, const char *rcvdTopic, const char *rcvdValue) +{ + if (o && rcvdTopic && *rcvdTopic) { + AQH_MQTTLOG_SERVER *xo; + + xo=AQH_MqttLogServer_GetServerData(o); + if (xo && xo->registeredDeviceList) { + AQHMQTT_DEVICE *device; + + device=AQHMQTT_Device_List_First(xo->registeredDeviceList); + while(device) { + AQHMQTT_TOPIC_LIST *topicList; + const char *sDeviceName; + const char *sDeviceId; + + sDeviceName=AQHMQTT_Device_GetName(device); + sDeviceId=AQHMQTT_Device_GetId(device); + + topicList=AQHMQTT_Device_GetTopicList(device); + if (topicList) { + AQHMQTT_TOPIC *topic; + + topic=_findTopicMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); +#if 0 + if (topic==NULL) { + topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); + if (topic) + AQHMQTT_Topic_SetTopic(topic, rcvdTopic); + } +#endif + if (topic) { + DBG_INFO(AQH_LOGDOMAIN, + "Handling topic \"%s\" for device type %s (id: %s)", + rcvdTopic, + sDeviceName, sDeviceId?sDeviceId:""); + if (AQHMQTT_Topic_GetTopicType(topic)==AQHMQTT_TopicType_Json) + _handleJsonTopic(xo, device, topic, rcvdValue); + else + _handleNumTopic(xo, device, topic, rcvdValue); + return 1; + } + } + + device=AQHMQTT_Device_List_Next(device); + } + DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic); + } + } + return 0; +} + + + +void _handleNumTopic(AQH_MQTTLOG_SERVER *xo, AQHMQTT_DEVICE *device, + AQHMQTT_TOPIC *topic, const char *rcvdValue) +{ + AQHMQTT_VALUE_LIST *valueList; + + valueList=AQHMQTT_Topic_GetValueList(topic); + if (valueList) + _sendMessage(xo, device, AQHMQTT_Value_List_First(valueList), rcvdValue); + else { + DBG_INFO(NULL, "No value list in device \"%s\"", AQHMQTT_Device_GetId(device)); + } +} + + + +void _handleJsonTopic(AQH_MQTTLOG_SERVER *xo, AQHMQTT_DEVICE *device, + AQHMQTT_TOPIC *topic, const char *rcvdValue) +{ + GWEN_JSON_ELEM *jeRoot; + + jeRoot=GWEN_JsonElement_fromString(rcvdValue); + if (jeRoot==NULL) { + DBG_INFO(NULL, "Could not parse JSON value: %s", rcvdValue); + } + else { + AQHMQTT_VALUE_LIST *valueList; + + valueList=AQHMQTT_Topic_GetValueList(topic); + if (valueList) { + AQHMQTT_VALUE *value; + + value=AQHMQTT_Value_List_First(valueList); + while(value) { + const char *path; + + path=AQHMQTT_Value_GetPath(value); + if (path && *path) { + GWEN_JSON_ELEM *je; + + je=GWEN_JsonElement_GetElementByPath(jeRoot, path, 0); + if (je) { + const char *s; + + s=GWEN_JsonElement_GetData(je); + if (s && *s) + _sendMessage(xo, device, value, s); + } + } + value=AQHMQTT_Value_List_Next(value); + } /* while */ + } + GWEN_JsonElement_free(jeRoot); + } +} + + + +void _sendMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value, const char *rcvdValue) +{ + int rv; + double f; + const char *deviceName; + + deviceName=AQHMQTT_Device_GetId(device); + rv=GWEN_Text_StringToDouble(rcvdValue, &f); + if (rv<0) { + DBG_ERROR(NULL, "Invalid value received from MQTT server (%s)", rcvdValue?rcvdValue:""); + } + else { + AQH_MESSAGE *pubMsg; + uint64_t now; + AQH_VALUE *msgValue; + + now=(uint64_t) time(NULL); + msgValue=_mkMessageValue(device, value); + pubMsg=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, + AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, + msgValue, now, f); + DBG_INFO(AQH_LOGDOMAIN, "BROKER UPDATE_DATA %s/%s: %f", + deviceName?deviceName:"", + AQH_Value_GetName(msgValue), f); + AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg); + AQH_Value_free(msgValue); + } +} + + + +void _announceDeviceToBroker(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device) +{ + AQHMQTT_TOPIC_LIST *topicList; + + topicList=AQHMQTT_Device_GetTopicList(device); + if (topicList) { + AQHMQTT_TOPIC *topic; + + topic=AQHMQTT_Topic_List_First(topicList); + while(topic) { + AQHMQTT_VALUE_LIST *valueList; + + valueList=AQHMQTT_Topic_GetValueList(topic); + if (valueList) { + const AQHMQTT_VALUE *value; + + value=AQHMQTT_Value_List_First(valueList); + while(value) { + _sendAnnounceValueMessage(xo, device, value); + value=AQHMQTT_Value_List_Next(value); + } + } + + topic=AQHMQTT_Topic_List_Next(topic); + } + } +} + + + +void _sendAnnounceValueMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value) +{ + AQH_MESSAGE *pubMsg; + AQH_VALUE *msgValue; + + msgValue=_mkMessageValue(device, value); + pubMsg=AQH_IpcdMessageValues_newForOne(AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE, + AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, + 0, msgValue); + if (pubMsg) { + DBG_INFO(AQH_LOGDOMAIN, "BROKER ANNOUNCE_VALUE %s", AQH_Value_GetName(msgValue)); + AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg); + } + AQH_Value_free(msgValue); +} + + + +AQH_VALUE *_mkMessageValue(const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value) +{ + AQH_VALUE *msgValue; + + msgValue=AQH_Value_new(); + AQH_Value_SetDeviceName(msgValue, AQHMQTT_Device_GetId(device)); + AQH_Value_SetName(msgValue, AQHMQTT_Value_GetName(value)); + AQH_Value_SetValueUnits(msgValue, AQHMQTT_Value_GetValueUnits(value)); + AQH_Value_SetValueType(msgValue, _mqttValueTypeMessageValueType(AQHMQTT_Value_GetValueType(value))); + return msgValue; +} + + + +int _mqttValueTypeMessageValueType(int t) +{ + switch(t){ + case AQHMQTT_ValueType_Sensor: return AQH_ValueType_Sensor; + case AQHMQTT_ValueType_Actor: return AQH_ValueType_Actor; + default: break; + } + DBG_ERROR(AQH_LOGDOMAIN, "Invalid mqtt value type %d", t); + return AQH_ValueType_Sensor; +} + + + +int _registerNewDeviceForTopic(AQH_MQTTLOG_SERVER *xo, const char *rcvdTopic) +{ + if (rcvdTopic && *rcvdTopic) { + if (xo->availableDeviceList) { + AQHMQTT_DEVICE *device; + + device=AQHMQTT_Device_List_First(xo->availableDeviceList); + while(device) { + AQHMQTT_TOPIC_LIST *topicList; + + topicList=AQHMQTT_Device_GetTopicList(device); + if (topicList) { + AQHMQTT_TOPIC *topic; + + topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); + if (topic) { + GWEN_BUFFER *buf; + + buf=_extractDeviceId(topic, rcvdTopic); + if (buf) { + AQHMQTT_DEVICE *newDevice; + + newDevice=AQHMQTT_Device_dup(device); + AQHMQTT_Device_SetId(newDevice, GWEN_Buffer_GetStart(buf)); + topic=_findMaskMatchingTopic(AQHMQTT_Device_GetTopicList(newDevice), rcvdTopic, AQHMQTT_TopicDir_In); + AQHMQTT_Topic_SetTopic(topic, rcvdTopic); + if (xo->registeredDeviceList==NULL) + xo->registeredDeviceList=AQHMQTT_Device_List_new(); + DBG_ERROR(NULL, "Registered device \"%s\" (%s)", AQHMQTT_Device_GetId(newDevice), AQHMQTT_Device_GetName(newDevice)); + AQHMQTT_Device_List_Add(newDevice, xo->registeredDeviceList); + _announceDeviceToBroker(xo, newDevice); + GWEN_Buffer_free(buf); + return 1; + } + + } + } + + device=AQHMQTT_Device_List_Next(device); + } + } + DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic); + } + return 0; + +} + + + +AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir) +{ + if (topicList) { + AQHMQTT_TOPIC *topic; + + topic=AQHMQTT_Topic_List_First(topicList); + while(topic) { + if (AQHMQTT_Topic_GetDirection(topic)==dir) { + const char *sMask; + + sMask=AQHMQTT_Topic_GetMask(topic); + if (sMask && *sMask && GWEN_Text_ComparePattern(rcvdTopic, sMask, 0)!=-1) { + /* found a matching topic */ + return topic; + } + } + + topic=AQHMQTT_Topic_List_Next(topic); + } + } + return NULL; +} + + + +AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir) +{ + if (topicList) { + AQHMQTT_TOPIC *topic; + + topic=AQHMQTT_Topic_List_First(topicList); + while(topic) { + if (AQHMQTT_Topic_GetDirection(topic)==dir) { + const char *sTopic; + + sTopic=AQHMQTT_Topic_GetTopic(topic); + if (sTopic && *sTopic && strcasecmp(rcvdTopic, sTopic)==0) { + return topic; + } + } + topic=AQHMQTT_Topic_List_Next(topic); + } + } + return NULL; +} + + + +GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic) +{ + const char *sBefore; + const char *sAfter; + + sBefore=AQHMQTT_Topic_GetBeforeId(topic); + sAfter=AQHMQTT_Topic_GetAfterId(topic); + + if (sBefore && *sBefore && sAfter && *sAfter) { + GWEN_BUFFER *buf; + int rv; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Buffer_AppendString(buf, rcvdTopic); + rv=GWEN_Buffer_KeepTextBetweenStrings(buf, sBefore, sAfter, 1); + if (rv<0) { + DBG_INFO(NULL, "Could not extract id from [%s] (beforeId=%s, afterId=%s)", rcvdTopic, sBefore, sAfter); + GWEN_Buffer_free(buf); + return NULL; + } + + return buf; + } + + return NULL; +} + + + diff --git a/apps/aqhome-mqttlog/s_publish.h b/apps/aqhome-mqttlog/s_publish.h new file mode 100644 index 0000000..3bbeb7d --- /dev/null +++ b/apps/aqhome-mqttlog/s_publish.h @@ -0,0 +1,25 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOMEMQTT_S_PUBLISH_H +#define AQHOMEMQTT_S_PUBLISH_H + + +#include "./aqhome_mqtt.h" + +#include +#include + + + +void AQH_MqttLogServer_HandlePublishMsg(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); + + +#endif + + diff --git a/apps/aqhome-mqttlog/server.c b/apps/aqhome-mqttlog/server.c new file mode 100644 index 0000000..9c06bab --- /dev/null +++ b/apps/aqhome-mqttlog/server.c @@ -0,0 +1,1127 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./server_p.h" +#include "./s_publish.h" +#include "./xmlread.h" +#include "./xmlwrite.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + +#define I18N(msg) msg +#define I18S(msg) msg + +#define A_ARG GWEN_ARGS_FLAGS_HAS_ARGUMENT +#define A_END (GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST) +#define A_CHAR GWEN_ArgsType_Char +#define A_INT GWEN_ArgsType_Int + + +#define AQH_MQTTLOG_SERVER_BROKER_RESTARTTIME 10 +#define AQH_MQTTLOG_SERVER_MQTT_RESTARTTIME 10 + + +enum { + AQH_MQTTLOG_SERVER_SLOT_BROKERCLOSED=1, + AQH_MQTTLOG_SERVER_SLOT_MQTTCLOSED +}; + + + +/* ------------------------------------------------------------------------------------------------ + * global vars + * ------------------------------------------------------------------------------------------------ + */ + +GWEN_INHERIT(AQH_OBJECT, AQH_MQTTLOG_SERVER) + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); +static void _readConfig(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo, GWEN_DB_NODE *dbArgs); +static const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue); +static int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue); + +static void _setMqttAddress(AQH_OBJECT *o, const char *s); +static void _setMqttPort(AQH_OBJECT *o, int i); +static void _setMqttClientId(AQH_OBJECT *o, const char *s); +static void _setMqttKeepAlive(AQH_OBJECT *o, int i); + +static int _startBroker(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo); +static int _exchangeBrokerConnect(AQH_MQTTLOG_SERVER *xo, uint32_t flags); + +static int _startMqtt(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo); +static int _exchangeMqttConnect(AQH_MQTTLOG_SERVER *xo); +static int _exchangeMqttSubscribe(AQH_MQTTLOG_SERVER *xo); + +static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); +static int _handleBrokerDown(AQH_MQTTLOG_SERVER *xo); +static int _handleMqttDown(AQH_MQTTLOG_SERVER *xo); + +static void _handleMsgFromBroker(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); + +static void _handleMsgFromMqtt(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); +static void _handleMqttMsgPingRsp(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); + +static int _createPidFile(const char *pidFilename); +static int _diffInSeconds(time_t t1, time_t t0); +static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * constructor, destructor + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +AQH_OBJECT *AQH_MqttLogServer_new(AQH_EVENT_LOOP *eventLoop) +{ + AQH_OBJECT *o; + AQH_MQTTLOG_SERVER *xo; + + o=AQH_Object_new(eventLoop); + GWEN_NEW_OBJECT(AQH_MQTTLOG_SERVER, xo); + GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o, xo, _freeData); + + AQH_Object_SetSignalHandlerFn(o, _handleSignal); + xo->timeoutInSeconds=5; + + return o; +} + + + +void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +{ + AQH_MQTTLOG_SERVER *xo; + + xo=(AQH_MQTTLOG_SERVER*) p; + + AQH_Object_free(xo->mqttEndpoint); + AQH_Object_free(xo->brokerEndpoint); + + + + GWEN_FREE_OBJECT(xo); +} + + + +AQH_MQTTLOG_SERVER *AQH_MqttLogServer_GetServerData(const AQH_OBJECT *o) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + return xo; + } + return NULL; +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * getter, setter + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int AQH_MqttLogServer_GetTimeout(const AQH_OBJECT *o) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) + return xo->timeout; + } + return 0; +} + + + +void AQH_MqttLogServer_SetPidFile(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + free(xo->pidFile); + xo->pidFile=s?strdup(s):NULL; + } + } +} + + + +void AQH_MqttLogServer_SetDeviceFile(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + free(xo->deviceFile); + xo->deviceFile=s?strdup(s):NULL; + } + } +} + + + +void _setBrokerAddress(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + free(xo->brokerAddress); + xo->brokerAddress=s?strdup(s):NULL; + } + } +} + + + +void _setBrokerPort(AQH_OBJECT *o, int i) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) + xo->brokerPort=i; + } +} + + + +void _setBrokerClientId(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + free(xo->brokerClientId); + xo->brokerClientId=s?strdup(s):NULL; + } + } +} + + + +void _setMqttAddress(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + free(xo->mqttAddress); + xo->mqttAddress=s?strdup(s):NULL; + } + } +} + + + +void _setMqttPort(AQH_OBJECT *o, int i) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) + xo->mqttPort=i; + } +} + + + +void _setMqttClientId(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + free(xo->mqttClientId); + xo->mqttClientId=s?strdup(s):NULL; + } + } +} + + + +void _setMqttKeepAlive(AQH_OBJECT *o, int i) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) + xo->mqttKeepAlive=i; + } +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * init + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int AQH_MqttLogServer_Init(AQH_OBJECT *o, int argc, char **argv) +{ + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + GWEN_DB_NODE *dbArgs; + int rv; + const char *s; + + dbArgs=GWEN_DB_Group_new("args"); + rv=_readArgs(argc, argv, dbArgs); + if (rv<0) { + DBG_ERROR(NULL, "Error reading args (%d)", rv); + return rv; + } + AQH_MergeConfigFileIntoConfig(dbArgs, "ConfigFile"); + _readConfig(o, xo, dbArgs); + + s=GWEN_DB_GetCharValue(dbArgs, "loglevel", 0, NULL); + if (s && *s) { + GWEN_LOGGER_LEVEL ll; + + ll=GWEN_Logger_Name2Level(s); + GWEN_Logger_SetLevel(NULL, ll); + } + + s=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, AQHOME_MQTT_DEFAULT_PIDFILE); + if (s && *s) { + AQH_MqttLogServer_SetPidFile(o, s); + rv=_createPidFile(s); + if (rv<0) { + DBG_ERROR(NULL, "Error creating PID file (%d)", rv); + return rv; + } + } + + DBG_INFO(NULL, "Starting Broker Connection"); + rv=_startBroker(o, xo); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + + DBG_INFO(NULL, "Starting MQTT Connection"); + rv=_startMqtt(o, xo); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + + AQH_MqttLogServer_LoadRuntimeDeviceFiles(o); + AQH_MqttLogServer_ReloadDeviceFiles(o); + + return 0; + } + else { + DBG_ERROR(NULL, "Not of type AQH_MQTTLOG_SERVER object"); + return GWEN_ERROR_INVALID; + } +} + + + +void _readConfig(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo, GWEN_DB_NODE *dbArgs) +{ + xo->dbArgs=dbArgs; + + xo->timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0); + AQH_MqttLogServer_SetDeviceFile(o, GWEN_DB_GetCharValue(dbArgs, "deviceFile", 0, AQHOME_MQTT_DEFAULT_DEVICEFILE)); + + _setMqttAddress(o, readCharConfigWithAlt(dbArgs, "mqttAddress", "ConfigFile/mqttAddr", "127.0.0.1")); + _setMqttPort(o, readIntConfigWithAlt(dbArgs, "mqttPort", "ConfigFile/mqttPort", 1883, -1)); + _setMqttClientId(o, readCharConfigWithAlt(dbArgs, "mqttClientId", "ConfigFile/mqttClientId", "aqhome-mqttlog")); + _setMqttKeepAlive(o, readIntConfigWithAlt(dbArgs, "mqttKeepAlive", "ConfigFile/mqttKeepAlive", 600, -1)); + + _setBrokerAddress(o, readCharConfigWithAlt(dbArgs, "brokerAddress", "ConfigFile/brokerAddress", "127.0.0.1")); + _setBrokerPort(o, readIntConfigWithAlt(dbArgs, "brokerPort", "ConfigFile/brokerPort", AQHOME_MQTT_DEFAULT_BROKER_PORT, -1)); + _setBrokerClientId(o, GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, AQHOME_MQTT_DEFAULT_BROKER_CLIENTID)); + +} + + + +const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue) +{ + const char *s; + + s=GWEN_DB_GetCharValue(dbArgs, varName, 0, NULL); + if (!(s && *s)) + s=GWEN_DB_GetCharValue(dbArgs, altVarName, 0, NULL); + return (s && *s)?s:defaultValue; +} + + + +int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue) +{ + int i; + + i=GWEN_DB_GetIntValue(dbArgs, varName, 0, nonValue); + if (i==nonValue) + i=GWEN_DB_GetIntValue(dbArgs, altVarName, 0, nonValue); + return (i!=nonValue)?i:defaultValue; +} + + + +int _startBroker(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo) +{ + if (xo->brokerEndpoint) { + AQH_Object_Disable(xo->brokerEndpoint); + AQH_Object_free(xo->brokerEndpoint); + xo->brokerEndpoint=NULL; + } + + if (xo->brokerAddress && *(xo->brokerAddress) && xo->brokerPort) { + AQH_OBJECT *ep; + int fd; + int rv; + + fd=AQH_TcpObject_CreateConnectedSocket(xo->brokerAddress, xo->brokerPort); + if (fd<0) { + DBG_ERROR(NULL, "Error connecting to broker server %s:%d", xo->brokerAddress, xo->brokerPort); + return GWEN_ERROR_IO; + } + DBG_ERROR(NULL, "Physically connected to broker server %s:%d", xo->brokerAddress, xo->brokerPort); + + ep=AQH_IpcClientObject_new(AQH_Object_GetEventLoop(o), fd); + assert(ep); + AQH_Endpoint_SetServiceName(ep, xo->brokerClientId); + AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_MQTTLOG_SERVER_SLOT_BROKERCLOSED, o); + AQH_Object_Enable(ep); + xo->brokerEndpoint=ep; + + rv=_exchangeBrokerConnect(xo, 0); + if (rv!=0) { + DBG_ERROR(NULL, "Error connecting to broker: %d", rv); + return (rv<0)?rv:GWEN_ERROR_PERMISSIONS; + } + DBG_ERROR(NULL, "Connected to broker at %s:%d", xo->brokerAddress, xo->brokerPort); + return 0; + } + else { + DBG_ERROR(NULL, "No server settings"); + return GWEN_ERROR_BAD_DATA; + } + + return 0; +} + + + +int _exchangeBrokerConnect(AQH_MQTTLOG_SERVER *xo, uint32_t flags) +{ + AQH_MESSAGE *msgOut; + uint32_t msgId; + + msgId=AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint); + msgOut=AQH_IpcMessageConnect_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, + AQH_MSGTYPE_IPC_CONNECT_REQ, + msgId, 0, + xo->brokerClientId, NULL, NULL, flags); + AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, msgOut); + return AQH_IpcEndpoint_WaitForResultMsg(xo->brokerEndpoint, + AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_RESULT, + msgId, xo->timeoutInSeconds); +} + + + +int _startMqtt(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo) +{ + if (xo->mqttEndpoint) { + AQH_Object_Disable(xo->mqttEndpoint); + AQH_Object_free(xo->mqttEndpoint); + xo->mqttEndpoint=NULL; + } + + if (xo->mqttAddress && *(xo->mqttAddress) && xo->mqttPort) { + AQH_OBJECT *ep; + int fd; + int rv; + + fd=AQH_TcpObject_CreateConnectedSocket(xo->mqttAddress, xo->mqttPort); + if (fd<0) { + DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d", xo->mqttAddress, xo->mqttPort); + return GWEN_ERROR_IO; + } + DBG_ERROR(NULL, "Physically connected to MQTT server %s:%d", xo->mqttAddress, xo->mqttPort); + + ep=AQH_MqttClientObject_new(AQH_Object_GetEventLoop(o), fd); + assert(ep); + AQH_Endpoint_SetServiceName(ep, xo->mqttClientId); + AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_MQTTLOG_SERVER_SLOT_MQTTCLOSED, o); + AQH_Object_Enable(ep); + xo->mqttEndpoint=ep; + + rv=_exchangeMqttConnect(xo); + if (rv!=0) { + DBG_ERROR(NULL, "MQTT: Error exchanging CONNECT request (%d)", rv); + return (rv<0)?rv:GWEN_ERROR_PERMISSIONS; + } + + rv=_exchangeMqttSubscribe(xo); + if (rv!=0) { + DBG_ERROR(NULL, "MQTT: Error exchanging SUBSCRIBE request (%d)", rv); + return (rv<0)?rv:GWEN_ERROR_PERMISSIONS; + } + + DBG_ERROR(NULL, "Connected to MQTT at %s:%d", xo->mqttAddress, xo->mqttPort); + return 0; + } + else { + DBG_ERROR(NULL, "No MQTT server settings"); + return GWEN_ERROR_BAD_DATA; + } + + return 0; +} + + + +int _exchangeMqttConnect(AQH_MQTTLOG_SERVER *xo) +{ + AQH_MESSAGE *msg; + + msg=AQH_MqttMessageConnect_new("MQTT", 0x04, 0, xo->mqttKeepAlive, xo->mqttClientId, NULL, NULL); + AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msg); + + msg=AQH_MqttEndpoint_WaitForConnAckMsg(xo->mqttEndpoint, xo->timeoutInSeconds); + if (msg) { + int resultCode; + + resultCode=AQH_MqttMessageConnAck_GetResultCode(msg); + AQH_Message_free(msg); + if (resultCode==AQH_MQTTMSG_CONNACK_RESULT_ACCEPTED) { + DBG_INFO(AQH_LOGDOMAIN, "Positive CONNACK response"); + return 0; + } + else { + DBG_ERROR(NULL, "Negative CONNACK response: %d", resultCode); + return GWEN_ERROR_GENERIC; + } + } + else { + DBG_ERROR(NULL, "No CONNACK message received."); + return GWEN_ERROR_GENERIC; + } +} + + + +int _exchangeMqttSubscribe(AQH_MQTTLOG_SERVER *xo) +{ + uint16_t pckId; + AQH_MESSAGE *msg; + + pckId=AQH_Endpoint_GetNextMessageId(xo->mqttEndpoint); + msg=AQH_MqttMessageSubscribe_new(0, pckId, "#", 0); + AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msg); + + msg=AQH_MqttEndpoint_WaitForMsg(xo->mqttEndpoint, AQH_MQTTMSG_MSGTYPE_SUBACK, xo->timeoutInSeconds); + if (msg) { + int resultCode; + + resultCode=AQH_MqttMessageSubAck_GetResultCode(msg); + AQH_Message_free(msg); + if (resultCode!=128) { + DBG_INFO(AQH_LOGDOMAIN, "Positive SUBACK response"); + return 0; + } + else { + DBG_ERROR(NULL, "Negative SUBACK response: %d", resultCode); + return GWEN_ERROR_GENERIC; + } + } + else { + DBG_ERROR(NULL, "No CONNACK message received."); + return GWEN_ERROR_GENERIC; + } +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * fini + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +void AQH_MqttLogServer_Fini(AQH_OBJECT *o) +{ +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * broker management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +void AQH_MqttLogServer_HandleBrokerMsgs(AQH_OBJECT *o) +{ + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo && xo->brokerEndpoint) { + AQH_MESSAGE *msg; + + while( (msg=AQH_Endpoint_GetNextMsgIn(xo->brokerEndpoint)) ) { + AQH_Message_SetObject(msg, xo->brokerEndpoint); + _handleMsgFromBroker(o, xo->brokerEndpoint, msg); + AQH_Message_free(msg); + } + } +} + + + +void _handleMsgFromBroker(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + uint16_t code; + uint8_t protoId; + + /* exec IPC message */ + code=AQH_IpcMessage_GetCode(msg); + protoId=AQH_IpcMessage_GetProtoId(msg); + if (protoId==AQH_IPC_PROTOCOL_DATA_ID) { + DBG_ERROR(NULL, "Received IPC packet %d (%x)", (int) code, code); + switch(code) { +// case AQH_MSGTYPE_IPC_DATA_SETDATA: AQH_MqttLogServer_HandleSetData(o, ep, msg); break; + default: break; + } + } + else { + DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId); + } +} + + + +void AQH_MqttLogServer_CheckBrokerConnection(AQH_OBJECT *o) +{ + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo && xo->dbArgs) { + + if (xo->brokerEndpoint) { + if (AQH_Object_GetFlags(xo->brokerEndpoint) & AQH_OBJECT_FLAGS_DELETE) { + DBG_ERROR(NULL, "Deleting broker connection"); + AQH_Object_Disable(xo->brokerEndpoint); + AQH_Object_free(xo->brokerEndpoint); + xo->brokerEndpoint=NULL; + } + } + + if (xo->brokerEndpoint==NULL) { + time_t now; + + now=time(NULL); + if (_diffInSeconds(now, xo->timestampBrokerDown)>AQH_MQTTLOG_SERVER_BROKER_RESTARTTIME) { + int rv; + + DBG_ERROR(NULL, "Restarting broker connection"); + rv=_startBroker(o, xo); + if (rv<0) { + DBG_ERROR(NULL, "here (%d)", rv); + } + } + } + } +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * MQTT management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + + +void AQH_MqttLogServer_HandleMqttMsgs(AQH_OBJECT *o) +{ + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo && xo->mqttEndpoint) { + AQH_MESSAGE *msg; + + while( (msg=AQH_Endpoint_GetNextMsgIn(xo->mqttEndpoint)) ) { + AQH_Message_SetObject(msg, xo->mqttEndpoint); + _handleMsgFromMqtt(o, xo->mqttEndpoint, msg); + AQH_Message_free(msg); + } + } +} + + + +void _handleMsgFromMqtt(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + uint8_t code; + + /* exec IPC message */ + code=AQH_MqttMessage_GetTypeAndFlags(msg); + switch(code & 0xf0) { + case (AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0): AQH_MqttLogServer_HandlePublishMsg(o, ep, msg); break; + case (AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0): _handleMqttMsgPingRsp(o, ep, msg); break; + default: break; + } +} + + + +void _handleMqttMsgPingRsp(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + DBG_ERROR(NULL, "PING response received"); +} + + + +void AQH_MqttLogServer_CheckMqttConnection(AQH_OBJECT *o) +{ + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + if (xo->mqttEndpoint) { + if (AQH_Object_GetFlags(xo->mqttEndpoint) & AQH_OBJECT_FLAGS_DELETE) { + DBG_ERROR(NULL, "Deleting mqtt connection"); + AQH_Object_Disable(xo->mqttEndpoint); + AQH_Object_free(xo->mqttEndpoint); + xo->mqttEndpoint=NULL; + } + } + + if (xo->mqttEndpoint==NULL) { + time_t now; + + now=time(NULL); + if (_diffInSeconds(now, xo->timestampMqttDown)>AQH_MQTTLOG_SERVER_MQTT_RESTARTTIME) { + int rv; + + DBG_ERROR(NULL, "Restarting MQTT connection"); + rv=_startMqtt(o, xo); + if (rv<0) { + DBG_ERROR(NULL, "here (%d)", rv); + } + } + } + } +} + + + +int AQH_MqttLogServer_SendPing(AQH_OBJECT *o) +{ + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + AQH_MESSAGE *msgOut; + + DBG_ERROR(NULL, "Sending PING"); + msgOut=AQH_MqttMessage_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL); + AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msgOut); + return 0; + } + return GWEN_ERROR_INVALID; +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * device management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +AQHMQTT_DEVICE_LIST *AQH_MqttLogServer_GetAvailableDeviceList(const AQH_OBJECT *o) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) + return xo->availableDeviceList; + } + return NULL; +} + + + +void AQH_MqttLogServer_SetAvailableDeviceList(AQH_OBJECT *o, AQHMQTT_DEVICE_LIST *dl) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + AQHMQTT_Device_List_free(xo->availableDeviceList); + xo->availableDeviceList=dl; + } + } +} + + + +void AQH_MqttLogServer_SetRegisteredDeviceList(AQH_OBJECT *o, AQHMQTT_DEVICE_LIST *dl) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + AQHMQTT_Device_List_free(xo->registeredDeviceList); + xo->registeredDeviceList=dl; + } + } +} + + + +AQHMQTT_DEVICE *AQH_MqttLogServer_FindRegisteredDevice(AQH_OBJECT *o, const char *wantedDeviceId) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo && xo->registeredDeviceList) { + return AQHMQTT_Device_List_GetById(xo->registeredDeviceList, wantedDeviceId); + } + else { + DBG_ERROR(NULL, "No registered devices"); + } + } + + return NULL; +} + + + +void AQH_MqttLogServer_DumpRegisteredDevices(const AQH_OBJECT *o) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo && xo->registeredDeviceList) { + AQHMQTT_DEVICE *device; + + device=AQHMQTT_Device_List_First(xo->registeredDeviceList); + if (device) { + fprintf(stderr, "Registered Devices:\n"); + while(device) { + const char *sDeviceName; + const char *sDeviceId; + + sDeviceName=AQHMQTT_Device_GetName(device); + sDeviceId=AQHMQTT_Device_GetId(device); + fprintf(stderr, " %s (%s)\n", sDeviceId?sDeviceId:"", sDeviceName?sDeviceName:""); + device=AQHMQTT_Device_List_Next(device); + } + } + else { + fprintf(stderr, "No registered devices\n"); + } + } + else { + fprintf(stderr, "No registered devices\n"); + } + } +} + + + +void AQH_MqttLogServer_ReloadDeviceFiles(AQH_OBJECT *o) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + AQHMQTT_DEVICE_LIST *deviceList; + + DBG_ERROR(NULL, "Loading devices description files"); + deviceList=AQH_MqttLogServer_ReadDataDeviceFiles(o); + if (deviceList) + AQH_MqttLogServer_SetAvailableDeviceList(o, deviceList); + } + } +} + + + +void AQH_MqttLogServer_LoadRuntimeDeviceFiles(AQH_OBJECT *o) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + AQHMQTT_DEVICE_LIST *deviceList; + + DBG_ERROR(NULL, "Loading registered devices from file \"%s\"", xo->deviceFile); + deviceList=AQH_MqttLogServer_ReadDeviceFile(o, xo->deviceFile); + if (deviceList) + AQH_MqttLogServer_SetRegisteredDeviceList(o, deviceList); + } + } +} + + + +int AQH_MqttLogServer_SaveRuntimeDeviceFiles(AQH_OBJECT *o) +{ + if (o) { + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + int rv; + + rv=AQH_MqttLogServer_WriteDevicesFile(xo->registeredDeviceList, xo->deviceFile); + if (rv<0) { + DBG_INFO(NULL, "Error writing devices to \"%s\" (%d)", xo->deviceFile, rv); + return rv; + } + } + } + + return 0; +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * signal handler + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, GWEN_UNUSED int param1, void *param2) +{ + AQH_MQTTLOG_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o); + if (xo) { + switch(slotId) { + case AQH_MQTTLOG_SERVER_SLOT_BROKERCLOSED: return _handleBrokerDown(xo); + case AQH_MQTTLOG_SERVER_SLOT_MQTTCLOSED: return _handleMqttDown(xo); + default: + break; + } + } + + return 0; /* not handled */ +} + + + +int _handleBrokerDown(AQH_MQTTLOG_SERVER *xo) +{ + if (xo->brokerEndpoint) { + DBG_ERROR(NULL, "Broker connection down"); + AQH_Object_AddFlags(xo->brokerEndpoint, AQH_OBJECT_FLAGS_DELETE); + xo->timestampBrokerDown=time(NULL); + } + return 1; +} + + + +int _handleMqttDown(AQH_MQTTLOG_SERVER *xo) +{ + if (xo->mqttEndpoint) { + DBG_ERROR(NULL, "MQTT connection down"); + AQH_Object_AddFlags(xo->mqttEndpoint, AQH_OBJECT_FLAGS_DELETE); + xo->timestampMqttDown=time(NULL); + } + return 1; +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * helper functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int _createPidFile(const char *pidFilename) +{ + FILE *f; + int pidfd; + + if (remove(pidFilename)==0) { + DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)"); + } + +#ifdef HAVE_SYS_STAT_H + pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + if (pidfd < 0) { + DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + + f = fdopen(pidfd, "w"); +#else /* HAVE_STAT_H */ + f=fopen(pidFilename,"w+"); +#endif /* HAVE_STAT_H */ + + /* write pid */ +#ifdef HAVE_GETPID + fprintf(f,"%d\n",getpid()); +#else + fprintf(f,"-1\n"); +#endif + if (fclose(f)) { + DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + return 0; +} + + + +int _diffInSeconds(time_t t1, time_t t0) +{ + return t1-t0; +} + + + +int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) +{ + int rv; + const GWEN_ARGS args[]= { + /* flags type name min max s long short_descr, long_descr */ + { A_ARG, A_CHAR, "loglevel", 0, 1, "L", "loglevel", I18S("Specify loglevel"), NULL}, + { A_ARG, A_CHAR, "cfgdir", 0, 1, "D", "cfgdir", I18S("Specify the configuration folder"), NULL}, + { A_ARG, A_CHAR, "charset", 0, 1, NULL, "charset", I18S("Specify the output character set"), NULL}, + { A_ARG, A_CHAR, "mqttAddress", 0, 1, "t", "mqttaddress", I18S("Address of MQTT server"), NULL}, + { A_ARG, A_INT, "mqttPort", 0, 1, "P", "mqttport", I18S("Port of MQTT server (default: 1883)"), NULL}, + { A_ARG, A_CHAR, "mqttClientId", 0, 1, NULL, "mqttclientid", I18S("MQTT client id"), NULL}, + { A_ARG, A_INT, "mqttKeepAlive", 0, 1, "P", "mqttkeepalive", I18S("MQTT keep-alive time"), NULL}, + { A_ARG, A_CHAR, "brokerAddress", 0, 1, "ba", "brokerddress", I18S("Broker address [127.0.0.1]"), NULL}, + { A_ARG, A_INT, "brokerPort", 0, 1, "bp", "brokerport", I18S("Broker port [1899]"), NULL}, + { A_ARG, A_CHAR, "brokerClientId", 0, 1, NULL, "brokerclientid", I18S("Broker client id"), NULL}, + { A_ARG, A_CHAR, "deviceFile", 0, 1, "d", "devicefile", I18S("Device file"), NULL}, + { A_ARG, A_CHAR, "pidfile", 0, 1, "p", "pidfile", I18S("PID file"), NULL}, + { A_ARG, A_INT, "timeout", 0, 1, "T", NULL, I18S("timeout in seconds [0]"), NULL}, + { A_END, A_INT, "help", 0, 0, "h", "help", I18S("Show this help screen"), NULL} + }; + + rv=GWEN_Args_Check(argc, argv, 1, 0, args, dbArgs); + if (rv==GWEN_ARGS_RESULT_ERROR) { + fprintf(stderr, "ERROR: Could not parse arguments main\n"); + return GWEN_ERROR_INVALID; + } + else if (rv==GWEN_ARGS_RESULT_HELP) { + GWEN_BUFFER *ubuf; + + ubuf=GWEN_Buffer_new(0, 1024, 0, 1); + GWEN_Buffer_AppendArgs(ubuf, + I18N("This is version %s.\nUsage: %s [OPTIONS]\n\nOptions:\n"), + AQHOME_VERSION_STRING, + argv[0]); + if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) { + fprintf(stderr, "ERROR: Could not create help string\n"); + return 1; + } + GWEN_Buffer_AppendString(ubuf, "\n"); + + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(ubuf)); + GWEN_Buffer_free(ubuf); + return GWEN_ERROR_CLOSE; + } + return 0; +} + + + diff --git a/apps/aqhome-mqttlog/server.h b/apps/aqhome-mqttlog/server.h new file mode 100644 index 0000000..9b7f35b --- /dev/null +++ b/apps/aqhome-mqttlog/server.h @@ -0,0 +1,75 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef SERVER_H +#define SERVER_H + + + +#include "aqhome-mqttlog/types/device.h" + +#include "aqhome/events2/object.h" + + + +#define AQH_ENDPOINT_PERMS_LISTVALUES 0x0001 +#define AQH_ENDPOINT_PERMS_READVALUE 0x0002 +#define AQH_ENDPOINT_PERMS_ADDVALUE 0x0004 + +#define AQH_ENDPOINT_PERMS_LISTDATA 0x0010 +#define AQH_ENDPOINT_PERMS_READDATA 0x0020 +#define AQH_ENDPOINT_PERMS_ADDDATA 0x0040 +#define AQH_ENDPOINT_PERMS_SETDATA 0x0080 + +#define AQH_ENDPOINT_PERMS_LISTDEVICES 0x0100 +#define AQH_ENDPOINT_PERMS_READDEVICE 0x0200 +#define AQH_ENDPOINT_PERMS_ADDDEVICE 0x0400 +#define AQH_ENDPOINT_PERMS_MODDEVICE 0x0800 + + + +AQH_OBJECT *AQH_MqttLogServer_new(AQH_EVENT_LOOP *eventLoop); +int AQH_MqttLogServer_Init(AQH_OBJECT *o, int argc, char **argv); +void AQH_MqttLogServer_Fini(AQH_OBJECT *o); + +void AQH_MqttLogServer_ReloadDeviceFiles(AQH_OBJECT *o); +void AQH_MqttLogServer_LoadRuntimeDeviceFiles(AQH_OBJECT *o); +int AQH_MqttLogServer_SaveRuntimeDeviceFiles(AQH_OBJECT *o); + + +/* loop functions */ +void AQH_MqttLogServer_HandleBrokerMsgs(AQH_OBJECT *o); +void AQH_MqttLogServer_HandleMqttMsgs(AQH_OBJECT *o); +void AQH_MqttLogServer_CheckBrokerConnection(AQH_OBJECT *o); +void AQH_MqttLogServer_CheckMqttConnection(AQH_OBJECT *o); +int AQH_MqttLogServer_SendPing(AQH_OBJECT *o); + + +/* getters and setters */ +int AQH_MqttLogServer_GetTimeout(const AQH_OBJECT *o); +void AQH_MqttLogServer_SetPidFile(AQH_OBJECT *o, const char *s); +void AQH_MqttLogServer_SetDeviceFile(AQH_OBJECT *o, const char *s); + + +/* device management */ +AQHMQTT_DEVICE_LIST *AQH_MqttLogServer_GetAvailableDeviceList(const AQH_OBJECT *o); +void AQH_MqttLogServer_SetAvailableDeviceList(AQH_OBJECT *o, AQHMQTT_DEVICE_LIST *dl); +void AQH_MqttLogServer_SetRegisteredDeviceList(AQH_OBJECT *o, AQHMQTT_DEVICE_LIST *dl); +AQHMQTT_DEVICE *AQH_MqttLogServer_FindRegisteredDevice(AQH_OBJECT *o, const char *wantedDeviceId); +void AQH_MqttLogServer_DumpRegisteredDevices(const AQH_OBJECT *o); + + + + + + + + + + +#endif diff --git a/apps/aqhome-mqttlog/server_p.h b/apps/aqhome-mqttlog/server_p.h new file mode 100644 index 0000000..34f6ad3 --- /dev/null +++ b/apps/aqhome-mqttlog/server_p.h @@ -0,0 +1,68 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef SERVER_P_H +#define SERVER_P_H + + +#include "./server.h" +#include "aqhome-nodes/types/device.h" + +#include "aqhome/nodes/nodedb.h" +#include "aqhome/ipc2/msgrequest.h" + +#include + + +/* default values */ +#define AQHOME_MQTT_DEFAULT_PIDFILE "/var/run/aqhome-mqtt.pid" +#define AQHOME_MQTT_DEFAULT_DATADIR "/var/lib/aqhome-mqtt" +#define AQHOME_MQTT_DEFAULT_DEVICEFILE "mqttlog/registereddevices.xml" + +#define AQHOME_MQTT_DEFAULT_BROKER_PORT 1899 +#define AQHOME_MQTT_DEFAULT_BROKER_CLIENTID "mqtt" + + + +typedef struct AQH_MQTTLOG_SERVER AQH_MQTTLOG_SERVER; +struct AQH_MQTTLOG_SERVER { + AQH_OBJECT *mqttEndpoint; + AQH_OBJECT *brokerEndpoint; + + GWEN_DB_NODE *dbArgs; + + AQHMQTT_DEVICE_LIST *availableDeviceList; + AQHMQTT_DEVICE_LIST *registeredDeviceList; + + char *deviceFile; + char *pidFile; + + int timeout; /* timeout for run e.g. inside valgrind */ + + char *mqttAddress; + int mqttPort; + char *mqttClientId; + int mqttKeepAlive; + + char *brokerAddress; + int brokerPort; + char *brokerClientId; + + time_t timestampMqttDown; + time_t timestampBrokerDown; + + int timeoutInSeconds; + +}; + + +AQH_MQTTLOG_SERVER *AQH_MqttLogServer_GetServerData(const AQH_OBJECT *o); + + +#endif + diff --git a/apps/aqhome-mqttlog/xmlread.c b/apps/aqhome-mqttlog/xmlread.c index e484c7c..39db5d6 100644 --- a/apps/aqhome-mqttlog/xmlread.c +++ b/apps/aqhome-mqttlog/xmlread.c @@ -1,6 +1,6 @@ /**************************************************************************** * This file is part of the project AqHome. - * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. @@ -11,7 +11,7 @@ #endif #include "./xmlread.h" -#include "./aqhome_mqtt_p.h" +#include "./server_p.h" #include "aqhome-mqttlog/types/topic.h" #include "aqhome-mqttlog/types/value.h" #include "aqhome-mqttlog/types/translation.h" @@ -41,16 +41,16 @@ * ------------------------------------------------------------------------------------------------ */ -static AQHMQTT_DEVICE_LIST *_readDeviceFiles(AQHOME_MQTT *aqh, const GWEN_STRINGLIST *sl); -static int _readDeviceFileToList(AQHOME_MQTT *aqh, const char *sFilename, AQHMQTT_DEVICE_LIST *deviceList); -static int _readXmlDevices(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceListNode, AQHMQTT_DEVICE_LIST *deviceList); -static AQHMQTT_DEVICE *_readXmlDevice(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceNode); -static AQHMQTT_TOPIC_LIST *_readXmlTopicList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode); -static AQHMQTT_TOPIC *_readXmlTopic(AQHOME_MQTT *aqh, GWEN_XMLNODE *topicNode); -static AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode); -static AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode); -static AQHMQTT_TRANSLATION_LIST *_readXmlTranslationList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode); -static AQHMQTT_TRANSLATION *_readXmlTranslation(AQHOME_MQTT *aqh, GWEN_XMLNODE *translationNode); +static AQHMQTT_DEVICE_LIST *_readDeviceFiles(const GWEN_STRINGLIST *sl); +static int _readDeviceFileToList(const char *sFilename, AQHMQTT_DEVICE_LIST *deviceList); +static int _readXmlDevices(GWEN_XMLNODE *deviceListNode, AQHMQTT_DEVICE_LIST *deviceList); +static AQHMQTT_DEVICE *_readXmlDevice(GWEN_XMLNODE *deviceNode); +static AQHMQTT_TOPIC_LIST *_readXmlTopicList(GWEN_XMLNODE *parentNode); +static AQHMQTT_TOPIC *_readXmlTopic(GWEN_XMLNODE *topicNode); +static AQHMQTT_VALUE_LIST *_readXmlValueList(GWEN_XMLNODE *parentNode); +static AQHMQTT_VALUE *_readXmlValue(GWEN_XMLNODE *valueNode); +static AQHMQTT_TRANSLATION_LIST *_readXmlTranslationList(GWEN_XMLNODE *parentNode); +static AQHMQTT_TRANSLATION *_readXmlTranslation(GWEN_XMLNODE *translationNode); @@ -60,61 +60,76 @@ static AQHMQTT_TRANSLATION *_readXmlTranslation(AQHOME_MQTT *aqh, GWEN_XMLNODE * * ------------------------------------------------------------------------------------------------ */ -AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadDeviceFile(AQHOME_MQTT *aqh, const char *sFilename) +AQHMQTT_DEVICE_LIST *AQH_MqttLogServer_ReadDeviceFile(AQH_OBJECT *o, const char *sFilename) { - int rv; + if (o) { + AQH_MQTTLOG_SERVER *xo; - rv=GWEN_Directory_GetPath(sFilename, GWEN_PATH_FLAGS_CHECKROOT | GWEN_PATH_FLAGS_PATHMUSTEXIST | GWEN_PATH_FLAGS_VARIABLE); - if (rv<0) { - DBG_ERROR(NULL, "File \"%s\" does not exists, writing later", sFilename); - return NULL; - } - else { - AQHMQTT_DEVICE_LIST *deviceList; + xo=AQH_MqttLogServer_GetServerData(o); + if (xo) { + int rv; - deviceList=AQHMQTT_Device_List_new(); - rv=_readDeviceFileToList(aqh, sFilename, deviceList); - if (rv<0) { - DBG_ERROR(NULL, "File \"%s\" not found", sFilename); - AQHMQTT_Device_List_free(deviceList); - return NULL; + rv=GWEN_Directory_GetPath(sFilename, GWEN_PATH_FLAGS_CHECKROOT | GWEN_PATH_FLAGS_PATHMUSTEXIST | GWEN_PATH_FLAGS_VARIABLE); + if (rv<0) { + DBG_ERROR(NULL, "File \"%s\" does not exists, writing later", sFilename); + return NULL; + } + else { + AQHMQTT_DEVICE_LIST *deviceList; + + deviceList=AQHMQTT_Device_List_new(); + rv=_readDeviceFileToList(sFilename, deviceList); + if (rv<0) { + DBG_ERROR(NULL, "File \"%s\" not found", sFilename); + AQHMQTT_Device_List_free(deviceList); + return NULL; + } + + if (AQHMQTT_Device_List_GetCount(deviceList)<1) { + AQHMQTT_Device_List_free(deviceList); + return NULL; + } + return deviceList; + } } - - if (AQHMQTT_Device_List_GetCount(deviceList)<1) { - AQHMQTT_Device_List_free(deviceList); - return NULL; - } - return deviceList; } + return NULL; } -AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadDataDeviceFiles(AQHOME_MQTT *aqh) +AQHMQTT_DEVICE_LIST *AQH_MqttLogServer_ReadDataDeviceFiles(AQH_OBJECT *o) { - GWEN_STRINGLIST *sl; + if (o) { + AQH_MQTTLOG_SERVER *xo; - sl=AQH_GetListOfMatchingDataFiles("aqhome/devices/mqtt", "*.xml"); - if (sl) { - AQHMQTT_DEVICE_LIST *deviceList; + xo=AQH_MqttLogServer_GetServerData(o); + if (xo) { + GWEN_STRINGLIST *sl; - deviceList=_readDeviceFiles(aqh, sl); - GWEN_StringList_free(sl); - if (deviceList==NULL) { - DBG_INFO(NULL, "Error reading data device files"); - return NULL; + sl=AQH_GetListOfMatchingDataFiles("aqhome/devices/mqtt", "*.xml"); + if (sl) { + AQHMQTT_DEVICE_LIST *deviceList; + + deviceList=_readDeviceFiles(sl); + GWEN_StringList_free(sl); + if (deviceList==NULL) { + DBG_INFO(NULL, "Error reading data device files"); + return NULL; + } + return deviceList; + } + else { + DBG_ERROR(NULL, "No data device files"); + } } - return deviceList; - } - else { - DBG_ERROR(NULL, "No data device files"); - return NULL; } + return NULL; } -AQHMQTT_DEVICE_LIST *_readDeviceFiles(AQHOME_MQTT *aqh, const GWEN_STRINGLIST *sl) +AQHMQTT_DEVICE_LIST *_readDeviceFiles(const GWEN_STRINGLIST *sl) { GWEN_STRINGLISTENTRY *se; AQHMQTT_DEVICE_LIST *deviceList; @@ -129,7 +144,7 @@ AQHMQTT_DEVICE_LIST *_readDeviceFiles(AQHOME_MQTT *aqh, const GWEN_STRINGLIST *s int rv; DBG_INFO(NULL, "Reading device file \"%s\"", s); - rv=_readDeviceFileToList(aqh, s, deviceList); + rv=_readDeviceFileToList(s, deviceList); if (rv<0 && rv!=GWEN_ERROR_NO_DATA) { DBG_WARN(NULL, "Error reading device file \"%s\" (%d), ignoring", s, rv); } @@ -147,7 +162,7 @@ AQHMQTT_DEVICE_LIST *_readDeviceFiles(AQHOME_MQTT *aqh, const GWEN_STRINGLIST *s -int _readDeviceFileToList(AQHOME_MQTT *aqh, const char *sFilename, AQHMQTT_DEVICE_LIST *deviceList) +int _readDeviceFileToList(const char *sFilename, AQHMQTT_DEVICE_LIST *deviceList) { GWEN_XMLNODE *rootNode; GWEN_XMLNODE *deviceListNode; @@ -165,7 +180,7 @@ int _readDeviceFileToList(AQHOME_MQTT *aqh, const char *sFilename, AQHMQTT_DEVIC if (deviceListNode==NULL) deviceListNode=rootNode; - rv=_readXmlDevices(aqh, deviceListNode, deviceList); + rv=_readXmlDevices(deviceListNode, deviceList); if (rv<0 && rv!=GWEN_ERROR_NO_DATA) { DBG_ERROR(AQH_LOGDOMAIN, "Error reading devices from file \"%s\" (%d)", sFilename, rv); GWEN_XMLNode_free(rootNode); @@ -179,7 +194,7 @@ int _readDeviceFileToList(AQHOME_MQTT *aqh, const char *sFilename, AQHMQTT_DEVIC -int _readXmlDevices(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceListNode, AQHMQTT_DEVICE_LIST *deviceList) +int _readXmlDevices(GWEN_XMLNODE *deviceListNode, AQHMQTT_DEVICE_LIST *deviceList) { GWEN_XMLNODE *deviceNode; @@ -192,7 +207,7 @@ int _readXmlDevices(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceListNode, AQHMQTT_DEVI if (driverName && *driverName && strcasecmp(driverName, "mqtt")==0) { AQHMQTT_DEVICE *device; - device=_readXmlDevice(aqh, deviceNode); + device=_readXmlDevice(deviceNode); if (device==NULL) { DBG_INFO(NULL, "Error reading device from XML"); return GWEN_ERROR_BAD_DATA; @@ -227,7 +242,7 @@ int _readXmlDevices(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceListNode, AQHMQTT_DEVI -AQHMQTT_DEVICE *_readXmlDevice(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceNode) +AQHMQTT_DEVICE *_readXmlDevice(GWEN_XMLNODE *deviceNode) { AQHMQTT_DEVICE *device; GWEN_XMLNODE *topicsNode; @@ -241,7 +256,7 @@ AQHMQTT_DEVICE *_readXmlDevice(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceNode) if (topicsNode) { AQHMQTT_TOPIC_LIST *topicList; - topicList=_readXmlTopicList(aqh, topicsNode); + topicList=_readXmlTopicList(topicsNode); if (topicList) AQHMQTT_Device_SetTopicList(device, topicList); else { @@ -261,7 +276,7 @@ AQHMQTT_DEVICE *_readXmlDevice(AQHOME_MQTT *aqh, GWEN_XMLNODE *deviceNode) -AQHMQTT_TOPIC_LIST *_readXmlTopicList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode) +AQHMQTT_TOPIC_LIST *_readXmlTopicList(GWEN_XMLNODE *parentNode) { AQHMQTT_TOPIC_LIST *topicList; GWEN_XMLNODE *topicNode; @@ -269,7 +284,7 @@ AQHMQTT_TOPIC_LIST *_readXmlTopicList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode topicList=AQHMQTT_Topic_List_new(); topicNode=GWEN_XMLNode_FindFirstTag(parentNode, "mqtttopic", NULL, NULL); while(topicNode) { - AQHMQTT_TOPIC *topic=_readXmlTopic(aqh, topicNode); + AQHMQTT_TOPIC *topic=_readXmlTopic(topicNode); if (topic) AQHMQTT_Topic_List_Add(topic, topicList); else { @@ -289,7 +304,7 @@ AQHMQTT_TOPIC_LIST *_readXmlTopicList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode -AQHMQTT_TOPIC *_readXmlTopic(AQHOME_MQTT *aqh, GWEN_XMLNODE *topicNode) +AQHMQTT_TOPIC *_readXmlTopic(GWEN_XMLNODE *topicNode) { AQHMQTT_TOPIC *topic; GWEN_XMLNODE *valuesNode; @@ -325,7 +340,7 @@ AQHMQTT_TOPIC *_readXmlTopic(AQHOME_MQTT *aqh, GWEN_XMLNODE *topicNode) if (valuesNode) { AQHMQTT_VALUE_LIST *valueList; - valueList=_readXmlValueList(aqh, valuesNode); + valueList=_readXmlValueList(valuesNode); if (valueList) AQHMQTT_Topic_SetValueList(topic, valueList); else { @@ -345,7 +360,7 @@ AQHMQTT_TOPIC *_readXmlTopic(AQHOME_MQTT *aqh, GWEN_XMLNODE *topicNode) -AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode) +AQHMQTT_VALUE_LIST *_readXmlValueList(GWEN_XMLNODE *parentNode) { AQHMQTT_VALUE_LIST *valueList; GWEN_XMLNODE *valueNode; @@ -353,7 +368,7 @@ AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode valueList=AQHMQTT_Value_List_new(); valueNode=GWEN_XMLNode_FindFirstTag(parentNode, "value", NULL, NULL); while(valueNode) { - AQHMQTT_VALUE *value=_readXmlValue(aqh, valueNode); + AQHMQTT_VALUE *value=_readXmlValue(valueNode); if (value) AQHMQTT_Value_List_Add(value, valueList); else { @@ -373,7 +388,7 @@ AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode -AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode) +AQHMQTT_VALUE *_readXmlValue(GWEN_XMLNODE *valueNode) { AQHMQTT_VALUE *value; GWEN_XMLNODE *translationNode; @@ -398,7 +413,7 @@ AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode) if (translationNode) { AQHMQTT_TRANSLATION_LIST *translationList; - translationList=_readXmlTranslationList(aqh, translationNode); + translationList=_readXmlTranslationList(translationNode); if (translationList) { DBG_INFO(NULL, "Translations read"); AQHMQTT_Value_SetTranslationList(value, translationList); @@ -413,7 +428,7 @@ AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode) -AQHMQTT_TRANSLATION_LIST *_readXmlTranslationList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode) +AQHMQTT_TRANSLATION_LIST *_readXmlTranslationList(GWEN_XMLNODE *parentNode) { AQHMQTT_TRANSLATION_LIST *translationList; GWEN_XMLNODE *translationNode; @@ -421,7 +436,7 @@ AQHMQTT_TRANSLATION_LIST *_readXmlTranslationList(AQHOME_MQTT *aqh, GWEN_XMLNODE translationList=AQHMQTT_Translation_List_new(); translationNode=GWEN_XMLNode_FindFirstTag(parentNode, "translation", NULL, NULL); while(translationNode) { - AQHMQTT_TRANSLATION *translation=_readXmlTranslation(aqh, translationNode); + AQHMQTT_TRANSLATION *translation=_readXmlTranslation(translationNode); if (translation) AQHMQTT_Translation_List_Add(translation, translationList); else { @@ -441,7 +456,7 @@ AQHMQTT_TRANSLATION_LIST *_readXmlTranslationList(AQHOME_MQTT *aqh, GWEN_XMLNODE -AQHMQTT_TRANSLATION *_readXmlTranslation(AQHOME_MQTT *aqh, GWEN_XMLNODE *translationNode) +AQHMQTT_TRANSLATION *_readXmlTranslation(GWEN_XMLNODE *translationNode) { const char *sAqhValue; const char *sDriverValue; diff --git a/apps/aqhome-mqttlog/xmlread.h b/apps/aqhome-mqttlog/xmlread.h index e3a1772..1b4f964 100644 --- a/apps/aqhome-mqttlog/xmlread.h +++ b/apps/aqhome-mqttlog/xmlread.h @@ -1,6 +1,6 @@ /**************************************************************************** * This file is part of the project AqHome. - * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. @@ -9,14 +9,14 @@ #ifndef AQHOME_MQTTLOG_XMLREAD_H #define AQHOME_MQTTLOG_XMLREAD_H - +#include "aqhome-mqttlog/server.h" #include "aqhome-mqttlog/aqhome_mqtt.h" #include "aqhome-mqttlog/types/device.h" -AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadDataDeviceFiles(AQHOME_MQTT *aqh); -AQHMQTT_DEVICE_LIST *AqHomeMqttLog_ReadDeviceFile(AQHOME_MQTT *aqh, const char *sFilename); +AQHMQTT_DEVICE_LIST *AQH_MqttLogServer_ReadDataDeviceFiles(AQH_OBJECT *o); +AQHMQTT_DEVICE_LIST *AQH_MqttLogServer_ReadDeviceFile(AQH_OBJECT *o, const char *sFilename); diff --git a/apps/aqhome-mqttlog/xmlwrite.c b/apps/aqhome-mqttlog/xmlwrite.c index aee0968..5e9ae67 100644 --- a/apps/aqhome-mqttlog/xmlwrite.c +++ b/apps/aqhome-mqttlog/xmlwrite.c @@ -1,6 +1,6 @@ /**************************************************************************** * This file is part of the project AqHome. - * AqHome (c) by 2024 Martin Preuss, all rights reserved. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. @@ -54,7 +54,7 @@ static void _setXmlCharValueIfNotNull(GWEN_XMLNODE *n, const char *name, const c -int AqHomeMqttLog_WriteDevicesFile(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE_LIST *deviceList, const char *sFilename) +int AQH_MqttLogServer_WriteDevicesFile(const AQHMQTT_DEVICE_LIST *deviceList, const char *sFilename) { int rv; diff --git a/apps/aqhome-mqttlog/xmlwrite.h b/apps/aqhome-mqttlog/xmlwrite.h index f814ba1..e756fa6 100644 --- a/apps/aqhome-mqttlog/xmlwrite.h +++ b/apps/aqhome-mqttlog/xmlwrite.h @@ -1,6 +1,6 @@ /**************************************************************************** * This file is part of the project AqHome. - * AqHome (c) by 2024 Martin Preuss, all rights reserved. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. @@ -9,13 +9,13 @@ #ifndef AQHOME_MQTTLOG_XMLWRITE_H #define AQHOME_MQTTLOG_XMLWRITE_H - +#include "aqhome-mqttlog/server.h" #include "aqhome-mqttlog/aqhome_mqtt.h" #include "aqhome-mqttlog/types/device.h" -int AqHomeMqttLog_WriteDevicesFile(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE_LIST *deviceList, const char *sFilename); +int AQH_MqttLogServer_WriteDevicesFile(const AQHMQTT_DEVICE_LIST *deviceList, const char *sFilename); diff --git a/apps/aqhome-nodes/main.c b/apps/aqhome-nodes/main.c index 9ee2595..ea559b4 100644 --- a/apps/aqhome-nodes/main.c +++ b/apps/aqhome-nodes/main.c @@ -107,6 +107,8 @@ int main(int argc, char **argv) aqh=AQH_NodeServer_new(eventLoop); rv=AQH_NodeServer_Init(aqh, argc, argv); if (rv<0) { + if (rv==GWEN_ERROR_CLOSE) + return 1; DBG_INFO(NULL, "here (%d)", rv); return 2; } diff --git a/apps/aqhome-nodes/server.c b/apps/aqhome-nodes/server.c index e780c69..d63c190 100644 --- a/apps/aqhome-nodes/server.c +++ b/apps/aqhome-nodes/server.c @@ -592,13 +592,13 @@ int _exchangeConnect(AQH_OBJECT *o, AQH_NODE_SERVER *xo, uint32_t flags) AQH_MESSAGE *msgOut; uint32_t msgId; - msgId=AQH_Endpoint_GetNextMessageId(xo->ipcEndpoint); + msgId=AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint); msgOut=AQH_IpcMessageConnect_new(xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_CONNECT_REQ, msgId, 0, xo->brokerClientId, NULL, NULL, flags); - AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); - return AQH_IpcEndpoint_WaitForResultMsg(xo->ipcEndpoint, + AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, msgOut); + return AQH_IpcEndpoint_WaitForResultMsg(xo->brokerEndpoint, xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_RESULT, msgId, xo->timeoutInSeconds); } @@ -1206,7 +1206,7 @@ int _handleTtyDown(AQH_NODE_SERVER *xo) { if (xo->ttyEndpoint) { DBG_ERROR(NULL, "TTY closed"); - AQH_Object_AddFlags(xo->brokerEndpoint, AQH_OBJECT_FLAGS_DELETE); + AQH_Object_AddFlags(xo->ttyEndpoint, AQH_OBJECT_FLAGS_DELETE); xo->timestampTtyDown=time(NULL); } return 1; diff --git a/aqhome-mqttlog.sh b/aqhome-mqttlog.sh index 7219f2f..86a4189 100755 --- a/aqhome-mqttlog.sh +++ b/aqhome-mqttlog.sh @@ -4,5 +4,5 @@ export AQHOME_LOGLEVEL=info export LD_LIBRARY_PATH="0-build/aqhome/:$LD_LIBRARY_PATH" # 0-build/apps/aqhome-mqttlog/aqhome-mqttlog -ma 192.168.117.192 -mp 1883 -W /tmp/aqhome/mqttlog -i apps/aqhome-mqttlog/mqttlog.conf --mqttclientid=AQHOMEMQTTLOGTEST $* -0-build/apps/aqhome-mqttlog/aqhome-mqttlog --mqttclientid=AQHOMEMQTTLOGTEST -p ./aqhome-mqtt.pid -d ./aqhome-mqtt.devices "$@" +0-build/apps/aqhome-mqttlog/aqhome-mqttlog --mqttclientid=MQTTLOGTEST2 -p ./aqhome-mqtt.pid -d ./aqhome-mqtt.devices "$@" diff --git a/aqhome/ipc2/0BUILD b/aqhome/ipc2/0BUILD index 71f75f1..863cf1e 100644 --- a/aqhome/ipc2/0BUILD +++ b/aqhome/ipc2/0BUILD @@ -49,6 +49,7 @@ msgwriter.h ipcmsgreader.h nodemsgreader.h + mqttmsgreader.h ttyobject.h tcpd_object.h tcp_object.h @@ -58,6 +59,8 @@ ipc_server.h ipc_client.h tty_endpoint.h + mqtt_endpoint.h + mqtt_client.h @@ -79,6 +82,7 @@ msgwriter.c ipcmsgreader.c nodemsgreader.c + mqttmsgreader.c ttyobject.c tcpd_object.c tcp_object.c @@ -89,6 +93,8 @@ ipc_client.c ipc_endpoint.c tty_endpoint.c + mqtt_endpoint.c + mqtt_client.c diff --git a/aqhome/ipc2/endpoint.c b/aqhome/ipc2/endpoint.c index cef7b6b..1891adc 100644 --- a/aqhome/ipc2/endpoint.c +++ b/aqhome/ipc2/endpoint.c @@ -395,11 +395,17 @@ void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg) xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); if (xo) { - AQH_Message_List_Add(msg, xo->msgOutList); - if (xo->msgWriter && AQH_Message_List_GetCount(xo->msgOutList)==1) { - DBG_INFO(AQH_LOGDOMAIN, "Enabling msgWriter, sending message"); - AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg)); - AQH_Object_Enable(xo->msgWriter); + if (AQH_Message_GetUsedSize(msg)<1) { + DBG_ERROR(AQH_LOGDOMAIN, "Empty message, not sending"); + AQH_Message_free(msg); + } + else { + AQH_Message_List_Add(msg, xo->msgOutList); + if (xo->msgWriter && AQH_Message_List_GetCount(xo->msgOutList)==1) { + DBG_INFO(AQH_LOGDOMAIN, "Enabling msgWriter, sending message"); + AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg)); + AQH_Object_Enable(xo->msgWriter); + } } } } diff --git a/aqhome/ipc2/mqtt_client.c b/aqhome/ipc2/mqtt_client.c new file mode 100644 index 0000000..f51cc31 --- /dev/null +++ b/aqhome/ipc2/mqtt_client.c @@ -0,0 +1,53 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./mqtt_client.h" + +#include +#include +#include + +#include + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + +AQH_OBJECT *AQH_MqttClientObject_new(AQH_EVENT_LOOP *eventLoop, int fd) +{ + int fdCopy; + AQH_OBJECT *fdReader; + AQH_OBJECT *fdWriter; + AQH_OBJECT *msgReader; + AQH_OBJECT *msgWriter; + AQH_OBJECT *endpoint; + + fdCopy=dup(fd); + + fdReader=AQH_FdObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ); + msgReader=AQH_MqttMsgReader_new(eventLoop, fdReader); + AQH_Object_Enable(msgReader); + + fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE); + msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter); + + endpoint=AQH_Endpoint_new(eventLoop, msgReader, msgWriter); + return endpoint; +} + + + diff --git a/aqhome/ipc2/mqtt_client.h b/aqhome/ipc2/mqtt_client.h new file mode 100644 index 0000000..36d64bb --- /dev/null +++ b/aqhome/ipc2/mqtt_client.h @@ -0,0 +1,27 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MQTT_CLIENT_H +#define AQH_MQTT_CLIENT_H + + +#include + + + +/** + * + * @param eventLoop pointer to eventLoop + * @param fd connected non-blocking socket to work with (see @ref AQH_TcpObject_CreateConnectedSocket). + */ +AQHOME_API AQH_OBJECT *AQH_MqttClientObject_new(AQH_EVENT_LOOP *eventLoop, int fd); + + + +#endif + diff --git a/aqhome/ipc2/mqtt_endpoint.c b/aqhome/ipc2/mqtt_endpoint.c new file mode 100644 index 0000000..35e01b2 --- /dev/null +++ b/aqhome/ipc2/mqtt_endpoint.c @@ -0,0 +1,74 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./mqtt_endpoint.h" +#include "aqhome/msg/mqtt/m_mqtt.h" + +#include + +#include + + + +AQH_MESSAGE *AQH_MqttEndpoint_WaitForConnAckMsg(AQH_OBJECT *mqttEndpoint, int timeoutInSeconds) +{ + return AQH_MqttEndpoint_WaitForMsg(mqttEndpoint, AQH_MQTTMSG_MSGTYPE_CONNACK, timeoutInSeconds); +} + + + +AQH_MESSAGE *AQH_MqttEndpoint_WaitForMsg(AQH_OBJECT *mqttEndpoint, uint8_t t, int timeoutInSeconds) +{ + time_t startTime; + + startTime=time(NULL); + t&=0xf0; + + for (;;) { + AQH_MESSAGE_LIST *msgList; + time_t now; + + msgList=AQH_Endpoint_GetMsgInList(mqttEndpoint); + if (msgList) { + AQH_MESSAGE *msg; + + msg=AQH_Message_List_First(msgList); + while(msg) { + uint8_t msgTypeAndFlags; + + msgTypeAndFlags=AQH_MqttMessage_GetTypeAndFlags(msg); + if ((msgTypeAndFlags & 0xf0) == t) { + AQH_Message_List_Del(msg); + return msg; + } + msg=AQH_Message_List_Next(msg); + } + } + + now=time(NULL); + if (now-startTime>timeoutInSeconds) { + DBG_INFO(NULL, "Timeout"); + break; + } + + AQH_EventLoop_Run(AQH_Object_GetEventLoop(mqttEndpoint), 500); + } /* for */ + + return NULL; +} + + + + + + + diff --git a/aqhome/ipc2/mqtt_endpoint.h b/aqhome/ipc2/mqtt_endpoint.h new file mode 100644 index 0000000..9fa6d52 --- /dev/null +++ b/aqhome/ipc2/mqtt_endpoint.h @@ -0,0 +1,23 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MQTT_ENDPOINT_H +#define AQH_MQTT_ENDPOINT_H + + +#include + + + +AQHOME_API AQH_MESSAGE *AQH_MqttEndpoint_WaitForConnAckMsg(AQH_OBJECT *mqttEndpoint, int timeoutInSeconds); +AQHOME_API AQH_MESSAGE *AQH_MqttEndpoint_WaitForMsg(AQH_OBJECT *mqttEndpoint, uint8_t t, int timeoutInSeconds); + + + +#endif + diff --git a/aqhome/ipc2/mqttmsgreader.c b/aqhome/ipc2/mqttmsgreader.c new file mode 100644 index 0000000..7b30dc6 --- /dev/null +++ b/aqhome/ipc2/mqttmsgreader.c @@ -0,0 +1,182 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./mqttmsgreader.h" +#include "./msgreader_p.h" +#include + +#include +#include +#include +#include + +#include + + +#define AQH_MSG_READER_HEADER_SIZE 2 +#define AQH_MSG_READER_MINMSGSIZE 12 +#define AQH_MSG_READER_MAXMSGSIZE 10240 + +#define AQH_MSG_READER_FLAGS_READBODY 0x0001 + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static int _readMsg(AQH_OBJECT *o); +static int _readHeaderFromRingbuffer(AQH_MSG_READER *xo); + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ + +AQH_OBJECT *AQH_MqttMsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject) +{ + AQH_OBJECT *o; + + o=AQH_MsgReader_new(eventLoop, fdObject); + AQH_MsgReader_SetReadMsgFn(o, _readMsg); + + return o; +} + + + +int _readMsg(AQH_OBJECT *o) +{ + AQH_MSG_READER *xo; + + xo=AQH_MsgReader_GetData(o); + if (xo) { + int rv; + + if (!(xo->flags & AQH_MSG_READER_FLAGS_READBODY)) { + DBG_INFO(AQH_LOGDOMAIN, "Reading header"); + rv=_readHeaderFromRingbuffer(xo); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + } + + if (xo->flags & AQH_MSG_READER_FLAGS_READBODY) { + DBG_INFO(AQH_LOGDOMAIN, "Reading body"); + /* reading remainder of msg directly into allocated buffer */ + rv=AQH_MsgReader_ReadRemainderFromRingbuffer(o); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + else if (rv==1) { + int msgLen; + uint8_t *msgPtr; + + msgLen=xo->bytesReceived; + msgPtr=xo->currentMsgBuf; + + xo->bytesReceived=0; + xo->bytesLeft=0; + xo->currentMsgBuf=NULL; + xo->flags&=~AQH_MSG_READER_FLAGS_READBODY; + + DBG_ERROR(NULL, "Received message:"); + //GWEN_Text_LogString((const char*) msgPtr, msgLen, NULL, GWEN_LoggerLevel_Error); + rv=AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_MSGRECVD, msgLen, (void*) msgPtr); + if (rv==0) { + DBG_ERROR(AQH_LOGDOMAIN, "Received message ignored"); + } + free(msgPtr); + return 1; + } + } + return 0; + } + return GWEN_ERROR_GENERIC; +} + + + +int _readHeaderFromRingbuffer(AQH_MSG_READER *xo) +{ + int remainingBytesInBuffer; + uint32_t mqttPayloadLen=0; + int idx=0; + const uint8_t *ptr; + uint8_t lenByte; + int shift=0; + int rv; + + ptr=xo->headerBuffer; + + remainingBytesInBuffer=xo->bytesReceived; + if (remainingBytesInBuffer>AQH_MSG_READER_HEADERBUFFER_SIZE) { + DBG_ERROR(AQH_LOGDOMAIN, "Error in message (msg size not determined within %d bytes)", remainingBytesInBuffer); + return GWEN_ERROR_BAD_DATA; + } + + /* read type and flags (first byte) */ + if (xo->bytesReceived<=idx) { + rv=GWEN_RingBuffer_ReadByte(xo->ringBuffer); + if (rv!=-1) { + xo->headerBuffer[idx]=rv; + xo->bytesReceived++; + remainingBytesInBuffer++; + } + else + return 0; + } + idx++; + + /* read address bytes */ + while(idx<6) { /* max 4 bytes size plus type/flags byte */ + if (xo->bytesReceived<=idx) { + rv=GWEN_RingBuffer_ReadByte(xo->ringBuffer); + if (rv!=-1) { + xo->headerBuffer[idx]=rv; + xo->bytesReceived++; + remainingBytesInBuffer++; + } + else + return 0; + } + lenByte=ptr[idx]; + mqttPayloadLen+=(lenByte & 0x7f)<bytesLeft=(fullMsgSize-xo->bytesReceived); + + xo->currentMsgBuf=(uint8_t*) malloc(fullMsgSize); + memmove(xo->currentMsgBuf, xo->headerBuffer, xo->bytesReceived); + xo->bytesLeft=fullMsgSize-xo->bytesReceived; + xo->flags|=AQH_MSG_READER_FLAGS_READBODY; + DBG_ERROR(AQH_LOGDOMAIN, + "Got size: full size=%d, payload pos=%d, payload size=%d (%04x)", + fullMsgSize, idx+1, mqttPayloadLen, mqttPayloadLen); + return 1; /* size successfully determined */ + } + shift+=7; + idx++; + } + DBG_ERROR(AQH_LOGDOMAIN, "Bad MQTT message (could not determine message length)"); + return GWEN_ERROR_BAD_DATA; +} + + + diff --git a/aqhome/ipc2/mqttmsgreader.h b/aqhome/ipc2/mqttmsgreader.h new file mode 100644 index 0000000..40290ba --- /dev/null +++ b/aqhome/ipc2/mqttmsgreader.h @@ -0,0 +1,20 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MQTTMSGREADER_H +#define AQH_MQTTMSGREADER_H + +#include + + +AQH_OBJECT *AQH_MqttMsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject); + + + +#endif + diff --git a/aqhome/ipc2/msgreader.c b/aqhome/ipc2/msgreader.c index 0ee353d..7a10a94 100644 --- a/aqhome/ipc2/msgreader.c +++ b/aqhome/ipc2/msgreader.c @@ -23,7 +23,7 @@ #define AQH_MSGREADER_SKIPTIME_IN_MS 20 -#define AQH_MSGREADER_FLAGS_SKIP 0x0001 +#define AQH_MSGREADER_FLAGS_SKIP 0x80000000 @@ -109,8 +109,10 @@ void AQH_MsgReader_SetFlags(AQH_OBJECT *o, uint32_t f) AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); - if (xo) + if (xo) { + DBG_ERROR(AQH_LOGDOMAIN, "Set flags: %08x", f); xo->flags=f; + } } @@ -120,8 +122,10 @@ void AQH_MsgReader_AddFlags(AQH_OBJECT *o, uint32_t f) AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); - if (xo) + if (xo) { + DBG_ERROR(AQH_LOGDOMAIN, "Adding flags: %08x", f); xo->flags|=f; + } } @@ -131,8 +135,10 @@ void AQH_MsgReader_SubFlags(AQH_OBJECT *o, uint32_t f) AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); - if (xo) + if (xo) { + DBG_ERROR(AQH_LOGDOMAIN, "Clearing flags: %08x", f); xo->flags&=~f; + } } @@ -339,35 +345,45 @@ int AQH_MsgReader_ReadRemainderFromRingbuffer(AQH_OBJECT *o) xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { - uint32_t bytesInRingBuffer; - uint32_t bytesToRead; - int rv; + if (xo->bytesLeft==0) { + /* msg finished */ + DBG_INFO(AQH_LOGDOMAIN, "Message complete"); + return 1; + } + else { + uint32_t bytesInRingBuffer; + uint32_t bytesToRead; + int rv; - bytesInRingBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer); - - /* still reading header */ - bytesToRead=xo->bytesLeft; - if (bytesInRingBufferringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize); - if (rv<0) { - DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer empty"); - return 0; + bytesInRingBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer); + + /* still reading header */ + bytesToRead=xo->bytesLeft; + if (bytesInRingBufferringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer empty"); + return 0; + } + if (xferSizebytesReceived+=xferSize; + xo->bytesLeft-=xferSize; + if (xo->bytesLeft==0) { + /* msg finished */ + DBG_INFO(AQH_LOGDOMAIN, "Message complete"); + return 1; + } } - if (xferSizebytesReceived+=xferSize; - xo->bytesLeft-=xferSize; - if (xo->bytesLeft==0) { - /* msg finished */ - DBG_INFO(AQH_LOGDOMAIN, "Message complete"); - return 1; + else { + DBG_ERROR(AQH_LOGDOMAIN, "Nothing to read??"); } } return 0; diff --git a/aqhome/msg/0BUILD b/aqhome/msg/0BUILD index 9f09863..2269d26 100644 --- a/aqhome/msg/0BUILD +++ b/aqhome/msg/0BUILD @@ -110,11 +110,13 @@ aqhmsg_node aqhmsg_ipc + aqhmsg_mqtt node ipc + mqtt diff --git a/aqhome/msg/mqtt/0BUILD b/aqhome/msg/mqtt/0BUILD new file mode 100644 index 0000000..1fa1e56 --- /dev/null +++ b/aqhome/msg/mqtt/0BUILD @@ -0,0 +1,88 @@ + + + + + + + + $(gwenhywfar_cflags) + -I$(topsrcdir) + -I$(topbuilddir) + + + + --include=$(builddir) + --include=$(srcdir) + + + + + + $(visibility_cflags) + + + + --api=AQHOME_API + + + + + + + + + + + + + + + + + + $(local/built_headers_pub) + + + + + m_mqtt.h + m_mqtt_connect.h + m_mqtt_connack.h + m_mqtt_subscribe.h + m_mqtt_suback.h + m_mqtt_publish.h + + + + + + + + + $(local/typefiles) + + m_mqtt.c + m_mqtt_connect.c + m_mqtt_connack.c + m_mqtt_subscribe.c + m_mqtt_suback.c + m_mqtt_publish.c + + + + + + + + + + + + + + + + + + + diff --git a/aqhome/msg/mqtt/m_mqtt.c b/aqhome/msg/mqtt/m_mqtt.c new file mode 100644 index 0000000..4410573 --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt.c @@ -0,0 +1,205 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "aqhome/msg/mqtt/m_mqtt.h" + +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ + +AQH_MESSAGE *AQH_MqttMessage_new(uint8_t typeAndFlags, uint32_t payloadLen, const uint8_t *payload) +{ + if (payloadLen>0xfffffffu) { + DBG_ERROR(AQH_LOGDOMAIN, "Too many bytes in payload, can't encode into MQTT message"); + return NULL; + } + else { + AQH_MESSAGE *msg; + uint8_t *ptr; + uint32_t msgBufLen; + uint32_t len; + uint32_t bytesUsed=0; + int i=0; + + msgBufLen=16+payloadLen; + msg=AQH_Message_new(); + AQH_Message_SetData(msg, NULL, msgBufLen); /* auto-malloc len bytes */ + + ptr=AQH_Message_GetMsgPointer(msg); + *(ptr++)=typeAndFlags; + bytesUsed++; + + /* store address length */ + len=payloadLen; + do { + uint8_t b; + + b=len & 0x7f; + len>>=7; + if (len) + b|=0x80; + *(ptr++)=b; + bytesUsed++; + } while(len && i<4); + if (payload && payloadLen) { + memmove(ptr, payload, payloadLen); + bytesUsed+=payloadLen; + } + AQH_Message_SetUsedSize(msg, bytesUsed); + return msg; + } +} + + + +void AQH_MqttMessage_AppendStringWithLenToBuffer(GWEN_BUFFER *buf, const char *s) +{ + unsigned int len; + + len=strlen(s); + GWEN_Buffer_AppendByte(buf, (len>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, len & 0xff); + if (s && *s) + GWEN_Buffer_AppendString(buf, s); +} + + + +char *AQH_MqttMessage_ExtractStringAt(const uint8_t *ptr, uint32_t len) +{ + if (len>1) { + uint32_t slen; + + slen=(ptr[0]<<8)+ptr[1]; + if (slen) { + char *result; + + if (slen>(len-2)) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", + (unsigned long int) slen, (unsigned long int) len); + return NULL; + } + + result=(char*) malloc(slen+1); + if (result==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error on malloc"); + return NULL; + } + memmove(result, ptr+2, slen); + result[slen]=0; + return result; + } + } + return NULL; +} + + + +int AQH_MqttMessage_GetOffsetOfPayload(const AQH_MESSAGE *msg) +{ + const uint8_t *ptr; + int usedBytes; + int idx; + + ptr=AQH_Message_GetMsgPointer(msg); + usedBytes=AQH_Message_GetUsedSize(msg); + if (usedBytes>5) + usedBytes=5; + for(idx=1; idx=usedBytes) { + DBG_ERROR(AQH_LOGDOMAIN, "Could not determine length of size field"); + return GWEN_ERROR_BAD_DATA; + } + return idx+1; +} + + + +int AQH_MqttMessage_GetTypeAndFlags(const AQH_MESSAGE *msg) +{ + const uint8_t *ptr; + uint32_t usedBytes; + + ptr=AQH_Message_GetMsgPointer(msg); + usedBytes=AQH_Message_GetUsedSize(msg); + if (usedBytes) + return *ptr; + else { + DBG_ERROR(AQH_LOGDOMAIN, "No data in message"); + return 0; + } +} + + + +int AQH_MqttMessage_SkipStringAt(const uint8_t *ptr, uint32_t len) +{ + if (len>1) { + uint32_t slen; + + slen=(ptr[0]<<8)+ptr[1]; + if (slen) { + if (slen>(len-2)) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid string length (%lu, remaining %lu)", + (unsigned long int) slen, (unsigned long int) len); + return GWEN_ERROR_BAD_DATA; + } + } + return slen+2; + } + return GWEN_ERROR_BAD_DATA; +} + + + +const char *AQH_MqttMessage_MsgTypeToString(uint8_t t) +{ + switch(t & 0xf0) { + case (AQH_MQTTMSG_MSGTYPE_CONNECT & 0xf0): return "CONNECT"; + case (AQH_MQTTMSG_MSGTYPE_CONNACK & 0xf0): return "CONACK"; + case (AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0): return "PUBLISH"; + case (AQH_MQTTMSG_MSGTYPE_PUBACK & 0xf0): return "PUBACK"; + case (AQH_MQTTMSG_MSGTYPE_PUBREC & 0xf0): return "PUBREC"; + case (AQH_MQTTMSG_MSGTYPE_PUBREL & 0xf0): return "PUBREL"; + case (AQH_MQTTMSG_MSGTYPE_PUBCOMP & 0xf0): return "PUBCOMP"; + case (AQH_MQTTMSG_MSGTYPE_SUBSCRIBE & 0xf0): return "SUBSCRIBE"; + case (AQH_MQTTMSG_MSGTYPE_SUBACK & 0xf0): return "SUBACK"; + case (AQH_MQTTMSG_MSGTYPE_UNSUBSCRIBE & 0xf0): return "UNSUBSCRIBE"; + case (AQH_MQTTMSG_MSGTYPE_UNSUBACK & 0xf0): return "UNSUBACK"; + case (AQH_MQTTMSG_MSGTYPE_PINGREQ & 0xf0): return "PINGREQ"; + case (AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0): return "PINGRESP"; + case (AQH_MQTTMSG_MSGTYPE_DISCONNECT & 0xf0): return "DISCONNECT"; + default: return "(unknown)"; + } +} + + diff --git a/aqhome/msg/mqtt/m_mqtt.h b/aqhome/msg/mqtt/m_mqtt.h new file mode 100644 index 0000000..4bdfc0e --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt.h @@ -0,0 +1,63 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_M_MQTT_H +#define AQH_M_MQTT_H + + +#include +#include + +#include + + + +#define AQH_MQTTMSG_OFFS_CONTROL 0 +#define AQH_MQTTMSG_OFFS_REMAINING_LENGTH 1 + + +/* from https://docs.solace.com/API/MQTT-311-Prtl-Conformance-Spec/MQTT%20Control%20Packet%20format.htm + */ +#define AQH_MQTTMSG_MSGTYPE_CONNECT 0x10u +#define AQH_MQTTMSG_MSGTYPE_CONNACK 0x20u +#define AQH_MQTTMSG_MSGTYPE_PUBLISH 0x30u +#define AQH_MQTTMSG_MSGTYPE_PUBACK 0x40u +#define AQH_MQTTMSG_MSGTYPE_PUBREC 0x50u /* assured delivery part 1 */ +#define AQH_MQTTMSG_MSGTYPE_PUBREL 0x62u /* assured delivery part 2 */ +#define AQH_MQTTMSG_MSGTYPE_PUBCOMP 0x70u /* assured delivery part 3 */ +#define AQH_MQTTMSG_MSGTYPE_SUBSCRIBE 0x82u +#define AQH_MQTTMSG_MSGTYPE_SUBACK 0x90u +#define AQH_MQTTMSG_MSGTYPE_UNSUBSCRIBE 0xa2u +#define AQH_MQTTMSG_MSGTYPE_UNSUBACK 0xb0u +#define AQH_MQTTMSG_MSGTYPE_PINGREQ 0xc0u +#define AQH_MQTTMSG_MSGTYPE_PINGRESP 0xd0u +#define AQH_MQTTMSG_MSGTYPE_DISCONNECT 0xe0u + + +#define AQH_MQTTMSG_FLAGS_DUP 0x08u +#define AQH_MQTTMSG_FLAGS_QOS2 0x04u +#define AQH_MQTTMSG_FLAGS_QOS1 0x02u +#define AQH_MQTTMSG_FLAGS_RETAIN 0x01u + + + +AQHOME_API AQH_MESSAGE *AQH_MqttMessage_new(uint8_t typeAndFlags, uint32_t payloadLen, const uint8_t *payload); + +AQHOME_API int AQH_MqttMessage_GetTypeAndFlags(const AQH_MESSAGE *msg); + +AQHOME_API void AQH_MqttMessage_AppendStringWithLenToBuffer(GWEN_BUFFER *buf, const char *s); +AQHOME_API char *AQH_MqttMessage_ExtractStringAt(const uint8_t *ptr, uint32_t len); +AQHOME_API int AQH_MqttMessage_SkipStringAt(const uint8_t *ptr, uint32_t len); + +AQHOME_API int AQH_MqttMessage_GetOffsetOfPayload(const AQH_MESSAGE *msg); + +AQHOME_API const char *AQH_MqttMessage_MsgTypeToString(uint8_t t); + + + +#endif diff --git a/aqhome/msg/mqtt/m_mqtt_connack.c b/aqhome/msg/mqtt/m_mqtt_connack.c new file mode 100644 index 0000000..1410ab6 --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_connack.c @@ -0,0 +1,88 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "aqhome/msg/mqtt/m_mqtt_connack.h" +#include "aqhome/msg/mqtt/m_mqtt.h" + +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ + +AQH_MESSAGE *AQH_MqttMessageConnAck_new(uint8_t flags, uint8_t result) +{ + AQH_MESSAGE *msg; + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + GWEN_Buffer_AppendByte(buf, flags); + GWEN_Buffer_AppendByte(buf, result); + + msg=AQH_MqttMessage_new(AQH_MQTTMSG_MSGTYPE_CONNECT, GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + + return msg; +} + + + +uint8_t AQH_MqttMessageConnAck_GetResultFlags(const AQH_MESSAGE *msg) +{ + int idx; + + idx=AQH_MqttMessage_GetOffsetOfPayload(msg); + if (idx>0) { + const uint8_t *ptr; + int len; + + ptr=AQH_Message_GetMsgPointer(msg)+idx; + len=(int)AQH_Message_GetUsedSize(msg)-idx; + if (len>0) { + return ptr[0]; + } + } + + return 0; +} + + + +uint8_t AQH_MqttMessageConnAck_GetResultCode(const AQH_MESSAGE *msg) +{ + int idx; + + idx=AQH_MqttMessage_GetOffsetOfPayload(msg); + if (idx>0) { + const uint8_t *ptr; + int len; + + ptr=AQH_Message_GetMsgPointer(msg)+idx; + len=(int)AQH_Message_GetUsedSize(msg)-idx; + if (len>1) { + return ptr[1]; + } + } + + return 0; +} + + + + + + + diff --git a/aqhome/msg/mqtt/m_mqtt_connack.h b/aqhome/msg/mqtt/m_mqtt_connack.h new file mode 100644 index 0000000..02e8e43 --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_connack.h @@ -0,0 +1,39 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_M_MQTT_CONNACK_H +#define AQH_M_MQTT_CONNACK_H + + +#include +#include + +#include + + + +#define AQH_MQTTMSG_CONNACK_FLAGS_HAVE_SESSION 0x01u + +#define AQH_MQTTMSG_CONNACK_RESULT_ACCEPTED 0x00u +#define AQH_MQTTMSG_CONNACK_RESULT_BAD_PROTO 0x01u +#define AQH_MQTTMSG_CONNACK_RESULT_BAD_CLIENTID 0x02u +#define AQH_MQTTMSG_CONNACK_RESULT_UNAVAILABLE 0x03u +#define AQH_MQTTMSG_CONNACK_RESULT_BAD_CREDENTIALS 0x04u +#define AQH_MQTTMSG_CONNACK_RESULT_UNAUTHORIZED 0x05u + + + +AQHOME_API AQH_MESSAGE *AQH_MqttMessageConnAck_new(uint8_t flags, uint8_t result); +AQHOME_API uint8_t AQH_MqttMessageConnAck_GetResultFlags(const AQH_MESSAGE *msg); +AQHOME_API uint8_t AQH_MqttMessageConnAck_GetResultCode(const AQH_MESSAGE *msg); + + + + + +#endif diff --git a/aqhome/msg/mqtt/m_mqtt_connect.c b/aqhome/msg/mqtt/m_mqtt_connect.c new file mode 100644 index 0000000..303399f --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_connect.c @@ -0,0 +1,62 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "aqhome/msg/mqtt/m_mqtt_connect.h" +#include "aqhome/msg/mqtt/m_mqtt.h" + +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ + +AQH_MESSAGE *AQH_MqttMessageConnect_new(const char *protoName, + uint8_t protoLevel, + uint8_t connectFlags, + uint16_t keepAliveTime, + const char *clientId, + const char *userName, + const char *password) +{ + AQH_MESSAGE *msg; + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + AQH_MqttMessage_AppendStringWithLenToBuffer(buf, protoName?protoName:"MQTT"); + GWEN_Buffer_AppendByte(buf, protoLevel?protoLevel:4); + GWEN_Buffer_AppendByte(buf, connectFlags); + GWEN_Buffer_AppendByte(buf, (keepAliveTime>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, keepAliveTime & 0xff); + + AQH_MqttMessage_AppendStringWithLenToBuffer(buf, clientId); + /* here could be inserted: will topic, will message */ + if (connectFlags & AQH_MQTTMSG_CONNECT_FLAGS_USERNAME) + AQH_MqttMessage_AppendStringWithLenToBuffer(buf, userName); + if (connectFlags & AQH_MQTTMSG_CONNECT_FLAGS_PASSWD) + AQH_MqttMessage_AppendStringWithLenToBuffer(buf, password); + + msg=AQH_MqttMessage_new(AQH_MQTTMSG_MSGTYPE_CONNECT, GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + + return msg; +} + + + + + + + diff --git a/aqhome/msg/mqtt/m_mqtt_connect.h b/aqhome/msg/mqtt/m_mqtt_connect.h new file mode 100644 index 0000000..9443943 --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_connect.h @@ -0,0 +1,40 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_M_MQTT_CONNECT_H +#define AQH_M_MQTT_CONNECT_H + + +#include +#include + +#include + + +#define AQH_MQTTMSG_CONNECT_FLAGS_USERNAME 0x80u +#define AQH_MQTTMSG_CONNECT_FLAGS_PASSWD 0x40u +#define AQH_MQTTMSG_CONNECT_FLAGS_WILL_RETAIN 0x20u +#define AQH_MQTTMSG_CONNECT_FLAGS_WILL_QOS2 0x10u +#define AQH_MQTTMSG_CONNECT_FLAGS_WILL_QOS1 0x08u +#define AQH_MQTTMSG_CONNECT_FLAGS_WILL_FLAG 0x04u +#define AQH_MQTTMSG_CONNECT_FLAGS_CLEAN_SESSION 0x01u + + + +AQHOME_API AQH_MESSAGE *AQH_MqttMessageConnect_new(const char *protoName, + uint8_t protoLevel, + uint8_t connectFlags, + uint16_t keepAliveTime, + const char *clientId, + const char *userName, + const char *password); + + + + +#endif diff --git a/aqhome/msg/mqtt/m_mqtt_publish.c b/aqhome/msg/mqtt/m_mqtt_publish.c new file mode 100644 index 0000000..d47e95c --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_publish.c @@ -0,0 +1,145 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "aqhome/msg/mqtt/m_mqtt_publish.h" +#include "aqhome/msg/mqtt/m_mqtt.h" + +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ +AQH_MESSAGE *AQH_MqttMessagePublish_new(uint8_t flags, uint16_t packetId, + const char *sTopic, const uint8_t *messagePtr, uint32_t messageLen) +{ + AQH_MESSAGE *msg; + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + AQH_MqttMessage_AppendStringWithLenToBuffer(buf, sTopic?sTopic:""); + if (flags & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + GWEN_Buffer_AppendByte(buf, (packetId>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, packetId & 0xff); + } + + /* payload */ + if (messagePtr && messageLen) + GWEN_Buffer_AppendBytes(buf, (const char*) messagePtr, messageLen); + + msg=AQH_MqttMessage_new(AQH_MQTTMSG_MSGTYPE_PUBLISH | flags, + GWEN_Buffer_GetUsedBytes(buf), + (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + return msg; +} + + + +int AQH_MqttMessagePublish_GetPacketId(const AQH_MESSAGE *msg) +{ + if (AQH_MqttMessage_GetTypeAndFlags(msg) & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + int idx; + + idx=AQH_MqttMessage_GetOffsetOfPayload(msg); + if (idx>0) { + int len; + + len=(int)AQH_Message_GetUsedSize(msg)-idx; + if (len>1) { + uint8_t *ptr; + int rv; + + ptr=AQH_Message_GetMsgPointer(msg); + rv=AQH_MqttMessage_SkipStringAt(ptr, len); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + idx+=rv; + ptr+=rv; + len-=rv; + if (idx0) { + uint8_t *ptr; + int len; + + ptr=AQH_Message_GetMsgPointer(msg)+idx; + len=(int)AQH_Message_GetUsedSize(msg)-idx; + DBG_ERROR(AQH_LOGDOMAIN, "Extracting string from %d (remaining len=%d)", idx, len); + if (len>1) + return AQH_MqttMessage_ExtractStringAt(ptr, len); + } + return NULL; +} + + + +char *AQH_MqttMessagePublish_ExtractValue(const AQH_MESSAGE *msg) +{ + int idx; + + idx=AQH_MqttMessage_GetOffsetOfPayload(msg); + if (idx>0) { + uint8_t *ptr; + int len; + + ptr=AQH_Message_GetMsgPointer(msg)+idx; + len=(int)AQH_Message_GetUsedSize(msg)-idx; + if (len>1) { + int rv; + + rv=AQH_MqttMessage_SkipStringAt(ptr, len); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return NULL; + } + ptr+=rv; + len-=rv; + if (AQH_MqttMessage_GetTypeAndFlags(msg) & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + ptr+=2; + len-=2; + } + if (len>0) { + char *result; + + result=(char*) malloc(len+1); + if (result) { + memmove(result, ptr, len); + result[len]=0; + return result; + } + } + } + } + return NULL; +} + + + diff --git a/aqhome/msg/mqtt/m_mqtt_publish.h b/aqhome/msg/mqtt/m_mqtt_publish.h new file mode 100644 index 0000000..ec78e19 --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_publish.h @@ -0,0 +1,29 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_M_MQTT_PUBLISH_H +#define AQH_M_MQTT_PUBLISH_H + + +#include +#include + +#include + + + +AQHOME_API AQH_MESSAGE *AQH_MqttMessagePublish_new(uint8_t flags, uint16_t packetId, + const char *sTopic, const uint8_t *messagePtr, uint32_t messageLen); +AQHOME_API int AQH_MqttMessagePublish_GetPacketId(const AQH_MESSAGE *msg); +AQHOME_API char *AQH_MqttMessagePublish_ExtractTopic(const AQH_MESSAGE *msg); +AQHOME_API char *AQH_MqttMessagePublish_ExtractValue(const AQH_MESSAGE *msg); + + + + +#endif diff --git a/aqhome/msg/mqtt/m_mqtt_suback.c b/aqhome/msg/mqtt/m_mqtt_suback.c new file mode 100644 index 0000000..af5cbe9 --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_suback.c @@ -0,0 +1,93 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "aqhome/msg/mqtt/m_mqtt_suback.h" +#include "aqhome/msg/mqtt/m_mqtt.h" + +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ + +AQH_MESSAGE *AQH_MqttMessageSubAck_new(uint8_t flags, uint16_t packetId, uint8_t result) +{ + AQH_MESSAGE *msg; + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + if (flags & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + GWEN_Buffer_AppendByte(buf, (packetId>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, packetId & 0xff); + } + /* payload */ + GWEN_Buffer_AppendByte(buf, result); + + msg=AQH_MqttMessage_new(AQH_MQTTMSG_MSGTYPE_SUBACK | flags, + GWEN_Buffer_GetUsedBytes(buf), + (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + return msg; +} + + + +int AQH_MqttMessageSubAck_GetPacketId(const AQH_MESSAGE *msg) +{ + int idx; + + idx=AQH_MqttMessage_GetOffsetOfPayload(msg); + if (idx>0) { + int len; + + len=(int)AQH_Message_GetUsedSize(msg)-idx; + if (len>1) { + if (AQH_MqttMessage_GetTypeAndFlags(msg) & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + uint8_t *ptr; + + ptr=AQH_Message_GetMsgPointer(msg); + return (ptr[0]<<8)+ptr[1]; + } + } + } + return 0; +} + + + +int AQH_MqttMessageSubAck_GetResultCode(const AQH_MESSAGE *msg) +{ + int idx; + + idx=AQH_MqttMessage_GetOffsetOfPayload(msg); + if (idx>0) { + int len; + + if (AQH_MqttMessage_GetTypeAndFlags(msg) & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) + idx+=2; + len=(int)AQH_Message_GetUsedSize(msg)-idx; + if (len>0) { + uint8_t *ptr; + + ptr=AQH_Message_GetMsgPointer(msg); + return *ptr; + } + } + return GWEN_ERROR_BAD_DATA; +} + + + diff --git a/aqhome/msg/mqtt/m_mqtt_suback.h b/aqhome/msg/mqtt/m_mqtt_suback.h new file mode 100644 index 0000000..d144d59 --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_suback.h @@ -0,0 +1,27 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_M_MQTT_SUBACK_H +#define AQH_M_MQTT_SUBACK_H + + +#include +#include + +#include + + + +AQHOME_API AQH_MESSAGE *AQH_MqttMessageSubAck_new(uint8_t flags, uint16_t packetId, uint8_t result); +AQHOME_API int AQH_MqttMessageSubAck_GetResultCode(const AQH_MESSAGE *msg); +AQHOME_API int AQH_MqttMessageSubAck_GetPacketId(const AQH_MESSAGE *msg); + + + + +#endif diff --git a/aqhome/msg/mqtt/m_mqtt_subscribe.c b/aqhome/msg/mqtt/m_mqtt_subscribe.c new file mode 100644 index 0000000..a299e8a --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_subscribe.c @@ -0,0 +1,46 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "aqhome/msg/mqtt/m_mqtt_subscribe.h" +#include "aqhome/msg/mqtt/m_mqtt.h" + +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ + +AQH_MESSAGE *AQH_MqttMessageSubscribe_new(uint8_t flags, uint16_t packetId, const char *sTopic, uint8_t requestedQos) +{ + AQH_MESSAGE *msg; + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + GWEN_Buffer_AppendByte(buf, (packetId>>8) & 0xff); + GWEN_Buffer_AppendByte(buf, packetId & 0xff); + /* add topic filter / qos pair */ + AQH_MqttMessage_AppendStringWithLenToBuffer(buf, sTopic); + GWEN_Buffer_AppendByte(buf, requestedQos); + + msg=AQH_MqttMessage_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE | flags, + GWEN_Buffer_GetUsedBytes(buf), + (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + return msg; +} + + + diff --git a/aqhome/msg/mqtt/m_mqtt_subscribe.h b/aqhome/msg/mqtt/m_mqtt_subscribe.h new file mode 100644 index 0000000..cdef750 --- /dev/null +++ b/aqhome/msg/mqtt/m_mqtt_subscribe.h @@ -0,0 +1,25 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_M_MQTT_SUBSCRIBE_H +#define AQH_M_MQTT_SUBSCRIBE_H + + +#include +#include + +#include + + + +AQHOME_API AQH_MESSAGE *AQH_MqttMessageSubscribe_new(uint8_t flags, uint16_t packetId, const char *sTopic, uint8_t requestedQos); + + + + +#endif diff --git a/mosquitto.sh b/mosquitto.sh index f9eb457..8abb38e 100755 --- a/mosquitto.sh +++ b/mosquitto.sh @@ -1,3 +1,3 @@ -mosquitto_sub -h 192.168.117.192 -t '#' -v +mosquitto_sub -h 192.168.117.194 -t '#' -v