diff --git a/apps/aqhome-data/0BUILD b/apps/aqhome-data/0BUILD index 37df0d9..ced58b2 100644 --- a/apps/aqhome-data/0BUILD +++ b/apps/aqhome-data/0BUILD @@ -40,12 +40,16 @@ fini.h init.h loop.h + server.h + server_p.h + s_connect.h + s_getdevices.h + s_getvalues.h c_connect.h c_updatedata.h c_getvalues.h c_getdevices.h c_getdatapoints.h - c_getlastdatapoint.h c_setdata.h c_addvalue.h c_annvalue.h @@ -64,11 +68,14 @@ c_getvalues.c c_getdevices.c c_getdatapoints.c - c_getlastdatapoint.c c_setdata.c c_addvalue.c c_annvalue.c c_moddevice.c + server.c + s_connect.c + s_getdevices.c + s_getvalues.c main.c diff --git a/apps/aqhome-data/loop.c b/apps/aqhome-data/loop.c index f86c4d9..a7a2aff 100644 --- a/apps/aqhome-data/loop.c +++ b/apps/aqhome-data/loop.c @@ -15,7 +15,6 @@ #include "./c_connect.h" #include "./c_updatedata.h" #include "./c_getdatapoints.h" -#include "./c_getlastdatapoint.h" #include "./c_getvalues.h" #include "./c_getdevices.h" #include "./c_setdata.h" @@ -242,7 +241,6 @@ void _handleIpcMsg(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg) case AQH_MSGTYPE_IPC_DATA_UPDATEDATA: AqHomeData_HandleUpdateData(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ: AqHomeData_HandleGetValues(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_GETDATA_REQ: AqHomeData_HandleGetDataPoints(aqh, ep, msg); break; - case AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ: AqHomeData_HandleGetLastDataPoint(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_SETDATA: AqHomeData_HandleSetData(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_ADDVALUE: AqHomeData_HandleAddValue(aqh, ep, msg); break; case AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE: AqHomeData_HandleAnnounceValue(aqh, ep, msg); break; diff --git a/apps/aqhome-data/main.c b/apps/aqhome-data/main.c index e3cb9c1..5f36068 100644 --- a/apps/aqhome-data/main.c +++ b/apps/aqhome-data/main.c @@ -13,10 +13,7 @@ #include #include -#include "./aqhome_data.h" -#include "./init.h" -#include "./fini.h" -#include "./loop.h" +#include "./server.h" #include #include @@ -58,8 +55,9 @@ static int _setupSigAction(struct sigaction *sa, int sig); static void _signalHandler(int s); #endif -static void _runService(AQHOME_DATA *aqh); -static void _writeCurrentState(AQHOME_DATA *aqh); +static void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop); +static void _writeCurrentState(AQH_OBJECT *aqh); +static int _diffInSeconds(time_t t1, time_t t0); @@ -84,7 +82,8 @@ static int stopService=0; int main(int argc, char **argv) { int rv; - AQHOME_DATA *aqh; + AQH_EVENT_LOOP *eventLoop; + AQH_OBJECT *aqh; GWEN_GUI *gui; rv=GWEN_Init(); @@ -111,17 +110,18 @@ int main(int argc, char **argv) gui=GWEN_Gui_CGui_new(); GWEN_Gui_SetGui(gui); - aqh=AqHomeData_new(); - rv=AqHomeData_Init(aqh, argc, argv); + eventLoop=AQH_EventLoop_new(); + aqh=AqHomeDataServer_new(eventLoop); + rv=AqHomeDataServer_Init(aqh, argc, argv); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); return 2; } - _runService(aqh); + _runService(aqh, eventLoop); - AqHomeData_Fini(aqh); - AqHomeData_free(aqh); + //AqHomeData_Fini(aqh); + AQH_Object_free(aqh); GWEN_Gui_SetGui(NULL); GWEN_Gui_free(gui); @@ -131,14 +131,14 @@ int main(int argc, char **argv) -void _runService(AQHOME_DATA *aqh) +void _runService(AQH_OBJECT *aqh, AQH_EVENT_LOOP *eventLoop) { time_t timeStart; time_t timeLastWrite; time_t timeLastConnectionCleanup; int timeout; - timeout=AqHomeData_GetTimeout(aqh); + timeout=AqHomeDataServer_GetTimeout(aqh); timeStart=time(NULL); timeLastWrite=time(NULL); timeLastConnectionCleanup=time(NULL); @@ -146,25 +146,26 @@ void _runService(AQHOME_DATA *aqh) while(!stopService) { time_t now; - DBG_DEBUG(NULL, "Next loop"); - AqHomeData_Loop(aqh, 2000); + DBG_ERROR(NULL, "Next loop (%d clients)", AqHomeDataServer_GetClientNum(aqh)); + AQH_EventLoop_Run(eventLoop, 2000); + AqHomeDataServer_HandleClientMsgs(aqh); now=time(NULL); - if (((int)difftime(now, timeLastConnectionCleanup))>CONNCLEAN_INTERVAL_IN_SECS) { - DBG_DEBUG(NULL, "Cleanup connections"); - GWEN_MsgEndpoint_RemoveUnconnectedAndEmptyChildren(AqHomeData_GetIpcdEndpoint(aqh)); + if (_diffInSeconds(now, timeLastConnectionCleanup)>CONNCLEAN_INTERVAL_IN_SECS) { + DBG_ERROR(NULL, "Cleanup connections"); + AqHomeDataServer_CleanupClients(aqh); timeLastConnectionCleanup=now; } - if (((int)difftime(now, timeLastWrite))>WRITE_INTERVAL_IN_SECS) { - DBG_DEBUG(NULL, "Write time"); + if (_diffInSeconds(now, timeLastWrite)>WRITE_INTERVAL_IN_SECS) { + DBG_ERROR(NULL, "Write time"); _writeCurrentState(aqh); timeLastWrite=now; } - if (timeout && ((int)difftime(now, timeStart))>timeout) { - DBG_INFO(NULL, "Timeout"); + if (timeout && (_diffInSeconds(now, timeStart)>timeout)) { + DBG_ERROR(NULL, "Timeout"); _writeCurrentState(aqh); break; } @@ -173,11 +174,17 @@ void _runService(AQHOME_DATA *aqh) -void _writeCurrentState(AQHOME_DATA *aqh) +int _diffInSeconds(time_t t1, time_t t0) +{ + return t1-t0; +} + + +void _writeCurrentState(AQH_OBJECT *aqh) { int rv; - rv=AqHomeData_WriteStorageIfChanged(aqh); + rv=AqHomeDataServer_WriteStorageIfChanged(aqh); if (rv<0) { DBG_ERROR(NULL, "ATTENTION: Could not write storage statefile (%d)", rv); } diff --git a/apps/aqhome-data/s_connect.c b/apps/aqhome-data/s_connect.c new file mode 100644 index 0000000..631229e --- /dev/null +++ b/apps/aqhome-data/s_connect.c @@ -0,0 +1,82 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./s_connect.h" +#include "./server_p.h" +#include "aqhome/ipc2/endpoint.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/data/m_ipcd.h" +#include "aqhome/msg/ipc/data/m_ipcd_connect.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + +void AqHomeDataServer_HandleConnect(GWEN_UNUSED AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + GWEN_TAG16_LIST *tagList; + AQH_MESSAGE *outMsg; + int resultCode=AQH_MSGDATA_RESULT_SUCCESS; + char *clientId=NULL; + char *userId=NULL; + char *passw=NULL; + uint32_t flags; + + tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0); + clientId=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSGDATA_CONNECT_TAGS_CLIENTID, NULL); + userId=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSGDATA_CONNECT_TAGS_USERID, NULL); + flags=AQH_Tag16_GetTagDataAsUint32(tagList, AQH_MSGDATA_CONNECT_TAGS_FLAGS, 0); + passw=AQH_Tag16_GetTagDataAsNewString(tagList, AQH_MSGDATA_CONNECT_TAGS_PASSWORD, NULL); + + if (clientId) + AQH_Endpoint_SetServiceName(ep, clientId); + if (userId) + AQH_Endpoint_SetUserName(ep, userId); + + if (flags & AQH_MSGDATA_CONNECT_FLAGS_WANTUPDATES) + AQH_Endpoint_AddFlags(ep, AQH_ENDPOINT_FLAGS_WANTUPDATES); + + /* TODO: add user management, for now we allow all */ + AQH_Endpoint_SetPermissions(ep, + AQH_ENDPOINT_PERMS_LISTVALUES | + AQH_ENDPOINT_PERMS_READVALUE | + AQH_ENDPOINT_PERMS_ADDVALUE | + AQH_ENDPOINT_PERMS_LISTDATA | + AQH_ENDPOINT_PERMS_READDATA | + AQH_ENDPOINT_PERMS_ADDDATA | + AQH_ENDPOINT_PERMS_LISTDEVICES | + AQH_ENDPOINT_PERMS_READDEVICE | + AQH_ENDPOINT_PERMS_ADDDEVICE | + AQH_ENDPOINT_PERMS_MODDEVICE); + free(passw); + free(userId); + free(clientId); + + outMsg=AQH_IpcMessageResult_new(AQH_IPC_PROTOCOL_DATA_ID, + AQH_IPC_PROTOCOL_DATA_VERSION, + AQH_MSGTYPE_IPC_DATA_RESULT, + AQH_Endpoint_GetNextMessageId(ep), + AQH_IpcMessage_GetMsgId(msg), + resultCode, NULL); + AQH_Endpoint_AddMsgOut(ep, outMsg); +} + + + diff --git a/apps/aqhome-data/s_connect.h b/apps/aqhome-data/s_connect.h new file mode 100644 index 0000000..5a005aa --- /dev/null +++ b/apps/aqhome-data/s_connect.h @@ -0,0 +1,25 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_DATA_S_CONNECT_H +#define AQHOME_DATA_S_CONNECT_H + + +#include "./server.h" + + +void AqHomeDataServer_HandleConnect(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); + + + +#endif + + + + + diff --git a/apps/aqhome-data/s_getdevices.c b/apps/aqhome-data/s_getdevices.c new file mode 100644 index 0000000..27deb48 --- /dev/null +++ b/apps/aqhome-data/s_getdevices.c @@ -0,0 +1,116 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./s_getdevices.h" +#include "./server_p.h" +#include "aqhome/ipc2/endpoint.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/data/m_ipcd.h" +#include "aqhome/msg/ipc/data/m_ipcd_devices.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + +#define AQHOMEDATA_DEVICESPERMSG 10 + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void _sendDeviceList(AQH_OBJECT *ep, const AQH_DEVICE_LIST *vl, uint32_t flags, uint32_t refMsgId); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +void AqHomeDataServer_HandleGetDevices(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + AQHOME_SERVER *xo; + + xo=AqHomeDataServer_GetServerData(o); + if (xo) { + const AQH_DEVICE_LIST *origDeviceList; + uint32_t refMsgId; + + refMsgId=AQH_IpcMessage_GetMsgId(msg); + + DBG_ERROR(NULL, "HandleGetDevices"); + origDeviceList=AQH_Storage_GetDeviceList(xo->storage); + if (origDeviceList) { + DBG_ERROR(NULL, "Have a list of %d devices", AQH_Device_List_GetCount(origDeviceList)); + if (AQH_Device_List_GetCount(origDeviceList)=AQHOMEDATA_DEVICESPERMSG) { + DBG_ERROR(NULL, "Sending %d devices", AQH_Device_List_GetCount(tmpDeviceList)); + _sendDeviceList(ep, tmpDeviceList, next?0:AQH_MSGDATA_DEVICES_FLAGS_LASTMSG, refMsgId); + AQH_Device_List_Clear(tmpDeviceList); + } + v=next; + } + if (AQH_Device_List_GetCount(tmpDeviceList)) { + DBG_ERROR(NULL, "Sending %d devices", AQH_Device_List_GetCount(tmpDeviceList)); + _sendDeviceList(ep, tmpDeviceList, AQH_MSGDATA_DEVICES_FLAGS_LASTMSG, refMsgId); /* send remaining */ + } + AQH_Device_List_free(tmpDeviceList); + } + } + else { + /* empty list */ + _sendDeviceList(ep, NULL, AQH_MSGDATA_DEVICES_FLAGS_LASTMSG, refMsgId); + } + } +} + + + +void _sendDeviceList(AQH_OBJECT *ep, const AQH_DEVICE_LIST *vl, uint32_t flags, uint32_t refMsgId) +{ + AQH_MESSAGE *msg; + + DBG_ERROR(NULL, "Sending msg (refMsgId=%d)", refMsgId); + msg=AQH_IpcdMessageDevices_new(AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP, AQH_Endpoint_GetNextMessageId(ep), refMsgId, flags, vl); + AQH_Endpoint_AddMsgOut(ep, msg); +} + + + diff --git a/apps/aqhome-data/s_getdevices.h b/apps/aqhome-data/s_getdevices.h new file mode 100644 index 0000000..3216188 --- /dev/null +++ b/apps/aqhome-data/s_getdevices.h @@ -0,0 +1,25 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_DATA_S_GETDEVICES_H +#define AQHOME_DATA_S_GETDEVICES_H + + +#include "./server.h" + + +void AqHomeDataServer_HandleGetDevices(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); + + + +#endif + + + + + diff --git a/apps/aqhome-data/s_getvalues.c b/apps/aqhome-data/s_getvalues.c new file mode 100644 index 0000000..ca2796d --- /dev/null +++ b/apps/aqhome-data/s_getvalues.c @@ -0,0 +1,117 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./s_getvalues.h" +#include "./server_p.h" +#include "aqhome/ipc2/endpoint.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/data/m_ipcd.h" +#include "aqhome/msg/ipc/data/m_ipcd_values.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + +#define AQHOMEDATA_VALUESPERMSG 10 + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void _sendValueList(AQH_OBJECT *ep, const AQH_VALUE_LIST *vl, uint32_t flags, uint32_t refMsgId); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +void AqHomeDataServer_HandleGetValues(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg) +{ + AQHOME_SERVER *xo; + + xo=AqHomeDataServer_GetServerData(o); + if (xo) { + const AQH_VALUE_LIST *origValueList; + + uint32_t refMsgId; + + refMsgId=AQH_IpcMessage_GetMsgId(msg); + + DBG_INFO(NULL, "HandleGetValues"); + origValueList=AQH_Storage_GetValueList(xo->storage); + if (origValueList) { + DBG_INFO(NULL, "Have a list of %d values", AQH_Value_List_GetCount(origValueList)); + if (AQH_Value_List_GetCount(origValueList)=AQHOMEDATA_VALUESPERMSG) { + DBG_INFO(NULL, "Sending %d values", AQH_Value_List_GetCount(tmpValueList)); + _sendValueList(ep, tmpValueList, next?0:AQH_MSGDATA_VALUES_FLAGS_LASTMSG, refMsgId); + AQH_Value_List_Clear(tmpValueList); + } + v=next; + } + if (AQH_Value_List_GetCount(tmpValueList)) { + DBG_INFO(NULL, "Sending %d values", AQH_Value_List_GetCount(tmpValueList)); + _sendValueList(ep, tmpValueList, AQH_MSGDATA_VALUES_FLAGS_LASTMSG, refMsgId); /* send remaining */ + } + AQH_Value_List_free(tmpValueList); + } + } + else { + /* empty list */ + _sendValueList(ep, NULL, AQH_MSGDATA_VALUES_FLAGS_LASTMSG, refMsgId); + } + } +} + + + +void _sendValueList(AQH_OBJECT *ep, const AQH_VALUE_LIST *vl, uint32_t flags, uint32_t refMsgId) +{ + AQH_MESSAGE *msg; + + DBG_ERROR(NULL, "Sending msg (refMsgId=%d)", refMsgId); + msg=AQH_IpcdMessageValues_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP, AQH_Endpoint_GetNextMessageId(ep), refMsgId, flags, vl); + AQH_Endpoint_AddMsgOut(ep, msg); +} + + + diff --git a/apps/aqhome-data/s_getvalues.h b/apps/aqhome-data/s_getvalues.h new file mode 100644 index 0000000..f606606 --- /dev/null +++ b/apps/aqhome-data/s_getvalues.h @@ -0,0 +1,25 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_DATA_S_GETVALUES_H +#define AQHOME_DATA_S_GETVALUES_H + + +#include "./server.h" + + +void AqHomeDataServer_HandleGetValues(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg); + + + +#endif + + + + + diff --git a/apps/aqhome-data/server.c b/apps/aqhome-data/server.c new file mode 100644 index 0000000..a6efb7e --- /dev/null +++ b/apps/aqhome-data/server.c @@ -0,0 +1,719 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./server_p.h" +#include "./s_connect.h" +#include "./s_getdevices.h" +#include "./s_getvalues.h" + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + +#define I18N(msg) msg +#define I18S(msg) msg + + + +enum { + AQH_AQHOME_SERVER_SLOT_NEWCLIENT=1, + AQH_AQHOME_SERVER_SLOT_CLOSED +}; + + + +/* ------------------------------------------------------------------------------------------------ + * global vars + * ------------------------------------------------------------------------------------------------ + */ + +GWEN_INHERIT(AQH_OBJECT, AQHOME_SERVER) + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); +static int _setupStorage(AQHOME_SERVER *xo, GWEN_DB_NODE *dbArgs); +static int _setupIpc(AQH_OBJECT *o, AQHOME_SERVER *xo, GWEN_DB_NODE *dbArgs); +static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); +static int _handleNewClient(AQH_OBJECT *o, AQH_OBJECT *clientEndpoint); +static int _handleClientDown(AQH_OBJECT *o, AQH_OBJECT *clientEndpoint); +static void _handleMsgsFromClient(AQH_OBJECT *o, AQHOME_SERVER *xo, AQH_OBJECT *ep); +static void _handleMsgFromClient(AQH_OBJECT *o, AQH_OBJECT *ep, AQH_MESSAGE *msg); +static int _createPidFile(const char *pidFilename); +static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); + + + +/* ------------------------------------------------------------------------------------------------ + * implementation + * ------------------------------------------------------------------------------------------------ + */ + +AQH_OBJECT *AqHomeDataServer_new(AQH_EVENT_LOOP *eventLoop) +{ + AQH_OBJECT *o; + AQHOME_SERVER *xo; + + o=AQH_Object_new(eventLoop); + GWEN_NEW_OBJECT(AQHOME_SERVER, xo); + GWEN_INHERIT_SETDATA(AQH_OBJECT, AQHOME_SERVER, o, xo, _freeData); + xo->storageMutex=GWEN_Mutex_new(); + xo->tcpClientList=AQH_Object_List_new(); + + AQH_Object_SetSignalHandlerFn(o, _handleSignal); + + return o; +} + + + +void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +{ + AQHOME_SERVER *xo; + + xo=(AQHOME_SERVER*) p; + GWEN_Mutex_free(xo->storageMutex); + if (xo->ipcServer) { + AQH_Object_Disable(xo->ipcServer); + AQH_Object_free(xo->ipcServer); + xo->ipcServer=NULL; + } + if (xo->tcpClientList) { + AQH_Object_List_free(xo->tcpClientList); + xo->tcpClientList=NULL; + } + GWEN_DB_Group_free(xo->dbArgs); + AQH_Storage_free(xo->storage); + free(xo->pidFile); + + GWEN_FREE_OBJECT(xo); +} + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * getters, setters + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +AQHOME_SERVER *AqHomeDataServer_GetServerData(const AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + return xo; +} + + + +int AqHomeDataServer_GetTimeout(const AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) + return xo->timeout; + return 0; +} + + + +int AqHomeDataServer_GetClientNum(const AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) + return AQH_Object_List_GetCount(xo->tcpClientList); + return 0; +} + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * init + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int AqHomeDataServer_Init(AQH_OBJECT *o, int argc, char **argv) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) { + GWEN_DB_NODE *dbArgs; + int rv; + const char *s; + + dbArgs=GWEN_DB_Group_new("args"); + rv=_readArgs(argc, argv, dbArgs); + if (rv<0) { + DBG_ERROR(NULL, "Error reading args (%d)", rv); + return rv; + } + AQH_MergeConfigFileIntoConfig(dbArgs, "ConfigFile"); + + xo->dbArgs=dbArgs; + + s=GWEN_DB_GetCharValue(dbArgs, "loglevel", 0, NULL); + if (s && *s) { + GWEN_LOGGER_LEVEL ll; + + ll=GWEN_Logger_Name2Level(s); + GWEN_Logger_SetLevel(NULL, ll); + } + + xo->timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0); + + s=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, AQHOME_DATA_DEFAULT_PIDFILE); + if (s && *s) { + free(xo->pidFile); + xo->pidFile=strdup(s); + rv=_createPidFile(s); + if (rv<0) { + DBG_ERROR(NULL, "Error creating PID file (%d)", rv); + return rv; + } + } + + rv=_setupStorage(xo, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + + rv=_setupIpc(o, xo, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + + return 0; + } + else { + DBG_ERROR(NULL, "Not of type AQHOME_SERVER object"); + return GWEN_ERROR_INVALID; + } +} + + + +int _setupStorage(AQHOME_SERVER *xo, GWEN_DB_NODE *dbArgs) +{ + const char *dataFolder; + GWEN_BUFFER *nameBuf; + AQH_STORAGE *sto; + int rv; + + dataFolder=GWEN_DB_GetCharValue(dbArgs, "dataFolder", 0, AQHOME_DATA_DEFAULT_DATADIR); + nameBuf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Buffer_AppendArgs(nameBuf, "%s%s%s", dataFolder, GWEN_DIR_SEPARATOR_S, AQHOME_DATA_STATEFILENAME); + + sto=AQH_Storage_new(); + AQH_Storage_SetStateFile(sto, GWEN_Buffer_GetStart(nameBuf)); + AQH_Storage_SetDataFileFolder(sto, dataFolder); + GWEN_Buffer_free(nameBuf); + + rv=AQH_Storage_Init(sto); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + AQH_Storage_free(sto); + return rv; + } + xo->storage=sto; + return 0; +} + + + +int _setupIpc(AQH_OBJECT *o, AQHOME_SERVER *xo, GWEN_DB_NODE *dbArgs) +{ + const char *tcpAddress; + int tcpPort; + + tcpAddress=GWEN_DB_GetCharValue(dbArgs, "tcpAddress", 0, NULL); + if (!(tcpAddress && *tcpAddress)) + tcpAddress=GWEN_DB_GetCharValue(dbArgs, "ConfigFile/brokerAddress", 0, NULL); + + tcpPort=GWEN_DB_GetIntValue(dbArgs, "tcpPort", 0, -1); + if (tcpPort<0) + tcpPort=GWEN_DB_GetIntValue(dbArgs, "ConfigFile/brokerPort", 0, AQHOME_DATA_DEFAULT_IPC_PORT); + + if (tcpAddress && *tcpAddress && tcpPort>0) { + int fd; + + DBG_ERROR(NULL, "Starting TCP service on \"%s\":%d", tcpAddress, tcpPort); + fd=AQH_TcpdObject_CreateListeningSocket(tcpAddress, tcpPort); + if (fd<0) { + DBG_INFO(NULL, "here"); + return GWEN_ERROR_IO; + } + + xo->ipcServer=AQH_IpcServerObject_new(AQH_Object_GetEventLoop(o), fd); + AQH_Object_AddLink(xo->ipcServer, AQH_IPC_SERVER_SIGNAL_NEWCLIENT, AQH_AQHOME_SERVER_SLOT_NEWCLIENT, o); + AQH_Object_Enable(xo->ipcServer); + return 0; + } + else { + DBG_ERROR(NULL, "Missing server address"); + return GWEN_ERROR_GENERIC; + } +} + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * signal handler + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, GWEN_UNUSED int param1, void *param2) +{ + switch(slotId) { + case AQH_AQHOME_SERVER_SLOT_NEWCLIENT: return _handleNewClient(o, (AQH_OBJECT*) param2); + case AQH_AQHOME_SERVER_SLOT_CLOSED: return _handleClientDown(o, senderObject); + default: + break; + } + + return 0; /* not handled */ +} + + + +int _handleNewClient(AQH_OBJECT *o, AQH_OBJECT *clientEndpoint) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) { + DBG_ERROR(NULL, "New IPC client"); + AQH_Object_AddLink(clientEndpoint, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_AQHOME_SERVER_SLOT_CLOSED, o); + AQH_Object_List_Add(clientEndpoint, xo->tcpClientList); + return 1; /* handled */ + } + + return 0; /* not handled */ +} + + + +int _handleClientDown(AQH_OBJECT *o, AQH_OBJECT *clientEndpoint) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) { + DBG_ERROR(NULL, "IPC client down"); + AQH_Object_AddFlags(clientEndpoint, AQH_OBJECT_FLAGS_DELETE); + return 1; /* handled */ + } + + return 0; /* not handled */ +} + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * client management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +void AqHomeDataServer_CleanupClients(AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) { + AQH_OBJECT *ep; + + ep=AQH_Object_List_First(xo->tcpClientList); + while(ep) { + AQH_OBJECT *epNext; + + epNext=AQH_Object_List_Next(ep); + if (AQH_Object_GetFlags(ep) & AQH_OBJECT_FLAGS_DELETE) { + AQH_Object_List_Del(ep); + AQH_Object_free(ep); + } + ep=epNext; + } /* while */ + } +} + + + +void AqHomeDataServer_HandleClientMsgs(AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) { + AQH_OBJECT *ep; + + ep=AQH_Object_List_First(xo->tcpClientList); + while(ep) { + AQH_OBJECT *epNext; + + epNext=AQH_Object_List_Next(ep); + _handleMsgsFromClient(o, xo, ep); + ep=epNext; + } /* while */ + } +} + + + +void _handleMsgsFromClient(AQH_OBJECT *o, AQHOME_SERVER *xo, AQH_OBJECT *ep) +{ + AQH_MESSAGE *msg; + + while( (msg=AQH_Endpoint_GetNextMsgIn(ep)) ) { + AQH_Message_SetObject(msg, ep); + if (AQH_Request_Tree2_HandleIpcMsg(xo->requestTree, ep, msg)!=AQH_MSG_REQUEST_RESULT_HANDLED) + _handleMsgFromClient(o, ep, msg); + AQH_Message_free(msg); + } +} + + + +void _handleMsgFromClient(AQH_OBJECT *o, AQH_OBJECT *ep, AQH_MESSAGE *msg) +{ + uint16_t code; + uint8_t protoId; + + /* exec IPC message */ + code=AQH_IpcMessage_GetCode(msg); + protoId=AQH_IpcMessage_GetProtoId(msg); + if (protoId==AQH_IPC_PROTOCOL_DATA_ID) { + DBG_ERROR(NULL, "Received IPC packet %d (%x)", (int) code, code); + switch(code) { + case AQH_MSGTYPE_IPC_DATA_CONNECT_REQ: AqHomeDataServer_HandleConnect(o, ep, msg); break; + case AQH_MSGTYPE_IPC_DATA_UPDATEDATA: break; + case AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ: AqHomeDataServer_HandleGetValues(o, ep, msg); break; + case AQH_MSGTYPE_IPC_DATA_GETDATA_REQ: break; + case AQH_MSGTYPE_IPC_DATA_SETDATA: break; + case AQH_MSGTYPE_IPC_DATA_ADDVALUE: break; + case AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE: break; + case AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ: AqHomeDataServer_HandleGetDevices(o, ep, msg); break; + case AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ: break; + default: break; + } + } + else { + DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId); + } + +} + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * request management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +AQH_MSG_REQUEST *AqHomeDataServer_GetRequestTree(const AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) + return xo->requestTree; + return NULL; +} + + + +void AqHomeDataServer_AddRequestToTree(AQH_OBJECT *o, AQH_MSG_REQUEST *rq) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo && rq) + AQH_MsgRequest_Tree2_AddChild(xo->requestTree, rq); +} + + + +void AqHomeDataServer_CleanupRequests(AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) { + AQH_Request_Tree2_CheckTimeouts(xo->requestTree); + AQH_Request_Tree2_Cleanup(xo->requestTree); + } +} + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * storage management functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int AqHomeDataServer_LockStorage(AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) { + int rv; + + rv=GWEN_Mutex_Lock(xo->storageMutex); + if (rv<0) { + DBG_ERROR(AQH_LOGDOMAIN, "Error obtaining lock on storage mutex"); + return rv; + } + return rv; + } + return GWEN_ERROR_INVALID; +} + + + +int AqHomeDataServer_UnlockStorage(AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) { + int rv; + + rv=GWEN_Mutex_Unlock(xo->storageMutex); + if (rv<0) { + DBG_ERROR(AQH_LOGDOMAIN, "Error releasing lock on storage mutex"); + return rv; + } + return rv; + } + return GWEN_ERROR_INVALID; +} + + + +int AqHomeDataServer_WriteStorageIfChanged(AQH_OBJECT *o) +{ + AQHOME_SERVER *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQHOME_SERVER, o); + if (xo) { + if (AQH_Storage_GetRuntimeFlags(xo->storage) & AQH_STORAGE_RTFLAGS_MODIFIED) { + int rv; + + DBG_INFO(NULL, "Storage modified, writing statefile"); + rv=AqHomeDataServer_LockStorage(o); + if (rv<0) { + DBG_INFO(NULL, "Error locking storage (%d)", rv); + return rv; + } + rv=AQH_Storage_WriteState(xo->storage); + if (rv<0) { + DBG_INFO(NULL, "Error writing state file (%d)", rv); + AqHomeDataServer_UnlockStorage(o); + return rv; + } + + rv=AqHomeDataServer_UnlockStorage(o); + if (rv<0) { + DBG_INFO(NULL, "Error unlocking storage (%d)", rv); + return rv; + } + } + + return 0; + } + return GWEN_ERROR_INVALID; +} + + + + +/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + * helper functions + * xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + */ + +int _createPidFile(const char *pidFilename) +{ + FILE *f; + int pidfd; + + if (remove(pidFilename)==0) { + DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)"); + } + +#ifdef HAVE_SYS_STAT_H + pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + if (pidfd < 0) { + DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + + f = fdopen(pidfd, "w"); +#else /* HAVE_STAT_H */ + f=fopen(pidFilename,"w+"); +#endif /* HAVE_STAT_H */ + + /* write pid */ +#ifdef HAVE_GETPID + fprintf(f,"%d\n",getpid()); +#else + fprintf(f,"-1\n"); +#endif + if (fclose(f)) { + DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + return 0; +} + + + +int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) +{ + int rv; + const GWEN_ARGS args[]= { + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "loglevel", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "L", /* short option */ + "loglevel", /* long option */ + I18S("Specify loglevel"), /* short description */ + I18S("Specify loglevel") /* long description */ + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "tcpAddress", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "t", /* short option */ + "tcpaddress", /* long option */ + I18S("Specify the TCP address to listen on (disabled if missing)"), + I18S("Specify the TCP address to listen on (disabled if missing)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "tcpPort", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "P", /* short option */ + "tcpport", /* long option */ + I18S("Specify the TCP port to listen on"), + I18S("Specify the TCP port to listen on") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "datafolder", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + NULL, /* short option */ + "datafolder", /* long option */ + I18S("Folder where data files are stored"), + I18S("Folder where data files are stored") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "pidfile", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "p", /* short option */ + "pidfile", /* long option */ + I18S("Specify the PID file"), + I18S("Specify the PID file") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "timeout", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "T", /* short option */ + "timeout", /* long option */ + I18S("Specify timeout in second (default: no timeout)"), + I18S("Specify timeout in second (default: no timeout)") + }, + { + GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST, /* flags */ + GWEN_ArgsType_Int, /* type */ + "help", /* name */ + 0, /* minnum */ + 0, /* maxnum */ + "h", /* short option */ + "help", + I18S("Show this help screen."), + I18S("Show this help screen.") + } + }; + + rv=GWEN_Args_Check(argc, argv, 1, 0, args, dbArgs); + if (rv==GWEN_ARGS_RESULT_ERROR) { + fprintf(stderr, "ERROR: Could not parse arguments main\n"); + return GWEN_ERROR_INVALID; + } + else if (rv==GWEN_ARGS_RESULT_HELP) { + GWEN_BUFFER *ubuf; + + ubuf=GWEN_Buffer_new(0, 1024, 0, 1); + GWEN_Buffer_AppendArgs(ubuf, + I18N("This is version %s.\nUsage: %s [OPTIONS]\n\nOptions:\n"), + AQHOME_VERSION_STRING, + argv[0]); + if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) { + fprintf(stderr, "ERROR: Could not create help string\n"); + return 1; + } + GWEN_Buffer_AppendString(ubuf, "\n"); + + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(ubuf)); + GWEN_Buffer_free(ubuf); + return GWEN_ERROR_CLOSE; + } + return 0; +} + + + + diff --git a/apps/aqhome-data/server.h b/apps/aqhome-data/server.h new file mode 100644 index 0000000..30cf8b1 --- /dev/null +++ b/apps/aqhome-data/server.h @@ -0,0 +1,59 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_DATA_SERVER_H +#define AQHOME_DATA_SERVER_H + + +#include +#include +#include + + +#define AQH_ENDPOINT_FLAGS_WANTUPDATES 0x0001 + +#define AQH_ENDPOINT_PERMS_LISTVALUES 0x0001 +#define AQH_ENDPOINT_PERMS_READVALUE 0x0002 +#define AQH_ENDPOINT_PERMS_ADDVALUE 0x0004 + +#define AQH_ENDPOINT_PERMS_LISTDATA 0x0010 +#define AQH_ENDPOINT_PERMS_READDATA 0x0020 +#define AQH_ENDPOINT_PERMS_ADDDATA 0x0040 +#define AQH_ENDPOINT_PERMS_SETDATA 0x0080 + +#define AQH_ENDPOINT_PERMS_LISTDEVICES 0x0100 +#define AQH_ENDPOINT_PERMS_READDEVICE 0x0200 +#define AQH_ENDPOINT_PERMS_ADDDEVICE 0x0400 +#define AQH_ENDPOINT_PERMS_MODDEVICE 0x0800 + + + +AQH_OBJECT *AqHomeDataServer_new(AQH_EVENT_LOOP *eventLoop); +int AqHomeDataServer_Init(AQH_OBJECT *o, int argc, char **argv); + +int AqHomeDataServer_GetTimeout(const AQH_OBJECT *o); +int AqHomeDataServer_GetClientNum(const AQH_OBJECT *o); + + +void AqHomeDataServer_CleanupClients(AQH_OBJECT *o); +void AqHomeDataServer_HandleClientMsgs(AQH_OBJECT *o); + +AQH_MSG_REQUEST *AqHomeDataServer_GetRequestTree(const AQH_OBJECT *o); +void AqHomeDataServer_AddRequestToTree(AQH_OBJECT *o, AQH_MSG_REQUEST *rq); +void AqHomeDataServer_CleanupRequests(AQH_OBJECT *o); + +int AqHomeDataServer_LockStorage(AQH_OBJECT *o); +int AqHomeDataServer_UnlockStorage(AQH_OBJECT *o); +int AqHomeDataServer_WriteStorageIfChanged(AQH_OBJECT *o); + + + + +#endif + + diff --git a/apps/aqhome-data/server_p.h b/apps/aqhome-data/server_p.h new file mode 100644 index 0000000..43be9cc --- /dev/null +++ b/apps/aqhome-data/server_p.h @@ -0,0 +1,49 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_DATA_SERVER_P_H +#define AQHOME_DATA_SERVER_P_H + + +#include "./server.h" + +#include + +#include + + +#define AQHOME_DATA_DEFAULT_PIDFILE "/var/run/aqhome-data.pid" +#define AQHOME_DATA_DEFAULT_DATADIR "/var/lib/aqhome-data/data" +#define AQHOME_DATA_DEFAULT_IPC_PORT 45456 + +#define AQHOME_DATA_STATEFILENAME "statefile" + + + +typedef struct AQHOME_SERVER AQHOME_SERVER; +struct AQHOME_SERVER { + AQH_OBJECT *ipcServer; + AQH_OBJECT_LIST *tcpClientList; + + AQH_MSG_REQUEST *requestTree; + + GWEN_DB_NODE *dbArgs; + AQH_STORAGE *storage; + char *pidFile; + int timeout; /* timeout for run e.g. inside valgrind */ + GWEN_MUTEX *storageMutex; +}; + + +AQHOME_SERVER *AqHomeDataServer_GetServerData(const AQH_OBJECT *o); + + + +#endif + + diff --git a/apps/aqhome-tool/data/0BUILD b/apps/aqhome-tool/data/0BUILD index 25f9605..ae8f4e5 100644 --- a/apps/aqhome-tool/data/0BUILD +++ b/apps/aqhome-tool/data/0BUILD @@ -33,6 +33,8 @@ + client.h + client_p.h getvalues.h getdevices.h adddata.h @@ -47,6 +49,7 @@ $(local/typefiles) + client.c getvalues.c getdevices.c adddata.c diff --git a/apps/aqhome-tool/data/client.c b/apps/aqhome-tool/data/client.c new file mode 100644 index 0000000..443b1b3 --- /dev/null +++ b/apps/aqhome-tool/data/client.c @@ -0,0 +1,222 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./client_p.h" +#include "../utils.h" + +#include "aqhome/aqhome.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/ipc2/endpoint.h" + +#include +#include +#include +#include + + + +GWEN_INHERIT(AQH_OBJECT, AQH_TOOL_CLIENT) + + + + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); + +static int _sendWaitHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo); +static AQH_MESSAGE *_createRequestMessage(AQH_OBJECT *o, uint32_t msgId); +static int _handleResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList); + + + + + +AQH_OBJECT *AQH_ToolClient_new(AQH_EVENT_LOOP *eventLoop, GWEN_DB_NODE *dbGlobalArgs, const GWEN_ARGS *argDescrs) +{ + AQH_OBJECT *o; + AQH_TOOL_CLIENT *xo; + + o=AQH_Object_new(eventLoop); + GWEN_NEW_OBJECT(AQH_TOOL_CLIENT, xo); + GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o, xo, _freeData); + xo->dbGlobalArgs=dbGlobalArgs; + xo->args=argDescrs; + + return o; +} + + + +void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +{ + AQH_TOOL_CLIENT *xo; + + xo=(AQH_TOOL_CLIENT*)p; + GWEN_DB_Group_free(xo->dbLocalArgs); + GWEN_DB_Group_free(xo->dbGlobalArgs); + AQH_Object_free(xo->ipcEndpoint); + GWEN_FREE_OBJECT(xo); +} + + + +int AQH_ToolClient_ReadLocalArgs(AQH_OBJECT *o, int argc, char **argv) +{ + if (o) { + AQH_TOOL_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); + if (xo) { + int rv; + + GWEN_DB_Group_free(xo->dbLocalArgs); + xo->dbLocalArgs=GWEN_DB_GetGroup(xo->dbGlobalArgs, GWEN_DB_FLAGS_DEFAULT, "local"); + rv=GWEN_Args_Check(argc, argv, 1, GWEN_ARGS_MODE_ALLOW_FREEPARAM, xo->args, xo->dbLocalArgs); + if (rv==GWEN_ARGS_RESULT_ERROR) { + fprintf(stderr, "ERROR: Could not parse arguments\n"); + return 1; + } + else if (rv==GWEN_ARGS_RESULT_HELP) { + GWEN_BUFFER *ubuf; + + ubuf=GWEN_Buffer_new(0, 1024, 0, 1); + if (GWEN_Args_Usage(xo->args, ubuf, GWEN_ArgsOutType_Txt)) { + fprintf(stderr, "ERROR: Could not create help string\n"); + return 1; + } + fprintf(stderr, "%s\n", GWEN_Buffer_GetStart(ubuf)); + GWEN_Buffer_free(ubuf); + return 1; + } + xo->timeoutInSeconds=GWEN_DB_GetIntValue(xo->dbLocalArgs, "timeout", 0, 5); + + AQH_MergeConfigFileIntoConfig(xo->dbLocalArgs, "ConfigFile"); + return 0; + } + } + return 1; +} + + + +void AQH_ToolClient_SetCreateRequestMessageFn(AQH_OBJECT *o, AQH_TOOLCLIENT_CREATEREQUESTMESSAGE_FN f) +{ + AQH_TOOL_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); + if (xo) + xo->createRequestMessageFn=f; +} + + + +void AQH_ToolClient_SetHandleResponseMessageFn(AQH_OBJECT *o, AQH_TOOLCLIENT_HANDLERESPONSEMESSAGE_FN f) +{ + AQH_TOOL_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); + if (xo) + xo->handleResponseMessageFn=f; +} + + + +int AQH_ToolClient_Run(AQH_OBJECT *o) +{ + AQH_TOOL_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); + if (xo) { + xo->ipcEndpoint=Utils2_SetupBrokerClientEndpoint(AQH_Object_GetEventLoop(o), xo->dbLocalArgs, 0); + if (xo->ipcEndpoint==NULL) { + DBG_ERROR(NULL, "ERROR creating TCP connection"); + return 2; + } + return _sendWaitHandle(o, xo); + } + return GWEN_ERROR_INVALID; +} + + + +int _sendWaitHandle(AQH_OBJECT *o, AQH_TOOL_CLIENT *xo) +{ + AQH_EVENT_LOOP *eventLoop; + AQH_MESSAGE *msgOut; + uint32_t msgId; + + eventLoop=AQH_Object_GetEventLoop(o); + msgId=AQH_Endpoint_GetNextMessageId(xo->ipcEndpoint); + msgOut=_createRequestMessage(o, msgId); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating outbound message"); + return 2; + } + AQH_Endpoint_AddMsgOut(xo->ipcEndpoint, msgOut); + + for (;;) { + AQH_MESSAGE *msgIn; + + msgIn=Utils2_WaitForResponseMsg(eventLoop, xo->ipcEndpoint, msgId, xo->timeoutInSeconds); + if (msgIn) { + GWEN_TAG16_LIST *tagList; + + tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); + if (tagList) { + int rv; + + rv=_handleResponseMessage(o, msgIn, tagList); + AQH_Message_free(msgIn); + if (rv<0) { + DBG_ERROR(NULL, "here (%d)", rv); + return 3; + } + else if (rv==1) { + DBG_ERROR(NULL, "Done."); + return 0; + } + } + } + } /* for */ + return 1; +} + + + +AQH_MESSAGE *_createRequestMessage(AQH_OBJECT *o, uint32_t msgId) +{ + AQH_TOOL_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); + if (xo) { + if (xo->createRequestMessageFn) + return xo->createRequestMessageFn(o, msgId); + } + return NULL; +} + + + +int _handleResponseMessage(AQH_OBJECT *o, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList) +{ + AQH_TOOL_CLIENT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TOOL_CLIENT, o); + if (xo) { + if (xo->handleResponseMessageFn) + return xo->handleResponseMessageFn(o, msg, tagList); + } + return 0; +} + + diff --git a/apps/aqhome-tool/data/client.h b/apps/aqhome-tool/data/client.h new file mode 100644 index 0000000..49a4b35 --- /dev/null +++ b/apps/aqhome-tool/data/client.h @@ -0,0 +1,33 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_TOOL_CLIENT_H +#define AQHOME_TOOL_CLIENT_H + + + +#include "aqhome/msg/ipc/m_ipc_tag16.h" + +#include + + +typedef AQH_MESSAGE* (*AQH_TOOLCLIENT_CREATEREQUESTMESSAGE_FN)(AQH_OBJECT *o, uint32_t msgId); +typedef int (*AQH_TOOLCLIENT_HANDLERESPONSEMESSAGE_FN)(AQH_OBJECT *o, const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList); + + +AQH_OBJECT *AQH_ToolClient_new(AQH_EVENT_LOOP *eventLoop, GWEN_DB_NODE *dbGlobalArgs, const GWEN_ARGS *argDescrs); +int AQH_ToolClient_ReadLocalArgs(AQH_OBJECT *o, int argc, char **argv); +int AQH_ToolClient_Run(AQH_OBJECT *o); + +void AQH_ToolClient_SetCreateRequestMessageFn(AQH_OBJECT *o, AQH_TOOLCLIENT_CREATEREQUESTMESSAGE_FN f); +void AQH_ToolClient_SetHandleResponseMessageFn(AQH_OBJECT *o, AQH_TOOLCLIENT_HANDLERESPONSEMESSAGE_FN f); + + + +#endif + diff --git a/apps/aqhome-tool/data/client_p.h b/apps/aqhome-tool/data/client_p.h new file mode 100644 index 0000000..cb5378d --- /dev/null +++ b/apps/aqhome-tool/data/client_p.h @@ -0,0 +1,34 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOME_TOOL_CLIENT_P_H +#define AQHOME_TOOL_CLIENT_P_H + + + +#include "./client.h" + + + +typedef struct AQH_TOOL_CLIENT AQH_TOOL_CLIENT; +struct AQH_TOOL_CLIENT { + GWEN_DB_NODE *dbGlobalArgs; + GWEN_DB_NODE *dbLocalArgs; + const GWEN_ARGS *args; + + AQH_TOOLCLIENT_CREATEREQUESTMESSAGE_FN createRequestMessageFn; + AQH_TOOLCLIENT_HANDLERESPONSEMESSAGE_FN handleResponseMessageFn; + + AQH_OBJECT *ipcEndpoint; + int timeoutInSeconds; + +}; + + +#endif + diff --git a/apps/aqhome-tool/data/getdevices.c b/apps/aqhome-tool/data/getdevices.c index a95da6c..5455210 100644 --- a/apps/aqhome-tool/data/getdevices.c +++ b/apps/aqhome-tool/data/getdevices.c @@ -19,6 +19,12 @@ #include "aqhome/ipc/data/msg_data_devices.h" #include "aqhome/ipc/data/ipc_data.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/msg/ipc/data/m_ipcd_devices.h" +#include "aqhome/ipc2/endpoint.h" + #include #include #include @@ -33,7 +39,6 @@ static int _doGetDevices(GWEN_DB_NODE *dbArgs); -static void _sendCommand(GWEN_MSG_ENDPOINT *epTcp); @@ -164,88 +169,91 @@ int AQH_Tool_GetDevices(GWEN_DB_NODE *dbGlobalArgs, int argc, char **argv) int _doGetDevices(GWEN_DB_NODE *dbArgs) { - GWEN_MSG_ENDPOINT *epTcp; + AQH_EVENT_LOOP *eventLoop; + AQH_OBJECT *epTcp; int timeoutInSeconds; - GWEN_MSG *msg; int printHeader; + AQH_MESSAGE *msgOut; + uint32_t msgId; timeoutInSeconds=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 5); printHeader=GWEN_DB_GetIntValue(dbArgs, "printHeader", 0, 0); - epTcp=Utils_SetupBrokerClientEndpoint(dbArgs, 0); + eventLoop=AQH_EventLoop_new(); + epTcp=Utils2_SetupBrokerClientEndpoint(eventLoop, dbArgs, 0); if (epTcp==NULL) { DBG_ERROR(NULL, "ERROR creating TCP connection"); + AQH_EventLoop_free(eventLoop); return 2; } /*fprintf(stdout, "Sending GetDevices request\n");*/ - - _sendCommand(epTcp); + msgId=AQH_Endpoint_GetNextMessageId(epTcp); + msgOut=AQH_IpcMessage_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ, + msgId, 0, + 0, NULL); + AQH_Endpoint_AddMsgOut(epTcp, msgOut); for (;;) { + AQH_MESSAGE *msgIn; uint16_t code; - msg=Utils_WaitForSpecificIpcMessage(epTcp, AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP, timeoutInSeconds); - if (msg==NULL) { - DBG_ERROR(NULL, "No response received"); - return 2; - } - code=GWEN_IpcMsg_GetCode(msg); - if (code==AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP) { - AQH_DEVICE_LIST *deviceList; + msgIn=Utils2_WaitForResponseMsg(eventLoop, epTcp, msgId, timeoutInSeconds); + if (msgIn) { + GWEN_TAG16_LIST *tagList; - AQH_DevicesDataIpcMsg_Parse(msg, 0); - deviceList=AQH_DevicesDataIpcMsg_ReadDeviceList(msg); - if (deviceList) { - AQH_DEVICE *device; + code=AQH_IpcMessage_GetCode(msgIn); + tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); + if (tagList) { + if (code==AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP) { + AQH_DEVICE_LIST *deviceList; - device=AQH_Device_List_First(deviceList); - while(device) { - Utils_PrintDevice(device, printHeader); - printHeader=0; - device=AQH_Device_List_Next(device); + deviceList=AQH_IpcdMessageDevices_ReadDeviceList(tagList); + if (deviceList) { + AQH_DEVICE *device; + + device=AQH_Device_List_First(deviceList); + while(device) { + Utils_PrintDevice(device, printHeader); + printHeader=0; + device=AQH_Device_List_Next(device); + } + AQH_Device_List_free(deviceList); + } + + DBG_ERROR(NULL, "Flags: %08x", AQH_IpcdMessageDevices_GetFlags(tagList)); + if (AQH_IpcdMessageDevices_GetFlags(tagList) & AQH_MSGDATA_DEVICES_FLAGS_LASTMSG) { + DBG_ERROR(NULL, "Last message received"); + GWEN_Tag16_List_free(tagList); + break; + } } - AQH_Device_List_free(deviceList); - } + else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { + uint32_t resultCode; - if (AQH_DevicesDataIpcMsg_GetFlags(msg) & AQH_MSGDATA_DEVICES_FLAGS_LASTMSG) { - DBG_INFO(NULL, "Last message received"); - break; + resultCode=AQH_IpcMessageResult_GetResult(tagList); + fprintf(stderr, "ERROR: %d\n", resultCode); + GWEN_Tag16_List_free(tagList); + AQH_Object_free(epTcp); + AQH_EventLoop_free(eventLoop); + return 3; + } + else { + DBG_INFO(NULL, "Unexpected message \"%d\"", code); + GWEN_Tag16_List_free(tagList); + AQH_Object_free(epTcp); + AQH_EventLoop_free(eventLoop); + return 3; + } + GWEN_Tag16_List_free(tagList); } } - else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { - uint32_t resultCode; - - resultCode=AQH_ResultIpcMsg_GetResultCode(msg); - fprintf(stderr, "ERROR: %d\n", resultCode); - GWEN_MsgEndpoint_free(epTcp); - return 3; - } - else { - DBG_INFO(NULL, "Unexpected message \"%d\"", code); - GWEN_MsgEndpoint_free(epTcp); - return 3; - } } /* for */ - GWEN_MsgEndpoint_free(epTcp); + AQH_Object_free(epTcp); return 0; } -void _sendCommand(GWEN_MSG_ENDPOINT *epTcp) -{ - GWEN_MSG *msgOut; - - msgOut=GWEN_IpcMsg_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ, - GWEN_MsgEndpoint_GetNextMessageId(epTcp), 0, - 0, NULL); - GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); -} - - - - - diff --git a/apps/aqhome-tool/data/getvalues.c b/apps/aqhome-tool/data/getvalues.c index 74d73ca..b1f1def 100644 --- a/apps/aqhome-tool/data/getvalues.c +++ b/apps/aqhome-tool/data/getvalues.c @@ -19,6 +19,12 @@ #include "aqhome/ipc/data/msg_data_values.h" #include "aqhome/ipc/data/ipc_data.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/m_ipc_tag16.h" +#include "aqhome/msg/ipc/m_ipc_result.h" +#include "aqhome/msg/ipc/data/m_ipcd_values.h" +#include "aqhome/ipc2/endpoint.h" + #include #include #include @@ -33,8 +39,6 @@ static int _doGetValues(GWEN_DB_NODE *dbArgs); -static uint32_t _sendRequest(GWEN_MSG_ENDPOINT *epTcp); -static int _handleResponses(GWEN_MSG_ENDPOINT *epTcp, uint32_t msgId, int timeoutInSeconds); @@ -110,6 +114,17 @@ int AQH_Tool_GetValues(GWEN_DB_NODE *dbGlobalArgs, int argc, char **argv) I18S("Specify service password"), I18S("Specify service password") }, + { + 0, /* flags */ + GWEN_ArgsType_Int, /* type */ + "printHeader", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "H", /* short option */ + "printheader", /* long option */ + I18S("Print header if given"), + I18S("Print header if given") + }, { GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST, /* flags */ GWEN_ArgsType_Int, /* type */ @@ -154,112 +169,88 @@ int AQH_Tool_GetValues(GWEN_DB_NODE *dbGlobalArgs, int argc, char **argv) int _doGetValues(GWEN_DB_NODE *dbArgs) { - GWEN_MSG_ENDPOINT *epTcp; + AQH_EVENT_LOOP *eventLoop; + AQH_OBJECT *epTcp; int timeoutInSeconds; + int printHeader; + AQH_MESSAGE *msgOut; uint32_t msgId; - int rv; timeoutInSeconds=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 5); + printHeader=GWEN_DB_GetIntValue(dbArgs, "printHeader", 0, 0); - epTcp=Utils_SetupBrokerClientEndpoint(dbArgs, 0); + eventLoop=AQH_EventLoop_new(); + epTcp=Utils2_SetupBrokerClientEndpoint(eventLoop, dbArgs, 0); if (epTcp==NULL) { DBG_ERROR(NULL, "ERROR creating TCP connection"); + AQH_EventLoop_free(eventLoop); return 2; } - msgId=_sendRequest(epTcp); - rv=_handleResponses(epTcp, msgId, timeoutInSeconds); - if (rv!=0) { - DBG_ERROR(NULL, "here (%d)", rv); - } - GWEN_MsgEndpoint_free(epTcp); - return rv; -} + msgId=AQH_Endpoint_GetNextMessageId(epTcp); + msgOut=AQH_IpcMessage_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ, + msgId, 0, + 0, NULL); + AQH_Endpoint_AddMsgOut(epTcp, msgOut); - - -uint32_t _sendRequest(GWEN_MSG_ENDPOINT *epTcp) -{ - GWEN_MSG *msgOut; - uint32_t msgId; - - msgId=GWEN_MsgEndpoint_GetNextMessageId(epTcp); - msgOut=GWEN_IpcMsg_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ, - msgId, 0, - 0, NULL); - GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); - return msgId; -} - - - -int _handleResponses(GWEN_MSG_ENDPOINT *epTcp, uint32_t msgId, int timeoutInSeconds) -{ for (;;) { - GWEN_MSG *msg; + AQH_MESSAGE *msgIn; + uint16_t code; - msg=Utils_WaitForResponse(epTcp, msgId, timeoutInSeconds); - if (msg) { - uint16_t code; + msgIn=Utils2_WaitForResponseMsg(eventLoop, epTcp, msgId, timeoutInSeconds); + if (msgIn) { + GWEN_TAG16_LIST *tagList; - code=GWEN_IpcMsg_GetCode(msg); - if (code==AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP) { - AQH_VALUE_LIST *valueList; + code=AQH_IpcMessage_GetCode(msgIn); + tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0); + if (tagList) { + if (code==AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP) { + AQH_VALUE_LIST *valueList; - AQH_ValuesDataIpcMsg_Parse(msg, 0); - valueList=AQH_ValuesDataIpcMsg_ReadValueList(msg); - if (valueList) { - AQH_VALUE *v; + valueList=AQH_IpcdMessageValues_ReadValueList(tagList); + if (valueList) { + AQH_VALUE *value; - v=AQH_Value_List_First(valueList); - while(v) { - uint64_t valueId; - const char *valueName; - const char *valueUnits; - - valueId=AQH_Value_GetId(v); - valueName=AQH_Value_GetNameForSystem(v); - valueUnits=AQH_Value_GetValueUnits(v); - - fprintf(stdout, "%lu\t%s\t%s\n", - (unsigned long int) valueId, - valueName?valueName:"", - valueUnits?valueUnits:""); - - v=AQH_Value_List_Next(v); + value=AQH_Value_List_First(valueList); + while(value) { + Utils_PrintValue(value, printHeader); + printHeader=0; + value=AQH_Value_List_Next(value); + } + AQH_Value_List_free(valueList); } - AQH_Value_List_free(valueList); - } - if (AQH_ValuesDataIpcMsg_GetFlags(msg) & AQH_MSGDATA_VALUES_FLAGS_LASTMSG) { - DBG_INFO(NULL, "Last message received"); - GWEN_Msg_free(msg); - break; + if (AQH_IpcdMessageValues_GetFlags(tagList) & AQH_MSGDATA_VALUES_FLAGS_LASTMSG) { + DBG_INFO(NULL, "Last message received"); + GWEN_Tag16_List_free(tagList); + break; + } } - } - else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { - uint32_t resultCode; + else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) { + uint32_t resultCode; - resultCode=AQH_ResultIpcMsg_GetResultCode(msg); - fprintf(stderr, "ERROR: %d\n", resultCode); - GWEN_Msg_free(msg); - return 3; + resultCode=AQH_IpcMessageResult_GetResult(tagList); + fprintf(stderr, "ERROR: %d\n", resultCode); + GWEN_Tag16_List_free(tagList); + AQH_Object_free(epTcp); + AQH_EventLoop_free(eventLoop); + return 3; + } + else { + DBG_INFO(NULL, "Unexpected message \"%d\"", code); + GWEN_Tag16_List_free(tagList); + AQH_Object_free(epTcp); + AQH_EventLoop_free(eventLoop); + return 3; + } + GWEN_Tag16_List_free(tagList); } - else { - DBG_INFO(NULL, "Unexpected message \"%d\"", code); - GWEN_Msg_free(msg); - return 3; - } - } /* if msg */ - else { - DBG_ERROR(NULL, "No response received"); - return 2; } } /* for */ + AQH_Object_free(epTcp); return 0; } - diff --git a/apps/aqhome-tool/utils.c b/apps/aqhome-tool/utils.c index f326c51..faecaac 100644 --- a/apps/aqhome-tool/utils.c +++ b/apps/aqhome-tool/utils.c @@ -21,6 +21,12 @@ #include "aqhome/ipc/msg_ipc_result.h" #include "aqhome/ipc/endpoint_ipcclient.h" +#include "aqhome/msg/ipc/m_ipc.h" +#include "aqhome/msg/ipc/data/m_ipcd.h" +#include "aqhome/ipc2/tcp_object.h" +#include "aqhome/ipc2/ipc_client.h" + + #include #include #include @@ -35,6 +41,46 @@ +AQH_OBJECT *Utils2_SetupBrokerClientEndpoint(AQH_EVENT_LOOP *eventLoop, GWEN_DB_NODE *dbArgs, uint32_t flags) +{ + const char *brokerAddress; + int brokerPort; + const char *brokerClientId; + + brokerAddress=GWEN_DB_GetCharValue(dbArgs, "brokerAddress", 0, NULL); + if (!(brokerAddress && *brokerAddress)) + brokerAddress=GWEN_DB_GetCharValue(dbArgs, "ConfigFile/brokerAddress", 0, "127.0.0.1"); + + brokerPort=GWEN_DB_GetIntValue(dbArgs, "brokerPort", 0, -1); + if (brokerPort<0) + brokerPort=GWEN_DB_GetIntValue(dbArgs, "ConfigFile/brokerPort", 0, 45456); + + brokerClientId=GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, "aqhome-tool"); + + if (brokerAddress && *brokerAddress && brokerPort) { + AQH_OBJECT *ep; + int fd; + + fd=AQH_TcpObject_CreateConnectedSocket(brokerAddress, brokerPort); + if (fd<0) { + DBG_ERROR(NULL, "Error connecting to broker server %s:%d", brokerAddress, brokerPort); + return NULL; + } + + ep=AQH_IpcClientObject_new(eventLoop, fd); + assert(ep); + AQH_Endpoint_AddFlags(ep, flags); + return ep; + } + else { + DBG_ERROR(NULL, "No server settings"); + } + + return NULL; +} + + + GWEN_MSG_ENDPOINT *Utils_SetupBrokerClientEndpoint(GWEN_DB_NODE *dbArgs, uint32_t flags) { const char *brokerAddress; @@ -172,6 +218,42 @@ GWEN_MSG *Utils_WaitForSpecificNodeMessage(GWEN_MSG_ENDPOINT *epTcp, +AQH_MESSAGE *Utils2_WaitForResponseMsg(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *epTcp, uint32_t refMsgId, int timeoutInSeconds) +{ + time_t startTime; + + startTime=time(NULL); + + for (;;) { + AQH_MESSAGE *msg; + time_t now; + + AQH_EventLoop_Run(eventLoop, 500); + msg=AQH_Endpoint_GetNextMsgIn(epTcp); + if (msg) { + if (refMsgId==AQH_IpcMessage_GetRefMsgId(msg)) + return msg; + else { + uint16_t code; + + code=AQH_IpcMessage_GetCode(msg); + DBG_ERROR(NULL, "Received unexpected message %d (%x), ignoring", code, code); + AQH_Message_free(msg); + } + } + + now=time(NULL); + if (now-startTime>timeoutInSeconds) { + DBG_ERROR(NULL, "Timeout"); + break; + } + } + + return NULL; +} + + + GWEN_MSG *Utils_WaitForSpecificIpcMessage(GWEN_MSG_ENDPOINT *epTcp, int msgCode, int timeoutInSeconds) @@ -559,4 +641,24 @@ AQH_DEVICE *Utils_DeviceFromArgs(GWEN_DB_NODE *dbArgs) +void Utils_PrintValue(const AQH_VALUE *value, int printHeader) +{ + uint64_t valueId; + const char *valueName; + const char *valueUnits; + + valueId=AQH_Value_GetId(value); + valueName=AQH_Value_GetNameForSystem(value); + valueUnits=AQH_Value_GetValueUnits(value); + + if (printHeader) + fprintf(stdout, "ID\tName\tUnits\n"); + fprintf(stdout, "%lu\t%s\t%s\n", + (unsigned long int) valueId, + valueName?valueName:"", + valueUnits?valueUnits:""); +} + + + diff --git a/apps/aqhome-tool/utils.h b/apps/aqhome-tool/utils.h index f511065..d6c5fc0 100644 --- a/apps/aqhome-tool/utils.h +++ b/apps/aqhome-tool/utils.h @@ -16,6 +16,15 @@ #include #include +#include +#include +#include + + + +AQH_OBJECT *Utils2_SetupBrokerClientEndpoint(AQH_EVENT_LOOP *eventLoop, GWEN_DB_NODE *dbArgs, uint32_t flags); +AQH_MESSAGE *Utils2_WaitForResponseMsg(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *epTcp, uint32_t refMsgId, int timeoutInSeconds); + GWEN_MSG_ENDPOINT *Utils_SetupIpcEndpoint(GWEN_DB_NODE *dbArgs, @@ -46,6 +55,7 @@ void Utils_PrintDiffData(const uint64_t *dataPoints, uint32_t numValues, const c void Utils_PrintFormattedSingleDataPoint(const AQH_VALUE *v, uint64_t timestamp, double data, const char *tmpl); void Utils_PrintDevice(const AQH_DEVICE *device, int printHeader); +void Utils_PrintValue(const AQH_VALUE *value, int printHeader); AQH_DEVICE *Utils_DeviceFromArgs(GWEN_DB_NODE *dbArgs); diff --git a/aqhome/events2/eventloop.h b/aqhome/events2/eventloop.h index a73570d..71e48ac 100644 --- a/aqhome/events2/eventloop.h +++ b/aqhome/events2/eventloop.h @@ -19,7 +19,7 @@ typedef struct AQH_EVENT_LOOP AQH_EVENT_LOOP; AQHOME_API AQH_EVENT_LOOP *AQH_EventLoop_new(void); AQHOME_API void AQH_EventLoop_free(AQH_EVENT_LOOP *eventLoop); -AQHOME_API void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop); +AQHOME_API void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop, int timeoutInMillisecs); AQHOME_API void AQH_EventLoop_AddFdObject(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *o); AQHOME_API void AQH_EventLoop_DelFdObject(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *o); diff --git a/aqhome/events2/eventloop_select.c b/aqhome/events2/eventloop_select.c index fa4381f..dd32b56 100644 --- a/aqhome/events2/eventloop_select.c +++ b/aqhome/events2/eventloop_select.c @@ -39,7 +39,7 @@ static void _signalReadyFdObjects(AQH_OBJECT_LIST2 *ol); * ------------------------------------------------------------------------------------------------ */ -void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop) +void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop, int timeoutInMillisecs) { fd_set fdRead; fd_set fdWrite; @@ -56,8 +56,8 @@ void AQH_EventLoop_Run(AQH_EVENT_LOOP *eventLoop) if (highestFd>-1) { struct timeval tv; - tv.tv_sec=0; - tv.tv_usec=200000; + tv.tv_sec=timeoutInMillisecs/1000; + tv.tv_usec=(timeoutInMillisecs%1000)*1000; rv=select(highestFd+1, &fdRead, &fdWrite, NULL, &tv); if (rv>0) { /* some fds became active */ diff --git a/aqhome/events2/fdobject.c b/aqhome/events2/fdobject.c index e819a5e..7ffb4a5 100644 --- a/aqhome/events2/fdobject.c +++ b/aqhome/events2/fdobject.c @@ -168,7 +168,7 @@ int AQH_FdObject_Read(AQH_OBJECT *o, uint8_t *ptrBuffer, uint32_t lenBuffer) } else if (rv>0) { /* data received */ - DBG_ERROR(AQH_LOGDOMAIN, "Received %d bytes", (int) rv); + DBG_INFO(AQH_LOGDOMAIN, "Received %d bytes", (int) rv); return (int) rv; } else { diff --git a/aqhome/events2/object.c b/aqhome/events2/object.c index 4584da5..ca306b9 100644 --- a/aqhome/events2/object.c +++ b/aqhome/events2/object.c @@ -72,10 +72,9 @@ void AQH_Object_free(AQH_OBJECT *o) { if (o && o->refCount>0) { if (--(o->refCount)==0) { - GWEN_INHERIT_FINI(AQH_OBJECT, o); - GWEN_LIST_FINI(AQH_OBJECT, o); - AQH_Link_List_free(o->linkList); + GWEN_LIST_FINI(AQH_OBJECT, o); + GWEN_INHERIT_FINI(AQH_OBJECT, o); GWEN_FREE_OBJECT(o); } diff --git a/aqhome/events2/object.h b/aqhome/events2/object.h index cb53e79..71d9507 100644 --- a/aqhome/events2/object.h +++ b/aqhome/events2/object.h @@ -21,6 +21,7 @@ #define AQH_OBJECT_FLAGS_ENABLED 0x80000000L #define AQH_OBJECT_FLAGS_FIRE 0x40000000L +#define AQH_OBJECT_FLAGS_DELETE 0x20000000L enum { diff --git a/aqhome/ipc2/0BUILD b/aqhome/ipc2/0BUILD index 62a5e6d..acafb2a 100644 --- a/aqhome/ipc2/0BUILD +++ b/aqhome/ipc2/0BUILD @@ -51,10 +51,12 @@ nodemsgreader.h ttyobject.h tcpd_object.h + tcp_object.h endpoint.h message.h msgrequest.h ipc_server.h + ipc_client.h tty_endpoint.h @@ -79,10 +81,12 @@ nodemsgreader.c ttyobject.c tcpd_object.c + tcp_object.c endpoint.c message.c msgrequest.c ipc_server.c + ipc_client.c tty_endpoint.c diff --git a/aqhome/ipc2/endpoint.c b/aqhome/ipc2/endpoint.c index d3b6f79..173f8eb 100644 --- a/aqhome/ipc2/endpoint.c +++ b/aqhome/ipc2/endpoint.c @@ -30,9 +30,14 @@ * ------------------------------------------------------------------------------------------------ */ +/*#define LOG_MESSAGES*/ + + + enum { AQH_ENDPOINT_SLOT_MSG_RECVD=1, - AQH_ENDPOINT_SLOT_MSG_SENT + AQH_ENDPOINT_SLOT_MSG_SENT, + AQH_ENDPOINT_SLOT_CLOSED }; @@ -54,8 +59,8 @@ GWEN_INHERIT(AQH_OBJECT, AQH_ENDPOINT) static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); static int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr); -static int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr); -static void _dumpMsg(const AQH_MESSAGE *msg, const char *sText); +static int _handleMsgSent(AQH_OBJECT *o); +static int _handleClosed(AQH_OBJECT *o); @@ -81,6 +86,7 @@ AQH_OBJECT *AQH_Endpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, A if (msgReader) { xo->msgReader=msgReader; AQH_Object_AddLink(msgReader, AQH_MSG_READER_SIGNAL_MSGRECVD, AQH_ENDPOINT_SLOT_MSG_RECVD, o); + AQH_Object_AddLink(msgReader, AQH_MSG_READER_SIGNAL_CLOSED, AQH_ENDPOINT_SLOT_CLOSED, o); } if (msgWriter) { @@ -98,6 +104,8 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) AQH_ENDPOINT *xo; xo=(AQH_ENDPOINT*) p; + free(xo->serviceName); + free(xo->userName); AQH_Message_List_free(xo->msgOutList); AQH_Message_List_free(xo->msgInList); AQH_Object_free(xo->msgWriter); @@ -107,6 +115,197 @@ void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p) +const char *AQH_Endpoint_GetServiceName(const AQH_OBJECT *o) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + return xo->serviceName; + } + return NULL; +} + + + +void AQH_Endpoint_SetServiceName(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) { + free(xo->serviceName); + xo->serviceName=s?strdup(s):NULL; + } + } +} + + + +const char *AQH_Endpoint_GetUserName(const AQH_OBJECT *o) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + return xo->userName; + } + return NULL; +} + + + +void AQH_Endpoint_SetUserName(AQH_OBJECT *o, const char *s) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) { + free(xo->userName); + xo->userName=s?strdup(s):NULL; + } + } +} + + + +uint32_t AQH_Endpoint_GetPermissions(const AQH_OBJECT *o) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + return xo->permissions; + } + return 0; +} + + + +void AQH_Endpoint_SetPermissions(AQH_OBJECT *o, uint32_t i) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + xo->permissions=i; + } +} + + + +void AQH_Endpoint_AddPermissions(AQH_OBJECT *o, uint32_t i) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + xo->permissions|=i; + } +} + + + +void AQH_Endpoint_SubPermissions(AQH_OBJECT *o, uint32_t i) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + xo->permissions&=~i; + } +} + + + +uint32_t AQH_Endpoint_GetFlags(const AQH_OBJECT *o) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + return xo->flags; + } + return 0; +} + + + +void AQH_Endpoint_SetFlags(AQH_OBJECT *o, uint32_t i) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + xo->flags=i; + } +} + + + +void AQH_Endpoint_AddFlags(AQH_OBJECT *o, uint32_t i) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + xo->flags|=i; + } +} + + + +void AQH_Endpoint_SubFlags(AQH_OBJECT *o, uint32_t i) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + xo->flags&=~i; + } +} + + + +int AQH_Endpoint_GetState(const AQH_OBJECT *o) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + return xo->state; + } + return 0; +} + + + +void AQH_Endpoint_SetState(AQH_OBJECT *o, int i) +{ + if (o) { + AQH_ENDPOINT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) + xo->state=i; + } +} + + + AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgOutList(const AQH_OBJECT *o) { if (o) { @@ -144,6 +343,7 @@ void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg) if (xo) { AQH_Message_List_Add(msg, xo->msgOutList); if (xo->msgWriter && AQH_Message_List_GetCount(xo->msgOutList)==1) { + DBG_INFO(AQH_LOGDOMAIN, "Enabling msgWriter, sending message"); AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg)); AQH_Object_Enable(xo->msgWriter); } @@ -173,8 +373,15 @@ AQH_MESSAGE *AQH_Endpoint_GetNextMsgIn(AQH_OBJECT *o) AQH_ENDPOINT *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); - if (xo) - return AQH_Message_List_First(xo->msgInList); + if (xo) { + AQH_MESSAGE *msg; + + msg=AQH_Message_List_First(xo->msgInList); + if (msg) { + AQH_Message_List_Del(msg); + return msg; + } + } } return NULL; } @@ -189,7 +396,7 @@ void AQH_Endpoint_AddMsgIn(AQH_OBJECT *o, AQH_MESSAGE *msg) xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); if (xo) { AQH_Message_List_Add(msg, xo->msgInList); - DBG_ERROR(AQH_LOGDOMAIN, "now %d msgs in list", AQH_Message_List_GetCount(xo->msgInList)); + DBG_INFO(AQH_LOGDOMAIN, "now %d msgs in list", AQH_Message_List_GetCount(xo->msgInList)); } } } @@ -228,7 +435,8 @@ int _handleSignal(AQH_OBJECT *o, uint32_t slotId, GWEN_UNUSED AQH_OBJECT *sender { switch(slotId) { case AQH_ENDPOINT_SLOT_MSG_RECVD: return _handleMsgRecvd(o, param1, param2); - case AQH_ENDPOINT_SLOT_MSG_SENT: return _handleMsgSent(o, param1, param2); + case AQH_ENDPOINT_SLOT_MSG_SENT: return _handleMsgSent(o); + case AQH_ENDPOINT_SLOT_CLOSED: return _handleClosed(o); default: break; } @@ -242,9 +450,9 @@ int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr) { AQH_MESSAGE *msg; - DBG_ERROR(AQH_LOGDOMAIN, "Msg received:"); + DBG_INFO(AQH_LOGDOMAIN, "Msg received:"); + /*GWEN_Text_LogString((const char*) msgPtr, msgLen, AQH_LOGDOMAIN, GWEN_LoggerLevel_Error);*/ msg=AQH_NodeMessage_fromBuffer(msgPtr, msgLen); - _dumpMsg(msg, "Received"); AQH_Endpoint_AddMsgIn(o, msg); return 1; @@ -252,11 +460,9 @@ int _handleMsgRecvd(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr) -int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr) +int _handleMsgSent(AQH_OBJECT *o) { - DBG_ERROR(AQH_LOGDOMAIN, "Msg sent:"); - GWEN_Text_LogString((const char*) msgPtr, msgLen, AQH_LOGDOMAIN, GWEN_LoggerLevel_Error); - + DBG_INFO(AQH_LOGDOMAIN, "Msg sent"); if (o) { AQH_ENDPOINT *xo; @@ -264,6 +470,7 @@ int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr) if (xo) { AQH_MESSAGE *msg; + DBG_INFO(AQH_LOGDOMAIN, "Messages in outlist: %d", AQH_Message_List_GetCount(xo->msgOutList)); msg=AQH_Message_List_First(xo->msgOutList); if (msg) { /* remove sent message from list */ @@ -273,33 +480,38 @@ int _handleMsgSent(AQH_OBJECT *o, int msgLen, const uint8_t *msgPtr) /* get next message in list */ msg=AQH_Message_List_First(xo->msgOutList); if (msg) { - DBG_ERROR(AQH_LOGDOMAIN, "Sending next message"); + DBG_INFO(AQH_LOGDOMAIN, "Sending next message"); AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg)); } - } - else { - DBG_ERROR(AQH_LOGDOMAIN, "Last message sent, disabling writer"); - AQH_Object_Disable(xo->msgWriter); + else { + DBG_INFO(AQH_LOGDOMAIN, "Last message sent, disabling writer"); + AQH_Object_Disable(xo->msgWriter); + } } } } - return 1; } -void _dumpMsg(const AQH_MESSAGE *msg, const char *sText) +int _handleClosed(AQH_OBJECT *o) { - if (msg) { - GWEN_BUFFER *mbuf; + AQH_ENDPOINT *xo; - mbuf=GWEN_Buffer_new(0, 256, 0, 1); - AQH_NodeMessage_DumpSpecificToBuffer(msg, mbuf, sText); - DBG_ERROR(AQH_LOGDOMAIN, "%s", GWEN_Buffer_GetStart(mbuf)); - GWEN_Buffer_free(mbuf); + DBG_ERROR(AQH_LOGDOMAIN, "Connection closed."); + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o); + if (xo) { + AQH_Object_Disable(xo->msgWriter); + AQH_Object_Disable(xo->msgReader); + if (0==AQH_Object_EmitSignal(o, AQH_ENDPOINT_SIGNAL_CLOSED, 0, NULL)) { + DBG_ERROR(AQH_LOGDOMAIN, "Signal CLOSED not handled"); + } + return 1; } + return 0; } + diff --git a/aqhome/ipc2/endpoint.h b/aqhome/ipc2/endpoint.h index a11c500..66b3af8 100644 --- a/aqhome/ipc2/endpoint.h +++ b/aqhome/ipc2/endpoint.h @@ -14,6 +14,12 @@ +enum { + AQH_ENDPOINT_SIGNAL_CLOSED=AQH_OBJECT_SIGNAL_LAST, +}; + + + /** * Constructor. * @@ -26,6 +32,25 @@ */ AQHOME_API AQH_OBJECT *AQH_Endpoint_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *msgReader, AQH_OBJECT *msgWriter); +AQHOME_API const char *AQH_Endpoint_GetServiceName(const AQH_OBJECT *o); +AQHOME_API void AQH_Endpoint_SetServiceName(AQH_OBJECT *o, const char *s); + +AQHOME_API const char *AQH_Endpoint_GetUserName(const AQH_OBJECT *o); +AQHOME_API void AQH_Endpoint_SetUserName(AQH_OBJECT *o, const char *s); + +AQHOME_API uint32_t AQH_Endpoint_GetPermissions(const AQH_OBJECT *o); +AQHOME_API void AQH_Endpoint_SetPermissions(AQH_OBJECT *o, uint32_t i); +AQHOME_API void AQH_Endpoint_AddPermissions(AQH_OBJECT *o, uint32_t i); +AQHOME_API void AQH_Endpoint_SubPermissions(AQH_OBJECT *o, uint32_t i); + +AQHOME_API uint32_t AQH_Endpoint_GetFlags(const AQH_OBJECT *o); +AQHOME_API void AQH_Endpoint_SetFlags(AQH_OBJECT *o, uint32_t i); +AQHOME_API void AQH_Endpoint_AddFlags(AQH_OBJECT *o, uint32_t i); +AQHOME_API void AQH_Endpoint_SubFlags(AQH_OBJECT *o, uint32_t i); + +AQHOME_API int AQH_Endpoint_GetState(const AQH_OBJECT *o); +AQHOME_API void AQH_Endpoint_SetState(AQH_OBJECT *o, int i); + AQHOME_API AQH_MESSAGE_LIST *AQH_Endpoint_GetMsgOutList(const AQH_OBJECT *o); AQHOME_API AQH_MESSAGE *AQH_Endpoint_GetNextMsgOut(AQH_OBJECT *o); AQHOME_API void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg); diff --git a/aqhome/ipc2/endpoint_p.h b/aqhome/ipc2/endpoint_p.h index 7039406..4441e1a 100644 --- a/aqhome/ipc2/endpoint_p.h +++ b/aqhome/ipc2/endpoint_p.h @@ -21,6 +21,11 @@ struct AQH_ENDPOINT { AQH_OBJECT *msgReader; uint32_t lastMsgId; + int state; + uint32_t permissions; + uint32_t flags; + char *serviceName; + char *userName; }; diff --git a/aqhome/ipc2/ipc_client.c b/aqhome/ipc2/ipc_client.c new file mode 100644 index 0000000..67878d5 --- /dev/null +++ b/aqhome/ipc2/ipc_client.c @@ -0,0 +1,53 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./ipc_client.h" + +#include +#include +#include + +#include + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + +AQH_OBJECT *AQH_IpcClientObject_new(AQH_EVENT_LOOP *eventLoop, int fd) +{ + int fdCopy; + AQH_OBJECT *fdReader; + AQH_OBJECT *fdWriter; + AQH_OBJECT *msgReader; + AQH_OBJECT *msgWriter; + AQH_OBJECT *endpoint; + + fdCopy=dup(fd); + + fdReader=AQH_FdObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ); + msgReader=AQH_IpcMsgReader_new(eventLoop, fdReader); + AQH_Object_Enable(msgReader); + + fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE); + msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter); + + endpoint=AQH_Endpoint_new(eventLoop, msgReader, msgWriter); + return endpoint; +} + + + diff --git a/aqhome/ipc2/ipc_client.h b/aqhome/ipc2/ipc_client.h new file mode 100644 index 0000000..2cc4ce2 --- /dev/null +++ b/aqhome/ipc2/ipc_client.h @@ -0,0 +1,28 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_IPC_CLIENT_H +#define AQH_IPC_CLIENT_H + + +#include + + + +/** + * Create an endpoint object (see @ref AQH_Endpoint_new) which works over a given socket. + * + * @param eventLoop pointer to eventLoop + * @param fd connected non-blocking socket to work with (see @ref AQH_TcpObject_CreateConnectedSocket). + */ +AQHOME_API AQH_OBJECT *AQH_IpcClientObject_new(AQH_EVENT_LOOP *eventLoop, int fd); + + + +#endif + diff --git a/aqhome/ipc2/ipc_server.c b/aqhome/ipc2/ipc_server.c index 34a5957..4270753 100644 --- a/aqhome/ipc2/ipc_server.c +++ b/aqhome/ipc2/ipc_server.c @@ -101,7 +101,7 @@ int _handleSignal(AQH_OBJECT *o, GWEN_UNUSED void *param2) { switch(slotId) { - case AQH_TCPD_OBJECT_SIGNAL_NEWCONN: return _handleNewConn(o, param1); + case AQH_IPCD_OBJECT_SLOT_NEWCONN: return _handleNewConn(o, param1); default: break; } @@ -127,6 +127,7 @@ int _handleNewConn(AQH_OBJECT *o, int newFd) fdReader=AQH_FdObject_new(eventLoop, newFd, AQH_FDOBJECT_FDMODE_READ); msgReader=AQH_IpcMsgReader_new(eventLoop, fdReader); + AQH_Object_Enable(msgReader); fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE); msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter); diff --git a/aqhome/ipc2/ipcmsgreader.c b/aqhome/ipc2/ipcmsgreader.c index 4f6f5c5..33e76a1 100644 --- a/aqhome/ipc2/ipcmsgreader.c +++ b/aqhome/ipc2/ipcmsgreader.c @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -63,6 +64,7 @@ int _readMsg(AQH_OBJECT *o) int rv; if (xo->bytesReceivedbytesReceived>=AQH_MSG_READER_HEADER_SIZE) { + DBG_INFO(AQH_LOGDOMAIN, "Reading body"); /* reading remainder of msg directly into allocated buffer */ rv=AQH_MsgReader_ReadRemainderFromRingbuffer(o); if (rv<0) { @@ -90,7 +93,7 @@ int _readMsg(AQH_OBJECT *o) rv=AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_MSGRECVD, msgLen, (void*) msgPtr); if (rv==0) { - DBG_INFO(AQH_LOGDOMAIN, "Received message ignored"); + DBG_ERROR(AQH_LOGDOMAIN, "Received message ignored"); } free(msgPtr); return 1; @@ -136,9 +139,9 @@ int _readHeaderFromRingbuffer(AQH_MSG_READER *xo) DBG_ERROR(AQH_LOGDOMAIN, "Bad message size(%lu)", (unsigned long int) msgLen); return GWEN_ERROR_GENERIC; } - xo->currentMsgBuf=(uint8_t*) malloc(msgLen+4); /* +4 because of msg len (4 bytes) */ + xo->currentMsgBuf=(uint8_t*) malloc(msgLen); memmove(xo->currentMsgBuf, xo->headerBuffer, xo->bytesReceived); - xo->bytesLeft=(msgLen+4)-xo->bytesReceived; + xo->bytesLeft=msgLen-xo->bytesReceived; } return 0; diff --git a/aqhome/ipc2/msgreader.c b/aqhome/ipc2/msgreader.c index 85cf8cb..0ee353d 100644 --- a/aqhome/ipc2/msgreader.c +++ b/aqhome/ipc2/msgreader.c @@ -201,6 +201,7 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject) { AQH_MSG_READER *xo; + DBG_INFO(AQH_LOGDOMAIN, "Socket ready"); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); if (xo) { int rv; @@ -401,7 +402,7 @@ void _resetBuffers(AQH_MSG_READER *xo) void _cbEnable(AQH_OBJECT *o) { - if (o && !(AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) { + if (o) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); @@ -414,7 +415,7 @@ void _cbEnable(AQH_OBJECT *o) void _cbDisable(AQH_OBJECT *o) { - if (o && (AQH_Object_GetFlags(o) & AQH_OBJECT_FLAGS_ENABLED)) { + if (o) { AQH_MSG_READER *xo; xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o); diff --git a/aqhome/ipc2/msgwriter.c b/aqhome/ipc2/msgwriter.c index 8d824d9..60121b7 100644 --- a/aqhome/ipc2/msgwriter.c +++ b/aqhome/ipc2/msgwriter.c @@ -129,11 +129,12 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject) { AQH_MSG_WRITER *xo; + DBG_INFO(AQH_LOGDOMAIN, "Socket ready"); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_WRITER, o); if (xo) { - if (xo->bytesLeft) { - int rv; + int rv; + if (xo->bytesLeft) { if (!(xo->flags & AQH_MSGWRITER_FLAGS_MSGSTARTED)) { rv=_startMsg(xo, fdObject); if (rv<0) { @@ -161,12 +162,17 @@ int _handleSocketReady(AQH_OBJECT *o, AQH_OBJECT *fdObject) } else { if (xo->bytesLeft==0) { + int msgLen; + const uint8_t *msgPtr; + _endMsg(xo, fdObject); - rv=AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_MSGSENT, xo->msgBufLen, (void*) (xo->msgBufPtr)); - if (rv==0) { - DBG_INFO(AQH_LOGDOMAIN, "Sent message ignored"); - } + msgPtr=xo->msgBufPtr; + msgLen=xo->msgBufLen; _resetBuffer(o); + rv=AQH_Object_EmitSignal(o, AQH_MSG_WRITER_SIGNAL_MSGSENT, msgLen, (void*) msgPtr); + if (rv==0) { + DBG_ERROR(AQH_LOGDOMAIN, "Sent message ignored"); + } } } } diff --git a/aqhome/ipc2/tcp_object.c b/aqhome/ipc2/tcp_object.c new file mode 100644 index 0000000..58eaedc --- /dev/null +++ b/aqhome/ipc2/tcp_object.c @@ -0,0 +1,178 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "./tcp_object.h" + +#include + +#include +#include +#include +#include +#include + +#include +#include + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static int _socketSetBlocking(int sk, int fl); +static int _translateHError(int herr); +static int _setHostAddr(struct in_addr *inetAddr, const char *sAddr); +static int _setHostName(struct in_addr *inetAddr, const char *sAddr); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +int AQH_TcpObject_CreateConnectedSocket(const char *addr, int port) +{ + int sk; + struct sockaddr_in inetAddr; + int rv; + + memset(&inetAddr, 0, sizeof(inetAddr)); + inetAddr.sin_family=AF_INET; + rv=_setHostAddr(&inetAddr.sin_addr, addr); /* try tuple */ + if (rv<0) { + rv=_setHostName(&inetAddr.sin_addr, addr); /* lookup name */ + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + } + inetAddr.sin_port=htons(port); + + sk=socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sk<0) { + DBG_INFO(AQH_LOGDOMAIN, "socket(): %s", strerror(errno)); + return GWEN_ERROR_IO; + } + + rv=connect(sk, (struct sockaddr*) &inetAddr, sizeof(inetAddr)); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + close(sk); + return rv; + } + + rv=_socketSetBlocking(sk, 0); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + close(sk); + return rv; + } + + return sk; +} + + + +int _setHostAddr(struct in_addr *inetAddr, const char *sAddr) +{ + inetAddr->s_addr=0; + if (!inet_aton(sAddr, inetAddr)) { + DBG_ERROR(AQH_LOGDOMAIN, "Invalid address \"%s\"", sAddr); + return GWEN_ERROR_INVALID; + } + + return 0; +} + + + +int _setHostName(struct in_addr *inetAddr, const char *sAddr) +{ + struct hostent *he; + + he=gethostbyname(sAddr); + if (!he) { + DBG_ERROR(AQH_LOGDOMAIN, "gethostbyname(\"%s\"): %s", sAddr, hstrerror(h_errno)); + return _translateHError(h_errno); + } + /* name resolved, store address */ + memcpy(inetAddr, he->h_addr_list[0], sizeof(struct in_addr)); + return 0; +} + + + +int _socketSetBlocking(int sk, int fl) +{ + int prevFlags; + int newFlags; + + /* get current socket flags */ + prevFlags=fcntl(sk, F_GETFL); + if (prevFlags==-1) { + DBG_INFO(AQH_LOGDOMAIN, "fcntl(): %s", strerror(errno)); + return GWEN_ERROR_IO; + } + + /* set nonblocking/blocking */ + if (fl) + newFlags=prevFlags&(~O_NONBLOCK); + else + newFlags=prevFlags|O_NONBLOCK; + + if (-1==fcntl(sk, F_SETFL, newFlags)) { + DBG_INFO(AQH_LOGDOMAIN, "fcntl(): %s", strerror(errno)); + return GWEN_ERROR_IO; + } + prevFlags=fcntl(sk, F_GETFL); + if (prevFlags!=newFlags) { + DBG_ERROR(AQH_LOGDOMAIN, "fcntl() did not set flags correctly (%08x!=%08x)", prevFlags, newFlags); + return GWEN_ERROR_IO; + } + + return 0; +} + + + +int _translateHError(int herr) +{ + int rv; + + switch (herr) { + case HOST_NOT_FOUND: + rv=GWEN_ERROR_HOST_NOT_FOUND; + break; +#ifdef NO_ADDRESS + case NO_ADDRESS: + rv=GWEN_ERROR_NO_ADDRESS; + break; +#endif + case NO_RECOVERY: + rv=GWEN_ERROR_NO_RECOVERY; + break; + case TRY_AGAIN: + rv=GWEN_ERROR_TRY_AGAIN; + break; + default: + rv=GWEN_ERROR_UNKNOWN_DNS_ERROR; + break; + } /* switch */ + + return rv; +} + + + diff --git a/aqhome/ipc2/tcp_object.h b/aqhome/ipc2/tcp_object.h new file mode 100644 index 0000000..16d4874 --- /dev/null +++ b/aqhome/ipc2/tcp_object.h @@ -0,0 +1,24 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2025 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_TCP_OBJECT_H +#define AQH_TCP_OBJECT_H + +#include + +/** + * Helper function to create and connect a TCP socket. + */ +AQHOME_API int AQH_TcpObject_CreateConnectedSocket(const char *addr, int port); + + + + + +#endif + diff --git a/aqhome/ipc2/tcpd_object.c b/aqhome/ipc2/tcpd_object.c index 9d5e369..07bda63 100644 --- a/aqhome/ipc2/tcpd_object.c +++ b/aqhome/ipc2/tcpd_object.c @@ -47,6 +47,8 @@ static void GWENHYWFAR_CB _freeData(void *bp, void *p); static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2); static int _handleSocketReady(AQH_OBJECT *o); +static void _cbEnable(AQH_OBJECT *o); +static void _cbDisable(AQH_OBJECT *o); static int _socketSetReuseAddress(int sk, int fl); static int _socketSetBlocking(int sk, int fl); @@ -71,16 +73,12 @@ AQH_OBJECT *AQH_TcpdObject_new(AQH_EVENT_LOOP *eventLoop, int fd) GWEN_NEW_OBJECT(AQH_TCPD_OBJECT, xo); GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o, xo, _freeData); AQH_Object_SetSignalHandlerFn(o, _handleSignal); + AQH_Object_SetEnableFn(o, _cbEnable); + AQH_Object_SetDisableFn(o, _cbDisable); xo->fdSocket=fd; xo->fdObject=AQH_FdObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ); - -#if 0 - /* create object for readable socket, connect to THIS, enable */ - xo->fdObject=AQH_FdObject_new(AQH_Object_GetEventLoop(o), rv, AQH_FDOBJECT_FDMODE_READ); AQH_Object_AddLink(xo->fdObject, AQH_FDOBJECT_SIGNAL_ISREADY, AQH_TCPD_OBJECT_SLOT_SOCKETREADY, o); - AQH_Object_Enable(xo->fdObject); -#endif return o; } @@ -178,17 +176,19 @@ int _handleSocketReady(AQH_OBJECT *o) { AQH_TCPD_OBJECT *xo; + DBG_ERROR(NULL, "Socket ready"); xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o); if (xo) { int clientSk; clientSk=_acceptConnection(xo->fdSocket); if (clientSk<0) { - DBG_INFO(AQH_LOGDOMAIN, "here (%d)", clientSk); + DBG_ERROR(AQH_LOGDOMAIN, "here (%d)", clientSk); } else { + DBG_ERROR(AQH_LOGDOMAIN, "New connection"); if (0==AQH_Object_EmitSignal(o, AQH_TCPD_OBJECT_SIGNAL_NEWCONN, clientSk, NULL)) { - DBG_INFO(AQH_LOGDOMAIN, "New connection not handled"); + DBG_ERROR(AQH_LOGDOMAIN, "New connection not handled"); close(clientSk); } } @@ -199,6 +199,32 @@ int _handleSocketReady(AQH_OBJECT *o) +void _cbEnable(AQH_OBJECT *o) +{ + if (o) { + AQH_TCPD_OBJECT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o); + if (xo && xo->fdObject) + AQH_Object_Enable(xo->fdObject); + } +} + + + +void _cbDisable(AQH_OBJECT *o) +{ + if (o) { + AQH_TCPD_OBJECT *xo; + + xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_TCPD_OBJECT, o); + if (xo && xo->fdObject) + AQH_Object_Disable(xo->fdObject); + } +} + + + int _setHostAddr(struct in_addr *inetAddr, const char *sAddr) { inetAddr->s_addr=0; diff --git a/aqhome/libtest.c b/aqhome/libtest.c index f0e97e4..ba10c61 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -509,7 +509,7 @@ int testTty(int argc, char **argv) AQH_Object_Enable(msgReaderObject); for (;;) { - AQH_EventLoop_Run(eventLoop); + AQH_EventLoop_Run(eventLoop, 500); } return 0; @@ -547,7 +547,7 @@ int testTty2(int argc, char **argv) endpointObject=AQH_TtyEndpoint2_new(eventLoop, fd); for (;;) { - AQH_EventLoop_Run(eventLoop); + AQH_EventLoop_Run(eventLoop, 500); } return 0; diff --git a/aqhome/msg/ipc/data/m_ipcd_values.c b/aqhome/msg/ipc/data/m_ipcd_values.c index d11024d..9f11c78 100644 --- a/aqhome/msg/ipc/data/m_ipcd_values.c +++ b/aqhome/msg/ipc/data/m_ipcd_values.c @@ -59,7 +59,7 @@ AQH_MESSAGE *AQH_IpcdMessageValues_new(uint16_t code, uint32_t msgId, uint32_t r -AQH_VALUE_LIST *AQH_IpcdMessageDevices_ReadValueList(const GWEN_TAG16_LIST *tagList) +AQH_VALUE_LIST *AQH_IpcdMessageValues_ReadValueList(const GWEN_TAG16_LIST *tagList) { if (tagList) { AQH_VALUE_LIST *valueList; diff --git a/aqhome/msg/ipc/data/m_ipcd_values.h b/aqhome/msg/ipc/data/m_ipcd_values.h index f083ca2..a0afafd 100644 --- a/aqhome/msg/ipc/data/m_ipcd_values.h +++ b/aqhome/msg/ipc/data/m_ipcd_values.h @@ -29,7 +29,7 @@ AQHOME_API AQH_MESSAGE *AQH_IpcdMessageValues_new(uint16_t code, uint32_t msgId, uint32_t refMsgId, uint32_t flags, const AQH_VALUE_LIST *valueList); -AQHOME_API AQH_VALUE_LIST *AQH_IpcdMessageDevices_ReadValueList(const GWEN_TAG16_LIST *tagList); +AQHOME_API AQH_VALUE_LIST *AQH_IpcdMessageValues_ReadValueList(const GWEN_TAG16_LIST *tagList); AQHOME_API AQH_VALUE *AQH_IpcdMessageValues_ReadFirstValue(const GWEN_TAG16_LIST *tagList); AQHOME_API uint32_t AQH_IpcdMessageValues_GetFlags(const GWEN_TAG16_LIST *tagList); diff --git a/aqhome/msg/ipc/m_ipc.c b/aqhome/msg/ipc/m_ipc.c index 7832106..47f76c6 100644 --- a/aqhome/msg/ipc/m_ipc.c +++ b/aqhome/msg/ipc/m_ipc.c @@ -143,28 +143,3 @@ void AQH_IpcMessage_DumpToBuffer(const AQH_MESSAGE *msg, GWEN_BUFFER *dbuf, cons -const char *AQH_IpcMessage_MsgTypeToChar(uint16_t i) -{ - switch(i) { - case AQH_MSGTYPE_IPC_DATA_RESULT: return "Result"; - case AQH_MSGTYPE_IPC_DATA_CONNECT_REQ: return "Connect(Req)"; - case AQH_MSGTYPE_IPC_DATA_UPDATEDATA: return "UpdateData"; - case AQH_MSGTYPE_IPC_DATA_DATACHANGED: return "DataChanged"; - case AQH_MSGTYPE_IPC_DATA_SETDATA: return "SetData"; - case AQH_MSGTYPE_IPC_DATA_ADDVALUE: return "AddValue"; - case AQH_MSGTYPE_IPC_DATA_GETDATA_REQ: return "GetData(Req)"; - case AQH_MSGTYPE_IPC_DATA_GETDATA_RSP: return "GetData(Rsp)"; - case AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ: return "GetLastData(Req)"; - case AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP: return "GetLastData(Rsp)"; - case AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ: return "GetValues(Req)"; - case AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP: return "GetValues(Rsp)"; - case AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ: return "GetDevices(Req)"; - case AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP: return "GetDevices(Rsp)"; - case AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ: return "ModDevice(Req)"; - case AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE: return "AnnounceValue"; - default: return "(unknown)"; - } -} - - - diff --git a/aqhome/msg/ipc/m_ipc.h b/aqhome/msg/ipc/m_ipc.h index 35af1de..d110f4d 100644 --- a/aqhome/msg/ipc/m_ipc.h +++ b/aqhome/msg/ipc/m_ipc.h @@ -26,38 +26,6 @@ -#define AQH_IPC_PROTOCOL_DATA_ID 2 -#define AQH_IPC_PROTOCOL_DATA_VERSION 1 - - - -#define AQH_MSGTYPE_IPC_DATA_RESULT 0x0001 /* AQH_ResultIpcMsg */ - -#define AQH_MSGTYPE_IPC_DATA_CONNECT_REQ 0x0010 /* serviceName, userName, password */ - -#define AQH_MSGTYPE_IPC_DATA_UPDATEDATA 0x0100 /* AQH_MultiDataDataIpcMsg */ -#define AQH_MSGTYPE_IPC_DATA_DATACHANGED 0x0200 /* AQH_MultiDataDataIpcMsg */ - -#define AQH_MSGTYPE_IPC_DATA_SETDATA 0x0300 /* AQH_SetDataIpcMsg */ - -#define AQH_MSGTYPE_IPC_DATA_ADDVALUE 0x0400 /* AQH_AddValueDataIpcMsg */ - -#define AQH_MSGTYPE_IPC_DATA_GETDATA_REQ 0x0500 /* AQH_GetDataDataIpcMsg */ -#define AQH_MSGTYPE_IPC_DATA_GETDATA_RSP 0x0600 /* AQH_MultiDataDataIpcMsg */ - -#define AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ 0x0700 /* AQH_GetDataDataIpcMsg */ -#define AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP 0x0800 /* AQH_MultiDataDataIpcMsg */ - -#define AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ 0x0900 /* GWEN_IpcMsg */ -#define AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP 0x0a00 /* AQH_ValuesDataIpcMsg */ - -#define AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ 0x0b00 /* GWEN_IpcMsg */ -#define AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP 0x0c00 /* AQH_DevicesDataIpcMsg */ - -#define AQH_MSGTYPE_IPC_DATA_MODDEVICE_REQ 0x0d00 /* AQH_DevicesDataIpcMsg */ - -#define AQH_MSGTYPE_IPC_DATA_ANNOUNCEVALUE 0x0e00 /* AQH_ValuesDataIpcMsg */ - AQHOME_API AQH_MESSAGE *AQH_IpcMessage_new(uint8_t protoId, uint8_t protoVer, uint16_t code, @@ -80,9 +48,6 @@ AQHOME_API uint8_t *AQH_IpcMessage_GetPayloadPointer(const AQH_MESSAGE *msg); AQHOME_API void AQH_IpcMessage_DumpToBuffer(const AQH_MESSAGE *msg, GWEN_BUFFER *dbuf, const char *sText); -AQHOME_API const char *AQH_IpcMessage_MsgTypeToChar(uint16_t i); - - diff --git a/aqhome/msg/ipc/m_ipc_result.c b/aqhome/msg/ipc/m_ipc_result.c index 01cb7b8..b06c709 100644 --- a/aqhome/msg/ipc/m_ipc_result.c +++ b/aqhome/msg/ipc/m_ipc_result.c @@ -53,6 +53,13 @@ AQH_MESSAGE *AQH_IpcMessageResult_new(uint8_t protoId, uint8_t protoVer, uint16_ +uint32_t AQH_IpcMessageResult_GetResult(const GWEN_TAG16_LIST *tagList) +{ + return tagList?AQH_Tag16_GetTagDataAsUint32(tagList, AQH_MSGDATA_RESULT_TAGS_RESULT, 0):0; +} + + + void AQH_IpcMessageResult_DumpToBuffer(const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList, GWEN_BUFFER *dbuf, const char *sText) { int result=0; @@ -64,8 +71,7 @@ void AQH_IpcMessageResult_DumpToBuffer(const AQH_MESSAGE *msg, const GWEN_TAG16_ } GWEN_Buffer_AppendArgs(dbuf, - "RESULT(%s) %s (code=%d, proto=%d, proto version=%d, result=%d, text=\"%s\")\n", - AQH_IpcMessage_MsgTypeToChar(AQH_IpcMessage_GetCode(msg)), + "RESULT %s (code=%d, proto=%d, proto version=%d, result=%d, text=\"%s\")\n", sText?sText:"", AQH_IpcMessage_GetCode(msg), AQH_IpcMessage_GetProtoId(msg), diff --git a/aqhome/msg/ipc/m_ipc_result.h b/aqhome/msg/ipc/m_ipc_result.h index 0aab871..e95d170 100644 --- a/aqhome/msg/ipc/m_ipc_result.h +++ b/aqhome/msg/ipc/m_ipc_result.h @@ -17,6 +17,18 @@ #include +#define AQH_MSGDATA_RESULT_SUCCESS 0 +#define AQH_MSGDATA_RESULT_ERROR_GENERIC 1 +#define AQH_MSGDATA_RESULT_ERROR_INVALID 2 +#define AQH_MSGDATA_RESULT_ERROR_EXISTS 3 +#define AQH_MSGDATA_RESULT_ERROR_NODATA 4 +#define AQH_MSGDATA_RESULT_ERROR_BADDATA 5 +#define AQH_MSGDATA_RESULT_ERROR_PERMS 6 +#define AQH_MSGDATA_RESULT_ERROR_NOTFOUND 7 +#define AQH_MSGDATA_RESULT_ERROR_IO 8 +#define AQH_MSGDATA_RESULT_ERROR_TRYAGAIN 9 + + #define AQH_MSGDATA_RESULT_TAGS_RESULT 0x0001 #define AQH_MSGDATA_RESULT_TAGS_TEXT 0x0002 @@ -26,6 +38,9 @@ AQHOME_API AQH_MESSAGE *AQH_IpcMessageResult_new(uint8_t protoId, uint8_t protoVer, uint16_t code, uint32_t msgId, uint32_t refMsgId, int result, const char *text); + +AQHOME_API uint32_t AQH_IpcMessageResult_GetResult(const GWEN_TAG16_LIST *tagList); + AQHOME_API void AQH_IpcMessageResult_DumpToBuffer(const AQH_MESSAGE *msg, const GWEN_TAG16_LIST *tagList, GWEN_BUFFER *dbuf, const char *sText);