fixed dummy values in Build.PL
[urisagit/Stem.git] / sessions / mid_event_async.pl
1 #!/usr/local/bin/perl -w
2
3 use strict ;
4
5 use Stem::Event ;
6 use Stem::Socket ;
7 use Stem::AsyncIO ;
8 use Stem::Gather ;
9
10 use Data::Dumper ;
11 use Getopt::Long ;
12
13 my $opts_ok = GetOptions(
14         \my %opts,
15         'server_port=s',
16         'upper_port=s',
17         'reverse_port=s',
18         'verbose|v',
19         'help|h',
20 ) ;
21
22 usage() unless $opts_ok ;
23 usage() if $opts{help} ;
24
25 my %backend_ports = (
26
27         'reverse'       => $opts{reverse_port} || 8888,
28         'upper'         => $opts{upper_port} || 8889,
29 ) ;
30
31 # this controls the order of requests to the backends.
32
33 my @backend_ids = sort keys %backend_ports ;
34
35 my $listen = init_server( $opts{server_port} || 8887 ) ;
36
37 Stem::Event::start_loop() ;
38
39 exit ;
40
41 # create the listen socket for the server side of the middle layer.
42
43 sub init_server {
44
45         my( $port ) = @_ ;
46
47 # create the middle layer listen socket
48
49         my $listen = Stem::Socket->new(
50                 object  => bless( {
51                 }, __PACKAGE__),
52                 method  => 'client_connected',
53                 port    => $port,
54                 server  => 1,
55         ) ;
56
57         die "can't listen on $port: $listen" unless ref $listen ;
58
59         return $listen ;
60 }
61
62 # this is called when the server has accepted a socket connection
63
64 sub client_connected {
65
66         my( $obj, $socket ) = @_ ;
67
68 # create the session object
69
70         my $self = bless {}, __PACKAGE__ ;
71
72 # create and save the async io object for the client
73
74         my $async = Stem::AsyncIO->new(
75                 object  => $self,
76                 fh      => $socket,
77                 read_method     => 'client_read_data',
78                 send_data_on_close => 1,
79         ) ;
80         ref $async or die "can't create Async: $async" ;
81         $self->{client_async} = $async ;
82
83 # create and save the gather object
84
85         my $gather = Stem::Gather->new(
86                 object  => $self,
87                 keys    => \@backend_ids,
88         ) ;
89
90         ref $gather or die "can't create Gather: $gather" ;
91         $self->{gather} = $gather ;
92 }
93
94 # this is called when all the data from client has been read.
95
96 sub client_read_data {
97
98         my( $self, $data ) = @_ ;
99
100         print "Client read [${$data}]\n"  if $opts{verbose} ;
101
102 # store the client data (a ref is passed in)
103
104         $self->{'client_data'} = ${$data} ;
105
106 # connect to all of the backend servers
107
108         $self->connect_to_backends() ;
109 }
110
111 # this connects the session to all of the backends
112
113 sub connect_to_backends {
114
115         my( $self ) = @_ ;
116
117 # loop over all the backends
118
119         foreach my $id ( @backend_ids ) {
120
121 # connect to the backend with this id and its port and save the
122 # connect object
123
124                 my $connect = Stem::Socket->new(
125                         object  => $self,
126                         id      => $id,
127                         port    => $backend_ports{ $id },
128                         method  => 'backend_connected',
129                 ) ;
130
131                 ref $connect or die "can't create Socket: $connect" ;
132                 $self->{connect}{$id} = $connect ;
133         }
134 }
135
136 # this is called when a backend end connection succeeds
137
138 sub backend_connected {
139
140         my( $self, $socket, $id ) = @_ ;
141
142 # delete and shutdown the connect object as we no longer need it
143
144         my $connect = delete $self->{connect}{$id} ;
145         $connect->shut_down() ;
146
147 # create and save an async i/o object for this backend
148
149         my $async = Stem::AsyncIO->new(
150                 object  => $self,
151                 id      => $id,
152                 fh      => $socket,
153                 read_method     => 'backend_read_data',
154                 send_data_on_close => 1,
155         ) ;
156         ref $async or die "can't create Async: $async" ;
157         $self->{async}{$id} = $async ;
158
159 # write the client data to the back end. no more data will follow.
160
161         $async->final_write( \$self->{client_data} ) ;
162 }
163
164 # this is called when we have read all the data from the backend
165
166 sub backend_read_data {
167
168         my( $self, $data, $id ) = @_ ;
169
170         print "Backend $id READ [${$data}]\n" if $opts{verbose} ;
171
172 # save the backend data (we are passed a ref)
173
174         $self->{backend_data}{$id} = ${$data} ;
175
176 # delete and shutdown the async i/o for the backend since we don't
177 # need it anymore
178
179         my $async = delete $self->{async}{$id} ;
180         $async->shut_down() ;
181
182 # mark that this backend is done
183
184         $self->{'gather'}->gathered( $id ) ;
185 }
186
187 # this is called when all the backends are done.
188
189 sub gather_done {
190
191         my( $self ) = @_ ;
192
193         my $gather = delete $self->{gather} ;
194         $gather->shut_down() ;
195
196 # no more backends so we return the joined backend data to the client.
197
198 # we don't need the gather object around anymore
199 # allow for self cleanup when it is done with the final write to the
200 # client.
201
202         my $async = delete $self->{client_async} ;
203         $async->final_write(
204                 join( '',  @{$self->{backend_data}}{ @backend_ids } )
205         ) ;
206 }
207
208
209
210 sub usage {
211
212         my ( $error ) = @_ ;
213
214         $error ||= '' ;
215         die <<DIE ;
216 $error
217 usage: $0 [--help|h] [--upper_port <port>] [--reverse_port <port>]
218         [--server_port <port>] [--v|--verbose]
219
220         upper_port <port>       Set the port for the middleware server
221                                 (default is 8888)
222         upper_port <port>       Set the port for the upper case server
223                                 (default is 8888)
224         reverse_port <port>     Set the port for the string reverse server
225                                 (default is 8889)
226         verbose                 Set verbose mode
227         help | h                Print this help text
228 DIE
229
230 }
231
232 # this destroy can be uncommented to see the actual destruction of the
233 # various obects in this script.
234
235 # DESTROY {
236 #       my( $self ) = @_ ;
237 #       print "DEST [$self]\n" ;
238 # }