Nodal-wikid.pl

From Organic Design wiki
Revision as of 13:30, 2 August 2017 by Nad (talk | contribs)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Legacy.svg Legacy: This article describes a concept that has been superseded in the course of ongoing development on the Organic Design wiki. Please do not develop this any further or base work on this concept; it is only useful as a historic record of work done. You may find a link to the currently used concept or function in this article; if not, you can contact the author to find out what has taken the place of this legacy item.
# Licensed under LGPL: www.gnu.org/copyleft/lesser.html
#
# NodalWikid.pl - NodalHash using wiki as persistence (before moving to peerd)
#
# NOTE: peerd is extremely inefficient!
#       Nodes should be based on binary (see ListSpace), rather than PERL's tied hashes
#       And NodalWikid.pl is even more inefficient :-)
#
use IO::Socket;
use IO::Select;

# Package globals shared by the NodalSpace class and the main script below
# - $maxNodes caps how many nodes load() reads from the index file
# - %space is the hash that gets tied to NodalSpace further down
# - $space is assigned a ref to %space further down and blessed so methods can be called on it
our $maxNodes = 1000; # Max nodes that can fit in local runtime NodalSpace
our %space;
our $space;

# ----------------------------------------------------------------------------------------------------------- #
# NODAL CORE CLASS
# - Converts a standard PERL hash-table into a Distributed Computational Hash Table
# - This class is "hard-wired" to act on %::space
package NodalSpace;
use Scalar::Util qw( refaddr );
use Carp;

# Nodes needed for nodal operation
# - THIS and SELF are concepts that contain a node-reference
# - OBJECT, LIST and HASH are also used as the indexes of the three slots of a node's
#   internal array (see CLEAR)
# - NOTE: Perl auto-quotes a simple bareword inside a hash subscript, so $$node{GUID} uses the
#   string 'GUID' as key, not the number 10; write $$node{+GUID} or $$node{GUID()} to get the
#   constant's value (package-qualified names such as NodalSpace::GUID are not auto-quoted)
use constant OBJECT		=> 0;	# aka ROOT
use constant LIST		=> 1;	# aka SUBJECT
use constant HASH		=> 2;	# aka CONTENT
use constant HUSK		=> 3;	# General Husk (class/struct of husk)
use constant THIS		=> 4;	# This executing Husk (runtime of husk)
use constant PEER		=> 5;	# General Peer class (class of peer)
use constant SELF		=> 6;	# This peer (runtime of this peer)
use constant INIT		=> 7;	# aka onCreate
use constant MAIN		=> 8;
use constant EXIT		=> 9;	# aka onRemove
use constant GUID		=> 10;
use constant NAME		=> 11;
use constant CODE		=> 16;
use constant LANG		=> 17;
use constant PERL		=> 18;
# Ids 19-22 are only reserved here as comments; no constants are declared for them
# Bender			=> 19;
# Gir				=> 20;
# Pipi				=> 21;
# IronGiant			=> 22;

# Constructor invoked by tie()
# - a node is stored as a blessed three-slot array (scalar, listref, hashref), not as a hash
# - inside the hash slot, keys which are refs are kept as {ref-addr} = [ref-key,value] because
#   a real hash-key can only hold the address of a ref
# - the slots are initialised by CLEAR; the new object is returned
sub TIEHASH {
	my $class = shift;
	my $node = bless [], $class;
	$node->CLEAR;
	return $node;
	}

# Reset a node to its empty state
# - called when a NodalHash is assigned with {} and from TIEHASH
# - the scalar aspect becomes '', the list aspect a new empty array, the hash aspect a new empty hash
sub CLEAR {
	my ( $node ) = @_;
	$node->[OBJECT] = '';
	$node->[LIST] = [];
	$node->[HASH] = {};
	}

# Return value in passed key
# - if value is a ref (which it should be) then it's blessed as a NodalHash so methods are accessible
# - remember, keys which are refs must be treated specially to preserve the key as a reference
# - the special key of -i returns the nodes internal data-structure (a 3-element array)
#   the internal structure is only meant for use by other non-TIEHASH methods of the Nodal class
# - onAccess happens here, but it may not be a useful concept to adopt
# - returns undef when the key is not present
sub FETCH {
	my ( $this, $key ) = @_;
	return $this if $key eq '-i';
	my $k = ref $key ? refaddr( $key ) : $key;
	my $val;
	if ( exists $$this[HASH]{$k} ) {
		# Mirror STORE's layout: plain keys hold the value itself,
		# ref keys hold a [ref-key, value] pair so unwrap exactly once
		my $entry = $$this[HASH]{$k};
		$val = ref $key ? $$entry[1] : $entry;
		}
	return ref $val ? bless( $val, ref $this ) : $val;
	}

# Store val in key of NodalHash
# - If key is a ref, then store key and val together using address as key
# - If val is a hash, tie it to this Nodal class
# - The slot is written when the key is new or the value differs from the current one
#   (a new key is always created, even for '' or undef)
# - Returns the value that was passed in
sub STORE {
	my ( $this, $key, $val ) = @_;
	my $k = ref $key ? refaddr( $key ) : $key;
	tie %$val, ref $this if ref $val eq 'HASH' and not tied %$val;
	# Get current value (unwrap the [ref-key, value] pair for ref keys)
	my $known = exists $$this[HASH]{$k};
	my $cur = $known ? $$this[HASH]{$k} : undef;
	$cur = $$cur[1] if $known and ref $key;
	# A missing key counts as a change, otherwise '' and undef could never be stored
	my $changed = !$known
		|| ( defined $val xor defined $cur )
		|| ( defined $val && $val ne $cur );
	# If changing, update, queue for propagation and queue any onChange
	if ( $changed ) {
		$$this[HASH]{$k} = ref $key ? [ $key, $val ] : $val;
		# QUEUE THE CHANGE FOR SYNC HERE (general onChange may not be useful)
		}
	return $val;
	}

# Nodal Reduction
# - An item is shifted off the node's list to execute, it returns what to push back on (if anything)
# - If there is no code here, then execution goes within, and the same node is pushed back on
#   Reducible items with no code are Processes, their presence is controlled by their INIT and EXIT processes
# - If there is code, but not locally executable, then build from this language's text description
# - NOTE(review): the object method used below is not defined in this file and is assigned to,
#   so it is presumably an lvalue accessor for a node's scalar aspect - confirm
sub reduce {
	# Work on the node's own list (by reference) so the shift and push below really alter the queue
	my $list = shift->{-i}[LIST];
	my $node = shift @$list;
	return unless $node;
	# The unary plus stops Perl auto-quoting the constants into the strings 'CODE', 'THIS', 'LANG'
	if ( exists $$node{+CODE} ) {
		my $code = $$node{+CODE}{ $$space{ $$space{+THIS} }{+LANG} }->object;
		# NOTE(review): string-eval of source text held in the node; only safe if node content is trusted
		$$node{+CODE}->object = eval "sub{$code}" unless $$node{+CODE}->object;
		if ( ref( $$node{+CODE}->object ) eq 'CODE' ) { $node = &{ $$node{+CODE}->object } }
		else {
			# error: could not declare this code in local language
			# - we need to create an instance of this kind of error
			}
		}
	else { $node->reduce }
	push @$list, $node if ref $node;
	}

# Remove key from the hash aspect and return the value it held
# - keys which are refs are looked up by address, and their stored [ref-key, value] pair
#   is unwrapped so the caller gets the value, as with plain keys
sub DELETE {
	my ( $this, $key ) = @_;
	my $k = ref $key ? refaddr( $key ) : $key;
	my $gone = delete $$this[HASH]{$k};
	return ( ref( $key ) && ref( $gone ) eq 'ARRAY' ) ? $$gone[1] : $gone;
	}

# True if key is present in the hash aspect
# - keys which are refs are looked up by their address
sub EXISTS {
	my $this = shift;
	my $key = shift;
	my $slot = ref $key ? refaddr( $key ) : $key;
	return exists $$this[HASH]{$slot};
	}

# Returns ref to lambda content
# - Content is returned directly if already a reference
# - The content is read from the scalar aspect (slot OBJECT) of the node's internal array;
#   no STATE constant is declared in this file, so the slot is named explicitly rather than
#   relying on a bareword being numified to index 0
sub STATE {
	my $val = shift->{-i}[OBJECT];
	return ref $val ? $val : \$val;
	}

# STORAGE OVERVIEW
# - each storage resource is an n-node list which is filled from the start and chopped off the end
# - this allows chopped data to be aggregated and compressed into deeper archive if necessary
# - for now this can simply be done with an index file listing the node guids in order
# - the node-associations and data-content are stored in guid-named file
# - this method can easily use FS or DB, and they can share common NodalSpace::toString and NodalSpace::fromString methods

# Writes and flushes change-buffer to local file-cache
# - reads the list held under this node's SYNC key and logs how many nodes it contains
# - the per-node export itself is not implemented yet
# - NOTE(review): the list method is not defined in this file - confirm where it comes from
sub sync {
	my $this = shift;
	my @synclist = @{ $$this{SYNC}->list };
	for my $node ( @synclist ) {
		# something
		}
	# Report the number of nodes, not the last index (which is one less, and -1 when empty)
	&::logAdd( scalar( @synclist ) . " nodes exported." );
	}

# Loads up to $::maxNodes nodes from the nodal index file
# - saved from @root list
# - the index file is "<peer>.3"; each node then has a "<guid>.3" file (LIST and HASH aspects)
#   and a "<guid>.1" file (SCALAR aspect)
# - failures to open a file are logged and loading carries on
# - NOTE(review): the list and object methods called below are not defined in this file - confirm
sub load {
	my @rootlist = @{ $space->list };

	# first loop thru maxNodes of root-list and create empty hashrefs
	# - the new hashref is a NodalHash (because assigned to a NodalHash key)
	# - don't know what to do with root a:b yet
	# - the peer name is a main-package global, so it must be qualified from inside this package
	if ( open my $rooth, '<', "$::peer.3" ) {
		# limit on the node count (not the last index) so at most $::maxNodes nodes are held
		while ( <$rooth> =~ /^(.+?)(:(.+?))?$/ && @rootlist < $::maxNodes ) {
			# copy the captures before calling anything else
			my ( $guid, $assoc ) = ( $1, $2 );
			my $node = $$space{$guid} = {};
			# unary plus: use the GUID constant's value as key, not the string 'GUID'
			# NOTE(review): on a freshly created node this key is still unset - confirm intent
			${ $$node{+GUID}->object } = $guid unless $assoc;
			push @rootlist, $node;
			}
		close $rooth;
		}
	else { &::logAdd( "Couldn't read root index: $::peer.3!" ) }

	# now loop thru all empty nodes and populate from their associated files
	for my $node ( @rootlist ) {
		my $guid = $$node{+GUID}->object;

		# Read in nodal portion (LIST and HASH aspects)
		# - "a:b" lines are associations, plain "a" lines are list items
		# NOTE(review): nodes were stored above under their guid, yet are looked up here by node-ref - confirm
		if ( open my $nodeh, '<', "$guid.3" ) {
			while ( <$nodeh> =~ /^(.+?)(:(.+?))?$/ ) {
				my ( $key, $assoc, $target ) = ( $1, $2, $3 );
				if ( $assoc ) { $$space{$node}{$key} = $$space{$target} }
				else { push @{ $$space{$node}->list }, $$space{$key} }
				}
			close $nodeh;
			}
		else { &::logAdd( "Couldn't read LIST & HASH content for: $node!" ) }

		# Read in SCALAR aspect (from guid.1 if small and safe)
		# NOTE(review): this pattern is matched against $_, which nothing in this sub sets - confirm
		if ( /^([^*])+[\r\n]+\*/ ) { $$node->object = $1 }
		elsif ( open my $datah, '<', "$guid.1" ) {
			binmode $datah;
			sysread $datah, $$node->object, 1000000;
			close $datah;
			}
		else { &::logAdd( "Couldn't read SCALAR content for: $node!" ) }

		}

	# Report the number of nodes, not the last index
	&::logAdd( scalar( @rootlist ) . " nodes imported." );
	}

# ----------------------------------------------------------------------------------------------------------- #
# INITIALISE NODAL SPACE

package main;

# Create global %space, tie to Nodal class
# - $space is a blessed ref to %space for executing the non-TIEHASH methods of %space
tie %space, 'NodalSpace';
$space = \%space;
bless $space, 'NodalSpace';

# Load persistent content
# - this is not done in TIEHASH since only root should initialise the loading
$space->load;

# Convert $peer from name to node-ref of this peer in %space and map to SELF
# - the node id is taken from a "* Node: <digits>" line of the text returned for "User:<peer>"
# - if no such line is found, SELF is left unset
# - NOTE(review): readFile is not defined in this file; presumably supplied by the host script - confirm
my $userpage = readFile "User:$::peer";
$$space{NodalSpace::SELF} = $$space{$1} if $userpage =~ /^\*\s*Node\s*:\s*([0-9]+)/im;
$$space{NodalSpace::THIS} = $space; # this husks runtime (env)
#$peer = $space;
#&::logAdd( 'GUID for this peer is '.$$peer{NodalSpace::GUID} );
#$$space{NodalSpace::SELF} = $$space{$peer};

# Set SELF's LANG to PERL
# - the package-qualified constant names are not auto-quoted, so the keys here are the numeric ids
$$space{NodalSpace::SELF}{NodalSpace::LANG} = $$space{NodalSpace::PERL};

# Push peer's INIT onto root queue
# - when INIT is finished, root contains MAIN for this peer
# - NOTE(review): the list method is not defined in this file - confirm where it comes from
push @{ $space->list }, $$space{NodalSpace::INIT};

# ----------------------------------------------------------------------------------------------------------- #
# PEERD
# - this is the single peerd-server loop running in its own thread
# - nodal reduction can be in here instead of each session handler
# - only stream message construct/extract needs to be in the individual session-handler threads
# - takes the local TCP port of server.pl as its only argument and never returns
# - messages on every stream are terminated by a NUL byte (\x00)
sub peerd {

	my $port = shift;

	# On startup, notify server.pl who'll preserve our handle in global scope
	# - keep retrying every 10 seconds until the connection is made
	my $server;
	until ( $server ) {
		$server = IO::Socket::INET->new( PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' );
		unless ( $server ) {
			logAdd( "Couldn't establish connection to parent, retrying in 10 seconds." );
			sleep(10);
			}
		}
	print $server "GET /connect/peer/peerd HTTP/1.1\r\n\r\n";

	# Initialise streams
	# - %streams is keyed by file-number and holds each stream's handle and input buffer
	my $select = IO::Select->new( $server );
	my %streams = ();
	$streams{fileno $server}{handle} = $server;
	$streams{fileno $server}{buffer} = '';

	# Main server loop (listening to server.pl and session-handlers)
	while(1) {

		# Loop through readable streams
		for my $handle ( $select->can_read(1) ) {
			my $stream = fileno $handle;

			# There is data to read on this stream, accumulate in this streams input buffer
			if ( sysread $handle, my $input, 10000 ) {
				$streams{$stream}{buffer} .= $input;

				# At least one complete message has accumulated
				# - any trailing partial message stays in the buffer for the next read
				if ( $streams{$stream}{buffer} =~ s/^(.*\x00)//s ) {

					# Remove each complete message from buffer and process
					for my $msg ( split /\x00/, $1 ) {

						# Message is from server.pl:
						if ( $handle == $server ) {
							logAdd( "Message from server.pl: \"$msg\"" );

							# Msg is a connect-request, so establish new stream back to server.pl
							# - the new stream-handle will be given to a newly spawned session-handler
							if ( $msg =~ /connect-request\/(stream[0-9]+)$/ ) {
								my $ph = IO::Socket::INET->new( PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' );
								if ( $ph ) {
									print $ph "GET /session/peer/peerd/$1 HTTP/1.1\r\n\r\n";
									my $pstream = fileno $ph;
									$streams{$pstream}{handle} = $ph;
									$streams{$pstream}{buffer} = '';
									$streams{$pstream}{interface} = 1;
									# Watch the new stream too, otherwise nothing sent on it is ever
									# read and the session-handler branch below can never run
									$select->add( $ph );
									logAdd( "Connecting back to server on stream$pstream" );
									}
								else { logAdd( "Failed to establish new connection with server.pl!" ) }
								}

							# Not connect-request, so propagate msg to session-handlers
							else {
								logAdd( "Forwarding msg to session-handlers..." );
								for my $id ( keys %streams ) {
									next unless exists $streams{$id}{interface};
									my $sh = $streams{$id}{handle};
									print $sh "$msg\x00";
									logAdd( "Forwarded to stream$id" );
									}
								}
							}

						# Message is not from server.pl, must be from a session-handler
						else {
							logAdd( "Message from a session-handler: \"$msg\"" );
							}
						}
					}
				}

			# There is no more data to read on this stream, so close it
			# - use the lexical $select created above (there is no package-level $::select)
			else {
				$select->remove( $handle );
				delete $streams{$stream};
				$handle->close();
				logAdd( "Stream$stream disconnected." );
				}

			}
		}
	}

# ----------------------------------------------------------------------------------------------------------- #
# SESSION-HANDLER
# - A separate interfaceHandler function is spawned for each peer.swf connect request
# - a session-handler is a dedicated thread sitting between nodal-wikid.pl and a peer.swf
# - todo: this is spawned by server.pl, but that should be moved into here
# - takes the peerd stream-handle and the interface stream-handle
# - never returns: exits the process when either stream closes
sub sessionHandler {

	my( $peerd, $interface ) = @_;
	$::subname .= "/$$";
	my $pstream = fileno $peerd;
	my $istream = fileno $interface;
	logAdd( "Session-handler( peerd/stream$pstream, interface/stream$istream )" );

	# Set up a new listener for interface and parent
	my $select = IO::Select->new( $peerd, $interface );

	# Introduce ourselves to the interface we've been assigned to handle
	print $interface "Hello, I'm your session-handler, my PID is $$.\x00";

	# Server loop (listening to parent and interface streams)
	# - %buffers holds the unprocessed input of each stream, keyed by file-number
	#   (must start as an empty list; assigning {} would add a stray hashref-stringified key)
	my %buffers = ();
	while(1) {
		for my $handle ( $select->can_read(1) ) {

			# One of our streams has data to read
			if ( sysread $handle, my $input, 10000 ) {
				my $stream = fileno $handle;
				$buffers{$stream} .= $input;
				# Process every complete (NUL-terminated) message, keep any partial one buffered
				if ( $buffers{$stream} =~ s/^(.*\x00)//s ) {
					for my $msg ( split /\x00/, $1 ) {
						logAdd( "Msg received on stream$stream: \"$msg\"" );
						# Forward all data from peerd to interface
						print $interface "$msg\x00" if $handle == $peerd;
						}
					}
				}

			# If either connection close, close stream-handle and die
			else {
				$peerd->close();
				$interface->close();
				logAdd( 'Handles closed, Exit.' );
				exit;
				}

			}
		}
	}