2dc005c8f08eb86d6230319b5e00dcdb4dbba2e5
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Sub::Name 'subname';
16 use Try::Tiny;
17 use overload ();
18 use namespace::clean;
19
20
21 # default cursor class, overridable in connect_info attributes
22 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
23
24 __PACKAGE__->mk_group_accessors('inherited' => qw/
25   sql_limit_dialect sql_quote_char sql_name_sep
26 /);
27
28 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
29
30 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
31 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
32
33 __PACKAGE__->sql_name_sep('.');
34
35 __PACKAGE__->mk_group_accessors('simple' => qw/
36   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
37   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
38   transaction_depth _dbh_autocommit  savepoints
39 /);
40
41 # the values for these accessors are picked out (and deleted) from
42 # the attribute hashref passed to connect_info
43 my @storage_options = qw/
44   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
45   disable_sth_caching unsafe auto_savepoint
46 /;
47 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
48
49
50 # capability definitions, using a 2-tiered accessor system
51 # The rationale is:
52 #
53 # A driver/user may define _use_X, which blindly without any checks says:
54 # "(do not) use this capability", (use_dbms_capability is an "inherited"
55 # type accessor)
56 #
57 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
58 # accessor, which in turn calls _determine_supports_X, and stores the return
59 # in a special slot on the storage object, which is wiped every time a $dbh
60 # reconnection takes place (it is not guaranteed that upon reconnection we
61 # will get the same rdbms version). _determine_supports_X does not need to
62 # exist on a driver, as we ->can for it before calling.
63
64 my @capabilities = (qw/
65   insert_returning
66   insert_returning_bound
67   placeholders
68   typeless_placeholders
69   join_optimizer
70 /);
71 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
72 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
73
74 # on by default, not strictly a capability (pending rewrite)
75 __PACKAGE__->_use_join_optimizer (1);
76 sub _determine_supports_join_optimizer { 1 };
77
78 # Each of these methods need _determine_driver called before itself
79 # in order to function reliably. This is a purely DRY optimization
80 #
81 # get_(use)_dbms_capability need to be called on the correct Storage
82 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
83 # _determine_supports_X which obv. needs a correct driver as well
84 my @rdbms_specific_methods = qw/
85   deployment_statements
86   sqlt_type
87   sql_maker
88   build_datetime_parser
89   datetime_parser_type
90
91   insert
92   insert_bulk
93   update
94   delete
95   select
96   select_single
97
98   get_use_dbms_capability
99   get_dbms_capability
100
101   _server_info
102   _get_server_version
103 /;
104
105 for my $meth (@rdbms_specific_methods) {
106
107   my $orig = __PACKAGE__->can ($meth)
108     or die "$meth is not a ::Storage::DBI method!";
109
110   no strict qw/refs/;
111   no warnings qw/redefine/;
112   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
113     if (
114       # only fire when invoked on an instance, a valid class-based invocation
115       # would e.g. be setting a default for an inherited accessor
116       ref $_[0]
117         and
118       ! $_[0]->_driver_determined
119         and
120       ! $_[0]->{_in_determine_driver}
121     ) {
122       $_[0]->_determine_driver;
123
124       # This for some reason crashes and burns on perl 5.8.1
125       # IFF the method ends up throwing an exception
126       #goto $_[0]->can ($meth);
127
128       my $cref = $_[0]->can ($meth);
129       goto $cref;
130     }
131
132     goto $orig;
133   };
134 }
135
136
137 =head1 NAME
138
139 DBIx::Class::Storage::DBI - DBI storage handler
140
141 =head1 SYNOPSIS
142
143   my $schema = MySchema->connect('dbi:SQLite:my.db');
144
145   $schema->storage->debug(1);
146
147   my @stuff = $schema->storage->dbh_do(
148     sub {
149       my ($storage, $dbh, @args) = @_;
150       $dbh->do("DROP TABLE authors");
151     },
152     @column_list
153   );
154
155   $schema->resultset('Book')->search({
156      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
157   });
158
159 =head1 DESCRIPTION
160
161 This class represents the connection to an RDBMS via L<DBI>.  See
162 L<DBIx::Class::Storage> for general information.  This pod only
163 documents DBI-specific methods and behaviors.
164
165 =head1 METHODS
166
167 =cut
168
169 sub new {
170   my $new = shift->next::method(@_);
171
172   $new->transaction_depth(0);
173   $new->_sql_maker_opts({});
174   $new->_dbh_details({});
175   $new->{savepoints} = [];
176   $new->{_in_dbh_do} = 0;
177   $new->{_dbh_gen} = 0;
178
179   # read below to see what this does
180   $new->_arm_global_destructor;
181
182   $new;
183 }
184
185 # This is hack to work around perl shooting stuff in random
186 # order on exit(). If we do not walk the remaining storage
187 # objects in an END block, there is a *small but real* chance
188 # of a fork()ed child to kill the parent's shared DBI handle,
189 # *before perl reaches the DESTROY in this package*
190 # Yes, it is ugly and effective.
191 # Additionally this registry is used by the CLONE method to
192 # make sure no handles are shared between threads
193 {
194   my %seek_and_destroy;
195
196   sub _arm_global_destructor {
197     my $self = shift;
198     my $key = refaddr ($self);
199     $seek_and_destroy{$key} = $self;
200     weaken ($seek_and_destroy{$key});
201   }
202
203   END {
204     local $?; # just in case the DBI destructor changes it somehow
205
206     # destroy just the object if not native to this process/thread
207     $_->_verify_pid for (grep
208       { defined $_ }
209       values %seek_and_destroy
210     );
211   }
212
213   sub CLONE {
214     # As per DBI's recommendation, DBIC disconnects all handles as
215     # soon as possible (DBIC will reconnect only on demand from within
216     # the thread)
217     for (values %seek_and_destroy) {
218       next unless $_;
219       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
220       $_->_dbh(undef);
221     }
222   }
223 }
224
225 sub DESTROY {
226   my $self = shift;
227
228   # some databases spew warnings on implicit disconnect
229   local $SIG{__WARN__} = sub {};
230   $self->_dbh(undef);
231
232   # this op is necessary, since the very last perl runtime statement
233   # triggers a global destruction shootout, and the $SIG localization
234   # may very well be destroyed before perl actually gets to do the
235   # $dbh undef
236   1;
237 }
238
239 # handle pid changes correctly - do not destroy parent's connection
240 sub _verify_pid {
241   my $self = shift;
242
243   my $pid = $self->_conn_pid;
244   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
245     $dbh->{InactiveDestroy} = 1;
246     $self->{_dbh_gen}++;
247     $self->_dbh(undef);
248   }
249
250   return;
251 }
252
253 =head2 connect_info
254
255 This method is normally called by L<DBIx::Class::Schema/connection>, which
256 encapsulates its argument list in an arrayref before passing them here.
257
258 The argument list may contain:
259
260 =over
261
262 =item *
263
264 The same 4-element argument set one would normally pass to
265 L<DBI/connect>, optionally followed by
266 L<extra attributes|/DBIx::Class specific connection attributes>
267 recognized by DBIx::Class:
268
269   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
270
271 =item *
272
273 A single code reference which returns a connected
274 L<DBI database handle|DBI/connect> optionally followed by
275 L<extra attributes|/DBIx::Class specific connection attributes> recognized
276 by DBIx::Class:
277
278   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
279
280 =item *
281
282 A single hashref with all the attributes and the dsn/user/password
283 mixed together:
284
285   $connect_info_args = [{
286     dsn => $dsn,
287     user => $user,
288     password => $pass,
289     %dbi_attributes,
290     %extra_attributes,
291   }];
292
293   $connect_info_args = [{
294     dbh_maker => sub { DBI->connect (...) },
295     %dbi_attributes,
296     %extra_attributes,
297   }];
298
299 This is particularly useful for L<Catalyst> based applications, allowing the
300 following config (L<Config::General> style):
301
302   <Model::DB>
303     schema_class   App::DB
304     <connect_info>
305       dsn          dbi:mysql:database=test
306       user         testuser
307       password     TestPass
308       AutoCommit   1
309     </connect_info>
310   </Model::DB>
311
312 The C<dsn>/C<user>/C<password> combination can be substituted by the
313 C<dbh_maker> key whose value is a coderef that returns a connected
314 L<DBI database handle|DBI/connect>
315
316 =back
317
318 Please note that the L<DBI> docs recommend that you always explicitly
319 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
320 recommends that it be set to I<1>, and that you perform transactions
321 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
322 to I<1> if you do not do explicitly set it to zero.  This is the default
323 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
324
325 =head3 DBIx::Class specific connection attributes
326
327 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
328 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
329 the following connection options. These options can be mixed in with your other
330 L<DBI> connection attributes, or placed in a separate hashref
331 (C<\%extra_attributes>) as shown above.
332
333 Every time C<connect_info> is invoked, any previous settings for
334 these options will be cleared before setting the new ones, regardless of
335 whether any options are specified in the new C<connect_info>.
336
337
338 =over
339
340 =item on_connect_do
341
342 Specifies things to do immediately after connecting or re-connecting to
343 the database.  Its value may contain:
344
345 =over
346
347 =item a scalar
348
349 This contains one SQL statement to execute.
350
351 =item an array reference
352
353 This contains SQL statements to execute in order.  Each element contains
354 a string or a code reference that returns a string.
355
356 =item a code reference
357
358 This contains some code to execute.  Unlike code references within an
359 array reference, its return value is ignored.
360
361 =back
362
363 =item on_disconnect_do
364
365 Takes arguments in the same form as L</on_connect_do> and executes them
366 immediately before disconnecting from the database.
367
368 Note, this only runs if you explicitly call L</disconnect> on the
369 storage object.
370
371 =item on_connect_call
372
373 A more generalized form of L</on_connect_do> that calls the specified
374 C<connect_call_METHOD> methods in your storage driver.
375
376   on_connect_do => 'select 1'
377
378 is equivalent to:
379
380   on_connect_call => [ [ do_sql => 'select 1' ] ]
381
382 Its values may contain:
383
384 =over
385
386 =item a scalar
387
388 Will call the C<connect_call_METHOD> method.
389
390 =item a code reference
391
392 Will execute C<< $code->($storage) >>
393
394 =item an array reference
395
396 Each value can be a method name or code reference.
397
398 =item an array of arrays
399
400 For each array, the first item is taken to be the C<connect_call_> method name
401 or code reference, and the rest are parameters to it.
402
403 =back
404
405 Some predefined storage methods you may use:
406
407 =over
408
409 =item do_sql
410
411 Executes a SQL string or a code reference that returns a SQL string. This is
412 what L</on_connect_do> and L</on_disconnect_do> use.
413
414 It can take:
415
416 =over
417
418 =item a scalar
419
420 Will execute the scalar as SQL.
421
422 =item an arrayref
423
424 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
425 attributes hashref and bind values.
426
427 =item a code reference
428
429 Will execute C<< $code->($storage) >> and execute the return array refs as
430 above.
431
432 =back
433
434 =item datetime_setup
435
436 Execute any statements necessary to initialize the database session to return
437 and accept datetime/timestamp values used with
438 L<DBIx::Class::InflateColumn::DateTime>.
439
440 Only necessary for some databases, see your specific storage driver for
441 implementation details.
442
443 =back
444
445 =item on_disconnect_call
446
447 Takes arguments in the same form as L</on_connect_call> and executes them
448 immediately before disconnecting from the database.
449
450 Calls the C<disconnect_call_METHOD> methods as opposed to the
451 C<connect_call_METHOD> methods called by L</on_connect_call>.
452
453 Note, this only runs if you explicitly call L</disconnect> on the
454 storage object.
455
456 =item disable_sth_caching
457
458 If set to a true value, this option will disable the caching of
459 statement handles via L<DBI/prepare_cached>.
460
461 =item limit_dialect
462
463 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
464 default L</sql_limit_dialect> setting of the storage (if any). For a list
465 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
466
467 =item quote_names
468
469 When true automatically sets L</quote_char> and L</name_sep> to the characters
470 appropriate for your particular RDBMS. This option is preferred over specifying
471 L</quote_char> directly.
472
473 =item quote_char
474
475 Specifies what characters to use to quote table and column names.
476
477 C<quote_char> expects either a single character, in which case is it
478 is placed on either side of the table/column name, or an arrayref of length
479 2 in which case the table/column name is placed between the elements.
480
481 For example under MySQL you should use C<< quote_char => '`' >>, and for
482 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
483
484 =item name_sep
485
486 This parameter is only useful in conjunction with C<quote_char>, and is used to
487 specify the character that separates elements (schemas, tables, columns) from
488 each other. If unspecified it defaults to the most commonly used C<.>.
489
490 =item unsafe
491
492 This Storage driver normally installs its own C<HandleError>, sets
493 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
494 all database handles, including those supplied by a coderef.  It does this
495 so that it can have consistent and useful error behavior.
496
497 If you set this option to a true value, Storage will not do its usual
498 modifications to the database handle's attributes, and instead relies on
499 the settings in your connect_info DBI options (or the values you set in
500 your connection coderef, in the case that you are connecting via coderef).
501
502 Note that your custom settings can cause Storage to malfunction,
503 especially if you set a C<HandleError> handler that suppresses exceptions
504 and/or disable C<RaiseError>.
505
506 =item auto_savepoint
507
508 If this option is true, L<DBIx::Class> will use savepoints when nesting
509 transactions, making it possible to recover from failure in the inner
510 transaction without having to abort all outer transactions.
511
512 =item cursor_class
513
514 Use this argument to supply a cursor class other than the default
515 L<DBIx::Class::Storage::DBI::Cursor>.
516
517 =back
518
519 Some real-life examples of arguments to L</connect_info> and
520 L<DBIx::Class::Schema/connect>
521
522   # Simple SQLite connection
523   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
524
525   # Connect via subref
526   ->connect_info([ sub { DBI->connect(...) } ]);
527
528   # Connect via subref in hashref
529   ->connect_info([{
530     dbh_maker => sub { DBI->connect(...) },
531     on_connect_do => 'alter session ...',
532   }]);
533
534   # A bit more complicated
535   ->connect_info(
536     [
537       'dbi:Pg:dbname=foo',
538       'postgres',
539       'my_pg_password',
540       { AutoCommit => 1 },
541       { quote_char => q{"} },
542     ]
543   );
544
545   # Equivalent to the previous example
546   ->connect_info(
547     [
548       'dbi:Pg:dbname=foo',
549       'postgres',
550       'my_pg_password',
551       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
552     ]
553   );
554
555   # Same, but with hashref as argument
556   # See parse_connect_info for explanation
557   ->connect_info(
558     [{
559       dsn         => 'dbi:Pg:dbname=foo',
560       user        => 'postgres',
561       password    => 'my_pg_password',
562       AutoCommit  => 1,
563       quote_char  => q{"},
564       name_sep    => q{.},
565     }]
566   );
567
568   # Subref + DBIx::Class-specific connection options
569   ->connect_info(
570     [
571       sub { DBI->connect(...) },
572       {
573           quote_char => q{`},
574           name_sep => q{@},
575           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
576           disable_sth_caching => 1,
577       },
578     ]
579   );
580
581
582
583 =cut
584
585 sub connect_info {
586   my ($self, $info) = @_;
587
588   return $self->_connect_info if !$info;
589
590   $self->_connect_info($info); # copy for _connect_info
591
592   $info = $self->_normalize_connect_info($info)
593     if ref $info eq 'ARRAY';
594
595   for my $storage_opt (keys %{ $info->{storage_options} }) {
596     my $value = $info->{storage_options}{$storage_opt};
597
598     $self->$storage_opt($value);
599   }
600
601   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
602   #  the new set of options
603   $self->_sql_maker(undef);
604   $self->_sql_maker_opts({});
605
606   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
607     my $value = $info->{sql_maker_options}{$sql_maker_opt};
608
609     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
610   }
611
612   my %attrs = (
613     %{ $self->_default_dbi_connect_attributes || {} },
614     %{ $info->{attributes} || {} },
615   );
616
617   my @args = @{ $info->{arguments} };
618
619   if (keys %attrs and ref $args[0] ne 'CODE') {
620     carp
621         'You provided explicit AutoCommit => 0 in your connection_info. '
622       . 'This is almost universally a bad idea (see the footnotes of '
623       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
624       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
625       . 'this warning.'
626       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
627
628     push @args, \%attrs if keys %attrs;
629   }
630   $self->_dbi_connect_info(\@args);
631
632   # FIXME - dirty:
633   # save attributes them in a separate accessor so they are always
634   # introspectable, even in case of a CODE $dbhmaker
635   $self->_dbic_connect_attributes (\%attrs);
636
637   return $self->_connect_info;
638 }
639
640 sub _normalize_connect_info {
641   my ($self, $info_arg) = @_;
642   my %info;
643
644   my @args = @$info_arg;  # take a shallow copy for further mutilation
645
646   # combine/pre-parse arguments depending on invocation style
647
648   my %attrs;
649   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
650     %attrs = %{ $args[1] || {} };
651     @args = $args[0];
652   }
653   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
654     %attrs = %{$args[0]};
655     @args = ();
656     if (my $code = delete $attrs{dbh_maker}) {
657       @args = $code;
658
659       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
660       if (@ignored) {
661         carp sprintf (
662             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
663           . "to the result of 'dbh_maker'",
664
665           join (', ', map { "'$_'" } (@ignored) ),
666         );
667       }
668     }
669     else {
670       @args = delete @attrs{qw/dsn user password/};
671     }
672   }
673   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
674     %attrs = (
675       % { $args[3] || {} },
676       % { $args[4] || {} },
677     );
678     @args = @args[0,1,2];
679   }
680
681   $info{arguments} = \@args;
682
683   my @storage_opts = grep exists $attrs{$_},
684     @storage_options, 'cursor_class';
685
686   @{ $info{storage_options} }{@storage_opts} =
687     delete @attrs{@storage_opts} if @storage_opts;
688
689   my @sql_maker_opts = grep exists $attrs{$_},
690     qw/limit_dialect quote_char name_sep quote_names/;
691
692   @{ $info{sql_maker_options} }{@sql_maker_opts} =
693     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
694
695   $info{attributes} = \%attrs if %attrs;
696
697   return \%info;
698 }
699
700 sub _default_dbi_connect_attributes () {
701   +{
702     AutoCommit => 1,
703     PrintError => 0,
704     RaiseError => 1,
705     ShowErrorStatement => 1,
706   };
707 }
708
709 =head2 on_connect_do
710
711 This method is deprecated in favour of setting via L</connect_info>.
712
713 =cut
714
715 =head2 on_disconnect_do
716
717 This method is deprecated in favour of setting via L</connect_info>.
718
719 =cut
720
721 sub _parse_connect_do {
722   my ($self, $type) = @_;
723
724   my $val = $self->$type;
725   return () if not defined $val;
726
727   my @res;
728
729   if (not ref($val)) {
730     push @res, [ 'do_sql', $val ];
731   } elsif (ref($val) eq 'CODE') {
732     push @res, $val;
733   } elsif (ref($val) eq 'ARRAY') {
734     push @res, map { [ 'do_sql', $_ ] } @$val;
735   } else {
736     $self->throw_exception("Invalid type for $type: ".ref($val));
737   }
738
739   return \@res;
740 }
741
742 =head2 dbh_do
743
744 Arguments: ($subref | $method_name), @extra_coderef_args?
745
746 Execute the given $subref or $method_name using the new exception-based
747 connection management.
748
749 The first two arguments will be the storage object that C<dbh_do> was called
750 on and a database handle to use.  Any additional arguments will be passed
751 verbatim to the called subref as arguments 2 and onwards.
752
753 Using this (instead of $self->_dbh or $self->dbh) ensures correct
754 exception handling and reconnection (or failover in future subclasses).
755
756 Your subref should have no side-effects outside of the database, as
757 there is the potential for your subref to be partially double-executed
758 if the database connection was stale/dysfunctional.
759
760 Example:
761
762   my @stuff = $schema->storage->dbh_do(
763     sub {
764       my ($storage, $dbh, @cols) = @_;
765       my $cols = join(q{, }, @cols);
766       $dbh->selectrow_array("SELECT $cols FROM foo");
767     },
768     @column_list
769   );
770
771 =cut
772
773 sub dbh_do {
774   my $self = shift;
775   my $code = shift;
776
777   my $dbh = $self->_get_dbh;
778
779   return $self->$code($dbh, @_)
780     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
781
782   local $self->{_in_dbh_do} = 1;
783
784   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
785   my $args = \@_;
786   return try {
787     $self->$code ($dbh, @$args);
788   } catch {
789     $self->throw_exception($_) if $self->connected;
790
791     # We were not connected - reconnect and retry, but let any
792     #  exception fall right through this time
793     carp "Retrying $code after catching disconnected exception: $_"
794       if $ENV{DBIC_DBIRETRY_DEBUG};
795
796     $self->_populate_dbh;
797     $self->$code($self->_dbh, @$args);
798   };
799 }
800
801 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
802 # It also informs dbh_do to bypass itself while under the direction of txn_do,
803 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
804 sub txn_do {
805   my $self = shift;
806   my $coderef = shift;
807
808   ref $coderef eq 'CODE' or $self->throw_exception
809     ('$coderef must be a CODE reference');
810
811   local $self->{_in_dbh_do} = 1;
812
813   my @result;
814   my $want = wantarray;
815
816   my $tried = 0;
817   while(1) {
818     my $exception;
819
820     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
821     my $args = \@_;
822
823     try {
824       $self->txn_begin;
825       my $txn_start_depth = $self->transaction_depth;
826       if($want) {
827           @result = $coderef->(@$args);
828       }
829       elsif(defined $want) {
830           $result[0] = $coderef->(@$args);
831       }
832       else {
833           $coderef->(@$args);
834       }
835
836       my $delta_txn = $txn_start_depth - $self->transaction_depth;
837       if ($delta_txn == 0) {
838         $self->txn_commit;
839       }
840       elsif ($delta_txn != 1) {
841         # an off-by-one would mean we fired a rollback
842         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
843       }
844     } catch {
845       $exception = $_;
846     };
847
848     if(! defined $exception) { return wantarray ? @result : $result[0] }
849
850     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
851       my $rollback_exception;
852       try { $self->txn_rollback } catch { $rollback_exception = shift };
853       if(defined $rollback_exception) {
854         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
855         $self->throw_exception($exception)  # propagate nested rollback
856           if $rollback_exception =~ /$exception_class/;
857
858         $self->throw_exception(
859           "Transaction aborted: ${exception}. "
860           . "Rollback failed: ${rollback_exception}"
861         );
862       }
863       $self->throw_exception($exception)
864     }
865
866     # We were not connected, and was first try - reconnect and retry
867     # via the while loop
868     carp "Retrying $coderef after catching disconnected exception: $exception"
869       if $ENV{DBIC_TXNRETRY_DEBUG};
870     $self->_populate_dbh;
871   }
872 }
873
874 =head2 disconnect
875
876 Our C<disconnect> method also performs a rollback first if the
877 database is not in C<AutoCommit> mode.
878
879 =cut
880
881 sub disconnect {
882   my ($self) = @_;
883
884   if( $self->_dbh ) {
885     my @actions;
886
887     push @actions, ( $self->on_disconnect_call || () );
888     push @actions, $self->_parse_connect_do ('on_disconnect_do');
889
890     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
891
892     $self->_dbh_rollback unless $self->_dbh_autocommit;
893
894     %{ $self->_dbh->{CachedKids} } = ();
895     $self->_dbh->disconnect;
896     $self->_dbh(undef);
897     $self->{_dbh_gen}++;
898   }
899 }
900
901 =head2 with_deferred_fk_checks
902
903 =over 4
904
905 =item Arguments: C<$coderef>
906
907 =item Return Value: The return value of $coderef
908
909 =back
910
911 Storage specific method to run the code ref with FK checks deferred or
912 in MySQL's case disabled entirely.
913
914 =cut
915
916 # Storage subclasses should override this
917 sub with_deferred_fk_checks {
918   my ($self, $sub) = @_;
919   $sub->();
920 }
921
922 =head2 connected
923
924 =over
925
926 =item Arguments: none
927
928 =item Return Value: 1|0
929
930 =back
931
932 Verifies that the current database handle is active and ready to execute
933 an SQL statement (e.g. the connection did not get stale, server is still
934 answering, etc.) This method is used internally by L</dbh>.
935
936 =cut
937
938 sub connected {
939   my $self = shift;
940   return 0 unless $self->_seems_connected;
941
942   #be on the safe side
943   local $self->_dbh->{RaiseError} = 1;
944
945   return $self->_ping;
946 }
947
948 sub _seems_connected {
949   my $self = shift;
950
951   $self->_verify_pid;
952
953   my $dbh = $self->_dbh
954     or return 0;
955
956   return $dbh->FETCH('Active');
957 }
958
959 sub _ping {
960   my $self = shift;
961
962   my $dbh = $self->_dbh or return 0;
963
964   return $dbh->ping;
965 }
966
967 sub ensure_connected {
968   my ($self) = @_;
969
970   unless ($self->connected) {
971     $self->_populate_dbh;
972   }
973 }
974
975 =head2 dbh
976
977 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
978 is guaranteed to be healthy by implicitly calling L</connected>, and if
979 necessary performing a reconnection before returning. Keep in mind that this
980 is very B<expensive> on some database engines. Consider using L</dbh_do>
981 instead.
982
983 =cut
984
985 sub dbh {
986   my ($self) = @_;
987
988   if (not $self->_dbh) {
989     $self->_populate_dbh;
990   } else {
991     $self->ensure_connected;
992   }
993   return $self->_dbh;
994 }
995
996 # this is the internal "get dbh or connect (don't check)" method
997 sub _get_dbh {
998   my $self = shift;
999   $self->_verify_pid;
1000   $self->_populate_dbh unless $self->_dbh;
1001   return $self->_dbh;
1002 }
1003
1004 sub sql_maker {
1005   my ($self) = @_;
1006   unless ($self->_sql_maker) {
1007     my $sql_maker_class = $self->sql_maker_class;
1008
1009     my %opts = %{$self->_sql_maker_opts||{}};
1010     my $dialect =
1011       $opts{limit_dialect}
1012         ||
1013       $self->sql_limit_dialect
1014         ||
1015       do {
1016         my $s_class = (ref $self) || $self;
1017         carp (
1018           "Your storage class ($s_class) does not set sql_limit_dialect and you "
1019         . 'have not supplied an explicit limit_dialect in your connection_info. '
1020         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1021         . 'databases but can be (and often is) painfully slow. '
1022         . "Please file an RT ticket against '$s_class' ."
1023         );
1024
1025         'GenericSubQ';
1026       }
1027     ;
1028
1029     my ($quote_char, $name_sep);
1030
1031     if ($opts{quote_names}) {
1032       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1033         my $s_class = (ref $self) || $self;
1034         carp (
1035           "You requested 'quote_names' but your storage class ($s_class) does "
1036         . 'not explicitly define a default sql_quote_char and you have not '
1037         . 'supplied a quote_char as part of your connection_info. DBIC will '
1038         .q{default to the ANSI SQL standard quote '"', which works most of }
1039         . "the time. Please file an RT ticket against '$s_class'."
1040         );
1041
1042         '"'; # RV
1043       };
1044
1045       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1046     }
1047
1048     $self->_sql_maker($sql_maker_class->new(
1049       bindtype=>'columns',
1050       array_datatypes => 1,
1051       limit_dialect => $dialect,
1052       ($quote_char ? (quote_char => $quote_char) : ()),
1053       name_sep => ($name_sep || '.'),
1054       %opts,
1055     ));
1056   }
1057   return $self->_sql_maker;
1058 }
1059
1060 # nothing to do by default
1061 sub _rebless {}
1062 sub _init {}
1063
1064 sub _populate_dbh {
1065   my ($self) = @_;
1066
1067   my @info = @{$self->_dbi_connect_info || []};
1068   $self->_dbh(undef); # in case ->connected failed we might get sent here
1069   $self->_dbh_details({}); # reset everything we know
1070
1071   $self->_dbh($self->_connect(@info));
1072
1073   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1074
1075   $self->_determine_driver;
1076
1077   # Always set the transaction depth on connect, since
1078   #  there is no transaction in progress by definition
1079   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1080
1081   $self->_run_connection_actions unless $self->{_in_determine_driver};
1082 }
1083
1084 sub _run_connection_actions {
1085   my $self = shift;
1086   my @actions;
1087
1088   push @actions, ( $self->on_connect_call || () );
1089   push @actions, $self->_parse_connect_do ('on_connect_do');
1090
1091   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1092 }
1093
1094
1095
1096 sub set_use_dbms_capability {
1097   $_[0]->set_inherited ($_[1], $_[2]);
1098 }
1099
1100 sub get_use_dbms_capability {
1101   my ($self, $capname) = @_;
1102
1103   my $use = $self->get_inherited ($capname);
1104   return defined $use
1105     ? $use
1106     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1107   ;
1108 }
1109
1110 sub set_dbms_capability {
1111   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1112 }
1113
1114 sub get_dbms_capability {
1115   my ($self, $capname) = @_;
1116
1117   my $cap = $self->_dbh_details->{capability}{$capname};
1118
1119   unless (defined $cap) {
1120     if (my $meth = $self->can ("_determine$capname")) {
1121       $cap = $self->$meth ? 1 : 0;
1122     }
1123     else {
1124       $cap = 0;
1125     }
1126
1127     $self->set_dbms_capability ($capname, $cap);
1128   }
1129
1130   return $cap;
1131 }
1132
1133 sub _server_info {
1134   my $self = shift;
1135
1136   my $info;
1137   unless ($info = $self->_dbh_details->{info}) {
1138
1139     $info = {};
1140
1141     my $server_version = try { $self->_get_server_version };
1142
1143     if (defined $server_version) {
1144       $info->{dbms_version} = $server_version;
1145
1146       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1147       my @verparts = split (/\./, $numeric_version);
1148       if (
1149         @verparts
1150           &&
1151         $verparts[0] <= 999
1152       ) {
1153         # consider only up to 3 version parts, iff not more than 3 digits
1154         my @use_parts;
1155         while (@verparts && @use_parts < 3) {
1156           my $p = shift @verparts;
1157           last if $p > 999;
1158           push @use_parts, $p;
1159         }
1160         push @use_parts, 0 while @use_parts < 3;
1161
1162         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1163       }
1164     }
1165
1166     $self->_dbh_details->{info} = $info;
1167   }
1168
1169   return $info;
1170 }
1171
1172 sub _get_server_version {
1173   shift->_dbh_get_info(18);
1174 }
1175
1176 sub _dbh_get_info {
1177   my ($self, $info) = @_;
1178
1179   return try { $self->_get_dbh->get_info($info) } || undef;
1180 }
1181
1182 sub _determine_driver {
1183   my ($self) = @_;
1184
1185   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1186     my $started_connected = 0;
1187     local $self->{_in_determine_driver} = 1;
1188
1189     if (ref($self) eq __PACKAGE__) {
1190       my $driver;
1191       if ($self->_dbh) { # we are connected
1192         $driver = $self->_dbh->{Driver}{Name};
1193         $started_connected = 1;
1194       } else {
1195         # if connect_info is a CODEREF, we have no choice but to connect
1196         if (ref $self->_dbi_connect_info->[0] &&
1197             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1198           $self->_populate_dbh;
1199           $driver = $self->_dbh->{Driver}{Name};
1200         }
1201         else {
1202           # try to use dsn to not require being connected, the driver may still
1203           # force a connection in _rebless to determine version
1204           # (dsn may not be supplied at all if all we do is make a mock-schema)
1205           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1206           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1207           $driver ||= $ENV{DBI_DRIVER};
1208         }
1209       }
1210
1211       if ($driver) {
1212         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1213         if ($self->load_optional_class($storage_class)) {
1214           mro::set_mro($storage_class, 'c3');
1215           bless $self, $storage_class;
1216           $self->_rebless();
1217         }
1218       }
1219     }
1220
1221     $self->_driver_determined(1);
1222
1223     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1224
1225     $self->_init; # run driver-specific initializations
1226
1227     $self->_run_connection_actions
1228         if !$started_connected && defined $self->_dbh;
1229   }
1230 }
1231
1232 sub _do_connection_actions {
1233   my $self          = shift;
1234   my $method_prefix = shift;
1235   my $call          = shift;
1236
1237   if (not ref($call)) {
1238     my $method = $method_prefix . $call;
1239     $self->$method(@_);
1240   } elsif (ref($call) eq 'CODE') {
1241     $self->$call(@_);
1242   } elsif (ref($call) eq 'ARRAY') {
1243     if (ref($call->[0]) ne 'ARRAY') {
1244       $self->_do_connection_actions($method_prefix, $_) for @$call;
1245     } else {
1246       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1247     }
1248   } else {
1249     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1250   }
1251
1252   return $self;
1253 }
1254
1255 sub connect_call_do_sql {
1256   my $self = shift;
1257   $self->_do_query(@_);
1258 }
1259
1260 sub disconnect_call_do_sql {
1261   my $self = shift;
1262   $self->_do_query(@_);
1263 }
1264
1265 # override in db-specific backend when necessary
1266 sub connect_call_datetime_setup { 1 }
1267
1268 sub _do_query {
1269   my ($self, $action) = @_;
1270
1271   if (ref $action eq 'CODE') {
1272     $action = $action->($self);
1273     $self->_do_query($_) foreach @$action;
1274   }
1275   else {
1276     # Most debuggers expect ($sql, @bind), so we need to exclude
1277     # the attribute hash which is the second argument to $dbh->do
1278     # furthermore the bind values are usually to be presented
1279     # as named arrayref pairs, so wrap those here too
1280     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1281     my $sql = shift @do_args;
1282     my $attrs = shift @do_args;
1283     my @bind = map { [ undef, $_ ] } @do_args;
1284
1285     $self->_query_start($sql, @bind);
1286     $self->_get_dbh->do($sql, $attrs, @do_args);
1287     $self->_query_end($sql, @bind);
1288   }
1289
1290   return $self;
1291 }
1292
1293 sub _connect {
1294   my ($self, @info) = @_;
1295
1296   $self->throw_exception("You failed to provide any connection info")
1297     if !@info;
1298
1299   my ($old_connect_via, $dbh);
1300
1301   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1302     $old_connect_via = $DBI::connect_via;
1303     $DBI::connect_via = 'connect';
1304   }
1305
1306   try {
1307     if(ref $info[0] eq 'CODE') {
1308        $dbh = $info[0]->();
1309     }
1310     else {
1311        $dbh = DBI->connect(@info);
1312     }
1313
1314     if (!$dbh) {
1315       die $DBI::errstr;
1316     }
1317
1318     unless ($self->unsafe) {
1319
1320       $self->throw_exception(
1321         'Refusing clobbering of {HandleError} installed on externally supplied '
1322        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1323       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1324
1325       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1326       # request, or an external handle. Complain and set anyway
1327       unless ($dbh->{RaiseError}) {
1328         carp( ref $info[0] eq 'CODE'
1329
1330           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1331            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1332            .'attribute has been supplied'
1333
1334           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1335            .'unsafe => 1. Toggling RaiseError back to true'
1336         );
1337
1338         $dbh->{RaiseError} = 1;
1339       }
1340
1341       # this odd anonymous coderef dereference is in fact really
1342       # necessary to avoid the unwanted effect described in perl5
1343       # RT#75792
1344       sub {
1345         my $weak_self = $_[0];
1346         weaken $weak_self;
1347
1348         # the coderef is blessed so we can distinguish it from externally
1349         # supplied handles (which must be preserved)
1350         $_[1]->{HandleError} = bless sub {
1351           if ($weak_self) {
1352             $weak_self->throw_exception("DBI Exception: $_[0]");
1353           }
1354           else {
1355             # the handler may be invoked by something totally out of
1356             # the scope of DBIC
1357             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1358           }
1359         }, '__DBIC__DBH__ERROR__HANDLER__';
1360       }->($self, $dbh);
1361     }
1362   }
1363   catch {
1364     $self->throw_exception("DBI Connection failed: $_")
1365   }
1366   finally {
1367     $DBI::connect_via = $old_connect_via if $old_connect_via;
1368   };
1369
1370   $self->_dbh_autocommit($dbh->{AutoCommit});
1371   $dbh;
1372 }
1373
1374 sub svp_begin {
1375   my ($self, $name) = @_;
1376
1377   $name = $self->_svp_generate_name
1378     unless defined $name;
1379
1380   $self->throw_exception ("You can't use savepoints outside a transaction")
1381     if $self->{transaction_depth} == 0;
1382
1383   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1384     unless $self->can('_svp_begin');
1385
1386   push @{ $self->{savepoints} }, $name;
1387
1388   $self->debugobj->svp_begin($name) if $self->debug;
1389
1390   return $self->_svp_begin($name);
1391 }
1392
1393 sub svp_release {
1394   my ($self, $name) = @_;
1395
1396   $self->throw_exception ("You can't use savepoints outside a transaction")
1397     if $self->{transaction_depth} == 0;
1398
1399   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1400     unless $self->can('_svp_release');
1401
1402   if (defined $name) {
1403     $self->throw_exception ("Savepoint '$name' does not exist")
1404       unless grep { $_ eq $name } @{ $self->{savepoints} };
1405
1406     # Dig through the stack until we find the one we are releasing.  This keeps
1407     # the stack up to date.
1408     my $svp;
1409
1410     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1411   } else {
1412     $name = pop @{ $self->{savepoints} };
1413   }
1414
1415   $self->debugobj->svp_release($name) if $self->debug;
1416
1417   return $self->_svp_release($name);
1418 }
1419
1420 sub svp_rollback {
1421   my ($self, $name) = @_;
1422
1423   $self->throw_exception ("You can't use savepoints outside a transaction")
1424     if $self->{transaction_depth} == 0;
1425
1426   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1427     unless $self->can('_svp_rollback');
1428
1429   if (defined $name) {
1430       # If they passed us a name, verify that it exists in the stack
1431       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1432           $self->throw_exception("Savepoint '$name' does not exist!");
1433       }
1434
1435       # Dig through the stack until we find the one we are releasing.  This keeps
1436       # the stack up to date.
1437       while(my $s = pop(@{ $self->{savepoints} })) {
1438           last if($s eq $name);
1439       }
1440       # Add the savepoint back to the stack, as a rollback doesn't remove the
1441       # named savepoint, only everything after it.
1442       push(@{ $self->{savepoints} }, $name);
1443   } else {
1444       # We'll assume they want to rollback to the last savepoint
1445       $name = $self->{savepoints}->[-1];
1446   }
1447
1448   $self->debugobj->svp_rollback($name) if $self->debug;
1449
1450   return $self->_svp_rollback($name);
1451 }
1452
1453 sub _svp_generate_name {
1454   my ($self) = @_;
1455   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1456 }
1457
1458 sub txn_begin {
1459   my $self = shift;
1460
1461   # this means we have not yet connected and do not know the AC status
1462   # (e.g. coderef $dbh)
1463   if (! defined $self->_dbh_autocommit) {
1464     $self->ensure_connected;
1465   }
1466   # otherwise re-connect on pid changes, so
1467   # that the txn_depth is adjusted properly
1468   # the lightweight _get_dbh is good enoug here
1469   # (only superficial handle check, no pings)
1470   else {
1471     $self->_get_dbh;
1472   }
1473
1474   if($self->transaction_depth == 0) {
1475     $self->debugobj->txn_begin()
1476       if $self->debug;
1477     $self->_dbh_begin_work;
1478   }
1479   elsif ($self->auto_savepoint) {
1480     $self->svp_begin;
1481   }
1482   $self->{transaction_depth}++;
1483 }
1484
1485 sub _dbh_begin_work {
1486   my $self = shift;
1487
1488   # if the user is utilizing txn_do - good for him, otherwise we need to
1489   # ensure that the $dbh is healthy on BEGIN.
1490   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1491   # will be replaced by a failure of begin_work itself (which will be
1492   # then retried on reconnect)
1493   if ($self->{_in_dbh_do}) {
1494     $self->_dbh->begin_work;
1495   } else {
1496     $self->dbh_do(sub { $_[1]->begin_work });
1497   }
1498 }
1499
1500 sub txn_commit {
1501   my $self = shift;
1502   if (! $self->_dbh) {
1503     $self->throw_exception('cannot COMMIT on a disconnected handle');
1504   }
1505   elsif ($self->{transaction_depth} == 1) {
1506     $self->debugobj->txn_commit()
1507       if ($self->debug);
1508     $self->_dbh_commit;
1509     $self->{transaction_depth} = 0
1510       if $self->_dbh_autocommit;
1511   }
1512   elsif($self->{transaction_depth} > 1) {
1513     $self->{transaction_depth}--;
1514     $self->svp_release
1515       if $self->auto_savepoint;
1516   }
1517   elsif (! $self->_dbh->FETCH('AutoCommit') ) {
1518
1519     carp "Storage transaction_depth $self->{transaction_depth} does not match "
1520         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1521
1522     $self->debugobj->txn_commit()
1523       if ($self->debug);
1524     $self->_dbh_commit;
1525     $self->{transaction_depth} = 0
1526       if $self->_dbh_autocommit;
1527   }
1528   else {
1529     $self->throw_exception( 'Refusing to commit without a started transaction' );
1530   }
1531 }
1532
1533 sub _dbh_commit {
1534   my $self = shift;
1535   my $dbh  = $self->_dbh
1536     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1537   $dbh->commit;
1538 }
1539
1540 sub txn_rollback {
1541   my $self = shift;
1542   my $dbh = $self->_dbh;
1543   try {
1544     if ($self->{transaction_depth} == 1) {
1545       $self->debugobj->txn_rollback()
1546         if ($self->debug);
1547       $self->{transaction_depth} = 0
1548         if $self->_dbh_autocommit;
1549       $self->_dbh_rollback;
1550     }
1551     elsif($self->{transaction_depth} > 1) {
1552       $self->{transaction_depth}--;
1553       if ($self->auto_savepoint) {
1554         $self->svp_rollback;
1555         $self->svp_release;
1556       }
1557     }
1558     else {
1559       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1560     }
1561   }
1562   catch {
1563     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1564
1565     if ($_ !~ /$exception_class/) {
1566       # ensure that a failed rollback resets the transaction depth
1567       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1568     }
1569
1570     $self->throw_exception($_)
1571   };
1572 }
1573
1574 sub _dbh_rollback {
1575   my $self = shift;
1576   my $dbh  = $self->_dbh
1577     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1578   $dbh->rollback;
1579 }
1580
1581 # This used to be the top-half of _execute.  It was split out to make it
1582 #  easier to override in NoBindVars without duping the rest.  It takes up
1583 #  all of _execute's args, and emits $sql, @bind.
1584 sub _prep_for_execute {
1585   my ($self, $op, $extra_bind, $ident, $args) = @_;
1586
1587   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1588     $ident = $ident->from();
1589   }
1590
1591   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1592
1593   unshift(@bind,
1594     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1595       if $extra_bind;
1596   return ($sql, \@bind);
1597 }
1598
1599
1600 sub _fix_bind_params {
1601     my ($self, @bind) = @_;
1602
1603     ### Turn @bind from something like this:
1604     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1605     ### to this:
1606     ###   ( "'1'", "'1'", "'3'" )
1607     return
1608         map {
1609             if ( defined( $_ && $_->[1] ) ) {
1610                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1611             }
1612             else { q{NULL}; }
1613         } @bind;
1614 }
1615
1616 sub _query_start {
1617     my ( $self, $sql, @bind ) = @_;
1618
1619     if ( $self->debug ) {
1620         @bind = $self->_fix_bind_params(@bind);
1621
1622         $self->debugobj->query_start( $sql, @bind );
1623     }
1624 }
1625
1626 sub _query_end {
1627     my ( $self, $sql, @bind ) = @_;
1628
1629     if ( $self->debug ) {
1630         @bind = $self->_fix_bind_params(@bind);
1631         $self->debugobj->query_end( $sql, @bind );
1632     }
1633 }
1634
1635 sub _dbh_execute {
1636   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1637
1638   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1639
1640   $self->_query_start( $sql, @$bind );
1641
1642   my $sth = $self->sth($sql,$op);
1643
1644   my $placeholder_index = 1;
1645
1646   foreach my $bound (@$bind) {
1647     my $attributes = {};
1648     my($column_name, @data) = @$bound;
1649
1650     if ($bind_attributes) {
1651       $attributes = $bind_attributes->{$column_name}
1652       if defined $bind_attributes->{$column_name};
1653     }
1654
1655     foreach my $data (@data) {
1656       my $ref = ref $data;
1657
1658       if ($ref and overload::Method($data, '""') ) {
1659         $data = "$data";
1660       }
1661       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1662         $sth->bind_param_inout(
1663           $placeholder_index++,
1664           $data,
1665           $self->_max_column_bytesize($ident, $column_name),
1666           $attributes
1667         );
1668         next;
1669       }
1670
1671       $sth->bind_param($placeholder_index++, $data, $attributes);
1672     }
1673   }
1674
1675   # Can this fail without throwing an exception anyways???
1676   my $rv = $sth->execute();
1677   $self->throw_exception(
1678     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1679   ) if !$rv;
1680
1681   $self->_query_end( $sql, @$bind );
1682
1683   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1684 }
1685
1686 sub _execute {
1687     my $self = shift;
1688     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1689 }
1690
1691 sub _prefetch_autovalues {
1692   my ($self, $source, $to_insert) = @_;
1693
1694   my $colinfo = $source->columns_info;
1695
1696   my %values;
1697   for my $col (keys %$colinfo) {
1698     if (
1699       $colinfo->{$col}{auto_nextval}
1700         and
1701       (
1702         ! exists $to_insert->{$col}
1703           or
1704         ref $to_insert->{$col} eq 'SCALAR'
1705       )
1706     ) {
1707       $values{$col} = $self->_sequence_fetch(
1708         'NEXTVAL',
1709         ( $colinfo->{$col}{sequence} ||=
1710             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1711         ),
1712       );
1713     }
1714   }
1715
1716   \%values;
1717 }
1718
1719 sub insert {
1720   my ($self, $source, $to_insert) = @_;
1721
1722   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1723
1724   # fuse the values
1725   $to_insert = { %$to_insert, %$prefetched_values };
1726
1727   # list of primary keys we try to fetch from the database
1728   # both not-exsists and scalarrefs are considered
1729   my %fetch_pks;
1730   for ($source->primary_columns) {
1731     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1732       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1733   }
1734
1735   my ($sqla_opts, @ir_container);
1736   if ($self->_use_insert_returning) {
1737
1738     # retain order as declared in the resultsource
1739     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1740       push @{$sqla_opts->{returning}}, $_;
1741       $sqla_opts->{returning_container} = \@ir_container
1742         if $self->_use_insert_returning_bound;
1743     }
1744   }
1745
1746   my $bind_attributes = $self->source_bind_attributes($source);
1747
1748   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1749
1750   my %returned_cols;
1751
1752   if (my $retlist = $sqla_opts->{returning}) {
1753     @ir_container = try {
1754       local $SIG{__WARN__} = sub {};
1755       my @r = $sth->fetchrow_array;
1756       $sth->finish;
1757       @r;
1758     } unless @ir_container;
1759
1760     @returned_cols{@$retlist} = @ir_container if @ir_container;
1761   }
1762
1763   return { %$prefetched_values, %returned_cols };
1764 }
1765
1766
1767 ## Currently it is assumed that all values passed will be "normal", i.e. not
1768 ## scalar refs, or at least, all the same type as the first set, the statement is
1769 ## only prepped once.
1770 sub insert_bulk {
1771   my ($self, $source, $cols, $data) = @_;
1772
1773   my %colvalues;
1774   @colvalues{@$cols} = (0..$#$cols);
1775
1776   for my $i (0..$#$cols) {
1777     my $first_val = $data->[0][$i];
1778     next unless ref $first_val eq 'SCALAR';
1779
1780     $colvalues{ $cols->[$i] } = $first_val;
1781   }
1782
1783   # check for bad data and stringify stringifiable objects
1784   my $bad_slice = sub {
1785     my ($msg, $col_idx, $slice_idx) = @_;
1786     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1787       $msg,
1788       $cols->[$col_idx],
1789       do {
1790         require Data::Dumper::Concise;
1791         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1792         Data::Dumper::Concise::Dumper ({
1793           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1794         }),
1795       }
1796     );
1797   };
1798
1799   for my $datum_idx (0..$#$data) {
1800     my $datum = $data->[$datum_idx];
1801
1802     for my $col_idx (0..$#$cols) {
1803       my $val            = $datum->[$col_idx];
1804       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1805       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1806
1807       if ($is_literal_sql) {
1808         if (not ref $val) {
1809           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1810         }
1811         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1812           $bad_slice->("$reftype reference found where literal SQL expected",
1813             $col_idx, $datum_idx);
1814         }
1815         elsif ($$val ne $$sqla_bind){
1816           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1817             $col_idx, $datum_idx);
1818         }
1819       }
1820       elsif (my $reftype = ref $val) {
1821         require overload;
1822         if (overload::Method($val, '""')) {
1823           $datum->[$col_idx] = "".$val;
1824         }
1825         else {
1826           $bad_slice->("$reftype reference found where bind expected",
1827             $col_idx, $datum_idx);
1828         }
1829       }
1830     }
1831   }
1832
1833   my ($sql, $bind) = $self->_prep_for_execute (
1834     'insert', undef, $source, [\%colvalues]
1835   );
1836
1837   if (! @$bind) {
1838     # if the bindlist is empty - make sure all "values" are in fact
1839     # literal scalarrefs. If not the case this means the storage ate
1840     # them away (e.g. the NoBindVars component) and interpolated them
1841     # directly into the SQL. This obviosly can't be good for multi-inserts
1842
1843     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1844       if first { ref $_ ne 'SCALAR' } values %colvalues;
1845   }
1846
1847   # neither _execute_array, nor _execute_inserts_with_no_binds are
1848   # atomic (even if _execute _array is a single call). Thus a safety
1849   # scope guard
1850   my $guard = $self->txn_scope_guard;
1851
1852   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1853   my $sth = $self->sth($sql);
1854   my $rv = do {
1855     if (@$bind) {
1856       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1857       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1858     }
1859     else {
1860       # bind_param_array doesn't work if there are no binds
1861       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1862     }
1863   };
1864
1865   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1866
1867   $guard->commit;
1868
1869   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1870 }
1871
1872 sub _execute_array {
1873   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1874
1875   ## This must be an arrayref, else nothing works!
1876   my $tuple_status = [];
1877
1878   ## Get the bind_attributes, if any exist
1879   my $bind_attributes = $self->source_bind_attributes($source);
1880
1881   ## Bind the values and execute
1882   my $placeholder_index = 1;
1883
1884   foreach my $bound (@$bind) {
1885
1886     my $attributes = {};
1887     my ($column_name, $data_index) = @$bound;
1888
1889     if( $bind_attributes ) {
1890       $attributes = $bind_attributes->{$column_name}
1891       if defined $bind_attributes->{$column_name};
1892     }
1893
1894     my @data = map { $_->[$data_index] } @$data;
1895
1896     $sth->bind_param_array(
1897       $placeholder_index,
1898       [@data],
1899       (%$attributes ?  $attributes : ()),
1900     );
1901     $placeholder_index++;
1902   }
1903
1904   my ($rv, $err);
1905   try {
1906     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1907   }
1908   catch {
1909     $err = shift;
1910   };
1911
1912   # Not all DBDs are create equal. Some throw on error, some return
1913   # an undef $rv, and some set $sth->err - try whatever we can
1914   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1915     ! defined $err
1916       and
1917     ( !defined $rv or $sth->err )
1918   );
1919
1920   # Statement must finish even if there was an exception.
1921   try {
1922     $sth->finish
1923   }
1924   catch {
1925     $err = shift unless defined $err
1926   };
1927
1928   if (defined $err) {
1929     my $i = 0;
1930     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1931
1932     $self->throw_exception("Unexpected populate error: $err")
1933       if ($i > $#$tuple_status);
1934
1935     require Data::Dumper::Concise;
1936     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1937       ($tuple_status->[$i][1] || $err),
1938       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
1939     );
1940   }
1941
1942   return $rv;
1943 }
1944
1945 sub _dbh_execute_array {
1946     my ($self, $sth, $tuple_status, @extra) = @_;
1947
1948     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1949 }
1950
1951 sub _dbh_execute_inserts_with_no_binds {
1952   my ($self, $sth, $count) = @_;
1953
1954   my $err;
1955   try {
1956     my $dbh = $self->_get_dbh;
1957     local $dbh->{RaiseError} = 1;
1958     local $dbh->{PrintError} = 0;
1959
1960     $sth->execute foreach 1..$count;
1961   }
1962   catch {
1963     $err = shift;
1964   };
1965
1966   # Make sure statement is finished even if there was an exception.
1967   try {
1968     $sth->finish
1969   }
1970   catch {
1971     $err = shift unless defined $err;
1972   };
1973
1974   $self->throw_exception($err) if defined $err;
1975
1976   return $count;
1977 }
1978
1979 sub update {
1980   my ($self, $source, @args) = @_;
1981
1982   my $bind_attrs = $self->source_bind_attributes($source);
1983
1984   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1985 }
1986
1987
1988 sub delete {
1989   my ($self, $source, @args) = @_;
1990
1991   my $bind_attrs = $self->source_bind_attributes($source);
1992
1993   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1994 }
1995
1996 # We were sent here because the $rs contains a complex search
1997 # which will require a subquery to select the correct rows
1998 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1999 #
2000 # Generating a single PK column subquery is trivial and supported
2001 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
2002 # Look at _multipk_update_delete()
2003 sub _subq_update_delete {
2004   my $self = shift;
2005   my ($rs, $op, $values) = @_;
2006
2007   my $rsrc = $rs->result_source;
2008
2009   # quick check if we got a sane rs on our hands
2010   my @pcols = $rsrc->_pri_cols;
2011
2012   my $sel = $rs->_resolved_attrs->{select};
2013   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
2014
2015   if (
2016       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
2017         ne
2018       join ("\x00", sort @$sel )
2019   ) {
2020     $self->throw_exception (
2021       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2022     );
2023   }
2024
2025   if (@pcols == 1) {
2026     return $self->$op (
2027       $rsrc,
2028       $op eq 'update' ? $values : (),
2029       { $pcols[0] => { -in => $rs->as_query } },
2030     );
2031   }
2032
2033   else {
2034     return $self->_multipk_update_delete (@_);
2035   }
2036 }
2037
2038 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2039 # resultset update/delete involving subqueries. So by default resort
2040 # to simple (and inefficient) delete_all style per-row opearations,
2041 # while allowing specific storages to override this with a faster
2042 # implementation.
2043 #
2044 sub _multipk_update_delete {
2045   return shift->_per_row_update_delete (@_);
2046 }
2047
2048 # This is the default loop used to delete/update rows for multi PK
2049 # resultsets, and used by mysql exclusively (because it can't do anything
2050 # else).
2051 #
2052 # We do not use $row->$op style queries, because resultset update/delete
2053 # is not expected to cascade (this is what delete_all/update_all is for).
2054 #
2055 # There should be no race conditions as the entire operation is rolled
2056 # in a transaction.
2057 #
2058 sub _per_row_update_delete {
2059   my $self = shift;
2060   my ($rs, $op, $values) = @_;
2061
2062   my $rsrc = $rs->result_source;
2063   my @pcols = $rsrc->_pri_cols;
2064
2065   my $guard = $self->txn_scope_guard;
2066
2067   # emulate the return value of $sth->execute for non-selects
2068   my $row_cnt = '0E0';
2069
2070   my $subrs_cur = $rs->cursor;
2071   my @all_pk = $subrs_cur->all;
2072   for my $pks ( @all_pk) {
2073
2074     my $cond;
2075     for my $i (0.. $#pcols) {
2076       $cond->{$pcols[$i]} = $pks->[$i];
2077     }
2078
2079     $self->$op (
2080       $rsrc,
2081       $op eq 'update' ? $values : (),
2082       $cond,
2083     );
2084
2085     $row_cnt++;
2086   }
2087
2088   $guard->commit;
2089
2090   return $row_cnt;
2091 }
2092
2093 sub _select {
2094   my $self = shift;
2095   $self->_execute($self->_select_args(@_));
2096 }
2097
2098 sub _select_args_to_query {
2099   my $self = shift;
2100
2101   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2102   #  = $self->_select_args($ident, $select, $cond, $attrs);
2103   my ($op, $bind, $ident, $bind_attrs, @args) =
2104     $self->_select_args(@_);
2105
2106   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2107   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2108   $prepared_bind ||= [];
2109
2110   return wantarray
2111     ? ($sql, $prepared_bind, $bind_attrs)
2112     : \[ "($sql)", @$prepared_bind ]
2113   ;
2114 }
2115
2116 sub _select_args {
2117   my ($self, $ident, $select, $where, $attrs) = @_;
2118
2119   my $sql_maker = $self->sql_maker;
2120   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2121
2122   $attrs = {
2123     %$attrs,
2124     select => $select,
2125     from => $ident,
2126     where => $where,
2127     $rs_alias && $alias2source->{$rs_alias}
2128       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2129       : ()
2130     ,
2131   };
2132
2133   # calculate bind_attrs before possible $ident mangling
2134   my $bind_attrs = {};
2135   for my $alias (keys %$alias2source) {
2136     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2137     for my $col (keys %$bindtypes) {
2138
2139       my $fqcn = join ('.', $alias, $col);
2140       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2141
2142       # Unqialified column names are nice, but at the same time can be
2143       # rather ambiguous. What we do here is basically go along with
2144       # the loop, adding an unqualified column slot to $bind_attrs,
2145       # alongside the fully qualified name. As soon as we encounter
2146       # another column by that name (which would imply another table)
2147       # we unset the unqualified slot and never add any info to it
2148       # to avoid erroneous type binding. If this happens the users
2149       # only choice will be to fully qualify his column name
2150
2151       if (exists $bind_attrs->{$col}) {
2152         $bind_attrs->{$col} = {};
2153       }
2154       else {
2155         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2156       }
2157     }
2158   }
2159
2160   # Sanity check the attributes (SQLMaker does it too, but
2161   # in case of a software_limit we'll never reach there)
2162   if (defined $attrs->{offset}) {
2163     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2164       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2165   }
2166   $attrs->{offset} ||= 0;
2167
2168   if (defined $attrs->{rows}) {
2169     $self->throw_exception("The rows attribute must be a positive integer if present")
2170       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2171   }
2172   elsif ($attrs->{offset}) {
2173     # MySQL actually recommends this approach.  I cringe.
2174     $attrs->{rows} = $sql_maker->__max_int;
2175   }
2176
2177   my @limit;
2178
2179   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2180   # storage, unless software limit was requested
2181   if (
2182     #limited has_many
2183     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2184        ||
2185     # grouped prefetch (to satisfy group_by == select)
2186     ( $attrs->{group_by}
2187         &&
2188       @{$attrs->{group_by}}
2189         &&
2190       $attrs->{_prefetch_selector_range}
2191     )
2192   ) {
2193     ($ident, $select, $where, $attrs)
2194       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2195   }
2196   elsif (! $attrs->{software_limit} ) {
2197     push @limit, $attrs->{rows}, $attrs->{offset};
2198   }
2199
2200   # try to simplify the joinmap further (prune unreferenced type-single joins)
2201   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2202
2203 ###
2204   # This would be the point to deflate anything found in $where
2205   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2206   # expect a row object. And all we have is a resultsource (it is trivial
2207   # to extract deflator coderefs via $alias2source above).
2208   #
2209   # I don't see a way forward other than changing the way deflators are
2210   # invoked, and that's just bad...
2211 ###
2212
2213   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2214 }
2215
2216 # Returns a counting SELECT for a simple count
2217 # query. Abstracted so that a storage could override
2218 # this to { count => 'firstcol' } or whatever makes
2219 # sense as a performance optimization
2220 sub _count_select {
2221   #my ($self, $source, $rs_attrs) = @_;
2222   return { count => '*' };
2223 }
2224
2225
2226 sub source_bind_attributes {
2227   my ($self, $source) = @_;
2228
2229   my $bind_attributes;
2230
2231   my $colinfo = $source->columns_info;
2232
2233   for my $col (keys %$colinfo) {
2234     if (my $dt = $colinfo->{$col}{data_type} ) {
2235       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2236     }
2237   }
2238
2239   return $bind_attributes;
2240 }
2241
2242 =head2 select
2243
2244 =over 4
2245
2246 =item Arguments: $ident, $select, $condition, $attrs
2247
2248 =back
2249
2250 Handle a SQL select statement.
2251
2252 =cut
2253
2254 sub select {
2255   my $self = shift;
2256   my ($ident, $select, $condition, $attrs) = @_;
2257   return $self->cursor_class->new($self, \@_, $attrs);
2258 }
2259
2260 sub select_single {
2261   my $self = shift;
2262   my ($rv, $sth, @bind) = $self->_select(@_);
2263   my @row = $sth->fetchrow_array;
2264   my @nextrow = $sth->fetchrow_array if @row;
2265   if(@row && @nextrow) {
2266     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2267   }
2268   # Need to call finish() to work round broken DBDs
2269   $sth->finish();
2270   return @row;
2271 }
2272
2273 =head2 sql_limit_dialect
2274
2275 This is an accessor for the default SQL limit dialect used by a particular
2276 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2277 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2278 see L<DBIx::Class::SQLMaker::LimitDialects>.
2279
2280 =head2 sth
2281
2282 =over 4
2283
2284 =item Arguments: $sql
2285
2286 =back
2287
2288 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2289
2290 =cut
2291
2292 sub _dbh_sth {
2293   my ($self, $dbh, $sql) = @_;
2294
2295   # 3 is the if_active parameter which avoids active sth re-use
2296   my $sth = $self->disable_sth_caching
2297     ? $dbh->prepare($sql)
2298     : $dbh->prepare_cached($sql, {}, 3);
2299
2300   # XXX You would think RaiseError would make this impossible,
2301   #  but apparently that's not true :(
2302   $self->throw_exception(
2303     $dbh->errstr
2304       ||
2305     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2306             .'an exception and/or setting $dbh->errstr',
2307       length ($sql) > 20
2308         ? substr($sql, 0, 20) . '...'
2309         : $sql
2310       ,
2311       'DBD::' . $dbh->{Driver}{Name},
2312     )
2313   ) if !$sth;
2314
2315   $sth;
2316 }
2317
2318 sub sth {
2319   my ($self, $sql) = @_;
2320   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2321 }
2322
2323 sub _dbh_columns_info_for {
2324   my ($self, $dbh, $table) = @_;
2325
2326   if ($dbh->can('column_info')) {
2327     my %result;
2328     my $caught;
2329     try {
2330       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2331       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2332       $sth->execute();
2333       while ( my $info = $sth->fetchrow_hashref() ){
2334         my %column_info;
2335         $column_info{data_type}   = $info->{TYPE_NAME};
2336         $column_info{size}      = $info->{COLUMN_SIZE};
2337         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2338         $column_info{default_value} = $info->{COLUMN_DEF};
2339         my $col_name = $info->{COLUMN_NAME};
2340         $col_name =~ s/^\"(.*)\"$/$1/;
2341
2342         $result{$col_name} = \%column_info;
2343       }
2344     } catch {
2345       $caught = 1;
2346     };
2347     return \%result if !$caught && scalar keys %result;
2348   }
2349
2350   my %result;
2351   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2352   $sth->execute;
2353   my @columns = @{$sth->{NAME_lc}};
2354   for my $i ( 0 .. $#columns ){
2355     my %column_info;
2356     $column_info{data_type} = $sth->{TYPE}->[$i];
2357     $column_info{size} = $sth->{PRECISION}->[$i];
2358     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2359
2360     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2361       $column_info{data_type} = $1;
2362       $column_info{size}    = $2;
2363     }
2364
2365     $result{$columns[$i]} = \%column_info;
2366   }
2367   $sth->finish;
2368
2369   foreach my $col (keys %result) {
2370     my $colinfo = $result{$col};
2371     my $type_num = $colinfo->{data_type};
2372     my $type_name;
2373     if(defined $type_num && $dbh->can('type_info')) {
2374       my $type_info = $dbh->type_info($type_num);
2375       $type_name = $type_info->{TYPE_NAME} if $type_info;
2376       $colinfo->{data_type} = $type_name if $type_name;
2377     }
2378   }
2379
2380   return \%result;
2381 }
2382
2383 sub columns_info_for {
2384   my ($self, $table) = @_;
2385   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2386 }
2387
2388 =head2 last_insert_id
2389
2390 Return the row id of the last insert.
2391
2392 =cut
2393
2394 sub _dbh_last_insert_id {
2395     my ($self, $dbh, $source, $col) = @_;
2396
2397     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2398
2399     return $id if defined $id;
2400
2401     my $class = ref $self;
2402     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2403 }
2404
2405 sub last_insert_id {
2406   my $self = shift;
2407   $self->_dbh_last_insert_id ($self->_dbh, @_);
2408 }
2409
2410 =head2 _native_data_type
2411
2412 =over 4
2413
2414 =item Arguments: $type_name
2415
2416 =back
2417
2418 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2419 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2420 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2421
2422 The default implementation returns C<undef>, implement in your Storage driver if
2423 you need this functionality.
2424
2425 Should map types from other databases to the native RDBMS type, for example
2426 C<VARCHAR2> to C<VARCHAR>.
2427
2428 Types with modifiers should map to the underlying data type. For example,
2429 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2430
2431 Composite types should map to the container type, for example
2432 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2433
2434 =cut
2435
2436 sub _native_data_type {
2437   #my ($self, $data_type) = @_;
2438   return undef
2439 }
2440
2441 # Check if placeholders are supported at all
2442 sub _determine_supports_placeholders {
2443   my $self = shift;
2444   my $dbh  = $self->_get_dbh;
2445
2446   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2447   # but it is inaccurate more often than not
2448   return try {
2449     local $dbh->{PrintError} = 0;
2450     local $dbh->{RaiseError} = 1;
2451     $dbh->do('select ?', {}, 1);
2452     1;
2453   }
2454   catch {
2455     0;
2456   };
2457 }
2458
2459 # Check if placeholders bound to non-string types throw exceptions
2460 #
2461 sub _determine_supports_typeless_placeholders {
2462   my $self = shift;
2463   my $dbh  = $self->_get_dbh;
2464
2465   return try {
2466     local $dbh->{PrintError} = 0;
2467     local $dbh->{RaiseError} = 1;
2468     # this specifically tests a bind that is NOT a string
2469     $dbh->do('select 1 where 1 = ?', {}, 1);
2470     1;
2471   }
2472   catch {
2473     0;
2474   };
2475 }
2476
2477 =head2 sqlt_type
2478
2479 Returns the database driver name.
2480
2481 =cut
2482
2483 sub sqlt_type {
2484   shift->_get_dbh->{Driver}->{Name};
2485 }
2486
2487 =head2 bind_attribute_by_data_type
2488
2489 Given a datatype from column info, returns a database specific bind
2490 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2491 let the database planner just handle it.
2492
2493 Generally only needed for special case column types, like bytea in postgres.
2494
2495 =cut
2496
2497 sub bind_attribute_by_data_type {
2498     return;
2499 }
2500
2501 =head2 is_datatype_numeric
2502
2503 Given a datatype from column_info, returns a boolean value indicating if
2504 the current RDBMS considers it a numeric value. This controls how
2505 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2506 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2507 be performed instead of the usual C<eq>.
2508
2509 =cut
2510
2511 sub is_datatype_numeric {
2512   my ($self, $dt) = @_;
2513
2514   return 0 unless $dt;
2515
2516   return $dt =~ /^ (?:
2517     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2518   ) $/ix;
2519 }
2520
2521
2522 =head2 create_ddl_dir
2523
2524 =over 4
2525
2526 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2527
2528 =back
2529
2530 Creates a SQL file based on the Schema, for each of the specified
2531 database engines in C<\@databases> in the given directory.
2532 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2533
2534 Given a previous version number, this will also create a file containing
2535 the ALTER TABLE statements to transform the previous schema into the
2536 current one. Note that these statements may contain C<DROP TABLE> or
2537 C<DROP COLUMN> statements that can potentially destroy data.
2538
2539 The file names are created using the C<ddl_filename> method below, please
2540 override this method in your schema if you would like a different file
2541 name format. For the ALTER file, the same format is used, replacing
2542 $version in the name with "$preversion-$version".
2543
2544 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2545 The most common value for this would be C<< { add_drop_table => 1 } >>
2546 to have the SQL produced include a C<DROP TABLE> statement for each table
2547 created. For quoting purposes supply C<quote_table_names> and
2548 C<quote_field_names>.
2549
2550 If no arguments are passed, then the following default values are assumed:
2551
2552 =over 4
2553
2554 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2555
2556 =item version    - $schema->schema_version
2557
2558 =item directory  - './'
2559
2560 =item preversion - <none>
2561
2562 =back
2563
2564 By default, C<\%sqlt_args> will have
2565
2566  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2567
2568 merged with the hash passed in. To disable any of those features, pass in a
2569 hashref like the following
2570
2571  { ignore_constraint_names => 0, # ... other options }
2572
2573
2574 WARNING: You are strongly advised to check all SQL files created, before applying
2575 them.
2576
2577 =cut
2578
2579 sub create_ddl_dir {
2580   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2581
2582   unless ($dir) {
2583     carp "No directory given, using ./\n";
2584     $dir = './';
2585   } else {
2586       -d $dir
2587         or
2588       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2589         or
2590       $self->throw_exception(
2591         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2592       );
2593   }
2594
2595   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2596
2597   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2598   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2599
2600   my $schema_version = $schema->schema_version || '1.x';
2601   $version ||= $schema_version;
2602
2603   $sqltargs = {
2604     add_drop_table => 1,
2605     ignore_constraint_names => 1,
2606     ignore_index_names => 1,
2607     %{$sqltargs || {}}
2608   };
2609
2610   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2611     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2612   }
2613
2614   my $sqlt = SQL::Translator->new( $sqltargs );
2615
2616   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2617   my $sqlt_schema = $sqlt->translate({ data => $schema })
2618     or $self->throw_exception ($sqlt->error);
2619
2620   foreach my $db (@$databases) {
2621     $sqlt->reset();
2622     $sqlt->{schema} = $sqlt_schema;
2623     $sqlt->producer($db);
2624
2625     my $file;
2626     my $filename = $schema->ddl_filename($db, $version, $dir);
2627     if (-e $filename && ($version eq $schema_version )) {
2628       # if we are dumping the current version, overwrite the DDL
2629       carp "Overwriting existing DDL file - $filename";
2630       unlink($filename);
2631     }
2632
2633     my $output = $sqlt->translate;
2634     if(!$output) {
2635       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2636       next;
2637     }
2638     if(!open($file, ">$filename")) {
2639       $self->throw_exception("Can't open $filename for writing ($!)");
2640       next;
2641     }
2642     print $file $output;
2643     close($file);
2644
2645     next unless ($preversion);
2646
2647     require SQL::Translator::Diff;
2648
2649     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2650     if(!-e $prefilename) {
2651       carp("No previous schema file found ($prefilename)");
2652       next;
2653     }
2654
2655     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2656     if(-e $difffile) {
2657       carp("Overwriting existing diff file - $difffile");
2658       unlink($difffile);
2659     }
2660
2661     my $source_schema;
2662     {
2663       my $t = SQL::Translator->new($sqltargs);
2664       $t->debug( 0 );
2665       $t->trace( 0 );
2666
2667       $t->parser( $db )
2668         or $self->throw_exception ($t->error);
2669
2670       my $out = $t->translate( $prefilename )
2671         or $self->throw_exception ($t->error);
2672
2673       $source_schema = $t->schema;
2674
2675       $source_schema->name( $prefilename )
2676         unless ( $source_schema->name );
2677     }
2678
2679     # The "new" style of producers have sane normalization and can support
2680     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2681     # And we have to diff parsed SQL against parsed SQL.
2682     my $dest_schema = $sqlt_schema;
2683
2684     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2685       my $t = SQL::Translator->new($sqltargs);
2686       $t->debug( 0 );
2687       $t->trace( 0 );
2688
2689       $t->parser( $db )
2690         or $self->throw_exception ($t->error);
2691
2692       my $out = $t->translate( $filename )
2693         or $self->throw_exception ($t->error);
2694
2695       $dest_schema = $t->schema;
2696
2697       $dest_schema->name( $filename )
2698         unless $dest_schema->name;
2699     }
2700
2701     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2702                                                   $dest_schema,   $db,
2703                                                   $sqltargs
2704                                                  );
2705     if(!open $file, ">$difffile") {
2706       $self->throw_exception("Can't write to $difffile ($!)");
2707       next;
2708     }
2709     print $file $diff;
2710     close($file);
2711   }
2712 }
2713
2714 =head2 deployment_statements
2715
2716 =over 4
2717
2718 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2719
2720 =back
2721
2722 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2723
2724 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2725 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2726
2727 C<$directory> is used to return statements from files in a previously created
2728 L</create_ddl_dir> directory and is optional. The filenames are constructed
2729 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2730
2731 If no C<$directory> is specified then the statements are constructed on the
2732 fly using L<SQL::Translator> and C<$version> is ignored.
2733
2734 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2735
2736 =cut
2737
2738 sub deployment_statements {
2739   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2740   $type ||= $self->sqlt_type;
2741   $version ||= $schema->schema_version || '1.x';
2742   $dir ||= './';
2743   my $filename = $schema->ddl_filename($type, $version, $dir);
2744   if(-f $filename)
2745   {
2746       # FIXME replace this block when a proper sane sql parser is available
2747       my $file;
2748       open($file, "<$filename")
2749         or $self->throw_exception("Can't open $filename ($!)");
2750       my @rows = <$file>;
2751       close($file);
2752       return join('', @rows);
2753   }
2754
2755   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2756     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2757   }
2758
2759   # sources needs to be a parser arg, but for simplicty allow at top level
2760   # coming in
2761   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2762       if exists $sqltargs->{sources};
2763
2764   my $tr = SQL::Translator->new(
2765     producer => "SQL::Translator::Producer::${type}",
2766     %$sqltargs,
2767     parser => 'SQL::Translator::Parser::DBIx::Class',
2768     data => $schema,
2769   );
2770
2771   my @ret;
2772   if (wantarray) {
2773     @ret = $tr->translate;
2774   }
2775   else {
2776     $ret[0] = $tr->translate;
2777   }
2778
2779   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2780     unless (@ret && defined $ret[0]);
2781
2782   return wantarray ? @ret : $ret[0];
2783 }
2784
2785 # FIXME deploy() currently does not accurately report sql errors
2786 # Will always return true while errors are warned
2787 sub deploy {
2788   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2789   my $deploy = sub {
2790     my $line = shift;
2791     return if(!$line);
2792     return if($line =~ /^--/);
2793     # next if($line =~ /^DROP/m);
2794     return if($line =~ /^BEGIN TRANSACTION/m);
2795     return if($line =~ /^COMMIT/m);
2796     return if $line =~ /^\s+$/; # skip whitespace only
2797     $self->_query_start($line);
2798     try {
2799       # do a dbh_do cycle here, as we need some error checking in
2800       # place (even though we will ignore errors)
2801       $self->dbh_do (sub { $_[1]->do($line) });
2802     } catch {
2803       carp qq{$_ (running "${line}")};
2804     };
2805     $self->_query_end($line);
2806   };
2807   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2808   if (@statements > 1) {
2809     foreach my $statement (@statements) {
2810       $deploy->( $statement );
2811     }
2812   }
2813   elsif (@statements == 1) {
2814     # split on single line comments and end of statements
2815     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2816       $deploy->( $line );
2817     }
2818   }
2819 }
2820
2821 =head2 datetime_parser
2822
2823 Returns the datetime parser class
2824
2825 =cut
2826
2827 sub datetime_parser {
2828   my $self = shift;
2829   return $self->{datetime_parser} ||= do {
2830     $self->build_datetime_parser(@_);
2831   };
2832 }
2833
2834 =head2 datetime_parser_type
2835
2836 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2837
2838 =head2 build_datetime_parser
2839
2840 See L</datetime_parser>
2841
2842 =cut
2843
2844 sub build_datetime_parser {
2845   my $self = shift;
2846   my $type = $self->datetime_parser_type(@_);
2847   return $type;
2848 }
2849
2850
2851 =head2 is_replicating
2852
2853 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2854 replicate from a master database.  Default is undef, which is the result
2855 returned by databases that don't support replication.
2856
2857 =cut
2858
2859 sub is_replicating {
2860     return;
2861
2862 }
2863
2864 =head2 lag_behind_master
2865
2866 Returns a number that represents a certain amount of lag behind a master db
2867 when a given storage is replicating.  The number is database dependent, but
2868 starts at zero and increases with the amount of lag. Default in undef
2869
2870 =cut
2871
2872 sub lag_behind_master {
2873     return;
2874 }
2875
2876 =head2 relname_to_table_alias
2877
2878 =over 4
2879
2880 =item Arguments: $relname, $join_count
2881
2882 =back
2883
2884 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2885 queries.
2886
2887 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2888 way these aliases are named.
2889
2890 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2891 otherwise C<"$relname">.
2892
2893 =cut
2894
2895 sub relname_to_table_alias {
2896   my ($self, $relname, $join_count) = @_;
2897
2898   my $alias = ($join_count && $join_count > 1 ?
2899     join('_', $relname, $join_count) : $relname);
2900
2901   return $alias;
2902 }
2903
2904 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2905 # version and it may be necessary to amend or override it for a specific storage
2906 # if such binds are necessary.
2907 sub _max_column_bytesize {
2908   my ($self, $source, $col) = @_;
2909
2910   my $inf = $source->column_info($col);
2911   return $inf->{_max_bytesize} ||= do {
2912
2913     my $max_size;
2914
2915     if (my $data_type = $inf->{data_type}) {
2916       $data_type = lc($data_type);
2917
2918       # String/sized-binary types
2919       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2920                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2921       ) {
2922         $max_size = $inf->{size};
2923       }
2924       # Other charset/unicode types, assume scale of 4
2925       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2926                               |univarchar
2927                               |nvarchar)\b/x
2928       ) {
2929         $max_size = $inf->{size} * 4 if $inf->{size};
2930       }
2931       # Blob types
2932       elsif ($self->_is_lob_type($data_type)) {
2933         # default to longreadlen
2934       }
2935       else {
2936         $max_size = 100;  # for all other (numeric?) datatypes
2937       }
2938     }
2939
2940     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2941   };
2942 }
2943
2944 # Determine if a data_type is some type of BLOB
2945 # FIXME: these regexes are expensive, result of these checks should be cached in
2946 # the column_info .
2947 sub _is_lob_type {
2948   my ($self, $data_type) = @_;
2949   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2950     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2951                                   |varchar|character\s*varying|nvarchar
2952                                   |national\s*character\s*varying))?\z/xi);
2953 }
2954
2955 sub _is_binary_lob_type {
2956   my ($self, $data_type) = @_;
2957   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2958     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2959 }
2960
2961 sub _is_text_lob_type {
2962   my ($self, $data_type) = @_;
2963   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2964     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2965                         |national\s*character\s*varying))\z/xi);
2966 }
2967
2968 1;
2969
2970 =head1 USAGE NOTES
2971
2972 =head2 DBIx::Class and AutoCommit
2973
2974 DBIx::Class can do some wonderful magic with handling exceptions,
2975 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2976 (the default) combined with C<txn_do> for transaction support.
2977
2978 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2979 in an assumed transaction between commits, and you're telling us you'd
2980 like to manage that manually.  A lot of the magic protections offered by
2981 this module will go away.  We can't protect you from exceptions due to database
2982 disconnects because we don't know anything about how to restart your
2983 transactions.  You're on your own for handling all sorts of exceptional
2984 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2985 be with raw DBI.
2986
2987
2988 =head1 AUTHORS
2989
2990 Matt S. Trout <mst@shadowcatsystems.co.uk>
2991
2992 Andy Grundman <andy@hybridized.org>
2993
2994 =head1 LICENSE
2995
2996 You may distribute this code under the same terms as Perl itself.
2997
2998 =cut