/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./client_p.h" #include "aqhome/aqhome.h" #include "aqhome/msg/ipc/m_ipc.h" #include "aqhome/msg/ipc/m_ipc_tag16.h" #include "aqhome/msg/ipc/m_ipc_result.h" #include "aqhome/msg/ipc/data/m_ipcd.h" #include "aqhome/msg/ipc/nodes/m_ipcn.h" #include "aqhome/msg/ipc/m_ipc_connect.h" #include "aqhome/msg/ipc/data/m_ipcd_devices.h" #include "aqhome/msg/ipc/data/m_ipcd_values.h" #include "aqhome/msg/ipc/data/m_ipcd_getvalues.h" #include "aqhome/msg/ipc/data/m_ipcd_getdevices.h" #include "aqhome/msg/ipc/data/m_ipcd_getdata.h" #include "aqhome/msg/ipc/data/m_ipcd_multidata.h" #include "aqhome/msg/ipc/data/m_ipcd_setdata.h" #include "aqhome/ipc2/tcp_object.h" #include "aqhome/ipc2/ipc_client.h" #include #include #include #include #include #include #define AQH_DATA_CLIENT_DEFAULT_CMD_TIMEOUT 5 static int _connectEndpoint(AQH_DATACLIENT *dc, const char *addr, int port, uint32_t flags); static int _exchangeConnectMsgs(AQH_DATACLIENT *dc, const char *userId, const char *passwd, const char *clientId, uint32_t flags); static uint64_t _getFirstOrLastData(AQH_DATACLIENT *dc, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, int mode); static uint64_t _handleDataResponses(AQH_DATACLIENT *dc, uint64_t *dataPtr, uint64_t maxNum, uint32_t msgId); static int _handleResult(AQH_DATACLIENT *dc, uint32_t msgId); AQH_DATACLIENT *AQH_DataClient_new(AQH_EVENT_LOOP *eventLoop, uint8_t protoId, uint8_t protoVer) { AQH_DATACLIENT *dc; GWEN_NEW_OBJECT(AQH_DATACLIENT, dc); dc->eventLoop=eventLoop; dc->protoId=protoId; dc->protoVer=protoVer; dc->timeoutInSeconds=AQH_DATA_CLIENT_DEFAULT_CMD_TIMEOUT; return dc; } void AQH_DataClient_free(AQH_DATACLIENT *dc) { if (dc) { AQH_Object_free(dc->ipcEndpoint); GWEN_FREE_OBJECT(dc); } } int AQH_DataClient_ReadLocalArgs(AQH_DATACLIENT *dc, GWEN_DB_NODE *dbGlobalArgs, const GWEN_ARGS *args, int argc, char **argv) { if (dc) { int rv; GWEN_DB_Group_free(dc->dbLocalArgs); dc->dbLocalArgs=GWEN_DB_GetGroup(dbGlobalArgs, GWEN_DB_FLAGS_DEFAULT, "local"); rv=GWEN_Args_Check(argc, argv, 1, GWEN_ARGS_MODE_ALLOW_FREEPARAM, args, dc->dbLocalArgs); if (rv==GWEN_ARGS_RESULT_ERROR) { fprintf(stderr, "ERROR: Could not parse arguments\n"); return 1; } else if (rv==GWEN_ARGS_RESULT_HELP) { GWEN_BUFFER *ubuf; ubuf=GWEN_Buffer_new(0, 1024, 0, 1); if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) { fprintf(stderr, "ERROR: Could not create help string\n"); return 1; } fprintf(stderr, "%s\n", GWEN_Buffer_GetStart(ubuf)); GWEN_Buffer_free(ubuf); return 1; } dc->timeoutInSeconds=GWEN_DB_GetIntValue(dc->dbLocalArgs, "timeout", 0, 5); AQH_MergeConfigFileIntoConfig(dc->dbLocalArgs, "ConfigFile"); return 0; } return GWEN_ERROR_INVALID; } int AQH_DataClient_ReadConfigFile(AQH_DATACLIENT *dc) { GWEN_DB_NODE *dbConfig; dbConfig=AQH_LoadConfigFile(); if (dbConfig) { GWEN_DB_Group_free(dc->dbLocalArgs); dc->dbLocalArgs=dbConfig; return 0; } return GWEN_ERROR_GENERIC; } GWEN_DB_NODE *AQH_DataClient_GetDbLocalArgs(const AQH_DATACLIENT *dc) { return dc?dc->dbLocalArgs:NULL; } int AQH_DataClient_Connect(AQH_DATACLIENT *dc, const char *addr, int port, const char *userId, const char *passwd, const char *clientId, uint32_t flags) { if (dc) { int rv; AQH_Object_free(dc->ipcEndpoint); dc->ipcEndpoint=NULL; rv=_connectEndpoint(dc, addr, port, 0 /* connection flags */); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return rv; } rv=_exchangeConnectMsgs(dc, userId, passwd, clientId, flags); if (rv<0) { AQH_Object_free(dc->ipcEndpoint); dc->ipcEndpoint=NULL; DBG_INFO(NULL, "here (%d)", rv); return rv; } return 0; } return GWEN_ERROR_INVALID; } int AQH_DataClient_Disconnect(AQH_DATACLIENT *dc) { if (dc) { AQH_Object_free(dc->ipcEndpoint); dc->ipcEndpoint=NULL; return 0; } return GWEN_ERROR_INVALID; } void AQH_DataClient_SetTimeout(AQH_DATACLIENT *dc, int i) { if (dc) { dc->timeoutInSeconds=i; } } AQH_DEVICE_LIST *AQH_DataClient_GetDevices(AQH_DATACLIENT *dc, const char *deviceName) { if (dc) { AQH_MESSAGE *msgOut; AQH_MESSAGE *msgIn; uint32_t msgId; AQH_DEVICE_LIST *fullDeviceList; fullDeviceList=AQH_Device_List_new(); msgId=++(dc->lastMsgId); msgOut=AQH_IpcdMessageGetDevices_new(AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ, msgId, 0, deviceName); AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut); while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(dc->ipcEndpoint, msgId, dc->timeoutInSeconds)) ) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); if (tagList) { uint16_t code; code=AQH_IpcMessage_GetCode(msgIn); if (code==AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP) { AQH_DEVICE_LIST *deviceList; deviceList=AQH_IpcdMessageDevices_ReadDeviceList(tagList); if (deviceList) { AQH_Device_List_AddList(fullDeviceList, deviceList); AQH_Device_List_free(deviceList); } if (AQH_IpcdMessageDevices_GetFlags(tagList) & AQH_MSGDATA_DEVICES_FLAGS_LASTMSG) { GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); break; } } else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { DBG_ERROR(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList)); GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); AQH_Device_List_free(fullDeviceList); return NULL; } else { DBG_INFO(NULL, "Ignoring message \"%d\"", code); } GWEN_Tag16_List_free(tagList); } AQH_Message_free(msgIn); } /* while */ if (AQH_Device_List_GetCount(fullDeviceList)>0) return fullDeviceList; AQH_Device_List_free(fullDeviceList); } return NULL; } AQH_VALUE_LIST *AQH_DataClient_GetValues(AQH_DATACLIENT *dc, const char *deviceName, int modality) { if (dc) { AQH_MESSAGE *msgOut; AQH_MESSAGE *msgIn; uint32_t msgId; AQH_VALUE_LIST *fullValueList; fullValueList=AQH_Value_List_new(); msgId=++(dc->lastMsgId); msgOut=AQH_IpcdMessageGetValues_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ, msgId, 0, deviceName, modality); AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut); while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(dc->ipcEndpoint, msgId, dc->timeoutInSeconds)) ) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); if (tagList) { uint16_t code; code=AQH_IpcMessage_GetCode(msgIn); if (code==AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP) { AQH_VALUE_LIST *valueList; valueList=AQH_IpcdMessageValues_ReadValueList(tagList); if (valueList) { AQH_Value_List_AddList(fullValueList, valueList); AQH_Value_List_free(valueList); } if (AQH_IpcdMessageValues_GetFlags(tagList) & AQH_MSGDATA_VALUES_FLAGS_LASTMSG) { GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); break; } } else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { DBG_ERROR(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList)); GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); AQH_Value_List_free(fullValueList); return NULL; } else { DBG_INFO(NULL, "Ignoring message \"%d\"", code); } GWEN_Tag16_List_free(tagList); } AQH_Message_free(msgIn); } /* while */ if (AQH_Value_List_GetCount(fullValueList)>0) return fullValueList; AQH_Value_List_free(fullValueList); } return NULL; } uint64_t AQH_DataClient_GetFirstData(AQH_DATACLIENT *dc, const char *valueName, uint64_t *dataPtr, uint64_t maxNum) { return _getFirstOrLastData(dc, valueName, dataPtr, maxNum, AQH_MSGDATA_GETDATA_MODE_FIRST); } uint64_t AQH_DataClient_GetLastData(AQH_DATACLIENT *dc, const char *valueName, uint64_t *dataPtr, uint64_t maxNum) { return _getFirstOrLastData(dc, valueName, dataPtr, maxNum, AQH_MSGDATA_GETDATA_MODE_LAST); } uint64_t AQH_DataClient_GetPeriodData(AQH_DATACLIENT *dc, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, uint64_t tsBegin, uint64_t tsEnd) { if (dc) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=++(dc->lastMsgId); msgOut=AQH_IpcdMessageGetData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, msgId, 0, AQH_MSGDATA_GETDATA_MODE_PERIOD, valueName, tsBegin, tsEnd, maxNum); AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut); return _handleDataResponses(dc, dataPtr, maxNum, msgId); } return 0; } int AQH_DataClient_SetData(AQH_DATACLIENT *dc, const AQH_VALUE *v, double data) { if (dc) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=++(dc->lastMsgId); msgOut=AQH_IpcdMessageSetData_new(AQH_MSGTYPE_IPC_DATA_SETDATA, msgId, 0, v, data); AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut); return _handleResult(dc, msgId); } return GWEN_ERROR_INVALID; } int AQH_DataClient_UpdateData(AQH_DATACLIENT *dc, const AQH_VALUE *v, uint64_t timeStamp, double dataPoint) { if (dc) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=++(dc->lastMsgId); msgOut=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgId, 0, 0, v, timeStamp, dataPoint); AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut); return _handleResult(dc, msgId); } return GWEN_ERROR_INVALID; } int AQH_DataClient_ModDevice(AQH_DATACLIENT *dc, const AQH_DEVICE *d) { if (dc) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=++(dc->lastMsgId); msgOut=AQH_IpcdMessageDevices_newForOneDevice(AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ, msgId, 0, AQH_MSGDATA_DEVICES_FLAGS_LASTMSG, d); AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut); return _handleResult(dc, msgId); } return GWEN_ERROR_INVALID; } int _connectEndpoint(AQH_DATACLIENT *dc, const char *addr, int port, uint32_t flags) { if (dc) { AQH_OBJECT *ep; int fd; fd=AQH_TcpObject_CreateConnectedSocket(addr, port); if (fd<0) { DBG_ERROR(NULL, "Error connecting to broker server %s:%d", addr, port); return GWEN_ERROR_IO; } ep=AQH_IpcClientObject_new(dc->eventLoop, fd); assert(ep); AQH_Endpoint_AddFlags(ep, flags); dc->ipcEndpoint=ep; return 0; } return GWEN_ERROR_INVALID; } int _exchangeConnectMsgs(AQH_DATACLIENT *dc, const char *userId, const char *passwd, const char *clientId, uint32_t flags) { AQH_MESSAGE *msgOut; uint32_t msgId; DBG_INFO(NULL, "Sending connect message for proto=%d.%d", dc->protoId, dc->protoVer); msgId=AQH_Endpoint_GetNextMessageId(dc->ipcEndpoint); msgOut=AQH_IpcMessageConnect_new(dc->protoId, dc->protoVer, AQH_MSGTYPE_IPC_CONNECT_REQ, msgId, 0, clientId, userId, passwd, flags); AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut); return AQH_IpcEndpoint_WaitForResultMsg(dc->ipcEndpoint, dc->protoId, dc->protoVer, AQH_MSGTYPE_IPC_RESULT, msgId, dc->timeoutInSeconds); } uint64_t _getFirstOrLastData(AQH_DATACLIENT *dc, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, int mode) { if (dc) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=++(dc->lastMsgId); msgOut=AQH_IpcdMessageGetData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, msgId, 0, mode, valueName, 0, 0, maxNum); AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut); return _handleDataResponses(dc, dataPtr, maxNum, msgId); } return 0; } uint64_t _handleDataResponses(AQH_DATACLIENT *dc, uint64_t *dataPtr, uint64_t maxNum, uint32_t msgId) { AQH_MESSAGE *msgIn; uint64_t fullNumberOfPoints=0; while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(dc->ipcEndpoint, msgId, dc->timeoutInSeconds)) ) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); if (tagList) { uint16_t code; code=AQH_IpcMessage_GetCode(msgIn); if (code==AQH_MSGTYPE_IPC_DATA_GETDATA_RSP) { const uint64_t *recvDataPtr; uint64_t recvNumberOfPoints; uint32_t flags; flags=AQH_IpcdMessageMultiData_GetFlags(tagList); AQH_IpcdMessageMultiData_ReadDatapoints(tagList, &recvDataPtr, &recvNumberOfPoints); if (recvNumberOfPoints) { uint64_t i; for (i=0; iipcEndpoint, msgId, dc->timeoutInSeconds)) ) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); if (tagList) { uint16_t code; code=AQH_IpcMessage_GetCode(msgIn); if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { int result; result=AQH_IpcMessageResult_GetResult(tagList); DBG_INFO(NULL, "Server result: %d", result); GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); if (result!=AQH_MSGDATA_RESULT_SUCCESS) { DBG_INFO(NULL, "here (%d)", result); return GWEN_ERROR_GENERIC; } return 0; } else { DBG_INFO(NULL, "Ignoring message \"%d\"", code); } GWEN_Tag16_List_free(tagList); } AQH_Message_free(msgIn); } /* while */ return GWEN_ERROR_TIMEOUT; } int AQH_DataClient_ConnectWithArgs(AQH_DATACLIENT *dc, uint32_t flags) { const char *brokerAddress; int brokerPort; const char *userId; const char *passwd; const char *clientId; int rv; brokerAddress=GWEN_DB_GetCharValue(dc->dbLocalArgs, "brokerAddress", 0, NULL); if (!(brokerAddress && *brokerAddress)) brokerAddress=GWEN_DB_GetCharValue(dc->dbLocalArgs, "ConfigFile/brokerAddress", 0, "127.0.0.1"); brokerPort=GWEN_DB_GetIntValue(dc->dbLocalArgs, "brokerPort", 0, -1); if (brokerPort<0) brokerPort=GWEN_DB_GetIntValue(dc->dbLocalArgs, "ConfigFile/brokerPort", 0, 1899); userId=GWEN_DB_GetCharValue(dc->dbLocalArgs, "userId", 0, NULL); passwd=GWEN_DB_GetCharValue(dc->dbLocalArgs, "password", 0, NULL); clientId=GWEN_DB_GetCharValue(dc->dbLocalArgs, "brokerClientId", 0, NULL); rv=AQH_DataClient_Connect(dc, brokerAddress, brokerPort, userId, passwd, clientId, flags); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return rv; } return 0; }