/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/msgendpointmanager_p.h" #include "aqhome/msg_setaccmsggrps.h" #include #include #include #include #include static int _ioLoopOnce(AQH_MSG_ENDPOINT_MGR *emgr); static void _msgLoopOnce(AQH_MSG_ENDPOINT_MGR *emgr); static void _runAllEndpoints(AQH_MSG_ENDPOINT_MGR *emgr); static void _distributeMsg(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg); static void _handleAdminMsg(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg); static void _handleMsgSetAcceptedMsgGroups(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg); static uint32_t _getMsgGroup(uint8_t msgType); AQH_MSG_ENDPOINT_MGR *AQH_MsgEndpointMgr_new(uint8_t busAddr) { AQH_MSG_ENDPOINT_MGR *emgr; GWEN_NEW_OBJECT(AQH_MSG_ENDPOINT_MGR, emgr); emgr->endpointList=AQH_MsgEndpoint_List_new(); emgr->busAddr=busAddr; return emgr; } void AQH_MsgEndpointMgr_free(AQH_MSG_ENDPOINT_MGR *emgr) { if (emgr) { AQH_MsgEndpoint_List_free(emgr->endpointList); GWEN_FREE_OBJECT(emgr); } } uint8_t AQH_MsgEndpointMgr_GetBusAddr(const AQH_MSG_ENDPOINT_MGR *emgr) { return emgr->busAddr; } AQH_MSG_ENDPOINT_LIST *AQH_MsgEndpointMgr_GetEndpointList(const AQH_MSG_ENDPOINT_MGR *emgr) { return emgr->endpointList; } void AQH_MsgEndpointMgr_AddEndpoint(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep) { AQH_MsgEndpoint_List_Add(ep, emgr->endpointList); } void AQH_MsgEndpointMgr_DelEndpoint(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep) { AQH_MsgEndpoint_List_Del(ep); } int AQH_MsgEndpointMgr_LoopOnce(AQH_MSG_ENDPOINT_MGR *emgr) { int rv; rv=_ioLoopOnce(emgr); _msgLoopOnce(emgr); _runAllEndpoints(emgr); return rv; } int _ioLoopOnce(AQH_MSG_ENDPOINT_MGR *emgr) { AQH_MSG_ENDPOINT *ep; fd_set readSet; fd_set writeSet; int highestRdFd=-1; int highestWrFd=-1; struct timeval tv; int rv; FD_ZERO(&readSet); FD_ZERO(&writeSet); tv.tv_sec=2; tv.tv_usec=0; DBG_INFO(AQH_LOGDOMAIN, "Sampling sockets"); ep=AQH_MsgEndpoint_List_First(emgr->endpointList); if (ep==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "No endpoints."); return GWEN_ERROR_GENERIC; } while(ep) { DBG_INFO(AQH_LOGDOMAIN, "- checking endpoint %s", AQH_MsgEndpoint_GetName(ep)); if (!(AQH_MsgEndpoint_GetFlags(ep) & AQH_MSG_ENDPOINT_FLAGS_NOIO)) { int fd; fd=AQH_MsgEndpoint_GetReadFd(ep); if (fd>=0) { DBG_INFO(AQH_LOGDOMAIN, " - adding socket %d for read", fd); FD_SET(fd, &readSet); highestRdFd=(fd>highestRdFd)?fd:highestRdFd; } fd=AQH_MsgEndpoint_GetWriteFd(ep); if (fd>=0) { DBG_INFO(AQH_LOGDOMAIN, " - adding socket %d for write", fd); FD_SET(fd, &writeSet); highestWrFd=(fd>highestWrFd)?fd:highestWrFd; } } else { DBG_INFO(AQH_LOGDOMAIN, " - endpoint %s does not support IO", AQH_MsgEndpoint_GetName(ep)); } ep=AQH_MsgEndpoint_List_Next(ep); } DBG_INFO(AQH_LOGDOMAIN, "Calling select (highest read socket: %d, highest write socket: %d)", highestRdFd, highestWrFd); rv=select(((highestRdFd>highestWrFd)?highestRdFd:highestWrFd)+1, (highestRdFd<0)?NULL:&readSet, (highestWrFd<0)?NULL:&writeSet, NULL, &tv); DBG_INFO(AQH_LOGDOMAIN, "Return from select (%d, %d=%s)", rv, (rv<0)?errno:0, (rv<0)?strerror(errno):"no error"); if (rv<0) { if (errno!=EINTR) { DBG_ERROR(AQH_LOGDOMAIN, "Error on select"); return GWEN_ERROR_IO; } } else if (rv==0) { /* timeout */ DBG_INFO(AQH_LOGDOMAIN, "timeout"); return GWEN_ERROR_TRY_AGAIN; } else if (rv) { DBG_INFO(AQH_LOGDOMAIN, "Letting all endpoints handle IO"); ep=AQH_MsgEndpoint_List_First(emgr->endpointList); while(ep) { AQH_MSG_ENDPOINT *epNext; int fd; int rv; epNext=AQH_MsgEndpoint_List_Next(ep); fd=AQH_MsgEndpoint_GetFd(ep); if (fd!=-1 && FD_ISSET(fd, &readSet)) { DBG_INFO(AQH_LOGDOMAIN, "- endpoint(%s): read", AQH_MsgEndpoint_GetName(ep)); rv=AQH_MsgEndpoint_HandleReadable(ep, emgr); if (rv<0 && rv!=GWEN_ERROR_TRY_AGAIN) { DBG_INFO(AQH_LOGDOMAIN, "error, removing endpoint %s", AQH_MsgEndpoint_GetName(ep)); fd=-1; AQH_MsgEndpointMgr_DelEndpoint(emgr, ep); } } if (fd!=-1 && FD_ISSET(fd, &writeSet)) { DBG_INFO(AQH_LOGDOMAIN, "- endpoint(%s): write", AQH_MsgEndpoint_GetName(ep)); rv=AQH_MsgEndpoint_HandleWritable(ep, emgr); if (rv<0 && rv!=GWEN_ERROR_TRY_AGAIN) { DBG_INFO(AQH_LOGDOMAIN, "error, removing endpoint %s", AQH_MsgEndpoint_GetName(ep)); fd=-1; AQH_MsgEndpointMgr_DelEndpoint(emgr, ep); } } ep=epNext; } } return 0; } void _msgLoopOnce(AQH_MSG_ENDPOINT_MGR *emgr) { AQH_MSG_ENDPOINT *ep; DBG_INFO(AQH_LOGDOMAIN, "Handle endpoint messages"); ep=AQH_MsgEndpoint_List_First(emgr->endpointList); while(ep) { AQH_MSG *msg; DBG_INFO(AQH_LOGDOMAIN, "- endpoint(%s)", AQH_MsgEndpoint_GetName(ep)); while( (msg=AQH_MsgEndpoint_TakeFirstReceivedMessage(ep)) ) { uint32_t msgGroup; DBG_INFO(AQH_LOGDOMAIN, " - msg %d from %d to %d", AQH_Msg_GetMsgType(msg), AQH_Msg_GetSourceAddress(msg), AQH_Msg_GetDestAddress(msg)); msgGroup=_getMsgGroup(AQH_Msg_GetMsgType(msg)); if (msgGroup & AQH_MSG_ENDPOINT_MSGGROUP_ADMIN) { if (!(AQH_MsgEndpoint_GetGroupId(ep) & AQH_MSG_ENDPOINT_ENDPOINTGROUP_BUS)) { /* only handle admin messages not from nodes */ DBG_INFO(AQH_LOGDOMAIN, " - handling admin message"); _handleAdminMsg(emgr, ep, msg); } } else { DBG_INFO(AQH_LOGDOMAIN, " - distributing message"); _distributeMsg(emgr, ep, msg); } AQH_Msg_free(msg); } ep=AQH_MsgEndpoint_List_Next(ep); } } void _runAllEndpoints(AQH_MSG_ENDPOINT_MGR *emgr) { AQH_MSG_ENDPOINT *ep; DBG_INFO(AQH_LOGDOMAIN, "Running all endpoints"); ep=AQH_MsgEndpoint_List_First(emgr->endpointList); while(ep) { AQH_MSG_ENDPOINT *next; next=AQH_MsgEndpoint_List_Next(ep); DBG_INFO(AQH_LOGDOMAIN, "- running endpoint %s", AQH_MsgEndpoint_GetName(ep)); AQH_MsgEndpoint_Run(ep); ep=next; } } void _distributeMsg(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *srcEp, const AQH_MSG *msg) { AQH_MSG_ENDPOINT *ep; int srcGroupId; uint32_t msgGroup; msgGroup=_getMsgGroup(AQH_Msg_GetMsgType(msg)); srcGroupId=AQH_MsgEndpoint_GetGroupId(srcEp); ep=AQH_MsgEndpoint_List_First(emgr->endpointList); while(ep) { if (ep!=srcEp) { uint32_t acceptedGroupIds; uint32_t acceptedMsgGroups; DBG_INFO(AQH_LOGDOMAIN, "- checking endpoint %s", AQH_MsgEndpoint_GetName(ep)); acceptedGroupIds=AQH_MsgEndpoint_GetAcceptedMsgGroups(ep); acceptedMsgGroups=AQH_MsgEndpoint_GetAcceptedMsgGroups(ep); if ( !(AQH_MsgEndpoint_GetFlags(ep) & AQH_MSG_ENDPOINT_FLAGS_NOMESSAGES) && (acceptedMsgGroups & msgGroup) && (acceptedGroupIds & srcGroupId) ) { /* endpoint accepts this message */ DBG_INFO(AQH_LOGDOMAIN, " - endpoint %s accepts message", AQH_MsgEndpoint_GetName(ep)); AQH_MsgEndpoint_AddSendMessage(ep, AQH_Msg_dup(msg)); } else { DBG_INFO(AQH_LOGDOMAIN, " - endpoint %s does not accept message", AQH_MsgEndpoint_GetName(ep)); } } ep=AQH_MsgEndpoint_List_Next(ep); } } void _handleAdminMsg(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg) { uint8_t mt; mt=AQH_Msg_GetMsgType(msg); switch(mt) { case AQH_MSG_TYPE_NET_SET_ACCEPTED_MSGGROUPS: _handleMsgSetAcceptedMsgGroups(emgr, ep, msg); break; default: break; } } void _handleMsgSetAcceptedMsgGroups(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg) { AQH_MsgEndpoint_SetAcceptedMsgGroups(ep, AQH_MsgSetAcceptedMsgGroups_GetMsgGroups(msg)); } uint32_t _getMsgGroup(uint8_t msgType) { switch(msgType) { case AQH_MSG_TYPE_PING: case AQH_MSG_TYPE_PONG: case AQH_MSG_TYPE_COMSENDSTATS: case AQH_MSG_TYPE_COMRECVSTATS: case AQH_MSG_TYPE_TWIBUSMEMBER: case AQH_MSG_TYPE_DEBUG: return AQH_MSG_ENDPOINT_MSGGROUP_INFO; case AQH_MSG_TYPE_VALUE: return AQH_MSG_ENDPOINT_MSGGROUP_VALUES; case AQH_MSG_TYPE_NEED_ADDRESS: case AQH_MSG_TYPE_HAVE_ADDRESS: case AQH_MSG_TYPE_CLAIM_ADDRESS: case AQH_MSG_TYPE_DENY_ADDRESS: case AQH_MSG_TYPE_ADDRESS_RANGE: return AQH_MSG_ENDPOINT_MSGGROUP_ADDRESS; case AQH_MSG_TYPE_NET_SET_ACCEPTED_MSGGROUPS: return AQH_MSG_ENDPOINT_MSGGROUP_ADMIN; default: return 0; } }