/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./client_p.h" #include "aqhome/msg/ipc/m_ipc.h" #include "aqhome/msg/ipc/m_ipc_tag16.h" #include "aqhome/msg/ipc/m_ipc_result.h" #include "aqhome/msg/ipc/data/m_ipcd.h" #include "aqhome/msg/ipc/nodes/m_ipcn.h" #include "aqhome/msg/ipc/m_ipc_connect.h" #include "aqhome/msg/ipc/data/m_ipcd_devices.h" #include "aqhome/msg/ipc/data/m_ipcd_values.h" #include "aqhome/msg/ipc/data/m_ipcd_getvalues.h" #include "aqhome/msg/ipc/data/m_ipcd_getdata.h" #include "aqhome/msg/ipc/data/m_ipcd_multidata.h" #include "aqhome/msg/ipc/data/m_ipcd_setdata.h" #include "aqhome/ipc2/tcp_object.h" #include "aqhome/ipc2/ipc_client.h" #include #include #include #include #define AQH_DATA_CLIENT_DEFAULT_CMD_TIMEOUT 5 GWEN_INHERIT(AQH_OBJECT, AQH_DATA_CLIENT) static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _connectEndpoint(AQH_OBJECT *o, const char *addr, int port, uint32_t flags); static int _exchangeConnectMsgs(AQH_DATA_CLIENT *xo, const char *userId, const char *passwd, const char *clientId, uint32_t flags); static uint64_t _getFirstOrLastData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, int mode); static uint64_t _handleDataResponses(AQH_DATA_CLIENT *xo, uint64_t *dataPtr, uint64_t maxNum, uint32_t msgId); static int _handleResult(AQH_DATA_CLIENT *xo, uint32_t msgId); AQH_OBJECT *AQH_DataClient_new(AQH_EVENT_LOOP *eventLoop, uint8_t protoId, uint8_t protoVer) { AQH_OBJECT *o; AQH_DATA_CLIENT *xo; o=AQH_Object_new(eventLoop); GWEN_NEW_OBJECT(AQH_DATA_CLIENT, xo); GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o, xo, _freeData); xo->protoId=protoId; xo->protoVer=protoVer; xo->timeoutInSeconds=AQH_DATA_CLIENT_DEFAULT_CMD_TIMEOUT; return 0; } void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) { AQH_DATA_CLIENT *xo; xo=(AQH_DATA_CLIENT*)p; AQH_Object_free(xo->ipcEndpoint); GWEN_FREE_OBJECT(xo); } int AQH_DataClient_Connect(AQH_OBJECT *o, const char *addr, int port, const char *userId, const char *passwd, const char *clientId, uint32_t flags) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { int rv; AQH_Object_free(xo->ipcEndpoint); xo->ipcEndpoint=NULL; rv=_connectEndpoint(o, addr, port, 0 /* connection flags */); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return rv; } rv=_exchangeConnectMsgs(xo, userId, passwd, clientId, flags); if (rv<0) { AQH_Object_free(xo->ipcEndpoint); xo->ipcEndpoint=NULL; DBG_INFO(NULL, "here (%d)", rv); return rv; } return 0; } return GWEN_ERROR_INVALID; } int AQH_DataClient_Disconnect(AQH_OBJECT *o) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { AQH_Object_free(xo->ipcEndpoint); xo->ipcEndpoint=NULL; return 0; } return GWEN_ERROR_INVALID; } void AQH_DataClient_SetTimeout(AQH_OBJECT *o, int i) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { xo->timeoutInSeconds=i; } } AQH_DEVICE_LIST *AQH_DataClient_GetDevices(AQH_OBJECT *o) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { AQH_MESSAGE *msgOut; AQH_MESSAGE *msgIn; uint32_t msgId; AQH_DEVICE_LIST *fullDeviceList; fullDeviceList=AQH_Device_List_new(); msgId=++(xo->lastMsgId); msgOut=AQH_IpcMessage_new(xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ, msgId, 0, 0, NULL); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(xo->ipcEndpoint, msgId, xo->timeoutInSeconds)) ) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); if (tagList) { uint16_t code; code=AQH_IpcMessage_GetCode(msgIn); if (code==AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP) { AQH_DEVICE_LIST *deviceList; deviceList=AQH_IpcdMessageDevices_ReadDeviceList(tagList); if (deviceList) { AQH_Device_List_AddList(fullDeviceList, deviceList); AQH_Device_List_free(deviceList); } if (AQH_IpcdMessageDevices_GetFlags(tagList) & AQH_MSGDATA_DEVICES_FLAGS_LASTMSG) { GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); break; } } else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { DBG_ERROR(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList)); GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); AQH_Device_List_free(fullDeviceList); return NULL; } else { DBG_INFO(NULL, "Ignoring message \"%d\"", code); } GWEN_Tag16_List_free(tagList); } AQH_Message_free(msgIn); } /* while */ if (AQH_Device_List_GetCount(fullDeviceList)>0) return fullDeviceList; AQH_Device_List_free(fullDeviceList); } return NULL; } AQH_VALUE_LIST *AQH_DataClient_GetValues(AQH_OBJECT *o, const char *deviceName, int modality) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { AQH_MESSAGE *msgOut; AQH_MESSAGE *msgIn; uint32_t msgId; AQH_VALUE_LIST *fullValueList; fullValueList=AQH_Value_List_new(); msgId=++(xo->lastMsgId); msgOut=AQH_IpcdMessageGetValues_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ, msgId, 0, deviceName, modality); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(xo->ipcEndpoint, msgId, xo->timeoutInSeconds)) ) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); if (tagList) { uint16_t code; code=AQH_IpcMessage_GetCode(msgIn); if (code==AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP) { AQH_VALUE_LIST *valueList; valueList=AQH_IpcdMessageValues_ReadValueList(tagList); if (valueList) { AQH_Value_List_AddList(fullValueList, valueList); AQH_Value_List_free(valueList); } if (AQH_IpcdMessageValues_GetFlags(tagList) & AQH_MSGDATA_VALUES_FLAGS_LASTMSG) { GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); break; } } else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { DBG_ERROR(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList)); GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); AQH_Value_List_free(fullValueList); return NULL; } else { DBG_INFO(NULL, "Ignoring message \"%d\"", code); } GWEN_Tag16_List_free(tagList); } AQH_Message_free(msgIn); } /* while */ if (AQH_Value_List_GetCount(fullValueList)>0) return fullValueList; AQH_Value_List_free(fullValueList); } return NULL; } uint64_t AQH_DataClient_GetFirstData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum) { return _getFirstOrLastData(o, valueName, dataPtr, maxNum, AQH_MSGDATA_GETDATA_MODE_FIRST); } uint64_t AQH_DataClient_GetLastData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum) { return _getFirstOrLastData(o, valueName, dataPtr, maxNum, AQH_MSGDATA_GETDATA_MODE_LAST); } uint64_t AQH_DataClient_GetPeriodData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, uint64_t tsBegin, uint64_t tsEnd) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=++(xo->lastMsgId); msgOut=AQH_IpcdMessageGetData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, msgId, 0, AQH_MSGDATA_GETDATA_MODE_PERIOD, valueName, tsBegin, tsEnd, maxNum); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); return _handleDataResponses(xo, dataPtr, maxNum, msgId); } return 0; } int AQH_DataClient_SetData(AQH_OBJECT *o, const AQH_VALUE *v, const char *data) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=++(xo->lastMsgId); msgOut=AQH_IpcdMessageSetData_new(AQH_MSGTYPE_IPC_DATA_SETDATA, msgId, 0, v, data); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); return _handleResult(xo, msgId); } return GWEN_ERROR_INVALID; } int AQH_DataClient_UpdateData(AQH_OBJECT *o, const AQH_VALUE *v, uint64_t timeStamp, double dataPoint) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=++(xo->lastMsgId); msgOut=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgId, 0, v, timeStamp, dataPoint); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); return _handleResult(xo, msgId); } return GWEN_ERROR_INVALID; } int _connectEndpoint(AQH_OBJECT *o, const char *addr, int port, uint32_t flags) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { AQH_OBJECT *ep; int fd; fd=AQH_TcpObject_CreateConnectedSocket(addr, port); if (fd<0) { DBG_ERROR(NULL, "Error connecting to broker server %s:%d", addr, port); return GWEN_ERROR_IO; } ep=AQH_IpcClientObject_new(AQH_Object_GetEventLoop(o), fd); assert(ep); AQH_Endpoint_AddFlags(ep, flags); xo->ipcEndpoint=ep; return 0; } return GWEN_ERROR_INVALID; } int _exchangeConnectMsgs(AQH_DATA_CLIENT *xo, const char *userId, const char *passwd, const char *clientId, uint32_t flags) { AQH_MESSAGE *msgOut; uint32_t msgId; DBG_INFO(NULL, "Sending connect message for proto=%d.%d", xo->protoId, xo->protoVer); msgId=AQH_Endpoint_GetNextMessageId(xo->ipcEndpoint); msgOut=AQH_IpcMessageConnect_new(xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_CONNECT_REQ, msgId, 0, clientId, userId, passwd, flags); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); return AQH_IpcEndpoint_WaitForResultMsg(xo->ipcEndpoint, xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_RESULT, msgId, xo->timeoutInSeconds); } uint64_t _getFirstOrLastData(AQH_OBJECT *o, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, int mode) { AQH_DATA_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_DATA_CLIENT, o); if (xo) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=++(xo->lastMsgId); msgOut=AQH_IpcdMessageGetData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, msgId, 0, mode, valueName, 0, 0, maxNum); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); return _handleDataResponses(xo, dataPtr, maxNum, msgId); } return 0; } uint64_t _handleDataResponses(AQH_DATA_CLIENT *xo, uint64_t *dataPtr, uint64_t maxNum, uint32_t msgId) { AQH_MESSAGE *msgIn; uint64_t fullNumberOfPoints=0; while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(xo->ipcEndpoint, msgId, xo->timeoutInSeconds)) ) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); if (tagList) { uint16_t code; code=AQH_IpcMessage_GetCode(msgIn); if (code==AQH_MSGTYPE_IPC_DATA_GETDATA_RSP) { const uint64_t *recvDataPtr; uint64_t recvNumberOfPoints; AQH_IpcdMessageMultiData_ReadDatapoints(tagList, &recvDataPtr, &recvNumberOfPoints); if (recvNumberOfPoints) { uint64_t i; for (i=0; iipcEndpoint, msgId, xo->timeoutInSeconds)) ) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); if (tagList) { uint16_t code; code=AQH_IpcMessage_GetCode(msgIn); if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { int result; result=AQH_IpcMessageResult_GetResult(tagList); DBG_INFO(NULL, "Server result: %d", result); GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); if (result!=AQH_MSGDATA_RESULT_SUCCESS) { DBG_INFO(NULL, "here (%d)", result); return GWEN_ERROR_GENERIC; } return 0; } else { DBG_INFO(NULL, "Ignoring message \"%d\"", code); } GWEN_Tag16_List_free(tagList); } AQH_Message_free(msgIn); } /* while */ return GWEN_ERROR_TIMEOUT; }