use Symbol;
use IO::Handle;
use Module::Runtime qw(use_module);
-use Scalar::Util qw(weaken blessed refaddr);
+use Scalar::Util qw(weaken blessed refaddr openhandle);
use JSON::PP qw(encode_json);
use Moo;
is => 'ro', required => 1,
trigger => sub {
my ($self, $ch) = @_;
- Dlog_trace { my $id = $self->_id; "trigger for read_channel has been invoked for connection $id; file handle is " } $ch->fh;
+ my $id = $self->_id;
+ Dlog_trace { "trigger for read_channel has been invoked for connection $id; file handle is $_" } $ch->fh;
weaken($self);
$ch->on_line_call(sub { $self->_receive(@_) });
- $ch->on_close_call(sub { $self->on_close->done(@_) });
+ $ch->on_close_call(sub {
+ log_trace { "invoking 'done' on on_close handler for connection id '$id'" };
+ $self->on_close->done(@_);
+ });
},
);
+#TODO properly fix this bug -
+#trigger can't ever be invoked with a default
+#value and the on_close attribute is read only....
+#the future never gets the on_done handler
+#installed
+sub BUILD {
+ my ($self) = @_;
+ $self->on_close(CPS::Future->new);
+}
+
has on_close => (
- is => 'ro', default => sub { CPS::Future->new },
+ is => 'rw', default => sub { CPS::Future->new },
trigger => sub {
my ($self, $f) = @_;
Dlog_trace { "trigger for on_close has been invoked for connection $_" } $self->_id;
weaken($self);
$f->on_done(sub {
- $self->_fail_outstanding("Connection lost: ".($f->get)[0]);
+ Dlog_trace { "failing all of the outstanding futures for connection $_" } $self->_id;
+ $self->_fail_outstanding("Connection lost: " . ($f->get)[0]);
});
}
);
#logic it could easily do short-writes to the remote side - how about taking this entire buffer
#and having the run loop send it to the file handle so this doesn't block while the sending
#is happening?
- my $ret = print $fh $serialized;
- Dlog_trace { my $r = defined $ret ? $ret : 'undef'; "print() returned $r with $_" } $fh;
- #TODO hrm reason print's return value was ignored?
- die "could not write to filehandle: $!" unless $ret;
+ my $ret;
+ eval {
+ local($SIG{PIPE}) = 'IGNORE';
+ die "filehandle is not open" unless openhandle($fh);
+ log_trace { "file handle has passed openhandle() test; printing to it" };
+ $ret = print $fh $serialized;
+ die "print was not successful: $!" unless defined $ret
+ };
+
+ if ($@) {
+ Dlog_debug { "exception encountered when trying to write to file handle $_: $@" } $fh;
+ my $error = $@; chomp($error);
+ $self->on_close->done("could not write to file handle: $error") unless $self->on_close->is_ready;
+ return;
+ }
+
return $ret;
}
return;
}
+sub _sort_timers {
+ my ($self, @new) = @_;
+ my $timers = $self->_timers;
+
+ log_trace { "Sorting timers" };
+
+ @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, @new;
+ return;
+}
+
sub watch_time {
my ($self, %watch) = @_;
- my $at = $watch{at} || do {
- die "watch_time requires at or after" unless my $after = $watch{after};
- time() + $after;
- };
+ my $at;
+
+ Dlog_trace { "watch_time() invoked with $_" } \%watch;
+
+ if (exists($watch{every})) {
+ $at = time() + $watch{every};
+ } elsif (exists($watch{after})) {
+ $at = time() + $watch{after};
+ } elsif (exists($watch{at})) {
+ $at = $watch{at};
+ } else {
+ die "watch_time requires every, after or at";
+ }
+
die "watch_time requires code" unless my $code = $watch{code};
my $timers = $self->_timers;
- my $new = [ $at => $code ];
- @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new;
+ my $new = [ $at => $code, $watch{every} ];
+ $self->_sort_timers($new);
log_debug { "Created new timer that expires at '$at'" };
return "$new";
}
my $now = time();
log_trace { "Checking timers" };
while (@$timers and $timers->[0][0] <= $now) {
- Dlog_debug { "Found timer that needs to be executed: $_" } $timers->[0];
- (shift @$timers)->[1]->();
+ my $active = $timers->[0];
+ Dlog_debug { "Found timer that needs to be executed: $_" } $active;
+# my (shift @$timers)->[1]->();
+
+ if (defined($active->[2])) {
+ #handle the case of an 'every' timer
+ $active->[0] = time() + $active->[2];
+ Dlog_trace { "scheduling timer for repeat execution at $_"} $active->[0];
+ $self->_sort_timers;
+ } else {
+ #it doesn't repeat again so get rid of it
+ shift(@$timers);
+ }
+
+ #execute the timer
+ $active->[1]->();
+
last if $Loop_Entered;
}
+
log_trace { "Run loop: single loop is completed" };
return;
}
handle => $self->fh,
on_read_ready => 1
);
+ log_trace { "Invoking on_close_call() for dead read channel" };
$self->on_close_call->($err);
}
}
undef($channel);
});
$channel->on_close_call(sub {
+ log_trace { "Connection has been closed" };
$f->fail("Channel closed without seeing Shere: $_[0]");
undef($channel);
});
use Scalar::Util qw(blessed);
use POSIX ":sys_wait_h";
use Moo::Role;
+use Symbol;
with 'Object::Remote::Role::Connector';
#ulimit of ~500 megs of v-ram
#TODO only works with ssh with quotes but only works locally
#with out quotes
-#sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 80000; nice -n 15 perl -"' ] }
sub _build_perl_command { [ 'sh', '-c', '"ulimit -v 200000; nice -n 15 perl -"' ] }
#sub _build_perl_command { [ 'perl', '-' ] }
my $foreign_stderr;
Dlog_debug { "invoking connection to perl interpreter using command line: $_" } @{$self->final_perl_command};
-
- use Symbol;
-
+
if (defined($given_stderr)) {
$foreign_stderr = gensym();
} else {