adapted to latest changes in gwen, more work on data and nodes servers.

This commit is contained in:
Martin Preuss
2024-09-26 10:45:22 +02:00
parent be053b035f
commit b0b6efb1c3
88 changed files with 1745 additions and 445 deletions

View File

@@ -62,7 +62,9 @@ void AqHomeData_HandleAddValue(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG
AQH_Value_free(recvdValue);
}
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT,
GWEN_MsgEndpoint_GetNextMessageId(ep), GWEN_IpcMsg_GetMsgId(msg),
resultCode);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
}

View File

@@ -83,7 +83,9 @@ void AqHomeData_HandleConnect(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG
free(userId);
free(clientId);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT,
GWEN_MsgEndpoint_GetNextMessageId(ep), GWEN_IpcMsg_GetMsgId(msg),
resultCode);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
}

View File

@@ -40,11 +40,13 @@
static int _getAndSendDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep,
const AQH_VALUE *value,
uint64_t tsBegin, uint64_t tsEnd, uint64_t num);
static int _getAndSendDataPointsNoNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd);
static int _getAndSendDataPointsWithNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t num);
static void _sendDataPointsResponse(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, const uint64_t *tablePtr);
static void _getAndSendLastDatapoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value);
uint64_t tsBegin, uint64_t tsEnd, uint64_t num, uint32_t refMsgId);
static int _getAndSendDataPointsNoNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd,
uint32_t refMsgId);
static int _getAndSendDataPointsWithNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t num, uint32_t refMsgId);
static void _sendDataPointsResponse(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, const uint64_t *tablePtr,
uint32_t refMsgId);
static void _getAndSendLastDatapoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint32_t refMsgId);
@@ -74,7 +76,7 @@ void AqHomeData_HandleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWE
value=AQH_Storage_GetValueByNameForSystem(aqh->storage, valueName);
if (value) {
resultCode=_getAndSendDataPoints(aqh, ep, value, tsBegin, tsEnd, numRequested);
resultCode=_getAndSendDataPoints(aqh, ep, value, tsBegin, tsEnd, numRequested, GWEN_IpcMsg_GetMsgId(recvdMsg));
if (resultCode==AQH_MSG_IPC_SUCCESS)
return;
}
@@ -89,27 +91,31 @@ void AqHomeData_HandleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWE
resultCode=AQH_MSG_IPC_ERROR_PERMS;
}
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT,
GWEN_MsgEndpoint_GetNextMessageId(ep), GWEN_IpcMsg_GetMsgId(recvdMsg),
resultCode);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
}
int _getAndSendDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd, uint64_t num)
int _getAndSendDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd, uint64_t num,
uint32_t refMsgId)
{
if (num==0)
return _getAndSendDataPointsNoNum(aqh, ep, value, tsBegin, tsEnd);
return _getAndSendDataPointsNoNum(aqh, ep, value, tsBegin, tsEnd, refMsgId);
else if (num==1) {
_getAndSendLastDatapoint(aqh, ep, value);
_getAndSendLastDatapoint(aqh, ep, value, refMsgId);
return AQH_MSG_IPC_SUCCESS;
}
else
return _getAndSendDataPointsWithNum(aqh, ep, value, num);
return _getAndSendDataPointsWithNum(aqh, ep, value, num, refMsgId);
}
int _getAndSendDataPointsNoNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd)
int _getAndSendDataPointsNoNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd,
uint32_t refMsgId)
{
uint64_t valueId;
uint64_t *tablePtr;
@@ -117,7 +123,7 @@ int _getAndSendDataPointsNoNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQ
valueId=AQH_Value_GetId(value);
tablePtr=AQH_Storage_GetDataPoints(aqh->storage, valueId, tsBegin, tsEnd, AQHOMEDATA_HANDLEGETDATAPOINTS_MAXTABLEENTRIES);
if (tablePtr) {
_sendDataPointsResponse(aqh, ep, value, tablePtr);
_sendDataPointsResponse(aqh, ep, value, tablePtr, refMsgId);
free(tablePtr);
return AQH_MSG_IPC_SUCCESS;
}
@@ -129,7 +135,7 @@ int _getAndSendDataPointsNoNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQ
int _getAndSendDataPointsWithNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t num)
int _getAndSendDataPointsWithNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t num, uint32_t refMsgId)
{
uint64_t valueId;
uint64_t *tablePtr;
@@ -139,7 +145,7 @@ int _getAndSendDataPointsWithNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const
valueId=AQH_Value_GetId(value);
tablePtr=AQH_Storage_GetLastNDataPoints(aqh->storage, valueId, num);
if (tablePtr) {
_sendDataPointsResponse(aqh, ep, value, tablePtr);
_sendDataPointsResponse(aqh, ep, value, tablePtr, refMsgId);
free(tablePtr);
return AQH_MSG_IPC_SUCCESS;
}
@@ -151,7 +157,8 @@ int _getAndSendDataPointsWithNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const
void _sendDataPointsResponse(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, const uint64_t *tablePtr)
void _sendDataPointsResponse(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, const uint64_t *tablePtr,
uint32_t refMsgId)
{
int numTableEntries;
int numDataPoints;
@@ -159,13 +166,15 @@ void _sendDataPointsResponse(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_
numTableEntries=(int)(tablePtr[0]);
numDataPoints=numTableEntries/2;
outMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDATA_RSP, value, &(tablePtr[1]), numDataPoints);
outMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDATA_RSP,
GWEN_MsgEndpoint_GetNextMessageId(ep), refMsgId,
value, &(tablePtr[1]), numDataPoints);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
}
void _getAndSendLastDatapoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value)
void _getAndSendLastDatapoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint32_t refMsgId)
{
GWEN_MSG *outMsg;
int resultCode=AQH_MSG_IPC_SUCCESS;
@@ -182,12 +191,16 @@ void _getAndSendLastDatapoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH
}
}
else {
outMsg=AQH_MultiDataDataIpcMsg_newForOne(AQH_MSGTYPE_IPC_DATA_GETDATA_RSP, value, timestamp, data);
outMsg=AQH_MultiDataDataIpcMsg_newForOne(AQH_MSGTYPE_IPC_DATA_GETDATA_RSP,
GWEN_MsgEndpoint_GetNextMessageId(ep), refMsgId,
value, timestamp, data);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
return;
}
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT,
GWEN_MsgEndpoint_GetNextMessageId(ep), refMsgId,
resultCode);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
}

View File

@@ -35,7 +35,7 @@
* ------------------------------------------------------------------------------------------------
*/
static void _sendDeviceList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_DEVICE_LIST *vl, uint32_t flags);
static void _sendDeviceList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_DEVICE_LIST *vl, uint32_t flags, uint32_t refMsgId);
@@ -54,7 +54,7 @@ void AqHomeData_HandleGetDevices(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const
DBG_INFO(NULL, "Have a list of %d devices", AQH_Device_List_GetCount(origDeviceList));
if (AQH_Device_List_GetCount(origDeviceList)<AQHOMEDATA_DEVICESPERMSG) {
DBG_INFO(NULL, "Sending all entries in one message");
_sendDeviceList(aqh, ep, origDeviceList, AQH_MSGDATA_DEVICES_FLAGS_LASTMSG);
_sendDeviceList(aqh, ep, origDeviceList, AQH_MSGDATA_DEVICES_FLAGS_LASTMSG, GWEN_IpcMsg_GetMsgId(msg));
}
else {
AQH_DEVICE_LIST *tmpDeviceList;
@@ -72,31 +72,33 @@ void AqHomeData_HandleGetDevices(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const
AQH_Device_List_Add(copyOfDevice, tmpDeviceList);
if (AQH_Device_List_GetCount(tmpDeviceList)>=AQHOMEDATA_DEVICESPERMSG) {
DBG_INFO(NULL, "Sending %d devices", AQH_Device_List_GetCount(tmpDeviceList));
_sendDeviceList(aqh, ep, tmpDeviceList, next?0:AQH_MSGDATA_DEVICES_FLAGS_LASTMSG);
_sendDeviceList(aqh, ep, tmpDeviceList, next?0:AQH_MSGDATA_DEVICES_FLAGS_LASTMSG, GWEN_IpcMsg_GetMsgId(msg));
AQH_Device_List_Clear(tmpDeviceList);
}
v=next;
}
if (AQH_Device_List_GetCount(tmpDeviceList)) {
DBG_INFO(NULL, "Sending %d devices", AQH_Device_List_GetCount(tmpDeviceList));
_sendDeviceList(aqh, ep, tmpDeviceList, AQH_MSGDATA_DEVICES_FLAGS_LASTMSG); /* send remaining */
_sendDeviceList(aqh, ep, tmpDeviceList, AQH_MSGDATA_DEVICES_FLAGS_LASTMSG, GWEN_IpcMsg_GetMsgId(msg)); /* send remaining */
}
AQH_Device_List_free(tmpDeviceList);
}
}
else {
/* empty list */
_sendDeviceList(aqh, ep, NULL, AQH_MSGDATA_DEVICES_FLAGS_LASTMSG);
_sendDeviceList(aqh, ep, NULL, AQH_MSGDATA_DEVICES_FLAGS_LASTMSG, GWEN_IpcMsg_GetMsgId(msg));
}
}
void _sendDeviceList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_DEVICE_LIST *vl, uint32_t flags)
void _sendDeviceList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_DEVICE_LIST *vl, uint32_t flags, uint32_t refMsgId)
{
GWEN_MSG *msg;
msg=AQH_DevicesDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP, flags, vl);
msg=AQH_DevicesDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP,
GWEN_MsgEndpoint_GetNextMessageId(ep), refMsgId,
flags, vl);
GWEN_MsgEndpoint_AddSendMessage(ep, msg);
}

View File

@@ -73,7 +73,9 @@ void AqHomeData_HandleGetLastDataPoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep,
}
}
else {
outMsg=AQH_MultiDataDataIpcMsg_newForOne(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP, storedValue, timestamp, data);
outMsg=AQH_MultiDataDataIpcMsg_newForOne(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP,
GWEN_MsgEndpoint_GetNextMessageId(ep), GWEN_IpcMsg_GetMsgId(recvdMsg),
storedValue, timestamp, data);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
free(valueName);
return;
@@ -95,7 +97,9 @@ void AqHomeData_HandleGetLastDataPoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep,
resultCode=AQH_MSG_IPC_ERROR_PERMS;
}
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT,
GWEN_MsgEndpoint_GetNextMessageId(ep), GWEN_IpcMsg_GetMsgId(recvdMsg),
resultCode);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
}

View File

@@ -35,7 +35,7 @@
* ------------------------------------------------------------------------------------------------
*/
static void _sendValueList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE_LIST *vl, uint32_t flags);
static void _sendValueList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE_LIST *vl, uint32_t flags, uint32_t refMsgId);
@@ -54,7 +54,7 @@ void AqHomeData_HandleGetValues(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const G
DBG_INFO(NULL, "Have a list of %d values", AQH_Value_List_GetCount(origValueList));
if (AQH_Value_List_GetCount(origValueList)<AQHOMEDATA_VALUESPERMSG) {
DBG_INFO(NULL, "Sending all entries in one message");
_sendValueList(aqh, ep, origValueList, AQH_MSGDATA_VALUES_FLAGS_LASTMSG);
_sendValueList(aqh, ep, origValueList, AQH_MSGDATA_VALUES_FLAGS_LASTMSG, GWEN_IpcMsg_GetMsgId(msg));
}
else {
AQH_VALUE_LIST *tmpValueList;
@@ -72,31 +72,33 @@ void AqHomeData_HandleGetValues(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const G
AQH_Value_List_Add(copyOfValue, tmpValueList);
if (AQH_Value_List_GetCount(tmpValueList)>=AQHOMEDATA_VALUESPERMSG) {
DBG_INFO(NULL, "Sending %d values", AQH_Value_List_GetCount(tmpValueList));
_sendValueList(aqh, ep, tmpValueList, next?0:AQH_MSGDATA_VALUES_FLAGS_LASTMSG);
_sendValueList(aqh, ep, tmpValueList, next?0:AQH_MSGDATA_VALUES_FLAGS_LASTMSG, GWEN_IpcMsg_GetMsgId(msg));
AQH_Value_List_Clear(tmpValueList);
}
v=next;
}
if (AQH_Value_List_GetCount(tmpValueList)) {
DBG_INFO(NULL, "Sending %d values", AQH_Value_List_GetCount(tmpValueList));
_sendValueList(aqh, ep, tmpValueList, AQH_MSGDATA_VALUES_FLAGS_LASTMSG); /* send remaining */
_sendValueList(aqh, ep, tmpValueList, AQH_MSGDATA_VALUES_FLAGS_LASTMSG, GWEN_IpcMsg_GetMsgId(msg)); /* send remaining */
}
AQH_Value_List_free(tmpValueList);
}
}
else {
/* empty list */
_sendValueList(aqh, ep, NULL, AQH_MSGDATA_VALUES_FLAGS_LASTMSG);
_sendValueList(aqh, ep, NULL, AQH_MSGDATA_VALUES_FLAGS_LASTMSG, GWEN_IpcMsg_GetMsgId(msg));
}
}
void _sendValueList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE_LIST *vl, uint32_t flags)
void _sendValueList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE_LIST *vl, uint32_t flags, uint32_t refMsgId)
{
GWEN_MSG *msg;
msg=AQH_ValuesDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP, flags, vl);
msg=AQH_ValuesDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP,
GWEN_MsgEndpoint_GetNextMessageId(ep), refMsgId,
flags, vl);
GWEN_MsgEndpoint_AddSendMessage(ep, msg);
}

View File

@@ -104,7 +104,9 @@ void AqHomeData_HandleModDevice(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MS
resultCode=AQH_MSG_IPC_ERROR_PERMS;
}
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT,
GWEN_MsgEndpoint_GetNextMessageId(ep), GWEN_IpcMsg_GetMsgId(recvdMsg),
resultCode);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
}

View File

@@ -14,6 +14,7 @@
#include "./c_setdata.h"
#include "./aqhome_data_p.h"
#include "./loop.h"
#include "aqhome/aqhome.h"
#include "aqhome/ipc/data/ipc_data.h"
#include "aqhome/ipc/data/msg_data_set.h"
#include "aqhome/ipc/endpoint_ipc.h"
@@ -77,7 +78,9 @@ void AqHomeData_HandleSetData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epSrc, GWEN_M
AQH_Value_free(recvdValue);
free(valueDataFreeable);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT,
GWEN_MsgEndpoint_GetNextMessageId(epSrc), GWEN_IpcMsg_GetMsgId(recvdMsg),
resultCode);
GWEN_MsgEndpoint_AddSendMessage(epSrc, outMsg);
}
@@ -96,7 +99,9 @@ int _forwardDataToDriver(AQHOME_DATA *aqh, const AQH_VALUE *v, const char *data)
GWEN_MSG *driverMsg;
DBG_INFO(AQH_LOGDOMAIN, "Sending SETDATA msg to driver endpoint (%s)", GWEN_MsgEndpoint_GetName(ep));
driverMsg=AQH_SetDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_SETDATA, v, data);
driverMsg=AQH_SetDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_SETDATA,
GWEN_MsgEndpoint_GetNextMessageId(ep), 0,
v, data);
GWEN_MsgEndpoint_AddSendMessage(ep, driverMsg);
return AQH_MSG_IPC_SUCCESS;
}

View File

@@ -87,7 +87,9 @@ void AqHomeData_HandleUpdateData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_M
}
AQH_Value_free(recvdValue);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT,
GWEN_MsgEndpoint_GetNextMessageId(ep), GWEN_IpcMsg_GetMsgId(recvdMsg),
resultCode);
GWEN_MsgEndpoint_AddSendMessage(ep, outMsg);
}
@@ -131,7 +133,9 @@ void _sendDataChangedMsgToAllClients(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epSrc,
GWEN_MSG *msg;
DBG_DEBUG(AQH_LOGDOMAIN, "Sending update msg to endpoint %s", GWEN_MsgEndpoint_GetName(ep));
msg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_DATACHANGED, v, dataPoints, numValues);
msg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_DATACHANGED,
GWEN_MsgEndpoint_GetNextMessageId(ep), 0,
v, dataPoints, numValues);
GWEN_MsgEndpoint_AddSendMessage(ep, msg);
}
else {