Merge branch 'master' of steve@erxz.com:/home/uri/git_repo/stem
[urisagit/Stem.git] / lib / Stem / Cell.pm
CommitLineData
4536f655 1# File: Stem/Cell.pm
2
3# This file is part of Stem.
4# Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.
5
6# Stem is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# Stem is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with Stem; if not, write to the Free Software
18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20# For a license to use the Stem under conditions other than those
21# described here, to purchase support for this software, or to purchase a
22# commercial warranty contract, please contact Stem Systems at:
23
24# Stem Systems, Inc. 781-643-7504
25# 79 Everett St. info@stemsystems.com
26# Arlington, MA 02474
27# USA
28
29package Stem::Cell ;
30
31use strict ;
32
33use Data::Dumper ;
34use Carp qw( cluck ) ;
35
36use Stem::Route qw( :cell ) ;
37use Stem::AsyncIO ;
38use Stem::Id ;
39use Stem::Gather ;
40use Stem::Cell::Clone ;
41use Stem::Cell::Pipe ;
42use Stem::Cell::Flow ;
43use Stem::Cell::Work ;
44
45use Stem::Trace 'log' => 'stem_status' , 'sub' => 'TraceStatus' ;
46
# maps the class name of an owner object to the name of the attribute in
# which that class keeps its cell info object. cell_init() fills it in and
# _get_cell_info() reads it.
47my %class_to_attr_name ;
48
# attribute specification passed to Stem::Class::parse_args() by new().
# each entry is a hash with a 'name' and 'help' text; some entries also
# carry a 'type', a 'default' or an 'env' key.
49my $attr_spec = [
50
51 {
52 'name' => 'reg_name',
53 'help' => <<HELP,
54The registered address of the owner Cell
55HELP
56 },
57 {
58 'name' => 'cloneable',
59 'type' => 'boolean',
60 'help' => <<HELP,
61The parent Cell will be cloned upon triggering
62HELP
63 },
64 {
65 'name' => 'data_addr',
66 'type' => 'address',
67 'help' => <<HELP,
68Cell address to send any data read in. If not set here it must come
69from a trigger message.
70HELP
71 },
72 {
73 'name' => 'status_addr',
74 'type' => 'address',
75 'help' => <<HELP,
76Cell address to send Cell status to
77HELP
78 },
79 {
80 'name' => 'send_data_on_close',
81 'type' => 'boolean',
82 'help' => <<HELP,
83Buffer all read data and only send it when the I/O is closed
84HELP
85 },
86 {
87 'name' => 'no_io',
88 'type' => 'boolean',
89 'help' => <<HELP,
90Don't do any I/O for the Cell. Either there is none or the owner Cell must
91do its own I/O
92HELP
93 },
94 {
95 'name' => 'pipe_addr',
96 'type' => 'address',
97 'help' => <<HELP,
98Cell address to open a pipe to
99HELP
100 },
101 {
102 'name' => 'pipe_args',
103 'help' => <<HELP,
104This is list of arguments or a single argument which is passed to the
105cell at the remote end of the pipe.
106HELP
107 },
108 {
109 'name' => 'aio_args',
110 'type' => 'hash',
111 'help' => <<HELP,
112This is a list of arguments passed to the Stem::AsyncIO module constructor
113HELP
114 },
115 {
116 'name' => 'errors_to_output',
117 'env' => 'errors_to_output',
118 'help' => <<HELP,
119Any received error messages will be sent to the output.
120HELP
121 },
122
123############
124# change this to max_clones
125############
126 {
127 'name' => 'id_size',
128 'default' => 3,
129 'help' => <<HELP,
130Size of unique ID space for clones. Range is 26**N
131HELP
132 },
133 {
134 'name' => 'trigger_method',
135 'default' => 'triggered_cell',
136 'help' => <<HELP,
137Method to callback in owner object when cell is triggered
138HELP
139 },
140
141# the below attributes are not permanent yet
142# unused so far.
143 {
144 'name' => 'shut_down_method',
145 'default' => 'shut_down_cell',
146 'help' => <<HELP,
147Method to callback in owner object when cell is shutdown
148HELP
149 },
150 {
151 'name' => 'activated_method',
152 'default' => 'activate_cell',
153 'help' => <<HELP,
154Method to call in owner Cell when the cell is activated. UNSUPPORTED
155HELP
156 },
157 {
158 'name' => 'sequence_done_method',
159 'help' => <<HELP,
160Method to call in owner Cell when the executing sequence completes.
161HELP
162 },
163 {
164 'name' => 'codec',
165 'help' => <<HELP,
166This sets the codec that converts data packets to/from a byte stream.
167HELP
168 },
169 {
170 'name' => 'work_ready_addr',
171 'type' => 'address',
172 'help' => <<HELP,
173This is the address of the Cell that this Cell sends a message to
174when work can be done (i.e. a work message can now be sent here).
175HELP
176 },
177 {
178 'name' => 'stderr_log',
179 'help' => <<HELP,
180This sets the log that will get the stderr output of the process
181HELP
182 },
183] ;
184
185
# constructor for the cell info object.
#
# the remaining arguments are handed to Stem::Class::parse_args() together
# with the attribute spec. whatever that call returns is passed straight
# back to the caller: a reference on success, otherwise a non-reference
# value (presumably an error string -- TODO confirm against Stem::Class).

sub new {

	my ( $class, @args ) = @_ ;

	my $cell_info = Stem::Class::parse_args( $attr_spec, @args ) ;

	return $cell_info ;
}
197
198# this is only called in Stem::Conf for this class.
199# it initializes the cell info object inside its owner object.
200
# set up this cell info object for the object that owns it.
#
#   $owner_obj       the cell object that owns this Stem::Cell object
#   $cell_name       name used to build this cell's from address
#   $cell_info_attr  attribute name under which the owner keeps this object

sub cell_init {

	my ( $self, $owner_obj, $cell_name, $cell_info_attr ) = @_ ;

# the from address is the hub name combined with the cell name

	my $from_addr = Stem::Msg::make_address_string(
				$Stem::Vars::Hub_name,
				$cell_name
	) ;

	@{$self}{ qw( owner_obj cell_name from_addr cell_info_attr ) } =
		( $owner_obj, $cell_name, $from_addr, $cell_info_attr ) ;

# remember which attribute the owner class uses for its cell info. the
# first name registered for a class wins. _get_cell_info uses this map to
# find the info object when it is given an owner object.

	my $owner_class = ref $owner_obj ;

	$class_to_attr_name{ $owner_class } = $cell_info_attr
		unless $class_to_attr_name{ $owner_class } ;

# a cloneable cell is marked as a parent and gets its own id object

	if ( $self->{'cloneable'} ) {

		$self->{'id_obj'} = Stem::Id->new( 'size' => $self->{'id_size'} ) ;
		$self->{'is_parent'} = 1 ;
		$self->{'target'} = '' ;
	}
}
233
234# get the cell info whether we were called from the owner object or
235# the cell info itself ;
236
# return the cell info object for $self.
#
# $self may be the cell info object itself or an owner object whose class
# was registered by cell_init. a non-reference $self yields an error string
# instead.

sub _get_cell_info {

	my ( $self ) = @_ ;

	my $owner_class = ref $self ;

	unless ( $owner_class ) {

		return "can't get cell info from '$self'\n" ;
	}

# we were handed the cell info object itself

	if ( $owner_class eq __PACKAGE__ ) {

		return $self ;
	}

# otherwise look in the attribute the owner class registered

	my $attr_name = $class_to_attr_name{ $owner_class } ;

	return $self->{ $attr_name } ;
}
251
# trigger the cell: clone it if needed, store the passed args, open the
# pipe, start gathering the i/o args and call the owner's trigger method.
#
# returns nothing on success or when the cell is already triggered.
# returns the lookup's non-reference result if the cell info can't be
# found, or the error from the gather or callback step. in that last case
# the cell is shut down with the error first.

sub cell_trigger {

	my ( $self, @args ) = @_ ;

	my $parent_info = $self->_get_cell_info() ;

# a non-reference result is the lookup's error text

	ref $parent_info or return $parent_info ;

	$parent_info->{'triggered'} and return ;

# $cell is whatever _clone hands back; its info is looked up again as it
# may differ from $parent_info

	my $cell = $parent_info->_clone() ;
	my $cell_info = $cell->_get_cell_info() ;

	$cell_info->{'triggered'} = 1 ;

# set any args (e.g. from a trigger message) into this cell

	$cell_info->cell_set_args( @args ) ;

	$cell_info->_cell_pipe() ;

# the callback into the owner only happens when the gather step reported
# no error

	my $err = $cell_info->_gather_io_args()
			|| $cell_info->_callback( 'trigger_method' ) ;

	return unless $err ;

	$cell_info->cell_shut_down( $err ) ;

	return $err ;
}
297
# command handler that triggers the cell from a message.
#
# the message data is turned into a key/value argument list:
#   plain string   all key=value pairs found in it, or ( args => string )
#                  when it holds no such pair
#   scalar ref     dereferenced and then treated as a plain string
#   hash ref       its key/value pairs
#   array ref      its elements
# the message itself is always appended under the key 'triggering_msg'.
#
# returns the result of cell_trigger only when it is a reference, otherwise
# nothing. a plain error from cell_trigger is reported through TraceStatus
# rather than being printed on the current output handle.

sub cell_trigger_cmd {

	my ( $self, $msg ) = @_ ;

	my @args ;

	if ( my $data = $msg->data() ) {

		$data = ${$data} if ref $data eq 'SCALAR' ;

		my $ref = ref $data ;

		if ( ! $ref && defined $data ) {

			unless ( @args = $data =~ /(\S+)=(\S+)/g ) {

				@args = ( 'args' => $data ) ;
			}
		}
		elsif ( $ref eq 'HASH' ) {

			@args = %{$data} ;
		}
		elsif ( $ref eq 'ARRAY' ) {

			@args = @{$data} ;
		}
	}

	push( @args, triggering_msg => $msg ) ;

	my $err = $self->cell_trigger( @args ) ;

# this used to be a debug print. keep the information but send it to the
# status log.

	TraceStatus( "cell trigger error [$err]" ) if $err ;

	return $err if ref $err ;
	return ;
}
336
337
# shut the cell down and reset its state.
#
# without an $error this is a no-op for a cell that is not active. with an
# $error the shut down always happens and the error is stored in the cell
# info. always returns nothing.

sub cell_shut_down {

	my ( $self, $error ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	unless ( $error ) {

		return unless $cell_info->{'active'} ;
	}

	$cell_info->{'error'} = $error ;

# drop the async i/o object and then the gather object, shutting each one
# down if it exists

	foreach my $part ( qw( aio gather ) ) {

		my $obj = delete $cell_info->{$part} or next ;

		$obj->shut_down() ;
	}

	$cell_info->_close_pipe() ;

	$cell_info->_clone_delete() ;

	delete $cell_info->{'args'} ;

	@{$cell_info}{ qw( active triggered ) } = ( 0, 0 ) ;

	TraceStatus( "cell shut down done" ) ;

	return ;
}
379
380
# store key/value pairs in the cell's args hash.
#
# if a gather object is present it is told which keys arrived. a true
# result from it is returned as the error, otherwise nothing is returned.

sub cell_set_args {

	my ( $self, %args ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

# the args hash is created here if it does not exist yet, even when no
# pairs were passed in

	my $stored = $cell_info->{'args'} ||= {} ;

	foreach my $name ( keys %args ) {

		$stored->{$name} = $args{$name} ;
	}

	my $gather = $cell_info->{'gather'} or return ;

	my $err = $gather->gathered( keys %args ) ;

	return $err if $err ;
	return ;
}
397
# fetch the values stored in the cell's args hash for the given keys.
#
# the values come back in the order of @arg_keys. the slice is returned
# directly, so a call in scalar context yields the value of the last key.

sub cell_get_args {

	my ( $self, @arg_keys ) = @_ ;

	my $info = $self->_get_cell_info() ;

	return @{ $info->{'args'} }{ @arg_keys } ;
}
406
# get or set the free form 'info' value kept in the cell info object.
#
# when an argument is passed it becomes the new value. the current value
# is always returned.

sub cell_info {

	my ( $self, @new_info ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	if ( @new_info ) {

		$cell_info->{'info'} = $new_info[0] ;
	}

	return $cell_info->{'info'} ;
}
417
# create the gather object that waits for the i/o arguments of this cell.
#
# nothing is done for a cell with 'no_io' set. otherwise the cell waits
# for 'aio_args', and also for 'data_addr' when the cell is piped and has
# no data address configured. the gather object calls _cell_activate_io
# on the cell info (see 'gathered_method' below).
#
# returns nothing on success. returns the non-reference result of the
# Stem::Gather constructor if it fails, or any true result of the first
# gathered() call.

sub _gather_io_args {

	my( $self ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	return if $cell_info->{'no_io'} ;

	my @gather_keys = ( 'aio_args' ) ;

	push( @gather_keys, 'data_addr' ) if
			$cell_info->{'piped'} &&
			! $cell_info->{'data_addr'} ;

	my $gather = Stem::Gather->new(
			'object'		=> $cell_info,
			'keys'			=> \@gather_keys,
			'gathered_method'	=> '_cell_activate_io',
	) ;

	return $gather unless ref $gather ;

	$cell_info->{'gather'} = $gather ;

# report the args that are already set

	my $err = $gather->gathered( keys %{$cell_info->{'args'}} ) ;

	return $err if $err ;

# end with an explicit empty return. without it the sub would return the
# false value of the test above, which is a one element list in list
# context.

	return ;
}
446
# mark the cell active and create its Stem::AsyncIO object.
#
# this is the method named as 'gathered_method' in _gather_io_args.
#
# the constructor arguments are the fixed ones below, followed by the
# configured 'aio_args' hash and then the 'aio_args' array from the cell's
# args. later pairs come later in the list.
#
# returns nothing on success. returns an error string when the args'
# 'aio_args' is not an array ref, or the non-reference result of the
# Stem::AsyncIO constructor. a constructor failure is reported through
# TraceStatus rather than printed on the current output handle.

sub _cell_activate_io {

	my ( $self ) = @_ ;

	TraceStatus( "cell activated" ) ;

	$self->{'active'} = 1 ;

	my @aio_args ;

# get any config args

	if ( my $aio_args = $self->{'aio_args'} ) {

		push( @aio_args, %{$aio_args} ) ;
	}

# args from a trigger message override any config args

	if ( my $msg_aio_args = $self->{'args'}{'aio_args'} ) {

		ref $msg_aio_args eq 'ARRAY'
			or return "aio_args is not an ARRAY ref\n" ;

		push( @aio_args, @{$msg_aio_args} ) ;
	}

	my $data_addr = $self->{'args'}{'data_addr'} || $self->{'data_addr'} ;

	my $aio = Stem::AsyncIO->new(

			'object'		=> $self->{'owner_obj'},
			'data_addr'		=> $data_addr,
			'from_addr'		=> $self->{'from_addr'},
			'send_data_on_close'	=> $self->{'send_data_on_close'},
			'codec'			=> $self->{'codec'},
			@aio_args,
	) ;

	unless ( ref $aio ) {

# this used to be a debug print. keep the information but send it to the
# status log.

		TraceStatus( "async io error [$aio]" ) ;

		return $aio ;
	}

	$self->{'aio'} = $aio ;

	return ;
}
497
# mark the cell as active without creating any i/o object.
# the assigned value (1) is the return value.

sub cell_activate {

	my ( $self ) = @_ ;

	return $self->_get_cell_info()->{'active'} = 1 ;
}
506
# cell_status_cmd is a second name for status_cmd

*cell_status_cmd = \&status_cmd ;

# build a text report about the cell.
#
# the report shows the owner's class, the from address, whether the cell
# is active, the data address (configured value first, then the one from
# the args, else '[NONE]'), the codec (or 'NONE') and the info text with
# each line indented by two tabs. it ends with the stringified $self, cell
# info, aio and fh values.
#
# the call is noted through TraceStatus rather than printed on the current
# output handle.

sub status_cmd {

	my( $self ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	my $info = $cell_info->{'info'} || $cell_info->{'args'}{'info'} || '' ;

	$info =~ s/^/\t\t/mg ;

	my $class = ref $cell_info->{'owner_obj'} ;

	my $data_addr = $cell_info->{'data_addr'} ||
			$cell_info->{'args'}{'data_addr'} ||
			'[NONE]' ;

	my $active = ( $cell_info->{'active'} ) ? 'Active' : 'Inactive' ;

	my $codec = $cell_info->{codec} || 'NONE' ;

# this used to be a debug print

	TraceStatus( "cell status requested" ) ;

# placeholder for a full dump of the cell info. it stays empty so the
# report layout does not change.

	my $dump = '' ;

	return <<STATUS ;
Cell Status for:
Class: $class
Addr: $cell_info->{'from_addr'}
Status: $active
Data Addr: $data_addr
Codec: $codec
Info:$info

SELF: $self
CELL: $cell_info
AIO: $cell_info->{aio}
FH: $cell_info->{fh}

$dump

STATUS

}
554
# handle an incoming data message by writing its data out through the cell.
#
# a parent cell or a cell that is not active ignores the message, logs
# that fact and returns nothing. otherwise the message is saved in the
# cell info and the result of cell_write is returned.

sub data_in {

	my ( $self, $msg ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	if ( $cell_info->{'is_parent'} ) {

		TraceStatus( "parent cell $cell_info->{'from_addr'} ignoring msg" ) ;

		return ;
	}

	if ( ! $cell_info->{'active'} ) {

		TraceStatus( "cell not active. msg ignored FOO" ) ;

		return ;
	}

# keep the message around before its data is written

	$cell_info->{data_in_msg} = $msg ;

	return $cell_info->cell_write( $msg->data() ) ;
}
586
# write $data through the cell's async i/o object.
# the result of that object's write method is returned.

sub cell_write {

	my ( $self, $data ) = @_ ;

	my $aio = $self->_get_cell_info()->{'aio'} ;

	return $aio->write( $data ) ;
}
595
# write $data directly with syswrite on the file handle found under the
# 'fh' key of the 'aio_args' list in the cell's args.
#
# $data may be a plain string or a reference to one. nothing is written
# when there are no such aio_args or they hold no file handle.

sub _cell_write_sync {

	my ( $self, $data ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	if ( my $arg_list = $cell_info->{'args'}{'aio_args'} ) {

# the list holds key/value pairs

		my %opts = @{$arg_list} ;

		if ( my $fh = $opts{'fh'} ) {

			my $bytes = ref $data ? ${$data} : $data ;

			$fh->syswrite( $bytes ) ;
		}
	}
}
618
619# handle stderr data as plain data
620
# stderr_data_in is installed as a second name for the data_in sub, so a
# call through either name runs the same code.
621*stderr_data_in = \&data_in ;
622
623
624# $cell_info is the Stem::Cell object of the parent cell. the name is
625# not self as it is differentiated from $clone_info.
626
627
628
# call a method in the owner object.
#
# $method_name is the name of the cell info attribute that holds the name
# of the method to call (e.g. 'trigger_method'). the method gets @data and
# its result is returned. if the owner can't do that method, this is
# logged and nothing is returned.

sub _callback {

	my ( $self, $method_name, @data ) = @_ ;

	my ( $method, $owner_obj ) = @{$self}{ $method_name, 'owner_obj' } ;

	unless ( $owner_obj->can( $method ) ) {

		TraceStatus( "can't call $method in $owner_obj" ) ;

		return ;
	}

	return $owner_obj->$method( @data ) ;
}
646
# return the from address stored in the cell info object.

sub cell_from_addr {

	my ( $self ) = @_ ;

	return $self->_get_cell_info()->{'from_addr'} ;
}
655
656use Stem::Debug qw( dump_data ) ;
657
# return a text dump of $self made by dump_data, prefixed with $text.
#
# $text is optional. an undefined $text is treated as an empty string so
# the concatenation is safe when warnings are enabled.
#
# the older hand written dump loop that followed the return could never be
# reached and has been removed.

sub _dump {

	my ( $self, $text ) = @_ ;

	$text = '' unless defined $text ;

	return $text . dump_data( $self ) ;
}
702
# command handler that returns a dump of the cell info: the output of
# _dump followed by the Data::Dumper output for the same object.

sub dump_cmd {

	my ( $self ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	my $text = $cell_info->_dump() ;

	$text .= Dumper( $cell_info ) ;

	return $text ;
}
711
7121 ;