import initial sketch of Object::Remote
Matt S Trout [Mon, 14 May 2012 06:55:31 +0000 (06:55 +0000)]
Makefile.PL [new file with mode: 0644]
bin/object-remote-node [new file with mode: 0644]
lib/Object/Remote.pm [new file with mode: 0644]
lib/Object/Remote/Connection.pm [new file with mode: 0644]
lib/Object/Remote/Connector/Local.pm [new file with mode: 0644]
lib/Object/Remote/Connector/STDIO.pm [new file with mode: 0644]
lib/Object/Remote/MiniLoop.pm [new file with mode: 0644]
lib/Object/Remote/Proxy.pm [new file with mode: 0644]

diff --git a/Makefile.PL b/Makefile.PL
new file mode 100644 (file)
index 0000000..7dcccb6
--- /dev/null
@@ -0,0 +1,16 @@
+use strict;
+use warnings FATAL => 'all';
+use ExtUtils::MakeMaker;
+
+(do 'maint/Makefile.PL.include' or die $@) unless -f 'META.yml';
+
+WriteMakefile(
+  NAME => 'Object-Remote',
+  VERSION_FROM => 'lib/Object/Remote.pm',
+  PREREQ_PM => {
+    Moo => 0,
+    'Module::Runtime' => 0,
+    'JSON::PP' => 0,
+    'CPS::Future' => 0,
+  },
+);
diff --git a/bin/object-remote-node b/bin/object-remote-node
new file mode 100644 (file)
index 0000000..d938ebc
--- /dev/null
@@ -0,0 +1,9 @@
+#!/usr/bin/env perl
+
+use strictures 1;
+use Object::Remote::Connector::STDIO;
+use Object::Remote;
+
+my $c = Object::Remote::Connector::STDIO->new->connect;
+
+Object::Remote->current_loop->run;
diff --git a/lib/Object/Remote.pm b/lib/Object/Remote.pm
new file mode 100644 (file)
index 0000000..326a3a5
--- /dev/null
@@ -0,0 +1,57 @@
+package Object::Remote;
+
+use Object::Remote::MiniLoop;
+use Object::Remote::Proxy;
+use Scalar::Util qw(weaken);
+use Moo;
+
+has connection => (is => 'ro', required => 1);
+
+has id => (is => 'rwp');
+
+has proxy => (is => 'lazy', weak_ref => 1);
+
+sub _build_proxy {
+  bless({ handle => $_[0] }, 'Object::Remote::Proxy');
+}
+
+sub BUILD {
+  my ($self, $args) = @_;
+  unless ($self->id) {
+    die "No id supplied and no class either" unless $args->{class};
+    $self->_set_id(
+      $self->_await(
+        $self->connection->send(
+          class_call => $args->{class},
+          $args->{constructor}||'new', @{$args->{args}||[]}
+        )
+      )
+    );
+  }
+  $self->connection->register_remote($self);
+}
+
+sub current_loop {
+  our $Current_Loop ||= Object::Remote::MiniLoop->new
+}
+
+sub call {
+  my ($self, $id, $method, @args) = @_;
+  $self->_await($self->connection->send(call => $self->id, $method, @args));
+}
+
+sub _await {
+  my ($self, $future) = @_;
+  my $loop = $self->current_loop;
+  $future->on_ready(sub { $loop->stop });
+  $loop->run;
+  $future->get;
+}
+
+sub DEMOLISH {
+  my ($self, $gd) = @_;
+  return if $gd;
+  $self->connection->send_free($self->id);
+}
+
+1;
diff --git a/lib/Object/Remote/Connection.pm b/lib/Object/Remote/Connection.pm
new file mode 100644 (file)
index 0000000..7a078b5
--- /dev/null
@@ -0,0 +1,172 @@
+package Object::Remote::Connection;
+
+use CPS::Future;
+use Object::Remote;
+use IO::Handle;
+use Module::Runtime qw(use_module);
+use Scalar::Util qw(weaken blessed refaddr);
+use JSON::PP qw(encode_json);
+use Moo;
+
+has send_to_fh => (
+  is => 'ro', required => 1,
+  trigger => sub { $_[1]->autoflush(1) },
+);
+
+has receive_from_fh => (
+  is => 'ro', required => 1,
+  trigger => sub {
+    my ($self, $fh) = @_;
+    weaken($self);
+    Object::Remote->current_loop
+                  ->watch_io(
+                      handle => $fh,
+                      on_read_ready => sub { $self->_receive_data_from($fh) }
+                    );
+  },
+);
+
+has _receive_data_buffer => (is => 'ro', default => sub { my $x = ''; \$x });
+
+has local_objects_by_id => (is => 'ro', default => sub { {} });
+
+has remote_objects_by_id => (is => 'ro', default => sub { {} });
+
+has _json => (
+  is => 'lazy',
+  handles => {
+    _deserialize => 'decode',
+    _encode => 'encode',
+  },
+);
+
+sub _build__json {
+  weaken(my $self = shift);
+  my $remotes = $self->remote_objects_by_id;
+  JSON::PP->new->filter_json_single_key_object(
+    __remote_object__ => sub {
+      my $id = shift;
+      (
+        $remotes->{$id}
+        or Object::Remote->new(connection => $self, id => $id)
+      )->proxy;
+    }
+  );
+}
+
+sub register_remote {
+  my ($self, $remote) = @_;
+  weaken($self->remote_objects_by_id->{$remote->id} = $remote);
+  return $remote;
+}
+
+sub send_free {
+  my ($self, $id) = @_;
+  delete $self->remote_objects_by_id->{$id};
+  $self->_send([ free => $id ]);
+}
+
+sub send {
+  my ($self, $type, @call) = @_;
+
+  unshift @call, $type => my $future = CPS::Future->new;
+
+  $self->_send(\@call);
+
+  return $future;
+}
+
+sub _send {
+  my ($self, $to_send) = @_;
+
+  print { $self->send_to_fh } $self->_serialize($to_send);
+}
+
+sub _serialize {
+  my ($self, $data) = @_;
+  local our @New_Ids;
+  eval { return $self->_encode($self->_deobjectify($data)) };
+  my $err = $@; # won't get here if the eval doesn't die
+  # don't keep refs to new things
+  delete @{$self->local_objects_by_id}{@New_Ids};
+  die "Error serializing: $err";
+}
+
+sub _deobjectify {
+  my ($self, $data) = @_;
+  if (blessed($data)) {
+    my $id = refaddr($data);
+    $self->local_objects_by_id->{$id} ||= do {
+      push our(@New_Ids), $id;
+      $data;
+    };
+    return +{ __remote_object__ => $id };
+  } elsif (my $ref = ref($data)) {
+    if ($ref eq 'HASH') {
+      return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
+    } elsif ($ref eq 'ARRAY') {
+      return [ map $self->_deobjectify($_), @$data ];
+    } else {
+      die "Can't collapse reftype $ref";
+    }
+  }
+  return $data; # plain scalar
+}
+
+sub _receive_data_from {
+  my ($self, $fh) = @_;
+  my $rb = $self->_receive_data_buffer;
+  if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
+    while ($$rb =~ s/^(.*)\n//) {
+      $self->_receive($1);
+    }
+  }
+}
+
+sub _receive {
+  my ($self, $data) = @_;
+  my ($type, @rest) = eval { @{$self->_deserialize($data)} }
+    or do { warn "Deserialize failed for ${data}: $@"; return };
+  eval { $self->${\"receive_${type}"}(@rest); 1 }
+    or do { warn "Receive failed for ${data}: $@"; return };
+  return;
+}
+
+sub receive_free {
+  my ($self, $id) = @_;
+  delete $self->local_objects_by_id->{$id};
+  return;
+}
+
+sub receive_call {
+  my ($self, $future, $id, @rest) = @_;
+  my $local = $self->local_objects_by_id->{$id}
+    or do { $future->fail("No such object $id"); return };
+  $self->_invoke($future, $local, @rest);
+}
+
+sub receive_class_call {
+  my ($self, $future, $class, @rest) = @_;
+  eval { use_module($class) }
+    or do { $future->fail("Error loading ${class}: $@"); return };
+  $self->_invoke($future, $class, @rest);
+}
+
+sub _invoke {
+  my ($self, $future, $local, $method, @args) = @_;
+  eval { $future->done($local->$method(@args)); 1 }
+    or do { $future->fail($@); return; };
+  return;
+}
+
+sub DEMOLISH {
+  my ($self, $gd) = @_;
+  return if $gd;
+  Object::Remote->current_loop
+                ->unwatch_io(
+                    handle => $self->receive_from_fh,
+                    on_read_ready => 1
+                  );
+}
+
+1;
diff --git a/lib/Object/Remote/Connector/Local.pm b/lib/Object/Remote/Connector/Local.pm
new file mode 100644 (file)
index 0000000..33f599f
--- /dev/null
@@ -0,0 +1,18 @@
+package Object::Remote::Connector::Local;
+
+use IPC::Open2;
+use Object::Remote::Connection;
+use Moo;
+
+sub connect {
+  # XXX bin/ is wrong but meh, fix later
+  my $pid = open2(my $its_stdin, my $its_stdout, 'bin/object-remote-node')
+    or die "Couldn't start local node: $!";
+  Object::Remote::Connection->new(
+    send_to_fh => $its_stdin,
+    receive_from_fh => $its_stdout,
+    child_pid => $pid
+  );
+}
+
+1;
diff --git a/lib/Object/Remote/Connector/STDIO.pm b/lib/Object/Remote/Connector/STDIO.pm
new file mode 100644 (file)
index 0000000..4ea49af
--- /dev/null
@@ -0,0 +1,24 @@
+package Object::Remote::Connector::STDIO;
+
+use File::Spec;
+use Object::Remote::Connection;
+use Moo;
+
+sub connect {
+  open my $stdin, '<&', \*STDIN or die "Duping stdin: $!";
+  open my $stdout, '>&', \*STDOUT or die "Duping stdout: $!";
+  $stdout->autoflush(1);
+  # if we don't re-open them then 0 and 1 get re-used - which is not
+  # only potentially bloody confusing but results in warnings like:
+  # "Filehandle STDOUT reopened as STDIN only for input"
+  close STDIN or die "Closing stdin: $!";
+  open STDIN, '<', File::Spec->dev_null or die "Re-opening stdin: $!";
+  close STDOUT or die "Closing stdout: $!";
+  open STDOUT, '>', File::Spec->dev_null or die "Re-opening stdout: $!";
+  Object::Remote::Connection->new(
+    send_to_fh => $stdout,
+    receive_from_fh => $stdin
+  );
+}
+
+1;
diff --git a/lib/Object/Remote/MiniLoop.pm b/lib/Object/Remote/MiniLoop.pm
new file mode 100644 (file)
index 0000000..c5025f3
--- /dev/null
@@ -0,0 +1,61 @@
+package Object::Remote::MiniLoop;
+
+use IO::Select;
+use Moo;
+
+# this is ro because we only actually set it using local in sub run
+
+has is_running => (is => 'ro', clearer => 'stop');
+
+has _read_watches => (is => 'ro', default => sub { {} });
+has _read_select => (is => 'ro', default => sub { IO::Select->new });
+
+sub pass_watches_to {
+  my ($self, $new_loop) = @_;
+  foreach my $fh ($self->_read_select->handles) {
+    $new_loop->watch_io(
+      handle => $fh,
+      on_read_ready => $self->_read_watches->{$fh}
+    );
+  }
+}
+
+sub watch_io {
+  my ($self, %watch) = @_;
+  my $fh = $watch{handle};
+  if (my $cb = $watch{on_read_ready}) {
+    $self->_read_select->add($fh);
+    $self->_read_watches->{$fh} = $cb;
+  }
+}
+
+sub unwatch_io {
+  my ($self, %watch) = @_;
+  my $fh = $watch{handle};
+  if ($watch{on_read_ready}) {
+    $self->_read_select->remove($fh);
+    delete $self->_read_watches->{$fh};
+  }
+}
+
+sub loop_once {
+  my ($self) = @_;
+  my $read = $self->_read_watches;
+  my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5);
+  # I would love to trap errors in the select call but IO::Select doesn't
+  # differentiate between an error and a timeout.
+  #   -- no, love, mst.
+  foreach my $fh (@$readable) {
+    $read->{$fh}();
+  }
+}
+
+sub run {
+  my ($self) = @_;
+  local $self->{is_running} = 1;
+  while ($self->is_running) {
+    $self->loop_once;
+  }
+}
+
+1;
diff --git a/lib/Object/Remote/Proxy.pm b/lib/Object/Remote/Proxy.pm
new file mode 100644 (file)
index 0000000..b2ae3c6
--- /dev/null
@@ -0,0 +1,13 @@
+package Object::Remote::Proxy;
+
+use strictures 1;
+
+sub AUTOLOAD {
+  my $self = shift;
+  (my $method) = (our $AUTOLOAD =~ /([^:]+)$/);
+  $self->{handle}->call($method => @_);
+}
+
+sub DESTROY { }
+
+1;