fix bug where dead connections would not execute cleanup code; add in repeating timer...
[scpubgit/Object-Remote.git] / lib / Object / Remote / Connection.pm
index b051b0b..4d46618 100644 (file)
@@ -7,11 +7,12 @@ use Object::Remote::CodeContainer;
 use Object::Remote::GlobProxy;
 use Object::Remote::GlobContainer;
 use Object::Remote::Logging qw (:log :dlog);
+use Object::Remote::Tied;
 use Object::Remote;
 use Symbol;
 use IO::Handle;
 use Module::Runtime qw(use_module);
-use Scalar::Util qw(weaken blessed refaddr);
+use Scalar::Util qw(weaken blessed refaddr openhandle);
 use JSON::PP qw(encode_json);
 use Moo;
 
@@ -24,28 +25,47 @@ has _id => ( is => 'ro', required => 1, default => sub { our $NEXT_CONNECTION_ID
 
 has send_to_fh => (
   is => 'ro', required => 1,
-  trigger => sub { $_[1]->autoflush(1) },
+  trigger => sub {
+      my $self = $_[0];
+      $_[1]->autoflush(1);
+      Dlog_trace { my $id = $self->_id; "connection had send_to_fh set to $_"  } $_[1];
+  },
 );
 
 has read_channel => (
   is => 'ro', required => 1,
   trigger => sub {
     my ($self, $ch) = @_;
-    Dlog_trace { "trigger for read_channel has been invoked for connection $_" } $self->_id;
+    my $id = $self->_id; 
+    Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh; 
     weaken($self);
     $ch->on_line_call(sub { $self->_receive(@_) });
-    $ch->on_close_call(sub { $self->on_close->done(@_) });
+    $ch->on_close_call(sub { 
+        log_trace { "invoking 'done' on on_close handler for connection id '$id'" }; 
+        $self->on_close->done(@_);
+    });
   },
 );
 
+#TODO properly fix this bug -
+#trigger can't ever be invoked with a default
+#value and the on_close attribute is read only....
+#the future never gets the on_done handler
+#installed 
+sub BUILD {
+  my ($self) = @_; 
+  $self->on_close(CPS::Future->new);  
+}
+
 has on_close => (
-  is => 'ro', default => sub { CPS::Future->new },
+  is => 'rw', default => sub { CPS::Future->new },
   trigger => sub {
     my ($self, $f) = @_;
     Dlog_trace { "trigger for on_close has been invoked for connection $_" } $self->_id;
     weaken($self);
     $f->on_done(sub {
-      $self->_fail_outstanding("Connection lost: ".($f->get)[0]);
+      Dlog_trace { "failing all of the outstanding futures for connection $_" } $self->_id;
+      $self->_fail_outstanding("Connection lost: " . ($f->get)[0]);
     });
   }
 );
@@ -118,7 +138,19 @@ sub _build__json {
     __local_object__ => sub {
       $self->local_objects_by_id->{$_[0]}
     }
-  );
+  )->filter_json_single_key_object(
+    __remote_tied_hash__ => sub {
+        my %tied_hash;
+        tie %tied_hash, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
+        return \%tied_hash;
+    }
+  )->filter_json_single_key_object(
+    __remote_tied_array__ => sub {
+        my @tied_array;
+        tie @tied_array, 'Object::Remote::Tied', $self->_id_to_remote_object(@_);
+        return \@tied_array;
+    }
+  ); 
 }
 
 BEGIN {
@@ -135,7 +167,6 @@ sub new_from_spec {
   Dlog_debug { "creating a new connection from spec" };
   foreach my $poss (do { our @Guess }) {
     if (my $conn = $poss->($spec)) {
-      #Dlog_debug { my $id = $conn->_id; "created connection $id for spec $_" } $spec;
       return $conn->maybe::start::connect;
     }
   }
@@ -197,7 +228,7 @@ sub register_remote {
 
 sub send_free {
   my ($self, $id) = @_;
-  Dlog_debug { "sending request to free object '$id' for connection $_" } $self->_id;
+  Dlog_trace { "sending request to free object '$id' for connection $_" } $self->_id;
   delete $self->remote_objects_by_id->{$id};
   $self->_send([ free => $id ]);
 }
@@ -235,20 +266,33 @@ sub _send {
   my $fh = $self->send_to_fh;
   Dlog_trace { "Starting to serialize data in argument to _send for connection $_" } $self->_id;
   my $serialized = $self->_serialize($to_send)."\n";
-  Dlog_debug { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
+  Dlog_trace { my $l = length($serialized); "serialization is completed; sending '$l' characters of serialized data to $_" } $fh;
   #TODO this is very risky for deadlocks unless it's set to non-blocking and then with out extra
-  #logic it could easily do short-writes to the remote side
-  my $ret = print $fh $serialized;
-  Dlog_trace { my $r = defined $ret ? $ret : 'undef'; "print() returned $r with $_" } $fh;
-  #TODO hrm reason print's return value was ignored?
-  die "could not write to filehandle: $!" unless $ret;
+  #logic it could easily do short-writes to the remote side - how about taking this entire buffer
+  #and having the run loop send it to the file handle so this doesn't block while the sending
+  #is happening? 
+  my $ret; 
+  eval { 
+      local($SIG{PIPE}) = 'IGNORE';
+      die "filehandle is not open" unless openhandle($fh);
+      log_trace { "file handle has passed openhandle() test; printing to it" };
+      $ret = print $fh $serialized;
+      die "print was not successful: $!" unless defined $ret
+  };
+  
+  if ($@) {
+      Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
+      my $error = $@; chomp($error);
+      $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
+      return; 
+  }
+      
   return $ret; 
 }
 
 sub _serialize {
   my ($self, $data) = @_;
   local our @New_Ids = (-1);
-  Dlog_debug { "starting to serialize data for connection $_" } $self->_id;
   return eval {
     my $flat = $self->_encode($self->_deobjectify($data));
     warn "$$ >>> ${flat}\n" if $DEBUG;
@@ -284,9 +328,19 @@ sub _deobjectify {
     }
   } elsif (my $ref = ref($data)) {
     if ($ref eq 'HASH') {
-      return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
+      my $tied_to = tied(%$data);
+      if(defined($tied_to)) {
+        return +{__remote_tied_hash__ => $self->_local_object_to_id($tied_to)}; 
+      } else {
+        return +{ map +($_ => $self->_deobjectify($data->{$_})), keys %$data };
+      }
     } elsif ($ref eq 'ARRAY') {
-      return [ map $self->_deobjectify($_), @$data ];
+      my $tied_to = tied(@$data);
+      if (defined($tied_to)) {
+        return +{__remote_tied_array__ => $self->_local_object_to_id($tied_to)}; 
+      } else {
+        return [ map $self->_deobjectify($_), @$data ];
+      }
     } elsif ($ref eq 'CODE') {
       my $id = $self->_local_object_to_id(
                  Object::Remote::CodeContainer->new(code => $data)