/**************************************************************************** * This file is part of the project Gwenhywfar. * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/mqtt/endpoint_mqttc.h" #include "aqhome/mqtt/endpoint_mqtt.h" #include "aqhome/mqtt/msg_mqtt_connect.h" #include "aqhome/mqtt/msg_mqtt_connack.h" #include "aqhome/mqtt/msg_mqtt_publish.h" #include "aqhome/mqtt/msg_mqtt_subscribe.h" #include "aqhome/mqtt/msg_mqtt_suback.h" #include #include #include #include #include #define AQH_ENDPOINT2_MQTT_NAME "mqtt-client" #define AQH_ENDPOINT2_MQTTC_RECONNECT_TIME 5 #define AQH_ENDPOINT2_MQTTC_CONNECT_TIMEOUT 10 #define AQH_ENDPOINT2_MQTTC_STAGE_NONE 0 #define AQH_ENDPOINT2_MQTTC_STAGE_CONNREQ 1 #define AQH_ENDPOINT2_MQTTC_STAGE_SUBREQ 2 #define AQH_ENDPOINT2_MQTTC_STAGE_UP 10 /* ------------------------------------------------------------------------------------------------ * forward declarations * ------------------------------------------------------------------------------------------------ */ static int _startConnect(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild); static void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet); static void _lookForAndHandleConnAck(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild); static void _lookForAndHandleSubAck(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild); static int _sendSubscribeMsg(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, const char *topicFilter); /* ------------------------------------------------------------------------------------------------ * implementations * ------------------------------------------------------------------------------------------------ */ GWEN_MSG_ENDPOINT *AQH_MqttClientEndpoint_new(const char *clientId, const char *host, int port, const char *name, int groupId) { GWEN_MSG_ENDPOINT *ep; GWEN_MSG_ENDPOINT *epChild; ep=GWEN_MultilayerEndpoint_new(name?name:AQH_ENDPOINT2_MQTT_NAME, groupId); GWEN_MultilayerEndpoint_SetConnectTimeoutInSeconds(ep, AQH_ENDPOINT2_MQTTC_CONNECT_TIMEOUT); GWEN_MultilayerEndpoint_SetReconnectTimeInSeconds(ep, AQH_ENDPOINT2_MQTTC_RECONNECT_TIME); GWEN_MultilayerEndpoint_SetStartConnectFn(ep, _startConnect); GWEN_MultilayerEndpoint_SetCheckSocketsFn(ep, _checkSockets); epChild=GWEN_TcpcEndpoint_new(host, port, NULL, groupId); GWEN_MsgIoEndpoint_Extend(epChild); AQH_MqttEndpoint_Extend(epChild); AQH_MqttEndpoint_SetClientId(epChild, clientId); GWEN_MsgEndpoint_Tree2_AddChild(ep, epChild); return ep; } uint16_t AQH_MqttClientEndpoint_GetKeepAliveTime(const GWEN_MSG_ENDPOINT *ep) { if (ep) { GWEN_MSG_ENDPOINT *epChild; epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep); if (epChild) return AQH_MqttEndpoint_GetKeepAliveTime(epChild); } return 0; } void AQH_MqttClientEndpoint_SetKeepAliveTime(GWEN_MSG_ENDPOINT *ep, uint16_t i) { if (ep) { GWEN_MSG_ENDPOINT *epChild; epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep); if (epChild) { AQH_MqttEndpoint_SetKeepAliveTime(epChild, i); } } } uint16_t AQH_MqttClientEndpoint_GetNextPacketId(const GWEN_MSG_ENDPOINT *ep) { if (ep) { GWEN_MSG_ENDPOINT *epChild; epChild=GWEN_MsgEndpoint_Tree2_GetFirstChild(ep); if (epChild) { return AQH_MqttEndpoint_GetNextPacketId(epChild); } } return 0; } void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) { int stage; GWEN_MsgEndpoint_CheckSockets(epChild, readSet, writeSet, xSet); /* let base layer work */ stage=GWEN_MultilayerEndpoint_GetStage(ep); switch(stage) { case AQH_ENDPOINT2_MQTTC_STAGE_CONNREQ: _lookForAndHandleConnAck(ep, epChild); break; case AQH_ENDPOINT2_MQTTC_STAGE_SUBREQ: _lookForAndHandleSubAck(ep, epChild); break; default: break; } } int _startConnect(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild) { if (epChild) { int rv; GWEN_MSG *msg; rv=GWEN_TcpcEndpoint_StartConnect(epChild); if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { DBG_INFO(AQH_LOGDOMAIN, "Error starting to connect child layer (%d)", rv); return rv; } msg=AQH_MqttEndpoint_CreateMsgConnect(epChild); if (msg) { GWEN_MsgEndpoint_AddSendMessage(epChild, msg); GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTING); GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_CONNREQ); return rv; /* result from GWEN_TcpcEndpoint_StartConnect() above */ } } return GWEN_ERROR_GENERIC; } void _lookForAndHandleConnAck(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild) { GWEN_MSG *msg; msg=GWEN_MsgEndpoint_GetFirstReceivedMessage(epChild); while(msg) { GWEN_MSG *msgNext; uint8_t msgType; msgNext=GWEN_Msg_List_Next(msg); msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0; if (msgType==AQH_MQTTMSG_MSGTYPE_CONNACK) { int code; GWEN_Msg_List_Del(msg); /* remove from list */ code=AQH_ConnAckMqttMsg_GetResultCode(msg); if (code==AQH_MQTTMSG_CONNACK_RESULT_ACCEPTED) { DBG_INFO(AQH_LOGDOMAIN, "Positive CONNACK response"); if (GWEN_MsgEndpoint_GetFlags(ep) & AQH_ENDPOINT2_MQTTCLIENT_FLAGS_SUBSCRIBEALL) { int rv; DBG_INFO(AQH_LOGDOMAIN, "Sending subscribe message"); rv=_sendSubscribeMsg(ep, epChild, "#"); if (rv<0) { DBG_ERROR(AQH_LOGDOMAIN, "Error sending SUBSCRIBE request (%d)", rv); GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_NONE); GWEN_MsgEndpoint_Disconnect(epChild); GWEN_MsgEndpoint_Disconnect(ep); } else { GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_SUBREQ); } } else { DBG_INFO(AQH_LOGDOMAIN, "Connected (no auto-subscription requested)"); GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_UP); GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTED); } } else { DBG_ERROR(AQH_LOGDOMAIN, "Negative CONNACK response (%d)", code); GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_NONE); GWEN_MsgEndpoint_Disconnect(epChild); GWEN_MsgEndpoint_Disconnect(ep); } GWEN_Msg_free(msg); break; } else { DBG_ERROR(AQH_LOGDOMAIN, "Ignoring response (%d)", msgType); } msg=msgNext; } /* while */ } void _lookForAndHandleSubAck(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild) { GWEN_MSG *msg; msg=GWEN_MsgEndpoint_GetFirstReceivedMessage(epChild); while(msg) { GWEN_MSG *msgNext; uint8_t msgType; msgNext=GWEN_Msg_List_Next(msg); msgType=AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0; if (msgType==AQH_MQTTMSG_MSGTYPE_SUBACK) { int code; GWEN_Msg_List_Del(msg); /* remove from list */ code=AQH_SubAckMqttMsg_GetResultCode(msg); if (code!=128) { DBG_INFO(AQH_LOGDOMAIN, "Positive SUBACK response, connected (%d, %02x)", code, code); GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_UP); GWEN_MsgEndpoint_SetState(ep, GWEN_MSG_ENDPOINT_STATE_CONNECTED); } else { DBG_ERROR(AQH_LOGDOMAIN, "Negative SUBACK response (%d)", code); GWEN_MultilayerEndpoint_SetStage(ep, AQH_ENDPOINT2_MQTTC_STAGE_NONE); GWEN_MsgEndpoint_Disconnect(epChild); GWEN_MsgEndpoint_Disconnect(ep); } GWEN_Msg_free(msg); break; } else { DBG_ERROR(AQH_LOGDOMAIN, "Ignoring response (%d)", msgType); } msg=msgNext; } /* while */ } int _sendSubscribeMsg(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT *epChild, const char *topicFilter) { uint16_t pckId; GWEN_MSG *msgOut; DBG_INFO(AQH_LOGDOMAIN, "Sending SUBSCRIBE %s", topicFilter); pckId=AQH_MqttClientEndpoint_GetNextPacketId(ep); msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, topicFilter, 0); if (msgOut==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Error creating message"); return GWEN_ERROR_INTERNAL; } GWEN_MsgEndpoint_AddSendMessage(epChild, msgOut); return 0; }