Heavy work on IPC.

We will now have a broker (aqhome-data) which stores data and distributes
value change messages among connected clients.
aqhomed will connect to that broker and send its values there.
aqhome-mqtt will also connect to the broker and send its values there.
Other clients can later connect to check for changes and react according
to rules.
This commit is contained in:
Martin Preuss
2023-09-10 23:13:03 +02:00
parent 2b733a52ca
commit 518a3a53f9
43 changed files with 1412 additions and 707 deletions

View File

@@ -21,6 +21,9 @@
#include <gwenhywfar/json_read.h>
#define AQH_STORAGE_DATAPOINTS_STEPS 128
/* ------------------------------------------------------------------------------------------------
* forward declarations
@@ -219,6 +222,7 @@ void AQH_Storage_AddValue(AQH_STORAGE *sto, AQH_VALUE *value)
id=++(sto->lastValueId);
AQH_Value_SetId(value, id);
AQH_Value_List_Add(value, sto->valueList);
AQH_Storage_AddRuntimeFlags(sto, AQH_STORAGE_RTFLAGS_MODIFIED);
}
}
@@ -238,9 +242,9 @@ AQH_VALUE *AQH_Storage_GetValueById(const AQH_STORAGE *sto, uint64_t id)
AQH_VALUE *AQH_Storage_GetValueByName(const AQH_STORAGE *sto, const char *s)
AQH_VALUE *AQH_Storage_GetValueByNameForSystem(const AQH_STORAGE *sto, const char *s)
{
return sto?AQH_Value_List_GetByName(sto->valueList, s):NULL;
return sto?AQH_Value_List_GetByNameForSystem(sto->valueList, s):NULL;
}
@@ -377,6 +381,96 @@ int AQH_Storage_AddDatapoint(AQH_STORAGE *sto, uint64_t valueId, uint64_t timest
uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId, uint64_t fromTime, uint64_t toTime, uint64_t maxArrayLen)
{
AQH_DATAFILE *df;
uint64_t numEntries;
uint64_t arrayLen;
uint64_t arrayPos;
uint64_t *arrayPtr;
uint64_t i;
df=_getDataFileByValueId(sto, valueId);
if (df==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "No file for value id %lu", (unsigned long int) valueId);
return NULL;
}
numEntries=AQH_DataFile_GetNumberOfEntries(df);
if (fromTime==0 && toTime==0)
arrayLen=numEntries+1;
else
arrayLen=AQH_STORAGE_DATAPOINTS_STEPS+1;
if (arrayLen>maxArrayLen+1)
arrayLen=maxArrayLen+1;
arrayPtr=(uint64_t*) malloc(arrayLen*sizeof(uint64_t));
if (arrayPtr==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "Not enough memory for %lu entries", (unsigned long int) arrayLen);
free(arrayPtr);
return NULL;
}
arrayPos=1;
for (i=1; i<numEntries; i++) { /* first entry in datafile is reserved */
union {double f; uint64_t i;} u;
uint64_t ts;
int rv;
DBG_INFO(NULL, "Reading record %lu", (unsigned long int) i);
rv=AQH_DataFile_ReadRecord(df, i, &ts, &(u.f));
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "here (%d)", rv);
free(arrayPtr);
return NULL;
}
DBG_INFO(NULL, "Read record %lu (%lu - %lf)", (unsigned long int) i, (unsigned long int) ts, u.f);
if ((fromTime==0 || ts>=fromTime) && (toTime==0 || ts<=toTime)) {
DBG_INFO(NULL, "Will add record %lu", (unsigned long int) i);
if (arrayPos>maxArrayLen) {
DBG_INFO(AQH_LOGDOMAIN, "Limit for number of returned entries reached");
break;
}
if (arrayPos+1>=arrayLen) {
uint64_t newArrayLen;
void *p;
newArrayLen=arrayLen+AQH_STORAGE_DATAPOINTS_STEPS;
if (newArrayLen>maxArrayLen+1)
newArrayLen=maxArrayLen+1;
if (newArrayLen==arrayLen) {
DBG_INFO(AQH_LOGDOMAIN, "Limit for number of returned entries reached");
break;
}
p=realloc((void*) arrayPtr, newArrayLen*sizeof(uint64_t));
if (p==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "Not enough memory for %lu entries", (unsigned long int) arrayLen+AQH_STORAGE_DATAPOINTS_STEPS);
free(arrayPtr);
return NULL;
}
arrayPtr=(uint64_t*) p;
arrayLen=newArrayLen;
}
arrayPtr[arrayPos++]=ts;
arrayPtr[arrayPos++]=u.i;
}
else {
DBG_INFO(NULL, "Entry %lu does not match", (unsigned long int) i);
}
} /* for */
if (arrayPos<=1) {
DBG_INFO(AQH_LOGDOMAIN, "No matching records");
free(arrayPtr);
return NULL;
}
arrayPtr[0]=arrayPos-1;
return arrayPtr;
}
void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *sTopic, const char *sValue)
{
if (sto) {
@@ -470,7 +564,7 @@ void _handleValueForJsonElement(AQH_STORAGE *sto,
}
}
else {
DBG_INFO(AQH_LOGDOMAIN, "No datapath in value \"%s\"", AQH_Value_GetName(value));
DBG_INFO(AQH_LOGDOMAIN, "No datapath in value \"%s\"", AQH_Value_GetNameForSystem(value));
}
}

View File

@@ -67,7 +67,7 @@ AQHOME_API AQH_MQTT_TOPIC *AQH_Storage_GetMqttTopicByTopic(const AQH_STORAGE *st
AQHOME_API void AQH_Storage_AddValue(AQH_STORAGE *sto, AQH_VALUE *value);
AQHOME_API AQH_VALUE_LIST *AQH_Storage_GetValueList(const AQH_STORAGE *sto);
AQHOME_API AQH_VALUE *AQH_Storage_GetValueById(const AQH_STORAGE *sto, uint64_t id);
AQHOME_API AQH_VALUE *AQH_Storage_GetValueByName(const AQH_STORAGE *sto, const char *s);
AQHOME_API AQH_VALUE *AQH_Storage_GetValueByNameForSystem(const AQH_STORAGE *sto, const char *s);
AQHOME_API const char *AQH_Storage_GetStateFile(const AQH_STORAGE *sto);
AQHOME_API void AQH_Storage_SetStateFile(AQH_STORAGE *sto, const char *s);
@@ -87,6 +87,9 @@ AQHOME_API int AQH_Storage_Fini(AQH_STORAGE *sto);
AQHOME_API int AQH_Storage_WriteState(AQH_STORAGE *sto);
AQHOME_API int AQH_Storage_AddDatapoint(AQH_STORAGE *sto, uint64_t valueId, uint64_t timestamp, double dataPoint);
AQHOME_API uint64_t *AQH_Storage_GetDataPoints(AQH_STORAGE *sto, uint64_t valueId,
uint64_t fromTime, uint64_t toTime,
uint64_t maxArrayLen);
AQHOME_API void AQH_Storage_HandleMqttPublish(AQH_STORAGE *sto, const char *topic, const char *value);

View File

@@ -49,7 +49,21 @@
<flags>with_getbymember</flags>
</member>
<member name="name" type="char_ptr" maxlen="32">
<member name="driver" type="char_ptr" maxlen="32">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="nameForDriver" type="char_ptr" maxlen="128">
<default>0</default>
<preset>0</preset>
<access>public</access>
<flags>own</flags>
</member>
<member name="nameForSystem" type="char_ptr" maxlen="128">
<default>0</default>
<preset>0</preset>
<access>public</access>