From 8968f14122d8d10908e02a216397ccb3cf153064 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Wed, 26 Feb 2025 20:59:20 +0100 Subject: [PATCH] aqhome: more work on new event/ipc interface. --- aqhome/events2/fdobject.c | 10 +- aqhome/events2/fdobject.h | 9 +- aqhome/events2/object.c | 47 +- aqhome/events2/object.h | 3 +- aqhome/events2/object_p.h | 2 + aqhome/ipc/data/msg_data_connect.c | 3 +- aqhome/ipc/data/msg_data_devices.c | 3 +- aqhome/ipc/data/msg_data_getdata.c | 3 +- aqhome/ipc/data/msg_data_multidata.c | 3 +- aqhome/ipc/data/msg_data_set.c | 3 +- aqhome/ipc/data/msg_data_values.c | 3 +- aqhome/ipc/msg_ipc_qwords.c | 1 - aqhome/ipc/msg_ipc_tag16.c | 5 +- aqhome/ipc2/0BUILD | 14 +- aqhome/ipc2/{nodeendpoint.c => endpoint.c} | 146 ++++-- aqhome/ipc2/endpoint.h | 44 ++ .../ipc2/{nodeendpoint_p.h => endpoint_p.h} | 12 +- aqhome/ipc2/ipc_server.c | 146 ++++++ aqhome/ipc2/ipc_server.h | 35 ++ aqhome/ipc2/ipc_server_p.h | 26 + aqhome/ipc2/message.c | 15 + aqhome/ipc2/message.h | 4 + aqhome/ipc2/message_p.h | 2 + aqhome/ipc2/msgreader.c | 5 + aqhome/ipc2/msgrequest.c | 473 ++++++++++++++++++ aqhome/ipc2/msgrequest.h | 104 ++++ aqhome/ipc2/msgrequest_p.h | 45 ++ aqhome/ipc2/msgwriter.c | 5 + aqhome/ipc2/nodeendpoint.h | 37 -- aqhome/ipc2/tcpd_object.c | 11 +- aqhome/ipc2/tcpd_object.h | 6 +- aqhome/ipc2/tty_endpoint.c | 57 +++ aqhome/ipc2/tty_endpoint.h | 29 ++ aqhome/libtest.c | 48 +- 34 files changed, 1233 insertions(+), 126 deletions(-) rename aqhome/ipc2/{nodeendpoint.c => endpoint.c} (50%) create mode 100644 aqhome/ipc2/endpoint.h rename aqhome/ipc2/{nodeendpoint_p.h => endpoint_p.h} (75%) create mode 100644 aqhome/ipc2/ipc_server.c create mode 100644 aqhome/ipc2/ipc_server.h create mode 100644 aqhome/ipc2/ipc_server_p.h create mode 100644 aqhome/ipc2/msgrequest.c create mode 100644 aqhome/ipc2/msgrequest.h create mode 100644 aqhome/ipc2/msgrequest_p.h delete mode 100644 aqhome/ipc2/nodeendpoint.h create mode 100644 aqhome/ipc2/tty_endpoint.c create mode 100644 aqhome/ipc2/tty_endpoint.h diff --git a/aqhome/events2/fdobject.c b/aqhome/events2/fdobject.c index 252b0c4..e819a5e 100644 --- a/aqhome/events2/fdobject.c +++ b/aqhome/events2/fdobject.c @@ -1,6 +1,6 @@ /**************************************************************************** - * This file is part of the project Gwenhywfar. - * Gwenhywfar (c) by 2025 Martin Preuss, all rights reserved. + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. @@ -66,6 +66,9 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) AQH_FDOBJECT *xo; xo=(AQH_FDOBJECT*)p; + if (xo->fd>=0) + close(xo->fd); + xo->fd=-1; GWEN_FREE_OBJECT(xo); } @@ -175,6 +178,7 @@ int AQH_FdObject_Read(AQH_OBJECT *o, uint8_t *ptrBuffer, uint32_t lenBuffer) } else { DBG_ERROR(AQH_LOGDOMAIN, "Error on read: %s (%d)", strerror(errno), errno); + close(xo->fd); xo->fd=-1; return GWEN_ERROR_IO; } @@ -217,6 +221,7 @@ int AQH_FdObject_FlushInput(AQH_OBJECT *o) else if (rv<0) { if (errno!=EINTR && errno!=EWOULDBLOCK && errno!=EAGAIN) { DBG_ERROR(AQH_LOGDOMAIN, "Error on read: %s (%d)", strerror(errno), errno); + close(xo->fd); xo->fd=-1; return GWEN_ERROR_IO; } @@ -253,6 +258,7 @@ int AQH_FdObject_Write(AQH_OBJECT *o, const uint8_t *ptrBuffer, uint32_t lenBuff } else { DBG_ERROR(AQH_LOGDOMAIN, "Error on write: %s (%d)", strerror(errno), errno); + close(xo->fd); xo->fd=-1; return GWEN_ERROR_IO; } diff --git a/aqhome/events2/fdobject.h b/aqhome/events2/fdobject.h index 4d4779b..f2c923e 100644 --- a/aqhome/events2/fdobject.h +++ b/aqhome/events2/fdobject.h @@ -1,6 +1,6 @@ /**************************************************************************** - * This file is part of the project Gwenhywfar. - * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. @@ -28,7 +28,10 @@ typedef int (*AQH_FDOBJECT_STARTMSG_FN)(AQH_OBJECT *o); typedef void (*AQH_FDOBJECT_ENDMSG_FN)(AQH_OBJECT *o); - +/** + * Create object for given file descriptor (takes over ownership of the given file descriptor, use dup() if + * fd used in another object). + */ AQHOME_API AQH_OBJECT *AQH_FdObject_new(AQH_EVENT_LOOP *eventLoop, int fd, int mode); AQHOME_API int AQH_FdObject_GetFdMode(const AQH_OBJECT *o); diff --git a/aqhome/events2/object.c b/aqhome/events2/object.c index d50118c..4584da5 100644 --- a/aqhome/events2/object.c +++ b/aqhome/events2/object.c @@ -49,6 +49,7 @@ AQH_OBJECT *AQH_Object_new(AQH_EVENT_LOOP *eventLoop) AQH_OBJECT *o; GWEN_NEW_OBJECT(AQH_OBJECT, o); + o->refCount=1; GWEN_INHERIT_INIT(AQH_OBJECT, o); GWEN_LIST_INIT(AQH_OBJECT, o); o->eventLoop=eventLoop; @@ -59,15 +60,25 @@ AQH_OBJECT *AQH_Object_new(AQH_EVENT_LOOP *eventLoop) +void AQH_Object_IncRefCount(AQH_OBJECT *o) +{ + if (o && o->refCount>0) + o->refCount++; +} + + + void AQH_Object_free(AQH_OBJECT *o) { - if (o) { - GWEN_INHERIT_FINI(AQH_OBJECT, o); - GWEN_LIST_FINI(AQH_OBJECT, o); + if (o && o->refCount>0) { + if (--(o->refCount)==0) { + GWEN_INHERIT_FINI(AQH_OBJECT, o); + GWEN_LIST_FINI(AQH_OBJECT, o); - AQH_Link_List_free(o->linkList); + AQH_Link_List_free(o->linkList); - GWEN_FREE_OBJECT(o); + GWEN_FREE_OBJECT(o); + } } } @@ -75,14 +86,14 @@ void AQH_Object_free(AQH_OBJECT *o) uint32_t AQH_Object_GetFlags(const AQH_OBJECT *o) { - return o?o->flags:0; + return (o && o->refCount)?o->flags:0; } void AQH_Object_SetFlags(AQH_OBJECT *o, uint32_t i) { - if (o) + if (o && o->refCount) o->flags=i; } @@ -90,7 +101,7 @@ void AQH_Object_SetFlags(AQH_OBJECT *o, uint32_t i) void AQH_Object_AddFlags(AQH_OBJECT *o, uint32_t i) { - if (o) + if (o && o->refCount) o->flags|=i; } @@ -98,7 +109,7 @@ void AQH_Object_AddFlags(AQH_OBJECT *o, uint32_t i) void AQH_Object_SubFlags(AQH_OBJECT *o, uint32_t i) { - if (o) + if (o && o->refCount) o->flags&=~i; } @@ -106,14 +117,14 @@ void AQH_Object_SubFlags(AQH_OBJECT *o, uint32_t i) AQH_EVENT_LOOP *AQH_Object_GetEventLoop(const AQH_OBJECT *o) { - return o?o->eventLoop:NULL; + return (o && o->refCount)?o->eventLoop:NULL; } AQH_LINK *_findLink(AQH_OBJECT *o, uint32_t signalId, uint32_t slotId, AQH_OBJECT *targetObject) { - if (o) { + if (o && o->refCount) { AQH_LINK *ln; ln=AQH_Link_List_First(o->linkList); @@ -163,7 +174,7 @@ int AQH_Object_EmitSignal(AQH_OBJECT *o, uint32_t signalId, int param1, void *pa { int signalWasHandled=0; - if (o) { + if (o && o->refCount) { AQH_LINK *ln; ln=AQH_Link_List_First(o->linkList); @@ -182,7 +193,7 @@ int AQH_Object_EmitSignal(AQH_OBJECT *o, uint32_t signalId, int param1, void *pa int AQH_Object_HandleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2) { - if (o && o->signalHandlerFn) + if (o && o->refCount && o->signalHandlerFn) return o->signalHandlerFn(o, slotId, senderObject, param1, param2); return 0; } @@ -191,7 +202,7 @@ int AQH_Object_HandleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderOb void AQH_Object_Enable(AQH_OBJECT *o) { - if (o && o->enableFn) + if (o && o->refCount && o->enableFn) o->enableFn(o); } @@ -199,7 +210,7 @@ void AQH_Object_Enable(AQH_OBJECT *o) void AQH_Object_Disable(AQH_OBJECT *o) { - if (o && o->disableFn) + if (o && o->refCount && o->disableFn) o->disableFn(o); } @@ -207,7 +218,7 @@ void AQH_Object_Disable(AQH_OBJECT *o) AQH_OBJECT_SIGNALHANDLER_FN AQH_Object_SetSignalHandlerFn(AQH_OBJECT *o, AQH_OBJECT_SIGNALHANDLER_FN f) { - if (o) { + if (o && o->refCount) { AQH_OBJECT_SIGNALHANDLER_FN oldFn; oldFn=o->signalHandlerFn; @@ -221,7 +232,7 @@ AQH_OBJECT_SIGNALHANDLER_FN AQH_Object_SetSignalHandlerFn(AQH_OBJECT *o, AQH_OBJ AQH_OBJECT_ENABLE_FN AQH_Object_SetEnableFn(AQH_OBJECT *o, AQH_OBJECT_ENABLE_FN f) { - if (o) { + if (o && o->refCount) { AQH_OBJECT_ENABLE_FN oldFn; oldFn=o->enableFn; @@ -235,7 +246,7 @@ AQH_OBJECT_ENABLE_FN AQH_Object_SetEnableFn(AQH_OBJECT *o, AQH_OBJECT_ENABLE_FN AQH_OBJECT_DISABLE_FN AQH_Object_SetDisableFn(AQH_OBJECT *o, AQH_OBJECT_DISABLE_FN f) { - if (o) { + if (o && o->refCount) { AQH_OBJECT_DISABLE_FN oldFn; oldFn=o->disableFn; diff --git a/aqhome/events2/object.h b/aqhome/events2/object.h index ab99672..cb53e79 100644 --- a/aqhome/events2/object.h +++ b/aqhome/events2/object.h @@ -9,7 +9,7 @@ #ifndef AQH_OBJECT_H #define AQH_OBJECT_H -#include "aqhome/api.h" +#include #include #include @@ -60,6 +60,7 @@ typedef void (*AQH_OBJECT_DISABLE_FN)(AQH_OBJECT *o); */ /*@{*/ AQHOME_API AQH_OBJECT *AQH_Object_new(AQH_EVENT_LOOP *eventLoop); +AQHOME_API void AQH_Object_IncRefCount(AQH_OBJECT *o); AQHOME_API void AQH_Object_free(AQH_OBJECT *o); /*@}*/ diff --git a/aqhome/events2/object_p.h b/aqhome/events2/object_p.h index b5f5c6e..003e495 100644 --- a/aqhome/events2/object_p.h +++ b/aqhome/events2/object_p.h @@ -40,6 +40,8 @@ struct AQH_OBJECT { AQH_OBJECT_SIGNALHANDLER_FN signalHandlerFn; AQH_OBJECT_ENABLE_FN enableFn; AQH_OBJECT_DISABLE_FN disableFn; + + int refCount; }; diff --git a/aqhome/ipc/data/msg_data_connect.c b/aqhome/ipc/data/msg_data_connect.c index d2e3263..d3beed6 100644 --- a/aqhome/ipc/data/msg_data_connect.c +++ b/aqhome/ipc/data/msg_data_connect.c @@ -70,7 +70,8 @@ void AQH_ConnectDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, flags=AQH_Tag16IpcMsg_GetTagDataAsUint32(msg, AQH_MSGDATA_CONNECT_TAGS_FLAGS, 0); GWEN_Buffer_AppendArgs(dbuf, - "CONNECT (code=%d, proto=%d, proto version=%d, clientId=%s, userId=%s, flags=%08x)\n", + "CONNECT %s (code=%d, proto=%d, proto version=%d, clientId=%s, userId=%s, flags=%08x)\n", + sText?sText:"", GWEN_IpcMsg_GetCode(msg), GWEN_IpcMsg_GetProtoId(msg), GWEN_IpcMsg_GetProtoVersion(msg), diff --git a/aqhome/ipc/data/msg_data_devices.c b/aqhome/ipc/data/msg_data_devices.c index 60bd066..7a6f4d4 100644 --- a/aqhome/ipc/data/msg_data_devices.c +++ b/aqhome/ipc/data/msg_data_devices.c @@ -137,7 +137,8 @@ void AQH_DevicesDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, { if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_DEVICES_MINSIZE) { GWEN_Buffer_AppendArgs(dbuf, - "DEVICES (code=%d, proto=%d, proto version=%d, flags=0x%08x)\n", + "DEVICES %s (code=%d, proto=%d, proto version=%d, flags=0x%08x)\n", + sText?sText:"", GWEN_IpcMsg_GetCode(msg), GWEN_IpcMsg_GetProtoId(msg), GWEN_IpcMsg_GetProtoVersion(msg), diff --git a/aqhome/ipc/data/msg_data_getdata.c b/aqhome/ipc/data/msg_data_getdata.c index 7e3cbb4..6765297 100644 --- a/aqhome/ipc/data/msg_data_getdata.c +++ b/aqhome/ipc/data/msg_data_getdata.c @@ -68,7 +68,8 @@ void AQH_GetDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, tsEnd=AQH_Tag16IpcMsg_GetTagDataAsUint64(msg, AQH_MSGDATA_GETDATA_TAGS_END, 0); GWEN_Buffer_AppendArgs(dbuf, - "GETDATA (code=%d, proto=%d, proto version=%d, name=%s, tsBegin=%lu, tsEnd=%lu)\n", + "GETDATA %s (code=%d, proto=%d, proto version=%d, name=%s, tsBegin=%lu, tsEnd=%lu)\n", + sText?sText:"", GWEN_IpcMsg_GetCode(msg), GWEN_IpcMsg_GetProtoId(msg), GWEN_IpcMsg_GetProtoVersion(msg), diff --git a/aqhome/ipc/data/msg_data_multidata.c b/aqhome/ipc/data/msg_data_multidata.c index ee12dbc..bd2b8f6 100644 --- a/aqhome/ipc/data/msg_data_multidata.c +++ b/aqhome/ipc/data/msg_data_multidata.c @@ -139,7 +139,8 @@ void AQH_MultiDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t)); GWEN_Buffer_AppendArgs(dbuf, - "MULTIDATA (code=%d, proto=%d, proto version=%d, name=%s, units=%s, type=%d, datapoints=%u)\n", + "MULTIDATA %s (code=%d, proto=%d, proto version=%d, name=%s, units=%s, type=%d, datapoints=%u)\n", + sText?sText:"", GWEN_IpcMsg_GetCode(msg), GWEN_IpcMsg_GetProtoId(msg), GWEN_IpcMsg_GetProtoVersion(msg), diff --git a/aqhome/ipc/data/msg_data_set.c b/aqhome/ipc/data/msg_data_set.c index 8eec621..689f133 100644 --- a/aqhome/ipc/data/msg_data_set.c +++ b/aqhome/ipc/data/msg_data_set.c @@ -92,7 +92,8 @@ void AQH_SetDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, cons { if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_SET_MINSIZE) { GWEN_Buffer_AppendArgs(dbuf, - "SET (code=%d, proto=%d, proto version=%d)\n", + "SET %s (code=%d, proto=%d, proto version=%d)\n", + sText?sText:"", GWEN_IpcMsg_GetCode(msg), GWEN_IpcMsg_GetProtoId(msg), GWEN_IpcMsg_GetProtoVersion(msg)); diff --git a/aqhome/ipc/data/msg_data_values.c b/aqhome/ipc/data/msg_data_values.c index 3477c56..a07d14a 100644 --- a/aqhome/ipc/data/msg_data_values.c +++ b/aqhome/ipc/data/msg_data_values.c @@ -139,7 +139,8 @@ void AQH_ValuesDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, c { if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_VALUES_MINSIZE) { GWEN_Buffer_AppendArgs(dbuf, - "VALUES (code=%d, proto=%d, proto version=%d, flags=0x%08x)\n", + "VALUES %s (code=%d, proto=%d, proto version=%d, flags=0x%08x)\n", + sText?sText:"", GWEN_IpcMsg_GetCode(msg), GWEN_IpcMsg_GetProtoId(msg), GWEN_IpcMsg_GetProtoVersion(msg), diff --git a/aqhome/ipc/msg_ipc_qwords.c b/aqhome/ipc/msg_ipc_qwords.c index 59431bf..e6367d1 100644 --- a/aqhome/ipc/msg_ipc_qwords.c +++ b/aqhome/ipc/msg_ipc_qwords.c @@ -37,7 +37,6 @@ GWEN_MSG *AQH_QwordsIpcMsg_new(uint8_t protoId, uint8_t protoVer, uint16_t code, uint32_t flags, const uint64_t *i64Ptr, int count) { GWEN_MSG *msg; - uint8_t *ptr; int payloadSize; int i; diff --git a/aqhome/ipc/msg_ipc_tag16.c b/aqhome/ipc/msg_ipc_tag16.c index d0c9446..1ffd3b9 100644 --- a/aqhome/ipc/msg_ipc_tag16.c +++ b/aqhome/ipc/msg_ipc_tag16.c @@ -60,7 +60,7 @@ void AQH_Tag16IpcMsg_Extend(GWEN_MSG *msg) -void _freeData(void *bp, void *p) +void _freeData(GWEN_UNUSED void *bp, void *p) { AQH_MSG_IPC_TAG16 *xmsg; @@ -239,7 +239,8 @@ void AQH_Tag16IpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const { if (GWEN_Msg_GetBytesInBuffer(msg)>=GWEN_MSGIPC_OFFS_PAYLOAD) { GWEN_Buffer_AppendArgs(dbuf, - "Tag16 (code=%d, proto=%d, proto version=%d)\n", + "Tag16 %s (code=%d, proto=%d, proto version=%d)\n", + sText?sText:"", GWEN_IpcMsg_GetCode(msg), GWEN_IpcMsg_GetProtoId(msg), GWEN_IpcMsg_GetProtoVersion(msg)); diff --git a/aqhome/ipc2/0BUILD b/aqhome/ipc2/0BUILD index a9c2dd7..62a5e6d 100644 --- a/aqhome/ipc2/0BUILD +++ b/aqhome/ipc2/0BUILD @@ -51,8 +51,11 @@ nodemsgreader.h ttyobject.h tcpd_object.h - nodeendpoint.h + endpoint.h message.h + msgrequest.h + ipc_server.h + tty_endpoint.h @@ -60,8 +63,10 @@ msgreader_p.h msgwriter_p.h tcpd_object_p.h - nodeendpoint_p.h + endpoint_p.h message_p.h + msgrequest_p.h + ipc_server_p.h @@ -74,8 +79,11 @@ nodemsgreader.c ttyobject.c tcpd_object.c - nodeendpoint.c + endpoint.c message.c + msgrequest.c + ipc_server.c + tty_endpoint.c diff --git a/aqhome/ipc2/nodeendpoint.c b/aqhome/ipc2/endpoint.c similarity index 50% rename from aqhome/ipc2/nodeendpoint.c rename to aqhome/ipc2/endpoint.c index f873652..d3b6f79 100644 --- a/aqhome/ipc2/nodeendpoint.c +++ b/aqhome/ipc2/endpoint.c @@ -10,7 +10,7 @@ # include #endif -#include "./nodeendpoint_p.h" +#include "./endpoint_p.h" #include "aqhome/ipc2/msgreader.h" #include "aqhome/ipc2/msgwriter.h" #include "aqhome/msg/node/m_node.h" @@ -25,7 +25,24 @@ -GWEN_INHERIT(AQH_OBJECT, AQH_NODE_ENDPOINT) +/* ------------------------------------------------------------------------------------------------ + * defs and enums + * ------------------------------------------------------------------------------------------------ + */ + +enum { + AQH_ENDPOINT_SLOT_MSG_RECVD=1, + AQH_ENDPOINT_SLOT_MSG_SENT +}; + + + +/* ------------------------------------------------------------------------------------------------ + * global vars + * ------------------------------------------------------------------------------------------------ + */ + +GWEN_INHERIT(AQH_OBJECT, AQH_ENDPOINT) @@ -36,8 +53,8 @@ GWEN_INHERIT(AQH_OBJECT, AQH_NODE_ENDPOINT) static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); -static int _handleMsgRecvd(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr); -static int _handleMsgSent(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr); +static int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr); +static int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr); static void _dumpMsg(const AQH_MESSAGE *msg, const char *sText); @@ -47,14 +64,14 @@ static void _dumpMsg(const AQH_MESSAGE *msg, const char *sText); * ------------------------------------------------------------------------------------------------ */ -AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter) +AQH_OBJECT *AQH_Endpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter) { AQH_OBJECT *o; - AQH_NODE_ENDPOINT *xo; + AQH_ENDPOINT *xo; o=AQH_Object_new(eventLoop); - GWEN_NEW_OBJECT(AQH_NODE_ENDPOINT, xo); - GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o, xo, _freeData); + GWEN_NEW_OBJECT(AQH_ENDPOINT, xo); + GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_ENDPOINT, o, xo, _freeData); xo->msgOutList=AQH_Message_List_new(); xo->msgInList=AQH_Message_List_new(); @@ -63,12 +80,12 @@ AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReade if (msgReader) { xo->msgReader=msgReader; - AQH_Object_AddLink(msgReader, AQH_MSG_READER_SIGNAL_MSGRECVD, AQH_NODE_ENDPOINT_SLOT_MSG_RECVD, o); + AQH_Object_AddLink(msgReader, AQH_MSG_READER_SIGNAL_MSGRECVD, AQH_ENDPOINT_SLOT_MSG_RECVD, o); } if (msgWriter) { xo->msgWriter=msgWriter; - AQH_Object_AddLink(msgWriter, AQH_MSG_WRITER_SIGNAL_MSGSENT, AQH_NODE_ENDPOINT_SLOT_MSG_SENT, o); + AQH_Object_AddLink(msgWriter, AQH_MSG_WRITER_SIGNAL_MSGSENT, AQH_ENDPOINT_SLOT_MSG_SENT, o); } return o; @@ -78,9 +95,9 @@ AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReade void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) { - AQH_NODE_ENDPOINT *xo; + AQH_ENDPOINT *xo; - xo=(AQH_NODE_ENDPOINT*) p; + xo=(AQH_ENDPOINT*) p; AQH_Message_List_free(xo->msgOutList); AQH_Message_List_free(xo->msgInList); AQH_Object_free(xo->msgWriter); @@ -90,12 +107,12 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) -AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgOutList(const AQH_OBJECT *o) +AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgOutList(const AQH_OBJECT *o) { if (o) { - AQH_NODE_ENDPOINT *xo; + AQH_ENDPOINT *xo; - xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o); + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); if (xo) return xo->msgOutList; } @@ -104,12 +121,12 @@ AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgOutList(const AQH_OBJECT *o) -AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgOut(AQH_OBJECT *o) +AQH_MESSAGE *AQH_Endpoint_GetNextMsgOut(AQH_OBJECT *o) { if (o) { - AQH_NODE_ENDPOINT *xo; + AQH_ENDPOINT *xo; - xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o); + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); if (xo) return AQH_Message_List_First(xo->msgOutList); } @@ -118,12 +135,12 @@ AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgOut(AQH_OBJECT *o) -void AQH_NodeEndpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg) +void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg) { if (o && msg) { - AQH_NODE_ENDPOINT *xo; + AQH_ENDPOINT *xo; - xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o); + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); if (xo) { AQH_Message_List_Add(msg, xo->msgOutList); if (xo->msgWriter && AQH_Message_List_GetCount(xo->msgOutList)==1) { @@ -136,12 +153,12 @@ void AQH_NodeEndpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg) -AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgInList(const AQH_OBJECT *o) +AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgInList(const AQH_OBJECT *o) { if (o) { - AQH_NODE_ENDPOINT *xo; + AQH_ENDPOINT *xo; - xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o); + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); if (xo) return xo->msgInList; } @@ -150,12 +167,12 @@ AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgInList(const AQH_OBJECT *o) -AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgIn(AQH_OBJECT *o) +AQH_MESSAGE *AQH_Endpoint_GetNextMsgIn(AQH_OBJECT *o) { if (o) { - AQH_NODE_ENDPOINT *xo; + AQH_ENDPOINT *xo; - xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o); + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); if (xo) return AQH_Message_List_First(xo->msgInList); } @@ -164,12 +181,12 @@ AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgIn(AQH_OBJECT *o) -void AQH_NodeEndpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg) +void AQH_Endpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg) { if (o && msg) { - AQH_NODE_ENDPOINT *xo; + AQH_ENDPOINT *xo; - xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o); + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); if (xo) { AQH_Message_List_Add(msg, xo->msgInList); DBG_ERROR(AQH_LOGDOMAIN, "now %d msgs in list", AQH_Message_List_GetCount(xo->msgInList)); @@ -179,11 +196,39 @@ void AQH_NodeEndpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg) -int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2) +uint32_t AQH_Endpoint_GetLastMessageId(const AQH_OBJECT *o) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + return xo->lastMsgId; + } + return 0; +} + + + +uint32_t AQH_Endpoint_GetNextMessageId(AQH_OBJECT *o) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + return ++(xo->lastMsgId); + } + return 0; +} + + + +int _handleSignal(AQH_OBJECT *o, uint32_t slotId, GWEN_UNUSED AQH_OBJECT *senderObject, int param1, void *param2) { switch(slotId) { - case AQH_NODE_ENDPOINT_SLOT_MSG_RECVD: return _handleMsgRecvd(o, senderObject, param1, param2); - case AQH_NODE_ENDPOINT_SLOT_MSG_SENT: return _handleMsgSent(o, senderObject, param1, param2); + case AQH_ENDPOINT_SLOT_MSG_RECVD: return _handleMsgRecvd(o, param1, param2); + case AQH_ENDPOINT_SLOT_MSG_SENT: return _handleMsgSent(o, param1, param2); default: break; } @@ -193,24 +238,53 @@ int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int -int _handleMsgRecvd(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr) +int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr) { AQH_MESSAGE *msg; DBG_ERROR(AQH_LOGDOMAIN, "Msg received:"); msg=AQH_NodeMessage_fromBuffer(msgPtr, msgLen); _dumpMsg(msg, "Received"); - AQH_NodeEndpoint_AddMsgIn(o, msg); + AQH_Endpoint_AddMsgIn(o, msg); return 1; } -int _handleMsgSent(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr) +int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr) { DBG_ERROR(AQH_LOGDOMAIN, "Msg sent:"); GWEN_Text_LogString((const char*) msgPtr, msgLen, AQH_LOGDOMAIN, GWEN_LoggerLevel_Error); + + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) { + AQH_MESSAGE *msg; + + msg=AQH_Message_List_First(xo->msgOutList); + if (msg) { + /* remove sent message from list */ + AQH_Message_List_Del(msg); + AQH_Message_free(msg); + + /* get next message in list */ + msg=AQH_Message_List_First(xo->msgOutList); + if (msg) { + DBG_ERROR(AQH_LOGDOMAIN, "Sending next message"); + AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg)); + } + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Last message sent, disabling writer"); + AQH_Object_Disable(xo->msgWriter); + } + } + } + + return 1; } diff --git a/aqhome/ipc2/endpoint.h b/aqhome/ipc2/endpoint.h new file mode 100644 index 0000000..a11c500 --- /dev/null +++ b/aqhome/ipc2/endpoint.h @@ -0,0 +1,44 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_ENDPOINT_H +#define AQH_ENDPOINT_H + +#include +#include + + + +/** + * Constructor. + * + * Takes over ownership of msgReader and msgWriter, links signal AQH_MSG_READER_SIGNAL_MSGRECVD of msgReader + * and signal AQH_MSG_WRITER_SIGNAL_MSGSENT of msgWriter to the newly created endpoint object. + * + * @param eventLoop pointer to event loop + * @param msgReader pointer to message reader object (takes over ownership) + * @param msgWriter pointer to message writer object (takes over ownership) + */ +AQHOME_API AQH_OBJECT *AQH_Endpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter); + +AQHOME_API AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgOutList(const AQH_OBJECT *o); +AQHOME_API AQH_MESSAGE *AQH_Endpoint_GetNextMsgOut(AQH_OBJECT *o); +AQHOME_API void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg); + +AQHOME_API AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgInList(const AQH_OBJECT *o); +AQHOME_API AQH_MESSAGE *AQH_Endpoint_GetNextMsgIn(AQH_OBJECT *o); +AQHOME_API void AQH_Endpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg); + +AQHOME_API uint32_t AQH_Endpoint_GetLastMessageId(const AQH_OBJECT *o); +AQHOME_API uint32_t AQH_Endpoint_GetNextMessageId(AQH_OBJECT *o); + + + + +#endif + diff --git a/aqhome/ipc2/nodeendpoint_p.h b/aqhome/ipc2/endpoint_p.h similarity index 75% rename from aqhome/ipc2/nodeendpoint_p.h rename to aqhome/ipc2/endpoint_p.h index 6f60d5d..7039406 100644 --- a/aqhome/ipc2/nodeendpoint_p.h +++ b/aqhome/ipc2/endpoint_p.h @@ -6,19 +6,21 @@ * should have received along with this file. ****************************************************************************/ -#ifndef AQH_NODEENDPOINT_P_H -#define AQH_NODEENDPOINT_P_H +#ifndef AQH_ENDPOINT_P_H +#define AQH_ENDPOINT_P_H -#include "./nodeendpoint.h" +#include "./endpoint.h" -typedef struct AQH_NODE_ENDPOINT AQH_NODE_ENDPOINT; +typedef struct AQH_ENDPOINT AQH_ENDPOINT; -struct AQH_NODE_ENDPOINT { +struct AQH_ENDPOINT { AQH_MESSAGE_LIST *msgOutList; AQH_MESSAGE_LIST *msgInList; AQH_OBJECT *msgWriter; AQH_OBJECT *msgReader; + + uint32_t lastMsgId; }; diff --git a/aqhome/ipc2/ipc_server.c b/aqhome/ipc2/ipc_server.c new file mode 100644 index 0000000..34a5957 --- /dev/null +++ b/aqhome/ipc2/ipc_server.c @@ -0,0 +1,146 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./ipc_server_p.h" + +#include +#include +#include +#include + +#include + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * defs and enums + * ------------------------------------------------------------------------------------------------ + */ + +enum { + AQH_IPCD_OBJECT_SLOT_NEWCONN=1 +}; + + + +/* ------------------------------------------------------------------------------------------------ + * global vars + * ------------------------------------------------------------------------------------------------ + */ + +GWEN_INHERIT(AQH_OBJECT, AQH_IPC_SERVER) + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); +static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); +static int _handleNewConn(AQH_OBJECT *o, int newFd); + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ + +AQH_OBJECT *AQH_IpcServerObject_new(AQH_EVENT_LOOP *eventLoop, int fd) +{ + AQH_OBJECT *o; + AQH_IPC_SERVER *xo; + + o=AQH_Object_new(eventLoop); + GWEN_NEW_OBJECT(AQH_IPC_SERVER, xo); + GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_IPC_SERVER, o, xo, _freeData); + + AQH_Object_SetSignalHandlerFn(o, _handleSignal); + + xo->tcpdObject=AQH_TcpdObject_new(eventLoop, fd); + AQH_Object_AddLink(xo->tcpdObject, AQH_TCPD_OBJECT_SIGNAL_NEWCONN, AQH_IPCD_OBJECT_SLOT_NEWCONN, o); + AQH_Object_Enable(xo->tcpdObject); + + return o; +} + + + +void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +{ + AQH_IPC_SERVER *xo; + + xo=(AQH_IPC_SERVER*) p; + if (xo->tcpdObject) { + AQH_Object_Disable(xo->tcpdObject); + AQH_Object_free(xo->tcpdObject); + xo->tcpdObject=NULL; + } + GWEN_FREE_OBJECT(xo); +} + + + +int _handleSignal(AQH_OBJECT *o, + uint32_t slotId, + GWEN_UNUSED AQH_OBJECT *senderObject, + int param1, + GWEN_UNUSED void *param2) +{ + switch(slotId) { + case AQH_TCPD_OBJECT_SIGNAL_NEWCONN: return _handleNewConn(o, param1); + default: + break; + } + + return 0; /* not handled */ +} + + + +int _handleNewConn(AQH_OBJECT *o, int newFd) +{ + AQH_EVENT_LOOP *eventLoop; + int fdCopy; + AQH_OBJECT *fdReader; + AQH_OBJECT *fdWriter; + AQH_OBJECT *msgReader; + AQH_OBJECT *msgWriter; + AQH_OBJECT *endpoint; + + DBG_ERROR(AQH_LOGDOMAIN, "Incoming connection"); + eventLoop=AQH_Object_GetEventLoop(o); + fdCopy=dup(newFd); + + fdReader=AQH_FdObject_new(eventLoop, newFd, AQH_FDOBJECT_FDMODE_READ); + msgReader=AQH_IpcMsgReader_new(eventLoop, fdReader); + + fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE); + msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter); + + endpoint=AQH_Endpoint_new(eventLoop, msgReader, msgWriter); + if (0==AQH_Object_EmitSignal(o, AQH_TCPD_OBJECT_SIGNAL_NEWCONN, 0, (void*) endpoint)) { + DBG_ERROR(AQH_LOGDOMAIN, "New connection not handled"); + AQH_Object_free(endpoint); + } + + return 1; /* msg handled */ +} + + + + + diff --git a/aqhome/ipc2/ipc_server.h b/aqhome/ipc2/ipc_server.h new file mode 100644 index 0000000..038ad79 --- /dev/null +++ b/aqhome/ipc2/ipc_server.h @@ -0,0 +1,35 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_IPC_SERVER_H +#define AQH_IPC_SERVER_H + + +#include + + + +enum { + /** param2=pointer to endpoint object (see @ref AQH_Endpoint_new) */ + AQH_IPC_SERVER_SIGNAL_NEWCLIENT=AQH_OBJECT_SIGNAL_LAST +}; + + + +/** + * Constructor. + * + * @param eventLoop pointer to eventLoop + * @param fd socket to listen on (see @ref AQH_TcpdObject_new). + */ +AQHOME_API AQH_OBJECT *AQH_IpcServerObject_new(AQH_EVENT_LOOP *eventLoop, int fd); + + + +#endif + diff --git a/aqhome/ipc2/ipc_server_p.h b/aqhome/ipc2/ipc_server_p.h new file mode 100644 index 0000000..4e01202 --- /dev/null +++ b/aqhome/ipc2/ipc_server_p.h @@ -0,0 +1,26 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_IPC_SERVER_P_H +#define AQH_IPC_SERVER_P_H + + +#include "./ipc_server.h" + + + +typedef struct AQH_IPC_SERVER AQH_IPC_SERVER; +struct AQH_IPC_SERVER { + AQH_OBJECT *tcpdObject; +}; + + + + +#endif + diff --git a/aqhome/ipc2/message.c b/aqhome/ipc2/message.c index 6a9ff84..587d85b 100644 --- a/aqhome/ipc2/message.c +++ b/aqhome/ipc2/message.c @@ -76,6 +76,21 @@ void AQH_Message_free(AQH_MESSAGE *msg) +AQH_OBJECT *AQH_Message_GetObject(const AQH_MESSAGE *msg) +{ + return (msg && msg->refCount)?msg->object:NULL; +} + + + +void AQH_Message_SetObject(AQH_MESSAGE *msg, AQH_OBJECT *o) +{ + if (msg && msg->refCount) + msg->object=o; +} + + + uint8_t *AQH_Message_GetMsgPointer(const AQH_MESSAGE *msg) { return (msg && msg->refCount)?msg->msgPointer:NULL; diff --git a/aqhome/ipc2/message.h b/aqhome/ipc2/message.h index 3c3c76a..2f15b7e 100644 --- a/aqhome/ipc2/message.h +++ b/aqhome/ipc2/message.h @@ -11,6 +11,7 @@ #include +#include #include #include @@ -54,6 +55,9 @@ AQHOME_API void AQH_Message_WriteBytesAt(AQH_MESSAGE *msg, uint32_t pos, const u AQHOME_API int AQH_Message_WriteStringAt(AQH_MESSAGE *msg, uint32_t pos, uint32_t maxSize, int filler, const char *s); AQHOME_API int AQH_Message_WriteStringWithTrailingNullAt(AQH_MESSAGE *msg, uint32_t pos, uint32_t maxSize, int filler, const char *s); +AQHOME_API AQH_OBJECT *AQH_Message_GetObject(const AQH_MESSAGE *msg); +AQHOME_API void AQH_Message_SetObject(AQH_MESSAGE *msg, AQH_OBJECT *o); + #endif diff --git a/aqhome/ipc2/message_p.h b/aqhome/ipc2/message_p.h index 125dedf..a99fab0 100644 --- a/aqhome/ipc2/message_p.h +++ b/aqhome/ipc2/message_p.h @@ -23,6 +23,8 @@ struct AQH_MESSAGE { uint8_t *msgPointer; uint32_t msgSize; uint32_t usedSize; + + AQH_OBJECT *object; }; diff --git a/aqhome/ipc2/msgreader.c b/aqhome/ipc2/msgreader.c index 8776c05..85cf8cb 100644 --- a/aqhome/ipc2/msgreader.c +++ b/aqhome/ipc2/msgreader.c @@ -84,6 +84,11 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) xo=(AQH_MSG_READER*) p; free(xo->currentMsgBuf); + if (xo->fdObject) { + AQH_Object_Disable(xo->fdObject); + AQH_Object_free(xo->fdObject); + xo->fdObject=NULL; + } GWEN_FREE_OBJECT(xo); } diff --git a/aqhome/ipc2/msgrequest.c b/aqhome/ipc2/msgrequest.c new file mode 100644 index 0000000..953fe21 --- /dev/null +++ b/aqhome/ipc2/msgrequest.c @@ -0,0 +1,473 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./msgrequest_p.h" +#include +#include + +#include + + +GWEN_INHERIT_FUNCTIONS(AQH_MSG_REQUEST) +GWEN_TREE2_FUNCTIONS(AQH_MSG_REQUEST, AQH_MsgRequest) + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void _freeFinishedRequests(AQH_MSG_REQUEST *rq); + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + +AQH_MSG_REQUEST *AQH_MsgRequest_new() +{ + AQH_MSG_REQUEST *rq; + + GWEN_NEW_OBJECT(AQH_MSG_REQUEST, rq); + GWEN_INHERIT_INIT(AQH_MSG_REQUEST, rq); + GWEN_TREE2_INIT(AQH_MSG_REQUEST, rq, AQH_MsgRequest); + + return rq; +} + + + +void AQH_MsgRequest_free(AQH_MSG_REQUEST *rq) +{ + if (rq) { + GWEN_TREE2_FINI(AQH_MSG_REQUEST, rq, AQH_MsgRequest); + GWEN_INHERIT_FINI(AQH_MSG_REQUEST, rq); + + GWEN_Timestamp_free(rq->expiresAt); + GWEN_Timestamp_free(rq->createdAt); + AQH_Message_free(rq->requestMsg); + AQH_Message_List_free(rq->msgList); + AQH_Object_free(rq->endpoint); + + GWEN_FREE_OBJECT(rq); + } +} + + + +int AQH_MsgRequest_GetRequestType(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->requestType:0; +} + + + +void AQH_MsgRequest_SetRequestType(AQH_MSG_REQUEST *rq, int t) +{ + if (rq) + rq->requestType=t; +} + + + +AQH_MESSAGE *AQH_MsgRequest_GetRequestMsg(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->requestMsg:NULL; +} + + + +void AQH_MsgRequest_SetRequestMsg(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg) +{ + if (rq) { + AQH_Message_free(rq->requestMsg); + rq->requestMsg=msg; + } +} + + + +AQH_OBJECT *AQH_MsgRequest_GetEndpoint(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->endpoint:NULL; +} + + + +void AQH_MsgRequest_SetEndpoint(AQH_MSG_REQUEST *rq, AQH_OBJECT *ep) +{ + if (rq) { + if (ep) + AQH_Object_IncRefCount(ep); + if (rq->endpoint) + AQH_Object_free(ep); + rq->endpoint=ep; + } +} + + + +uint32_t AQH_MsgRequest_GetRequestMsgId(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->requestMsgId:0; +} + + + +void AQH_MsgRequest_SetRequestMsgId(AQH_MSG_REQUEST *rq, uint32_t id) +{ + if (rq) + rq->requestMsgId=id; +} + + + +AQH_MESSAGE_LIST *AQH_MsgRequest_GetMsgList(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->msgList:NULL; +} + + + +AQH_MESSAGE *AQH_MsgRequest_GetFirstMsgFromList(const AQH_MSG_REQUEST *rq) +{ + return (rq && rq->msgList)?AQH_Message_List_First(rq->msgList):NULL; +} + + + +void AQH_MsgRequest_AddMsgToList(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg) +{ + if (rq && msg) { + if (rq->msgList==NULL) + rq->msgList=AQH_Message_List_new(); + AQH_Message_List_Add(msg, rq->msgList); + } +} + + + +const GWEN_TIMESTAMP *AQH_MsgRequest_GetCreatedAt(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->createdAt:NULL; +} + + + +void AQH_MsgRequest_SetCreatedAt(AQH_MSG_REQUEST *rq, GWEN_TIMESTAMP *ts) +{ + if (rq) { + GWEN_Timestamp_free(rq->createdAt); + rq->createdAt=ts; + } +} + + + +const GWEN_TIMESTAMP *AQH_MsgRequest_GetExpiresAt(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->expiresAt:NULL; +} + + + +void AQH_MsgRequest_SetExpiresAt(AQH_MSG_REQUEST *rq, GWEN_TIMESTAMP *ts) +{ + if (rq) { + GWEN_Timestamp_free(rq->expiresAt); + rq->expiresAt=ts; + } +} + + + +void AQH_MsgRequest_SetTimestamps(AQH_MSG_REQUEST *rq, int expiresInSecs) +{ + if (rq) { + GWEN_TIMESTAMP *ts; + + ts=GWEN_Timestamp_NowInLocalTime(); + GWEN_Timestamp_free(rq->createdAt); + rq->createdAt=GWEN_Timestamp_dup(ts); + GWEN_Timestamp_AddSeconds(ts, expiresInSecs); + GWEN_Timestamp_free(rq->expiresAt); + rq->expiresAt=ts; + } +} + + + +int AQH_MsgRequest_HandleResponse(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg) +{ + if (rq && rq->handleResponseFn) + return (rq->handleResponseFn)(rq, msg); + return AQH_MSG_REQUEST_RESULT_NOT_HANDLED; +} + + + +void AQH_MsgRequest_SubRequestFinished(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST *subRq, int reason) +{ + if (rq && rq->subRequestFinishedFn) + rq->subRequestFinishedFn(rq, subRq, reason); +} + + + +void AQH_MsgRequest_Abort(AQH_MSG_REQUEST *rq, int reason) +{ + if (rq && rq->abortFn) { + rq->abortFn(rq, reason); + AQH_MsgRequest_SetState(rq, AQH_MSG_REQUEST_STATE_DONE); + } + else { + AQH_MSG_REQUEST *rqParent; + + AQH_MsgRequest_SetState(rq, AQH_MSG_REQUEST_STATE_DONE); + rqParent=AQH_MsgRequest_Tree2_GetParent(rq); + if (rqParent) + AQH_MsgRequest_SubRequestFinished(rqParent, rq, AQH_MSG_REQUEST_REASON_ABORTED); + } +} + + + +AQH_MSG_REQUEST_HANDLERESPONSE_FN AQH_MsgRequest_SetHandleResponseFn(AQH_MSG_REQUEST *rq, + AQH_MSG_REQUEST_HANDLERESPONSE_FN f) +{ + if (rq) { + AQH_MSG_REQUEST_HANDLERESPONSE_FN oldFn; + + oldFn=rq->handleResponseFn; + rq->handleResponseFn=f; + return oldFn; + } + return NULL; +} + + + +AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN AQH_MsgRequest_SetSubRequestFinishedFn(AQH_MSG_REQUEST *rq, + AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN f) +{ + if (rq) { + AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN oldFn; + + oldFn=rq->subRequestFinishedFn; + rq->subRequestFinishedFn=f; + return oldFn; + } + + return NULL; +} + + + +AQH_MSG_REQUEST_ABORT_FN AQH_MsgRequest_SetAbortFn(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST_ABORT_FN f) +{ + if (rq) { + AQH_MSG_REQUEST_ABORT_FN oldFn; + + oldFn=rq->abortFn; + rq->abortFn=f; + return oldFn; + } + return NULL; +} + + + + + + +void *AQH_MsgRequest_GetPrivateData(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->privateData:NULL; +} + + + +void AQH_MsgRequest_SetPrivateData(AQH_MSG_REQUEST *rq, void *p) +{ + if (rq) + rq->privateData=p; +} + + + +int AQH_MsgRequest_GetResult(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->result:0; +} + + + +void AQH_MsgRequest_SetResult(AQH_MSG_REQUEST *rq, int result) +{ + if (rq) + rq->result=result; +} + + + +int AQH_MsgRequest_GetState(const AQH_MSG_REQUEST *rq) +{ + return rq?rq->state:0; +} + + + +void AQH_MsgRequest_SetState(AQH_MSG_REQUEST *rq, int i) +{ + if (rq) + rq->state=i; +} + + + + + + + + +AQH_MSG_REQUEST *AQH_MsgRequest_Tree2_FindByEndpointAndMsgId(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *ep, uint32_t refMsgId) +{ + if (rootRq) { + AQH_MSG_REQUEST *rq; + + rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq); + while(rq) { + if (rq->endpoint==ep && rq->requestMsgId==refMsgId) + return rq; + rq=AQH_MsgRequest_Tree2_GetBelow(rq); + } /* while */ + } + + return NULL; +} + + + +void AQH_Request_Tree2_CheckTimeouts(AQH_MSG_REQUEST *rootRq) +{ + if (rootRq) { + AQH_MSG_REQUEST *rq; + GWEN_TIMESTAMP *now; + + now=GWEN_Timestamp_NowInLocalTime(); + rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq); + while(rq) { + const GWEN_TIMESTAMP *ts; + + ts=AQH_MsgRequest_GetExpiresAt(rq); + if (GWEN_Timestamp_Compare(now, ts)>=0) { + /* timeout */ + DBG_INFO(AQH_LOGDOMAIN, "Request timed out, aborting"); + AQH_MsgRequest_Abort(rq, AQH_MSG_REQUEST_REASON_TIMEOUT); + } + rq=AQH_MsgRequest_Tree2_GetBelow(rq); + } + GWEN_Timestamp_free(now); + } +} + + + +void AQH_Request_Tree2_Cleanup(AQH_MSG_REQUEST *rootRq) +{ + if (rootRq) { + AQH_MSG_REQUEST *rq; + + rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq); + while(rq) { + AQH_MSG_REQUEST *nextSubRq; + + nextSubRq=AQH_MsgRequest_Tree2_GetNext(rq); + _freeFinishedRequests(rq); + rq=nextSubRq; + } + } +} + + + +int AQH_Request_Tree2_HandleIpcMsg(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *srcEp, AQH_MESSAGE *recvdMsg) +{ + if (rootRq) { + uint32_t refMsgId; + + refMsgId=AQH_IpcMessage_GetRefMsgId(recvdMsg); + if (refMsgId) { + AQH_MSG_REQUEST *rq; + + rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq); + while(rq) { + if (srcEp==AQH_MsgRequest_GetEndpoint(rq) && refMsgId==AQH_MsgRequest_GetRequestMsgId(rq)) { + if (AQH_MsgRequest_HandleResponse(rq, recvdMsg)==AQH_MSG_REQUEST_RESULT_HANDLED) + return AQH_MSG_REQUEST_RESULT_HANDLED; + } + + rq=AQH_MsgRequest_Tree2_GetBelow(rq); + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Message has no reference msg id, not a response"); + } + } + return AQH_MSG_REQUEST_RESULT_NOT_HANDLED; +} + + + +int AQH_Request_Tree2_HandleTtyMsg(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *srcEp, AQH_MESSAGE *recvdMsg) +{ + if (rootRq) { + AQH_MSG_REQUEST *rq; + + rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq); + while(rq) { + if (srcEp==AQH_MsgRequest_GetEndpoint(rq)) { + if (AQH_MsgRequest_HandleResponse(rq, recvdMsg)==AQH_MSG_REQUEST_RESULT_HANDLED) + return AQH_MSG_REQUEST_RESULT_HANDLED; + } + + rq=AQH_MsgRequest_Tree2_GetBelow(rq); + } + } + return AQH_MSG_REQUEST_RESULT_NOT_HANDLED; +} + + + +void _freeFinishedRequests(AQH_MSG_REQUEST *rq) +{ + AQH_MSG_REQUEST *subRq; + + subRq=AQH_MsgRequest_Tree2_GetFirstChild(rq); + while(subRq) { + AQH_MSG_REQUEST *nextSubRq; + + nextSubRq=AQH_MsgRequest_Tree2_GetNext(subRq); + _freeFinishedRequests(subRq); + subRq=nextSubRq; + } + + if (AQH_MsgRequest_GetState(rq)==AQH_MSG_REQUEST_STATE_DONE) { + DBG_INFO(AQH_LOGDOMAIN, "Deleting request"); + AQH_MsgRequest_free(rq); + } +} + + diff --git a/aqhome/ipc2/msgrequest.h b/aqhome/ipc2/msgrequest.h new file mode 100644 index 0000000..e24a922 --- /dev/null +++ b/aqhome/ipc2/msgrequest.h @@ -0,0 +1,104 @@ +/**************************************************************************** + * This file is part of the project Gwenhywfar. + * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSGREQUEST_H +#define AQH_MSGREQUEST_H + +#include + +#include +#include +#include +#include + + +#define AQH_MSG_REQUEST_RESULT_NOT_HANDLED 0 +#define AQH_MSG_REQUEST_RESULT_HANDLED 1 + +#define AQH_MSG_REQUEST_REASON_DONE 0 +#define AQH_MSG_REQUEST_REASON_ABORTED 1 +#define AQH_MSG_REQUEST_REASON_TIMEOUT 2 +#define AQH_MSG_REQUEST_REASON_DISCONNECT 3 + +#define AQH_MSG_REQUEST_STATE_OPEN 0 +#define AQH_MSG_REQUEST_STATE_DONE 1 + + + +typedef struct AQH_MSG_REQUEST AQH_MSG_REQUEST; + +GWEN_INHERIT_FUNCTION_LIB_DEFS(AQH_MSG_REQUEST, AQHOME_API) +GWEN_TREE2_FUNCTION_LIB_DEFS(AQH_MSG_REQUEST, AQH_MsgRequest, AQHOME_API) + +typedef int (*AQH_MSG_REQUEST_HANDLERESPONSE_FN)(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg); +typedef void (*AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN)(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST *subRq, int reason); +typedef void (*AQH_MSG_REQUEST_ABORT_FN)(AQH_MSG_REQUEST *rq, int reason); + + +AQHOME_API AQH_MSG_REQUEST *AQH_MsgRequest_new(); +AQHOME_API void AQH_MsgRequest_free(AQH_MSG_REQUEST *rq); + +AQHOME_API int AQH_MsgRequest_GetRequestType(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_SetRequestType(AQH_MSG_REQUEST *rq, int t); + +AQHOME_API AQH_MESSAGE *AQH_MsgRequest_GetRequestMsg(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_SetRequestMsg(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg); + +AQHOME_API AQH_OBJECT *AQH_MsgRequest_GetEndpoint(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_SetEndpoint(AQH_MSG_REQUEST *rq, AQH_OBJECT *ep); + +AQHOME_API uint32_t AQH_MsgRequest_GetRequestMsgId(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_SetRequestMsgId(AQH_MSG_REQUEST *rq, uint32_t id); + +AQHOME_API AQH_MESSAGE_LIST *AQH_MsgRequest_GetMsgList(const AQH_MSG_REQUEST *rq); +AQHOME_API AQH_MESSAGE *AQH_MsgRequest_GetFirstMsgFromList(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_AddMsgToList(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg); + +AQHOME_API const GWEN_TIMESTAMP *AQH_MsgRequest_GetCreatedAt(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_SetCreatedAt(AQH_MSG_REQUEST *rq, GWEN_TIMESTAMP *ts); + +AQHOME_API const GWEN_TIMESTAMP *AQH_MsgRequest_GetExpiresAt(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_SetExpiresAt(AQH_MSG_REQUEST *rq, GWEN_TIMESTAMP *ts); + +AQHOME_API void AQH_MsgRequest_SetTimestamps(AQH_MSG_REQUEST *rq, int expiresInSecs); + +AQHOME_API int AQH_MsgRequest_GetResult(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_SetResult(AQH_MSG_REQUEST *rq, int result); + +AQHOME_API int AQH_MsgRequest_GetState(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_SetState(AQH_MSG_REQUEST *rq, int i); + + +AQHOME_API int AQH_MsgRequest_HandleResponse(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg); +AQHOME_API void AQH_MsgRequest_SubRequestFinished(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST *subRq, int reason); +AQHOME_API void AQH_MsgRequest_Abort(AQH_MSG_REQUEST *rq, int reason); + + + +AQHOME_API void *AQH_MsgRequest_GetPrivateData(const AQH_MSG_REQUEST *rq); +AQHOME_API void AQH_MsgRequest_SetPrivateData(AQH_MSG_REQUEST *rq, void *p); + +AQHOME_API AQH_MSG_REQUEST_HANDLERESPONSE_FN AQH_MsgRequest_SetHandleResponseFn(AQH_MSG_REQUEST *rq, + AQH_MSG_REQUEST_HANDLERESPONSE_FN fn); + +AQHOME_API AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN AQH_MsgRequest_SetSubRequestFinishedFn(AQH_MSG_REQUEST *rq, + AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN f); +AQHOME_API AQH_MSG_REQUEST_ABORT_FN AQH_MsgRequest_SetAbortFn(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST_ABORT_FN f); + + + +AQHOME_API AQH_MSG_REQUEST *AQH_MsgRequest_Tree2_FindByEndpointAndMsgId(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *ep, uint32_t refMsgId); +AQHOME_API void AQH_Request_Tree2_CheckTimeouts(AQH_MSG_REQUEST *rootRq); +AQHOME_API void AQH_Request_Tree2_Cleanup(AQH_MSG_REQUEST *rootRq); +AQHOME_API int AQH_Request_Tree2_HandleIpcMsg(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *srcEp, AQH_MESSAGE *recvdMsg); +AQHOME_API int AQH_Request_Tree2_HandleTtyMsg(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *srcEp, AQH_MESSAGE *recvdMsg); + + + +#endif + diff --git a/aqhome/ipc2/msgrequest_p.h b/aqhome/ipc2/msgrequest_p.h new file mode 100644 index 0000000..8f7ec38 --- /dev/null +++ b/aqhome/ipc2/msgrequest_p.h @@ -0,0 +1,45 @@ +/**************************************************************************** + * This file is part of the project Gwenhywfar. + * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSGREQUEST_P_H +#define AQH_MSGREQUEST_P_H + +#include "./msgrequest.h" + + + +struct AQH_MSG_REQUEST { + GWEN_INHERIT_ELEMENT(AQH_MSG_REQUEST) + GWEN_TREE2_ELEMENT(AQH_MSG_REQUEST) + + int requestType; + AQH_MESSAGE *requestMsg; /* msg this request is based on */ + AQH_OBJECT *endpoint; /* source/dest endpoint for this request */ + uint32_t requestMsgId; + + AQH_MESSAGE_LIST *msgList; + + GWEN_TIMESTAMP *createdAt; + GWEN_TIMESTAMP *expiresAt; + + AQH_MSG_REQUEST_HANDLERESPONSE_FN handleResponseFn; + AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN subRequestFinishedFn; + AQH_MSG_REQUEST_ABORT_FN abortFn; + + void *privateData; + + int state; + int result; +}; + + + + + +#endif + diff --git a/aqhome/ipc2/msgwriter.c b/aqhome/ipc2/msgwriter.c index 0108518..8d824d9 100644 --- a/aqhome/ipc2/msgwriter.c +++ b/aqhome/ipc2/msgwriter.c @@ -77,6 +77,11 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) AQH_MSG_WRITER *xo; xo=(AQH_MSG_WRITER*) p; + if (xo->fdObject) { + AQH_Object_Disable(xo->fdObject); + AQH_Object_free(xo->fdObject); + xo->fdObject=NULL; + } GWEN_FREE_OBJECT(xo); } diff --git a/aqhome/ipc2/nodeendpoint.h b/aqhome/ipc2/nodeendpoint.h deleted file mode 100644 index 6541a9f..0000000 --- a/aqhome/ipc2/nodeendpoint.h +++ /dev/null @@ -1,37 +0,0 @@ -/**************************************************************************** - * This file is part of the project AqHome. - * AqHome (c) by 2025 Martin Preuss, all rights reserved. - * - * The license for this file can be found in the file COPYING which you - * should have received along with this file. - ****************************************************************************/ - -#ifndef AQH_NODEENDPOINT_H -#define AQH_NODEENDPOINT_H - -#include -#include - - - -enum { - AQH_NODE_ENDPOINT_SLOT_MSG_RECVD=1, - AQH_NODE_ENDPOINT_SLOT_MSG_SENT -}; - - -AQHOME_API AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter); - -AQHOME_API AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgOutList(const AQH_OBJECT *o); -AQHOME_API AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgOut(AQH_OBJECT *o); -AQHOME_API void AQH_NodeEndpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg); - -AQHOME_API AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgInList(const AQH_OBJECT *o); -AQHOME_API AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgIn(AQH_OBJECT *o); -AQHOME_API void AQH_NodeEndpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg); - - - - -#endif - diff --git a/aqhome/ipc2/tcpd_object.c b/aqhome/ipc2/tcpd_object.c index d748871..9d5e369 100644 --- a/aqhome/ipc2/tcpd_object.c +++ b/aqhome/ipc2/tcpd_object.c @@ -62,7 +62,7 @@ static int _acceptConnection(int serverSocket); * ------------------------------------------------------------------------------------------------ */ -AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject) +AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, int fd) { AQH_OBJECT *o; AQH_TCPD_OBJECT *xo; @@ -72,8 +72,8 @@ AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject) GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o, xo, _freeData); AQH_Object_SetSignalHandlerFn(o, _handleSignal); - xo->fdObject=fdObject; - xo->fdSocket=AQH_FdObject_GetFd(fdObject); + xo->fdSocket=fd; + xo->fdObject=AQH_FdObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ); #if 0 /* create object for readable socket, connect to THIS, enable */ @@ -92,7 +92,10 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) AQH_TCPD_OBJECT *xo; xo=(AQH_TCPD_OBJECT*) p; - + if (xo->fdObject) { + AQH_Object_Disable(xo->fdObject); + AQH_Object_free(xo->fdObject); + } GWEN_FREE_OBJECT(xo); } diff --git a/aqhome/ipc2/tcpd_object.h b/aqhome/ipc2/tcpd_object.h index 99623a5..94e6a9a 100644 --- a/aqhome/ipc2/tcpd_object.h +++ b/aqhome/ipc2/tcpd_object.h @@ -18,11 +18,11 @@ enum { /** - * Start listening to the given fdObject which represents a tcp network socket. - * The socket for that fdObject can be created by @ref AQH_TcpdObject_CreateListeningSocket(). + * Start listening on the given socket which represents a tcp network socket. + * The socket can be created by @ref AQH_TcpdObject_CreateListeningSocket(). * */ -AQHOME_API AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject); +AQHOME_API AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, int fd); /** diff --git a/aqhome/ipc2/tty_endpoint.c b/aqhome/ipc2/tty_endpoint.c new file mode 100644 index 0000000..846307c --- /dev/null +++ b/aqhome/ipc2/tty_endpoint.c @@ -0,0 +1,57 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./tty_endpoint.h" + +#include +#include +#include +#include +#include + +#include + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + +AQH_OBJECT *AQH_TtyEndpoint2_new(AQH_EVENT_LOOP *eventLoop, int fd) +{ + int fdCopy; + AQH_OBJECT *fdReader; + AQH_OBJECT *fdWriter; + AQH_OBJECT *msgReader; + AQH_OBJECT *msgWriter; + AQH_OBJECT *endpoint; + + fdCopy=dup(fd); + + fdReader=AQH_TtyObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ); + AQH_FdObject_FlushInput(fdReader); + msgReader=AQH_NodeMsgReader_new(eventLoop, fdReader); + AQH_Object_Enable(msgReader); + + fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE); + msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter); + + endpoint=AQH_Endpoint_new(eventLoop, msgReader, msgWriter); + return endpoint; +} + + + + diff --git a/aqhome/ipc2/tty_endpoint.h b/aqhome/ipc2/tty_endpoint.h new file mode 100644 index 0000000..8ae2328 --- /dev/null +++ b/aqhome/ipc2/tty_endpoint.h @@ -0,0 +1,29 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_TTY_ENDPOINT_H +#define AQH_TTY_ENDPOINT_H + + + +#include + + + +/** + * Constructor. + * + * @param eventLoop pointer to eventLoop + * @param fd open file descriptor for tty object (see @ref AQH_TtyObject_OpenAndInitDevice). + */ +AQHOME_API AQH_OBJECT *AQH_TtyEndpoint2_new(AQH_EVENT_LOOP *eventLoop, int fd); + + + +#endif + diff --git a/aqhome/libtest.c b/aqhome/libtest.c index 4b1da83..f0e97e4 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -23,8 +23,8 @@ #include "aqhome/ipc2/nodemsgreader.h" #include "aqhome/ipc2/msgreader.h" #include "aqhome/ipc2/msgwriter.h" -#include "aqhome/ipc2/nodeendpoint.h" - +#include "aqhome/ipc2/endpoint.h" +#include "aqhome/ipc2/tty_endpoint.h" #include "aqhome/aqhome.h" @@ -480,7 +480,7 @@ int testTty(int argc, char **argv) int fd; AQH_OBJECT *ttyReadObject; AQH_OBJECT *msgReaderObject; - AQH_OBJECT *nodeEndpointObject; + AQH_OBJECT *endpointObject; int rv; if (argc<2) { @@ -504,7 +504,7 @@ int testTty(int argc, char **argv) ttyReadObject=AQH_TtyObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ); AQH_FdObject_FlushInput(ttyReadObject); msgReaderObject=AQH_NodeMsgReader_new(eventLoop, ttyReadObject); - nodeEndpointObject=AQH_NodeEndpoint_new(eventLoop, msgReaderObject, NULL); + endpointObject=AQH_Endpoint_new(eventLoop, msgReaderObject, NULL); AQH_Object_Enable(msgReaderObject); @@ -517,6 +517,44 @@ int testTty(int argc, char **argv) +int testTty2(int argc, char **argv) +{ + const char *deviceName; + AQH_EVENT_LOOP *eventLoop; + struct termios initialTermiosState; + int fd; + AQH_OBJECT *endpointObject; + int rv; + + if (argc<2) { + fprintf(stderr, "Missing device name\n"); + return 1; + } + + rv=AQH_Init(); + if (rv<0) { + } + + deviceName=argv[1]; + + eventLoop=AQH_EventLoop_new(); + fd=AQH_TtyObject_OpenAndInitDevice(deviceName, &initialTermiosState); + if (fd<0) { + fprintf(stderr, "Error opening device \"%s\"\n", deviceName); + return 2; + } + + endpointObject=AQH_TtyEndpoint2_new(eventLoop, fd); + + for (;;) { + AQH_EventLoop_Run(eventLoop); + } + + return 0; +} + + + int main(int argc, char **argv) { @@ -526,7 +564,7 @@ int main(int argc, char **argv) //return testMqttSubscribe2(argc, argv); //return testHttpDaemon(); //return testMqttSubscribe3(argc, argv); - return testTty(argc, argv); + return testTty2(argc, argv); return 0; }