use more correct subsection links in POD
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Try::Tiny;
16 use overload ();
17 use namespace::clean;
18
19 # default cursor class, overridable in connect_info attributes
20 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
21
22 __PACKAGE__->mk_group_accessors('inherited' => qw/
23   sql_limit_dialect sql_quote_char sql_name_sep
24 /);
25
26 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
27
28 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
29 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
30
31 __PACKAGE__->sql_name_sep('.');
32
33 __PACKAGE__->mk_group_accessors('simple' => qw/
34   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
35   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
36 /);
37
38 # the values for these accessors are picked out (and deleted) from
39 # the attribute hashref passed to connect_info
40 my @storage_options = qw/
41   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
42   disable_sth_caching unsafe auto_savepoint
43 /;
44 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
45
46
47 # capability definitions, using a 2-tiered accessor system
48 # The rationale is:
49 #
50 # A driver/user may define _use_X, which blindly without any checks says:
51 # "(do not) use this capability", (use_dbms_capability is an "inherited"
52 # type accessor)
53 #
54 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
55 # accessor, which in turn calls _determine_supports_X, and stores the return
56 # in a special slot on the storage object, which is wiped every time a $dbh
57 # reconnection takes place (it is not guaranteed that upon reconnection we
58 # will get the same rdbms version). _determine_supports_X does not need to
59 # exist on a driver, as we ->can for it before calling.
60
61 my @capabilities = (qw/
62   insert_returning
63   insert_returning_bound
64   placeholders
65   typeless_placeholders
66   join_optimizer
67 /);
68 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
69 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
70
71 # on by default, not strictly a capability (pending rewrite)
72 __PACKAGE__->_use_join_optimizer (1);
73 sub _determine_supports_join_optimizer { 1 };
74
75 # Each of these methods need _determine_driver called before itself
76 # in order to function reliably. This is a purely DRY optimization
77 #
78 # get_(use)_dbms_capability need to be called on the correct Storage
79 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
80 # _determine_supports_X which obv. needs a correct driver as well
81 my @rdbms_specific_methods = qw/
82   deployment_statements
83   sqlt_type
84   sql_maker
85   build_datetime_parser
86   datetime_parser_type
87
88   txn_begin
89   insert
90   insert_bulk
91   update
92   delete
93   select
94   select_single
95   with_deferred_fk_checks
96
97   get_use_dbms_capability
98   get_dbms_capability
99
100   _server_info
101   _get_server_version
102 /;
103
104 for my $meth (@rdbms_specific_methods) {
105
106   my $orig = __PACKAGE__->can ($meth)
107     or die "$meth is not a ::Storage::DBI method!";
108
109   no strict qw/refs/;
110   no warnings qw/redefine/;
111   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
112     if (
113       # only fire when invoked on an instance, a valid class-based invocation
114       # would e.g. be setting a default for an inherited accessor
115       ref $_[0]
116         and
117       ! $_[0]->_driver_determined
118         and
119       ! $_[0]->{_in_determine_driver}
120     ) {
121       $_[0]->_determine_driver;
122
123       # This for some reason crashes and burns on perl 5.8.1
124       # IFF the method ends up throwing an exception
125       #goto $_[0]->can ($meth);
126
127       my $cref = $_[0]->can ($meth);
128       goto $cref;
129     }
130
131     goto $orig;
132   };
133 }
134
135 =head1 NAME
136
137 DBIx::Class::Storage::DBI - DBI storage handler
138
139 =head1 SYNOPSIS
140
141   my $schema = MySchema->connect('dbi:SQLite:my.db');
142
143   $schema->storage->debug(1);
144
145   my @stuff = $schema->storage->dbh_do(
146     sub {
147       my ($storage, $dbh, @args) = @_;
148       $dbh->do("DROP TABLE authors");
149     },
150     @column_list
151   );
152
153   $schema->resultset('Book')->search({
154      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
155   });
156
157 =head1 DESCRIPTION
158
159 This class represents the connection to an RDBMS via L<DBI>.  See
160 L<DBIx::Class::Storage> for general information.  This pod only
161 documents DBI-specific methods and behaviors.
162
163 =head1 METHODS
164
165 =cut
166
167 sub new {
168   my $new = shift->next::method(@_);
169
170   $new->_sql_maker_opts({});
171   $new->_dbh_details({});
172   $new->{_in_do_block} = 0;
173   $new->{_dbh_gen} = 0;
174
175   # read below to see what this does
176   $new->_arm_global_destructor;
177
178   $new;
179 }
180
181 # This is hack to work around perl shooting stuff in random
182 # order on exit(). If we do not walk the remaining storage
183 # objects in an END block, there is a *small but real* chance
184 # of a fork()ed child to kill the parent's shared DBI handle,
185 # *before perl reaches the DESTROY in this package*
186 # Yes, it is ugly and effective.
187 # Additionally this registry is used by the CLONE method to
188 # make sure no handles are shared between threads
189 {
190   my %seek_and_destroy;
191
192   sub _arm_global_destructor {
193     my $self = shift;
194     my $key = refaddr ($self);
195     $seek_and_destroy{$key} = $self;
196     weaken ($seek_and_destroy{$key});
197   }
198
199   END {
200     local $?; # just in case the DBI destructor changes it somehow
201
202     # destroy just the object if not native to this process/thread
203     $_->_verify_pid for (grep
204       { defined $_ }
205       values %seek_and_destroy
206     );
207   }
208
209   sub CLONE {
210     # As per DBI's recommendation, DBIC disconnects all handles as
211     # soon as possible (DBIC will reconnect only on demand from within
212     # the thread)
213     for (values %seek_and_destroy) {
214       next unless $_;
215       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
216       $_->_dbh(undef);
217
218       $_->transaction_depth(0);
219       $_->savepoints([]);
220     }
221   }
222 }
223
224 sub DESTROY {
225   my $self = shift;
226
227   # some databases spew warnings on implicit disconnect
228   local $SIG{__WARN__} = sub {};
229   $self->_dbh(undef);
230
231   # this op is necessary, since the very last perl runtime statement
232   # triggers a global destruction shootout, and the $SIG localization
233   # may very well be destroyed before perl actually gets to do the
234   # $dbh undef
235   1;
236 }
237
238 # handle pid changes correctly - do not destroy parent's connection
239 sub _verify_pid {
240   my $self = shift;
241
242   my $pid = $self->_conn_pid;
243   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
244     $dbh->{InactiveDestroy} = 1;
245     $self->{_dbh_gen}++;
246     $self->_dbh(undef);
247     $self->transaction_depth(0);
248     $self->savepoints([]);
249   }
250
251   return;
252 }
253
254 =head2 connect_info
255
256 This method is normally called by L<DBIx::Class::Schema/connection>, which
257 encapsulates its argument list in an arrayref before passing them here.
258
259 The argument list may contain:
260
261 =over
262
263 =item *
264
265 The same 4-element argument set one would normally pass to
266 L<DBI/connect>, optionally followed by
267 L<extra attributes|/DBIx::Class specific connection attributes>
268 recognized by DBIx::Class:
269
270   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
271
272 =item *
273
274 A single code reference which returns a connected
275 L<DBI database handle|DBI/connect> optionally followed by
276 L<extra attributes|/DBIx::Class specific connection attributes> recognized
277 by DBIx::Class:
278
279   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
280
281 =item *
282
283 A single hashref with all the attributes and the dsn/user/password
284 mixed together:
285
286   $connect_info_args = [{
287     dsn => $dsn,
288     user => $user,
289     password => $pass,
290     %dbi_attributes,
291     %extra_attributes,
292   }];
293
294   $connect_info_args = [{
295     dbh_maker => sub { DBI->connect (...) },
296     %dbi_attributes,
297     %extra_attributes,
298   }];
299
300 This is particularly useful for L<Catalyst> based applications, allowing the
301 following config (L<Config::General> style):
302
303   <Model::DB>
304     schema_class   App::DB
305     <connect_info>
306       dsn          dbi:mysql:database=test
307       user         testuser
308       password     TestPass
309       AutoCommit   1
310     </connect_info>
311   </Model::DB>
312
313 The C<dsn>/C<user>/C<password> combination can be substituted by the
314 C<dbh_maker> key whose value is a coderef that returns a connected
315 L<DBI database handle|DBI/connect>
316
317 =back
318
319 Please note that the L<DBI> docs recommend that you always explicitly
320 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
321 recommends that it be set to I<1>, and that you perform transactions
322 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
323 to I<1> if you do not do explicitly set it to zero.  This is the default
324 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
325
326 =head3 DBIx::Class specific connection attributes
327
328 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
329 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
330 the following connection options. These options can be mixed in with your other
331 L<DBI> connection attributes, or placed in a separate hashref
332 (C<\%extra_attributes>) as shown above.
333
334 Every time C<connect_info> is invoked, any previous settings for
335 these options will be cleared before setting the new ones, regardless of
336 whether any options are specified in the new C<connect_info>.
337
338
339 =over
340
341 =item on_connect_do
342
343 Specifies things to do immediately after connecting or re-connecting to
344 the database.  Its value may contain:
345
346 =over
347
348 =item a scalar
349
350 This contains one SQL statement to execute.
351
352 =item an array reference
353
354 This contains SQL statements to execute in order.  Each element contains
355 a string or a code reference that returns a string.
356
357 =item a code reference
358
359 This contains some code to execute.  Unlike code references within an
360 array reference, its return value is ignored.
361
362 =back
363
364 =item on_disconnect_do
365
366 Takes arguments in the same form as L</on_connect_do> and executes them
367 immediately before disconnecting from the database.
368
369 Note, this only runs if you explicitly call L</disconnect> on the
370 storage object.
371
372 =item on_connect_call
373
374 A more generalized form of L</on_connect_do> that calls the specified
375 C<connect_call_METHOD> methods in your storage driver.
376
377   on_connect_do => 'select 1'
378
379 is equivalent to:
380
381   on_connect_call => [ [ do_sql => 'select 1' ] ]
382
383 Its values may contain:
384
385 =over
386
387 =item a scalar
388
389 Will call the C<connect_call_METHOD> method.
390
391 =item a code reference
392
393 Will execute C<< $code->($storage) >>
394
395 =item an array reference
396
397 Each value can be a method name or code reference.
398
399 =item an array of arrays
400
401 For each array, the first item is taken to be the C<connect_call_> method name
402 or code reference, and the rest are parameters to it.
403
404 =back
405
406 Some predefined storage methods you may use:
407
408 =over
409
410 =item do_sql
411
412 Executes a SQL string or a code reference that returns a SQL string. This is
413 what L</on_connect_do> and L</on_disconnect_do> use.
414
415 It can take:
416
417 =over
418
419 =item a scalar
420
421 Will execute the scalar as SQL.
422
423 =item an arrayref
424
425 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
426 attributes hashref and bind values.
427
428 =item a code reference
429
430 Will execute C<< $code->($storage) >> and execute the return array refs as
431 above.
432
433 =back
434
435 =item datetime_setup
436
437 Execute any statements necessary to initialize the database session to return
438 and accept datetime/timestamp values used with
439 L<DBIx::Class::InflateColumn::DateTime>.
440
441 Only necessary for some databases, see your specific storage driver for
442 implementation details.
443
444 =back
445
446 =item on_disconnect_call
447
448 Takes arguments in the same form as L</on_connect_call> and executes them
449 immediately before disconnecting from the database.
450
451 Calls the C<disconnect_call_METHOD> methods as opposed to the
452 C<connect_call_METHOD> methods called by L</on_connect_call>.
453
454 Note, this only runs if you explicitly call L</disconnect> on the
455 storage object.
456
457 =item disable_sth_caching
458
459 If set to a true value, this option will disable the caching of
460 statement handles via L<DBI/prepare_cached>.
461
462 =item limit_dialect
463
464 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
465 default L</sql_limit_dialect> setting of the storage (if any). For a list
466 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
467
468 =item quote_names
469
470 When true automatically sets L</quote_char> and L</name_sep> to the characters
471 appropriate for your particular RDBMS. This option is preferred over specifying
472 L</quote_char> directly.
473
474 =item quote_char
475
476 Specifies what characters to use to quote table and column names.
477
478 C<quote_char> expects either a single character, in which case is it
479 is placed on either side of the table/column name, or an arrayref of length
480 2 in which case the table/column name is placed between the elements.
481
482 For example under MySQL you should use C<< quote_char => '`' >>, and for
483 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
484
485 =item name_sep
486
487 This parameter is only useful in conjunction with C<quote_char>, and is used to
488 specify the character that separates elements (schemas, tables, columns) from
489 each other. If unspecified it defaults to the most commonly used C<.>.
490
491 =item unsafe
492
493 This Storage driver normally installs its own C<HandleError>, sets
494 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
495 all database handles, including those supplied by a coderef.  It does this
496 so that it can have consistent and useful error behavior.
497
498 If you set this option to a true value, Storage will not do its usual
499 modifications to the database handle's attributes, and instead relies on
500 the settings in your connect_info DBI options (or the values you set in
501 your connection coderef, in the case that you are connecting via coderef).
502
503 Note that your custom settings can cause Storage to malfunction,
504 especially if you set a C<HandleError> handler that suppresses exceptions
505 and/or disable C<RaiseError>.
506
507 =item auto_savepoint
508
509 If this option is true, L<DBIx::Class> will use savepoints when nesting
510 transactions, making it possible to recover from failure in the inner
511 transaction without having to abort all outer transactions.
512
513 =item cursor_class
514
515 Use this argument to supply a cursor class other than the default
516 L<DBIx::Class::Storage::DBI::Cursor>.
517
518 =back
519
520 Some real-life examples of arguments to L</connect_info> and
521 L<DBIx::Class::Schema/connect>
522
523   # Simple SQLite connection
524   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
525
526   # Connect via subref
527   ->connect_info([ sub { DBI->connect(...) } ]);
528
529   # Connect via subref in hashref
530   ->connect_info([{
531     dbh_maker => sub { DBI->connect(...) },
532     on_connect_do => 'alter session ...',
533   }]);
534
535   # A bit more complicated
536   ->connect_info(
537     [
538       'dbi:Pg:dbname=foo',
539       'postgres',
540       'my_pg_password',
541       { AutoCommit => 1 },
542       { quote_char => q{"} },
543     ]
544   );
545
546   # Equivalent to the previous example
547   ->connect_info(
548     [
549       'dbi:Pg:dbname=foo',
550       'postgres',
551       'my_pg_password',
552       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
553     ]
554   );
555
556   # Same, but with hashref as argument
557   # See parse_connect_info for explanation
558   ->connect_info(
559     [{
560       dsn         => 'dbi:Pg:dbname=foo',
561       user        => 'postgres',
562       password    => 'my_pg_password',
563       AutoCommit  => 1,
564       quote_char  => q{"},
565       name_sep    => q{.},
566     }]
567   );
568
569   # Subref + DBIx::Class-specific connection options
570   ->connect_info(
571     [
572       sub { DBI->connect(...) },
573       {
574           quote_char => q{`},
575           name_sep => q{@},
576           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
577           disable_sth_caching => 1,
578       },
579     ]
580   );
581
582
583
584 =cut
585
586 sub connect_info {
587   my ($self, $info) = @_;
588
589   return $self->_connect_info if !$info;
590
591   $self->_connect_info($info); # copy for _connect_info
592
593   $info = $self->_normalize_connect_info($info)
594     if ref $info eq 'ARRAY';
595
596   for my $storage_opt (keys %{ $info->{storage_options} }) {
597     my $value = $info->{storage_options}{$storage_opt};
598
599     $self->$storage_opt($value);
600   }
601
602   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
603   #  the new set of options
604   $self->_sql_maker(undef);
605   $self->_sql_maker_opts({});
606
607   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
608     my $value = $info->{sql_maker_options}{$sql_maker_opt};
609
610     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
611   }
612
613   my %attrs = (
614     %{ $self->_default_dbi_connect_attributes || {} },
615     %{ $info->{attributes} || {} },
616   );
617
618   my @args = @{ $info->{arguments} };
619
620   if (keys %attrs and ref $args[0] ne 'CODE') {
621     carp
622         'You provided explicit AutoCommit => 0 in your connection_info. '
623       . 'This is almost universally a bad idea (see the footnotes of '
624       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
625       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
626       . 'this warning.'
627       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
628
629     push @args, \%attrs if keys %attrs;
630   }
631   $self->_dbi_connect_info(\@args);
632
633   # FIXME - dirty:
634   # save attributes them in a separate accessor so they are always
635   # introspectable, even in case of a CODE $dbhmaker
636   $self->_dbic_connect_attributes (\%attrs);
637
638   return $self->_connect_info;
639 }
640
641 sub _normalize_connect_info {
642   my ($self, $info_arg) = @_;
643   my %info;
644
645   my @args = @$info_arg;  # take a shallow copy for further mutilation
646
647   # combine/pre-parse arguments depending on invocation style
648
649   my %attrs;
650   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
651     %attrs = %{ $args[1] || {} };
652     @args = $args[0];
653   }
654   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
655     %attrs = %{$args[0]};
656     @args = ();
657     if (my $code = delete $attrs{dbh_maker}) {
658       @args = $code;
659
660       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
661       if (@ignored) {
662         carp sprintf (
663             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
664           . "to the result of 'dbh_maker'",
665
666           join (', ', map { "'$_'" } (@ignored) ),
667         );
668       }
669     }
670     else {
671       @args = delete @attrs{qw/dsn user password/};
672     }
673   }
674   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
675     %attrs = (
676       % { $args[3] || {} },
677       % { $args[4] || {} },
678     );
679     @args = @args[0,1,2];
680   }
681
682   $info{arguments} = \@args;
683
684   my @storage_opts = grep exists $attrs{$_},
685     @storage_options, 'cursor_class';
686
687   @{ $info{storage_options} }{@storage_opts} =
688     delete @attrs{@storage_opts} if @storage_opts;
689
690   my @sql_maker_opts = grep exists $attrs{$_},
691     qw/limit_dialect quote_char name_sep quote_names/;
692
693   @{ $info{sql_maker_options} }{@sql_maker_opts} =
694     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
695
696   $info{attributes} = \%attrs if %attrs;
697
698   return \%info;
699 }
700
701 sub _default_dbi_connect_attributes () {
702   +{
703     AutoCommit => 1,
704     PrintError => 0,
705     RaiseError => 1,
706     ShowErrorStatement => 1,
707   };
708 }
709
710 =head2 on_connect_do
711
712 This method is deprecated in favour of setting via L</connect_info>.
713
714 =cut
715
716 =head2 on_disconnect_do
717
718 This method is deprecated in favour of setting via L</connect_info>.
719
720 =cut
721
722 sub _parse_connect_do {
723   my ($self, $type) = @_;
724
725   my $val = $self->$type;
726   return () if not defined $val;
727
728   my @res;
729
730   if (not ref($val)) {
731     push @res, [ 'do_sql', $val ];
732   } elsif (ref($val) eq 'CODE') {
733     push @res, $val;
734   } elsif (ref($val) eq 'ARRAY') {
735     push @res, map { [ 'do_sql', $_ ] } @$val;
736   } else {
737     $self->throw_exception("Invalid type for $type: ".ref($val));
738   }
739
740   return \@res;
741 }
742
743 =head2 dbh_do
744
745 Arguments: ($subref | $method_name), @extra_coderef_args?
746
747 Execute the given $subref or $method_name using the new exception-based
748 connection management.
749
750 The first two arguments will be the storage object that C<dbh_do> was called
751 on and a database handle to use.  Any additional arguments will be passed
752 verbatim to the called subref as arguments 2 and onwards.
753
754 Using this (instead of $self->_dbh or $self->dbh) ensures correct
755 exception handling and reconnection (or failover in future subclasses).
756
757 Your subref should have no side-effects outside of the database, as
758 there is the potential for your subref to be partially double-executed
759 if the database connection was stale/dysfunctional.
760
761 Example:
762
763   my @stuff = $schema->storage->dbh_do(
764     sub {
765       my ($storage, $dbh, @cols) = @_;
766       my $cols = join(q{, }, @cols);
767       $dbh->selectrow_array("SELECT $cols FROM foo");
768     },
769     @column_list
770   );
771
772 =cut
773
774 sub dbh_do {
775   my $self = shift;
776   my $code = shift;
777
778   my $dbh = $self->_get_dbh;
779
780   return $self->$code($dbh, @_)
781     if ( $self->{_in_do_block} || $self->{transaction_depth} );
782
783   local $self->{_in_do_block} = 1;
784
785   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
786   my $args = \@_;
787
788   try {
789     $self->$code ($dbh, @$args);
790   } catch {
791     $self->throw_exception($_) if $self->connected;
792
793     # We were not connected - reconnect and retry, but let any
794     #  exception fall right through this time
795     carp "Retrying dbh_do($code) after catching disconnected exception: $_"
796       if $ENV{DBIC_STORAGE_RETRY_DEBUG};
797
798     $self->_populate_dbh;
799     $self->$code($self->_dbh, @$args);
800   };
801 }
802
803 sub txn_do {
804   # connects or reconnects on pid change, necessary to grab correct txn_depth
805   $_[0]->_get_dbh;
806   local $_[0]->{_in_do_block} = 1;
807   shift->next::method(@_);
808 }
809
810 =head2 disconnect
811
812 Our C<disconnect> method also performs a rollback first if the
813 database is not in C<AutoCommit> mode.
814
815 =cut
816
817 sub disconnect {
818   my ($self) = @_;
819
820   if( $self->_dbh ) {
821     my @actions;
822
823     push @actions, ( $self->on_disconnect_call || () );
824     push @actions, $self->_parse_connect_do ('on_disconnect_do');
825
826     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
827
828     # stops the "implicit rollback on disconnect" warning
829     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
830
831     %{ $self->_dbh->{CachedKids} } = ();
832     $self->_dbh->disconnect;
833     $self->_dbh(undef);
834     $self->{_dbh_gen}++;
835   }
836 }
837
838 =head2 with_deferred_fk_checks
839
840 =over 4
841
842 =item Arguments: C<$coderef>
843
844 =item Return Value: The return value of $coderef
845
846 =back
847
848 Storage specific method to run the code ref with FK checks deferred or
849 in MySQL's case disabled entirely.
850
851 =cut
852
853 # Storage subclasses should override this
854 sub with_deferred_fk_checks {
855   my ($self, $sub) = @_;
856   $sub->();
857 }
858
859 =head2 connected
860
861 =over
862
863 =item Arguments: none
864
865 =item Return Value: 1|0
866
867 =back
868
869 Verifies that the current database handle is active and ready to execute
870 an SQL statement (e.g. the connection did not get stale, server is still
871 answering, etc.) This method is used internally by L</dbh>.
872
873 =cut
874
875 sub connected {
876   my $self = shift;
877   return 0 unless $self->_seems_connected;
878
879   #be on the safe side
880   local $self->_dbh->{RaiseError} = 1;
881
882   return $self->_ping;
883 }
884
885 sub _seems_connected {
886   my $self = shift;
887
888   $self->_verify_pid;
889
890   my $dbh = $self->_dbh
891     or return 0;
892
893   return $dbh->FETCH('Active');
894 }
895
896 sub _ping {
897   my $self = shift;
898
899   my $dbh = $self->_dbh or return 0;
900
901   return $dbh->ping;
902 }
903
904 sub ensure_connected {
905   my ($self) = @_;
906
907   unless ($self->connected) {
908     $self->_populate_dbh;
909   }
910 }
911
912 =head2 dbh
913
914 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
915 is guaranteed to be healthy by implicitly calling L</connected>, and if
916 necessary performing a reconnection before returning. Keep in mind that this
917 is very B<expensive> on some database engines. Consider using L</dbh_do>
918 instead.
919
920 =cut
921
922 sub dbh {
923   my ($self) = @_;
924
925   if (not $self->_dbh) {
926     $self->_populate_dbh;
927   } else {
928     $self->ensure_connected;
929   }
930   return $self->_dbh;
931 }
932
933 # this is the internal "get dbh or connect (don't check)" method
934 sub _get_dbh {
935   my $self = shift;
936   $self->_verify_pid;
937   $self->_populate_dbh unless $self->_dbh;
938   return $self->_dbh;
939 }
940
941 sub sql_maker {
942   my ($self) = @_;
943   unless ($self->_sql_maker) {
944     my $sql_maker_class = $self->sql_maker_class;
945
946     my %opts = %{$self->_sql_maker_opts||{}};
947     my $dialect =
948       $opts{limit_dialect}
949         ||
950       $self->sql_limit_dialect
951         ||
952       do {
953         my $s_class = (ref $self) || $self;
954         carp (
955           "Your storage class ($s_class) does not set sql_limit_dialect and you "
956         . 'have not supplied an explicit limit_dialect in your connection_info. '
957         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
958         . 'databases but can be (and often is) painfully slow. '
959         . "Please file an RT ticket against '$s_class' ."
960         );
961
962         'GenericSubQ';
963       }
964     ;
965
966     my ($quote_char, $name_sep);
967
968     if ($opts{quote_names}) {
969       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
970         my $s_class = (ref $self) || $self;
971         carp (
972           "You requested 'quote_names' but your storage class ($s_class) does "
973         . 'not explicitly define a default sql_quote_char and you have not '
974         . 'supplied a quote_char as part of your connection_info. DBIC will '
975         .q{default to the ANSI SQL standard quote '"', which works most of }
976         . "the time. Please file an RT ticket against '$s_class'."
977         );
978
979         '"'; # RV
980       };
981
982       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
983     }
984
985     $self->_sql_maker($sql_maker_class->new(
986       bindtype=>'columns',
987       array_datatypes => 1,
988       limit_dialect => $dialect,
989       ($quote_char ? (quote_char => $quote_char) : ()),
990       name_sep => ($name_sep || '.'),
991       %opts,
992     ));
993   }
994   return $self->_sql_maker;
995 }
996
997 # nothing to do by default
998 sub _rebless {}
999 sub _init {}
1000
1001 sub _populate_dbh {
1002   my ($self) = @_;
1003
1004   my @info = @{$self->_dbi_connect_info || []};
1005   $self->_dbh(undef); # in case ->connected failed we might get sent here
1006   $self->_dbh_details({}); # reset everything we know
1007
1008   $self->_dbh($self->_connect(@info));
1009
1010   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1011
1012   $self->_determine_driver;
1013
1014   # Always set the transaction depth on connect, since
1015   #  there is no transaction in progress by definition
1016   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1017
1018   $self->_run_connection_actions unless $self->{_in_determine_driver};
1019 }
1020
1021 sub _run_connection_actions {
1022   my $self = shift;
1023   my @actions;
1024
1025   push @actions, ( $self->on_connect_call || () );
1026   push @actions, $self->_parse_connect_do ('on_connect_do');
1027
1028   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1029 }
1030
1031
1032
1033 sub set_use_dbms_capability {
1034   $_[0]->set_inherited ($_[1], $_[2]);
1035 }
1036
1037 sub get_use_dbms_capability {
1038   my ($self, $capname) = @_;
1039
1040   my $use = $self->get_inherited ($capname);
1041   return defined $use
1042     ? $use
1043     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1044   ;
1045 }
1046
1047 sub set_dbms_capability {
1048   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1049 }
1050
1051 sub get_dbms_capability {
1052   my ($self, $capname) = @_;
1053
1054   my $cap = $self->_dbh_details->{capability}{$capname};
1055
1056   unless (defined $cap) {
1057     if (my $meth = $self->can ("_determine$capname")) {
1058       $cap = $self->$meth ? 1 : 0;
1059     }
1060     else {
1061       $cap = 0;
1062     }
1063
1064     $self->set_dbms_capability ($capname, $cap);
1065   }
1066
1067   return $cap;
1068 }
1069
1070 sub _server_info {
1071   my $self = shift;
1072
1073   my $info;
1074   unless ($info = $self->_dbh_details->{info}) {
1075
1076     $info = {};
1077
1078     my $server_version = try { $self->_get_server_version };
1079
1080     if (defined $server_version) {
1081       $info->{dbms_version} = $server_version;
1082
1083       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1084       my @verparts = split (/\./, $numeric_version);
1085       if (
1086         @verparts
1087           &&
1088         $verparts[0] <= 999
1089       ) {
1090         # consider only up to 3 version parts, iff not more than 3 digits
1091         my @use_parts;
1092         while (@verparts && @use_parts < 3) {
1093           my $p = shift @verparts;
1094           last if $p > 999;
1095           push @use_parts, $p;
1096         }
1097         push @use_parts, 0 while @use_parts < 3;
1098
1099         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1100       }
1101     }
1102
1103     $self->_dbh_details->{info} = $info;
1104   }
1105
1106   return $info;
1107 }
1108
1109 sub _get_server_version {
1110   shift->_dbh_get_info(18);
1111 }
1112
1113 sub _dbh_get_info {
1114   my ($self, $info) = @_;
1115
1116   return try { $self->_get_dbh->get_info($info) } || undef;
1117 }
1118
1119 sub _determine_driver {
1120   my ($self) = @_;
1121
1122   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1123     my $started_connected = 0;
1124     local $self->{_in_determine_driver} = 1;
1125
1126     if (ref($self) eq __PACKAGE__) {
1127       my $driver;
1128       if ($self->_dbh) { # we are connected
1129         $driver = $self->_dbh->{Driver}{Name};
1130         $started_connected = 1;
1131       } else {
1132         # if connect_info is a CODEREF, we have no choice but to connect
1133         if (ref $self->_dbi_connect_info->[0] &&
1134             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1135           $self->_populate_dbh;
1136           $driver = $self->_dbh->{Driver}{Name};
1137         }
1138         else {
1139           # try to use dsn to not require being connected, the driver may still
1140           # force a connection in _rebless to determine version
1141           # (dsn may not be supplied at all if all we do is make a mock-schema)
1142           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1143           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1144           $driver ||= $ENV{DBI_DRIVER};
1145         }
1146       }
1147
1148       if ($driver) {
1149         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1150         if ($self->load_optional_class($storage_class)) {
1151           mro::set_mro($storage_class, 'c3');
1152           bless $self, $storage_class;
1153           $self->_rebless();
1154         }
1155       }
1156     }
1157
1158     $self->_driver_determined(1);
1159
1160     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1161
1162     $self->_init; # run driver-specific initializations
1163
1164     $self->_run_connection_actions
1165         if !$started_connected && defined $self->_dbh;
1166   }
1167 }
1168
1169 sub _do_connection_actions {
1170   my $self          = shift;
1171   my $method_prefix = shift;
1172   my $call          = shift;
1173
1174   if (not ref($call)) {
1175     my $method = $method_prefix . $call;
1176     $self->$method(@_);
1177   } elsif (ref($call) eq 'CODE') {
1178     $self->$call(@_);
1179   } elsif (ref($call) eq 'ARRAY') {
1180     if (ref($call->[0]) ne 'ARRAY') {
1181       $self->_do_connection_actions($method_prefix, $_) for @$call;
1182     } else {
1183       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1184     }
1185   } else {
1186     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1187   }
1188
1189   return $self;
1190 }
1191
1192 sub connect_call_do_sql {
1193   my $self = shift;
1194   $self->_do_query(@_);
1195 }
1196
1197 sub disconnect_call_do_sql {
1198   my $self = shift;
1199   $self->_do_query(@_);
1200 }
1201
1202 # override in db-specific backend when necessary
1203 sub connect_call_datetime_setup { 1 }
1204
1205 sub _do_query {
1206   my ($self, $action) = @_;
1207
1208   if (ref $action eq 'CODE') {
1209     $action = $action->($self);
1210     $self->_do_query($_) foreach @$action;
1211   }
1212   else {
1213     # Most debuggers expect ($sql, @bind), so we need to exclude
1214     # the attribute hash which is the second argument to $dbh->do
1215     # furthermore the bind values are usually to be presented
1216     # as named arrayref pairs, so wrap those here too
1217     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1218     my $sql = shift @do_args;
1219     my $attrs = shift @do_args;
1220     my @bind = map { [ undef, $_ ] } @do_args;
1221
1222     $self->_query_start($sql, \@bind);
1223     $self->_get_dbh->do($sql, $attrs, @do_args);
1224     $self->_query_end($sql, \@bind);
1225   }
1226
1227   return $self;
1228 }
1229
1230 sub _connect {
1231   my ($self, @info) = @_;
1232
1233   $self->throw_exception("You failed to provide any connection info")
1234     if !@info;
1235
1236   my ($old_connect_via, $dbh);
1237
1238   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1239     $old_connect_via = $DBI::connect_via;
1240     $DBI::connect_via = 'connect';
1241   }
1242
1243   try {
1244     if(ref $info[0] eq 'CODE') {
1245       $dbh = $info[0]->();
1246     }
1247     else {
1248       require DBI;
1249       $dbh = DBI->connect(@info);
1250     }
1251
1252     if (!$dbh) {
1253       die $DBI::errstr;
1254     }
1255
1256     unless ($self->unsafe) {
1257
1258       $self->throw_exception(
1259         'Refusing clobbering of {HandleError} installed on externally supplied '
1260        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1261       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1262
1263       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1264       # request, or an external handle. Complain and set anyway
1265       unless ($dbh->{RaiseError}) {
1266         carp( ref $info[0] eq 'CODE'
1267
1268           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1269            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1270            .'attribute has been supplied'
1271
1272           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1273            .'unsafe => 1. Toggling RaiseError back to true'
1274         );
1275
1276         $dbh->{RaiseError} = 1;
1277       }
1278
1279       # this odd anonymous coderef dereference is in fact really
1280       # necessary to avoid the unwanted effect described in perl5
1281       # RT#75792
1282       sub {
1283         my $weak_self = $_[0];
1284         weaken $weak_self;
1285
1286         # the coderef is blessed so we can distinguish it from externally
1287         # supplied handles (which must be preserved)
1288         $_[1]->{HandleError} = bless sub {
1289           if ($weak_self) {
1290             $weak_self->throw_exception("DBI Exception: $_[0]");
1291           }
1292           else {
1293             # the handler may be invoked by something totally out of
1294             # the scope of DBIC
1295             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1296           }
1297         }, '__DBIC__DBH__ERROR__HANDLER__';
1298       }->($self, $dbh);
1299     }
1300   }
1301   catch {
1302     $self->throw_exception("DBI Connection failed: $_")
1303   }
1304   finally {
1305     $DBI::connect_via = $old_connect_via if $old_connect_via;
1306   };
1307
1308   $self->_dbh_autocommit($dbh->{AutoCommit});
1309   $dbh;
1310 }
1311
1312 sub txn_begin {
1313   my $self = shift;
1314
1315   # this means we have not yet connected and do not know the AC status
1316   # (e.g. coderef $dbh), need a full-fledged connection check
1317   if (! defined $self->_dbh_autocommit) {
1318     $self->ensure_connected;
1319   }
1320   # Otherwise simply connect or re-connect on pid changes
1321   else {
1322     $self->_get_dbh;
1323   }
1324
1325   $self->next::method(@_);
1326 }
1327
1328 sub _exec_txn_begin {
1329   my $self = shift;
1330
1331   # if the user is utilizing txn_do - good for him, otherwise we need to
1332   # ensure that the $dbh is healthy on BEGIN.
1333   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1334   # will be replaced by a failure of begin_work itself (which will be
1335   # then retried on reconnect)
1336   if ($self->{_in_do_block}) {
1337     $self->_dbh->begin_work;
1338   } else {
1339     $self->dbh_do(sub { $_[1]->begin_work });
1340   }
1341 }
1342
1343 sub txn_commit {
1344   my $self = shift;
1345
1346   $self->_verify_pid if $self->_dbh;
1347   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1348     unless $self->_dbh;
1349
1350   # esoteric case for folks using external $dbh handles
1351   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1352     carp "Storage transaction_depth 0 does not match "
1353         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1354     $self->transaction_depth(1);
1355   }
1356
1357   $self->next::method(@_);
1358
1359   # if AutoCommit is disabled txn_depth never goes to 0
1360   # as a new txn is started immediately on commit
1361   $self->transaction_depth(1) if (
1362     !$self->transaction_depth
1363       and
1364     defined $self->_dbh_autocommit
1365       and
1366     ! $self->_dbh_autocommit
1367   );
1368 }
1369
1370 sub _exec_txn_commit {
1371   shift->_dbh->commit;
1372 }
1373
1374 sub txn_rollback {
1375   my $self = shift;
1376
1377   $self->_verify_pid if $self->_dbh;
1378   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1379     unless $self->_dbh;
1380
1381   # esoteric case for folks using external $dbh handles
1382   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1383     carp "Storage transaction_depth 0 does not match "
1384         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1385     $self->transaction_depth(1);
1386   }
1387
1388   $self->next::method(@_);
1389
1390   # if AutoCommit is disabled txn_depth never goes to 0
1391   # as a new txn is started immediately on commit
1392   $self->transaction_depth(1) if (
1393     !$self->transaction_depth
1394       and
1395     defined $self->_dbh_autocommit
1396       and
1397     ! $self->_dbh_autocommit
1398   );
1399 }
1400
1401 sub _exec_txn_rollback {
1402   shift->_dbh->rollback;
1403 }
1404
1405 # generate some identical methods
1406 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1407   no strict qw/refs/;
1408   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1409     my $self = shift;
1410     $self->_verify_pid if $self->_dbh;
1411     $self->throw_exception("Unable to $meth() on a disconnected storage")
1412       unless $self->_dbh;
1413     $self->next::method(@_);
1414   };
1415 }
1416
1417 # This used to be the top-half of _execute.  It was split out to make it
1418 #  easier to override in NoBindVars without duping the rest.  It takes up
1419 #  all of _execute's args, and emits $sql, @bind.
1420 sub _prep_for_execute {
1421   #my ($self, $op, $ident, $args) = @_;
1422   return shift->_gen_sql_bind(@_)
1423 }
1424
1425 sub _gen_sql_bind {
1426   my ($self, $op, $ident, $args) = @_;
1427
1428   my ($sql, @bind) = $self->sql_maker->$op(
1429     blessed($ident) ? $ident->from : $ident,
1430     @$args,
1431   );
1432
1433   my (@final_bind, $colinfos);
1434   my $resolve_bindinfo = sub {
1435     $colinfos ||= $self->_resolve_column_info($ident);
1436     if (my $col = $_[1]->{dbic_colname}) {
1437       $_[1]->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1438         if $colinfos->{$col}{data_type};
1439       $_[1]->{sqlt_size} ||= $colinfos->{$col}{size}
1440         if $colinfos->{$col}{size};
1441     }
1442     $_[1];
1443   };
1444
1445   for my $e (@{$args->[2]{bind}||[]}, @bind) {
1446     push @final_bind, [ do {
1447       if (ref $e ne 'ARRAY') {
1448         ({}, $e)
1449       }
1450       elsif (! defined $e->[0]) {
1451         ({}, $e->[1])
1452       }
1453       elsif (ref $e->[0] eq 'HASH') {
1454         (
1455           (first { $e->[0]{$_} } qw/dbd_attrs sqlt_datatype/) ? $e->[0] : $self->$resolve_bindinfo($e->[0]),
1456           $e->[1]
1457         )
1458       }
1459       elsif (ref $e->[0] eq 'SCALAR') {
1460         ( { sqlt_datatype => ${$e->[0]} }, $e->[1] )
1461       }
1462       else {
1463         ( $self->$resolve_bindinfo({ dbic_colname => $e->[0] }), $e->[1] )
1464       }
1465     }];
1466   }
1467
1468   if ($op eq 'select'
1469      && first { blessed($_->[1]) && $_->[1]->isa('DateTime') } @final_bind) {
1470
1471     carp_unique 'DateTime objects passed to search() are not supported '
1472       . 'properly (InflateColumn::DateTime formats and settings are not '
1473       . 'respected.) See "Formatting DateTime objects in queries" in '
1474       . 'DBIx::Class::Manual::Cookbook';
1475   }
1476
1477   ($sql, \@final_bind);
1478 }
1479
1480 sub _format_for_trace {
1481   #my ($self, $bind) = @_;
1482
1483   ### Turn @bind from something like this:
1484   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1485   ### to this:
1486   ###   ( "'1'", "'3'" )
1487
1488   map {
1489     defined( $_ && $_->[1] )
1490       ? qq{'$_->[1]'}
1491       : q{NULL}
1492   } @{$_[1] || []};
1493 }
1494
1495 sub _query_start {
1496   my ( $self, $sql, $bind ) = @_;
1497
1498   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1499     if $self->debug;
1500 }
1501
1502 sub _query_end {
1503   my ( $self, $sql, $bind ) = @_;
1504
1505   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1506     if $self->debug;
1507 }
1508
1509 my $sba_compat;
1510 sub _dbi_attrs_for_bind {
1511   my ($self, $ident, $bind) = @_;
1512
1513   if (! defined $sba_compat) {
1514     $self->_determine_driver;
1515     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1516       ? 0
1517       : 1
1518     ;
1519   }
1520
1521   my $sba_attrs;
1522   if ($sba_compat) {
1523     my $class = ref $self;
1524     carp_unique (
1525       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1526      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1527      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1528     );
1529
1530     my $sba_attrs = $self->source_bind_attributes
1531   }
1532
1533   my @attrs;
1534
1535   for (map { $_->[0] } @$bind) {
1536     push @attrs, do {
1537       if (exists $_->{dbd_attrs}) {
1538         $_->{dbd_attrs}
1539       }
1540       elsif($_->{sqlt_datatype}) {
1541         # cache the result in the dbh_details hash, as it can not change unless
1542         # we connect to something else
1543         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1544         if (not exists $cache->{$_->{sqlt_datatype}}) {
1545           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1546         }
1547         $cache->{$_->{sqlt_datatype}};
1548       }
1549       elsif ($sba_attrs and $_->{dbic_colname}) {
1550         $sba_attrs->{$_->{dbic_colname}} || undef;
1551       }
1552       else {
1553         undef;  # always push something at this position
1554       }
1555     }
1556   }
1557
1558   return \@attrs;
1559 }
1560
1561 sub _execute {
1562   my ($self, $op, $ident, @args) = @_;
1563
1564   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1565
1566   shift->dbh_do(    # retry over disconnects
1567     '_dbh_execute',
1568     $sql,
1569     $bind,
1570     $self->_dbi_attrs_for_bind($ident, $bind)
1571   );
1572 }
1573
1574 sub _dbh_execute {
1575   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1576
1577   $self->_query_start( $sql, $bind );
1578   my $sth = $self->_sth($sql);
1579
1580   for my $i (0 .. $#$bind) {
1581     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1582       $sth->bind_param_inout(
1583         $i + 1, # bind params counts are 1-based
1584         $bind->[$i][1],
1585         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1586         $bind_attrs->[$i],
1587       );
1588     }
1589     else {
1590       $sth->bind_param(
1591         $i + 1,
1592         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1593           ? "$bind->[$i][1]"
1594           : $bind->[$i][1]
1595         ,
1596         $bind_attrs->[$i],
1597       );
1598     }
1599   }
1600
1601   # Can this fail without throwing an exception anyways???
1602   my $rv = $sth->execute();
1603   $self->throw_exception(
1604     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1605   ) if !$rv;
1606
1607   $self->_query_end( $sql, $bind );
1608
1609   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1610 }
1611
1612 sub _prefetch_autovalues {
1613   my ($self, $source, $to_insert) = @_;
1614
1615   my $colinfo = $source->columns_info;
1616
1617   my %values;
1618   for my $col (keys %$colinfo) {
1619     if (
1620       $colinfo->{$col}{auto_nextval}
1621         and
1622       (
1623         ! exists $to_insert->{$col}
1624           or
1625         ref $to_insert->{$col} eq 'SCALAR'
1626           or
1627         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1628       )
1629     ) {
1630       $values{$col} = $self->_sequence_fetch(
1631         'NEXTVAL',
1632         ( $colinfo->{$col}{sequence} ||=
1633             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1634         ),
1635       );
1636     }
1637   }
1638
1639   \%values;
1640 }
1641
1642 sub insert {
1643   my ($self, $source, $to_insert) = @_;
1644
1645   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1646
1647   # fuse the values, but keep a separate list of prefetched_values so that
1648   # they can be fused once again with the final return
1649   $to_insert = { %$to_insert, %$prefetched_values };
1650
1651   my $col_infos = $source->columns_info;
1652   my %pcols = map { $_ => 1 } $source->primary_columns;
1653   my %retrieve_cols;
1654   for my $col ($source->columns) {
1655     # nothing to retrieve when explicit values are supplied
1656     next if (defined $to_insert->{$col} and ! (
1657       ref $to_insert->{$col} eq 'SCALAR'
1658         or
1659       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1660     ));
1661
1662     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1663     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1664       $pcols{$col}
1665         or
1666       $col_infos->{$col}{retrieve_on_insert}
1667     );
1668   };
1669
1670   my ($sqla_opts, @ir_container);
1671   if (%retrieve_cols and $self->_use_insert_returning) {
1672     $sqla_opts->{returning_container} = \@ir_container
1673       if $self->_use_insert_returning_bound;
1674
1675     $sqla_opts->{returning} = [
1676       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1677     ];
1678   }
1679
1680   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1681
1682   my %returned_cols = %$to_insert;
1683   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1684     @ir_container = try {
1685       local $SIG{__WARN__} = sub {};
1686       my @r = $sth->fetchrow_array;
1687       $sth->finish;
1688       @r;
1689     } unless @ir_container;
1690
1691     @returned_cols{@$retlist} = @ir_container if @ir_container;
1692   }
1693   else {
1694     # pull in PK if needed and then everything else
1695     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1696
1697       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1698         unless $self->can('last_insert_id');
1699
1700       my @pri_values = $self->last_insert_id($source, @missing_pri);
1701
1702       $self->throw_exception( "Can't get last insert id" )
1703         unless (@pri_values == @missing_pri);
1704
1705       @returned_cols{@missing_pri} = @pri_values;
1706       delete $retrieve_cols{$_} for @missing_pri;
1707     }
1708
1709     # if there is more left to pull
1710     if (%retrieve_cols) {
1711       $self->throw_exception(
1712         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1713       ) unless %pcols;
1714
1715       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1716
1717       my $cur = DBIx::Class::ResultSet->new($source, {
1718         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1719         select => \@left_to_fetch,
1720       })->cursor;
1721
1722       @returned_cols{@left_to_fetch} = $cur->next;
1723
1724       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1725         if scalar $cur->next;
1726     }
1727   }
1728
1729   return { %$prefetched_values, %returned_cols };
1730 }
1731
1732 sub insert_bulk {
1733   my ($self, $source, $cols, $data) = @_;
1734
1735   # FIXME - perhaps this is not even needed? does DBI stringify?
1736   #
1737   # forcibly stringify whatever is stringifiable
1738   for my $r (0 .. $#$data) {
1739     for my $c (0 .. $#{$data->[$r]}) {
1740       $data->[$r][$c] = "$data->[$r][$c]"
1741         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1742     }
1743   }
1744
1745   # check the data for consistency
1746   # report a sensible error on bad data
1747   #
1748   # also create a list of dynamic binds (ones that will be changing
1749   # for each row)
1750   my $dyn_bind_idx;
1751   for my $col_idx (0..$#$cols) {
1752
1753     # the first "row" is used as a point of reference
1754     my $reference_val = $data->[0][$col_idx];
1755     my $is_literal = ref $reference_val eq 'SCALAR';
1756     my $is_literal_bind = ( !$is_literal and (
1757       ref $reference_val eq 'REF'
1758         and
1759       ref $$reference_val eq 'ARRAY'
1760     ) );
1761
1762     $dyn_bind_idx->{$col_idx} = 1
1763       if (!$is_literal and !$is_literal_bind);
1764
1765     # use a closure for convenience (less to pass)
1766     my $bad_slice = sub {
1767       my ($msg, $slice_idx) = @_;
1768       $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1769         $msg,
1770         $cols->[$col_idx],
1771         do {
1772           require Data::Dumper::Concise;
1773           local $Data::Dumper::Maxdepth = 2;
1774           Data::Dumper::Concise::Dumper ({
1775             map { $cols->[$_] =>
1776               $data->[$slice_idx][$_]
1777             } (0 .. $#$cols)
1778           }),
1779         }
1780       );
1781     };
1782
1783     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
1784       my $val = $data->[$row_idx][$col_idx];
1785
1786       if ($is_literal) {
1787         if (ref $val ne 'SCALAR') {
1788           $bad_slice->(
1789             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
1790             $row_idx
1791           );
1792         }
1793         elsif ($$val ne $$reference_val) {
1794           $bad_slice->(
1795             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
1796             $row_idx
1797           );
1798         }
1799       }
1800       elsif ($is_literal_bind) {
1801         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
1802           $bad_slice->(
1803             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
1804             $row_idx
1805           );
1806         }
1807         elsif (${$val}->[0] ne ${$reference_val}->[0]) {
1808           $bad_slice->(
1809             "Inconsistent literal SQL-bind value (expecting \\['${$reference_val}->[0]', ... ])",
1810             $row_idx
1811           );
1812         }
1813       }
1814       elsif (ref $val) {
1815         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
1816           $bad_slice->("Literal SQL found where a plain bind value is expected", $row_idx);
1817         }
1818         else {
1819           $bad_slice->("$val reference found where bind expected", $row_idx);
1820         }
1821       }
1822     }
1823   }
1824
1825   # Get the sql with bind values interpolated where necessary. For dynamic
1826   # binds convert the values of the first row into a literal+bind combo, with
1827   # extra positional info in the bind attr hashref. This will allow us to match
1828   # the order properly, and is so contrived because a user-supplied literal
1829   # bind (or something else specific to a resultsource and/or storage driver)
1830   # can inject extra binds along the way, so one can't rely on "shift
1831   # positions" ordering at all. Also we can't just hand SQLA a set of some
1832   # known "values" (e.g. hashrefs that can be later matched up by address),
1833   # because we want to supply a real value on which perhaps e.g. datatype
1834   # checks will be performed
1835   my ($sql, $proto_bind) = $self->_prep_for_execute (
1836     'insert',
1837     $source,
1838     [ { map { $cols->[$_] => $dyn_bind_idx->{$_}
1839       ? \[ '?', [
1840           { dbic_colname => $cols->[$_], _bind_data_slice_idx => $_ }
1841             =>
1842           $data->[0][$_]
1843         ] ]
1844       : $data->[0][$_]
1845     } (0..$#$cols) } ],
1846   );
1847
1848   if (! @$proto_bind and keys %$dyn_bind_idx) {
1849     # if the bindlist is empty and we had some dynamic binds, this means the
1850     # storage ate them away (e.g. the NoBindVars component) and interpolated
1851     # them directly into the SQL. This obviosly can't be good for multi-inserts
1852     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1853   }
1854
1855   # neither _execute_array, nor _execute_inserts_with_no_binds are
1856   # atomic (even if _execute _array is a single call). Thus a safety
1857   # scope guard
1858   my $guard = $self->txn_scope_guard;
1859
1860   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
1861   my $sth = $self->_sth($sql);
1862   my $rv = do {
1863     if (@$proto_bind) {
1864       # proto bind contains the information on which pieces of $data to pull
1865       # $cols is passed in only for prettier error-reporting
1866       $self->_execute_array( $source, $sth, $proto_bind, $cols, $data );
1867     }
1868     else {
1869       # bind_param_array doesn't work if there are no binds
1870       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1871     }
1872   };
1873
1874   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
1875
1876   $guard->commit;
1877
1878   return (wantarray ? ($rv, $sth, @$proto_bind) : $rv);
1879 }
1880
1881 sub _execute_array {
1882   my ($self, $source, $sth, $proto_bind, $cols, $data, @extra) = @_;
1883
1884   ## This must be an arrayref, else nothing works!
1885   my $tuple_status = [];
1886
1887   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
1888
1889   # Bind the values by column slices
1890   for my $i (0 .. $#$proto_bind) {
1891     my $data_slice_idx = (
1892       ref $proto_bind->[$i][0] eq 'HASH'
1893         and
1894       exists $proto_bind->[$i][0]{_bind_data_slice_idx}
1895     ) ? $proto_bind->[$i][0]{_bind_data_slice_idx} : undef;
1896
1897     $sth->bind_param_array(
1898       $i+1, # DBI bind indexes are 1-based
1899       defined $data_slice_idx
1900         # either get a "column" of dynamic values, or just repeat the same
1901         # bind over and over
1902         ? [ map { $_->[$data_slice_idx] } @$data ]
1903         : [ ($proto_bind->[$i][1]) x @$data ]
1904       ,
1905       defined $bind_attrs->[$i] ? $bind_attrs->[$i] : (), # some DBDs throw up when given an undef
1906     );
1907   }
1908
1909   my ($rv, $err);
1910   try {
1911     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1912   }
1913   catch {
1914     $err = shift;
1915   };
1916
1917   # Not all DBDs are create equal. Some throw on error, some return
1918   # an undef $rv, and some set $sth->err - try whatever we can
1919   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1920     ! defined $err
1921       and
1922     ( !defined $rv or $sth->err )
1923   );
1924
1925   # Statement must finish even if there was an exception.
1926   try {
1927     $sth->finish
1928   }
1929   catch {
1930     $err = shift unless defined $err
1931   };
1932
1933   if (defined $err) {
1934     my $i = 0;
1935     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1936
1937     $self->throw_exception("Unexpected populate error: $err")
1938       if ($i > $#$tuple_status);
1939
1940     require Data::Dumper::Concise;
1941     $self->throw_exception(sprintf "execute_array() aborted with '%s' at populate slice:\n%s",
1942       ($tuple_status->[$i][1] || $err),
1943       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
1944     );
1945   }
1946
1947   return $rv;
1948 }
1949
1950 sub _dbh_execute_array {
1951   #my ($self, $sth, $tuple_status, @extra) = @_;
1952   return $_[1]->execute_array({ArrayTupleStatus => $_[2]});
1953 }
1954
1955 sub _dbh_execute_inserts_with_no_binds {
1956   my ($self, $sth, $count) = @_;
1957
1958   my $err;
1959   try {
1960     my $dbh = $self->_get_dbh;
1961     local $dbh->{RaiseError} = 1;
1962     local $dbh->{PrintError} = 0;
1963
1964     $sth->execute foreach 1..$count;
1965   }
1966   catch {
1967     $err = shift;
1968   };
1969
1970   # Make sure statement is finished even if there was an exception.
1971   try {
1972     $sth->finish
1973   }
1974   catch {
1975     $err = shift unless defined $err;
1976   };
1977
1978   $self->throw_exception($err) if defined $err;
1979
1980   return $count;
1981 }
1982
1983 sub update {
1984   #my ($self, $source, @args) = @_;
1985   shift->_execute('update', @_);
1986 }
1987
1988
1989 sub delete {
1990   #my ($self, $source, @args) = @_;
1991   shift->_execute('delete', @_);
1992 }
1993
1994 # We were sent here because the $rs contains a complex search
1995 # which will require a subquery to select the correct rows
1996 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1997 #
1998 # Generating a single PK column subquery is trivial and supported
1999 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
2000 # Look at _multipk_update_delete()
2001 sub _subq_update_delete {
2002   my $self = shift;
2003   my ($rs, $op, $values) = @_;
2004
2005   my $rsrc = $rs->result_source;
2006
2007   # quick check if we got a sane rs on our hands
2008   my @pcols = $rsrc->_pri_cols;
2009
2010   my $sel = $rs->_resolved_attrs->{select};
2011   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
2012
2013   if (
2014       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
2015         ne
2016       join ("\x00", sort @$sel )
2017   ) {
2018     $self->throw_exception (
2019       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2020     );
2021   }
2022
2023   if (@pcols == 1) {
2024     return $self->$op (
2025       $rsrc,
2026       $op eq 'update' ? $values : (),
2027       { $pcols[0] => { -in => $rs->as_query } },
2028     );
2029   }
2030
2031   else {
2032     return $self->_multipk_update_delete (@_);
2033   }
2034 }
2035
2036 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2037 # resultset update/delete involving subqueries. So by default resort
2038 # to simple (and inefficient) delete_all style per-row opearations,
2039 # while allowing specific storages to override this with a faster
2040 # implementation.
2041 #
2042 sub _multipk_update_delete {
2043   return shift->_per_row_update_delete (@_);
2044 }
2045
2046 # This is the default loop used to delete/update rows for multi PK
2047 # resultsets, and used by mysql exclusively (because it can't do anything
2048 # else).
2049 #
2050 # We do not use $row->$op style queries, because resultset update/delete
2051 # is not expected to cascade (this is what delete_all/update_all is for).
2052 #
2053 # There should be no race conditions as the entire operation is rolled
2054 # in a transaction.
2055 #
2056 sub _per_row_update_delete {
2057   my $self = shift;
2058   my ($rs, $op, $values) = @_;
2059
2060   my $rsrc = $rs->result_source;
2061   my @pcols = $rsrc->_pri_cols;
2062
2063   my $guard = $self->txn_scope_guard;
2064
2065   # emulate the return value of $sth->execute for non-selects
2066   my $row_cnt = '0E0';
2067
2068   my $subrs_cur = $rs->cursor;
2069   my @all_pk = $subrs_cur->all;
2070   for my $pks ( @all_pk) {
2071
2072     my $cond;
2073     for my $i (0.. $#pcols) {
2074       $cond->{$pcols[$i]} = $pks->[$i];
2075     }
2076
2077     $self->$op (
2078       $rsrc,
2079       $op eq 'update' ? $values : (),
2080       $cond,
2081     );
2082
2083     $row_cnt++;
2084   }
2085
2086   $guard->commit;
2087
2088   return $row_cnt;
2089 }
2090
2091 sub _select {
2092   my $self = shift;
2093   $self->_execute($self->_select_args(@_));
2094 }
2095
2096 sub _select_args_to_query {
2097   my $self = shift;
2098
2099   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2100   #  = $self->_select_args($ident, $select, $cond, $attrs);
2101   my ($op, $ident, @args) =
2102     $self->_select_args(@_);
2103
2104   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2105   my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, \@args);
2106   $prepared_bind ||= [];
2107
2108   return wantarray
2109     ? ($sql, $prepared_bind)
2110     : \[ "($sql)", @$prepared_bind ]
2111   ;
2112 }
2113
2114 sub _select_args {
2115   my ($self, $ident, $select, $where, $attrs) = @_;
2116
2117   my $sql_maker = $self->sql_maker;
2118   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2119
2120   $attrs = {
2121     %$attrs,
2122     select => $select,
2123     from => $ident,
2124     where => $where,
2125     $rs_alias && $alias2source->{$rs_alias}
2126       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2127       : ()
2128     ,
2129   };
2130
2131   # Sanity check the attributes (SQLMaker does it too, but
2132   # in case of a software_limit we'll never reach there)
2133   if (defined $attrs->{offset}) {
2134     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2135       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2136   }
2137
2138   if (defined $attrs->{rows}) {
2139     $self->throw_exception("The rows attribute must be a positive integer if present")
2140       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2141   }
2142   elsif ($attrs->{offset}) {
2143     # MySQL actually recommends this approach.  I cringe.
2144     $attrs->{rows} = $sql_maker->__max_int;
2145   }
2146
2147   my @limit;
2148
2149   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2150   # storage, unless software limit was requested
2151   if (
2152     #limited has_many
2153     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2154        ||
2155     # grouped prefetch (to satisfy group_by == select)
2156     ( $attrs->{group_by}
2157         &&
2158       @{$attrs->{group_by}}
2159         &&
2160       $attrs->{_prefetch_selector_range}
2161     )
2162   ) {
2163     ($ident, $select, $where, $attrs)
2164       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2165   }
2166   elsif (! $attrs->{software_limit} ) {
2167     push @limit, (
2168       $attrs->{rows} || (),
2169       $attrs->{offset} || (),
2170     );
2171   }
2172
2173   # try to simplify the joinmap further (prune unreferenced type-single joins)
2174   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2175
2176 ###
2177   # This would be the point to deflate anything found in $where
2178   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2179   # expect a row object. And all we have is a resultsource (it is trivial
2180   # to extract deflator coderefs via $alias2source above).
2181   #
2182   # I don't see a way forward other than changing the way deflators are
2183   # invoked, and that's just bad...
2184 ###
2185
2186   return ('select', $ident, $select, $where, $attrs, @limit);
2187 }
2188
2189 # Returns a counting SELECT for a simple count
2190 # query. Abstracted so that a storage could override
2191 # this to { count => 'firstcol' } or whatever makes
2192 # sense as a performance optimization
2193 sub _count_select {
2194   #my ($self, $source, $rs_attrs) = @_;
2195   return { count => '*' };
2196 }
2197
2198 sub source_bind_attributes {
2199   shift->throw_exception(
2200     'source_bind_attributes() was never meant to be a callable public method - '
2201    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2202    .'solution can be provided'
2203    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2204   );
2205 }
2206
2207 =head2 select
2208
2209 =over 4
2210
2211 =item Arguments: $ident, $select, $condition, $attrs
2212
2213 =back
2214
2215 Handle a SQL select statement.
2216
2217 =cut
2218
2219 sub select {
2220   my $self = shift;
2221   my ($ident, $select, $condition, $attrs) = @_;
2222   return $self->cursor_class->new($self, \@_, $attrs);
2223 }
2224
2225 sub select_single {
2226   my $self = shift;
2227   my ($rv, $sth, @bind) = $self->_select(@_);
2228   my @row = $sth->fetchrow_array;
2229   my @nextrow = $sth->fetchrow_array if @row;
2230   if(@row && @nextrow) {
2231     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2232   }
2233   # Need to call finish() to work round broken DBDs
2234   $sth->finish();
2235   return @row;
2236 }
2237
2238 =head2 sql_limit_dialect
2239
2240 This is an accessor for the default SQL limit dialect used by a particular
2241 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2242 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2243 see L<DBIx::Class::SQLMaker::LimitDialects>.
2244
2245 =cut
2246
2247 sub _dbh_sth {
2248   my ($self, $dbh, $sql) = @_;
2249
2250   # 3 is the if_active parameter which avoids active sth re-use
2251   my $sth = $self->disable_sth_caching
2252     ? $dbh->prepare($sql)
2253     : $dbh->prepare_cached($sql, {}, 3);
2254
2255   # XXX You would think RaiseError would make this impossible,
2256   #  but apparently that's not true :(
2257   $self->throw_exception(
2258     $dbh->errstr
2259       ||
2260     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2261             .'an exception and/or setting $dbh->errstr',
2262       length ($sql) > 20
2263         ? substr($sql, 0, 20) . '...'
2264         : $sql
2265       ,
2266       'DBD::' . $dbh->{Driver}{Name},
2267     )
2268   ) if !$sth;
2269
2270   $sth;
2271 }
2272
2273 sub sth {
2274   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2275   shift->_sth(@_);
2276 }
2277
2278 sub _sth {
2279   my ($self, $sql) = @_;
2280   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2281 }
2282
2283 sub _dbh_columns_info_for {
2284   my ($self, $dbh, $table) = @_;
2285
2286   if ($dbh->can('column_info')) {
2287     my %result;
2288     my $caught;
2289     try {
2290       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2291       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2292       $sth->execute();
2293       while ( my $info = $sth->fetchrow_hashref() ){
2294         my %column_info;
2295         $column_info{data_type}   = $info->{TYPE_NAME};
2296         $column_info{size}      = $info->{COLUMN_SIZE};
2297         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2298         $column_info{default_value} = $info->{COLUMN_DEF};
2299         my $col_name = $info->{COLUMN_NAME};
2300         $col_name =~ s/^\"(.*)\"$/$1/;
2301
2302         $result{$col_name} = \%column_info;
2303       }
2304     } catch {
2305       $caught = 1;
2306     };
2307     return \%result if !$caught && scalar keys %result;
2308   }
2309
2310   my %result;
2311   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2312   $sth->execute;
2313   my @columns = @{$sth->{NAME_lc}};
2314   for my $i ( 0 .. $#columns ){
2315     my %column_info;
2316     $column_info{data_type} = $sth->{TYPE}->[$i];
2317     $column_info{size} = $sth->{PRECISION}->[$i];
2318     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2319
2320     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2321       $column_info{data_type} = $1;
2322       $column_info{size}    = $2;
2323     }
2324
2325     $result{$columns[$i]} = \%column_info;
2326   }
2327   $sth->finish;
2328
2329   foreach my $col (keys %result) {
2330     my $colinfo = $result{$col};
2331     my $type_num = $colinfo->{data_type};
2332     my $type_name;
2333     if(defined $type_num && $dbh->can('type_info')) {
2334       my $type_info = $dbh->type_info($type_num);
2335       $type_name = $type_info->{TYPE_NAME} if $type_info;
2336       $colinfo->{data_type} = $type_name if $type_name;
2337     }
2338   }
2339
2340   return \%result;
2341 }
2342
2343 sub columns_info_for {
2344   my ($self, $table) = @_;
2345   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2346 }
2347
2348 =head2 last_insert_id
2349
2350 Return the row id of the last insert.
2351
2352 =cut
2353
2354 sub _dbh_last_insert_id {
2355     my ($self, $dbh, $source, $col) = @_;
2356
2357     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2358
2359     return $id if defined $id;
2360
2361     my $class = ref $self;
2362     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2363 }
2364
2365 sub last_insert_id {
2366   my $self = shift;
2367   $self->_dbh_last_insert_id ($self->_dbh, @_);
2368 }
2369
2370 =head2 _native_data_type
2371
2372 =over 4
2373
2374 =item Arguments: $type_name
2375
2376 =back
2377
2378 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2379 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2380 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2381
2382 The default implementation returns C<undef>, implement in your Storage driver if
2383 you need this functionality.
2384
2385 Should map types from other databases to the native RDBMS type, for example
2386 C<VARCHAR2> to C<VARCHAR>.
2387
2388 Types with modifiers should map to the underlying data type. For example,
2389 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2390
2391 Composite types should map to the container type, for example
2392 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2393
2394 =cut
2395
2396 sub _native_data_type {
2397   #my ($self, $data_type) = @_;
2398   return undef
2399 }
2400
2401 # Check if placeholders are supported at all
2402 sub _determine_supports_placeholders {
2403   my $self = shift;
2404   my $dbh  = $self->_get_dbh;
2405
2406   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2407   # but it is inaccurate more often than not
2408   return try {
2409     local $dbh->{PrintError} = 0;
2410     local $dbh->{RaiseError} = 1;
2411     $dbh->do('select ?', {}, 1);
2412     1;
2413   }
2414   catch {
2415     0;
2416   };
2417 }
2418
2419 # Check if placeholders bound to non-string types throw exceptions
2420 #
2421 sub _determine_supports_typeless_placeholders {
2422   my $self = shift;
2423   my $dbh  = $self->_get_dbh;
2424
2425   return try {
2426     local $dbh->{PrintError} = 0;
2427     local $dbh->{RaiseError} = 1;
2428     # this specifically tests a bind that is NOT a string
2429     $dbh->do('select 1 where 1 = ?', {}, 1);
2430     1;
2431   }
2432   catch {
2433     0;
2434   };
2435 }
2436
2437 =head2 sqlt_type
2438
2439 Returns the database driver name.
2440
2441 =cut
2442
2443 sub sqlt_type {
2444   shift->_get_dbh->{Driver}->{Name};
2445 }
2446
2447 =head2 bind_attribute_by_data_type
2448
2449 Given a datatype from column info, returns a database specific bind
2450 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2451 let the database planner just handle it.
2452
2453 Generally only needed for special case column types, like bytea in postgres.
2454
2455 =cut
2456
2457 sub bind_attribute_by_data_type {
2458     return;
2459 }
2460
2461 =head2 is_datatype_numeric
2462
2463 Given a datatype from column_info, returns a boolean value indicating if
2464 the current RDBMS considers it a numeric value. This controls how
2465 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2466 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2467 be performed instead of the usual C<eq>.
2468
2469 =cut
2470
2471 sub is_datatype_numeric {
2472   #my ($self, $dt) = @_;
2473
2474   return 0 unless $_[1];
2475
2476   $_[1] =~ /^ (?:
2477     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2478   ) $/ix;
2479 }
2480
2481
2482 =head2 create_ddl_dir
2483
2484 =over 4
2485
2486 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2487
2488 =back
2489
2490 Creates a SQL file based on the Schema, for each of the specified
2491 database engines in C<\@databases> in the given directory.
2492 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2493
2494 Given a previous version number, this will also create a file containing
2495 the ALTER TABLE statements to transform the previous schema into the
2496 current one. Note that these statements may contain C<DROP TABLE> or
2497 C<DROP COLUMN> statements that can potentially destroy data.
2498
2499 The file names are created using the C<ddl_filename> method below, please
2500 override this method in your schema if you would like a different file
2501 name format. For the ALTER file, the same format is used, replacing
2502 $version in the name with "$preversion-$version".
2503
2504 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2505 The most common value for this would be C<< { add_drop_table => 1 } >>
2506 to have the SQL produced include a C<DROP TABLE> statement for each table
2507 created. For quoting purposes supply C<quote_table_names> and
2508 C<quote_field_names>.
2509
2510 If no arguments are passed, then the following default values are assumed:
2511
2512 =over 4
2513
2514 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2515
2516 =item version    - $schema->schema_version
2517
2518 =item directory  - './'
2519
2520 =item preversion - <none>
2521
2522 =back
2523
2524 By default, C<\%sqlt_args> will have
2525
2526  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2527
2528 merged with the hash passed in. To disable any of those features, pass in a
2529 hashref like the following
2530
2531  { ignore_constraint_names => 0, # ... other options }
2532
2533
2534 WARNING: You are strongly advised to check all SQL files created, before applying
2535 them.
2536
2537 =cut
2538
2539 sub create_ddl_dir {
2540   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2541
2542   unless ($dir) {
2543     carp "No directory given, using ./\n";
2544     $dir = './';
2545   } else {
2546       -d $dir
2547         or
2548       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2549         or
2550       $self->throw_exception(
2551         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2552       );
2553   }
2554
2555   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2556
2557   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2558   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2559
2560   my $schema_version = $schema->schema_version || '1.x';
2561   $version ||= $schema_version;
2562
2563   $sqltargs = {
2564     add_drop_table => 1,
2565     ignore_constraint_names => 1,
2566     ignore_index_names => 1,
2567     %{$sqltargs || {}}
2568   };
2569
2570   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2571     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2572   }
2573
2574   my $sqlt = SQL::Translator->new( $sqltargs );
2575
2576   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2577   my $sqlt_schema = $sqlt->translate({ data => $schema })
2578     or $self->throw_exception ($sqlt->error);
2579
2580   foreach my $db (@$databases) {
2581     $sqlt->reset();
2582     $sqlt->{schema} = $sqlt_schema;
2583     $sqlt->producer($db);
2584
2585     my $file;
2586     my $filename = $schema->ddl_filename($db, $version, $dir);
2587     if (-e $filename && ($version eq $schema_version )) {
2588       # if we are dumping the current version, overwrite the DDL
2589       carp "Overwriting existing DDL file - $filename";
2590       unlink($filename);
2591     }
2592
2593     my $output = $sqlt->translate;
2594     if(!$output) {
2595       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2596       next;
2597     }
2598     if(!open($file, ">$filename")) {
2599       $self->throw_exception("Can't open $filename for writing ($!)");
2600       next;
2601     }
2602     print $file $output;
2603     close($file);
2604
2605     next unless ($preversion);
2606
2607     require SQL::Translator::Diff;
2608
2609     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2610     if(!-e $prefilename) {
2611       carp("No previous schema file found ($prefilename)");
2612       next;
2613     }
2614
2615     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2616     if(-e $difffile) {
2617       carp("Overwriting existing diff file - $difffile");
2618       unlink($difffile);
2619     }
2620
2621     my $source_schema;
2622     {
2623       my $t = SQL::Translator->new($sqltargs);
2624       $t->debug( 0 );
2625       $t->trace( 0 );
2626
2627       $t->parser( $db )
2628         or $self->throw_exception ($t->error);
2629
2630       my $out = $t->translate( $prefilename )
2631         or $self->throw_exception ($t->error);
2632
2633       $source_schema = $t->schema;
2634
2635       $source_schema->name( $prefilename )
2636         unless ( $source_schema->name );
2637     }
2638
2639     # The "new" style of producers have sane normalization and can support
2640     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2641     # And we have to diff parsed SQL against parsed SQL.
2642     my $dest_schema = $sqlt_schema;
2643
2644     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2645       my $t = SQL::Translator->new($sqltargs);
2646       $t->debug( 0 );
2647       $t->trace( 0 );
2648
2649       $t->parser( $db )
2650         or $self->throw_exception ($t->error);
2651
2652       my $out = $t->translate( $filename )
2653         or $self->throw_exception ($t->error);
2654
2655       $dest_schema = $t->schema;
2656
2657       $dest_schema->name( $filename )
2658         unless $dest_schema->name;
2659     }
2660
2661     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2662                                                   $dest_schema,   $db,
2663                                                   $sqltargs
2664                                                  );
2665     if(!open $file, ">$difffile") {
2666       $self->throw_exception("Can't write to $difffile ($!)");
2667       next;
2668     }
2669     print $file $diff;
2670     close($file);
2671   }
2672 }
2673
2674 =head2 deployment_statements
2675
2676 =over 4
2677
2678 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2679
2680 =back
2681
2682 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2683
2684 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2685 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2686
2687 C<$directory> is used to return statements from files in a previously created
2688 L</create_ddl_dir> directory and is optional. The filenames are constructed
2689 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2690
2691 If no C<$directory> is specified then the statements are constructed on the
2692 fly using L<SQL::Translator> and C<$version> is ignored.
2693
2694 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2695
2696 =cut
2697
2698 sub deployment_statements {
2699   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2700   $type ||= $self->sqlt_type;
2701   $version ||= $schema->schema_version || '1.x';
2702   $dir ||= './';
2703   my $filename = $schema->ddl_filename($type, $version, $dir);
2704   if(-f $filename)
2705   {
2706       # FIXME replace this block when a proper sane sql parser is available
2707       my $file;
2708       open($file, "<$filename")
2709         or $self->throw_exception("Can't open $filename ($!)");
2710       my @rows = <$file>;
2711       close($file);
2712       return join('', @rows);
2713   }
2714
2715   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2716     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2717   }
2718
2719   # sources needs to be a parser arg, but for simplicty allow at top level
2720   # coming in
2721   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2722       if exists $sqltargs->{sources};
2723
2724   my $tr = SQL::Translator->new(
2725     producer => "SQL::Translator::Producer::${type}",
2726     %$sqltargs,
2727     parser => 'SQL::Translator::Parser::DBIx::Class',
2728     data => $schema,
2729   );
2730
2731   my @ret;
2732   if (wantarray) {
2733     @ret = $tr->translate;
2734   }
2735   else {
2736     $ret[0] = $tr->translate;
2737   }
2738
2739   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2740     unless (@ret && defined $ret[0]);
2741
2742   return wantarray ? @ret : $ret[0];
2743 }
2744
2745 # FIXME deploy() currently does not accurately report sql errors
2746 # Will always return true while errors are warned
2747 sub deploy {
2748   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2749   my $deploy = sub {
2750     my $line = shift;
2751     return if(!$line);
2752     return if($line =~ /^--/);
2753     # next if($line =~ /^DROP/m);
2754     return if($line =~ /^BEGIN TRANSACTION/m);
2755     return if($line =~ /^COMMIT/m);
2756     return if $line =~ /^\s+$/; # skip whitespace only
2757     $self->_query_start($line);
2758     try {
2759       # do a dbh_do cycle here, as we need some error checking in
2760       # place (even though we will ignore errors)
2761       $self->dbh_do (sub { $_[1]->do($line) });
2762     } catch {
2763       carp qq{$_ (running "${line}")};
2764     };
2765     $self->_query_end($line);
2766   };
2767   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2768   if (@statements > 1) {
2769     foreach my $statement (@statements) {
2770       $deploy->( $statement );
2771     }
2772   }
2773   elsif (@statements == 1) {
2774     # split on single line comments and end of statements
2775     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2776       $deploy->( $line );
2777     }
2778   }
2779 }
2780
2781 =head2 datetime_parser
2782
2783 Returns the datetime parser class
2784
2785 =cut
2786
2787 sub datetime_parser {
2788   my $self = shift;
2789   return $self->{datetime_parser} ||= do {
2790     $self->build_datetime_parser(@_);
2791   };
2792 }
2793
2794 =head2 datetime_parser_type
2795
2796 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2797
2798 =head2 build_datetime_parser
2799
2800 See L</datetime_parser>
2801
2802 =cut
2803
2804 sub build_datetime_parser {
2805   my $self = shift;
2806   my $type = $self->datetime_parser_type(@_);
2807   return $type;
2808 }
2809
2810
2811 =head2 is_replicating
2812
2813 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2814 replicate from a master database.  Default is undef, which is the result
2815 returned by databases that don't support replication.
2816
2817 =cut
2818
2819 sub is_replicating {
2820     return;
2821
2822 }
2823
2824 =head2 lag_behind_master
2825
2826 Returns a number that represents a certain amount of lag behind a master db
2827 when a given storage is replicating.  The number is database dependent, but
2828 starts at zero and increases with the amount of lag. Default in undef
2829
2830 =cut
2831
2832 sub lag_behind_master {
2833     return;
2834 }
2835
2836 =head2 relname_to_table_alias
2837
2838 =over 4
2839
2840 =item Arguments: $relname, $join_count
2841
2842 =back
2843
2844 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2845 queries.
2846
2847 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2848 way these aliases are named.
2849
2850 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2851 otherwise C<"$relname">.
2852
2853 =cut
2854
2855 sub relname_to_table_alias {
2856   my ($self, $relname, $join_count) = @_;
2857
2858   my $alias = ($join_count && $join_count > 1 ?
2859     join('_', $relname, $join_count) : $relname);
2860
2861   return $alias;
2862 }
2863
2864 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2865 # version and it may be necessary to amend or override it for a specific storage
2866 # if such binds are necessary.
2867 sub _max_column_bytesize {
2868   my ($self, $attr) = @_;
2869
2870   my $max_size;
2871
2872   if ($attr->{sqlt_datatype}) {
2873     my $data_type = lc($attr->{sqlt_datatype});
2874
2875     if ($attr->{sqlt_size}) {
2876
2877       # String/sized-binary types
2878       if ($data_type =~ /^(?:
2879           l? (?:var)? char(?:acter)? (?:\s*varying)?
2880             |
2881           (?:var)? binary (?:\s*varying)?
2882             |
2883           raw
2884         )\b/x
2885       ) {
2886         $max_size = $attr->{sqlt_size};
2887       }
2888       # Other charset/unicode types, assume scale of 4
2889       elsif ($data_type =~ /^(?:
2890           national \s* character (?:\s*varying)?
2891             |
2892           nchar
2893             |
2894           univarchar
2895             |
2896           nvarchar
2897         )\b/x
2898       ) {
2899         $max_size = $attr->{sqlt_size} * 4;
2900       }
2901     }
2902
2903     if (!$max_size and !$self->_is_lob_type($data_type)) {
2904       $max_size = 100 # for all other (numeric?) datatypes
2905     }
2906   }
2907
2908   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
2909 }
2910
2911 # Determine if a data_type is some type of BLOB
2912 sub _is_lob_type {
2913   my ($self, $data_type) = @_;
2914   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2915     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2916                                   |varchar|character\s*varying|nvarchar
2917                                   |national\s*character\s*varying))?\z/xi);
2918 }
2919
2920 sub _is_binary_lob_type {
2921   my ($self, $data_type) = @_;
2922   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2923     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2924 }
2925
2926 sub _is_text_lob_type {
2927   my ($self, $data_type) = @_;
2928   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2929     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2930                         |national\s*character\s*varying))\z/xi);
2931 }
2932
2933 1;
2934
2935 =head1 USAGE NOTES
2936
2937 =head2 DBIx::Class and AutoCommit
2938
2939 DBIx::Class can do some wonderful magic with handling exceptions,
2940 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2941 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
2942 transaction support.
2943
2944 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2945 in an assumed transaction between commits, and you're telling us you'd
2946 like to manage that manually.  A lot of the magic protections offered by
2947 this module will go away.  We can't protect you from exceptions due to database
2948 disconnects because we don't know anything about how to restart your
2949 transactions.  You're on your own for handling all sorts of exceptional
2950 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2951 be with raw DBI.
2952
2953
2954 =head1 AUTHORS
2955
2956 Matt S. Trout <mst@shadowcatsystems.co.uk>
2957
2958 Andy Grundman <andy@hybridized.org>
2959
2960 =head1 LICENSE
2961
2962 You may distribute this code under the same terms as Perl itself.
2963
2964 =cut