/**************************************************************************** * This file is part of the project AqHome. * AqHome (c) by 2023 Martin Preuss, all rights reserved. * * The license for this file can be found in the file COPYING which you * should have received along with this file. ****************************************************************************/ #ifdef HAVE_CONFIG_H # include #endif #include "aqhome/msgendpointtcp.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #define AQH_MSG_ENDPOINTTCP_BACKLOG 10 static int _setupListeningSocket(const char *host, int port); static int _setSocketNonBlocking(int fd); static int _getReadFd(AQH_MSG_ENDPOINT *ep); static int _getWriteFd(AQH_MSG_ENDPOINT *ep); static int _handleReadable(AQH_MSG_ENDPOINT *ep, AQH_MSG_ENDPOINT_MGR *emgr); static int _handleWritable(AQH_MSG_ENDPOINT *ep, AQH_MSG_ENDPOINT_MGR *emgr); AQH_MSG_ENDPOINT *AQH_MsgEndpointTcp_new(const char *host, int port) { int fd; AQH_MSG_ENDPOINT *ep; fd=_setupListeningSocket(host, port); if (fd<0) { DBG_INFO(NULL, "here"); return NULL; } ep=AQH_MsgEndpoint_new(fd, AQH_MSG_ENDPOINT_ENDPOINTGROUP_NET); AQH_MsgEndpoint_SetAcceptedEndpointGroups(ep, AQH_MSG_ENDPOINT_ENDPOINTGROUP_BUS); AQH_MsgEndpoint_AddFlags(ep, AQH_MSG_ENDPOINT_FLAGS_NOMESSAGES); AQH_MsgEndpoint_SetHandleReadableFn(ep, _handleReadable); AQH_MsgEndpoint_SetHandleWritableFn(ep, _handleWritable); AQH_MsgEndpoint_SetGetReadFdFn(ep, _getReadFd); AQH_MsgEndpoint_SetGetWriteFdFn(ep, _getWriteFd); return ep; } int _setupListeningSocket(const char *host, int port) { struct sockaddr_in addr; int rv; int sk; int i; memset(&addr, 0, sizeof(addr)); addr.sin_port=htons(port); addr.sin_family=AF_INET; if (inet_aton(host, &(addr.sin_addr))==0) { /* bad address */ } sk=socket(AF_INET, SOCK_STREAM, 0); if (sk<0) { /* socket error */ DBG_ERROR(NULL, "socket(): %s", strerror(errno)); } i=1; rv=setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)); if (rv<0) { DBG_ERROR(NULL, "setsockopt(): %s", strerror(errno)); close(sk); return GWEN_ERROR_IO; } rv=_setSocketNonBlocking(sk); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); close(sk); return rv; } rv=bind(sk, (struct sockaddr*) &addr, sizeof(addr)); if (rv<0) { DBG_ERROR(NULL, "bind(): %s", strerror(errno)); close(sk); return GWEN_ERROR_IO; } rv=listen(sk, AQH_MSG_ENDPOINTTCP_BACKLOG); if (rv<0) { DBG_ERROR(NULL, "listen(): %s", strerror(errno)); close(sk); return GWEN_ERROR_IO; } return sk; } int _setSocketNonBlocking(int fd) { int prevFlags; int newFlags; /* get current socket flags */ prevFlags=fcntl(fd, F_GETFL); if (prevFlags==-1) { DBG_ERROR(NULL, "fcntl(): %s", strerror(errno)); return GWEN_ERROR_IO; } /* set nonblocking/blocking */ newFlags=prevFlags|O_NONBLOCK; if (-1==fcntl(fd, F_SETFL, newFlags)) { DBG_ERROR(NULL, "fcntl(): %s", strerror(errno)); return GWEN_ERROR_IO; } return 0; } int _getReadFd(AQH_MSG_ENDPOINT *ep) { return AQH_MsgEndpoint_GetFd(ep); } int _getWriteFd(AQH_MSG_ENDPOINT *ep) { return -1; } int _handleReadable(AQH_MSG_ENDPOINT *ep, AQH_MSG_ENDPOINT_MGR *emgr) { int fd; int newSock; int rv; struct sockaddr_in clientAddr; socklen_t len; AQH_MSG_ENDPOINT *newEp; fd=AQH_MsgEndpoint_GetFd(ep); memset(&clientAddr, 0, sizeof(clientAddr)); do { len=sizeof(clientAddr); rv=accept(fd, (struct sockaddr*) &clientAddr, &len); } while(rv<0 && errno==EINTR); if (rv<0) { if (errno==EAGAIN || errno==EWOULDBLOCK) return GWEN_ERROR_TRY_AGAIN; DBG_ERROR(NULL, "Error on accept(): %s (%d)", strerror(errno), errno); return GWEN_ERROR_IO; } newSock=rv; rv=_setSocketNonBlocking(newSock); if (rv<0) { DBG_INFO(NULL, "here (%d)", rv); close(newSock); return rv; } newEp=AQH_MsgEndpoint_new(newSock, AQH_MsgEndpoint_GetGroupId(ep)); AQH_MsgEndpoint_SetFlags(newEp, AQH_MsgEndpoint_GetFlags(ep)); AQH_MsgEndpoint_SubFlags(newEp, AQH_MSG_ENDPOINT_FLAGS_NOMESSAGES); AQH_MsgEndpoint_SetAcceptedMsgGroups(newEp, AQH_MsgEndpoint_GetAcceptedMsgGroups(ep)); AQH_MsgEndpoint_SetAcceptedEndpointGroups(newEp, AQH_MsgEndpoint_GetAcceptedEndpointGroups(ep)); AQH_MsgEndpointMgr_AddEndpoint(emgr, newEp); return 0; } int _handleWritable(AQH_MSG_ENDPOINT *ep, AQH_MSG_ENDPOINT_MGR *emgr) { /* should not get called */ return GWEN_ERROR_INVALID; }