5b011e410f8e26092ac57d9dabd423c8eafec56a
[urisagit/Stem.git] / lib / Stem / AsyncIO.pm
1 #  File: Stem/AsyncIO.pm
2
3 #  This file is part of Stem.
4 #  Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.
5
6 #  Stem is free software; you can redistribute it and/or modify
7 #  it under the terms of the GNU General Public License as published by
8 #  the Free Software Foundation; either version 2 of the License, or
9 #  (at your option) any later version.
10
11 #  Stem is distributed in the hope that it will be useful,
12 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 #  GNU General Public License for more details.
15
16 #  You should have received a copy of the GNU General Public License
17 #  along with Stem; if not, write to the Free Software
18 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
20 #  For a license to use the Stem under conditions other than those
21 #  described here, to purchase support for this software, or to purchase a
22 #  commercial warranty contract, please contact Stem Systems at:
23
24 #       Stem Systems, Inc.              781-643-7504
25 #       79 Everett St.                  info@stemsystems.com
26 #       Arlington, MA 02474
27 #       USA
28
29 package Stem::AsyncIO ;
30
31 use strict ;
32 use Data::Dumper ;
33
34 use Stem::Vars ;
35
36
37 my $attr_spec = [
38
39         {
40                 'name'          => 'object',
41                 'required'      => 1,
42                 'help'          => <<HELP,
43 HELP
44         },
45
46         {
47                 'name'          => 'read_method',
48                 'default'       => 'async_read_data',
49                 'help'          => <<HELP,
50 Method called with the data read from the read handle. It is only called if the 
51 data_addr attribute is not set.
52 HELP
53         },
54
55         {
56                 'name'          => 'stderr_method',
57                 'default'       => 'async_stderr_data',
58                 'help'          => <<HELP,
59 Method called with the data read from the stderr handle. It is only
60 called if the stderr_addr attribute is not set.
61 HELP
62         },
63
64         {
65                 'name'          => 'closed_method',
66                 'default'       => 'async_closed',
67                 'help'          => <<HELP,
68 Method used when this object is closed.
69 HELP
70         },
71         {
72                 'name'          => 'fh',
73                 'help'          => <<HELP,
74 File handle used for reading and writing.
75 HELP
76         },
77         {
78                 'name'          => 'read_fh',
79                 'help'          => <<HELP,
80 File Handle used for reading.
81 HELP
82         },
83         {
84                 'name'          => 'write_fh',
85                 'help'          => <<HELP,
86 File handle used for standard output.
87 HELP
88         },
89         {
90                 'name'          => 'stderr_fh',
91                 'help'          => <<HELP,
92 File handle used for Standard Error.
93 HELP
94         },
95         {
96                 'name'          => 'data_addr',
97                 'type'          => 'address',
98                 'help'          => <<HELP,
99 The address of the Cell where the data is sent.
100 HELP
101         },
102         {
103                 'name'          => 'stderr_addr',
104                 'type'          => 'address',
105                 'help'          => <<HELP,
106 The address of the Cell where the stderr is sent.
107 HELP
108         },
109         {
110                 'name'          => 'data_msg_type',
111                 'default'       => 'data',
112                 'help'          => <<HELP,
113 This sets the type of the data message.
114 HELP
115         },
116         {
117                 'name'          => 'codec',
118                 'help'          => <<HELP,
119 Use this codec to encode/decode the I/O data. Each write is encoded to
120 one packet out. Each packet read in will be decoded and either send a
121 data message or generate a callback.
122 HELP
123         },
124         {
125                 'name'          => 'stderr_msg_type',
126                 'default'       => 'stderr_data',
127                 'help'          => <<HELP,
128 This sets the type of the stderr data message.
129 HELP
130         },
131         {
132                 'name'          => 'from_addr',
133                 'type'          => 'address',
134                 'help'          => <<HELP,
135 The address used in the 'from' field of data and stderr messages.
136 HELP
137         },
138         {
139                 'name'          => 'send_data_on_close',
140                 'type'          => 'boolean',
141                 'help'          => <<HELP,
142 Buffer all read data and send it when the read handle is closed.
143 HELP
144         },
145         {
146                 'name'          => 'id',
147                 'help'          => <<HELP,
148 The id is passed to the callback method as its only argument. Use it to
149 identify different instances of this object.
150 HELP
151
152         },
153
154 ################
155 ## add support to log all AIO
156 ################
157
158         {
159                 'name'          => 'log_label',
160                 'default'       => 'AIO',
161                 'help'          => <<HELP,
162 HELP
163         },
164         {
165                 'name'          => 'log_level',
166                 'default'       => 5,
167                 'help'          => <<HELP,
168 HELP
169         },
170         {
171                 'name'          => 'read_log',
172                 'help'          => <<HELP,
173 HELP
174         },
175
176         {
177                 'name'          => 'stderr_log',
178                 'help'          => <<HELP,
179 HELP
180         },
181
182         {
183                 'name'          => 'write_log',
184                 'help'          => <<HELP,
185 HELP
186         },
187
188
189 ] ;
190
191 use Carp 'cluck' ;
192
193 sub new {
194
195         my( $class ) = shift ;
196
197         my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
198         return $self unless ref $self ;
199
200 #cluck "NEW $self" ;
201
202         if ( $self->{'data_addr'} && ! $self->{'from_addr'} ) {
203
204                 return "Using 'data_addr in AsyncIO requires a 'from_addr'" ;
205         }
206
207         if ( my $codec = $self->{'codec'} ) {
208
209                 require Stem::Packet ;
210                 my $packet = Stem::Packet->new( 'codec' => $codec ) ;
211                 return $packet unless ref $packet ;
212
213                 $self->{'packet'} = $packet ;
214         }
215
216         $self->{'stderr_addr'} ||= $self->{'data_addr'} ;
217
218         $self->{'buffer'} = '' if $self->{'send_data_on_close'} ;
219
220         $self->{ 'read_fh' } ||= $self->{ 'fh' } ;
221         $self->{ 'write_fh' } ||= $self->{ 'fh' } ;
222
223         if ( my $read_fh = $self->{'read_fh'} ) {
224
225                 my $read_event = Stem::Event::Read->new(
226                                         'object'        => $self,
227                                         'fh'            => $read_fh,
228                 ) ;
229
230                 return $read_event unless ref $read_event ;
231
232                 $self->{'read_event'} = $read_event ;
233         }
234
235         if ( my $stderr_fh = $self->{'stderr_fh'} ) {
236
237                 my $stderr_event = Stem::Event::Read->new(
238                                         'object'        => $self,
239                                         'fh'            => $stderr_fh,
240                                         'method'        => 'stderr_readable',
241                 ) ;
242
243                 return $stderr_event unless ref $stderr_event ;
244
245                 $self->{'stderr_event'} = $stderr_event ;
246         }
247
248         if ( my $write_fh = $self->{'write_fh'} ) {
249
250                 my $write_event = Stem::Event::Write->new(
251                                         'object'        => $self,
252                                         'fh'            => $write_fh,
253                 ) ;
254
255                 return $write_event unless ref $write_event ;
256
257                 $self->{'write_event'} = $write_event ;
258
259                 $self->{'write_buf'} = '' ;
260         }
261
262         return $self ;
263 }
264
265 sub shut_down {
266
267         my( $self ) = @_ ;
268
269 #cluck "SHUT $self\n" ;
270
271
272         if ( $self->{'shut_down'} ) {
273
274                 return ;
275         }
276
277         $self->{'shutting_down'} = 1 ;
278
279         $self->read_shut_down() ;
280
281         $self->write_shut_down() ;
282
283         if ( my $event = delete $self->{'stderr_event'} ) {
284
285                 $event->cancel() ;
286                 close( $self->{'stderr_fh'} ) ;
287         }
288
289         $self->{'shut_down'} = 1 ;
290
291 #print "DELETE OBJ", caller(), "\n" ;
292
293         delete $self->{'object'} ;
294 }
295
296 sub read_shut_down {
297
298         my( $self ) = @_ ;
299
300         if ( my $event = delete $self->{'read_event'} ) {
301
302                 $event->cancel() ;
303         }
304
305         shutdown( $self->{'read_fh'}, 0 ) ;
306 }
307
308 sub write_shut_down {
309
310         my( $self ) = @_ ;
311
312         if ( exists( $self->{'write_buf'} ) && 
313              length( $self->{'write_buf'} ) ) {
314
315 #print "write handle shut when empty\n" ;
316                 $self->{'shut_down_when_empty'} = 1 ;
317
318                 return ;
319         }
320
321         if ( my $event = delete $self->{'write_event'} ) {
322
323                 shutdown( $self->{'write_fh'}, 1 ) ;
324                 $event->cancel() ;
325         }
326 }
327
328 sub readable {
329
330         my( $self ) = @_ ;
331
332         my( $read_buf ) ;
333
334         return if $self->{'shut_down'} ;
335
336         my $bytes_read = sysread( $self->{'read_fh'}, $read_buf, 8192 ) ;
337
338 #print "READ: $bytes_read [$read_buf]\n" ;
339
340         unless( defined( $bytes_read ) && $bytes_read > 0 ) {
341
342                 $self->read_shut_down() ;
343
344                 if ( $self->{'send_data_on_close'} &&
345                      length( $self->{'buffer'} ) ) {
346
347                         $self->send_data() ;
348
349 # since we sent the total read buffer, we don't do a closed callback.
350
351                         return ;
352                 }
353
354                 $self->_callback( 'closed_method' ) ;
355
356                 return ;
357         }
358
359 # decode the packet if needed
360
361         if ( my $packet = $self->{packet} ) {
362
363                 my $buf_ref = \$read_buf ;
364
365                 while( my $data_ref = $packet->to_data( $buf_ref ) ) {
366
367                         $self->send_data( $data_ref ) ;
368                         $buf_ref = undef ;
369                 }
370
371                 return ;
372         }
373
374         if ( $self->{'send_data_on_close'} ) {
375
376                 $self->{'buffer'} .= $read_buf ;
377                 return ;
378         }
379
380         $self->send_data( \$read_buf ) ;
381 }
382
383 sub send_data {
384
385         my( $self, $buffer ) = @_ ;
386
387         my $buf_ref = $buffer || \$self->{'buffer'} ;
388
389         $self->_send_data_msg( 'data_addr', 'data_msg_type', $buf_ref ) ;
390         $self->_callback( 'read_method', $buf_ref ) ;
391
392         return ;
393 }
394
395 sub stderr_readable {
396
397         my( $self ) = @_ ;
398
399         my( $read_buf ) ;
400
401         my $bytes_read = sysread( $self->{'stderr_fh'}, $read_buf, 8192 ) ;
402
403 # no callback on stderr close. let the read handle close deal with the
404 # shutdown
405
406         return if $bytes_read == 0 ;
407
408 #print "STDERR READ [$read_buf]\n" ;
409
410         $self->_send_data_msg( 'stderr_addr', 'stderr_msg_type', \$read_buf ) ;
411         $self->_callback( 'stderr_method', \$read_buf ) ;
412 }
413
414 sub _send_data_msg {
415
416         my( $self, $addr_attr, $type_attr, $data_ref ) = @_ ;
417
418         my $to_addr = $self->{$addr_attr} or return ;
419
420         my $msg = Stem::Msg->new(
421                         'to'            => $to_addr,
422                         'from'          => $self->{'from_addr'},
423                         'type'          => $self->{$type_attr},
424                         'data'          => $data_ref,
425         ) ;
426
427 #print $msg->dump( 'SEND DATA' ) ;
428         $msg->dispatch() ;
429 }
430
431 sub _callback {
432
433         my ( $self, $method_attr, @data ) = @_ ;
434
435         my $obj = $self->{'object'} or return ;
436
437         my $method = $self->{$method_attr} ;
438
439         my $code = $obj->can( $method ) or return ;
440
441         return $obj->$code( @data, $self->{'id'} ) ;
442 }
443
444 sub write {
445
446         my( $self ) = shift ;
447
448         return unless @_ ;
449
450         return unless exists( $self->{'write_buf'} ) ;
451
452         my $buffer = shift ;
453
454         return if $self->{'shut_down'} ;
455
456 # encode the data in a packet if needed
457
458         if ( my $packet = $self->{packet} ) {
459
460                 my $buf_ref = $packet->to_packet( $buffer ) ;
461
462                 $self->{'write_buf'} .= ${$buf_ref} ;
463         }
464         else {
465
466                 $self->{'write_buf'} .= ref $buffer eq 'SCALAR' ?
467                         ${$buffer} : $buffer ;
468         }
469
470         $self->{'write_event'}->start() ;
471 }
472
473 sub final_write {
474
475         my( $self ) = @_ ;
476
477         $self->write( $_[1] ) ;
478
479         $self->write_shut_down() ;
480 }
481
482
483 sub writeable {
484
485         my( $self ) = @_ ;
486
487         return if $self->{'shut_down'} ;
488
489         my $buf_ref = \$self->{'write_buf'} ;
490         my $buf_len = length $$buf_ref ;
491
492 #print "BUFLEN [$buf_len]\n" ;
493
494         unless ( $buf_len ) {
495
496 #print "AIO W STOPPING\n" ;
497                 $self->{'write_event'}->stop() ;
498                 return ;
499         }
500
501         my $bytes_written = syswrite( $self->{'write_fh'}, $$buf_ref ) ;
502
503         unless( defined( $bytes_written ) ) {
504
505 # do a SHUTDOWN
506                 return ;
507         }
508
509 # remove the part of the buffer that was written 
510
511         substr( $$buf_ref, 0, $bytes_written, '' ) ;
512
513         return if length( $$buf_ref ) ;
514
515         $self->write_shut_down() if $self->{'shut_down_when_empty'} ;
516 }
517
518
519 # DESTROY {
520
521 #       my( $self ) = @_  ;
522
523 # print "DESTROY $self\n" ;
524
525 # }
526
527 1 ;