70dcb24c886f115fd6ba91b81eb5cbde9271f345
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use overload ();
20 use namespace::clean;
21
22
23 # default cursor class, overridable in connect_info attributes
24 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
25
26 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
27 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/
30   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
31   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
32   transaction_depth _dbh_autocommit  savepoints
33 /);
34
35 # the values for these accessors are picked out (and deleted) from
36 # the attribute hashref passed to connect_info
37 my @storage_options = qw/
38   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
39   disable_sth_caching unsafe auto_savepoint
40 /;
41 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
42
43
44 # capability definitions, using a 2-tiered accessor system
45 # The rationale is:
46 #
47 # A driver/user may define _use_X, which blindly without any checks says:
48 # "(do not) use this capability", (use_dbms_capability is an "inherited"
49 # type accessor)
50 #
51 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
52 # accessor, which in turn calls _determine_supports_X, and stores the return
53 # in a special slot on the storage object, which is wiped every time a $dbh
54 # reconnection takes place (it is not guaranteed that upon reconnection we
55 # will get the same rdbms version). _determine_supports_X does not need to
56 # exist on a driver, as we ->can for it before calling.
57
58 my @capabilities = (qw/
59   insert_returning
60   insert_returning_bound
61   placeholders
62   typeless_placeholders
63   join_optimizer
64 /);
65 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
66 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
67
68 # on by default, not strictly a capability (pending rewrite)
69 __PACKAGE__->_use_join_optimizer (1);
70 sub _determine_supports_join_optimizer { 1 };
71
72 # Each of these methods need _determine_driver called before itself
73 # in order to function reliably. This is a purely DRY optimization
74 #
75 # get_(use)_dbms_capability need to be called on the correct Storage
76 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
77 # _determine_supports_X which obv. needs a correct driver as well
78 my @rdbms_specific_methods = qw/
79   deployment_statements
80   sqlt_type
81   sql_maker
82   build_datetime_parser
83   datetime_parser_type
84
85   insert
86   insert_bulk
87   update
88   delete
89   select
90   select_single
91
92   get_use_dbms_capability
93   get_dbms_capability
94
95   _server_info
96   _get_server_version
97 /;
98
99 for my $meth (@rdbms_specific_methods) {
100
101   my $orig = __PACKAGE__->can ($meth)
102     or die "$meth is not a ::Storage::DBI method!";
103
104   no strict qw/refs/;
105   no warnings qw/redefine/;
106   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
107     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
108       $_[0]->_determine_driver;
109
110       # This for some reason crashes and burns on perl 5.8.1
111       # IFF the method ends up throwing an exception
112       #goto $_[0]->can ($meth);
113
114       my $cref = $_[0]->can ($meth);
115       goto $cref;
116     }
117     goto $orig;
118   };
119 }
120
121
122 =head1 NAME
123
124 DBIx::Class::Storage::DBI - DBI storage handler
125
126 =head1 SYNOPSIS
127
128   my $schema = MySchema->connect('dbi:SQLite:my.db');
129
130   $schema->storage->debug(1);
131
132   my @stuff = $schema->storage->dbh_do(
133     sub {
134       my ($storage, $dbh, @args) = @_;
135       $dbh->do("DROP TABLE authors");
136     },
137     @column_list
138   );
139
140   $schema->resultset('Book')->search({
141      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
142   });
143
144 =head1 DESCRIPTION
145
146 This class represents the connection to an RDBMS via L<DBI>.  See
147 L<DBIx::Class::Storage> for general information.  This pod only
148 documents DBI-specific methods and behaviors.
149
150 =head1 METHODS
151
152 =cut
153
154 sub new {
155   my $new = shift->next::method(@_);
156
157   $new->transaction_depth(0);
158   $new->_sql_maker_opts({});
159   $new->_dbh_details({});
160   $new->{savepoints} = [];
161   $new->{_in_dbh_do} = 0;
162   $new->{_dbh_gen} = 0;
163
164   # read below to see what this does
165   $new->_arm_global_destructor;
166
167   $new;
168 }
169
170 # This is hack to work around perl shooting stuff in random
171 # order on exit(). If we do not walk the remaining storage
172 # objects in an END block, there is a *small but real* chance
173 # of a fork()ed child to kill the parent's shared DBI handle,
174 # *before perl reaches the DESTROY in this package*
175 # Yes, it is ugly and effective.
176 # Additionally this registry is used by the CLONE method to
177 # make sure no handles are shared between threads
178 {
179   my %seek_and_destroy;
180
181   sub _arm_global_destructor {
182     my $self = shift;
183     my $key = refaddr ($self);
184     $seek_and_destroy{$key} = $self;
185     weaken ($seek_and_destroy{$key});
186   }
187
188   END {
189     local $?; # just in case the DBI destructor changes it somehow
190
191     # destroy just the object if not native to this process/thread
192     $_->_verify_pid for (grep
193       { defined $_ }
194       values %seek_and_destroy
195     );
196   }
197
198   sub CLONE {
199     # As per DBI's recommendation, DBIC disconnects all handles as
200     # soon as possible (DBIC will reconnect only on demand from within
201     # the thread)
202     for (values %seek_and_destroy) {
203       next unless $_;
204       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
205       $_->_dbh(undef);
206     }
207   }
208 }
209
210 sub DESTROY {
211   my $self = shift;
212
213   # some databases spew warnings on implicit disconnect
214   local $SIG{__WARN__} = sub {};
215   $self->_dbh(undef);
216
217   # this op is necessary, since the very last perl runtime statement
218   # triggers a global destruction shootout, and the $SIG localization
219   # may very well be destroyed before perl actually gets to do the
220   # $dbh undef
221   1;
222 }
223
224 # handle pid changes correctly - do not destroy parent's connection
225 sub _verify_pid {
226   my $self = shift;
227
228   my $pid = $self->_conn_pid;
229   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
230     $dbh->{InactiveDestroy} = 1;
231     $self->{_dbh_gen}++;
232     $self->_dbh(undef);
233   }
234
235   return;
236 }
237
238 =head2 connect_info
239
240 This method is normally called by L<DBIx::Class::Schema/connection>, which
241 encapsulates its argument list in an arrayref before passing them here.
242
243 The argument list may contain:
244
245 =over
246
247 =item *
248
249 The same 4-element argument set one would normally pass to
250 L<DBI/connect>, optionally followed by
251 L<extra attributes|/DBIx::Class specific connection attributes>
252 recognized by DBIx::Class:
253
254   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
255
256 =item *
257
258 A single code reference which returns a connected
259 L<DBI database handle|DBI/connect> optionally followed by
260 L<extra attributes|/DBIx::Class specific connection attributes> recognized
261 by DBIx::Class:
262
263   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
264
265 =item *
266
267 A single hashref with all the attributes and the dsn/user/password
268 mixed together:
269
270   $connect_info_args = [{
271     dsn => $dsn,
272     user => $user,
273     password => $pass,
274     %dbi_attributes,
275     %extra_attributes,
276   }];
277
278   $connect_info_args = [{
279     dbh_maker => sub { DBI->connect (...) },
280     %dbi_attributes,
281     %extra_attributes,
282   }];
283
284 This is particularly useful for L<Catalyst> based applications, allowing the
285 following config (L<Config::General> style):
286
287   <Model::DB>
288     schema_class   App::DB
289     <connect_info>
290       dsn          dbi:mysql:database=test
291       user         testuser
292       password     TestPass
293       AutoCommit   1
294     </connect_info>
295   </Model::DB>
296
297 The C<dsn>/C<user>/C<password> combination can be substituted by the
298 C<dbh_maker> key whose value is a coderef that returns a connected
299 L<DBI database handle|DBI/connect>
300
301 =back
302
303 Please note that the L<DBI> docs recommend that you always explicitly
304 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
305 recommends that it be set to I<1>, and that you perform transactions
306 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
307 to I<1> if you do not do explicitly set it to zero.  This is the default
308 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
309
310 =head3 DBIx::Class specific connection attributes
311
312 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
313 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
314 the following connection options. These options can be mixed in with your other
315 L<DBI> connection attributes, or placed in a separate hashref
316 (C<\%extra_attributes>) as shown above.
317
318 Every time C<connect_info> is invoked, any previous settings for
319 these options will be cleared before setting the new ones, regardless of
320 whether any options are specified in the new C<connect_info>.
321
322
323 =over
324
325 =item on_connect_do
326
327 Specifies things to do immediately after connecting or re-connecting to
328 the database.  Its value may contain:
329
330 =over
331
332 =item a scalar
333
334 This contains one SQL statement to execute.
335
336 =item an array reference
337
338 This contains SQL statements to execute in order.  Each element contains
339 a string or a code reference that returns a string.
340
341 =item a code reference
342
343 This contains some code to execute.  Unlike code references within an
344 array reference, its return value is ignored.
345
346 =back
347
348 =item on_disconnect_do
349
350 Takes arguments in the same form as L</on_connect_do> and executes them
351 immediately before disconnecting from the database.
352
353 Note, this only runs if you explicitly call L</disconnect> on the
354 storage object.
355
356 =item on_connect_call
357
358 A more generalized form of L</on_connect_do> that calls the specified
359 C<connect_call_METHOD> methods in your storage driver.
360
361   on_connect_do => 'select 1'
362
363 is equivalent to:
364
365   on_connect_call => [ [ do_sql => 'select 1' ] ]
366
367 Its values may contain:
368
369 =over
370
371 =item a scalar
372
373 Will call the C<connect_call_METHOD> method.
374
375 =item a code reference
376
377 Will execute C<< $code->($storage) >>
378
379 =item an array reference
380
381 Each value can be a method name or code reference.
382
383 =item an array of arrays
384
385 For each array, the first item is taken to be the C<connect_call_> method name
386 or code reference, and the rest are parameters to it.
387
388 =back
389
390 Some predefined storage methods you may use:
391
392 =over
393
394 =item do_sql
395
396 Executes a SQL string or a code reference that returns a SQL string. This is
397 what L</on_connect_do> and L</on_disconnect_do> use.
398
399 It can take:
400
401 =over
402
403 =item a scalar
404
405 Will execute the scalar as SQL.
406
407 =item an arrayref
408
409 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
410 attributes hashref and bind values.
411
412 =item a code reference
413
414 Will execute C<< $code->($storage) >> and execute the return array refs as
415 above.
416
417 =back
418
419 =item datetime_setup
420
421 Execute any statements necessary to initialize the database session to return
422 and accept datetime/timestamp values used with
423 L<DBIx::Class::InflateColumn::DateTime>.
424
425 Only necessary for some databases, see your specific storage driver for
426 implementation details.
427
428 =back
429
430 =item on_disconnect_call
431
432 Takes arguments in the same form as L</on_connect_call> and executes them
433 immediately before disconnecting from the database.
434
435 Calls the C<disconnect_call_METHOD> methods as opposed to the
436 C<connect_call_METHOD> methods called by L</on_connect_call>.
437
438 Note, this only runs if you explicitly call L</disconnect> on the
439 storage object.
440
441 =item disable_sth_caching
442
443 If set to a true value, this option will disable the caching of
444 statement handles via L<DBI/prepare_cached>.
445
446 =item limit_dialect
447
448 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
449 default L</sql_limit_dialect> setting of the storage (if any). For a list
450 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
451
452 =item quote_char
453
454 Specifies what characters to use to quote table and column names.
455
456 C<quote_char> expects either a single character, in which case is it
457 is placed on either side of the table/column name, or an arrayref of length
458 2 in which case the table/column name is placed between the elements.
459
460 For example under MySQL you should use C<< quote_char => '`' >>, and for
461 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
462
463 =item name_sep
464
465 This parameter is only useful in conjunction with C<quote_char>, and is used to
466 specify the character that separates elements (schemas, tables, columns) from
467 each other. If unspecified it defaults to the most commonly used C<.>.
468
469 =item unsafe
470
471 This Storage driver normally installs its own C<HandleError>, sets
472 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
473 all database handles, including those supplied by a coderef.  It does this
474 so that it can have consistent and useful error behavior.
475
476 If you set this option to a true value, Storage will not do its usual
477 modifications to the database handle's attributes, and instead relies on
478 the settings in your connect_info DBI options (or the values you set in
479 your connection coderef, in the case that you are connecting via coderef).
480
481 Note that your custom settings can cause Storage to malfunction,
482 especially if you set a C<HandleError> handler that suppresses exceptions
483 and/or disable C<RaiseError>.
484
485 =item auto_savepoint
486
487 If this option is true, L<DBIx::Class> will use savepoints when nesting
488 transactions, making it possible to recover from failure in the inner
489 transaction without having to abort all outer transactions.
490
491 =item cursor_class
492
493 Use this argument to supply a cursor class other than the default
494 L<DBIx::Class::Storage::DBI::Cursor>.
495
496 =back
497
498 Some real-life examples of arguments to L</connect_info> and
499 L<DBIx::Class::Schema/connect>
500
501   # Simple SQLite connection
502   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
503
504   # Connect via subref
505   ->connect_info([ sub { DBI->connect(...) } ]);
506
507   # Connect via subref in hashref
508   ->connect_info([{
509     dbh_maker => sub { DBI->connect(...) },
510     on_connect_do => 'alter session ...',
511   }]);
512
513   # A bit more complicated
514   ->connect_info(
515     [
516       'dbi:Pg:dbname=foo',
517       'postgres',
518       'my_pg_password',
519       { AutoCommit => 1 },
520       { quote_char => q{"} },
521     ]
522   );
523
524   # Equivalent to the previous example
525   ->connect_info(
526     [
527       'dbi:Pg:dbname=foo',
528       'postgres',
529       'my_pg_password',
530       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
531     ]
532   );
533
534   # Same, but with hashref as argument
535   # See parse_connect_info for explanation
536   ->connect_info(
537     [{
538       dsn         => 'dbi:Pg:dbname=foo',
539       user        => 'postgres',
540       password    => 'my_pg_password',
541       AutoCommit  => 1,
542       quote_char  => q{"},
543       name_sep    => q{.},
544     }]
545   );
546
547   # Subref + DBIx::Class-specific connection options
548   ->connect_info(
549     [
550       sub { DBI->connect(...) },
551       {
552           quote_char => q{`},
553           name_sep => q{@},
554           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
555           disable_sth_caching => 1,
556       },
557     ]
558   );
559
560
561
562 =cut
563
564 sub connect_info {
565   my ($self, $info) = @_;
566
567   return $self->_connect_info if !$info;
568
569   $self->_connect_info($info); # copy for _connect_info
570
571   $info = $self->_normalize_connect_info($info)
572     if ref $info eq 'ARRAY';
573
574   for my $storage_opt (keys %{ $info->{storage_options} }) {
575     my $value = $info->{storage_options}{$storage_opt};
576
577     $self->$storage_opt($value);
578   }
579
580   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
581   #  the new set of options
582   $self->_sql_maker(undef);
583   $self->_sql_maker_opts({});
584
585   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
586     my $value = $info->{sql_maker_options}{$sql_maker_opt};
587
588     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
589   }
590
591   my %attrs = (
592     %{ $self->_default_dbi_connect_attributes || {} },
593     %{ $info->{attributes} || {} },
594   );
595
596   my @args = @{ $info->{arguments} };
597
598   $self->_dbi_connect_info([@args,
599     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
600
601   # FIXME - dirty:
602   # save attributes them in a separate accessor so they are always
603   # introspectable, even in case of a CODE $dbhmaker
604   $self->_dbic_connect_attributes (\%attrs);
605
606   return $self->_connect_info;
607 }
608
609 sub _normalize_connect_info {
610   my ($self, $info_arg) = @_;
611   my %info;
612
613   my @args = @$info_arg;  # take a shallow copy for further mutilation
614
615   # combine/pre-parse arguments depending on invocation style
616
617   my %attrs;
618   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
619     %attrs = %{ $args[1] || {} };
620     @args = $args[0];
621   }
622   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
623     %attrs = %{$args[0]};
624     @args = ();
625     if (my $code = delete $attrs{dbh_maker}) {
626       @args = $code;
627
628       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
629       if (@ignored) {
630         carp sprintf (
631             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
632           . "to the result of 'dbh_maker'",
633
634           join (', ', map { "'$_'" } (@ignored) ),
635         );
636       }
637     }
638     else {
639       @args = delete @attrs{qw/dsn user password/};
640     }
641   }
642   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
643     %attrs = (
644       % { $args[3] || {} },
645       % { $args[4] || {} },
646     );
647     @args = @args[0,1,2];
648   }
649
650   $info{arguments} = \@args;
651
652   my @storage_opts = grep exists $attrs{$_},
653     @storage_options, 'cursor_class';
654
655   @{ $info{storage_options} }{@storage_opts} =
656     delete @attrs{@storage_opts} if @storage_opts;
657
658   my @sql_maker_opts = grep exists $attrs{$_},
659     qw/limit_dialect quote_char name_sep/;
660
661   @{ $info{sql_maker_options} }{@sql_maker_opts} =
662     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
663
664   $info{attributes} = \%attrs if %attrs;
665
666   return \%info;
667 }
668
669 sub _default_dbi_connect_attributes {
670   return {
671     AutoCommit => 1,
672     RaiseError => 1,
673     PrintError => 0,
674   };
675 }
676
677 =head2 on_connect_do
678
679 This method is deprecated in favour of setting via L</connect_info>.
680
681 =cut
682
683 =head2 on_disconnect_do
684
685 This method is deprecated in favour of setting via L</connect_info>.
686
687 =cut
688
689 sub _parse_connect_do {
690   my ($self, $type) = @_;
691
692   my $val = $self->$type;
693   return () if not defined $val;
694
695   my @res;
696
697   if (not ref($val)) {
698     push @res, [ 'do_sql', $val ];
699   } elsif (ref($val) eq 'CODE') {
700     push @res, $val;
701   } elsif (ref($val) eq 'ARRAY') {
702     push @res, map { [ 'do_sql', $_ ] } @$val;
703   } else {
704     $self->throw_exception("Invalid type for $type: ".ref($val));
705   }
706
707   return \@res;
708 }
709
710 =head2 dbh_do
711
712 Arguments: ($subref | $method_name), @extra_coderef_args?
713
714 Execute the given $subref or $method_name using the new exception-based
715 connection management.
716
717 The first two arguments will be the storage object that C<dbh_do> was called
718 on and a database handle to use.  Any additional arguments will be passed
719 verbatim to the called subref as arguments 2 and onwards.
720
721 Using this (instead of $self->_dbh or $self->dbh) ensures correct
722 exception handling and reconnection (or failover in future subclasses).
723
724 Your subref should have no side-effects outside of the database, as
725 there is the potential for your subref to be partially double-executed
726 if the database connection was stale/dysfunctional.
727
728 Example:
729
730   my @stuff = $schema->storage->dbh_do(
731     sub {
732       my ($storage, $dbh, @cols) = @_;
733       my $cols = join(q{, }, @cols);
734       $dbh->selectrow_array("SELECT $cols FROM foo");
735     },
736     @column_list
737   );
738
739 =cut
740
741 sub dbh_do {
742   my $self = shift;
743   my $code = shift;
744
745   my $dbh = $self->_get_dbh;
746
747   return $self->$code($dbh, @_)
748     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
749
750   local $self->{_in_dbh_do} = 1;
751
752   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
753   my $args = \@_;
754   return try {
755     $self->$code ($dbh, @$args);
756   } catch {
757     $self->throw_exception($_) if $self->connected;
758
759     # We were not connected - reconnect and retry, but let any
760     #  exception fall right through this time
761     carp "Retrying $code after catching disconnected exception: $_"
762       if $ENV{DBIC_DBIRETRY_DEBUG};
763
764     $self->_populate_dbh;
765     $self->$code($self->_dbh, @$args);
766   };
767 }
768
769 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
770 # It also informs dbh_do to bypass itself while under the direction of txn_do,
771 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
772 sub txn_do {
773   my $self = shift;
774   my $coderef = shift;
775
776   ref $coderef eq 'CODE' or $self->throw_exception
777     ('$coderef must be a CODE reference');
778
779   local $self->{_in_dbh_do} = 1;
780
781   my @result;
782   my $want = wantarray;
783
784   my $tried = 0;
785   while(1) {
786     my $exception;
787
788     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
789     my $args = \@_;
790
791     try {
792       $self->txn_begin;
793       my $txn_start_depth = $self->transaction_depth;
794       if($want) {
795           @result = $coderef->(@$args);
796       }
797       elsif(defined $want) {
798           $result[0] = $coderef->(@$args);
799       }
800       else {
801           $coderef->(@$args);
802       }
803
804       my $delta_txn = $txn_start_depth - $self->transaction_depth;
805       if ($delta_txn == 0) {
806         $self->txn_commit;
807       }
808       elsif ($delta_txn != 1) {
809         # an off-by-one would mean we fired a rollback
810         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
811       }
812     } catch {
813       $exception = $_;
814     };
815
816     if(! defined $exception) { return wantarray ? @result : $result[0] }
817
818     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
819       my $rollback_exception;
820       try { $self->txn_rollback } catch { $rollback_exception = shift };
821       if(defined $rollback_exception) {
822         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
823         $self->throw_exception($exception)  # propagate nested rollback
824           if $rollback_exception =~ /$exception_class/;
825
826         $self->throw_exception(
827           "Transaction aborted: ${exception}. "
828           . "Rollback failed: ${rollback_exception}"
829         );
830       }
831       $self->throw_exception($exception)
832     }
833
834     # We were not connected, and was first try - reconnect and retry
835     # via the while loop
836     carp "Retrying $coderef after catching disconnected exception: $exception"
837       if $ENV{DBIC_TXNRETRY_DEBUG};
838     $self->_populate_dbh;
839   }
840 }
841
842 =head2 disconnect
843
844 Our C<disconnect> method also performs a rollback first if the
845 database is not in C<AutoCommit> mode.
846
847 =cut
848
849 sub disconnect {
850   my ($self) = @_;
851
852   if( $self->_dbh ) {
853     my @actions;
854
855     push @actions, ( $self->on_disconnect_call || () );
856     push @actions, $self->_parse_connect_do ('on_disconnect_do');
857
858     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
859
860     $self->_dbh_rollback unless $self->_dbh_autocommit;
861
862     %{ $self->_dbh->{CachedKids} } = ();
863     $self->_dbh->disconnect;
864     $self->_dbh(undef);
865     $self->{_dbh_gen}++;
866   }
867 }
868
869 =head2 with_deferred_fk_checks
870
871 =over 4
872
873 =item Arguments: C<$coderef>
874
875 =item Return Value: The return value of $coderef
876
877 =back
878
879 Storage specific method to run the code ref with FK checks deferred or
880 in MySQL's case disabled entirely.
881
882 =cut
883
884 # Storage subclasses should override this
885 sub with_deferred_fk_checks {
886   my ($self, $sub) = @_;
887   $sub->();
888 }
889
890 =head2 connected
891
892 =over
893
894 =item Arguments: none
895
896 =item Return Value: 1|0
897
898 =back
899
900 Verifies that the current database handle is active and ready to execute
901 an SQL statement (e.g. the connection did not get stale, server is still
902 answering, etc.) This method is used internally by L</dbh>.
903
904 =cut
905
906 sub connected {
907   my $self = shift;
908   return 0 unless $self->_seems_connected;
909
910   #be on the safe side
911   local $self->_dbh->{RaiseError} = 1;
912
913   return $self->_ping;
914 }
915
916 sub _seems_connected {
917   my $self = shift;
918
919   $self->_verify_pid;
920
921   my $dbh = $self->_dbh
922     or return 0;
923
924   return $dbh->FETCH('Active');
925 }
926
927 sub _ping {
928   my $self = shift;
929
930   my $dbh = $self->_dbh or return 0;
931
932   return $dbh->ping;
933 }
934
935 sub ensure_connected {
936   my ($self) = @_;
937
938   unless ($self->connected) {
939     $self->_populate_dbh;
940   }
941 }
942
943 =head2 dbh
944
945 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
946 is guaranteed to be healthy by implicitly calling L</connected>, and if
947 necessary performing a reconnection before returning. Keep in mind that this
948 is very B<expensive> on some database engines. Consider using L</dbh_do>
949 instead.
950
951 =cut
952
953 sub dbh {
954   my ($self) = @_;
955
956   if (not $self->_dbh) {
957     $self->_populate_dbh;
958   } else {
959     $self->ensure_connected;
960   }
961   return $self->_dbh;
962 }
963
964 # this is the internal "get dbh or connect (don't check)" method
965 sub _get_dbh {
966   my $self = shift;
967   $self->_verify_pid;
968   $self->_populate_dbh unless $self->_dbh;
969   return $self->_dbh;
970 }
971
972 sub sql_maker {
973   my ($self) = @_;
974   unless ($self->_sql_maker) {
975     my $sql_maker_class = $self->sql_maker_class;
976     $self->ensure_class_loaded ($sql_maker_class);
977
978     my %opts = %{$self->_sql_maker_opts||{}};
979     my $dialect =
980       $opts{limit_dialect}
981         ||
982       $self->sql_limit_dialect
983         ||
984       do {
985         my $s_class = (ref $self) || $self;
986         carp (
987           "Your storage class ($s_class) does not set sql_limit_dialect and you "
988         . 'have not supplied an explicit limit_dialect in your connection_info. '
989         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
990         . 'databases but can be (and often is) painfully slow.'
991         );
992
993         'GenericSubQ';
994       }
995     ;
996
997     $self->_sql_maker($sql_maker_class->new(
998       bindtype=>'columns',
999       array_datatypes => 1,
1000       limit_dialect => $dialect,
1001       name_sep => '.',
1002       %opts,
1003     ));
1004   }
1005   return $self->_sql_maker;
1006 }
1007
1008 # nothing to do by default
1009 sub _rebless {}
1010 sub _init {}
1011
1012 sub _populate_dbh {
1013   my ($self) = @_;
1014
1015   my @info = @{$self->_dbi_connect_info || []};
1016   $self->_dbh(undef); # in case ->connected failed we might get sent here
1017   $self->_dbh_details({}); # reset everything we know
1018
1019   $self->_dbh($self->_connect(@info));
1020
1021   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1022
1023   $self->_determine_driver;
1024
1025   # Always set the transaction depth on connect, since
1026   #  there is no transaction in progress by definition
1027   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1028
1029   $self->_run_connection_actions unless $self->{_in_determine_driver};
1030 }
1031
1032 sub _run_connection_actions {
1033   my $self = shift;
1034   my @actions;
1035
1036   push @actions, ( $self->on_connect_call || () );
1037   push @actions, $self->_parse_connect_do ('on_connect_do');
1038
1039   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1040 }
1041
1042
1043
1044 sub set_use_dbms_capability {
1045   $_[0]->set_inherited ($_[1], $_[2]);
1046 }
1047
1048 sub get_use_dbms_capability {
1049   my ($self, $capname) = @_;
1050
1051   my $use = $self->get_inherited ($capname);
1052   return defined $use
1053     ? $use
1054     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1055   ;
1056 }
1057
1058 sub set_dbms_capability {
1059   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1060 }
1061
1062 sub get_dbms_capability {
1063   my ($self, $capname) = @_;
1064
1065   my $cap = $self->_dbh_details->{capability}{$capname};
1066
1067   unless (defined $cap) {
1068     if (my $meth = $self->can ("_determine$capname")) {
1069       $cap = $self->$meth ? 1 : 0;
1070     }
1071     else {
1072       $cap = 0;
1073     }
1074
1075     $self->set_dbms_capability ($capname, $cap);
1076   }
1077
1078   return $cap;
1079 }
1080
1081 sub _server_info {
1082   my $self = shift;
1083
1084   my $info;
1085   unless ($info = $self->_dbh_details->{info}) {
1086
1087     $info = {};
1088
1089     my $server_version = try { $self->_get_server_version };
1090
1091     if (defined $server_version) {
1092       $info->{dbms_version} = $server_version;
1093
1094       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1095       my @verparts = split (/\./, $numeric_version);
1096       if (
1097         @verparts
1098           &&
1099         $verparts[0] <= 999
1100       ) {
1101         # consider only up to 3 version parts, iff not more than 3 digits
1102         my @use_parts;
1103         while (@verparts && @use_parts < 3) {
1104           my $p = shift @verparts;
1105           last if $p > 999;
1106           push @use_parts, $p;
1107         }
1108         push @use_parts, 0 while @use_parts < 3;
1109
1110         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1111       }
1112     }
1113
1114     $self->_dbh_details->{info} = $info;
1115   }
1116
1117   return $info;
1118 }
1119
1120 sub _get_server_version {
1121   shift->_get_dbh->get_info(18);
1122 }
1123
1124 sub _determine_driver {
1125   my ($self) = @_;
1126
1127   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1128     my $started_connected = 0;
1129     local $self->{_in_determine_driver} = 1;
1130
1131     if (ref($self) eq __PACKAGE__) {
1132       my $driver;
1133       if ($self->_dbh) { # we are connected
1134         $driver = $self->_dbh->{Driver}{Name};
1135         $started_connected = 1;
1136       } else {
1137         # if connect_info is a CODEREF, we have no choice but to connect
1138         if (ref $self->_dbi_connect_info->[0] &&
1139             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1140           $self->_populate_dbh;
1141           $driver = $self->_dbh->{Driver}{Name};
1142         }
1143         else {
1144           # try to use dsn to not require being connected, the driver may still
1145           # force a connection in _rebless to determine version
1146           # (dsn may not be supplied at all if all we do is make a mock-schema)
1147           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1148           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1149           $driver ||= $ENV{DBI_DRIVER};
1150         }
1151       }
1152
1153       if ($driver) {
1154         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1155         if ($self->load_optional_class($storage_class)) {
1156           mro::set_mro($storage_class, 'c3');
1157           bless $self, $storage_class;
1158           $self->_rebless();
1159         }
1160       }
1161     }
1162
1163     $self->_driver_determined(1);
1164
1165     $self->_init; # run driver-specific initializations
1166
1167     $self->_run_connection_actions
1168         if !$started_connected && defined $self->_dbh;
1169   }
1170 }
1171
1172 sub _do_connection_actions {
1173   my $self          = shift;
1174   my $method_prefix = shift;
1175   my $call          = shift;
1176
1177   if (not ref($call)) {
1178     my $method = $method_prefix . $call;
1179     $self->$method(@_);
1180   } elsif (ref($call) eq 'CODE') {
1181     $self->$call(@_);
1182   } elsif (ref($call) eq 'ARRAY') {
1183     if (ref($call->[0]) ne 'ARRAY') {
1184       $self->_do_connection_actions($method_prefix, $_) for @$call;
1185     } else {
1186       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1187     }
1188   } else {
1189     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1190   }
1191
1192   return $self;
1193 }
1194
1195 sub connect_call_do_sql {
1196   my $self = shift;
1197   $self->_do_query(@_);
1198 }
1199
1200 sub disconnect_call_do_sql {
1201   my $self = shift;
1202   $self->_do_query(@_);
1203 }
1204
1205 # override in db-specific backend when necessary
1206 sub connect_call_datetime_setup { 1 }
1207
1208 sub _do_query {
1209   my ($self, $action) = @_;
1210
1211   if (ref $action eq 'CODE') {
1212     $action = $action->($self);
1213     $self->_do_query($_) foreach @$action;
1214   }
1215   else {
1216     # Most debuggers expect ($sql, @bind), so we need to exclude
1217     # the attribute hash which is the second argument to $dbh->do
1218     # furthermore the bind values are usually to be presented
1219     # as named arrayref pairs, so wrap those here too
1220     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1221     my $sql = shift @do_args;
1222     my $attrs = shift @do_args;
1223     my @bind = map { [ undef, $_ ] } @do_args;
1224
1225     $self->_query_start($sql, @bind);
1226     $self->_get_dbh->do($sql, $attrs, @do_args);
1227     $self->_query_end($sql, @bind);
1228   }
1229
1230   return $self;
1231 }
1232
1233 sub _connect {
1234   my ($self, @info) = @_;
1235
1236   $self->throw_exception("You failed to provide any connection info")
1237     if !@info;
1238
1239   my ($old_connect_via, $dbh);
1240
1241   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1242     $old_connect_via = $DBI::connect_via;
1243     $DBI::connect_via = 'connect';
1244   }
1245
1246   try {
1247     if(ref $info[0] eq 'CODE') {
1248        $dbh = $info[0]->();
1249     }
1250     else {
1251        $dbh = DBI->connect(@info);
1252     }
1253
1254     if (!$dbh) {
1255       die $DBI::errstr;
1256     }
1257
1258     unless ($self->unsafe) {
1259
1260       # this odd anonymous coderef dereference is in fact really
1261       # necessary to avoid the unwanted effect described in perl5
1262       # RT#75792
1263       sub {
1264         my $weak_self = $_[0];
1265         weaken $weak_self;
1266
1267         $_[1]->{HandleError} = sub {
1268           if ($weak_self) {
1269             $weak_self->throw_exception("DBI Exception: $_[0]");
1270           }
1271           else {
1272             # the handler may be invoked by something totally out of
1273             # the scope of DBIC
1274             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1275           }
1276         };
1277       }->($self, $dbh);
1278
1279       $dbh->{ShowErrorStatement} = 1;
1280       $dbh->{RaiseError} = 1;
1281       $dbh->{PrintError} = 0;
1282     }
1283   }
1284   catch {
1285     $self->throw_exception("DBI Connection failed: $_")
1286   }
1287   finally {
1288     $DBI::connect_via = $old_connect_via if $old_connect_via;
1289   };
1290
1291   $self->_dbh_autocommit($dbh->{AutoCommit});
1292   $dbh;
1293 }
1294
1295 sub svp_begin {
1296   my ($self, $name) = @_;
1297
1298   $name = $self->_svp_generate_name
1299     unless defined $name;
1300
1301   $self->throw_exception ("You can't use savepoints outside a transaction")
1302     if $self->{transaction_depth} == 0;
1303
1304   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1305     unless $self->can('_svp_begin');
1306
1307   push @{ $self->{savepoints} }, $name;
1308
1309   $self->debugobj->svp_begin($name) if $self->debug;
1310
1311   return $self->_svp_begin($name);
1312 }
1313
1314 sub svp_release {
1315   my ($self, $name) = @_;
1316
1317   $self->throw_exception ("You can't use savepoints outside a transaction")
1318     if $self->{transaction_depth} == 0;
1319
1320   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1321     unless $self->can('_svp_release');
1322
1323   if (defined $name) {
1324     $self->throw_exception ("Savepoint '$name' does not exist")
1325       unless grep { $_ eq $name } @{ $self->{savepoints} };
1326
1327     # Dig through the stack until we find the one we are releasing.  This keeps
1328     # the stack up to date.
1329     my $svp;
1330
1331     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1332   } else {
1333     $name = pop @{ $self->{savepoints} };
1334   }
1335
1336   $self->debugobj->svp_release($name) if $self->debug;
1337
1338   return $self->_svp_release($name);
1339 }
1340
1341 sub svp_rollback {
1342   my ($self, $name) = @_;
1343
1344   $self->throw_exception ("You can't use savepoints outside a transaction")
1345     if $self->{transaction_depth} == 0;
1346
1347   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1348     unless $self->can('_svp_rollback');
1349
1350   if (defined $name) {
1351       # If they passed us a name, verify that it exists in the stack
1352       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1353           $self->throw_exception("Savepoint '$name' does not exist!");
1354       }
1355
1356       # Dig through the stack until we find the one we are releasing.  This keeps
1357       # the stack up to date.
1358       while(my $s = pop(@{ $self->{savepoints} })) {
1359           last if($s eq $name);
1360       }
1361       # Add the savepoint back to the stack, as a rollback doesn't remove the
1362       # named savepoint, only everything after it.
1363       push(@{ $self->{savepoints} }, $name);
1364   } else {
1365       # We'll assume they want to rollback to the last savepoint
1366       $name = $self->{savepoints}->[-1];
1367   }
1368
1369   $self->debugobj->svp_rollback($name) if $self->debug;
1370
1371   return $self->_svp_rollback($name);
1372 }
1373
1374 sub _svp_generate_name {
1375   my ($self) = @_;
1376   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1377 }
1378
1379 sub txn_begin {
1380   my $self = shift;
1381
1382   # this means we have not yet connected and do not know the AC status
1383   # (e.g. coderef $dbh)
1384   if (! defined $self->_dbh_autocommit) {
1385     $self->ensure_connected;
1386   }
1387   # otherwise re-connect on pid changes, so
1388   # that the txn_depth is adjusted properly
1389   # the lightweight _get_dbh is good enoug here
1390   # (only superficial handle check, no pings)
1391   else {
1392     $self->_get_dbh;
1393   }
1394
1395   if($self->transaction_depth == 0) {
1396     $self->debugobj->txn_begin()
1397       if $self->debug;
1398     $self->_dbh_begin_work;
1399   }
1400   elsif ($self->auto_savepoint) {
1401     $self->svp_begin;
1402   }
1403   $self->{transaction_depth}++;
1404 }
1405
1406 sub _dbh_begin_work {
1407   my $self = shift;
1408
1409   # if the user is utilizing txn_do - good for him, otherwise we need to
1410   # ensure that the $dbh is healthy on BEGIN.
1411   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1412   # will be replaced by a failure of begin_work itself (which will be
1413   # then retried on reconnect)
1414   if ($self->{_in_dbh_do}) {
1415     $self->_dbh->begin_work;
1416   } else {
1417     $self->dbh_do(sub { $_[1]->begin_work });
1418   }
1419 }
1420
1421 sub txn_commit {
1422   my $self = shift;
1423   if (! $self->_dbh) {
1424     $self->throw_exception('cannot COMMIT on a disconnected handle');
1425   }
1426   elsif ($self->{transaction_depth} == 1) {
1427     $self->debugobj->txn_commit()
1428       if ($self->debug);
1429     $self->_dbh_commit;
1430     $self->{transaction_depth} = 0
1431       if $self->_dbh_autocommit;
1432   }
1433   elsif($self->{transaction_depth} > 1) {
1434     $self->{transaction_depth}--;
1435     $self->svp_release
1436       if $self->auto_savepoint;
1437   }
1438   elsif (! $self->_dbh->FETCH('AutoCommit') ) {
1439
1440     carp "Storage transaction_depth $self->{transaction_depth} does not match "
1441         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1442
1443     $self->debugobj->txn_commit()
1444       if ($self->debug);
1445     $self->_dbh_commit;
1446     $self->{transaction_depth} = 0
1447       if $self->_dbh_autocommit;
1448   }
1449   else {
1450     $self->throw_exception( 'Refusing to commit without a started transaction' );
1451   }
1452 }
1453
1454 sub _dbh_commit {
1455   my $self = shift;
1456   my $dbh  = $self->_dbh
1457     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1458   $dbh->commit;
1459 }
1460
1461 sub txn_rollback {
1462   my $self = shift;
1463   my $dbh = $self->_dbh;
1464   try {
1465     if ($self->{transaction_depth} == 1) {
1466       $self->debugobj->txn_rollback()
1467         if ($self->debug);
1468       $self->{transaction_depth} = 0
1469         if $self->_dbh_autocommit;
1470       $self->_dbh_rollback;
1471     }
1472     elsif($self->{transaction_depth} > 1) {
1473       $self->{transaction_depth}--;
1474       if ($self->auto_savepoint) {
1475         $self->svp_rollback;
1476         $self->svp_release;
1477       }
1478     }
1479     else {
1480       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1481     }
1482   }
1483   catch {
1484     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1485
1486     if ($_ !~ /$exception_class/) {
1487       # ensure that a failed rollback resets the transaction depth
1488       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1489     }
1490
1491     $self->throw_exception($_)
1492   };
1493 }
1494
1495 sub _dbh_rollback {
1496   my $self = shift;
1497   my $dbh  = $self->_dbh
1498     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1499   $dbh->rollback;
1500 }
1501
1502 # This used to be the top-half of _execute.  It was split out to make it
1503 #  easier to override in NoBindVars without duping the rest.  It takes up
1504 #  all of _execute's args, and emits $sql, @bind.
1505 sub _prep_for_execute {
1506   my ($self, $op, $extra_bind, $ident, $args) = @_;
1507
1508   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1509     $ident = $ident->from();
1510   }
1511
1512   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1513
1514   unshift(@bind,
1515     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1516       if $extra_bind;
1517   return ($sql, \@bind);
1518 }
1519
1520
1521 sub _fix_bind_params {
1522     my ($self, @bind) = @_;
1523
1524     ### Turn @bind from something like this:
1525     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1526     ### to this:
1527     ###   ( "'1'", "'1'", "'3'" )
1528     return
1529         map {
1530             if ( defined( $_ && $_->[1] ) ) {
1531                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1532             }
1533             else { q{NULL}; }
1534         } @bind;
1535 }
1536
1537 sub _query_start {
1538     my ( $self, $sql, @bind ) = @_;
1539
1540     if ( $self->debug ) {
1541         @bind = $self->_fix_bind_params(@bind);
1542
1543         $self->debugobj->query_start( $sql, @bind );
1544     }
1545 }
1546
1547 sub _query_end {
1548     my ( $self, $sql, @bind ) = @_;
1549
1550     if ( $self->debug ) {
1551         @bind = $self->_fix_bind_params(@bind);
1552         $self->debugobj->query_end( $sql, @bind );
1553     }
1554 }
1555
1556 sub _dbh_execute {
1557   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1558
1559   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1560
1561   $self->_query_start( $sql, @$bind );
1562
1563   my $sth = $self->sth($sql,$op);
1564
1565   my $placeholder_index = 1;
1566
1567   foreach my $bound (@$bind) {
1568     my $attributes = {};
1569     my($column_name, @data) = @$bound;
1570
1571     if ($bind_attributes) {
1572       $attributes = $bind_attributes->{$column_name}
1573       if defined $bind_attributes->{$column_name};
1574     }
1575
1576     foreach my $data (@data) {
1577       my $ref = ref $data;
1578
1579       if ($ref and overload::Method($data, '""') ) {
1580         $data = "$data";
1581       }
1582       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1583         $sth->bind_param_inout(
1584           $placeholder_index++,
1585           $data,
1586           $self->_max_column_bytesize($ident, $column_name),
1587           $attributes
1588         );
1589         next;
1590       }
1591
1592       $sth->bind_param($placeholder_index++, $data, $attributes);
1593     }
1594   }
1595
1596   # Can this fail without throwing an exception anyways???
1597   my $rv = $sth->execute();
1598   $self->throw_exception(
1599     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1600   ) if !$rv;
1601
1602   $self->_query_end( $sql, @$bind );
1603
1604   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1605 }
1606
1607 sub _execute {
1608     my $self = shift;
1609     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1610 }
1611
1612 sub _prefetch_autovalues {
1613   my ($self, $source, $to_insert) = @_;
1614
1615   my $colinfo = $source->columns_info;
1616
1617   my %values;
1618   for my $col (keys %$colinfo) {
1619     if (
1620       $colinfo->{$col}{auto_nextval}
1621         and
1622       (
1623         ! exists $to_insert->{$col}
1624           or
1625         ref $to_insert->{$col} eq 'SCALAR'
1626       )
1627     ) {
1628       $values{$col} = $self->_sequence_fetch(
1629         'NEXTVAL',
1630         ( $colinfo->{$col}{sequence} ||=
1631             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1632         ),
1633       );
1634     }
1635   }
1636
1637   \%values;
1638 }
1639
1640 sub insert {
1641   my ($self, $source, $to_insert) = @_;
1642
1643   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1644
1645   # fuse the values
1646   $to_insert = { %$to_insert, %$prefetched_values };
1647
1648   # list of primary keys we try to fetch from the database
1649   # both not-exsists and scalarrefs are considered
1650   my %fetch_pks;
1651   for ($source->primary_columns) {
1652     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1653       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1654   }
1655
1656   my ($sqla_opts, @ir_container);
1657   if ($self->_use_insert_returning) {
1658
1659     # retain order as declared in the resultsource
1660     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1661       push @{$sqla_opts->{returning}}, $_;
1662       $sqla_opts->{returning_container} = \@ir_container
1663         if $self->_use_insert_returning_bound;
1664     }
1665   }
1666
1667   my $bind_attributes = $self->source_bind_attributes($source);
1668
1669   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1670
1671   my %returned_cols;
1672
1673   if (my $retlist = $sqla_opts->{returning}) {
1674     @ir_container = try {
1675       local $SIG{__WARN__} = sub {};
1676       my @r = $sth->fetchrow_array;
1677       $sth->finish;
1678       @r;
1679     } unless @ir_container;
1680
1681     @returned_cols{@$retlist} = @ir_container if @ir_container;
1682   }
1683
1684   return { %$prefetched_values, %returned_cols };
1685 }
1686
1687
1688 ## Currently it is assumed that all values passed will be "normal", i.e. not
1689 ## scalar refs, or at least, all the same type as the first set, the statement is
1690 ## only prepped once.
1691 sub insert_bulk {
1692   my ($self, $source, $cols, $data) = @_;
1693
1694   my %colvalues;
1695   @colvalues{@$cols} = (0..$#$cols);
1696
1697   for my $i (0..$#$cols) {
1698     my $first_val = $data->[0][$i];
1699     next unless ref $first_val eq 'SCALAR';
1700
1701     $colvalues{ $cols->[$i] } = $first_val;
1702   }
1703
1704   # check for bad data and stringify stringifiable objects
1705   my $bad_slice = sub {
1706     my ($msg, $col_idx, $slice_idx) = @_;
1707     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1708       $msg,
1709       $cols->[$col_idx],
1710       do {
1711         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1712         Dumper {
1713           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1714         },
1715       }
1716     );
1717   };
1718
1719   for my $datum_idx (0..$#$data) {
1720     my $datum = $data->[$datum_idx];
1721
1722     for my $col_idx (0..$#$cols) {
1723       my $val            = $datum->[$col_idx];
1724       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1725       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1726
1727       if ($is_literal_sql) {
1728         if (not ref $val) {
1729           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1730         }
1731         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1732           $bad_slice->("$reftype reference found where literal SQL expected",
1733             $col_idx, $datum_idx);
1734         }
1735         elsif ($$val ne $$sqla_bind){
1736           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1737             $col_idx, $datum_idx);
1738         }
1739       }
1740       elsif (my $reftype = ref $val) {
1741         require overload;
1742         if (overload::Method($val, '""')) {
1743           $datum->[$col_idx] = "".$val;
1744         }
1745         else {
1746           $bad_slice->("$reftype reference found where bind expected",
1747             $col_idx, $datum_idx);
1748         }
1749       }
1750     }
1751   }
1752
1753   my ($sql, $bind) = $self->_prep_for_execute (
1754     'insert', undef, $source, [\%colvalues]
1755   );
1756
1757   if (! @$bind) {
1758     # if the bindlist is empty - make sure all "values" are in fact
1759     # literal scalarrefs. If not the case this means the storage ate
1760     # them away (e.g. the NoBindVars component) and interpolated them
1761     # directly into the SQL. This obviosly can't be good for multi-inserts
1762
1763     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1764       if first { ref $_ ne 'SCALAR' } values %colvalues;
1765   }
1766
1767   # neither _execute_array, nor _execute_inserts_with_no_binds are
1768   # atomic (even if _execute _array is a single call). Thus a safety
1769   # scope guard
1770   my $guard = $self->txn_scope_guard;
1771
1772   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1773   my $sth = $self->sth($sql);
1774   my $rv = do {
1775     if (@$bind) {
1776       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1777       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1778     }
1779     else {
1780       # bind_param_array doesn't work if there are no binds
1781       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1782     }
1783   };
1784
1785   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1786
1787   $guard->commit;
1788
1789   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1790 }
1791
1792 sub _execute_array {
1793   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1794
1795   ## This must be an arrayref, else nothing works!
1796   my $tuple_status = [];
1797
1798   ## Get the bind_attributes, if any exist
1799   my $bind_attributes = $self->source_bind_attributes($source);
1800
1801   ## Bind the values and execute
1802   my $placeholder_index = 1;
1803
1804   foreach my $bound (@$bind) {
1805
1806     my $attributes = {};
1807     my ($column_name, $data_index) = @$bound;
1808
1809     if( $bind_attributes ) {
1810       $attributes = $bind_attributes->{$column_name}
1811       if defined $bind_attributes->{$column_name};
1812     }
1813
1814     my @data = map { $_->[$data_index] } @$data;
1815
1816     $sth->bind_param_array(
1817       $placeholder_index,
1818       [@data],
1819       (%$attributes ?  $attributes : ()),
1820     );
1821     $placeholder_index++;
1822   }
1823
1824   my ($rv, $err);
1825   try {
1826     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1827   }
1828   catch {
1829     $err = shift;
1830   };
1831
1832   # Not all DBDs are create equal. Some throw on error, some return
1833   # an undef $rv, and some set $sth->err - try whatever we can
1834   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1835     ! defined $err
1836       and
1837     ( !defined $rv or $sth->err )
1838   );
1839
1840   # Statement must finish even if there was an exception.
1841   try {
1842     $sth->finish
1843   }
1844   catch {
1845     $err = shift unless defined $err
1846   };
1847
1848   if (defined $err) {
1849     my $i = 0;
1850     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1851
1852     $self->throw_exception("Unexpected populate error: $err")
1853       if ($i > $#$tuple_status);
1854
1855     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1856       ($tuple_status->[$i][1] || $err),
1857       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1858     );
1859   }
1860
1861   return $rv;
1862 }
1863
1864 sub _dbh_execute_array {
1865     my ($self, $sth, $tuple_status, @extra) = @_;
1866
1867     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1868 }
1869
1870 sub _dbh_execute_inserts_with_no_binds {
1871   my ($self, $sth, $count) = @_;
1872
1873   my $err;
1874   try {
1875     my $dbh = $self->_get_dbh;
1876     local $dbh->{RaiseError} = 1;
1877     local $dbh->{PrintError} = 0;
1878
1879     $sth->execute foreach 1..$count;
1880   }
1881   catch {
1882     $err = shift;
1883   };
1884
1885   # Make sure statement is finished even if there was an exception.
1886   try {
1887     $sth->finish
1888   }
1889   catch {
1890     $err = shift unless defined $err;
1891   };
1892
1893   $self->throw_exception($err) if defined $err;
1894
1895   return $count;
1896 }
1897
1898 sub update {
1899   my ($self, $source, @args) = @_;
1900
1901   my $bind_attrs = $self->source_bind_attributes($source);
1902
1903   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1904 }
1905
1906
1907 sub delete {
1908   my ($self, $source, @args) = @_;
1909
1910   my $bind_attrs = $self->source_bind_attributes($source);
1911
1912   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1913 }
1914
1915 # We were sent here because the $rs contains a complex search
1916 # which will require a subquery to select the correct rows
1917 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1918 #
1919 # Generating a single PK column subquery is trivial and supported
1920 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1921 # Look at _multipk_update_delete()
1922 sub _subq_update_delete {
1923   my $self = shift;
1924   my ($rs, $op, $values) = @_;
1925
1926   my $rsrc = $rs->result_source;
1927
1928   # quick check if we got a sane rs on our hands
1929   my @pcols = $rsrc->_pri_cols;
1930
1931   my $sel = $rs->_resolved_attrs->{select};
1932   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1933
1934   if (
1935       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1936         ne
1937       join ("\x00", sort @$sel )
1938   ) {
1939     $self->throw_exception (
1940       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1941     );
1942   }
1943
1944   if (@pcols == 1) {
1945     return $self->$op (
1946       $rsrc,
1947       $op eq 'update' ? $values : (),
1948       { $pcols[0] => { -in => $rs->as_query } },
1949     );
1950   }
1951
1952   else {
1953     return $self->_multipk_update_delete (@_);
1954   }
1955 }
1956
1957 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1958 # resultset update/delete involving subqueries. So by default resort
1959 # to simple (and inefficient) delete_all style per-row opearations,
1960 # while allowing specific storages to override this with a faster
1961 # implementation.
1962 #
1963 sub _multipk_update_delete {
1964   return shift->_per_row_update_delete (@_);
1965 }
1966
1967 # This is the default loop used to delete/update rows for multi PK
1968 # resultsets, and used by mysql exclusively (because it can't do anything
1969 # else).
1970 #
1971 # We do not use $row->$op style queries, because resultset update/delete
1972 # is not expected to cascade (this is what delete_all/update_all is for).
1973 #
1974 # There should be no race conditions as the entire operation is rolled
1975 # in a transaction.
1976 #
1977 sub _per_row_update_delete {
1978   my $self = shift;
1979   my ($rs, $op, $values) = @_;
1980
1981   my $rsrc = $rs->result_source;
1982   my @pcols = $rsrc->_pri_cols;
1983
1984   my $guard = $self->txn_scope_guard;
1985
1986   # emulate the return value of $sth->execute for non-selects
1987   my $row_cnt = '0E0';
1988
1989   my $subrs_cur = $rs->cursor;
1990   my @all_pk = $subrs_cur->all;
1991   for my $pks ( @all_pk) {
1992
1993     my $cond;
1994     for my $i (0.. $#pcols) {
1995       $cond->{$pcols[$i]} = $pks->[$i];
1996     }
1997
1998     $self->$op (
1999       $rsrc,
2000       $op eq 'update' ? $values : (),
2001       $cond,
2002     );
2003
2004     $row_cnt++;
2005   }
2006
2007   $guard->commit;
2008
2009   return $row_cnt;
2010 }
2011
2012 sub _select {
2013   my $self = shift;
2014   $self->_execute($self->_select_args(@_));
2015 }
2016
2017 sub _select_args_to_query {
2018   my $self = shift;
2019
2020   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2021   #  = $self->_select_args($ident, $select, $cond, $attrs);
2022   my ($op, $bind, $ident, $bind_attrs, @args) =
2023     $self->_select_args(@_);
2024
2025   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2026   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2027   $prepared_bind ||= [];
2028
2029   return wantarray
2030     ? ($sql, $prepared_bind, $bind_attrs)
2031     : \[ "($sql)", @$prepared_bind ]
2032   ;
2033 }
2034
2035 sub _select_args {
2036   my ($self, $ident, $select, $where, $attrs) = @_;
2037
2038   my $sql_maker = $self->sql_maker;
2039   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2040
2041   $attrs = {
2042     %$attrs,
2043     select => $select,
2044     from => $ident,
2045     where => $where,
2046     $rs_alias && $alias2source->{$rs_alias}
2047       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2048       : ()
2049     ,
2050   };
2051
2052   # calculate bind_attrs before possible $ident mangling
2053   my $bind_attrs = {};
2054   for my $alias (keys %$alias2source) {
2055     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2056     for my $col (keys %$bindtypes) {
2057
2058       my $fqcn = join ('.', $alias, $col);
2059       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2060
2061       # Unqialified column names are nice, but at the same time can be
2062       # rather ambiguous. What we do here is basically go along with
2063       # the loop, adding an unqualified column slot to $bind_attrs,
2064       # alongside the fully qualified name. As soon as we encounter
2065       # another column by that name (which would imply another table)
2066       # we unset the unqualified slot and never add any info to it
2067       # to avoid erroneous type binding. If this happens the users
2068       # only choice will be to fully qualify his column name
2069
2070       if (exists $bind_attrs->{$col}) {
2071         $bind_attrs->{$col} = {};
2072       }
2073       else {
2074         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2075       }
2076     }
2077   }
2078
2079   # Sanity check the attributes (SQLMaker does it too, but
2080   # in case of a software_limit we'll never reach there)
2081   if (defined $attrs->{offset}) {
2082     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2083       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2084   }
2085   $attrs->{offset} ||= 0;
2086
2087   if (defined $attrs->{rows}) {
2088     $self->throw_exception("The rows attribute must be a positive integer if present")
2089       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2090   }
2091   elsif ($attrs->{offset}) {
2092     # MySQL actually recommends this approach.  I cringe.
2093     $attrs->{rows} = $sql_maker->__max_int;
2094   }
2095
2096   my @limit;
2097
2098   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2099   # storage, unless software limit was requested
2100   if (
2101     #limited has_many
2102     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2103        ||
2104     # grouped prefetch (to satisfy group_by == select)
2105     ( $attrs->{group_by}
2106         &&
2107       @{$attrs->{group_by}}
2108         &&
2109       $attrs->{_prefetch_selector_range}
2110     )
2111   ) {
2112     ($ident, $select, $where, $attrs)
2113       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2114   }
2115   elsif (! $attrs->{software_limit} ) {
2116     push @limit, $attrs->{rows}, $attrs->{offset};
2117   }
2118
2119   # try to simplify the joinmap further (prune unreferenced type-single joins)
2120   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2121
2122 ###
2123   # This would be the point to deflate anything found in $where
2124   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2125   # expect a row object. And all we have is a resultsource (it is trivial
2126   # to extract deflator coderefs via $alias2source above).
2127   #
2128   # I don't see a way forward other than changing the way deflators are
2129   # invoked, and that's just bad...
2130 ###
2131
2132   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2133 }
2134
2135 # Returns a counting SELECT for a simple count
2136 # query. Abstracted so that a storage could override
2137 # this to { count => 'firstcol' } or whatever makes
2138 # sense as a performance optimization
2139 sub _count_select {
2140   #my ($self, $source, $rs_attrs) = @_;
2141   return { count => '*' };
2142 }
2143
2144
2145 sub source_bind_attributes {
2146   my ($self, $source) = @_;
2147
2148   my $bind_attributes;
2149
2150   my $colinfo = $source->columns_info;
2151
2152   for my $col (keys %$colinfo) {
2153     if (my $dt = $colinfo->{$col}{data_type} ) {
2154       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2155     }
2156   }
2157
2158   return $bind_attributes;
2159 }
2160
2161 =head2 select
2162
2163 =over 4
2164
2165 =item Arguments: $ident, $select, $condition, $attrs
2166
2167 =back
2168
2169 Handle a SQL select statement.
2170
2171 =cut
2172
2173 sub select {
2174   my $self = shift;
2175   my ($ident, $select, $condition, $attrs) = @_;
2176   return $self->cursor_class->new($self, \@_, $attrs);
2177 }
2178
2179 sub select_single {
2180   my $self = shift;
2181   my ($rv, $sth, @bind) = $self->_select(@_);
2182   my @row = $sth->fetchrow_array;
2183   my @nextrow = $sth->fetchrow_array if @row;
2184   if(@row && @nextrow) {
2185     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2186   }
2187   # Need to call finish() to work round broken DBDs
2188   $sth->finish();
2189   return @row;
2190 }
2191
2192 =head2 sql_limit_dialect
2193
2194 This is an accessor for the default SQL limit dialect used by a particular
2195 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2196 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2197 see L<DBIx::Class::SQLMaker::LimitDialects>.
2198
2199 =head2 sth
2200
2201 =over 4
2202
2203 =item Arguments: $sql
2204
2205 =back
2206
2207 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2208
2209 =cut
2210
2211 sub _dbh_sth {
2212   my ($self, $dbh, $sql) = @_;
2213
2214   # 3 is the if_active parameter which avoids active sth re-use
2215   my $sth = $self->disable_sth_caching
2216     ? $dbh->prepare($sql)
2217     : $dbh->prepare_cached($sql, {}, 3);
2218
2219   # XXX You would think RaiseError would make this impossible,
2220   #  but apparently that's not true :(
2221   $self->throw_exception($dbh->errstr) if !$sth;
2222
2223   $sth;
2224 }
2225
2226 sub sth {
2227   my ($self, $sql) = @_;
2228   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2229 }
2230
2231 sub _dbh_columns_info_for {
2232   my ($self, $dbh, $table) = @_;
2233
2234   if ($dbh->can('column_info')) {
2235     my %result;
2236     my $caught;
2237     try {
2238       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2239       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2240       $sth->execute();
2241       while ( my $info = $sth->fetchrow_hashref() ){
2242         my %column_info;
2243         $column_info{data_type}   = $info->{TYPE_NAME};
2244         $column_info{size}      = $info->{COLUMN_SIZE};
2245         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2246         $column_info{default_value} = $info->{COLUMN_DEF};
2247         my $col_name = $info->{COLUMN_NAME};
2248         $col_name =~ s/^\"(.*)\"$/$1/;
2249
2250         $result{$col_name} = \%column_info;
2251       }
2252     } catch {
2253       $caught = 1;
2254     };
2255     return \%result if !$caught && scalar keys %result;
2256   }
2257
2258   my %result;
2259   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2260   $sth->execute;
2261   my @columns = @{$sth->{NAME_lc}};
2262   for my $i ( 0 .. $#columns ){
2263     my %column_info;
2264     $column_info{data_type} = $sth->{TYPE}->[$i];
2265     $column_info{size} = $sth->{PRECISION}->[$i];
2266     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2267
2268     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2269       $column_info{data_type} = $1;
2270       $column_info{size}    = $2;
2271     }
2272
2273     $result{$columns[$i]} = \%column_info;
2274   }
2275   $sth->finish;
2276
2277   foreach my $col (keys %result) {
2278     my $colinfo = $result{$col};
2279     my $type_num = $colinfo->{data_type};
2280     my $type_name;
2281     if(defined $type_num && $dbh->can('type_info')) {
2282       my $type_info = $dbh->type_info($type_num);
2283       $type_name = $type_info->{TYPE_NAME} if $type_info;
2284       $colinfo->{data_type} = $type_name if $type_name;
2285     }
2286   }
2287
2288   return \%result;
2289 }
2290
2291 sub columns_info_for {
2292   my ($self, $table) = @_;
2293   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2294 }
2295
2296 =head2 last_insert_id
2297
2298 Return the row id of the last insert.
2299
2300 =cut
2301
2302 sub _dbh_last_insert_id {
2303     my ($self, $dbh, $source, $col) = @_;
2304
2305     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2306
2307     return $id if defined $id;
2308
2309     my $class = ref $self;
2310     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2311 }
2312
2313 sub last_insert_id {
2314   my $self = shift;
2315   $self->_dbh_last_insert_id ($self->_dbh, @_);
2316 }
2317
2318 =head2 _native_data_type
2319
2320 =over 4
2321
2322 =item Arguments: $type_name
2323
2324 =back
2325
2326 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2327 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2328 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2329
2330 The default implementation returns C<undef>, implement in your Storage driver if
2331 you need this functionality.
2332
2333 Should map types from other databases to the native RDBMS type, for example
2334 C<VARCHAR2> to C<VARCHAR>.
2335
2336 Types with modifiers should map to the underlying data type. For example,
2337 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2338
2339 Composite types should map to the container type, for example
2340 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2341
2342 =cut
2343
2344 sub _native_data_type {
2345   #my ($self, $data_type) = @_;
2346   return undef
2347 }
2348
2349 # Check if placeholders are supported at all
2350 sub _determine_supports_placeholders {
2351   my $self = shift;
2352   my $dbh  = $self->_get_dbh;
2353
2354   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2355   # but it is inaccurate more often than not
2356   return try {
2357     local $dbh->{PrintError} = 0;
2358     local $dbh->{RaiseError} = 1;
2359     $dbh->do('select ?', {}, 1);
2360     1;
2361   }
2362   catch {
2363     0;
2364   };
2365 }
2366
2367 # Check if placeholders bound to non-string types throw exceptions
2368 #
2369 sub _determine_supports_typeless_placeholders {
2370   my $self = shift;
2371   my $dbh  = $self->_get_dbh;
2372
2373   return try {
2374     local $dbh->{PrintError} = 0;
2375     local $dbh->{RaiseError} = 1;
2376     # this specifically tests a bind that is NOT a string
2377     $dbh->do('select 1 where 1 = ?', {}, 1);
2378     1;
2379   }
2380   catch {
2381     0;
2382   };
2383 }
2384
2385 =head2 sqlt_type
2386
2387 Returns the database driver name.
2388
2389 =cut
2390
2391 sub sqlt_type {
2392   shift->_get_dbh->{Driver}->{Name};
2393 }
2394
2395 =head2 bind_attribute_by_data_type
2396
2397 Given a datatype from column info, returns a database specific bind
2398 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2399 let the database planner just handle it.
2400
2401 Generally only needed for special case column types, like bytea in postgres.
2402
2403 =cut
2404
2405 sub bind_attribute_by_data_type {
2406     return;
2407 }
2408
2409 =head2 is_datatype_numeric
2410
2411 Given a datatype from column_info, returns a boolean value indicating if
2412 the current RDBMS considers it a numeric value. This controls how
2413 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2414 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2415 be performed instead of the usual C<eq>.
2416
2417 =cut
2418
2419 sub is_datatype_numeric {
2420   my ($self, $dt) = @_;
2421
2422   return 0 unless $dt;
2423
2424   return $dt =~ /^ (?:
2425     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2426   ) $/ix;
2427 }
2428
2429
2430 =head2 create_ddl_dir
2431
2432 =over 4
2433
2434 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2435
2436 =back
2437
2438 Creates a SQL file based on the Schema, for each of the specified
2439 database engines in C<\@databases> in the given directory.
2440 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2441
2442 Given a previous version number, this will also create a file containing
2443 the ALTER TABLE statements to transform the previous schema into the
2444 current one. Note that these statements may contain C<DROP TABLE> or
2445 C<DROP COLUMN> statements that can potentially destroy data.
2446
2447 The file names are created using the C<ddl_filename> method below, please
2448 override this method in your schema if you would like a different file
2449 name format. For the ALTER file, the same format is used, replacing
2450 $version in the name with "$preversion-$version".
2451
2452 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2453 The most common value for this would be C<< { add_drop_table => 1 } >>
2454 to have the SQL produced include a C<DROP TABLE> statement for each table
2455 created. For quoting purposes supply C<quote_table_names> and
2456 C<quote_field_names>.
2457
2458 If no arguments are passed, then the following default values are assumed:
2459
2460 =over 4
2461
2462 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2463
2464 =item version    - $schema->schema_version
2465
2466 =item directory  - './'
2467
2468 =item preversion - <none>
2469
2470 =back
2471
2472 By default, C<\%sqlt_args> will have
2473
2474  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2475
2476 merged with the hash passed in. To disable any of those features, pass in a
2477 hashref like the following
2478
2479  { ignore_constraint_names => 0, # ... other options }
2480
2481
2482 WARNING: You are strongly advised to check all SQL files created, before applying
2483 them.
2484
2485 =cut
2486
2487 sub create_ddl_dir {
2488   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2489
2490   unless ($dir) {
2491     carp "No directory given, using ./\n";
2492     $dir = './';
2493   } else {
2494       -d $dir
2495         or
2496       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2497         or
2498       $self->throw_exception(
2499         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2500       );
2501   }
2502
2503   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2504
2505   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2506   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2507
2508   my $schema_version = $schema->schema_version || '1.x';
2509   $version ||= $schema_version;
2510
2511   $sqltargs = {
2512     add_drop_table => 1,
2513     ignore_constraint_names => 1,
2514     ignore_index_names => 1,
2515     %{$sqltargs || {}}
2516   };
2517
2518   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2519     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2520   }
2521
2522   my $sqlt = SQL::Translator->new( $sqltargs );
2523
2524   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2525   my $sqlt_schema = $sqlt->translate({ data => $schema })
2526     or $self->throw_exception ($sqlt->error);
2527
2528   foreach my $db (@$databases) {
2529     $sqlt->reset();
2530     $sqlt->{schema} = $sqlt_schema;
2531     $sqlt->producer($db);
2532
2533     my $file;
2534     my $filename = $schema->ddl_filename($db, $version, $dir);
2535     if (-e $filename && ($version eq $schema_version )) {
2536       # if we are dumping the current version, overwrite the DDL
2537       carp "Overwriting existing DDL file - $filename";
2538       unlink($filename);
2539     }
2540
2541     my $output = $sqlt->translate;
2542     if(!$output) {
2543       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2544       next;
2545     }
2546     if(!open($file, ">$filename")) {
2547       $self->throw_exception("Can't open $filename for writing ($!)");
2548       next;
2549     }
2550     print $file $output;
2551     close($file);
2552
2553     next unless ($preversion);
2554
2555     require SQL::Translator::Diff;
2556
2557     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2558     if(!-e $prefilename) {
2559       carp("No previous schema file found ($prefilename)");
2560       next;
2561     }
2562
2563     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2564     if(-e $difffile) {
2565       carp("Overwriting existing diff file - $difffile");
2566       unlink($difffile);
2567     }
2568
2569     my $source_schema;
2570     {
2571       my $t = SQL::Translator->new($sqltargs);
2572       $t->debug( 0 );
2573       $t->trace( 0 );
2574
2575       $t->parser( $db )
2576         or $self->throw_exception ($t->error);
2577
2578       my $out = $t->translate( $prefilename )
2579         or $self->throw_exception ($t->error);
2580
2581       $source_schema = $t->schema;
2582
2583       $source_schema->name( $prefilename )
2584         unless ( $source_schema->name );
2585     }
2586
2587     # The "new" style of producers have sane normalization and can support
2588     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2589     # And we have to diff parsed SQL against parsed SQL.
2590     my $dest_schema = $sqlt_schema;
2591
2592     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2593       my $t = SQL::Translator->new($sqltargs);
2594       $t->debug( 0 );
2595       $t->trace( 0 );
2596
2597       $t->parser( $db )
2598         or $self->throw_exception ($t->error);
2599
2600       my $out = $t->translate( $filename )
2601         or $self->throw_exception ($t->error);
2602
2603       $dest_schema = $t->schema;
2604
2605       $dest_schema->name( $filename )
2606         unless $dest_schema->name;
2607     }
2608
2609     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2610                                                   $dest_schema,   $db,
2611                                                   $sqltargs
2612                                                  );
2613     if(!open $file, ">$difffile") {
2614       $self->throw_exception("Can't write to $difffile ($!)");
2615       next;
2616     }
2617     print $file $diff;
2618     close($file);
2619   }
2620 }
2621
2622 =head2 deployment_statements
2623
2624 =over 4
2625
2626 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2627
2628 =back
2629
2630 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2631
2632 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2633 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2634
2635 C<$directory> is used to return statements from files in a previously created
2636 L</create_ddl_dir> directory and is optional. The filenames are constructed
2637 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2638
2639 If no C<$directory> is specified then the statements are constructed on the
2640 fly using L<SQL::Translator> and C<$version> is ignored.
2641
2642 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2643
2644 =cut
2645
2646 sub deployment_statements {
2647   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2648   $type ||= $self->sqlt_type;
2649   $version ||= $schema->schema_version || '1.x';
2650   $dir ||= './';
2651   my $filename = $schema->ddl_filename($type, $version, $dir);
2652   if(-f $filename)
2653   {
2654       my $file;
2655       open($file, "<$filename")
2656         or $self->throw_exception("Can't open $filename ($!)");
2657       my @rows = <$file>;
2658       close($file);
2659       return join('', @rows);
2660   }
2661
2662   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2663     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2664   }
2665
2666   # sources needs to be a parser arg, but for simplicty allow at top level
2667   # coming in
2668   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2669       if exists $sqltargs->{sources};
2670
2671   my $tr = SQL::Translator->new(
2672     producer => "SQL::Translator::Producer::${type}",
2673     %$sqltargs,
2674     parser => 'SQL::Translator::Parser::DBIx::Class',
2675     data => $schema,
2676   );
2677
2678   my @ret;
2679   if (wantarray) {
2680     @ret = $tr->translate;
2681   }
2682   else {
2683     $ret[0] = $tr->translate;
2684   }
2685
2686   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2687     unless (@ret && defined $ret[0]);
2688
2689   return wantarray ? @ret : $ret[0];
2690 }
2691
2692 sub deploy {
2693   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2694   my $deploy = sub {
2695     my $line = shift;
2696     return if($line =~ /^--/);
2697     return if(!$line);
2698     # next if($line =~ /^DROP/m);
2699     return if($line =~ /^BEGIN TRANSACTION/m);
2700     return if($line =~ /^COMMIT/m);
2701     return if $line =~ /^\s+$/; # skip whitespace only
2702     $self->_query_start($line);
2703     try {
2704       # do a dbh_do cycle here, as we need some error checking in
2705       # place (even though we will ignore errors)
2706       $self->dbh_do (sub { $_[1]->do($line) });
2707     } catch {
2708       carp qq{$_ (running "${line}")};
2709     };
2710     $self->_query_end($line);
2711   };
2712   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2713   if (@statements > 1) {
2714     foreach my $statement (@statements) {
2715       $deploy->( $statement );
2716     }
2717   }
2718   elsif (@statements == 1) {
2719     foreach my $line ( split(";\n", $statements[0])) {
2720       $deploy->( $line );
2721     }
2722   }
2723 }
2724
2725 =head2 datetime_parser
2726
2727 Returns the datetime parser class
2728
2729 =cut
2730
2731 sub datetime_parser {
2732   my $self = shift;
2733   return $self->{datetime_parser} ||= do {
2734     $self->build_datetime_parser(@_);
2735   };
2736 }
2737
2738 =head2 datetime_parser_type
2739
2740 Defines (returns) the datetime parser class - currently hardwired to
2741 L<DateTime::Format::MySQL>
2742
2743 =cut
2744
2745 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2746
2747 =head2 build_datetime_parser
2748
2749 See L</datetime_parser>
2750
2751 =cut
2752
2753 sub build_datetime_parser {
2754   my $self = shift;
2755   my $type = $self->datetime_parser_type(@_);
2756   $self->ensure_class_loaded ($type);
2757   return $type;
2758 }
2759
2760
2761 =head2 is_replicating
2762
2763 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2764 replicate from a master database.  Default is undef, which is the result
2765 returned by databases that don't support replication.
2766
2767 =cut
2768
2769 sub is_replicating {
2770     return;
2771
2772 }
2773
2774 =head2 lag_behind_master
2775
2776 Returns a number that represents a certain amount of lag behind a master db
2777 when a given storage is replicating.  The number is database dependent, but
2778 starts at zero and increases with the amount of lag. Default in undef
2779
2780 =cut
2781
2782 sub lag_behind_master {
2783     return;
2784 }
2785
2786 =head2 relname_to_table_alias
2787
2788 =over 4
2789
2790 =item Arguments: $relname, $join_count
2791
2792 =back
2793
2794 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2795 queries.
2796
2797 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2798 way these aliases are named.
2799
2800 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2801 otherwise C<"$relname">.
2802
2803 =cut
2804
2805 sub relname_to_table_alias {
2806   my ($self, $relname, $join_count) = @_;
2807
2808   my $alias = ($join_count && $join_count > 1 ?
2809     join('_', $relname, $join_count) : $relname);
2810
2811   return $alias;
2812 }
2813
2814 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2815 # version and it may be necessary to amend or override it for a specific storage
2816 # if such binds are necessary.
2817 sub _max_column_bytesize {
2818   my ($self, $source, $col) = @_;
2819
2820   my $inf = $source->column_info($col);
2821   return $inf->{_max_bytesize} ||= do {
2822
2823     my $max_size;
2824
2825     if (my $data_type = $inf->{data_type}) {
2826       $data_type = lc($data_type);
2827
2828       # String/sized-binary types
2829       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2830                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2831       ) {
2832         $max_size = $inf->{size};
2833       }
2834       # Other charset/unicode types, assume scale of 4
2835       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2836                               |univarchar
2837                               |nvarchar)\b/x
2838       ) {
2839         $max_size = $inf->{size} * 4 if $inf->{size};
2840       }
2841       # Blob types
2842       elsif ($self->_is_lob_type($data_type)) {
2843         # default to longreadlen
2844       }
2845       else {
2846         $max_size = 100;  # for all other (numeric?) datatypes
2847       }
2848     }
2849
2850     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2851   };
2852 }
2853
2854 # Determine if a data_type is some type of BLOB
2855 sub _is_lob_type {
2856   my ($self, $data_type) = @_;
2857   $data_type && ($data_type =~ /(?:lob|bfile|text|image|bytea|memo)/i
2858     || $data_type =~ /^long(?:\s*(?:raw|bit\s*varying|varbit|binary
2859                                   |varchar|character\s*varying|nvarchar
2860                                   |national\s*character\s*varying))?$/xi);
2861 }
2862
2863 1;
2864
2865 =head1 USAGE NOTES
2866
2867 =head2 DBIx::Class and AutoCommit
2868
2869 DBIx::Class can do some wonderful magic with handling exceptions,
2870 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2871 (the default) combined with C<txn_do> for transaction support.
2872
2873 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2874 in an assumed transaction between commits, and you're telling us you'd
2875 like to manage that manually.  A lot of the magic protections offered by
2876 this module will go away.  We can't protect you from exceptions due to database
2877 disconnects because we don't know anything about how to restart your
2878 transactions.  You're on your own for handling all sorts of exceptional
2879 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2880 be with raw DBI.
2881
2882
2883 =head1 AUTHORS
2884
2885 Matt S. Trout <mst@shadowcatsystems.co.uk>
2886
2887 Andy Grundman <andy@hybridized.org>
2888
2889 =head1 LICENSE
2890
2891 You may distribute this code under the same terms as Perl itself.
2892
2893 =cut