diff --git a/apps/aqhome-data/c_getdatapoints.c b/apps/aqhome-data/c_getdatapoints.c index 8d29db3..75a6881 100644 --- a/apps/aqhome-data/c_getdatapoints.c +++ b/apps/aqhome-data/c_getdatapoints.c @@ -30,7 +30,7 @@ */ #define AQHOMEDATA_HANDLEGETDATAPOINTS_MAXTABLEENTRIES 2048 - +#define AQHOMEDATA_HANDLEGETDATAPOINTS_MAXDATAPOINTS 1024 /* ------------------------------------------------------------------------------------------------ @@ -38,7 +38,13 @@ * ------------------------------------------------------------------------------------------------ */ -static int _getAndSendDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd); +static int _getAndSendDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, + const AQH_VALUE *value, + uint64_t tsBegin, uint64_t tsEnd, uint64_t num); +static int _getAndSendDataPointsNoNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd); +static int _getAndSendDataPointsWithNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t num); +static void _sendDataPointsResponse(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, const uint64_t *tablePtr); +static void _getAndSendLastDatapoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value); @@ -58,15 +64,17 @@ void AqHomeData_HandleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWE char *valueName; uint64_t tsBegin; uint64_t tsEnd; + uint64_t numRequested; AQH_GetDataDataIpcMsg_Parse(recvdMsg, 0); valueName=AQH_Tag16IpcMsg_GetTagDataAsNewString(recvdMsg, AQH_MSGDATA_GETDATA_TAGS_NAME, NULL); tsBegin=AQH_Tag16IpcMsg_GetTagDataAsUint64(recvdMsg, AQH_MSGDATA_GETDATA_TAGS_BEGIN, 0); tsEnd=AQH_Tag16IpcMsg_GetTagDataAsUint64(recvdMsg, AQH_MSGDATA_GETDATA_TAGS_END, 0); + numRequested=AQH_Tag16IpcMsg_GetTagDataAsUint64(recvdMsg, AQH_MSGDATA_GETDATA_TAGS_NUM, 0); value=AQH_Storage_GetValueByNameForSystem(aqh->storage, valueName); if (value) { - resultCode=_getAndSendDataPoints(aqh, ep, value, tsBegin, tsEnd); + resultCode=_getAndSendDataPoints(aqh, ep, value, tsBegin, tsEnd, numRequested); if (resultCode==AQH_MSG_IPC_SUCCESS) return; } @@ -87,23 +95,29 @@ void AqHomeData_HandleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWE -int _getAndSendDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd) +int _getAndSendDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd, uint64_t num) +{ + if (num==0) + return _getAndSendDataPointsNoNum(aqh, ep, value, tsBegin, tsEnd); + else if (num==1) { + _getAndSendLastDatapoint(aqh, ep, value); + return AQH_MSG_IPC_SUCCESS; + } + else + return _getAndSendDataPointsWithNum(aqh, ep, value, num); +} + + + +int _getAndSendDataPointsNoNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t tsBegin, uint64_t tsEnd) { uint64_t valueId; uint64_t *tablePtr; valueId=AQH_Value_GetId(value); - tablePtr=AQH_Storage_GetDataPoints(aqh->storage, valueId, tsBegin, tsEnd, AQHOMEDATA_HANDLEGETDATAPOINTS_MAXTABLEENTRIES); if (tablePtr) { - int numTableEntries; - int numDataPoints; - GWEN_MSG *outMsg; - - numTableEntries=(int)(tablePtr[0]); - numDataPoints=numTableEntries/2; - outMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDATA_RSP, value, &(tablePtr[1]), numDataPoints); - GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); + _sendDataPointsResponse(aqh, ep, value, tablePtr); free(tablePtr); return AQH_MSG_IPC_SUCCESS; } @@ -115,6 +129,70 @@ int _getAndSendDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VAL +int _getAndSendDataPointsWithNum(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, uint64_t num) +{ + uint64_t valueId; + uint64_t *tablePtr; + + if (num>AQHOMEDATA_HANDLEGETDATAPOINTS_MAXDATAPOINTS) + num=AQHOMEDATA_HANDLEGETDATAPOINTS_MAXDATAPOINTS; + valueId=AQH_Value_GetId(value); + tablePtr=AQH_Storage_GetLastNDataPoints(aqh->storage, valueId, num); + if (tablePtr) { + _sendDataPointsResponse(aqh, ep, value, tablePtr); + free(tablePtr); + return AQH_MSG_IPC_SUCCESS; + } + else { + DBG_INFO(NULL, "No matching datapoints for value \"%s\"", AQH_Value_GetNameForSystem(value)); + return AQH_MSG_IPC_ERROR_NODATA; + } +} + + + +void _sendDataPointsResponse(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value, const uint64_t *tablePtr) +{ + int numTableEntries; + int numDataPoints; + GWEN_MSG *outMsg; + + numTableEntries=(int)(tablePtr[0]); + numDataPoints=numTableEntries/2; + outMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDATA_RSP, value, &(tablePtr[1]), numDataPoints); + GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); +} + + + +void _getAndSendLastDatapoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE *value) +{ + GWEN_MSG *outMsg; + int resultCode=AQH_MSG_IPC_SUCCESS; + int rv; + uint64_t timestamp=0; + double data=0.0; + + rv=AQH_Storage_GetLastDataPoint(aqh->storage, AQH_Value_GetId(value), ×tamp, &data); + if (rv<0) { + switch(rv) { + case GWEN_ERROR_INVALID: resultCode=AQH_MSG_IPC_ERROR_INVALID; break; + case GWEN_ERROR_NO_DATA: resultCode=AQH_MSG_IPC_ERROR_NODATA; break; + default: resultCode=AQH_MSG_IPC_ERROR_GENERIC; break; + } + } + else { + outMsg=AQH_MultiDataDataIpcMsg_newForOne(AQH_MSGTYPE_IPC_DATA_GETDATA_RSP, value, timestamp, data); + GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); + return; + } + + outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode); + GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); +} + + + diff --git a/apps/aqhome-tool/data/getdatapoints.c b/apps/aqhome-tool/data/getdatapoints.c index cf4b424..0df7be5 100644 --- a/apps/aqhome-tool/data/getdatapoints.c +++ b/apps/aqhome-tool/data/getdatapoints.c @@ -113,6 +113,17 @@ int AQH_Tool_GetDataPoints(GWEN_DB_NODE *dbGlobalArgs, int argc, char **argv) I18S("Get data up until this timestamp (latest timestamp if omitted)"), I18S("Get data up until this timestamp (latest timestamp if omitted)") }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "numOfLastDatapoints", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "n", /* short option */ + NULL, /* long option */ + I18S("Get last n datapoints"), + I18S("Get last n datapoints") + }, { 0, /* flags */ GWEN_ArgsType_Int, /* type */ @@ -206,6 +217,7 @@ int _doGetDataPoints(GWEN_DB_NODE *dbArgs) const char *valueName; uint64_t tsBegin; uint64_t tsEnd; + uint64_t num; GWEN_MSG *msgOut; int printMean; int rv; @@ -213,6 +225,7 @@ int _doGetDataPoints(GWEN_DB_NODE *dbArgs) printMean=GWEN_DB_GetIntValue(dbArgs, "printMean", 0, 0); timeoutInSeconds=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 5); valueName=GWEN_DB_GetCharValue(dbArgs, "valueName", 0, NULL); + num=GWEN_DB_GetIntValue(dbArgs, "numOfLastDatapoints", 0, 0); tsBegin=_getTimeStampFromString(GWEN_DB_GetCharValue(dbArgs, "tsBegin", 0, NULL)); if (tsBegin==(uint64_t) (-1)) { DBG_ERROR(NULL, "Bad begin timestamp"); @@ -231,8 +244,7 @@ int _doGetDataPoints(GWEN_DB_NODE *dbArgs) } /*fprintf(stdout, "Sending GetDataPoints request\n");*/ - - msgOut=AQH_GetDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, valueName, tsBegin, tsEnd); + msgOut=AQH_GetDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, valueName, tsBegin, tsEnd, num); GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); rv=_awaitAndCalcAndPrintResponse(epTcp, timeoutInSeconds, printMean?1:0); diff --git a/apps/aqhome-tool/data/getlastdatapoint.c b/apps/aqhome-tool/data/getlastdatapoint.c index 5425ba4..c86592a 100644 --- a/apps/aqhome-tool/data/getlastdatapoint.c +++ b/apps/aqhome-tool/data/getlastdatapoint.c @@ -211,7 +211,7 @@ void _sendCommand(GWEN_MSG_ENDPOINT *epTcp, const char *valueName) { GWEN_MSG *msgOut; - msgOut=AQH_GetDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ, valueName, 0, 0); + msgOut=AQH_GetDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ, valueName, 0, 0, 1); GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); } diff --git a/apps/aqhome-tool/utils.c b/apps/aqhome-tool/utils.c index 58abfa5..07e74d7 100644 --- a/apps/aqhome-tool/utils.c +++ b/apps/aqhome-tool/utils.c @@ -325,17 +325,17 @@ void Utils_PrintSingleDataPoint(uint64_t timestamp, double data, const char *val ts=GWEN_Timestamp_fromLocalTime((time_t) timestamp); if (ts) - fprintf(stdout, "%04d/%02d/%02d-%02d:%02d:%02d\t%lf\t%s\n", + fprintf(stdout, "%04d/%02d/%02d-%02d:%02d:%02d\t%lf\t%s\t%lu\n", GWEN_Timestamp_GetYear(ts), GWEN_Timestamp_GetMonth(ts), GWEN_Timestamp_GetDay(ts), GWEN_Timestamp_GetHour(ts), GWEN_Timestamp_GetMinute(ts), GWEN_Timestamp_GetSecond(ts), - data, valueUnits?valueUnits:""); + data, valueUnits?valueUnits:"", (unsigned long) timestamp); else - fprintf(stdout, "\t%lf\t%s\n", - data, valueUnits?valueUnits:""); + fprintf(stdout, "\t%lf\t%s\t%lu\n", + data, valueUnits?valueUnits:"", (unsigned long) timestamp); } diff --git a/aqhome/data/storage.c b/aqhome/data/storage.c index d35a611..9afa0fc 100644 --- a/aqhome/data/storage.c +++ b/aqhome/data/storage.c @@ -311,9 +311,9 @@ uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t } numEntries=AQH_DataFile_GetNumberOfEntries(df); if (fromTime==0 && toTime==0) - arrayLen=numEntries+1; + arrayLen=(numEntries*2)+1; else - arrayLen=AQH_STORAGE_DATAPOINTS_STEPS+1; + arrayLen=(AQH_STORAGE_DATAPOINTS_STEPS*2)+1; if (arrayLen>maxArrayLen+1) arrayLen=maxArrayLen+1; @@ -330,17 +330,15 @@ uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t uint64_t ts; int rv; - DBG_DEBUG(NULL, "Reading record %lu", (unsigned long int) i); rv=AQH_DataFile_ReadRecord(df, i, &ts, &(u.f)); if (rv<0) { DBG_ERROR(AQH_LOGDOMAIN, "here (%d)", rv); free(arrayPtr); return NULL; } - DBG_DEBUG(NULL, "Read record %lu (%lu - %lf)", (unsigned long int) i, (unsigned long int) ts, u.f); if ((fromTime==0 || ts>=fromTime) && (toTime==0 || ts<=toTime)) { - if (arrayPos>maxArrayLen) { + if ((arrayPos+1)>maxArrayLen) { DBG_INFO(AQH_LOGDOMAIN, "Limit for number of returned entries reached"); break; } @@ -348,7 +346,7 @@ uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t uint64_t newArrayLen; void *p; - newArrayLen=arrayLen+AQH_STORAGE_DATAPOINTS_STEPS; + newArrayLen=arrayLen+(AQH_STORAGE_DATAPOINTS_STEPS*2); if (newArrayLen>maxArrayLen+1) newArrayLen=maxArrayLen+1; if (newArrayLen==arrayLen) { @@ -367,9 +365,6 @@ uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t arrayPtr[arrayPos++]=ts; arrayPtr[arrayPos++]=u.i; } - else { - DBG_DEBUG(NULL, "Entry %lu does not match", (unsigned long int) i); - } } /* for */ if (arrayPos<=1) { @@ -384,6 +379,70 @@ uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t +uint64_t *AQH_Storage_GetLastNDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t maxDataPointsRequested) +{ + AQH_DATAFILE *df; + uint64_t numEntries; + uint64_t numOfDataEntries; + uint64_t arrayLen; + uint64_t arrayPos; + uint64_t *arrayPtr; + uint64_t firstRecord; + uint64_t i; + + df=_getDataFileByValueId(sto, valueId); + if (df==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "No file for value id %lu", (unsigned long int) valueId); + return NULL; + } + numEntries=AQH_DataFile_GetNumberOfEntries(df); + numOfDataEntries=numEntries-1; /* first entry is reserved, don't count it here */ + if (numOfDataEntries<1) { + DBG_INFO(AQH_LOGDOMAIN, "No data records for value id %lu", (unsigned long int) valueId); + return NULL; + } + if (numOfDataEntries>maxDataPointsRequested) { /* more entries in file than requested */ + arrayLen=(maxDataPointsRequested*2)+1; /* +1 because the first array entry contains the number of entries */ + firstRecord=1+(numOfDataEntries-maxDataPointsRequested); + } + else { + arrayLen=(numOfDataEntries*2)+1; + firstRecord=1; + } + + arrayPtr=(uint64_t*) malloc(arrayLen*sizeof(uint64_t)); + if (arrayPtr==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Not enough memory for %lu entries", (unsigned long int) arrayLen); + return NULL; + } + arrayPos=1; + + for (i=firstRecord; i=arrayLen) { + DBG_INFO(AQH_LOGDOMAIN, "Requested number of entries reached"); + break; + } + arrayPtr[arrayPos++]=ts; + arrayPtr[arrayPos++]=u.i; + } /* for */ + + arrayPtr[0]=arrayPos-1; + return arrayPtr; +} + + + int AQH_Storage_GetLastDataPoint(AQH_STORAGE *sto, uint64_t valueId, uint64_t *pTimestamp, double *pValue) { AQH_DATAFILE *df; diff --git a/aqhome/data/storage.h b/aqhome/data/storage.h index ae4330f..3dff203 100644 --- a/aqhome/data/storage.h +++ b/aqhome/data/storage.h @@ -79,6 +79,7 @@ AQHOME_API uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueI uint64_t fromTime, uint64_t toTime, uint64_t maxArrayLen); AQHOME_API int AQH_Storage_GetLastDataPoint(AQH_STORAGE *sto, uint64_t valueId, uint64_t *pTimestamp, double *pValue); +AQHOME_API uint64_t *AQH_Storage_GetLastNDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t maxDataPointsRequested); diff --git a/aqhome/ipc/data/msg_data_getdata.c b/aqhome/ipc/data/msg_data_getdata.c index 33aba6a..b14dd1a 100644 --- a/aqhome/ipc/data/msg_data_getdata.c +++ b/aqhome/ipc/data/msg_data_getdata.c @@ -27,7 +27,7 @@ -GWEN_MSG *AQH_GetDataDataIpcMsg_new(uint16_t code, const char *valueName, uint64_t tsBegin, uint64_t tsEnd) +GWEN_MSG *AQH_GetDataDataIpcMsg_new(uint16_t code, const char *valueName, uint64_t tsBegin, uint64_t tsEnd, uint64_t num) { GWEN_MSG *msg; GWEN_BUFFER *buf; @@ -37,6 +37,7 @@ GWEN_MSG *AQH_GetDataDataIpcMsg_new(uint16_t code, const char *valueName, uint64 GWEN_Tag16_WriteStringTagToBuffer(AQH_MSGDATA_GETDATA_TAGS_NAME, valueName, buf); GWEN_Tag16_WriteUint64TagToBuffer(AQH_MSGDATA_GETDATA_TAGS_BEGIN, tsBegin, buf); GWEN_Tag16_WriteUint64TagToBuffer(AQH_MSGDATA_GETDATA_TAGS_END, tsEnd, buf); + GWEN_Tag16_WriteUint64TagToBuffer(AQH_MSGDATA_GETDATA_TAGS_NUM, num, buf); msg=AQH_Tag16IpcMsg_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, code, GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); diff --git a/aqhome/ipc/data/msg_data_getdata.h b/aqhome/ipc/data/msg_data_getdata.h index 2fdb62e..48876f3 100644 --- a/aqhome/ipc/data/msg_data_getdata.h +++ b/aqhome/ipc/data/msg_data_getdata.h @@ -24,10 +24,11 @@ #define AQH_MSGDATA_GETDATA_TAGS_NAME 0x0001 #define AQH_MSGDATA_GETDATA_TAGS_BEGIN 0x0020 #define AQH_MSGDATA_GETDATA_TAGS_END 0x0021 +#define AQH_MSGDATA_GETDATA_TAGS_NUM 0x0022 -AQHOME_API GWEN_MSG *AQH_GetDataDataIpcMsg_new(uint16_t code, const char *valueName, uint64_t tsBegin, uint64_t tsEnd); +AQHOME_API GWEN_MSG *AQH_GetDataDataIpcMsg_new(uint16_t code, const char *valueName, uint64_t tsBegin, uint64_t tsEnd, uint64_t num); AQHOME_API void AQH_GetDataDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy);