From 2d393630d8578b5b0b4b82216865e2a06849dda4 Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Sun, 9 Jul 2023 21:19:36 +0200 Subject: [PATCH] aqhome: added AQH_MqttClientEndpoint2_GetNextPacketId(). added test for subscriptions. --- aqhome/libtest.c | 130 +++++++++++++++++++++++++++++++++- aqhome/mqtt/endpoint2_mqttc.c | 18 +++++ aqhome/mqtt/endpoint2_mqttc.h | 2 + 3 files changed, 149 insertions(+), 1 deletion(-) diff --git a/aqhome/libtest.c b/aqhome/libtest.c index 23850c1..194ba2c 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -42,6 +42,9 @@ static int _mqttConnect(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp); static GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp, uint8_t expectedPacketType); +static int _mqttConnect2(GWEN_MSG_ENDPOINT2 *epClient); +static GWEN_MSG *_awaitPacket2(GWEN_MSG_ENDPOINT2 *epClient, uint8_t expectedPacketType); + GWEN_MSG *createPingMsg(uint8_t destAddr, uint8_t srcAddr) @@ -355,11 +358,134 @@ int testMqttSubscribe(int argc, char **argv) } } + return 0; } +int testMqttSubscribe2(int argc, char **argv) +{ + GWEN_MSG_ENDPOINT2 *epClient; + int rv; + GWEN_MSG *msgOut; + GWEN_MSG *msgIn; + uint16_t pckId; + const char *host="127.0.0.1"; + + AQH_Init(); + + if (argc>1) + host=argv[1]; + + DBG_ERROR(AQH_LOGDOMAIN, "Connecting to %s (%s)", host, argv[1]); + epClient=AQH_MqttClientEndpoint2_new("TESTCLIENT1234", host, 1883, NULL, 1); + rv=_mqttConnect2(epClient); + if (rv<0) { + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + return 2; + } + + pckId=AQH_MqttClientEndpoint2_GetNextPacketId(epClient); + //msgOut=GWEN_PublishMqttMsg_new(AQH_MQTTMSG_FLAGS_QOS1, 1, "test/subject1", (const uint8_t*) "29.9", 4); + //msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, "aqhome/#", 0); + msgOut=GWEN_SubscribeMqttMsg_new(AQH_MQTTMSG_MSGTYPE_SUBSCRIBE, pckId, "#", 0); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return 2; + } + GWEN_MsgEndpoint2_AddSendMessage(epClient, msgOut); + + msgIn=_awaitPacket2(epClient, AQH_MQTTMSG_MSGTYPE_SUBACK); + if (msgIn) { + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_SubAckMqttMsg_DumpToBuffer(msgIn, buf, "received"); + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + GWEN_Msg_free(msgIn); + } + + for (;;) { + GWEN_MSG *msg; + + GWEN_MsgEndpoint2_IoLoop(epClient, 2000); /* 2000 ms */ + msg=GWEN_MsgEndpoint2_TakeFirstReceivedMessage(epClient); + if (msg) { + if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0)) { + GWEN_BUFFER *buf; + + buf=GWEN_Buffer_new(0, 256, 0, 1); + AQH_PublishMqttMsg_DumpToBuffer(msg, buf, "received"); + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(buf)); + GWEN_Buffer_free(buf); + } + else { + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + } + GWEN_Msg_free(msg); + } + } + + + return 0; +} + + + +int _mqttConnect2(GWEN_MSG_ENDPOINT2 *epClient) +{ + int loop; + + for (loop=0;; loop++) { + DBG_INFO(GWEN_LOGDOMAIN, "Loop %d:", loop); + GWEN_MsgEndpoint2_IoLoop(epClient, 2000); /* 2000 ms */ + + if (GWEN_MsgEndpoint2_GetState(epClient)==GWEN_MSG_ENDPOINT_STATE_CONNECTED) { + DBG_INFO(AQH_LOGDOMAIN, "Connected."); + break; + } + else if (GWEN_MsgEndpoint2_GetState(epClient)==GWEN_MSG_ENDPOINT_STATE_UNCONNECTED) { + DBG_INFO(AQH_LOGDOMAIN, "Disconnected."); + return GWEN_ERROR_IO; + } + } + + return 0; +} + + + +GWEN_MSG *_awaitPacket2(GWEN_MSG_ENDPOINT2 *epClient, uint8_t expectedPacketType) +{ + fprintf(stdout, "Waiting for response\n"); + for (;;) { + GWEN_MSG *msg; + + DBG_DEBUG(AQH_LOGDOMAIN, "Next loop"); + GWEN_MsgEndpoint2_IoLoop(epClient, 2000); /* 2000 ms */ + msg=GWEN_MsgEndpoint2_TakeFirstReceivedMessage(epClient); + if (msg) { + if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(expectedPacketType & 0xf0)) { + return msg; + } + else { + DBG_ERROR(NULL, "Received this message:"); + GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msg), GWEN_Msg_GetBytesInBuffer(msg), 2); + } + } + } /* for */ + + return NULL; +} + + + + + + int _mqttConnect(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *epTcp) { while(GWEN_ConnectableMsgEndpoint_GetState(epTcp)