mqtt: added more message types, added test for them.

This commit is contained in:
Martin Preuss
2023-03-29 00:42:49 +02:00
parent e07e3f2b8a
commit bf4451f3f4
10 changed files with 477 additions and 2 deletions

View File

@@ -11,6 +11,9 @@
#include "aqhome/ipc/endpoint_node_ipc_tcp.h"
#include "aqhome/mqtt/endpoint_mqttc.h"
#include "aqhome/mqtt/msg_mqtt_connect.h"
#include "aqhome/mqtt/msg_mqtt_connack.h"
#include "aqhome/mqtt/msg_mqtt_publish.h"
#include "aqhome/mqtt/msg_mqtt_pubresponse.h"
#include "aqhome/msgmanager.h"
#include "aqhome/aqhome.h"
@@ -192,6 +195,7 @@ int testMqttConnection()
return 2;
}
fprintf(stdout, "Sending CONNECT\n");
msgOut=GWEN_ConnectMqttMsg_new("MQTT", 4, 0, 10, "CLIENTID123", NULL, NULL);
if (msgOut==NULL) {
DBG_ERROR(NULL, "Error creating message");
@@ -199,6 +203,7 @@ int testMqttConnection()
}
GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut);
fprintf(stdout, "Waiting for response\n");
for (;;) {
GWEN_MSG *msg;
@@ -207,12 +212,59 @@ int testMqttConnection()
GWEN_MsgEndpointMgr_IoLoopOnce(emgr);
msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp);
if (msg) {
DBG_ERROR(NULL, "Received this message:");
GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2);
if (AQH_MqttMsg_GetMsgTypeAndFlags(msg)==AQH_MQTTMSG_MSGTYPE_CONNACK) {
GWEN_BUFFER *buf;
buf=GWEN_Buffer_new(0, 256, 0, 1);
AQH_ConnAckMqttMsg_DumpToBuffer(msg, buf, "received");
fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf));
GWEN_Buffer_free(buf);
}
else {
DBG_ERROR(NULL, "Received this message:");
GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2);
}
GWEN_Msg_free(msg);
break;
}
}
fprintf(stdout, "Sending PUBLISH\n");
//msgOut=GWEN_PublishMqttMsg_new(AQH_MQTTMSG_FLAGS_QOS1, 1, "test/subject1", (const uint8_t*) "29.9", 4);
msgOut=GWEN_PublishMqttMsg_new(0, 0, "test/subject1", (const uint8_t*) "29.9", 4);
if (msgOut==NULL) {
DBG_ERROR(NULL, "Error creating message");
return 2;
}
GWEN_MsgEndpoint_AddSendMessage(epTcp, msgOut);
fprintf(stdout, "Waiting for response\n");
for (;;) {
GWEN_MSG *msg;
DBG_DEBUG(AQH_LOGDOMAIN, "Next loop");
//GWEN_MsgManager_LoopOnce(emgr);
GWEN_MsgEndpointMgr_IoLoopOnce(emgr);
msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(epTcp);
if (msg) {
if (AQH_MqttMsg_GetMsgTypeAndFlags(msg)==AQH_MQTTMSG_MSGTYPE_PUBACK) {
GWEN_BUFFER *buf;
buf=GWEN_Buffer_new(0, 256, 0, 1);
AQH_PubResponseMqttMsg_DumpToBuffer(msg, buf, "received");
fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf));
GWEN_Buffer_free(buf);
}
else {
DBG_ERROR(NULL, "Received this message:");
GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2);
}
GWEN_Msg_free(msg);
break;
}
}
return 0;
}