aqhome: Prepared reorganizing IPC and nodes code around built-in event2 api.

This commit is contained in:
Martin Preuss
2025-02-26 00:49:33 +01:00
parent cf8edbbd5f
commit f63079af11
54 changed files with 2390 additions and 202 deletions

View File

@@ -138,62 +138,6 @@ void AQH_Message_IncUsedSize(AQH_MESSAGE *msg, uint32_t i)
int AQH_Message_GetMsgType(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgType:0;
}
void AQH_Message_SetMsgType(AQH_MESSAGE *msg, int i)
{
if (msg && msg->refCount)
msg->msgType=i;
}
int AQH_Message_GetMsgProtoId(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgProtoId:0;
}
void AQH_Message_SetMsgProtoId(AQH_MESSAGE *msg, int i)
{
if (msg && msg->refCount)
msg->msgProtoId=i;
}
int AQH_Message_GetMsgProtoVer(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgProtoVer:0;
}
void AQH_Message_SetMsgProtoVer(AQH_MESSAGE *msg, int i)
{
if (msg && msg->refCount)
msg->msgProtoVer=i;
}
int AQH_Message_GetMsgCommand(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgCommand:0;
}
void AQH_Message_SetMsgCommand(AQH_MESSAGE *msg, int i)
{
if (msg && msg->refCount)
msg->msgCommand=i;
}

View File

@@ -37,19 +37,6 @@ AQHOME_API uint32_t AQH_Message_GetUsedSize(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetUsedSize(AQH_MESSAGE *msg, uint32_t i);
AQHOME_API void AQH_Message_IncUsedSize(AQH_MESSAGE *msg, uint32_t i);
/* parsed header data */
AQHOME_API int AQH_Message_GetMsgType(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetMsgType(AQH_MESSAGE *msg, int i);
AQHOME_API int AQH_Message_GetMsgProtoId(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetMsgProtoId(AQH_MESSAGE *msg, int i);
AQHOME_API int AQH_Message_GetMsgProtoVer(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetMsgProtoVer(AQH_MESSAGE *msg, int i);
AQHOME_API int AQH_Message_GetMsgCommand(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetMsgCommand(AQH_MESSAGE *msg, int i);
/* helper functions for parsing */
AQHOME_API void AQH_Message_WriteUint8At(AQH_MESSAGE *msg, uint32_t pos, uint8_t d);
AQHOME_API uint8_t AQH_Message_ReadUint8At(const AQH_MESSAGE *msg, uint32_t pos, uint8_t defaultValue);

View File

@@ -23,12 +23,6 @@ struct AQH_MESSAGE {
uint8_t *msgPointer;
uint32_t msgSize;
uint32_t usedSize;
/* parsed header data */
int msgType;
int msgProtoId;
int msgProtoVer;
int msgCommand;
};

View File

@@ -212,7 +212,7 @@ void _resetBuffer(AQH_OBJECT *o)
void _cbEnable(AQH_OBJECT *o)
{
if (o && !(AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) {
if (o) {
AQH_MSG_WRITER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o);
@@ -225,7 +225,7 @@ void _cbEnable(AQH_OBJECT *o)
void _cbDisable(AQH_OBJECT *o)
{
if (o && (AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) {
if (o) {
AQH_MSG_WRITER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o);

View File

@@ -222,12 +222,7 @@ void _dumpMsg(const AQH_MESSAGE *msg, const char *sText)
GWEN_BUFFER *mbuf;
mbuf=GWEN_Buffer_new(0, 256, 0, 1);
switch(AQH_NodeMessage_GetMsgType(msg)) {
case AQH_MSG_TYPE_DEVICE: AQH_DeviceMessage_DumpToBuffer(msg, mbuf, sText); break;
case AQH_MSG_TYPE_COMRECVSTATS: AQH_RecvStatsMessage_DumpToBuffer(msg, mbuf, sText); break;
case AQH_MSG_TYPE_COMSENDSTATS: AQH_SendStatsMessage_DumpToBuffer(msg, mbuf, sText); break;
default: AQH_NodeMessage_DumpToBuffer(msg, mbuf, sText); break;
}
AQH_NodeMessage_DumpSpecificToBuffer(msg, mbuf, sText);
DBG_ERROR(AQH_LOGDOMAIN, "%s", GWEN_Buffer_GetStart(mbuf));
GWEN_Buffer_free(mbuf);
}

View File

@@ -48,7 +48,6 @@ static void GWENHYWFAR_CB _freeData(void *bp, void *p);
static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2);
static int _handleSocketReady(AQH_OBJECT *o);
static int _createListeningSocket(const char *addr, int port);
static int _socketSetReuseAddress(int sk, int fl);
static int _socketSetBlocking(int sk, int fl);
static int _translateHError(int herr);
@@ -63,7 +62,7 @@ static int _acceptConnection(int serverSocket);
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop)
AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject)
{
AQH_OBJECT *o;
AQH_TCPD_OBJECT *xo;
@@ -71,10 +70,18 @@ AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop)
o=AQH_Object_new(eventLoop);
GWEN_NEW_OBJECT(AQH_TCPD_OBJECT, xo);
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o, xo, _freeData);
xo->fdSocket=-1;
AQH_Object_SetSignalHandlerFn(o, _handleSignal);
xo->fdObject=fdObject;
xo->fdSocket=AQH_FdObject_GetFd(fdObject);
#if 0
/* create object for readable socket, connect to THIS, enable */
xo->fdObject=AQH_FdObject_new(AQH_Object_GetEventLoop(o), rv, AQH_FDOBJECT_FDMODE_READ);
AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_TCPD_OBJECT_SLOT_SOCKETREADY, o);
AQH_Object_Enable(xo->fdObject);
#endif
return o;
}
@@ -85,111 +92,13 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
AQH_TCPD_OBJECT *xo;
xo=(AQH_TCPD_OBJECT*) p;
if (xo->fdObject)
AQH_Object_free(xo->fdObject);
if (xo->fdSocket)
close(xo->fdSocket);
GWEN_FREE_OBJECT(xo);
}
int AQH_TcpdObject_StartListening(AQH_OBJECT *o, const char *addr, int port)
{
if (o) {
AQH_TCPD_OBJECT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o);
if (xo) {
if (xo->fdSocket<0) {
int rv;
rv=_createListeningSocket(addr, port);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
xo->fdSocket=rv;
/* create object for readable socket, connect to THIS, enable */
xo->fdObject=AQH_FdObject_new(AQH_Object_GetEventLoop(o), rv, AQH_FDOBJECT_FDMODE_READ);
AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_TCPD_OBJECT_SLOT_SOCKETREADY, o);
AQH_Object_Enable(xo->fdObject);
}
}
}
return GWEN_ERROR_GENERIC;
}
void AQH_TcpdObject_StopListening(AQH_OBJECT *o)
{
if (o) {
AQH_TCPD_OBJECT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o);
if (xo) {
if (xo->fdObject) {
AQH_Object_Disable(xo->fdObject);
AQH_Object_free(xo->fdObject);
xo->fdObject=NULL;
}
if (xo->fdSocket<0) {
close(xo->fdSocket);
xo->fdSocket=-1;
}
}
}
}
int _handleSignal(AQH_OBJECT *o,
uint32_t slotId,
GWEN_UNUSED AQH_OBJECT *senderObject,
GWEN_UNUSED int param1,
GWEN_UNUSED void *param2)
{
switch(slotId) {
case AQH_TCPD_OBJECT_SLOT_SOCKETREADY: return _handleSocketReady(o);
default:
break;
}
return 0; /* not handled */
}
int _handleSocketReady(AQH_OBJECT *o)
{
AQH_TCPD_OBJECT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o);
if (xo) {
int clientSk;
clientSk=_acceptConnection(xo->fdSocket);
if (clientSk<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", clientSk);
}
else {
if (0==AQH_Object_EmitSignal(o, AQH_TCPD_OBJECT_SIGNAL_NEWCONN, clientSk, NULL)) {
DBG_INFO(AQH_LOGDOMAIN, "New connection not handled");
close(clientSk);
}
}
return 1; /* handled */
}
return 0; /* not handled */
}
int _createListeningSocket(const char *addr, int port)
int AQH_TcpdObject_CreateListeningSocket(const char *addr, int port)
{
int sk;
struct sockaddr_in inetAddr;
@@ -245,6 +154,48 @@ int _createListeningSocket(const char *addr, int port)
int _handleSignal(AQH_OBJECT *o,
uint32_t slotId,
GWEN_UNUSED AQH_OBJECT *senderObject,
GWEN_UNUSED int param1,
GWEN_UNUSED void *param2)
{
switch(slotId) {
case AQH_TCPD_OBJECT_SLOT_SOCKETREADY: return _handleSocketReady(o);
default:
break;
}
return 0; /* not handled */
}
int _handleSocketReady(AQH_OBJECT *o)
{
AQH_TCPD_OBJECT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o);
if (xo) {
int clientSk;
clientSk=_acceptConnection(xo->fdSocket);
if (clientSk<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", clientSk);
}
else {
if (0==AQH_Object_EmitSignal(o, AQH_TCPD_OBJECT_SIGNAL_NEWCONN, clientSk, NULL)) {
DBG_INFO(AQH_LOGDOMAIN, "New connection not handled");
close(clientSk);
}
}
return 1; /* handled */
}
return 0; /* not handled */
}
int _setHostAddr(struct in_addr *inetAddr, const char *sAddr)
{
inetAddr->s_addr=0;

View File

@@ -17,9 +17,18 @@ enum {
};
AQHOME_API AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop);
AQHOME_API int AQH_TcpdObject_StartListening(AQH_OBJECT *o, const char *addr, int port);
AQHOME_API void AQH_TcpdObject_StopListening(AQH_OBJECT *o);
/**
* Start listening to the given fdObject which represents a tcp network socket.
* The socket for that fdObject can be created by @ref AQH_TcpdObject_CreateListeningSocket().
*
*/
AQHOME_API AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject);
/**
* Helper function to create a listening TCP socket.
*/
AQHOME_API int AQH_TcpdObject_CreateListeningSocket(const char *addr, int port);