/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./utils.h" #include "aqhome/ipc/endpoint_ipc.h" #include "aqhome/ipc/nodes/msg_ipc_setaccmsggrps.h" #include "aqhome/ipc/nodes/msg_ipc_forward.h" #include "aqhome/ipc/data/ipc_data.h" #include "aqhome/ipc/data/msg_data_connect.h" #include "aqhome/ipc/msg_ipc_result.h" #include "aqhome/ipc/endpoint_ipcclient.h" #include #include #include #include #include #include #include #define UTILS_IPC_ENDPOINT_DEFAULT_MSGSIZE 4096 GWEN_MSG_ENDPOINT *Utils_SetupBrokerClientEndpoint(GWEN_DB_NODE *dbArgs, uint32_t flags) { const char *brokerAddress; int brokerPort; const char *brokerClientId; brokerAddress=GWEN_DB_GetCharValue(dbArgs, "brokerAddress", 0, NULL); if (!(brokerAddress && *brokerAddress)) brokerAddress=GWEN_DB_GetCharValue(dbArgs, "ConfigFile/brokerAddress", 0, "127.0.0.1"); brokerPort=GWEN_DB_GetIntValue(dbArgs, "brokerPort", 0, -1); if (brokerPort<0) brokerPort=GWEN_DB_GetIntValue(dbArgs, "ConfigFile/brokerPort", 0, 45456); brokerClientId=GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, "aqhome-tool"); if (brokerAddress && *brokerAddress && brokerPort) { GWEN_MSG_ENDPOINT *ep; GWEN_MSG_ENDPOINT *ipcBaseEndpoint; int rv; ep=AQH_ClientIpcEndpoint_new("brokerIpcClient", 0); GWEN_MsgEndpoint_AddFlags(ep, flags); ipcBaseEndpoint=AQH_IpcEndpoint_CreateIpcTcpClient(brokerAddress, brokerPort, "brokerPhysEndpoint", 0); AQH_IpcEndpoint_SetServiceName(ipcBaseEndpoint, brokerClientId); GWEN_MsgEndpoint_Tree2_AddChild(ep, ipcBaseEndpoint); rv=GWEN_MultilayerEndpoint_StartConnect(ep); if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { DBG_ERROR(NULL, "Error connecting to broker server %s:%d (%d), will retry later", brokerAddress, brokerPort, rv); GWEN_MsgEndpoint_free(ep); return NULL; } return ep; } return NULL; } GWEN_MSG_ENDPOINT *Utils_SetupNodesClientEndpoint(GWEN_DB_NODE *dbArgs) { return Utils_SetupIpcEndpoint(dbArgs, "tcpAddress", "tcpPort", "ConfigFile/nodesAddress", "ConfigFile/nodesPort", 45454); } GWEN_MSG_ENDPOINT *Utils_SetupIpcEndpoint(GWEN_DB_NODE *dbArgs, const char *varNameAddr, const char *varNamePort, const char *fileVarNameAddr, const char *fileVarNamePort, int defaultPort) { GWEN_MSG_ENDPOINT *epTcp; const char *tcpAddress; int tcpPort; int rv; tcpAddress=GWEN_DB_GetCharValue(dbArgs, varNameAddr, 0, NULL); if (!(tcpAddress && *tcpAddress)) tcpAddress=GWEN_DB_GetCharValue(dbArgs, fileVarNameAddr, 0, "127.0.0.1"); tcpPort=GWEN_DB_GetIntValue(dbArgs, varNamePort, 0, -1); if (tcpPort<0) tcpPort=GWEN_DB_GetIntValue(dbArgs, fileVarNamePort, 0, defaultPort); DBG_INFO(NULL, "Setup tcp client endpoint to %s:%d", tcpAddress, tcpPort); epTcp=AQH_IpcEndpoint_CreateIpcTcpClient(tcpAddress, tcpPort, "aqhome-tool-IPC", 0); if (epTcp==NULL) { DBG_ERROR(NULL, "Error creating endpoint TCPc"); return NULL; } GWEN_MsgEndpoint_SetDefaultMessageSize(epTcp, UTILS_IPC_ENDPOINT_DEFAULT_MSGSIZE); rv=GWEN_TcpcEndpoint_StartConnect(epTcp); if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { DBG_ERROR(NULL, "Error connecting (%d)", rv); GWEN_MsgEndpoint_free(epTcp); return NULL; } return epTcp; } GWEN_MSG *Utils_WaitForSpecificNodeMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int nodeAddr, int timeoutInSeconds) { time_t startTime; startTime=time(NULL); for (;;) { GWEN_MSG *msg; time_t now; while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp)) ) { if (GWEN_IpcMsg_GetCode(msg)==AQH_MSGTYPE_IPC_NODES_FORWARD) { GWEN_MSG *nodeMsg; DBG_INFO(NULL, "Received IPC FORWARD message"); nodeMsg=AQH_ForwardIpcMsg_GetCopyOfNodeMsg(msg); if (nodeMsg) { DBG_INFO(AQH_LOGDOMAIN, "Received node msg from %d (%d)", AQH_NodeMsg_GetSourceAddress(nodeMsg), AQH_NodeMsg_GetMsgType(nodeMsg)); if (AQH_NodeMsg_GetMsgType(nodeMsg)==msgCode && (nodeAddr==0 || AQH_NodeMsg_GetSourceAddress(nodeMsg)==nodeAddr)) { GWEN_Msg_free(msg); return nodeMsg; } } } else { DBG_INFO(NULL, "Received IPC message %d, ignoring", GWEN_IpcMsg_GetCode(msg)); } GWEN_Msg_free(msg); } /* while */ now=time(NULL); if (now-startTime>timeoutInSeconds) { DBG_INFO(NULL, "Timeout"); break; } GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ } return NULL; } GWEN_MSG *Utils_WaitForSpecificIpcMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds) { time_t startTime; startTime=time(NULL); for (;;) { GWEN_MSG *msg; time_t now; GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); if (msg) { uint16_t code; code=GWEN_IpcMsg_GetCode(msg); if (code==msgCode) { DBG_INFO(NULL, "Received expected IPC message"); return msg; } else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { DBG_INFO(NULL, "Received IPC result message"); return msg; } else { DBG_INFO(NULL, "Received unexpected message %d (%x)", code, code); } GWEN_Msg_free(msg); } now=time(NULL); if (now-startTime>timeoutInSeconds) { DBG_INFO(NULL, "Timeout"); break; } } return NULL; } int Utils_FlushOutMessageQueue(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds) { time_t startTime; startTime=time(NULL); while(GWEN_MsgEndpoint_HaveMessageToSend(epTcp)) { time_t now; GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ now=time(NULL); if (now-startTime>timeoutInSeconds) { DBG_INFO(NULL, "Timeout"); return GWEN_ERROR_TIMEOUT; } } return 0; } int Utils_SendAcceptedMsgGroups(GWEN_MSG_ENDPOINT *epTcp, uint32_t groups) { GWEN_MSG *msgOut; msgOut=AQH_SetAcceptedMsgGroupsIpcMsg_new(AQH_MSGTYPE_IPC_NODES_SETACCMSGGRPS, groups); if (msgOut==NULL) { DBG_ERROR(NULL, "Error creating message"); return GWEN_ERROR_GENERIC; } GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); return 0; } GWEN_MSG_ENDPOINT *Utils_OpenBrokerConnection(GWEN_DB_NODE *dbArgs, uint32_t flags, int timeoutInSeconds) { GWEN_MSG_ENDPOINT *epTcp; GWEN_MSG *msgOut; GWEN_MSG *msgIn; uint32_t result; const char *clientId; const char *userId; const char *password; clientId=GWEN_DB_GetCharValue(dbArgs, "clientId", 0, NULL); userId=GWEN_DB_GetCharValue(dbArgs, "userId", 0, NULL); password=GWEN_DB_GetCharValue(dbArgs, "password", 0, NULL); epTcp=Utils_SetupBrokerClientEndpoint(dbArgs, 0); if (epTcp==NULL) { DBG_ERROR(NULL, "ERROR creating TCP connection"); return NULL; } msgOut=AQH_ConnectDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_CONNECT_REQ, clientId, userId, password, flags); if (msgOut==NULL) { DBG_ERROR(NULL, "Error creating message"); GWEN_MsgEndpoint_free(epTcp); return NULL; } GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); msgIn=Utils_WaitForSpecificIpcMessage(epTcp, AQH_MSGTYPE_IPC_DATA_RESULT, timeoutInSeconds); if (msgIn==NULL) { DBG_ERROR(NULL, "No response received"); GWEN_MsgEndpoint_free(epTcp); return NULL; } result=AQH_ResultIpcMsg_GetResultCode(msgIn); GWEN_Msg_free(msgIn); if (result!=AQH_MSG_IPC_SUCCESS) { DBG_ERROR(NULL, "Response: %d", result); GWEN_MsgEndpoint_free(epTcp); return NULL; } return epTcp; } void Utils_PrintDataPoints(const uint64_t *dataPoints, uint32_t numValues, const char *valueUnits) { uint32_t i; for(i=0; i1) { uint64_t timestamp=0; double valueFirst=0.0; double valueLast=0.0; double valueDiff=0.0; union {double f; uint64_t i;} u; /* ignore timestamp of first datapoint */ u.i=dataPoints[1]; valueFirst=u.f; timestamp=dataPoints[(numValues-1)*2]; u.i=dataPoints[((numValues-1)*2)+1]; valueLast=u.f; valueDiff=valueLast-valueFirst; Utils_PrintSingleDataPoint(timestamp, valueDiff, valueUnits); } else { uint64_t timestamp; union {double f; uint64_t i;} u; timestamp=dataPoints[0]; u.i=dataPoints[1]; Utils_PrintSingleDataPoint(timestamp, u.f, valueUnits); } } } void Utils_PrintDevice(const AQH_DEVICE *device, int printHeader) { GWEN_TIMESTAMP *ts; uint64_t deviceId; const char *deviceNameForSystem; const char *deviceNameForGui; const char *roomName; const char *location; const char *description; uint64_t timestamp; deviceId=AQH_Device_GetId(device); deviceNameForSystem=AQH_Device_GetNameForSystem(device); deviceNameForGui=AQH_Device_GetNameForGui(device); roomName=AQH_Device_GetRoomName(device); location=AQH_Device_GetLocation(device); description=AQH_Device_GetDescription(device); timestamp=AQH_Device_GetTimestampCreation(device); ts=timestamp?GWEN_Timestamp_fromLocalTime((time_t) timestamp):NULL; if (printHeader) fprintf(stdout, "ID\tName\tCreation Date\tGUI Name\tRoom\tLocation\tDescription\n"); if (ts) fprintf(stdout, "%lu\t%s\t%04d/%02d/%02d-%02d:%02d:%02d\t%s\t%s\t%s\t%s\n", deviceId, deviceNameForSystem, GWEN_Timestamp_GetYear(ts), GWEN_Timestamp_GetMonth(ts), GWEN_Timestamp_GetDay(ts), GWEN_Timestamp_GetHour(ts), GWEN_Timestamp_GetMinute(ts), GWEN_Timestamp_GetSecond(ts), deviceNameForGui?deviceNameForGui:"", roomName?roomName:"", location?location:"", description?description:""); else fprintf(stdout, "%lu\t%s\t\t%s\t%s\t%s\t%s\n", deviceId, deviceNameForSystem, deviceNameForGui?deviceNameForGui:"", roomName?roomName:"", location?location:"", description?description:""); } AQH_DEVICE *Utils_DeviceFromArgs(GWEN_DB_NODE *dbArgs) { AQH_DEVICE *device; device=AQH_Device_new(); AQH_Device_SetNameForSystem(device, GWEN_DB_GetCharValue(dbArgs, "device", 0, NULL)); AQH_Device_SetNameForGui(device, GWEN_DB_GetCharValue(dbArgs, "nameForGui", 0, NULL)); AQH_Device_SetRoomName(device, GWEN_DB_GetCharValue(dbArgs, "roomName", 0, NULL)); AQH_Device_SetLocation(device, GWEN_DB_GetCharValue(dbArgs, "location", 0, NULL)); AQH_Device_SetDescription(device, GWEN_DB_GetCharValue(dbArgs, "description", 0, NULL)); return device; }