345 lines
9.0 KiB
C
345 lines
9.0 KiB
C
/****************************************************************************
|
|
* This file is part of the project AqHome.
|
|
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
|
|
*
|
|
* The license for this file can be found in the file COPYING which you
|
|
* should have received along with this file.
|
|
****************************************************************************/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
# include <config.h>
|
|
#endif
|
|
|
|
#include "aqhome/msgendpointmanager_p.h"
|
|
#include "aqhome/msg_setaccmsggrps.h"
|
|
|
|
#include <gwenhywfar/misc.h>
|
|
#include <gwenhywfar/list.h>
|
|
#include <gwenhywfar/error.h>
|
|
#include <gwenhywfar/debug.h>
|
|
|
|
#include <errno.h>
|
|
|
|
|
|
|
|
static int _ioLoopOnce(AQH_MSG_ENDPOINT_MGR *emgr);
|
|
static void _msgLoopOnce(AQH_MSG_ENDPOINT_MGR *emgr);
|
|
static void _runAllEndpoints(AQH_MSG_ENDPOINT_MGR *emgr);
|
|
static void _distributeMsg(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg);
|
|
static void _handleAdminMsg(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg);
|
|
static void _handleMsgSetAcceptedMsgGroups(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg);
|
|
static uint32_t _getMsgGroup(uint8_t msgType);
|
|
|
|
|
|
|
|
|
|
AQH_MSG_ENDPOINT_MGR *AQH_MsgEndpointMgr_new(uint8_t busAddr)
|
|
{
|
|
AQH_MSG_ENDPOINT_MGR *emgr;
|
|
|
|
GWEN_NEW_OBJECT(AQH_MSG_ENDPOINT_MGR, emgr);
|
|
emgr->endpointList=AQH_MsgEndpoint_List_new();
|
|
emgr->busAddr=busAddr;
|
|
return emgr;
|
|
}
|
|
|
|
|
|
|
|
void AQH_MsgEndpointMgr_free(AQH_MSG_ENDPOINT_MGR *emgr)
|
|
{
|
|
if (emgr) {
|
|
AQH_MsgEndpoint_List_free(emgr->endpointList);
|
|
GWEN_FREE_OBJECT(emgr);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
uint8_t AQH_MsgEndpointMgr_GetBusAddr(const AQH_MSG_ENDPOINT_MGR *emgr)
|
|
{
|
|
return emgr->busAddr;
|
|
}
|
|
|
|
|
|
|
|
AQH_MSG_ENDPOINT_LIST *AQH_MsgEndpointMgr_GetEndpointList(const AQH_MSG_ENDPOINT_MGR *emgr)
|
|
{
|
|
return emgr->endpointList;
|
|
}
|
|
|
|
|
|
|
|
void AQH_MsgEndpointMgr_AddEndpoint(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep)
|
|
{
|
|
AQH_MsgEndpoint_List_Add(ep, emgr->endpointList);
|
|
}
|
|
|
|
|
|
|
|
void AQH_MsgEndpointMgr_DelEndpoint(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep)
|
|
{
|
|
AQH_MsgEndpoint_List_Del(ep);
|
|
}
|
|
|
|
|
|
|
|
int AQH_MsgEndpointMgr_LoopOnce(AQH_MSG_ENDPOINT_MGR *emgr)
|
|
{
|
|
int rv;
|
|
|
|
rv=_ioLoopOnce(emgr);
|
|
_msgLoopOnce(emgr);
|
|
_runAllEndpoints(emgr);
|
|
return rv;
|
|
}
|
|
|
|
|
|
|
|
int _ioLoopOnce(AQH_MSG_ENDPOINT_MGR *emgr)
|
|
{
|
|
AQH_MSG_ENDPOINT *ep;
|
|
fd_set readSet;
|
|
fd_set writeSet;
|
|
int highestRdFd=-1;
|
|
int highestWrFd=-1;
|
|
struct timeval tv;
|
|
int rv;
|
|
|
|
FD_ZERO(&readSet);
|
|
FD_ZERO(&writeSet);
|
|
tv.tv_sec=2;
|
|
tv.tv_usec=0;
|
|
|
|
DBG_INFO(AQH_LOGDOMAIN, "Sampling sockets");
|
|
ep=AQH_MsgEndpoint_List_First(emgr->endpointList);
|
|
if (ep==NULL) {
|
|
DBG_ERROR(AQH_LOGDOMAIN, "No endpoints.");
|
|
return GWEN_ERROR_GENERIC;
|
|
}
|
|
while(ep) {
|
|
DBG_INFO(AQH_LOGDOMAIN, "- checking endpoint %s", AQH_MsgEndpoint_GetName(ep));
|
|
if (!(AQH_MsgEndpoint_GetFlags(ep) & AQH_MSG_ENDPOINT_FLAGS_NOIO)) {
|
|
int fd;
|
|
|
|
fd=AQH_MsgEndpoint_GetReadFd(ep);
|
|
if (fd>=0) {
|
|
DBG_INFO(AQH_LOGDOMAIN, " - adding socket %d for read", fd);
|
|
FD_SET(fd, &readSet);
|
|
highestRdFd=(fd>highestRdFd)?fd:highestRdFd;
|
|
}
|
|
|
|
fd=AQH_MsgEndpoint_GetWriteFd(ep);
|
|
if (fd>=0) {
|
|
DBG_INFO(AQH_LOGDOMAIN, " - adding socket %d for write", fd);
|
|
FD_SET(fd, &writeSet);
|
|
highestWrFd=(fd>highestWrFd)?fd:highestWrFd;
|
|
}
|
|
}
|
|
else {
|
|
DBG_INFO(AQH_LOGDOMAIN, " - endpoint %s does not support IO", AQH_MsgEndpoint_GetName(ep));
|
|
}
|
|
ep=AQH_MsgEndpoint_List_Next(ep);
|
|
}
|
|
|
|
DBG_INFO(AQH_LOGDOMAIN, "Calling select (highest read socket: %d, highest write socket: %d)", highestRdFd, highestWrFd);
|
|
rv=select(((highestRdFd>highestWrFd)?highestRdFd:highestWrFd)+1,
|
|
(highestRdFd<0)?NULL:&readSet,
|
|
(highestWrFd<0)?NULL:&writeSet,
|
|
NULL,
|
|
&tv);
|
|
DBG_INFO(AQH_LOGDOMAIN, "Return from select (%d, %d=%s)", rv, (rv<0)?errno:0, (rv<0)?strerror(errno):"no error");
|
|
if (rv<0) {
|
|
if (errno!=EINTR) {
|
|
DBG_ERROR(AQH_LOGDOMAIN, "Error on select");
|
|
return GWEN_ERROR_IO;
|
|
}
|
|
}
|
|
else if (rv==0) {
|
|
/* timeout */
|
|
DBG_INFO(AQH_LOGDOMAIN, "timeout");
|
|
return GWEN_ERROR_TRY_AGAIN;
|
|
}
|
|
else if (rv) {
|
|
DBG_INFO(AQH_LOGDOMAIN, "Letting all endpoints handle IO");
|
|
ep=AQH_MsgEndpoint_List_First(emgr->endpointList);
|
|
while(ep) {
|
|
AQH_MSG_ENDPOINT *epNext;
|
|
int fd;
|
|
int rv;
|
|
|
|
epNext=AQH_MsgEndpoint_List_Next(ep);
|
|
fd=AQH_MsgEndpoint_GetFd(ep);
|
|
if (fd!=-1 && FD_ISSET(fd, &readSet)) {
|
|
DBG_INFO(AQH_LOGDOMAIN, "- endpoint(%s): read", AQH_MsgEndpoint_GetName(ep));
|
|
rv=AQH_MsgEndpoint_HandleReadable(ep, emgr);
|
|
if (rv<0 && rv!=GWEN_ERROR_TRY_AGAIN) {
|
|
DBG_INFO(AQH_LOGDOMAIN, "error, removing endpoint %s", AQH_MsgEndpoint_GetName(ep));
|
|
fd=-1;
|
|
AQH_MsgEndpointMgr_DelEndpoint(emgr, ep);
|
|
}
|
|
}
|
|
if (fd!=-1 && FD_ISSET(fd, &writeSet)) {
|
|
DBG_INFO(AQH_LOGDOMAIN, "- endpoint(%s): write", AQH_MsgEndpoint_GetName(ep));
|
|
rv=AQH_MsgEndpoint_HandleWritable(ep, emgr);
|
|
if (rv<0 && rv!=GWEN_ERROR_TRY_AGAIN) {
|
|
DBG_INFO(AQH_LOGDOMAIN, "error, removing endpoint %s", AQH_MsgEndpoint_GetName(ep));
|
|
fd=-1;
|
|
AQH_MsgEndpointMgr_DelEndpoint(emgr, ep);
|
|
}
|
|
}
|
|
ep=epNext;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
|
|
void _msgLoopOnce(AQH_MSG_ENDPOINT_MGR *emgr)
|
|
{
|
|
AQH_MSG_ENDPOINT *ep;
|
|
|
|
DBG_INFO(AQH_LOGDOMAIN, "Handle endpoint messages");
|
|
ep=AQH_MsgEndpoint_List_First(emgr->endpointList);
|
|
while(ep) {
|
|
AQH_MSG *msg;
|
|
|
|
DBG_INFO(AQH_LOGDOMAIN, "- endpoint(%s)", AQH_MsgEndpoint_GetName(ep));
|
|
while( (msg=AQH_MsgEndpoint_TakeFirstReceivedMessage(ep)) ) {
|
|
uint32_t msgGroup;
|
|
|
|
DBG_INFO(AQH_LOGDOMAIN,
|
|
" - msg %d from %d to %d",
|
|
AQH_Msg_GetMsgType(msg), AQH_Msg_GetSourceAddress(msg), AQH_Msg_GetDestAddress(msg));
|
|
msgGroup=_getMsgGroup(AQH_Msg_GetMsgType(msg));
|
|
if (msgGroup & AQH_MSG_ENDPOINT_MSGGROUP_ADMIN) {
|
|
if (!(AQH_MsgEndpoint_GetGroupId(ep) & AQH_MSG_ENDPOINT_ENDPOINTGROUP_BUS)) {
|
|
/* only handle admin messages not from nodes */
|
|
DBG_INFO(AQH_LOGDOMAIN, " - handling admin message");
|
|
_handleAdminMsg(emgr, ep, msg);
|
|
}
|
|
}
|
|
else {
|
|
DBG_INFO(AQH_LOGDOMAIN, " - distributing message");
|
|
_distributeMsg(emgr, ep, msg);
|
|
}
|
|
AQH_Msg_free(msg);
|
|
}
|
|
|
|
ep=AQH_MsgEndpoint_List_Next(ep);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void _runAllEndpoints(AQH_MSG_ENDPOINT_MGR *emgr)
|
|
{
|
|
AQH_MSG_ENDPOINT *ep;
|
|
|
|
DBG_INFO(AQH_LOGDOMAIN, "Running all endpoints");
|
|
ep=AQH_MsgEndpoint_List_First(emgr->endpointList);
|
|
while(ep) {
|
|
AQH_MSG_ENDPOINT *next;
|
|
|
|
next=AQH_MsgEndpoint_List_Next(ep);
|
|
DBG_INFO(AQH_LOGDOMAIN, "- running endpoint %s", AQH_MsgEndpoint_GetName(ep));
|
|
AQH_MsgEndpoint_Run(ep);
|
|
ep=next;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void _distributeMsg(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *srcEp, const AQH_MSG *msg)
|
|
{
|
|
AQH_MSG_ENDPOINT *ep;
|
|
int srcGroupId;
|
|
uint32_t msgGroup;
|
|
|
|
msgGroup=_getMsgGroup(AQH_Msg_GetMsgType(msg));
|
|
srcGroupId=AQH_MsgEndpoint_GetGroupId(srcEp);
|
|
|
|
ep=AQH_MsgEndpoint_List_First(emgr->endpointList);
|
|
while(ep) {
|
|
if (ep!=srcEp) {
|
|
uint32_t acceptedGroupIds;
|
|
uint32_t acceptedMsgGroups;
|
|
|
|
DBG_INFO(AQH_LOGDOMAIN, "- checking endpoint %s", AQH_MsgEndpoint_GetName(ep));
|
|
acceptedGroupIds=AQH_MsgEndpoint_GetAcceptedMsgGroups(ep);
|
|
acceptedMsgGroups=AQH_MsgEndpoint_GetAcceptedMsgGroups(ep);
|
|
|
|
if (
|
|
!(AQH_MsgEndpoint_GetFlags(ep) & AQH_MSG_ENDPOINT_FLAGS_NOMESSAGES) &&
|
|
(acceptedMsgGroups & msgGroup) &&
|
|
(acceptedGroupIds & srcGroupId)
|
|
) {
|
|
/* endpoint accepts this message */
|
|
DBG_INFO(AQH_LOGDOMAIN, " - endpoint %s accepts message", AQH_MsgEndpoint_GetName(ep));
|
|
AQH_MsgEndpoint_AddSendMessage(ep, AQH_Msg_dup(msg));
|
|
}
|
|
else {
|
|
DBG_INFO(AQH_LOGDOMAIN, " - endpoint %s does not accept message", AQH_MsgEndpoint_GetName(ep));
|
|
}
|
|
}
|
|
ep=AQH_MsgEndpoint_List_Next(ep);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void _handleAdminMsg(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg)
|
|
{
|
|
uint8_t mt;
|
|
|
|
mt=AQH_Msg_GetMsgType(msg);
|
|
switch(mt) {
|
|
case AQH_MSG_TYPE_NET_SET_ACCEPTED_MSGGROUPS:
|
|
_handleMsgSetAcceptedMsgGroups(emgr, ep, msg);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void _handleMsgSetAcceptedMsgGroups(AQH_MSG_ENDPOINT_MGR *emgr, AQH_MSG_ENDPOINT *ep, const AQH_MSG *msg)
|
|
{
|
|
AQH_MsgEndpoint_SetAcceptedMsgGroups(ep, AQH_MsgSetAcceptedMsgGroups_GetMsgGroups(msg));
|
|
}
|
|
|
|
|
|
|
|
uint32_t _getMsgGroup(uint8_t msgType)
|
|
{
|
|
switch(msgType) {
|
|
case AQH_MSG_TYPE_PING:
|
|
case AQH_MSG_TYPE_PONG:
|
|
case AQH_MSG_TYPE_COMSENDSTATS:
|
|
case AQH_MSG_TYPE_COMRECVSTATS:
|
|
case AQH_MSG_TYPE_TWIBUSMEMBER:
|
|
case AQH_MSG_TYPE_DEBUG:
|
|
return AQH_MSG_ENDPOINT_MSGGROUP_INFO;
|
|
case AQH_MSG_TYPE_VALUE:
|
|
return AQH_MSG_ENDPOINT_MSGGROUP_VALUES;
|
|
case AQH_MSG_TYPE_NEED_ADDRESS:
|
|
case AQH_MSG_TYPE_HAVE_ADDRESS:
|
|
case AQH_MSG_TYPE_CLAIM_ADDRESS:
|
|
case AQH_MSG_TYPE_DENY_ADDRESS:
|
|
case AQH_MSG_TYPE_ADDRESS_RANGE:
|
|
return AQH_MSG_ENDPOINT_MSGGROUP_ADDRESS;
|
|
case AQH_MSG_TYPE_NET_SET_ACCEPTED_MSGGROUPS:
|
|
return AQH_MSG_ENDPOINT_MSGGROUP_ADMIN;
|
|
default:
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|