fixed up the replication test, added some tests for the dbd::multi problem of null...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replicated;
2
3 use strict;
4 use warnings;
5
6 use DBIx::Class::Storage::DBI;
7 use DBD::Multi;
8
9 use base qw/Class::Accessor::Fast/;
10
11 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
12
13 =head1 NAME
14
15 DBIx::Class::Storage::DBI::Replicated - ALPHA Replicated database support
16
17 =head1 SYNOPSIS
18
19 The Following example shows how to change an existing $schema to a replicated
20 storage type and update it's connection information to contain a master DSN and
21 an array of slaves.
22
23     ## Change storage_type in your schema class
24     $schema->storage_type( '::DBI::Replicated' );
25     
26     ## Set your connection.
27     $schema->connect(
28         $dsn, $user, $password, {
29                 AutoCommit => 1,
30                 ## Other standard DBI connection or DBD custom attributes added as
31                 ## usual.  Additionally, we have two custom attributes for defining
32                 ## slave information and controlling how the underlying DBD::Multi
33                 slaves_connect_info => [
34                    ## Define each slave like a 'normal' DBI connection, but you add
35                    ## in a DBD::Multi custom attribute to define how the slave is
36                    ## prioritized.  Please see DBD::Multi for more.
37                    [$slave1dsn, $user, $password, {%slave1opts, priority=>10}],
38                [$slave2dsn, $user, $password, {%slave2opts, priority=>10}],
39                [$slave3dsn, $user, $password, {%slave3opts, priority=>20}],
40                ## add in a preexisting database handle
41                [$dbh, '','', {priority=>30}], 
42                ## DBD::Multi will call this coderef for connects 
43                [sub {  DBI->connect(< DSN info >) }, '', '', {priority=>40}],  
44                ## If the last item is hashref, we use that for DBD::Multi's 
45                ## configuration information.  Again, see DBD::Multi for more.
46                {timeout=>25, failed_max=>2},               
47                 ],
48         },
49     );
50     
51     ## Now, just use the schema as normal
52     $schema->resultset('Table')->find(< unique >); ## Reads will use slaves
53     $schema->resultset('Table')->create(\%info); ## Writes will use master
54
55 =head1 DESCRIPTION
56
57 Warning: This class is marked ALPHA.  We are using this in development and have
58 some basic test coverage but the code hasn't yet been stressed by a variety
59 of databases.  Individual DB's may have quirks we are not aware of.  Please
60 use this in development and pass along your experiences/bug fixes.
61
62 This class implements replicated data store for DBI. Currently you can define
63 one master and numerous slave database connections. All write-type queries
64 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
65 database, all read-type queries (SELECTs) go to the slave database.
66
67 For every slave database you can define a priority value, which controls data
68 source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
69 sources used (if they have the same priority, the are used randomized), than
70 if all low priority data sources fail, higher ones tried in order.
71
72 =head1 CONFIGURATION
73
74 Please see L<DBD::Multi> for most configuration information.
75
76 =cut
77
78 sub new {
79     my $proto = shift;
80     my $class = ref( $proto ) || $proto;
81     my $self = {};
82
83     bless( $self, $class );
84
85     $self->write_source( DBIx::Class::Storage::DBI->new );
86     $self->read_source( DBIx::Class::Storage::DBI->new );
87
88     return $self;
89 }
90
91 sub all_sources {
92     my $self = shift;
93
94     my @sources = ($self->read_source, $self->write_source);
95
96     return wantarray ? @sources : \@sources;
97 }
98
99 sub _connect_info {
100         my $self = shift;
101     my $master = $self->write_source->_connect_info;
102     $master->[-1]->{slave_connect_info} = $self->read_source->_connect_info;
103     return $master;
104 }
105
106 sub connect_info {
107         my ($self, $source_info) = @_;
108
109     ## if there is no $source_info, treat this sub like an accessor
110     return $self->_connect_info
111      if !$source_info;
112     
113     ## Alright, let's conect the master 
114     $self->write_source->connect_info($source_info);
115   
116     ## Now, build and then connect the Slaves
117     my @slaves_connect_info = @{$source_info->[-1]->{slaves_connect_info}};   
118     my $dbd_multi_config = ref $slaves_connect_info[-1] eq 'HASH' 
119         ? pop @slaves_connect_info : {};
120
121     ## We need to do this since SQL::Abstract::Limit can't guess what DBD::Multi is
122     $dbd_multi_config->{limit_dialect} = $self->write_source->sql_maker->limit_dialect
123         unless defined $dbd_multi_config->{limit_dialect};
124
125     @slaves_connect_info = map {
126         ## if the first element in the arrayhash is a ref, make that the value
127         my $db = ref $_->[0] ? $_->[0] : $_;
128         my $priority = $_->[-1]->{priority} || 10; ## default priority is 10
129         $priority => $db;
130     } @slaves_connect_info;
131     
132     $self->read_source->connect_info([ 
133         'dbi:Multi:', undef, undef, { 
134                 dsns => [@slaves_connect_info],
135                 %$dbd_multi_config,
136         },
137     ]);
138     
139     ## Return the formated connection information
140     return $self->_connect_info;
141 }
142
143 sub select {
144     shift->read_source->select( @_ );
145 }
146 sub select_single {
147     shift->read_source->select_single( @_ );
148 }
149 sub throw_exception {
150     shift->read_source->throw_exception( @_ );
151 }
152 sub sql_maker {
153     shift->read_source->sql_maker( @_ );
154 }
155 sub columns_info_for {
156     shift->read_source->columns_info_for( @_ );
157 }
158 sub sqlt_type {
159     shift->read_source->sqlt_type( @_ );
160 }
161 sub create_ddl_dir {
162     shift->read_source->create_ddl_dir( @_ );
163 }
164 sub deployment_statements {
165     shift->read_source->deployment_statements( @_ );
166 }
167 sub datetime_parser {
168     shift->read_source->datetime_parser( @_ );
169 }
170 sub datetime_parser_type {
171     shift->read_source->datetime_parser_type( @_ );
172 }
173 sub build_datetime_parser {
174     shift->read_source->build_datetime_parser( @_ );
175 }
176
177 sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
178 sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
179 sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
180 sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
181 sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
182
183 sub DESTROY {
184     my $self = shift;
185
186     undef $self->{write_source};
187     undef $self->{read_sources};
188 }
189
190 sub last_insert_id {
191     shift->write_source->last_insert_id( @_ );
192 }
193 sub insert {
194     shift->write_source->insert( @_ );
195 }
196 sub update {
197     shift->write_source->update( @_ );
198 }
199 sub update_all {
200     shift->write_source->update_all( @_ );
201 }
202 sub delete {
203     shift->write_source->delete( @_ );
204 }
205 sub delete_all {
206     shift->write_source->delete_all( @_ );
207 }
208 sub create {
209     shift->write_source->create( @_ );
210 }
211 sub find_or_create {
212     shift->write_source->find_or_create( @_ );
213 }
214 sub update_or_create {
215     shift->write_source->update_or_create( @_ );
216 }
217 sub connected {
218     shift->write_source->connected( @_ );
219 }
220 sub ensure_connected {
221     shift->write_source->ensure_connected( @_ );
222 }
223 sub dbh {
224     shift->write_source->dbh( @_ );
225 }
226 sub txn_do {
227     shift->write_source->txn_do( @_ );
228 }
229 sub txn_commit {
230     shift->write_source->txn_commit( @_ );
231 }
232 sub txn_rollback {
233     shift->write_source->txn_rollback( @_ );
234 }
235 sub sth {
236     shift->write_source->sth( @_ );
237 }
238 sub deploy {
239     shift->write_source->deploy( @_ );
240 }
241 sub _prep_for_execute {
242         shift->write_source->_prep_for_execute(@_);
243 }
244
245 sub debugobj {
246         shift->write_source->debugobj(@_);
247 }
248 sub debug {
249     shift->write_source->debug(@_);
250 }
251
252 sub debugfh { shift->_not_supported( 'debugfh' ) };
253 sub debugcb { shift->_not_supported( 'debugcb' ) };
254
255 sub _not_supported {
256     my( $self, $method ) = @_;
257
258     die "This Storage does not support $method method.";
259 }
260
261 =head1 SEE ALSO
262
263 L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
264
265 =head1 AUTHOR
266
267 Norbert Csongrádi <bert@cpan.org>
268
269 Peter Siklósi <einon@einon.hu>
270
271 John Napiorkowski <john.napiorkowski@takkle.com>
272
273 =head1 LICENSE
274
275 You may distribute this code under the same terms as Perl itself.
276
277 =cut
278
279 1;