From 9cca3af402d2210a275c3a837020f1674051b1a0 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Mon, 10 Mar 2025 23:22:30 +0100 Subject: [PATCH] aqhome apps: sending a message via aqhome-nodes to nodes now works. --- apps/aqhome-data/main.c | 2 +- apps/aqhome-nodes/0BUILD | 4 + apps/aqhome-nodes/r_connect.c | 4 +- apps/aqhome-nodes/r_connect.h | 4 +- apps/aqhome-nodes/r_forward.c | 79 +++++++++++++++++++ apps/aqhome-nodes/r_forward.h | 27 +++++++ apps/aqhome-nodes/r_setaccmsggrps.c | 56 ++++++++++++++ apps/aqhome-nodes/r_setaccmsggrps.h | 27 +++++++ apps/aqhome-nodes/server.c | 26 +++++-- apps/aqhome-tool/client.c | 86 +++++++++++++++++---- apps/aqhome-tool/client.h | 2 + apps/aqhome-tool/client_p.h | 1 + apps/aqhome-tool/nodes/getnodes.c | 4 + apps/aqhome-tool/nodes/ping.c | 6 +- apps/aqhome-tool/utils.c | 39 ++++++++++ apps/aqhome-tool/utils.h | 2 + aqhome/ipc2/msgwriter.c | 4 +- aqhome/ipc2/tty_endpoint.c | 2 +- aqhome/ipc2/ttyobject.c | 36 ++++++++- aqhome/msg/ipc/nodes/m_ipcn_setaccmsggrps.h | 2 +- 20 files changed, 376 insertions(+), 37 deletions(-) create mode 100644 apps/aqhome-nodes/r_forward.c create mode 100644 apps/aqhome-nodes/r_forward.h create mode 100644 apps/aqhome-nodes/r_setaccmsggrps.c create mode 100644 apps/aqhome-nodes/r_setaccmsggrps.h diff --git a/apps/aqhome-data/main.c b/apps/aqhome-data/main.c index 64cba9b..1b0fe29 100644 --- a/apps/aqhome-data/main.c +++ b/apps/aqhome-data/main.c @@ -153,7 +153,7 @@ void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop) now=time(NULL); if (_diffInSeconds(now, timeLastConnectionCleanup)>CONNCLEAN_INTERVAL_IN_SECS) { - DBG_ERROR(NULL, "Cleanup connections"); + DBG_INFO(NULL, "Cleanup connections"); AqHomeDataServer_CleanupClients(aqh); timeLastConnectionCleanup=now; } diff --git a/apps/aqhome-nodes/0BUILD b/apps/aqhome-nodes/0BUILD index 7721d69..316158d 100644 --- a/apps/aqhome-nodes/0BUILD +++ b/apps/aqhome-nodes/0BUILD @@ -41,6 +41,8 @@ devicesdump.h r_setdata.h r_connect.h + r_forward.h + r_setaccmsggrps.h @@ -51,6 +53,8 @@ devicesdump.c r_setdata.c r_connect.c + r_forward.c + r_setaccmsggrps.c main.c diff --git a/apps/aqhome-nodes/r_connect.c b/apps/aqhome-nodes/r_connect.c index a59fda4..cd79aba 100644 --- a/apps/aqhome-nodes/r_connect.c +++ b/apps/aqhome-nodes/r_connect.c @@ -29,9 +29,8 @@ * ------------------------------------------------------------------------------------------------ */ -void AQH_NodeServer_HandleConnect(GWEN_UNUSED AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +void AQH_NodeServer_HandleConnect(GWEN_UNUSED AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList) { - GWEN_TAG16_LIST *tagList; AQH_MESSAGE *outMsg; int resultCode=AQH_MSGDATA_RESULT_SUCCESS; char *clientId=NULL; @@ -39,7 +38,6 @@ void AQH_NodeServer_HandleConnect(GWEN_UNUSED AQH_OBJECT *o, AQH_OBJECT *ep, con char *passw=NULL; uint32_t flags; - tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0); clientId=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSG_CONNECT_TAGS_CLIENTID, NULL); userId=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSG_CONNECT_TAGS_USERID, NULL); flags=AQH_Tag16_GetTagDataAsUint32(tagList, AQH_MSG_CONNECT_TAGS_FLAGS, 0); diff --git a/apps/aqhome-nodes/r_connect.h b/apps/aqhome-nodes/r_connect.h index 7b30024..9087189 100644 --- a/apps/aqhome-nodes/r_connect.h +++ b/apps/aqhome-nodes/r_connect.h @@ -12,11 +12,11 @@ #include "./server.h" -#include +#include -void AQH_NodeServer_HandleConnect(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg); +void AQH_NodeServer_HandleConnect(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg, const GWEN_TAG16_LIST *tagList); diff --git a/apps/aqhome-nodes/r_forward.c b/apps/aqhome-nodes/r_forward.c new file mode 100644 index 0000000..f011e93 --- /dev/null +++ b/apps/aqhome-nodes/r_forward.c @@ -0,0 +1,79 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./r_forward.h" +#include "./server_p.h" +#include "aqhome/ipc2/endpoint.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/nodes/m_ipcn.h" +#include "aqhome/msg/ipc/nodes/m_ipcn_forward.h" +#include "aqhome/msg/ipc/m_ipc_connect.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + +void AQH_NodeServer_HandleForward(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList) +{ + AQH_NODE_SERVER *xo; + + xo=AQH_NodeServer_GetServerData(o); + if (xo) { + AQH_MESSAGE *outMsg; + int resultCode=AQH_MSGDATA_RESULT_SUCCESS; + const GWEN_TAG16 *tag; + const uint8_t *ptr; + uint32_t len; + + tag=tagList?GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGNODE_FORWARD_TAGS_MSG):NULL; + ptr=tag?GWEN_Tag16_GetTagData(tag):NULL; + len=tag?GWEN_Tag16_GetTagLength(tag):0; + + if (ptr && len) { + if (xo->ttyEndpoint) { + AQH_MESSAGE *nodeMsg; + + nodeMsg=AQH_Message_new(); + AQH_Message_SetData(nodeMsg, ptr, len); + AQH_Message_SetUsedSize(nodeMsg, len); + AQH_Endpoint_AddMsgOut(xo->ttyEndpoint, nodeMsg); + } + else { + DBG_ERROR(NULL, "TTY endpoint currently not connected"); + resultCode=AQH_MSGDATA_RESULT_ERROR_TRYAGAIN; + } + } + else { + DBG_ERROR(NULL, "Empty message to forward"); + resultCode=AQH_MSGDATA_RESULT_ERROR_BADDATA; + } + + outMsg=AQH_IpcMessageResult_new(AQH_IPC_PROTOCOL_NODES_ID, + AQH_IPC_PROTOCOL_NODES_VERSION, + AQH_MSGTYPE_IPC_NODES_RESULT, + AQH_Endpoint_GetNextMessageId(ep), + AQH_IpcMessage_GetMsgId(msg), + resultCode, NULL); + AQH_Endpoint_AddMsgOut(ep, outMsg); + } +} + + + diff --git a/apps/aqhome-nodes/r_forward.h b/apps/aqhome-nodes/r_forward.h new file mode 100644 index 0000000..e6f1359 --- /dev/null +++ b/apps/aqhome-nodes/r_forward.h @@ -0,0 +1,27 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2024 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOMED_R_FORWARD_H +#define AQHOMED_R_FORWARD_H + + +#include "./server.h" + +#include + + +void AQH_NodeServer_HandleForward(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList); + + + + + + +#endif + + diff --git a/apps/aqhome-nodes/r_setaccmsggrps.c b/apps/aqhome-nodes/r_setaccmsggrps.c new file mode 100644 index 0000000..15f0f8d --- /dev/null +++ b/apps/aqhome-nodes/r_setaccmsggrps.c @@ -0,0 +1,56 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./r_setaccmsggrps.h" +#include "./server_p.h" +#include "aqhome/ipc2/endpoint.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/nodes/m_ipcn.h" +#include "aqhome/msg/ipc/nodes/m_ipcn_setaccmsggrps.h" +#include "aqhome/msg/ipc/m_ipc_connect.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + +void AQH_NodeServer_HandleSetAccMsgGrps(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList) +{ + AQH_NODE_SERVER *xo; + + xo=AQH_NodeServer_GetServerData(o); + if (xo) { + AQH_MESSAGE *outMsg; + int resultCode=AQH_MSGDATA_RESULT_SUCCESS; + uint32_t msgGrps=0; + + msgGrps=AQH_IpcnMessageSetAcceptedMsgGroups_GetGroups(tagList); + AQH_Endpoint_SetAcceptedMsgGroups(ep, msgGrps); + outMsg=AQH_IpcMessageResult_new(AQH_IPC_PROTOCOL_NODES_ID, + AQH_IPC_PROTOCOL_NODES_VERSION, + AQH_MSGTYPE_IPC_NODES_RESULT, + AQH_Endpoint_GetNextMessageId(ep), + AQH_IpcMessage_GetMsgId(msg), + resultCode, NULL); + AQH_Endpoint_AddMsgOut(ep, outMsg); + } +} + + + diff --git a/apps/aqhome-nodes/r_setaccmsggrps.h b/apps/aqhome-nodes/r_setaccmsggrps.h new file mode 100644 index 0000000..a83f624 --- /dev/null +++ b/apps/aqhome-nodes/r_setaccmsggrps.h @@ -0,0 +1,27 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2024 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOMED_R_SETACCMSGGRPS_H +#define AQHOMED_R_SETACCMSGGRPS_H + + +#include "./server.h" + +#include + + +void AQH_NodeServer_HandleSetAccMsgGrps(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList); + + + + + + +#endif + + diff --git a/apps/aqhome-nodes/server.c b/apps/aqhome-nodes/server.c index 418ae5c..869e154 100644 --- a/apps/aqhome-nodes/server.c +++ b/apps/aqhome-nodes/server.c @@ -15,6 +15,8 @@ #include "./devicesread.h" #include "./r_setdata.h" #include "./r_connect.h" +#include "./r_forward.h" +#include "./r_setaccmsggrps.h" #include #include @@ -29,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -594,13 +597,13 @@ int _exchangeConnect(AQH_NODE_SERVER *xo, uint32_t flags) uint32_t msgId; msgId=AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint); - msgOut=AQH_IpcMessageConnect_new(xo->protoId, xo->protoVer, + msgOut=AQH_IpcMessageConnect_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_CONNECT_REQ, msgId, 0, xo->brokerClientId, NULL, NULL, flags); AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, msgOut); return AQH_IpcEndpoint_WaitForResultMsg(xo->brokerEndpoint, - xo->protoId, xo->protoVer, AQH_MSGTYPE_IPC_RESULT, + AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_RESULT, msgId, xo->timeoutInSeconds); } @@ -751,11 +754,19 @@ void _handleMsgFromClient(GWEN_UNUSED AQH_OBJECT *o, GWEN_UNUSED AQH_NODE_SERVER /* exec IPC message */ code=AQH_IpcMessage_GetCode(msg); protoId=AQH_IpcMessage_GetProtoId(msg); - if (protoId==AQH_IPC_PROTOCOL_DATA_ID) { - DBG_ERROR(NULL, "Received IPC packet %d (%x)", (int) code, code); - switch(code) { - case AQH_MSGTYPE_IPC_DATA_CONNECT_REQ: AQH_NodeServer_HandleConnect(o, ep, msg); break; - default: break; + if (protoId==AQH_IPC_PROTOCOL_NODES_ID) { + GWEN_TAG16_LIST *tagList; + + tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0); + if (tagList) { + DBG_ERROR(NULL, "Received IPC packet %d (%x)", (int) code, code); + switch(code) { + case AQH_MSGTYPE_IPC_NODES_CONNECT_REQ: AQH_NodeServer_HandleConnect(o, ep, msg, tagList); break; + case AQH_MSGTYPE_IPC_NODES_FORWARD: AQH_NodeServer_HandleForward(o, ep, msg, tagList); break; + case AQH_MSGTYPE_IPC_NODES_SETACCMSGGRPS: AQH_NodeServer_HandleSetAccMsgGrps(o, ep, msg, tagList); break; + default: break; + } + GWEN_Tag16_List_free(tagList); } } else { @@ -835,6 +846,7 @@ void _forwardTtyMsgToClients(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) if (AQH_Endpoint_GetAcceptedMsgGroups(ep) & msgGroup) { AQH_MESSAGE *outMsg; + DBG_ERROR(NULL, "Forwarding node message %d to client", AQH_NodeMessage_GetMsgType(msg)); outMsg=AQH_IpcnMessageForward_new(AQH_MSGTYPE_IPC_NODES_FORWARD, AQH_Endpoint_GetNextMessageId(ep), 0, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg)); diff --git a/apps/aqhome-tool/client.c b/apps/aqhome-tool/client.c index 98135d7..57824a6 100644 --- a/apps/aqhome-tool/client.c +++ b/apps/aqhome-tool/client.c @@ -30,6 +30,8 @@ #include #include +#include + GWEN_INHERIT(AQH_OBJECT, AQH_TOOL_CLIENT) @@ -50,6 +52,8 @@ static int _nodesWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo); static AQH_MESSAGE *_createRequestMessage(AQH_OBJECT *o, uint32_t msgId); static int _handleResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList, int first); +static int _handleNodeResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, int first); +static int _diffInSeconds(time_t t1, time_t t0); @@ -228,6 +232,17 @@ void AQH_ToolClient_SubFlags(AQH_OBJECT *o, uint32_t f) +void AQH_ToolClient_SetAcceptedGroups(AQH_OBJECT *o, uint32_t f) +{ + AQH_TOOL_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); + if (xo) + xo->acceptedGroups=f; +} + + + int AQH_ToolClient_Run(AQH_OBJECT *o) { AQH_TOOL_CLIENT *xo; @@ -236,15 +251,9 @@ int AQH_ToolClient_Run(AQH_OBJECT *o) if (xo) { int rv; - xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0); - if (xo->ipcEndpoint==NULL) { - DBG_ERROR(NULL, "ERROR creating TCP connection"); - return 2; - } - - rv=_exchangeConnectMsgs(xo, xo->flags); - if (rv!=AQH_MSGDATA_RESULT_SUCCESS) { - DBG_ERROR(NULL, "Connect response: %d", rv); + rv=AQH_ToolClient_Connect(o, 0, xo->flags, 0); + if (rv<0) { + DBG_ERROR(NULL, "Error connecting (%d)", rv); return 2; } return _sendWaitAndHandle(o, xo); @@ -338,11 +347,13 @@ int AQH_ToolClient_Connect(AQH_OBJECT *o, uint32_t connFlags, uint32_t connMsgFl } } - if (connFlags & AQH_TOOL_CLIENT_CONNECTFLAGS_WITHGRPMSG) { - rv=_exchangeAcceptedMsgGroups(xo, grps); - if (rv<0) { - DBG_INFO(NULL, "here(%d)", rv); - return rv; + if (xo->protoId==AQH_IPC_PROTOCOL_NODES_ID) { + if (connFlags & AQH_TOOL_CLIENT_CONNECTFLAGS_WITHGRPMSG) { + rv=_exchangeAcceptedMsgGroups(xo, grps); + if (rv<0) { + DBG_INFO(NULL, "here(%d)", rv); + return rv; + } } } @@ -359,7 +370,10 @@ int _connectEndpoint(AQH_OBJECT *o) xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); if (xo) { - xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0); + if (xo->protoId==AQH_IPC_PROTOCOL_DATA_ID) + xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0); + else if (xo->protoId==AQH_IPC_PROTOCOL_NODES_ID) + xo->ipcEndpoint=Utils2_SetupNodesClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0); if (xo->ipcEndpoint==NULL) { DBG_ERROR(NULL, "ERROR creating TCP connection"); return 2; @@ -538,9 +552,13 @@ int _sendWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo) int _waitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo, uint32_t msgId) { int first=1; + time_t startTime; + + startTime=time(NULL); for (;;) { AQH_MESSAGE *msgIn; + time_t now; msgIn=AQH_IpcEndpoint_WaitForResponseMsg(xo->ipcEndpoint, msgId, xo->timeoutInSeconds); if (msgIn) { @@ -564,6 +582,11 @@ int _waitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo, uint32_t msgId) } } } + now=time(NULL); + if (xo->timeoutInSeconds && _diffInSeconds(now, startTime)>xo->timeoutInSeconds) { + DBG_ERROR(NULL, "Timeout"); + return 3; + } } /* for */ return 1; } @@ -595,15 +618,20 @@ int _nodesSendWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo) int _nodesWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo) { int first=1; + time_t startTime; + + startTime=time(NULL); for (;;) { AQH_MESSAGE *msgIn; + time_t now; msgIn=AQH_ToolClient_WaitForNodeMsg(o, 0, 0, xo->timeoutInSeconds); if (msgIn) { int rv; - rv=_handleResponseMessage(o, msgIn, NULL, first); + DBG_ERROR(NULL, "Handling node message %d", AQH_NodeMessage_GetMsgType(msgIn)); + rv=_handleNodeResponseMessage(o, msgIn, first); AQH_Message_free(msgIn); first=0; if (rv<0) { @@ -615,6 +643,11 @@ int _nodesWaitAndHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo) return 0; } } + now=time(NULL); + if (xo->timeoutInSeconds && _diffInSeconds(now, startTime)>xo->timeoutInSeconds) { + DBG_ERROR(NULL, "Timeout"); + return 3; + } } /* for */ return 1; } @@ -660,3 +693,24 @@ int _handleResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, const GWEN_TAG +int _handleNodeResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, int first) +{ + AQH_TOOL_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); + if (xo) { + if (xo->handleResponseMessageFn) + return xo->handleResponseMessageFn(o, msg, NULL, first); + } + return 0; +} + + + +int _diffInSeconds(time_t t1, time_t t0) +{ + return t1-t0; +} + + + diff --git a/apps/aqhome-tool/client.h b/apps/aqhome-tool/client.h index 44cee91..49b572f 100644 --- a/apps/aqhome-tool/client.h +++ b/apps/aqhome-tool/client.h @@ -57,6 +57,8 @@ void AQH_ToolClient_SetFlags(AQH_OBJECT *o, uint32_t f); void AQH_ToolClient_AddFlags(AQH_OBJECT *o, uint32_t f); void AQH_ToolClient_SubFlags(AQH_OBJECT *o, uint32_t f); +void AQH_ToolClient_SetAcceptedGroups(AQH_OBJECT *o, uint32_t f); + /** * @param o client object diff --git a/apps/aqhome-tool/client_p.h b/apps/aqhome-tool/client_p.h index e7c52bb..2903f3d 100644 --- a/apps/aqhome-tool/client_p.h +++ b/apps/aqhome-tool/client_p.h @@ -27,6 +27,7 @@ struct AQH_TOOL_CLIENT { AQH_OBJECT *ipcEndpoint; int timeoutInSeconds; uint32_t flags; + uint32_t acceptedGroups; uint8_t protoId; uint8_t protoVer; diff --git a/apps/aqhome-tool/nodes/getnodes.c b/apps/aqhome-tool/nodes/getnodes.c index 770103f..84e6e20 100644 --- a/apps/aqhome-tool/nodes/getnodes.c +++ b/apps/aqhome-tool/nodes/getnodes.c @@ -19,6 +19,7 @@ #include "aqhome/msg/ipc/nodes/m_ipcn.h" #include "aqhome/msg/ipc/nodes/m_ipcn_getdevices_req.h" #include "aqhome/msg/ipc/nodes/m_ipcn_getdevices_rsp.h" +#include "aqhome/msg/node/m_node.h" #include #include @@ -78,6 +79,9 @@ int AQH_Tool_GetNodes(GWEN_DB_NODE *dbGlobalArgs, int argc, char **argv) eventLoop=AQH_EventLoop_new(); o=AQH_ToolClient_new(eventLoop, AQH_IPC_PROTOCOL_NODES_ID, AQH_IPC_PROTOCOL_NODES_VERSION, dbGlobalArgs, args); + AQH_ToolClient_SetFlags(o, AQH_TOOL_CLIENT_CONNECTFLAGS_WITHCONNECTMSG | AQH_TOOL_CLIENT_CONNECTFLAGS_WITHGRPMSG); + AQH_ToolClient_SetAcceptedGroups(o, AQH_MSG_TYPEGROUP_ALL); + AQH_ToolClient_SetCreateRequestMessageFn(o, _createRequestMessage); AQH_ToolClient_SetHandleResponseMessageFn(o, _handleResponseMessage); rv=AQH_ToolClient_ReadLocalArgs(o, argc, argv); diff --git a/apps/aqhome-tool/nodes/ping.c b/apps/aqhome-tool/nodes/ping.c index 7cbddc4..d01f19e 100644 --- a/apps/aqhome-tool/nodes/ping.c +++ b/apps/aqhome-tool/nodes/ping.c @@ -125,11 +125,11 @@ int _handleResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, GWEN_UNUSED co code=AQH_NodeMessage_GetMsgType(msg); if ((code==AQH_MSG_TYPE_PONG) && (nodeAddr==0 || nodeAddr==0xff || nodeAddr==AQH_NodeMessage_GetSourceAddress(msg))) { - return 0; + return 1; } else { - DBG_INFO(NULL, "Unexpected message \"%d\"", code); - return 3; + DBG_ERROR(NULL, "Unexpected message \"%d\"", code); + return 0; } } diff --git a/apps/aqhome-tool/utils.c b/apps/aqhome-tool/utils.c index 00c2228..967a68a 100644 --- a/apps/aqhome-tool/utils.c +++ b/apps/aqhome-tool/utils.c @@ -70,6 +70,45 @@ AQH_OBJECT *Utils2_SetupBrokerClientEndpoint(AQH_EVENT_LOOP *eventLoop, GWEN_DB_ +AQH_OBJECT *Utils2_SetupNodesClientEndpoint(AQH_EVENT_LOOP *eventLoop, GWEN_DB_NODE *dbArgs, uint32_t flags) +{ + const char *address; + int port; + + address=GWEN_DB_GetCharValue(dbArgs, "tcpAddress", 0, NULL); + if (!(address && *address)) + address=GWEN_DB_GetCharValue(dbArgs, "ConfigFile/nodesAddress", 0, "127.0.0.1"); + + port=GWEN_DB_GetIntValue(dbArgs, "tcpPort", 0, -1); + if (port<0) + port=GWEN_DB_GetIntValue(dbArgs, "ConfigFile/nodesPort", 0, 45456); + + if (address && *address && port) { + AQH_OBJECT *ep; + int fd; + + DBG_ERROR(NULL, "Connecting to nodes server %s:%d", address, port); + fd=AQH_TcpObject_CreateConnectedSocket(address, port); + if (fd<0) { + DBG_ERROR(NULL, "Error connecting to nodes server %s:%d", address, port); + return NULL; + } + DBG_ERROR(NULL, "Connected to nodes server %s:%d", address, port); + + ep=AQH_IpcClientObject_new(eventLoop, fd); + assert(ep); + AQH_Endpoint_AddFlags(ep, flags); + return ep; + } + else { + DBG_ERROR(NULL, "No server settings"); + } + + return NULL; +} + + + int Utils_SendAcceptedMsgGroups(AQH_OBJECT *ep, uint32_t groups) { AQH_MESSAGE *msgOut; diff --git a/apps/aqhome-tool/utils.h b/apps/aqhome-tool/utils.h index 575d9e7..125c31d 100644 --- a/apps/aqhome-tool/utils.h +++ b/apps/aqhome-tool/utils.h @@ -24,6 +24,8 @@ AQH_OBJECT *Utils2_SetupBrokerClientEndpoint(AQH_EVENT_LOOP *eventLoop, GWEN_DB_NODE *dbArgs, uint32_t flags); AQH_MESSAGE *Utils2_WaitForResponseMsg(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *epTcp, uint32_t refMsgId, int timeoutInSeconds); + +AQH_OBJECT *Utils2_SetupNodesClientEndpoint(AQH_EVENT_LOOP *eventLoop, GWEN_DB_NODE *dbArgs, uint32_t flags); int Utils_SendAcceptedMsgGroups(AQH_OBJECT *ep, uint32_t groups); diff --git a/aqhome/ipc2/msgwriter.c b/aqhome/ipc2/msgwriter.c index 1c87c8b..4c4322e 100644 --- a/aqhome/ipc2/msgwriter.c +++ b/aqhome/ipc2/msgwriter.c @@ -20,7 +20,7 @@ #include -#define AQH_MSGWRITER_FLAGS_MSGSTARTED 0x0001 +#define AQH_MSGWRITER_FLAGS_MSGSTARTED 0x80000000 @@ -136,6 +136,7 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject) if (xo->bytesLeft) { if (!(xo->flags & AQH_MSGWRITER_FLAGS_MSGSTARTED)) { + DBG_ERROR(NULL, "Starting message"); rv=_startMsg(xo, fdObject); if (rv<0) { if (rv==GWEN_ERROR_TRY_AGAIN) { @@ -166,6 +167,7 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject) const uint8_t *msgPtr; _endMsg(xo, fdObject); + DBG_ERROR(NULL, "Ended message"); msgPtr=xo->msgBufPtr; msgLen=xo->msgBufLen; _resetBuffer(o); diff --git a/aqhome/ipc2/tty_endpoint.c b/aqhome/ipc2/tty_endpoint.c index 846307c..c66cbc3 100644 --- a/aqhome/ipc2/tty_endpoint.c +++ b/aqhome/ipc2/tty_endpoint.c @@ -45,7 +45,7 @@ AQH_OBJECT *AQH_TtyEndpoint2_new(AQH_EVENT_LOOP *eventLoop, int fd) msgReader=AQH_NodeMsgReader_new(eventLoop, fdReader); AQH_Object_Enable(msgReader); - fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE); + fdWriter=AQH_TtyObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE); msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter); endpoint=AQH_Endpoint_new(eventLoop, msgReader, msgWriter); diff --git a/aqhome/ipc2/ttyobject.c b/aqhome/ipc2/ttyobject.c index 066620b..876ff56 100644 --- a/aqhome/ipc2/ttyobject.c +++ b/aqhome/ipc2/ttyobject.c @@ -19,6 +19,8 @@ #include #include #include +#include + #define AQH_TTYOBJECT_BAUDRATE B19200 @@ -35,6 +37,7 @@ static void _endMsg(AQH_OBJECT *o); static int _getAttn(int fd); static int _setAttn(int fd, int val); static int _fdSetBlocking(int sk, int fl); +//static int _msleep(long int msec); @@ -70,6 +73,7 @@ int _startMsg(AQH_OBJECT *o) return rv; } else if (rv==0) { + DBG_ERROR(NULL, "Line busy"); return GWEN_ERROR_TRY_AGAIN; /* line busy */ } else { @@ -89,8 +93,9 @@ void _endMsg(AQH_OBJECT *o) int fd; fd=AQH_FdObject_GetFd(o); - if (fd>=0) + if (fd>=0) { _setAttn(fd, 1); /* set ATTN high */ + } } } @@ -132,6 +137,7 @@ int _setAttn(int fd, int val) DBG_ERROR(AQH_LOGDOMAIN, "Error on ioctl: %s (%d)", strerror(errno), errno); return GWEN_ERROR_IO; } + DBG_ERROR(NULL, "Set ATTN to %d", val); return 0; } @@ -187,6 +193,12 @@ int AQH_TtyObject_OpenAndInitDevice(const char *device, struct termios *initialT return GWEN_ERROR_IO; } + rv=_setAttn(fd, 1); + if (rv<0) { + DBG_ERROR(AQH_LOGDOMAIN, "Error on setAttn(%s): %s (%d)", device, strerror(errno), errno); + return GWEN_ERROR_IO; + } + return fd; } @@ -225,6 +237,26 @@ int _fdSetBlocking(int fd, int fl) - +#if 0 +int _msleep(long int msec) +{ + struct timespec ts; + int rv; + + if (msec<0) { + errno=EINVAL; + return -1; + } + + ts.tv_sec=msec/1000; + ts.tv_nsec=(msec%1000)*1000000; + + do { + rv=nanosleep(&ts, &ts); + } while (rv && errno==EINTR); + + return rv; +} +#endif diff --git a/aqhome/msg/ipc/nodes/m_ipcn_setaccmsggrps.h b/aqhome/msg/ipc/nodes/m_ipcn_setaccmsggrps.h index db21a1c..e2ea1dc 100644 --- a/aqhome/msg/ipc/nodes/m_ipcn_setaccmsggrps.h +++ b/aqhome/msg/ipc/nodes/m_ipcn_setaccmsggrps.h @@ -29,7 +29,7 @@ AQHOME_API AQH_MESSAGE *AQH_IpcnMessageSetAcceptedMsgGroups_new(uint16_t code, u AQHOME_API void AQH_IpcdMessageSetAcceptedMsgGroups_DumpToBuffer(const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList, GWEN_BUFFER *dbuf, const char *sText); -uint32_t AQH_IpcnMessageSetAcceptedMsgGroups_GetGroups(const GWEN_TAG16_LIST *tagList); +AQHOME_API uint32_t AQH_IpcnMessageSetAcceptedMsgGroups_GetGroups(const GWEN_TAG16_LIST *tagList); #endif