aqhome: more work on transformation to event2/ipc2.

This commit is contained in:
Martin Preuss
2025-02-27 14:08:44 +01:00
parent bebc4c1b0d
commit d887747b3c
45 changed files with 2446 additions and 287 deletions

View File

@@ -19,7 +19,7 @@ typedef struct AQH_EVENT_LOOP AQH_EVENT_LOOP;
AQHOME_API AQH_EVENT_LOOP *AQH_EventLoop_new(void);
AQHOME_API void AQH_EventLoop_free(AQH_EVENT_LOOP *eventLoop);
AQHOME_API void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop);
AQHOME_API void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop, int timeoutInMillisecs);
AQHOME_API void AQH_EventLoop_AddFdObject(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *o);
AQHOME_API void AQH_EventLoop_DelFdObject(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *o);

View File

@@ -39,7 +39,7 @@ static void _signalReadyFdObjects(AQH_OBJECT_LIST2 *ol);
* ------------------------------------------------------------------------------------------------
*/
void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop)
void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop, int timeoutInMillisecs)
{
fd_set fdRead;
fd_set fdWrite;
@@ -56,8 +56,8 @@ void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop)
if (highestFd>-1) {
struct timeval tv;
tv.tv_sec=0;
tv.tv_usec=200000;
tv.tv_sec=timeoutInMillisecs/1000;
tv.tv_usec=(timeoutInMillisecs%1000)*1000;
rv=select(highestFd+1, &fdRead, &fdWrite, NULL, &tv);
if (rv>0) {
/* some fds became active */

View File

@@ -168,7 +168,7 @@ int AQH_FdObject_Read(AQH_OBJECT *o, uint8_t *ptrBuffer, uint32_t lenBuffer)
}
else if (rv>0) {
/* data received */
DBG_ERROR(AQH_LOGDOMAIN, "Received %d bytes", (int) rv);
DBG_INFO(AQH_LOGDOMAIN, "Received %d bytes", (int) rv);
return (int) rv;
}
else {

View File

@@ -72,10 +72,9 @@ void AQH_Object_free(AQH_OBJECT *o)
{
if (o && o->refCount>0) {
if (--(o->refCount)==0) {
GWEN_INHERIT_FINI(AQH_OBJECT, o);
GWEN_LIST_FINI(AQH_OBJECT, o);
AQH_Link_List_free(o->linkList);
GWEN_LIST_FINI(AQH_OBJECT, o);
GWEN_INHERIT_FINI(AQH_OBJECT, o);
GWEN_FREE_OBJECT(o);
}

View File

@@ -21,6 +21,7 @@
#define AQH_OBJECT_FLAGS_ENABLED 0x80000000L
#define AQH_OBJECT_FLAGS_FIRE 0x40000000L
#define AQH_OBJECT_FLAGS_DELETE 0x20000000L
enum {

View File

@@ -51,10 +51,12 @@
nodemsgreader.h
ttyobject.h
tcpd_object.h
tcp_object.h
endpoint.h
message.h
msgrequest.h
ipc_server.h
ipc_client.h
tty_endpoint.h
</headers>
@@ -79,10 +81,12 @@
nodemsgreader.c
ttyobject.c
tcpd_object.c
tcp_object.c
endpoint.c
message.c
msgrequest.c
ipc_server.c
ipc_client.c
tty_endpoint.c
</sources>

View File

@@ -30,9 +30,14 @@
* ------------------------------------------------------------------------------------------------
*/
/*#define LOG_MESSAGES*/
enum {
AQH_ENDPOINT_SLOT_MSG_RECVD=1,
AQH_ENDPOINT_SLOT_MSG_SENT
AQH_ENDPOINT_SLOT_MSG_SENT,
AQH_ENDPOINT_SLOT_CLOSED
};
@@ -54,8 +59,8 @@ GWEN_INHERIT(AQH_OBJECT, AQH_ENDPOINT)
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2);
static int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr);
static int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr);
static void _dumpMsg(const AQH_MESSAGE *msg, const char *sText);
static int _handleMsgSent(AQH_OBJECT *o);
static int _handleClosed(AQH_OBJECT *o);
@@ -81,6 +86,7 @@ AQH_OBJECT *AQH_Endpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, A
if (msgReader) {
xo->msgReader=msgReader;
AQH_Object_AddLink(msgReader, AQH_MSG_READER_SIGNAL_MSGRECVD, AQH_ENDPOINT_SLOT_MSG_RECVD, o);
AQH_Object_AddLink(msgReader, AQH_MSG_READER_SIGNAL_CLOSED, AQH_ENDPOINT_SLOT_CLOSED, o);
}
if (msgWriter) {
@@ -98,6 +104,8 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
AQH_ENDPOINT *xo;
xo=(AQH_ENDPOINT*) p;
free(xo->serviceName);
free(xo->userName);
AQH_Message_List_free(xo->msgOutList);
AQH_Message_List_free(xo->msgInList);
AQH_Object_free(xo->msgWriter);
@@ -107,6 +115,197 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
const char *AQH_Endpoint_GetServiceName(const AQH_OBJECT *o)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return xo->serviceName;
}
return NULL;
}
void AQH_Endpoint_SetServiceName(AQH_OBJECT *o, const char *s)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo) {
free(xo->serviceName);
xo->serviceName=s?strdup(s):NULL;
}
}
}
const char *AQH_Endpoint_GetUserName(const AQH_OBJECT *o)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return xo->userName;
}
return NULL;
}
void AQH_Endpoint_SetUserName(AQH_OBJECT *o, const char *s)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo) {
free(xo->userName);
xo->userName=s?strdup(s):NULL;
}
}
}
uint32_t AQH_Endpoint_GetPermissions(const AQH_OBJECT *o)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return xo->permissions;
}
return 0;
}
void AQH_Endpoint_SetPermissions(AQH_OBJECT *o, uint32_t i)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
xo->permissions=i;
}
}
void AQH_Endpoint_AddPermissions(AQH_OBJECT *o, uint32_t i)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
xo->permissions|=i;
}
}
void AQH_Endpoint_SubPermissions(AQH_OBJECT *o, uint32_t i)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
xo->permissions&=~i;
}
}
uint32_t AQH_Endpoint_GetFlags(const AQH_OBJECT *o)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return xo->flags;
}
return 0;
}
void AQH_Endpoint_SetFlags(AQH_OBJECT *o, uint32_t i)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
xo->flags=i;
}
}
void AQH_Endpoint_AddFlags(AQH_OBJECT *o, uint32_t i)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
xo->flags|=i;
}
}
void AQH_Endpoint_SubFlags(AQH_OBJECT *o, uint32_t i)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
xo->flags&=~i;
}
}
int AQH_Endpoint_GetState(const AQH_OBJECT *o)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return xo->state;
}
return 0;
}
void AQH_Endpoint_SetState(AQH_OBJECT *o, int i)
{
if (o) {
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
xo->state=i;
}
}
AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgOutList(const AQH_OBJECT *o)
{
if (o) {
@@ -144,6 +343,7 @@ void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg)
if (xo) {
AQH_Message_List_Add(msg, xo->msgOutList);
if (xo->msgWriter && AQH_Message_List_GetCount(xo->msgOutList)==1) {
DBG_INFO(AQH_LOGDOMAIN, "Enabling msgWriter, sending message");
AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg));
AQH_Object_Enable(xo->msgWriter);
}
@@ -173,8 +373,15 @@ AQH_MESSAGE *AQH_Endpoint_GetNextMsgIn(AQH_OBJECT *o)
AQH_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo)
return AQH_Message_List_First(xo->msgInList);
if (xo) {
AQH_MESSAGE *msg;
msg=AQH_Message_List_First(xo->msgInList);
if (msg) {
AQH_Message_List_Del(msg);
return msg;
}
}
}
return NULL;
}
@@ -189,7 +396,7 @@ void AQH_Endpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg)
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo) {
AQH_Message_List_Add(msg, xo->msgInList);
DBG_ERROR(AQH_LOGDOMAIN, "now %d msgs in list", AQH_Message_List_GetCount(xo->msgInList));
DBG_INFO(AQH_LOGDOMAIN, "now %d msgs in list", AQH_Message_List_GetCount(xo->msgInList));
}
}
}
@@ -228,7 +435,8 @@ int _handleSignal(AQH_OBJECT *o, uint32_t slotId, GWEN_UNUSED AQH_OBJECT *sender
{
switch(slotId) {
case AQH_ENDPOINT_SLOT_MSG_RECVD: return _handleMsgRecvd(o, param1, param2);
case AQH_ENDPOINT_SLOT_MSG_SENT: return _handleMsgSent(o, param1, param2);
case AQH_ENDPOINT_SLOT_MSG_SENT: return _handleMsgSent(o);
case AQH_ENDPOINT_SLOT_CLOSED: return _handleClosed(o);
default:
break;
}
@@ -242,9 +450,9 @@ int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr)
{
AQH_MESSAGE *msg;
DBG_ERROR(AQH_LOGDOMAIN, "Msg received:");
DBG_INFO(AQH_LOGDOMAIN, "Msg received:");
/*GWEN_Text_LogString((const char*) msgPtr, msgLen, AQH_LOGDOMAIN, GWEN_LoggerLevel_Error);*/
msg=AQH_NodeMessage_fromBuffer(msgPtr, msgLen);
_dumpMsg(msg, "Received");
AQH_Endpoint_AddMsgIn(o, msg);
return 1;
@@ -252,11 +460,9 @@ int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr)
int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr)
int _handleMsgSent(AQH_OBJECT *o)
{
DBG_ERROR(AQH_LOGDOMAIN, "Msg sent:");
GWEN_Text_LogString((const char*) msgPtr, msgLen, AQH_LOGDOMAIN, GWEN_LoggerLevel_Error);
DBG_INFO(AQH_LOGDOMAIN, "Msg sent");
if (o) {
AQH_ENDPOINT *xo;
@@ -264,6 +470,7 @@ int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr)
if (xo) {
AQH_MESSAGE *msg;
DBG_INFO(AQH_LOGDOMAIN, "Messages in outlist: %d", AQH_Message_List_GetCount(xo->msgOutList));
msg=AQH_Message_List_First(xo->msgOutList);
if (msg) {
/* remove sent message from list */
@@ -273,33 +480,38 @@ int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr)
/* get next message in list */
msg=AQH_Message_List_First(xo->msgOutList);
if (msg) {
DBG_ERROR(AQH_LOGDOMAIN, "Sending next message");
DBG_INFO(AQH_LOGDOMAIN, "Sending next message");
AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg));
}
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "Last message sent, disabling writer");
AQH_Object_Disable(xo->msgWriter);
else {
DBG_INFO(AQH_LOGDOMAIN, "Last message sent, disabling writer");
AQH_Object_Disable(xo->msgWriter);
}
}
}
}
return 1;
}
void _dumpMsg(const AQH_MESSAGE *msg, const char *sText)
int _handleClosed(AQH_OBJECT *o)
{
if (msg) {
GWEN_BUFFER *mbuf;
AQH_ENDPOINT *xo;
mbuf=GWEN_Buffer_new(0, 256, 0, 1);
AQH_NodeMessage_DumpSpecificToBuffer(msg, mbuf, sText);
DBG_ERROR(AQH_LOGDOMAIN, "%s", GWEN_Buffer_GetStart(mbuf));
GWEN_Buffer_free(mbuf);
DBG_ERROR(AQH_LOGDOMAIN, "Connection closed.");
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo) {
AQH_Object_Disable(xo->msgWriter);
AQH_Object_Disable(xo->msgReader);
if (0==AQH_Object_EmitSignal(o, AQH_ENDPOINT_SIGNAL_CLOSED, 0, NULL)) {
DBG_ERROR(AQH_LOGDOMAIN, "Signal CLOSED not handled");
}
return 1;
}
return 0;
}

View File

@@ -14,6 +14,12 @@
enum {
AQH_ENDPOINT_SIGNAL_CLOSED=AQH_OBJECT_SIGNAL_LAST,
};
/**
* Constructor.
*
@@ -26,6 +32,25 @@
*/
AQHOME_API AQH_OBJECT *AQH_Endpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter);
AQHOME_API const char *AQH_Endpoint_GetServiceName(const AQH_OBJECT *o);
AQHOME_API void AQH_Endpoint_SetServiceName(AQH_OBJECT *o, const char *s);
AQHOME_API const char *AQH_Endpoint_GetUserName(const AQH_OBJECT *o);
AQHOME_API void AQH_Endpoint_SetUserName(AQH_OBJECT *o, const char *s);
AQHOME_API uint32_t AQH_Endpoint_GetPermissions(const AQH_OBJECT *o);
AQHOME_API void AQH_Endpoint_SetPermissions(AQH_OBJECT *o, uint32_t i);
AQHOME_API void AQH_Endpoint_AddPermissions(AQH_OBJECT *o, uint32_t i);
AQHOME_API void AQH_Endpoint_SubPermissions(AQH_OBJECT *o, uint32_t i);
AQHOME_API uint32_t AQH_Endpoint_GetFlags(const AQH_OBJECT *o);
AQHOME_API void AQH_Endpoint_SetFlags(AQH_OBJECT *o, uint32_t i);
AQHOME_API void AQH_Endpoint_AddFlags(AQH_OBJECT *o, uint32_t i);
AQHOME_API void AQH_Endpoint_SubFlags(AQH_OBJECT *o, uint32_t i);
AQHOME_API int AQH_Endpoint_GetState(const AQH_OBJECT *o);
AQHOME_API void AQH_Endpoint_SetState(AQH_OBJECT *o, int i);
AQHOME_API AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgOutList(const AQH_OBJECT *o);
AQHOME_API AQH_MESSAGE *AQH_Endpoint_GetNextMsgOut(AQH_OBJECT *o);
AQHOME_API void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg);

View File

@@ -21,6 +21,11 @@ struct AQH_ENDPOINT {
AQH_OBJECT *msgReader;
uint32_t lastMsgId;
int state;
uint32_t permissions;
uint32_t flags;
char *serviceName;
char *userName;
};

53
aqhome/ipc2/ipc_client.c Normal file
View File

@@ -0,0 +1,53 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./ipc_client.h"
#include <aqhome/ipc2/ipcmsgreader.h>
#include <aqhome/ipc2/msgwriter.h>
#include <aqhome/events2/fdobject.h>
#include <gwenhywfar/debug.h>
#include <unistd.h>
/* ------------------------------------------------------------------------------------------------
* code
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_IpcClientObject_new(AQH_EVENT_LOOP *eventLoop, int fd)
{
int fdCopy;
AQH_OBJECT *fdReader;
AQH_OBJECT *fdWriter;
AQH_OBJECT *msgReader;
AQH_OBJECT *msgWriter;
AQH_OBJECT *endpoint;
fdCopy=dup(fd);
fdReader=AQH_FdObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ);
msgReader=AQH_IpcMsgReader_new(eventLoop, fdReader);
AQH_Object_Enable(msgReader);
fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE);
msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter);
endpoint=AQH_Endpoint_new(eventLoop, msgReader, msgWriter);
return endpoint;
}

28
aqhome/ipc2/ipc_client.h Normal file
View File

@@ -0,0 +1,28 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_IPC_CLIENT_H
#define AQH_IPC_CLIENT_H
#include <aqhome/ipc2/endpoint.h>
/**
* Create an endpoint object (see @ref AQH_Endpoint_new) which works over a given socket.
*
* @param eventLoop pointer to eventLoop
* @param fd connected non-blocking socket to work with (see @ref AQH_TcpObject_CreateConnectedSocket).
*/
AQHOME_API AQH_OBJECT *AQH_IpcClientObject_new(AQH_EVENT_LOOP *eventLoop, int fd);
#endif

View File

@@ -101,7 +101,7 @@ int _handleSignal(AQH_OBJECT *o,
GWEN_UNUSED void *param2)
{
switch(slotId) {
case AQH_TCPD_OBJECT_SIGNAL_NEWCONN: return _handleNewConn(o, param1);
case AQH_IPCD_OBJECT_SLOT_NEWCONN: return _handleNewConn(o, param1);
default:
break;
}
@@ -127,6 +127,7 @@ int _handleNewConn(AQH_OBJECT *o, int newFd)
fdReader=AQH_FdObject_new(eventLoop, newFd, AQH_FDOBJECT_FDMODE_READ);
msgReader=AQH_IpcMsgReader_new(eventLoop, fdReader);
AQH_Object_Enable(msgReader);
fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE);
msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter);

View File

@@ -17,6 +17,7 @@
#include <gwenhywfar/inherit.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endianfns.h>
#include <gwenhywfar/text.h>
#include <sys/socket.h>
@@ -63,6 +64,7 @@ int _readMsg(AQH_OBJECT *o)
int rv;
if (xo->bytesReceived<AQH_MSG_READER_HEADER_SIZE) {
DBG_INFO(AQH_LOGDOMAIN, "Reading header");
rv=_readHeaderFromRingbuffer(xo);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
@@ -71,6 +73,7 @@ int _readMsg(AQH_OBJECT *o)
}
if (xo->bytesReceived>=AQH_MSG_READER_HEADER_SIZE) {
DBG_INFO(AQH_LOGDOMAIN, "Reading body");
/* reading remainder of msg directly into allocated buffer */
rv=AQH_MsgReader_ReadRemainderFromRingbuffer(o);
if (rv<0) {
@@ -90,7 +93,7 @@ int _readMsg(AQH_OBJECT *o)
rv=AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_MSGRECVD, msgLen, (void*) msgPtr);
if (rv==0) {
DBG_INFO(AQH_LOGDOMAIN, "Received message ignored");
DBG_ERROR(AQH_LOGDOMAIN, "Received message ignored");
}
free(msgPtr);
return 1;
@@ -136,9 +139,9 @@ int _readHeaderFromRingbuffer(AQH_MSG_READER *xo)
DBG_ERROR(AQH_LOGDOMAIN, "Bad message size(%lu)", (unsigned long int) msgLen);
return GWEN_ERROR_GENERIC;
}
xo->currentMsgBuf=(uint8_t*) malloc(msgLen+4); /* +4 because of msg len (4 bytes) */
xo->currentMsgBuf=(uint8_t*) malloc(msgLen);
memmove(xo->currentMsgBuf, xo->headerBuffer, xo->bytesReceived);
xo->bytesLeft=(msgLen+4)-xo->bytesReceived;
xo->bytesLeft=msgLen-xo->bytesReceived;
}
return 0;

View File

@@ -201,6 +201,7 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject)
{
AQH_MSG_READER *xo;
DBG_INFO(AQH_LOGDOMAIN, "Socket ready");
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo) {
int rv;
@@ -401,7 +402,7 @@ void _resetBuffers(AQH_MSG_READER *xo)
void _cbEnable(AQH_OBJECT *o)
{
if (o && !(AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) {
if (o) {
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
@@ -414,7 +415,7 @@ void _cbEnable(AQH_OBJECT *o)
void _cbDisable(AQH_OBJECT *o)
{
if (o && (AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) {
if (o) {
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);

View File

@@ -129,11 +129,12 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject)
{
AQH_MSG_WRITER *xo;
DBG_INFO(AQH_LOGDOMAIN, "Socket ready");
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o);
if (xo) {
if (xo->bytesLeft) {
int rv;
int rv;
if (xo->bytesLeft) {
if (!(xo->flags & AQH_MSGWRITER_FLAGS_MSGSTARTED)) {
rv=_startMsg(xo, fdObject);
if (rv<0) {
@@ -161,12 +162,17 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject)
}
else {
if (xo->bytesLeft==0) {
int msgLen;
const uint8_t *msgPtr;
_endMsg(xo, fdObject);
rv=AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_MSGSENT, xo->msgBufLen, (void*) (xo->msgBufPtr));
if (rv==0) {
DBG_INFO(AQH_LOGDOMAIN, "Sent message ignored");
}
msgPtr=xo->msgBufPtr;
msgLen=xo->msgBufLen;
_resetBuffer(o);
rv=AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_MSGSENT, msgLen, (void*) msgPtr);
if (rv==0) {
DBG_ERROR(AQH_LOGDOMAIN, "Sent message ignored");
}
}
}
}

178
aqhome/ipc2/tcp_object.c Normal file
View File

@@ -0,0 +1,178 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./tcp_object.h"
#include <gwenhywfar/debug.h>
#include <unistd.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <errno.h>
#include <string.h>
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static int _socketSetBlocking(int sk, int fl);
static int _translateHError(int herr);
static int _setHostAddr(struct in_addr *inetAddr, const char *sAddr);
static int _setHostName(struct in_addr *inetAddr, const char *sAddr);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
int AQH_TcpObject_CreateConnectedSocket(const char *addr, int port)
{
int sk;
struct sockaddr_in inetAddr;
int rv;
memset(&inetAddr, 0, sizeof(inetAddr));
inetAddr.sin_family=AF_INET;
rv=_setHostAddr(&inetAddr.sin_addr, addr); /* try tuple */
if (rv<0) {
rv=_setHostName(&inetAddr.sin_addr, addr); /* lookup name */
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
}
inetAddr.sin_port=htons(port);
sk=socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sk<0) {
DBG_INFO(AQH_LOGDOMAIN, "socket(): %s", strerror(errno));
return GWEN_ERROR_IO;
}
rv=connect(sk, (struct sockaddr*) &inetAddr, sizeof(inetAddr));
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
close(sk);
return rv;
}
rv=_socketSetBlocking(sk, 0);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
close(sk);
return rv;
}
return sk;
}
int _setHostAddr(struct in_addr *inetAddr, const char *sAddr)
{
inetAddr->s_addr=0;
if (!inet_aton(sAddr, inetAddr)) {
DBG_ERROR(AQH_LOGDOMAIN, "Invalid address \"%s\"", sAddr);
return GWEN_ERROR_INVALID;
}
return 0;
}
int _setHostName(struct in_addr *inetAddr, const char *sAddr)
{
struct hostent *he;
he=gethostbyname(sAddr);
if (!he) {
DBG_ERROR(AQH_LOGDOMAIN, "gethostbyname(\"%s\"): %s", sAddr, hstrerror(h_errno));
return _translateHError(h_errno);
}
/* name resolved, store address */
memcpy(inetAddr, he->h_addr_list[0], sizeof(struct in_addr));
return 0;
}
int _socketSetBlocking(int sk, int fl)
{
int prevFlags;
int newFlags;
/* get current socket flags */
prevFlags=fcntl(sk, F_GETFL);
if (prevFlags==-1) {
DBG_INFO(AQH_LOGDOMAIN, "fcntl(): %s", strerror(errno));
return GWEN_ERROR_IO;
}
/* set nonblocking/blocking */
if (fl)
newFlags=prevFlags&(~O_NONBLOCK);
else
newFlags=prevFlags|O_NONBLOCK;
if (-1==fcntl(sk, F_SETFL, newFlags)) {
DBG_INFO(AQH_LOGDOMAIN, "fcntl(): %s", strerror(errno));
return GWEN_ERROR_IO;
}
prevFlags=fcntl(sk, F_GETFL);
if (prevFlags!=newFlags) {
DBG_ERROR(AQH_LOGDOMAIN, "fcntl() did not set flags correctly (%08x!=%08x)", prevFlags, newFlags);
return GWEN_ERROR_IO;
}
return 0;
}
int _translateHError(int herr)
{
int rv;
switch (herr) {
case HOST_NOT_FOUND:
rv=GWEN_ERROR_HOST_NOT_FOUND;
break;
#ifdef NO_ADDRESS
case NO_ADDRESS:
rv=GWEN_ERROR_NO_ADDRESS;
break;
#endif
case NO_RECOVERY:
rv=GWEN_ERROR_NO_RECOVERY;
break;
case TRY_AGAIN:
rv=GWEN_ERROR_TRY_AGAIN;
break;
default:
rv=GWEN_ERROR_UNKNOWN_DNS_ERROR;
break;
} /* switch */
return rv;
}

24
aqhome/ipc2/tcp_object.h Normal file
View File

@@ -0,0 +1,24 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_TCP_OBJECT_H
#define AQH_TCP_OBJECT_H
#include <aqhome/events2/object.h>
/**
* Helper function to create and connect a TCP socket.
*/
AQHOME_API int AQH_TcpObject_CreateConnectedSocket(const char *addr, int port);
#endif

View File

@@ -47,6 +47,8 @@ static void GWENHYWFAR_CB _freeData(void *bp, void *p);
static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2);
static int _handleSocketReady(AQH_OBJECT *o);
static void _cbEnable(AQH_OBJECT *o);
static void _cbDisable(AQH_OBJECT *o);
static int _socketSetReuseAddress(int sk, int fl);
static int _socketSetBlocking(int sk, int fl);
@@ -71,16 +73,12 @@ AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, int fd)
GWEN_NEW_OBJECT(AQH_TCPD_OBJECT, xo);
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o, xo, _freeData);
AQH_Object_SetSignalHandlerFn(o, _handleSignal);
AQH_Object_SetEnableFn(o, _cbEnable);
AQH_Object_SetDisableFn(o, _cbDisable);
xo->fdSocket=fd;
xo->fdObject=AQH_FdObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ);
#if 0
/* create object for readable socket, connect to THIS, enable */
xo->fdObject=AQH_FdObject_new(AQH_Object_GetEventLoop(o), rv, AQH_FDOBJECT_FDMODE_READ);
AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_TCPD_OBJECT_SLOT_SOCKETREADY, o);
AQH_Object_Enable(xo->fdObject);
#endif
return o;
}
@@ -178,17 +176,19 @@ int _handleSocketReady(AQH_OBJECT *o)
{
AQH_TCPD_OBJECT *xo;
DBG_ERROR(NULL, "Socket ready");
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o);
if (xo) {
int clientSk;
clientSk=_acceptConnection(xo->fdSocket);
if (clientSk<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", clientSk);
DBG_ERROR(AQH_LOGDOMAIN, "here (%d)", clientSk);
}
else {
DBG_ERROR(AQH_LOGDOMAIN, "New connection");
if (0==AQH_Object_EmitSignal(o, AQH_TCPD_OBJECT_SIGNAL_NEWCONN, clientSk, NULL)) {
DBG_INFO(AQH_LOGDOMAIN, "New connection not handled");
DBG_ERROR(AQH_LOGDOMAIN, "New connection not handled");
close(clientSk);
}
}
@@ -199,6 +199,32 @@ int _handleSocketReady(AQH_OBJECT *o)
void _cbEnable(AQH_OBJECT *o)
{
if (o) {
AQH_TCPD_OBJECT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o);
if (xo && xo->fdObject)
AQH_Object_Enable(xo->fdObject);
}
}
void _cbDisable(AQH_OBJECT *o)
{
if (o) {
AQH_TCPD_OBJECT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o);
if (xo && xo->fdObject)
AQH_Object_Disable(xo->fdObject);
}
}
int _setHostAddr(struct in_addr *inetAddr, const char *sAddr)
{
inetAddr->s_addr=0;

View File

@@ -509,7 +509,7 @@ int testTty(int argc, char **argv)
AQH_Object_Enable(msgReaderObject);
for (;;) {
AQH_EventLoop_Run(eventLoop);
AQH_EventLoop_Run(eventLoop, 500);
}
return 0;
@@ -547,7 +547,7 @@ int testTty2(int argc, char **argv)
endpointObject=AQH_TtyEndpoint2_new(eventLoop, fd);
for (;;) {
AQH_EventLoop_Run(eventLoop);
AQH_EventLoop_Run(eventLoop, 500);
}
return 0;

View File

@@ -59,7 +59,7 @@ AQH_MESSAGE *AQH_IpcdMessageValues_new(uint16_t code, uint32_t msgId, uint32_t r
AQH_VALUE_LIST *AQH_IpcdMessageDevices_ReadValueList(const GWEN_TAG16_LIST *tagList)
AQH_VALUE_LIST *AQH_IpcdMessageValues_ReadValueList(const GWEN_TAG16_LIST *tagList)
{
if (tagList) {
AQH_VALUE_LIST *valueList;

View File

@@ -29,7 +29,7 @@
AQHOME_API AQH_MESSAGE *AQH_IpcdMessageValues_new(uint16_t code, uint32_t msgId, uint32_t refMsgId,
uint32_t flags, const AQH_VALUE_LIST *valueList);
AQHOME_API AQH_VALUE_LIST *AQH_IpcdMessageDevices_ReadValueList(const GWEN_TAG16_LIST *tagList);
AQHOME_API AQH_VALUE_LIST *AQH_IpcdMessageValues_ReadValueList(const GWEN_TAG16_LIST *tagList);
AQHOME_API AQH_VALUE *AQH_IpcdMessageValues_ReadFirstValue(const GWEN_TAG16_LIST *tagList);
AQHOME_API uint32_t AQH_IpcdMessageValues_GetFlags(const GWEN_TAG16_LIST *tagList);

View File

@@ -143,28 +143,3 @@ void AQH_IpcMessage_DumpToBuffer(const AQH_MESSAGE *msg, GWEN_BUFFER *dbuf, cons
const char *AQH_IpcMessage_MsgTypeToChar(uint16_t i)
{
switch(i) {
case AQH_MSGTYPE_IPC_DATA_RESULT: return "Result";
case AQH_MSGTYPE_IPC_DATA_CONNECT_REQ: return "Connect(Req)";
case AQH_MSGTYPE_IPC_DATA_UPDATEDATA: return "UpdateData";
case AQH_MSGTYPE_IPC_DATA_DATACHANGED: return "DataChanged";
case AQH_MSGTYPE_IPC_DATA_SETDATA: return "SetData";
case AQH_MSGTYPE_IPC_DATA_ADDVALUE: return "AddValue";
case AQH_MSGTYPE_IPC_DATA_GETDATA_REQ: return "GetData(Req)";
case AQH_MSGTYPE_IPC_DATA_GETDATA_RSP: return "GetData(Rsp)";
case AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ: return "GetLastData(Req)";
case AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP: return "GetLastData(Rsp)";
case AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ: return "GetValues(Req)";
case AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP: return "GetValues(Rsp)";
case AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ: return "GetDevices(Req)";
case AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP: return "GetDevices(Rsp)";
case AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ: return "ModDevice(Req)";
case AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE: return "AnnounceValue";
default: return "(unknown)";
}
}

View File

@@ -26,38 +26,6 @@
#define AQH_IPC_PROTOCOL_DATA_ID 2
#define AQH_IPC_PROTOCOL_DATA_VERSION 1
#define AQH_MSGTYPE_IPC_DATA_RESULT 0x0001 /* AQH_ResultIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_CONNECT_REQ 0x0010 /* serviceName, userName, password */
#define AQH_MSGTYPE_IPC_DATA_UPDATEDATA 0x0100 /* AQH_MultiDataDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_DATACHANGED 0x0200 /* AQH_MultiDataDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_SETDATA 0x0300 /* AQH_SetDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_ADDVALUE 0x0400 /* AQH_AddValueDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_GETDATA_REQ 0x0500 /* AQH_GetDataDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_GETDATA_RSP 0x0600 /* AQH_MultiDataDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ 0x0700 /* AQH_GetDataDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP 0x0800 /* AQH_MultiDataDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ 0x0900 /* GWEN_IpcMsg */
#define AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP 0x0a00 /* AQH_ValuesDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ 0x0b00 /* GWEN_IpcMsg */
#define AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP 0x0c00 /* AQH_DevicesDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ 0x0d00 /* AQH_DevicesDataIpcMsg */
#define AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE 0x0e00 /* AQH_ValuesDataIpcMsg */
AQHOME_API AQH_MESSAGE *AQH_IpcMessage_new(uint8_t protoId, uint8_t protoVer, uint16_t code,
@@ -80,9 +48,6 @@ AQHOME_API uint8_t *AQH_IpcMessage_GetPayloadPointer(const AQH_MESSAGE *msg);
AQHOME_API void AQH_IpcMessage_DumpToBuffer(const AQH_MESSAGE *msg, GWEN_BUFFER *dbuf, const char *sText);
AQHOME_API const char *AQH_IpcMessage_MsgTypeToChar(uint16_t i);

View File

@@ -53,6 +53,13 @@ AQH_MESSAGE *AQH_IpcMessageResult_new(uint8_t protoId, uint8_t protoVer, uint16_
uint32_t AQH_IpcMessageResult_GetResult(const GWEN_TAG16_LIST *tagList)
{
return tagList?AQH_Tag16_GetTagDataAsUint32(tagList, AQH_MSGDATA_RESULT_TAGS_RESULT, 0):0;
}
void AQH_IpcMessageResult_DumpToBuffer(const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList, GWEN_BUFFER *dbuf, const char *sText)
{
int result=0;
@@ -64,8 +71,7 @@ void AQH_IpcMessageResult_DumpToBuffer(const AQH_MESSAGE *msg, const GWEN_TAG16_
}
GWEN_Buffer_AppendArgs(dbuf,
"RESULT(%s) %s (code=%d, proto=%d, proto version=%d, result=%d, text=\"%s\")\n",
AQH_IpcMessage_MsgTypeToChar(AQH_IpcMessage_GetCode(msg)),
"RESULT %s (code=%d, proto=%d, proto version=%d, result=%d, text=\"%s\")\n",
sText?sText:"",
AQH_IpcMessage_GetCode(msg),
AQH_IpcMessage_GetProtoId(msg),

View File

@@ -17,6 +17,18 @@
#include <gwenhywfar/tag16.h>
#define AQH_MSGDATA_RESULT_SUCCESS 0
#define AQH_MSGDATA_RESULT_ERROR_GENERIC 1
#define AQH_MSGDATA_RESULT_ERROR_INVALID 2
#define AQH_MSGDATA_RESULT_ERROR_EXISTS 3
#define AQH_MSGDATA_RESULT_ERROR_NODATA 4
#define AQH_MSGDATA_RESULT_ERROR_BADDATA 5
#define AQH_MSGDATA_RESULT_ERROR_PERMS 6
#define AQH_MSGDATA_RESULT_ERROR_NOTFOUND 7
#define AQH_MSGDATA_RESULT_ERROR_IO 8
#define AQH_MSGDATA_RESULT_ERROR_TRYAGAIN 9
#define AQH_MSGDATA_RESULT_TAGS_RESULT 0x0001
#define AQH_MSGDATA_RESULT_TAGS_TEXT 0x0002
@@ -26,6 +38,9 @@
AQHOME_API AQH_MESSAGE *AQH_IpcMessageResult_new(uint8_t protoId, uint8_t protoVer, uint16_t code,
uint32_t msgId, uint32_t refMsgId,
int result, const char *text);
AQHOME_API uint32_t AQH_IpcMessageResult_GetResult(const GWEN_TAG16_LIST *tagList);
AQHOME_API void AQH_IpcMessageResult_DumpToBuffer(const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList,
GWEN_BUFFER *dbuf, const char *sText);