|
|
| Line 18: |
Line 18: |
| | #include <fcntl.h> // O_RDWR, O_NONBLOCK, F_GETFL, F_SETFL | | #include <fcntl.h> // O_RDWR, O_NONBLOCK, F_GETFL, F_SETFL |
| | #endif | | #endif |
| − |
| |
| − | // Set up structures for socket & select
| |
| − | struct timeval to;
| |
| − | fd_set *fdset;
| |
| − | int sock, sockopt_on = 1, szAddr = sizeof(struct sockaddr_in);
| |
| − | struct sockaddr_in *addr = calloc(1,szAddr);
| |
| − | addr->sin_family = PF_INET;
| |
| − | addr->sin_port = htons(atoi(*hash("port"))); // get port from arg-hash
| |
| − | addr->sin_addr.s_addr = htonl(INADDR_ANY);
| |
| − |
| |
| − | // struct for a stream-node's state to point to
| |
| − | typedef struct siStruct {
| |
| − | int fd, inptr, outptr;
| |
| − | char *inbuf, *outbuf;
| |
| − | } streamInfo;
| |
| − |
| |
| − | // make the passed socket non-blocking so accept() returns straight away for multiplexed model
| |
| − | // - if no incoming requests, returns EAGAIN or EWOULDBLOCK state
| |
| − | int nonblocking(int socket) {
| |
| − | int opts = fcntl(sock, F_GETFL);
| |
| − | if (opts<0) logAdd("nonblocking() failed!");
| |
| − | opts = (opts|O_NONBLOCK);
| |
| − | if (fcntl(socket,F_SETFL,opts)<0) logErr("nonblocking() failed!");
| |
| − | return socket;
| |
| − | }
| |
| − |
| |
| − | // Do the usual socket polava: create,options,bind,listen
| |
| − | if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) logErr("socket() failed!");
| |
| − | if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &sockopt_on, sizeof(int)) < 0) logErr("setsockopt() failed!");
| |
| − | if (bind(nonblocking(sock), addr, szAddr) < 0) logErr("bind() failed!");
| |
| − | if (listen(sock, MAXCLIENTS) < 0) logErr("listen() failed!");
| |
| − |
| |
| − |
| |
| − | // Parses a message content and responds to client
| |
| − | void processMessage(char* msg) {
| |
| − |
| |
| − | // Create a new context node
| |
| − |
| |
| − | // validate and extract event, params and mime-type
| |
| − | // /GET\s+\/+(.*?)(\/(.+))?\s+(HTTP\/[0-9.]+)/
| |
| − |
| |
| − | // add appropriate response to nodal context
| |
| − | // - restart, click?x&y, else content/mime/type
| |
| − |
| |
| − | }
| |
| − |
| |
| − |
| |
| − | // Function called by each stream-node in network's reduction loop
| |
| − | // - there is also a streams-container node containing zero or more stream nodes
| |
| − | void server() {
| |
| − | FD_ZERO(fdset);
| |
| − | FD_SET(sock,fdset);
| |
| − | to.tv_sec = to.tv_usec = DELAY;
| |
| − | if (select(sock+1, fdset, NULL, NULL, &to)>0) {
| |
| − | // New connection request, accpet and create new stream-node
| |
| − | int fd = nonblocking(accept(sock, NULL, NULL));
| |
| − | if (fd>0) {
| |
| − | streamInfo *newsi = malloc(sizeof(streamInfo));
| |
| − | newsi->fd = fd;
| |
| − | newsi->inbuf = malloc(BUFSIZE);
| |
| − | newsi->outbuf = malloc(BUFSIZE);
| |
| − | newsi->inptr = 0;
| |
| − | newsi->outptr = 0;
| |
| − | //nodeSetState(newNode = add-me-to-stream-loop, STREAMINFO, newsi);
| |
| − | }
| |
| − | else logErr("accept(): failed!");
| |
| − | }
| |
| − | }
| |
| − |
| |
| − |
| |
| − | // Each of the stream nodes points to this function in its State
| |
| − | void client() {
| |
| − |
| |
| − | // Get the info for this stream ready for reading/writing a packet if necessary
| |
| − | streamInfo *info = nodeGetState(this, STREAMINFO);
| |
| − | int i,fd = info->fd, inptr = info->inptr, outptr = info->outptr;
| |
| − | char *inbuf = info->inbuf, *outbuf = info->outbuf;
| |
| − |
| |
| − | // Data to receive?
| |
| − | FD_ZERO(fdset);
| |
| − | FD_SET(fd,fdset);
| |
| − | to.tv_sec = to.tv_usec = DELAY;
| |
| − | if (select(fd+1, fdset, NULL, NULL, &to)>0) {
| |
| − | // read a packet into inbuf
| |
| − | if (i = recv(fd, inbuf+inptr, PAKSIZE, 0)>0) {
| |
| − |
| |
| − | // test if complete message in buffer
| |
| − | char *message = NULL;
| |
| − | while (i--) {
| |
| − | // /\r?\n\r?\n\x00?/
| |
| − | }
| |
| − |
| |
| − | if (message) {
| |
| − | //inptr = info->inptr -= msg_size;
| |
| − | // hook a process into the loop
| |
| − | }
| |
| − | }
| |
| − | else if (i == 0) {
| |
| − | // zero bytes to read, do orderly termination
| |
| − | // todo: remove this stream node from loop
| |
| − | free(info);
| |
| − | }
| |
| − | else logErr("recv(): failed!");
| |
| − | }
| |
| − |
| |
| − | // Data to send?
| |
| − | FD_ZERO(fdset);
| |
| − | FD_SET(fd,fdset);
| |
| − | to.tv_sec = to.tv_usec = DELAY;
| |
| − | if (select(fd+1, NULL, fdset, NULL, &to)>0) {
| |
| − | // write a packet from outbuf
| |
| − | int len = PAKSIZE; // or less
| |
| − | if (send(fd, outbuf+outptr, len, 0) == -1) logAdd("send() failed!");
| |
| − | }
| |
| − |
| |
| − | }
| |