diff --git a/apps/aqhome-data/s_getdatapoints.c b/apps/aqhome-data/s_getdatapoints.c index fd9993e..2c5d56d 100644 --- a/apps/aqhome-data/s_getdatapoints.c +++ b/apps/aqhome-data/s_getdatapoints.c @@ -138,10 +138,8 @@ int _getAndSendDataPointsPeriod(AQH_STORAGE *storage, AQH_OBJECT *ep, { uint64_t valueId; uint64_t *tablePtr; - + valueId=AQH_Value_GetId(value); - if (num==0 || num>AQHOMEDATA_HANDLEGETDATAPOINTS_MAXTABLEENTRIES) - num=AQHOMEDATA_HANDLEGETDATAPOINTS_MAXTABLEENTRIES; tablePtr=AQH_Storage_GetDataPoints(storage, valueId, tsBegin, tsEnd, num); if (tablePtr) { _sendDataPointsResponse(ep, value, tablePtr, refMsgId); @@ -163,8 +161,6 @@ int _getAndSendDataPointsLast(AQH_STORAGE *storage, AQH_OBJECT *ep, uint64_t valueId; uint64_t *tablePtr; - if (num>AQHOMEDATA_HANDLEGETDATAPOINTS_MAXDATAPOINTS) - num=AQHOMEDATA_HANDLEGETDATAPOINTS_MAXDATAPOINTS; valueId=AQH_Value_GetId(value); tablePtr=AQH_Storage_GetLastNDataPoints(storage, valueId, num); if (tablePtr) { @@ -187,8 +183,6 @@ int _getAndSendDataPointsFirst(AQH_STORAGE *storage, AQH_OBJECT *ep, uint64_t valueId; uint64_t *tablePtr; - if (num>AQHOMEDATA_HANDLEGETDATAPOINTS_MAXDATAPOINTS) - num=AQHOMEDATA_HANDLEGETDATAPOINTS_MAXDATAPOINTS; valueId=AQH_Value_GetId(value); tablePtr=AQH_Storage_GetFirstNDataPoints(storage, valueId, num); if (tablePtr) { @@ -210,14 +204,28 @@ void _sendDataPointsResponse(AQH_OBJECT *ep, { int numTableEntries; int numDataPoints; - AQH_MESSAGE *outMsg; numTableEntries=(int)(tablePtr[0]); numDataPoints=numTableEntries/2; - outMsg=AQH_IpcdMessageMultiData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_RSP, - AQH_Endpoint_GetNextMessageId(ep), refMsgId, - value, &(tablePtr[1]), numDataPoints); - AQH_Endpoint_AddMsgOut(ep, outMsg); + tablePtr++; + + while(numDataPoints) { + AQH_MESSAGE *outMsg; + int toSend; + uint32_t flags=0; + + toSend=numDataPoints; + if (toSend>AQHOMEDATA_HANDLEGETDATAPOINTS_MAXDATAPOINTS) + toSend=AQHOMEDATA_HANDLEGETDATAPOINTS_MAXDATAPOINTS; + numDataPoints-=toSend; + if (numDataPoints==0) + flags|=AQH_MSGDATA_MULTIDATA_FLAGS_LASTMSG; + outMsg=AQH_IpcdMessageMultiData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_RSP, + AQH_Endpoint_GetNextMessageId(ep), refMsgId, flags, + value, tablePtr, toSend); + tablePtr+=(toSend*2); + AQH_Endpoint_AddMsgOut(ep, outMsg); + } } diff --git a/apps/aqhome-data/s_updatedata.c b/apps/aqhome-data/s_updatedata.c index 671cd17..7b5ccde 100644 --- a/apps/aqhome-data/s_updatedata.c +++ b/apps/aqhome-data/s_updatedata.c @@ -152,7 +152,7 @@ void _sendDataChangedMsgToAllClients(AQHOME_SERVER *xo, AQH_OBJECT *epSrc, const DBG_DEBUG(AQH_LOGDOMAIN, "Sending update msg to endpoint"); msg=AQH_IpcdMessageMultiData_new(AQH_MSGTYPE_IPC_DATA_DATACHANGED, - AQH_Endpoint_GetNextMessageId(ep), 0, + AQH_Endpoint_GetNextMessageId(ep), 0, 0, v, dataPoints, numValues); AQH_Endpoint_AddMsgOut(ep, msg); } diff --git a/apps/aqhome-mqttlog/s_publish.c b/apps/aqhome-mqttlog/s_publish.c index 0de7bae..2b5c707 100644 --- a/apps/aqhome-mqttlog/s_publish.c +++ b/apps/aqhome-mqttlog/s_publish.c @@ -218,7 +218,7 @@ void _sendMessage(AQH_MQTTLOG_SERVER *xo, const AQHMQTT_DEVICE *device, const AQ AQH_MESSAGE *pubMsg; pubMsg=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, - AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, + AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, 0, msgValue, now, f); DBG_INFO(AQH_LOGDOMAIN, "BROKER UPDATE_DATA %s/%s: %f", deviceName?deviceName:"", diff --git a/apps/aqhome-nodes/server.c b/apps/aqhome-nodes/server.c index e4b6d1f..231be75 100644 --- a/apps/aqhome-nodes/server.c +++ b/apps/aqhome-nodes/server.c @@ -1009,7 +1009,7 @@ void _publishDouble(AQH_NODE_SERVER *xo, uint32_t uid, const char *vPath, int vM AQH_MESSAGE *pubMsg; pubMsg=AQH_IpcdMessageMultiData_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, - AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, + AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, 0, value, arrayToSend, 1); if (pubMsg) { DBG_INFO(AQH_LOGDOMAIN, diff --git a/apps/aqhome-tool/data/adddata.c b/apps/aqhome-tool/data/adddata.c index a748ad4..122906b 100644 --- a/apps/aqhome-tool/data/adddata.c +++ b/apps/aqhome-tool/data/adddata.c @@ -197,7 +197,7 @@ AQH_MESSAGE *_createRequestMessage(GWEN_UNUSED AQH_OBJECT *o, uint32_t msgId) AQH_Value_SetValueUnits(v, valueUnits); AQH_Value_SetDeviceName(v, deviceName); - msg=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgId, 0, v, timestampToSend, dataToSend); + msg=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgId, 0, 0, v, timestampToSend, dataToSend); AQH_Value_free(v); GWEN_JsonElement_free(jRoot); return msg; diff --git a/aqhome/data/storage.c b/aqhome/data/storage.c index 8315828..ad3ffd1 100644 --- a/aqhome/data/storage.c +++ b/aqhome/data/storage.c @@ -404,13 +404,14 @@ uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t uint64_t *arrayPtr; uint64_t i; + DBG_ERROR(NULL, "Requested %d entries", (int) maxDataPointsRequested); df=_getDataFileByValueId(sto, valueId); if (df==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "No file for value id %lu", (unsigned long int) valueId); return NULL; } numEntries=AQH_DataFile_GetNumberOfEntries(df); - if (maxDataPointsRequested>numEntries) + if (maxDataPointsRequested>numEntries || maxDataPointsRequested==0) maxDataPointsRequested=numEntries; arrayLen=(maxDataPointsRequested*2)+1; arrayPtr=(uint64_t*) malloc(arrayLen*sizeof(uint64_t)); @@ -435,7 +436,8 @@ uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t if ((fromTime==0 || ts>=fromTime) && (toTime==0 || ts<=toTime)) { if ((arrayPos+1)>arrayLen) { - DBG_INFO(AQH_LOGDOMAIN, "Limit for number of returned entries reached"); + DBG_INFO(AQH_LOGDOMAIN, "Limit for number of returned entries reached (%d, numEntries=%d)", + (int) arrayLen, (int) numEntries); break; } diff --git a/aqhome/dataclient/client.c b/aqhome/dataclient/client.c index d64526f..9b6336e 100644 --- a/aqhome/dataclient/client.c +++ b/aqhome/dataclient/client.c @@ -368,7 +368,7 @@ int AQH_DataClient_UpdateData(AQH_DATACLIENT *dc, const AQH_VALUE *v, uint64_t t uint32_t msgId; msgId=++(dc->lastMsgId); - msgOut=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgId, 0, v, timeStamp, dataPoint); + msgOut=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgId, 0, 0, v, timeStamp, dataPoint); AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut); return _handleResult(dc, msgId); @@ -461,7 +461,9 @@ uint64_t _handleDataResponses(AQH_DATACLIENT *dc, uint64_t *dataPtr, uint64_t ma if (code==AQH_MSGTYPE_IPC_DATA_GETDATA_RSP) { const uint64_t *recvDataPtr; uint64_t recvNumberOfPoints; + uint32_t flags; + flags=AQH_IpcdMessageMultiData_GetFlags(tagList); AQH_IpcdMessageMultiData_ReadDatapoints(tagList, &recvDataPtr, &recvNumberOfPoints); if (recvNumberOfPoints) { uint64_t i; @@ -478,9 +480,11 @@ uint64_t _handleDataResponses(AQH_DATACLIENT *dc, uint64_t *dataPtr, uint64_t ma } } } - GWEN_Tag16_List_free(tagList); - AQH_Message_free(msgIn); - break; + if (flags & AQH_MSGDATA_MULTIDATA_FLAGS_LASTMSG) { + GWEN_Tag16_List_free(tagList); + AQH_Message_free(msgIn); + break; + } } else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { DBG_INFO(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList)); diff --git a/aqhome/msg/ipc/data/m_ipcd_multidata.c b/aqhome/msg/ipc/data/m_ipcd_multidata.c index 9886775..e5ab1f5 100644 --- a/aqhome/msg/ipc/data/m_ipcd_multidata.c +++ b/aqhome/msg/ipc/data/m_ipcd_multidata.c @@ -35,8 +35,7 @@ * ------------------------------------------------------------------------------------------------ */ -AQH_MESSAGE *AQH_IpcdMessageMultiData_new(uint16_t code, - uint32_t msgId, uint32_t refMsgId, +AQH_MESSAGE *AQH_IpcdMessageMultiData_new(uint16_t code, uint32_t msgId, uint32_t refMsgId, uint32_t flags, const AQH_VALUE *value, const uint64_t *i64Ptr, int numOfDataPoints) { AQH_MESSAGE *msg; @@ -45,6 +44,7 @@ AQH_MESSAGE *AQH_IpcdMessageMultiData_new(uint16_t code, buf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Tag16_WriteUint32TagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_FLAGS, flags, buf); rv=AQH_Tag16_WriteValueAsTagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_VALUE, value, buf); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); @@ -63,8 +63,7 @@ AQH_MESSAGE *AQH_IpcdMessageMultiData_new(uint16_t code, -AQH_MESSAGE *AQH_IpcdMessageMultiData_newForOne(uint16_t code, - uint32_t msgId, uint32_t refMsgId, +AQH_MESSAGE *AQH_IpcdMessageMultiData_newForOne(uint16_t code, uint32_t msgId, uint32_t refMsgId, uint32_t flags, const AQH_VALUE *value, uint64_t timeStamp, double dataPoint) { AQH_MESSAGE *msg; @@ -75,6 +74,7 @@ AQH_MESSAGE *AQH_IpcdMessageMultiData_newForOne(uint16_t code, buf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Tag16_WriteUint32TagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_FLAGS, flags, buf); rv=AQH_Tag16_WriteValueAsTagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_VALUE, value, buf); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); @@ -95,6 +95,18 @@ AQH_MESSAGE *AQH_IpcdMessageMultiData_newForOne(uint16_t code, +uint32_t AQH_IpcdMessageMultiData_GetFlags(const GWEN_TAG16_LIST *tagList) +{ + return tagList?AQH_Tag16_GetTagDataAsUint32(tagList, + AQH_MSGDATA_MULTIDATA_TAGS_FLAGS, + AQH_MSGDATA_MULTIDATA_FLAGS_LASTMSG):AQH_MSGDATA_MULTIDATA_FLAGS_LASTMSG; +} + + + + + + AQH_VALUE *AQH_IpcdMessageMultiData_ReadValue(const GWEN_TAG16_LIST *tagList) { return tagList?AQH_Tag16_ReadValueFromTagList(tagList, AQH_MSGDATA_MULTIDATA_TAGS_VALUE):NULL; @@ -129,7 +141,9 @@ void AQH_IpcdMessageMultiData_DumpToBuffer(const AQH_MESSAGE *msg, const GWEN_TA const char *valueUnits; int valueType; unsigned int numberOfPoints=0; + uint32_t flags; + flags=AQH_IpcdMessageMultiData_GetFlags(tagList); value=tagList?AQH_IpcdMessageMultiData_ReadValue(tagList):NULL; valueName=value?AQH_Value_GetNameForSystem(value):NULL; valueUnits=value?AQH_Value_GetValueUnits(value):NULL; @@ -139,12 +153,14 @@ void AQH_IpcdMessageMultiData_DumpToBuffer(const AQH_MESSAGE *msg, const GWEN_TA numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t)); GWEN_Buffer_AppendArgs(dbuf, - "MULTIDATA(%s) %s (code=%d, proto=%d, proto version=%d, name=%s, units=%s, type=%d, datapoints=%u)\n", + "MULTIDATA(%s) %s (code=%d, proto=%d, proto version=%d, flags=0x%08x, " + "name=%s, units=%s, type=%d, datapoints=%u)\n", AQH_IpcdMessage_MsgTypeToChar(AQH_IpcMessage_GetCode(msg)), sText?sText:"", AQH_IpcMessage_GetCode(msg), AQH_IpcMessage_GetProtoId(msg), - AQH_IpcMessage_GetProtoVersion(msg), + AQH_IpcMessage_GetProtoVersion(msg), + flags, valueName?valueName:"", valueUnits?valueUnits:"", valueType, diff --git a/aqhome/msg/ipc/data/m_ipcd_multidata.h b/aqhome/msg/ipc/data/m_ipcd_multidata.h index 68edd91..f8e8a48 100644 --- a/aqhome/msg/ipc/data/m_ipcd_multidata.h +++ b/aqhome/msg/ipc/data/m_ipcd_multidata.h @@ -20,18 +20,20 @@ +#define AQH_MSGDATA_MULTIDATA_FLAGS_LASTMSG 0x0001 + +#define AQH_MSGDATA_MULTIDATA_TAGS_FLAGS 0x01 #define AQH_MSGDATA_MULTIDATA_TAGS_VALUE 0xc1 #define AQH_MSGDATA_MULTIDATA_TAGS_DATA 0xc2 -AQHOME_API AQH_MESSAGE *AQH_IpcdMessageMultiData_new(uint16_t code, - uint32_t msgId, uint32_t refMsgId, +AQHOME_API AQH_MESSAGE *AQH_IpcdMessageMultiData_new(uint16_t code, uint32_t msgId, uint32_t refMsgId, uint32_t flags, const AQH_VALUE *value, const uint64_t *i64Ptr, int numOfDataPoints); -AQHOME_API AQH_MESSAGE *AQH_IpcdMessageMultiData_newForOne(uint16_t code, - uint32_t msgId, uint32_t refMsgId, +AQHOME_API AQH_MESSAGE *AQH_IpcdMessageMultiData_newForOne(uint16_t code, uint32_t msgId, uint32_t refMsgId, uint32_t flags, const AQH_VALUE *value, uint64_t timeStamp, double dataPoint); +AQHOME_API uint32_t AQH_IpcdMessageMultiData_GetFlags(const GWEN_TAG16_LIST *tagList); AQHOME_API AQH_VALUE *AQH_IpcdMessageMultiData_ReadValue(const GWEN_TAG16_LIST *tagList); AQHOME_API void AQH_IpcdMessageMultiData_ReadDatapoints(const GWEN_TAG16_LIST *tagList, const uint64_t **pDataPtr, uint64_t *pNumberOfPoints);