Nodal-wikid.pl
<perl>
# Licensed under LGPL: www.gnu.org/copyleft/lesser.html
# NodalWikid.pl - NodalHash using wiki as persistence (before moving to peerd)
# NOTE: peerd is extremely inefficient!
# Nodes should be based on binary (see ListSpace), rather than PERL's tied hashes
# And NodalWikid.pl is even more inefficient :-)
use IO::Socket; use IO::Select;
# Package globals shared by the whole script. Each declaration sits on its own
# line so that the trailing comment cannot swallow the ones that follow it.
our $maxNodes = 1000;    # Max nodes that can fit in local runtime NodalSpace
our %space;              # The nodal hash itself (tied to NodalSpace during initialisation)
our $space;              # Reference to %space, used to call the non-TIEHASH methods
# ----------------------------------------------------------------------------------------------------------- #
# NODAL CORE CLASS
# - Converts a standard PERL hash-table into a Distributed Computational Hash Table
# - This class is "hard-wired" to act on %::space
package NodalSpace; use Scalar::Util qw( refaddr ); use Carp;
# Nodes needed for nodal operation
# - THIS and SELF are concepts that contain a node-reference
# Well-known node numbers. OBJECT, LIST and HASH are also used as the indices
# of the three aspects inside a node's internal array. One constant per line:
# a trailing comment must never be followed by another statement.
use constant OBJECT => 0;     # aka ROOT
use constant LIST   => 1;     # aka SUBJECT
use constant HASH   => 2;     # aka CONTENT
use constant HUSK   => 3;     # General Husk (class/struct of husk)
use constant THIS   => 4;     # This executing Husk (runtime of husk)
use constant PEER   => 5;     # General Peer class (class of peer)
use constant SELF   => 6;     # This peer (runtime of this peer)
use constant INIT   => 7;     # aka onCreate
use constant MAIN   => 8;
use constant EXIT   => 9;     # aka onRemove
use constant GUID   => 10;
use constant NAME   => 11;
use constant CODE   => 16;
use constant LANG   => 17;
use constant PERL   => 18;
# Bender => 19;
# Gir => 20;
# Pipi => 21;
# IronGiant => 22;
# Create the actual data structure with its three aspects
# - The internal structure is a three-element array containing scalar, listref and hashref
#   the hash aspect uses a two-element array for keys which are refs, because the actual
#   hash-key can only be the address of a ref, so it's stored as {ref-addr} = [ref-key, value]
# TIEHASH( class ) - constructor invoked by tie().
# Returns a new object blessed into the given class, with all three aspects
# initialised by CLEAR.
sub TIEHASH {
    my $class = shift;

    # Note that the internal data structure is actually an array not a hash
    my $this = [];
    bless $this, $class;
    $this->CLEAR;
    return $this;
}
# Used when a NodalHash is assigned with {} or initialised with TIEHASH
# - clears all three aspects of the NodalHash object
# CLEAR() - reset all three aspects of the node:
# the scalar aspect to an empty string, the list aspect to an empty array
# and the hash aspect to an empty hash.
sub CLEAR {
    my $this = shift;
    $$this[OBJECT] = '';    # the empty-string literal was missing here (syntax error)
    $$this[LIST]   = [];
    $$this[HASH]   = {};
    return;
}
# Return value in passed key
# - if value is a ref (which it should be) then it's blessed as a NodalHash so methods are accessible
# - remember, keys which are refs must be treated specially to preserve the key as a reference
# - the special key of -i returns the node's internal data-structure (a 3-element array)
#   the internal structure is only meant for use by other non-TIEHASH methods of the Nodal class
# - onAccess happens here, but it may not be a useful concept to adopt
# FETCH( key ) - tied-hash read.
# - the special key '-i' returns the internal three-element array object itself
# - a key which is a ref is looked up by its address
# - a value which is a ref is blessed into this object's class before being returned
# Returns undef when the key is not present.
sub FETCH {
    my ( $this, $key ) = @_;
    return $this if $key eq '-i';

    my $k   = ref $key ? refaddr $key : $key;
    my $val = exists $$this[HASH]{$k} ? $$this[HASH]{$k} : undef;

    # STORE files ref-keyed entries as a [ref-key, value] pair and plain-keyed
    # entries as the bare value, so only the ref-key case needs unwrapping.
    # (The previous code indexed [1] unconditionally and then a second time
    # for ref keys, so it never returned what STORE had saved.)
    $val = $$val[1] if ref $key and ref $val;

    return ref $val ? bless( $val, ref $this ) : $val;
}
# Store val in key of NodalHash
# - If key is a ref, then store key and val together using address as key
# - If val is a hash, tie it to this Nodal class
# STORE( key, val ) - tied-hash write.
# - a key which is a ref is filed under its address as a [ref-key, val] pair
# - a val which is a plain, untied hash is tied to this object's class first
# Returns val.
sub STORE {
    my ( $this, $key, $val ) = @_;
    my $k = ref $key ? refaddr $key : $key;
    tie %$val, ref $this if ref $val eq 'HASH' and not tied %$val;

    # Get current value
    my $known = exists $$this[HASH]{$k};
    my $cur   = $known ? $$this[HASH]{$k} : undef;
    $cur = $$cur[1] if ref $key and ref $cur;

    # If changing, update, queue for propagation and queue any onChange.
    # A key that does not exist yet always counts as a change; otherwise a
    # first store of '' or undef would compare equal to "no value" and be lost.
    if ( !$known or $val ne $cur ) {
        $$this[HASH]{$k} = ref $key ? [ $key, $val ] : $val;

        # QUEUE THE CHANGE FOR SYNC HERE (general onChange may not be useful)
    }

    return $val;
}
# Nodal Reduction
# - An item is shifted off to execute; it returns what to push back on (if anything)
# - If there is no code here, then execution goes within, and the same node is pushed back on
#   Reducible items with no code are Processes; their presence is controlled by their INIT and EXIT processes
# - If there is code, but not locally executable, then build from this language's text description
# reduce() - perform one reduction step on this node's LIST aspect.
# The first item is shifted off the list. If it has a CODE association, that
# code is run and its return value replaces the item; otherwise reduction
# recurses into the item. The item is pushed back on the list if it is a ref.
# NOTE(review): relies on ->object being an lvalue method; it is not defined
# in the visible source.
sub reduce {
    my $this = shift;

    # Work on the node's own list through its reference. The previous
    # "my @list = <arrayref>" made a one-element copy holding the ref itself,
    # so the wrong thing was shifted off and the push-back was lost.
    my $list = $this->{'-i'}[LIST];
    return unless my $node = shift @$list;

    # Constants need a leading "+" inside a hash subscript; a bare {CODE} is
    # the string 'CODE', which does not match the NodalSpace::* keys used by
    # the initialisation code in package main.
    if ( exists $$node{+CODE} ) {
        my $code = $$node{+CODE}{ $$space{ $$space{+THIS} }{+LANG} }->object;

        # NOTE(review): string-eval of stored text; only safe if node content is trusted
        $$node{+CODE}->object = eval "sub{$code}" unless $$node{+CODE}->object;

        if ( ref $$node{+CODE}->object eq 'CODE' ) {
            $node = &{ $$node{+CODE}->object };
        }
        else {
            # error: could not declare this code in local language
            # - we need to create an instance of this kind of error
        }
    }
    else {
        $node->reduce;
    }

    push @$list, $node if ref $node;
}
# DELETE( key ) - tied-hash delete.
# Removes the entry for key (filed under its address when key is a ref)
# and returns whatever was stored in that slot.
sub DELETE {
    my $this = shift;
    my $key  = shift;
    my $slot = ref $key ? refaddr $key : $key;
    return delete $this->[HASH]{$slot};
}
# EXISTS( key ) - tied-hash existence test.
# True when an entry for key (or for its address, when key is a ref) is present.
sub EXISTS {
    my $this = shift;
    my $key  = shift;
    my $slot = ref $key ? refaddr $key : $key;
    return exists $this->[HASH]{$slot};
}
# Returns ref to lambda content
# - Content is returned directly if already a reference
# STATE() - return a reference to the node's scalar aspect.
# If the stored value is already a reference it is returned as-is, otherwise
# a reference to a copy of the value is returned.
sub STATE {

    # The previous code indexed with the bareword STATE, for which no constant
    # is declared; without strict that numified to 0, i.e. the OBJECT slot.
    # NOTE(review): confirm the scalar (OBJECT) aspect is what was intended.
    my $val = shift->{'-i'}[OBJECT];
    return ref $val ? $val : \$val;
}
# STORAGE OVERVIEW
# - each storage resource is an n-node list which is filled from the start and chopped off the end
# - this allows chopped data to be aggregated and compressed into deeper archive if necessary
# - for now this can simply be done with an index file listing the node guids in order
# - the node-associations and data-content are stored in a guid-named file
# - this method can easily use FS or DB, and they can share common NodalSpace::toString and NodalSpace::fromString methods
# Writes and flushes change-buffer to local file-cache
# sync() - walk the list held under this node's 'SYNC' key and log how many
# nodes it contained. The per-node export is still a placeholder.
# NOTE(review): 'SYNC' is a plain string key here (no SYNC constant is
# declared in the visible source), and ->list is defined elsewhere.
sub sync {
    my $this     = shift;
    my @synclist = @{ $$this{SYNC}->list };

    for ( @synclist ) {
        # something
    }

    # Report the element count; $#synclist is the last index (count - 1)
    ::logAdd( scalar(@synclist) . " nodes exported." );
}
# Loads $::maxNodes from the nodal index file
# - saved from @root list
# load() - populate the nodal space from the per-peer index file and the
# per-node files.
# - "<peer>.3" lists node guids, one per line, optionally as "a:b"
# - "<guid>.3" holds a node's LIST and HASH aspects, "<guid>.1" its scalar aspect
# At most $::maxNodes nodes are held in the working list.
# NOTE(review): ->list and ->object are defined elsewhere and must be usable
# as shown (->object as an lvalue).
sub load {
    my @rootlist = @{ $space->list };

    # first loop thru maxNodes of root-list and create empty hashrefs
    # - the new hashref is a NodalHash (because assigned to a NodalHash key)
    # - don't know what to do with root a:b yet
    # The peer name lives in $::peer; a bare $peer here would be the unset
    # package variable $NodalSpace::peer.
    if ( open my $rooth, '<', "$::peer.3" ) {
        while ( @rootlist < $::maxNodes and defined( my $line = <$rooth> ) ) {
            last unless $line =~ /^(.+?)(:(.+?))?$/;
            my ( $guid, $assoc ) = ( $1, $2 );
            my $node = $$space{$guid} = {};

            # +GUID is the constant; a bare {GUID} would be the string 'GUID'
            ${ $$node{+GUID}->object } = $guid unless $assoc;
            push @rootlist, $node;
        }
        close $rooth;
    }
    else {
        ::logAdd( "Couldn't read root index: $::peer.3!" );
    }

    # now loop thru all empty nodes and populate from their associated files
    for my $node ( @rootlist ) {
        my $guid = $$node{+GUID}->object;

        # Read in nodal portion (LIST and HASH aspects)
        # NOTE(review): nodes are filed in %space under their guid above, but
        # looked up here by node reference - confirm this is intended.
        if ( open my $nodeh, '<', "$guid.3" ) {
            while ( defined( my $line = <$nodeh> ) ) {
                last unless $line =~ /^(.+?)(:(.+?))?$/;
                my ( $key, $assoc, $target ) = ( $1, $2, $3 );
                if ( $assoc ) {
                    $$space{$node}{$key} = $$space{$target};
                }
                else {
                    push @{ $$space{$node}->list }, $$space{$key};
                }
            }
            close $nodeh;
        }
        else {
            ::logAdd( "Couldn't read LIST & HASH content for: $node!" );
        }

        # Read in SCALAR aspect (from guid.1 if small and safe)
        # NOTE(review): this pattern is matched against $_, which this loop
        # never sets, and $$node dereferences the node as a scalar ref - both
        # are kept as found and need confirming.
        if ( /^([^*])+[\r\n]+\*/ ) {
            $$node->object = $1;
        }
        elsif ( open my $datah, '<', "$guid.1" ) {
            binmode $datah;
            sysread $datah, $$node->object, 1000000;
            close $datah;
        }
        else {
            ::logAdd( "Couldn't read SCALAR content for: $node!" );
        }
    }

    # Report the element count; $#rootlist is the last index (count - 1)
    ::logAdd( scalar(@rootlist) . " nodes imported." );
}
# ----------------------------------------------------------------------------------------------------------- #
# INITIALISE NODAL SPACE
package main;

# Create global %space, tie to Nodal class
# - $space is a blessed ref to %space for executing the non-TIEHASH methods of %space
tie %space, 'NodalSpace';
$space = \%space;
bless $space, 'NodalSpace';

# Load persistent content
# - this is not done in TIEHASH since only root should initialise the loading
$space->load;

# Convert $peer from name to node-ref of this peer in %space and map to SELF
# - the node number is taken from a "* Node: <digits>" line of the peer's user page
# - readFile is defined elsewhere; the parentheses make the call parse
#   whether or not it has been declared before this point
my $userpage = readFile( "User:$::peer" );
$$space{NodalSpace::SELF} = $$space{$1} if $userpage =~ /^\*\s*Node\s*:\s*([0-9]+)/im;
$$space{NodalSpace::THIS} = $space;    # this husk's runtime (env)

# $peer = $space;
# &::logAdd( 'GUID for this peer is '.$$peer{NodalSpace::GUID} );
# $$space{NodalSpace::SELF} = $$space{$peer};

# Set SELF's LANG to PERL
$$space{NodalSpace::SELF}{NodalSpace::LANG} = $$space{NodalSpace::PERL};

# Push peer's INIT onto root queue
# - when INIT is finished, root contains MAIN for this peer
push @{ $space->list }, $$space{NodalSpace::INIT};
# ----------------------------------------------------------------------------------------------------------- #
# PEERD
# - this is the single peerd-server loop running in its own thread
# - nodal reduction can be in here instead of each session handler
# - only stream message construct/extract needs to be in the individual session-handler threads
# peerd( port ) - the peerd server loop; never returns.
# Connects to server.pl on localhost:port (retrying every 10 seconds), then
# reads NUL-terminated messages from every stream it knows about:
# - a "connect-request/streamN" message from server.pl opens a new connection
#   back to server.pl for that session
# - any other message from server.pl is forwarded to all session streams
# - messages from session streams are only logged
# logAdd is defined elsewhere.
sub peerd {
    my $port = shift;

    # On startup, notify server.pl who'll preserve our handle in global scope
    my $server;
    do {
        $server = IO::Socket::INET->new( PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' );
        unless ( $server ) {
            logAdd( "Couldn't establish connection to parent, retrying in 10 seconds." );
            sleep(10);
        }
    } until ( $server );
    print $server "GET /connect/peer/peerd HTTP/1.1\r\n\r\n";

    # Initialise streams
    my $select  = IO::Select->new( $server );
    my %streams = ();
    my $sstream = fileno $server;
    $streams{$sstream}{handle} = $server;
    $streams{$sstream}{buffer} = '';

    # Main server loop (listening to server.pl and session-handlers)
    while (1) {

        # Loop through readable streams
        for my $handle ( $select->can_read(1) ) {
            my $stream = fileno $handle;

            # There is data to read on this stream, accumulate in this stream's input buffer
            if ( sysread $handle, my $input, 10000 ) {
                $streams{$stream}{buffer} .= $input;

                # Nothing to do until at least one complete message has accumulated
                next unless $streams{$stream}{buffer} =~ s/^(.*\x00)//s;

                # Remove each complete message from buffer and process
                for my $msg ( split /\x00/, $1 ) {

                    # Message is not from server.pl, must be from a session-handler
                    if ( $handle != $server ) {
                        logAdd( "Message from a session-handler: \"$msg\"" );
                        next;
                    }

                    # Message is from server.pl:
                    logAdd( "Message from server.pl: \"$msg\"" );

                    # Msg is a connect-request, so establish new stream back to server.pl
                    # - the new stream-handle will be given to a newly spawned session-handler
                    if ( $msg =~ /connect-request\/(stream[0-9]+)$/ ) {
                        my $name = $1;
                        my $ph   = IO::Socket::INET->new( PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' );
                        if ( $ph ) {
                            print $ph "GET /session/peer/peerd/$name HTTP/1.1\r\n\r\n";
                            my $pstream = fileno $ph;
                            $streams{$pstream}{handle}    = $ph;
                            $streams{$pstream}{buffer}    = '';
                            $streams{$pstream}{interface} = 1;

                            # Watch the new stream too, otherwise its messages and
                            # its disconnection would never be noticed
                            $select->add( $ph );
                            logAdd( "Connecting back to server on stream$pstream" );
                        }
                        else {
                            logAdd( "Failed to establish new connection with server.pl!" );
                        }
                    }

                    # Not connect-request, so propagate msg to session-handlers
                    else {
                        logAdd( "Forwarding msg to session-handlers..." );
                        for my $id ( keys %streams ) {
                            next unless exists $streams{$id}{interface};
                            my $sh = $streams{$id}{handle};
                            print $sh "$msg\x00";
                            logAdd( "Forwarded to stream$id" );
                        }
                    }
                }
            }

            # There is no more data to read on this stream, so close it
            else {

                # Use the lexical $select; $::select is an unrelated, unset global
                $select->remove( $handle );
                delete $streams{$stream};
                $handle->close();
                logAdd( "Stream$stream disconnected." );
            }
        }
    }
}
# ----------------------------------------------------------------------------------------------------------- #
# SESSION-HANDLER
# - A separate interfaceHandler function is spawned for each peer.swf connect request
# - a session-handler is a dedicated thread sitting between nodal-wikid.pl and a peer.swf
# - todo: this is spawned by server.pl, but that should be moved into here
# sessionHandler( peerd, interface ) - relay loop between peerd and one interface.
# Takes the two stream handles, greets the interface, then forwards every
# NUL-terminated message received from peerd to the interface. Messages from
# the interface are only logged. Closes both handles and exits the process as
# soon as either stream reports end-of-data.
# logAdd is defined elsewhere.
sub sessionHandler {
    my ( $peerd, $interface ) = @_;
    $::subname .= "/$$";
    my $pstream = fileno $peerd;
    my $istream = fileno $interface;
    logAdd( "Session-handler( peerd/stream$pstream, interface/stream$istream )" );

    # Set up a new listener for interface and parent
    my $select = IO::Select->new( $peerd, $interface );

    # Introduce ourselves to the interface we've been assigned to handle
    print $interface "Hello, I'm your session-handler, my PID is $$.\x00";

    # Server loop (listening to parent and interface streams)
    # Start from an empty hash: "= {}" would insert a stringified hashref as a key
    my %buffers;
    while (1) {
        for my $handle ( $select->can_read(1) ) {

            # One of our streams has data to read
            if ( sysread $handle, my $input, 10000 ) {
                my $stream = fileno $handle;
                $buffers{$stream} .= $input;
                next unless $buffers{$stream} =~ s/^(.*\x00)//s;

                for my $msg ( split /\x00/, $1 ) {
                    logAdd( "Msg received on stream$stream: \"$msg\"" );

                    # Forward all data from peerd to interface
                    print $interface "$msg\x00" if $handle == $peerd;
                }
            }

            # If either connection closes, close stream-handles and die
            else {
                $peerd->close();
                $interface->close();
                logAdd( 'Handles closed, Exit.' );
                exit;
            }
        }
    }
}
</perl>