/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./msgwriter_p.h" #include #include #include #include #include #define AQH_MSGWRITER_FLAGS_MSGSTARTED 0x80000000 GWEN_INHERIT(AQH_OBJECT, AQH_MSG_WRITER) /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); static int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject); static int _startMsg(AQH_MSG_WRITER *xo, AQH_OBJECT *fdObject); static void _endMsg(AQH_MSG_WRITER *xo, AQH_OBJECT *fdObject); static void _resetBuffer(AQH_OBJECT *o); static void _cbEnable(AQH_OBJECT *o); static void _cbDisable(AQH_OBJECT *o); /* ------------------------------------------------------------------------------------------------ * implementation * ------------------------------------------------------------------------------------------------ */ AQH_OBJECT *AQH_MsgWriter_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject) { AQH_OBJECT *o; AQH_MSG_WRITER *xo; o=AQH_Object_new(eventLoop); GWEN_NEW_OBJECT(AQH_MSG_WRITER, xo); GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_MSG_WRITER, o, xo, _freeData); AQH_Object_SetSignalHandlerFn(o, _handleSignal); AQH_Object_SetEnableFn(o, _cbEnable); AQH_Object_SetDisableFn(o, _cbDisable); if (fdObject) { xo->fdObject=fdObject; AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_MSGWRITER_SLOT_SOCKETREADY, o); } return o; } void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) { AQH_MSG_WRITER *xo; xo=(AQH_MSG_WRITER*) p; if (xo->fdObject) { AQH_Object_Disable(xo->fdObject); AQH_Object_free(xo->fdObject); xo->fdObject=NULL; } GWEN_FREE_OBJECT(xo); } void AQH_MsgWriter_SendMsg(AQH_OBJECT *o, const uint8_t *ptr, int len) { if (o) { AQH_MSG_WRITER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o); if (xo) { _resetBuffer(o); if (ptr && len) { xo->msgBufPtr=ptr; xo->msgBufLen=len; } xo->bytesLeft=xo->msgBufLen; xo->currentPtr=xo->msgBufPtr; xo->flags&=~AQH_MSGWRITER_FLAGS_MSGSTARTED; } } } int _handleSignal(AQH_OBJECT *o, uint32_t slotId, GWEN_UNUSED AQH_OBJECT *senderObject, GWEN_UNUSED int param1, GWEN_UNUSED void *param2) { switch(slotId) { case AQH_MSGWRITER_SLOT_SOCKETREADY: return _handleSocketReady(o, senderObject); default: break; } return 0; /* not handled */ } int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject) { AQH_MSG_WRITER *xo; DBG_DEBUG(AQH_LOGDOMAIN, "Socket ready"); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o); if (xo) { int rv; if (xo->bytesLeft) { if (!(xo->flags & AQH_MSGWRITER_FLAGS_MSGSTARTED)) { DBG_INFO(NULL, "Starting message"); rv=_startMsg(xo, fdObject); if (rv<0) { if (rv==GWEN_ERROR_TRY_AGAIN) { /* line is busy */ } else { _endMsg(xo, fdObject); AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_ERROR, rv, NULL); } return 1; } } do { rv=AQH_FdObject_Write(fdObject, xo->currentPtr, xo->bytesLeft); if (rv>0) { xo->currentPtr+=rv; xo->bytesLeft-=rv; } } while (rv>0 && xo->bytesLeft>0); if (rv<0 && rv!=GWEN_ERROR_TRY_AGAIN) { _endMsg(xo, fdObject); AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_ERROR, rv, NULL); } else { if (xo->bytesLeft==0) { int msgLen; const uint8_t *msgPtr; _endMsg(xo, fdObject); DBG_INFO(NULL, "Ended message"); msgPtr=xo->msgBufPtr; msgLen=xo->msgBufLen; _resetBuffer(o); rv=AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_MSGSENT, msgLen, (void*) msgPtr); if (rv==0) { DBG_ERROR(AQH_LOGDOMAIN, "Sent message ignored"); } } } } return 1; } return 0; } int _startMsg(AQH_MSG_WRITER *xo, AQH_OBJECT *fdObject) { int rv; rv=AQH_FdObject_StartMsg(fdObject); if (rv<0) { /* line is busy */ return rv; } xo->flags|=AQH_MSGWRITER_FLAGS_MSGSTARTED; return 0; } void _endMsg(AQH_MSG_WRITER *xo, AQH_OBJECT *fdObject) { xo->flags&=~AQH_MSGWRITER_FLAGS_MSGSTARTED; AQH_FdObject_EndMsg(fdObject); } void _resetBuffer(AQH_OBJECT *o) { AQH_MSG_WRITER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o); if (xo) { xo->msgBufPtr=NULL; xo->msgBufLen=0; xo->currentPtr=NULL; xo->bytesLeft=0; } } void _cbEnable(AQH_OBJECT *o) { if (o) { AQH_MSG_WRITER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o); if (xo && xo->fdObject) AQH_Object_Enable(xo->fdObject); } } void _cbDisable(AQH_OBJECT *o) { if (o) { AQH_MSG_WRITER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o); if (xo && xo->fdObject) AQH_Object_Disable(xo->fdObject); } }