/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2024 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./connection.h" #include "aqhome/ipc/endpoint_ipc.h" #include "aqhome/ipc/endpoint_ipcclient.h" #include "aqhome/ipc/data/ipc_data.h" #include "aqhome/ipc/data/msg_data_connect.h" #include "aqhome/ipc/msg_ipc_result.h" #include #include #include #include #include #include #include GWEN_MSG_ENDPOINT *_physConnectToBroker(const char *addr, int port, const char *clientId, uint32_t flags); int AQH_BrokerConnection_FlushOutMessageQueue(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds) { time_t startTime; startTime=time(NULL); while(GWEN_MsgEndpoint_HaveMessageToSend(epTcp)) { time_t now; GWEN_MsgEndpoint_IoLoop(epTcp, 1000); /* 1000 ms */ now=time(NULL); if (now-startTime>timeoutInSeconds) { DBG_INFO(AQH_LOGDOMAIN, "Timeout"); return GWEN_ERROR_TIMEOUT; } } return 0; } GWEN_MSG *AQH_BrokerConnection_WaitForSpecificMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds) { time_t startTime; startTime=time(NULL); for (;;) { GWEN_MSG *msg; time_t now; GWEN_MsgEndpoint_IoLoop(epTcp, 1000); /* 1000 ms */ msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); if (msg) { uint16_t code; code=GWEN_IpcMsg_GetCode(msg); if (code==msgCode) { DBG_INFO(AQH_LOGDOMAIN, "Received expected IPC message"); return msg; } else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { DBG_INFO(AQH_LOGDOMAIN, "Received IPC result message"); return msg; } else { DBG_INFO(AQH_LOGDOMAIN, "Received unexpected message %d (%x)", code, code); } GWEN_Msg_free(msg); } now=time(NULL); if (now-startTime>timeoutInSeconds) { DBG_INFO(AQH_LOGDOMAIN, "Timeout"); break; } } return NULL; } GWEN_MSG_ENDPOINT *AQH_BrokerConnection_OpenConnection(const char *addr, int port, const char *clientId, const char *userId, const char *passwd, uint32_t flags, int timeoutInSeconds) { GWEN_MSG_ENDPOINT *epTcp; GWEN_MSG *msgOut; GWEN_MSG *msgIn; uint32_t result; epTcp=_physConnectToBroker(addr, port, clientId, 0); if (epTcp==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "ERROR creating TCP connection"); return NULL; } msgOut=AQH_ConnectDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_CONNECT_REQ, GWEN_MsgEndpoint_GetNextMessageId(epTcp), 0, clientId, userId, passwd, flags); if (msgOut==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Error creating message"); GWEN_MsgEndpoint_free(epTcp); return NULL; } GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); msgIn=AQH_BrokerConnection_WaitForSpecificMessage(epTcp, AQH_MSGTYPE_IPC_DATA_RESULT, timeoutInSeconds); if (msgIn==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "No response received"); GWEN_MsgEndpoint_free(epTcp); return NULL; } result=AQH_ResultIpcMsg_GetResultCode(msgIn); GWEN_Msg_free(msgIn); if (result!=AQH_MSG_IPC_SUCCESS) { DBG_ERROR(AQH_LOGDOMAIN, "Response: %d", result); GWEN_MsgEndpoint_free(epTcp); return NULL; } return epTcp; } GWEN_MSG_ENDPOINT *_physConnectToBroker(const char *addr, int port, const char *clientId, uint32_t flags) { GWEN_MSG_ENDPOINT *ep; GWEN_MSG_ENDPOINT *ipcBaseEndpoint; int rv; ep=AQH_ClientIpcEndpoint_new("brokerIpcClient", 0); GWEN_MsgEndpoint_AddFlags(ep, flags); ipcBaseEndpoint=AQH_IpcEndpoint_CreateIpcTcpClient(addr, port, "brokerPhysEndpoint", 0); AQH_IpcEndpoint_SetServiceName(ipcBaseEndpoint, clientId); GWEN_MsgEndpoint_Tree2_AddChild(ep, ipcBaseEndpoint); rv=GWEN_MultilayerEndpoint_StartConnect(ep); if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { DBG_ERROR(AQH_LOGDOMAIN, "Error connecting to broker server %s:%d (%d)", addr, port, rv); GWEN_MsgEndpoint_free(ep); return NULL; } return ep; }