aqhome apps: more work on transition to events2.

This commit is contained in:
Martin Preuss
2025-03-09 00:06:12 +01:00
parent ca2103f7b3
commit ea564ba101
32 changed files with 475 additions and 268 deletions

View File

@@ -146,7 +146,7 @@ void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop)
while(!stopService) { while(!stopService) {
time_t now; time_t now;
DBG_ERROR(NULL, "Next loop (%d clients)", AqHomeDataServer_GetClientNum(aqh)); DBG_INFO(NULL, "Next loop (%d clients)", AqHomeDataServer_GetClientNum(aqh));
AQH_EventLoop_Run(eventLoop, 2000); AQH_EventLoop_Run(eventLoop, 2000);
AqHomeDataServer_HandleClientMsgs(aqh); AqHomeDataServer_HandleClientMsgs(aqh);

View File

@@ -29,30 +29,24 @@
* ------------------------------------------------------------------------------------------------ * ------------------------------------------------------------------------------------------------
*/ */
void AqHomeDataServer_HandleAddValue(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) void AqHomeDataServer_HandleAddValue(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList)
{ {
AQHOME_SERVER *xo; AQHOME_SERVER *xo;
xo=AqHomeDataServer_GetServerData(o); xo=AqHomeDataServer_GetServerData(o);
if (xo) { if (xo) {
AQH_MESSAGE *outMsg; AQH_MESSAGE *outMsg;
GWEN_TAG16_LIST *tagList;
int resultCode=AQH_MSGDATA_RESULT_SUCCESS; int resultCode=AQH_MSGDATA_RESULT_SUCCESS;
AQH_VALUE *recvdValue;
tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0); recvdValue=AQH_IpcdMessageValues_ReadFirstValue(tagList);
if (tagList) { if (recvdValue) {
AQH_VALUE *recvdValue; AQH_VALUE *value;
recvdValue=AQH_IpcdMessageValues_ReadFirstValue(tagList); value=AqHomeDataServer_GetOrCreateValueForDriverWithTemplate(o, ep, recvdValue);
if (recvdValue) { if (value==NULL)
AQH_VALUE *value; resultCode=AQH_MSGDATA_RESULT_ERROR_PERMS;
AQH_Value_free(recvdValue);
value=AqHomeDataServer_GetOrCreateValueForDriverWithTemplate(o, ep, recvdValue);
if (value==NULL)
resultCode=AQH_MSGDATA_RESULT_ERROR_PERMS;
AQH_Value_free(recvdValue);
}
GWEN_Tag16_List_free(tagList);
} }
else else
resultCode=AQH_MSGDATA_RESULT_ERROR_BADDATA; resultCode=AQH_MSGDATA_RESULT_ERROR_BADDATA;

View File

@@ -12,8 +12,10 @@
#include "./server.h" #include "./server.h"
#include <gwenhywfar/tag16.h>
void AqHomeDataServer_HandleAddValue(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
void AqHomeDataServer_HandleAddValue(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList);

View File

@@ -29,24 +29,18 @@
* ------------------------------------------------------------------------------------------------ * ------------------------------------------------------------------------------------------------
*/ */
void AqHomeDataServer_HandleAnnounceValue(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) void AqHomeDataServer_HandleAnnounceValue(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList)
{ {
AQHOME_SERVER *xo; AQHOME_SERVER *xo;
xo=AqHomeDataServer_GetServerData(o); xo=AqHomeDataServer_GetServerData(o);
if (xo) { if (xo) {
GWEN_TAG16_LIST *tagList; AQH_VALUE *recvdValue;
tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0); recvdValue=AQH_IpcdMessageValues_ReadFirstValue(tagList);
if (tagList) { if (recvdValue) {
AQH_VALUE *recvdValue; AqHomeDataServer_GetOrCreateValueForDriverWithTemplate(o, ep, recvdValue);
AQH_Value_free(recvdValue);
recvdValue=AQH_IpcdMessageValues_ReadFirstValue(tagList);
if (recvdValue) {
AqHomeDataServer_GetOrCreateValueForDriverWithTemplate(o, ep, recvdValue);
AQH_Value_free(recvdValue);
}
GWEN_Tag16_List_free(tagList);
} }
} }
} }

View File

@@ -12,8 +12,10 @@
#include "./server.h" #include "./server.h"
#include <gwenhywfar/tag16.h>
void AqHomeDataServer_HandleAnnounceValue(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
void AqHomeDataServer_HandleAnnounceValue(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList);

View File

@@ -29,9 +29,8 @@
* ------------------------------------------------------------------------------------------------ * ------------------------------------------------------------------------------------------------
*/ */
void AqHomeDataServer_HandleConnect(GWEN_UNUSED AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) void AqHomeDataServer_HandleConnect(GWEN_UNUSED AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList)
{ {
GWEN_TAG16_LIST *tagList;
AQH_MESSAGE *outMsg; AQH_MESSAGE *outMsg;
int resultCode=AQH_MSGDATA_RESULT_SUCCESS; int resultCode=AQH_MSGDATA_RESULT_SUCCESS;
char *clientId=NULL; char *clientId=NULL;
@@ -39,7 +38,6 @@ void AqHomeDataServer_HandleConnect(GWEN_UNUSED AQH_OBJECT *o, AQH_OBJECT *ep, c
char *passw=NULL; char *passw=NULL;
uint32_t flags; uint32_t flags;
tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0);
clientId=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSG_CONNECT_TAGS_CLIENTID, NULL); clientId=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSG_CONNECT_TAGS_CLIENTID, NULL);
userId=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSG_CONNECT_TAGS_USERID, NULL); userId=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSG_CONNECT_TAGS_USERID, NULL);
flags=AQH_Tag16_GetTagDataAsUint32(tagList, AQH_MSG_CONNECT_TAGS_FLAGS, 0); flags=AQH_Tag16_GetTagDataAsUint32(tagList, AQH_MSG_CONNECT_TAGS_FLAGS, 0);

View File

@@ -12,8 +12,10 @@
#include "./server.h" #include "./server.h"
#include <gwenhywfar/tag16.h>
void AqHomeDataServer_HandleConnect(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
void AqHomeDataServer_HandleConnect(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList);

View File

@@ -60,7 +60,7 @@ static void _getAndSendLastDatapoint(AQH_STORAGE *storage, AQH_OBJECT *ep, const
*/ */
void AqHomeDataServer_HandleGetDataPoints(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg) void AqHomeDataServer_HandleGetDataPoints(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg, const GWEN_TAG16_LIST *tagList)
{ {
AQHOME_SERVER *xo; AQHOME_SERVER *xo;
@@ -70,44 +70,34 @@ void AqHomeDataServer_HandleGetDataPoints(AQH_OBJECT *o, AQH_OBJECT *ep, const A
refMsgId=AQH_IpcMessage_GetMsgId(recvdMsg); refMsgId=AQH_IpcMessage_GetMsgId(recvdMsg);
if (AQH_Endpoint_GetPermissions(ep) & AQH_ENDPOINT_PERMS_READDATA) { if (AQH_Endpoint_GetPermissions(ep) & AQH_ENDPOINT_PERMS_READDATA) {
GWEN_TAG16_LIST *tagList; char *valueName;
tagList=AQH_IpcMessageTag16_ParsePayload(recvdMsg, 0); valueName=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSGDATA_GETDATA_TAGS_NAME, NULL);
if (tagList) { if (valueName && *valueName) {
char *valueName; AQH_VALUE *value;
uint64_t tsBegin;
uint64_t tsEnd;
uint64_t numRequested;
valueName=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSGDATA_GETDATA_TAGS_NAME, NULL); tsBegin=AQH_Tag16_GetTagDataAsUint64(tagList, AQH_MSGDATA_GETDATA_TAGS_BEGIN, 0);
if (valueName && *valueName) { tsEnd=AQH_Tag16_GetTagDataAsUint64(tagList, AQH_MSGDATA_GETDATA_TAGS_END, 0);
AQH_VALUE *value; numRequested=AQH_Tag16_GetTagDataAsUint64(tagList, AQH_MSGDATA_GETDATA_TAGS_NUM, 0);
uint64_t tsBegin;
uint64_t tsEnd;
uint64_t numRequested;
tsBegin=AQH_Tag16_GetTagDataAsUint64(tagList, AQH_MSGDATA_GETDATA_TAGS_BEGIN, 0); value=AQH_Storage_GetValueByNameForSystem(xo->storage, valueName);
tsEnd=AQH_Tag16_GetTagDataAsUint64(tagList, AQH_MSGDATA_GETDATA_TAGS_END, 0); if (value) {
numRequested=AQH_Tag16_GetTagDataAsUint64(tagList, AQH_MSGDATA_GETDATA_TAGS_NUM, 0); int resultCode;
value=AQH_Storage_GetValueByNameForSystem(xo->storage, valueName); resultCode=_getAndSendDataPoints(xo->storage, ep, value, tsBegin, tsEnd, numRequested, refMsgId);
if (value) { AqHomeDataServer_SendResponseResultToEndpoint(ep, refMsgId, resultCode);
int resultCode;
resultCode=_getAndSendDataPoints(xo->storage, ep, value, tsBegin, tsEnd, numRequested, refMsgId);
AqHomeDataServer_SendResponseResultToEndpoint(ep, refMsgId, resultCode);
}
else {
DBG_INFO(NULL, "Value \"%s\" does not exist", valueName);
AqHomeDataServer_SendResponseResultToEndpoint(ep, refMsgId, AQH_MSGDATA_RESULT_ERROR_NOTFOUND);
}
free(valueName);
} }
else { else {
DBG_INFO(NULL, "Missing value name"); DBG_INFO(NULL, "Value \"%s\" does not exist", valueName);
AqHomeDataServer_SendResponseResultToEndpoint(ep, refMsgId, AQH_MSGDATA_RESULT_ERROR_BADDATA); AqHomeDataServer_SendResponseResultToEndpoint(ep, refMsgId, AQH_MSGDATA_RESULT_ERROR_NOTFOUND);
} }
GWEN_Tag16_List_free(tagList); free(valueName);
} }
else { else {
DBG_ERROR(AQH_LOGDOMAIN, "No value"); DBG_INFO(NULL, "Missing value name");
AqHomeDataServer_SendResponseResultToEndpoint(ep, refMsgId, AQH_MSGDATA_RESULT_ERROR_BADDATA); AqHomeDataServer_SendResponseResultToEndpoint(ep, refMsgId, AQH_MSGDATA_RESULT_ERROR_BADDATA);
} }
} }

View File

@@ -12,8 +12,10 @@
#include "./server.h" #include "./server.h"
#include <gwenhywfar/tag16.h>
void AqHomeDataServer_HandleGetDataPoints(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg);
void AqHomeDataServer_HandleGetDataPoints(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg, const GWEN_TAG16_LIST *tagList);

View File

@@ -47,7 +47,7 @@ static void _sendDeviceList(AQH_OBJECT *ep, const AQH_DEVICE_LIST *vl, uint32_t
* ------------------------------------------------------------------------------------------------ * ------------------------------------------------------------------------------------------------
*/ */
void AqHomeDataServer_HandleGetDevices(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) void AqHomeDataServer_HandleGetDevices(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList)
{ {
AQHOME_SERVER *xo; AQHOME_SERVER *xo;

View File

@@ -12,8 +12,10 @@
#include "./server.h" #include "./server.h"
#include <gwenhywfar/tag16.h>
void AqHomeDataServer_HandleGetDevices(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
void AqHomeDataServer_HandleGetDevices(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList);

View File

@@ -47,7 +47,7 @@ static void _sendValueList(AQH_OBJECT *ep, const AQH_VALUE_LIST *vl, uint32_t fl
* ------------------------------------------------------------------------------------------------ * ------------------------------------------------------------------------------------------------
*/ */
void AqHomeDataServer_HandleGetValues(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) void AqHomeDataServer_HandleGetValues(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList)
{ {
AQHOME_SERVER *xo; AQHOME_SERVER *xo;

View File

@@ -12,8 +12,10 @@
#include "./server.h" #include "./server.h"
#include <gwenhywfar/tag16.h>
void AqHomeDataServer_HandleGetValues(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
void AqHomeDataServer_HandleGetValues(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList);

View File

@@ -34,7 +34,7 @@
*/ */
void AqHomeDataServer_HandleModDevice(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg) void AqHomeDataServer_HandleModDevice(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg, const GWEN_TAG16_LIST *tagList)
{ {
AQHOME_SERVER *xo; AQHOME_SERVER *xo;
@@ -43,61 +43,52 @@ void AqHomeDataServer_HandleModDevice(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_M
int resultCode=AQH_MSGDATA_RESULT_SUCCESS; int resultCode=AQH_MSGDATA_RESULT_SUCCESS;
if (AQH_Endpoint_GetPermissions(ep) & AQH_ENDPOINT_PERMS_MODDEVICE) { if (AQH_Endpoint_GetPermissions(ep) & AQH_ENDPOINT_PERMS_MODDEVICE) {
GWEN_TAG16_LIST *tagList; AQH_DEVICE *device;
tagList=AQH_IpcMessageTag16_ParsePayload(recvdMsg, 0); device=AQH_IpcdMessageDevices_ReadFirstDevice(tagList);
if (tagList) { if (device) {
AQH_DEVICE *device; const char *deviceNameForSystem;
device=AQH_IpcdMessageDevices_ReadFirstDevice(tagList); deviceNameForSystem=AQH_Device_GetNameForSystem(device);
if (device) { if (deviceNameForSystem && *deviceNameForSystem) {
const char *deviceNameForSystem; AQH_DEVICE *storedDevice;
deviceNameForSystem=AQH_Device_GetNameForSystem(device); storedDevice=AQH_Storage_GetDeviceByNameForSystem(xo->storage, deviceNameForSystem);
if (deviceNameForSystem && *deviceNameForSystem) { if (storedDevice) {
AQH_DEVICE *storedDevice; const char *s;
storedDevice=AQH_Storage_GetDeviceByNameForSystem(xo->storage, deviceNameForSystem); s=AQH_Device_GetNameForGui(device);
if (storedDevice) { if (s && *s)
const char *s; AQH_Device_SetNameForGui(storedDevice, s);
s=AQH_Device_GetNameForGui(device); s=AQH_Device_GetRoomName(device);
if (s && *s) if (s && *s)
AQH_Device_SetNameForGui(storedDevice, s); AQH_Device_SetRoomName(storedDevice, s);
s=AQH_Device_GetRoomName(device); s=AQH_Device_GetLocation(device);
if (s && *s) if (s && *s)
AQH_Device_SetRoomName(storedDevice, s); AQH_Device_SetLocation(storedDevice, s);
s=AQH_Device_GetLocation(device); s=AQH_Device_GetDescription(device);
if (s && *s) if (s && *s)
AQH_Device_SetLocation(storedDevice, s); AQH_Device_SetDescription(storedDevice, s);
s=AQH_Device_GetDescription(device); AQH_Storage_AddRuntimeFlags(xo->storage, AQH_STORAGE_RTFLAGS_MODIFIED);
if (s && *s) resultCode=AQH_MSGDATA_RESULT_SUCCESS;
AQH_Device_SetDescription(storedDevice, s);
AQH_Storage_AddRuntimeFlags(xo->storage, AQH_STORAGE_RTFLAGS_MODIFIED);
resultCode=AQH_MSGDATA_RESULT_SUCCESS;
}
else {
DBG_INFO(NULL, "Device \"%s\" not found", deviceNameForSystem);
resultCode=AQH_MSGDATA_RESULT_ERROR_NOTFOUND;
}
} }
else { else {
DBG_INFO(NULL, "No name for value"); DBG_INFO(NULL, "Device \"%s\" not found", deviceNameForSystem);
resultCode=AQH_MSGDATA_RESULT_ERROR_NOTFOUND; resultCode=AQH_MSGDATA_RESULT_ERROR_NOTFOUND;
} }
} }
else { else {
DBG_INFO(NULL, "No device info in message"); DBG_INFO(NULL, "No name for value");
resultCode=AQH_MSGDATA_RESULT_ERROR_INVALID; resultCode=AQH_MSGDATA_RESULT_ERROR_NOTFOUND;
} }
} }
else { else {
DBG_INFO(NULL, "No tag16 list in message"); DBG_INFO(NULL, "No device info in message");
resultCode=AQH_MSGDATA_RESULT_ERROR_BADDATA; resultCode=AQH_MSGDATA_RESULT_ERROR_INVALID;
} }
} }
else { else {

View File

@@ -12,8 +12,10 @@
#include "./server.h" #include "./server.h"
#include <gwenhywfar/tag16.h>
void AqHomeDataServer_HandleModDevice(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg);
void AqHomeDataServer_HandleModDevice(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg, const GWEN_TAG16_LIST *tagList);

View File

@@ -60,75 +60,69 @@ static void _subRqAbort(AQH_MSG_REQUEST *rq, int reason);
* ------------------------------------------------------------------------------------------------ * ------------------------------------------------------------------------------------------------
*/ */
void AqHomeDataServer_HandleSetData(AQH_OBJECT *o, AQH_OBJECT *epSrc, const AQH_MESSAGE *recvdMsg) void AqHomeDataServer_HandleSetData(AQH_OBJECT *o, AQH_OBJECT *epSrc, const AQH_MESSAGE *recvdMsg, const GWEN_TAG16_LIST *tagList)
{ {
AQHOME_SERVER *xo; AQHOME_SERVER *xo;
xo=AqHomeDataServer_GetServerData(o); xo=AqHomeDataServer_GetServerData(o);
if (xo) { if (xo) {
uint32_t msgId; uint32_t msgId;
GWEN_TAG16_LIST *tagList; AQH_VALUE *recvdValue;
msgId=AQH_IpcMessage_GetMsgId(recvdMsg); msgId=AQH_IpcMessage_GetMsgId(recvdMsg);
DBG_INFO(NULL, "Received IPC SetDataRequest message (msgId=%d)", msgId); DBG_INFO(NULL, "Received IPC SetDataRequest message (msgId=%d)", msgId);
tagList=AQH_IpcMessageTag16_ParsePayload(recvdMsg, 0); recvdValue=AQH_IpcdMessageSetData_ReadValue(tagList);
if (tagList) { if (recvdValue) {
AQH_VALUE *recvdValue; const char *valueName;
char *valueDataFreeable;
AQH_VALUE *systemValue;
recvdValue=AQH_IpcdMessageSetData_ReadValue(tagList); valueName=AQH_Value_GetNameForSystem(recvdValue);
if (recvdValue) { valueDataFreeable=AQH_IpcdMessageSetData_ReadData(tagList);
const char *valueName;
char *valueDataFreeable;
AQH_VALUE *systemValue;
valueName=AQH_Value_GetNameForSystem(recvdValue); systemValue=AQH_Storage_GetValueByNameForSystem(xo->storage, valueName);
valueDataFreeable=AQH_IpcdMessageSetData_ReadData(tagList); if (systemValue) {
if (AQH_Value_GetValueType(systemValue)==AQH_ValueType_Actor) {
const char *driverName;
systemValue=AQH_Storage_GetValueByNameForSystem(xo->storage, valueName); driverName=AQH_Value_GetDriver(systemValue);
if (systemValue) { if (driverName && *driverName) {
if (AQH_Value_GetValueType(systemValue)==AQH_ValueType_Actor) { AQH_OBJECT *epDriver;
const char *driverName;
driverName=AQH_Value_GetDriver(systemValue); epDriver=AqHomeDataServer_GetIpcEndpointByServiceName(o, driverName);
if (driverName && *driverName) { if (epDriver) {
AQH_OBJECT *epDriver; AQH_MSG_REQUEST *rq;
epDriver=AqHomeDataServer_GetIpcEndpointByServiceName(o, driverName); DBG_ERROR(NULL, "Creating SETDATA request for driver endpoint (%s)", AQH_Endpoint_GetServiceName(epDriver));
if (epDriver) { rq=_mkRequest_SetData(o, epSrc, msgId, epDriver, systemValue, valueDataFreeable);
AQH_MSG_REQUEST *rq; AqHomeDataServer_AddRequestToTree(o, rq);
DBG_ERROR(NULL, "Creating SETDATA request for driver endpoint (%s)", AQH_Endpoint_GetServiceName(epDriver));
rq=_mkRequest_SetData(o, epSrc, msgId, epDriver, systemValue, valueDataFreeable);
AqHomeDataServer_AddRequestToTree(o, rq);
}
else {
DBG_ERROR(NULL, "Driver \"%s\" not available", driverName);
AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_GENERIC);
}
} }
else { else {
DBG_ERROR(NULL, "No driver name"); DBG_ERROR(NULL, "Driver \"%s\" not available", driverName);
AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_GENERIC); AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_GENERIC);
} }
} /* if actor */
else {
DBG_ERROR(NULL, "Value \"%s\" is not an actor", valueName);
AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_INVALID);
} }
} else {
DBG_ERROR(NULL, "No driver name");
AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_GENERIC);
}
} /* if actor */
else { else {
DBG_ERROR(NULL, "Unknown value \"%s\"", valueName); DBG_ERROR(NULL, "Value \"%s\" is not an actor", valueName);
AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_NOTFOUND); AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_INVALID);
} }
AQH_Value_free(recvdValue);
free(valueDataFreeable);
} /* if recvdValue */
else {
DBG_ERROR(NULL, "No value in message");
AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_BADDATA);
} }
GWEN_Tag16_List_free(tagList); else {
DBG_ERROR(NULL, "Unknown value \"%s\"", valueName);
AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_NOTFOUND);
}
AQH_Value_free(recvdValue);
free(valueDataFreeable);
} /* if recvdValue */
else {
DBG_ERROR(NULL, "No value in message");
AqHomeDataServer_SendResponseResultToEndpoint(epSrc, msgId, AQH_MSGDATA_RESULT_ERROR_BADDATA);
} }
} }
} }

View File

@@ -12,8 +12,10 @@
#include "./server.h" #include "./server.h"
#include <gwenhywfar/tag16.h>
void AqHomeDataServer_HandleSetData(AQH_OBJECT *o, AQH_OBJECT *epSrc, const AQH_MESSAGE *recvdMsg);
void AqHomeDataServer_HandleSetData(AQH_OBJECT *o, AQH_OBJECT *epSrc, const AQH_MESSAGE *recvdMsg, const GWEN_TAG16_LIST *tagList);

View File

@@ -51,59 +51,53 @@ static void _sendDataChangedMsgToAllClients(AQHOME_SERVER *xo, AQH_OBJECT *epSrc
* ------------------------------------------------------------------------------------------------ * ------------------------------------------------------------------------------------------------
*/ */
void AqHomeDataServer_HandleUpdateData(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) void AqHomeDataServer_HandleUpdateData(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList)
{ {
AQHOME_SERVER *xo; AQHOME_SERVER *xo;
xo=AqHomeDataServer_GetServerData(o); xo=AqHomeDataServer_GetServerData(o);
if (xo) { if (xo) {
AQH_MESSAGE *outMsg; AQH_MESSAGE *outMsg;
GWEN_TAG16_LIST *tagList;
int resultCode=AQH_MSGDATA_RESULT_SUCCESS; int resultCode=AQH_MSGDATA_RESULT_SUCCESS;
AQH_VALUE *recvdValue;
tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0); recvdValue=AQH_IpcdMessageMultiData_ReadValue(tagList);
if (tagList) { if (recvdValue) {
AQH_VALUE *recvdValue; const char *valueName;
const uint64_t *dataPoints=NULL;
uint64_t numberOfPoints=0;
recvdValue=AQH_IpcdMessageMultiData_ReadValue(tagList); valueName=recvdValue?AQH_Value_GetName(recvdValue):NULL;
if (recvdValue) { AQH_IpcdMessageMultiData_ReadDatapoints(tagList, &dataPoints, &numberOfPoints);
const char *valueName; if (numberOfPoints>0) {
const uint64_t *dataPoints=NULL; AQH_VALUE *value;
uint64_t numberOfPoints=0;
valueName=recvdValue?AQH_Value_GetName(recvdValue):NULL; value=AqHomeDataServer_GetOrCreateValueForDriverWithTemplate(o, ep, recvdValue);
AQH_IpcdMessageMultiData_ReadDatapoints(tagList, &dataPoints, &numberOfPoints); if (value) {
if (numberOfPoints>0) { if (AQH_Endpoint_GetPermissions(ep) & AQH_ENDPOINT_PERMS_ADDDATA) {
AQH_VALUE *value; resultCode=_storeDataPoints(xo, value, dataPoints, numberOfPoints);
if (resultCode==AQH_MSGDATA_RESULT_SUCCESS)
value=AqHomeDataServer_GetOrCreateValueForDriverWithTemplate(o, ep, recvdValue); _sendDataChangedMsgToAllClients(xo, ep, value, dataPoints, numberOfPoints);
if (value) {
if (AQH_Endpoint_GetPermissions(ep) & AQH_ENDPOINT_PERMS_ADDDATA) {
resultCode=_storeDataPoints(xo, value, dataPoints, numberOfPoints);
if (resultCode==AQH_MSGDATA_RESULT_SUCCESS)
_sendDataChangedMsgToAllClients(xo, ep, value, dataPoints, numberOfPoints);
}
else {
DBG_INFO(NULL, "No permissions to add data to value \"%s\"", valueName);
resultCode=AQH_MSGDATA_RESULT_ERROR_PERMS;
}
} }
else { else {
DBG_INFO(NULL, "No permissions to add/create value \"%s\"", valueName); DBG_INFO(NULL, "No permissions to add data to value \"%s\"", valueName);
resultCode=AQH_MSGDATA_RESULT_ERROR_PERMS; resultCode=AQH_MSGDATA_RESULT_ERROR_PERMS;
} }
} }
else { else {
DBG_INFO(NULL, "No datapoints"); DBG_INFO(NULL, "No permissions to add/create value \"%s\"", valueName);
resultCode=AQH_MSGDATA_RESULT_ERROR_INVALID; resultCode=AQH_MSGDATA_RESULT_ERROR_PERMS;
} }
AQH_Value_free(recvdValue);
} }
else { else {
DBG_INFO(NULL, "No value"); DBG_INFO(NULL, "No datapoints");
resultCode=AQH_MSGDATA_RESULT_ERROR_INVALID; resultCode=AQH_MSGDATA_RESULT_ERROR_INVALID;
} }
GWEN_Tag16_List_free(tagList); AQH_Value_free(recvdValue);
}
else {
DBG_INFO(NULL, "No value");
resultCode=AQH_MSGDATA_RESULT_ERROR_INVALID;
} }
outMsg=AQH_IpcMessageResult_new(AQH_IPC_PROTOCOL_DATA_ID, outMsg=AQH_IpcMessageResult_new(AQH_IPC_PROTOCOL_DATA_ID,

View File

@@ -12,8 +12,10 @@
#include "./server.h" #include "./server.h"
#include <gwenhywfar/tag16.h>
void AqHomeDataServer_HandleUpdateData(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
void AqHomeDataServer_HandleUpdateData(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList);

View File

@@ -27,6 +27,7 @@
#include <aqhome/ipc2/tcpd_object.h> #include <aqhome/ipc2/tcpd_object.h>
#include <aqhome/msg/ipc/m_ipc.h> #include <aqhome/msg/ipc/m_ipc.h>
#include <aqhome/msg/ipc/m_ipc_result.h> #include <aqhome/msg/ipc/m_ipc_result.h>
#include <aqhome/msg/ipc/m_ipc_tag16.h>
#include <aqhome/msg/ipc/data/m_ipcd.h> #include <aqhome/msg/ipc/data/m_ipcd.h>
#include <gwenhywfar/args.h> #include <gwenhywfar/args.h>
@@ -77,7 +78,7 @@ static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObjec
static int _handleNewClient(AQH_OBJECT *o, AQH_OBJECT *clientEndpoint); static int _handleNewClient(AQH_OBJECT *o, AQH_OBJECT *clientEndpoint);
static int _handleClientDown(AQH_OBJECT *o, AQH_OBJECT *clientEndpoint); static int _handleClientDown(AQH_OBJECT *o, AQH_OBJECT *clientEndpoint);
static void _handleMsgsFromClient(AQH_OBJECT *o, AQHOME_SERVER *xo, AQH_OBJECT *ep); static void _handleMsgsFromClient(AQH_OBJECT *o, AQHOME_SERVER *xo, AQH_OBJECT *ep);
static void _handleMsgFromClient(AQH_OBJECT *o, AQH_OBJECT *ep, AQH_MESSAGE *msg); static void _handleMsgFromClient(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
static AQH_DEVICE *_getOrCreateDeviceForDriver(AQHOME_SERVER *xo, AQH_OBJECT *epDriver, const char *deviceName); static AQH_DEVICE *_getOrCreateDeviceForDriver(AQHOME_SERVER *xo, AQH_OBJECT *epDriver, const char *deviceName);
static int _createPidFile(const char *pidFilename); static int _createPidFile(const char *pidFilename);
static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs);
@@ -418,31 +419,36 @@ void _handleMsgsFromClient(AQH_OBJECT *o, AQHOME_SERVER *xo, AQH_OBJECT *ep)
void _handleMsgFromClient(AQH_OBJECT *o, AQH_OBJECT *ep, AQH_MESSAGE *msg) void _handleMsgFromClient(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg)
{ {
uint16_t code; GWEN_TAG16_LIST *tagList;
uint8_t protoId;
/* exec IPC message */ tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0);
code=AQH_IpcMessage_GetCode(msg); if (tagList) {
protoId=AQH_IpcMessage_GetProtoId(msg); uint16_t code;
if (protoId==AQH_IPC_PROTOCOL_DATA_ID) { uint8_t protoId;
DBG_ERROR(NULL, "Received IPC packet %d (%x)", (int) code, code);
switch(code) { code=AQH_IpcMessage_GetCode(msg);
case AQH_MSGTYPE_IPC_DATA_CONNECT_REQ: AqHomeDataServer_HandleConnect(o, ep, msg); break; protoId=AQH_IpcMessage_GetProtoId(msg);
case AQH_MSGTYPE_IPC_DATA_UPDATEDATA: AqHomeDataServer_HandleUpdateData(o, ep, msg); break; if (protoId==AQH_IPC_PROTOCOL_DATA_ID) {
case AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ: AqHomeDataServer_HandleGetValues(o, ep, msg); break; DBG_INFO(NULL, "Received IPC packet %d (%x)", (int) code, code);
case AQH_MSGTYPE_IPC_DATA_GETDATA_REQ: AqHomeDataServer_HandleGetDataPoints(o, ep, msg); break; switch(code) {
case AQH_MSGTYPE_IPC_DATA_SETDATA: AqHomeDataServer_HandleSetData(o, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_CONNECT_REQ: AqHomeDataServer_HandleConnect(o, ep, msg, tagList); break;
case AQH_MSGTYPE_IPC_DATA_ADDVALUE: AqHomeDataServer_HandleAddValue(o, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_UPDATEDATA: AqHomeDataServer_HandleUpdateData(o, ep, msg, tagList); break;
case AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE: AqHomeDataServer_HandleAnnounceValue(o, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ: AqHomeDataServer_HandleGetValues(o, ep, msg, tagList); break;
case AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ: AqHomeDataServer_HandleGetDevices(o, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_GETDATA_REQ: AqHomeDataServer_HandleGetDataPoints(o, ep, msg, tagList); break;
case AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ: AqHomeDataServer_HandleModDevice(o, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_SETDATA: AqHomeDataServer_HandleSetData(o, ep, msg, tagList); break;
default: break; case AQH_MSGTYPE_IPC_DATA_ADDVALUE: AqHomeDataServer_HandleAddValue(o, ep, msg, tagList); break;
case AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE: AqHomeDataServer_HandleAnnounceValue(o, ep, msg, tagList); break;
case AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ: AqHomeDataServer_HandleGetDevices(o, ep, msg, tagList); break;
case AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ: AqHomeDataServer_HandleModDevice(o, ep, msg, tagList); break;
default: break;
}
} }
} else {
else { DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId);
DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId); }
GWEN_Tag16_List_free(tagList);
} }
} }

View File

@@ -43,6 +43,7 @@
server.h server.h
server_p.h server_p.h
s_publish.h s_publish.h
s_setdata.h
</headers> </headers>
<sources> <sources>
@@ -54,6 +55,7 @@
c_setdata.c c_setdata.c
server.c server.c
s_publish.c s_publish.c
s_setdata.c
</sources> </sources>
<useTargets> <useTargets>

View File

@@ -191,7 +191,7 @@ void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop)
break; break;
} }
} /* while */ } /* while */
DBG_ERROR(NULL, "Leaving server");
rv=AQH_MqttLogServer_SaveRuntimeDeviceFiles(aqh); rv=AQH_MqttLogServer_SaveRuntimeDeviceFiles(aqh);
if (rv<0) { if (rv<0) {

View File

@@ -214,19 +214,27 @@ void _sendMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQ
DBG_ERROR(NULL, "Invalid value received from MQTT server (%s)", rcvdValue?rcvdValue:"<empty>"); DBG_ERROR(NULL, "Invalid value received from MQTT server (%s)", rcvdValue?rcvdValue:"<empty>");
} }
else { else {
AQH_MESSAGE *pubMsg;
uint64_t now; uint64_t now;
AQH_VALUE *msgValue; AQH_VALUE *msgValue;
now=(uint64_t) time(NULL); now=(uint64_t) time(NULL);
msgValue=_mkMessageValue(device, value); msgValue=_mkMessageValue(device, value);
pubMsg=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, if (xo->brokerEndpoint) {
AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, AQH_MESSAGE *pubMsg;
msgValue, now, f);
DBG_INFO(AQH_LOGDOMAIN, "BROKER UPDATE_DATA %s/%s: %f", pubMsg=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA,
deviceName?deviceName:"<no device name>", AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0,
AQH_Value_GetName(msgValue), f); msgValue, now, f);
AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg); DBG_INFO(AQH_LOGDOMAIN, "BROKER UPDATE_DATA %s/%s: %f",
deviceName?deviceName:"<no device name>",
AQH_Value_GetName(msgValue), f);
AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg);
}
else {
DBG_INFO(AQH_LOGDOMAIN, "Skipping BROKER UPDATE_DATA %s/%s: %f",
deviceName?deviceName:"<no device name>",
AQH_Value_GetName(msgValue), f);
}
AQH_Value_free(msgValue); AQH_Value_free(msgValue);
} }
} }
@@ -269,14 +277,19 @@ void _sendAnnounceValueMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *dev
AQH_VALUE *msgValue; AQH_VALUE *msgValue;
msgValue=_mkMessageValue(device, value); msgValue=_mkMessageValue(device, value);
pubMsg=AQH_IpcdMessageValues_newForOne(AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE, if (xo->brokerEndpoint) {
AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, pubMsg=AQH_IpcdMessageValues_newForOne(AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE,
0, msgValue); AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0,
if (pubMsg) { 0, msgValue);
DBG_INFO(AQH_LOGDOMAIN, "BROKER ANNOUNCE_VALUE %s", AQH_Value_GetName(msgValue)); if (pubMsg) {
AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg); DBG_INFO(AQH_LOGDOMAIN, "BROKER ANNOUNCE_VALUE %s", AQH_Value_GetName(msgValue));
AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg);
}
AQH_Value_free(msgValue);
}
else {
DBG_INFO(AQH_LOGDOMAIN, "Ignoring BROKER ANNOUNCE_VALUE %s", AQH_Value_GetName(msgValue));
} }
AQH_Value_free(msgValue);
} }

View File

@@ -0,0 +1,171 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./s_setdata.h"
#include "./server_p.h"
#include "aqhome/data/value.h"
#include "aqhome/msg/ipc/data/m_ipcd_setdata.h"
#include "aqhome/msg/mqtt/m_mqtt_publish.h"
#include "aqhome/ipc2/endpoint.h"
#include <gwenhywfar/debug.h>
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static void _sendDataForDevice(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device,
const char *valueName, const char *valueData);
static void _sendValueToMqtt(AQH_MQTTLOG_SERVER *xo, const char *deviceId,
const AQHMQTT_TOPIC *topic, const char *valueData);
static GWEN_BUFFER *_createBufferForTopic(const char *deviceId, const AQHMQTT_TOPIC *topic);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
void AQH_MqttLogServer_HandleSetData(AQH_OBJECT *o,
const AQH_MESSAGE *msg,
const GWEN_TAG16_LIST *tagList)
{
if (o && msg) {
AQH_MQTTLOG_SERVER *xo;
xo=AQH_MqttLogServer_GetServerData(o);
if (xo) {
AQH_VALUE *recvdValue;
DBG_ERROR(NULL, "Received SETDATA request");
recvdValue=AQH_IpcdMessageSetData_ReadValue(tagList);
if (recvdValue) {
const char *valueName;
const char *deviceName;
valueName=recvdValue?AQH_Value_GetName(recvdValue):NULL;
deviceName=recvdValue?AQH_Value_GetDeviceName(recvdValue):NULL;
if (valueName && deviceName) {
AQHMQTT_DEVICE *device;
device=AQH_MqttLogServer_FindRegisteredDevice(o, deviceName);
if (device) {
char *valueDataFreeable;
DBG_ERROR(NULL, "Sending data to value \"%s\" of device \"%s\"", valueName, deviceName);
valueDataFreeable=AQH_IpcdMessageSetData_ReadData(tagList);
_sendDataForDevice(xo, device, valueName, valueDataFreeable);
free(valueDataFreeable);
}
else {
DBG_ERROR(NULL, "Device \"%s\" not found", deviceName);
AQH_MqttLogServer_DumpRegisteredDevices(o);
}
}
else {
DBG_ERROR(NULL, "Either value name or device name missing in request");
}
AQH_Value_free(recvdValue);
}
else {
DBG_ERROR(NULL, "Request does not contain a value object");
}
}
}
}
void _sendDataForDevice(AQH_MQTTLOG_SERVER *xo,
const AQHMQTT_DEVICE *device,
const char *valueName, const char *valueData)
{
const char *deviceId;
deviceId=AQHMQTT_Device_GetId(device);
if (deviceId && *deviceId) {
AQHMQTT_TOPIC_LIST *topicList;
topicList=AQHMQTT_Device_GetTopicList(device);
if (topicList) {
AQHMQTT_TOPIC *topic;
topic=AQHMQTT_Topic_List_First(topicList);
while(topic) {
if (AQHMQTT_Topic_GetDirection(topic)==AQHMQTT_TopicDir_Out) {
AQHMQTT_VALUE_LIST *valueList;
AQHMQTT_VALUE *value;
valueList=AQHMQTT_Topic_GetValueList(topic);
value=valueList?AQHMQTT_Value_List_GetByName(valueList, valueName):NULL;
if (value) {
/* found value, create publish msg, send */
DBG_ERROR(NULL, "Topic \"%s\" contains value \"%s\"", AQHMQTT_Topic_GetName(topic), valueName);
_sendValueToMqtt(xo, deviceId, topic, valueData);
}
} /* if out */
topic=AQHMQTT_Topic_List_Next(topic);
} /* while topic */
}
}
else {
DBG_ERROR(NULL, "Device has no id");
}
}
void _sendValueToMqtt(AQH_MQTTLOG_SERVER *xo, const char *deviceId, const AQHMQTT_TOPIC *topic, const char *valueData)
{
GWEN_BUFFER *buf;
AQH_MESSAGE *msgOut;
buf=_createBufferForTopic(deviceId, topic);
DBG_ERROR(NULL, "MQTT PUBLISH: %s = %s", GWEN_Buffer_GetStart(buf), valueData?valueData:"<empty>");
msgOut=AQH_MqttMessagePublish_new(0, 0, GWEN_Buffer_GetStart(buf),
(const uint8_t*) (valueData?valueData:NULL),
valueData?strlen(valueData):0);
if (msgOut)
AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msgOut);
else {
DBG_ERROR(NULL, "Error creating message");
}
GWEN_Buffer_free(buf);
}
GWEN_BUFFER *_createBufferForTopic(const char *deviceId, const AQHMQTT_TOPIC *topic)
{
GWEN_BUFFER *buf;
const char *s;
buf=GWEN_Buffer_new(0, 256, 0, 1);
s=AQHMQTT_Topic_GetBeforeId(topic);
if (s && *s)
GWEN_Buffer_AppendString(buf, s);
GWEN_Buffer_AppendString(buf, deviceId);
s=AQHMQTT_Topic_GetAfterId(topic);
if (s && *s)
GWEN_Buffer_AppendString(buf, s);
return buf;
}

View File

@@ -0,0 +1,27 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQHOMEMQTT_S_SETDATA_H
#define AQHOMEMQTT_S_SETDATA_H
#include "./aqhome_mqtt.h"
#include <aqhome/events2/object.h>
#include <aqhome/ipc2/message.h>
#include <gwenhywfar/tag16.h>
void AQH_MqttLogServer_HandleSetData(AQH_OBJECT *o, const AQH_MESSAGE *recvdMsg, const GWEN_TAG16_LIST *tagList);
#endif

View File

@@ -12,6 +12,7 @@
#include "./server_p.h" #include "./server_p.h"
#include "./s_publish.h" #include "./s_publish.h"
#include "./s_setdata.h"
#include "./xmlread.h" #include "./xmlread.h"
#include "./xmlwrite.h" #include "./xmlwrite.h"
@@ -25,6 +26,7 @@
#include <aqhome/msg/ipc/m_ipc.h> #include <aqhome/msg/ipc/m_ipc.h>
#include <aqhome/msg/ipc/m_ipc_result.h> #include <aqhome/msg/ipc/m_ipc_result.h>
#include <aqhome/msg/ipc/m_ipc_connect.h> #include <aqhome/msg/ipc/m_ipc_connect.h>
#include <aqhome/msg/ipc/m_ipc_tag16.h>
#include <aqhome/msg/ipc/data/m_ipcd.h> #include <aqhome/msg/ipc/data/m_ipcd.h>
#include <aqhome/msg/ipc/data/m_ipcd_multidata.h> #include <aqhome/msg/ipc/data/m_ipcd_multidata.h>
#include <aqhome/ipc2/mqtt_endpoint.h> #include <aqhome/ipc2/mqtt_endpoint.h>
@@ -657,21 +659,26 @@ void AQH_MqttLogServer_HandleBrokerMsgs(AQH_OBJECT *o)
void _handleMsgFromBroker(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) void _handleMsgFromBroker(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg)
{ {
uint16_t code; GWEN_TAG16_LIST *tagList;
uint8_t protoId;
/* exec IPC message */ tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0);
code=AQH_IpcMessage_GetCode(msg); if (tagList) {
protoId=AQH_IpcMessage_GetProtoId(msg); uint16_t code;
if (protoId==AQH_IPC_PROTOCOL_DATA_ID) { uint8_t protoId;
DBG_ERROR(NULL, "Received IPC packet %d (%x)", (int) code, code);
switch(code) { code=AQH_IpcMessage_GetCode(msg);
// case AQH_MSGTYPE_IPC_DATA_SETDATA: AQH_MqttLogServer_HandleSetData(o, ep, msg); break; protoId=AQH_IpcMessage_GetProtoId(msg);
default: break; if (protoId==AQH_IPC_PROTOCOL_DATA_ID) {
DBG_ERROR(NULL, "Received IPC packet %d (%x)", (int) code, code);
switch(code) {
case AQH_MSGTYPE_IPC_DATA_SETDATA: AQH_MqttLogServer_HandleSetData(o, msg, tagList); break;
default: break;
}
} }
} else {
else { DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId);
DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId); }
GWEN_Tag16_List_free(tagList);
} }
} }

View File

@@ -567,6 +567,7 @@ int _startBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo)
AQH_Endpoint_SetServiceName(ep, xo->brokerClientId); AQH_Endpoint_SetServiceName(ep, xo->brokerClientId);
AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_NODE_SERVER_SLOT_BROKERCLOSED, o); AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_NODE_SERVER_SLOT_BROKERCLOSED, o);
AQH_Object_Enable(ep); AQH_Object_Enable(ep);
xo->brokerEndpoint=ep;
rv=_exchangeConnect(o, xo, 0); rv=_exchangeConnect(o, xo, 0);
if (rv!=0) { if (rv!=0) {
@@ -574,7 +575,6 @@ int _startBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo)
return (rv<0)?rv:GWEN_ERROR_PERMISSIONS; return (rv<0)?rv:GWEN_ERROR_PERMISSIONS;
} }
DBG_ERROR(NULL, "Connected to broker at %s:%d", xo->brokerAddress, xo->brokerPort); DBG_ERROR(NULL, "Connected to broker at %s:%d", xo->brokerAddress, xo->brokerPort);
xo->brokerEndpoint=ep;
return 0; return 0;
} }
else { else {
@@ -907,7 +907,6 @@ void _publishInt(AQH_NODE_SERVER *xo, uint32_t uid, const char *vPath, int vModa
void _publishDouble(AQH_NODE_SERVER *xo, uint32_t uid, const char *vPath, int vModality, const char *vUnits, double v) void _publishDouble(AQH_NODE_SERVER *xo, uint32_t uid, const char *vPath, int vModality, const char *vUnits, double v)
{ {
AQH_MESSAGE *pubMsg;
union {double f; uint64_t i;} u; union {double f; uint64_t i;} u;
uint64_t arrayToSend[2]; uint64_t arrayToSend[2];
AQH_VALUE *value; AQH_VALUE *value;
@@ -923,17 +922,29 @@ void _publishDouble(AQH_NODE_SERVER *xo, uint32_t uid, const char *vPath, int vM
AQH_Value_SetValueType(value, AQH_ValueType_Sensor); AQH_Value_SetValueType(value, AQH_ValueType_Sensor);
AQH_Value_SetModality(value, vModality); AQH_Value_SetModality(value, vModality);
pubMsg=AQH_IpcdMessageMultiData_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, if (xo->brokerEndpoint) {
AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, AQH_MESSAGE *pubMsg;
value, arrayToSend, 1);
if (pubMsg) { pubMsg=AQH_IpcdMessageMultiData_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA,
AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0,
value, arrayToSend, 1);
if (pubMsg) {
DBG_ERROR(AQH_LOGDOMAIN,
"BROKER PUBLISH %s(%s/%s): %f",
AQH_Value_GetName(value),
AQH_Value_GetDeviceName(value),
AQH_Value_GetName(value),
v);
AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg);
}
}
else {
DBG_ERROR(AQH_LOGDOMAIN, DBG_ERROR(AQH_LOGDOMAIN,
"BROKER PUBLISH %s(%s/%s): %f", "Skipping BROKER PUBLISH %s(%s/%s): %f (no broker connection)",
AQH_Value_GetName(value), AQH_Value_GetName(value),
AQH_Value_GetDeviceName(value), AQH_Value_GetDeviceName(value),
AQH_Value_GetName(value), AQH_Value_GetName(value),
v); v);
AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg);
} }
AQH_Value_free(value); AQH_Value_free(value);
} }

View File

@@ -169,7 +169,7 @@ int AQH_FdObject_Read(AQH_OBJECT *o, uint8_t *ptrBuffer, uint32_t lenBuffer)
} }
else if (rv>0) { else if (rv>0) {
/* data received */ /* data received */
DBG_INFO(AQH_LOGDOMAIN, "Received %d bytes", (int) rv); DBG_DEBUG(AQH_LOGDOMAIN, "Received %d bytes", (int) rv);
return (int) rv; return (int) rv;
} }
else { else {

View File

@@ -390,6 +390,7 @@ AQH_MESSAGE *AQH_Endpoint_GetNextMsgOut(AQH_OBJECT *o)
void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg) void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg)
{ {
assert(o);
if (o && msg) { if (o && msg) {
AQH_ENDPOINT *xo; AQH_ENDPOINT *xo;
@@ -456,7 +457,7 @@ void AQH_Endpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg)
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo) { if (xo) {
AQH_Message_List_Add(msg, xo->msgInList); AQH_Message_List_Add(msg, xo->msgInList);
DBG_INFO(AQH_LOGDOMAIN, "now %d msgs in list", AQH_Message_List_GetCount(xo->msgInList)); DBG_DEBUG(AQH_LOGDOMAIN, "now %d msgs in list", AQH_Message_List_GetCount(xo->msgInList));
} }
} }
} }
@@ -510,7 +511,7 @@ int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr)
{ {
AQH_MESSAGE *msg; AQH_MESSAGE *msg;
DBG_INFO(AQH_LOGDOMAIN, "Msg received:"); DBG_DEBUG(AQH_LOGDOMAIN, "Msg received:");
/*GWEN_Text_LogString((const char*) msgPtr, msgLen, AQH_LOGDOMAIN, GWEN_LoggerLevel_Error);*/ /*GWEN_Text_LogString((const char*) msgPtr, msgLen, AQH_LOGDOMAIN, GWEN_LoggerLevel_Error);*/
msg=AQH_NodeMessage_fromBuffer(msgPtr, msgLen); msg=AQH_NodeMessage_fromBuffer(msgPtr, msgLen);
AQH_Endpoint_AddMsgIn(o, msg); AQH_Endpoint_AddMsgIn(o, msg);
@@ -522,7 +523,7 @@ int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr)
int _handleMsgSent(AQH_OBJECT *o) int _handleMsgSent(AQH_OBJECT *o)
{ {
DBG_INFO(AQH_LOGDOMAIN, "Msg sent"); DBG_DEBUG(AQH_LOGDOMAIN, "Msg sent");
if (o) { if (o) {
AQH_ENDPOINT *xo; AQH_ENDPOINT *xo;
@@ -540,7 +541,7 @@ int _handleMsgSent(AQH_OBJECT *o)
/* get next message in list */ /* get next message in list */
msg=AQH_Message_List_First(xo->msgOutList); msg=AQH_Message_List_First(xo->msgOutList);
if (msg) { if (msg) {
DBG_INFO(AQH_LOGDOMAIN, "Sending next message"); DBG_DEBUG(AQH_LOGDOMAIN, "Sending next message");
AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg)); AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg));
} }
else { else {

View File

@@ -64,7 +64,7 @@ int _readMsg(AQH_OBJECT *o)
int rv; int rv;
if (xo->bytesReceived<AQH_MSG_READER_HEADER_SIZE) { if (xo->bytesReceived<AQH_MSG_READER_HEADER_SIZE) {
DBG_INFO(AQH_LOGDOMAIN, "Reading header"); DBG_DEBUG(AQH_LOGDOMAIN, "Reading header");
rv=_readHeaderFromRingbuffer(xo); rv=_readHeaderFromRingbuffer(xo);
if (rv<0) { if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
@@ -73,7 +73,7 @@ int _readMsg(AQH_OBJECT *o)
} }
if (xo->bytesReceived>=AQH_MSG_READER_HEADER_SIZE) { if (xo->bytesReceived>=AQH_MSG_READER_HEADER_SIZE) {
DBG_INFO(AQH_LOGDOMAIN, "Reading body"); DBG_DEBUG(AQH_LOGDOMAIN, "Reading body");
/* reading remainder of msg directly into allocated buffer */ /* reading remainder of msg directly into allocated buffer */
rv=AQH_MsgReader_ReadRemainderFromRingbuffer(o); rv=AQH_MsgReader_ReadRemainderFromRingbuffer(o);
if (rv<0) { if (rv<0) {
@@ -122,7 +122,7 @@ int _readHeaderFromRingbuffer(AQH_MSG_READER *xo)
xferSize=remaining; xferSize=remaining;
rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->headerBuffer+xo->bytesReceived), &xferSize); rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->headerBuffer+xo->bytesReceived), &xferSize);
if (rv<0) { if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer empty"); DBG_DEBUG(AQH_LOGDOMAIN, "Ringbuffer empty");
return 0; return 0;
} }
if (xferSize<remaining) { if (xferSize<remaining) {

View File

@@ -110,7 +110,6 @@ void AQH_MsgReader_SetFlags(AQH_OBJECT *o, uint32_t f)
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo) { if (xo) {
DBG_ERROR(AQH_LOGDOMAIN, "Set flags: %08x", f);
xo->flags=f; xo->flags=f;
} }
} }
@@ -123,7 +122,6 @@ void AQH_MsgReader_AddFlags(AQH_OBJECT *o, uint32_t f)
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo) { if (xo) {
DBG_ERROR(AQH_LOGDOMAIN, "Adding flags: %08x", f);
xo->flags|=f; xo->flags|=f;
} }
} }
@@ -136,7 +134,6 @@ void AQH_MsgReader_SubFlags(AQH_OBJECT *o, uint32_t f)
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo) { if (xo) {
DBG_ERROR(AQH_LOGDOMAIN, "Clearing flags: %08x", f);
xo->flags&=~f; xo->flags&=~f;
} }
} }
@@ -207,7 +204,7 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject)
{ {
AQH_MSG_READER *xo; AQH_MSG_READER *xo;
DBG_INFO(AQH_LOGDOMAIN, "Socket ready"); DBG_DEBUG(AQH_LOGDOMAIN, "Socket ready");
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo) { if (xo) {
int rv; int rv;
@@ -314,7 +311,7 @@ int _fillRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo, AQH_OBJECT *fdObject)
rv=AQH_FdObject_Read(fdObject, (uint8_t*) GWEN_RingBuffer_GetWritePointer(xo->ringBuffer), len); rv=AQH_FdObject_Read(fdObject, (uint8_t*) GWEN_RingBuffer_GetWritePointer(xo->ringBuffer), len);
if (rv<0) { if (rv<0) {
if (rv!=GWEN_ERROR_TRY_AGAIN) { if (rv!=GWEN_ERROR_TRY_AGAIN) {
DBG_ERROR(AQH_LOGDOMAIN, "here (%d)", rv); DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
} }
return rv; return rv;
} }
@@ -330,8 +327,7 @@ int _fillRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo, AQH_OBJECT *fdObject)
} }
} }
else { else {
DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer full"); DBG_DEBUG(AQH_LOGDOMAIN, "Ringbuffer full");
/*return GWEN_ERROR_BUFFER_OVERFLOW;*/
return 0; return 0;
} }
} }
@@ -347,7 +343,7 @@ int AQH_MsgReader_ReadRemainderFromRingbuffer(AQH_OBJECT *o)
if (xo) { if (xo) {
if (xo->bytesLeft==0) { if (xo->bytesLeft==0) {
/* msg finished */ /* msg finished */
DBG_INFO(AQH_LOGDOMAIN, "Message complete"); DBG_DEBUG(AQH_LOGDOMAIN, "Message complete");
return 1; return 1;
} }
else { else {
@@ -367,7 +363,7 @@ int AQH_MsgReader_ReadRemainderFromRingbuffer(AQH_OBJECT *o)
xferSize=bytesToRead; xferSize=bytesToRead;
rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize); rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize);
if (rv<0) { if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer empty"); DBG_DEBUG(AQH_LOGDOMAIN, "Ringbuffer empty");
return 0; return 0;
} }
if (xferSize<bytesToRead) { if (xferSize<bytesToRead) {
@@ -378,7 +374,7 @@ int AQH_MsgReader_ReadRemainderFromRingbuffer(AQH_OBJECT *o)
xo->bytesLeft-=xferSize; xo->bytesLeft-=xferSize;
if (xo->bytesLeft==0) { if (xo->bytesLeft==0) {
/* msg finished */ /* msg finished */
DBG_INFO(AQH_LOGDOMAIN, "Message complete"); DBG_DEBUG(AQH_LOGDOMAIN, "Message complete");
return 1; return 1;
} }
} }

View File

@@ -129,7 +129,7 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject)
{ {
AQH_MSG_WRITER *xo; AQH_MSG_WRITER *xo;
DBG_INFO(AQH_LOGDOMAIN, "Socket ready"); DBG_DEBUG(AQH_LOGDOMAIN, "Socket ready");
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o);
if (xo) { if (xo) {
int rv; int rv;