Difference between revisions of "Io.c"

From Organic Design wiki
(default new streamInfo to socket params)
m (Nad moved page Io.h to Io.c without leaving a redirect)
 
(19 intermediate revisions by 2 users not shown)
Line 1: Line 1:
 +
{{legacy}}{{lowercase}}
 +
<source lang="c">
 +
// http://www.organicdesign.co.nz/peerd - nodal p2p wiki daemon
 
// This article and all its includes are licenced under LGPL
 
// This article and all its includes are licenced under LGPL
// GPL: [[[[http://www.gnu.org/copyleft/lesser.html]]]]
+
// GPL: http://www.gnu.org/copyleft/lesser.html
// SRC: [[[[http://www.organicdesign.co.nz/io.c]]]]
+
// SRC: http://www.organicdesign.co.nz/io.c
// included in [[[[http://www.organicdesign.co.nz/category:peerd/files/C|peerd]]]][[[[http://www.organicdesign.co.nz/husk.c|/husk.c]]]]
+
// included in http://www.organicdesign.co.nz/peerd.c
// See also: [[[[http://www.organicdesign.co.nz/talk:io.c|talk page]]]]
+
// See also: http://www.organicdesign.co.nz/talk:io.c, http://www.kegel.com/c10k.html
  
 
#define ioDELAY 0        // normal operation is 0 for no delay
 
#define ioDELAY 0        // normal operation is 0 for no delay
Line 27: Line 30:
 
fd_set *fdsetInit(int);
 
fd_set *fdsetInit(int);
 
fd_set fdset;
 
fd_set fdset;
int sock,fd;
+
int sock;
 
struct sockaddr_in addr;
 
struct sockaddr_in addr;
 
struct timeval to;
 
struct timeval to;
Line 36: Line 39:
 
const char *term = "\r\n\r\n";
 
const char *term = "\r\n\r\n";
 
typedef struct si {
 
typedef struct si {
 +
int fd;                // File-descriptor for this stream
 
int type;              // Stream type (ioFILE, ioSOCK)
 
int type;              // Stream type (ioFILE, ioSOCK)
int fd;               // File-descriptor for this stream
+
int size;             // Size of the packets to read/write for this stream
 
char *iBuf, *oBuf;    // Message buffers for this stream
 
char *iBuf, *oBuf;    // Message buffers for this stream
 
int iPtr;              // Index into input-buffer for next packet to start at
 
int iPtr;              // Index into input-buffer for next packet to start at
 
int tPtr;              // Index into term (atomised strcmp incase terminator spans packet-boundary)
 
int tPtr;              // Index into term (atomised strcmp incase terminator spans packet-boundary)
 
int oPtr;
 
int oPtr;
int szPacket;          // Size of the packets to read/write for this stream
 
 
} streamInfo;
 
} streamInfo;
 
void io();
 
void io();
Line 49: Line 52:
 
void streamProcess(streamInfo*);
 
void streamProcess(streamInfo*);
  
 +
 +
// ----------------------------------------------------------------------------------------- //
 +
// io.c
  
 
// Initialise socket listening on specified port
 
// Initialise socket listening on specified port
Line 55: Line 61:
 
#if __WIN32__
 
#if __WIN32__
 
WSADATA wsaData;
 
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logErr("WSAStartup() failed!");
+
if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logAdd("WSAStartup() failed!");
 
#endif
 
#endif
  
Line 87: Line 93:
 
// Globals for server() and client() quantum-functions
 
// Globals for server() and client() quantum-functions
 
pBuf = malloc(ioPAKMAX);
 
pBuf = malloc(ioPAKMAX);
*nodeState(nodeSERVER,0) = &server;
+
*nodeState(nodeSERVER,nodeCODE) = &server;
nodeSetValue(nodeSERVER,nodeCODE,nodeTRUE);
 
  
return 0;
+
return 0;
 
}
 
}
  
Line 118: Line 123:
  
 
void ioErr(char *msg) {
 
void ioErr(char *msg) {
logErrNum(msg,errno = WSAGetLastError());
+
logAdd(msg,errno = WSAGetLastError());
 
}
 
}
  
Line 141: Line 146:
 
while(*++end > ' ');
 
while(*++end > ' ');
 
*end = '\0';
 
*end = '\0';
printf("Request:\"%s\"\n",rqst);
+
logAdd("Request: \"%s\"",rqst);
  
 
// Exit if /stop requested
 
// Exit if /stop requested
Line 165: Line 170:
 
streamInfo *info = malloc(sizeof(streamInfo));
 
streamInfo *info = malloc(sizeof(streamInfo));
 
info->type = ioSOCK;
 
info->type = ioSOCK;
info->szPacket = sockPAKSIZE;
+
info->size = sockPAKSIZE;
  
 
// Try and obtain an fd from resource description
 
// Try and obtain an fd from resource description
Line 175: Line 180:
 
else {
 
else {
 
info->type = ioFILE;
 
info->type = ioFILE;
info->szPacket = filePAKSIZE;
+
info->size = filePAKSIZE;
 
}
 
}
 
}
 
}
Line 188: Line 193:
 
memset((char*)&ia,0,sizeof(struct in_addr));
 
memset((char*)&ia,0,sizeof(struct in_addr));
 
ia.s_addr = *(u_long*)host->h_addr;
 
ia.s_addr = *(u_long*)host->h_addr;
printf("hostAddr: %s\n",inet_ntoa(ia));
+
logAdd("hostAddr: %s\n",inet_ntoa(ia));
 
//if ((fd = socket(PF_INET,host->h_addrtype,0))<0)
 
//if ((fd = socket(PF_INET,host->h_addrtype,0))<0)
 
// ioErr("streamOpen/socket() failed returning %d");
 
// ioErr("streamOpen/socket() failed returning %d");
Line 205: Line 210:
 
if (fd<0) {
 
if (fd<0) {
 
free(info);
 
free(info);
logErrMsg("Couldn't open resource \"%s\"",resource);
+
logAdd("Couldn't open resource \"%s\"",resource);
 
return errno;
 
return errno;
 
}
 
}
else logAddNum("New connection: Stream%d.",fd);
+
else logAdd("New connection: Stream%d.",fd);
  
 
// Create new streamInfo structure and allocate buffers
 
// Create new streamInfo structure and allocate buffers
Line 217: Line 222:
  
 
// Create new client-node in clients-loop and add the new streamInfo
 
// Create new client-node in clients-loop and add the new streamInfo
node n = nodeGetValue(nodeSTREAMS,nodeLOOP);
+
node n = nodeLoopInsert(nodeSTREAMS,0);
n = nodeLoopInsert(n,0);
 
nodeSetValue(nodeSTREAMS,nodeLOOP,n);
 
nodeSetValue(n,nodeCODE,nodeTRUE);
 
*nodeState(n,0) = &io;
 
 
*nodeState(n,nodeSTREAM) = info;
 
*nodeState(n,nodeSTREAM) = info;
 +
*nodeState(n,nodeCODE) = &io;
 
return fd;
 
return fd;
 
}
 
}
Line 229: Line 231:
 
// Close the passed stream, free its resources and remove from clients-loop
 
// Close the passed stream, free its resources and remove from clients-loop
 
void streamClose(streamInfo *info) {
 
void streamClose(streamInfo *info) {
printf("Stream%d closed.\n",info->fd);
+
logAdd("Stream%d closed.\n",info->fd);
//nodeLoopRemove(this); // NOTE: reduction not handling PREV
+
nodeLoopRemove(parent,this);
 
free(info->iBuf);
 
free(info->iBuf);
 
free(info->oBuf);
 
free(info->oBuf);
Line 242: Line 244:
 
// Get pointer to the streamInfo structure in current nodal context (this)
 
// Get pointer to the streamInfo structure in current nodal context (this)
 
streamInfo *info = *nodeState(this, nodeSTREAM);
 
streamInfo *info = *nodeState(this, nodeSTREAM);
fd = info->fd;
+
int fd = info->fd, type = info->type, size = info->size;
 
char *iBuf = info->iBuf, *oBuf = info->oBuf;
 
char *iBuf = info->iBuf, *oBuf = info->oBuf;
int type = info->type, size = info->szPacket;
 
  
 
// Try and read a packet from the stream (see [[[[http://www.opengroup.org/onlinepubs/009695399/functions/select.html|select]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/recv.html|recv]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/read.html|read]]]])
 
// Try and read a packet from the stream (see [[[[http://www.opengroup.org/onlinepubs/009695399/functions/select.html|select]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/recv.html|recv]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/read.html|read]]]])
int i = 0, n = 0;
+
int n = -1;
if (type == ioSOCK) {
+
if (type == ioFILE)
if (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0) n = recv(fd,pBuf,size,0);
+
n = read(fd,pBuf,size);
}
+
else if ((type == ioSOCK) && (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0))
else if (type == ioFILE) n = read(fd,pBuf,size);
+
n = recv(fd,pBuf,size,0);
  
// If any bytes read, append to current message (loop last packet if msg too big)
+
// If any bytes read, append to current message
 
if (n > 0) {
 
if (n > 0) {
if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE;
+
if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; // loop last packet if msg too big
 +
int i = 0;
 
while (n--)  // Append current msg, for each complete msg, process it & reset iPtr
 
while (n--)  // Append current msg, for each complete msg, process it & reset iPtr
 
if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0;
 
if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0;
Line 267: Line 269:
 
// write a packet from outbuf
 
// write a packet from outbuf
 
//int len = ioPAKSIZE; // or less
 
//int len = ioPAKSIZE; // or less
//if (send(fd,oBuf+oPtr,len,0)<0) logErr("send() failed!");
+
//if (send(fd,oBuf+oPtr,len,0)<0) ioErr("send() failed!");
 
// }
 
// }
  
Line 281: Line 283:
 
ioErr("server/accept() failed returning %d");
 
ioErr("server/accept() failed returning %d");
 
}
 
}
 +
</source>
 +
[[Category:C]]

Latest revision as of 13:10, 13 December 2019

Legacy.svg Legacy: This article describes a concept that has been superseded in the course of ongoing development on the Organic Design wiki. Please do not develop this any further or base work on this concept, this is only useful for a historic record of work done. You may find a link to the currently used concept or function in this article, if not you can contact the author to find out what has taken the place of this legacy item.
// http://www.organicdesign.co.nz/peerd - nodal p2p wiki daemon
// This article and all its includes are licenced under LGPL
// GPL: http://www.gnu.org/copyleft/lesser.html
// SRC: http://www.organicdesign.co.nz/io.c
// included in http://www.organicdesign.co.nz/peerd.c
// See also: http://www.organicdesign.co.nz/talk:io.c, http://www.kegel.com/c10k.html

#define ioDELAY 0         // normal operation is 0 for no delay
#define ioPAKSIZE 128     // keep packet-size small for multipelxed design
#define ioPAKMAX  16384   // maximum packet size to allow for variable pak size at runtime
#define filePAKSIZE 4096  // packet size for file-streams
#define sockPAKSIZE 128   // packet size for socket-streams
#define ioBUFSIZE 4096    // dictates max message size
#define ioMAXCLIENTS 1000 // used by listen()
#define ioSOCK 0          // stream type of socket
#define ioFILE 1          // stream type of file

#ifndef __WIN32__
#define WSAGetLastError() errno
#endif

// Socket structures, globals and prototypes
void server();
int ioInit();
int ioExit();
int nonblocking(int);
void ioErr(char*);
fd_set *fdsetInit(int);
fd_set fdset;
int sock;
struct sockaddr_in addr;
struct timeval to;
unsigned long int sockopt_on = 1;

// Client/Stream info structure, globals and prototypes
char *pBuf;                // Buffer to read/write data packets
const char *term = "\r\n\r\n";
typedef struct si {
	int fd;                // File-descriptor for this stream
	int type;              // Stream type (ioFILE, ioSOCK)
	int size;              // Size of the packets to read/write for this stream
	char *iBuf, *oBuf;     // Message buffers for this stream
	int iPtr;              // Index into input-buffer for next packet to start at
	int tPtr;              // Index into term (atomised strcmp incase terminator spans packet-boundary)
	int oPtr;
	} streamInfo;
void io();
int streamOpen(int,char*);
void streamClose(streamInfo*);
void streamProcess(streamInfo*);


// ----------------------------------------------------------------------------------------- //
// io.c

// Initialise socket listening on specified port
int ioInit() {

	#if __WIN32__
	WSADATA wsaData;
	if (WSAStartup(MAKEWORD(2,0),&wsaData)!=0) logAdd("WSAStartup() failed!");
	#endif

	// Set up structures for socket & select
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = htonl(INADDR_ANY);
	memset(&(addr.sin_zero),0,8);
	errno = 0;

	// Do the usual socket polava: create,options,bind,listen
	if ((sock = socket(PF_INET,SOCK_STREAM,0))<0)
		ioErr("init/socket() failed returning %d");
	else if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
		ioErr("init/setsockopt() failed returning %d");
	else if (bind(nonblocking(sock),(struct sockaddr*)&addr,sizeof(struct sockaddr))<0)
		ioErr("init/bind() failed returning %d");
	else if (listen(sock,ioMAXCLIENTS)<0)
		ioErr("init/listen() failed returning %d");
	else if (WSAGetLastError() == 0) {
		char *msg = malloc(100);
		sprintf(msg,"Daemon \"%s\" started successfully and serving on port %d.",peer,port);
		logAdd(msg);
		free(msg);
		}
	else {
		logAdd("Server failed to start!");
		return WSAGetLastError();
		}

	// Globals for server() and client() quantum-functions
	pBuf = malloc(ioPAKMAX);
	*nodeState(nodeSERVER,nodeCODE) = &server;

	return 0;
	}


// Perform any cleanup for socket
int ioExit() {
	free(pBuf);
	// todo: close streams and free buffers etc
	#if __WIN32__
	WSACleanup();
	#endif
	return WSAGetLastError();
	}


// make the passed socket non-blocking so accept() returns straight away for multiplexed model
// - if no incoming requests, returns EAGAIN or EWOULDBLOCK state
int nonblocking(int socket) {
	#if __WIN32__
	ioctlsocket(socket,FIONBIO,&sockopt_on);
	#else
	fcntl(socket,F_SETFL,fcntl(socket,F_GETFL)|O_NONBLOCK);
	#endif
	return socket;
	}


void ioErr(char *msg) {
	logAdd(msg,errno = WSAGetLastError());
	}


// Return pointer to fdset filled with passed file-descriptor ready for a select() call
fd_set *fdsetInit(int fd) {
	FD_ZERO(&fdset);
	FD_SET(fd,&fdset);
	to.tv_sec = to.tv_usec = ioDELAY;
	return &fdset;
	}


// Process message in iBuf which should be a complete HTTP GET request
// - iPtr is put back to 0 so next message fills from start of iBuf
void streamProcess(streamInfo *info) {
	char *rqst, *end, *msg = info->iBuf;
	if (strncmp("GET /",msg,5)==0) {

		// Extract request path & query-string
		rqst = end = msg+4;
		while(*++end > ' ');
		*end = '\0';
		logAdd("Request: \"%s\"",rqst);

		// Exit if /stop requested
		if (strcmp("/stop",rqst)==0) nodeExit();

		// Send test response back
		send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nStink eh?",134,0);
		}
	else {
		send(info->fd,"HTTP/1.1 200 OK\r\nDate: Mon, 31 Jul 2006 22:27:14 NZST\r\nContent-Type: text/html\r\nConnection: keep-alive\r\nContent-Length: 9\r\n\r\nNo GET!!!",134,0);
		logAdd("Peerd only supports GET requests currently!");
		}
	info->iPtr = 0; // reset iPtr ready for next message
	}


// Create new client-node and streamInfo structure from passed file-descriptor
// - if resource not NULL, then try and create fd by resolving the resource string
// - also if resource is non-NULL, fd can be used to pass fopen flags
int streamOpen(int fd, char *resource) {

	// Create new streamInfo structure for the new stream (default to socket params)
	streamInfo *info = malloc(sizeof(streamInfo));
	info->type = ioSOCK;
	info->size = sockPAKSIZE;

	// Try and obtain an fd from resource description
	if (resource) {
		if ((*resource=='.')||(*resource=='/')) {
			// The resource is a file, use [[[[http://www.opengroup.org/onlinepubs/009695399/functions/open.html|open]]]] (the fd can be used to pass open-flags)
			if ((fd = nonblocking(open(resource,fd)))<0)
				ioErr("streamOpen: File open failed returning %d");
			else {
				info->type = ioFILE;
				info->size = filePAKSIZE;
				}
			}
		else {
			// Resolve IP/port from resource text description and attempt connection
			// - see [[[[http://developer.novell.com/wiki/index.php/BSD_gethost_functions|gethost example]]]], [[[[http://www.delorie.com/gnu/docs/glibc/libc_319.html|struct hostent]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/gethostbyname.html|gethostbyname]]]], [[[[http://www.die.net/doc/linux/man/man2/connect.2.html|connect]]]]
			struct hostent *host = gethostbyname(resource);
			if (host == NULL )
				ioErr("streamOpen/gethostbyname() failed returning %d");
			else {
				struct in_addr ia;
				memset((char*)&ia,0,sizeof(struct in_addr));
				ia.s_addr = *(u_long*)host->h_addr;
				logAdd("hostAddr: %s\n",inet_ntoa(ia));
				//if ((fd = socket(PF_INET,host->h_addrtype,0))<0)
				//	ioErr("streamOpen/socket() failed returning %d");
				//else if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&sockopt_on,sizeof(int))<0)
				//	ioErr("streamOpen/setsockopt() failed returning %d");
				//else if (connect(fd,ntohl(ia),sizeof(uint_32_t))<0)
				//	ioErr("streamOpen/connect() failed returning %d");
				//else {
				//	}
				fd = -1;
				}
			}
		}

	// Clean up and exit if no valid fd obtained
	if (fd<0) {
		free(info);
		logAdd("Couldn't open resource \"%s\"",resource);
		return errno;
		}
	else logAdd("New connection: Stream%d.",fd);

	// Create new streamInfo structure and allocate buffers
	info->fd = fd;
	info->iBuf = malloc(ioBUFSIZE);
	info->oBuf = malloc(ioBUFSIZE);
	info->iPtr = info->oPtr = info->tPtr = 0;

	// Create new client-node in clients-loop and add the new streamInfo
	node n = nodeLoopInsert(nodeSTREAMS,0);
	*nodeState(n,nodeSTREAM) = info;
	*nodeState(n,nodeCODE) = &io;
	return fd;
	}


// Close the passed stream, free its resources and remove from clients-loop
void streamClose(streamInfo *info) {
	logAdd("Stream%d closed.\n",info->fd);
	nodeLoopRemove(parent,this);
	free(info->iBuf);
	free(info->oBuf);
	free(info);
	}


// Send and/or receive a packet of data over the current node's associated stream
void io() {

	// Get pointer to the streamInfo structure in current nodal context (this)
	streamInfo *info = *nodeState(this, nodeSTREAM);
	int fd = info->fd, type = info->type, size = info->size;
	char *iBuf = info->iBuf, *oBuf = info->oBuf;

	// Try and read a packet from the stream (see [[[[http://www.opengroup.org/onlinepubs/009695399/functions/select.html|select]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/recv.html|recv]]]], [[[[http://www.opengroup.org/onlinepubs/009695399/functions/read.html|read]]]])
	int n = -1;
	if (type == ioFILE)
		n = read(fd,pBuf,size);
	else if ((type == ioSOCK) && (select(fd+1,fdsetInit(fd),NULL,NULL,&to)>0))
		n = recv(fd,pBuf,size,0);

	// If any bytes read, append to current message
	if (n > 0) {
		if (info->iPtr > ioBUFSIZE-ioPAKSIZE) info->iPtr -= ioPAKSIZE; // loop last packet if msg too big
		int i = 0;
		while (n--)  // Append current msg, for each complete msg, process it & reset iPtr
			if ((iBuf[info->iPtr++] = pBuf[i++]) != term[info->tPtr++]) info->tPtr = 0;
			else if (term[info->tPtr] == 0) streamProcess(info);
		}
	else if (n == 0) streamClose(info); // Zero bytes were read, do orderly termination
	else ioErr("stream/recv() failed returning %d");

	// Data to send?
	//if (select(fd+1,NULL,fdsetInit(fd),NULL,&to)>0) {
		// write a packet from outbuf
		//int len = ioPAKSIZE; // or less
		//if (send(fd,oBuf+oPtr,len,0)<0) ioErr("send() failed!");
	//	}

	}


// Function called by each stream-node in network's reduction loop
// - Checks if any new connections waiting on listening socket,
//   if so [[[[http://www.opengroup.org/onlinepubs/009695399/functions/accept.html|accept]]]] and call streamOpen() with new file-descriptor
void server() {
	if (select(sock+1,fdsetInit(sock),NULL,NULL,&to)>0)
		if (streamOpen(accept(sock,NULL,NULL),NULL)<0)
			ioErr("server/accept() failed returning %d");
	}