From 1e27223dfa2ded6225283fdc59820d570542de44 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Sun, 1 Oct 2023 21:31:02 +0200 Subject: [PATCH] Simplified IPC code to use less different IPC messages. Share more code. More qork on MQTT code. --- apps/aqhome-data/c_connect.c | 26 +- apps/aqhome-data/c_connect.h | 2 +- apps/aqhome-data/c_getdatapoints.c | 28 +- apps/aqhome-data/c_getdatapoints.h | 2 +- apps/aqhome-data/c_getlastdatapoint.c | 70 +++-- apps/aqhome-data/c_getlastdatapoint.h | 2 +- apps/aqhome-data/c_getvalues.c | 2 +- apps/aqhome-data/c_setdata.c | 1 + apps/aqhome-data/c_updatedata.c | 41 +-- apps/aqhome-data/c_updatedata.h | 2 +- apps/aqhome-data/loop.c | 54 +++- apps/aqhome-data/loop.h | 4 + apps/aqhome-mqttlog/0BUILD | 2 + apps/aqhome-mqttlog/aqhome_mqtt.c | 2 - apps/aqhome-mqttlog/fini.c | 80 ++++++ apps/aqhome-mqttlog/fini.h | 23 ++ apps/aqhome-mqttlog/init.c | 4 + apps/aqhome-mqttlog/mqtt.c | 6 +- apps/aqhome-nodes/loop_tty_broker.c | 49 +++- apps/aqhome-tool/data/adddata.c | 8 +- apps/aqhome-tool/data/getlastdatapoint.c | 49 ++-- apps/aqhome-tool/data/getvalues.c | 34 +-- apps/aqhome-tool/data/setdata.c | 2 +- apps/aqhome-tool/utils.c | 17 +- aqhome/data/device.t2d | 54 ++-- aqhome/data/value.t2d | 14 + aqhome/ipc/0BUILD | 1 + aqhome/ipc/data/ipc_data.c | 242 ++++++++++++++++ aqhome/ipc/data/ipc_data.h | 14 + aqhome/ipc/data/msg_data_connect.c | 30 +- aqhome/ipc/data/msg_data_connect.h | 2 + aqhome/ipc/data/msg_data_getdata.c | 24 +- aqhome/ipc/data/msg_data_getdata.h | 2 + aqhome/ipc/data/msg_data_multidata.c | 93 ++++-- aqhome/ipc/data/msg_data_multidata.h | 15 +- aqhome/ipc/data/msg_data_singledata.c | 42 ++- aqhome/ipc/data/msg_data_singledata.h | 4 + aqhome/ipc/data/msg_data_value.c | 16 ++ aqhome/ipc/data/msg_data_value.h | 6 +- aqhome/ipc/data/msg_data_values.c | 274 +++++------------- aqhome/ipc/data/msg_data_values.h | 18 +- aqhome/ipc/msg_ipc_tag16.c | 182 +++++++++++- aqhome/ipc/msg_ipc_tag16.h | 17 +- aqhome/ipc/msg_ipc_tag16_p.h | 25 ++ aqhome/libtest.c | 52 +++- aqhome/mqtt/endpoint_mqttc.c | 347 +++++++++-------------- aqhome/mqtt/endpoint_mqttc.h | 2 + aqhome/mqtt/msg_mqtt.c | 5 + aqhome/mqtt/msg_mqtt_suback.c | 32 +++ aqhome/mqtt/msg_mqtt_suback.h | 1 + 50 files changed, 1326 insertions(+), 698 deletions(-) create mode 100644 apps/aqhome-mqttlog/fini.c create mode 100644 apps/aqhome-mqttlog/fini.h create mode 100644 aqhome/ipc/msg_ipc_tag16_p.h diff --git a/apps/aqhome-data/c_connect.c b/apps/aqhome-data/c_connect.c index 5284306..3b0c629 100644 --- a/apps/aqhome-data/c_connect.c +++ b/apps/aqhome-data/c_connect.c @@ -44,34 +44,29 @@ * ------------------------------------------------------------------------------------------------ */ -void AqHomeData_HandleConnect(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) +void AqHomeData_HandleConnect(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg) { GWEN_MSG *outMsg; int resultCode=AQH_MSG_IPC_SUCCESS; - GWEN_TAG16_LIST *tagList; char *clientId=NULL; char *userId=NULL; char *passw=NULL; + uint32_t flags; - tagList=AQH_Tag16IpcMsg_ParseTags(msg, 0); - if (tagList) { - const GWEN_TAG16 *tag; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_CONNECT_TAGS_CLIENTID); - clientId=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_CONNECT_TAGS_USERID); - userId=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_CONNECT_TAGS_PASSWORD); - passw=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - } + AQH_ConnectDataIpcMsg_Parse(msg, 0); + clientId=AQH_Tag16IpcMsg_GetTagDataAsNewString(msg, AQH_MSGDATA_CONNECT_TAGS_CLIENTID, NULL); + userId=AQH_Tag16IpcMsg_GetTagDataAsNewString(msg, AQH_MSGDATA_CONNECT_TAGS_USERID, NULL); + flags=AQH_Tag16IpcMsg_GetTagDataAsUint32(msg, AQH_MSGDATA_CONNECT_TAGS_FLAGS, 0); + passw=AQH_Tag16IpcMsg_GetTagDataAsNewString(msg, AQH_MSGDATA_CONNECT_TAGS_PASSWORD, NULL); if (clientId) AQH_IpcEndpoint_SetServiceName(ep, clientId); if (userId) AQH_IpcEndpoint_SetUserName(ep, userId); + if (flags & AQH_MSGDATA_CONNECT_FLAGS_WANTUPDATES) + GWEN_MsgEndpoint_AddFlags(ep, AQH_IPCENDPOINT_FLAGS_WANTUPDATES); + /* TODO: add user management, for now we allow all */ AQH_IpcEndpoint_SetPermissions(ep, AQH_IPCENDPOINT_PERMS_LISTVALUES | @@ -83,7 +78,6 @@ void AqHomeData_HandleConnect(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWE free(passw); free(userId); free(clientId); - GWEN_Tag16_List_free(tagList); outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode); GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); diff --git a/apps/aqhome-data/c_connect.h b/apps/aqhome-data/c_connect.h index bfc8550..c6a3792 100644 --- a/apps/aqhome-data/c_connect.h +++ b/apps/aqhome-data/c_connect.h @@ -13,7 +13,7 @@ #include "./aqhome_data.h" -void AqHomeData_HandleConnect(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); +void AqHomeData_HandleConnect(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg); diff --git a/apps/aqhome-data/c_getdatapoints.c b/apps/aqhome-data/c_getdatapoints.c index 8963f49..a15d116 100644 --- a/apps/aqhome-data/c_getdatapoints.c +++ b/apps/aqhome-data/c_getdatapoints.c @@ -48,32 +48,22 @@ static int _getAndSendDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const */ -void AqHomeData_HandleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *recvdMsg) +void AqHomeData_HandleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *recvdMsg) { GWEN_MSG *outMsg; int resultCode=AQH_MSG_IPC_SUCCESS; if (AQH_IpcEndpoint_GetPermissions(ep) & AQH_IPCENDPOINT_PERMS_READDATA) { - GWEN_TAG16_LIST *tagList; AQH_VALUE *value; - char *valueName=NULL; - uint64_t tsBegin=0; - uint64_t tsEnd=0; - - tagList=AQH_Tag16IpcMsg_ParseTags(recvdMsg, 0); - if (tagList) { - const GWEN_TAG16 *tag; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_GETDATA_TAGS_NAME); - valueName=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_GETDATA_TAGS_BEGIN); - tsBegin=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_GETDATA_TAGS_END); - tsEnd=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0; - } + char *valueName; + uint64_t tsBegin; + uint64_t tsEnd; + AQH_GetDataDataIpcMsg_Parse(recvdMsg, 0); + valueName=AQH_Tag16IpcMsg_GetTagDataAsNewString(recvdMsg, AQH_MSGDATA_GETDATA_TAGS_NAME, NULL); + tsBegin=AQH_Tag16IpcMsg_GetTagDataAsUint64(recvdMsg, AQH_MSGDATA_GETDATA_TAGS_BEGIN, 0); + tsEnd=AQH_Tag16IpcMsg_GetTagDataAsUint64(recvdMsg, AQH_MSGDATA_GETDATA_TAGS_END, 0); + value=AQH_Storage_GetValueByNameForSystem(aqh->storage, valueName); if (value) { resultCode=_getAndSendDataPoints(aqh, ep, value, tsBegin, tsEnd); diff --git a/apps/aqhome-data/c_getdatapoints.h b/apps/aqhome-data/c_getdatapoints.h index b89fa5b..3add821 100644 --- a/apps/aqhome-data/c_getdatapoints.h +++ b/apps/aqhome-data/c_getdatapoints.h @@ -13,7 +13,7 @@ #include "./aqhome_data.h" -void AqHomeData_HandleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); +void AqHomeData_HandleGetDataPoints(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg); diff --git a/apps/aqhome-data/c_getlastdatapoint.c b/apps/aqhome-data/c_getlastdatapoint.c index fc44a1b..46013c2 100644 --- a/apps/aqhome-data/c_getlastdatapoint.c +++ b/apps/aqhome-data/c_getlastdatapoint.c @@ -14,8 +14,8 @@ #include "./c_getlastdatapoint.h" #include "./aqhome_data_p.h" #include "aqhome/ipc/data/ipc_data.h" -#include "aqhome/ipc/data/msg_data_value.h" -#include "aqhome/ipc/data/msg_data_singledata.h" +#include "aqhome/ipc/data/msg_data_getdata.h" +#include "aqhome/ipc/data/msg_data_multidata.h" #include "aqhome/ipc/endpoint_ipc.h" #include "aqhome/ipc/msg_ipc_result.h" #include "aqhome/ipc/msg_ipc_tag16.h" @@ -45,54 +45,50 @@ */ -void AqHomeData_HandleGetLastDataPoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *recvdMsg) +void AqHomeData_HandleGetLastDataPoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *recvdMsg) { GWEN_MSG *outMsg; int resultCode=AQH_MSG_IPC_SUCCESS; if (AQH_IpcEndpoint_GetPermissions(ep) & AQH_IPCENDPOINT_PERMS_READDATA) { - GWEN_TAG16_LIST *tagList; - const AQH_VALUE *value; - char *valueName=NULL; - - tagList=AQH_Tag16IpcMsg_ParseTags(recvdMsg, 0); - if (tagList) { - const GWEN_TAG16 *tag; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_VALUE_TAGS_NAME); - valueName=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - } - - value=AQH_Storage_GetValueByNameForSystem(aqh->storage, valueName); - if (value) { - uint64_t timestamp=0; - double data=0.0; - int rv; - - rv=AQH_Storage_GetLastDataPoint(aqh->storage, AQH_Value_GetId(value), ×tamp, &data); - if (rv<0) { - switch(rv) { - case GWEN_ERROR_INVALID: resultCode=AQH_MSG_IPC_ERROR_INVALID; break; - case GWEN_ERROR_NO_DATA: resultCode=AQH_MSG_IPC_ERROR_NODATA; break; - default: resultCode=AQH_MSG_IPC_ERROR_GENERIC; break; + char *valueName; + + AQH_GetDataDataIpcMsg_Parse(recvdMsg, 0); + valueName=AQH_Tag16IpcMsg_GetTagDataAsNewString(recvdMsg, AQH_MSGDATA_GETDATA_TAGS_NAME, NULL); + if (valueName && *valueName) { + const AQH_VALUE *storedValue; + + storedValue=AQH_Storage_GetValueByNameForSystem(aqh->storage, valueName); + if (storedValue) { + uint64_t timestamp=0; + double data=0.0; + int rv; + + rv=AQH_Storage_GetLastDataPoint(aqh->storage, AQH_Value_GetId(storedValue), ×tamp, &data); + if (rv<0) { + switch(rv) { + case GWEN_ERROR_INVALID: resultCode=AQH_MSG_IPC_ERROR_INVALID; break; + case GWEN_ERROR_NO_DATA: resultCode=AQH_MSG_IPC_ERROR_NODATA; break; + default: resultCode=AQH_MSG_IPC_ERROR_GENERIC; break; + } + } + else { + outMsg=AQH_MultiDataDataIpcMsg_newForOne(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP, storedValue, timestamp, data); + GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); + free(valueName); + return; } } else { - outMsg=AQH_SingleDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_RSP, - AQH_Value_GetNameForSystem(value), - AQH_Value_GetValueUnits(value), - AQH_Value_GetValueType(value), - timestamp, data); - GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); - free(valueName); - return; + DBG_INFO(NULL, "Value \"%s\" not found", valueName); + resultCode=AQH_MSG_IPC_ERROR_NOTFOUND; } + free(valueName); } else { - DBG_INFO(NULL, "Value \"%s\" not found", valueName); + DBG_INFO(NULL, "No name for value"); resultCode=AQH_MSG_IPC_ERROR_NOTFOUND; } - free(valueName); } else { DBG_ERROR(AQH_LOGDOMAIN, "No permissions to read data"); diff --git a/apps/aqhome-data/c_getlastdatapoint.h b/apps/aqhome-data/c_getlastdatapoint.h index 972c905..2524c2d 100644 --- a/apps/aqhome-data/c_getlastdatapoint.h +++ b/apps/aqhome-data/c_getlastdatapoint.h @@ -13,7 +13,7 @@ #include "./aqhome_data.h" -void AqHomeData_HandleGetLastDataPoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); +void AqHomeData_HandleGetLastDataPoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg); diff --git a/apps/aqhome-data/c_getvalues.c b/apps/aqhome-data/c_getvalues.c index 2444f40..3b04121 100644 --- a/apps/aqhome-data/c_getvalues.c +++ b/apps/aqhome-data/c_getvalues.c @@ -96,7 +96,7 @@ void _sendValueList(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const AQH_VALUE_LIS { GWEN_MSG *msg; - msg=AQH_ValuesDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP, flags, vl, 1); + msg=AQH_ValuesDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP, flags, vl); GWEN_MsgEndpoint_AddSendMessage(ep, msg); } diff --git a/apps/aqhome-data/c_setdata.c b/apps/aqhome-data/c_setdata.c index 2700718..4711f37 100644 --- a/apps/aqhome-data/c_setdata.c +++ b/apps/aqhome-data/c_setdata.c @@ -109,6 +109,7 @@ int _forwardDataToDriver(AQHOME_DATA *aqh, const AQH_VALUE *v, uint64_t timestam AQH_Value_GetNameForDriver(v), AQH_Value_GetValueUnits(v), AQH_Value_GetValueType(v), + NULL, timestamp, datapoint); GWEN_MsgEndpoint_AddSendMessage(ep, driverMsg); diff --git a/apps/aqhome-data/c_updatedata.c b/apps/aqhome-data/c_updatedata.c index 50d0035..5d92828 100644 --- a/apps/aqhome-data/c_updatedata.c +++ b/apps/aqhome-data/c_updatedata.c @@ -48,39 +48,28 @@ static void _sendDataChangedMsgToAllClients(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT * ------------------------------------------------------------------------------------------------ */ -void AqHomeData_HandleUpdateData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *recvdMsg) +void AqHomeData_HandleUpdateData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *recvdMsg) { GWEN_MSG *outMsg; int resultCode=AQH_MSG_IPC_SUCCESS; - GWEN_TAG16_LIST *tagList; - char *valueName=NULL; - char *valueUnits=NULL; - int valueType; + const GWEN_TAG16 *tag; + AQH_VALUE *recvdValue; + const char *valueName; const uint64_t *dataPoints=NULL; unsigned int numberOfPoints=0; - tagList=AQH_Tag16IpcMsg_ParseTags(recvdMsg, 0); - if (tagList) { - const GWEN_TAG16 *tag; + AQH_MultiDataDataIpcMsg_Parse(recvdMsg, 0); - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_MULTIDATA_TAGS_NAME); - valueName=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_MULTIDATA_TAGS_UNITS); - valueUnits=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_MULTIDATA_TAGS_TYPE); - valueType=tag?GWEN_Tag16_GetTagDataAsUint32(tag, 0):0; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_MULTIDATA_TAGS_DATA); - dataPoints=(const uint64_t*)GWEN_Tag16_GetTagData(tag); - numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t)); - } + recvdValue=AQH_MultiDataDataIpcMsg_ReadValue(recvdMsg); + valueName=recvdValue?AQH_Value_GetNameForDriver(recvdValue):NULL; + tag=AQH_Tag16IpcMsg_FindFirstTagByType(recvdMsg, AQH_MSGDATA_MULTIDATA_TAGS_DATA); + dataPoints=tag?((const uint64_t*)GWEN_Tag16_GetTagData(tag)):NULL; + numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t)); if (numberOfPoints>0) { AQH_VALUE *value; - value=AqHomeData_GetOrCreateValueForDriver(aqh, ep, valueName, valueUnits, valueType); + value=AqHomeData_GetOrCreateValueForDriverWithTemplate(aqh, ep, recvdValue); if (value) { resultCode=_storeDataPoints(aqh, value, dataPoints, numberOfPoints); if (resultCode==AQH_MSG_IPC_SUCCESS) @@ -95,7 +84,7 @@ void AqHomeData_HandleUpdateData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const DBG_INFO(NULL, "No datapoints"); resultCode=AQH_MSG_IPC_ERROR_INVALID; } - free(valueName); + AQH_Value_free(recvdValue); outMsg=AQH_ResultIpcMsg_new(AQH_MSGTYPE_IPC_DATA_RESULT, resultCode); GWEN_MsgEndpoint_AddSendMessage(ep, outMsg); @@ -141,11 +130,7 @@ void _sendDataChangedMsgToAllClients(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *epSrc, GWEN_MSG *msg; DBG_DEBUG(AQH_LOGDOMAIN, "Sending update msg to endpoint %s", GWEN_MsgEndpoint_GetName(ep)); - msg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_DATACHANGED, - AQH_Value_GetNameForSystem(v), - AQH_Value_GetValueUnits(v), - AQH_Value_GetValueType(v), - dataPoints, numValues); + msg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_DATACHANGED, v, dataPoints, numValues); GWEN_MsgEndpoint_AddSendMessage(ep, msg); } else { diff --git a/apps/aqhome-data/c_updatedata.h b/apps/aqhome-data/c_updatedata.h index 34ba363..ca09fd2 100644 --- a/apps/aqhome-data/c_updatedata.h +++ b/apps/aqhome-data/c_updatedata.h @@ -13,7 +13,7 @@ #include "./aqhome_data.h" -void AqHomeData_HandleUpdateData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *recvdMsg); +void AqHomeData_HandleUpdateData(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *recvdMsg); diff --git a/apps/aqhome-data/loop.c b/apps/aqhome-data/loop.c index 1734184..7e4d345 100644 --- a/apps/aqhome-data/loop.c +++ b/apps/aqhome-data/loop.c @@ -48,7 +48,7 @@ static void _readAndHandleIpcMessages(AQHOME_DATA *aqh); static void _handleIpcEndpoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep); -static void _handleIpcMsg(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg); +static void _handleIpcMsg(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg); @@ -110,14 +110,7 @@ AQH_VALUE *AqHomeData_GetOrCreateValueForDriver(AQHOME_DATA *aqh, serviceName=AQH_IpcEndpoint_GetServiceName(epDriver); buf=GWEN_Buffer_new(0, 256, 0, 1); - if (serviceName && *serviceName) { - GWEN_Buffer_AppendString(buf, serviceName); - GWEN_Buffer_AppendString(buf, "/"); - } - else { - GWEN_Buffer_AppendString(buf, "unknown/"); - } - GWEN_Buffer_AppendString(buf, nameForDriver); + GWEN_Buffer_AppendArgs(buf, "%s/%s", (serviceName && *serviceName)?serviceName:"unknown", nameForDriver); v=AQH_Storage_GetValueByNameForSystem(aqh->storage, GWEN_Buffer_GetStart(buf)); if (v==NULL) { @@ -143,6 +136,47 @@ AQH_VALUE *AqHomeData_GetOrCreateValueForDriver(AQHOME_DATA *aqh, +AQH_VALUE *AqHomeData_GetOrCreateValueForDriverWithTemplate(AQHOME_DATA *aqh, + GWEN_MSG_ENDPOINT *epDriver, + const AQH_VALUE *valueTemplate) +{ + const char *serviceName; + AQH_VALUE *v; + GWEN_BUFFER *buf; + const char *nameForDriver; + + serviceName=AQH_IpcEndpoint_GetServiceName(epDriver); + nameForDriver=AQH_Value_GetNameForDriver(valueTemplate); + + buf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Buffer_AppendArgs(buf, "%s/%s", (serviceName && *serviceName)?serviceName:"unknown", nameForDriver); + + v=AQH_Storage_GetValueByNameForSystem(aqh->storage, GWEN_Buffer_GetStart(buf)); + if (v==NULL) { + if (AQH_IpcEndpoint_GetPermissions(epDriver) & AQH_IPCENDPOINT_PERMS_ADDVALUE) { + /* TODO: get or create device */ + DBG_INFO(AQH_LOGDOMAIN, "Creating value \"%s\"", GWEN_Buffer_GetStart(buf)); + v=AQH_Value_new(); + AQH_Value_SetDriver(v, serviceName); + AQH_Value_SetNameForDriver(v, AQH_Value_GetNameForDriver(valueTemplate)); + AQH_Value_SetNameForSystem(v, GWEN_Buffer_GetStart(buf)); + AQH_Value_SetValueUnits(v, AQH_Value_GetValueUnits(valueTemplate)); + AQH_Value_SetValueType(v, AQH_Value_GetValueType(valueTemplate)); + AQH_Value_SetTimestampCreation(v, (uint64_t) time(NULL)); + AQH_Storage_AddValue(aqh->storage, v); + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "No permissions to create value \"%s\"", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + return NULL; + } + } + GWEN_Buffer_free(buf); + return v; +} + + + void _readAndHandleIpcMessages(AQHOME_DATA *aqh) { if (aqh->ipcdEndpoint) { @@ -170,7 +204,7 @@ void _handleIpcEndpoint(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep) -void _handleIpcMsg(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, const GWEN_MSG *msg) +void _handleIpcMsg(AQHOME_DATA *aqh, GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg) { uint16_t code; diff --git a/apps/aqhome-data/loop.h b/apps/aqhome-data/loop.h index a60a1b3..abfe230 100644 --- a/apps/aqhome-data/loop.h +++ b/apps/aqhome-data/loop.h @@ -23,6 +23,10 @@ AQH_VALUE *AqHomeData_GetOrCreateValueForDriver(AQHOME_DATA *aqh, const char *units, int valueType); +AQH_VALUE *AqHomeData_GetOrCreateValueForDriverWithTemplate(AQHOME_DATA *aqh, + GWEN_MSG_ENDPOINT *epDriver, + const AQH_VALUE *valueTemplate); + #endif diff --git a/apps/aqhome-mqttlog/0BUILD b/apps/aqhome-mqttlog/0BUILD index 2fae7ed..226bfe9 100644 --- a/apps/aqhome-mqttlog/0BUILD +++ b/apps/aqhome-mqttlog/0BUILD @@ -46,6 +46,7 @@ init.h + fini.h aqhome_mqtt.h aqhome_mqtt_p.h mqtt.h @@ -56,6 +57,7 @@ $(local/typefiles) aqhome_mqtt.c init.c + fini.c main.c mqtt.c messages.c diff --git a/apps/aqhome-mqttlog/aqhome_mqtt.c b/apps/aqhome-mqttlog/aqhome_mqtt.c index bb5a418..293f929 100644 --- a/apps/aqhome-mqttlog/aqhome_mqtt.c +++ b/apps/aqhome-mqttlog/aqhome_mqtt.c @@ -30,8 +30,6 @@ AQHOME_MQTT *AqHomeMqtt_new() GWEN_NEW_OBJECT(AQHOME_MQTT, aqh); - aqh->rootEndpoint=GWEN_MsgEndpoint_new("root", 0); - return aqh; } diff --git a/apps/aqhome-mqttlog/fini.c b/apps/aqhome-mqttlog/fini.c new file mode 100644 index 0000000..2209428 --- /dev/null +++ b/apps/aqhome-mqttlog/fini.c @@ -0,0 +1,80 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "./fini.h" +#include "./aqhome_mqtt_p.h" + +#include +#include +#include +#include + +#include + + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void _disconnectTree(GWEN_MSG_ENDPOINT *ep); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +void AqHomeMqtt_Fini(AQHOME_MQTT *aqh) +{ + if (aqh) { + if (aqh->rootEndpoint) + _disconnectTree(aqh->rootEndpoint); + GWEN_MsgEndpoint_free(aqh->rootEndpoint); + aqh->rootEndpoint=NULL; + aqh->brokerEndpoint=NULL; + aqh->mqttEndpoint=NULL; + + if (aqh->pidFile) + remove(aqh->pidFile); + } +} + + + +void _disconnectTree(GWEN_MSG_ENDPOINT *ep) +{ + GWEN_MSG_ENDPOINT *epChild; + + epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep); + while(epChild) { + _disconnectTree(epChild); + epChild=GWEN_MsgEndpoint_Tree2_GetNext(epChild); + } /* while */ + + GWEN_MsgEndpoint_Disconnect(ep); +} + + + + diff --git a/apps/aqhome-mqttlog/fini.h b/apps/aqhome-mqttlog/fini.h new file mode 100644 index 0000000..7ec8e1f --- /dev/null +++ b/apps/aqhome-mqttlog/fini.h @@ -0,0 +1,23 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQHOMEMQTT_FINI_H +#define AQHOMEMQTT_FINI_H + + +#include "./aqhome_mqtt.h" + + + +void AqHomeMqtt_Fini(AQHOME_MQTT *aqh); + + + +#endif + + diff --git a/apps/aqhome-mqttlog/init.c b/apps/aqhome-mqttlog/init.c index c3ee1ce..ce20ccb 100644 --- a/apps/aqhome-mqttlog/init.c +++ b/apps/aqhome-mqttlog/init.c @@ -73,6 +73,8 @@ int AqHomeMqtt_Init(AQHOME_MQTT *aqh, int argc, char **argv) } aqh->dbArgs=dbArgs; + aqh->timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0); + s=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, AQHOME_MQTT_DEFAULT_PIDFILE); if (s && *s) { AqHomeMqtt_SetPidFile(aqh, s); @@ -83,6 +85,8 @@ int AqHomeMqtt_Init(AQHOME_MQTT *aqh, int argc, char **argv) } } + aqh->rootEndpoint=GWEN_MsgEndpoint_new("root", 0); + rv=_setupMqtt(aqh, dbArgs); if (rv<0) { DBG_ERROR(NULL, "Error setting up connection to broker (%d)", rv); diff --git a/apps/aqhome-mqttlog/mqtt.c b/apps/aqhome-mqttlog/mqtt.c index 4ab30b7..fc69bea 100644 --- a/apps/aqhome-mqttlog/mqtt.c +++ b/apps/aqhome-mqttlog/mqtt.c @@ -22,8 +22,9 @@ #include #include -#include +#include #include +#include #include #include @@ -75,6 +76,7 @@ GWEN_MSG_ENDPOINT *AqHomeMqttLog_CreateMqttEndpoint(GWEN_DB_NODE *dbArgs) return NULL; } AQH_MqttClientEndpoint_SetKeepAliveTime(epMqtt, mqttKeepAlive); + GWEN_MsgEndpoint_AddFlags(epMqtt, AQH_ENDPOINT2_MQTTCLIENT_FLAGS_SUBSCRIBEALL); return epMqtt; } @@ -89,7 +91,7 @@ int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT *epTcp) if (GWEN_MsgEndpoint_GetState(epTcp)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { int rv; - rv=AQH_MqttClientEndpoint_StartConnect(epTcp); + rv=GWEN_MultilayerEndpoint_StartConnect(epTcp); if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { DBG_ERROR(NULL, "Error starting to connect (%d)", rv); return rv; diff --git a/apps/aqhome-nodes/loop_tty_broker.c b/apps/aqhome-nodes/loop_tty_broker.c index bb96635..23befaa 100644 --- a/apps/aqhome-nodes/loop_tty_broker.c +++ b/apps/aqhome-nodes/loop_tty_broker.c @@ -52,6 +52,8 @@ static void _processSendStatsMessage(AQHOMED *aqh, const GWEN_MSG *nodeMsg); static void _processRecvStatsMessage(AQHOMED *aqh, const GWEN_MSG *nodeMsg); static void _publishInt(AQHOMED *aqh, uint32_t uid, int valueId, const char *valueUnits, const char *valuePath, int v); static void _publishDouble(AQHOMED *aqh, uint32_t uid, int valueId, const char *valueUnits, const char *valuePath, double v); +static void _setValueNameForDriver(AQH_VALUE *value, uint32_t uid, int valueId, const char *valuePath); +static void _setDeviceName(AQH_VALUE *value, uint32_t uid); @@ -162,29 +164,58 @@ void _publishInt(AQHOMED *aqh, uint32_t uid, int valueId, const char *valueUnits void _publishDouble(AQHOMED *aqh, uint32_t uid, int valueId, const char *valueUnits, const char *valuePath, double v) { - GWEN_BUFFER *bufTopic; GWEN_MSG *pubMsg; union {double f; uint64_t i;} u; uint64_t arrayToSend[2]; + AQH_VALUE *value; u.f=v; arrayToSend[0]=(uint64_t) time(NULL); arrayToSend[1]=u.i; - bufTopic=GWEN_Buffer_new(0, 64, 0, 1); - if (valueId>0) - GWEN_Buffer_AppendArgs(bufTopic, "%08x/%d/%s", uid, valueId, valuePath); - else - GWEN_Buffer_AppendArgs(bufTopic, "%08x/%s", uid, valuePath); + value=AQH_Value_new(); + _setValueNameForDriver(value, uid, valueId, valuePath); + AQH_Value_SetValueUnits(value, valueUnits); + AQH_Value_SetValueType(value, 0); + _setDeviceName(value, uid); - pubMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, GWEN_Buffer_GetStart(bufTopic), valueUnits, 0, arrayToSend, 1); + pubMsg=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, value, arrayToSend, 1); if (pubMsg) { - DBG_INFO(AQH_LOGDOMAIN, "BROKER PUBLISH %s: %f", GWEN_Buffer_GetStart(bufTopic), v); + DBG_INFO(AQH_LOGDOMAIN, "BROKER PUBLISH %s: %f", AQH_Value_GetNameForDriver(value), v); GWEN_MsgEndpoint_AddSendMessage(aqh->brokerEndpoint, pubMsg); } - GWEN_Buffer_free(bufTopic); + AQH_Value_free(value); +} + + + +void _setValueNameForDriver(AQH_VALUE *value, uint32_t uid, int valueId, const char *valuePath) +{ + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + if (valueId>0) + GWEN_Buffer_AppendArgs(buf, "%08x/%d/%s", uid, valueId, valuePath); + else + GWEN_Buffer_AppendArgs(buf, "%08x/%s", uid, valuePath); + AQH_Value_SetNameForDriver(value, GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); +} + + + +void _setDeviceName(AQH_VALUE *value, uint32_t uid) +{ + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 64, 0, 1); + GWEN_Buffer_AppendArgs(buf, "%08x", uid); + AQH_Value_SetDeviceNameForDriver(value, GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); } + + diff --git a/apps/aqhome-tool/data/adddata.c b/apps/aqhome-tool/data/adddata.c index 89b9cb9..c49584f 100644 --- a/apps/aqhome-tool/data/adddata.c +++ b/apps/aqhome-tool/data/adddata.c @@ -291,12 +291,18 @@ void _sendCommand(GWEN_MSG_ENDPOINT *epTcp, const char *valueName, const char *v GWEN_MSG *msgOut; union {double f; uint64_t i;} u; uint64_t arrayToSend[2]; + AQH_VALUE *value; u.f=dataToSend; arrayToSend[0]=timestampToSend; arrayToSend[1]=u.i; - msgOut=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, valueName, valueUnits, 0, arrayToSend, 1); + value=AQH_Value_new(); + AQH_Value_SetNameForDriver(value, valueName); + AQH_Value_SetValueUnits(value, valueUnits); + + msgOut=AQH_MultiDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, value, arrayToSend, 1); + AQH_Value_free(value); GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); } diff --git a/apps/aqhome-tool/data/getlastdatapoint.c b/apps/aqhome-tool/data/getlastdatapoint.c index 2c78442..f38b156 100644 --- a/apps/aqhome-tool/data/getlastdatapoint.c +++ b/apps/aqhome-tool/data/getlastdatapoint.c @@ -15,8 +15,8 @@ #include "aqhome/msg/msg_node.h" #include "aqhome/ipc/msg_ipc_result.h" -#include "aqhome/ipc/data/msg_data_value.h" -#include "aqhome/ipc/data/msg_data_singledata.h" +#include "aqhome/ipc/data/msg_data_getdata.h" +#include "aqhome/ipc/data/msg_data_multidata.h" #include "aqhome/ipc/data/ipc_data.h" #include "aqhome/ipc/msg_ipc_tag16.h" @@ -41,7 +41,7 @@ static int _doGetLastDataPoint(GWEN_DB_NODE *dbArgs); static void _sendCommand(GWEN_MSG_ENDPOINT *epTcp, const char *valueName); static int _awaitAndHandleResponse(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds); -static int _handleDataResponse(const GWEN_MSG *msg); +static int _handleDataResponse(GWEN_MSG *msg); @@ -208,7 +208,7 @@ void _sendCommand(GWEN_MSG_ENDPOINT *epTcp, const char *valueName) { GWEN_MSG *msgOut; - msgOut=AQH_ValueDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ, valueName, NULL, 0); + msgOut=AQH_GetDataDataIpcMsg_new(AQH_MSGTYPE_IPC_DATA_GETLASTDATA_REQ, valueName, 0, 0); GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut); } @@ -251,35 +251,32 @@ int _awaitAndHandleResponse(GWEN_MSG_ENDPOINT *epTcp, int timeoutInSeconds) -int _handleDataResponse(const GWEN_MSG *msg) +int _handleDataResponse(GWEN_MSG *msg) { - GWEN_TAG16_LIST *tagList; + AQH_VALUE *value; + const GWEN_TAG16 *tag; + const char *valueUnits; + unsigned int numberOfPoints; + const uint64_t *dataPoints; - tagList=AQH_Tag16IpcMsg_ParseTags(msg, 0); - if (tagList) { - const GWEN_TAG16 *tag; - char *valueUnits; + AQH_MultiDataDataIpcMsg_Parse(msg, 0); + value=AQH_MultiDataDataIpcMsg_ReadValue(msg); + valueUnits=value?AQH_Value_GetValueUnits(value):NULL; + + tag=AQH_Tag16IpcMsg_FindFirstTagByType(msg, AQH_MSGDATA_MULTIDATA_TAGS_DATA); + numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t)); + dataPoints=tag?((const uint64_t*) GWEN_Tag16_GetTagData(tag)):NULL; + if (numberOfPoints>0 && dataPoints) { uint64_t timestamp; union {double f; uint64_t i;} u; - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_UNITS); - valueUnits=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_TIME); - timestamp=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_DATA); - u.i=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0; - + timestamp=dataPoints[0]; + u.i=dataPoints[1]; Utils_PrintSingleDataPoint(timestamp, u.f, valueUnits); + } - free(valueUnits); - return 0; - } - else { - DBG_ERROR(NULL, "Invalid message received"); - return 3; - } + AQH_Value_free(value); + return 0; } diff --git a/apps/aqhome-tool/data/getvalues.c b/apps/aqhome-tool/data/getvalues.c index a18d1c6..e158630 100644 --- a/apps/aqhome-tool/data/getvalues.c +++ b/apps/aqhome-tool/data/getvalues.c @@ -176,34 +176,36 @@ int _doGetValues(GWEN_DB_NODE *dbArgs) } code=GWEN_IpcMsg_GetCode(msg); if (code==AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP) { - if (AQH_ValuesDataIpcMsg_IsValid(msg)) { - uint32_t numValues; - uint32_t i; + AQH_VALUE_LIST *valueList; - numValues=AQH_ValuesDataIpcMsg_GetNumValues(msg); - for(i=0; i #include +#include #include #include @@ -257,7 +258,21 @@ void Utils_PrintDataPoints(const uint64_t *dataPoints, uint32_t numValues, const void Utils_PrintSingleDataPoint(uint64_t timestamp, double data, const char *valueUnits) { - fprintf(stdout, "%lu\t%lf\t%s\n", (unsigned long int) timestamp, data, valueUnits?valueUnits:""); + GWEN_TIMESTAMP *ts; + + ts=GWEN_Timestamp_fromLocalTime((time_t) timestamp); + if (ts) + fprintf(stdout, "%04d/%02d/%02d-%02d:%02d:%02d\t%lf\t%s\n", + GWEN_Timestamp_GetYear(ts), + GWEN_Timestamp_GetMonth(ts), + GWEN_Timestamp_GetDay(ts), + GWEN_Timestamp_GetHour(ts), + GWEN_Timestamp_GetMinute(ts), + GWEN_Timestamp_GetSecond(ts), + data, valueUnits?valueUnits:""); + else + fprintf(stdout, "\t%lf\t%s\n", + data, valueUnits?valueUnits:""); } diff --git a/aqhome/data/device.t2d b/aqhome/data/device.t2d index f2b3bd1..64eec70 100644 --- a/aqhome/data/device.t2d +++ b/aqhome/data/device.t2d @@ -37,28 +37,35 @@ with_getbymember - - 0 - 0 - public - - - - - 0 - 0 - public - own with_getbymember - - - + 0 0 public own - + + 0 + 0 + public + own + + + + 0 + 0 + public + own with_getbymember + + + + 0 + 0 + public + own with_getbymember + + + 0 0 public @@ -72,6 +79,21 @@ own + + + 0 + 0 + public + own + + + + 0 + 0 + public + own + + diff --git a/aqhome/data/value.t2d b/aqhome/data/value.t2d index d804333..ac19a22 100644 --- a/aqhome/data/value.t2d +++ b/aqhome/data/value.t2d @@ -73,6 +73,20 @@ own with_getbymember + + 0 + 0 + public + own + + + + 0 + 0 + public + own + + 0 0 diff --git a/aqhome/ipc/0BUILD b/aqhome/ipc/0BUILD index b742089..7cc10f0 100644 --- a/aqhome/ipc/0BUILD +++ b/aqhome/ipc/0BUILD @@ -55,6 +55,7 @@ endpoint_ipc_p.h + msg_ipc_tag16_p.h diff --git a/aqhome/ipc/data/ipc_data.c b/aqhome/ipc/data/ipc_data.c index 94f35cc..91bcdee 100644 --- a/aqhome/ipc/data/ipc_data.c +++ b/aqhome/ipc/data/ipc_data.c @@ -12,5 +12,247 @@ #include +#include +#include + +#include +#include + + +/* ------------------------------------------------------------------------------------------------ + * defines + * ------------------------------------------------------------------------------------------------ + */ + +#define AQH_IPCDATA_VALUE_TAGS_ID 0x01 +#define AQH_IPCDATA_VALUE_TAGS_DRIVER 0x02 +#define AQH_IPCDATA_VALUE_TAGS_NAMEFORDRIVER 0x03 +#define AQH_IPCDATA_VALUE_TAGS_NAMEFORSYSTEM 0x04 +#define AQH_IPCDATA_VALUE_TAGS_TYPE 0x05 +#define AQH_IPCDATA_VALUE_TAGS_UNITS 0x06 +#define AQH_IPCDATA_VALUE_TAGS_TIMEOFCREATION 0x07 +#define AQH_IPCDATA_VALUE_TAGS_DEVFORDRIVER 0x08 +#define AQH_IPCDATA_VALUE_TAGS_DEVFORSYSTEM 0x09 + + + +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void _writeValueFieldsAsTagsToBuffer(const AQH_VALUE *value, GWEN_BUFFER *buf); +static AQH_VALUE *_readValueFromTag(const uint8_t *ptr, uint32_t len); + + + +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ + + + +int AQH_DataIpc_WriteValueListAsTagsToBuffer(unsigned int tagType, const AQH_VALUE_LIST *valueList, GWEN_BUFFER *buf) +{ + if (valueList) { + const AQH_VALUE *value; + + value=AQH_Value_List_First(valueList); + while(value) { + int rv; + + rv=AQH_DataIpc_WriteValueAsTagToBuffer(tagType, value, buf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + GWEN_Buffer_free(buf); + return rv; + } + + value=AQH_Value_List_Next(value); + } + } + return 0; +} + + + +int AQH_DataIpc_WriteValueAsTagToBuffer(unsigned int tagType, const AQH_VALUE *value, GWEN_BUFFER *buf) +{ + int startPos; + int rv; + + startPos=GWEN_Tag16_StartTagInBuffer(tagType, buf); + if (startPos<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", startPos); + return startPos; + } + + _writeValueFieldsAsTagsToBuffer(value, buf); + + rv=GWEN_Tag16_EndTagInBuffer(startPos, buf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return rv; + } + + return 0; +} + + + +void _writeValueFieldsAsTagsToBuffer(const AQH_VALUE *value, GWEN_BUFFER *buf) +{ + const char *s; + + GWEN_Tag16_WriteUint64TagToBuffer(AQH_IPCDATA_VALUE_TAGS_ID, AQH_Value_GetId(value), buf); + + s=AQH_Value_GetDriver(value); + if (s && *s) + GWEN_Tag16_WriteStringTagToBuffer(AQH_IPCDATA_VALUE_TAGS_DRIVER, s, buf); + + s=AQH_Value_GetNameForDriver(value); + if (s && *s) + GWEN_Tag16_WriteStringTagToBuffer(AQH_IPCDATA_VALUE_TAGS_NAMEFORDRIVER, s, buf); + + s=AQH_Value_GetNameForSystem(value); + if (s && *s) + GWEN_Tag16_WriteStringTagToBuffer(AQH_IPCDATA_VALUE_TAGS_NAMEFORSYSTEM, s, buf); + + GWEN_Tag16_WriteUint32TagToBuffer(AQH_IPCDATA_VALUE_TAGS_TYPE, AQH_Value_GetValueType(value), buf); + + s=AQH_Value_GetValueUnits(value); + if (s && *s) + GWEN_Tag16_WriteStringTagToBuffer(AQH_IPCDATA_VALUE_TAGS_UNITS, s, buf); + + GWEN_Tag16_WriteUint64TagToBuffer(AQH_IPCDATA_VALUE_TAGS_TIMEOFCREATION, AQH_Value_GetTimestampCreation(value), buf); + + s=AQH_Value_GetDeviceNameForDriver(value); + if (s && *s) + GWEN_Tag16_WriteStringTagToBuffer(AQH_IPCDATA_VALUE_TAGS_DEVFORDRIVER, s, buf); + + s=AQH_Value_GetDeviceNameForSystem(value); + if (s && *s) + GWEN_Tag16_WriteStringTagToBuffer(AQH_IPCDATA_VALUE_TAGS_DEVFORSYSTEM, s, buf); +} + + + + + + +AQH_VALUE_LIST *AQH_DataIpc_ReadValuesFromTagList(const GWEN_TAG16_LIST *tagList, unsigned int wantedTagType) +{ + AQH_VALUE_LIST *valueList; + const GWEN_TAG16 *tag; + + valueList=AQH_Value_List_new(); + tag=GWEN_Tag16_List_First(tagList); + while(tag) { + unsigned int tagType; + AQH_VALUE *value; + + tagType=GWEN_Tag16_GetTagType(tag); + if (tagType==wantedTagType) { + value=_readValueFromTag((const uint8_t*) GWEN_Tag16_GetTagData(tag), (uint32_t) GWEN_Tag16_GetTagLength(tag)); + if (value) + AQH_Value_List_Add(value, valueList); + } + + tag=GWEN_Tag16_List_Next(tag); + } + + if (AQH_Value_List_GetCount(valueList)<1) { + AQH_Value_List_free(valueList); + return NULL; + } + + return valueList; +} + + + +AQH_VALUE *AQH_DataIpc_ReadValueFromTagList(const GWEN_TAG16_LIST *tagList, unsigned int wantedTagType) +{ + if (tagList) { + const GWEN_TAG16 *tag; + + tag=GWEN_Tag16_List_FindFirstByTagType(tagList, wantedTagType); + return tag?_readValueFromTag((const uint8_t*) GWEN_Tag16_GetTagData(tag), (uint32_t) GWEN_Tag16_GetTagLength(tag)):NULL; + } + return NULL; +} + + + +AQH_VALUE *_readValueFromTag(const uint8_t *ptr, uint32_t len) +{ + GWEN_TAG16_LIST *tagList; + + tagList=GWEN_Tag16_List_fromBuffer(ptr, len, 0); + if (tagList) { + GWEN_TAG16 *tag; + AQH_VALUE *value; + + value=AQH_Value_new(); + tag=GWEN_Tag16_List_First(tagList); + while(tag) { + unsigned int tagType; + char *s; + + tagType=GWEN_Tag16_GetTagType(tag); + switch(tagType) { + case AQH_IPCDATA_VALUE_TAGS_ID: + AQH_Value_SetId(value, GWEN_Tag16_GetTagDataAsUint64(tag, 0)); + break; + case AQH_IPCDATA_VALUE_TAGS_DRIVER: + s=GWEN_Tag16_GetTagDataAsNewString(tag, NULL); + AQH_Value_SetDriver(value, s); + free(s); + break; + case AQH_IPCDATA_VALUE_TAGS_NAMEFORDRIVER: + s=GWEN_Tag16_GetTagDataAsNewString(tag, NULL); + AQH_Value_SetNameForDriver(value, s); + free(s); + break; + case AQH_IPCDATA_VALUE_TAGS_NAMEFORSYSTEM: + s=GWEN_Tag16_GetTagDataAsNewString(tag, NULL); + AQH_Value_SetNameForSystem(value, s); + free(s); + break; + case AQH_IPCDATA_VALUE_TAGS_TYPE: + AQH_Value_SetValueType(value, GWEN_Tag16_GetTagDataAsUint32(tag, 0)); + break; + case AQH_IPCDATA_VALUE_TAGS_UNITS: + s=GWEN_Tag16_GetTagDataAsNewString(tag, NULL); + AQH_Value_SetValueUnits(value, s); + free(s); + break; + case AQH_IPCDATA_VALUE_TAGS_TIMEOFCREATION: + AQH_Value_SetTimestampCreation(value, GWEN_Tag16_GetTagDataAsUint64(tag, 0)); + break; + case AQH_IPCDATA_VALUE_TAGS_DEVFORDRIVER: + s=GWEN_Tag16_GetTagDataAsNewString(tag, NULL); + AQH_Value_SetDeviceNameForDriver(value, s); + free(s); + break; + case AQH_IPCDATA_VALUE_TAGS_DEVFORSYSTEM: + s=GWEN_Tag16_GetTagDataAsNewString(tag, NULL); + AQH_Value_SetDeviceNameForSystem(value, s); + free(s); + break; + default: + DBG_INFO(AQH_LOGDOMAIN, "Unhandled tag typ %d (%02x)", tagType, tagType); + break; + } + tag=GWEN_Tag16_List_Next(tag); + } + GWEN_Tag16_List_free(tagList); + return value; + } + + return NULL; +} + diff --git a/aqhome/ipc/data/ipc_data.h b/aqhome/ipc/data/ipc_data.h index 1c924d9..094d7d0 100644 --- a/aqhome/ipc/data/ipc_data.h +++ b/aqhome/ipc/data/ipc_data.h @@ -11,8 +11,12 @@ #include +#include #include +#include +#include + #define AQH_IPC_PROTOCOL_DATA_ID 2 @@ -41,6 +45,16 @@ + + +/* utils */ +AQHOME_API int AQH_DataIpc_WriteValueListAsTagsToBuffer(unsigned int tagType, const AQH_VALUE_LIST *valueList, GWEN_BUFFER *buf); +AQHOME_API int AQH_DataIpc_WriteValueAsTagToBuffer(unsigned int tagType, const AQH_VALUE *value, GWEN_BUFFER *buf); +AQHOME_API AQH_VALUE_LIST *AQH_DataIpc_ReadValuesFromTagList(const GWEN_TAG16_LIST *tagList, unsigned int wantedTagType); +AQHOME_API AQH_VALUE *AQH_DataIpc_ReadValueFromTagList(const GWEN_TAG16_LIST *tagList, unsigned int wantedTagType); + + + #endif diff --git a/aqhome/ipc/data/msg_data_connect.c b/aqhome/ipc/data/msg_data_connect.c index 4703f91..f38f194 100644 --- a/aqhome/ipc/data/msg_data_connect.c +++ b/aqhome/ipc/data/msg_data_connect.c @@ -49,28 +49,23 @@ GWEN_MSG *AQH_ConnectDataIpcMsg_new(uint16_t code, const char *clientId, const c +void AQH_ConnectDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy) +{ + AQH_Tag16IpcMsg_ExtendAndParse(msg, doCopy); +} + + void AQH_ConnectDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) { if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_CONNECT_MINSIZE) { - GWEN_TAG16_LIST *tagList; - char *clientId=NULL; - char *userId=NULL; - uint32_t flags=0; + char *clientId; + char *userId; + uint32_t flags; - tagList=AQH_Tag16IpcMsg_ParseTags(msg, 0); - if (tagList) { - const GWEN_TAG16 *tag; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_CONNECT_TAGS_CLIENTID); - clientId=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_CONNECT_TAGS_USERID); - userId=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_CONNECT_TAGS_FLAGS); - flags=tag?GWEN_Tag16_GetTagDataAsUint32(tag, 0):0; - } + clientId=AQH_Tag16IpcMsg_GetTagDataAsNewString(msg, AQH_MSGDATA_CONNECT_TAGS_CLIENTID, NULL); + userId=AQH_Tag16IpcMsg_GetTagDataAsNewString(msg, AQH_MSGDATA_CONNECT_TAGS_USERID, NULL); + flags=AQH_Tag16IpcMsg_GetTagDataAsUint32(msg, AQH_MSGDATA_CONNECT_TAGS_FLAGS, 0); GWEN_Buffer_AppendArgs(dbuf, "CONNECT (code=%d, proto=%d, proto version=%d, clientId=%s, userId=%s, flags=%08x)\n", @@ -82,7 +77,6 @@ void AQH_ConnectDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, flags); free(userId); free(clientId); - GWEN_Tag16_List_free(tagList); } } diff --git a/aqhome/ipc/data/msg_data_connect.h b/aqhome/ipc/data/msg_data_connect.h index 9985214..c94044a 100644 --- a/aqhome/ipc/data/msg_data_connect.h +++ b/aqhome/ipc/data/msg_data_connect.h @@ -35,6 +35,8 @@ AQHOME_API GWEN_MSG *AQH_ConnectDataIpcMsg_new(uint16_t code, const char *userId, const char *password, uint32_t flags); +AQHOME_API void AQH_ConnectDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy); + AQHOME_API int AQH_ConnectDataIpcMsg_IsValid(const GWEN_MSG *msg); AQHOME_API void AQH_ConnectDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); diff --git a/aqhome/ipc/data/msg_data_getdata.c b/aqhome/ipc/data/msg_data_getdata.c index bf7e878..33aba6a 100644 --- a/aqhome/ipc/data/msg_data_getdata.c +++ b/aqhome/ipc/data/msg_data_getdata.c @@ -46,28 +46,23 @@ GWEN_MSG *AQH_GetDataDataIpcMsg_new(uint16_t code, const char *valueName, uint64 +void AQH_GetDataDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy) +{ + AQH_Tag16IpcMsg_ExtendAndParse(msg, doCopy); +} + + void AQH_GetDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) { if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_GETDATA_MINSIZE) { - GWEN_TAG16_LIST *tagList; char *valueName=NULL; uint64_t tsBegin=0; uint64_t tsEnd=0; - tagList=AQH_Tag16IpcMsg_ParseTags(msg, 0); - if (tagList) { - const GWEN_TAG16 *tag; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_GETDATA_TAGS_NAME); - valueName=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_GETDATA_TAGS_BEGIN); - tsBegin=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_GETDATA_TAGS_END); - tsEnd=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0; - } + valueName=AQH_Tag16IpcMsg_GetTagDataAsNewString(msg, AQH_MSGDATA_GETDATA_TAGS_NAME, NULL); + tsBegin=AQH_Tag16IpcMsg_GetTagDataAsUint64(msg, AQH_MSGDATA_GETDATA_TAGS_BEGIN, 0); + tsEnd=AQH_Tag16IpcMsg_GetTagDataAsUint64(msg, AQH_MSGDATA_GETDATA_TAGS_END, 0); GWEN_Buffer_AppendArgs(dbuf, "GETDATA (code=%d, proto=%d, proto version=%d, name=%s, tsBegin=%lu, tsEnd=%lu)\n", @@ -78,7 +73,6 @@ void AQH_GetDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, (unsigned long int) tsBegin, (unsigned long int) tsEnd); free(valueName); - GWEN_Tag16_List_free(tagList); } } diff --git a/aqhome/ipc/data/msg_data_getdata.h b/aqhome/ipc/data/msg_data_getdata.h index 88b98fb..2fdb62e 100644 --- a/aqhome/ipc/data/msg_data_getdata.h +++ b/aqhome/ipc/data/msg_data_getdata.h @@ -29,6 +29,8 @@ AQHOME_API GWEN_MSG *AQH_GetDataDataIpcMsg_new(uint16_t code, const char *valueName, uint64_t tsBegin, uint64_t tsEnd); +AQHOME_API void AQH_GetDataDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy); + AQHOME_API void AQH_GetDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); diff --git a/aqhome/ipc/data/msg_data_multidata.c b/aqhome/ipc/data/msg_data_multidata.c index c28cb8e..64f8e47 100644 --- a/aqhome/ipc/data/msg_data_multidata.c +++ b/aqhome/ipc/data/msg_data_multidata.c @@ -27,19 +27,21 @@ -GWEN_MSG *AQH_MultiDataDataIpcMsg_new(uint16_t code, - const char *valueName, const char *valueUnits, int valueType, - const uint64_t *i64Ptr, int numOfDataPoints) +GWEN_MSG *AQH_MultiDataDataIpcMsg_new(uint16_t code, const AQH_VALUE *value, const uint64_t *i64Ptr, int numOfDataPoints) { GWEN_MSG *msg; GWEN_BUFFER *buf; + int rv; buf=GWEN_Buffer_new(0, 256, 0, 1); - if (valueName && *valueName) - GWEN_Tag16_WriteStringTagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_NAME, valueName, buf); - if (valueUnits && *valueUnits) - GWEN_Tag16_WriteStringTagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_UNITS, valueUnits, buf); - GWEN_Tag16_WriteUint32TagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_TYPE, valueType, buf); + + rv=AQH_DataIpc_WriteValueAsTagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_VALUE, value, buf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + GWEN_Buffer_free(buf); + return NULL; + } + if (i64Ptr && numOfDataPoints) GWEN_Tag16_WriteTagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_DATA, (const uint8_t*)i64Ptr, numOfDataPoints*2*sizeof(uint64_t), buf); @@ -51,32 +53,67 @@ GWEN_MSG *AQH_MultiDataDataIpcMsg_new(uint16_t code, +GWEN_MSG *AQH_MultiDataDataIpcMsg_newForOne(uint16_t code, const AQH_VALUE *value, uint64_t timeStamp, double dataPoint) +{ + GWEN_MSG *msg; + GWEN_BUFFER *buf; + int rv; + union {double f; uint64_t i;} u; + uint64_t arrayToSend[2]; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + + rv=AQH_DataIpc_WriteValueAsTagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_VALUE, value, buf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + GWEN_Buffer_free(buf); + return NULL; + } + + arrayToSend[0]=timeStamp; + u.f=dataPoint; + arrayToSend[1]=u.i; + GWEN_Tag16_WriteTagToBuffer(AQH_MSGDATA_MULTIDATA_TAGS_DATA, (const uint8_t*) arrayToSend, 2*sizeof(uint64_t), buf); + + msg=AQH_Tag16IpcMsg_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, code, + GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + return msg; +} + + + +void AQH_MultiDataDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy) +{ + AQH_Tag16IpcMsg_ExtendAndParse(msg, doCopy); +} + + + +AQH_VALUE *AQH_MultiDataDataIpcMsg_ReadValue(const GWEN_MSG *msg) +{ + return AQH_DataIpc_ReadValueFromTagList(AQH_Tag16IpcMsg_GetTags(msg), AQH_MSGDATA_MULTIDATA_TAGS_VALUE); +} + + void AQH_MultiDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) { if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_MULTIDATA_MINSIZE) { - GWEN_TAG16_LIST *tagList; - char *valueName=NULL; - char *valueUnits=NULL; + const GWEN_TAG16 *tag; + AQH_VALUE *value; + const char *valueName; + const char *valueUnits; int valueType; unsigned int numberOfPoints=0; - tagList=AQH_Tag16IpcMsg_ParseTags(msg, 0); - if (tagList) { - const GWEN_TAG16 *tag; + value=AQH_MultiDataDataIpcMsg_ReadValue(msg); + valueName=value?AQH_Value_GetNameForSystem(value):NULL; + valueUnits=value?AQH_Value_GetValueUnits(value):NULL; + valueType=value?AQH_Value_GetValueType(value):0; - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_MULTIDATA_TAGS_NAME); - valueName=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_MULTIDATA_TAGS_UNITS); - valueUnits=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_MULTIDATA_TAGS_TYPE); - valueType=tag?GWEN_Tag16_GetTagDataAsUint32(tag, 0):0; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_MULTIDATA_TAGS_DATA); - numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t)); - } + tag=AQH_Tag16IpcMsg_FindFirstTagByType(msg, AQH_MSGDATA_MULTIDATA_TAGS_DATA); + numberOfPoints=(tag?GWEN_Tag16_GetTagLength(tag):0)/(2*sizeof(uint64_t)); GWEN_Buffer_AppendArgs(dbuf, "MULTIDATA (code=%d, proto=%d, proto version=%d, name=%s, units=%s, type=%d, datapoints=%u)\n", @@ -87,9 +124,7 @@ void AQH_MultiDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf valueUnits?valueUnits:"", valueType, numberOfPoints); - free(valueUnits); - free(valueName); - GWEN_Tag16_List_free(tagList); + AQH_Value_free(value); } } diff --git a/aqhome/ipc/data/msg_data_multidata.h b/aqhome/ipc/data/msg_data_multidata.h index a404771..e63432f 100644 --- a/aqhome/ipc/data/msg_data_multidata.h +++ b/aqhome/ipc/data/msg_data_multidata.h @@ -17,16 +17,17 @@ #include -#define AQH_MSGDATA_MULTIDATA_TAGS_NAME 0x0001 -#define AQH_MSGDATA_MULTIDATA_TAGS_UNITS 0x0002 -#define AQH_MSGDATA_MULTIDATA_TAGS_TYPE 0x0003 -#define AQH_MSGDATA_MULTIDATA_TAGS_DATA 0x0010 +#define AQH_MSGDATA_MULTIDATA_TAGS_VALUE 0xc1 +#define AQH_MSGDATA_MULTIDATA_TAGS_DATA 0xc2 -AQHOME_API GWEN_MSG *AQH_MultiDataDataIpcMsg_new(uint16_t code, - const char *valueName, const char *valueUnits, int valueType, - const uint64_t *i64Ptr, int numOfDataPoints); +AQHOME_API GWEN_MSG *AQH_MultiDataDataIpcMsg_new(uint16_t code, const AQH_VALUE *value, const uint64_t *i64Ptr, int numOfDataPoints); +AQHOME_API GWEN_MSG *AQH_MultiDataDataIpcMsg_newForOne(uint16_t code, const AQH_VALUE *value, uint64_t timeStamp, double dataPoint); + +AQHOME_API void AQH_MultiDataDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy); +AQHOME_API AQH_VALUE *AQH_MultiDataDataIpcMsg_ReadValue(const GWEN_MSG *msg); +AQHOME_API AQH_VALUE *AQH_ValuesDataIpcMsg_ReadFirstValue(const GWEN_MSG *msg); AQHOME_API void AQH_MultiDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); diff --git a/aqhome/ipc/data/msg_data_singledata.c b/aqhome/ipc/data/msg_data_singledata.c index 9ae4e8c..75702d8 100644 --- a/aqhome/ipc/data/msg_data_singledata.c +++ b/aqhome/ipc/data/msg_data_singledata.c @@ -31,6 +31,7 @@ GWEN_MSG *AQH_SingleDataDataIpcMsg_new(uint16_t code, const char *valueName, const char *valueUnits, int valueType, + const char *deviceName, uint64_t timestamp, double datapoint) { @@ -45,6 +46,8 @@ GWEN_MSG *AQH_SingleDataDataIpcMsg_new(uint16_t code, if (valueUnits && *valueUnits) GWEN_Tag16_WriteStringTagToBuffer(AQH_MSGDATA_SINGLEDATA_TAGS_UNITS, valueUnits, buf); GWEN_Tag16_WriteUint32TagToBuffer(AQH_MSGDATA_SINGLEDATA_TAGS_TYPE, valueType, buf); + if (deviceName && *deviceName) + GWEN_Tag16_WriteStringTagToBuffer(AQH_MSGDATA_SINGLEDATA_TAGS_DEV, deviceName, buf); GWEN_Tag16_WriteUint64TagToBuffer(AQH_MSGDATA_SINGLEDATA_TAGS_TIME, timestamp, buf); GWEN_Tag16_WriteUint64TagToBuffer(AQH_MSGDATA_SINGLEDATA_TAGS_DATA, u.i, buf); @@ -56,50 +59,45 @@ GWEN_MSG *AQH_SingleDataDataIpcMsg_new(uint16_t code, +void AQH_SingleDataDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy) +{ + AQH_Tag16IpcMsg_ExtendAndParse(msg, doCopy); +} + + + void AQH_SingleDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) { if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSGDATA_SINGLEDATA_MINSIZE) { - GWEN_TAG16_LIST *tagList; char *valueName=NULL; char *valueUnits=NULL; int valueType; + char *deviceName=NULL; uint64_t timestamp; union {double f; uint64_t i;} u; - tagList=AQH_Tag16IpcMsg_ParseTags(msg, 0); - if (tagList) { - const GWEN_TAG16 *tag; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_NAME); - valueName=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_UNITS); - valueUnits=tag?GWEN_Tag16_GetTagDataAsNewString(tag, NULL):NULL; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_TYPE); - valueType=tag?GWEN_Tag16_GetTagDataAsUint32(tag, 0):0; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_TIME); - timestamp=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0; - - tag=GWEN_Tag16_List_FindFirstByTagType(tagList, AQH_MSGDATA_SINGLEDATA_TAGS_DATA); - u.i=tag?GWEN_Tag16_GetTagDataAsUint64(tag, 0):0; - } + valueName=AQH_Tag16IpcMsg_GetTagDataAsNewString(msg, AQH_MSGDATA_SINGLEDATA_TAGS_NAME, NULL); + valueUnits=AQH_Tag16IpcMsg_GetTagDataAsNewString(msg, AQH_MSGDATA_SINGLEDATA_TAGS_UNITS, NULL); + valueType=AQH_Tag16IpcMsg_GetTagDataAsUint32(msg, AQH_MSGDATA_SINGLEDATA_TAGS_TYPE, 0); + deviceName=AQH_Tag16IpcMsg_GetTagDataAsNewString(msg, AQH_MSGDATA_SINGLEDATA_TAGS_DEV, NULL); + timestamp=AQH_Tag16IpcMsg_GetTagDataAsUint64(msg, AQH_MSGDATA_SINGLEDATA_TAGS_TIME, 0); + u.i=AQH_Tag16IpcMsg_GetTagDataAsUint64(msg, AQH_MSGDATA_SINGLEDATA_TAGS_DATA, 0); GWEN_Buffer_AppendArgs(dbuf, - "SINGLEDATA (code=%d, proto=%d, proto version=%d, name=%s, units=%s, type=%d, time=%lu, data=%f)\n", + "SINGLEDATA (code=%d, proto=%d, proto version=%d, name=%s, units=%s, type=%d, dev=%s, time=%lu, data=%f)\n", GWEN_IpcMsg_GetCode(msg), GWEN_IpcMsg_GetProtoId(msg), GWEN_IpcMsg_GetProtoVersion(msg), valueName?valueName:"", valueUnits?valueUnits:"", valueType, + deviceName?deviceName:"", (unsigned long int) timestamp, u.f); + free(deviceName); free(valueUnits); free(valueName); - GWEN_Tag16_List_free(tagList); } } diff --git a/aqhome/ipc/data/msg_data_singledata.h b/aqhome/ipc/data/msg_data_singledata.h index 23d5b45..0793d9c 100644 --- a/aqhome/ipc/data/msg_data_singledata.h +++ b/aqhome/ipc/data/msg_data_singledata.h @@ -24,6 +24,7 @@ #define AQH_MSGDATA_SINGLEDATA_TAGS_NAME 0x0001 #define AQH_MSGDATA_SINGLEDATA_TAGS_UNITS 0x0002 #define AQH_MSGDATA_SINGLEDATA_TAGS_TYPE 0x0003 +#define AQH_MSGDATA_SINGLEDATA_TAGS_DEV 0x0004 #define AQH_MSGDATA_SINGLEDATA_TAGS_TIME 0x0010 #define AQH_MSGDATA_SINGLEDATA_TAGS_DATA 0x0011 @@ -33,9 +34,12 @@ AQHOME_API GWEN_MSG *AQH_SingleDataDataIpcMsg_new(uint16_t code, const char *valueName, const char *valueUnits, int valueType, + const char *deviceName, uint64_t timestamp, double datapoint); +AQHOME_API void AQH_SingleDataDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy); + AQHOME_API void AQH_SingleDataDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); diff --git a/aqhome/ipc/data/msg_data_value.c b/aqhome/ipc/data/msg_data_value.c index 5c4fbd0..3447b96 100644 --- a/aqhome/ipc/data/msg_data_value.c +++ b/aqhome/ipc/data/msg_data_value.c @@ -36,6 +36,7 @@ GWEN_MSG *AQH_ValueDataIpcMsg_new(uint16_t code, GWEN_BUFFER *buf; buf=GWEN_Buffer_new(0, 256, 0, 1); + if (valueName && *valueName) GWEN_Tag16_WriteStringTagToBuffer(AQH_MSGDATA_VALUE_TAGS_NAME, valueName, buf); if (valueUnits && *valueUnits) @@ -50,6 +51,20 @@ GWEN_MSG *AQH_ValueDataIpcMsg_new(uint16_t code, +void AQH_ValueDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy) +{ + AQH_Tag16IpcMsg_ExtendAndParse(msg, doCopy); +} + + + +AQH_VALUE *AQH_ValueDataIpcMsg_ReadValue(const GWEN_MSG *msg) +{ + //return AQH_DataIpc_ReadValueFromTagList(AQH_Tag16IpcMsg_GetTags(msg), AQH_MSGDATA_VALUES_TAGS_VALUE); +} + + + void AQH_ValueDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) { @@ -59,6 +74,7 @@ void AQH_ValueDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, co char *valueUnits=NULL; int valueType; + tagList=AQH_Tag16IpcMsg_ParseTags(msg, 0); if (tagList) { const GWEN_TAG16 *tag; diff --git a/aqhome/ipc/data/msg_data_value.h b/aqhome/ipc/data/msg_data_value.h index 92c2c08..1c7dd99 100644 --- a/aqhome/ipc/data/msg_data_value.h +++ b/aqhome/ipc/data/msg_data_value.h @@ -27,11 +27,15 @@ -AQHOME_API GWEN_MSG *AQH_ValueDataIpcMsg_new(uint16_t code, +AQHOME_API GWEN_DEPRECATED GWEN_MSG *AQH_ValueDataIpcMsg_new(uint16_t code, const char *valueName, const char *valueUnits, int valueType); +AQHOME_API void AQH_ValueDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy); +AQHOME_API AQH_VALUE *AQH_ValueDataIpcMsg_ReadValue(const GWEN_MSG *msg); + + AQHOME_API void AQH_ValueDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); diff --git a/aqhome/ipc/data/msg_data_values.c b/aqhome/ipc/data/msg_data_values.c index 9906405..6b2b3b8 100644 --- a/aqhome/ipc/data/msg_data_values.c +++ b/aqhome/ipc/data/msg_data_values.c @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -19,230 +20,113 @@ #include #include #include +#include #include -#define AQH_MSGDATA_VALUES_OFFS_FLAGS 0 /* 4 bytes */ -#define AQH_MSGDATA_VALUES_OFFS_NUMVALUES 4 /* 4 bytes */ - -#define AQH_MSGDATA_VALUES_OFFS_VALUES 8 /* 8 byte */ - - -#define AQH_MSGDATA_VALUES_VALUES_OFFS_ID 0 /* 8 byte */ -#define AQH_MSGDATA_VALUES_VALUES_OFFS_NAME 8 /* 104 byte */ -# define AQH_MSGDATA_VALUES_VALUES_SIZE_NAME 104 /* 104 byte */ -#define AQH_MSGDATA_VALUES_VALUES_OFFS_UNITS 112 /* 16 bytes */ -# define AQH_MSGDATA_VALUES_VALUES_SIZE_UNITS 16 -#define AQH_MSGDATA_VALUES_VALUES_SIZE 128 - - -#define AQH_MSGDATA_VALUES_MINSIZE (GWEN_MSGIPC_OFFS_PAYLOAD+AQH_MSGDATA_VALUES_OFFS_VALUES) +#define AQH_MSGDATA_VALUES_MINSIZE GWEN_MSGIPC_OFFS_PAYLOAD - -static void _writeValue(const AQH_VALUE *value, uint8_t *ptr, int useSystemName); +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ +/* ------------------------------------------------------------------------------------------------ + * code + * ------------------------------------------------------------------------------------------------ + */ -GWEN_MSG *AQH_ValuesDataIpcMsg_new(uint16_t code, uint32_t flags, const AQH_VALUE_LIST *valueList, int useSystemName) +GWEN_MSG *AQH_ValuesDataIpcMsg_new(uint16_t code, uint32_t flags, const AQH_VALUE_LIST *valueList) { GWEN_MSG *msg; - uint8_t *ptr; - int count; - int payloadSize; + GWEN_BUFFER *buf; + int rv; - count=valueList?AQH_Value_List_GetCount(valueList):0; - payloadSize=AQH_MSGDATA_VALUES_OFFS_VALUES+(count*AQH_MSGDATA_VALUES_VALUES_SIZE); + buf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Tag16_WriteUint32TagToBuffer(AQH_MSGDATA_VALUES_TAGS_FLAGS, flags, buf); + rv=AQH_DataIpc_WriteValueListAsTagsToBuffer(AQH_MSGDATA_VALUES_TAGS_VALUE, valueList, buf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + GWEN_Buffer_free(buf); + return NULL; + } - msg=GWEN_IpcMsg_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, code, payloadSize, NULL); - ptr=GWEN_Msg_GetBuffer(msg)+GWEN_MSGIPC_OFFS_PAYLOAD; - *(ptr++)=flags & 0xff; - *(ptr++)=(flags>>8) & 0xff; - *(ptr++)=(flags>>16) & 0xff; - *(ptr++)=(flags>>24) & 0xff; + msg=AQH_Tag16IpcMsg_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, code, + GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + return msg; +} - *(ptr++)=count & 0xff; - *(ptr++)=(count>>8) & 0xff; - *(ptr++)=(count>>16) & 0xff; - *(ptr++)=(count>>24) & 0xff; - if (count>0) { - const AQH_VALUE *value; - value=AQH_Value_List_First(valueList); - while(value) { - _writeValue(value, ptr, useSystemName); - ptr+=AQH_MSGDATA_VALUES_VALUES_SIZE; - value=AQH_Value_List_Next(value); +GWEN_MSG *AQH_ValuesDataIpcMsg_newForOneValue(uint16_t code, uint32_t flags, const AQH_VALUE *value) +{ + GWEN_MSG *msg; + GWEN_BUFFER *buf; + int rv; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Tag16_WriteUint32TagToBuffer(AQH_MSGDATA_VALUES_TAGS_FLAGS, flags, buf); + rv=AQH_DataIpc_WriteValueAsTagToBuffer(AQH_MSGDATA_VALUES_TAGS_VALUE, value, buf); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + GWEN_Buffer_free(buf); + return NULL; + } + + msg=AQH_Tag16IpcMsg_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, code, + GWEN_Buffer_GetUsedBytes(buf), (const uint8_t*) GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + return msg; +} + + + +void AQH_ValuesDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy) +{ + AQH_Tag16IpcMsg_ExtendAndParse(msg, doCopy); +} + + + +AQH_VALUE_LIST *AQH_ValuesDataIpcMsg_ReadValueList(const GWEN_MSG *msg) +{ + const GWEN_TAG16_LIST *tagList; + + tagList=AQH_Tag16IpcMsg_GetTags(msg); + if (tagList) { + AQH_VALUE_LIST *valueList; + + valueList=AQH_DataIpc_ReadValuesFromTagList(tagList, AQH_MSGDATA_VALUES_TAGS_VALUE); + if (valueList==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "No value list received"); } + return valueList; + } + else { + DBG_INFO(AQH_LOGDOMAIN, "No tag16 list received"); + return NULL; } - return msg; } -GWEN_MSG *AQH_ValuesDataIpcMsg_newForOneValue(uint16_t code, uint32_t flags, const AQH_VALUE *value, int useSystemName) +AQH_VALUE *AQH_ValuesDataIpcMsg_ReadFirstValue(const GWEN_MSG *msg) { - GWEN_MSG *msg; - uint8_t *ptr; - int count; - int payloadSize; - - count=1; - payloadSize=AQH_MSGDATA_VALUES_OFFS_VALUES+AQH_MSGDATA_VALUES_VALUES_SIZE; - - msg=GWEN_IpcMsg_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, code, payloadSize, NULL); - ptr=GWEN_Msg_GetBuffer(msg)+GWEN_MSGIPC_OFFS_PAYLOAD; - *(ptr++)=flags & 0xff; - *(ptr++)=(flags>>8) & 0xff; - *(ptr++)=(flags>>16) & 0xff; - *(ptr++)=(flags>>24) & 0xff; - - *(ptr++)=count & 0xff; - *(ptr++)=(count>>8) & 0xff; - *(ptr++)=(count>>16) & 0xff; - *(ptr++)=(count>>24) & 0xff; - - _writeValue(value, ptr, useSystemName); - - return msg; + return AQH_DataIpc_ReadValueFromTagList(AQH_Tag16IpcMsg_GetTags(msg), AQH_MSGDATA_VALUES_TAGS_VALUE); } -void _writeValue(const AQH_VALUE *value, uint8_t *ptr, int useSystemName) -{ - uint64_t i64; - const char *name; - const char *units; - - i64=AQH_Value_GetId(value); - name=useSystemName?AQH_Value_GetNameForSystem(value):AQH_Value_GetNameForDriver(value); - units=AQH_Value_GetValueUnits(value); - - *(ptr++)=i64 & 0xff; - *(ptr++)=(i64>>8) & 0xff; - *(ptr++)=(i64>>16) & 0xff; - *(ptr++)=(i64>>24) & 0xff; - *(ptr++)=(i64>>32) & 0xff; - *(ptr++)=(i64>>40) & 0xff; - *(ptr++)=(i64>>48) & 0xff; - *(ptr++)=(i64>>56) & 0xff; - if (name) { - strncpy((char*) ptr, name, AQH_MSGDATA_VALUES_VALUES_SIZE_NAME-1); - ptr[AQH_MSGDATA_VALUES_VALUES_SIZE_NAME-1]=0; - } - else - memset(ptr, 0, AQH_MSGDATA_VALUES_VALUES_SIZE_NAME); - ptr+=AQH_MSGDATA_VALUES_VALUES_SIZE_NAME; - - if (units) { - strncpy((char*) ptr, units, AQH_MSGDATA_VALUES_VALUES_SIZE_UNITS-1); - ptr[AQH_MSGDATA_VALUES_VALUES_SIZE_UNITS-1]=0; - } - else - memset(ptr, 0, AQH_MSGDATA_VALUES_VALUES_SIZE_UNITS); - ptr+=AQH_MSGDATA_VALUES_VALUES_SIZE_UNITS; -} - - uint32_t AQH_ValuesDataIpcMsg_GetFlags(const GWEN_MSG *msg) { - return GWEN_Msg_GetUint32At(msg, GWEN_MSGIPC_OFFS_PAYLOAD+AQH_MSGDATA_VALUES_OFFS_FLAGS, 0); -} - - - -uint32_t AQH_ValuesDataIpcMsg_GetNumValues(const GWEN_MSG *msg) -{ - return GWEN_Msg_GetUint32At(msg, GWEN_MSGIPC_OFFS_PAYLOAD+AQH_MSGDATA_VALUES_OFFS_NUMVALUES, 0); -} - - - -uint64_t AQH_ValuesDataIpcMsg_GetValueId(const GWEN_MSG *msg, int idx) -{ - uint32_t pos; - - pos= - AQH_MSGDATA_VALUES_OFFS_VALUES+ - (idx*AQH_MSGDATA_VALUES_VALUES_SIZE)+ - AQH_MSGDATA_VALUES_VALUES_OFFS_ID; - return GWEN_Msg_GetUint64At(msg, GWEN_MSGIPC_OFFS_PAYLOAD+pos, 0); -} - - - -const char *AQH_ValuesDataIpcMsg_GetValueName(const GWEN_MSG *msg, int idx) -{ - uint32_t pos; - - pos= - AQH_MSGDATA_VALUES_OFFS_VALUES+ - (idx*AQH_MSGDATA_VALUES_VALUES_SIZE)+ - AQH_MSGDATA_VALUES_VALUES_OFFS_NAME; - - if (GWEN_Msg_GetBytesInBuffer(msg)>=pos+GWEN_MSGIPC_OFFS_PAYLOAD) - return (const char*) (GWEN_Msg_GetConstBuffer(msg)+GWEN_MSGIPC_OFFS_PAYLOAD+pos); - return NULL; -} - - - -const char *AQH_ValuesDataIpcMsg_GetValueUnits(const GWEN_MSG *msg, int idx) -{ - uint32_t pos; - - pos= - AQH_MSGDATA_VALUES_OFFS_VALUES+ - (idx*AQH_MSGDATA_VALUES_VALUES_SIZE)+ - AQH_MSGDATA_VALUES_VALUES_OFFS_UNITS; - - if (GWEN_Msg_GetBytesInBuffer(msg)>=pos+GWEN_MSGIPC_OFFS_PAYLOAD) - return (const char*) (GWEN_Msg_GetConstBuffer(msg)+GWEN_MSGIPC_OFFS_PAYLOAD+pos); - return NULL; -} - - - -int AQH_ValuesDataIpcMsg_IsValid(const GWEN_MSG *msg) -{ - int msgLen; - int numValues; - const uint8_t *ptr; - int i; - - msgLen=GWEN_Msg_GetBytesInBuffer(msg); - if (msgLen=AQH_MSGDATA_VALUES_MINSIZE) { GWEN_Buffer_AppendArgs(dbuf, - "VALUES (code=%d, proto=%d, proto version=%d, flags=0x%08x, values=%d)\n", + "VALUES (code=%d, proto=%d, proto version=%d, flags=0x%08x)\n", GWEN_IpcMsg_GetCode(msg), GWEN_IpcMsg_GetProtoId(msg), GWEN_IpcMsg_GetProtoVersion(msg), - (unsigned int)AQH_ValuesDataIpcMsg_GetFlags(msg), - AQH_ValuesDataIpcMsg_GetNumValues(msg)); + (unsigned int)AQH_ValuesDataIpcMsg_GetFlags(msg)); } } @@ -265,4 +148,3 @@ void AQH_ValuesDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, c - diff --git a/aqhome/ipc/data/msg_data_values.h b/aqhome/ipc/data/msg_data_values.h index 0a85769..4fb4b69 100644 --- a/aqhome/ipc/data/msg_data_values.h +++ b/aqhome/ipc/data/msg_data_values.h @@ -23,19 +23,21 @@ #define AQH_MSGDATA_VALUES_FLAGS_LASTMSG 0x0001 +#define AQH_MSGDATA_VALUES_TAGS_FLAGS 0x01 +#define AQH_MSGDATA_VALUES_TAGS_VALUE 0xc2 -AQHOME_API GWEN_MSG *AQH_ValuesDataIpcMsg_new(uint16_t code, uint32_t flags, const AQH_VALUE_LIST *valueList, int useSystemName); -AQHOME_API GWEN_MSG *AQH_ValuesDataIpcMsg_newForOneValue(uint16_t code, uint32_t flags, const AQH_VALUE *value, int useSystemName); + +AQHOME_API GWEN_MSG *AQH_ValuesDataIpcMsg_new(uint16_t code, uint32_t flags, const AQH_VALUE_LIST *valueList); +AQHOME_API GWEN_MSG *AQH_ValuesDataIpcMsg_newForOneValue(uint16_t code, uint32_t flags, const AQH_VALUE *value); + +AQHOME_API void AQH_ValuesDataIpcMsg_Parse(GWEN_MSG *msg, int doCopy); + +AQHOME_API AQH_VALUE_LIST *AQH_ValuesDataIpcMsg_ReadValueList(const GWEN_MSG *msg); +AQHOME_API AQH_VALUE *AQH_ValuesDataIpcMsg_ReadFirstValue(const GWEN_MSG *msg); AQHOME_API uint32_t AQH_ValuesDataIpcMsg_GetFlags(const GWEN_MSG *msg); -AQHOME_API uint32_t AQH_ValuesDataIpcMsg_GetNumValues(const GWEN_MSG *msg); -AQHOME_API uint64_t AQH_ValuesDataIpcMsg_GetValueId(const GWEN_MSG *msg, int idx); -AQHOME_API const char *AQH_ValuesDataIpcMsg_GetValueName(const GWEN_MSG *msg, int idx); -AQHOME_API const char *AQH_ValuesDataIpcMsg_GetValueUnits(const GWEN_MSG *msg, int idx); - -AQHOME_API int AQH_ValuesDataIpcMsg_IsValid(const GWEN_MSG *msg); AQHOME_API void AQH_ValuesDataIpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); diff --git a/aqhome/ipc/msg_ipc_tag16.c b/aqhome/ipc/msg_ipc_tag16.c index 2aa9f5e..6376127 100644 --- a/aqhome/ipc/msg_ipc_tag16.c +++ b/aqhome/ipc/msg_ipc_tag16.c @@ -11,21 +11,199 @@ #endif -#include +#include "msg_ipc_tag16_p.h" #include #include +/* ------------------------------------------------------------------------------------------------ + * forward declarations + * ------------------------------------------------------------------------------------------------ + */ + +static void GWENHYWFAR_CB _freeData(void *bp, void *p); +static GWEN_TAG16_LIST *_parseTags(const GWEN_MSG *msg, int doCopy); + + + +/* ------------------------------------------------------------------------------------------------ + * implementations + * ------------------------------------------------------------------------------------------------ + */ + +GWEN_INHERIT(GWEN_MSG, AQH_MSG_IPC_TAG16); + + GWEN_MSG *AQH_Tag16IpcMsg_new(uint8_t protoId, uint8_t protoVer, uint16_t code, uint32_t payloadLen, const uint8_t *payload) { - return GWEN_IpcMsg_new(protoId, protoVer, code, payloadLen, payload); + GWEN_MSG *msg; + + msg=GWEN_IpcMsg_new(protoId, protoVer, code, payloadLen, payload); + AQH_Tag16IpcMsg_Extend(msg); + return msg; +} + + + +void AQH_Tag16IpcMsg_Extend(GWEN_MSG *msg) +{ + AQH_MSG_IPC_TAG16 *xmsg; + + GWEN_NEW_OBJECT(AQH_MSG_IPC_TAG16, xmsg); + GWEN_INHERIT_SETDATA(GWEN_MSG, AQH_MSG_IPC_TAG16, msg, xmsg, _freeData); +} + + + + +void _freeData(void *bp, void *p) +{ + AQH_MSG_IPC_TAG16 *xmsg; + + xmsg=(AQH_MSG_IPC_TAG16*) p; + GWEN_Tag16_List_free(xmsg->tagList); +} + + + +GWEN_TAG16_LIST *AQH_Tag16IpcMsg_GetTags(const GWEN_MSG *msg) +{ + if (msg) { + AQH_MSG_IPC_TAG16 *xmsg; + + xmsg=GWEN_INHERIT_GETDATA(GWEN_MSG, AQH_MSG_IPC_TAG16, msg); + if (xmsg) { + return xmsg->tagList; + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Not a Tag16IpcMsg message"); + return NULL; + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "NULLPOINTER"); + return NULL; + } +} + + + +void AQH_Tag16IpcMsg_ReadTags(GWEN_MSG *msg, int doCopy) +{ + if (msg) { + AQH_MSG_IPC_TAG16 *xmsg; + + xmsg=GWEN_INHERIT_GETDATA(GWEN_MSG, AQH_MSG_IPC_TAG16, msg); + if (xmsg) { + if (xmsg->tagList) { + DBG_INFO(AQH_LOGDOMAIN, "Tags already parsed"); + } + else { + xmsg->tagList=_parseTags(msg, doCopy); + if (xmsg->tagList==NULL) { + DBG_INFO(AQH_LOGDOMAIN, "No tags received"); + } + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Not a Tag16IpcMsg message"); + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "NULLPOINTER"); + } +} + + + +void AQH_Tag16IpcMsg_ExtendAndParse(GWEN_MSG *msg, int doCopy) +{ + if (!(GWEN_INHERIT_ISOFTYPE(GWEN_MSG, AQH_MSG_IPC_TAG16, msg))) + AQH_Tag16IpcMsg_Extend(msg); + AQH_Tag16IpcMsg_ReadTags(msg, doCopy); +} + + + +char *AQH_Tag16IpcMsg_GetTagDataAsNewString(const GWEN_MSG *msg, unsigned int tagType, const char *defaultValue) +{ + if (msg) { + AQH_MSG_IPC_TAG16 *xmsg; + + xmsg=GWEN_INHERIT_GETDATA(GWEN_MSG, AQH_MSG_IPC_TAG16, msg); + if (xmsg && xmsg->tagList) { + const GWEN_TAG16 *tag; + + tag=GWEN_Tag16_List_FindFirstByTagType(xmsg->tagList, tagType); + if (tag) + return GWEN_Tag16_GetTagDataAsNewString(tag, defaultValue); + } + } + return defaultValue?strdup(defaultValue):NULL; +} + + + +uint32_t AQH_Tag16IpcMsg_GetTagDataAsUint32(const GWEN_MSG *msg, unsigned int tagType, uint32_t defaultValue) +{ + if (msg) { + AQH_MSG_IPC_TAG16 *xmsg; + + xmsg=GWEN_INHERIT_GETDATA(GWEN_MSG, AQH_MSG_IPC_TAG16, msg); + if (xmsg && xmsg->tagList) { + const GWEN_TAG16 *tag; + + tag=GWEN_Tag16_List_FindFirstByTagType(xmsg->tagList, tagType); + return tag?GWEN_Tag16_GetTagDataAsUint32(tag, defaultValue):defaultValue; + } + } + return defaultValue; +} + + + +uint64_t AQH_Tag16IpcMsg_GetTagDataAsUint64(const GWEN_MSG *msg, unsigned int tagType, uint64_t defaultValue) +{ + if (msg) { + AQH_MSG_IPC_TAG16 *xmsg; + + xmsg=GWEN_INHERIT_GETDATA(GWEN_MSG, AQH_MSG_IPC_TAG16, msg); + if (xmsg && xmsg->tagList) { + const GWEN_TAG16 *tag; + + tag=GWEN_Tag16_List_FindFirstByTagType(xmsg->tagList, tagType); + return tag?GWEN_Tag16_GetTagDataAsUint64(tag, defaultValue):defaultValue; + } + } + return defaultValue; +} + + + +const GWEN_TAG16 *AQH_Tag16IpcMsg_FindFirstTagByType(const GWEN_MSG *msg, unsigned int tagType) +{ + if (msg) { + AQH_MSG_IPC_TAG16 *xmsg; + + xmsg=GWEN_INHERIT_GETDATA(GWEN_MSG, AQH_MSG_IPC_TAG16, msg); + if (xmsg && xmsg->tagList) + return GWEN_Tag16_List_FindFirstByTagType(xmsg->tagList, tagType); + } + return NULL; } GWEN_TAG16_LIST *AQH_Tag16IpcMsg_ParseTags(const GWEN_MSG *msg, int doCopy) +{ + return _parseTags(msg, doCopy); +} + + + +GWEN_TAG16_LIST *_parseTags(const GWEN_MSG *msg, int doCopy) { uint32_t msgSize; diff --git a/aqhome/ipc/msg_ipc_tag16.h b/aqhome/ipc/msg_ipc_tag16.h index 510a262..5e8465f 100644 --- a/aqhome/ipc/msg_ipc_tag16.h +++ b/aqhome/ipc/msg_ipc_tag16.h @@ -18,9 +18,24 @@ AQHOME_API GWEN_MSG *AQH_Tag16IpcMsg_new(uint8_t protoId, uint8_t protoVer, uint16_t code, uint32_t payloadLen, const uint8_t *payload); -AQHOME_API GWEN_TAG16_LIST *AQH_Tag16IpcMsg_ParseTags(const GWEN_MSG *msg, int doCopy); +AQHOME_API void AQH_Tag16IpcMsg_ExtendAndParse(GWEN_MSG *msg, int doCopy); + +AQHOME_API void AQH_Tag16IpcMsg_Extend(GWEN_MSG *msg); +AQHOME_API void AQH_Tag16IpcMsg_ReadTags(GWEN_MSG *msg, int doCopy); + +AQHOME_API GWEN_TAG16_LIST *AQH_Tag16IpcMsg_GetTags(const GWEN_MSG *msg); + +AQHOME_API GWEN_DEPRECATED GWEN_TAG16_LIST *AQH_Tag16IpcMsg_ParseTags(const GWEN_MSG *msg, int doCopy); AQHOME_API void AQH_Tag16IpcMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); +AQHOME_API const GWEN_TAG16 *AQH_Tag16IpcMsg_FindFirstTagByType(const GWEN_MSG *msg, unsigned int tagType); + +AQHOME_API char *AQH_Tag16IpcMsg_GetTagDataAsNewString(const GWEN_MSG *msg, unsigned int tagType, const char *defaultValue); +AQHOME_API uint32_t AQH_Tag16IpcMsg_GetTagDataAsUint32(const GWEN_MSG *msg, unsigned int tagType, uint32_t defaultValue); +AQHOME_API uint64_t AQH_Tag16IpcMsg_GetTagDataAsUint64(const GWEN_MSG *msg, unsigned int tagType, uint64_t defaultValue); + + + #endif diff --git a/aqhome/ipc/msg_ipc_tag16_p.h b/aqhome/ipc/msg_ipc_tag16_p.h new file mode 100644 index 0000000..00da2be --- /dev/null +++ b/aqhome/ipc/msg_ipc_tag16_p.h @@ -0,0 +1,25 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_IPC_TAG16_P_H +#define AQH_MSG_IPC_TAG16_P_H + + +#include + + +typedef struct AQH_MSG_IPC_TAG16 AQH_MSG_IPC_TAG16; +struct AQH_MSG_IPC_TAG16 { + GWEN_TAG16_LIST *tagList; +}; + + +#endif + + + diff --git a/aqhome/libtest.c b/aqhome/libtest.c index fc74c69..17da2f7 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -113,7 +114,6 @@ int testMqttConnection2() AQH_Init(); epClient=AQH_MqttClientEndpoint_new("TESTCLIENT1234", "127.0.0.1", 1883, NULL, 1); - for (loop=0;; loop++) { DBG_INFO(GWEN_LOGDOMAIN, "Loop %d:", loop); GWEN_MsgEndpoint_IoLoop(epClient, 2000); /* 2000 ms */ @@ -199,6 +199,55 @@ int testMqttSubscribe2(int argc, char **argv) +int testMqttSubscribe3(int argc, char **argv) +{ + GWEN_MSG_ENDPOINT *epClient; + int rv; + //const char *host="127.0.0.1"; + const char *host="192.168.117.192"; + + AQH_Init(); + + if (argc>1) + host=argv[1]; + + DBG_ERROR(AQH_LOGDOMAIN, "Connecting to %s (%s)", host, argv[1]); + epClient=AQH_MqttClientEndpoint_new("TESTCLIENT1234", host, 1883, NULL, 1); + GWEN_MsgEndpoint_AddFlags(epClient, AQH_ENDPOINT2_MQTTCLIENT_FLAGS_SUBSCRIBEALL); + rv=GWEN_MultilayerEndpoint_StartConnect(epClient); + if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { + DBG_ERROR(NULL, "Error on startConnect: %d", rv); + return 2; + } + + for (;;) { + GWEN_MSG *msg; + + GWEN_MsgEndpoint_IoLoop(epClient, 2000); /* 2000 ms */ + msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epClient); + if (msg) { + if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) { + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_PublishMqttMsg_DumpToBuffer(msg, buf, "received"); + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + } + else { + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + } + GWEN_Msg_free(msg); + } + } + + + return 0; +} + + + int _mqttConnect2(GWEN_MSG_ENDPOINT *epClient) { int loop; @@ -423,6 +472,7 @@ int main(int argc, char **argv) //return testMqttConnection2(); //return testMqttSubscribe2(argc, argv); //return testHttpDaemon(); + return testMqttSubscribe3(argc, argv); return 0; } diff --git a/aqhome/mqtt/endpoint_mqttc.c b/aqhome/mqtt/endpoint_mqttc.c index cf33ac6..32f882d 100644 --- a/aqhome/mqtt/endpoint_mqttc.c +++ b/aqhome/mqtt/endpoint_mqttc.c @@ -16,9 +16,12 @@ #include "aqhome/mqtt/msg_mqtt_connect.h" #include "aqhome/mqtt/msg_mqtt_connack.h" #include "aqhome/mqtt/msg_mqtt_publish.h" +#include "aqhome/mqtt/msg_mqtt_subscribe.h" +#include "aqhome/mqtt/msg_mqtt_suback.h" #include #include +#include #include #include @@ -27,6 +30,12 @@ #define AQH_ENDPOINT2_MQTTC_RECONNECT_TIME 5 #define AQH_ENDPOINT2_MQTTC_CONNECT_TIMEOUT 10 +#define AQH_ENDPOINT2_MQTTC_STAGE_NONE 0 +#define AQH_ENDPOINT2_MQTTC_STAGE_CONNREQ 1 +#define AQH_ENDPOINT2_MQTTC_STAGE_SUBREQ 2 +#define AQH_ENDPOINT2_MQTTC_STAGE_UP 10 + + /* ------------------------------------------------------------------------------------------------ @@ -34,22 +43,15 @@ * ------------------------------------------------------------------------------------------------ */ -static void _addSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_UNUSED GWEN_SOCKETSET *xSet); -static void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); -static int _startConnect(GWEN_MSG_ENDPOINT *ep); -static void _moveMessagesBetweenLists(GWEN_MSG_LIST *srcList, GWEN_MSG_LIST *dstList); - -static void _addSocketsWhenUnconnected(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); -static void _addSocketsWhenConnecting(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); -static void _addSocketsWhenConnected(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); - -static void _checkSocketsWhenConnecting(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); -static void _checkSocketsWhenConnected(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); +static int _startConnect(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild); +static void _checkSockets(GWEN_MSG_ENDPOINT *ep, + GWEN_MSG_ENDPOINT *epChild, + GWEN_SOCKETSET *readSet, + GWEN_SOCKETSET *writeSet, + GWEN_SOCKETSET *xSet); +static void _lookForAndHandleConnAck(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild); +static void _lookForAndHandleSubAck(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild); +static int _sendSubscribeMsg(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, const char *topicFilter); @@ -67,9 +69,11 @@ GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *clientId, GWEN_MSG_ENDPOINT *ep; GWEN_MSG_ENDPOINT *epChild; - ep=GWEN_MsgEndpoint_new(name?name:AQH_ENDPOINT2_MQTT_NAME, groupId); - GWEN_MsgEndpoint_SetAddSocketsFn(ep, _addSockets); - GWEN_MsgEndpoint_SetCheckSocketsFn(ep, _checkSockets); + ep=GWEN_MultilayerEndpoint_new(name?name:AQH_ENDPOINT2_MQTT_NAME, groupId); + GWEN_MultilayerEndpoint_SetConnectTimeoutInSeconds(ep, AQH_ENDPOINT2_MQTTC_CONNECT_TIMEOUT); + GWEN_MultilayerEndpoint_SetReconnectTimeInSeconds(ep, AQH_ENDPOINT2_MQTTC_RECONNECT_TIME); + GWEN_MultilayerEndpoint_SetStartConnectFn(ep, _startConnect); + GWEN_MultilayerEndpoint_SetCheckSocketsFn(ep, _checkSockets); epChild=GWEN_TcpcEndpoint_new(host, port, NULL, groupId); GWEN_MsgIoEndpoint_Extend(epChild); @@ -82,36 +86,6 @@ GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *clientId, -int AQH_MqttClientEndpoint_StartConnect(GWEN_MSG_ENDPOINT *ep) -{ - if (ep) { - if (GWEN_MsgEndpoint_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { - int rv; - - /* connect, set state */ - rv=_startConnect(ep); - if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { - DBG_INFO(AQH_LOGDOMAIN, "Endpoint %s: Error connecting (%d)", GWEN_MsgEndpoint_GetName(ep), rv); - GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTING); - } - else { - DBG_INFO(AQH_LOGDOMAIN, "Endpoint %s: Connecting.", GWEN_MsgEndpoint_GetName(ep)); - GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTING); - } - return rv; - } - else { - DBG_ERROR(AQH_LOGDOMAIN, "Endpoint %s: Not unconnected", GWEN_MsgEndpoint_GetName(ep)); - } - } - else { - DBG_ERROR(GWEN_LOGDOMAIN, "No endpoint"); - } - return GWEN_ERROR_GENERIC; -} - - - uint16_t AQH_MqttClientEndpoint_GetKeepAliveTime(const GWEN_MSG_ENDPOINT *ep) { if (ep) { @@ -155,152 +129,52 @@ uint16_t AQH_MqttClientEndpoint_GetNextPacketId(const GWEN_MSG_ENDPOINT *ep) - -void _moveMessagesBetweenLists(GWEN_MSG_LIST *srcList, GWEN_MSG_LIST *dstList) +void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, + GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) { - GWEN_MSG *msg; - - while( (msg=GWEN_Msg_List_First(srcList)) ) { - GWEN_Msg_List_Del(msg); - GWEN_Msg_List_Add(msg, dstList); - } -} - - - -void _addSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) -{ - if (ep) { - GWEN_MSG_ENDPOINT *epChild; - - epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep); - if (epChild) { - if (GWEN_MsgEndpoint_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) - _addSocketsWhenUnconnected(ep, epChild, readSet, writeSet, xSet); - else { - if (GWEN_MsgEndpoint_GetState(epChild)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { - DBG_ERROR(AQH_LOGDOMAIN, "Error on tcp layer, disconnecting"); - GWEN_MsgEndpoint_Disconnect(epChild); - GWEN_MsgEndpoint_Disconnect(ep); - } - else { - if (GWEN_MsgEndpoint_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_CONNECTING) - _addSocketsWhenConnecting(ep, epChild, readSet, writeSet, xSet); - if (GWEN_MsgEndpoint_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) - _addSocketsWhenConnected(ep, epChild, readSet, writeSet, xSet); - } - } - } /* if (epChild) */ - } /* if (ep) */ -} - - - -void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) -{ - DBG_DEBUG(AQH_LOGDOMAIN, "Checking sockets in state %d", GWEN_MsgEndpoint_GetState(ep)); - if (ep) { - GWEN_MSG_ENDPOINT *epChild; - - epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep); - if (epChild) { - if (GWEN_MsgEndpoint_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { - /* nothing to do here */ - } /* if GWEN_MSG_ENDPOINT_STATE_UNCONNECTED */ - else { - if (GWEN_MsgEndpoint_GetState(epChild)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { - DBG_ERROR(AQH_LOGDOMAIN, "Error on tcp layer, disconnecting"); - GWEN_MsgEndpoint_Disconnect(epChild); - GWEN_MsgEndpoint_Disconnect(ep); - } - else { - if (GWEN_MsgEndpoint_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_CONNECTING) - _checkSocketsWhenConnecting(ep, epChild, readSet, writeSet, xSet); - else if (GWEN_MsgEndpoint_GetState(ep)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) - _checkSocketsWhenConnected(ep, epChild, readSet, writeSet, xSet); - } - } - } - } -} - - - -void _addSocketsWhenUnconnected(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) -{ - time_t now; - - now=time(NULL); - if ((now-GWEN_MsgEndpoint_GetTimeOfLastStateChange(ep))>=AQH_ENDPOINT2_MQTTC_RECONNECT_TIME) { - int rv; - - /* (re)connect, set state */ - DBG_INFO(AQH_LOGDOMAIN, "Starting to (re-)connect"); - rv=AQH_MqttClientEndpoint_StartConnect(ep); - if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { - DBG_INFO(GWEN_LOGDOMAIN, "here (%d)", rv); - } - } -} - - - -void _addSocketsWhenConnecting(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) -{ - time_t now; - - now=time(NULL); - if ((now-GWEN_MsgEndpoint_GetTimeOfLastStateChange(ep))>=AQH_ENDPOINT2_MQTTC_CONNECT_TIMEOUT || - GWEN_MsgEndpoint_GetState(epChild)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { - DBG_ERROR(AQH_LOGDOMAIN, "Timeout on connect"); - GWEN_MsgEndpoint_Disconnect(epChild); - GWEN_MsgEndpoint_Disconnect(ep); - } - else - GWEN_MsgEndpoint_AddSockets(epChild, readSet, writeSet, xSet); -} - - - -void _addSocketsWhenConnected(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) -{ - if (GWEN_MsgEndpoint_GetState(epChild)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { - DBG_ERROR(AQH_LOGDOMAIN, "Error on tcp layer, disconnecting"); - GWEN_MsgEndpoint_Disconnect(epChild); - GWEN_MsgEndpoint_Disconnect(ep); - } - else { - /* move to-send messages to child */ - _moveMessagesBetweenLists(GWEN_MsgEndpoint_GetSendMessageList(ep), GWEN_MsgEndpoint_GetSendMessageList(epChild)); - GWEN_MsgEndpoint_AddSockets(epChild, readSet, writeSet, xSet); - } -} - - - -void _checkSocketsWhenConnected(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) -{ - _moveMessagesBetweenLists(GWEN_MsgEndpoint_GetSendMessageList(ep), GWEN_MsgEndpoint_GetSendMessageList(epChild)); - GWEN_MsgEndpoint_CheckSockets(epChild, readSet, writeSet, xSet); - _moveMessagesBetweenLists(GWEN_MsgEndpoint_GetReceivedMessageList(epChild), GWEN_MsgEndpoint_GetReceivedMessageList(ep)); -} - - - - - - -void _checkSocketsWhenConnecting(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, - GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) -{ - GWEN_MSG *msg; + int stage; GWEN_MsgEndpoint_CheckSockets(epChild, readSet, writeSet, xSet); /* let base layer work */ + stage=GWEN_MultilayerEndpoint_GetStage(ep); + switch(stage) { + case AQH_ENDPOINT2_MQTTC_STAGE_CONNREQ: _lookForAndHandleConnAck(ep, epChild); break; + case AQH_ENDPOINT2_MQTTC_STAGE_SUBREQ: _lookForAndHandleSubAck(ep, epChild); break; + default: break; + } +} + + + +int _startConnect(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild) +{ + if (epChild) { + int rv; + GWEN_MSG *msg; + + rv=GWEN_TcpcEndpoint_StartConnect(epChild); + if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { + DBG_INFO(AQH_LOGDOMAIN, "Error starting to connect child layer (%d)", rv); + return rv; + } + msg=AQH_MqttEndpoint_CreateMsgConnect(epChild); + if (msg) { + GWEN_MsgEndpoint_AddSendMessage(epChild, msg); + GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTING); + GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_CONNREQ); + return rv; /* result from GWEN_TcpcEndpoint_StartConnect() above */ + } + } + return GWEN_ERROR_GENERIC; +} + + + + +void _lookForAndHandleConnAck(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild) +{ + GWEN_MSG *msg; + msg=GWEN_MsgEndpoint_GetFirstReceivedMessage(epChild); while(msg) { GWEN_MSG *msgNext; @@ -308,17 +182,38 @@ void _checkSocketsWhenConnecting(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChi msgNext=GWEN_Msg_List_Next(msg); msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0; + if (msgType==AQH_MQTTMSG_MSGTYPE_CONNACK) { int code; GWEN_Msg_List_Del(msg); /* remove from list */ code=AQH_ConnAckMqttMsg_GetResultCode(msg); if (code==AQH_MQTTMSG_CONNACK_RESULT_ACCEPTED) { - DBG_INFO(AQH_LOGDOMAIN, "Positive CONNACK response, connected"); - GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTED); + DBG_INFO(AQH_LOGDOMAIN, "Positive CONNACK response"); + if (GWEN_MsgEndpoint_GetFlags(ep) & AQH_ENDPOINT2_MQTTCLIENT_FLAGS_SUBSCRIBEALL) { + int rv; + + DBG_INFO(AQH_LOGDOMAIN, "Sending subscribe message"); + rv=_sendSubscribeMsg(ep, epChild, "#"); + if (rv<0) { + DBG_ERROR(AQH_LOGDOMAIN, "Error sending SUBSCRIBE request (%d)", rv); + GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_NONE); + GWEN_MsgEndpoint_Disconnect(epChild); + GWEN_MsgEndpoint_Disconnect(ep); + } + else { + GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_SUBREQ); + } + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Connected (no auto-subscription requested)"); + GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_UP); + GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTED); + } } else { DBG_ERROR(AQH_LOGDOMAIN, "Negative CONNACK response (%d)", code); + GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_NONE); GWEN_MsgEndpoint_Disconnect(epChild); GWEN_MsgEndpoint_Disconnect(ep); } @@ -334,30 +229,64 @@ void _checkSocketsWhenConnecting(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChi -int _startConnect(GWEN_MSG_ENDPOINT *ep) +void _lookForAndHandleSubAck(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild) { - GWEN_MSG_ENDPOINT *epChild; + GWEN_MSG *msg; - epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep); - if (epChild) { - int rv; - GWEN_MSG *msg; + msg=GWEN_MsgEndpoint_GetFirstReceivedMessage(epChild); + while(msg) { + GWEN_MSG *msgNext; + uint8_t msgType; - rv=GWEN_TcpcEndpoint_StartConnect(epChild); - if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { - DBG_INFO(AQH_LOGDOMAIN, "Error starting to connect child layer (%d)", rv); - return rv; + msgNext=GWEN_Msg_List_Next(msg); + msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0; + + if (msgType==AQH_MQTTMSG_MSGTYPE_SUBACK) { + int code; + + GWEN_Msg_List_Del(msg); /* remove from list */ + code=AQH_SubAckMqttMsg_GetResultCode(msg); + if (code!=128) { + DBG_INFO(AQH_LOGDOMAIN, "Positive SUBACK response, connected (%d, %02x)", code, code); + GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_UP); + GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTED); + } + else { + DBG_ERROR(AQH_LOGDOMAIN, "Negative SUBACK response (%d)", code); + GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_NONE); + GWEN_MsgEndpoint_Disconnect(epChild); + GWEN_MsgEndpoint_Disconnect(ep); + } + GWEN_Msg_free(msg); + break; } - msg=AQH_MqttEndpoint_CreateMsgConnect(epChild); - if (msg) { - GWEN_MsgEndpoint_AddSendMessage(epChild, msg); - GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTING); - return rv; /* result from GWEN_TcpcEndpoint_StartConnect() above */ + else { + DBG_ERROR(AQH_LOGDOMAIN, "Ignoring response (%d)", msgType); } + msg=msgNext; + } /* while */ +} + + + +int _sendSubscribeMsg(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, const char *topicFilter) +{ + uint16_t pckId; + GWEN_MSG *msgOut; + + DBG_INFO(AQH_LOGDOMAIN, "Sending SUBSCRIBE %s", topicFilter); + pckId=AQH_MqttClientEndpoint_GetNextPacketId(ep); + msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, topicFilter, 0); + if (msgOut==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error creating message"); + return GWEN_ERROR_INTERNAL; } - return GWEN_ERROR_GENERIC; + GWEN_MsgEndpoint_AddSendMessage(epChild, msgOut); + + return 0; } + diff --git a/aqhome/mqtt/endpoint_mqttc.h b/aqhome/mqtt/endpoint_mqttc.h index f03d7b3..7064178 100644 --- a/aqhome/mqtt/endpoint_mqttc.h +++ b/aqhome/mqtt/endpoint_mqttc.h @@ -19,6 +19,8 @@ extern "C" { #endif +#define AQH_ENDPOINT2_MQTTCLIENT_FLAGS_SUBSCRIBEALL 0x0001 + /** */ diff --git a/aqhome/mqtt/msg_mqtt.c b/aqhome/mqtt/msg_mqtt.c index 3bb8345..7926f2b 100644 --- a/aqhome/mqtt/msg_mqtt.c +++ b/aqhome/mqtt/msg_mqtt.c @@ -13,6 +13,7 @@ #include "aqhome/mqtt/msg_mqtt.h" +#include #include #include @@ -84,6 +85,10 @@ int AQH_MqttMsg_IsMsgComplete(const GWEN_MSG *msg) void AQH_MqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) { + GWEN_Text_DumpString2Buffer((const char*) GWEN_Msg_GetConstBuffer(msg), + GWEN_Msg_GetBytesInBuffer(msg), + dbuf, + 2); } diff --git a/aqhome/mqtt/msg_mqtt_suback.c b/aqhome/mqtt/msg_mqtt_suback.c index fb2c180..a35de15 100644 --- a/aqhome/mqtt/msg_mqtt_suback.c +++ b/aqhome/mqtt/msg_mqtt_suback.c @@ -41,6 +41,38 @@ GWEN_MSG *GWEN_SubAckMqttMsg_new(uint8_t flags, uint16_t packetId, uint8_t resul +int AQH_SubAckMqttMsg_GetResultCode(const GWEN_MSG *msg) +{ + const uint8_t *msgPtr; + uint32_t msgLen; + + msgPtr=GWEN_Msg_GetConstBuffer(msg); + msgLen=GWEN_Msg_GetBytesInBuffer(msg); + + if (msgLen>1) { + uint32_t flags; + uint32_t payloadLen; + const uint8_t *payloadPtr; + + flags=AQH_MqttMsg_GetMsgTypeAndFlags(msg); + payloadLen=GWEN_Msg_GetParsedPayloadSize(msg); + payloadPtr=msgPtr+GWEN_Msg_GetParsedPayloadOffset(msg); + + if (payloadLen>0) { + if (flags & (AQH_MQTTMSG_FLAGS_QOS2 | AQH_MQTTMSG_FLAGS_QOS1)) { + payloadLen--; + payloadPtr++; + } + if (payloadLen>1) + return (int) (payloadPtr[0]); + } + } + + return GWEN_ERROR_GENERIC; +} + + + void AQH_SubAckMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) { const uint8_t *msgPtr; diff --git a/aqhome/mqtt/msg_mqtt_suback.h b/aqhome/mqtt/msg_mqtt_suback.h index ecc4da1..901147a 100644 --- a/aqhome/mqtt/msg_mqtt_suback.h +++ b/aqhome/mqtt/msg_mqtt_suback.h @@ -21,6 +21,7 @@ AQHOME_API GWEN_MSG *GWEN_SubAckMqttMsg_new(uint8_t flags, uint16_t packetId, uint8_t result); AQHOME_API void AQH_SubAckMqttMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); +AQHOME_API int AQH_SubAckMqttMsg_GetResultCode(const GWEN_MSG *msg);