--- /dev/null
+use strict;
+use warnings FATAL => 'all';
+use ExtUtils::MakeMaker;
+
+(do 'maint/Makefile.PL.include' or die $@) unless -f 'META.yml';
+
+WriteMakefile(
+ NAME => 'Object-Remote',
+ VERSION_FROM => 'lib/Object/Remote.pm',
+ PREREQ_PM => {
+ Moo => 0,
+ 'Module::Runtime' => 0,
+ 'JSON::PP' => 0,
+ 'CPS::Future' => 0,
+ },
+);
--- /dev/null
+#!/usr/bin/env perl
+
+use strictures 1;
+use Object::Remote::Connector::STDIO;
+use Object::Remote;
+
+my $c = Object::Remote::Connector::STDIO->new->connect;
+
+Object::Remote->current_loop->run;
--- /dev/null
+package Object::Remote;
+
+use Object::Remote::MiniLoop;
+use Object::Remote::Proxy;
+use Scalar::Util qw(weaken);
+use Moo;
+
+has connection => (is => 'ro', required => 1);
+
+has id => (is => 'rwp');
+
+has proxy => (is => 'lazy', weak_ref => 1);
+
+sub _build_proxy {
+ bless({ handle => $_[0] }, 'Object::Remote::Proxy');
+}
+
+sub BUILD {
+ my ($self, $args) = @_;
+ unless ($self->id) {
+ die "No id supplied and no class either" unless $args->{class};
+ $self->_set_id(
+ $self->_await(
+ $self->connection->send(
+ class_call => $args->{class},
+ $args->{constructor}||'new', @{$args->{args}||[]}
+ )
+ )
+ );
+ }
+ $self->connection->register_remote($self);
+}
+
+sub current_loop {
+ our $Current_Loop ||= Object::Remote::MiniLoop->new
+}
+
+sub call {
+ my ($self, $id, $method, @args) = @_;
+ $self->_await($self->connection->send(call => $self->id, $method, @args));
+}
+
+sub _await {
+ my ($self, $future) = @_;
+ my $loop = $self->current_loop;
+ $future->on_ready(sub { $loop->stop });
+ $loop->run;
+ $future->get;
+}
+
+sub DEMOLISH {
+ my ($self, $gd) = @_;
+ return if $gd;
+ $self->connection->send_free($self->id);
+}
+
+1;
--- /dev/null
+package Object::Remote::Connection;
+
+use CPS::Future;
+use Object::Remote;
+use IO::Handle;
+use Module::Runtime qw(use_module);
+use Scalar::Util qw(weaken blessed refaddr);
+use JSON::PP qw(encode_json);
+use Moo;
+
+has send_to_fh => (
+ is => 'ro', required => 1,
+ trigger => sub { $_[1]->autoflush(1) },
+);
+
+has receive_from_fh => (
+ is => 'ro', required => 1,
+ trigger => sub {
+ my ($self, $fh) = @_;
+ weaken($self);
+ Object::Remote->current_loop
+ ->watch_io(
+ handle => $fh,
+ on_read_ready => sub { $self->_receive_data_from($fh) }
+ );
+ },
+);
+
+has _receive_data_buffer => (is => 'ro', default => sub { my $x = ''; \$x });
+
+has local_objects_by_id => (is => 'ro', default => sub { {} });
+
+has remote_objects_by_id => (is => 'ro', default => sub { {} });
+
+has _json => (
+ is => 'lazy',
+ handles => {
+ _deserialize => 'decode',
+ _encode => 'encode',
+ },
+);
+
+sub _build__json {
+ weaken(my $self = shift);
+ my $remotes = $self->remote_objects_by_id;
+ JSON::PP->new->filter_json_single_key_object(
+ __remote_object__ => sub {
+ my $id = shift;
+ (
+ $remotes->{$id}
+ or Object::Remote->new(connection => $self, id => $id)
+ )->proxy;
+ }
+ );
+}
+
+sub register_remote {
+ my ($self, $remote) = @_;
+ weaken($self->remote_objects_by_id->{$remote->id} = $remote);
+ return $remote;
+}
+
+sub send_free {
+ my ($self, $id) = @_;
+ delete $self->remote_objects_by_id->{$id};
+ $self->_send([ free => $id ]);
+}
+
+sub send {
+ my ($self, $type, @call) = @_;
+
+ unshift @call, $type => my $future = CPS::Future->new;
+
+ $self->_send(\@call);
+
+ return $future;
+}
+
+sub _send {
+ my ($self, $to_send) = @_;
+
+ print { $self->send_to_fh } $self->_serialize($to_send);
+}
+
+sub _serialize {
+ my ($self, $data) = @_;
+ local our @New_Ids;
+ eval { return $self->_encode($self->_deobjectify($data)) };
+ my $err = $@; # won't get here if the eval doesn't die
+ # don't keep refs to new things
+ delete @{$self->local_objects_by_id}{@New_Ids};
+ die "Error serializing: $err";
+}
+
+sub _deobjectify {
+ my ($self, $data) = @_;
+ if (blessed($data)) {
+ my $id = refaddr($data);
+ $self->local_objects_by_id->{$id} ||= do {
+ push our(@New_Ids), $id;
+ $data;
+ };
+ return +{ __remote_object__ => $id };
+ } elsif (my $ref = ref($data)) {
+ if ($ref eq 'HASH') {
+ return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
+ } elsif ($ref eq 'ARRAY') {
+ return [ map $self->_deobjectify($_), @$data ];
+ } else {
+ die "Can't collapse reftype $ref";
+ }
+ }
+ return $data; # plain scalar
+}
+
+sub _receive_data_from {
+ my ($self, $fh) = @_;
+ my $rb = $self->_receive_data_buffer;
+ if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
+ while ($$rb =~ s/^(.*)\n//) {
+ $self->_receive($1);
+ }
+ }
+}
+
+sub _receive {
+ my ($self, $data) = @_;
+ my ($type, @rest) = eval { @{$self->_deserialize($data)} }
+ or do { warn "Deserialize failed for ${data}: $@"; return };
+ eval { $self->${\"receive_${type}"}(@rest); 1 }
+ or do { warn "Receive failed for ${data}: $@"; return };
+ return;
+}
+
+sub receive_free {
+ my ($self, $id) = @_;
+ delete $self->local_objects_by_id->{$id};
+ return;
+}
+
+sub receive_call {
+ my ($self, $future, $id, @rest) = @_;
+ my $local = $self->local_objects_by_id->{$id}
+ or do { $future->fail("No such object $id"); return };
+ $self->_invoke($future, $local, @rest);
+}
+
+sub receive_class_call {
+ my ($self, $future, $class, @rest) = @_;
+ eval { use_module($class) }
+ or do { $future->fail("Error loading ${class}: $@"); return };
+ $self->_invoke($future, $class, @rest);
+}
+
+sub _invoke {
+ my ($self, $future, $local, $method, @args) = @_;
+ eval { $future->done($local->$method(@args)); 1 }
+ or do { $future->fail($@); return; };
+ return;
+}
+
+sub DEMOLISH {
+ my ($self, $gd) = @_;
+ return if $gd;
+ Object::Remote->current_loop
+ ->unwatch_io(
+ handle => $self->receive_from_fh,
+ on_read_ready => 1
+ );
+}
+
+1;
--- /dev/null
+package Object::Remote::Connector::Local;
+
+use IPC::Open2;
+use Object::Remote::Connection;
+use Moo;
+
+sub connect {
+ # XXX bin/ is wrong but meh, fix later
+ my $pid = open2(my $its_stdin, my $its_stdout, 'bin/object-remote-node')
+ or die "Couldn't start local node: $!";
+ Object::Remote::Connection->new(
+ send_to_fh => $its_stdin,
+ receive_from_fh => $its_stdout,
+ child_pid => $pid
+ );
+}
+
+1;
--- /dev/null
+package Object::Remote::Connector::STDIO;
+
+use File::Spec;
+use Object::Remote::Connection;
+use Moo;
+
+sub connect {
+ open my $stdin, '<&', \*STDIN or die "Duping stdin: $!";
+ open my $stdout, '>&', \*STDOUT or die "Duping stdout: $!";
+ $stdout->autoflush(1);
+ # if we don't re-open them then 0 and 1 get re-used - which is not
+ # only potentially bloody confusing but results in warnings like:
+ # "Filehandle STDOUT reopened as STDIN only for input"
+ close STDIN or die "Closing stdin: $!";
+ open STDIN, '<', File::Spec->dev_null or die "Re-opening stdin: $!";
+ close STDOUT or die "Closing stdout: $!";
+ open STDOUT, '>', File::Spec->dev_null or die "Re-opening stdout: $!";
+ Object::Remote::Connection->new(
+ send_to_fh => $stdout,
+ receive_from_fh => $stdin
+ );
+}
+
+1;
--- /dev/null
+package Object::Remote::MiniLoop;
+
+use IO::Select;
+use Moo;
+
+# this is ro because we only actually set it using local in sub run
+
+has is_running => (is => 'ro', clearer => 'stop');
+
+has _read_watches => (is => 'ro', default => sub { {} });
+has _read_select => (is => 'ro', default => sub { IO::Select->new });
+
+sub pass_watches_to {
+ my ($self, $new_loop) = @_;
+ foreach my $fh ($self->_read_select->handles) {
+ $new_loop->watch_io(
+ handle => $fh,
+ on_read_ready => $self->_read_watches->{$fh}
+ );
+ }
+}
+
+sub watch_io {
+ my ($self, %watch) = @_;
+ my $fh = $watch{handle};
+ if (my $cb = $watch{on_read_ready}) {
+ $self->_read_select->add($fh);
+ $self->_read_watches->{$fh} = $cb;
+ }
+}
+
+sub unwatch_io {
+ my ($self, %watch) = @_;
+ my $fh = $watch{handle};
+ if ($watch{on_read_ready}) {
+ $self->_read_select->remove($fh);
+ delete $self->_read_watches->{$fh};
+ }
+}
+
+sub loop_once {
+ my ($self) = @_;
+ my $read = $self->_read_watches;
+ my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5);
+ # I would love to trap errors in the select call but IO::Select doesn't
+ # differentiate between an error and a timeout.
+ # -- no, love, mst.
+ foreach my $fh (@$readable) {
+ $read->{$fh}();
+ }
+}
+
+sub run {
+ my ($self) = @_;
+ local $self->{is_running} = 1;
+ while ($self->is_running) {
+ $self->loop_once;
+ }
+}
+
+1;
--- /dev/null
+package Object::Remote::Proxy;
+
+use strictures 1;
+
+sub AUTOLOAD {
+ my $self = shift;
+ (my $method) = (our $AUTOLOAD =~ /([^:]+)$/);
+ $self->{handle}->call($method => @_);
+}
+
+sub DESTROY { }
+
+1;