diff --git a/aqhome/client/0BUILD b/aqhome/client/0BUILD
new file mode 100644
index 0000000..9128de9
--- /dev/null
+++ b/aqhome/client/0BUILD
@@ -0,0 +1,78 @@
+
+
+
+
+
+
+
+ $(gwenhywfar_cflags)
+ -I$(topsrcdir)
+ -I$(topbuilddir)
+
+
+
+ --include=$(builddir)
+ --include=$(srcdir)
+
+
+
+
+
+ $(visibility_cflags)
+
+
+
+ --api=AQHOME_API
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ $(local/built_headers_pub)
+
+
+
+
+ connection.h
+
+
+
+
+
+
+
+
+ $(local/typefiles)
+
+ connection.c
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/aqhome/client/connection.c b/aqhome/client/connection.c
new file mode 100644
index 0000000..0f63ccc
--- /dev/null
+++ b/aqhome/client/connection.c
@@ -0,0 +1,175 @@
+/****************************************************************************
+ * This file is part of the project AqHome.
+ * AqHome (c) by 2024 Martin Preuss, all rights reserved.
+ *
+ * The license for this file can be found in the file COPYING which you
+ * should have received along with this file.
+ ****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include
+#endif
+
+#include "./connection.h"
+
+#include "aqhome/ipc/endpoint_ipc.h"
+#include "aqhome/ipc/endpoint_ipcclient.h"
+#include "aqhome/ipc/data/ipc_data.h"
+#include "aqhome/ipc/data/msg_data_connect.h"
+#include "aqhome/ipc/msg_ipc_result.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+
+
+GWEN_MSG_ENDPOINT *_physConnectToBroker(const char *addr, int port, const char *clientId, uint32_t flags);
+
+
+
+
+int AQH_BrokerConnection_FlushOutMessageQueue(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds)
+{
+ time_t startTime;
+
+ startTime=time(NULL);
+
+ while(GWEN_MsgEndpoint_HaveMessageToSend(epTcp)) {
+ time_t now;
+
+ GWEN_MsgEndpoint_IoLoop(epTcp, 1000); /* 1000 ms */
+ now=time(NULL);
+ if (now-startTime>timeoutInSeconds) {
+ DBG_INFO(AQH_LOGDOMAIN, "Timeout");
+ return GWEN_ERROR_TIMEOUT;
+ }
+ }
+
+ return 0;
+}
+
+
+
+GWEN_MSG *AQH_BrokerConnection_WaitForSpecificMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds)
+{
+ time_t startTime;
+
+ startTime=time(NULL);
+
+ for (;;) {
+ GWEN_MSG *msg;
+ time_t now;
+
+ GWEN_MsgEndpoint_IoLoop(epTcp, 1000); /* 1000 ms */
+ msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp);
+ if (msg) {
+ uint16_t code;
+
+ code=GWEN_IpcMsg_GetCode(msg);
+ if (code==msgCode) {
+ DBG_INFO(AQH_LOGDOMAIN, "Received expected IPC message");
+ return msg;
+ }
+ else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
+ DBG_INFO(AQH_LOGDOMAIN, "Received IPC result message");
+ return msg;
+ }
+ else {
+ DBG_INFO(AQH_LOGDOMAIN, "Received unexpected message %d (%x)", code, code);
+ }
+ GWEN_Msg_free(msg);
+ }
+ now=time(NULL);
+ if (now-startTime>timeoutInSeconds) {
+ DBG_INFO(AQH_LOGDOMAIN, "Timeout");
+ break;
+ }
+ }
+
+ return NULL;
+}
+
+
+
+GWEN_MSG_ENDPOINT *AQH_BrokerConnection_OpenConnection(const char *addr, int port,
+ const char *clientId,
+ const char *userId, const char *passwd,
+ uint32_t flags,
+ int timeoutInSeconds)
+{
+ GWEN_MSG_ENDPOINT *epTcp;
+ GWEN_MSG *msgOut;
+ GWEN_MSG *msgIn;
+ uint32_t result;
+
+ epTcp=_physConnectToBroker(addr, port, clientId, 0);
+ if (epTcp==NULL) {
+ DBG_ERROR(AQH_LOGDOMAIN, "ERROR creating TCP connection");
+ return NULL;
+ }
+
+ msgOut=AQH_ConnectDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_CONNECT_REQ,
+ GWEN_MsgEndpoint_GetNextMessageId(epTcp), 0,
+ clientId, userId, passwd, flags);
+ if (msgOut==NULL) {
+ DBG_ERROR(AQH_LOGDOMAIN, "Error creating message");
+ GWEN_MsgEndpoint_free(epTcp);
+ return NULL;
+ }
+ GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut);
+
+ msgIn=AQH_BrokerConnection_WaitForSpecificMessage(epTcp, AQH_MSGTYPE_IPC_DATA_RESULT, timeoutInSeconds);
+ if (msgIn==NULL) {
+ DBG_ERROR(AQH_LOGDOMAIN, "No response received");
+ GWEN_MsgEndpoint_free(epTcp);
+ return NULL;
+ }
+
+ result=AQH_ResultIpcMsg_GetResultCode(msgIn);
+ GWEN_Msg_free(msgIn);
+
+ if (result!=AQH_MSG_IPC_SUCCESS) {
+ DBG_ERROR(AQH_LOGDOMAIN, "Response: %d", result);
+ GWEN_MsgEndpoint_free(epTcp);
+ return NULL;
+ }
+
+ return epTcp;
+}
+
+
+
+GWEN_MSG_ENDPOINT *_physConnectToBroker(const char *addr, int port, const char *clientId, uint32_t flags)
+{
+ GWEN_MSG_ENDPOINT *ep;
+ GWEN_MSG_ENDPOINT *ipcBaseEndpoint;
+ int rv;
+
+ ep=AQH_ClientIpcEndpoint_new("brokerIpcClient", 0);
+ GWEN_MsgEndpoint_AddFlags(ep, flags);
+
+ ipcBaseEndpoint=AQH_IpcEndpoint_CreateIpcTcpClient(addr, port, "brokerPhysEndpoint", 0);
+ AQH_IpcEndpoint_SetServiceName(ipcBaseEndpoint, clientId);
+ GWEN_MsgEndpoint_Tree2_AddChild(ep, ipcBaseEndpoint);
+
+ rv=GWEN_MultilayerEndpoint_StartConnect(ep);
+ if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) {
+ DBG_ERROR(AQH_LOGDOMAIN, "Error connecting to broker server %s:%d (%d)", addr, port, rv);
+ GWEN_MsgEndpoint_free(ep);
+ return NULL;
+ }
+
+ return ep;
+}
+
+
+
+
+
+
diff --git a/aqhome/client/connection.h b/aqhome/client/connection.h
new file mode 100644
index 0000000..b247219
--- /dev/null
+++ b/aqhome/client/connection.h
@@ -0,0 +1,28 @@
+/****************************************************************************
+ * This file is part of the project AqHome.
+ * AqHome (c) by 2024 Martin Preuss, all rights reserved.
+ *
+ * The license for this file can be found in the file COPYING which you
+ * should have received along with this file.
+ ****************************************************************************/
+
+#ifndef AQHOME_CLIENT_CONNECTION_H
+#define AQHOME_CLIENT_CONNECTION_H
+
+
+#include
+
+#include
+
+
+GWEN_MSG_ENDPOINT *AQH_BrokerConnection_OpenConnection(const char *addr, int port,
+ const char *clientId,
+ const char *userId, const char *passwd,
+ uint32_t flags,
+ int timeoutInSeconds);
+int AQH_BrokerConnection_FlushOutMessageQueue(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds);
+GWEN_MSG *AQH_BrokerConnection_WaitForSpecificMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds);
+
+
+
+#endif
diff --git a/aqhome/ipc/data/cmd_data.c b/aqhome/ipc/data/cmd_data.c
new file mode 100644
index 0000000..462979d
--- /dev/null
+++ b/aqhome/ipc/data/cmd_data.c
@@ -0,0 +1,44 @@
+
+
+
+
+
+GWEN_MSG *AQH_CmdDataIpc_WaitForSpecificIpcMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds)
+{
+ time_t startTime;
+
+ startTime=time(NULL);
+
+ for (;;) {
+ GWEN_MSG *msg;
+ time_t now;
+
+ while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) {
+ uint16_t code;
+
+ code=GWEN_IpcMsg_GetCode(msg);
+ if (code==msgCode) {
+ DBG_INFO(NULL, "Received expected IPC message");
+ return msg;
+ }
+ else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
+ DBG_INFO(NULL, "Received IPC result message");
+ return msg;
+ }
+ else {
+ DBG_INFO(NULL, "Received unexpected message %d (%x)", code, code);
+ }
+ GWEN_Msg_free(msg);
+ }
+ now=time(NULL);
+ if (now-startTime>timeoutInSeconds) {
+ DBG_INFO(NULL, "Timeout");
+ break;
+ }
+ GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
+ }
+
+ return NULL;
+}
+
+
diff --git a/aqhome/ipc/data/cmd_data_getdata.c b/aqhome/ipc/data/cmd_data_getdata.c
new file mode 100644
index 0000000..f8c0e49
--- /dev/null
+++ b/aqhome/ipc/data/cmd_data_getdata.c
@@ -0,0 +1,110 @@
+/****************************************************************************
+ * This file is part of the project AqHome.
+ * AqHome (c) by 2023 Martin Preuss, all rights reserved.
+ *
+ * The license for this file can be found in the file COPYING which you
+ * should have received along with this file.
+ ****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include
+#endif
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+
+
+
+
+GWEN_MSG *_waitForMatchingResponse(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds);
+
+
+
+
+GWEN_MSG *AQH_CmdDataIpc_RequestDatapointsMsg(GWEN_MSG_ENDPOINT *ep,
+ const char *valueName, uint64_t tsBegin, uint64_t tsEnd, uint64_t num,
+ int timeoutInSeconds)
+{
+ GWEN_MSG *msg;
+ time_t startTime;
+
+ msg=AQH_GetDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ, valueName, tsBegin, tsEnd, num);
+ GWEN_MsgEndpoint_AddSendMessage(ep, msg);
+
+ msg=_waitForMatchingResponse(ep, AQH_MSGTYPE_IPC_DATA_GETDATA_RSP, timeoutInSeconds);
+ if (msg) {
+ uint16_t code;
+
+ code=GWEN_IpcMsg_GetCode(msg);
+ if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
+ uint32_t resultCode;
+
+ resultCode=AQH_ResultIpcMsg_GetResultCode(msg);
+ DBG_INFO(AQH_LOGDOMAIN, "IPC error: %d", resultCode);
+ GWEN_Msg_free(msg);
+ return NULL;
+ }
+ else if (code==AQH_MSGTYPE_IPC_DATA_GETDATA_RSP) {
+ int rv;
+
+ AQH_MultiDataDataIpcMsg_Parse(msg, 0);
+ return msg;
+ }
+ GWEN_Msg_free(msg);
+ }
+ return NULL;
+}
+
+
+
+GWEN_MSG *_waitForMatchingResponse(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds)
+{
+ time_t startTime;
+
+ startTime=time(NULL);
+
+ for (;;) {
+ GWEN_MSG *msg;
+ time_t now;
+
+ while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) {
+ uint16_t code;
+
+ code=GWEN_IpcMsg_GetCode(msg);
+ if (code==msgCode) {
+ DBG_INFO(NULL, "Received expected IPC message");
+ return msg;
+ }
+ else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
+ DBG_INFO(NULL, "Received IPC result message");
+ return msg;
+ }
+ else {
+ DBG_INFO(NULL, "Received unexpected message %d (%x), ignoring", code, code);
+ }
+ GWEN_Msg_free(msg);
+ } /* while */
+
+ now=time(NULL);
+ if (now-startTime>timeoutInSeconds) {
+ DBG_INFO(NULL, "Timeout");
+ break;
+ }
+ GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
+ }
+
+ return NULL;
+}
+
+
+