Io.c

From Organic Design wiki
Revision as of 09:53, 14 March 2009 by Nad (talk | contribs) (no per,s)

// http://www.organicdesign.co.nz/peerd - nodal p2p wiki daemon
// This article and all its includes are licenced under LGPL
// GPL: http://www.gnu.org/copyleft/lesser.html
// SRC: http://www.organicdesign.co.nz/io.c
// included in http://www.organicdesign.co.nz/peerd.c
// See also: http://www.organicdesign.co.nz/talk:io.c, http://www.kegel.com/c10k.html

  // Tunable constants for the I/O layer
  #define ioDELAY 0            // normal operation is 0 for no delay
  #define ioPAKSIZE 128        // keep packet-size small for multiplexed design
  #define ioPAKMAX 16384       // maximum packet size to allow for variable pak size at runtime
  #define filePAKSIZE 4096     // packet size for file-streams
  #define sockPAKSIZE 128      // packet size for socket-streams
  #define ioBUFSIZE 4096       // dictates max message size
  #define ioMAXCLIENTS 1000    // used by listen()
  #define ioSOCK 0             // stream type of socket
  #define ioFILE 1             // stream type of file

  // Outside Windows there is no WSAGetLastError(), so map it onto errno
  // to let the rest of the file use a single spelling for "last error"
  #ifndef __WIN32__
  #define WSAGetLastError() errno
  #endif

// Socket structures, globals and prototypes

// Functions defined further down in this file
void server();
int ioInit();
int ioExit();
int nonblocking(int);
void ioErr(char*);
fd_set *fdsetInit(int);

// State shared by the functions above
fd_set fdset;                       // descriptor set rebuilt by fdsetInit() before each select()
int sock;                           // listening socket created by ioInit()
struct sockaddr_in addr;            // address the listening socket is bound to
struct timeval to;                  // select() timeout, reloaded by fdsetInit()
unsigned long int sockopt_on = 1;   // "on" value handed to setsockopt()/ioctlsocket()

// Client/Stream info structure, globals and prototypes

// Buffer to read/write data packets (allocated by ioInit, released by ioExit)
char *pBuf;

// End-of-message marker; io() matches it one byte at a time as data arrives
const char *term = "\r\n\r\n";

// Per-stream state, one instance for each open stream
typedef struct si {
	int fd;        // File-descriptor for this stream
	int type;      // Stream type (ioFILE, ioSOCK)
	int size;      // Size of the packets to read/write for this stream
	char *iBuf;    // Input message buffer for this stream
	char *oBuf;    // Output message buffer for this stream
	int iPtr;      // Index into input-buffer for next packet to start at
	int tPtr;      // Index into term (atomised strcmp incase terminator spans packet-boundary)
	int oPtr;      // presumably the matching index into oBuf; only ever zeroed in this file - TODO confirm
} streamInfo;

// Stream functions defined further down in this file
void io();
int streamOpen(int,char*);
void streamClose(streamInfo*);
void streamProcess(streamInfo*);


// ----------------------------------------------------------------------------------------- // // io.c

// Initialise socket listening on specified port int ioInit() {

#if __WIN32__ WSADATA wsaData; if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logAdd("WSAStartup() failed!"); #endif

// Set up structures for socket & select addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = htonl(INADDR_ANY); memset(&(addr.sin_zero),0,8); errno = 0;

// Do the usual socket polava: create,options,bind,listen if ((sock = socket(PF_INET,SOCK_STREAM,0))<0) ioErr("init/socket() failed returning %d"); else if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0) ioErr("init/setsockopt() failed returning %d"); else if (bind(nonblocking(sock),(struct sockaddr*)&addr,sizeof(struct sockaddr))<0) ioErr("init/bind() failed returning %d"); else if (listen(sock,ioMAXCLIENTS)<0) ioErr("init/listen() failed returning %d"); else if (WSAGetLastError() == 0) { char *msg = malloc(100); sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port); logAdd(msg); free(msg); } else { logAdd("Server failed to start!"); return WSAGetLastError(); }

// Globals for server() and client() quantum-functions pBuf = malloc(ioPAKMAX); *nodeState(nodeSERVER,nodeCODE) = &server;

return 0; }


// Perform any cleanup for socket int ioExit() { free(pBuf); // todo: close streams and free buffers etc #if __WIN32__ WSACleanup(); #endif return WSAGetLastError(); }


// Make the passed socket non-blocking so accept() returns straight away for multiplexed model
// - if no incoming requests, returns EAGAIN or EWOULDBLOCK state
// - a negative (invalid) descriptor is handed straight back untouched, so the
//   errno left by the call that produced it is still there for the caller
// - always returns the descriptor it was given, so it can wrap another call
int nonblocking(int socket) {
	if (socket < 0) return socket;
	#if __WIN32__
	ioctlsocket(socket,FIONBIO,&sockopt_on);
	#else
	// Only update the flags if they could be read: a failed F_GETFL returns -1,
	// and OR-ing that in would ask for every flag bit at once
	int flags = fcntl(socket,F_GETFL);
	if (flags >= 0) fcntl(socket,F_SETFL,flags|O_NONBLOCK);
	#endif
	return socket;
}


// Log an I/O error
// - msg is used as the logAdd() format string and receives the last error
//   code as its single argument
// - the same code is also stored in errno
void ioErr(char *msg) {
	int code = WSAGetLastError();
	errno = code;
	logAdd(msg,code);
}


// Return pointer to fdset filled with passed file-descriptor ready for a select() call fd_set *fdsetInit(int fd) { FD_ZERO(&fdset); FD_SET(fd,&fdset); to.tv_sec = to.tv_usec = ioDELAY; return &fdset; }


// Process message in iBuf which should be a complete HTTP GET request // - iPtr is put back to 0 so next message fills from start of iBuf void streamProcess(streamInfo *info) { char *rqst, *end, *msg = info->iBuf; if (strncmp("GET /",msg,5)==0) {

// Extract request path & query-string rqst = end = msg+4; while(*++end > ' '); *end = '\0'; logAdd("Request: \"%s\"",rqst);

// Exit if /stop requested if (strcmp("/stop",rqst)==0) nodeExit();

// Send test response back send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0); } else { send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0); logAdd("Peerd only supports GET requests currently!"); } info->iPtr = 0; // reset iPtr ready for next message }


// Create new client-node and streamInfo structure from passed file-descriptor // - if resource not NULL, then try and create fd by resolving the resource string // - also if resource is non-NULL, fd can be used to pass fopen flags int streamOpen(int fd, char *resource) {

// Create new streamInfo structure for the new stream (default to socket params) streamInfo *info = malloc(sizeof(streamInfo)); info->type = ioSOCK; info->size = sockPAKSIZE;

// Try and obtain an fd from resource description if (resource) { if ((*resource=='.')||(*resource=='/')) { // The resource is a file, use [[[[1]]]] (the fd can be used to pass open-flags) if ((fd = nonblocking(open(resource,fd)))<0) ioErr("streamOpen: File open failed returning %d"); else { info->type = ioFILE; info->size = filePAKSIZE; } } else { // Resolve IP/port from resource text description and attempt connection // - see [[[example]]], [[[hostent]]], [[[[2]]]], [[[[3]]]] struct hostent *host = gethostbyname(resource); if (host == NULL ) ioErr("streamOpen/gethostbyname() failed returning %d"); else { struct in_addr ia; memset((char*)&ia,0,sizeof(struct in_addr)); ia.s_addr = *(u_long*)host->h_addr; logAdd("hostAddr: %s\n",inet_ntoa(ia)); //if ((fd = socket(PF_INET,host->h_addrtype,0))<0) // ioErr("streamOpen/socket() failed returning %d"); //else if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0) // ioErr("streamOpen/setsockopt() failed returning %d"); //else if (connect(fd,ntohl(ia),sizeof(uint_32_t))<0) // ioErr("streamOpen/connect() failed returning %d"); //else { // } fd = -1; } } }

// Clean up and exit if no valid fd obtained if (fd<0) { free(info); logAdd("Couldn't open resource \"%s\"",resource); return errno; } else logAdd("New connection: Stream%d.",fd);

// Create new streamInfo structure and allocate buffers info->fd = fd; info->iBuf = malloc(ioBUFSIZE); info->oBuf = malloc(ioBUFSIZE); info->iPtr = info->oPtr = info->tPtr = 0;

// Create new client-node in clients-loop and add the new streamInfo node n = nodeLoopInsert(nodeSTREAMS,0); *nodeState(n,nodeSTREAM) = info; *nodeState(n,nodeCODE) = &io; return fd; }


// Close the passed stream, free its resources and remove from clients-loop void streamClose(streamInfo *info) { logAdd("Stream%d closed.\n",info->fd); nodeLoopRemove(parent,this); free(info->iBuf); free(info->oBuf); free(info); }


// Send and/or receive a packet of data over the current node's associated stream void io() {

// Get pointer to the streamInfo structure in current nodal context (this) streamInfo *info = *nodeState(this, nodeSTREAM); int fd = info->fd, type = info->type, size = info->size; char *iBuf = info->iBuf, *oBuf = info->oBuf;

// Try and read a packet from the stream (see [[[[4]]]], [[[[5]]]], [[[[6]]]]) int n = -1; if (type == ioFILE) n = read(fd,pBuf,size); else if ((type == ioSOCK) && (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0)) n = recv(fd,pBuf,size,0);

// If any bytes read, append to current message if (n > 0) { if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; // loop last packet if msg too big int i = 0; while (n--) // Append current msg, for each complete msg, process it & reset iPtr if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0; else if (term[info->tPtr] == 0) streamProcess(info); } else if (n == 0) streamClose(info); // Zero bytes were read, do orderly termination else ioErr("stream/recv() failed returning %d");

// Data to send? //if (select(fd+1,NULL,fdsetInit(fd),NULL,&to)>0) { // write a packet from outbuf //int len = ioPAKSIZE; // or less //if (send(fd,oBuf+oPtr,len,0)<0) ioErr("send() failed!"); // }

}


// Function called by each stream-node in network's reduction loop // - Checks if any new connections waiting on listening socket, // if so [[[[7]]]] and call streamOpen() with new file-descriptor void server() { if (select(sock+1,fdsetInit(sock),NULL,NULL,&to)>0) if (streamOpen(accept(sock,NULL,NULL),NULL)<0) ioErr("server/accept() failed returning %d"); }