aqhome: finished transformation of aqhome-data and aqhome-tool.

This commit is contained in:
Martin Preuss
2025-03-02 21:48:22 +01:00
parent 2f468e4f78
commit 58c6d12e36
44 changed files with 1279 additions and 1597 deletions

View File

@@ -13,22 +13,14 @@
#include "./utils.h"
#include "aqhome/ipc/endpoint_ipc.h"
#include "aqhome/ipc/nodes/msg_ipc_setaccmsggrps.h"
#include "aqhome/ipc/nodes/msg_ipc_forward.h"
#include "aqhome/ipc/data/ipc_data.h"
#include "aqhome/ipc/data/msg_data_connect.h"
#include "aqhome/ipc/msg_ipc_result.h"
#include "aqhome/ipc/endpoint_ipcclient.h"
#include "aqhome/msg/ipc/m_ipc.h"
#include "aqhome/msg/ipc/data/m_ipcd.h"
#include "aqhome/msg/ipc/nodes/m_ipcn.h"
#include "aqhome/msg/ipc/nodes/m_ipcn_setaccmsggrps.h"
#include "aqhome/ipc2/tcp_object.h"
#include "aqhome/ipc2/ipc_client.h"
#include <gwenhywfar/endpoint_tcpc.h>
#include <gwenhywfar/endpoint_multilayer.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/timestamp.h>
#include <gwenhywfar/db.h>
@@ -78,315 +70,23 @@ AQH_OBJECT *Utils2_SetupBrokerClientEndpoint(AQH_EVENT_LOOP *eventLoop, GWEN_DB_
GWEN_MSG_ENDPOINT *Utils_SetupBrokerClientEndpoint(GWEN_DB_NODE *dbArgs, uint32_t flags)
int Utils_SendAcceptedMsgGroups(AQH_OBJECT *ep, uint32_t groups)
{
const char *brokerAddress;
int brokerPort;
const char *brokerClientId;
AQH_MESSAGE *msgOut;
brokerAddress=GWEN_DB_GetCharValue(dbArgs, "brokerAddress", 0, NULL);
if (!(brokerAddress && *brokerAddress))
brokerAddress=GWEN_DB_GetCharValue(dbArgs, "ConfigFile/brokerAddress", 0, "127.0.0.1");
brokerPort=GWEN_DB_GetIntValue(dbArgs, "brokerPort", 0, -1);
if (brokerPort<0)
brokerPort=GWEN_DB_GetIntValue(dbArgs, "ConfigFile/brokerPort", 0, 45456);
brokerClientId=GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, "aqhome-tool");
if (brokerAddress && *brokerAddress && brokerPort) {
GWEN_MSG_ENDPOINT *ep;
GWEN_MSG_ENDPOINT *ipcBaseEndpoint;
int rv;
ep=AQH_ClientIpcEndpoint_new("brokerIpcClient", 0);
GWEN_MsgEndpoint_AddFlags(ep, flags);
ipcBaseEndpoint=AQH_IpcEndpoint_CreateIpcTcpClient(brokerAddress, brokerPort, "brokerPhysEndpoint", 0);
AQH_IpcEndpoint_SetServiceName(ipcBaseEndpoint, brokerClientId);
GWEN_MsgEndpoint_Tree2_AddChild(ep, ipcBaseEndpoint);
rv=GWEN_MultilayerEndpoint_StartConnect(ep);
if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) {
DBG_ERROR(NULL, "Error connecting to broker server %s:%d (%d), will retry later", brokerAddress, brokerPort, rv);
GWEN_MsgEndpoint_free(ep);
return NULL;
}
return ep;
}
return NULL;
}
GWEN_MSG_ENDPOINT *Utils_SetupNodesClientEndpoint(GWEN_DB_NODE *dbArgs)
{
return Utils_SetupIpcEndpoint(dbArgs, "tcpAddress", "tcpPort", "ConfigFile/nodesAddress", "ConfigFile/nodesPort", 45454);
}
GWEN_MSG_ENDPOINT *Utils_SetupIpcEndpoint(GWEN_DB_NODE *dbArgs,
const char *varNameAddr,
const char *varNamePort,
const char *fileVarNameAddr,
const char *fileVarNamePort,
int defaultPort)
{
GWEN_MSG_ENDPOINT *epTcp;
const char *tcpAddress;
int tcpPort;
int rv;
tcpAddress=GWEN_DB_GetCharValue(dbArgs, varNameAddr, 0, NULL);
if (!(tcpAddress && *tcpAddress))
tcpAddress=GWEN_DB_GetCharValue(dbArgs, fileVarNameAddr, 0, "127.0.0.1");
tcpPort=GWEN_DB_GetIntValue(dbArgs, varNamePort, 0, -1);
if (tcpPort<0)
tcpPort=GWEN_DB_GetIntValue(dbArgs, fileVarNamePort, 0, defaultPort);
DBG_INFO(NULL, "Setup tcp client endpoint to %s:%d", tcpAddress, tcpPort);
epTcp=AQH_IpcEndpoint_CreateIpcTcpClient(tcpAddress, tcpPort, "aqhome-tool-IPC", 0);
if (epTcp==NULL) {
DBG_ERROR(NULL, "Error creating endpoint TCPc");
return NULL;
}
GWEN_MsgEndpoint_SetDefaultMessageSize(epTcp, UTILS_IPC_ENDPOINT_DEFAULT_MSGSIZE);
rv=GWEN_TcpcEndpoint_StartConnect(epTcp);
if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) {
DBG_ERROR(NULL, "Error connecting (%d)", rv);
GWEN_MsgEndpoint_free(epTcp);
return NULL;
}
return epTcp;
}
GWEN_MSG *Utils_WaitForSpecificNodeMessage(GWEN_MSG_ENDPOINT *epTcp,
int msgCode,
int nodeAddr,
int timeoutInSeconds)
{
time_t startTime;
startTime=time(NULL);
for (;;) {
GWEN_MSG *msg;
time_t now;
while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) {
if (GWEN_IpcMsg_GetCode(msg)==AQH_MSGTYPE_IPC_NODES_FORWARD) {
GWEN_MSG *nodeMsg;
DBG_INFO(NULL, "Received IPC FORWARD message");
nodeMsg=AQH_ForwardIpcMsg_GetCopyOfNodeMsg(msg);
if (nodeMsg) {
DBG_INFO(AQH_LOGDOMAIN,
"Received node msg from %d (%d)",
AQH_NodeMsg_GetSourceAddress(nodeMsg),
AQH_NodeMsg_GetMsgType(nodeMsg));
if (AQH_NodeMsg_GetMsgType(nodeMsg)==msgCode && (nodeAddr==0 || AQH_NodeMsg_GetSourceAddress(nodeMsg)==nodeAddr)) {
GWEN_Msg_free(msg);
return nodeMsg;
}
}
}
else {
DBG_INFO(NULL, "Received IPC message %d, ignoring", GWEN_IpcMsg_GetCode(msg));
}
GWEN_Msg_free(msg);
} /* while */
now=time(NULL);
if (now-startTime>timeoutInSeconds) {
DBG_INFO(NULL, "Timeout");
break;
}
GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
}
return NULL;
}
GWEN_MSG *Utils_WaitForSpecificIpcMessage(GWEN_MSG_ENDPOINT *epTcp,
int msgCode,
int timeoutInSeconds)
{
time_t startTime;
startTime=time(NULL);
for (;;) {
GWEN_MSG *msg;
time_t now;
GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp);
if (msg) {
uint16_t code;
code=GWEN_IpcMsg_GetCode(msg);
if (code==msgCode) {
DBG_INFO(NULL, "Received expected IPC message");
return msg;
}
else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
DBG_INFO(NULL, "Received IPC result message");
return msg;
}
else {
DBG_INFO(NULL, "Received unexpected message %d (%x)", code, code);
}
GWEN_Msg_free(msg);
}
now=time(NULL);
if (now-startTime>timeoutInSeconds) {
DBG_INFO(NULL, "Timeout");
break;
}
}
return NULL;
}
GWEN_MSG *Utils_WaitForResponse(GWEN_MSG_ENDPOINT *epTcp, uint32_t msgId, int timeoutInSeconds)
{
time_t startTime;
startTime=time(NULL);
for (;;) {
GWEN_MSG *msg;
time_t now;
GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp);
if (msg) {
if (GWEN_IpcMsg_GetRefMsgId(msg)==msgId) {
DBG_INFO(NULL, "Received expected IPC message");
return msg;
}
else {
uint16_t code;
code=GWEN_IpcMsg_GetCode(msg);
DBG_ERROR(NULL,
"Received unexpected message %d (%x) [msgId=%d, refMsgId=%d]",
code, code, GWEN_IpcMsg_GetMsgId(msg), GWEN_IpcMsg_GetRefMsgId(msg));
}
GWEN_Msg_free(msg);
}
now=time(NULL);
if (now-startTime>timeoutInSeconds) {
DBG_ERROR(NULL, "Timeout");
break;
}
}
return NULL;
}
int Utils_FlushOutMessageQueue(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds)
{
time_t startTime;
startTime=time(NULL);
while(GWEN_MsgEndpoint_HaveMessageToSend(epTcp)) {
time_t now;
GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
now=time(NULL);
if (now-startTime>timeoutInSeconds) {
DBG_INFO(NULL, "Timeout");
return GWEN_ERROR_TIMEOUT;
}
}
return 0;
}
int Utils_SendAcceptedMsgGroups(GWEN_MSG_ENDPOINT *epTcp, uint32_t groups)
{
GWEN_MSG *msgOut;
msgOut=AQH_SetAcceptedMsgGroupsIpcMsg_new(AQH_MSGTYPE_IPC_NODES_SETACCMSGGRPS,
GWEN_MsgEndpoint_GetNextMessageId(epTcp),0,
groups);
msgOut=AQH_IpcnMessageSetAcceptedMsgGroups_new(AQH_MSGTYPE_IPC_NODES_SETACCMSGGRPS,
AQH_Endpoint_GetNextMessageId(ep),0,
groups);
if (msgOut==NULL) {
DBG_ERROR(NULL, "Error creating message");
return GWEN_ERROR_GENERIC;
}
GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut);
AQH_Endpoint_AddMsgOut(ep, msgOut);
return 0;
}
GWEN_MSG_ENDPOINT *Utils_OpenBrokerConnection(GWEN_DB_NODE *dbArgs, uint32_t flags, int timeoutInSeconds)
{
GWEN_MSG_ENDPOINT *epTcp;
GWEN_MSG *msgOut;
GWEN_MSG *msgIn;
uint32_t result;
const char *clientId;
const char *userId;
const char *password;
clientId=GWEN_DB_GetCharValue(dbArgs, "clientId", 0, NULL);
userId=GWEN_DB_GetCharValue(dbArgs, "userId", 0, NULL);
password=GWEN_DB_GetCharValue(dbArgs, "password", 0, NULL);
epTcp=Utils_SetupBrokerClientEndpoint(dbArgs, 0);
if (epTcp==NULL) {
DBG_ERROR(NULL, "ERROR creating TCP connection");
return NULL;
}
msgOut=AQH_ConnectDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_CONNECT_REQ,
GWEN_MsgEndpoint_GetNextMessageId(epTcp), 0,
clientId, userId, password, flags);
if (msgOut==NULL) {
DBG_ERROR(NULL, "Error creating message");
GWEN_MsgEndpoint_free(epTcp);
return NULL;
}
GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut);
msgIn=Utils_WaitForSpecificIpcMessage(epTcp, AQH_MSGTYPE_IPC_DATA_RESULT, timeoutInSeconds);
if (msgIn==NULL) {
DBG_ERROR(NULL, "No response received");
GWEN_MsgEndpoint_free(epTcp);
return NULL;
}
result=AQH_ResultIpcMsg_GetResultCode(msgIn);
GWEN_Msg_free(msgIn);
if (result!=AQH_MSG_IPC_SUCCESS) {
DBG_ERROR(NULL, "Response: %d", result);
GWEN_MsgEndpoint_free(epTcp);
return NULL;
}
return epTcp;
}
void Utils_PrintDataPoints(const uint64_t *dataPoints, uint32_t numValues, const char *valueUnits)
{
uint32_t i;