Commit | Line | Data |
4536f655 |
1 | # File: Stem/AsyncIO.pm |
2 | |
3 | # This file is part of Stem. |
4 | # Copyright (C) 1999, 2000, 2001 Stem Systems, Inc. |
5 | |
6 | # Stem is free software; you can redistribute it and/or modify |
7 | # it under the terms of the GNU General Public License as published by |
8 | # the Free Software Foundation; either version 2 of the License, or |
9 | # (at your option) any later version. |
10 | |
11 | # Stem is distributed in the hope that it will be useful, |
12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | # GNU General Public License for more details. |
15 | |
16 | # You should have received a copy of the GNU General Public License |
17 | # along with Stem; if not, write to the Free Software |
18 | # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
19 | |
20 | # For a license to use the Stem under conditions other than those |
21 | # described here, to purchase support for this software, or to purchase a |
22 | # commercial warranty contract, please contact Stem Systems at: |
23 | |
24 | # Stem Systems, Inc. 781-643-7504 |
25 | # 79 Everett St. info@stemsystems.com |
26 | # Arlington, MA 02474 |
27 | # USA |
28 | |
29 | package Stem::AsyncIO ; |
30 | |
31 | use strict ; |
32 | use Data::Dumper ; |
33 | |
34 | use Stem::Vars ; |
35 | |
36 | |
# Attribute specification passed to Stem::Class::parse_args() by new().
# Each entry describes one constructor attribute by 'name' and may also
# carry 'required', 'default', 'type' and 'help' fields.

my $attr_spec = [

	# the object that receives the callbacks

	{
		name		=> 'object',
		required	=> 1,
		help		=> <<HELP,
HELP
	},

	# callback method names

	{
		name		=> 'read_method',
		default		=> 'async_read_data',
		help		=> <<HELP,
Method called with the data read from the read handle. It is only called if the
data_addr attribute is not set.
HELP
	},
	{
		name		=> 'stderr_method',
		default		=> 'async_stderr_data',
		help		=> <<HELP,
Method called with the data read from the stderr handle. It is only
called if the stderr_addr attribute is not set.
HELP
	},
	{
		name		=> 'closed_method',
		default		=> 'async_closed',
		help		=> <<HELP,
Method used when this object is closed.
HELP
	},

	# file handles

	{
		name		=> 'fh',
		help		=> <<HELP,
File handle used for reading and writing.
HELP
	},
	{
		name		=> 'read_fh',
		help		=> <<HELP,
File Handle used for reading.
HELP
	},
	{
		name		=> 'write_fh',
		help		=> <<HELP,
File handle used for standard output.
HELP
	},
	{
		name		=> 'stderr_fh',
		help		=> <<HELP,
File handle used for Standard Error.
HELP
	},

	# message delivery

	{
		name		=> 'data_addr',
		type		=> 'address',
		help		=> <<HELP,
The address of the Cell where the data is sent.
HELP
	},
	{
		name		=> 'stderr_addr',
		type		=> 'address',
		help		=> <<HELP,
The address of the Cell where the stderr is sent.
HELP
	},
	{
		name		=> 'data_msg_type',
		default		=> 'data',
		help		=> <<HELP,
This sets the type of the data message.
HELP
	},
	{
		name		=> 'codec',
		help		=> <<HELP,
Use this codec to encode/decode the I/O data. Each write is encoded to
one packet out. Each packet read in will be decoded and either send a
data message or generate a callback.
HELP
	},
	{
		name		=> 'stderr_msg_type',
		default		=> 'stderr_data',
		help		=> <<HELP,
This sets the type of the stderr data message.
HELP
	},
	{
		name		=> 'from_addr',
		type		=> 'address',
		help		=> <<HELP,
The address used in the 'from' field of data and stderr messages.
HELP
	},
	{
		name		=> 'send_data_on_close',
		type		=> 'boolean',
		help		=> <<HELP,
Buffer all read data and send it when the read handle is closed.
HELP
	},
	{
		name		=> 'id',
		help		=> <<HELP,
The id is passed to the callback method as its only argument. Use it to
identify different instances of this object.
HELP
	},

################
## add support to log all AIO
################

	{
		name		=> 'log_label',
		default		=> 'AIO',
		help		=> <<HELP,
HELP
	},
	{
		name		=> 'log_level',
		default		=> 5,
		help		=> <<HELP,
HELP
	},
	{
		name		=> 'read_log',
		help		=> <<HELP,
HELP
	},
	{
		name		=> 'stderr_log',
		help		=> <<HELP,
HELP
	},
	{
		name		=> 'write_log',
		help		=> <<HELP,
HELP
	},
] ;
190 | |
191 | use Carp 'cluck' ; |
192 | |
# Constructor.
#
# Parses the arguments against $attr_spec, optionally sets up a
# Stem::Packet codec, and creates a read, stderr and/or write event for
# each handle that was supplied ('fh' is the fallback for both 'read_fh'
# and 'write_fh').
#
# Returns the new object on success. On failure it returns a non-ref
# value (the error returned by whichever constructor failed, or an error
# string), so callers should test the result with ref().

sub new {

	my( $class ) = shift ;

	my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
	return $self unless ref $self ;

#cluck "NEW $self" ;

	if ( $self->{'data_addr'} && ! $self->{'from_addr'} ) {

		return "Using 'data_addr' in AsyncIO requires a 'from_addr'" ;
	}

	if ( my $codec = $self->{'codec'} ) {

		require Stem::Packet ;
		my $packet = Stem::Packet->new( 'codec' => $codec ) ;
		return $packet unless ref $packet ;

		$self->{'packet'} = $packet ;
	}

# stderr messages go to the data address unless told otherwise

	$self->{'stderr_addr'} ||= $self->{'data_addr'} ;

	$self->{'buffer'} = '' if $self->{'send_data_on_close'} ;

	$self->{'read_fh'}  ||= $self->{'fh'} ;
	$self->{'write_fh'} ||= $self->{'fh'} ;

# one event per handle, created in this order: read, stderr, write.
# each row is: handle attribute, event attribute, event class, extra args

	my @event_specs = (
		[ 'read_fh',   'read_event',   'Stem::Event::Read' ],
		[ 'stderr_fh', 'stderr_event', 'Stem::Event::Read',
				'method' => 'stderr_readable' ],
		[ 'write_fh',  'write_event',  'Stem::Event::Write' ],
	) ;

	my @created_attrs ;

	foreach my $spec ( @event_specs ) {

		my( $fh_attr, $event_attr, $event_class, @extra ) = @{$spec} ;

		my $fh = $self->{$fh_attr} or next ;

		my $event = $event_class->new(
				'object'	=> $self,
				'fh'		=> $fh,
				@extra,
		) ;

		unless ( ref $event ) {

# this object is being abandoned, so cancel the events that were
# already created instead of leaving them behind

			foreach my $attr ( @created_attrs ) {

				my $old_event = delete $self->{$attr} ;
				$old_event->cancel() ;
			}

			return $event ;
		}

		$self->{$event_attr} = $event ;
		push @created_attrs, $event_attr ;
	}

# the existence of write_buf is what marks this object as writeable

	$self->{'write_buf'} = '' if $self->{'write_event'} ;

	return $self ;
}
264 | |
# Shut down the whole object: the read side, the write side and the
# stderr handle. The reference to the owner object is dropped last.
# Calling this again after it has completed does nothing.

sub shut_down {

	my $self = shift ;

#cluck "SHUT $self\n" ;

	return if $self->{shut_down} ;

	$self->{shutting_down} = 1 ;

	$self->read_shut_down() ;
	$self->write_shut_down() ;

# the stderr handle is only closed if an event was watching it

	my $stderr_event = delete $self->{stderr_event} ;

	if ( $stderr_event ) {

		$stderr_event->cancel() ;
		close( $self->{stderr_fh} ) ;
	}

	$self->{shut_down} = 1 ;

#print "DELETE OBJ", caller(), "\n" ;

	delete $self->{object} ;
}
295 | |
# Shut down the read side: cancel the read event (if there still is one)
# and shut down the read direction of the read handle.
#
# An object created without any read handle has nothing to shut down, so
# shutdown() is skipped in that case rather than being called on an
# undefined handle.

sub read_shut_down {

	my( $self ) = @_ ;

	if ( my $event = delete $self->{'read_event'} ) {

		$event->cancel() ;
	}

	my $read_fh = $self->{'read_fh'} ;

	return unless defined $read_fh ;

	shutdown( $read_fh, 0 ) ;
}
307 | |
# Shut down the write side. If there is still buffered output the shut
# down is deferred: a flag is set and nothing else happens here.
# Otherwise the write direction of the handle is shut down and the write
# event is cancelled.

sub write_shut_down {

	my $self = shift ;

	my $pending = exists $self->{write_buf}
		? length( $self->{write_buf} )
		: 0 ;

	if ( $pending ) {

#print "write handle shut when empty\n" ;
		$self->{shut_down_when_empty} = 1 ;

		return ;
	}

	if ( my $write_event = delete $self->{write_event} ) {

		shutdown( $self->{write_fh}, 1 ) ;
		$write_event->cancel() ;
	}
}
327 | |
# Read event callback. Reads up to 8192 bytes from the read handle and
# hands them on.
#
# - A transient read failure (EINTR, EAGAIN, EWOULDBLOCK) is ignored so
#   the read is retried on the next event.
# - EOF or any other read error shuts the read side down. If
#   send_data_on_close is set and data was buffered, that data is sent
#   and no closed callback is made; otherwise the closed callback is made.
# - With a codec, every complete packet decoded from the input is sent.
# - With send_data_on_close, the input is appended to the buffer.
# - Otherwise the input is sent as is.

sub readable {

	my( $self ) = @_ ;

	my( $read_buf ) ;

	return if $self->{'shut_down'} ;

	my $bytes_read = sysread( $self->{'read_fh'}, $read_buf, 8192 ) ;

#print "READ: $bytes_read [$read_buf]\n" ;

# $! is only meaningful right after a failed sysread, so check it before
# anything else can change it. a transient failure is not a closed handle.

	if ( ! defined( $bytes_read ) &&
	     ( $!{EINTR} || $!{EAGAIN} || $!{EWOULDBLOCK} ) ) {

		return ;
	}

	unless( defined( $bytes_read ) && $bytes_read > 0 ) {

		$self->read_shut_down() ;

		if ( $self->{'send_data_on_close'} &&
		     length( $self->{'buffer'} ) ) {

			$self->send_data() ;

# since we sent the total read buffer, we don't do a closed callback.

			return ;
		}

		$self->_callback( 'closed_method' ) ;

		return ;
	}

# decode the packet if needed

	if ( my $packet = $self->{packet} ) {

		my $buf_ref = \$read_buf ;

# the new input is only passed in on the first call; the following calls
# pass undef and keep going until to_data returns nothing

		while( my $data_ref = $packet->to_data( $buf_ref ) ) {

			$self->send_data( $data_ref ) ;
			$buf_ref = undef ;
		}

		return ;
	}

	if ( $self->{'send_data_on_close'} ) {

		$self->{'buffer'} .= $read_buf ;
		return ;
	}

	$self->send_data( \$read_buf ) ;
}
382 | |
# Deliver read data. The optional argument is a reference to the data;
# without it (or with a false value) the object's own buffer is used.
# The data is passed to the data message sender and to the read
# callback, in that order. Returns nothing.
#
# NOTE(review): the read_method help text says the callback is only made
# when data_addr is not set, but both deliveries are attempted here --
# confirm which is intended.

sub send_data {

	my $self = shift ;
	my $buffer = shift ;

	my $buf_ref = $buffer ? $buffer : \$self->{buffer} ;

	$self->_send_data_msg( 'data_addr', 'data_msg_type', $buf_ref ) ;

	$self->_callback( 'read_method', $buf_ref ) ;

	return ;
}
394 | |
# Read event callback for the stderr handle. Reads up to 8192 bytes and
# passes them to the stderr message sender and the stderr callback.
#
# A transient read failure (EINTR, EAGAIN, EWOULDBLOCK) is ignored and
# the read is retried on the next event. On EOF or any other read error
# the stderr event is cancelled and the handle closed, since nothing more
# can be read from it; leaving the event active would only keep invoking
# this method with nothing to deliver.

sub stderr_readable {

	my( $self ) = @_ ;

	my( $read_buf ) ;

	my $bytes_read = sysread( $self->{'stderr_fh'}, $read_buf, 8192 ) ;

# $! must be looked at right after the failed sysread

	if ( ! defined( $bytes_read ) &&
	     ( $!{EINTR} || $!{EAGAIN} || $!{EWOULDBLOCK} ) ) {

		return ;
	}

# no callback on stderr close. let the read handle close deal with the
# shutdown. only the stderr event and handle are released here, the same
# way shut_down releases them.

	unless( $bytes_read ) {

		if ( my $event = delete $self->{'stderr_event'} ) {

			$event->cancel() ;
			close( $self->{'stderr_fh'} ) ;
		}

		return ;
	}

#print "STDERR READ [$read_buf]\n" ;

	$self->_send_data_msg( 'stderr_addr', 'stderr_msg_type', \$read_buf ) ;
	$self->_callback( 'stderr_method', \$read_buf ) ;
}
413 | |
# Build and dispatch a Stem::Msg carrying $data_ref.
#
# $addr_attr and $type_attr are the names of the attributes of this
# object that hold the destination address and the message type. If the
# address attribute is not set, no message is sent.

sub _send_data_msg {

	my $self = shift ;
	my( $addr_attr, $type_attr, $data_ref ) = @_ ;

	my $to_addr = $self->{$addr_attr} ;

	return unless $to_addr ;

#print $msg->dump( 'SEND DATA' ) ;

	Stem::Msg->new(
		to	=> $to_addr,
		from	=> $self->{from_addr},
		type	=> $self->{$type_attr},
		data	=> $data_ref,
	)->dispatch() ;
}
430 | |
# Invoke a callback on the owner object.
#
# $method_attr is the name of the attribute of this object that holds
# the method name. Any further arguments are passed to the method,
# followed by this object's id. Nothing is called if there is no owner
# object or the owner cannot do the method. Returns whatever the method
# returns.

sub _callback {

	my $self = shift ;
	my $method_attr = shift ;
	my @data = @_ ;

	my $obj = $self->{object} ;
	return unless $obj ;

	my $code = $obj->can( $self->{$method_attr} ) ;
	return unless $code ;

	return $code->( $obj, @data, $self->{id} ) ;
}
443 | |
# Queue data for writing and start the write event.
#
# The data may be a plain scalar or a reference to a scalar. With a
# codec it is first encoded into a packet. The call is silently ignored
# when no data is passed, when this object has no write handle, when the
# object has been shut down, or when the write side has already been
# shut down (the write event is gone then, so nothing could ever write
# the data).

sub write {

	my( $self ) = shift ;

	return unless @_ ;

	return unless exists( $self->{'write_buf'} ) ;

	my $buffer = shift ;

	return if $self->{'shut_down'} ;

# write_shut_down removes the write event; without it there is nothing
# to start below, so drop the data instead of dying on an undef event

	my $write_event = $self->{'write_event'} or return ;

# encode the data in a packet if needed

	if ( my $packet = $self->{packet} ) {

		my $buf_ref = $packet->to_packet( $buffer ) ;

		$self->{'write_buf'} .= ${$buf_ref} ;
	}
	else {

		$self->{'write_buf'} .= ref $buffer eq 'SCALAR' ?
						${$buffer} : $buffer ;
	}

	$write_event->start() ;
}
472 | |
# Write the last data (if any) and then shut down the write side.
#
# The arguments are passed on to write() as they are, so calling this
# without data queues nothing; an undefined value is not turned into
# data to be written.

sub final_write {

	my $self = shift ;

	$self->write( @_ ) ;

	$self->write_shut_down() ;
}
481 | |
482 | |
# Write event callback. Writes as much of the write buffer as the handle
# accepts and removes the written part from the buffer.
#
# - With an empty buffer the write event is stopped.
# - A transient write failure (EINTR, EAGAIN, EWOULDBLOCK) leaves the
#   buffer alone so the write is retried on the next event.
# - Any other write failure means the data cannot be delivered: the
#   buffer is dropped and the write side is shut down, so the event does
#   not keep firing against a broken handle.
# - Once the buffer has been emptied, a deferred write shut down (see
#   write_shut_down) is carried out.
#
# NOTE(review): this returns early once shut_down is set, so data still
# buffered when shut_down() is called is never written -- confirm that
# is intended.

sub writeable {

	my( $self ) = @_ ;

	return if $self->{'shut_down'} ;

	my $buf_ref = \$self->{'write_buf'} ;
	my $buf_len = length $$buf_ref ;

#print "BUFLEN [$buf_len]\n" ;

	unless ( $buf_len ) {

#print "AIO W STOPPING\n" ;
		$self->{'write_event'}->stop() ;
		return ;
	}

	my $bytes_written = syswrite( $self->{'write_fh'}, $$buf_ref ) ;

	unless( defined( $bytes_written ) ) {

# $! must be looked at right after the failed syswrite

		return if $!{EINTR} || $!{EAGAIN} || $!{EWOULDBLOCK} ;

# the buffer has to be empty or write_shut_down would only defer again

		$$buf_ref = '' ;
		$self->write_shut_down() ;

		return ;
	}

# remove the part of the buffer that was written

	substr( $$buf_ref, 0, $bytes_written, '' ) ;

	return if length( $$buf_ref ) ;

	$self->write_shut_down() if $self->{'shut_down_when_empty'} ;
}
517 | |
518 | |
519 | # DESTROY { |
520 | |
521 | # my( $self ) = @_ ; |
522 | |
523 | # print "DESTROY $self\n" ; |
524 | |
525 | # } |
526 | |
527 | 1 ; |