/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "./init_mqtt.h" #include "./aqhomestorage_p.h" #include "./aqhomehttp.h" #include "./u_login.h" #include "./u_rooms.h" #include "./u_devices.h" #include "./u_mqtttopics.h" #include "./u_values.h" #include "./u_static.h" #include "aqhome/msg/endpoint_tty.h" #include "aqhome/ipc/endpoint_ipc.h" #include "aqhome/mqtt/endpoint_mqttc.h" #include #include #include "aqhome/http/endpoint_http.h" #include "aqhome/http/httpservice_conf.h" #include "aqhome/http/httpservice_http.h" #include "aqhome/http/httpservice.h" #include "aqhome/http/content_files.h" #include #include #include #include #include #include #include #ifdef HAVE_SYS_TYPES_H # include #endif #ifdef HAVE_SYS_STAT_H # include #endif #include #include #include #include #include #include /* ------------------------------------------------------------------------------------------------ * defines * ------------------------------------------------------------------------------------------------ */ //#define I18N(msg) msg #define I18S(msg) msg #define AQHOME_STORAGE_DEFAULT_CMDTIMEOUT 10000 /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static int _mqttConnect(GWEN_MSG_ENDPOINT *epTcp); static int _subscribe(AQHOME_STORAGE *aqh, const char *topicFilter); static GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ int AqHomeStorage_SetupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs) { const char *mqttAddress; int mqttPort; const char *mqttClientId; int mqttKeepAlive; mqttAddress=GWEN_DB_GetCharValue(dbArgs, "mqttAddress", 0, NULL); mqttPort=GWEN_DB_GetIntValue(dbArgs, "mqttPort", 0, AQHOME_STORAGE_DEFAULT_MQTT_PORT); mqttClientId=GWEN_DB_GetCharValue(dbArgs, "mqttClientId", 0, AQHOME_STORAGE_DEFAULT_MQTT_CLIENTID); mqttKeepAlive=GWEN_DB_GetIntValue(dbArgs, "mqttKeepAlive", 0, AQHOME_STORAGE_DEFAULT_MQTT_KEEPALIVE); if (mqttAddress && *mqttAddress && mqttPort) { GWEN_MSG_ENDPOINT *ep; int rv; ep=AQH_MqttClientEndpoint_new(mqttClientId, mqttAddress, mqttPort, NULL, 0); AQH_MqttClientEndpoint_SetKeepAliveTime(ep, mqttKeepAlive); GWEN_MsgEndpoint_Tree2_AddChild(aqh->rootEndpoint, ep); aqh->mqttEndpoint=ep; rv=_mqttConnect(ep); if (rv<0) { DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d (%d)", mqttAddress, mqttPort, rv); return rv; } rv=_subscribe(aqh, "#"); if (rv<0) { DBG_ERROR(NULL, "Error subscribingconnecting to MQTT server %s:%d (%d)", mqttAddress, mqttPort, rv); return rv; } } return 0; } int _mqttConnect(GWEN_MSG_ENDPOINT *epTcp) { if (GWEN_MsgEndpoint_GetState(epTcp)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { int rv; rv=AQH_MqttClientEndpoint_StartConnect(epTcp); if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { DBG_ERROR(NULL, "Error starting to connect (%d)", rv); return rv; } } while(GWEN_MsgEndpoint_GetState(epTcp)!=GWEN_MSG_ENDPOINT_STATE_CONNECTED) { DBG_DEBUG(NULL, "Next loop"); GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ } return 0; } int _subscribe(AQHOME_STORAGE *aqh, const char *topicFilter) { uint16_t pckId; GWEN_MSG *msgOut; GWEN_MSG *msgIn; DBG_INFO(NULL, "Sending SUBSCRIBE %s", topicFilter); pckId=AQH_MqttClientEndpoint_GetNextPacketId(aqh->mqttEndpoint); msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, topicFilter, 0); if (msgOut==NULL) { DBG_ERROR(NULL, "Error creating message"); return GWEN_ERROR_INTERNAL; } GWEN_MsgEndpoint_AddSendMessage(aqh->mqttEndpoint, msgOut); DBG_INFO(NULL, "Waiting for response"); msgIn=_awaitPacket(aqh->mqttEndpoint, AQH_MQTTMSG_MSGTYPE_SUBACK, AQHOME_STORAGE_DEFAULT_CMDTIMEOUT); if (msgIn) { GWEN_BUFFER *buf; buf=GWEN_Buffer_new(0, 256, 0, 1); AQH_SubAckMqttMsg_DumpToBuffer(msgIn, buf, "received"); DBG_INFO(NULL, "%s", GWEN_Buffer_GetStart(buf)); GWEN_Buffer_free(buf); GWEN_Msg_free(msgIn); } return 0; } GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds) { time_t startTime; startTime=time(NULL); for (;;) { GWEN_MSG *msg; time_t now; GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */ msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp); if (msg) { if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(expectedPacketType & 0xf0)) { return msg; } else { DBG_ERROR(NULL, "Received this message:"); GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); } GWEN_Msg_free(msg); } now=time(NULL); if (now-startTime>timeoutInSeconds) { DBG_INFO(NULL, "Timeout"); break; } } return NULL; }