diff --git a/apps/aqhome-mqttlog/main.c b/apps/aqhome-mqttlog/main.c index ed7c1a3..26ae5c0 100644 --- a/apps/aqhome-mqttlog/main.c +++ b/apps/aqhome-mqttlog/main.c @@ -47,6 +47,8 @@ #define I18N(msg) msg #define I18S(msg) msg +#define AQHOME_MQTTLOG_PING_INTERVAL 120 + //#define FULL_DEBUG @@ -146,6 +148,7 @@ int _serve(GWEN_DB_NODE *dbArgs) int rv; int timeout; time_t startTime; + time_t lastPingSendTime; const char *baseFolder; startTime=time(NULL); @@ -196,6 +199,8 @@ int _serve(GWEN_DB_NODE *dbArgs) return rv; } + lastPingSendTime=time(NULL); + while(!stopService) { DBG_DEBUG(NULL, "Next loop"); GWEN_MsgEndpoint2_IoLoop(epTcp, 2000); /* 2000 ms */ @@ -222,7 +227,10 @@ int _serve(GWEN_DB_NODE *dbArgs) GWEN_Buffer_free(buf); #endif AqHomeMqttLog_HandlePublishMsg(baseFolder, itemList, msg); - } + } + else if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) { + DBG_INFO(AQH_LOGDOMAIN, "PING response received"); + } else { #ifdef FULL_DEBUG DBG_ERROR(NULL, "Received this message:"); @@ -232,13 +240,26 @@ int _serve(GWEN_DB_NODE *dbArgs) GWEN_Msg_free(msg); } } + if (timeout) { time_t now; now=time(NULL); if ((now-startTime)>timeout) { - DBG_INFO(NULL, "Timeout, stopping service"); - break; + DBG_INFO(NULL, "Timeout, stopping service"); + break; + } + } + if (1){ + time_t now; + + now=time(NULL); + if (now-lastPingSendTime>AQHOME_MQTTLOG_PING_INTERVAL) { + rv=AqHomeMqttLog_Ping(epTcp); + if (rv<0) { + DBG_INFO(NULL, "Error sending PING"); + } + lastPingSendTime=time(NULL); } } } diff --git a/apps/aqhome-mqttlog/messages.c b/apps/aqhome-mqttlog/messages.c index 2c73604..ea8b0b0 100644 --- a/apps/aqhome-mqttlog/messages.c +++ b/apps/aqhome-mqttlog/messages.c @@ -108,7 +108,7 @@ void _handlePublish(const char *baseFolder, const ITEM_LIST *itemList, const cha if (item) { const char *t; - DBG_INFO(NULL, "HANDLING topic \"%s\"", topic); + DBG_INFO(AQH_LOGDOMAIN, "HANDLING topic \"%s\"", topic); t=Item_GetDataType(item); if (t && strcasecmp(t, "json")==0) _handleJsonMsgForItem(baseFolder, item, value); @@ -116,7 +116,7 @@ void _handlePublish(const char *baseFolder, const ITEM_LIST *itemList, const cha _handleRawMsgForItem(baseFolder, item, value); } else { - DBG_INFO(NULL, "ignoring topic \"%s\"", topic); + DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", topic); } } diff --git a/apps/aqhome-mqttlog/mqtt.c b/apps/aqhome-mqttlog/mqtt.c index fd861f4..dfefaa9 100644 --- a/apps/aqhome-mqttlog/mqtt.c +++ b/apps/aqhome-mqttlog/mqtt.c @@ -94,7 +94,7 @@ int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT2 *epTcp) int rv; rv=AQH_MqttClientEndpoint2_StartConnect(epTcp); - if (rv<0) { + if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) { DBG_ERROR(NULL, "Error starting to connect (%d)", rv); return rv; } @@ -122,9 +122,6 @@ int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT2 *epTcp, const char *topicFilter) DBG_ERROR(NULL, "Error creating message"); return GWEN_ERROR_INTERNAL; } - DBG_ERROR(NULL, "Sending this message:"); - GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msgOut), GWEN_Msg_GetBytesInBuffer(msgOut), 2); - GWEN_MsgEndpoint2_AddSendMessage(epTcp, msgOut); DBG_INFO(NULL, "Waiting for response"); @@ -144,6 +141,23 @@ int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT2 *epTcp, const char *topicFilter) +int AqHomeMqttLog_Ping(GWEN_MSG_ENDPOINT2 *epTcp) +{ + GWEN_MSG *msgOut; + + DBG_INFO(AQH_LOGDOMAIN, "Sending PING"); + msgOut=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL); + if (msgOut==NULL) { + DBG_ERROR(NULL, "Error creating message"); + return GWEN_ERROR_INTERNAL; + } + GWEN_MsgEndpoint2_AddSendMessage(epTcp, msgOut); + + return 0; +} + + + GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT2 *epTcp, uint8_t expectedPacketType, int timeoutInSeconds) { time_t startTime; diff --git a/apps/aqhome-mqttlog/mqtt.h b/apps/aqhome-mqttlog/mqtt.h index 7ff84bb..8e8ac2c 100644 --- a/apps/aqhome-mqttlog/mqtt.h +++ b/apps/aqhome-mqttlog/mqtt.h @@ -19,6 +19,7 @@ GWEN_MSG_ENDPOINT2 *AqHomeMqttLog_CreateMqttEndpoint(GWEN_DB_NODE *dbArgs); int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT2 *epTcp); int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT2 *epTcp, const char *topicFilter); +int AqHomeMqttLog_Ping(GWEN_MSG_ENDPOINT2 *epTcp); diff --git a/aqhome-mqttlog.sh b/aqhome-mqttlog.sh index 0786e6b..52d4872 100755 --- a/aqhome-mqttlog.sh +++ b/aqhome-mqttlog.sh @@ -1,2 +1,7 @@ +#!/bin/bash + +export AQHOME_LOGLEVEL=info +export LD_LIBRARY_PATH="0-build/aqhome/:$LD_LIBRARY_PATH" + +0-build/apps/aqhome-mqttlog/aqhome-mqttlog -ma 192.168.117.192 -mp 1883 -W /tmp/aqhome/mqttlog -i apps/aqhome-mqttlog/mqttlog.conf --mqttclientid=AQHOMEMQTTLOGTEST -AQHOME_LOGLEVEL=info LD_LIBRARY_PATH="../../aqhome/:$LD_LIBRARY_PATH" ./aqhome-mqttlog -ma 192.168.117.192 -mp 1883 -t 127.0.0.1 -W /tmp/aqhome diff --git a/aqhome/mqtt/endpoint2_mqtt.c b/aqhome/mqtt/endpoint2_mqtt.c index 4002b38..afcf5e5 100644 --- a/aqhome/mqtt/endpoint2_mqtt.c +++ b/aqhome/mqtt/endpoint2_mqtt.c @@ -183,7 +183,7 @@ int _getBytesNeededForMessage(GWEN_UNUSED GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg) bytesInMsg=GWEN_Msg_GetBytesInBuffer(msg); if (bytesInMsg<2) { - DBG_INFO(AQH_LOGDOMAIN, "Header not yet complete"); + DBG_DEBUG(AQH_LOGDOMAIN, "Header not yet complete"); return (int) (2-bytesInMsg); } if (!(GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET)) @@ -196,7 +196,7 @@ int _getBytesNeededForMessage(GWEN_UNUSED GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg) return (int)(msgLength-bytesInMsg); } else { - DBG_INFO(AQH_LOGDOMAIN, "Size field not complete, requesting another byte"); + DBG_DEBUG(AQH_LOGDOMAIN, "Size field not complete, requesting another byte"); return 1; } } diff --git a/aqhome/mqtt/endpoint2_mqttc.c b/aqhome/mqtt/endpoint2_mqttc.c index 6db06b0..547528b 100644 --- a/aqhome/mqtt/endpoint2_mqttc.c +++ b/aqhome/mqtt/endpoint2_mqttc.c @@ -215,7 +215,7 @@ void _addSockets(GWEN_MSG_ENDPOINT2 *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET void _checkSockets(GWEN_MSG_ENDPOINT2 *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet) { - DBG_ERROR(AQH_LOGDOMAIN, "Checking sockets in state %d", GWEN_MsgEndpoint2_GetState(ep)); + DBG_DEBUG(AQH_LOGDOMAIN, "Checking sockets in state %d", GWEN_MsgEndpoint2_GetState(ep)); if (ep) { GWEN_MSG_ENDPOINT2 *epChild; diff --git a/aqhome/msg/endpoint2_tty.c b/aqhome/msg/endpoint2_tty.c index 1377dc8..cab5919 100644 --- a/aqhome/msg/endpoint2_tty.c +++ b/aqhome/msg/endpoint2_tty.c @@ -290,7 +290,7 @@ int _getBytesNeededForMessage(GWEN_UNUSED GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg) bytesInMsg=GWEN_Msg_GetBytesInBuffer(msg); if (bytesInMsg