diff --git a/apps/aqhome-data/aqhome_data.c b/apps/aqhome-data/aqhome_data.c index 950cce0..4223084 100644 --- a/apps/aqhome-data/aqhome_data.c +++ b/apps/aqhome-data/aqhome_data.c @@ -27,6 +27,7 @@ AQHOME_DATA *AqHomeData_new() GWEN_NEW_OBJECT(AQHOME_DATA, aqh); aqh->storageMutex=GWEN_Mutex_new(); + aqh->requestTree=GWEN_MsgRequest_new(); return aqh; } @@ -38,6 +39,7 @@ void AqHomeData_free(AQHOME_DATA *aqh) if (aqh) { GWEN_Mutex_free(aqh->storageMutex); + GWEN_MsgRequest_free(aqh->requestTree); GWEN_MsgEndpoint_free(aqh->ipcdEndpoint); GWEN_DB_Group_free(aqh->dbArgs); AQH_Storage_free(aqh->storage); @@ -70,6 +72,21 @@ AQH_STORAGE *AqHomeData_GetStorage(const AQHOME_DATA *aqh) +GWEN_MSG_REQUEST *AqHomeData_GetRequestTree(const AQHOME_DATA *aqh) +{ + return aqh?aqh->requestTree:NULL; +} + + + +void AqHomeData_AddRequestToTree(AQHOME_DATA *aqh, GWEN_MSG_REQUEST *rq) +{ + if (aqh && rq) + GWEN_MsgRequest_Tree2_AddChild(aqh->requestTree, rq); +} + + + const char *AqHomeData_GetPidFile(const AQHOME_DATA *aqh) { return aqh?aqh->pidFile:NULL; diff --git a/apps/aqhome-data/aqhome_data.h b/apps/aqhome-data/aqhome_data.h index 60e0227..17503f3 100644 --- a/apps/aqhome-data/aqhome_data.h +++ b/apps/aqhome-data/aqhome_data.h @@ -13,6 +13,7 @@ #include "aqhome/data/storage.h" #include +#include @@ -37,6 +38,8 @@ int AqHomeData_GetTimeout(const AQHOME_DATA *aqh); int AqHomeData_LockStorage(AQHOME_DATA *aqh); int AqHomeData_UnlockStorage(AQHOME_DATA *aqh); +GWEN_MSG_REQUEST *AqHomeData_GetRequestTree(const AQHOME_DATA *aqh); +void AqHomeData_AddRequestToTree(AQHOME_DATA *aqh, GWEN_MSG_REQUEST *rq); diff --git a/apps/aqhome-data/aqhome_data_p.h b/apps/aqhome-data/aqhome_data_p.h index c10e08f..65a2dc5 100644 --- a/apps/aqhome-data/aqhome_data_p.h +++ b/apps/aqhome-data/aqhome_data_p.h @@ -36,6 +36,7 @@ struct AQHOME_DATA { GWEN_MUTEX *storageMutex; + GWEN_MSG_REQUEST *requestTree; }; diff --git a/apps/aqhome-data/c_setdata.c b/apps/aqhome-data/c_setdata.c index 487ee96..9143f36 100644 --- a/apps/aqhome-data/c_setdata.c +++ b/apps/aqhome-data/c_setdata.c @@ -15,6 +15,7 @@ #include "./aqhome_data_p.h" #include "./loop.h" #include "aqhome/aqhome.h" +#include "aqhome/ipc/requests.h" #include "aqhome/ipc/data/ipc_data.h" #include "aqhome/ipc/data/msg_data_set.h" #include "aqhome/ipc/endpoint_ipc.h" @@ -30,6 +31,8 @@ * ------------------------------------------------------------------------------------------------ */ +#define R_SETDATA_REQUEST_EXPIRE_SECS 20 +#define R_SETDATA_SUBREQUEST_EXPIRE_SECS 10 @@ -38,7 +41,16 @@ * ------------------------------------------------------------------------------------------------ */ -static int _forwardDataToDriver(AQHOME_DATA *aqh, const AQH_VALUE *v, const char *data); +static GWEN_MSG_REQUEST *_mkRequest_SetData(AQHOME_DATA *aqh, + GWEN_MSG_ENDPOINT *epSrc, uint32_t requestMsgId, + GWEN_MSG_ENDPOINT *epDriver, + const AQH_VALUE *v, const char *data); +static void _rqSubRequestFinished(GWEN_MSG_REQUEST *rq, GWEN_MSG_REQUEST *subRq, int reason); +static void _rqAbort(GWEN_MSG_REQUEST *rq); + +static GWEN_MSG_REQUEST *_mkSubRequest_SetData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epDriver, const AQH_VALUE *v, const char *data); +static int _subRqHandleResponse(GWEN_MSG_REQUEST *rq, GWEN_MSG *msg); +static void _subRqAbort(GWEN_MSG_REQUEST *rq); @@ -49,13 +61,15 @@ static int _forwardDataToDriver(AQHOME_DATA *aqh, const AQH_VALUE *v, const char void AqHomeData_HandleSetData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epSrc, GWEN_MSG *recvdMsg) { - GWEN_MSG *outMsg; - int resultCode=AQH_MSG_IPC_SUCCESS; + uint32_t msgId; AQH_VALUE *recvdValue; const char *valueName; char *valueDataFreeable; AQH_VALUE *systemValue; + msgId=GWEN_IpcMsg_GetMsgId(recvdMsg); + DBG_ERROR(NULL, "Received IPC SetDataRequest message (msgId=%d)", msgId); + AQH_SetDataIpcMsg_Parse(recvdMsg, 0); recvdValue=AQH_SetDataIpcMsg_ReadValue(recvdMsg); valueName=recvdValue?AQH_Value_GetNameForSystem(recvdValue):NULL; @@ -64,57 +78,181 @@ void AqHomeData_HandleSetData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epSrc, GWEN_M systemValue=AQH_Storage_GetValueByNameForSystem(aqh->storage, valueName); if (systemValue) { if (AQH_Value_GetValueType(systemValue)==AQH_ValueType_Actor) { - resultCode=_forwardDataToDriver(aqh, systemValue, valueDataFreeable); + const char *driverName; + + driverName=AQH_Value_GetDriver(systemValue); + if (driverName && *driverName) { + GWEN_MSG_ENDPOINT *epDriver; + + epDriver=AqHomeData_GetIpcEndpointByServiceName(aqh, driverName); + if (epDriver) { + GWEN_MSG_REQUEST *rq; + + DBG_ERROR(NULL, "Creating SETDATA request for driver endpoint (%s)", GWEN_MsgEndpoint_GetName(epDriver)); + rq=_mkRequest_SetData(aqh, epSrc, msgId, epDriver, systemValue, valueDataFreeable); + AqHomeData_AddRequestToTree(aqh, rq); + } + else { + DBG_ERROR(NULL, "Driver \"%s\" not available", driverName); + AQH_IpcEndpoint_SendResponseResult(epSrc, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC); + } + } + else { + DBG_ERROR(NULL, "No driver name"); + AQH_IpcEndpoint_SendResponseResult(epSrc, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC); + } } else { - DBG_INFO(NULL, "Value \"%s\" is not an actor", valueName); - resultCode=AQH_MSG_IPC_ERROR_INVALID; + DBG_ERROR(NULL, "Value \"%s\" is not an actor", valueName); + AQH_IpcEndpoint_SendResponseResult(epSrc, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_INVALID); } } else { - DBG_INFO(NULL, "Unknown value \"%s\"", valueName); - resultCode=AQH_MSG_IPC_ERROR_INVALID; + DBG_ERROR(NULL, "Unknown value \"%s\"", valueName); + AQH_IpcEndpoint_SendResponseResult(epSrc, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_NOTFOUND); } AQH_Value_free(recvdValue); free(valueDataFreeable); - - outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, - GWEN_MsgEndpoint_GetNextMessageId(epSrc), GWEN_IpcMsg_GetMsgId(recvdMsg), - resultCode); - GWEN_MsgEndpoint_AddSendMessage(epSrc, outMsg); } -int _forwardDataToDriver(AQHOME_DATA *aqh, const AQH_VALUE *v, const char *data) +/* ------------------------------------------------------------------------------------------------ + * IPC Request SETDATA + */ + +GWEN_MSG_REQUEST *_mkRequest_SetData(AQHOME_DATA *aqh, + GWEN_MSG_ENDPOINT *epSrc, uint32_t requestMsgId, + GWEN_MSG_ENDPOINT *epDriver, + const AQH_VALUE *v, const char *data) { - const char *driverName; + GWEN_MSG_REQUEST *rq; + GWEN_MSG_REQUEST *subRq; - driverName=AQH_Value_GetDriver(v); - if (driverName && *driverName) { - GWEN_MSG_ENDPOINT *ep; + rq=GWEN_MsgRequest_new(); + GWEN_MsgRequest_SetPrivateData(rq, aqh); + GWEN_MsgRequest_SetEndpoint(rq, epSrc); + GWEN_MsgRequest_SetRequestMsgId(rq, requestMsgId); + GWEN_MsgRequest_SetSubRequestFinishedFn(rq, _rqSubRequestFinished); + GWEN_MsgRequest_SetAbortFn(rq, _rqAbort); + GWEN_MsgRequest_SetTimestamps(rq, R_SETDATA_REQUEST_EXPIRE_SECS); - ep=AqHomeData_GetIpcEndpointByServiceName(aqh, driverName); - if (ep) { - GWEN_MSG *driverMsg; + subRq=_mkSubRequest_SetData(aqh, epDriver, v, data); + GWEN_MsgRequest_Tree2_AddChild(rq, subRq); - DBG_INFO(AQH_LOGDOMAIN, "Sending SETDATA msg to driver endpoint (%s)", GWEN_MsgEndpoint_GetName(ep)); - driverMsg=AQH_SetDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_SETDATA, - GWEN_MsgEndpoint_GetNextMessageId(ep), 0, - v, data); - GWEN_MsgEndpoint_AddSendMessage(ep, driverMsg); - return AQH_MSG_IPC_SUCCESS; - } - else { - DBG_INFO(NULL, "Driver \"%s\" not available", driverName); - return AQH_MSG_IPC_ERROR_GENERIC; - } + return rq; +} + + + +void _rqSubRequestFinished(GWEN_MSG_REQUEST *rq, GWEN_MSG_REQUEST *subRq, int reason) +{ + GWEN_MSG_ENDPOINT *ep; + uint32_t refMsgId; + int result; + + DBG_ERROR(NULL, "SubRequest finished (reason: %d)", reason); + refMsgId=GWEN_MsgRequest_GetRequestMsgId(rq); + ep=GWEN_MsgRequest_GetEndpoint(rq); + result=GWEN_MsgRequest_GetResult(subRq); + + if (reason==GWEN_MSG_REQUEST_REASON_ABORTED) + AQH_IpcEndpoint_SendResponseResult(ep, refMsgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC); + else + AQH_IpcEndpoint_SendResponseResult(ep, refMsgId, AQH_MSGTYPE_IPC_DATA_RESULT, result); + + GWEN_MsgRequest_SetResult(rq, result); + GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE); +} + + + +void _rqAbort(GWEN_MSG_REQUEST *rq) +{ + GWEN_MSG_ENDPOINT *ep; + uint32_t refMsgId; + GWEN_MSG_REQUEST *rqParent; + + DBG_INFO(NULL, "Aborting request"); + refMsgId=GWEN_MsgRequest_GetRequestMsgId(rq); + ep=GWEN_MsgRequest_GetEndpoint(rq); + AQH_IpcEndpoint_SendResponseResult(ep, refMsgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC); + GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE); + + rqParent=GWEN_MsgRequest_Tree2_GetParent(rq); + if (rqParent) + GWEN_MsgRequest_SubRequestFinished(rqParent, rq, GWEN_MSG_REQUEST_REASON_ABORTED); +} + + + +/* ------------------------------------------------------------------------------------------------ + * Driver Request SETDATA + */ + + +GWEN_MSG_REQUEST *_mkSubRequest_SetData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epDriver, const AQH_VALUE *v, const char *data) +{ + GWEN_MSG_REQUEST *rq; + uint16_t msgId; + GWEN_MSG *driverMsg; + + rq=GWEN_MsgRequest_new(); + GWEN_MsgRequest_SetPrivateData(rq, aqh); + GWEN_MsgRequest_SetEndpoint(rq, epDriver); + + GWEN_MsgRequest_SetHandleResponseFn(rq, _subRqHandleResponse); + GWEN_MsgRequest_SetAbortFn(rq, _subRqAbort); + + msgId=GWEN_MsgEndpoint_GetNextMessageId(epDriver); + GWEN_MsgRequest_SetRequestMsgId(rq, msgId); + GWEN_MsgRequest_SetTimestamps(rq, R_SETDATA_SUBREQUEST_EXPIRE_SECS); + + driverMsg=AQH_SetDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_SETDATA, msgId, 0, v, data); + GWEN_MsgEndpoint_AddSendMessage(epDriver, driverMsg); + + return rq; +} + + + +int _subRqHandleResponse(GWEN_MSG_REQUEST *rq, GWEN_MSG *msg) +{ + DBG_ERROR(NULL, "Checking message from driver"); + if (GWEN_IpcMsg_GetCode(msg)==AQH_MSGTYPE_IPC_DATA_RESULT) { + uint32_t result; + GWEN_MSG_REQUEST *rqParent; + + result=AQH_ResultIpcMsg_GetResultCode(msg); + DBG_ERROR(NULL, "Received result for request: %d", result); + GWEN_MsgRequest_SetResult(rq, result); + GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE); + rqParent=GWEN_MsgRequest_Tree2_GetParent(rq); + if (rqParent) + GWEN_MsgRequest_SubRequestFinished(rqParent, rq, GWEN_MSG_REQUEST_REASON_DONE); + return GWEN_MSG_REQUEST_RESULT_HANDLED; } else { - DBG_INFO(NULL, "No driver name"); - return AQH_MSG_IPC_ERROR_GENERIC; + DBG_ERROR(NULL, "Unexpected response message %d", GWEN_IpcMsg_GetCode(msg)); } + + return GWEN_MSG_REQUEST_RESULT_NOT_HANDLED; } +void _subRqAbort(GWEN_MSG_REQUEST *rq) +{ + GWEN_MSG_REQUEST *rqParent; + + DBG_ERROR(NULL, "Aborting request"); + + GWEN_MsgRequest_SetResult(rq, AQH_MSG_IPC_ERROR_GENERIC); + GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE); + + rqParent=GWEN_MsgRequest_Tree2_GetParent(rq); + if (rqParent) + GWEN_MsgRequest_SubRequestFinished(rqParent, rq, GWEN_MSG_REQUEST_REASON_ABORTED); +} + + diff --git a/apps/aqhome-data/loop.c b/apps/aqhome-data/loop.c index cbdb418..275ed86 100644 --- a/apps/aqhome-data/loop.c +++ b/apps/aqhome-data/loop.c @@ -28,6 +28,7 @@ #include "aqhome/ipc/data/msg_data_datapoints.h" #include "aqhome/ipc/endpoint_ipc.h" #include "aqhome/ipc/msg_ipc_result.h" +#include "aqhome/ipc/requests.h" #include #include @@ -67,6 +68,9 @@ void AqHomeData_Loop(AQHOME_DATA *aqh, int timeoutInMsecs) if (aqh) { GWEN_MsgEndpoint_IoLoop(aqh->ipcdEndpoint, timeoutInMsecs); _readAndHandleIpcMessages(aqh); + + AQH_Requests_CheckTimeouts(aqh->requestTree); + AQH_Requests_Cleanup(aqh->requestTree); } } @@ -210,7 +214,13 @@ void _handleIpcEndpoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep) GWEN_MSG *msg; while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep)) ) { - _handleIpcMsg(aqh, ep, msg); + DBG_ERROR(NULL, "Got IPS message %d (msgId=%d, refMsgId=%d) [%s]", + GWEN_IpcMsg_GetCode(msg), + GWEN_IpcMsg_GetMsgId(msg), + GWEN_IpcMsg_GetRefMsgId(msg), + GWEN_MsgEndpoint_GetName(ep)); + if (AQH_Requests_HandleIpcMsg(aqh->requestTree, ep, msg)!=GWEN_MSG_REQUEST_RESULT_HANDLED) + _handleIpcMsg(aqh, ep, msg); GWEN_Msg_free(msg); } }