/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2025 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./server_p.h" #include "./net_read.h" #include "aqhome-react/units/u_timer.h" #include "aqhome-react/units/u_logical.h" #include "aqhome-react/units/u_valuefilter.h" #include "aqhome-react/units/u_valueset.h" #include "aqhome-react/units/u_varset.h" #include "aqhome-react/units/u_stabilize.h" #include "aqhome-react/units/u_lowpass.h" #include "aqhome-react/units/u_highpass.h" #include "aqhome-react/units/u_zeroposnegstring.h" #include "aqhome-react/units/u_suntime.h" #include "aqhome-react/units/u_varchanges.h" #include "aqhome-react/units/u_timeprogram.h" #include "aqhome-react/units/u_statfns.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* ------------------------------------------------------------------------------------------------ * defines * ------------------------------------------------------------------------------------------------ */ #define I18N(msg) msg #define I18S(msg) msg #define A_ARG GWEN_ARGS_FLAGS_HAS_ARGUMENT #define A_END (GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST) #define A_CHAR GWEN_ArgsType_Char #define A_INT GWEN_ArgsType_Int #define AQH_REACT_SERVER_BROKER_RESTARTTIME 10 enum { AQH_REACT_SERVER_SLOT_BROKERCLOSED=1, }; /* ------------------------------------------------------------------------------------------------ * global vars * ------------------------------------------------------------------------------------------------ */ GWEN_INHERIT(AQH_OBJECT, AQH_REACT_SERVER) /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p); static void _setBrokerAddress(AQH_OBJECT *o, const char *s); static void _setBrokerPort(AQH_OBJECT *o, int i); static void _setBrokerClientId(AQH_OBJECT *o, const char *s); static void _readConfig(AQH_OBJECT *o, AQH_REACT_SERVER *xo, GWEN_DB_NODE *dbArgs); static const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue); static int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue); static int _startBroker(AQH_OBJECT *o, AQH_REACT_SERVER *xo); static int _exchangeBrokerConnect(AQH_REACT_SERVER *xo, uint32_t flags); static void _handleMsgFromBroker(AQH_REACT_SERVER *xo, const AQH_MESSAGE *msg); static void _handleBrokerChangeData(AQHREACT_UNIT *varChangeUnit, const GWEN_TAG16_LIST *tagList); static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, GWEN_UNUSED int param1, void *param2); static int _handleBrokerDown(AQH_REACT_SERVER *xo); static void _setupBuiltinUnits(AQH_OBJECT *o, AQH_REACT_SERVER *xo); static int _processAllUnits(AQH_OBJECT *o); static int _createPidFile(const char *pidFilename); static int _diffInSeconds(time_t t1, time_t t0); static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); /* ------------------------------------------------------------------------------------------------ * code * ------------------------------------------------------------------------------------------------ */ /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * constructor, destructor * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx */ AQH_OBJECT *AQH_ReactServer_new(AQH_EVENT_LOOP *eventLoop) { AQH_OBJECT *o; AQH_REACT_SERVER *xo; o=AQH_Object_new(eventLoop); GWEN_NEW_OBJECT(AQH_REACT_SERVER, xo); GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_REACT_SERVER, o, xo, _freeData); AQH_Object_SetSignalHandlerFn(o, _handleSignal); xo->timeoutInSeconds=5; xo->unitList=AQHREACT_Unit_List_new(); return o; } void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) { AQH_REACT_SERVER *xo; xo=(AQH_REACT_SERVER*) p; AQH_Object_free(xo->brokerEndpoint); GWEN_FREE_OBJECT(xo); } AQH_REACT_SERVER *AQH_ReactServer_GetServerData(const AQH_OBJECT *o) { if (o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); return xo; } return NULL; } /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * getter, setter * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx */ int AQH_ReactServer_GetTimeout(const AQH_OBJECT *o) { if (o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) return xo->timeout; } return 0; } void _setPidFile(AQH_OBJECT *o, const char *s) { if (o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) { free(xo->pidFile); xo->pidFile=s?strdup(s):NULL; } } } void _setBrokerAddress(AQH_OBJECT *o, const char *s) { if (o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) { free(xo->brokerAddress); xo->brokerAddress=s?strdup(s):NULL; } } } void _setBrokerPort(AQH_OBJECT *o, int i) { if (o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) xo->brokerPort=i; } } void _setBrokerClientId(AQH_OBJECT *o, const char *s) { if (o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) { free(xo->brokerClientId); xo->brokerClientId=s?strdup(s):NULL; } } } void AQH_ReactServer_SetVarsFile(AQH_OBJECT *o, const char *s) { if (o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) { free(xo->varsFile); xo->varsFile=s?strdup(s):NULL; } } } AQH_OBJECT *AQH_ReactServer_GetBrokerEndpoint(const AQH_OBJECT *o) { if (o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) return xo->brokerEndpoint; } return 0; } time_t AQH_ReactServer_GetLatestNetworkFileTime(const AQH_OBJECT *aqh) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo) return xo->latestNetworkFileTime; } return 0; } void AQH_ReactServer_SetLatestNetworkFileTime(AQH_OBJECT *aqh, time_t t) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo) xo->latestNetworkFileTime=t; } } AQHREACT_UNIT *AQH_ReactServer_GetTimerUnit(const AQH_OBJECT *aqh) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo) return xo->timerUnit; } return NULL; } AQHREACT_UNIT *AQH_ReactServer_GetServerVarChangeUnit(const AQH_OBJECT *aqh) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo) return xo->serverVarChangeUnit; } return NULL; } /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * init * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx */ int AQH_ReactServer_Init(AQH_OBJECT *o, int argc, char **argv) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) { GWEN_DB_NODE *dbArgs; int rv; const char *s; dbArgs=GWEN_DB_Group_new("args"); rv=_readArgs(argc, argv, dbArgs); if (rv<0) { DBG_ERROR(NULL, "Error reading args (%d)", rv); return rv; } AQH_MergeConfigFileIntoConfig(dbArgs, "ConfigFile"); _readConfig(o, xo, dbArgs); s=GWEN_DB_GetCharValue(dbArgs, "loglevel", 0, NULL); if (s && *s) { GWEN_LOGGER_LEVEL ll; ll=GWEN_Logger_Name2Level(s); GWEN_Logger_SetLevel(NULL, ll); } s=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, AQHOME_REACT_DEFAULT_PIDFILE); if (s && *s) { _setPidFile(o, s); rv=_createPidFile(s); if (rv<0) { DBG_ERROR(NULL, "Error creating PID file (%d)", rv); return rv; } } s=GWEN_DB_GetCharValue(dbArgs, "varsfile", 0, NULL); if (s && *s) { AQH_ReactServer_SetVarsFile(o, s); } else { GWEN_BUFFER *bufFilename; bufFilename=AQH_GetRuntimeFilePath(AQHOME_REACT_DEFAULT_VARSFILE); if (bufFilename) { AQH_ReactServer_SetVarsFile(o, GWEN_Buffer_GetStart(bufFilename)); GWEN_Buffer_free(bufFilename); } else { DBG_ERROR(NULL, "Could not setup filename for vars, please specify via command line argument"); return GWEN_ERROR_GENERIC; } } xo->localVars=AQH_Vars_CreateGroup("localVars"); // @TODO: read vars from file rv=AQH_ReactServer_ReloadUnitNets(o); if (rv<0) { DBG_ERROR(NULL, "Error reading unit network files (%d)", rv); return rv; } DBG_INFO(NULL, "Starting Broker Connection"); rv=_startBroker(o, xo); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return rv; } return 0; } else { DBG_ERROR(NULL, "Not of type AQH_REACT_SERVER object"); return GWEN_ERROR_INVALID; } } void _readConfig(AQH_OBJECT *o, AQH_REACT_SERVER *xo, GWEN_DB_NODE *dbArgs) { xo->dbArgs=dbArgs; xo->timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0); AQH_ReactServer_SetVarsFile(o, readCharConfigWithAlt(dbArgs, "varsfile", "ConfigFile/reactVarsFile", NULL)); _setBrokerAddress(o, readCharConfigWithAlt(dbArgs, "brokerAddress", "ConfigFile/brokerAddress", "127.0.0.1")); _setBrokerPort(o, readIntConfigWithAlt(dbArgs, "brokerPort", "ConfigFile/brokerPort", AQHOME_REACT_DEFAULT_BROKER_PORT, -1)); _setBrokerClientId(o, GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, AQHOME_REACT_DEFAULT_BROKER_CLIENTID)); } const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue) { const char *s; s=GWEN_DB_GetCharValue(dbArgs, varName, 0, NULL); if (!(s && *s)) s=GWEN_DB_GetCharValue(dbArgs, altVarName, 0, NULL); return (s && *s)?s:defaultValue; } int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue) { int i; i=GWEN_DB_GetIntValue(dbArgs, varName, 0, nonValue); if (i==nonValue) i=GWEN_DB_GetIntValue(dbArgs, altVarName, 0, nonValue); return (i!=nonValue)?i:defaultValue; } int _startBroker(AQH_OBJECT *o, AQH_REACT_SERVER *xo) { if (xo->brokerEndpoint) { AQH_Object_Disable(xo->brokerEndpoint); AQH_Object_free(xo->brokerEndpoint); xo->brokerEndpoint=NULL; } if (xo->brokerAddress && *(xo->brokerAddress) && xo->brokerPort) { AQH_OBJECT *ep; int fd; int rv; fd=AQH_TcpObject_CreateConnectedSocket(xo->brokerAddress, xo->brokerPort); if (fd<0) { DBG_ERROR(NULL, "Error connecting to broker server %s:%d", xo->brokerAddress, xo->brokerPort); return GWEN_ERROR_IO; } DBG_INFO(NULL, "Physically connected to broker server %s:%d", xo->brokerAddress, xo->brokerPort); ep=AQH_IpcClientObject_new(AQH_Object_GetEventLoop(o), fd); assert(ep); AQH_Endpoint_SetServiceName(ep, xo->brokerClientId); AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_REACT_SERVER_SLOT_BROKERCLOSED, o); AQH_Object_Enable(ep); xo->brokerEndpoint=ep; rv=_exchangeBrokerConnect(xo, AQH_MSG_CONNECT_FLAGS_WANTUPDATES); if (rv!=0) { DBG_ERROR(NULL, "Error connecting to broker: %d", rv); return (rv<0)?rv:GWEN_ERROR_PERMISSIONS; } DBG_INFO(NULL, "Connected to broker at %s:%d", xo->brokerAddress, xo->brokerPort); return 0; } else { DBG_ERROR(NULL, "No server settings"); return GWEN_ERROR_BAD_DATA; } return 0; } int _exchangeBrokerConnect(AQH_REACT_SERVER *xo, uint32_t flags) { AQH_MESSAGE *msgOut; uint32_t msgId; msgId=AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint); msgOut=AQH_IpcMessageConnect_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_CONNECT_REQ, msgId, 0, xo->brokerClientId, NULL, NULL, flags); AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, msgOut); return AQH_IpcEndpoint_WaitForResultMsg(xo->brokerEndpoint, AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_RESULT, msgId, xo->timeoutInSeconds); } /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * fini * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx */ void AQH_ReactServer_Fini(AQH_OBJECT *o) { if (o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo && xo->brokerEndpoint) { if (xo->brokerEndpoint) { AQH_Object_free(xo->brokerEndpoint); xo->brokerEndpoint=NULL; } AQHREACT_Unit_List_Clear(xo->unitList); xo->timerUnit=NULL; xo->serverVarChangeUnit=NULL; xo->localVarChangeUnit=NULL; AQH_Vars_free(xo->localVars); xo->localVars=NULL; if (xo->pidFile) remove(xo->pidFile); } } } /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * broker management functions * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx */ void AQH_ReactServer_HandleBrokerMsgs(AQH_OBJECT *o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo && xo->brokerEndpoint) { AQH_MESSAGE *msg; while( (msg=AQH_Endpoint_GetNextMsgIn(xo->brokerEndpoint)) ) { AQH_Message_SetObject(msg, xo->brokerEndpoint); _handleMsgFromBroker(xo, msg); AQH_Message_free(msg); } } } void _handleMsgFromBroker(AQH_REACT_SERVER *xo, const AQH_MESSAGE *msg) { uint16_t code; uint8_t protoId; /* exec IPC message */ code=AQH_IpcMessage_GetCode(msg); protoId=AQH_IpcMessage_GetProtoId(msg); if (protoId==AQH_IPC_PROTOCOL_DATA_ID) { GWEN_TAG16_LIST *tagList; tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0); if (tagList) { DBG_INFO(NULL, "Received IPC packet %d (%x)", (int) code, code); switch(code) { case AQH_MSGTYPE_IPC_DATA_DATACHANGED: _handleBrokerChangeData(xo->serverVarChangeUnit, tagList); break; default: break; } GWEN_Tag16_List_free(tagList); } } else { DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId); } } void _handleBrokerChangeData(AQHREACT_UNIT *varChangeUnit, const GWEN_TAG16_LIST *tagList) { AQH_VALUE *value; uint64_t numberOfPoints; const uint64_t *dataPoints; value=AQH_IpcdMessageMultiData_ReadValue(tagList); if (value) { AQH_IpcdMessageMultiData_ReadDatapoints(tagList, &dataPoints, &numberOfPoints); DBG_INFO(NULL, "Value changed on server: %s (%d data points)", AQH_Value_GetNameForSystem(value), (int) numberOfPoints); if (numberOfPoints>0 && dataPoints) { uint32_t i; for(i=0; idbArgs) { if (xo->brokerEndpoint) { if (AQH_Object_GetFlags(xo->brokerEndpoint) & AQH_OBJECT_FLAGS_DELETE) { DBG_INFO(NULL, "Deleting broker connection"); AQH_Object_Disable(xo->brokerEndpoint); AQH_Object_free(xo->brokerEndpoint); xo->brokerEndpoint=NULL; } } if (xo->brokerEndpoint==NULL) { time_t now; now=time(NULL); if (_diffInSeconds(now, xo->timestampBrokerDown)>AQH_REACT_SERVER_BROKER_RESTARTTIME) { int rv; DBG_ERROR(NULL, "Restarting broker connection"); rv=_startBroker(o, xo); if (rv<0) { DBG_ERROR(NULL, "here (%d)", rv); } } } } } /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * signal handler * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx */ int _handleSignal(AQH_OBJECT *o, uint32_t slotId, GWEN_UNUSED AQH_OBJECT *senderObject, GWEN_UNUSED int param1, GWEN_UNUSED void *param2) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) { switch(slotId) { case AQH_REACT_SERVER_SLOT_BROKERCLOSED: return _handleBrokerDown(xo); default: break; } } return 0; /* not handled */ } int _handleBrokerDown(AQH_REACT_SERVER *xo) { if (xo->brokerEndpoint) { DBG_ERROR(NULL, "Broker connection down"); AQH_Object_AddFlags(xo->brokerEndpoint, AQH_OBJECT_FLAGS_DELETE); xo->timestampBrokerDown=time(NULL); } return 1; } /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * unit management * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx */ int AQH_ReactServer_ReloadUnitNets(AQH_OBJECT *o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) { int rv; AQHREACT_Unit_List_Clear(xo->unitList); xo->timerUnit=NULL; xo->serverVarChangeUnit=NULL; _setupBuiltinUnits(o, xo); rv=AQH_ReactServer_ReadUnitNetFiles(o); if (rv<0) { DBG_ERROR(NULL, "Error reading network files (%d)", rv); return rv; } { GWEN_BUFFER *dbuf; dbuf=GWEN_Buffer_new(0, 256, 0, 1); AQHREACT_Unit_List_Dump(xo->unitList, dbuf, 2, "Loaded networks:"); fprintf(stderr, "%s\n", GWEN_Buffer_GetStart(dbuf)); GWEN_Buffer_free(dbuf); } return 0; } return GWEN_ERROR_INVALID; } void _setupBuiltinUnits(AQH_OBJECT *aqh, AQH_REACT_SERVER *xo) { AQHREACT_UNIT *unit; unit=AqHomeReact_UnitTimer_new(aqh); AQHREACT_Unit_SetId(unit, ".timer"); AQHREACT_Unit_List_Add(unit, xo->unitList); xo->timerUnit=unit; unit=AqHomeReact_UnitVarChanges_new(aqh); AQHREACT_Unit_SetId(unit, ".updatedValue"); AQHREACT_Unit_List_Add(unit, xo->unitList); xo->serverVarChangeUnit=unit; unit=AqHomeReact_UnitVarChanges_new(aqh); AQHREACT_Unit_SetId(unit, ".updatedVar"); AQHREACT_Unit_List_Add(unit, xo->unitList); xo->localVarChangeUnit=unit; } void AQH_ReactServer_ProcessAllUnits(AQH_OBJECT *o) { int rv; do { rv=_processAllUnits(o); } while (rv==1); } int _processAllUnits(AQH_OBJECT *o) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, o); if (xo) { int result=0; AQHREACT_UNIT *unit; unit=AQHREACT_Unit_List_First(xo->unitList); while(unit) { int rv; rv=AQHREACT_Unit_Process(unit); if (rv>0) result=1; unit=AQHREACT_Unit_List_Next(unit); } return result; } return GWEN_ERROR_INVALID; } AQHREACT_UNIT *AQH_ReactServer_CreateUnitByName(AQH_OBJECT *aqh, const char *unitType) { /* this does not include u_timer and u_varchanges, because those are only created once globally in init.c */ if (aqh && unitType && *unitType) { if (strcasecmp(unitType, "or")==0) return AqHomeReact_UnitOr_new(aqh); else if (strcasecmp(unitType, "and")==0) return AqHomeReact_UnitAnd_new(aqh); else if (strcasecmp(unitType, "xor")==0) return AqHomeReact_UnitXor_new(aqh); else if (strcasecmp(unitType, "valueFilter")==0) return AqHomeReact_UnitValueFilter_new(aqh); else if (strcasecmp(unitType, "valueSet")==0) return AqHomeReact_UnitValueSet_new(aqh); else if (strcasecmp(unitType, "varSet")==0) return AqHomeReact_UnitVarSet_new(aqh); else if (strcasecmp(unitType, "stabilize")==0) return AqHomeReact_UnitStabilize_new(aqh); else if (strcasecmp(unitType, "lowPass")==0) return AqHomeReact_UnitLowPass_new(aqh); else if (strcasecmp(unitType, "highPass")==0) return AqHomeReact_UnitHighPass_new(aqh); else if (strcasecmp(unitType, "zeroPosNegString")==0) return AqHomeReact_UnitZeroPosNegString_new(aqh); else if (strcasecmp(unitType, "suntime")==0) return AqHomeReact_UnitSuntime_new(aqh); else if (strcasecmp(unitType, "timeraction")==0) return AqHomeReact_UnitTimeProgram_new(aqh); else if (strcasecmp(unitType, "average")==0) return AqHomeReact_UnitAverage_new(aqh); else if (strcasecmp(unitType, "minvalue")==0) return AqHomeReact_UnitMinValue_new(aqh); else if (strcasecmp(unitType, "maxvalue")==0) return AqHomeReact_UnitMaxValue_new(aqh); else { AQHREACT_UNIT *unit; DBG_INFO(NULL, "Trying to load network \"%s\"", unitType); unit=AQH_ReactServer_FindAndReadDataDirNetwork(aqh, unitType); if (unit==NULL) { DBG_ERROR(NULL, "Unknown unit type \"%s\"", unitType); return NULL; } else { const char *s; s=AQHREACT_Unit_GetTypeName(unit); if (!(s && *s && strcasecmp(s, unitType)==0)) { DBG_ERROR(NULL, "ERROR: Network file for type \"%s\" contains type \"%s\" instead", unitType, s?s:""); AQHREACT_Unit_free(unit); return NULL; } } return unit; } } return NULL; } AQHREACT_UNIT *AQH_ReactServer_FindUnitByUnitId(const AQH_OBJECT *aqh, const char *unitId) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo && unitId && *unitId) { AQHREACT_UNIT *unit; unit=AQHREACT_Unit_List_GetById(xo->unitList, unitId); if (unit==NULL) { DBG_ERROR(NULL, "Unit \"%s\" not found", unitId); return NULL; } return unit; } } return NULL; } void AQH_ReactServer_AddUnit(AQH_OBJECT *aqh, AQHREACT_UNIT *unit) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo && xo->unitList) AQHREACT_Unit_List_Add(unit, xo->unitList); } } /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * localvar management * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx */ int AQH_ReactServer_SetCharValue(AQH_OBJECT *aqh, const char *path, const char *value) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo) { if (xo->localVars && path && *path) { int rv; uint64_t timestamp; timestamp=(uint64_t) time(NULL); rv=AQH_Vars_SetCharValue(xo->localVars, AQH_VARS_PATHFLAGS_OVERWRITE_VARS, path, value); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return rv; } AqHomeReact_UnitVarChanges_StringVarUpdated(xo->localVarChangeUnit, path, timestamp, value); return 0; } } } return GWEN_ERROR_INVALID; } const char *AQH_ReactServer_GetCharValue(AQH_OBJECT *aqh, const char *path, int idx, const char *defaultValue) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo) { if (xo->localVars && path && *path) { return AQH_Vars_GetCharValue(xo->localVars, path, idx, defaultValue); } } } return defaultValue; } int AQH_ReactServer_SetDoubleValue(AQH_OBJECT *aqh, const char *path, double value) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo && xo->localVars && path && *path) { int rv; uint64_t timestamp; timestamp=(uint64_t) time(NULL); rv=AQH_Vars_SetDoubleValue(xo->localVars, AQH_VARS_PATHFLAGS_OVERWRITE_VARS, path, value); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return rv; } AqHomeReact_UnitVarChanges_DoubleVarUpdated(xo->localVarChangeUnit, path, timestamp, value); return 0; } } return GWEN_ERROR_INVALID; } double AQH_ReactServer_GetDoubleValue(AQH_OBJECT *aqh, const char *path, int idx, double defaultValue) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo && xo->localVars && path && *path) { return AQH_Vars_GetDoubleValue(xo->localVars, path, idx, defaultValue); } } return defaultValue; } int AQH_ReactServer_SetIntValue(AQH_OBJECT *aqh, const char *path, int value) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo && xo->localVars && path && *path) { int rv; uint64_t timestamp; timestamp=(uint64_t) time(NULL); rv=AQH_Vars_SetIntValue(xo->localVars, AQH_VARS_PATHFLAGS_OVERWRITE_VARS, path, value); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return rv; } AqHomeReact_UnitVarChanges_IntVarUpdated(xo->localVarChangeUnit, path, timestamp, value); return 0; } } return GWEN_ERROR_INVALID; } int AQH_ReactServer_GetIntValue(AQH_OBJECT *aqh, const char *path, int idx, int defaultValue) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo && xo->localVars && path && *path) { return AQH_Vars_GetIntValue(xo->localVars, path, idx, defaultValue); } } return defaultValue; } int AQH_ReactServer_IncIntValue(AQH_OBJECT *aqh, const char *path, int startValue, int defaultValue) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo && xo->localVars && path && *path) { int v; v=AQH_Vars_GetIntValue(xo->localVars, path, 0, startValue); v++; AQH_ReactServer_SetIntValue(aqh, path, v); return v; } } return defaultValue; } int AQH_ReactServer_DecIntValue(AQH_OBJECT *aqh, const char *path, int startValue, int defaultValue) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo && xo->localVars && path && *path) { int v; v=AQH_Vars_GetIntValue(xo->localVars, path, 0, startValue); v--; AQH_ReactServer_SetIntValue(aqh, path, v); return v; } } return defaultValue; } int AQH_ReactServer_WriteVarsFile(AQH_OBJECT *aqh) { if (aqh) { AQH_REACT_SERVER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_REACT_SERVER, aqh); if (xo && xo->localVars && xo->varsFile) { int rv; rv=AQH_Vars_WriteDbFile(xo->localVars, xo->varsFile); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return rv; } } return 0; } return GWEN_ERROR_INVALID; } /* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * helper functions * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx */ int _createPidFile(const char *pidFilename) { FILE *f; int pidfd; if (remove(pidFilename)==0) { DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)"); } #ifdef HAVE_SYS_STAT_H pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); if (pidfd < 0) { DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); return GWEN_ERROR_IO; } f = fdopen(pidfd, "w"); #else /* HAVE_STAT_H */ f=fopen(pidFilename,"w+"); #endif /* HAVE_STAT_H */ /* write pid */ #ifdef HAVE_GETPID fprintf(f,"%d\n",getpid()); #else fprintf(f,"-1\n"); #endif if (fclose(f)) { DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); return GWEN_ERROR_IO; } return 0; } int _diffInSeconds(time_t t1, time_t t0) { return t1-t0; } int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) { int rv; const GWEN_ARGS args[]= { /* flags type name min max s long short_descr, long_descr */ { A_ARG, A_CHAR, "loglevel", 0, 1, "L", "loglevel", I18S("Specify loglevel"), NULL}, { A_ARG, A_CHAR, "cfgdir", 0, 1, "D", "cfgdir", I18S("Specify the configuration folder"), NULL}, { A_ARG, A_CHAR, "charset", 0, 1, NULL, "charset", I18S("Specify the output character set"), NULL}, { A_ARG, A_CHAR, "brokerAddress", 0, 1, "ba", "brokerddress", I18S("Broker address [127.0.0.1]"), NULL}, { A_ARG, A_INT, "brokerPort", 0, 1, "bp", "brokerport", I18S("Broker port [1899]"), NULL}, { A_ARG, A_CHAR, "brokerClientId", 0, 1, NULL, "brokerclientid", I18S("Broker client id"), NULL}, { A_ARG, A_CHAR, "deviceFile", 0, 1, "d", "devicefile", I18S("Device file"), NULL}, { A_ARG, A_CHAR, "varsFile", 0, 1, "V", "varsfile", I18S("File to store status variables"), NULL}, { A_ARG, A_CHAR, "pidfile", 0, 1, "p", "pidfile", I18S("PID file"), NULL}, { A_ARG, A_INT, "timeout", 0, 1, "T", NULL, I18S("timeout in seconds [0]"), NULL}, { A_END, A_INT, "help", 0, 0, "h", "help", I18S("Show this help screen"), NULL} }; rv=GWEN_Args_Check(argc, argv, 1, 0, args, dbArgs); if (rv==GWEN_ARGS_RESULT_ERROR) { fprintf(stderr, "ERROR: Could not parse arguments main\n"); return GWEN_ERROR_INVALID; } else if (rv==GWEN_ARGS_RESULT_HELP) { GWEN_BUFFER *ubuf; ubuf=GWEN_Buffer_new(0, 1024, 0, 1); GWEN_Buffer_AppendArgs(ubuf, I18N("This is version %s.\nUsage: %s [OPTIONS]\n\nOptions:\n"), AQHOME_VERSION_STRING, argv[0]); if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) { fprintf(stderr, "ERROR: Could not create help string\n"); return 1; } GWEN_Buffer_AppendString(ubuf, "\n"); fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(ubuf)); GWEN_Buffer_free(ubuf); return GWEN_ERROR_CLOSE; } return 0; }