silly misspells and trailing whitespace
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use Scalar::Util();
13 use List::Util();
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17      _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
24   disable_sth_caching unsafe auto_savepoint
25 /;
26 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
27
28
29 # default cursor class, overridable in connect_info attributes
30 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
31
32 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
33 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
34
35
36 =head1 NAME
37
38 DBIx::Class::Storage::DBI - DBI storage handler
39
40 =head1 SYNOPSIS
41
42   my $schema = MySchema->connect('dbi:SQLite:my.db');
43
44   $schema->storage->debug(1);
45   $schema->dbh_do("DROP TABLE authors");
46
47   $schema->resultset('Book')->search({
48      written_on => $schema->storage->datetime_parser(DateTime->now)
49   });
50
51 =head1 DESCRIPTION
52
53 This class represents the connection to an RDBMS via L<DBI>.  See
54 L<DBIx::Class::Storage> for general information.  This pod only
55 documents DBI-specific methods and behaviors.
56
57 =head1 METHODS
58
59 =cut
60
61 sub new {
62   my $new = shift->next::method(@_);
63
64   $new->transaction_depth(0);
65   $new->_sql_maker_opts({});
66   $new->{savepoints} = [];
67   $new->{_in_dbh_do} = 0;
68   $new->{_dbh_gen} = 0;
69
70   $new;
71 }
72
73 =head2 connect_info
74
75 This method is normally called by L<DBIx::Class::Schema/connection>, which
76 encapsulates its argument list in an arrayref before passing them here.
77
78 The argument list may contain:
79
80 =over
81
82 =item *
83
84 The same 4-element argument set one would normally pass to
85 L<DBI/connect>, optionally followed by
86 L<extra attributes|/DBIx::Class specific connection attributes>
87 recognized by DBIx::Class:
88
89   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
90
91 =item *
92
93 A single code reference which returns a connected
94 L<DBI database handle|DBI/connect> optionally followed by
95 L<extra attributes|/DBIx::Class specific connection attributes> recognized
96 by DBIx::Class:
97
98   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
99
100 =item *
101
102 A single hashref with all the attributes and the dsn/user/password
103 mixed together:
104
105   $connect_info_args = [{
106     dsn => $dsn,
107     user => $user,
108     password => $pass,
109     %dbi_attributes,
110     %extra_attributes,
111   }];
112
113 This is particularly useful for L<Catalyst> based applications, allowing the
114 following config (L<Config::General> style):
115
116   <Model::DB>
117     schema_class   App::DB
118     <connect_info>
119       dsn          dbi:mysql:database=test
120       user         testuser
121       password     TestPass
122       AutoCommit   1
123     </connect_info>
124   </Model::DB>
125
126 =back
127
128 Please note that the L<DBI> docs recommend that you always explicitly
129 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
130 recommends that it be set to I<1>, and that you perform transactions
131 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
132 to I<1> if you do not do explicitly set it to zero.  This is the default
133 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
134
135 =head3 DBIx::Class specific connection attributes
136
137 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
138 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
139 the following connection options. These options can be mixed in with your other
140 L<DBI> connection attributes, or placed in a seperate hashref
141 (C<\%extra_attributes>) as shown above.
142
143 Every time C<connect_info> is invoked, any previous settings for
144 these options will be cleared before setting the new ones, regardless of
145 whether any options are specified in the new C<connect_info>.
146
147
148 =over
149
150 =item on_connect_do
151
152 Specifies things to do immediately after connecting or re-connecting to
153 the database.  Its value may contain:
154
155 =over
156
157 =item a scalar
158
159 This contains one SQL statement to execute.
160
161 =item an array reference
162
163 This contains SQL statements to execute in order.  Each element contains
164 a string or a code reference that returns a string.
165
166 =item a code reference
167
168 This contains some code to execute.  Unlike code references within an
169 array reference, its return value is ignored.
170
171 =back
172
173 =item on_disconnect_do
174
175 Takes arguments in the same form as L</on_connect_do> and executes them
176 immediately before disconnecting from the database.
177
178 Note, this only runs if you explicitly call L</disconnect> on the
179 storage object.
180
181 =item on_connect_call
182
183 A more generalized form of L</on_connect_do> that calls the specified
184 C<connect_call_METHOD> methods in your storage driver.
185
186   on_connect_do => 'select 1'
187
188 is equivalent to:
189
190   on_connect_call => [ [ do_sql => 'select 1' ] ]
191
192 Its values may contain:
193
194 =over
195
196 =item a scalar
197
198 Will call the C<connect_call_METHOD> method.
199
200 =item a code reference
201
202 Will execute C<< $code->($storage) >>
203
204 =item an array reference
205
206 Each value can be a method name or code reference.
207
208 =item an array of arrays
209
210 For each array, the first item is taken to be the C<connect_call_> method name
211 or code reference, and the rest are parameters to it.
212
213 =back
214
215 Some predefined storage methods you may use:
216
217 =over
218
219 =item do_sql
220
221 Executes a SQL string or a code reference that returns a SQL string. This is
222 what L</on_connect_do> and L</on_disconnect_do> use.
223
224 It can take:
225
226 =over
227
228 =item a scalar
229
230 Will execute the scalar as SQL.
231
232 =item an arrayref
233
234 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
235 attributes hashref and bind values.
236
237 =item a code reference
238
239 Will execute C<< $code->($storage) >> and execute the return array refs as
240 above.
241
242 =back
243
244 =item datetime_setup
245
246 Execute any statements necessary to initialize the database session to return
247 and accept datetime/timestamp values used with
248 L<DBIx::Class::InflateColumn::DateTime>.
249
250 Only necessary for some databases, see your specific storage driver for
251 implementation details.
252
253 =back
254
255 =item on_disconnect_call
256
257 Takes arguments in the same form as L</on_connect_call> and executes them
258 immediately before disconnecting from the database.
259
260 Calls the C<disconnect_call_METHOD> methods as opposed to the
261 C<connect_call_METHOD> methods called by L</on_connect_call>.
262
263 Note, this only runs if you explicitly call L</disconnect> on the
264 storage object.
265
266 =item disable_sth_caching
267
268 If set to a true value, this option will disable the caching of
269 statement handles via L<DBI/prepare_cached>.
270
271 =item limit_dialect
272
273 Sets the limit dialect. This is useful for JDBC-bridge among others
274 where the remote SQL-dialect cannot be determined by the name of the
275 driver alone. See also L<SQL::Abstract::Limit>.
276
277 =item quote_char
278
279 Specifies what characters to use to quote table and column names. If
280 you use this you will want to specify L</name_sep> as well.
281
282 C<quote_char> expects either a single character, in which case is it
283 is placed on either side of the table/column name, or an arrayref of length
284 2 in which case the table/column name is placed between the elements.
285
286 For example under MySQL you should use C<< quote_char => '`' >>, and for
287 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
288
289 =item name_sep
290
291 This only needs to be used in conjunction with C<quote_char>, and is used to
292 specify the charecter that seperates elements (schemas, tables, columns) from
293 each other. In most cases this is simply a C<.>.
294
295 The consequences of not supplying this value is that L<SQL::Abstract>
296 will assume DBIx::Class' uses of aliases to be complete column
297 names. The output will look like I<"me.name"> when it should actually
298 be I<"me"."name">.
299
300 =item unsafe
301
302 This Storage driver normally installs its own C<HandleError>, sets
303 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
304 all database handles, including those supplied by a coderef.  It does this
305 so that it can have consistent and useful error behavior.
306
307 If you set this option to a true value, Storage will not do its usual
308 modifications to the database handle's attributes, and instead relies on
309 the settings in your connect_info DBI options (or the values you set in
310 your connection coderef, in the case that you are connecting via coderef).
311
312 Note that your custom settings can cause Storage to malfunction,
313 especially if you set a C<HandleError> handler that suppresses exceptions
314 and/or disable C<RaiseError>.
315
316 =item auto_savepoint
317
318 If this option is true, L<DBIx::Class> will use savepoints when nesting
319 transactions, making it possible to recover from failure in the inner
320 transaction without having to abort all outer transactions.
321
322 =item cursor_class
323
324 Use this argument to supply a cursor class other than the default
325 L<DBIx::Class::Storage::DBI::Cursor>.
326
327 =back
328
329 Some real-life examples of arguments to L</connect_info> and
330 L<DBIx::Class::Schema/connect>
331
332   # Simple SQLite connection
333   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
334
335   # Connect via subref
336   ->connect_info([ sub { DBI->connect(...) } ]);
337
338   # A bit more complicated
339   ->connect_info(
340     [
341       'dbi:Pg:dbname=foo',
342       'postgres',
343       'my_pg_password',
344       { AutoCommit => 1 },
345       { quote_char => q{"}, name_sep => q{.} },
346     ]
347   );
348
349   # Equivalent to the previous example
350   ->connect_info(
351     [
352       'dbi:Pg:dbname=foo',
353       'postgres',
354       'my_pg_password',
355       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
356     ]
357   );
358
359   # Same, but with hashref as argument
360   # See parse_connect_info for explanation
361   ->connect_info(
362     [{
363       dsn         => 'dbi:Pg:dbname=foo',
364       user        => 'postgres',
365       password    => 'my_pg_password',
366       AutoCommit  => 1,
367       quote_char  => q{"},
368       name_sep    => q{.},
369     }]
370   );
371
372   # Subref + DBIx::Class-specific connection options
373   ->connect_info(
374     [
375       sub { DBI->connect(...) },
376       {
377           quote_char => q{`},
378           name_sep => q{@},
379           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
380           disable_sth_caching => 1,
381       },
382     ]
383   );
384
385
386
387 =cut
388
389 sub connect_info {
390   my ($self, $info_arg) = @_;
391
392   return $self->_connect_info if !$info_arg;
393
394   my @args = @$info_arg;  # take a shallow copy for further mutilation
395   $self->_connect_info([@args]); # copy for _connect_info
396
397
398   # combine/pre-parse arguments depending on invocation style
399
400   my %attrs;
401   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
402     %attrs = %{ $args[1] || {} };
403     @args = $args[0];
404   }
405   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
406     %attrs = %{$args[0]};
407     @args = ();
408     for (qw/password user dsn/) {
409       unshift @args, delete $attrs{$_};
410     }
411   }
412   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
413     %attrs = (
414       % { $args[3] || {} },
415       % { $args[4] || {} },
416     );
417     @args = @args[0,1,2];
418   }
419
420   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
421   #  the new set of options
422   $self->_sql_maker(undef);
423   $self->_sql_maker_opts({});
424
425   if(keys %attrs) {
426     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
427       if(my $value = delete $attrs{$storage_opt}) {
428         $self->$storage_opt($value);
429       }
430     }
431     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
432       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
433         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
434       }
435     }
436   }
437
438   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
439
440   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
441   $self->_connect_info;
442 }
443
444 =head2 on_connect_do
445
446 This method is deprecated in favour of setting via L</connect_info>.
447
448 =cut
449
450 =head2 on_disconnect_do
451
452 This method is deprecated in favour of setting via L</connect_info>.
453
454 =cut
455
456 sub _parse_connect_do {
457   my ($self, $type) = @_;
458
459   my $val = $self->$type;
460   return () if not defined $val;
461
462   my @res;
463
464   if (not ref($val)) {
465     push @res, [ 'do_sql', $val ];
466   } elsif (ref($val) eq 'CODE') {
467     push @res, $val;
468   } elsif (ref($val) eq 'ARRAY') {
469     push @res, map { [ 'do_sql', $_ ] } @$val;
470   } else {
471     $self->throw_exception("Invalid type for $type: ".ref($val));
472   }
473
474   return \@res;
475 }
476
477 =head2 dbh_do
478
479 Arguments: ($subref | $method_name), @extra_coderef_args?
480
481 Execute the given $subref or $method_name using the new exception-based
482 connection management.
483
484 The first two arguments will be the storage object that C<dbh_do> was called
485 on and a database handle to use.  Any additional arguments will be passed
486 verbatim to the called subref as arguments 2 and onwards.
487
488 Using this (instead of $self->_dbh or $self->dbh) ensures correct
489 exception handling and reconnection (or failover in future subclasses).
490
491 Your subref should have no side-effects outside of the database, as
492 there is the potential for your subref to be partially double-executed
493 if the database connection was stale/dysfunctional.
494
495 Example:
496
497   my @stuff = $schema->storage->dbh_do(
498     sub {
499       my ($storage, $dbh, @cols) = @_;
500       my $cols = join(q{, }, @cols);
501       $dbh->selectrow_array("SELECT $cols FROM foo");
502     },
503     @column_list
504   );
505
506 =cut
507
508 sub dbh_do {
509   my $self = shift;
510   my $code = shift;
511
512   my $dbh = $self->_dbh;
513
514   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
515       || $self->{transaction_depth};
516
517   local $self->{_in_dbh_do} = 1;
518
519   my @result;
520   my $want_array = wantarray;
521
522   eval {
523     $self->_verify_pid if $dbh;
524     if(!$self->_dbh) {
525         $self->_populate_dbh;
526         $dbh = $self->_dbh;
527     }
528
529     if($want_array) {
530         @result = $self->$code($dbh, @_);
531     }
532     elsif(defined $want_array) {
533         $result[0] = $self->$code($dbh, @_);
534     }
535     else {
536         $self->$code($dbh, @_);
537     }
538   };
539
540   my $exception = $@;
541   if(!$exception) { return $want_array ? @result : $result[0] }
542
543   $self->throw_exception($exception) if $self->connected;
544
545   # We were not connected - reconnect and retry, but let any
546   #  exception fall right through this time
547   $self->_populate_dbh;
548   $self->$code($self->_dbh, @_);
549 }
550
551 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
552 # It also informs dbh_do to bypass itself while under the direction of txn_do,
553 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
554 sub txn_do {
555   my $self = shift;
556   my $coderef = shift;
557
558   ref $coderef eq 'CODE' or $self->throw_exception
559     ('$coderef must be a CODE reference');
560
561   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
562
563   local $self->{_in_dbh_do} = 1;
564
565   my @result;
566   my $want_array = wantarray;
567
568   my $tried = 0;
569   while(1) {
570     eval {
571       $self->_verify_pid if $self->_dbh;
572       $self->_populate_dbh if !$self->_dbh;
573
574       $self->txn_begin;
575       if($want_array) {
576           @result = $coderef->(@_);
577       }
578       elsif(defined $want_array) {
579           $result[0] = $coderef->(@_);
580       }
581       else {
582           $coderef->(@_);
583       }
584       $self->txn_commit;
585     };
586
587     my $exception = $@;
588     if(!$exception) { return $want_array ? @result : $result[0] }
589
590     if($tried++ > 0 || $self->connected) {
591       eval { $self->txn_rollback };
592       my $rollback_exception = $@;
593       if($rollback_exception) {
594         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
595         $self->throw_exception($exception)  # propagate nested rollback
596           if $rollback_exception =~ /$exception_class/;
597
598         $self->throw_exception(
599           "Transaction aborted: ${exception}. "
600           . "Rollback failed: ${rollback_exception}"
601         );
602       }
603       $self->throw_exception($exception)
604     }
605
606     # We were not connected, and was first try - reconnect and retry
607     # via the while loop
608     $self->_populate_dbh;
609   }
610 }
611
612 =head2 disconnect
613
614 Our C<disconnect> method also performs a rollback first if the
615 database is not in C<AutoCommit> mode.
616
617 =cut
618
619 sub disconnect {
620   my ($self) = @_;
621
622   if( $self->connected ) {
623     my @actions;
624
625     push @actions, ( $self->on_disconnect_call || () );
626     push @actions, $self->_parse_connect_do ('on_disconnect_do');
627
628     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
629
630     $self->_dbh->rollback unless $self->_dbh_autocommit;
631     $self->_dbh->disconnect;
632     $self->_dbh(undef);
633     $self->{_dbh_gen}++;
634   }
635 }
636
637 =head2 with_deferred_fk_checks
638
639 =over 4
640
641 =item Arguments: C<$coderef>
642
643 =item Return Value: The return value of $coderef
644
645 =back
646
647 Storage specific method to run the code ref with FK checks deferred or
648 in MySQL's case disabled entirely.
649
650 =cut
651
652 # Storage subclasses should override this
653 sub with_deferred_fk_checks {
654   my ($self, $sub) = @_;
655
656   $sub->();
657 }
658
659 sub connected {
660   my ($self) = @_;
661
662   if(my $dbh = $self->_dbh) {
663       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
664           $self->_dbh(undef);
665           $self->{_dbh_gen}++;
666           return;
667       }
668       else {
669           $self->_verify_pid;
670           return 0 if !$self->_dbh;
671       }
672       return ($dbh->FETCH('Active') && $dbh->ping);
673   }
674
675   return 0;
676 }
677
678 # handle pid changes correctly
679 #  NOTE: assumes $self->_dbh is a valid $dbh
680 sub _verify_pid {
681   my ($self) = @_;
682
683   return if defined $self->_conn_pid && $self->_conn_pid == $$;
684
685   $self->_dbh->{InactiveDestroy} = 1;
686   $self->_dbh(undef);
687   $self->{_dbh_gen}++;
688
689   return;
690 }
691
692 sub ensure_connected {
693   my ($self) = @_;
694
695   unless ($self->connected) {
696     $self->_populate_dbh;
697   }
698 }
699
700 =head2 dbh
701
702 Returns the dbh - a data base handle of class L<DBI>.
703
704 =cut
705
706 sub dbh {
707   my ($self) = @_;
708
709   $self->ensure_connected;
710   return $self->_dbh;
711 }
712
713 sub _sql_maker_args {
714     my ($self) = @_;
715
716     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
717 }
718
719 sub sql_maker {
720   my ($self) = @_;
721   unless ($self->_sql_maker) {
722     my $sql_maker_class = $self->sql_maker_class;
723     $self->ensure_class_loaded ($sql_maker_class);
724     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
725   }
726   return $self->_sql_maker;
727 }
728
729 sub _rebless {}
730
731 sub _populate_dbh {
732   my ($self) = @_;
733   my @info = @{$self->_dbi_connect_info || []};
734   $self->_dbh($self->_connect(@info));
735
736   $self->_conn_pid($$);
737   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
738
739   $self->_determine_driver;
740
741   # Always set the transaction depth on connect, since
742   #  there is no transaction in progress by definition
743   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
744
745   my @actions;
746
747   push @actions, ( $self->on_connect_call || () );
748   push @actions, $self->_parse_connect_do ('on_connect_do');
749
750   $self->_do_connection_actions(connect_call_ => $_) for @actions;
751 }
752
753 sub _determine_driver {
754   my ($self) = @_;
755
756   if (ref $self eq 'DBIx::Class::Storage::DBI') {
757     my $driver;
758
759     if ($self->_dbh) { # we are connected
760       $driver = $self->_dbh->{Driver}{Name};
761     } else {
762       # try to use dsn to not require being connected, the driver may still
763       # force a connection in _rebless to determine version
764       ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
765     }
766
767     my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
768     if ($self->load_optional_class($storage_class)) {
769       mro::set_mro($storage_class, 'c3');
770       bless $self, $storage_class;
771       $self->_rebless();
772     }
773   }
774 }
775
776 sub _do_connection_actions {
777   my $self          = shift;
778   my $method_prefix = shift;
779   my $call          = shift;
780
781   if (not ref($call)) {
782     my $method = $method_prefix . $call;
783     $self->$method(@_);
784   } elsif (ref($call) eq 'CODE') {
785     $self->$call(@_);
786   } elsif (ref($call) eq 'ARRAY') {
787     if (ref($call->[0]) ne 'ARRAY') {
788       $self->_do_connection_actions($method_prefix, $_) for @$call;
789     } else {
790       $self->_do_connection_actions($method_prefix, @$_) for @$call;
791     }
792   } else {
793     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
794   }
795
796   return $self;
797 }
798
799 sub connect_call_do_sql {
800   my $self = shift;
801   $self->_do_query(@_);
802 }
803
804 sub disconnect_call_do_sql {
805   my $self = shift;
806   $self->_do_query(@_);
807 }
808
809 # override in db-specific backend when necessary
810 sub connect_call_datetime_setup { 1 }
811
812 sub _do_query {
813   my ($self, $action) = @_;
814
815   if (ref $action eq 'CODE') {
816     $action = $action->($self);
817     $self->_do_query($_) foreach @$action;
818   }
819   else {
820     # Most debuggers expect ($sql, @bind), so we need to exclude
821     # the attribute hash which is the second argument to $dbh->do
822     # furthermore the bind values are usually to be presented
823     # as named arrayref pairs, so wrap those here too
824     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
825     my $sql = shift @do_args;
826     my $attrs = shift @do_args;
827     my @bind = map { [ undef, $_ ] } @do_args;
828
829     $self->_query_start($sql, @bind);
830     $self->_dbh->do($sql, $attrs, @do_args);
831     $self->_query_end($sql, @bind);
832   }
833
834   return $self;
835 }
836
837 sub _connect {
838   my ($self, @info) = @_;
839
840   $self->throw_exception("You failed to provide any connection info")
841     if !@info;
842
843   my ($old_connect_via, $dbh);
844
845   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
846     $old_connect_via = $DBI::connect_via;
847     $DBI::connect_via = 'connect';
848   }
849
850   eval {
851     if(ref $info[0] eq 'CODE') {
852        $dbh = &{$info[0]}
853     }
854     else {
855        $dbh = DBI->connect(@info);
856     }
857
858     if($dbh && !$self->unsafe) {
859       my $weak_self = $self;
860       Scalar::Util::weaken($weak_self);
861       $dbh->{HandleError} = sub {
862           if ($weak_self) {
863             $weak_self->throw_exception("DBI Exception: $_[0]");
864           }
865           else {
866             croak ("DBI Exception: $_[0]");
867           }
868       };
869       $dbh->{ShowErrorStatement} = 1;
870       $dbh->{RaiseError} = 1;
871       $dbh->{PrintError} = 0;
872     }
873   };
874
875   $DBI::connect_via = $old_connect_via if $old_connect_via;
876
877   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
878     if !$dbh || $@;
879
880   $self->_dbh_autocommit($dbh->{AutoCommit});
881
882   $dbh;
883 }
884
885 sub svp_begin {
886   my ($self, $name) = @_;
887
888   $name = $self->_svp_generate_name
889     unless defined $name;
890
891   $self->throw_exception ("You can't use savepoints outside a transaction")
892     if $self->{transaction_depth} == 0;
893
894   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
895     unless $self->can('_svp_begin');
896
897   push @{ $self->{savepoints} }, $name;
898
899   $self->debugobj->svp_begin($name) if $self->debug;
900
901   return $self->_svp_begin($name);
902 }
903
904 sub svp_release {
905   my ($self, $name) = @_;
906
907   $self->throw_exception ("You can't use savepoints outside a transaction")
908     if $self->{transaction_depth} == 0;
909
910   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
911     unless $self->can('_svp_release');
912
913   if (defined $name) {
914     $self->throw_exception ("Savepoint '$name' does not exist")
915       unless grep { $_ eq $name } @{ $self->{savepoints} };
916
917     # Dig through the stack until we find the one we are releasing.  This keeps
918     # the stack up to date.
919     my $svp;
920
921     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
922   } else {
923     $name = pop @{ $self->{savepoints} };
924   }
925
926   $self->debugobj->svp_release($name) if $self->debug;
927
928   return $self->_svp_release($name);
929 }
930
931 sub svp_rollback {
932   my ($self, $name) = @_;
933
934   $self->throw_exception ("You can't use savepoints outside a transaction")
935     if $self->{transaction_depth} == 0;
936
937   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
938     unless $self->can('_svp_rollback');
939
940   if (defined $name) {
941       # If they passed us a name, verify that it exists in the stack
942       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
943           $self->throw_exception("Savepoint '$name' does not exist!");
944       }
945
946       # Dig through the stack until we find the one we are releasing.  This keeps
947       # the stack up to date.
948       while(my $s = pop(@{ $self->{savepoints} })) {
949           last if($s eq $name);
950       }
951       # Add the savepoint back to the stack, as a rollback doesn't remove the
952       # named savepoint, only everything after it.
953       push(@{ $self->{savepoints} }, $name);
954   } else {
955       # We'll assume they want to rollback to the last savepoint
956       $name = $self->{savepoints}->[-1];
957   }
958
959   $self->debugobj->svp_rollback($name) if $self->debug;
960
961   return $self->_svp_rollback($name);
962 }
963
964 sub _svp_generate_name {
965     my ($self) = @_;
966
967     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
968 }
969
970 sub txn_begin {
971   my $self = shift;
972   $self->ensure_connected();
973   if($self->{transaction_depth} == 0) {
974     $self->debugobj->txn_begin()
975       if $self->debug;
976     # this isn't ->_dbh-> because
977     #  we should reconnect on begin_work
978     #  for AutoCommit users
979     $self->dbh->begin_work;
980   } elsif ($self->auto_savepoint) {
981     $self->svp_begin;
982   }
983   $self->{transaction_depth}++;
984 }
985
986 sub txn_commit {
987   my $self = shift;
988   if ($self->{transaction_depth} == 1) {
989     my $dbh = $self->_dbh;
990     $self->debugobj->txn_commit()
991       if ($self->debug);
992     $dbh->commit;
993     $self->{transaction_depth} = 0
994       if $self->_dbh_autocommit;
995   }
996   elsif($self->{transaction_depth} > 1) {
997     $self->{transaction_depth}--;
998     $self->svp_release
999       if $self->auto_savepoint;
1000   }
1001 }
1002
1003 sub txn_rollback {
1004   my $self = shift;
1005   my $dbh = $self->_dbh;
1006   eval {
1007     if ($self->{transaction_depth} == 1) {
1008       $self->debugobj->txn_rollback()
1009         if ($self->debug);
1010       $self->{transaction_depth} = 0
1011         if $self->_dbh_autocommit;
1012       $dbh->rollback;
1013     }
1014     elsif($self->{transaction_depth} > 1) {
1015       $self->{transaction_depth}--;
1016       if ($self->auto_savepoint) {
1017         $self->svp_rollback;
1018         $self->svp_release;
1019       }
1020     }
1021     else {
1022       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1023     }
1024   };
1025   if ($@) {
1026     my $error = $@;
1027     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1028     $error =~ /$exception_class/ and $self->throw_exception($error);
1029     # ensure that a failed rollback resets the transaction depth
1030     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1031     $self->throw_exception($error);
1032   }
1033 }
1034
1035 # This used to be the top-half of _execute.  It was split out to make it
1036 #  easier to override in NoBindVars without duping the rest.  It takes up
1037 #  all of _execute's args, and emits $sql, @bind.
1038 sub _prep_for_execute {
1039   my ($self, $op, $extra_bind, $ident, $args) = @_;
1040
1041   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1042     $ident = $ident->from();
1043   }
1044
1045   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1046
1047   unshift(@bind,
1048     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1049       if $extra_bind;
1050   return ($sql, \@bind);
1051 }
1052
1053
1054 sub _fix_bind_params {
1055     my ($self, @bind) = @_;
1056
1057     ### Turn @bind from something like this:
1058     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1059     ### to this:
1060     ###   ( "'1'", "'1'", "'3'" )
1061     return
1062         map {
1063             if ( defined( $_ && $_->[1] ) ) {
1064                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1065             }
1066             else { q{'NULL'}; }
1067         } @bind;
1068 }
1069
1070 sub _query_start {
1071     my ( $self, $sql, @bind ) = @_;
1072
1073     if ( $self->debug ) {
1074         @bind = $self->_fix_bind_params(@bind);
1075
1076         $self->debugobj->query_start( $sql, @bind );
1077     }
1078 }
1079
1080 sub _query_end {
1081     my ( $self, $sql, @bind ) = @_;
1082
1083     if ( $self->debug ) {
1084         @bind = $self->_fix_bind_params(@bind);
1085         $self->debugobj->query_end( $sql, @bind );
1086     }
1087 }
1088
1089 sub _dbh_execute {
1090   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1091
1092   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1093
1094   $self->_query_start( $sql, @$bind );
1095
1096   my $sth = $self->sth($sql,$op);
1097
1098   my $placeholder_index = 1;
1099
1100   foreach my $bound (@$bind) {
1101     my $attributes = {};
1102     my($column_name, @data) = @$bound;
1103
1104     if ($bind_attributes) {
1105       $attributes = $bind_attributes->{$column_name}
1106       if defined $bind_attributes->{$column_name};
1107     }
1108
1109     foreach my $data (@data) {
1110       my $ref = ref $data;
1111       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1112
1113       $sth->bind_param($placeholder_index, $data, $attributes);
1114       $placeholder_index++;
1115     }
1116   }
1117
1118   # Can this fail without throwing an exception anyways???
1119   my $rv = $sth->execute();
1120   $self->throw_exception($sth->errstr) if !$rv;
1121
1122   $self->_query_end( $sql, @$bind );
1123
1124   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1125 }
1126
1127 sub _execute {
1128     my $self = shift;
1129     $self->dbh_do('_dbh_execute', @_)
1130 }
1131
1132 sub insert {
1133   my ($self, $source, $to_insert) = @_;
1134
1135   my $ident = $source->from;
1136   my $bind_attributes = $self->source_bind_attributes($source);
1137
1138   my $updated_cols = {};
1139
1140   $self->ensure_connected;
1141   foreach my $col ( $source->columns ) {
1142     if ( !defined $to_insert->{$col} ) {
1143       my $col_info = $source->column_info($col);
1144
1145       if ( $col_info->{auto_nextval} ) {
1146         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1147       }
1148     }
1149   }
1150
1151   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1152
1153   return $updated_cols;
1154 }
1155
1156 ## Still not quite perfect, and EXPERIMENTAL
1157 ## Currently it is assumed that all values passed will be "normal", i.e. not
1158 ## scalar refs, or at least, all the same type as the first set, the statement is
1159 ## only prepped once.
1160 sub insert_bulk {
1161   my ($self, $source, $cols, $data) = @_;
1162   my %colvalues;
1163   my $table = $source->from;
1164   @colvalues{@$cols} = (0..$#$cols);
1165   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1166
1167   $self->_query_start( $sql, @bind );
1168   my $sth = $self->sth($sql);
1169
1170 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1171
1172   ## This must be an arrayref, else nothing works!
1173   my $tuple_status = [];
1174
1175   ## Get the bind_attributes, if any exist
1176   my $bind_attributes = $self->source_bind_attributes($source);
1177
1178   ## Bind the values and execute
1179   my $placeholder_index = 1;
1180
1181   foreach my $bound (@bind) {
1182
1183     my $attributes = {};
1184     my ($column_name, $data_index) = @$bound;
1185
1186     if( $bind_attributes ) {
1187       $attributes = $bind_attributes->{$column_name}
1188       if defined $bind_attributes->{$column_name};
1189     }
1190
1191     my @data = map { $_->[$data_index] } @$data;
1192
1193     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1194     $placeholder_index++;
1195   }
1196   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1197   if (my $err = $@) {
1198     my $i = 0;
1199     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1200
1201     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1202       if ($i > $#$tuple_status);
1203
1204     require Data::Dumper;
1205     local $Data::Dumper::Terse = 1;
1206     local $Data::Dumper::Indent = 1;
1207     local $Data::Dumper::Useqq = 1;
1208     local $Data::Dumper::Quotekeys = 0;
1209
1210     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1211       $tuple_status->[$i][1],
1212       Data::Dumper::Dumper(
1213         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1214       ),
1215     );
1216   }
1217   $self->throw_exception($sth->errstr) if !$rv;
1218
1219   $self->_query_end( $sql, @bind );
1220   return (wantarray ? ($rv, $sth, @bind) : $rv);
1221 }
1222
1223 sub update {
1224   my $self = shift @_;
1225   my $source = shift @_;
1226   my $bind_attributes = $self->source_bind_attributes($source);
1227
1228   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1229 }
1230
1231
1232 sub delete {
1233   my $self = shift @_;
1234   my $source = shift @_;
1235
1236   my $bind_attrs = $self->source_bind_attributes($source);
1237
1238   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1239 }
1240
1241 # We were sent here because the $rs contains a complex search
1242 # which will require a subquery to select the correct rows
1243 # (i.e. joined or limited resultsets)
1244 #
1245 # Genarating a single PK column subquery is trivial and supported
1246 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1247 # Look at _multipk_update_delete()
1248 sub _subq_update_delete {
1249   my $self = shift;
1250   my ($rs, $op, $values) = @_;
1251
1252   my $rsrc = $rs->result_source;
1253
1254   # we already check this, but double check naively just in case. Should be removed soon
1255   my $sel = $rs->_resolved_attrs->{select};
1256   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1257   my @pcols = $rsrc->primary_columns;
1258   if (@$sel != @pcols) {
1259     $self->throw_exception (
1260       'Subquery update/delete can not be called on resultsets selecting a'
1261      .' number of columns different than the number of primary keys'
1262     );
1263   }
1264
1265   if (@pcols == 1) {
1266     return $self->$op (
1267       $rsrc,
1268       $op eq 'update' ? $values : (),
1269       { $pcols[0] => { -in => $rs->as_query } },
1270     );
1271   }
1272
1273   else {
1274     return $self->_multipk_update_delete (@_);
1275   }
1276 }
1277
1278 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1279 # resultset update/delete involving subqueries. So by default resort
1280 # to simple (and inefficient) delete_all style per-row opearations,
1281 # while allowing specific storages to override this with a faster
1282 # implementation.
1283 #
1284 sub _multipk_update_delete {
1285   return shift->_per_row_update_delete (@_);
1286 }
1287
1288 # This is the default loop used to delete/update rows for multi PK
1289 # resultsets, and used by mysql exclusively (because it can't do anything
1290 # else).
1291 #
1292 # We do not use $row->$op style queries, because resultset update/delete
1293 # is not expected to cascade (this is what delete_all/update_all is for).
1294 #
1295 # There should be no race conditions as the entire operation is rolled
1296 # in a transaction.
1297 #
1298 sub _per_row_update_delete {
1299   my $self = shift;
1300   my ($rs, $op, $values) = @_;
1301
1302   my $rsrc = $rs->result_source;
1303   my @pcols = $rsrc->primary_columns;
1304
1305   my $guard = $self->txn_scope_guard;
1306
1307   # emulate the return value of $sth->execute for non-selects
1308   my $row_cnt = '0E0';
1309
1310   my $subrs_cur = $rs->cursor;
1311   while (my @pks = $subrs_cur->next) {
1312
1313     my $cond;
1314     for my $i (0.. $#pcols) {
1315       $cond->{$pcols[$i]} = $pks[$i];
1316     }
1317
1318     $self->$op (
1319       $rsrc,
1320       $op eq 'update' ? $values : (),
1321       $cond,
1322     );
1323
1324     $row_cnt++;
1325   }
1326
1327   $guard->commit;
1328
1329   return $row_cnt;
1330 }
1331
1332 sub _select {
1333   my $self = shift;
1334
1335   # localization is neccessary as
1336   # 1) there is no infrastructure to pass this around before SQLA2
1337   # 2) _select_args sets it and _prep_for_execute consumes it
1338   my $sql_maker = $self->sql_maker;
1339   local $sql_maker->{_dbic_rs_attrs};
1340
1341   return $self->_execute($self->_select_args(@_));
1342 }
1343
1344 sub _select_args_to_query {
1345   my $self = shift;
1346
1347   # localization is neccessary as
1348   # 1) there is no infrastructure to pass this around before SQLA2
1349   # 2) _select_args sets it and _prep_for_execute consumes it
1350   my $sql_maker = $self->sql_maker;
1351   local $sql_maker->{_dbic_rs_attrs};
1352
1353   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1354   #  = $self->_select_args($ident, $select, $cond, $attrs);
1355   my ($op, $bind, $ident, $bind_attrs, @args) =
1356     $self->_select_args(@_);
1357
1358   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1359   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1360   $prepared_bind ||= [];
1361
1362   return wantarray
1363     ? ($sql, $prepared_bind, $bind_attrs)
1364     : \[ "($sql)", @$prepared_bind ]
1365   ;
1366 }
1367
1368 sub _select_args {
1369   my ($self, $ident, $select, $where, $attrs) = @_;
1370
1371   my $sql_maker = $self->sql_maker;
1372   $sql_maker->{_dbic_rs_attrs} = {
1373     %$attrs,
1374     select => $select,
1375     from => $ident,
1376     where => $where,
1377   };
1378
1379   my ($alias2source, $root_alias) = $self->_resolve_ident_sources ($ident);
1380
1381   # calculate bind_attrs before possible $ident mangling
1382   my $bind_attrs = {};
1383   for my $alias (keys %$alias2source) {
1384     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1385     for my $col (keys %$bindtypes) {
1386
1387       my $fqcn = join ('.', $alias, $col);
1388       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1389
1390       # so that unqualified searches can be bound too
1391       $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq $root_alias;
1392     }
1393   }
1394
1395   my @limit;
1396   if ($attrs->{software_limit} ||
1397       $sql_maker->_default_limit_syntax eq "GenericSubQ") {
1398         $attrs->{software_limit} = 1;
1399   } else {
1400     $self->throw_exception("rows attribute must be positive if present")
1401       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1402
1403     # MySQL actually recommends this approach.  I cringe.
1404     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1405
1406     if ($attrs->{rows} && keys %{$attrs->{collapse}}) {
1407       ($ident, $select, $where, $attrs)
1408         = $self->_adjust_select_args_for_limited_prefetch ($ident, $select, $where, $attrs);
1409     }
1410     else {
1411       push @limit, $attrs->{rows}, $attrs->{offset};
1412     }
1413   }
1414
1415 ###
1416   # This would be the point to deflate anything found in $where
1417   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1418   # expect a row object. And all we have is a resultsource (it is trivial
1419   # to extract deflator coderefs via $alias2source above).
1420   #
1421   # I don't see a way forward other than changing the way deflators are
1422   # invoked, and that's just bad...
1423 ###
1424
1425   my $order = { map
1426     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1427     (qw/order_by group_by having _virtual_order_by/ )
1428   };
1429
1430
1431   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1432 }
1433
1434 sub _adjust_select_args_for_limited_prefetch {
1435   my ($self, $from, $select, $where, $attrs) = @_;
1436
1437   if ($attrs->{group_by} && @{$attrs->{group_by}}) {
1438     $self->throw_exception ('has_many prefetch with limit (rows/offset) is not supported on grouped resultsets');
1439   }
1440
1441   $self->throw_exception ('has_many prefetch with limit (rows/offset) is not supported on resultsets with a custom from attribute')
1442     if (ref $from ne 'ARRAY');
1443
1444
1445   # separate attributes
1446   my $sub_attrs = { %$attrs };
1447   delete $attrs->{$_} for qw/where bind rows offset/;
1448   delete $sub_attrs->{$_} for qw/for collapse select as order_by/;
1449
1450   my $alias = $attrs->{alias};
1451
1452   # create subquery select list
1453   my $sub_select = [ grep { $_ =~ /^$alias\./ } @{$attrs->{select}} ];
1454
1455   # bring over all non-collapse-induced order_by into the inner query (if any)
1456   # the outer one will have to keep them all
1457   if (my $ord_cnt = @{$attrs->{order_by}} - @{$attrs->{_collapse_order_by}} ) {
1458     $sub_attrs->{order_by} = [
1459       @{$attrs->{order_by}}[ 0 .. ($#{$attrs->{order_by}} - $ord_cnt - 1) ]
1460     ];
1461   }
1462
1463   # mangle {from}
1464   $from = [ @$from ];
1465   my $select_root = shift @$from;
1466   my @outer_from = @$from;
1467
1468   my %inner_joins;
1469   my %join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1470
1471   # in complex search_related chains $alias may *not* be 'me'
1472   # so always include it in the inner join, and also shift away
1473   # from the outer stack, so that the two datasets actually do
1474   # meet
1475   if ($select_root->{-alias} ne $alias) {
1476     $inner_joins{$alias} = 1;
1477
1478     while (@outer_from && $outer_from[0][0]{-alias} ne $alias) {
1479       shift @outer_from;
1480     }
1481     if (! @outer_from) {
1482       $self->throw_exception ("Unable to find '$alias' in the {from} stack, something is wrong");
1483     }
1484
1485     shift @outer_from; # the new subquery will represent this alias, so get rid of it
1486   }
1487
1488
1489   # decide which parts of the join will remain on the inside
1490   #
1491   # this is not a very viable optimisation, but it was written
1492   # before I realised this, so might as well remain. We can throw
1493   # away _any_ branches of the join tree that are:
1494   # 1) not mentioned in the condition/order
1495   # 2) left-join leaves (or left-join leaf chains)
1496   # Most of the join conditions will not satisfy this, but for real
1497   # complex queries some might, and we might make some RDBMS happy.
1498   #
1499   #
1500   # since we do not have introspectable SQLA, we fall back to ugly
1501   # scanning of raw SQL for WHERE, and for pieces of ORDER BY
1502   # in order to determine what goes into %inner_joins
1503   # It may not be very efficient, but it's a reasonable stop-gap
1504   {
1505     # produce stuff unquoted, so it can be scanned
1506     my $sql_maker = $self->sql_maker;
1507     local $sql_maker->{quote_char};
1508
1509     my @order_by = (map
1510       { ref $_ ? $_->[0] : $_ }
1511       $sql_maker->_order_by_chunks ($sub_attrs->{order_by})
1512     );
1513
1514     my $where_sql = $sql_maker->where ($where);
1515
1516     # sort needed joins
1517     for my $alias (keys %join_info) {
1518
1519       # any table alias found on a column name in where or order_by
1520       # gets included in %inner_joins
1521       # Also any parent joins that are needed to reach this particular alias
1522       for my $piece ($where_sql, @order_by ) {
1523         if ($piece =~ /\b$alias\./) {
1524           $inner_joins{$alias} = 1;
1525         }
1526       }
1527     }
1528   }
1529
1530   # scan for non-leaf/non-left joins and mark as needed
1531   # also mark all ancestor joins that are needed to reach this particular alias
1532   # (e.g.  join => { cds => 'tracks' } - tracks will bring cds too )
1533   #
1534   # traverse by the size of the -join_path i.e. reverse depth first
1535   for my $alias (sort { @{$join_info{$b}{-join_path}} <=> @{$join_info{$a}{-join_path}} } (keys %join_info) ) {
1536
1537     my $j = $join_info{$alias};
1538     $inner_joins{$alias} = 1 if (! $j->{-join_type} || ($j->{-join_type} !~ /^left$/i) );
1539
1540     if ($inner_joins{$alias}) {
1541       $inner_joins{$_} = 1 for (@{$j->{-join_path}});
1542     }
1543   }
1544
1545   # construct the inner $from for the subquery
1546   my $inner_from = [ $select_root ];
1547   for my $j (@$from) {
1548     push @$inner_from, $j if $inner_joins{$j->[0]{-alias}};
1549   }
1550
1551   # if a multi-type join was needed in the subquery ("multi" is indicated by
1552   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1553
1554   for my $alias (keys %inner_joins) {
1555
1556     # the dot comes from some weirdness in collapse
1557     # remove after the rewrite
1558     if ($attrs->{collapse}{".$alias"}) {
1559       $sub_attrs->{group_by} = $sub_select;
1560       last;
1561     }
1562   }
1563
1564   # generate the subquery
1565   my $subq = $self->_select_args_to_query (
1566     $inner_from,
1567     $sub_select,
1568     $where,
1569     $sub_attrs
1570   );
1571
1572   # put it in the new {from}
1573   unshift @outer_from, {
1574     -alias => $alias,
1575     -source_handle => $select_root->{-source_handle},
1576     $alias => $subq,
1577   };
1578
1579   # This is totally horrific - the $where ends up in both the inner and outer query
1580   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1581   # then if where conditions apply to the *right* side of the prefetch, you may have
1582   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
1583   # the outer select to exclude joins you didin't want in the first place
1584   #
1585   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1586   return (\@outer_from, $select, $where, $attrs);
1587 }
1588
1589 sub _resolve_ident_sources {
1590   my ($self, $ident) = @_;
1591
1592   my $alias2source = {};
1593   my $root_alias;
1594
1595   # the reason this is so contrived is that $ident may be a {from}
1596   # structure, specifying multiple tables to join
1597   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1598     # this is compat mode for insert/update/delete which do not deal with aliases
1599     $alias2source->{me} = $ident;
1600     $root_alias = 'me';
1601   }
1602   elsif (ref $ident eq 'ARRAY') {
1603
1604     for (@$ident) {
1605       my $tabinfo;
1606       if (ref $_ eq 'HASH') {
1607         $tabinfo = $_;
1608         $root_alias = $tabinfo->{-alias};
1609       }
1610       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1611         $tabinfo = $_->[0];
1612       }
1613
1614       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1615         if ($tabinfo->{-source_handle});
1616     }
1617   }
1618
1619   return ($alias2source, $root_alias);
1620 }
1621
1622 # Takes $ident, \@column_names
1623 #
1624 # returns { $column_name => \%column_info, ... }
1625 # also note: this adds -result_source => $rsrc to the column info
1626 #
1627 # usage:
1628 #   my $col_sources = $self->_resolve_column_info($ident, [map $_->[0], @{$bind}]);
1629 sub _resolve_column_info {
1630   my ($self, $ident, $colnames) = @_;
1631   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
1632
1633   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1634   $sep = "\Q$sep\E";
1635
1636   my (%return, %converted);
1637   foreach my $col (@$colnames) {
1638     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
1639
1640     # deal with unqualified cols - we assume the main alias for all
1641     # unqualified ones, ugly but can't think of anything better right now
1642     $alias ||= $root_alias;
1643
1644     my $rsrc = $alias2src->{$alias};
1645     $return{$col} = $rsrc && { %{$rsrc->column_info($colname)}, -result_source => $rsrc };
1646   }
1647   return \%return;
1648 }
1649
1650 # Returns a counting SELECT for a simple count
1651 # query. Abstracted so that a storage could override
1652 # this to { count => 'firstcol' } or whatever makes
1653 # sense as a performance optimization
1654 sub _count_select {
1655   #my ($self, $source, $rs_attrs) = @_;
1656   return { count => '*' };
1657 }
1658
1659 # Returns a SELECT which will end up in the subselect
1660 # There may or may not be a group_by, as the subquery
1661 # might have been called to accomodate a limit
1662 #
1663 # Most databases would be happy with whatever ends up
1664 # here, but some choke in various ways.
1665 #
1666 sub _subq_count_select {
1667   my ($self, $source, $rs_attrs) = @_;
1668   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1669
1670   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1671   return @pcols ? \@pcols : [ 1 ];
1672 }
1673
1674
1675 sub source_bind_attributes {
1676   my ($self, $source) = @_;
1677
1678   my $bind_attributes;
1679   foreach my $column ($source->columns) {
1680
1681     my $data_type = $source->column_info($column)->{data_type} || '';
1682     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1683      if $data_type;
1684   }
1685
1686   return $bind_attributes;
1687 }
1688
1689 =head2 select
1690
1691 =over 4
1692
1693 =item Arguments: $ident, $select, $condition, $attrs
1694
1695 =back
1696
1697 Handle a SQL select statement.
1698
1699 =cut
1700
1701 sub select {
1702   my $self = shift;
1703   my ($ident, $select, $condition, $attrs) = @_;
1704   return $self->cursor_class->new($self, \@_, $attrs);
1705 }
1706
1707 sub select_single {
1708   my $self = shift;
1709   my ($rv, $sth, @bind) = $self->_select(@_);
1710   my @row = $sth->fetchrow_array;
1711   my @nextrow = $sth->fetchrow_array if @row;
1712   if(@row && @nextrow) {
1713     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1714   }
1715   # Need to call finish() to work round broken DBDs
1716   $sth->finish();
1717   return @row;
1718 }
1719
1720 =head2 sth
1721
1722 =over 4
1723
1724 =item Arguments: $sql
1725
1726 =back
1727
1728 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1729
1730 =cut
1731
1732 sub _dbh_sth {
1733   my ($self, $dbh, $sql) = @_;
1734
1735   # 3 is the if_active parameter which avoids active sth re-use
1736   my $sth = $self->disable_sth_caching
1737     ? $dbh->prepare($sql)
1738     : $dbh->prepare_cached($sql, {}, 3);
1739
1740   # XXX You would think RaiseError would make this impossible,
1741   #  but apparently that's not true :(
1742   $self->throw_exception($dbh->errstr) if !$sth;
1743
1744   $sth;
1745 }
1746
1747 sub sth {
1748   my ($self, $sql) = @_;
1749   $self->dbh_do('_dbh_sth', $sql);
1750 }
1751
1752 sub _dbh_columns_info_for {
1753   my ($self, $dbh, $table) = @_;
1754
1755   if ($dbh->can('column_info')) {
1756     my %result;
1757     eval {
1758       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1759       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1760       $sth->execute();
1761       while ( my $info = $sth->fetchrow_hashref() ){
1762         my %column_info;
1763         $column_info{data_type}   = $info->{TYPE_NAME};
1764         $column_info{size}      = $info->{COLUMN_SIZE};
1765         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1766         $column_info{default_value} = $info->{COLUMN_DEF};
1767         my $col_name = $info->{COLUMN_NAME};
1768         $col_name =~ s/^\"(.*)\"$/$1/;
1769
1770         $result{$col_name} = \%column_info;
1771       }
1772     };
1773     return \%result if !$@ && scalar keys %result;
1774   }
1775
1776   my %result;
1777   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1778   $sth->execute;
1779   my @columns = @{$sth->{NAME_lc}};
1780   for my $i ( 0 .. $#columns ){
1781     my %column_info;
1782     $column_info{data_type} = $sth->{TYPE}->[$i];
1783     $column_info{size} = $sth->{PRECISION}->[$i];
1784     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1785
1786     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1787       $column_info{data_type} = $1;
1788       $column_info{size}    = $2;
1789     }
1790
1791     $result{$columns[$i]} = \%column_info;
1792   }
1793   $sth->finish;
1794
1795   foreach my $col (keys %result) {
1796     my $colinfo = $result{$col};
1797     my $type_num = $colinfo->{data_type};
1798     my $type_name;
1799     if(defined $type_num && $dbh->can('type_info')) {
1800       my $type_info = $dbh->type_info($type_num);
1801       $type_name = $type_info->{TYPE_NAME} if $type_info;
1802       $colinfo->{data_type} = $type_name if $type_name;
1803     }
1804   }
1805
1806   return \%result;
1807 }
1808
1809 sub columns_info_for {
1810   my ($self, $table) = @_;
1811   $self->dbh_do('_dbh_columns_info_for', $table);
1812 }
1813
1814 =head2 last_insert_id
1815
1816 Return the row id of the last insert.
1817
1818 =cut
1819
1820 sub _dbh_last_insert_id {
1821     # All Storage's need to register their own _dbh_last_insert_id
1822     # the old SQLite-based method was highly inappropriate
1823
1824     my $self = shift;
1825     my $class = ref $self;
1826     $self->throw_exception (<<EOE);
1827
1828 No _dbh_last_insert_id() method found in $class.
1829 Since the method of obtaining the autoincrement id of the last insert
1830 operation varies greatly between different databases, this method must be
1831 individually implemented for every storage class.
1832 EOE
1833 }
1834
1835 sub last_insert_id {
1836   my $self = shift;
1837   $self->dbh_do('_dbh_last_insert_id', @_);
1838 }
1839
1840 =head2 sqlt_type
1841
1842 Returns the database driver name.
1843
1844 =cut
1845
1846 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1847
1848 =head2 bind_attribute_by_data_type
1849
1850 Given a datatype from column info, returns a database specific bind
1851 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1852 let the database planner just handle it.
1853
1854 Generally only needed for special case column types, like bytea in postgres.
1855
1856 =cut
1857
1858 sub bind_attribute_by_data_type {
1859     return;
1860 }
1861
1862 =head2 is_datatype_numeric
1863
1864 Given a datatype from column_info, returns a boolean value indicating if
1865 the current RDBMS considers it a numeric value. This controls how
1866 L<DBIx::Class::Row/set_column> decides whether to mark the column as
1867 dirty - when the datatype is deemed numeric a C<< != >> comparison will
1868 be performed instead of the usual C<eq>.
1869
1870 =cut
1871
1872 sub is_datatype_numeric {
1873   my ($self, $dt) = @_;
1874
1875   return 0 unless $dt;
1876
1877   return $dt =~ /^ (?:
1878     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
1879   ) $/ix;
1880 }
1881
1882
1883 =head2 create_ddl_dir (EXPERIMENTAL)
1884
1885 =over 4
1886
1887 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1888
1889 =back
1890
1891 Creates a SQL file based on the Schema, for each of the specified
1892 database engines in C<\@databases> in the given directory.
1893 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
1894
1895 Given a previous version number, this will also create a file containing
1896 the ALTER TABLE statements to transform the previous schema into the
1897 current one. Note that these statements may contain C<DROP TABLE> or
1898 C<DROP COLUMN> statements that can potentially destroy data.
1899
1900 The file names are created using the C<ddl_filename> method below, please
1901 override this method in your schema if you would like a different file
1902 name format. For the ALTER file, the same format is used, replacing
1903 $version in the name with "$preversion-$version".
1904
1905 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
1906 The most common value for this would be C<< { add_drop_table => 1 } >>
1907 to have the SQL produced include a C<DROP TABLE> statement for each table
1908 created. For quoting purposes supply C<quote_table_names> and
1909 C<quote_field_names>.
1910
1911 If no arguments are passed, then the following default values are assumed:
1912
1913 =over 4
1914
1915 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
1916
1917 =item version    - $schema->schema_version
1918
1919 =item directory  - './'
1920
1921 =item preversion - <none>
1922
1923 =back
1924
1925 By default, C<\%sqlt_args> will have
1926
1927  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1928
1929 merged with the hash passed in. To disable any of those features, pass in a
1930 hashref like the following
1931
1932  { ignore_constraint_names => 0, # ... other options }
1933
1934
1935 Note that this feature is currently EXPERIMENTAL and may not work correctly
1936 across all databases, or fully handle complex relationships.
1937
1938 WARNING: Please check all SQL files created, before applying them.
1939
1940 =cut
1941
1942 sub create_ddl_dir {
1943   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1944
1945   if(!$dir || !-d $dir) {
1946     carp "No directory given, using ./\n";
1947     $dir = "./";
1948   }
1949   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1950   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1951
1952   my $schema_version = $schema->schema_version || '1.x';
1953   $version ||= $schema_version;
1954
1955   $sqltargs = {
1956     add_drop_table => 1,
1957     ignore_constraint_names => 1,
1958     ignore_index_names => 1,
1959     %{$sqltargs || {}}
1960   };
1961
1962   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1963       . $self->_check_sqlt_message . q{'})
1964           if !$self->_check_sqlt_version;
1965
1966   my $sqlt = SQL::Translator->new( $sqltargs );
1967
1968   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1969   my $sqlt_schema = $sqlt->translate({ data => $schema })
1970     or $self->throw_exception ($sqlt->error);
1971
1972   foreach my $db (@$databases) {
1973     $sqlt->reset();
1974     $sqlt->{schema} = $sqlt_schema;
1975     $sqlt->producer($db);
1976
1977     my $file;
1978     my $filename = $schema->ddl_filename($db, $version, $dir);
1979     if (-e $filename && ($version eq $schema_version )) {
1980       # if we are dumping the current version, overwrite the DDL
1981       carp "Overwriting existing DDL file - $filename";
1982       unlink($filename);
1983     }
1984
1985     my $output = $sqlt->translate;
1986     if(!$output) {
1987       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1988       next;
1989     }
1990     if(!open($file, ">$filename")) {
1991       $self->throw_exception("Can't open $filename for writing ($!)");
1992       next;
1993     }
1994     print $file $output;
1995     close($file);
1996
1997     next unless ($preversion);
1998
1999     require SQL::Translator::Diff;
2000
2001     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2002     if(!-e $prefilename) {
2003       carp("No previous schema file found ($prefilename)");
2004       next;
2005     }
2006
2007     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2008     if(-e $difffile) {
2009       carp("Overwriting existing diff file - $difffile");
2010       unlink($difffile);
2011     }
2012
2013     my $source_schema;
2014     {
2015       my $t = SQL::Translator->new($sqltargs);
2016       $t->debug( 0 );
2017       $t->trace( 0 );
2018
2019       $t->parser( $db )
2020         or $self->throw_exception ($t->error);
2021
2022       my $out = $t->translate( $prefilename )
2023         or $self->throw_exception ($t->error);
2024
2025       $source_schema = $t->schema;
2026
2027       $source_schema->name( $prefilename )
2028         unless ( $source_schema->name );
2029     }
2030
2031     # The "new" style of producers have sane normalization and can support
2032     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2033     # And we have to diff parsed SQL against parsed SQL.
2034     my $dest_schema = $sqlt_schema;
2035
2036     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2037       my $t = SQL::Translator->new($sqltargs);
2038       $t->debug( 0 );
2039       $t->trace( 0 );
2040
2041       $t->parser( $db )
2042         or $self->throw_exception ($t->error);
2043
2044       my $out = $t->translate( $filename )
2045         or $self->throw_exception ($t->error);
2046
2047       $dest_schema = $t->schema;
2048
2049       $dest_schema->name( $filename )
2050         unless $dest_schema->name;
2051     }
2052
2053     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2054                                                   $dest_schema,   $db,
2055                                                   $sqltargs
2056                                                  );
2057     if(!open $file, ">$difffile") {
2058       $self->throw_exception("Can't write to $difffile ($!)");
2059       next;
2060     }
2061     print $file $diff;
2062     close($file);
2063   }
2064 }
2065
2066 =head2 deployment_statements
2067
2068 =over 4
2069
2070 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2071
2072 =back
2073
2074 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2075
2076 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2077 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2078
2079 C<$directory> is used to return statements from files in a previously created
2080 L</create_ddl_dir> directory and is optional. The filenames are constructed
2081 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2082
2083 If no C<$directory> is specified then the statements are constructed on the
2084 fly using L<SQL::Translator> and C<$version> is ignored.
2085
2086 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2087
2088 =cut
2089
2090 sub deployment_statements {
2091   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2092   # Need to be connected to get the correct sqlt_type
2093   $self->ensure_connected() unless $type;
2094   $type ||= $self->sqlt_type;
2095   $version ||= $schema->schema_version || '1.x';
2096   $dir ||= './';
2097   my $filename = $schema->ddl_filename($type, $version, $dir);
2098   if(-f $filename)
2099   {
2100       my $file;
2101       open($file, "<$filename")
2102         or $self->throw_exception("Can't open $filename ($!)");
2103       my @rows = <$file>;
2104       close($file);
2105       return join('', @rows);
2106   }
2107
2108   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
2109       . $self->_check_sqlt_message . q{'})
2110           if !$self->_check_sqlt_version;
2111
2112   require SQL::Translator::Parser::DBIx::Class;
2113   eval qq{use SQL::Translator::Producer::${type}};
2114   $self->throw_exception($@) if $@;
2115
2116   # sources needs to be a parser arg, but for simplicty allow at top level
2117   # coming in
2118   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2119       if exists $sqltargs->{sources};
2120
2121   my $tr = SQL::Translator->new(%$sqltargs);
2122   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
2123   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
2124 }
2125
2126 sub deploy {
2127   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2128   my $deploy = sub {
2129     my $line = shift;
2130     return if($line =~ /^--/);
2131     return if(!$line);
2132     # next if($line =~ /^DROP/m);
2133     return if($line =~ /^BEGIN TRANSACTION/m);
2134     return if($line =~ /^COMMIT/m);
2135     return if $line =~ /^\s+$/; # skip whitespace only
2136     $self->_query_start($line);
2137     eval {
2138       $self->dbh->do($line); # shouldn't be using ->dbh ?
2139     };
2140     if ($@) {
2141       carp qq{$@ (running "${line}")};
2142     }
2143     $self->_query_end($line);
2144   };
2145   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2146   if (@statements > 1) {
2147     foreach my $statement (@statements) {
2148       $deploy->( $statement );
2149     }
2150   }
2151   elsif (@statements == 1) {
2152     foreach my $line ( split(";\n", $statements[0])) {
2153       $deploy->( $line );
2154     }
2155   }
2156 }
2157
2158 =head2 datetime_parser
2159
2160 Returns the datetime parser class
2161
2162 =cut
2163
2164 sub datetime_parser {
2165   my $self = shift;
2166   return $self->{datetime_parser} ||= do {
2167     $self->ensure_connected;
2168     $self->build_datetime_parser(@_);
2169   };
2170 }
2171
2172 =head2 datetime_parser_type
2173
2174 Defines (returns) the datetime parser class - currently hardwired to
2175 L<DateTime::Format::MySQL>
2176
2177 =cut
2178
2179 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2180
2181 =head2 build_datetime_parser
2182
2183 See L</datetime_parser>
2184
2185 =cut
2186
2187 sub build_datetime_parser {
2188   my $self = shift;
2189   my $type = $self->datetime_parser_type(@_);
2190   eval "use ${type}";
2191   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2192   return $type;
2193 }
2194
2195 {
2196     my $_check_sqlt_version; # private
2197     my $_check_sqlt_message; # private
2198     sub _check_sqlt_version {
2199         return $_check_sqlt_version if defined $_check_sqlt_version;
2200         eval 'use SQL::Translator "0.09003"';
2201         $_check_sqlt_message = $@ || '';
2202         $_check_sqlt_version = !$@;
2203     }
2204
2205     sub _check_sqlt_message {
2206         _check_sqlt_version if !defined $_check_sqlt_message;
2207         $_check_sqlt_message;
2208     }
2209 }
2210
2211 =head2 is_replicating
2212
2213 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2214 replicate from a master database.  Default is undef, which is the result
2215 returned by databases that don't support replication.
2216
2217 =cut
2218
2219 sub is_replicating {
2220     return;
2221
2222 }
2223
2224 =head2 lag_behind_master
2225
2226 Returns a number that represents a certain amount of lag behind a master db
2227 when a given storage is replicating.  The number is database dependent, but
2228 starts at zero and increases with the amount of lag. Default in undef
2229
2230 =cut
2231
2232 sub lag_behind_master {
2233     return;
2234 }
2235
2236 sub DESTROY {
2237   my $self = shift;
2238   return if !$self->_dbh;
2239   $self->_verify_pid;
2240   $self->_dbh(undef);
2241 }
2242
2243 1;
2244
2245 =head1 USAGE NOTES
2246
2247 =head2 DBIx::Class and AutoCommit
2248
2249 DBIx::Class can do some wonderful magic with handling exceptions,
2250 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2251 combined with C<txn_do> for transaction support.
2252
2253 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2254 in an assumed transaction between commits, and you're telling us you'd
2255 like to manage that manually.  A lot of the magic protections offered by
2256 this module will go away.  We can't protect you from exceptions due to database
2257 disconnects because we don't know anything about how to restart your
2258 transactions.  You're on your own for handling all sorts of exceptional
2259 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2260 be with raw DBI.
2261
2262
2263
2264 =head1 AUTHORS
2265
2266 Matt S. Trout <mst@shadowcatsystems.co.uk>
2267
2268 Andy Grundman <andy@hybridized.org>
2269
2270 =head1 LICENSE
2271
2272 You may distribute this code under the same terms as Perl itself.
2273
2274 =cut