aqhome-data: use requests for SETDATA ipc command.

This commit is contained in:
Martin Preuss
2024-10-01 22:04:01 +02:00
parent 9c2001285b
commit 6f5da8ee6c
5 changed files with 204 additions and 35 deletions

View File

@@ -27,6 +27,7 @@ AQHOME_DATA *AqHomeData_new()
GWEN_NEW_OBJECT(AQHOME_DATA, aqh);
aqh->storageMutex=GWEN_Mutex_new();
aqh->requestTree=GWEN_MsgRequest_new();
return aqh;
}
@@ -38,6 +39,7 @@ void AqHomeData_free(AQHOME_DATA *aqh)
if (aqh) {
GWEN_Mutex_free(aqh->storageMutex);
GWEN_MsgRequest_free(aqh->requestTree);
GWEN_MsgEndpoint_free(aqh->ipcdEndpoint);
GWEN_DB_Group_free(aqh->dbArgs);
AQH_Storage_free(aqh->storage);
@@ -70,6 +72,21 @@ AQH_STORAGE *AqHomeData_GetStorage(const AQHOME_DATA *aqh)
GWEN_MSG_REQUEST *AqHomeData_GetRequestTree(const AQHOME_DATA *aqh)
{
return aqh?aqh->requestTree:NULL;
}
void AqHomeData_AddRequestToTree(AQHOME_DATA *aqh, GWEN_MSG_REQUEST *rq)
{
if (aqh && rq)
GWEN_MsgRequest_Tree2_AddChild(aqh->requestTree, rq);
}
const char *AqHomeData_GetPidFile(const AQHOME_DATA *aqh)
{
return aqh?aqh->pidFile:NULL;

View File

@@ -13,6 +13,7 @@
#include "aqhome/data/storage.h"
#include <gwenhywfar/endpoint.h>
#include <gwenhywfar/request.h>
@@ -37,6 +38,8 @@ int AqHomeData_GetTimeout(const AQHOME_DATA *aqh);
int AqHomeData_LockStorage(AQHOME_DATA *aqh);
int AqHomeData_UnlockStorage(AQHOME_DATA *aqh);
GWEN_MSG_REQUEST *AqHomeData_GetRequestTree(const AQHOME_DATA *aqh);
void AqHomeData_AddRequestToTree(AQHOME_DATA *aqh, GWEN_MSG_REQUEST *rq);

View File

@@ -36,6 +36,7 @@ struct AQHOME_DATA {
GWEN_MUTEX *storageMutex;
GWEN_MSG_REQUEST *requestTree;
};

View File

@@ -15,6 +15,7 @@
#include "./aqhome_data_p.h"
#include "./loop.h"
#include "aqhome/aqhome.h"
#include "aqhome/ipc/requests.h"
#include "aqhome/ipc/data/ipc_data.h"
#include "aqhome/ipc/data/msg_data_set.h"
#include "aqhome/ipc/endpoint_ipc.h"
@@ -30,6 +31,8 @@
* ------------------------------------------------------------------------------------------------
*/
#define R_SETDATA_REQUEST_EXPIRE_SECS 20
#define R_SETDATA_SUBREQUEST_EXPIRE_SECS 10
@@ -38,7 +41,16 @@
* ------------------------------------------------------------------------------------------------
*/
static int _forwardDataToDriver(AQHOME_DATA *aqh, const AQH_VALUE *v, const char *data);
static GWEN_MSG_REQUEST *_mkRequest_SetData(AQHOME_DATA *aqh,
GWEN_MSG_ENDPOINT *epSrc, uint32_t requestMsgId,
GWEN_MSG_ENDPOINT *epDriver,
const AQH_VALUE *v, const char *data);
static void _rqSubRequestFinished(GWEN_MSG_REQUEST *rq, GWEN_MSG_REQUEST *subRq, int reason);
static void _rqAbort(GWEN_MSG_REQUEST *rq);
static GWEN_MSG_REQUEST *_mkSubRequest_SetData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epDriver, const AQH_VALUE *v, const char *data);
static int _subRqHandleResponse(GWEN_MSG_REQUEST *rq, GWEN_MSG *msg);
static void _subRqAbort(GWEN_MSG_REQUEST *rq);
@@ -49,13 +61,15 @@ static int _forwardDataToDriver(AQHOME_DATA *aqh, const AQH_VALUE *v, const char
void AqHomeData_HandleSetData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epSrc, GWEN_MSG *recvdMsg)
{
GWEN_MSG *outMsg;
int resultCode=AQH_MSG_IPC_SUCCESS;
uint32_t msgId;
AQH_VALUE *recvdValue;
const char *valueName;
char *valueDataFreeable;
AQH_VALUE *systemValue;
msgId=GWEN_IpcMsg_GetMsgId(recvdMsg);
DBG_ERROR(NULL, "Received IPC SetDataRequest message (msgId=%d)", msgId);
AQH_SetDataIpcMsg_Parse(recvdMsg, 0);
recvdValue=AQH_SetDataIpcMsg_ReadValue(recvdMsg);
valueName=recvdValue?AQH_Value_GetNameForSystem(recvdValue):NULL;
@@ -64,57 +78,181 @@ void AqHomeData_HandleSetData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epSrc, GWEN_M
systemValue=AQH_Storage_GetValueByNameForSystem(aqh->storage, valueName);
if (systemValue) {
if (AQH_Value_GetValueType(systemValue)==AQH_ValueType_Actor) {
resultCode=_forwardDataToDriver(aqh, systemValue, valueDataFreeable);
const char *driverName;
driverName=AQH_Value_GetDriver(systemValue);
if (driverName && *driverName) {
GWEN_MSG_ENDPOINT *epDriver;
epDriver=AqHomeData_GetIpcEndpointByServiceName(aqh, driverName);
if (epDriver) {
GWEN_MSG_REQUEST *rq;
DBG_ERROR(NULL, "Creating SETDATA request for driver endpoint (%s)", GWEN_MsgEndpoint_GetName(epDriver));
rq=_mkRequest_SetData(aqh, epSrc, msgId, epDriver, systemValue, valueDataFreeable);
AqHomeData_AddRequestToTree(aqh, rq);
}
else {
DBG_ERROR(NULL, "Driver \"%s\" not available", driverName);
AQH_IpcEndpoint_SendResponseResult(epSrc, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC);
}
}
else {
DBG_ERROR(NULL, "No driver name");
AQH_IpcEndpoint_SendResponseResult(epSrc, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC);
}
}
else {
DBG_INFO(NULL, "Value \"%s\" is not an actor", valueName);
resultCode=AQH_MSG_IPC_ERROR_INVALID;
DBG_ERROR(NULL, "Value \"%s\" is not an actor", valueName);
AQH_IpcEndpoint_SendResponseResult(epSrc, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_INVALID);
}
}
else {
DBG_INFO(NULL, "Unknown value \"%s\"", valueName);
resultCode=AQH_MSG_IPC_ERROR_INVALID;
DBG_ERROR(NULL, "Unknown value \"%s\"", valueName);
AQH_IpcEndpoint_SendResponseResult(epSrc, msgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_NOTFOUND);
}
AQH_Value_free(recvdValue);
free(valueDataFreeable);
outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT,
GWEN_MsgEndpoint_GetNextMessageId(epSrc), GWEN_IpcMsg_GetMsgId(recvdMsg),
resultCode);
GWEN_MsgEndpoint_AddSendMessage(epSrc, outMsg);
}
int _forwardDataToDriver(AQHOME_DATA *aqh, const AQH_VALUE *v, const char *data)
/* ------------------------------------------------------------------------------------------------
* IPC Request SETDATA
*/
GWEN_MSG_REQUEST *_mkRequest_SetData(AQHOME_DATA *aqh,
GWEN_MSG_ENDPOINT *epSrc, uint32_t requestMsgId,
GWEN_MSG_ENDPOINT *epDriver,
const AQH_VALUE *v, const char *data)
{
const char *driverName;
GWEN_MSG_REQUEST *rq;
GWEN_MSG_REQUEST *subRq;
driverName=AQH_Value_GetDriver(v);
if (driverName && *driverName) {
GWEN_MSG_ENDPOINT *ep;
rq=GWEN_MsgRequest_new();
GWEN_MsgRequest_SetPrivateData(rq, aqh);
GWEN_MsgRequest_SetEndpoint(rq, epSrc);
GWEN_MsgRequest_SetRequestMsgId(rq, requestMsgId);
GWEN_MsgRequest_SetSubRequestFinishedFn(rq, _rqSubRequestFinished);
GWEN_MsgRequest_SetAbortFn(rq, _rqAbort);
GWEN_MsgRequest_SetTimestamps(rq, R_SETDATA_REQUEST_EXPIRE_SECS);
ep=AqHomeData_GetIpcEndpointByServiceName(aqh, driverName);
if (ep) {
GWEN_MSG *driverMsg;
subRq=_mkSubRequest_SetData(aqh, epDriver, v, data);
GWEN_MsgRequest_Tree2_AddChild(rq, subRq);
DBG_INFO(AQH_LOGDOMAIN, "Sending SETDATA msg to driver endpoint (%s)", GWEN_MsgEndpoint_GetName(ep));
driverMsg=AQH_SetDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_SETDATA,
GWEN_MsgEndpoint_GetNextMessageId(ep), 0,
v, data);
GWEN_MsgEndpoint_AddSendMessage(ep, driverMsg);
return AQH_MSG_IPC_SUCCESS;
}
else {
DBG_INFO(NULL, "Driver \"%s\" not available", driverName);
return AQH_MSG_IPC_ERROR_GENERIC;
}
return rq;
}
void _rqSubRequestFinished(GWEN_MSG_REQUEST *rq, GWEN_MSG_REQUEST *subRq, int reason)
{
GWEN_MSG_ENDPOINT *ep;
uint32_t refMsgId;
int result;
DBG_ERROR(NULL, "SubRequest finished (reason: %d)", reason);
refMsgId=GWEN_MsgRequest_GetRequestMsgId(rq);
ep=GWEN_MsgRequest_GetEndpoint(rq);
result=GWEN_MsgRequest_GetResult(subRq);
if (reason==GWEN_MSG_REQUEST_REASON_ABORTED)
AQH_IpcEndpoint_SendResponseResult(ep, refMsgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC);
else
AQH_IpcEndpoint_SendResponseResult(ep, refMsgId, AQH_MSGTYPE_IPC_DATA_RESULT, result);
GWEN_MsgRequest_SetResult(rq, result);
GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE);
}
void _rqAbort(GWEN_MSG_REQUEST *rq)
{
GWEN_MSG_ENDPOINT *ep;
uint32_t refMsgId;
GWEN_MSG_REQUEST *rqParent;
DBG_INFO(NULL, "Aborting request");
refMsgId=GWEN_MsgRequest_GetRequestMsgId(rq);
ep=GWEN_MsgRequest_GetEndpoint(rq);
AQH_IpcEndpoint_SendResponseResult(ep, refMsgId, AQH_MSGTYPE_IPC_DATA_RESULT, AQH_MSG_IPC_ERROR_GENERIC);
GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE);
rqParent=GWEN_MsgRequest_Tree2_GetParent(rq);
if (rqParent)
GWEN_MsgRequest_SubRequestFinished(rqParent, rq, GWEN_MSG_REQUEST_REASON_ABORTED);
}
/* ------------------------------------------------------------------------------------------------
* Driver Request SETDATA
*/
GWEN_MSG_REQUEST *_mkSubRequest_SetData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epDriver, const AQH_VALUE *v, const char *data)
{
GWEN_MSG_REQUEST *rq;
uint16_t msgId;
GWEN_MSG *driverMsg;
rq=GWEN_MsgRequest_new();
GWEN_MsgRequest_SetPrivateData(rq, aqh);
GWEN_MsgRequest_SetEndpoint(rq, epDriver);
GWEN_MsgRequest_SetHandleResponseFn(rq, _subRqHandleResponse);
GWEN_MsgRequest_SetAbortFn(rq, _subRqAbort);
msgId=GWEN_MsgEndpoint_GetNextMessageId(epDriver);
GWEN_MsgRequest_SetRequestMsgId(rq, msgId);
GWEN_MsgRequest_SetTimestamps(rq, R_SETDATA_SUBREQUEST_EXPIRE_SECS);
driverMsg=AQH_SetDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_SETDATA, msgId, 0, v, data);
GWEN_MsgEndpoint_AddSendMessage(epDriver, driverMsg);
return rq;
}
int _subRqHandleResponse(GWEN_MSG_REQUEST *rq, GWEN_MSG *msg)
{
DBG_ERROR(NULL, "Checking message from driver");
if (GWEN_IpcMsg_GetCode(msg)==AQH_MSGTYPE_IPC_DATA_RESULT) {
uint32_t result;
GWEN_MSG_REQUEST *rqParent;
result=AQH_ResultIpcMsg_GetResultCode(msg);
DBG_ERROR(NULL, "Received result for request: %d", result);
GWEN_MsgRequest_SetResult(rq, result);
GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE);
rqParent=GWEN_MsgRequest_Tree2_GetParent(rq);
if (rqParent)
GWEN_MsgRequest_SubRequestFinished(rqParent, rq, GWEN_MSG_REQUEST_REASON_DONE);
return GWEN_MSG_REQUEST_RESULT_HANDLED;
}
else {
DBG_INFO(NULL, "No driver name");
return AQH_MSG_IPC_ERROR_GENERIC;
DBG_ERROR(NULL, "Unexpected response message %d", GWEN_IpcMsg_GetCode(msg));
}
return GWEN_MSG_REQUEST_RESULT_NOT_HANDLED;
}
void _subRqAbort(GWEN_MSG_REQUEST *rq)
{
GWEN_MSG_REQUEST *rqParent;
DBG_ERROR(NULL, "Aborting request");
GWEN_MsgRequest_SetResult(rq, AQH_MSG_IPC_ERROR_GENERIC);
GWEN_MsgRequest_SetState(rq, GWEN_MSG_REQUEST_STATE_DONE);
rqParent=GWEN_MsgRequest_Tree2_GetParent(rq);
if (rqParent)
GWEN_MsgRequest_SubRequestFinished(rqParent, rq, GWEN_MSG_REQUEST_REASON_ABORTED);
}

View File

@@ -28,6 +28,7 @@
#include "aqhome/ipc/data/msg_data_datapoints.h"
#include "aqhome/ipc/endpoint_ipc.h"
#include "aqhome/ipc/msg_ipc_result.h"
#include "aqhome/ipc/requests.h"
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/args.h>
@@ -67,6 +68,9 @@ void AqHomeData_Loop(AQHOME_DATA *aqh, int timeoutInMsecs)
if (aqh) {
GWEN_MsgEndpoint_IoLoop(aqh->ipcdEndpoint, timeoutInMsecs);
_readAndHandleIpcMessages(aqh);
AQH_Requests_CheckTimeouts(aqh->requestTree);
AQH_Requests_Cleanup(aqh->requestTree);
}
}
@@ -210,7 +214,13 @@ void _handleIpcEndpoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep)
GWEN_MSG *msg;
while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep)) ) {
_handleIpcMsg(aqh, ep, msg);
DBG_ERROR(NULL, "Got IPS message %d (msgId=%d, refMsgId=%d) [%s]",
GWEN_IpcMsg_GetCode(msg),
GWEN_IpcMsg_GetMsgId(msg),
GWEN_IpcMsg_GetRefMsgId(msg),
GWEN_MsgEndpoint_GetName(ep));
if (AQH_Requests_HandleIpcMsg(aqh->requestTree, ep, msg)!=GWEN_MSG_REQUEST_RESULT_HANDLED)
_handleIpcMsg(aqh, ep, msg);
GWEN_Msg_free(msg);
}
}