diff --git a/aqhome/http/endpoint_http.c b/aqhome/http/endpoint_http.c index 552c7c6..48e89e2 100644 --- a/aqhome/http/endpoint_http.c +++ b/aqhome/http/endpoint_http.c @@ -51,8 +51,9 @@ static int _distributeBufferInHeaderMode(GWEN_MSG_ENDPOINT *ep, const uint8_t *b static int _distributeBufferInBodyMode(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int bufferLen); static int _distributeBufferAsLine(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int bufferLen); static void _finishMessageAndStartNext(GWEN_MSG_ENDPOINT *ep); -static GWEN_DB_NODE *_parseHeader(char *bufferPtr); -static GWEN_DB_NODE *_parseCommand(const char *buffer); +static void _abortMessage(GWEN_MSG_ENDPOINT *ep); +static int _parseHeader(char *bufferPtr, GWEN_DB_NODE *db); +static int _parseCommand(const char *buffer, GWEN_DB_NODE *db); @@ -83,8 +84,8 @@ void _freeData(void *bp, void *p) xep=(AQH_ENDPOINT_HTTP*) p; - GWEN_DB_Group_free(xep->currentReadHeader); - GWEN_DB_Group_free(xep->currentReadCommand); + GWEN_DB_Group_free(xep->dbCurrentReadHeader); + GWEN_DB_Group_free(xep->dbCurrentReadCommand); GWEN_Buffer_free(xep->currentReadBuffer); GWEN_FREE_OBJECT(xep); @@ -148,6 +149,7 @@ void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSE "Endpoint %s: Error writing current message (%d), disconnecting", GWEN_MsgEndpoint_GetName(ep), rv); + _abortMessage(ep); GWEN_MsgEndpoint_Disconnect(ep); return; } @@ -161,6 +163,7 @@ void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSE "Endpoint %s: Error reading current message (%d), disconnecting", GWEN_MsgEndpoint_GetName(ep), rv); + _abortMessage(ep); GWEN_MsgEndpoint_Disconnect(ep); return; } @@ -277,7 +280,7 @@ int _distributeBufferContent(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, in return GWEN_ERROR_GENERIC; } if (rv<0) { - DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); + DBG_INFO(AQH_LOGDOMAIN, "here (%d)", rv); return rv; } else if (rv==0) { @@ -302,19 +305,21 @@ int _distributeBufferInCommandMode(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferP rv=_distributeBufferAsLine(ep, bufferPtr, bufferLen); if (rv<0) { AQH_ENDPOINT_HTTP *xep; - GWEN_DB_NODE *db; const char *s; - /* line complete, TODO: parse status/command line */ + /* line complete */ + DBG_INFO(AQH_LOGDOMAIN, "Command line complete"); rv=-rv; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_HTTP, ep); - db=_parseCommand(GWEN_Buffer_GetStart(xep->currentReadBuffer)); - if (db==NULL) { + xep->dbCurrentReadCommand=GWEN_DB_Group_new("command"); + + if (_parseCommand(GWEN_Buffer_GetStart(xep->currentReadBuffer), xep->dbCurrentReadCommand)<0) { DBG_INFO(AQH_LOGDOMAIN, "Error parsing command line [%s]", GWEN_Buffer_GetStart(xep->currentReadBuffer)); return GWEN_ERROR_BAD_DATA; } - s=GWEN_DB_GetCharValue(db, "protocol", 0, "HTTP/0.9"); + DBG_ERROR(AQH_LOGDOMAIN, "Command line received: %s", GWEN_Buffer_GetStart(xep->currentReadBuffer)); + s=GWEN_DB_GetCharValue(xep->dbCurrentReadCommand, "protocol", 0, "HTTP/0.9"); if (s && *s && strcasecmp(s, "HTTP/0.9")==0) { DBG_INFO(AQH_LOGDOMAIN, "HTTP 0.9, no header, message finished"); _finishMessageAndStartNext(ep); @@ -322,7 +327,10 @@ int _distributeBufferInCommandMode(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferP } DBG_INFO(AQH_LOGDOMAIN, "Command line complete, advancing to header read mode"); xep->readMode=AQH_EndpointHttpd_ReadMode_Headers; - xep->currentBodyPos=GWEN_Buffer_GetPos(xep->currentReadBuffer); + xep->currentHeaderPos=GWEN_Buffer_GetPos(xep->currentReadBuffer); + } + else { + DBG_INFO(AQH_LOGDOMAIN, "Line not yet finished (%d)", rv); } return rv; @@ -363,29 +371,27 @@ int _distributeBufferInHeaderMode(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPt /* line complete, TODO: parse status/command line */ rv=-rv; xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_HTTP, ep); - DBG_INFO(AQH_LOGDOMAIN, "Line complete, advancing to header read mode"); - xep->readMode=AQH_EndpointHttpd_ReadMode_Headers; lineLength=GWEN_Buffer_GetPos(xep->currentReadBuffer)-xep->lastLineStartPos; xep->lastLineStartPos=GWEN_Buffer_GetPos(xep->currentReadBuffer); if (lineLength==2) { char *copyOfHeader; - GWEN_DB_NODE *db; int contentLength; /* Empty line received, TODO: parse header */ DBG_INFO(AQH_LOGDOMAIN, "Empty header line received, end of header reached."); copyOfHeader=strdup(GWEN_Buffer_GetStart(xep->currentReadBuffer)+xep->currentHeaderPos); - db=_parseHeader(copyOfHeader); - if (db==NULL) { + xep->dbCurrentReadHeader=GWEN_DB_Group_new("header"); + if (_parseHeader(copyOfHeader, xep->dbCurrentReadHeader)<0) { DBG_INFO(AQH_LOGDOMAIN, "Error parsing HTTP header"); free(copyOfHeader); return GWEN_ERROR_BAD_DATA; } free(copyOfHeader); - contentLength=GWEN_DB_GetIntValue(db, "Content-Length", 0, -1); + contentLength=GWEN_DB_GetIntValue(xep->dbCurrentReadHeader, "Content-Length", 0, -1); if (contentLength==0 || contentLength==-1) { - _finishMessageAndStartNext(ep); + DBG_INFO(AQH_LOGDOMAIN, "Message has no body, done"); + _finishMessageAndStartNext(ep); } else { xep->currentBodyPos=GWEN_Buffer_GetPos(xep->currentReadBuffer); @@ -433,7 +439,7 @@ int _distributeBufferInBodyMode(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, -/* return negative number of bytes handled when LF encountered, positive value otherwise */ +/* return negative number of bytes handled when LF encountered (i.e. line done), positive value otherwise */ int _distributeBufferAsLine(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int bufferLen) { AQH_ENDPOINT_HTTP *xep; @@ -448,9 +454,10 @@ int _distributeBufferAsLine(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int s++; i++; } - GWEN_Buffer_AppendBytes(xep->currentReadBuffer, (const char*) bufferPtr, i+1); + i++; + GWEN_Buffer_AppendBytes(xep->currentReadBuffer, (const char*) bufferPtr, i); if (*s==10) { - DBG_INFO(AQH_LOGDOMAIN, "Received full line"); + DBG_DEBUG(AQH_LOGDOMAIN, "Received full line (added %d bytes from %d)", i, bufferLen); i=-i; } return i; @@ -462,10 +469,19 @@ void _finishMessageAndStartNext(GWEN_MSG_ENDPOINT *ep) { AQH_ENDPOINT_HTTP *xep; GWEN_MSG *msg; + GWEN_DB_NODE *dbParsedData; DBG_INFO(AQH_LOGDOMAIN, "Message completely received."); xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_HTTP, ep); + msg=GWEN_Msg_fromBytes((const uint8_t*)GWEN_Buffer_GetStart(xep->currentReadBuffer), GWEN_Buffer_GetUsedBytes(xep->currentReadBuffer)); + dbParsedData=GWEN_DB_Group_new("parsedData"); + if (xep->dbCurrentReadCommand) + GWEN_DB_AddGroup(dbParsedData, xep->dbCurrentReadCommand); + if (xep->dbCurrentReadHeader) + GWEN_DB_AddGroup(dbParsedData, xep->dbCurrentReadHeader); + GWEN_Msg_SetDbParsedInfo(msg, dbParsedData); + GWEN_MsgEndpoint_AddReceivedMessage(ep, msg); GWEN_Buffer_Reset(xep->currentReadBuffer); @@ -473,6 +489,8 @@ void _finishMessageAndStartNext(GWEN_MSG_ENDPOINT *ep) xep->currentBodyPos=0; xep->currentBodySize=0; xep->lastLineStartPos=0; + xep->dbCurrentReadCommand=NULL; + xep->dbCurrentReadHeader=NULL; if (xep->flags & AQH_ENDPOINT_HTTP_FLAGS_PASSIVE) xep->readMode=AQH_EndpointHttpd_ReadMode_Command; @@ -482,12 +500,33 @@ void _finishMessageAndStartNext(GWEN_MSG_ENDPOINT *ep) -GWEN_DB_NODE *_parseHeader(char *bufferPtr) +void _abortMessage(GWEN_MSG_ENDPOINT *ep) { - GWEN_DB_NODE *db; - char *p; + AQH_ENDPOINT_HTTP *xep; + + DBG_INFO(AQH_LOGDOMAIN, "Message completely received."); + xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, AQH_ENDPOINT_HTTP, ep); - db=GWEN_DB_Group_new("header"); + GWEN_Buffer_Reset(xep->currentReadBuffer); + xep->currentHeaderPos=0; + xep->currentBodyPos=0; + xep->currentBodySize=0; + xep->lastLineStartPos=0; + if (xep->dbCurrentReadCommand) + GWEN_DB_Group_free(xep->dbCurrentReadCommand); + xep->dbCurrentReadCommand=NULL; + if (xep->dbCurrentReadHeader) + GWEN_DB_Group_free(xep->dbCurrentReadHeader); + xep->dbCurrentReadHeader=NULL; + + xep->readMode=AQH_EndpointHttpd_ReadMode_Aborted; +} + + + +int _parseHeader(char *bufferPtr, GWEN_DB_NODE *db) +{ + char *p; /* resolve line continuations */ p=bufferPtr; @@ -508,12 +547,15 @@ GWEN_DB_NODE *_parseHeader(char *bufferPtr) char *pVarBegin; char *pVarEnd; - /* skip blanks */ - pNext=strchr(p, 10); + /* find end of line */ + pNext=strchr(p, 13); if (pNext) { *pNext=0; pNext++; } + /* skip LF */ + if (*p==10) + p++; while (*p && (*p==32 || *p==9)) p++; if (*p) { @@ -523,35 +565,37 @@ GWEN_DB_NODE *_parseHeader(char *bufferPtr) pVarEnd=p; if (*p!=':') { DBG_INFO(AQH_LOGDOMAIN, "No separator after variable name in received header"); - GWEN_DB_Group_free(db); - return NULL; + return GWEN_ERROR_BAD_DATA; } *pVarEnd=0; p++; while (*p && (*p==32 || *p==9)) p++; - if (*p) - GWEN_DB_SetCharValue(db, GWEN_PATH_FLAGS_CREATE_VAR, pVarBegin, p); + if (*p) { + DBG_ERROR(AQH_LOGDOMAIN, "Setting header variable: [%s] = [%s]", pVarBegin, p); + GWEN_DB_SetCharValue(db, GWEN_PATH_FLAGS_CREATE_VAR, pVarBegin, p); + } } p=pNext; } - return db; + return 0; } -GWEN_DB_NODE *_parseCommand(const char *buffer) +int _parseCommand(const char *buffer, GWEN_DB_NODE *db) { - GWEN_DB_NODE *db; char *tmp; char *p; char *s; - db=GWEN_DB_Group_new("command"); - tmp=strdup(buffer); + /* get end of line (marked by CR/LF) */ + s=strchr(tmp, 13); + if (s) + *s=0; s=tmp; /* read command */ @@ -559,8 +603,7 @@ GWEN_DB_NODE *_parseCommand(const char *buffer) if (!p) { DBG_ERROR(AQH_LOGDOMAIN, "Bad format of HTTP request (%s)", buffer); free(tmp); - GWEN_DB_Group_free(db); - return NULL; + return GWEN_ERROR_BAD_DATA; } *p=0; p++; @@ -573,8 +616,7 @@ GWEN_DB_NODE *_parseCommand(const char *buffer) if (!p) { DBG_ERROR(AQH_LOGDOMAIN, "Bad format of HTTP request (%s)", buffer); free(tmp); - GWEN_DB_Group_free(db); - return NULL; + return GWEN_ERROR_BAD_DATA; } *p=0; p++; @@ -586,15 +628,14 @@ GWEN_DB_NODE *_parseCommand(const char *buffer) /* no protocol information follows, so we assume HTTP/0.9 */ DBG_ERROR(AQH_LOGDOMAIN, "Bad request (not in HTTP>=1.0)"); free(tmp); - GWEN_DB_Group_free(db); - return NULL; + return GWEN_ERROR_BAD_DATA; } else { GWEN_DB_SetCharValue(db, GWEN_DB_FLAGS_OVERWRITE_VARS, "protocol", s); } free(tmp); - return db; + return 0; } diff --git a/aqhome/http/endpoint_http_p.h b/aqhome/http/endpoint_http_p.h index ccbd9d3..1091704 100644 --- a/aqhome/http/endpoint_http_p.h +++ b/aqhome/http/endpoint_http_p.h @@ -19,6 +19,7 @@ enum { + AQH_EndpointHttpd_ReadMode_Aborted=-1, AQH_EndpointHttpd_ReadMode_Command=0, AQH_EndpointHttpd_ReadMode_Status, AQH_EndpointHttpd_ReadMode_Headers, @@ -37,8 +38,8 @@ struct AQH_ENDPOINT_HTTP { GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN checkSocketsFn; GWEN_BUFFER *currentReadBuffer; - GWEN_DB_NODE *currentReadCommand; - GWEN_DB_NODE *currentReadHeader; + GWEN_DB_NODE *dbCurrentReadCommand; + GWEN_DB_NODE *dbCurrentReadHeader; int currentHeaderPos; int currentBodyPos; int currentBodySize; diff --git a/aqhome/libtest.c b/aqhome/libtest.c index ffe3fef..7f9c520 100644 --- a/aqhome/libtest.c +++ b/aqhome/libtest.c @@ -14,6 +14,7 @@ #include "aqhome/mqtt/msg_mqtt_pubresponse.h" #include "aqhome/mqtt/msg_mqtt_subscribe.h" #include "aqhome/mqtt/msg_mqtt_suback.h" +#include "aqhome/http/endpoint_http.h" #include "aqhome/hexfile/hexfile.h" #include "aqhome/hexfile/flashrecord.h" @@ -23,6 +24,8 @@ #include #include #include +#include +#include #include #include @@ -244,6 +247,73 @@ GWEN_MSG *_awaitPacket2(GWEN_MSG_ENDPOINT *epClient, uint8_t expectedPacketType) +GWEN_MSG_ENDPOINT *_acceptHttpConnection(GWEN_UNUSED GWEN_MSG_ENDPOINT *ep, + GWEN_SOCKET *sk, + GWEN_UNUSED const GWEN_INETADDRESS *addr, + GWEN_UNUSED void *data) +{ + GWEN_MSG_ENDPOINT *epIncoming; + + DBG_INFO(GWEN_LOGDOMAIN, "Incoming connection"); + //epIncoming=GWEN_IpcEndpoint_CreateIpcTcpServiceForSocket(sk, NULL, 1); + epIncoming=GWEN_MsgEndpoint_new("HTTP-Service", 0); + GWEN_MsgEndpoint_SetSocket(epIncoming, sk); + GWEN_MsgIoEndpoint_Extend(epIncoming); + AQH_HttpEndpoint_Extend(epIncoming, AQH_ENDPOINT_HTTP_FLAGS_PASSIVE); + return epIncoming; +} + + + +int testHttpDaemon() +{ + GWEN_MSG_ENDPOINT *epServer; + int loop; + + AQH_Init(); + epServer=GWEN_TcpdEndpoint_new("127.0.0.1", 55556, NULL, 1); + GWEN_TcpdEndpoint_SetAcceptFn(epServer, _acceptHttpConnection, NULL); + + for (loop=0;; loop++) { + GWEN_MSG_ENDPOINT *ep; + + DBG_INFO(GWEN_LOGDOMAIN, "Loop %d:", loop); + GWEN_MsgEndpoint_IoLoop(epServer, 2000); /* 2000 ms */ + ep=GWEN_MsgEndpoint_Tree2_GetFirstChild(epServer); + while(ep) { + GWEN_MSG *msg; + + DBG_INFO(GWEN_LOGDOMAIN, "- Checking endpoint"); + while( (msg=GWEN_MsgEndpoint_TakeFirstReceivedMessage(ep)) ) { + GWEN_DB_NODE *db; + GWEN_BUFFER *buf; + GWEN_MSG *msgOut; + + DBG_INFO(GWEN_LOGDOMAIN, " - received msg"); + db=GWEN_Msg_GetDbParsedInfo(msg); + if (db) + GWEN_DB_Dump(db, 2); + + buf=GWEN_Buffer_new(0, 256, 0, 1); + GWEN_Buffer_AppendString(buf, "HTTP/1.1 200 Okay\r\n"); + GWEN_Buffer_AppendString(buf, "Connection: close\r\n"); + GWEN_Buffer_AppendString(buf, "Content-length: 23\r\n"); + GWEN_Buffer_AppendString(buf, "\r\n"); + GWEN_Buffer_AppendString(buf, "This is a test string\r\n"); + msgOut=GWEN_Msg_fromBytes((const uint8_t*)GWEN_Buffer_GetStart(buf), GWEN_Buffer_GetUsedBytes(buf)); + GWEN_Buffer_free(buf); + GWEN_MsgEndpoint_AddSendMessage(ep, msgOut); + GWEN_Msg_free(msg); + } /* while */ + ep=GWEN_MsgEndpoint_Tree2_GetNext(ep); + } + GWEN_MsgEndpoint_RemoveUnconnectedAndEmptyChildren(epServer); + } + return 0; +} + + + int testHexfile(int argc, char **argv) { const char *inFilename; @@ -350,7 +420,8 @@ int main(int argc, char **argv) //return testHexfile(argc, argv); //return testFlashRecords(argc, argv); //return testMqttConnection2(); - return testMqttSubscribe2(argc, argv); + //return testMqttSubscribe2(argc, argv); + return testHttpDaemon(); }