0a7ad67967dc6e150f4db7c3e4550305e95a4e94
[scpubgit/Object-Remote.git] / lib / Object / Remote / Role / Connector / PerlInterpreter.pm
1 package Object::Remote::Role::Connector::PerlInterpreter;
2
3 use IPC::Open2;
4 use IPC::Open3; 
5 use IO::Handle;
6 use Symbol; 
7 use Object::Remote::Logging qw(:log :dlog router);
8 use Object::Remote::ModuleSender;
9 use Object::Remote::Handle;
10 use Object::Remote::Future;
11 use Scalar::Util qw(blessed weaken);
12 use Moo::Role;
13
14 with 'Object::Remote::Role::Connector';
15
16 has module_sender => (is => 'lazy');
17 has ulimit => ( is => 'ro');
18 has nice => ( is => 'ro');
19 has watchdog_timeout => ( is => 'ro', required => 1, default => sub { undef });
20 has perl_command => (is => 'lazy');
21 has pid => (is => 'rwp');
22 has connection_id => (is => 'rwp');
23
24 #if no child_stderr file handle is specified then stderr
25 #of the child will be connected to stderr of the parent
26 has stderr => ( is => 'rw', default => sub { undef } );
27
28 BEGIN { router()->exclude_forwarding; }
29
30 sub _build_module_sender {
31   my ($hook) =
32     grep {blessed($_) && $_->isa('Object::Remote::ModuleLoader::Hook') }
33       @INC;
34   return $hook ? $hook->sender : Object::Remote::ModuleSender->new;
35 }
36
37 sub _build_perl_command {
38     my ($self) = @_;
39     my $nice = $self->nice;
40     my $ulimit = $self->ulimit;
41     my $perl_path = 'perl';
42     my $shell_code = '';
43
44     if (defined($ulimit)) {
45         $shell_code .= "ulimit $ulimit || exit 1; ";
46     }
47
48     if (defined($nice)) {
49         $shell_code .= "nice -n $nice ";
50     }
51
52     if (defined($ENV{OBJECT_REMOTE_PERL_BIN})) {
53         log_debug { "Using OBJECT_REMOTE_PERL_BIN environment variable as perl path" };
54         $perl_path = $ENV{OBJECT_REMOTE_PERL_BIN};
55     }
56
57     $shell_code .= $perl_path . ' -';
58
59     return [ 'bash', '-c', $shell_code ];
60 }
61
62 around connect => sub {
63   my ($orig, $self) = (shift, shift);
64   my $f = $self->$start::start($orig => @_);
65   return future {
66     $f->on_done(sub {
67       my ($conn) = $f->get;
68       $self->_setup_watchdog_reset($conn);
69       my $sub = $conn->remote_sub('Object::Remote::Logging::init_remote_logging');
70       $sub->('Object::Remote::Logging', router => router(), connection_id => $conn->_id);
71       Object::Remote::Handle->new(
72         connection => $conn,
73         class => 'Object::Remote::ModuleLoader',
74         args => { module_sender => $self->module_sender }
75       )->disarm_free;
76       require Object::Remote::Prompt;
77       Object::Remote::Prompt::maybe_set_prompt_command_on($conn);
78     });
79     $f;
80   } 2;
81 };
82
83 sub final_perl_command { shift->perl_command }
84
85 sub _start_perl {
86   my $self = shift;
87   my $given_stderr = $self->stderr;
88   my $foreign_stderr;
89  
90   Dlog_verbose {
91     s/\n/ /g; "invoking connection to perl interpreter using command line: $_"
92   } @{$self->final_perl_command};
93     
94   if (defined($given_stderr)) {
95     #if the stderr data goes to an existing file handle
96     #an anonymous file handle is required
97     #as the other half of a pipe style file handle pair
98     #so the file handles can go into the run loop
99     $foreign_stderr = gensym();
100   } else {
101     #if no file handle has been specified
102     #for the child's stderr then connect
103     #the child stderr to the parent stderr
104     $foreign_stderr = ">&STDERR";
105   }
106   
107   my $pid = open3(
108     my $foreign_stdin,
109     my $foreign_stdout,
110     $foreign_stderr,
111     @{$self->final_perl_command},
112   ) or die "Failed to run perl at '$_[0]': $!";
113   
114   $self->_set_pid($pid);
115   
116   if (defined($given_stderr)) {   
117     Dlog_debug { "Child process STDERR is being handled via run loop" };
118         
119     Object::Remote->current_loop
120                   ->watch_io(
121                       handle => $foreign_stderr,
122                       on_read_ready => sub {
123                         my $buf = ''; 
124                         my $len = sysread($foreign_stderr, $buf, 32768);
125                         if (!defined($len) or $len == 0) {
126                           log_trace { "Got EOF or error on child stderr, removing from watcher" };
127                           $self->stderr(undef);
128                           Object::Remote->current_loop->unwatch_io(
129                                          handle => $foreign_stderr,
130                                          on_read_ready => 1
131                                        );
132                           } else {
133                             Dlog_trace { "got $len characters of stderr data for connection" };
134                             print $given_stderr $buf or die "could not send stderr data: $!";
135                           }
136                          } 
137                       );     
138   }
139       
140   return ($foreign_stdin, $foreign_stdout, $pid);
141 }
142
143 sub _open2_for {
144   my $self = shift;
145   my ($foreign_stdin, $foreign_stdout, $pid) = $self->_start_perl(@_);
146   my $to_send = $self->fatnode_text;
147   log_debug { my $len = length($to_send); "Sending contents of fat node to remote node; size is '$len' characters" };
148   Object::Remote->current_loop
149                 ->watch_io(
150                     handle => $foreign_stdin,
151                     on_write_ready => sub {
152                       my $len = syswrite($foreign_stdin, $to_send, 32768);
153                       if (defined $len) {
154                         substr($to_send, 0, $len) = '';
155                       }
156                       # if the stdin went away, we'll never get Shere
157                       # so it's not a big deal to simply give up on !defined
158                       if (!defined($len) or 0 == length($to_send)) {
159                         log_trace { "Got EOF or error when writing fatnode data to filehandle, unwatching it" };
160                         Object::Remote->current_loop
161                                       ->unwatch_io(
162                                           handle => $foreign_stdin,
163                                           on_write_ready => 1
164                                       );
165                       } else {
166                           log_trace { "Sent $len bytes of fatnode data to remote side" };
167                       }
168                     }
169                   );
170   return ($foreign_stdin, $foreign_stdout, $pid);
171 }
172
173 sub _setup_watchdog_reset {
174   my ($self, $conn) = @_;
175   my $timer_id; 
176     
177   return unless $self->watchdog_timeout; 
178         
179   Dlog_trace { "Creating Watchdog management timer for connection id $_" } $conn->_id;
180     
181   weaken($conn);
182         
183   $timer_id = Object::Remote->current_loop->watch_time(
184     every => $self->watchdog_timeout / 3,
185     code => sub {
186       unless(defined($conn)) {
187         log_trace { "Weak reference to connection in Watchdog was lost, terminating update timer $timer_id" };
188         Object::Remote->current_loop->unwatch_time($timer_id);
189         return;  
190       }
191             
192       Dlog_trace { "Reseting Watchdog for connection id $_" } $conn->_id;
193       #we do not want to block in the run loop so send the
194       #update off and ignore any result, we don't need it
195       #anyway
196       $conn->send_class_call(0, 'Object::Remote::WatchDog', 'reset');
197     }
198   );     
199 }
200
201 sub fatnode_text {
202   my ($self) = @_;
203   my $connection_timeout = $self->timeout;
204   my $watchdog_timeout = $self->watchdog_timeout;
205   my $text = '';
206
207   require Object::Remote::FatNode;
208   
209   if (defined($connection_timeout)) {
210     $text .= "alarm($connection_timeout);\n";
211   }
212   
213   if (defined($watchdog_timeout)) {
214     $text .= "my \$WATCHDOG_TIMEOUT = $watchdog_timeout;\n";
215   } else {
216     $text .= "my \$WATCHDOG_TIMEOUT = undef;\n";
217   }
218   
219   $text .= '$Object::Remote::FatNode::REMOTE_NODE = "1";' . "\n";
220   
221   $text .= <<'END';
222 $INC{'Object/Remote/FatNode.pm'} = __FILE__;
223 $Object::Remote::FatNode::DATA = <<'ENDFAT';
224 END
225   $text .= do { no warnings 'once'; $Object::Remote::FatNode::DATA };
226   $text .= "ENDFAT\n";
227   $text .= <<'END';
228 eval $Object::Remote::FatNode::DATA;
229 die $@ if $@;
230 END
231   
232   $text .= "__END__\n";
233   return $text;
234 }
235
236 1;