Don't quote NULL for debug output
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders join_optimizer/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
60
61 # on by default, not strictly a capability (pending rewrite)
62 __PACKAGE__->_use_join_optimizer (1);
63 sub _determine_supports_join_optimizer { 1 };
64
65 # Each of these methods need _determine_driver called before itself
66 # in order to function reliably. This is a purely DRY optimization
67 #
68 # get_(use)_dbms_capability need to be called on the correct Storage
69 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
70 # _determine_supports_X which obv. needs a correct driver as well
71 my @rdbms_specific_methods = qw/
72   deployment_statements
73   sqlt_type
74   sql_maker
75   build_datetime_parser
76   datetime_parser_type
77
78   insert
79   insert_bulk
80   update
81   delete
82   select
83   select_single
84
85   get_use_dbms_capability
86   get_dbms_capability
87
88   _server_info
89   _get_server_version
90 /;
91
92 for my $meth (@rdbms_specific_methods) {
93
94   my $orig = __PACKAGE__->can ($meth)
95     or die "$meth is not a ::Storage::DBI method!";
96
97   no strict qw/refs/;
98   no warnings qw/redefine/;
99   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
100     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
101       $_[0]->_determine_driver;
102
103       # This for some reason crashes and burns on perl 5.8.1
104       # IFF the method ends up throwing an exception
105       #goto $_[0]->can ($meth);
106
107       my $cref = $_[0]->can ($meth);
108       goto $cref;
109     }
110     goto $orig;
111   };
112 }
113
114
115 =head1 NAME
116
117 DBIx::Class::Storage::DBI - DBI storage handler
118
119 =head1 SYNOPSIS
120
121   my $schema = MySchema->connect('dbi:SQLite:my.db');
122
123   $schema->storage->debug(1);
124
125   my @stuff = $schema->storage->dbh_do(
126     sub {
127       my ($storage, $dbh, @args) = @_;
128       $dbh->do("DROP TABLE authors");
129     },
130     @column_list
131   );
132
133   $schema->resultset('Book')->search({
134      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
135   });
136
137 =head1 DESCRIPTION
138
139 This class represents the connection to an RDBMS via L<DBI>.  See
140 L<DBIx::Class::Storage> for general information.  This pod only
141 documents DBI-specific methods and behaviors.
142
143 =head1 METHODS
144
145 =cut
146
147 sub new {
148   my $new = shift->next::method(@_);
149
150   $new->transaction_depth(0);
151   $new->_sql_maker_opts({});
152   $new->_dbh_details({});
153   $new->{savepoints} = [];
154   $new->{_in_dbh_do} = 0;
155   $new->{_dbh_gen} = 0;
156
157   # read below to see what this does
158   $new->_arm_global_destructor;
159
160   $new;
161 }
162
163 # This is hack to work around perl shooting stuff in random
164 # order on exit(). If we do not walk the remaining storage
165 # objects in an END block, there is a *small but real* chance
166 # of a fork()ed child to kill the parent's shared DBI handle,
167 # *before perl reaches the DESTROY in this package*
168 # Yes, it is ugly and effective.
169 {
170   my %seek_and_destroy;
171
172   sub _arm_global_destructor {
173     my $self = shift;
174     my $key = Scalar::Util::refaddr ($self);
175     $seek_and_destroy{$key} = $self;
176     Scalar::Util::weaken ($seek_and_destroy{$key});
177   }
178
179   END {
180     local $?; # just in case the DBI destructor changes it somehow
181
182     # destroy just the object if not native to this process/thread
183     $_->_preserve_foreign_dbh for (grep
184       { defined $_ }
185       values %seek_and_destroy
186     );
187   }
188 }
189
190 sub DESTROY {
191   my $self = shift;
192
193   # some databases spew warnings on implicit disconnect
194   local $SIG{__WARN__} = sub {};
195   $self->_dbh(undef);
196 }
197
198 sub _preserve_foreign_dbh {
199   my $self = shift;
200
201   return unless $self->_dbh;
202
203   $self->_verify_tid;
204
205   return unless $self->_dbh;
206
207   $self->_verify_pid;
208
209 }
210
211 # handle pid changes correctly - do not destroy parent's connection
212 sub _verify_pid {
213   my $self = shift;
214
215   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
216
217   $self->_dbh->{InactiveDestroy} = 1;
218   $self->_dbh(undef);
219   $self->{_dbh_gen}++;
220
221   return;
222 }
223
224 # very similar to above, but seems to FAIL if I set InactiveDestroy
225 sub _verify_tid {
226   my $self = shift;
227
228   if ( ! defined $self->_conn_tid ) {
229     return; # no threads
230   }
231   elsif ( $self->_conn_tid == threads->tid ) {
232     return; # same thread
233   }
234
235   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
236   $self->_dbh(undef);
237   $self->{_dbh_gen}++;
238
239   return;
240 }
241
242
243 =head2 connect_info
244
245 This method is normally called by L<DBIx::Class::Schema/connection>, which
246 encapsulates its argument list in an arrayref before passing them here.
247
248 The argument list may contain:
249
250 =over
251
252 =item *
253
254 The same 4-element argument set one would normally pass to
255 L<DBI/connect>, optionally followed by
256 L<extra attributes|/DBIx::Class specific connection attributes>
257 recognized by DBIx::Class:
258
259   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
260
261 =item *
262
263 A single code reference which returns a connected
264 L<DBI database handle|DBI/connect> optionally followed by
265 L<extra attributes|/DBIx::Class specific connection attributes> recognized
266 by DBIx::Class:
267
268   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
269
270 =item *
271
272 A single hashref with all the attributes and the dsn/user/password
273 mixed together:
274
275   $connect_info_args = [{
276     dsn => $dsn,
277     user => $user,
278     password => $pass,
279     %dbi_attributes,
280     %extra_attributes,
281   }];
282
283   $connect_info_args = [{
284     dbh_maker => sub { DBI->connect (...) },
285     %dbi_attributes,
286     %extra_attributes,
287   }];
288
289 This is particularly useful for L<Catalyst> based applications, allowing the
290 following config (L<Config::General> style):
291
292   <Model::DB>
293     schema_class   App::DB
294     <connect_info>
295       dsn          dbi:mysql:database=test
296       user         testuser
297       password     TestPass
298       AutoCommit   1
299     </connect_info>
300   </Model::DB>
301
302 The C<dsn>/C<user>/C<password> combination can be substituted by the
303 C<dbh_maker> key whose value is a coderef that returns a connected
304 L<DBI database handle|DBI/connect>
305
306 =back
307
308 Please note that the L<DBI> docs recommend that you always explicitly
309 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
310 recommends that it be set to I<1>, and that you perform transactions
311 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
312 to I<1> if you do not do explicitly set it to zero.  This is the default
313 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
314
315 =head3 DBIx::Class specific connection attributes
316
317 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
318 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
319 the following connection options. These options can be mixed in with your other
320 L<DBI> connection attributes, or placed in a separate hashref
321 (C<\%extra_attributes>) as shown above.
322
323 Every time C<connect_info> is invoked, any previous settings for
324 these options will be cleared before setting the new ones, regardless of
325 whether any options are specified in the new C<connect_info>.
326
327
328 =over
329
330 =item on_connect_do
331
332 Specifies things to do immediately after connecting or re-connecting to
333 the database.  Its value may contain:
334
335 =over
336
337 =item a scalar
338
339 This contains one SQL statement to execute.
340
341 =item an array reference
342
343 This contains SQL statements to execute in order.  Each element contains
344 a string or a code reference that returns a string.
345
346 =item a code reference
347
348 This contains some code to execute.  Unlike code references within an
349 array reference, its return value is ignored.
350
351 =back
352
353 =item on_disconnect_do
354
355 Takes arguments in the same form as L</on_connect_do> and executes them
356 immediately before disconnecting from the database.
357
358 Note, this only runs if you explicitly call L</disconnect> on the
359 storage object.
360
361 =item on_connect_call
362
363 A more generalized form of L</on_connect_do> that calls the specified
364 C<connect_call_METHOD> methods in your storage driver.
365
366   on_connect_do => 'select 1'
367
368 is equivalent to:
369
370   on_connect_call => [ [ do_sql => 'select 1' ] ]
371
372 Its values may contain:
373
374 =over
375
376 =item a scalar
377
378 Will call the C<connect_call_METHOD> method.
379
380 =item a code reference
381
382 Will execute C<< $code->($storage) >>
383
384 =item an array reference
385
386 Each value can be a method name or code reference.
387
388 =item an array of arrays
389
390 For each array, the first item is taken to be the C<connect_call_> method name
391 or code reference, and the rest are parameters to it.
392
393 =back
394
395 Some predefined storage methods you may use:
396
397 =over
398
399 =item do_sql
400
401 Executes a SQL string or a code reference that returns a SQL string. This is
402 what L</on_connect_do> and L</on_disconnect_do> use.
403
404 It can take:
405
406 =over
407
408 =item a scalar
409
410 Will execute the scalar as SQL.
411
412 =item an arrayref
413
414 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
415 attributes hashref and bind values.
416
417 =item a code reference
418
419 Will execute C<< $code->($storage) >> and execute the return array refs as
420 above.
421
422 =back
423
424 =item datetime_setup
425
426 Execute any statements necessary to initialize the database session to return
427 and accept datetime/timestamp values used with
428 L<DBIx::Class::InflateColumn::DateTime>.
429
430 Only necessary for some databases, see your specific storage driver for
431 implementation details.
432
433 =back
434
435 =item on_disconnect_call
436
437 Takes arguments in the same form as L</on_connect_call> and executes them
438 immediately before disconnecting from the database.
439
440 Calls the C<disconnect_call_METHOD> methods as opposed to the
441 C<connect_call_METHOD> methods called by L</on_connect_call>.
442
443 Note, this only runs if you explicitly call L</disconnect> on the
444 storage object.
445
446 =item disable_sth_caching
447
448 If set to a true value, this option will disable the caching of
449 statement handles via L<DBI/prepare_cached>.
450
451 =item limit_dialect
452
453 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
454 default L</sql_limit_dialect> setting of the storage (if any). For a list
455 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
456
457 =item quote_char
458
459 Specifies what characters to use to quote table and column names.
460
461 C<quote_char> expects either a single character, in which case is it
462 is placed on either side of the table/column name, or an arrayref of length
463 2 in which case the table/column name is placed between the elements.
464
465 For example under MySQL you should use C<< quote_char => '`' >>, and for
466 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
467
468 =item name_sep
469
470 This parameter is only useful in conjunction with C<quote_char>, and is used to
471 specify the character that separates elements (schemas, tables, columns) from
472 each other. If unspecified it defaults to the most commonly used C<.>.
473
474 =item unsafe
475
476 This Storage driver normally installs its own C<HandleError>, sets
477 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
478 all database handles, including those supplied by a coderef.  It does this
479 so that it can have consistent and useful error behavior.
480
481 If you set this option to a true value, Storage will not do its usual
482 modifications to the database handle's attributes, and instead relies on
483 the settings in your connect_info DBI options (or the values you set in
484 your connection coderef, in the case that you are connecting via coderef).
485
486 Note that your custom settings can cause Storage to malfunction,
487 especially if you set a C<HandleError> handler that suppresses exceptions
488 and/or disable C<RaiseError>.
489
490 =item auto_savepoint
491
492 If this option is true, L<DBIx::Class> will use savepoints when nesting
493 transactions, making it possible to recover from failure in the inner
494 transaction without having to abort all outer transactions.
495
496 =item cursor_class
497
498 Use this argument to supply a cursor class other than the default
499 L<DBIx::Class::Storage::DBI::Cursor>.
500
501 =back
502
503 Some real-life examples of arguments to L</connect_info> and
504 L<DBIx::Class::Schema/connect>
505
506   # Simple SQLite connection
507   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
508
509   # Connect via subref
510   ->connect_info([ sub { DBI->connect(...) } ]);
511
512   # Connect via subref in hashref
513   ->connect_info([{
514     dbh_maker => sub { DBI->connect(...) },
515     on_connect_do => 'alter session ...',
516   }]);
517
518   # A bit more complicated
519   ->connect_info(
520     [
521       'dbi:Pg:dbname=foo',
522       'postgres',
523       'my_pg_password',
524       { AutoCommit => 1 },
525       { quote_char => q{"} },
526     ]
527   );
528
529   # Equivalent to the previous example
530   ->connect_info(
531     [
532       'dbi:Pg:dbname=foo',
533       'postgres',
534       'my_pg_password',
535       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
536     ]
537   );
538
539   # Same, but with hashref as argument
540   # See parse_connect_info for explanation
541   ->connect_info(
542     [{
543       dsn         => 'dbi:Pg:dbname=foo',
544       user        => 'postgres',
545       password    => 'my_pg_password',
546       AutoCommit  => 1,
547       quote_char  => q{"},
548       name_sep    => q{.},
549     }]
550   );
551
552   # Subref + DBIx::Class-specific connection options
553   ->connect_info(
554     [
555       sub { DBI->connect(...) },
556       {
557           quote_char => q{`},
558           name_sep => q{@},
559           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
560           disable_sth_caching => 1,
561       },
562     ]
563   );
564
565
566
567 =cut
568
569 sub connect_info {
570   my ($self, $info) = @_;
571
572   return $self->_connect_info if !$info;
573
574   $self->_connect_info($info); # copy for _connect_info
575
576   $info = $self->_normalize_connect_info($info)
577     if ref $info eq 'ARRAY';
578
579   for my $storage_opt (keys %{ $info->{storage_options} }) {
580     my $value = $info->{storage_options}{$storage_opt};
581
582     $self->$storage_opt($value);
583   }
584
585   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
586   #  the new set of options
587   $self->_sql_maker(undef);
588   $self->_sql_maker_opts({});
589
590   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
591     my $value = $info->{sql_maker_options}{$sql_maker_opt};
592
593     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
594   }
595
596   my %attrs = (
597     %{ $self->_default_dbi_connect_attributes || {} },
598     %{ $info->{attributes} || {} },
599   );
600
601   my @args = @{ $info->{arguments} };
602
603   $self->_dbi_connect_info([@args,
604     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
605
606   # FIXME - dirty:
607   # save attributes them in a separate accessor so they are always
608   # introspectable, even in case of a CODE $dbhmaker
609   $self->_dbic_connect_attributes (\%attrs);
610
611   return $self->_connect_info;
612 }
613
614 sub _normalize_connect_info {
615   my ($self, $info_arg) = @_;
616   my %info;
617
618   my @args = @$info_arg;  # take a shallow copy for further mutilation
619
620   # combine/pre-parse arguments depending on invocation style
621
622   my %attrs;
623   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
624     %attrs = %{ $args[1] || {} };
625     @args = $args[0];
626   }
627   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
628     %attrs = %{$args[0]};
629     @args = ();
630     if (my $code = delete $attrs{dbh_maker}) {
631       @args = $code;
632
633       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
634       if (@ignored) {
635         carp sprintf (
636             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
637           . "to the result of 'dbh_maker'",
638
639           join (', ', map { "'$_'" } (@ignored) ),
640         );
641       }
642     }
643     else {
644       @args = delete @attrs{qw/dsn user password/};
645     }
646   }
647   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
648     %attrs = (
649       % { $args[3] || {} },
650       % { $args[4] || {} },
651     );
652     @args = @args[0,1,2];
653   }
654
655   $info{arguments} = \@args;
656
657   my @storage_opts = grep exists $attrs{$_},
658     @storage_options, 'cursor_class';
659
660   @{ $info{storage_options} }{@storage_opts} =
661     delete @attrs{@storage_opts} if @storage_opts;
662
663   my @sql_maker_opts = grep exists $attrs{$_},
664     qw/limit_dialect quote_char name_sep/;
665
666   @{ $info{sql_maker_options} }{@sql_maker_opts} =
667     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
668
669   $info{attributes} = \%attrs if %attrs;
670
671   return \%info;
672 }
673
674 sub _default_dbi_connect_attributes {
675   return {
676     AutoCommit => 1,
677     RaiseError => 1,
678     PrintError => 0,
679   };
680 }
681
682 =head2 on_connect_do
683
684 This method is deprecated in favour of setting via L</connect_info>.
685
686 =cut
687
688 =head2 on_disconnect_do
689
690 This method is deprecated in favour of setting via L</connect_info>.
691
692 =cut
693
694 sub _parse_connect_do {
695   my ($self, $type) = @_;
696
697   my $val = $self->$type;
698   return () if not defined $val;
699
700   my @res;
701
702   if (not ref($val)) {
703     push @res, [ 'do_sql', $val ];
704   } elsif (ref($val) eq 'CODE') {
705     push @res, $val;
706   } elsif (ref($val) eq 'ARRAY') {
707     push @res, map { [ 'do_sql', $_ ] } @$val;
708   } else {
709     $self->throw_exception("Invalid type for $type: ".ref($val));
710   }
711
712   return \@res;
713 }
714
715 =head2 dbh_do
716
717 Arguments: ($subref | $method_name), @extra_coderef_args?
718
719 Execute the given $subref or $method_name using the new exception-based
720 connection management.
721
722 The first two arguments will be the storage object that C<dbh_do> was called
723 on and a database handle to use.  Any additional arguments will be passed
724 verbatim to the called subref as arguments 2 and onwards.
725
726 Using this (instead of $self->_dbh or $self->dbh) ensures correct
727 exception handling and reconnection (or failover in future subclasses).
728
729 Your subref should have no side-effects outside of the database, as
730 there is the potential for your subref to be partially double-executed
731 if the database connection was stale/dysfunctional.
732
733 Example:
734
735   my @stuff = $schema->storage->dbh_do(
736     sub {
737       my ($storage, $dbh, @cols) = @_;
738       my $cols = join(q{, }, @cols);
739       $dbh->selectrow_array("SELECT $cols FROM foo");
740     },
741     @column_list
742   );
743
744 =cut
745
746 sub dbh_do {
747   my $self = shift;
748   my $code = shift;
749
750   my $dbh = $self->_get_dbh;
751
752   return $self->$code($dbh, @_)
753     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
754
755   local $self->{_in_dbh_do} = 1;
756
757   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
758   my $args = \@_;
759   return try {
760     $self->$code ($dbh, @$args);
761   } catch {
762     $self->throw_exception($_) if $self->connected;
763
764     # We were not connected - reconnect and retry, but let any
765     #  exception fall right through this time
766     carp "Retrying $code after catching disconnected exception: $_"
767       if $ENV{DBIC_DBIRETRY_DEBUG};
768
769     $self->_populate_dbh;
770     $self->$code($self->_dbh, @$args);
771   };
772 }
773
774 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
775 # It also informs dbh_do to bypass itself while under the direction of txn_do,
776 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
777 sub txn_do {
778   my $self = shift;
779   my $coderef = shift;
780
781   ref $coderef eq 'CODE' or $self->throw_exception
782     ('$coderef must be a CODE reference');
783
784   local $self->{_in_dbh_do} = 1;
785
786   my @result;
787   my $want_array = wantarray;
788
789   my $tried = 0;
790   while(1) {
791     my $exception;
792
793     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
794     my $args = \@_;
795
796     try {
797       $self->txn_begin;
798       my $txn_start_depth = $self->transaction_depth;
799       if($want_array) {
800           @result = $coderef->(@$args);
801       }
802       elsif(defined $want_array) {
803           $result[0] = $coderef->(@$args);
804       }
805       else {
806           $coderef->(@$args);
807       }
808
809       my $delta_txn = $txn_start_depth - $self->transaction_depth;
810       if ($delta_txn == 0) {
811         $self->txn_commit;
812       }
813       elsif ($delta_txn != 1) {
814         # an off-by-one would mean we fired a rollback
815         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
816       }
817     } catch {
818       $exception = $_;
819     };
820
821     if(! defined $exception) { return $want_array ? @result : $result[0] }
822
823     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
824       my $rollback_exception;
825       try { $self->txn_rollback } catch { $rollback_exception = shift };
826       if(defined $rollback_exception) {
827         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
828         $self->throw_exception($exception)  # propagate nested rollback
829           if $rollback_exception =~ /$exception_class/;
830
831         $self->throw_exception(
832           "Transaction aborted: ${exception}. "
833           . "Rollback failed: ${rollback_exception}"
834         );
835       }
836       $self->throw_exception($exception)
837     }
838
839     # We were not connected, and was first try - reconnect and retry
840     # via the while loop
841     carp "Retrying $coderef after catching disconnected exception: $exception"
842       if $ENV{DBIC_TXNRETRY_DEBUG};
843     $self->_populate_dbh;
844   }
845 }
846
847 =head2 disconnect
848
849 Our C<disconnect> method also performs a rollback first if the
850 database is not in C<AutoCommit> mode.
851
852 =cut
853
854 sub disconnect {
855   my ($self) = @_;
856
857   if( $self->_dbh ) {
858     my @actions;
859
860     push @actions, ( $self->on_disconnect_call || () );
861     push @actions, $self->_parse_connect_do ('on_disconnect_do');
862
863     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
864
865     $self->_dbh_rollback unless $self->_dbh_autocommit;
866
867     %{ $self->_dbh->{CachedKids} } = ();
868     $self->_dbh->disconnect;
869     $self->_dbh(undef);
870     $self->{_dbh_gen}++;
871   }
872 }
873
874 =head2 with_deferred_fk_checks
875
876 =over 4
877
878 =item Arguments: C<$coderef>
879
880 =item Return Value: The return value of $coderef
881
882 =back
883
884 Storage specific method to run the code ref with FK checks deferred or
885 in MySQL's case disabled entirely.
886
887 =cut
888
889 # Storage subclasses should override this
890 sub with_deferred_fk_checks {
891   my ($self, $sub) = @_;
892   $sub->();
893 }
894
895 =head2 connected
896
897 =over
898
899 =item Arguments: none
900
901 =item Return Value: 1|0
902
903 =back
904
905 Verifies that the current database handle is active and ready to execute
906 an SQL statement (e.g. the connection did not get stale, server is still
907 answering, etc.) This method is used internally by L</dbh>.
908
909 =cut
910
911 sub connected {
912   my $self = shift;
913   return 0 unless $self->_seems_connected;
914
915   #be on the safe side
916   local $self->_dbh->{RaiseError} = 1;
917
918   return $self->_ping;
919 }
920
921 sub _seems_connected {
922   my $self = shift;
923
924   $self->_preserve_foreign_dbh;
925
926   my $dbh = $self->_dbh
927     or return 0;
928
929   return $dbh->FETCH('Active');
930 }
931
932 sub _ping {
933   my $self = shift;
934
935   my $dbh = $self->_dbh or return 0;
936
937   return $dbh->ping;
938 }
939
940 sub ensure_connected {
941   my ($self) = @_;
942
943   unless ($self->connected) {
944     $self->_populate_dbh;
945   }
946 }
947
948 =head2 dbh
949
950 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
951 is guaranteed to be healthy by implicitly calling L</connected>, and if
952 necessary performing a reconnection before returning. Keep in mind that this
953 is very B<expensive> on some database engines. Consider using L</dbh_do>
954 instead.
955
956 =cut
957
958 sub dbh {
959   my ($self) = @_;
960
961   if (not $self->_dbh) {
962     $self->_populate_dbh;
963   } else {
964     $self->ensure_connected;
965   }
966   return $self->_dbh;
967 }
968
969 # this is the internal "get dbh or connect (don't check)" method
970 sub _get_dbh {
971   my $self = shift;
972   $self->_preserve_foreign_dbh;
973   $self->_populate_dbh unless $self->_dbh;
974   return $self->_dbh;
975 }
976
977 sub sql_maker {
978   my ($self) = @_;
979   unless ($self->_sql_maker) {
980     my $sql_maker_class = $self->sql_maker_class;
981     $self->ensure_class_loaded ($sql_maker_class);
982
983     my %opts = %{$self->_sql_maker_opts||{}};
984     my $dialect =
985       $opts{limit_dialect}
986         ||
987       $self->sql_limit_dialect
988         ||
989       do {
990         my $s_class = (ref $self) || $self;
991         carp (
992           "Your storage class ($s_class) does not set sql_limit_dialect and you "
993         . 'have not supplied an explicit limit_dialect in your connection_info. '
994         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
995         . 'databases but can be (and often is) painfully slow.'
996         );
997
998         'GenericSubQ';
999       }
1000     ;
1001
1002     $self->_sql_maker($sql_maker_class->new(
1003       bindtype=>'columns',
1004       array_datatypes => 1,
1005       limit_dialect => $dialect,
1006       name_sep => '.',
1007       %opts,
1008     ));
1009   }
1010   return $self->_sql_maker;
1011 }
1012
1013 # nothing to do by default
1014 sub _rebless {}
1015 sub _init {}
1016
1017 sub _populate_dbh {
1018   my ($self) = @_;
1019
1020   my @info = @{$self->_dbi_connect_info || []};
1021   $self->_dbh(undef); # in case ->connected failed we might get sent here
1022   $self->_dbh_details({}); # reset everything we know
1023
1024   $self->_dbh($self->_connect(@info));
1025
1026   $self->_conn_pid($$);
1027   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1028
1029   $self->_determine_driver;
1030
1031   # Always set the transaction depth on connect, since
1032   #  there is no transaction in progress by definition
1033   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1034
1035   $self->_run_connection_actions unless $self->{_in_determine_driver};
1036 }
1037
1038 sub _run_connection_actions {
1039   my $self = shift;
1040   my @actions;
1041
1042   push @actions, ( $self->on_connect_call || () );
1043   push @actions, $self->_parse_connect_do ('on_connect_do');
1044
1045   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1046 }
1047
1048
1049
1050 sub set_use_dbms_capability {
1051   $_[0]->set_inherited ($_[1], $_[2]);
1052 }
1053
1054 sub get_use_dbms_capability {
1055   my ($self, $capname) = @_;
1056
1057   my $use = $self->get_inherited ($capname);
1058   return defined $use
1059     ? $use
1060     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1061   ;
1062 }
1063
1064 sub set_dbms_capability {
1065   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1066 }
1067
1068 sub get_dbms_capability {
1069   my ($self, $capname) = @_;
1070
1071   my $cap = $self->_dbh_details->{capability}{$capname};
1072
1073   unless (defined $cap) {
1074     if (my $meth = $self->can ("_determine$capname")) {
1075       $cap = $self->$meth ? 1 : 0;
1076     }
1077     else {
1078       $cap = 0;
1079     }
1080
1081     $self->set_dbms_capability ($capname, $cap);
1082   }
1083
1084   return $cap;
1085 }
1086
1087 sub _server_info {
1088   my $self = shift;
1089
1090   my $info;
1091   unless ($info = $self->_dbh_details->{info}) {
1092
1093     $info = {};
1094
1095     my $server_version = try { $self->_get_server_version };
1096
1097     if (defined $server_version) {
1098       $info->{dbms_version} = $server_version;
1099
1100       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1101       my @verparts = split (/\./, $numeric_version);
1102       if (
1103         @verparts
1104           &&
1105         $verparts[0] <= 999
1106       ) {
1107         # consider only up to 3 version parts, iff not more than 3 digits
1108         my @use_parts;
1109         while (@verparts && @use_parts < 3) {
1110           my $p = shift @verparts;
1111           last if $p > 999;
1112           push @use_parts, $p;
1113         }
1114         push @use_parts, 0 while @use_parts < 3;
1115
1116         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1117       }
1118     }
1119
1120     $self->_dbh_details->{info} = $info;
1121   }
1122
1123   return $info;
1124 }
1125
1126 sub _get_server_version {
1127   shift->_get_dbh->get_info(18);
1128 }
1129
1130 sub _determine_driver {
1131   my ($self) = @_;
1132
1133   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1134     my $started_connected = 0;
1135     local $self->{_in_determine_driver} = 1;
1136
1137     if (ref($self) eq __PACKAGE__) {
1138       my $driver;
1139       if ($self->_dbh) { # we are connected
1140         $driver = $self->_dbh->{Driver}{Name};
1141         $started_connected = 1;
1142       } else {
1143         # if connect_info is a CODEREF, we have no choice but to connect
1144         if (ref $self->_dbi_connect_info->[0] &&
1145             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1146           $self->_populate_dbh;
1147           $driver = $self->_dbh->{Driver}{Name};
1148         }
1149         else {
1150           # try to use dsn to not require being connected, the driver may still
1151           # force a connection in _rebless to determine version
1152           # (dsn may not be supplied at all if all we do is make a mock-schema)
1153           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1154           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1155           $driver ||= $ENV{DBI_DRIVER};
1156         }
1157       }
1158
1159       if ($driver) {
1160         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1161         if ($self->load_optional_class($storage_class)) {
1162           mro::set_mro($storage_class, 'c3');
1163           bless $self, $storage_class;
1164           $self->_rebless();
1165         }
1166       }
1167     }
1168
1169     $self->_driver_determined(1);
1170
1171     $self->_init; # run driver-specific initializations
1172
1173     $self->_run_connection_actions
1174         if !$started_connected && defined $self->_dbh;
1175   }
1176 }
1177
1178 sub _do_connection_actions {
1179   my $self          = shift;
1180   my $method_prefix = shift;
1181   my $call          = shift;
1182
1183   if (not ref($call)) {
1184     my $method = $method_prefix . $call;
1185     $self->$method(@_);
1186   } elsif (ref($call) eq 'CODE') {
1187     $self->$call(@_);
1188   } elsif (ref($call) eq 'ARRAY') {
1189     if (ref($call->[0]) ne 'ARRAY') {
1190       $self->_do_connection_actions($method_prefix, $_) for @$call;
1191     } else {
1192       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1193     }
1194   } else {
1195     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1196   }
1197
1198   return $self;
1199 }
1200
1201 sub connect_call_do_sql {
1202   my $self = shift;
1203   $self->_do_query(@_);
1204 }
1205
1206 sub disconnect_call_do_sql {
1207   my $self = shift;
1208   $self->_do_query(@_);
1209 }
1210
1211 # override in db-specific backend when necessary
1212 sub connect_call_datetime_setup { 1 }
1213
1214 sub _do_query {
1215   my ($self, $action) = @_;
1216
1217   if (ref $action eq 'CODE') {
1218     $action = $action->($self);
1219     $self->_do_query($_) foreach @$action;
1220   }
1221   else {
1222     # Most debuggers expect ($sql, @bind), so we need to exclude
1223     # the attribute hash which is the second argument to $dbh->do
1224     # furthermore the bind values are usually to be presented
1225     # as named arrayref pairs, so wrap those here too
1226     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1227     my $sql = shift @do_args;
1228     my $attrs = shift @do_args;
1229     my @bind = map { [ undef, $_ ] } @do_args;
1230
1231     $self->_query_start($sql, @bind);
1232     $self->_get_dbh->do($sql, $attrs, @do_args);
1233     $self->_query_end($sql, @bind);
1234   }
1235
1236   return $self;
1237 }
1238
1239 sub _connect {
1240   my ($self, @info) = @_;
1241
1242   $self->throw_exception("You failed to provide any connection info")
1243     if !@info;
1244
1245   my ($old_connect_via, $dbh);
1246
1247   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1248     $old_connect_via = $DBI::connect_via;
1249     $DBI::connect_via = 'connect';
1250   }
1251
1252   try {
1253     if(ref $info[0] eq 'CODE') {
1254        $dbh = $info[0]->();
1255     }
1256     else {
1257        $dbh = DBI->connect(@info);
1258     }
1259
1260     if (!$dbh) {
1261       die $DBI::errstr;
1262     }
1263
1264     unless ($self->unsafe) {
1265
1266       # this odd anonymous coderef dereference is in fact really
1267       # necessary to avoid the unwanted effect described in perl5
1268       # RT#75792
1269       sub {
1270         my $weak_self = $_[0];
1271         weaken $weak_self;
1272
1273         $_[1]->{HandleError} = sub {
1274           if ($weak_self) {
1275             $weak_self->throw_exception("DBI Exception: $_[0]");
1276           }
1277           else {
1278             # the handler may be invoked by something totally out of
1279             # the scope of DBIC
1280             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1281           }
1282         };
1283       }->($self, $dbh);
1284
1285       $dbh->{ShowErrorStatement} = 1;
1286       $dbh->{RaiseError} = 1;
1287       $dbh->{PrintError} = 0;
1288     }
1289   }
1290   catch {
1291     $self->throw_exception("DBI Connection failed: $_")
1292   }
1293   finally {
1294     $DBI::connect_via = $old_connect_via if $old_connect_via;
1295   };
1296
1297   $self->_dbh_autocommit($dbh->{AutoCommit});
1298   $dbh;
1299 }
1300
1301 sub svp_begin {
1302   my ($self, $name) = @_;
1303
1304   $name = $self->_svp_generate_name
1305     unless defined $name;
1306
1307   $self->throw_exception ("You can't use savepoints outside a transaction")
1308     if $self->{transaction_depth} == 0;
1309
1310   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1311     unless $self->can('_svp_begin');
1312
1313   push @{ $self->{savepoints} }, $name;
1314
1315   $self->debugobj->svp_begin($name) if $self->debug;
1316
1317   return $self->_svp_begin($name);
1318 }
1319
1320 sub svp_release {
1321   my ($self, $name) = @_;
1322
1323   $self->throw_exception ("You can't use savepoints outside a transaction")
1324     if $self->{transaction_depth} == 0;
1325
1326   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1327     unless $self->can('_svp_release');
1328
1329   if (defined $name) {
1330     $self->throw_exception ("Savepoint '$name' does not exist")
1331       unless grep { $_ eq $name } @{ $self->{savepoints} };
1332
1333     # Dig through the stack until we find the one we are releasing.  This keeps
1334     # the stack up to date.
1335     my $svp;
1336
1337     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1338   } else {
1339     $name = pop @{ $self->{savepoints} };
1340   }
1341
1342   $self->debugobj->svp_release($name) if $self->debug;
1343
1344   return $self->_svp_release($name);
1345 }
1346
1347 sub svp_rollback {
1348   my ($self, $name) = @_;
1349
1350   $self->throw_exception ("You can't use savepoints outside a transaction")
1351     if $self->{transaction_depth} == 0;
1352
1353   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1354     unless $self->can('_svp_rollback');
1355
1356   if (defined $name) {
1357       # If they passed us a name, verify that it exists in the stack
1358       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1359           $self->throw_exception("Savepoint '$name' does not exist!");
1360       }
1361
1362       # Dig through the stack until we find the one we are releasing.  This keeps
1363       # the stack up to date.
1364       while(my $s = pop(@{ $self->{savepoints} })) {
1365           last if($s eq $name);
1366       }
1367       # Add the savepoint back to the stack, as a rollback doesn't remove the
1368       # named savepoint, only everything after it.
1369       push(@{ $self->{savepoints} }, $name);
1370   } else {
1371       # We'll assume they want to rollback to the last savepoint
1372       $name = $self->{savepoints}->[-1];
1373   }
1374
1375   $self->debugobj->svp_rollback($name) if $self->debug;
1376
1377   return $self->_svp_rollback($name);
1378 }
1379
1380 sub _svp_generate_name {
1381   my ($self) = @_;
1382   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1383 }
1384
1385 sub txn_begin {
1386   my $self = shift;
1387
1388   # this means we have not yet connected and do not know the AC status
1389   # (e.g. coderef $dbh)
1390   if (! defined $self->_dbh_autocommit) {
1391     $self->ensure_connected;
1392   }
1393   # otherwise re-connect on pid changes, so
1394   # that the txn_depth is adjusted properly
1395   # the lightweight _get_dbh is good enoug here
1396   # (only superficial handle check, no pings)
1397   else {
1398     $self->_get_dbh;
1399   }
1400
1401   if($self->transaction_depth == 0) {
1402     $self->debugobj->txn_begin()
1403       if $self->debug;
1404     $self->_dbh_begin_work;
1405   }
1406   elsif ($self->auto_savepoint) {
1407     $self->svp_begin;
1408   }
1409   $self->{transaction_depth}++;
1410 }
1411
1412 sub _dbh_begin_work {
1413   my $self = shift;
1414
1415   # if the user is utilizing txn_do - good for him, otherwise we need to
1416   # ensure that the $dbh is healthy on BEGIN.
1417   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1418   # will be replaced by a failure of begin_work itself (which will be
1419   # then retried on reconnect)
1420   if ($self->{_in_dbh_do}) {
1421     $self->_dbh->begin_work;
1422   } else {
1423     $self->dbh_do(sub { $_[1]->begin_work });
1424   }
1425 }
1426
1427 sub txn_commit {
1428   my $self = shift;
1429   if ($self->{transaction_depth} == 1) {
1430     $self->debugobj->txn_commit()
1431       if ($self->debug);
1432     $self->_dbh_commit;
1433     $self->{transaction_depth} = 0
1434       if $self->_dbh_autocommit;
1435   }
1436   elsif($self->{transaction_depth} > 1) {
1437     $self->{transaction_depth}--;
1438     $self->svp_release
1439       if $self->auto_savepoint;
1440   }
1441   else {
1442     $self->throw_exception( 'Refusing to commit without a started transaction' );
1443   }
1444 }
1445
1446 sub _dbh_commit {
1447   my $self = shift;
1448   my $dbh  = $self->_dbh
1449     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1450   $dbh->commit;
1451 }
1452
1453 sub txn_rollback {
1454   my $self = shift;
1455   my $dbh = $self->_dbh;
1456   try {
1457     if ($self->{transaction_depth} == 1) {
1458       $self->debugobj->txn_rollback()
1459         if ($self->debug);
1460       $self->{transaction_depth} = 0
1461         if $self->_dbh_autocommit;
1462       $self->_dbh_rollback;
1463     }
1464     elsif($self->{transaction_depth} > 1) {
1465       $self->{transaction_depth}--;
1466       if ($self->auto_savepoint) {
1467         $self->svp_rollback;
1468         $self->svp_release;
1469       }
1470     }
1471     else {
1472       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1473     }
1474   }
1475   catch {
1476     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1477
1478     if ($_ !~ /$exception_class/) {
1479       # ensure that a failed rollback resets the transaction depth
1480       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1481     }
1482
1483     $self->throw_exception($_)
1484   };
1485 }
1486
1487 sub _dbh_rollback {
1488   my $self = shift;
1489   my $dbh  = $self->_dbh
1490     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1491   $dbh->rollback;
1492 }
1493
1494 # This used to be the top-half of _execute.  It was split out to make it
1495 #  easier to override in NoBindVars without duping the rest.  It takes up
1496 #  all of _execute's args, and emits $sql, @bind.
1497 sub _prep_for_execute {
1498   my ($self, $op, $extra_bind, $ident, $args) = @_;
1499
1500   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1501     $ident = $ident->from();
1502   }
1503
1504   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1505
1506   unshift(@bind,
1507     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1508       if $extra_bind;
1509   return ($sql, \@bind);
1510 }
1511
1512
1513 sub _fix_bind_params {
1514     my ($self, @bind) = @_;
1515
1516     ### Turn @bind from something like this:
1517     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1518     ### to this:
1519     ###   ( "'1'", "'1'", "'3'" )
1520     return
1521         map {
1522             if ( defined( $_ && $_->[1] ) ) {
1523                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1524             }
1525             else { q{NULL}; }
1526         } @bind;
1527 }
1528
1529 sub _query_start {
1530     my ( $self, $sql, @bind ) = @_;
1531
1532     if ( $self->debug ) {
1533         @bind = $self->_fix_bind_params(@bind);
1534
1535         $self->debugobj->query_start( $sql, @bind );
1536     }
1537 }
1538
1539 sub _query_end {
1540     my ( $self, $sql, @bind ) = @_;
1541
1542     if ( $self->debug ) {
1543         @bind = $self->_fix_bind_params(@bind);
1544         $self->debugobj->query_end( $sql, @bind );
1545     }
1546 }
1547
1548 sub _dbh_execute {
1549   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1550
1551   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1552
1553   $self->_query_start( $sql, @$bind );
1554
1555   my $sth = $self->sth($sql,$op);
1556
1557   my $placeholder_index = 1;
1558
1559   foreach my $bound (@$bind) {
1560     my $attributes = {};
1561     my($column_name, @data) = @$bound;
1562
1563     if ($bind_attributes) {
1564       $attributes = $bind_attributes->{$column_name}
1565       if defined $bind_attributes->{$column_name};
1566     }
1567
1568     foreach my $data (@data) {
1569       my $ref = ref $data;
1570       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1571
1572       $sth->bind_param($placeholder_index, $data, $attributes);
1573       $placeholder_index++;
1574     }
1575   }
1576
1577   # Can this fail without throwing an exception anyways???
1578   my $rv = $sth->execute();
1579   $self->throw_exception(
1580     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1581   ) if !$rv;
1582
1583   $self->_query_end( $sql, @$bind );
1584
1585   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1586 }
1587
1588 sub _execute {
1589     my $self = shift;
1590     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1591 }
1592
1593 sub _prefetch_insert_auto_nextvals {
1594   my ($self, $source, $to_insert) = @_;
1595
1596   my $upd = {};
1597
1598   foreach my $col ( $source->columns ) {
1599     if ( !defined $to_insert->{$col} ) {
1600       my $col_info = $source->column_info($col);
1601
1602       if ( $col_info->{auto_nextval} ) {
1603         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1604           'nextval',
1605           $col_info->{sequence} ||=
1606             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1607         );
1608       }
1609     }
1610   }
1611
1612   return $upd;
1613 }
1614
1615 sub insert {
1616   my $self = shift;
1617   my ($source, $to_insert, $opts) = @_;
1618
1619   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1620
1621   my $bind_attributes = $self->source_bind_attributes($source);
1622
1623   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1624
1625   if ($opts->{returning}) {
1626     my @ret_cols = @{$opts->{returning}};
1627
1628     my @ret_vals = try {
1629       local $SIG{__WARN__} = sub {};
1630       my @r = $sth->fetchrow_array;
1631       $sth->finish;
1632       @r;
1633     };
1634
1635     my %ret;
1636     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1637
1638     $updated_cols = {
1639       %$updated_cols,
1640       %ret,
1641     };
1642   }
1643
1644   return $updated_cols;
1645 }
1646
1647 ## Currently it is assumed that all values passed will be "normal", i.e. not
1648 ## scalar refs, or at least, all the same type as the first set, the statement is
1649 ## only prepped once.
1650 sub insert_bulk {
1651   my ($self, $source, $cols, $data) = @_;
1652
1653   my %colvalues;
1654   @colvalues{@$cols} = (0..$#$cols);
1655
1656   for my $i (0..$#$cols) {
1657     my $first_val = $data->[0][$i];
1658     next unless ref $first_val eq 'SCALAR';
1659
1660     $colvalues{ $cols->[$i] } = $first_val;
1661   }
1662
1663   # check for bad data and stringify stringifiable objects
1664   my $bad_slice = sub {
1665     my ($msg, $col_idx, $slice_idx) = @_;
1666     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1667       $msg,
1668       $cols->[$col_idx],
1669       do {
1670         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1671         Dumper {
1672           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1673         },
1674       }
1675     );
1676   };
1677
1678   for my $datum_idx (0..$#$data) {
1679     my $datum = $data->[$datum_idx];
1680
1681     for my $col_idx (0..$#$cols) {
1682       my $val            = $datum->[$col_idx];
1683       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1684       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1685
1686       if ($is_literal_sql) {
1687         if (not ref $val) {
1688           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1689         }
1690         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1691           $bad_slice->("$reftype reference found where literal SQL expected",
1692             $col_idx, $datum_idx);
1693         }
1694         elsif ($$val ne $$sqla_bind){
1695           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1696             $col_idx, $datum_idx);
1697         }
1698       }
1699       elsif (my $reftype = ref $val) {
1700         require overload;
1701         if (overload::Method($val, '""')) {
1702           $datum->[$col_idx] = "".$val;
1703         }
1704         else {
1705           $bad_slice->("$reftype reference found where bind expected",
1706             $col_idx, $datum_idx);
1707         }
1708       }
1709     }
1710   }
1711
1712   my ($sql, $bind) = $self->_prep_for_execute (
1713     'insert', undef, $source, [\%colvalues]
1714   );
1715   my @bind = @$bind;
1716
1717   my $empty_bind = 1 if (not @bind) &&
1718     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1719
1720   if ((not @bind) && (not $empty_bind)) {
1721     $self->throw_exception(
1722       'Cannot insert_bulk without support for placeholders'
1723     );
1724   }
1725
1726   # neither _execute_array, nor _execute_inserts_with_no_binds are
1727   # atomic (even if _execute _array is a single call). Thus a safety
1728   # scope guard
1729   my $guard = $self->txn_scope_guard;
1730
1731   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1732   my $sth = $self->sth($sql);
1733   my $rv = do {
1734     if ($empty_bind) {
1735       # bind_param_array doesn't work if there are no binds
1736       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1737     }
1738     else {
1739 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1740       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1741     }
1742   };
1743
1744   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1745
1746   $guard->commit;
1747
1748   return (wantarray ? ($rv, $sth, @bind) : $rv);
1749 }
1750
1751 sub _execute_array {
1752   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1753
1754   ## This must be an arrayref, else nothing works!
1755   my $tuple_status = [];
1756
1757   ## Get the bind_attributes, if any exist
1758   my $bind_attributes = $self->source_bind_attributes($source);
1759
1760   ## Bind the values and execute
1761   my $placeholder_index = 1;
1762
1763   foreach my $bound (@$bind) {
1764
1765     my $attributes = {};
1766     my ($column_name, $data_index) = @$bound;
1767
1768     if( $bind_attributes ) {
1769       $attributes = $bind_attributes->{$column_name}
1770       if defined $bind_attributes->{$column_name};
1771     }
1772
1773     my @data = map { $_->[$data_index] } @$data;
1774
1775     $sth->bind_param_array(
1776       $placeholder_index,
1777       [@data],
1778       (%$attributes ?  $attributes : ()),
1779     );
1780     $placeholder_index++;
1781   }
1782
1783   my ($rv, $err);
1784   try {
1785     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1786   }
1787   catch {
1788     $err = shift;
1789   };
1790
1791   # Statement must finish even if there was an exception.
1792   try {
1793     $sth->finish
1794   }
1795   catch {
1796     $err = shift unless defined $err
1797   };
1798
1799   $err = $sth->errstr
1800     if (! defined $err and $sth->err);
1801
1802   if (defined $err) {
1803     my $i = 0;
1804     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1805
1806     $self->throw_exception("Unexpected populate error: $err")
1807       if ($i > $#$tuple_status);
1808
1809     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1810       ($tuple_status->[$i][1] || $err),
1811       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1812     );
1813   }
1814
1815   return $rv;
1816 }
1817
1818 sub _dbh_execute_array {
1819     my ($self, $sth, $tuple_status, @extra) = @_;
1820
1821     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1822 }
1823
1824 sub _dbh_execute_inserts_with_no_binds {
1825   my ($self, $sth, $count) = @_;
1826
1827   my $err;
1828   try {
1829     my $dbh = $self->_get_dbh;
1830     local $dbh->{RaiseError} = 1;
1831     local $dbh->{PrintError} = 0;
1832
1833     $sth->execute foreach 1..$count;
1834   }
1835   catch {
1836     $err = shift;
1837   }
1838   finally {
1839     # Make sure statement is finished even if there was an exception.
1840     try {
1841       $sth->finish
1842     }
1843     catch {
1844       $err = shift unless defined $err;
1845     };
1846   };
1847
1848   $self->throw_exception($err) if defined $err;
1849
1850   return $count;
1851 }
1852
1853 sub update {
1854   my ($self, $source, @args) = @_;
1855
1856   my $bind_attrs = $self->source_bind_attributes($source);
1857
1858   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1859 }
1860
1861
1862 sub delete {
1863   my ($self, $source, @args) = @_;
1864
1865   my $bind_attrs = $self->source_bind_attributes($source);
1866
1867   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1868 }
1869
1870 # We were sent here because the $rs contains a complex search
1871 # which will require a subquery to select the correct rows
1872 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1873 #
1874 # Generating a single PK column subquery is trivial and supported
1875 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1876 # Look at _multipk_update_delete()
1877 sub _subq_update_delete {
1878   my $self = shift;
1879   my ($rs, $op, $values) = @_;
1880
1881   my $rsrc = $rs->result_source;
1882
1883   # quick check if we got a sane rs on our hands
1884   my @pcols = $rsrc->_pri_cols;
1885
1886   my $sel = $rs->_resolved_attrs->{select};
1887   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1888
1889   if (
1890       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1891         ne
1892       join ("\x00", sort @$sel )
1893   ) {
1894     $self->throw_exception (
1895       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1896     );
1897   }
1898
1899   if (@pcols == 1) {
1900     return $self->$op (
1901       $rsrc,
1902       $op eq 'update' ? $values : (),
1903       { $pcols[0] => { -in => $rs->as_query } },
1904     );
1905   }
1906
1907   else {
1908     return $self->_multipk_update_delete (@_);
1909   }
1910 }
1911
1912 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1913 # resultset update/delete involving subqueries. So by default resort
1914 # to simple (and inefficient) delete_all style per-row opearations,
1915 # while allowing specific storages to override this with a faster
1916 # implementation.
1917 #
1918 sub _multipk_update_delete {
1919   return shift->_per_row_update_delete (@_);
1920 }
1921
1922 # This is the default loop used to delete/update rows for multi PK
1923 # resultsets, and used by mysql exclusively (because it can't do anything
1924 # else).
1925 #
1926 # We do not use $row->$op style queries, because resultset update/delete
1927 # is not expected to cascade (this is what delete_all/update_all is for).
1928 #
1929 # There should be no race conditions as the entire operation is rolled
1930 # in a transaction.
1931 #
1932 sub _per_row_update_delete {
1933   my $self = shift;
1934   my ($rs, $op, $values) = @_;
1935
1936   my $rsrc = $rs->result_source;
1937   my @pcols = $rsrc->_pri_cols;
1938
1939   my $guard = $self->txn_scope_guard;
1940
1941   # emulate the return value of $sth->execute for non-selects
1942   my $row_cnt = '0E0';
1943
1944   my $subrs_cur = $rs->cursor;
1945   my @all_pk = $subrs_cur->all;
1946   for my $pks ( @all_pk) {
1947
1948     my $cond;
1949     for my $i (0.. $#pcols) {
1950       $cond->{$pcols[$i]} = $pks->[$i];
1951     }
1952
1953     $self->$op (
1954       $rsrc,
1955       $op eq 'update' ? $values : (),
1956       $cond,
1957     );
1958
1959     $row_cnt++;
1960   }
1961
1962   $guard->commit;
1963
1964   return $row_cnt;
1965 }
1966
1967 sub _select {
1968   my $self = shift;
1969   $self->_execute($self->_select_args(@_));
1970 }
1971
1972 sub _select_args_to_query {
1973   my $self = shift;
1974
1975   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1976   #  = $self->_select_args($ident, $select, $cond, $attrs);
1977   my ($op, $bind, $ident, $bind_attrs, @args) =
1978     $self->_select_args(@_);
1979
1980   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1981   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1982   $prepared_bind ||= [];
1983
1984   return wantarray
1985     ? ($sql, $prepared_bind, $bind_attrs)
1986     : \[ "($sql)", @$prepared_bind ]
1987   ;
1988 }
1989
1990 sub _select_args {
1991   my ($self, $ident, $select, $where, $attrs) = @_;
1992
1993   my $sql_maker = $self->sql_maker;
1994   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1995
1996   $attrs = {
1997     %$attrs,
1998     select => $select,
1999     from => $ident,
2000     where => $where,
2001     $rs_alias && $alias2source->{$rs_alias}
2002       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
2003       : ()
2004     ,
2005   };
2006
2007   # calculate bind_attrs before possible $ident mangling
2008   my $bind_attrs = {};
2009   for my $alias (keys %$alias2source) {
2010     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2011     for my $col (keys %$bindtypes) {
2012
2013       my $fqcn = join ('.', $alias, $col);
2014       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2015
2016       # Unqialified column names are nice, but at the same time can be
2017       # rather ambiguous. What we do here is basically go along with
2018       # the loop, adding an unqualified column slot to $bind_attrs,
2019       # alongside the fully qualified name. As soon as we encounter
2020       # another column by that name (which would imply another table)
2021       # we unset the unqualified slot and never add any info to it
2022       # to avoid erroneous type binding. If this happens the users
2023       # only choice will be to fully qualify his column name
2024
2025       if (exists $bind_attrs->{$col}) {
2026         $bind_attrs->{$col} = {};
2027       }
2028       else {
2029         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2030       }
2031     }
2032   }
2033
2034   # Sanity check the attributes (SQLMaker does it too, but
2035   # in case of a software_limit we'll never reach there)
2036   if (defined $attrs->{offset}) {
2037     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2038       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2039   }
2040   $attrs->{offset} ||= 0;
2041
2042   if (defined $attrs->{rows}) {
2043     $self->throw_exception("The rows attribute must be a positive integer if present")
2044       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2045   }
2046   elsif ($attrs->{offset}) {
2047     # MySQL actually recommends this approach.  I cringe.
2048     $attrs->{rows} = $sql_maker->__max_int;
2049   }
2050
2051   my @limit;
2052
2053   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2054   # storage, unless software limit was requested
2055   if (
2056     #limited has_many
2057     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2058        ||
2059     # grouped prefetch (to satisfy group_by == select)
2060     ( $attrs->{group_by}
2061         &&
2062       @{$attrs->{group_by}}
2063         &&
2064       $attrs->{_prefetch_select}
2065         &&
2066       @{$attrs->{_prefetch_select}}
2067     )
2068   ) {
2069     ($ident, $select, $where, $attrs)
2070       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2071   }
2072   elsif (! $attrs->{software_limit} ) {
2073     push @limit, $attrs->{rows}, $attrs->{offset};
2074   }
2075
2076   # try to simplify the joinmap further (prune unreferenced type-single joins)
2077   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2078
2079 ###
2080   # This would be the point to deflate anything found in $where
2081   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2082   # expect a row object. And all we have is a resultsource (it is trivial
2083   # to extract deflator coderefs via $alias2source above).
2084   #
2085   # I don't see a way forward other than changing the way deflators are
2086   # invoked, and that's just bad...
2087 ###
2088
2089   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2090 }
2091
2092 # Returns a counting SELECT for a simple count
2093 # query. Abstracted so that a storage could override
2094 # this to { count => 'firstcol' } or whatever makes
2095 # sense as a performance optimization
2096 sub _count_select {
2097   #my ($self, $source, $rs_attrs) = @_;
2098   return { count => '*' };
2099 }
2100
2101
2102 sub source_bind_attributes {
2103   my ($self, $source) = @_;
2104
2105   my $bind_attributes;
2106   foreach my $column ($source->columns) {
2107
2108     my $data_type = $source->column_info($column)->{data_type} || '';
2109     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2110      if $data_type;
2111   }
2112
2113   return $bind_attributes;
2114 }
2115
2116 =head2 select
2117
2118 =over 4
2119
2120 =item Arguments: $ident, $select, $condition, $attrs
2121
2122 =back
2123
2124 Handle a SQL select statement.
2125
2126 =cut
2127
2128 sub select {
2129   my $self = shift;
2130   my ($ident, $select, $condition, $attrs) = @_;
2131   return $self->cursor_class->new($self, \@_, $attrs);
2132 }
2133
2134 sub select_single {
2135   my $self = shift;
2136   my ($rv, $sth, @bind) = $self->_select(@_);
2137   my @row = $sth->fetchrow_array;
2138   my @nextrow = $sth->fetchrow_array if @row;
2139   if(@row && @nextrow) {
2140     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2141   }
2142   # Need to call finish() to work round broken DBDs
2143   $sth->finish();
2144   return @row;
2145 }
2146
2147 =head2 sql_limit_dialect
2148
2149 This is an accessor for the default SQL limit dialect used by a particular
2150 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2151 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2152 see L<DBIx::Class::SQLMaker::LimitDialects>.
2153
2154 =head2 sth
2155
2156 =over 4
2157
2158 =item Arguments: $sql
2159
2160 =back
2161
2162 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2163
2164 =cut
2165
2166 sub _dbh_sth {
2167   my ($self, $dbh, $sql) = @_;
2168
2169   # 3 is the if_active parameter which avoids active sth re-use
2170   my $sth = $self->disable_sth_caching
2171     ? $dbh->prepare($sql)
2172     : $dbh->prepare_cached($sql, {}, 3);
2173
2174   # XXX You would think RaiseError would make this impossible,
2175   #  but apparently that's not true :(
2176   $self->throw_exception($dbh->errstr) if !$sth;
2177
2178   $sth;
2179 }
2180
2181 sub sth {
2182   my ($self, $sql) = @_;
2183   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2184 }
2185
2186 sub _dbh_columns_info_for {
2187   my ($self, $dbh, $table) = @_;
2188
2189   if ($dbh->can('column_info')) {
2190     my %result;
2191     my $caught;
2192     try {
2193       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2194       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2195       $sth->execute();
2196       while ( my $info = $sth->fetchrow_hashref() ){
2197         my %column_info;
2198         $column_info{data_type}   = $info->{TYPE_NAME};
2199         $column_info{size}      = $info->{COLUMN_SIZE};
2200         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2201         $column_info{default_value} = $info->{COLUMN_DEF};
2202         my $col_name = $info->{COLUMN_NAME};
2203         $col_name =~ s/^\"(.*)\"$/$1/;
2204
2205         $result{$col_name} = \%column_info;
2206       }
2207     } catch {
2208       $caught = 1;
2209     };
2210     return \%result if !$caught && scalar keys %result;
2211   }
2212
2213   my %result;
2214   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2215   $sth->execute;
2216   my @columns = @{$sth->{NAME_lc}};
2217   for my $i ( 0 .. $#columns ){
2218     my %column_info;
2219     $column_info{data_type} = $sth->{TYPE}->[$i];
2220     $column_info{size} = $sth->{PRECISION}->[$i];
2221     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2222
2223     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2224       $column_info{data_type} = $1;
2225       $column_info{size}    = $2;
2226     }
2227
2228     $result{$columns[$i]} = \%column_info;
2229   }
2230   $sth->finish;
2231
2232   foreach my $col (keys %result) {
2233     my $colinfo = $result{$col};
2234     my $type_num = $colinfo->{data_type};
2235     my $type_name;
2236     if(defined $type_num && $dbh->can('type_info')) {
2237       my $type_info = $dbh->type_info($type_num);
2238       $type_name = $type_info->{TYPE_NAME} if $type_info;
2239       $colinfo->{data_type} = $type_name if $type_name;
2240     }
2241   }
2242
2243   return \%result;
2244 }
2245
2246 sub columns_info_for {
2247   my ($self, $table) = @_;
2248   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2249 }
2250
2251 =head2 last_insert_id
2252
2253 Return the row id of the last insert.
2254
2255 =cut
2256
2257 sub _dbh_last_insert_id {
2258     my ($self, $dbh, $source, $col) = @_;
2259
2260     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2261
2262     return $id if defined $id;
2263
2264     my $class = ref $self;
2265     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2266 }
2267
2268 sub last_insert_id {
2269   my $self = shift;
2270   $self->_dbh_last_insert_id ($self->_dbh, @_);
2271 }
2272
2273 =head2 _native_data_type
2274
2275 =over 4
2276
2277 =item Arguments: $type_name
2278
2279 =back
2280
2281 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2282 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2283 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2284
2285 The default implementation returns C<undef>, implement in your Storage driver if
2286 you need this functionality.
2287
2288 Should map types from other databases to the native RDBMS type, for example
2289 C<VARCHAR2> to C<VARCHAR>.
2290
2291 Types with modifiers should map to the underlying data type. For example,
2292 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2293
2294 Composite types should map to the container type, for example
2295 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2296
2297 =cut
2298
2299 sub _native_data_type {
2300   #my ($self, $data_type) = @_;
2301   return undef
2302 }
2303
2304 # Check if placeholders are supported at all
2305 sub _determine_supports_placeholders {
2306   my $self = shift;
2307   my $dbh  = $self->_get_dbh;
2308
2309   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2310   # but it is inaccurate more often than not
2311   return try {
2312     local $dbh->{PrintError} = 0;
2313     local $dbh->{RaiseError} = 1;
2314     $dbh->do('select ?', {}, 1);
2315     1;
2316   }
2317   catch {
2318     0;
2319   };
2320 }
2321
2322 # Check if placeholders bound to non-string types throw exceptions
2323 #
2324 sub _determine_supports_typeless_placeholders {
2325   my $self = shift;
2326   my $dbh  = $self->_get_dbh;
2327
2328   return try {
2329     local $dbh->{PrintError} = 0;
2330     local $dbh->{RaiseError} = 1;
2331     # this specifically tests a bind that is NOT a string
2332     $dbh->do('select 1 where 1 = ?', {}, 1);
2333     1;
2334   }
2335   catch {
2336     0;
2337   };
2338 }
2339
2340 =head2 sqlt_type
2341
2342 Returns the database driver name.
2343
2344 =cut
2345
2346 sub sqlt_type {
2347   shift->_get_dbh->{Driver}->{Name};
2348 }
2349
2350 =head2 bind_attribute_by_data_type
2351
2352 Given a datatype from column info, returns a database specific bind
2353 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2354 let the database planner just handle it.
2355
2356 Generally only needed for special case column types, like bytea in postgres.
2357
2358 =cut
2359
2360 sub bind_attribute_by_data_type {
2361     return;
2362 }
2363
2364 =head2 is_datatype_numeric
2365
2366 Given a datatype from column_info, returns a boolean value indicating if
2367 the current RDBMS considers it a numeric value. This controls how
2368 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2369 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2370 be performed instead of the usual C<eq>.
2371
2372 =cut
2373
2374 sub is_datatype_numeric {
2375   my ($self, $dt) = @_;
2376
2377   return 0 unless $dt;
2378
2379   return $dt =~ /^ (?:
2380     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2381   ) $/ix;
2382 }
2383
2384
2385 =head2 create_ddl_dir
2386
2387 =over 4
2388
2389 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2390
2391 =back
2392
2393 Creates a SQL file based on the Schema, for each of the specified
2394 database engines in C<\@databases> in the given directory.
2395 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2396
2397 Given a previous version number, this will also create a file containing
2398 the ALTER TABLE statements to transform the previous schema into the
2399 current one. Note that these statements may contain C<DROP TABLE> or
2400 C<DROP COLUMN> statements that can potentially destroy data.
2401
2402 The file names are created using the C<ddl_filename> method below, please
2403 override this method in your schema if you would like a different file
2404 name format. For the ALTER file, the same format is used, replacing
2405 $version in the name with "$preversion-$version".
2406
2407 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2408 The most common value for this would be C<< { add_drop_table => 1 } >>
2409 to have the SQL produced include a C<DROP TABLE> statement for each table
2410 created. For quoting purposes supply C<quote_table_names> and
2411 C<quote_field_names>.
2412
2413 If no arguments are passed, then the following default values are assumed:
2414
2415 =over 4
2416
2417 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2418
2419 =item version    - $schema->schema_version
2420
2421 =item directory  - './'
2422
2423 =item preversion - <none>
2424
2425 =back
2426
2427 By default, C<\%sqlt_args> will have
2428
2429  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2430
2431 merged with the hash passed in. To disable any of those features, pass in a
2432 hashref like the following
2433
2434  { ignore_constraint_names => 0, # ... other options }
2435
2436
2437 WARNING: You are strongly advised to check all SQL files created, before applying
2438 them.
2439
2440 =cut
2441
2442 sub create_ddl_dir {
2443   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2444
2445   unless ($dir) {
2446     carp "No directory given, using ./\n";
2447     $dir = './';
2448   } else {
2449       -d $dir
2450         or
2451       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2452         or
2453       $self->throw_exception(
2454         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2455       );
2456   }
2457
2458   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2459
2460   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2461   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2462
2463   my $schema_version = $schema->schema_version || '1.x';
2464   $version ||= $schema_version;
2465
2466   $sqltargs = {
2467     add_drop_table => 1,
2468     ignore_constraint_names => 1,
2469     ignore_index_names => 1,
2470     %{$sqltargs || {}}
2471   };
2472
2473   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2474     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2475   }
2476
2477   my $sqlt = SQL::Translator->new( $sqltargs );
2478
2479   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2480   my $sqlt_schema = $sqlt->translate({ data => $schema })
2481     or $self->throw_exception ($sqlt->error);
2482
2483   foreach my $db (@$databases) {
2484     $sqlt->reset();
2485     $sqlt->{schema} = $sqlt_schema;
2486     $sqlt->producer($db);
2487
2488     my $file;
2489     my $filename = $schema->ddl_filename($db, $version, $dir);
2490     if (-e $filename && ($version eq $schema_version )) {
2491       # if we are dumping the current version, overwrite the DDL
2492       carp "Overwriting existing DDL file - $filename";
2493       unlink($filename);
2494     }
2495
2496     my $output = $sqlt->translate;
2497     if(!$output) {
2498       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2499       next;
2500     }
2501     if(!open($file, ">$filename")) {
2502       $self->throw_exception("Can't open $filename for writing ($!)");
2503       next;
2504     }
2505     print $file $output;
2506     close($file);
2507
2508     next unless ($preversion);
2509
2510     require SQL::Translator::Diff;
2511
2512     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2513     if(!-e $prefilename) {
2514       carp("No previous schema file found ($prefilename)");
2515       next;
2516     }
2517
2518     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2519     if(-e $difffile) {
2520       carp("Overwriting existing diff file - $difffile");
2521       unlink($difffile);
2522     }
2523
2524     my $source_schema;
2525     {
2526       my $t = SQL::Translator->new($sqltargs);
2527       $t->debug( 0 );
2528       $t->trace( 0 );
2529
2530       $t->parser( $db )
2531         or $self->throw_exception ($t->error);
2532
2533       my $out = $t->translate( $prefilename )
2534         or $self->throw_exception ($t->error);
2535
2536       $source_schema = $t->schema;
2537
2538       $source_schema->name( $prefilename )
2539         unless ( $source_schema->name );
2540     }
2541
2542     # The "new" style of producers have sane normalization and can support
2543     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2544     # And we have to diff parsed SQL against parsed SQL.
2545     my $dest_schema = $sqlt_schema;
2546
2547     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2548       my $t = SQL::Translator->new($sqltargs);
2549       $t->debug( 0 );
2550       $t->trace( 0 );
2551
2552       $t->parser( $db )
2553         or $self->throw_exception ($t->error);
2554
2555       my $out = $t->translate( $filename )
2556         or $self->throw_exception ($t->error);
2557
2558       $dest_schema = $t->schema;
2559
2560       $dest_schema->name( $filename )
2561         unless $dest_schema->name;
2562     }
2563
2564     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2565                                                   $dest_schema,   $db,
2566                                                   $sqltargs
2567                                                  );
2568     if(!open $file, ">$difffile") {
2569       $self->throw_exception("Can't write to $difffile ($!)");
2570       next;
2571     }
2572     print $file $diff;
2573     close($file);
2574   }
2575 }
2576
2577 =head2 deployment_statements
2578
2579 =over 4
2580
2581 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2582
2583 =back
2584
2585 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2586
2587 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2588 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2589
2590 C<$directory> is used to return statements from files in a previously created
2591 L</create_ddl_dir> directory and is optional. The filenames are constructed
2592 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2593
2594 If no C<$directory> is specified then the statements are constructed on the
2595 fly using L<SQL::Translator> and C<$version> is ignored.
2596
2597 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2598
2599 =cut
2600
2601 sub deployment_statements {
2602   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2603   $type ||= $self->sqlt_type;
2604   $version ||= $schema->schema_version || '1.x';
2605   $dir ||= './';
2606   my $filename = $schema->ddl_filename($type, $version, $dir);
2607   if(-f $filename)
2608   {
2609       my $file;
2610       open($file, "<$filename")
2611         or $self->throw_exception("Can't open $filename ($!)");
2612       my @rows = <$file>;
2613       close($file);
2614       return join('', @rows);
2615   }
2616
2617   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2618     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2619   }
2620
2621   # sources needs to be a parser arg, but for simplicty allow at top level
2622   # coming in
2623   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2624       if exists $sqltargs->{sources};
2625
2626   my $tr = SQL::Translator->new(
2627     producer => "SQL::Translator::Producer::${type}",
2628     %$sqltargs,
2629     parser => 'SQL::Translator::Parser::DBIx::Class',
2630     data => $schema,
2631   );
2632
2633   my @ret;
2634   my $wa = wantarray;
2635   if ($wa) {
2636     @ret = $tr->translate;
2637   }
2638   else {
2639     $ret[0] = $tr->translate;
2640   }
2641
2642   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2643     unless (@ret && defined $ret[0]);
2644
2645   return $wa ? @ret : $ret[0];
2646 }
2647
2648 sub deploy {
2649   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2650   my $deploy = sub {
2651     my $line = shift;
2652     return if($line =~ /^--/);
2653     return if(!$line);
2654     # next if($line =~ /^DROP/m);
2655     return if($line =~ /^BEGIN TRANSACTION/m);
2656     return if($line =~ /^COMMIT/m);
2657     return if $line =~ /^\s+$/; # skip whitespace only
2658     $self->_query_start($line);
2659     try {
2660       # do a dbh_do cycle here, as we need some error checking in
2661       # place (even though we will ignore errors)
2662       $self->dbh_do (sub { $_[1]->do($line) });
2663     } catch {
2664       carp qq{$_ (running "${line}")};
2665     };
2666     $self->_query_end($line);
2667   };
2668   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2669   if (@statements > 1) {
2670     foreach my $statement (@statements) {
2671       $deploy->( $statement );
2672     }
2673   }
2674   elsif (@statements == 1) {
2675     foreach my $line ( split(";\n", $statements[0])) {
2676       $deploy->( $line );
2677     }
2678   }
2679 }
2680
2681 =head2 datetime_parser
2682
2683 Returns the datetime parser class
2684
2685 =cut
2686
2687 sub datetime_parser {
2688   my $self = shift;
2689   return $self->{datetime_parser} ||= do {
2690     $self->build_datetime_parser(@_);
2691   };
2692 }
2693
2694 =head2 datetime_parser_type
2695
2696 Defines (returns) the datetime parser class - currently hardwired to
2697 L<DateTime::Format::MySQL>
2698
2699 =cut
2700
2701 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2702
2703 =head2 build_datetime_parser
2704
2705 See L</datetime_parser>
2706
2707 =cut
2708
2709 sub build_datetime_parser {
2710   my $self = shift;
2711   my $type = $self->datetime_parser_type(@_);
2712   $self->ensure_class_loaded ($type);
2713   return $type;
2714 }
2715
2716
2717 =head2 is_replicating
2718
2719 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2720 replicate from a master database.  Default is undef, which is the result
2721 returned by databases that don't support replication.
2722
2723 =cut
2724
2725 sub is_replicating {
2726     return;
2727
2728 }
2729
2730 =head2 lag_behind_master
2731
2732 Returns a number that represents a certain amount of lag behind a master db
2733 when a given storage is replicating.  The number is database dependent, but
2734 starts at zero and increases with the amount of lag. Default in undef
2735
2736 =cut
2737
2738 sub lag_behind_master {
2739     return;
2740 }
2741
2742 =head2 relname_to_table_alias
2743
2744 =over 4
2745
2746 =item Arguments: $relname, $join_count
2747
2748 =back
2749
2750 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2751 queries.
2752
2753 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2754 way these aliases are named.
2755
2756 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2757 otherwise C<"$relname">.
2758
2759 =cut
2760
2761 sub relname_to_table_alias {
2762   my ($self, $relname, $join_count) = @_;
2763
2764   my $alias = ($join_count && $join_count > 1 ?
2765     join('_', $relname, $join_count) : $relname);
2766
2767   return $alias;
2768 }
2769
2770 1;
2771
2772 =head1 USAGE NOTES
2773
2774 =head2 DBIx::Class and AutoCommit
2775
2776 DBIx::Class can do some wonderful magic with handling exceptions,
2777 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2778 (the default) combined with C<txn_do> for transaction support.
2779
2780 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2781 in an assumed transaction between commits, and you're telling us you'd
2782 like to manage that manually.  A lot of the magic protections offered by
2783 this module will go away.  We can't protect you from exceptions due to database
2784 disconnects because we don't know anything about how to restart your
2785 transactions.  You're on your own for handling all sorts of exceptional
2786 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2787 be with raw DBI.
2788
2789
2790 =head1 AUTHORS
2791
2792 Matt S. Trout <mst@shadowcatsystems.co.uk>
2793
2794 Andy Grundman <andy@hybridized.org>
2795
2796 =head1 LICENSE
2797
2798 You may distribute this code under the same terms as Perl itself.
2799
2800 =cut