Reduce to a warning the commit-without-apparent-begin exception from 7d216b10
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use overload ();
20 use namespace::clean;
21
22
23 # default cursor class, overridable in connect_info attributes
24 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
25
26 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
27 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/
30   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
31   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
32   transaction_depth _dbh_autocommit  savepoints
33 /);
34
35 # the values for these accessors are picked out (and deleted) from
36 # the attribute hashref passed to connect_info
37 my @storage_options = qw/
38   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
39   disable_sth_caching unsafe auto_savepoint
40 /;
41 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
42
43
44 # capability definitions, using a 2-tiered accessor system
45 # The rationale is:
46 #
47 # A driver/user may define _use_X, which blindly without any checks says:
48 # "(do not) use this capability", (use_dbms_capability is an "inherited"
49 # type accessor)
50 #
51 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
52 # accessor, which in turn calls _determine_supports_X, and stores the return
53 # in a special slot on the storage object, which is wiped every time a $dbh
54 # reconnection takes place (it is not guaranteed that upon reconnection we
55 # will get the same rdbms version). _determine_supports_X does not need to
56 # exist on a driver, as we ->can for it before calling.
57
58 my @capabilities = (qw/
59   insert_returning
60   insert_returning_bound
61   placeholders
62   typeless_placeholders
63   join_optimizer
64 /);
65 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
66 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
67
68 # on by default, not strictly a capability (pending rewrite)
69 __PACKAGE__->_use_join_optimizer (1);
70 sub _determine_supports_join_optimizer { 1 };
71
72 # Each of these methods need _determine_driver called before itself
73 # in order to function reliably. This is a purely DRY optimization
74 #
75 # get_(use)_dbms_capability need to be called on the correct Storage
76 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
77 # _determine_supports_X which obv. needs a correct driver as well
78 my @rdbms_specific_methods = qw/
79   deployment_statements
80   sqlt_type
81   sql_maker
82   build_datetime_parser
83   datetime_parser_type
84
85   insert
86   insert_bulk
87   update
88   delete
89   select
90   select_single
91
92   get_use_dbms_capability
93   get_dbms_capability
94
95   _server_info
96   _get_server_version
97 /;
98
99 for my $meth (@rdbms_specific_methods) {
100
101   my $orig = __PACKAGE__->can ($meth)
102     or die "$meth is not a ::Storage::DBI method!";
103
104   no strict qw/refs/;
105   no warnings qw/redefine/;
106   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
107     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
108       $_[0]->_determine_driver;
109
110       # This for some reason crashes and burns on perl 5.8.1
111       # IFF the method ends up throwing an exception
112       #goto $_[0]->can ($meth);
113
114       my $cref = $_[0]->can ($meth);
115       goto $cref;
116     }
117     goto $orig;
118   };
119 }
120
121
122 =head1 NAME
123
124 DBIx::Class::Storage::DBI - DBI storage handler
125
126 =head1 SYNOPSIS
127
128   my $schema = MySchema->connect('dbi:SQLite:my.db');
129
130   $schema->storage->debug(1);
131
132   my @stuff = $schema->storage->dbh_do(
133     sub {
134       my ($storage, $dbh, @args) = @_;
135       $dbh->do("DROP TABLE authors");
136     },
137     @column_list
138   );
139
140   $schema->resultset('Book')->search({
141      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
142   });
143
144 =head1 DESCRIPTION
145
146 This class represents the connection to an RDBMS via L<DBI>.  See
147 L<DBIx::Class::Storage> for general information.  This pod only
148 documents DBI-specific methods and behaviors.
149
150 =head1 METHODS
151
152 =cut
153
154 sub new {
155   my $new = shift->next::method(@_);
156
157   $new->transaction_depth(0);
158   $new->_sql_maker_opts({});
159   $new->_dbh_details({});
160   $new->{savepoints} = [];
161   $new->{_in_dbh_do} = 0;
162   $new->{_dbh_gen} = 0;
163
164   # read below to see what this does
165   $new->_arm_global_destructor;
166
167   $new;
168 }
169
170 # This is hack to work around perl shooting stuff in random
171 # order on exit(). If we do not walk the remaining storage
172 # objects in an END block, there is a *small but real* chance
173 # of a fork()ed child to kill the parent's shared DBI handle,
174 # *before perl reaches the DESTROY in this package*
175 # Yes, it is ugly and effective.
176 # Additionally this registry is used by the CLONE method to
177 # make sure no handles are shared between threads
178 {
179   my %seek_and_destroy;
180
181   sub _arm_global_destructor {
182     my $self = shift;
183     my $key = refaddr ($self);
184     $seek_and_destroy{$key} = $self;
185     weaken ($seek_and_destroy{$key});
186   }
187
188   END {
189     local $?; # just in case the DBI destructor changes it somehow
190
191     # destroy just the object if not native to this process/thread
192     $_->_verify_pid for (grep
193       { defined $_ }
194       values %seek_and_destroy
195     );
196   }
197
198   sub CLONE {
199     # As per DBI's recommendation, DBIC disconnects all handles as
200     # soon as possible (DBIC will reconnect only on demand from within
201     # the thread)
202     for (values %seek_and_destroy) {
203       next unless $_;
204       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
205       $_->_dbh(undef);
206     }
207   }
208 }
209
210 sub DESTROY {
211   my $self = shift;
212
213   # some databases spew warnings on implicit disconnect
214   local $SIG{__WARN__} = sub {};
215   $self->_dbh(undef);
216
217   # this op is necessary, since the very last perl runtime statement
218   # triggers a global destruction shootout, and the $SIG localization
219   # may very well be destroyed before perl actually gets to do the
220   # $dbh undef
221   1;
222 }
223
224 # handle pid changes correctly - do not destroy parent's connection
225 sub _verify_pid {
226   my $self = shift;
227
228   my $pid = $self->_conn_pid;
229   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
230     $dbh->{InactiveDestroy} = 1;
231     $self->{_dbh_gen}++;
232     $self->_dbh(undef);
233   }
234
235   return;
236 }
237
238 =head2 connect_info
239
240 This method is normally called by L<DBIx::Class::Schema/connection>, which
241 encapsulates its argument list in an arrayref before passing them here.
242
243 The argument list may contain:
244
245 =over
246
247 =item *
248
249 The same 4-element argument set one would normally pass to
250 L<DBI/connect>, optionally followed by
251 L<extra attributes|/DBIx::Class specific connection attributes>
252 recognized by DBIx::Class:
253
254   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
255
256 =item *
257
258 A single code reference which returns a connected
259 L<DBI database handle|DBI/connect> optionally followed by
260 L<extra attributes|/DBIx::Class specific connection attributes> recognized
261 by DBIx::Class:
262
263   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
264
265 =item *
266
267 A single hashref with all the attributes and the dsn/user/password
268 mixed together:
269
270   $connect_info_args = [{
271     dsn => $dsn,
272     user => $user,
273     password => $pass,
274     %dbi_attributes,
275     %extra_attributes,
276   }];
277
278   $connect_info_args = [{
279     dbh_maker => sub { DBI->connect (...) },
280     %dbi_attributes,
281     %extra_attributes,
282   }];
283
284 This is particularly useful for L<Catalyst> based applications, allowing the
285 following config (L<Config::General> style):
286
287   <Model::DB>
288     schema_class   App::DB
289     <connect_info>
290       dsn          dbi:mysql:database=test
291       user         testuser
292       password     TestPass
293       AutoCommit   1
294     </connect_info>
295   </Model::DB>
296
297 The C<dsn>/C<user>/C<password> combination can be substituted by the
298 C<dbh_maker> key whose value is a coderef that returns a connected
299 L<DBI database handle|DBI/connect>
300
301 =back
302
303 Please note that the L<DBI> docs recommend that you always explicitly
304 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
305 recommends that it be set to I<1>, and that you perform transactions
306 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
307 to I<1> if you do not do explicitly set it to zero.  This is the default
308 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
309
310 =head3 DBIx::Class specific connection attributes
311
312 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
313 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
314 the following connection options. These options can be mixed in with your other
315 L<DBI> connection attributes, or placed in a separate hashref
316 (C<\%extra_attributes>) as shown above.
317
318 Every time C<connect_info> is invoked, any previous settings for
319 these options will be cleared before setting the new ones, regardless of
320 whether any options are specified in the new C<connect_info>.
321
322
323 =over
324
325 =item on_connect_do
326
327 Specifies things to do immediately after connecting or re-connecting to
328 the database.  Its value may contain:
329
330 =over
331
332 =item a scalar
333
334 This contains one SQL statement to execute.
335
336 =item an array reference
337
338 This contains SQL statements to execute in order.  Each element contains
339 a string or a code reference that returns a string.
340
341 =item a code reference
342
343 This contains some code to execute.  Unlike code references within an
344 array reference, its return value is ignored.
345
346 =back
347
348 =item on_disconnect_do
349
350 Takes arguments in the same form as L</on_connect_do> and executes them
351 immediately before disconnecting from the database.
352
353 Note, this only runs if you explicitly call L</disconnect> on the
354 storage object.
355
356 =item on_connect_call
357
358 A more generalized form of L</on_connect_do> that calls the specified
359 C<connect_call_METHOD> methods in your storage driver.
360
361   on_connect_do => 'select 1'
362
363 is equivalent to:
364
365   on_connect_call => [ [ do_sql => 'select 1' ] ]
366
367 Its values may contain:
368
369 =over
370
371 =item a scalar
372
373 Will call the C<connect_call_METHOD> method.
374
375 =item a code reference
376
377 Will execute C<< $code->($storage) >>
378
379 =item an array reference
380
381 Each value can be a method name or code reference.
382
383 =item an array of arrays
384
385 For each array, the first item is taken to be the C<connect_call_> method name
386 or code reference, and the rest are parameters to it.
387
388 =back
389
390 Some predefined storage methods you may use:
391
392 =over
393
394 =item do_sql
395
396 Executes a SQL string or a code reference that returns a SQL string. This is
397 what L</on_connect_do> and L</on_disconnect_do> use.
398
399 It can take:
400
401 =over
402
403 =item a scalar
404
405 Will execute the scalar as SQL.
406
407 =item an arrayref
408
409 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
410 attributes hashref and bind values.
411
412 =item a code reference
413
414 Will execute C<< $code->($storage) >> and execute the return array refs as
415 above.
416
417 =back
418
419 =item datetime_setup
420
421 Execute any statements necessary to initialize the database session to return
422 and accept datetime/timestamp values used with
423 L<DBIx::Class::InflateColumn::DateTime>.
424
425 Only necessary for some databases, see your specific storage driver for
426 implementation details.
427
428 =back
429
430 =item on_disconnect_call
431
432 Takes arguments in the same form as L</on_connect_call> and executes them
433 immediately before disconnecting from the database.
434
435 Calls the C<disconnect_call_METHOD> methods as opposed to the
436 C<connect_call_METHOD> methods called by L</on_connect_call>.
437
438 Note, this only runs if you explicitly call L</disconnect> on the
439 storage object.
440
441 =item disable_sth_caching
442
443 If set to a true value, this option will disable the caching of
444 statement handles via L<DBI/prepare_cached>.
445
446 =item limit_dialect
447
448 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
449 default L</sql_limit_dialect> setting of the storage (if any). For a list
450 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
451
452 =item quote_char
453
454 Specifies what characters to use to quote table and column names.
455
456 C<quote_char> expects either a single character, in which case is it
457 is placed on either side of the table/column name, or an arrayref of length
458 2 in which case the table/column name is placed between the elements.
459
460 For example under MySQL you should use C<< quote_char => '`' >>, and for
461 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
462
463 =item name_sep
464
465 This parameter is only useful in conjunction with C<quote_char>, and is used to
466 specify the character that separates elements (schemas, tables, columns) from
467 each other. If unspecified it defaults to the most commonly used C<.>.
468
469 =item unsafe
470
471 This Storage driver normally installs its own C<HandleError>, sets
472 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
473 all database handles, including those supplied by a coderef.  It does this
474 so that it can have consistent and useful error behavior.
475
476 If you set this option to a true value, Storage will not do its usual
477 modifications to the database handle's attributes, and instead relies on
478 the settings in your connect_info DBI options (or the values you set in
479 your connection coderef, in the case that you are connecting via coderef).
480
481 Note that your custom settings can cause Storage to malfunction,
482 especially if you set a C<HandleError> handler that suppresses exceptions
483 and/or disable C<RaiseError>.
484
485 =item auto_savepoint
486
487 If this option is true, L<DBIx::Class> will use savepoints when nesting
488 transactions, making it possible to recover from failure in the inner
489 transaction without having to abort all outer transactions.
490
491 =item cursor_class
492
493 Use this argument to supply a cursor class other than the default
494 L<DBIx::Class::Storage::DBI::Cursor>.
495
496 =back
497
498 Some real-life examples of arguments to L</connect_info> and
499 L<DBIx::Class::Schema/connect>
500
501   # Simple SQLite connection
502   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
503
504   # Connect via subref
505   ->connect_info([ sub { DBI->connect(...) } ]);
506
507   # Connect via subref in hashref
508   ->connect_info([{
509     dbh_maker => sub { DBI->connect(...) },
510     on_connect_do => 'alter session ...',
511   }]);
512
513   # A bit more complicated
514   ->connect_info(
515     [
516       'dbi:Pg:dbname=foo',
517       'postgres',
518       'my_pg_password',
519       { AutoCommit => 1 },
520       { quote_char => q{"} },
521     ]
522   );
523
524   # Equivalent to the previous example
525   ->connect_info(
526     [
527       'dbi:Pg:dbname=foo',
528       'postgres',
529       'my_pg_password',
530       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
531     ]
532   );
533
534   # Same, but with hashref as argument
535   # See parse_connect_info for explanation
536   ->connect_info(
537     [{
538       dsn         => 'dbi:Pg:dbname=foo',
539       user        => 'postgres',
540       password    => 'my_pg_password',
541       AutoCommit  => 1,
542       quote_char  => q{"},
543       name_sep    => q{.},
544     }]
545   );
546
547   # Subref + DBIx::Class-specific connection options
548   ->connect_info(
549     [
550       sub { DBI->connect(...) },
551       {
552           quote_char => q{`},
553           name_sep => q{@},
554           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
555           disable_sth_caching => 1,
556       },
557     ]
558   );
559
560
561
562 =cut
563
564 sub connect_info {
565   my ($self, $info) = @_;
566
567   return $self->_connect_info if !$info;
568
569   $self->_connect_info($info); # copy for _connect_info
570
571   $info = $self->_normalize_connect_info($info)
572     if ref $info eq 'ARRAY';
573
574   for my $storage_opt (keys %{ $info->{storage_options} }) {
575     my $value = $info->{storage_options}{$storage_opt};
576
577     $self->$storage_opt($value);
578   }
579
580   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
581   #  the new set of options
582   $self->_sql_maker(undef);
583   $self->_sql_maker_opts({});
584
585   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
586     my $value = $info->{sql_maker_options}{$sql_maker_opt};
587
588     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
589   }
590
591   my %attrs = (
592     %{ $self->_default_dbi_connect_attributes || {} },
593     %{ $info->{attributes} || {} },
594   );
595
596   my @args = @{ $info->{arguments} };
597
598   $self->_dbi_connect_info([@args,
599     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
600
601   # FIXME - dirty:
602   # save attributes them in a separate accessor so they are always
603   # introspectable, even in case of a CODE $dbhmaker
604   $self->_dbic_connect_attributes (\%attrs);
605
606   return $self->_connect_info;
607 }
608
609 sub _normalize_connect_info {
610   my ($self, $info_arg) = @_;
611   my %info;
612
613   my @args = @$info_arg;  # take a shallow copy for further mutilation
614
615   # combine/pre-parse arguments depending on invocation style
616
617   my %attrs;
618   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
619     %attrs = %{ $args[1] || {} };
620     @args = $args[0];
621   }
622   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
623     %attrs = %{$args[0]};
624     @args = ();
625     if (my $code = delete $attrs{dbh_maker}) {
626       @args = $code;
627
628       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
629       if (@ignored) {
630         carp sprintf (
631             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
632           . "to the result of 'dbh_maker'",
633
634           join (', ', map { "'$_'" } (@ignored) ),
635         );
636       }
637     }
638     else {
639       @args = delete @attrs{qw/dsn user password/};
640     }
641   }
642   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
643     %attrs = (
644       % { $args[3] || {} },
645       % { $args[4] || {} },
646     );
647     @args = @args[0,1,2];
648   }
649
650   $info{arguments} = \@args;
651
652   my @storage_opts = grep exists $attrs{$_},
653     @storage_options, 'cursor_class';
654
655   @{ $info{storage_options} }{@storage_opts} =
656     delete @attrs{@storage_opts} if @storage_opts;
657
658   my @sql_maker_opts = grep exists $attrs{$_},
659     qw/limit_dialect quote_char name_sep/;
660
661   @{ $info{sql_maker_options} }{@sql_maker_opts} =
662     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
663
664   $info{attributes} = \%attrs if %attrs;
665
666   return \%info;
667 }
668
669 sub _default_dbi_connect_attributes {
670   return {
671     AutoCommit => 1,
672     RaiseError => 1,
673     PrintError => 0,
674   };
675 }
676
677 =head2 on_connect_do
678
679 This method is deprecated in favour of setting via L</connect_info>.
680
681 =cut
682
683 =head2 on_disconnect_do
684
685 This method is deprecated in favour of setting via L</connect_info>.
686
687 =cut
688
689 sub _parse_connect_do {
690   my ($self, $type) = @_;
691
692   my $val = $self->$type;
693   return () if not defined $val;
694
695   my @res;
696
697   if (not ref($val)) {
698     push @res, [ 'do_sql', $val ];
699   } elsif (ref($val) eq 'CODE') {
700     push @res, $val;
701   } elsif (ref($val) eq 'ARRAY') {
702     push @res, map { [ 'do_sql', $_ ] } @$val;
703   } else {
704     $self->throw_exception("Invalid type for $type: ".ref($val));
705   }
706
707   return \@res;
708 }
709
710 =head2 dbh_do
711
712 Arguments: ($subref | $method_name), @extra_coderef_args?
713
714 Execute the given $subref or $method_name using the new exception-based
715 connection management.
716
717 The first two arguments will be the storage object that C<dbh_do> was called
718 on and a database handle to use.  Any additional arguments will be passed
719 verbatim to the called subref as arguments 2 and onwards.
720
721 Using this (instead of $self->_dbh or $self->dbh) ensures correct
722 exception handling and reconnection (or failover in future subclasses).
723
724 Your subref should have no side-effects outside of the database, as
725 there is the potential for your subref to be partially double-executed
726 if the database connection was stale/dysfunctional.
727
728 Example:
729
730   my @stuff = $schema->storage->dbh_do(
731     sub {
732       my ($storage, $dbh, @cols) = @_;
733       my $cols = join(q{, }, @cols);
734       $dbh->selectrow_array("SELECT $cols FROM foo");
735     },
736     @column_list
737   );
738
739 =cut
740
741 sub dbh_do {
742   my $self = shift;
743   my $code = shift;
744
745   my $dbh = $self->_get_dbh;
746
747   return $self->$code($dbh, @_)
748     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
749
750   local $self->{_in_dbh_do} = 1;
751
752   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
753   my $args = \@_;
754   return try {
755     $self->$code ($dbh, @$args);
756   } catch {
757     $self->throw_exception($_) if $self->connected;
758
759     # We were not connected - reconnect and retry, but let any
760     #  exception fall right through this time
761     carp "Retrying $code after catching disconnected exception: $_"
762       if $ENV{DBIC_DBIRETRY_DEBUG};
763
764     $self->_populate_dbh;
765     $self->$code($self->_dbh, @$args);
766   };
767 }
768
769 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
770 # It also informs dbh_do to bypass itself while under the direction of txn_do,
771 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
772 sub txn_do {
773   my $self = shift;
774   my $coderef = shift;
775
776   ref $coderef eq 'CODE' or $self->throw_exception
777     ('$coderef must be a CODE reference');
778
779   local $self->{_in_dbh_do} = 1;
780
781   my @result;
782   my $want = wantarray;
783
784   my $tried = 0;
785   while(1) {
786     my $exception;
787
788     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
789     my $args = \@_;
790
791     try {
792       $self->txn_begin;
793       my $txn_start_depth = $self->transaction_depth;
794       if($want) {
795           @result = $coderef->(@$args);
796       }
797       elsif(defined $want) {
798           $result[0] = $coderef->(@$args);
799       }
800       else {
801           $coderef->(@$args);
802       }
803
804       my $delta_txn = $txn_start_depth - $self->transaction_depth;
805       if ($delta_txn == 0) {
806         $self->txn_commit;
807       }
808       elsif ($delta_txn != 1) {
809         # an off-by-one would mean we fired a rollback
810         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
811       }
812     } catch {
813       $exception = $_;
814     };
815
816     if(! defined $exception) { return wantarray ? @result : $result[0] }
817
818     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
819       my $rollback_exception;
820       try { $self->txn_rollback } catch { $rollback_exception = shift };
821       if(defined $rollback_exception) {
822         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
823         $self->throw_exception($exception)  # propagate nested rollback
824           if $rollback_exception =~ /$exception_class/;
825
826         $self->throw_exception(
827           "Transaction aborted: ${exception}. "
828           . "Rollback failed: ${rollback_exception}"
829         );
830       }
831       $self->throw_exception($exception)
832     }
833
834     # We were not connected, and was first try - reconnect and retry
835     # via the while loop
836     carp "Retrying $coderef after catching disconnected exception: $exception"
837       if $ENV{DBIC_TXNRETRY_DEBUG};
838     $self->_populate_dbh;
839   }
840 }
841
842 =head2 disconnect
843
844 Our C<disconnect> method also performs a rollback first if the
845 database is not in C<AutoCommit> mode.
846
847 =cut
848
849 sub disconnect {
850   my ($self) = @_;
851
852   if( $self->_dbh ) {
853     my @actions;
854
855     push @actions, ( $self->on_disconnect_call || () );
856     push @actions, $self->_parse_connect_do ('on_disconnect_do');
857
858     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
859
860     $self->_dbh_rollback unless $self->_dbh_autocommit;
861
862     %{ $self->_dbh->{CachedKids} } = ();
863     $self->_dbh->disconnect;
864     $self->_dbh(undef);
865     $self->{_dbh_gen}++;
866   }
867 }
868
869 =head2 with_deferred_fk_checks
870
871 =over 4
872
873 =item Arguments: C<$coderef>
874
875 =item Return Value: The return value of $coderef
876
877 =back
878
879 Storage specific method to run the code ref with FK checks deferred or
880 in MySQL's case disabled entirely.
881
882 =cut
883
884 # Storage subclasses should override this
885 sub with_deferred_fk_checks {
886   my ($self, $sub) = @_;
887   $sub->();
888 }
889
890 =head2 connected
891
892 =over
893
894 =item Arguments: none
895
896 =item Return Value: 1|0
897
898 =back
899
900 Verifies that the current database handle is active and ready to execute
901 an SQL statement (e.g. the connection did not get stale, server is still
902 answering, etc.) This method is used internally by L</dbh>.
903
904 =cut
905
906 sub connected {
907   my $self = shift;
908   return 0 unless $self->_seems_connected;
909
910   #be on the safe side
911   local $self->_dbh->{RaiseError} = 1;
912
913   return $self->_ping;
914 }
915
916 sub _seems_connected {
917   my $self = shift;
918
919   $self->_verify_pid;
920
921   my $dbh = $self->_dbh
922     or return 0;
923
924   return $dbh->FETCH('Active');
925 }
926
927 sub _ping {
928   my $self = shift;
929
930   my $dbh = $self->_dbh or return 0;
931
932   return $dbh->ping;
933 }
934
935 sub ensure_connected {
936   my ($self) = @_;
937
938   unless ($self->connected) {
939     $self->_populate_dbh;
940   }
941 }
942
943 =head2 dbh
944
945 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
946 is guaranteed to be healthy by implicitly calling L</connected>, and if
947 necessary performing a reconnection before returning. Keep in mind that this
948 is very B<expensive> on some database engines. Consider using L</dbh_do>
949 instead.
950
951 =cut
952
953 sub dbh {
954   my ($self) = @_;
955
956   if (not $self->_dbh) {
957     $self->_populate_dbh;
958   } else {
959     $self->ensure_connected;
960   }
961   return $self->_dbh;
962 }
963
964 # this is the internal "get dbh or connect (don't check)" method
965 sub _get_dbh {
966   my $self = shift;
967   $self->_verify_pid;
968   $self->_populate_dbh unless $self->_dbh;
969   return $self->_dbh;
970 }
971
972 sub sql_maker {
973   my ($self) = @_;
974   unless ($self->_sql_maker) {
975     my $sql_maker_class = $self->sql_maker_class;
976     $self->ensure_class_loaded ($sql_maker_class);
977
978     my %opts = %{$self->_sql_maker_opts||{}};
979     my $dialect =
980       $opts{limit_dialect}
981         ||
982       $self->sql_limit_dialect
983         ||
984       do {
985         my $s_class = (ref $self) || $self;
986         carp (
987           "Your storage class ($s_class) does not set sql_limit_dialect and you "
988         . 'have not supplied an explicit limit_dialect in your connection_info. '
989         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
990         . 'databases but can be (and often is) painfully slow.'
991         );
992
993         'GenericSubQ';
994       }
995     ;
996
997     $self->_sql_maker($sql_maker_class->new(
998       bindtype=>'columns',
999       array_datatypes => 1,
1000       limit_dialect => $dialect,
1001       name_sep => '.',
1002       %opts,
1003     ));
1004   }
1005   return $self->_sql_maker;
1006 }
1007
1008 # nothing to do by default
1009 sub _rebless {}
1010 sub _init {}
1011
1012 sub _populate_dbh {
1013   my ($self) = @_;
1014
1015   my @info = @{$self->_dbi_connect_info || []};
1016   $self->_dbh(undef); # in case ->connected failed we might get sent here
1017   $self->_dbh_details({}); # reset everything we know
1018
1019   $self->_dbh($self->_connect(@info));
1020
1021   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1022
1023   $self->_determine_driver;
1024
1025   # Always set the transaction depth on connect, since
1026   #  there is no transaction in progress by definition
1027   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1028
1029   $self->_run_connection_actions unless $self->{_in_determine_driver};
1030 }
1031
1032 sub _run_connection_actions {
1033   my $self = shift;
1034   my @actions;
1035
1036   push @actions, ( $self->on_connect_call || () );
1037   push @actions, $self->_parse_connect_do ('on_connect_do');
1038
1039   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1040 }
1041
1042
1043
1044 sub set_use_dbms_capability {
1045   $_[0]->set_inherited ($_[1], $_[2]);
1046 }
1047
1048 sub get_use_dbms_capability {
1049   my ($self, $capname) = @_;
1050
1051   my $use = $self->get_inherited ($capname);
1052   return defined $use
1053     ? $use
1054     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1055   ;
1056 }
1057
1058 sub set_dbms_capability {
1059   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1060 }
1061
1062 sub get_dbms_capability {
1063   my ($self, $capname) = @_;
1064
1065   my $cap = $self->_dbh_details->{capability}{$capname};
1066
1067   unless (defined $cap) {
1068     if (my $meth = $self->can ("_determine$capname")) {
1069       $cap = $self->$meth ? 1 : 0;
1070     }
1071     else {
1072       $cap = 0;
1073     }
1074
1075     $self->set_dbms_capability ($capname, $cap);
1076   }
1077
1078   return $cap;
1079 }
1080
1081 sub _server_info {
1082   my $self = shift;
1083
1084   my $info;
1085   unless ($info = $self->_dbh_details->{info}) {
1086
1087     $info = {};
1088
1089     my $server_version = try { $self->_get_server_version };
1090
1091     if (defined $server_version) {
1092       $info->{dbms_version} = $server_version;
1093
1094       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1095       my @verparts = split (/\./, $numeric_version);
1096       if (
1097         @verparts
1098           &&
1099         $verparts[0] <= 999
1100       ) {
1101         # consider only up to 3 version parts, iff not more than 3 digits
1102         my @use_parts;
1103         while (@verparts && @use_parts < 3) {
1104           my $p = shift @verparts;
1105           last if $p > 999;
1106           push @use_parts, $p;
1107         }
1108         push @use_parts, 0 while @use_parts < 3;
1109
1110         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1111       }
1112     }
1113
1114     $self->_dbh_details->{info} = $info;
1115   }
1116
1117   return $info;
1118 }
1119
1120 sub _get_server_version {
1121   shift->_get_dbh->get_info(18);
1122 }
1123
1124 sub _determine_driver {
1125   my ($self) = @_;
1126
1127   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1128     my $started_connected = 0;
1129     local $self->{_in_determine_driver} = 1;
1130
1131     if (ref($self) eq __PACKAGE__) {
1132       my $driver;
1133       if ($self->_dbh) { # we are connected
1134         $driver = $self->_dbh->{Driver}{Name};
1135         $started_connected = 1;
1136       } else {
1137         # if connect_info is a CODEREF, we have no choice but to connect
1138         if (ref $self->_dbi_connect_info->[0] &&
1139             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1140           $self->_populate_dbh;
1141           $driver = $self->_dbh->{Driver}{Name};
1142         }
1143         else {
1144           # try to use dsn to not require being connected, the driver may still
1145           # force a connection in _rebless to determine version
1146           # (dsn may not be supplied at all if all we do is make a mock-schema)
1147           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1148           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1149           $driver ||= $ENV{DBI_DRIVER};
1150         }
1151       }
1152
1153       if ($driver) {
1154         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1155         if ($self->load_optional_class($storage_class)) {
1156           mro::set_mro($storage_class, 'c3');
1157           bless $self, $storage_class;
1158           $self->_rebless();
1159         }
1160       }
1161     }
1162
1163     $self->_driver_determined(1);
1164
1165     $self->_init; # run driver-specific initializations
1166
1167     $self->_run_connection_actions
1168         if !$started_connected && defined $self->_dbh;
1169   }
1170 }
1171
1172 sub _do_connection_actions {
1173   my $self          = shift;
1174   my $method_prefix = shift;
1175   my $call          = shift;
1176
1177   if (not ref($call)) {
1178     my $method = $method_prefix . $call;
1179     $self->$method(@_);
1180   } elsif (ref($call) eq 'CODE') {
1181     $self->$call(@_);
1182   } elsif (ref($call) eq 'ARRAY') {
1183     if (ref($call->[0]) ne 'ARRAY') {
1184       $self->_do_connection_actions($method_prefix, $_) for @$call;
1185     } else {
1186       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1187     }
1188   } else {
1189     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1190   }
1191
1192   return $self;
1193 }
1194
1195 sub connect_call_do_sql {
1196   my $self = shift;
1197   $self->_do_query(@_);
1198 }
1199
1200 sub disconnect_call_do_sql {
1201   my $self = shift;
1202   $self->_do_query(@_);
1203 }
1204
1205 # override in db-specific backend when necessary
1206 sub connect_call_datetime_setup { 1 }
1207
1208 sub _do_query {
1209   my ($self, $action) = @_;
1210
1211   if (ref $action eq 'CODE') {
1212     $action = $action->($self);
1213     $self->_do_query($_) foreach @$action;
1214   }
1215   else {
1216     # Most debuggers expect ($sql, @bind), so we need to exclude
1217     # the attribute hash which is the second argument to $dbh->do
1218     # furthermore the bind values are usually to be presented
1219     # as named arrayref pairs, so wrap those here too
1220     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1221     my $sql = shift @do_args;
1222     my $attrs = shift @do_args;
1223     my @bind = map { [ undef, $_ ] } @do_args;
1224
1225     $self->_query_start($sql, @bind);
1226     $self->_get_dbh->do($sql, $attrs, @do_args);
1227     $self->_query_end($sql, @bind);
1228   }
1229
1230   return $self;
1231 }
1232
1233 sub _connect {
1234   my ($self, @info) = @_;
1235
1236   $self->throw_exception("You failed to provide any connection info")
1237     if !@info;
1238
1239   my ($old_connect_via, $dbh);
1240
1241   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1242     $old_connect_via = $DBI::connect_via;
1243     $DBI::connect_via = 'connect';
1244   }
1245
1246   try {
1247     if(ref $info[0] eq 'CODE') {
1248        $dbh = $info[0]->();
1249     }
1250     else {
1251        $dbh = DBI->connect(@info);
1252     }
1253
1254     if (!$dbh) {
1255       die $DBI::errstr;
1256     }
1257
1258     unless ($self->unsafe) {
1259
1260       # this odd anonymous coderef dereference is in fact really
1261       # necessary to avoid the unwanted effect described in perl5
1262       # RT#75792
1263       sub {
1264         my $weak_self = $_[0];
1265         weaken $weak_self;
1266
1267         $_[1]->{HandleError} = sub {
1268           if ($weak_self) {
1269             $weak_self->throw_exception("DBI Exception: $_[0]");
1270           }
1271           else {
1272             # the handler may be invoked by something totally out of
1273             # the scope of DBIC
1274             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1275           }
1276         };
1277       }->($self, $dbh);
1278
1279       $dbh->{ShowErrorStatement} = 1;
1280       $dbh->{RaiseError} = 1;
1281       $dbh->{PrintError} = 0;
1282     }
1283   }
1284   catch {
1285     $self->throw_exception("DBI Connection failed: $_")
1286   }
1287   finally {
1288     $DBI::connect_via = $old_connect_via if $old_connect_via;
1289   };
1290
1291   $self->_dbh_autocommit($dbh->{AutoCommit});
1292   $dbh;
1293 }
1294
1295 sub svp_begin {
1296   my ($self, $name) = @_;
1297
1298   $name = $self->_svp_generate_name
1299     unless defined $name;
1300
1301   $self->throw_exception ("You can't use savepoints outside a transaction")
1302     if $self->{transaction_depth} == 0;
1303
1304   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1305     unless $self->can('_svp_begin');
1306
1307   push @{ $self->{savepoints} }, $name;
1308
1309   $self->debugobj->svp_begin($name) if $self->debug;
1310
1311   return $self->_svp_begin($name);
1312 }
1313
1314 sub svp_release {
1315   my ($self, $name) = @_;
1316
1317   $self->throw_exception ("You can't use savepoints outside a transaction")
1318     if $self->{transaction_depth} == 0;
1319
1320   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1321     unless $self->can('_svp_release');
1322
1323   if (defined $name) {
1324     $self->throw_exception ("Savepoint '$name' does not exist")
1325       unless grep { $_ eq $name } @{ $self->{savepoints} };
1326
1327     # Dig through the stack until we find the one we are releasing.  This keeps
1328     # the stack up to date.
1329     my $svp;
1330
1331     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1332   } else {
1333     $name = pop @{ $self->{savepoints} };
1334   }
1335
1336   $self->debugobj->svp_release($name) if $self->debug;
1337
1338   return $self->_svp_release($name);
1339 }
1340
1341 sub svp_rollback {
1342   my ($self, $name) = @_;
1343
1344   $self->throw_exception ("You can't use savepoints outside a transaction")
1345     if $self->{transaction_depth} == 0;
1346
1347   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1348     unless $self->can('_svp_rollback');
1349
1350   if (defined $name) {
1351       # If they passed us a name, verify that it exists in the stack
1352       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1353           $self->throw_exception("Savepoint '$name' does not exist!");
1354       }
1355
1356       # Dig through the stack until we find the one we are releasing.  This keeps
1357       # the stack up to date.
1358       while(my $s = pop(@{ $self->{savepoints} })) {
1359           last if($s eq $name);
1360       }
1361       # Add the savepoint back to the stack, as a rollback doesn't remove the
1362       # named savepoint, only everything after it.
1363       push(@{ $self->{savepoints} }, $name);
1364   } else {
1365       # We'll assume they want to rollback to the last savepoint
1366       $name = $self->{savepoints}->[-1];
1367   }
1368
1369   $self->debugobj->svp_rollback($name) if $self->debug;
1370
1371   return $self->_svp_rollback($name);
1372 }
1373
1374 sub _svp_generate_name {
1375   my ($self) = @_;
1376   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1377 }
1378
1379 sub txn_begin {
1380   my $self = shift;
1381
1382   # this means we have not yet connected and do not know the AC status
1383   # (e.g. coderef $dbh)
1384   if (! defined $self->_dbh_autocommit) {
1385     $self->ensure_connected;
1386   }
1387   # otherwise re-connect on pid changes, so
1388   # that the txn_depth is adjusted properly
1389   # the lightweight _get_dbh is good enoug here
1390   # (only superficial handle check, no pings)
1391   else {
1392     $self->_get_dbh;
1393   }
1394
1395   if($self->transaction_depth == 0) {
1396     $self->debugobj->txn_begin()
1397       if $self->debug;
1398     $self->_dbh_begin_work;
1399   }
1400   elsif ($self->auto_savepoint) {
1401     $self->svp_begin;
1402   }
1403   $self->{transaction_depth}++;
1404 }
1405
1406 sub _dbh_begin_work {
1407   my $self = shift;
1408
1409   # if the user is utilizing txn_do - good for him, otherwise we need to
1410   # ensure that the $dbh is healthy on BEGIN.
1411   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1412   # will be replaced by a failure of begin_work itself (which will be
1413   # then retried on reconnect)
1414   if ($self->{_in_dbh_do}) {
1415     $self->_dbh->begin_work;
1416   } else {
1417     $self->dbh_do(sub { $_[1]->begin_work });
1418   }
1419 }
1420
1421 sub txn_commit {
1422   my $self = shift;
1423   if (! $self->_dbh) {
1424     $self->throw_exception('cannot COMMIT on a disconnected handle');
1425   }
1426   elsif ($self->{transaction_depth} == 1) {
1427     $self->debugobj->txn_commit()
1428       if ($self->debug);
1429     $self->_dbh_commit;
1430     $self->{transaction_depth} = 0
1431       if $self->_dbh_autocommit;
1432   }
1433   elsif($self->{transaction_depth} > 1) {
1434     $self->{transaction_depth}--;
1435     $self->svp_release
1436       if $self->auto_savepoint;
1437   }
1438   elsif (! $self->_dbh->FETCH('AutoCommit') ) {
1439
1440     carp "Storage transaction_depth $self->{transaction_depth} does not match "
1441         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1442
1443     $self->debugobj->txn_commit()
1444       if ($self->debug);
1445     $self->_dbh_commit;
1446     $self->{transaction_depth} = 0
1447       if $self->_dbh_autocommit;
1448   }
1449   else {
1450     $self->throw_exception( 'Refusing to commit without a started transaction' );
1451   }
1452 }
1453
1454 sub _dbh_commit {
1455   my $self = shift;
1456   my $dbh  = $self->_dbh
1457     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1458   $dbh->commit;
1459 }
1460
1461 sub txn_rollback {
1462   my $self = shift;
1463   my $dbh = $self->_dbh;
1464   try {
1465     if ($self->{transaction_depth} == 1) {
1466       $self->debugobj->txn_rollback()
1467         if ($self->debug);
1468       $self->{transaction_depth} = 0
1469         if $self->_dbh_autocommit;
1470       $self->_dbh_rollback;
1471     }
1472     elsif($self->{transaction_depth} > 1) {
1473       $self->{transaction_depth}--;
1474       if ($self->auto_savepoint) {
1475         $self->svp_rollback;
1476         $self->svp_release;
1477       }
1478     }
1479     else {
1480       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1481     }
1482   }
1483   catch {
1484     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1485
1486     if ($_ !~ /$exception_class/) {
1487       # ensure that a failed rollback resets the transaction depth
1488       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1489     }
1490
1491     $self->throw_exception($_)
1492   };
1493 }
1494
1495 sub _dbh_rollback {
1496   my $self = shift;
1497   my $dbh  = $self->_dbh
1498     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1499   $dbh->rollback;
1500 }
1501
1502 # This used to be the top-half of _execute.  It was split out to make it
1503 #  easier to override in NoBindVars without duping the rest.  It takes up
1504 #  all of _execute's args, and emits $sql, @bind.
1505 sub _prep_for_execute {
1506   my ($self, $op, $extra_bind, $ident, $args) = @_;
1507
1508   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1509     $ident = $ident->from();
1510   }
1511
1512   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1513
1514   unshift(@bind,
1515     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1516       if $extra_bind;
1517   return ($sql, \@bind);
1518 }
1519
1520
1521 sub _fix_bind_params {
1522     my ($self, @bind) = @_;
1523
1524     ### Turn @bind from something like this:
1525     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1526     ### to this:
1527     ###   ( "'1'", "'1'", "'3'" )
1528     return
1529         map {
1530             if ( defined( $_ && $_->[1] ) ) {
1531                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1532             }
1533             else { q{NULL}; }
1534         } @bind;
1535 }
1536
1537 sub _query_start {
1538     my ( $self, $sql, @bind ) = @_;
1539
1540     if ( $self->debug ) {
1541         @bind = $self->_fix_bind_params(@bind);
1542
1543         $self->debugobj->query_start( $sql, @bind );
1544     }
1545 }
1546
1547 sub _query_end {
1548     my ( $self, $sql, @bind ) = @_;
1549
1550     if ( $self->debug ) {
1551         @bind = $self->_fix_bind_params(@bind);
1552         $self->debugobj->query_end( $sql, @bind );
1553     }
1554 }
1555
1556 sub _dbh_execute {
1557   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1558
1559   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1560
1561   $self->_query_start( $sql, @$bind );
1562
1563   my $sth = $self->sth($sql,$op);
1564
1565   my $placeholder_index = 1;
1566
1567   foreach my $bound (@$bind) {
1568     my $attributes = {};
1569     my($column_name, @data) = @$bound;
1570
1571     if ($bind_attributes) {
1572       $attributes = $bind_attributes->{$column_name}
1573       if defined $bind_attributes->{$column_name};
1574     }
1575
1576     foreach my $data (@data) {
1577       my $ref = ref $data;
1578
1579       if ($ref and overload::Method($data, '""') ) {
1580         $data = "$data";
1581       }
1582       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1583         $sth->bind_param_inout(
1584           $placeholder_index++,
1585           $data,
1586           $self->_max_column_bytesize($ident, $column_name),
1587           $attributes
1588         );
1589         next;
1590       }
1591
1592       $sth->bind_param($placeholder_index++, $data, $attributes);
1593     }
1594   }
1595
1596   # Can this fail without throwing an exception anyways???
1597   my $rv = $sth->execute();
1598   $self->throw_exception(
1599     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1600   ) if !$rv;
1601
1602   $self->_query_end( $sql, @$bind );
1603
1604   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1605 }
1606
1607 sub _execute {
1608     my $self = shift;
1609     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1610 }
1611
1612 sub _prefetch_autovalues {
1613   my ($self, $source, $to_insert) = @_;
1614
1615   my $colinfo = $source->columns_info;
1616
1617   my %values;
1618   for my $col (keys %$colinfo) {
1619     if (
1620       $colinfo->{$col}{auto_nextval}
1621         and
1622       (
1623         ! exists $to_insert->{$col}
1624           or
1625         ref $to_insert->{$col} eq 'SCALAR'
1626       )
1627     ) {
1628       $values{$col} = $self->_sequence_fetch(
1629         'NEXTVAL',
1630         ( $colinfo->{$col}{sequence} ||=
1631             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1632         ),
1633       );
1634     }
1635   }
1636
1637   \%values;
1638 }
1639
1640 sub insert {
1641   my ($self, $source, $to_insert) = @_;
1642
1643   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1644
1645   # fuse the values
1646   $to_insert = { %$to_insert, %$prefetched_values };
1647
1648   # list of primary keys we try to fetch from the database
1649   # both not-exsists and scalarrefs are considered
1650   my %fetch_pks;
1651   for ($source->primary_columns) {
1652     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1653       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1654   }
1655
1656   my ($sqla_opts, @ir_container);
1657   if ($self->_use_insert_returning) {
1658
1659     # retain order as declared in the resultsource
1660     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1661       push @{$sqla_opts->{returning}}, $_;
1662       $sqla_opts->{returning_container} = \@ir_container
1663         if $self->_use_insert_returning_bound;
1664     }
1665   }
1666
1667   my $bind_attributes = $self->source_bind_attributes($source);
1668
1669   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1670
1671   my %returned_cols;
1672
1673   if (my $retlist = $sqla_opts->{returning}) {
1674     @ir_container = try {
1675       local $SIG{__WARN__} = sub {};
1676       my @r = $sth->fetchrow_array;
1677       $sth->finish;
1678       @r;
1679     } unless @ir_container;
1680
1681     @returned_cols{@$retlist} = @ir_container if @ir_container;
1682   }
1683
1684   return { %$prefetched_values, %returned_cols };
1685 }
1686
1687
1688 ## Currently it is assumed that all values passed will be "normal", i.e. not
1689 ## scalar refs, or at least, all the same type as the first set, the statement is
1690 ## only prepped once.
1691 sub insert_bulk {
1692   my ($self, $source, $cols, $data) = @_;
1693
1694   my %colvalues;
1695   @colvalues{@$cols} = (0..$#$cols);
1696
1697   for my $i (0..$#$cols) {
1698     my $first_val = $data->[0][$i];
1699     next unless ref $first_val eq 'SCALAR';
1700
1701     $colvalues{ $cols->[$i] } = $first_val;
1702   }
1703
1704   # check for bad data and stringify stringifiable objects
1705   my $bad_slice = sub {
1706     my ($msg, $col_idx, $slice_idx) = @_;
1707     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1708       $msg,
1709       $cols->[$col_idx],
1710       do {
1711         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1712         Dumper {
1713           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1714         },
1715       }
1716     );
1717   };
1718
1719   for my $datum_idx (0..$#$data) {
1720     my $datum = $data->[$datum_idx];
1721
1722     for my $col_idx (0..$#$cols) {
1723       my $val            = $datum->[$col_idx];
1724       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1725       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1726
1727       if ($is_literal_sql) {
1728         if (not ref $val) {
1729           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1730         }
1731         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1732           $bad_slice->("$reftype reference found where literal SQL expected",
1733             $col_idx, $datum_idx);
1734         }
1735         elsif ($$val ne $$sqla_bind){
1736           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1737             $col_idx, $datum_idx);
1738         }
1739       }
1740       elsif (my $reftype = ref $val) {
1741         require overload;
1742         if (overload::Method($val, '""')) {
1743           $datum->[$col_idx] = "".$val;
1744         }
1745         else {
1746           $bad_slice->("$reftype reference found where bind expected",
1747             $col_idx, $datum_idx);
1748         }
1749       }
1750     }
1751   }
1752
1753   my ($sql, $bind) = $self->_prep_for_execute (
1754     'insert', undef, $source, [\%colvalues]
1755   );
1756
1757   if (! @$bind) {
1758     # if the bindlist is empty - make sure all "values" are in fact
1759     # literal scalarrefs. If not the case this means the storage ate
1760     # them away (e.g. the NoBindVars component) and interpolated them
1761     # directly into the SQL. This obviosly can't be good for multi-inserts
1762
1763     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1764       if first { ref $_ ne 'SCALAR' } values %colvalues;
1765   }
1766
1767   # neither _execute_array, nor _execute_inserts_with_no_binds are
1768   # atomic (even if _execute _array is a single call). Thus a safety
1769   # scope guard
1770   my $guard = $self->txn_scope_guard;
1771
1772   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1773   my $sth = $self->sth($sql);
1774   my $rv = do {
1775     if (@$bind) {
1776       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1777       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1778     }
1779     else {
1780       # bind_param_array doesn't work if there are no binds
1781       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1782     }
1783   };
1784
1785   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1786
1787   $guard->commit;
1788
1789   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1790 }
1791
1792 sub _execute_array {
1793   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1794
1795   ## This must be an arrayref, else nothing works!
1796   my $tuple_status = [];
1797
1798   ## Get the bind_attributes, if any exist
1799   my $bind_attributes = $self->source_bind_attributes($source);
1800
1801   ## Bind the values and execute
1802   my $placeholder_index = 1;
1803
1804   foreach my $bound (@$bind) {
1805
1806     my $attributes = {};
1807     my ($column_name, $data_index) = @$bound;
1808
1809     if( $bind_attributes ) {
1810       $attributes = $bind_attributes->{$column_name}
1811       if defined $bind_attributes->{$column_name};
1812     }
1813
1814     my @data = map { $_->[$data_index] } @$data;
1815
1816     $sth->bind_param_array(
1817       $placeholder_index,
1818       [@data],
1819       (%$attributes ?  $attributes : ()),
1820     );
1821     $placeholder_index++;
1822   }
1823
1824   my ($rv, $err);
1825   try {
1826     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1827   }
1828   catch {
1829     $err = shift;
1830   };
1831
1832   # Statement must finish even if there was an exception.
1833   try {
1834     $sth->finish
1835   }
1836   catch {
1837     $err = shift unless defined $err
1838   };
1839
1840   $err = $sth->errstr
1841     if (! defined $err and $sth->err);
1842
1843   if (defined $err) {
1844     my $i = 0;
1845     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1846
1847     $self->throw_exception("Unexpected populate error: $err")
1848       if ($i > $#$tuple_status);
1849
1850     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1851       ($tuple_status->[$i][1] || $err),
1852       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1853     );
1854   }
1855
1856   return $rv;
1857 }
1858
1859 sub _dbh_execute_array {
1860     my ($self, $sth, $tuple_status, @extra) = @_;
1861
1862     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1863 }
1864
1865 sub _dbh_execute_inserts_with_no_binds {
1866   my ($self, $sth, $count) = @_;
1867
1868   my $err;
1869   try {
1870     my $dbh = $self->_get_dbh;
1871     local $dbh->{RaiseError} = 1;
1872     local $dbh->{PrintError} = 0;
1873
1874     $sth->execute foreach 1..$count;
1875   }
1876   catch {
1877     $err = shift;
1878   };
1879
1880   # Make sure statement is finished even if there was an exception.
1881   try {
1882     $sth->finish
1883   }
1884   catch {
1885     $err = shift unless defined $err;
1886   };
1887
1888   $self->throw_exception($err) if defined $err;
1889
1890   return $count;
1891 }
1892
1893 sub update {
1894   my ($self, $source, @args) = @_;
1895
1896   my $bind_attrs = $self->source_bind_attributes($source);
1897
1898   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1899 }
1900
1901
1902 sub delete {
1903   my ($self, $source, @args) = @_;
1904
1905   my $bind_attrs = $self->source_bind_attributes($source);
1906
1907   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1908 }
1909
1910 # We were sent here because the $rs contains a complex search
1911 # which will require a subquery to select the correct rows
1912 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1913 #
1914 # Generating a single PK column subquery is trivial and supported
1915 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1916 # Look at _multipk_update_delete()
1917 sub _subq_update_delete {
1918   my $self = shift;
1919   my ($rs, $op, $values) = @_;
1920
1921   my $rsrc = $rs->result_source;
1922
1923   # quick check if we got a sane rs on our hands
1924   my @pcols = $rsrc->_pri_cols;
1925
1926   my $sel = $rs->_resolved_attrs->{select};
1927   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1928
1929   if (
1930       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1931         ne
1932       join ("\x00", sort @$sel )
1933   ) {
1934     $self->throw_exception (
1935       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1936     );
1937   }
1938
1939   if (@pcols == 1) {
1940     return $self->$op (
1941       $rsrc,
1942       $op eq 'update' ? $values : (),
1943       { $pcols[0] => { -in => $rs->as_query } },
1944     );
1945   }
1946
1947   else {
1948     return $self->_multipk_update_delete (@_);
1949   }
1950 }
1951
1952 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1953 # resultset update/delete involving subqueries. So by default resort
1954 # to simple (and inefficient) delete_all style per-row opearations,
1955 # while allowing specific storages to override this with a faster
1956 # implementation.
1957 #
1958 sub _multipk_update_delete {
1959   return shift->_per_row_update_delete (@_);
1960 }
1961
1962 # This is the default loop used to delete/update rows for multi PK
1963 # resultsets, and used by mysql exclusively (because it can't do anything
1964 # else).
1965 #
1966 # We do not use $row->$op style queries, because resultset update/delete
1967 # is not expected to cascade (this is what delete_all/update_all is for).
1968 #
1969 # There should be no race conditions as the entire operation is rolled
1970 # in a transaction.
1971 #
1972 sub _per_row_update_delete {
1973   my $self = shift;
1974   my ($rs, $op, $values) = @_;
1975
1976   my $rsrc = $rs->result_source;
1977   my @pcols = $rsrc->_pri_cols;
1978
1979   my $guard = $self->txn_scope_guard;
1980
1981   # emulate the return value of $sth->execute for non-selects
1982   my $row_cnt = '0E0';
1983
1984   my $subrs_cur = $rs->cursor;
1985   my @all_pk = $subrs_cur->all;
1986   for my $pks ( @all_pk) {
1987
1988     my $cond;
1989     for my $i (0.. $#pcols) {
1990       $cond->{$pcols[$i]} = $pks->[$i];
1991     }
1992
1993     $self->$op (
1994       $rsrc,
1995       $op eq 'update' ? $values : (),
1996       $cond,
1997     );
1998
1999     $row_cnt++;
2000   }
2001
2002   $guard->commit;
2003
2004   return $row_cnt;
2005 }
2006
2007 sub _select {
2008   my $self = shift;
2009   $self->_execute($self->_select_args(@_));
2010 }
2011
2012 sub _select_args_to_query {
2013   my $self = shift;
2014
2015   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2016   #  = $self->_select_args($ident, $select, $cond, $attrs);
2017   my ($op, $bind, $ident, $bind_attrs, @args) =
2018     $self->_select_args(@_);
2019
2020   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2021   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2022   $prepared_bind ||= [];
2023
2024   return wantarray
2025     ? ($sql, $prepared_bind, $bind_attrs)
2026     : \[ "($sql)", @$prepared_bind ]
2027   ;
2028 }
2029
2030 sub _select_args {
2031   my ($self, $ident, $select, $where, $attrs) = @_;
2032
2033   my $sql_maker = $self->sql_maker;
2034   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2035
2036   $attrs = {
2037     %$attrs,
2038     select => $select,
2039     from => $ident,
2040     where => $where,
2041     $rs_alias && $alias2source->{$rs_alias}
2042       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2043       : ()
2044     ,
2045   };
2046
2047   # calculate bind_attrs before possible $ident mangling
2048   my $bind_attrs = {};
2049   for my $alias (keys %$alias2source) {
2050     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2051     for my $col (keys %$bindtypes) {
2052
2053       my $fqcn = join ('.', $alias, $col);
2054       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2055
2056       # Unqialified column names are nice, but at the same time can be
2057       # rather ambiguous. What we do here is basically go along with
2058       # the loop, adding an unqualified column slot to $bind_attrs,
2059       # alongside the fully qualified name. As soon as we encounter
2060       # another column by that name (which would imply another table)
2061       # we unset the unqualified slot and never add any info to it
2062       # to avoid erroneous type binding. If this happens the users
2063       # only choice will be to fully qualify his column name
2064
2065       if (exists $bind_attrs->{$col}) {
2066         $bind_attrs->{$col} = {};
2067       }
2068       else {
2069         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2070       }
2071     }
2072   }
2073
2074   # Sanity check the attributes (SQLMaker does it too, but
2075   # in case of a software_limit we'll never reach there)
2076   if (defined $attrs->{offset}) {
2077     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2078       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2079   }
2080   $attrs->{offset} ||= 0;
2081
2082   if (defined $attrs->{rows}) {
2083     $self->throw_exception("The rows attribute must be a positive integer if present")
2084       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2085   }
2086   elsif ($attrs->{offset}) {
2087     # MySQL actually recommends this approach.  I cringe.
2088     $attrs->{rows} = $sql_maker->__max_int;
2089   }
2090
2091   my @limit;
2092
2093   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2094   # storage, unless software limit was requested
2095   if (
2096     #limited has_many
2097     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2098        ||
2099     # grouped prefetch (to satisfy group_by == select)
2100     ( $attrs->{group_by}
2101         &&
2102       @{$attrs->{group_by}}
2103         &&
2104       $attrs->{_prefetch_selector_range}
2105     )
2106   ) {
2107     ($ident, $select, $where, $attrs)
2108       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2109   }
2110   elsif (! $attrs->{software_limit} ) {
2111     push @limit, $attrs->{rows}, $attrs->{offset};
2112   }
2113
2114   # try to simplify the joinmap further (prune unreferenced type-single joins)
2115   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2116
2117 ###
2118   # This would be the point to deflate anything found in $where
2119   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2120   # expect a row object. And all we have is a resultsource (it is trivial
2121   # to extract deflator coderefs via $alias2source above).
2122   #
2123   # I don't see a way forward other than changing the way deflators are
2124   # invoked, and that's just bad...
2125 ###
2126
2127   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2128 }
2129
2130 # Returns a counting SELECT for a simple count
2131 # query. Abstracted so that a storage could override
2132 # this to { count => 'firstcol' } or whatever makes
2133 # sense as a performance optimization
2134 sub _count_select {
2135   #my ($self, $source, $rs_attrs) = @_;
2136   return { count => '*' };
2137 }
2138
2139
2140 sub source_bind_attributes {
2141   my ($self, $source) = @_;
2142
2143   my $bind_attributes;
2144
2145   my $colinfo = $source->columns_info;
2146
2147   for my $col (keys %$colinfo) {
2148     if (my $dt = $colinfo->{$col}{data_type} ) {
2149       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2150     }
2151   }
2152
2153   return $bind_attributes;
2154 }
2155
2156 =head2 select
2157
2158 =over 4
2159
2160 =item Arguments: $ident, $select, $condition, $attrs
2161
2162 =back
2163
2164 Handle a SQL select statement.
2165
2166 =cut
2167
2168 sub select {
2169   my $self = shift;
2170   my ($ident, $select, $condition, $attrs) = @_;
2171   return $self->cursor_class->new($self, \@_, $attrs);
2172 }
2173
2174 sub select_single {
2175   my $self = shift;
2176   my ($rv, $sth, @bind) = $self->_select(@_);
2177   my @row = $sth->fetchrow_array;
2178   my @nextrow = $sth->fetchrow_array if @row;
2179   if(@row && @nextrow) {
2180     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2181   }
2182   # Need to call finish() to work round broken DBDs
2183   $sth->finish();
2184   return @row;
2185 }
2186
2187 =head2 sql_limit_dialect
2188
2189 This is an accessor for the default SQL limit dialect used by a particular
2190 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2191 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2192 see L<DBIx::Class::SQLMaker::LimitDialects>.
2193
2194 =head2 sth
2195
2196 =over 4
2197
2198 =item Arguments: $sql
2199
2200 =back
2201
2202 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2203
2204 =cut
2205
2206 sub _dbh_sth {
2207   my ($self, $dbh, $sql) = @_;
2208
2209   # 3 is the if_active parameter which avoids active sth re-use
2210   my $sth = $self->disable_sth_caching
2211     ? $dbh->prepare($sql)
2212     : $dbh->prepare_cached($sql, {}, 3);
2213
2214   # XXX You would think RaiseError would make this impossible,
2215   #  but apparently that's not true :(
2216   $self->throw_exception($dbh->errstr) if !$sth;
2217
2218   $sth;
2219 }
2220
2221 sub sth {
2222   my ($self, $sql) = @_;
2223   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2224 }
2225
2226 sub _dbh_columns_info_for {
2227   my ($self, $dbh, $table) = @_;
2228
2229   if ($dbh->can('column_info')) {
2230     my %result;
2231     my $caught;
2232     try {
2233       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2234       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2235       $sth->execute();
2236       while ( my $info = $sth->fetchrow_hashref() ){
2237         my %column_info;
2238         $column_info{data_type}   = $info->{TYPE_NAME};
2239         $column_info{size}      = $info->{COLUMN_SIZE};
2240         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2241         $column_info{default_value} = $info->{COLUMN_DEF};
2242         my $col_name = $info->{COLUMN_NAME};
2243         $col_name =~ s/^\"(.*)\"$/$1/;
2244
2245         $result{$col_name} = \%column_info;
2246       }
2247     } catch {
2248       $caught = 1;
2249     };
2250     return \%result if !$caught && scalar keys %result;
2251   }
2252
2253   my %result;
2254   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2255   $sth->execute;
2256   my @columns = @{$sth->{NAME_lc}};
2257   for my $i ( 0 .. $#columns ){
2258     my %column_info;
2259     $column_info{data_type} = $sth->{TYPE}->[$i];
2260     $column_info{size} = $sth->{PRECISION}->[$i];
2261     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2262
2263     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2264       $column_info{data_type} = $1;
2265       $column_info{size}    = $2;
2266     }
2267
2268     $result{$columns[$i]} = \%column_info;
2269   }
2270   $sth->finish;
2271
2272   foreach my $col (keys %result) {
2273     my $colinfo = $result{$col};
2274     my $type_num = $colinfo->{data_type};
2275     my $type_name;
2276     if(defined $type_num && $dbh->can('type_info')) {
2277       my $type_info = $dbh->type_info($type_num);
2278       $type_name = $type_info->{TYPE_NAME} if $type_info;
2279       $colinfo->{data_type} = $type_name if $type_name;
2280     }
2281   }
2282
2283   return \%result;
2284 }
2285
2286 sub columns_info_for {
2287   my ($self, $table) = @_;
2288   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2289 }
2290
2291 =head2 last_insert_id
2292
2293 Return the row id of the last insert.
2294
2295 =cut
2296
2297 sub _dbh_last_insert_id {
2298     my ($self, $dbh, $source, $col) = @_;
2299
2300     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2301
2302     return $id if defined $id;
2303
2304     my $class = ref $self;
2305     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2306 }
2307
2308 sub last_insert_id {
2309   my $self = shift;
2310   $self->_dbh_last_insert_id ($self->_dbh, @_);
2311 }
2312
2313 =head2 _native_data_type
2314
2315 =over 4
2316
2317 =item Arguments: $type_name
2318
2319 =back
2320
2321 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2322 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2323 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2324
2325 The default implementation returns C<undef>, implement in your Storage driver if
2326 you need this functionality.
2327
2328 Should map types from other databases to the native RDBMS type, for example
2329 C<VARCHAR2> to C<VARCHAR>.
2330
2331 Types with modifiers should map to the underlying data type. For example,
2332 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2333
2334 Composite types should map to the container type, for example
2335 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2336
2337 =cut
2338
2339 sub _native_data_type {
2340   #my ($self, $data_type) = @_;
2341   return undef
2342 }
2343
2344 # Check if placeholders are supported at all
2345 sub _determine_supports_placeholders {
2346   my $self = shift;
2347   my $dbh  = $self->_get_dbh;
2348
2349   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2350   # but it is inaccurate more often than not
2351   return try {
2352     local $dbh->{PrintError} = 0;
2353     local $dbh->{RaiseError} = 1;
2354     $dbh->do('select ?', {}, 1);
2355     1;
2356   }
2357   catch {
2358     0;
2359   };
2360 }
2361
2362 # Check if placeholders bound to non-string types throw exceptions
2363 #
2364 sub _determine_supports_typeless_placeholders {
2365   my $self = shift;
2366   my $dbh  = $self->_get_dbh;
2367
2368   return try {
2369     local $dbh->{PrintError} = 0;
2370     local $dbh->{RaiseError} = 1;
2371     # this specifically tests a bind that is NOT a string
2372     $dbh->do('select 1 where 1 = ?', {}, 1);
2373     1;
2374   }
2375   catch {
2376     0;
2377   };
2378 }
2379
2380 =head2 sqlt_type
2381
2382 Returns the database driver name.
2383
2384 =cut
2385
2386 sub sqlt_type {
2387   shift->_get_dbh->{Driver}->{Name};
2388 }
2389
2390 =head2 bind_attribute_by_data_type
2391
2392 Given a datatype from column info, returns a database specific bind
2393 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2394 let the database planner just handle it.
2395
2396 Generally only needed for special case column types, like bytea in postgres.
2397
2398 =cut
2399
2400 sub bind_attribute_by_data_type {
2401     return;
2402 }
2403
2404 =head2 is_datatype_numeric
2405
2406 Given a datatype from column_info, returns a boolean value indicating if
2407 the current RDBMS considers it a numeric value. This controls how
2408 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2409 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2410 be performed instead of the usual C<eq>.
2411
2412 =cut
2413
2414 sub is_datatype_numeric {
2415   my ($self, $dt) = @_;
2416
2417   return 0 unless $dt;
2418
2419   return $dt =~ /^ (?:
2420     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2421   ) $/ix;
2422 }
2423
2424
2425 =head2 create_ddl_dir
2426
2427 =over 4
2428
2429 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2430
2431 =back
2432
2433 Creates a SQL file based on the Schema, for each of the specified
2434 database engines in C<\@databases> in the given directory.
2435 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2436
2437 Given a previous version number, this will also create a file containing
2438 the ALTER TABLE statements to transform the previous schema into the
2439 current one. Note that these statements may contain C<DROP TABLE> or
2440 C<DROP COLUMN> statements that can potentially destroy data.
2441
2442 The file names are created using the C<ddl_filename> method below, please
2443 override this method in your schema if you would like a different file
2444 name format. For the ALTER file, the same format is used, replacing
2445 $version in the name with "$preversion-$version".
2446
2447 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2448 The most common value for this would be C<< { add_drop_table => 1 } >>
2449 to have the SQL produced include a C<DROP TABLE> statement for each table
2450 created. For quoting purposes supply C<quote_table_names> and
2451 C<quote_field_names>.
2452
2453 If no arguments are passed, then the following default values are assumed:
2454
2455 =over 4
2456
2457 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2458
2459 =item version    - $schema->schema_version
2460
2461 =item directory  - './'
2462
2463 =item preversion - <none>
2464
2465 =back
2466
2467 By default, C<\%sqlt_args> will have
2468
2469  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2470
2471 merged with the hash passed in. To disable any of those features, pass in a
2472 hashref like the following
2473
2474  { ignore_constraint_names => 0, # ... other options }
2475
2476
2477 WARNING: You are strongly advised to check all SQL files created, before applying
2478 them.
2479
2480 =cut
2481
2482 sub create_ddl_dir {
2483   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2484
2485   unless ($dir) {
2486     carp "No directory given, using ./\n";
2487     $dir = './';
2488   } else {
2489       -d $dir
2490         or
2491       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2492         or
2493       $self->throw_exception(
2494         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2495       );
2496   }
2497
2498   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2499
2500   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2501   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2502
2503   my $schema_version = $schema->schema_version || '1.x';
2504   $version ||= $schema_version;
2505
2506   $sqltargs = {
2507     add_drop_table => 1,
2508     ignore_constraint_names => 1,
2509     ignore_index_names => 1,
2510     %{$sqltargs || {}}
2511   };
2512
2513   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2514     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2515   }
2516
2517   my $sqlt = SQL::Translator->new( $sqltargs );
2518
2519   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2520   my $sqlt_schema = $sqlt->translate({ data => $schema })
2521     or $self->throw_exception ($sqlt->error);
2522
2523   foreach my $db (@$databases) {
2524     $sqlt->reset();
2525     $sqlt->{schema} = $sqlt_schema;
2526     $sqlt->producer($db);
2527
2528     my $file;
2529     my $filename = $schema->ddl_filename($db, $version, $dir);
2530     if (-e $filename && ($version eq $schema_version )) {
2531       # if we are dumping the current version, overwrite the DDL
2532       carp "Overwriting existing DDL file - $filename";
2533       unlink($filename);
2534     }
2535
2536     my $output = $sqlt->translate;
2537     if(!$output) {
2538       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2539       next;
2540     }
2541     if(!open($file, ">$filename")) {
2542       $self->throw_exception("Can't open $filename for writing ($!)");
2543       next;
2544     }
2545     print $file $output;
2546     close($file);
2547
2548     next unless ($preversion);
2549
2550     require SQL::Translator::Diff;
2551
2552     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2553     if(!-e $prefilename) {
2554       carp("No previous schema file found ($prefilename)");
2555       next;
2556     }
2557
2558     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2559     if(-e $difffile) {
2560       carp("Overwriting existing diff file - $difffile");
2561       unlink($difffile);
2562     }
2563
2564     my $source_schema;
2565     {
2566       my $t = SQL::Translator->new($sqltargs);
2567       $t->debug( 0 );
2568       $t->trace( 0 );
2569
2570       $t->parser( $db )
2571         or $self->throw_exception ($t->error);
2572
2573       my $out = $t->translate( $prefilename )
2574         or $self->throw_exception ($t->error);
2575
2576       $source_schema = $t->schema;
2577
2578       $source_schema->name( $prefilename )
2579         unless ( $source_schema->name );
2580     }
2581
2582     # The "new" style of producers have sane normalization and can support
2583     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2584     # And we have to diff parsed SQL against parsed SQL.
2585     my $dest_schema = $sqlt_schema;
2586
2587     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2588       my $t = SQL::Translator->new($sqltargs);
2589       $t->debug( 0 );
2590       $t->trace( 0 );
2591
2592       $t->parser( $db )
2593         or $self->throw_exception ($t->error);
2594
2595       my $out = $t->translate( $filename )
2596         or $self->throw_exception ($t->error);
2597
2598       $dest_schema = $t->schema;
2599
2600       $dest_schema->name( $filename )
2601         unless $dest_schema->name;
2602     }
2603
2604     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2605                                                   $dest_schema,   $db,
2606                                                   $sqltargs
2607                                                  );
2608     if(!open $file, ">$difffile") {
2609       $self->throw_exception("Can't write to $difffile ($!)");
2610       next;
2611     }
2612     print $file $diff;
2613     close($file);
2614   }
2615 }
2616
2617 =head2 deployment_statements
2618
2619 =over 4
2620
2621 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2622
2623 =back
2624
2625 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2626
2627 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2628 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2629
2630 C<$directory> is used to return statements from files in a previously created
2631 L</create_ddl_dir> directory and is optional. The filenames are constructed
2632 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2633
2634 If no C<$directory> is specified then the statements are constructed on the
2635 fly using L<SQL::Translator> and C<$version> is ignored.
2636
2637 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2638
2639 =cut
2640
2641 sub deployment_statements {
2642   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2643   $type ||= $self->sqlt_type;
2644   $version ||= $schema->schema_version || '1.x';
2645   $dir ||= './';
2646   my $filename = $schema->ddl_filename($type, $version, $dir);
2647   if(-f $filename)
2648   {
2649       my $file;
2650       open($file, "<$filename")
2651         or $self->throw_exception("Can't open $filename ($!)");
2652       my @rows = <$file>;
2653       close($file);
2654       return join('', @rows);
2655   }
2656
2657   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2658     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2659   }
2660
2661   # sources needs to be a parser arg, but for simplicty allow at top level
2662   # coming in
2663   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2664       if exists $sqltargs->{sources};
2665
2666   my $tr = SQL::Translator->new(
2667     producer => "SQL::Translator::Producer::${type}",
2668     %$sqltargs,
2669     parser => 'SQL::Translator::Parser::DBIx::Class',
2670     data => $schema,
2671   );
2672
2673   my @ret;
2674   if (wantarray) {
2675     @ret = $tr->translate;
2676   }
2677   else {
2678     $ret[0] = $tr->translate;
2679   }
2680
2681   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2682     unless (@ret && defined $ret[0]);
2683
2684   return wantarray ? @ret : $ret[0];
2685 }
2686
2687 sub deploy {
2688   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2689   my $deploy = sub {
2690     my $line = shift;
2691     return if($line =~ /^--/);
2692     return if(!$line);
2693     # next if($line =~ /^DROP/m);
2694     return if($line =~ /^BEGIN TRANSACTION/m);
2695     return if($line =~ /^COMMIT/m);
2696     return if $line =~ /^\s+$/; # skip whitespace only
2697     $self->_query_start($line);
2698     try {
2699       # do a dbh_do cycle here, as we need some error checking in
2700       # place (even though we will ignore errors)
2701       $self->dbh_do (sub { $_[1]->do($line) });
2702     } catch {
2703       carp qq{$_ (running "${line}")};
2704     };
2705     $self->_query_end($line);
2706   };
2707   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2708   if (@statements > 1) {
2709     foreach my $statement (@statements) {
2710       $deploy->( $statement );
2711     }
2712   }
2713   elsif (@statements == 1) {
2714     foreach my $line ( split(";\n", $statements[0])) {
2715       $deploy->( $line );
2716     }
2717   }
2718 }
2719
2720 =head2 datetime_parser
2721
2722 Returns the datetime parser class
2723
2724 =cut
2725
2726 sub datetime_parser {
2727   my $self = shift;
2728   return $self->{datetime_parser} ||= do {
2729     $self->build_datetime_parser(@_);
2730   };
2731 }
2732
2733 =head2 datetime_parser_type
2734
2735 Defines (returns) the datetime parser class - currently hardwired to
2736 L<DateTime::Format::MySQL>
2737
2738 =cut
2739
2740 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2741
2742 =head2 build_datetime_parser
2743
2744 See L</datetime_parser>
2745
2746 =cut
2747
2748 sub build_datetime_parser {
2749   my $self = shift;
2750   my $type = $self->datetime_parser_type(@_);
2751   $self->ensure_class_loaded ($type);
2752   return $type;
2753 }
2754
2755
2756 =head2 is_replicating
2757
2758 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2759 replicate from a master database.  Default is undef, which is the result
2760 returned by databases that don't support replication.
2761
2762 =cut
2763
2764 sub is_replicating {
2765     return;
2766
2767 }
2768
2769 =head2 lag_behind_master
2770
2771 Returns a number that represents a certain amount of lag behind a master db
2772 when a given storage is replicating.  The number is database dependent, but
2773 starts at zero and increases with the amount of lag. Default in undef
2774
2775 =cut
2776
2777 sub lag_behind_master {
2778     return;
2779 }
2780
2781 =head2 relname_to_table_alias
2782
2783 =over 4
2784
2785 =item Arguments: $relname, $join_count
2786
2787 =back
2788
2789 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2790 queries.
2791
2792 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2793 way these aliases are named.
2794
2795 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2796 otherwise C<"$relname">.
2797
2798 =cut
2799
2800 sub relname_to_table_alias {
2801   my ($self, $relname, $join_count) = @_;
2802
2803   my $alias = ($join_count && $join_count > 1 ?
2804     join('_', $relname, $join_count) : $relname);
2805
2806   return $alias;
2807 }
2808
2809 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2810 # version and it may be necessary to amend or override it for a specific storage
2811 # if such binds are necessary.
2812 sub _max_column_bytesize {
2813   my ($self, $source, $col) = @_;
2814
2815   my $inf = $source->column_info($col);
2816   return $inf->{_max_bytesize} ||= do {
2817
2818     my $max_size;
2819
2820     if (my $data_type = $inf->{data_type}) {
2821       $data_type = lc($data_type);
2822
2823       # String/sized-binary types
2824       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2825                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2826       ) {
2827         $max_size = $inf->{size};
2828       }
2829       # Other charset/unicode types, assume scale of 4
2830       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2831                               |univarchar
2832                               |nvarchar)\b/x
2833       ) {
2834         $max_size = $inf->{size} * 4 if $inf->{size};
2835       }
2836       # Blob types
2837       elsif ($self->_is_lob_type($data_type)) {
2838         # default to longreadlen
2839       }
2840       else {
2841         $max_size = 100;  # for all other (numeric?) datatypes
2842       }
2843     }
2844
2845     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2846   };
2847 }
2848
2849 # Determine if a data_type is some type of BLOB
2850 sub _is_lob_type {
2851   my ($self, $data_type) = @_;
2852   $data_type && ($data_type =~ /(?:lob|bfile|text|image|bytea|memo)/i
2853     || $data_type =~ /^long(?:\s*(?:raw|bit\s*varying|varbit|binary
2854                                   |varchar|character\s*varying|nvarchar
2855                                   |national\s*character\s*varying))?$/xi);
2856 }
2857
2858 1;
2859
2860 =head1 USAGE NOTES
2861
2862 =head2 DBIx::Class and AutoCommit
2863
2864 DBIx::Class can do some wonderful magic with handling exceptions,
2865 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2866 (the default) combined with C<txn_do> for transaction support.
2867
2868 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2869 in an assumed transaction between commits, and you're telling us you'd
2870 like to manage that manually.  A lot of the magic protections offered by
2871 this module will go away.  We can't protect you from exceptions due to database
2872 disconnects because we don't know anything about how to restart your
2873 transactions.  You're on your own for handling all sorts of exceptional
2874 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2875 be with raw DBI.
2876
2877
2878 =head1 AUTHORS
2879
2880 Matt S. Trout <mst@shadowcatsystems.co.uk>
2881
2882 Andy Grundman <andy@hybridized.org>
2883
2884 =head1 LICENSE
2885
2886 You may distribute this code under the same terms as Perl itself.
2887
2888 =cut