Difference between revisions of "Io.c"
m |
m |
||
Line 27: | Line 27: | ||
char *pBuf; // Buffer to read/write data packets | char *pBuf; // Buffer to read/write data packets | ||
const char *term = "\r\n\r\n"; | const char *term = "\r\n\r\n"; | ||
− | typedef struct | + | typedef struct si { |
int fd; // File-descriptor for this stream | int fd; // File-descriptor for this stream | ||
char *iBuf, *oBuf; // Message buffers for this stream | char *iBuf, *oBuf; // Message buffers for this stream | ||
Line 36: | Line 36: | ||
void stream(); | void stream(); | ||
int streamOpen(int fd,char *resource); | int streamOpen(int fd,char *resource); | ||
− | void streamClose(streamInfo * | + | void streamClose(streamInfo *info); |
− | void streamProcess(streamInfo * | + | void streamProcess(streamInfo *info); |
Line 121: | Line 121: | ||
// Process message in iBuf which should be a complete HTTP GET request | // Process message in iBuf which should be a complete HTTP GET request | ||
// - iPtr is put back to 0 so next message fills from start of iBuf | // - iPtr is put back to 0 so next message fills from start of iBuf | ||
− | void streamProcess(streamInfo * | + | void streamProcess(streamInfo *info) { |
− | char *rqst, *end, *msg = | + | char *rqst, *end, *msg = info->iBuf; |
if (strncmp("GET /",msg,5)==0) { | if (strncmp("GET /",msg,5)==0) { | ||
Line 135: | Line 135: | ||
// Send test response back | // Send test response back | ||
− | send( | + | send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0); |
} | } | ||
else { | else { | ||
− | send( | + | send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0); |
logAdd("Peerd only supports GET requests currently!"); | logAdd("Peerd only supports GET requests currently!"); | ||
} | } | ||
− | + | info->iPtr = 0; // reset iPtr ready for next message | |
} | } | ||
Line 156: | Line 156: | ||
logAddNum("New connection: Stream%d.",fd); | logAddNum("New connection: Stream%d.",fd); | ||
// Create new streamInfo structure and allocate buffers | // Create new streamInfo structure and allocate buffers | ||
− | streamInfo * | + | streamInfo *info = malloc(sizeof(streamInfo)); |
− | + | info->fd = fd; | |
− | + | info->iBuf = malloc(ioBUFSIZE); | |
− | + | info->oBuf = malloc(ioBUFSIZE); | |
− | + | info->iPtr = info->oPtr = info->tPtr = 0; | |
// Create new client-node in clients-loop and add the new streamInfo | // Create new client-node in clients-loop and add the new streamInfo | ||
− | node nc = nodeGetValue( | + | node nc = nodeGetValue(nodeSTREAMS,nodeLOOP); |
nc = nodeLoopInsert(nc,0); | nc = nodeLoopInsert(nc,0); | ||
− | nodeSetValue( | + | nodeSetValue(nodeSTREAMS,nodeLOOP,nc); |
nodeSetValue(nc,nodeCODE,nodeTRUE); | nodeSetValue(nc,nodeCODE,nodeTRUE); | ||
− | *nodeState(nc,0) = & | + | *nodeState(nc,0) = &stream; |
− | *nodeState(nc,nodeSTREAM) = | + | *nodeState(nc,nodeSTREAM) = info; |
return fd; | return fd; | ||
} | } | ||
Line 174: | Line 174: | ||
// Close the passed stream, free its resources and remove from clients-loop | // Close the passed stream, free its resources and remove from clients-loop | ||
− | void streamClose(streamInfo * | + | void streamClose(streamInfo *info) { |
− | printf("Stream%d closed.\n", | + | printf("Stream%d closed.\n",info->fd); |
//nodeLoopRemove(this); // NOTE: reduction not handling PREV | //nodeLoopRemove(this); // NOTE: reduction not handling PREV | ||
− | free( | + | free(info->iBuf); |
− | free( | + | free(info->oBuf); |
− | free( | + | free(info); |
} | } | ||
Line 187: | Line 187: | ||
// Get pointer to the streamInfo structure in current nodal context (this) | // Get pointer to the streamInfo structure in current nodal context (this) | ||
− | streamInfo * | + | streamInfo *info = *nodeState(this, nodeSTREAM); |
− | fd = | + | fd = info->fd; |
− | char *iBuf = | + | char *iBuf = info->iBuf, *oBuf = info->oBuf; |
int n, i = 0; | int n, i = 0; | ||
Line 196: | Line 196: | ||
if ((n = recv(fd,pBuf,ioPAKSIZE,0)) > 0) { | if ((n = recv(fd,pBuf,ioPAKSIZE,0)) > 0) { | ||
// Some bytes were read, append to current message (loop last packet if msg too big) | // Some bytes were read, append to current message (loop last packet if msg too big) | ||
− | if ( | + | if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; |
while (n--) // Append current msg, for each complete msg, process it & reset iPtr | while (n--) // Append current msg, for each complete msg, process it & reset iPtr | ||
− | if ((iBuf[ | + | if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0; |
− | else if (term[ | + | else if (term[info->tPtr] == 0) streamProcess(info); |
} | } | ||
− | else if (n == 0) streamClose( | + | else if (n == 0) streamClose(info); // Zero bytes were read, do orderly termination |
else logErrNum("stream/recv() failed returning %d",WSAGetLastError()); | else logErrNum("stream/recv() failed returning %d",WSAGetLastError()); | ||
} | } |
Revision as of 04:36, 15 August 2006
// This article and all its includes are licenced under LGPL // GPL: http://www.gnu.org/copyleft/lesser.html // SRC: http://www.organicdesign.co.nz/server.c
- define ioDELAY 0 // normal operation is 0 for no delay
- define ioPAKSIZE 128 // keep packet-size small for non-multithreaded design
- define ioPAKMAX 16384 // maximum packet size to allow for variable pak size at runtime
- define ioBUFSIZE 4096 // dictates max message size
- define ioMAXCLIENTS 1000 // used by listen()
- ifndef __WIN32__
- define WSAGetLastError() errno
- endif
// Socket structures, globals and prototypes void server(); int ioInit(); int ioExit(); int nonblocking(int socket); fd_set *fdsetInit(int fd); fd_set fdset; int sock,fd; struct sockaddr_in addr; struct timeval to; unsigned long int sockopt_on;
// Client/Stream info structure, globals and prototypes char *pBuf; // Buffer to read/write data packets const char *term = "\r\n\r\n"; typedef struct si { int fd; // File-descriptor for this stream char *iBuf, *oBuf; // Message buffers for this stream int iPtr; // Index into input-buffer for next packet to start at int tPtr; // Index into term (atomised strcmp incase terminator spans packet-boundary) int oPtr; } streamInfo; void stream(); int streamOpen(int fd,char *resource); void streamClose(streamInfo *info); void streamProcess(streamInfo *info);
// Initialise socket listening on specified port
int ioInit() {
#if __WIN32__ WSADATA wsaData; if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logErr("WSAStartup() failed!"); #endif
// Set up structures for socket & select sockopt_on = 1; int szAddr = sizeof(struct sockaddr_in); memset((char*)&addr, 0, szAddr); // zero the struct addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = htonl(INADDR_ANY); errno = 0;
// Do the usual socket polava: create,options,bind,listen if ((sock = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP))<0) logErrNum("init/socket() failed returning %d",WSAGetLastError()); if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0) logErrNum("init/setsockopt() failed returning %d",WSAGetLastError()); if (bind(nonblocking(sock),(struct sockaddr*)&addr,szAddr)<0) logErrNum("init/bind() failed returning %d",WSAGetLastError()); if (listen(sock,ioMAXCLIENTS)<0) logErrNum("init/listen() failed returning %d",WSAGetLastError());
if (WSAGetLastError() == 0) { char *msg = malloc(100); sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port); logAdd(msg); free(msg); } else { logAdd("Server failed to start!"); return WSAGetLastError(); }
// Globals for server() and client() quantum-functions pBuf = malloc(ioPAKMAX); *nodeState(nodeSERVER,0) = &server; nodeSetValue(nodeSERVER,nodeCODE,nodeTRUE);
return 0; }
// Perform any cleanup for socket
int ioExit() {
free(pBuf);
// todo: close streams and free buffers etc
#if __WIN32__
WSACleanup();
#endif
return WSAGetLastError();
}
// make the passed socket non-blocking so accept() returns straight away for multiplexed model
// - if no incoming requests, returns EAGAIN or EWOULDBLOCK state
int nonblocking(int socket) {
#if __WIN32__
ioctlsocket(socket,FIONBIO,&sockopt_on);
#else
fcntl(socket,F_SETFL,fcntl(socket,F_GETFL)|O_NONBLOCK);
#endif
return socket;
}
// Return pointer to fdset filled with passed file-descriptor ready for a select() call
fd_set *fdsetInit(int fd) {
FD_ZERO(&fdset);
FD_SET(fd,&fdset);
to.tv_sec = to.tv_usec = ioDELAY;
return &fdset;
}
// Process message in iBuf which should be a complete HTTP GET request
// - iPtr is put back to 0 so next message fills from start of iBuf
void streamProcess(streamInfo *info) {
char *rqst, *end, *msg = info->iBuf;
if (strncmp("GET /",msg,5)==0) {
// Extract request path & query-string rqst = end = msg+4; while(*++end > ' '); *end = '\0'; printf("Request:\"%s\"\n",rqst);
// Exit if /stop requested if (strcmp("/stop",rqst)==0) nodeExit();
// Send test response back send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0); } else { send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0); logAdd("Peerd only supports GET requests currently!"); } info->iPtr = 0; // reset iPtr ready for next message }
// Create new client-node and streamInfo structure from passed file-descriptor
// - if fd is NULL, then try and create one by resolving the resource string
int streamOpen(int fd, char *resource) {
if (fd == NULL) {
// if resource is /^[./]/ then its a file
// - file will have different packet size etc
// - may need to include &recv/&read etc in streamInfo struct
}
if (fd < 0) return logErrMsg("Couldn't open resource \"%s\"",resource);
logAddNum("New connection: Stream%d.",fd);
// Create new streamInfo structure and allocate buffers
streamInfo *info = malloc(sizeof(streamInfo));
info->fd = fd;
info->iBuf = malloc(ioBUFSIZE);
info->oBuf = malloc(ioBUFSIZE);
info->iPtr = info->oPtr = info->tPtr = 0;
// Create new client-node in clients-loop and add the new streamInfo node nc = nodeGetValue(nodeSTREAMS,nodeLOOP); nc = nodeLoopInsert(nc,0); nodeSetValue(nodeSTREAMS,nodeLOOP,nc); nodeSetValue(nc,nodeCODE,nodeTRUE); *nodeState(nc,0) = &stream; *nodeState(nc,nodeSTREAM) = info; return fd; }
// Close the passed stream, free its resources and remove from clients-loop
void streamClose(streamInfo *info) {
printf("Stream%d closed.\n",info->fd);
//nodeLoopRemove(this); // NOTE: reduction not handling PREV
free(info->iBuf);
free(info->oBuf);
free(info);
}
// Each of the stream nodes points to this function in its State
void stream() {
// Get pointer to the streamInfo structure in current nodal context (this) streamInfo *info = *nodeState(this, nodeSTREAM); fd = info->fd; char *iBuf = info->iBuf, *oBuf = info->oBuf; int n, i = 0;
// If any data to recieve, read a packet if (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0) { if ((n = recv(fd,pBuf,ioPAKSIZE,0)) > 0) { // Some bytes were read, append to current message (loop last packet if msg too big) if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; while (n--) // Append current msg, for each complete msg, process it & reset iPtr if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0; else if (term[info->tPtr] == 0) streamProcess(info); } else if (n == 0) streamClose(info); // Zero bytes were read, do orderly termination else logErrNum("stream/recv() failed returning %d",WSAGetLastError()); }
// Data to send? //if (select(fd+1,NULL,fdsetInit(fd),NULL,&to)>0) { // write a packet from outbuf //int len = ioPAKSIZE; // or less //if (send(fd,oBuf+oPtr,len,0)<0) logErr("send() failed!"); // }
}
// Function called by each stream-node in network's reduction loop
// - Checks if any new connections waiting on listening socket,
// if so accept and call streamOpen() with new file-descriptor
void server() {
if (select(sock+1,fdsetInit(sock),NULL,NULL,&to)>0)
if ((fd = nonblocking(accept(sock,NULL,NULL)))>0) streamOpen(fd,NULL);
else logErrNum("io/accept() failed returning %d",WSAGetLastError());
}