Sic/Perl

From Organic Design wiki
Legacy: This article describes a concept that has been superseded in the course of ongoing development on the Organic Design wiki. Please do not develop this any further or base work on this concept; it is only useful as a historical record of work done. You may find a link to the currently used concept or function in this article; if not, you can contact the author to find out what has taken the place of this legacy item.
#!/usr/bin/perl -w
#
# Licensed under LGPL: www.gnu.org/copyleft/lesser.html
#
# Nad - 2003 - SIC-Games network layer
#
use IO::Socket;
use IO::Select;
use Cwd;
use strict;

# Main Functions
# Forward declarations of the subs defined further down this file.
# They allow the top-level code to call these subs list-operator style
# (without parentheses) before the definitions have been seen,
# e.g. `logAdd "..."` and `sendMessage "peer-$_"` in the start-up code.
sub sendMessage;
sub processMessage;
sub processOutgoingEvents;
sub updatePotentialPeers;
sub getActivePeers;
sub node;
sub pathToHashRef;
sub parent;
sub leaf;
sub guid;
sub logAdd;

# Marker string constant.
# NOTE(review): CREATE is not referenced anywhere else in the visible source.
use constant CREATE => '__<createGuid>__';

# NODAL CORE
# Nodes are hashes containing other properties, functionRefs and other hashRefs
# Nodes can contain array items called 'queue' which is a list of its currently executing nodes
# Each of the items in the queue are either a functionRef or a hashRef (to another node with its own queue)
# Queues are rings/circular because refs shift off the bottom and push onto the top
# executed functions return ref to function/hash which is to be pushed on to replace them (ie what they become)
# function execution takes place relative to the passed cwd (the currently executing node)
# one function in the tree will execute for every iteration of the outside loop
# the inside loop walks the hash struct rotating each queue until an actual function is executed

# Demo data for the nodal core: three "threads" of three sequential steps each
# - every step prints its own letter and hands back a reference to the step that follows it
# - the last step of a thread hands back 0, i.e. the thread has nothing left to run
# - real steps would act on their local environment (the global $cwd hashRef) and could
#   hand back another hashRef instead of a function
our ($A, $B, $C);
our ($P, $Q, $R);
our ($X, $Y, $Z);

# Thread A -> B -> C
$A = sub {
	print "A";
	return $B;
};
$B = sub {
	print "B";
	return $C;
};
$C = sub {
	print "C";
	return 0;
};

# Thread P -> Q -> R
$P = sub {
	print "P";
	return $Q;
};
$Q = sub {
	print "Q";
	return $R;
};
$R = sub {
	print "R";
	return 0;
};

# Thread X -> Y -> Z
$X = sub {
	print "X";
	return $Y;
};
$Y = sub {
	print "Y";
	return $Z;
};
$Z = sub {
	print "Z";
	return 0;
};

# Execution tree: the root queue holds thread ABC plus a child node whose own queue holds PQR and XYZ
# - PQR and XYZ therefore run in parallel with each other, and together in parallel with ABC
my $someNode = { queue => [ $P, $X ] };
our $root = { queue => [ $A, $someNode ] };
# Node currently being walked by the demo scheduler below
our $now;
# Demo scheduler for the tree above. The `if (0)` guard means none of this ever runs;
# it is kept as an illustration of the queue-rotation idea described in the NODAL CORE notes.
if (0) {
# Outer loop: every pass starts again from the root node
while (1) {
	$now = $root;
	# Inner loop: continues only while $i is numerically equal to 1
	for (my $i = 1; $i == 1;) {
		my $queue = $$now{'queue'};
		if ($#$queue >= 0) {
			# Take the item from the front of the current node's queue
			my $todo = shift @$queue;
			# - CODE ref: call it (`&$todo` without parens passes the current @_ along) and store
			#   what it returns in $i, which ends the inner loop unless that value equals 1
			# - anything else (a node hashRef): descend by making it the current node
			# Whatever the chosen branch yields lands in $_ and is pushed back on the end of the
			# queue when true; a false result (e.g. the 0 from a finished thread) drops the item
			push @$queue, $_ if $_ = ref $todo eq 'CODE' ? ($i = &$todo) : ($now = $todo);
			}
		else	{$i = 0} # empty queue: nothing to run in this node, end this pass
		}
	}

# Placed after an unconditional while (1) with no `last`, so control never reaches it
exit;
}
# ______________________________________________________________________________________________________________________ #

# Normalised working directory
# - a path that does not start with "/" is taken to be non-Linux: its slashes become backslashes
# - a single trailing slash or backslash is removed
our $cwd = cwd();
if ($cwd !~ /^\//) {
	$cwd =~ s/\//\\/g;
}
$cwd =~ s/[\\\/]$//g;

# Main shared data-structure
# - NOTE: No time to implement this in P2P, using basic event-queue - %events
# - must be a plain hash ref ({}), not a reference to one (\{}): node() and
#   pathToHashRef() dereference it directly as a hash via $$ptr{...}

our $publicSpace = {};
our $thisRoot = "Sic";	# root key in the hash for this project/script

# TCP port on which this peer listens for incoming messages
#our $thisPort = node '/private/this-port';
our $thisPort = 2012;

# Our own peer name (format: peer-IP:Port or interface-Guid)
# - not known at start-up; it is taken from the 'to' key of an incoming message
# - once known it is added as the 'from' key of outgoing messages
our $thisPeer;

# Currently open connections (the handles registered with IO::Select)
# - keyed by fileno(handle), which the code calls $stream
# - a hash, because each entry carries several local properties of the peer
our %activePeers;
our $activePeersChanged = 0;

# Every peer name learnt so far, by us or by the peers we talk to
# - principle: propagate what we know works, don't propagate what we know fails
# - a plain list, because only basic connection info is kept
our @potentialPeers;

# The peer our messages are forwarded to
# - all potentialPeers currently form a ring of communication, which is another
#   reason the potential peers are kept as a list rather than a hash
our $nextPeer;

# Queue of unprocessed events
# - message content is always events
our %events;

# Unique guid sent with our messages so that we don't talk to ourselves
# - it also lets us discover our own IP from the first incoming request
our $thisGuid = guid();

# Pattern matched against a log entry's category; only matching entries are printed
#our $logPattern = node('/private/log-pattern');
our $logPattern = '.*';

# Initialise server on listening $thisPort
# - the listening socket is registered with IO::Select so that new connections show up in can_read
# - if the port can't be opened (e.g. already in use) the failure is logged and the script carries on
#   with an empty select set, so it can still open outgoing connections
our $server = IO::Socket::INET->new(Listen => 1, LocalPort => $thisPort, Proto => 'tcp');
our $select = IO::Select->new;
if (defined $server) {
	$select->add($server);
	}
else {
	# IO::Socket::INET leaves the reason for the failure in $@
	logAdd("Couldn't listen on port $thisPort: $@", 'main');
	}
logAdd("================================================================================");

# Try to open a connection to each configured peer by sending it a first message
# - names are prefixed with "peer-" so they can be valid XML tags
# - version info is sent on new connection
#sendMessage "peer-$_" for split /,/, node '/private/potential-peers';
sleep(5);
for my $address (split /,/, '192.168.7.1:2012,192.168.7.2:2012,192.168.7.3:2012,192.168.7.4:2012') {
	sendMessage("peer-$address");
	}

# ______________________________________________________________________________________________________________________ #
# MAIN SERVER LOOP

while (1) {

	# Handle output potentialPeers?
	# - this may be needed if outgoing msg is larger than recipients' input-buffer
	#   for $handle ($select->can_write) {syswrite($fh, $obuffer);}

	# Handle every stream that has something to read
	for my $handle ($select->can_read) {
		my $stream = fileno $handle;
		if (defined $server and $handle == $server) {
			# NEW: handle is the server, set up a new peer
			# - but it can't be identified until its msg processed
			my $newhandle = $server->accept;
			# accept can fail (e.g. the client already hung up); skip instead of registering undef
			next unless defined $newhandle;
			$stream = fileno $newhandle;
			$select->add($newhandle);
			$activePeers{$stream}{'buffer'} = '';
			$activePeers{$stream}{'handle'} = $newhandle;
			logAdd("New connection: Stream$stream", 'main/new');
			}
		elsif (sysread $handle, my $input, 10000) {
			# DATA: handle is an existing stream with data to read (NOTE: we should disconnect after certain size limit)
			# - Process (and remove) all complete (null-terminated) messages from this stream's buffer
			$activePeers{$stream}{'buffer'} .= $input;
			if ($activePeers{$stream}{'buffer'} =~ s/^(.*\x00)//s) {
				for my $message (split /\x00/, $1) {
					# processMessage closes the stream when the sender turns out to be us;
					# anything still queued for a closed handle is dropped
					last unless defined fileno $handle;
					processMessage($message, $handle);
					}
				}
			}
		else {
			# DEL: handle is an existing stream with no more data to read
			my $peer;
			if (defined $activePeers{$stream}{'interface'}) {
				# If it's an interface disconnecting, create a removal event
				$peer = $activePeers{$stream}{'interface'};
				$events{guid()} = "now,interfaces.deleteInterface,$peer";
				}
			else {$peer = $activePeers{$stream}{'peer'}}
			# Remove from IO::Select, activePeers and potentialPeers
			$select->remove($handle);
			delete $activePeers{$stream};
			$activePeersChanged = 1;
			# A stream that was never identified has no name. index() treats undef as '' and
			# finds it at position 0 in every string, which would empty the whole list.
			if (defined $peer) {
				@potentialPeers = grep {(index $_, $peer) < 0} @potentialPeers;
				}
			$handle->close;
			my $label = defined $peer ? $peer : "Stream$stream";
			logAdd("$label disconnected.", 'main/del');
			}
		}

	# Send current events (and changes to activePeers) to potentialPeers
	updatePotentialPeers() if $activePeersChanged;
	logAdd('activePeersChanged', 'main') if $activePeersChanged;
	processOutgoingEvents();

	# Periodically write to cache.xml
	#XMLout($publicSpace, outputfile=>$cacheFile, keyattr=>[], xmldecl=>1, rootname=>'cache', noattr=>1);

	}
#END
# ______________________________________________________________________________________________________________________ #

# Build and send a message to a peer or interface, identified by name
# - $peer:    recipient name, e.g. "peer-IP:port", or an interface name stored in %activePeers
# - $content: optional event query-string; defaults to the empty string
# - all message content is events (nodal commands)
# - if the recipient has no open stream, a new connection to the IP:port found at the end
#   of the name is attempted and added to %activePeers and the select set
# - the names of all our active peers are sent with every message
# - wire format as written below: subject|content\x00, where subject is a &-separated key=value list
sub sendMessage {
	my $peer = shift;
	my $content = shift;
	my $handle;
	logAdd("sendMessage($peer)", 'send');
	$content = '' unless defined $content;
	# Add peer info to outgoing message
	my $subject = "to=$peer&port=$thisPort&add=peer&guid=$thisGuid";
	$subject .= "&from=$thisPeer" if defined $thisPeer;
	# Add all our active-peers' names to outgoing message (not interfaces)
	my $peers = join ',', getActivePeers();
	$subject .= "&peers=$peers" if $peers;
	logAdd("   $_", 'send') for split /&/, $subject;
	# Get handle from peer name (this interface/peer thing is messy!)
	for my $stream (keys %activePeers) {
		my $name = exists $activePeers{$stream}{'peer'}
			? $activePeers{$stream}{'peer'}
			: $activePeers{$stream}{'interface'};
		# Freshly accepted streams have neither name yet; they can't be the recipient
		next unless defined $name;
		$handle = $activePeers{$stream}{'handle'} if $name eq $peer;
		}
	# If no handle found, try establishing a new stream
	unless (defined $handle) {
		# Extract IP:port (dropping the "peer-" bit). The match must be checked:
		# on failure $1 would still hold the capture of some earlier, unrelated match.
		unless ($peer =~ /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+$)/) {
			return logAdd("   Couldn't establish connection to $peer! (no IP:port in name)", 'send');
			}
		my $address = $1;
		$handle = IO::Socket::INET->new(PeerAddr=>$address, Proto=>'tcp');
		return logAdd("   Couldn't establish connection to $peer!", 'send') unless defined $handle;
		# Connected to new peer, add to local activePeers
		my $stream = fileno $handle;
		$activePeers{$stream}{'buffer'} = '';
		$activePeers{$stream}{'handle'} = $handle;
		$activePeers{$stream}{'peer'} = $peer;
		$activePeersChanged = 1;
		$select->add($handle);
		logAdd("   Stream$stream is connection to $peer", 'send');
		}
	# Send message, terminated by a null byte
	print {$handle} "$subject|$content\0";
	}
# ______________________________________________________________________________________________________________________ #

# Decode and act on one received message
# - $data:   the message text without its null terminator, in the form subject|events
# - $handle: the stream the message arrived on
# - subject is a &-separated list of key=value fields (a field without "=" is stored with value 1)
# - Discovers own name from incoming 'to' field if still unknown
# - If the sender's guid is our own, the stream is closed and nothing else is done
# - Adds the sender to %activePeers, and the sender's peer list to our potentialPeers
# - Queues any events not seen before in %events
sub processMessage {
	my $data = shift;
	my $handle = shift;
	my $stream = fileno $handle;
	my %msg;
	my $peer;
	logAdd('processMessage()', 'main');
	# Decode message: split on the first '|' only, so a '|' inside the content is not lost
	(my $subject, $msg{'events'}) = split /[|]/, $data, 2;
	$subject = '' unless defined $subject;
	$msg{'events'} = '' unless defined $msg{'events'};
	for my $field (split /&/, $subject) {
		# Empty fields match neither pattern and are skipped, so no stale capture is reused
		if ($field =~ /(.+)=(.*)/)	{$msg{$1} = $2}
		elsif ($field =~ /(.+)/)	{$msg{$1} = 1}
		}
	logAdd("   $_", 'recv') for split /&/, $subject;
	# Set self IP in case not known yet
	$thisPeer = $msg{'to'} unless defined $thisPeer;
	# Identify and add sender if 'add' field found
	if (exists $msg{'add'}) {
		my $add = $msg{'add'};
		my $guid = defined $msg{'guid'} ? $msg{'guid'} : '';
		if ($add eq 'interface') {
			# Sender is an interface, initiate an addInterface event for interfaces
			$peer = 'interface'.(defined $thisPeer ? $thisPeer : '').'-'.$guid;
			$peer =~ s/peer//;
			$events{guid()} = "now,interfaces.addInterface,$peer";
			}
		else {
			# Sender is a peer: name it by the address the stream comes from plus its advertised port
			my $sockaddr = getpeername $handle;
			unless (defined $sockaddr) {
				# Without a remote address sockaddr_in would croak; ignore the message instead
				logAdd("   Stream$stream has no peer address, message ignored", 'recv');
				return;
				}
			my (undef, $iaddr) = sockaddr_in($sockaddr);
			$peer = 'peer-'.inet_ntoa($iaddr).':'.(defined $msg{'port'} ? $msg{'port'} : '');
			}
		# If peer is self, disconnect and exit (before adding as activePeer)
		if ($guid eq $thisGuid) {
			logAdd("   Stream$stream identified as us, disconnecting...", 'recv');
			$select->remove($handle);
			delete $activePeers{$stream};
			return $handle->close;
			}
		# Add sender to our activePeers...
		unless (exists $activePeers{$stream}{$add}) {
			logAdd("   Stream$stream identified as $peer ($add)", 'recv');
			$activePeers{$stream}{$add} = $peer;
			if ($add eq 'peer') {
				$activePeersChanged = 1;
				# ...and senders' activePeers to our potentialPeers
				updatePotentialPeers($msg{'peers'}) if exists $msg{'peers'};
				}
			}
		}
	# Update event queue with any new events (content) received (discarded if already seen)
	for my $event (split /&/, $msg{'events'}) {
		# Skip malformed entries rather than reusing the captures of a previous match
		next unless $event =~ /(.+)=(.*)$/;
		$events{$1} = $2 unless exists $events{$1};
		}
	# Interface logging, tagged with the last six characters of the interface name
	if (exists $msg{'log'}) {
		my $interface = $activePeers{$stream}{'interface'};
		my $tag = defined $interface ? uc(substr($interface, -6)) : '';
		logAdd($msg{'log'}, "SWF-$tag");
		}
	# Interface requests peer connection
	sendMessage("peer-".$msg{'connect'}) if exists $msg{'connect'};
	}
	
# ______________________________________________________________________________________________________________________ #

# Flush the event queue to the network
# - every event whose value is still true is added to a key=value&... query-string,
#   logged, and then marked as processed by setting its value to 0
# - the query-string goes to $nextPeer (if defined) when there are events or the peer set changed;
#   $activePeersChanged is cleared whenever $nextPeer is defined
# - when there are events, the query-string also goes directly to every local interface
sub processOutgoingEvents {
	my @pending;
	for my $id (keys %events) {
		my $event = $events{$id};
		next unless $event;	# 0 means already processed
		push @pending, "$id=$event";
		logAdd("sending event:$id = $event", 'main');
		$events{$id} = 0;
		}
	my $query = join '&', @pending;
	# Forward around the ring
	if (defined $nextPeer) {
		if ($activePeersChanged || @pending) {
			sendMessage($nextPeer, $query);
			}
		$activePeersChanged = 0;
		}
	# Also send queued changes directly to all local interfaces
	if (@pending) {
		my @interfaceStreams = grep { exists $activePeers{$_}{'interface'} } keys %activePeers;
		for my $stream (@interfaceStreams) {
			sendMessage($activePeers{$stream}{'interface'}, $query);
			}
		}
	}

# Rebuild @potentialPeers and work out $nextPeer
# - $addPeers: optional comma-separated list of extra peer names to merge in
# - the result is the union of $addPeers, the current @potentialPeers, our active peers and
#   ourselves, restricted to names starting with "peer" and sorted, so that every peer
#   ends up with the same ordering and the peers can form a ring
# - $nextPeer becomes the entry just before us in that list (wrapping round to the last one),
#   or undef if we are not in the list or are the only entry
# - called when the active peers changed or a peer list arrives in a message
sub updatePotentialPeers {
	my $addPeers = shift;
	my %known;
	if (defined $addPeers) {
		$known{$_} = 1 for split /,/, $addPeers;
		}
	$known{$_} = 1 for @potentialPeers, getActivePeers();
	$known{$thisPeer} = 1 if defined $thisPeer;
	my @peerNames = grep { /^peer/ } keys %known;
	@potentialPeers = sort @peerNames;
	logAdd("      $_", 'UPP') for @potentialPeers;
	# update nextPeer (actually the previous entry, because index -1 wraps to the end of the array)
	return unless defined $thisPeer;
	my $ourIndex;
	for my $i (0..$#potentialPeers) {
		$ourIndex = $i if $potentialPeers[$i] eq $thisPeer;
		}
	if (@potentialPeers > 1 and defined $ourIndex) {
		$nextPeer = $potentialPeers[$ourIndex - 1];
		}
	else {
		$nextPeer = undef;
		}
	logAdd("      Next:$nextPeer", 'UPP') if defined $nextPeer;
	}

# List the 'peer' name of every entry in %activePeers that has one
# (entries without a 'peer' key, such as interfaces, are left out)
sub getActivePeers {
	my @names;
	for my $stream (keys %activePeers) {
		next unless exists $activePeers{$stream}{'peer'};
		push @names, $activePeers{$stream}{'peer'};
		}
	return @names;
	}

# Read, set or delete a value in the %$publicSpace tree
# - $path: /-separated path; it is looked up below the $thisRoot key
# - $cmd:  'set', 'delete', or anything else (including nothing) to read
# - $data: the value to store when $cmd is 'set'
# - set:    creates the path if needed; returns nothing if the value is unchanged, else the new value
# - delete: returns 0 if the node is not found, else the removed value
# - read:   returns the node content, or undef if the node or its path does not exist
# - hash-ref: use $$publicSpace{ or %$publicSpace or $publicSpace->{
sub node {
	my $path = shift;
	my $cmd = shift;
	my $data = shift;
	my $node = leaf($path);
	# A plain read passes no command at all
	$cmd = '' unless defined $cmd;
	# Execute the nodal command
	if ($cmd eq 'set') {
		# Set node value, (create path if doesn't exist)
		my $ptr = pathToHashRef($publicSpace, parent("$thisRoot$path"), 1);
		my $old = $$ptr{$node};
		# Unchanged means both undefined, or both defined and equal; undef and '' are different values
		my $same = (defined $old and defined $data)
			? $old eq $data
			: (!defined $old and !defined $data);
		return if $same;
		return $$ptr{$node} = $data;
		}
	elsif ($cmd eq 'delete') {
		# Delete node or value, return 0 silently if node not found
		# (pathToHashRef returns nothing when the parent path is missing)
		my $ptr = pathToHashRef($publicSpace, parent("$thisRoot$path"));
		return 0 unless ref $ptr eq 'HASH' and defined $$ptr{$node};
		return delete $$ptr{$node};
		}
	else {
		# Default is to return the node content or undef
		my $ptr = pathToHashRef($publicSpace, parent("$thisRoot$path"));
		# Dereferencing a missing parent would die under strict, so report "not found" instead
		return unless ref $ptr eq 'HASH';
		return $$ptr{$node};
		}
	}

# Walk a hash-tree along a /-separated path and return the hash-ref found at its end
# - $ptr:    hash-ref to start from
# - $path:   /-separated list of keys
# - $create: if defined, every step that is not already a hash-ref is replaced by a new empty hash
# - without $create, nothing is returned as soon as a step is not a hash-ref
sub pathToHashRef {
	my ($ptr, $path, $create) = @_;
	foreach my $step (split /\//, $path) {
		unless (ref $ptr->{$step} eq 'HASH') {
			return unless defined $create;
			$ptr->{$step} = {};
			}
		$ptr = $ptr->{$step};
		}
	return $ptr;
	}

# Strip the last "/segment" from a path and return what is left
# (a path with no slash, or one ending in a slash, comes back unchanged)
sub parent {
	(my $trimmed = shift) =~ s/\/[^\/]+?$//;
	return $trimmed;
	}
	
# Return the last segment of a path, i.e. everything after the final "/"
# (a path with no slash comes back unchanged; a path ending in a slash gives '')
sub leaf {
	(my $tail = shift) =~ s/^.*\///;
	return $tail;
	}
	
# Generate a guid: the letter "p" followed by the current epoch time plus a random fraction
sub guid {
	my $stamp = time + rand();
	return "p$stamp";
	}
	
# Print a log line of the form "<local time> : <category> : <entry>"
# - $entry: the text to log
# - $where: optional category (defaults to ''), right-aligned in a 10-character column
# - entries whose category does not match $logPattern are not printed
sub logAdd {
	my ($entry, $where) = @_;
	$where = '' unless defined $where;
	return unless $where =~ /$logPattern/;
	my $column = sprintf '%10s', $where;
	my $stamp = localtime();
	print $stamp." : $column : $entry\n";
	}