Difference between revisions of "Sic/Perl"

From Organic Design wiki
m (Caretaker: typo)
m ({{perl}})
Line 1: Line 1:
 
#!/usr/bin/perl -w
 
#!/usr/bin/perl -w
#
+
# {{perl}}
 
# Liscenced under LGPL: www.gnu.org/copyleft/lesser.html
 
# Liscenced under LGPL: www.gnu.org/copyleft/lesser.html
 
#
 
#

Revision as of 23:39, 30 May 2010

  1. !/usr/bin/perl -w
  2. Our Perl scripts.
  3. Liscenced under LGPL: www.gnu.org/copyleft/lesser.html
  4. Nad - 2003 - SIC-Games network layer

use IO::Socket; use IO::Select; use Cwd; use strict;

  1. Main Functions

sub sendMessage; sub processMessage; sub processOutgoingEvents; sub updatePotentialPeers; sub getActivePeers; sub node; sub pathToHashRef; sub parent; sub leaf; sub guid; sub logAdd;

use constant CREATE => '__<createGuid>__';

  1. NODAL CORE
  2. Nodes are hashes containing other properties, functionRefs and other hashRefs
  3. Nodes can contain array items called 'queue' which is a list of its currently executing nodes
  4. Each of the items in the queue are either a functionRef or a hashRef (to another node with its own queue)
  5. Queues are rings/circular because refs shift off the bottom and push onto the top
  6. executed functions return ref to function/hash which is to be pushed on to replace them (ie what they become)
  7. function execution takes place relative to the passed cwd (the currently executing node)
  8. one function in the tree will execute for every iteration of the outside loop
  9. the inside loop walks the hash struct rotating each queue until an actual function is executed
  1. Three test threads each of three sequencial functions
  2. - Each simply prints its name and returns a reference to the next in the thread
  3. - In reality the functions would be acting on their local environment (the global $cwd hashRef)
  4. - and what's returned can be not only a function, but another hashRef or void if its finished

our ($A,$B,$C, $P,$Q,$R, $X,$Y,$Z); $A = sub {print "A"; return $B;}; $B = sub {print "B"; return $C;}; $C = sub {print "C"; return 0;};

$P = sub {print "P"; return $Q;}; $Q = sub {print "Q"; return $R;}; $R = sub {print "R"; return 0;};

$X = sub {print "X"; return $Y;}; $Y = sub {print "Y"; return $Z;}; $Z = sub {print "Z"; return 0;};

  1. The execution tree is set up with the root containing thread-ABC and another node which contains threads PQR & XYZ
  2. - So PQR and XYZ execute in parallel and as a whole they execute in parallel with ABC

my $someNode = {queue => [$P,$X]}; our $root = {queue => [$A,$someNode]}; our $now; if (0) { while (1) { $now = $root; for (my $i = 1; $i == 1;) { my $queue = $$now{'queue'}; if ($#$queue >= 0) { my $todo = shift @$queue; push @$queue, $_ if $_ = ref $todo eq 'CODE' ? ($i = &$todo) : ($now = $todo); } else {$i = 0} } }

exit; }

  1. ______________________________________________________________________________________________________________________ #
  1. Determine if running on linux from leading slash in $cwd

our $cwd = cwd; $cwd =~ s/\//\\/g unless $cwd =~ /^\//; $cwd =~ s/[\\\/]$//g;

  1. Main shared data-structure
  2. - NOTE: No time to implement this in P2P, using basic event-queue - %events

our $publicSpace = \{}; our $thisRoot = "Sic"; # root in hash for this project/script

  1. The port that this event listens for incoming messages on
  2. our $thisPort = node '/private/this-port';

our $thisPort = 2012;

  1. We don't know thisPeer until our IP is known, but that is only known (currently) from incoming
  2. - defined on reception of incoming message which includes this IP in the 'to' key
  3. - added in 'from' key of outgoing message
  4. - format: peer-IP:Port or interface-Guid

our $thisPeer = undef;

  1. Stores actual current potentialPeers (as used by IO:Select)
  2. - indexed by fileno(handle) called $stream
  3. - activePeers are a hash because it stores many local peer properties

our %activePeers; our $activePeersChanged = 0;

  1. All activePeers found by all so far
  2. - the principle is propagate what we know works, don't propagate what we know fails
  3. - potentialPeers is a list because it only knows basic connection info

our @potentialPeers = ();

  1. Next peer after us in potentialPeerList
  2. - another reason potenialPeerList is best as a list not a hash
  3. - Currently, all potentialPeers form a ring of communication

our $nextPeer = undef;

  1. Queue of unprocessed events
  2. - message content is always events

our %events;

  1. A unique guid is sent with our requestAccess so we don't talk to ourselves
  2. - we can use this to discover our own IP as we discover our IP on first incoming request

our $thisGuid = guid;

  1. our $logPattern = node('/private/log-pattern');

our $logPattern = '.*';

  1. Initialise server on listening $thisPort

our $server = new IO::Socket::INET Listen=>1, LocalPort=>$thisPort, Proto=>'tcp'; our $select = new IO::Select $server; logAdd "================================================================================";

  1. Attempt to send connection message to each potentialPeer
  2. - we preceed them with "peer-" so they can be valid XML tags
  3. - version info is sent on new connection
  4. sendMessage "peer-$_" for split /,/, node '/private/potential-peers';

sleep(5); sendMessage "peer-$_" for split /,/, '192.168.7.1:2012,192.168.7.2:2012,192.168.7.3:2012,192.168.7.4:2012';

  1. ______________________________________________________________________________________________________________________ #
  2. MAIN SERVER LOOP

while (1) {

# Handle output potentialPeers? # - this may be needed if outgoing msg is larger than recipients' input-buffer # for $handle ($select->can_write) {syswrite($fh, $obuffer);}

# Handle input potentialPeers for my $handle ($select->can_read) { my $stream = fileno $handle; if ($handle == $server) { # NEW: handle is the server, set up a new peer # - but it can't be identified until its msg processed my $newhandle = $server->accept; $stream = fileno $newhandle; $select->add($newhandle); $activePeers{$stream}{'buffer'} = ; $activePeers{$stream}{'handle'} = $newhandle; logAdd "New connection: Stream$stream", 'main/new'; } elsif (sysread $handle, my $input, 10000) { # DATA: handle is an existing stream with data to read (NOTE: we should disconnect after vertain size limit) # - Process (and remove) all complete (null-terminated) messages from this potentialPeers buffer $activePeers{$stream}{'buffer'} .= $input; if ($activePeers{$stream}{'buffer'} =~ s/^(.*\x00)//s) {processMessage $_, $handle for split /\x00/, $1} } else { # DEL: handle is an existing stream with no more data to read my $peer; if (defined $activePeers{$stream}{'interface'}) { # If its an interface disconnecting, create a removal event $peer = $activePeers{$stream}{'interface'}; $events{&guid} = "now,interfaces.deleteInterface,$peer"; } else {$peer = $activePeers{$stream}{'peer'}} # Remove from IO:Select, activePeers and PotentialPeers $select->remove($handle); delete $activePeers{$stream}; $activePeersChanged = 1; @potentialPeers = grep {(index $_, $peer) < 0} @potentialPeers; $handle->close; logAdd "$peer disconnected.", 'main/del'; } }

# Send current events (and changes to activePeers) to potentialPeers updatePotentialPeers if $activePeersChanged; logAdd 'activePeersChanged', 'main' if $activePeersChanged; processOutgoingEvents;

# Periodically write to cache.xml #XMLout($publicSpace, outputfile=>$cacheFile, keyattr=>[], xmldecl=>1, rootname=>'cache', noattr=>1);

}

  1. END
  2. ______________________________________________________________________________________________________________________ #
  1. Encode and send message to a peer by IP:port
  2. - all message content is events (nodal commands)
  3. - if there recipient is not an activePeer, then a new connection is attempted
  4. - version info is sent with new connections incase our data is out-of-date
  5. - all our activePeers are sent with every message
  6. - format is md5(24chr),subject&content(mime64)\x00

sub sendMessage { my $peer = shift; my $content = shift; my $handle; logAdd "sendMessage($peer)", 'send'; $content = unless defined $content; # Add peer info to outgoing message my $subject = "to=$peer&port=$thisPort&add=peer&guid=$thisGuid"; $subject .= "&from=$thisPeer" if defined $thisPeer; # Add all our active-peers' names to outgoing message (not interfaces) my $peers = join ',', getActivePeers; $subject .= "&peers=$peers" if $peers; logAdd " $_", 'send' for split /&/, $subject; # Get handle from peer name (this interface/peer thing is messy!) for (keys %activePeers) { my $p = exists $activePeers{$_}{'peer'} ? $activePeers{$_}{'peer'} : $activePeers{$_}{'interface'}; $handle = $activePeers{$_}{'handle'} if $p eq $peer; } # If no handle found, try establishing a new stream unless (defined $handle) { $peer =~ /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+$)/; # get rid of "peer-" bit $handle = IO::Socket::INET->new(PeerAddr=>$1, Proto=>'tcp'); return logAdd " Couldn't establish connection to $peer!", 'send' unless defined $handle; # Connected to new peer, add to local activePeers my $stream = fileno $handle; $activePeers{$stream}{'buffer'} = ; $activePeers{$stream}{'handle'} = $handle; $activePeers{$stream}{'peer'} = $peer; $activePeersChanged = 1; $select->add($handle); logAdd " Stream$stream is connection to $peer", 'send'; } # Send message print $handle "$subject|$content\0"; }

  1. ______________________________________________________________________________________________________________________ #
  1. Decode and execute received message
  2. - executes received events (nodal commands)
  3. - replies with updates if local is more recent
  4. - Discovers own IP from incoming 'to' field if still unknown
  5. - Disconnects stream if it is us
  6. - Add sender to our activePeers
  7. - Add senders' activePeers to our potentialPeers

sub processMessage { my $data = shift; my $handle = shift; my $stream = fileno $handle; my %msg; my $peer; logAdd 'processMessage()', 'main'; # Decode message (my $subject, $msg{'events'}) = split /[|]/, $data; $msg{$1} = /(.+)=(.*)/ ? $2 : /(.+)/ for split /&/, $subject; logAdd " $_", 'recv' for split /&/, $subject; # Set self IP incase not known yet $thisPeer = $msg{'to'} unless defined $thisPeer; # Identify and add sender if 'add' field found if (exists $msg{'add'}) { my $add = $msg{'add'}; if ($add eq 'interface') { # Sender is an interface, initiate an addInterface event for interfaces $peer = "interface$thisPeer-".$msg{'guid'}; $peer =~ s/peer//; $events{&guid} = "now,interfaces.addInterface,$peer"; } else { # Sender is a peer my ($iport, $iaddr) = sockaddr_in getpeername $handle; $peer = 'peer-'.inet_ntoa($iaddr).':'.$msg{'port'}; } # If peer is self, disconnect and exit (before adding as activePeer) if ($msg{'guid'} eq $thisGuid) { logAdd " Stream$stream identified as us, disconnecting...", 'recv'; $select->remove($handle); delete $activePeers{$stream}; return $handle->close; } # Add sender to our activePeers... unless (exists $activePeers{$stream}{$add}) { logAdd " Stream$stream identified as $peer ($add)", 'recv'; $activePeers{$stream}{$add} = $peer; if ($add eq 'peer') { $activePeersChanged = 1; # ...and senders' activePeers to our potentialPeers updatePotentialPeers $msg{'peers'} if exists $msg{'peers'}; } } } # Update event queue with any new events (content) received (discarded if already seen) for (split /&/, $msg{'events'}) { /(.+)=(.*)$/; unless (exists $events{$1}) { $events{$1} = $2; #logAdd "new event received:$1 = $2", 'recv'; } } # Interface logging logAdd $msg{'log'}, 'SWF-'.uc(substr($activePeers{$stream}{'interface'}, -6)) if exists $msg{'log'}; # Interface requests peer connection sendMessage "peer-".$msg{'connect'} if exists $msg{'connect'}; }

  1. ______________________________________________________________________________________________________________________ #

sub processOutgoingEvents { my @msg = (); my $qs; # Build a query-string of the queued-events while (my ($k, $v) = each %events) { if ($v) { # adds to qs if not already processed (=0) push @msg, "$k=$v"; logAdd "sending event:$k = $v", 'main'; $events{$k} = 0; } } $qs = join '&', @msg; # If nextPeer defined, send events to next if (defined $nextPeer) { sendMessage $nextPeer, $qs if $activePeersChanged or $#msg >= 0; $activePeersChanged = 0; } # Also send queued changes directly to all local interfaces if ($#msg >= 0 ) { sendMessage $activePeers{$_}{'interface'}, $qs for grep exists $activePeers{$_}{'interface'}, keys %activePeers; } }

  1. Update potential peers from activePeers and sent comma-separated-list of peers
  2. - The list is sorted so that all peers can form a ring with their nextPeer's
  3. - called when $activePeers changed or new options arrive

sub updatePotentialPeers { my $addPeers = shift; my %peers = (); map $peers{$_} = 1, split /,/, $addPeers if defined $addPeers; $peers{$_} = 1 for @potentialPeers; $peers{$_} = 1 for getActivePeers; $peers{$thisPeer} = 1 if defined $thisPeer; @potentialPeers = map $_, sort grep /^peer/, keys %peers; logAdd " $_", 'UPP' for @potentialPeers; # update nextPeer (actually use previous cos can use -1 array index) return unless defined $thisPeer; my $this = undef; for (0..$#potentialPeers) {$this = $_ if $potentialPeers[$_] eq $thisPeer} $nextPeer = ($#potentialPeers>0 and defined $this) ? $potentialPeers[--$this] : undef; logAdd " Next:$nextPeer", 'UPP' if defined $nextPeer; }

sub getActivePeers { return map $activePeers{$_}{'peer'}, grep exists $activePeers{$_}{'peer'}, keys %activePeers; }

  1.  %$publicSpace
  2. - currently path is a unique guid representing the event
  3. - hash-ref: use $$publicSpace{ or %$publicSpace or $publicSpace->{
  4. - network-tree maintained by peer-network
  5. - uses XML::Simple for external-file and internal-hash of network

sub node { my $path = shift; my $cmd = shift; my $data = shift; my $node = leaf $path; # Execute the nodal command if ($cmd eq 'set') { # Set node value, (create path if doesn't exist) my $ptr = pathToHashRef $publicSpace, parent("$thisRoot$path"), 1; return if $$ptr{$node} eq $data; $$ptr{$node} = $data; } elsif ($cmd eq 'delete') { # Delete node or value, return 0 silently if node not found my $ptr = pathToHashRef $publicSpace, parent "$thisRoot$path"; return 0 unless defined $$ptr{$node}; delete $$ptr{$node}; } else { # Default is to return the node content or undef my $ptr = pathToHashRef $publicSpace, parent "$thisRoot$path"; return $$ptr{$node}; } }

  1. Use standard /-separated path-string to access hash-tree
  2. - returns undef if any non-hash-ref is encountered, unless,
  3. - $create is defined, then the path will be created as traversed

sub pathToHashRef { my $ptr = shift; my $path = shift; my $create = shift; for (split /\//, $path) { if (ref $$ptr{$_} eq 'HASH') {$ptr = $$ptr{$_}} elsif (defined $create) {$ptr = $$ptr{$_} = {}} else {return} } return $ptr; }

  1. Return parent path of passed path

sub parent { my $path = shift; $path =~ s/\/[^\/]+?$//; return $path; }

  1. Return parent path of passed path

sub leaf { my $path = shift; $path =~ s/^.*\///; return $path; }

  1. Generate a guid

sub guid { return 'p'.(rand()+time); }

  1. Add an item to the output log (and print if debugging)

sub logAdd { my $entry = shift; my $where = shift; $where = unless defined $where; return unless $where =~ /$logPattern/; $where = " $where" while length $where < 10; print localtime()." : $where : $entry\n"; }