749c7901d44a50c801382f47942497bc09dd91c5
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use Scalar::Util();
13 use List::Util();
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit _on_connect_do
18        _on_disconnect_do _on_connect_do_store _on_disconnect_do_store
19        savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call disable_sth_caching unsafe auto_savepoint
26 /;
27 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
28
29
30 # default cursor class, overridable in connect_info attributes
31 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
32
33 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
34 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
35
36
37 =head1 NAME
38
39 DBIx::Class::Storage::DBI - DBI storage handler
40
41 =head1 SYNOPSIS
42
43   my $schema = MySchema->connect('dbi:SQLite:my.db');
44
45   $schema->storage->debug(1);
46   $schema->dbh_do("DROP TABLE authors");
47
48   $schema->resultset('Book')->search({
49      written_on => $schema->storage->datetime_parser(DateTime->now)
50   });
51
52 =head1 DESCRIPTION
53
54 This class represents the connection to an RDBMS via L<DBI>.  See
55 L<DBIx::Class::Storage> for general information.  This pod only
56 documents DBI-specific methods and behaviors.
57
58 =head1 METHODS
59
60 =cut
61
62 sub new {
63   my $new = shift->next::method(@_);
64
65   $new->transaction_depth(0);
66   $new->_sql_maker_opts({});
67   $new->{savepoints} = [];
68   $new->{_in_dbh_do} = 0;
69   $new->{_dbh_gen} = 0;
70
71   $new;
72 }
73
74 =head2 connect_info
75
76 This method is normally called by L<DBIx::Class::Schema/connection>, which
77 encapsulates its argument list in an arrayref before passing them here.
78
79 The argument list may contain:
80
81 =over
82
83 =item *
84
85 The same 4-element argument set one would normally pass to
86 L<DBI/connect>, optionally followed by
87 L<extra attributes|/DBIx::Class specific connection attributes>
88 recognized by DBIx::Class:
89
90   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
91
92 =item *
93
94 A single code reference which returns a connected 
95 L<DBI database handle|DBI/connect> optionally followed by 
96 L<extra attributes|/DBIx::Class specific connection attributes> recognized
97 by DBIx::Class:
98
99   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
100
101 =item *
102
103 A single hashref with all the attributes and the dsn/user/password
104 mixed together:
105
106   $connect_info_args = [{
107     dsn => $dsn,
108     user => $user,
109     password => $pass,
110     %dbi_attributes,
111     %extra_attributes,
112   }];
113
114 This is particularly useful for L<Catalyst> based applications, allowing the 
115 following config (L<Config::General> style):
116
117   <Model::DB>
118     schema_class   App::DB
119     <connect_info>
120       dsn          dbi:mysql:database=test
121       user         testuser
122       password     TestPass
123       AutoCommit   1
124     </connect_info>
125   </Model::DB>
126
127 =back
128
129 Please note that the L<DBI> docs recommend that you always explicitly
130 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
131 recommends that it be set to I<1>, and that you perform transactions
132 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
133 to I<1> if you do not do explicitly set it to zero.  This is the default 
134 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
135
136 =head3 DBIx::Class specific connection attributes
137
138 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
139 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
140 the following connection options. These options can be mixed in with your other
141 L<DBI> connection attributes, or placed in a seperate hashref
142 (C<\%extra_attributes>) as shown above.
143
144 Every time C<connect_info> is invoked, any previous settings for
145 these options will be cleared before setting the new ones, regardless of
146 whether any options are specified in the new C<connect_info>.
147
148
149 =over
150
151 =item on_connect_do
152
153 Specifies things to do immediately after connecting or re-connecting to
154 the database.  Its value may contain:
155
156 =over
157
158 =item a scalar
159
160 This contains one SQL statement to execute.
161
162 =item an array reference
163
164 This contains SQL statements to execute in order.  Each element contains
165 a string or a code reference that returns a string.
166
167 =item a code reference
168
169 This contains some code to execute.  Unlike code references within an
170 array reference, its return value is ignored.
171
172 =back
173
174 =item on_disconnect_do
175
176 Takes arguments in the same form as L</on_connect_do> and executes them
177 immediately before disconnecting from the database.
178
179 Note, this only runs if you explicitly call L</disconnect> on the
180 storage object.
181
182 =item on_connect_call
183
184 A more generalized form of L</on_connect_do> that calls the specified
185 C<connect_call_METHOD> methods in your storage driver.
186
187   on_connect_do => 'select 1'
188
189 is equivalent to:
190
191   on_connect_call => [ [ do_sql => 'select 1' ] ]
192
193 Its values may contain:
194
195 =over
196
197 =item a scalar
198
199 Will call the C<connect_call_METHOD> method.
200
201 =item a code reference
202
203 Will execute C<< $code->($storage) >>
204
205 =item an array reference
206
207 Each value can be a method name or code reference.
208
209 =item an array of arrays
210
211 For each array, the first item is taken to be the C<connect_call_> method name
212 or code reference, and the rest are parameters to it.
213
214 =back
215
216 Some predefined storage methods you may use:
217
218 =over
219
220 =item do_sql
221
222 Executes a SQL string or a code reference that returns a SQL string. This is
223 what L</on_connect_do> and L</on_disconnect_do> use.
224
225 It can take:
226
227 =over
228
229 =item a scalar
230
231 Will execute the scalar as SQL.
232
233 =item an arrayref
234
235 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
236 attributes hashref and bind values.
237
238 =item a code reference
239
240 Will execute C<< $code->($storage) >> and execute the return array refs as
241 above.
242
243 =back
244
245 =item datetime_setup
246
247 Execute any statements necessary to initialize the database session to return
248 and accept datetime/timestamp values used with
249 L<DBIx::Class::InflateColumn::DateTime>.
250
251 Only necessary for some databases, see your specific storage driver for
252 implementation details.
253
254 =back
255
256 =item on_disconnect_call
257
258 Takes arguments in the same form as L</on_connect_call> and executes them
259 immediately before disconnecting from the database.
260
261 Calls the C<disconnect_call_METHOD> methods as opposed to the
262 C<connect_call_METHOD> methods called by L</on_connect_call>.
263
264 Note, this only runs if you explicitly call L</disconnect> on the
265 storage object.
266
267 =item disable_sth_caching
268
269 If set to a true value, this option will disable the caching of
270 statement handles via L<DBI/prepare_cached>.
271
272 =item limit_dialect 
273
274 Sets the limit dialect. This is useful for JDBC-bridge among others
275 where the remote SQL-dialect cannot be determined by the name of the
276 driver alone. See also L<SQL::Abstract::Limit>.
277
278 =item quote_char
279
280 Specifies what characters to use to quote table and column names. If 
281 you use this you will want to specify L</name_sep> as well.
282
283 C<quote_char> expects either a single character, in which case is it
284 is placed on either side of the table/column name, or an arrayref of length
285 2 in which case the table/column name is placed between the elements.
286
287 For example under MySQL you should use C<< quote_char => '`' >>, and for
288 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
289
290 =item name_sep
291
292 This only needs to be used in conjunction with C<quote_char>, and is used to 
293 specify the charecter that seperates elements (schemas, tables, columns) from 
294 each other. In most cases this is simply a C<.>.
295
296 The consequences of not supplying this value is that L<SQL::Abstract>
297 will assume DBIx::Class' uses of aliases to be complete column
298 names. The output will look like I<"me.name"> when it should actually
299 be I<"me"."name">.
300
301 =item unsafe
302
303 This Storage driver normally installs its own C<HandleError>, sets
304 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
305 all database handles, including those supplied by a coderef.  It does this
306 so that it can have consistent and useful error behavior.
307
308 If you set this option to a true value, Storage will not do its usual
309 modifications to the database handle's attributes, and instead relies on
310 the settings in your connect_info DBI options (or the values you set in
311 your connection coderef, in the case that you are connecting via coderef).
312
313 Note that your custom settings can cause Storage to malfunction,
314 especially if you set a C<HandleError> handler that suppresses exceptions
315 and/or disable C<RaiseError>.
316
317 =item auto_savepoint
318
319 If this option is true, L<DBIx::Class> will use savepoints when nesting
320 transactions, making it possible to recover from failure in the inner
321 transaction without having to abort all outer transactions.
322
323 =item cursor_class
324
325 Use this argument to supply a cursor class other than the default
326 L<DBIx::Class::Storage::DBI::Cursor>.
327
328 =back
329
330 Some real-life examples of arguments to L</connect_info> and
331 L<DBIx::Class::Schema/connect>
332
333   # Simple SQLite connection
334   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
335
336   # Connect via subref
337   ->connect_info([ sub { DBI->connect(...) } ]);
338
339   # A bit more complicated
340   ->connect_info(
341     [
342       'dbi:Pg:dbname=foo',
343       'postgres',
344       'my_pg_password',
345       { AutoCommit => 1 },
346       { quote_char => q{"}, name_sep => q{.} },
347     ]
348   );
349
350   # Equivalent to the previous example
351   ->connect_info(
352     [
353       'dbi:Pg:dbname=foo',
354       'postgres',
355       'my_pg_password',
356       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
357     ]
358   );
359
360   # Same, but with hashref as argument
361   # See parse_connect_info for explanation
362   ->connect_info(
363     [{
364       dsn         => 'dbi:Pg:dbname=foo',
365       user        => 'postgres',
366       password    => 'my_pg_password',
367       AutoCommit  => 1,
368       quote_char  => q{"},
369       name_sep    => q{.},
370     }]
371   );
372
373   # Subref + DBIx::Class-specific connection options
374   ->connect_info(
375     [
376       sub { DBI->connect(...) },
377       {
378           quote_char => q{`},
379           name_sep => q{@},
380           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
381           disable_sth_caching => 1,
382       },
383     ]
384   );
385
386
387
388 =cut
389
390 sub connect_info {
391   my ($self, $info_arg) = @_;
392
393   return $self->_connect_info if !$info_arg;
394
395   my @args = @$info_arg;  # take a shallow copy for further mutilation
396   $self->_connect_info([@args]); # copy for _connect_info
397
398
399   # combine/pre-parse arguments depending on invocation style
400
401   my %attrs;
402   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
403     %attrs = %{ $args[1] || {} };
404     @args = $args[0];
405   }
406   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
407     %attrs = %{$args[0]};
408     @args = ();
409     for (qw/password user dsn/) {
410       unshift @args, delete $attrs{$_};
411     }
412   }
413   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
414     %attrs = (
415       % { $args[3] || {} },
416       % { $args[4] || {} },
417     );
418     @args = @args[0,1,2];
419   }
420
421   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
422   #  the new set of options
423   $self->_sql_maker(undef);
424   $self->_sql_maker_opts({});
425
426   if(keys %attrs) {
427     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
428       if(my $value = delete $attrs{$storage_opt}) {
429         $self->$storage_opt($value);
430       }
431     }
432     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
433       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
434         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
435       }
436     }
437     for my $connect_do_opt (qw/on_connect_do on_disconnect_do/) {
438       if(my $opt_val = delete $attrs{$connect_do_opt}) {
439         $self->$connect_do_opt($opt_val);
440       }
441     }
442   }
443
444   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
445
446   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
447   $self->_connect_info;
448 }
449
450 =head2 on_connect_do
451
452 This method is deprecated in favour of setting via L</connect_info>.
453
454 =cut
455
456 sub on_connect_do {
457   my $self = shift;
458   $self->_setup_connect_do(on_connect_do => @_);
459 }
460
461 =head2 on_disconnect_do
462
463 This method is deprecated in favour of setting via L</connect_info>.
464
465 =cut
466
467 sub on_disconnect_do {
468   my $self = shift;
469   $self->_setup_connect_do(on_disconnect_do => @_);
470 }
471
472 sub _setup_connect_do {
473   my ($self, $opt) = (shift, shift);
474
475   my $accessor = "_$opt";
476   my $store    = "_${opt}_store";
477
478   return $self->$accessor if not @_;
479
480   my $val = shift;
481
482   if (not defined $val) {
483     $self->$accessor(undef);
484     $self->$store(undef);
485     return;
486   }
487
488   my @store;
489
490   if (not ref($val)) {
491     push @store, [ 'do_sql', $val ];
492   } elsif (ref($val) eq 'CODE') {
493     push @store, $val;
494   } elsif (ref($val) eq 'ARRAY') {
495     push @store, map [ 'do_sql', $_ ], @$val;
496   } else {
497     $self->throw_exception("Invalid type for $opt ".ref($val));
498   }
499
500   $self->$store(\@store);
501   $self->$accessor($val);
502 }
503
504 =head2 dbh_do
505
506 Arguments: ($subref | $method_name), @extra_coderef_args?
507
508 Execute the given $subref or $method_name using the new exception-based
509 connection management.
510
511 The first two arguments will be the storage object that C<dbh_do> was called
512 on and a database handle to use.  Any additional arguments will be passed
513 verbatim to the called subref as arguments 2 and onwards.
514
515 Using this (instead of $self->_dbh or $self->dbh) ensures correct
516 exception handling and reconnection (or failover in future subclasses).
517
518 Your subref should have no side-effects outside of the database, as
519 there is the potential for your subref to be partially double-executed
520 if the database connection was stale/dysfunctional.
521
522 Example:
523
524   my @stuff = $schema->storage->dbh_do(
525     sub {
526       my ($storage, $dbh, @cols) = @_;
527       my $cols = join(q{, }, @cols);
528       $dbh->selectrow_array("SELECT $cols FROM foo");
529     },
530     @column_list
531   );
532
533 =cut
534
535 sub dbh_do {
536   my $self = shift;
537   my $code = shift;
538
539   my $dbh = $self->_dbh;
540
541   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
542       || $self->{transaction_depth};
543
544   local $self->{_in_dbh_do} = 1;
545
546   my @result;
547   my $want_array = wantarray;
548
549   eval {
550     $self->_verify_pid if $dbh;
551     if(!$self->_dbh) {
552         $self->_populate_dbh;
553         $dbh = $self->_dbh;
554     }
555
556     if($want_array) {
557         @result = $self->$code($dbh, @_);
558     }
559     elsif(defined $want_array) {
560         $result[0] = $self->$code($dbh, @_);
561     }
562     else {
563         $self->$code($dbh, @_);
564     }
565   };
566
567   my $exception = $@;
568   if(!$exception) { return $want_array ? @result : $result[0] }
569
570   $self->throw_exception($exception) if $self->connected;
571
572   # We were not connected - reconnect and retry, but let any
573   #  exception fall right through this time
574   $self->_populate_dbh;
575   $self->$code($self->_dbh, @_);
576 }
577
578 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
579 # It also informs dbh_do to bypass itself while under the direction of txn_do,
580 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
581 sub txn_do {
582   my $self = shift;
583   my $coderef = shift;
584
585   ref $coderef eq 'CODE' or $self->throw_exception
586     ('$coderef must be a CODE reference');
587
588   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
589
590   local $self->{_in_dbh_do} = 1;
591
592   my @result;
593   my $want_array = wantarray;
594
595   my $tried = 0;
596   while(1) {
597     eval {
598       $self->_verify_pid if $self->_dbh;
599       $self->_populate_dbh if !$self->_dbh;
600
601       $self->txn_begin;
602       if($want_array) {
603           @result = $coderef->(@_);
604       }
605       elsif(defined $want_array) {
606           $result[0] = $coderef->(@_);
607       }
608       else {
609           $coderef->(@_);
610       }
611       $self->txn_commit;
612     };
613
614     my $exception = $@;
615     if(!$exception) { return $want_array ? @result : $result[0] }
616
617     if($tried++ > 0 || $self->connected) {
618       eval { $self->txn_rollback };
619       my $rollback_exception = $@;
620       if($rollback_exception) {
621         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
622         $self->throw_exception($exception)  # propagate nested rollback
623           if $rollback_exception =~ /$exception_class/;
624
625         $self->throw_exception(
626           "Transaction aborted: ${exception}. "
627           . "Rollback failed: ${rollback_exception}"
628         );
629       }
630       $self->throw_exception($exception)
631     }
632
633     # We were not connected, and was first try - reconnect and retry
634     # via the while loop
635     $self->_populate_dbh;
636   }
637 }
638
639 =head2 disconnect
640
641 Our C<disconnect> method also performs a rollback first if the
642 database is not in C<AutoCommit> mode.
643
644 =cut
645
646 sub disconnect {
647   my ($self) = @_;
648
649   if( $self->connected ) {
650     if (my $connection_call = $self->on_disconnect_call) {
651       $self->_do_connection_actions(disconnect_call_ => $connection_call)
652     }
653     if (my $connection_do   = $self->_on_disconnect_do_store) {
654       $self->_do_connection_actions(disconnect_call_ => $connection_do)
655     }
656
657     $self->_dbh->rollback unless $self->_dbh_autocommit;
658     $self->_dbh->disconnect;
659     $self->_dbh(undef);
660     $self->{_dbh_gen}++;
661   }
662 }
663
664 =head2 with_deferred_fk_checks
665
666 =over 4
667
668 =item Arguments: C<$coderef>
669
670 =item Return Value: The return value of $coderef
671
672 =back
673
674 Storage specific method to run the code ref with FK checks deferred or
675 in MySQL's case disabled entirely.
676
677 =cut
678
679 # Storage subclasses should override this
680 sub with_deferred_fk_checks {
681   my ($self, $sub) = @_;
682
683   $sub->();
684 }
685
686 sub connected {
687   my ($self) = @_;
688
689   if(my $dbh = $self->_dbh) {
690       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
691           $self->_dbh(undef);
692           $self->{_dbh_gen}++;
693           return;
694       }
695       else {
696           $self->_verify_pid;
697           return 0 if !$self->_dbh;
698       }
699       return ($dbh->FETCH('Active') && $dbh->ping);
700   }
701
702   return 0;
703 }
704
705 # handle pid changes correctly
706 #  NOTE: assumes $self->_dbh is a valid $dbh
707 sub _verify_pid {
708   my ($self) = @_;
709
710   return if defined $self->_conn_pid && $self->_conn_pid == $$;
711
712   $self->_dbh->{InactiveDestroy} = 1;
713   $self->_dbh(undef);
714   $self->{_dbh_gen}++;
715
716   return;
717 }
718
719 sub ensure_connected {
720   my ($self) = @_;
721
722   unless ($self->connected) {
723     $self->_populate_dbh;
724   }
725 }
726
727 =head2 dbh
728
729 Returns the dbh - a data base handle of class L<DBI>.
730
731 =cut
732
733 sub dbh {
734   my ($self) = @_;
735
736   $self->ensure_connected;
737   return $self->_dbh;
738 }
739
740 sub _sql_maker_args {
741     my ($self) = @_;
742     
743     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
744 }
745
746 sub sql_maker {
747   my ($self) = @_;
748   unless ($self->_sql_maker) {
749     my $sql_maker_class = $self->sql_maker_class;
750     $self->ensure_class_loaded ($sql_maker_class);
751     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
752   }
753   return $self->_sql_maker;
754 }
755
756 sub _rebless {}
757
758 sub _populate_dbh {
759   my ($self) = @_;
760   my @info = @{$self->_dbi_connect_info || []};
761   $self->_dbh($self->_connect(@info));
762
763   $self->_conn_pid($$);
764   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
765
766   $self->_determine_driver;
767
768   # Always set the transaction depth on connect, since
769   #  there is no transaction in progress by definition
770   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
771
772   if (my $connection_call = $self->on_connect_call) {
773     $self->_do_connection_actions(connect_call_ => $connection_call)
774   }
775   if (my $connection_do = $self->_on_connect_do_store) {
776     $self->_do_connection_actions(connect_call_ => $connection_do)
777   }
778 }
779
780 sub _determine_driver {
781   my ($self) = @_;
782
783   if (ref $self eq 'DBIx::Class::Storage::DBI') {
784     my $driver;
785
786     if ($self->_dbh) { # we are connected
787       $driver = $self->_dbh->{Driver}{Name};
788     } else {
789       # try to use dsn to not require being connected, the driver may still
790       # force a connection in _rebless to determine version
791       ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
792     }
793
794     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
795       bless $self, "DBIx::Class::Storage::DBI::${driver}";
796       $self->_rebless();
797     }
798   }
799 }
800
801 sub _do_connection_actions {
802   my $self          = shift;
803   my $method_prefix = shift;
804   my $call          = shift;
805
806   if (not ref($call)) {
807     my $method = $method_prefix . $call;
808     $self->$method(@_);
809   } elsif (ref($call) eq 'CODE') {
810     $self->$call(@_);
811   } elsif (ref($call) eq 'ARRAY') {
812     if (ref($call->[0]) ne 'ARRAY') {
813       $self->_do_connection_actions($method_prefix, $_) for @$call;
814     } else {
815       $self->_do_connection_actions($method_prefix, @$_) for @$call;
816     }
817   } else {
818     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
819   }
820
821   return $self;
822 }
823
824 sub connect_call_do_sql {
825   my $self = shift;
826   $self->_do_query(@_);
827 }
828
829 sub disconnect_call_do_sql {
830   my $self = shift;
831   $self->_do_query(@_);
832 }
833
834 # override in db-specific backend when necessary
835 sub connect_call_datetime_setup { 1 }
836
837 sub _do_query {
838   my ($self, $action) = @_;
839
840   if (ref $action eq 'CODE') {
841     $action = $action->($self);
842     $self->_do_query($_) foreach @$action;
843   }
844   else {
845     # Most debuggers expect ($sql, @bind), so we need to exclude
846     # the attribute hash which is the second argument to $dbh->do
847     # furthermore the bind values are usually to be presented
848     # as named arrayref pairs, so wrap those here too
849     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
850     my $sql = shift @do_args;
851     my $attrs = shift @do_args;
852     my @bind = map { [ undef, $_ ] } @do_args;
853
854     $self->_query_start($sql, @bind);
855     $self->_dbh->do($sql, $attrs, @do_args);
856     $self->_query_end($sql, @bind);
857   }
858
859   return $self;
860 }
861
862 sub _connect {
863   my ($self, @info) = @_;
864
865   $self->throw_exception("You failed to provide any connection info")
866     if !@info;
867
868   my ($old_connect_via, $dbh);
869
870   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
871     $old_connect_via = $DBI::connect_via;
872     $DBI::connect_via = 'connect';
873   }
874
875   eval {
876     if(ref $info[0] eq 'CODE') {
877        $dbh = &{$info[0]}
878     }
879     else {
880        $dbh = DBI->connect(@info);
881     }
882
883     if($dbh && !$self->unsafe) {
884       my $weak_self = $self;
885       Scalar::Util::weaken($weak_self);
886       $dbh->{HandleError} = sub {
887           if ($weak_self) {
888             $weak_self->throw_exception("DBI Exception: $_[0]");
889           }
890           else {
891             croak ("DBI Exception: $_[0]");
892           }
893       };
894       $dbh->{ShowErrorStatement} = 1;
895       $dbh->{RaiseError} = 1;
896       $dbh->{PrintError} = 0;
897     }
898   };
899
900   $DBI::connect_via = $old_connect_via if $old_connect_via;
901
902   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
903     if !$dbh || $@;
904
905   $self->_dbh_autocommit($dbh->{AutoCommit});
906
907   $dbh;
908 }
909
910 sub svp_begin {
911   my ($self, $name) = @_;
912
913   $name = $self->_svp_generate_name
914     unless defined $name;
915
916   $self->throw_exception ("You can't use savepoints outside a transaction")
917     if $self->{transaction_depth} == 0;
918
919   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
920     unless $self->can('_svp_begin');
921   
922   push @{ $self->{savepoints} }, $name;
923
924   $self->debugobj->svp_begin($name) if $self->debug;
925   
926   return $self->_svp_begin($name);
927 }
928
929 sub svp_release {
930   my ($self, $name) = @_;
931
932   $self->throw_exception ("You can't use savepoints outside a transaction")
933     if $self->{transaction_depth} == 0;
934
935   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
936     unless $self->can('_svp_release');
937
938   if (defined $name) {
939     $self->throw_exception ("Savepoint '$name' does not exist")
940       unless grep { $_ eq $name } @{ $self->{savepoints} };
941
942     # Dig through the stack until we find the one we are releasing.  This keeps
943     # the stack up to date.
944     my $svp;
945
946     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
947   } else {
948     $name = pop @{ $self->{savepoints} };
949   }
950
951   $self->debugobj->svp_release($name) if $self->debug;
952
953   return $self->_svp_release($name);
954 }
955
956 sub svp_rollback {
957   my ($self, $name) = @_;
958
959   $self->throw_exception ("You can't use savepoints outside a transaction")
960     if $self->{transaction_depth} == 0;
961
962   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
963     unless $self->can('_svp_rollback');
964
965   if (defined $name) {
966       # If they passed us a name, verify that it exists in the stack
967       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
968           $self->throw_exception("Savepoint '$name' does not exist!");
969       }
970
971       # Dig through the stack until we find the one we are releasing.  This keeps
972       # the stack up to date.
973       while(my $s = pop(@{ $self->{savepoints} })) {
974           last if($s eq $name);
975       }
976       # Add the savepoint back to the stack, as a rollback doesn't remove the
977       # named savepoint, only everything after it.
978       push(@{ $self->{savepoints} }, $name);
979   } else {
980       # We'll assume they want to rollback to the last savepoint
981       $name = $self->{savepoints}->[-1];
982   }
983
984   $self->debugobj->svp_rollback($name) if $self->debug;
985   
986   return $self->_svp_rollback($name);
987 }
988
989 sub _svp_generate_name {
990     my ($self) = @_;
991
992     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
993 }
994
995 sub txn_begin {
996   my $self = shift;
997   $self->ensure_connected();
998   if($self->{transaction_depth} == 0) {
999     $self->debugobj->txn_begin()
1000       if $self->debug;
1001     # this isn't ->_dbh-> because
1002     #  we should reconnect on begin_work
1003     #  for AutoCommit users
1004     $self->dbh->begin_work;
1005   } elsif ($self->auto_savepoint) {
1006     $self->svp_begin;
1007   }
1008   $self->{transaction_depth}++;
1009 }
1010
1011 sub txn_commit {
1012   my $self = shift;
1013   if ($self->{transaction_depth} == 1) {
1014     my $dbh = $self->_dbh;
1015     $self->debugobj->txn_commit()
1016       if ($self->debug);
1017     $dbh->commit;
1018     $self->{transaction_depth} = 0
1019       if $self->_dbh_autocommit;
1020   }
1021   elsif($self->{transaction_depth} > 1) {
1022     $self->{transaction_depth}--;
1023     $self->svp_release
1024       if $self->auto_savepoint;
1025   }
1026 }
1027
1028 sub txn_rollback {
1029   my $self = shift;
1030   my $dbh = $self->_dbh;
1031   eval {
1032     if ($self->{transaction_depth} == 1) {
1033       $self->debugobj->txn_rollback()
1034         if ($self->debug);
1035       $self->{transaction_depth} = 0
1036         if $self->_dbh_autocommit;
1037       $dbh->rollback;
1038     }
1039     elsif($self->{transaction_depth} > 1) {
1040       $self->{transaction_depth}--;
1041       if ($self->auto_savepoint) {
1042         $self->svp_rollback;
1043         $self->svp_release;
1044       }
1045     }
1046     else {
1047       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1048     }
1049   };
1050   if ($@) {
1051     my $error = $@;
1052     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1053     $error =~ /$exception_class/ and $self->throw_exception($error);
1054     # ensure that a failed rollback resets the transaction depth
1055     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1056     $self->throw_exception($error);
1057   }
1058 }
1059
1060 # This used to be the top-half of _execute.  It was split out to make it
1061 #  easier to override in NoBindVars without duping the rest.  It takes up
1062 #  all of _execute's args, and emits $sql, @bind.
1063 sub _prep_for_execute {
1064   my ($self, $op, $extra_bind, $ident, $args) = @_;
1065
1066   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1067     $ident = $ident->from();
1068   }
1069
1070   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1071
1072   unshift(@bind,
1073     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1074       if $extra_bind;
1075   return ($sql, \@bind);
1076 }
1077
1078
1079 sub _fix_bind_params {
1080     my ($self, @bind) = @_;
1081
1082     ### Turn @bind from something like this:
1083     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1084     ### to this:
1085     ###   ( "'1'", "'1'", "'3'" )
1086     return
1087         map {
1088             if ( defined( $_ && $_->[1] ) ) {
1089                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1090             }
1091             else { q{'NULL'}; }
1092         } @bind;
1093 }
1094
1095 sub _query_start {
1096     my ( $self, $sql, @bind ) = @_;
1097
1098     if ( $self->debug ) {
1099         @bind = $self->_fix_bind_params(@bind);
1100
1101         $self->debugobj->query_start( $sql, @bind );
1102     }
1103 }
1104
1105 sub _query_end {
1106     my ( $self, $sql, @bind ) = @_;
1107
1108     if ( $self->debug ) {
1109         @bind = $self->_fix_bind_params(@bind);
1110         $self->debugobj->query_end( $sql, @bind );
1111     }
1112 }
1113
1114 sub _dbh_execute {
1115   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1116
1117   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1118
1119   $self->_query_start( $sql, @$bind );
1120
1121   my $sth = $self->sth($sql,$op);
1122
1123   my $placeholder_index = 1; 
1124
1125   foreach my $bound (@$bind) {
1126     my $attributes = {};
1127     my($column_name, @data) = @$bound;
1128
1129     if ($bind_attributes) {
1130       $attributes = $bind_attributes->{$column_name}
1131       if defined $bind_attributes->{$column_name};
1132     }
1133
1134     foreach my $data (@data) {
1135       my $ref = ref $data;
1136       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1137
1138       $sth->bind_param($placeholder_index, $data, $attributes);
1139       $placeholder_index++;
1140     }
1141   }
1142
1143   # Can this fail without throwing an exception anyways???
1144   my $rv = $sth->execute();
1145   $self->throw_exception($sth->errstr) if !$rv;
1146
1147   $self->_query_end( $sql, @$bind );
1148
1149   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1150 }
1151
1152 sub _execute {
1153     my $self = shift;
1154     $self->dbh_do('_dbh_execute', @_)
1155 }
1156
1157 sub insert {
1158   my ($self, $source, $to_insert) = @_;
1159
1160   my $ident = $source->from;
1161   my $bind_attributes = $self->source_bind_attributes($source);
1162
1163   my $updated_cols = {};
1164
1165   $self->ensure_connected;
1166   foreach my $col ( $source->columns ) {
1167     if ( !defined $to_insert->{$col} ) {
1168       my $col_info = $source->column_info($col);
1169
1170       if ( $col_info->{auto_nextval} ) {
1171         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1172       }
1173     }
1174   }
1175
1176   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1177
1178   return $updated_cols;
1179 }
1180
1181 ## Still not quite perfect, and EXPERIMENTAL
1182 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1183 ## scalar refs, or at least, all the same type as the first set, the statement is
1184 ## only prepped once.
1185 sub insert_bulk {
1186   my ($self, $source, $cols, $data) = @_;
1187   my %colvalues;
1188   my $table = $source->from;
1189   @colvalues{@$cols} = (0..$#$cols);
1190   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1191   
1192   $self->_query_start( $sql, @bind );
1193   my $sth = $self->sth($sql);
1194
1195 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1196
1197   ## This must be an arrayref, else nothing works!
1198   my $tuple_status = [];
1199
1200   ## Get the bind_attributes, if any exist
1201   my $bind_attributes = $self->source_bind_attributes($source);
1202
1203   ## Bind the values and execute
1204   my $placeholder_index = 1; 
1205
1206   foreach my $bound (@bind) {
1207
1208     my $attributes = {};
1209     my ($column_name, $data_index) = @$bound;
1210
1211     if( $bind_attributes ) {
1212       $attributes = $bind_attributes->{$column_name}
1213       if defined $bind_attributes->{$column_name};
1214     }
1215
1216     my @data = map { $_->[$data_index] } @$data;
1217
1218     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1219     $placeholder_index++;
1220   }
1221   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1222   if (my $err = $@) {
1223     my $i = 0;
1224     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1225
1226     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1227       if ($i > $#$tuple_status);
1228
1229     require Data::Dumper;
1230     local $Data::Dumper::Terse = 1;
1231     local $Data::Dumper::Indent = 1;
1232     local $Data::Dumper::Useqq = 1;
1233     local $Data::Dumper::Quotekeys = 0;
1234
1235     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1236       $tuple_status->[$i][1],
1237       Data::Dumper::Dumper(
1238         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1239       ),
1240     );
1241   }
1242   $self->throw_exception($sth->errstr) if !$rv;
1243
1244   $self->_query_end( $sql, @bind );
1245   return (wantarray ? ($rv, $sth, @bind) : $rv);
1246 }
1247
1248 sub update {
1249   my $self = shift @_;
1250   my $source = shift @_;
1251   my $bind_attributes = $self->source_bind_attributes($source);
1252   
1253   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1254 }
1255
1256
1257 sub delete {
1258   my $self = shift @_;
1259   my $source = shift @_;
1260   
1261   my $bind_attrs = $self->source_bind_attributes($source);
1262   
1263   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1264 }
1265
1266 # We were sent here because the $rs contains a complex search
1267 # which will require a subquery to select the correct rows
1268 # (i.e. joined or limited resultsets)
1269 #
1270 # Genarating a single PK column subquery is trivial and supported
1271 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1272 # Look at _multipk_update_delete()
1273 sub _subq_update_delete {
1274   my $self = shift;
1275   my ($rs, $op, $values) = @_;
1276
1277   my $rsrc = $rs->result_source;
1278
1279   # we already check this, but double check naively just in case. Should be removed soon
1280   my $sel = $rs->_resolved_attrs->{select};
1281   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1282   my @pcols = $rsrc->primary_columns;
1283   if (@$sel != @pcols) {
1284     $self->throw_exception (
1285       'Subquery update/delete can not be called on resultsets selecting a'
1286      .' number of columns different than the number of primary keys'
1287     );
1288   }
1289
1290   if (@pcols == 1) {
1291     return $self->$op (
1292       $rsrc,
1293       $op eq 'update' ? $values : (),
1294       { $pcols[0] => { -in => $rs->as_query } },
1295     );
1296   }
1297
1298   else {
1299     return $self->_multipk_update_delete (@_);
1300   }
1301 }
1302
1303 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1304 # resultset update/delete involving subqueries. So by default resort
1305 # to simple (and inefficient) delete_all style per-row opearations,
1306 # while allowing specific storages to override this with a faster
1307 # implementation.
1308 #
1309 sub _multipk_update_delete {
1310   return shift->_per_row_update_delete (@_);
1311 }
1312
1313 # This is the default loop used to delete/update rows for multi PK
1314 # resultsets, and used by mysql exclusively (because it can't do anything
1315 # else).
1316 #
1317 # We do not use $row->$op style queries, because resultset update/delete
1318 # is not expected to cascade (this is what delete_all/update_all is for).
1319 #
1320 # There should be no race conditions as the entire operation is rolled
1321 # in a transaction.
1322 #
1323 sub _per_row_update_delete {
1324   my $self = shift;
1325   my ($rs, $op, $values) = @_;
1326
1327   my $rsrc = $rs->result_source;
1328   my @pcols = $rsrc->primary_columns;
1329
1330   my $guard = $self->txn_scope_guard;
1331
1332   # emulate the return value of $sth->execute for non-selects
1333   my $row_cnt = '0E0';
1334
1335   my $subrs_cur = $rs->cursor;
1336   while (my @pks = $subrs_cur->next) {
1337
1338     my $cond;
1339     for my $i (0.. $#pcols) {
1340       $cond->{$pcols[$i]} = $pks[$i];
1341     }
1342
1343     $self->$op (
1344       $rsrc,
1345       $op eq 'update' ? $values : (),
1346       $cond,
1347     );
1348
1349     $row_cnt++;
1350   }
1351
1352   $guard->commit;
1353
1354   return $row_cnt;
1355 }
1356
1357 sub _select {
1358   my $self = shift;
1359
1360   # localization is neccessary as
1361   # 1) there is no infrastructure to pass this around (easy to do, but will wait)
1362   # 2) _select_args sets it and _prep_for_execute consumes it
1363   my $sql_maker = $self->sql_maker;
1364   local $sql_maker->{for};
1365
1366   return $self->_execute($self->_select_args(@_));
1367 }
1368
1369 sub _select_args_to_query {
1370   my $self = shift;
1371
1372   # localization is neccessary as
1373   # 1) there is no infrastructure to pass this around (easy to do, but will wait)
1374   # 2) _select_args sets it and _prep_for_execute consumes it
1375   my $sql_maker = $self->sql_maker;
1376   local $sql_maker->{for};
1377
1378   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1379   #  = $self->_select_args($ident, $select, $cond, $attrs);
1380   my ($op, $bind, $ident, $bind_attrs, @args) =
1381     $self->_select_args(@_);
1382
1383   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1384   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1385   $prepared_bind ||= [];
1386
1387   return wantarray
1388     ? ($sql, $prepared_bind, $bind_attrs)
1389     : \[ "($sql)", @$prepared_bind ]
1390   ;
1391 }
1392
1393 sub _select_args {
1394   my ($self, $ident, $select, $where, $attrs) = @_;
1395
1396   my $sql_maker = $self->sql_maker;
1397   my $alias2source = $self->_resolve_ident_sources ($ident);
1398
1399   # calculate bind_attrs before possible $ident mangling
1400   my $bind_attrs = {};
1401   for my $alias (keys %$alias2source) {
1402     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1403     for my $col (keys %$bindtypes) {
1404
1405       my $fqcn = join ('.', $alias, $col);
1406       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1407
1408       # so that unqualified searches can be bound too
1409       $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq 'me';
1410     }
1411   }
1412
1413   my @limit;
1414   if ($attrs->{software_limit} ||
1415       $sql_maker->_default_limit_syntax eq "GenericSubQ") {
1416         $attrs->{software_limit} = 1;
1417   } else {
1418     $self->throw_exception("rows attribute must be positive if present")
1419       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1420
1421     # MySQL actually recommends this approach.  I cringe.
1422     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1423
1424     if ($attrs->{rows} && keys %{$attrs->{collapse}}) {
1425       ($ident, $select, $where, $attrs)
1426         = $self->_adjust_select_args_for_limited_prefetch ($ident, $select, $where, $attrs);
1427     }
1428     else {
1429       push @limit, $attrs->{rows}, $attrs->{offset};
1430     }
1431   }
1432
1433 ###
1434   # This would be the point to deflate anything found in $where
1435   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1436   # expect a row object. And all we have is a resultsource (it is trivial
1437   # to extract deflator coderefs via $alias2source above).
1438   #
1439   # I don't see a way forward other than changing the way deflators are
1440   # invoked, and that's just bad...
1441 ###
1442
1443   my $order = { map
1444     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1445     (qw/order_by group_by having _virtual_order_by/ )
1446   };
1447
1448
1449   $sql_maker->{for} = delete $attrs->{for};
1450
1451   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1452 }
1453
1454 sub _adjust_select_args_for_limited_prefetch {
1455   my ($self, $from, $select, $where, $attrs) = @_;
1456
1457   if ($attrs->{group_by} and @{$attrs->{group_by}}) {
1458     $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a group_by attribute');
1459   }
1460
1461   $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a custom from attribute')
1462     if (ref $from ne 'ARRAY');
1463
1464
1465   # separate attributes
1466   my $sub_attrs = { %$attrs };
1467   delete $attrs->{$_} for qw/where bind rows offset/;
1468   delete $sub_attrs->{$_} for qw/for collapse select order_by/;
1469
1470   my $alias = $attrs->{alias};
1471
1472   # create subquery select list
1473   my $sub_select = [ grep { $_ =~ /^$alias\./ } @{$attrs->{select}} ];
1474
1475   # bring over all non-collapse-induced order_by into the inner query (if any)
1476   # the outer one will have to keep them all
1477   if (my $ord_cnt = @{$attrs->{order_by}} - @{$attrs->{_collapse_order_by}} ) {
1478     $sub_attrs->{order_by} = [
1479       @{$attrs->{order_by}}[ 0 .. ($#{$attrs->{order_by}} - $ord_cnt - 1) ]
1480     ];
1481   }
1482
1483   # mangle {from}
1484   $from = [ @$from ];
1485   my $select_root = shift @$from;
1486   my @outer_from = @$from;
1487
1488   my %inner_joins;
1489   my %join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1490
1491   # in complex search_related chains $alias may *not* be 'me'
1492   # so always include it in the inner join, and also shift away
1493   # from the outer stack, so that the two datasets actually do
1494   # meet
1495   if ($select_root->{-alias} ne $alias) {
1496     $inner_joins{$alias} = 1;
1497
1498     while (@outer_from && $outer_from[0][0]{-alias} ne $alias) {
1499       shift @outer_from;
1500     }
1501     if (! @outer_from) {
1502       $self->throw_exception ("Unable to find '$alias' in the {from} stack, something is wrong");
1503     }
1504
1505     shift @outer_from; # the new subquery will represent this alias, so get rid of it
1506   }
1507
1508
1509   # decide which parts of the join will remain on the inside
1510   #
1511   # this is not a very viable optimisation, but it was written
1512   # before I realised this, so might as well remain. We can throw
1513   # away _any_ branches of the join tree that are:
1514   # 1) not mentioned in the condition/order
1515   # 2) left-join leaves (or left-join leaf chains)
1516   # Most of the join ocnditions will not satisfy this, but for real
1517   # complex queries some might, and we might make some RDBMS happy.
1518   #
1519   #
1520   # since we do not have introspectable SQLA, we fall back to ugly
1521   # scanning of raw SQL for WHERE, and for pieces of ORDER BY
1522   # in order to determine what goes into %inner_joins
1523   # It may not be very efficient, but it's a reasonable stop-gap
1524   {
1525     # produce stuff unquoted, so it can be scanned
1526     my $sql_maker = $self->sql_maker;
1527     local $sql_maker->{quote_char};
1528
1529     my @order_by = (map
1530       { ref $_ ? $_->[0] : $_ }
1531       $sql_maker->_order_by_chunks ($sub_attrs->{order_by})
1532     );
1533
1534     my $where_sql = $sql_maker->where ($where);
1535
1536     # sort needed joins
1537     for my $alias (keys %join_info) {
1538
1539       # any table alias found on a column name in where or order_by
1540       # gets included in %inner_joins
1541       # Also any parent joins that are needed to reach this particular alias
1542       for my $piece ($where_sql, @order_by ) {
1543         if ($piece =~ /\b$alias\./) {
1544           $inner_joins{$alias} = 1;
1545         }
1546       }
1547     }
1548   }
1549
1550   # scan for non-leaf/non-left joins and mark as needed
1551   # also mark all ancestor joins that are needed to reach this particular alias
1552   # (e.g.  join => { cds => 'tracks' } - tracks will bring cds too )
1553   #
1554   # traverse by the size of the -join_path i.e. reverse depth first
1555   for my $alias (sort { @{$join_info{$b}{-join_path}} <=> @{$join_info{$a}{-join_path}} } (keys %join_info) ) {
1556
1557     my $j = $join_info{$alias};
1558     $inner_joins{$alias} = 1 if (! $j->{-join_type} || ($j->{-join_type} !~ /^left$/i) );
1559
1560     if ($inner_joins{$alias}) {
1561       $inner_joins{$_} = 1 for (@{$j->{-join_path}});
1562     }
1563   }
1564
1565   # construct the inner $from for the subquery
1566   my $inner_from = [ $select_root ];
1567   for my $j (@$from) {
1568     push @$inner_from, $j if $inner_joins{$j->[0]{-alias}};
1569   }
1570
1571   # if a multi-type join was needed in the subquery ("multi" is indicated by
1572   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1573
1574   for my $alias (keys %inner_joins) {
1575
1576     # the dot comes from some weirdness in collapse
1577     # remove after the rewrite
1578     if ($attrs->{collapse}{".$alias"}) {
1579       $sub_attrs->{group_by} = $sub_select;
1580       last;
1581     }
1582   }
1583
1584   # generate the subquery
1585   my $subq = $self->_select_args_to_query (
1586     $inner_from,
1587     $sub_select,
1588     $where,
1589     $sub_attrs
1590   );
1591
1592   # put it in the new {from}
1593   unshift @outer_from, { $alias => $subq };
1594
1595   # This is totally horrific - the $where ends up in both the inner and outer query
1596   # Unfortunately not much can be done until SQLA2 introspection arrives
1597   #
1598   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1599   return (\@outer_from, $select, $where, $attrs);
1600 }
1601
1602 sub _resolve_ident_sources {
1603   my ($self, $ident) = @_;
1604
1605   my $alias2source = {};
1606
1607   # the reason this is so contrived is that $ident may be a {from}
1608   # structure, specifying multiple tables to join
1609   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1610     # this is compat mode for insert/update/delete which do not deal with aliases
1611     $alias2source->{me} = $ident;
1612   }
1613   elsif (ref $ident eq 'ARRAY') {
1614
1615     for (@$ident) {
1616       my $tabinfo;
1617       if (ref $_ eq 'HASH') {
1618         $tabinfo = $_;
1619       }
1620       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1621         $tabinfo = $_->[0];
1622       }
1623
1624       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1625         if ($tabinfo->{-source_handle});
1626     }
1627   }
1628
1629   return $alias2source;
1630 }
1631
1632 # Returns a counting SELECT for a simple count
1633 # query. Abstracted so that a storage could override
1634 # this to { count => 'firstcol' } or whatever makes
1635 # sense as a performance optimization
1636 sub _count_select {
1637   #my ($self, $source, $rs_attrs) = @_;
1638   return { count => '*' };
1639 }
1640
1641 # Returns a SELECT which will end up in the subselect
1642 # There may or may not be a group_by, as the subquery
1643 # might have been called to accomodate a limit
1644 #
1645 # Most databases would be happy with whatever ends up
1646 # here, but some choke in various ways.
1647 #
1648 sub _subq_count_select {
1649   my ($self, $source, $rs_attrs) = @_;
1650   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1651
1652   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1653   return @pcols ? \@pcols : [ 1 ];
1654 }
1655
1656
1657 sub source_bind_attributes {
1658   my ($self, $source) = @_;
1659
1660   my $bind_attributes;
1661   foreach my $column ($source->columns) {
1662
1663     my $data_type = $source->column_info($column)->{data_type} || '';
1664     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1665      if $data_type;
1666   }
1667
1668   return $bind_attributes;
1669 }
1670
1671 =head2 select
1672
1673 =over 4
1674
1675 =item Arguments: $ident, $select, $condition, $attrs
1676
1677 =back
1678
1679 Handle a SQL select statement.
1680
1681 =cut
1682
1683 sub select {
1684   my $self = shift;
1685   my ($ident, $select, $condition, $attrs) = @_;
1686   return $self->cursor_class->new($self, \@_, $attrs);
1687 }
1688
1689 sub select_single {
1690   my $self = shift;
1691   my ($rv, $sth, @bind) = $self->_select(@_);
1692   my @row = $sth->fetchrow_array;
1693   my @nextrow = $sth->fetchrow_array if @row;
1694   if(@row && @nextrow) {
1695     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1696   }
1697   # Need to call finish() to work round broken DBDs
1698   $sth->finish();
1699   return @row;
1700 }
1701
1702 =head2 sth
1703
1704 =over 4
1705
1706 =item Arguments: $sql
1707
1708 =back
1709
1710 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1711
1712 =cut
1713
1714 sub _dbh_sth {
1715   my ($self, $dbh, $sql) = @_;
1716
1717   # 3 is the if_active parameter which avoids active sth re-use
1718   my $sth = $self->disable_sth_caching
1719     ? $dbh->prepare($sql)
1720     : $dbh->prepare_cached($sql, {}, 3);
1721
1722   # XXX You would think RaiseError would make this impossible,
1723   #  but apparently that's not true :(
1724   $self->throw_exception($dbh->errstr) if !$sth;
1725
1726   $sth;
1727 }
1728
1729 sub sth {
1730   my ($self, $sql) = @_;
1731   $self->dbh_do('_dbh_sth', $sql);
1732 }
1733
1734 sub _dbh_columns_info_for {
1735   my ($self, $dbh, $table) = @_;
1736
1737   if ($dbh->can('column_info')) {
1738     my %result;
1739     eval {
1740       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1741       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1742       $sth->execute();
1743       while ( my $info = $sth->fetchrow_hashref() ){
1744         my %column_info;
1745         $column_info{data_type}   = $info->{TYPE_NAME};
1746         $column_info{size}      = $info->{COLUMN_SIZE};
1747         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1748         $column_info{default_value} = $info->{COLUMN_DEF};
1749         my $col_name = $info->{COLUMN_NAME};
1750         $col_name =~ s/^\"(.*)\"$/$1/;
1751
1752         $result{$col_name} = \%column_info;
1753       }
1754     };
1755     return \%result if !$@ && scalar keys %result;
1756   }
1757
1758   my %result;
1759   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1760   $sth->execute;
1761   my @columns = @{$sth->{NAME_lc}};
1762   for my $i ( 0 .. $#columns ){
1763     my %column_info;
1764     $column_info{data_type} = $sth->{TYPE}->[$i];
1765     $column_info{size} = $sth->{PRECISION}->[$i];
1766     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1767
1768     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1769       $column_info{data_type} = $1;
1770       $column_info{size}    = $2;
1771     }
1772
1773     $result{$columns[$i]} = \%column_info;
1774   }
1775   $sth->finish;
1776
1777   foreach my $col (keys %result) {
1778     my $colinfo = $result{$col};
1779     my $type_num = $colinfo->{data_type};
1780     my $type_name;
1781     if(defined $type_num && $dbh->can('type_info')) {
1782       my $type_info = $dbh->type_info($type_num);
1783       $type_name = $type_info->{TYPE_NAME} if $type_info;
1784       $colinfo->{data_type} = $type_name if $type_name;
1785     }
1786   }
1787
1788   return \%result;
1789 }
1790
1791 sub columns_info_for {
1792   my ($self, $table) = @_;
1793   $self->dbh_do('_dbh_columns_info_for', $table);
1794 }
1795
1796 =head2 last_insert_id
1797
1798 Return the row id of the last insert.
1799
1800 =cut
1801
1802 sub _dbh_last_insert_id {
1803     # All Storage's need to register their own _dbh_last_insert_id
1804     # the old SQLite-based method was highly inappropriate
1805
1806     my $self = shift;
1807     my $class = ref $self;
1808     $self->throw_exception (<<EOE);
1809
1810 No _dbh_last_insert_id() method found in $class.
1811 Since the method of obtaining the autoincrement id of the last insert
1812 operation varies greatly between different databases, this method must be
1813 individually implemented for every storage class.
1814 EOE
1815 }
1816
1817 sub last_insert_id {
1818   my $self = shift;
1819   $self->dbh_do('_dbh_last_insert_id', @_);
1820 }
1821
1822 =head2 sqlt_type
1823
1824 Returns the database driver name.
1825
1826 =cut
1827
1828 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1829
1830 =head2 bind_attribute_by_data_type
1831
1832 Given a datatype from column info, returns a database specific bind
1833 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1834 let the database planner just handle it.
1835
1836 Generally only needed for special case column types, like bytea in postgres.
1837
1838 =cut
1839
1840 sub bind_attribute_by_data_type {
1841     return;
1842 }
1843
1844 =head2 is_datatype_numeric
1845
1846 Given a datatype from column_info, returns a boolean value indicating if
1847 the current RDBMS considers it a numeric value. This controls how
1848 L<DBIx::Class::Row/set_column> decides whether to mark the column as
1849 dirty - when the datatype is deemed numeric a C<< != >> comparison will
1850 be performed instead of the usual C<eq>.
1851
1852 =cut
1853
1854 sub is_datatype_numeric {
1855   my ($self, $dt) = @_;
1856
1857   return 0 unless $dt;
1858
1859   return $dt =~ /^ (?:
1860     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
1861   ) $/ix;
1862 }
1863
1864
1865 =head2 create_ddl_dir (EXPERIMENTAL)
1866
1867 =over 4
1868
1869 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1870
1871 =back
1872
1873 Creates a SQL file based on the Schema, for each of the specified
1874 database engines in C<\@databases> in the given directory.
1875 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
1876
1877 Given a previous version number, this will also create a file containing
1878 the ALTER TABLE statements to transform the previous schema into the
1879 current one. Note that these statements may contain C<DROP TABLE> or
1880 C<DROP COLUMN> statements that can potentially destroy data.
1881
1882 The file names are created using the C<ddl_filename> method below, please
1883 override this method in your schema if you would like a different file
1884 name format. For the ALTER file, the same format is used, replacing
1885 $version in the name with "$preversion-$version".
1886
1887 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
1888 The most common value for this would be C<< { add_drop_table => 1 } >>
1889 to have the SQL produced include a C<DROP TABLE> statement for each table
1890 created. For quoting purposes supply C<quote_table_names> and
1891 C<quote_field_names>.
1892
1893 If no arguments are passed, then the following default values are assumed:
1894
1895 =over 4
1896
1897 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
1898
1899 =item version    - $schema->schema_version
1900
1901 =item directory  - './'
1902
1903 =item preversion - <none>
1904
1905 =back
1906
1907 By default, C<\%sqlt_args> will have
1908
1909  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1910
1911 merged with the hash passed in. To disable any of those features, pass in a 
1912 hashref like the following
1913
1914  { ignore_constraint_names => 0, # ... other options }
1915
1916
1917 Note that this feature is currently EXPERIMENTAL and may not work correctly 
1918 across all databases, or fully handle complex relationships.
1919
1920 WARNING: Please check all SQL files created, before applying them.
1921
1922 =cut
1923
1924 sub create_ddl_dir {
1925   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1926
1927   if(!$dir || !-d $dir) {
1928     carp "No directory given, using ./\n";
1929     $dir = "./";
1930   }
1931   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1932   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1933
1934   my $schema_version = $schema->schema_version || '1.x';
1935   $version ||= $schema_version;
1936
1937   $sqltargs = {
1938     add_drop_table => 1, 
1939     ignore_constraint_names => 1,
1940     ignore_index_names => 1,
1941     %{$sqltargs || {}}
1942   };
1943
1944   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1945       . $self->_check_sqlt_message . q{'})
1946           if !$self->_check_sqlt_version;
1947
1948   my $sqlt = SQL::Translator->new( $sqltargs );
1949
1950   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1951   my $sqlt_schema = $sqlt->translate({ data => $schema })
1952     or $self->throw_exception ($sqlt->error);
1953
1954   foreach my $db (@$databases) {
1955     $sqlt->reset();
1956     $sqlt->{schema} = $sqlt_schema;
1957     $sqlt->producer($db);
1958
1959     my $file;
1960     my $filename = $schema->ddl_filename($db, $version, $dir);
1961     if (-e $filename && ($version eq $schema_version )) {
1962       # if we are dumping the current version, overwrite the DDL
1963       carp "Overwriting existing DDL file - $filename";
1964       unlink($filename);
1965     }
1966
1967     my $output = $sqlt->translate;
1968     if(!$output) {
1969       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1970       next;
1971     }
1972     if(!open($file, ">$filename")) {
1973       $self->throw_exception("Can't open $filename for writing ($!)");
1974       next;
1975     }
1976     print $file $output;
1977     close($file);
1978   
1979     next unless ($preversion);
1980
1981     require SQL::Translator::Diff;
1982
1983     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1984     if(!-e $prefilename) {
1985       carp("No previous schema file found ($prefilename)");
1986       next;
1987     }
1988
1989     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1990     if(-e $difffile) {
1991       carp("Overwriting existing diff file - $difffile");
1992       unlink($difffile);
1993     }
1994     
1995     my $source_schema;
1996     {
1997       my $t = SQL::Translator->new($sqltargs);
1998       $t->debug( 0 );
1999       $t->trace( 0 );
2000
2001       $t->parser( $db )
2002         or $self->throw_exception ($t->error);
2003
2004       my $out = $t->translate( $prefilename )
2005         or $self->throw_exception ($t->error);
2006
2007       $source_schema = $t->schema;
2008
2009       $source_schema->name( $prefilename )
2010         unless ( $source_schema->name );
2011     }
2012
2013     # The "new" style of producers have sane normalization and can support 
2014     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2015     # And we have to diff parsed SQL against parsed SQL.
2016     my $dest_schema = $sqlt_schema;
2017
2018     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2019       my $t = SQL::Translator->new($sqltargs);
2020       $t->debug( 0 );
2021       $t->trace( 0 );
2022
2023       $t->parser( $db )
2024         or $self->throw_exception ($t->error);
2025
2026       my $out = $t->translate( $filename )
2027         or $self->throw_exception ($t->error);
2028
2029       $dest_schema = $t->schema;
2030
2031       $dest_schema->name( $filename )
2032         unless $dest_schema->name;
2033     }
2034     
2035     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2036                                                   $dest_schema,   $db,
2037                                                   $sqltargs
2038                                                  );
2039     if(!open $file, ">$difffile") { 
2040       $self->throw_exception("Can't write to $difffile ($!)");
2041       next;
2042     }
2043     print $file $diff;
2044     close($file);
2045   }
2046 }
2047
2048 =head2 deployment_statements
2049
2050 =over 4
2051
2052 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2053
2054 =back
2055
2056 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2057
2058 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2059 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2060
2061 C<$directory> is used to return statements from files in a previously created
2062 L</create_ddl_dir> directory and is optional. The filenames are constructed
2063 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2064
2065 If no C<$directory> is specified then the statements are constructed on the
2066 fly using L<SQL::Translator> and C<$version> is ignored.
2067
2068 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2069
2070 =cut
2071
2072 sub deployment_statements {
2073   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2074   # Need to be connected to get the correct sqlt_type
2075   $self->ensure_connected() unless $type;
2076   $type ||= $self->sqlt_type;
2077   $version ||= $schema->schema_version || '1.x';
2078   $dir ||= './';
2079   my $filename = $schema->ddl_filename($type, $version, $dir);
2080   if(-f $filename)
2081   {
2082       my $file;
2083       open($file, "<$filename") 
2084         or $self->throw_exception("Can't open $filename ($!)");
2085       my @rows = <$file>;
2086       close($file);
2087       return join('', @rows);
2088   }
2089
2090   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
2091       . $self->_check_sqlt_message . q{'})
2092           if !$self->_check_sqlt_version;
2093
2094   require SQL::Translator::Parser::DBIx::Class;
2095   eval qq{use SQL::Translator::Producer::${type}};
2096   $self->throw_exception($@) if $@;
2097
2098   # sources needs to be a parser arg, but for simplicty allow at top level 
2099   # coming in
2100   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2101       if exists $sqltargs->{sources};
2102
2103   my $tr = SQL::Translator->new(%$sqltargs);
2104   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
2105   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
2106 }
2107
2108 sub deploy {
2109   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2110   my $deploy = sub {
2111     my $line = shift;
2112     return if($line =~ /^--/);
2113     return if(!$line);
2114     # next if($line =~ /^DROP/m);
2115     return if($line =~ /^BEGIN TRANSACTION/m);
2116     return if($line =~ /^COMMIT/m);
2117     return if $line =~ /^\s+$/; # skip whitespace only
2118     $self->_query_start($line);
2119     eval {
2120       $self->dbh->do($line); # shouldn't be using ->dbh ?
2121     };
2122     if ($@) {
2123       carp qq{$@ (running "${line}")};
2124     }
2125     $self->_query_end($line);
2126   };
2127   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2128   if (@statements > 1) {
2129     foreach my $statement (@statements) {
2130       $deploy->( $statement );
2131     }
2132   }
2133   elsif (@statements == 1) {
2134     foreach my $line ( split(";\n", $statements[0])) {
2135       $deploy->( $line );
2136     }
2137   }
2138 }
2139
2140 =head2 datetime_parser
2141
2142 Returns the datetime parser class
2143
2144 =cut
2145
2146 sub datetime_parser {
2147   my $self = shift;
2148   return $self->{datetime_parser} ||= do {
2149     $self->ensure_connected;
2150     $self->build_datetime_parser(@_);
2151   };
2152 }
2153
2154 =head2 datetime_parser_type
2155
2156 Defines (returns) the datetime parser class - currently hardwired to
2157 L<DateTime::Format::MySQL>
2158
2159 =cut
2160
2161 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2162
2163 =head2 build_datetime_parser
2164
2165 See L</datetime_parser>
2166
2167 =cut
2168
2169 sub build_datetime_parser {
2170   my $self = shift;
2171   my $type = $self->datetime_parser_type(@_);
2172   eval "use ${type}";
2173   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2174   return $type;
2175 }
2176
2177 {
2178     my $_check_sqlt_version; # private
2179     my $_check_sqlt_message; # private
2180     sub _check_sqlt_version {
2181         return $_check_sqlt_version if defined $_check_sqlt_version;
2182         eval 'use SQL::Translator "0.09003"';
2183         $_check_sqlt_message = $@ || '';
2184         $_check_sqlt_version = !$@;
2185     }
2186
2187     sub _check_sqlt_message {
2188         _check_sqlt_version if !defined $_check_sqlt_message;
2189         $_check_sqlt_message;
2190     }
2191 }
2192
2193 =head2 is_replicating
2194
2195 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2196 replicate from a master database.  Default is undef, which is the result
2197 returned by databases that don't support replication.
2198
2199 =cut
2200
2201 sub is_replicating {
2202     return;
2203     
2204 }
2205
2206 =head2 lag_behind_master
2207
2208 Returns a number that represents a certain amount of lag behind a master db
2209 when a given storage is replicating.  The number is database dependent, but
2210 starts at zero and increases with the amount of lag. Default in undef
2211
2212 =cut
2213
2214 sub lag_behind_master {
2215     return;
2216 }
2217
2218 sub DESTROY {
2219   my $self = shift;
2220   return if !$self->_dbh;
2221   $self->_verify_pid;
2222   $self->_dbh(undef);
2223 }
2224
2225 1;
2226
2227 =head1 USAGE NOTES
2228
2229 =head2 DBIx::Class and AutoCommit
2230
2231 DBIx::Class can do some wonderful magic with handling exceptions,
2232 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2233 combined with C<txn_do> for transaction support.
2234
2235 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2236 in an assumed transaction between commits, and you're telling us you'd
2237 like to manage that manually.  A lot of the magic protections offered by
2238 this module will go away.  We can't protect you from exceptions due to database
2239 disconnects because we don't know anything about how to restart your
2240 transactions.  You're on your own for handling all sorts of exceptional
2241 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2242 be with raw DBI.
2243
2244
2245
2246 =head1 AUTHORS
2247
2248 Matt S. Trout <mst@shadowcatsystems.co.uk>
2249
2250 Andy Grundman <andy@hybridized.org>
2251
2252 =head1 LICENSE
2253
2254 You may distribute this code under the same terms as Perl itself.
2255
2256 =cut