diff --git a/apps/aqhome-nodes/0BUILD b/apps/aqhome-nodes/0BUILD index f43e7ba..9f2545f 100644 --- a/apps/aqhome-nodes/0BUILD +++ b/apps/aqhome-nodes/0BUILD @@ -35,16 +35,6 @@ - aqhomed.h - aqhomed_p.h - init.h - fini.h - loop.h - loop_broker.h - loop_tty.h - loop_tty_ipc.h - loop_tty_broker.h - loop_ipc.h db.h tty_log.h devicesread.h @@ -54,21 +44,12 @@ $(local/typefiles) - main.c - aqhomed.c - init.c - fini.c - loop.c - loop_broker.c - loop_tty.c - loop_tty_ipc.c - loop_tty_broker.c - loop_ipc.c + server.c db.c - tty_log.c devicesread.c devicesdump.c r_setdata.c + main.c diff --git a/apps/aqhome-nodes/db.c b/apps/aqhome-nodes/db.c index 0c2618d..c328a7b 100644 --- a/apps/aqhome-nodes/db.c +++ b/apps/aqhome-nodes/db.c @@ -1,6 +1,6 @@ /**************************************************************************** * This file is part of the project AqHome. - * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. @@ -12,28 +12,25 @@ #include "./db.h" -#include "./aqhomed_p.h" +#include "./server_p.h" #include "aqhome/aqhome.h" -#include "aqhome/msg/msg_node.h" -#include "aqhome/msg/msg_sendstats.h" -#include "aqhome/msg/msg_recvstats.h" -#include "aqhome/msg/msg_value2.h" -#include "aqhome/msg/msg_value3.h" -#include "aqhome/msg/msg_needaddr.h" -#include "aqhome/msg/msg_claimaddr.h" -#include "aqhome/msg/msg_haveaddr.h" -#include "aqhome/msg/msg_device.h" -#include "aqhome/msg/msg_flashready.h" +#include "aqhome/msg/node/m_node.h" +#include "aqhome/msg/node/m_sendstats.h" +#include "aqhome/msg/node/m_recvstats.h" +#include "aqhome/msg/node/m_value.h" +#include "aqhome/msg/node/m_addr.h" +#include "aqhome/msg/node/m_device.h" +#include "aqhome/msg/node/m_flashready.h" #include "aqhome/data/value.h" -#include "aqhome/ipc/data/ipc_data.h" -#include "aqhome/ipc/data/msg_data_values.h" +#include "aqhome/msg/ipc/data/m_ipcd.h" +#include "aqhome/msg/ipc/data/m_ipcd_values.h" +#include "aqhome/ipc2/endpoint.h" #include #include #include -#include #include @@ -43,23 +40,20 @@ * ------------------------------------------------------------------------------------------------ */ -static void _handleMsgValue2(AQHOMED *aqh, const GWEN_MSG *msg); -static void _handleMsgValue3(AQHOMED *aqh, const GWEN_MSG *msg); -static void _handleMsgNeedAddress(AQHOMED *aqh, const GWEN_MSG *msg); -static void _handleMsgClaimAddress(AQHOMED *aqh, const GWEN_MSG *msg); -static void _handleMsgHaveAddress(AQHOMED *aqh, const GWEN_MSG *msg); -static void _handleMsgComSendStat(AQHOMED *aqh, const GWEN_MSG *msg); -static void _handleMsgComRecvStat(AQHOMED *aqh, const GWEN_MSG *msg); -static void _handleMsgDevice(AQHOMED *aqh, const GWEN_MSG *msg); -static void _handleMsgFlashReady(AQHOMED *aqh, const GWEN_MSG *msg); +static void _handleMsgValue(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _handleAddressMsg(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _handleMsgComSendStat(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _handleMsgComRecvStat(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _handleMsgDevice(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _handleMsgFlashReady(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); -static AQH_NODE_INFO *_getOrCreateNodeAndUpdateUidAddr(AQHOMED *aqh, const GWEN_MSG *msg, uint32_t uid); +static AQH_NODE_INFO *_getOrCreateNodeAndUpdateUidAddr(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg, uint32_t uid); static void _updateTimestampLastChange(AQH_NODE_INFO *ni); -static void _assignDeviceId(AQHOMED *aqh, AQH_NODE_INFO *ni, uint32_t uid); +static void _assignDeviceId(AQH_OBJECT *o, AQH_NODE_INFO *ni, uint32_t uid); -static void _announceNodeValues(AQHOMED *aqh, const AQH_NODE_INFO *ni); +static void _announceNodeValues(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_NODE_INFO *ni); static void _setDeviceName(AQH_VALUE *value, uint32_t uid); -static void _announceValue(AQHOMED *aqh, uint32_t uid, const AQHNODE_VALUE *v); +static void _announceValue(AQH_NODE_SERVER *xo, uint32_t uid, const AQHNODE_VALUE *v); @@ -69,61 +63,57 @@ static void _announceValue(AQHOMED *aqh, uint32_t uid, const AQHNODE_VALUE *v); * ------------------------------------------------------------------------------------------------ */ -void AqHomed_NodeMsgToDb(AQHOMED *aqh, const GWEN_MSG *msg) +void AQH_NodeServer_NodeMsgToDb(AQH_OBJECT *o, const AQH_MESSAGE *msg) { - int msgIsValid; - uint8_t msgType; + AQH_NODE_SERVER *xo; - DBG_INFO(AQH_LOGDOMAIN, - " - msg %d (%s) from %d to %d", - AQH_NodeMsg_GetMsgType(msg), - AQH_NodeMsg_MsgTypeToChar(AQH_NodeMsg_GetMsgType(msg)), - AQH_NodeMsg_GetSourceAddress(msg), - AQH_NodeMsg_GetDestAddress(msg)); + xo=AQH_NodeServer_GetServerData(o); + if (xo) { + uint8_t msgType; - msgIsValid=(AQH_NodeMsg_IsChecksumValid(msg) && AQH_NodeMsg_IsMsgComplete(msg)); - msgType=AQH_NodeMsg_GetMsgType(msg); + msgType=AQH_NodeMessage_GetMsgType(msg); - if (msgIsValid) { switch(msgType) { - case AQH_MSG_TYPE_COMSENDSTATS: _handleMsgComSendStat(aqh, msg); break; - case AQH_MSG_TYPE_COMRECVSTATS: _handleMsgComRecvStat(aqh, msg); break; - case AQH_MSG_TYPE_VALUE2: _handleMsgValue2(aqh, msg); break; - case AQH_MSG_TYPE_VALUE_REPORT: _handleMsgValue3(aqh, msg); break; - case AQH_MSG_TYPE_NEED_ADDRESS: _handleMsgNeedAddress(aqh, msg); break; - case AQH_MSG_TYPE_CLAIM_ADDRESS: _handleMsgClaimAddress(aqh, msg); break; - case AQH_MSG_TYPE_HAVE_ADDRESS: _handleMsgHaveAddress(aqh, msg); break; - case AQH_MSG_TYPE_DEVICE: _handleMsgDevice(aqh, msg); break; - case AQH_MSG_TYPE_FLASH_READY: _handleMsgFlashReady(aqh, msg); break; - default: break; + case AQH_MSG_TYPE_COMSENDSTATS: _handleMsgComSendStat(xo, msg); break; + case AQH_MSG_TYPE_COMRECVSTATS: _handleMsgComRecvStat(xo, msg); break; + case AQH_MSG_TYPE_VALUE_REPORT: _handleMsgValue(xo, msg); break; + case AQH_MSG_TYPE_NEED_ADDRESS: _handleAddressMsg(xo, msg); break; + case AQH_MSG_TYPE_CLAIM_ADDRESS: _handleAddressMsg(xo, msg); break; + case AQH_MSG_TYPE_HAVE_ADDRESS: _handleAddressMsg(xo, msg); break; + case AQH_MSG_TYPE_DEVICE: _handleMsgDevice(o, xo, msg); break; + case AQH_MSG_TYPE_FLASH_READY: _handleMsgFlashReady(o, xo, msg); break; + default: break; } } } -void AqHomed_WriteNodeDb(AQHOMED *aqh) +void AQH_NodeServer_WriteNodeDb(AQH_OBJECT *o) { - if (aqh->dbFile) { + AQH_NODE_SERVER *xo; + + xo=AQH_NodeServer_GetServerData(o); + if (xo && xo->dbFile) { GWEN_DB_NODE *dbNodeDb; - AQH_NodeDb_ClearModified(aqh->nodeDb); + AQH_NodeDb_ClearModified(xo->nodeDb); dbNodeDb=GWEN_DB_Group_new("nodeDb"); - AQH_NodeDb_toDb(aqh->nodeDb, dbNodeDb); - GWEN_DB_WriteFile(dbNodeDb, aqh->dbFile, GWEN_DB_FLAGS_DEFAULT); + AQH_NodeDb_toDb(xo->nodeDb, dbNodeDb); + GWEN_DB_WriteFile(dbNodeDb, xo->dbFile, GWEN_DB_FLAGS_DEFAULT); GWEN_DB_Group_free(dbNodeDb); } } -void _handleMsgValue2(AQHOMED *aqh, const GWEN_MSG *msg) +void _handleMsgValue(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) { AQH_NODE_INFO *ni; uint32_t uid; - uid=AQH_Value2Msg_GetUid(msg); - ni=_getOrCreateNodeAndUpdateUidAddr(aqh, msg, uid); + uid=AQH_ValueMessage_GetUid(msg); + ni=_getOrCreateNodeAndUpdateUidAddr(xo, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } @@ -131,13 +121,13 @@ void _handleMsgValue2(AQHOMED *aqh, const GWEN_MSG *msg) -void _handleMsgValue3(AQHOMED *aqh, const GWEN_MSG *msg) +void _handleAddressMsg(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) { AQH_NODE_INFO *ni; uint32_t uid; - uid=AQH_Value3Msg_GetUid(msg); - ni=_getOrCreateNodeAndUpdateUidAddr(aqh, msg, uid); + uid=AQH_AddrMessage_GetUid(msg); + ni=_getOrCreateNodeAndUpdateUidAddr(xo, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } @@ -145,111 +135,69 @@ void _handleMsgValue3(AQHOMED *aqh, const GWEN_MSG *msg) -void _handleMsgNeedAddress(AQHOMED *aqh, const GWEN_MSG *msg) +void _handleMsgComSendStat(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) { AQH_NODE_INFO *ni; uint32_t uid; - uid=AQH_NeedAddrMsg_GetUid(msg); - ni=_getOrCreateNodeAndUpdateUidAddr(aqh, msg, uid); + uid=AQH_SendStatsMessage_GetUid(msg); + ni=_getOrCreateNodeAndUpdateUidAddr(xo, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } -} - - - -void _handleMsgClaimAddress(AQHOMED *aqh, const GWEN_MSG *msg) -{ - AQH_NODE_INFO *ni; - uint32_t uid; - - uid=AQH_ClaimAddrMsg_GetUid(msg); - ni=_getOrCreateNodeAndUpdateUidAddr(aqh, msg, uid); - if (ni==NULL) { - DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); - } -} - - - -void _handleMsgHaveAddress(AQHOMED *aqh, const GWEN_MSG *msg) -{ - AQH_NODE_INFO *ni; - uint32_t uid; - - uid=AQH_HaveAddrMsg_GetUid(msg); - ni=_getOrCreateNodeAndUpdateUidAddr(aqh, msg, uid); - if (ni==NULL) { - DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); - } -} - - - -void _handleMsgComSendStat(AQHOMED *aqh, const GWEN_MSG *msg) -{ - AQH_NODE_INFO *ni; - uint32_t uid; - - uid=AQH_SendStatsMsg_GetUid(msg); - ni=_getOrCreateNodeAndUpdateUidAddr(aqh, msg, uid); - if (ni==NULL) { - DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); - } - AQH_NodeInfo_SetStatsPacketsOut(ni, AQH_SendStatsMsg_GetPacketsOut(msg)); - AQH_NodeInfo_SetStatsCollisions(ni, AQH_SendStatsMsg_GetCollisions(msg)); - AQH_NodeInfo_SetStatsBusy(ni, AQH_SendStatsMsg_GetBusyErrors(msg)); - AQH_NodeDb_SetModified(aqh->nodeDb); + AQH_NodeInfo_SetStatsPacketsOut(ni, AQH_SendStatsMessage_GetPacketsOut(msg)); + AQH_NodeInfo_SetStatsCollisions(ni, AQH_SendStatsMessage_GetCollisions(msg)); + AQH_NodeInfo_SetStatsBusy(ni, AQH_SendStatsMessage_GetBusyErrors(msg)); + AQH_NodeDb_SetModified(xo->nodeDb); _updateTimestampLastChange(ni); } -void _handleMsgComRecvStat(AQHOMED *aqh, const GWEN_MSG *msg) +void _handleMsgComRecvStat(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) { AQH_NODE_INFO *ni; uint32_t uid; - uid=AQH_RecvStatsMsg_GetUid(msg); - ni=_getOrCreateNodeAndUpdateUidAddr(aqh, msg, uid); + uid=AQH_RecvStatsMessage_GetUid(msg); + ni=_getOrCreateNodeAndUpdateUidAddr(xo, msg, uid); if (ni==NULL) { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); } - AQH_NodeInfo_SetStatsPacketsIn(ni, AQH_RecvStatsMsg_GetPacketsIn(msg)); - AQH_NodeInfo_SetStatsCrcErrors(ni, AQH_RecvStatsMsg_GetCrcErrors(msg)); - AQH_NodeInfo_SetStatsIoErrors(ni, AQH_RecvStatsMsg_GetIoErrors(msg)); - AQH_NodeDb_SetModified(aqh->nodeDb); + AQH_NodeInfo_SetStatsPacketsIn(ni, AQH_RecvStatsMessage_GetPacketsIn(msg)); + AQH_NodeInfo_SetStatsCrcErrors(ni, AQH_RecvStatsMessage_GetCrcErrors(msg)); + AQH_NodeInfo_SetStatsIoErrors(ni, AQH_RecvStatsMessage_GetIoErrors(msg)); + AQH_NodeDb_SetModified(xo->nodeDb); _updateTimestampLastChange(ni); } -void _handleMsgDevice(AQHOMED *aqh, const GWEN_MSG *msg) +void _handleMsgDevice(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) { AQH_NODE_INFO *ni; uint32_t uid; - uid=AQH_DeviceMsg_GetUid(msg); - ni=_getOrCreateNodeAndUpdateUidAddr(aqh, msg, uid); + uid=AQH_DeviceMessage_GetUid(msg); + ni=_getOrCreateNodeAndUpdateUidAddr(xo, msg, uid); if (ni) { const char *s; - AQH_NodeInfo_SetManufacturer(ni, AQH_DeviceMsg_GetManufacturer(msg)); - AQH_NodeInfo_SetDeviceType(ni, AQH_DeviceMsg_GetDeviceType(msg)); - AQH_NodeInfo_SetDeviceVersion(ni, (AQH_DeviceMsg_GetDeviceVersion(msg)<<8)+AQH_DeviceMsg_GetDeviceRevision(msg)); + AQH_NodeInfo_SetManufacturer(ni, AQH_DeviceMessage_GetManufacturer(msg)); + AQH_NodeInfo_SetDeviceType(ni, AQH_DeviceMessage_GetDeviceType(msg)); + AQH_NodeInfo_SetDeviceVersion(ni, (AQH_DeviceMessage_GetDeviceVersion(msg)<<8)+AQH_DeviceMessage_GetDeviceRevision(msg)); AQH_NodeInfo_SetFirmwareVersion(ni, - (AQH_DeviceMsg_GetFirmwareVariant(msg)<<24) | - (AQH_DeviceMsg_GetFirmwareVersionMajor(msg)<<16) | - (AQH_DeviceMsg_GetFirmwareVersionMinor(msg)<<8) | - AQH_DeviceMsg_GetFirmwareVersionPatchlevel(msg)); + (AQH_DeviceMessage_GetFirmwareVariant(msg)<<24) | + (AQH_DeviceMessage_GetFirmwareVersionMajor(msg)<<16) | + (AQH_DeviceMessage_GetFirmwareVersionMinor(msg)<<8) | + AQH_DeviceMessage_GetFirmwareVersionPatchlevel(msg)); s=AQH_NodeInfo_GetDeviceId(ni); if (!(s && *s)) - _assignDeviceId(aqh, ni, uid); + _assignDeviceId(o, ni, uid); _updateTimestampLastChange(ni); - AQH_NodeDb_SetModified(aqh->nodeDb); + AQH_NodeDb_SetModified(xo->nodeDb); if (uid!=0x00000000L && uid!=0xffffffff) - _announceNodeValues(aqh, ni); + _announceNodeValues(o, xo, ni); } else { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); @@ -258,7 +206,7 @@ void _handleMsgDevice(AQHOMED *aqh, const GWEN_MSG *msg) -void _announceNodeValues(AQHOMED *aqh, const AQH_NODE_INFO *ni) +void _announceNodeValues(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_NODE_INFO *ni) { const char *devName; @@ -266,7 +214,7 @@ void _announceNodeValues(AQHOMED *aqh, const AQH_NODE_INFO *ni) if (devName) { const AQHNODE_DEVICE *devInfo; - devInfo=AqHomed_GetDeviceDefByName(aqh, devName); + devInfo=AQH_NodeServer_GetDeviceDefByName(o, devName); if (devInfo) { const AQHNODE_VALUE_LIST *valueList; @@ -280,7 +228,7 @@ void _announceNodeValues(AQHOMED *aqh, const AQH_NODE_INFO *ni) AQH_NodeInfo_GetUid(ni), AQHNODE_Value_GetName(v), AQHNODE_Value_GetModality(v), AQH_ValueModality_toString(AQHNODE_Value_GetModality(v))); - _announceValue(aqh, AQH_NodeInfo_GetUid(ni), v); + _announceValue(xo, AQH_NodeInfo_GetUid(ni), v); v=AQHNODE_Value_List_Next(v); } } @@ -308,10 +256,10 @@ void _setDeviceName(AQH_VALUE *value, uint32_t uid) -void _announceValue(AQHOMED *aqh, uint32_t uid, const AQHNODE_VALUE *v) +void _announceValue(AQH_NODE_SERVER *xo, uint32_t uid, const AQHNODE_VALUE *v) { AQH_VALUE *value; - GWEN_MSG *msg; + AQH_MESSAGE *msg; value=AQH_Value_new(); _setDeviceName(value, uid); @@ -320,41 +268,41 @@ void _announceValue(AQHOMED *aqh, uint32_t uid, const AQHNODE_VALUE *v) AQH_Value_SetValueUnits(value, AQHNODE_Value_GetValueUnits(v)); AQH_Value_SetValueType(value, AQHNODE_Value_GetValueType(v)); AQH_Value_SetModality(value, AQHNODE_Value_GetModality(v)); - - msg=AQH_ValuesDataIpcMsg_newForOneValue(AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE, - GWEN_MsgEndpoint_GetNextMessageId(aqh->brokerEndpoint), 0, - 0, value); - GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, msg); + + msg=AQH_IpcdMessageValues_newForOne(AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE, + AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, + 0, value); + AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, msg); AQH_Value_free(value); } -void _handleMsgFlashReady(AQHOMED *aqh, const GWEN_MSG *msg) +void _handleMsgFlashReady(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) { AQH_NODE_INFO *ni; uint32_t uid; - uid=AQH_FlashReadyMsg_GetUid(msg); - ni=_getOrCreateNodeAndUpdateUidAddr(aqh, msg, uid); + uid=AQH_FlashReadyMessage_GetUid(msg); + ni=_getOrCreateNodeAndUpdateUidAddr(xo, msg, uid); if (ni) { const char *s; - AQH_NodeInfo_SetManufacturer(ni, AQH_FlashReadyMsg_GetManufacturer(msg)); - AQH_NodeInfo_SetDeviceType(ni, AQH_FlashReadyMsg_GetDeviceType(msg)); - AQH_NodeInfo_SetDeviceVersion(ni, (AQH_FlashReadyMsg_GetDeviceVersion(msg)<<8)+AQH_FlashReadyMsg_GetDeviceRevision(msg)); + AQH_NodeInfo_SetManufacturer(ni, AQH_FlashReadyMessage_GetManufacturer(msg)); + AQH_NodeInfo_SetDeviceType(ni, AQH_FlashReadyMessage_GetDeviceType(msg)); + AQH_NodeInfo_SetDeviceVersion(ni, (AQH_FlashReadyMessage_GetDeviceVersion(msg)<<8)+AQH_FlashReadyMessage_GetDeviceRevision(msg)); AQH_NodeInfo_SetFirmwareVersion(ni, - (AQH_FlashReadyMsg_GetFirmwareVariant(msg)<<24) | - (AQH_FlashReadyMsg_GetFirmwareVersionMajor(msg)<<16) | - (AQH_FlashReadyMsg_GetFirmwareVersionMinor(msg)<<8) | - AQH_FlashReadyMsg_GetFirmwareVersionPatchlevel(msg)); + (AQH_FlashReadyMessage_GetFirmwareVariant(msg)<<24) | + (AQH_FlashReadyMessage_GetFirmwareVersionMajor(msg)<<16) | + (AQH_FlashReadyMessage_GetFirmwareVersionMinor(msg)<<8) | + AQH_FlashReadyMessage_GetFirmwareVersionPatchlevel(msg)); s=AQH_NodeInfo_GetDeviceId(ni); if (!(s && *s)) - _assignDeviceId(aqh, ni, uid); + _assignDeviceId(o, ni, uid); _updateTimestampLastChange(ni); - AQH_NodeDb_SetModified(aqh->nodeDb); + AQH_NodeDb_SetModified(xo->nodeDb); if (uid!=0x00000000L && uid!=0xffffffff) - _announceNodeValues(aqh, ni); + _announceNodeValues(o, xo, ni); } else { DBG_INFO(AQH_LOGDOMAIN, "Error handling message"); @@ -363,13 +311,13 @@ void _handleMsgFlashReady(AQHOMED *aqh, const GWEN_MSG *msg) -AQH_NODE_INFO *_getOrCreateNodeAndUpdateUidAddr(AQHOMED *aqh, const GWEN_MSG *msg, uint32_t uid) +AQH_NODE_INFO *_getOrCreateNodeAndUpdateUidAddr(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg, uint32_t uid) { uint8_t busAddr; AQH_NODE_INFO *ni; - busAddr=AQH_NodeMsg_GetSourceAddress(msg); - ni=AQH_NodeDb_GetNodeInfoByUid(aqh->nodeDb, uid); + busAddr=AQH_NodeMessage_GetSourceAddress(msg); + ni=AQH_NodeDb_GetNodeInfoByUid(xo->nodeDb, uid); if (ni) { uint8_t storedBusAddr; @@ -378,7 +326,7 @@ AQH_NODE_INFO *_getOrCreateNodeAndUpdateUidAddr(AQHOMED *aqh, const GWEN_MSG *ms DBG_INFO(AQH_LOGDOMAIN, "Changed busaddr for %08x from %02x to %02x", uid, storedBusAddr, busAddr); AQH_NodeInfo_SetBusAddress(ni, busAddr); _updateTimestampLastChange(ni); - AQH_NodeDb_SetModified(aqh->nodeDb); + AQH_NodeDb_SetModified(xo->nodeDb); } } else { @@ -388,7 +336,7 @@ AQH_NODE_INFO *_getOrCreateNodeAndUpdateUidAddr(AQHOMED *aqh, const GWEN_MSG *ms AQH_NodeInfo_SetBusAddress(ni, busAddr); AQH_NodeInfo_SetUid(ni, uid); _updateTimestampLastChange(ni); - rv=AQH_NodeDb_AddNodeInfo(aqh->nodeDb, ni); + rv=AQH_NodeDb_AddNodeInfo(xo->nodeDb, ni); if (rv<0) { DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); AQH_NodeInfo_free(ni); @@ -403,14 +351,14 @@ AQH_NODE_INFO *_getOrCreateNodeAndUpdateUidAddr(AQHOMED *aqh, const GWEN_MSG *ms -void _assignDeviceId(AQHOMED *aqh, AQH_NODE_INFO *ni, uint32_t uid) +void _assignDeviceId(AQH_OBJECT *o, AQH_NODE_INFO *ni, uint32_t uid) { const AQHNODE_DEVICE *dev; - dev=AqHomed_FindDeviceDef(aqh, - AQH_NodeInfo_GetManufacturer(ni), - AQH_NodeInfo_GetDeviceType(ni), - AQH_NodeInfo_GetDeviceVersion(ni)); + dev=AQH_NodeServer_FindDeviceDef(o, + AQH_NodeInfo_GetManufacturer(ni), + AQH_NodeInfo_GetDeviceType(ni), + AQH_NodeInfo_GetDeviceVersion(ni)); if (dev==NULL) { DBG_ERROR(NULL, "Unknown NODE device encountered (%08x, %04x, %04x)", diff --git a/apps/aqhome-nodes/db.h b/apps/aqhome-nodes/db.h index 4b12c76..dcbfae6 100644 --- a/apps/aqhome-nodes/db.h +++ b/apps/aqhome-nodes/db.h @@ -1,6 +1,6 @@ /**************************************************************************** * This file is part of the project AqHome. - * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. @@ -10,13 +10,15 @@ #define AQHOMED_DB_H -#include "./aqhomed.h" +#include "./server.h" + +#include "aqhome/events2/object.h" +#include "aqhome/ipc2/message.h" +void AQH_NodeServer_NodeMsgToDb(AQH_OBJECT *o, const AQH_MESSAGE *msg); -void AqHomed_NodeMsgToDb(AQHOMED *aqh, const GWEN_MSG *msg); - -void AqHomed_WriteNodeDb(AQHOMED *aqh); +void AQH_NodeServer_WriteNodeDb(AQH_OBJECT *o); diff --git a/apps/aqhome-nodes/devicesdump.c b/apps/aqhome-nodes/devicesdump.c index 7ace40e..c8a7f2b 100644 --- a/apps/aqhome-nodes/devicesdump.c +++ b/apps/aqhome-nodes/devicesdump.c @@ -34,7 +34,7 @@ static void _dumpValue(const AQHNODE_VALUE *value, GWEN_BUFFER *dbuf, int indent * ------------------------------------------------------------------------------------------------ */ -void AqHomeNodes_DumpDevices(const AQHNODE_DEVICE_LIST *devList, GWEN_BUFFER *dbuf) +void AQH_NodeServer_DumpDevices(const AQHNODE_DEVICE_LIST *devList, GWEN_BUFFER *dbuf) { if (devList && AQHNODE_Device_List_GetCount(devList)) { diff --git a/apps/aqhome-nodes/devicesdump.h b/apps/aqhome-nodes/devicesdump.h index 2ae750b..b49fac2 100644 --- a/apps/aqhome-nodes/devicesdump.h +++ b/apps/aqhome-nodes/devicesdump.h @@ -15,7 +15,7 @@ -void AqHomeNodes_DumpDevices(const AQHNODE_DEVICE_LIST *devList, GWEN_BUFFER *dbuf); +void AQH_NodeServer_DumpDevices(const AQHNODE_DEVICE_LIST *devList, GWEN_BUFFER *dbuf); diff --git a/apps/aqhome-nodes/devicesread.c b/apps/aqhome-nodes/devicesread.c index ba3b656..e147348 100644 --- a/apps/aqhome-nodes/devicesread.c +++ b/apps/aqhome-nodes/devicesread.c @@ -51,7 +51,7 @@ static int _readDeviceVersion(AQHNODE_DEVICE *device, GWEN_XMLNODE *deviceNode); * ------------------------------------------------------------------------------------------------ */ -AQHNODE_DEVICE_LIST *AqHomeNodes_ReadDeviceFile(const char *sFilename) +AQHNODE_DEVICE_LIST *AQH_NodeServer_ReadDeviceFile(const char *sFilename) { int rv; @@ -81,7 +81,7 @@ AQHNODE_DEVICE_LIST *AqHomeNodes_ReadDeviceFile(const char *sFilename) -AQHNODE_DEVICE_LIST *AqHomeNodes_ReadDataDeviceFiles() +AQHNODE_DEVICE_LIST *AQH_NodeServer_ReadDataDeviceFiles() { GWEN_STRINGLIST *sl; diff --git a/apps/aqhome-nodes/devicesread.h b/apps/aqhome-nodes/devicesread.h index ad34040..0573df7 100644 --- a/apps/aqhome-nodes/devicesread.h +++ b/apps/aqhome-nodes/devicesread.h @@ -15,8 +15,8 @@ -AQHNODE_DEVICE_LIST *AqHomeNodes_ReadDeviceFile(const char *sFilename); -AQHNODE_DEVICE_LIST *AqHomeNodes_ReadDataDeviceFiles(void); +AQHNODE_DEVICE_LIST *AQH_NodeServer_ReadDeviceFile(const char *sFilename); +AQHNODE_DEVICE_LIST *AQH_NodeServer_ReadDataDeviceFiles(void); diff --git a/apps/aqhome-nodes/main.c b/apps/aqhome-nodes/main.c index aeb69af..7f9e1ba 100644 --- a/apps/aqhome-nodes/main.c +++ b/apps/aqhome-nodes/main.c @@ -13,10 +13,7 @@ #include #include -#include "./aqhomed.h" -#include "./init.h" -#include "./fini.h" -#include "./loop.h" +#include "./server.h" #include #include @@ -37,6 +34,9 @@ #define I18N(msg) msg #define I18S(msg) msg +#define CONNCLEAN_INTERVAL_IN_SECS 2 +#define CONNCHECK_INTERVAL_IN_SECS 10 + /* ------------------------------------------------------------------------------------------------ @@ -44,12 +44,13 @@ * ------------------------------------------------------------------------------------------------ */ -static void _runService(AQHOMED *aqh); +static void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop); #ifdef HAVE_SIGNAL_H static int _setSignalHandlers(void); static int _setupSigAction(struct sigaction *sa, int sig); static void _signalHandler(int s); #endif +static int _diffInSeconds(time_t t1, time_t t0); @@ -74,7 +75,8 @@ static int stopService=0; int main(int argc, char **argv) { int rv; - AQHOMED *aqh; + AQH_EVENT_LOOP *eventLoop; + AQH_OBJECT *aqh; GWEN_GUI *gui; rv=GWEN_Init(); @@ -83,7 +85,7 @@ int main(int argc, char **argv) return 2; } - GWEN_Logger_Open(0, "aqhomed", 0, GWEN_LoggerType_Console, GWEN_LoggerFacility_User); + GWEN_Logger_Open(0, "aqhome-nodes", 0, GWEN_LoggerType_Console, GWEN_LoggerFacility_User); GWEN_Logger_SetLevel(0, GWEN_LoggerLevel_Warning); rv=_setSignalHandlers(); @@ -101,17 +103,18 @@ int main(int argc, char **argv) gui=GWEN_Gui_CGui_new(); GWEN_Gui_SetGui(gui); - aqh=AqHomed_new(); - rv=AqHomed_Init(aqh, argc, argv); + eventLoop=AQH_EventLoop_new(); + aqh=AQH_NodeServer_new(eventLoop); + rv=AQH_NodeServer_Init(aqh, argc, argv); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return 2; } - _runService(aqh); + _runService(aqh, eventLoop); - AqHomed_Fini(aqh); - AqHomed_free(aqh); + //AQH_NodeServer_Fini(aqh); + AQH_Object_free(aqh); GWEN_Gui_SetGui(NULL); GWEN_Gui_free(gui); @@ -121,24 +124,44 @@ int main(int argc, char **argv) -void _runService(AQHOMED *aqh) +void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop) { time_t timeStart; int timeout; + time_t timeLastConnectionCleanup; + time_t timeLastConnCheck; - timeout=AqHomed_GetTimeout(aqh); + timeout=AQH_NodeServer_GetTimeout(aqh); timeStart=time(NULL); + timeLastConnectionCleanup=time(NULL); + timeLastConnCheck=time(NULL); while(!stopService) { - AqHomed_Loop(aqh, 2000); - if (timeout) { - time_t now; + time_t now; - now=time(NULL); - if (timeout && ((int)difftime(now, timeStart))>timeout) { - DBG_INFO(NULL, "Timeout"); - break; - } + AQH_EventLoop_Run(eventLoop, 2000); + AQH_NodeServer_HandleTtyMsgs(aqh); + AQH_NodeServer_HandleClientMsgs(aqh); + AQH_NodeServer_HandleBrokerMsgs(aqh); + + now=time(NULL); + + if (_diffInSeconds(now, timeLastConnectionCleanup)>CONNCLEAN_INTERVAL_IN_SECS) { + DBG_ERROR(NULL, "Cleanup connections"); + AQH_NodeServer_CleanupClients(aqh); + timeLastConnectionCleanup=now; + } + + if (_diffInSeconds(now, timeLastConnCheck)>CONNCHECK_INTERVAL_IN_SECS) { + DBG_ERROR(NULL, "Write time"); + AQH_NodeServer_CheckBrokerConnection(aqh); + AQH_NodeServer_CheckTtyConnection(aqh); + timeLastConnCheck=now; + } + + if (timeout && (_diffInSeconds(now, timeStart)>timeout)) { + DBG_INFO(NULL, "Timeout"); + break; } } /* while */ } @@ -210,3 +233,10 @@ void _signalHandler(int s) } + +int _diffInSeconds(time_t t1, time_t t0) +{ + return t1-t0; +} + + diff --git a/apps/aqhome-nodes/r_setdata.c b/apps/aqhome-nodes/r_setdata.c index cb4fe7f..d9d6a59 100644 --- a/apps/aqhome-nodes/r_setdata.c +++ b/apps/aqhome-nodes/r_setdata.c @@ -1,6 +1,6 @@ /**************************************************************************** * This file is part of the project AqHome. - * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. @@ -12,14 +12,18 @@ #include "./r_setdata.h" -#include "./aqhomed_p.h" +#include "./server_p.h" #include "aqhome/aqhome.h" -#include "aqhome/ipc/endpoint_ipc.h" -#include "aqhome/ipc/msg_ipc_result.h" -#include "aqhome/ipc/data/msg_data_set.h" -#include "aqhome/ipc/data/ipc_data.h" -#include "aqhome/msg/msg_value3.h" +#include "aqhome/data/value.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" +#include "aqhome/msg/ipc/data/m_ipcd.h" +#include "aqhome/msg/ipc/data/m_ipcd_setdata.h" +#include "aqhome/msg/node/m_node.h" +#include "aqhome/msg/node/m_value.h" +#include "aqhome/ipc2/endpoint.h" #include @@ -40,18 +44,20 @@ * ------------------------------------------------------------------------------------------------ */ -static AQH_NODE_INFO *_getNodeInfoFromValue(AQHOMED *aqh, const AQH_VALUE *value); +static AQH_NODE_INFO *_getNodeInfoFromValue(AQH_NODE_SERVER *xo, const AQH_VALUE *value); -static GWEN_MSG_REQUEST *_mkRequest_SetData(AQHOMED *aqh, - GWEN_MSG_ENDPOINT *ep, uint32_t requestMsgId, - int destAddr, int valueId, uint16_t dataVal, uint16_t dataDenom); +static AQH_MSG_REQUEST *_mkRequest_SetData(AQH_OBJECT *o, AQH_NODE_SERVER *xo, + AQH_OBJECT *ep, uint32_t requestMsgId, + int destAddr, int valueId, uint16_t dataVal, uint16_t dataDenom); -static void _rqSubRequestFinished(GWEN_MSG_REQUEST *rq, GWEN_MSG_REQUEST *subRq, int reason); -static void _rqAbort(GWEN_MSG_REQUEST *rq, int reason); +static void _rqSubRequestFinished(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST *subRq, int reason); +static void _rqAbort(AQH_MSG_REQUEST *rq, int reason); -static GWEN_MSG_REQUEST *_mkSubRequest_SetData(AQHOMED *aqh, int destAddr, int valueId, uint16_t dataVal, uint16_t dataDenom); -static int _subRqHandleResponse(GWEN_MSG_REQUEST *rq, GWEN_MSG *msg); -static void _subRqAbort(GWEN_MSG_REQUEST *rq, int reason); +static AQH_MSG_REQUEST *_mkSubRequest_SetData(AQH_OBJECT *o, AQH_NODE_SERVER *xo, + int destAddr, int valueId, uint16_t dataVal, uint16_t dataDenom); +static int _subRqHandleResponse(AQH_MSG_REQUEST *rq, const AQH_MESSAGE *msg); +static void _subRqAbort(AQH_MSG_REQUEST *rq, int reason); +static void _sendResponseResultToBroker(AQH_OBJECT *ep, uint32_t refMsgId, int result); @@ -61,171 +67,182 @@ static void _subRqAbort(GWEN_MSG_REQUEST *rq, int reason); */ -void AqHomeNodes_HandleSetData(AQHOMED *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *recvdMsg) +void AQH_NodeServer_HandleSetData(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg) { - uint32_t msgId; + if (o) { + AQH_NODE_SERVER *xo; - DBG_INFO(NULL, "Received IPC SetDataRequest message"); - msgId=GWEN_IpcMsg_GetMsgId(recvdMsg); + xo=AQH_NodeServer_GetServerData(o); + if (xo) { + uint32_t msgId; - if (aqh->ttyEndpoint && GWEN_MsgEndpoint_GetState(aqh->ttyEndpoint)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) { - AQH_VALUE *value; + DBG_INFO(NULL, "Received IPC SetDataRequest message"); + msgId=AQH_IpcMessage_GetMsgId(recvdMsg); + + if (xo->ttyEndpoint) { + GWEN_TAG16_LIST *tagList; - AQH_SetDataIpcMsg_Parse(recvdMsg, 0); - value=AQH_SetDataIpcMsg_ReadValue(recvdMsg); - if (value) { - const char *varName; + tagList=AQH_IpcMessageTag16_ParsePayload(recvdMsg, 0); + if (tagList) { + AQH_VALUE *value; + + value=AQH_IpcdMessageSetData_ReadValue(tagList); + if (value) { + const char *varName; - varName=AQH_Value_GetName(value); - if (varName) { - char *data; + varName=AQH_Value_GetName(value); + if (varName) { + char *data; - data=AQH_SetDataIpcMsg_ReadData(recvdMsg); - if (data) { - AQH_NODE_INFO *nodeInfo; + data=AQH_IpcdMessageSetData_ReadData(tagList); + if (data) { + AQH_NODE_INFO *nodeInfo; - nodeInfo=_getNodeInfoFromValue(aqh, value); - if (nodeInfo) { - const char *devName; + nodeInfo=_getNodeInfoFromValue(xo, value); + if (nodeInfo) { + const char *devName; - devName=AQH_NodeInfo_GetDeviceId(nodeInfo); - if (devName) { - const AQHNODE_DEVICE *devInfo; + devName=AQH_NodeInfo_GetDeviceId(nodeInfo); + if (devName) { + const AQHNODE_DEVICE *devInfo; - devInfo=AqHomed_GetDeviceDefByName(aqh, devName); - if (devInfo) { - const AQHNODE_VALUE *devValue; + devInfo=AQH_NodeServer_GetDeviceDefByName(o, devName); + if (devInfo) { + const AQHNODE_VALUE *devValue; - devValue=AQHNODE_Value_List_GetByName(AQHNODE_Device_GetValueList(devInfo), varName); - if (devValue) { - uint16_t dataVal=0; - uint16_t dataDenom=0; + devValue=AQHNODE_Value_List_GetByName(AQHNODE_Device_GetValueList(devInfo), varName); + if (devValue) { + uint16_t dataVal=0; + uint16_t dataDenom=0; - if (AQH_ReadDataFromString(AQHNODE_Value_GetDataType(devValue), data, &dataVal, &dataDenom)==0) { - GWEN_MSG_REQUEST *rq; - int destAddr; + if (AQH_ReadDataFromString(AQHNODE_Value_GetDataType(devValue), data, &dataVal, &dataDenom)==0) { + AQH_MSG_REQUEST *rq; + int destAddr; - destAddr=AQH_NodeInfo_GetBusAddress(nodeInfo); - DBG_DEBUG(NULL, "Creating SETDATA request"); + destAddr=AQH_NodeInfo_GetBusAddress(nodeInfo); + DBG_DEBUG(NULL, "Creating SETDATA request"); - rq=_mkRequest_SetData(aqh, ep, msgId, destAddr, AQHNODE_Value_GetId(devValue), dataVal, dataDenom); - AqHomed_AddRequestToTree(aqh, rq); - /* done */ + rq=_mkRequest_SetData(o, xo, ep, msgId, destAddr, AQHNODE_Value_GetId(devValue), dataVal, dataDenom); + AQH_NodeServer_AddRequestToTree(o, rq); + /* done */ + } + else { + DBG_ERROR(NULL, "Bad data \"%s\"", data); + _sendResponseResultToBroker(ep, msgId, AQH_MSGDATA_RESULT_ERROR_BADDATA); + } + } + else { + DBG_ERROR(NULL, "Invalid value name \"%s\"", varName); + _sendResponseResultToBroker(ep, msgId, AQH_MSGDATA_RESULT_ERROR_INVALID); + } + } + else { + DBG_ERROR(NULL, "Unknown node \"%s\"", devName); + _sendResponseResultToBroker(ep, msgId, AQH_MSGDATA_RESULT_ERROR_INVALID); + } } else { - DBG_ERROR(NULL, "Bad data \"%s\"", data); - AQH_IpcEndpoint_SendResponseResult(ep, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_BADDATA); + DBG_ERROR(NULL, "Node not yet fully identified, come back later"); + _sendResponseResultToBroker(ep, msgId, AQH_MSGDATA_RESULT_ERROR_TRYAGAIN); } } else { - DBG_ERROR(NULL, "Invalid value name \"%s\"", varName); - AQH_IpcEndpoint_SendResponseResult(ep, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_INVALID); + DBG_ERROR(NULL, "No matching nodeinfo"); + _sendResponseResultToBroker(ep, msgId, AQH_MSGDATA_RESULT_ERROR_INVALID); } + free(data); } else { - DBG_ERROR(NULL, "Unknown node \"%s\"", devName); - AQH_IpcEndpoint_SendResponseResult(ep, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_INVALID); + DBG_ERROR(NULL, "No data"); + _sendResponseResultToBroker(ep, msgId, AQH_MSGDATA_RESULT_ERROR_NODATA); } } else { - DBG_ERROR(NULL, "Node not yet fully identified, come back later"); - AQH_IpcEndpoint_SendResponseResult(ep, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_TRYAGAIN); + DBG_ERROR(NULL, "No var name"); + _sendResponseResultToBroker(ep, msgId, AQH_MSGDATA_RESULT_ERROR_NODATA); } + AQH_Value_free(value); } else { - DBG_ERROR(NULL, "No matching nodeinfo"); - AQH_IpcEndpoint_SendResponseResult(ep, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_INVALID); + DBG_ERROR(NULL, "Could not read value from message"); } - free(data); - } - else { - DBG_ERROR(NULL, "No data"); - AQH_IpcEndpoint_SendResponseResult(ep, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_NODATA); + GWEN_Tag16_List_free(tagList); } } else { - DBG_ERROR(NULL, "No var name"); - AQH_IpcEndpoint_SendResponseResult(ep, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_NODATA); + DBG_ERROR(NULL, "TTY endpoint not connected"); + _sendResponseResultToBroker(ep, msgId, AQH_MSGDATA_RESULT_ERROR_IO); } - AQH_Value_free(value); } - else { - DBG_ERROR(NULL, "Could not read value from message"); - } - } - else { - DBG_ERROR(NULL, "TTY endpoint not connected"); - AQH_IpcEndpoint_SendResponseResult(ep, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_IO); } } - /* ------------------------------------------------------------------------------------------------ * IPC Request SETDATA */ -GWEN_MSG_REQUEST *_mkRequest_SetData(AQHOMED *aqh, - GWEN_MSG_ENDPOINT *ep, uint32_t requestMsgId, - int destAddr, int valueId, uint16_t dataVal, uint16_t dataDenom) +AQH_MSG_REQUEST *_mkRequest_SetData(AQH_OBJECT *o, AQH_NODE_SERVER *xo, + AQH_OBJECT *ep, uint32_t requestMsgId, + int destAddr, int valueId, uint16_t dataVal, uint16_t dataDenom) { - GWEN_MSG_REQUEST *rq; - GWEN_MSG_REQUEST *subRq; + AQH_MSG_REQUEST *rq; + AQH_MSG_REQUEST *subRq; - rq=GWEN_MsgRequest_new(); - GWEN_MsgRequest_SetPrivateData(rq, aqh); - GWEN_MsgRequest_SetEndpoint(rq, ep); - GWEN_MsgRequest_SetRequestMsgId(rq, requestMsgId); - GWEN_MsgRequest_SetSubRequestFinishedFn(rq, _rqSubRequestFinished); - GWEN_MsgRequest_SetAbortFn(rq, _rqAbort); - GWEN_MsgRequest_SetTimestamps(rq, R_SETDATA_REQUEST_EXPIRE_SECS); + rq=AQH_MsgRequest_new(); + AQH_MsgRequest_SetPrivateData(rq, o); + AQH_MsgRequest_SetEndpoint(rq, ep); + AQH_MsgRequest_SetRequestMsgId(rq, requestMsgId); + AQH_MsgRequest_SetSubRequestFinishedFn(rq, _rqSubRequestFinished); + AQH_MsgRequest_SetAbortFn(rq, _rqAbort); + AQH_MsgRequest_SetTimestamps(rq, R_SETDATA_REQUEST_EXPIRE_SECS); - subRq=_mkSubRequest_SetData(aqh, destAddr, valueId, dataVal, dataDenom); - GWEN_MsgRequest_Tree2_AddChild(rq, subRq); + subRq=_mkSubRequest_SetData(o, xo, destAddr, valueId, dataVal, dataDenom); + AQH_MsgRequest_Tree2_AddChild(rq, subRq); return rq; } -void _rqSubRequestFinished(GWEN_MSG_REQUEST *rq, GWEN_MSG_REQUEST *subRq, int reason) +void _rqSubRequestFinished(AQH_MSG_REQUEST *rq, AQH_MSG_REQUEST *subRq, int reason) { - GWEN_MSG_ENDPOINT *ep; + AQH_OBJECT *ep; uint32_t refMsgId; int result; DBG_INFO(NULL, "SubRequest finished (reason: %d)", reason); - refMsgId=GWEN_MsgRequest_GetRequestMsgId(rq); - ep=GWEN_MsgRequest_GetEndpoint(rq); - result=GWEN_MsgRequest_GetResult(subRq); + refMsgId=AQH_MsgRequest_GetRequestMsgId(rq); + ep=AQH_MsgRequest_GetEndpoint(rq); + result=AQH_MsgRequest_GetResult(subRq); - if (reason==GWEN_MSG_REQUEST_REASON_ABORTED) - AQH_IpcEndpoint_SendResponseResult(ep, refMsgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC); + if (reason==AQH_MSG_REQUEST_REASON_ABORTED) + _sendResponseResultToBroker(ep, refMsgId, AQH_MSGDATA_RESULT_ERROR_GENERIC); else - AQH_IpcEndpoint_SendResponseResult(ep, refMsgId, AQH_MSGTYPE_IPC_DATA_RESULT, result); + _sendResponseResultToBroker(ep, refMsgId, result); - GWEN_MsgRequest_SetResult(rq, result); - GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE); + AQH_MsgRequest_SetResult(rq, result); + AQH_MsgRequest_SetState(rq, AQH_MSG_REQUEST_STATE_DONE); } -void _rqAbort(GWEN_MSG_REQUEST *rq, int reason) +void _rqAbort(AQH_MSG_REQUEST *rq, int reason) { - GWEN_MSG_ENDPOINT *ep; + AQH_OBJECT *ep; uint32_t refMsgId; - GWEN_MSG_REQUEST *rqParent; + AQH_MSG_REQUEST *rqParent; DBG_INFO(NULL, "Aborting request"); - refMsgId=GWEN_MsgRequest_GetRequestMsgId(rq); - ep=GWEN_MsgRequest_GetEndpoint(rq); - AQH_IpcEndpoint_SendResponseResult(ep, refMsgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC); - GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE); + refMsgId=AQH_MsgRequest_GetRequestMsgId(rq); + ep=AQH_MsgRequest_GetEndpoint(rq); + _sendResponseResultToBroker(ep, refMsgId, AQH_MSGDATA_RESULT_ERROR_GENERIC); + AQH_MsgRequest_SetState(rq, AQH_MSG_REQUEST_STATE_DONE); - rqParent=GWEN_MsgRequest_Tree2_GetParent(rq); + rqParent=AQH_MsgRequest_Tree2_GetParent(rq); if (rqParent) - GWEN_MsgRequest_SubRequestFinished(rqParent, rq, reason); + AQH_MsgRequest_SubRequestFinished(rqParent, rq, reason); } @@ -238,91 +255,100 @@ void _rqAbort(GWEN_MSG_REQUEST *rq, int reason) */ -GWEN_MSG_REQUEST *_mkSubRequest_SetData(AQHOMED *aqh, int destAddr, int valueId, uint16_t dataVal, uint16_t dataDenom) +AQH_MSG_REQUEST *_mkSubRequest_SetData(AQH_OBJECT *o, AQH_NODE_SERVER *xo, + int destAddr, int valueId, uint16_t dataVal, uint16_t dataDenom) { - GWEN_MSG_REQUEST *rq; + AQH_MSG_REQUEST *rq; uint16_t msgId; - GWEN_MSG *msgOut; + AQH_MESSAGE *msgOut; - rq=GWEN_MsgRequest_new(); - GWEN_MsgRequest_SetPrivateData(rq, aqh); - GWEN_MsgRequest_SetEndpoint(rq, aqh->ttyEndpoint); + rq=AQH_MsgRequest_new(); + AQH_MsgRequest_SetPrivateData(rq, o); + AQH_MsgRequest_SetEndpoint(rq, xo->ttyEndpoint); - GWEN_MsgRequest_SetHandleResponseFn(rq, _subRqHandleResponse); - GWEN_MsgRequest_SetAbortFn(rq, _subRqAbort); + AQH_MsgRequest_SetHandleResponseFn(rq, _subRqHandleResponse); + AQH_MsgRequest_SetAbortFn(rq, _subRqAbort); - msgId=GWEN_MsgEndpoint_GetNextMessageId(aqh->ttyEndpoint) & 0xffff; - GWEN_MsgRequest_SetRequestMsgId(rq, msgId); - GWEN_MsgRequest_SetTimestamps(rq, R_SETDATA_SUBREQUEST_EXPIRE_SECS); + msgId=AQH_Endpoint_GetNextMessageId(xo->ttyEndpoint) & 0xffff; + AQH_MsgRequest_SetRequestMsgId(rq, msgId); + AQH_MsgRequest_SetTimestamps(rq, R_SETDATA_SUBREQUEST_EXPIRE_SECS); - msgOut=AQH_Value3Msg_new(aqh->nodeAddress, destAddr, AQH_MSG_TYPE_VALUE_SET, msgId, valueId, dataVal, dataDenom); - GWEN_MsgEndpoint_AddSendMessage(aqh->ttyEndpoint, msgOut); + msgOut=AQH_ValueMessage_new(xo->nodeAddress, destAddr, AQH_MSG_TYPE_VALUE_SET, msgId, valueId, dataVal, dataDenom); + AQH_Endpoint_AddMsgOut(xo->ttyEndpoint, msgOut); return rq; } -int _subRqHandleResponse(GWEN_MSG_REQUEST *rq, GWEN_MSG *msg) +int _subRqHandleResponse(AQH_MSG_REQUEST *rq, const AQH_MESSAGE *msg) { - AQHOMED *aqh; - uint8_t destAddr; + AQH_OBJECT *o; - DBG_DEBUG(NULL, "Checking message from %02x", AQH_NodeMsg_GetSourceAddress(msg)); - aqh=(AQHOMED*)GWEN_MsgRequest_GetPrivateData(rq); + DBG_DEBUG(NULL, "Checking message from %02x", AQH_NodeMessage_GetSourceAddress(msg)); + o=(AQH_OBJECT*)AQH_MsgRequest_GetPrivateData(rq); + if (o) { + AQH_NODE_SERVER *xo; - destAddr=AQH_NodeMsg_GetDestAddress(msg); - if (destAddr==0xff || destAddr==aqh->nodeAddress) { - uint8_t msgCode; + xo=AQH_NodeServer_GetServerData(o); + if (xo) { + uint8_t destAddr; - msgCode=AQH_NodeMsg_GetMsgType(msg); - if (msgCode==AQH_MSG_TYPE_VALUE_SET_ACK || msgCode==AQH_MSG_TYPE_VALUE_SET_NACK) { - uint16_t msgId; + destAddr=AQH_NodeMessage_GetDestAddress(msg); + if (destAddr==0xff || destAddr==xo->nodeAddress) { + uint8_t msgCode; - msgId=AQH_Value3Msg_GetMsgId(msg); - if (msgId==GWEN_MsgRequest_GetRequestMsgId(rq)) { - GWEN_MSG_REQUEST *rqParent; + msgCode=AQH_NodeMessage_GetMsgType(msg); + if (msgCode==AQH_MSG_TYPE_VALUE_SET_ACK || msgCode==AQH_MSG_TYPE_VALUE_SET_NACK) { + uint16_t msgId; - DBG_INFO(NULL, - "Received response (%02x) for msg id %04x from %02x", - msgCode, msgId, AQH_NodeMsg_GetSourceAddress(msg)); - GWEN_MsgRequest_SetResult(rq, (msgCode==AQH_MSG_TYPE_VALUE_SET_ACK)?AQH_MSG_IPC_SUCCESS:AQH_MSG_IPC_ERROR_GENERIC); - GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE); - rqParent=GWEN_MsgRequest_Tree2_GetParent(rq); - if (rqParent) - GWEN_MsgRequest_SubRequestFinished(rqParent, rq, GWEN_MSG_REQUEST_REASON_DONE); - return GWEN_MSG_REQUEST_RESULT_HANDLED; + msgId=AQH_ValueMessage_GetMsgId(msg); + if (msgId==AQH_MsgRequest_GetRequestMsgId(rq)) { + AQH_MSG_REQUEST *rqParent; + + DBG_INFO(NULL, + "Received response (%02x) for msg id %04x from %02x", + msgCode, msgId, AQH_NodeMessage_GetSourceAddress(msg)); + AQH_MsgRequest_SetResult(rq, + (msgCode==AQH_MSG_TYPE_VALUE_SET_ACK)?AQH_MSGDATA_RESULT_SUCCESS:AQH_MSGDATA_RESULT_ERROR_GENERIC); + AQH_MsgRequest_SetState(rq, AQH_MSG_REQUEST_STATE_DONE); + rqParent=AQH_MsgRequest_Tree2_GetParent(rq); + if (rqParent) + AQH_MsgRequest_SubRequestFinished(rqParent, rq, AQH_MSG_REQUEST_REASON_DONE); + return AQH_MSG_REQUEST_RESULT_HANDLED; + } + else { + DBG_INFO(NULL, " Non-matching message id"); + } + } + else { + DBG_INFO(NULL, " Non-matching message code"); + } } - else { - DBG_INFO(NULL, " Non-matching message id"); - } - } - else { - DBG_INFO(NULL, " Non-matching message code"); } } - return GWEN_MSG_REQUEST_RESULT_NOT_HANDLED; + return AQH_MSG_REQUEST_RESULT_NOT_HANDLED; } -void _subRqAbort(GWEN_MSG_REQUEST *rq, int reason) +void _subRqAbort(AQH_MSG_REQUEST *rq, int reason) { - GWEN_MSG_REQUEST *rqParent; + AQH_MSG_REQUEST *rqParent; DBG_INFO(NULL, "Aborting request"); - GWEN_MsgRequest_SetResult(rq, AQH_MSG_IPC_ERROR_GENERIC); - GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE); + AQH_MsgRequest_SetResult(rq, AQH_MSGDATA_RESULT_ERROR_GENERIC); + AQH_MsgRequest_SetState(rq, AQH_MSG_REQUEST_STATE_DONE); - rqParent=GWEN_MsgRequest_Tree2_GetParent(rq); + rqParent=AQH_MsgRequest_Tree2_GetParent(rq); if (rqParent) - GWEN_MsgRequest_SubRequestFinished(rqParent, rq, reason); + AQH_MsgRequest_SubRequestFinished(rqParent, rq, reason); } -AQH_NODE_INFO *_getNodeInfoFromValue(AQHOMED *aqh, const AQH_VALUE *value) +AQH_NODE_INFO *_getNodeInfoFromValue(AQH_NODE_SERVER *xo, const AQH_VALUE *value) { const char *s; unsigned long int uid; @@ -331,7 +357,7 @@ AQH_NODE_INFO *_getNodeInfoFromValue(AQHOMED *aqh, const AQH_VALUE *value) if (s && *s && 1==sscanf(s, "%lx", &uid)) { AQH_NODE_INFO *ni; - ni=AQH_NodeDb_GetNodeInfoByUid(aqh->nodeDb, uid); + ni=AQH_NodeDb_GetNodeInfoByUid(xo->nodeDb, uid); if (ni==NULL) { DBG_ERROR(NULL, "Node \"%08lx\" not found", uid); return NULL; @@ -342,3 +368,15 @@ AQH_NODE_INFO *_getNodeInfoFromValue(AQHOMED *aqh, const AQH_VALUE *value) } + +void _sendResponseResultToBroker(AQH_OBJECT *ep, uint32_t refMsgId, int result) +{ + AQH_MESSAGE *msg; + + msg=AQH_IpcMessageResult_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_DATA_RESULT, + AQH_Endpoint_GetNextMessageId(ep), refMsgId, result, NULL); + AQH_Endpoint_AddMsgOut(ep, msg); +} + + + diff --git a/apps/aqhome-nodes/r_setdata.h b/apps/aqhome-nodes/r_setdata.h index 26f6cf5..6b30fa0 100644 --- a/apps/aqhome-nodes/r_setdata.h +++ b/apps/aqhome-nodes/r_setdata.h @@ -10,13 +10,13 @@ #define AQHOMED_R_SETDATA_H -#include "./aqhomed.h" +#include "./server.h" #include -void AqHomeNodes_HandleSetData(AQHOMED *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *recvdMsg); +void AQH_NodeServer_HandleSetData(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *recvdMsg); diff --git a/apps/aqhome-nodes/server.c b/apps/aqhome-nodes/server.c new file mode 100644 index 0000000..c40017e --- /dev/null +++ b/apps/aqhome-nodes/server.c @@ -0,0 +1,1300 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./server_p.h" +#include "./db.h" +#include "./devicesread.h" +#include "./r_setdata.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + +#define I18N(msg) msg +#define I18S(msg) msg + +#define A_ARG GWEN_ARGS_FLAGS_HAS_ARGUMENT +#define A_END (GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST) +#define A_CHAR GWEN_ArgsType_Char +#define A_INT GWEN_ArgsType_Int + + +#define AQH_NODE_SERVER_BROKER_RESTARTTIME 10 +#define AQH_NODE_SERVER_TTY_RESTARTTIME 10 + + +enum { + AQH_NODE_SERVER_SLOT_NEWCLIENT=1, + AQH_NODE_SERVER_SLOT_CLIENTCLOSED, + AQH_NODE_SERVER_SLOT_BROKERCLOSED, + AQH_NODE_SERVER_SLOT_TTYCLOSED +}; + + + +/* ------------------------------------------------------------------------------------------------ + * global vars + * ------------------------------------------------------------------------------------------------ + */ + +GWEN_INHERIT(AQH_OBJECT, AQH_NODE_SERVER) + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); +static void _readConfig(AQH_OBJECT *o, AQH_NODE_SERVER *xo, GWEN_DB_NODE *dbArgs); +static const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue); +static int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue); + +static int _startIpc(AQH_OBJECT *o, AQH_NODE_SERVER *xo); +static int _startTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo); +static int _startBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo); +static void _setupDb(AQH_NODE_SERVER *xo); +static int _loadDeviceList(AQH_NODE_SERVER *xo); + +static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); +static int _handleNewIpcClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *clientEndpoint); +static int _handleIpcClientDown(AQH_OBJECT *clientEndpoint); +static int _handleBrokerDown(AQH_NODE_SERVER *xo); +static int _handleTtyDown(AQH_NODE_SERVER *xo); + +static void _handleMsgsFromClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *ep); +static void _handleMsgFromClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *ep, const AQH_MESSAGE *msg); + +static void _handleMsgFromTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _writeTtyMsgToLogFile(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _forwardTtyMsgToBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _forwardValueMessage(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _forwardDataFromSendStatsMessage(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _forwardDataFromRecvStatsMessage(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg); +static void _publishInt(AQH_NODE_SERVER *xo, uint32_t uid, const char *vPath, int vModality, const char *vUnits, int v); +static void _publishDouble(AQH_NODE_SERVER *xo, uint32_t uid, const char *vPath, int vModality, const char *vUnits, double v); +static void _setDeviceName(AQH_VALUE *value, uint32_t uid); + +static void _handleMsgFromBroker(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); + +static void _writeToLogFile(const char *filename, const char *txt); +static int _createPidFile(const char *pidFilename); +static int _diffInSeconds(time_t t1, time_t t0); +static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * constructor, destructor + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +AQH_OBJECT *AQH_NodeServer_new(AQH_EVENT_LOOP *eventLoop) +{ + AQH_OBJECT *o; + AQH_NODE_SERVER *xo; + + o=AQH_Object_new(eventLoop); + GWEN_NEW_OBJECT(AQH_NODE_SERVER, xo); + GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_NODE_SERVER, o, xo, _freeData); + xo->ipcClientList=AQH_Object_List_new(); + + AQH_Object_SetSignalHandlerFn(o, _handleSignal); + + return o; +} + + + +void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +{ + AQH_NODE_SERVER *xo; + + xo=(AQH_NODE_SERVER*) p; + + if (xo->ipcClientList) { + AQH_Object_List_free(xo->ipcClientList); + xo->ipcClientList=NULL; + } + free(xo->dbFile); + free(xo->logFile); + free(xo->pidFile); + free(xo->devicePath); + free(xo->tcpAddress); + free(xo->brokerAddress); + free(xo->brokerClientId); + + GWEN_FREE_OBJECT(xo); +} + + + +AQH_NODE_SERVER *AQH_NodeServer_GetServerData(const AQH_OBJECT *o) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + return xo; + } + return NULL; +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * getter, setter + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int AQH_NodeServer_GetTimeout(const AQH_OBJECT *o) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) + return xo->timeout; + } + return 0; +} + + + +void AQH_NodeServer_SetLogFile(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->logFile); + xo->logFile=s?strdup(s):NULL; + } + } +} + + + +void AQH_NodeServer_SetDbFile(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->dbFile); + xo->dbFile=s?strdup(s):NULL; + } + } +} + + + +void AQH_NodeServer_SetPidFile(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->pidFile); + xo->pidFile=s?strdup(s):NULL; + } + } +} + + + +void AQH_NodeServer_SetDevicePath(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->devicePath); + xo->devicePath=s?strdup(s):NULL; + } + } +} + + + +void AQH_NodeServer_SetTpcAddress(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->tcpAddress); + xo->tcpAddress=s?strdup(s):NULL; + } + } +} + + + +void AQH_NodeServer_SetTcpPort(AQH_OBJECT *o, int i) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) + xo->tcpPort=i; + } +} + + + +void AQH_NodeServer_SetBrokerAddress(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->brokerAddress); + xo->brokerAddress=s?strdup(s):NULL; + } + } +} + + + +void AQH_NodeServer_SetBrokerPort(AQH_OBJECT *o, int i) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) + xo->brokerPort=i; + } +} + + + +void AQH_NodeServer_SetBrokerClientId(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + free(xo->brokerClientId); + xo->brokerClientId=s?strdup(s):NULL; + } + } +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * init + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int AQH_NodeServer_Init(AQH_OBJECT *o, int argc, char **argv) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + GWEN_DB_NODE *dbArgs; + int rv; + const char *s; + + dbArgs=GWEN_DB_Group_new("args"); + rv=_readArgs(argc, argv, dbArgs); + if (rv<0) { + DBG_ERROR(NULL, "Error reading args (%d)", rv); + return rv; + } + AQH_MergeConfigFileIntoConfig(dbArgs, "ConfigFile"); + _readConfig(o, xo, dbArgs); + + s=GWEN_DB_GetCharValue(dbArgs, "loglevel", 0, NULL); + if (s && *s) { + GWEN_LOGGER_LEVEL ll; + + ll=GWEN_Logger_Name2Level(s); + GWEN_Logger_SetLevel(NULL, ll); + } + + s=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, AQHOMED_DEFAULT_PIDFILE); + if (s && *s) { + AQH_NodeServer_SetPidFile(o, s); + rv=_createPidFile(s); + if (rv<0) { + DBG_ERROR(NULL, "Error creating PID file (%d)", rv); + return rv; + } + } + + DBG_INFO(NULL, "Loading device files"); + rv=_loadDeviceList(xo); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + + DBG_INFO(NULL, "Setup node db"); + _setupDb(xo); + + DBG_INFO(NULL, "Starting IPC Service"); + rv=_startIpc(o, xo); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + DBG_INFO(NULL, "Starting TTY Service"); + rv=_startTty(o, xo); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + DBG_INFO(NULL, "Starting Broker Connection"); + rv=_startBroker(o, xo); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + return 0; + } + else { + DBG_ERROR(NULL, "Not of type AQH_NODE_SERVER object"); + return GWEN_ERROR_INVALID; + } +} + + + +void _readConfig(AQH_OBJECT *o, AQH_NODE_SERVER *xo, GWEN_DB_NODE *dbArgs) +{ + xo->dbArgs=dbArgs; + + xo->timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0); + xo->nodeAddress=GWEN_DB_GetIntValue(dbArgs, "nodeAddress", 0, AQHOMED_DEFAULT_NODEADDR); + + AQH_NodeServer_SetDbFile(o, GWEN_DB_GetCharValue(dbArgs, "dbfile", 0, NULL)); + AQH_NodeServer_SetLogFile(o, GWEN_DB_GetCharValue(dbArgs, "logfile", 0, NULL)); + + AQH_NodeServer_SetDevicePath(o, GWEN_DB_GetCharValue(dbArgs, "device", 0, AQHOMED_DEFAULT_DEVICE)); + AQH_NodeServer_SetTpcAddress(o, readCharConfigWithAlt(dbArgs, "tcpAddress", "ConfigFile/nodesAddress", NULL)); + AQH_NodeServer_SetTcpPort(o, readIntConfigWithAlt(dbArgs, "tcpPort", "ConfigFile/nodesPort", AQHOMED_DEFAULT_IPC_PORT, -1)); + + AQH_NodeServer_SetBrokerAddress(o, readCharConfigWithAlt(dbArgs, "brokerAddress", "ConfigFile/brokerAddress", "127.0.0.1")); + AQH_NodeServer_SetBrokerPort(o, readIntConfigWithAlt(dbArgs, "brokerPort", "ConfigFile/brokerPort", AQHOMED_DEFAULT_BROKER_PORT, -1)); + AQH_NodeServer_SetBrokerClientId(o, GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, AQHOMED_DEFAULT_BROKER_CLIENTID)); + +} + + + +const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue) +{ + const char *s; + + s=GWEN_DB_GetCharValue(dbArgs, varName, 0, NULL); + if (!(s && *s)) + s=GWEN_DB_GetCharValue(dbArgs, altVarName, 0, NULL); + return (s && *s)?s:defaultValue; +} + + + +int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue) +{ + int i; + + i=GWEN_DB_GetIntValue(dbArgs, varName, 0, nonValue); + if (i==nonValue) + i=GWEN_DB_GetIntValue(dbArgs, altVarName, 0, nonValue); + return (i!=nonValue)?i:defaultValue; +} + + + +int _startIpc(AQH_OBJECT *o, AQH_NODE_SERVER *xo) +{ + if (xo->ipcEndpoint) { + AQH_Object_Disable(xo->ipcEndpoint); + AQH_Object_free(xo->ipcEndpoint); + xo->ipcEndpoint=NULL; + } + + if (xo->tcpAddress && *(xo->tcpAddress) && xo->tcpPort>0) { + int fd; + + DBG_ERROR(NULL, "Starting IPC service on \"%s\":%d", xo->tcpAddress, xo->tcpPort); + fd=AQH_TcpdObject_CreateListeningSocket(xo->tcpAddress, xo->tcpPort); + if (fd<0) { + DBG_INFO(NULL, "here"); + return GWEN_ERROR_IO; + } + + xo->ipcEndpoint=AQH_IpcServerObject_new(AQH_Object_GetEventLoop(o), fd); + AQH_Object_AddLink(xo->ipcEndpoint, AQH_IPC_SERVER_SIGNAL_NEWCLIENT, AQH_NODE_SERVER_SLOT_NEWCLIENT, o); + AQH_Object_Enable(xo->ipcEndpoint); + return 0; + } + else { + DBG_ERROR(NULL, "Missing server address"); + return GWEN_ERROR_GENERIC; + } +} + + + +int _startTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo) +{ + if (xo->ttyEndpoint) { + AQH_Object_Disable(xo->ttyEndpoint); + AQH_Object_free(xo->ttyEndpoint); + xo->ttyEndpoint=NULL; + } + + if (xo->devicePath && *(xo->devicePath)) { + int fd; + + DBG_ERROR(NULL, "Opening TTY device \"%s\"", xo->devicePath); + fd=AQH_TtyObject_OpenAndInitDevice(xo->devicePath, &(xo->initialTermiosState)); + if (fd<0) { + DBG_INFO(NULL, "here"); + return GWEN_ERROR_IO; + } + xo->ttyEndpoint=AQH_TtyEndpoint2_new(AQH_Object_GetEventLoop(o), fd); + AQH_Object_AddLink(xo->ttyEndpoint, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_NODE_SERVER_SLOT_TTYCLOSED, o); + AQH_Object_Enable(xo->ttyEndpoint); + } + else { + DBG_ERROR(NULL, "Missing device path"); + return GWEN_ERROR_GENERIC; + } + + return 0; +} + + + +int _startBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo) +{ + if (xo->brokerEndpoint) { + AQH_Object_Disable(xo->brokerEndpoint); + AQH_Object_free(xo->brokerEndpoint); + xo->brokerEndpoint=NULL; + } + + if (xo->brokerAddress && *(xo->brokerAddress) && xo->brokerPort) { + AQH_OBJECT *ep; + int fd; + int rv; + + fd=AQH_TcpObject_CreateConnectedSocket(xo->brokerAddress, xo->brokerPort); + if (fd<0) { + DBG_ERROR(NULL, "Error connecting to broker server %s:%d", xo->brokerAddress, xo->brokerPort); + return GWEN_ERROR_IO; + } + + ep=AQH_IpcClientObject_new(AQH_Object_GetEventLoop(o), fd); + assert(ep); + AQH_Endpoint_SetServiceName(ep, xo->brokerClientId); + AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_NODE_SERVER_SLOT_BROKERCLOSED, o); + AQH_Object_Enable(ep); + + rv=AQH_IpcEndpoint_ExchangeConnectMsg(ep, + AQH_MSGTYPE_IPC_DATA_CONNECT_REQ, + AQH_MSGTYPE_IPC_DATA_RESULT, + xo->brokerClientId, + NULL, + NULL, + 0, + xo->timeoutInSeconds); + if (rv!=0) { + DBG_ERROR(NULL, "Error connecting to broker: %d", rv); + return (rv<0)?rv:GWEN_ERROR_PERMISSIONS; + } + DBG_ERROR(NULL, "Connected to broker at %s:%d", xo->brokerAddress, xo->brokerPort); + xo->brokerEndpoint=ep; + return 0; + } + else { + DBG_ERROR(NULL, "No server settings"); + return GWEN_ERROR_BAD_DATA; + } + + return 0; +} + + + +void _setupDb(AQH_NODE_SERVER *xo) +{ + if (xo->dbFile) { + GWEN_DB_NODE *dbNodeDb; + int rv; + + dbNodeDb=GWEN_DB_Group_new("dbNodes"); + rv=GWEN_DB_ReadFile(dbNodeDb, xo->dbFile, GWEN_DB_FLAGS_DEFAULT|GWEN_PATH_FLAGS_CREATE_GROUP); + if (rv==0) { + AQH_NodeDb_fromDb(xo->nodeDb, dbNodeDb); + } + GWEN_DB_Group_free(dbNodeDb); + } +} + + + +int _loadDeviceList(AQH_NODE_SERVER *xo) +{ + AQHNODE_DEVICE_LIST *deviceList; + + deviceList=AQH_NodeServer_ReadDataDeviceFiles(); + if (deviceList==NULL) { + DBG_ERROR(NULL, "Error reading device list"); + return GWEN_ERROR_GENERIC; + } + xo->deviceDefList=deviceList; + return 0; +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * client management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +void AQH_NodeServer_CleanupClients(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + AQH_OBJECT *ep; + + ep=AQH_Object_List_First(xo->ipcClientList); + while(ep) { + AQH_OBJECT *epNext; + + epNext=AQH_Object_List_Next(ep); + if (AQH_Object_GetFlags(ep) & AQH_OBJECT_FLAGS_DELETE) { + AQH_Object_List_Del(ep); + AQH_Object_free(ep); + } + ep=epNext; + } /* while */ + } +} + + + +void AQH_NodeServer_HandleClientMsgs(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + AQH_OBJECT *ep; + + ep=AQH_Object_List_First(xo->ipcClientList); + while(ep) { + AQH_OBJECT *epNext; + + epNext=AQH_Object_List_Next(ep); + _handleMsgsFromClient(o, xo, ep); + ep=epNext; + } /* while */ + } +} + + + +void _handleMsgsFromClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *ep) +{ + AQH_MESSAGE *msg; + + while( (msg=AQH_Endpoint_GetNextMsgIn(ep)) ) { + AQH_Message_SetObject(msg, ep); + if (AQH_Request_Tree2_HandleIpcMsg(xo->requestTree, ep, msg)!=AQH_MSG_REQUEST_RESULT_HANDLED) + _handleMsgFromClient(o, xo, ep, msg); + AQH_Message_free(msg); + } +} + + + +void _handleMsgFromClient(GWEN_UNUSED AQH_OBJECT *o, GWEN_UNUSED AQH_NODE_SERVER *xo, GWEN_UNUSED AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + uint16_t code; + uint8_t protoId; + + /* exec IPC message */ + code=AQH_IpcMessage_GetCode(msg); + protoId=AQH_IpcMessage_GetProtoId(msg); + if (protoId==AQH_IPC_PROTOCOL_DATA_ID) { + DBG_ERROR(NULL, "Received IPC packet %d (%x)", (int) code, code); + switch(code) { + default: break; + } + } + else { + DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId); + } +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * TTY management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +void AQH_NodeServer_HandleTtyMsgs(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && xo->ttyEndpoint) { + AQH_MESSAGE *msg; + + while( (msg=AQH_Endpoint_GetNextMsgIn(xo->ttyEndpoint)) ) { + AQH_Message_SetObject(msg, xo->ttyEndpoint); + if (AQH_Request_Tree2_HandleTtyMsg(xo->requestTree, xo->ttyEndpoint, msg)!=AQH_MSG_REQUEST_RESULT_HANDLED) + _handleMsgFromTty(o, xo, msg); + AQH_Message_free(msg); + } + } +} + + + +void _handleMsgFromTty(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) +{ + uint8_t code; + + code=AQH_NodeMessage_GetMsgType(msg); + DBG_ERROR(NULL, "Received Node packet %d (%x)", (int) code, code); + AQH_NodeServer_NodeMsgToDb(o, msg); + _writeTtyMsgToLogFile(xo, msg); + _forwardTtyMsgToBroker(o, xo, msg); +} + + + +void _forwardTtyMsgToBroker(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) +{ + uint8_t code; + + code=AQH_NodeMessage_GetMsgType(msg); + switch(code) { + case AQH_MSG_TYPE_VALUE_REPORT: _forwardValueMessage(o, xo, msg); break; + case AQH_MSG_TYPE_COMSENDSTATS: _forwardDataFromSendStatsMessage(xo, msg); break; + case AQH_MSG_TYPE_COMRECVSTATS: _forwardDataFromRecvStatsMessage(xo, msg); break; + default: break; + } +} + + + +void _forwardValueMessage(AQH_OBJECT *o, AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) +{ + uint8_t valueId; + double v; + uint32_t uid; + AQH_NODE_INFO *ni; + const char *devName; + const AQHNODE_DEVICE *devInfo; + const AQHNODE_VALUE *value; + const char *vname; + + valueId=AQH_ValueMessage_GetValueId(msg); + v=AQH_ValueMessage_GetValue(msg); + + uid=AQH_ValueMessage_GetUid(msg); + ni=uid?AQH_NodeDb_GetNodeInfoByUid(xo->nodeDb, uid):NULL;; + devName=ni?AQH_NodeInfo_GetDeviceId(ni):NULL; + devInfo=devName?AQH_NodeServer_GetDeviceDefByName(o, devName):NULL; + value=devInfo?AQHNODE_Value_List_GetById(AQHNODE_Device_GetValueList(devInfo), valueId):NULL; + vname=value?AQHNODE_Value_GetName(value):NULL; + if (vname && *vname) + _publishDouble(xo, uid, vname, AQHNODE_Value_GetModality(value), AQHNODE_Value_GetValueUnits(value), v); +} + + + +void _forwardDataFromSendStatsMessage(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) +{ + uint16_t packetsOutInt; + + packetsOutInt=AQH_SendStatsMessage_GetPacketsOut(msg); + if (packetsOutInt) { + uint32_t uid; + double packetsOut; + double collisions; + double busy; + double collisionsPercentage=0.0; + double busyPercentage=0.0; + + uid=AQH_SendStatsMessage_GetUid(msg); + packetsOut=/*(double)*/ packetsOutInt; + collisions=/*(double)*/ AQH_SendStatsMessage_GetCollisions(msg); + busy=/*(double)*/ AQH_SendStatsMessage_GetBusyErrors(msg); + + collisionsPercentage=collisions*100.0/packetsOut; + busyPercentage=busy*100.0/packetsOut; + + _publishInt( xo, uid, "net/packetsOut", 0, NULL, packetsOutInt); + _publishInt( xo, uid, "net/collisions", 0, NULL, (int) AQH_SendStatsMessage_GetCollisions(msg)); + _publishDouble(xo, uid, "net/collisionsPercent", 0, "%", collisionsPercentage); + _publishDouble(xo, uid, "net/busyPercent", 0, "%", busyPercentage); + } +} + + + +void _forwardDataFromRecvStatsMessage(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) +{ + uint16_t packetsInInt; + + packetsInInt=AQH_RecvStatsMessage_GetPacketsIn(msg); + if (packetsInInt) { + uint32_t uid; + double packetsIn; + double crcErrors; + double ioErrors; + double crcErrorsPercentage=0.0; + double ioErrorsPercentage=0.0; + + uid=AQH_SendStatsMessage_GetUid(msg); + packetsIn=/*(double)*/ packetsInInt; + crcErrors=/*(double)*/AQH_RecvStatsMessage_GetCrcErrors(msg); + ioErrors=/*(double)*/AQH_RecvStatsMessage_GetIoErrors(msg); + + crcErrorsPercentage=crcErrors*100.0/packetsIn; + ioErrorsPercentage=ioErrors*100.0/packetsIn; + + _publishInt( xo, uid, "net/packetsIn", 0, NULL, packetsInInt); + _publishInt( xo, uid, "net/crcerrors", 0, NULL, (int) AQH_RecvStatsMessage_GetCrcErrors(msg)); + _publishInt( xo, uid, "net/ioerrors", 0, NULL, (int) AQH_RecvStatsMessage_GetIoErrors(msg)); + _publishDouble(xo, uid, "net/crcerrorsPercent", 0, "%", crcErrorsPercentage); + _publishDouble(xo, uid, "net/ioerrorsPercent", 0, "%", ioErrorsPercentage); + } +} + + + +void _publishInt(AQH_NODE_SERVER *xo, uint32_t uid, const char *vPath, int vModality, const char *vUnits, int v) +{ + _publishDouble(xo, uid, vPath, vModality, vUnits, /*(double)*/ v); +} + + + +void _publishDouble(AQH_NODE_SERVER *xo, uint32_t uid, const char *vPath, int vModality, const char *vUnits, double v) +{ + AQH_MESSAGE *pubMsg; + union {double f; uint64_t i;} u; + uint64_t arrayToSend[2]; + AQH_VALUE *value; + + u.f=v; + arrayToSend[0]=(uint64_t) time(NULL); + arrayToSend[1]=u.i; + + value=AQH_Value_new(); + _setDeviceName(value, uid); + AQH_Value_SetName(value, vPath); + AQH_Value_SetValueUnits(value, vUnits); + AQH_Value_SetValueType(value, AQH_ValueType_Sensor); + AQH_Value_SetModality(value, vModality); + + pubMsg=AQH_IpcdMessageMultiData_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, + AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint), 0, + value, arrayToSend, 1); + if (pubMsg) { + DBG_INFO(AQH_LOGDOMAIN, "BROKER PUBLISH %s: %f", AQH_Value_GetName(value), v); + AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, pubMsg); + } + AQH_Value_free(value); +} + + + +void _setDeviceName(AQH_VALUE *value, uint32_t uid) +{ + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + GWEN_Buffer_AppendArgs(buf, "%08x", uid); + AQH_Value_SetDeviceName(value, GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); +} + + + +void _writeTtyMsgToLogFile(AQH_NODE_SERVER *xo, const AQH_MESSAGE *msg) +{ + if (xo->logFile) { + GWEN_BUFFER *dbuf; + + dbuf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_NodeMessage_DumpSpecificToBuffer(msg, dbuf, "received"); + _writeToLogFile(xo->logFile, GWEN_Buffer_GetStart(dbuf)); + GWEN_Buffer_free(dbuf); + } +} + + + +void _writeToLogFile(const char *filename, const char *txt) +{ + if (txt && *txt) { + FILE *f; + + f=fopen(filename, "a+"); + if (f) { + if (1!=fwrite(txt, strlen(txt), 1, f)) { + DBG_ERROR(NULL, "Error logging."); + } + fclose(f); + } + } +} + + + +void AQH_NodeServer_CheckTtyConnection(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && xo->dbArgs) { + if (xo->ttyEndpoint==NULL) { + time_t now; + + DBG_ERROR(NULL, "TTY closed"); + now=time(NULL); + if (_diffInSeconds(now, xo->timestampBrokerDown)>AQH_NODE_SERVER_TTY_RESTARTTIME) { + int rv; + + DBG_ERROR(NULL, "Re-opening TTY device"); + rv=_startTty(o, xo); + if (rv<0) { + DBG_ERROR(NULL, "here (%d)", rv); + } + } + } + } +} + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * broker management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +void AQH_NodeServer_HandleBrokerMsgs(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && xo->brokerEndpoint) { + AQH_MESSAGE *msg; + + while( (msg=AQH_Endpoint_GetNextMsgIn(xo->brokerEndpoint)) ) { + AQH_Message_SetObject(msg, xo->brokerEndpoint); + if (AQH_Request_Tree2_HandleIpcMsg(xo->requestTree, xo->brokerEndpoint, msg)!=AQH_MSG_REQUEST_RESULT_HANDLED) + _handleMsgFromBroker(o, xo->brokerEndpoint, msg); + AQH_Message_free(msg); + } + } +} + + + +void _handleMsgFromBroker(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + uint16_t code; + uint8_t protoId; + + /* exec IPC message */ + code=AQH_IpcMessage_GetCode(msg); + protoId=AQH_IpcMessage_GetProtoId(msg); + if (protoId==AQH_IPC_PROTOCOL_DATA_ID) { + DBG_ERROR(NULL, "Received IPC packet %d (%x)", (int) code, code); + switch(code) { + case AQH_MSGTYPE_IPC_DATA_SETDATA: AQH_NodeServer_HandleSetData(o, ep, msg); break; + default: break; + } + } + else { + DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId); + } +} + + + +void AQH_NodeServer_CheckBrokerConnection(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && xo->dbArgs) { + if (xo->brokerEndpoint==NULL) { + time_t now; + + DBG_ERROR(NULL, "Broker connection down"); + now=time(NULL); + if (_diffInSeconds(now, xo->timestampBrokerDown)>AQH_NODE_SERVER_BROKER_RESTARTTIME) { + int rv; + + DBG_ERROR(NULL, "Restarting broker connection"); + rv=_startBroker(o, xo); + if (rv<0) { + DBG_ERROR(NULL, "here (%d)", rv); + } + } + } + } +} + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * request management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +AQH_MSG_REQUEST *AQH_NodeServer_GetRequestTree(const AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) + return xo->requestTree; + return NULL; +} + + + +void AQH_NodeServer_AddRequestToTree(AQH_OBJECT *o, AQH_MSG_REQUEST *rq) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && rq) + AQH_MsgRequest_Tree2_AddChild(xo->requestTree, rq); +} + + + +void AQH_NodeServer_CleanupRequests(AQH_OBJECT *o) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + AQH_Request_Tree2_CheckTimeouts(xo->requestTree); + AQH_Request_Tree2_Cleanup(xo->requestTree); + } +} + + + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * signal handler + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, GWEN_UNUSED int param1, void *param2) +{ + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) { + switch(slotId) { + case AQH_NODE_SERVER_SLOT_NEWCLIENT: return _handleNewIpcClient(o, xo, (AQH_OBJECT*) param2); + case AQH_NODE_SERVER_SLOT_CLIENTCLOSED: return _handleIpcClientDown(senderObject); + case AQH_NODE_SERVER_SLOT_BROKERCLOSED: return _handleBrokerDown(xo); + case AQH_NODE_SERVER_SLOT_TTYCLOSED: return _handleTtyDown(xo); + default: + break; + } + } + + return 0; /* not handled */ +} + + + +int _handleNewIpcClient(AQH_OBJECT *o, AQH_NODE_SERVER *xo, AQH_OBJECT *clientEndpoint) +{ + DBG_ERROR(NULL, "New IPC client"); + AQH_Object_AddLink(clientEndpoint, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_NODE_SERVER_SLOT_CLIENTCLOSED, o); + AQH_Object_List_Add(clientEndpoint, xo->ipcClientList); + return 1; /* handled */ +} + + + +int _handleIpcClientDown(AQH_OBJECT *clientEndpoint) +{ + DBG_ERROR(NULL, "IPC client down"); + AQH_Object_AddFlags(clientEndpoint, AQH_OBJECT_FLAGS_DELETE); + return 1; /* handled */ +} + + + +int _handleBrokerDown(AQH_NODE_SERVER *xo) +{ + if (xo->brokerEndpoint) { + AQH_Object_Disable(xo->brokerEndpoint); + AQH_Object_free(xo->brokerEndpoint); + xo->brokerEndpoint=NULL; + xo->timestampBrokerDown=time(NULL); + } + return 1; +} + + + +int _handleTtyDown(AQH_NODE_SERVER *xo) +{ + if (xo->ttyEndpoint) { + AQH_Object_Disable(xo->ttyEndpoint); + AQH_Object_free(xo->ttyEndpoint); + xo->ttyEndpoint=NULL; + xo->timestampTtyDown=time(NULL); + } + return 1; +} + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * device management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +const AQHNODE_DEVICE_LIST *AQH_NodeServer_GetDeviceDefList(const AQH_OBJECT *o) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo) + return xo->deviceDefList; + } + return NULL; +} + + + +const AQHNODE_DEVICE *AQH_NodeServer_FindDeviceDef(const AQH_OBJECT *o, uint32_t manufacturer, uint16_t deviceType, uint16_t deviceVersion) +{ + if (o) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && xo->deviceDefList) { + const AQHNODE_DEVICE *device; + + device=AQHNODE_Device_List_First(xo->deviceDefList); + while(device) { + if (AQHNODE_Device_GetManufacturer(device)==manufacturer && + AQHNODE_Device_GetDeviceType(device)==deviceType && + AQHNODE_Device_GetDeviceVersion(device)==(deviceVersion & 0xff00)) + return device; + device=AQHNODE_Device_List_Next(device); + } + } + } + + return NULL; +} + + + +const AQHNODE_DEVICE *AQH_NodeServer_GetDeviceDefByName(const AQH_OBJECT *o, const char *name) +{ + if (o && name) { + AQH_NODE_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_NODE_SERVER, o); + if (xo && xo->deviceDefList) + return AQHNODE_Device_List_GetByName(xo->deviceDefList, name); + } + return NULL; +} + + + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * helper functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int _createPidFile(const char *pidFilename) +{ + FILE *f; + int pidfd; + + if (remove(pidFilename)==0) { + DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)"); + } + +#ifdef HAVE_SYS_STAT_H + pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + if (pidfd < 0) { + DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + + f = fdopen(pidfd, "w"); +#else /* HAVE_STAT_H */ + f=fopen(pidFilename,"w+"); +#endif /* HAVE_STAT_H */ + + /* write pid */ +#ifdef HAVE_GETPID + fprintf(f,"%d\n",getpid()); +#else + fprintf(f,"-1\n"); +#endif + if (fclose(f)) { + DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + return 0; +} + + + +int _diffInSeconds(time_t t1, time_t t0) +{ + return t1-t0; +} + + + +int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) +{ + int rv; + const GWEN_ARGS args[]= { + /* flags type name min max s long short_descr, long_descr */ + { A_ARG, A_CHAR, "loglevel", 0, 1, "L", "loglevel", I18S("Specify loglevel"), NULL}, + { A_ARG, A_CHAR, "cfgdir", 0, 1, "D", "cfgdir", I18S("Specify the configuration folder"), NULL}, + { A_ARG, A_CHAR, "charset", 0, 1, NULL, "charset", I18S("Specify the output character set"), NULL}, + { A_ARG, A_CHAR, "device", 0, 1, "d", "device", I18S("Specify the device (e.g. /dev/ttyUSB0)"), NULL}, + { A_ARG, A_INT, "nodeAddress", 0, 1, "n", "node", I18S("AqHome node adaptor address(default 240)"), NULL}, + { A_ARG, A_CHAR, "logFile", 0, 1, "l", "logfile", I18S("Specify a logfile to log received messages to"), NULL}, + { A_ARG, A_CHAR, "tcpAddress", 0, 1, "t", "tcpaddress", I18S("TCP address to listen on (disabled if missing)"), NULL}, + { A_ARG, A_INT, "tcpPort", 0, 1, "P", "tcpport", I18S("TCP port to listen on (default: 45454)"), NULL}, + { A_ARG, A_CHAR, "brokerAddress", 0, 1, "ba", "brokerddress", I18S("Broker address [127.0.0.1]"), NULL}, + { A_ARG, A_INT, "brokerPort", 0, 1, "bp", "brokerport", I18S("Broker port [1899]"), NULL}, + { A_ARG, A_CHAR, "brokerClientId", 0, 1, NULL, "brokerclientid", I18S("Broker client id"), NULL}, + { A_ARG, A_CHAR, "dbfile", 0, 1, "db", "dbfile", I18S("DB file to read/write node database"), NULL}, + { A_ARG, A_CHAR, "pidfile", 0, 1, "p", "pidfile", I18S("PID file"), NULL}, + { A_ARG, A_INT, "timeout", 0, 1, "T", NULL, I18S("timeout in seconds [0]"), NULL}, + { A_END, A_INT, "help", 0, 0, "h", "help", I18S("Show this help screen"), NULL} + }; + + rv=GWEN_Args_Check(argc, argv, 1, 0, args, dbArgs); + if (rv==GWEN_ARGS_RESULT_ERROR) { + fprintf(stderr, "ERROR: Could not parse arguments main\n"); + return GWEN_ERROR_INVALID; + } + else if (rv==GWEN_ARGS_RESULT_HELP) { + GWEN_BUFFER *ubuf; + + ubuf=GWEN_Buffer_new(0, 1024, 0, 1); + GWEN_Buffer_AppendArgs(ubuf, + I18N("This is version %s.\nUsage: %s [OPTIONS]\n\nOptions:\n"), + AQHOME_VERSION_STRING, + argv[0]); + if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) { + fprintf(stderr, "ERROR: Could not create help string\n"); + return 1; + } + GWEN_Buffer_AppendString(ubuf, "\n"); + + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(ubuf)); + GWEN_Buffer_free(ubuf); + return GWEN_ERROR_CLOSE; + } + return 0; +} + + + diff --git a/apps/aqhome-nodes/server.h b/apps/aqhome-nodes/server.h new file mode 100644 index 0000000..fe8a0a1 --- /dev/null +++ b/apps/aqhome-nodes/server.h @@ -0,0 +1,65 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef SERVER_H +#define SERVER_H + + + +#include "aqhome-nodes/types/device.h" + +#include "aqhome/events2/object.h" +#include + + + +AQH_OBJECT *AQH_NodeServer_new(AQH_EVENT_LOOP *eventLoop); +int AQH_NodeServer_Init(AQH_OBJECT *o, int argc, char **argv); + +/* loop functions */ +void AQH_NodeServer_CleanupClients(AQH_OBJECT *o); +void AQH_NodeServer_HandleTtyMsgs(AQH_OBJECT *o); +void AQH_NodeServer_HandleClientMsgs(AQH_OBJECT *o); +void AQH_NodeServer_HandleBrokerMsgs(AQH_OBJECT *o); +void AQH_NodeServer_CheckBrokerConnection(AQH_OBJECT *o); +void AQH_NodeServer_CheckTtyConnection(AQH_OBJECT *o); + + +/* getters and setters */ +int AQH_NodeServer_GetTimeout(const AQH_OBJECT *o); +void AQH_NodeServer_SetLogFile(AQH_OBJECT *o, const char *s); +void AQH_NodeServer_SetDbFile(AQH_OBJECT *o, const char *s); +void AQH_NodeServer_SetPidFile(AQH_OBJECT *o, const char *s); + +void AQH_NodeServer_SetDevicePath(AQH_OBJECT *o, const char *s); +void AQH_NodeServer_SetTpcAddress(AQH_OBJECT *o, const char *s); +void AQH_NodeServer_SetTcpPort(AQH_OBJECT *o, int i); +void AQH_NodeServer_SetBrokerAddress(AQH_OBJECT *o, const char *s); +void AQH_NodeServer_SetBrokerPort(AQH_OBJECT *o, int i); +void AQH_NodeServer_SetBrokerClientId(AQH_OBJECT *o, const char *s); + + +/* device management */ +const AQHNODE_DEVICE_LIST *AQH_NodeServer_GetDeviceDefList(const AQH_OBJECT *o); +const AQHNODE_DEVICE *AQH_NodeServer_FindDeviceDef(const AQH_OBJECT *o, + uint32_t manufacturer, + uint16_t deviceType, uint16_t deviceVersion); +const AQHNODE_DEVICE *AQH_NodeServer_GetDeviceDefByName(const AQH_OBJECT *o, const char *name); + + +/* request management */ +AQH_MSG_REQUEST *AQH_NodeServer_GetRequestTree(const AQH_OBJECT *o); +void AQH_NodeServer_AddRequestToTree(AQH_OBJECT *o, AQH_MSG_REQUEST *rq); +void AQH_NodeServer_CleanupRequests(AQH_OBJECT *o); + + + + + + +#endif diff --git a/apps/aqhome-nodes/server_p.h b/apps/aqhome-nodes/server_p.h new file mode 100644 index 0000000..a5a0211 --- /dev/null +++ b/apps/aqhome-nodes/server_p.h @@ -0,0 +1,76 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef SERVER_P_H +#define SERVER_P_H + + +#include "./server.h" +#include "aqhome-nodes/types/device.h" + +#include "aqhome/nodes/nodedb.h" +#include "aqhome/ipc2/msgrequest.h" + +#include + + +/* default values */ +#define AQHOMED_DEFAULT_NODEADDR 240 +#define AQHOMED_DEFAULT_PIDFILE "/var/run/aqhomed-node.pid" +#define AQHOMED_DEFAULT_DEVICE "/dev/ttyUSB0" +#define AQHOMED_DEFAULT_IPC_PORT 45454 +#define AQHOMED_DEFAULT_BROKER_PORT 1899 +#define AQHOMED_DEFAULT_BROKER_CLIENTID "nodes" + + + +typedef struct AQH_NODE_SERVER AQH_NODE_SERVER; +struct AQH_NODE_SERVER { + AQH_OBJECT *ttyEndpoint; + AQH_OBJECT *brokerEndpoint; + + AQH_OBJECT *ipcEndpoint; + AQH_OBJECT_LIST *ipcClientList; + + AQH_NODE_DB *nodeDb; + AQHNODE_DEVICE_LIST *deviceDefList; + + AQH_MSG_REQUEST *requestTree; + + GWEN_DB_NODE *dbArgs; + + char *dbFile; + char *logFile; + char *pidFile; + + int timeout; /* timeout for run e.g. inside valgrind */ + char *devicePath; + + char *tcpAddress; + int tcpPort; + + char *brokerAddress; + int brokerPort; + char *brokerClientId; + + time_t timestampTtyDown; + time_t timestampBrokerDown; + + int nodeAddress; + + struct termios initialTermiosState; + + int timeoutInSeconds; +}; + + +AQH_NODE_SERVER *AQH_NodeServer_GetServerData(const AQH_OBJECT *o); + + +#endif +