XML Socket.pl

From Organic Design wiki
Legacy: This article describes a concept that has been superseded in the course of ongoing development on the Organic Design wiki. Please do not develop it any further or base work on this concept; it is kept only as a historical record of work done. The article may link to the concept or function currently in use; if it does not, you can contact the author to find out what has replaced this legacy item.
#!/usr/bin/perl -w
use IO::Socket;
use IO::Select;
use Cwd;
use strict;
use warnings;

# Main Functions
# - forward declarations let these subs be called as list operators (without
#   parentheses) before their definitions further down the file
# - NOTE(review): parent and leaf are declared but not defined in this file
sub sendMessage;
sub processMessage;
sub processOutgoingEvents;
sub updatePotentialPeers;
sub getActivePeers;
sub parent;
sub leaf;
sub guid;
sub logAdd;

# Working directory without a trailing separator
# - a leading slash in $cwd is taken to mean we are running on linux
# - otherwise forward slashes are converted to backslashes
our $cwd = cwd();
$cwd =~ s/\//\\/g unless $cwd =~ /^\//;
$cwd =~ s/[\\\/]$//g;

# The port that this event listens for incoming messages on
#our $thisPort = node '/private/this-port';
our $thisPort = 2012;

# We don't know thisPeer until our IP is known, but that is only known (currently) from incoming
# - defined on reception of incoming message which includes this IP in the 'to' key
# - added in 'from' key of outgoing message
# - format: peer-IP:Port or interface-Guid
our $thisPeer = undef;

# Stores actual current potentialPeers (as used by IO::Select)
# - indexed by fileno(handle) called $stream
# - activePeers are a hash because it stores many local peer properties
our %activePeers;
our $activePeersChanged = 0;

# All activePeers found by all so far
# - the principle is propagate what we know works, don't propagate what we know fails
# - potentialPeers is a list because it only knows basic connection info
our @potentialPeers = ();

# Next peer after us in potentialPeerList
# - another reason potentialPeerList is best as a list not a hash
# - Currently, all potentialPeers form a ring of communication
our $nextPeer = undef;

# Queue of events indexed by guid
# - the value is the event text, or 0 once the event has been passed on
# - declared here because "use strict" rejects undeclared package variables
our %events;

# A unique guid is sent with our requestAccess so we don't talk to ourselves
# - we can use this to discover our own IP as we discover our IP on first incoming request
our $thisGuid = guid();

#our $logPattern = node('/private/log-pattern');
our $logPattern = '.*';

# Initialise server on listening $thisPort
# - stop immediately if the port cannot be bound, the main loop is useless without it
our $server = IO::Socket::INET->new(Listen => 1, LocalPort => $thisPort, Proto => 'tcp')
	or die "Cannot listen on TCP port $thisPort: $@\n";
our $select = IO::Select->new($server);
logAdd("================================================================================");

# Attempt to send connection message to each potentialPeer
# - we precede them with "peer-" so they can be valid XML tags
# - version info is sent on new connection
#sendMessage "peer-$_" for split /,/, node '/private/potential-peers';
sleep(5);
sendMessage("peer-$_") for split /,/, '192.168.7.1:2012,192.168.7.2:2012,192.168.7.3:2012,192.168.7.4:2012';

# ______________________________________________________________________________________________________________________ #
# MAIN SERVER LOOP

while (1) {

	# Handle output potentialPeers?
	# - this may be needed if outgoing msg is larger than recipients' input-buffer
	#   for $handle ($select->can_write) {syswrite($fh, $obuffer);}

	# Handle input potentialPeers
	for my $handle ($select->can_read) {
		my $stream = fileno $handle;
		if ($handle == $server) {
			# NEW: handle is the server, set up a new peer
			# - but it can't be identified until its msg processed
			my $newhandle = $server->accept;
			unless (defined $newhandle) {
				# The connection went away before it could be accepted
				logAdd("Accept failed: $!", 'main/new');
				next;
				}
			$stream = fileno $newhandle;
			$select->add($newhandle);
			$activePeers{$stream}{'buffer'} = '';
			$activePeers{$stream}{'handle'} = $newhandle;
			logAdd("New connection: Stream$stream", 'main/new');
			}
		elsif (sysread $handle, my $input, 10000) {
			# DATA: handle is an existing stream with data to read (NOTE: we should disconnect after certain size limit)
			# - Process (and remove) all complete (null-terminated) messages from this potentialPeers buffer
			$activePeers{$stream}{'buffer'} .= $input;
			if ($activePeers{$stream}{'buffer'} =~ s/^(.*\x00)//s) {
				for my $message (split /\x00/, $1) {
					# processMessage closes the stream when the sender turns out to be us,
					# so stop feeding it messages once the stream has been removed
					last unless exists $activePeers{$stream};
					processMessage($message, $handle);
					}
				}
			}
		else {
			# DEL: handle is an existing stream with no more data to read
			my $info = $activePeers{$stream} || {};
			my $peer;
			if (defined $info->{'interface'}) {
				# If its an interface disconnecting, create a removal event
				$peer = $info->{'interface'};
				$events{guid()} = "now,interfaces.deleteInterface,$peer";
				}
			else {$peer = $info->{'peer'}}
			# Remove from IO::Select, activePeers and PotentialPeers
			$select->remove($handle);
			delete $activePeers{$stream};
			$activePeersChanged = 1;
			# Only drop the exact entry, and only if the stream was ever identified
			# - an undefined name would otherwise match (and remove) every potentialPeer
			@potentialPeers = grep {$_ ne $peer} @potentialPeers if defined $peer;
			$handle->close;
			logAdd((defined $peer ? $peer : "Stream$stream").' disconnected.', 'main/del');
			}
		}

	# Send current events (and changes to activePeers) to potentialPeers
	updatePotentialPeers() if $activePeersChanged;
	logAdd('activePeersChanged', 'main') if $activePeersChanged;
	processOutgoingEvents();

	# Periodically write to cache.xml
	#XMLout($publicSpace, outputfile=>$cacheFile, keyattr=>[], xmldecl=>1, rootname=>'cache', noattr=>1);

	}
#END
# ______________________________________________________________________________________________________________________ #

# Encode and send message to a peer (or interface) by name
# - all message content is events (nodal commands)
# - if the recipient is not an activePeer, then a new connection is attempted
#   to the IP:port found at the end of the name
# - to, port, add, guid (and from, once known) are sent with every message
# - the names of all our activePeers are sent with every message
# - format is subject|content\x00 where subject is a &-separated list of key=value
# - NOTE(review): earlier notes mention version info and an md5/mime64 encoding,
#   neither is produced by this code
#
# Parameters:
#   $peer    - name of the recipient, e.g. "peer-192.168.7.1:2012"
#   $content - optional query-string of events, defaults to ''
sub sendMessage {
	my ($peer, $content) = @_;
	logAdd("sendMessage($peer)", 'send');
	$content = '' unless defined $content;
	# Add peer info to outgoing message
	my $subject = "to=$peer&port=$thisPort&add=peer&guid=$thisGuid";
	$subject .= "&from=$thisPeer" if defined $thisPeer;
	# Add all our active-peers' names to outgoing message (not interfaces)
	my $peers = join ',', getActivePeers();
	$subject .= "&peers=$peers" if $peers;
	logAdd("   $_", 'send') for split /&/, $subject;
	# Get handle from peer name (this interface/peer thing is messy!)
	# - streams that have been accepted but not yet identified have no name, skip them
	my $handle;
	for my $stream (keys %activePeers) {
		my $info = $activePeers{$stream};
		my $name = exists $info->{'peer'} ? $info->{'peer'} : $info->{'interface'};
		$handle = $info->{'handle'} if defined $name and $name eq $peer;
		}
	# If no handle found, try establishing a new stream
	unless (defined $handle) {
		# Get rid of "peer-" bit, without a match there is no address to connect to
		# (a stale $1 from an earlier match must never be used as the address)
		my ($addr) = $peer =~ /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+)$/;
		$handle = IO::Socket::INET->new(PeerAddr => $addr, Proto => 'tcp') if defined $addr;
		return logAdd("   Couldn't establish connection to $peer!", 'send') unless defined $handle;
		# Connected to new peer, add to local activePeers
		my $stream = fileno $handle;
		$activePeers{$stream}{'buffer'} = '';
		$activePeers{$stream}{'handle'} = $handle;
		$activePeers{$stream}{'peer'} = $peer;
		$activePeersChanged = 1;
		$select->add($handle);
		logAdd("   Stream$stream is connection to $peer", 'send');
		}
	# Send message
	print {$handle} "$subject|$content\0";
	}
# ______________________________________________________________________________________________________________________ #

# Decode and act on a received message
# - a message is subject|events where subject is a &-separated list of key=value
# - Discovers own IP from incoming 'to' field if still unknown
# - Disconnects stream if it is us (the message carries our own guid)
# - Add sender to our activePeers
# - Add senders' activePeers to our potentialPeers
# - Adds received events (nodal commands) to the event queue unless already seen
# - Handles the 'log' and 'connect' requests
#
# Parameters:
#   $data   - one complete message with the terminating null already removed
#   $handle - the socket the message arrived on
sub processMessage {
	my ($data, $handle) = @_;
	my $stream = fileno $handle;
	my %msg;
	my $peer;
	logAdd('processMessage()', 'main');
	# Decode message
	# - the limit of 2 keeps any "|" inside the events and yields '' for empty content
	my ($subject, $content) = split /[|]/, $data, 2;
	$subject = '' unless defined $subject;
	$msg{'events'} = defined $content ? $content : '';
	for my $field (split /&/, $subject) {
		# key=value, or a bare key which is stored with a value of 1
		if ($field =~ /(.+)=(.*)/) {$msg{$1} = $2}
		elsif ($field =~ /(.+)/) {$msg{$1} = 1}
		}
	logAdd("   $_", 'recv') for split /&/, $subject;
	# Set self IP incase not known yet
	$thisPeer = $msg{'to'} unless defined $thisPeer;
	# Identify and add sender if 'add' field found
	if (exists $msg{'add'}) {
		my $add = $msg{'add'};
		if ($add eq 'interface') {
			# Sender is an interface, initiate an addInterface event for interfaces
			$peer = "interface$thisPeer-".$msg{'guid'};
			$peer =~ s/peer//;
			$events{guid()} = "now,interfaces.addInterface,$peer";
			}
		else {
			# Sender is a peer, named from its socket address and the port it listens on
			# - peerhost is undefined if the connection has already gone, which must not kill us
			my $host = $handle->peerhost;
			return logAdd("   Stream$stream has no peer address, ignoring message", 'recv') unless defined $host;
			$peer = "peer-$host:".(defined $msg{'port'} ? $msg{'port'} : '');
			}
		# If peer is self, disconnect and exit (before adding as activePeer)
		if (defined $msg{'guid'} and $msg{'guid'} eq $thisGuid) {
			logAdd("   Stream$stream identified as us, disconnecting...", 'recv');
			$select->remove($handle);
			delete $activePeers{$stream};
			return $handle->close;
			}
		# Add sender to our activePeers...
		unless (exists $activePeers{$stream}{$add}) {
			logAdd("   Stream$stream identified as $peer ($add)", 'recv');
			$activePeers{$stream}{$add} = $peer;
			if ($add eq 'peer') {
				$activePeersChanged = 1;
				# ...and senders' activePeers to our potentialPeers
				updatePotentialPeers($msg{'peers'}) if exists $msg{'peers'};
				}
			}
		}
	# Update event queue with any new events (content) received (discarded if already seen)
	for my $event (split /&/, $msg{'events'}) {
		# Skip anything that is not guid=event rather than reuse captures of an earlier match
		next unless $event =~ /(.+)=(.*)$/;
		$events{$1} = $2 unless exists $events{$1};
		}
	# Interface logging (tagged with the last six characters of the interface name)
	if (exists $msg{'log'}) {
		my $interface = $activePeers{$stream}{'interface'};
		logAdd($msg{'log'}, 'SWF-'.(defined $interface ? uc(substr($interface, -6)) : ''));
		}
	# Interface requests peer connection
	sendMessage("peer-".$msg{'connect'}) if exists $msg{'connect'};
	}
	
# ______________________________________________________________________________________________________________________ #

# Pass queued events on to the next peer and to all local interfaces
# - events are taken from %events, an event with a false value has already been sent
# - every event sent is marked as processed by setting its value to 0
# - the next peer is also messaged (with no events) when activePeers have changed
sub processOutgoingEvents {
	# Build a query-string of the queued-events
	my @pending;
	for my $id (keys %events) {
		my $event = $events{$id};
		next unless $event; # already processed (=0)
		push @pending, "$id=$event";
		logAdd("sending event:$id = $event", 'main');
		$events{$id} = 0;
		}
	my $query = join '&', @pending;
	# If nextPeer defined, send events to next
	if (defined $nextPeer) {
		sendMessage($nextPeer, $query) if $activePeersChanged || @pending;
		$activePeersChanged = 0;
		}
	# Also send queued changes directly to all local interfaces
	if (@pending) {
		my @interfaces = grep {exists $activePeers{$_}{'interface'}} keys %activePeers;
		sendMessage($activePeers{$_}{'interface'}, $query) for @interfaces;
		}
	}

# Update potential peers from activePeers and sent comma-separated-list of peers
# - The list is sorted so that all peers can form a ring with their nextPeer's
# - only names starting with "peer" are kept, and each name appears once
# - called when $activePeers changed or new options arrive
# - nextPeer is only worked out once thisPeer is known, it is undefined when
#   we are the only peer or are not in the list ourselves
#
# Parameters:
#   $addPeers - optional comma-separated list of peer names to merge in
sub updatePotentialPeers {
	my $addPeers = shift;
	# Collect names as hash keys so that duplicates collapse
	my %peers;
	if (defined $addPeers) {$peers{$_} = 1 for split /,/, $addPeers}
	$peers{$_} = 1 for @potentialPeers, getActivePeers();
	$peers{$thisPeer} = 1 if defined $thisPeer;
	my @names = grep {/^peer/} keys %peers;
	@potentialPeers = sort @names;
	logAdd("      $_", 'UPP') for @potentialPeers;
	# update nextPeer (actually use previous cos can use -1 array index)
	return unless defined $thisPeer;
	my $this;
	for my $i (0..$#potentialPeers) {$this = $i if $potentialPeers[$i] eq $thisPeer}
	# At index 0 the -1 wraps round to the last peer, which closes the ring
	$nextPeer = (@potentialPeers > 1 && defined $this) ? $potentialPeers[$this - 1] : undef;
	logAdd("      Next:$nextPeer", 'UPP') if defined $nextPeer;
	}

# Return the names of all active streams that have been identified as peers
# - streams without a 'peer' entry (interfaces, unidentified connections) are left out
sub getActivePeers {
	my @names;
	for my $stream (keys %activePeers) {
		next unless exists $activePeers{$stream}{'peer'};
		push @names, $activePeers{$stream}{'peer'};
		}
	return @names;
	}

# Generate a guid
# - the letter "p" followed by the current time plus a random fraction of a second
sub guid {
	my $stamp = time() + rand();
	return "p$stamp";
	}
	
# Print a timestamped entry to the output log
# - $entry is the text to log
# - $where is an optional tag saying where the entry came from (defaults to '')
# - nothing is printed unless $where matches $logPattern
# - the tag is right-aligned in a column ten characters wide
sub logAdd {
	my ($entry, $where) = @_;
	$where = '' unless defined $where;
	return unless $where =~ /$logPattern/;
	my $column = sprintf '%10s', $where;
	my $stamp = localtime;
	print $stamp." : $column : $entry\n";
	}