Started working on aqhome-nodes which will replace aqhomed.

This commit is contained in:
Martin Preuss
2023-09-13 23:31:02 +02:00
parent 161b979e84
commit 9b7d043682
49 changed files with 3143 additions and 258 deletions

View File

@@ -15,7 +15,7 @@
#include "aqhome/msg/msg_node.h"
#include "aqhome/ipc/msg_ipc_result.h"
#include "aqhome/ipc/data/msg_data_datapoints.h"
#include "aqhome/ipc/data/msg_data_multidata.h"
#include "aqhome/ipc/data/ipc_data.h"
#include <gwenhywfar/args.h>
@@ -296,7 +296,7 @@ void _sendCommand(GWEN_MSG_ENDPOINT *epTcp, const char *valueName, const char *v
arrayToSend[0]=timestampToSend;
arrayToSend[1]=u.i;
msgOut=AQH_DataPointsDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, 0, 0, valueName, valueUnits, arrayToSend, 1);
msgOut=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, valueName, valueUnits, 0, arrayToSend, 1);
GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut);
}

View File

@@ -15,8 +15,10 @@
#include "aqhome/msg/msg_node.h"
#include "aqhome/ipc/msg_ipc_result.h"
#include "aqhome/ipc/data/msg_data_datapoints.h"
#include "aqhome/ipc/data/msg_data_value.h"
#include "aqhome/ipc/data/msg_data_singledata.h"
#include "aqhome/ipc/data/ipc_data.h"
#include "aqhome/ipc/msg_ipc_tag16.h"
#include <gwenhywfar/args.h>
#include <gwenhywfar/i18n.h>
@@ -31,11 +33,22 @@
#define I18N(msg) GWEN_I18N_Translate(PACKAGE, msg)
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static int _doGetLastDataPoint(GWEN_DB_NODE *dbArgs);
static void _sendCommand(GWEN_MSG_ENDPOINT *epTcp, const char *valueName);
static int _awaitAndHandleResponse(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds);
static int _handleDataResponse(const GWEN_MSG *msg);
/* ------------------------------------------------------------------------------------------------
* code
* ------------------------------------------------------------------------------------------------
*/
int AQH_Tool_GetLastDataPoint(GWEN_DB_NODE *dbGlobalArgs, int argc, char **argv)
{
@@ -163,8 +176,8 @@ int _doGetLastDataPoint(GWEN_DB_NODE *dbArgs)
{
GWEN_MSG_ENDPOINT *epTcp;
int timeoutInSeconds;
GWEN_MSG *msg;
const char *valueName;
int rv;
timeoutInSeconds=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 5);
valueName=GWEN_DB_GetCharValue(dbArgs, "valueName", 0, NULL);
@@ -179,46 +192,12 @@ int _doGetLastDataPoint(GWEN_DB_NODE *dbArgs)
_sendCommand(epTcp, valueName);
for (;;) {
uint16_t code;
msg=Utils_WaitForSpecificIpcMessage(epTcp, AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP, timeoutInSeconds);
if (msg==NULL) {
DBG_ERROR(NULL, "No response received");
return 2;
}
code=GWEN_IpcMsg_GetCode(msg);
if (code==AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP) {
if (AQH_DataPointsDataIpcMsg_IsValid(msg)) {
Utils_PrintDataPoints(AQH_DataPointsDataIpcMsg_GetDataPoints(msg),
AQH_DataPointsDataIpcMsg_GetNumValues(msg),
AQH_DataPointsDataIpcMsg_GetUnits(msg));
if (AQH_DataPointsDataIpcMsg_GetFlags(msg) & AQH_MSGDATA_DATAPOINTS_FLAGS_LASTMSG) {
DBG_INFO(NULL, "Last message received");
break;
}
}
else {
DBG_ERROR(NULL, "Invalid message received");
GWEN_MsgEndpoint_free(epTcp);
return 3;
}
}
else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
uint32_t resultCode;
resultCode=AQH_ResultIpcMsg_GetResultCode(msg);
fprintf(stderr, "ERROR: %d\n", resultCode);
GWEN_MsgEndpoint_free(epTcp);
return 3;
}
else {
DBG_INFO(NULL, "Unexpected message \"%d\"", code);
GWEN_MsgEndpoint_free(epTcp);
return 3;
}
} /* for */
rv=_awaitAndHandleResponse(epTcp, timeoutInSeconds);
if (rv!=0) {
DBG_INFO(NULL, "here (%d)", rv);
GWEN_MsgEndpoint_free(epTcp);
return rv;
}
GWEN_MsgEndpoint_free(epTcp);
return 0;
}
@@ -229,26 +208,82 @@ void _sendCommand(GWEN_MSG_ENDPOINT *epTcp, const char *valueName)
{
GWEN_MSG *msgOut;
msgOut=AQH_DataPointsDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ, 0, 0, valueName, NULL, NULL, 0);
msgOut=AQH_ValueDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ, valueName, NULL, 0);
GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut);
}
uint64_t _getTimeStampFromString(const char *s)
int _awaitAndHandleResponse(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds)
{
if (s && *s) {
unsigned long int x;
GWEN_MSG *msg;
uint16_t code;
if (1!=sscanf("%lu", s, &x)) {
DBG_ERROR(NULL, "ERROR: Invalid timestamp");
return (uint64_t) (-1);
msg=Utils_WaitForSpecificIpcMessage(epTcp, AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP, timeoutInSeconds);
if (msg) {
code=GWEN_IpcMsg_GetCode(msg);
if (code==AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP) {
int rv;
rv=_handleDataResponse(msg);
GWEN_Msg_free(msg);
return rv;
}
else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
uint32_t resultCode;
resultCode=AQH_ResultIpcMsg_GetResultCode(msg);
fprintf(stderr, "ERROR: %d\n", resultCode);
GWEN_Msg_free(msg);
return 3;
}
else {
DBG_INFO(NULL, "Unexpected message \"%d\"", code);
GWEN_Msg_free(msg);
return 3;
}
return (uint64_t) x;
}
return 0;
else {
DBG_ERROR(NULL, "No response received");
return 2;
}
}
int _handleDataResponse(const GWEN_MSG *msg)
{
GWEN_TAG16_LIST *tagList;
tagList=AQH_Tag16IpcMsg_ParseTags(msg, 0);
if (tagList) {
const GWEN_TAG16 *tag;
char *valueUnits;
uint64_t timestamp;
union {double f; uint64_t i;} u;
tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_UNITS);
valueUnits=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL;
tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_TIME);
timestamp=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0;
tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_DATA);
u.i=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0;
Utils_PrintSingleDataPoint(timestamp, u.f, valueUnits);
free(valueUnits);
return 0;
}
else {
DBG_ERROR(NULL, "Invalid message received");
return 3;
}
}