made event ok lines more consistant
[urisagit/Stem.git] / lib / Stem / AsyncIO.pm
CommitLineData
4536f655 1# File: Stem/AsyncIO.pm
2
3# This file is part of Stem.
4# Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.
5
6# Stem is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# Stem is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with Stem; if not, write to the Free Software
18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20# For a license to use the Stem under conditions other than those
21# described here, to purchase support for this software, or to purchase a
22# commercial warranty contract, please contact Stem Systems at:
23
24# Stem Systems, Inc. 781-643-7504
25# 79 Everett St. info@stemsystems.com
26# Arlington, MA 02474
27# USA
28
29package Stem::AsyncIO ;
30
31use strict ;
32use Data::Dumper ;
33
34use Stem::Vars ;
35
36
37my $attr_spec = [
38
39 {
40 'name' => 'object',
41 'required' => 1,
42 'help' => <<HELP,
43HELP
44 },
45
46 {
47 'name' => 'read_method',
48 'default' => 'async_read_data',
49 'help' => <<HELP,
50Method called with the data read from the read handle. It is only called if the
51data_addr attribute is not set.
52HELP
53 },
54
55 {
56 'name' => 'stderr_method',
57 'default' => 'async_stderr_data',
58 'help' => <<HELP,
59Method called with the data read from the stderr handle. It is only
60called if the stderr_addr attribute is not set.
61HELP
62 },
63
64 {
65 'name' => 'closed_method',
66 'default' => 'async_closed',
67 'help' => <<HELP,
68Method used when this object is closed.
69HELP
70 },
71 {
72 'name' => 'fh',
73 'help' => <<HELP,
74File handle used for reading and writing.
75HELP
76 },
77 {
78 'name' => 'read_fh',
79 'help' => <<HELP,
80File Handle used for reading.
81HELP
82 },
83 {
84 'name' => 'write_fh',
85 'help' => <<HELP,
86File handle used for standard output.
87HELP
88 },
89 {
90 'name' => 'stderr_fh',
91 'help' => <<HELP,
92File handle used for Standard Error.
93HELP
94 },
95 {
96 'name' => 'data_addr',
97 'type' => 'address',
98 'help' => <<HELP,
99The address of the Cell where the data is sent.
100HELP
101 },
102 {
103 'name' => 'stderr_addr',
104 'type' => 'address',
105 'help' => <<HELP,
106The address of the Cell where the stderr is sent.
107HELP
108 },
109 {
110 'name' => 'data_msg_type',
111 'default' => 'data',
112 'help' => <<HELP,
113This sets the type of the data message.
114HELP
115 },
116 {
117 'name' => 'codec',
118 'help' => <<HELP,
119Use this codec to encode/decode the I/O data. Each write is encoded to
120one packet out. Each packet read in will be decoded and either send a
121data message or generate a callback.
122HELP
123 },
124 {
125 'name' => 'stderr_msg_type',
126 'default' => 'stderr_data',
127 'help' => <<HELP,
128This sets the type of the stderr data message.
129HELP
130 },
131 {
132 'name' => 'from_addr',
133 'type' => 'address',
134 'help' => <<HELP,
135The address used in the 'from' field of data and stderr messages.
136HELP
137 },
138 {
139 'name' => 'send_data_on_close',
140 'type' => 'boolean',
141 'help' => <<HELP,
142Buffer all read data and send it when the read handle is closed.
143HELP
144 },
145 {
146 'name' => 'id',
147 'help' => <<HELP,
148The id is passed to the callback method as its only argument. Use it to
149identify different instances of this object.
150HELP
151
152 },
153
154################
155## add support to log all AIO
156################
157
158 {
159 'name' => 'log_label',
160 'default' => 'AIO',
161 'help' => <<HELP,
162HELP
163 },
164 {
165 'name' => 'log_level',
166 'default' => 5,
167 'help' => <<HELP,
168HELP
169 },
170 {
171 'name' => 'read_log',
172 'help' => <<HELP,
173HELP
174 },
175
176 {
177 'name' => 'stderr_log',
178 'help' => <<HELP,
179HELP
180 },
181
182 {
183 'name' => 'write_log',
184 'help' => <<HELP,
185HELP
186 },
187
188
189] ;
190
191use Carp 'cluck' ;
192
193sub new {
194
195 my( $class ) = shift ;
196
197 my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
198 return $self unless ref $self ;
199
200#cluck "NEW $self" ;
201
202 if ( $self->{'data_addr'} && ! $self->{'from_addr'} ) {
203
204 return "Using 'data_addr in AsyncIO requires a 'from_addr'" ;
205 }
206
207 if ( my $codec = $self->{'codec'} ) {
208
209 require Stem::Packet ;
210 my $packet = Stem::Packet->new( 'codec' => $codec ) ;
211 return $packet unless ref $packet ;
212
213 $self->{'packet'} = $packet ;
214 }
215
216 $self->{'stderr_addr'} ||= $self->{'data_addr'} ;
217
218 $self->{'buffer'} = '' if $self->{'send_data_on_close'} ;
219
220 $self->{ 'read_fh' } ||= $self->{ 'fh' } ;
221 $self->{ 'write_fh' } ||= $self->{ 'fh' } ;
222
223 if ( my $read_fh = $self->{'read_fh'} ) {
224
225 my $read_event = Stem::Event::Read->new(
226 'object' => $self,
227 'fh' => $read_fh,
228 ) ;
229
230 return $read_event unless ref $read_event ;
231
232 $self->{'read_event'} = $read_event ;
233 }
234
235 if ( my $stderr_fh = $self->{'stderr_fh'} ) {
236
237 my $stderr_event = Stem::Event::Read->new(
238 'object' => $self,
239 'fh' => $stderr_fh,
240 'method' => 'stderr_readable',
241 ) ;
242
243 return $stderr_event unless ref $stderr_event ;
244
245 $self->{'stderr_event'} = $stderr_event ;
246 }
247
248 if ( my $write_fh = $self->{'write_fh'} ) {
249
250 my $write_event = Stem::Event::Write->new(
251 'object' => $self,
252 'fh' => $write_fh,
253 ) ;
254
255 return $write_event unless ref $write_event ;
256
257 $self->{'write_event'} = $write_event ;
258
259 $self->{'write_buf'} = '' ;
260 }
261
262 return $self ;
263}
264
265sub shut_down {
266
267 my( $self ) = @_ ;
268
269#cluck "SHUT $self\n" ;
270
271
272 if ( $self->{'shut_down'} ) {
273
274 return ;
275 }
276
277 $self->{'shutting_down'} = 1 ;
278
279 $self->read_shut_down() ;
280
281 $self->write_shut_down() ;
282
283 if ( my $event = delete $self->{'stderr_event'} ) {
284
285 $event->cancel() ;
286 close( $self->{'stderr_fh'} ) ;
287 }
288
289 $self->{'shut_down'} = 1 ;
290
291#print "DELETE OBJ", caller(), "\n" ;
292
293 delete $self->{'object'} ;
294}
295
296sub read_shut_down {
297
298 my( $self ) = @_ ;
299
300 if ( my $event = delete $self->{'read_event'} ) {
301
302 $event->cancel() ;
303 }
304
305 shutdown( $self->{'read_fh'}, 0 ) ;
306}
307
308sub write_shut_down {
309
310 my( $self ) = @_ ;
311
312 if ( exists( $self->{'write_buf'} ) &&
313 length( $self->{'write_buf'} ) ) {
314
315#print "write handle shut when empty\n" ;
316 $self->{'shut_down_when_empty'} = 1 ;
317
318 return ;
319 }
320
321 if ( my $event = delete $self->{'write_event'} ) {
322
323 shutdown( $self->{'write_fh'}, 1 ) ;
324 $event->cancel() ;
325 }
326}
327
328sub readable {
329
330 my( $self ) = @_ ;
331
332 my( $read_buf ) ;
333
334 return if $self->{'shut_down'} ;
335
336 my $bytes_read = sysread( $self->{'read_fh'}, $read_buf, 8192 ) ;
337
338#print "READ: $bytes_read [$read_buf]\n" ;
339
340 unless( defined( $bytes_read ) && $bytes_read > 0 ) {
341
342 $self->read_shut_down() ;
343
344 if ( $self->{'send_data_on_close'} &&
345 length( $self->{'buffer'} ) ) {
346
347 $self->send_data() ;
348
349# since we sent the total read buffer, we don't do a closed callback.
350
351 return ;
352 }
353
354 $self->_callback( 'closed_method' ) ;
355
356 return ;
357 }
358
359# decode the packet if needed
360
361 if ( my $packet = $self->{packet} ) {
362
363 my $buf_ref = \$read_buf ;
364
365 while( my $data_ref = $packet->to_data( $buf_ref ) ) {
366
367 $self->send_data( $data_ref ) ;
368 $buf_ref = undef ;
369 }
370
371 return ;
372 }
373
374 if ( $self->{'send_data_on_close'} ) {
375
376 $self->{'buffer'} .= $read_buf ;
377 return ;
378 }
379
380 $self->send_data( \$read_buf ) ;
381}
382
383sub send_data {
384
385 my( $self, $buffer ) = @_ ;
386
387 my $buf_ref = $buffer || \$self->{'buffer'} ;
388
389 $self->_send_data_msg( 'data_addr', 'data_msg_type', $buf_ref ) ;
390 $self->_callback( 'read_method', $buf_ref ) ;
391
392 return ;
393}
394
395sub stderr_readable {
396
397 my( $self ) = @_ ;
398
399 my( $read_buf ) ;
400
401 my $bytes_read = sysread( $self->{'stderr_fh'}, $read_buf, 8192 ) ;
402
403# no callback on stderr close. let the read handle close deal with the
404# shutdown
405
406 return if $bytes_read == 0 ;
407
408#print "STDERR READ [$read_buf]\n" ;
409
410 $self->_send_data_msg( 'stderr_addr', 'stderr_msg_type', \$read_buf ) ;
411 $self->_callback( 'stderr_method', \$read_buf ) ;
412}
413
414sub _send_data_msg {
415
416 my( $self, $addr_attr, $type_attr, $data_ref ) = @_ ;
417
418 my $to_addr = $self->{$addr_attr} or return ;
419
420 my $msg = Stem::Msg->new(
421 'to' => $to_addr,
422 'from' => $self->{'from_addr'},
423 'type' => $self->{$type_attr},
424 'data' => $data_ref,
425 ) ;
426
427#print $msg->dump( 'SEND DATA' ) ;
428 $msg->dispatch() ;
429}
430
431sub _callback {
432
433 my ( $self, $method_attr, @data ) = @_ ;
434
435 my $obj = $self->{'object'} or return ;
436
437 my $method = $self->{$method_attr} ;
438
439 my $code = $obj->can( $method ) or return ;
440
441 return $obj->$code( @data, $self->{'id'} ) ;
442}
443
444sub write {
445
446 my( $self ) = shift ;
447
448 return unless @_ ;
449
450 return unless exists( $self->{'write_buf'} ) ;
451
452 my $buffer = shift ;
453
454 return if $self->{'shut_down'} ;
455
456# encode the data in a packet if needed
457
458 if ( my $packet = $self->{packet} ) {
459
460 my $buf_ref = $packet->to_packet( $buffer ) ;
461
462 $self->{'write_buf'} .= ${$buf_ref} ;
463 }
464 else {
465
466 $self->{'write_buf'} .= ref $buffer eq 'SCALAR' ?
467 ${$buffer} : $buffer ;
468 }
469
470 $self->{'write_event'}->start() ;
471}
472
473sub final_write {
474
475 my( $self ) = @_ ;
476
477 $self->write( $_[1] ) ;
478
479 $self->write_shut_down() ;
480}
481
482
483sub writeable {
484
485 my( $self ) = @_ ;
486
487 return if $self->{'shut_down'} ;
488
489 my $buf_ref = \$self->{'write_buf'} ;
490 my $buf_len = length $$buf_ref ;
491
492#print "BUFLEN [$buf_len]\n" ;
493
494 unless ( $buf_len ) {
495
496#print "AIO W STOPPING\n" ;
497 $self->{'write_event'}->stop() ;
498 return ;
499 }
500
501 my $bytes_written = syswrite( $self->{'write_fh'}, $$buf_ref ) ;
502
503 unless( defined( $bytes_written ) ) {
504
505# do a SHUTDOWN
506 return ;
507 }
508
509# remove the part of the buffer that was written
510
511 substr( $$buf_ref, 0, $bytes_written, '' ) ;
512
513 return if length( $$buf_ref ) ;
514
515 $self->write_shut_down() if $self->{'shut_down_when_empty'} ;
516}
517
518
519# DESTROY {
520
521# my( $self ) = @_ ;
522
523# print "DESTROY $self\n" ;
524
525# }
526
5271 ;