/**************************************************************************** * This file is part of the project Gwenhywfar. * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./datafile_p.h" #include #include #include GWEN_LIST_FUNCTIONS(AQH_DATAFILE, AQH_DataFile); /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue); static int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ AQH_DATAFILE *AQH_DataFile_new(const char *fileName, uint64_t valueId) { AQH_DATAFILE *df; GWEN_NEW_OBJECT(AQH_DATAFILE, df); GWEN_LIST_INIT(AQH_DATAFILE, df); df->valueId=valueId; df->fileName=fileName?strdup(fileName):NULL; return df; } void AQH_DataFile_free(AQH_DATAFILE *df) { if (df) { GWEN_LIST_FINI(AQH_DATAFILE, df); GWEN_SyncIo_free(df->sio); free(df->fileName); GWEN_FREE_OBJECT(df); } } const char *AQH_DataFile_GetFileName(const AQH_DATAFILE *df) { return df?df->fileName:NULL; } uint64_t AQH_DataFile_GetValueId(const AQH_DATAFILE *df) { return df?df->valueId:0; } uint64_t AQH_DataFile_GetNumberOfEntries(const AQH_DATAFILE *df) { return df?df->numberOfEntries:0; } uint32_t AQH_DataFile_GetRuntimeFlags(const AQH_DATAFILE *df) { return df?df->runtimeFlags:0; } int AQH_DataFile_Create(AQH_DATAFILE *df) { if (df) { GWEN_SYNCIO *sio; int rv; if (df->sio) { DBG_ERROR(AQH_LOGDOMAIN, "File already open"); return GWEN_ERROR_INVALID; } sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_CreateNew); GWEN_SyncIo_SetFlags(sio, GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ | GWEN_SYNCIO_FILE_FLAGS_UREAD | GWEN_SYNCIO_FILE_FLAGS_UWRITE); rv=GWEN_SyncIo_Connect(sio); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", df->fileName, rv); GWEN_SyncIo_free(sio); return rv; } rv=_writeRecord(sio, 0, 0, 0.0); if (rv<0) { DBG_DEBUG(AQH_LOGDOMAIN, "Error adding first record to file \"%s\" (%d)", df->fileName, rv); GWEN_SyncIo_free(sio); return rv; } df->sio=sio; df->numberOfEntries=1; /* we just added the 0-record */ DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" created.", df->fileName); return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } int AQH_DataFile_Open(AQH_DATAFILE *df) { if (df) { GWEN_SYNCIO *sio; int rv; int64_t len; if (df->sio) { DBG_ERROR(AQH_LOGDOMAIN, "File already open"); return GWEN_ERROR_INVALID; } sio=GWEN_SyncIo_File_new(df->fileName, GWEN_SyncIo_File_CreationMode_OpenExisting); GWEN_SyncIo_SetFlags(sio, GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ); rv=GWEN_SyncIo_Connect(sio); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error opening file \"%s\" (%d)", df->fileName, rv); GWEN_SyncIo_free(sio); return rv; } len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_End); if (len<0) { DBG_INFO(AQH_LOGDOMAIN, "Error seeking to end of file \"%s\" (%d)", df->fileName, rv); GWEN_SyncIo_free(sio); return rv; } if (len % AQH_DATAFILE_RECORDSIZE) { DBG_ERROR(AQH_LOGDOMAIN, "Invalid file size of file \"%s\" (not a multiple of current record size)", df->fileName); GWEN_SyncIo_Disconnect(sio); GWEN_SyncIo_free(sio); unlink(df->fileName); return GWEN_ERROR_BAD_DATA; } df->numberOfEntries=len/AQH_DATAFILE_RECORDSIZE; len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_Set); if (len<0) { DBG_INFO(AQH_LOGDOMAIN, "Error seeking to begin of file \"%s\" (%d)", df->fileName, rv); GWEN_SyncIo_free(sio); return rv; } df->sio=sio; return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } int AQH_DataFile_Close(AQH_DATAFILE *df) { if (df) { int rv; if (df->sio==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "File not open"); return GWEN_ERROR_INVALID; } rv=GWEN_SyncIo_Disconnect(df->sio); if (rv<0) { DBG_ERROR(AQH_LOGDOMAIN, "Error closing file \"%s\" (%d)", df->fileName, rv); GWEN_SyncIo_free(df->sio); df->sio=NULL; df->numberOfEntries=0; return rv; } GWEN_SyncIo_free(df->sio); df->sio=NULL; df->numberOfEntries=0; return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } int AQH_DataFile_ReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue) { if (df) { int rv; if (df->sio==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "File not open"); return GWEN_ERROR_INVALID; } rv=_readRecord(df->sio, idx, pTimestamp, pValue);; if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error reading from file \"%s\" (%d)", df->fileName, rv); return rv; } return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } int AQH_DataFile_AppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value) { if (df) { int rv; uint64_t filePos; if (df->sio==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "File not open"); return GWEN_ERROR_INVALID; } filePos=(df->numberOfEntries)*AQH_DATAFILE_RECORDSIZE; rv=_writeRecord(df->sio, filePos, timestamp, value); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error writing to file \"%s\" (%d)", df->fileName, rv); return rv; } df->numberOfEntries++; DBG_DEBUG(AQH_LOGDOMAIN, "File \"%s\" has now %lu entries", df->fileName, (unsigned long int) (df->numberOfEntries)); return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } AQH_DATAFILE *AQH_DataFile_List_GetByValueId(AQH_DATAFILE_LIST *fileList, uint64_t id) { if (fileList) { AQH_DATAFILE *df; df=AQH_DataFile_List_First(fileList); while(df) { if (df->valueId==id) return df; df=AQH_DataFile_List_Next(df); } } return NULL; } int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue) { uint8_t record[AQH_DATAFILE_RECORDSIZE]; union {double f; uint64_t i;} u; uint64_t filePos; int64_t retPos; const uint8_t *ptr; uint64_t v; int rv; filePos=idx*AQH_DATAFILE_RECORDSIZE; retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set); if (retPos<0) { DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos); return (int) retPos; } rv=GWEN_SyncIo_ReadForced(sio, record, AQH_DATAFILE_RECORDSIZE); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error reading at pos %lu (%d)", (unsigned long int) filePos, rv); return rv; } ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP; v=0; v|=(uint64_t)(*(ptr++)); v|=(uint64_t)(*(ptr++))<<8; v|=(uint64_t)(*(ptr++))<<16; v|=(uint64_t)(*(ptr++))<<24; v|=(uint64_t)(*(ptr++))<<32; v|=(uint64_t)(*(ptr++))<<40; v|=(uint64_t)(*(ptr++))<<48; v|=(uint64_t)(*(ptr++))<<56; *pTimestamp=v; ptr=record+AQH_DATAFILE_OFFSET_VALUE; v=0; v|=(uint64_t)(*(ptr++)); v|=(uint64_t)(*(ptr++))<<8; v|=(uint64_t)(*(ptr++))<<16; v|=(uint64_t)(*(ptr++))<<24; v|=(uint64_t)(*(ptr++))<<32; v|=(uint64_t)(*(ptr++))<<40; v|=(uint64_t)(*(ptr++))<<48; v|=(uint64_t)(*(ptr++))<<56; u.i=v; *pValue=u.f; return 0; } int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value) { uint8_t record[AQH_DATAFILE_RECORDSIZE]; union {double f; uint64_t i;} u; uint8_t *ptr; int64_t retPos; uint64_t v; int rv; ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP; v=timestamp; *(ptr++)=(v) & 0xff; *(ptr++)=(v>>8) & 0xff; *(ptr++)=(v>>16) & 0xff; *(ptr++)=(v>>24) & 0xff; *(ptr++)=(v>>32) & 0xff; *(ptr++)=(v>>40) & 0xff; *(ptr++)=(v>>48) & 0xff; *(ptr++)=(v>>56) & 0xff; u.f=value; v=u.i; ptr=record+AQH_DATAFILE_OFFSET_VALUE; *(ptr++)=(v) & 0xff; *(ptr++)=(v>>8) & 0xff; *(ptr++)=(v>>16) & 0xff; *(ptr++)=(v>>24) & 0xff; *(ptr++)=(v>>32) & 0xff; *(ptr++)=(v>>40) & 0xff; *(ptr++)=(v>>48) & 0xff; *(ptr++)=(v>>56) & 0xff; retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set); if (retPos<0) { DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos); return (int) retPos; } rv=GWEN_SyncIo_WriteForced(sio, record, AQH_DATAFILE_RECORDSIZE); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error writing to file (%d)", rv); return rv; } return 0; }