Improved mqtt device detection and handling. Add command to announce new values.

This commit is contained in:
Martin Preuss
2024-02-13 23:49:56 +01:00
parent de1a975586
commit eeffe225ec
8 changed files with 253 additions and 16 deletions

View File

@@ -48,6 +48,7 @@
c_getlastdatapoint.h
c_setdata.h
c_addvalue.h
c_annvalue.h
c_moddevice.h
</headers>
@@ -66,6 +67,7 @@
c_getlastdatapoint.c
c_setdata.c
c_addvalue.c
c_annvalue.c
c_moddevice.c
main.c
</sources>

View File

@@ -0,0 +1,65 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2024 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./c_annvalue.h"
#include "./aqhome_data_p.h"
#include "./loop.h"
#include "aqhome/ipc/data/ipc_data.h"
#include "aqhome/ipc/endpoint_ipc.h"
#include "aqhome/ipc/data/msg_data_values.h"
#include "aqhome/ipc/msg_ipc_tag16.h"
#include <gwenhywfar/debug.h>
/* ------------------------------------------------------------------------------------------------
* defines
* ------------------------------------------------------------------------------------------------
*/
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
void AqHomeData_HandleAnnounceValue(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
{
AQH_VALUE *recvdValue;
AQH_ValuesDataIpcMsg_Parse(msg, 0);
recvdValue=AQH_ValuesDataIpcMsg_ReadFirstValue(msg);
if (recvdValue) {
AQH_VALUE *value;
value=AqHomeData_GetOrCreateValueForDriverWithTemplate(aqh, ep, recvdValue);
if (value==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "Could not create announced value");
}
AQH_Value_free(recvdValue);
}
}

View File

@@ -0,0 +1,25 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2024 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQHOME_DATA_C_ANNVALUE_H
#define AQHOME_DATA_C_ANNVALUE_H
#include "./aqhome_data.h"
void AqHomeData_HandleAnnounceValue(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg);
#endif

View File

@@ -20,6 +20,7 @@
#include "./c_getdevices.h"
#include "./c_setdata.h"
#include "./c_addvalue.h"
#include "./c_annvalue.h"
#include "./c_moddevice.h"
#include "./aqhome_data_p.h"
#include "aqhome/ipc/data/ipc_data.h"
@@ -233,6 +234,7 @@ void _handleIpcMsg(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
case AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ: AqHomeData_HandleGetLastDataPoint(aqh, ep, msg); break;
case AQH_MSGTYPE_IPC_DATA_SETDATA: AqHomeData_HandleSetData(aqh, ep, msg); break;
case AQH_MSGTYPE_IPC_DATA_ADDVALUE: AqHomeData_HandleAddValue(aqh, ep, msg); break;
case AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE: AqHomeData_HandleAnnounceValue(aqh, ep, msg); break;
case AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ: AqHomeData_HandleGetDevices(aqh, ep, msg); break;
case AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ: AqHomeData_HandleModDevice(aqh, ep, msg); break;
default: break;

View File

@@ -14,6 +14,7 @@
#include "./aqhome_mqtt_p.h"
#include "aqhome/mqtt/msg_mqtt_publish.h"
#include "aqhome/ipc/data/msg_data_multidata.h"
#include "aqhome/ipc/data/msg_data_values.h"
#include "aqhome/ipc/data/ipc_data.h"
#include <gwenhywfar/gwenhywfar.h>
@@ -37,7 +38,11 @@ static void _handlePublishMsg(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const GWE
static int _handlePublish(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *topic, const char *value);
static void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue);
static void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *dev, AQHMQTT_TOPIC *t, const char *rcvdValue);
static void _sendMessage(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_VALUE *value, const char *rcvdValue);
static void _sendMessage(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value, const char *rcvdValue);
static void _announceDeviceToBroker(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device);
static void _sendAnnounceValueMessage(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value);
static AQH_VALUE *_mkMessageValue(const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value);
static int _mqttValueTypeTessageValueType(int t);
static int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const char *rcvdTopic, const char *rcvdValue);
static AQHMQTT_TOPIC *_findMaskMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir);
static AQHMQTT_TOPIC *_findTopicMatchingTopic(AQHMQTT_TOPIC_LIST *topicList, const char *rcvdTopic, int dir);
@@ -202,7 +207,7 @@ void _handleNumTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *de
valueList=AQHMQTT_Topic_GetValueList(topic);
if (valueList)
_sendMessage(aqh, ep, device, AQHMQTT_Value_List_First(valueList), rcvdValue);
_sendMessage(aqh, device, AQHMQTT_Value_List_First(valueList), rcvdValue);
else {
DBG_INFO(NULL, "No value list in device \"%s\"", AQHMQTT_Device_GetId(device));
}
@@ -239,7 +244,7 @@ void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *d
s=GWEN_JsonElement_GetData(je);
if (s && *s)
_sendMessage(aqh, ep, device, value, s);
_sendMessage(aqh, device, value, s);
}
}
value=AQHMQTT_Value_List_Next(value);
@@ -251,7 +256,7 @@ void _handleJsonTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *d
void _sendMessage(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *device, AQHMQTT_VALUE *value, const char *rcvdValue)
void _sendMessage(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value, const char *rcvdValue)
{
int rv;
union {double f; uint64_t i;} u;
@@ -268,18 +273,87 @@ void _sendMessage(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, AQHMQTT_DEVICE *devic
arrayToSend[0]=(uint64_t) time(NULL);
arrayToSend[1]=u.i;
msgValue=_mkMessageValue(device, value);
pubMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgValue, arrayToSend, 1);
if (pubMsg) {
DBG_INFO(AQH_LOGDOMAIN, "BROKER UPDATE_DATA %s: %f", AQH_Value_GetName(msgValue), u.f);
GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg);
}
AQH_Value_free(msgValue);
}
}
void _announceDeviceToBroker(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device)
{
AQHMQTT_TOPIC_LIST *topicList;
topicList=AQHMQTT_Device_GetTopicList(device);
if (topicList) {
AQHMQTT_TOPIC *topic;
topic=AQHMQTT_Topic_List_First(topicList);
while(topic) {
AQHMQTT_VALUE_LIST *valueList;
valueList=AQHMQTT_Topic_GetValueList(topic);
if (valueList) {
const AQHMQTT_VALUE *value;
value=AQHMQTT_Value_List_First(valueList);
while(value) {
_sendAnnounceValueMessage(aqh, device, value);
value=AQHMQTT_Value_List_Next(value);
}
}
topic=AQHMQTT_Topic_List_Next(topic);
}
}
}
void _sendAnnounceValueMessage(AQHOME_MQTT *aqh, const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value)
{
GWEN_MSG *pubMsg;
AQH_VALUE *msgValue;
msgValue=_mkMessageValue(device, value);
pubMsg=AQH_ValuesDataIpcMsg_newForOneValue(AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE, 0, msgValue);
if (pubMsg) {
DBG_INFO(AQH_LOGDOMAIN, "BROKER ANNOUNCE_VALUE %s", AQH_Value_GetName(msgValue));
GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg);
}
AQH_Value_free(msgValue);
}
AQH_VALUE *_mkMessageValue(const AQHMQTT_DEVICE *device, const AQHMQTT_VALUE *value)
{
AQH_VALUE *msgValue;
msgValue=AQH_Value_new();
AQH_Value_SetDeviceName(msgValue, AQHMQTT_Device_GetId(device));
AQH_Value_SetName(msgValue, AQHMQTT_Value_GetName(value));
AQH_Value_SetValueUnits(msgValue, AQHMQTT_Value_GetValueUnits(value));
AQH_Value_SetValueType(msgValue, 0);
AQH_Value_SetValueType(msgValue, _mqttValueTypeTessageValueType(AQHMQTT_Value_GetValueType(value)));
return msgValue;
}
pubMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgValue, arrayToSend, 1);
if (pubMsg) {
DBG_INFO(AQH_LOGDOMAIN, "BROKER PUBLISH %s: %f", AQH_Value_GetName(msgValue), u.f);
GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg);
}
AQH_Value_free(msgValue);
int _mqttValueTypeTessageValueType(int t)
{
switch(t){
case AQHMQTT_ValueType_Sensor: return AQH_ValueType_Sensor;
case AQHMQTT_ValueType_Actor: return AQH_ValueType_Actor;
default:
DBG_ERROR(AQH_LOGDOMAIN, "Invalid mqtt value type %d", t);
return AQH_ValueType_Sensor;
}
}
@@ -315,7 +389,7 @@ int _registerNewDeviceForTopic(AQHOME_MQTT *aqh, GWEN_MSG_ENDPOINT *ep, const ch
aqh->registeredDeviceList=AQHMQTT_Device_List_new();
DBG_ERROR(NULL, "Registered device \"%s\" (%s)", AQHMQTT_Device_GetId(newDevice), AQHMQTT_Device_GetName(newDevice));
AQHMQTT_Device_List_Add(newDevice, aqh->registeredDeviceList);
/* TODO: add devices and values to data server */
_announceDeviceToBroker(aqh, newDevice);
GWEN_Buffer_free(buf);
return 1;
}

View File

@@ -14,6 +14,7 @@
#include "./aqhome_mqtt_p.h"
#include "aqhome-mqttlog/types/topic.h"
#include "aqhome-mqttlog/types/value.h"
#include "aqhome-mqttlog/types/translation.h"
#include <aqhome/api.h>
#include <aqhome/aqhome.h>
@@ -46,6 +47,8 @@ static AQHMQTT_TOPIC_LIST *_readXmlTopicList(AQHOME_MQTT *aqh, GWEN_XMLNODE *par
static AQHMQTT_TOPIC *_readXmlTopic(AQHOME_MQTT *aqh, GWEN_XMLNODE *topicNode);
static AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode);
static AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode);
static AQHMQTT_TRANSLATION_LIST *_readXmlTranslationList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode);
static AQHMQTT_TRANSLATION *_readXmlTranslation(AQHOME_MQTT *aqh, GWEN_XMLNODE *translationNode);
@@ -331,6 +334,7 @@ AQHMQTT_VALUE_LIST *_readXmlValueList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode
AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode)
{
AQHMQTT_VALUE *value;
GWEN_XMLNODE *translationNode;
const char *s;
int i;
@@ -348,9 +352,74 @@ AQHMQTT_VALUE *_readXmlValue(AQHOME_MQTT *aqh, GWEN_XMLNODE *valueNode)
}
AQHMQTT_Value_SetValueType(value, i);
/* TODO: read translationList */
translationNode=GWEN_XMLNode_FindFirstTag(valueNode, "translations", NULL, NULL);
if (translationNode) {
AQHMQTT_TRANSLATION_LIST *translationList;
translationList=_readXmlTranslationList(aqh, translationNode);
if (translationList) {
DBG_INFO(NULL, "Translations read");
AQHMQTT_Value_SetTranslationList(value, translationList);
}
}
else {
DBG_INFO(NULL, "No <translations> element");
}
return value;
}
AQHMQTT_TRANSLATION_LIST *_readXmlTranslationList(AQHOME_MQTT *aqh, GWEN_XMLNODE *parentNode)
{
AQHMQTT_TRANSLATION_LIST *translationList;
GWEN_XMLNODE *translationNode;
translationList=AQHMQTT_Value_List_new();
translationNode=GWEN_XMLNode_FindFirstTag(parentNode, "translation", NULL, NULL);
while(translationNode) {
AQHMQTT_TRANSLATION *translation=_readXmlTranslation(aqh, translationNode);
if (translation)
AQHMQTT_Translation_List_Add(translation, translationList);
else {
DBG_INFO(NULL, "Error reading <translation> element");
AQHMQTT_Translation_List_free(translationList);
return NULL;
}
translationNode=GWEN_XMLNode_FindNextTag(translationNode, "translation", NULL, NULL);
}
if (AQHMQTT_Translation_List_GetCount(translationList)<1) {
AQHMQTT_Translation_List_free(translationList);
return NULL;
}
return translationList;
}
AQHMQTT_TRANSLATION *_readXmlTranslation(AQHOME_MQTT *aqh, GWEN_XMLNODE *translationNode)
{
const char *sAqhValue;
const char *sDriverValue;
sAqhValue=GWEN_XMLNode_GetProperty(translationNode, "aqhValue", NULL);
sDriverValue=GWEN_XMLNode_GetProperty(translationNode, "driverValue", NULL);
if (sAqhValue && *sAqhValue && sDriverValue && *sDriverValue) {
AQHMQTT_TRANSLATION *translation;
translation=AQHMQTT_Translation_new();
AQHMQTT_Translation_SetAqhValue(translation, sAqhValue);
AQHMQTT_Translation_SetDriverValue(translation, sDriverValue);
return translation;
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Either AqhValue or DriverValue missing in device description file");
return NULL;
}
}

View File

@@ -49,7 +49,7 @@
#define AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ 0xd00 /* AQH_DevicesDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE 0xe00 /* AQH_ValuesDataIpcMsg */

View File

@@ -27,8 +27,8 @@
<values>
<value name="power" type="actor">
<translations>
<translation aqh_value="0.0" dev_value="off"/>
<translation aqh_value="1.0" dev_value="on"/>
<translation aqhValue="0.0" driverValue="off"/>
<translation aqhValue="1.0" driverValue="on"/>
</translations>
</value>
</values>