/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./s_publish.h" #include "./server_p.h" #include #include #include #include #include #include #include #include #include /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static int _handlePublish(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo, const char *rcvdTopic, const char *rcvdValue); static void _handleNumTopic(AQH_MQTTLOG_SERVER *xo, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue); static void _handleJsonTopic(AQH_MQTTLOG_SERVER *xo, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue); static void _sendMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value, const char *rcvdValue); static void _announceDeviceToBroker(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device); static void _sendAnnounceValueMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value); static AQH_VALUE *_mkMessageValue(const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value); static int _mqttValueTypeMessageValueType(int t); static int _registerNewDeviceForTopic(AQH_MQTTLOG_SERVER *xo, const char *rcvdTopic); static AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir); static AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir); static GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic); /* ------------------------------------------------------------------------------------------------ * code * ------------------------------------------------------------------------------------------------ */ void AQH_MqttLogServer_HandlePublishMsg(AQH_OBJECT *o, GWEN_UNUSED AQH_OBJECT *ep, const AQH_MESSAGE *msg) { if (o && msg) { AQH_MQTTLOG_SERVER *xo; xo=AQH_MqttLogServer_GetServerData(o); if (xo && xo->registeredDeviceList) { char *topic; char *value; topic=AQH_MqttMessagePublish_ExtractTopic(msg); value=AQH_MqttMessagePublish_ExtractValue(msg); if (topic && value) { int rv; rv=_handlePublish(o, xo, topic, value); if (rv!=1) { DBG_INFO(NULL, "New topic \"%s\", trying to register", topic); rv=_registerNewDeviceForTopic(xo, topic); if (rv==1) { rv=_handlePublish(o, xo, topic, value); if (rv!=1) { DBG_ERROR(NULL, "Topic \"%s\" still not handled, SNH!", topic); } } } } else { DBG_ERROR(NULL, "Either topic or value missing in PUBLISH msg"); } free(value); free(topic); } } } int _handlePublish(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo, const char *rcvdTopic, const char *rcvdValue) { if (o && rcvdTopic && *rcvdTopic && xo && xo->registeredDeviceList) { AQHMQTT_DEVICE *device; device=AQHMQTT_Device_List_First(xo->registeredDeviceList); while(device) { AQHMQTT_TOPIC_LIST *topicList; const char *sDeviceName; const char *sDeviceId; sDeviceName=AQHMQTT_Device_GetName(device); sDeviceId=AQHMQTT_Device_GetId(device); topicList=AQHMQTT_Device_GetTopicList(device); if (topicList) { AQHMQTT_TOPIC *topic; topic=_findTopicMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); #if 0 if (topic==NULL) { topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); if (topic) AQHMQTT_Topic_SetTopic(topic, rcvdTopic); } #endif if (topic) { DBG_INFO(AQH_LOGDOMAIN, "Handling topic \"%s\" for device type %s (id: %s)", rcvdTopic, sDeviceName, sDeviceId?sDeviceId:""); if (AQHMQTT_Topic_GetTopicType(topic)==AQHMQTT_TopicType_Json) _handleJsonTopic(xo, device, topic, rcvdValue); else _handleNumTopic(xo, device, topic, rcvdValue); return 1; } } device=AQHMQTT_Device_List_Next(device); } DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic); } return 0; } void _handleNumTopic(AQH_MQTTLOG_SERVER *xo, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue) { AQHMQTT_VALUE_LIST *valueList; valueList=AQHMQTT_Topic_GetValueList(topic); if (valueList) _sendMessage(xo, device, AQHMQTT_Value_List_First(valueList), rcvdValue); else { DBG_INFO(NULL, "No value list in device \"%s\"", AQHMQTT_Device_GetId(device)); } } void _handleJsonTopic(AQH_MQTTLOG_SERVER *xo, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue) { GWEN_JSON_ELEM *jeRoot; jeRoot=GWEN_JsonElement_fromString(rcvdValue); if (jeRoot==NULL) { DBG_INFO(NULL, "Could not parse JSON value: %s", rcvdValue); } else { AQHMQTT_VALUE_LIST *valueList; valueList=AQHMQTT_Topic_GetValueList(topic); if (valueList) { AQHMQTT_VALUE *value; value=AQHMQTT_Value_List_First(valueList); while(value) { const char *path; path=AQHMQTT_Value_GetPath(value); if (path && *path) { GWEN_JSON_ELEM *je; je=GWEN_JsonElement_GetElementByPath(jeRoot, path, 0); if (je) { const char *s; s=GWEN_JsonElement_GetData(je); if (s && *s) _sendMessage(xo, device, value, s); } } value=AQHMQTT_Value_List_Next(value); } /* while */ } GWEN_JsonElement_free(jeRoot); } } void _sendMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value, const char *rcvdValue) { int rv; double f; const char *deviceName; deviceName=AQHMQTT_Device_GetId(device); rv=GWEN_Text_StringToDouble(rcvdValue, &f); if (rv<0) { DBG_ERROR(NULL, "Invalid value received from MQTT server (%s)", rcvdValue?rcvdValue:""); } else { uint64_t now; AQH_VALUE *msgValue; now=(uint64_t) time(NULL); msgValue=_mkMessageValue(device, value); if (xo->brokerEndpoint) { AQH_MESSAGE *pubMsg; pubMsg=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, msgValue, now, f); DBG_INFO(AQH_LOGDOMAIN, "BROKER UPDATE_DATA %s/%s: %f", deviceName?deviceName:"", AQH_Value_GetName(msgValue), f); AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg); } else { DBG_INFO(AQH_LOGDOMAIN, "Skipping BROKER UPDATE_DATA %s/%s: %f", deviceName?deviceName:"", AQH_Value_GetName(msgValue), f); } AQH_Value_free(msgValue); } } void _announceDeviceToBroker(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device) { AQHMQTT_TOPIC_LIST *topicList; topicList=AQHMQTT_Device_GetTopicList(device); if (topicList) { AQHMQTT_TOPIC *topic; topic=AQHMQTT_Topic_List_First(topicList); while(topic) { AQHMQTT_VALUE_LIST *valueList; valueList=AQHMQTT_Topic_GetValueList(topic); if (valueList) { const AQHMQTT_VALUE *value; value=AQHMQTT_Value_List_First(valueList); while(value) { _sendAnnounceValueMessage(xo, device, value); value=AQHMQTT_Value_List_Next(value); } } topic=AQHMQTT_Topic_List_Next(topic); } } } void _sendAnnounceValueMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value) { AQH_MESSAGE *pubMsg; AQH_VALUE *msgValue; msgValue=_mkMessageValue(device, value); if (xo->brokerEndpoint) { pubMsg=AQH_IpcdMessageValues_newForOne(AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE, AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, 0, msgValue); if (pubMsg) { DBG_ERROR(AQH_LOGDOMAIN, "BROKER ANNOUNCE_VALUE %s/%s", AQH_Value_GetDeviceName(msgValue), AQH_Value_GetName(msgValue)); AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg); } AQH_Value_free(msgValue); } else { DBG_INFO(AQH_LOGDOMAIN, "Ignoring BROKER ANNOUNCE_VALUE %s", AQH_Value_GetName(msgValue)); } } AQH_VALUE *_mkMessageValue(const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value) { AQH_VALUE *msgValue; msgValue=AQH_Value_new(); AQH_Value_SetDeviceName(msgValue, AQHMQTT_Device_GetId(device)); AQH_Value_SetName(msgValue, AQHMQTT_Value_GetName(value)); AQH_Value_SetValueUnits(msgValue, AQHMQTT_Value_GetValueUnits(value)); AQH_Value_SetValueType(msgValue, _mqttValueTypeMessageValueType(AQHMQTT_Value_GetValueType(value))); return msgValue; } int _mqttValueTypeMessageValueType(int t) { switch(t){ case AQHMQTT_ValueType_Sensor: return AQH_ValueType_Sensor; case AQHMQTT_ValueType_Actor: return AQH_ValueType_Actor; default: break; } DBG_ERROR(AQH_LOGDOMAIN, "Invalid mqtt value type %d", t); return AQH_ValueType_Sensor; } int _registerNewDeviceForTopic(AQH_MQTTLOG_SERVER *xo, const char *rcvdTopic) { if (rcvdTopic && *rcvdTopic) { if (xo->availableDeviceList) { AQHMQTT_DEVICE *device; device=AQHMQTT_Device_List_First(xo->availableDeviceList); while(device) { AQHMQTT_TOPIC_LIST *topicList; topicList=AQHMQTT_Device_GetTopicList(device); if (topicList) { AQHMQTT_TOPIC *topic; topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); if (topic) { GWEN_BUFFER *buf; buf=_extractDeviceId(topic, rcvdTopic); if (buf) { AQHMQTT_DEVICE *newDevice; newDevice=AQHMQTT_Device_dup(device); AQHMQTT_Device_SetId(newDevice, GWEN_Buffer_GetStart(buf)); topic=_findMaskMatchingTopic(AQHMQTT_Device_GetTopicList(newDevice), rcvdTopic, AQHMQTT_TopicDir_In); AQHMQTT_Topic_SetTopic(topic, rcvdTopic); if (xo->registeredDeviceList==NULL) xo->registeredDeviceList=AQHMQTT_Device_List_new(); DBG_ERROR(NULL, "Registered device \"%s\" (%s)", AQHMQTT_Device_GetId(newDevice), AQHMQTT_Device_GetName(newDevice)); AQHMQTT_Device_List_Add(newDevice, xo->registeredDeviceList); _announceDeviceToBroker(xo, newDevice); GWEN_Buffer_free(buf); return 1; } } } device=AQHMQTT_Device_List_Next(device); } } DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic); } return 0; } AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir) { if (topicList) { AQHMQTT_TOPIC *topic; topic=AQHMQTT_Topic_List_First(topicList); while(topic) { if (AQHMQTT_Topic_GetDirection(topic)==dir) { const char *sMask; sMask=AQHMQTT_Topic_GetMask(topic); if (sMask && *sMask && GWEN_Text_ComparePattern(rcvdTopic, sMask, 0)!=-1) { /* found a matching topic */ return topic; } } topic=AQHMQTT_Topic_List_Next(topic); } } return NULL; } AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir) { if (topicList) { AQHMQTT_TOPIC *topic; topic=AQHMQTT_Topic_List_First(topicList); while(topic) { if (AQHMQTT_Topic_GetDirection(topic)==dir) { const char *sTopic; sTopic=AQHMQTT_Topic_GetTopic(topic); if (sTopic && *sTopic && strcasecmp(rcvdTopic, sTopic)==0) { return topic; } } topic=AQHMQTT_Topic_List_Next(topic); } } return NULL; } GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic) { const char *sBefore; const char *sAfter; sBefore=AQHMQTT_Topic_GetBeforeId(topic); sAfter=AQHMQTT_Topic_GetAfterId(topic); if (sBefore && *sBefore && sAfter && *sAfter) { GWEN_BUFFER *buf; int rv; buf=GWEN_Buffer_new(0, 256, 0, 1); GWEN_Buffer_AppendString(buf, rcvdTopic); rv=GWEN_Buffer_KeepTextBetweenStrings(buf, sBefore, sAfter, 1); if (rv<0) { DBG_INFO(NULL, "Could not extract id from [%s] (beforeId=%s, afterId=%s)", rcvdTopic, sBefore, sAfter); GWEN_Buffer_free(buf); return NULL; } return buf; } return NULL; }