aqhome apps: sending a message via aqhome-nodes to nodes now works.

This commit is contained in:
Martin Preuss
2025-03-10 23:22:30 +01:00
parent 541b5ee2ca
commit 9cca3af402
20 changed files with 376 additions and 37 deletions

View File

@@ -30,6 +30,8 @@
#include <gwenhywfar/debug.h>
#include <gwenhywfar/text.h>
#include <time.h>
GWEN_INHERIT(AQH_OBJECT, AQH_TOOL_CLIENT)
@@ -50,6 +52,8 @@ static int _nodesWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo);
static AQH_MESSAGE *_createRequestMessage(AQH_OBJECT *o, uint32_t msgId);
static int _handleResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList, int first);
static int _handleNodeResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, int first);
static int _diffInSeconds(time_t t1, time_t t0);
@@ -228,6 +232,17 @@ void AQH_ToolClient_SubFlags(AQH_OBJECT *o, uint32_t f)
void AQH_ToolClient_SetAcceptedGroups(AQH_OBJECT *o, uint32_t f)
{
AQH_TOOL_CLIENT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o);
if (xo)
xo->acceptedGroups=f;
}
int AQH_ToolClient_Run(AQH_OBJECT *o)
{
AQH_TOOL_CLIENT *xo;
@@ -236,15 +251,9 @@ int AQH_ToolClient_Run(AQH_OBJECT *o)
if (xo) {
int rv;
xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0);
if (xo->ipcEndpoint==NULL) {
DBG_ERROR(NULL, "ERROR creating TCP connection");
return 2;
}
rv=_exchangeConnectMsgs(xo, xo->flags);
if (rv!=AQH_MSGDATA_RESULT_SUCCESS) {
DBG_ERROR(NULL, "Connect response: %d", rv);
rv=AQH_ToolClient_Connect(o, 0, xo->flags, 0);
if (rv<0) {
DBG_ERROR(NULL, "Error connecting (%d)", rv);
return 2;
}
return _sendWaitAndHandle(o, xo);
@@ -338,11 +347,13 @@ int AQH_ToolClient_Connect(AQH_OBJECT *o, uint32_t connFlags, uint32_t connMsgFl
}
}
if (connFlags & AQH_TOOL_CLIENT_CONNECTFLAGS_WITHGRPMSG) {
rv=_exchangeAcceptedMsgGroups(xo, grps);
if (rv<0) {
DBG_INFO(NULL, "here(%d)", rv);
return rv;
if (xo->protoId==AQH_IPC_PROTOCOL_NODES_ID) {
if (connFlags & AQH_TOOL_CLIENT_CONNECTFLAGS_WITHGRPMSG) {
rv=_exchangeAcceptedMsgGroups(xo, grps);
if (rv<0) {
DBG_INFO(NULL, "here(%d)", rv);
return rv;
}
}
}
@@ -359,7 +370,10 @@ int _connectEndpoint(AQH_OBJECT *o)
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o);
if (xo) {
xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0);
if (xo->protoId==AQH_IPC_PROTOCOL_DATA_ID)
xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0);
else if (xo->protoId==AQH_IPC_PROTOCOL_NODES_ID)
xo->ipcEndpoint=Utils2_SetupNodesClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0);
if (xo->ipcEndpoint==NULL) {
DBG_ERROR(NULL, "ERROR creating TCP connection");
return 2;
@@ -538,9 +552,13 @@ int _sendWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo)
int _waitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo, uint32_t msgId)
{
int first=1;
time_t startTime;
startTime=time(NULL);
for (;;) {
AQH_MESSAGE *msgIn;
time_t now;
msgIn=AQH_IpcEndpoint_WaitForResponseMsg(xo->ipcEndpoint, msgId, xo->timeoutInSeconds);
if (msgIn) {
@@ -564,6 +582,11 @@ int _waitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo, uint32_t msgId)
}
}
}
now=time(NULL);
if (xo->timeoutInSeconds && _diffInSeconds(now, startTime)>xo->timeoutInSeconds) {
DBG_ERROR(NULL, "Timeout");
return 3;
}
} /* for */
return 1;
}
@@ -595,15 +618,20 @@ int _nodesSendWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo)
int _nodesWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo)
{
int first=1;
time_t startTime;
startTime=time(NULL);
for (;;) {
AQH_MESSAGE *msgIn;
time_t now;
msgIn=AQH_ToolClient_WaitForNodeMsg(o, 0, 0, xo->timeoutInSeconds);
if (msgIn) {
int rv;
rv=_handleResponseMessage(o, msgIn, NULL, first);
DBG_ERROR(NULL, "Handling node message %d", AQH_NodeMessage_GetMsgType(msgIn));
rv=_handleNodeResponseMessage(o, msgIn, first);
AQH_Message_free(msgIn);
first=0;
if (rv<0) {
@@ -615,6 +643,11 @@ int _nodesWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo)
return 0;
}
}
now=time(NULL);
if (xo->timeoutInSeconds && _diffInSeconds(now, startTime)>xo->timeoutInSeconds) {
DBG_ERROR(NULL, "Timeout");
return 3;
}
} /* for */
return 1;
}
@@ -660,3 +693,24 @@ int _handleResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, const GWEN_TAG
int _handleNodeResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, int first)
{
AQH_TOOL_CLIENT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o);
if (xo) {
if (xo->handleResponseMessageFn)
return xo->handleResponseMessageFn(o, msg, NULL, first);
}
return 0;
}
int _diffInSeconds(time_t t1, time_t t0)
{
return t1-t0;
}