aqhomed, aqhome: added MQTT endpoint, publish value changes via MQTT.

This commit is contained in:
Martin Preuss
2023-03-29 16:46:32 +02:00
parent bf4451f3f4
commit 5d5446ad90
8 changed files with 538 additions and 24 deletions

View File

@@ -17,6 +17,7 @@
#include <aqhome/msg/endpoint_log.h>
#include <aqhome/msg/endpoint_tty.h>
#include <aqhome/ipc/endpoint_node_ipc_tcp.h>
#include <aqhome/mqtt/endpoint_mqttc.h>
#include <gwenhywfar/gwenhywfar.h>
#include <gwenhywfar/args.h>
@@ -52,6 +53,12 @@
static int _serve(GWEN_DB_NODE *dbArgs);
static GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs);
static int _setupTty(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs);
static int _setupLog(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs);
static int _setupIpc(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs);
static int _setupMqtt(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs);
static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs);
static int _createPidFile(const char *pidFilename);
@@ -67,6 +74,7 @@ static int stopService=0;
int main(int argc, char **argv)
{
GWEN_DB_NODE *dbArgs;
@@ -162,47 +170,96 @@ GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs)
{
GWEN_MSG_ENDPOINT_MGR *emgr;
int nodeAddress;
const char *devicePath;
const char *logFile;
const char *tcpAddress;
int tcpPort;
int rv;
nodeAddress=GWEN_DB_GetIntValue(dbArgs, "nodeAddress", 0, 240);
logFile=GWEN_DB_GetCharValue(dbArgs, "logfile", 0, NULL);
devicePath=GWEN_DB_GetCharValue(dbArgs, "device", 0, "/dev/ttyUSB0");
tcpAddress=GWEN_DB_GetCharValue(dbArgs, "tcpAddress", 0, NULL);
tcpPort=GWEN_DB_GetIntValue(dbArgs, "tcpPort", 0, 45454);
emgr=AQH_MsgManager_new(nodeAddress & 0xff);
rv=_setupTty(emgr, dbArgs);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
GWEN_MsgEndpointMgr_free(emgr);
return NULL;
}
rv=_setupLog(emgr, dbArgs);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
GWEN_MsgEndpointMgr_free(emgr);
return NULL;
}
rv=_setupIpc(emgr, dbArgs);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
GWEN_MsgEndpointMgr_free(emgr);
return NULL;
}
rv=_setupMqtt(emgr, dbArgs);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
GWEN_MsgEndpointMgr_free(emgr);
return NULL;
}
return emgr;
}
int _setupTty(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs)
{
const char *devicePath;
devicePath=GWEN_DB_GetCharValue(dbArgs, "device", 0, "/dev/ttyUSB0");
if (devicePath && *devicePath) {
GWEN_MSG_ENDPOINT *epTty;
epTty=AQH_TtyNodeEndpoint_new(devicePath, AQH_MSGMGR_ENDPOINTGROUP_NODE);
if (epTty==NULL) {
DBG_ERROR(NULL, "Error creating endpoint TTY");
GWEN_MsgEndpointMgr_free(emgr);
return NULL;
return GWEN_ERROR_GENERIC;
}
GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTty);
}
else {
DBG_ERROR(NULL, "Missing device path");
GWEN_MsgEndpointMgr_free(emgr);
return NULL;
return GWEN_ERROR_GENERIC;
}
return 0;
}
int _setupLog(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs)
{
const char *logFile;
logFile=GWEN_DB_GetCharValue(dbArgs, "logfile", 0, NULL);
if (logFile && *logFile) {
GWEN_MSG_ENDPOINT *epLog;
epLog=AQH_LogEndpoint_new(logFile, AQH_MSGMGR_ENDPOINTGROUP_NODE);
if (epLog==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint LOG");
GWEN_MsgEndpointMgr_free(emgr);
return NULL;
return GWEN_ERROR_GENERIC;
}
GWEN_MsgEndpointMgr_AddEndpoint(emgr, epLog);
}
return 0;
}
int _setupIpc(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs)
{
const char *tcpAddress;
int tcpPort;
tcpAddress=GWEN_DB_GetCharValue(dbArgs, "tcpAddress", 0, NULL);
tcpPort=GWEN_DB_GetIntValue(dbArgs, "tcpPort", 0, 45454);
if (tcpAddress && *tcpAddress && tcpPort) {
GWEN_MSG_ENDPOINT *epTcp;
@@ -210,17 +267,49 @@ GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs)
epTcp=AQH_TcpdIpcNodeEndpoint_new(tcpAddress, tcpPort, NULL, AQH_MSGMGR_ENDPOINTGROUP_IPC);
if (epTcp==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint TCP");
GWEN_MsgEndpointMgr_free(emgr);
return NULL;
return GWEN_ERROR_GENERIC;
}
GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp);
}
return emgr;
return 0;
}
int _setupMqtt(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_DB_NODE *dbArgs)
{
const char *mqttAddress;
int mqttPort;
const char *mqttClientId;
const char *mqttTopicPrefix;
int mqttKeepAlive;
mqttAddress=GWEN_DB_GetCharValue(dbArgs, "mqttAddress", 0, NULL);
mqttPort=GWEN_DB_GetIntValue(dbArgs, "mqttPort", 0, 1883);
mqttClientId=GWEN_DB_GetCharValue(dbArgs, "mqttClientId", 0, "aqhomed");
mqttTopicPrefix=GWEN_DB_GetCharValue(dbArgs, "mqttTopicPrefix", 0, "aqhome/sensors");
mqttKeepAlive=GWEN_DB_GetIntValue(dbArgs, "mqttKeepAlive", 0, 600);
if (mqttAddress && *mqttAddress && mqttPort) {
GWEN_MSG_ENDPOINT *epMqtt;
epMqtt=AQH_MqttClientEndpoint_new(mqttAddress, mqttPort, NULL, AQH_MSGMGR_ENDPOINTGROUP_NODE|AQH_MSGMGR_ENDPOINTGROUP_MQTT);
if (epMqtt==NULL) {
DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint TCP");
return GWEN_ERROR_GENERIC;
}
if (mqttClientId && *mqttClientId)
AQH_MqttClientEndpoint_SetClientId(epMqtt, mqttClientId);
if (mqttTopicPrefix && *mqttTopicPrefix)
AQH_MqttClientEndpoint_SetTopicPrefix(epMqtt, mqttTopicPrefix);
AQH_MqttClientEndpoint_SetKeepAliveTime(epMqtt, mqttKeepAlive);
GWEN_MsgEndpointMgr_AddEndpoint(emgr, epMqtt);
}
return 0;
}
int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs)
{
@@ -303,6 +392,61 @@ int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs)
I18S("Specify the TCP port to listen on"),
I18S("Specify the TCP port to listen on")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"mqttAddress", /* name */
0, /* minnum */
1, /* maxnum */
"ma", /* short option */
"mqttaddress", /* long option */
I18S("Specify the address of the MQTT server to connect to (disabled if missing)"),
I18S("Specify the address of the MQTT server to connect to (disabled if missing)")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Int, /* type */
"mqttPort", /* name */
0, /* minnum */
1, /* maxnum */
"mp", /* short option */
"mqttport", /* long option */
I18S("Specify the port of the MQTT server (default: 1883)"),
I18S("Specify the port of the MQTT server (default: 1883)")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"mqttClientId", /* name */
0, /* minnum */
1, /* maxnum */
NULL, /* short option */
"mqttclientid", /* long option */
I18S("Specify client id for the MQTT server (default: \"aqhomed\")"),
I18S("Specify client id for the MQTT server (default: \"aqhomed\")")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */
"mqttTopicPrefix", /* name */
0, /* minnum */
1, /* maxnum */
"mt", /* short option */
"mqtttopicprefix", /* long option */
I18S("Specify prefix of MQTT topics when publishing (defaults to \"aqhome/sensors\")"),
I18S("Specify prefix of MQTT topics when publishing (defaults to \"aqhome/sensors\")")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Int, /* type */
"mqttKeepAlive", /* name */
0, /* minnum */
1, /* maxnum */
"mk", /* short option */
"mqttkeepalive", /* long option */
I18S("Specify keepalive time in seconds (defaults: 600)"),
I18S("Specify keepalive time in seconds (defaults: 600)")
},
{
GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */
GWEN_ArgsType_Char, /* type */