// io.c
// From Organic Design wiki
// http://www.organicdesign.co.nz/peerd - nodal p2p wiki daemon
// This article and all its includes are licenced under LGPL
// LGPL: http://www.gnu.org/copyleft/lesser.html
// SRC: http://www.organicdesign.co.nz/io.c
// included in http://www.organicdesign.co.nz/peerd.c
// See also: http://www.organicdesign.co.nz/talk:io.c, http://www.kegel.com/c10k.html
// Tunable constants for the I/O layer
#define ioDELAY 0 // select() timeout, copied by fdsetInit() into both tv_sec and tv_usec; normal operation is 0 for no delay
#define ioPAKSIZE 128 // keep packet-size small for multiplexed design; also the amount io() steps iPtr back when a message is too big
#define ioPAKMAX 16384 // maximum packet size to allow for variable pak size at runtime (size of the shared pBuf allocated in ioInit())
#define filePAKSIZE 4096 // packet size for file-streams (assigned to streamInfo.size in streamOpen())
#define sockPAKSIZE 128 // packet size for socket-streams (assigned to streamInfo.size in streamOpen())
#define ioBUFSIZE 4096 // dictates max message size (size of each stream's iBuf and oBuf)
#define ioMAXCLIENTS 1000 // backlog passed to listen() in ioInit()
#define ioSOCK 0 // stream type of socket
#define ioFILE 1 // stream type of file
// On non-Windows builds there is no WSAGetLastError(), so map it onto errno
#ifndef __WIN32__
#define WSAGetLastError() errno
#endif
// Socket structures, globals and prototypes
void server(); // accepts pending connections on the listening socket
int ioInit(); // creates the listening socket and allocates pBuf
int ioExit(); // frees pBuf and, on Windows, calls WSACleanup()
int nonblocking(int); // makes a descriptor non-blocking and returns it
void ioErr(char*); // logs the passed format string together with the current error code
fd_set *fdsetInit(int); // fills the shared fdset with one descriptor for a select() call
fd_set fdset; // single descriptor set shared by every select() call; rebuilt by fdsetInit()
int sock; // listening socket created in ioInit()
struct sockaddr_in addr; // address the listening socket is bound to (INADDR_ANY and the configured port)
struct timeval to; // select() timeout; reset from ioDELAY by fdsetInit()
unsigned long int sockopt_on = 1; // "enabled" flag passed to setsockopt() and, on Windows, ioctlsocket()
// Client/Stream info structure, globals and prototypes
char *pBuf; // Buffer to read/write data packets (ioPAKMAX bytes, allocated in ioInit(), freed in ioExit(), shared by all streams)
const char *term = "\r\n\r\n"; // Terminator that marks the end of a complete message
// Per-stream state; one is allocated by streamOpen() and released by streamClose()
typedef struct si {
int fd; // File-descriptor for this stream
int type; // Stream type (ioFILE, ioSOCK)
int size; // Size of the packets to read/write for this stream
char *iBuf, *oBuf; // Message buffers for this stream
int iPtr; // Index into input-buffer for next packet to start at
int tPtr; // Index into term (atomised strcmp in case terminator spans packet-boundary)
int oPtr; // Index into output-buffer; only initialised to 0 here, the send path in io() is still commented out
} streamInfo;
void io(); // reads one packet for the current node's stream
int streamOpen(int,char*); // creates a stream from a descriptor or a resource string
void streamClose(streamInfo*); // removes the current stream node and frees the stream
void streamProcess(streamInfo*); // handles one complete message held in iBuf
// ----------------------------------------------------------------------------------------- //
// io.c
// Initialise socket listening on specified port
int ioInit() {
#if __WIN32__
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logAdd("WSAStartup() failed!");
#endif
// Set up structures for socket & select
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
memset(&(addr.sin_zero),0,8);
errno = 0;
// Do the usual socket polava: create,options,bind,listen
if ((sock = socket(PF_INET,SOCK_STREAM,0))<0)
ioErr("init/socket() failed returning %d");
else if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
ioErr("init/setsockopt() failed returning %d");
else if (bind(nonblocking(sock),(struct sockaddr*)&addr,sizeof(struct sockaddr))<0)
ioErr("init/bind() failed returning %d");
else if (listen(sock,ioMAXCLIENTS)<0)
ioErr("init/listen() failed returning %d");
else if (WSAGetLastError() == 0) {
char *msg = malloc(100);
sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port);
logAdd(msg);
free(msg);
}
else {
logAdd("Server failed to start!");
return WSAGetLastError();
}
// Globals for server() and client() quantum-functions
pBuf = malloc(ioPAKMAX);
*nodeState(nodeSERVER,nodeCODE) = &server;
return 0;
}
// Perform any cleanup for socket
// - Releases the shared packet buffer and, on Windows, shuts Winsock down
// - Returns the last socket error code (errno on non-Windows builds)
int ioExit() {
    free(pBuf);
    pBuf = NULL; // so a repeated ioExit() cannot free the same block twice
    // todo: close streams and free buffers etc
#if __WIN32__
    WSACleanup();
#endif
    return WSAGetLastError();
}
// Make the passed socket non-blocking so accept() returns straight away for multiplexed model
// - if no incoming requests, returns EAGAIN or EWOULDBLOCK state
// - always returns the descriptor it was given, so it can wrap another call
// - a negative descriptor (e.g. the result of a failed open()) is passed straight
//   through without being touched, so errno still describes the original failure
int nonblocking(int socket) {
    if (socket < 0) return socket;
#if __WIN32__
    ioctlsocket(socket,FIONBIO,&sockopt_on);
#else
    {
        // Only add O_NONBLOCK when the current flags could actually be read
        int flags = fcntl(socket,F_GETFL);
        if (flags != -1) fcntl(socket,F_SETFL,flags|O_NONBLOCK);
    }
#endif
    return socket;
}
// Log an I/O failure
// - msg is handed to logAdd() as the format, followed by the current error code
// - errno is refreshed from WSAGetLastError() (which is errno itself on non-Windows builds)
void ioErr(char *msg) {
    int code = WSAGetLastError();
    errno = code;
    logAdd(msg,code);
}
// Prepare the shared descriptor set for a select() call on a single descriptor
// - empties the global fdset and adds fd to it
// - resets the global timeout so that both tv_sec and tv_usec hold ioDELAY
// - returns the address of the global fdset
fd_set *fdsetInit(int fd) {
    fd_set *set = &fdset;
    to.tv_usec = ioDELAY;
    to.tv_sec = to.tv_usec;
    FD_ZERO(set);
    FD_SET(fd,set);
    return set;
}
// Handle the message held in info->iBuf, which should be a complete HTTP GET request
// - a message starting with "GET /" has its path cut out (the buffer is modified in
//   place to terminate the path), is logged, calls nodeExit() when the path is
//   "/stop", and is answered with a fixed test response
// - anything else is answered with a fixed "No GET!!!" response and logged
// - iPtr is put back to 0 so the next message fills from the start of iBuf
void streamProcess(streamInfo *info) {
    char *buf = info->iBuf;
    if (strncmp("GET /",buf,5) != 0) {
        send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0);
        logAdd("Peerd only supports GET requests currently!");
    }
    else {
        // The path starts at the '/' and runs up to the first space or control character
        char *path = buf+4;
        char *stop = path;
        do {
            ++stop;
        } while (*stop > ' ');
        *stop = '\0';
        logAdd("Request: \"%s\"",path);
        // Exit if /stop requested
        if (strcmp("/stop",path) == 0) {
            nodeExit();
        }
        // Send test response back
        send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0);
    }
    // reset iPtr ready for next message
    info->iPtr = 0;
}
// Create new client-node and streamInfo structure from passed file-descriptor
// - if resource not NULL, then try and create fd by resolving the resource string
// - also if resource is non-NULL, fd can be used to pass fopen flags
int streamOpen(int fd, char *resource) {
// Create new streamInfo structure for the new stream (default to socket params)
streamInfo *info = malloc(sizeof(streamInfo));
info->type = ioSOCK;
info->size = sockPAKSIZE;
// Try and obtain an fd from resource description
if (resource) {
if ((*resource=='.')||(*resource=='/')) {
// The resource is a file, use [[[[http://www.opengroup.org/onlinepubs/009695399/functions/open.html|open]]]] (the fd can be used to pass open-flags)
if ((fd = nonblocking(open(resource,fd)))<0)
ioErr("streamOpen: File open failed returning %d");
else {
info->type = ioFILE;
info->size = filePAKSIZE;
}
}
else {
// Resolve IP/port from resource text description and attempt connection
// - see [[[[http://developer.novell.com/wiki/index.php/BSD_gethost_functions|gethost example]]]], [[[[http://www.delorie.com/gnu/docs/glibc/libc_319.html|struct hostent]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/gethostbyname.html|gethostbyname]]]], [[[[http://www.die.net/doc/linux/man/man2/connect.2.html|connect]]]]
struct hostent *host = gethostbyname(resource);
if (host == NULL )
ioErr("streamOpen/gethostbyname() failed returning %d");
else {
struct in_addr ia;
memset((char*)&ia,0,sizeof(struct in_addr));
ia.s_addr = *(u_long*)host->h_addr;
logAdd("hostAddr: %s\n",inet_ntoa(ia));
//if ((fd = socket(PF_INET,host->h_addrtype,0))<0)
// ioErr("streamOpen/socket() failed returning %d");
//else if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
// ioErr("streamOpen/setsockopt() failed returning %d");
//else if (connect(fd,ntohl(ia),sizeof(uint_32_t))<0)
// ioErr("streamOpen/connect() failed returning %d");
//else {
// }
fd = -1;
}
}
}
// Clean up and exit if no valid fd obtained
if (fd<0) {
free(info);
logAdd("Couldn't open resource \"%s\"",resource);
return errno;
}
else logAdd("New connection: Stream%d.",fd);
// Create new streamInfo structure and allocate buffers
info->fd = fd;
info->iBuf = malloc(ioBUFSIZE);
info->oBuf = malloc(ioBUFSIZE);
info->iPtr = info->oPtr = info->tPtr = 0;
// Create new client-node in clients-loop and add the new streamInfo
node n = nodeLoopInsert(nodeSTREAMS,0);
*nodeState(n,nodeSTREAM) = info;
*nodeState(n,nodeCODE) = &io;
return fd;
}
// Close the passed stream, free its resources and remove from clients-loop
// - the node removed is the current one (this) from its parent loop, so this is
//   expected to be called from the stream's own node context, as io() does
// - the descriptor itself is closed here; nothing else in this file releases it
// - info must not be used after this call
void streamClose(streamInfo *info) {
    logAdd("Stream%d closed.\n",info->fd);
    nodeLoopRemove(parent,this);
#if __WIN32__
    // Winsock sockets are not ordinary descriptors and need closesocket()
    if (info->type == ioSOCK) closesocket(info->fd);
    else close(info->fd);
#else
    close(info->fd);
#endif
    free(info->iBuf);
    free(info->oBuf);
    free(info);
}
// Send and/or receive a packet of data over the current node's associated stream
void io() {
// Get pointer to the streamInfo structure in current nodal context (this)
streamInfo *info = *nodeState(this, nodeSTREAM);
int fd = info->fd, type = info->type, size = info->size;
char *iBuf = info->iBuf, *oBuf = info->oBuf;
// Try and read a packet from the stream (see [[[[http://www.opengroup.org/onlinepubs/009695399/functions/select.html|select]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/recv.html|recv]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/read.html|read]]]])
int n = -1;
if (type == ioFILE)
n = read(fd,pBuf,size);
else if ((type == ioSOCK) && (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0))
n = recv(fd,pBuf,size,0);
// If any bytes read, append to current message
if (n > 0) {
if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; // loop last packet if msg too big
int i = 0;
while (n--) // Append current msg, for each complete msg, process it & reset iPtr
if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0;
else if (term[info->tPtr] == 0) streamProcess(info);
}
else if (n == 0) streamClose(info); // Zero bytes were read, do orderly termination
else ioErr("stream/recv() failed returning %d");
// Data to send?
//if (select(fd+1,NULL,fdsetInit(fd),NULL,&to)>0) {
// write a packet from outbuf
//int len = ioPAKSIZE; // or less
//if (send(fd,oBuf+oPtr,len,0)<0) ioErr("send() failed!");
// }
}
// Function called by each stream-node in network's reduction loop
// - Checks if any new connections are waiting on the listening socket; if so,
//   accept() one and call streamOpen() with the new file-descriptor
// - A failed accept() is logged here and never handed on to streamOpen()
void server() {
    if (select(sock+1,fdsetInit(sock),NULL,NULL,&to)>0) {
        int fd = accept(sock,NULL,NULL);
        if (fd<0)
            ioErr("server/accept() failed returning %d");
        else
            (void)streamOpen(fd,NULL); // streamOpen() logs its own failures
    }
}