407 lines
12 KiB
C
407 lines
12 KiB
C
/****************************************************************************
 * This file is part of the project AqHome.
 * AqHome (c) by 2023 Martin Preuss, all rights reserved.
 *
 * The license for this file can be found in the file COPYING which you
 * should have received along with this file.
 ****************************************************************************/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
# include <config.h>
|
|
#endif
|
|
|
|
#include "./loop_mqtt.h"
|
|
#include "./aqhome_mqtt_p.h"
|
|
#include "aqhome/mqtt/msg_mqtt_publish.h"
|
|
#include "aqhome/ipc/data/msg_data_multidata.h"
|
|
#include "aqhome/ipc/data/ipc_data.h"
|
|
|
|
#include <gwenhywfar/gwenhywfar.h>
|
|
#include <gwenhywfar/debug.h>
|
|
#include <gwenhywfar/endpoint.h>
|
|
#include <gwenhywfar/text.h>
|
|
#include <gwenhywfar/json_read.h>
|
|
#include <gwenhywfar/db.h>
|
|
|
|
|
|
//#define FULL_DEBUG
|
|
|
|
|
|
/* ------------------------------------------------------------------------------------------------
 * forward declarations
 * ------------------------------------------------------------------------------------------------
 */
|
|
|
|
static void _handleMqttMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg);
|
|
static void _handlePublishMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg);
|
|
static int _handlePublish(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *topic, const char *value);
|
|
static void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue);
|
|
static void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue);
|
|
static void _sendMessage(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_VALUE *value, const char *rcvdValue);
|
|
static int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue);
|
|
static AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir);
|
|
static AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir);
|
|
static GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic);
|
|
|
|
|
|
|
|
/* ------------------------------------------------------------------------------------------------
 * implementations
 * ------------------------------------------------------------------------------------------------
 */
|
|
|
|
/**
 * Drain the receive queue of the MQTT endpoint.
 *
 * Messages are taken one by one from aqh->mqttEndpoint, passed to _handleMqttMsg() and freed
 * afterwards. The loop stops as soon as GWEN_MsgEndpoint_TakeFirstReceivedMessage() returns NULL.
 *
 * @param aqh application object providing the MQTT endpoint
 */
void AqHomeMqttLog_ReadAndHandleMqttMessages(AQHOME_MQTT *aqh)
{
  GWEN_MSG_ENDPOINT *mqttEp=aqh->mqttEndpoint;

  for (;;) {
    GWEN_MSG *rcvdMsg;

    rcvdMsg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(mqttEp);
    if (rcvdMsg==NULL)
      break;

#ifdef FULL_DEBUG
    DBG_ERROR(NULL, "Received this message:");
    GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(rcvdMsg), GWEN_Msg_GetBytesInBuffer(rcvdMsg), 2);
#endif

    _handleMqttMsg(aqh, mqttEp, rcvdMsg);
    /* the message was taken out of the queue, so it is released here */
    GWEN_Msg_free(rcvdMsg);
  }
}
|
|
|
|
|
|
|
|
/**
 * Queue a PINGREQ message on the MQTT endpoint.
 *
 * The message is created without payload (length 0, payload NULL) and added to the send queue
 * of aqh->mqttEndpoint.
 *
 * @param aqh application object providing the MQTT endpoint
 * @return 0 if the message was queued, GWEN_ERROR_INTERNAL if it could not be created
 */
int AqHomeMqttLog_SendPing(AQHOME_MQTT *aqh)
{
  GWEN_MSG *pingMsg;

  DBG_INFO(AQH_LOGDOMAIN, "Sending PING");

  pingMsg=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL);
  if (pingMsg) {
    GWEN_MsgEndpoint_AddSendMessage(aqh->mqttEndpoint, pingMsg);
    return 0;
  }

  DBG_ERROR(NULL, "Error creating message");
  return GWEN_ERROR_INTERNAL;
}
|
|
|
|
|
|
|
|
/**
 * Dispatch a received MQTT message according to its type.
 *
 * Only the bits selected by the mask 0xf0 of the type-and-flags value are compared:
 * - PUBLISH messages are forwarded to _handlePublishMsg(),
 * - PINGRESP messages are merely logged,
 * - everything else is ignored (and dumped if FULL_DEBUG is defined).
 *
 * @param aqh application object
 * @param ep  endpoint the message was received on (passed through to the publish handler)
 * @param msg received message; not freed here
 */
void _handleMqttMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg)
{
  const int typeBits=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0;

  if (typeBits==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) {
    DBG_INFO(AQH_LOGDOMAIN, "PUBLISH message received");
#ifdef FULL_DEBUG
    {
      GWEN_BUFFER *dumpBuf;

      dumpBuf=GWEN_Buffer_new(0, 256, 0, 1);
      AQH_PublishMqttMsg_DumpToBuffer(msg, dumpBuf, "received");
      fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(dumpBuf));
      GWEN_Buffer_free(dumpBuf);
    }
#endif
    _handlePublishMsg(aqh, ep, msg);
    return;
  }

  if (typeBits==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) {
    DBG_INFO(AQH_LOGDOMAIN, "PING response received");
    return;
  }

  /* any other message type is silently dropped unless debugging is enabled */
#ifdef FULL_DEBUG
  DBG_ERROR(NULL, "Received this message:");
  GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2);
#endif
}
|
|
|
|
|
|
|
|
/**
 * Handle a PUBLISH message: extract topic and value and feed them to the registered devices.
 *
 * If no registered device handles the topic (_handlePublish() does not return 1), the function
 * tries to register a new device for that topic and, on success, handles the message again.
 * The strings obtained from the extract functions are released with free() before returning.
 *
 * @param aqh application object
 * @param ep  endpoint the message was received on
 * @param msg received PUBLISH message
 */
void _handlePublishMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg)
{
  char *rcvdTopic=AQH_PublishMqttMsg_ExtractTopic(msg);
  char *rcvdValue=AQH_PublishMqttMsg_ExtractValue(msg);

  if (rcvdTopic==NULL || rcvdValue==NULL) {
    DBG_ERROR(NULL, "Either topic or value missing in PUBLISH msg");
  }
  else if (_handlePublish(aqh, ep, rcvdTopic, rcvdValue)!=1) {
    DBG_INFO(NULL, "New topic \"%s\", trying to register", rcvdTopic);
    /* second attempt only after a device was actually registered for this topic */
    if (_registerNewDeviceForTopic(aqh, ep, rcvdTopic, rcvdValue)==1 &&
        _handlePublish(aqh, ep, rcvdTopic, rcvdValue)!=1) {
      DBG_ERROR(NULL, "Topic \"%s\" still not handled, SNH!", rcvdTopic);
    }
  }

  free(rcvdValue);
  free(rcvdTopic);
}
|
|
|
|
|
|
|
|
/**
 * Try to let one of the registered devices handle a received topic/value pair.
 *
 * For every device in aqh->registeredDeviceList the incoming topics (direction
 * AQHMQTT_TopicDir_In) are searched: first for a topic string equal to rcvdTopic, then for a
 * topic whose mask matches. In the latter case the received topic string is stored in the
 * matching topic object. The first match ends the search; its value is handled as JSON or as a
 * plain number depending on the topic type.
 *
 * NOTE(review): the mask fallback accepts any topic matching the mask of an already registered
 * device. Whether a mask can also match topics carrying a different device id cannot be seen
 * from this file -- confirm against the device definitions.
 *
 * @param aqh       application object holding the list of registered devices
 * @param ep        endpoint the message was received on (passed through)
 * @param rcvdTopic received topic; NULL or empty makes the function return 0 immediately
 * @param rcvdValue received payload
 * @return 1 if a registered device handled the topic, 0 otherwise
 */
int _handlePublish(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue)
{
  AQHMQTT_DEVICE *dev;

  if (!(rcvdTopic && *rcvdTopic))
    return 0;

  dev=aqh->registeredDeviceList?AQHMQTT_Device_List_First(aqh->registeredDeviceList):NULL;
  for (; dev; dev=AQHMQTT_Device_List_Next(dev)) {
    AQHMQTT_TOPIC_LIST *devTopics;
    AQHMQTT_TOPIC *matched;

    devTopics=AQHMQTT_Device_GetTopicList(dev);
    if (devTopics==NULL)
      continue;

    matched=_findTopicMatchingTopic(devTopics, rcvdTopic, AQHMQTT_TopicDir_In);
    if (matched==NULL) {
      matched=_findMaskMatchingTopic(devTopics, rcvdTopic, AQHMQTT_TopicDir_In);
      if (matched)
        AQHMQTT_Topic_SetTopic(matched, rcvdTopic); /* remember the concrete topic for next time */
    }
    if (matched==NULL)
      continue;

    if (AQHMQTT_Topic_GetTopicType(matched)==AQHMQTT_TopicType_Json)
      _handleJsonTopic(aqh, ep, dev, matched, rcvdValue);
    else
      _handleNumTopic(aqh, ep, dev, matched, rcvdValue);
    return 1;
  }

  DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic);
  return 0;
}
|
|
|
|
|
|
|
|
/**
 * Handle a topic whose payload is a single plain value.
 *
 * The received string is forwarded via _sendMessage() using the first entry of the topic's
 * value list. Nothing is sent if the topic has no value list or if that list is empty; in the
 * latter case AQHMQTT_Value_List_First() yields NULL, which must not be handed on because
 * _sendMessage() reads name and units from the value object.
 *
 * @param aqh       application object
 * @param ep        endpoint the message was received on (passed through)
 * @param device    registered device the topic belongs to
 * @param topic     matching topic of that device
 * @param rcvdValue received payload
 */
void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue)
{
  AQHMQTT_VALUE_LIST *valueList;
  AQHMQTT_VALUE *firstValue;

  valueList=AQHMQTT_Topic_GetValueList(topic);
  if (valueList==NULL) {
    DBG_INFO(NULL, "No value list in device \"%s\"", AQHMQTT_Device_GetId(device));
    return;
  }

  firstValue=AQHMQTT_Value_List_First(valueList);
  if (firstValue==NULL) {
    DBG_INFO(NULL, "Empty value list in device \"%s\"", AQHMQTT_Device_GetId(device));
    return;
  }

  _sendMessage(aqh, ep, device, firstValue, rcvdValue);
}
|
|
|
|
|
|
|
|
/**
 * Handle a topic whose payload is a JSON document.
 *
 * The payload is parsed once. For every entry of the topic's value list which has a non-empty
 * path, the JSON element at that path is looked up; if it exists and carries non-empty data,
 * that data is forwarded via _sendMessage(). The parsed JSON tree is freed before returning.
 *
 * @param aqh       application object
 * @param ep        endpoint the message was received on (passed through)
 * @param device    registered device the topic belongs to
 * @param topic     matching topic of that device
 * @param rcvdValue received payload, expected to be JSON text
 */
void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_TOPIC *topic, const char *rcvdValue)
{
  GWEN_JSON_ELEM *jsonRoot;
  AQHMQTT_VALUE_LIST *values;

  jsonRoot=GWEN_JsonElement_fromString(rcvdValue);
  if (jsonRoot==NULL) {
    DBG_INFO(NULL, "Could not parse JSON value: %s", rcvdValue);
    return;
  }

  values=AQHMQTT_Topic_GetValueList(topic);
  if (values) {
    AQHMQTT_VALUE *v;

    for (v=AQHMQTT_Value_List_First(values); v; v=AQHMQTT_Value_List_Next(v)) {
      const char *jsonPath;
      GWEN_JSON_ELEM *elem;
      const char *data;

      jsonPath=AQHMQTT_Value_GetPath(v);
      if (!(jsonPath && *jsonPath))
        continue; /* value without path cannot be located in the document */

      elem=GWEN_JsonElement_GetElementByPath(jsonRoot, jsonPath, 0);
      if (elem==NULL)
        continue;

      data=GWEN_JsonElement_GetData(elem);
      if (data && *data)
        _sendMessage(aqh, ep, device, v, data);
    }
  }

  GWEN_JsonElement_free(jsonRoot);
}
|
|
|
|
|
|
|
|
/**
 * Convert a received value string to a double and queue an UPDATEDATA message for the broker.
 *
 * The message carries a value descriptor (device id, value name and units taken from the given
 * device/value objects, value type 0) plus a two-element array: [0] is the current time as
 * returned by time(), [1] is the bit pattern of the parsed double.
 *
 * Nothing is sent if no value object is given, or if rcvdValue is NULL or cannot be converted.
 * The NULL check happens before the conversion so that the "<empty>" case of the error message
 * is reached without handing a NULL string to the converter.
 *
 * @param aqh       application object; the message is queued on aqh->brokerEndpoint
 * @param ep        endpoint the MQTT message was received on (unused here)
 * @param device    device whose id is used as device name of the value
 * @param value     value definition providing name and units
 * @param rcvdValue textual representation of the received number
 */
void _sendMessage(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_VALUE *value, const char *rcvdValue)
{
  union {double f; uint64_t i;} u;
  uint64_t arrayToSend[2];
  AQH_VALUE *msgValue;
  GWEN_MSG *pubMsg;

  if (value==NULL) {
    DBG_ERROR(NULL, "No value definition given, not sending");
    return;
  }

  if (rcvdValue==NULL || GWEN_Text_StringToDouble(rcvdValue, &(u.f))<0) {
    DBG_ERROR(NULL, "Invalid value received from MQTT server (%s)", rcvdValue?rcvdValue:"<empty>");
    return;
  }

  arrayToSend[0]=(uint64_t) time(NULL);
  arrayToSend[1]=u.i; /* the double is transported as its raw 64 bit pattern */

  msgValue=AQH_Value_new();
  AQH_Value_SetDeviceName(msgValue, AQHMQTT_Device_GetId(device));
  AQH_Value_SetName(msgValue, AQHMQTT_Value_GetName(value));
  AQH_Value_SetValueUnits(msgValue, AQHMQTT_Value_GetValueUnits(value));
  AQH_Value_SetValueType(msgValue, 0);

  /* NOTE(review): the last argument (1) is presumably the number of (timestamp, value) pairs
   * in arrayToSend -- confirm against AQH_MultiDataDataIpcMsg_new() */
  pubMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgValue, arrayToSend, 1);
  if (pubMsg) {
    DBG_INFO(AQH_LOGDOMAIN, "BROKER PUBLISH %s: %f", AQH_Value_GetName(msgValue), u.f);
    GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg);
  }
  AQH_Value_free(msgValue);
}
|
|
|
|
|
|
|
|
/**
 * Try to register a new device for a topic no registered device has handled.
 *
 * The devices in aqh->availableDeviceList serve as templates. The first template with an
 * incoming topic whose mask matches rcvdTopic, and from which a device id can be extracted, is
 * duplicated. The copy gets the extracted id, its matching topic object is bound to the
 * received topic string, and the copy is added to aqh->registeredDeviceList (created on demand).
 *
 * @param aqh       application object holding both device lists
 * @param ep        endpoint the message was received on (unused here)
 * @param rcvdTopic received topic; NULL or empty makes the function return 0 immediately
 * @param rcvdValue received payload (unused here)
 * @return 1 if a new device was registered, 0 otherwise
 */
int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue)
{
  AQHMQTT_DEVICE *tmpl;

  if (!(rcvdTopic && *rcvdTopic))
    return 0;

  tmpl=aqh->availableDeviceList?AQHMQTT_Device_List_First(aqh->availableDeviceList):NULL;
  for (; tmpl; tmpl=AQHMQTT_Device_List_Next(tmpl)) {
    AQHMQTT_TOPIC_LIST *tmplTopics;
    AQHMQTT_TOPIC *matched;
    GWEN_BUFFER *idBuf;
    AQHMQTT_DEVICE *newDevice;

    tmplTopics=AQHMQTT_Device_GetTopicList(tmpl);
    if (tmplTopics==NULL)
      continue;

    matched=_findMaskMatchingTopic(tmplTopics, rcvdTopic, AQHMQTT_TopicDir_In);
    if (matched==NULL)
      continue;

    idBuf=_extractDeviceId(matched, rcvdTopic);
    if (idBuf==NULL)
      continue;

    newDevice=AQHMQTT_Device_dup(tmpl);
    AQHMQTT_Device_SetId(newDevice, GWEN_Buffer_GetStart(idBuf));

    /* look up the topic again, this time inside the copy, and bind it to the received topic */
    matched=_findMaskMatchingTopic(AQHMQTT_Device_GetTopicList(newDevice), rcvdTopic, AQHMQTT_TopicDir_In);
    AQHMQTT_Topic_SetTopic(matched, rcvdTopic);

    if (aqh->registeredDeviceList==NULL)
      aqh->registeredDeviceList=AQHMQTT_Device_List_new();
    DBG_ERROR(NULL, "Registered device \"%s\" (%s)", AQHMQTT_Device_GetId(newDevice), AQHMQTT_Device_GetName(newDevice));
    AQHMQTT_Device_List_Add(newDevice, aqh->registeredDeviceList);

    GWEN_Buffer_free(idBuf);
    return 1;
  }

  DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", rcvdTopic);
  return 0;
}
|
|
|
|
|
|
|
|
/**
 * Find the first topic of the given direction whose mask matches a received topic.
 *
 * Topics without a mask (NULL or empty) are skipped. A mask matches if
 * GWEN_Text_ComparePattern(rcvdTopic, mask, 0) returns anything but -1.
 *
 * @param topicList list to search, may be NULL
 * @param rcvdTopic received topic string
 * @param dir       required topic direction
 * @return matching topic (still owned by the list) or NULL if there is none
 */
AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir)
{
  AQHMQTT_TOPIC *t;

  if (topicList==NULL)
    return NULL;

  for (t=AQHMQTT_Topic_List_First(topicList); t; t=AQHMQTT_Topic_List_Next(t)) {
    const char *mask;

    if (AQHMQTT_Topic_GetDirection(t)!=dir)
      continue;

    mask=AQHMQTT_Topic_GetMask(t);
    if (mask==NULL || *mask=='\0')
      continue;

    if (GWEN_Text_ComparePattern(rcvdTopic, mask, 0)!=-1)
      return t; /* found a matching topic */
  }

  return NULL;
}
|
|
|
|
|
|
|
|
/**
 * Find the first topic of the given direction whose stored topic string equals a received topic.
 *
 * Topics without a stored topic string (NULL or empty) are skipped. The comparison is done with
 * strcasecmp(), i.e. it ignores case.
 *
 * @param topicList list to search, may be NULL
 * @param rcvdTopic received topic string
 * @param dir       required topic direction
 * @return matching topic (still owned by the list) or NULL if there is none
 */
AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir)
{
  AQHMQTT_TOPIC *t;

  if (topicList==NULL)
    return NULL;

  for (t=AQHMQTT_Topic_List_First(topicList); t; t=AQHMQTT_Topic_List_Next(t)) {
    const char *stored;

    if (AQHMQTT_Topic_GetDirection(t)!=dir)
      continue;

    stored=AQHMQTT_Topic_GetTopic(t);
    if (stored==NULL || *stored=='\0')
      continue;

    if (strcasecmp(rcvdTopic, stored)==0)
      return t;
  }

  return NULL;
}
|
|
|
|
|
|
|
|
/**
 * Extract the device id from a received topic string.
 *
 * The topic object provides the strings expected before and after the id. Both must be
 * non-empty, otherwise no extraction is attempted. The received topic is copied into a new
 * buffer which is then reduced by GWEN_Buffer_KeepTextBetweenStrings() using those two strings.
 *
 * @param topic     topic definition providing the "before id" and "after id" strings
 * @param rcvdTopic received topic string
 * @return new buffer holding the result (caller must free it with GWEN_Buffer_free()), or NULL
 *         if the delimiters are missing or the extraction failed
 */
GWEN_BUFFER *_extractDeviceId(const AQHMQTT_TOPIC *topic, const char *rcvdTopic)
{
  const char *prefix=AQHMQTT_Topic_GetBeforeId(topic);
  const char *suffix=AQHMQTT_Topic_GetAfterId(topic);
  GWEN_BUFFER *idBuf;

  if (!(prefix && *prefix && suffix && *suffix))
    return NULL;

  idBuf=GWEN_Buffer_new(0, 256, 0, 1);
  GWEN_Buffer_AppendString(idBuf, rcvdTopic);
  if (GWEN_Buffer_KeepTextBetweenStrings(idBuf, prefix, suffix, 1)<0) {
    DBG_INFO(NULL, "Could not extract id from [%s] (beforeId=%s, afterId=%s)", rcvdTopic, prefix, suffix);
    GWEN_Buffer_free(idBuf);
    return NULL;
  }

  return idBuf;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|