edited to reflect the moving around of the demo files
[urisagit/Stem.git] / lib / Stem / Portal.pm
CommitLineData
4536f655 1# File: Stem/Portal.pm
2
3# This file is part of Stem.
4# Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.
5
6# Stem is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# Stem is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with Stem; if not, write to the Free Software
18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20# For a license to use the Stem under conditions other than those
21# described here, to purchase support for this software, or to purchase a
22# commercial warranty contract, please contact Stem Systems at:
23
24# Stem Systems, Inc. 781-643-7504
25# 79 Everett St. info@stemsystems.com
26# Arlington, MA 02474
27# USA
28
29package Stem::Portal ;
30
31use strict ;
32use Carp ;
33use Data::Dumper ;
34
35use Stem::AsyncIO ;
36use Stem::Vars ;
37use Stem::Packet ;
38use Stem::Socket ;
39use Stem::Trace 'log' => 'stem_status', 'sub' => 'TraceStatus' ;
40use Stem::Trace 'log' => 'stem_error' , 'sub' => 'TraceError' ;
41
42my %name_to_portal ;
43my %portal_to_names ;
44
45my $default_portal ;
46
47
48Stem::Route::register_class( __PACKAGE__, 'port' ) ;
49
50my $attr_spec_portal = [
51
52 {
53 'name' => 'reg_name',
54 'default' => '',
55 'help' => <<HELP,
56This is a unique name used to register this instance of a Portal.
57HELP
58 },
59 {
60 'name' => 'server',
61 'env' => 'server',
62 'help' => <<HELP,
63This determines if we are a server or a client.
64If it is true, we are a server. Otherwise, we are a client.
65HELP
66 },
67 {
68 'name' => 'sync',
69 'type' => 'boolean',
70 'default' => 1,
71 'help' => <<HELP,
72Mark this as a synchronously connecting Portal. Default is syncronous
73connections, set to 0 for asynchronous. In both cases the same method
74callbacks are used.
75HELP
76 },
77 {
78 'name' => 'port',
79 'default' => 10_000,
80 'env' => 'port',
81 'help' => <<HELP,
82This determines which port we bind to if we are a server.
83This determines which port we connect to if we are a client.
84The default value is 10,000.
85HELP
86 },
87 {
88 'name' => 'host',
89 'default' => 'localhost',
90 'env' => 'host',
91 'help' => <<HELP,
92This determines which host we attach to when we are a client.
93The default value is localhost.
94HELP
95 },
96
97 {
98 'name' => 'spawn_conf_file',
99 'help' => <<HELP,
100This tells the portal to fork another Stem Hub and pass this value as
101the configuration file argument to run_stem. The new Hub will be
102connected to this Portal and be private to it.
103HELP
104 },
105
106 {
107 'name' => 'spawn_conf_args',
108 'help' => <<HELP,
109
110This tells the portal to fork another Stem Hub and pass (via a
111message) this data to the Stem::Conf as a configuration
112The new Hub will be connected to this Portal and be private
113to it.
114
115HELP
116 },
117
118
119 {
120 'name' => 'run_stem_args',
121 'help' => <<HELP,
122These are the command line arguments to run_stem for the spawned Hub.
123HELP
124 },
125 {
126 'name' => 'codec',
127 'help' => <<HELP,
128This is the sub-class that is used to convert messages to/from a byte
129stream for this portal
130HELP
131 },
132 {
133 'name' => 'disable',
134 'env' => 'disable',
135 'type' => 'boolean',
136 'help' => <<HELP,
137This flag will disable this Portal. It will not construct an object and
138no errors will be returned.
139HELP
140 },
141
142] ;
143
144sub new {
145
146 my( $class ) = shift ;
147
148 my $self = Stem::Class::parse_args( $attr_spec_portal, @_ ) ;
149 return $self unless ref $self ;
150
151 return if $self->{ 'disable' } ;
152
153 my $name = $Stem::Vars::Hub_name ;
154
155 if ( $Env{'portal_use_stdio'} ) {
156
157 return $self->new_child_portal() ;
158 }
159
160 if ( $self->{'spawn_conf_file'} || $self->{'spawn_conf_args'} ) {
161
162 return $self->new_parent_portal() ;
163 }
164
165 if ( $self->{'server'} ) {
166
167 $self->{'type'} = 'listener' ;
168 $self->{'server_name'} = $name ;
169 }
170 else {
171
172 $self->{'type'} = 'client' ;
173 $self->{'name'} = $name ;
174 }
175
176#print "REG new [$self->{'reg_name'}]\n" ;
177
178 my $sock_obj = Stem::Socket->new(
179 'object' => $self,
180 'host' => $self->{'host'},
181 'port' => $self->{'port'},
182 'server' => $self->{'server'},
183 'sync' => $self->{'sync'},
184 ) ;
185
186 ref $sock_obj or return <<ERR ;
187Stem::Portal '$self->{'reg_name'}' tried to connect/listen to $self->{'host'}:$self->{'port'}
188$sock_obj
189ERR
190
191 $self->{'sock_obj'} = $sock_obj ;
192
193 return ;
194}
195
196sub connected {
197
198 my( $self, $connected_sock ) = @_ ;
199
200 my( $portal ) ;
201
202 TraceStatus "Portal Connected" ;
203
204 $self->{'read_fh'} = $connected_sock ;
205 $self->{'write_fh'} = $connected_sock ;
206
207 my $type = $self->{'type'} ;
208
209 if ( $type eq 'listener' ) {
210
211# fork off a new portal by making a clone of the listener portal
212
213 $portal = bless { %$self } ;
214 $portal->{'type'} = 'accepted' ;
215
216 my $name = $portal->{'server_name'} ;
217
218 $portal->{'name'} = $name ;
219
220 delete( $portal->{'sock_obj'} ) ;
221 }
222 else {
223
224#print "Portal Connected\n" ;
225
226# a client portal is just itself
227
228 $portal = $self ;
229
230#print "REG [$self->{'reg_name'}]\n" ;
231
232 if ( my $name = $self->{'reg_name'} ) {
233
234 $portal->register( $name ) ;
235 }
236
237 unless ( $default_portal ) {
238
239 $portal->register( 'DEFAULT' ) ;
240 $default_portal = $portal ;
241 }
242 }
243
244 my $err = $portal->_activate() ;
245
246 die $err if $err ;
247}
248
249my $run_stem_path ;
250
251sub new_parent_portal {
252
253 my( $self ) = @_ ;
254
255 $run_stem_path ||= do {
256
257 require Stem::Proc ;
258 require Stem::InstallConfig ;
259
260 $Stem::InstallConfig{ run_stem_path } ;
261 } ;
262
263 my $conf_file = $self->{'spawn_conf_file'} || 'portal_child' ;
264
265 my @run_stem_args = @{$self->{'run_stem_args'} || []} ;
266
267 my $proc = Stem::Proc->new(
268
269 path => $run_stem_path,
270 proc_args => [
271 'portal_use_stdio=1',
272 @run_stem_args,
273 $conf_file,
274 ],
275 spawn_now => 1,
276 cell_attr => [
277 no_io => 1,
278 ],
279 ) ;
280
281 $self->{'proc'} = $proc ;
282
283 TraceStatus "Portal Paren" ;
284
285 $self->{'read_fh'} = $proc->read_fh() ;
286 $self->{'write_fh'} = $proc->write_fh() ;
287
288#print "REG [$self->{'reg_name'}]\n" ;
289
290 my $err = $self->_activate() ;
291
292
293 die $err if $err ;
294###########
295# $self->{'spawn_conf_args'} ) {
296#### when can we send the conf data?
297##########
298
299
300}
301
302sub new_child_portal {
303
304 my( $self ) = @_ ;
305
306 $self->{'type'} = 'child' ;
307
308
309 TraceStatus "Portal Child" ;
310
311 $self->{'read_fh'} = \*STDIN ;
312 $self->{'write_fh'} = \*STDOUT ;
313
314#print "REG [$self->{'reg_name'}]\n" ;
315
316 unless ( $default_portal ) {
317
318 $self->register( 'DEFAULT' ) ;
319 $default_portal = $self ;
320 }
321
322 if ( my $portal_name = $Env{'portal_name'} ) {
323
324 $self->register( $portal_name ) ;
325 }
326
327 my $err = $self->_activate() ;
328
329 die $err if $err ;
330}
331
332
333sub _activate {
334
335 my( $self ) = @_ ;
336
337 TraceStatus "Active portal" ;
338
339 my $aio = Stem::AsyncIO->new(
340
341 'object' => $self,
342 'read_fh' => $self->{'read_fh'},
343 'write_fh' => $self->{'write_fh'},
344 'read_method' => 'portal_data',
345 'closed_method' => 'portal_closed',
346 ) ;
347
348 return $aio unless ref $aio ;
349
350 $self->{'aio'} = $aio ;
351
352 my $packet = Stem::Packet->new( 'codec' => $self->{'codec'} ) ;
353 return $packet unless ref $packet ;
354 $self->{'packet'} = $packet ;
355
356 my $msg = Stem::Msg->new( 'from' => "${Stem::Vars::Hub_name}:port",
357 'type' => 'register',
358 ) ;
359
360 return $msg unless ref $msg ;
361
362 $self->write_msg( $msg ) ;
363
364 return ;
365}
366
367# this is not a method, but a class sub
368
369sub send_msg {
370
371 my ( $msg, $to_hub ) = @_ ;
372
373 $to_hub ||= 'DEFAULT' ;
374
375 my $self = $name_to_portal{ $to_hub } ;
376
377 return "unknown Portal '$to_hub'" unless $self ;
378
379 $msg->from_hub( $self->{'name'} ) unless $msg->from_hub() ;
380# $msg->from_hub( $self->{'name'} ) ;
381
382 unless( $self->{'remote_hub'} ) {
383
384 push( @{$self->{'queued_msgs'}}, $msg ) ;
385
386 return ;
387 }
388
389 $self->write_msg( $msg ) ;
390
391 return ;
392}
393
394# this is a regular method called by the above sub.
395
396sub write_msg {
397
398 my( $self, $msg ) = @_ ;
399
400 my $packet_text = $self->{'packet'}->to_packet( $msg ) ;
401
402#print "PACK SEND [$packet_text]\n" ;
403
404 $self->{'aio'}->write( $packet_text ) ;
405}
406
407sub portal_data {
408
409 my( $self, $packet_text ) = @_ ;
410
411 my $packet = $self->{'packet'} ;
412
413# parse out all messages that may be in the input data
414
415 while( my $msg = $packet->to_data( $packet_text ) ) {
416
417 $self->_portal_msg_in( $msg ) ;
418
419# no more incoming data in this callback
420
421 $packet_text = '' ;
422 }
423}
424
425sub _portal_msg_in {
426
427 my( $self, $msg ) = @_ ;
428
429 if ( $msg->type() eq 'register' ) {
430
431# register the other hub and mark this hub as connecting to it.
432
433 $self->{'remote_hub'} = $msg->from_hub() ;
434 warn( caller(), $msg->dump() ) and die
435 'Msg Has No Remote Hub' unless $self->{'remote_hub'} ;
436 $self->register( $self->{'remote_hub'} ) ;
437
438# handle messages that got queued while the portal was down
439
440 while( my $queued_msg = shift @{$self->{'queued_msgs'}} ) {
441
442#print $queued_msg->dump( 'QUEUED' ) ;
443 $self->write_msg( $queued_msg ) ;
444 }
445
446 return ;
447 }
448
449 $msg->in_portal( $self->{'remote_hub'} ) ;
450 $msg->dispatch() ;
451}
452
453
454sub portal_closed {
455
456 my( $self ) = @_ ;
457
458#TraceStatus "Portal closed" ;
459
460 Stem::Route::unregister_cell( $self ) ;
461 my $names = $self->unregister() ;
462
463 if ( $self->{'type'} eq 'accepted' ) {
464
465# TraceStatus "client hub '$self->{'name'}' closed" ;
466
467 $self->shut_down() ;
468 return ;
469 }
470
471 my @hub_names = ref $names ? @{$names} : 'UNKNOWN' ;
472
473 Stem::Event::end_loop() ;
474
475 die "server hub [@hub_names] died" ;
476}
477
478sub shut_down {
479
480 my( $self ) = @_ ;
481
482 TraceStatus "SHUT DOWN port : ". Dumper($self);
483
484 $self->{'aio'}->shut_down() ;
485 delete @{$self}{qw( object aio )} ;
486}
487
488# this is for messages directly to this portal. messages are sent out
489# the portal via the send class method
490#
491# UNUSED so far
492
493sub msg_in {
494
495 my( $self, $msg ) = @_ ;
496
497 TraceStatus "portal msg in" ;
498}
499
500sub register {
501
502 my( $self, $name ) = @_ ;
503
504#print "NAME [$name]: ", caller(), "\n" ;
505
506 TraceStatus "portal arg: [$self] [$name]\n\t",
507 map( "<$_>", caller() ), "\n" ;
508
509 $name_to_portal{ $name } = $self ;
510 push( @{$portal_to_names{ $self }}, $name ) ;
511}
512
513sub unregister {
514
515 my( $name ) = @_ ;
516
517# convert a name to its object ;
518
519 my $portal = ref $name ? $name : $name_to_portal{ $name } ;
520
521 if ( $portal ) {
522
523 delete $name_to_portal{ $portal } ;
524
525 my $names = delete $portal_to_names{ $portal } ;
526
527 return $names ;
528
529 }
530
531 return ;
532}
533
534sub status_cmd {
535
536 my ($self, $msg ) = @_ ;
537
538#print $msg->dump( 'PORT' ) ;
539
540 my $status = <<STATUS ;
541
542Portal Status for Hub '$Stem::Vars::Hub_name'
543
544STATUS
545
546 foreach my $port_name ( sort keys %name_to_portal ) {
547
548 my $portal = $name_to_portal{ $port_name } ;
549
550 $status .= <<STATUS ;
551$port_name
552 Hub: $portal->{'remote_hub'}
553 Type: $portal->{'type'}
554
555STATUS
556
557 }
558
559 return $status ;
560}
561
5621 ;