aqhome: started rewriting message code, start using new event2 lib.

This commit is contained in:
Martin Preuss
2025-02-25 01:13:07 +01:00
parent f1f24168e5
commit cf8edbbd5f
58 changed files with 5393 additions and 163 deletions

View File

@@ -46,13 +46,22 @@
<headers dist="true" install="$(pkgincludedir)/ipc" >
msgreader.h
msgwriter.h
ipcmsgreader.h
nodemsgreader.h
ttyobject.h
tcpd_object.h
nodeendpoint.h
message.h
</headers>
<headers dist="true" >
msgreader_p.h
msgwriter_p.h
tcpd_object_p.h
nodeendpoint_p.h
message_p.h
</headers>
@@ -60,7 +69,13 @@
$(local/typefiles)
msgreader.c
msgwriter.c
ipcmsgreader.c
nodemsgreader.c
ttyobject.c
tcpd_object.c
nodeendpoint.c
message.c
</sources>

149
aqhome/ipc2/ipcmsgreader.c Normal file
View File

@@ -0,0 +1,149 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./ipcmsgreader.h"
#include "./msgreader_p.h"
#include <aqhome/events2/fdobject.h>
#include <gwenhywfar/inherit.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endianfns.h>
#include <sys/socket.h>
#define AQH_MSG_READER_HEADER_SIZE 4
#define AQH_MSG_READER_MINMSGSIZE 12
#define AQH_MSG_READER_MAXMSGSIZE 10240
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static int _readMsg(AQH_OBJECT *o);
static int _readHeaderFromRingbuffer(AQH_MSG_READER *xo);
/* ------------------------------------------------------------------------------------------------
* implementation
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_IpcMsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject)
{
AQH_OBJECT *o;
o=AQH_MsgReader_new(eventLoop, fdObject);
AQH_MsgReader_SetReadMsgFn(o, _readMsg);
return o;
}
int _readMsg(AQH_OBJECT *o)
{
AQH_MSG_READER *xo;
xo=AQH_MsgReader_GetData(o);
if (xo) {
int rv;
if (xo->bytesReceived<AQH_MSG_READER_HEADER_SIZE) {
rv=_readHeaderFromRingbuffer(xo);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
}
if (xo->bytesReceived>=AQH_MSG_READER_HEADER_SIZE) {
/* reading remainder of msg directly into allocated buffer */
rv=AQH_MsgReader_ReadRemainderFromRingbuffer(o);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
else if (rv==1) {
int msgLen;
uint8_t *msgPtr;
msgLen=xo->bytesReceived;
msgPtr=xo->currentMsgBuf;
xo->bytesReceived=0;
xo->bytesLeft=0;
xo->currentMsgBuf=NULL;
rv=AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_MSGRECVD, msgLen, (void*) msgPtr);
if (rv==0) {
DBG_INFO(AQH_LOGDOMAIN, "Received message ignored");
}
free(msgPtr);
return 1;
}
}
return 0;
}
return GWEN_ERROR_GENERIC;
}
int _readHeaderFromRingbuffer(AQH_MSG_READER *xo)
{
uint32_t remaining;
int rv;
uint32_t xferSize;
uint32_t bytesInBuffer;
bytesInBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer);
/* still reading header */
remaining=AQH_MSG_READER_HEADER_SIZE-xo->bytesReceived;
if (bytesInBuffer<remaining)
remaining=bytesInBuffer;
xferSize=remaining;
rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->headerBuffer+xo->bytesReceived), &xferSize);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer empty");
return 0;
}
if (xferSize<remaining) {
DBG_ERROR(AQH_LOGDOMAIN, "Read fewer bytes than available?");
return GWEN_ERROR_GENERIC;
}
xo->bytesReceived+=xferSize;
if (xo->bytesReceived==AQH_MSG_READER_HEADER_SIZE) {
uint32_t msgLen;
/* full size received, parse msg size, allocate buffer */
msgLen=GWEN_ENDIAN_LE32TOH(*((const uint32_t*)(xo->headerBuffer)));
if (msgLen<AQH_MSG_READER_MINMSGSIZE || msgLen>AQH_MSG_READER_MAXMSGSIZE) {
DBG_ERROR(AQH_LOGDOMAIN, "Bad message size(%lu)", (unsigned long int) msgLen);
return GWEN_ERROR_GENERIC;
}
xo->currentMsgBuf=(uint8_t*) malloc(msgLen+4); /* +4 because of msg len (4 bytes) */
memmove(xo->currentMsgBuf, xo->headerBuffer, xo->bytesReceived);
xo->bytesLeft=(msgLen+4)-xo->bytesReceived;
}
return 0;
}

View File

@@ -0,0 +1,20 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_IPCMSGREADER_H
#define AQH_IPCMSGREADER_H
#include <aqhome/events2/object.h>
AQH_OBJECT *AQH_IpcMsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject);
#endif

355
aqhome/ipc2/message.c Normal file
View File

@@ -0,0 +1,355 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./message_p.h"
#include <gwenhywfar/misc.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endianfns.h>
GWEN_LIST_FUNCTIONS(AQH_MESSAGE, AQH_Message)
GWEN_INHERIT_FUNCTIONS(AQH_MESSAGE)
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static void _ensureSize(AQH_MESSAGE *msg, uint32_t neededSize);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
AQH_MESSAGE *AQH_Message_new(void)
{
AQH_MESSAGE *msg;
GWEN_NEW_OBJECT(AQH_MESSAGE, msg);
GWEN_INHERIT_INIT(AQH_MESSAGE, msg);
GWEN_LIST_INIT(AQH_MESSAGE, msg);
msg->refCount=1;
return msg;
}
void AQH_Message_IncRef(AQH_MESSAGE *msg)
{
if (msg && msg->refCount)
msg->refCount++;
}
void AQH_Message_free(AQH_MESSAGE *msg)
{
if (msg && msg->refCount>0) {
if (msg->refCount==1) {
msg->refCount=0;
GWEN_LIST_FINI(AQH_MESSAGE, msg);
GWEN_INHERIT_FINI(AQH_MESSAGE, msg);
free(msg->msgPointer);
GWEN_FREE_OBJECT(msg);
}
else
msg->refCount--;
}
}
uint8_t *AQH_Message_GetMsgPointer(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgPointer:NULL;
}
uint32_t AQH_Message_GetMsgSize(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgSize:0;
}
void AQH_Message_SetData(AQH_MESSAGE *msg, const uint8_t *msgPtr, uint32_t msgSize)
{
if (msg && msg->refCount) {
free(msg->msgPointer);
msg->msgPointer=NULL;
msg->msgSize=0;
if (msgSize) {
msg->msgPointer=(uint8_t*) malloc(msgSize);
if (msg->msgPointer) {
if (msgPtr)
memmove(msg->msgPointer, msgPtr, msgSize);
else
memset(msg->msgPointer, 0, msgSize);
msg->msgSize=msgSize;
}
}
}
}
uint32_t AQH_Message_GetUsedSize(const AQH_MESSAGE *msg)
{
return msg?msg->usedSize:0;
}
void AQH_Message_SetUsedSize(AQH_MESSAGE *msg, uint32_t i)
{
if (msg) {
_ensureSize(msg, i);
msg->usedSize=i;
}
}
void AQH_Message_IncUsedSize(AQH_MESSAGE *msg, uint32_t i)
{
if (msg) {
_ensureSize(msg, msg->usedSize+i);
msg->usedSize+=i;
}
}
int AQH_Message_GetMsgType(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgType:0;
}
void AQH_Message_SetMsgType(AQH_MESSAGE *msg, int i)
{
if (msg && msg->refCount)
msg->msgType=i;
}
int AQH_Message_GetMsgProtoId(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgProtoId:0;
}
void AQH_Message_SetMsgProtoId(AQH_MESSAGE *msg, int i)
{
if (msg && msg->refCount)
msg->msgProtoId=i;
}
int AQH_Message_GetMsgProtoVer(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgProtoVer:0;
}
void AQH_Message_SetMsgProtoVer(AQH_MESSAGE *msg, int i)
{
if (msg && msg->refCount)
msg->msgProtoVer=i;
}
int AQH_Message_GetMsgCommand(const AQH_MESSAGE *msg)
{
return (msg && msg->refCount)?msg->msgCommand:0;
}
void AQH_Message_SetMsgCommand(AQH_MESSAGE *msg, int i)
{
if (msg && msg->refCount)
msg->msgCommand=i;
}
void AQH_Message_WriteUint8At(AQH_MESSAGE *msg, uint32_t pos, uint8_t d)
{
if (msg) {
_ensureSize(msg, pos+1);
msg->msgPointer[pos]=d;
}
}
uint8_t AQH_Message_ReadUint8At(const AQH_MESSAGE *msg, uint32_t pos, uint8_t defaultValue)
{
if (msg && msg->msgPointer && msg->msgSize>=pos+1)
return msg->msgPointer[pos];
return defaultValue;
}
void AQH_Message_WriteUint16At(AQH_MESSAGE *msg, uint32_t pos, uint16_t d)
{
if (msg) {
_ensureSize(msg, pos+2);
*( (uint16_t*) (msg->msgPointer+pos) )=GWEN_ENDIAN_HTOLE16(d);
}
}
uint16_t AQH_Message_ReadUint16At(const AQH_MESSAGE *msg, uint32_t pos, uint16_t defaultValue)
{
if (msg && msg->msgPointer && msg->msgSize>=pos+2)
return GWEN_ENDIAN_LE16TOH(*( (uint16_t*) (msg->msgPointer+pos) ));
return defaultValue;
}
void AQH_Message_WriteUint32At(AQH_MESSAGE *msg, uint32_t pos, uint32_t d)
{
if (msg) {
_ensureSize(msg, pos+4);
*( (uint32_t*) (msg->msgPointer+pos) )=GWEN_ENDIAN_HTOLE32(d);
}
}
uint32_t AQH_Message_ReadUint32At(const AQH_MESSAGE *msg, uint32_t pos, uint32_t defaultValue)
{
if (msg && msg->msgPointer && msg->msgSize>=pos+4)
return GWEN_ENDIAN_LE32TOH(*( (uint32_t*) (msg->msgPointer+pos) ));
return defaultValue;
}
void AQH_Message_WriteUint64At(AQH_MESSAGE *msg, uint32_t pos, uint64_t d)
{
if (msg) {
_ensureSize(msg, pos+8);
*( (uint64_t*) (msg->msgPointer+pos) )=GWEN_ENDIAN_HTOLE64(d);
}
}
uint64_t AQH_Message_ReadUint64At(const AQH_MESSAGE *msg, uint32_t pos, uint64_t defaultValue)
{
if (msg && msg->msgPointer && msg->msgSize>=pos+8)
return GWEN_ENDIAN_LE64TOH(*( (uint64_t*) (msg->msgPointer+pos) ));
return defaultValue;
}
void AQH_Message_WriteBytesAt(AQH_MESSAGE *msg, uint32_t pos, const uint8_t *bufferPtr, uint32_t bufferLen)
{
if (msg && bufferPtr && bufferLen) {
_ensureSize(msg, pos+bufferLen);
memmove(msg->msgPointer+pos, bufferPtr, bufferLen);
}
}
int AQH_Message_WriteStringAt(AQH_MESSAGE *msg, uint32_t pos, uint32_t maxSize, int filler, const char *s)
{
if (msg && maxSize) {
uint32_t slen=0;
if (s) {
slen=strlen(s);
if (slen>maxSize) {
DBG_ERROR(AQH_LOGDOMAIN, "String too long (%d > %d)", slen, maxSize);
return GWEN_ERROR_INVALID;
}
}
_ensureSize(msg, pos+maxSize);
if (s)
memmove(msg->msgPointer+pos, s, slen);
if (slen<maxSize)
memset(msg->msgPointer+pos+slen, filler, maxSize-slen);
}
return 0;
}
int AQH_Message_WriteStringWithTrailingNullAt(AQH_MESSAGE *msg, uint32_t pos, uint32_t maxSize, int filler, const char *s)
{
if (msg && maxSize) {
uint32_t slen=1;
if (s) {
slen=strlen(s)+1;
if (slen>maxSize) {
DBG_ERROR(AQH_LOGDOMAIN, "String too long (%d > %d)", slen, maxSize);
return GWEN_ERROR_INVALID;
}
}
_ensureSize(msg, pos+maxSize);
if (s)
memmove(msg->msgPointer+pos, s, slen);
if (slen<maxSize)
memset(msg->msgPointer+pos+slen, filler, maxSize-slen);
}
return 0;
}
void _ensureSize(AQH_MESSAGE *msg, uint32_t neededSize)
{
if (msg && neededSize) {
if (msg->msgPointer==0 || msg->msgSize==0) {
msg->msgPointer=(uint8_t*) malloc(neededSize);
assert(msg->msgPointer);
}
else {
if (neededSize>msg->msgSize) {
uint8_t *ptr;
ptr=realloc(msg->msgPointer, neededSize);
assert(ptr);
if (ptr) {
msg->msgPointer=ptr;
msg->msgSize=neededSize;
}
}
}
}
}

72
aqhome/ipc2/message.h Normal file
View File

@@ -0,0 +1,72 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_MESSAGE_H
#define AQH_MESSAGE_H
#include <aqhome/api.h>
#include <gwenhywfar/list.h>
#include <gwenhywfar/inherit.h>
typedef struct AQH_MESSAGE AQH_MESSAGE;
GWEN_LIST_FUNCTION_LIB_DEFS(AQH_MESSAGE, AQH_Message, AQHOME_API)
GWEN_INHERIT_FUNCTION_LIB_DEFS(AQH_MESSAGE, AQHOME_API)
AQHOME_API AQH_MESSAGE *AQH_Message_new(void);
AQHOME_API void AQH_Message_IncRef(AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_free(AQH_MESSAGE *msg);
/* unparsed data */
AQHOME_API uint8_t *AQH_Message_GetMsgPointer(const AQH_MESSAGE *msg);
AQHOME_API uint32_t AQH_Message_GetMsgSize(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetData(AQH_MESSAGE *msg, const uint8_t *msgPtr, uint32_t msgSize);
AQHOME_API uint32_t AQH_Message_GetUsedSize(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetUsedSize(AQH_MESSAGE *msg, uint32_t i);
AQHOME_API void AQH_Message_IncUsedSize(AQH_MESSAGE *msg, uint32_t i);
/* parsed header data */
AQHOME_API int AQH_Message_GetMsgType(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetMsgType(AQH_MESSAGE *msg, int i);
AQHOME_API int AQH_Message_GetMsgProtoId(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetMsgProtoId(AQH_MESSAGE *msg, int i);
AQHOME_API int AQH_Message_GetMsgProtoVer(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetMsgProtoVer(AQH_MESSAGE *msg, int i);
AQHOME_API int AQH_Message_GetMsgCommand(const AQH_MESSAGE *msg);
AQHOME_API void AQH_Message_SetMsgCommand(AQH_MESSAGE *msg, int i);
/* helper functions for parsing */
AQHOME_API void AQH_Message_WriteUint8At(AQH_MESSAGE *msg, uint32_t pos, uint8_t d);
AQHOME_API uint8_t AQH_Message_ReadUint8At(const AQH_MESSAGE *msg, uint32_t pos, uint8_t defaultValue);
AQHOME_API void AQH_Message_WriteUint16At(AQH_MESSAGE *msg, uint32_t pos, uint16_t d);
AQHOME_API uint16_t AQH_Message_ReadUint16At(const AQH_MESSAGE *msg, uint32_t pos, uint16_t defaultValue);
AQHOME_API void AQH_Message_WriteUint32At(AQH_MESSAGE *msg, uint32_t pos, uint32_t d);
AQHOME_API uint32_t AQH_Message_ReadUint32At(const AQH_MESSAGE *msg, uint32_t pos, uint32_t defaultValue);
AQHOME_API void AQH_Message_WriteUint64At(AQH_MESSAGE *msg, uint32_t pos, uint64_t d);
AQHOME_API uint64_t AQH_Message_ReadUint64At(const AQH_MESSAGE *msg, uint32_t pos, uint64_t defaultValue);
AQHOME_API void AQH_Message_WriteBytesAt(AQH_MESSAGE *msg, uint32_t pos, const uint8_t *bufferPtr, uint32_t bufferLen);
AQHOME_API int AQH_Message_WriteStringAt(AQH_MESSAGE *msg, uint32_t pos, uint32_t maxSize, int filler, const char *s);
AQHOME_API int AQH_Message_WriteStringWithTrailingNullAt(AQH_MESSAGE *msg, uint32_t pos, uint32_t maxSize, int filler, const char *s);
#endif

37
aqhome/ipc2/message_p.h Normal file
View File

@@ -0,0 +1,37 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_MESSAGE_P_H
#define AQH_MESSAGE_P_H
#include "./message.h"
struct AQH_MESSAGE {
GWEN_INHERIT_ELEMENT(AQH_MESSAGE)
GWEN_LIST_ELEMENT(AQH_MESSAGE)
int refCount;
/* unparsed data */
uint8_t *msgPointer;
uint32_t msgSize;
uint32_t usedSize;
/* parsed header data */
int msgType;
int msgProtoId;
int msgProtoVer;
int msgCommand;
};
#endif

View File

@@ -18,14 +18,12 @@
#include <gwenhywfar/endianfns.h>
#include <sys/socket.h>
#include <time.h>
#define AQH_MSG_READER_MINMSGSIZE 12
#define AQH_MSG_READER_MAXMSGSIZE 10240
#define AQH_MSGREADER_SKIPTIME_IN_MS 20
enum {
AQH_MSGREADER_SLOT_SOCKETREADY=1
};
#define AQH_MSGREADER_FLAGS_SKIP 0x0001
@@ -39,13 +37,15 @@ GWEN_INHERIT(AQH_OBJECT, AQH_MSG_READER)
*/
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2);
static int _handleSocketReady(AQH_OBJECT *o);
static int _fillRingbuffer(AQH_OBJECT *o);
static int _readMsgFromRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo);
static int _readHeaderFromRingbuffer(AQH_MSG_READER *xo);
static int _readRemainderFromRingbuffer(AQH_MSG_READER *xo);
static int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject);
static int _fillRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo, AQH_OBJECT *fdObject);
static void _handleSkipping(AQH_OBJECT *o, AQH_OBJECT *fdObject);
static int _isStillSkipTime(AQH_MSG_READER *xo);
static uint64_t _getTimeInMilliSeconds(void);
static void _resetBuffers(AQH_MSG_READER *xo);
static void _cbEnable(AQH_OBJECT *o);
static void _cbDisable(AQH_OBJECT *o);
@@ -54,7 +54,7 @@ static int _readRemainderFromRingbuffer(AQH_MSG_READER *xo);
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_MsgReader_new(AQH_EVENT_LOOP *eventLoop, int fd)
AQH_OBJECT *AQH_MsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject)
{
AQH_OBJECT *o;
AQH_MSG_READER *xo;
@@ -62,15 +62,16 @@ AQH_OBJECT *AQH_MsgReader_new(AQH_EVENT_LOOP *eventLoop, int fd)
o=AQH_Object_new(eventLoop);
GWEN_NEW_OBJECT(AQH_MSG_READER, xo);
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_MSG_READER, o, xo, _freeData);
xo->fdSocket=fd;
xo->ringBuffer=GWEN_RingBuffer_new(AQH_MSG_READER_RINGBUFFER_SIZE);
AQH_Object_SetSignalHandlerFn(o, _handleSignal);
AQH_Object_SetEnableFn(o, _cbEnable);
AQH_Object_SetDisableFn(o, _cbDisable);
/* create object for readable socket, connect to THIS, enable */
xo->fdObject=AQH_FdObject_new(AQH_Object_GetEventLoop(o), fd, AQH_FDOBJECT_FDMODE_READ);
AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_MSGREADER_SLOT_SOCKETREADY, o);
AQH_Object_Enable(xo->fdObject);
if (fdObject) {
xo->fdObject=fdObject;
AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_MSGREADER_SLOT_SOCKETREADY, o);
}
return o;
}
@@ -82,23 +83,77 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
AQH_MSG_READER *xo;
xo=(AQH_MSG_READER*) p;
if (xo->fdObject) {
AQH_Object_Disable(xo->fdObject);
AQH_Object_free(xo->fdObject);
}
free(xo->currentMsgBuf);
GWEN_FREE_OBJECT(xo);
}
uint32_t AQH_MsgReader_GetFlags(const AQH_OBJECT *o)
{
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
return xo?xo->flags:0;
}
void AQH_MsgReader_SetFlags(AQH_OBJECT *o, uint32_t f)
{
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo)
xo->flags=f;
}
void AQH_MsgReader_AddFlags(AQH_OBJECT *o, uint32_t f)
{
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo)
xo->flags|=f;
}
void AQH_MsgReader_SubFlags(AQH_OBJECT *o, uint32_t f)
{
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo)
xo->flags&=~f;
}
AQH_MSG_READER *AQH_MsgReader_GetData(const AQH_OBJECT *o)
{
if (o) {
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
return xo;
}
return NULL;
}
int _handleSignal(AQH_OBJECT *o,
uint32_t slotId,
GWEN_UNUSED AQH_OBJECT *senderObject,
AQH_OBJECT *senderObject,
GWEN_UNUSED int param1,
GWEN_UNUSED void *param2)
{
switch(slotId) {
case AQH_MSGREADER_SLOT_SOCKETREADY: return _handleSocketReady(o);
case AQH_MSGREADER_SLOT_SOCKETREADY: return _handleSocketReady(o, senderObject);
default:
break;
}
@@ -108,7 +163,36 @@ int _handleSignal(AQH_OBJECT *o,
int _handleSocketReady(AQH_OBJECT *o)
int AQH_MsgReader_ReadMsg(AQH_OBJECT *o)
{
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo)
return xo->readMsgFn(o);
return GWEN_ERROR_NOT_IMPLEMENTED;
}
AQH_MSG_READER_READMSG_FN AQH_MsgReader_SetReadMsgFn(AQH_OBJECT *o, AQH_MSG_READER_READMSG_FN f)
{
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo) {
AQH_MSG_READER_READMSG_FN oldFn;
oldFn=xo->readMsgFn;
xo->readMsgFn=f;
return oldFn;
}
return NULL;
}
int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject)
{
AQH_MSG_READER *xo;
@@ -116,22 +200,28 @@ int _handleSocketReady(AQH_OBJECT *o)
if (xo) {
int rv;
if (xo->flags & AQH_MSGREADER_FLAGS_SKIP) {
if (_isStillSkipTime(xo)) {
_handleSkipping(o, fdObject);
return 1;
}
}
/* read available data into ringbuffer */
rv=_fillRingbuffer(o);
rv=_fillRingbuffer(o, xo, fdObject);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_ERROR, rv, NULL);
xo->fdSocket=-1;
return 1;
if (rv!=GWEN_ERROR_TRY_AGAIN) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_ERROR, rv, NULL);
}
}
/* read messages from ring buffer until buffer empty */
do {
rv=_readMsgFromRingbuffer(o, xo);
rv=AQH_MsgReader_ReadMsg(o);
} while (rv==1);
if (rv<0) {
AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_ERROR, rv, NULL);
xo->fdSocket=-1;
}
return 1;
@@ -142,12 +232,66 @@ int _handleSocketReady(AQH_OBJECT *o)
int _fillRingbuffer(AQH_OBJECT *o)
void AQH_MsgReader_StartSkipping(AQH_OBJECT *o)
{
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo && xo->fdSocket!=-1) {
if (xo) {
DBG_ERROR(AQH_LOGDOMAIN, "Enter skip mode");
GWEN_RingBuffer_Reset(xo->ringBuffer);
_resetBuffers(xo);
xo->flags|=AQH_MSGREADER_FLAGS_SKIP;
xo->timestamp=_getTimeInMilliSeconds();
}
}
void _handleSkipping(AQH_OBJECT *o, AQH_OBJECT *fdObject)
{
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo) {
int rv;
rv=AQH_FdObject_FlushInput(fdObject);
if (rv<0) {
AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_ERROR, rv, NULL);
}
else if (rv>0)
xo->timestamp=_getTimeInMilliSeconds();
}
}
int _isStillSkipTime(AQH_MSG_READER *xo)
{
uint64_t currentTime;
uint64_t diffTime;
currentTime=_getTimeInMilliSeconds();
diffTime=currentTime-xo->timestamp;
if (diffTime>=AQH_MSGREADER_SKIPTIME_IN_MS) {
xo->flags&=~AQH_MSGREADER_FLAGS_SKIP;
GWEN_RingBuffer_Reset(xo->ringBuffer);
_resetBuffers(xo);
DBG_ERROR(AQH_LOGDOMAIN, "Leaving skip mode");
return 0;
}
else {
xo->timestamp=currentTime;
return 1;
}
}
int _fillRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo, AQH_OBJECT *fdObject)
{
while (1) {
uint32_t len;
/* read data into ringbuffer */
@@ -155,160 +299,124 @@ int _fillRingbuffer(AQH_OBJECT *o)
if (len>0) {
int rv;
rv=AQH_FdObject_Read(xo->fdObject, (uint8_t*) GWEN_RingBuffer_GetWritePointer(xo->ringBuffer), len);
rv=AQH_FdObject_Read(fdObject, (uint8_t*) GWEN_RingBuffer_GetWritePointer(xo->ringBuffer), len);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
xo->fdSocket=-1;
if (rv!=GWEN_ERROR_TRY_AGAIN) {
DBG_ERROR(AQH_LOGDOMAIN, "here (%d)", rv);
}
return rv;
}
else if (rv==0) {
DBG_INFO(AQH_LOGDOMAIN, "EOF met");
xo->fdSocket=-1;
AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_CLOSED, 0, NULL);
return 0;
}
else {
/* bytes received */
GWEN_RingBuffer_SkipBytesWrite(xo->ringBuffer, rv);
return rv;
//return rv;
}
}
else {
DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer full");
return GWEN_ERROR_BUFFER_OVERFLOW;
/*return GWEN_ERROR_BUFFER_OVERFLOW;*/
return 0;
}
}
}
int AQH_MsgReader_ReadRemainderFromRingbuffer(AQH_OBJECT *o)
{
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo) {
uint32_t bytesInRingBuffer;
uint32_t bytesToRead;
int rv;
bytesInRingBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer);
/* still reading header */
bytesToRead=xo->bytesLeft;
if (bytesInRingBuffer<bytesToRead)
bytesToRead=bytesInRingBuffer;
if (bytesToRead) {
uint32_t xferSize;
xferSize=bytesToRead;
rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer empty");
return 0;
}
if (xferSize<bytesToRead) {
DBG_ERROR(AQH_LOGDOMAIN, "Read fewer bytes than available?");
return GWEN_ERROR_GENERIC;
}
xo->bytesReceived+=xferSize;
xo->bytesLeft-=xferSize;
if (xo->bytesLeft==0) {
/* msg finished */
DBG_INFO(AQH_LOGDOMAIN, "Message complete");
return 1;
}
}
return 0;
}
else {
DBG_INFO(AQH_LOGDOMAIN, "fd inactive (previous error or EOF?)");
DBG_ERROR(AQH_LOGDOMAIN, "Not a MSGREADER object");
return GWEN_ERROR_INVALID;
}
}
int _readMsgFromRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo)
uint64_t _getTimeInMilliSeconds(void)
{
int rv;
struct timespec t ;
if (xo->bytesReceived<AQH_MSG_READER_HEADERBUFFER_SIZE) {
rv=_readHeaderFromRingbuffer(xo);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
}
if (xo->bytesReceived>=AQH_MSG_READER_HEADERBUFFER_SIZE) {
/* reading remainder of msg directly into allocated buffer */
rv=_readRemainderFromRingbuffer(xo);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
else if (rv==1) {
int msgLen;
uint8_t *msgPtr;
msgLen=xo->bytesReceived;
msgPtr=xo->currentMsgBuf;
xo->bytesReceived=0;
xo->bytesLeft=0;
xo->currentMsgBuf=NULL;
rv=AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_MSGRECVD, msgLen, (void*) msgPtr);
if (rv==0) {
DBG_INFO(AQH_LOGDOMAIN, "Received message ignored");
free(msgPtr);
}
return 1;
}
}
return 0;
clock_gettime(CLOCK_REALTIME, &t);
return t.tv_sec*1000+(t.tv_nsec+500000)/1000000 ;
}
int _readHeaderFromRingbuffer(AQH_MSG_READER *xo)
void _resetBuffers(AQH_MSG_READER *xo)
{
uint32_t remaining;
int rv;
uint32_t xferSize;
uint32_t bytesInBuffer;
bytesInBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer);
/* still reading header */
remaining=AQH_MSG_READER_HEADERBUFFER_SIZE-xo->bytesReceived;
if (bytesInBuffer<remaining)
remaining=bytesInBuffer;
xferSize=remaining;
rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->headerBuffer+xo->bytesReceived), &xferSize);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
if (xferSize<remaining) {
DBG_ERROR(AQH_LOGDOMAIN, "Read fewer bytes than available?");
return GWEN_ERROR_GENERIC;
}
xo->bytesReceived+=xferSize;
if (xo->bytesReceived==AQH_MSG_READER_HEADERBUFFER_SIZE) {
uint32_t msgLen;
/* full size received, parse msg size, allocate buffer */
msgLen=GWEN_ENDIAN_LE32TOH(*((const uint32_t*)(xo->headerBuffer)));
if (msgLen<AQH_MSG_READER_MINMSGSIZE || msgLen>AQH_MSG_READER_MAXMSGSIZE) {
DBG_ERROR(AQH_LOGDOMAIN, "Bad message size(%lu)", (unsigned long int) msgLen);
return GWEN_ERROR_GENERIC;
}
xo->currentMsgBuf=(uint8_t*) malloc(msgLen+4);
memmove(xo->currentMsgBuf, xo->headerBuffer, xo->bytesReceived);
xo->bytesLeft=(msgLen+4)-xo->bytesReceived;
}
return 0;
free(xo->currentMsgBuf);
xo->currentMsgBuf=NULL;
xo->bytesReceived=0;
xo->bytesLeft=0;
}
int _readRemainderFromRingbuffer(AQH_MSG_READER *xo)
void _cbEnable(AQH_OBJECT *o)
{
uint32_t bytesInBuffer;
uint32_t bytesToRead;
int rv;
if (o && !(AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) {
AQH_MSG_READER *xo;
bytesInBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer);
/* still reading header */
bytesToRead=xo->bytesLeft;
if (bytesInBuffer<bytesToRead)
bytesToRead=bytesInBuffer;
if (bytesToRead) {
uint32_t xferSize;
xferSize=bytesToRead;
rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
if (xferSize<bytesToRead) {
DBG_ERROR(AQH_LOGDOMAIN, "Read fewer bytes than available?");
return GWEN_ERROR_GENERIC;
}
xo->bytesReceived+=xferSize;
xo->bytesLeft-=xferSize;
if (xo->bytesLeft==0) {
/* msg finished */
DBG_INFO(AQH_LOGDOMAIN, "Message complete");
return 1;
}
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo && xo->fdObject)
AQH_Object_Enable(xo->fdObject);
}
return 0;
}
void _cbDisable(AQH_OBJECT *o)
{
if (o && (AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) {
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo && xo->fdObject)
AQH_Object_Disable(xo->fdObject);
}
}

View File

@@ -12,7 +12,6 @@
#include <aqhome/events2/object.h>
enum {
/** param1=msgSize, param2=msgPointer */
AQH_MSG_READER_SIGNAL_MSGRECVD=AQH_OBJECT_SIGNAL_LAST,
@@ -23,8 +22,10 @@ enum {
enum {
AQH_MSGREADER_SLOT_SOCKETREADY=1
};
AQH_OBJECT *AQH_MsgReader_new(AQH_EVENT_LOOP *eventLoop, int fd);

View File

@@ -15,22 +15,54 @@
#include <gwenhywfar/ringbuffer.h>
#define AQH_MSG_READER_RINGBUFFER_SIZE 1024
#define AQH_MSG_READER_HEADERBUFFER_SIZE 4
#define AQH_MSG_READER_HEADERBUFFER_SIZE 32
typedef struct AQH_MSG_READER AQH_MSG_READER;
/**
* Read data for a message from the internal ring buffer.
*
* @return 1 if something has been done, 0 if not, negative on error
* @param o object (THIS)
*/
typedef int (*AQH_MSG_READER_READMSG_FN)(AQH_OBJECT *o);
struct AQH_MSG_READER {
int fdSocket;
AQH_OBJECT *fdObject;
GWEN_RINGBUFFER *ringBuffer;
int bytesReceived;
int bytesLeft;
uint8_t headerBuffer[AQH_MSG_READER_HEADERBUFFER_SIZE];
uint8_t *currentMsgBuf;
uint32_t flags;
uint64_t timestamp;
AQH_OBJECT *fdObject;
AQH_MSG_READER_READMSG_FN readMsgFn;
};
AQH_OBJECT *AQH_MsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject);
AQH_MSG_READER *AQH_MsgReader_GetData(const AQH_OBJECT *o);
int AQH_MsgReader_ReadRemainderFromRingbuffer(AQH_OBJECT *o);
int AQH_MsgReader_ReadMsg(AQH_OBJECT *o);
AQH_MSG_READER_READMSG_FN AQH_MsgReader_SetReadMsgFn(AQH_OBJECT *o, AQH_MSG_READER_READMSG_FN f);
uint32_t AQH_MsgReader_GetFlags(const AQH_OBJECT *o);
void AQH_MsgReader_SetFlags(AQH_OBJECT *o, uint32_t f);
void AQH_MsgReader_AddFlags(AQH_OBJECT *o, uint32_t f);
void AQH_MsgReader_SubFlags(AQH_OBJECT *o, uint32_t f);
void AQH_MsgReader_StartSkipping(AQH_OBJECT *o);

240
aqhome/ipc2/msgwriter.c Normal file
View File

@@ -0,0 +1,240 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./msgwriter_p.h"
#include <aqhome/events2/fdobject.h>
#include <gwenhywfar/inherit.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endianfns.h>
#include <sys/socket.h>
#define AQH_MSGWRITER_FLAGS_MSGSTARTED 0x0001
GWEN_INHERIT(AQH_OBJECT, AQH_MSG_WRITER)
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2);
static int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject);
static int _startMsg(AQH_MSG_WRITER *xo, AQH_OBJECT *fdObject);
static void _endMsg(AQH_MSG_WRITER *xo, AQH_OBJECT *fdObject);
static void _resetBuffer(AQH_OBJECT *o);
static void _cbEnable(AQH_OBJECT *o);
static void _cbDisable(AQH_OBJECT *o);
/* ------------------------------------------------------------------------------------------------
* implementation
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_MsgWriter_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject)
{
AQH_OBJECT *o;
AQH_MSG_WRITER *xo;
o=AQH_Object_new(eventLoop);
GWEN_NEW_OBJECT(AQH_MSG_WRITER, xo);
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_MSG_WRITER, o, xo, _freeData);
AQH_Object_SetSignalHandlerFn(o, _handleSignal);
AQH_Object_SetEnableFn(o, _cbEnable);
AQH_Object_SetDisableFn(o, _cbDisable);
if (fdObject) {
xo->fdObject=fdObject;
AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_MSGWRITER_SLOT_SOCKETREADY, o);
}
return o;
}
void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
{
AQH_MSG_WRITER *xo;
xo=(AQH_MSG_WRITER*) p;
GWEN_FREE_OBJECT(xo);
}
void AQH_MsgWriter_SendMsg(AQH_OBJECT *o, const uint8_t *ptr, int len)
{
if (o) {
AQH_MSG_WRITER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o);
if (xo) {
_resetBuffer(o);
if (ptr && len) {
xo->msgBufPtr=ptr;
xo->msgBufLen=len;
}
xo->bytesLeft=xo->msgBufLen;
xo->currentPtr=xo->msgBufPtr;
xo->flags&=~AQH_MSGWRITER_FLAGS_MSGSTARTED;
}
}
}
int _handleSignal(AQH_OBJECT *o,
uint32_t slotId,
GWEN_UNUSED AQH_OBJECT *senderObject,
GWEN_UNUSED int param1,
GWEN_UNUSED void *param2)
{
switch(slotId) {
case AQH_MSGWRITER_SLOT_SOCKETREADY: return _handleSocketReady(o, senderObject);
default:
break;
}
return 0; /* not handled */
}
int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject)
{
AQH_MSG_WRITER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o);
if (xo) {
if (xo->bytesLeft) {
int rv;
if (!(xo->flags & AQH_MSGWRITER_FLAGS_MSGSTARTED)) {
rv=_startMsg(xo, fdObject);
if (rv<0) {
if (rv==GWEN_ERROR_TRY_AGAIN) {
/* line is busy */
}
else {
_endMsg(xo, fdObject);
AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_ERROR, rv, NULL);
}
return 1;
}
}
do {
rv=AQH_FdObject_Write(fdObject, xo->currentPtr, xo->bytesLeft);
if (rv>0) {
xo->currentPtr+=rv;
xo->bytesLeft-=rv;
}
} while (rv>0 && xo->bytesLeft>0);
if (rv<0 && rv!=GWEN_ERROR_TRY_AGAIN) {
_endMsg(xo, fdObject);
AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_ERROR, rv, NULL);
}
else {
if (xo->bytesLeft==0) {
_endMsg(xo, fdObject);
rv=AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_MSGSENT, xo->msgBufLen, (void*) (xo->msgBufPtr));
if (rv==0) {
DBG_INFO(AQH_LOGDOMAIN, "Sent message ignored");
}
_resetBuffer(o);
}
}
}
return 1;
}
return 0;
}
int _startMsg(AQH_MSG_WRITER *xo, AQH_OBJECT *fdObject)
{
int rv;
rv=AQH_FdObject_StartMsg(fdObject);
if (rv<0) {
/* line is busy */
return rv;
}
xo->flags|=AQH_MSGWRITER_FLAGS_MSGSTARTED;
return 0;
}
void _endMsg(AQH_MSG_WRITER *xo, AQH_OBJECT *fdObject)
{
xo->flags&=~AQH_MSGWRITER_FLAGS_MSGSTARTED;
AQH_FdObject_EndMsg(fdObject);
}
void _resetBuffer(AQH_OBJECT *o)
{
AQH_MSG_WRITER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o);
if (xo) {
xo->msgBufPtr=NULL;
xo->msgBufLen=0;
xo->currentPtr=NULL;
xo->bytesLeft=0;
}
}
void _cbEnable(AQH_OBJECT *o)
{
if (o && !(AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) {
AQH_MSG_WRITER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o);
if (xo && xo->fdObject)
AQH_Object_Enable(xo->fdObject);
}
}
void _cbDisable(AQH_OBJECT *o)
{
if (o && (AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) {
AQH_MSG_WRITER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o);
if (xo && xo->fdObject)
AQH_Object_Disable(xo->fdObject);
}
}

38
aqhome/ipc2/msgwriter.h Normal file
View File

@@ -0,0 +1,38 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_MSGWRITER_H
#define AQH_MSGWRITER_H
#include <aqhome/events2/object.h>
enum {
/** param1=msgSize, param2=msgPointer */
AQH_MSG_WRITER_SIGNAL_MSGSENT=AQH_OBJECT_SIGNAL_LAST,
/** param1: error code */
AQH_MSG_WRITER_SIGNAL_ERROR,
AQH_MSG_WRITER_SIGNAL_CLOSED
};
enum {
AQH_MSGWRITER_SLOT_SOCKETREADY=1
};
AQH_OBJECT *AQH_MsgWriter_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject);
void AQH_MsgWriter_SendMsg(AQH_OBJECT *o, const uint8_t *ptr, int len);
#endif

34
aqhome/ipc2/msgwriter_p.h Normal file
View File

@@ -0,0 +1,34 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_MSGWRITER_P_H
#define AQH_MSGWRITER_P_H
#include "./msgwriter.h"
typedef struct AQH_MSG_WRITER AQH_MSG_WRITER;
struct AQH_MSG_WRITER {
int msgBufLen;
const uint8_t *msgBufPtr;
int bytesLeft;
const uint8_t *currentPtr;
uint32_t flags;
AQH_OBJECT *fdObject;
};
#endif

236
aqhome/ipc2/nodeendpoint.c Normal file
View File

@@ -0,0 +1,236 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./nodeendpoint_p.h"
#include "aqhome/ipc2/msgreader.h"
#include "aqhome/ipc2/msgwriter.h"
#include "aqhome/msg/node/m_node.h"
#include "aqhome/msg/node/m_device.h"
#include "aqhome/msg/node/m_recvstats.h"
#include "aqhome/msg/node/m_sendstats.h"
#include <gwenhywfar/debug.h>
#include <gwenhywfar/text.h>
GWEN_INHERIT(AQH_OBJECT, AQH_NODE_ENDPOINT)
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2);
static int _handleMsgRecvd(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr);
static int _handleMsgSent(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr);
static void _dumpMsg(const AQH_MESSAGE *msg, const char *sText);
/* ------------------------------------------------------------------------------------------------
* implementation
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter)
{
AQH_OBJECT *o;
AQH_NODE_ENDPOINT *xo;
o=AQH_Object_new(eventLoop);
GWEN_NEW_OBJECT(AQH_NODE_ENDPOINT, xo);
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o, xo, _freeData);
xo->msgOutList=AQH_Message_List_new();
xo->msgInList=AQH_Message_List_new();
AQH_Object_SetSignalHandlerFn(o, _handleSignal);
if (msgReader) {
xo->msgReader=msgReader;
AQH_Object_AddLink(msgReader, AQH_MSG_READER_SIGNAL_MSGRECVD, AQH_NODE_ENDPOINT_SLOT_MSG_RECVD, o);
}
if (msgWriter) {
xo->msgWriter=msgWriter;
AQH_Object_AddLink(msgWriter, AQH_MSG_WRITER_SIGNAL_MSGSENT, AQH_NODE_ENDPOINT_SLOT_MSG_SENT, o);
}
return o;
}
void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
{
AQH_NODE_ENDPOINT *xo;
xo=(AQH_NODE_ENDPOINT*) p;
AQH_Message_List_free(xo->msgOutList);
AQH_Message_List_free(xo->msgInList);
AQH_Object_free(xo->msgWriter);
AQH_Object_free(xo->msgReader);
GWEN_FREE_OBJECT(xo);
}
AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgOutList(const AQH_OBJECT *o)
{
if (o) {
AQH_NODE_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
if (xo)
return xo->msgOutList;
}
return NULL;
}
AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgOut(AQH_OBJECT *o)
{
if (o) {
AQH_NODE_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
if (xo)
return AQH_Message_List_First(xo->msgOutList);
}
return NULL;
}
void AQH_NodeEndpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg)
{
if (o && msg) {
AQH_NODE_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
if (xo) {
AQH_Message_List_Add(msg, xo->msgOutList);
if (xo->msgWriter && AQH_Message_List_GetCount(xo->msgOutList)==1) {
AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg));
AQH_Object_Enable(xo->msgWriter);
}
}
}
}
AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgInList(const AQH_OBJECT *o)
{
if (o) {
AQH_NODE_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
if (xo)
return xo->msgInList;
}
return NULL;
}
AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgIn(AQH_OBJECT *o)
{
if (o) {
AQH_NODE_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
if (xo)
return AQH_Message_List_First(xo->msgInList);
}
return NULL;
}
void AQH_NodeEndpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg)
{
if (o && msg) {
AQH_NODE_ENDPOINT *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_ENDPOINT, o);
if (xo) {
AQH_Message_List_Add(msg, xo->msgInList);
DBG_ERROR(AQH_LOGDOMAIN, "now %d msgs in list", AQH_Message_List_GetCount(xo->msgInList));
}
}
}
int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2)
{
switch(slotId) {
case AQH_NODE_ENDPOINT_SLOT_MSG_RECVD: return _handleMsgRecvd(o, senderObject, param1, param2);
case AQH_NODE_ENDPOINT_SLOT_MSG_SENT: return _handleMsgSent(o, senderObject, param1, param2);
default:
break;
}
return 0; /* not handled */
}
int _handleMsgRecvd(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr)
{
AQH_MESSAGE *msg;
DBG_ERROR(AQH_LOGDOMAIN, "Msg received:");
msg=AQH_NodeMessage_fromBuffer(msgPtr, msgLen);
_dumpMsg(msg, "Received");
AQH_NodeEndpoint_AddMsgIn(o, msg);
return 1;
}
int _handleMsgSent(AQH_OBJECT *o, AQH_OBJECT *senderObject, int msgLen, const uint8_t *msgPtr)
{
DBG_ERROR(AQH_LOGDOMAIN, "Msg sent:");
GWEN_Text_LogString((const char*) msgPtr, msgLen, AQH_LOGDOMAIN, GWEN_LoggerLevel_Error);
return 1;
}
void _dumpMsg(const AQH_MESSAGE *msg, const char *sText)
{
if (msg) {
GWEN_BUFFER *mbuf;
mbuf=GWEN_Buffer_new(0, 256, 0, 1);
switch(AQH_NodeMessage_GetMsgType(msg)) {
case AQH_MSG_TYPE_DEVICE: AQH_DeviceMessage_DumpToBuffer(msg, mbuf, sText); break;
case AQH_MSG_TYPE_COMRECVSTATS: AQH_RecvStatsMessage_DumpToBuffer(msg, mbuf, sText); break;
case AQH_MSG_TYPE_COMSENDSTATS: AQH_SendStatsMessage_DumpToBuffer(msg, mbuf, sText); break;
default: AQH_NodeMessage_DumpToBuffer(msg, mbuf, sText); break;
}
DBG_ERROR(AQH_LOGDOMAIN, "%s", GWEN_Buffer_GetStart(mbuf));
GWEN_Buffer_free(mbuf);
}
}

View File

@@ -0,0 +1,37 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_NODEENDPOINT_H
#define AQH_NODEENDPOINT_H
#include <aqhome/events2/object.h>
#include <aqhome/ipc2/message.h>
enum {
AQH_NODE_ENDPOINT_SLOT_MSG_RECVD=1,
AQH_NODE_ENDPOINT_SLOT_MSG_SENT
};
AQHOME_API AQH_OBJECT *AQH_NodeEndpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter);
AQHOME_API AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgOutList(const AQH_OBJECT *o);
AQHOME_API AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgOut(AQH_OBJECT *o);
AQHOME_API void AQH_NodeEndpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg);
AQHOME_API AQH_MESSAGE_LIST *AQH_NodeEndpoint_GetMsgInList(const AQH_OBJECT *o);
AQHOME_API AQH_MESSAGE *AQH_NodeEndpoint_GetNextMsgIn(AQH_OBJECT *o);
AQHOME_API void AQH_NodeEndpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg);
#endif

View File

@@ -0,0 +1,27 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_NODEENDPOINT_P_H
#define AQH_NODEENDPOINT_P_H
#include "./nodeendpoint.h"
typedef struct AQH_NODE_ENDPOINT AQH_NODE_ENDPOINT;
struct AQH_NODE_ENDPOINT {
AQH_MESSAGE_LIST *msgOutList;
AQH_MESSAGE_LIST *msgInList;
AQH_OBJECT *msgWriter;
AQH_OBJECT *msgReader;
};
#endif

185
aqhome/ipc2/nodemsgreader.c Normal file
View File

@@ -0,0 +1,185 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./nodemsgreader.h"
#include "./msgreader_p.h"
#include <aqhome/events2/fdobject.h>
#include <gwenhywfar/inherit.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endianfns.h>
#include <sys/socket.h>
/* ------------------------------------------------------------------------------------------------
* definitions
* ------------------------------------------------------------------------------------------------
*/
#define AQH_MSG_READER_HEADER_SIZE 2
#define AQH_MSG_READER_MAXMSGSIZE 32
#define AQH_MSG_READER_FLAG_SKIPPING 0x0001
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static int _readMsg(AQH_OBJECT *o);
static int _readHeaderFromRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo);
static uint8_t _calcCrc8Checksum(const uint8_t *ptr, uint8_t len);
/* ------------------------------------------------------------------------------------------------
* implementation
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_NodeMsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject)
{
AQH_OBJECT *o;
o=AQH_MsgReader_new(eventLoop, fdObject);
AQH_MsgReader_SetReadMsgFn(o, _readMsg);
return o;
}
int _readMsg(AQH_OBJECT *o)
{
AQH_MSG_READER *xo;
xo=AQH_MsgReader_GetData(o);
if (xo) {
int rv;
if (xo->bytesReceived<AQH_MSG_READER_HEADER_SIZE) {
rv=_readHeaderFromRingbuffer(o, xo);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
}
if (xo->bytesReceived>=AQH_MSG_READER_HEADER_SIZE) {
/* reading remainder of msg directly into allocated buffer */
rv=AQH_MsgReader_ReadRemainderFromRingbuffer(o);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
else if (rv==1) {
int msgLen;
uint8_t *msgPtr;
msgLen=xo->bytesReceived;
msgPtr=xo->currentMsgBuf;
xo->bytesReceived=0;
xo->bytesLeft=0;
xo->currentMsgBuf=NULL;
if (_calcCrc8Checksum(msgPtr, msgLen)==0) {
rv=AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_MSGRECVD, msgLen, (void*) msgPtr);
if (rv==0) {
DBG_INFO(AQH_LOGDOMAIN, "Received message ignored");
}
}
else {
DBG_INFO(AQH_LOGDOMAIN, "Bad CRC");
AQH_MsgReader_StartSkipping(o);
}
free(msgPtr);
return 1;
}
}
return 0;
}
return GWEN_ERROR_GENERIC;
}
int _readHeaderFromRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo)
{
uint32_t remaining;
int rv;
uint32_t xferSize;
uint32_t bytesInBuffer;
bytesInBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer);
/* still reading header */
remaining=AQH_MSG_READER_HEADER_SIZE-xo->bytesReceived;
if (bytesInBuffer<remaining)
remaining=bytesInBuffer;
xferSize=remaining;
rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->headerBuffer+xo->bytesReceived), &xferSize);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer empty");
return 0;
}
if (xferSize<remaining) {
DBG_ERROR(AQH_LOGDOMAIN, "Read fewer bytes than available?");
return GWEN_ERROR_GENERIC;
}
xo->bytesReceived+=xferSize;
if (xo->bytesReceived==AQH_MSG_READER_HEADER_SIZE) {
uint32_t msgLen;
/* full size received, parse msg size, allocate buffer */
msgLen=xo->headerBuffer[1];
if (msgLen>AQH_MSG_READER_MAXMSGSIZE) {
DBG_ERROR(AQH_LOGDOMAIN, "Bad message size(%lu)", (unsigned long int) msgLen);
AQH_MsgReader_StartSkipping(o);
return GWEN_ERROR_GENERIC;
}
xo->currentMsgBuf=(uint8_t*) malloc(msgLen+3); /* +3 (dest addr, msg len, crc) */
memmove(xo->currentMsgBuf, xo->headerBuffer, xo->bytesReceived);
xo->bytesLeft=(msgLen+3)-xo->bytesReceived;
}
return 0;
}
uint8_t _calcCrc8Checksum(const uint8_t *ptr, uint8_t len)
{
int i;
uint8_t x=0xff;
for (i=0; i<len; i++, ptr++) {
int j;
x^=*ptr;
for (j=0; j<8; j++) {
if (x & 0x80)
x=(uint8_t) (x<<1)^0x97;
else
x<<=1;
}
}
return x;
}

View File

@@ -0,0 +1,20 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_NODEMSGREADER_H
#define AQH_NODEMSGREADER_H
#include <aqhome/events2/object.h>
AQHOME_API AQH_OBJECT *AQH_NodeMsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject);
#endif

230
aqhome/ipc2/ttyobject.c Normal file
View File

@@ -0,0 +1,230 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./ttyobject.h"
#include <aqhome/events2/fdobject.h>
#include <gwenhywfar/debug.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/ioctl.h>
#include <unistd.h>
#define AQH_TTYOBJECT_BAUDRATE B19200
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static int _startMsg(AQH_OBJECT *o);
static void _endMsg(AQH_OBJECT *o);
static int _getAttn(int fd);
static int _setAttn(int fd, int val);
static int _fdSetBlocking(int sk, int fl);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_TtyObject_new(AQH_EVENT_LOOP *eventLoop, int fd, int fdMode)
{
AQH_OBJECT *o;
o=AQH_FdObject_new(eventLoop, fd, fdMode);
AQH_FdObject_SetStartMsgFn(o, _startMsg);
AQH_FdObject_SetEndMsgFn(o, _endMsg);
return o;
}
int _startMsg(AQH_OBJECT *o)
{
if (o) {
int fd;
int rv;
fd=AQH_FdObject_GetFd(o);
if (fd==-1)
return GWEN_ERROR_IO;
rv=_getAttn(fd);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
else if (rv==0) {
return GWEN_ERROR_TRY_AGAIN; /* line busy */
}
else {
_setAttn(fd, 0); /* set ATTN low */
return 0;
}
}
else
return GWEN_ERROR_INVALID;
}
void _endMsg(AQH_OBJECT *o)
{
if (o) {
int fd;
fd=AQH_FdObject_GetFd(o);
if (fd>=0)
_setAttn(fd, 1); /* set ATTN high */
}
}
int _getAttn(int fd)
{
int rv;
int status;
rv=ioctl(fd, TIOCMGET, &status);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Error on ioctl: %s (%d)", strerror(errno), errno);
return GWEN_ERROR_IO;
}
return (status & TIOCM_CTS)?0:1;
}
int _setAttn(int fd, int val)
{
int rv;
int status;
rv=ioctl(fd, TIOCMGET, &status);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Error on ioctl: %s (%d)", strerror(errno), errno);
return GWEN_ERROR_IO;
}
if (val)
status &= ~ (TIOCM_DTR | TIOCM_RTS); /* attn high, clear the DTR pin (cave: signals inverted!) */
else
status |= TIOCM_DTR | TIOCM_RTS; /* attn low, set the DTR pin (cave: signals inverted!) */
rv=ioctl(fd, TIOCMSET, &status);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Error on ioctl: %s (%d)", strerror(errno), errno);
return GWEN_ERROR_IO;
}
return 0;
}
int AQH_TtyObject_OpenAndInitDevice(const char *device, struct termios *initialTermiosState)
{
int fd;
struct termios options;
int rv;
fd=open(device, O_NOCTTY | O_NDELAY | O_RDWR);
if (fd<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Error on open(%s): %s (%d)", device, strerror(errno), errno);
return GWEN_ERROR_IO;
}
DBG_INFO(AQH_LOGDOMAIN, "Device %s open (socket %d)", device, fd);
_fdSetBlocking(fd, 0);
rv=tcgetattr(fd, &options);
if (initialTermiosState)
*initialTermiosState=options;
options.c_cflag=CLOCAL | CREAD | CS8;
options.c_iflag=IGNPAR | IGNBRK;
options.c_oflag=0;
options.c_lflag&= ~(ICANON | ECHO | ECHOE | ISIG);
options.c_cc[VTIME]=0; /* read timeout in deciseconds */
options.c_cc[VMIN]=0; /* no minimum number of receive bytes */
cfmakeraw(&options);
rv=cfsetispeed(&options, AQH_TTYOBJECT_BAUDRATE);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Error on cfsetispeed(%s): %s (%d)", device, strerror(errno), errno);
return GWEN_ERROR_IO;
}
rv=cfsetospeed(&options, AQH_TTYOBJECT_BAUDRATE);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Error on cfsetospeed(%s): %s (%d)", device, strerror(errno), errno);
return GWEN_ERROR_IO;
}
rv=tcflush(fd, TCIOFLUSH);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Error on tcflush(%s): %s (%d)", device, strerror(errno), errno);
return GWEN_ERROR_IO;
}
rv=tcsetattr(fd, TCSANOW, &options);
if (rv<0) {
DBG_ERROR(AQH_LOGDOMAIN, "Error on tcsetattr(%s): %s (%d)", device, strerror(errno), errno);
return GWEN_ERROR_IO;
}
return fd;
}
int _fdSetBlocking(int fd, int fl)
{
int prevFlags;
int newFlags;
/* get current socket flags */
prevFlags=fcntl(fd, F_GETFL);
if (prevFlags==-1) {
DBG_INFO(AQH_LOGDOMAIN, "fcntl(): %s", strerror(errno));
return GWEN_ERROR_IO;
}
/* set nonblocking/blocking */
if (fl)
newFlags=prevFlags&(~O_NONBLOCK);
else
newFlags=prevFlags|O_NONBLOCK;
if (-1==fcntl(fd, F_SETFL, newFlags)) {
DBG_INFO(AQH_LOGDOMAIN, "fcntl(): %s", strerror(errno));
return GWEN_ERROR_IO;
}
prevFlags=fcntl(fd, F_GETFL);
if (prevFlags!=newFlags) {
DBG_ERROR(AQH_LOGDOMAIN, "fcntl() did not set flags correctly (%08x!=%08x)", prevFlags, newFlags);
return GWEN_ERROR_IO;
}
return 0;
}

38
aqhome/ipc2/ttyobject.h Normal file
View File

@@ -0,0 +1,38 @@
/****************************************************************************
* This file is part of the project Gwenhywfar.
* Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_TTYOBJECT_H
#define AQH_TTYOBJECT_H
#include <aqhome/events2/object.h>
#include <termios.h>
/**
* Create Tty object for given filedescriptor and in given mode (read or write)
*
* @return object created
* @param eventLoop event loop to assign the new object to
* @param fd filedescriptor (e.g. created by calls to open or socket)
* @param fdMode AQH_FDOBJECT_FDMODE_READ or AQH_FDOBJECT_FDMODE_WRITE
*/
AQHOME_API AQH_OBJECT *AQH_TtyObject_new(AQH_EVENT_LOOP *eventLoop, int fd, int fdMode);
/**
* Open given device and setup TTY parameters for it.
*
* @return filedescriptor for the openened and initialized device
* @param device path to device to open (e.g. "/dev/ttyUSB0")
* @param initialTermiosState var to store initial state of the device (if given)
*/
AQHOME_API int AQH_TtyObject_OpenAndInitDevice(const char *device, struct termios *initialTermiosState);
#endif