From: Matt S Trout Date: Mon, 14 May 2012 06:55:31 +0000 (+0000) Subject: import initial sketch of Object::Remote X-Git-Tag: v0.001001~66 X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=9e72f0cf54e92bccdba71eb75037f1cfe4f69f36;p=scpubgit%2FObject-Remote.git import initial sketch of Object::Remote --- 9e72f0cf54e92bccdba71eb75037f1cfe4f69f36 diff --git a/Makefile.PL b/Makefile.PL new file mode 100644 index 0000000..7dcccb6 --- /dev/null +++ b/Makefile.PL @@ -0,0 +1,16 @@ +use strict; +use warnings FATAL => 'all'; +use ExtUtils::MakeMaker; + +(do 'maint/Makefile.PL.include' or die $@) unless -f 'META.yml'; + +WriteMakefile( + NAME => 'Object-Remote', + VERSION_FROM => 'lib/Object/Remote.pm', + PREREQ_PM => { + Moo => 0, + 'Module::Runtime' => 0, + 'JSON::PP' => 0, + 'CPS::Future' => 0, + }, +); diff --git a/bin/object-remote-node b/bin/object-remote-node new file mode 100644 index 0000000..d938ebc --- /dev/null +++ b/bin/object-remote-node @@ -0,0 +1,9 @@ +#!/usr/bin/env perl + +use strictures 1; +use Object::Remote::Connector::STDIO; +use Object::Remote; + +my $c = Object::Remote::Connector::STDIO->new->connect; + +Object::Remote->current_loop->run; diff --git a/lib/Object/Remote.pm b/lib/Object/Remote.pm new file mode 100644 index 0000000..326a3a5 --- /dev/null +++ b/lib/Object/Remote.pm @@ -0,0 +1,57 @@ +package Object::Remote; + +use Object::Remote::MiniLoop; +use Object::Remote::Proxy; +use Scalar::Util qw(weaken); +use Moo; + +has connection => (is => 'ro', required => 1); + +has id => (is => 'rwp'); + +has proxy => (is => 'lazy', weak_ref => 1); + +sub _build_proxy { + bless({ handle => $_[0] }, 'Object::Remote::Proxy'); +} + +sub BUILD { + my ($self, $args) = @_; + unless ($self->id) { + die "No id supplied and no class either" unless $args->{class}; + $self->_set_id( + $self->_await( + $self->connection->send( + class_call => $args->{class}, + $args->{constructor}||'new', @{$args->{args}||[]} + ) + ) + ); + } + $self->connection->register_remote($self); +} + +sub current_loop { + our $Current_Loop ||= Object::Remote::MiniLoop->new +} + +sub call { + my ($self, $id, $method, @args) = @_; + $self->_await($self->connection->send(call => $self->id, $method, @args)); +} + +sub _await { + my ($self, $future) = @_; + my $loop = $self->current_loop; + $future->on_ready(sub { $loop->stop }); + $loop->run; + $future->get; +} + +sub DEMOLISH { + my ($self, $gd) = @_; + return if $gd; + $self->connection->send_free($self->id); +} + +1; diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm new file mode 100644 index 0000000..7a078b5 --- /dev/null +++ b/lib/Object/Remote/Connection.pm @@ -0,0 +1,172 @@ +package Object::Remote::Connection; + +use CPS::Future; +use Object::Remote; +use IO::Handle; +use Module::Runtime qw(use_module); +use Scalar::Util qw(weaken blessed refaddr); +use JSON::PP qw(encode_json); +use Moo; + +has send_to_fh => ( + is => 'ro', required => 1, + trigger => sub { $_[1]->autoflush(1) }, +); + +has receive_from_fh => ( + is => 'ro', required => 1, + trigger => sub { + my ($self, $fh) = @_; + weaken($self); + Object::Remote->current_loop + ->watch_io( + handle => $fh, + on_read_ready => sub { $self->_receive_data_from($fh) } + ); + }, +); + +has _receive_data_buffer => (is => 'ro', default => sub { my $x = ''; \$x }); + +has local_objects_by_id => (is => 'ro', default => sub { {} }); + +has remote_objects_by_id => (is => 'ro', default => sub { {} }); + +has _json => ( + is => 'lazy', + handles => { + _deserialize => 'decode', + _encode => 'encode', + }, +); + +sub _build__json { + weaken(my $self = shift); + my $remotes = $self->remote_objects_by_id; + JSON::PP->new->filter_json_single_key_object( + __remote_object__ => sub { + my $id = shift; + ( + $remotes->{$id} + or Object::Remote->new(connection => $self, id => $id) + )->proxy; + } + ); +} + +sub register_remote { + my ($self, $remote) = @_; + weaken($self->remote_objects_by_id->{$remote->id} = $remote); + return $remote; +} + +sub send_free { + my ($self, $id) = @_; + delete $self->remote_objects_by_id->{$id}; + $self->_send([ free => $id ]); +} + +sub send { + my ($self, $type, @call) = @_; + + unshift @call, $type => my $future = CPS::Future->new; + + $self->_send(\@call); + + return $future; +} + +sub _send { + my ($self, $to_send) = @_; + + print { $self->send_to_fh } $self->_serialize($to_send); +} + +sub _serialize { + my ($self, $data) = @_; + local our @New_Ids; + eval { return $self->_encode($self->_deobjectify($data)) }; + my $err = $@; # won't get here if the eval doesn't die + # don't keep refs to new things + delete @{$self->local_objects_by_id}{@New_Ids}; + die "Error serializing: $err"; +} + +sub _deobjectify { + my ($self, $data) = @_; + if (blessed($data)) { + my $id = refaddr($data); + $self->local_objects_by_id->{$id} ||= do { + push our(@New_Ids), $id; + $data; + }; + return +{ __remote_object__ => $id }; + } elsif (my $ref = ref($data)) { + if ($ref eq 'HASH') { + return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data }; + } elsif ($ref eq 'ARRAY') { + return [ map $self->_deobjectify($_), @$data ]; + } else { + die "Can't collapse reftype $ref"; + } + } + return $data; # plain scalar +} + +sub _receive_data_from { + my ($self, $fh) = @_; + my $rb = $self->_receive_data_buffer; + if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) { + while ($$rb =~ s/^(.*)\n//) { + $self->_receive($1); + } + } +} + +sub _receive { + my ($self, $data) = @_; + my ($type, @rest) = eval { @{$self->_deserialize($data)} } + or do { warn "Deserialize failed for ${data}: $@"; return }; + eval { $self->${\"receive_${type}"}(@rest); 1 } + or do { warn "Receive failed for ${data}: $@"; return }; + return; +} + +sub receive_free { + my ($self, $id) = @_; + delete $self->local_objects_by_id->{$id}; + return; +} + +sub receive_call { + my ($self, $future, $id, @rest) = @_; + my $local = $self->local_objects_by_id->{$id} + or do { $future->fail("No such object $id"); return }; + $self->_invoke($future, $local, @rest); +} + +sub receive_class_call { + my ($self, $future, $class, @rest) = @_; + eval { use_module($class) } + or do { $future->fail("Error loading ${class}: $@"); return }; + $self->_invoke($future, $class, @rest); +} + +sub _invoke { + my ($self, $future, $local, $method, @args) = @_; + eval { $future->done($local->$method(@args)); 1 } + or do { $future->fail($@); return; }; + return; +} + +sub DEMOLISH { + my ($self, $gd) = @_; + return if $gd; + Object::Remote->current_loop + ->unwatch_io( + handle => $self->receive_from_fh, + on_read_ready => 1 + ); +} + +1; diff --git a/lib/Object/Remote/Connector/Local.pm b/lib/Object/Remote/Connector/Local.pm new file mode 100644 index 0000000..33f599f --- /dev/null +++ b/lib/Object/Remote/Connector/Local.pm @@ -0,0 +1,18 @@ +package Object::Remote::Connector::Local; + +use IPC::Open2; +use Object::Remote::Connection; +use Moo; + +sub connect { + # XXX bin/ is wrong but meh, fix later + my $pid = open2(my $its_stdin, my $its_stdout, 'bin/object-remote-node') + or die "Couldn't start local node: $!"; + Object::Remote::Connection->new( + send_to_fh => $its_stdin, + receive_from_fh => $its_stdout, + child_pid => $pid + ); +} + +1; diff --git a/lib/Object/Remote/Connector/STDIO.pm b/lib/Object/Remote/Connector/STDIO.pm new file mode 100644 index 0000000..4ea49af --- /dev/null +++ b/lib/Object/Remote/Connector/STDIO.pm @@ -0,0 +1,24 @@ +package Object::Remote::Connector::STDIO; + +use File::Spec; +use Object::Remote::Connection; +use Moo; + +sub connect { + open my $stdin, '<&', \*STDIN or die "Duping stdin: $!"; + open my $stdout, '>&', \*STDOUT or die "Duping stdout: $!"; + $stdout->autoflush(1); + # if we don't re-open them then 0 and 1 get re-used - which is not + # only potentially bloody confusing but results in warnings like: + # "Filehandle STDOUT reopened as STDIN only for input" + close STDIN or die "Closing stdin: $!"; + open STDIN, '<', File::Spec->dev_null or die "Re-opening stdin: $!"; + close STDOUT or die "Closing stdout: $!"; + open STDOUT, '>', File::Spec->dev_null or die "Re-opening stdout: $!"; + Object::Remote::Connection->new( + send_to_fh => $stdout, + receive_from_fh => $stdin + ); +} + +1; diff --git a/lib/Object/Remote/MiniLoop.pm b/lib/Object/Remote/MiniLoop.pm new file mode 100644 index 0000000..c5025f3 --- /dev/null +++ b/lib/Object/Remote/MiniLoop.pm @@ -0,0 +1,61 @@ +package Object::Remote::MiniLoop; + +use IO::Select; +use Moo; + +# this is ro because we only actually set it using local in sub run + +has is_running => (is => 'ro', clearer => 'stop'); + +has _read_watches => (is => 'ro', default => sub { {} }); +has _read_select => (is => 'ro', default => sub { IO::Select->new }); + +sub pass_watches_to { + my ($self, $new_loop) = @_; + foreach my $fh ($self->_read_select->handles) { + $new_loop->watch_io( + handle => $fh, + on_read_ready => $self->_read_watches->{$fh} + ); + } +} + +sub watch_io { + my ($self, %watch) = @_; + my $fh = $watch{handle}; + if (my $cb = $watch{on_read_ready}) { + $self->_read_select->add($fh); + $self->_read_watches->{$fh} = $cb; + } +} + +sub unwatch_io { + my ($self, %watch) = @_; + my $fh = $watch{handle}; + if ($watch{on_read_ready}) { + $self->_read_select->remove($fh); + delete $self->_read_watches->{$fh}; + } +} + +sub loop_once { + my ($self) = @_; + my $read = $self->_read_watches; + my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5); + # I would love to trap errors in the select call but IO::Select doesn't + # differentiate between an error and a timeout. + # -- no, love, mst. + foreach my $fh (@$readable) { + $read->{$fh}(); + } +} + +sub run { + my ($self) = @_; + local $self->{is_running} = 1; + while ($self->is_running) { + $self->loop_once; + } +} + +1; diff --git a/lib/Object/Remote/Proxy.pm b/lib/Object/Remote/Proxy.pm new file mode 100644 index 0000000..b2ae3c6 --- /dev/null +++ b/lib/Object/Remote/Proxy.pm @@ -0,0 +1,13 @@ +package Object::Remote::Proxy; + +use strictures 1; + +sub AUTOLOAD { + my $self = shift; + (my $method) = (our $AUTOLOAD =~ /([^:]+)$/); + $self->{handle}->call($method => @_); +} + +sub DESTROY { } + +1;