Files
aqhomecontrol/apps/aqhome-storage/init_mqtt.c

213 lines
5.8 KiB
C

/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./init_mqtt.h"
#include "./aqhomestorage_p.h"
#include "./aqhomehttp.h"
#include "./u_login.h"
#include "./u_rooms.h"
#include "./u_devices.h"
#include "./u_mqtttopics.h"
#include "./u_values.h"
#include "./u_static.h"
#include "aqhome/msg/endpoint_tty.h"
#include "aqhome/ipc/endpoint_ipc.h"
#include "aqhome/mqtt/endpoint_mqttc.h"
#include <aqhome/mqtt/msg_mqtt_subscribe.h>
#include <aqhome/mqtt/msg_mqtt_suback.h>
#include "aqhome/http/endpoint_http.h"
#include "aqhome/http/httpservice_conf.h"
#include "aqhome/http/httpservice_http.h"
#include "aqhome/http/httpservice.h"
#include "aqhome/http/content_files.h"
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/args.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endpoint_tcpd.h>
#include <gwenhywfar/endpoint_msgio.h>
#include <gwenhywfar/directory.h>
#include <gwenhywfar/text.h>
#ifdef HAVE_SYS_TYPES_H
# include <sys/types.h>
#endif
#ifdef HAVE_SYS_STAT_H
# include <sys/stat.h>
#endif
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <time.h>
/* ------------------------------------------------------------------------------------------------
* defines
* ------------------------------------------------------------------------------------------------
*/
//#define I18N(msg) msg
/* expands to its argument unchanged (presumably a marker for translatable strings) */
#define I18S(msg) msg
/* default timeout used when waiting for a command response.
 * NOTE(review): the unit is not stated here; the value looks like milliseconds
 * (10000 ms = 10 s), while _awaitPacket() below takes its timeout in seconds -
 * verify the unit at every call site. */
#define AQHOME_STORAGE_DEFAULT_CMDTIMEOUT 10000
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
/* start connecting the given endpoint (if it is unconnected) and run its I/O loop until it is connected */
static int _mqttConnect(GWEN_MSG_ENDPOINT *epTcp);
/* queue a SUBSCRIBE packet for the given topic filter on the MQTT endpoint and wait for the SUBACK */
static int _subscribe(AQHOME_STORAGE *aqh, const char *topicFilter);
/* run the endpoint's I/O loop until a message of the expected packet type arrives (returned to the caller)
 * or the timeout (in seconds) has elapsed (returns NULL) */
static GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds);
/* ------------------------------------------------------------------------------------------------
* implementations
* ------------------------------------------------------------------------------------------------
*/
/**
 * Set up the MQTT client endpoint of the storage daemon.
 *
 * Reads "mqttAddress", "mqttPort", "mqttClientId" and "mqttKeepAlive" from the
 * given argument DB. If no address (or a port of 0) is configured nothing is
 * set up and 0 is returned. Otherwise a MQTT client endpoint is created, added
 * below aqh->rootEndpoint, stored in aqh->mqttEndpoint, connected and finally
 * subscribed to all topics ("#").
 *
 * @param aqh storage object which receives the new endpoint
 * @param dbArgs argument DB to read the MQTT settings from
 * @return 0 on success (or if MQTT is not configured), negative error code otherwise
 */
int AqHomeStorage_SetupMqtt(AQHOME_STORAGE *aqh, GWEN_DB_NODE *dbArgs)
{
  const char *mqttAddress;
  int mqttPort;
  const char *mqttClientId;
  int mqttKeepAlive;
  GWEN_MSG_ENDPOINT *ep;
  int rv;

  mqttAddress=GWEN_DB_GetCharValue(dbArgs, "mqttAddress", 0, NULL);
  mqttPort=GWEN_DB_GetIntValue(dbArgs, "mqttPort", 0, AQHOME_STORAGE_DEFAULT_MQTT_PORT);
  mqttClientId=GWEN_DB_GetCharValue(dbArgs, "mqttClientId", 0, AQHOME_STORAGE_DEFAULT_MQTT_CLIENTID);
  mqttKeepAlive=GWEN_DB_GetIntValue(dbArgs, "mqttKeepAlive", 0, AQHOME_STORAGE_DEFAULT_MQTT_KEEPALIVE);

  /* without a server address or port there is nothing to set up */
  if (!(mqttAddress && *mqttAddress && mqttPort)) {
    return 0;
  }

  ep=AQH_MqttClientEndpoint_new(mqttClientId, mqttAddress, mqttPort, NULL, 0);
  AQH_MqttClientEndpoint_SetKeepAliveTime(ep, mqttKeepAlive);
  /* the endpoint is registered before connecting, so it stays in the tree even if connecting fails */
  GWEN_MsgEndpoint_Tree2_AddChild(aqh->rootEndpoint, ep);
  aqh->mqttEndpoint=ep;

  rv=_mqttConnect(ep);
  if (rv<0) {
    DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d (%d)", mqttAddress, mqttPort, rv);
    return rv;
  }

  /* subscribe to every topic published on the server */
  rv=_subscribe(aqh, "#");
  if (rv<0) {
    DBG_ERROR(NULL, "Error subscribing at MQTT server %s:%d (%d)", mqttAddress, mqttPort, rv);
    return rv;
  }

  return 0;
}
/**
 * Connect the given MQTT client endpoint and wait until it is connected.
 *
 * If the endpoint is in state "unconnected" the connect is started first.
 * Afterwards the endpoint's I/O loop is run until the endpoint reports the
 * state "connected". The wait is bounded: if the endpoint does not become
 * connected within the connect timeout an error is returned instead of
 * looping forever.
 *
 * @param epTcp MQTT client endpoint to connect
 * @return 0 if connected, negative error code otherwise (GWEN_ERROR_TIMEOUT
 *         if the connection was not established in time)
 */
int _mqttConnect(GWEN_MSG_ENDPOINT *epTcp)
{
  const int connectTimeoutInSeconds=30;
  time_t startTime;

  if (GWEN_MsgEndpoint_GetState(epTcp)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) {
    int rv;

    rv=AQH_MqttClientEndpoint_StartConnect(epTcp);
    /* "in progress" is not an error here, completion is awaited below */
    if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) {
      DBG_ERROR(NULL, "Error starting to connect (%d)", rv);
      return rv;
    }
  }

  startTime=time(NULL);
  while(GWEN_MsgEndpoint_GetState(epTcp)!=GWEN_MSG_ENDPOINT_STATE_CONNECTED) {
    /* give up after a while instead of blocking the caller forever */
    if (time(NULL)-startTime>connectTimeoutInSeconds) {
      DBG_ERROR(NULL, "Timeout waiting for connection (%d seconds)", connectTimeoutInSeconds);
      return GWEN_ERROR_TIMEOUT;
    }
    DBG_DEBUG(NULL, "Next loop");
    GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
  }

  return 0;
}
/**
 * Subscribe to the given topic filter on the MQTT endpoint of the storage object.
 *
 * Creates a SUBSCRIBE packet with the next packet id of the endpoint, queues
 * it for sending and waits for the SUBACK packet. A received SUBACK is
 * dumped to the log. If no SUBACK arrives within the command timeout an
 * error is returned so the caller does not assume an active subscription.
 *
 * @param aqh storage object whose mqttEndpoint is used
 * @param topicFilter MQTT topic filter to subscribe to (e.g. "#")
 * @return 0 on success, GWEN_ERROR_INTERNAL if the packet could not be
 *         created, GWEN_ERROR_TIMEOUT if no SUBACK was received
 */
int _subscribe(AQHOME_STORAGE *aqh, const char *topicFilter)
{
  uint16_t pckId;
  GWEN_MSG *msgOut;
  GWEN_MSG *msgIn;
  GWEN_BUFFER *buf;

  DBG_INFO(NULL, "Sending SUBSCRIBE %s", topicFilter);
  pckId=AQH_MqttClientEndpoint_GetNextPacketId(aqh->mqttEndpoint);
  msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, topicFilter, 0);
  if (msgOut==NULL) {
    DBG_ERROR(NULL, "Error creating message");
    return GWEN_ERROR_INTERNAL;
  }
  /* the message is handed over to the endpoint's send queue here */
  GWEN_MsgEndpoint_AddSendMessage(aqh->mqttEndpoint, msgOut);

  DBG_INFO(NULL, "Waiting for response");
  /* AQHOME_STORAGE_DEFAULT_CMDTIMEOUT (10000) is taken to be in milliseconds while
   * _awaitPacket() expects seconds; passing it unconverted would wait for hours */
  msgIn=_awaitPacket(aqh->mqttEndpoint, AQH_MQTTMSG_MSGTYPE_SUBACK, AQHOME_STORAGE_DEFAULT_CMDTIMEOUT/1000);
  if (msgIn==NULL) {
    DBG_ERROR(NULL, "No SUBACK received for topic filter \"%s\"", topicFilter);
    return GWEN_ERROR_TIMEOUT;
  }

  /* log the received SUBACK for diagnostic purposes */
  buf=GWEN_Buffer_new(0, 256, 0, 1);
  AQH_SubAckMqttMsg_DumpToBuffer(msgIn, buf, "received");
  DBG_INFO(NULL, "%s", GWEN_Buffer_GetStart(buf));
  GWEN_Buffer_free(buf);
  GWEN_Msg_free(msgIn);

  return 0;
}
/**
 * Wait for a MQTT packet of a given type on the given endpoint.
 *
 * Repeatedly runs the endpoint's I/O loop and takes the first received
 * message. Only the upper four bits of the message type byte are compared
 * against the expected type. Messages of any other type are dumped to the
 * log and freed. The elapsed time is checked after every round, so at least
 * one round is always performed.
 *
 * @param epTcp endpoint to read from
 * @param expectedPacketType packet type to wait for (upper nibble is significant)
 * @param timeoutInSeconds maximum number of seconds to wait
 * @return matching message (caller becomes the owner and has to free it),
 *         NULL if the timeout elapsed
 */
GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType, int timeoutInSeconds)
{
  const time_t begin=time(NULL);

  do {
    GWEN_MSG *received;

    GWEN_MsgEndpoint_IoLoop(epTcp, 2000); /* 2000 ms */
    received=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp);
    if (received!=NULL) {
      if ((AQH_MqttMsg_GetMsgTypeAndFlags(received) & 0xf0)==(expectedPacketType & 0xf0)) {
        /* this is the packet the caller is waiting for */
        return received;
      }

      /* unexpected packet: show it and drop it */
      {
        DBG_ERROR(NULL, "Received this message:");
      }
      GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(received), GWEN_Msg_GetBytesInBuffer(received), 2);
      GWEN_Msg_free(received);
    }
  } while (time(NULL)-begin<=timeoutInSeconds);

  {
    DBG_INFO(NULL, "Timeout");
  }
  return NULL;
}