Difference between revisions of "Server.c"

From Organic Design wiki
(yip, works very solidly - but back to rational sizes)
(tidy up err reporting etc)
Line 20: Line 20:
  
 
// Stream info structure and globals
 
// Stream info structure and globals
 +
char *pBuf;                // Buffer to read/write data packets
 +
const char *term = "\r\n\r\n";
 
typedef struct siStruct {
 
typedef struct siStruct {
 
int fd;                // File-descriptor for this stream
 
int fd;                // File-descriptor for this stream
 
char *iBuf, *oBuf;    // Message buffers for this stream
 
char *iBuf, *oBuf;    // Message buffers for this stream
 
int iPtr;              // Index into input-buffer for next packet to start at
 
int iPtr;              // Index into input-buffer for next packet to start at
int tPtr;              // Index into iTerm (atomised strcmp incase terminator spans packet-boundary)
+
int tPtr;              // Index into term (atomised strcmp incase terminator spans packet-boundary)
 
int oPtr;
 
int oPtr;
 
} streamInfo;
 
} streamInfo;
char *pBuf;                // Buffer to read/write data packets
 
const char *iTerm = "\r\n\r\n";
 
  
 
// Function prototypes
 
// Function prototypes
Line 58: Line 58:
 
// Do the usual socket polava: create,options,bind,listen
 
// Do the usual socket polava: create,options,bind,listen
 
if ((sock = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP))<0)
 
if ((sock = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP))<0)
logErrNum("socket() failed returning %d",WSAGetLastError());
+
logErrNum("init/socket() failed returning %d",WSAGetLastError());
 
if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
 
if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
logErr("setsockopt() failed!");
+
logErrNum("init/setsockopt() failed returning %d",WSAGetLastError());
if (bind(nonblocking(sock),(struct sockaddr*)&addr,szAddr)<0) logErr("bind() failed!");
+
if (bind(nonblocking(sock),(struct sockaddr*)&addr,szAddr)<0)
 +
logErrNum("init/bind() failed returning %d",WSAGetLastError());
 
if (listen(sock,svrMAXCLIENTS)<0) logErr("listen() failed!");
 
if (listen(sock,svrMAXCLIENTS)<0) logErr("listen() failed!");
if (errno == 0) {
+
logErrNum("init/listen() failed returning %d",WSAGetLastError());
 +
 
 +
if (WSAGetLastError() == 0) {
 
char *msg = malloc(100);
 
char *msg = malloc(100);
 
sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port);
 
sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port);
Line 71: Line 74:
 
else {
 
else {
 
logAdd("Server failed to start!");
 
logAdd("Server failed to start!");
return errno;
+
return WSAGetLastError();
 
}
 
}
  
Line 126: Line 129:
 
*nodeState(n,nodeNULL) = &client;
 
*nodeState(n,nodeNULL) = &client;
 
*nodeState(n,nodeSTREAMINFO) = stream;
 
*nodeState(n,nodeSTREAMINFO) = stream;
}
+
} else logErrNum("server/accept() failed returning %d",WSAGetLastError());
else logErr("accept(): failed!");
+
} else logErrNum("server/select() failed returning %d",WSAGetLastError());
}
 
 
}
 
}
  
Line 148: Line 150:
 
if (stream->iPtr > svrBUFSIZE-svrPAKSIZE) stream->iPtr -= svrPAKSIZE;
 
if (stream->iPtr > svrBUFSIZE-svrPAKSIZE) stream->iPtr -= svrPAKSIZE;
 
while (n--)  // Append current msg, for each complete msg, process it & reset iPtr
 
while (n--)  // Append current msg, for each complete msg, process it & reset iPtr
if ((iBuf[stream->iPtr++] = pBuf[i++]) != iTerm[stream->tPtr++]) stream->tPtr = 0;
+
if ((iBuf[stream->iPtr++] = pBuf[i++]) != term[stream->tPtr++]) stream->tPtr = 0;
else if (iTerm[stream->tPtr] == 0) process(stream);
+
else if (term[stream->tPtr] == 0) processStream(stream);
 
}
 
}
else if (n == 0) {
+
else if (n == 0) closeStream(stream); // Zero bytes were read, do orderly termination
// Zero bytes were read, do orderly termination
+
else  logErrNum("client/recv() failed returning %d",WSAGetLastError());
printf("Stream%d closed.\n",fd);
+
} else logErrNum("client/select() failed returning %d",WSAGetLastError());
//nodeLoopRemove(this); // NOTE: reduction not handling PREV
+
 
free(iBuf);
 
free(oBuf);
 
free(stream);
 
}
 
else logErrNum("recv() failed returning %d",WSAGetLastError());
 
}
 
 
return;
 
return;
 
// Data to send?
 
// Data to send?
Line 177: Line 173:
 
// Process message in iBuf which should be a complete HTTP GET request
 
// Process message in iBuf which should be a complete HTTP GET request
 
// - iPtr is put back to 0 so next message fills from start of iBuf
 
// - iPtr is put back to 0 so next message fills from start of iBuf
void process(streamInfo *stream) {
+
void processStream(streamInfo *stream) {
stream->iPtr = 0;
 
 
char *rqst, *end, *msg = stream->iBuf;
 
char *rqst, *end, *msg = stream->iBuf;
 
if (strncmp("GET /",msg,5)==0) {
 
if (strncmp("GET /",msg,5)==0) {
Line 201: Line 196:
 
logAdd("Peerd only supports GET requests currently!");
 
logAdd("Peerd only supports GET requests currently!");
 
}
 
}
 +
stream->iPtr = 0; // reset iPtr ready for next message
 +
}
 +
 +
 +
// Close the passed stream and free its resources
 +
void closeStream(streamInfo *stream) {
 +
printf("Stream%d closed.\n",stream->fd);
 +
//nodeLoopRemove(this); // NOTE: reduction not handling PREV
 +
free(stream->iBuf);
 +
free(stream->oBuf);
 +
free(stream);
 
}
 
}

Revision as of 00:07, 3 August 2006

// This article and all its includes are licenced under LGPL // GPL: http://www.gnu.org/copyleft/lesser.html // SRC: http://www.organicdesign.co.nz/server.c


  1. define svrDELAY 0 // normal operation is 0 for no delay
  2. define svrPAKSIZE 128 // keep packet-size small for non-multithreaded design
  3. define svrBUFSIZE 4096 // dictates max message size
  4. define svrMAXCLIENTS 1000 // used by listen()
  5. ifndef __WIN32__
  6. define WSAGetLastError() errno
  7. endif

// Socket structures and variables struct sockaddr_in addr; struct timeval to; fd_set fdset; unsigned long int sockopt_on; int sock;

// Stream info structure and globals char *pBuf; // Buffer to read/write data packets const char *term = "\r\n\r\n"; typedef struct siStruct { int fd; // File-descriptor for this stream char *iBuf, *oBuf; // Message buffers for this stream int iPtr; // Index into input-buffer for next packet to start at int tPtr; // Index into term (atomised strcmp incase terminator spans packet-boundary) int oPtr; } streamInfo;

// Function prototypes //void server(); // declared earlier in nodalHusk since called in root loop //void client(); void process(); int server_init(); int server_exit(); int nonblocking(int socket);


// Initialise socket listening on specified port int server_init() {

#if __WIN32__ WSADATA wsaData; if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logErr("WSAStartup() failed!"); #endif

// Set up structures for socket & select sockopt_on = 1; int szAddr = sizeof(struct sockaddr_in); memset((char*)&addr, 0, szAddr); // zero the struct addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = htonl(INADDR_ANY); errno = 0;

// Do the usual socket polava: create,options,bind,listen if ((sock = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP))<0) logErrNum("init/socket() failed returning %d",WSAGetLastError()); if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0) logErrNum("init/setsockopt() failed returning %d",WSAGetLastError()); if (bind(nonblocking(sock),(struct sockaddr*)&addr,szAddr)<0) logErrNum("init/bind() failed returning %d",WSAGetLastError()); if (listen(sock,svrMAXCLIENTS)<0) logErr("listen() failed!"); logErrNum("init/listen() failed returning %d",WSAGetLastError());

if (WSAGetLastError() == 0) { char *msg = malloc(100); sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port); logAdd(msg); free(msg); } else { logAdd("Server failed to start!"); return WSAGetLastError(); }

// Globals for server() and client() quantum-functions pBuf = malloc(svrPAKSIZE);

return 0; }


// Perform any cleanup for socket int server_exit() { free(pBuf); // todo: close streams and free buffers etc #if __WIN32__ WSACleanup(); #endif return WSAGetLastError(); }


// make the passed socket non-blocking so accept() returns straight away for multiplexed model // - if no incoming requests, returns EAGAIN or EWOULDBLOCK state int nonblocking(int socket) { #if __WIN32__ ioctlsocket(socket,FIONBIO,&sockopt_on); #else fcntl(socket,F_SETFL,fcntl(socket,F_GETFL)|O_NONBLOCK); #endif return socket; }


// Function called by each stream-node in network's reduction loop // - there is also a streams-container node containing zero or more stream nodes void server() { FD_ZERO(&fdset); FD_SET(sock,&fdset); to.tv_sec = to.tv_usec = svrDELAY; if (select(sock+1,&fdset,NULL,NULL,&to)>0) { // New connection request, accept and create new stream-node int fd = nonblocking(accept(sock,NULL,NULL)); if (fd>0) { printf("New connection: Stream%d.\n",fd); streamInfo *stream = malloc(sizeof(streamInfo)); stream->fd = fd; stream->iBuf = malloc(svrBUFSIZE); stream->oBuf = malloc(svrBUFSIZE); stream->iPtr = stream->oPtr = stream->tPtr = 0; node n = nodeGetValue(nodeCLIENTS,nodeLOOP); n = nodeLoopInsert(n,nodeNULL); nodeSetValue(nodeCLIENTS,nodeLOOP,n); nodeSetValue(n,nodeCODE,nodeTRUE); *nodeState(n,nodeNULL) = &client; *nodeState(n,nodeSTREAMINFO) = stream; } else logErrNum("server/accept() failed returning %d",WSAGetLastError()); } else logErrNum("server/select() failed returning %d",WSAGetLastError()); }


// Each of the stream nodes points to this function in its State void client() { // Get the info for this stream ready for reading/writing a packet if necessary streamInfo *stream = *nodeState(this, nodeSTREAMINFO); char *iBuf = stream->iBuf, *oBuf = stream->oBuf; int n, i = 0, fd = stream->fd;

// If any data to recieve, read a packet FD_ZERO(&fdset); FD_SET(fd,&fdset); to.tv_sec = to.tv_usec = svrDELAY; if (select(fd+1,&fdset,NULL,NULL,&to)>0) { if ((n = recv(fd,pBuf,svrPAKSIZE,0)) > 0) { // Some bytes were read, append to current message (loop last packet if msg too big) if (stream->iPtr > svrBUFSIZE-svrPAKSIZE) stream->iPtr -= svrPAKSIZE; while (n--) // Append current msg, for each complete msg, process it & reset iPtr if ((iBuf[stream->iPtr++] = pBuf[i++]) != term[stream->tPtr++]) stream->tPtr = 0; else if (term[stream->tPtr] == 0) processStream(stream); } else if (n == 0) closeStream(stream); // Zero bytes were read, do orderly termination else logErrNum("client/recv() failed returning %d",WSAGetLastError()); } else logErrNum("client/select() failed returning %d",WSAGetLastError());

return; // Data to send? FD_ZERO(&fdset); FD_SET(fd,&fdset); to.tv_sec = to.tv_usec = svrDELAY; if (select(fd+1,NULL,&fdset,NULL,&to)>0) { // write a packet from outbuf //int len = svrPAKSIZE; // or less //if (send(fd,oBuf+oPtr,len,0)<0) logErr("send() failed!"); }

}


// Process message in iBuf which should be a complete HTTP GET request // - iPtr is put back to 0 so next message fills from start of iBuf void processStream(streamInfo *stream) { char *rqst, *end, *msg = stream->iBuf; if (strncmp("GET /",msg,5)==0) {

// Extract request path & query-string rqst = end = msg+4; while(*++end > ' '); *end = '\0'; printf("Request:\"%s\"\n",rqst);

// Exit if /stop requested if (strcmp("/stop",rqst)==0) { logAdd("/stop requested, stopping."); exit(EXIT_SUCCESS); }

// Send test response back send(stream->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0); } else { send(stream->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0); logAdd("Peerd only supports GET requests currently!"); } stream->iPtr = 0; // reset iPtr ready for next message }


// Close the passed stream and free its resources void closeStream(streamInfo *stream) { printf("Stream%d closed.\n",stream->fd); //nodeLoopRemove(this); // NOTE: reduction not handling PREV free(stream->iBuf); free(stream->oBuf); free(stream); }