First cut. Need to add control of how many rows are sent at once.
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Sub::Name 'subname';
16 use Try::Tiny;
17 use overload ();
18 use namespace::clean;
19
20
21 # default cursor class, overridable in connect_info attributes
22 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
23
24 __PACKAGE__->mk_group_accessors('inherited' => qw/
25   sql_limit_dialect sql_quote_char sql_name_sep
26 /);
27
28 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
29
30 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
31 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
32
33 __PACKAGE__->sql_name_sep('.');
34
35 __PACKAGE__->mk_group_accessors('simple' => qw/
36   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
37   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
38   transaction_depth _dbh_autocommit  savepoints
39 /);
40
41 # the values for these accessors are picked out (and deleted) from
42 # the attribute hashref passed to connect_info
43 my @storage_options = qw/
44   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
45   disable_sth_caching unsafe auto_savepoint
46 /;
47 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
48
49
50 # capability definitions, using a 2-tiered accessor system
51 # The rationale is:
52 #
53 # A driver/user may define _use_X, which blindly without any checks says:
54 # "(do not) use this capability", (use_dbms_capability is an "inherited"
55 # type accessor)
56 #
57 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
58 # accessor, which in turn calls _determine_supports_X, and stores the return
59 # in a special slot on the storage object, which is wiped every time a $dbh
60 # reconnection takes place (it is not guaranteed that upon reconnection we
61 # will get the same rdbms version). _determine_supports_X does not need to
62 # exist on a driver, as we ->can for it before calling.
63
64 my @capabilities = (qw/
65   insert_returning
66   insert_returning_bound
67   placeholders
68   typeless_placeholders
69   join_optimizer
70 /);
71 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
72 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
73
74 # on by default, not strictly a capability (pending rewrite)
75 __PACKAGE__->_use_join_optimizer (1);
76 sub _determine_supports_join_optimizer { 1 };
77
78 # Each of these methods need _determine_driver called before itself
79 # in order to function reliably. This is a purely DRY optimization
80 #
81 # get_(use)_dbms_capability need to be called on the correct Storage
82 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
83 # _determine_supports_X which obv. needs a correct driver as well
84 my @rdbms_specific_methods = qw/
85   deployment_statements
86   sqlt_type
87   sql_maker
88   build_datetime_parser
89   datetime_parser_type
90
91   insert
92   insert_bulk
93   update
94   delete
95   select
96   select_single
97
98   get_use_dbms_capability
99   get_dbms_capability
100
101   _server_info
102   _get_server_version
103 /;
104
105 for my $meth (@rdbms_specific_methods) {
106
107   my $orig = __PACKAGE__->can ($meth)
108     or die "$meth is not a ::Storage::DBI method!";
109
110   no strict qw/refs/;
111   no warnings qw/redefine/;
112   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
113     if (
114       # only fire when invoked on an instance, a valid class-based invocation
115       # would e.g. be setting a default for an inherited accessor
116       ref $_[0]
117         and
118       ! $_[0]->_driver_determined
119         and
120       ! $_[0]->{_in_determine_driver}
121     ) {
122       $_[0]->_determine_driver;
123
124       # This for some reason crashes and burns on perl 5.8.1
125       # IFF the method ends up throwing an exception
126       #goto $_[0]->can ($meth);
127
128       my $cref = $_[0]->can ($meth);
129       goto $cref;
130     }
131
132     goto $orig;
133   };
134 }
135
136
137 =head1 NAME
138
139 DBIx::Class::Storage::DBI - DBI storage handler
140
141 =head1 SYNOPSIS
142
143   my $schema = MySchema->connect('dbi:SQLite:my.db');
144
145   $schema->storage->debug(1);
146
147   my @stuff = $schema->storage->dbh_do(
148     sub {
149       my ($storage, $dbh, @args) = @_;
150       $dbh->do("DROP TABLE authors");
151     },
152     @column_list
153   );
154
155   $schema->resultset('Book')->search({
156      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
157   });
158
159 =head1 DESCRIPTION
160
161 This class represents the connection to an RDBMS via L<DBI>.  See
162 L<DBIx::Class::Storage> for general information.  This pod only
163 documents DBI-specific methods and behaviors.
164
165 =head1 METHODS
166
167 =cut
168
169 sub new {
170   my $new = shift->next::method(@_);
171
172   $new->transaction_depth(0);
173   $new->_sql_maker_opts({});
174   $new->_dbh_details({});
175   $new->{savepoints} = [];
176   $new->{_in_dbh_do} = 0;
177   $new->{_dbh_gen} = 0;
178
179   # read below to see what this does
180   $new->_arm_global_destructor;
181
182   $new;
183 }
184
185 # This is hack to work around perl shooting stuff in random
186 # order on exit(). If we do not walk the remaining storage
187 # objects in an END block, there is a *small but real* chance
188 # of a fork()ed child to kill the parent's shared DBI handle,
189 # *before perl reaches the DESTROY in this package*
190 # Yes, it is ugly and effective.
191 # Additionally this registry is used by the CLONE method to
192 # make sure no handles are shared between threads
193 {
194   my %seek_and_destroy;
195
196   sub _arm_global_destructor {
197     my $self = shift;
198     my $key = refaddr ($self);
199     $seek_and_destroy{$key} = $self;
200     weaken ($seek_and_destroy{$key});
201   }
202
203   END {
204     local $?; # just in case the DBI destructor changes it somehow
205
206     # destroy just the object if not native to this process/thread
207     $_->_verify_pid for (grep
208       { defined $_ }
209       values %seek_and_destroy
210     );
211   }
212
213   sub CLONE {
214     # As per DBI's recommendation, DBIC disconnects all handles as
215     # soon as possible (DBIC will reconnect only on demand from within
216     # the thread)
217     for (values %seek_and_destroy) {
218       next unless $_;
219       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
220       $_->_dbh(undef);
221     }
222   }
223 }
224
225 sub DESTROY {
226   my $self = shift;
227
228   # some databases spew warnings on implicit disconnect
229   local $SIG{__WARN__} = sub {};
230   $self->_dbh(undef);
231
232   # this op is necessary, since the very last perl runtime statement
233   # triggers a global destruction shootout, and the $SIG localization
234   # may very well be destroyed before perl actually gets to do the
235   # $dbh undef
236   1;
237 }
238
239 # handle pid changes correctly - do not destroy parent's connection
240 sub _verify_pid {
241   my $self = shift;
242
243   my $pid = $self->_conn_pid;
244   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
245     $dbh->{InactiveDestroy} = 1;
246     $self->{_dbh_gen}++;
247     $self->_dbh(undef);
248   }
249
250   return;
251 }
252
253 =head2 connect_info
254
255 This method is normally called by L<DBIx::Class::Schema/connection>, which
256 encapsulates its argument list in an arrayref before passing them here.
257
258 The argument list may contain:
259
260 =over
261
262 =item *
263
264 The same 4-element argument set one would normally pass to
265 L<DBI/connect>, optionally followed by
266 L<extra attributes|/DBIx::Class specific connection attributes>
267 recognized by DBIx::Class:
268
269   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
270
271 =item *
272
273 A single code reference which returns a connected
274 L<DBI database handle|DBI/connect> optionally followed by
275 L<extra attributes|/DBIx::Class specific connection attributes> recognized
276 by DBIx::Class:
277
278   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
279
280 =item *
281
282 A single hashref with all the attributes and the dsn/user/password
283 mixed together:
284
285   $connect_info_args = [{
286     dsn => $dsn,
287     user => $user,
288     password => $pass,
289     %dbi_attributes,
290     %extra_attributes,
291   }];
292
293   $connect_info_args = [{
294     dbh_maker => sub { DBI->connect (...) },
295     %dbi_attributes,
296     %extra_attributes,
297   }];
298
299 This is particularly useful for L<Catalyst> based applications, allowing the
300 following config (L<Config::General> style):
301
302   <Model::DB>
303     schema_class   App::DB
304     <connect_info>
305       dsn          dbi:mysql:database=test
306       user         testuser
307       password     TestPass
308       AutoCommit   1
309     </connect_info>
310   </Model::DB>
311
312 The C<dsn>/C<user>/C<password> combination can be substituted by the
313 C<dbh_maker> key whose value is a coderef that returns a connected
314 L<DBI database handle|DBI/connect>
315
316 =back
317
318 Please note that the L<DBI> docs recommend that you always explicitly
319 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
320 recommends that it be set to I<1>, and that you perform transactions
321 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
322 to I<1> if you do not do explicitly set it to zero.  This is the default
323 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
324
325 =head3 DBIx::Class specific connection attributes
326
327 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
328 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
329 the following connection options. These options can be mixed in with your other
330 L<DBI> connection attributes, or placed in a separate hashref
331 (C<\%extra_attributes>) as shown above.
332
333 Every time C<connect_info> is invoked, any previous settings for
334 these options will be cleared before setting the new ones, regardless of
335 whether any options are specified in the new C<connect_info>.
336
337
338 =over
339
340 =item on_connect_do
341
342 Specifies things to do immediately after connecting or re-connecting to
343 the database.  Its value may contain:
344
345 =over
346
347 =item a scalar
348
349 This contains one SQL statement to execute.
350
351 =item an array reference
352
353 This contains SQL statements to execute in order.  Each element contains
354 a string or a code reference that returns a string.
355
356 =item a code reference
357
358 This contains some code to execute.  Unlike code references within an
359 array reference, its return value is ignored.
360
361 =back
362
363 =item on_disconnect_do
364
365 Takes arguments in the same form as L</on_connect_do> and executes them
366 immediately before disconnecting from the database.
367
368 Note, this only runs if you explicitly call L</disconnect> on the
369 storage object.
370
371 =item on_connect_call
372
373 A more generalized form of L</on_connect_do> that calls the specified
374 C<connect_call_METHOD> methods in your storage driver.
375
376   on_connect_do => 'select 1'
377
378 is equivalent to:
379
380   on_connect_call => [ [ do_sql => 'select 1' ] ]
381
382 Its values may contain:
383
384 =over
385
386 =item a scalar
387
388 Will call the C<connect_call_METHOD> method.
389
390 =item a code reference
391
392 Will execute C<< $code->($storage) >>
393
394 =item an array reference
395
396 Each value can be a method name or code reference.
397
398 =item an array of arrays
399
400 For each array, the first item is taken to be the C<connect_call_> method name
401 or code reference, and the rest are parameters to it.
402
403 =back
404
405 Some predefined storage methods you may use:
406
407 =over
408
409 =item do_sql
410
411 Executes a SQL string or a code reference that returns a SQL string. This is
412 what L</on_connect_do> and L</on_disconnect_do> use.
413
414 It can take:
415
416 =over
417
418 =item a scalar
419
420 Will execute the scalar as SQL.
421
422 =item an arrayref
423
424 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
425 attributes hashref and bind values.
426
427 =item a code reference
428
429 Will execute C<< $code->($storage) >> and execute the return array refs as
430 above.
431
432 =back
433
434 =item datetime_setup
435
436 Execute any statements necessary to initialize the database session to return
437 and accept datetime/timestamp values used with
438 L<DBIx::Class::InflateColumn::DateTime>.
439
440 Only necessary for some databases, see your specific storage driver for
441 implementation details.
442
443 =back
444
445 =item on_disconnect_call
446
447 Takes arguments in the same form as L</on_connect_call> and executes them
448 immediately before disconnecting from the database.
449
450 Calls the C<disconnect_call_METHOD> methods as opposed to the
451 C<connect_call_METHOD> methods called by L</on_connect_call>.
452
453 Note, this only runs if you explicitly call L</disconnect> on the
454 storage object.
455
456 =item disable_sth_caching
457
458 If set to a true value, this option will disable the caching of
459 statement handles via L<DBI/prepare_cached>.
460
461 =item limit_dialect
462
463 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
464 default L</sql_limit_dialect> setting of the storage (if any). For a list
465 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
466
467 =item quote_names
468
469 When true automatically sets L</quote_char> and L</name_sep> to the characters
470 appropriate for your particular RDBMS. This option is preferred over specifying
471 L</quote_char> directly.
472
473 =item quote_char
474
475 Specifies what characters to use to quote table and column names.
476
477 C<quote_char> expects either a single character, in which case is it
478 is placed on either side of the table/column name, or an arrayref of length
479 2 in which case the table/column name is placed between the elements.
480
481 For example under MySQL you should use C<< quote_char => '`' >>, and for
482 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
483
484 =item name_sep
485
486 This parameter is only useful in conjunction with C<quote_char>, and is used to
487 specify the character that separates elements (schemas, tables, columns) from
488 each other. If unspecified it defaults to the most commonly used C<.>.
489
490 =item unsafe
491
492 This Storage driver normally installs its own C<HandleError>, sets
493 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
494 all database handles, including those supplied by a coderef.  It does this
495 so that it can have consistent and useful error behavior.
496
497 If you set this option to a true value, Storage will not do its usual
498 modifications to the database handle's attributes, and instead relies on
499 the settings in your connect_info DBI options (or the values you set in
500 your connection coderef, in the case that you are connecting via coderef).
501
502 Note that your custom settings can cause Storage to malfunction,
503 especially if you set a C<HandleError> handler that suppresses exceptions
504 and/or disable C<RaiseError>.
505
506 =item auto_savepoint
507
508 If this option is true, L<DBIx::Class> will use savepoints when nesting
509 transactions, making it possible to recover from failure in the inner
510 transaction without having to abort all outer transactions.
511
512 =item cursor_class
513
514 Use this argument to supply a cursor class other than the default
515 L<DBIx::Class::Storage::DBI::Cursor>.
516
517 =back
518
519 Some real-life examples of arguments to L</connect_info> and
520 L<DBIx::Class::Schema/connect>
521
522   # Simple SQLite connection
523   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
524
525   # Connect via subref
526   ->connect_info([ sub { DBI->connect(...) } ]);
527
528   # Connect via subref in hashref
529   ->connect_info([{
530     dbh_maker => sub { DBI->connect(...) },
531     on_connect_do => 'alter session ...',
532   }]);
533
534   # A bit more complicated
535   ->connect_info(
536     [
537       'dbi:Pg:dbname=foo',
538       'postgres',
539       'my_pg_password',
540       { AutoCommit => 1 },
541       { quote_char => q{"} },
542     ]
543   );
544
545   # Equivalent to the previous example
546   ->connect_info(
547     [
548       'dbi:Pg:dbname=foo',
549       'postgres',
550       'my_pg_password',
551       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
552     ]
553   );
554
555   # Same, but with hashref as argument
556   # See parse_connect_info for explanation
557   ->connect_info(
558     [{
559       dsn         => 'dbi:Pg:dbname=foo',
560       user        => 'postgres',
561       password    => 'my_pg_password',
562       AutoCommit  => 1,
563       quote_char  => q{"},
564       name_sep    => q{.},
565     }]
566   );
567
568   # Subref + DBIx::Class-specific connection options
569   ->connect_info(
570     [
571       sub { DBI->connect(...) },
572       {
573           quote_char => q{`},
574           name_sep => q{@},
575           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
576           disable_sth_caching => 1,
577       },
578     ]
579   );
580
581
582
583 =cut
584
585 sub connect_info {
586   my ($self, $info) = @_;
587
588   return $self->_connect_info if !$info;
589
590   $self->_connect_info($info); # copy for _connect_info
591
592   $info = $self->_normalize_connect_info($info)
593     if ref $info eq 'ARRAY';
594
595   for my $storage_opt (keys %{ $info->{storage_options} }) {
596     my $value = $info->{storage_options}{$storage_opt};
597
598     $self->$storage_opt($value);
599   }
600
601   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
602   #  the new set of options
603   $self->_sql_maker(undef);
604   $self->_sql_maker_opts({});
605
606   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
607     my $value = $info->{sql_maker_options}{$sql_maker_opt};
608
609     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
610   }
611
612   my %attrs = (
613     %{ $self->_default_dbi_connect_attributes || {} },
614     %{ $info->{attributes} || {} },
615   );
616
617   my @args = @{ $info->{arguments} };
618
619   if (keys %attrs and ref $args[0] ne 'CODE') {
620     carp
621         'You provided explicit AutoCommit => 0 in your connection_info. '
622       . 'This is almost universally a bad idea (see the footnotes of '
623       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
624       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
625       . 'this warning.'
626       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
627
628     push @args, \%attrs if keys %attrs;
629   }
630   $self->_dbi_connect_info(\@args);
631
632   # FIXME - dirty:
633   # save attributes them in a separate accessor so they are always
634   # introspectable, even in case of a CODE $dbhmaker
635   $self->_dbic_connect_attributes (\%attrs);
636
637   return $self->_connect_info;
638 }
639
640 sub _normalize_connect_info {
641   my ($self, $info_arg) = @_;
642   my %info;
643
644   my @args = @$info_arg;  # take a shallow copy for further mutilation
645
646   # combine/pre-parse arguments depending on invocation style
647
648   my %attrs;
649   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
650     %attrs = %{ $args[1] || {} };
651     @args = $args[0];
652   }
653   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
654     %attrs = %{$args[0]};
655     @args = ();
656     if (my $code = delete $attrs{dbh_maker}) {
657       @args = $code;
658
659       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
660       if (@ignored) {
661         carp sprintf (
662             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
663           . "to the result of 'dbh_maker'",
664
665           join (', ', map { "'$_'" } (@ignored) ),
666         );
667       }
668     }
669     else {
670       @args = delete @attrs{qw/dsn user password/};
671     }
672   }
673   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
674     %attrs = (
675       % { $args[3] || {} },
676       % { $args[4] || {} },
677     );
678     @args = @args[0,1,2];
679   }
680
681   $info{arguments} = \@args;
682
683   my @storage_opts = grep exists $attrs{$_},
684     @storage_options, 'cursor_class';
685
686   @{ $info{storage_options} }{@storage_opts} =
687     delete @attrs{@storage_opts} if @storage_opts;
688
689   my @sql_maker_opts = grep exists $attrs{$_},
690     qw/limit_dialect quote_char name_sep quote_names/;
691
692   @{ $info{sql_maker_options} }{@sql_maker_opts} =
693     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
694
695   $info{attributes} = \%attrs if %attrs;
696
697   return \%info;
698 }
699
700 sub _default_dbi_connect_attributes () {
701   +{
702     AutoCommit => 1,
703     PrintError => 0,
704     RaiseError => 1,
705     ShowErrorStatement => 1,
706   };
707 }
708
709 =head2 on_connect_do
710
711 This method is deprecated in favour of setting via L</connect_info>.
712
713 =cut
714
715 =head2 on_disconnect_do
716
717 This method is deprecated in favour of setting via L</connect_info>.
718
719 =cut
720
721 sub _parse_connect_do {
722   my ($self, $type) = @_;
723
724   my $val = $self->$type;
725   return () if not defined $val;
726
727   my @res;
728
729   if (not ref($val)) {
730     push @res, [ 'do_sql', $val ];
731   } elsif (ref($val) eq 'CODE') {
732     push @res, $val;
733   } elsif (ref($val) eq 'ARRAY') {
734     push @res, map { [ 'do_sql', $_ ] } @$val;
735   } else {
736     $self->throw_exception("Invalid type for $type: ".ref($val));
737   }
738
739   return \@res;
740 }
741
742 =head2 dbh_do
743
744 Arguments: ($subref | $method_name), @extra_coderef_args?
745
746 Execute the given $subref or $method_name using the new exception-based
747 connection management.
748
749 The first two arguments will be the storage object that C<dbh_do> was called
750 on and a database handle to use.  Any additional arguments will be passed
751 verbatim to the called subref as arguments 2 and onwards.
752
753 Using this (instead of $self->_dbh or $self->dbh) ensures correct
754 exception handling and reconnection (or failover in future subclasses).
755
756 Your subref should have no side-effects outside of the database, as
757 there is the potential for your subref to be partially double-executed
758 if the database connection was stale/dysfunctional.
759
760 Example:
761
762   my @stuff = $schema->storage->dbh_do(
763     sub {
764       my ($storage, $dbh, @cols) = @_;
765       my $cols = join(q{, }, @cols);
766       $dbh->selectrow_array("SELECT $cols FROM foo");
767     },
768     @column_list
769   );
770
771 =cut
772
773 sub dbh_do {
774   my $self = shift;
775   my $code = shift;
776
777   my $dbh = $self->_get_dbh;
778
779   return $self->$code($dbh, @_)
780     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
781
782   local $self->{_in_dbh_do} = 1;
783
784   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
785   my $args = \@_;
786   return try {
787     $self->$code ($dbh, @$args);
788   } catch {
789     $self->throw_exception($_) if $self->connected;
790
791     # We were not connected - reconnect and retry, but let any
792     #  exception fall right through this time
793     carp "Retrying $code after catching disconnected exception: $_"
794       if $ENV{DBIC_DBIRETRY_DEBUG};
795
796     $self->_populate_dbh;
797     $self->$code($self->_dbh, @$args);
798   };
799 }
800
801 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
802 # It also informs dbh_do to bypass itself while under the direction of txn_do,
803 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
804 sub txn_do {
805   my $self = shift;
806   my $coderef = shift;
807
808   ref $coderef eq 'CODE' or $self->throw_exception
809     ('$coderef must be a CODE reference');
810
811   local $self->{_in_dbh_do} = 1;
812
813   my @result;
814   my $want = wantarray;
815
816   my $tried = 0;
817   while(1) {
818     my $exception;
819
820     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
821     my $args = \@_;
822
823     try {
824       $self->txn_begin;
825       my $txn_start_depth = $self->transaction_depth;
826       if($want) {
827           @result = $coderef->(@$args);
828       }
829       elsif(defined $want) {
830           $result[0] = $coderef->(@$args);
831       }
832       else {
833           $coderef->(@$args);
834       }
835
836       my $delta_txn = $txn_start_depth - $self->transaction_depth;
837       if ($delta_txn == 0) {
838         $self->txn_commit;
839       }
840       elsif ($delta_txn != 1) {
841         # an off-by-one would mean we fired a rollback
842         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
843       }
844     } catch {
845       $exception = $_;
846     };
847
848     if(! defined $exception) { return wantarray ? @result : $result[0] }
849
850     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
851       my $rollback_exception;
852       try { $self->txn_rollback } catch { $rollback_exception = shift };
853       if(defined $rollback_exception) {
854         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
855         $self->throw_exception($exception)  # propagate nested rollback
856           if $rollback_exception =~ /$exception_class/;
857
858         $self->throw_exception(
859           "Transaction aborted: ${exception}. "
860           . "Rollback failed: ${rollback_exception}"
861         );
862       }
863       $self->throw_exception($exception)
864     }
865
866     # We were not connected, and was first try - reconnect and retry
867     # via the while loop
868     carp "Retrying $coderef after catching disconnected exception: $exception"
869       if $ENV{DBIC_TXNRETRY_DEBUG};
870     $self->_populate_dbh;
871   }
872 }
873
874 =head2 disconnect
875
876 Our C<disconnect> method also performs a rollback first if the
877 database is not in C<AutoCommit> mode.
878
879 =cut
880
881 sub disconnect {
882   my ($self) = @_;
883
884   if( $self->_dbh ) {
885     my @actions;
886
887     push @actions, ( $self->on_disconnect_call || () );
888     push @actions, $self->_parse_connect_do ('on_disconnect_do');
889
890     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
891
892     $self->_dbh_rollback unless $self->_dbh_autocommit;
893
894     %{ $self->_dbh->{CachedKids} } = ();
895     $self->_dbh->disconnect;
896     $self->_dbh(undef);
897     $self->{_dbh_gen}++;
898   }
899 }
900
901 =head2 with_deferred_fk_checks
902
903 =over 4
904
905 =item Arguments: C<$coderef>
906
907 =item Return Value: The return value of $coderef
908
909 =back
910
911 Storage specific method to run the code ref with FK checks deferred or
912 in MySQL's case disabled entirely.
913
914 =cut
915
916 # Storage subclasses should override this
917 sub with_deferred_fk_checks {
918   my ($self, $sub) = @_;
919   $sub->();
920 }
921
922 =head2 connected
923
924 =over
925
926 =item Arguments: none
927
928 =item Return Value: 1|0
929
930 =back
931
932 Verifies that the current database handle is active and ready to execute
933 an SQL statement (e.g. the connection did not get stale, server is still
934 answering, etc.) This method is used internally by L</dbh>.
935
936 =cut
937
938 sub connected {
939   my $self = shift;
940   return 0 unless $self->_seems_connected;
941
942   #be on the safe side
943   local $self->_dbh->{RaiseError} = 1;
944
945   return $self->_ping;
946 }
947
948 sub _seems_connected {
949   my $self = shift;
950
951   $self->_verify_pid;
952
953   my $dbh = $self->_dbh
954     or return 0;
955
956   return $dbh->FETCH('Active');
957 }
958
959 sub _ping {
960   my $self = shift;
961
962   my $dbh = $self->_dbh or return 0;
963
964   return $dbh->ping;
965 }
966
967 sub ensure_connected {
968   my ($self) = @_;
969
970   unless ($self->connected) {
971     $self->_populate_dbh;
972   }
973 }
974
975 =head2 dbh
976
977 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
978 is guaranteed to be healthy by implicitly calling L</connected>, and if
979 necessary performing a reconnection before returning. Keep in mind that this
980 is very B<expensive> on some database engines. Consider using L</dbh_do>
981 instead.
982
983 =cut
984
985 sub dbh {
986   my ($self) = @_;
987
988   if (not $self->_dbh) {
989     $self->_populate_dbh;
990   } else {
991     $self->ensure_connected;
992   }
993   return $self->_dbh;
994 }
995
996 # this is the internal "get dbh or connect (don't check)" method
997 sub _get_dbh {
998   my $self = shift;
999   $self->_verify_pid;
1000   $self->_populate_dbh unless $self->_dbh;
1001   return $self->_dbh;
1002 }
1003
1004 sub sql_maker {
1005   my ($self) = @_;
1006   unless ($self->_sql_maker) {
1007     my $sql_maker_class = $self->sql_maker_class;
1008
1009     my %opts = %{$self->_sql_maker_opts||{}};
1010     my $dialect =
1011       $opts{limit_dialect}
1012         ||
1013       $self->sql_limit_dialect
1014         ||
1015       do {
1016         my $s_class = (ref $self) || $self;
1017         carp (
1018           "Your storage class ($s_class) does not set sql_limit_dialect and you "
1019         . 'have not supplied an explicit limit_dialect in your connection_info. '
1020         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1021         . 'databases but can be (and often is) painfully slow. '
1022         . "Please file an RT ticket against '$s_class' ."
1023         );
1024
1025         'GenericSubQ';
1026       }
1027     ;
1028
1029     my ($quote_char, $name_sep);
1030
1031     if ($opts{quote_names}) {
1032       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1033         my $s_class = (ref $self) || $self;
1034         carp (
1035           "You requested 'quote_names' but your storage class ($s_class) does "
1036         . 'not explicitly define a default sql_quote_char and you have not '
1037         . 'supplied a quote_char as part of your connection_info. DBIC will '
1038         .q{default to the ANSI SQL standard quote '"', which works most of }
1039         . "the time. Please file an RT ticket against '$s_class'."
1040         );
1041
1042         '"'; # RV
1043       };
1044
1045       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1046     }
1047
1048     $self->_sql_maker($sql_maker_class->new(
1049       bindtype=>'columns',
1050       array_datatypes => 1,
1051       limit_dialect => $dialect,
1052       ($quote_char ? (quote_char => $quote_char) : ()),
1053       name_sep => ($name_sep || '.'),
1054       %opts,
1055     ));
1056   }
1057   return $self->_sql_maker;
1058 }
1059
1060 # nothing to do by default
1061 sub _rebless {}
1062 sub _init {}
1063
1064 sub _populate_dbh {
1065   my ($self) = @_;
1066
1067   my @info = @{$self->_dbi_connect_info || []};
1068   $self->_dbh(undef); # in case ->connected failed we might get sent here
1069   $self->_dbh_details({}); # reset everything we know
1070
1071   $self->_dbh($self->_connect(@info));
1072
1073   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1074
1075   $self->_determine_driver;
1076
1077   # Always set the transaction depth on connect, since
1078   #  there is no transaction in progress by definition
1079   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1080
1081   $self->_run_connection_actions unless $self->{_in_determine_driver};
1082 }
1083
1084 sub _run_connection_actions {
1085   my $self = shift;
1086   my @actions;
1087
1088   push @actions, ( $self->on_connect_call || () );
1089   push @actions, $self->_parse_connect_do ('on_connect_do');
1090
1091   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1092 }
1093
1094
1095
1096 sub set_use_dbms_capability {
1097   $_[0]->set_inherited ($_[1], $_[2]);
1098 }
1099
1100 sub get_use_dbms_capability {
1101   my ($self, $capname) = @_;
1102
1103   my $use = $self->get_inherited ($capname);
1104   return defined $use
1105     ? $use
1106     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1107   ;
1108 }
1109
1110 sub set_dbms_capability {
1111   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1112 }
1113
1114 sub get_dbms_capability {
1115   my ($self, $capname) = @_;
1116
1117   my $cap = $self->_dbh_details->{capability}{$capname};
1118
1119   unless (defined $cap) {
1120     if (my $meth = $self->can ("_determine$capname")) {
1121       $cap = $self->$meth ? 1 : 0;
1122     }
1123     else {
1124       $cap = 0;
1125     }
1126
1127     $self->set_dbms_capability ($capname, $cap);
1128   }
1129
1130   return $cap;
1131 }
1132
1133 sub _server_info {
1134   my $self = shift;
1135
1136   my $info;
1137   unless ($info = $self->_dbh_details->{info}) {
1138
1139     $info = {};
1140
1141     my $server_version = try { $self->_get_server_version };
1142
1143     if (defined $server_version) {
1144       $info->{dbms_version} = $server_version;
1145
1146       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1147       my @verparts = split (/\./, $numeric_version);
1148       if (
1149         @verparts
1150           &&
1151         $verparts[0] <= 999
1152       ) {
1153         # consider only up to 3 version parts, iff not more than 3 digits
1154         my @use_parts;
1155         while (@verparts && @use_parts < 3) {
1156           my $p = shift @verparts;
1157           last if $p > 999;
1158           push @use_parts, $p;
1159         }
1160         push @use_parts, 0 while @use_parts < 3;
1161
1162         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1163       }
1164     }
1165
1166     $self->_dbh_details->{info} = $info;
1167   }
1168
1169   return $info;
1170 }
1171
1172 sub _get_server_version {
1173   shift->_dbh_get_info(18);
1174 }
1175
1176 sub _dbh_get_info {
1177   my ($self, $info) = @_;
1178
1179   return try { $self->_get_dbh->get_info($info) } || undef;
1180 }
1181
1182 sub _determine_driver {
1183   my ($self) = @_;
1184
1185   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1186     my $started_connected = 0;
1187     local $self->{_in_determine_driver} = 1;
1188
1189     if (ref($self) eq __PACKAGE__) {
1190       my $driver;
1191       if ($self->_dbh) { # we are connected
1192         $driver = $self->_dbh->{Driver}{Name};
1193         $started_connected = 1;
1194       } else {
1195         # if connect_info is a CODEREF, we have no choice but to connect
1196         if (ref $self->_dbi_connect_info->[0] &&
1197             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1198           $self->_populate_dbh;
1199           $driver = $self->_dbh->{Driver}{Name};
1200         }
1201         else {
1202           # try to use dsn to not require being connected, the driver may still
1203           # force a connection in _rebless to determine version
1204           # (dsn may not be supplied at all if all we do is make a mock-schema)
1205           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1206           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1207           $driver ||= $ENV{DBI_DRIVER};
1208         }
1209       }
1210
1211       if ($driver) {
1212         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1213         if ($self->load_optional_class($storage_class)) {
1214           mro::set_mro($storage_class, 'c3');
1215           bless $self, $storage_class;
1216           $self->_rebless();
1217         }
1218       }
1219     }
1220
1221     $self->_driver_determined(1);
1222
1223     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1224
1225     $self->_init; # run driver-specific initializations
1226
1227     $self->_run_connection_actions
1228         if !$started_connected && defined $self->_dbh;
1229   }
1230 }
1231
1232 sub _do_connection_actions {
1233   my $self          = shift;
1234   my $method_prefix = shift;
1235   my $call          = shift;
1236
1237   if (not ref($call)) {
1238     my $method = $method_prefix . $call;
1239     $self->$method(@_);
1240   } elsif (ref($call) eq 'CODE') {
1241     $self->$call(@_);
1242   } elsif (ref($call) eq 'ARRAY') {
1243     if (ref($call->[0]) ne 'ARRAY') {
1244       $self->_do_connection_actions($method_prefix, $_) for @$call;
1245     } else {
1246       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1247     }
1248   } else {
1249     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1250   }
1251
1252   return $self;
1253 }
1254
1255 sub connect_call_do_sql {
1256   my $self = shift;
1257   $self->_do_query(@_);
1258 }
1259
1260 sub disconnect_call_do_sql {
1261   my $self = shift;
1262   $self->_do_query(@_);
1263 }
1264
1265 # override in db-specific backend when necessary
1266 sub connect_call_datetime_setup { 1 }
1267
1268 sub _do_query {
1269   my ($self, $action) = @_;
1270
1271   if (ref $action eq 'CODE') {
1272     $action = $action->($self);
1273     $self->_do_query($_) foreach @$action;
1274   }
1275   else {
1276     # Most debuggers expect ($sql, @bind), so we need to exclude
1277     # the attribute hash which is the second argument to $dbh->do
1278     # furthermore the bind values are usually to be presented
1279     # as named arrayref pairs, so wrap those here too
1280     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1281     my $sql = shift @do_args;
1282     my $attrs = shift @do_args;
1283     my @bind = map { [ undef, $_ ] } @do_args;
1284
1285     $self->_query_start($sql, @bind);
1286     $self->_get_dbh->do($sql, $attrs, @do_args);
1287     $self->_query_end($sql, @bind);
1288   }
1289
1290   return $self;
1291 }
1292
1293 sub _connect {
1294   my ($self, @info) = @_;
1295
1296   $self->throw_exception("You failed to provide any connection info")
1297     if !@info;
1298
1299   my ($old_connect_via, $dbh);
1300
1301   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1302     $old_connect_via = $DBI::connect_via;
1303     $DBI::connect_via = 'connect';
1304   }
1305
1306   try {
1307     if(ref $info[0] eq 'CODE') {
1308        $dbh = $info[0]->();
1309     }
1310     else {
1311        $dbh = DBI->connect(@info);
1312     }
1313
1314     if (!$dbh) {
1315       die $DBI::errstr;
1316     }
1317
1318     unless ($self->unsafe) {
1319
1320       $self->throw_exception(
1321         'Refusing clobbering of {HandleError} installed on externally supplied '
1322        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1323       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1324
1325       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1326       # request, or an external handle. Complain and set anyway
1327       unless ($dbh->{RaiseError}) {
1328         carp( ref $info[0] eq 'CODE'
1329
1330           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1331            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1332            .'attribute has been supplied'
1333
1334           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1335            .'unsafe => 1. Toggling RaiseError back to true'
1336         );
1337
1338         $dbh->{RaiseError} = 1;
1339       }
1340
1341       # this odd anonymous coderef dereference is in fact really
1342       # necessary to avoid the unwanted effect described in perl5
1343       # RT#75792
1344       sub {
1345         my $weak_self = $_[0];
1346         weaken $weak_self;
1347
1348         # the coderef is blessed so we can distinguish it from externally
1349         # supplied handles (which must be preserved)
1350         $_[1]->{HandleError} = bless sub {
1351           if ($weak_self) {
1352             $weak_self->throw_exception("DBI Exception: $_[0]");
1353           }
1354           else {
1355             # the handler may be invoked by something totally out of
1356             # the scope of DBIC
1357             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1358           }
1359         }, '__DBIC__DBH__ERROR__HANDLER__';
1360       }->($self, $dbh);
1361     }
1362   }
1363   catch {
1364     $self->throw_exception("DBI Connection failed: $_")
1365   }
1366   finally {
1367     $DBI::connect_via = $old_connect_via if $old_connect_via;
1368   };
1369
1370   $self->_dbh_autocommit($dbh->{AutoCommit});
1371   $dbh;
1372 }
1373
1374 sub svp_begin {
1375   my ($self, $name) = @_;
1376
1377   $name = $self->_svp_generate_name
1378     unless defined $name;
1379
1380   $self->throw_exception ("You can't use savepoints outside a transaction")
1381     if $self->{transaction_depth} == 0;
1382
1383   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1384     unless $self->can('_svp_begin');
1385
1386   push @{ $self->{savepoints} }, $name;
1387
1388   $self->debugobj->svp_begin($name) if $self->debug;
1389
1390   return $self->_svp_begin($name);
1391 }
1392
1393 sub svp_release {
1394   my ($self, $name) = @_;
1395
1396   $self->throw_exception ("You can't use savepoints outside a transaction")
1397     if $self->{transaction_depth} == 0;
1398
1399   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1400     unless $self->can('_svp_release');
1401
1402   if (defined $name) {
1403     $self->throw_exception ("Savepoint '$name' does not exist")
1404       unless grep { $_ eq $name } @{ $self->{savepoints} };
1405
1406     # Dig through the stack until we find the one we are releasing.  This keeps
1407     # the stack up to date.
1408     my $svp;
1409
1410     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1411   } else {
1412     $name = pop @{ $self->{savepoints} };
1413   }
1414
1415   $self->debugobj->svp_release($name) if $self->debug;
1416
1417   return $self->_svp_release($name);
1418 }
1419
1420 sub svp_rollback {
1421   my ($self, $name) = @_;
1422
1423   $self->throw_exception ("You can't use savepoints outside a transaction")
1424     if $self->{transaction_depth} == 0;
1425
1426   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1427     unless $self->can('_svp_rollback');
1428
1429   if (defined $name) {
1430       # If they passed us a name, verify that it exists in the stack
1431       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1432           $self->throw_exception("Savepoint '$name' does not exist!");
1433       }
1434
1435       # Dig through the stack until we find the one we are releasing.  This keeps
1436       # the stack up to date.
1437       while(my $s = pop(@{ $self->{savepoints} })) {
1438           last if($s eq $name);
1439       }
1440       # Add the savepoint back to the stack, as a rollback doesn't remove the
1441       # named savepoint, only everything after it.
1442       push(@{ $self->{savepoints} }, $name);
1443   } else {
1444       # We'll assume they want to rollback to the last savepoint
1445       $name = $self->{savepoints}->[-1];
1446   }
1447
1448   $self->debugobj->svp_rollback($name) if $self->debug;
1449
1450   return $self->_svp_rollback($name);
1451 }
1452
1453 sub _svp_generate_name {
1454   my ($self) = @_;
1455   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1456 }
1457
1458 sub txn_begin {
1459   my $self = shift;
1460
1461   # this means we have not yet connected and do not know the AC status
1462   # (e.g. coderef $dbh)
1463   if (! defined $self->_dbh_autocommit) {
1464     $self->ensure_connected;
1465   }
1466   # otherwise re-connect on pid changes, so
1467   # that the txn_depth is adjusted properly
1468   # the lightweight _get_dbh is good enoug here
1469   # (only superficial handle check, no pings)
1470   else {
1471     $self->_get_dbh;
1472   }
1473
1474   if($self->transaction_depth == 0) {
1475     $self->debugobj->txn_begin()
1476       if $self->debug;
1477     $self->_dbh_begin_work;
1478   }
1479   elsif ($self->auto_savepoint) {
1480     $self->svp_begin;
1481   }
1482   $self->{transaction_depth}++;
1483 }
1484
1485 sub _dbh_begin_work {
1486   my $self = shift;
1487
1488   # if the user is utilizing txn_do - good for him, otherwise we need to
1489   # ensure that the $dbh is healthy on BEGIN.
1490   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1491   # will be replaced by a failure of begin_work itself (which will be
1492   # then retried on reconnect)
1493   if ($self->{_in_dbh_do}) {
1494     $self->_dbh->begin_work;
1495   } else {
1496     $self->dbh_do(sub { $_[1]->begin_work });
1497   }
1498 }
1499
1500 sub txn_commit {
1501   my $self = shift;
1502   if (! $self->_dbh) {
1503     $self->throw_exception('cannot COMMIT on a disconnected handle');
1504   }
1505   elsif ($self->{transaction_depth} == 1) {
1506     $self->debugobj->txn_commit()
1507       if ($self->debug);
1508     $self->_dbh_commit;
1509     $self->{transaction_depth} = 0
1510       if $self->_dbh_autocommit;
1511   }
1512   elsif($self->{transaction_depth} > 1) {
1513     $self->{transaction_depth}--;
1514     $self->svp_release
1515       if $self->auto_savepoint;
1516   }
1517   elsif (! $self->_dbh->FETCH('AutoCommit') ) {
1518
1519     carp "Storage transaction_depth $self->{transaction_depth} does not match "
1520         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1521
1522     $self->debugobj->txn_commit()
1523       if ($self->debug);
1524     $self->_dbh_commit;
1525     $self->{transaction_depth} = 0
1526       if $self->_dbh_autocommit;
1527   }
1528   else {
1529     $self->throw_exception( 'Refusing to commit without a started transaction' );
1530   }
1531 }
1532
1533 sub _dbh_commit {
1534   my $self = shift;
1535   my $dbh  = $self->_dbh
1536     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1537   $dbh->commit;
1538 }
1539
1540 sub txn_rollback {
1541   my $self = shift;
1542   my $dbh = $self->_dbh;
1543   try {
1544     if ($self->{transaction_depth} == 1) {
1545       $self->debugobj->txn_rollback()
1546         if ($self->debug);
1547       $self->{transaction_depth} = 0
1548         if $self->_dbh_autocommit;
1549       $self->_dbh_rollback;
1550     }
1551     elsif($self->{transaction_depth} > 1) {
1552       $self->{transaction_depth}--;
1553       if ($self->auto_savepoint) {
1554         $self->svp_rollback;
1555         $self->svp_release;
1556       }
1557     }
1558     else {
1559       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1560     }
1561   }
1562   catch {
1563     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1564
1565     if ($_ !~ /$exception_class/) {
1566       # ensure that a failed rollback resets the transaction depth
1567       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1568     }
1569
1570     $self->throw_exception($_)
1571   };
1572 }
1573
1574 sub _dbh_rollback {
1575   my $self = shift;
1576   my $dbh  = $self->_dbh
1577     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1578   $dbh->rollback;
1579 }
1580
1581 # This used to be the top-half of _execute.  It was split out to make it
1582 #  easier to override in NoBindVars without duping the rest.  It takes up
1583 #  all of _execute's args, and emits $sql, @bind.
1584 sub _prep_for_execute {
1585   my ($self, $op, $extra_bind, $ident, $args) = @_;
1586
1587   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1588     $ident = $ident->from();
1589   }
1590
1591   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1592
1593   unshift(@bind,
1594     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1595       if $extra_bind;
1596   return ($sql, \@bind);
1597 }
1598
1599
1600 sub _fix_bind_params {
1601     my ($self, @bind) = @_;
1602
1603     ### Turn @bind from something like this:
1604     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1605     ### to this:
1606     ###   ( "'1'", "'1'", "'3'" )
1607     return
1608         map {
1609             if ( defined( $_ && $_->[1] ) ) {
1610                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1611             }
1612             else { q{NULL}; }
1613         } @bind;
1614 }
1615
1616 sub _query_start {
1617     my ( $self, $sql, @bind ) = @_;
1618
1619     if ( $self->debug ) {
1620         @bind = $self->_fix_bind_params(@bind);
1621
1622         $self->debugobj->query_start( $sql, @bind );
1623     }
1624 }
1625
1626 sub _query_end {
1627     my ( $self, $sql, @bind ) = @_;
1628
1629     if ( $self->debug ) {
1630         @bind = $self->_fix_bind_params(@bind);
1631         $self->debugobj->query_end( $sql, @bind );
1632     }
1633 }
1634
1635 sub _dbh_execute {
1636   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1637
1638   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1639
1640   $self->_query_start( $sql, @$bind );
1641
1642   my $sth = $self->sth($sql,$op);
1643
1644   my $placeholder_index = 1;
1645
1646   foreach my $bound (@$bind) {
1647     my $attributes = {};
1648     my($column_name, @data) = @$bound;
1649
1650     if ($bind_attributes) {
1651       $attributes = $bind_attributes->{$column_name}
1652       if defined $bind_attributes->{$column_name};
1653     }
1654
1655     foreach my $data (@data) {
1656       my $ref = ref $data;
1657
1658       if ($ref and overload::Method($data, '""') ) {
1659         $data = "$data";
1660       }
1661       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1662         $sth->bind_param_inout(
1663           $placeholder_index++,
1664           $data,
1665           $self->_max_column_bytesize($ident, $column_name),
1666           $attributes
1667         );
1668         next;
1669       }
1670
1671       $sth->bind_param($placeholder_index++, $data, $attributes);
1672     }
1673   }
1674
1675   # Can this fail without throwing an exception anyways???
1676   my $rv = $sth->execute();
1677   $self->throw_exception(
1678     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1679   ) if !$rv;
1680
1681   $self->_query_end( $sql, @$bind );
1682
1683   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1684 }
1685
1686 sub _execute {
1687     my $self = shift;
1688     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1689 }
1690
1691 sub _prefetch_autovalues {
1692   my ($self, $source, $to_insert) = @_;
1693
1694   my $colinfo = $source->columns_info;
1695
1696   my %values;
1697   for my $col (keys %$colinfo) {
1698     if (
1699       $colinfo->{$col}{auto_nextval}
1700         and
1701       (
1702         ! exists $to_insert->{$col}
1703           or
1704         ref $to_insert->{$col} eq 'SCALAR'
1705       )
1706     ) {
1707       $values{$col} = $self->_sequence_fetch(
1708         'NEXTVAL',
1709         ( $colinfo->{$col}{sequence} ||=
1710             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1711         ),
1712       );
1713     }
1714   }
1715
1716   \%values;
1717 }
1718
1719 sub insert {
1720   my ($self, $source, $to_insert) = @_;
1721
1722   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1723
1724   # fuse the values
1725   $to_insert = { %$to_insert, %$prefetched_values };
1726
1727   # list of primary keys we try to fetch from the database
1728   # both not-exsists and scalarrefs are considered
1729   my %fetch_pks;
1730   for ($source->primary_columns) {
1731     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1732       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1733   }
1734
1735   my ($sqla_opts, @ir_container);
1736   if ($self->_use_insert_returning) {
1737
1738     # retain order as declared in the resultsource
1739     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1740       push @{$sqla_opts->{returning}}, $_;
1741       $sqla_opts->{returning_container} = \@ir_container
1742         if $self->_use_insert_returning_bound;
1743     }
1744   }
1745
1746   my $bind_attributes = $self->source_bind_attributes($source);
1747
1748   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1749
1750   my %returned_cols;
1751
1752   if (my $retlist = $sqla_opts->{returning}) {
1753     @ir_container = try {
1754       local $SIG{__WARN__} = sub {};
1755       my @r = $sth->fetchrow_array;
1756       $sth->finish;
1757       @r;
1758     } unless @ir_container;
1759
1760     @returned_cols{@$retlist} = @ir_container if @ir_container;
1761   }
1762
1763   return { %$prefetched_values, %returned_cols };
1764 }
1765
1766
1767 ## Currently it is assumed that all values passed will be "normal", i.e. not
1768 ## scalar refs, or at least, all the same type as the first set, the statement is
1769 ## only prepped once.
1770 sub insert_bulk {
1771   my ($self, $source, $cols, $data) = @_;
1772
1773   my %colvalues;
1774   @colvalues{@$cols} = (0..$#$cols);
1775
1776   for my $i (0..$#$cols) {
1777     my $first_val = $data->[0][$i];
1778     next unless ref $first_val eq 'SCALAR';
1779
1780     $colvalues{ $cols->[$i] } = $first_val;
1781   }
1782
1783   # check for bad data and stringify stringifiable objects
1784   my $bad_slice = sub {
1785     my ($msg, $col_idx, $slice_idx) = @_;
1786     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1787       $msg,
1788       $cols->[$col_idx],
1789       do {
1790         require Data::Dumper::Concise;
1791         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1792         Data::Dumper::Concise::Dumper ({
1793           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1794         }),
1795       }
1796     );
1797   };
1798
1799   for my $datum_idx (0..$#$data) {
1800     my $datum = $data->[$datum_idx];
1801
1802     for my $col_idx (0..$#$cols) {
1803       my $val            = $datum->[$col_idx];
1804       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1805       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1806
1807       if ($is_literal_sql) {
1808         if (not ref $val) {
1809           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1810         }
1811         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1812           $bad_slice->("$reftype reference found where literal SQL expected",
1813             $col_idx, $datum_idx);
1814         }
1815         elsif ($$val ne $$sqla_bind){
1816           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1817             $col_idx, $datum_idx);
1818         }
1819       }
1820       elsif (my $reftype = ref $val) {
1821         require overload;
1822         if (overload::Method($val, '""')) {
1823           $datum->[$col_idx] = "".$val;
1824         }
1825         else {
1826           $bad_slice->("$reftype reference found where bind expected",
1827             $col_idx, $datum_idx);
1828         }
1829       }
1830     }
1831   }
1832
1833   return $self->_insert_bulk(
1834     $source, $cols, \%colvalues, $data,
1835   );
1836 }
1837
1838 # Broken out so that it can be overridden in Storage/DBI/mysql.pm
1839 sub _insert_bulk {
1840   my ($self, $source, $cols, $colvalues, $data) = @_;
1841
1842   my ($sql, $bind) = $self->_prep_for_execute (
1843     'insert', undef, $source, [$colvalues]
1844   );
1845
1846   if (! @$bind) {
1847     # if the bindlist is empty - make sure all "values" are in fact
1848     # literal scalarrefs. If not the case this means the storage ate
1849     # them away (e.g. the NoBindVars component) and interpolated them
1850     # directly into the SQL. This obviosly can't be good for multi-inserts
1851
1852     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1853       if first { ref $_ ne 'SCALAR' } values %$colvalues;
1854   }
1855
1856   # neither _execute_array, nor _execute_inserts_with_no_binds are
1857   # atomic (even if _execute _array is a single call). Thus a safety
1858   # scope guard
1859   my $guard = $self->txn_scope_guard;
1860
1861   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1862   my $sth = $self->sth($sql);
1863   my $rv = do {
1864     if (@$bind) {
1865       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1866       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1867     }
1868     else {
1869       # bind_param_array doesn't work if there are no binds
1870       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1871     }
1872   };
1873
1874   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1875
1876   $guard->commit;
1877
1878   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1879 }
1880
1881 sub _execute_array {
1882   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1883
1884   ## This must be an arrayref, else nothing works!
1885   my $tuple_status = [];
1886
1887   ## Get the bind_attributes, if any exist
1888   my $bind_attributes = $self->source_bind_attributes($source);
1889
1890   ## Bind the values and execute
1891   my $placeholder_index = 1;
1892
1893   foreach my $bound (@$bind) {
1894
1895     my $attributes = {};
1896     my ($column_name, $data_index) = @$bound;
1897
1898     if( $bind_attributes ) {
1899       $attributes = $bind_attributes->{$column_name}
1900       if defined $bind_attributes->{$column_name};
1901     }
1902
1903     my @data = map { $_->[$data_index] } @$data;
1904
1905     $sth->bind_param_array(
1906       $placeholder_index,
1907       [@data],
1908       (%$attributes ?  $attributes : ()),
1909     );
1910     $placeholder_index++;
1911   }
1912
1913   my ($rv, $err);
1914   try {
1915     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1916   }
1917   catch {
1918     $err = shift;
1919   };
1920
1921   # Not all DBDs are create equal. Some throw on error, some return
1922   # an undef $rv, and some set $sth->err - try whatever we can
1923   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1924     ! defined $err
1925       and
1926     ( !defined $rv or $sth->err )
1927   );
1928
1929   # Statement must finish even if there was an exception.
1930   try {
1931     $sth->finish
1932   }
1933   catch {
1934     $err = shift unless defined $err
1935   };
1936
1937   if (defined $err) {
1938     my $i = 0;
1939     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1940
1941     $self->throw_exception("Unexpected populate error: $err")
1942       if ($i > $#$tuple_status);
1943
1944     require Data::Dumper::Concise;
1945     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1946       ($tuple_status->[$i][1] || $err),
1947       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
1948     );
1949   }
1950
1951   return $rv;
1952 }
1953
1954 sub _dbh_execute_array {
1955     my ($self, $sth, $tuple_status, @extra) = @_;
1956
1957     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1958 }
1959
1960 sub _dbh_execute_inserts_with_no_binds {
1961   my ($self, $sth, $count) = @_;
1962
1963   my $err;
1964   try {
1965     my $dbh = $self->_get_dbh;
1966     local $dbh->{RaiseError} = 1;
1967     local $dbh->{PrintError} = 0;
1968
1969     $sth->execute foreach 1..$count;
1970   }
1971   catch {
1972     $err = shift;
1973   };
1974
1975   # Make sure statement is finished even if there was an exception.
1976   try {
1977     $sth->finish
1978   }
1979   catch {
1980     $err = shift unless defined $err;
1981   };
1982
1983   $self->throw_exception($err) if defined $err;
1984
1985   return $count;
1986 }
1987
1988 sub update {
1989   my ($self, $source, @args) = @_;
1990
1991   my $bind_attrs = $self->source_bind_attributes($source);
1992
1993   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1994 }
1995
1996
1997 sub delete {
1998   my ($self, $source, @args) = @_;
1999
2000   my $bind_attrs = $self->source_bind_attributes($source);
2001
2002   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
2003 }
2004
2005 # We were sent here because the $rs contains a complex search
2006 # which will require a subquery to select the correct rows
2007 # (i.e. joined or limited resultsets, or non-introspectable conditions)
2008 #
2009 # Generating a single PK column subquery is trivial and supported
2010 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
2011 # Look at _multipk_update_delete()
2012 sub _subq_update_delete {
2013   my $self = shift;
2014   my ($rs, $op, $values) = @_;
2015
2016   my $rsrc = $rs->result_source;
2017
2018   # quick check if we got a sane rs on our hands
2019   my @pcols = $rsrc->_pri_cols;
2020
2021   my $sel = $rs->_resolved_attrs->{select};
2022   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
2023
2024   if (
2025       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
2026         ne
2027       join ("\x00", sort @$sel )
2028   ) {
2029     $self->throw_exception (
2030       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2031     );
2032   }
2033
2034   if (@pcols == 1) {
2035     return $self->$op (
2036       $rsrc,
2037       $op eq 'update' ? $values : (),
2038       { $pcols[0] => { -in => $rs->as_query } },
2039     );
2040   }
2041
2042   else {
2043     return $self->_multipk_update_delete (@_);
2044   }
2045 }
2046
2047 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2048 # resultset update/delete involving subqueries. So by default resort
2049 # to simple (and inefficient) delete_all style per-row opearations,
2050 # while allowing specific storages to override this with a faster
2051 # implementation.
2052 #
2053 sub _multipk_update_delete {
2054   return shift->_per_row_update_delete (@_);
2055 }
2056
2057 # This is the default loop used to delete/update rows for multi PK
2058 # resultsets, and used by mysql exclusively (because it can't do anything
2059 # else).
2060 #
2061 # We do not use $row->$op style queries, because resultset update/delete
2062 # is not expected to cascade (this is what delete_all/update_all is for).
2063 #
2064 # There should be no race conditions as the entire operation is rolled
2065 # in a transaction.
2066 #
2067 sub _per_row_update_delete {
2068   my $self = shift;
2069   my ($rs, $op, $values) = @_;
2070
2071   my $rsrc = $rs->result_source;
2072   my @pcols = $rsrc->_pri_cols;
2073
2074   my $guard = $self->txn_scope_guard;
2075
2076   # emulate the return value of $sth->execute for non-selects
2077   my $row_cnt = '0E0';
2078
2079   my $subrs_cur = $rs->cursor;
2080   my @all_pk = $subrs_cur->all;
2081   for my $pks ( @all_pk) {
2082
2083     my $cond;
2084     for my $i (0.. $#pcols) {
2085       $cond->{$pcols[$i]} = $pks->[$i];
2086     }
2087
2088     $self->$op (
2089       $rsrc,
2090       $op eq 'update' ? $values : (),
2091       $cond,
2092     );
2093
2094     $row_cnt++;
2095   }
2096
2097   $guard->commit;
2098
2099   return $row_cnt;
2100 }
2101
2102 sub _select {
2103   my $self = shift;
2104   $self->_execute($self->_select_args(@_));
2105 }
2106
2107 sub _select_args_to_query {
2108   my $self = shift;
2109
2110   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2111   #  = $self->_select_args($ident, $select, $cond, $attrs);
2112   my ($op, $bind, $ident, $bind_attrs, @args) =
2113     $self->_select_args(@_);
2114
2115   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2116   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2117   $prepared_bind ||= [];
2118
2119   return wantarray
2120     ? ($sql, $prepared_bind, $bind_attrs)
2121     : \[ "($sql)", @$prepared_bind ]
2122   ;
2123 }
2124
2125 sub _select_args {
2126   my ($self, $ident, $select, $where, $attrs) = @_;
2127
2128   my $sql_maker = $self->sql_maker;
2129   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2130
2131   $attrs = {
2132     %$attrs,
2133     select => $select,
2134     from => $ident,
2135     where => $where,
2136     $rs_alias && $alias2source->{$rs_alias}
2137       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2138       : ()
2139     ,
2140   };
2141
2142   # calculate bind_attrs before possible $ident mangling
2143   my $bind_attrs = {};
2144   for my $alias (keys %$alias2source) {
2145     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2146     for my $col (keys %$bindtypes) {
2147
2148       my $fqcn = join ('.', $alias, $col);
2149       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2150
2151       # Unqialified column names are nice, but at the same time can be
2152       # rather ambiguous. What we do here is basically go along with
2153       # the loop, adding an unqualified column slot to $bind_attrs,
2154       # alongside the fully qualified name. As soon as we encounter
2155       # another column by that name (which would imply another table)
2156       # we unset the unqualified slot and never add any info to it
2157       # to avoid erroneous type binding. If this happens the users
2158       # only choice will be to fully qualify his column name
2159
2160       if (exists $bind_attrs->{$col}) {
2161         $bind_attrs->{$col} = {};
2162       }
2163       else {
2164         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2165       }
2166     }
2167   }
2168
2169   # Sanity check the attributes (SQLMaker does it too, but
2170   # in case of a software_limit we'll never reach there)
2171   if (defined $attrs->{offset}) {
2172     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2173       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2174   }
2175   $attrs->{offset} ||= 0;
2176
2177   if (defined $attrs->{rows}) {
2178     $self->throw_exception("The rows attribute must be a positive integer if present")
2179       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2180   }
2181   elsif ($attrs->{offset}) {
2182     # MySQL actually recommends this approach.  I cringe.
2183     $attrs->{rows} = $sql_maker->__max_int;
2184   }
2185
2186   my @limit;
2187
2188   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2189   # storage, unless software limit was requested
2190   if (
2191     #limited has_many
2192     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2193        ||
2194     # grouped prefetch (to satisfy group_by == select)
2195     ( $attrs->{group_by}
2196         &&
2197       @{$attrs->{group_by}}
2198         &&
2199       $attrs->{_prefetch_selector_range}
2200     )
2201   ) {
2202     ($ident, $select, $where, $attrs)
2203       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2204   }
2205   elsif (! $attrs->{software_limit} ) {
2206     push @limit, $attrs->{rows}, $attrs->{offset};
2207   }
2208
2209   # try to simplify the joinmap further (prune unreferenced type-single joins)
2210   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2211
2212 ###
2213   # This would be the point to deflate anything found in $where
2214   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2215   # expect a row object. And all we have is a resultsource (it is trivial
2216   # to extract deflator coderefs via $alias2source above).
2217   #
2218   # I don't see a way forward other than changing the way deflators are
2219   # invoked, and that's just bad...
2220 ###
2221
2222   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2223 }
2224
2225 # Returns a counting SELECT for a simple count
2226 # query. Abstracted so that a storage could override
2227 # this to { count => 'firstcol' } or whatever makes
2228 # sense as a performance optimization
2229 sub _count_select {
2230   #my ($self, $source, $rs_attrs) = @_;
2231   return { count => '*' };
2232 }
2233
2234
2235 sub source_bind_attributes {
2236   my ($self, $source) = @_;
2237
2238   my $bind_attributes;
2239
2240   my $colinfo = $source->columns_info;
2241
2242   for my $col (keys %$colinfo) {
2243     if (my $dt = $colinfo->{$col}{data_type} ) {
2244       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2245     }
2246   }
2247
2248   return $bind_attributes;
2249 }
2250
2251 =head2 select
2252
2253 =over 4
2254
2255 =item Arguments: $ident, $select, $condition, $attrs
2256
2257 =back
2258
2259 Handle a SQL select statement.
2260
2261 =cut
2262
2263 sub select {
2264   my $self = shift;
2265   my ($ident, $select, $condition, $attrs) = @_;
2266   return $self->cursor_class->new($self, \@_, $attrs);
2267 }
2268
2269 sub select_single {
2270   my $self = shift;
2271   my ($rv, $sth, @bind) = $self->_select(@_);
2272   my @row = $sth->fetchrow_array;
2273   my @nextrow = $sth->fetchrow_array if @row;
2274   if(@row && @nextrow) {
2275     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2276   }
2277   # Need to call finish() to work round broken DBDs
2278   $sth->finish();
2279   return @row;
2280 }
2281
2282 =head2 sql_limit_dialect
2283
2284 This is an accessor for the default SQL limit dialect used by a particular
2285 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2286 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2287 see L<DBIx::Class::SQLMaker::LimitDialects>.
2288
2289 =head2 sth
2290
2291 =over 4
2292
2293 =item Arguments: $sql
2294
2295 =back
2296
2297 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2298
2299 =cut
2300
2301 sub _dbh_sth {
2302   my ($self, $dbh, $sql) = @_;
2303
2304   # 3 is the if_active parameter which avoids active sth re-use
2305   my $sth = $self->disable_sth_caching
2306     ? $dbh->prepare($sql)
2307     : $dbh->prepare_cached($sql, {}, 3);
2308
2309   # XXX You would think RaiseError would make this impossible,
2310   #  but apparently that's not true :(
2311   $self->throw_exception(
2312     $dbh->errstr
2313       ||
2314     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2315             .'an exception and/or setting $dbh->errstr',
2316       length ($sql) > 20
2317         ? substr($sql, 0, 20) . '...'
2318         : $sql
2319       ,
2320       'DBD::' . $dbh->{Driver}{Name},
2321     )
2322   ) if !$sth;
2323
2324   $sth;
2325 }
2326
2327 sub sth {
2328   my ($self, $sql) = @_;
2329   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2330 }
2331
2332 sub _dbh_columns_info_for {
2333   my ($self, $dbh, $table) = @_;
2334
2335   if ($dbh->can('column_info')) {
2336     my %result;
2337     my $caught;
2338     try {
2339       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2340       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2341       $sth->execute();
2342       while ( my $info = $sth->fetchrow_hashref() ){
2343         my %column_info;
2344         $column_info{data_type}   = $info->{TYPE_NAME};
2345         $column_info{size}      = $info->{COLUMN_SIZE};
2346         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2347         $column_info{default_value} = $info->{COLUMN_DEF};
2348         my $col_name = $info->{COLUMN_NAME};
2349         $col_name =~ s/^\"(.*)\"$/$1/;
2350
2351         $result{$col_name} = \%column_info;
2352       }
2353     } catch {
2354       $caught = 1;
2355     };
2356     return \%result if !$caught && scalar keys %result;
2357   }
2358
2359   my %result;
2360   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2361   $sth->execute;
2362   my @columns = @{$sth->{NAME_lc}};
2363   for my $i ( 0 .. $#columns ){
2364     my %column_info;
2365     $column_info{data_type} = $sth->{TYPE}->[$i];
2366     $column_info{size} = $sth->{PRECISION}->[$i];
2367     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2368
2369     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2370       $column_info{data_type} = $1;
2371       $column_info{size}    = $2;
2372     }
2373
2374     $result{$columns[$i]} = \%column_info;
2375   }
2376   $sth->finish;
2377
2378   foreach my $col (keys %result) {
2379     my $colinfo = $result{$col};
2380     my $type_num = $colinfo->{data_type};
2381     my $type_name;
2382     if(defined $type_num && $dbh->can('type_info')) {
2383       my $type_info = $dbh->type_info($type_num);
2384       $type_name = $type_info->{TYPE_NAME} if $type_info;
2385       $colinfo->{data_type} = $type_name if $type_name;
2386     }
2387   }
2388
2389   return \%result;
2390 }
2391
2392 sub columns_info_for {
2393   my ($self, $table) = @_;
2394   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2395 }
2396
2397 =head2 last_insert_id
2398
2399 Return the row id of the last insert.
2400
2401 =cut
2402
2403 sub _dbh_last_insert_id {
2404     my ($self, $dbh, $source, $col) = @_;
2405
2406     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2407
2408     return $id if defined $id;
2409
2410     my $class = ref $self;
2411     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2412 }
2413
2414 sub last_insert_id {
2415   my $self = shift;
2416   $self->_dbh_last_insert_id ($self->_dbh, @_);
2417 }
2418
2419 =head2 _native_data_type
2420
2421 =over 4
2422
2423 =item Arguments: $type_name
2424
2425 =back
2426
2427 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2428 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2429 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2430
2431 The default implementation returns C<undef>, implement in your Storage driver if
2432 you need this functionality.
2433
2434 Should map types from other databases to the native RDBMS type, for example
2435 C<VARCHAR2> to C<VARCHAR>.
2436
2437 Types with modifiers should map to the underlying data type. For example,
2438 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2439
2440 Composite types should map to the container type, for example
2441 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2442
2443 =cut
2444
2445 sub _native_data_type {
2446   #my ($self, $data_type) = @_;
2447   return undef
2448 }
2449
2450 # Check if placeholders are supported at all
2451 sub _determine_supports_placeholders {
2452   my $self = shift;
2453   my $dbh  = $self->_get_dbh;
2454
2455   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2456   # but it is inaccurate more often than not
2457   return try {
2458     local $dbh->{PrintError} = 0;
2459     local $dbh->{RaiseError} = 1;
2460     $dbh->do('select ?', {}, 1);
2461     1;
2462   }
2463   catch {
2464     0;
2465   };
2466 }
2467
2468 # Check if placeholders bound to non-string types throw exceptions
2469 #
2470 sub _determine_supports_typeless_placeholders {
2471   my $self = shift;
2472   my $dbh  = $self->_get_dbh;
2473
2474   return try {
2475     local $dbh->{PrintError} = 0;
2476     local $dbh->{RaiseError} = 1;
2477     # this specifically tests a bind that is NOT a string
2478     $dbh->do('select 1 where 1 = ?', {}, 1);
2479     1;
2480   }
2481   catch {
2482     0;
2483   };
2484 }
2485
2486 =head2 sqlt_type
2487
2488 Returns the database driver name.
2489
2490 =cut
2491
2492 sub sqlt_type {
2493   shift->_get_dbh->{Driver}->{Name};
2494 }
2495
2496 =head2 bind_attribute_by_data_type
2497
2498 Given a datatype from column info, returns a database specific bind
2499 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2500 let the database planner just handle it.
2501
2502 Generally only needed for special case column types, like bytea in postgres.
2503
2504 =cut
2505
2506 sub bind_attribute_by_data_type {
2507     return;
2508 }
2509
2510 =head2 is_datatype_numeric
2511
2512 Given a datatype from column_info, returns a boolean value indicating if
2513 the current RDBMS considers it a numeric value. This controls how
2514 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2515 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2516 be performed instead of the usual C<eq>.
2517
2518 =cut
2519
2520 sub is_datatype_numeric {
2521   my ($self, $dt) = @_;
2522
2523   return 0 unless $dt;
2524
2525   return $dt =~ /^ (?:
2526     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2527   ) $/ix;
2528 }
2529
2530
2531 =head2 create_ddl_dir
2532
2533 =over 4
2534
2535 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2536
2537 =back
2538
2539 Creates a SQL file based on the Schema, for each of the specified
2540 database engines in C<\@databases> in the given directory.
2541 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2542
2543 Given a previous version number, this will also create a file containing
2544 the ALTER TABLE statements to transform the previous schema into the
2545 current one. Note that these statements may contain C<DROP TABLE> or
2546 C<DROP COLUMN> statements that can potentially destroy data.
2547
2548 The file names are created using the C<ddl_filename> method below, please
2549 override this method in your schema if you would like a different file
2550 name format. For the ALTER file, the same format is used, replacing
2551 $version in the name with "$preversion-$version".
2552
2553 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2554 The most common value for this would be C<< { add_drop_table => 1 } >>
2555 to have the SQL produced include a C<DROP TABLE> statement for each table
2556 created. For quoting purposes supply C<quote_table_names> and
2557 C<quote_field_names>.
2558
2559 If no arguments are passed, then the following default values are assumed:
2560
2561 =over 4
2562
2563 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2564
2565 =item version    - $schema->schema_version
2566
2567 =item directory  - './'
2568
2569 =item preversion - <none>
2570
2571 =back
2572
2573 By default, C<\%sqlt_args> will have
2574
2575  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2576
2577 merged with the hash passed in. To disable any of those features, pass in a
2578 hashref like the following
2579
2580  { ignore_constraint_names => 0, # ... other options }
2581
2582
2583 WARNING: You are strongly advised to check all SQL files created, before applying
2584 them.
2585
2586 =cut
2587
2588 sub create_ddl_dir {
2589   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2590
2591   unless ($dir) {
2592     carp "No directory given, using ./\n";
2593     $dir = './';
2594   } else {
2595       -d $dir
2596         or
2597       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2598         or
2599       $self->throw_exception(
2600         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2601       );
2602   }
2603
2604   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2605
2606   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2607   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2608
2609   my $schema_version = $schema->schema_version || '1.x';
2610   $version ||= $schema_version;
2611
2612   $sqltargs = {
2613     add_drop_table => 1,
2614     ignore_constraint_names => 1,
2615     ignore_index_names => 1,
2616     %{$sqltargs || {}}
2617   };
2618
2619   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2620     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2621   }
2622
2623   my $sqlt = SQL::Translator->new( $sqltargs );
2624
2625   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2626   my $sqlt_schema = $sqlt->translate({ data => $schema })
2627     or $self->throw_exception ($sqlt->error);
2628
2629   foreach my $db (@$databases) {
2630     $sqlt->reset();
2631     $sqlt->{schema} = $sqlt_schema;
2632     $sqlt->producer($db);
2633
2634     my $file;
2635     my $filename = $schema->ddl_filename($db, $version, $dir);
2636     if (-e $filename && ($version eq $schema_version )) {
2637       # if we are dumping the current version, overwrite the DDL
2638       carp "Overwriting existing DDL file - $filename";
2639       unlink($filename);
2640     }
2641
2642     my $output = $sqlt->translate;
2643     if(!$output) {
2644       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2645       next;
2646     }
2647     if(!open($file, ">$filename")) {
2648       $self->throw_exception("Can't open $filename for writing ($!)");
2649       next;
2650     }
2651     print $file $output;
2652     close($file);
2653
2654     next unless ($preversion);
2655
2656     require SQL::Translator::Diff;
2657
2658     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2659     if(!-e $prefilename) {
2660       carp("No previous schema file found ($prefilename)");
2661       next;
2662     }
2663
2664     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2665     if(-e $difffile) {
2666       carp("Overwriting existing diff file - $difffile");
2667       unlink($difffile);
2668     }
2669
2670     my $source_schema;
2671     {
2672       my $t = SQL::Translator->new($sqltargs);
2673       $t->debug( 0 );
2674       $t->trace( 0 );
2675
2676       $t->parser( $db )
2677         or $self->throw_exception ($t->error);
2678
2679       my $out = $t->translate( $prefilename )
2680         or $self->throw_exception ($t->error);
2681
2682       $source_schema = $t->schema;
2683
2684       $source_schema->name( $prefilename )
2685         unless ( $source_schema->name );
2686     }
2687
2688     # The "new" style of producers have sane normalization and can support
2689     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2690     # And we have to diff parsed SQL against parsed SQL.
2691     my $dest_schema = $sqlt_schema;
2692
2693     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2694       my $t = SQL::Translator->new($sqltargs);
2695       $t->debug( 0 );
2696       $t->trace( 0 );
2697
2698       $t->parser( $db )
2699         or $self->throw_exception ($t->error);
2700
2701       my $out = $t->translate( $filename )
2702         or $self->throw_exception ($t->error);
2703
2704       $dest_schema = $t->schema;
2705
2706       $dest_schema->name( $filename )
2707         unless $dest_schema->name;
2708     }
2709
2710     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2711                                                   $dest_schema,   $db,
2712                                                   $sqltargs
2713                                                  );
2714     if(!open $file, ">$difffile") {
2715       $self->throw_exception("Can't write to $difffile ($!)");
2716       next;
2717     }
2718     print $file $diff;
2719     close($file);
2720   }
2721 }
2722
2723 =head2 deployment_statements
2724
2725 =over 4
2726
2727 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2728
2729 =back
2730
2731 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2732
2733 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2734 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2735
2736 C<$directory> is used to return statements from files in a previously created
2737 L</create_ddl_dir> directory and is optional. The filenames are constructed
2738 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2739
2740 If no C<$directory> is specified then the statements are constructed on the
2741 fly using L<SQL::Translator> and C<$version> is ignored.
2742
2743 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2744
2745 =cut
2746
2747 sub deployment_statements {
2748   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2749   $type ||= $self->sqlt_type;
2750   $version ||= $schema->schema_version || '1.x';
2751   $dir ||= './';
2752   my $filename = $schema->ddl_filename($type, $version, $dir);
2753   if(-f $filename)
2754   {
2755       # FIXME replace this block when a proper sane sql parser is available
2756       my $file;
2757       open($file, "<$filename")
2758         or $self->throw_exception("Can't open $filename ($!)");
2759       my @rows = <$file>;
2760       close($file);
2761       return join('', @rows);
2762   }
2763
2764   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2765     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2766   }
2767
2768   # sources needs to be a parser arg, but for simplicty allow at top level
2769   # coming in
2770   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2771       if exists $sqltargs->{sources};
2772
2773   my $tr = SQL::Translator->new(
2774     producer => "SQL::Translator::Producer::${type}",
2775     %$sqltargs,
2776     parser => 'SQL::Translator::Parser::DBIx::Class',
2777     data => $schema,
2778   );
2779
2780   my @ret;
2781   if (wantarray) {
2782     @ret = $tr->translate;
2783   }
2784   else {
2785     $ret[0] = $tr->translate;
2786   }
2787
2788   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2789     unless (@ret && defined $ret[0]);
2790
2791   return wantarray ? @ret : $ret[0];
2792 }
2793
2794 # FIXME deploy() currently does not accurately report sql errors
2795 # Will always return true while errors are warned
2796 sub deploy {
2797   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2798   my $deploy = sub {
2799     my $line = shift;
2800     return if(!$line);
2801     return if($line =~ /^--/);
2802     # next if($line =~ /^DROP/m);
2803     return if($line =~ /^BEGIN TRANSACTION/m);
2804     return if($line =~ /^COMMIT/m);
2805     return if $line =~ /^\s+$/; # skip whitespace only
2806     $self->_query_start($line);
2807     try {
2808       # do a dbh_do cycle here, as we need some error checking in
2809       # place (even though we will ignore errors)
2810       $self->dbh_do (sub { $_[1]->do($line) });
2811     } catch {
2812       carp qq{$_ (running "${line}")};
2813     };
2814     $self->_query_end($line);
2815   };
2816   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2817   if (@statements > 1) {
2818     foreach my $statement (@statements) {
2819       $deploy->( $statement );
2820     }
2821   }
2822   elsif (@statements == 1) {
2823     # split on single line comments and end of statements
2824     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2825       $deploy->( $line );
2826     }
2827   }
2828 }
2829
2830 =head2 datetime_parser
2831
2832 Returns the datetime parser class
2833
2834 =cut
2835
2836 sub datetime_parser {
2837   my $self = shift;
2838   return $self->{datetime_parser} ||= do {
2839     $self->build_datetime_parser(@_);
2840   };
2841 }
2842
2843 =head2 datetime_parser_type
2844
2845 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2846
2847 =head2 build_datetime_parser
2848
2849 See L</datetime_parser>
2850
2851 =cut
2852
2853 sub build_datetime_parser {
2854   my $self = shift;
2855   my $type = $self->datetime_parser_type(@_);
2856   return $type;
2857 }
2858
2859
2860 =head2 is_replicating
2861
2862 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2863 replicate from a master database.  Default is undef, which is the result
2864 returned by databases that don't support replication.
2865
2866 =cut
2867
2868 sub is_replicating {
2869     return;
2870
2871 }
2872
2873 =head2 lag_behind_master
2874
2875 Returns a number that represents a certain amount of lag behind a master db
2876 when a given storage is replicating.  The number is database dependent, but
2877 starts at zero and increases with the amount of lag. Default in undef
2878
2879 =cut
2880
2881 sub lag_behind_master {
2882     return;
2883 }
2884
2885 =head2 relname_to_table_alias
2886
2887 =over 4
2888
2889 =item Arguments: $relname, $join_count
2890
2891 =back
2892
2893 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2894 queries.
2895
2896 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2897 way these aliases are named.
2898
2899 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2900 otherwise C<"$relname">.
2901
2902 =cut
2903
2904 sub relname_to_table_alias {
2905   my ($self, $relname, $join_count) = @_;
2906
2907   my $alias = ($join_count && $join_count > 1 ?
2908     join('_', $relname, $join_count) : $relname);
2909
2910   return $alias;
2911 }
2912
2913 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2914 # version and it may be necessary to amend or override it for a specific storage
2915 # if such binds are necessary.
2916 sub _max_column_bytesize {
2917   my ($self, $source, $col) = @_;
2918
2919   my $inf = $source->column_info($col);
2920   return $inf->{_max_bytesize} ||= do {
2921
2922     my $max_size;
2923
2924     if (my $data_type = $inf->{data_type}) {
2925       $data_type = lc($data_type);
2926
2927       # String/sized-binary types
2928       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2929                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2930       ) {
2931         $max_size = $inf->{size};
2932       }
2933       # Other charset/unicode types, assume scale of 4
2934       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2935                               |univarchar
2936                               |nvarchar)\b/x
2937       ) {
2938         $max_size = $inf->{size} * 4 if $inf->{size};
2939       }
2940       # Blob types
2941       elsif ($self->_is_lob_type($data_type)) {
2942         # default to longreadlen
2943       }
2944       else {
2945         $max_size = 100;  # for all other (numeric?) datatypes
2946       }
2947     }
2948
2949     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2950   };
2951 }
2952
2953 # Determine if a data_type is some type of BLOB
2954 # FIXME: these regexes are expensive, result of these checks should be cached in
2955 # the column_info .
2956 sub _is_lob_type {
2957   my ($self, $data_type) = @_;
2958   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2959     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2960                                   |varchar|character\s*varying|nvarchar
2961                                   |national\s*character\s*varying))?\z/xi);
2962 }
2963
2964 sub _is_binary_lob_type {
2965   my ($self, $data_type) = @_;
2966   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2967     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2968 }
2969
2970 sub _is_text_lob_type {
2971   my ($self, $data_type) = @_;
2972   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2973     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2974                         |national\s*character\s*varying))\z/xi);
2975 }
2976
2977 1;
2978
2979 =head1 USAGE NOTES
2980
2981 =head2 DBIx::Class and AutoCommit
2982
2983 DBIx::Class can do some wonderful magic with handling exceptions,
2984 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2985 (the default) combined with C<txn_do> for transaction support.
2986
2987 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2988 in an assumed transaction between commits, and you're telling us you'd
2989 like to manage that manually.  A lot of the magic protections offered by
2990 this module will go away.  We can't protect you from exceptions due to database
2991 disconnects because we don't know anything about how to restart your
2992 transactions.  You're on your own for handling all sorts of exceptional
2993 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2994 be with raw DBI.
2995
2996
2997 =head1 AUTHORS
2998
2999 Matt S. Trout <mst@shadowcatsystems.co.uk>
3000
3001 Andy Grundman <andy@hybridized.org>
3002
3003 =head1 LICENSE
3004
3005 You may distribute this code under the same terms as Perl itself.
3006
3007 =cut