io.c

From Organic Design wiki
Legacy: This article describes a concept that has been superseded in the course of ongoing development on the Organic Design wiki. Please do not develop it any further or base work on this concept; it is kept only as a historical record of work done. You may find a link to the currently used concept or function in this article; if not, you can contact the author to find out what has taken the place of this legacy item.
// http://www.organicdesign.co.nz/peerd - nodal p2p wiki daemon
// This article and all its includes are licenced under LGPL
// GPL: http://www.gnu.org/copyleft/lesser.html
// SRC: http://www.organicdesign.co.nz/io.c
// included in http://www.organicdesign.co.nz/peerd.c
// See also: http://www.organicdesign.co.nz/talk:io.c, http://www.kegel.com/c10k.html

#define ioDELAY 0         // select() timeout (used for both seconds and microseconds); normal operation is 0 for no delay
#define ioPAKSIZE 128     // keep packet-size small for multiplexed design
#define ioPAKMAX  16384   // maximum packet size to allow for variable pak size at runtime (size of the shared pBuf)
#define filePAKSIZE 4096  // packet size for file-streams
#define sockPAKSIZE 128   // packet size for socket-streams
#define ioBUFSIZE 4096    // size of each stream's message buffers, so dictates max message size
#define ioMAXCLIENTS 1000 // used by listen() as the connection backlog
#define ioSOCK 0          // stream type of socket
#define ioFILE 1          // stream type of file

// Non-Windows builds have no WSAGetLastError(), so map it onto errno
// and let the same error-reporting code serve both platforms
#ifndef __WIN32__
#define WSAGetLastError() errno
#endif

// Socket structures, globals and prototypes
void server();                    // accepts waiting connections on the listening socket
int ioInit();                     // creates the listening socket; returns 0 or an error code
int ioExit();                     // releases what ioInit() set up
int nonblocking(int);             // makes a descriptor non-blocking and returns it
void ioErr(char*);                // logs a format string together with the last error code
fd_set *fdsetInit(int);           // refills the shared fdset/timeout below for one descriptor
fd_set fdset;                     // single descriptor set shared by every select() call
int sock;                         // listening socket created by ioInit()
struct sockaddr_in addr;          // address the listening socket is bound to
struct timeval to;                // select() timeout, reset by fdsetInit()
unsigned long int sockopt_on = 1; // non-zero "enable" value passed by address to socket option calls

// Client/Stream info structure, globals and prototypes
char *pBuf;                // Buffer to read/write data packets (one buffer shared by all streams)
const char *term = "\r\n\r\n"; // Message terminator: a blank line (CRLF CRLF) ends each message
typedef struct si {
	int fd;                // File-descriptor for this stream
	int type;              // Stream type (ioFILE, ioSOCK)
	int size;              // Size of the packets to read/write for this stream
	char *iBuf, *oBuf;     // Message buffers for this stream
	int iPtr;              // Index into input-buffer for next packet to start at
	int tPtr;              // Index into term (atomised strcmp incase terminator spans packet-boundary)
	int oPtr;              // Presumably the matching index into the output-buffer - TODO confirm, nothing reads it yet
	} streamInfo;
void io();                         // per-stream function: reads one packet for the current node's stream
int streamOpen(int,char*);         // creates a stream from a descriptor or a resource string
void streamClose(streamInfo*);     // frees a stream and removes its node
void streamProcess(streamInfo*);   // handles one complete message held in iBuf


// ----------------------------------------------------------------------------------------- //
// io.c

// Initialise the listening socket on the configured port and register the server function
// - creates a TCP socket with SO_REUSEADDR, makes it non-blocking, binds it to all interfaces and listens
// - allocates the shared packet buffer pBuf and installs server() as the code of nodeSERVER
// - returns 0 on success; on any failure returns a non-zero error code, in which case
//   pBuf is not allocated and server() is not installed
int ioInit() {
	int reuse = 1;       // SO_REUSEADDR expects an int, so don't lend it the unsigned long sockopt_on
	char *failed = NULL; // format string describing the step that failed, if any

	#if __WIN32__
	WSADATA wsaData;
	if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logAdd("WSAStartup() failed!");
	#endif

	// Set up structures for socket & select
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = htonl(INADDR_ANY);
	memset(&(addr.sin_zero),0,sizeof addr.sin_zero);
	errno = 0;

	// Do the usual socket palaver: create,options,bind,listen
	// - each step only runs if the one before it succeeded
	if ((sock = socket(PF_INET,SOCK_STREAM,0))<0)
		failed = "init/socket() failed returning %d";
	else if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&reuse,sizeof reuse)<0)
		failed = "init/setsockopt() failed returning %d";
	else if (bind(nonblocking(sock),(struct sockaddr*)&addr,sizeof addr)<0)
		failed = "init/bind() failed returning %d";
	else if (listen(sock,ioMAXCLIENTS)<0)
		failed = "init/listen() failed returning %d";

	// Success is decided by the calls' own return values: a failed step must never
	// fall through to "return 0", and a stale errno must never veto a good start
	if (failed) {
		int err = WSAGetLastError(); // capture before logging has a chance to disturb it
		ioErr(failed);
		logAdd("Server failed to start!");
		return err ? err : -1;
		}

	// Globals for server() and client() quantum-functions
	pBuf = malloc(ioPAKMAX);
	if (pBuf == NULL) {
		logAdd("Server failed to start!");
		return ENOMEM;
		}
	*nodeState(nodeSERVER,nodeCODE) = &server;

	// Hand the values straight to logAdd rather than formatting into a fixed-size
	// scratch buffer that a long peer name could overrun
	logAdd("Daemon \"%s\" started successfully and serving on port %d.",peer,port);
	return 0;
	}


// Perform any cleanup for socket
// - frees the shared packet buffer and, on Windows, calls WSACleanup()
// - returns the last socket error code (errno on non-Windows builds)
int ioExit() {
	free(pBuf);
	pBuf = NULL; // so a repeated ioExit() cannot free the same buffer twice
	// todo: close streams and free buffers etc
	#if __WIN32__
	WSACleanup();
	#endif
	return WSAGetLastError();
	}


// Make the passed descriptor non-blocking so accept()/read() return straight away for multiplexed model
// - if nothing is pending, those calls then fail with EAGAIN or EWOULDBLOCK instead of waiting
// - a negative (invalid) descriptor is passed straight through with errno untouched, so a caller
//   that wraps a failing open()/socket() still sees that call's own error code
// - returns the descriptor it was given
int nonblocking(int socket) {
	if (socket < 0) return socket;
	#if __WIN32__
	ioctlsocket(socket,FIONBIO,&sockopt_on);
	#else
	// Only rewrite the flags if they could be read; a failed F_GETFL returns -1,
	// which would otherwise be ORed in and set every flag bit
	int flags = fcntl(socket,F_GETFL);
	if (flags >= 0) fcntl(socket,F_SETFL,flags|O_NONBLOCK);
	#endif
	return socket;
	}


// Log a socket/stream error
// - msg is a logAdd() format string; the last error code is passed as its one argument
// - the same code is stored in errno
void ioErr(char *msg) {
	int code = WSAGetLastError();
	errno = code;
	logAdd(msg,code);
	}


// Return pointer to the shared fdset filled with passed file-descriptor ready for a select() call
// - also resets the shared timeout "to" to ioDELAY (select() is allowed to modify its timeout argument)
// - on non-Windows builds an fd_set can only hold descriptors 0..FD_SETSIZE-1; anything else is
//   left out, so the set comes back empty and select() simply reports nothing ready
fd_set *fdsetInit(int fd) {
	to.tv_sec = ioDELAY;
	to.tv_usec = ioDELAY;
	FD_ZERO(&fdset);
	#ifndef __WIN32__
	// FD_SET outside the valid range is undefined behaviour (it writes outside the bitmap)
	if (fd < 0 || fd >= FD_SETSIZE) return &fdset;
	#endif
	FD_SET(fd,&fdset);
	return &fdset;
	}


// Process message in iBuf which should be a complete HTTP GET request
// - the request path (with query-string) is terminated in place inside iBuf and logged
// - a path of exactly "/stop" calls nodeExit()
// - a fixed test response is sent back; anything that is not a GET gets a fixed "No GET!!!" response
// - iPtr is put back to 0 so next message fills from start of iBuf
void streamProcess(streamInfo *info) {
	// Lengths are taken from the literals themselves (sizeof minus the NUL) so they cannot drift
	static const char okReply[] = "HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?";
	static const char noGetReply[] = "HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!";
	char *msg = info->iBuf;

	if (strncmp("GET /",msg,5)==0) {

		// Extract request path & query-string: it runs from the '/' up to the next space or control char
		// - compared as unsigned char so that bytes >= 0x80 count as part of the path
		//   instead of ending it early on platforms where plain char is signed
		char *rqst = msg+4, *end = rqst;
		while ((unsigned char)*++end > ' ');
		*end = '\0';
		logAdd("Request: \"%s\"",rqst);

		// Exit if /stop requested
		if (strcmp("/stop",rqst)==0) nodeExit();

		// Send test response back
		if (send(info->fd,okReply,sizeof okReply-1,0)<0)
			ioErr("streamProcess/send() failed returning %d");
		}
	else {
		if (send(info->fd,noGetReply,sizeof noGetReply-1,0)<0)
			ioErr("streamProcess/send() failed returning %d");
		logAdd("Peerd only supports GET requests currently!");
		}
	info->iPtr = 0; // reset iPtr ready for next message
	}


// Create new client-node and streamInfo structure from passed file-descriptor
// - if resource not NULL, then try and create fd by resolving the resource string
// - also if resource is non-NULL, fd can be used to pass fopen flags
int streamOpen(int fd, char *resource) {

	// Create new streamInfo structure for the new stream (default to socket params)
	streamInfo *info = malloc(sizeof(streamInfo));
	info->type = ioSOCK;
	info->size = sockPAKSIZE;

	// Try and obtain an fd from resource description
	if (resource) {
		if ((*resource=='.')||(*resource=='/')) {
			// The resource is a file, use [[[[http://www.opengroup.org/onlinepubs/009695399/functions/open.html|open]]]] (the fd can be used to pass open-flags)
			if ((fd = nonblocking(open(resource,fd)))<0)
				ioErr("streamOpen: File open failed returning %d");
			else {
				info->type = ioFILE;
				info->size = filePAKSIZE;
				}
			}
		else {
			// Resolve IP/port from resource text description and attempt connection
			// - see [[[[http://developer.novell.com/wiki/index.php/BSD_gethost_functions|gethost example]]]], [[[[http://www.delorie.com/gnu/docs/glibc/libc_319.html|struct hostent]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/gethostbyname.html|gethostbyname]]]], [[[[http://www.die.net/doc/linux/man/man2/connect.2.html|connect]]]]
			struct hostent *host = gethostbyname(resource);
			if (host == NULL )
				ioErr("streamOpen/gethostbyname() failed returning %d");
			else {
				struct in_addr ia;
				memset((char*)&ia,0,sizeof(struct in_addr));
				ia.s_addr = *(u_long*)host->h_addr;
				logAdd("hostAddr: %s\n",inet_ntoa(ia));
				//if ((fd = socket(PF_INET,host->h_addrtype,0))<0)
				//	ioErr("streamOpen/socket() failed returning %d");
				//else if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
				//	ioErr("streamOpen/setsockopt() failed returning %d");
				//else if (connect(fd,ntohl(ia),sizeof(uint_32_t))<0)
				//	ioErr("streamOpen/connect() failed returning %d");
				//else {
				//	}
				fd = -1;
				}
			}
		}

	// Clean up and exit if no valid fd obtained
	if (fd<0) {
		free(info);
		logAdd("Couldn't open resource \"%s\"",resource);
		return errno;
		}
	else logAdd("New connection: Stream%d.",fd);

	// Create new streamInfo structure and allocate buffers
	info->fd = fd;
	info->iBuf = malloc(ioBUFSIZE);
	info->oBuf = malloc(ioBUFSIZE);
	info->iPtr = info->oPtr = info->tPtr = 0;

	// Create new client-node in clients-loop and add the new streamInfo
	node n = nodeLoopInsert(nodeSTREAMS,0);
	*nodeState(n,nodeSTREAM) = info;
	*nodeState(n,nodeCODE) = &io;
	return fd;
	}


// Close the passed stream, release its descriptor and buffers and remove it from clients-loop
// - the node removed is the current nodal context (parent/this)
// - info is freed, so it must not be used after this call
void streamClose(streamInfo *info) {
	logAdd("Stream%d closed.\n",info->fd);
	nodeLoopRemove(parent,this);

	// Give the descriptor back to the OS as well, otherwise every finished stream leaks one
	#if __WIN32__
	if (info->type == ioSOCK) closesocket(info->fd); else
	#endif
	close(info->fd);

	free(info->iBuf);
	free(info->oBuf);
	free(info);
	}


// Send and/or receive a packet of data over the current node's associated stream
void io() {

	// Get pointer to the streamInfo structure in current nodal context (this)
	streamInfo *info = *nodeState(this, nodeSTREAM);
	int fd = info->fd, type = info->type, size = info->size;
	char *iBuf = info->iBuf, *oBuf = info->oBuf;

	// Try and read a packet from the stream (see [[[[http://www.opengroup.org/onlinepubs/009695399/functions/select.html|select]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/recv.html|recv]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/read.html|read]]]])
	int n = -1;
	if (type == ioFILE)
		n = read(fd,pBuf,size);
	else if ((type == ioSOCK) && (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0))
		n = recv(fd,pBuf,size,0);

	// If any bytes read, append to current message
	if (n > 0) {
		if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; // loop last packet if msg too big
		int i = 0;
		while (n--)  // Append current msg, for each complete msg, process it & reset iPtr
			if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0;
			else if (term[info->tPtr] == 0) streamProcess(info);
		}
	else if (n == 0) streamClose(info); // Zero bytes were read, do orderly termination
	else ioErr("stream/recv() failed returning %d");

	// Data to send?
	//if (select(fd+1,NULL,fdsetInit(fd),NULL,&to)>0) {
		// write a packet from outbuf
		//int len = ioPAKSIZE; // or less
		//if (send(fd,oBuf+oPtr,len,0)<0) ioErr("send() failed!");
	//	}

	}


// Function installed on the server node and called in network's reduction loop
// - checks if any new connections are waiting on the listening socket,
//   if so accepts one and calls streamOpen() with the new file-descriptor
// - a failed accept() is reported here and is not passed on to streamOpen()
void server() {
	if (select(sock+1,fdsetInit(sock),NULL,NULL,&to)>0) {
		int fd = accept(sock,NULL,NULL);
		// Test accept()'s own result: streamOpen()'s return value cannot be relied on
		// to signal this, and it should not be handed an invalid descriptor at all
		if (fd < 0) ioErr("server/accept() failed returning %d");
		else streamOpen(fd,NULL); // streamOpen() logs its own failures
		}
	}