Files
aqhomecontrol/aqhome/dataclient/client.c
Martin Preuss d0c8b3b284 let setData use double values instead of strings.
this allows for storing value set with setData which can then be used in
the cgi module to retrieve the last value set.
2025-10-07 23:50:50 +02:00

576 lines
16 KiB
C

/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./client_p.h"
#include "aqhome/aqhome.h"
#include "aqhome/msg/ipc/m_ipc.h"
#include "aqhome/msg/ipc/m_ipc_tag16.h"
#include "aqhome/msg/ipc/m_ipc_result.h"
#include "aqhome/msg/ipc/data/m_ipcd.h"
#include "aqhome/msg/ipc/nodes/m_ipcn.h"
#include "aqhome/msg/ipc/m_ipc_connect.h"
#include "aqhome/msg/ipc/data/m_ipcd_devices.h"
#include "aqhome/msg/ipc/data/m_ipcd_values.h"
#include "aqhome/msg/ipc/data/m_ipcd_getvalues.h"
#include "aqhome/msg/ipc/data/m_ipcd_getdevices.h"
#include "aqhome/msg/ipc/data/m_ipcd_getdata.h"
#include "aqhome/msg/ipc/data/m_ipcd_multidata.h"
#include "aqhome/msg/ipc/data/m_ipcd_setdata.h"
#include "aqhome/ipc2/tcp_object.h"
#include "aqhome/ipc2/ipc_client.h"
#include <aqhome/ipc2/ipc_endpoint.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/timestamp.h>
#include <gwenhywfar/db.h>
#include <gwenhywfar/i18n.h>
#include <gwenhywfar/text.h>
#define AQH_DATA_CLIENT_DEFAULT_CMD_TIMEOUT 5
static int _connectEndpoint(AQH_DATACLIENT *dc, const char *addr, int port, uint32_t flags);
static int _exchangeConnectMsgs(AQH_DATACLIENT *dc, const char *userId, const char *passwd, const char *clientId, uint32_t flags);
static uint64_t _getFirstOrLastData(AQH_DATACLIENT *dc, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, int mode);
static uint64_t _handleDataResponses(AQH_DATACLIENT *dc, uint64_t *dataPtr, uint64_t maxNum, uint32_t msgId);
static int _handleResult(AQH_DATACLIENT *dc, uint32_t msgId);
AQH_DATACLIENT *AQH_DataClient_new(AQH_EVENT_LOOP *eventLoop, uint8_t protoId, uint8_t protoVer)
{
AQH_DATACLIENT *dc;
GWEN_NEW_OBJECT(AQH_DATACLIENT, dc);
dc->eventLoop=eventLoop;
dc->protoId=protoId;
dc->protoVer=protoVer;
dc->timeoutInSeconds=AQH_DATA_CLIENT_DEFAULT_CMD_TIMEOUT;
return dc;
}
void AQH_DataClient_free(AQH_DATACLIENT *dc)
{
if (dc) {
AQH_Object_free(dc->ipcEndpoint);
GWEN_FREE_OBJECT(dc);
}
}
int AQH_DataClient_ReadLocalArgs(AQH_DATACLIENT *dc,
GWEN_DB_NODE *dbGlobalArgs, const GWEN_ARGS *args,
int argc, char **argv)
{
if (dc) {
int rv;
GWEN_DB_Group_free(dc->dbLocalArgs);
dc->dbLocalArgs=GWEN_DB_GetGroup(dbGlobalArgs, GWEN_DB_FLAGS_DEFAULT, "local");
rv=GWEN_Args_Check(argc, argv, 1, GWEN_ARGS_MODE_ALLOW_FREEPARAM, args, dc->dbLocalArgs);
if (rv==GWEN_ARGS_RESULT_ERROR) {
fprintf(stderr, "ERROR: Could not parse arguments\n");
return 1;
}
else if (rv==GWEN_ARGS_RESULT_HELP) {
GWEN_BUFFER *ubuf;
ubuf=GWEN_Buffer_new(0, 1024, 0, 1);
if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) {
fprintf(stderr, "ERROR: Could not create help string\n");
return 1;
}
fprintf(stderr, "%s\n", GWEN_Buffer_GetStart(ubuf));
GWEN_Buffer_free(ubuf);
return 1;
}
dc->timeoutInSeconds=GWEN_DB_GetIntValue(dc->dbLocalArgs, "timeout", 0, 5);
AQH_MergeConfigFileIntoConfig(dc->dbLocalArgs, "ConfigFile");
return 0;
}
return GWEN_ERROR_INVALID;
}
int AQH_DataClient_ReadConfigFile(AQH_DATACLIENT *dc)
{
GWEN_DB_NODE *dbConfig;
dbConfig=AQH_LoadConfigFile();
if (dbConfig) {
GWEN_DB_Group_free(dc->dbLocalArgs);
dc->dbLocalArgs=dbConfig;
return 0;
}
return GWEN_ERROR_GENERIC;
}
GWEN_DB_NODE *AQH_DataClient_GetDbLocalArgs(const AQH_DATACLIENT *dc)
{
return dc?dc->dbLocalArgs:NULL;
}
int AQH_DataClient_Connect(AQH_DATACLIENT *dc,
const char *addr, int port,
const char *userId, const char *passwd,
const char *clientId,
uint32_t flags)
{
if (dc) {
int rv;
AQH_Object_free(dc->ipcEndpoint);
dc->ipcEndpoint=NULL;
rv=_connectEndpoint(dc, addr, port, 0 /* connection flags */);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
return rv;
}
rv=_exchangeConnectMsgs(dc, userId, passwd, clientId, flags);
if (rv<0) {
AQH_Object_free(dc->ipcEndpoint);
dc->ipcEndpoint=NULL;
DBG_INFO(NULL, "here (%d)", rv);
return rv;
}
return 0;
}
return GWEN_ERROR_INVALID;
}
int AQH_DataClient_Disconnect(AQH_DATACLIENT *dc)
{
if (dc) {
AQH_Object_free(dc->ipcEndpoint);
dc->ipcEndpoint=NULL;
return 0;
}
return GWEN_ERROR_INVALID;
}
void AQH_DataClient_SetTimeout(AQH_DATACLIENT *dc, int i)
{
if (dc) {
dc->timeoutInSeconds=i;
}
}
AQH_DEVICE_LIST *AQH_DataClient_GetDevices(AQH_DATACLIENT *dc, const char *deviceName)
{
if (dc) {
AQH_MESSAGE *msgOut;
AQH_MESSAGE *msgIn;
uint32_t msgId;
AQH_DEVICE_LIST *fullDeviceList;
fullDeviceList=AQH_Device_List_new();
msgId=++(dc->lastMsgId);
msgOut=AQH_IpcdMessageGetDevices_new(AQH_MSGTYPE_IPC_DATA_GETDEVICES_REQ, msgId, 0, deviceName);
AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut);
while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(dc->ipcEndpoint, msgId, dc->timeoutInSeconds)) ) {
GWEN_TAG16_LIST *tagList;
tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0);
if (tagList) {
uint16_t code;
code=AQH_IpcMessage_GetCode(msgIn);
if (code==AQH_MSGTYPE_IPC_DATA_GETDEVICES_RSP) {
AQH_DEVICE_LIST *deviceList;
deviceList=AQH_IpcdMessageDevices_ReadDeviceList(tagList);
if (deviceList) {
AQH_Device_List_AddList(fullDeviceList, deviceList);
AQH_Device_List_free(deviceList);
}
if (AQH_IpcdMessageDevices_GetFlags(tagList) & AQH_MSGDATA_DEVICES_FLAGS_LASTMSG) {
GWEN_Tag16_List_free(tagList);
AQH_Message_free(msgIn);
break;
}
}
else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
DBG_ERROR(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList));
GWEN_Tag16_List_free(tagList);
AQH_Message_free(msgIn);
AQH_Device_List_free(fullDeviceList);
return NULL;
}
else {
DBG_INFO(NULL, "Ignoring message \"%d\"", code);
}
GWEN_Tag16_List_free(tagList);
}
AQH_Message_free(msgIn);
} /* while */
if (AQH_Device_List_GetCount(fullDeviceList)>0)
return fullDeviceList;
AQH_Device_List_free(fullDeviceList);
}
return NULL;
}
AQH_VALUE_LIST *AQH_DataClient_GetValues(AQH_DATACLIENT *dc, const char *deviceName, int modality)
{
if (dc) {
AQH_MESSAGE *msgOut;
AQH_MESSAGE *msgIn;
uint32_t msgId;
AQH_VALUE_LIST *fullValueList;
fullValueList=AQH_Value_List_new();
msgId=++(dc->lastMsgId);
msgOut=AQH_IpcdMessageGetValues_new(AQH_MSGTYPE_IPC_DATA_GETVALUES_REQ, msgId, 0, deviceName, modality);
AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut);
while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(dc->ipcEndpoint, msgId, dc->timeoutInSeconds)) ) {
GWEN_TAG16_LIST *tagList;
tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0);
if (tagList) {
uint16_t code;
code=AQH_IpcMessage_GetCode(msgIn);
if (code==AQH_MSGTYPE_IPC_DATA_GETVALUES_RSP) {
AQH_VALUE_LIST *valueList;
valueList=AQH_IpcdMessageValues_ReadValueList(tagList);
if (valueList) {
AQH_Value_List_AddList(fullValueList, valueList);
AQH_Value_List_free(valueList);
}
if (AQH_IpcdMessageValues_GetFlags(tagList) & AQH_MSGDATA_VALUES_FLAGS_LASTMSG) {
GWEN_Tag16_List_free(tagList);
AQH_Message_free(msgIn);
break;
}
}
else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
DBG_ERROR(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList));
GWEN_Tag16_List_free(tagList);
AQH_Message_free(msgIn);
AQH_Value_List_free(fullValueList);
return NULL;
}
else {
DBG_INFO(NULL, "Ignoring message \"%d\"", code);
}
GWEN_Tag16_List_free(tagList);
}
AQH_Message_free(msgIn);
} /* while */
if (AQH_Value_List_GetCount(fullValueList)>0)
return fullValueList;
AQH_Value_List_free(fullValueList);
}
return NULL;
}
uint64_t AQH_DataClient_GetFirstData(AQH_DATACLIENT *dc, const char *valueName, uint64_t *dataPtr, uint64_t maxNum)
{
return _getFirstOrLastData(dc, valueName, dataPtr, maxNum, AQH_MSGDATA_GETDATA_MODE_FIRST);
}
uint64_t AQH_DataClient_GetLastData(AQH_DATACLIENT *dc, const char *valueName, uint64_t *dataPtr, uint64_t maxNum)
{
return _getFirstOrLastData(dc, valueName, dataPtr, maxNum, AQH_MSGDATA_GETDATA_MODE_LAST);
}
uint64_t AQH_DataClient_GetPeriodData(AQH_DATACLIENT *dc, const char *valueName,
uint64_t *dataPtr, uint64_t maxNum,
uint64_t tsBegin, uint64_t tsEnd)
{
if (dc) {
AQH_MESSAGE *msgOut;
uint32_t msgId;
msgId=++(dc->lastMsgId);
msgOut=AQH_IpcdMessageGetData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ,
msgId, 0,
AQH_MSGDATA_GETDATA_MODE_PERIOD,
valueName, tsBegin, tsEnd, maxNum);
AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut);
return _handleDataResponses(dc, dataPtr, maxNum, msgId);
}
return 0;
}
int AQH_DataClient_SetData(AQH_DATACLIENT *dc, const AQH_VALUE *v, double data)
{
if (dc) {
AQH_MESSAGE *msgOut;
uint32_t msgId;
msgId=++(dc->lastMsgId);
msgOut=AQH_IpcdMessageSetData_new(AQH_MSGTYPE_IPC_DATA_SETDATA, msgId, 0, v, data);
AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut);
return _handleResult(dc, msgId);
}
return GWEN_ERROR_INVALID;
}
int AQH_DataClient_UpdateData(AQH_DATACLIENT *dc, const AQH_VALUE *v, uint64_t timeStamp, double dataPoint)
{
if (dc) {
AQH_MESSAGE *msgOut;
uint32_t msgId;
msgId=++(dc->lastMsgId);
msgOut=AQH_IpcdMessageMultiData_newForOne(AQH_MSGTYPE_IPC_DATA_UPDATEDATA, msgId, 0, 0, v, timeStamp, dataPoint);
AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut);
return _handleResult(dc, msgId);
}
return GWEN_ERROR_INVALID;
}
int _connectEndpoint(AQH_DATACLIENT *dc, const char *addr, int port, uint32_t flags)
{
if (dc) {
AQH_OBJECT *ep;
int fd;
fd=AQH_TcpObject_CreateConnectedSocket(addr, port);
if (fd<0) {
DBG_ERROR(NULL, "Error connecting to broker server %s:%d", addr, port);
return GWEN_ERROR_IO;
}
ep=AQH_IpcClientObject_new(dc->eventLoop, fd);
assert(ep);
AQH_Endpoint_AddFlags(ep, flags);
dc->ipcEndpoint=ep;
return 0;
}
return GWEN_ERROR_INVALID;
}
int _exchangeConnectMsgs(AQH_DATACLIENT *dc, const char *userId, const char *passwd, const char *clientId, uint32_t flags)
{
AQH_MESSAGE *msgOut;
uint32_t msgId;
DBG_INFO(NULL, "Sending connect message for proto=%d.%d", dc->protoId, dc->protoVer);
msgId=AQH_Endpoint_GetNextMessageId(dc->ipcEndpoint);
msgOut=AQH_IpcMessageConnect_new(dc->protoId, dc->protoVer,
AQH_MSGTYPE_IPC_CONNECT_REQ,
msgId, 0,
clientId, userId, passwd, flags);
AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut);
return AQH_IpcEndpoint_WaitForResultMsg(dc->ipcEndpoint,
dc->protoId, dc->protoVer, AQH_MSGTYPE_IPC_RESULT,
msgId, dc->timeoutInSeconds);
}
uint64_t _getFirstOrLastData(AQH_DATACLIENT *dc, const char *valueName, uint64_t *dataPtr, uint64_t maxNum, int mode)
{
if (dc) {
AQH_MESSAGE *msgOut;
uint32_t msgId;
msgId=++(dc->lastMsgId);
msgOut=AQH_IpcdMessageGetData_new(AQH_MSGTYPE_IPC_DATA_GETDATA_REQ,
msgId, 0,
mode,
valueName, 0, 0, maxNum);
AQH_Endpoint_AddMsgOut(dc->ipcEndpoint, msgOut);
return _handleDataResponses(dc, dataPtr, maxNum, msgId);
}
return 0;
}
uint64_t _handleDataResponses(AQH_DATACLIENT *dc, uint64_t *dataPtr, uint64_t maxNum, uint32_t msgId)
{
AQH_MESSAGE *msgIn;
uint64_t fullNumberOfPoints=0;
while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(dc->ipcEndpoint, msgId, dc->timeoutInSeconds)) ) {
GWEN_TAG16_LIST *tagList;
tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0);
if (tagList) {
uint16_t code;
code=AQH_IpcMessage_GetCode(msgIn);
if (code==AQH_MSGTYPE_IPC_DATA_GETDATA_RSP) {
const uint64_t *recvDataPtr;
uint64_t recvNumberOfPoints;
uint32_t flags;
flags=AQH_IpcdMessageMultiData_GetFlags(tagList);
AQH_IpcdMessageMultiData_ReadDatapoints(tagList, &recvDataPtr, &recvNumberOfPoints);
if (recvNumberOfPoints) {
uint64_t i;
for (i=0; i<recvNumberOfPoints; i++) {
if (fullNumberOfPoints<maxNum) {
dataPtr[fullNumberOfPoints*2]=recvDataPtr[i*2];
dataPtr[(fullNumberOfPoints*2)+1]=recvDataPtr[(i*2)+1];
fullNumberOfPoints++;
}
else {
DBG_ERROR(NULL, "Too many bytes received");
break;
}
}
}
if (flags & AQH_MSGDATA_MULTIDATA_FLAGS_LASTMSG) {
GWEN_Tag16_List_free(tagList);
AQH_Message_free(msgIn);
break;
}
}
else if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
DBG_INFO(NULL, "Server Error: %d", AQH_IpcMessageResult_GetResult(tagList));
GWEN_Tag16_List_free(tagList);
AQH_Message_free(msgIn);
return 0;
}
else {
DBG_INFO(NULL, "Ignoring message \"%d\"", code);
}
GWEN_Tag16_List_free(tagList);
}
AQH_Message_free(msgIn);
} /* while */
return fullNumberOfPoints;
}
int _handleResult(AQH_DATACLIENT *dc, uint32_t msgId)
{
AQH_MESSAGE *msgIn;
while( (msgIn=AQH_IpcEndpoint_WaitForResponseMsg(dc->ipcEndpoint, msgId, dc->timeoutInSeconds)) ) {
GWEN_TAG16_LIST *tagList;
tagList=AQH_IpcMessageTag16_ParsePayload(msgIn, 0);
if (tagList) {
uint16_t code;
code=AQH_IpcMessage_GetCode(msgIn);
if (code==AQH_MSGTYPE_IPC_DATA_RESULT) {
int result;
result=AQH_IpcMessageResult_GetResult(tagList);
DBG_INFO(NULL, "Server result: %d", result);
GWEN_Tag16_List_free(tagList);
AQH_Message_free(msgIn);
if (result!=AQH_MSGDATA_RESULT_SUCCESS) {
DBG_INFO(NULL, "here (%d)", result);
return GWEN_ERROR_GENERIC;
}
return 0;
}
else {
DBG_INFO(NULL, "Ignoring message \"%d\"", code);
}
GWEN_Tag16_List_free(tagList);
}
AQH_Message_free(msgIn);
} /* while */
return GWEN_ERROR_TIMEOUT;
}
int AQH_DataClient_ConnectWithArgs(AQH_DATACLIENT *dc, uint32_t flags)
{
const char *brokerAddress;
int brokerPort;
const char *userId;
const char *passwd;
const char *clientId;
int rv;
brokerAddress=GWEN_DB_GetCharValue(dc->dbLocalArgs, "brokerAddress", 0, NULL);
if (!(brokerAddress && *brokerAddress))
brokerAddress=GWEN_DB_GetCharValue(dc->dbLocalArgs, "ConfigFile/brokerAddress", 0, "127.0.0.1");
brokerPort=GWEN_DB_GetIntValue(dc->dbLocalArgs, "brokerPort", 0, -1);
if (brokerPort<0)
brokerPort=GWEN_DB_GetIntValue(dc->dbLocalArgs, "ConfigFile/brokerPort", 0, 1899);
userId=GWEN_DB_GetCharValue(dc->dbLocalArgs, "userId", 0, NULL);
passwd=GWEN_DB_GetCharValue(dc->dbLocalArgs, "password", 0, NULL);
clientId=GWEN_DB_GetCharValue(dc->dbLocalArgs, "brokerClientId", 0, NULL);
rv=AQH_DataClient_Connect(dc, brokerAddress, brokerPort, userId, passwd, clientId, flags);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
return rv;
}
return 0;
}