decreased verbosity, send MQTT ping every 2 minutes to avoid disconnect.
This commit is contained in:
@@ -47,6 +47,8 @@
|
|||||||
#define I18N(msg) msg
|
#define I18N(msg) msg
|
||||||
#define I18S(msg) msg
|
#define I18S(msg) msg
|
||||||
|
|
||||||
|
#define AQHOME_MQTTLOG_PING_INTERVAL 120
|
||||||
|
|
||||||
//#define FULL_DEBUG
|
//#define FULL_DEBUG
|
||||||
|
|
||||||
|
|
||||||
@@ -146,6 +148,7 @@ int _serve(GWEN_DB_NODE *dbArgs)
|
|||||||
int rv;
|
int rv;
|
||||||
int timeout;
|
int timeout;
|
||||||
time_t startTime;
|
time_t startTime;
|
||||||
|
time_t lastPingSendTime;
|
||||||
const char *baseFolder;
|
const char *baseFolder;
|
||||||
|
|
||||||
startTime=time(NULL);
|
startTime=time(NULL);
|
||||||
@@ -196,6 +199,8 @@ int _serve(GWEN_DB_NODE *dbArgs)
|
|||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lastPingSendTime=time(NULL);
|
||||||
|
|
||||||
while(!stopService) {
|
while(!stopService) {
|
||||||
DBG_DEBUG(NULL, "Next loop");
|
DBG_DEBUG(NULL, "Next loop");
|
||||||
GWEN_MsgEndpoint2_IoLoop(epTcp, 2000); /* 2000 ms */
|
GWEN_MsgEndpoint2_IoLoop(epTcp, 2000); /* 2000 ms */
|
||||||
@@ -222,7 +227,10 @@ int _serve(GWEN_DB_NODE *dbArgs)
|
|||||||
GWEN_Buffer_free(buf);
|
GWEN_Buffer_free(buf);
|
||||||
#endif
|
#endif
|
||||||
AqHomeMqttLog_HandlePublishMsg(baseFolder, itemList, msg);
|
AqHomeMqttLog_HandlePublishMsg(baseFolder, itemList, msg);
|
||||||
}
|
}
|
||||||
|
else if ((AQH_MqttMsg_GetMsgTypeAndFlags(msg) & 0xf0)==(AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0)) {
|
||||||
|
DBG_INFO(AQH_LOGDOMAIN, "PING response received");
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
#ifdef FULL_DEBUG
|
#ifdef FULL_DEBUG
|
||||||
DBG_ERROR(NULL, "Received this message:");
|
DBG_ERROR(NULL, "Received this message:");
|
||||||
@@ -232,13 +240,26 @@ int _serve(GWEN_DB_NODE *dbArgs)
|
|||||||
GWEN_Msg_free(msg);
|
GWEN_Msg_free(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (timeout) {
|
if (timeout) {
|
||||||
time_t now;
|
time_t now;
|
||||||
|
|
||||||
now=time(NULL);
|
now=time(NULL);
|
||||||
if ((now-startTime)>timeout) {
|
if ((now-startTime)>timeout) {
|
||||||
DBG_INFO(NULL, "Timeout, stopping service");
|
DBG_INFO(NULL, "Timeout, stopping service");
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (1){
|
||||||
|
time_t now;
|
||||||
|
|
||||||
|
now=time(NULL);
|
||||||
|
if (now-lastPingSendTime>AQHOME_MQTTLOG_PING_INTERVAL) {
|
||||||
|
rv=AqHomeMqttLog_Ping(epTcp);
|
||||||
|
if (rv<0) {
|
||||||
|
DBG_INFO(NULL, "Error sending PING");
|
||||||
|
}
|
||||||
|
lastPingSendTime=time(NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -108,7 +108,7 @@ void _handlePublish(const char *baseFolder, const ITEM_LIST *itemList, const cha
|
|||||||
if (item) {
|
if (item) {
|
||||||
const char *t;
|
const char *t;
|
||||||
|
|
||||||
DBG_INFO(NULL, "HANDLING topic \"%s\"", topic);
|
DBG_INFO(AQH_LOGDOMAIN, "HANDLING topic \"%s\"", topic);
|
||||||
t=Item_GetDataType(item);
|
t=Item_GetDataType(item);
|
||||||
if (t && strcasecmp(t, "json")==0)
|
if (t && strcasecmp(t, "json")==0)
|
||||||
_handleJsonMsgForItem(baseFolder, item, value);
|
_handleJsonMsgForItem(baseFolder, item, value);
|
||||||
@@ -116,7 +116,7 @@ void _handlePublish(const char *baseFolder, const ITEM_LIST *itemList, const cha
|
|||||||
_handleRawMsgForItem(baseFolder, item, value);
|
_handleRawMsgForItem(baseFolder, item, value);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
DBG_INFO(NULL, "ignoring topic \"%s\"", topic);
|
DBG_INFO(AQH_LOGDOMAIN, "ignoring topic \"%s\"", topic);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -94,7 +94,7 @@ int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT2 *epTcp)
|
|||||||
int rv;
|
int rv;
|
||||||
|
|
||||||
rv=AQH_MqttClientEndpoint2_StartConnect(epTcp);
|
rv=AQH_MqttClientEndpoint2_StartConnect(epTcp);
|
||||||
if (rv<0) {
|
if (rv<0 && rv!=GWEN_ERROR_IN_PROGRESS) {
|
||||||
DBG_ERROR(NULL, "Error starting to connect (%d)", rv);
|
DBG_ERROR(NULL, "Error starting to connect (%d)", rv);
|
||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
@@ -122,9 +122,6 @@ int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT2 *epTcp, const char *topicFilter)
|
|||||||
DBG_ERROR(NULL, "Error creating message");
|
DBG_ERROR(NULL, "Error creating message");
|
||||||
return GWEN_ERROR_INTERNAL;
|
return GWEN_ERROR_INTERNAL;
|
||||||
}
|
}
|
||||||
DBG_ERROR(NULL, "Sending this message:");
|
|
||||||
GWEN_Text_DumpString((const char*) GWEN_Msg_GetConstBuffer(msgOut), GWEN_Msg_GetBytesInBuffer(msgOut), 2);
|
|
||||||
|
|
||||||
GWEN_MsgEndpoint2_AddSendMessage(epTcp, msgOut);
|
GWEN_MsgEndpoint2_AddSendMessage(epTcp, msgOut);
|
||||||
|
|
||||||
DBG_INFO(NULL, "Waiting for response");
|
DBG_INFO(NULL, "Waiting for response");
|
||||||
@@ -144,6 +141,23 @@ int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT2 *epTcp, const char *topicFilter)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int AqHomeMqttLog_Ping(GWEN_MSG_ENDPOINT2 *epTcp)
|
||||||
|
{
|
||||||
|
GWEN_MSG *msgOut;
|
||||||
|
|
||||||
|
DBG_INFO(AQH_LOGDOMAIN, "Sending PING");
|
||||||
|
msgOut=GWEN_MqttMsg_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL);
|
||||||
|
if (msgOut==NULL) {
|
||||||
|
DBG_ERROR(NULL, "Error creating message");
|
||||||
|
return GWEN_ERROR_INTERNAL;
|
||||||
|
}
|
||||||
|
GWEN_MsgEndpoint2_AddSendMessage(epTcp, msgOut);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT2 *epTcp, uint8_t expectedPacketType, int timeoutInSeconds)
|
GWEN_MSG *_awaitPacket(GWEN_MSG_ENDPOINT2 *epTcp, uint8_t expectedPacketType, int timeoutInSeconds)
|
||||||
{
|
{
|
||||||
time_t startTime;
|
time_t startTime;
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
GWEN_MSG_ENDPOINT2 *AqHomeMqttLog_CreateMqttEndpoint(GWEN_DB_NODE *dbArgs);
|
GWEN_MSG_ENDPOINT2 *AqHomeMqttLog_CreateMqttEndpoint(GWEN_DB_NODE *dbArgs);
|
||||||
int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT2 *epTcp);
|
int AqHomeMqttLog_MqttConnect(GWEN_MSG_ENDPOINT2 *epTcp);
|
||||||
int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT2 *epTcp, const char *topicFilter);
|
int AqHomeMqttLog_Subscribe(GWEN_MSG_ENDPOINT2 *epTcp, const char *topicFilter);
|
||||||
|
int AqHomeMqttLog_Ping(GWEN_MSG_ENDPOINT2 *epTcp);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,2 +1,7 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
export AQHOME_LOGLEVEL=info
|
||||||
|
export LD_LIBRARY_PATH="0-build/aqhome/:$LD_LIBRARY_PATH"
|
||||||
|
|
||||||
|
0-build/apps/aqhome-mqttlog/aqhome-mqttlog -ma 192.168.117.192 -mp 1883 -W /tmp/aqhome/mqttlog -i apps/aqhome-mqttlog/mqttlog.conf --mqttclientid=AQHOMEMQTTLOGTEST
|
||||||
|
|
||||||
AQHOME_LOGLEVEL=info LD_LIBRARY_PATH="../../aqhome/:$LD_LIBRARY_PATH" ./aqhome-mqttlog -ma 192.168.117.192 -mp 1883 -t 127.0.0.1 -W /tmp/aqhome
|
|
||||||
|
|||||||
@@ -183,7 +183,7 @@ int _getBytesNeededForMessage(GWEN_UNUSED GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg)
|
|||||||
|
|
||||||
bytesInMsg=GWEN_Msg_GetBytesInBuffer(msg);
|
bytesInMsg=GWEN_Msg_GetBytesInBuffer(msg);
|
||||||
if (bytesInMsg<2) {
|
if (bytesInMsg<2) {
|
||||||
DBG_INFO(AQH_LOGDOMAIN, "Header not yet complete");
|
DBG_DEBUG(AQH_LOGDOMAIN, "Header not yet complete");
|
||||||
return (int) (2-bytesInMsg);
|
return (int) (2-bytesInMsg);
|
||||||
}
|
}
|
||||||
if (!(GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET))
|
if (!(GWEN_Msg_GetFlags(msg) & GWEN_MSG_FLAGS_PAYLOADINFO_SET))
|
||||||
@@ -196,7 +196,7 @@ int _getBytesNeededForMessage(GWEN_UNUSED GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg)
|
|||||||
return (int)(msgLength-bytesInMsg);
|
return (int)(msgLength-bytesInMsg);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
DBG_INFO(AQH_LOGDOMAIN, "Size field not complete, requesting another byte");
|
DBG_DEBUG(AQH_LOGDOMAIN, "Size field not complete, requesting another byte");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -215,7 +215,7 @@ void _addSockets(GWEN_MSG_ENDPOINT2 *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET
|
|||||||
|
|
||||||
void _checkSockets(GWEN_MSG_ENDPOINT2 *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
|
void _checkSockets(GWEN_MSG_ENDPOINT2 *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
|
||||||
{
|
{
|
||||||
DBG_ERROR(AQH_LOGDOMAIN, "Checking sockets in state %d", GWEN_MsgEndpoint2_GetState(ep));
|
DBG_DEBUG(AQH_LOGDOMAIN, "Checking sockets in state %d", GWEN_MsgEndpoint2_GetState(ep));
|
||||||
if (ep) {
|
if (ep) {
|
||||||
GWEN_MSG_ENDPOINT2 *epChild;
|
GWEN_MSG_ENDPOINT2 *epChild;
|
||||||
|
|
||||||
|
|||||||
@@ -290,7 +290,7 @@ int _getBytesNeededForMessage(GWEN_UNUSED GWEN_MSG_ENDPOINT2 *ep, GWEN_MSG *msg)
|
|||||||
|
|
||||||
bytesInMsg=GWEN_Msg_GetBytesInBuffer(msg);
|
bytesInMsg=GWEN_Msg_GetBytesInBuffer(msg);
|
||||||
if (bytesInMsg<AQH_MSG_OFFS_ALL_DATA_BEGIN) {
|
if (bytesInMsg<AQH_MSG_OFFS_ALL_DATA_BEGIN) {
|
||||||
DBG_INFO(AQH_LOGDOMAIN, "Header not yet complete (%d)", bytesInMsg);
|
DBG_DEBUG(AQH_LOGDOMAIN, "Header not yet complete (%d)", bytesInMsg);
|
||||||
return (int) (AQH_MSG_OFFS_ALL_DATA_BEGIN-bytesInMsg);
|
return (int) (AQH_MSG_OFFS_ALL_DATA_BEGIN-bytesInMsg);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|||||||
@@ -9,4 +9,4 @@ export LD_LIBRARY_PATH="0-build/aqhome/:$LD_LIBRARY_PATH"
|
|||||||
|
|
||||||
# 0-build/apps/aqhomed/aqhomed -l aqhome.log -db aqhome.db -ma 192.168.117.192 -mp 1883 -t 127.0.0.1 -p aqhomed.pid
|
# 0-build/apps/aqhomed/aqhomed -l aqhome.log -db aqhome.db -ma 192.168.117.192 -mp 1883 -t 127.0.0.1 -p aqhomed.pid
|
||||||
|
|
||||||
0-build/apps/aqhomed/aqhomed -l aqhome.log -db aqhome.db -p aqhomed.pid -W /tmp/aqhome -ma 192.168.117.192 -mp 1883 -t 127.0.0.1
|
0-build/apps/aqhomed/aqhomed -l aqhome.log -db aqhome.db -p aqhomed.pid -W /tmp/aqhome/aqhomed -ma 192.168.117.192 -mp 1883 -t 127.0.0.1
|
||||||
|
|||||||
Reference in New Issue
Block a user