1 # File: Stem/AsyncIO.pm
3 # This file is part of Stem.
4 # Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.
6 # Stem is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # Stem is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with Stem; if not, write to the Free Software
18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 # For a license to use the Stem under conditions other than those
21 # described here, to purchase support for this software, or to purchase a
22 # commercial warranty contract, please contact Stem Systems at:
24 # Stem Systems, Inc. 781-643-7504
25 # 79 Everett St. info@stemsystems.com
29 package Stem::AsyncIO ;
47 'name' => 'read_method',
48 'default' => 'async_read_data',
50 Method called with the data read from the read handle. It is only called if the
51 data_addr attribute is not set.
56 'name' => 'stderr_method',
57 'default' => 'async_stderr_data',
59 Method called with the data read from the stderr handle. It is only
60 called if the stderr_addr attribute is not set.
65 'name' => 'closed_method',
66 'default' => 'async_closed',
68 Method used when this object is closed.
74 File handle used for reading and writing.
80 File Handle used for reading.
86 File handle used for standard output.
90 'name' => 'stderr_fh',
92 File handle used for Standard Error.
96 'name' => 'data_addr',
99 The address of the Cell where the data is sent.
103 'name' => 'stderr_addr',
106 The address of the Cell where the stderr is sent.
110 'name' => 'data_msg_type',
113 This sets the type of the data message.
119 Use this codec to encode/decode the I/O data. Each write is encoded to
120 one packet out. Each packet read in will be decoded and either send a
121 data message or generate a callback.
125 'name' => 'stderr_msg_type',
126 'default' => 'stderr_data',
128 This sets the type of the stderr data message.
132 'name' => 'from_addr',
135 The address used in the 'from' field of data and stderr messages.
139 'name' => 'send_data_on_close',
142 Buffer all read data and send it when the read handle is closed.
148 The id is passed to the callback method as its only argument. Use it to
149 identify different instances of this object.
155 ## add support to log all AIO
159 'name' => 'log_label',
165 'name' => 'log_level',
171 'name' => 'read_log',
177 'name' => 'stderr_log',
183 'name' => 'write_log',
195 my( $class ) = shift ;
197 my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
198 return $self unless ref $self ;
202 if ( $self->{'data_addr'} && ! $self->{'from_addr'} ) {
204 return "Using 'data_addr in AsyncIO requires a 'from_addr'" ;
207 if ( my $codec = $self->{'codec'} ) {
209 require Stem::Packet ;
210 my $packet = Stem::Packet->new( 'codec' => $codec ) ;
211 return $packet unless ref $packet ;
213 $self->{'packet'} = $packet ;
216 $self->{'stderr_addr'} ||= $self->{'data_addr'} ;
218 $self->{'buffer'} = '' if $self->{'send_data_on_close'} ;
220 $self->{ 'read_fh' } ||= $self->{ 'fh' } ;
221 $self->{ 'write_fh' } ||= $self->{ 'fh' } ;
223 if ( my $read_fh = $self->{'read_fh'} ) {
225 my $read_event = Stem::Event::Read->new(
230 return $read_event unless ref $read_event ;
232 $self->{'read_event'} = $read_event ;
235 if ( my $stderr_fh = $self->{'stderr_fh'} ) {
237 my $stderr_event = Stem::Event::Read->new(
240 'method' => 'stderr_readable',
243 return $stderr_event unless ref $stderr_event ;
245 $self->{'stderr_event'} = $stderr_event ;
248 if ( my $write_fh = $self->{'write_fh'} ) {
250 my $write_event = Stem::Event::Write->new(
255 return $write_event unless ref $write_event ;
257 $self->{'write_event'} = $write_event ;
259 $self->{'write_buf'} = '' ;
269 #cluck "SHUT $self\n" ;
272 if ( $self->{'shut_down'} ) {
277 $self->{'shutting_down'} = 1 ;
279 $self->read_shut_down() ;
281 $self->write_shut_down() ;
283 if ( my $event = delete $self->{'stderr_event'} ) {
286 close( $self->{'stderr_fh'} ) ;
289 $self->{'shut_down'} = 1 ;
291 #print "DELETE OBJ", caller(), "\n" ;
293 delete $self->{'object'} ;
300 if ( my $event = delete $self->{'read_event'} ) {
305 shutdown( $self->{'read_fh'}, 0 ) ;
308 sub write_shut_down {
312 if ( exists( $self->{'write_buf'} ) &&
313 length( $self->{'write_buf'} ) ) {
315 #print "write handle shut when empty\n" ;
316 $self->{'shut_down_when_empty'} = 1 ;
321 if ( my $event = delete $self->{'write_event'} ) {
323 shutdown( $self->{'write_fh'}, 1 ) ;
334 return if $self->{'shut_down'} ;
336 my $bytes_read = sysread( $self->{'read_fh'}, $read_buf, 8192 ) ;
338 #print "READ: $bytes_read [$read_buf]\n" ;
340 unless( defined( $bytes_read ) && $bytes_read > 0 ) {
342 $self->read_shut_down() ;
344 if ( $self->{'send_data_on_close'} &&
345 length( $self->{'buffer'} ) ) {
349 # since we sent the total read buffer, we don't do a closed callback.
354 $self->_callback( 'closed_method' ) ;
359 # decode the packet if needed
361 if ( my $packet = $self->{packet} ) {
363 my $buf_ref = \$read_buf ;
365 while( my $data_ref = $packet->to_data( $buf_ref ) ) {
367 $self->send_data( $data_ref ) ;
374 if ( $self->{'send_data_on_close'} ) {
376 $self->{'buffer'} .= $read_buf ;
380 $self->send_data( \$read_buf ) ;
385 my( $self, $buffer ) = @_ ;
387 my $buf_ref = $buffer || \$self->{'buffer'} ;
389 $self->_send_data_msg( 'data_addr', 'data_msg_type', $buf_ref ) ;
390 $self->_callback( 'read_method', $buf_ref ) ;
395 sub stderr_readable {
401 my $bytes_read = sysread( $self->{'stderr_fh'}, $read_buf, 8192 ) ;
403 # no callback on stderr close. let the read handle close deal with the
406 return if $bytes_read == 0 ;
408 #print "STDERR READ [$read_buf]\n" ;
410 $self->_send_data_msg( 'stderr_addr', 'stderr_msg_type', \$read_buf ) ;
411 $self->_callback( 'stderr_method', \$read_buf ) ;
416 my( $self, $addr_attr, $type_attr, $data_ref ) = @_ ;
418 my $to_addr = $self->{$addr_attr} or return ;
420 my $msg = Stem::Msg->new(
422 'from' => $self->{'from_addr'},
423 'type' => $self->{$type_attr},
427 #print $msg->dump( 'SEND DATA' ) ;
433 my ( $self, $method_attr, @data ) = @_ ;
435 my $obj = $self->{'object'} or return ;
437 my $method = $self->{$method_attr} ;
439 my $code = $obj->can( $method ) or return ;
441 return $obj->$code( @data, $self->{'id'} ) ;
446 my( $self ) = shift ;
450 return unless exists( $self->{'write_buf'} ) ;
454 return if $self->{'shut_down'} ;
456 # encode the data in a packet if needed
458 if ( my $packet = $self->{packet} ) {
460 my $buf_ref = $packet->to_packet( $buffer ) ;
462 $self->{'write_buf'} .= ${$buf_ref} ;
466 $self->{'write_buf'} .= ref $buffer eq 'SCALAR' ?
467 ${$buffer} : $buffer ;
470 $self->{'write_event'}->start() ;
477 $self->write( $_[1] ) ;
479 $self->write_shut_down() ;
487 return if $self->{'shut_down'} ;
489 my $buf_ref = \$self->{'write_buf'} ;
490 my $buf_len = length $$buf_ref ;
492 #print "BUFLEN [$buf_len]\n" ;
494 unless ( $buf_len ) {
496 #print "AIO W STOPPING\n" ;
497 $self->{'write_event'}->stop() ;
501 my $bytes_written = syswrite( $self->{'write_fh'}, $$buf_ref ) ;
503 unless( defined( $bytes_written ) ) {
509 # remove the part of the buffer that was written
511 substr( $$buf_ref, 0, $bytes_written, '' ) ;
513 return if length( $$buf_ref ) ;
515 $self->write_shut_down() if $self->{'shut_down_when_empty'} ;
523 # print "DESTROY $self\n" ;