a6006dc10a496c621b78b5dc681227df35f9aa80
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use Scalar::Util();
13 use List::Util();
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit _on_connect_do
18        _on_disconnect_do _on_connect_do_store _on_disconnect_do_store
19        savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call disable_sth_caching unsafe auto_savepoint
26 /;
27 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
28
29
30 # default cursor class, overridable in connect_info attributes
31 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
32
33 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
34 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
35
36
37 =head1 NAME
38
39 DBIx::Class::Storage::DBI - DBI storage handler
40
41 =head1 SYNOPSIS
42
43   my $schema = MySchema->connect('dbi:SQLite:my.db');
44
45   $schema->storage->debug(1);
46   $schema->dbh_do("DROP TABLE authors");
47
48   $schema->resultset('Book')->search({
49      written_on => $schema->storage->datetime_parser(DateTime->now)
50   });
51
52 =head1 DESCRIPTION
53
54 This class represents the connection to an RDBMS via L<DBI>.  See
55 L<DBIx::Class::Storage> for general information.  This pod only
56 documents DBI-specific methods and behaviors.
57
58 =head1 METHODS
59
60 =cut
61
62 sub new {
63   my $new = shift->next::method(@_);
64
65   $new->transaction_depth(0);
66   $new->_sql_maker_opts({});
67   $new->{savepoints} = [];
68   $new->{_in_dbh_do} = 0;
69   $new->{_dbh_gen} = 0;
70
71   $new;
72 }
73
74 =head2 connect_info
75
76 This method is normally called by L<DBIx::Class::Schema/connection>, which
77 encapsulates its argument list in an arrayref before passing them here.
78
79 The argument list may contain:
80
81 =over
82
83 =item *
84
85 The same 4-element argument set one would normally pass to
86 L<DBI/connect>, optionally followed by
87 L<extra attributes|/DBIx::Class specific connection attributes>
88 recognized by DBIx::Class:
89
90   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
91
92 =item *
93
94 A single code reference which returns a connected 
95 L<DBI database handle|DBI/connect> optionally followed by 
96 L<extra attributes|/DBIx::Class specific connection attributes> recognized
97 by DBIx::Class:
98
99   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
100
101 =item *
102
103 A single hashref with all the attributes and the dsn/user/password
104 mixed together:
105
106   $connect_info_args = [{
107     dsn => $dsn,
108     user => $user,
109     password => $pass,
110     %dbi_attributes,
111     %extra_attributes,
112   }];
113
114 This is particularly useful for L<Catalyst> based applications, allowing the 
115 following config (L<Config::General> style):
116
117   <Model::DB>
118     schema_class   App::DB
119     <connect_info>
120       dsn          dbi:mysql:database=test
121       user         testuser
122       password     TestPass
123       AutoCommit   1
124     </connect_info>
125   </Model::DB>
126
127 =back
128
129 Please note that the L<DBI> docs recommend that you always explicitly
130 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
131 recommends that it be set to I<1>, and that you perform transactions
132 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
133 to I<1> if you do not do explicitly set it to zero.  This is the default 
134 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
135
136 =head3 DBIx::Class specific connection attributes
137
138 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
139 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
140 the following connection options. These options can be mixed in with your other
141 L<DBI> connection attributes, or placed in a seperate hashref
142 (C<\%extra_attributes>) as shown above.
143
144 Every time C<connect_info> is invoked, any previous settings for
145 these options will be cleared before setting the new ones, regardless of
146 whether any options are specified in the new C<connect_info>.
147
148
149 =over
150
151 =item on_connect_do
152
153 Specifies things to do immediately after connecting or re-connecting to
154 the database.  Its value may contain:
155
156 =over
157
158 =item a scalar
159
160 This contains one SQL statement to execute.
161
162 =item an array reference
163
164 This contains SQL statements to execute in order.  Each element contains
165 a string or a code reference that returns a string.
166
167 =item a code reference
168
169 This contains some code to execute.  Unlike code references within an
170 array reference, its return value is ignored.
171
172 =back
173
174 =item on_disconnect_do
175
176 Takes arguments in the same form as L</on_connect_do> and executes them
177 immediately before disconnecting from the database.
178
179 Note, this only runs if you explicitly call L</disconnect> on the
180 storage object.
181
182 =item on_connect_call
183
184 A more generalized form of L</on_connect_do> that calls the specified
185 C<connect_call_METHOD> methods in your storage driver.
186
187   on_connect_do => 'select 1'
188
189 is equivalent to:
190
191   on_connect_call => [ [ do_sql => 'select 1' ] ]
192
193 Its values may contain:
194
195 =over
196
197 =item a scalar
198
199 Will call the C<connect_call_METHOD> method.
200
201 =item a code reference
202
203 Will execute C<< $code->($storage) >>
204
205 =item an array reference
206
207 Each value can be a method name or code reference.
208
209 =item an array of arrays
210
211 For each array, the first item is taken to be the C<connect_call_> method name
212 or code reference, and the rest are parameters to it.
213
214 =back
215
216 Some predefined storage methods you may use:
217
218 =over
219
220 =item do_sql
221
222 Executes a SQL string or a code reference that returns a SQL string. This is
223 what L</on_connect_do> and L</on_disconnect_do> use.
224
225 It can take:
226
227 =over
228
229 =item a scalar
230
231 Will execute the scalar as SQL.
232
233 =item an arrayref
234
235 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
236 attributes hashref and bind values.
237
238 =item a code reference
239
240 Will execute C<< $code->($storage) >> and execute the return array refs as
241 above.
242
243 =back
244
245 =item datetime_setup
246
247 Execute any statements necessary to initialize the database session to return
248 and accept datetime/timestamp values used with
249 L<DBIx::Class::InflateColumn::DateTime>.
250
251 Only necessary for some databases, see your specific storage driver for
252 implementation details.
253
254 =back
255
256 =item on_disconnect_call
257
258 Takes arguments in the same form as L</on_connect_call> and executes them
259 immediately before disconnecting from the database.
260
261 Calls the C<disconnect_call_METHOD> methods as opposed to the
262 C<connect_call_METHOD> methods called by L</on_connect_call>.
263
264 Note, this only runs if you explicitly call L</disconnect> on the
265 storage object.
266
267 =item disable_sth_caching
268
269 If set to a true value, this option will disable the caching of
270 statement handles via L<DBI/prepare_cached>.
271
272 =item limit_dialect 
273
274 Sets the limit dialect. This is useful for JDBC-bridge among others
275 where the remote SQL-dialect cannot be determined by the name of the
276 driver alone. See also L<SQL::Abstract::Limit>.
277
278 =item quote_char
279
280 Specifies what characters to use to quote table and column names. If 
281 you use this you will want to specify L</name_sep> as well.
282
283 C<quote_char> expects either a single character, in which case is it
284 is placed on either side of the table/column name, or an arrayref of length
285 2 in which case the table/column name is placed between the elements.
286
287 For example under MySQL you should use C<< quote_char => '`' >>, and for
288 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
289
290 =item name_sep
291
292 This only needs to be used in conjunction with C<quote_char>, and is used to 
293 specify the charecter that seperates elements (schemas, tables, columns) from 
294 each other. In most cases this is simply a C<.>.
295
296 The consequences of not supplying this value is that L<SQL::Abstract>
297 will assume DBIx::Class' uses of aliases to be complete column
298 names. The output will look like I<"me.name"> when it should actually
299 be I<"me"."name">.
300
301 =item unsafe
302
303 This Storage driver normally installs its own C<HandleError>, sets
304 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
305 all database handles, including those supplied by a coderef.  It does this
306 so that it can have consistent and useful error behavior.
307
308 If you set this option to a true value, Storage will not do its usual
309 modifications to the database handle's attributes, and instead relies on
310 the settings in your connect_info DBI options (or the values you set in
311 your connection coderef, in the case that you are connecting via coderef).
312
313 Note that your custom settings can cause Storage to malfunction,
314 especially if you set a C<HandleError> handler that suppresses exceptions
315 and/or disable C<RaiseError>.
316
317 =item auto_savepoint
318
319 If this option is true, L<DBIx::Class> will use savepoints when nesting
320 transactions, making it possible to recover from failure in the inner
321 transaction without having to abort all outer transactions.
322
323 =item cursor_class
324
325 Use this argument to supply a cursor class other than the default
326 L<DBIx::Class::Storage::DBI::Cursor>.
327
328 =back
329
330 Some real-life examples of arguments to L</connect_info> and
331 L<DBIx::Class::Schema/connect>
332
333   # Simple SQLite connection
334   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
335
336   # Connect via subref
337   ->connect_info([ sub { DBI->connect(...) } ]);
338
339   # A bit more complicated
340   ->connect_info(
341     [
342       'dbi:Pg:dbname=foo',
343       'postgres',
344       'my_pg_password',
345       { AutoCommit => 1 },
346       { quote_char => q{"}, name_sep => q{.} },
347     ]
348   );
349
350   # Equivalent to the previous example
351   ->connect_info(
352     [
353       'dbi:Pg:dbname=foo',
354       'postgres',
355       'my_pg_password',
356       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
357     ]
358   );
359
360   # Same, but with hashref as argument
361   # See parse_connect_info for explanation
362   ->connect_info(
363     [{
364       dsn         => 'dbi:Pg:dbname=foo',
365       user        => 'postgres',
366       password    => 'my_pg_password',
367       AutoCommit  => 1,
368       quote_char  => q{"},
369       name_sep    => q{.},
370     }]
371   );
372
373   # Subref + DBIx::Class-specific connection options
374   ->connect_info(
375     [
376       sub { DBI->connect(...) },
377       {
378           quote_char => q{`},
379           name_sep => q{@},
380           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
381           disable_sth_caching => 1,
382       },
383     ]
384   );
385
386
387
388 =cut
389
390 sub connect_info {
391   my ($self, $info_arg) = @_;
392
393   return $self->_connect_info if !$info_arg;
394
395   my @args = @$info_arg;  # take a shallow copy for further mutilation
396   $self->_connect_info([@args]); # copy for _connect_info
397
398
399   # combine/pre-parse arguments depending on invocation style
400
401   my %attrs;
402   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
403     %attrs = %{ $args[1] || {} };
404     @args = $args[0];
405   }
406   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
407     %attrs = %{$args[0]};
408     @args = ();
409     for (qw/password user dsn/) {
410       unshift @args, delete $attrs{$_};
411     }
412   }
413   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
414     %attrs = (
415       % { $args[3] || {} },
416       % { $args[4] || {} },
417     );
418     @args = @args[0,1,2];
419   }
420
421   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
422   #  the new set of options
423   $self->_sql_maker(undef);
424   $self->_sql_maker_opts({});
425
426   if(keys %attrs) {
427     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
428       if(my $value = delete $attrs{$storage_opt}) {
429         $self->$storage_opt($value);
430       }
431     }
432     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
433       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
434         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
435       }
436     }
437     for my $connect_do_opt (qw/on_connect_do on_disconnect_do/) {
438       if(my $opt_val = delete $attrs{$connect_do_opt}) {
439         $self->$connect_do_opt($opt_val);
440       }
441     }
442   }
443
444   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
445
446   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
447   $self->_connect_info;
448 }
449
450 =head2 on_connect_do
451
452 This method is deprecated in favour of setting via L</connect_info>.
453
454 =cut
455
456 sub on_connect_do {
457   my $self = shift;
458   $self->_setup_connect_do(on_connect_do => @_);
459 }
460
461 =head2 on_disconnect_do
462
463 This method is deprecated in favour of setting via L</connect_info>.
464
465 =cut
466
467 sub on_disconnect_do {
468   my $self = shift;
469   $self->_setup_connect_do(on_disconnect_do => @_);
470 }
471
472 sub _setup_connect_do {
473   my ($self, $opt) = (shift, shift);
474
475   my $accessor = "_$opt";
476   my $store    = "_${opt}_store";
477
478   return $self->$accessor if not @_;
479
480   my $val = shift;
481
482   if (not defined $val) {
483     $self->$accessor(undef);
484     $self->$store(undef);
485     return;
486   }
487
488   my @store;
489
490   if (not ref($val)) {
491     push @store, [ 'do_sql', $val ];
492   } elsif (ref($val) eq 'CODE') {
493     push @store, $val;
494   } elsif (ref($val) eq 'ARRAY') {
495     push @store, map [ 'do_sql', $_ ], @$val;
496   } else {
497     $self->throw_exception("Invalid type for $opt ".ref($val));
498   }
499
500   $self->$store(\@store);
501   $self->$accessor($val);
502 }
503
504 =head2 dbh_do
505
506 Arguments: ($subref | $method_name), @extra_coderef_args?
507
508 Execute the given $subref or $method_name using the new exception-based
509 connection management.
510
511 The first two arguments will be the storage object that C<dbh_do> was called
512 on and a database handle to use.  Any additional arguments will be passed
513 verbatim to the called subref as arguments 2 and onwards.
514
515 Using this (instead of $self->_dbh or $self->dbh) ensures correct
516 exception handling and reconnection (or failover in future subclasses).
517
518 Your subref should have no side-effects outside of the database, as
519 there is the potential for your subref to be partially double-executed
520 if the database connection was stale/dysfunctional.
521
522 Example:
523
524   my @stuff = $schema->storage->dbh_do(
525     sub {
526       my ($storage, $dbh, @cols) = @_;
527       my $cols = join(q{, }, @cols);
528       $dbh->selectrow_array("SELECT $cols FROM foo");
529     },
530     @column_list
531   );
532
533 =cut
534
535 sub dbh_do {
536   my $self = shift;
537   my $code = shift;
538
539   my $dbh = $self->_dbh;
540
541   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
542       || $self->{transaction_depth};
543
544   local $self->{_in_dbh_do} = 1;
545
546   my @result;
547   my $want_array = wantarray;
548
549   eval {
550     $self->_verify_pid if $dbh;
551     if(!$self->_dbh) {
552         $self->_populate_dbh;
553         $dbh = $self->_dbh;
554     }
555
556     if($want_array) {
557         @result = $self->$code($dbh, @_);
558     }
559     elsif(defined $want_array) {
560         $result[0] = $self->$code($dbh, @_);
561     }
562     else {
563         $self->$code($dbh, @_);
564     }
565   };
566
567   my $exception = $@;
568   if(!$exception) { return $want_array ? @result : $result[0] }
569
570   $self->throw_exception($exception) if $self->connected;
571
572   # We were not connected - reconnect and retry, but let any
573   #  exception fall right through this time
574   $self->_populate_dbh;
575   $self->$code($self->_dbh, @_);
576 }
577
578 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
579 # It also informs dbh_do to bypass itself while under the direction of txn_do,
580 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
581 sub txn_do {
582   my $self = shift;
583   my $coderef = shift;
584
585   ref $coderef eq 'CODE' or $self->throw_exception
586     ('$coderef must be a CODE reference');
587
588   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
589
590   local $self->{_in_dbh_do} = 1;
591
592   my @result;
593   my $want_array = wantarray;
594
595   my $tried = 0;
596   while(1) {
597     eval {
598       $self->_verify_pid if $self->_dbh;
599       $self->_populate_dbh if !$self->_dbh;
600
601       $self->txn_begin;
602       if($want_array) {
603           @result = $coderef->(@_);
604       }
605       elsif(defined $want_array) {
606           $result[0] = $coderef->(@_);
607       }
608       else {
609           $coderef->(@_);
610       }
611       $self->txn_commit;
612     };
613
614     my $exception = $@;
615     if(!$exception) { return $want_array ? @result : $result[0] }
616
617     if($tried++ > 0 || $self->connected) {
618       eval { $self->txn_rollback };
619       my $rollback_exception = $@;
620       if($rollback_exception) {
621         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
622         $self->throw_exception($exception)  # propagate nested rollback
623           if $rollback_exception =~ /$exception_class/;
624
625         $self->throw_exception(
626           "Transaction aborted: ${exception}. "
627           . "Rollback failed: ${rollback_exception}"
628         );
629       }
630       $self->throw_exception($exception)
631     }
632
633     # We were not connected, and was first try - reconnect and retry
634     # via the while loop
635     $self->_populate_dbh;
636   }
637 }
638
639 =head2 disconnect
640
641 Our C<disconnect> method also performs a rollback first if the
642 database is not in C<AutoCommit> mode.
643
644 =cut
645
646 sub disconnect {
647   my ($self) = @_;
648
649   if( $self->connected ) {
650     if (my $connection_call = $self->on_disconnect_call) {
651       $self->_do_connection_actions(disconnect_call_ => $connection_call)
652     }
653     if (my $connection_do   = $self->_on_disconnect_do_store) {
654       $self->_do_connection_actions(disconnect_call_ => $connection_do)
655     }
656
657     $self->_dbh->rollback unless $self->_dbh_autocommit;
658     $self->_dbh->disconnect;
659     $self->_dbh(undef);
660     $self->{_dbh_gen}++;
661   }
662 }
663
664 =head2 with_deferred_fk_checks
665
666 =over 4
667
668 =item Arguments: C<$coderef>
669
670 =item Return Value: The return value of $coderef
671
672 =back
673
674 Storage specific method to run the code ref with FK checks deferred or
675 in MySQL's case disabled entirely.
676
677 =cut
678
679 # Storage subclasses should override this
680 sub with_deferred_fk_checks {
681   my ($self, $sub) = @_;
682
683   $sub->();
684 }
685
686 sub connected {
687   my ($self) = @_;
688
689   if(my $dbh = $self->_dbh) {
690       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
691           $self->_dbh(undef);
692           $self->{_dbh_gen}++;
693           return;
694       }
695       else {
696           $self->_verify_pid;
697           return 0 if !$self->_dbh;
698       }
699       return ($dbh->FETCH('Active') && $dbh->ping);
700   }
701
702   return 0;
703 }
704
705 # handle pid changes correctly
706 #  NOTE: assumes $self->_dbh is a valid $dbh
707 sub _verify_pid {
708   my ($self) = @_;
709
710   return if defined $self->_conn_pid && $self->_conn_pid == $$;
711
712   $self->_dbh->{InactiveDestroy} = 1;
713   $self->_dbh(undef);
714   $self->{_dbh_gen}++;
715
716   return;
717 }
718
719 sub ensure_connected {
720   my ($self) = @_;
721
722   unless ($self->connected) {
723     $self->_populate_dbh;
724   }
725 }
726
727 =head2 dbh
728
729 Returns the dbh - a data base handle of class L<DBI>.
730
731 =cut
732
733 sub dbh {
734   my ($self) = @_;
735
736   $self->ensure_connected;
737   return $self->_dbh;
738 }
739
740 sub _sql_maker_args {
741     my ($self) = @_;
742     
743     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
744 }
745
746 sub sql_maker {
747   my ($self) = @_;
748   unless ($self->_sql_maker) {
749     my $sql_maker_class = $self->sql_maker_class;
750     $self->ensure_class_loaded ($sql_maker_class);
751     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
752   }
753   return $self->_sql_maker;
754 }
755
756 sub _rebless {}
757
758 sub _populate_dbh {
759   my ($self) = @_;
760   my @info = @{$self->_dbi_connect_info || []};
761   $self->_dbh($self->_connect(@info));
762
763   $self->_conn_pid($$);
764   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
765
766   $self->_determine_driver;
767
768   # Always set the transaction depth on connect, since
769   #  there is no transaction in progress by definition
770   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
771
772   if (my $connection_call = $self->on_connect_call) {
773     $self->_do_connection_actions(connect_call_ => $connection_call)
774   }
775   if (my $connection_do = $self->_on_connect_do_store) {
776     $self->_do_connection_actions(connect_call_ => $connection_do)
777   }
778 }
779
780 sub _determine_driver {
781   my ($self) = @_;
782
783   if (ref $self eq 'DBIx::Class::Storage::DBI') {
784     my $driver;
785
786     if ($self->_dbh) { # we are connected
787       $driver = $self->_dbh->{Driver}{Name};
788     } else {
789       # try to use dsn to not require being connected, the driver may still
790       # force a connection in _rebless to determine version
791       ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
792     }
793
794     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
795       bless $self, "DBIx::Class::Storage::DBI::${driver}";
796       $self->_rebless();
797     }
798   }
799 }
800
801 sub _do_connection_actions {
802   my $self          = shift;
803   my $method_prefix = shift;
804   my $call          = shift;
805
806   if (not ref($call)) {
807     my $method = $method_prefix . $call;
808     $self->$method(@_);
809   } elsif (ref($call) eq 'CODE') {
810     $self->$call(@_);
811   } elsif (ref($call) eq 'ARRAY') {
812     if (ref($call->[0]) ne 'ARRAY') {
813       $self->_do_connection_actions($method_prefix, $_) for @$call;
814     } else {
815       $self->_do_connection_actions($method_prefix, @$_) for @$call;
816     }
817   } else {
818     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
819   }
820
821   return $self;
822 }
823
824 sub connect_call_do_sql {
825   my $self = shift;
826   $self->_do_query(@_);
827 }
828
829 sub disconnect_call_do_sql {
830   my $self = shift;
831   $self->_do_query(@_);
832 }
833
834 # override in db-specific backend when necessary
835 sub connect_call_datetime_setup { 1 }
836
837 sub _do_query {
838   my ($self, $action) = @_;
839
840   if (ref $action eq 'CODE') {
841     $action = $action->($self);
842     $self->_do_query($_) foreach @$action;
843   }
844   else {
845     # Most debuggers expect ($sql, @bind), so we need to exclude
846     # the attribute hash which is the second argument to $dbh->do
847     # furthermore the bind values are usually to be presented
848     # as named arrayref pairs, so wrap those here too
849     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
850     my $sql = shift @do_args;
851     my $attrs = shift @do_args;
852     my @bind = map { [ undef, $_ ] } @do_args;
853
854     $self->_query_start($sql, @bind);
855     $self->_dbh->do($sql, $attrs, @do_args);
856     $self->_query_end($sql, @bind);
857   }
858
859   return $self;
860 }
861
862 sub _connect {
863   my ($self, @info) = @_;
864
865   $self->throw_exception("You failed to provide any connection info")
866     if !@info;
867
868   my ($old_connect_via, $dbh);
869
870   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
871     $old_connect_via = $DBI::connect_via;
872     $DBI::connect_via = 'connect';
873   }
874
875   eval {
876     if(ref $info[0] eq 'CODE') {
877        $dbh = &{$info[0]}
878     }
879     else {
880        $dbh = DBI->connect(@info);
881     }
882
883     if($dbh && !$self->unsafe) {
884       my $weak_self = $self;
885       Scalar::Util::weaken($weak_self);
886       $dbh->{HandleError} = sub {
887           if ($weak_self) {
888             $weak_self->throw_exception("DBI Exception: $_[0]");
889           }
890           else {
891             croak ("DBI Exception: $_[0]");
892           }
893       };
894       $dbh->{ShowErrorStatement} = 1;
895       $dbh->{RaiseError} = 1;
896       $dbh->{PrintError} = 0;
897     }
898   };
899
900   $DBI::connect_via = $old_connect_via if $old_connect_via;
901
902   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
903     if !$dbh || $@;
904
905   $self->_dbh_autocommit($dbh->{AutoCommit});
906
907   $dbh;
908 }
909
910 sub svp_begin {
911   my ($self, $name) = @_;
912
913   $name = $self->_svp_generate_name
914     unless defined $name;
915
916   $self->throw_exception ("You can't use savepoints outside a transaction")
917     if $self->{transaction_depth} == 0;
918
919   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
920     unless $self->can('_svp_begin');
921   
922   push @{ $self->{savepoints} }, $name;
923
924   $self->debugobj->svp_begin($name) if $self->debug;
925   
926   return $self->_svp_begin($name);
927 }
928
929 sub svp_release {
930   my ($self, $name) = @_;
931
932   $self->throw_exception ("You can't use savepoints outside a transaction")
933     if $self->{transaction_depth} == 0;
934
935   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
936     unless $self->can('_svp_release');
937
938   if (defined $name) {
939     $self->throw_exception ("Savepoint '$name' does not exist")
940       unless grep { $_ eq $name } @{ $self->{savepoints} };
941
942     # Dig through the stack until we find the one we are releasing.  This keeps
943     # the stack up to date.
944     my $svp;
945
946     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
947   } else {
948     $name = pop @{ $self->{savepoints} };
949   }
950
951   $self->debugobj->svp_release($name) if $self->debug;
952
953   return $self->_svp_release($name);
954 }
955
956 sub svp_rollback {
957   my ($self, $name) = @_;
958
959   $self->throw_exception ("You can't use savepoints outside a transaction")
960     if $self->{transaction_depth} == 0;
961
962   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
963     unless $self->can('_svp_rollback');
964
965   if (defined $name) {
966       # If they passed us a name, verify that it exists in the stack
967       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
968           $self->throw_exception("Savepoint '$name' does not exist!");
969       }
970
971       # Dig through the stack until we find the one we are releasing.  This keeps
972       # the stack up to date.
973       while(my $s = pop(@{ $self->{savepoints} })) {
974           last if($s eq $name);
975       }
976       # Add the savepoint back to the stack, as a rollback doesn't remove the
977       # named savepoint, only everything after it.
978       push(@{ $self->{savepoints} }, $name);
979   } else {
980       # We'll assume they want to rollback to the last savepoint
981       $name = $self->{savepoints}->[-1];
982   }
983
984   $self->debugobj->svp_rollback($name) if $self->debug;
985   
986   return $self->_svp_rollback($name);
987 }
988
989 sub _svp_generate_name {
990     my ($self) = @_;
991
992     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
993 }
994
995 sub txn_begin {
996   my $self = shift;
997   $self->ensure_connected();
998   if($self->{transaction_depth} == 0) {
999     $self->debugobj->txn_begin()
1000       if $self->debug;
1001     # this isn't ->_dbh-> because
1002     #  we should reconnect on begin_work
1003     #  for AutoCommit users
1004     $self->dbh->begin_work;
1005   } elsif ($self->auto_savepoint) {
1006     $self->svp_begin;
1007   }
1008   $self->{transaction_depth}++;
1009 }
1010
1011 sub txn_commit {
1012   my $self = shift;
1013   if ($self->{transaction_depth} == 1) {
1014     my $dbh = $self->_dbh;
1015     $self->debugobj->txn_commit()
1016       if ($self->debug);
1017     $dbh->commit;
1018     $self->{transaction_depth} = 0
1019       if $self->_dbh_autocommit;
1020   }
1021   elsif($self->{transaction_depth} > 1) {
1022     $self->{transaction_depth}--;
1023     $self->svp_release
1024       if $self->auto_savepoint;
1025   }
1026 }
1027
1028 sub txn_rollback {
1029   my $self = shift;
1030   my $dbh = $self->_dbh;
1031   eval {
1032     if ($self->{transaction_depth} == 1) {
1033       $self->debugobj->txn_rollback()
1034         if ($self->debug);
1035       $self->{transaction_depth} = 0
1036         if $self->_dbh_autocommit;
1037       $dbh->rollback;
1038     }
1039     elsif($self->{transaction_depth} > 1) {
1040       $self->{transaction_depth}--;
1041       if ($self->auto_savepoint) {
1042         $self->svp_rollback;
1043         $self->svp_release;
1044       }
1045     }
1046     else {
1047       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1048     }
1049   };
1050   if ($@) {
1051     my $error = $@;
1052     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1053     $error =~ /$exception_class/ and $self->throw_exception($error);
1054     # ensure that a failed rollback resets the transaction depth
1055     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1056     $self->throw_exception($error);
1057   }
1058 }
1059
1060 # This used to be the top-half of _execute.  It was split out to make it
1061 #  easier to override in NoBindVars without duping the rest.  It takes up
1062 #  all of _execute's args, and emits $sql, @bind.
1063 sub _prep_for_execute {
1064   my ($self, $op, $extra_bind, $ident, $args) = @_;
1065
1066   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1067     $ident = $ident->from();
1068   }
1069
1070   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1071
1072   unshift(@bind,
1073     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1074       if $extra_bind;
1075   return ($sql, \@bind);
1076 }
1077
1078
1079 sub _fix_bind_params {
1080     my ($self, @bind) = @_;
1081
1082     ### Turn @bind from something like this:
1083     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1084     ### to this:
1085     ###   ( "'1'", "'1'", "'3'" )
1086     return
1087         map {
1088             if ( defined( $_ && $_->[1] ) ) {
1089                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1090             }
1091             else { q{'NULL'}; }
1092         } @bind;
1093 }
1094
1095 sub _query_start {
1096     my ( $self, $sql, @bind ) = @_;
1097
1098     if ( $self->debug ) {
1099         @bind = $self->_fix_bind_params(@bind);
1100
1101         $self->debugobj->query_start( $sql, @bind );
1102     }
1103 }
1104
1105 sub _query_end {
1106     my ( $self, $sql, @bind ) = @_;
1107
1108     if ( $self->debug ) {
1109         @bind = $self->_fix_bind_params(@bind);
1110         $self->debugobj->query_end( $sql, @bind );
1111     }
1112 }
1113
1114 sub _dbh_execute {
1115   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1116
1117   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1118
1119   $self->_query_start( $sql, @$bind );
1120
1121   my $sth = $self->sth($sql,$op);
1122
1123   my $placeholder_index = 1; 
1124
1125   foreach my $bound (@$bind) {
1126     my $attributes = {};
1127     my($column_name, @data) = @$bound;
1128
1129     if ($bind_attributes) {
1130       $attributes = $bind_attributes->{$column_name}
1131       if defined $bind_attributes->{$column_name};
1132     }
1133
1134     foreach my $data (@data) {
1135       my $ref = ref $data;
1136       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1137
1138       $sth->bind_param($placeholder_index, $data, $attributes);
1139       $placeholder_index++;
1140     }
1141   }
1142
1143   # Can this fail without throwing an exception anyways???
1144   my $rv = $sth->execute();
1145   $self->throw_exception($sth->errstr) if !$rv;
1146
1147   $self->_query_end( $sql, @$bind );
1148
1149   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1150 }
1151
1152 sub _execute {
1153     my $self = shift;
1154     $self->dbh_do('_dbh_execute', @_)
1155 }
1156
1157 sub insert {
1158   my ($self, $source, $to_insert) = @_;
1159
1160   my $ident = $source->from;
1161   my $bind_attributes = $self->source_bind_attributes($source);
1162
1163   my $updated_cols = {};
1164
1165   $self->ensure_connected;
1166   foreach my $col ( $source->columns ) {
1167     if ( !defined $to_insert->{$col} ) {
1168       my $col_info = $source->column_info($col);
1169
1170       if ( $col_info->{auto_nextval} ) {
1171         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1172       }
1173     }
1174   }
1175
1176   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1177
1178   return $updated_cols;
1179 }
1180
1181 ## Still not quite perfect, and EXPERIMENTAL
1182 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1183 ## scalar refs, or at least, all the same type as the first set, the statement is
1184 ## only prepped once.
1185 sub insert_bulk {
1186   my ($self, $source, $cols, $data) = @_;
1187   my %colvalues;
1188   my $table = $source->from;
1189   @colvalues{@$cols} = (0..$#$cols);
1190   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1191   
1192   $self->_query_start( $sql, @bind );
1193   my $sth = $self->sth($sql);
1194
1195 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1196
1197   ## This must be an arrayref, else nothing works!
1198   my $tuple_status = [];
1199
1200   ## Get the bind_attributes, if any exist
1201   my $bind_attributes = $self->source_bind_attributes($source);
1202
1203   ## Bind the values and execute
1204   my $placeholder_index = 1; 
1205
1206   foreach my $bound (@bind) {
1207
1208     my $attributes = {};
1209     my ($column_name, $data_index) = @$bound;
1210
1211     if( $bind_attributes ) {
1212       $attributes = $bind_attributes->{$column_name}
1213       if defined $bind_attributes->{$column_name};
1214     }
1215
1216     my @data = map { $_->[$data_index] } @$data;
1217
1218     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1219     $placeholder_index++;
1220   }
1221   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1222   if (my $err = $@) {
1223     my $i = 0;
1224     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1225
1226     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1227       if ($i > $#$tuple_status);
1228
1229     require Data::Dumper;
1230     local $Data::Dumper::Terse = 1;
1231     local $Data::Dumper::Indent = 1;
1232     local $Data::Dumper::Useqq = 1;
1233     local $Data::Dumper::Quotekeys = 0;
1234
1235     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1236       $tuple_status->[$i][1],
1237       Data::Dumper::Dumper(
1238         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1239       ),
1240     );
1241   }
1242   $self->throw_exception($sth->errstr) if !$rv;
1243
1244   $self->_query_end( $sql, @bind );
1245   return (wantarray ? ($rv, $sth, @bind) : $rv);
1246 }
1247
1248 sub update {
1249   my $self = shift @_;
1250   my $source = shift @_;
1251   my $bind_attributes = $self->source_bind_attributes($source);
1252   
1253   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1254 }
1255
1256
1257 sub delete {
1258   my $self = shift @_;
1259   my $source = shift @_;
1260   
1261   my $bind_attrs = $self->source_bind_attributes($source);
1262   
1263   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1264 }
1265
1266 # We were sent here because the $rs contains a complex search
1267 # which will require a subquery to select the correct rows
1268 # (i.e. joined or limited resultsets)
1269 #
1270 # Genarating a single PK column subquery is trivial and supported
1271 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1272 # Look at _multipk_update_delete()
1273 sub _subq_update_delete {
1274   my $self = shift;
1275   my ($rs, $op, $values) = @_;
1276
1277   my $rsrc = $rs->result_source;
1278
1279   # we already check this, but double check naively just in case. Should be removed soon
1280   my $sel = $rs->_resolved_attrs->{select};
1281   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1282   my @pcols = $rsrc->primary_columns;
1283   if (@$sel != @pcols) {
1284     $self->throw_exception (
1285       'Subquery update/delete can not be called on resultsets selecting a'
1286      .' number of columns different than the number of primary keys'
1287     );
1288   }
1289
1290   if (@pcols == 1) {
1291     return $self->$op (
1292       $rsrc,
1293       $op eq 'update' ? $values : (),
1294       { $pcols[0] => { -in => $rs->as_query } },
1295     );
1296   }
1297
1298   else {
1299     return $self->_multipk_update_delete (@_);
1300   }
1301 }
1302
1303 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1304 # resultset update/delete involving subqueries. So by default resort
1305 # to simple (and inefficient) delete_all style per-row opearations,
1306 # while allowing specific storages to override this with a faster
1307 # implementation.
1308 #
1309 sub _multipk_update_delete {
1310   return shift->_per_row_update_delete (@_);
1311 }
1312
1313 # This is the default loop used to delete/update rows for multi PK
1314 # resultsets, and used by mysql exclusively (because it can't do anything
1315 # else).
1316 #
1317 # We do not use $row->$op style queries, because resultset update/delete
1318 # is not expected to cascade (this is what delete_all/update_all is for).
1319 #
1320 # There should be no race conditions as the entire operation is rolled
1321 # in a transaction.
1322 #
1323 sub _per_row_update_delete {
1324   my $self = shift;
1325   my ($rs, $op, $values) = @_;
1326
1327   my $rsrc = $rs->result_source;
1328   my @pcols = $rsrc->primary_columns;
1329
1330   my $guard = $self->txn_scope_guard;
1331
1332   # emulate the return value of $sth->execute for non-selects
1333   my $row_cnt = '0E0';
1334
1335   my $subrs_cur = $rs->cursor;
1336   while (my @pks = $subrs_cur->next) {
1337
1338     my $cond;
1339     for my $i (0.. $#pcols) {
1340       $cond->{$pcols[$i]} = $pks[$i];
1341     }
1342
1343     $self->$op (
1344       $rsrc,
1345       $op eq 'update' ? $values : (),
1346       $cond,
1347     );
1348
1349     $row_cnt++;
1350   }
1351
1352   $guard->commit;
1353
1354   return $row_cnt;
1355 }
1356
1357 sub _select {
1358   my $self = shift;
1359
1360   # localization is neccessary as
1361   # 1) there is no infrastructure to pass this around (easy to do, but will wait)
1362   # 2) _select_args sets it and _prep_for_execute consumes it
1363   my $sql_maker = $self->sql_maker;
1364   local $sql_maker->{for};
1365
1366   return $self->_execute($self->_select_args(@_));
1367 }
1368
1369 sub _select_args_to_query {
1370   my $self = shift;
1371
1372   # localization is neccessary as
1373   # 1) there is no infrastructure to pass this around (easy to do, but will wait)
1374   # 2) _select_args sets it and _prep_for_execute consumes it
1375   my $sql_maker = $self->sql_maker;
1376   local $sql_maker->{for};
1377
1378   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1379   #  = $self->_select_args($ident, $select, $cond, $attrs);
1380   my ($op, $bind, $ident, $bind_attrs, @args) =
1381     $self->_select_args(@_);
1382
1383   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1384   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1385   $prepared_bind ||= [];
1386
1387   return wantarray
1388     ? ($sql, $prepared_bind, $bind_attrs)
1389     : \[ "($sql)", @$prepared_bind ]
1390   ;
1391 }
1392
1393 sub _select_args {
1394   my ($self, $ident, $select, $where, $attrs) = @_;
1395
1396   my $sql_maker = $self->sql_maker;
1397   my $alias2source = $self->_resolve_ident_sources ($ident);
1398
1399   # calculate bind_attrs before possible $ident mangling
1400   my $bind_attrs = {};
1401   for my $alias (keys %$alias2source) {
1402     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1403     for my $col (keys %$bindtypes) {
1404
1405       my $fqcn = join ('.', $alias, $col);
1406       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1407
1408       # so that unqualified searches can be bound too
1409       $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq 'me';
1410     }
1411   }
1412
1413   my @limit;
1414   if ($attrs->{software_limit} ||
1415       $sql_maker->_default_limit_syntax eq "GenericSubQ") {
1416         $attrs->{software_limit} = 1;
1417   } else {
1418     $self->throw_exception("rows attribute must be positive if present")
1419       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1420
1421     # MySQL actually recommends this approach.  I cringe.
1422     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1423
1424     if ($attrs->{rows} && keys %{$attrs->{collapse}}) {
1425       ($ident, $select, $where, $attrs)
1426         = $self->_adjust_select_args_for_limited_prefetch ($ident, $select, $where, $attrs);
1427     }
1428     else {
1429       push @limit, $attrs->{rows}, $attrs->{offset};
1430     }
1431   }
1432
1433 ###
1434   # This would be the point to deflate anything found in $where
1435   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1436   # expect a row object. And all we have is a resultsource (it is trivial
1437   # to extract deflator coderefs via $alias2source above).
1438   #
1439   # I don't see a way forward other than changing the way deflators are
1440   # invoked, and that's just bad...
1441 ###
1442
1443   my $order = { map
1444     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1445     (qw/order_by group_by having _virtual_order_by/ )
1446   };
1447
1448
1449   $sql_maker->{for} = delete $attrs->{for};
1450
1451   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1452 }
1453
1454 sub _adjust_select_args_for_limited_prefetch {
1455   my ($self, $from, $select, $where, $attrs) = @_;
1456
1457   if ($attrs->{group_by} and @{$attrs->{group_by}}) {
1458     $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a group_by attribute');
1459   }
1460
1461   $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a custom from attribute')
1462     if (ref $from ne 'ARRAY');
1463
1464   # separate attributes
1465   my $sub_attrs = { %$attrs };
1466   delete $attrs->{$_} for qw/where bind rows offset/;
1467   delete $sub_attrs->{$_} for qw/for collapse select order_by/;
1468
1469   my $alias = $attrs->{alias};
1470
1471   # create subquery select list
1472   my $sub_select = [ grep { $_ =~ /^$alias\./ } @{$attrs->{select}} ];
1473
1474   # bring over all non-collapse-induced order_by into the inner query (if any)
1475   # the outer one will have to keep them all
1476   if (my $ord_cnt = @{$attrs->{order_by}} - @{$attrs->{_collapse_order_by}} ) {
1477     $sub_attrs->{order_by} = [
1478       @{$attrs->{order_by}}[ 0 .. ($#{$attrs->{order_by}} - $ord_cnt - 1) ]
1479     ];
1480   }
1481
1482
1483   # mangle the head of the {from}
1484   my $self_ident = shift @$from;
1485
1486   my %join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1487
1488   my (%inner_joins);
1489
1490   # decide which parts of the join will remain on the inside
1491   #
1492   # this is not a very viable optimisation, but it was written
1493   # before I realised this, so might as well remain. We can throw
1494   # away _any_ branches of the join tree that are:
1495   # 1) not mentioned in the condition/order
1496   # 2) left-join leaves (or left-join leaf chains)
1497   # Most of the join ocnditions will not satisfy this, but for real
1498   # complex queries some might, and we might make some RDBMS happy.
1499   #
1500   #
1501   # since we do not have introspectable SQLA, we fall back to ugly
1502   # scanning of raw SQL for WHERE, and for pieces of ORDER BY
1503   # in order to determine what goes into %inner_joins
1504   # It may not be very efficient, but it's a reasonable stop-gap
1505   {
1506     # produce stuff unquoted, so it can be scanned
1507     my $sql_maker = $self->sql_maker;
1508     local $sql_maker->{quote_char};
1509
1510     my @order_by = (map
1511       { ref $_ ? $_->[0] : $_ }
1512       $sql_maker->_order_by_chunks ($sub_attrs->{order_by})
1513     );
1514
1515     my $where_sql = $sql_maker->where ($where);
1516
1517     # sort needed joins
1518     for my $alias (keys %join_info) {
1519
1520       # any table alias found on a column name in where or order_by
1521       # gets included in %inner_joins
1522       # Also any parent joins that are needed to reach this particular alias
1523       for my $piece ($where_sql, @order_by ) {
1524         if ($piece =~ /\b$alias\./) {
1525           $inner_joins{$alias} = 1;
1526         }
1527       }
1528     }
1529   }
1530
1531   # scan for non-leaf/non-left joins and mark as needed
1532   # also mark all ancestor joins that are needed to reach this particular alias
1533   # (e.g.  join => { cds => 'tracks' } - tracks will bring cds too )
1534   #
1535   # traverse by the size of the -join_path i.e. reverse depth first
1536   for my $alias (sort { @{$join_info{$b}{-join_path}} <=> @{$join_info{$a}{-join_path}} } (keys %join_info) ) {
1537
1538     my $j = $join_info{$alias};
1539     $inner_joins{$alias} = 1 if (! $j->{-join_type} || ($j->{-join_type} !~ /^left$/i) );
1540
1541     if ($inner_joins{$alias}) {
1542       $inner_joins{$_} = 1 for (@{$j->{-join_path}});
1543     }
1544   }
1545
1546   # construct the inner $from for the subquery
1547   my $inner_from = [ $self_ident ];
1548   if (keys %inner_joins) {
1549     for my $j (@$from) {
1550       push @$inner_from, $j if $inner_joins{$j->[0]{-alias}};
1551     }
1552
1553     # if a multi-type join was needed in the subquery ("multi" is indicated by
1554     # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1555     for my $alias (keys %inner_joins) {
1556
1557       # the dot comes from some weirdness in collapse
1558       # remove after the rewrite
1559       if ($attrs->{collapse}{".$alias"}) {
1560         $sub_attrs->{group_by} = $sub_select;
1561         last;
1562       }
1563     }
1564   }
1565
1566   # generate the subquery
1567   my $subq = $self->_select_args_to_query (
1568     $inner_from,
1569     $sub_select,
1570     $where,
1571     $sub_attrs
1572   );
1573
1574   # put it back in $from
1575   unshift @$from, { $alias => $subq };
1576
1577   # This is totally horrific - the $where ends up in both the inner and outer query
1578   # Unfortunately not much can be done until SQLA2 introspection arrives
1579   #
1580   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1581   return ($from, $select, $where, $attrs);
1582 }
1583
1584 sub _resolve_ident_sources {
1585   my ($self, $ident) = @_;
1586
1587   my $alias2source = {};
1588
1589   # the reason this is so contrived is that $ident may be a {from}
1590   # structure, specifying multiple tables to join
1591   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1592     # this is compat mode for insert/update/delete which do not deal with aliases
1593     $alias2source->{me} = $ident;
1594   }
1595   elsif (ref $ident eq 'ARRAY') {
1596
1597     for (@$ident) {
1598       my $tabinfo;
1599       if (ref $_ eq 'HASH') {
1600         $tabinfo = $_;
1601       }
1602       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1603         $tabinfo = $_->[0];
1604       }
1605
1606       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1607         if ($tabinfo->{-source_handle});
1608     }
1609   }
1610
1611   return $alias2source;
1612 }
1613
1614 sub count {
1615   my ($self, $source, $attrs) = @_;
1616
1617   my $tmp_attrs = { %$attrs };
1618
1619   # take off any limits, record_filter is cdbi, and no point of ordering a count
1620   delete $tmp_attrs->{$_} for (qw/select as rows offset order_by record_filter/);
1621
1622   # overwrite the selector
1623   $tmp_attrs->{select} = { count => '*' };
1624
1625   my $tmp_rs = $source->resultset_class->new($source, $tmp_attrs);
1626   my ($count) = $tmp_rs->cursor->next;
1627
1628   # if the offset/rows attributes are still present, we did not use
1629   # a subquery, so we need to make the calculations in software
1630   $count -= $attrs->{offset} if $attrs->{offset};
1631   $count = $attrs->{rows} if $attrs->{rows} and $attrs->{rows} < $count;
1632   $count = 0 if ($count < 0);
1633
1634   return $count;
1635 }
1636
1637 sub count_grouped {
1638   my ($self, $source, $attrs) = @_;
1639
1640   # copy for the subquery, we need to do some adjustments to it too
1641   my $sub_attrs = { %$attrs };
1642
1643   # these can not go in the subquery, and there is no point of ordering it
1644   delete $sub_attrs->{$_} for qw/collapse select as order_by/;
1645
1646   # if we prefetch, we group_by primary keys only as this is what we would get out of the rs via ->next/->all
1647   # simply deleting group_by suffices, as the code below will re-fill it
1648   # Note: we check $attrs, as $sub_attrs has collapse deleted
1649   if (ref $attrs->{collapse} and keys %{$attrs->{collapse}} ) {
1650     delete $sub_attrs->{group_by};
1651   }
1652
1653   $sub_attrs->{group_by} ||= [ map { "$attrs->{alias}.$_" } ($source->primary_columns) ];
1654   $sub_attrs->{select} = $self->_grouped_count_select ($source, $sub_attrs);
1655
1656   $attrs->{from} = [{
1657     count_subq => $source->resultset_class->new ($source, $sub_attrs )->as_query
1658   }];
1659
1660   # the subquery replaces this
1661   delete $attrs->{$_} for qw/where bind collapse group_by having having_bind rows offset/;
1662
1663   return $self->count ($source, $attrs);
1664 }
1665
1666 #
1667 # Returns a SELECT to go with a supplied GROUP BY
1668 # (caled by count_grouped so a group_by is present)
1669 # Most databases expect them to match, but some
1670 # choke in various ways.
1671 #
1672 sub _grouped_count_select {
1673   my ($self, $source, $rs_args) = @_;
1674   return $rs_args->{group_by};
1675 }
1676
1677 sub source_bind_attributes {
1678   my ($self, $source) = @_;
1679   
1680   my $bind_attributes;
1681   foreach my $column ($source->columns) {
1682   
1683     my $data_type = $source->column_info($column)->{data_type} || '';
1684     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1685      if $data_type;
1686   }
1687
1688   return $bind_attributes;
1689 }
1690
1691 =head2 select
1692
1693 =over 4
1694
1695 =item Arguments: $ident, $select, $condition, $attrs
1696
1697 =back
1698
1699 Handle a SQL select statement.
1700
1701 =cut
1702
1703 sub select {
1704   my $self = shift;
1705   my ($ident, $select, $condition, $attrs) = @_;
1706   return $self->cursor_class->new($self, \@_, $attrs);
1707 }
1708
1709 sub select_single {
1710   my $self = shift;
1711   my ($rv, $sth, @bind) = $self->_select(@_);
1712   my @row = $sth->fetchrow_array;
1713   my @nextrow = $sth->fetchrow_array if @row;
1714   if(@row && @nextrow) {
1715     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1716   }
1717   # Need to call finish() to work round broken DBDs
1718   $sth->finish();
1719   return @row;
1720 }
1721
1722 =head2 sth
1723
1724 =over 4
1725
1726 =item Arguments: $sql
1727
1728 =back
1729
1730 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1731
1732 =cut
1733
1734 sub _dbh_sth {
1735   my ($self, $dbh, $sql) = @_;
1736
1737   # 3 is the if_active parameter which avoids active sth re-use
1738   my $sth = $self->disable_sth_caching
1739     ? $dbh->prepare($sql)
1740     : $dbh->prepare_cached($sql, {}, 3);
1741
1742   # XXX You would think RaiseError would make this impossible,
1743   #  but apparently that's not true :(
1744   $self->throw_exception($dbh->errstr) if !$sth;
1745
1746   $sth;
1747 }
1748
1749 sub sth {
1750   my ($self, $sql) = @_;
1751   $self->dbh_do('_dbh_sth', $sql);
1752 }
1753
1754 sub _dbh_columns_info_for {
1755   my ($self, $dbh, $table) = @_;
1756
1757   if ($dbh->can('column_info')) {
1758     my %result;
1759     eval {
1760       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1761       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1762       $sth->execute();
1763       while ( my $info = $sth->fetchrow_hashref() ){
1764         my %column_info;
1765         $column_info{data_type}   = $info->{TYPE_NAME};
1766         $column_info{size}      = $info->{COLUMN_SIZE};
1767         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1768         $column_info{default_value} = $info->{COLUMN_DEF};
1769         my $col_name = $info->{COLUMN_NAME};
1770         $col_name =~ s/^\"(.*)\"$/$1/;
1771
1772         $result{$col_name} = \%column_info;
1773       }
1774     };
1775     return \%result if !$@ && scalar keys %result;
1776   }
1777
1778   my %result;
1779   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1780   $sth->execute;
1781   my @columns = @{$sth->{NAME_lc}};
1782   for my $i ( 0 .. $#columns ){
1783     my %column_info;
1784     $column_info{data_type} = $sth->{TYPE}->[$i];
1785     $column_info{size} = $sth->{PRECISION}->[$i];
1786     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1787
1788     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1789       $column_info{data_type} = $1;
1790       $column_info{size}    = $2;
1791     }
1792
1793     $result{$columns[$i]} = \%column_info;
1794   }
1795   $sth->finish;
1796
1797   foreach my $col (keys %result) {
1798     my $colinfo = $result{$col};
1799     my $type_num = $colinfo->{data_type};
1800     my $type_name;
1801     if(defined $type_num && $dbh->can('type_info')) {
1802       my $type_info = $dbh->type_info($type_num);
1803       $type_name = $type_info->{TYPE_NAME} if $type_info;
1804       $colinfo->{data_type} = $type_name if $type_name;
1805     }
1806   }
1807
1808   return \%result;
1809 }
1810
1811 sub columns_info_for {
1812   my ($self, $table) = @_;
1813   $self->dbh_do('_dbh_columns_info_for', $table);
1814 }
1815
1816 =head2 last_insert_id
1817
1818 Return the row id of the last insert.
1819
1820 =cut
1821
1822 sub _dbh_last_insert_id {
1823     # All Storage's need to register their own _dbh_last_insert_id
1824     # the old SQLite-based method was highly inappropriate
1825
1826     my $self = shift;
1827     my $class = ref $self;
1828     $self->throw_exception (<<EOE);
1829
1830 No _dbh_last_insert_id() method found in $class.
1831 Since the method of obtaining the autoincrement id of the last insert
1832 operation varies greatly between different databases, this method must be
1833 individually implemented for every storage class.
1834 EOE
1835 }
1836
1837 sub last_insert_id {
1838   my $self = shift;
1839   $self->dbh_do('_dbh_last_insert_id', @_);
1840 }
1841
1842 =head2 sqlt_type
1843
1844 Returns the database driver name.
1845
1846 =cut
1847
1848 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1849
1850 =head2 bind_attribute_by_data_type
1851
1852 Given a datatype from column info, returns a database specific bind
1853 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1854 let the database planner just handle it.
1855
1856 Generally only needed for special case column types, like bytea in postgres.
1857
1858 =cut
1859
1860 sub bind_attribute_by_data_type {
1861     return;
1862 }
1863
1864 =head2 is_datatype_numeric
1865
1866 Given a datatype from column_info, returns a boolean value indicating if
1867 the current RDBMS considers it a numeric value. This controls how
1868 L<DBIx::Class::Row/set_column> decides whether to mark the column as
1869 dirty - when the datatype is deemed numeric a C<< != >> comparison will
1870 be performed instead of the usual C<eq>.
1871
1872 =cut
1873
1874 sub is_datatype_numeric {
1875   my ($self, $dt) = @_;
1876
1877   return 0 unless $dt;
1878
1879   return $dt =~ /^ (?:
1880     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
1881   ) $/ix;
1882 }
1883
1884
1885 =head2 create_ddl_dir (EXPERIMENTAL)
1886
1887 =over 4
1888
1889 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1890
1891 =back
1892
1893 Creates a SQL file based on the Schema, for each of the specified
1894 database engines in C<\@databases> in the given directory.
1895 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
1896
1897 Given a previous version number, this will also create a file containing
1898 the ALTER TABLE statements to transform the previous schema into the
1899 current one. Note that these statements may contain C<DROP TABLE> or
1900 C<DROP COLUMN> statements that can potentially destroy data.
1901
1902 The file names are created using the C<ddl_filename> method below, please
1903 override this method in your schema if you would like a different file
1904 name format. For the ALTER file, the same format is used, replacing
1905 $version in the name with "$preversion-$version".
1906
1907 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
1908 The most common value for this would be C<< { add_drop_table => 1 } >>
1909 to have the SQL produced include a C<DROP TABLE> statement for each table
1910 created. For quoting purposes supply C<quote_table_names> and
1911 C<quote_field_names>.
1912
1913 If no arguments are passed, then the following default values are assumed:
1914
1915 =over 4
1916
1917 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
1918
1919 =item version    - $schema->schema_version
1920
1921 =item directory  - './'
1922
1923 =item preversion - <none>
1924
1925 =back
1926
1927 By default, C<\%sqlt_args> will have
1928
1929  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1930
1931 merged with the hash passed in. To disable any of those features, pass in a 
1932 hashref like the following
1933
1934  { ignore_constraint_names => 0, # ... other options }
1935
1936
1937 Note that this feature is currently EXPERIMENTAL and may not work correctly 
1938 across all databases, or fully handle complex relationships.
1939
1940 WARNING: Please check all SQL files created, before applying them.
1941
1942 =cut
1943
1944 sub create_ddl_dir {
1945   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1946
1947   if(!$dir || !-d $dir) {
1948     carp "No directory given, using ./\n";
1949     $dir = "./";
1950   }
1951   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1952   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1953
1954   my $schema_version = $schema->schema_version || '1.x';
1955   $version ||= $schema_version;
1956
1957   $sqltargs = {
1958     add_drop_table => 1, 
1959     ignore_constraint_names => 1,
1960     ignore_index_names => 1,
1961     %{$sqltargs || {}}
1962   };
1963
1964   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1965       . $self->_check_sqlt_message . q{'})
1966           if !$self->_check_sqlt_version;
1967
1968   my $sqlt = SQL::Translator->new( $sqltargs );
1969
1970   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1971   my $sqlt_schema = $sqlt->translate({ data => $schema })
1972     or $self->throw_exception ($sqlt->error);
1973
1974   foreach my $db (@$databases) {
1975     $sqlt->reset();
1976     $sqlt->{schema} = $sqlt_schema;
1977     $sqlt->producer($db);
1978
1979     my $file;
1980     my $filename = $schema->ddl_filename($db, $version, $dir);
1981     if (-e $filename && ($version eq $schema_version )) {
1982       # if we are dumping the current version, overwrite the DDL
1983       carp "Overwriting existing DDL file - $filename";
1984       unlink($filename);
1985     }
1986
1987     my $output = $sqlt->translate;
1988     if(!$output) {
1989       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1990       next;
1991     }
1992     if(!open($file, ">$filename")) {
1993       $self->throw_exception("Can't open $filename for writing ($!)");
1994       next;
1995     }
1996     print $file $output;
1997     close($file);
1998   
1999     next unless ($preversion);
2000
2001     require SQL::Translator::Diff;
2002
2003     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2004     if(!-e $prefilename) {
2005       carp("No previous schema file found ($prefilename)");
2006       next;
2007     }
2008
2009     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2010     if(-e $difffile) {
2011       carp("Overwriting existing diff file - $difffile");
2012       unlink($difffile);
2013     }
2014     
2015     my $source_schema;
2016     {
2017       my $t = SQL::Translator->new($sqltargs);
2018       $t->debug( 0 );
2019       $t->trace( 0 );
2020
2021       $t->parser( $db )
2022         or $self->throw_exception ($t->error);
2023
2024       my $out = $t->translate( $prefilename )
2025         or $self->throw_exception ($t->error);
2026
2027       $source_schema = $t->schema;
2028
2029       $source_schema->name( $prefilename )
2030         unless ( $source_schema->name );
2031     }
2032
2033     # The "new" style of producers have sane normalization and can support 
2034     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2035     # And we have to diff parsed SQL against parsed SQL.
2036     my $dest_schema = $sqlt_schema;
2037
2038     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2039       my $t = SQL::Translator->new($sqltargs);
2040       $t->debug( 0 );
2041       $t->trace( 0 );
2042
2043       $t->parser( $db )
2044         or $self->throw_exception ($t->error);
2045
2046       my $out = $t->translate( $filename )
2047         or $self->throw_exception ($t->error);
2048
2049       $dest_schema = $t->schema;
2050
2051       $dest_schema->name( $filename )
2052         unless $dest_schema->name;
2053     }
2054     
2055     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2056                                                   $dest_schema,   $db,
2057                                                   $sqltargs
2058                                                  );
2059     if(!open $file, ">$difffile") { 
2060       $self->throw_exception("Can't write to $difffile ($!)");
2061       next;
2062     }
2063     print $file $diff;
2064     close($file);
2065   }
2066 }
2067
2068 =head2 deployment_statements
2069
2070 =over 4
2071
2072 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2073
2074 =back
2075
2076 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2077
2078 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2079 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2080
2081 C<$directory> is used to return statements from files in a previously created
2082 L</create_ddl_dir> directory and is optional. The filenames are constructed
2083 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2084
2085 If no C<$directory> is specified then the statements are constructed on the
2086 fly using L<SQL::Translator> and C<$version> is ignored.
2087
2088 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2089
2090 =cut
2091
2092 sub deployment_statements {
2093   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2094   # Need to be connected to get the correct sqlt_type
2095   $self->ensure_connected() unless $type;
2096   $type ||= $self->sqlt_type;
2097   $version ||= $schema->schema_version || '1.x';
2098   $dir ||= './';
2099   my $filename = $schema->ddl_filename($type, $version, $dir);
2100   if(-f $filename)
2101   {
2102       my $file;
2103       open($file, "<$filename") 
2104         or $self->throw_exception("Can't open $filename ($!)");
2105       my @rows = <$file>;
2106       close($file);
2107       return join('', @rows);
2108   }
2109
2110   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
2111       . $self->_check_sqlt_message . q{'})
2112           if !$self->_check_sqlt_version;
2113
2114   require SQL::Translator::Parser::DBIx::Class;
2115   eval qq{use SQL::Translator::Producer::${type}};
2116   $self->throw_exception($@) if $@;
2117
2118   # sources needs to be a parser arg, but for simplicty allow at top level 
2119   # coming in
2120   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2121       if exists $sqltargs->{sources};
2122
2123   my $tr = SQL::Translator->new(%$sqltargs);
2124   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
2125   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
2126 }
2127
2128 sub deploy {
2129   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2130   my $deploy = sub {
2131     my $line = shift;
2132     return if($line =~ /^--/);
2133     return if(!$line);
2134     # next if($line =~ /^DROP/m);
2135     return if($line =~ /^BEGIN TRANSACTION/m);
2136     return if($line =~ /^COMMIT/m);
2137     return if $line =~ /^\s+$/; # skip whitespace only
2138     $self->_query_start($line);
2139     eval {
2140       $self->dbh->do($line); # shouldn't be using ->dbh ?
2141     };
2142     if ($@) {
2143       carp qq{$@ (running "${line}")};
2144     }
2145     $self->_query_end($line);
2146   };
2147   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2148   if (@statements > 1) {
2149     foreach my $statement (@statements) {
2150       $deploy->( $statement );
2151     }
2152   }
2153   elsif (@statements == 1) {
2154     foreach my $line ( split(";\n", $statements[0])) {
2155       $deploy->( $line );
2156     }
2157   }
2158 }
2159
2160 =head2 datetime_parser
2161
2162 Returns the datetime parser class
2163
2164 =cut
2165
2166 sub datetime_parser {
2167   my $self = shift;
2168   return $self->{datetime_parser} ||= do {
2169     $self->ensure_connected;
2170     $self->build_datetime_parser(@_);
2171   };
2172 }
2173
2174 =head2 datetime_parser_type
2175
2176 Defines (returns) the datetime parser class - currently hardwired to
2177 L<DateTime::Format::MySQL>
2178
2179 =cut
2180
2181 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2182
2183 =head2 build_datetime_parser
2184
2185 See L</datetime_parser>
2186
2187 =cut
2188
2189 sub build_datetime_parser {
2190   my $self = shift;
2191   my $type = $self->datetime_parser_type(@_);
2192   eval "use ${type}";
2193   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2194   return $type;
2195 }
2196
2197 {
2198     my $_check_sqlt_version; # private
2199     my $_check_sqlt_message; # private
2200     sub _check_sqlt_version {
2201         return $_check_sqlt_version if defined $_check_sqlt_version;
2202         eval 'use SQL::Translator "0.09003"';
2203         $_check_sqlt_message = $@ || '';
2204         $_check_sqlt_version = !$@;
2205     }
2206
2207     sub _check_sqlt_message {
2208         _check_sqlt_version if !defined $_check_sqlt_message;
2209         $_check_sqlt_message;
2210     }
2211 }
2212
2213 =head2 is_replicating
2214
2215 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2216 replicate from a master database.  Default is undef, which is the result
2217 returned by databases that don't support replication.
2218
2219 =cut
2220
2221 sub is_replicating {
2222     return;
2223     
2224 }
2225
2226 =head2 lag_behind_master
2227
2228 Returns a number that represents a certain amount of lag behind a master db
2229 when a given storage is replicating.  The number is database dependent, but
2230 starts at zero and increases with the amount of lag. Default in undef
2231
2232 =cut
2233
2234 sub lag_behind_master {
2235     return;
2236 }
2237
2238 sub DESTROY {
2239   my $self = shift;
2240   return if !$self->_dbh;
2241   $self->_verify_pid;
2242   $self->_dbh(undef);
2243 }
2244
2245 1;
2246
2247 =head1 USAGE NOTES
2248
2249 =head2 DBIx::Class and AutoCommit
2250
2251 DBIx::Class can do some wonderful magic with handling exceptions,
2252 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2253 combined with C<txn_do> for transaction support.
2254
2255 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2256 in an assumed transaction between commits, and you're telling us you'd
2257 like to manage that manually.  A lot of the magic protections offered by
2258 this module will go away.  We can't protect you from exceptions due to database
2259 disconnects because we don't know anything about how to restart your
2260 transactions.  You're on your own for handling all sorts of exceptional
2261 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2262 be with raw DBI.
2263
2264
2265
2266 =head1 AUTHORS
2267
2268 Matt S. Trout <mst@shadowcatsystems.co.uk>
2269
2270 Andy Grundman <andy@hybridized.org>
2271
2272 =head1 LICENSE
2273
2274 You may distribute this code under the same terms as Perl itself.
2275
2276 =cut