Commit | Line | Data |
26ab719a |
1 | package DBIx::Class::Storage::DBI::Replicated::Pool; |
2 | |
0bbe6676 |
3 | use Moo; |
4 | use Role::Tiny (); |
5 | use List::Util (); |
6 | use Scalar::Util qw(reftype); |
0bd8e058 |
7 | use DBI (); |
9901aad7 |
8 | use Carp::Clan qw/^DBIx::Class/; |
ed7ab0f4 |
9 | use Try::Tiny; |
0bbe6676 |
10 | use DBIx::Class::Storage::DBI::Replicated::Types |
11 | qw(PositiveInteger Number DBICStorageDBI ClassName HashRef); |
26ab719a |
12 | |
13 | =head1 NAME |
14 | |
21fc4719 |
15 | DBIx::Class::Storage::DBI::Replicated::Pool - Manage a pool of replicants |
26ab719a |
16 | |
17 | =head1 SYNOPSIS |
18 | |
19 | This class is used internally by L<DBIx::Class::Storage::DBI::Replicated>. You |
20 | shouldn't need to create instances of this class. |
d4daee7b |
21 | |
26ab719a |
22 | =head1 DESCRIPTION |
23 | |
24 | In a replicated storage type, there is at least one replicant to handle the |
48580715 |
25 | read-only traffic. The Pool class manages this replicant, or list of |
26ab719a |
26 | replicants, and gives some methods for querying information about their status. |
27 | |
28 | =head1 ATTRIBUTES |
29 | |
30 | This class defines the following attributes. |
31 | |
4a607d7a |
32 | =head2 maximum_lag ($num) |
33 | |
34 | This is a number which defines the maximum allowed lag returned by the |
35 | L<DBIx::Class::Storage::DBI/lag_behind_master> method. The default is 0. In |
36 | general, this should return a larger number when the replicant is lagging |
faaba25f |
37 | behind its master, however the implementation of this is database specific, so |
4a607d7a |
38 | don't count on this number having a fixed meaning. For example, MySQL will |
39 | return a number of seconds that the replicating database is lagging. |
40 | |
41 | =cut |
42 | |
43 | has 'maximum_lag' => ( |
64cdad22 |
44 | is=>'rw', |
0bbe6676 |
45 | isa=>Number, |
64cdad22 |
46 | lazy=>1, |
0bbe6676 |
47 | default=>sub {0}, |
4a607d7a |
48 | ); |
49 | |
17b05c13 |
50 | =head2 last_validated |
51 | |
52 | This is an integer representing a time since the last time the replicants were |
faaba25f |
53 | validated. It's nothing fancy, just an integer provided via the perl L<time|perlfunc/time> |
48580715 |
54 | built-in. |
17b05c13 |
55 | |
56 | =cut |
57 | |
58 | has 'last_validated' => ( |
64cdad22 |
59 | is=>'rw', |
0bbe6676 |
60 | isa=>PositiveInteger, |
64cdad22 |
61 | lazy=>1, |
0bbe6676 |
62 | default=>sub {0}, |
17b05c13 |
63 | ); |
64 | |
4a607d7a |
65 | =head2 replicant_type ($classname) |
26ab719a |
66 | |
67 | Base class used to instantiate replicants that are in the pool. Unless you |
68 | need to subclass L<DBIx::Class::Storage::DBI::Replicated::Replicant> you should |
69 | just leave this alone. |
70 | |
71 | =cut |
72 | |
73 | has 'replicant_type' => ( |
64cdad22 |
74 | is=>'ro', |
41916570 |
75 | isa=>ClassName, |
0bbe6676 |
76 | default=> sub{'DBIx::Class::Storage::DBI'}, |
64cdad22 |
77 | handles=>{ |
78 | 'create_replicant' => 'new', |
79 | }, |
26ab719a |
80 | ); |
81 | |
26ab719a |
82 | =head2 replicants |
83 | |
0bbe6676 |
84 | A hashref of replicants, with the key being the dsn and the value returning the |
48580715 |
85 | actual replicant storage. For example, if the $dsn element is something like: |
26ab719a |
86 | |
64cdad22 |
87 | "dbi:SQLite:dbname=dbfile" |
d4daee7b |
88 | |
26ab719a |
89 | You could access the specific replicant via: |
90 | |
64cdad22 |
91 | $schema->storage->replicants->{'dbname=dbfile'} |
d4daee7b |
92 | |
26ab719a |
93 | =cut |
94 | |
95 | has 'replicants' => ( |
0bbe6676 |
96 | is => 'rw', |
97 | isa => HashRef, |
98 | default => sub { +{} }, |
26ab719a |
99 | ); |
100 | |
ede99b9f |
101 | has next_unknown_replicant_id => ( |
102 | is => 'rw', |
0bbe6676 |
103 | isa=>PositiveInteger |
104 | default => sub { 1 }, |
ede99b9f |
105 | ); |
106 | |
0bbe6676 |
107 | sub inc_unknown_replicant_id { |
108 | my $self = shift; |
109 | my $next = $self->next_unknown_replicant_id + 1; |
110 | $self->next_unknown_replicant_id($next); |
111 | return $next; |
112 | } |
113 | |
cea43436 |
114 | =head2 master |
115 | |
116 | Reference to the master Storage. |
117 | |
118 | =cut |
119 | |
0bbe6676 |
120 | has master => ( |
121 | is => 'rw', |
122 | isa =>DBICStorageDBI, |
123 | weak_ref => 1, |
124 | ); |
cea43436 |
125 | |
26ab719a |
126 | =head1 METHODS |
127 | |
128 | This class defines the following methods. |
129 | |
955a6df6 |
130 | =head2 connect_replicants ($schema, Array[$connect_info]) |
26ab719a |
131 | |
d40080c3 |
132 | Given an array of $dsn or connect_info structures suitable for connected to a |
133 | database, create an L<DBIx::Class::Storage::DBI::Replicated::Replicant> object |
134 | and store it in the L</replicants> attribute. |
26ab719a |
135 | |
136 | =cut |
137 | |
955a6df6 |
138 | sub connect_replicants { |
64cdad22 |
139 | my $self = shift @_; |
140 | my $schema = shift @_; |
d4daee7b |
141 | |
64cdad22 |
142 | my @newly_created = (); |
143 | foreach my $connect_info (@_) { |
2cd3ccc4 |
144 | $connect_info = [ $connect_info ] |
9901aad7 |
145 | if reftype $connect_info ne 'ARRAY'; |
146 | |
0bd8e058 |
147 | my $connect_coderef = |
148 | (reftype($connect_info->[0])||'') eq 'CODE' ? $connect_info->[0] |
149 | : (reftype($connect_info->[0])||'') eq 'HASH' && |
150 | $connect_info->[0]->{dbh_maker}; |
151 | |
152 | my $dsn; |
4c91f824 |
153 | my $replicant = do { |
0bbe6676 |
154 | ## yes this is evil, but it only usually happens once (for coderefs) |
155 | ## this will fail if the coderef does not actually DBI::connect |
0bd8e058 |
156 | no warnings 'redefine'; |
157 | my $connect = \&DBI::connect; |
158 | local *DBI::connect = sub { |
159 | $dsn = $_[1]; |
160 | goto $connect; |
161 | }; |
4c91f824 |
162 | $self->connect_replicant($schema, $connect_info); |
163 | }; |
164 | |
ede99b9f |
165 | my $key; |
166 | |
167 | if (!$dsn) { |
168 | if (!$connect_coderef) { |
169 | $dsn = $connect_info->[0]; |
170 | $dsn = $dsn->{dsn} if (reftype($dsn)||'') eq 'HASH'; |
171 | } |
172 | else { |
173 | # all attempts to get the DSN failed |
174 | $key = "UNKNOWN_" . $self->next_unknown_replicant_id; |
175 | $self->inc_unknown_replicant_id; |
176 | } |
177 | } |
178 | if ($dsn) { |
179 | $replicant->dsn($dsn); |
180 | ($key) = ($dsn =~ m/^dbi\:.+\:(.+)$/i); |
0bd8e058 |
181 | } |
0bd8e058 |
182 | |
0bbe6676 |
183 | if($key) { |
184 | $replicant->id($key); |
185 | } else { |
186 | $replicant->debugobj->print("Could not create an ID for the replicant!"); |
187 | } |
188 | |
189 | ## Add the new replicant to the list |
190 | $self->replicants({ |
191 | $key => $replicant, |
192 | %{$self->replicants}, |
193 | }); |
ede99b9f |
194 | |
64cdad22 |
195 | push @newly_created, $replicant; |
196 | } |
d4daee7b |
197 | |
64cdad22 |
198 | return @newly_created; |
26ab719a |
199 | } |
200 | |
bbafcf26 |
201 | =head2 connect_replicant ($schema, $connect_info) |
202 | |
203 | Given a schema object and a hashref of $connect_info, connect the replicant |
204 | and return it. |
205 | |
206 | =cut |
207 | |
208 | sub connect_replicant { |
209 | my ($self, $schema, $connect_info) = @_; |
210 | my $replicant = $self->create_replicant($schema); |
f15afa13 |
211 | $replicant->connect_info($connect_info); |
d40080c3 |
212 | |
0bbe6676 |
213 | ## It is undesirable for catalyst to connect at ->connect_replicants time, as |
d40080c3 |
214 | ## connections should only happen on the first request that uses the database. |
215 | ## So we try to set the driver without connecting, however this doesn't always |
216 | ## work, as a driver may need to connect to determine the DB version, and this |
217 | ## may fail. |
d6e80959 |
218 | ## |
219 | ## Why this is necessary at all, is that we need to have the final storage |
220 | ## class to apply the Replicant role. |
d40080c3 |
221 | |
222 | $self->_safely($replicant, '->_determine_driver', sub { |
223 | $replicant->_determine_driver |
224 | }); |
225 | |
0bbe6676 |
226 | Role::Tiny->apply_roles_to_object($replicant, 'DBIx::Class::Storage::DBI::Replicated::Replicant'); |
cea43436 |
227 | |
228 | # link back to master |
229 | $replicant->master($self->master); |
230 | |
bbafcf26 |
231 | return $replicant; |
232 | } |
233 | |
f15afa13 |
234 | =head2 _safely_ensure_connected ($replicant) |
235 | |
236 | The standard ensure_connected method with throw an exception should it fail to |
237 | connect. For the master database this is desirable, but since replicants are |
238 | allowed to fail, this behavior is not desirable. This method wraps the call |
239 | to ensure_connected in an eval in order to catch any generated errors. That |
48580715 |
240 | way a slave can go completely offline (e.g. the box itself can die) without |
f15afa13 |
241 | bringing down your entire pool of databases. |
242 | |
243 | =cut |
244 | |
245 | sub _safely_ensure_connected { |
246 | my ($self, $replicant, @args) = @_; |
d40080c3 |
247 | |
248 | return $self->_safely($replicant, '->ensure_connected', sub { |
249 | $replicant->ensure_connected(@args) |
250 | }); |
251 | } |
252 | |
253 | =head2 _safely ($replicant, $name, $code) |
254 | |
255 | Execute C<$code> for operation C<$name> catching any exceptions and printing an |
256 | error message to the C<<$replicant->debugobj>>. |
257 | |
258 | Returns 1 on success and undef on failure. |
259 | |
260 | =cut |
261 | |
262 | sub _safely { |
263 | my ($self, $replicant, $name, $code) = @_; |
264 | |
52b420dd |
265 | return try { |
266 | $code->(); |
267 | 1; |
ed7ab0f4 |
268 | } catch { |
d7a58a29 |
269 | $replicant->debugobj->print(sprintf( |
270 | "Exception trying to $name for replicant %s, error is %s", |
9780718f |
271 | $replicant->_dbi_connect_info->[0], $_) |
d7a58a29 |
272 | ); |
52b420dd |
273 | undef; |
ed7ab0f4 |
274 | }; |
f15afa13 |
275 | } |
276 | |
26ab719a |
277 | =head2 connected_replicants |
278 | |
279 | Returns true if there are connected replicants. Actually is overloaded to |
280 | return the number of replicants. So you can do stuff like: |
281 | |
64cdad22 |
282 | if( my $num_connected = $storage->has_connected_replicants ) { |
283 | print "I have $num_connected connected replicants"; |
284 | } else { |
285 | print "Sorry, no replicants."; |
286 | } |
26ab719a |
287 | |
288 | This method will actually test that each replicant in the L</replicants> hashref |
289 | is actually connected, try not to hit this 10 times a second. |
290 | |
291 | =cut |
292 | |
293 | sub connected_replicants { |
64cdad22 |
294 | my $self = shift @_; |
0bbe6676 |
295 | return List::Util::sum( map { |
64cdad22 |
296 | $_->connected ? 1:0 |
297 | } $self->all_replicants ); |
26ab719a |
298 | } |
299 | |
50336325 |
300 | =head2 active_replicants |
301 | |
302 | This is an array of replicants that are considered to be active in the pool. |
303 | This does not check to see if they are connected, but if they are not, DBIC |
304 | should automatically reconnect them for us when we hit them with a query. |
305 | |
306 | =cut |
307 | |
308 | sub active_replicants { |
64cdad22 |
309 | my $self = shift @_; |
310 | return ( grep {$_} map { |
311 | $_->active ? $_:0 |
312 | } $self->all_replicants ); |
50336325 |
313 | } |
314 | |
26ab719a |
315 | =head2 all_replicants |
316 | |
317 | Just a simple array of all the replicant storages. No particular order to the |
318 | array is given, nor should any meaning be derived. |
319 | |
320 | =cut |
321 | |
322 | sub all_replicants { |
64cdad22 |
323 | my $self = shift @_; |
324 | return values %{$self->replicants}; |
26ab719a |
325 | } |
326 | |
4a607d7a |
327 | =head2 validate_replicants |
328 | |
329 | This does a check to see if 1) each replicate is connected (or reconnectable), |
330 | 2) that is ->is_replicating, and 3) that it is not exceeding the lag amount |
331 | defined by L</maximum_lag>. Replicants that fail any of these tests are set to |
332 | inactive, and thus removed from the replication pool. |
333 | |
334 | This tests L<all_replicants>, since a replicant that has been previous marked |
48580715 |
335 | as inactive can be reactivated should it start to pass the validation tests again. |
4a607d7a |
336 | |
337 | See L<DBIx::Class::Storage::DBI> for more about checking if a replicating |
338 | connection is not following a master or is lagging. |
339 | |
340 | Calling this method will generate queries on the replicant databases so it is |
341 | not recommended that you run them very often. |
342 | |
13b9e828 |
343 | This method requires that your underlying storage engine supports some sort of |
344 | native replication mechanism. Currently only MySQL native replication is |
345 | supported. Your patches to make other replication types work are welcomed. |
346 | |
4a607d7a |
347 | =cut |
348 | |
349 | sub validate_replicants { |
64cdad22 |
350 | my $self = shift @_; |
351 | foreach my $replicant($self->all_replicants) { |
13b9e828 |
352 | if($self->_safely_ensure_connected($replicant)) { |
353 | my $is_replicating = $replicant->is_replicating; |
354 | unless(defined $is_replicating) { |
9901aad7 |
355 | $replicant->debugobj->print("Storage Driver ".ref($self)." Does not support the 'is_replicating' method. Assuming you are manually managing.\n"); |
13b9e828 |
356 | next; |
357 | } else { |
358 | if($is_replicating) { |
359 | my $lag_behind_master = $replicant->lag_behind_master; |
360 | unless(defined $lag_behind_master) { |
9901aad7 |
361 | $replicant->debugobj->print("Storage Driver ".ref($self)." Does not support the 'lag_behind_master' method. Assuming you are manually managing.\n"); |
13b9e828 |
362 | next; |
363 | } else { |
364 | if($lag_behind_master <= $self->maximum_lag) { |
365 | $replicant->active(1); |
366 | } else { |
367 | $replicant->active(0); |
368 | } |
369 | } |
370 | } else { |
371 | $replicant->active(0); |
372 | } |
373 | } |
64cdad22 |
374 | } else { |
64cdad22 |
375 | $replicant->active(0); |
7edf5f1c |
376 | } |
64cdad22 |
377 | } |
378 | ## Mark that we completed this validation. |
0bbe6676 |
379 | $self->last_validated(time); |
4a607d7a |
380 | } |
381 | |
26ab719a |
382 | =head1 AUTHOR |
383 | |
0bbe6676 |
384 | John Napiorkowski <jjnapiork@cpan.org> |
26ab719a |
385 | |
386 | =head1 LICENSE |
387 | |
388 | You may distribute this code under the same terms as Perl itself. |
389 | |
390 | =cut |
391 | |
cb6ec758 |
392 | 1; |