/**************************************************************************** * This file is part of the project Gwenhywfar. * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./df_direct_p.h" #include #include #include GWEN_INHERIT(AQH_DATAFILE, AQH_DATAFILE_DIRECT); /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void _cbFreeData(void *bp, void *p); static int _cbCreate(AQH_DATAFILE *df); static int _cbOpen(AQH_DATAFILE *df); static int _cbClose(AQH_DATAFILE *df); static uint64_t _cbGetNumberOfEntries(AQH_DATAFILE *df); static int _cbReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue); static int _cbAppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value); static int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue); static int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ AQH_DATAFILE *AQH_DataFileDirect_new(const char *fileName, uint64_t valueId) { AQH_DATAFILE *df; AQH_DATAFILE_DIRECT *xdf; df=AQH_DataFile_new(); AQH_DataFile_SetValueId(df, valueId); GWEN_NEW_OBJECT(AQH_DATAFILE_DIRECT, xdf); xdf->fileName=fileName?strdup(fileName):NULL; GWEN_INHERIT_SETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df, xdf, _cbFreeData); AQH_DataFile_SetCreateFn(df, _cbCreate); AQH_DataFile_SetOpenFn(df, _cbOpen); AQH_DataFile_SetCloseFn(df, _cbClose); AQH_DataFile_SetGetNumberOfEntriesFn(df, _cbGetNumberOfEntries); AQH_DataFile_SetReadRecordFn(df, _cbReadRecord); AQH_DataFile_SetAppendRecordFn(df, _cbAppendRecord); return df; } void _cbFreeData(GWEN_UNUSED void *bp, void *p) { AQH_DATAFILE_DIRECT *xdf; xdf=(AQH_DATAFILE_DIRECT*)p; GWEN_SyncIo_free(xdf->sio); free(xdf->fileName); GWEN_FREE_OBJECT(xdf); } const char *_cbGetFileName(const AQH_DATAFILE *df) { AQH_DATAFILE_DIRECT *xdf; xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df); return xdf?xdf->fileName:NULL; } uint64_t _cbGetNumberOfEntries(AQH_DATAFILE *df) { AQH_DATAFILE_DIRECT *xdf; xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df); return xdf?xdf->numberOfEntries:0; } int _cbCreate(AQH_DATAFILE *df) { if (df) { AQH_DATAFILE_DIRECT *xdf; xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df); if (xdf) { GWEN_SYNCIO *sio; int rv; if (xdf->sio) { DBG_ERROR(AQH_LOGDOMAIN, "File already open"); return GWEN_ERROR_INVALID; } sio=GWEN_SyncIo_File_new(xdf->fileName, GWEN_SyncIo_File_CreationMode_CreateNew); GWEN_SyncIo_SetFlags(sio, GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ | GWEN_SYNCIO_FILE_FLAGS_UREAD | GWEN_SYNCIO_FILE_FLAGS_UWRITE); rv=GWEN_SyncIo_Connect(sio); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", xdf->fileName, rv); GWEN_SyncIo_free(sio); return rv; } rv=_writeRecord(sio, 0, 0, 0.0); if (rv<0) { DBG_DEBUG(AQH_LOGDOMAIN, "Error adding first record to file \"%s\" (%d)", xdf->fileName, rv); GWEN_SyncIo_free(sio); return rv; } xdf->sio=sio; xdf->numberOfEntries=1; /* we just added the 0-record */ DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" created.", xdf->fileName); return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT"); return GWEN_ERROR_GENERIC; } } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } int _cbOpen(AQH_DATAFILE *df) { if (df) { AQH_DATAFILE_DIRECT *xdf; xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df); if (xdf) { GWEN_SYNCIO *sio; int rv; int64_t len; if (xdf->sio) { DBG_ERROR(AQH_LOGDOMAIN, "File already open"); return GWEN_ERROR_INVALID; } sio=GWEN_SyncIo_File_new(xdf->fileName, GWEN_SyncIo_File_CreationMode_OpenExisting); GWEN_SyncIo_SetFlags(sio, GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ); rv=GWEN_SyncIo_Connect(sio); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error opening file \"%s\" (%d)", xdf->fileName, rv); GWEN_SyncIo_free(sio); return rv; } len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_End); if (len<0) { DBG_INFO(AQH_LOGDOMAIN, "Error seeking to end of file \"%s\" (%d)", xdf->fileName, rv); GWEN_SyncIo_free(sio); return rv; } if (len % AQH_DATAFILE_RECORDSIZE) { DBG_ERROR(AQH_LOGDOMAIN, "Invalid file size of file \"%s\" (not a multiple of current record size)", xdf->fileName); GWEN_SyncIo_Disconnect(sio); GWEN_SyncIo_free(sio); unlink(xdf->fileName); return GWEN_ERROR_BAD_DATA; } xdf->numberOfEntries=len/AQH_DATAFILE_RECORDSIZE; len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_Set); if (len<0) { DBG_INFO(AQH_LOGDOMAIN, "Error seeking to begin of file \"%s\" (%d)", xdf->fileName, rv); GWEN_SyncIo_free(sio); return rv; } xdf->sio=sio; return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT"); return GWEN_ERROR_GENERIC; } } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } int _cbClose(AQH_DATAFILE *df) { if (df) { AQH_DATAFILE_DIRECT *xdf; xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df); if (xdf) { int rv; if (xdf->sio==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "File not open"); return GWEN_ERROR_INVALID; } rv=GWEN_SyncIo_Disconnect(xdf->sio); if (rv<0) { DBG_ERROR(AQH_LOGDOMAIN, "Error closing file \"%s\" (%d)", xdf->fileName, rv); GWEN_SyncIo_free(xdf->sio); xdf->sio=NULL; xdf->numberOfEntries=0; return rv; } GWEN_SyncIo_free(xdf->sio); xdf->sio=NULL; xdf->numberOfEntries=0; return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT"); return GWEN_ERROR_GENERIC; } } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } int _cbReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue) { if (df) { AQH_DATAFILE_DIRECT *xdf; xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df); if (xdf) { int rv; if (xdf->sio==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "File not open"); return GWEN_ERROR_INVALID; } rv=_readRecord(xdf->sio, idx, pTimestamp, pValue);; if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error reading from file \"%s\" (%d)", xdf->fileName, rv); return rv; } return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT"); return GWEN_ERROR_GENERIC; } } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } int _cbAppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value) { if (df) { AQH_DATAFILE_DIRECT *xdf; xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df); if (xdf) { int rv; uint64_t filePos; if (xdf->sio==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "File not open"); return GWEN_ERROR_INVALID; } filePos=(xdf->numberOfEntries)*AQH_DATAFILE_RECORDSIZE; rv=_writeRecord(xdf->sio, filePos, timestamp, value); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error writing to file \"%s\" (%d)", xdf->fileName, rv); return rv; } xdf->numberOfEntries++; DBG_DEBUG(AQH_LOGDOMAIN, "File \"%s\" has now %lu entries", xdf->fileName, (unsigned long int) (xdf->numberOfEntries)); return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT"); return GWEN_ERROR_GENERIC; } } else { DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer"); return GWEN_ERROR_GENERIC; } } int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue) { uint8_t record[AQH_DATAFILE_RECORDSIZE]; union {double f; uint64_t i;} u; uint64_t filePos; int64_t retPos; const uint8_t *ptr; uint64_t v; int rv; filePos=idx*AQH_DATAFILE_RECORDSIZE; retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set); if (retPos<0) { DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos); return (int) retPos; } rv=GWEN_SyncIo_ReadForced(sio, record, AQH_DATAFILE_RECORDSIZE); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error reading at pos %lu (%d)", (unsigned long int) filePos, rv); return rv; } ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP; v=0; v|=(uint64_t)(*(ptr++)); v|=(uint64_t)(*(ptr++))<<8; v|=(uint64_t)(*(ptr++))<<16; v|=(uint64_t)(*(ptr++))<<24; v|=(uint64_t)(*(ptr++))<<32; v|=(uint64_t)(*(ptr++))<<40; v|=(uint64_t)(*(ptr++))<<48; v|=(uint64_t)(*(ptr++))<<56; *pTimestamp=v; ptr=record+AQH_DATAFILE_OFFSET_VALUE; v=0; v|=(uint64_t)(*(ptr++)); v|=(uint64_t)(*(ptr++))<<8; v|=(uint64_t)(*(ptr++))<<16; v|=(uint64_t)(*(ptr++))<<24; v|=(uint64_t)(*(ptr++))<<32; v|=(uint64_t)(*(ptr++))<<40; v|=(uint64_t)(*(ptr++))<<48; v|=(uint64_t)(*(ptr++))<<56; u.i=v; *pValue=u.f; return 0; } int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value) { uint8_t record[AQH_DATAFILE_RECORDSIZE]; union {double f; uint64_t i;} u; uint8_t *ptr; int64_t retPos; uint64_t v; int rv; ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP; v=timestamp; *(ptr++)=(v) & 0xff; *(ptr++)=(v>>8) & 0xff; *(ptr++)=(v>>16) & 0xff; *(ptr++)=(v>>24) & 0xff; *(ptr++)=(v>>32) & 0xff; *(ptr++)=(v>>40) & 0xff; *(ptr++)=(v>>48) & 0xff; *(ptr++)=(v>>56) & 0xff; u.f=value; v=u.i; ptr=record+AQH_DATAFILE_OFFSET_VALUE; *(ptr++)=(v) & 0xff; *(ptr++)=(v>>8) & 0xff; *(ptr++)=(v>>16) & 0xff; *(ptr++)=(v>>24) & 0xff; *(ptr++)=(v>>32) & 0xff; *(ptr++)=(v>>40) & 0xff; *(ptr++)=(v>>48) & 0xff; *(ptr++)=(v>>56) & 0xff; retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set); if (retPos<0) { DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos); return (int) retPos; } rv=GWEN_SyncIo_WriteForced(sio, record, AQH_DATAFILE_RECORDSIZE); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "Error writing to file (%d)", rv); return rv; } return 0; }