Difference between revisions of "Sic/Perl"

From Organic Design wiki
m ({{perl}})
m
Line 1: Line 1:
 +
<source lang="perl">
 
#!/usr/bin/perl -w
 
#!/usr/bin/perl -w
# {{perl}}
+
#
 
# Liscenced under LGPL: www.gnu.org/copyleft/lesser.html
 
# Liscenced under LGPL: www.gnu.org/copyleft/lesser.html
 
#
 
#
Line 425: Line 426:
 
print localtime()." : $where : $entry\n";
 
print localtime()." : $where : $entry\n";
 
}
 
}
 +
</source>
 +
[[Category:PERL]]

Revision as of 02:53, 23 April 2020

#!/usr/bin/perl -w
#
# Liscenced under LGPL: www.gnu.org/copyleft/lesser.html
#
# Nad - 2003 - SIC-Games network layer
#
use IO::Socket;
use IO::Select;
use Cwd;
use strict;

# Main Functions
sub sendMessage;
sub processMessage;
sub processOutgoingEvents;
sub updatePotentialPeers;
sub getActivePeers;
sub node;
sub pathToHashRef;
sub parent;
sub leaf;
sub guid;
sub logAdd;

use constant CREATE => '__<createGuid>__';

# NODAL CORE
# Nodes are hashes containing other properties, functionRefs and other hashRefs
# Nodes can contain array items called 'queue' which is a list of its currently executing nodes
# Each of the items in the queue are either a functionRef or a hashRef (to another node with its own queue)
# Queues are rings/circular because refs shift off the bottom and push onto the top
# executed functions return ref to function/hash which is to be pushed on to replace them (ie what they become)
# function execution takes place relative to the passed cwd (the currently executing node)
# one function in the tree will execute for every iteration of the outside loop
# the inside loop walks the hash struct rotating each queue until an actual function is executed

# Three test threads each of three sequencial functions
# - Each simply prints its name and returns a reference to the next in the thread
# - In reality the functions would be acting on their local environment (the global $cwd hashRef)
# - and what's returned can be not only a function, but another hashRef or void if its finished
our ($A,$B,$C, $P,$Q,$R, $X,$Y,$Z);
$A = sub {print "A"; return $B;};
$B = sub {print "B"; return $C;};
$C = sub {print "C"; return 0;};

$P = sub {print "P"; return $Q;};
$Q = sub {print "Q"; return $R;};
$R = sub {print "R"; return 0;};

$X = sub {print "X"; return $Y;};
$Y = sub {print "Y"; return $Z;};
$Z = sub {print "Z"; return 0;};

# The execution tree is set up with the root containing thread-ABC and another node which contains threads PQR & XYZ
# - So PQR and XYZ execute in parallel and as a whole they execute in parallel with ABC
my $someNode = {queue => [$P,$X]};
our $root = {queue => [$A,$someNode]};
our $now;
if (0) {
while (1) {
	$now = $root;
	for (my $i = 1; $i == 1;) {
		my $queue = $$now{'queue'};
		if ($#$queue >= 0) {
			my $todo = shift @$queue;
			push @$queue, $_ if $_ = ref $todo eq 'CODE' ? ($i = &$todo) : ($now = $todo);
			}
		else	{$i = 0}
		}
	}

exit;
}
# ______________________________________________________________________________________________________________________ #

# Determine if running on linux from leading slash in $cwd
our $cwd = cwd;
$cwd =~ s/\//\\/g unless $cwd =~ /^\//;
$cwd =~ s/[\\\/]$//g;

# Main shared data-structure
# - NOTE: No time to implement this in P2P, using basic event-queue - %events

our $publicSpace = \{};
our $thisRoot = "Sic";						# root in hash for this project/script

# The port that this event listens for incoming messages on
#our $thisPort = node '/private/this-port';
our $thisPort = 2012;

# We don't know thisPeer until our IP is known, but that is only known (currently) from incoming
# - defined on reception of incoming message which includes this IP in the 'to' key
# - added in 'from' key of outgoing message
# - format: peer-IP:Port or interface-Guid
our $thisPeer = undef;

# Stores actual current potentialPeers (as used by IO:Select)
# - indexed by fileno(handle) called $stream
# - activePeers are a hash because it stores many local peer properties
our %activePeers;
our $activePeersChanged = 0;

# All activePeers found by all so far
# - the principle is propagate what we know works, don't propagate what we know fails
# - potentialPeers is a list because it only knows basic connection info
our @potentialPeers = ();

# Next peer after us in potentialPeerList
# - another reason potenialPeerList is best as a list not a hash
# - Currently, all potentialPeers form a ring of communication
our $nextPeer = undef;

# Queue of unprocessed events
# - message content is always events
our %events;

# A unique guid is sent with our requestAccess so we don't talk to ourselves
# - we can use this to discover our own IP as we discover our IP on first incoming request
our $thisGuid = guid;

#our $logPattern = node('/private/log-pattern');
our $logPattern = '.*';

# Initialise server on listening $thisPort
our $server = new IO::Socket::INET Listen=>1, LocalPort=>$thisPort, Proto=>'tcp';
our $select = new IO::Select $server;
logAdd "================================================================================";

# Attempt to send connection message to each potentialPeer
# - we preceed them with "peer-" so they can be valid XML tags
# - version info is sent on new connection
#sendMessage "peer-$_" for split /,/, node '/private/potential-peers';
sleep(5);
sendMessage "peer-$_" for split /,/, '192.168.7.1:2012,192.168.7.2:2012,192.168.7.3:2012,192.168.7.4:2012';

# ______________________________________________________________________________________________________________________ #
# MAIN SERVER LOOP

while (1) {

	# Handle output potentialPeers?
	# - this may be needed if outgoing msg is larger than recipients' input-buffer
	#   for $handle ($select->can_write) {syswrite($fh, $obuffer);}

	# Handle input potentialPeers
	for my $handle ($select->can_read) {
		my $stream = fileno $handle;
		if ($handle == $server) {
			# NEW: handle is the server, set up a new peer
			# - but it can't be identified until its msg processed
			my $newhandle = $server->accept;
			$stream = fileno $newhandle;
			$select->add($newhandle);
			$activePeers{$stream}{'buffer'} = '';
			$activePeers{$stream}{'handle'} = $newhandle;
			logAdd "New connection: Stream$stream", 'main/new';
			}
		elsif (sysread $handle, my $input, 10000) {
			# DATA: handle is an existing stream with data to read (NOTE: we should disconnect after vertain size limit)
			# - Process (and remove) all complete (null-terminated) messages from this potentialPeers buffer
			$activePeers{$stream}{'buffer'} .= $input;
			if ($activePeers{$stream}{'buffer'} =~ s/^(.*\x00)//s)
				{processMessage $_, $handle for split /\x00/, $1}
			}
		else {
			# DEL: handle is an existing stream with no more data to read
			my $peer;
			if (defined $activePeers{$stream}{'interface'}) {
				# If its an interface disconnecting, create a removal event
				$peer = $activePeers{$stream}{'interface'};
				$events{&guid} = "now,interfaces.deleteInterface,$peer";
				}
			else {$peer = $activePeers{$stream}{'peer'}}
			# Remove from IO:Select, activePeers and PotentialPeers
			$select->remove($handle);
			delete $activePeers{$stream};
			$activePeersChanged = 1;
			@potentialPeers = grep {(index $_, $peer) < 0} @potentialPeers;
			$handle->close;
			logAdd "$peer disconnected.", 'main/del';
			}
		}
		
	# Send current events (and changes to activePeers) to potentialPeers
	updatePotentialPeers if $activePeersChanged;
	logAdd 'activePeersChanged', 'main' if $activePeersChanged;
	processOutgoingEvents;

	# Periodically write to cache.xml
	#XMLout($publicSpace, outputfile=>$cacheFile, keyattr=>[], xmldecl=>1, rootname=>'cache', noattr=>1);

	}
#END
# ______________________________________________________________________________________________________________________ #

# Encode and send message to a peer by IP:port
# - all message content is events (nodal commands)
# - if there recipient is not an activePeer, then a new connection is attempted
# - version info is sent with new connections incase our data is out-of-date
# - all our activePeers are sent with every message
# - format is md5(24chr),subject&content(mime64)\x00
sub sendMessage {
	my $peer = shift;
	my $content = shift;
	my $handle;
	logAdd "sendMessage($peer)", 'send';
	$content = '' unless defined $content;
	# Add peer info to outgoing message
	my $subject = "to=$peer&port=$thisPort&add=peer&guid=$thisGuid";
	$subject .= "&from=$thisPeer" if defined $thisPeer;
	# Add all our active-peers' names to outgoing message (not interfaces)
	my $peers = join ',', getActivePeers;
	$subject .= "&peers=$peers" if $peers;
	logAdd "   $_", 'send' for split /&/, $subject;
	# Get handle from peer name (this interface/peer thing is messy!)
	for (keys %activePeers) {
		my $p = exists $activePeers{$_}{'peer'} ? $activePeers{$_}{'peer'} : $activePeers{$_}{'interface'};
		$handle = $activePeers{$_}{'handle'} if $p eq $peer;
		}
	# If no handle found, try establishing a new stream
	unless (defined $handle) {
		$peer =~ /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+$)/; # get rid of "peer-" bit
		$handle = IO::Socket::INET->new(PeerAddr=>$1, Proto=>'tcp');
		return logAdd "   Couldn't establish connection to $peer!", 'send' unless defined $handle;
		# Connected to new peer, add to local activePeers
		my $stream = fileno $handle;
		$activePeers{$stream}{'buffer'} = '';
		$activePeers{$stream}{'handle'} = $handle;
		$activePeers{$stream}{'peer'} = $peer;
		$activePeersChanged = 1;
		$select->add($handle);
		logAdd "   Stream$stream is connection to $peer", 'send';
		}
	# Send message
	print $handle "$subject|$content\0";
	}
# ______________________________________________________________________________________________________________________ #

# Decode and execute received message
# - executes received events (nodal commands)
# - replies with updates if local is more recent
# - Discovers own IP from incoming 'to' field if still unknown
# - Disconnects stream if it is us
# - Add sender to our activePeers
# - Add senders' activePeers to our potentialPeers
sub processMessage {
	my $data = shift;
	my $handle = shift;
	my $stream = fileno $handle;
	my %msg;
	my $peer;
	logAdd 'processMessage()', 'main';
	# Decode message
	(my $subject, $msg{'events'}) = split /[|]/, $data;
	$msg{$1} = /(.+)=(.*)/ ? $2 : /(.+)/ for split /&/, $subject;
	logAdd "   $_", 'recv' for split /&/, $subject;
	# Set self IP incase not known yet
	$thisPeer = $msg{'to'} unless defined $thisPeer;
	# Identify and add sender if 'add' field found
	if (exists $msg{'add'}) {
		my $add = $msg{'add'};
		if ($add eq 'interface') {
			# Sender is an interface, initiate an addInterface event for interfaces
			$peer = "interface$thisPeer-".$msg{'guid'};
			$peer =~ s/peer//;
			$events{&guid} = "now,interfaces.addInterface,$peer";
			}
		else {
			# Sender is a peer
			my ($iport, $iaddr) = sockaddr_in getpeername $handle;
			$peer = 'peer-'.inet_ntoa($iaddr).':'.$msg{'port'};
			}
		# If peer is self, disconnect and exit (before adding as activePeer)
		if ($msg{'guid'} eq $thisGuid) {
			logAdd "   Stream$stream identified as us, disconnecting...", 'recv';
			$select->remove($handle);
			delete $activePeers{$stream};
			return $handle->close;
			}
		# Add sender to our activePeers...
		unless (exists $activePeers{$stream}{$add}) {
			logAdd "   Stream$stream identified as $peer ($add)", 'recv';
			$activePeers{$stream}{$add} = $peer;
			if ($add eq 'peer') {
				$activePeersChanged = 1;
				# ...and senders' activePeers to our potentialPeers
				updatePotentialPeers $msg{'peers'} if exists $msg{'peers'};
				}
			}
		}
	# Update event queue with any new events (content) received (discarded if already seen)
	for (split /&/, $msg{'events'}) {
		/(.+)=(.*)$/;
		unless (exists $events{$1}) {
			$events{$1} = $2;
			#logAdd "new event received:$1 = $2", 'recv';
			}
		}	
	# Interface logging
	logAdd $msg{'log'}, 'SWF-'.uc(substr($activePeers{$stream}{'interface'}, -6)) if exists $msg{'log'};
	# Interface requests peer connection
	sendMessage "peer-".$msg{'connect'} if exists $msg{'connect'};
	}
	
# ______________________________________________________________________________________________________________________ #

sub processOutgoingEvents {
	my @msg = ();
	my $qs;
	# Build a query-string of the queued-events
	while (my ($k, $v) = each %events) {
		if ($v) { # adds to qs if not already processed (=0)
			push @msg, "$k=$v";
			logAdd "sending event:$k = $v", 'main';
			$events{$k} = 0;
			}
		}
	$qs = join '&', @msg;
	# If nextPeer defined, send events to next
	if (defined $nextPeer) {
		sendMessage $nextPeer, $qs if $activePeersChanged or $#msg >= 0;
		$activePeersChanged = 0;
		}
	# Also send queued changes directly to all local interfaces
	if ($#msg >= 0 ) {
		sendMessage $activePeers{$_}{'interface'}, $qs for grep exists $activePeers{$_}{'interface'}, keys %activePeers;
		}
	}

# Update potential peers from activePeers and sent comma-separated-list of peers
# - The list is sorted so that all peers can form a ring with their nextPeer's
# - called when $activePeers changed or new options arrive
sub updatePotentialPeers {
	my $addPeers = shift;
	my %peers = ();
	map $peers{$_} = 1, split /,/, $addPeers if defined $addPeers;
	$peers{$_} = 1 for @potentialPeers;
	$peers{$_} = 1 for getActivePeers;
	$peers{$thisPeer} = 1 if defined $thisPeer;
	@potentialPeers = map $_, sort grep /^peer/, keys %peers;
	logAdd "      $_", 'UPP' for @potentialPeers;
	# update nextPeer (actually use previous cos can use -1 array index)
	return unless defined $thisPeer;
	my $this = undef;
	for (0..$#potentialPeers) {$this = $_ if $potentialPeers[$_] eq $thisPeer}
	$nextPeer = ($#potentialPeers>0 and defined $this) ? $potentialPeers[--$this] : undef;
	logAdd "      Next:$nextPeer", 'UPP' if defined $nextPeer;
	}

sub getActivePeers {
	return map $activePeers{$_}{'peer'}, grep exists $activePeers{$_}{'peer'}, keys %activePeers;
	}

# %$publicSpace
# - currently path is a unique guid representing the event
# - hash-ref: use $$publicSpace{ or %$publicSpace or $publicSpace->{
# - network-tree maintained by peer-network
# - uses XML::Simple for external-file and internal-hash of network
sub node {
	my $path = shift;
	my $cmd = shift;
	my $data = shift;
	my $node = leaf $path;
	# Execute the nodal command
	if ($cmd eq 'set') {
		# Set node value, (create path if doesn't exist)
		my $ptr = pathToHashRef $publicSpace, parent("$thisRoot$path"), 1;
		return if $$ptr{$node} eq $data;
		$$ptr{$node} = $data;
		}
	elsif ($cmd eq 'delete') {
		# Delete node or value, return 0 silently if node not found
		my $ptr = pathToHashRef $publicSpace, parent "$thisRoot$path";
		return 0 unless defined $$ptr{$node};
		delete $$ptr{$node};
		}
	else {
		# Default is to return the node content or undef
		my $ptr = pathToHashRef $publicSpace, parent "$thisRoot$path";
		return $$ptr{$node};
		}
	}

# Use standard /-separated path-string to access hash-tree
# - returns undef if any non-hash-ref is encountered, unless,
# - $create is defined, then the path will be created as traversed
sub pathToHashRef {
	my $ptr = shift;
	my $path = shift;
	my $create = shift;
	for (split /\//, $path) {
		if (ref $$ptr{$_} eq 'HASH')	{$ptr = $$ptr{$_}}
		elsif (defined $create)			{$ptr = $$ptr{$_} = {}}
		else 									{return}
		}
	return $ptr;
	}

# Return parent path of passed path
sub parent {
	my $path = shift;
	$path =~ s/\/[^\/]+?$//;
	return $path;
	}
	
# Return parent path of passed path
sub leaf {
	my $path = shift;
	$path =~ s/^.*\///;
	return $path;
	}
	
# Generate a guid
sub guid {
	return 'p'.(rand()+time);
	}
	
# Add an item to the output log (and print if debugging)
sub logAdd {
	my $entry = shift;
	my $where = shift;
	$where = '' unless defined $where;
	return unless $where =~ /$logPattern/;
	$where = " $where" while length $where < 10;
	print localtime()." : $where : $entry\n";
	}