fixed dummy values in Build.PL
[urisagit/Stem.git] / sessions / mid_event_async.pl
CommitLineData
4536f655 1#!/usr/local/bin/perl -w
2
3use strict ;
4
5use Stem::Event ;
6use Stem::Socket ;
7use Stem::AsyncIO ;
8use Stem::Gather ;
9
10use Data::Dumper ;
11use Getopt::Long ;
12
# parse the command line. %opts is a file lexical that the callbacks
# below read (for the verbose flag).

my %opts ;

my $opts_ok = GetOptions(
    \%opts,
    qw( server_port=s upper_port=s reverse_port=s verbose|v help|h ),
) ;

# usage() dies, so a bad option or an explicit help request ends the
# script here.

usage() if !$opts_ok || $opts{help} ;

# map each backend id to the port it is reached on, falling back to the
# built-in defaults when no port option was given.

my %backend_ports = (
    'upper'   => $opts{upper_port}   || 8889,
    'reverse' => $opts{reverse_port} || 8888,
) ;

# the sorted ids fix the order in which backends are contacted and in
# which their replies are joined for the client.

my @backend_ids = sort keys %backend_ports ;

# open the middle layer listen socket and keep the object in scope for
# as long as the event loop runs.

my $server_port = $opts{server_port} || 8887 ;
my $listen = init_server( $server_port ) ;

Stem::Event::start_loop() ;

exit ;
40
41# create the listen socket for the server side of the middle layer.
42
sub init_server {

    # Build the listen socket for the server side of the middle layer.
    #
    # Takes the port to listen on and returns the Stem::Socket object.
    # Dies if Stem::Socket->new() hands back anything that is not a
    # reference; that value is interpolated into the error message.

    my $port = shift ;

    # an empty object of this package is registered as the callback
    # target; 'client_connected' is the method named for the callback.

    my $acceptor = bless {}, __PACKAGE__ ;

    my $listen = Stem::Socket->new(
        object => $acceptor,
        method => 'client_connected',
        port   => $port,
        server => 1,
    ) ;

    ref $listen or die "can't listen on $port: $listen" ;

    return $listen ;
}
61
62# this is called when the server has accepted a socket connection
63
sub client_connected {

    # Callback registered by init_server() for the listen socket.
    #
    # Receives the callback object (unused) and the client socket.
    # Builds a fresh session object holding an async i/o object for the
    # client and a gather object keyed by the backend ids. Dies if
    # either constructor returns a non-reference.

    my( undef, $socket ) = @_ ;

    # every connection gets its own session object

    my $session = bless {}, __PACKAGE__ ;

    # async i/o on the client socket; reads are delivered to
    # client_read_data

    my $client_io = Stem::AsyncIO->new(
        object             => $session,
        fh                 => $socket,
        read_method        => 'client_read_data',
        send_data_on_close => 1,
    ) ;
    die "can't create Async: $client_io" unless ref $client_io ;
    $session->{client_async} = $client_io ;

    # the gather object tracks which backends have reported in

    my $collector = Stem::Gather->new(
        object => $session,
        keys   => \@backend_ids,
    ) ;
    die "can't create Gather: $collector" unless ref $collector ;
    $session->{gather} = $collector ;
}
93
94# this is called when all the data from client has been read.
95
sub client_read_data {

    # Read callback for the client async i/o object.
    #
    # Receives the session and a reference to the data read from the
    # client. Keeps a copy of the data in the session and then starts
    # the connections to the backends.

    my( $self, $data ) = @_ ;

    if ( $opts{verbose} ) {
        print "Client read [$$data]\n" ;
    }

    # $data is a scalar ref, so store the dereferenced value

    $self->{'client_data'} = $$data ;

    $self->connect_to_backends() ;
}
110
111# this connects the session to all of the backends
112
sub connect_to_backends {

    # Start one outgoing connection per backend for this session.
    #
    # Each connect object is stored under $self->{connect}{<id>} and
    # names 'backend_connected' as its callback method. Dies if a
    # connect object cannot be created.

    my $self = shift ;

    for my $backend ( @backend_ids ) {

        my $port = $backend_ports{ $backend } ;

        my $connector = Stem::Socket->new(
            object => $self,
            id     => $backend,
            port   => $port,
            method => 'backend_connected',
        ) ;

        die "can't create Socket: $connector" unless ref $connector ;

        # keep the connect object until its callback removes it

        $self->{connect}{$backend} = $connector ;
    }
}
135
# this is called when a backend connection succeeds
137
sub backend_connected {

    # Callback named by connect_to_backends() for each backend socket.
    #
    # Receives the session, the connected socket and the backend id.
    # Retires the connect object, wraps the socket in an async i/o
    # object stored under $self->{async}{<id>}, and sends the saved
    # client data to the backend.

    my( $self, $socket, $id ) = @_ ;

    # the connect object has done its job; remove it from the session
    # and shut it down

    my $connector = delete $self->{connect}{$id} ;
    $connector->shut_down() ;

    # async i/o on the backend socket; reads are delivered to
    # backend_read_data

    my $backend_io = Stem::AsyncIO->new(
        object             => $self,
        id                 => $id,
        fh                 => $socket,
        read_method        => 'backend_read_data',
        send_data_on_close => 1,
    ) ;
    die "can't create Async: $backend_io" unless ref $backend_io ;
    $self->{async}{$id} = $backend_io ;

    # this is the only write to the backend, so use final_write

    $backend_io->final_write( \$self->{client_data} ) ;
}
163
164# this is called when we have read all the data from the backend
165
sub backend_read_data {

    # Read callback for a backend async i/o object.
    #
    # Receives the session, a reference to the data read from the
    # backend, and the backend id. Saves the data under
    # $self->{backend_data}{<id>}, retires that backend's async i/o
    # object, and reports the id to the gather object.

    my( $self, $data, $id ) = @_ ;

    if ( $opts{verbose} ) {
        print "Backend $id READ [$$data]\n" ;
    }

    # $data is a scalar ref, so store the dereferenced value

    $self->{backend_data}{$id} = $$data ;

    # nothing more will be exchanged with this backend

    my $backend_io = delete $self->{async}{$id} ;
    $backend_io->shut_down() ;

    # tell the gather object this backend has delivered

    my $collector = $self->{'gather'} ;
    $collector->gathered( $id ) ;
}
186
187# this is called when all the backends are done.
188
sub gather_done {

    # Called when all the backends are done.
    #
    # Retires the gather object, joins the saved backend replies in
    # @backend_ids order, and sends the result to the client as the
    # final write.

    my $self = shift ;

    # the gather object is not needed any more

    my $collector = delete $self->{gather} ;
    $collector->shut_down() ;

    # remove the client async i/o object from the session so it is only
    # held here while the final write is issued

    my $client_io = delete $self->{client_async} ;

    # the hash slice pulls the replies out in backend id order

    my @replies = @{ $self->{backend_data} }{ @backend_ids } ;
    my $answer  = join '', @replies ;

    $client_io->final_write( $answer ) ;
}
207
208
209
sub usage {

    # Die with the help text, preceded by an optional error message.
    #
    # Takes an optional error string (defaults to the empty string).
    # Never returns: the text is raised with die.
    #
    # The option names and default ports listed here mirror the option
    # handling at the top of the script: server_port 8887,
    # upper_port 8889, reverse_port 8888.

    my ( $error ) = @_ ;

    $error ||= '' ;
    die <<DIE ;
$error
usage: $0 [--help|h] [--upper_port <port>] [--reverse_port <port>]
        [--server_port <port>] [--v|--verbose]

        server_port <port>      Set the port for the middleware server
                                (default is 8887)
        upper_port <port>       Set the port for the upper case server
                                (default is 8889)
        reverse_port <port>     Set the port for the string reverse server
                                (default is 8888)
        verbose | v             Set verbose mode
        help | h                Print this help text
DIE

}
231
232# this destroy can be uncommented to see the actual destruction of the
# various objects in this script.
234
235# DESTROY {
236# my( $self ) = @_ ;
237# print "DEST [$self]\n" ;
238# }