Difference between revisions of "Server.c"

From Organic Design wiki
m
(use an si struct again)
Line 27: Line 27:
 
addr.sin_port = htons(atoi(*hash("port")));
 
addr.sin_port = htons(atoi(*hash("port")));
 
addr.sin_addr.s_addr = htonl(INADDR_ANY);
 
addr.sin_addr.s_addr = htonl(INADDR_ANY);
 +
 +
// struct for a stream-node's state to point to
 +
typedef struct siStruct {
 +
int fd, inptr, outptr;
 +
char *inbuf, *outbuf;
 +
} streamInfo;
  
 
// make the passed socket non-blocking so accept() returns straight away for multiplexed model
 
// make the passed socket non-blocking so accept() returns straight away for multiplexed model
Line 58: Line 64:
 
// Function called by main network node in root reduction loop
 
// Function called by main network node in root reduction loop
 
void server() {
 
void server() {
 +
int fd;
 
FD_ZERO(fdset);
 
FD_ZERO(fdset);
 
FD_SET(sock,fdset);
 
FD_SET(sock,fdset);
 
to.tv_sec = to.tv_usec = DELAY;
 
to.tv_sec = to.tv_usec = DELAY;
 
if (select(sock+1, fdset, NULL, NULL, &to)>0) {
 
if (select(sock+1, fdset, NULL, NULL, &to)>0) {
// New connection, create new stream-node  
+
// New connection request, accpet and create new stream-node  
int *stream = malloc(sizeof(int));
+
if (nonblocking(fd = accept(sock, NULL, NULL))>0) {
nonblocking(*stream = accept(sock, NULL, NULL));
+
streamInfo *info = malloc(sizeof(streamInfo));
 
+
info->fd = fd;
// todo: change this to a single state with ptr-to-struct of these pointers
+
info->inbuf = malloc(BUFSIZE);
nodeSetState(newNode, STREAM, stream);
+
info->outbuf = malloc(BUFSIZE);
*nodeSetState(newNode, INBUF, malloc(BUFSIZE)) = '\0';
+
info->inptr = 0;
*nodeSetState(newNode, INPTR, malloc(sizeof(int*))) = 0;
+
info->outptr, 0;
*nodeSetState(newNode, OUTBUF, malloc(BUFSIZE)) = '\0';
+
nodeSetState(newNode = add-me-to-stream-loop, STREAMINFO, info);
*nodeSetState(newNode, OUTPTR, malloc(sizeof(int*))) = 0;
+
}
 +
else logErr("accpet(): failed!");
 
}
 
}
 
}
 
}
Line 78: Line 86:
 
// Function called by each stream in network's reduction loop
 
// Function called by each stream in network's reduction loop
 
void stream() {
 
void stream() {
int i, *stream = nodeGetState(this, STREAM);
+
streamInfo info = nodeGetState(this, STREAMINFO);
char *inbuf = nodeGetState(this, INBUF);
+
int i,fd = info->fd, inptr = info->inptr, outptr = info->outptr;
char *outbuf = nodeGetState(this, OUTBUF);
+
char *inbuf = info->inbuf, *outbuf = info->outbuf;
  
 
// Data to receive?
 
// Data to receive?
 
FD_ZERO(fdset);
 
FD_ZERO(fdset);
FD_SET(*stream,fdset);
+
FD_SET(fd,fdset);
 
to.tv_sec = to.tv_usec = DELAY;
 
to.tv_sec = to.tv_usec = DELAY;
if (select(1+*stream, fdset, NULL, NULL, &to)>0) {
+
if (select(fd+1, fdset, NULL, NULL, &to)>0) {
 
// read a packet into inbuf
 
// read a packet into inbuf
if ((i = recv(*stream, inbuf+*inptr, PAKSIZE, 0)>0) {
+
if ((i = recv(fd, inbuf+inptr, PAKSIZE, 0)>0) {
 
if (complete_message) {
 
if (complete_message) {
*inptr -= msg_size;
+
inptr = info->inptr -= msg_size;
 
// hook a process into the loop
 
// hook a process into the loop
 
}
 
}
Line 97: Line 105:
 
// zero bytes to read, do orderly termination
 
// zero bytes to read, do orderly termination
 
// todo: remove this stream node from loop
 
// todo: remove this stream node from loop
free(stream);
+
free(info);
free(inbuf);
 
free(inptr);
 
free(outbuf);
 
free(outptr);
 
 
}
 
}
 
else logErr("recv(): failed!");
 
else logErr("recv(): failed!");
Line 109: Line 113:
 
// Data to send?
 
// Data to send?
 
FD_ZERO(fdset);
 
FD_ZERO(fdset);
FD_SET(*stream,fdset);
+
FD_SET(fd,fdset);
 
to.tv_sec = to.tv_usec = DELAY;
 
to.tv_sec = to.tv_usec = DELAY;
if (select(1+*stream, NULL, fdset, NULL, &to)>0) {
+
if (select(fd+1, NULL, fdset, NULL, &to)>0) {
 
// write a packet from outbuf
 
// write a packet from outbuf
 
}
 
}
  
 
}
 
}

Revision as of 01:44, 19 July 2006

// This article and all its includes are licenced under LGPL // GPL: http://www.gnu.org/copyleft/lesser.html // SRC: http://www.organicdesign.co.nz

  1. define DELAY 1 // normal operation is 0 for no delay
  2. define PAKSIZE 128 // keep packet-size small for non-multithreaded design
  3. define BUFSIZE 10000 // dictates max message size
  4. define MAXCLIENTS 1000 // used by listen()
  1. ifdef WINDOWS
  2. include <winsock.h>
  3. else
  4. include <sys/socket.h>
  5. include <sys/select.h>
  6. include <netinet/in.h>
  7. include <fcntl.h> // needed for O_NONBLOCK option on sock
  8. include <sys/time.h> // for select()
  9. endif

// Set up structures for socket & select struct sockaddr_in addr; struct timeval to; fd_set *fdset; int sock, sockopt_on = 1, szAddr = sizeof(struct sockaddr_in); memset((char*)&addr, 0, szAddr); // zero the struct addr.sin_family = PF_INET; addr.sin_port = htons(atoi(*hash("port"))); addr.sin_addr.s_addr = htonl(INADDR_ANY);

// struct for a stream-node's state to point to typedef struct siStruct { int fd, inptr, outptr; char *inbuf, *outbuf; } streamInfo;

// make the passed socket non-blocking so accept() returns straight away for multiplexed model // - if no incoming requests, returns EAGAIN or EWOULDBLOCK state int nonblocking(int socket) { int opts = fcntl(sock, F_GETFL); if (opts<0) logAdd("noblock() failed!"); opts = (opts|O_NONBLOCK); if (fnctl(socket,F_SETFL,opts)<0) logAdd("noblock() failed!"); return socket; }

// Do the usual socket polava: create,options,bind,listen if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) logAdd("socket() failed!"); if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &sockopt_on, sizeof(int)) < 0) logAdd("setsockopt() failed!"); if (bind(nonblocking(sock), (struct sockaddr*)&addr, szAddr) < 0) logAdd("bind() failed!"); if (listen(sock, MAXCLIENTS) < 0) logAdd("listen() failed!");


// Parses a message content and responds to client int processMessage(char* msg) { // test if restart cmd first

// send response // - NOTE: this should put the msg in si.out if (send(stream, MSG, strlen(MSG)+1, 0) == -1) logAdd("send() failed!"); else logAdd("Sent message"); }


// Function called by main network node in root reduction loop void server() { int fd; FD_ZERO(fdset); FD_SET(sock,fdset); to.tv_sec = to.tv_usec = DELAY; if (select(sock+1, fdset, NULL, NULL, &to)>0) { // New connection request, accpet and create new stream-node if (nonblocking(fd = accept(sock, NULL, NULL))>0) { streamInfo *info = malloc(sizeof(streamInfo)); info->fd = fd; info->inbuf = malloc(BUFSIZE); info->outbuf = malloc(BUFSIZE); info->inptr = 0; info->outptr, 0; nodeSetState(newNode = add-me-to-stream-loop, STREAMINFO, info); } else logErr("accpet(): failed!"); } }


// Function called by each stream in network's reduction loop void stream() { streamInfo info = nodeGetState(this, STREAMINFO); int i,fd = info->fd, inptr = info->inptr, outptr = info->outptr; char *inbuf = info->inbuf, *outbuf = info->outbuf;

// Data to receive? FD_ZERO(fdset); FD_SET(fd,fdset); to.tv_sec = to.tv_usec = DELAY; if (select(fd+1, fdset, NULL, NULL, &to)>0) { // read a packet into inbuf if ((i = recv(fd, inbuf+inptr, PAKSIZE, 0)>0) { if (complete_message) { inptr = info->inptr -= msg_size; // hook a process into the loop } } else if (i == 0) { // zero bytes to read, do orderly termination // todo: remove this stream node from loop free(info); } else logErr("recv(): failed!"); }


// Data to send? FD_ZERO(fdset); FD_SET(fd,fdset); to.tv_sec = to.tv_usec = DELAY; if (select(fd+1, NULL, fdset, NULL, &to)>0) { // write a packet from outbuf }

}