/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./client_p.h" #include "./utils.h" #include "aqhome/aqhome.h" #include "aqhome/msg/ipc/m_ipc.h" #include "aqhome/msg/ipc/m_ipc_tag16.h" #include "aqhome/msg/ipc/m_ipc_result.h" #include "aqhome/msg/ipc/data/m_ipcd.h" #include "aqhome/msg/ipc/m_ipc_connect.h" #include "aqhome/msg/ipc/nodes/m_ipcn.h" #include "aqhome/msg/ipc/nodes/m_ipcn_forward.h" #include "aqhome/msg/ipc/nodes/m_ipcn_setaccmsggrps.h" #include "aqhome/msg/node/m_node.h" #include "aqhome/ipc2/ipc_endpoint.h" #include #include #include #include GWEN_INHERIT(AQH_OBJECT, AQH_TOOL_CLIENT) static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _connectEndpoint(AQH_OBJECT *o); static int _exchangeConnectMsgs(AQH_TOOL_CLIENT *xo, uint32_t flags); static int _exchangeAcceptedMsgGroups(AQH_TOOL_CLIENT *xo, uint32_t groups); static int _sendWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo); static int _waitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo, uint32_t msgId); static int _nodesSendWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo); static int _nodesWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo); static AQH_MESSAGE *_createRequestMessage(AQH_OBJECT *o, uint32_t msgId); static int _handleResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList, int first); AQH_OBJECT *AQH_ToolClient_new(AQH_EVENT_LOOP *eventLoop, uint8_t protoId, uint8_t protoVer, GWEN_DB_NODE *dbGlobalArgs, const GWEN_ARGS *argDescrs) { AQH_OBJECT *o; AQH_TOOL_CLIENT *xo; o=AQH_Object_new(eventLoop); GWEN_NEW_OBJECT(AQH_TOOL_CLIENT, xo); GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o, xo, _freeData); xo->dbGlobalArgs=dbGlobalArgs; xo->args=argDescrs; xo->protoId=protoId; xo->protoVer=protoVer; return o; } void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) { AQH_TOOL_CLIENT *xo; xo=(AQH_TOOL_CLIENT*)p; AQH_Object_free(xo->ipcEndpoint); GWEN_FREE_OBJECT(xo); } AQH_OBJECT *AQH_ToolClient_GetEndpoint(const AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) return xo->ipcEndpoint; return NULL; } int AQH_ToolClient_ReadLocalArgs(AQH_OBJECT *o, int argc, char **argv) { if (o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { int rv; GWEN_DB_Group_free(xo->dbLocalArgs); xo->dbLocalArgs=GWEN_DB_GetGroup(xo->dbGlobalArgs, GWEN_DB_FLAGS_DEFAULT, "local"); rv=GWEN_Args_Check(argc, argv, 1, GWEN_ARGS_MODE_ALLOW_FREEPARAM, xo->args, xo->dbLocalArgs); if (rv==GWEN_ARGS_RESULT_ERROR) { fprintf(stderr, "ERROR: Could not parse arguments\n"); return 1; } else if (rv==GWEN_ARGS_RESULT_HELP) { GWEN_BUFFER *ubuf; ubuf=GWEN_Buffer_new(0, 1024, 0, 1); if (GWEN_Args_Usage(xo->args, ubuf, GWEN_ArgsOutType_Txt)) { fprintf(stderr, "ERROR: Could not create help string\n"); return 1; } fprintf(stderr, "%s\n", GWEN_Buffer_GetStart(ubuf)); GWEN_Buffer_free(ubuf); return 1; } xo->timeoutInSeconds=GWEN_DB_GetIntValue(xo->dbLocalArgs, "timeout", 0, 5); AQH_MergeConfigFileIntoConfig(xo->dbLocalArgs, "ConfigFile"); return 0; } } return 1; } void AQH_ToolClient_SetCreateRequestMessageFn(AQH_OBJECT *o, AQH_TOOLCLIENT_CREATEREQUESTMESSAGE_FN f) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) xo->createRequestMessageFn=f; } void AQH_ToolClient_SetHandleResponseMessageFn(AQH_OBJECT *o, AQH_TOOLCLIENT_HANDLERESPONSEMESSAGE_FN f) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) xo->handleResponseMessageFn=f; } GWEN_DB_NODE *AQH_ToolClient_GetDbGlobalArgs(const AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) return xo->dbGlobalArgs; return NULL; } GWEN_DB_NODE *AQH_ToolClient_GetDbLocalArgs(const AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) return xo->dbLocalArgs; return NULL; } uint32_t AQH_ToolClient_GetFlags(const AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) return xo->flags; return 0; } void AQH_ToolClient_SetFlags(AQH_OBJECT *o, uint32_t f) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) xo->flags=f; } void AQH_ToolClient_AddFlags(AQH_OBJECT *o, uint32_t f) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) xo->flags|=f; } void AQH_ToolClient_SubFlags(AQH_OBJECT *o, uint32_t f) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) xo->flags&=~f; } int AQH_ToolClient_Run(AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { int rv; xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0); if (xo->ipcEndpoint==NULL) { DBG_ERROR(NULL, "ERROR creating TCP connection"); return 2; } rv=_exchangeConnectMsgs(xo, xo->flags); if (rv!=AQH_MSGDATA_RESULT_SUCCESS) { DBG_ERROR(NULL, "Connect response: %d", rv); return 2; } return _sendWaitAndHandle(o, xo); } return GWEN_ERROR_INVALID; } int AQH_ToolClient_RunConnected(AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) return _sendWaitAndHandle(o, xo); return GWEN_ERROR_INVALID; } int AQH_ToolClient_RunConnectedWithNodeMsgs(AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) return _nodesSendWaitAndHandle(o, xo); return GWEN_ERROR_INVALID; } int AQH_ToolClient_Watch(AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { int rv; xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0); if (xo->ipcEndpoint==NULL) { DBG_ERROR(NULL, "ERROR creating TCP connection"); return 2; } rv=_exchangeConnectMsgs(xo, xo->flags); if (rv!=AQH_MSGDATA_RESULT_SUCCESS) { DBG_ERROR(NULL, "Connect response: %d", rv); return 2; } return _waitAndHandle(o, xo, 0); } return GWEN_ERROR_INVALID; } int AQH_ToolClient_WatchConnected(AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) return _waitAndHandle(o, xo, 0); return GWEN_ERROR_INVALID; } int AQH_ToolClient_Connect(AQH_OBJECT *o, uint32_t connFlags, uint32_t connMsgFlags, uint32_t grps) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { int rv; rv=_connectEndpoint(o); if (rv<0) { DBG_INFO(NULL, "here(%d)", rv); return rv; } if (connFlags & AQH_TOOL_CLIENT_CONNECTFLAGS_WITHCONNECTMSG) { rv=_exchangeConnectMsgs(xo, connMsgFlags); if (rv<0) { DBG_INFO(NULL, "here(%d)", rv); return rv; } } if (connFlags & AQH_TOOL_CLIENT_CONNECTFLAGS_WITHGRPMSG) { rv=_exchangeAcceptedMsgGroups(xo, grps); if (rv<0) { DBG_INFO(NULL, "here(%d)", rv); return rv; } } return 0; } return GWEN_ERROR_INVALID; } int _connectEndpoint(AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0); if (xo->ipcEndpoint==NULL) { DBG_ERROR(NULL, "ERROR creating TCP connection"); return 2; } return 0; } return GWEN_ERROR_INVALID; } int _exchangeConnectMsgs(AQH_TOOL_CLIENT *xo, uint32_t flags) { AQH_MESSAGE *msgOut; uint32_t msgId; const char *clientId; const char *userId; const char *passw; clientId=GWEN_DB_GetCharValue(xo->dbLocalArgs, "brokerClientId", 0, "aqhome-tool"); userId=GWEN_DB_GetCharValue(xo->dbLocalArgs, "userId", 0, NULL); passw=GWEN_DB_GetCharValue(xo->dbLocalArgs, "password", 0, NULL); msgId=AQH_Endpoint_GetNextMessageId(xo->ipcEndpoint); msgOut=AQH_IpcMessageConnect_new(xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_CONNECT_REQ, msgId, 0, clientId, userId, passw, flags); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); return AQH_IpcEndpoint_WaitForResultMsg(xo->ipcEndpoint, xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_RESULT, msgId, xo->timeoutInSeconds); } int _exchangeAcceptedMsgGroups(AQH_TOOL_CLIENT *xo, uint32_t groups) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=AQH_Endpoint_GetNextMessageId(xo->ipcEndpoint); msgOut=AQH_IpcnMessageSetAcceptedMsgGroups_new(AQH_MSGTYPE_IPC_NODES_SETACCMSGGRPS, msgId, 0, groups); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); return AQH_IpcEndpoint_WaitForResultMsg(xo->ipcEndpoint, xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_RESULT, msgId, xo->timeoutInSeconds); } void AQH_ToolClient_SendNodeMsg(AQH_OBJECT *o, const AQH_MESSAGE *nodeMsg) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { AQH_MESSAGE *msgOut; msgOut=AQH_IpcnMessageForward_new(AQH_MSGTYPE_IPC_NODES_FORWARD, AQH_Endpoint_GetNextMessageId(xo->ipcEndpoint), 0, AQH_Message_GetMsgPointer(nodeMsg), AQH_Message_GetUsedSize(nodeMsg)); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); } } AQH_MESSAGE *AQH_ToolClient_WaitForNodeMsg(AQH_OBJECT *o, int nodeSrcAddr, uint8_t nodeMsgType, int timeoutInSeconds) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { time_t startTime; startTime=time(NULL); for (;;) { AQH_MESSAGE *msg; time_t now; AQH_EventLoop_Run(AQH_Object_GetEventLoop(xo->ipcEndpoint), 500); msg=AQH_Endpoint_GetNextMsgIn(xo->ipcEndpoint); if (msg) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0); if (tagList) { uint16_t code; code=AQH_IpcMessage_GetCode(msg); if (code==AQH_MSGTYPE_IPC_NODES_FORWARD) { const GWEN_TAG16 *tag; const uint8_t *ptr; uint32_t len; tag=tagList?GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGNODE_FORWARD_TAGS_MSG):NULL; ptr=tag?GWEN_Tag16_GetTagData(tag):NULL; len=tag?GWEN_Tag16_GetTagLength(tag):0; if (ptr && len) { AQH_MESSAGE *nodeMsg; nodeMsg=AQH_NodeMessage_fromBuffer(ptr, len); if (nodeMsg) { if ((nodeSrcAddr==0xff || nodeSrcAddr==0x00 || nodeSrcAddr==AQH_NodeMessage_GetSourceAddress(nodeMsg)) && (nodeMsgType==0 || nodeMsgType==AQH_NodeMessage_GetMsgType(nodeMsg))) { GWEN_Tag16_List_free(tagList); AQH_Message_free(msg); return nodeMsg; } AQH_Message_free(nodeMsg); } } else { DBG_ERROR(NULL, "Empty node msg"); } } else { DBG_ERROR(NULL, "Received unexpected message %d (%x), ignoring", code, code); } GWEN_Tag16_List_free(tagList); } AQH_Message_free(msg); } now=time(NULL); if (now-startTime>timeoutInSeconds) { DBG_INFO(NULL, "Timeout"); break; } } } return NULL; } int AQH_ToolClient_HandleResultMsg(GWEN_UNUSED const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList, GWEN_UNUSED int first) { int result; result=AQH_IpcMessageResult_GetResult(tagList); if (result!=AQH_MSGDATA_RESULT_SUCCESS) { char *text; text=AQH_IpcMessageResult_GetText(tagList); DBG_ERROR(NULL, "ERROR: %d (%s)", result, text?text:""); return 3; } return 0; } int _sendWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=AQH_Endpoint_GetNextMessageId(xo->ipcEndpoint); msgOut=_createRequestMessage(o, msgId); if (msgOut==NULL) { DBG_ERROR(NULL, "Error creating outbound message"); return 2; } AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); return _waitAndHandle(o, xo, msgId); } int _waitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo, uint32_t msgId) { int first=1; for (;;) { AQH_MESSAGE *msgIn; msgIn=AQH_IpcEndpoint_WaitForResponseMsg(xo->ipcEndpoint, msgId, xo->timeoutInSeconds); if (msgIn) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); if (tagList) { int rv; rv=_handleResponseMessage(o, msgIn, tagList, first); GWEN_Tag16_List_free(tagList); AQH_Message_free(msgIn); first=0; if (rv<0) { DBG_ERROR(NULL, "here (%d)", rv); return 3; } else if (rv==1) { DBG_ERROR(NULL, "Done."); return 0; } } } } /* for */ return 1; } int _nodesSendWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo) { AQH_MESSAGE *nodeMsg; AQH_MESSAGE *msgOut; uint32_t msgId; msgId=AQH_Endpoint_GetNextMessageId(xo->ipcEndpoint); nodeMsg=_createRequestMessage(o, msgId); if (nodeMsg==NULL) { DBG_ERROR(NULL, "Error creating outbound message"); return 2; } msgOut=AQH_IpcnMessageForward_new(AQH_MSGTYPE_IPC_NODES_FORWARD, msgId, 0, AQH_Message_GetMsgPointer(nodeMsg), AQH_Message_GetUsedSize(nodeMsg)); AQH_Message_free(nodeMsg); AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); return _nodesWaitAndHandle(o, xo); } int _nodesWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo) { int first=1; for (;;) { AQH_MESSAGE *msgIn; msgIn=AQH_ToolClient_WaitForNodeMsg(o, 0, 0, xo->timeoutInSeconds); if (msgIn) { int rv; rv=_handleResponseMessage(o, msgIn, NULL, first); AQH_Message_free(msgIn); first=0; if (rv<0) { DBG_ERROR(NULL, "here (%d)", rv); return 3; } else if (rv==1) { DBG_ERROR(NULL, "Done."); return 0; } } } /* for */ return 1; } AQH_MESSAGE *_createRequestMessage(AQH_OBJECT *o, uint32_t msgId) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { if (xo->createRequestMessageFn) return xo->createRequestMessageFn(o, msgId); } return NULL; } int _handleResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList, int first) { AQH_TOOL_CLIENT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { if (xo->handleResponseMessageFn) return xo->handleResponseMessageFn(o, msg, tagList, first); else { uint16_t code; code=AQH_IpcMessage_GetCode(msg); if (code==AQH_MSGTYPE_IPC_DATA_RESULT) return AQH_ToolClient_HandleResultMsg(msg, tagList, first); else { DBG_INFO(NULL, "Unexpected message \"%d\"", code); return 3; } } } return 0; }