Files
aqhomecontrol/aqhome/msgendpointtcp.c
Martin Preuss caa85edfc6 More work on node/pc interface.
- added AQH_MSG_ENDPOINT
- added AQH_MsgEndpointLog
- added AQH_MsgEndpointTcp
- added AQH_MsgEndpointTty
- added AQH_MsgEndpointMgr
2023-02-20 23:45:10 +01:00

223 lines
4.7 KiB
C

/****************************************************************************
* This file is part of the project AqHome.
* AqHome (c) by 2023 Martin Preuss, all rights reserved.
*
* The license for this file can be found in the file COPYING which you
* should have received along with this file.
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "aqhome/msgendpointtcp.h"
#include <gwenhywfar/list.h>
#include <gwenhywfar/inherit.h>
#include <gwenhywfar/debug.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#define AQH_MSG_ENDPOINTTCP_BACKLOG 10
static int _setupListeningSocket(const char *host, int port);
static int _setSocketNonBlocking(int fd);
static int _getReadFd(AQH_MSG_ENDPOINT *ep);
static int _getWriteFd(AQH_MSG_ENDPOINT *ep);
static int _handleReadable(AQH_MSG_ENDPOINT *ep, AQH_MSG_ENDPOINT_MGR *emgr);
static int _handleWritable(AQH_MSG_ENDPOINT *ep, AQH_MSG_ENDPOINT_MGR *emgr);
AQH_MSG_ENDPOINT *AQH_MsgEndpointTcp_new(const char *host, int port)
{
int fd;
AQH_MSG_ENDPOINT *ep;
fd=_setupListeningSocket(host, port);
if (fd<0) {
DBG_INFO(NULL, "here");
return NULL;
}
ep=AQH_MsgEndpoint_new(fd, AQH_MSG_ENDPOINT_ENDPOINTGROUP_NET);
AQH_MsgEndpoint_SetAcceptedEndpointGroups(ep, AQH_MSG_ENDPOINT_ENDPOINTGROUP_BUS);
AQH_MsgEndpoint_AddFlags(ep, AQH_MSG_ENDPOINT_FLAGS_NOMESSAGES);
AQH_MsgEndpoint_SetHandleReadableFn(ep, _handleReadable);
AQH_MsgEndpoint_SetHandleWritableFn(ep, _handleWritable);
AQH_MsgEndpoint_SetGetReadFdFn(ep, _getReadFd);
AQH_MsgEndpoint_SetGetWriteFdFn(ep, _getWriteFd);
return ep;
}
int _setupListeningSocket(const char *host, int port)
{
struct sockaddr_in addr;
int rv;
int sk;
int i;
memset(&addr, 0, sizeof(addr));
addr.sin_port=htons(port);
addr.sin_family=AF_INET;
if (inet_aton(host, &(addr.sin_addr))==0) {
/* bad address */
}
sk=socket(AF_INET, SOCK_STREAM, 0);
if (sk<0) {
/* socket error */
DBG_ERROR(NULL, "socket(): %s", strerror(errno));
}
i=1;
rv=setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
if (rv<0) {
DBG_ERROR(NULL, "setsockopt(): %s", strerror(errno));
close(sk);
return GWEN_ERROR_IO;
}
rv=_setSocketNonBlocking(sk);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
close(sk);
return rv;
}
rv=bind(sk, (struct sockaddr*) &addr, sizeof(addr));
if (rv<0) {
DBG_ERROR(NULL, "bind(): %s", strerror(errno));
close(sk);
return GWEN_ERROR_IO;
}
rv=listen(sk, AQH_MSG_ENDPOINTTCP_BACKLOG);
if (rv<0) {
DBG_ERROR(NULL, "listen(): %s", strerror(errno));
close(sk);
return GWEN_ERROR_IO;
}
return sk;
}
int _setSocketNonBlocking(int fd)
{
int prevFlags;
int newFlags;
/* get current socket flags */
prevFlags=fcntl(fd, F_GETFL);
if (prevFlags==-1) {
DBG_ERROR(NULL, "fcntl(): %s", strerror(errno));
return GWEN_ERROR_IO;
}
/* set nonblocking/blocking */
newFlags=prevFlags|O_NONBLOCK;
if (-1==fcntl(fd, F_SETFL, newFlags)) {
DBG_ERROR(NULL, "fcntl(): %s", strerror(errno));
return GWEN_ERROR_IO;
}
return 0;
}
int _getReadFd(AQH_MSG_ENDPOINT *ep)
{
return AQH_MsgEndpoint_GetFd(ep);
}
int _getWriteFd(AQH_MSG_ENDPOINT *ep)
{
return -1;
}
int _handleReadable(AQH_MSG_ENDPOINT *ep, AQH_MSG_ENDPOINT_MGR *emgr)
{
int fd;
int newSock;
int rv;
struct sockaddr_in clientAddr;
socklen_t len;
AQH_MSG_ENDPOINT *newEp;
fd=AQH_MsgEndpoint_GetFd(ep);
memset(&clientAddr, 0, sizeof(clientAddr));
do {
len=sizeof(clientAddr);
rv=accept(fd, (struct sockaddr*) &clientAddr, &len);
} while(rv<0 && errno==EINTR);
if (rv<0) {
if (errno==EAGAIN || errno==EWOULDBLOCK)
return GWEN_ERROR_TRY_AGAIN;
DBG_ERROR(NULL, "Error on accept(): %s (%d)", strerror(errno), errno);
return GWEN_ERROR_IO;
}
newSock=rv;
rv=_setSocketNonBlocking(newSock);
if (rv<0) {
DBG_INFO(NULL, "here (%d)", rv);
close(newSock);
return rv;
}
newEp=AQH_MsgEndpoint_new(newSock, AQH_MsgEndpoint_GetGroupId(ep));
AQH_MsgEndpoint_SetFlags(newEp, AQH_MsgEndpoint_GetFlags(ep));
AQH_MsgEndpoint_SubFlags(newEp, AQH_MSG_ENDPOINT_FLAGS_NOMESSAGES);
AQH_MsgEndpoint_SetAcceptedMsgGroups(newEp, AQH_MsgEndpoint_GetAcceptedMsgGroups(ep));
AQH_MsgEndpoint_SetAcceptedEndpointGroups(newEp, AQH_MsgEndpoint_GetAcceptedEndpointGroups(ep));
AQH_MsgEndpointMgr_AddEndpoint(emgr, newEp);
return 0;
}
int _handleWritable(AQH_MSG_ENDPOINT *ep, AQH_MSG_ENDPOINT_MGR *emgr)
{
/* should not get called */
return GWEN_ERROR_INVALID;
}