From d1c21322b899329f9d22d3df73695e694e9a1a5e Mon Sep 17 00:00:00 2001 From: Martin Preuss Date: Sat, 18 Mar 2023 23:25:21 +0100 Subject: [PATCH] More work on IPC code, added aqhomed daemon. --- 0BUILD | 15 + apps/0BUILD | 9 + apps/aqhomed/0BUILD | 62 ++++ apps/aqhomed/main.c | 463 +++++++++++++++++++++++++++++ aqhome/0BUILD | 4 + aqhome/ipc/0BUILD | 84 ++++++ aqhome/ipc/endpoint_node_ipc.c | 64 ++++ aqhome/ipc/endpoint_node_ipc.h | 24 ++ aqhome/ipc/endpoint_node_ipc_tcp.c | 63 ++++ aqhome/ipc/endpoint_node_ipc_tcp.h | 22 ++ aqhome/ipc/msg_forward.c | 82 +++++ aqhome/ipc/msg_forward.h | 37 +++ aqhome/ipc/msg_ipc.c | 16 + aqhome/ipc/msg_ipc.h | 30 ++ aqhome/libtest.c | 18 +- aqhome/msg/0BUILD | 4 +- aqhome/msg/endpoint_node.c | 15 +- aqhome/msg/endpoint_node.h | 1 + aqhome/msg/endpointmgr.c | 31 +- aqhome/msg/endpointmgr.h | 9 +- aqhome/msgmanager.c | 81 +++++ aqhome/msgmanager.h | 32 ++ aqhome/nodes/nodedb.c | 22 +- 23 files changed, 1127 insertions(+), 61 deletions(-) create mode 100644 apps/0BUILD create mode 100644 apps/aqhomed/0BUILD create mode 100644 apps/aqhomed/main.c create mode 100644 aqhome/ipc/0BUILD create mode 100644 aqhome/ipc/endpoint_node_ipc.c create mode 100644 aqhome/ipc/endpoint_node_ipc.h create mode 100644 aqhome/ipc/endpoint_node_ipc_tcp.c create mode 100644 aqhome/ipc/endpoint_node_ipc_tcp.h create mode 100644 aqhome/ipc/msg_forward.c create mode 100644 aqhome/ipc/msg_forward.h create mode 100644 aqhome/ipc/msg_ipc.c create mode 100644 aqhome/ipc/msg_ipc.h create mode 100644 aqhome/msgmanager.c create mode 100644 aqhome/msgmanager.h diff --git a/0BUILD b/0BUILD index dcc4aca..b0586d9 100644 --- a/0BUILD +++ b/0BUILD @@ -27,6 +27,7 @@ $(project_vmajor).$(project_vminor).$(project_vpatchlevel).$(project_vbuild)$(project_vtag) + @@ -38,6 +39,7 @@ $(option_prefix)/bin $(option_prefix)/etc $(option_prefix)/bin + $(option_prefix)/sbin $(option_prefix)/lib $(option_prefix)/include $(option_prefix)/share @@ -49,6 +51,18 @@ $(datadir)/$(package) + + signal.h + sys/stat.h + sys/types.h + + + + + getpid + + + @@ -79,6 +93,7 @@ avr aqhome + apps diff --git a/apps/0BUILD b/apps/0BUILD new file mode 100644 index 0000000..cb2e91d --- /dev/null +++ b/apps/0BUILD @@ -0,0 +1,9 @@ + + + + + + aqhomed + + + diff --git a/apps/aqhomed/0BUILD b/apps/aqhomed/0BUILD new file mode 100644 index 0000000..9b5043f --- /dev/null +++ b/apps/aqhomed/0BUILD @@ -0,0 +1,62 @@ + + + + + + + + $(gwenhywfar_cflags) + -I$(topsrcdir) + -I$(topbuilddir) + + + + --include=$(builddir) + --include=$(srcdir) + + + $(visibility_cflags) + + + + + + + + + + + + + + + + + + + + + $(local/typefiles) + main.c + + + + aqhome + + + + $(gwenhywfar_libs) + + + + + + + + + + + + + + diff --git a/apps/aqhomed/main.c b/apps/aqhomed/main.c new file mode 100644 index 0000000..ada1521 --- /dev/null +++ b/apps/aqhomed/main.c @@ -0,0 +1,463 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#ifdef HAVE_SIGNAL_H +# include +#endif + +#ifdef HAVE_SYS_TYPES_H +# include +#endif + +#ifdef HAVE_SYS_STAT_H +# include +#endif + +#include +#include +#include +#include +#include + + +#define I18N(msg) msg +#define I18S(msg) msg + + + + +static int _serve(GWEN_DB_NODE *dbArgs); +static GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs); +static int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs); +static int _createPidFile(const char *pidFilename); + + +#ifdef HAVE_SIGNAL_H +static int _setSignalHandlers(void); +static int _setupSigAction(struct sigaction *sa, int sig); +static void _signalHandler(int s); +static struct sigaction saINT,saTERM, saHUP, saTSTP, saCONT; +#endif + +static int stopService=0; + + + +int main(int argc, char **argv) +{ + GWEN_DB_NODE *dbArgs; + int rv; + GWEN_GUI *gui; + const char *s; + + rv=GWEN_Init(); + if (rv) { + fprintf(stderr, "ERROR: Unable to init Gwen.\n"); + return 2; + } + + GWEN_Logger_Open(0, "aqhomed", 0, GWEN_LoggerType_Console, GWEN_LoggerFacility_User); + GWEN_Logger_SetLevel(0, GWEN_LoggerLevel_Warning); + GWEN_Logger_SetLevel(0, GWEN_LoggerLevel_Info); + + rv=AQH_Init(); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return 2; + } + + dbArgs=GWEN_DB_Group_new("arguments"); + rv=_readArgs(argc, argv, dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return 2; + } + else if (rv==1) { + DBG_INFO(NULL, "Help printed, done"); + return 0; + } + + gui=GWEN_Gui_CGui_new(); + s=GWEN_DB_GetCharValue(dbArgs, "charset", 0, NULL); + if (s && *s) + GWEN_Gui_SetCharSet(gui, s); + GWEN_Gui_SetGui(gui); + + rv=_serve(dbArgs); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return 2; + } + + return 0; +} + + + +int _serve(GWEN_DB_NODE *dbArgs) +{ + const char *pidFile; + GWEN_MSG_ENDPOINT_MGR *emgr; + int rv; + + rv=_setSignalHandlers(); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + + pidFile=GWEN_DB_GetCharValue(dbArgs, "pidfile", 0, "aqhomed.pid"); + if (pidFile && *pidFile) { + rv=_createPidFile(pidFile); + if (rv<0) { + DBG_INFO(NULL, "here (%d)", rv); + return rv; + } + } + + emgr=_setupService(dbArgs); + if (emgr==NULL) { + DBG_INFO(NULL, "Error setting up service"); + return GWEN_ERROR_GENERIC; + } + + while(!stopService) { + DBG_DEBUG(NULL, "Next loop"); + AQH_MsgManager_LoopOnce(emgr); + } + + if (pidFile && *pidFile) + remove(pidFile); + + return 0; +} + + + +GWEN_MSG_ENDPOINT_MGR *_setupService(GWEN_DB_NODE *dbArgs) +{ + GWEN_MSG_ENDPOINT_MGR *emgr; + int nodeAddress; + const char *devicePath; + const char *logFile; + const char *tcpAddress; + int tcpPort; + + nodeAddress=GWEN_DB_GetIntValue(dbArgs, "nodeAddress", 0, 240); + logFile=GWEN_DB_GetCharValue(dbArgs, "logfile", 0, NULL); + devicePath=GWEN_DB_GetCharValue(dbArgs, "device", 0, "/dev/ttyUSB0"); + tcpAddress=GWEN_DB_GetCharValue(dbArgs, "tcpAddress", 0, NULL); + tcpPort=GWEN_DB_GetIntValue(dbArgs, "tcpPort", 0, 45454); + + emgr=AQH_MsgManager_new(nodeAddress & 0xff); + + if (devicePath && *devicePath) { + GWEN_MSG_ENDPOINT *epTty; + + epTty=AQH_TtyNodeEndpoint_new(devicePath, AQH_MSGMGR_ENDPOINTGROUP_NODE); + if (epTty==NULL) { + DBG_ERROR(NULL, "Error creating endpoint TTY"); + GWEN_MsgEndpointMgr_free(emgr); + return NULL; + } + GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTty); + } + else { + DBG_ERROR(NULL, "Missing device path"); + GWEN_MsgEndpointMgr_free(emgr); + return NULL; + } + + if (logFile && *logFile) { + GWEN_MSG_ENDPOINT *epLog; + + epLog=AQH_LogEndpoint_new(logFile, AQH_MSGMGR_ENDPOINTGROUP_NODE); + if (epLog==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint LOG"); + GWEN_MsgEndpointMgr_free(emgr); + return NULL; + } + GWEN_MsgEndpointMgr_AddEndpoint(emgr, epLog); + } + + if (tcpAddress && *tcpAddress && tcpPort) { + GWEN_MSG_ENDPOINT *epTcp; + + epTcp=AQH_TcpIpcNodeEndpoint_new(NULL, tcpAddress, tcpPort, AQH_MSGMGR_ENDPOINTGROUP_IPC); + if (epTcp==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint TCP"); + GWEN_MsgEndpointMgr_free(emgr); + return NULL; + } + GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp); + } + + return emgr; +} + + + + +int _readArgs(int argc, char **argv, GWEN_DB_NODE *dbArgs) +{ + int rv; + const GWEN_ARGS args[]= { + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "cfgdir", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "D", /* short option */ + "cfgdir", /* long option */ + I18S("Specify the configuration folder"), + I18S("Specify the configuration folder") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "charset", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + 0, /* short option */ + "charset", /* long option */ + I18S("Specify the output character set"), /* short description */ + I18S("Specify the output character set") /* long description */ + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "device", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "d", /* short option */ + "device", /* long option */ + I18S("Specify the device to communicate with (e.g. /dev/ttyUSB0)"), + I18S("Specify the device to communicate with (e.g. /dev/ttyUSB0)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "nodeAddress", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "n", /* short option */ + "node", /* long option */ + I18S("Specify the node address for the AqHome node adaptor (default 240)"), + I18S("Specify the node address for the AqHome node adaptor (default 240)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "logFile", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "l", /* short option */ + "logfile", /* long option */ + I18S("Specify a logfile to log received messages to"), + I18S("Specify a logfile to log received messages to") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "tcpAddress", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "t", /* short option */ + "tcpaddress", /* long option */ + I18S("Specify the TCP address to listen on (disabled if missing)"), + I18S("Specify the TCP address to listen on (disabled if missing)") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Int, /* type */ + "tcpPort", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "P", /* short option */ + "tcpport", /* long option */ + I18S("Specify the TCP port to listen on"), + I18S("Specify the TCP port to listen on") + }, + { + GWEN_ARGS_FLAGS_HAS_ARGUMENT, /* flags */ + GWEN_ArgsType_Char, /* type */ + "pidfile", /* name */ + 0, /* minnum */ + 1, /* maxnum */ + "p", /* short option */ + "pidfile", /* long option */ + I18S("Specify the PID file"), + I18S("Specify the PID file") + }, + { + GWEN_ARGS_FLAGS_HELP | GWEN_ARGS_FLAGS_LAST, /* flags */ + GWEN_ArgsType_Int, /* type */ + "help", /* name */ + 0, /* minnum */ + 0, /* maxnum */ + "h", /* short option */ + "help", + I18S("Show this help screen."), + I18S("Show this help screen.") + } + }; + + rv=GWEN_Args_Check(argc, argv, 1, 0, args, dbArgs); + if (rv==GWEN_ARGS_RESULT_ERROR) { + fprintf(stderr, "ERROR: Could not parse arguments main\n"); + return GWEN_ERROR_INVALID; + } + else if (rv==GWEN_ARGS_RESULT_HELP) { + GWEN_BUFFER *ubuf; + + ubuf=GWEN_Buffer_new(0, 1024, 0, 1); + GWEN_Buffer_AppendArgs(ubuf, + I18N("This is version %s.\nUsage: %s [OPTIONS]\n\nOptions:\n"), + AQHOME_VERSION_STRING, + argv[0]); + if (GWEN_Args_Usage(args, ubuf, GWEN_ArgsOutType_Txt)) { + fprintf(stderr, "ERROR: Could not create help string\n"); + return 1; + } + GWEN_Buffer_AppendString(ubuf, "\n"); + + fprintf(stdout, "%s\n", GWEN_Buffer_GetStart(ubuf)); + GWEN_Buffer_free(ubuf); + return GWEN_ERROR_CLOSE; + } + return 0; +} + + + +int _setSignalHandlers(void) +{ +#ifdef HAVE_SIGNAL_H + int rv; + + rv=_setupSigAction(&saINT, SIGINT); + if (rv) + return rv; + + rv=_setupSigAction(&saTERM, SIGTERM); + if (rv) + return rv; + + rv=_setupSigAction(&saHUP, SIGHUP); + if (rv) + return rv; + +# ifdef SIGTSTP + rv=_setupSigAction(&saTSTP, SIGTSTP); + if (rv) + return rv; +# endif + +# ifdef SIGCONT + rv=_setupSigAction(&saCONT, SIGCONT); + if (rv) + return rv; +# endif +#endif + return 0; +} + + + +int _setupSigAction(struct sigaction *sa, int sig) +{ + sa->sa_handler=_signalHandler; + sigemptyset(&sa->sa_mask); + sa->sa_flags=0; + if (sigaction(sig, sa, 0)) { + DBG_ERROR(NULL, "Could not setup signal handler for signal %d", sig); + return GWEN_ERROR_IO; + } + + return 0; +} + + + +void _signalHandler(int s) +{ + switch(s) { + case SIGINT: + case SIGTERM: + case SIGHUP: + DBG_WARN(0, "Received signal %d, stopping service in next loop.",s); + stopService=1; + break; + default: + DBG_WARN(0, "Unknown signal %d",s); + break; + } +} + + + +int _createPidFile(const char *pidFilename) +{ + FILE *f; + int pidfd; + + if (remove(pidFilename)==0) { + DBG_ERROR(0, "Old PID file existed, removed. (Unclean shutdown?)"); + } + +#ifdef HAVE_SYS_STAT_H + pidfd = open(pidFilename, O_EXCL|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + if (pidfd < 0) { + DBG_ERROR(NULL, "Could not create PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + + f = fdopen(pidfd, "w"); +#else /* HAVE_STAT_H */ + f=fopen(pidFilename,"w+"); +#endif /* HAVE_STAT_H */ + + /* write pid */ +#ifdef HAVE_GETPID + fprintf(f,"%d\n",getpid()); +#else + fprintf(f,"-1\n"); +#endif + if (fclose(f)) { + DBG_ERROR(0, "Could not close PID file \"%s\" (%s), aborting.", pidFilename, strerror(errno)); + return GWEN_ERROR_IO; + } + return 0; +} + + + + + + + diff --git a/aqhome/0BUILD b/aqhome/0BUILD index 2196539..08d6eaf 100644 --- a/aqhome/0BUILD +++ b/aqhome/0BUILD @@ -46,22 +46,26 @@ api.h aqhome.h + msgmanager.h $(local/typefiles) aqhome.c + msgmanager.c msg + ipc nodes aqhmsg + aqhipc aqhnodes diff --git a/aqhome/ipc/0BUILD b/aqhome/ipc/0BUILD new file mode 100644 index 0000000..5b8e0db --- /dev/null +++ b/aqhome/ipc/0BUILD @@ -0,0 +1,84 @@ + + + + + + + + $(gwenhywfar_cflags) + -I$(topsrcdir) + -I$(topbuilddir) + + + + --include=$(builddir) + --include=$(srcdir) + + + + + + $(visibility_cflags) + + + + --api=AQHOME_API + + + + + + + + + + + + + + + + + + $(local/built_headers_pub) + + + + + endpoint_node_ipc.h + endpoint_node_ipc_tcp.h + msg_ipc.h + msg_forward.h + + + + + + + + + $(local/typefiles) + + endpoint_node_ipc.c + endpoint_node_ipc_tcp.c + msg_ipc.c + msg_forward.c + + + + + + + + + + + + + + + + + + + diff --git a/aqhome/ipc/endpoint_node_ipc.c b/aqhome/ipc/endpoint_node_ipc.c new file mode 100644 index 0000000..778d311 --- /dev/null +++ b/aqhome/ipc/endpoint_node_ipc.c @@ -0,0 +1,64 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/ipc/endpoint_node_ipc.h" +#include "aqhome/msg/endpoint_node.h" +#include "aqhome/msg/msg_node.h" +#include "aqhome/ipc/msg_forward.h" + +#include +#include +#include +#include + + +#define AQH_MSG_ENDPOINT_NODEIPC_NAME "nodeipc" + + + +static void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m); + + + + + +GWEN_MSG_ENDPOINT *AQH_IpcNodeEndpoint_new(const char *name, int groupId) +{ + int fd; + GWEN_MSG_ENDPOINT *ep; + + ep=GWEN_IpcEndpoint_new(name?name:AQH_MSG_ENDPOINT_NODEIPC_NAME, groupId); + AQH_NodeEndpoint_Extend(ep); + GWEN_MsgEndpoint_SetProcessOutMsgFn(ep, _processOutMessage); + + AQH_NodeEndpoint_SetAcceptedMsgGroups(ep, AQH_MSG_TYPEGROUP_ALL); + + return ep; +} + + + +void _processOutMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *nodeMsg) +{ + GWEN_MSG *ipcMsg; + + ipcMsg=AQH_ForwardMsg_new(AQH_MSGTYPE_IPC_FORWARD, GWEN_Msg_GetConstBuffer(nodeMsg), GWEN_Msg_GetBytesInBuffer(nodeMsg)); + GWEN_MsgEndpoint_AddSendMessage(ep, ipcMsg); + GWEN_Msg_free(nodeMsg); +} + + + + + + diff --git a/aqhome/ipc/endpoint_node_ipc.h b/aqhome/ipc/endpoint_node_ipc.h new file mode 100644 index 0000000..f473c57 --- /dev/null +++ b/aqhome/ipc/endpoint_node_ipc.h @@ -0,0 +1,24 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_ENDPOINT_NODE_IPC_H +#define AQH_ENDPOINT_NODE_IPC_H + + +#include + +#include + + + +AQHOME_API GWEN_MSG_ENDPOINT *AQH_IpcNodeEndpoint_new(const char *name, int groupId); + +AQHOME_API int AQH_IpcNodeEndpointMgr_LoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr); + + +#endif diff --git a/aqhome/ipc/endpoint_node_ipc_tcp.c b/aqhome/ipc/endpoint_node_ipc_tcp.c new file mode 100644 index 0000000..84db0c8 --- /dev/null +++ b/aqhome/ipc/endpoint_node_ipc_tcp.c @@ -0,0 +1,63 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/ipc/endpoint_node_ipc_tcp.h" +#include "aqhome/ipc/endpoint_node_ipc.h" +#include "aqhome/msg/endpoint_node.h" +#include "aqhome/msg/msg_node.h" +#include "aqhome/ipc/msg_forward.h" + +#include +#include +#include +#include + + +#define AQH_MSG_ENDPOINT_NODEIPCTCP_NAME "nodeipctcp" + + + +static GWEN_MSG_ENDPOINT *_createChild(GWEN_MSG_ENDPOINT *ep); + + + + + +GWEN_MSG_ENDPOINT *AQH_TcpIpcNodeEndpoint_new(const char *name, const char *host, int port, int groupId) +{ + int fd; + GWEN_MSG_ENDPOINT *ep; + + ep=GWEN_TcpIpcEndpoint_new(name?name:AQH_MSG_ENDPOINT_NODEIPCTCP_NAME, host, port, groupId); + GWEN_MsgEndpoint_AddFlags(ep, AQH_MSGEP_NODE_FLAGS_NOMESSAGES); + GWEN_MsgEndpoint_SetCreateChildFn(ep, _createChild); + + return ep; +} + + + +GWEN_MSG_ENDPOINT *_createChild(GWEN_MSG_ENDPOINT *ep) +{ + DBG_INFO(AQH_LOGDOMAIN, "Creating child endpoint for %s", GWEN_MsgEndpoint_GetName(ep)); + return AQH_IpcNodeEndpoint_new(NULL, GWEN_MsgEndpoint_GetGroupId(ep)); +} + + + + + + + + + diff --git a/aqhome/ipc/endpoint_node_ipc_tcp.h b/aqhome/ipc/endpoint_node_ipc_tcp.h new file mode 100644 index 0000000..5f38ec0 --- /dev/null +++ b/aqhome/ipc/endpoint_node_ipc_tcp.h @@ -0,0 +1,22 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_ENDPOINT_NODE_IPC_TCP_H +#define AQH_ENDPOINT_NODE_IPC_TCP_H + + +#include + +#include + + + +AQHOME_API GWEN_MSG_ENDPOINT *AQH_TcpIpcNodeEndpoint_new(const char *name, const char *host, int port, int groupId); + + +#endif diff --git a/aqhome/ipc/msg_forward.c b/aqhome/ipc/msg_forward.c new file mode 100644 index 0000000..fb079d3 --- /dev/null +++ b/aqhome/ipc/msg_forward.c @@ -0,0 +1,82 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + + + + +GWEN_MSG *AQH_ForwardMsg_new(uint16_t code, const uint8_t *ptr, uint32_t len) +{ + return GWEN_IpcMsg_new(AQH_IPC_PROTOCOL_ID, AQH_IPC_PROTOCOL_VERSION, code, len, ptr); +} + + + +const uint8_t *AQH_ForwardMsg_GetMsgPtr(const GWEN_MSG *msg) +{ + if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSG_FORWARD_MINSIZE) + return GWEN_Msg_GetConstBuffer(msg)+AQH_MSG_OFFS_FORWARD_MSG; + return NULL; +} + + + +uint32_t AQH_ForwardMsg_GetMsgLen(const GWEN_MSG *msg) +{ + uint32_t len; + + len=GWEN_Msg_GetBytesInBuffer(msg); + if (len>AQH_MSG_FORWARD_MINSIZE) { + return len-AQH_MSG_OFFS_FORWARD_MSG; + } + return 0; +} + + + +void AQH_ForwardMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText) +{ + if (GWEN_Msg_GetBytesInBuffer(msg)>=AQH_MSG_FORWARD_MINSIZE) { + const uint8_t *ptr; + uint32_t len; + + ptr=AQH_ForwardMsg_GetMsgPtr(msg); + len=AQH_ForwardMsg_GetMsgLen(msg); + + GWEN_Buffer_AppendArgs(dbuf, "FORWARD (code=%d, protocol=%d, protocol version=%d)\n", + GWEN_IpcMsg_GetCode(msg), + GWEN_IpcMsg_GetProtoId(msg), + GWEN_IpcMsg_GetProtoVersion(msg)); + if (ptr && len) { + GWEN_Text_DumpString2Buffer(ptr, len, dbuf, 2); + GWEN_Buffer_AppendByte(dbuf, '\n'); + } + } +} + + + + + + + diff --git a/aqhome/ipc/msg_forward.h b/aqhome/ipc/msg_forward.h new file mode 100644 index 0000000..ac315b9 --- /dev/null +++ b/aqhome/ipc/msg_forward.h @@ -0,0 +1,37 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_FORWARD_H +#define AQH_MSG_FORWARD_H + + +#include +#include +#include + +#include +#include + + + +#define AQH_MSG_OFFS_FORWARD_MSG (GWEN_MSGIPC_OFFS_PAYLOAD+0) + +#define AQH_MSG_FORWARD_MINSIZE (AQH_MSG_OFFS_FORWARD_MSG+AQH_MSG_OFFS_ALL_DATA_BEGIN) + + + +AQHOME_API GWEN_MSG *AQH_ForwardMsg_new(uint16_t code, const uint8_t *ptr, uint32_t len); +AQHOME_API const uint8_t *AQH_ForwardMsg_GetMsgPtr(const GWEN_MSG *msg); +AQHOME_API uint32_t AQH_ForwardMsg_GetMsgLen(const GWEN_MSG *msg); +AQHOME_API void AQH_ForwardMsg_DumpToBuffer(const GWEN_MSG *msg, GWEN_BUFFER *dbuf, const char *sText); + + +#endif + + + diff --git a/aqhome/ipc/msg_ipc.c b/aqhome/ipc/msg_ipc.c new file mode 100644 index 0000000..41f3a6d --- /dev/null +++ b/aqhome/ipc/msg_ipc.c @@ -0,0 +1,16 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include + + diff --git a/aqhome/ipc/msg_ipc.h b/aqhome/ipc/msg_ipc.h new file mode 100644 index 0000000..28c1d21 --- /dev/null +++ b/aqhome/ipc/msg_ipc.h @@ -0,0 +1,30 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_IPC_H +#define AQH_MSG_IPC_H + + +#include + +#include +#include + + +#define AQH_IPC_PROTOCOL_ID 1 +#define AQH_IPC_PROTOCOL_VERSION 1 + + +#define AQH_MSGTYPE_IPC_FORWARD 0x100 + + + +#endif + + + diff --git a/aqhome/libtest.c b/aqhome/libtest.c index 63ef4d7..320ff96 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -8,6 +8,8 @@ #include "aqhome/msg/endpoint_node.h" #include "aqhome/msg/endpoint_log.h" #include "aqhome/msg/endpoint_tty.h" +#include "aqhome/ipc/endpoint_node_ipc_tcp.h" +#include "aqhome/msgmanager.h" #include "aqhome/aqhome.h" @@ -97,29 +99,37 @@ int testEndpoints() GWEN_MSG_ENDPOINT_MGR *emgr; GWEN_MSG_ENDPOINT *epTty; GWEN_MSG_ENDPOINT *epLog; + GWEN_MSG_ENDPOINT *epTcp; rv=AQH_Init(); if (rv<0) { } - emgr=AQH_MsgEndpointMgr_new(0xc0); - epTty=AQH_TtyNodeEndpoint_new("/dev/ttyUSB0", AQH_MSG_ENDPOINTGROUP_NODE); + emgr=AQH_MsgManager_new(0xc0); + epTty=AQH_TtyNodeEndpoint_new("/dev/ttyUSB0", AQH_MSGMGR_ENDPOINTGROUP_NODE); if (epTty==NULL) { DBG_ERROR(NULL, "Error creating endpoint TTY"); return 2; } GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTty); - epLog=AQH_LogEndpoint_new("endpoints.log", AQH_MSG_ENDPOINTGROUP_NODE); + epLog=AQH_LogEndpoint_new("endpoints.log", AQH_MSGMGR_ENDPOINTGROUP_NODE); if (epLog==NULL) { DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint LOG"); return 2; } GWEN_MsgEndpointMgr_AddEndpoint(emgr, epLog); + epTcp=AQH_TcpIpcNodeEndpoint_new(NULL, "127.0.0.1", 45454, AQH_MSGMGR_ENDPOINTGROUP_IPC); + if (epTcp==NULL) { + DBG_ERROR(AQH_LOGDOMAIN, "Error creating endpoint TCP"); + return 2; + } + GWEN_MsgEndpointMgr_AddEndpoint(emgr, epTcp); + for (;;) { DBG_DEBUG(AQH_LOGDOMAIN, "Next loop"); - AQH_MsgEndpointMgr_LoopOnce(emgr); + AQH_MsgManager_LoopOnce(emgr); } return 0; } diff --git a/aqhome/msg/0BUILD b/aqhome/msg/0BUILD index 92198df..cda53c1 100644 --- a/aqhome/msg/0BUILD +++ b/aqhome/msg/0BUILD @@ -39,12 +39,12 @@ - + $(local/built_headers_pub) - + endpointmgr.h endpoint_node.h endpoint_log.h diff --git a/aqhome/msg/endpoint_node.c b/aqhome/msg/endpoint_node.c index 0506072..0384f15 100644 --- a/aqhome/msg/endpoint_node.c +++ b/aqhome/msg/endpoint_node.c @@ -39,18 +39,25 @@ static void GWENHYWFAR_CB _freeData(void *bp, void *p); GWEN_MSG_ENDPOINT *AQH_NodeEndpoint_new(const char *name, int groupId) { GWEN_MSG_ENDPOINT *ep; - AQH_MSG_ENDPOINT_NODE *xep; - int fd; ep=GWEN_MsgEndpoint_new(name?name:AQH_MSG_ENDPOINT_NODE_NAME, groupId); - GWEN_NEW_OBJECT(AQH_MSG_ENDPOINT_NODE, xep); - GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_NODE, ep, xep, _freeData); + AQH_NodeEndpoint_Extend(ep); return ep; } +void AQH_NodeEndpoint_Extend(GWEN_MSG_ENDPOINT *ep) +{ + AQH_MSG_ENDPOINT_NODE *xep; + + GWEN_NEW_OBJECT(AQH_MSG_ENDPOINT_NODE, xep); + GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, AQH_MSG_ENDPOINT_NODE, ep, xep, _freeData); +} + + + void _freeData(void *bp, void *p) { AQH_MSG_ENDPOINT_NODE *xep; diff --git a/aqhome/msg/endpoint_node.h b/aqhome/msg/endpoint_node.h index ad6b721..3c8fdda 100644 --- a/aqhome/msg/endpoint_node.h +++ b/aqhome/msg/endpoint_node.h @@ -20,6 +20,7 @@ AQHOME_API GWEN_MSG_ENDPOINT *AQH_NodeEndpoint_new(const char *name, int groupId); +AQHOME_API void AQH_NodeEndpoint_Extend(GWEN_MSG_ENDPOINT *ep); AQHOME_API uint32_t AQH_NodeEndpoint_GetAcceptedMsgGroups(const GWEN_MSG_ENDPOINT *ep); AQHOME_API void AQH_NodeEndpoint_SetAcceptedMsgGroups(GWEN_MSG_ENDPOINT *ep, uint32_t f); diff --git a/aqhome/msg/endpointmgr.c b/aqhome/msg/endpointmgr.c index 546e06d..972a81a 100644 --- a/aqhome/msg/endpointmgr.c +++ b/aqhome/msg/endpointmgr.c @@ -27,10 +27,8 @@ GWEN_INHERIT(GWEN_MSG_ENDPOINT_MGR, AQH_MSG_ENDPOINT_MGR); static void GWENHYWFAR_CB _freeData(void *bp, void *p); -static void _msgLoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr); static void _handleEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep); static void _handleNodeEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep); -static void _handleIpcEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep); static void _distributeMsgFromNodeEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *srcEp, const GWEN_MSG *msg); @@ -64,19 +62,19 @@ void _freeData(void *bp, void *p) -int AQH_MsgEndpointMgr_LoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr) +int AQH_MsgEndpointMgr_LoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr, int groupId) { int rv; rv=GWEN_MsgEndpointMgr_IoLoopOnce(emgr); - _msgLoopOnce(emgr); + AQH_MsgEndpointMgr_LoopOnceOverNodeEndpoints(emgr, groupId); GWEN_MsgEndpointMgr_RunAllEndpoints(emgr); return rv; } -void _msgLoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr) +void AQH_MsgEndpointMgr_LoopOnceOverNodeEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr, int groupId) { GWEN_MSG_ENDPOINT_LIST *endpointList; @@ -88,7 +86,8 @@ void _msgLoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr) ep=GWEN_MsgEndpoint_List_First(endpointList); while(ep) { DBG_DEBUG(AQH_LOGDOMAIN, "- endpoint(%s)", GWEN_MsgEndpoint_GetName(ep)); - _handleEndpoint(emgr, ep); + if (GWEN_MsgEndpoint_GetGroupId(ep)==groupId) + _handleNodeEndpoint(emgr, ep); ep=GWEN_MsgEndpoint_List_Next(ep); } } @@ -96,17 +95,6 @@ void _msgLoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr) -void _handleEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep) -{ - switch(GWEN_MsgEndpoint_GetGroupId(ep)) { - case AQH_MSG_ENDPOINTGROUP_NODE: _handleNodeEndpoint(emgr, ep); break; - case AQH_MSG_ENDPOINTGROUP_IPC: _handleIpcEndpoint(emgr, ep); break; - default: break; - } -} - - - void _handleNodeEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep) { GWEN_MSG *msg; @@ -122,13 +110,6 @@ void _handleNodeEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep) -void _handleIpcEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep) -{ - /* TODO: handle IPC messages */ -} - - - void _distributeMsgFromNodeEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *srcEp, const GWEN_MSG *msg) { GWEN_MSG_ENDPOINT_LIST *endpointList; @@ -159,7 +140,7 @@ void _distributeMsgFromNodeEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOI ) { /* endpoint accepts this message */ DBG_DEBUG(AQH_LOGDOMAIN, " - endpoint %s accepts message", GWEN_MsgEndpoint_GetName(ep)); - GWEN_MsgEndpoint_AddSendMessage(ep, GWEN_Msg_dup(msg)); + GWEN_MsgEndpoint_ProcessOutMessage(ep, GWEN_Msg_dup(msg)); } else { DBG_DEBUG(AQH_LOGDOMAIN, " - endpoint %s does not accept message", GWEN_MsgEndpoint_GetName(ep)); diff --git a/aqhome/msg/endpointmgr.h b/aqhome/msg/endpointmgr.h index c2a8d7d..92c6ca7 100644 --- a/aqhome/msg/endpointmgr.h +++ b/aqhome/msg/endpointmgr.h @@ -16,13 +16,10 @@ -#define AQH_MSG_ENDPOINTGROUP_NODE 1 -#define AQH_MSG_ENDPOINTGROUP_IPC 2 - - - AQHOME_API GWEN_MSG_ENDPOINT_MGR *AQH_MsgEndpointMgr_new(uint8_t busAddr); -AQHOME_API int AQH_MsgEndpointMgr_LoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr); +AQHOME_API int AQH_MsgEndpointMgr_LoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr, int groupId); + +AQHOME_API void AQH_MsgEndpointMgr_LoopOnceOverNodeEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr, int groupId); diff --git a/aqhome/msgmanager.c b/aqhome/msgmanager.c new file mode 100644 index 0000000..4f758c1 --- /dev/null +++ b/aqhome/msgmanager.c @@ -0,0 +1,81 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include +#endif + + +#include "aqhome/msgmanager.h" +#include "aqhome/msg/endpointmgr.h" + +#include +#include + + + +void _loopOnceOverIpcEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr); +void _handleIpcEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep); + + + + + +GWEN_MSG_ENDPOINT_MGR *AQH_MsgManager_new(uint8_t busAddr) +{ + return AQH_MsgEndpointMgr_new(busAddr); +} + + + +int AQH_MsgManager_LoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr) +{ + int rv; + + rv=GWEN_MsgEndpointMgr_IoLoopOnce(emgr); + AQH_MsgEndpointMgr_LoopOnceOverNodeEndpoints(emgr, AQH_MSGMGR_ENDPOINTGROUP_NODE); + _loopOnceOverIpcEndpoints(emgr); + GWEN_MsgEndpointMgr_RunAllEndpoints(emgr); + return rv; +} + + + +void _loopOnceOverIpcEndpoints(GWEN_MSG_ENDPOINT_MGR *emgr) +{ + GWEN_MSG_ENDPOINT_LIST *endpointList; + + DBG_DEBUG(AQH_LOGDOMAIN, "Handle endpoint messages"); + endpointList=GWEN_MsgEndpointMgr_GetEndpointList(emgr); + if (endpointList) { + GWEN_MSG_ENDPOINT *ep; + + ep=GWEN_MsgEndpoint_List_First(endpointList); + while(ep) { + DBG_DEBUG(AQH_LOGDOMAIN, "- endpoint(%s)", GWEN_MsgEndpoint_GetName(ep)); + if (GWEN_MsgEndpoint_GetGroupId(ep)==AQH_MSGMGR_ENDPOINTGROUP_IPC) + _handleIpcEndpoint(emgr, ep); + ep=GWEN_MsgEndpoint_List_Next(ep); + } + } +} + + + +void _handleIpcEndpoint(GWEN_MSG_ENDPOINT_MGR *emgr, GWEN_MSG_ENDPOINT *ep) +{ + GWEN_MSG *msg; + + while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep)) ) { + /* exec IPC message */ + GWEN_Msg_free(msg); + } +} + + + diff --git a/aqhome/msgmanager.h b/aqhome/msgmanager.h new file mode 100644 index 0000000..3d89a58 --- /dev/null +++ b/aqhome/msgmanager.h @@ -0,0 +1,32 @@ +/**************************************************************************** + * This file is part of the project AqHome. + * AqHome (c) by 2023 Martin Preuss, all rights reserved. + * + * The license for this file can be found in the file COPYING which you + * should have received along with this file. + ****************************************************************************/ + +#ifndef AQH_MSG_MANAGER_H +#define AQH_MSG_MANAGER_H + + +#include + +#include + + + +#define AQH_MSGMGR_ENDPOINTGROUP_NODE 1 +#define AQH_MSGMGR_ENDPOINTGROUP_IPC 2 + + + +AQHOME_API GWEN_MSG_ENDPOINT_MGR *AQH_MsgManager_new(uint8_t busAddr); +AQHOME_API int AQH_MsgManager_LoopOnce(GWEN_MSG_ENDPOINT_MGR *emgr); + + + + + + +#endif diff --git a/aqhome/nodes/nodedb.c b/aqhome/nodes/nodedb.c index b1b3b95..0be6d72 100644 --- a/aqhome/nodes/nodedb.c +++ b/aqhome/nodes/nodedb.c @@ -80,32 +80,14 @@ int AQH_NodeDb_AddNodeInfo(AQH_NODE_DB *ndb, AQH_NODE_INFO *ni) AQH_NODE_INFO *AQH_NodeDb_GetNodeInfoByBusAddr(AQH_NODE_DB *ndb, uint8_t busAddr) { - AQH_NODE_INFO *ni; - - ni=AQH_NodeInfo_List_First(ndb->nodeList); - while(ni) { - if (busAddr==0 || busAddr==AQH_NodeInfo_GetBusAddress(ni)) - return ni; - ni=AQH_NodeInfo_List_Next(ni); - } - - return NULL; + return AQH_NodeInfo_List_GetByBusAddress(ndb->nodeList, busAddr); } AQH_NODE_INFO *AQH_NodeDb_GetNodeInfoByUid(AQH_NODE_DB *ndb, uint32_t uid) { - AQH_NODE_INFO *ni; - - ni=AQH_NodeInfo_List_First(ndb->nodeList); - while(ni) { - if (uid==0 || uid==AQH_NodeInfo_GetUid(ni)) - return ni; - ni=AQH_NodeInfo_List_Next(ni); - } - - return NULL; + return AQH_NodeInfo_List_GetByUid(ndb->nodeList, uid); }