/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./loop_mqtt.h" #include "./aqhome_mqtt_p.h" #include "aqhome/mqtt/msg_mqtt_publish.h" #include "aqhome/ipc/data/msg_data_multidata.h" #include "aqhome/ipc/data/msg_data_values.h" #include "aqhome/ipc/data/ipc_data.h" #include #include #include #include #include #include //#define FULL_DEBUG /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void _handleMqttMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handlePublishMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static int _handlePublish(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *topic, const char *value); static void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue); static void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue); static void _sendMessage(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value, const char *rcvdValue); static void _announceDeviceToBroker(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device); static void _sendAnnounceValueMessage(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value); static AQH_VALUE *_mkMessageValue(const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value); static int _mqttValueTypeTessageValueType(int t); static int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue); static AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir); static AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir); static GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ void AqHomeMqttLog_ReadAndHandleMqttMessages(AQHOME_MQTT *aqh) { GWEN_MSG_ENDPOINT *epTcp; GWEN_MSG *msg; epTcp=aqh->mqttEndpoint; while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) { #ifdef FULL_DEBUG DBG_ERROR(NULL, "Received this message:"); GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); #endif _handleMqttMsg(aqh, epTcp, msg); GWEN_Msg_free(msg); } } int AqHomeMqttLog_SendPing(AQHOME_MQTT *aqh) { GWEN_MSG_ENDPOINT *epTcp; GWEN_MSG *msgOut; DBG_INFO(AQH_LOGDOMAIN, "Sending PING"); epTcp=aqh->mqttEndpoint; msgOut=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL); if (msgOut==NULL) { DBG_ERROR(NULL, "Error creating message"); return GWEN_ERROR_INTERNAL; } GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); return 0; } void _handleMqttMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) { DBG_INFO(AQH_LOGDOMAIN, "PUBLISH message received"); #ifdef FULL_DEBUG GWEN_BUFFER *buf; buf=GWEN_Buffer_new(0, 256, 0, 1); AQH_PublishMqttMsg_DumpToBuffer(msg, buf, "received"); fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); GWEN_Buffer_free(buf); #endif _handlePublishMsg(aqh, ep, msg); } else if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) { DBG_INFO(AQH_LOGDOMAIN, "PING response received"); } else { #ifdef FULL_DEBUG DBG_ERROR(NULL, "Received this message:"); GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); #endif } } void _handlePublishMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { char *topic; char *value; topic=AQH_PublishMqttMsg_ExtractTopic(msg); value=AQH_PublishMqttMsg_ExtractValue(msg); if (topic && value) { int rv; rv=_handlePublish(aqh, ep, topic, value); if (rv!=1) { DBG_INFO(NULL, "New topic \"%s\", trying to register", topic); rv=_registerNewDeviceForTopic(aqh, ep, topic, value); if (rv==1) { rv=_handlePublish(aqh, ep, topic, value); if (rv!=1) { DBG_ERROR(NULL, "Topic \"%s\" still not handled, SNH!", topic); } } } } else { DBG_ERROR(NULL, "Either topic or value missing in PUBLISH msg"); } free(value); free(topic); } int _handlePublish(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue) { if (rcvdTopic && *rcvdTopic) { if (aqh->registeredDeviceList) { AQHMQTT_DEVICE *device; device=AQHMQTT_Device_List_First(aqh->registeredDeviceList); while(device) { AQHMQTT_TOPIC_LIST *topicList; const char *sDeviceName; const char *sDeviceId; sDeviceName=AQHMQTT_Device_GetName(device); sDeviceId=AQHMQTT_Device_GetId(device); topicList=AQHMQTT_Device_GetTopicList(device); if (topicList) { AQHMQTT_TOPIC *topic; topic=_findTopicMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); #if 0 if (topic==NULL) { topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); if (topic) AQHMQTT_Topic_SetTopic(topic, rcvdTopic); } #endif if (topic) { DBG_INFO(AQH_LOGDOMAIN, "Handling topic \"%s\" for device type %s (id: %s)", rcvdTopic, sDeviceName, sDeviceId?sDeviceId:""); if (AQHMQTT_Topic_GetTopicType(topic)==AQHMQTT_TopicType_Json) _handleJsonTopic(aqh, ep, device, topic, rcvdValue); else _handleNumTopic(aqh, ep, device, topic, rcvdValue); return 1; } } device=AQHMQTT_Device_List_Next(device); } } DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic); } return 0; } void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue) { AQHMQTT_VALUE_LIST *valueList; valueList=AQHMQTT_Topic_GetValueList(topic); if (valueList) _sendMessage(aqh, device, AQHMQTT_Value_List_First(valueList), rcvdValue); else { DBG_INFO(NULL, "No value list in device \"%s\"", AQHMQTT_Device_GetId(device)); } } void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue) { GWEN_JSON_ELEM *jeRoot; jeRoot=GWEN_JsonElement_fromString(rcvdValue); if (jeRoot==NULL) { DBG_INFO(NULL, "Could not parse JSON value: %s", rcvdValue); } else { AQHMQTT_VALUE_LIST *valueList; valueList=AQHMQTT_Topic_GetValueList(topic); if (valueList) { AQHMQTT_VALUE *value; value=AQHMQTT_Value_List_First(valueList); while(value) { const char *path; path=AQHMQTT_Value_GetPath(value); if (path && *path) { GWEN_JSON_ELEM *je; je=GWEN_JsonElement_GetElementByPath(jeRoot, path, 0); if (je) { const char *s; s=GWEN_JsonElement_GetData(je); if (s && *s) _sendMessage(aqh, device, value, s); } } value=AQHMQTT_Value_List_Next(value); } /* while */ } GWEN_JsonElement_free(jeRoot); } } void _sendMessage(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value, const char *rcvdValue) { int rv; union {double f; uint64_t i;} u; const char *deviceName; deviceName=AQHMQTT_Device_GetId(device); rv=GWEN_Text_StringToDouble(rcvdValue, &(u.f)); if (rv<0) { DBG_ERROR(NULL, "Invalid value received from MQTT server (%s)", rcvdValue?rcvdValue:""); } else { GWEN_MSG *pubMsg; uint64_t arrayToSend[2]; AQH_VALUE *msgValue; arrayToSend[0]=(uint64_t) time(NULL); arrayToSend[1]=u.i; msgValue=_mkMessageValue(device, value); pubMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgValue, arrayToSend, 1); if (pubMsg) { DBG_INFO(AQH_LOGDOMAIN, "BROKER UPDATE_DATA %s/%s: %f", deviceName?deviceName:"", AQH_Value_GetName(msgValue), u.f); GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg); } AQH_Value_free(msgValue); } } void _announceDeviceToBroker(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device) { AQHMQTT_TOPIC_LIST *topicList; topicList=AQHMQTT_Device_GetTopicList(device); if (topicList) { AQHMQTT_TOPIC *topic; topic=AQHMQTT_Topic_List_First(topicList); while(topic) { AQHMQTT_VALUE_LIST *valueList; valueList=AQHMQTT_Topic_GetValueList(topic); if (valueList) { const AQHMQTT_VALUE *value; value=AQHMQTT_Value_List_First(valueList); while(value) { _sendAnnounceValueMessage(aqh, device, value); value=AQHMQTT_Value_List_Next(value); } } topic=AQHMQTT_Topic_List_Next(topic); } } } void _sendAnnounceValueMessage(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value) { GWEN_MSG *pubMsg; AQH_VALUE *msgValue; msgValue=_mkMessageValue(device, value); pubMsg=AQH_ValuesDataIpcMsg_newForOneValue(AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE, 0, msgValue); if (pubMsg) { DBG_INFO(AQH_LOGDOMAIN, "BROKER ANNOUNCE_VALUE %s", AQH_Value_GetName(msgValue)); GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg); } AQH_Value_free(msgValue); } AQH_VALUE *_mkMessageValue(const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value) { AQH_VALUE *msgValue; msgValue=AQH_Value_new(); AQH_Value_SetDeviceName(msgValue, AQHMQTT_Device_GetId(device)); AQH_Value_SetName(msgValue, AQHMQTT_Value_GetName(value)); AQH_Value_SetValueUnits(msgValue, AQHMQTT_Value_GetValueUnits(value)); AQH_Value_SetValueType(msgValue, _mqttValueTypeTessageValueType(AQHMQTT_Value_GetValueType(value))); return msgValue; } int _mqttValueTypeTessageValueType(int t) { switch(t){ case AQHMQTT_ValueType_Sensor: return AQH_ValueType_Sensor; case AQHMQTT_ValueType_Actor: return AQH_ValueType_Actor; default: DBG_ERROR(AQH_LOGDOMAIN, "Invalid mqtt value type %d", t); return AQH_ValueType_Sensor; } } int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue) { if (rcvdTopic && *rcvdTopic) { if (aqh->availableDeviceList) { AQHMQTT_DEVICE *device; device=AQHMQTT_Device_List_First(aqh->availableDeviceList); while(device) { AQHMQTT_TOPIC_LIST *topicList; topicList=AQHMQTT_Device_GetTopicList(device); if (topicList) { AQHMQTT_TOPIC *topic; topic=_findMaskMatchingTopic(topicList, rcvdTopic, AQHMQTT_TopicDir_In); if (topic) { GWEN_BUFFER *buf; buf=_extractDeviceId(topic, rcvdTopic); if (buf) { AQHMQTT_DEVICE *newDevice; newDevice=AQHMQTT_Device_dup(device); AQHMQTT_Device_SetId(newDevice, GWEN_Buffer_GetStart(buf)); topic=_findMaskMatchingTopic(AQHMQTT_Device_GetTopicList(newDevice), rcvdTopic, AQHMQTT_TopicDir_In); AQHMQTT_Topic_SetTopic(topic, rcvdTopic); if (aqh->registeredDeviceList==NULL) aqh->registeredDeviceList=AQHMQTT_Device_List_new(); DBG_ERROR(NULL, "Registered device \"%s\" (%s)", AQHMQTT_Device_GetId(newDevice), AQHMQTT_Device_GetName(newDevice)); AQHMQTT_Device_List_Add(newDevice, aqh->registeredDeviceList); _announceDeviceToBroker(aqh, newDevice); GWEN_Buffer_free(buf); return 1; } } } device=AQHMQTT_Device_List_Next(device); } } DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic); } return 0; } AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir) { if (topicList) { AQHMQTT_TOPIC *topic; topic=AQHMQTT_Topic_List_First(topicList); while(topic) { if (AQHMQTT_Topic_GetDirection(topic)==dir) { const char *sMask; sMask=AQHMQTT_Topic_GetMask(topic); if (sMask && *sMask && GWEN_Text_ComparePattern(rcvdTopic, sMask, 0)!=-1) { /* found a matching topic */ return topic; } } topic=AQHMQTT_Topic_List_Next(topic); } } return NULL; } AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir) { if (topicList) { AQHMQTT_TOPIC *topic; topic=AQHMQTT_Topic_List_First(topicList); while(topic) { if (AQHMQTT_Topic_GetDirection(topic)==dir) { const char *sTopic; sTopic=AQHMQTT_Topic_GetTopic(topic); if (sTopic && *sTopic && strcasecmp(rcvdTopic, sTopic)==0) { return topic; } } topic=AQHMQTT_Topic_List_Next(topic); } } return NULL; } GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic) { const char *sBefore; const char *sAfter; sBefore=AQHMQTT_Topic_GetBeforeId(topic); sAfter=AQHMQTT_Topic_GetAfterId(topic); if (sBefore && *sBefore && sAfter && *sAfter) { GWEN_BUFFER *buf; int rv; buf=GWEN_Buffer_new(0, 256, 0, 1); GWEN_Buffer_AppendString(buf, rcvdTopic); rv=GWEN_Buffer_KeepTextBetweenStrings(buf, sBefore, sAfter, 1); if (rv<0) { DBG_INFO(NULL, "Could not extract id from [%s] (beforeId=%s, afterId=%s)", rcvdTopic, sBefore, sAfter); GWEN_Buffer_free(buf); return NULL; } return buf; } return NULL; }