aqhome: make datafile a virtual class. Add datafile_direct.
this is to allow for cached data file handling later.
This commit is contained in:
444
aqhome/data/datafile_direct.c
Normal file
444
aqhome/data/datafile_direct.c
Normal file
@@ -0,0 +1,444 @@
|
||||
/****************************************************************************
|
||||
* This file is part of the project Gwenhywfar.
|
||||
* Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
|
||||
*
|
||||
* The license for this file can be found in the file COPYING which you
|
||||
* should have received along with this file.
|
||||
****************************************************************************/
|
||||
|
||||
#ifdef HAVE_CONFIG_H
|
||||
# include <config.h>
|
||||
#endif
|
||||
|
||||
#include "./datafile_direct_p.h"
|
||||
|
||||
#include <gwenhywfar/debug.h>
|
||||
#include <gwenhywfar/syncio_file.h>
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
|
||||
GWEN_INHERIT(AQH_DATAFILE, AQH_DATAFILE_DIRECT);
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------------------------------------------
|
||||
* forward declarations
|
||||
* ------------------------------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
static void _cbFreeData(void *bp, void *p);
|
||||
|
||||
static int _cbCreate(AQH_DATAFILE *df);
|
||||
static int _cbOpen(AQH_DATAFILE *df);
|
||||
static int _cbClose(AQH_DATAFILE *df);
|
||||
|
||||
static uint64_t _cbGetNumberOfEntries(AQH_DATAFILE *df);
|
||||
|
||||
static int _cbReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue);
|
||||
static int _cbAppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value);
|
||||
|
||||
static int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue);
|
||||
static int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value);
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------------------------------------------
|
||||
* implementations
|
||||
* ------------------------------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
AQH_DATAFILE *AQH_DataFileDirect_new(const char *fileName, uint64_t valueId)
|
||||
{
|
||||
AQH_DATAFILE *df;
|
||||
AQH_DATAFILE_DIRECT *xdf;
|
||||
|
||||
df=AQH_DataFile_new();
|
||||
AQH_DataFile_SetValueId(df, valueId);
|
||||
|
||||
GWEN_NEW_OBJECT(AQH_DATAFILE_DIRECT, xdf);
|
||||
xdf->fileName=fileName?strdup(fileName):NULL;
|
||||
GWEN_INHERIT_SETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df, xdf, _cbFreeData);
|
||||
|
||||
AQH_DataFile_SetCreateFn(df, _cbCreate);
|
||||
AQH_DataFile_SetOpenFn(df, _cbOpen);
|
||||
AQH_DataFile_SetCloseFn(df, _cbClose);
|
||||
AQH_DataFile_SetGetNumberOfEntriesFn(df, _cbGetNumberOfEntries);
|
||||
AQH_DataFile_SetReadRecordFn(df, _cbReadRecord);
|
||||
AQH_DataFile_SetAppendRecordFn(df, _cbAppendRecord);
|
||||
|
||||
return df;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void _cbFreeData(void *bp, void *p)
|
||||
{
|
||||
AQH_DATAFILE_DIRECT *xdf;
|
||||
|
||||
xdf=(AQH_DATAFILE_DIRECT*)p;
|
||||
GWEN_SyncIo_free(xdf->sio);
|
||||
free(xdf->fileName);
|
||||
GWEN_FREE_OBJECT(xdf);
|
||||
}
|
||||
|
||||
|
||||
|
||||
const char *_cbGetFileName(const AQH_DATAFILE *df)
|
||||
{
|
||||
AQH_DATAFILE_DIRECT *xdf;
|
||||
|
||||
xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df);
|
||||
return xdf?xdf->fileName:NULL;
|
||||
}
|
||||
|
||||
|
||||
|
||||
uint64_t _cbGetNumberOfEntries(AQH_DATAFILE *df)
|
||||
{
|
||||
AQH_DATAFILE_DIRECT *xdf;
|
||||
|
||||
xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df);
|
||||
return xdf?xdf->numberOfEntries:0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
int _cbCreate(AQH_DATAFILE *df)
|
||||
{
|
||||
if (df) {
|
||||
AQH_DATAFILE_DIRECT *xdf;
|
||||
|
||||
xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df);
|
||||
if (xdf) {
|
||||
GWEN_SYNCIO *sio;
|
||||
int rv;
|
||||
|
||||
if (xdf->sio) {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "File already open");
|
||||
return GWEN_ERROR_INVALID;
|
||||
}
|
||||
|
||||
sio=GWEN_SyncIo_File_new(xdf->fileName, GWEN_SyncIo_File_CreationMode_CreateNew);
|
||||
GWEN_SyncIo_SetFlags(sio,
|
||||
GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ |
|
||||
GWEN_SYNCIO_FILE_FLAGS_UREAD | GWEN_SYNCIO_FILE_FLAGS_UWRITE);
|
||||
rv=GWEN_SyncIo_Connect(sio);
|
||||
if (rv<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error creating file \"%s\" (%d)", xdf->fileName, rv);
|
||||
GWEN_SyncIo_free(sio);
|
||||
return rv;
|
||||
}
|
||||
rv=_writeRecord(sio, 0, 0, 0.0);
|
||||
if (rv<0) {
|
||||
DBG_DEBUG(AQH_LOGDOMAIN, "Error adding first record to file \"%s\" (%d)", xdf->fileName, rv);
|
||||
GWEN_SyncIo_free(sio);
|
||||
return rv;
|
||||
}
|
||||
xdf->sio=sio;
|
||||
xdf->numberOfEntries=1; /* we just added the 0-record */
|
||||
|
||||
DBG_INFO(AQH_LOGDOMAIN, "File \"%s\" created.", xdf->fileName);
|
||||
return 0;
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
int _cbOpen(AQH_DATAFILE *df)
|
||||
{
|
||||
if (df) {
|
||||
AQH_DATAFILE_DIRECT *xdf;
|
||||
|
||||
xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df);
|
||||
if (xdf) {
|
||||
GWEN_SYNCIO *sio;
|
||||
int rv;
|
||||
int64_t len;
|
||||
|
||||
if (xdf->sio) {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "File already open");
|
||||
return GWEN_ERROR_INVALID;
|
||||
}
|
||||
|
||||
sio=GWEN_SyncIo_File_new(xdf->fileName, GWEN_SyncIo_File_CreationMode_OpenExisting);
|
||||
GWEN_SyncIo_SetFlags(sio, GWEN_SYNCIO_FILE_FLAGS_WRITE | GWEN_SYNCIO_FILE_FLAGS_READ);
|
||||
rv=GWEN_SyncIo_Connect(sio);
|
||||
if (rv<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error opening file \"%s\" (%d)", xdf->fileName, rv);
|
||||
GWEN_SyncIo_free(sio);
|
||||
return rv;
|
||||
}
|
||||
|
||||
len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_End);
|
||||
if (len<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error seeking to end of file \"%s\" (%d)", xdf->fileName, rv);
|
||||
GWEN_SyncIo_free(sio);
|
||||
return rv;
|
||||
}
|
||||
if (len % AQH_DATAFILE_RECORDSIZE) {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Invalid file size of file \"%s\" (not a multiple of current record size)", xdf->fileName);
|
||||
GWEN_SyncIo_Disconnect(sio);
|
||||
GWEN_SyncIo_free(sio);
|
||||
unlink(xdf->fileName);
|
||||
return GWEN_ERROR_BAD_DATA;
|
||||
}
|
||||
xdf->numberOfEntries=len/AQH_DATAFILE_RECORDSIZE;
|
||||
|
||||
len=GWEN_SyncIo_File_Seek(sio, 0, GWEN_SyncIo_File_Whence_Set);
|
||||
if (len<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error seeking to begin of file \"%s\" (%d)", xdf->fileName, rv);
|
||||
GWEN_SyncIo_free(sio);
|
||||
return rv;
|
||||
}
|
||||
xdf->sio=sio;
|
||||
return 0;
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
int _cbClose(AQH_DATAFILE *df)
|
||||
{
|
||||
if (df) {
|
||||
AQH_DATAFILE_DIRECT *xdf;
|
||||
|
||||
xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df);
|
||||
if (xdf) {
|
||||
int rv;
|
||||
|
||||
if (xdf->sio==NULL) {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "File not open");
|
||||
return GWEN_ERROR_INVALID;
|
||||
}
|
||||
|
||||
rv=GWEN_SyncIo_Disconnect(xdf->sio);
|
||||
if (rv<0) {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Error closing file \"%s\" (%d)", xdf->fileName, rv);
|
||||
GWEN_SyncIo_free(xdf->sio);
|
||||
xdf->sio=NULL;
|
||||
xdf->numberOfEntries=0;
|
||||
return rv;
|
||||
}
|
||||
GWEN_SyncIo_free(xdf->sio);
|
||||
xdf->sio=NULL;
|
||||
xdf->numberOfEntries=0;
|
||||
return 0;
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
int _cbReadRecord(AQH_DATAFILE *df, uint64_t idx, uint64_t *pTimestamp, double *pValue)
|
||||
{
|
||||
if (df) {
|
||||
AQH_DATAFILE_DIRECT *xdf;
|
||||
|
||||
xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df);
|
||||
if (xdf) {
|
||||
int rv;
|
||||
|
||||
if (xdf->sio==NULL) {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "File not open");
|
||||
return GWEN_ERROR_INVALID;
|
||||
}
|
||||
|
||||
rv=_readRecord(xdf->sio, idx, pTimestamp, pValue);;
|
||||
if (rv<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error reading from file \"%s\" (%d)", xdf->fileName, rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
int _cbAppendRecord(AQH_DATAFILE *df, uint64_t timestamp, double value)
|
||||
{
|
||||
if (df) {
|
||||
AQH_DATAFILE_DIRECT *xdf;
|
||||
|
||||
xdf=GWEN_INHERIT_GETDATA(AQH_DATAFILE, AQH_DATAFILE_DIRECT, df);
|
||||
if (xdf) {
|
||||
int rv;
|
||||
uint64_t filePos;
|
||||
|
||||
if (xdf->sio==NULL) {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "File not open");
|
||||
return GWEN_ERROR_INVALID;
|
||||
}
|
||||
|
||||
filePos=(xdf->numberOfEntries)*AQH_DATAFILE_RECORDSIZE;
|
||||
rv=_writeRecord(xdf->sio, filePos, timestamp, value);
|
||||
if (rv<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error writing to file \"%s\" (%d)", xdf->fileName, rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
xdf->numberOfEntries++;
|
||||
DBG_DEBUG(AQH_LOGDOMAIN, "File \"%s\" has now %lu entries", xdf->fileName, (unsigned long int) (xdf->numberOfEntries));
|
||||
return 0;
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Not of type AQH_DATAFILE_DIRECT");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
else {
|
||||
DBG_ERROR(AQH_LOGDOMAIN, "Nullpointer");
|
||||
return GWEN_ERROR_GENERIC;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
int _readRecord(GWEN_SYNCIO *sio, uint64_t idx, uint64_t *pTimestamp, double *pValue)
|
||||
{
|
||||
uint8_t record[AQH_DATAFILE_RECORDSIZE];
|
||||
union {double f; uint64_t i;} u;
|
||||
uint64_t filePos;
|
||||
int64_t retPos;
|
||||
const uint8_t *ptr;
|
||||
uint64_t v;
|
||||
int rv;
|
||||
|
||||
filePos=idx*AQH_DATAFILE_RECORDSIZE;
|
||||
retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set);
|
||||
if (retPos<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos);
|
||||
return (int) retPos;
|
||||
}
|
||||
|
||||
rv=GWEN_SyncIo_ReadForced(sio, record, AQH_DATAFILE_RECORDSIZE);
|
||||
if (rv<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error reading at pos %lu (%d)", (unsigned long int) filePos, rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP;
|
||||
v=0;
|
||||
v|=(uint64_t)(*(ptr++));
|
||||
v|=(uint64_t)(*(ptr++))<<8;
|
||||
v|=(uint64_t)(*(ptr++))<<16;
|
||||
v|=(uint64_t)(*(ptr++))<<24;
|
||||
v|=(uint64_t)(*(ptr++))<<32;
|
||||
v|=(uint64_t)(*(ptr++))<<40;
|
||||
v|=(uint64_t)(*(ptr++))<<48;
|
||||
v|=(uint64_t)(*(ptr++))<<56;
|
||||
*pTimestamp=v;
|
||||
|
||||
ptr=record+AQH_DATAFILE_OFFSET_VALUE;
|
||||
v=0;
|
||||
v|=(uint64_t)(*(ptr++));
|
||||
v|=(uint64_t)(*(ptr++))<<8;
|
||||
v|=(uint64_t)(*(ptr++))<<16;
|
||||
v|=(uint64_t)(*(ptr++))<<24;
|
||||
v|=(uint64_t)(*(ptr++))<<32;
|
||||
v|=(uint64_t)(*(ptr++))<<40;
|
||||
v|=(uint64_t)(*(ptr++))<<48;
|
||||
v|=(uint64_t)(*(ptr++))<<56;
|
||||
u.i=v;
|
||||
*pValue=u.f;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int _writeRecord(GWEN_SYNCIO *sio, uint64_t filePos, uint64_t timestamp, double value)
|
||||
{
|
||||
uint8_t record[AQH_DATAFILE_RECORDSIZE];
|
||||
union {double f; uint64_t i;} u;
|
||||
uint8_t *ptr;
|
||||
int64_t retPos;
|
||||
uint64_t v;
|
||||
int rv;
|
||||
|
||||
ptr=record+AQH_DATAFILE_OFFSET_TIMESTAMP;
|
||||
v=timestamp;
|
||||
*(ptr++)=(v) & 0xff;
|
||||
*(ptr++)=(v>>8) & 0xff;
|
||||
*(ptr++)=(v>>16) & 0xff;
|
||||
*(ptr++)=(v>>24) & 0xff;
|
||||
*(ptr++)=(v>>32) & 0xff;
|
||||
*(ptr++)=(v>>40) & 0xff;
|
||||
*(ptr++)=(v>>48) & 0xff;
|
||||
*(ptr++)=(v>>56) & 0xff;
|
||||
|
||||
u.f=value;
|
||||
v=u.i;
|
||||
ptr=record+AQH_DATAFILE_OFFSET_VALUE;
|
||||
*(ptr++)=(v) & 0xff;
|
||||
*(ptr++)=(v>>8) & 0xff;
|
||||
*(ptr++)=(v>>16) & 0xff;
|
||||
*(ptr++)=(v>>24) & 0xff;
|
||||
*(ptr++)=(v>>32) & 0xff;
|
||||
*(ptr++)=(v>>40) & 0xff;
|
||||
*(ptr++)=(v>>48) & 0xff;
|
||||
*(ptr++)=(v>>56) & 0xff;
|
||||
|
||||
retPos=GWEN_SyncIo_File_Seek(sio, filePos, GWEN_SyncIo_File_Whence_Set);
|
||||
if (retPos<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error seeking to pos %lu (%d)", (unsigned long int) filePos, (int) retPos);
|
||||
return (int) retPos;
|
||||
}
|
||||
rv=GWEN_SyncIo_WriteForced(sio, record, AQH_DATAFILE_RECORDSIZE);
|
||||
if (rv<0) {
|
||||
DBG_INFO(AQH_LOGDOMAIN, "Error writing to file (%d)", rv);
|
||||
return rv;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user