Make sure unaliased selectors and prefetch coexist peacefully
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use overload ();
20 use namespace::clean;
21
22
23 # default cursor class, overridable in connect_info attributes
24 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
25
26 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
27 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/
30   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
31   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
32   transaction_depth _dbh_autocommit  savepoints
33 /);
34
35 # the values for these accessors are picked out (and deleted) from
36 # the attribute hashref passed to connect_info
37 my @storage_options = qw/
38   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
39   disable_sth_caching unsafe auto_savepoint
40 /;
41 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
42
43
44 # capability definitions, using a 2-tiered accessor system
45 # The rationale is:
46 #
47 # A driver/user may define _use_X, which blindly without any checks says:
48 # "(do not) use this capability", (use_dbms_capability is an "inherited"
49 # type accessor)
50 #
51 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
52 # accessor, which in turn calls _determine_supports_X, and stores the return
53 # in a special slot on the storage object, which is wiped every time a $dbh
54 # reconnection takes place (it is not guaranteed that upon reconnection we
55 # will get the same rdbms version). _determine_supports_X does not need to
56 # exist on a driver, as we ->can for it before calling.
57
58 my @capabilities = (qw/
59   insert_returning
60   insert_returning_bound
61   placeholders
62   typeless_placeholders
63   join_optimizer
64 /);
65 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
66 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
67
68 # on by default, not strictly a capability (pending rewrite)
69 __PACKAGE__->_use_join_optimizer (1);
70 sub _determine_supports_join_optimizer { 1 };
71
72 # Each of these methods need _determine_driver called before itself
73 # in order to function reliably. This is a purely DRY optimization
74 #
75 # get_(use)_dbms_capability need to be called on the correct Storage
76 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
77 # _determine_supports_X which obv. needs a correct driver as well
78 my @rdbms_specific_methods = qw/
79   deployment_statements
80   sqlt_type
81   sql_maker
82   build_datetime_parser
83   datetime_parser_type
84
85   insert
86   insert_bulk
87   update
88   delete
89   select
90   select_single
91
92   get_use_dbms_capability
93   get_dbms_capability
94
95   _server_info
96   _get_server_version
97 /;
98
99 for my $meth (@rdbms_specific_methods) {
100
101   my $orig = __PACKAGE__->can ($meth)
102     or die "$meth is not a ::Storage::DBI method!";
103
104   no strict qw/refs/;
105   no warnings qw/redefine/;
106   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
107     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
108       $_[0]->_determine_driver;
109
110       # This for some reason crashes and burns on perl 5.8.1
111       # IFF the method ends up throwing an exception
112       #goto $_[0]->can ($meth);
113
114       my $cref = $_[0]->can ($meth);
115       goto $cref;
116     }
117     goto $orig;
118   };
119 }
120
121
122 =head1 NAME
123
124 DBIx::Class::Storage::DBI - DBI storage handler
125
126 =head1 SYNOPSIS
127
128   my $schema = MySchema->connect('dbi:SQLite:my.db');
129
130   $schema->storage->debug(1);
131
132   my @stuff = $schema->storage->dbh_do(
133     sub {
134       my ($storage, $dbh, @args) = @_;
135       $dbh->do("DROP TABLE authors");
136     },
137     @column_list
138   );
139
140   $schema->resultset('Book')->search({
141      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
142   });
143
144 =head1 DESCRIPTION
145
146 This class represents the connection to an RDBMS via L<DBI>.  See
147 L<DBIx::Class::Storage> for general information.  This pod only
148 documents DBI-specific methods and behaviors.
149
150 =head1 METHODS
151
152 =cut
153
154 sub new {
155   my $new = shift->next::method(@_);
156
157   $new->transaction_depth(0);
158   $new->_sql_maker_opts({});
159   $new->_dbh_details({});
160   $new->{savepoints} = [];
161   $new->{_in_dbh_do} = 0;
162   $new->{_dbh_gen} = 0;
163
164   # read below to see what this does
165   $new->_arm_global_destructor;
166
167   $new;
168 }
169
170 # This is hack to work around perl shooting stuff in random
171 # order on exit(). If we do not walk the remaining storage
172 # objects in an END block, there is a *small but real* chance
173 # of a fork()ed child to kill the parent's shared DBI handle,
174 # *before perl reaches the DESTROY in this package*
175 # Yes, it is ugly and effective.
176 # Additionally this registry is used by the CLONE method to
177 # make sure no handles are shared between threads
178 {
179   my %seek_and_destroy;
180
181   sub _arm_global_destructor {
182     my $self = shift;
183     my $key = refaddr ($self);
184     $seek_and_destroy{$key} = $self;
185     weaken ($seek_and_destroy{$key});
186   }
187
188   END {
189     local $?; # just in case the DBI destructor changes it somehow
190
191     # destroy just the object if not native to this process/thread
192     $_->_verify_pid for (grep
193       { defined $_ }
194       values %seek_and_destroy
195     );
196   }
197
198   sub CLONE {
199     # As per DBI's recommendation, DBIC disconnects all handles as
200     # soon as possible (DBIC will reconnect only on demand from within
201     # the thread)
202     for (values %seek_and_destroy) {
203       next unless $_;
204       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
205       $_->_dbh(undef);
206     }
207   }
208 }
209
210 sub DESTROY {
211   my $self = shift;
212
213   # some databases spew warnings on implicit disconnect
214   local $SIG{__WARN__} = sub {};
215   $self->_dbh(undef);
216
217   # this op is necessary, since the very last perl runtime statement
218   # triggers a global destruction shootout, and the $SIG localization
219   # may very well be destroyed before perl actually gets to do the
220   # $dbh undef
221   1;
222 }
223
224 # handle pid changes correctly - do not destroy parent's connection
225 sub _verify_pid {
226   my $self = shift;
227
228   my $pid = $self->_conn_pid;
229   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
230     $dbh->{InactiveDestroy} = 1;
231     $self->{_dbh_gen}++;
232     $self->_dbh(undef);
233   }
234
235   return;
236 }
237
238 =head2 connect_info
239
240 This method is normally called by L<DBIx::Class::Schema/connection>, which
241 encapsulates its argument list in an arrayref before passing them here.
242
243 The argument list may contain:
244
245 =over
246
247 =item *
248
249 The same 4-element argument set one would normally pass to
250 L<DBI/connect>, optionally followed by
251 L<extra attributes|/DBIx::Class specific connection attributes>
252 recognized by DBIx::Class:
253
254   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
255
256 =item *
257
258 A single code reference which returns a connected
259 L<DBI database handle|DBI/connect> optionally followed by
260 L<extra attributes|/DBIx::Class specific connection attributes> recognized
261 by DBIx::Class:
262
263   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
264
265 =item *
266
267 A single hashref with all the attributes and the dsn/user/password
268 mixed together:
269
270   $connect_info_args = [{
271     dsn => $dsn,
272     user => $user,
273     password => $pass,
274     %dbi_attributes,
275     %extra_attributes,
276   }];
277
278   $connect_info_args = [{
279     dbh_maker => sub { DBI->connect (...) },
280     %dbi_attributes,
281     %extra_attributes,
282   }];
283
284 This is particularly useful for L<Catalyst> based applications, allowing the
285 following config (L<Config::General> style):
286
287   <Model::DB>
288     schema_class   App::DB
289     <connect_info>
290       dsn          dbi:mysql:database=test
291       user         testuser
292       password     TestPass
293       AutoCommit   1
294     </connect_info>
295   </Model::DB>
296
297 The C<dsn>/C<user>/C<password> combination can be substituted by the
298 C<dbh_maker> key whose value is a coderef that returns a connected
299 L<DBI database handle|DBI/connect>
300
301 =back
302
303 Please note that the L<DBI> docs recommend that you always explicitly
304 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
305 recommends that it be set to I<1>, and that you perform transactions
306 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
307 to I<1> if you do not do explicitly set it to zero.  This is the default
308 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
309
310 =head3 DBIx::Class specific connection attributes
311
312 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
313 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
314 the following connection options. These options can be mixed in with your other
315 L<DBI> connection attributes, or placed in a separate hashref
316 (C<\%extra_attributes>) as shown above.
317
318 Every time C<connect_info> is invoked, any previous settings for
319 these options will be cleared before setting the new ones, regardless of
320 whether any options are specified in the new C<connect_info>.
321
322
323 =over
324
325 =item on_connect_do
326
327 Specifies things to do immediately after connecting or re-connecting to
328 the database.  Its value may contain:
329
330 =over
331
332 =item a scalar
333
334 This contains one SQL statement to execute.
335
336 =item an array reference
337
338 This contains SQL statements to execute in order.  Each element contains
339 a string or a code reference that returns a string.
340
341 =item a code reference
342
343 This contains some code to execute.  Unlike code references within an
344 array reference, its return value is ignored.
345
346 =back
347
348 =item on_disconnect_do
349
350 Takes arguments in the same form as L</on_connect_do> and executes them
351 immediately before disconnecting from the database.
352
353 Note, this only runs if you explicitly call L</disconnect> on the
354 storage object.
355
356 =item on_connect_call
357
358 A more generalized form of L</on_connect_do> that calls the specified
359 C<connect_call_METHOD> methods in your storage driver.
360
361   on_connect_do => 'select 1'
362
363 is equivalent to:
364
365   on_connect_call => [ [ do_sql => 'select 1' ] ]
366
367 Its values may contain:
368
369 =over
370
371 =item a scalar
372
373 Will call the C<connect_call_METHOD> method.
374
375 =item a code reference
376
377 Will execute C<< $code->($storage) >>
378
379 =item an array reference
380
381 Each value can be a method name or code reference.
382
383 =item an array of arrays
384
385 For each array, the first item is taken to be the C<connect_call_> method name
386 or code reference, and the rest are parameters to it.
387
388 =back
389
390 Some predefined storage methods you may use:
391
392 =over
393
394 =item do_sql
395
396 Executes a SQL string or a code reference that returns a SQL string. This is
397 what L</on_connect_do> and L</on_disconnect_do> use.
398
399 It can take:
400
401 =over
402
403 =item a scalar
404
405 Will execute the scalar as SQL.
406
407 =item an arrayref
408
409 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
410 attributes hashref and bind values.
411
412 =item a code reference
413
414 Will execute C<< $code->($storage) >> and execute the return array refs as
415 above.
416
417 =back
418
419 =item datetime_setup
420
421 Execute any statements necessary to initialize the database session to return
422 and accept datetime/timestamp values used with
423 L<DBIx::Class::InflateColumn::DateTime>.
424
425 Only necessary for some databases, see your specific storage driver for
426 implementation details.
427
428 =back
429
430 =item on_disconnect_call
431
432 Takes arguments in the same form as L</on_connect_call> and executes them
433 immediately before disconnecting from the database.
434
435 Calls the C<disconnect_call_METHOD> methods as opposed to the
436 C<connect_call_METHOD> methods called by L</on_connect_call>.
437
438 Note, this only runs if you explicitly call L</disconnect> on the
439 storage object.
440
441 =item disable_sth_caching
442
443 If set to a true value, this option will disable the caching of
444 statement handles via L<DBI/prepare_cached>.
445
446 =item limit_dialect
447
448 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
449 default L</sql_limit_dialect> setting of the storage (if any). For a list
450 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
451
452 =item quote_char
453
454 Specifies what characters to use to quote table and column names.
455
456 C<quote_char> expects either a single character, in which case is it
457 is placed on either side of the table/column name, or an arrayref of length
458 2 in which case the table/column name is placed between the elements.
459
460 For example under MySQL you should use C<< quote_char => '`' >>, and for
461 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
462
463 =item name_sep
464
465 This parameter is only useful in conjunction with C<quote_char>, and is used to
466 specify the character that separates elements (schemas, tables, columns) from
467 each other. If unspecified it defaults to the most commonly used C<.>.
468
469 =item unsafe
470
471 This Storage driver normally installs its own C<HandleError>, sets
472 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
473 all database handles, including those supplied by a coderef.  It does this
474 so that it can have consistent and useful error behavior.
475
476 If you set this option to a true value, Storage will not do its usual
477 modifications to the database handle's attributes, and instead relies on
478 the settings in your connect_info DBI options (or the values you set in
479 your connection coderef, in the case that you are connecting via coderef).
480
481 Note that your custom settings can cause Storage to malfunction,
482 especially if you set a C<HandleError> handler that suppresses exceptions
483 and/or disable C<RaiseError>.
484
485 =item auto_savepoint
486
487 If this option is true, L<DBIx::Class> will use savepoints when nesting
488 transactions, making it possible to recover from failure in the inner
489 transaction without having to abort all outer transactions.
490
491 =item cursor_class
492
493 Use this argument to supply a cursor class other than the default
494 L<DBIx::Class::Storage::DBI::Cursor>.
495
496 =back
497
498 Some real-life examples of arguments to L</connect_info> and
499 L<DBIx::Class::Schema/connect>
500
501   # Simple SQLite connection
502   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
503
504   # Connect via subref
505   ->connect_info([ sub { DBI->connect(...) } ]);
506
507   # Connect via subref in hashref
508   ->connect_info([{
509     dbh_maker => sub { DBI->connect(...) },
510     on_connect_do => 'alter session ...',
511   }]);
512
513   # A bit more complicated
514   ->connect_info(
515     [
516       'dbi:Pg:dbname=foo',
517       'postgres',
518       'my_pg_password',
519       { AutoCommit => 1 },
520       { quote_char => q{"} },
521     ]
522   );
523
524   # Equivalent to the previous example
525   ->connect_info(
526     [
527       'dbi:Pg:dbname=foo',
528       'postgres',
529       'my_pg_password',
530       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
531     ]
532   );
533
534   # Same, but with hashref as argument
535   # See parse_connect_info for explanation
536   ->connect_info(
537     [{
538       dsn         => 'dbi:Pg:dbname=foo',
539       user        => 'postgres',
540       password    => 'my_pg_password',
541       AutoCommit  => 1,
542       quote_char  => q{"},
543       name_sep    => q{.},
544     }]
545   );
546
547   # Subref + DBIx::Class-specific connection options
548   ->connect_info(
549     [
550       sub { DBI->connect(...) },
551       {
552           quote_char => q{`},
553           name_sep => q{@},
554           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
555           disable_sth_caching => 1,
556       },
557     ]
558   );
559
560
561
562 =cut
563
564 sub connect_info {
565   my ($self, $info) = @_;
566
567   return $self->_connect_info if !$info;
568
569   $self->_connect_info($info); # copy for _connect_info
570
571   $info = $self->_normalize_connect_info($info)
572     if ref $info eq 'ARRAY';
573
574   for my $storage_opt (keys %{ $info->{storage_options} }) {
575     my $value = $info->{storage_options}{$storage_opt};
576
577     $self->$storage_opt($value);
578   }
579
580   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
581   #  the new set of options
582   $self->_sql_maker(undef);
583   $self->_sql_maker_opts({});
584
585   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
586     my $value = $info->{sql_maker_options}{$sql_maker_opt};
587
588     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
589   }
590
591   my %attrs = (
592     %{ $self->_default_dbi_connect_attributes || {} },
593     %{ $info->{attributes} || {} },
594   );
595
596   my @args = @{ $info->{arguments} };
597
598   $self->_dbi_connect_info([@args,
599     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
600
601   # FIXME - dirty:
602   # save attributes them in a separate accessor so they are always
603   # introspectable, even in case of a CODE $dbhmaker
604   $self->_dbic_connect_attributes (\%attrs);
605
606   return $self->_connect_info;
607 }
608
609 sub _normalize_connect_info {
610   my ($self, $info_arg) = @_;
611   my %info;
612
613   my @args = @$info_arg;  # take a shallow copy for further mutilation
614
615   # combine/pre-parse arguments depending on invocation style
616
617   my %attrs;
618   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
619     %attrs = %{ $args[1] || {} };
620     @args = $args[0];
621   }
622   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
623     %attrs = %{$args[0]};
624     @args = ();
625     if (my $code = delete $attrs{dbh_maker}) {
626       @args = $code;
627
628       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
629       if (@ignored) {
630         carp sprintf (
631             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
632           . "to the result of 'dbh_maker'",
633
634           join (', ', map { "'$_'" } (@ignored) ),
635         );
636       }
637     }
638     else {
639       @args = delete @attrs{qw/dsn user password/};
640     }
641   }
642   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
643     %attrs = (
644       % { $args[3] || {} },
645       % { $args[4] || {} },
646     );
647     @args = @args[0,1,2];
648   }
649
650   $info{arguments} = \@args;
651
652   my @storage_opts = grep exists $attrs{$_},
653     @storage_options, 'cursor_class';
654
655   @{ $info{storage_options} }{@storage_opts} =
656     delete @attrs{@storage_opts} if @storage_opts;
657
658   my @sql_maker_opts = grep exists $attrs{$_},
659     qw/limit_dialect quote_char name_sep/;
660
661   @{ $info{sql_maker_options} }{@sql_maker_opts} =
662     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
663
664   $info{attributes} = \%attrs if %attrs;
665
666   return \%info;
667 }
668
669 sub _default_dbi_connect_attributes {
670   return {
671     AutoCommit => 1,
672     RaiseError => 1,
673     PrintError => 0,
674   };
675 }
676
677 =head2 on_connect_do
678
679 This method is deprecated in favour of setting via L</connect_info>.
680
681 =cut
682
683 =head2 on_disconnect_do
684
685 This method is deprecated in favour of setting via L</connect_info>.
686
687 =cut
688
689 sub _parse_connect_do {
690   my ($self, $type) = @_;
691
692   my $val = $self->$type;
693   return () if not defined $val;
694
695   my @res;
696
697   if (not ref($val)) {
698     push @res, [ 'do_sql', $val ];
699   } elsif (ref($val) eq 'CODE') {
700     push @res, $val;
701   } elsif (ref($val) eq 'ARRAY') {
702     push @res, map { [ 'do_sql', $_ ] } @$val;
703   } else {
704     $self->throw_exception("Invalid type for $type: ".ref($val));
705   }
706
707   return \@res;
708 }
709
710 =head2 dbh_do
711
712 Arguments: ($subref | $method_name), @extra_coderef_args?
713
714 Execute the given $subref or $method_name using the new exception-based
715 connection management.
716
717 The first two arguments will be the storage object that C<dbh_do> was called
718 on and a database handle to use.  Any additional arguments will be passed
719 verbatim to the called subref as arguments 2 and onwards.
720
721 Using this (instead of $self->_dbh or $self->dbh) ensures correct
722 exception handling and reconnection (or failover in future subclasses).
723
724 Your subref should have no side-effects outside of the database, as
725 there is the potential for your subref to be partially double-executed
726 if the database connection was stale/dysfunctional.
727
728 Example:
729
730   my @stuff = $schema->storage->dbh_do(
731     sub {
732       my ($storage, $dbh, @cols) = @_;
733       my $cols = join(q{, }, @cols);
734       $dbh->selectrow_array("SELECT $cols FROM foo");
735     },
736     @column_list
737   );
738
739 =cut
740
741 sub dbh_do {
742   my $self = shift;
743   my $code = shift;
744
745   my $dbh = $self->_get_dbh;
746
747   return $self->$code($dbh, @_)
748     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
749
750   local $self->{_in_dbh_do} = 1;
751
752   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
753   my $args = \@_;
754   return try {
755     $self->$code ($dbh, @$args);
756   } catch {
757     $self->throw_exception($_) if $self->connected;
758
759     # We were not connected - reconnect and retry, but let any
760     #  exception fall right through this time
761     carp "Retrying $code after catching disconnected exception: $_"
762       if $ENV{DBIC_DBIRETRY_DEBUG};
763
764     $self->_populate_dbh;
765     $self->$code($self->_dbh, @$args);
766   };
767 }
768
769 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
770 # It also informs dbh_do to bypass itself while under the direction of txn_do,
771 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
772 sub txn_do {
773   my $self = shift;
774   my $coderef = shift;
775
776   ref $coderef eq 'CODE' or $self->throw_exception
777     ('$coderef must be a CODE reference');
778
779   local $self->{_in_dbh_do} = 1;
780
781   my @result;
782   my $want = wantarray;
783
784   my $tried = 0;
785   while(1) {
786     my $exception;
787
788     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
789     my $args = \@_;
790
791     try {
792       $self->txn_begin;
793       my $txn_start_depth = $self->transaction_depth;
794       if($want) {
795           @result = $coderef->(@$args);
796       }
797       elsif(defined $want) {
798           $result[0] = $coderef->(@$args);
799       }
800       else {
801           $coderef->(@$args);
802       }
803
804       my $delta_txn = $txn_start_depth - $self->transaction_depth;
805       if ($delta_txn == 0) {
806         $self->txn_commit;
807       }
808       elsif ($delta_txn != 1) {
809         # an off-by-one would mean we fired a rollback
810         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
811       }
812     } catch {
813       $exception = $_;
814     };
815
816     if(! defined $exception) { return wantarray ? @result : $result[0] }
817
818     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
819       my $rollback_exception;
820       try { $self->txn_rollback } catch { $rollback_exception = shift };
821       if(defined $rollback_exception) {
822         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
823         $self->throw_exception($exception)  # propagate nested rollback
824           if $rollback_exception =~ /$exception_class/;
825
826         $self->throw_exception(
827           "Transaction aborted: ${exception}. "
828           . "Rollback failed: ${rollback_exception}"
829         );
830       }
831       $self->throw_exception($exception)
832     }
833
834     # We were not connected, and was first try - reconnect and retry
835     # via the while loop
836     carp "Retrying $coderef after catching disconnected exception: $exception"
837       if $ENV{DBIC_TXNRETRY_DEBUG};
838     $self->_populate_dbh;
839   }
840 }
841
842 =head2 disconnect
843
844 Our C<disconnect> method also performs a rollback first if the
845 database is not in C<AutoCommit> mode.
846
847 =cut
848
849 sub disconnect {
850   my ($self) = @_;
851
852   if( $self->_dbh ) {
853     my @actions;
854
855     push @actions, ( $self->on_disconnect_call || () );
856     push @actions, $self->_parse_connect_do ('on_disconnect_do');
857
858     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
859
860     $self->_dbh_rollback unless $self->_dbh_autocommit;
861
862     %{ $self->_dbh->{CachedKids} } = ();
863     $self->_dbh->disconnect;
864     $self->_dbh(undef);
865     $self->{_dbh_gen}++;
866   }
867 }
868
869 =head2 with_deferred_fk_checks
870
871 =over 4
872
873 =item Arguments: C<$coderef>
874
875 =item Return Value: The return value of $coderef
876
877 =back
878
879 Storage specific method to run the code ref with FK checks deferred or
880 in MySQL's case disabled entirely.
881
882 =cut
883
884 # Storage subclasses should override this
885 sub with_deferred_fk_checks {
886   my ($self, $sub) = @_;
887   $sub->();
888 }
889
890 =head2 connected
891
892 =over
893
894 =item Arguments: none
895
896 =item Return Value: 1|0
897
898 =back
899
900 Verifies that the current database handle is active and ready to execute
901 an SQL statement (e.g. the connection did not get stale, server is still
902 answering, etc.) This method is used internally by L</dbh>.
903
904 =cut
905
906 sub connected {
907   my $self = shift;
908   return 0 unless $self->_seems_connected;
909
910   #be on the safe side
911   local $self->_dbh->{RaiseError} = 1;
912
913   return $self->_ping;
914 }
915
916 sub _seems_connected {
917   my $self = shift;
918
919   $self->_verify_pid;
920
921   my $dbh = $self->_dbh
922     or return 0;
923
924   return $dbh->FETCH('Active');
925 }
926
927 sub _ping {
928   my $self = shift;
929
930   my $dbh = $self->_dbh or return 0;
931
932   return $dbh->ping;
933 }
934
935 sub ensure_connected {
936   my ($self) = @_;
937
938   unless ($self->connected) {
939     $self->_populate_dbh;
940   }
941 }
942
943 =head2 dbh
944
945 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
946 is guaranteed to be healthy by implicitly calling L</connected>, and if
947 necessary performing a reconnection before returning. Keep in mind that this
948 is very B<expensive> on some database engines. Consider using L</dbh_do>
949 instead.
950
951 =cut
952
953 sub dbh {
954   my ($self) = @_;
955
956   if (not $self->_dbh) {
957     $self->_populate_dbh;
958   } else {
959     $self->ensure_connected;
960   }
961   return $self->_dbh;
962 }
963
964 # this is the internal "get dbh or connect (don't check)" method
965 sub _get_dbh {
966   my $self = shift;
967   $self->_verify_pid;
968   $self->_populate_dbh unless $self->_dbh;
969   return $self->_dbh;
970 }
971
972 sub sql_maker {
973   my ($self) = @_;
974   unless ($self->_sql_maker) {
975     my $sql_maker_class = $self->sql_maker_class;
976     $self->ensure_class_loaded ($sql_maker_class);
977
978     my %opts = %{$self->_sql_maker_opts||{}};
979     my $dialect =
980       $opts{limit_dialect}
981         ||
982       $self->sql_limit_dialect
983         ||
984       do {
985         my $s_class = (ref $self) || $self;
986         carp (
987           "Your storage class ($s_class) does not set sql_limit_dialect and you "
988         . 'have not supplied an explicit limit_dialect in your connection_info. '
989         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
990         . 'databases but can be (and often is) painfully slow.'
991         );
992
993         'GenericSubQ';
994       }
995     ;
996
997     $self->_sql_maker($sql_maker_class->new(
998       bindtype=>'columns',
999       array_datatypes => 1,
1000       limit_dialect => $dialect,
1001       name_sep => '.',
1002       %opts,
1003     ));
1004   }
1005   return $self->_sql_maker;
1006 }
1007
1008 # nothing to do by default
1009 sub _rebless {}
1010 sub _init {}
1011
1012 sub _populate_dbh {
1013   my ($self) = @_;
1014
1015   my @info = @{$self->_dbi_connect_info || []};
1016   $self->_dbh(undef); # in case ->connected failed we might get sent here
1017   $self->_dbh_details({}); # reset everything we know
1018
1019   $self->_dbh($self->_connect(@info));
1020
1021   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1022
1023   $self->_determine_driver;
1024
1025   # Always set the transaction depth on connect, since
1026   #  there is no transaction in progress by definition
1027   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1028
1029   $self->_run_connection_actions unless $self->{_in_determine_driver};
1030 }
1031
1032 sub _run_connection_actions {
1033   my $self = shift;
1034   my @actions;
1035
1036   push @actions, ( $self->on_connect_call || () );
1037   push @actions, $self->_parse_connect_do ('on_connect_do');
1038
1039   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1040 }
1041
1042
1043
1044 sub set_use_dbms_capability {
1045   $_[0]->set_inherited ($_[1], $_[2]);
1046 }
1047
1048 sub get_use_dbms_capability {
1049   my ($self, $capname) = @_;
1050
1051   my $use = $self->get_inherited ($capname);
1052   return defined $use
1053     ? $use
1054     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1055   ;
1056 }
1057
1058 sub set_dbms_capability {
1059   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1060 }
1061
1062 sub get_dbms_capability {
1063   my ($self, $capname) = @_;
1064
1065   my $cap = $self->_dbh_details->{capability}{$capname};
1066
1067   unless (defined $cap) {
1068     if (my $meth = $self->can ("_determine$capname")) {
1069       $cap = $self->$meth ? 1 : 0;
1070     }
1071     else {
1072       $cap = 0;
1073     }
1074
1075     $self->set_dbms_capability ($capname, $cap);
1076   }
1077
1078   return $cap;
1079 }
1080
1081 sub _server_info {
1082   my $self = shift;
1083
1084   my $info;
1085   unless ($info = $self->_dbh_details->{info}) {
1086
1087     $info = {};
1088
1089     my $server_version = try { $self->_get_server_version };
1090
1091     if (defined $server_version) {
1092       $info->{dbms_version} = $server_version;
1093
1094       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1095       my @verparts = split (/\./, $numeric_version);
1096       if (
1097         @verparts
1098           &&
1099         $verparts[0] <= 999
1100       ) {
1101         # consider only up to 3 version parts, iff not more than 3 digits
1102         my @use_parts;
1103         while (@verparts && @use_parts < 3) {
1104           my $p = shift @verparts;
1105           last if $p > 999;
1106           push @use_parts, $p;
1107         }
1108         push @use_parts, 0 while @use_parts < 3;
1109
1110         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1111       }
1112     }
1113
1114     $self->_dbh_details->{info} = $info;
1115   }
1116
1117   return $info;
1118 }
1119
1120 sub _get_server_version {
1121   shift->_get_dbh->get_info(18);
1122 }
1123
1124 sub _determine_driver {
1125   my ($self) = @_;
1126
1127   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1128     my $started_connected = 0;
1129     local $self->{_in_determine_driver} = 1;
1130
1131     if (ref($self) eq __PACKAGE__) {
1132       my $driver;
1133       if ($self->_dbh) { # we are connected
1134         $driver = $self->_dbh->{Driver}{Name};
1135         $started_connected = 1;
1136       } else {
1137         # if connect_info is a CODEREF, we have no choice but to connect
1138         if (ref $self->_dbi_connect_info->[0] &&
1139             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1140           $self->_populate_dbh;
1141           $driver = $self->_dbh->{Driver}{Name};
1142         }
1143         else {
1144           # try to use dsn to not require being connected, the driver may still
1145           # force a connection in _rebless to determine version
1146           # (dsn may not be supplied at all if all we do is make a mock-schema)
1147           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1148           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1149           $driver ||= $ENV{DBI_DRIVER};
1150         }
1151       }
1152
1153       if ($driver) {
1154         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1155         if ($self->load_optional_class($storage_class)) {
1156           mro::set_mro($storage_class, 'c3');
1157           bless $self, $storage_class;
1158           $self->_rebless();
1159         }
1160       }
1161     }
1162
1163     $self->_driver_determined(1);
1164
1165     $self->_init; # run driver-specific initializations
1166
1167     $self->_run_connection_actions
1168         if !$started_connected && defined $self->_dbh;
1169   }
1170 }
1171
1172 sub _do_connection_actions {
1173   my $self          = shift;
1174   my $method_prefix = shift;
1175   my $call          = shift;
1176
1177   if (not ref($call)) {
1178     my $method = $method_prefix . $call;
1179     $self->$method(@_);
1180   } elsif (ref($call) eq 'CODE') {
1181     $self->$call(@_);
1182   } elsif (ref($call) eq 'ARRAY') {
1183     if (ref($call->[0]) ne 'ARRAY') {
1184       $self->_do_connection_actions($method_prefix, $_) for @$call;
1185     } else {
1186       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1187     }
1188   } else {
1189     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1190   }
1191
1192   return $self;
1193 }
1194
1195 sub connect_call_do_sql {
1196   my $self = shift;
1197   $self->_do_query(@_);
1198 }
1199
1200 sub disconnect_call_do_sql {
1201   my $self = shift;
1202   $self->_do_query(@_);
1203 }
1204
1205 # override in db-specific backend when necessary
1206 sub connect_call_datetime_setup { 1 }
1207
1208 sub _do_query {
1209   my ($self, $action) = @_;
1210
1211   if (ref $action eq 'CODE') {
1212     $action = $action->($self);
1213     $self->_do_query($_) foreach @$action;
1214   }
1215   else {
1216     # Most debuggers expect ($sql, @bind), so we need to exclude
1217     # the attribute hash which is the second argument to $dbh->do
1218     # furthermore the bind values are usually to be presented
1219     # as named arrayref pairs, so wrap those here too
1220     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1221     my $sql = shift @do_args;
1222     my $attrs = shift @do_args;
1223     my @bind = map { [ undef, $_ ] } @do_args;
1224
1225     $self->_query_start($sql, @bind);
1226     $self->_get_dbh->do($sql, $attrs, @do_args);
1227     $self->_query_end($sql, @bind);
1228   }
1229
1230   return $self;
1231 }
1232
1233 sub _connect {
1234   my ($self, @info) = @_;
1235
1236   $self->throw_exception("You failed to provide any connection info")
1237     if !@info;
1238
1239   my ($old_connect_via, $dbh);
1240
1241   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1242     $old_connect_via = $DBI::connect_via;
1243     $DBI::connect_via = 'connect';
1244   }
1245
1246   try {
1247     if(ref $info[0] eq 'CODE') {
1248        $dbh = $info[0]->();
1249     }
1250     else {
1251        $dbh = DBI->connect(@info);
1252     }
1253
1254     if (!$dbh) {
1255       die $DBI::errstr;
1256     }
1257
1258     unless ($self->unsafe) {
1259
1260       # this odd anonymous coderef dereference is in fact really
1261       # necessary to avoid the unwanted effect described in perl5
1262       # RT#75792
1263       sub {
1264         my $weak_self = $_[0];
1265         weaken $weak_self;
1266
1267         $_[1]->{HandleError} = sub {
1268           if ($weak_self) {
1269             $weak_self->throw_exception("DBI Exception: $_[0]");
1270           }
1271           else {
1272             # the handler may be invoked by something totally out of
1273             # the scope of DBIC
1274             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1275           }
1276         };
1277       }->($self, $dbh);
1278
1279       $dbh->{ShowErrorStatement} = 1;
1280       $dbh->{RaiseError} = 1;
1281       $dbh->{PrintError} = 0;
1282     }
1283   }
1284   catch {
1285     $self->throw_exception("DBI Connection failed: $_")
1286   }
1287   finally {
1288     $DBI::connect_via = $old_connect_via if $old_connect_via;
1289   };
1290
1291   $self->_dbh_autocommit($dbh->{AutoCommit});
1292   $dbh;
1293 }
1294
1295 sub svp_begin {
1296   my ($self, $name) = @_;
1297
1298   $name = $self->_svp_generate_name
1299     unless defined $name;
1300
1301   $self->throw_exception ("You can't use savepoints outside a transaction")
1302     if $self->{transaction_depth} == 0;
1303
1304   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1305     unless $self->can('_svp_begin');
1306
1307   push @{ $self->{savepoints} }, $name;
1308
1309   $self->debugobj->svp_begin($name) if $self->debug;
1310
1311   return $self->_svp_begin($name);
1312 }
1313
1314 sub svp_release {
1315   my ($self, $name) = @_;
1316
1317   $self->throw_exception ("You can't use savepoints outside a transaction")
1318     if $self->{transaction_depth} == 0;
1319
1320   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1321     unless $self->can('_svp_release');
1322
1323   if (defined $name) {
1324     $self->throw_exception ("Savepoint '$name' does not exist")
1325       unless grep { $_ eq $name } @{ $self->{savepoints} };
1326
1327     # Dig through the stack until we find the one we are releasing.  This keeps
1328     # the stack up to date.
1329     my $svp;
1330
1331     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1332   } else {
1333     $name = pop @{ $self->{savepoints} };
1334   }
1335
1336   $self->debugobj->svp_release($name) if $self->debug;
1337
1338   return $self->_svp_release($name);
1339 }
1340
1341 sub svp_rollback {
1342   my ($self, $name) = @_;
1343
1344   $self->throw_exception ("You can't use savepoints outside a transaction")
1345     if $self->{transaction_depth} == 0;
1346
1347   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1348     unless $self->can('_svp_rollback');
1349
1350   if (defined $name) {
1351       # If they passed us a name, verify that it exists in the stack
1352       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1353           $self->throw_exception("Savepoint '$name' does not exist!");
1354       }
1355
1356       # Dig through the stack until we find the one we are releasing.  This keeps
1357       # the stack up to date.
1358       while(my $s = pop(@{ $self->{savepoints} })) {
1359           last if($s eq $name);
1360       }
1361       # Add the savepoint back to the stack, as a rollback doesn't remove the
1362       # named savepoint, only everything after it.
1363       push(@{ $self->{savepoints} }, $name);
1364   } else {
1365       # We'll assume they want to rollback to the last savepoint
1366       $name = $self->{savepoints}->[-1];
1367   }
1368
1369   $self->debugobj->svp_rollback($name) if $self->debug;
1370
1371   return $self->_svp_rollback($name);
1372 }
1373
1374 sub _svp_generate_name {
1375   my ($self) = @_;
1376   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1377 }
1378
1379 sub txn_begin {
1380   my $self = shift;
1381
1382   # this means we have not yet connected and do not know the AC status
1383   # (e.g. coderef $dbh)
1384   if (! defined $self->_dbh_autocommit) {
1385     $self->ensure_connected;
1386   }
1387   # otherwise re-connect on pid changes, so
1388   # that the txn_depth is adjusted properly
1389   # the lightweight _get_dbh is good enoug here
1390   # (only superficial handle check, no pings)
1391   else {
1392     $self->_get_dbh;
1393   }
1394
1395   if($self->transaction_depth == 0) {
1396     $self->debugobj->txn_begin()
1397       if $self->debug;
1398     $self->_dbh_begin_work;
1399   }
1400   elsif ($self->auto_savepoint) {
1401     $self->svp_begin;
1402   }
1403   $self->{transaction_depth}++;
1404 }
1405
1406 sub _dbh_begin_work {
1407   my $self = shift;
1408
1409   # if the user is utilizing txn_do - good for him, otherwise we need to
1410   # ensure that the $dbh is healthy on BEGIN.
1411   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1412   # will be replaced by a failure of begin_work itself (which will be
1413   # then retried on reconnect)
1414   if ($self->{_in_dbh_do}) {
1415     $self->_dbh->begin_work;
1416   } else {
1417     $self->dbh_do(sub { $_[1]->begin_work });
1418   }
1419 }
1420
1421 sub txn_commit {
1422   my $self = shift;
1423   if ($self->{transaction_depth} == 1) {
1424     $self->debugobj->txn_commit()
1425       if ($self->debug);
1426     $self->_dbh_commit;
1427     $self->{transaction_depth} = 0
1428       if $self->_dbh_autocommit;
1429   }
1430   elsif($self->{transaction_depth} > 1) {
1431     $self->{transaction_depth}--;
1432     $self->svp_release
1433       if $self->auto_savepoint;
1434   }
1435   else {
1436     $self->throw_exception( 'Refusing to commit without a started transaction' );
1437   }
1438 }
1439
1440 sub _dbh_commit {
1441   my $self = shift;
1442   my $dbh  = $self->_dbh
1443     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1444   $dbh->commit;
1445 }
1446
1447 sub txn_rollback {
1448   my $self = shift;
1449   my $dbh = $self->_dbh;
1450   try {
1451     if ($self->{transaction_depth} == 1) {
1452       $self->debugobj->txn_rollback()
1453         if ($self->debug);
1454       $self->{transaction_depth} = 0
1455         if $self->_dbh_autocommit;
1456       $self->_dbh_rollback;
1457     }
1458     elsif($self->{transaction_depth} > 1) {
1459       $self->{transaction_depth}--;
1460       if ($self->auto_savepoint) {
1461         $self->svp_rollback;
1462         $self->svp_release;
1463       }
1464     }
1465     else {
1466       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1467     }
1468   }
1469   catch {
1470     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1471
1472     if ($_ !~ /$exception_class/) {
1473       # ensure that a failed rollback resets the transaction depth
1474       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1475     }
1476
1477     $self->throw_exception($_)
1478   };
1479 }
1480
1481 sub _dbh_rollback {
1482   my $self = shift;
1483   my $dbh  = $self->_dbh
1484     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1485   $dbh->rollback;
1486 }
1487
1488 # This used to be the top-half of _execute.  It was split out to make it
1489 #  easier to override in NoBindVars without duping the rest.  It takes up
1490 #  all of _execute's args, and emits $sql, @bind.
1491 sub _prep_for_execute {
1492   my ($self, $op, $extra_bind, $ident, $args) = @_;
1493
1494   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1495     $ident = $ident->from();
1496   }
1497
1498   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1499
1500   unshift(@bind,
1501     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1502       if $extra_bind;
1503   return ($sql, \@bind);
1504 }
1505
1506
1507 sub _fix_bind_params {
1508     my ($self, @bind) = @_;
1509
1510     ### Turn @bind from something like this:
1511     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1512     ### to this:
1513     ###   ( "'1'", "'1'", "'3'" )
1514     return
1515         map {
1516             if ( defined( $_ && $_->[1] ) ) {
1517                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1518             }
1519             else { q{NULL}; }
1520         } @bind;
1521 }
1522
1523 sub _query_start {
1524     my ( $self, $sql, @bind ) = @_;
1525
1526     if ( $self->debug ) {
1527         @bind = $self->_fix_bind_params(@bind);
1528
1529         $self->debugobj->query_start( $sql, @bind );
1530     }
1531 }
1532
1533 sub _query_end {
1534     my ( $self, $sql, @bind ) = @_;
1535
1536     if ( $self->debug ) {
1537         @bind = $self->_fix_bind_params(@bind);
1538         $self->debugobj->query_end( $sql, @bind );
1539     }
1540 }
1541
1542 sub _dbh_execute {
1543   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1544
1545   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1546
1547   $self->_query_start( $sql, @$bind );
1548
1549   my $sth = $self->sth($sql,$op);
1550
1551   my $placeholder_index = 1;
1552
1553   foreach my $bound (@$bind) {
1554     my $attributes = {};
1555     my($column_name, @data) = @$bound;
1556
1557     if ($bind_attributes) {
1558       $attributes = $bind_attributes->{$column_name}
1559       if defined $bind_attributes->{$column_name};
1560     }
1561
1562     foreach my $data (@data) {
1563       my $ref = ref $data;
1564
1565       if ($ref and overload::Method($data, '""') ) {
1566         $data = "$data";
1567       }
1568       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1569         $sth->bind_param_inout(
1570           $placeholder_index++,
1571           $data,
1572           $self->_max_column_bytesize($ident, $column_name),
1573           $attributes
1574         );
1575         next;
1576       }
1577
1578       $sth->bind_param($placeholder_index++, $data, $attributes);
1579     }
1580   }
1581
1582   # Can this fail without throwing an exception anyways???
1583   my $rv = $sth->execute();
1584   $self->throw_exception(
1585     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1586   ) if !$rv;
1587
1588   $self->_query_end( $sql, @$bind );
1589
1590   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1591 }
1592
1593 sub _execute {
1594     my $self = shift;
1595     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1596 }
1597
1598 sub _prefetch_autovalues {
1599   my ($self, $source, $to_insert) = @_;
1600
1601   my $colinfo = $source->columns_info;
1602
1603   my %values;
1604   for my $col (keys %$colinfo) {
1605     if (
1606       $colinfo->{$col}{auto_nextval}
1607         and
1608       (
1609         ! exists $to_insert->{$col}
1610           or
1611         ref $to_insert->{$col} eq 'SCALAR'
1612       )
1613     ) {
1614       $values{$col} = $self->_sequence_fetch(
1615         'NEXTVAL',
1616         ( $colinfo->{$col}{sequence} ||=
1617             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1618         ),
1619       );
1620     }
1621   }
1622
1623   \%values;
1624 }
1625
1626 sub insert {
1627   my ($self, $source, $to_insert) = @_;
1628
1629   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1630
1631   # fuse the values
1632   $to_insert = { %$to_insert, %$prefetched_values };
1633
1634   # list of primary keys we try to fetch from the database
1635   # both not-exsists and scalarrefs are considered
1636   my %fetch_pks;
1637   for ($source->primary_columns) {
1638     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1639       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1640   }
1641
1642   my ($sqla_opts, @ir_container);
1643   if ($self->_use_insert_returning) {
1644
1645     # retain order as declared in the resultsource
1646     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1647       push @{$sqla_opts->{returning}}, $_;
1648       $sqla_opts->{returning_container} = \@ir_container
1649         if $self->_use_insert_returning_bound;
1650     }
1651   }
1652
1653   my $bind_attributes = $self->source_bind_attributes($source);
1654
1655   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1656
1657   my %returned_cols;
1658
1659   if (my $retlist = $sqla_opts->{returning}) {
1660     @ir_container = try {
1661       local $SIG{__WARN__} = sub {};
1662       my @r = $sth->fetchrow_array;
1663       $sth->finish;
1664       @r;
1665     } unless @ir_container;
1666
1667     @returned_cols{@$retlist} = @ir_container if @ir_container;
1668   }
1669
1670   return { %$prefetched_values, %returned_cols };
1671 }
1672
1673
1674 ## Currently it is assumed that all values passed will be "normal", i.e. not
1675 ## scalar refs, or at least, all the same type as the first set, the statement is
1676 ## only prepped once.
1677 sub insert_bulk {
1678   my ($self, $source, $cols, $data) = @_;
1679
1680   my %colvalues;
1681   @colvalues{@$cols} = (0..$#$cols);
1682
1683   for my $i (0..$#$cols) {
1684     my $first_val = $data->[0][$i];
1685     next unless ref $first_val eq 'SCALAR';
1686
1687     $colvalues{ $cols->[$i] } = $first_val;
1688   }
1689
1690   # check for bad data and stringify stringifiable objects
1691   my $bad_slice = sub {
1692     my ($msg, $col_idx, $slice_idx) = @_;
1693     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1694       $msg,
1695       $cols->[$col_idx],
1696       do {
1697         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1698         Dumper {
1699           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1700         },
1701       }
1702     );
1703   };
1704
1705   for my $datum_idx (0..$#$data) {
1706     my $datum = $data->[$datum_idx];
1707
1708     for my $col_idx (0..$#$cols) {
1709       my $val            = $datum->[$col_idx];
1710       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1711       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1712
1713       if ($is_literal_sql) {
1714         if (not ref $val) {
1715           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1716         }
1717         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1718           $bad_slice->("$reftype reference found where literal SQL expected",
1719             $col_idx, $datum_idx);
1720         }
1721         elsif ($$val ne $$sqla_bind){
1722           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1723             $col_idx, $datum_idx);
1724         }
1725       }
1726       elsif (my $reftype = ref $val) {
1727         require overload;
1728         if (overload::Method($val, '""')) {
1729           $datum->[$col_idx] = "".$val;
1730         }
1731         else {
1732           $bad_slice->("$reftype reference found where bind expected",
1733             $col_idx, $datum_idx);
1734         }
1735       }
1736     }
1737   }
1738
1739   my ($sql, $bind) = $self->_prep_for_execute (
1740     'insert', undef, $source, [\%colvalues]
1741   );
1742
1743   if (! @$bind) {
1744     # if the bindlist is empty - make sure all "values" are in fact
1745     # literal scalarrefs. If not the case this means the storage ate
1746     # them away (e.g. the NoBindVars component) and interpolated them
1747     # directly into the SQL. This obviosly can't be good for multi-inserts
1748
1749     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1750       if first { ref $_ ne 'SCALAR' } values %colvalues;
1751   }
1752
1753   # neither _execute_array, nor _execute_inserts_with_no_binds are
1754   # atomic (even if _execute _array is a single call). Thus a safety
1755   # scope guard
1756   my $guard = $self->txn_scope_guard;
1757
1758   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1759   my $sth = $self->sth($sql);
1760   my $rv = do {
1761     if (@$bind) {
1762       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1763       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1764     }
1765     else {
1766       # bind_param_array doesn't work if there are no binds
1767       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1768     }
1769   };
1770
1771   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1772
1773   $guard->commit;
1774
1775   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1776 }
1777
1778 sub _execute_array {
1779   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1780
1781   ## This must be an arrayref, else nothing works!
1782   my $tuple_status = [];
1783
1784   ## Get the bind_attributes, if any exist
1785   my $bind_attributes = $self->source_bind_attributes($source);
1786
1787   ## Bind the values and execute
1788   my $placeholder_index = 1;
1789
1790   foreach my $bound (@$bind) {
1791
1792     my $attributes = {};
1793     my ($column_name, $data_index) = @$bound;
1794
1795     if( $bind_attributes ) {
1796       $attributes = $bind_attributes->{$column_name}
1797       if defined $bind_attributes->{$column_name};
1798     }
1799
1800     my @data = map { $_->[$data_index] } @$data;
1801
1802     $sth->bind_param_array(
1803       $placeholder_index,
1804       [@data],
1805       (%$attributes ?  $attributes : ()),
1806     );
1807     $placeholder_index++;
1808   }
1809
1810   my ($rv, $err);
1811   try {
1812     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1813   }
1814   catch {
1815     $err = shift;
1816   };
1817
1818   # Statement must finish even if there was an exception.
1819   try {
1820     $sth->finish
1821   }
1822   catch {
1823     $err = shift unless defined $err
1824   };
1825
1826   $err = $sth->errstr
1827     if (! defined $err and $sth->err);
1828
1829   if (defined $err) {
1830     my $i = 0;
1831     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1832
1833     $self->throw_exception("Unexpected populate error: $err")
1834       if ($i > $#$tuple_status);
1835
1836     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1837       ($tuple_status->[$i][1] || $err),
1838       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1839     );
1840   }
1841
1842   return $rv;
1843 }
1844
1845 sub _dbh_execute_array {
1846     my ($self, $sth, $tuple_status, @extra) = @_;
1847
1848     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1849 }
1850
1851 sub _dbh_execute_inserts_with_no_binds {
1852   my ($self, $sth, $count) = @_;
1853
1854   my $err;
1855   try {
1856     my $dbh = $self->_get_dbh;
1857     local $dbh->{RaiseError} = 1;
1858     local $dbh->{PrintError} = 0;
1859
1860     $sth->execute foreach 1..$count;
1861   }
1862   catch {
1863     $err = shift;
1864   };
1865
1866   # Make sure statement is finished even if there was an exception.
1867   try {
1868     $sth->finish
1869   }
1870   catch {
1871     $err = shift unless defined $err;
1872   };
1873
1874   $self->throw_exception($err) if defined $err;
1875
1876   return $count;
1877 }
1878
1879 sub update {
1880   my ($self, $source, @args) = @_;
1881
1882   my $bind_attrs = $self->source_bind_attributes($source);
1883
1884   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1885 }
1886
1887
1888 sub delete {
1889   my ($self, $source, @args) = @_;
1890
1891   my $bind_attrs = $self->source_bind_attributes($source);
1892
1893   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1894 }
1895
1896 # We were sent here because the $rs contains a complex search
1897 # which will require a subquery to select the correct rows
1898 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1899 #
1900 # Generating a single PK column subquery is trivial and supported
1901 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1902 # Look at _multipk_update_delete()
1903 sub _subq_update_delete {
1904   my $self = shift;
1905   my ($rs, $op, $values) = @_;
1906
1907   my $rsrc = $rs->result_source;
1908
1909   # quick check if we got a sane rs on our hands
1910   my @pcols = $rsrc->_pri_cols;
1911
1912   my $sel = $rs->_resolved_attrs->{select};
1913   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1914
1915   if (
1916       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1917         ne
1918       join ("\x00", sort @$sel )
1919   ) {
1920     $self->throw_exception (
1921       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1922     );
1923   }
1924
1925   if (@pcols == 1) {
1926     return $self->$op (
1927       $rsrc,
1928       $op eq 'update' ? $values : (),
1929       { $pcols[0] => { -in => $rs->as_query } },
1930     );
1931   }
1932
1933   else {
1934     return $self->_multipk_update_delete (@_);
1935   }
1936 }
1937
1938 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1939 # resultset update/delete involving subqueries. So by default resort
1940 # to simple (and inefficient) delete_all style per-row opearations,
1941 # while allowing specific storages to override this with a faster
1942 # implementation.
1943 #
1944 sub _multipk_update_delete {
1945   return shift->_per_row_update_delete (@_);
1946 }
1947
1948 # This is the default loop used to delete/update rows for multi PK
1949 # resultsets, and used by mysql exclusively (because it can't do anything
1950 # else).
1951 #
1952 # We do not use $row->$op style queries, because resultset update/delete
1953 # is not expected to cascade (this is what delete_all/update_all is for).
1954 #
1955 # There should be no race conditions as the entire operation is rolled
1956 # in a transaction.
1957 #
1958 sub _per_row_update_delete {
1959   my $self = shift;
1960   my ($rs, $op, $values) = @_;
1961
1962   my $rsrc = $rs->result_source;
1963   my @pcols = $rsrc->_pri_cols;
1964
1965   my $guard = $self->txn_scope_guard;
1966
1967   # emulate the return value of $sth->execute for non-selects
1968   my $row_cnt = '0E0';
1969
1970   my $subrs_cur = $rs->cursor;
1971   my @all_pk = $subrs_cur->all;
1972   for my $pks ( @all_pk) {
1973
1974     my $cond;
1975     for my $i (0.. $#pcols) {
1976       $cond->{$pcols[$i]} = $pks->[$i];
1977     }
1978
1979     $self->$op (
1980       $rsrc,
1981       $op eq 'update' ? $values : (),
1982       $cond,
1983     );
1984
1985     $row_cnt++;
1986   }
1987
1988   $guard->commit;
1989
1990   return $row_cnt;
1991 }
1992
1993 sub _select {
1994   my $self = shift;
1995   $self->_execute($self->_select_args(@_));
1996 }
1997
1998 sub _select_args_to_query {
1999   my $self = shift;
2000
2001   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2002   #  = $self->_select_args($ident, $select, $cond, $attrs);
2003   my ($op, $bind, $ident, $bind_attrs, @args) =
2004     $self->_select_args(@_);
2005
2006   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2007   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2008   $prepared_bind ||= [];
2009
2010   return wantarray
2011     ? ($sql, $prepared_bind, $bind_attrs)
2012     : \[ "($sql)", @$prepared_bind ]
2013   ;
2014 }
2015
2016 sub _select_args {
2017   my ($self, $ident, $select, $where, $attrs) = @_;
2018
2019   my $sql_maker = $self->sql_maker;
2020   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2021
2022   $attrs = {
2023     %$attrs,
2024     select => $select,
2025     from => $ident,
2026     where => $where,
2027     $rs_alias && $alias2source->{$rs_alias}
2028       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
2029       : ()
2030     ,
2031   };
2032
2033   # calculate bind_attrs before possible $ident mangling
2034   my $bind_attrs = {};
2035   for my $alias (keys %$alias2source) {
2036     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2037     for my $col (keys %$bindtypes) {
2038
2039       my $fqcn = join ('.', $alias, $col);
2040       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2041
2042       # Unqialified column names are nice, but at the same time can be
2043       # rather ambiguous. What we do here is basically go along with
2044       # the loop, adding an unqualified column slot to $bind_attrs,
2045       # alongside the fully qualified name. As soon as we encounter
2046       # another column by that name (which would imply another table)
2047       # we unset the unqualified slot and never add any info to it
2048       # to avoid erroneous type binding. If this happens the users
2049       # only choice will be to fully qualify his column name
2050
2051       if (exists $bind_attrs->{$col}) {
2052         $bind_attrs->{$col} = {};
2053       }
2054       else {
2055         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2056       }
2057     }
2058   }
2059
2060   # Sanity check the attributes (SQLMaker does it too, but
2061   # in case of a software_limit we'll never reach there)
2062   if (defined $attrs->{offset}) {
2063     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2064       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2065   }
2066   $attrs->{offset} ||= 0;
2067
2068   if (defined $attrs->{rows}) {
2069     $self->throw_exception("The rows attribute must be a positive integer if present")
2070       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2071   }
2072   elsif ($attrs->{offset}) {
2073     # MySQL actually recommends this approach.  I cringe.
2074     $attrs->{rows} = $sql_maker->__max_int;
2075   }
2076
2077   my @limit;
2078
2079   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2080   # storage, unless software limit was requested
2081   if (
2082     #limited has_many
2083     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2084        ||
2085     # grouped prefetch (to satisfy group_by == select)
2086     ( $attrs->{group_by}
2087         &&
2088       @{$attrs->{group_by}}
2089         &&
2090       $attrs->{_prefetch_selector_range}
2091     )
2092   ) {
2093     ($ident, $select, $where, $attrs)
2094       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2095   }
2096   elsif (! $attrs->{software_limit} ) {
2097     push @limit, $attrs->{rows}, $attrs->{offset};
2098   }
2099
2100   # try to simplify the joinmap further (prune unreferenced type-single joins)
2101   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2102
2103 ###
2104   # This would be the point to deflate anything found in $where
2105   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2106   # expect a row object. And all we have is a resultsource (it is trivial
2107   # to extract deflator coderefs via $alias2source above).
2108   #
2109   # I don't see a way forward other than changing the way deflators are
2110   # invoked, and that's just bad...
2111 ###
2112
2113   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2114 }
2115
2116 # Returns a counting SELECT for a simple count
2117 # query. Abstracted so that a storage could override
2118 # this to { count => 'firstcol' } or whatever makes
2119 # sense as a performance optimization
2120 sub _count_select {
2121   #my ($self, $source, $rs_attrs) = @_;
2122   return { count => '*' };
2123 }
2124
2125
2126 sub source_bind_attributes {
2127   my ($self, $source) = @_;
2128
2129   my $bind_attributes;
2130
2131   my $colinfo = $source->columns_info;
2132
2133   for my $col (keys %$colinfo) {
2134     if (my $dt = $colinfo->{$col}{data_type} ) {
2135       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2136     }
2137   }
2138
2139   return $bind_attributes;
2140 }
2141
2142 =head2 select
2143
2144 =over 4
2145
2146 =item Arguments: $ident, $select, $condition, $attrs
2147
2148 =back
2149
2150 Handle a SQL select statement.
2151
2152 =cut
2153
2154 sub select {
2155   my $self = shift;
2156   my ($ident, $select, $condition, $attrs) = @_;
2157   return $self->cursor_class->new($self, \@_, $attrs);
2158 }
2159
2160 sub select_single {
2161   my $self = shift;
2162   my ($rv, $sth, @bind) = $self->_select(@_);
2163   my @row = $sth->fetchrow_array;
2164   my @nextrow = $sth->fetchrow_array if @row;
2165   if(@row && @nextrow) {
2166     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2167   }
2168   # Need to call finish() to work round broken DBDs
2169   $sth->finish();
2170   return @row;
2171 }
2172
2173 =head2 sql_limit_dialect
2174
2175 This is an accessor for the default SQL limit dialect used by a particular
2176 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2177 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2178 see L<DBIx::Class::SQLMaker::LimitDialects>.
2179
2180 =head2 sth
2181
2182 =over 4
2183
2184 =item Arguments: $sql
2185
2186 =back
2187
2188 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2189
2190 =cut
2191
2192 sub _dbh_sth {
2193   my ($self, $dbh, $sql) = @_;
2194
2195   # 3 is the if_active parameter which avoids active sth re-use
2196   my $sth = $self->disable_sth_caching
2197     ? $dbh->prepare($sql)
2198     : $dbh->prepare_cached($sql, {}, 3);
2199
2200   # XXX You would think RaiseError would make this impossible,
2201   #  but apparently that's not true :(
2202   $self->throw_exception($dbh->errstr) if !$sth;
2203
2204   $sth;
2205 }
2206
2207 sub sth {
2208   my ($self, $sql) = @_;
2209   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2210 }
2211
2212 sub _dbh_columns_info_for {
2213   my ($self, $dbh, $table) = @_;
2214
2215   if ($dbh->can('column_info')) {
2216     my %result;
2217     my $caught;
2218     try {
2219       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2220       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2221       $sth->execute();
2222       while ( my $info = $sth->fetchrow_hashref() ){
2223         my %column_info;
2224         $column_info{data_type}   = $info->{TYPE_NAME};
2225         $column_info{size}      = $info->{COLUMN_SIZE};
2226         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2227         $column_info{default_value} = $info->{COLUMN_DEF};
2228         my $col_name = $info->{COLUMN_NAME};
2229         $col_name =~ s/^\"(.*)\"$/$1/;
2230
2231         $result{$col_name} = \%column_info;
2232       }
2233     } catch {
2234       $caught = 1;
2235     };
2236     return \%result if !$caught && scalar keys %result;
2237   }
2238
2239   my %result;
2240   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2241   $sth->execute;
2242   my @columns = @{$sth->{NAME_lc}};
2243   for my $i ( 0 .. $#columns ){
2244     my %column_info;
2245     $column_info{data_type} = $sth->{TYPE}->[$i];
2246     $column_info{size} = $sth->{PRECISION}->[$i];
2247     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2248
2249     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2250       $column_info{data_type} = $1;
2251       $column_info{size}    = $2;
2252     }
2253
2254     $result{$columns[$i]} = \%column_info;
2255   }
2256   $sth->finish;
2257
2258   foreach my $col (keys %result) {
2259     my $colinfo = $result{$col};
2260     my $type_num = $colinfo->{data_type};
2261     my $type_name;
2262     if(defined $type_num && $dbh->can('type_info')) {
2263       my $type_info = $dbh->type_info($type_num);
2264       $type_name = $type_info->{TYPE_NAME} if $type_info;
2265       $colinfo->{data_type} = $type_name if $type_name;
2266     }
2267   }
2268
2269   return \%result;
2270 }
2271
2272 sub columns_info_for {
2273   my ($self, $table) = @_;
2274   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2275 }
2276
2277 =head2 last_insert_id
2278
2279 Return the row id of the last insert.
2280
2281 =cut
2282
2283 sub _dbh_last_insert_id {
2284     my ($self, $dbh, $source, $col) = @_;
2285
2286     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2287
2288     return $id if defined $id;
2289
2290     my $class = ref $self;
2291     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2292 }
2293
2294 sub last_insert_id {
2295   my $self = shift;
2296   $self->_dbh_last_insert_id ($self->_dbh, @_);
2297 }
2298
2299 =head2 _native_data_type
2300
2301 =over 4
2302
2303 =item Arguments: $type_name
2304
2305 =back
2306
2307 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2308 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2309 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2310
2311 The default implementation returns C<undef>, implement in your Storage driver if
2312 you need this functionality.
2313
2314 Should map types from other databases to the native RDBMS type, for example
2315 C<VARCHAR2> to C<VARCHAR>.
2316
2317 Types with modifiers should map to the underlying data type. For example,
2318 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2319
2320 Composite types should map to the container type, for example
2321 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2322
2323 =cut
2324
2325 sub _native_data_type {
2326   #my ($self, $data_type) = @_;
2327   return undef
2328 }
2329
2330 # Check if placeholders are supported at all
2331 sub _determine_supports_placeholders {
2332   my $self = shift;
2333   my $dbh  = $self->_get_dbh;
2334
2335   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2336   # but it is inaccurate more often than not
2337   return try {
2338     local $dbh->{PrintError} = 0;
2339     local $dbh->{RaiseError} = 1;
2340     $dbh->do('select ?', {}, 1);
2341     1;
2342   }
2343   catch {
2344     0;
2345   };
2346 }
2347
2348 # Check if placeholders bound to non-string types throw exceptions
2349 #
2350 sub _determine_supports_typeless_placeholders {
2351   my $self = shift;
2352   my $dbh  = $self->_get_dbh;
2353
2354   return try {
2355     local $dbh->{PrintError} = 0;
2356     local $dbh->{RaiseError} = 1;
2357     # this specifically tests a bind that is NOT a string
2358     $dbh->do('select 1 where 1 = ?', {}, 1);
2359     1;
2360   }
2361   catch {
2362     0;
2363   };
2364 }
2365
2366 =head2 sqlt_type
2367
2368 Returns the database driver name.
2369
2370 =cut
2371
2372 sub sqlt_type {
2373   shift->_get_dbh->{Driver}->{Name};
2374 }
2375
2376 =head2 bind_attribute_by_data_type
2377
2378 Given a datatype from column info, returns a database specific bind
2379 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2380 let the database planner just handle it.
2381
2382 Generally only needed for special case column types, like bytea in postgres.
2383
2384 =cut
2385
2386 sub bind_attribute_by_data_type {
2387     return;
2388 }
2389
2390 =head2 is_datatype_numeric
2391
2392 Given a datatype from column_info, returns a boolean value indicating if
2393 the current RDBMS considers it a numeric value. This controls how
2394 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2395 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2396 be performed instead of the usual C<eq>.
2397
2398 =cut
2399
2400 sub is_datatype_numeric {
2401   my ($self, $dt) = @_;
2402
2403   return 0 unless $dt;
2404
2405   return $dt =~ /^ (?:
2406     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2407   ) $/ix;
2408 }
2409
2410
2411 =head2 create_ddl_dir
2412
2413 =over 4
2414
2415 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2416
2417 =back
2418
2419 Creates a SQL file based on the Schema, for each of the specified
2420 database engines in C<\@databases> in the given directory.
2421 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2422
2423 Given a previous version number, this will also create a file containing
2424 the ALTER TABLE statements to transform the previous schema into the
2425 current one. Note that these statements may contain C<DROP TABLE> or
2426 C<DROP COLUMN> statements that can potentially destroy data.
2427
2428 The file names are created using the C<ddl_filename> method below, please
2429 override this method in your schema if you would like a different file
2430 name format. For the ALTER file, the same format is used, replacing
2431 $version in the name with "$preversion-$version".
2432
2433 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2434 The most common value for this would be C<< { add_drop_table => 1 } >>
2435 to have the SQL produced include a C<DROP TABLE> statement for each table
2436 created. For quoting purposes supply C<quote_table_names> and
2437 C<quote_field_names>.
2438
2439 If no arguments are passed, then the following default values are assumed:
2440
2441 =over 4
2442
2443 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2444
2445 =item version    - $schema->schema_version
2446
2447 =item directory  - './'
2448
2449 =item preversion - <none>
2450
2451 =back
2452
2453 By default, C<\%sqlt_args> will have
2454
2455  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2456
2457 merged with the hash passed in. To disable any of those features, pass in a
2458 hashref like the following
2459
2460  { ignore_constraint_names => 0, # ... other options }
2461
2462
2463 WARNING: You are strongly advised to check all SQL files created, before applying
2464 them.
2465
2466 =cut
2467
2468 sub create_ddl_dir {
2469   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2470
2471   unless ($dir) {
2472     carp "No directory given, using ./\n";
2473     $dir = './';
2474   } else {
2475       -d $dir
2476         or
2477       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2478         or
2479       $self->throw_exception(
2480         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2481       );
2482   }
2483
2484   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2485
2486   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2487   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2488
2489   my $schema_version = $schema->schema_version || '1.x';
2490   $version ||= $schema_version;
2491
2492   $sqltargs = {
2493     add_drop_table => 1,
2494     ignore_constraint_names => 1,
2495     ignore_index_names => 1,
2496     %{$sqltargs || {}}
2497   };
2498
2499   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2500     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2501   }
2502
2503   my $sqlt = SQL::Translator->new( $sqltargs );
2504
2505   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2506   my $sqlt_schema = $sqlt->translate({ data => $schema })
2507     or $self->throw_exception ($sqlt->error);
2508
2509   foreach my $db (@$databases) {
2510     $sqlt->reset();
2511     $sqlt->{schema} = $sqlt_schema;
2512     $sqlt->producer($db);
2513
2514     my $file;
2515     my $filename = $schema->ddl_filename($db, $version, $dir);
2516     if (-e $filename && ($version eq $schema_version )) {
2517       # if we are dumping the current version, overwrite the DDL
2518       carp "Overwriting existing DDL file - $filename";
2519       unlink($filename);
2520     }
2521
2522     my $output = $sqlt->translate;
2523     if(!$output) {
2524       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2525       next;
2526     }
2527     if(!open($file, ">$filename")) {
2528       $self->throw_exception("Can't open $filename for writing ($!)");
2529       next;
2530     }
2531     print $file $output;
2532     close($file);
2533
2534     next unless ($preversion);
2535
2536     require SQL::Translator::Diff;
2537
2538     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2539     if(!-e $prefilename) {
2540       carp("No previous schema file found ($prefilename)");
2541       next;
2542     }
2543
2544     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2545     if(-e $difffile) {
2546       carp("Overwriting existing diff file - $difffile");
2547       unlink($difffile);
2548     }
2549
2550     my $source_schema;
2551     {
2552       my $t = SQL::Translator->new($sqltargs);
2553       $t->debug( 0 );
2554       $t->trace( 0 );
2555
2556       $t->parser( $db )
2557         or $self->throw_exception ($t->error);
2558
2559       my $out = $t->translate( $prefilename )
2560         or $self->throw_exception ($t->error);
2561
2562       $source_schema = $t->schema;
2563
2564       $source_schema->name( $prefilename )
2565         unless ( $source_schema->name );
2566     }
2567
2568     # The "new" style of producers have sane normalization and can support
2569     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2570     # And we have to diff parsed SQL against parsed SQL.
2571     my $dest_schema = $sqlt_schema;
2572
2573     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2574       my $t = SQL::Translator->new($sqltargs);
2575       $t->debug( 0 );
2576       $t->trace( 0 );
2577
2578       $t->parser( $db )
2579         or $self->throw_exception ($t->error);
2580
2581       my $out = $t->translate( $filename )
2582         or $self->throw_exception ($t->error);
2583
2584       $dest_schema = $t->schema;
2585
2586       $dest_schema->name( $filename )
2587         unless $dest_schema->name;
2588     }
2589
2590     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2591                                                   $dest_schema,   $db,
2592                                                   $sqltargs
2593                                                  );
2594     if(!open $file, ">$difffile") {
2595       $self->throw_exception("Can't write to $difffile ($!)");
2596       next;
2597     }
2598     print $file $diff;
2599     close($file);
2600   }
2601 }
2602
2603 =head2 deployment_statements
2604
2605 =over 4
2606
2607 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2608
2609 =back
2610
2611 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2612
2613 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2614 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2615
2616 C<$directory> is used to return statements from files in a previously created
2617 L</create_ddl_dir> directory and is optional. The filenames are constructed
2618 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2619
2620 If no C<$directory> is specified then the statements are constructed on the
2621 fly using L<SQL::Translator> and C<$version> is ignored.
2622
2623 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2624
2625 =cut
2626
2627 sub deployment_statements {
2628   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2629   $type ||= $self->sqlt_type;
2630   $version ||= $schema->schema_version || '1.x';
2631   $dir ||= './';
2632   my $filename = $schema->ddl_filename($type, $version, $dir);
2633   if(-f $filename)
2634   {
2635       my $file;
2636       open($file, "<$filename")
2637         or $self->throw_exception("Can't open $filename ($!)");
2638       my @rows = <$file>;
2639       close($file);
2640       return join('', @rows);
2641   }
2642
2643   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2644     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2645   }
2646
2647   # sources needs to be a parser arg, but for simplicty allow at top level
2648   # coming in
2649   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2650       if exists $sqltargs->{sources};
2651
2652   my $tr = SQL::Translator->new(
2653     producer => "SQL::Translator::Producer::${type}",
2654     %$sqltargs,
2655     parser => 'SQL::Translator::Parser::DBIx::Class',
2656     data => $schema,
2657   );
2658
2659   my @ret;
2660   if (wantarray) {
2661     @ret = $tr->translate;
2662   }
2663   else {
2664     $ret[0] = $tr->translate;
2665   }
2666
2667   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2668     unless (@ret && defined $ret[0]);
2669
2670   return wantarray ? @ret : $ret[0];
2671 }
2672
2673 sub deploy {
2674   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2675   my $deploy = sub {
2676     my $line = shift;
2677     return if($line =~ /^--/);
2678     return if(!$line);
2679     # next if($line =~ /^DROP/m);
2680     return if($line =~ /^BEGIN TRANSACTION/m);
2681     return if($line =~ /^COMMIT/m);
2682     return if $line =~ /^\s+$/; # skip whitespace only
2683     $self->_query_start($line);
2684     try {
2685       # do a dbh_do cycle here, as we need some error checking in
2686       # place (even though we will ignore errors)
2687       $self->dbh_do (sub { $_[1]->do($line) });
2688     } catch {
2689       carp qq{$_ (running "${line}")};
2690     };
2691     $self->_query_end($line);
2692   };
2693   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2694   if (@statements > 1) {
2695     foreach my $statement (@statements) {
2696       $deploy->( $statement );
2697     }
2698   }
2699   elsif (@statements == 1) {
2700     foreach my $line ( split(";\n", $statements[0])) {
2701       $deploy->( $line );
2702     }
2703   }
2704 }
2705
2706 =head2 datetime_parser
2707
2708 Returns the datetime parser class
2709
2710 =cut
2711
2712 sub datetime_parser {
2713   my $self = shift;
2714   return $self->{datetime_parser} ||= do {
2715     $self->build_datetime_parser(@_);
2716   };
2717 }
2718
2719 =head2 datetime_parser_type
2720
2721 Defines (returns) the datetime parser class - currently hardwired to
2722 L<DateTime::Format::MySQL>
2723
2724 =cut
2725
2726 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2727
2728 =head2 build_datetime_parser
2729
2730 See L</datetime_parser>
2731
2732 =cut
2733
2734 sub build_datetime_parser {
2735   my $self = shift;
2736   my $type = $self->datetime_parser_type(@_);
2737   $self->ensure_class_loaded ($type);
2738   return $type;
2739 }
2740
2741
2742 =head2 is_replicating
2743
2744 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2745 replicate from a master database.  Default is undef, which is the result
2746 returned by databases that don't support replication.
2747
2748 =cut
2749
2750 sub is_replicating {
2751     return;
2752
2753 }
2754
2755 =head2 lag_behind_master
2756
2757 Returns a number that represents a certain amount of lag behind a master db
2758 when a given storage is replicating.  The number is database dependent, but
2759 starts at zero and increases with the amount of lag. Default in undef
2760
2761 =cut
2762
2763 sub lag_behind_master {
2764     return;
2765 }
2766
2767 =head2 relname_to_table_alias
2768
2769 =over 4
2770
2771 =item Arguments: $relname, $join_count
2772
2773 =back
2774
2775 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2776 queries.
2777
2778 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2779 way these aliases are named.
2780
2781 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2782 otherwise C<"$relname">.
2783
2784 =cut
2785
2786 sub relname_to_table_alias {
2787   my ($self, $relname, $join_count) = @_;
2788
2789   my $alias = ($join_count && $join_count > 1 ?
2790     join('_', $relname, $join_count) : $relname);
2791
2792   return $alias;
2793 }
2794
2795 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2796 # version and it may be necessary to amend or override it for a specific storage
2797 # if such binds are necessary.
2798 sub _max_column_bytesize {
2799   my ($self, $source, $col) = @_;
2800
2801   my $inf = $source->column_info($col);
2802   return $inf->{_max_bytesize} ||= do {
2803
2804     my $max_size;
2805
2806     if (my $data_type = $inf->{data_type}) {
2807       $data_type = lc($data_type);
2808
2809       # String/sized-binary types
2810       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2811                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2812       ) {
2813         $max_size = $inf->{size};
2814       }
2815       # Other charset/unicode types, assume scale of 4
2816       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2817                               |univarchar
2818                               |nvarchar)\b/x
2819       ) {
2820         $max_size = $inf->{size} * 4 if $inf->{size};
2821       }
2822       # Blob types
2823       elsif ($self->_is_lob_type($data_type)) {
2824         # default to longreadlen
2825       }
2826       else {
2827         $max_size = 100;  # for all other (numeric?) datatypes
2828       }
2829     }
2830
2831     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2832   };
2833 }
2834
2835 # Determine if a data_type is some type of BLOB
2836 sub _is_lob_type {
2837   my ($self, $data_type) = @_;
2838   $data_type && ($data_type =~ /(?:lob|bfile|text|image|bytea|memo)/i
2839     || $data_type =~ /^long(?:\s*(?:raw|bit\s*varying|varbit|binary
2840                                   |varchar|character\s*varying|nvarchar
2841                                   |national\s*character\s*varying))?$/xi);
2842 }
2843
2844 1;
2845
2846 =head1 USAGE NOTES
2847
2848 =head2 DBIx::Class and AutoCommit
2849
2850 DBIx::Class can do some wonderful magic with handling exceptions,
2851 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2852 (the default) combined with C<txn_do> for transaction support.
2853
2854 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2855 in an assumed transaction between commits, and you're telling us you'd
2856 like to manage that manually.  A lot of the magic protections offered by
2857 this module will go away.  We can't protect you from exceptions due to database
2858 disconnects because we don't know anything about how to restart your
2859 transactions.  You're on your own for handling all sorts of exceptional
2860 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2861 be with raw DBI.
2862
2863
2864 =head1 AUTHORS
2865
2866 Matt S. Trout <mst@shadowcatsystems.co.uk>
2867
2868 Andy Grundman <andy@hybridized.org>
2869
2870 =head1 LICENSE
2871
2872 You may distribute this code under the same terms as Perl itself.
2873
2874 =cut