# Livelets.pl
# From Organic Design wiki
#!/usr/bin/perl
# LiveWiki Extension
# - See http://www.mediawiki.org/wiki/Extension:LiveWiki for installation and usage details
# - Licenced under LGPL (http://www.gnu.org/copyleft/lesser.html)
use IO::Socket;
use IO::Select;
use MIME::Base64;
# Daemonise
# - stdin is detached to /dev/null and stdout/stderr are appended to the peer's log file
# - the parent exits after the fork, the child carries on as the daemon in a new session
# - a failed redirect only warns, so the daemon still starts when the log is unavailable
open STDIN,  '<',  '/dev/null'      or warn "Can't read /dev/null: $!";
open STDOUT, '>>', "../$::peer.log" or warn "Can't append to ../$::peer.log: $!";
open STDERR, '>>', "../$::peer.log" or warn "Can't append to ../$::peer.log: $!";
defined ( my $pid = fork ) or die "Can't fork: $!";
exit if $pid;
# setsid is called fully-qualified: as a bareword it silently evaluates to the true
# string "setsid" (and does nothing) whenever POSIX has not been imported
require POSIX;
my $sid = POSIX::setsid();
die "Can't start a new session: $!" if !defined($sid) || $sid < 0;
umask 0;
# ----------------------------------------------------------------------------------------------------------- #
# SOCKET SERVER
# - %::streams maps a stream number (fileno) to its handle and input buffer
%::streams = ();
# Port is the next command-line argument, defaulting to 80
# - "shift or 80" would bind as "($::port = shift) or 80" and never apply the default
$::port = shift() || 80;
$0 = "$::daemon: $::peer (http$::port)";
my $subname = $::subname;
# Initialise server listening on our port, retrying every 10 seconds until the bind succeeds
until ( $::server = IO::Socket::INET->new(
	Listen    => 1,
	LocalPort => $::port,
	Proto     => 'tcp',
	ReuseAddr => 1,
) ) {
	logAdd( "Failed to bind to Port$::port, waiting 10 seconds..." );
	sleep(10);
}
$::select = IO::Select->new( $::server );
logAdd( "Listening on port $::port" );
# Main server loop
while (1) {
	for my $handle ( $::select->can_read(1) ) {
		my $stream = fileno $handle;
		$::subname = "$subname/stream$stream";
		# Handle is the server, set up a new stream
		if ( $handle == $::server ) {
			my $newhandle = $::server->accept;
			# accept can fail (e.g. the client went away), don't register an undefined handle
			unless ( $newhandle ) {
				logAdd( "Failed to accept connection: $!" );
				next;
			}
			$stream = fileno $newhandle;
			$::select->add( $newhandle );
			$::streams{$stream}{buffer} = '';
			$::streams{$stream}{handle} = $newhandle;
			logAdd( "New connection: Stream$stream", 'main/new' );
		}
		# Handle is an existing stream with data to read
		# NOTE: we should disconnect after certain size limit
		# - Process (and remove) all complete messages from this peer's buffer
		# - a message ends with a blank line, optionally followed by a null byte
		elsif ( sysread $handle, my $input, 10000 ) {
			$::streams{$stream}{buffer} .= $input;
			if ( $::streams{$stream}{buffer} =~ s/^(.*\r?\n\r?\n\x00?)//s ) {
				for my $msg ( split /\r?\n\r?\n\x00?/, $1 ) {
					processMessage( $stream, $msg );
				}
			}
		}
		# Handle is an existing stream with no more data to read (EOF or read error)
		else {
			$::select->remove( $handle );
			delete $::streams{$stream};
			$handle->close();
			logAdd( "Stream$stream disconnected." );
		}
	}
}
# ----------------------------------------------------------------------------------------------------------- #
# PROCESS INCOMING MESSAGE
# Handle one complete request read from a stream
# - $stream is the stream number (key into %::streams), $msg is the raw request text
# - requests look like "GET /title/content-type HTTP/x.y"; the content-type part selects the handler:
#   cmd...     run command($title), requires HTTP Basic authentication
#   peer/TYPE  inter-process message, no HTTP response is sent
#   application/x-shockwave-flash, swf, or anything else: serve a file or generated HTML
# - all other requests get an HTTP response written back to the stream's handle
sub processMessage {
	my ( $stream, $msg ) = @_;
	my $handle  = $::streams{$stream}{handle};
	my $headers = '';
	my $http    = '200 OK';
	my $respond = 1;
	# HTTP dates must be expressed in GMT, not in the server's local timezone
	require POSIX;
	my $date    = POSIX::strftime( "%a, %d %b %Y %H:%M:%S GMT", gmtime );
	my $article = $::deny;
	# Extract info from the HTTP GET request
	# - falls back to the peer's own name as title when the request line can't be parsed
	my ( $title, $ct, $ver ) =
		$msg =~ /GET\s+\/+(.*?)(\/(.+))?\s+(HTTP\/[0-9.]+)/ ? ( $1, $3, $4 ) : ( $::peer, '', 'HTTP/1.1' );
	# No content-type part in the URL leaves the capture undefined
	$ct = '' unless defined $ct;
	# Only commands need to authenticate
	# - the credential capture must allow the full base64 alphabet; \w+ would cut it
	#   short at the first "+", "/" or "=" and reject valid credentials
	my $authorised = 1;
	if ( $ct =~ /^cmd/ ) {
		$authorised = ( $msg =~ /Authorization: Basic ([A-Za-z0-9+\/=]+)/
			and decode_base64($1) eq "$::peer:$::pwd1" ) ? 1 : 0;
	}
	# If request authenticates service it, else return 401
	if ( $authorised ) {
		# Process a command
		if ( $ct =~ /^cmd(.*)/ ) {
			my $mode = $1;
			logAdd( "Executing $title command." );
			$article = '<pre>'.command($title).'</pre>';
			# Anything but cmd/raw gets wrapped in the HTML template below
			$ct = '' if $mode ne '/raw';
		}
		# Process request from other peer or peer-related process
		# - doesn't return an HTTP response
		elsif ( $ct =~ /^peer\/(.*)/ ) {
			my $type = $1;
			logAdd( "PEER command \"$title\" received from an instance of \"$type\"" );
			# peerd-child is doing initial connection, keep handle in global scope
			# - peerd could talk back here over this stream, but doesn't currently
			if ( $title eq 'connect' and $type eq 'peerd' ) {
				$::peerdHandle = $handle;
			}
			# Forward any commands from XmlWiki to peerd unchanged
			elsif ( $type eq 'notify-peer.php' ) {
				print {$::peerdHandle} "$title/$type\x00" if defined $::peerdHandle;
			}
			# SWF initiating connection, ask peerd to connect back on a new stream for a session
			elsif ( $title eq 'connect' and $type eq 'interface' ) {
				# Send stream-id of swf, so we know which one is wanted by the connection back from peerd
				print {$::peerdHandle} "connect-request/stream$stream\x00" if defined $::peerdHandle;
				# Remove the handle from this (server.pl) context, but don't close it
				# - also keep our local %stream info to give to session-handler
				$::select->remove( $handle );
			}
			# peerd is connecting back from session request, so we can now spawn the session-handler
			elsif ( $title eq 'session' and $type =~ /^peerd\/stream([0-9]+)/ ) {
				my $istream = $1;
				if ( exists $::streams{$istream} ) {
					# Spawn a new session-handler child and pass the peerd and peer.swf stream-handles
					spawn( 'sessionHandler', $handle, $::streams{$istream}{handle} );
					# We can remove the interface from %streams now too
					delete $::streams{$istream};
				}
				# Don't spawn a handler with an undefined interface handle
				else {
					logAdd( "Session requested for unknown stream$istream, ignored." );
				}
			}
			# These peer requests don't want HTTP returned currently
			$respond = 0;
		}
		# Process request for raw SWF
		elsif ( $ct eq 'application/x-shockwave-flash' ) {
			$headers .= "Content-Disposition: inline;filename=$::peer.swf\r\n";
			logAdd( "SWF movie \"$title\" requested" );
			$article = readFile( $title );
		}
		# Process request for SWF embedded in HTML
		elsif ( $ct eq 'swf' ) {
			my $width  = 640;
			my $height = 480;
			my $file   = "/$::peer.swf/application/x-shockwave-flash";
			$article = "<object classid=\"clsid:D27CDB6E-AE6D-11cf-96B8-444553540000\"
codebase=\"http://download.macromedia.com/pub/shockwave/cabs/flash/swflash.cab#version=6,0,0,0\"
width=\"$width\" height=\"$height\" id=\"$file\" align=\"\"
type=\"application/x-shockwave-flash\" data=\"$file\">
<param name=\"movie\" value=\"$file\">
<param name=\"quality\" value=\"high\">
<param name=\"bgcolor\" value=\"cccccc\">
<embed src=\"$file\" quality=\"high\" bgcolor=\"cccccc\"
width=\"$width\" height=\"$height\" align=\"\"
name=\"$file\" type=\"application/x-shockwave-flash\"
pluginspage=\"http://www.macromedia.com/go/getflashplayer\" />
</object>";
			$ct = '';
		}
		# If still non processed, treat request as local filename to return
		else {
			logAdd( "$title($ct) requested" );
			$article = readFile( $title );
		}
		# Render article (an empty content-type means "wrap in the HTML template")
		# NOTE(review): $title comes straight from the request and is inserted unescaped
		unless ( $ct ) {
			$ct = 'text/html';
			my $tmp = $::template;
			$tmp =~ s/<!-- title -->/$title/g;
			$tmp =~ s/<!-- content -->/ wikiParse( $article ) /e;
			$article = $tmp;
		}
	}
	else { $http = "401 Authorization Required\r\nWWW-Authenticate: Basic realm=\"private\"" }
	# Send response back to requestor
	if ( $respond ) {
		$headers  = "$ver $http\r\nDate: $date\r\nServer: $::daemon\r\n$headers";
		$headers .= "Content-Type: $ct\r\n";
		$headers .= "Connection: close\r\n";
		$headers .= "Content-Length: ".(length $article)."\r\n";
		print {$handle} "$headers\r\n$article";
	}
}
# ----------------------------------------------------------------------------------------------------------- #
# STREAM THREAD
# - this is the single peerd-server loop running in its own thread
# - nodal reduction can be in here instead of each session handler
# - only stream message construct/extract needs to be in the individual session-handler threads
# peerd server loop
# - $port is the local port server.pl is listening on
# - connects to server.pl (retrying every 10 seconds), announces itself, then loops forever
#   reading null-terminated messages from server.pl and from session streams
# - a "connect-request/streamN" message opens a new connection back to server.pl for a session,
#   any other message from server.pl is forwarded to every session stream
sub peerd {
	my $port = shift;
	# On startup, notify server.pl who'll preserve our handle in global scope
	my $server;
	until ( $server = IO::Socket::INET->new( PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' ) ) {
		logAdd( "Couldn't establish connection to parent, retrying in 10 seconds." );
		sleep(10);
	}
	print {$server} "GET /connect/peer/peerd HTTP/1.1\r\n\r\n";
	# Initialise streams
	my $select  = IO::Select->new( $server );
	my %streams = ();
	$streams{fileno $server}{handle} = $server;
	$streams{fileno $server}{buffer} = '';
	# Main server loop (listening to server.pl and session-handlers)
	while (1) {
		# Loop through readable streams
		for my $handle ( $select->can_read(1) ) {
			my $stream = fileno $handle;
			# There is data to read on this stream, accumulate in this streams input buffer
			if ( sysread $handle, my $input, 10000 ) {
				$streams{$stream}{buffer} .= $input;
				# Nothing to do until at least one complete message has accumulated
				next unless $streams{$stream}{buffer} =~ s/^(.*\x00)//s;
				# Remove each complete message from buffer and process
				for my $msg ( split /\x00/, $1 ) {
					# Message is not from server.pl, must be from a session-handler
					if ( $handle != $server ) {
						logAdd( "Message from a session-handler: \"$msg\"" );
						next;
					}
					logAdd( "Message from server.pl: \"$msg\"" );
					# Msg is a connect-request, so establish new stream back to server.pl
					# - the new stream-handle will be given to a newly spawned session-handler
					if ( $msg =~ /connect-request\/(stream[0-9]+)$/ ) {
						my $target = $1;
						my $ph = IO::Socket::INET->new( PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' );
						if ( $ph ) {
							print {$ph} "GET /session/peer/peerd/$target HTTP/1.1\r\n\r\n";
							my $pstream = fileno $ph;
							$streams{$pstream}{handle}    = $ph;
							$streams{$pstream}{buffer}    = '';
							$streams{$pstream}{interface} = 1;
							# Listen on the new stream too, otherwise messages from the
							# session-handler and its disconnection would never be seen
							$select->add( $ph );
							logAdd( "Connecting back to server on stream$pstream" );
						}
						else { logAdd( "Failed to establish new connection with server.pl!" ) }
					}
					# Not connect-request, so propagate msg to session-handlers
					else {
						logAdd( "Forwarding msg to session-handlers..." );
						for my $id ( keys %streams ) {
							next unless exists $streams{$id}{interface};
							print { $streams{$id}{handle} } "$msg\x00";
							logAdd( "Forwarded to stream$id" );
						}
					}
				}
			}
			# There is no more data to read on this stream, so close it
			# - remove from our own select set (not server.pl's global $::select)
			else {
				$select->remove( $handle );
				delete $streams{$stream};
				$handle->close();
				logAdd( "Stream$stream disconnected." );
			}
		}
	}
}
# ----------------------------------------------------------------------------------------------------------- #
# SESSION-HANDLER
# - A separate sessionHandler function is spawned for each peer.swf connect request
# - a session-handler is a dedicated thread sitting between nodal-wikid.pl and a peer.swf
# - todo: this is spawned by server.pl, but that should be moved into here
# Session-handler loop
# - $peerd and $interface are the two stream handles this handler sits between
# - greets the interface, then relays every null-terminated message arriving from peerd to it
# - messages arriving from the interface are only logged
# - never returns: closes both handles and exits the process as soon as either stream ends
sub sessionHandler {
	my ( $peerd, $interface ) = @_;
	$::subname .= "/$$";
	my $pstream = fileno $peerd;
	my $istream = fileno $interface;
	logAdd( "Session-handler( peerd/stream$pstream, interface/stream$istream )" );
	# Set up a new listener for interface and parent
	my $select = IO::Select->new( $peerd, $interface );
	# Introduce ourselves to the interface we've been assigned to handle
	print {$interface} "Hello, I'm your session-handler, my PID is $$.\x00";
	# Per-stream input buffers, keyed by stream number
	# - must start as an empty hash; assigning {} would store a stringified hashref as a key
	my %buffers = ();
	# Server loop (listening to parent and interface streams)
	while (1) {
		for my $handle ( $select->can_read(1) ) {
			# If either connection closes, close both stream-handles and die
			unless ( sysread $handle, my $input, 10000 ) {
				$peerd->close();
				$interface->close();
				logAdd( 'Handles closed, Exit.' );
				exit;
			}
			# One of our streams has data to read
			my $stream = fileno $handle;
			$buffers{$stream} .= $input;
			next unless $buffers{$stream} =~ s/^(.*\x00)//s;
			for my $msg ( split /\x00/, $1 ) {
				logAdd( "Msg received on stream$stream: \"$msg\"" );
				# Forward all data from peerd to interface
				print {$interface} "$msg\x00" if $handle == $peerd;
			}
		}
	}
}