Initial revision (but working :).
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Replication.pm
1 package DBIx::Class::Storage::DBI::Replication;
2
3 use strict;
4 use warnings;
5
6 use DBIx::Class::Storage::DBI;
7 use DBD::Multi;
8 use base qw/Class::Accessor::Fast/;
9
10 __PACKAGE__->mk_accessors( qw/read_source write_source/ );
11
12 =head1 NAME
13
14 DBIx::Class::Storage::DBI::Replication - Replicated database support
15
16 =head1 SYNOPSIS
17
18   # change storage_type in your schema class
19     $schema->storage_type( '::DBI::Replication' );
20     $schema->connect_info( [
21                      [ "dbi:mysql:database=test;hostname=master", "username", "password", { AutoCommit => 1 } ], # master
22                      [ "dbi:mysql:database=test;hostname=slave1", "username", "password", { priority => 10 } ],  # slave1
23                      [ "dbi:mysql:database=test;hostname=slave2", "username", "password", { priority => 10 } ],  # slave2
24                      <...>
25                     ] );
26
27 =head1 DESCRIPTION
28
29 This class implements replicated data store for DBI. Currently you can define one master and numerous slave database
30 connections. All write-type queries (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master database,
31 all read-type queries (SELECTs) go to the slave database.
32
33 For every slave database you can define a priority value, which controls data source usage pattern. It uses
34 L<DBD::Multi>, so first the lower priority data sources used (if they have the same priority, the are used
35 randomized), than if all low priority data sources fail, higher ones tried in order.
36
37 =cut
38
39 sub new {
40     my $proto = shift;
41     my $class = ref( $proto ) || $proto;
42     my $self = {
43         read_sources => [],
44         read_iterator => 0,
45     };
46
47     bless( $self, $class );
48
49     $self->write_source( DBIx::Class::Storage::DBI->new );
50     $self->read_source( DBIx::Class::Storage::DBI->new );
51
52     return $self;
53 }
54
55 sub all_sources {
56     my $self = shift;
57
58     my @sources = ($self->{read_source}, $self->write_source);
59
60     return wantarray ? @sources : \@sources;
61 }
62
63 sub connect_info {
64     my( $self, $source_info ) = @_;
65
66     $self->write_source->connect_info( $source_info->[0] );
67
68     my @dsns = map { ($_->[3]->{priority} || 10) => $_ } @{$source_info}[1..@$source_info-1];
69     $self->read_source->connect_info( [ 'dbi:Multi:', undef, undef, { dsns => \@dsns } ] );
70 }
71
72 sub select {
73     return shift->read_source()->select( @_ );
74 }
75 sub select_single {
76     return shift->read_source()->select_single( @_ );
77 }
78 sub throw_exception {
79     return shift->read_source()->throw_exception( @_ );
80 }
81 sub sql_maker {
82     return shift->read_source()->sql_maker( @_ );
83 }
84 sub columns_info_for {
85     return shift->read_source()->columns_info_for( @_ );
86 }
87 sub sqlt_type {
88     return shift->read_source()->sqlt_type( @_ );
89 }
90 sub create_ddl_dir {
91     return shift->read_source()->create_ddl_dir( @_ );
92 }
93 sub deployment_statements {
94     return shift->read_source()->deployment_statements( @_ );
95 }
96 sub datetime_parser {
97     return shift->read_source()->datetime_parser( @_ );
98 }
99 sub datetime_parser_type {
100     return shift->read_source()->datetime_parser_type( @_ );
101 }
102 sub build_datetime_parser {
103     return shift->read_source()->build_datetime_parser( @_ );
104 }
105
106 sub limit_dialect {
107     my $self = shift;
108     $self->$_->limit_dialect( @_ ) for( $self->all_sources() );
109 }
110 sub quote_char {
111     my $self = shift;
112     $self->$_->quote_char( @_ ) for( $self->all_sources() );
113 }
114 sub name_sep {
115     my $self = shift;
116     $self->$_->quote_char( @_ ) for( $self->all_sources() );
117 }
118 sub disconnect {
119     my $self = shift;
120     $self->$_->disconnect( @_ ) for( $self->all_sources() );
121 }
122 sub DESTROY {
123     my $self = shift;
124
125     $self->{write_source} = undef;
126     $self->{read_sources} = undef;
127 }
128
129 sub last_insert_id {
130     return shift->write_source()->last_insert_id( @_ );
131 }
132 sub insert {
133     return shift->write_source()->insert( @_ );
134 }
135 sub update {
136     return shift->write_source()->update( @_ );
137 }
138 sub update_all {
139     return shift->write_source()->update_all( @_ );
140 }
141 sub delete {
142     return shift->write_source()->delete( @_ );
143 }
144 sub delete_all {
145     return shift->write_source()->delete_all( @_ );
146 }
147 sub create {
148     return shift->write_source()->create( @_ );
149 }
150 sub find_or_create {
151     return shift->write_source()->find_or_create( @_ );
152 }
153 sub update_or_create {
154     return shift->write_source()->update_or_create( @_ );
155 }
156 sub connected {
157     return shift->write_source()->connected( @_ );
158 }
159 sub ensure_connected {
160     return shift->write_source()->ensure_connected( @_ );
161 }
162 sub dbh {
163     return shift->write_source()->dbh( @_ );
164 }
165 sub txn_begin {
166     return shift->write_source()->txn_begin( @_ );
167 }
168 sub txn_commit {
169     return shift->write_source()->txn_commit( @_ );
170 }
171 sub txn_rollback {
172     return shift->write_source()->txn_rollback( @_ );
173 }
174 sub sth {
175     return shift->write_source()->sth( @_ );
176 }
177 sub deploy {
178     return shift->write_source()->deploy( @_ );
179 }
180
181
182 sub debugfh { shift->_not_supported( 'debugfh' ) };
183 sub debugcb { shift->_not_supported( 'debugcb' ) };
184
185 sub _not_supported {
186     my( $self, $method ) = @_;
187
188     die "This Storage does not support $method method.";
189 }
190
191 =head1 SEE ALSO
192
193 L<DBI::Class::Storage::DBI>, L<DBD::Multi>, L<DBI>
194
195 =head1 AUTHOR
196
197 Norbert Csongrádi <bert@cpan.org>
198
199 Peter Siklósi <einon@ahq.hu>
200
201 =head1 LICENSE
202
203 You may distribute this code under the same terms as Perl itself.
204
205 =cut
206
207 1;