From 03f9178dd28ea42c14652377f91cf89fde1530c8 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Mon, 30 Sep 2024 18:28:38 +0200 Subject: [PATCH] Revert "Revert "aqhome: convenience code."" This reverts commit bb77c6acd180d85fde57234aca3a4098f8151297. --- aqhome/client/0BUILD | 78 +++++++++++++ aqhome/client/connection.c | 175 +++++++++++++++++++++++++++++ aqhome/client/connection.h | 28 +++++ aqhome/ipc/data/cmd_data.c | 44 ++++++++ aqhome/ipc/data/cmd_data_getdata.c | 110 ++++++++++++++++++ 5 files changed, 435 insertions(+) create mode 100644 aqhome/client/0BUILD create mode 100644 aqhome/client/connection.c create mode 100644 aqhome/client/connection.h create mode 100644 aqhome/ipc/data/cmd_data.c create mode 100644 aqhome/ipc/data/cmd_data_getdata.c diff --git a/aqhome/client/0BUILD b/aqhome/client/0BUILD new file mode 100644 index 0000000..9128de9 --- /dev/null +++ b/aqhome/client/0BUILD @@ -0,0 +1,78 @@ + + + + + + + + $(gwenhywfar_cflags) + -I$(topsrcdir) + -I$(topbuilddir) + + + + --include=$(builddir) + --include=$(srcdir) + + + + + + $(visibility_cflags) + + + + --api=AQHOME_API + + + + + + + + + + + + + + + + + + $(local/built_headers_pub) + + + + + connection.h + + + + + + + + + $(local/typefiles) + + connection.c + + + + + + + + + + + + + + + + + + + diff --git a/aqhome/client/connection.c b/aqhome/client/connection.c new file mode 100644 index 0000000..0f63ccc --- /dev/null +++ b/aqhome/client/connection.c @@ -0,0 +1,175 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2024 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./connection.h" + +#include "aqhome/ipc/endpoint_ipc.h" +#include "aqhome/ipc/endpoint_ipcclient.h" +#include "aqhome/ipc/data/ipc_data.h" +#include "aqhome/ipc/data/msg_data_connect.h" +#include "aqhome/ipc/msg_ipc_result.h" + +#include +#include +#include +#include +#include +#include + +#include + + + +GWEN_MSG_ENDPOINT *_physConnectToBroker(const char *addr, int port, const char *clientId, uint32_t flags); + + + + +int AQH_BrokerConnection_FlushOutMessageQueue(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds) +{ + time_t startTime; + + startTime=time(NULL); + + while(GWEN_MsgEndpoint_HaveMessageToSend(epTcp)) { + time_t now; + + GWEN_MsgEndpoint_IoLoop(epTcp, 1000); /* 1000 ms */ + now=time(NULL); + if (now-startTime>timeoutInSeconds) { + DBG_INFO(AQH_LOGDOMAIN, "Timeout"); + return GWEN_ERROR_TIMEOUT; + } + } + + return 0; +} + + + +GWEN_MSG *AQH_BrokerConnection_WaitForSpecificMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds) +{ + time_t startTime; + + startTime=time(NULL); + + for (;;) { + GWEN_MSG *msg; + time_t now; + + GWEN_MsgEndpoint_IoLoop(epTcp, 1000); /* 1000 ms */ + msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); + if (msg) { + uint16_t code; + + code=GWEN_IpcMsg_GetCode(msg); + if (code==msgCode) { + DBG_INFO(AQH_LOGDOMAIN, "Received expected IPC message"); + return msg; + } + else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { + DBG_INFO(AQH_LOGDOMAIN, "Received IPC result message"); + return msg; + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Received unexpected message %d (%x)", code, code); + } + GWEN_Msg_free(msg); + } + now=time(NULL); + if (now-startTime>timeoutInSeconds) { + DBG_INFO(AQH_LOGDOMAIN, "Timeout"); + break; + } + } + + return NULL; +} + + + +GWEN_MSG_ENDPOINT *AQH_BrokerConnection_OpenConnection(const char *addr, int port, + const char *clientId, + const char *userId, const char *passwd, + uint32_t flags, + int timeoutInSeconds) +{ + GWEN_MSG_ENDPOINT *epTcp; + GWEN_MSG *msgOut; + GWEN_MSG *msgIn; + uint32_t result; + + epTcp=_physConnectToBroker(addr, port, clientId, 0); + if (epTcp==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "ERROR creating TCP connection"); + return NULL; + } + + msgOut=AQH_ConnectDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_CONNECT_REQ, + GWEN_MsgEndpoint_GetNextMessageId(epTcp), 0, + clientId, userId, passwd, flags); + if (msgOut==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error creating message"); + GWEN_MsgEndpoint_free(epTcp); + return NULL; + } + GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); + + msgIn=AQH_BrokerConnection_WaitForSpecificMessage(epTcp, AQH_MSGTYPE_IPC_DATA_RESULT, timeoutInSeconds); + if (msgIn==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "No response received"); + GWEN_MsgEndpoint_free(epTcp); + return NULL; + } + + result=AQH_ResultIpcMsg_GetResultCode(msgIn); + GWEN_Msg_free(msgIn); + + if (result!=AQH_MSG_IPC_SUCCESS) { + DBG_ERROR(AQH_LOGDOMAIN, "Response: %d", result); + GWEN_MsgEndpoint_free(epTcp); + return NULL; + } + + return epTcp; +} + + + +GWEN_MSG_ENDPOINT *_physConnectToBroker(const char *addr, int port, const char *clientId, uint32_t flags) +{ + GWEN_MSG_ENDPOINT *ep; + GWEN_MSG_ENDPOINT *ipcBaseEndpoint; + int rv; + + ep=AQH_ClientIpcEndpoint_new("brokerIpcClient", 0); + GWEN_MsgEndpoint_AddFlags(ep, flags); + + ipcBaseEndpoint=AQH_IpcEndpoint_CreateIpcTcpClient(addr, port, "brokerPhysEndpoint", 0); + AQH_IpcEndpoint_SetServiceName(ipcBaseEndpoint, clientId); + GWEN_MsgEndpoint_Tree2_AddChild(ep, ipcBaseEndpoint); + + rv=GWEN_MultilayerEndpoint_StartConnect(ep); + if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { + DBG_ERROR(AQH_LOGDOMAIN, "Error connecting to broker server %s:%d (%d)", addr, port, rv); + GWEN_MsgEndpoint_free(ep); + return NULL; + } + + return ep; +} + + + + + + diff --git a/aqhome/client/connection.h b/aqhome/client/connection.h new file mode 100644 index 0000000..b247219 --- /dev/null +++ b/aqhome/client/connection.h @@ -0,0 +1,28 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2024 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_CLIENT_CONNECTION_H +#define AQHOME_CLIENT_CONNECTION_H + + +#include + +#include + + +GWEN_MSG_ENDPOINT *AQH_BrokerConnection_OpenConnection(const char *addr, int port, + const char *clientId, + const char *userId, const char *passwd, + uint32_t flags, + int timeoutInSeconds); +int AQH_BrokerConnection_FlushOutMessageQueue(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds); +GWEN_MSG *AQH_BrokerConnection_WaitForSpecificMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds); + + + +#endif diff --git a/aqhome/ipc/data/cmd_data.c b/aqhome/ipc/data/cmd_data.c new file mode 100644 index 0000000..462979d --- /dev/null +++ b/aqhome/ipc/data/cmd_data.c @@ -0,0 +1,44 @@ + + + + + +GWEN_MSG *AQH_CmdDataIpc_WaitForSpecificIpcMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds) +{ + time_t startTime; + + startTime=time(NULL); + + for (;;) { + GWEN_MSG *msg; + time_t now; + + while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) { + uint16_t code; + + code=GWEN_IpcMsg_GetCode(msg); + if (code==msgCode) { + DBG_INFO(NULL, "Received expected IPC message"); + return msg; + } + else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { + DBG_INFO(NULL, "Received IPC result message"); + return msg; + } + else { + DBG_INFO(NULL, "Received unexpected message %d (%x)", code, code); + } + GWEN_Msg_free(msg); + } + now=time(NULL); + if (now-startTime>timeoutInSeconds) { + DBG_INFO(NULL, "Timeout"); + break; + } + GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ + } + + return NULL; +} + + diff --git a/aqhome/ipc/data/cmd_data_getdata.c b/aqhome/ipc/data/cmd_data_getdata.c new file mode 100644 index 0000000..f8c0e49 --- /dev/null +++ b/aqhome/ipc/data/cmd_data_getdata.c @@ -0,0 +1,110 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + + + + +GWEN_MSG *_waitForMatchingResponse(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds); + + + + +GWEN_MSG *AQH_CmdDataIpc_RequestDatapointsMsg(GWEN_MSG_ENDPOINT *ep, + const char *valueName, uint64_t tsBegin, uint64_t tsEnd, uint64_t num, + int timeoutInSeconds) +{ + GWEN_MSG *msg; + time_t startTime; + + msg=AQH_GetDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, valueName, tsBegin, tsEnd, num); + GWEN_MsgEndpoint_AddSendMessage(ep, msg); + + msg=_waitForMatchingResponse(ep, AQH_MSGTYPE_IPC_DATA_GETDATA_RSP, timeoutInSeconds); + if (msg) { + uint16_t code; + + code=GWEN_IpcMsg_GetCode(msg); + if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { + uint32_t resultCode; + + resultCode=AQH_ResultIpcMsg_GetResultCode(msg); + DBG_INFO(AQH_LOGDOMAIN, "IPC error: %d", resultCode); + GWEN_Msg_free(msg); + return NULL; + } + else if (code==AQH_MSGTYPE_IPC_DATA_GETDATA_RSP) { + int rv; + + AQH_MultiDataDataIpcMsg_Parse(msg, 0); + return msg; + } + GWEN_Msg_free(msg); + } + return NULL; +} + + + +GWEN_MSG *_waitForMatchingResponse(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds) +{ + time_t startTime; + + startTime=time(NULL); + + for (;;) { + GWEN_MSG *msg; + time_t now; + + while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) { + uint16_t code; + + code=GWEN_IpcMsg_GetCode(msg); + if (code==msgCode) { + DBG_INFO(NULL, "Received expected IPC message"); + return msg; + } + else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { + DBG_INFO(NULL, "Received IPC result message"); + return msg; + } + else { + DBG_INFO(NULL, "Received unexpected message %d (%x), ignoring", code, code); + } + GWEN_Msg_free(msg); + } /* while */ + + now=time(NULL); + if (now-startTime>timeoutInSeconds) { + DBG_INFO(NULL, "Timeout"); + break; + } + GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ + } + + return NULL; +} + + +