From a808450fa22fe1a8d6a9d185088f831c0d781995 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Wed, 25 Jun 2025 00:03:09 +0200 Subject: [PATCH] add dataclient sublib to be used by multiple tools. --- aqhome/0BUILD | 2 + aqhome/dataclient/0BUILD | 81 ++++++ aqhome/dataclient/client.c | 520 +++++++++++++++++++++++++++++++++++ aqhome/dataclient/client.h | 45 +++ aqhome/dataclient/client_p.h | 28 ++ 5 files changed, 676 insertions(+) create mode 100644 aqhome/dataclient/0BUILD create mode 100644 aqhome/dataclient/client.c create mode 100644 aqhome/dataclient/client.h create mode 100644 aqhome/dataclient/client_p.h diff --git a/aqhome/0BUILD b/aqhome/0BUILD index d33f5d3..fbb58e8 100644 --- a/aqhome/0BUILD +++ b/aqhome/0BUILD @@ -69,6 +69,7 @@ hexfile data events2 + dataclient @@ -79,6 +80,7 @@ aqhhexfile aqhdata aqhevents2 + aqhdataclient diff --git a/aqhome/dataclient/0BUILD b/aqhome/dataclient/0BUILD new file mode 100644 index 0000000..5300f9d --- /dev/null +++ b/aqhome/dataclient/0BUILD @@ -0,0 +1,81 @@ + + + + + + + + $(gwenhywfar_cflags) + $(aqdatabase_cflags) + -I$(topsrcdir) + -I$(topbuilddir) + + + + --include=$(builddir) + --include=$(srcdir) + --include=$(aqdatabase_AQDATABASE_TYPEMAKERDIR)/c + + + + + + $(visibility_cflags) + + + + --api=AQHOME_API + + + + + + + + + + + + + + + + + + $(local/built_headers_pub) + + + + + client.h + + + + + client_p.h + + + + + $(local/typefiles) + + client.c + + + + + + + + + + + + + + + + + + + diff --git a/aqhome/dataclient/client.c b/aqhome/dataclient/client.c new file mode 100644 index 0000000..780265a --- /dev/null +++ b/aqhome/dataclient/client.c @@ -0,0 +1,520 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./client_p.h" + +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/msg/ipc/data/m_ipcd.h" +#include "aqhome/msg/ipc/nodes/m_ipcn.h" +#include "aqhome/msg/ipc/m_ipc_connect.h" +#include "aqhome/msg/ipc/data/m_ipcd_devices.h" +#include "aqhome/msg/ipc/data/m_ipcd_values.h" +#include "aqhome/msg/ipc/data/m_ipcd_getvalues.h" +#include "aqhome/msg/ipc/data/m_ipcd_getdata.h" +#include "aqhome/msg/ipc/data/m_ipcd_multidata.h" +#include "aqhome/msg/ipc/data/m_ipcd_setdata.h" +#include "aqhome/ipc2/tcp_object.h" +#include "aqhome/ipc2/ipc_client.h" +#include + +#include +#include +#include + + +#define AQH_DATA_CLIENT_DEFAULT_CMD_TIMEOUT 5 + + + +GWEN_INHERIT(AQH_OBJECT, AQH_DATA_CLIENT) + + + + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); + +static int _connectEndpoint(AQH_OBJECT *o, const char *addr, int port, uint32_t flags); +static int _exchangeConnectMsgs(AQH_DATA_CLIENT *xo, const char *userId, const char *passwd, const char *clientId, uint32_t flags); +static uint64_t _getFirstOrLastData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, int mode); +static uint64_t _handleDataResponses(AQH_DATA_CLIENT *xo, uint64_t *dataPtr, uint64_t maxNum, uint32_t msgId); +static int _handleResult(AQH_DATA_CLIENT *xo, uint32_t msgId); + + + + + + +AQH_OBJECT *AQH_DataClient_new(AQH_EVENT_LOOP *eventLoop, uint8_t protoId, uint8_t protoVer) +{ + AQH_OBJECT *o; + AQH_DATA_CLIENT *xo; + + o=AQH_Object_new(eventLoop); + GWEN_NEW_OBJECT(AQH_DATA_CLIENT, xo); + GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o, xo, _freeData); + + xo->protoId=protoId; + xo->protoVer=protoVer; + xo->timeoutInSeconds=AQH_DATA_CLIENT_DEFAULT_CMD_TIMEOUT; + + return 0; +} + + + +void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +{ + AQH_DATA_CLIENT *xo; + + xo=(AQH_DATA_CLIENT*)p; + AQH_Object_free(xo->ipcEndpoint); + GWEN_FREE_OBJECT(xo); +} + + + +int AQH_DataClient_Connect(AQH_OBJECT *o, + const char *addr, int port, + const char *userId, const char *passwd, + const char *clientId, + uint32_t flags) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + int rv; + + AQH_Object_free(xo->ipcEndpoint); + xo->ipcEndpoint=NULL; + + rv=_connectEndpoint(o, addr, port, 0 /* connection flags */); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + + rv=_exchangeConnectMsgs(xo, userId, passwd, clientId, flags); + if (rv<0) { + AQH_Object_free(xo->ipcEndpoint); + xo->ipcEndpoint=NULL; + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + return 0; + } + return GWEN_ERROR_INVALID; +} + + + +int AQH_DataClient_Disconnect(AQH_OBJECT *o) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + AQH_Object_free(xo->ipcEndpoint); + xo->ipcEndpoint=NULL; + return 0; + } + return GWEN_ERROR_INVALID; +} + + + +void AQH_DataClient_SetTimeout(AQH_OBJECT *o, int i) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + xo->timeoutInSeconds=i; + } +} + + + +AQH_DEVICE_LIST *AQH_DataClient_GetDevices(AQH_OBJECT *o) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + AQH_MESSAGE *msgOut; + AQH_MESSAGE *msgIn; + uint32_t msgId; + AQH_DEVICE_LIST *fullDeviceList; + + fullDeviceList=AQH_Device_List_new(); + msgId=++(xo->lastMsgId); + msgOut=AQH_IpcMessage_new(xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ, msgId, 0, 0, NULL); + AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); + + while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(xo->ipcEndpoint, msgId, xo->timeoutInSeconds)) ) { + GWEN_TAG16_LIST *tagList; + + tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); + if (tagList) { + uint16_t code; + + code=AQH_IpcMessage_GetCode(msgIn); + if (code==AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP) { + AQH_DEVICE_LIST *deviceList; + + deviceList=AQH_IpcdMessageDevices_ReadDeviceList(tagList); + if (deviceList) { + AQH_Device_List_AddList(fullDeviceList, deviceList); + AQH_Device_List_free(deviceList); + } + if (AQH_IpcdMessageDevices_GetFlags(tagList) & AQH_MSGDATA_DEVICES_FLAGS_LASTMSG) { + GWEN_Tag16_List_free(tagList); + AQH_Message_free(msgIn); + break; + } + } + else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { + DBG_ERROR(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList)); + GWEN_Tag16_List_free(tagList); + AQH_Message_free(msgIn); + AQH_Device_List_free(fullDeviceList); + return NULL; + } + else { + DBG_INFO(NULL, "Ignoring message \"%d\"", code); + } + GWEN_Tag16_List_free(tagList); + } + AQH_Message_free(msgIn); + } /* while */ + + if (AQH_Device_List_GetCount(fullDeviceList)>0) + return fullDeviceList; + AQH_Device_List_free(fullDeviceList); + } + return NULL; +} + + + +AQH_VALUE_LIST *AQH_DataClient_GetValues(AQH_OBJECT *o, const char *deviceName, int modality) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + AQH_MESSAGE *msgOut; + AQH_MESSAGE *msgIn; + uint32_t msgId; + AQH_VALUE_LIST *fullValueList; + + fullValueList=AQH_Value_List_new(); + msgId=++(xo->lastMsgId); + msgOut=AQH_IpcdMessageGetValues_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ, msgId, 0, deviceName, modality); + AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); + + while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(xo->ipcEndpoint, msgId, xo->timeoutInSeconds)) ) { + GWEN_TAG16_LIST *tagList; + + tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); + if (tagList) { + uint16_t code; + + code=AQH_IpcMessage_GetCode(msgIn); + if (code==AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP) { + AQH_VALUE_LIST *valueList; + + valueList=AQH_IpcdMessageValues_ReadValueList(tagList); + if (valueList) { + AQH_Value_List_AddList(fullValueList, valueList); + AQH_Value_List_free(valueList); + } + if (AQH_IpcdMessageValues_GetFlags(tagList) & AQH_MSGDATA_VALUES_FLAGS_LASTMSG) { + GWEN_Tag16_List_free(tagList); + AQH_Message_free(msgIn); + break; + } + } + else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { + DBG_ERROR(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList)); + GWEN_Tag16_List_free(tagList); + AQH_Message_free(msgIn); + AQH_Value_List_free(fullValueList); + return NULL; + } + else { + DBG_INFO(NULL, "Ignoring message \"%d\"", code); + } + GWEN_Tag16_List_free(tagList); + } + AQH_Message_free(msgIn); + } /* while */ + + if (AQH_Value_List_GetCount(fullValueList)>0) + return fullValueList; + AQH_Value_List_free(fullValueList); + } + return NULL; +} + + + +uint64_t AQH_DataClient_GetFirstData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum) +{ + return _getFirstOrLastData(o, valueName, dataPtr, maxNum, AQH_MSGDATA_GETDATA_MODE_FIRST); +} + + + +uint64_t AQH_DataClient_GetLastData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum) +{ + return _getFirstOrLastData(o, valueName, dataPtr, maxNum, AQH_MSGDATA_GETDATA_MODE_LAST); +} + + + +uint64_t AQH_DataClient_GetPeriodData(AQH_OBJECT *o, const char *valueName, + uint64_t *dataPtr, uint64_t maxNum, + uint64_t tsBegin, uint64_t tsEnd) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + AQH_MESSAGE *msgOut; + uint32_t msgId; + + msgId=++(xo->lastMsgId); + msgOut=AQH_IpcdMessageGetData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, + msgId, 0, + AQH_MSGDATA_GETDATA_MODE_PERIOD, + valueName, tsBegin, tsEnd, maxNum); + AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); + + return _handleDataResponses(xo, dataPtr, maxNum, msgId); + } + return 0; +} + + + +int AQH_DataClient_SetData(AQH_OBJECT *o, const AQH_VALUE *v, const char *data) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + AQH_MESSAGE *msgOut; + uint32_t msgId; + + msgId=++(xo->lastMsgId); + msgOut=AQH_IpcdMessageSetData_new(AQH_MSGTYPE_IPC_DATA_SETDATA, msgId, 0, v, data); + AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); + + return _handleResult(xo, msgId); + } + + return GWEN_ERROR_INVALID; +} + + + +int AQH_DataClient_UpdateData(AQH_OBJECT *o, const AQH_VALUE *v, uint64_t timeStamp, double dataPoint) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + AQH_MESSAGE *msgOut; + uint32_t msgId; + + msgId=++(xo->lastMsgId); + msgOut=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgId, 0, v, timeStamp, dataPoint); + AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); + + return _handleResult(xo, msgId); + } + + return GWEN_ERROR_INVALID; +} + + + + + + +int _connectEndpoint(AQH_OBJECT *o, const char *addr, int port, uint32_t flags) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + AQH_OBJECT *ep; + int fd; + + fd=AQH_TcpObject_CreateConnectedSocket(addr, port); + if (fd<0) { + DBG_ERROR(NULL, "Error connecting to broker server %s:%d", addr, port); + return GWEN_ERROR_IO; + } + + ep=AQH_IpcClientObject_new(AQH_Object_GetEventLoop(o), fd); + assert(ep); + AQH_Endpoint_AddFlags(ep, flags); + xo->ipcEndpoint=ep; + return 0; + } + + return GWEN_ERROR_INVALID; +} + + + +int _exchangeConnectMsgs(AQH_DATA_CLIENT *xo, const char *userId, const char *passwd, const char *clientId, uint32_t flags) +{ + AQH_MESSAGE *msgOut; + uint32_t msgId; + + DBG_INFO(NULL, "Sending connect message for proto=%d.%d", xo->protoId, xo->protoVer); + msgId=AQH_Endpoint_GetNextMessageId(xo->ipcEndpoint); + msgOut=AQH_IpcMessageConnect_new(xo->protoId, xo->protoVer, + AQH_MSGTYPE_IPC_CONNECT_REQ, + msgId, 0, + clientId, userId, passwd, flags); + AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); + return AQH_IpcEndpoint_WaitForResultMsg(xo->ipcEndpoint, + xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_RESULT, + msgId, xo->timeoutInSeconds); +} + + + +uint64_t _getFirstOrLastData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, int mode) +{ + AQH_DATA_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); + if (xo) { + AQH_MESSAGE *msgOut; + uint32_t msgId; + + msgId=++(xo->lastMsgId); + msgOut=AQH_IpcdMessageGetData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, + msgId, 0, + mode, + valueName, 0, 0, maxNum); + AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); + + return _handleDataResponses(xo, dataPtr, maxNum, msgId); + } + return 0; +} + + + +uint64_t _handleDataResponses(AQH_DATA_CLIENT *xo, uint64_t *dataPtr, uint64_t maxNum, uint32_t msgId) +{ + AQH_MESSAGE *msgIn; + uint64_t fullNumberOfPoints=0; + + while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(xo->ipcEndpoint, msgId, xo->timeoutInSeconds)) ) { + GWEN_TAG16_LIST *tagList; + + tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); + if (tagList) { + uint16_t code; + + code=AQH_IpcMessage_GetCode(msgIn); + if (code==AQH_MSGTYPE_IPC_DATA_GETDATA_RSP) { + const uint64_t *recvDataPtr; + uint64_t recvNumberOfPoints; + + AQH_IpcdMessageMultiData_ReadDatapoints(tagList, &recvDataPtr, &recvNumberOfPoints); + if (recvNumberOfPoints) { + uint64_t i; + + for (i=0; iipcEndpoint, msgId, xo->timeoutInSeconds)) ) { + GWEN_TAG16_LIST *tagList; + + tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); + if (tagList) { + uint16_t code; + + code=AQH_IpcMessage_GetCode(msgIn); + if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { + int result; + + result=AQH_IpcMessageResult_GetResult(tagList); + DBG_INFO(NULL, "Server result: %d", result); + GWEN_Tag16_List_free(tagList); + AQH_Message_free(msgIn); + if (result!=AQH_MSGDATA_RESULT_SUCCESS) { + DBG_INFO(NULL, "here (%d)", result); + return GWEN_ERROR_GENERIC; + } + return 0; + } + else { + DBG_INFO(NULL, "Ignoring message \"%d\"", code); + } + GWEN_Tag16_List_free(tagList); + } + AQH_Message_free(msgIn); + } /* while */ + + return GWEN_ERROR_TIMEOUT; +} + + + + + diff --git a/aqhome/dataclient/client.h b/aqhome/dataclient/client.h new file mode 100644 index 0000000..85cbb77 --- /dev/null +++ b/aqhome/dataclient/client.h @@ -0,0 +1,45 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_DATA_CLIENT_H +#define AQHOME_DATA_CLIENT_H + +#include +#include +#include +#include + + + +AQHOME_API AQH_OBJECT *AQH_DataClient_new(AQH_EVENT_LOOP *eventLoop, uint8_t protoId, uint8_t protoVer); + +AQHOME_API void AQH_DataClient_SetTimeout(AQH_OBJECT *o, int i); + + +AQHOME_API int AQH_DataClient_Connect(AQH_OBJECT *o, + const char *addr, int port, + const char *userId, const char *passwd, + const char *clientId, + uint32_t flags); +AQHOME_API int AQH_DataClient_Disconnect(AQH_OBJECT *o); + + +AQHOME_API AQH_DEVICE_LIST *AQH_DataClient_GetDevices(AQH_OBJECT *o); +AQHOME_API AQH_VALUE_LIST *AQH_DataClient_GetValues(AQH_OBJECT *o, const char *deviceName, int modality); + +AQHOME_API uint64_t AQH_DataClient_GetFirstData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum); +AQHOME_API uint64_t AQH_DataClient_GetLastData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum); +AQHOME_API uint64_t AQH_DataClient_GetPeriodData(AQH_OBJECT *o, const char *valueName, + uint64_t *dataPtr, uint64_t maxNum, + uint64_t tsBegin, uint64_t tsEnd); + +AQHOME_API int AQH_DataClient_SetData(AQH_OBJECT *o, const AQH_VALUE *v, const char *data); +AQHOME_API int AQH_DataClient_UpdateData(AQH_OBJECT *o, const AQH_VALUE *v, uint64_t timeStamp, double dataPoint); + + +#endif diff --git a/aqhome/dataclient/client_p.h b/aqhome/dataclient/client_p.h new file mode 100644 index 0000000..fdfac3f --- /dev/null +++ b/aqhome/dataclient/client_p.h @@ -0,0 +1,28 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_DATA_CLIENT_P_H +#define AQHOME_DATA_CLIENT_P_H + +#include "aqhome/dataclient/client.h" + + +typedef struct AQH_DATA_CLIENT AQH_DATA_CLIENT; +struct AQH_DATA_CLIENT { + AQH_OBJECT *ipcEndpoint; + int timeoutInSeconds; + + uint8_t protoId; + uint8_t protoVer; + + uint32_t lastMsgId; +}; + + + +#endif