Difference between revisions of "Io.c"

From Organic Design wiki
(add open new socket/connect in streamOpen())
(making generic for file/sock)
Line 6: Line 6:
 
#define ioPAKSIZE 128    // keep packet-size small for non-multithreaded design
 
#define ioPAKSIZE 128    // keep packet-size small for non-multithreaded design
 
#define ioPAKMAX  16384  // maximum packet size to allow for variable pak size at runtime
 
#define ioPAKMAX  16384  // maximum packet size to allow for variable pak size at runtime
 +
#define filePAKSIZE 4096  // keep packet-size small for non-multithreaded design
 +
#define sockPAKSIZE 128  // keep packet-size small for non-multithreaded design
 
#define ioBUFSIZE 4096    // dictates max message size
 
#define ioBUFSIZE 4096    // dictates max message size
 
#define ioMAXCLIENTS 1000 // used by listen()
 
#define ioMAXCLIENTS 1000 // used by listen()
Line 33: Line 35:
 
int tPtr;              // Index into term (atomised strcmp incase terminator spans packet-boundary)
 
int tPtr;              // Index into term (atomised strcmp incase terminator spans packet-boundary)
 
int oPtr;
 
int oPtr;
 +
int szPacket;          // Size of the packets to read/write for this stream
 +
// Pointers to the send/recv functions for this stream
 +
ssize_t (recv)(int fd,void* buf,size_t len,int f);
 +
void *send;            // Pointer to the function used for sending a packet
 
} streamInfo;
 
} streamInfo;
 
void stream();
 
void stream();
Line 147: Line 153:
 
// Create new client-node and streamInfo structure from passed file-descriptor
 
// Create new client-node and streamInfo structure from passed file-descriptor
 
// - if resource not NULL, then try and create fd by resolving the resource string
 
// - if resource not NULL, then try and create fd by resolving the resource string
//   fd is used as flags in that case
+
// - also if resource is non-NULL, fd can be used to pass fopen flags
 
int streamOpen(int fd, char *resource) {
 
int streamOpen(int fd, char *resource) {
 +
 +
// Create new streamInfo structure for the new stream
 +
streamInfo *info = malloc(sizeof(streamInfo));
 +
 +
// Try and obtain an fd from resource description
 
if (resource) {
 
if (resource) {
if ((*resource=='.')||(*resource=='/')) fd = fopen(resource,fd|O_NONBLOCK);
+
if ((*resource=='.')||(*resource=='/')) {
 +
fd = fopen(resource,fd|O_NONBLOCK);
 +
info->szPacket = filePAKSIZE;
 +
info->recv = &read;
 +
info->send = &write;
 +
}
 
else {
 
else {
 
if ((fd = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP))<0)
 
if ((fd = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP))<0)
Line 159: Line 175:
 
if (connect(fd,sockaddr *address,socklen_t address_len)<0)
 
if (connect(fd,sockaddr *address,socklen_t address_len)<0)
 
logErrNum("streamOpen/connect() failed returning %d",WSAGetLastError());
 
logErrNum("streamOpen/connect() failed returning %d",WSAGetLastError());
 +
info->szPacket = sockPAKSIZE;
 +
info->recv = &recv;
 +
info->send = &send;
 
}
 
}
 
}
 
}
if (fd<0) return logErrMsg("Couldn't open resource \"%s\"",resource);
+
 
logAddNum("New connection: Stream%d.",fd);
+
// Clean up and exit if no valid fd obtained
 +
if (fd<0) {
 +
free(info);
 +
logErrMsg("Couldn't open resource \"%s\"",resource);
 +
return WSAGetLastError();
 +
} else logAddNum("New connection: Stream%d.",fd);
 +
 
 
// Create new streamInfo structure and allocate buffers
 
// Create new streamInfo structure and allocate buffers
streamInfo *info = malloc(sizeof(streamInfo));
 
 
info->fd = fd;
 
info->fd = fd;
 
info->iBuf = malloc(ioBUFSIZE);
 
info->iBuf = malloc(ioBUFSIZE);
Line 202: Line 226:
 
// If any data to recieve, read a packet
 
// If any data to recieve, read a packet
 
if (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0) {
 
if (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0) {
if ((n = recv(fd,pBuf,ioPAKSIZE,0)) > 0) {
+
if ((n = (info->recv)(fd,pBuf,ioPAKSIZE,0)) > 0) {
 
// Some bytes were read, append to current message (loop last packet if msg too big)
 
// Some bytes were read, append to current message (loop last packet if msg too big)
 
if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE;
 
if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE;

Revision as of 21:43, 16 August 2006

// This article and all its includes are licenced under LGPL // GPL: http://www.gnu.org/copyleft/lesser.html // SRC: http://www.organicdesign.co.nz/server.c

  1. define ioDELAY 0 // normal operation is 0 for no delay
  2. define ioPAKSIZE 128 // keep packet-size small for non-multithreaded design
  3. define ioPAKMAX 16384 // maximum packet size to allow for variable pak size at runtime
  4. define filePAKSIZE 4096 // keep packet-size small for non-multithreaded design
  5. define sockPAKSIZE 128 // keep packet-size small for non-multithreaded design
  6. define ioBUFSIZE 4096 // dictates max message size
  7. define ioMAXCLIENTS 1000 // used by listen()
  8. ifndef __WIN32__
  9. define WSAGetLastError() errno
  10. endif

// Socket structures, globals and prototypes void server(); int ioInit(); int ioExit(); int nonblocking(int socket); fd_set *fdsetInit(int fd); fd_set fdset; int sock,fd; struct sockaddr_in addr; struct timeval to; unsigned long int sockopt_on;

// Client/Stream info structure, globals and prototypes char *pBuf; // Buffer to read/write data packets const char *term = "\r\n\r\n"; typedef struct si { int fd; // File-descriptor for this stream char *iBuf, *oBuf; // Message buffers for this stream int iPtr; // Index into input-buffer for next packet to start at int tPtr; // Index into term (atomised strcmp incase terminator spans packet-boundary) int oPtr; int szPacket; // Size of the packets to read/write for this stream // Pointers to the send/recv functions for this stream ssize_t (recv)(int fd,void* buf,size_t len,int f); void *send; // Pointer to the function used for sending a packet } streamInfo; void stream(); int streamOpen(int fd,char *resource); void streamClose(streamInfo *info); void streamProcess(streamInfo *info);


// Initialise socket listening on specified port int ioInit() {

#if __WIN32__ WSADATA wsaData; if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logErr("WSAStartup() failed!"); #endif

// Set up structures for socket & select sockopt_on = 1; int szAddr = sizeof(struct sockaddr_in); memset((char*)&addr, 0, szAddr); // zero the struct addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = htonl(INADDR_ANY); errno = 0;

// Do the usual socket polava: create,options,bind,listen if ((sock = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP))<0) logErrNum("init/socket() failed returning %d",WSAGetLastError()); if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0) logErrNum("init/setsockopt() failed returning %d",WSAGetLastError()); if (bind(nonblocking(sock),(struct sockaddr*)&addr,szAddr)<0) logErrNum("init/bind() failed returning %d",WSAGetLastError()); if (listen(sock,ioMAXCLIENTS)<0) logErrNum("init/listen() failed returning %d",WSAGetLastError());

if (WSAGetLastError() == 0) { char *msg = malloc(100); sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port); logAdd(msg); free(msg); } else { logAdd("Server failed to start!"); return WSAGetLastError(); }

// Globals for server() and client() quantum-functions pBuf = malloc(ioPAKMAX); *nodeState(nodeSERVER,0) = &server; nodeSetValue(nodeSERVER,nodeCODE,nodeTRUE);

return 0; }


// Perform any cleanup for socket int ioExit() { free(pBuf); // todo: close streams and free buffers etc #if __WIN32__ WSACleanup(); #endif return WSAGetLastError(); }


// make the passed socket non-blocking so accept() returns straight away for multiplexed model // - if no incoming requests, returns EAGAIN or EWOULDBLOCK state int nonblocking(int socket) { #if __WIN32__ ioctlsocket(socket,FIONBIO,&sockopt_on); #else fcntl(socket,F_SETFL,fcntl(socket,F_GETFL)|O_NONBLOCK); #endif return socket; }


// Return pointer to fdset filled with passed file-descriptor ready for a select() call fd_set *fdsetInit(int fd) { FD_ZERO(&fdset); FD_SET(fd,&fdset); to.tv_sec = to.tv_usec = ioDELAY; return &fdset; }


// Process message in iBuf which should be a complete HTTP GET request // - iPtr is put back to 0 so next message fills from start of iBuf void streamProcess(streamInfo *info) { char *rqst, *end, *msg = info->iBuf; if (strncmp("GET /",msg,5)==0) {

// Extract request path & query-string rqst = end = msg+4; while(*++end > ' '); *end = '\0'; printf("Request:\"%s\"\n",rqst);

// Exit if /stop requested if (strcmp("/stop",rqst)==0) nodeExit();

// Send test response back send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0); } else { send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0); logAdd("Peerd only supports GET requests currently!"); } info->iPtr = 0; // reset iPtr ready for next message }


// Create new client-node and streamInfo structure from passed file-descriptor // - if resource not NULL, then try and create fd by resolving the resource string // - also if resource is non-NULL, fd can be used to pass fopen flags int streamOpen(int fd, char *resource) {

// Create new streamInfo structure for the new stream streamInfo *info = malloc(sizeof(streamInfo));

// Try and obtain an fd from resource description if (resource) { if ((*resource=='.')||(*resource=='/')) { fd = fopen(resource,fd|O_NONBLOCK); info->szPacket = filePAKSIZE; info->recv = &read; info->send = &write; } else { if ((fd = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP))<0) logErrNum("streamOpen/socket() failed returning %d",WSAGetLastError()); if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0) logErrNum("streamOpen/setsockopt() failed returning %d",WSAGetLastError()); // todo: resolve resource addr if (connect(fd,sockaddr *address,socklen_t address_len)<0) logErrNum("streamOpen/connect() failed returning %d",WSAGetLastError()); info->szPacket = sockPAKSIZE; info->recv = &recv; info->send = &send; } }

// Clean up and exit if no valid fd obtained if (fd<0) { free(info); logErrMsg("Couldn't open resource \"%s\"",resource); return WSAGetLastError(); } else logAddNum("New connection: Stream%d.",fd);

// Create new streamInfo structure and allocate buffers info->fd = fd; info->iBuf = malloc(ioBUFSIZE); info->oBuf = malloc(ioBUFSIZE); info->iPtr = info->oPtr = info->tPtr = 0;

// Create new client-node in clients-loop and add the new streamInfo node nc = nodeGetValue(nodeSTREAMS,nodeLOOP); nc = nodeLoopInsert(nc,0); nodeSetValue(nodeSTREAMS,nodeLOOP,nc); nodeSetValue(nc,nodeCODE,nodeTRUE); *nodeState(nc,0) = &stream; *nodeState(nc,nodeSTREAM) = info; return fd; }


// Close the passed stream, free its resources and remove from clients-loop void streamClose(streamInfo *info) { printf("Stream%d closed.\n",info->fd); //nodeLoopRemove(this); // NOTE: reduction not handling PREV free(info->iBuf); free(info->oBuf); free(info); }


// Each of the stream nodes points to this function in its State void stream() {

// Get pointer to the streamInfo structure in current nodal context (this) streamInfo *info = *nodeState(this, nodeSTREAM); fd = info->fd; char *iBuf = info->iBuf, *oBuf = info->oBuf; int n, i = 0;

// If any data to recieve, read a packet if (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0) { if ((n = (info->recv)(fd,pBuf,ioPAKSIZE,0)) > 0) { // Some bytes were read, append to current message (loop last packet if msg too big) if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; while (n--) // Append current msg, for each complete msg, process it & reset iPtr if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0; else if (term[info->tPtr] == 0) streamProcess(info); } else if (n == 0) streamClose(info); // Zero bytes were read, do orderly termination else logErrNum("stream/recv() failed returning %d",WSAGetLastError()); }

// Data to send? //if (select(fd+1,NULL,fdsetInit(fd),NULL,&to)>0) { // write a packet from outbuf //int len = ioPAKSIZE; // or less //if (send(fd,oBuf+oPtr,len,0)<0) logErr("send() failed!"); // }

}


// Function called by each stream-node in network's reduction loop // - Checks if any new connections waiting on listening socket, // if so accept and call streamOpen() with new file-descriptor void server() { if (select(sock+1,fdsetInit(sock),NULL,NULL,&to)>0) if (streamOpen(accept(sock,NULL,NULL),NULL)<0) logErrNum("server/accept() failed returning %d",WSAGetLastError()); }