Commit | Line | Data |
4536f655 |
1 | # File: Stem/SockMsg.pm |
2 | |
3 | # This file is part of Stem. |
4 | # Copyright (C) 1999, 2000, 2001 Stem Systems, Inc. |
5 | |
6 | # Stem is free software; you can redistribute it and/or modify |
7 | # it under the terms of the GNU General Public License as published by |
8 | # the Free Software Foundation; either version 2 of the License, or |
9 | # (at your option) any later version. |
10 | |
11 | # Stem is distributed in the hope that it will be useful, |
12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | # GNU General Public License for more details. |
15 | |
16 | # You should have received a copy of the GNU General Public License |
17 | # along with Stem; if not, write to the Free Software |
18 | # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
19 | |
20 | # For a license to use the Stem under conditions other than those |
21 | # described here, to purchase support for this software, or to purchase a |
22 | # commercial warranty contract, please contact Stem Systems at: |
23 | |
24 | # Stem Systems, Inc. 781-643-7504 |
25 | # 79 Everett St. info@stemsystems.com |
26 | # Arlington, MA 02474 |
27 | # USA |
28 | |
29 | package Stem::SockMsg ; |
30 | |
31 | use strict ; |
32 | |
33 | use Data::Dumper ; |
34 | |
35 | use Stem::Socket ; |
36 | use Stem::Trace 'log' => 'stem_status', 'sub' => 'TraceStatus' ; |
37 | use Stem::Trace 'log' => 'stem_error' , 'sub' => 'TraceError' ; |
38 | use Stem::Route qw( :cell ) ; |
39 | use base 'Stem::Cell' ; |
40 | |
41 | use Stem::Debug qw( dump_data dump_socket ) ; |
42 | |
43 | |
# Attribute specification for a SockMsg cell. new() hands this list to
# Stem::Class::parse_args() together with the constructor arguments.
# Each entry describes one attribute: its name, optional type/default/
# required/env settings, and the help text shown to users.

my $attr_spec = [

	{
		'name'		=> 'reg_name',
		'help'		=> <<HELP,
The registration name for this Cell
HELP
	},

	{
		'name'		=> 'host',
		'env'		=> 'host',
		'help'		=> <<HELP,
Host address to listen on or connect to
HELP
	},

	{
		'name'		=> 'port',
		'env'		=> 'port',
		'required'	=> 1,
		'help'		=> <<HELP,
Port address to listen on or connect to
HELP
	},

	{
		'name'		=> 'server',
		'type'		=> 'boolean',
		'help'		=> <<HELP,
Mark this Cell as a server (listens for connections)
HELP
	},

	{
		'name'		=> 'connect_now',
		'type'		=> 'boolean',
		'help'		=> <<HELP,
Connect upon Cell creation
HELP
	},

	{
		'name'		=> 'status_addr',
		'type'		=> 'address',
		'help'		=> <<HELP,
Send status (connect/disconnect) messages to this address.
HELP
	},

	{
		'name'		=> 'sync',
		'type'		=> 'boolean',
		'default'	=> 0,
		'help'		=> <<HELP,
Mark this as a synchronously connecting socket. Default is asynchronous
connections. In both cases the same method callbacks are used.
HELP
	},

	{
		'name'		=> 'log_name',
		'help'		=> <<HELP,
Log to send connection status to
HELP
	},

	{
		'name'		=> 'cell_attr',
		'class'		=> 'Stem::Cell',
		'help'		=> <<HELP,
Argument list passed to Stem::Cell for this Cell
HELP
	},

] ;
120 | |
121 | #my $listener ; |
122 | |
123 | |
# new() - construct a SockMsg cell from key/value constructor arguments.
#
# The arguments are validated against $attr_spec by
# Stem::Class::parse_args(). Depending on the attributes the cell is
# either set up as a listening server or (with 'connect_now') connected
# immediately as a client.
#
# Returns the cell object on success. If parse_args(), the listen socket
# constructor or the initial connect() hands back a non-reference value
# (presumably an error string -- confirm against Stem::Class and
# Stem::Socket), that value is returned to the caller instead.

sub new {

	my( $class ) = shift ;

	my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
	return $self unless ref $self ;

	if ( $self->{'server'} ) {

		my $listen_obj = Stem::Socket->new(
			'object'	=> $self,
			'host'		=> $self->{'host'},
			'port'		=> $self->{'port'},
			'server'	=> 1,
		) ;

		return $listen_obj unless ref $listen_obj ;

# no host attribute means the info text shows 'localhost'

		my $host_text = $self->{'host'} ;
		$host_text = 'localhost' unless defined $host_text ;

		my $info = <<INFO ;
SockMsg
Type: server
Local: $host_text:$self->{'port'}
INFO

		$self->cell_info( $info ) ;

		$self->{'listen_obj'} = $listen_obj ;

		$self->cell_activate() ;
	}
	elsif ( $self->{'connect_now'} ) {

# connect() returns a non-reference value when the socket object could
# not be created. report it to our caller the same way the server
# branch above does, instead of silently dropping it.

		my $err = $self->connect() ;
		return $err if $err ;
	}

# save the connection parameters as cell args. connect() looks them up
# there first, before falling back to the object attributes.

	$self->cell_set_args(
		'host'		=> $self->{'host'},
		'port'		=> $self->{'port'},
		'server'	=> $self->{'server'},
	) ;

	return( $self ) ;
}
175 | |
# connect() - create the client socket object for this cell.
#
# host, port and sync are each taken from the cell args when that value
# is true, otherwise from the attribute of the same name on the object.
#
# Returns nothing when the socket object was created (it is kept in
# $self->{'sock_obj'}). If Stem::Socket->new() hands back a
# non-reference value, that value is returned unchanged.

sub connect {

	my $self = shift ;

########################
########################
## handle connect timeouts
########################
########################

# build the constructor arguments in a fixed order

	my @socket_args = ( 'object' => $self ) ;

	foreach my $attr ( qw( host port sync ) ) {

		push( @socket_args, $attr,
			( $self->cell_get_args( $attr ) || $self->{$attr} )
		) ;
	}

	my $new_sock = Stem::Socket->new( @socket_args ) ;

	if ( ref $new_sock ) {

		$self->{'sock_obj'} = $new_sock ;
		return ;
	}

	return $new_sock ;
}
209 | |
# connected() - callback invoked with the connected socket handle.
#
#   $connected_sock	the connected socket. it must support the
#			sockhost/sockport/peerhost/peerport methods.
#
# Marks the cell as connected, sends a 'connected' status message,
# records a description of the connection (trace, optional log entry and
# the 'info' cell arg), stores the handle in the cell args and finally
# triggers the cell. Returns the result of cell_trigger().

sub connected {

	my( $self, $connected_sock ) = @_ ;

	$self->{'connected'} = 1 ;

	$self->send_status_msg( 'connected' ) ;

# without a sock_obj on this object we can't ask for the socket type, so
# a fixed label is used

	my $type = $self->{'sock_obj'} ?
			$self->{'sock_obj'}->type() :
			'sync connected' ;

# the type text is passed as a sprintf argument and the template is a
# non-interpolating heredoc. putting $type into the format itself would
# let a '%' in it be taken as a conversion and garble the output.

	my $info = sprintf( <<'INFO',
SockMsg connected
Type: %s
Local: %s:%d
Remote: %s:%d
INFO
		$type,
		$connected_sock->sockhost(),
		$connected_sock->sockport(),
		$connected_sock->peerhost(),
		$connected_sock->peerport(),
	) ;

	TraceStatus( "\n$info" ) ;

	if ( my $log_name = $self->{'log_name'} ) {

		Stem::Log::Entry->new(
			'logs'	=> $log_name,
			'text'	=> "Connected\n$info",
		) ;
	}

	$self->cell_set_args(
		'fh'		=> $connected_sock,
		'aio_args'	=> [ 'fh' => $connected_sock ],
		'info'		=> $info,
	) ;

# the trigger result has always been this method's (implicit) return
# value, so keep handing it back

	my $err = $self->cell_trigger() ;

	return $err ;
}
258 | |
# triggered_cell() - hook that runs once the cell has been triggered.
# the object it runs on may be the original cell or a clone of it.
#
# a server cell has nothing more to do here and returns nothing. any
# other cell starts its connection and returns whatever connect()
# returns.

sub triggered_cell {

	my $cell = shift ;

	return $cell->connect() unless $cell->{'server'} ;

	return ;
}
273 | |
# async_closed() - callback for a closed socket. it is handled here
# rather than with the other async callbacks in Cell.pm so that
# reconnect support can be added in this class.
#
# Sends a 'disconnected' status message, optionally logs the close,
# replaces the 'info' cell arg with a disconnected notice and shuts the
# cell down. Returns whatever shut_down() returns.

sub async_closed {

	my( $self ) = @_ ;

	$self->send_status_msg( 'disconnected' ) ;

	if ( my $log_name = $self->{'log_name'} ) {

# connected() saves the connection description with
# cell_set_args( 'info' => ... ) and nothing in this class stores it in
# $self->{'info'}. so fall back to the cell arg (and then to an empty
# string) rather than logging an undefined value.
# NOTE(review): assumes the 'info' cell arg is visible on the cell that
# receives this callback (it may be a clone) -- confirm in Stem::Cell.

		my $info = $self->{'info'} ;
		$info = $self->cell_get_args( 'info' ) unless defined $info ;
		$info = '' unless defined $info ;

		Stem::Log::Entry->new(
			'logs'	=> $log_name,
			'text'	=> "Closed\n$info",
		) ;
	}

	$self->cell_set_args( 'info' => 'SockMsg disconnected' ) ;

######################
######################
# add support for reconnect.
# it has a flag, delay, retry count.
# (reconnect handling should probably live in Socket.pm)
######################
######################

	$self->shut_down() ;
}
311 | |
# shut_down() - shut this cell down.
#
# Always calls cell_shut_down(). If the cell was never marked as
# connected, the socket object it still owns is shut down as well. The
# return value is not meaningful.

sub shut_down {

	my( $self ) = @_ ;

	$self->cell_shut_down() ;

	unless ( $self->{'connected'} ) {

use Carp 'cluck' ;
#cluck "SOCKMSG SHUT SERVER $self\n" ;

# a client keeps its socket object in 'sock_obj', but new() stores the
# listen socket of a server cell in 'listen_obj'. and a cell that never
# created a socket has neither. look in both places and skip the call
# when nothing is there, instead of dying on an undefined value.
# NOTE(review): shutting down the listen socket here is taken from the
# 'SHUT SERVER' debug text above -- confirm that is the intent.

		my $sock_obj = $self->{'sock_obj'} || $self->{'listen_obj'} ;

		$sock_obj->shut_down() if $sock_obj ;
	}
}
330 | |
# send_status_msg() - report a connection state change.
#
#   $status	the status text to report. this class sends
#		'connected' and 'disconnected'.
#
# Does nothing (and returns nothing) when the cell has no true
# 'status_addr' attribute. Otherwise a message of type 'status' carrying
# { status => $status } is created, addressed to that address, and
# dispatched. the result of dispatch() is returned.

sub send_status_msg {

	my $self = shift ;
	my $status = shift ;

	my $target_addr = $self->{'status_addr'} ;

	return unless $target_addr ;

	return Stem::Msg->new(
		'to'	=> $target_addr,
		'from'	=> $self->cell_from_addr(),
		'type'	=> 'status',
		'data'	=> { 'status' => $status },
	)->dispatch() ;
}
348 | |
349 | |
350 | |
# DESTROY() - destructor. it intentionally does nothing; it is only a
# convenient place to hang object lifetime debugging, e.g.
#
#	print "SOCKMSG DESTROY", caller(), "\n" ;
#	print $self->_dump( "DESTROY") ;

sub DESTROY {

	my( $self ) = @_ ;
}
358 | |
359 | |
360 | # sub IO::Socket::INET::DESTROY { |
361 | # my ( $self ) = @_ ; |
362 | |
363 | # # print "IO::DESTROY\n", dump_socket( $self ) ; |
364 | |
365 | # #warn "L $listener - S $self\n" if $listener == $self ; |
366 | |
367 | # # print "SOCKMSG DESTROY", caller(), "\n" ; |
368 | # #cluck "IO::DESTROY $self\n" ; |
369 | # } |
370 | |
371 | 1 ; |