edited to reflect the moving around of the demo files
[urisagit/Stem.git] / lib / Stem / Portal.pm
1 #  File: Stem/Portal.pm
2
3 #  This file is part of Stem.
4 #  Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.
5
6 #  Stem is free software; you can redistribute it and/or modify
7 #  it under the terms of the GNU General Public License as published by
8 #  the Free Software Foundation; either version 2 of the License, or
9 #  (at your option) any later version.
10
11 #  Stem is distributed in the hope that it will be useful,
12 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 #  GNU General Public License for more details.
15
16 #  You should have received a copy of the GNU General Public License
17 #  along with Stem; if not, write to the Free Software
18 #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
20 #  For a license to use the Stem under conditions other than those
21 #  described here, to purchase support for this software, or to purchase a
22 #  commercial warranty contract, please contact Stem Systems at:
23
24 #       Stem Systems, Inc.              781-643-7504
25 #       79 Everett St.                  info@stemsystems.com
26 #       Arlington, MA 02474
27 #       USA
28
29 package Stem::Portal ;
30
31 use strict ;
32 use Carp ;
33 use Data::Dumper ;
34
35 use Stem::AsyncIO ;
36 use Stem::Vars ;
37 use Stem::Packet ;
38 use Stem::Socket ;
39 use Stem::Trace 'log' => 'stem_status', 'sub' => 'TraceStatus' ;
40 use Stem::Trace 'log' => 'stem_error' , 'sub' => 'TraceError' ;
41
42 my %name_to_portal ;
43 my %portal_to_names ;
44
45 my $default_portal ;
46
47
48 Stem::Route::register_class( __PACKAGE__, 'port' ) ;
49
50 my $attr_spec_portal = [
51
52         {
53                 'name'          => 'reg_name',
54                 'default'       => '',
55                 'help'          => <<HELP,
56 This is a unique name used to register this instance of a Portal.
57 HELP
58         },
59         {
60                 'name'          => 'server',
61                 'env'           => 'server',
62                 'help'          => <<HELP,
63 This determines if we are a server or a client.
64 If it is true, we are a server.  Otherwise, we are a client.
65 HELP
66         },
67         {
68                 'name'          => 'sync',
69                 'type'          => 'boolean',
70                 'default'       => 1,
71                 'help'          => <<HELP,
72 Mark this as a synchronously connecting Portal. Default is syncronous
73 connections, set to 0 for asynchronous. In both cases the same method
74 callbacks are used.
75 HELP
76         },
77         {
78                 'name'          => 'port',
79                 'default'       => 10_000,
80                 'env'           => 'port',
81                 'help'          => <<HELP,
82 This determines which port we bind to  if we are a server.
83 This determines which port we connect to if we are a client. 
84 The default value is 10,000.
85 HELP
86         },
87         {
88                 'name'          => 'host',
89                 'default'       => 'localhost',
90                 'env'           => 'host',
91                 'help'          => <<HELP,
92 This determines which host we attach to when we are a client.
93 The default value is localhost.
94 HELP
95         },
96
97         {
98                 'name'          => 'spawn_conf_file',
99                 'help'          => <<HELP,
100 This tells the portal to fork another Stem Hub and pass this value as
101 the configuration file argument to run_stem.  The new Hub will be
102 connected to this Portal and be private to it.
103 HELP
104         },
105
106         {
107                 'name'          => 'spawn_conf_args',
108                 'help'          => <<HELP,
109
110 This tells the portal to fork another Stem Hub and pass (via a
111 message) this data to the Stem::Conf as a configuration 
112 The new Hub will be connected to this Portal and be private
113 to it.
114
115 HELP
116         },
117
118
119         {
120                 'name'          => 'run_stem_args',
121                 'help'          => <<HELP,
122 These are the command line arguments to run_stem for the spawned Hub.
123 HELP
124         },
125         {
126                 'name'          => 'codec',
127                 'help'          => <<HELP,
128 This is the sub-class that is used to convert messages to/from a byte
129 stream for this portal
130 HELP
131         },
132         {
133                 'name'          => 'disable',
134                 'env'           => 'disable',
135                 'type'          => 'boolean',
136                 'help'          => <<HELP,
137 This flag will disable this Portal. It will not construct an object and
138 no errors will be returned.
139 HELP
140         },
141
142 ] ;
143
144 sub new {
145
146         my( $class ) = shift ;
147
148         my $self = Stem::Class::parse_args( $attr_spec_portal, @_ ) ;
149         return $self unless ref $self ;
150
151         return if $self->{ 'disable' } ;
152
153         my $name = $Stem::Vars::Hub_name ;
154
155         if ( $Env{'portal_use_stdio'} ) {
156
157                 return $self->new_child_portal() ;
158         }
159
160         if ( $self->{'spawn_conf_file'} || $self->{'spawn_conf_args'} ) {
161
162                 return $self->new_parent_portal() ;
163         }
164
165         if ( $self->{'server'} ) {
166
167                 $self->{'type'} = 'listener' ;
168                 $self->{'server_name'} = $name ;
169         }
170         else {
171
172                 $self->{'type'} = 'client' ;
173                 $self->{'name'} = $name ;
174         }
175
176 #print "REG new [$self->{'reg_name'}]\n" ;
177
178         my $sock_obj = Stem::Socket->new(
179                                 'object'        => $self,
180                                 'host'          => $self->{'host'},
181                                 'port'          => $self->{'port'},
182                                 'server'        => $self->{'server'},
183                                 'sync'          => $self->{'sync'},
184         ) ;
185
186         ref $sock_obj or return <<ERR ;
187 Stem::Portal '$self->{'reg_name'}' tried to connect/listen to $self->{'host'}:$self->{'port'}
188 $sock_obj
189 ERR
190
191         $self->{'sock_obj'} = $sock_obj ;
192
193         return ;
194 }
195
196 sub connected {
197
198         my( $self, $connected_sock ) = @_ ;
199
200         my( $portal ) ;
201
202         TraceStatus "Portal Connected" ;
203
204         $self->{'read_fh'} = $connected_sock ;
205         $self->{'write_fh'} = $connected_sock ;
206
207         my $type = $self->{'type'} ;
208
209         if ( $type eq 'listener' ) {
210
211 # fork off a new portal by making a clone of the listener portal
212
213                 $portal = bless { %$self } ;
214                 $portal->{'type'} = 'accepted' ;
215
216                 my $name = $portal->{'server_name'} ;
217
218                 $portal->{'name'} = $name ;
219
220                 delete( $portal->{'sock_obj'} ) ;
221         }
222         else {
223
224 #print "Portal Connected\n" ;
225
226 # a client portal is just itself
227
228                 $portal = $self ;
229
230 #print "REG [$self->{'reg_name'}]\n" ;
231
232                 if ( my $name = $self->{'reg_name'} ) {
233
234                         $portal->register( $name ) ;
235                 }
236
237                 unless ( $default_portal ) {
238
239                         $portal->register( 'DEFAULT' ) ;
240                         $default_portal = $portal ;
241                 }
242         }
243
244         my $err = $portal->_activate() ;
245
246         die $err if $err ;
247 }
248
249 my $run_stem_path ;
250
251 sub new_parent_portal {
252
253         my( $self ) = @_ ;
254
255         $run_stem_path ||= do {
256
257                 require Stem::Proc ;
258                 require Stem::InstallConfig ;
259
260                 $Stem::InstallConfig{ run_stem_path } ;
261         } ;
262
263         my $conf_file = $self->{'spawn_conf_file'} || 'portal_child' ;
264
265         my @run_stem_args = @{$self->{'run_stem_args'} || []} ;
266
267         my $proc = Stem::Proc->new(
268
269                         path            => $run_stem_path,
270                         proc_args       => [
271                                 'portal_use_stdio=1',
272                                 @run_stem_args,
273                                 $conf_file,
274                         ],
275                         spawn_now       => 1,
276                         cell_attr       => [
277                                 no_io   => 1,
278                         ],
279         ) ;
280
281         $self->{'proc'} = $proc ;
282
283         TraceStatus "Portal Paren" ;
284
285         $self->{'read_fh'} = $proc->read_fh() ;
286         $self->{'write_fh'} = $proc->write_fh() ;
287
288 #print "REG [$self->{'reg_name'}]\n" ;
289
290         my $err = $self->_activate() ;
291
292
293         die $err if $err ;
294 ###########
295 #        $self->{'spawn_conf_args'} ) {
296 #### when can we send the conf data?
297 ##########
298
299
300 }
301
302 sub new_child_portal {
303
304         my( $self ) = @_ ;
305
306         $self->{'type'} = 'child' ;
307
308
309         TraceStatus "Portal Child" ;
310
311         $self->{'read_fh'} = \*STDIN ;
312         $self->{'write_fh'} = \*STDOUT ;
313
314 #print "REG [$self->{'reg_name'}]\n" ;
315
316         unless ( $default_portal ) {
317
318                 $self->register( 'DEFAULT' ) ;
319                 $default_portal = $self ;
320         }
321
322         if ( my $portal_name = $Env{'portal_name'} ) {
323
324                 $self->register( $portal_name ) ;
325         }
326
327         my $err = $self->_activate() ;
328
329         die $err if $err ;
330 }
331
332
333 sub _activate {
334
335         my( $self ) = @_ ;
336
337         TraceStatus "Active portal" ;
338
339         my $aio = Stem::AsyncIO->new(
340
341                         'object'        => $self,
342                         'read_fh'       => $self->{'read_fh'},
343                         'write_fh'      => $self->{'write_fh'},
344                         'read_method'   => 'portal_data',
345                         'closed_method' => 'portal_closed',
346         ) ;
347
348         return $aio unless ref $aio ;
349
350         $self->{'aio'} = $aio ;
351
352         my $packet = Stem::Packet->new( 'codec' => $self->{'codec'} ) ;
353         return $packet unless ref $packet ;
354         $self->{'packet'} = $packet ;
355
356         my $msg = Stem::Msg->new( 'from' => "${Stem::Vars::Hub_name}:port",
357                                   'type'     => 'register',
358         ) ;
359
360         return $msg unless ref $msg ;
361
362         $self->write_msg( $msg ) ;
363
364         return ;
365 }
366
367 # this is not a method, but a class sub
368
369 sub send_msg {
370
371         my ( $msg, $to_hub ) = @_ ;
372
373         $to_hub ||= 'DEFAULT' ;
374
375         my $self = $name_to_portal{ $to_hub } ;
376
377         return "unknown Portal '$to_hub'" unless $self ;
378
379         $msg->from_hub( $self->{'name'} ) unless $msg->from_hub() ;
380 #       $msg->from_hub( $self->{'name'} ) ;
381
382         unless( $self->{'remote_hub'} ) {
383
384                 push( @{$self->{'queued_msgs'}}, $msg ) ;
385
386                 return ;
387         }
388
389         $self->write_msg( $msg ) ;
390
391         return ;
392 }
393
394 # this is a regular method called by the above sub.
395
396 sub write_msg {
397
398         my( $self, $msg ) = @_ ;
399
400         my $packet_text = $self->{'packet'}->to_packet( $msg ) ;
401
402 #print "PACK SEND [$packet_text]\n" ;
403
404         $self->{'aio'}->write( $packet_text ) ;
405 }
406
407 sub portal_data {
408
409         my( $self, $packet_text ) = @_ ;
410
411         my $packet = $self->{'packet'} ;
412
413 # parse out all messages that may be in the input data
414
415         while( my $msg = $packet->to_data( $packet_text ) ) {
416
417                 $self->_portal_msg_in( $msg ) ;
418
419 # no more incoming data in this callback 
420
421                 $packet_text = '' ;
422         }
423 }
424
425 sub _portal_msg_in {
426
427         my( $self, $msg ) = @_ ;
428
429         if ( $msg->type() eq 'register' ) {
430
431 # register the other hub and mark this hub as connecting to it.
432
433                 $self->{'remote_hub'} = $msg->from_hub() ;
434                 warn( caller(), $msg->dump() ) and die
435                         'Msg Has No Remote Hub' unless $self->{'remote_hub'} ;
436                 $self->register( $self->{'remote_hub'} ) ;
437
438 # handle messages that got queued while the portal was down
439
440                 while( my $queued_msg = shift @{$self->{'queued_msgs'}} ) {
441
442 #print $queued_msg->dump( 'QUEUED' ) ;
443                         $self->write_msg( $queued_msg ) ;
444                 }
445
446                 return ;
447         }
448
449         $msg->in_portal( $self->{'remote_hub'} ) ;
450         $msg->dispatch() ;
451 }
452
453
454 sub portal_closed {
455
456         my( $self ) = @_ ;
457
458 #TraceStatus "Portal closed" ;
459
460         Stem::Route::unregister_cell( $self ) ;
461         my $names = $self->unregister() ;
462
463         if ( $self->{'type'} eq 'accepted' ) {
464
465 #               TraceStatus "client hub '$self->{'name'}' closed" ;
466
467                 $self->shut_down() ;
468                 return ;
469         }
470
471         my @hub_names = ref $names ? @{$names} : 'UNKNOWN' ;
472
473         Stem::Event::end_loop() ;
474
475         die "server hub [@hub_names] died" ;
476 }
477
478 sub shut_down {
479
480         my( $self ) = @_ ;
481
482         TraceStatus "SHUT DOWN port : ". Dumper($self);
483
484         $self->{'aio'}->shut_down() ;
485         delete @{$self}{qw( object aio )} ;
486 }
487
488 # this is for messages directly to this portal. messages are sent out
489 # the portal via the send class method
490 #
491 # UNUSED so far
492
493 sub msg_in {
494
495         my( $self, $msg ) = @_ ;
496
497         TraceStatus "portal msg in" ;
498 }
499
500 sub register {
501
502         my( $self, $name ) = @_ ;
503
504 #print "NAME [$name]: ", caller(), "\n" ;
505
506         TraceStatus "portal arg: [$self] [$name]\n\t",
507                                         map( "<$_>", caller() ), "\n" ;
508
509         $name_to_portal{ $name } = $self ;
510         push( @{$portal_to_names{ $self }}, $name ) ;
511 }
512
513 sub unregister {
514
515         my( $name ) = @_ ;
516
517 # convert a name to its object ;
518
519         my $portal = ref $name ? $name : $name_to_portal{ $name } ;
520
521         if ( $portal ) {
522
523                 delete $name_to_portal{ $portal } ;
524
525                 my $names = delete $portal_to_names{ $portal } ;
526
527                 return $names ;
528
529         }
530
531         return ;
532 }
533
534 sub status_cmd {
535
536         my ($self, $msg ) = @_ ;
537
538 #print $msg->dump( 'PORT' ) ;
539
540         my $status = <<STATUS ;
541
542 Portal Status for Hub '$Stem::Vars::Hub_name'
543
544 STATUS
545
546         foreach my $port_name ( sort keys %name_to_portal ) {
547
548                 my $portal = $name_to_portal{ $port_name } ;
549
550                 $status .= <<STATUS ;
551 $port_name
552         Hub:    $portal->{'remote_hub'}
553         Type:   $portal->{'type'}
554
555 STATUS
556
557         }
558
559         return $status ;
560 }
561
562 1 ;