16bf6a6db81405a898f743965266319f3ac5ca1f
[scpubgit/Object-Remote.git] / lib / Object / Remote / Role / Connector / PerlInterpreter.pm
1 package Object::Remote::Role::Connector::PerlInterpreter;
2
3 use IPC::Open2;
4 use IPC::Open3; 
5 use IO::Handle;
6 use Symbol; 
7 use Object::Remote::Logging qw( :log :dlog router );
8 use Object::Remote::ModuleSender;
9 use Object::Remote::Handle;
10 use Object::Remote::Future;
11 use Scalar::Util qw(blessed weaken);
12 use Moo::Role;
13
14 with 'Object::Remote::Role::Connector';
15
16 has module_sender => (is => 'lazy');
17 has ulimit => ( is => 'ro');
18 has nice => ( is => 'ro');
19 has watchdog_timeout => ( is => 'ro', required => 1, default => sub { undef });
20 has perl_command => (is => 'lazy');
21 has pid => (is => 'rwp');
22 has connection_id => (is => 'rwp');
23
24 #if no child_stderr file handle is specified then stderr
25 #of the child will be connected to stderr of the parent
26 has stderr => ( is => 'rw', default => sub { undef } );
27
28 sub _build_module_sender {
29   my ($hook) =
30     grep {blessed($_) && $_->isa('Object::Remote::ModuleLoader::Hook') }
31       @INC;
32   return $hook ? $hook->sender : Object::Remote::ModuleSender->new;
33 }
34
35 sub _build_perl_command {
36     my ($self) = @_;
37     my $nice = $self->nice;
38     my $ulimit = $self->ulimit;
39     my $perl_path = 'perl';
40     my $shell_code = '';
41
42     if (defined($ulimit)) {
43         $shell_code .= "ulimit $ulimit || exit 1; ";
44     }
45
46     if (defined($nice)) {
47         $shell_code .= "nice -n $nice ";
48     }
49
50     if (defined($ENV{OBJECT_REMOTE_PERL_BIN})) {
51         log_debug { "Using OBJECT_REMOTE_PERL_BIN environment variable as perl path" };
52         $perl_path = $ENV{OBJECT_REMOTE_PERL_BIN};
53     }
54
55     $shell_code .= $perl_path . ' -';
56
57     return [ 'bash', '-c', $shell_code ];
58 }
59
60 around connect => sub {
61   my ($orig, $self) = (shift, shift);
62   my $f = $self->$start::start($orig => @_);
63   return future {
64     $f->on_done(sub {
65       my ($conn) = $f->get;
66       $self->_setup_watchdog_reset($conn);
67       my $sub = $conn->remote_sub('Object::Remote::Logging::init_remote_logging');
68       $sub->('Object::Remote::Logging', router => router(), connection_id => $conn->_id);
69       Object::Remote::Handle->new(
70         connection => $conn,
71         class => 'Object::Remote::ModuleLoader',
72         args => { module_sender => $self->module_sender }
73       )->disarm_free;
74       require Object::Remote::Prompt;
75       Object::Remote::Prompt::maybe_set_prompt_command_on($conn);
76     });
77     $f;
78   } 2;
79 };
80
81 sub final_perl_command { shift->perl_command }
82
83 sub _start_perl {
84   my $self = shift;
85   my $given_stderr = $self->stderr;
86   my $foreign_stderr;
87  
88   Dlog_verbose {
89     s/\n/ /g; "invoking connection to perl interpreter using command line: $_"
90   } @{$self->final_perl_command};
91     
92   if (defined($given_stderr)) {
93     #if the stderr data goes to an existing file handle
94     #an anonymous file handle is required
95     #as the other half of a pipe style file handle pair
96     #so the file handles can go into the run loop
97     $foreign_stderr = gensym();
98   } else {
99     #if no file handle has been specified
100     #for the child's stderr then connect
101     #the child stderr to the parent stderr
102     $foreign_stderr = ">&STDERR";
103   }
104   
105   my $pid = open3(
106     my $foreign_stdin,
107     my $foreign_stdout,
108     $foreign_stderr,
109     @{$self->final_perl_command},
110   ) or die "Failed to run perl at '$_[0]': $!";
111   
112   $self->_set_pid($pid);
113   
114   if (defined($given_stderr)) {   
115     Dlog_debug { "Child process STDERR is being handled via run loop" };
116         
117     Object::Remote->current_loop
118                   ->watch_io(
119                       handle => $foreign_stderr,
120                       on_read_ready => sub {
121                         my $buf = ''; 
122                         my $len = sysread($foreign_stderr, $buf, 32768);
123                         if (!defined($len) or $len == 0) {
124                           log_trace { "Got EOF or error on child stderr, removing from watcher" };
125                           $self->stderr(undef);
126                           Object::Remote->current_loop->unwatch_io(
127                                          handle => $foreign_stderr,
128                                          on_read_ready => 1
129                                        );
130                           } else {
131                             Dlog_trace { "got $len characters of stderr data for connection" };
132                             print $given_stderr $buf or die "could not send stderr data: $!";
133                           }
134                          } 
135                       );     
136   }
137       
138   return ($foreign_stdin, $foreign_stdout, $pid);
139 }
140
141 sub _open2_for {
142   my $self = shift;
143   my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);
144   my $to_send = $self->fatnode_text;
145   log_debug { my $len = length($to_send); "Sending contents of fat node to remote node; size is '$len' characters" };
146   Object::Remote->current_loop
147                 ->watch_io(
148                     handle => $foreign_stdin,
149                     on_write_ready => sub {
150                       my $len = syswrite($foreign_stdin, $to_send, 32768);
151                       if (defined $len) {
152                         substr($to_send, 0, $len) = '';
153                       }
154                       # if the stdin went away, we'll never get Shere
155                       # so it's not a big deal to simply give up on !defined
156                       if (!defined($len) or 0 == length($to_send)) {
157                         log_trace { "Got EOF or error when writing fatnode data to filehandle, unwatching it" };
158                         Object::Remote->current_loop
159                                       ->unwatch_io(
160                                           handle => $foreign_stdin,
161                                           on_write_ready => 1
162                                       );
163                       } else {
164                           log_trace { "Sent $len bytes of fatnode data to remote side" };
165                       }
166                     }
167                   );
168   return ($foreign_stdin, $foreign_stdout, $pid);
169 }
170
171 sub _setup_watchdog_reset {
172   my ($self, $conn) = @_;
173   my $timer_id; 
174     
175   return unless $self->watchdog_timeout; 
176         
177   Dlog_trace { "Creating Watchdog management timer for connection id $_" } $conn->_id;
178     
179   weaken($conn);
180         
181   $timer_id = Object::Remote->current_loop->watch_time(
182     every => $self->watchdog_timeout / 3,
183     code => sub {
184       unless(defined($conn)) {
185         log_trace { "Weak reference to connection in Watchdog was lost, terminating update timer $timer_id" };
186         Object::Remote->current_loop->unwatch_time($timer_id);
187         return;  
188       }
189             
190       Dlog_trace { "Reseting Watchdog for connection id $_" } $conn->_id;
191       #we do not want to block in the run loop so send the
192       #update off and ignore any result, we don't need it
193       #anyway
194       $conn->send_class_call(0, 'Object::Remote::WatchDog', 'reset');
195     }
196   );     
197 }
198
199 sub fatnode_text {
200   my ($self) = @_;
201   my $connection_timeout = $self->timeout;
202   my $watchdog_timeout = $self->watchdog_timeout;
203   my $text = '';
204
205   require Object::Remote::FatNode;
206   
207   if (defined($connection_timeout)) {
208     $text .= "alarm($connection_timeout);\n";
209   }
210   
211   if (defined($watchdog_timeout)) {
212     $text .= "my \$WATCHDOG_TIMEOUT = $watchdog_timeout;\n";
213   } else {
214     $text .= "my \$WATCHDOG_TIMEOUT = undef;\n";
215   }
216   
217   $text .= '$Object::Remote::FatNode::REMOTE_NODE = "1";' . "\n";
218   
219   $text .= <<'END';
220 $INC{'Object/Remote/FatNode.pm'} = __FILE__;
221 $Object::Remote::FatNode::DATA = <<'ENDFAT';
222 END
223   $text .= do { no warnings 'once'; $Object::Remote::FatNode::DATA };
224   $text .= "ENDFAT\n";
225   $text .= <<'END';
226 eval $Object::Remote::FatNode::DATA;
227 die $@ if $@;
228 END
229   
230   $text .= "__END__\n";
231   return $text;
232 }
233
234 1;