Difference between revisions of "Io.c"

From Organic Design wiki
(moved from server.c)
 
m (Nad moved page Io.h to Io.c without leaving a redirect)
 
(62 intermediate revisions by 2 users not shown)
Line 1: Line 1:
 +
{{legacy}}{{lowercase}}
 +
<source lang="c">
 +
// http://www.organicdesign.co.nz/peerd - nodal p2p wiki daemon
 
// This article and all its includes are licenced under LGPL
 
// This article and all its includes are licenced under LGPL
 
// GPL: http://www.gnu.org/copyleft/lesser.html
 
// GPL: http://www.gnu.org/copyleft/lesser.html
// SRC: http://www.organicdesign.co.nz/server.c
+
// SRC: http://www.organicdesign.co.nz/io.c
 +
// included in http://www.organicdesign.co.nz/peerd.c
 +
// See also: http://www.organicdesign.co.nz/talk:io.c, http://www.kegel.com/c10k.html
 +
 
 +
#define ioDELAY 0        // normal operation is 0 for no delay
 +
#define ioPAKSIZE 128    // keep packet-size small for multipelxed design
 +
#define ioPAKMAX  16384  // maximum packet size to allow for variable pak size at runtime
 +
#define filePAKSIZE 4096  // packet size for file-streams
 +
#define sockPAKSIZE 128  // packet size for socket-streams
 +
#define ioBUFSIZE 4096    // dictates max message size
 +
#define ioMAXCLIENTS 1000 // used by listen()
 +
#define ioSOCK 0          // stream type of socket
 +
#define ioFILE 1          // stream type of file
  
#define svrDELAY 0        // normal operation is 0 for no delay
 
#define svrPAKSIZE 128    // keep packet-size small for non-multithreaded design
 
#define svrPAKMAX  16384  // maximum packet size to allow for variable pak size at runtime
 
#define svrBUFSIZE 4096    // dictates max message size
 
#define svrMAXCLIENTS 1000 // used by listen()
 
 
#ifndef __WIN32__
 
#ifndef __WIN32__
 
#define WSAGetLastError() errno
 
#define WSAGetLastError() errno
Line 14: Line 24:
 
// Socket structures, globals and prototypes
 
// Socket structures, globals and prototypes
 
void server();
 
void server();
int serverInit();
+
int ioInit();
int serverExit();
+
int ioExit();
int nonblocking(int socket);
+
int nonblocking(int);
fd_set *fdsetInit(int fd);
+
void ioErr(char*);
 +
fd_set *fdsetInit(int);
 
fd_set fdset;
 
fd_set fdset;
int sock,fd;
+
int sock;
 
struct sockaddr_in addr;
 
struct sockaddr_in addr;
 
struct timeval to;
 
struct timeval to;
unsigned long int sockopt_on;
+
unsigned long int sockopt_on = 1;
  
 
// Client/Stream info structure, globals and prototypes
 
// Client/Stream info structure, globals and prototypes
 
char *pBuf;                // Buffer to read/write data packets
 
char *pBuf;                // Buffer to read/write data packets
 
const char *term = "\r\n\r\n";
 
const char *term = "\r\n\r\n";
typedef struct siStruct {
+
typedef struct si {
 
int fd;                // File-descriptor for this stream
 
int fd;                // File-descriptor for this stream
 +
int type;              // Stream type (ioFILE, ioSOCK)
 +
int size;              // Size of the packets to read/write for this stream
 
char *iBuf, *oBuf;    // Message buffers for this stream
 
char *iBuf, *oBuf;    // Message buffers for this stream
 
int iPtr;              // Index into input-buffer for next packet to start at
 
int iPtr;              // Index into input-buffer for next packet to start at
Line 34: Line 47:
 
int oPtr;
 
int oPtr;
 
} streamInfo;
 
} streamInfo;
void client();
+
void io();
int streamOpen(int fd,char *resource);
+
int streamOpen(int,char*);
void streamClose(streamInfo *stream);
+
void streamClose(streamInfo*);
void streamProcess(streamInfo *stream);
+
void streamProcess(streamInfo*);
 +
 
  
 +
// ----------------------------------------------------------------------------------------- //
 +
// io.c
  
 
// Initialise socket listening on specified port
 
// Initialise socket listening on specified port
int serverInit() {
+
int ioInit() {
  
 
#if __WIN32__
 
#if __WIN32__
 
WSADATA wsaData;
 
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logErr("WSAStartup() failed!");
+
if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logAdd("WSAStartup() failed!");
 
#endif
 
#endif
  
 
// Set up structures for socket & select
 
// Set up structures for socket & select
sockopt_on = 1;
 
int szAddr = sizeof(struct sockaddr_in);
 
memset((char*)&addr, 0, szAddr); // zero the struct
 
 
addr.sin_family = AF_INET;
 
addr.sin_family = AF_INET;
 
addr.sin_port = htons(port);
 
addr.sin_port = htons(port);
 
addr.sin_addr.s_addr = htonl(INADDR_ANY);
 
addr.sin_addr.s_addr = htonl(INADDR_ANY);
 +
memset(&(addr.sin_zero),0,8);
 
errno = 0;
 
errno = 0;
  
 
// Do the usual socket polava: create,options,bind,listen
 
// Do the usual socket polava: create,options,bind,listen
if ((sock = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP))<0)
+
if ((sock = socket(PF_INET,SOCK_STREAM,0))<0)
logErrNum("init/socket() failed returning %d",WSAGetLastError());
+
ioErr("init/socket() failed returning %d");
if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
+
else if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
logErrNum("init/setsockopt() failed returning %d",WSAGetLastError());
+
ioErr("init/setsockopt() failed returning %d");
if (bind(nonblocking(sock),(struct sockaddr*)&addr,szAddr)<0)
+
else if (bind(nonblocking(sock),(struct sockaddr*)&addr,sizeof(struct sockaddr))<0)
logErrNum("init/bind() failed returning %d",WSAGetLastError());
+
ioErr("init/bind() failed returning %d");
if (listen(sock,svrMAXCLIENTS)<0)
+
else if (listen(sock,ioMAXCLIENTS)<0)
logErrNum("init/listen() failed returning %d",WSAGetLastError());
+
ioErr("init/listen() failed returning %d");
 
+
else if (WSAGetLastError() == 0) {
if (WSAGetLastError() == 0) {
 
 
char *msg = malloc(100);
 
char *msg = malloc(100);
 
sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port);
 
sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port);
Line 79: Line 92:
  
 
// Globals for server() and client() quantum-functions
 
// Globals for server() and client() quantum-functions
pBuf = malloc(svrPAKMAX);
+
pBuf = malloc(ioPAKMAX);
*nodeState(nodeSERVER,0) = &server;
+
*nodeState(nodeSERVER,nodeCODE) = &server;
nodeSetValue(nodeSERVER,nodeCODE,nodeTRUE);
 
  
return 0;
+
return 0;
 
}
 
}
  
  
 
// Perform any cleanup for socket
 
// Perform any cleanup for socket
int serverExit() {
+
int ioExit() {
 
free(pBuf);
 
free(pBuf);
 
// todo: close streams and free buffers etc
 
// todo: close streams and free buffers etc
Line 107: Line 119:
 
#endif
 
#endif
 
return socket;
 
return socket;
 +
}
 +
 +
 +
void ioErr(char *msg) {
 +
logAdd(msg,errno = WSAGetLastError());
 
}
 
}
  
Line 114: Line 131:
 
FD_ZERO(&fdset);
 
FD_ZERO(&fdset);
 
FD_SET(fd,&fdset);
 
FD_SET(fd,&fdset);
to.tv_sec = to.tv_usec = svrDELAY;
+
to.tv_sec = to.tv_usec = ioDELAY;
 
return &fdset;
 
return &fdset;
 
}
 
}
Line 121: Line 138:
 
// Process message in iBuf which should be a complete HTTP GET request
 
// Process message in iBuf which should be a complete HTTP GET request
 
// - iPtr is put back to 0 so next message fills from start of iBuf
 
// - iPtr is put back to 0 so next message fills from start of iBuf
void streamProcess(streamInfo *stream) {
+
void streamProcess(streamInfo *info) {
char *rqst, *end, *msg = stream->iBuf;
+
char *rqst, *end, *msg = info->iBuf;
 
if (strncmp("GET /",msg,5)==0) {
 
if (strncmp("GET /",msg,5)==0) {
  
Line 129: Line 146:
 
while(*++end > ' ');
 
while(*++end > ' ');
 
*end = '\0';
 
*end = '\0';
printf("Request:\"%s\"\n",rqst);
+
logAdd("Request: \"%s\"",rqst);
  
 
// Exit if /stop requested
 
// Exit if /stop requested
Line 135: Line 152:
  
 
// Send test response back
 
// Send test response back
send(stream->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0);
+
send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0);
 
}
 
}
 
else {
 
else {
send(stream->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0);
+
send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0);
 
logAdd("Peerd only supports GET requests currently!");
 
logAdd("Peerd only supports GET requests currently!");
 
}
 
}
stream->iPtr = 0; // reset iPtr ready for next message
+
info->iPtr = 0; // reset iPtr ready for next message
 
}
 
}
  
  
 
// Create new client-node and streamInfo structure from passed file-descriptor
 
// Create new client-node and streamInfo structure from passed file-descriptor
// - if fd is NULL, then try and create one by resolving the resource string
+
// - if resource not NULL, then try and create fd by resolving the resource string
void streamOpen(int fd, char *resource) {
+
// - also if resource is non-NULL, fd can be used to pass fopen flags
if (fd == NULL) {
+
int streamOpen(int fd, char *resource) {
// if resource is /^[./]/ then its a file
+
 
// - file will have different packet size etc
+
// Create new streamInfo structure for the new stream (default to socket params)
// - may need to include &recv/&read etc in streamInfo struct
+
streamInfo *info = malloc(sizeof(streamInfo));
 +
info->type = ioSOCK;
 +
info->size = sockPAKSIZE;
 +
 
 +
// Try and obtain an fd from resource description
 +
if (resource) {
 +
if ((*resource=='.')||(*resource=='/')) {
 +
// The resource is a file, use [[[[http://www.opengroup.org/onlinepubs/009695399/functions/open.html|open]]]] (the fd can be used to pass open-flags)
 +
if ((fd = nonblocking(open(resource,fd)))<0)
 +
ioErr("streamOpen: File open failed returning %d");
 +
else {
 +
info->type = ioFILE;
 +
info->size = filePAKSIZE;
 +
}
 +
}
 +
else {
 +
// Resolve IP/port from resource text description and attempt connection
 +
// - see [[[[http://developer.novell.com/wiki/index.php/BSD_gethost_functions|gethost example]]]], [[[[http://www.delorie.com/gnu/docs/glibc/libc_319.html|struct hostent]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/gethostbyname.html|gethostbyname]]]], [[[[http://www.die.net/doc/linux/man/man2/connect.2.html|connect]]]]
 +
struct hostent *host = gethostbyname(resource);
 +
if (host == NULL )
 +
ioErr("streamOpen/gethostbyname() failed returning %d");
 +
else {
 +
struct in_addr ia;
 +
memset((char*)&ia,0,sizeof(struct in_addr));
 +
ia.s_addr = *(u_long*)host->h_addr;
 +
logAdd("hostAddr: %s\n",inet_ntoa(ia));
 +
//if ((fd = socket(PF_INET,host->h_addrtype,0))<0)
 +
// ioErr("streamOpen/socket() failed returning %d");
 +
//else if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
 +
// ioErr("streamOpen/setsockopt() failed returning %d");
 +
//else if (connect(fd,ntohl(ia),sizeof(uint_32_t))<0)
 +
// ioErr("streamOpen/connect() failed returning %d");
 +
//else {
 +
// }
 +
fd = -1;
 +
}
 +
}
 +
}
 +
 
 +
// Clean up and exit if no valid fd obtained
 +
if (fd<0) {
 +
free(info);
 +
logAdd("Couldn't open resource \"%s\"",resource);
 +
return errno;
 
}
 
}
if (fd < 0) return logErrMsg("Couldn't open resource \"%s\"",resource);
+
else logAdd("New connection: Stream%d.",fd);
logAddNum("New connection: Stream%d.",fd);
+
 
 
// Create new streamInfo structure and allocate buffers
 
// Create new streamInfo structure and allocate buffers
streamInfo *stream = malloc(sizeof(streamInfo));
+
info->fd = fd;
stream->fd = fd;
+
info->iBuf = malloc(ioBUFSIZE);
stream->iBuf = malloc(svrBUFSIZE);
+
info->oBuf = malloc(ioBUFSIZE);
stream->oBuf = malloc(svrBUFSIZE);
+
info->iPtr = info->oPtr = info->tPtr = 0;
stream->iPtr = stream->oPtr = stream->tPtr = 0;
 
  
 
// Create new client-node in clients-loop and add the new streamInfo
 
// Create new client-node in clients-loop and add the new streamInfo
node nc = nodeGetValue(nodeCLIENTS,nodeLOOP);
+
node n = nodeLoopInsert(nodeSTREAMS,0);
nc = nodeLoopInsert(nc,0);
+
*nodeState(n,nodeSTREAM) = info;
nodeSetValue(nodeCLIENTS,nodeLOOP,nc);
+
*nodeState(n,nodeCODE) = &io;
nodeSetValue(nc,nodeCODE,nodeTRUE);
+
return fd;
*nodeState(nc,0) = &client;
 
*nodeState(nc,nodeSTREAM) = stream;
 
 
}
 
}
  
  
 
// Close the passed stream, free its resources and remove from clients-loop
 
// Close the passed stream, free its resources and remove from clients-loop
void streamClose(streamInfo *stream) {
+
void streamClose(streamInfo *info) {
printf("Stream%d closed.\n",stream->fd);
+
logAdd("Stream%d closed.\n",info->fd);
//nodeLoopRemove(this); // NOTE: reduction not handling PREV
+
nodeLoopRemove(parent,this);
free(stream->iBuf);
+
free(info->iBuf);
free(stream->oBuf);
+
free(info->oBuf);
free(stream);
+
free(info);
 
}
 
}
  
  
// Each of the stream nodes points to this function in its State
+
// Send and/or receive a packet of data over the current node's associated stream
void client() {
+
void io() {
  
 
// Get pointer to the streamInfo structure in current nodal context (this)
 
// Get pointer to the streamInfo structure in current nodal context (this)
streamInfo *stream = *nodeState(this, nodeSTREAM);
+
streamInfo *info = *nodeState(this, nodeSTREAM);
fd = stream->fd;
+
int fd = info->fd, type = info->type, size = info->size;
char *iBuf = stream->iBuf, *oBuf = stream->oBuf;
+
char *iBuf = info->iBuf, *oBuf = info->oBuf;
int n, i = 0;
 
  
// If any data to recieve, read a packet
+
// Try and read a packet from the stream (see [[[[http://www.opengroup.org/onlinepubs/009695399/functions/select.html|select]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/recv.html|recv]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/read.html|read]]]])
if (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0) {
+
int n = -1;
if ((n = recv(fd,pBuf,svrPAKSIZE,0)) > 0) {
+
if (type == ioFILE)
// Some bytes were read, append to current message (loop last packet if msg too big)
+
n = read(fd,pBuf,size);
if (stream->iPtr > svrBUFSIZE-svrPAKSIZE) stream->iPtr -= svrPAKSIZE;
+
else if ((type == ioSOCK) && (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0))
while (n--)  // Append current msg, for each complete msg, process it & reset iPtr
+
n = recv(fd,pBuf,size,0);
if ((iBuf[stream->iPtr++] = pBuf[i++]) != term[stream->tPtr++]) stream->tPtr = 0;
+
 
else if (term[stream->tPtr] == 0) streamProcess(stream);
+
// If any bytes read, append to current message
}
+
if (n > 0) {
else if (n == 0) streamClose(stream); // Zero bytes were read, do orderly termination
+
if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; // loop last packet if msg too big
else logErrNum("client/recv() failed returning %d",WSAGetLastError());
+
int i = 0;
 +
while (n--)  // Append current msg, for each complete msg, process it & reset iPtr
 +
if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0;
 +
else if (term[info->tPtr] == 0) streamProcess(info);
 
}
 
}
 +
else if (n == 0) streamClose(info); // Zero bytes were read, do orderly termination
 +
else ioErr("stream/recv() failed returning %d");
  
 
// Data to send?
 
// Data to send?
 
//if (select(fd+1,NULL,fdsetInit(fd),NULL,&to)>0) {
 
//if (select(fd+1,NULL,fdsetInit(fd),NULL,&to)>0) {
 
// write a packet from outbuf
 
// write a packet from outbuf
//int len = svrPAKSIZE; // or less
+
//int len = ioPAKSIZE; // or less
//if (send(fd,oBuf+oPtr,len,0)<0) logErr("send() failed!");
+
//if (send(fd,oBuf+oPtr,len,0)<0) ioErr("send() failed!");
 
// }
 
// }
  
Line 216: Line 277:
 
// Function called by each stream-node in network's reduction loop
 
// Function called by each stream-node in network's reduction loop
 
// - Checks if any new connections waiting on listening socket,
 
// - Checks if any new connections waiting on listening socket,
//  if so accept and call streamOpen() with new file-descriptor
+
//  if so [[[[http://www.opengroup.org/onlinepubs/009695399/functions/accept.html|accept]]]] and call streamOpen() with new file-descriptor
 
void server() {
 
void server() {
 
if (select(sock+1,fdsetInit(sock),NULL,NULL,&to)>0)
 
if (select(sock+1,fdsetInit(sock),NULL,NULL,&to)>0)
if ((fd = nonblocking(accept(sock,NULL,NULL)))>0) streamOpen(fd,NULL);
+
if (streamOpen(accept(sock,NULL,NULL),NULL)<0)
else logErrNum("server/accept() failed returning %d",WSAGetLastError());
+
ioErr("server/accept() failed returning %d");
 
}
 
}
 +
</source>
 +
[[Category:C]]

Latest revision as of 13:10, 13 December 2019

Legacy.svg Legacy: This article describes a concept that has been superseded in the course of ongoing development on the Organic Design wiki. Please do not develop this any further or base work on this concept, this is only useful for a historic record of work done. You may find a link to the currently used concept or function in this article, if not you can contact the author to find out what has taken the place of this legacy item.
// http://www.organicdesign.co.nz/peerd - nodal p2p wiki daemon
// This article and all its includes are licenced under LGPL
// GPL: http://www.gnu.org/copyleft/lesser.html
// SRC: http://www.organicdesign.co.nz/io.c
// included in http://www.organicdesign.co.nz/peerd.c
// See also: http://www.organicdesign.co.nz/talk:io.c, http://www.kegel.com/c10k.html

#define ioDELAY 0         // normal operation is 0 for no delay
#define ioPAKSIZE 128     // keep packet-size small for multipelxed design
#define ioPAKMAX  16384   // maximum packet size to allow for variable pak size at runtime
#define filePAKSIZE 4096  // packet size for file-streams
#define sockPAKSIZE 128   // packet size for socket-streams
#define ioBUFSIZE 4096    // dictates max message size
#define ioMAXCLIENTS 1000 // used by listen()
#define ioSOCK 0          // stream type of socket
#define ioFILE 1          // stream type of file

#ifndef __WIN32__
#define WSAGetLastError() errno
#endif

// Socket structures, globals and prototypes
void server();
int ioInit();
int ioExit();
int nonblocking(int);
void ioErr(char*);
fd_set *fdsetInit(int);
fd_set fdset;
int sock;
struct sockaddr_in addr;
struct timeval to;
unsigned long int sockopt_on = 1;

// Client/Stream info structure, globals and prototypes
char *pBuf;                // Buffer to read/write data packets
const char *term = "\r\n\r\n";
typedef struct si {
	int fd;                // File-descriptor for this stream
	int type;              // Stream type (ioFILE, ioSOCK)
	int size;              // Size of the packets to read/write for this stream
	char *iBuf, *oBuf;     // Message buffers for this stream
	int iPtr;              // Index into input-buffer for next packet to start at
	int tPtr;              // Index into term (atomised strcmp incase terminator spans packet-boundary)
	int oPtr;
	} streamInfo;
void io();
int streamOpen(int,char*);
void streamClose(streamInfo*);
void streamProcess(streamInfo*);


// ----------------------------------------------------------------------------------------- //
// io.c

// Initialise socket listening on specified port
int ioInit() {

	#if __WIN32__
	WSADATA wsaData;
	if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logAdd("WSAStartup() failed!");
	#endif

	// Set up structures for socket & select
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = htonl(INADDR_ANY);
	memset(&(addr.sin_zero),0,8);
	errno = 0;

	// Do the usual socket polava: create,options,bind,listen
	if ((sock = socket(PF_INET,SOCK_STREAM,0))<0)
		ioErr("init/socket() failed returning %d");
	else if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
		ioErr("init/setsockopt() failed returning %d");
	else if (bind(nonblocking(sock),(struct sockaddr*)&addr,sizeof(struct sockaddr))<0)
		ioErr("init/bind() failed returning %d");
	else if (listen(sock,ioMAXCLIENTS)<0)
		ioErr("init/listen() failed returning %d");
	else if (WSAGetLastError() == 0) {
		char *msg = malloc(100);
		sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port);
		logAdd(msg);
		free(msg);
		}
	else {
		logAdd("Server failed to start!");
		return WSAGetLastError();
		}

	// Globals for server() and client() quantum-functions
	pBuf = malloc(ioPAKMAX);
	*nodeState(nodeSERVER,nodeCODE) = &server;

	return 0;
	}


// Perform any cleanup for socket
int ioExit() {
	free(pBuf);
	// todo: close streams and free buffers etc
	#if __WIN32__
	WSACleanup();
	#endif
	return WSAGetLastError();
	}


// make the passed socket non-blocking so accept() returns straight away for multiplexed model
// - if no incoming requests, returns EAGAIN or EWOULDBLOCK state
int nonblocking(int socket) {
	#if __WIN32__
	ioctlsocket(socket,FIONBIO,&sockopt_on);
	#else
	fcntl(socket,F_SETFL,fcntl(socket,F_GETFL)|O_NONBLOCK);
	#endif
	return socket;
	}


void ioErr(char *msg) {
	logAdd(msg,errno = WSAGetLastError());
	}


// Return pointer to fdset filled with passed file-descriptor ready for a select() call
fd_set *fdsetInit(int fd) {
	FD_ZERO(&fdset);
	FD_SET(fd,&fdset);
	to.tv_sec = to.tv_usec = ioDELAY;
	return &fdset;
	}


// Process message in iBuf which should be a complete HTTP GET request
// - iPtr is put back to 0 so next message fills from start of iBuf
void streamProcess(streamInfo *info) {
	char *rqst, *end, *msg = info->iBuf;
	if (strncmp("GET /",msg,5)==0) {

		// Extract request path & query-string
		rqst = end = msg+4;
		while(*++end > ' ');
		*end = '\0';
		logAdd("Request: \"%s\"",rqst);

		// Exit if /stop requested
		if (strcmp("/stop",rqst)==0) nodeExit();

		// Send test response back
		send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0);
		}
	else {
		send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0);
		logAdd("Peerd only supports GET requests currently!");
		}
	info->iPtr = 0; // reset iPtr ready for next message
	}


// Create new client-node and streamInfo structure from passed file-descriptor
// - if resource not NULL, then try and create fd by resolving the resource string
// - also if resource is non-NULL, fd can be used to pass fopen flags
int streamOpen(int fd, char *resource) {

	// Create new streamInfo structure for the new stream (default to socket params)
	streamInfo *info = malloc(sizeof(streamInfo));
	info->type = ioSOCK;
	info->size = sockPAKSIZE;

	// Try and obtain an fd from resource description
	if (resource) {
		if ((*resource=='.')||(*resource=='/')) {
			// The resource is a file, use [[[[http://www.opengroup.org/onlinepubs/009695399/functions/open.html|open]]]] (the fd can be used to pass open-flags)
			if ((fd = nonblocking(open(resource,fd)))<0)
				ioErr("streamOpen: File open failed returning %d");
			else {
				info->type = ioFILE;
				info->size = filePAKSIZE;
				}
			}
		else {
			// Resolve IP/port from resource text description and attempt connection
			// - see [[[[http://developer.novell.com/wiki/index.php/BSD_gethost_functions|gethost example]]]], [[[[http://www.delorie.com/gnu/docs/glibc/libc_319.html|struct hostent]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/gethostbyname.html|gethostbyname]]]], [[[[http://www.die.net/doc/linux/man/man2/connect.2.html|connect]]]]
			struct hostent *host = gethostbyname(resource);
			if (host == NULL )
				ioErr("streamOpen/gethostbyname() failed returning %d");
			else {
				struct in_addr ia;
				memset((char*)&ia,0,sizeof(struct in_addr));
				ia.s_addr = *(u_long*)host->h_addr;
				logAdd("hostAddr: %s\n",inet_ntoa(ia));
				//if ((fd = socket(PF_INET,host->h_addrtype,0))<0)
				//	ioErr("streamOpen/socket() failed returning %d");
				//else if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
				//	ioErr("streamOpen/setsockopt() failed returning %d");
				//else if (connect(fd,ntohl(ia),sizeof(uint_32_t))<0)
				//	ioErr("streamOpen/connect() failed returning %d");
				//else {
				//	}
				fd = -1;
				}
			}
		}

	// Clean up and exit if no valid fd obtained
	if (fd<0) {
		free(info);
		logAdd("Couldn't open resource \"%s\"",resource);
		return errno;
		}
	else logAdd("New connection: Stream%d.",fd);

	// Create new streamInfo structure and allocate buffers
	info->fd = fd;
	info->iBuf = malloc(ioBUFSIZE);
	info->oBuf = malloc(ioBUFSIZE);
	info->iPtr = info->oPtr = info->tPtr = 0;

	// Create new client-node in clients-loop and add the new streamInfo
	node n = nodeLoopInsert(nodeSTREAMS,0);
	*nodeState(n,nodeSTREAM) = info;
	*nodeState(n,nodeCODE) = &io;
	return fd;
	}


// Close the passed stream, free its resources and remove from clients-loop
void streamClose(streamInfo *info) {
	logAdd("Stream%d closed.\n",info->fd);
	nodeLoopRemove(parent,this);
	free(info->iBuf);
	free(info->oBuf);
	free(info);
	}


// Send and/or receive a packet of data over the current node's associated stream
void io() {

	// Get pointer to the streamInfo structure in current nodal context (this)
	streamInfo *info = *nodeState(this, nodeSTREAM);
	int fd = info->fd, type = info->type, size = info->size;
	char *iBuf = info->iBuf, *oBuf = info->oBuf;

	// Try and read a packet from the stream (see [[[[http://www.opengroup.org/onlinepubs/009695399/functions/select.html|select]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/recv.html|recv]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/read.html|read]]]])
	int n = -1;
	if (type == ioFILE)
		n = read(fd,pBuf,size);
	else if ((type == ioSOCK) && (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0))
		n = recv(fd,pBuf,size,0);

	// If any bytes read, append to current message
	if (n > 0) {
		if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; // loop last packet if msg too big
		int i = 0;
		while (n--)  // Append current msg, for each complete msg, process it & reset iPtr
			if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0;
			else if (term[info->tPtr] == 0) streamProcess(info);
		}
	else if (n == 0) streamClose(info); // Zero bytes were read, do orderly termination
	else ioErr("stream/recv() failed returning %d");

	// Data to send?
	//if (select(fd+1,NULL,fdsetInit(fd),NULL,&to)>0) {
		// write a packet from outbuf
		//int len = ioPAKSIZE; // or less
		//if (send(fd,oBuf+oPtr,len,0)<0) ioErr("send() failed!");
	//	}

	}


// Function called by each stream-node in network's reduction loop
// - Checks if any new connections waiting on listening socket,
//   if so [[[[http://www.opengroup.org/onlinepubs/009695399/functions/accept.html|accept]]]] and call streamOpen() with new file-descriptor
void server() {
	if (select(sock+1,fdsetInit(sock),NULL,NULL,&to)>0)
		if (streamOpen(accept(sock,NULL,NULL),NULL)<0)
			ioErr("server/accept() failed returning %d");
	}