XML Socket.pl

From Organic Design wiki
Revision as of 12:40, 8 December 2011 by Nad (talk | contribs)
Legacy: This article describes a concept that has been superseded in the course of ongoing development on the Organic Design wiki. Please do not develop this any further or base work on this concept; it is only useful as a historic record of work done. You may find a link to the currently used concept or function in this article. If not, you can contact the author to find out what has taken the place of this legacy item.

<perl>

#!/usr/bin/perl -w

use IO::Socket; use IO::Select; use Cwd; use strict;

  1. Main Functions

sub sendMessage; sub processMessage; sub processOutgoingEvents; sub parent; sub leaf; sub guid; sub logAdd;

  1. Determine if running on linux from leading slash in $cwd

our $cwd = cwd; $cwd =~ s/\//\\/g unless $cwd =~ /^\//; $cwd =~ s/[\\\/]$//g;

  1. The port that this event listens for incoming messages on
  2. our $thisPort = node '/private/this-port';

our $thisPort = 2012;

  1. We don't know thisPeer until our IP is known, but that is only known (currently) from incoming
  2. - defined on reception of incoming message which includes this IP in the 'to' key
  3. - added in 'from' key of outgoing message
  4. - format: peer-IP:Port or interface-Guid

our $thisPeer = undef;

  1. Stores actual current potentialPeers (as used by IO:Select)
  2. - indexed by fileno(handle) called $stream
  3. - activePeers are a hash because it stores many local peer properties

our %activePeers; our $activePeersChanged = 0;

  1. All activePeers found by all so far
  2. - the principle is propagate what we know works, don't propagate what we know fails
  3. - potentialPeers is a list because it only knows basic connection info

our @potentialPeers = ();

  1. Next peer after us in potentialPeerList
  2. - another reason potenialPeerList is best as a list not a hash
  3. - Currently, all potentialPeers form a ring of communication

our $nextPeer = undef;

  1. A unique guid is sent with our requestAccess so we don't talk to ourselves
  2. - we can use this to discover our own IP as we discover our IP on first incoming request

our $thisGuid = guid;

  1. our $logPattern = node('/private/log-pattern');

our $logPattern = '.*';

  1. Initialise server on listening $thisPort

our $server = new IO::Socket::INET Listen=>1, LocalPort=>$thisPort, Proto=>'tcp'; our $select = new IO::Select $server; logAdd "================================================================================";

  1. Attempt to send connection message to each potentialPeer
  2. - we preceed them with "peer-" so they can be valid XML tags
  3. - version info is sent on new connection
  4. sendMessage "peer-$_" for split /,/, node '/private/potential-peers';

sleep(5); sendMessage "peer-$_" for split /,/, '192.168.7.1:2012,192.168.7.2:2012,192.168.7.3:2012,192.168.7.4:2012';

  1. ______________________________________________________________________________________________________________________ #
  2. MAIN SERVER LOOP

while (1) {

# Handle output potentialPeers? # - this may be needed if outgoing msg is larger than recipients' input-buffer # for $handle ($select->can_write) {syswrite($fh, $obuffer);}

# Handle input potentialPeers for my $handle ($select->can_read) { my $stream = fileno $handle; if ($handle == $server) { # NEW: handle is the server, set up a new peer # - but it can't be identified until its msg processed my $newhandle = $server->accept; $stream = fileno $newhandle; $select->add($newhandle); $activePeers{$stream}{'buffer'} = ; $activePeers{$stream}{'handle'} = $newhandle; logAdd "New connection: Stream$stream", 'main/new'; } elsif (sysread $handle, my $input, 10000) { # DATA: handle is an existing stream with data to read (NOTE: we should disconnect after vertain size limit) # - Process (and remove) all complete (null-terminated) messages from this potentialPeers buffer $activePeers{$stream}{'buffer'} .= $input; if ($activePeers{$stream}{'buffer'} =~ s/^(.*\x00)//s) {processMessage $_, $handle for split /\x00/, $1} } else { # DEL: handle is an existing stream with no more data to read my $peer; if (defined $activePeers{$stream}{'interface'}) { # If its an interface disconnecting, create a removal event $peer = $activePeers{$stream}{'interface'}; $events{&guid} = "now,interfaces.deleteInterface,$peer"; } else {$peer = $activePeers{$stream}{'peer'}} # Remove from IO:Select, activePeers and PotentialPeers $select->remove($handle); delete $activePeers{$stream}; $activePeersChanged = 1; @potentialPeers = grep {(index $_, $peer) < 0} @potentialPeers; $handle->close; logAdd "$peer disconnected.", 'main/del'; } }

# Send current events (and changes to activePeers) to potentialPeers updatePotentialPeers if $activePeersChanged; logAdd 'activePeersChanged', 'main' if $activePeersChanged; processOutgoingEvents;

# Periodically write to cache.xml #XMLout($publicSpace, outputfile=>$cacheFile, keyattr=>[], xmldecl=>1, rootname=>'cache', noattr=>1);

}

  1. END
  2. ______________________________________________________________________________________________________________________ #
  1. Encode and send message to a peer by IP:port
  2. - all message content is events (nodal commands)
  3. - if there recipient is not an activePeer, then a new connection is attempted
  4. - version info is sent with new connections incase our data is out-of-date
  5. - all our activePeers are sent with every message
  6. - format is md5(24chr),subject&content(mime64)\x00

sub sendMessage { my $peer = shift; my $content = shift; my $handle; logAdd "sendMessage($peer)", 'send'; $content = unless defined $content; # Add peer info to outgoing message my $subject = "to=$peer&port=$thisPort&add=peer&guid=$thisGuid"; $subject .= "&from=$thisPeer" if defined $thisPeer; # Add all our active-peers' names to outgoing message (not interfaces) my $peers = join ',', getActivePeers; $subject .= "&peers=$peers" if $peers; logAdd " $_", 'send' for split /&/, $subject; # Get handle from peer name (this interface/peer thing is messy!) for (keys %activePeers) { my $p = exists $activePeers{$_}{'peer'} ? $activePeers{$_}{'peer'} : $activePeers{$_}{'interface'}; $handle = $activePeers{$_}{'handle'} if $p eq $peer; } # If no handle found, try establishing a new stream unless (defined $handle) { $peer =~ /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+$)/; # get rid of "peer-" bit $handle = IO::Socket::INET->new(PeerAddr=>$1, Proto=>'tcp'); return logAdd " Couldn't establish connection to $peer!", 'send' unless defined $handle; # Connected to new peer, add to local activePeers my $stream = fileno $handle; $activePeers{$stream}{'buffer'} = ; $activePeers{$stream}{'handle'} = $handle; $activePeers{$stream}{'peer'} = $peer; $activePeersChanged = 1; $select->add($handle); logAdd " Stream$stream is connection to $peer", 'send'; } # Send message print $handle "$subject|$content\0"; }

  1. ______________________________________________________________________________________________________________________ #
  1. Decode and execute received message
  2. - executes received events (nodal commands)
  3. - replies with updates if local is more recent
  4. - Discovers own IP from incoming 'to' field if still unknown
  5. - Disconnects stream if it is us
  6. - Add sender to our activePeers
  7. - Add senders' activePeers to our potentialPeers

sub processMessage { my $data = shift; my $handle = shift; my $stream = fileno $handle; my %msg; my $peer; logAdd 'processMessage()', 'main'; # Decode message (my $subject, $msg{'events'}) = split /[|]/, $data; $msg{$1} = /(.+)=(.*)/ ? $2 : /(.+)/ for split /&/, $subject; logAdd " $_", 'recv' for split /&/, $subject; # Set self IP incase not known yet $thisPeer = $msg{'to'} unless defined $thisPeer; # Identify and add sender if 'add' field found if (exists $msg{'add'}) { my $add = $msg{'add'}; if ($add eq 'interface') { # Sender is an interface, initiate an addInterface event for interfaces $peer = "interface$thisPeer-".$msg{'guid'}; $peer =~ s/peer//; $events{&guid} = "now,interfaces.addInterface,$peer"; } else { # Sender is a peer my ($iport, $iaddr) = sockaddr_in getpeername $handle; $peer = 'peer-'.inet_ntoa($iaddr).':'.$msg{'port'}; } # If peer is self, disconnect and exit (before adding as activePeer) if ($msg{'guid'} eq $thisGuid) { logAdd " Stream$stream identified as us, disconnecting...", 'recv'; $select->remove($handle); delete $activePeers{$stream}; return $handle->close; } # Add sender to our activePeers... unless (exists $activePeers{$stream}{$add}) { logAdd " Stream$stream identified as $peer ($add)", 'recv'; $activePeers{$stream}{$add} = $peer; if ($add eq 'peer') { $activePeersChanged = 1; # ...and senders' activePeers to our potentialPeers updatePotentialPeers $msg{'peers'} if exists $msg{'peers'}; } } } # Update event queue with any new events (content) received (discarded if already seen) for (split /&/, $msg{'events'}) { /(.+)=(.*)$/; unless (exists $events{$1}) { $events{$1} = $2; #logAdd "new event received:$1 = $2", 'recv'; } } # Interface logging logAdd $msg{'log'}, 'SWF-'.uc(substr($activePeers{$stream}{'interface'}, -6)) if exists $msg{'log'}; # Interface requests peer connection sendMessage "peer-".$msg{'connect'} if exists $msg{'connect'}; }

  1. ______________________________________________________________________________________________________________________ #

sub processOutgoingEvents { my @msg = (); my $qs; # Build a query-string of the queued-events while (my ($k, $v) = each %events) { if ($v) { # adds to qs if not already processed (=0) push @msg, "$k=$v"; logAdd "sending event:$k = $v", 'main'; $events{$k} = 0; } } $qs = join '&', @msg; # If nextPeer defined, send events to next if (defined $nextPeer) { sendMessage $nextPeer, $qs if $activePeersChanged or $#msg >= 0; $activePeersChanged = 0; } # Also send queued changes directly to all local interfaces if ($#msg >= 0 ) { sendMessage $activePeers{$_}{'interface'}, $qs for grep exists $activePeers{$_}{'interface'}, keys %activePeers; } }

  1. Update potential peers from activePeers and sent comma-separated-list of peers
  2. - The list is sorted so that all peers can form a ring with their nextPeer's
  3. - called when $activePeers changed or new options arrive

sub updatePotentialPeers { my $addPeers = shift; my %peers = (); map $peers{$_} = 1, split /,/, $addPeers if defined $addPeers; $peers{$_} = 1 for @potentialPeers; $peers{$_} = 1 for getActivePeers; $peers{$thisPeer} = 1 if defined $thisPeer; @potentialPeers = map $_, sort grep /^peer/, keys %peers; logAdd " $_", 'UPP' for @potentialPeers; # update nextPeer (actually use previous cos can use -1 array index) return unless defined $thisPeer; my $this = undef; for (0..$#potentialPeers) {$this = $_ if $potentialPeers[$_] eq $thisPeer} $nextPeer = ($#potentialPeers>0 and defined $this) ? $potentialPeers[--$this] : undef; logAdd " Next:$nextPeer", 'UPP' if defined $nextPeer; }

# Names of all active streams that have been identified as peers
# - streams identified as interfaces, or not identified yet, are left out
# - the order is whatever order the keys of %activePeers come back in

sub getActivePeers {
    my @names;
    for my $stream (keys %activePeers) {
        next unless exists $activePeers{$stream}{'peer'};
        push @names, $activePeers{$stream}{'peer'};
    }
    return @names;
}

  1. Generate a guid

sub guid { return 'p'.(rand()+time); }

  1. Add an item to the output log (and print if debugging)

sub logAdd { my $entry = shift; my $where = shift; $where = unless defined $where; return unless $where =~ /$logPattern/; $where = " $where" while length $where < 10; print localtime()." : $where : $entry\n"; } </perl>