Cleaned up demos, various build fixes
[urisagit/Stem.git] / lib / Stem / Cell.pm
1 #  File: Stem/Cell.pm
2
3 #  This file is part of Stem.
4 #  Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.
5
6 #  Stem is free software; you can redistribute it and/or modify
7 #  it under the terms of the GNU General Public License as published by
8 #  the Free Software Foundation; either version 2 of the License, or
9 #  (at your option) any later version.
10
11 #  Stem is distributed in the hope that it will be useful,
12 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 #  GNU General Public License for more details.
15
16 #  You should have received a copy of the GNU General Public License
17 #  along with Stem; if not, write to the Free Software
18 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
20 #  For a license to use the Stem under conditions other than those
21 #  described here, to purchase support for this software, or to purchase a
22 #  commercial warranty contract, please contact Stem Systems at:
23
24 #       Stem Systems, Inc.              781-643-7504
25 #       79 Everett St.                  info@stemsystems.com
26 #       Arlington, MA 02474
27 #       USA
28
29 package Stem::Cell ;
30
31 use strict ;
32
33 use Data::Dumper ;
34 use Carp qw( cluck ) ;
35
36 use Stem::Route qw( :cell ) ;
37 use Stem::AsyncIO ;
38 use Stem::Id ;
39 use Stem::Gather ;
40 use Stem::Cell::Clone ;
41 use Stem::Cell::Pipe ;
42 use Stem::Cell::Flow ;
43 use Stem::Cell::Work ;
44
45 use Stem::Trace 'log' => 'stem_status' , 'sub' => 'TraceStatus' ;
46
47 my %class_to_attr_name ;
48
49 my $attr_spec = [
50
51         {
52                 'name'          => 'reg_name',
53                 'help'          => <<HELP,
54 The registered address of the owner Cell
55 HELP
56         },
57         {
58                 'name'          => 'cloneable',
59                 'type'          => 'boolean',
60                 'help'          => <<HELP,
61 The parent Cell will be cloned upon triggering
62 HELP
63         },
64         {
65                 'name'          => 'data_addr',
66                 'type'          => 'address',
67                 'help'          => <<HELP,
68 Cell address to send any data read in. If not set here it must come
69 from a trigger message.
70 HELP
71         },
72         {
73                 'name'          => 'status_addr',
74                 'type'          => 'address',
75                 'help'          => <<HELP,
76 Cell address to send Cell status to
77 HELP
78         },
79         {
80                 'name'          => 'send_data_on_close',
81                 'type'          => 'boolean',
82                 'help'          => <<HELP,
83 Buffer all read data and only send it when the I/O is closed
84 HELP
85         },
86         {
87                 'name'          => 'no_io',
88                 'type'          => 'boolean',
89                 'help'          => <<HELP,
90 Don't do any I/O for the Cell. Either there is none or the owner Cell must
91 do its own I/O
92 HELP
93         },
94         {
95                 'name'          => 'pipe_addr',
96                 'type'          => 'address',
97                 'help'          => <<HELP,
98 Cell address to open a pipe to
99 HELP
100         },
101         {
102                 'name'          => 'pipe_args',
103                 'help'          => <<HELP,
104 This is list of arguments or a single argument which is passed to the
105 cell at the remote end of the pipe.
106 HELP
107         },
108         {
109                 'name'          => 'aio_args',
110                 'type'          => 'hash',
111                 'help'          => <<HELP,
112 This is a list of arguments passed to the Stem::AsyncIO module constructor
113 HELP
114         },
115         {
116                 'name'          => 'errors_to_output',
117                 'env'           => 'errors_to_output',
118                 'help'          => <<HELP,
119 Any received error messages will be sent to the output.
120 HELP
121         },
122
123 ############
124 # change this to max_clones
125 ############
126         {
127                 'name'          => 'id_size',
128                 'default'       => 3,
129                 'help'          => <<HELP,
130 Size of unique ID space for clones. Range is 26**N
131 HELP
132         },
133         {
134                 'name'          => 'trigger_method',
135                 'default'       => 'triggered_cell',
136                 'help'          => <<HELP,
137 Method to callback in owner object when cell is triggered
138 HELP
139         },
140
141 # the below attributes are not permanent yet
142 # unused so far.
143         {
144                 'name'          => 'shut_down_method',
145                 'default'       => 'shut_down_cell',
146                 'help'          => <<HELP,
147 Method to callback in owner object when cell is shutdown
148 HELP
149         },
150         {
151                 'name'          => 'activated_method',
152                 'default'       => 'activate_cell',
153                 'help'          => <<HELP,
154 Method to call in owner Cell when the cell is activated. UNSUPPORTED
155 HELP
156         },
157         {
158                 'name'          => 'sequence_done_method',
159                 'help'          => <<HELP,
160 Method to call in owner Cell when the executing sequence completes.
161 HELP
162         },
163         {
164                 'name'          => 'codec',
165                 'help'          => <<HELP,
166 This sets the codec that converts data packets to/from a byte stream.
167 HELP
168         },
169         {
170                 'name'          => 'work_ready_addr',
171                 'type'          => 'address',
172                 'help'          => <<HELP,
173 This is the address of the Cell that this Cell sends a message to
174 when work can be done (i.e. a work message can now be sent here).
175 HELP
176         },
177         {
178                 'name'          => 'stderr_log',
179                 'help'          => <<HELP,
180 This sets the log that will get the stderr output of the process
181 HELP
182         },
183 ] ;
184
185
186 sub new {
187
188         my( $class ) = shift ;
189
190         my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
191         return $self unless ref $self ;
192
193 #print $self->_dump( 'NEW' ) ;
194
195         return( $self ) ;
196 }
197
198 # this is only called in Stem::Conf for this class.
199 # it initializes the cell info object inside its owner object.
200
201 sub cell_init {
202
203         my( $self, $owner_obj, $cell_name, $cell_info_attr ) = @_ ;
204
205 # the $owner_obj is the cell that owns this Stem::Cell object
206
207         $self->{'owner_obj'} = $owner_obj ;
208         $self->{'cell_name'} = $cell_name ;
209 #       $self->{'from_addr'} = $cell_name ;
210                                         
211         $self->{'from_addr'} = Stem::Msg::make_address_string( 
212                                         $Stem::Vars::Hub_name,
213                                         $cell_name
214         ) ;
215
216         $self->{'cell_info_attr'} = $cell_info_attr ;
217
218 # save the attribute name that the owner class uses for the cell info.
219 # this is how a cell info object can be found given an owner cell object.
220 # also keep this name in the info itself
221
222 #print "OWNER [$owner_obj]\n" ;
223         $class_to_attr_name{ ref $owner_obj } ||= $cell_info_attr ;
224
225         if ( $self->{'cloneable'} ) {
226
227                 $self->{'id_obj'} = Stem::Id->new(
228                                         'size'  => $self->{'id_size'} ) ;
229                 $self->{'is_parent'} = 1 ;
230                 $self->{'target'} = '' ;
231         }
232 }
233
234 # get the cell info whether we were called from the owner object or
235 # the cell info itself ;
236
237 sub _get_cell_info {
238
239         my ( $self ) = @_ ;
240
241         my $class = ref $self ;
242
243         return "can't get cell info from '$self'\n" unless $class ;
244
245         return $self if $class eq __PACKAGE__ ;
246
247 #print "CLASS [$class][$class_to_attr_name{ $class }]\n" ;
248
249         return $self->{ $class_to_attr_name{ $class } } ;
250 }
251
252 sub cell_trigger {
253
254         my ( $self, @args ) = @_ ;
255
256         my $self_info = $self->_get_cell_info() ;
257
258         return $self_info unless ref $self_info ;
259
260         return if $self_info->{'triggered'} ;
261
262 # clone this cell and its info if needed
263 # $cell will either be $self or a clone of $self
264
265         my $cell = $self_info->_clone() ;
266
267         my $cell_info = $cell->_get_cell_info() ;
268
269         $cell_info->{'triggered'} = 1 ;
270
271 #print $cell_info->_dump( 'TRIGGER' ) ;
272
273 # set any args (e.g. from trigger message) into this cell 
274
275         $cell_info->cell_set_args( @args ) ;
276
277         $cell_info->_cell_pipe() ;
278
279         if ( my $err = $cell_info->_gather_io_args() ) {
280                 $cell_info->cell_shut_down( $err ) ;
281                 return $err ;
282         }
283
284 # do the callback into the (possibly cloned) cell
285
286         if ( my $err = $cell_info->_callback( 'trigger_method' ) ) {
287
288 #print "CALLBACK $err\n" ;
289
290                 $cell_info->cell_shut_down( $err ) ;
291                 return $err ;
292         }
293
294 #       return $cell_info ;
295         return ;
296 }
297
298 sub cell_trigger_cmd {
299
300         my ( $self, $msg ) = @_ ;
301
302         my @args ;
303
304         if ( my $data = $msg->data() ) {
305
306                 $data = ${$data} if ref $data eq 'SCALAR' ;
307
308                 my $ref = ref $data ;
309
310                 if ( ! $ref && defined $data ) {
311
312                         unless ( @args = $data =~ /(\S+)=(\S+)/g ) {
313
314                                 @args = ( 'args' => $data ) ;
315                         }
316                 }
317                 elsif ( $ref eq 'HASH' ) {
318
319                         @args = %{$data} ;
320                 }
321                 elsif ( $ref eq 'ARRAY' ) {
322
323                         @args = @{$data} ;
324                 }
325         }
326
327         push( @args, triggering_msg => $msg ) ;
328
329         my $err = $self->cell_trigger( @args ) ;
330
331 print "TRIG ERR [$err]\n" if $err ;
332
333         return $err if ref $err ;
334         return ;
335 }
336
337
338 sub cell_shut_down {
339
340         my( $self, $error ) = @_ ;
341
342         my $cell_info = $self->_get_cell_info() ;
343
344 #cluck "CELL SHUT\n" ;
345
346 #print $cell_info->_dump( 'SHUT' ) ;
347
348
349         return unless $error || $cell_info->{'active'} ;
350
351         $cell_info->{'error'} = $error ;
352
353 #print $cell_info->_dump( "SHUT $error" ) ;
354
355         if ( my $aio = delete $cell_info->{'aio'} ) {
356
357                 $aio->shut_down() ;
358         }
359
360         if ( my $gather = delete $cell_info->{'gather'} ) {
361
362                 $gather->shut_down() ;
363         }
364
365         $cell_info->_close_pipe() ;
366
367         $cell_info->_clone_delete() ;
368
369         delete $cell_info->{'args'} ;
370 #       delete $cell_info->{'data_addr'} ;
371
372         $cell_info->{'active'} = 0 ;
373         $cell_info->{'triggered'} = 0 ;
374
375         TraceStatus "cell shut down done" ;
376
377         return ;
378 }
379
380
381 sub cell_set_args {
382
383         my( $self, %args ) = @_ ;
384
385         my $cell_info = $self->_get_cell_info() ;
386
387         @{$cell_info->{'args'}}{ keys %args } = values %args ;
388
389         if ( my $gather = $cell_info->{'gather'} ) {
390
391                 my $err = $gather->gathered( keys %args ) ;
392                 return $err if $err ;
393         }
394
395         return ;
396 }
397
398 sub cell_get_args {
399
400         my( $self, @arg_keys ) = @_ ;
401
402         my $cell_info = $self->_get_cell_info() ;
403
404         return( @{$cell_info->{'args'}}{@arg_keys } ) ;
405 }
406
407 sub cell_info {
408
409         my( $self ) = shift ;
410
411         my $cell_info = $self->_get_cell_info() ;
412
413         $cell_info->{'info'} = shift if @_ ;
414
415         return $cell_info->{'info'} ;
416 }
417
418 sub _gather_io_args {
419
420         my( $self ) = @_ ;
421
422         my $cell_info = $self->_get_cell_info() ;
423
424         return if $cell_info->{'no_io'} ;
425
426         my @gather_keys = 'aio_args' ;
427
428         push( @gather_keys, 'data_addr' ) if
429                         $cell_info->{'piped'} &&
430                         ! $cell_info->{'data_addr'} ;
431
432         my $gather = Stem::Gather->new(
433                         'object'        => $cell_info,
434                         'keys'          => \@gather_keys,
435                         'gathered_method' => '_cell_activate_io',
436         ) ;
437
438         return $gather unless ref $gather ;
439
440         $cell_info->{'gather'} = $gather ;
441
442         my $err = $gather->gathered( keys %{$cell_info->{'args'}} ) ;
443
444         return $err if $err ;
445 }
446
447 sub _cell_activate_io {
448
449         my ( $self ) = @_ ;
450
451         TraceStatus "cell activated" ;
452
453         $self->{'active'} = 1 ;
454
455 #print $self->_dump( "BEFORE AIO" ) ;
456
457         my @aio_args ;
458
459 # get any config args
460
461         if ( my $aio_args = $self->{'aio_args'} ) {
462
463                 push( @aio_args, %{$aio_args} ) ;
464         }
465
466 # args from a trigger message override any config args
467
468         if ( my $msg_aio_args = $self->{'args'}{'aio_args'} ) {
469
470                 ref $msg_aio_args eq 'ARRAY' or return <<ERR ;
471 aio_args is not an ARRAY ref
472 ERR
473                 push( @aio_args, @{$msg_aio_args} ) ;
474         }
475
476         my $data_addr = $self->{'args'}{'data_addr'} || $self->{'data_addr'} ;
477
478         my $aio = Stem::AsyncIO->new(
479
480                 'object'                => $self->{'owner_obj'},
481                 'data_addr'             => $data_addr,
482                 'from_addr'             => $self->{'from_addr'},
483                 'send_data_on_close'    => $self->{'send_data_on_close'},
484                 'codec'                 => $self->{'codec'},
485                 @aio_args,
486         ) ;
487
488 print "AIO ERR [$aio]\n" unless ref $aio ;
489         return $aio unless ref $aio ;
490
491         $self->{'aio'} = $aio ;
492
493 #print $self->_dump( "AFTER AIO" ) ;
494
495         return ;
496 }
497
498 sub cell_activate {
499
500         my( $self ) = @_ ;
501
502         my $cell_info = $self->_get_cell_info() ;
503
504         $cell_info->{'active'} = 1 ;
505 }
506
507 *cell_status_cmd = \&status_cmd ;
508
509 sub status_cmd {
510
511         my( $self ) = @_ ;
512
513         my $cell_info = $self->_get_cell_info() ;
514
515         my $info = $cell_info->{'info'} || $cell_info->{'args'}{'info'} || '' ;
516
517         $info =~ s/^/\t\t/mg ;
518  
519         my $class = ref $cell_info->{'owner_obj'} ;
520
521 #       my $data_addr = Stem::Msg::address_string( 
522         my $data_addr = $cell_info->{'data_addr'} ||
523                         $cell_info->{'args'}{'data_addr'} ||
524                         '[NONE]' ;
525
526         my $active = ( $cell_info->{'active'} ) ? 'Active' : 'Inactive' ;
527
528         my $codec = $cell_info->{codec} || 'NONE' ;
529
530 print "CELL STATUS\n" ;
531
532 #my $dump = $cell_info->_dump( 'STATUS' ) ;
533 my $dump = '' ;
534
535         return <<STATUS ;
536 Cell Status for:
537 Class:          $class
538 Addr:           $cell_info->{'from_addr'}
539 Status:         $active
540 Data Addr:      $data_addr
541 Codec:          $codec
542 Info:$info
543
544 SELF: $self
545 CELL: $cell_info
546 AIO: $cell_info->{aio}
547 FH: $cell_info->{fh}
548
549 $dump
550
551 STATUS
552
553 }
554
555 sub data_in {
556
557         my( $self, $msg ) = @_ ;
558
559 #print "DATA SELF $self\n" ;
560
561 #print $msg->dump( 'CELL IN' ) ;
562
563         my $cell_info = $self->_get_cell_info() ;
564
565         if ( $cell_info->{'is_parent'} ) {
566
567 #print "PARENT\n" ;
568                 TraceStatus "parent cell $cell_info->{'from_addr'} ignoring msg" ;
569
570                 return ;
571         }
572
573         unless( $cell_info->{'active'} ) {
574 #print "INACTIVE\n" ;
575
576                 TraceStatus "cell not active. msg ignored FOO" ;
577
578                 return ;
579         }
580
581 #print $cell_info->_dump( "DATA IN" ) ;
582
583         $cell_info->{data_in_msg} = $msg ;
584         $cell_info->cell_write( $msg->data() ) ;
585 }
586
587 sub cell_write {
588
589         my( $self, $data ) = @_ ;
590
591         my $cell_info = $self->_get_cell_info() ;
592
593         $cell_info->{'aio'}->write( $data ) ;
594 }
595
596 sub _cell_write_sync {
597
598         my( $self, $data ) = @_ ;
599
600         my $cell_info = $self->_get_cell_info() ;
601
602 #print "SYNC $$data\n" ;
603
604 #print $cell_info->_dump( 'SYNC' ) ;
605
606         if ( my $aio_args = $cell_info->{'args'}{'aio_args'} ) {
607
608                 my %aio_args = @{$aio_args} ;
609
610                 if ( my $fh = $aio_args{'fh'} ) {
611
612 #                       $fh->blocking( 1 ) ;
613
614                         $fh->syswrite( (ref $data) ? $$data : $data ) ;
615                 }
616         }
617 }
618
619 # handle stderr data as plain data
620
621 *stderr_data_in = \&data_in ;
622
623
624 # $cell_info is the Stem::Cell object of the parent cell. the name is
625 # not self as it is differentiated from $clone_info.
626
627
628
629 sub _callback {
630
631         my ( $self, $method_name, @data ) = @_ ;
632
633         my $method = $self->{$method_name} ;
634
635         my $owner_obj = $self->{'owner_obj'} ;
636
637         if ( $owner_obj->can( $method ) ) {
638
639                 return $owner_obj->$method( @data ) ;
640         }
641
642         TraceStatus "can't call $method in $owner_obj" ;
643
644         return ;
645 }
646
647 sub cell_from_addr {
648
649         my ( $self ) = @_ ;
650
651         my $cell_info = $self->_get_cell_info() ;
652
653         return( $cell_info->{'from_addr'} ) ;
654 }
655
656 use Stem::Debug qw( dump_data ) ;
657
658 sub _dump {
659
660         my ( $self, $text ) = @_ ;
661
662 return $text . dump_data( $self ) ;
663
664         $text ||= 'CELL' ;
665
666         my $dump = "$text =\n" ;
667
668         my $cell_info = $self->_get_cell_info() ;
669
670 #       my $owner_obj = $cell_info->{owner_obj} ;
671 #       my @names = lookup_cell_name( $owner_obj ) ;
672 #       $dump .= "\nNames: @names\n" ;
673
674         foreach my $key ( sort keys %{$cell_info} ) {
675
676                 my $val = $cell_info->{$key} ;
677                 next unless defined $val ;
678
679                 if ( $key eq 'args' ) {
680
681                         $dump .= "\targs = {\n" ;
682
683                         foreach my $arg ( sort keys %{$val} ) {
684
685                                 my $arg_val = $val->{$arg} || '';
686
687                                 $dump .= "\t\t$arg = '$arg_val'\n" ;
688                         }
689
690                         $dump .= "\t}\n" ;
691
692                         next ;
693                 }
694
695                 $dump .= "\t$key = '$val'\n" ;
696         }
697
698         $dump .= "\n\n" ;
699
700         return $dump ;
701 }
702
703 sub dump_cmd {
704
705         my ($self) = @_ ;
706
707         my $cell_info = $self->_get_cell_info() ;
708
709         return $cell_info->_dump() . Dumper $cell_info ;
710 }
711
712 1 ;