29fc1bfb384212a91ee4da47da958d334ed58765
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } @capabilities );
60
61
62 # Each of these methods need _determine_driver called before itself
63 # in order to function reliably. This is a purely DRY optimization
64 #
65 # get_(use)_dbms_capability need to be called on the correct Storage
66 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
67 # _determine_supports_X which obv. needs a correct driver as well
68 my @rdbms_specific_methods = qw/
69   deployment_statements
70   sqlt_type
71   sql_maker
72   build_datetime_parser
73   datetime_parser_type
74
75   insert
76   insert_bulk
77   update
78   delete
79   select
80   select_single
81
82   get_use_dbms_capability
83   get_dbms_capability
84
85   _server_info
86   _get_server_version
87 /;
88
89 for my $meth (@rdbms_specific_methods) {
90
91   my $orig = __PACKAGE__->can ($meth)
92     or die "$meth is not a ::Storage::DBI method!";
93
94   no strict qw/refs/;
95   no warnings qw/redefine/;
96   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
97     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
98       $_[0]->_determine_driver;
99
100       # This for some reason crashes and burns on perl 5.8.1
101       # IFF the method ends up throwing an exception
102       #goto $_[0]->can ($meth);
103
104       my $cref = $_[0]->can ($meth);
105       goto $cref;
106     }
107     goto $orig;
108   };
109 }
110
111
112 =head1 NAME
113
114 DBIx::Class::Storage::DBI - DBI storage handler
115
116 =head1 SYNOPSIS
117
118   my $schema = MySchema->connect('dbi:SQLite:my.db');
119
120   $schema->storage->debug(1);
121
122   my @stuff = $schema->storage->dbh_do(
123     sub {
124       my ($storage, $dbh, @args) = @_;
125       $dbh->do("DROP TABLE authors");
126     },
127     @column_list
128   );
129
130   $schema->resultset('Book')->search({
131      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
132   });
133
134 =head1 DESCRIPTION
135
136 This class represents the connection to an RDBMS via L<DBI>.  See
137 L<DBIx::Class::Storage> for general information.  This pod only
138 documents DBI-specific methods and behaviors.
139
140 =head1 METHODS
141
142 =cut
143
144 sub new {
145   my $new = shift->next::method(@_);
146
147   $new->transaction_depth(0);
148   $new->_sql_maker_opts({});
149   $new->_dbh_details({});
150   $new->{savepoints} = [];
151   $new->{_in_dbh_do} = 0;
152   $new->{_dbh_gen} = 0;
153
154   # read below to see what this does
155   $new->_arm_global_destructor;
156
157   $new;
158 }
159
160 # This is hack to work around perl shooting stuff in random
161 # order on exit(). If we do not walk the remaining storage
162 # objects in an END block, there is a *small but real* chance
163 # of a fork()ed child to kill the parent's shared DBI handle,
164 # *before perl reaches the DESTROY in this package*
165 # Yes, it is ugly and effective.
166 {
167   my %seek_and_destroy;
168
169   sub _arm_global_destructor {
170     my $self = shift;
171     my $key = Scalar::Util::refaddr ($self);
172     $seek_and_destroy{$key} = $self;
173     Scalar::Util::weaken ($seek_and_destroy{$key});
174   }
175
176   END {
177     local $?; # just in case the DBI destructor changes it somehow
178
179     # destroy just the object if not native to this process/thread
180     $_->_preserve_foreign_dbh for (grep
181       { defined $_ }
182       values %seek_and_destroy
183     );
184   }
185 }
186
187 sub DESTROY {
188   my $self = shift;
189
190   # some databases spew warnings on implicit disconnect
191   local $SIG{__WARN__} = sub {};
192   $self->_dbh(undef);
193 }
194
195 sub _preserve_foreign_dbh {
196   my $self = shift;
197
198   return unless $self->_dbh;
199
200   $self->_verify_tid;
201
202   return unless $self->_dbh;
203
204   $self->_verify_pid;
205
206 }
207
208 # handle pid changes correctly - do not destroy parent's connection
209 sub _verify_pid {
210   my $self = shift;
211
212   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
213
214   $self->_dbh->{InactiveDestroy} = 1;
215   $self->_dbh(undef);
216   $self->{_dbh_gen}++;
217
218   return;
219 }
220
221 # very similar to above, but seems to FAIL if I set InactiveDestroy
222 sub _verify_tid {
223   my $self = shift;
224
225   if ( ! defined $self->_conn_tid ) {
226     return; # no threads
227   }
228   elsif ( $self->_conn_tid == threads->tid ) {
229     return; # same thread
230   }
231
232   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
233   $self->_dbh(undef);
234   $self->{_dbh_gen}++;
235
236   return;
237 }
238
239
240 =head2 connect_info
241
242 This method is normally called by L<DBIx::Class::Schema/connection>, which
243 encapsulates its argument list in an arrayref before passing them here.
244
245 The argument list may contain:
246
247 =over
248
249 =item *
250
251 The same 4-element argument set one would normally pass to
252 L<DBI/connect>, optionally followed by
253 L<extra attributes|/DBIx::Class specific connection attributes>
254 recognized by DBIx::Class:
255
256   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
257
258 =item *
259
260 A single code reference which returns a connected
261 L<DBI database handle|DBI/connect> optionally followed by
262 L<extra attributes|/DBIx::Class specific connection attributes> recognized
263 by DBIx::Class:
264
265   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
266
267 =item *
268
269 A single hashref with all the attributes and the dsn/user/password
270 mixed together:
271
272   $connect_info_args = [{
273     dsn => $dsn,
274     user => $user,
275     password => $pass,
276     %dbi_attributes,
277     %extra_attributes,
278   }];
279
280   $connect_info_args = [{
281     dbh_maker => sub { DBI->connect (...) },
282     %dbi_attributes,
283     %extra_attributes,
284   }];
285
286 This is particularly useful for L<Catalyst> based applications, allowing the
287 following config (L<Config::General> style):
288
289   <Model::DB>
290     schema_class   App::DB
291     <connect_info>
292       dsn          dbi:mysql:database=test
293       user         testuser
294       password     TestPass
295       AutoCommit   1
296     </connect_info>
297   </Model::DB>
298
299 The C<dsn>/C<user>/C<password> combination can be substituted by the
300 C<dbh_maker> key whose value is a coderef that returns a connected
301 L<DBI database handle|DBI/connect>
302
303 =back
304
305 Please note that the L<DBI> docs recommend that you always explicitly
306 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
307 recommends that it be set to I<1>, and that you perform transactions
308 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
309 to I<1> if you do not do explicitly set it to zero.  This is the default
310 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
311
312 =head3 DBIx::Class specific connection attributes
313
314 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
315 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
316 the following connection options. These options can be mixed in with your other
317 L<DBI> connection attributes, or placed in a separate hashref
318 (C<\%extra_attributes>) as shown above.
319
320 Every time C<connect_info> is invoked, any previous settings for
321 these options will be cleared before setting the new ones, regardless of
322 whether any options are specified in the new C<connect_info>.
323
324
325 =over
326
327 =item on_connect_do
328
329 Specifies things to do immediately after connecting or re-connecting to
330 the database.  Its value may contain:
331
332 =over
333
334 =item a scalar
335
336 This contains one SQL statement to execute.
337
338 =item an array reference
339
340 This contains SQL statements to execute in order.  Each element contains
341 a string or a code reference that returns a string.
342
343 =item a code reference
344
345 This contains some code to execute.  Unlike code references within an
346 array reference, its return value is ignored.
347
348 =back
349
350 =item on_disconnect_do
351
352 Takes arguments in the same form as L</on_connect_do> and executes them
353 immediately before disconnecting from the database.
354
355 Note, this only runs if you explicitly call L</disconnect> on the
356 storage object.
357
358 =item on_connect_call
359
360 A more generalized form of L</on_connect_do> that calls the specified
361 C<connect_call_METHOD> methods in your storage driver.
362
363   on_connect_do => 'select 1'
364
365 is equivalent to:
366
367   on_connect_call => [ [ do_sql => 'select 1' ] ]
368
369 Its values may contain:
370
371 =over
372
373 =item a scalar
374
375 Will call the C<connect_call_METHOD> method.
376
377 =item a code reference
378
379 Will execute C<< $code->($storage) >>
380
381 =item an array reference
382
383 Each value can be a method name or code reference.
384
385 =item an array of arrays
386
387 For each array, the first item is taken to be the C<connect_call_> method name
388 or code reference, and the rest are parameters to it.
389
390 =back
391
392 Some predefined storage methods you may use:
393
394 =over
395
396 =item do_sql
397
398 Executes a SQL string or a code reference that returns a SQL string. This is
399 what L</on_connect_do> and L</on_disconnect_do> use.
400
401 It can take:
402
403 =over
404
405 =item a scalar
406
407 Will execute the scalar as SQL.
408
409 =item an arrayref
410
411 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
412 attributes hashref and bind values.
413
414 =item a code reference
415
416 Will execute C<< $code->($storage) >> and execute the return array refs as
417 above.
418
419 =back
420
421 =item datetime_setup
422
423 Execute any statements necessary to initialize the database session to return
424 and accept datetime/timestamp values used with
425 L<DBIx::Class::InflateColumn::DateTime>.
426
427 Only necessary for some databases, see your specific storage driver for
428 implementation details.
429
430 =back
431
432 =item on_disconnect_call
433
434 Takes arguments in the same form as L</on_connect_call> and executes them
435 immediately before disconnecting from the database.
436
437 Calls the C<disconnect_call_METHOD> methods as opposed to the
438 C<connect_call_METHOD> methods called by L</on_connect_call>.
439
440 Note, this only runs if you explicitly call L</disconnect> on the
441 storage object.
442
443 =item disable_sth_caching
444
445 If set to a true value, this option will disable the caching of
446 statement handles via L<DBI/prepare_cached>.
447
448 =item limit_dialect
449
450 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
451 default L</sql_limit_dialect> setting of the storage (if any). For a list
452 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
453
454 =item quote_char
455
456 Specifies what characters to use to quote table and column names.
457
458 C<quote_char> expects either a single character, in which case is it
459 is placed on either side of the table/column name, or an arrayref of length
460 2 in which case the table/column name is placed between the elements.
461
462 For example under MySQL you should use C<< quote_char => '`' >>, and for
463 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
464
465 =item name_sep
466
467 This parameter is only useful in conjunction with C<quote_char>, and is used to
468 specify the character that separates elements (schemas, tables, columns) from
469 each other. If unspecified it defaults to the most commonly used C<.>.
470
471 =item unsafe
472
473 This Storage driver normally installs its own C<HandleError>, sets
474 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
475 all database handles, including those supplied by a coderef.  It does this
476 so that it can have consistent and useful error behavior.
477
478 If you set this option to a true value, Storage will not do its usual
479 modifications to the database handle's attributes, and instead relies on
480 the settings in your connect_info DBI options (or the values you set in
481 your connection coderef, in the case that you are connecting via coderef).
482
483 Note that your custom settings can cause Storage to malfunction,
484 especially if you set a C<HandleError> handler that suppresses exceptions
485 and/or disable C<RaiseError>.
486
487 =item auto_savepoint
488
489 If this option is true, L<DBIx::Class> will use savepoints when nesting
490 transactions, making it possible to recover from failure in the inner
491 transaction without having to abort all outer transactions.
492
493 =item cursor_class
494
495 Use this argument to supply a cursor class other than the default
496 L<DBIx::Class::Storage::DBI::Cursor>.
497
498 =back
499
500 Some real-life examples of arguments to L</connect_info> and
501 L<DBIx::Class::Schema/connect>
502
503   # Simple SQLite connection
504   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
505
506   # Connect via subref
507   ->connect_info([ sub { DBI->connect(...) } ]);
508
509   # Connect via subref in hashref
510   ->connect_info([{
511     dbh_maker => sub { DBI->connect(...) },
512     on_connect_do => 'alter session ...',
513   }]);
514
515   # A bit more complicated
516   ->connect_info(
517     [
518       'dbi:Pg:dbname=foo',
519       'postgres',
520       'my_pg_password',
521       { AutoCommit => 1 },
522       { quote_char => q{"} },
523     ]
524   );
525
526   # Equivalent to the previous example
527   ->connect_info(
528     [
529       'dbi:Pg:dbname=foo',
530       'postgres',
531       'my_pg_password',
532       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
533     ]
534   );
535
536   # Same, but with hashref as argument
537   # See parse_connect_info for explanation
538   ->connect_info(
539     [{
540       dsn         => 'dbi:Pg:dbname=foo',
541       user        => 'postgres',
542       password    => 'my_pg_password',
543       AutoCommit  => 1,
544       quote_char  => q{"},
545       name_sep    => q{.},
546     }]
547   );
548
549   # Subref + DBIx::Class-specific connection options
550   ->connect_info(
551     [
552       sub { DBI->connect(...) },
553       {
554           quote_char => q{`},
555           name_sep => q{@},
556           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
557           disable_sth_caching => 1,
558       },
559     ]
560   );
561
562
563
564 =cut
565
566 sub connect_info {
567   my ($self, $info) = @_;
568
569   return $self->_connect_info if !$info;
570
571   $self->_connect_info($info); # copy for _connect_info
572
573   $info = $self->_normalize_connect_info($info)
574     if ref $info eq 'ARRAY';
575
576   for my $storage_opt (keys %{ $info->{storage_options} }) {
577     my $value = $info->{storage_options}{$storage_opt};
578
579     $self->$storage_opt($value);
580   }
581
582   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
583   #  the new set of options
584   $self->_sql_maker(undef);
585   $self->_sql_maker_opts({});
586
587   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
588     my $value = $info->{sql_maker_options}{$sql_maker_opt};
589
590     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
591   }
592
593   my %attrs = (
594     %{ $self->_default_dbi_connect_attributes || {} },
595     %{ $info->{attributes} || {} },
596   );
597
598   my @args = @{ $info->{arguments} };
599
600   $self->_dbi_connect_info([@args,
601     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
602
603   # FIXME - dirty:
604   # save attributes them in a separate accessor so they are always
605   # introspectable, even in case of a CODE $dbhmaker
606   $self->_dbic_connect_attributes (\%attrs);
607
608   return $self->_connect_info;
609 }
610
611 sub _normalize_connect_info {
612   my ($self, $info_arg) = @_;
613   my %info;
614
615   my @args = @$info_arg;  # take a shallow copy for further mutilation
616
617   # combine/pre-parse arguments depending on invocation style
618
619   my %attrs;
620   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
621     %attrs = %{ $args[1] || {} };
622     @args = $args[0];
623   }
624   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
625     %attrs = %{$args[0]};
626     @args = ();
627     if (my $code = delete $attrs{dbh_maker}) {
628       @args = $code;
629
630       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
631       if (@ignored) {
632         carp sprintf (
633             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
634           . "to the result of 'dbh_maker'",
635
636           join (', ', map { "'$_'" } (@ignored) ),
637         );
638       }
639     }
640     else {
641       @args = delete @attrs{qw/dsn user password/};
642     }
643   }
644   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
645     %attrs = (
646       % { $args[3] || {} },
647       % { $args[4] || {} },
648     );
649     @args = @args[0,1,2];
650   }
651
652   $info{arguments} = \@args;
653
654   my @storage_opts = grep exists $attrs{$_},
655     @storage_options, 'cursor_class';
656
657   @{ $info{storage_options} }{@storage_opts} =
658     delete @attrs{@storage_opts} if @storage_opts;
659
660   my @sql_maker_opts = grep exists $attrs{$_},
661     qw/limit_dialect quote_char name_sep/;
662
663   @{ $info{sql_maker_options} }{@sql_maker_opts} =
664     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
665
666   $info{attributes} = \%attrs if %attrs;
667
668   return \%info;
669 }
670
671 sub _default_dbi_connect_attributes {
672   return {
673     AutoCommit => 1,
674     RaiseError => 1,
675     PrintError => 0,
676   };
677 }
678
679 =head2 on_connect_do
680
681 This method is deprecated in favour of setting via L</connect_info>.
682
683 =cut
684
685 =head2 on_disconnect_do
686
687 This method is deprecated in favour of setting via L</connect_info>.
688
689 =cut
690
691 sub _parse_connect_do {
692   my ($self, $type) = @_;
693
694   my $val = $self->$type;
695   return () if not defined $val;
696
697   my @res;
698
699   if (not ref($val)) {
700     push @res, [ 'do_sql', $val ];
701   } elsif (ref($val) eq 'CODE') {
702     push @res, $val;
703   } elsif (ref($val) eq 'ARRAY') {
704     push @res, map { [ 'do_sql', $_ ] } @$val;
705   } else {
706     $self->throw_exception("Invalid type for $type: ".ref($val));
707   }
708
709   return \@res;
710 }
711
712 =head2 dbh_do
713
714 Arguments: ($subref | $method_name), @extra_coderef_args?
715
716 Execute the given $subref or $method_name using the new exception-based
717 connection management.
718
719 The first two arguments will be the storage object that C<dbh_do> was called
720 on and a database handle to use.  Any additional arguments will be passed
721 verbatim to the called subref as arguments 2 and onwards.
722
723 Using this (instead of $self->_dbh or $self->dbh) ensures correct
724 exception handling and reconnection (or failover in future subclasses).
725
726 Your subref should have no side-effects outside of the database, as
727 there is the potential for your subref to be partially double-executed
728 if the database connection was stale/dysfunctional.
729
730 Example:
731
732   my @stuff = $schema->storage->dbh_do(
733     sub {
734       my ($storage, $dbh, @cols) = @_;
735       my $cols = join(q{, }, @cols);
736       $dbh->selectrow_array("SELECT $cols FROM foo");
737     },
738     @column_list
739   );
740
741 =cut
742
743 sub dbh_do {
744   my $self = shift;
745   my $code = shift;
746
747   my $dbh = $self->_get_dbh;
748
749   return $self->$code($dbh, @_)
750     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
751
752   local $self->{_in_dbh_do} = 1;
753
754   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
755   my $args = \@_;
756   return try {
757     $self->$code ($dbh, @$args);
758   } catch {
759     $self->throw_exception($_) if $self->connected;
760
761     # We were not connected - reconnect and retry, but let any
762     #  exception fall right through this time
763     carp "Retrying $code after catching disconnected exception: $_"
764       if $ENV{DBIC_DBIRETRY_DEBUG};
765
766     $self->_populate_dbh;
767     $self->$code($self->_dbh, @$args);
768   };
769 }
770
771 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
772 # It also informs dbh_do to bypass itself while under the direction of txn_do,
773 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
774 sub txn_do {
775   my $self = shift;
776   my $coderef = shift;
777
778   ref $coderef eq 'CODE' or $self->throw_exception
779     ('$coderef must be a CODE reference');
780
781   local $self->{_in_dbh_do} = 1;
782
783   my @result;
784   my $want_array = wantarray;
785
786   my $tried = 0;
787   while(1) {
788     my $exception;
789
790     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
791     my $args = \@_;
792
793     try {
794       $self->txn_begin;
795       my $txn_start_depth = $self->transaction_depth;
796       if($want_array) {
797           @result = $coderef->(@$args);
798       }
799       elsif(defined $want_array) {
800           $result[0] = $coderef->(@$args);
801       }
802       else {
803           $coderef->(@$args);
804       }
805
806       my $delta_txn = $txn_start_depth - $self->transaction_depth;
807       if ($delta_txn == 0) {
808         $self->txn_commit;
809       }
810       elsif ($delta_txn != 1) {
811         # an off-by-one would mean we fired a rollback
812         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
813       }
814     } catch {
815       $exception = $_;
816     };
817
818     if(! defined $exception) { return $want_array ? @result : $result[0] }
819
820     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
821       my $rollback_exception;
822       try { $self->txn_rollback } catch { $rollback_exception = shift };
823       if(defined $rollback_exception) {
824         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
825         $self->throw_exception($exception)  # propagate nested rollback
826           if $rollback_exception =~ /$exception_class/;
827
828         $self->throw_exception(
829           "Transaction aborted: ${exception}. "
830           . "Rollback failed: ${rollback_exception}"
831         );
832       }
833       $self->throw_exception($exception)
834     }
835
836     # We were not connected, and was first try - reconnect and retry
837     # via the while loop
838     carp "Retrying $coderef after catching disconnected exception: $exception"
839       if $ENV{DBIC_TXNRETRY_DEBUG};
840     $self->_populate_dbh;
841   }
842 }
843
844 =head2 disconnect
845
846 Our C<disconnect> method also performs a rollback first if the
847 database is not in C<AutoCommit> mode.
848
849 =cut
850
851 sub disconnect {
852   my ($self) = @_;
853
854   if( $self->_dbh ) {
855     my @actions;
856
857     push @actions, ( $self->on_disconnect_call || () );
858     push @actions, $self->_parse_connect_do ('on_disconnect_do');
859
860     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
861
862     $self->_dbh_rollback unless $self->_dbh_autocommit;
863
864     %{ $self->_dbh->{CachedKids} } = ();
865     $self->_dbh->disconnect;
866     $self->_dbh(undef);
867     $self->{_dbh_gen}++;
868   }
869 }
870
871 =head2 with_deferred_fk_checks
872
873 =over 4
874
875 =item Arguments: C<$coderef>
876
877 =item Return Value: The return value of $coderef
878
879 =back
880
881 Storage specific method to run the code ref with FK checks deferred or
882 in MySQL's case disabled entirely.
883
884 =cut
885
886 # Storage subclasses should override this
887 sub with_deferred_fk_checks {
888   my ($self, $sub) = @_;
889   $sub->();
890 }
891
892 =head2 connected
893
894 =over
895
896 =item Arguments: none
897
898 =item Return Value: 1|0
899
900 =back
901
902 Verifies that the current database handle is active and ready to execute
903 an SQL statement (e.g. the connection did not get stale, server is still
904 answering, etc.) This method is used internally by L</dbh>.
905
906 =cut
907
908 sub connected {
909   my $self = shift;
910   return 0 unless $self->_seems_connected;
911
912   #be on the safe side
913   local $self->_dbh->{RaiseError} = 1;
914
915   return $self->_ping;
916 }
917
918 sub _seems_connected {
919   my $self = shift;
920
921   $self->_preserve_foreign_dbh;
922
923   my $dbh = $self->_dbh
924     or return 0;
925
926   return $dbh->FETCH('Active');
927 }
928
929 sub _ping {
930   my $self = shift;
931
932   my $dbh = $self->_dbh or return 0;
933
934   return $dbh->ping;
935 }
936
937 sub ensure_connected {
938   my ($self) = @_;
939
940   unless ($self->connected) {
941     $self->_populate_dbh;
942   }
943 }
944
945 =head2 dbh
946
947 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
948 is guaranteed to be healthy by implicitly calling L</connected>, and if
949 necessary performing a reconnection before returning. Keep in mind that this
950 is very B<expensive> on some database engines. Consider using L</dbh_do>
951 instead.
952
953 =cut
954
955 sub dbh {
956   my ($self) = @_;
957
958   if (not $self->_dbh) {
959     $self->_populate_dbh;
960   } else {
961     $self->ensure_connected;
962   }
963   return $self->_dbh;
964 }
965
966 # this is the internal "get dbh or connect (don't check)" method
967 sub _get_dbh {
968   my $self = shift;
969   $self->_preserve_foreign_dbh;
970   $self->_populate_dbh unless $self->_dbh;
971   return $self->_dbh;
972 }
973
974 sub sql_maker {
975   my ($self) = @_;
976   unless ($self->_sql_maker) {
977     my $sql_maker_class = $self->sql_maker_class;
978     $self->ensure_class_loaded ($sql_maker_class);
979
980     my %opts = %{$self->_sql_maker_opts||{}};
981     my $dialect =
982       $opts{limit_dialect}
983         ||
984       $self->sql_limit_dialect
985         ||
986       do {
987         my $s_class = (ref $self) || $self;
988         carp (
989           "Your storage class ($s_class) does not set sql_limit_dialect and you "
990         . 'have not supplied an explicit limit_dialect in your connection_info. '
991         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
992         . 'databases but can be (and often is) painfully slow.'
993         );
994
995         'GenericSubQ';
996       }
997     ;
998
999     $self->_sql_maker($sql_maker_class->new(
1000       bindtype=>'columns',
1001       array_datatypes => 1,
1002       limit_dialect => $dialect,
1003       name_sep => '.',
1004       %opts,
1005     ));
1006   }
1007   return $self->_sql_maker;
1008 }
1009
1010 # nothing to do by default
1011 sub _rebless {}
1012 sub _init {}
1013
1014 sub _populate_dbh {
1015   my ($self) = @_;
1016
1017   my @info = @{$self->_dbi_connect_info || []};
1018   $self->_dbh(undef); # in case ->connected failed we might get sent here
1019   $self->_dbh_details({}); # reset everything we know
1020
1021   $self->_dbh($self->_connect(@info));
1022
1023   $self->_conn_pid($$);
1024   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1025
1026   $self->_determine_driver;
1027
1028   # Always set the transaction depth on connect, since
1029   #  there is no transaction in progress by definition
1030   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1031
1032   $self->_run_connection_actions unless $self->{_in_determine_driver};
1033 }
1034
1035 sub _run_connection_actions {
1036   my $self = shift;
1037   my @actions;
1038
1039   push @actions, ( $self->on_connect_call || () );
1040   push @actions, $self->_parse_connect_do ('on_connect_do');
1041
1042   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1043 }
1044
1045
1046
1047 sub set_use_dbms_capability {
1048   $_[0]->set_inherited ($_[1], $_[2]);
1049 }
1050
1051 sub get_use_dbms_capability {
1052   my ($self, $capname) = @_;
1053
1054   my $use = $self->get_inherited ($capname);
1055   return defined $use
1056     ? $use
1057     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1058   ;
1059 }
1060
1061 sub set_dbms_capability {
1062   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1063 }
1064
1065 sub get_dbms_capability {
1066   my ($self, $capname) = @_;
1067
1068   my $cap = $self->_dbh_details->{capability}{$capname};
1069
1070   unless (defined $cap) {
1071     if (my $meth = $self->can ("_determine$capname")) {
1072       $cap = $self->$meth ? 1 : 0;
1073     }
1074     else {
1075       $cap = 0;
1076     }
1077
1078     $self->set_dbms_capability ($capname, $cap);
1079   }
1080
1081   return $cap;
1082 }
1083
1084 sub _server_info {
1085   my $self = shift;
1086
1087   my $info;
1088   unless ($info = $self->_dbh_details->{info}) {
1089
1090     $info = {};
1091
1092     my $server_version = try { $self->_get_server_version };
1093
1094     if (defined $server_version) {
1095       $info->{dbms_version} = $server_version;
1096
1097       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1098       my @verparts = split (/\./, $numeric_version);
1099       if (
1100         @verparts
1101           &&
1102         $verparts[0] <= 999
1103       ) {
1104         # consider only up to 3 version parts, iff not more than 3 digits
1105         my @use_parts;
1106         while (@verparts && @use_parts < 3) {
1107           my $p = shift @verparts;
1108           last if $p > 999;
1109           push @use_parts, $p;
1110         }
1111         push @use_parts, 0 while @use_parts < 3;
1112
1113         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1114       }
1115     }
1116
1117     $self->_dbh_details->{info} = $info;
1118   }
1119
1120   return $info;
1121 }
1122
1123 sub _get_server_version {
1124   shift->_get_dbh->get_info(18);
1125 }
1126
1127 sub _determine_driver {
1128   my ($self) = @_;
1129
1130   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1131     my $started_connected = 0;
1132     local $self->{_in_determine_driver} = 1;
1133
1134     if (ref($self) eq __PACKAGE__) {
1135       my $driver;
1136       if ($self->_dbh) { # we are connected
1137         $driver = $self->_dbh->{Driver}{Name};
1138         $started_connected = 1;
1139       } else {
1140         # if connect_info is a CODEREF, we have no choice but to connect
1141         if (ref $self->_dbi_connect_info->[0] &&
1142             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1143           $self->_populate_dbh;
1144           $driver = $self->_dbh->{Driver}{Name};
1145         }
1146         else {
1147           # try to use dsn to not require being connected, the driver may still
1148           # force a connection in _rebless to determine version
1149           # (dsn may not be supplied at all if all we do is make a mock-schema)
1150           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1151           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1152           $driver ||= $ENV{DBI_DRIVER};
1153         }
1154       }
1155
1156       if ($driver) {
1157         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1158         if ($self->load_optional_class($storage_class)) {
1159           mro::set_mro($storage_class, 'c3');
1160           bless $self, $storage_class;
1161           $self->_rebless();
1162         }
1163       }
1164     }
1165
1166     $self->_driver_determined(1);
1167
1168     $self->_init; # run driver-specific initializations
1169
1170     $self->_run_connection_actions
1171         if !$started_connected && defined $self->_dbh;
1172   }
1173 }
1174
1175 sub _do_connection_actions {
1176   my $self          = shift;
1177   my $method_prefix = shift;
1178   my $call          = shift;
1179
1180   if (not ref($call)) {
1181     my $method = $method_prefix . $call;
1182     $self->$method(@_);
1183   } elsif (ref($call) eq 'CODE') {
1184     $self->$call(@_);
1185   } elsif (ref($call) eq 'ARRAY') {
1186     if (ref($call->[0]) ne 'ARRAY') {
1187       $self->_do_connection_actions($method_prefix, $_) for @$call;
1188     } else {
1189       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1190     }
1191   } else {
1192     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1193   }
1194
1195   return $self;
1196 }
1197
1198 sub connect_call_do_sql {
1199   my $self = shift;
1200   $self->_do_query(@_);
1201 }
1202
1203 sub disconnect_call_do_sql {
1204   my $self = shift;
1205   $self->_do_query(@_);
1206 }
1207
1208 # override in db-specific backend when necessary
1209 sub connect_call_datetime_setup { 1 }
1210
1211 sub _do_query {
1212   my ($self, $action) = @_;
1213
1214   if (ref $action eq 'CODE') {
1215     $action = $action->($self);
1216     $self->_do_query($_) foreach @$action;
1217   }
1218   else {
1219     # Most debuggers expect ($sql, @bind), so we need to exclude
1220     # the attribute hash which is the second argument to $dbh->do
1221     # furthermore the bind values are usually to be presented
1222     # as named arrayref pairs, so wrap those here too
1223     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1224     my $sql = shift @do_args;
1225     my $attrs = shift @do_args;
1226     my @bind = map { [ undef, $_ ] } @do_args;
1227
1228     $self->_query_start($sql, @bind);
1229     $self->_get_dbh->do($sql, $attrs, @do_args);
1230     $self->_query_end($sql, @bind);
1231   }
1232
1233   return $self;
1234 }
1235
1236 sub _connect {
1237   my ($self, @info) = @_;
1238
1239   $self->throw_exception("You failed to provide any connection info")
1240     if !@info;
1241
1242   my ($old_connect_via, $dbh);
1243
1244   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1245     $old_connect_via = $DBI::connect_via;
1246     $DBI::connect_via = 'connect';
1247   }
1248
1249   try {
1250     if(ref $info[0] eq 'CODE') {
1251        $dbh = $info[0]->();
1252     }
1253     else {
1254        $dbh = DBI->connect(@info);
1255     }
1256
1257     if (!$dbh) {
1258       die $DBI::errstr;
1259     }
1260
1261     unless ($self->unsafe) {
1262
1263       # this odd anonymous coderef dereference is in fact really
1264       # necessary to avoid the unwanted effect described in perl5
1265       # RT#75792
1266       sub {
1267         my $weak_self = $_[0];
1268         weaken $weak_self;
1269
1270         $_[1]->{HandleError} = sub {
1271           if ($weak_self) {
1272             $weak_self->throw_exception("DBI Exception: $_[0]");
1273           }
1274           else {
1275             # the handler may be invoked by something totally out of
1276             # the scope of DBIC
1277             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1278           }
1279         };
1280       }->($self, $dbh);
1281
1282       $dbh->{ShowErrorStatement} = 1;
1283       $dbh->{RaiseError} = 1;
1284       $dbh->{PrintError} = 0;
1285     }
1286   }
1287   catch {
1288     $self->throw_exception("DBI Connection failed: $_")
1289   }
1290   finally {
1291     $DBI::connect_via = $old_connect_via if $old_connect_via;
1292   };
1293
1294   $self->_dbh_autocommit($dbh->{AutoCommit});
1295   $dbh;
1296 }
1297
1298 sub svp_begin {
1299   my ($self, $name) = @_;
1300
1301   $name = $self->_svp_generate_name
1302     unless defined $name;
1303
1304   $self->throw_exception ("You can't use savepoints outside a transaction")
1305     if $self->{transaction_depth} == 0;
1306
1307   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1308     unless $self->can('_svp_begin');
1309
1310   push @{ $self->{savepoints} }, $name;
1311
1312   $self->debugobj->svp_begin($name) if $self->debug;
1313
1314   return $self->_svp_begin($name);
1315 }
1316
1317 sub svp_release {
1318   my ($self, $name) = @_;
1319
1320   $self->throw_exception ("You can't use savepoints outside a transaction")
1321     if $self->{transaction_depth} == 0;
1322
1323   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1324     unless $self->can('_svp_release');
1325
1326   if (defined $name) {
1327     $self->throw_exception ("Savepoint '$name' does not exist")
1328       unless grep { $_ eq $name } @{ $self->{savepoints} };
1329
1330     # Dig through the stack until we find the one we are releasing.  This keeps
1331     # the stack up to date.
1332     my $svp;
1333
1334     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1335   } else {
1336     $name = pop @{ $self->{savepoints} };
1337   }
1338
1339   $self->debugobj->svp_release($name) if $self->debug;
1340
1341   return $self->_svp_release($name);
1342 }
1343
1344 sub svp_rollback {
1345   my ($self, $name) = @_;
1346
1347   $self->throw_exception ("You can't use savepoints outside a transaction")
1348     if $self->{transaction_depth} == 0;
1349
1350   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1351     unless $self->can('_svp_rollback');
1352
1353   if (defined $name) {
1354       # If they passed us a name, verify that it exists in the stack
1355       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1356           $self->throw_exception("Savepoint '$name' does not exist!");
1357       }
1358
1359       # Dig through the stack until we find the one we are releasing.  This keeps
1360       # the stack up to date.
1361       while(my $s = pop(@{ $self->{savepoints} })) {
1362           last if($s eq $name);
1363       }
1364       # Add the savepoint back to the stack, as a rollback doesn't remove the
1365       # named savepoint, only everything after it.
1366       push(@{ $self->{savepoints} }, $name);
1367   } else {
1368       # We'll assume they want to rollback to the last savepoint
1369       $name = $self->{savepoints}->[-1];
1370   }
1371
1372   $self->debugobj->svp_rollback($name) if $self->debug;
1373
1374   return $self->_svp_rollback($name);
1375 }
1376
1377 sub _svp_generate_name {
1378   my ($self) = @_;
1379   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1380 }
1381
1382 sub txn_begin {
1383   my $self = shift;
1384
1385   # this means we have not yet connected and do not know the AC status
1386   # (e.g. coderef $dbh)
1387   if (! defined $self->_dbh_autocommit) {
1388     $self->ensure_connected;
1389   }
1390   # otherwise re-connect on pid changes, so
1391   # that the txn_depth is adjusted properly
1392   # the lightweight _get_dbh is good enoug here
1393   # (only superficial handle check, no pings)
1394   else {
1395     $self->_get_dbh;
1396   }
1397
1398   if($self->transaction_depth == 0) {
1399     $self->debugobj->txn_begin()
1400       if $self->debug;
1401     $self->_dbh_begin_work;
1402   }
1403   elsif ($self->auto_savepoint) {
1404     $self->svp_begin;
1405   }
1406   $self->{transaction_depth}++;
1407 }
1408
1409 sub _dbh_begin_work {
1410   my $self = shift;
1411
1412   # if the user is utilizing txn_do - good for him, otherwise we need to
1413   # ensure that the $dbh is healthy on BEGIN.
1414   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1415   # will be replaced by a failure of begin_work itself (which will be
1416   # then retried on reconnect)
1417   if ($self->{_in_dbh_do}) {
1418     $self->_dbh->begin_work;
1419   } else {
1420     $self->dbh_do(sub { $_[1]->begin_work });
1421   }
1422 }
1423
1424 sub txn_commit {
1425   my $self = shift;
1426   if ($self->{transaction_depth} == 1) {
1427     $self->debugobj->txn_commit()
1428       if ($self->debug);
1429     $self->_dbh_commit;
1430     $self->{transaction_depth} = 0
1431       if $self->_dbh_autocommit;
1432   }
1433   elsif($self->{transaction_depth} > 1) {
1434     $self->{transaction_depth}--;
1435     $self->svp_release
1436       if $self->auto_savepoint;
1437   }
1438   else {
1439     $self->throw_exception( 'Refusing to commit without a started transaction' );
1440   }
1441 }
1442
1443 sub _dbh_commit {
1444   my $self = shift;
1445   my $dbh  = $self->_dbh
1446     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1447   $dbh->commit;
1448 }
1449
1450 sub txn_rollback {
1451   my $self = shift;
1452   my $dbh = $self->_dbh;
1453   try {
1454     if ($self->{transaction_depth} == 1) {
1455       $self->debugobj->txn_rollback()
1456         if ($self->debug);
1457       $self->{transaction_depth} = 0
1458         if $self->_dbh_autocommit;
1459       $self->_dbh_rollback;
1460     }
1461     elsif($self->{transaction_depth} > 1) {
1462       $self->{transaction_depth}--;
1463       if ($self->auto_savepoint) {
1464         $self->svp_rollback;
1465         $self->svp_release;
1466       }
1467     }
1468     else {
1469       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1470     }
1471   }
1472   catch {
1473     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1474
1475     if ($_ !~ /$exception_class/) {
1476       # ensure that a failed rollback resets the transaction depth
1477       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1478     }
1479
1480     $self->throw_exception($_)
1481   };
1482 }
1483
1484 sub _dbh_rollback {
1485   my $self = shift;
1486   my $dbh  = $self->_dbh
1487     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1488   $dbh->rollback;
1489 }
1490
1491 # This used to be the top-half of _execute.  It was split out to make it
1492 #  easier to override in NoBindVars without duping the rest.  It takes up
1493 #  all of _execute's args, and emits $sql, @bind.
1494 sub _prep_for_execute {
1495   my ($self, $op, $extra_bind, $ident, $args) = @_;
1496
1497   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1498     $ident = $ident->from();
1499   }
1500
1501   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1502
1503   unshift(@bind,
1504     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1505       if $extra_bind;
1506   return ($sql, \@bind);
1507 }
1508
1509
1510 sub _fix_bind_params {
1511     my ($self, @bind) = @_;
1512
1513     ### Turn @bind from something like this:
1514     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1515     ### to this:
1516     ###   ( "'1'", "'1'", "'3'" )
1517     return
1518         map {
1519             if ( defined( $_ && $_->[1] ) ) {
1520                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1521             }
1522             else { q{'NULL'}; }
1523         } @bind;
1524 }
1525
1526 sub _query_start {
1527     my ( $self, $sql, @bind ) = @_;
1528
1529     if ( $self->debug ) {
1530         @bind = $self->_fix_bind_params(@bind);
1531
1532         $self->debugobj->query_start( $sql, @bind );
1533     }
1534 }
1535
1536 sub _query_end {
1537     my ( $self, $sql, @bind ) = @_;
1538
1539     if ( $self->debug ) {
1540         @bind = $self->_fix_bind_params(@bind);
1541         $self->debugobj->query_end( $sql, @bind );
1542     }
1543 }
1544
1545 sub _dbh_execute {
1546   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1547
1548   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1549
1550   $self->_query_start( $sql, @$bind );
1551
1552   my $sth = $self->sth($sql,$op);
1553
1554   my $placeholder_index = 1;
1555
1556   foreach my $bound (@$bind) {
1557     my $attributes = {};
1558     my($column_name, @data) = @$bound;
1559
1560     if ($bind_attributes) {
1561       $attributes = $bind_attributes->{$column_name}
1562       if defined $bind_attributes->{$column_name};
1563     }
1564
1565     foreach my $data (@data) {
1566       my $ref = ref $data;
1567       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1568
1569       $sth->bind_param($placeholder_index, $data, $attributes);
1570       $placeholder_index++;
1571     }
1572   }
1573
1574   # Can this fail without throwing an exception anyways???
1575   my $rv = $sth->execute();
1576   $self->throw_exception(
1577     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1578   ) if !$rv;
1579
1580   $self->_query_end( $sql, @$bind );
1581
1582   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1583 }
1584
1585 sub _execute {
1586     my $self = shift;
1587     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1588 }
1589
1590 sub _prefetch_insert_auto_nextvals {
1591   my ($self, $source, $to_insert) = @_;
1592
1593   my $upd = {};
1594
1595   foreach my $col ( $source->columns ) {
1596     if ( !defined $to_insert->{$col} ) {
1597       my $col_info = $source->column_info($col);
1598
1599       if ( $col_info->{auto_nextval} ) {
1600         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1601           'nextval',
1602           $col_info->{sequence} ||=
1603             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1604         );
1605       }
1606     }
1607   }
1608
1609   return $upd;
1610 }
1611
1612 sub insert {
1613   my $self = shift;
1614   my ($source, $to_insert, $opts) = @_;
1615
1616   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1617
1618   my $bind_attributes = $self->source_bind_attributes($source);
1619
1620   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1621
1622   if ($opts->{returning}) {
1623     my @ret_cols = @{$opts->{returning}};
1624
1625     my @ret_vals = try {
1626       local $SIG{__WARN__} = sub {};
1627       my @r = $sth->fetchrow_array;
1628       $sth->finish;
1629       @r;
1630     };
1631
1632     my %ret;
1633     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1634
1635     $updated_cols = {
1636       %$updated_cols,
1637       %ret,
1638     };
1639   }
1640
1641   return $updated_cols;
1642 }
1643
1644 ## Currently it is assumed that all values passed will be "normal", i.e. not
1645 ## scalar refs, or at least, all the same type as the first set, the statement is
1646 ## only prepped once.
1647 sub insert_bulk {
1648   my ($self, $source, $cols, $data) = @_;
1649
1650   my %colvalues;
1651   @colvalues{@$cols} = (0..$#$cols);
1652
1653   for my $i (0..$#$cols) {
1654     my $first_val = $data->[0][$i];
1655     next unless ref $first_val eq 'SCALAR';
1656
1657     $colvalues{ $cols->[$i] } = $first_val;
1658   }
1659
1660   # check for bad data and stringify stringifiable objects
1661   my $bad_slice = sub {
1662     my ($msg, $col_idx, $slice_idx) = @_;
1663     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1664       $msg,
1665       $cols->[$col_idx],
1666       do {
1667         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1668         Dumper {
1669           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1670         },
1671       }
1672     );
1673   };
1674
1675   for my $datum_idx (0..$#$data) {
1676     my $datum = $data->[$datum_idx];
1677
1678     for my $col_idx (0..$#$cols) {
1679       my $val            = $datum->[$col_idx];
1680       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1681       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1682
1683       if ($is_literal_sql) {
1684         if (not ref $val) {
1685           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1686         }
1687         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1688           $bad_slice->("$reftype reference found where literal SQL expected",
1689             $col_idx, $datum_idx);
1690         }
1691         elsif ($$val ne $$sqla_bind){
1692           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1693             $col_idx, $datum_idx);
1694         }
1695       }
1696       elsif (my $reftype = ref $val) {
1697         require overload;
1698         if (overload::Method($val, '""')) {
1699           $datum->[$col_idx] = "".$val;
1700         }
1701         else {
1702           $bad_slice->("$reftype reference found where bind expected",
1703             $col_idx, $datum_idx);
1704         }
1705       }
1706     }
1707   }
1708
1709   my ($sql, $bind) = $self->_prep_for_execute (
1710     'insert', undef, $source, [\%colvalues]
1711   );
1712   my @bind = @$bind;
1713
1714   my $empty_bind = 1 if (not @bind) &&
1715     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1716
1717   if ((not @bind) && (not $empty_bind)) {
1718     $self->throw_exception(
1719       'Cannot insert_bulk without support for placeholders'
1720     );
1721   }
1722
1723   # neither _execute_array, nor _execute_inserts_with_no_binds are
1724   # atomic (even if _execute _array is a single call). Thus a safety
1725   # scope guard
1726   my $guard = $self->txn_scope_guard;
1727
1728   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1729   my $sth = $self->sth($sql);
1730   my $rv = do {
1731     if ($empty_bind) {
1732       # bind_param_array doesn't work if there are no binds
1733       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1734     }
1735     else {
1736 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1737       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1738     }
1739   };
1740
1741   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1742
1743   $guard->commit;
1744
1745   return (wantarray ? ($rv, $sth, @bind) : $rv);
1746 }
1747
1748 sub _execute_array {
1749   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1750
1751   ## This must be an arrayref, else nothing works!
1752   my $tuple_status = [];
1753
1754   ## Get the bind_attributes, if any exist
1755   my $bind_attributes = $self->source_bind_attributes($source);
1756
1757   ## Bind the values and execute
1758   my $placeholder_index = 1;
1759
1760   foreach my $bound (@$bind) {
1761
1762     my $attributes = {};
1763     my ($column_name, $data_index) = @$bound;
1764
1765     if( $bind_attributes ) {
1766       $attributes = $bind_attributes->{$column_name}
1767       if defined $bind_attributes->{$column_name};
1768     }
1769
1770     my @data = map { $_->[$data_index] } @$data;
1771
1772     $sth->bind_param_array(
1773       $placeholder_index,
1774       [@data],
1775       (%$attributes ?  $attributes : ()),
1776     );
1777     $placeholder_index++;
1778   }
1779
1780   my ($rv, $err);
1781   try {
1782     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1783   }
1784   catch {
1785     $err = shift;
1786   };
1787
1788   # Statement must finish even if there was an exception.
1789   try {
1790     $sth->finish
1791   }
1792   catch {
1793     $err = shift unless defined $err
1794   };
1795
1796   $err = $sth->errstr
1797     if (! defined $err and $sth->err);
1798
1799   if (defined $err) {
1800     my $i = 0;
1801     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1802
1803     $self->throw_exception("Unexpected populate error: $err")
1804       if ($i > $#$tuple_status);
1805
1806     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1807       ($tuple_status->[$i][1] || $err),
1808       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1809     );
1810   }
1811
1812   return $rv;
1813 }
1814
1815 sub _dbh_execute_array {
1816     my ($self, $sth, $tuple_status, @extra) = @_;
1817
1818     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1819 }
1820
1821 sub _dbh_execute_inserts_with_no_binds {
1822   my ($self, $sth, $count) = @_;
1823
1824   my $err;
1825   try {
1826     my $dbh = $self->_get_dbh;
1827     local $dbh->{RaiseError} = 1;
1828     local $dbh->{PrintError} = 0;
1829
1830     $sth->execute foreach 1..$count;
1831   }
1832   catch {
1833     $err = shift;
1834   }
1835   finally {
1836     # Make sure statement is finished even if there was an exception.
1837     try {
1838       $sth->finish
1839     }
1840     catch {
1841       $err = shift unless defined $err;
1842     };
1843   };
1844
1845   $self->throw_exception($err) if defined $err;
1846
1847   return $count;
1848 }
1849
1850 sub update {
1851   my ($self, $source, @args) = @_;
1852
1853   my $bind_attrs = $self->source_bind_attributes($source);
1854
1855   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1856 }
1857
1858
1859 sub delete {
1860   my ($self, $source, @args) = @_;
1861
1862   my $bind_attrs = $self->source_bind_attributes($source);
1863
1864   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1865 }
1866
1867 # We were sent here because the $rs contains a complex search
1868 # which will require a subquery to select the correct rows
1869 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1870 #
1871 # Generating a single PK column subquery is trivial and supported
1872 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1873 # Look at _multipk_update_delete()
1874 sub _subq_update_delete {
1875   my $self = shift;
1876   my ($rs, $op, $values) = @_;
1877
1878   my $rsrc = $rs->result_source;
1879
1880   # quick check if we got a sane rs on our hands
1881   my @pcols = $rsrc->_pri_cols;
1882
1883   my $sel = $rs->_resolved_attrs->{select};
1884   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1885
1886   if (
1887       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1888         ne
1889       join ("\x00", sort @$sel )
1890   ) {
1891     $self->throw_exception (
1892       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1893     );
1894   }
1895
1896   if (@pcols == 1) {
1897     return $self->$op (
1898       $rsrc,
1899       $op eq 'update' ? $values : (),
1900       { $pcols[0] => { -in => $rs->as_query } },
1901     );
1902   }
1903
1904   else {
1905     return $self->_multipk_update_delete (@_);
1906   }
1907 }
1908
1909 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1910 # resultset update/delete involving subqueries. So by default resort
1911 # to simple (and inefficient) delete_all style per-row opearations,
1912 # while allowing specific storages to override this with a faster
1913 # implementation.
1914 #
1915 sub _multipk_update_delete {
1916   return shift->_per_row_update_delete (@_);
1917 }
1918
1919 # This is the default loop used to delete/update rows for multi PK
1920 # resultsets, and used by mysql exclusively (because it can't do anything
1921 # else).
1922 #
1923 # We do not use $row->$op style queries, because resultset update/delete
1924 # is not expected to cascade (this is what delete_all/update_all is for).
1925 #
1926 # There should be no race conditions as the entire operation is rolled
1927 # in a transaction.
1928 #
1929 sub _per_row_update_delete {
1930   my $self = shift;
1931   my ($rs, $op, $values) = @_;
1932
1933   my $rsrc = $rs->result_source;
1934   my @pcols = $rsrc->_pri_cols;
1935
1936   my $guard = $self->txn_scope_guard;
1937
1938   # emulate the return value of $sth->execute for non-selects
1939   my $row_cnt = '0E0';
1940
1941   my $subrs_cur = $rs->cursor;
1942   my @all_pk = $subrs_cur->all;
1943   for my $pks ( @all_pk) {
1944
1945     my $cond;
1946     for my $i (0.. $#pcols) {
1947       $cond->{$pcols[$i]} = $pks->[$i];
1948     }
1949
1950     $self->$op (
1951       $rsrc,
1952       $op eq 'update' ? $values : (),
1953       $cond,
1954     );
1955
1956     $row_cnt++;
1957   }
1958
1959   $guard->commit;
1960
1961   return $row_cnt;
1962 }
1963
1964 sub _select {
1965   my $self = shift;
1966   $self->_execute($self->_select_args(@_));
1967 }
1968
1969 sub _select_args_to_query {
1970   my $self = shift;
1971
1972   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1973   #  = $self->_select_args($ident, $select, $cond, $attrs);
1974   my ($op, $bind, $ident, $bind_attrs, @args) =
1975     $self->_select_args(@_);
1976
1977   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1978   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1979   $prepared_bind ||= [];
1980
1981   return wantarray
1982     ? ($sql, $prepared_bind, $bind_attrs)
1983     : \[ "($sql)", @$prepared_bind ]
1984   ;
1985 }
1986
1987 sub _select_args {
1988   my ($self, $ident, $select, $where, $attrs) = @_;
1989
1990   my $sql_maker = $self->sql_maker;
1991   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1992
1993   $attrs = {
1994     %$attrs,
1995     select => $select,
1996     from => $ident,
1997     where => $where,
1998     $rs_alias && $alias2source->{$rs_alias}
1999       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
2000       : ()
2001     ,
2002   };
2003
2004   # calculate bind_attrs before possible $ident mangling
2005   my $bind_attrs = {};
2006   for my $alias (keys %$alias2source) {
2007     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2008     for my $col (keys %$bindtypes) {
2009
2010       my $fqcn = join ('.', $alias, $col);
2011       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2012
2013       # Unqialified column names are nice, but at the same time can be
2014       # rather ambiguous. What we do here is basically go along with
2015       # the loop, adding an unqualified column slot to $bind_attrs,
2016       # alongside the fully qualified name. As soon as we encounter
2017       # another column by that name (which would imply another table)
2018       # we unset the unqualified slot and never add any info to it
2019       # to avoid erroneous type binding. If this happens the users
2020       # only choice will be to fully qualify his column name
2021
2022       if (exists $bind_attrs->{$col}) {
2023         $bind_attrs->{$col} = {};
2024       }
2025       else {
2026         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2027       }
2028     }
2029   }
2030
2031   # Sanity check the attributes (SQLMaker does it too, but
2032   # in case of a software_limit we'll never reach there)
2033   if (defined $attrs->{offset}) {
2034     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2035       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2036   }
2037   $attrs->{offset} ||= 0;
2038
2039   if (defined $attrs->{rows}) {
2040     $self->throw_exception("The rows attribute must be a positive integer if present")
2041       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2042   }
2043   elsif ($attrs->{offset}) {
2044     # MySQL actually recommends this approach.  I cringe.
2045     $attrs->{rows} = $sql_maker->__max_int;
2046   }
2047
2048   my @limit;
2049
2050   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2051   # storage, unless software limit was requested
2052   if (
2053     #limited has_many
2054     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2055        ||
2056     # grouped prefetch (to satisfy group_by == select)
2057     ( $attrs->{group_by}
2058         &&
2059       @{$attrs->{group_by}}
2060         &&
2061       $attrs->{_prefetch_select}
2062         &&
2063       @{$attrs->{_prefetch_select}}
2064     )
2065   ) {
2066     ($ident, $select, $where, $attrs)
2067       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2068   }
2069   elsif (! $attrs->{software_limit} ) {
2070     push @limit, $attrs->{rows}, $attrs->{offset};
2071   }
2072
2073   # try to simplify the joinmap further (prune unreferenced type-single joins)
2074   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2075
2076 ###
2077   # This would be the point to deflate anything found in $where
2078   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2079   # expect a row object. And all we have is a resultsource (it is trivial
2080   # to extract deflator coderefs via $alias2source above).
2081   #
2082   # I don't see a way forward other than changing the way deflators are
2083   # invoked, and that's just bad...
2084 ###
2085
2086   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2087 }
2088
2089 # Returns a counting SELECT for a simple count
2090 # query. Abstracted so that a storage could override
2091 # this to { count => 'firstcol' } or whatever makes
2092 # sense as a performance optimization
2093 sub _count_select {
2094   #my ($self, $source, $rs_attrs) = @_;
2095   return { count => '*' };
2096 }
2097
2098
2099 sub source_bind_attributes {
2100   my ($self, $source) = @_;
2101
2102   my $bind_attributes;
2103   foreach my $column ($source->columns) {
2104
2105     my $data_type = $source->column_info($column)->{data_type} || '';
2106     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2107      if $data_type;
2108   }
2109
2110   return $bind_attributes;
2111 }
2112
2113 =head2 select
2114
2115 =over 4
2116
2117 =item Arguments: $ident, $select, $condition, $attrs
2118
2119 =back
2120
2121 Handle a SQL select statement.
2122
2123 =cut
2124
2125 sub select {
2126   my $self = shift;
2127   my ($ident, $select, $condition, $attrs) = @_;
2128   return $self->cursor_class->new($self, \@_, $attrs);
2129 }
2130
2131 sub select_single {
2132   my $self = shift;
2133   my ($rv, $sth, @bind) = $self->_select(@_);
2134   my @row = $sth->fetchrow_array;
2135   my @nextrow = $sth->fetchrow_array if @row;
2136   if(@row && @nextrow) {
2137     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2138   }
2139   # Need to call finish() to work round broken DBDs
2140   $sth->finish();
2141   return @row;
2142 }
2143
2144 =head2 sql_limit_dialect
2145
2146 This is an accessor for the default SQL limit dialect used by a particular
2147 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2148 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2149 see L<DBIx::Class::SQLMaker::LimitDialects>.
2150
2151 =head2 sth
2152
2153 =over 4
2154
2155 =item Arguments: $sql
2156
2157 =back
2158
2159 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2160
2161 =cut
2162
2163 sub _dbh_sth {
2164   my ($self, $dbh, $sql) = @_;
2165
2166   # 3 is the if_active parameter which avoids active sth re-use
2167   my $sth = $self->disable_sth_caching
2168     ? $dbh->prepare($sql)
2169     : $dbh->prepare_cached($sql, {}, 3);
2170
2171   # XXX You would think RaiseError would make this impossible,
2172   #  but apparently that's not true :(
2173   $self->throw_exception($dbh->errstr) if !$sth;
2174
2175   $sth;
2176 }
2177
2178 sub sth {
2179   my ($self, $sql) = @_;
2180   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2181 }
2182
2183 sub _dbh_columns_info_for {
2184   my ($self, $dbh, $table) = @_;
2185
2186   if ($dbh->can('column_info')) {
2187     my %result;
2188     my $caught;
2189     try {
2190       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2191       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2192       $sth->execute();
2193       while ( my $info = $sth->fetchrow_hashref() ){
2194         my %column_info;
2195         $column_info{data_type}   = $info->{TYPE_NAME};
2196         $column_info{size}      = $info->{COLUMN_SIZE};
2197         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2198         $column_info{default_value} = $info->{COLUMN_DEF};
2199         my $col_name = $info->{COLUMN_NAME};
2200         $col_name =~ s/^\"(.*)\"$/$1/;
2201
2202         $result{$col_name} = \%column_info;
2203       }
2204     } catch {
2205       $caught = 1;
2206     };
2207     return \%result if !$caught && scalar keys %result;
2208   }
2209
2210   my %result;
2211   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2212   $sth->execute;
2213   my @columns = @{$sth->{NAME_lc}};
2214   for my $i ( 0 .. $#columns ){
2215     my %column_info;
2216     $column_info{data_type} = $sth->{TYPE}->[$i];
2217     $column_info{size} = $sth->{PRECISION}->[$i];
2218     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2219
2220     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2221       $column_info{data_type} = $1;
2222       $column_info{size}    = $2;
2223     }
2224
2225     $result{$columns[$i]} = \%column_info;
2226   }
2227   $sth->finish;
2228
2229   foreach my $col (keys %result) {
2230     my $colinfo = $result{$col};
2231     my $type_num = $colinfo->{data_type};
2232     my $type_name;
2233     if(defined $type_num && $dbh->can('type_info')) {
2234       my $type_info = $dbh->type_info($type_num);
2235       $type_name = $type_info->{TYPE_NAME} if $type_info;
2236       $colinfo->{data_type} = $type_name if $type_name;
2237     }
2238   }
2239
2240   return \%result;
2241 }
2242
2243 sub columns_info_for {
2244   my ($self, $table) = @_;
2245   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2246 }
2247
2248 =head2 last_insert_id
2249
2250 Return the row id of the last insert.
2251
2252 =cut
2253
2254 sub _dbh_last_insert_id {
2255     my ($self, $dbh, $source, $col) = @_;
2256
2257     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2258
2259     return $id if defined $id;
2260
2261     my $class = ref $self;
2262     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2263 }
2264
2265 sub last_insert_id {
2266   my $self = shift;
2267   $self->_dbh_last_insert_id ($self->_dbh, @_);
2268 }
2269
2270 =head2 _native_data_type
2271
2272 =over 4
2273
2274 =item Arguments: $type_name
2275
2276 =back
2277
2278 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2279 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2280 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2281
2282 The default implementation returns C<undef>, implement in your Storage driver if
2283 you need this functionality.
2284
2285 Should map types from other databases to the native RDBMS type, for example
2286 C<VARCHAR2> to C<VARCHAR>.
2287
2288 Types with modifiers should map to the underlying data type. For example,
2289 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2290
2291 Composite types should map to the container type, for example
2292 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2293
2294 =cut
2295
2296 sub _native_data_type {
2297   #my ($self, $data_type) = @_;
2298   return undef
2299 }
2300
2301 # Check if placeholders are supported at all
2302 sub _determine_supports_placeholders {
2303   my $self = shift;
2304   my $dbh  = $self->_get_dbh;
2305
2306   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2307   # but it is inaccurate more often than not
2308   return try {
2309     local $dbh->{PrintError} = 0;
2310     local $dbh->{RaiseError} = 1;
2311     $dbh->do('select ?', {}, 1);
2312     1;
2313   }
2314   catch {
2315     0;
2316   };
2317 }
2318
2319 # Check if placeholders bound to non-string types throw exceptions
2320 #
2321 sub _determine_supports_typeless_placeholders {
2322   my $self = shift;
2323   my $dbh  = $self->_get_dbh;
2324
2325   return try {
2326     local $dbh->{PrintError} = 0;
2327     local $dbh->{RaiseError} = 1;
2328     # this specifically tests a bind that is NOT a string
2329     $dbh->do('select 1 where 1 = ?', {}, 1);
2330     1;
2331   }
2332   catch {
2333     0;
2334   };
2335 }
2336
2337 =head2 sqlt_type
2338
2339 Returns the database driver name.
2340
2341 =cut
2342
2343 sub sqlt_type {
2344   shift->_get_dbh->{Driver}->{Name};
2345 }
2346
2347 =head2 bind_attribute_by_data_type
2348
2349 Given a datatype from column info, returns a database specific bind
2350 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2351 let the database planner just handle it.
2352
2353 Generally only needed for special case column types, like bytea in postgres.
2354
2355 =cut
2356
2357 sub bind_attribute_by_data_type {
2358     return;
2359 }
2360
2361 =head2 is_datatype_numeric
2362
2363 Given a datatype from column_info, returns a boolean value indicating if
2364 the current RDBMS considers it a numeric value. This controls how
2365 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2366 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2367 be performed instead of the usual C<eq>.
2368
2369 =cut
2370
2371 sub is_datatype_numeric {
2372   my ($self, $dt) = @_;
2373
2374   return 0 unless $dt;
2375
2376   return $dt =~ /^ (?:
2377     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2378   ) $/ix;
2379 }
2380
2381
2382 =head2 create_ddl_dir
2383
2384 =over 4
2385
2386 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2387
2388 =back
2389
2390 Creates a SQL file based on the Schema, for each of the specified
2391 database engines in C<\@databases> in the given directory.
2392 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2393
2394 Given a previous version number, this will also create a file containing
2395 the ALTER TABLE statements to transform the previous schema into the
2396 current one. Note that these statements may contain C<DROP TABLE> or
2397 C<DROP COLUMN> statements that can potentially destroy data.
2398
2399 The file names are created using the C<ddl_filename> method below, please
2400 override this method in your schema if you would like a different file
2401 name format. For the ALTER file, the same format is used, replacing
2402 $version in the name with "$preversion-$version".
2403
2404 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2405 The most common value for this would be C<< { add_drop_table => 1 } >>
2406 to have the SQL produced include a C<DROP TABLE> statement for each table
2407 created. For quoting purposes supply C<quote_table_names> and
2408 C<quote_field_names>.
2409
2410 If no arguments are passed, then the following default values are assumed:
2411
2412 =over 4
2413
2414 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2415
2416 =item version    - $schema->schema_version
2417
2418 =item directory  - './'
2419
2420 =item preversion - <none>
2421
2422 =back
2423
2424 By default, C<\%sqlt_args> will have
2425
2426  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2427
2428 merged with the hash passed in. To disable any of those features, pass in a
2429 hashref like the following
2430
2431  { ignore_constraint_names => 0, # ... other options }
2432
2433
2434 WARNING: You are strongly advised to check all SQL files created, before applying
2435 them.
2436
2437 =cut
2438
2439 sub create_ddl_dir {
2440   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2441
2442   unless ($dir) {
2443     carp "No directory given, using ./\n";
2444     $dir = './';
2445   } else {
2446       -d $dir
2447         or
2448       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2449         or
2450       $self->throw_exception(
2451         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2452       );
2453   }
2454
2455   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2456
2457   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2458   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2459
2460   my $schema_version = $schema->schema_version || '1.x';
2461   $version ||= $schema_version;
2462
2463   $sqltargs = {
2464     add_drop_table => 1,
2465     ignore_constraint_names => 1,
2466     ignore_index_names => 1,
2467     %{$sqltargs || {}}
2468   };
2469
2470   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2471     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2472   }
2473
2474   my $sqlt = SQL::Translator->new( $sqltargs );
2475
2476   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2477   my $sqlt_schema = $sqlt->translate({ data => $schema })
2478     or $self->throw_exception ($sqlt->error);
2479
2480   foreach my $db (@$databases) {
2481     $sqlt->reset();
2482     $sqlt->{schema} = $sqlt_schema;
2483     $sqlt->producer($db);
2484
2485     my $file;
2486     my $filename = $schema->ddl_filename($db, $version, $dir);
2487     if (-e $filename && ($version eq $schema_version )) {
2488       # if we are dumping the current version, overwrite the DDL
2489       carp "Overwriting existing DDL file - $filename";
2490       unlink($filename);
2491     }
2492
2493     my $output = $sqlt->translate;
2494     if(!$output) {
2495       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2496       next;
2497     }
2498     if(!open($file, ">$filename")) {
2499       $self->throw_exception("Can't open $filename for writing ($!)");
2500       next;
2501     }
2502     print $file $output;
2503     close($file);
2504
2505     next unless ($preversion);
2506
2507     require SQL::Translator::Diff;
2508
2509     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2510     if(!-e $prefilename) {
2511       carp("No previous schema file found ($prefilename)");
2512       next;
2513     }
2514
2515     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2516     if(-e $difffile) {
2517       carp("Overwriting existing diff file - $difffile");
2518       unlink($difffile);
2519     }
2520
2521     my $source_schema;
2522     {
2523       my $t = SQL::Translator->new($sqltargs);
2524       $t->debug( 0 );
2525       $t->trace( 0 );
2526
2527       $t->parser( $db )
2528         or $self->throw_exception ($t->error);
2529
2530       my $out = $t->translate( $prefilename )
2531         or $self->throw_exception ($t->error);
2532
2533       $source_schema = $t->schema;
2534
2535       $source_schema->name( $prefilename )
2536         unless ( $source_schema->name );
2537     }
2538
2539     # The "new" style of producers have sane normalization and can support
2540     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2541     # And we have to diff parsed SQL against parsed SQL.
2542     my $dest_schema = $sqlt_schema;
2543
2544     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2545       my $t = SQL::Translator->new($sqltargs);
2546       $t->debug( 0 );
2547       $t->trace( 0 );
2548
2549       $t->parser( $db )
2550         or $self->throw_exception ($t->error);
2551
2552       my $out = $t->translate( $filename )
2553         or $self->throw_exception ($t->error);
2554
2555       $dest_schema = $t->schema;
2556
2557       $dest_schema->name( $filename )
2558         unless $dest_schema->name;
2559     }
2560
2561     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2562                                                   $dest_schema,   $db,
2563                                                   $sqltargs
2564                                                  );
2565     if(!open $file, ">$difffile") {
2566       $self->throw_exception("Can't write to $difffile ($!)");
2567       next;
2568     }
2569     print $file $diff;
2570     close($file);
2571   }
2572 }
2573
2574 =head2 deployment_statements
2575
2576 =over 4
2577
2578 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2579
2580 =back
2581
2582 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2583
2584 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2585 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2586
2587 C<$directory> is used to return statements from files in a previously created
2588 L</create_ddl_dir> directory and is optional. The filenames are constructed
2589 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2590
2591 If no C<$directory> is specified then the statements are constructed on the
2592 fly using L<SQL::Translator> and C<$version> is ignored.
2593
2594 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2595
2596 =cut
2597
2598 sub deployment_statements {
2599   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2600   $type ||= $self->sqlt_type;
2601   $version ||= $schema->schema_version || '1.x';
2602   $dir ||= './';
2603   my $filename = $schema->ddl_filename($type, $version, $dir);
2604   if(-f $filename)
2605   {
2606       my $file;
2607       open($file, "<$filename")
2608         or $self->throw_exception("Can't open $filename ($!)");
2609       my @rows = <$file>;
2610       close($file);
2611       return join('', @rows);
2612   }
2613
2614   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2615     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2616   }
2617
2618   # sources needs to be a parser arg, but for simplicty allow at top level
2619   # coming in
2620   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2621       if exists $sqltargs->{sources};
2622
2623   my $tr = SQL::Translator->new(
2624     producer => "SQL::Translator::Producer::${type}",
2625     %$sqltargs,
2626     parser => 'SQL::Translator::Parser::DBIx::Class',
2627     data => $schema,
2628   );
2629
2630   my @ret;
2631   my $wa = wantarray;
2632   if ($wa) {
2633     @ret = $tr->translate;
2634   }
2635   else {
2636     $ret[0] = $tr->translate;
2637   }
2638
2639   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2640     unless (@ret && defined $ret[0]);
2641
2642   return $wa ? @ret : $ret[0];
2643 }
2644
2645 sub deploy {
2646   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2647   my $deploy = sub {
2648     my $line = shift;
2649     return if($line =~ /^--/);
2650     return if(!$line);
2651     # next if($line =~ /^DROP/m);
2652     return if($line =~ /^BEGIN TRANSACTION/m);
2653     return if($line =~ /^COMMIT/m);
2654     return if $line =~ /^\s+$/; # skip whitespace only
2655     $self->_query_start($line);
2656     try {
2657       # do a dbh_do cycle here, as we need some error checking in
2658       # place (even though we will ignore errors)
2659       $self->dbh_do (sub { $_[1]->do($line) });
2660     } catch {
2661       carp qq{$_ (running "${line}")};
2662     };
2663     $self->_query_end($line);
2664   };
2665   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2666   if (@statements > 1) {
2667     foreach my $statement (@statements) {
2668       $deploy->( $statement );
2669     }
2670   }
2671   elsif (@statements == 1) {
2672     foreach my $line ( split(";\n", $statements[0])) {
2673       $deploy->( $line );
2674     }
2675   }
2676 }
2677
2678 =head2 datetime_parser
2679
2680 Returns the datetime parser class
2681
2682 =cut
2683
2684 sub datetime_parser {
2685   my $self = shift;
2686   return $self->{datetime_parser} ||= do {
2687     $self->build_datetime_parser(@_);
2688   };
2689 }
2690
2691 =head2 datetime_parser_type
2692
2693 Defines (returns) the datetime parser class - currently hardwired to
2694 L<DateTime::Format::MySQL>
2695
2696 =cut
2697
2698 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2699
2700 =head2 build_datetime_parser
2701
2702 See L</datetime_parser>
2703
2704 =cut
2705
2706 sub build_datetime_parser {
2707   my $self = shift;
2708   my $type = $self->datetime_parser_type(@_);
2709   $self->ensure_class_loaded ($type);
2710   return $type;
2711 }
2712
2713
2714 =head2 is_replicating
2715
2716 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2717 replicate from a master database.  Default is undef, which is the result
2718 returned by databases that don't support replication.
2719
2720 =cut
2721
2722 sub is_replicating {
2723     return;
2724
2725 }
2726
2727 =head2 lag_behind_master
2728
2729 Returns a number that represents a certain amount of lag behind a master db
2730 when a given storage is replicating.  The number is database dependent, but
2731 starts at zero and increases with the amount of lag. Default in undef
2732
2733 =cut
2734
2735 sub lag_behind_master {
2736     return;
2737 }
2738
2739 =head2 relname_to_table_alias
2740
2741 =over 4
2742
2743 =item Arguments: $relname, $join_count
2744
2745 =back
2746
2747 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2748 queries.
2749
2750 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2751 way these aliases are named.
2752
2753 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2754 otherwise C<"$relname">.
2755
2756 =cut
2757
2758 sub relname_to_table_alias {
2759   my ($self, $relname, $join_count) = @_;
2760
2761   my $alias = ($join_count && $join_count > 1 ?
2762     join('_', $relname, $join_count) : $relname);
2763
2764   return $alias;
2765 }
2766
2767 1;
2768
2769 =head1 USAGE NOTES
2770
2771 =head2 DBIx::Class and AutoCommit
2772
2773 DBIx::Class can do some wonderful magic with handling exceptions,
2774 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2775 (the default) combined with C<txn_do> for transaction support.
2776
2777 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2778 in an assumed transaction between commits, and you're telling us you'd
2779 like to manage that manually.  A lot of the magic protections offered by
2780 this module will go away.  We can't protect you from exceptions due to database
2781 disconnects because we don't know anything about how to restart your
2782 transactions.  You're on your own for handling all sorts of exceptional
2783 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2784 be with raw DBI.
2785
2786
2787 =head1 AUTHORS
2788
2789 Matt S. Trout <mst@shadowcatsystems.co.uk>
2790
2791 Andy Grundman <andy@hybridized.org>
2792
2793 =head1 LICENSE
2794
2795 You may distribute this code under the same terms as Perl itself.
2796
2797 =cut