From acbe4505b9bd502308f13f1c54fd901c3ece0da1 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Thu, 9 Jan 2025 01:42:19 +0100 Subject: [PATCH] ipc2: started working on event-based ipc handling. --- aqhome/0BUILD | 2 + aqhome/events2/fdobject.c | 60 +++++- aqhome/events2/fdobject.h | 2 + aqhome/events2/object.h | 3 +- aqhome/ipc2/0BUILD | 82 ++++++++ aqhome/ipc2/msgreader.c | 314 +++++++++++++++++++++++++++++ aqhome/ipc2/msgreader.h | 32 +++ aqhome/ipc2/msgreader_p.h | 38 ++++ aqhome/ipc2/tcpd_object.c | 379 ++++++++++++++++++++++++++++++++++++ aqhome/ipc2/tcpd_object.h | 29 +++ aqhome/ipc2/tcpd_object_p.h | 27 +++ 11 files changed, 963 insertions(+), 5 deletions(-) create mode 100644 aqhome/ipc2/0BUILD create mode 100644 aqhome/ipc2/msgreader.c create mode 100644 aqhome/ipc2/msgreader.h create mode 100644 aqhome/ipc2/msgreader_p.h create mode 100644 aqhome/ipc2/tcpd_object.c create mode 100644 aqhome/ipc2/tcpd_object.h create mode 100644 aqhome/ipc2/tcpd_object_p.h diff --git a/aqhome/0BUILD b/aqhome/0BUILD index 5208aa9..7ef5be5 100644 --- a/aqhome/0BUILD +++ b/aqhome/0BUILD @@ -65,6 +65,7 @@ msg ipc + ipc2 nodes mqtt hexfile @@ -78,6 +79,7 @@ aqhmsg aqhipc + aqhipc2 aqhnodes aqhmqtt aqhhexfile diff --git a/aqhome/events2/fdobject.c b/aqhome/events2/fdobject.c index 6d1cc56..e33bcd1 100644 --- a/aqhome/events2/fdobject.c +++ b/aqhome/events2/fdobject.c @@ -13,6 +13,11 @@ #include "./fdobject_p.h" #include +#include + +#include +#include +#include @@ -109,14 +114,59 @@ int AQH_FdObject_GetFd(const AQH_OBJECT *o) +int AQH_FdObject_Read(AQH_OBJECT *o, uint8_t *ptrBuffer, uint32_t lenBuffer) +{ + if (o && ptrBuffer && lenBuffer) { + AQH_FDOBJECT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_FDOBJECT, o); + if (xo) { + if (xo->fd!=-1) { + ssize_t rv; + + rv=recv(xo->fd, ptrBuffer, lenBuffer, MSG_DONTWAIT); + if (rv==0) { + DBG_INFO(AQH_LOGDOMAIN, "EOF met"); + return 0; + } + else if (rv>0) { + /* data received */ + return (int) rv; + } + else { + if (rv==EINTR || errno==EWOULDBLOCK || errno==EAGAIN) { + /* temporarily no data, try again */ + return GWEN_ERROR_TRY_AGAIN; + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Error on read: %s (%d)", strerror(errno), errno); + xo->fd=-1; + return GWEN_ERROR_IO; + } + } + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Previous error, not reading."); + return GWEN_ERROR_IO; + } + } + } + + return GWEN_ERROR_INVALID; +} + + + void _cbEnable(AQH_OBJECT *o) { - if (o) { + if (o && !(AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) { AQH_EVENT_LOOP *eventLoop; eventLoop=AQH_Object_GetEventLoop(o); - if (eventLoop) + if (eventLoop) { AQH_EventLoop_AddFdObject(eventLoop, o); + AQH_Object_AddFlags(o, AQH_OBJECT_FLAGS_ENABLED); + } } } @@ -124,12 +174,14 @@ void _cbEnable(AQH_OBJECT *o) void _cbDisable(AQH_OBJECT *o) { - if (o) { + if (o && (AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) { AQH_EVENT_LOOP *eventLoop; eventLoop=AQH_Object_GetEventLoop(o); - if (eventLoop) + if (eventLoop) { AQH_EventLoop_DelFdObject(eventLoop, o); + AQH_Object_SubFlags(o, AQH_OBJECT_FLAGS_ENABLED); + } } } diff --git a/aqhome/events2/fdobject.h b/aqhome/events2/fdobject.h index 09a8678..9289d81 100644 --- a/aqhome/events2/fdobject.h +++ b/aqhome/events2/fdobject.h @@ -32,5 +32,7 @@ AQHOME_API void AQH_FdObject_SetFdMode(AQH_OBJECT *o, int i); AQHOME_API int AQH_FdObject_GetFd(const AQH_OBJECT *o); +AQHOME_API int AQH_FdObject_Read(AQH_OBJECT *o, uint8_t *ptrBuffer, uint32_t lenBuffer); + #endif diff --git a/aqhome/events2/object.h b/aqhome/events2/object.h index e801c9c..ab99672 100644 --- a/aqhome/events2/object.h +++ b/aqhome/events2/object.h @@ -19,7 +19,8 @@ -#define AQH_OBJECT_FLAGS_FIRE 0x80000000L +#define AQH_OBJECT_FLAGS_ENABLED 0x80000000L +#define AQH_OBJECT_FLAGS_FIRE 0x40000000L enum { diff --git a/aqhome/ipc2/0BUILD b/aqhome/ipc2/0BUILD new file mode 100644 index 0000000..e4dabe8 --- /dev/null +++ b/aqhome/ipc2/0BUILD @@ -0,0 +1,82 @@ + + + + + + + + $(gwenhywfar_cflags) + -I$(topsrcdir) + -I$(topbuilddir) + + + + --include=$(builddir) + --include=$(srcdir) + + + + + + $(visibility_cflags) + + + + --api=AQHOME_API + + + + + + + + + + + + + + + + + + $(local/built_headers_pub) + + + + + msgreader.h + tcpd_object.h + + + + + msgreader_p.h + tcpd_object_p.h + + + + + $(local/typefiles) + + msgreader.c + tcpd_object.c + + + + + + + + + + + + + + + + + + + diff --git a/aqhome/ipc2/msgreader.c b/aqhome/ipc2/msgreader.c new file mode 100644 index 0000000..6ac8efc --- /dev/null +++ b/aqhome/ipc2/msgreader.c @@ -0,0 +1,314 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./msgreader_p.h" +#include + +#include +#include +#include + +#include + + +#define AQH_MSG_READER_MINMSGSIZE 12 +#define AQH_MSG_READER_MAXMSGSIZE 10240 + +enum { + AQH_MSGREADER_SLOT_SOCKETREADY=1 +}; + + + +GWEN_INHERIT(AQH_OBJECT, AQH_MSG_READER) + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); + +static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); +static int _handleSocketReady(AQH_OBJECT *o); +static int _fillRingbuffer(AQH_OBJECT *o); +static int _readMsgFromRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo); +static int _readHeaderFromRingbuffer(AQH_MSG_READER *xo); +static int _readRemainderFromRingbuffer(AQH_MSG_READER *xo); + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ + +AQH_OBJECT *AQH_MsgReader_new(AQH_EVENT_LOOP *eventLoop, int fd) +{ + AQH_OBJECT *o; + AQH_MSG_READER *xo; + + o=AQH_Object_new(eventLoop); + GWEN_NEW_OBJECT(AQH_MSG_READER, xo); + GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_MSG_READER, o, xo, _freeData); + xo->fdSocket=fd; + xo->ringBuffer=GWEN_RingBuffer_new(AQH_MSG_READER_RINGBUFFER_SIZE); + + AQH_Object_SetSignalHandlerFn(o, _handleSignal); + + /* create object for readable socket, connect to THIS, enable */ + xo->fdObject=AQH_FdObject_new(AQH_Object_GetEventLoop(o), fd, AQH_FDOBJECT_FDMODE_READ); + AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_MSGREADER_SLOT_SOCKETREADY, o); + AQH_Object_Enable(xo->fdObject); + + return o; +} + + + +void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +{ + AQH_MSG_READER *xo; + + xo=(AQH_MSG_READER*) p; + if (xo->fdObject) { + AQH_Object_Disable(xo->fdObject); + AQH_Object_free(xo->fdObject); + } + GWEN_FREE_OBJECT(xo); +} + + + +int _handleSignal(AQH_OBJECT *o, + uint32_t slotId, + GWEN_UNUSED AQH_OBJECT *senderObject, + GWEN_UNUSED int param1, + GWEN_UNUSED void *param2) +{ + switch(slotId) { + case AQH_MSGREADER_SLOT_SOCKETREADY: return _handleSocketReady(o); + default: + break; + } + + return 0; /* not handled */ +} + + + +int _handleSocketReady(AQH_OBJECT *o) +{ + AQH_MSG_READER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); + if (xo) { + int rv; + + /* read available data into ringbuffer */ + rv=_fillRingbuffer(o); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_ERROR, rv, NULL); + xo->fdSocket=-1; + return 1; + } + + /* read messages from ring buffer until buffer empty */ + do { + rv=_readMsgFromRingbuffer(o, xo); + } while (rv==1); + if (rv<0) { + AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_ERROR, rv, NULL); + xo->fdSocket=-1; + } + + return 1; + } + + return 0; +} + + + +int _fillRingbuffer(AQH_OBJECT *o) +{ + AQH_MSG_READER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); + if (xo && xo->fdSocket!=-1) { + uint32_t len; + + /* read data into ringbuffer */ + len=GWEN_RingBuffer_GetMaxUnsegmentedWrite(xo->ringBuffer); + if (len>0) { + int rv; + + rv=AQH_FdObject_Read(xo->fdObject, (uint8_t*) GWEN_RingBuffer_GetWritePointer(xo->ringBuffer), len); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + xo->fdSocket=-1; + return rv; + } + else if (rv==0) { + DBG_INFO(AQH_LOGDOMAIN, "EOF met"); + xo->fdSocket=-1; + return 0; + } + else { + /* bytes received */ + GWEN_RingBuffer_SkipBytesWrite(xo->ringBuffer, rv); + return rv; + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer full"); + return GWEN_ERROR_BUFFER_OVERFLOW; + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "fd inactive (previous error or EOF?)"); + return GWEN_ERROR_INVALID; + } +} + + + +int _readMsgFromRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo) +{ + int rv; + + if (xo->bytesReceivedbytesReceived>=AQH_MSG_READER_HEADERBUFFER_SIZE) { + /* reading remainder of msg directly into allocated buffer */ + rv=_readRemainderFromRingbuffer(xo); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + else if (rv==1) { + int msgLen; + uint8_t *msgPtr; + + msgLen=xo->bytesReceived; + msgPtr=xo->currentMsgBuf; + + xo->bytesReceived=0; + xo->bytesLeft=0; + xo->currentMsgBuf=NULL; + + rv=AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_MSGRECVD, msgLen, (void*) msgPtr); + if (rv==0) { + DBG_INFO(AQH_LOGDOMAIN, "Received message ignored"); + free(msgPtr); + } + return 1; + } + } + return 0; +} + + + +int _readHeaderFromRingbuffer(AQH_MSG_READER *xo) +{ + uint32_t remaining; + int rv; + uint32_t xferSize; + uint32_t bytesInBuffer; + + bytesInBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer); + + /* still reading header */ + remaining=AQH_MSG_READER_HEADERBUFFER_SIZE-xo->bytesReceived; + if (bytesInBufferringBuffer, (char*) (xo->headerBuffer+xo->bytesReceived), &xferSize); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + if (xferSizebytesReceived+=xferSize; + if (xo->bytesReceived==AQH_MSG_READER_HEADERBUFFER_SIZE) { + uint32_t msgLen; + + /* full size received, parse msg size, allocate buffer */ + msgLen=GWEN_ENDIAN_LE32TOH(*((const uint32_t*)(xo->headerBuffer))); + if (msgLenAQH_MSG_READER_MAXMSGSIZE) { + DBG_ERROR(AQH_LOGDOMAIN, "Bad message size(%lu)", (unsigned long int) msgLen); + return GWEN_ERROR_GENERIC; + } + xo->currentMsgBuf=(uint8_t*) malloc(msgLen+4); + memmove(xo->currentMsgBuf, xo->headerBuffer, xo->bytesReceived); + xo->bytesLeft=(msgLen+4)-xo->bytesReceived; + } + + return 0; +} + + + +int _readRemainderFromRingbuffer(AQH_MSG_READER *xo) +{ + uint32_t bytesInBuffer; + uint32_t bytesToRead; + int rv; + + bytesInBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer); + + /* still reading header */ + bytesToRead=xo->bytesLeft; + if (bytesInBufferringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + if (xferSizebytesReceived+=xferSize; + xo->bytesLeft-=xferSize; + if (xo->bytesLeft==0) { + /* msg finished */ + DBG_INFO(AQH_LOGDOMAIN, "Message complete"); + return 1; + } + } + return 0; +} + + + + + + diff --git a/aqhome/ipc2/msgreader.h b/aqhome/ipc2/msgreader.h new file mode 100644 index 0000000..199c390 --- /dev/null +++ b/aqhome/ipc2/msgreader.h @@ -0,0 +1,32 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSGREADER_H +#define AQH_MSGREADER_H + +#include + + + +enum { + /** param1=msgSize, param2=msgPointer */ + AQH_MSG_READER_SIGNAL_MSGRECVD=AQH_OBJECT_SIGNAL_LAST, + /** param1: error code */ + AQH_MSG_READER_SIGNAL_ERROR, + AQH_MSG_READER_SIGNAL_CLOSED +}; + + + + +AQH_OBJECT *AQH_MsgReader_new(AQH_EVENT_LOOP *eventLoop, int fd); + + + +#endif + diff --git a/aqhome/ipc2/msgreader_p.h b/aqhome/ipc2/msgreader_p.h new file mode 100644 index 0000000..b62c44c --- /dev/null +++ b/aqhome/ipc2/msgreader_p.h @@ -0,0 +1,38 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSGREADER_P_H +#define AQH_MSGREADER_P_H + + +#include "./msgreader.h" + +#include + +#define AQH_MSG_READER_RINGBUFFER_SIZE 1024 +#define AQH_MSG_READER_HEADERBUFFER_SIZE 4 + + +typedef struct AQH_MSG_READER AQH_MSG_READER; +struct AQH_MSG_READER { + int fdSocket; + AQH_OBJECT *fdObject; + GWEN_RINGBUFFER *ringBuffer; + + int bytesReceived; + int bytesLeft; + uint8_t headerBuffer[AQH_MSG_READER_HEADERBUFFER_SIZE]; + uint8_t *currentMsgBuf; +}; + + + + + +#endif + diff --git a/aqhome/ipc2/tcpd_object.c b/aqhome/ipc2/tcpd_object.c new file mode 100644 index 0000000..1eee026 --- /dev/null +++ b/aqhome/ipc2/tcpd_object.c @@ -0,0 +1,379 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./tcpd_object_p.h" +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + + +#define MAX_BACKLOG 10 + + +enum { + AQH_TCPD_OBJECT_SLOT_SOCKETREADY=1 +}; + + +GWEN_INHERIT(AQH_OBJECT, AQH_TCPD_OBJECT) + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); + +static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); +static int _handleSocketReady(AQH_OBJECT *o); + +static int _createListeningSocket(const char *addr, int port); +static int _socketSetReuseAddress(int sk, int fl); +static int _socketSetBlocking(int sk, int fl); +static int _translateHError(int herr); +static int _setHostAddr(struct in_addr *inetAddr, const char *sAddr); +static int _setHostName(struct in_addr *inetAddr, const char *sAddr); +static int _acceptConnection(int serverSocket); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop) +{ + AQH_OBJECT *o; + AQH_TCPD_OBJECT *xo; + + o=AQH_Object_new(eventLoop); + GWEN_NEW_OBJECT(AQH_TCPD_OBJECT, xo); + GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o, xo, _freeData); + xo->fdSocket=-1; + + AQH_Object_SetSignalHandlerFn(o, _handleSignal); + + return o; +} + + + +void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +{ + AQH_TCPD_OBJECT *xo; + + xo=(AQH_TCPD_OBJECT*) p; + if (xo->fdObject) + AQH_Object_free(xo->fdObject); + if (xo->fdSocket) + close(xo->fdSocket); + + GWEN_FREE_OBJECT(xo); +} + + + +int AQH_TcpdObject_StartListening(AQH_OBJECT *o, const char *addr, int port) +{ + if (o) { + AQH_TCPD_OBJECT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o); + if (xo) { + if (xo->fdSocket<0) { + int rv; + + rv=_createListeningSocket(addr, port); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + xo->fdSocket=rv; + + /* create object for readable socket, connect to THIS, enable */ + xo->fdObject=AQH_FdObject_new(AQH_Object_GetEventLoop(o), rv, AQH_FDOBJECT_FDMODE_READ); + AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_TCPD_OBJECT_SLOT_SOCKETREADY, o); + AQH_Object_Enable(xo->fdObject); + } + } + } + + return GWEN_ERROR_GENERIC; +} + + + +void AQH_TcpdObject_StopListening(AQH_OBJECT *o) +{ + if (o) { + AQH_TCPD_OBJECT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o); + if (xo) { + if (xo->fdObject) { + AQH_Object_Disable(xo->fdObject); + AQH_Object_free(xo->fdObject); + xo->fdObject=NULL; + } + if (xo->fdSocket<0) { + close(xo->fdSocket); + xo->fdSocket=-1; + } + } + } +} + + + +int _handleSignal(AQH_OBJECT *o, + uint32_t slotId, + GWEN_UNUSED AQH_OBJECT *senderObject, + GWEN_UNUSED int param1, + GWEN_UNUSED void *param2) +{ + switch(slotId) { + case AQH_TCPD_OBJECT_SLOT_SOCKETREADY: return _handleSocketReady(o); + default: + break; + } + + return 0; /* not handled */ +} + + + +int _handleSocketReady(AQH_OBJECT *o) +{ + AQH_TCPD_OBJECT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o); + if (xo) { + int clientSk; + + clientSk=_acceptConnection(xo->fdSocket); + if (clientSk<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", clientSk); + } + else { + if (0==AQH_Object_EmitSignal(o, AQH_TCPD_OBJECT_SIGNAL_NEWCONN, clientSk, NULL)) { + DBG_INFO(AQH_LOGDOMAIN, "New connection not handled"); + close(clientSk); + } + } + return 1; /* handled */ + } + return 0; /* not handled */ +} + + + +int _createListeningSocket(const char *addr, int port) +{ + int sk; + struct sockaddr_in inetAddr; + int rv; + + memset(&inetAddr, 0, sizeof(inetAddr)); + inetAddr.sin_family=AF_INET; + rv=_setHostAddr(&inetAddr.sin_addr, addr); /* try tuple */ + if (rv<0) { + rv=_setHostName(&inetAddr.sin_addr, addr); /* lookup name */ + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + } + inetAddr.sin_port=htons(port); + + sk=socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sk<0) { + DBG_INFO(AQH_LOGDOMAIN, "socket(): %s", strerror(errno)); + return GWEN_ERROR_IO; + } + + rv=_socketSetReuseAddress(sk, 1); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + close(sk); + return rv; + } + + rv=_socketSetBlocking(sk, 0); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + close(sk); + return rv; + } + + rv=bind(sk, (struct sockaddr*) &inetAddr, sizeof(inetAddr)); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + close(sk); + return rv; + } + + rv=listen(sk, MAX_BACKLOG); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + close(sk); + return rv; + } + return sk; +} + + + +int _setHostAddr(struct in_addr *inetAddr, const char *sAddr) +{ + inetAddr->s_addr=0; + if (!inet_aton(sAddr, inetAddr)) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid address \"%s\"", sAddr); + return GWEN_ERROR_INVALID; + } + + return 0; +} + + + +int _setHostName(struct in_addr *inetAddr, const char *sAddr) +{ + struct hostent *he; + + he=gethostbyname(sAddr); + if (!he) { + DBG_ERROR(AQH_LOGDOMAIN, "gethostbyname(\"%s\"): %s", sAddr, hstrerror(h_errno)); + return _translateHError(h_errno); + } + /* name resolved, store address */ + memcpy(inetAddr, he->h_addr_list[0], sizeof(struct in_addr)); + return 0; +} + + + +int _socketSetBlocking(int sk, int fl) +{ + int prevFlags; + int newFlags; + + /* get current socket flags */ + prevFlags=fcntl(sk, F_GETFL); + if (prevFlags==-1) { + DBG_INFO(AQH_LOGDOMAIN, "fcntl(): %s", strerror(errno)); + return GWEN_ERROR_IO; + } + + /* set nonblocking/blocking */ + if (fl) + newFlags=prevFlags&(~O_NONBLOCK); + else + newFlags=prevFlags|O_NONBLOCK; + + if (-1==fcntl(sk, F_SETFL, newFlags)) { + DBG_INFO(AQH_LOGDOMAIN, "fcntl(): %s", strerror(errno)); + return GWEN_ERROR_IO; + } + prevFlags=fcntl(sk, F_GETFL); + if (prevFlags!=newFlags) { + DBG_ERROR(AQH_LOGDOMAIN, "fcntl() did not set flags correctly (%08x!=%08x)", prevFlags, newFlags); + return GWEN_ERROR_IO; + } + + return 0; +} + + + +int _socketSetReuseAddress(int sk, int fl) +{ + if (setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, &fl, sizeof(fl))) { + DBG_INFO(AQH_LOGDOMAIN, "setsockopt(): %s", strerror(errno)); + return GWEN_ERROR_IO; + } + return 0; +} + + + +int _translateHError(int herr) +{ + int rv; + + switch (herr) { + case HOST_NOT_FOUND: + rv=GWEN_ERROR_HOST_NOT_FOUND; + break; +#ifdef NO_ADDRESS + case NO_ADDRESS: + rv=GWEN_ERROR_NO_ADDRESS; + break; +#endif + case NO_RECOVERY: + rv=GWEN_ERROR_NO_RECOVERY; + break; + case TRY_AGAIN: + rv=GWEN_ERROR_TRY_AGAIN; + break; + default: + rv=GWEN_ERROR_UNKNOWN_DNS_ERROR; + break; + } /* switch */ + + return rv; +} + + + +int _acceptConnection(int serverSocket) +{ + int clientSk; + struct sockaddr_in inetAddr; + unsigned int socklen; + int rv; + + socklen=sizeof(inetAddr); + clientSk=accept(serverSocket, (struct sockaddr*) &inetAddr, &socklen); + if (clientSk<0) { + DBG_INFO(AQH_LOGDOMAIN, "accept(): %s", strerror(errno)); + return GWEN_ERROR_IO; + } + DBG_INFO(AQH_LOGDOMAIN, "Accepted incoming connection"); + + rv=_socketSetBlocking(clientSk, 0); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + close(clientSk); + return rv; + } + + return clientSk; +} + + + + + + diff --git a/aqhome/ipc2/tcpd_object.h b/aqhome/ipc2/tcpd_object.h new file mode 100644 index 0000000..cb4e254 --- /dev/null +++ b/aqhome/ipc2/tcpd_object.h @@ -0,0 +1,29 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_TCPD_OBJECT_H +#define AQH_TCPD_OBJECT_H + +#include + +enum { + /** new client connected (param1 is fd for the new clients non-blocking socket) */ + AQH_TCPD_OBJECT_SIGNAL_NEWCONN=AQH_OBJECT_SIGNAL_LAST +}; + + +AQHOME_API AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop); +AQHOME_API int AQH_TcpdObject_StartListening(AQH_OBJECT *o, const char *addr, int port); +AQHOME_API void AQH_TcpdObject_StopListening(AQH_OBJECT *o); + + + + + +#endif + diff --git a/aqhome/ipc2/tcpd_object_p.h b/aqhome/ipc2/tcpd_object_p.h new file mode 100644 index 0000000..88f2a1e --- /dev/null +++ b/aqhome/ipc2/tcpd_object_p.h @@ -0,0 +1,27 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_TCPD_OBJECT_P_H +#define AQH_TCPD_OBJECT_P_H + + +#include "./tcpd_object.h" + + +typedef struct AQH_TCPD_OBJECT AQH_TCPD_OBJECT; +struct AQH_TCPD_OBJECT { + int fdSocket; + AQH_OBJECT *fdObject; +}; + + + + + +#endif +