Files
aqhomecontrol/apps/aqhome-mqttlog/server.c
2025-03-10 00:02:26 +01:00

1144 lines
30 KiB
C

/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2025 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "./server_p.h"
#include "./s_publish.h"
#include "./s_setdata.h"
#include "./xmlread.h"
#include "./xmlwrite.h"
#include <aqhome/aqhome.h>
#include <aqhome/ipc2/ipc_endpoint.h>
#include <aqhome/ipc2/ipc_server.h>
#include <aqhome/ipc2/tcpd_object.h>
#include <aqhome/ipc2/tcp_object.h>
#include <aqhome/ipc2/tcpd_object.h>
#include <aqhome/ipc2/ipc_client.h>
#include <aqhome/msg/ipc/m_ipc.h>
#include <aqhome/msg/ipc/m_ipc_result.h>
#include <aqhome/msg/ipc/m_ipc_connect.h>
#include <aqhome/msg/ipc/m_ipc_tag16.h>
#include <aqhome/msg/ipc/data/m_ipcd.h>
#include <aqhome/msg/ipc/data/m_ipcd_multidata.h>
#include <aqhome/ipc2/mqtt_endpoint.h>
#include <aqhome/ipc2/mqtt_client.h>
#include <aqhome/msg/mqtt/m_mqtt.h>
#include <aqhome/msg/mqtt/m_mqtt_connect.h>
#include <aqhome/msg/mqtt/m_mqtt_connack.h>
#include <aqhome/msg/mqtt/m_mqtt_subscribe.h>
#include <aqhome/msg/mqtt/m_mqtt_suback.h>
#include <aqhome/msg/mqtt/m_mqtt_publish.h>
#include <aqhome/data/value.h>
#include <gwenhywfar/args.h>
#include <gwenhywfar/misc.h>
#include <gwenhywfar/debug.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
/* ------------------------------------------------------------------------------------------------
* defines
* ------------------------------------------------------------------------------------------------
*/
#define I18N(msg) msg
#define I18S(msg) msg
#define A_ARG GWEN_ARGS_FLAGS_HAS_ARGUMENT
#define A_END (GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST)
#define A_CHAR GWEN_ArgsType_Char
#define A_INT GWEN_ArgsType_Int
#define AQH_MQTTLOG_SERVER_BROKER_RESTARTTIME 10
#define AQH_MQTTLOG_SERVER_MQTT_RESTARTTIME 10
enum {
AQH_MQTTLOG_SERVER_SLOT_BROKERCLOSED=1,
AQH_MQTTLOG_SERVER_SLOT_MQTTCLOSED
};
/* ------------------------------------------------------------------------------------------------
* global vars
* ------------------------------------------------------------------------------------------------
*/
GWEN_INHERIT(AQH_OBJECT, AQH_MQTTLOG_SERVER)
/* ------------------------------------------------------------------------------------------------
* forward declarations
* ------------------------------------------------------------------------------------------------
*/
static void GWENHYWFAR_CB _freeData(void *bp, void *p);
static void _readConfig(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo, GWEN_DB_NODE *dbArgs);
static const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue);
static int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue);
static void _setMqttAddress(AQH_OBJECT *o, const char *s);
static void _setMqttPort(AQH_OBJECT *o, int i);
static void _setMqttClientId(AQH_OBJECT *o, const char *s);
static void _setMqttKeepAlive(AQH_OBJECT *o, int i);
static int _startBroker(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo);
static int _exchangeBrokerConnect(AQH_MQTTLOG_SERVER *xo, uint32_t flags);
static int _startMqtt(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo);
static int _exchangeMqttConnect(AQH_MQTTLOG_SERVER *xo);
static int _exchangeMqttSubscribe(AQH_MQTTLOG_SERVER *xo);
static int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, int param1, void *param2);
static int _handleBrokerDown(AQH_MQTTLOG_SERVER *xo);
static int _handleMqttDown(AQH_MQTTLOG_SERVER *xo);
static void _handleMsgFromBroker(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
static void _handleMsgFromMqtt(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
static void _handleMqttMsgPingRsp(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg);
static int _createPidFile(const char *pidFilename);
static int _diffInSeconds(time_t t1, time_t t0);
static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs);
/* ------------------------------------------------------------------------------------------------
* code
* ------------------------------------------------------------------------------------------------
*/
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* constructor, destructor
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
*/
AQH_OBJECT *AQH_MqttLogServer_new(AQH_EVENT_LOOP *eventLoop)
{
AQH_OBJECT *o;
AQH_MQTTLOG_SERVER *xo;
o=AQH_Object_new(eventLoop);
GWEN_NEW_OBJECT(AQH_MQTTLOG_SERVER, xo);
GWEN_INHERIT_SETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o, xo, _freeData);
AQH_Object_SetSignalHandlerFn(o, _handleSignal);
xo->timeoutInSeconds=5;
return o;
}
void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
{
AQH_MQTTLOG_SERVER *xo;
xo=(AQH_MQTTLOG_SERVER*) p;
AQH_Object_free(xo->mqttEndpoint);
AQH_Object_free(xo->brokerEndpoint);
GWEN_FREE_OBJECT(xo);
}
AQH_MQTTLOG_SERVER *AQH_MqttLogServer_GetServerData(const AQH_OBJECT *o)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
return xo;
}
return NULL;
}
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* getter, setter
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
*/
int AQH_MqttLogServer_GetTimeout(const AQH_OBJECT *o)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo)
return xo->timeout;
}
return 0;
}
void AQH_MqttLogServer_SetPidFile(AQH_OBJECT *o, const char *s)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
free(xo->pidFile);
xo->pidFile=s?strdup(s):NULL;
}
}
}
void AQH_MqttLogServer_SetDeviceFile(AQH_OBJECT *o, const char *s)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
free(xo->deviceFile);
xo->deviceFile=s?strdup(s):NULL;
}
}
}
void _setBrokerAddress(AQH_OBJECT *o, const char *s)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
free(xo->brokerAddress);
xo->brokerAddress=s?strdup(s):NULL;
}
}
}
void _setBrokerPort(AQH_OBJECT *o, int i)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo)
xo->brokerPort=i;
}
}
void _setBrokerClientId(AQH_OBJECT *o, const char *s)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
free(xo->brokerClientId);
xo->brokerClientId=s?strdup(s):NULL;
}
}
}
void _setMqttAddress(AQH_OBJECT *o, const char *s)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
free(xo->mqttAddress);
xo->mqttAddress=s?strdup(s):NULL;
}
}
}
void _setMqttPort(AQH_OBJECT *o, int i)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo)
xo->mqttPort=i;
}
}
void _setMqttClientId(AQH_OBJECT *o, const char *s)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
free(xo->mqttClientId);
xo->mqttClientId=s?strdup(s):NULL;
}
}
}
void _setMqttKeepAlive(AQH_OBJECT *o, int i)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo)
xo->mqttKeepAlive=i;
}
}
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* init
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
*/
int AQH_MqttLogServer_Init(AQH_OBJECT *o, int argc, char **argv)
{
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
GWEN_DB_NODE *dbArgs;
int rv;
const char *s;
dbArgs=GWEN_DB_Group_new("args");
rv=_readArgs(argc, argv, dbArgs);
if (rv<0) {
DBG_ERROR(NULL, "Error reading args (%d)", rv);
return rv;
}
AQH_MergeConfigFileIntoConfig(dbArgs, "ConfigFile");
_readConfig(o, xo, dbArgs);
s=GWEN_DB_GetCharValue(dbArgs, "loglevel", 0, NULL);
if (s && *s) {
GWEN_LOGGER_LEVEL ll;
ll=GWEN_Logger_Name2Level(s);
GWEN_Logger_SetLevel(NULL, ll);
}
s=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, AQHOME_MQTT_DEFAULT_PIDFILE);
if (s && *s) {
AQH_MqttLogServer_SetPidFile(o, s);
rv=_createPidFile(s);
if (rv<0) {
DBG_ERROR(NULL, "Error creating PID file (%d)", rv);
return rv;
}
}
DBG_INFO(NULL, "Starting Broker Connection");
rv=_startBroker(o, xo);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
return rv;
}
DBG_INFO(NULL, "Starting MQTT Connection");
rv=_startMqtt(o, xo);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
return rv;
}
AQH_MqttLogServer_LoadRuntimeDeviceFiles(o);
AQH_MqttLogServer_ReloadDeviceFiles(o);
return 0;
}
else {
DBG_ERROR(NULL, "Not of type AQH_MQTTLOG_SERVER object");
return GWEN_ERROR_INVALID;
}
}
void _readConfig(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo, GWEN_DB_NODE *dbArgs)
{
xo->dbArgs=dbArgs;
xo->timeout=GWEN_DB_GetIntValue(dbArgs, "timeout", 0, 0);
AQH_MqttLogServer_SetDeviceFile(o, GWEN_DB_GetCharValue(dbArgs, "deviceFile", 0, AQHOME_MQTT_DEFAULT_DEVICEFILE));
_setMqttAddress(o, readCharConfigWithAlt(dbArgs, "mqttAddress", "ConfigFile/mqttAddr", "127.0.0.1"));
_setMqttPort(o, readIntConfigWithAlt(dbArgs, "mqttPort", "ConfigFile/mqttPort", 1883, -1));
_setMqttClientId(o, readCharConfigWithAlt(dbArgs, "mqttClientId", "ConfigFile/mqttClientId", "aqhome-mqttlog"));
_setMqttKeepAlive(o, readIntConfigWithAlt(dbArgs, "mqttKeepAlive", "ConfigFile/mqttKeepAlive", 600, -1));
_setBrokerAddress(o, readCharConfigWithAlt(dbArgs, "brokerAddress", "ConfigFile/brokerAddress", "127.0.0.1"));
_setBrokerPort(o, readIntConfigWithAlt(dbArgs, "brokerPort", "ConfigFile/brokerPort", AQHOME_MQTT_DEFAULT_BROKER_PORT, -1));
_setBrokerClientId(o, GWEN_DB_GetCharValue(dbArgs, "brokerClientId", 0, AQHOME_MQTT_DEFAULT_BROKER_CLIENTID));
}
const char *readCharConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, const char *defaultValue)
{
const char *s;
s=GWEN_DB_GetCharValue(dbArgs, varName, 0, NULL);
if (!(s && *s))
s=GWEN_DB_GetCharValue(dbArgs, altVarName, 0, NULL);
return (s && *s)?s:defaultValue;
}
int readIntConfigWithAlt(GWEN_DB_NODE *dbArgs, const char *varName, const char *altVarName, int defaultValue, int nonValue)
{
int i;
i=GWEN_DB_GetIntValue(dbArgs, varName, 0, nonValue);
if (i==nonValue)
i=GWEN_DB_GetIntValue(dbArgs, altVarName, 0, nonValue);
return (i!=nonValue)?i:defaultValue;
}
int _startBroker(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo)
{
if (xo->brokerEndpoint) {
AQH_Object_Disable(xo->brokerEndpoint);
AQH_Object_free(xo->brokerEndpoint);
xo->brokerEndpoint=NULL;
}
if (xo->brokerAddress && *(xo->brokerAddress) && xo->brokerPort) {
AQH_OBJECT *ep;
int fd;
int rv;
fd=AQH_TcpObject_CreateConnectedSocket(xo->brokerAddress, xo->brokerPort);
if (fd<0) {
DBG_ERROR(NULL, "Error connecting to broker server %s:%d", xo->brokerAddress, xo->brokerPort);
return GWEN_ERROR_IO;
}
DBG_INFO(NULL, "Physically connected to broker server %s:%d", xo->brokerAddress, xo->brokerPort);
ep=AQH_IpcClientObject_new(AQH_Object_GetEventLoop(o), fd);
assert(ep);
AQH_Endpoint_SetServiceName(ep, xo->brokerClientId);
AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_MQTTLOG_SERVER_SLOT_BROKERCLOSED, o);
AQH_Object_Enable(ep);
xo->brokerEndpoint=ep;
rv=_exchangeBrokerConnect(xo, 0);
if (rv!=0) {
DBG_ERROR(NULL, "Error connecting to broker: %d", rv);
return (rv<0)?rv:GWEN_ERROR_PERMISSIONS;
}
DBG_NOTICE(NULL, "Connected to broker at %s:%d", xo->brokerAddress, xo->brokerPort);
return 0;
}
else {
DBG_ERROR(NULL, "No server settings");
return GWEN_ERROR_BAD_DATA;
}
return 0;
}
int _exchangeBrokerConnect(AQH_MQTTLOG_SERVER *xo, uint32_t flags)
{
AQH_MESSAGE *msgOut;
uint32_t msgId;
msgId=AQH_Endpoint_GetNextMessageId(xo->brokerEndpoint);
msgOut=AQH_IpcMessageConnect_new(AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION,
AQH_MSGTYPE_IPC_CONNECT_REQ,
msgId, 0,
xo->brokerClientId, NULL, NULL, flags);
AQH_Endpoint_AddMsgOut(xo->brokerEndpoint, msgOut);
return AQH_IpcEndpoint_WaitForResultMsg(xo->brokerEndpoint,
AQH_IPC_PROTOCOL_DATA_ID, AQH_IPC_PROTOCOL_DATA_VERSION, AQH_MSGTYPE_IPC_RESULT,
msgId, xo->timeoutInSeconds);
}
int _startMqtt(AQH_OBJECT *o, AQH_MQTTLOG_SERVER *xo)
{
if (xo->mqttEndpoint) {
AQH_Object_Disable(xo->mqttEndpoint);
AQH_Object_free(xo->mqttEndpoint);
xo->mqttEndpoint=NULL;
}
if (xo->mqttAddress && *(xo->mqttAddress) && xo->mqttPort) {
AQH_OBJECT *ep;
int fd;
int rv;
fd=AQH_TcpObject_CreateConnectedSocket(xo->mqttAddress, xo->mqttPort);
if (fd<0) {
DBG_ERROR(NULL, "Error connecting to MQTT server %s:%d", xo->mqttAddress, xo->mqttPort);
return GWEN_ERROR_IO;
}
DBG_INFO(NULL, "Physically connected to MQTT server %s:%d", xo->mqttAddress, xo->mqttPort);
ep=AQH_MqttClientObject_new(AQH_Object_GetEventLoop(o), fd);
assert(ep);
AQH_Endpoint_SetServiceName(ep, xo->mqttClientId);
AQH_Object_AddLink(ep, AQH_ENDPOINT_SIGNAL_CLOSED, AQH_MQTTLOG_SERVER_SLOT_MQTTCLOSED, o);
AQH_Object_Enable(ep);
xo->mqttEndpoint=ep;
rv=_exchangeMqttConnect(xo);
if (rv!=0) {
DBG_ERROR(NULL, "MQTT: Error exchanging CONNECT request (%d)", rv);
return (rv<0)?rv:GWEN_ERROR_PERMISSIONS;
}
rv=_exchangeMqttSubscribe(xo);
if (rv!=0) {
DBG_ERROR(NULL, "MQTT: Error exchanging SUBSCRIBE request (%d)", rv);
return (rv<0)?rv:GWEN_ERROR_PERMISSIONS;
}
DBG_NOTICE(NULL, "Connected to MQTT at %s:%d", xo->mqttAddress, xo->mqttPort);
return 0;
}
else {
DBG_ERROR(NULL, "No MQTT server settings");
return GWEN_ERROR_BAD_DATA;
}
return 0;
}
int _exchangeMqttConnect(AQH_MQTTLOG_SERVER *xo)
{
AQH_MESSAGE *msg;
msg=AQH_MqttMessageConnect_new("MQTT", 0x04, 0, xo->mqttKeepAlive, xo->mqttClientId, NULL, NULL);
AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msg);
msg=AQH_MqttEndpoint_WaitForConnAckMsg(xo->mqttEndpoint, xo->timeoutInSeconds);
if (msg) {
int resultCode;
resultCode=AQH_MqttMessageConnAck_GetResultCode(msg);
AQH_Message_free(msg);
if (resultCode==AQH_MQTTMSG_CONNACK_RESULT_ACCEPTED) {
DBG_INFO(AQH_LOGDOMAIN, "Positive CONNACK response");
return 0;
}
else {
DBG_ERROR(NULL, "Negative CONNACK response: %d", resultCode);
return GWEN_ERROR_GENERIC;
}
}
else {
DBG_ERROR(NULL, "No CONNACK message received.");
return GWEN_ERROR_GENERIC;
}
}
int _exchangeMqttSubscribe(AQH_MQTTLOG_SERVER *xo)
{
uint16_t pckId;
AQH_MESSAGE *msg;
pckId=AQH_Endpoint_GetNextMessageId(xo->mqttEndpoint);
msg=AQH_MqttMessageSubscribe_new(0, pckId, "#", 0);
AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msg);
msg=AQH_MqttEndpoint_WaitForMsg(xo->mqttEndpoint, AQH_MQTTMSG_MSGTYPE_SUBACK, xo->timeoutInSeconds);
if (msg) {
int resultCode;
resultCode=AQH_MqttMessageSubAck_GetResultCode(msg);
AQH_Message_free(msg);
if (resultCode!=128) {
DBG_INFO(AQH_LOGDOMAIN, "Positive SUBACK response");
return 0;
}
else {
DBG_ERROR(NULL, "Negative SUBACK response: %d", resultCode);
return GWEN_ERROR_GENERIC;
}
}
else {
DBG_ERROR(NULL, "No SUBACK message received.");
return GWEN_ERROR_GENERIC;
}
}
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* fini
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
*/
void AQH_MqttLogServer_Fini(AQH_OBJECT *o)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
if (xo->pidFile)
remove(xo->pidFile);
}
}
}
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* broker management functions
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
*/
void AQH_MqttLogServer_HandleBrokerMsgs(AQH_OBJECT *o)
{
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo && xo->brokerEndpoint) {
AQH_MESSAGE *msg;
while( (msg=AQH_Endpoint_GetNextMsgIn(xo->brokerEndpoint)) ) {
AQH_Message_SetObject(msg, xo->brokerEndpoint);
_handleMsgFromBroker(o, xo->brokerEndpoint, msg);
AQH_Message_free(msg);
}
}
}
void _handleMsgFromBroker(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg)
{
GWEN_TAG16_LIST *tagList;
tagList=AQH_IpcMessageTag16_ParsePayload(msg, 0);
if (tagList) {
uint16_t code;
uint8_t protoId;
code=AQH_IpcMessage_GetCode(msg);
protoId=AQH_IpcMessage_GetProtoId(msg);
if (protoId==AQH_IPC_PROTOCOL_DATA_ID) {
DBG_INFO(NULL, "Received IPC packet %d (%x)", (int) code, code);
switch(code) {
case AQH_MSGTYPE_IPC_DATA_SETDATA: AQH_MqttLogServer_HandleSetData(o, msg, tagList); break;
default: break;
}
}
else {
DBG_ERROR(NULL, "Invalid IPC protocol %d (%02x)", protoId, protoId);
}
GWEN_Tag16_List_free(tagList);
}
}
void AQH_MqttLogServer_CheckBrokerConnection(AQH_OBJECT *o)
{
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo && xo->dbArgs) {
if (xo->brokerEndpoint) {
if (AQH_Object_GetFlags(xo->brokerEndpoint) & AQH_OBJECT_FLAGS_DELETE) {
DBG_INFO(NULL, "Deleting broker connection");
AQH_Object_Disable(xo->brokerEndpoint);
AQH_Object_free(xo->brokerEndpoint);
xo->brokerEndpoint=NULL;
}
}
if (xo->brokerEndpoint==NULL) {
time_t now;
now=time(NULL);
if (_diffInSeconds(now, xo->timestampBrokerDown)>AQH_MQTTLOG_SERVER_BROKER_RESTARTTIME) {
int rv;
DBG_INFO(NULL, "Restarting broker connection");
rv=_startBroker(o, xo);
if (rv<0) {
DBG_ERROR(NULL, "here (%d)", rv);
}
}
}
}
}
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* MQTT management functions
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
*/
void AQH_MqttLogServer_HandleMqttMsgs(AQH_OBJECT *o)
{
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo && xo->mqttEndpoint) {
AQH_MESSAGE *msg;
while( (msg=AQH_Endpoint_GetNextMsgIn(xo->mqttEndpoint)) ) {
AQH_Message_SetObject(msg, xo->mqttEndpoint);
_handleMsgFromMqtt(o, xo->mqttEndpoint, msg);
AQH_Message_free(msg);
}
}
}
void _handleMsgFromMqtt(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg)
{
uint8_t code;
/* exec IPC message */
code=AQH_MqttMessage_GetTypeAndFlags(msg);
switch(code & 0xf0) {
case (AQH_MQTTMSG_MSGTYPE_PUBLISH & 0xf0): AQH_MqttLogServer_HandlePublishMsg(o, ep, msg); break;
case (AQH_MQTTMSG_MSGTYPE_PINGRESP & 0xf0): _handleMqttMsgPingRsp(o, ep, msg); break;
default: break;
}
}
void _handleMqttMsgPingRsp(AQH_OBJECT *o, AQH_OBJECT *ep, const AQH_MESSAGE *msg)
{
DBG_INFO(NULL, "PING response received");
}
void AQH_MqttLogServer_CheckMqttConnection(AQH_OBJECT *o)
{
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
if (xo->mqttEndpoint) {
if (AQH_Object_GetFlags(xo->mqttEndpoint) & AQH_OBJECT_FLAGS_DELETE) {
DBG_INFO(NULL, "Deleting mqtt connection");
AQH_Object_Disable(xo->mqttEndpoint);
AQH_Object_free(xo->mqttEndpoint);
xo->mqttEndpoint=NULL;
}
}
if (xo->mqttEndpoint==NULL) {
time_t now;
now=time(NULL);
if (_diffInSeconds(now, xo->timestampMqttDown)>AQH_MQTTLOG_SERVER_MQTT_RESTARTTIME) {
int rv;
DBG_INFO(NULL, "Restarting MQTT connection");
rv=_startMqtt(o, xo);
if (rv<0) {
DBG_ERROR(NULL, "here (%d)", rv);
}
}
}
}
}
int AQH_MqttLogServer_SendPing(AQH_OBJECT *o)
{
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
AQH_MESSAGE *msgOut;
DBG_INFO(NULL, "Sending PING");
msgOut=AQH_MqttMessage_new(AQH_MQTTMSG_MSGTYPE_PINGREQ, 0, NULL);
AQH_Endpoint_AddMsgOut(xo->mqttEndpoint, msgOut);
return 0;
}
return GWEN_ERROR_INVALID;
}
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* device management functions
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
*/
AQHMQTT_DEVICE_LIST *AQH_MqttLogServer_GetAvailableDeviceList(const AQH_OBJECT *o)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo)
return xo->availableDeviceList;
}
return NULL;
}
void AQH_MqttLogServer_SetAvailableDeviceList(AQH_OBJECT *o, AQHMQTT_DEVICE_LIST *dl)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
AQHMQTT_Device_List_free(xo->availableDeviceList);
xo->availableDeviceList=dl;
}
}
}
void AQH_MqttLogServer_SetRegisteredDeviceList(AQH_OBJECT *o, AQHMQTT_DEVICE_LIST *dl)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
AQHMQTT_Device_List_free(xo->registeredDeviceList);
xo->registeredDeviceList=dl;
}
}
}
AQHMQTT_DEVICE *AQH_MqttLogServer_FindRegisteredDevice(AQH_OBJECT *o, const char *wantedDeviceId)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo && xo->registeredDeviceList) {
return AQHMQTT_Device_List_GetById(xo->registeredDeviceList, wantedDeviceId);
}
else {
DBG_ERROR(NULL, "No registered devices");
}
}
return NULL;
}
void AQH_MqttLogServer_DumpRegisteredDevices(const AQH_OBJECT *o)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo && xo->registeredDeviceList) {
AQHMQTT_DEVICE *device;
device=AQHMQTT_Device_List_First(xo->registeredDeviceList);
if (device) {
fprintf(stderr, "Registered Devices:\n");
while(device) {
const char *sDeviceName;
const char *sDeviceId;
sDeviceName=AQHMQTT_Device_GetName(device);
sDeviceId=AQHMQTT_Device_GetId(device);
fprintf(stderr, " %s (%s)\n", sDeviceId?sDeviceId:"<no id>", sDeviceName?sDeviceName:"<no name>");
device=AQHMQTT_Device_List_Next(device);
}
}
else {
fprintf(stderr, "No registered devices\n");
}
}
else {
fprintf(stderr, "No registered devices\n");
}
}
}
void AQH_MqttLogServer_ReloadDeviceFiles(AQH_OBJECT *o)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
AQHMQTT_DEVICE_LIST *deviceList;
DBG_INFO(NULL, "Loading devices description files");
deviceList=AQH_MqttLogServer_ReadDataDeviceFiles(o);
if (deviceList)
AQH_MqttLogServer_SetAvailableDeviceList(o, deviceList);
}
}
}
void AQH_MqttLogServer_LoadRuntimeDeviceFiles(AQH_OBJECT *o)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
AQHMQTT_DEVICE_LIST *deviceList;
DBG_INFO(NULL, "Loading registered devices from file \"%s\"", xo->deviceFile);
deviceList=AQH_MqttLogServer_ReadDeviceFile(o, xo->deviceFile);
if (deviceList)
AQH_MqttLogServer_SetRegisteredDeviceList(o, deviceList);
}
}
}
int AQH_MqttLogServer_SaveRuntimeDeviceFiles(AQH_OBJECT *o)
{
if (o) {
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
int rv;
rv=AQH_MqttLogServer_WriteDevicesFile(xo->registeredDeviceList, xo->deviceFile);
if (rv<0) {
DBG_INFO(NULL, "Error writing devices to \"%s\" (%d)", xo->deviceFile, rv);
return rv;
}
}
}
return 0;
}
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* signal handler
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
*/
int _handleSignal(AQH_OBJECT *o, uint32_t slotId, AQH_OBJECT *senderObject, GWEN_UNUSED int param1, void *param2)
{
AQH_MQTTLOG_SERVER *xo;
xo=GWEN_INHERIT_GETDATA(AQH_OBJECT, AQH_MQTTLOG_SERVER, o);
if (xo) {
switch(slotId) {
case AQH_MQTTLOG_SERVER_SLOT_BROKERCLOSED: return _handleBrokerDown(xo);
case AQH_MQTTLOG_SERVER_SLOT_MQTTCLOSED: return _handleMqttDown(xo);
default:
break;
}
}
return 0; /* not handled */
}
int _handleBrokerDown(AQH_MQTTLOG_SERVER *xo)
{
if (xo->brokerEndpoint) {
DBG_WARN(NULL, "Broker connection down");
AQH_Object_AddFlags(xo->brokerEndpoint, AQH_OBJECT_FLAGS_DELETE);
xo->timestampBrokerDown=time(NULL);
}
return 1;
}
int _handleMqttDown(AQH_MQTTLOG_SERVER *xo)
{
if (xo->mqttEndpoint) {
DBG_WARN(NULL, "MQTT connection down");
AQH_Object_AddFlags(xo->mqttEndpoint, AQH_OBJECT_FLAGS_DELETE);
xo->timestampMqttDown=time(NULL);
}
return 1;
}
/* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
* helper functions
* xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
*/
int _createPidFile(const char *pidFilename)
{
FILE *f;
int pidfd;
if (remove(pidFilename)==0) {
DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)");
}
#ifdef HAVE_SYS_STAT_H
pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
if (pidfd < 0) {
DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno));
return GWEN_ERROR_IO;
}
f = fdopen(pidfd, "w");
#else /* HAVE_STAT_H */
f=fopen(pidFilename,"w+");
#endif /* HAVE_STAT_H */
/* write pid */
#ifdef HAVE_GETPID
fprintf(f,"%d\n",getpid());
#else
fprintf(f,"-1\n");
#endif
if (fclose(f)) {
DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno));
return GWEN_ERROR_IO;
}
return 0;
}
int _diffInSeconds(time_t t1, time_t t0)
{
return t1-t0;
}
int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs)
{
int rv;
const GWEN_ARGS args[]= {
/* flags type name min max s long short_descr, long_descr */
{ A_ARG, A_CHAR, "loglevel", 0, 1, "L", "loglevel", I18S("Specify loglevel"), NULL},
{ A_ARG, A_CHAR, "cfgdir", 0, 1, "D", "cfgdir", I18S("Specify the configuration folder"), NULL},
{ A_ARG, A_CHAR, "charset", 0, 1, NULL, "charset", I18S("Specify the output character set"), NULL},
{ A_ARG, A_CHAR, "mqttAddress", 0, 1, "t", "mqttaddress", I18S("Address of MQTT server"), NULL},
{ A_ARG, A_INT, "mqttPort", 0, 1, "P", "mqttport", I18S("Port of MQTT server (default: 1883)"), NULL},
{ A_ARG, A_CHAR, "mqttClientId", 0, 1, NULL, "mqttclientid", I18S("MQTT client id"), NULL},
{ A_ARG, A_INT, "mqttKeepAlive", 0, 1, "P", "mqttkeepalive", I18S("MQTT keep-alive time"), NULL},
{ A_ARG, A_CHAR, "brokerAddress", 0, 1, "ba", "brokerddress", I18S("Broker address [127.0.0.1]"), NULL},
{ A_ARG, A_INT, "brokerPort", 0, 1, "bp", "brokerport", I18S("Broker port [1899]"), NULL},
{ A_ARG, A_CHAR, "brokerClientId", 0, 1, NULL, "brokerclientid", I18S("Broker client id"), NULL},
{ A_ARG, A_CHAR, "deviceFile", 0, 1, "d", "devicefile", I18S("Device file"), NULL},
{ A_ARG, A_CHAR, "pidfile", 0, 1, "p", "pidfile", I18S("PID file"), NULL},
{ A_ARG, A_INT, "timeout", 0, 1, "T", NULL, I18S("timeout in seconds [0]"), NULL},
{ A_END, A_INT, "help", 0, 0, "h", "help", I18S("Show this help screen"), NULL}
};
rv=GWEN_Args_Check(argc, argv, 1, 0, args, dbArgs);
if (rv==GWEN_ARGS_RESULT_ERROR) {
fprintf(stderr, "ERROR: Could not parse arguments main\n");
return GWEN_ERROR_INVALID;
}
else if (rv==GWEN_ARGS_RESULT_HELP) {
GWEN_BUFFER *ubuf;
ubuf=GWEN_Buffer_new(0, 1024, 0, 1);
GWEN_Buffer_AppendArgs(ubuf,
I18N("This is version %s.\nUsage: %s [OPTIONS]\n\nOptions:\n"),
AQHOME_VERSION_STRING,
argv[0]);
if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) {
fprintf(stderr, "ERROR: Could not create help string\n");
return 1;
}
GWEN_Buffer_AppendString(ubuf, "\n");
fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(ubuf));
GWEN_Buffer_free(ubuf);
return GWEN_ERROR_CLOSE;
}
return 0;
}