Difference between revisions of "Sic/Perl"
(Wikifying old code) |
m (Caretaker: typo) |
||
Line 80: | Line 80: | ||
# Main shared data-structure | # Main shared data-structure | ||
− | # - NOTE: No time to | + | # - NOTE: No time to implement this in P2P, using basic event-queue - %events |
our $publicSpace = \{}; | our $publicSpace = \{}; |
Revision as of 17:57, 28 October 2006
- !/usr/bin/perl -w
- Liscenced under LGPL: www.gnu.org/copyleft/lesser.html
- Nad - 2003 - SIC-Games network layer
use IO::Socket; use IO::Select; use Cwd; use strict;
- Main Functions
sub sendMessage; sub processMessage; sub processOutgoingEvents; sub updatePotentialPeers; sub getActivePeers; sub node; sub pathToHashRef; sub parent; sub leaf; sub guid; sub logAdd;
use constant CREATE => '__<createGuid>__';
- NODAL CORE
- Nodes are hashes containing other properties, functionRefs and other hashRefs
- Nodes can contain array items called 'queue' which is a list of its currently executing nodes
- Each of the items in the queue are either a functionRef or a hashRef (to another node with its own queue)
- Queues are rings/circular because refs shift off the bottom and push onto the top
- executed functions return ref to function/hash which is to be pushed on to replace them (ie what they become)
- function execution takes place relative to the passed cwd (the currently executing node)
- one function in the tree will execute for every iteration of the outside loop
- the inside loop walks the hash struct rotating each queue until an actual function is executed
- Three test threads each of three sequencial functions
- - Each simply prints its name and returns a reference to the next in the thread
- - In reality the functions would be acting on their local environment (the global $cwd hashRef)
- - and what's returned can be not only a function, but another hashRef or void if its finished
our ($A,$B,$C, $P,$Q,$R, $X,$Y,$Z); $A = sub {print "A"; return $B;}; $B = sub {print "B"; return $C;}; $C = sub {print "C"; return 0;};
$P = sub {print "P"; return $Q;}; $Q = sub {print "Q"; return $R;}; $R = sub {print "R"; return 0;};
$X = sub {print "X"; return $Y;}; $Y = sub {print "Y"; return $Z;}; $Z = sub {print "Z"; return 0;};
- The execution tree is set up with the root containing thread-ABC and another node which contains threads PQR & XYZ
- - So PQR and XYZ execute in parallel and as a whole they execute in parallel with ABC
my $someNode = {queue => [$P,$X]}; our $root = {queue => [$A,$someNode]}; our $now; if (0) { while (1) { $now = $root; for (my $i = 1; $i == 1;) { my $queue = $$now{'queue'}; if ($#$queue >= 0) { my $todo = shift @$queue; push @$queue, $_ if $_ = ref $todo eq 'CODE' ? ($i = &$todo) : ($now = $todo); } else {$i = 0} } }
exit; }
- ______________________________________________________________________________________________________________________ #
- Determine if running on linux from leading slash in $cwd
our $cwd = cwd; $cwd =~ s/\//\\/g unless $cwd =~ /^\//; $cwd =~ s/[\\\/]$//g;
- Main shared data-structure
- - NOTE: No time to implement this in P2P, using basic event-queue - %events
our $publicSpace = \{}; our $thisRoot = "Sic"; # root in hash for this project/script
- The port that this event listens for incoming messages on
- our $thisPort = node '/private/this-port';
our $thisPort = 2012;
- We don't know thisPeer until our IP is known, but that is only known (currently) from incoming
- - defined on reception of incoming message which includes this IP in the 'to' key
- - added in 'from' key of outgoing message
- - format: peer-IP:Port or interface-Guid
our $thisPeer = undef;
- Stores actual current potentialPeers (as used by IO:Select)
- - indexed by fileno(handle) called $stream
- - activePeers are a hash because it stores many local peer properties
our %activePeers; our $activePeersChanged = 0;
- All activePeers found by all so far
- - the principle is propagate what we know works, don't propagate what we know fails
- - potentialPeers is a list because it only knows basic connection info
our @potentialPeers = ();
- Next peer after us in potentialPeerList
- - another reason potenialPeerList is best as a list not a hash
- - Currently, all potentialPeers form a ring of communication
our $nextPeer = undef;
- Queue of unprocessed events
- - message content is always events
our %events;
- A unique guid is sent with our requestAccess so we don't talk to ourselves
- - we can use this to discover our own IP as we discover our IP on first incoming request
our $thisGuid = guid;
- our $logPattern = node('/private/log-pattern');
our $logPattern = '.*';
- Initialise server on listening $thisPort
our $server = new IO::Socket::INET Listen=>1, LocalPort=>$thisPort, Proto=>'tcp'; our $select = new IO::Select $server; logAdd "================================================================================";
- Attempt to send connection message to each potentialPeer
- - we preceed them with "peer-" so they can be valid XML tags
- - version info is sent on new connection
- sendMessage "peer-$_" for split /,/, node '/private/potential-peers';
sleep(5); sendMessage "peer-$_" for split /,/, '192.168.7.1:2012,192.168.7.2:2012,192.168.7.3:2012,192.168.7.4:2012';
- ______________________________________________________________________________________________________________________ #
- MAIN SERVER LOOP
while (1) {
# Handle output potentialPeers? # - this may be needed if outgoing msg is larger than recipients' input-buffer # for $handle ($select->can_write) {syswrite($fh, $obuffer);}
# Handle input potentialPeers for my $handle ($select->can_read) { my $stream = fileno $handle; if ($handle == $server) { # NEW: handle is the server, set up a new peer # - but it can't be identified until its msg processed my $newhandle = $server->accept; $stream = fileno $newhandle; $select->add($newhandle); $activePeers{$stream}{'buffer'} = ; $activePeers{$stream}{'handle'} = $newhandle; logAdd "New connection: Stream$stream", 'main/new'; } elsif (sysread $handle, my $input, 10000) { # DATA: handle is an existing stream with data to read (NOTE: we should disconnect after vertain size limit) # - Process (and remove) all complete (null-terminated) messages from this potentialPeers buffer $activePeers{$stream}{'buffer'} .= $input; if ($activePeers{$stream}{'buffer'} =~ s/^(.*\x00)//s) {processMessage $_, $handle for split /\x00/, $1} } else { # DEL: handle is an existing stream with no more data to read my $peer; if (defined $activePeers{$stream}{'interface'}) { # If its an interface disconnecting, create a removal event $peer = $activePeers{$stream}{'interface'}; $events{&guid} = "now,interfaces.deleteInterface,$peer"; } else {$peer = $activePeers{$stream}{'peer'}} # Remove from IO:Select, activePeers and PotentialPeers $select->remove($handle); delete $activePeers{$stream}; $activePeersChanged = 1; @potentialPeers = grep {(index $_, $peer) < 0} @potentialPeers; $handle->close; logAdd "$peer disconnected.", 'main/del'; } }
# Send current events (and changes to activePeers) to potentialPeers updatePotentialPeers if $activePeersChanged; logAdd 'activePeersChanged', 'main' if $activePeersChanged; processOutgoingEvents;
# Periodically write to cache.xml #XMLout($publicSpace, outputfile=>$cacheFile, keyattr=>[], xmldecl=>1, rootname=>'cache', noattr=>1);
}
- END
- ______________________________________________________________________________________________________________________ #
- Encode and send message to a peer by IP:port
- - all message content is events (nodal commands)
- - if there recipient is not an activePeer, then a new connection is attempted
- - version info is sent with new connections incase our data is out-of-date
- - all our activePeers are sent with every message
- - format is md5(24chr),subject&content(mime64)\x00
sub sendMessage { my $peer = shift; my $content = shift; my $handle; logAdd "sendMessage($peer)", 'send'; $content = unless defined $content; # Add peer info to outgoing message my $subject = "to=$peer&port=$thisPort&add=peer&guid=$thisGuid"; $subject .= "&from=$thisPeer" if defined $thisPeer; # Add all our active-peers' names to outgoing message (not interfaces) my $peers = join ',', getActivePeers; $subject .= "&peers=$peers" if $peers; logAdd " $_", 'send' for split /&/, $subject; # Get handle from peer name (this interface/peer thing is messy!) for (keys %activePeers) { my $p = exists $activePeers{$_}{'peer'} ? $activePeers{$_}{'peer'} : $activePeers{$_}{'interface'}; $handle = $activePeers{$_}{'handle'} if $p eq $peer; } # If no handle found, try establishing a new stream unless (defined $handle) { $peer =~ /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+$)/; # get rid of "peer-" bit $handle = IO::Socket::INET->new(PeerAddr=>$1, Proto=>'tcp'); return logAdd " Couldn't establish connection to $peer!", 'send' unless defined $handle; # Connected to new peer, add to local activePeers my $stream = fileno $handle; $activePeers{$stream}{'buffer'} = ; $activePeers{$stream}{'handle'} = $handle; $activePeers{$stream}{'peer'} = $peer; $activePeersChanged = 1; $select->add($handle); logAdd " Stream$stream is connection to $peer", 'send'; } # Send message print $handle "$subject|$content\0"; }
- ______________________________________________________________________________________________________________________ #
- Decode and execute received message
- - executes received events (nodal commands)
- - replies with updates if local is more recent
- - Discovers own IP from incoming 'to' field if still unknown
- - Disconnects stream if it is us
- - Add sender to our activePeers
- - Add senders' activePeers to our potentialPeers
sub processMessage { my $data = shift; my $handle = shift; my $stream = fileno $handle; my %msg; my $peer; logAdd 'processMessage()', 'main'; # Decode message (my $subject, $msg{'events'}) = split /[|]/, $data; $msg{$1} = /(.+)=(.*)/ ? $2 : /(.+)/ for split /&/, $subject; logAdd " $_", 'recv' for split /&/, $subject; # Set self IP incase not known yet $thisPeer = $msg{'to'} unless defined $thisPeer; # Identify and add sender if 'add' field found if (exists $msg{'add'}) { my $add = $msg{'add'}; if ($add eq 'interface') { # Sender is an interface, initiate an addInterface event for interfaces $peer = "interface$thisPeer-".$msg{'guid'}; $peer =~ s/peer//; $events{&guid} = "now,interfaces.addInterface,$peer"; } else { # Sender is a peer my ($iport, $iaddr) = sockaddr_in getpeername $handle; $peer = 'peer-'.inet_ntoa($iaddr).':'.$msg{'port'}; } # If peer is self, disconnect and exit (before adding as activePeer) if ($msg{'guid'} eq $thisGuid) { logAdd " Stream$stream identified as us, disconnecting...", 'recv'; $select->remove($handle); delete $activePeers{$stream}; return $handle->close; } # Add sender to our activePeers... unless (exists $activePeers{$stream}{$add}) { logAdd " Stream$stream identified as $peer ($add)", 'recv'; $activePeers{$stream}{$add} = $peer; if ($add eq 'peer') { $activePeersChanged = 1; # ...and senders' activePeers to our potentialPeers updatePotentialPeers $msg{'peers'} if exists $msg{'peers'}; } } } # Update event queue with any new events (content) received (discarded if already seen) for (split /&/, $msg{'events'}) { /(.+)=(.*)$/; unless (exists $events{$1}) { $events{$1} = $2; #logAdd "new event received:$1 = $2", 'recv'; } } # Interface logging logAdd $msg{'log'}, 'SWF-'.uc(substr($activePeers{$stream}{'interface'}, -6)) if exists $msg{'log'}; # Interface requests peer connection sendMessage "peer-".$msg{'connect'} if exists $msg{'connect'}; }
- ______________________________________________________________________________________________________________________ #
sub processOutgoingEvents { my @msg = (); my $qs; # Build a query-string of the queued-events while (my ($k, $v) = each %events) { if ($v) { # adds to qs if not already processed (=0) push @msg, "$k=$v"; logAdd "sending event:$k = $v", 'main'; $events{$k} = 0; } } $qs = join '&', @msg; # If nextPeer defined, send events to next if (defined $nextPeer) { sendMessage $nextPeer, $qs if $activePeersChanged or $#msg >= 0; $activePeersChanged = 0; } # Also send queued changes directly to all local interfaces if ($#msg >= 0 ) { sendMessage $activePeers{$_}{'interface'}, $qs for grep exists $activePeers{$_}{'interface'}, keys %activePeers; } }
- Update potential peers from activePeers and sent comma-separated-list of peers
- - The list is sorted so that all peers can form a ring with their nextPeer's
- - called when $activePeers changed or new options arrive
sub updatePotentialPeers { my $addPeers = shift; my %peers = (); map $peers{$_} = 1, split /,/, $addPeers if defined $addPeers; $peers{$_} = 1 for @potentialPeers; $peers{$_} = 1 for getActivePeers; $peers{$thisPeer} = 1 if defined $thisPeer; @potentialPeers = map $_, sort grep /^peer/, keys %peers; logAdd " $_", 'UPP' for @potentialPeers; # update nextPeer (actually use previous cos can use -1 array index) return unless defined $thisPeer; my $this = undef; for (0..$#potentialPeers) {$this = $_ if $potentialPeers[$_] eq $thisPeer} $nextPeer = ($#potentialPeers>0 and defined $this) ? $potentialPeers[--$this] : undef; logAdd " Next:$nextPeer", 'UPP' if defined $nextPeer; }
sub getActivePeers { return map $activePeers{$_}{'peer'}, grep exists $activePeers{$_}{'peer'}, keys %activePeers; }
- %$publicSpace
- - currently path is a unique guid representing the event
- - hash-ref: use $$publicSpace{ or %$publicSpace or $publicSpace->{
- - network-tree maintained by peer-network
- - uses XML::Simple for external-file and internal-hash of network
sub node { my $path = shift; my $cmd = shift; my $data = shift; my $node = leaf $path; # Execute the nodal command if ($cmd eq 'set') { # Set node value, (create path if doesn't exist) my $ptr = pathToHashRef $publicSpace, parent("$thisRoot$path"), 1; return if $$ptr{$node} eq $data; $$ptr{$node} = $data; } elsif ($cmd eq 'delete') { # Delete node or value, return 0 silently if node not found my $ptr = pathToHashRef $publicSpace, parent "$thisRoot$path"; return 0 unless defined $$ptr{$node}; delete $$ptr{$node}; } else { # Default is to return the node content or undef my $ptr = pathToHashRef $publicSpace, parent "$thisRoot$path"; return $$ptr{$node}; } }
- Use standard /-separated path-string to access hash-tree
- - returns undef if any non-hash-ref is encountered, unless,
- - $create is defined, then the path will be created as traversed
sub pathToHashRef { my $ptr = shift; my $path = shift; my $create = shift; for (split /\//, $path) { if (ref $$ptr{$_} eq 'HASH') {$ptr = $$ptr{$_}} elsif (defined $create) {$ptr = $$ptr{$_} = {}} else {return} } return $ptr; }
- Return parent path of passed path
sub parent { my $path = shift; $path =~ s/\/[^\/]+?$//; return $path; }
- Return parent path of passed path
sub leaf { my $path = shift; $path =~ s/^.*\///; return $path; }
- Generate a guid
sub guid { return 'p'.(rand()+time); }
- Add an item to the output log (and print if debugging)
sub logAdd { my $entry = shift; my $where = shift; $where = unless defined $where; return unless $where =~ /$logPattern/; $where = " $where" while length $where < 10; print localtime()." : $where : $entry\n"; }