3 # This file is part of Stem.
4 # Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.
6 # Stem is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # Stem is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with Stem; if not, write to the Free Software
18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 # For a license to use the Stem under conditions other than those
21 # described here, to purchase support for this software, or to purchase a
22 # commercial warranty contract, please contact Stem Systems at:
24 # Stem Systems, Inc. 781-643-7504
25 # 79 Everett St. info@stemsystems.com
34 use Carp qw( cluck ) ;
36 use Stem::Route qw( :cell ) ;
40 use Stem::Cell::Clone ;
41 use Stem::Cell::Pipe ;
42 use Stem::Cell::Flow ;
43 use Stem::Cell::Work ;
45 use Stem::Trace 'log' => 'stem_status' , 'sub' => 'TraceStatus' ;
47 my %class_to_attr_name ;
54 The registered address of the owner Cell
58 'name' => 'cloneable',
61 The parent Cell will be cloned upon triggering
65 'name' => 'data_addr',
68 Cell address to send any data read in. If not set here it must come
69 from a trigger message.
73 'name' => 'status_addr',
76 Cell address to send Cell status to
80 'name' => 'send_data_on_close',
83 Buffer all read data and only send it when the I/O is closed
90 Don't do any I/O for the Cell. Either there is none or the owner Cell must
95 'name' => 'pipe_addr',
98 Cell address to open a pipe to
102 'name' => 'pipe_args',
104 This is list of arguments or a single argument which is passed to the
105 cell at the remote end of the pipe.
109 'name' => 'aio_args',
112 This is a list of arguments passed to the Stem::AsyncIO module constructor
116 'name' => 'errors_to_output',
117 'env' => 'errors_to_output',
119 Any received error messages will be sent to the output.
124 # change this to max_clones
130 Size of unique ID space for clones. Range is 26**N
134 'name' => 'trigger_method',
135 'default' => 'triggered_cell',
137 Method to callback in owner object when cell is triggered
141 # the below attributes are not permanent yet
144 'name' => 'shut_down_method',
145 'default' => 'shut_down_cell',
147 Method to callback in owner object when cell is shutdown
151 'name' => 'activated_method',
152 'default' => 'activate_cell',
154 Method to call in owner Cell when the cell is activated. UNSUPPORTED
158 'name' => 'sequence_done_method',
160 Method to call in owner Cell when the executing sequence completes.
166 This sets the codec that converts data packets to/from a byte stream.
170 'name' => 'work_ready_addr',
173 This is the address of the Cell that this Cell sends a message to
174 when work can be done (i.e. a work message can now be sent here).
178 'name' => 'stderr_log',
180 This sets the log that will get the stderr output of the process
188 my( $class ) = shift ;
190 my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
191 return $self unless ref $self ;
193 #print $self->_dump( 'NEW' ) ;
198 # this is only called in Stem::Conf for this class.
199 # it initializes the cell info object inside its owner object.
203 my( $self, $owner_obj, $cell_name, $cell_info_attr ) = @_ ;
205 # the $owner_obj is the cell that owns this Stem::Cell object
207 $self->{'owner_obj'} = $owner_obj ;
208 $self->{'cell_name'} = $cell_name ;
209 # $self->{'from_addr'} = $cell_name ;
211 $self->{'from_addr'} = Stem::Msg::make_address_string(
212 $Stem::Vars::Hub_name,
216 $self->{'cell_info_attr'} = $cell_info_attr ;
218 # save the attribute name that the owner class uses for the cell info.
219 # this is how a cell info object can be found given an owner cell object.
220 # also keep this name in the info itself
222 #print "OWNER [$owner_obj]\n" ;
223 $class_to_attr_name{ ref $owner_obj } ||= $cell_info_attr ;
225 if ( $self->{'cloneable'} ) {
227 $self->{'id_obj'} = Stem::Id->new(
228 'size' => $self->{'id_size'} ) ;
229 $self->{'is_parent'} = 1 ;
230 $self->{'target'} = '' ;
234 # get the cell info whether we were called from the owner object or
235 # the cell info itself ;
241 my $class = ref $self ;
243 return "can't get cell info from '$self'\n" unless $class ;
245 return $self if $class eq __PACKAGE__ ;
247 #print "CLASS [$class][$class_to_attr_name{ $class }]\n" ;
249 return $self->{ $class_to_attr_name{ $class } } ;
254 my ( $self, @args ) = @_ ;
256 my $self_info = $self->_get_cell_info() ;
258 return $self_info unless ref $self_info ;
260 return if $self_info->{'triggered'} ;
262 # clone this cell and its info if needed
263 # $cell will either be $self or a clone of $self
265 my $cell = $self_info->_clone() ;
267 my $cell_info = $cell->_get_cell_info() ;
269 $cell_info->{'triggered'} = 1 ;
271 #print $cell_info->_dump( 'TRIGGER' ) ;
273 # set any args (e.g. from trigger message) into this cell
275 $cell_info->cell_set_args( @args ) ;
277 $cell_info->_cell_pipe() ;
279 if ( my $err = $cell_info->_gather_io_args() ) {
280 $cell_info->cell_shut_down( $err ) ;
284 # do the callback into the (possibly cloned) cell
286 if ( my $err = $cell_info->_callback( 'trigger_method' ) ) {
288 #print "CALLBACK $err\n" ;
290 $cell_info->cell_shut_down( $err ) ;
294 # return $cell_info ;
298 sub cell_trigger_cmd {
300 my ( $self, $msg ) = @_ ;
304 if ( my $data = $msg->data() ) {
306 $data = ${$data} if ref $data eq 'SCALAR' ;
308 my $ref = ref $data ;
310 if ( ! $ref && defined $data ) {
312 unless ( @args = $data =~ /(\S+)=(\S+)/g ) {
314 @args = ( 'args' => $data ) ;
317 elsif ( $ref eq 'HASH' ) {
321 elsif ( $ref eq 'ARRAY' ) {
327 push( @args, triggering_msg => $msg ) ;
329 my $err = $self->cell_trigger( @args ) ;
331 print "TRIG ERR [$err]\n" if $err ;
333 return $err if ref $err ;
340 my( $self, $error ) = @_ ;
342 my $cell_info = $self->_get_cell_info() ;
344 #cluck "CELL SHUT\n" ;
346 #print $cell_info->_dump( 'SHUT' ) ;
349 return unless $error || $cell_info->{'active'} ;
351 $cell_info->{'error'} = $error ;
353 #print $cell_info->_dump( "SHUT $error" ) ;
355 if ( my $aio = delete $cell_info->{'aio'} ) {
360 if ( my $gather = delete $cell_info->{'gather'} ) {
362 $gather->shut_down() ;
365 $cell_info->_close_pipe() ;
367 $cell_info->_clone_delete() ;
369 delete $cell_info->{'args'} ;
370 # delete $cell_info->{'data_addr'} ;
372 $cell_info->{'active'} = 0 ;
373 $cell_info->{'triggered'} = 0 ;
375 TraceStatus "cell shut down done" ;
383 my( $self, %args ) = @_ ;
385 my $cell_info = $self->_get_cell_info() ;
387 @{$cell_info->{'args'}}{ keys %args } = values %args ;
389 if ( my $gather = $cell_info->{'gather'} ) {
391 my $err = $gather->gathered( keys %args ) ;
392 return $err if $err ;
400 my( $self, @arg_keys ) = @_ ;
402 my $cell_info = $self->_get_cell_info() ;
404 return( @{$cell_info->{'args'}}{@arg_keys } ) ;
409 my( $self ) = shift ;
411 my $cell_info = $self->_get_cell_info() ;
413 $cell_info->{'info'} = shift if @_ ;
415 return $cell_info->{'info'} ;
418 sub _gather_io_args {
422 my $cell_info = $self->_get_cell_info() ;
424 return if $cell_info->{'no_io'} ;
426 my @gather_keys = 'aio_args' ;
428 push( @gather_keys, 'data_addr' ) if
429 $cell_info->{'piped'} &&
430 ! $cell_info->{'data_addr'} ;
432 my $gather = Stem::Gather->new(
433 'object' => $cell_info,
434 'keys' => \@gather_keys,
435 'gathered_method' => '_cell_activate_io',
438 return $gather unless ref $gather ;
440 $cell_info->{'gather'} = $gather ;
442 my $err = $gather->gathered( keys %{$cell_info->{'args'}} ) ;
444 return $err if $err ;
447 sub _cell_activate_io {
451 TraceStatus "cell activated" ;
453 $self->{'active'} = 1 ;
455 #print $self->_dump( "BEFORE AIO" ) ;
459 # get any config args
461 if ( my $aio_args = $self->{'aio_args'} ) {
463 push( @aio_args, %{$aio_args} ) ;
466 # args from a trigger message override any config args
468 if ( my $msg_aio_args = $self->{'args'}{'aio_args'} ) {
470 ref $msg_aio_args eq 'ARRAY' or return <<ERR ;
471 aio_args is not an ARRAY ref
473 push( @aio_args, @{$msg_aio_args} ) ;
476 my $data_addr = $self->{'args'}{'data_addr'} || $self->{'data_addr'} ;
478 my $aio = Stem::AsyncIO->new(
480 'object' => $self->{'owner_obj'},
481 'data_addr' => $data_addr,
482 'from_addr' => $self->{'from_addr'},
483 'send_data_on_close' => $self->{'send_data_on_close'},
484 'codec' => $self->{'codec'},
488 print "AIO ERR [$aio]\n" unless ref $aio ;
489 return $aio unless ref $aio ;
491 $self->{'aio'} = $aio ;
493 #print $self->_dump( "AFTER AIO" ) ;
502 my $cell_info = $self->_get_cell_info() ;
504 $cell_info->{'active'} = 1 ;
507 *cell_status_cmd = \&status_cmd ;
513 my $cell_info = $self->_get_cell_info() ;
515 my $info = $cell_info->{'info'} || $cell_info->{'args'}{'info'} || '' ;
517 $info =~ s/^/\t\t/mg ;
519 my $class = ref $cell_info->{'owner_obj'} ;
521 # my $data_addr = Stem::Msg::address_string(
522 my $data_addr = $cell_info->{'data_addr'} ||
523 $cell_info->{'args'}{'data_addr'} ||
526 my $active = ( $cell_info->{'active'} ) ? 'Active' : 'Inactive' ;
528 my $codec = $cell_info->{codec} || 'NONE' ;
530 print "CELL STATUS\n" ;
532 #my $dump = $cell_info->_dump( 'STATUS' ) ;
538 Addr: $cell_info->{'from_addr'}
540 Data Addr: $data_addr
546 AIO: $cell_info->{aio}
557 my( $self, $msg ) = @_ ;
559 #print "DATA SELF $self\n" ;
561 #print $msg->dump( 'CELL IN' ) ;
563 my $cell_info = $self->_get_cell_info() ;
565 if ( $cell_info->{'is_parent'} ) {
568 TraceStatus "parent cell $cell_info->{'from_addr'} ignoring msg" ;
573 unless( $cell_info->{'active'} ) {
574 #print "INACTIVE\n" ;
576 TraceStatus "cell not active. msg ignored FOO" ;
581 #print $cell_info->_dump( "DATA IN" ) ;
583 $cell_info->{data_in_msg} = $msg ;
584 $cell_info->cell_write( $msg->data() ) ;
589 my( $self, $data ) = @_ ;
591 my $cell_info = $self->_get_cell_info() ;
593 $cell_info->{'aio'}->write( $data ) ;
596 sub _cell_write_sync {
598 my( $self, $data ) = @_ ;
600 my $cell_info = $self->_get_cell_info() ;
602 #print "SYNC $$data\n" ;
604 #print $cell_info->_dump( 'SYNC' ) ;
606 if ( my $aio_args = $cell_info->{'args'}{'aio_args'} ) {
608 my %aio_args = @{$aio_args} ;
610 if ( my $fh = $aio_args{'fh'} ) {
612 # $fh->blocking( 1 ) ;
614 $fh->syswrite( (ref $data) ? $$data : $data ) ;
619 # handle stderr data as plain data
621 *stderr_data_in = \&data_in ;
624 # $cell_info is the Stem::Cell object of the parent cell. the name is
625 # not self as it is differentiated from $clone_info.
631 my ( $self, $method_name, @data ) = @_ ;
633 my $method = $self->{$method_name} ;
635 my $owner_obj = $self->{'owner_obj'} ;
637 if ( $owner_obj->can( $method ) ) {
639 return $owner_obj->$method( @data ) ;
642 TraceStatus "can't call $method in $owner_obj" ;
651 my $cell_info = $self->_get_cell_info() ;
653 return( $cell_info->{'from_addr'} ) ;
656 use Stem::Debug qw( dump_data ) ;
660 my ( $self, $text ) = @_ ;
662 return $text . dump_data( $self ) ;
666 my $dump = "$text =\n" ;
668 my $cell_info = $self->_get_cell_info() ;
670 # my $owner_obj = $cell_info->{owner_obj} ;
671 # my @names = lookup_cell_name( $owner_obj ) ;
672 # $dump .= "\nNames: @names\n" ;
674 foreach my $key ( sort keys %{$cell_info} ) {
676 my $val = $cell_info->{$key} ;
677 next unless defined $val ;
679 if ( $key eq 'args' ) {
681 $dump .= "\targs = {\n" ;
683 foreach my $arg ( sort keys %{$val} ) {
685 my $arg_val = $val->{$arg} || '';
687 $dump .= "\t\t$arg = '$arg_val'\n" ;
695 $dump .= "\t$key = '$val'\n" ;
707 my $cell_info = $self->_get_cell_info() ;
709 return $cell_info->_dump() . Dumper $cell_info ;