/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./msgreader_p.h" #include #include #include #include #include #include #define AQH_MSGREADER_SKIPTIME_IN_MS 20 #define AQH_MSGREADER_FLAGS_SKIP 0x80000000 GWEN_INHERIT(AQH_OBJECT, AQH_MSG_READER) /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); static int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject); static int _fillRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo, AQH_OBJECT *fdObject); static void _handleSkipping(AQH_OBJECT *o, AQH_OBJECT *fdObject); static int _isStillSkipTime(AQH_MSG_READER *xo); static uint64_t _getTimeInMilliSeconds(void); static void _resetBuffers(AQH_MSG_READER *xo); static void _cbEnable(AQH_OBJECT *o); static void _cbDisable(AQH_OBJECT *o); /* ------------------------------------------------------------------------------------------------ * implementation * ------------------------------------------------------------------------------------------------ */ AQH_OBJECT *AQH_MsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject) { AQH_OBJECT *o; AQH_MSG_READER *xo; o=AQH_Object_new(eventLoop); GWEN_NEW_OBJECT(AQH_MSG_READER, xo); GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_MSG_READER, o, xo, _freeData); xo->ringBuffer=GWEN_RingBuffer_new(AQH_MSG_READER_RINGBUFFER_SIZE); AQH_Object_SetSignalHandlerFn(o, _handleSignal); AQH_Object_SetEnableFn(o, _cbEnable); AQH_Object_SetDisableFn(o, _cbDisable); if (fdObject) { xo->fdObject=fdObject; AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_MSGREADER_SLOT_SOCKETREADY, o); } return o; } void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) { AQH_MSG_READER *xo; xo=(AQH_MSG_READER*) p; free(xo->currentMsgBuf); if (xo->fdObject) { AQH_Object_Disable(xo->fdObject); AQH_Object_free(xo->fdObject); xo->fdObject=NULL; } GWEN_FREE_OBJECT(xo); } uint32_t AQH_MsgReader_GetFlags(const AQH_OBJECT *o) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); return xo?xo->flags:0; } void AQH_MsgReader_SetFlags(AQH_OBJECT *o, uint32_t f) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { xo->flags=f; } } void AQH_MsgReader_AddFlags(AQH_OBJECT *o, uint32_t f) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { xo->flags|=f; } } void AQH_MsgReader_SubFlags(AQH_OBJECT *o, uint32_t f) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { xo->flags&=~f; } } AQH_MSG_READER *AQH_MsgReader_GetData(const AQH_OBJECT *o) { if (o) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); return xo; } return NULL; } int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, GWEN_UNUSED int param1, GWEN_UNUSED void *param2) { switch(slotId) { case AQH_MSGREADER_SLOT_SOCKETREADY: return _handleSocketReady(o, senderObject); default: break; } return 0; /* not handled */ } int AQH_MsgReader_ReadMsg(AQH_OBJECT *o) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) return xo->readMsgFn(o); return GWEN_ERROR_NOT_IMPLEMENTED; } AQH_MSG_READER_READMSG_FN AQH_MsgReader_SetReadMsgFn(AQH_OBJECT *o, AQH_MSG_READER_READMSG_FN f) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { AQH_MSG_READER_READMSG_FN oldFn; oldFn=xo->readMsgFn; xo->readMsgFn=f; return oldFn; } return NULL; } int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject) { AQH_MSG_READER *xo; DBG_DEBUG(AQH_LOGDOMAIN, "Socket ready"); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { int rv; if (xo->flags & AQH_MSGREADER_FLAGS_SKIP) { if (_isStillSkipTime(xo)) { _handleSkipping(o, fdObject); return 1; } } /* read available data into ringbuffer */ rv=_fillRingbuffer(o, xo, fdObject); if (rv<0) { if (rv!=GWEN_ERROR_TRY_AGAIN) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_ERROR, rv, NULL); } } /* read messages from ring buffer until buffer empty */ do { rv=AQH_MsgReader_ReadMsg(o); } while (rv==1); if (rv<0) { AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_ERROR, rv, NULL); } return 1; } return 0; } void AQH_MsgReader_StartSkipping(AQH_OBJECT *o) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { DBG_ERROR(AQH_LOGDOMAIN, "Enter skip mode"); GWEN_RingBuffer_Reset(xo->ringBuffer); _resetBuffers(xo); xo->flags|=AQH_MSGREADER_FLAGS_SKIP; xo->timestamp=_getTimeInMilliSeconds(); } } void _handleSkipping(AQH_OBJECT *o, AQH_OBJECT *fdObject) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { int rv; rv=AQH_FdObject_FlushInput(fdObject); if (rv<0) { AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_ERROR, rv, NULL); } else if (rv>0) xo->timestamp=_getTimeInMilliSeconds(); } } int _isStillSkipTime(AQH_MSG_READER *xo) { uint64_t currentTime; uint64_t diffTime; currentTime=_getTimeInMilliSeconds(); diffTime=currentTime-xo->timestamp; if (diffTime>=AQH_MSGREADER_SKIPTIME_IN_MS) { xo->flags&=~AQH_MSGREADER_FLAGS_SKIP; GWEN_RingBuffer_Reset(xo->ringBuffer); _resetBuffers(xo); DBG_ERROR(AQH_LOGDOMAIN, "Leaving skip mode"); return 0; } else { xo->timestamp=currentTime; return 1; } } int _fillRingbuffer(AQH_OBJECT *o, AQH_MSG_READER *xo, AQH_OBJECT *fdObject) { while (1) { uint32_t len; /* read data into ringbuffer */ len=GWEN_RingBuffer_GetMaxUnsegmentedWrite(xo->ringBuffer); if (len>0) { int rv; rv=AQH_FdObject_Read(fdObject, (uint8_t*) GWEN_RingBuffer_GetWritePointer(xo->ringBuffer), len); if (rv<0) { if (rv!=GWEN_ERROR_TRY_AGAIN) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); } return rv; } else if (rv==0) { DBG_INFO(AQH_LOGDOMAIN, "EOF met"); AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_CLOSED, 0, NULL); return 0; } else { /* bytes received */ GWEN_RingBuffer_SkipBytesWrite(xo->ringBuffer, rv); //return rv; } } else { DBG_DEBUG(AQH_LOGDOMAIN, "Ringbuffer full"); return 0; } } } int AQH_MsgReader_ReadRemainderFromRingbuffer(AQH_OBJECT *o) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { if (xo->bytesLeft==0) { /* msg finished */ DBG_DEBUG(AQH_LOGDOMAIN, "Message complete"); return 1; } else { uint32_t bytesInRingBuffer; uint32_t bytesToRead; int rv; bytesInRingBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer); /* still reading header */ bytesToRead=xo->bytesLeft; if (bytesInRingBufferringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize); if (rv<0) { DBG_DEBUG(AQH_LOGDOMAIN, "Ringbuffer empty"); return 0; } if (xferSizebytesReceived+=xferSize; xo->bytesLeft-=xferSize; if (xo->bytesLeft==0) { /* msg finished */ DBG_DEBUG(AQH_LOGDOMAIN, "Message complete"); return 1; } } else { DBG_ERROR(AQH_LOGDOMAIN, "Nothing to read??"); } } return 0; } else { DBG_ERROR(AQH_LOGDOMAIN, "Not a MSGREADER object"); return GWEN_ERROR_INVALID; } } uint64_t _getTimeInMilliSeconds(void) { struct timespec t ; clock_gettime(CLOCK_REALTIME, &t); return t.tv_sec*1000+(t.tv_nsec+500000)/1000000 ; } void _resetBuffers(AQH_MSG_READER *xo) { free(xo->currentMsgBuf); xo->currentMsgBuf=NULL; xo->bytesReceived=0; xo->bytesLeft=0; } void _cbEnable(AQH_OBJECT *o) { if (o) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo && xo->fdObject) AQH_Object_Enable(xo->fdObject); } } void _cbDisable(AQH_OBJECT *o) { if (o) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo && xo->fdObject) AQH_Object_Disable(xo->fdObject); } }