aqhome: adapted server aqhome-mqttlog to events2 api.

This commit is contained in:
Martin Preuss
2025-03-08 01:03:22 +01:00
parent 58c6d12e36
commit ca2103f7b3
38 changed files with 3300 additions and 205 deletions

View File

@@ -49,6 +49,7 @@
msgwriter.h
ipcmsgreader.h
nodemsgreader.h
mqttmsgreader.h
ttyobject.h
tcpd_object.h
tcp_object.h
@@ -58,6 +59,8 @@
ipc_server.h
ipc_client.h
tty_endpoint.h
mqtt_endpoint.h
mqtt_client.h
</headers>
@@ -79,6 +82,7 @@
msgwriter.c
ipcmsgreader.c
nodemsgreader.c
mqttmsgreader.c
ttyobject.c
tcpd_object.c
tcp_object.c
@@ -89,6 +93,8 @@
ipc_client.c
ipc_endpoint.c
tty_endpoint.c
mqtt_endpoint.c
mqtt_client.c
</sources>

View File

@@ -395,11 +395,17 @@ void AQH_Endpoint_AddMsgOut(AQH_OBJECT *o, AQH_MESSAGE *msg)
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_ENDPOINT, o);
if (xo) {
AQH_Message_List_Add(msg, xo->msgOutList);
if (xo->msgWriter && AQH_Message_List_GetCount(xo->msgOutList)==1) {
DBG_INFO(AQH_LOGDOMAIN, "Enabling msgWriter, sending message");
AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg));
AQH_Object_Enable(xo->msgWriter);
if (AQH_Message_GetUsedSize(msg)<1) {
DBG_ERROR(AQH_LOGDOMAIN, "Empty message, not sending");
AQH_Message_free(msg);
}
else {
AQH_Message_List_Add(msg, xo->msgOutList);
if (xo->msgWriter && AQH_Message_List_GetCount(xo->msgOutList)==1) {
DBG_INFO(AQH_LOGDOMAIN, "Enabling msgWriter, sending message");
AQH_MsgWriter_SendMsg(xo->msgWriter, AQH_Message_GetMsgPointer(msg), AQH_Message_GetUsedSize(msg));
AQH_Object_Enable(xo->msgWriter);
}
}
}
}

53
aqhome/ipc2/mqtt_client.c Normal file
View File

@@ -0,0 +1,53 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./mqtt_client.h"
#include <aqhome/ipc2/mqttmsgreader.h>
#include <aqhome/ipc2/msgwriter.h>
#include <aqhome/events2/fdobject.h>
#include <gwenhywfar/debug.h>
#include <unistd.h>
/* ------------------------------------------------------------------------------------------------
* code
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_MqttClientObject_new(AQH_EVENT_LOOP *eventLoop, int fd)
{
int fdCopy;
AQH_OBJECT *fdReader;
AQH_OBJECT *fdWriter;
AQH_OBJECT *msgReader;
AQH_OBJECT *msgWriter;
AQH_OBJECT *endpoint;
fdCopy=dup(fd);
fdReader=AQH_FdObject_new(eventLoop, fd, AQH_FDOBJECT_FDMODE_READ);
msgReader=AQH_MqttMsgReader_new(eventLoop, fdReader);
AQH_Object_Enable(msgReader);
fdWriter=AQH_FdObject_new(eventLoop, fdCopy, AQH_FDOBJECT_FDMODE_WRITE);
msgWriter=AQH_MsgWriter_new(eventLoop, fdWriter);
endpoint=AQH_Endpoint_new(eventLoop, msgReader, msgWriter);
return endpoint;
}

27
aqhome/ipc2/mqtt_client.h Normal file
View File

@@ -0,0 +1,27 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_MQTT_CLIENT_H
#define AQH_MQTT_CLIENT_H
#include <aqhome/ipc2/endpoint.h>
/**
*
* @param eventLoop pointer to eventLoop
* @param fd connected non-blocking socket to work with (see @ref AQH_TcpObject_CreateConnectedSocket).
*/
AQHOME_API AQH_OBJECT *AQH_MqttClientObject_new(AQH_EVENT_LOOP *eventLoop, int fd);
#endif

View File

@@ -0,0 +1,74 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./mqtt_endpoint.h"
#include "aqhome/msg/mqtt/m_mqtt.h"
#include <gwenhywfar/debug.h>
#include <time.h>
AQH_MESSAGE *AQH_MqttEndpoint_WaitForConnAckMsg(AQH_OBJECT *mqttEndpoint, int timeoutInSeconds)
{
return AQH_MqttEndpoint_WaitForMsg(mqttEndpoint, AQH_MQTTMSG_MSGTYPE_CONNACK, timeoutInSeconds);
}
AQH_MESSAGE *AQH_MqttEndpoint_WaitForMsg(AQH_OBJECT *mqttEndpoint, uint8_t t, int timeoutInSeconds)
{
time_t startTime;
startTime=time(NULL);
t&=0xf0;
for (;;) {
AQH_MESSAGE_LIST *msgList;
time_t now;
msgList=AQH_Endpoint_GetMsgInList(mqttEndpoint);
if (msgList) {
AQH_MESSAGE *msg;
msg=AQH_Message_List_First(msgList);
while(msg) {
uint8_t msgTypeAndFlags;
msgTypeAndFlags=AQH_MqttMessage_GetTypeAndFlags(msg);
if ((msgTypeAndFlags & 0xf0) == t) {
AQH_Message_List_Del(msg);
return msg;
}
msg=AQH_Message_List_Next(msg);
}
}
now=time(NULL);
if (now-startTime>timeoutInSeconds) {
DBG_INFO(NULL, "Timeout");
break;
}
AQH_EventLoop_Run(AQH_Object_GetEventLoop(mqttEndpoint), 500);
} /* for */
return NULL;
}

View File

@@ -0,0 +1,23 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_MQTT_ENDPOINT_H
#define AQH_MQTT_ENDPOINT_H
#include <aqhome/ipc2/endpoint.h>
AQHOME_API AQH_MESSAGE *AQH_MqttEndpoint_WaitForConnAckMsg(AQH_OBJECT *mqttEndpoint, int timeoutInSeconds);
AQHOME_API AQH_MESSAGE *AQH_MqttEndpoint_WaitForMsg(AQH_OBJECT *mqttEndpoint, uint8_t t, int timeoutInSeconds);
#endif

182
aqhome/ipc2/mqttmsgreader.c Normal file
View File

@@ -0,0 +1,182 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./mqttmsgreader.h"
#include "./msgreader_p.h"
#include <aqhome/events2/fdobject.h>
#include <gwenhywfar/inherit.h>
#include <gwenhywfar/debug.h>
#include <gwenhywfar/endianfns.h>
#include <gwenhywfar/text.h>
#include <sys/socket.h>
#define AQH_MSG_READER_HEADER_SIZE 2
#define AQH_MSG_READER_MINMSGSIZE 12
#define AQH_MSG_READER_MAXMSGSIZE 10240
#define AQH_MSG_READER_FLAGS_READBODY 0x0001
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static int _readMsg(AQH_OBJECT *o);
static int _readHeaderFromRingbuffer(AQH_MSG_READER *xo);
/* ------------------------------------------------------------------------------------------------
* implementation
* ------------------------------------------------------------------------------------------------
*/
AQH_OBJECT *AQH_MqttMsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject)
{
AQH_OBJECT *o;
o=AQH_MsgReader_new(eventLoop, fdObject);
AQH_MsgReader_SetReadMsgFn(o, _readMsg);
return o;
}
int _readMsg(AQH_OBJECT *o)
{
AQH_MSG_READER *xo;
xo=AQH_MsgReader_GetData(o);
if (xo) {
int rv;
if (!(xo->flags & AQH_MSG_READER_FLAGS_READBODY)) {
DBG_INFO(AQH_LOGDOMAIN, "Reading header");
rv=_readHeaderFromRingbuffer(xo);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
}
if (xo->flags & AQH_MSG_READER_FLAGS_READBODY) {
DBG_INFO(AQH_LOGDOMAIN, "Reading body");
/* reading remainder of msg directly into allocated buffer */
rv=AQH_MsgReader_ReadRemainderFromRingbuffer(o);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv);
return rv;
}
else if (rv==1) {
int msgLen;
uint8_t *msgPtr;
msgLen=xo->bytesReceived;
msgPtr=xo->currentMsgBuf;
xo->bytesReceived=0;
xo->bytesLeft=0;
xo->currentMsgBuf=NULL;
xo->flags&=~AQH_MSG_READER_FLAGS_READBODY;
DBG_ERROR(NULL, "Received message:");
//GWEN_Text_LogString((const char*) msgPtr, msgLen, NULL, GWEN_LoggerLevel_Error);
rv=AQH_Object_EmitSignal(o, AQH_MSG_READER_SIGNAL_MSGRECVD, msgLen, (void*) msgPtr);
if (rv==0) {
DBG_ERROR(AQH_LOGDOMAIN, "Received message ignored");
}
free(msgPtr);
return 1;
}
}
return 0;
}
return GWEN_ERROR_GENERIC;
}
int _readHeaderFromRingbuffer(AQH_MSG_READER *xo)
{
int remainingBytesInBuffer;
uint32_t mqttPayloadLen=0;
int idx=0;
const uint8_t *ptr;
uint8_t lenByte;
int shift=0;
int rv;
ptr=xo->headerBuffer;
remainingBytesInBuffer=xo->bytesReceived;
if (remainingBytesInBuffer>AQH_MSG_READER_HEADERBUFFER_SIZE) {
DBG_ERROR(AQH_LOGDOMAIN, "Error in message (msg size not determined within %d bytes)", remainingBytesInBuffer);
return GWEN_ERROR_BAD_DATA;
}
/* read type and flags (first byte) */
if (xo->bytesReceived<=idx) {
rv=GWEN_RingBuffer_ReadByte(xo->ringBuffer);
if (rv!=-1) {
xo->headerBuffer[idx]=rv;
xo->bytesReceived++;
remainingBytesInBuffer++;
}
else
return 0;
}
idx++;
/* read address bytes */
while(idx<6) { /* max 4 bytes size plus type/flags byte */
if (xo->bytesReceived<=idx) {
rv=GWEN_RingBuffer_ReadByte(xo->ringBuffer);
if (rv!=-1) {
xo->headerBuffer[idx]=rv;
xo->bytesReceived++;
remainingBytesInBuffer++;
}
else
return 0;
}
lenByte=ptr[idx];
mqttPayloadLen+=(lenByte & 0x7f)<<shift;
if (!(lenByte & 0x80)) {
uint32_t fullMsgSize;
/* last byte of size, finish */
fullMsgSize=mqttPayloadLen+idx+1;
xo->bytesLeft=(fullMsgSize-xo->bytesReceived);
xo->currentMsgBuf=(uint8_t*) malloc(fullMsgSize);
memmove(xo->currentMsgBuf, xo->headerBuffer, xo->bytesReceived);
xo->bytesLeft=fullMsgSize-xo->bytesReceived;
xo->flags|=AQH_MSG_READER_FLAGS_READBODY;
DBG_ERROR(AQH_LOGDOMAIN,
"Got size: full size=%d, payload pos=%d, payload size=%d (%04x)",
fullMsgSize, idx+1, mqttPayloadLen, mqttPayloadLen);
return 1; /* size successfully determined */
}
shift+=7;
idx++;
}
DBG_ERROR(AQH_LOGDOMAIN, "Bad MQTT message (could not determine message length)");
return GWEN_ERROR_BAD_DATA;
}

View File

@@ -0,0 +1,20 @@
/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifndef AQH_MQTTMSGREADER_H
#define AQH_MQTTMSGREADER_H
#include <aqhome/events2/object.h>
AQH_OBJECT *AQH_MqttMsgReader_new(AQH_EVENT_LOOP *eventLoop, AQH_OBJECT *fdObject);
#endif

View File

@@ -23,7 +23,7 @@
#define AQH_MSGREADER_SKIPTIME_IN_MS 20
#define AQH_MSGREADER_FLAGS_SKIP 0x0001
#define AQH_MSGREADER_FLAGS_SKIP 0x80000000
@@ -109,8 +109,10 @@ void AQH_MsgReader_SetFlags(AQH_OBJECT *o, uint32_t f)
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo)
if (xo) {
DBG_ERROR(AQH_LOGDOMAIN, "Set flags: %08x", f);
xo->flags=f;
}
}
@@ -120,8 +122,10 @@ void AQH_MsgReader_AddFlags(AQH_OBJECT *o, uint32_t f)
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo)
if (xo) {
DBG_ERROR(AQH_LOGDOMAIN, "Adding flags: %08x", f);
xo->flags|=f;
}
}
@@ -131,8 +135,10 @@ void AQH_MsgReader_SubFlags(AQH_OBJECT *o, uint32_t f)
AQH_MSG_READER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo)
if (xo) {
DBG_ERROR(AQH_LOGDOMAIN, "Clearing flags: %08x", f);
xo->flags&=~f;
}
}
@@ -339,35 +345,45 @@ int AQH_MsgReader_ReadRemainderFromRingbuffer(AQH_OBJECT *o)
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MSG_READER, o);
if (xo) {
uint32_t bytesInRingBuffer;
uint32_t bytesToRead;
int rv;
if (xo->bytesLeft==0) {
/* msg finished */
DBG_INFO(AQH_LOGDOMAIN, "Message complete");
return 1;
}
else {
uint32_t bytesInRingBuffer;
uint32_t bytesToRead;
int rv;
bytesInRingBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer);
/* still reading header */
bytesToRead=xo->bytesLeft;
if (bytesInRingBuffer<bytesToRead)
bytesToRead=bytesInRingBuffer;
if (bytesToRead) {
uint32_t xferSize;
xferSize=bytesToRead;
rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer empty");
return 0;
bytesInRingBuffer=GWEN_RingBuffer_GetUsedBytes(xo->ringBuffer);
/* still reading header */
bytesToRead=xo->bytesLeft;
if (bytesInRingBuffer<bytesToRead)
bytesToRead=bytesInRingBuffer;
if (bytesToRead) {
uint32_t xferSize;
xferSize=bytesToRead;
rv=GWEN_RingBuffer_ReadBytes(xo->ringBuffer, (char*) (xo->currentMsgBuf+xo->bytesReceived), &xferSize);
if (rv<0) {
DBG_INFO(AQH_LOGDOMAIN, "Ringbuffer empty");
return 0;
}
if (xferSize<bytesToRead) {
DBG_ERROR(AQH_LOGDOMAIN, "Read fewer bytes than available?");
return GWEN_ERROR_GENERIC;
}
xo->bytesReceived+=xferSize;
xo->bytesLeft-=xferSize;
if (xo->bytesLeft==0) {
/* msg finished */
DBG_INFO(AQH_LOGDOMAIN, "Message complete");
return 1;
}
}
if (xferSize<bytesToRead) {
DBG_ERROR(AQH_LOGDOMAIN, "Read fewer bytes than available?");
return GWEN_ERROR_GENERIC;
}
xo->bytesReceived+=xferSize;
xo->bytesLeft-=xferSize;
if (xo->bytesLeft==0) {
/* msg finished */
DBG_INFO(AQH_LOGDOMAIN, "Message complete");
return 1;
else {
DBG_ERROR(AQH_LOGDOMAIN, "Nothing to read??");
}
}
return 0;