Nodal-wikid.pl

From Organic Design wiki

Latest revision as of 13:30, 2 August 2017

Legacy: This article describes a concept that has been superseded in the course of ongoing development on the Organic Design wiki. Please do not develop this any further or base work on this concept; this page is now for historic record only.
# Licensed under LGPL: www.gnu.org/copyleft/lesser.html
#
# NodalWikid.pl - NodalHash using wiki as persistence (before moving to peerd)
#
# NOTE: peerd is extremely inefficient!
#       Nodes should be based on binary (see ListSpace), rather than PERL's tied hashes
#       And NodalWikid.pl is even more inefficient :-)
#
use IO::Socket;
use IO::Select;

our $maxNodes = 1000; # Max nodes that can fit in local runtime NodalSpace
our %space;
our $space;

# ----------------------------------------------------------------------------------------------------------- #
# NODAL CORE CLASS
# - Converts a standard PERL hash-table into a Distributed Computational Hash Table
# - This class is "hard-wired" to act on %::space
package NodalSpace;
use Scalar::Util qw( refaddr );
use Carp;

# Nodes needed for nodal operation
# - THIS and SELF are concepts that contain a node-reference
use constant OBJECT		=> 0;	# aka ROOT
use constant LIST		=> 1;	# aka SUBJECT
use constant HASH		=> 2;	# aka CONTENT
use constant HUSK		=> 3;	# General Husk (class/struct of husk)
use constant THIS		=> 4;	# This executing Husk (runtime of husk)
use constant PEER		=> 5;	# General Peer class (class of peer)
use constant SELF		=> 6;	# This peer (runtime of this peer)
use constant INIT		=> 7;	# aka onCreate
use constant MAIN		=> 8;
use constant EXIT		=> 9;	# aka onRemove
use constant GUID		=> 10;
use constant NAME		=> 11;
use constant CODE		=> 16;
use constant LANG		=> 17;
use constant PERL		=> 18;
# Bender			=> 19;
# Gir				=> 20;
# Pipi				=> 21;
# IronGiant			=> 22;
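
# Illustrative note: OBJECT, LIST and HASH (0-2) index each node's internal
# three-element structure (see TIEHASH below), while the higher constants act
# as well-known keys into %space, as in the bootstrap code further down:
#   $$space{NodalSpace::SELF}{NodalSpace::LANG} = $$space{NodalSpace::PERL};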

# Create the actual data structure with its three aspects
# - The internal structure is a three element array containing scalar, listref and hashref
#   the hash aspect uses a two element array for keys which are refs because the actual
#   hash-key can only be the address of a ref, so it's stored as {ref-addr} = [ref-key,value]
sub TIEHASH {
	my $this = []; # Note that the internal data structure is actually an array not a hash
	bless $this, shift;
	$this->CLEAR;
	return $this;
	}

# Used when a NodalHash is assigned with {} or initialised with TIEHASH
# - clears all three aspects of the NodalHash object
sub CLEAR {
	my $this = shift;
	$$this[OBJECT] = '';
	$$this[LIST] = [];
	$$this[HASH] = {};
	}
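
# Sketch of the internal structure after CLEAR (illustrative only):
#   $this = bless [
#       '',   # OBJECT - the node's scalar / lambda content
#       [],   # LIST   - the node's ordered associations (its queue)
#       {},   # HASH   - keyed associations; ref-keys held as [key, value]
#       ], 'NodalSpace';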

# Return value in passed key
# - if value is a ref (which it should be) then it's blessed as a NodalHash so methods are accessible
# - remember, keys which are refs must be treated specially to preserve the key as a reference
# - the special key of -i returns the nodes internal data-structure (a 3-element array)
#   the internal structure is only meant for use by other non-TIEHASH methods of the Nodal class
# - onAccess happens here, but it may not be a useful concept to adopt
sub FETCH {
	my ( $this, $key, $val ) = @_;
	return $this if $key eq -i;
	my $k = ref $key ? refaddr $key : $key;
	$val = exists $$this[HASH]{$k} ? $$this[HASH]{$k} : undef;
	$val = $$val[1] if ref $key;
	return ref $val ? bless $val, ref $this : $val;
	}

# Store val in key of NodalHash
# - If key is a ref, then store key and val together using address as key
# - If val is a hash, tie it to this Nodal class
sub STORE {
	my ( $this, $key, $val ) = @_;
	my $k = ref $key ? refaddr $key : $key;
	tie %$val, ref $this if ref $val eq 'HASH' and not tied %$val;
	# Get current value
	my $cur = exists $$this[HASH]{$k} ? $$this[HASH]{$k} : undef;
	$cur = $$cur[1] if ref $key;
	# If changing, update, queue for propagation and queue any onChange
	if ( $val ne $cur ) {
		$$this[HASH]{$k} = ref $key ? [$key, $val] : $val;
		# QUEUE THE CHANGE FOR SYNC HERE (general onChange may not be useful)
		}
	$val;
	}
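
# Usage sketch (illustrative only, never called; the sub name is hypothetical):
# how a hash behaves once tied to NodalSpace - plain keys go straight into the
# HASH aspect, reference keys round-trip via refaddr, and the LIST aspect is
# reached through the tie object.
sub _demo_tied_usage {
	my %demo;
	tie %demo, 'NodalSpace';
	$demo{colour} = 'red';                    # plain key, stored as-is
	my $refkey = {};                          # a reference used as a key ...
	$demo{$refkey} = 'value';                 # ... stored as {refaddr} = [$refkey, 'value']
	push @{ (tied %demo)->list }, 'first';    # LIST aspect is independent of the keys
	return ( $demo{colour}, $demo{$refkey} ); # FETCH gives back 'red' and 'value'
	}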

# Nodal Reduction
# - An item shifts off and executes, it returns what to push back on (if anything)
# - If there is no code here, then execution goes within, and the same node is pushed back on
#   Reducable items with no code are Processes, their presence is controlled by their INIT and EXIT processes
# - If there is code, but not locally executable, then build from this language's text description
sub reduce {
	my $list = shift->{-i}[LIST];
	return unless my $node = shift @$list;
	if ( exists $$node{CODE} ) {
		my $code = ${ $$node{CODE}{ $$space{ $$space{THIS} }{LANG} }->object };
		$$node{CODE}{-i}[OBJECT] = eval "sub{$code}" unless ref $$node{CODE}->object eq 'CODE';
		if ( ref $$node{CODE}->object eq 'CODE' ) { $node = &{ $$node{CODE}->object } }
		else {
			# error: could not declare this code in local language
			# - we need to create an instance of this kind of error
			}
		} else { $node->reduce }
	push @$list, $node if ref $node;
	}
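
# Worked example of one pass (an interpretive sketch): if the root LIST holds a
# node whose CODE association carries the text 'return 0' for the current LANG,
# the pass compiles it once into a coderef, calls it, and since it returns a
# non-reference nothing is pushed back - the item simply falls off the queue.
# A node with no CODE is treated as a Process: reduce() recurses into its own
# LIST and the node itself is pushed back on.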

sub DELETE {
	my ( $this, $key ) = @_;
	delete $$this[HASH]{ ref $key ? refaddr $key : $key };
	}

sub EXISTS {
	my ( $this, $key ) = @_;
	exists $$this[HASH]{ ref $key ? refaddr $key : $key };
	}

# Returns an array reference to the node's LIST aspect
sub list { return shift->{-i}[LIST] }

# Returns a hash reference to the node's HASH aspect
sub hash { return shift->{-i}[HASH] }

# Returns ref to lambda content
# - Content is returned directly if already a reference
sub object {
	my $val = shift->{-i}[OBJECT];
	return ref $val ? $val : \$val;
	}

# STORAGE OVERVIEW
# - each storage resource is an n-node list which is filled from the start and chopped off the end
# - this allows chopped data to be aggregated and compressed into deeper archive if necessary
# - for now this can simply be done with an index file listing the node guids in order
# - the node-associations and data-content are stored in a guid-named file
# - this method can easily use FS or DB, and they can share common NodalSpace::toString and NodalSpace::fromString methods
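
# Example layout as implied by load() below (an assumption sketched for
# clarity, not a formal spec):
#   <peer>.3  - the index: one line per root-list entry, "<guid>" or "<key>:<value>"
#   <guid>.3  - a node's LIST/HASH aspects in the same line format
#   <guid>.1  - a node's raw SCALAR aspect, read with sysread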

# Writes and flushes change-buffer to local file-cache
sub sync {
	my $this = shift;
	my @synclist = @{ $$this{SYNC}->list };
	for ( @synclist ) {
		# something
		}
	&::logAdd( "$#synclist nodes exported." );
	}
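
# (Intent assumed from the storage overview above: each node queued under SYNC
# would presumably be written back out as its <guid>.3 / <guid>.1 files and the
# <peer>.3 index refreshed; the loop body is left as a stub.)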

# Loads $::maxNodes from the nodal index file
# - saved from @root list
sub load {
	my @rootlist = @{ $space->list };

	# first loop thru maxNodes of root-list and create empty hashrefs
	# - the new hashref is a NodalHash (because assigned to a NodalHash key)
	# - don't know what to do with root a:b yet
	open ROOTH, '<', "$::peer.3";
	while ( <ROOTH> =~ /^(.+?)(:(.+?))?$/ && $#rootlist < $::maxNodes ) {
		my $node = $$space{$1} = {};
		${ $$node{GUID}->object } = $1 unless $2;
		push @rootlist, $node;
		}
	close ROOTH;

	# now loop thru all empty nodes and populate from their associated files
	for my $node ( @rootlist ) {
		my $guid = ${ $$node{GUID}->object };
		
		# Read in nodal portion (LIST and HASH aspects)
		if ( open NODEH, '<', "$guid.3" ) {
			while ( <NODEH> =~ /^(.+?)(:(.+?))?$/ )
				{ $2 ? $$space{$node}{$1} = $$space{$3} : push @{ $$space{$node}->list }, $$space{$1} }
			close NODEH
			}
		else { &::logAdd( "Couldn't read LIST & HASH content for: $node!" ) }

		# Read in SCALAR aspect (from guid.1 if small and safe)
		if ( /^([^*]+)[\r\n]+\*/ ) { ${ $node->object } = $1 }
		elsif ( open NODEH, '<', "$guid.1" ) {
			binmode NODEH;
			sysread NODEH, ${ $node->object }, 1000000;
			close NODEH;
			}
		else { &::logAdd( "Couldn't read SCALAR content for: $node!" ) }

		}

	&::logAdd( "$#rootlist nodes nodes imported." );
	}

# ----------------------------------------------------------------------------------------------------------- #
# INITIALISE NODAL SPACE

package main;

# Create global %space, tie to NodalSpace class
# - $space is a blessed ref to %space for executing the non-TIEHASH methods of %space
tie %space, 'NodalSpace';
$space = \%space;
bless $space, 'NodalSpace';
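# With the tie and the bless in place, %space is used both ways below (sketch):
#   $$space{NodalSpace::SELF}   - hash access, routed through FETCH/STORE
#   $space->list                - method access to the non-TIEHASH helpers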

# Load persistent content
# - this is not done in TIEHASH since only root should initialise the loading
$space->load;

# Convert $peer from name to node-ref of this peer in %space and map to SELF
my $userpage = readFile "User:$::peer";
$$space{NodalSpace::SELF} = $$space{$1} if $userpage =~ /^\*\s*Node\s*:\s*([0-9]+)/im;
$$space{NodalSpace::THIS} = $space; # this husk's runtime (env)
#$peer = $space;
#&::logAdd( 'GUID for this peer is '.$$peer{NodalSpace::GUID} );
#$$space{NodalSpace::SELF} = $$space{$peer};

# Set SELF's LANG to PERL
$$space{NodalSpace::SELF}{NodalSpace::LANG} = $$space{NodalSpace::PERL};

# Push peer's INIT onto root queue
# - when INIT is finished, root contains MAIN for this peer
push @{ $space->list }, $$space{NodalSpace::INIT};

# ----------------------------------------------------------------------------------------------------------- #
# PEERD
# - this is the single peerd-server loop running in its own thread
# - nodal reduction can be in here instead of each session handler
# - only stream message construct/extract needs to be in the individual session-handler threads
sub peerd {

	my $port = shift;

	# On startup, notify server.pl who'll preserve our handle in global scope
	my $server;
	do {
		$server = new IO::Socket::INET PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp';
		unless ( $server ) {
			logAdd "Couldn't establish connection to parent, retrying in 10 seconds.";
			sleep(10);
			}
		} until ( $server );
	print $server "GET /connect/peer/peerd HTTP/1.1\r\n\r\n";

	# Initialise streams
	my $select = IO::Select->new( $server );
	my %streams = ();
	$streams{fileno $server}{handle} = $server;
	$streams{fileno $server}{buffer} = '';

	# Main server loop (listening to server.pl and session-handlers)
	while(1) {

		# Loop through readable streams
		for my $handle ( $select->can_read(1) ) {
			my $stream = fileno $handle;

			# There is data to read on this stream, accumulate in this streams input buffer
			if ( sysread $handle, my $input, 10000 ) {
				$streams{$stream}{buffer} .= $input;

				# At least one complete message has accumulated
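				# Messages are NUL-delimited (framing sketch): a buffer holding
				# "msg1\x00msg2\x00part" yields "msg1" and "msg2" below, while the
				# incomplete "part" stays buffered until more data arrives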
				if ( $streams{$stream}{buffer} =~ s/^(.*\x00)//s ) {

					# Remove each complete message from buffer and process
					for my $msg ( split /\x00/, $1 ) {

						# Message is from server.pl:
						if ( $handle == $server ) {
							logAdd "Message from server.pl: \"$msg\"";

							# Msg is a connect-request, so establish new stream back to server.pl
							# - the new stream-handle will be given to a newly spawned session-handler
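							# - e.g. a message ending in "connect-request/stream7" results in a new
							#   socket announced back to server.pl as "GET /session/peer/peerd/stream7"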
							if ( $msg =~ /connect-request\/(stream[0-9]+)$/ ) {
								if ( my $ph = new IO::Socket::INET PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' ) {
									print $ph "GET /session/peer/peerd/$1 HTTP/1.1\r\n\r\n";
									my $pstream = fileno $ph;
									$streams{$pstream}{handle} = $ph;
									$streams{$pstream}{buffer} = '';
									$streams{$pstream}{interface} = 1;
									logAdd "Connecting back to server on stream$pstream";
									}
								else { logAdd "Failed to establish new connection with server.pl!" }
								}

							# Not connect-request, so propagate msg to session-handlers
							else {
								logAdd "Forwarding msg to session-handlers...";
								for ( keys %streams ) {
									my $sh = $streams{$_}{handle};
									print $sh "$msg\x00" if exists $streams{$_}{interface};
									logAdd "Forwarded to stream$_" if exists $streams{$_}{interface};
									}
								}
							}

						# Message is not from server.pl, must be from a session-handler
						else {
							logAdd "Message from a session-handler: \"$msg\"";
							}
						}
					}
				}

			# There is no more data to read on this stream, so close it
			else {
				$select->remove( $handle );
				delete $streams{$stream};
				$handle->close();
				logAdd "Stream$stream disconnected.";
				}

			}
		}
	}

# ----------------------------------------------------------------------------------------------------------- #
# SESSION-HANDLER
# - A separate interfaceHandler function is spawned for each peer.swf connect request
# - a session-handler is a dedicated thread sitting between nodal-wikid.pl and a peer.swf
# - todo: this is spawned by server.pl, but that should be moved into here
sub sessionHandler {

	my( $peerd, $interface ) = @_;
	$::subname .= "/$$";
	my $pstream = fileno $peerd;
	my $istream = fileno $interface;
	logAdd "Session-handler( peerd/stream$pstream, interface/stream$istream )";

	# Set up a new listener for interface and parent
	my $select = IO::Select->new( $peerd, $interface );

	# Introduce ourselves to the interface we've been assigned to handle
	print $interface "Hello, I'm your session-handler, my PID is $$.\x00";

	# Server loop (listening to parent and interface streams)
	my %buffers = ();
	while(1) {
		for my $handle ( $select->can_read(1) ) {

			# One of our streams has data to read
			if ( sysread $handle, my $input, 10000 ) {
				my $stream = fileno $handle;
				$buffers{$stream} .= $input;
				if ( $buffers{$stream} =~ s/^(.*\x00)//s ) {
					for ( split /\x00/, $1 ) {
						logAdd "Msg received on stream$stream: \"$_\"";
						# Forward all data from peerd to interface
						print $interface "$_\x00" if $handle == $peerd;
						}
					}
				}

			# If either connection closes, close both stream-handles and exit
			else {
				$peerd->close();
				$interface->close();
				logAdd 'Handles closed, Exit.';
				exit;
				}

			}
		}
	}