/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/msgmanager_p.h" #include "aqhome/msg/endpointmgr.h" #include "aqhome/msg/msg_node.h" #include "aqhome/msg/msg_sendstats.h" #include "aqhome/msg/msg_recvstats.h" #include "aqhome/msg/msg_value2.h" #include "aqhome/msg/msg_needaddr.h" #include "aqhome/msg/msg_claimaddr.h" #include "aqhome/msg/msg_haveaddr.h" #include "aqhome/msg/msg_device.h" #include "aqhome/msg/msg_ping.h" #include "aqhome/msg/endpoint_tty.h" #include "aqhome/msg/endpoint_node.h" #include "aqhome/mqtt/endpoint_mqttc.h" #include "aqhome/ipc/msg_ipc.h" #include "aqhome/ipc/msg_ipc_ping.h" #include "aqhome/ipc/msg_ipc_setaccmsggrps.h" #include "aqhome/ipc/msg_ipc_forward.h" #include #include #include #include #include GWEN_INHERIT(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER) static void GWENHYWFAR_CB _freeData(void *bp, void *p); static void _loopOnceOverEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr); static void _handleEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep); static void _handleNodeMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleIpcMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMqttMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgValue2(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgNeedAddress(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgClaimAddress(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgHaveAddress(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgComSendStat(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgComRecvStat(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleMsgDevice(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleIpcMsgPing(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleIpcMsgSetAccMsgGrps(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static void _handleIpcMsgForward(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); static AQH_NODE_INFO *_getOrCreateNodeAndUpdateUidAddr(GWEN_MSG_ENDPOINT_MGR *emgr, const GWEN_MSG *msg, uint32_t uid); GWEN_MSG_ENDPOINT_MGR *AQH_MsgManager_new(uint8_t busAddr) { GWEN_MSG_ENDPOINT_MGR *mgr; AQH_MSG_MANAGER *xmgr; mgr=AQH_MsgEndpointMgr_new(busAddr); GWEN_NEW_OBJECT(AQH_MSG_MANAGER, xmgr); GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, mgr, xmgr, _freeData); xmgr->nodeDb=AQH_NodeDb_new(); return mgr; } void _freeData(void *bp, void *p) { AQH_MSG_MANAGER *xmgr; xmgr=(AQH_MSG_MANAGER*) p; AQH_NodeDb_free(xmgr->nodeDb); free(xmgr->dbFilename); GWEN_FREE_OBJECT(xmgr); } const char *AQH_MsgManager_GetDbFilename(const GWEN_MSG_ENDPOINT_MGR *emgr) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) return xmgr->dbFilename; return NULL; } void AQH_MsgManager_SetDbFilename(GWEN_MSG_ENDPOINT_MGR *emgr, const char *s) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) { free(xmgr->dbFilename); xmgr->dbFilename=s?strdup(s):NULL; if (xmgr->dbFilename) { GWEN_DB_NODE *dbNodeDb; int rv; dbNodeDb=GWEN_DB_Group_new("dbNodes"); rv=GWEN_DB_ReadFile(dbNodeDb, xmgr->dbFilename, GWEN_DB_FLAGS_DEFAULT); if (rv==0) { AQH_NodeDb_fromDb(xmgr->nodeDb, dbNodeDb); GWEN_DB_Group_free(dbNodeDb); } } } } int AQH_MsgManager_LoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr) { int rv; rv=GWEN_MsgEndpointMgr_IoLoopOnce(emgr); _loopOnceOverEndpoints(emgr); GWEN_MsgEndpointMgr_RunAllEndpoints(emgr); return rv; } void _loopOnceOverEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr) { AQH_MSG_MANAGER *xmgr; GWEN_MSG_ENDPOINT_LIST *endpointList; DBG_DEBUG(AQH_LOGDOMAIN, "Handle endpoint messages"); xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); endpointList=GWEN_MsgEndpointMgr_GetEndpointList(emgr); if (endpointList) { GWEN_MSG_ENDPOINT *ep; ep=GWEN_MsgEndpoint_List_First(endpointList); while(ep) { if (GWEN_MsgEndpoint_GetGroupId(ep) & AQH_MSGMGR_ENDPOINTGROUP_MQTT) { if (GWEN_ConnectableMsgEndpoint_GetState(ep)>=GWEN_ENDPOINT_MQTTC_STATE_ESTABLISHED) _handleEndpoint(emgr, ep); else { DBG_INFO(AQH_LOGDOMAIN, "Not handling MQTT endpoint right now (not fully connected)"); } } else _handleEndpoint(emgr, ep); ep=GWEN_MsgEndpoint_List_Next(ep); } /* while */ } if (AQH_NodeDb_IsModified(xmgr->nodeDb)) { if (xmgr->dbFilename) { GWEN_DB_NODE *dbNodeDb; dbNodeDb=GWEN_DB_Group_new("nodeDb"); AQH_NodeDb_toDb(xmgr->nodeDb, dbNodeDb); GWEN_DB_WriteFile(dbNodeDb, xmgr->dbFilename, GWEN_DB_FLAGS_DEFAULT); GWEN_DB_Group_free(dbNodeDb); } } } void _handleEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep) { GWEN_MSG *msg; while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep)) ) { int groupId; groupId=GWEN_Msg_GetGroupId(msg); switch(groupId) { case AQH_MSGMGR_ENDPOINTGROUP_NODE: _handleNodeMsg(emgr, ep, msg); break; case AQH_MSGMGR_ENDPOINTGROUP_IPC: _handleIpcMsg(emgr, ep, msg); break; case AQH_MSGMGR_ENDPOINTGROUP_MQTT: _handleMqttMsg(emgr, ep, msg); break; default: DBG_ERROR(AQH_LOGDOMAIN, "unhandled groupId %d (%02x), ignoring message", groupId, groupId); break; } GWEN_Msg_free(msg); } } void _handleNodeMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { int msgIsValid; uint8_t msgType; DBG_INFO(AQH_LOGDOMAIN, " - msg %d (%s) from %d to %d", AQH_NodeMsg_GetMsgType(msg), AQH_NodeMsg_MsgTypeToChar(AQH_NodeMsg_GetMsgType(msg)), AQH_NodeMsg_GetSourceAddress(msg), AQH_NodeMsg_GetDestAddress(msg)); AQH_MsgEndpointMgr_DistributeMsgFromNodeEndpoint(emgr, ep, msg, AQH_MSGMGR_ENDPOINTGROUP_NODE, NULL); msgIsValid=(AQH_NodeMsg_IsChecksumValid(msg) && AQH_NodeMsg_IsMsgComplete(msg)); msgType=AQH_NodeMsg_GetMsgType(msg); if (msgIsValid) { switch(msgType) { case AQH_MSG_TYPE_COMSENDSTATS: _handleMsgComSendStat(emgr, ep, msg); break; case AQH_MSG_TYPE_COMRECVSTATS: _handleMsgComRecvStat(emgr, ep, msg); break; case AQH_MSG_TYPE_VALUE2: _handleMsgValue2(emgr, ep, msg); break; case AQH_MSG_TYPE_NEED_ADDRESS: _handleMsgNeedAddress(emgr, ep, msg); break; case AQH_MSG_TYPE_CLAIM_ADDRESS: _handleMsgClaimAddress(emgr, ep, msg); break; case AQH_MSG_TYPE_HAVE_ADDRESS: _handleMsgHaveAddress(emgr, ep, msg); break; case AQH_MSG_TYPE_DEVICE: _handleMsgDevice(emgr, ep, msg); break; default: break; } } } void _handleIpcMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { uint16_t code; /* exec IPC message */ code=GWEN_IpcMsg_GetCode(msg); DBG_ERROR(AQH_LOGDOMAIN, "Received IPC packet"); switch(code) { case AQH_MSGTYPE_IPC_PING: _handleIpcMsgPing(emgr, ep, msg); break; case AQH_MSGTYPE_IPC_SETACCMSGGRPS: _handleIpcMsgSetAccMsgGrps(emgr, ep, msg); break; case AQH_MSGTYPE_IPC_FORWARD: _handleIpcMsgForward(emgr, ep, msg); break; default: break; } } void _handleMqttMsg(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { /* exec MQTT message */ } void _handleIpcMsgPing(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { GWEN_MSG *msgOut; DBG_ERROR(AQH_LOGDOMAIN, "Received IPC PING message"); msgOut=AQH_PingMsg_new(AQH_MsgEndpointMgr_GetBusAddr(emgr), AQH_PingIpcMsg_GetDestAddr(msg), AQH_MSG_TYPE_PING); AQH_MsgEndpointMgr_DistributeMsgFromNodeEndpoint(emgr, ep, msgOut, AQH_MSGMGR_ENDPOINTGROUP_NODE, AQH_MSG_ENDPOINT_TTY_NAME); GWEN_Msg_free(msgOut); } void _handleIpcMsgSetAccMsgGrps(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { uint32_t groups; DBG_ERROR(AQH_LOGDOMAIN, "Received IPC SET_ACCEPTED_MSG_GROUPS message"); groups=AQH_SetAcceptedMsgGroupsIpcMsg_GetMsgGroups(msg); AQH_NodeEndpoint_SetAcceptedMsgGroups(ep, groups); // TODO: send response? } void _handleIpcMsgForward(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { GWEN_MSG *msgOut; DBG_ERROR(AQH_LOGDOMAIN, "Received IPC FORWARD message"); msgOut=AQH_ForwardIpcMsg_GetCopyOfNodeMsg(msg); if (msgOut) { AQH_MsgEndpointMgr_DistributeMsgFromNodeEndpoint(emgr, ep, msgOut, AQH_MSGMGR_ENDPOINTGROUP_NODE, AQH_MSG_ENDPOINT_TTY_NAME); GWEN_Msg_free(msgOut); } } void _handleMsgValue2(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) { AQH_NODE_INFO *ni; uint32_t uid; uid=AQH_Value2Msg_GetUid(msg); ni=_getOrCreateNodeAndUpdateUidAddr(emgr, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } } } void _handleMsgNeedAddress(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) { AQH_NODE_INFO *ni; uint32_t uid; uid=AQH_NeedAddrMsg_GetUid(msg); ni=_getOrCreateNodeAndUpdateUidAddr(emgr, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } } } void _handleMsgClaimAddress(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) { AQH_NODE_INFO *ni; uint32_t uid; uid=AQH_ClaimAddrMsg_GetUid(msg); ni=_getOrCreateNodeAndUpdateUidAddr(emgr, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } } } void _handleMsgHaveAddress(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) { AQH_NODE_INFO *ni; uint32_t uid; uid=AQH_HaveAddrMsg_GetUid(msg); ni=_getOrCreateNodeAndUpdateUidAddr(emgr, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } } } void _handleMsgComSendStat(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) { AQH_NODE_INFO *ni; uint32_t uid; uid=AQH_SendStatsMsg_GetUid(msg); ni=_getOrCreateNodeAndUpdateUidAddr(emgr, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } AQH_NodeInfo_SetStatsPacketsOut(ni, AQH_SendStatsMsg_GetPacketsOut(msg)); AQH_NodeInfo_SetStatsCollisions(ni, AQH_SendStatsMsg_GetCollisions(msg)); AQH_NodeInfo_SetStatsBusy(ni, AQH_SendStatsMsg_GetBusyErrors(msg)); AQH_NodeDb_SetModified(xmgr->nodeDb); } } void _handleMsgComRecvStat(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) { AQH_NODE_INFO *ni; uint32_t uid; uid=AQH_RecvStatsMsg_GetUid(msg); ni=_getOrCreateNodeAndUpdateUidAddr(emgr, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } AQH_NodeInfo_SetStatsPacketsIn(ni, AQH_RecvStatsMsg_GetPacketsIn(msg)); AQH_NodeInfo_SetStatsCrcErrors(ni, AQH_RecvStatsMsg_GetCrcErrors(msg)); AQH_NodeInfo_SetStatsIoErrors(ni, AQH_RecvStatsMsg_GetIoErrors(msg)); AQH_NodeDb_SetModified(xmgr->nodeDb); } } void _handleMsgDevice(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) { AQH_NODE_INFO *ni; uint32_t uid; uid=AQH_DeviceMsg_GetUid(msg); ni=_getOrCreateNodeAndUpdateUidAddr(emgr, msg, uid); if (ni) { AQH_NodeInfo_SetFirmwareType(ni, AQH_DeviceMsg_GetFirmwareType(msg)); AQH_NodeInfo_SetFirmwareVersion(ni, (AQH_DeviceMsg_GetFirmwareHigh(msg)<<8) | AQH_DeviceMsg_GetFirmwareLow(msg)); AQH_NodeInfo_SetModules(ni, AQH_DeviceMsg_GetModuleMask(msg)); AQH_NodeDb_SetModified(xmgr->nodeDb); } else { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } } } AQH_NODE_INFO *_getOrCreateNodeAndUpdateUidAddr(GWEN_MSG_ENDPOINT_MGR *emgr, const GWEN_MSG *msg, uint32_t uid) { AQH_MSG_MANAGER *xmgr; xmgr=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_MANAGER, emgr); if (xmgr) { uint8_t busAddr; AQH_NODE_INFO *ni; busAddr=AQH_NodeMsg_GetSourceAddress(msg); ni=AQH_NodeDb_GetNodeInfoByUid(xmgr->nodeDb, uid); if (ni) { uint8_t storedBusAddr; storedBusAddr=AQH_NodeInfo_GetBusAddress(ni); if (busAddr!=0 && storedBusAddr!=busAddr) { DBG_INFO(AQH_LOGDOMAIN, "Changed busaddr for %08x from %02x to %02x", uid, storedBusAddr, busAddr); AQH_NodeInfo_SetBusAddress(ni, busAddr); AQH_NodeDb_SetModified(xmgr->nodeDb); } } else { int rv; ni=AQH_NodeInfo_new(); AQH_NodeInfo_SetBusAddress(ni, busAddr); AQH_NodeInfo_SetUid(ni, uid); rv=AQH_NodeDb_AddNodeInfo(xmgr->nodeDb, ni); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); AQH_NodeInfo_free(ni); return NULL; } else { DBG_INFO(AQH_LOGDOMAIN, "Added node %08x (%02x)", uid, busAddr); } } return ni; } return NULL; }