aqhome: more work on new event/ipc interface.

This commit is contained in:
Martin Preuss
2025-02-26 20:59:20 +01:00
parent f63079af11
commit 8968f14122
34 changed files with 1233 additions and 126 deletions

View File

@@ -1,6 +1,6 @@
/****************************************************************************
* This file is part of the project Gwenhywfar.
* Gwenhywfar (c) by 2025 Martin Preuss, all rights reserved.
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
@@ -66,6 +66,9 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
AQH_FDOBJECT *xo;
xo=(AQH_FDOBJECT*)p;
if (xo->fd>=0)
close(xo->fd);
xo->fd=-1;
GWEN_FREE_OBJECT(xo);
}
@@ -175,6 +178,7 @@ int AQH_FdObject_Read(AQH_OBJECT *o, uint8_t *ptrBuffer, uint32_t lenBuffer)
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Error on read: %s (%d)", strerror(errno), errno);
close(xo->fd);
xo->fd=-1;
return GWEN_ERROR_IO;
}
@@ -217,6 +221,7 @@ int AQH_FdObject_FlushInput(AQH_OBJECT *o)
else if (rv<0) {
if (errno!=EINTR && errno!=EWOULDBLOCK && errno!=EAGAIN) {
DBG_ERROR(AQH_LOGDOMAIN, "Error on read: %s (%d)", strerror(errno), errno);
close(xo->fd);
xo->fd=-1;
return GWEN_ERROR_IO;
}
@@ -253,6 +258,7 @@ int AQH_FdObject_Write(AQH_OBJECT *o, const uint8_t *ptrBuffer, uint32_t lenBuff
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Error on write: %s (%d)", strerror(errno), errno);
close(xo->fd);
xo->fd=-1;
return GWEN_ERROR_IO;
}

View File

@@ -1,6 +1,6 @@
/****************************************************************************
* This file is part of the project Gwenhywfar.
* Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
@@ -28,7 +28,10 @@ typedef int (*AQH_FDOBJECT_STARTMSG_FN)(AQH_OBJECT *o);
typedef void (*AQH_FDOBJECT_ENDMSG_FN)(AQH_OBJECT *o);
/**
* Create object for given file descriptor (takes over ownership of the given file descriptor, use dup() if
* fd used in another object).
*/
AQHOME_API AQH_OBJECT *AQH_FdObject_new(AQH_EVENT_LOOP *eventLoop, int fd, int mode);
AQHOME_API int AQH_FdObject_GetFdMode(const AQH_OBJECT *o);

View File

@@ -49,6 +49,7 @@ AQH_OBJECT *AQH_Object_new(AQH_EVENT_LOOP *eventLoop)
AQH_OBJECT *o;
GWEN_NEW_OBJECT(AQH_OBJECT, o);
o->refCount=1;
GWEN_INHERIT_INIT(AQH_OBJECT, o);
GWEN_LIST_INIT(AQH_OBJECT, o);
o->eventLoop=eventLoop;
@@ -59,15 +60,25 @@ AQH_OBJECT *AQH_Object_new(AQH_EVENT_LOOP *eventLoop)
void AQH_Object_IncRefCount(AQH_OBJECT *o)
{
if (o && o->refCount>0)
o->refCount++;
}
void AQH_Object_free(AQH_OBJECT *o)
{
if (o) {
GWEN_INHERIT_FINI(AQH_OBJECT, o);
GWEN_LIST_FINI(AQH_OBJECT, o);
if (o && o->refCount>0) {
if (--(o->refCount)==0) {
GWEN_INHERIT_FINI(AQH_OBJECT, o);
GWEN_LIST_FINI(AQH_OBJECT, o);
AQH_Link_List_free(o->linkList);
AQH_Link_List_free(o->linkList);
GWEN_FREE_OBJECT(o);
GWEN_FREE_OBJECT(o);
}
}
}
@@ -75,14 +86,14 @@ void AQH_Object_free(AQH_OBJECT *o)
uint32_t AQH_Object_GetFlags(const AQH_OBJECT *o)
{
return o?o->flags:0;
return (o && o->refCount)?o->flags:0;
}
void AQH_Object_SetFlags(AQH_OBJECT *o, uint32_t i)
{
if (o)
if (o && o->refCount)
o->flags=i;
}
@@ -90,7 +101,7 @@ void AQH_Object_SetFlags(AQH_OBJECT *o, uint32_t i)
void AQH_Object_AddFlags(AQH_OBJECT *o, uint32_t i)
{
if (o)
if (o && o->refCount)
o->flags|=i;
}
@@ -98,7 +109,7 @@ void AQH_Object_AddFlags(AQH_OBJECT *o, uint32_t i)
void AQH_Object_SubFlags(AQH_OBJECT *o, uint32_t i)
{
if (o)
if (o && o->refCount)
o->flags&=~i;
}
@@ -106,14 +117,14 @@ void AQH_Object_SubFlags(AQH_OBJECT *o, uint32_t i)
AQH_EVENT_LOOP *AQH_Object_GetEventLoop(const AQH_OBJECT *o)
{
return o?o->eventLoop:NULL;
return (o && o->refCount)?o->eventLoop:NULL;
}
AQH_LINK *_findLink(AQH_OBJECT *o, uint32_t signalId, uint32_t slotId, AQH_OBJECT *targetObject)
{
if (o) {
if (o && o->refCount) {
AQH_LINK *ln;
ln=AQH_Link_List_First(o->linkList);
@@ -163,7 +174,7 @@ int AQH_Object_EmitSignal(AQH_OBJECT *o, uint32_t signalId, int param1, void *pa
{
int signalWasHandled=0;
if (o) {
if (o && o->refCount) {
AQH_LINK *ln;
ln=AQH_Link_List_First(o->linkList);
@@ -182,7 +193,7 @@ int AQH_Object_EmitSignal(AQH_OBJECT *o, uint32_t signalId, int param1, void *pa
int AQH_Object_HandleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2)
{
if (o && o->signalHandlerFn)
if (o && o->refCount && o->signalHandlerFn)
return o->signalHandlerFn(o, slotId, senderObject, param1, param2);
return 0;
}
@@ -191,7 +202,7 @@ int AQH_Object_HandleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderOb
void AQH_Object_Enable(AQH_OBJECT *o)
{
if (o && o->enableFn)
if (o && o->refCount && o->enableFn)
o->enableFn(o);
}
@@ -199,7 +210,7 @@ void AQH_Object_Enable(AQH_OBJECT *o)
void AQH_Object_Disable(AQH_OBJECT *o)
{
if (o && o->disableFn)
if (o && o->refCount && o->disableFn)
o->disableFn(o);
}
@@ -207,7 +218,7 @@ void AQH_Object_Disable(AQH_OBJECT *o)
AQH_OBJECT_SIGNALHANDLER_FN AQH_Object_SetSignalHandlerFn(AQH_OBJECT *o, AQH_OBJECT_SIGNALHANDLER_FN f)
{
if (o) {
if (o && o->refCount) {
AQH_OBJECT_SIGNALHANDLER_FN oldFn;
oldFn=o->signalHandlerFn;
@@ -221,7 +232,7 @@ AQH_OBJECT_SIGNALHANDLER_FN AQH_Object_SetSignalHandlerFn(AQH_OBJECT *o, AQH_OBJ
AQH_OBJECT_ENABLE_FN AQH_Object_SetEnableFn(AQH_OBJECT *o, AQH_OBJECT_ENABLE_FN f)
{
if (o) {
if (o && o->refCount) {
AQH_OBJECT_ENABLE_FN oldFn;
oldFn=o->enableFn;
@@ -235,7 +246,7 @@ AQH_OBJECT_ENABLE_FN AQH_Object_SetEnableFn(AQH_OBJECT *o, AQH_OBJECT_ENABLE_FN
AQH_OBJECT_DISABLE_FN AQH_Object_SetDisableFn(AQH_OBJECT *o, AQH_OBJECT_DISABLE_FN f)
{
if (o) {
if (o && o->refCount) {
AQH_OBJECT_DISABLE_FN oldFn;
oldFn=o->disableFn;

View File

@@ -9,7 +9,7 @@
#ifndef AQH_OBJECT_H
#define AQH_OBJECT_H
#include "aqhome/api.h"
#include <aqhome/api.h>
#include <gwenhywfar/inherit.h>
#include <gwenhywfar/list.h>
@@ -60,6 +60,7 @@ typedef void (*AQH_OBJECT_DISABLE_FN)(AQH_OBJECT *o);
*/
/*@{*/
AQHOME_API AQH_OBJECT *AQH_Object_new(AQH_EVENT_LOOP *eventLoop);
AQHOME_API void AQH_Object_IncRefCount(AQH_OBJECT *o);
AQHOME_API void AQH_Object_free(AQH_OBJECT *o);
/*@}*/

View File

@@ -40,6 +40,8 @@ struct AQH_OBJECT {
AQH_OBJECT_SIGNALHANDLER_FN signalHandlerFn;
AQH_OBJECT_ENABLE_FN enableFn;
AQH_OBJECT_DISABLE_FN disableFn;
int refCount;
};

View File

@@ -70,7 +70,8 @@ void AQH_ConnectDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf,
flags=AQH_Tag16IpcMsg_GetTagDataAsUint32(msg, AQH_MSGDATA_CONNECT_TAGS_FLAGS, 0);
GWEN_Buffer_AppendArgs(dbuf,
"CONNECT (code=%d, proto=%d, proto version=%d, clientId=%s, userId=%s, flags=%08x)\n",
"CONNECT %s (code=%d, proto=%d, proto version=%d, clientId=%s, userId=%s, flags=%08x)\n",
sText?sText:"",
GWEN_IpcMsg_GetCode(msg),
GWEN_IpcMsg_GetProtoId(msg),
GWEN_IpcMsg_GetProtoVersion(msg),

View File

@@ -137,7 +137,8 @@ void AQH_DevicesDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf,
{
if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_DEVICES_MINSIZE) {
GWEN_Buffer_AppendArgs(dbuf,
"DEVICES (code=%d, proto=%d, proto version=%d, flags=0x%08x)\n",
"DEVICES %s (code=%d, proto=%d, proto version=%d, flags=0x%08x)\n",
sText?sText:"",
GWEN_IpcMsg_GetCode(msg),
GWEN_IpcMsg_GetProtoId(msg),
GWEN_IpcMsg_GetProtoVersion(msg),

View File

@@ -68,7 +68,8 @@ void AQH_GetDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf,
tsEnd=AQH_Tag16IpcMsg_GetTagDataAsUint64(msg, AQH_MSGDATA_GETDATA_TAGS_END, 0);
GWEN_Buffer_AppendArgs(dbuf,
"GETDATA (code=%d, proto=%d, proto version=%d, name=%s, tsBegin=%lu, tsEnd=%lu)\n",
"GETDATA %s (code=%d, proto=%d, proto version=%d, name=%s, tsBegin=%lu, tsEnd=%lu)\n",
sText?sText:"",
GWEN_IpcMsg_GetCode(msg),
GWEN_IpcMsg_GetProtoId(msg),
GWEN_IpcMsg_GetProtoVersion(msg),

View File

@@ -139,7 +139,8 @@ void AQH_MultiDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf
numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t));
GWEN_Buffer_AppendArgs(dbuf,
"MULTIDATA (code=%d, proto=%d, proto version=%d, name=%s, units=%s, type=%d, datapoints=%u)\n",
"MULTIDATA %s (code=%d, proto=%d, proto version=%d, name=%s, units=%s, type=%d, datapoints=%u)\n",
sText?sText:"",
GWEN_IpcMsg_GetCode(msg),
GWEN_IpcMsg_GetProtoId(msg),
GWEN_IpcMsg_GetProtoVersion(msg),

View File

@@ -92,7 +92,8 @@ void AQH_SetDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, cons
{
if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_SET_MINSIZE) {
GWEN_Buffer_AppendArgs(dbuf,
"SET (code=%d, proto=%d, proto version=%d)\n",
"SET %s (code=%d, proto=%d, proto version=%d)\n",
sText?sText:"",
GWEN_IpcMsg_GetCode(msg),
GWEN_IpcMsg_GetProtoId(msg),
GWEN_IpcMsg_GetProtoVersion(msg));

View File

@@ -139,7 +139,8 @@ void AQH_ValuesDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, c
{
if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_VALUES_MINSIZE) {
GWEN_Buffer_AppendArgs(dbuf,
"VALUES (code=%d, proto=%d, proto version=%d, flags=0x%08x)\n",
"VALUES %s (code=%d, proto=%d, proto version=%d, flags=0x%08x)\n",
sText?sText:"",
GWEN_IpcMsg_GetCode(msg),
GWEN_IpcMsg_GetProtoId(msg),
GWEN_IpcMsg_GetProtoVersion(msg),

View File

@@ -37,7 +37,6 @@ GWEN_MSG *AQH_QwordsIpcMsg_new(uint8_t protoId, uint8_t protoVer, uint16_t code,
uint32_t flags, const uint64_t *i64Ptr, int count)
{
GWEN_MSG *msg;
uint8_t *ptr;
int payloadSize;
int i;

View File

@@ -60,7 +60,7 @@ void AQH_Tag16IpcMsg_Extend(GWEN_MSG *msg)
void _freeData(void *bp, void *p)
void _freeData(GWEN_UNUSED void *bp, void *p)
{
AQH_MSG_IPC_TAG16 *xmsg;
@@ -239,7 +239,8 @@ void AQH_Tag16IpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const
{
if (GWEN_Msg_GetBytesInBuffer(msg)>=GWEN_MSGIPC_OFFS_PAYLOAD) {
GWEN_Buffer_AppendArgs(dbuf,
"Tag16 (code=%d, proto=%d, proto version=%d)\n",
"Tag16 %s (code=%d, proto=%d, proto version=%d)\n",
sText?sText:"",
GWEN_IpcMsg_GetCode(msg),
GWEN_IpcMsg_GetProtoId(msg),
GWEN_IpcMsg_GetProtoVersion(msg));

View File

@@ -51,8 +51,11 @@
nodemsgreader.h
ttyobject.h
tcpd_object.h
nodeendpoint.h
endpoint.h
message.h
msgrequest.h
ipc_server.h
tty_endpoint.h
</headers>
@@ -60,8 +63,10 @@
msgreader_p.h
msgwriter_p.h
tcpd_object_p.h
nodeendpoint_p.h
endpoint_p.h
message_p.h
msgrequest_p.h
ipc_server_p.h
</headers>
@@ -74,8 +79,11 @@
nodemsgreader.c
ttyobject.c
tcpd_object.c
nodeendpoint.c
endpoint.c
message.c
msgrequest.c
ipc_server.c
tty_endpoint.c
</sources>

View File

@@ -10,7 +10,7 @@
# include <config.h>
#endif
#include "./nodeendpoint_p.h"
#include "./endpoint_p.h"
#include "aqhome/ipc2/msgreader.h"
#include "aqhome/ipc2/msgwriter.h"
#include "aqhome/msg/node/m_node.h"
@@ -25,7 +25,24 @@
GWEN_INHERIT(AQH_OBJECT, AQH_NODE_ENDPOINT)
/* ------------------------------------------------------------------------------------------------
* defs and enums
* ------------------------------------------------------------------------------------------------
*/
enum {
AQH_ENDPOINT_SLOT_MSG_RECVD=1,
AQH_ENDPOINT_SLOT_MSG_SENT
};
/* ------------------------------------------------------------------------------------------------
* global vars
* ------------------------------------------------------------------------------------------------
*/
GWEN_INHERIT(AQH_OBJECT, AQH_ENDPOINT)
@@ -36,8 +53,8 @@ GWEN_INHERIT(AQH_OBJECT, AQH_NODE_ENDPOINT)
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2);
static int _handleMsgRecvd(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr);
static int _handleMsgSent(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr);
static int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr);
static int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr);
static void _dumpMsg(const AQH_MESSAGE *msg, const char *sText);
@@ -47,14 +64,14 @@ static void _dumpMsg(const AQH_MESSAGE *msg, const char *sText);
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter)
AQH_OBJECT *AQH_Endpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter)
{
AQH_OBJECT *o;
AQH_NODE_ENDPOINT *xo;
AQH_ENDPOINT *xo;
o=AQH_Object_new(eventLoop);
GWEN_NEW_OBJECT(AQH_NODE_ENDPOINT, xo);
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o, xo, _freeData);
GWEN_NEW_OBJECT(AQH_ENDPOINT, xo);
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_ENDPOINT, o, xo, _freeData);
xo->msgOutList=AQH_Message_List_new();
xo->msgInList=AQH_Message_List_new();
@@ -63,12 +80,12 @@ AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReade
if (msgReader) {
xo->msgReader=msgReader;
AQH_Object_AddLink(msgReader, AQH_MSG_READER_SIGNAL_MSGRECVD, AQH_NODE_ENDPOINT_SLOT_MSG_RECVD, o);
AQH_Object_AddLink(msgReader, AQH_MSG_READER_SIGNAL_MSGRECVD, AQH_ENDPOINT_SLOT_MSG_RECVD, o);
}
if (msgWriter) {
xo->msgWriter=msgWriter;
AQH_Object_AddLink(msgWriter, AQH_MSG_WRITER_SIGNAL_MSGSENT, AQH_NODE_ENDPOINT_SLOT_MSG_SENT, o);
AQH_Object_AddLink(msgWriter, AQH_MSG_WRITER_SIGNAL_MSGSENT, AQH_ENDPOINT_SLOT_MSG_SENT, o);
}
return o;
@@ -78,9 +95,9 @@ AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReade
void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
{
AQH_NODE_ENDPOINT *xo;
AQH_ENDPOINT *xo;
xo=(AQH_NODE_ENDPOINT*) p;
xo=(AQH_ENDPOINT*) p;
AQH_Message_List_free(xo->msgOutList);
AQH_Message_List_free(xo->msgInList);
AQH_Object_free(xo->msgWriter);
@@ -90,12 +107,12 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgOutList(const AQH_OBJECT *o)
AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgOutList(const AQH_OBJECT *o)
{
if (o) {
AQH_NODE_ENDPOINT *xo;
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return xo->msgOutList;
}
@@ -104,12 +121,12 @@ AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgOutList(const AQH_OBJECT *o)
AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgOut(AQH_OBJECT *o)
AQH_MESSAGE *AQH_Endpoint_GetNextMsgOut(AQH_OBJECT *o)
{
if (o) {
AQH_NODE_ENDPOINT *xo;
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return AQH_Message_List_First(xo->msgOutList);
}
@@ -118,12 +135,12 @@ AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgOut(AQH_OBJECT *o)
void AQH_NodeEndpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg)
void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg)
{
if (o && msg) {
AQH_NODE_ENDPOINT *xo;
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo) {
AQH_Message_List_Add(msg, xo->msgOutList);
if (xo->msgWriter && AQH_Message_List_GetCount(xo->msgOutList)==1) {
@@ -136,12 +153,12 @@ void AQH_NodeEndpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg)
AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgInList(const AQH_OBJECT *o)
AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgInList(const AQH_OBJECT *o)
{
if (o) {
AQH_NODE_ENDPOINT *xo;
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return xo->msgInList;
}
@@ -150,12 +167,12 @@ AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgInList(const AQH_OBJECT *o)
AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgIn(AQH_OBJECT *o)
AQH_MESSAGE *AQH_Endpoint_GetNextMsgIn(AQH_OBJECT *o)
{
if (o) {
AQH_NODE_ENDPOINT *xo;
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return AQH_Message_List_First(xo->msgInList);
}
@@ -164,12 +181,12 @@ AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgIn(AQH_OBJECT *o)
void AQH_NodeEndpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg)
void AQH_Endpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg)
{
if (o && msg) {
AQH_NODE_ENDPOINT *xo;
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo) {
AQH_Message_List_Add(msg, xo->msgInList);
DBG_ERROR(AQH_LOGDOMAIN, "now %d msgs in list", AQH_Message_List_GetCount(xo->msgInList));
@@ -179,11 +196,39 @@ void AQH_NodeEndpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg)
int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2)
uint32_t AQH_Endpoint_GetLastMessageId(const AQH_OBJECT *o)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return xo->lastMsgId;
}
return 0;
}
uint32_t AQH_Endpoint_GetNextMessageId(AQH_OBJECT *o)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return ++(xo->lastMsgId);
}
return 0;
}
int _handleSignal(AQH_OBJECT *o, uint32_t slotId, GWEN_UNUSED AQH_OBJECT *senderObject, int param1, void *param2)
{
switch(slotId) {
case AQH_NODE_ENDPOINT_SLOT_MSG_RECVD: return _handleMsgRecvd(o, senderObject, param1, param2);
case AQH_NODE_ENDPOINT_SLOT_MSG_SENT: return _handleMsgSent(o, senderObject, param1, param2);
case AQH_ENDPOINT_SLOT_MSG_RECVD: return _handleMsgRecvd(o, param1, param2);
case AQH_ENDPOINT_SLOT_MSG_SENT: return _handleMsgSent(o, param1, param2);
default:
break;
}
@@ -193,24 +238,53 @@ int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int
int _handleMsgRecvd(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr)
int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr)
{
AQH_MESSAGE *msg;
DBG_ERROR(AQH_LOGDOMAIN, "Msg received:");
msg=AQH_NodeMessage_fromBuffer(msgPtr, msgLen);
_dumpMsg(msg, "Received");
AQH_NodeEndpoint_AddMsgIn(o, msg);
AQH_Endpoint_AddMsgIn(o, msg);
return 1;
}
int _handleMsgSent(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr)
int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr)
{
DBG_ERROR(AQH_LOGDOMAIN, "Msg sent:");
GWEN_Text_LogString((const char*) msgPtr, msgLen, AQH_LOGDOMAIN, GWEN_LoggerLevel_Error);
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo) {
AQH_MESSAGE *msg;
msg=AQH_Message_List_First(xo->msgOutList);
if (msg) {
/* remove sent message from list */
AQH_Message_List_Del(msg);
AQH_Message_free(msg);
/* get next message in list */
msg=AQH_Message_List_First(xo->msgOutList);
if (msg) {
DBG_ERROR(AQH_LOGDOMAIN, "Sending next message");
AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg));
}
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Last message sent, disabling writer");
AQH_Object_Disable(xo->msgWriter);
}
}
}
return 1;
}

44
aqhome/ipc2/endpoint.h Normal file
View File

@@ -0,0 +1,44 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_ENDPOINT_H
#define AQH_ENDPOINT_H
#include <aqhome/events2/object.h>
#include <aqhome/ipc2/message.h>
/**
* Constructor.
*
* Takes over ownership of msgReader and msgWriter, links signal AQH_MSG_READER_SIGNAL_MSGRECVD of msgReader
* and signal AQH_MSG_WRITER_SIGNAL_MSGSENT of msgWriter to the newly created endpoint object.
*
* @param eventLoop pointer to event loop
* @param msgReader pointer to message reader object (takes over ownership)
* @param msgWriter pointer to message writer object (takes over ownership)
*/
AQHOME_API AQH_OBJECT *AQH_Endpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter);
AQHOME_API AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgOutList(const AQH_OBJECT *o);
AQHOME_API AQH_MESSAGE *AQH_Endpoint_GetNextMsgOut(AQH_OBJECT *o);
AQHOME_API void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg);
AQHOME_API AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgInList(const AQH_OBJECT *o);
AQHOME_API AQH_MESSAGE *AQH_Endpoint_GetNextMsgIn(AQH_OBJECT *o);
AQHOME_API void AQH_Endpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg);
AQHOME_API uint32_t AQH_Endpoint_GetLastMessageId(const AQH_OBJECT *o);
AQHOME_API uint32_t AQH_Endpoint_GetNextMessageId(AQH_OBJECT *o);
#endif

View File

@@ -6,19 +6,21 @@
* should have received along with this file.
****************************************************************************/
#ifndef AQH_NODEENDPOINT_P_H
#define AQH_NODEENDPOINT_P_H
#ifndef AQH_ENDPOINT_P_H
#define AQH_ENDPOINT_P_H
#include "./nodeendpoint.h"
#include "./endpoint.h"
typedef struct AQH_NODE_ENDPOINT AQH_NODE_ENDPOINT;
typedef struct AQH_ENDPOINT AQH_ENDPOINT;
struct AQH_NODE_ENDPOINT {
struct AQH_ENDPOINT {
AQH_MESSAGE_LIST *msgOutList;
AQH_MESSAGE_LIST *msgInList;
AQH_OBJECT *msgWriter;
AQH_OBJECT *msgReader;
uint32_t lastMsgId;
};

146
aqhome/ipc2/ipc_server.c Normal file
View File

@@ -0,0 +1,146 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./ipc_server_p.h"
#include <aqhome/ipc2/tcpd_object.h>
#include <aqhome/ipc2/ipcmsgreader.h>
#include <aqhome/ipc2/msgwriter.h>
#include <aqhome/events2/fdobject.h>
#include <gwenhywfar/debug.h>
#include <unistd.h>
/* ------------------------------------------------------------------------------------------------
* defs and enums
* ------------------------------------------------------------------------------------------------
*/
enum {
AQH_IPCD_OBJECT_SLOT_NEWCONN=1
};
/* ------------------------------------------------------------------------------------------------
* global vars
* ------------------------------------------------------------------------------------------------
*/
GWEN_INHERIT(AQH_OBJECT, AQH_IPC_SERVER)
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2);
static int _handleNewConn(AQH_OBJECT *o, int newFd);
/* ------------------------------------------------------------------------------------------------
* implementation
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_IpcServerObject_new(AQH_EVENT_LOOP *eventLoop, int fd)
{
AQH_OBJECT *o;
AQH_IPC_SERVER *xo;
o=AQH_Object_new(eventLoop);
GWEN_NEW_OBJECT(AQH_IPC_SERVER, xo);
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_IPC_SERVER, o, xo, _freeData);
AQH_Object_SetSignalHandlerFn(o, _handleSignal);
xo->tcpdObject=AQH_TcpdObject_new(eventLoop, fd);
AQH_Object_AddLink(xo->tcpdObject, AQH_TCPD_OBJECT_SIGNAL_NEWCONN, AQH_IPCD_OBJECT_SLOT_NEWCONN, o);
AQH_Object_Enable(xo->tcpdObject);
return o;
}
void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
{
AQH_IPC_SERVER *xo;
xo=(AQH_IPC_SERVER*) p;
if (xo->tcpdObject) {
AQH_Object_Disable(xo->tcpdObject);
AQH_Object_free(xo->tcpdObject);
xo->tcpdObject=NULL;
}
GWEN_FREE_OBJECT(xo);
}
int _handleSignal(AQH_OBJECT *o,
uint32_t slotId,
GWEN_UNUSED AQH_OBJECT *senderObject,
int param1,
GWEN_UNUSED void *param2)
{
switch(slotId) {
case AQH_TCPD_OBJECT_SIGNAL_NEWCONN: return _handleNewConn(o, param1);
default:
break;
}
return 0; /* not handled */
}
int _handleNewConn(AQH_OBJECT *o, int newFd)
{
AQH_EVENT_LOOP *eventLoop;
int fdCopy;
AQH_OBJECT *fdReader;
AQH_OBJECT *fdWriter;
AQH_OBJECT *msgReader;
AQH_OBJECT *msgWriter;
AQH_OBJECT *endpoint;
DBG_ERROR(AQH_LOGDOMAIN, "Incoming connection");
eventLoop=AQH_Object_GetEventLoop(o);
fdCopy=dup(newFd);
fdReader=AQH_FdObject_new(eventLoop, newFd, AQH_FDOBJECT_FDMODE_READ);
msgReader=AQH_IpcMsgReader_new(eventLoop, fdReader);
fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE);
msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter);
endpoint=AQH_Endpoint_new(eventLoop, msgReader, msgWriter);
if (0==AQH_Object_EmitSignal(o, AQH_TCPD_OBJECT_SIGNAL_NEWCONN, 0, (void*) endpoint)) {
DBG_ERROR(AQH_LOGDOMAIN, "New connection not handled");
AQH_Object_free(endpoint);
}
return 1; /* msg handled */
}

35
aqhome/ipc2/ipc_server.h Normal file
View File

@@ -0,0 +1,35 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_IPC_SERVER_H
#define AQH_IPC_SERVER_H
#include <aqhome/ipc2/endpoint.h>
enum {
/** param2=pointer to endpoint object (see @ref AQH_Endpoint_new) */
AQH_IPC_SERVER_SIGNAL_NEWCLIENT=AQH_OBJECT_SIGNAL_LAST
};
/**
* Constructor.
*
* @param eventLoop pointer to eventLoop
* @param fd socket to listen on (see @ref AQH_TcpdObject_new).
*/
AQHOME_API AQH_OBJECT *AQH_IpcServerObject_new(AQH_EVENT_LOOP *eventLoop, int fd);
#endif

View File

@@ -0,0 +1,26 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_IPC_SERVER_P_H
#define AQH_IPC_SERVER_P_H
#include "./ipc_server.h"
typedef struct AQH_IPC_SERVER AQH_IPC_SERVER;
struct AQH_IPC_SERVER {
AQH_OBJECT *tcpdObject;
};
#endif

View File

@@ -76,6 +76,21 @@ void AQH_Message_free(AQH_MESSAGE *msg)
AQH_OBJECT *AQH_Message_GetObject(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->object:NULL;
}
void AQH_Message_SetObject(AQH_MESSAGE *msg, AQH_OBJECT *o)
{
if (msg && msg->refCount)
msg->object=o;
}
uint8_t *AQH_Message_GetMsgPointer(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgPointer:NULL;

View File

@@ -11,6 +11,7 @@
#include <aqhome/api.h>
#include <aqhome/events2/object.h>
#include <gwenhywfar/list.h>
#include <gwenhywfar/inherit.h>
@@ -54,6 +55,9 @@ AQHOME_API void AQH_Message_WriteBytesAt(AQH_MESSAGE *msg, uint32_t pos, const u
AQHOME_API int AQH_Message_WriteStringAt(AQH_MESSAGE *msg, uint32_t pos, uint32_t maxSize, int filler, const char *s);
AQHOME_API int AQH_Message_WriteStringWithTrailingNullAt(AQH_MESSAGE *msg, uint32_t pos, uint32_t maxSize, int filler, const char *s);
AQHOME_API AQH_OBJECT *AQH_Message_GetObject(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetObject(AQH_MESSAGE *msg, AQH_OBJECT *o);
#endif

View File

@@ -23,6 +23,8 @@ struct AQH_MESSAGE {
uint8_t *msgPointer;
uint32_t msgSize;
uint32_t usedSize;
AQH_OBJECT *object;
};

View File

@@ -84,6 +84,11 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
xo=(AQH_MSG_READER*) p;
free(xo->currentMsgBuf);
if (xo->fdObject) {
AQH_Object_Disable(xo->fdObject);
AQH_Object_free(xo->fdObject);
xo->fdObject=NULL;
}
GWEN_FREE_OBJECT(xo);
}

473
aqhome/ipc2/msgrequest.c Normal file
View File

@@ -0,0 +1,473 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./msgrequest_p.h"
#include <aqhome/events2/fdobject.h>
#include <aqhome/msg/ipc/m_ipc.h>
#include <gwenhywfar/debug.h>
GWEN_INHERIT_FUNCTIONS(AQH_MSG_REQUEST)
GWEN_TREE2_FUNCTIONS(AQH_MSG_REQUEST, AQH_MsgRequest)
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static void _freeFinishedRequests(AQH_MSG_REQUEST *rq);
/* ------------------------------------------------------------------------------------------------
* code
* ------------------------------------------------------------------------------------------------
*/
AQH_MSG_REQUEST *AQH_MsgRequest_new()
{
AQH_MSG_REQUEST *rq;
GWEN_NEW_OBJECT(AQH_MSG_REQUEST, rq);
GWEN_INHERIT_INIT(AQH_MSG_REQUEST, rq);
GWEN_TREE2_INIT(AQH_MSG_REQUEST, rq, AQH_MsgRequest);
return rq;
}
void AQH_MsgRequest_free(AQH_MSG_REQUEST *rq)
{
if (rq) {
GWEN_TREE2_FINI(AQH_MSG_REQUEST, rq, AQH_MsgRequest);
GWEN_INHERIT_FINI(AQH_MSG_REQUEST, rq);
GWEN_Timestamp_free(rq->expiresAt);
GWEN_Timestamp_free(rq->createdAt);
AQH_Message_free(rq->requestMsg);
AQH_Message_List_free(rq->msgList);
AQH_Object_free(rq->endpoint);
GWEN_FREE_OBJECT(rq);
}
}
int AQH_MsgRequest_GetRequestType(const AQH_MSG_REQUEST *rq)
{
return rq?rq->requestType:0;
}
void AQH_MsgRequest_SetRequestType(AQH_MSG_REQUEST *rq, int t)
{
if (rq)
rq->requestType=t;
}
AQH_MESSAGE *AQH_MsgRequest_GetRequestMsg(const AQH_MSG_REQUEST *rq)
{
return rq?rq->requestMsg:NULL;
}
void AQH_MsgRequest_SetRequestMsg(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg)
{
if (rq) {
AQH_Message_free(rq->requestMsg);
rq->requestMsg=msg;
}
}
AQH_OBJECT *AQH_MsgRequest_GetEndpoint(const AQH_MSG_REQUEST *rq)
{
return rq?rq->endpoint:NULL;
}
void AQH_MsgRequest_SetEndpoint(AQH_MSG_REQUEST *rq, AQH_OBJECT *ep)
{
if (rq) {
if (ep)
AQH_Object_IncRefCount(ep);
if (rq->endpoint)
AQH_Object_free(ep);
rq->endpoint=ep;
}
}
uint32_t AQH_MsgRequest_GetRequestMsgId(const AQH_MSG_REQUEST *rq)
{
return rq?rq->requestMsgId:0;
}
void AQH_MsgRequest_SetRequestMsgId(AQH_MSG_REQUEST *rq, uint32_t id)
{
if (rq)
rq->requestMsgId=id;
}
AQH_MESSAGE_LIST *AQH_MsgRequest_GetMsgList(const AQH_MSG_REQUEST *rq)
{
return rq?rq->msgList:NULL;
}
AQH_MESSAGE *AQH_MsgRequest_GetFirstMsgFromList(const AQH_MSG_REQUEST *rq)
{
return (rq && rq->msgList)?AQH_Message_List_First(rq->msgList):NULL;
}
void AQH_MsgRequest_AddMsgToList(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg)
{
if (rq && msg) {
if (rq->msgList==NULL)
rq->msgList=AQH_Message_List_new();
AQH_Message_List_Add(msg, rq->msgList);
}
}
const GWEN_TIMESTAMP *AQH_MsgRequest_GetCreatedAt(const AQH_MSG_REQUEST *rq)
{
return rq?rq->createdAt:NULL;
}
void AQH_MsgRequest_SetCreatedAt(AQH_MSG_REQUEST *rq, GWEN_TIMESTAMP *ts)
{
if (rq) {
GWEN_Timestamp_free(rq->createdAt);
rq->createdAt=ts;
}
}
const GWEN_TIMESTAMP *AQH_MsgRequest_GetExpiresAt(const AQH_MSG_REQUEST *rq)
{
return rq?rq->expiresAt:NULL;
}
void AQH_MsgRequest_SetExpiresAt(AQH_MSG_REQUEST *rq, GWEN_TIMESTAMP *ts)
{
if (rq) {
GWEN_Timestamp_free(rq->expiresAt);
rq->expiresAt=ts;
}
}
void AQH_MsgRequest_SetTimestamps(AQH_MSG_REQUEST *rq, int expiresInSecs)
{
if (rq) {
GWEN_TIMESTAMP *ts;
ts=GWEN_Timestamp_NowInLocalTime();
GWEN_Timestamp_free(rq->createdAt);
rq->createdAt=GWEN_Timestamp_dup(ts);
GWEN_Timestamp_AddSeconds(ts, expiresInSecs);
GWEN_Timestamp_free(rq->expiresAt);
rq->expiresAt=ts;
}
}
int AQH_MsgRequest_HandleResponse(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg)
{
if (rq && rq->handleResponseFn)
return (rq->handleResponseFn)(rq, msg);
return AQH_MSG_REQUEST_RESULT_NOT_HANDLED;
}
void AQH_MsgRequest_SubRequestFinished(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST *subRq, int reason)
{
if (rq && rq->subRequestFinishedFn)
rq->subRequestFinishedFn(rq, subRq, reason);
}
void AQH_MsgRequest_Abort(AQH_MSG_REQUEST *rq, int reason)
{
if (rq && rq->abortFn) {
rq->abortFn(rq, reason);
AQH_MsgRequest_SetState(rq, AQH_MSG_REQUEST_STATE_DONE);
}
else {
AQH_MSG_REQUEST *rqParent;
AQH_MsgRequest_SetState(rq, AQH_MSG_REQUEST_STATE_DONE);
rqParent=AQH_MsgRequest_Tree2_GetParent(rq);
if (rqParent)
AQH_MsgRequest_SubRequestFinished(rqParent, rq, AQH_MSG_REQUEST_REASON_ABORTED);
}
}
AQH_MSG_REQUEST_HANDLERESPONSE_FN AQH_MsgRequest_SetHandleResponseFn(AQH_MSG_REQUEST *rq,
AQH_MSG_REQUEST_HANDLERESPONSE_FN f)
{
if (rq) {
AQH_MSG_REQUEST_HANDLERESPONSE_FN oldFn;
oldFn=rq->handleResponseFn;
rq->handleResponseFn=f;
return oldFn;
}
return NULL;
}
AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN AQH_MsgRequest_SetSubRequestFinishedFn(AQH_MSG_REQUEST *rq,
AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN f)
{
if (rq) {
AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN oldFn;
oldFn=rq->subRequestFinishedFn;
rq->subRequestFinishedFn=f;
return oldFn;
}
return NULL;
}
AQH_MSG_REQUEST_ABORT_FN AQH_MsgRequest_SetAbortFn(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST_ABORT_FN f)
{
if (rq) {
AQH_MSG_REQUEST_ABORT_FN oldFn;
oldFn=rq->abortFn;
rq->abortFn=f;
return oldFn;
}
return NULL;
}
void *AQH_MsgRequest_GetPrivateData(const AQH_MSG_REQUEST *rq)
{
return rq?rq->privateData:NULL;
}
void AQH_MsgRequest_SetPrivateData(AQH_MSG_REQUEST *rq, void *p)
{
if (rq)
rq->privateData=p;
}
int AQH_MsgRequest_GetResult(const AQH_MSG_REQUEST *rq)
{
return rq?rq->result:0;
}
void AQH_MsgRequest_SetResult(AQH_MSG_REQUEST *rq, int result)
{
if (rq)
rq->result=result;
}
int AQH_MsgRequest_GetState(const AQH_MSG_REQUEST *rq)
{
return rq?rq->state:0;
}
void AQH_MsgRequest_SetState(AQH_MSG_REQUEST *rq, int i)
{
if (rq)
rq->state=i;
}
AQH_MSG_REQUEST *AQH_MsgRequest_Tree2_FindByEndpointAndMsgId(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *ep, uint32_t refMsgId)
{
if (rootRq) {
AQH_MSG_REQUEST *rq;
rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq);
while(rq) {
if (rq->endpoint==ep && rq->requestMsgId==refMsgId)
return rq;
rq=AQH_MsgRequest_Tree2_GetBelow(rq);
} /* while */
}
return NULL;
}
void AQH_Request_Tree2_CheckTimeouts(AQH_MSG_REQUEST *rootRq)
{
if (rootRq) {
AQH_MSG_REQUEST *rq;
GWEN_TIMESTAMP *now;
now=GWEN_Timestamp_NowInLocalTime();
rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq);
while(rq) {
const GWEN_TIMESTAMP *ts;
ts=AQH_MsgRequest_GetExpiresAt(rq);
if (GWEN_Timestamp_Compare(now, ts)>=0) {
/* timeout */
DBG_INFO(AQH_LOGDOMAIN, "Request timed out, aborting");
AQH_MsgRequest_Abort(rq, AQH_MSG_REQUEST_REASON_TIMEOUT);
}
rq=AQH_MsgRequest_Tree2_GetBelow(rq);
}
GWEN_Timestamp_free(now);
}
}
void AQH_Request_Tree2_Cleanup(AQH_MSG_REQUEST *rootRq)
{
if (rootRq) {
AQH_MSG_REQUEST *rq;
rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq);
while(rq) {
AQH_MSG_REQUEST *nextSubRq;
nextSubRq=AQH_MsgRequest_Tree2_GetNext(rq);
_freeFinishedRequests(rq);
rq=nextSubRq;
}
}
}
int AQH_Request_Tree2_HandleIpcMsg(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *srcEp, AQH_MESSAGE *recvdMsg)
{
if (rootRq) {
uint32_t refMsgId;
refMsgId=AQH_IpcMessage_GetRefMsgId(recvdMsg);
if (refMsgId) {
AQH_MSG_REQUEST *rq;
rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq);
while(rq) {
if (srcEp==AQH_MsgRequest_GetEndpoint(rq) && refMsgId==AQH_MsgRequest_GetRequestMsgId(rq)) {
if (AQH_MsgRequest_HandleResponse(rq, recvdMsg)==AQH_MSG_REQUEST_RESULT_HANDLED)
return AQH_MSG_REQUEST_RESULT_HANDLED;
}
rq=AQH_MsgRequest_Tree2_GetBelow(rq);
}
}
else {
DBG_INFO(AQH_LOGDOMAIN, "Message has no reference msg id, not a response");
}
}
return AQH_MSG_REQUEST_RESULT_NOT_HANDLED;
}
int AQH_Request_Tree2_HandleTtyMsg(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *srcEp, AQH_MESSAGE *recvdMsg)
{
if (rootRq) {
AQH_MSG_REQUEST *rq;
rq=AQH_MsgRequest_Tree2_GetFirstChild(rootRq);
while(rq) {
if (srcEp==AQH_MsgRequest_GetEndpoint(rq)) {
if (AQH_MsgRequest_HandleResponse(rq, recvdMsg)==AQH_MSG_REQUEST_RESULT_HANDLED)
return AQH_MSG_REQUEST_RESULT_HANDLED;
}
rq=AQH_MsgRequest_Tree2_GetBelow(rq);
}
}
return AQH_MSG_REQUEST_RESULT_NOT_HANDLED;
}
void _freeFinishedRequests(AQH_MSG_REQUEST *rq)
{
AQH_MSG_REQUEST *subRq;
subRq=AQH_MsgRequest_Tree2_GetFirstChild(rq);
while(subRq) {
AQH_MSG_REQUEST *nextSubRq;
nextSubRq=AQH_MsgRequest_Tree2_GetNext(subRq);
_freeFinishedRequests(subRq);
subRq=nextSubRq;
}
if (AQH_MsgRequest_GetState(rq)==AQH_MSG_REQUEST_STATE_DONE) {
DBG_INFO(AQH_LOGDOMAIN, "Deleting request");
AQH_MsgRequest_free(rq);
}
}

104
aqhome/ipc2/msgrequest.h Normal file
View File

@@ -0,0 +1,104 @@
/****************************************************************************
* This file is part of the project Gwenhywfar.
* Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_MSGREQUEST_H
#define AQH_MSGREQUEST_H
#include <aqhome/ipc2/message.h>
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/inherit.h>
#include <gwenhywfar/tree2.h>
#include <gwenhywfar/timestamp.h>
#define AQH_MSG_REQUEST_RESULT_NOT_HANDLED 0
#define AQH_MSG_REQUEST_RESULT_HANDLED 1
#define AQH_MSG_REQUEST_REASON_DONE 0
#define AQH_MSG_REQUEST_REASON_ABORTED 1
#define AQH_MSG_REQUEST_REASON_TIMEOUT 2
#define AQH_MSG_REQUEST_REASON_DISCONNECT 3
#define AQH_MSG_REQUEST_STATE_OPEN 0
#define AQH_MSG_REQUEST_STATE_DONE 1
typedef struct AQH_MSG_REQUEST AQH_MSG_REQUEST;
GWEN_INHERIT_FUNCTION_LIB_DEFS(AQH_MSG_REQUEST, AQHOME_API)
GWEN_TREE2_FUNCTION_LIB_DEFS(AQH_MSG_REQUEST, AQH_MsgRequest, AQHOME_API)
typedef int (*AQH_MSG_REQUEST_HANDLERESPONSE_FN)(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg);
typedef void (*AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN)(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST *subRq, int reason);
typedef void (*AQH_MSG_REQUEST_ABORT_FN)(AQH_MSG_REQUEST *rq, int reason);
AQHOME_API AQH_MSG_REQUEST *AQH_MsgRequest_new();
AQHOME_API void AQH_MsgRequest_free(AQH_MSG_REQUEST *rq);
AQHOME_API int AQH_MsgRequest_GetRequestType(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_SetRequestType(AQH_MSG_REQUEST *rq, int t);
AQHOME_API AQH_MESSAGE *AQH_MsgRequest_GetRequestMsg(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_SetRequestMsg(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg);
AQHOME_API AQH_OBJECT *AQH_MsgRequest_GetEndpoint(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_SetEndpoint(AQH_MSG_REQUEST *rq, AQH_OBJECT *ep);
AQHOME_API uint32_t AQH_MsgRequest_GetRequestMsgId(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_SetRequestMsgId(AQH_MSG_REQUEST *rq, uint32_t id);
AQHOME_API AQH_MESSAGE_LIST *AQH_MsgRequest_GetMsgList(const AQH_MSG_REQUEST *rq);
AQHOME_API AQH_MESSAGE *AQH_MsgRequest_GetFirstMsgFromList(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_AddMsgToList(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg);
AQHOME_API const GWEN_TIMESTAMP *AQH_MsgRequest_GetCreatedAt(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_SetCreatedAt(AQH_MSG_REQUEST *rq, GWEN_TIMESTAMP *ts);
AQHOME_API const GWEN_TIMESTAMP *AQH_MsgRequest_GetExpiresAt(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_SetExpiresAt(AQH_MSG_REQUEST *rq, GWEN_TIMESTAMP *ts);
AQHOME_API void AQH_MsgRequest_SetTimestamps(AQH_MSG_REQUEST *rq, int expiresInSecs);
AQHOME_API int AQH_MsgRequest_GetResult(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_SetResult(AQH_MSG_REQUEST *rq, int result);
AQHOME_API int AQH_MsgRequest_GetState(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_SetState(AQH_MSG_REQUEST *rq, int i);
AQHOME_API int AQH_MsgRequest_HandleResponse(AQH_MSG_REQUEST *rq, AQH_MESSAGE *msg);
AQHOME_API void AQH_MsgRequest_SubRequestFinished(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST *subRq, int reason);
AQHOME_API void AQH_MsgRequest_Abort(AQH_MSG_REQUEST *rq, int reason);
AQHOME_API void *AQH_MsgRequest_GetPrivateData(const AQH_MSG_REQUEST *rq);
AQHOME_API void AQH_MsgRequest_SetPrivateData(AQH_MSG_REQUEST *rq, void *p);
AQHOME_API AQH_MSG_REQUEST_HANDLERESPONSE_FN AQH_MsgRequest_SetHandleResponseFn(AQH_MSG_REQUEST *rq,
AQH_MSG_REQUEST_HANDLERESPONSE_FN fn);
AQHOME_API AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN AQH_MsgRequest_SetSubRequestFinishedFn(AQH_MSG_REQUEST *rq,
AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN f);
AQHOME_API AQH_MSG_REQUEST_ABORT_FN AQH_MsgRequest_SetAbortFn(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST_ABORT_FN f);
AQHOME_API AQH_MSG_REQUEST *AQH_MsgRequest_Tree2_FindByEndpointAndMsgId(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *ep, uint32_t refMsgId);
AQHOME_API void AQH_Request_Tree2_CheckTimeouts(AQH_MSG_REQUEST *rootRq);
AQHOME_API void AQH_Request_Tree2_Cleanup(AQH_MSG_REQUEST *rootRq);
AQHOME_API int AQH_Request_Tree2_HandleIpcMsg(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *srcEp, AQH_MESSAGE *recvdMsg);
AQHOME_API int AQH_Request_Tree2_HandleTtyMsg(AQH_MSG_REQUEST *rootRq, AQH_OBJECT *srcEp, AQH_MESSAGE *recvdMsg);
#endif

View File

@@ -0,0 +1,45 @@
/****************************************************************************
* This file is part of the project Gwenhywfar.
* Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_MSGREQUEST_P_H
#define AQH_MSGREQUEST_P_H
#include "./msgrequest.h"
struct AQH_MSG_REQUEST {
GWEN_INHERIT_ELEMENT(AQH_MSG_REQUEST)
GWEN_TREE2_ELEMENT(AQH_MSG_REQUEST)
int requestType;
AQH_MESSAGE *requestMsg; /* msg this request is based on */
AQH_OBJECT *endpoint; /* source/dest endpoint for this request */
uint32_t requestMsgId;
AQH_MESSAGE_LIST *msgList;
GWEN_TIMESTAMP *createdAt;
GWEN_TIMESTAMP *expiresAt;
AQH_MSG_REQUEST_HANDLERESPONSE_FN handleResponseFn;
AQH_MSG_REQUEST_SUBREQUESTFINISHED_FN subRequestFinishedFn;
AQH_MSG_REQUEST_ABORT_FN abortFn;
void *privateData;
int state;
int result;
};
#endif

View File

@@ -77,6 +77,11 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
AQH_MSG_WRITER *xo;
xo=(AQH_MSG_WRITER*) p;
if (xo->fdObject) {
AQH_Object_Disable(xo->fdObject);
AQH_Object_free(xo->fdObject);
xo->fdObject=NULL;
}
GWEN_FREE_OBJECT(xo);
}

View File

@@ -1,37 +0,0 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_NODEENDPOINT_H
#define AQH_NODEENDPOINT_H
#include <aqhome/events2/object.h>
#include <aqhome/ipc2/message.h>
enum {
AQH_NODE_ENDPOINT_SLOT_MSG_RECVD=1,
AQH_NODE_ENDPOINT_SLOT_MSG_SENT
};
AQHOME_API AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter);
AQHOME_API AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgOutList(const AQH_OBJECT *o);
AQHOME_API AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgOut(AQH_OBJECT *o);
AQHOME_API void AQH_NodeEndpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg);
AQHOME_API AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgInList(const AQH_OBJECT *o);
AQHOME_API AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgIn(AQH_OBJECT *o);
AQHOME_API void AQH_NodeEndpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg);
#endif

View File

@@ -62,7 +62,7 @@ static int _acceptConnection(int serverSocket);
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject)
AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, int fd)
{
AQH_OBJECT *o;
AQH_TCPD_OBJECT *xo;
@@ -72,8 +72,8 @@ AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject)
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o, xo, _freeData);
AQH_Object_SetSignalHandlerFn(o, _handleSignal);
xo->fdObject=fdObject;
xo->fdSocket=AQH_FdObject_GetFd(fdObject);
xo->fdSocket=fd;
xo->fdObject=AQH_FdObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ);
#if 0
/* create object for readable socket, connect to THIS, enable */
@@ -92,7 +92,10 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
AQH_TCPD_OBJECT *xo;
xo=(AQH_TCPD_OBJECT*) p;
if (xo->fdObject) {
AQH_Object_Disable(xo->fdObject);
AQH_Object_free(xo->fdObject);
}
GWEN_FREE_OBJECT(xo);
}

View File

@@ -18,11 +18,11 @@ enum {
/**
* Start listening to the given fdObject which represents a tcp network socket.
* The socket for that fdObject can be created by @ref AQH_TcpdObject_CreateListeningSocket().
* Start listening on the given socket which represents a tcp network socket.
* The socket can be created by @ref AQH_TcpdObject_CreateListeningSocket().
*
*/
AQHOME_API AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject);
AQHOME_API AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, int fd);
/**

View File

@@ -0,0 +1,57 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./tty_endpoint.h"
#include <aqhome/ipc2/nodemsgreader.h>
#include <aqhome/ipc2/msgwriter.h>
#include <aqhome/ipc2/ttyobject.h>
#include <aqhome/ipc2/endpoint.h>
#include <aqhome/events2/fdobject.h>
#include <gwenhywfar/debug.h>
#include <unistd.h>
/* ------------------------------------------------------------------------------------------------
* code
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_TtyEndpoint2_new(AQH_EVENT_LOOP *eventLoop, int fd)
{
int fdCopy;
AQH_OBJECT *fdReader;
AQH_OBJECT *fdWriter;
AQH_OBJECT *msgReader;
AQH_OBJECT *msgWriter;
AQH_OBJECT *endpoint;
fdCopy=dup(fd);
fdReader=AQH_TtyObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ);
AQH_FdObject_FlushInput(fdReader);
msgReader=AQH_NodeMsgReader_new(eventLoop, fdReader);
AQH_Object_Enable(msgReader);
fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE);
msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter);
endpoint=AQH_Endpoint_new(eventLoop, msgReader, msgWriter);
return endpoint;
}

View File

@@ -0,0 +1,29 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_TTY_ENDPOINT_H
#define AQH_TTY_ENDPOINT_H
#include <aqhome/events2/object.h>
/**
* Constructor.
*
* @param eventLoop pointer to eventLoop
* @param fd open file descriptor for tty object (see @ref AQH_TtyObject_OpenAndInitDevice).
*/
AQHOME_API AQH_OBJECT *AQH_TtyEndpoint2_new(AQH_EVENT_LOOP *eventLoop, int fd);
#endif

View File

@@ -23,8 +23,8 @@
#include "aqhome/ipc2/nodemsgreader.h"
#include "aqhome/ipc2/msgreader.h"
#include "aqhome/ipc2/msgwriter.h"
#include "aqhome/ipc2/nodeendpoint.h"
#include "aqhome/ipc2/endpoint.h"
#include "aqhome/ipc2/tty_endpoint.h"
#include "aqhome/aqhome.h"
@@ -480,7 +480,7 @@ int testTty(int argc, char **argv)
int fd;
AQH_OBJECT *ttyReadObject;
AQH_OBJECT *msgReaderObject;
AQH_OBJECT *nodeEndpointObject;
AQH_OBJECT *endpointObject;
int rv;
if (argc<2) {
@@ -504,7 +504,7 @@ int testTty(int argc, char **argv)
ttyReadObject=AQH_TtyObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ);
AQH_FdObject_FlushInput(ttyReadObject);
msgReaderObject=AQH_NodeMsgReader_new(eventLoop, ttyReadObject);
nodeEndpointObject=AQH_NodeEndpoint_new(eventLoop, msgReaderObject, NULL);
endpointObject=AQH_Endpoint_new(eventLoop, msgReaderObject, NULL);
AQH_Object_Enable(msgReaderObject);
@@ -517,6 +517,44 @@ int testTty(int argc, char **argv)
int testTty2(int argc, char **argv)
{
const char *deviceName;
AQH_EVENT_LOOP *eventLoop;
struct termios initialTermiosState;
int fd;
AQH_OBJECT *endpointObject;
int rv;
if (argc<2) {
fprintf(stderr, "Missing device name\n");
return 1;
}
rv=AQH_Init();
if (rv<0) {
}
deviceName=argv[1];
eventLoop=AQH_EventLoop_new();
fd=AQH_TtyObject_OpenAndInitDevice(deviceName, &initialTermiosState);
if (fd<0) {
fprintf(stderr, "Error opening device \"%s\"\n", deviceName);
return 2;
}
endpointObject=AQH_TtyEndpoint2_new(eventLoop, fd);
for (;;) {
AQH_EventLoop_Run(eventLoop);
}
return 0;
}
int main(int argc, char **argv)
{
@@ -526,7 +564,7 @@ int main(int argc, char **argv)
//return testMqttSubscribe2(argc, argv);
//return testHttpDaemon();
//return testMqttSubscribe3(argc, argv);
return testTty(argc, argv);
return testTty2(argc, argv);
return 0;
}