convert from the bottom up
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated / Pool.pm
1 package DBIx::Class::Storage::DBI::Replicated::Pool;
2
3 use Moo;
4 use Role::Tiny ();
5 use List::Util ();
6 use Scalar::Util qw(reftype);
7 use DBI ();
8 use Carp::Clan qw/^DBIx::Class/;
9 use Try::Tiny;
10 use DBIx::Class::Storage::DBI::Replicated::Types
11   qw(PositiveInteger Number DBICStorageDBI ClassName HashRef);
12
13 =head1 NAME
14
15 DBIx::Class::Storage::DBI::Replicated::Pool - Manage a pool of replicants
16
17 =head1 SYNOPSIS
18
19 This class is used internally by L<DBIx::Class::Storage::DBI::Replicated>.  You
20 shouldn't need to create instances of this class.
21
22 =head1 DESCRIPTION
23
24 In a replicated storage type, there is at least one replicant to handle the
25 read-only traffic.  The Pool class manages this replicant, or list of 
26 replicants, and gives some methods for querying information about their status.
27
28 =head1 ATTRIBUTES
29
30 This class defines the following attributes.
31
32 =head2 maximum_lag ($num)
33
34 This is a number which defines the maximum allowed lag returned by the
35 L<DBIx::Class::Storage::DBI/lag_behind_master> method.  The default is 0.  In
36 general, this should return a larger number when the replicant is lagging
37 behind its master, however the implementation of this is database specific, so
38 don't count on this number having a fixed meaning.  For example, MySQL will
39 return a number of seconds that the replicating database is lagging.
40
41 =cut
42
43 has 'maximum_lag' => (
44   is=>'rw',
45   isa=>Number,
46   lazy=>1,
47   default=>sub {0},
48 );
49
50 =head2 last_validated
51
52 This is an integer representing a time since the last time the replicants were
53 validated. It's nothing fancy, just an integer provided via the perl L<time|perlfunc/time>
54 built-in.
55
56 =cut
57
58 has 'last_validated' => (
59   is=>'rw',
60   isa=>PositiveInteger,
61   lazy=>1,
62   default=>sub {0},
63 );
64
65 =head2 replicant_type ($classname)
66
67 Base class used to instantiate replicants that are in the pool.  Unless you
68 need to subclass L<DBIx::Class::Storage::DBI::Replicated::Replicant> you should
69 just leave this alone.
70
71 =cut
72
73 has 'replicant_type' => (
74   is=>'ro',
75   isa=>ClassName,
76   default=> sub{'DBIx::Class::Storage::DBI'},
77   handles=>{
78     'create_replicant' => 'new',
79   },  
80 );
81
82 =head2 replicants
83
84 A hashref of replicants, with the key being the dsn and the value returning the
85 actual replicant storage.  For example, if the $dsn element is something like:
86
87   "dbi:SQLite:dbname=dbfile"
88
89 You could access the specific replicant via:
90
91   $schema->storage->replicants->{'dbname=dbfile'}
92
93 =cut
94
95 has 'replicants' => (
96   is => 'rw',
97   isa => HashRef,
98   default => sub { +{} },
99 );
100
101 has next_unknown_replicant_id => (
102   is => 'rw',
103   isa=>PositiveInteger
104   default => sub { 1 },
105 );
106
107 sub inc_unknown_replicant_id {
108     my $self = shift;
109     my $next = $self->next_unknown_replicant_id + 1;
110     $self->next_unknown_replicant_id($next);
111     return $next;
112 }
113
114 =head2 master
115
116 Reference to the master Storage.
117
118 =cut
119
120 has master => (
121   is => 'rw',
122   isa =>DBICStorageDBI,
123   weak_ref => 1,
124 );
125
126 =head1 METHODS
127
128 This class defines the following methods.
129
130 =head2 connect_replicants ($schema, Array[$connect_info])
131
132 Given an array of $dsn or connect_info structures suitable for connected to a
133 database, create an L<DBIx::Class::Storage::DBI::Replicated::Replicant> object
134 and store it in the L</replicants> attribute.
135
136 =cut
137
138 sub connect_replicants {
139   my $self = shift @_;
140   my $schema = shift @_;
141
142   my @newly_created = ();
143   foreach my $connect_info (@_) {
144     $connect_info = [ $connect_info ]
145       if reftype $connect_info ne 'ARRAY';
146
147     my $connect_coderef =
148       (reftype($connect_info->[0])||'') eq 'CODE' ? $connect_info->[0]
149         : (reftype($connect_info->[0])||'') eq 'HASH' &&
150           $connect_info->[0]->{dbh_maker};
151
152     my $dsn;
153     my $replicant = do {
154       ## yes this is evil, but it only usually happens once (for coderefs)
155       ## this will fail if the coderef does not actually DBI::connect
156       no warnings 'redefine';
157       my $connect = \&DBI::connect;
158       local *DBI::connect = sub {
159         $dsn = $_[1];
160         goto $connect;
161       };
162       $self->connect_replicant($schema, $connect_info);
163     };
164
165     my $key;
166
167     if (!$dsn) {
168       if (!$connect_coderef) {
169         $dsn = $connect_info->[0];
170         $dsn = $dsn->{dsn} if (reftype($dsn)||'') eq 'HASH';
171       }
172       else {
173         # all attempts to get the DSN failed
174         $key = "UNKNOWN_" . $self->next_unknown_replicant_id;
175         $self->inc_unknown_replicant_id;
176       }
177     }
178     if ($dsn) {
179       $replicant->dsn($dsn);
180       ($key) = ($dsn =~ m/^dbi\:.+\:(.+)$/i);
181     }
182
183     if($key) {
184       $replicant->id($key);
185     } else {
186       $replicant->debugobj->print("Could not create an ID for the replicant!");
187     }
188
189     ## Add the new replicant to the list
190     $self->replicants({
191         $key => $replicant,
192         %{$self->replicants},
193     });  
194
195     push @newly_created, $replicant;
196   }
197
198   return @newly_created;
199 }
200
201 =head2 connect_replicant ($schema, $connect_info)
202
203 Given a schema object and a hashref of $connect_info, connect the replicant
204 and return it.
205
206 =cut
207
208 sub connect_replicant {
209   my ($self, $schema, $connect_info) = @_;
210   my $replicant = $self->create_replicant($schema);
211   $replicant->connect_info($connect_info);
212
213 ## It is undesirable for catalyst to connect at ->connect_replicants time, as
214 ## connections should only happen on the first request that uses the database.
215 ## So we try to set the driver without connecting, however this doesn't always
216 ## work, as a driver may need to connect to determine the DB version, and this
217 ## may fail.
218 ##
219 ## Why this is necessary at all, is that we need to have the final storage
220 ## class to apply the Replicant role.
221
222   $self->_safely($replicant, '->_determine_driver', sub {
223     $replicant->_determine_driver
224   });
225
226   Role::Tiny->apply_roles_to_object($replicant, 'DBIx::Class::Storage::DBI::Replicated::Replicant');
227
228   # link back to master
229   $replicant->master($self->master);
230
231   return $replicant;
232 }
233
234 =head2 _safely_ensure_connected ($replicant)
235
236 The standard ensure_connected method with throw an exception should it fail to
237 connect.  For the master database this is desirable, but since replicants are
238 allowed to fail, this behavior is not desirable.  This method wraps the call
239 to ensure_connected in an eval in order to catch any generated errors.  That
240 way a slave can go completely offline (e.g. the box itself can die) without
241 bringing down your entire pool of databases.
242
243 =cut
244
245 sub _safely_ensure_connected {
246   my ($self, $replicant, @args) = @_;
247
248   return $self->_safely($replicant, '->ensure_connected', sub {
249     $replicant->ensure_connected(@args)
250   });
251 }
252
253 =head2 _safely ($replicant, $name, $code)
254
255 Execute C<$code> for operation C<$name> catching any exceptions and printing an
256 error message to the C<<$replicant->debugobj>>.
257
258 Returns 1 on success and undef on failure.
259
260 =cut
261
262 sub _safely {
263   my ($self, $replicant, $name, $code) = @_;
264
265   return try {
266     $code->();
267     1;
268   } catch {
269     $replicant->debugobj->print(sprintf(
270       "Exception trying to $name for replicant %s, error is %s",
271       $replicant->_dbi_connect_info->[0], $_)
272     );
273     undef;
274   };
275 }
276
277 =head2 connected_replicants
278
279 Returns true if there are connected replicants.  Actually is overloaded to
280 return the number of replicants.  So you can do stuff like:
281
282   if( my $num_connected = $storage->has_connected_replicants ) {
283     print "I have $num_connected connected replicants";
284   } else {
285     print "Sorry, no replicants.";
286   }
287
288 This method will actually test that each replicant in the L</replicants> hashref
289 is actually connected, try not to hit this 10 times a second.
290
291 =cut
292
293 sub connected_replicants {
294   my $self = shift @_;
295   return List::Util::sum( map {
296     $_->connected ? 1:0
297   } $self->all_replicants );
298 }
299
300 =head2 active_replicants
301
302 This is an array of replicants that are considered to be active in the pool.
303 This does not check to see if they are connected, but if they are not, DBIC
304 should automatically reconnect them for us when we hit them with a query.
305
306 =cut
307
308 sub active_replicants {
309   my $self = shift @_;
310   return ( grep {$_} map {
311     $_->active ? $_:0
312   } $self->all_replicants );
313 }
314
315 =head2 all_replicants
316
317 Just a simple array of all the replicant storages.  No particular order to the
318 array is given, nor should any meaning be derived.
319
320 =cut
321
322 sub all_replicants {
323   my $self = shift @_;
324   return values %{$self->replicants};
325 }
326
327 =head2 validate_replicants
328
329 This does a check to see if 1) each replicate is connected (or reconnectable),
330 2) that is ->is_replicating, and 3) that it is not exceeding the lag amount
331 defined by L</maximum_lag>.  Replicants that fail any of these tests are set to
332 inactive, and thus removed from the replication pool.
333
334 This tests L<all_replicants>, since a replicant that has been previous marked
335 as inactive can be reactivated should it start to pass the validation tests again.
336
337 See L<DBIx::Class::Storage::DBI> for more about checking if a replicating
338 connection is not following a master or is lagging.
339
340 Calling this method will generate queries on the replicant databases so it is
341 not recommended that you run them very often.
342
343 This method requires that your underlying storage engine supports some sort of
344 native replication mechanism.  Currently only MySQL native replication is
345 supported.  Your patches to make other replication types work are welcomed.
346
347 =cut
348
349 sub validate_replicants {
350   my $self = shift @_;
351   foreach my $replicant($self->all_replicants) {
352     if($self->_safely_ensure_connected($replicant)) {
353       my $is_replicating = $replicant->is_replicating;
354       unless(defined $is_replicating) {
355         $replicant->debugobj->print("Storage Driver ".ref($self)." Does not support the 'is_replicating' method.  Assuming you are manually managing.\n");
356         next;
357       } else {
358         if($is_replicating) {
359           my $lag_behind_master = $replicant->lag_behind_master;
360           unless(defined $lag_behind_master) {
361             $replicant->debugobj->print("Storage Driver ".ref($self)." Does not support the 'lag_behind_master' method.  Assuming you are manually managing.\n");
362             next;
363           } else {
364             if($lag_behind_master <= $self->maximum_lag) {
365               $replicant->active(1);
366             } else {
367               $replicant->active(0);  
368             }
369           }    
370         } else {
371           $replicant->active(0);
372         }
373       }
374     } else {
375       $replicant->active(0);
376     }
377   }
378   ## Mark that we completed this validation.  
379   $self->last_validated(time);  
380 }
381
382 =head1 AUTHOR
383
384 John Napiorkowski <jjnapiork@cpan.org>
385
386 =head1 LICENSE
387
388 You may distribute this code under the same terms as Perl itself.
389
390 =cut
391
392 1;