renamed replication storage to replicated (since it's not really doing any replicatio...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replicated.pm
1 package DBIx::Class::Storage::DBI::Replication;
2
3 use strict;
4 use warnings;
5
6 use DBIx::Class::Storage::DBI;
7 use DBD::Multi;
8 use base qw/Class::Accessor::Fast/;
9
10 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
11
12 =head1 NAME
13
14 DBIx::Class::Storage::DBI::Replication - EXPERIMENTAL Replicated database support
15
16 =head1 SYNOPSIS
17
18   # change storage_type in your schema class
19     $schema->storage_type( '::DBI::Replication' );
20     $schema->connect_info( [
21                      [ "dbi:mysql:database=test;hostname=master", "username", "password", { AutoCommit => 1 } ], # master
22                      [ "dbi:mysql:database=test;hostname=slave1", "username", "password", { priority => 10 } ],  # slave1
23                      [ "dbi:mysql:database=test;hostname=slave2", "username", "password", { priority => 10 } ],  # slave2
24                      [ $dbh, '','', {priority=>10}], # add in a preexisting database handle
25                      [ sub {  DBI->connect }, '', '', {priority=>10}], # DBD::Multi will call this coderef for connects
26                      <...>,
27                      { limit_dialect => 'LimitXY' } # If needed, see below
28                     ] );
29
30 =head1 DESCRIPTION
31
32 Warning: This class is marked EXPERIMENTAL. It works for the authors but does
33 not currently have automated tests so your mileage may vary.
34
35 This class implements replicated data store for DBI. Currently you can define
36 one master and numerous slave database connections. All write-type queries
37 (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
38 database, all read-type queries (SELECTs) go to the slave database.
39
40 For every slave database you can define a priority value, which controls data
41 source usage pattern. It uses L<DBD::Multi>, so first the lower priority data
42 sources used (if they have the same priority, the are used randomized), than
43 if all low priority data sources fail, higher ones tried in order.
44
45 =head1 CONFIGURATION
46
47 =head2 Limit dialect
48
49 If you use LIMIT in your queries (effectively, if you use
50 SQL::Abstract::Limit), do not forget to set up limit_dialect (perldoc
51 SQL::Abstract::Limit) by passing it as an option in the (optional) hash
52 reference to connect_info.  DBIC can not set it up automatically, since it can
53 not guess DBD::Multi connection types.
54
55 =cut
56
57 sub new {
58     my $proto = shift;
59     my $class = ref( $proto ) || $proto;
60     my $self = {};
61
62     bless( $self, $class );
63
64     $self->write_source( DBIx::Class::Storage::DBI->new );
65     $self->read_source( DBIx::Class::Storage::DBI->new );
66
67     return $self;
68 }
69
70 sub all_sources {
71     my $self = shift;
72
73     my @sources = ($self->read_source, $self->write_source);
74
75     return wantarray ? @sources : \@sources;
76 }
77
78 sub connect_info {
79     my( $self, $source_info ) = @_;
80
81     my( $info, $global_options, $options, @dsns );
82
83     $info = [ @$source_info ];
84
85     $global_options = ref $info->[-1] eq 'HASH' ? pop( @$info ) : {};
86     if( ref( $options = $info->[0]->[-1] ) eq 'HASH' ) {
87         # Local options present in dsn, merge them with global options
88         map { $global_options->{$_} = $options->{$_} } keys %$options;
89         pop @{$info->[0]};
90     }
91
92     # We need to copy-pass $global_options, since connect_info clears it while
93     # processing options
94     $self->write_source->connect_info( @{$info->[0]}, { %$global_options } );
95
96         ## allow either a DSN string or an already connect $dbh.  Just remember if
97         ## you use the $dbh option then DBD::Multi has no idea how to reconnect in
98         ## the event of a failure.
99         
100     @dsns = map {
101         ## if the first element in the arrayhash is a ref, make that the value
102         my $db = ref $_->[0] ? $_->[0] : $_;
103         ($_->[3]->{priority} || 10) => $db;
104     } @{$info->[0]}[1..@{$info->[0]}-1];
105     
106     $global_options->{dsns} = \@dsns;
107
108     $self->read_source->connect_info( [ 'dbi:Multi:', undef, undef, { %$global_options } ] );
109 }
110
111 sub select {
112     shift->read_source->select( @_ );
113 }
114 sub select_single {
115     shift->read_source->select_single( @_ );
116 }
117 sub throw_exception {
118     shift->read_source->throw_exception( @_ );
119 }
120 sub sql_maker {
121     shift->read_source->sql_maker( @_ );
122 }
123 sub columns_info_for {
124     shift->read_source->columns_info_for( @_ );
125 }
126 sub sqlt_type {
127     shift->read_source->sqlt_type( @_ );
128 }
129 sub create_ddl_dir {
130     shift->read_source->create_ddl_dir( @_ );
131 }
132 sub deployment_statements {
133     shift->read_source->deployment_statements( @_ );
134 }
135 sub datetime_parser {
136     shift->read_source->datetime_parser( @_ );
137 }
138 sub datetime_parser_type {
139     shift->read_source->datetime_parser_type( @_ );
140 }
141 sub build_datetime_parser {
142     shift->read_source->build_datetime_parser( @_ );
143 }
144
145 sub limit_dialect { $_->limit_dialect( @_ ) for( shift->all_sources ) }
146 sub quote_char { $_->quote_char( @_ ) for( shift->all_sources ) }
147 sub name_sep { $_->quote_char( @_ ) for( shift->all_sources ) }
148 sub disconnect { $_->disconnect( @_ ) for( shift->all_sources ) }
149 sub set_schema { $_->set_schema( @_ ) for( shift->all_sources ) }
150
151 sub DESTROY {
152     my $self = shift;
153
154     undef $self->{write_source};
155     undef $self->{read_sources};
156 }
157
158 sub last_insert_id {
159     shift->write_source->last_insert_id( @_ );
160 }
161 sub insert {
162     shift->write_source->insert( @_ );
163 }
164 sub update {
165     shift->write_source->update( @_ );
166 }
167 sub update_all {
168     shift->write_source->update_all( @_ );
169 }
170 sub delete {
171     shift->write_source->delete( @_ );
172 }
173 sub delete_all {
174     shift->write_source->delete_all( @_ );
175 }
176 sub create {
177     shift->write_source->create( @_ );
178 }
179 sub find_or_create {
180     shift->write_source->find_or_create( @_ );
181 }
182 sub update_or_create {
183     shift->write_source->update_or_create( @_ );
184 }
185 sub connected {
186     shift->write_source->connected( @_ );
187 }
188 sub ensure_connected {
189     shift->write_source->ensure_connected( @_ );
190 }
191 sub dbh {
192     shift->write_source->dbh( @_ );
193 }
194 sub txn_begin {
195     shift->write_source->txn_begin( @_ );
196 }
197 sub txn_commit {
198     shift->write_source->txn_commit( @_ );
199 }
200 sub txn_rollback {
201     shift->write_source->txn_rollback( @_ );
202 }
203 sub sth {
204     shift->write_source->sth( @_ );
205 }
206 sub deploy {
207     shift->write_source->deploy( @_ );
208 }
209
210
211 sub debugfh { shift->_not_supported( 'debugfh' ) };
212 sub debugcb { shift->_not_supported( 'debugcb' ) };
213
214 sub _not_supported {
215     my( $self, $method ) = @_;
216
217     die "This Storage does not support $method method.";
218 }
219
220 =head1 SEE ALSO
221
222 L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
223
224 =head1 AUTHOR
225
226 Norbert Csongrádi <bert@cpan.org>
227
228 Peter Siklósi <einon@einon.hu>
229
230 =head1 LICENSE
231
232 You may distribute this code under the same terms as Perl itself.
233
234 =cut
235
236 1;