# Nodal-wikid.pl
# From Organic Design wiki
# Licensed under LGPL: www.gnu.org/copyleft/lesser.html
#
# NodalWikid.pl - NodalHash using wiki as persistence (before moving to peerd)
#
# NOTE: peerd is extremely inefficient!
# Nodes should be based on binary (see ListSpace), rather than PERL's tied hashes
# And NodalWikid.pl is even more inefficient :-)
#
use IO::Socket;
use IO::Select;
our $maxNodes = 1000; # Max nodes that can fit in local runtime NodalSpace
our %space;
our $space;
# ----------------------------------------------------------------------------------------------------------- #
# NODAL CORE CLASS
# - Converts a standard PERL hash-table into a Distributed Computational Hash Table
# - This class is "hard-wired" to act on %::space
package NodalSpace;
use Scalar::Util qw( refaddr );
use Carp;
# Well-known node identifiers needed for nodal operation
# - OBJECT, LIST and HASH also serve as the indexes of the three aspects
#   inside a node's internal array (see TIEHASH and CLEAR)
# - THIS and SELF are concepts that contain a node-reference
use constant {
    OBJECT => 0,     # aka ROOT
    LIST   => 1,     # aka SUBJECT
    HASH   => 2,     # aka CONTENT
    HUSK   => 3,     # General Husk (class/struct of husk)
    THIS   => 4,     # This executing Husk (runtime of husk)
    PEER   => 5,     # General Peer class (class of peer)
    SELF   => 6,     # This peer (runtime of this peer)
    INIT   => 7,     # aka onCreate
    MAIN   => 8,
    EXIT   => 9,     # aka onRemove
    GUID   => 10,
    NAME   => 11,
    CODE   => 16,
    LANG   => 17,
    PERL   => 18,
};
# Bender => 19;
# Gir => 20;
# Pipi => 21;
# IronGiant => 22;
# Constructor used by tie(): builds the node's real data structure
# - the internal structure is a three element array (not a hash) holding the
#   scalar, list-ref and hash-ref aspects at indexes OBJECT, LIST and HASH
# - keys which are refs are kept in the hash aspect as {ref-addr} = [ref-key, value],
#   because a real hash-key can only hold the address of a ref
# - the class name is the only argument; returns the blessed array-ref
sub TIEHASH {
    my $class = shift;
    my $this  = bless [], $class;

    # Start with all three aspects empty
    $this->CLEAR;
    return $this;
}
# Resets a NodalHash, called when it is assigned with {} or set up by TIEHASH
# - empties all three aspects: scalar becomes '', list a new [], hash a new {}
sub CLEAR {
    my ( $this ) = @_;
    $this->[OBJECT] = '';
    $this->[LIST]   = [];
    $this->[HASH]   = {};
}
# Return value in passed key
# - if value is a ref (which it should be) then it's blessed into this class so methods are accessible
# - remember, keys which are refs must be treated specially to preserve the key as a reference:
#   STORE keeps them as {ref-addr} = [ref-key, value], so only those entries are unwrapped here;
#   values stored under plain keys are returned as they are
# - the special key of -i returns the node's internal data-structure (a 3-element array)
#   the internal structure is only meant for use by other non-TIEHASH methods of the Nodal class
# - a key that is not present yields undef
# - onAccess happens here, but it may not be a useful concept to adopt
sub FETCH {
    my ( $this, $key ) = @_;
    return $this if $key eq '-i';

    my $k = ref $key ? refaddr $key : $key;
    my $val;
    if ( exists $$this[HASH]{$k} ) {
        my $entry = $$this[HASH]{$k};

        # Mirror STORE: only ref-keyed entries are wrapped in a [key, value] pair
        $val = ref $key ? $$entry[1] : $entry;
    }
    return ref $val ? bless( $val, ref $this ) : $val;
}
# Store val in key of NodalHash and return val
# - If key is a ref, then store key and val together using address as key: {ref-addr} = [ref-key, value]
# - If val is an untied plain hash, tie it to this Nodal class
#   NOTE(review): tie hides whatever the hash already contained, so only empty hashes
#   survive intact here - confirm that callers never pass populated hashes
# - A key that does not exist yet always counts as a change, so storing '' or undef
#   still creates the key; undef and '' are treated as different values
sub STORE {
    my ( $this, $key, $val ) = @_;
    my $k = ref $key ? refaddr $key : $key;
    tie %$val, ref $this if ref $val eq 'HASH' and not tied %$val;

    # Get current value (unwrapping the [key, value] pair used for ref keys)
    my $known = exists $$this[HASH]{$k};
    my $cur   = $known ? $$this[HASH]{$k} : undef;
    $cur = $$cur[1] if $known and ref $key;

    # Decide whether anything changes; undef is only equal to undef
    my $changed = !$known
        || ( defined $val xor defined $cur )
        || ( defined $val && $val ne $cur );

    # If changing, update, queue for propagation and queue any onChange
    if ( $changed ) {
        $$this[HASH]{$k} = ref $key ? [ $key, $val ] : $val;
        # QUEUE THE CHANGE FOR SYNC HERE (general onChange may not be useful)
    }
    return $val;
}
# Nodal Reduction
# - An item shifts off the front of this node's list to execute, and returns what to push back on (if anything)
# - If there is no code here, then execution goes within, and the same node is pushed back on
#   Reducible items with no code are Processes, their presence is controlled by their INIT and EXIT processes
# - If there is code, but not locally executable, then build from this language's text description
# - Works on the node's real LIST aspect (not a copy), so the shift and the push-back are
#   visible to later reductions
# - Returns nothing when the list is empty
# NOTE(review): inside {...} the barewords CODE, THIS and LANG are plain string keys, not the
#   numeric constants declared in this package - confirm which is intended
# NOTE(review): ->object is not defined in this file and is assigned to, so it is presumably
#   an lvalue accessor defined elsewhere - confirm
sub reduce {
    my $self = shift;
    my $list = $self->{-i}[LIST];
    return unless my $node = shift @$list;
    if ( exists $$node{CODE} ) {

        # Compile the source text for the current language unless a compiled object already exists
        my $code = $$node{CODE}{ $$space{ $$space{THIS} }{LANG} }->object;
        $$node{CODE}->object = eval "sub{$code}" unless $$node{CODE}->object;

        # Run the code; whatever it returns replaces the node that gets pushed back
        if ( ref $$node{CODE}->object eq 'CODE' ) { $node = &{ $$node{CODE}->object } }
        else {
            # error: could not declare this code in local language
            # - we need to create an instance of this kind of error
        }
    }
    else { $node->reduce }
    push @$list, $node if ref $node;
}
# Remove key from the hash aspect and return the value it held (undef if it was absent)
# - ref keys are looked up by address, as in STORE
# - ref-keyed entries are stored as [ref-key, value]; only the value part is returned,
#   so delete agrees with what FETCH would have returned
sub DELETE {
    my ( $this, $key ) = @_;
    my $gone = delete $$this[HASH]{ ref $key ? refaddr $key : $key };
    return ( ref($key) && ref($gone) eq 'ARRAY' ) ? $$gone[1] : $gone;
}
# True if key is present in the hash aspect
# - ref keys are looked up by their address, matching the way STORE files them
sub EXISTS {
    my $this = shift;
    my $key  = shift;
    my $slot = ref $key ? refaddr $key : $key;
    return exists $this->[HASH]{$slot};
}
# Returns ref to lambda content (the node's scalar aspect, held at index OBJECT)
# - Content is returned directly if already a reference
# - Otherwise a reference to the node's own slot is returned (not to a copy),
#   so assigning through it changes the node
sub STATE {
    my $node = shift->{-i};
    return ref $node->[OBJECT] ? $node->[OBJECT] : \$node->[OBJECT];
}
# STORAGE OVERVIEW
# - each storage resource is an n-node list which is filled from the start and chopped off the end
# - this allows chopped data to be aggregated and compressed into deeper archive if necessary
# - for now this can simply be done with an index file listing the node guids in order
# - the node-associations and data-content are stored in guid-named file
# - this method can easily use FS or DB, and they can share common NodalSpace::toString and NodalSpace::fromString methods
# Writes and flushes change-buffer to local file-cache
# - the export itself is still a placeholder; only the summary is logged
# - logs the number of nodes in the SYNC list (the element count, not the last index)
# NOTE(review): ->list is not defined in this file; presumably it returns an array-ref - confirm
sub sync {
    my $this = shift;
    my @synclist = @{ $$this{SYNC}->list };
    for my $node ( @synclist ) {
        # something
    }
    main::logAdd( scalar(@synclist) . ' nodes exported.' );
}
# Loads nodes from the nodal index file "<peer>.3", up to $::maxNodes in the root list
# - saved from @root list
# - a missing/unreadable index file is logged instead of being silently ignored
# - the final log line reports the number of nodes in the local root list
# NOTE(review): ->list and ->object are not defined in this file; they are presumably
#   accessors for the LIST and scalar aspects defined elsewhere - confirm
# NOTE(review): inside {...} the bareword GUID is the string key 'GUID', not the numeric
#   constant declared in this package - confirm which is intended
# NOTE(review): @rootlist is a copy of the root list, so nodes pushed here do not reach
#   the list returned by $space->list - confirm intended
sub load {
    my @rootlist = @{ $space->list };

    # first loop thru the index and create empty hashrefs until maxNodes is reached
    # - the new hashref is a NodalHash (because assigned to a NodalHash key)
    # - don't know what to do with root a:b yet
    # - the loop also ends at the first line that doesn't match (e.g. a blank line)
    if ( open my $rooth, '<', "$::peer.3" ) {
        while ( <$rooth> =~ /^(.+?)(:(.+?))?$/ && @rootlist < $::maxNodes ) {
            my $node = $$space{$1} = {};
            ${ $$node{GUID}->object } = $1 unless $2;
            push @rootlist, $node;
        }
        close $rooth;
    }
    else { main::logAdd( "Couldn't read nodal index for: $::peer!" ) }

    # now loop thru all empty nodes and populate from their associated files
    for my $node ( @rootlist ) {
        my $guid = $$node{GUID}->object;

        # Read in nodal portion (LIST and HASH aspects)
        # - "a:b" lines become hash associations, plain "a" lines are appended to the list
        if ( open my $nodeh, '<', "$guid.3" ) {
            while ( <$nodeh> =~ /^(.+?)(:(.+?))?$/ ) {
                $2 ? $$space{$node}{$1} = $$space{$3} : push @{ $$space{$node}->list }, $$space{$1}
            }
            close $nodeh;
        }
        else { main::logAdd( "Couldn't read LIST & HASH content for: $node!" ) }

        # Read in SCALAR aspect (from guid.1 if small and safe)
        # NOTE(review): this pattern is matched against $_, which nothing in this sub sets - confirm
        if ( /^([^*])+[\r\n]+\*/ ) { $$node->object = $1 }
        elsif ( open my $nodeh, '<', "$guid.1" ) {
            binmode $nodeh;
            sysread $nodeh, $$node->object, 1000000;
            close $nodeh;
        }
        else { main::logAdd( "Couldn't read SCALAR content for: $node!" ) }
    }
    main::logAdd( scalar(@rootlist) . ' nodes imported.' );
}
# ----------------------------------------------------------------------------------------------------------- #
# INITIALISE NODAL SPACE
package main;
# Create global %space, tie to Nodal class
# - $space is a blessed ref to %space for executing the non-TIEHASH methods of %space
#   (e.g. $space->load below)
tie %space, 'NodalSpace';
$space = \%space;
bless $space, 'NodalSpace';
# Load persistent content
# - this is not done in TIEHASH since only root should initialise the loading
$space->load;
# Convert $peer from name to node-ref of this peer in %space and map to SELF
# - the node id is taken from a "* Node: <digits>" line of the peer's user page
# - readFile is not defined in this file; presumably it returns the page text - confirm
# NOTE(review): if the page has no such line, SELF is not mapped before LANG is set on it below - confirm
my $userpage = readFile "User:$::peer";
$$space{NodalSpace::SELF} = $$space{$1} if $userpage =~ /^\*\s*Node\s*:\s*([0-9]+)/im;
$$space{NodalSpace::THIS} = $space; # this husk's runtime (env)
# Earlier approach, kept for reference (disabled):
#$peer = $space;
#&::logAdd( 'GUID for this peer is '.$$peer{NodalSpace::GUID} );
#$$space{NodalSpace::SELF} = $$space{$peer};
# Set SELF's LANG to PERL
$$space{NodalSpace::SELF}{NodalSpace::LANG} = $$space{NodalSpace::PERL};
# Push peer's INIT onto root queue
# - when INIT is finished, root contains MAIN for this peer
# NOTE(review): ->list is not defined in this file; presumably it returns the root list's array-ref - confirm
push @{ $space->list }, $$space{NodalSpace::INIT};
# ----------------------------------------------------------------------------------------------------------- #
# PEERD
# - this is the single peerd-server loop running in its own thread
# - nodal reduction can be in here instead of each session handler
# - only stream message construct/extract needs to be in the individual session-handler threads
# - takes the port of server.pl on localhost; never returns
# - messages on every stream are terminated by a NUL (\x00) byte
# - each session stream opened back to server.pl is added to the select set, so that
#   messages from session-handlers are read and closed sessions are cleaned up
sub peerd {
    my $port = shift;

    # On startup, notify server.pl who'll preserve our handle in global scope
    # - keep retrying every 10 seconds until the connection succeeds
    my $server;
    until ( $server ) {
        $server = IO::Socket::INET->new( PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' );
        unless ( $server ) {
            logAdd( "Couldn't establish connection to parent, retrying in 10 seconds." );
            sleep(10);
        }
    }
    print $server "GET /connect/peer/peerd HTTP/1.1\r\n\r\n";

    # Initialise streams (keyed by file number; each has a handle and an input buffer)
    my $select  = IO::Select->new( $server );
    my %streams = ();
    $streams{ fileno $server }{handle} = $server;
    $streams{ fileno $server }{buffer} = '';

    # Main server loop (listening to server.pl and session-handlers)
    while (1) {

        # Loop through readable streams
        for my $handle ( $select->can_read(1) ) {
            my $stream = fileno $handle;

            # There is data to read on this stream, accumulate in this stream's input buffer
            if ( sysread $handle, my $input, 10000 ) {
                $streams{$stream}{buffer} .= $input;

                # At least one complete message has accumulated
                if ( $streams{$stream}{buffer} =~ s/^(.*\x00)//s ) {

                    # Remove each complete message from buffer and process
                    for my $msg ( split /\x00/, $1 ) {

                        # Message is from server.pl:
                        if ( $handle == $server ) {
                            logAdd( "Message from server.pl: \"$msg\"" );

                            # Msg is a connect-request, so establish new stream back to server.pl
                            # - the new stream-handle will be given to a newly spawned session-handler
                            if ( $msg =~ /connect-request\/(stream[0-9]+)$/ ) {
                                my $requested = $1;
                                my $ph = IO::Socket::INET->new( PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' );
                                if ( $ph ) {
                                    print $ph "GET /session/peer/peerd/$requested HTTP/1.1\r\n\r\n";
                                    my $pstream = fileno $ph;
                                    $streams{$pstream}{handle}    = $ph;
                                    $streams{$pstream}{buffer}    = '';
                                    $streams{$pstream}{interface} = 1;

                                    # Listen on the new stream too, otherwise its messages and
                                    # its disconnection would never be noticed
                                    $select->add( $ph );
                                    logAdd( "Connecting back to server on stream$pstream" );
                                }
                                else { logAdd( "Failed to establish new connection with server.pl!" ) }
                            }

                            # Not connect-request, so propagate msg to session-handlers
                            else {
                                logAdd( "Forwarding msg to session-handlers..." );
                                for my $id ( keys %streams ) {
                                    next unless exists $streams{$id}{interface};
                                    my $sh = $streams{$id}{handle};
                                    print $sh "$msg\x00";
                                    logAdd( "Forwarded to stream$id" );
                                }
                            }
                        }

                        # Message is not from server.pl, must be from a session-handler
                        else {
                            logAdd( "Message from a session-handler: \"$msg\"" );
                        }
                    }
                }
            }

            # There is no more data to read on this stream, so close it
            # - uses the select set created above (not a global of the same name)
            else {
                $select->remove( $handle );
                delete $streams{$stream};
                $handle->close();
                logAdd( "Stream$stream disconnected." );
            }
        }
    }
}
# ----------------------------------------------------------------------------------------------------------- #
# SESSION-HANDLER
# - A separate interfaceHandler function is spawned for each peer.swf connect request
# - a session-handler is a dedicated thread sitting between nodal-wikid.pl and a peer.swf
# - todo: this is spawned by server.pl, but that should be moved into here
# - takes the stream-handle to peerd and the stream-handle to the interface
# - messages are NUL (\x00) terminated; everything received is logged, and messages
#   arriving from peerd are forwarded to the interface
# - never returns: exits the process when either stream closes
sub sessionHandler {
    my ( $peerd, $interface ) = @_;
    $::subname .= "/$$";
    my $pstream = fileno $peerd;
    my $istream = fileno $interface;
    logAdd( "Session-handler( peerd/stream$pstream, interface/stream$istream )" );

    # Set up a new listener for interface and parent
    my $select = IO::Select->new( $peerd, $interface );

    # Introduce ourselves to the interface we've been assigned to handle
    print $interface "Hello, I'm your session-handler, my PID is $$.\x00";

    # Server loop (listening to parent and interface streams)
    # - input buffers are keyed by file number and start out empty
    my %buffers = ();
    while (1) {
        for my $handle ( $select->can_read(1) ) {

            # One of our streams has data to read
            if ( sysread $handle, my $input, 10000 ) {
                my $stream = fileno $handle;
                $buffers{$stream} .= $input;

                # Process every complete message that has accumulated
                if ( $buffers{$stream} =~ s/^(.*\x00)//s ) {
                    for my $msg ( split /\x00/, $1 ) {
                        logAdd( "Msg received on stream$stream: \"$msg\"" );

                        # Forward all data from peerd to interface
                        print $interface "$msg\x00" if $handle == $peerd;
                    }
                }
            }

            # If either connection closes, close both stream-handles and die
            else {
                $peerd->close();
                $interface->close();
                logAdd( 'Handles closed, Exit.' );
                exit;
            }
        }
    }
}