Remove forgotten frame from Storage-method overrides (was missing
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } @capabilities );
60
61
62 # Each of these methods need _determine_driver called before itself
63 # in order to function reliably. This is a purely DRY optimization
64 #
65 # get_(use)_dbms_capability need to be called on the correct Storage
66 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
67 # _determine_supports_X which obv. needs a correct driver as well
68 my @rdbms_specific_methods = qw/
69   deployment_statements
70   sqlt_type
71   sql_maker
72   build_datetime_parser
73   datetime_parser_type
74
75   insert
76   insert_bulk
77   update
78   delete
79   select
80   select_single
81
82   get_use_dbms_capability
83   get_dbms_capability
84 /;
85
86 for my $meth (@rdbms_specific_methods) {
87
88   my $orig = __PACKAGE__->can ($meth)
89     or die "$meth is not a ::Storage::DBI method!";
90
91   no strict qw/refs/;
92   no warnings qw/redefine/;
93   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
94     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
95       $_[0]->_determine_driver;
96       goto $_[0]->can($meth);
97     }
98     goto $orig;
99   };
100 }
101
102
103 =head1 NAME
104
105 DBIx::Class::Storage::DBI - DBI storage handler
106
107 =head1 SYNOPSIS
108
109   my $schema = MySchema->connect('dbi:SQLite:my.db');
110
111   $schema->storage->debug(1);
112
113   my @stuff = $schema->storage->dbh_do(
114     sub {
115       my ($storage, $dbh, @args) = @_;
116       $dbh->do("DROP TABLE authors");
117     },
118     @column_list
119   );
120
121   $schema->resultset('Book')->search({
122      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
123   });
124
125 =head1 DESCRIPTION
126
127 This class represents the connection to an RDBMS via L<DBI>.  See
128 L<DBIx::Class::Storage> for general information.  This pod only
129 documents DBI-specific methods and behaviors.
130
131 =head1 METHODS
132
133 =cut
134
135 sub new {
136   my $new = shift->next::method(@_);
137
138   $new->transaction_depth(0);
139   $new->_sql_maker_opts({});
140   $new->_dbh_details({});
141   $new->{savepoints} = [];
142   $new->{_in_dbh_do} = 0;
143   $new->{_dbh_gen} = 0;
144
145   # read below to see what this does
146   $new->_arm_global_destructor;
147
148   $new;
149 }
150
151 # This is hack to work around perl shooting stuff in random
152 # order on exit(). If we do not walk the remaining storage
153 # objects in an END block, there is a *small but real* chance
154 # of a fork()ed child to kill the parent's shared DBI handle,
155 # *before perl reaches the DESTROY in this package*
156 # Yes, it is ugly and effective.
157 {
158   my %seek_and_destroy;
159
160   sub _arm_global_destructor {
161     my $self = shift;
162     my $key = Scalar::Util::refaddr ($self);
163     $seek_and_destroy{$key} = $self;
164     Scalar::Util::weaken ($seek_and_destroy{$key});
165   }
166
167   END {
168     local $?; # just in case the DBI destructor changes it somehow
169
170     # destroy just the object if not native to this process/thread
171     $_->_preserve_foreign_dbh for (grep
172       { defined $_ }
173       values %seek_and_destroy
174     );
175   }
176 }
177
178 sub DESTROY {
179   my $self = shift;
180
181   # destroy just the object if not native to this process/thread
182   $self->_preserve_foreign_dbh;
183
184   # some databases need this to stop spewing warnings
185   if (my $dbh = $self->_dbh) {
186     try {
187       %{ $dbh->{CachedKids} } = ();
188       $dbh->disconnect;
189     };
190   }
191
192   $self->_dbh(undef);
193 }
194
195 sub _preserve_foreign_dbh {
196   my $self = shift;
197
198   return unless $self->_dbh;
199
200   $self->_verify_tid;
201
202   return unless $self->_dbh;
203
204   $self->_verify_pid;
205
206 }
207
208 # handle pid changes correctly - do not destroy parent's connection
209 sub _verify_pid {
210   my $self = shift;
211
212   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
213
214   $self->_dbh->{InactiveDestroy} = 1;
215   $self->_dbh(undef);
216   $self->{_dbh_gen}++;
217
218   return;
219 }
220
221 # very similar to above, but seems to FAIL if I set InactiveDestroy
222 sub _verify_tid {
223   my $self = shift;
224
225   if ( ! defined $self->_conn_tid ) {
226     return; # no threads
227   }
228   elsif ( $self->_conn_tid == threads->tid ) {
229     return; # same thread
230   }
231
232   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
233   $self->_dbh(undef);
234   $self->{_dbh_gen}++;
235
236   return;
237 }
238
239
240 =head2 connect_info
241
242 This method is normally called by L<DBIx::Class::Schema/connection>, which
243 encapsulates its argument list in an arrayref before passing them here.
244
245 The argument list may contain:
246
247 =over
248
249 =item *
250
251 The same 4-element argument set one would normally pass to
252 L<DBI/connect>, optionally followed by
253 L<extra attributes|/DBIx::Class specific connection attributes>
254 recognized by DBIx::Class:
255
256   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
257
258 =item *
259
260 A single code reference which returns a connected
261 L<DBI database handle|DBI/connect> optionally followed by
262 L<extra attributes|/DBIx::Class specific connection attributes> recognized
263 by DBIx::Class:
264
265   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
266
267 =item *
268
269 A single hashref with all the attributes and the dsn/user/password
270 mixed together:
271
272   $connect_info_args = [{
273     dsn => $dsn,
274     user => $user,
275     password => $pass,
276     %dbi_attributes,
277     %extra_attributes,
278   }];
279
280   $connect_info_args = [{
281     dbh_maker => sub { DBI->connect (...) },
282     %dbi_attributes,
283     %extra_attributes,
284   }];
285
286 This is particularly useful for L<Catalyst> based applications, allowing the
287 following config (L<Config::General> style):
288
289   <Model::DB>
290     schema_class   App::DB
291     <connect_info>
292       dsn          dbi:mysql:database=test
293       user         testuser
294       password     TestPass
295       AutoCommit   1
296     </connect_info>
297   </Model::DB>
298
299 The C<dsn>/C<user>/C<password> combination can be substituted by the
300 C<dbh_maker> key whose value is a coderef that returns a connected
301 L<DBI database handle|DBI/connect>
302
303 =back
304
305 Please note that the L<DBI> docs recommend that you always explicitly
306 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
307 recommends that it be set to I<1>, and that you perform transactions
308 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
309 to I<1> if you do not do explicitly set it to zero.  This is the default
310 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
311
312 =head3 DBIx::Class specific connection attributes
313
314 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
315 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
316 the following connection options. These options can be mixed in with your other
317 L<DBI> connection attributes, or placed in a separate hashref
318 (C<\%extra_attributes>) as shown above.
319
320 Every time C<connect_info> is invoked, any previous settings for
321 these options will be cleared before setting the new ones, regardless of
322 whether any options are specified in the new C<connect_info>.
323
324
325 =over
326
327 =item on_connect_do
328
329 Specifies things to do immediately after connecting or re-connecting to
330 the database.  Its value may contain:
331
332 =over
333
334 =item a scalar
335
336 This contains one SQL statement to execute.
337
338 =item an array reference
339
340 This contains SQL statements to execute in order.  Each element contains
341 a string or a code reference that returns a string.
342
343 =item a code reference
344
345 This contains some code to execute.  Unlike code references within an
346 array reference, its return value is ignored.
347
348 =back
349
350 =item on_disconnect_do
351
352 Takes arguments in the same form as L</on_connect_do> and executes them
353 immediately before disconnecting from the database.
354
355 Note, this only runs if you explicitly call L</disconnect> on the
356 storage object.
357
358 =item on_connect_call
359
360 A more generalized form of L</on_connect_do> that calls the specified
361 C<connect_call_METHOD> methods in your storage driver.
362
363   on_connect_do => 'select 1'
364
365 is equivalent to:
366
367   on_connect_call => [ [ do_sql => 'select 1' ] ]
368
369 Its values may contain:
370
371 =over
372
373 =item a scalar
374
375 Will call the C<connect_call_METHOD> method.
376
377 =item a code reference
378
379 Will execute C<< $code->($storage) >>
380
381 =item an array reference
382
383 Each value can be a method name or code reference.
384
385 =item an array of arrays
386
387 For each array, the first item is taken to be the C<connect_call_> method name
388 or code reference, and the rest are parameters to it.
389
390 =back
391
392 Some predefined storage methods you may use:
393
394 =over
395
396 =item do_sql
397
398 Executes a SQL string or a code reference that returns a SQL string. This is
399 what L</on_connect_do> and L</on_disconnect_do> use.
400
401 It can take:
402
403 =over
404
405 =item a scalar
406
407 Will execute the scalar as SQL.
408
409 =item an arrayref
410
411 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
412 attributes hashref and bind values.
413
414 =item a code reference
415
416 Will execute C<< $code->($storage) >> and execute the return array refs as
417 above.
418
419 =back
420
421 =item datetime_setup
422
423 Execute any statements necessary to initialize the database session to return
424 and accept datetime/timestamp values used with
425 L<DBIx::Class::InflateColumn::DateTime>.
426
427 Only necessary for some databases, see your specific storage driver for
428 implementation details.
429
430 =back
431
432 =item on_disconnect_call
433
434 Takes arguments in the same form as L</on_connect_call> and executes them
435 immediately before disconnecting from the database.
436
437 Calls the C<disconnect_call_METHOD> methods as opposed to the
438 C<connect_call_METHOD> methods called by L</on_connect_call>.
439
440 Note, this only runs if you explicitly call L</disconnect> on the
441 storage object.
442
443 =item disable_sth_caching
444
445 If set to a true value, this option will disable the caching of
446 statement handles via L<DBI/prepare_cached>.
447
448 =item limit_dialect
449
450 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
451 default L</sql_limit_dialect> setting of the storage (if any). For a list
452 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
453
454 =item quote_char
455
456 Specifies what characters to use to quote table and column names. If
457 you use this you will want to specify L</name_sep> as well.
458
459 C<quote_char> expects either a single character, in which case is it
460 is placed on either side of the table/column name, or an arrayref of length
461 2 in which case the table/column name is placed between the elements.
462
463 For example under MySQL you should use C<< quote_char => '`' >>, and for
464 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
465
466 =item name_sep
467
468 This only needs to be used in conjunction with C<quote_char>, and is used to
469 specify the character that separates elements (schemas, tables, columns) from
470 each other. In most cases this is simply a C<.>.
471
472 The consequences of not supplying this value is that L<SQL::Abstract>
473 will assume DBIx::Class' uses of aliases to be complete column
474 names. The output will look like I<"me.name"> when it should actually
475 be I<"me"."name">.
476
477 =item unsafe
478
479 This Storage driver normally installs its own C<HandleError>, sets
480 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
481 all database handles, including those supplied by a coderef.  It does this
482 so that it can have consistent and useful error behavior.
483
484 If you set this option to a true value, Storage will not do its usual
485 modifications to the database handle's attributes, and instead relies on
486 the settings in your connect_info DBI options (or the values you set in
487 your connection coderef, in the case that you are connecting via coderef).
488
489 Note that your custom settings can cause Storage to malfunction,
490 especially if you set a C<HandleError> handler that suppresses exceptions
491 and/or disable C<RaiseError>.
492
493 =item auto_savepoint
494
495 If this option is true, L<DBIx::Class> will use savepoints when nesting
496 transactions, making it possible to recover from failure in the inner
497 transaction without having to abort all outer transactions.
498
499 =item cursor_class
500
501 Use this argument to supply a cursor class other than the default
502 L<DBIx::Class::Storage::DBI::Cursor>.
503
504 =back
505
506 Some real-life examples of arguments to L</connect_info> and
507 L<DBIx::Class::Schema/connect>
508
509   # Simple SQLite connection
510   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
511
512   # Connect via subref
513   ->connect_info([ sub { DBI->connect(...) } ]);
514
515   # Connect via subref in hashref
516   ->connect_info([{
517     dbh_maker => sub { DBI->connect(...) },
518     on_connect_do => 'alter session ...',
519   }]);
520
521   # A bit more complicated
522   ->connect_info(
523     [
524       'dbi:Pg:dbname=foo',
525       'postgres',
526       'my_pg_password',
527       { AutoCommit => 1 },
528       { quote_char => q{"}, name_sep => q{.} },
529     ]
530   );
531
532   # Equivalent to the previous example
533   ->connect_info(
534     [
535       'dbi:Pg:dbname=foo',
536       'postgres',
537       'my_pg_password',
538       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
539     ]
540   );
541
542   # Same, but with hashref as argument
543   # See parse_connect_info for explanation
544   ->connect_info(
545     [{
546       dsn         => 'dbi:Pg:dbname=foo',
547       user        => 'postgres',
548       password    => 'my_pg_password',
549       AutoCommit  => 1,
550       quote_char  => q{"},
551       name_sep    => q{.},
552     }]
553   );
554
555   # Subref + DBIx::Class-specific connection options
556   ->connect_info(
557     [
558       sub { DBI->connect(...) },
559       {
560           quote_char => q{`},
561           name_sep => q{@},
562           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
563           disable_sth_caching => 1,
564       },
565     ]
566   );
567
568
569
570 =cut
571
572 sub connect_info {
573   my ($self, $info) = @_;
574
575   return $self->_connect_info if !$info;
576
577   $self->_connect_info($info); # copy for _connect_info
578
579   $info = $self->_normalize_connect_info($info)
580     if ref $info eq 'ARRAY';
581
582   for my $storage_opt (keys %{ $info->{storage_options} }) {
583     my $value = $info->{storage_options}{$storage_opt};
584
585     $self->$storage_opt($value);
586   }
587
588   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
589   #  the new set of options
590   $self->_sql_maker(undef);
591   $self->_sql_maker_opts({});
592
593   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
594     my $value = $info->{sql_maker_options}{$sql_maker_opt};
595
596     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
597   }
598
599   my %attrs = (
600     %{ $self->_default_dbi_connect_attributes || {} },
601     %{ $info->{attributes} || {} },
602   );
603
604   my @args = @{ $info->{arguments} };
605
606   $self->_dbi_connect_info([@args,
607     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
608
609   # FIXME - dirty:
610   # save attributes them in a separate accessor so they are always
611   # introspectable, even in case of a CODE $dbhmaker
612   $self->_dbic_connect_attributes (\%attrs);
613
614   return $self->_connect_info;
615 }
616
617 sub _normalize_connect_info {
618   my ($self, $info_arg) = @_;
619   my %info;
620
621   my @args = @$info_arg;  # take a shallow copy for further mutilation
622
623   # combine/pre-parse arguments depending on invocation style
624
625   my %attrs;
626   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
627     %attrs = %{ $args[1] || {} };
628     @args = $args[0];
629   }
630   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
631     %attrs = %{$args[0]};
632     @args = ();
633     if (my $code = delete $attrs{dbh_maker}) {
634       @args = $code;
635
636       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
637       if (@ignored) {
638         carp sprintf (
639             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
640           . "to the result of 'dbh_maker'",
641
642           join (', ', map { "'$_'" } (@ignored) ),
643         );
644       }
645     }
646     else {
647       @args = delete @attrs{qw/dsn user password/};
648     }
649   }
650   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
651     %attrs = (
652       % { $args[3] || {} },
653       % { $args[4] || {} },
654     );
655     @args = @args[0,1,2];
656   }
657
658   $info{arguments} = \@args;
659
660   my @storage_opts = grep exists $attrs{$_},
661     @storage_options, 'cursor_class';
662
663   @{ $info{storage_options} }{@storage_opts} =
664     delete @attrs{@storage_opts} if @storage_opts;
665
666   my @sql_maker_opts = grep exists $attrs{$_},
667     qw/limit_dialect quote_char name_sep/;
668
669   @{ $info{sql_maker_options} }{@sql_maker_opts} =
670     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
671
672   $info{attributes} = \%attrs if %attrs;
673
674   return \%info;
675 }
676
677 sub _default_dbi_connect_attributes {
678   return {
679     AutoCommit => 1,
680     RaiseError => 1,
681     PrintError => 0,
682   };
683 }
684
685 =head2 on_connect_do
686
687 This method is deprecated in favour of setting via L</connect_info>.
688
689 =cut
690
691 =head2 on_disconnect_do
692
693 This method is deprecated in favour of setting via L</connect_info>.
694
695 =cut
696
697 sub _parse_connect_do {
698   my ($self, $type) = @_;
699
700   my $val = $self->$type;
701   return () if not defined $val;
702
703   my @res;
704
705   if (not ref($val)) {
706     push @res, [ 'do_sql', $val ];
707   } elsif (ref($val) eq 'CODE') {
708     push @res, $val;
709   } elsif (ref($val) eq 'ARRAY') {
710     push @res, map { [ 'do_sql', $_ ] } @$val;
711   } else {
712     $self->throw_exception("Invalid type for $type: ".ref($val));
713   }
714
715   return \@res;
716 }
717
718 =head2 dbh_do
719
720 Arguments: ($subref | $method_name), @extra_coderef_args?
721
722 Execute the given $subref or $method_name using the new exception-based
723 connection management.
724
725 The first two arguments will be the storage object that C<dbh_do> was called
726 on and a database handle to use.  Any additional arguments will be passed
727 verbatim to the called subref as arguments 2 and onwards.
728
729 Using this (instead of $self->_dbh or $self->dbh) ensures correct
730 exception handling and reconnection (or failover in future subclasses).
731
732 Your subref should have no side-effects outside of the database, as
733 there is the potential for your subref to be partially double-executed
734 if the database connection was stale/dysfunctional.
735
736 Example:
737
738   my @stuff = $schema->storage->dbh_do(
739     sub {
740       my ($storage, $dbh, @cols) = @_;
741       my $cols = join(q{, }, @cols);
742       $dbh->selectrow_array("SELECT $cols FROM foo");
743     },
744     @column_list
745   );
746
747 =cut
748
749 sub dbh_do {
750   my $self = shift;
751   my $code = shift;
752
753   my $dbh = $self->_get_dbh;
754
755   return $self->$code($dbh, @_)
756     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
757
758   local $self->{_in_dbh_do} = 1;
759
760   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
761   my $args = \@_;
762   return try {
763     $self->$code ($dbh, @$args);
764   } catch {
765     $self->throw_exception($_) if $self->connected;
766
767     # We were not connected - reconnect and retry, but let any
768     #  exception fall right through this time
769     carp "Retrying $code after catching disconnected exception: $_"
770       if $ENV{DBIC_DBIRETRY_DEBUG};
771
772     $self->_populate_dbh;
773     $self->$code($self->_dbh, @$args);
774   };
775 }
776
777 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
778 # It also informs dbh_do to bypass itself while under the direction of txn_do,
779 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
780 sub txn_do {
781   my $self = shift;
782   my $coderef = shift;
783
784   ref $coderef eq 'CODE' or $self->throw_exception
785     ('$coderef must be a CODE reference');
786
787   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
788
789   local $self->{_in_dbh_do} = 1;
790
791   my @result;
792   my $want_array = wantarray;
793
794   my $tried = 0;
795   while(1) {
796     my $exception;
797
798     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
799     my $args = \@_;
800
801     try {
802       $self->_get_dbh;
803
804       $self->txn_begin;
805       if($want_array) {
806           @result = $coderef->(@$args);
807       }
808       elsif(defined $want_array) {
809           $result[0] = $coderef->(@$args);
810       }
811       else {
812           $coderef->(@$args);
813       }
814       $self->txn_commit;
815     } catch {
816       $exception = $_;
817     };
818
819     if(! defined $exception) { return $want_array ? @result : $result[0] }
820
821     if($tried++ || $self->connected) {
822       my $rollback_exception;
823       try { $self->txn_rollback } catch { $rollback_exception = shift };
824       if(defined $rollback_exception) {
825         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
826         $self->throw_exception($exception)  # propagate nested rollback
827           if $rollback_exception =~ /$exception_class/;
828
829         $self->throw_exception(
830           "Transaction aborted: ${exception}. "
831           . "Rollback failed: ${rollback_exception}"
832         );
833       }
834       $self->throw_exception($exception)
835     }
836
837     # We were not connected, and was first try - reconnect and retry
838     # via the while loop
839     carp "Retrying $coderef after catching disconnected exception: $exception"
840       if $ENV{DBIC_DBIRETRY_DEBUG};
841     $self->_populate_dbh;
842   }
843 }
844
845 =head2 disconnect
846
847 Our C<disconnect> method also performs a rollback first if the
848 database is not in C<AutoCommit> mode.
849
850 =cut
851
852 sub disconnect {
853   my ($self) = @_;
854
855   if( $self->_dbh ) {
856     my @actions;
857
858     push @actions, ( $self->on_disconnect_call || () );
859     push @actions, $self->_parse_connect_do ('on_disconnect_do');
860
861     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
862
863     $self->_dbh_rollback unless $self->_dbh_autocommit;
864
865     %{ $self->_dbh->{CachedKids} } = ();
866     $self->_dbh->disconnect;
867     $self->_dbh(undef);
868     $self->{_dbh_gen}++;
869   }
870 }
871
872 =head2 with_deferred_fk_checks
873
874 =over 4
875
876 =item Arguments: C<$coderef>
877
878 =item Return Value: The return value of $coderef
879
880 =back
881
882 Storage specific method to run the code ref with FK checks deferred or
883 in MySQL's case disabled entirely.
884
885 =cut
886
887 # Storage subclasses should override this
888 sub with_deferred_fk_checks {
889   my ($self, $sub) = @_;
890   $sub->();
891 }
892
893 =head2 connected
894
895 =over
896
897 =item Arguments: none
898
899 =item Return Value: 1|0
900
901 =back
902
903 Verifies that the current database handle is active and ready to execute
904 an SQL statement (e.g. the connection did not get stale, server is still
905 answering, etc.) This method is used internally by L</dbh>.
906
907 =cut
908
909 sub connected {
910   my $self = shift;
911   return 0 unless $self->_seems_connected;
912
913   #be on the safe side
914   local $self->_dbh->{RaiseError} = 1;
915
916   return $self->_ping;
917 }
918
919 sub _seems_connected {
920   my $self = shift;
921
922   $self->_preserve_foreign_dbh;
923
924   my $dbh = $self->_dbh
925     or return 0;
926
927   return $dbh->FETCH('Active');
928 }
929
930 sub _ping {
931   my $self = shift;
932
933   my $dbh = $self->_dbh or return 0;
934
935   return $dbh->ping;
936 }
937
938 sub ensure_connected {
939   my ($self) = @_;
940
941   unless ($self->connected) {
942     $self->_populate_dbh;
943   }
944 }
945
946 =head2 dbh
947
948 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
949 is guaranteed to be healthy by implicitly calling L</connected>, and if
950 necessary performing a reconnection before returning. Keep in mind that this
951 is very B<expensive> on some database engines. Consider using L</dbh_do>
952 instead.
953
954 =cut
955
956 sub dbh {
957   my ($self) = @_;
958
959   if (not $self->_dbh) {
960     $self->_populate_dbh;
961   } else {
962     $self->ensure_connected;
963   }
964   return $self->_dbh;
965 }
966
967 # this is the internal "get dbh or connect (don't check)" method
968 sub _get_dbh {
969   my $self = shift;
970   $self->_preserve_foreign_dbh;
971   $self->_populate_dbh unless $self->_dbh;
972   return $self->_dbh;
973 }
974
975 sub sql_maker {
976   my ($self) = @_;
977   unless ($self->_sql_maker) {
978     my $sql_maker_class = $self->sql_maker_class;
979     $self->ensure_class_loaded ($sql_maker_class);
980
981     my %opts = %{$self->_sql_maker_opts||{}};
982     my $dialect =
983       $opts{limit_dialect}
984         ||
985       $self->sql_limit_dialect
986         ||
987       do {
988         my $s_class = (ref $self) || $self;
989         carp (
990           "Your storage class ($s_class) does not set sql_limit_dialect and you "
991         . 'have not supplied an explicit limit_dialect in your connection_info. '
992         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
993         . 'databases but can be (and often is) painfully slow.'
994         );
995
996         'GenericSubQ';
997       }
998     ;
999
1000     $self->_sql_maker($sql_maker_class->new(
1001       bindtype=>'columns',
1002       array_datatypes => 1,
1003       limit_dialect => $dialect,
1004       %opts,
1005     ));
1006   }
1007   return $self->_sql_maker;
1008 }
1009
1010 # nothing to do by default
1011 sub _rebless {}
1012 sub _init {}
1013
1014 sub _populate_dbh {
1015   my ($self) = @_;
1016
1017   my @info = @{$self->_dbi_connect_info || []};
1018   $self->_dbh(undef); # in case ->connected failed we might get sent here
1019   $self->_dbh_details({}); # reset everything we know
1020
1021   $self->_dbh($self->_connect(@info));
1022
1023   $self->_conn_pid($$);
1024   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1025
1026   $self->_determine_driver;
1027
1028   # Always set the transaction depth on connect, since
1029   #  there is no transaction in progress by definition
1030   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1031
1032   $self->_run_connection_actions unless $self->{_in_determine_driver};
1033 }
1034
1035 sub _run_connection_actions {
1036   my $self = shift;
1037   my @actions;
1038
1039   push @actions, ( $self->on_connect_call || () );
1040   push @actions, $self->_parse_connect_do ('on_connect_do');
1041
1042   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1043 }
1044
1045
1046
1047 sub set_use_dbms_capability {
1048   $_[0]->set_inherited ($_[1], $_[2]);
1049 }
1050
1051 sub get_use_dbms_capability {
1052   my ($self, $capname) = @_;
1053
1054   my $use = $self->get_inherited ($capname);
1055   return defined $use
1056     ? $use
1057     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1058   ;
1059 }
1060
1061 sub set_dbms_capability {
1062   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1063 }
1064
1065 sub get_dbms_capability {
1066   my ($self, $capname) = @_;
1067
1068   my $cap = $self->_dbh_details->{capability}{$capname};
1069
1070   unless (defined $cap) {
1071     if (my $meth = $self->can ("_determine$capname")) {
1072       $cap = $self->$meth ? 1 : 0;
1073     }
1074     else {
1075       $cap = 0;
1076     }
1077
1078     $self->set_dbms_capability ($capname, $cap);
1079   }
1080
1081   return $cap;
1082 }
1083
1084 sub _server_info {
1085   my $self = shift;
1086
1087   my $info;
1088   unless ($info = $self->_dbh_details->{info}) {
1089
1090     $info = {};
1091
1092     my $server_version = try { $self->_get_server_version };
1093
1094     if (defined $server_version) {
1095       $info->{dbms_version} = $server_version;
1096
1097       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1098       my @verparts = split (/\./, $numeric_version);
1099       if (
1100         @verparts
1101           &&
1102         $verparts[0] <= 999
1103       ) {
1104         # consider only up to 3 version parts, iff not more than 3 digits
1105         my @use_parts;
1106         while (@verparts && @use_parts < 3) {
1107           my $p = shift @verparts;
1108           last if $p > 999;
1109           push @use_parts, $p;
1110         }
1111         push @use_parts, 0 while @use_parts < 3;
1112
1113         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1114       }
1115     }
1116
1117     $self->_dbh_details->{info} = $info;
1118   }
1119
1120   return $info;
1121 }
1122
1123 sub _get_server_version {
1124   shift->_get_dbh->get_info(18);
1125 }
1126
1127 sub _determine_driver {
1128   my ($self) = @_;
1129
1130   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1131     my $started_connected = 0;
1132     local $self->{_in_determine_driver} = 1;
1133
1134     if (ref($self) eq __PACKAGE__) {
1135       my $driver;
1136       if ($self->_dbh) { # we are connected
1137         $driver = $self->_dbh->{Driver}{Name};
1138         $started_connected = 1;
1139       } else {
1140         # if connect_info is a CODEREF, we have no choice but to connect
1141         if (ref $self->_dbi_connect_info->[0] &&
1142             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1143           $self->_populate_dbh;
1144           $driver = $self->_dbh->{Driver}{Name};
1145         }
1146         else {
1147           # try to use dsn to not require being connected, the driver may still
1148           # force a connection in _rebless to determine version
1149           # (dsn may not be supplied at all if all we do is make a mock-schema)
1150           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1151           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1152           $driver ||= $ENV{DBI_DRIVER};
1153         }
1154       }
1155
1156       if ($driver) {
1157         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1158         if ($self->load_optional_class($storage_class)) {
1159           mro::set_mro($storage_class, 'c3');
1160           bless $self, $storage_class;
1161           $self->_rebless();
1162         }
1163       }
1164     }
1165
1166     $self->_driver_determined(1);
1167
1168     $self->_init; # run driver-specific initializations
1169
1170     $self->_run_connection_actions
1171         if !$started_connected && defined $self->_dbh;
1172   }
1173 }
1174
1175 sub _do_connection_actions {
1176   my $self          = shift;
1177   my $method_prefix = shift;
1178   my $call          = shift;
1179
1180   if (not ref($call)) {
1181     my $method = $method_prefix . $call;
1182     $self->$method(@_);
1183   } elsif (ref($call) eq 'CODE') {
1184     $self->$call(@_);
1185   } elsif (ref($call) eq 'ARRAY') {
1186     if (ref($call->[0]) ne 'ARRAY') {
1187       $self->_do_connection_actions($method_prefix, $_) for @$call;
1188     } else {
1189       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1190     }
1191   } else {
1192     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1193   }
1194
1195   return $self;
1196 }
1197
1198 sub connect_call_do_sql {
1199   my $self = shift;
1200   $self->_do_query(@_);
1201 }
1202
1203 sub disconnect_call_do_sql {
1204   my $self = shift;
1205   $self->_do_query(@_);
1206 }
1207
1208 # override in db-specific backend when necessary
1209 sub connect_call_datetime_setup { 1 }
1210
1211 sub _do_query {
1212   my ($self, $action) = @_;
1213
1214   if (ref $action eq 'CODE') {
1215     $action = $action->($self);
1216     $self->_do_query($_) foreach @$action;
1217   }
1218   else {
1219     # Most debuggers expect ($sql, @bind), so we need to exclude
1220     # the attribute hash which is the second argument to $dbh->do
1221     # furthermore the bind values are usually to be presented
1222     # as named arrayref pairs, so wrap those here too
1223     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1224     my $sql = shift @do_args;
1225     my $attrs = shift @do_args;
1226     my @bind = map { [ undef, $_ ] } @do_args;
1227
1228     $self->_query_start($sql, @bind);
1229     $self->_get_dbh->do($sql, $attrs, @do_args);
1230     $self->_query_end($sql, @bind);
1231   }
1232
1233   return $self;
1234 }
1235
1236 sub _connect {
1237   my ($self, @info) = @_;
1238
1239   $self->throw_exception("You failed to provide any connection info")
1240     if !@info;
1241
1242   my ($old_connect_via, $dbh);
1243
1244   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1245     $old_connect_via = $DBI::connect_via;
1246     $DBI::connect_via = 'connect';
1247   }
1248
1249   try {
1250     if(ref $info[0] eq 'CODE') {
1251        $dbh = $info[0]->();
1252     }
1253     else {
1254        $dbh = DBI->connect(@info);
1255     }
1256
1257     if (!$dbh) {
1258       die $DBI::errstr;
1259     }
1260
1261     unless ($self->unsafe) {
1262
1263       # this odd anonymous coderef dereference is in fact really
1264       # necessary to avoid the unwanted effect described in perl5
1265       # RT#75792
1266       sub {
1267         my $weak_self = $_[0];
1268         weaken $weak_self;
1269
1270         $_[1]->{HandleError} = sub {
1271           if ($weak_self) {
1272             $weak_self->throw_exception("DBI Exception: $_[0]");
1273           }
1274           else {
1275             # the handler may be invoked by something totally out of
1276             # the scope of DBIC
1277             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1278           }
1279         };
1280       }->($self, $dbh);
1281
1282       $dbh->{ShowErrorStatement} = 1;
1283       $dbh->{RaiseError} = 1;
1284       $dbh->{PrintError} = 0;
1285     }
1286   }
1287   catch {
1288     $self->throw_exception("DBI Connection failed: $_")
1289   }
1290   finally {
1291     $DBI::connect_via = $old_connect_via if $old_connect_via;
1292   };
1293
1294   $self->_dbh_autocommit($dbh->{AutoCommit});
1295   $dbh;
1296 }
1297
1298 sub svp_begin {
1299   my ($self, $name) = @_;
1300
1301   $name = $self->_svp_generate_name
1302     unless defined $name;
1303
1304   $self->throw_exception ("You can't use savepoints outside a transaction")
1305     if $self->{transaction_depth} == 0;
1306
1307   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1308     unless $self->can('_svp_begin');
1309
1310   push @{ $self->{savepoints} }, $name;
1311
1312   $self->debugobj->svp_begin($name) if $self->debug;
1313
1314   return $self->_svp_begin($name);
1315 }
1316
1317 sub svp_release {
1318   my ($self, $name) = @_;
1319
1320   $self->throw_exception ("You can't use savepoints outside a transaction")
1321     if $self->{transaction_depth} == 0;
1322
1323   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1324     unless $self->can('_svp_release');
1325
1326   if (defined $name) {
1327     $self->throw_exception ("Savepoint '$name' does not exist")
1328       unless grep { $_ eq $name } @{ $self->{savepoints} };
1329
1330     # Dig through the stack until we find the one we are releasing.  This keeps
1331     # the stack up to date.
1332     my $svp;
1333
1334     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1335   } else {
1336     $name = pop @{ $self->{savepoints} };
1337   }
1338
1339   $self->debugobj->svp_release($name) if $self->debug;
1340
1341   return $self->_svp_release($name);
1342 }
1343
1344 sub svp_rollback {
1345   my ($self, $name) = @_;
1346
1347   $self->throw_exception ("You can't use savepoints outside a transaction")
1348     if $self->{transaction_depth} == 0;
1349
1350   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1351     unless $self->can('_svp_rollback');
1352
1353   if (defined $name) {
1354       # If they passed us a name, verify that it exists in the stack
1355       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1356           $self->throw_exception("Savepoint '$name' does not exist!");
1357       }
1358
1359       # Dig through the stack until we find the one we are releasing.  This keeps
1360       # the stack up to date.
1361       while(my $s = pop(@{ $self->{savepoints} })) {
1362           last if($s eq $name);
1363       }
1364       # Add the savepoint back to the stack, as a rollback doesn't remove the
1365       # named savepoint, only everything after it.
1366       push(@{ $self->{savepoints} }, $name);
1367   } else {
1368       # We'll assume they want to rollback to the last savepoint
1369       $name = $self->{savepoints}->[-1];
1370   }
1371
1372   $self->debugobj->svp_rollback($name) if $self->debug;
1373
1374   return $self->_svp_rollback($name);
1375 }
1376
1377 sub _svp_generate_name {
1378     my ($self) = @_;
1379
1380     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1381 }
1382
1383 sub txn_begin {
1384   my $self = shift;
1385
1386   # this means we have not yet connected and do not know the AC status
1387   # (e.g. coderef $dbh)
1388   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1389
1390   if($self->{transaction_depth} == 0) {
1391     $self->debugobj->txn_begin()
1392       if $self->debug;
1393     $self->_dbh_begin_work;
1394   }
1395   elsif ($self->auto_savepoint) {
1396     $self->svp_begin;
1397   }
1398   $self->{transaction_depth}++;
1399 }
1400
1401 sub _dbh_begin_work {
1402   my $self = shift;
1403
1404   # if the user is utilizing txn_do - good for him, otherwise we need to
1405   # ensure that the $dbh is healthy on BEGIN.
1406   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1407   # will be replaced by a failure of begin_work itself (which will be
1408   # then retried on reconnect)
1409   if ($self->{_in_dbh_do}) {
1410     $self->_dbh->begin_work;
1411   } else {
1412     $self->dbh_do(sub { $_[1]->begin_work });
1413   }
1414 }
1415
1416 sub txn_commit {
1417   my $self = shift;
1418   if ($self->{transaction_depth} == 1) {
1419     $self->debugobj->txn_commit()
1420       if ($self->debug);
1421     $self->_dbh_commit;
1422     $self->{transaction_depth} = 0
1423       if $self->_dbh_autocommit;
1424   }
1425   elsif($self->{transaction_depth} > 1) {
1426     $self->{transaction_depth}--;
1427     $self->svp_release
1428       if $self->auto_savepoint;
1429   }
1430 }
1431
1432 sub _dbh_commit {
1433   my $self = shift;
1434   my $dbh  = $self->_dbh
1435     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1436   $dbh->commit;
1437 }
1438
1439 sub txn_rollback {
1440   my $self = shift;
1441   my $dbh = $self->_dbh;
1442   try {
1443     if ($self->{transaction_depth} == 1) {
1444       $self->debugobj->txn_rollback()
1445         if ($self->debug);
1446       $self->{transaction_depth} = 0
1447         if $self->_dbh_autocommit;
1448       $self->_dbh_rollback;
1449     }
1450     elsif($self->{transaction_depth} > 1) {
1451       $self->{transaction_depth}--;
1452       if ($self->auto_savepoint) {
1453         $self->svp_rollback;
1454         $self->svp_release;
1455       }
1456     }
1457     else {
1458       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1459     }
1460   }
1461   catch {
1462     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1463
1464     if ($_ !~ /$exception_class/) {
1465       # ensure that a failed rollback resets the transaction depth
1466       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1467     }
1468
1469     $self->throw_exception($_)
1470   };
1471 }
1472
1473 sub _dbh_rollback {
1474   my $self = shift;
1475   my $dbh  = $self->_dbh
1476     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1477   $dbh->rollback;
1478 }
1479
1480 # This used to be the top-half of _execute.  It was split out to make it
1481 #  easier to override in NoBindVars without duping the rest.  It takes up
1482 #  all of _execute's args, and emits $sql, @bind.
1483 sub _prep_for_execute {
1484   my ($self, $op, $extra_bind, $ident, $args) = @_;
1485
1486   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1487     $ident = $ident->from();
1488   }
1489
1490   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1491
1492   unshift(@bind,
1493     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1494       if $extra_bind;
1495   return ($sql, \@bind);
1496 }
1497
1498
1499 sub _fix_bind_params {
1500     my ($self, @bind) = @_;
1501
1502     ### Turn @bind from something like this:
1503     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1504     ### to this:
1505     ###   ( "'1'", "'1'", "'3'" )
1506     return
1507         map {
1508             if ( defined( $_ && $_->[1] ) ) {
1509                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1510             }
1511             else { q{'NULL'}; }
1512         } @bind;
1513 }
1514
1515 sub _query_start {
1516     my ( $self, $sql, @bind ) = @_;
1517
1518     if ( $self->debug ) {
1519         @bind = $self->_fix_bind_params(@bind);
1520
1521         $self->debugobj->query_start( $sql, @bind );
1522     }
1523 }
1524
1525 sub _query_end {
1526     my ( $self, $sql, @bind ) = @_;
1527
1528     if ( $self->debug ) {
1529         @bind = $self->_fix_bind_params(@bind);
1530         $self->debugobj->query_end( $sql, @bind );
1531     }
1532 }
1533
1534 sub _dbh_execute {
1535   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1536
1537   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1538
1539   $self->_query_start( $sql, @$bind );
1540
1541   my $sth = $self->sth($sql,$op);
1542
1543   my $placeholder_index = 1;
1544
1545   foreach my $bound (@$bind) {
1546     my $attributes = {};
1547     my($column_name, @data) = @$bound;
1548
1549     if ($bind_attributes) {
1550       $attributes = $bind_attributes->{$column_name}
1551       if defined $bind_attributes->{$column_name};
1552     }
1553
1554     foreach my $data (@data) {
1555       my $ref = ref $data;
1556       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1557
1558       $sth->bind_param($placeholder_index, $data, $attributes);
1559       $placeholder_index++;
1560     }
1561   }
1562
1563   # Can this fail without throwing an exception anyways???
1564   my $rv = $sth->execute();
1565   $self->throw_exception(
1566     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1567   ) if !$rv;
1568
1569   $self->_query_end( $sql, @$bind );
1570
1571   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1572 }
1573
1574 sub _execute {
1575     my $self = shift;
1576     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1577 }
1578
1579 sub _prefetch_insert_auto_nextvals {
1580   my ($self, $source, $to_insert) = @_;
1581
1582   my $upd = {};
1583
1584   foreach my $col ( $source->columns ) {
1585     if ( !defined $to_insert->{$col} ) {
1586       my $col_info = $source->column_info($col);
1587
1588       if ( $col_info->{auto_nextval} ) {
1589         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1590           'nextval',
1591           $col_info->{sequence} ||=
1592             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1593         );
1594       }
1595     }
1596   }
1597
1598   return $upd;
1599 }
1600
1601 sub insert {
1602   my $self = shift;
1603   my ($source, $to_insert, $opts) = @_;
1604
1605   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1606
1607   my $bind_attributes = $self->source_bind_attributes($source);
1608
1609   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1610
1611   if ($opts->{returning}) {
1612     my @ret_cols = @{$opts->{returning}};
1613
1614     my @ret_vals = try {
1615       local $SIG{__WARN__} = sub {};
1616       my @r = $sth->fetchrow_array;
1617       $sth->finish;
1618       @r;
1619     };
1620
1621     my %ret;
1622     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1623
1624     $updated_cols = {
1625       %$updated_cols,
1626       %ret,
1627     };
1628   }
1629
1630   return $updated_cols;
1631 }
1632
1633 ## Currently it is assumed that all values passed will be "normal", i.e. not
1634 ## scalar refs, or at least, all the same type as the first set, the statement is
1635 ## only prepped once.
1636 sub insert_bulk {
1637   my ($self, $source, $cols, $data) = @_;
1638
1639   my %colvalues;
1640   @colvalues{@$cols} = (0..$#$cols);
1641
1642   for my $i (0..$#$cols) {
1643     my $first_val = $data->[0][$i];
1644     next unless ref $first_val eq 'SCALAR';
1645
1646     $colvalues{ $cols->[$i] } = $first_val;
1647   }
1648
1649   # check for bad data and stringify stringifiable objects
1650   my $bad_slice = sub {
1651     my ($msg, $col_idx, $slice_idx) = @_;
1652     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1653       $msg,
1654       $cols->[$col_idx],
1655       do {
1656         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1657         Dumper {
1658           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1659         },
1660       }
1661     );
1662   };
1663
1664   for my $datum_idx (0..$#$data) {
1665     my $datum = $data->[$datum_idx];
1666
1667     for my $col_idx (0..$#$cols) {
1668       my $val            = $datum->[$col_idx];
1669       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1670       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1671
1672       if ($is_literal_sql) {
1673         if (not ref $val) {
1674           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1675         }
1676         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1677           $bad_slice->("$reftype reference found where literal SQL expected",
1678             $col_idx, $datum_idx);
1679         }
1680         elsif ($$val ne $$sqla_bind){
1681           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1682             $col_idx, $datum_idx);
1683         }
1684       }
1685       elsif (my $reftype = ref $val) {
1686         require overload;
1687         if (overload::Method($val, '""')) {
1688           $datum->[$col_idx] = "".$val;
1689         }
1690         else {
1691           $bad_slice->("$reftype reference found where bind expected",
1692             $col_idx, $datum_idx);
1693         }
1694       }
1695     }
1696   }
1697
1698   my ($sql, $bind) = $self->_prep_for_execute (
1699     'insert', undef, $source, [\%colvalues]
1700   );
1701   my @bind = @$bind;
1702
1703   my $empty_bind = 1 if (not @bind) &&
1704     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1705
1706   if ((not @bind) && (not $empty_bind)) {
1707     $self->throw_exception(
1708       'Cannot insert_bulk without support for placeholders'
1709     );
1710   }
1711
1712   # neither _execute_array, nor _execute_inserts_with_no_binds are
1713   # atomic (even if _execute _array is a single call). Thus a safety
1714   # scope guard
1715   my $guard = $self->txn_scope_guard;
1716
1717   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1718   my $sth = $self->sth($sql);
1719   my $rv = do {
1720     if ($empty_bind) {
1721       # bind_param_array doesn't work if there are no binds
1722       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1723     }
1724     else {
1725 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1726       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1727     }
1728   };
1729
1730   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1731
1732   $guard->commit;
1733
1734   return (wantarray ? ($rv, $sth, @bind) : $rv);
1735 }
1736
1737 sub _execute_array {
1738   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1739
1740   ## This must be an arrayref, else nothing works!
1741   my $tuple_status = [];
1742
1743   ## Get the bind_attributes, if any exist
1744   my $bind_attributes = $self->source_bind_attributes($source);
1745
1746   ## Bind the values and execute
1747   my $placeholder_index = 1;
1748
1749   foreach my $bound (@$bind) {
1750
1751     my $attributes = {};
1752     my ($column_name, $data_index) = @$bound;
1753
1754     if( $bind_attributes ) {
1755       $attributes = $bind_attributes->{$column_name}
1756       if defined $bind_attributes->{$column_name};
1757     }
1758
1759     my @data = map { $_->[$data_index] } @$data;
1760
1761     $sth->bind_param_array(
1762       $placeholder_index,
1763       [@data],
1764       (%$attributes ?  $attributes : ()),
1765     );
1766     $placeholder_index++;
1767   }
1768
1769   my ($rv, $err);
1770   try {
1771     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1772   }
1773   catch {
1774     $err = shift;
1775   }
1776   finally {
1777     # Statement must finish even if there was an exception.
1778     try {
1779       $sth->finish
1780     }
1781     catch {
1782       $err = shift unless defined $err
1783     };
1784   };
1785
1786   $err = $sth->errstr
1787     if (! defined $err and $sth->err);
1788
1789   if (defined $err) {
1790     my $i = 0;
1791     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1792
1793     $self->throw_exception("Unexpected populate error: $err")
1794       if ($i > $#$tuple_status);
1795
1796     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1797       ($tuple_status->[$i][1] || $err),
1798       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1799     );
1800   }
1801
1802   return $rv;
1803 }
1804
1805 sub _dbh_execute_array {
1806     my ($self, $sth, $tuple_status, @extra) = @_;
1807
1808     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1809 }
1810
1811 sub _dbh_execute_inserts_with_no_binds {
1812   my ($self, $sth, $count) = @_;
1813
1814   my $err;
1815   try {
1816     my $dbh = $self->_get_dbh;
1817     local $dbh->{RaiseError} = 1;
1818     local $dbh->{PrintError} = 0;
1819
1820     $sth->execute foreach 1..$count;
1821   }
1822   catch {
1823     $err = shift;
1824   }
1825   finally {
1826     # Make sure statement is finished even if there was an exception.
1827     try {
1828       $sth->finish
1829     }
1830     catch {
1831       $err = shift unless defined $err;
1832     };
1833   };
1834
1835   $self->throw_exception($err) if defined $err;
1836
1837   return $count;
1838 }
1839
1840 sub update {
1841   my ($self, $source, @args) = @_;
1842
1843   my $bind_attrs = $self->source_bind_attributes($source);
1844
1845   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1846 }
1847
1848
1849 sub delete {
1850   my ($self, $source, @args) = @_;
1851
1852   my $bind_attrs = $self->source_bind_attributes($source);
1853
1854   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1855 }
1856
1857 # We were sent here because the $rs contains a complex search
1858 # which will require a subquery to select the correct rows
1859 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1860 #
1861 # Generating a single PK column subquery is trivial and supported
1862 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1863 # Look at _multipk_update_delete()
1864 sub _subq_update_delete {
1865   my $self = shift;
1866   my ($rs, $op, $values) = @_;
1867
1868   my $rsrc = $rs->result_source;
1869
1870   # quick check if we got a sane rs on our hands
1871   my @pcols = $rsrc->_pri_cols;
1872
1873   my $sel = $rs->_resolved_attrs->{select};
1874   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1875
1876   if (
1877       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1878         ne
1879       join ("\x00", sort @$sel )
1880   ) {
1881     $self->throw_exception (
1882       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1883     );
1884   }
1885
1886   if (@pcols == 1) {
1887     return $self->$op (
1888       $rsrc,
1889       $op eq 'update' ? $values : (),
1890       { $pcols[0] => { -in => $rs->as_query } },
1891     );
1892   }
1893
1894   else {
1895     return $self->_multipk_update_delete (@_);
1896   }
1897 }
1898
1899 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1900 # resultset update/delete involving subqueries. So by default resort
1901 # to simple (and inefficient) delete_all style per-row opearations,
1902 # while allowing specific storages to override this with a faster
1903 # implementation.
1904 #
1905 sub _multipk_update_delete {
1906   return shift->_per_row_update_delete (@_);
1907 }
1908
1909 # This is the default loop used to delete/update rows for multi PK
1910 # resultsets, and used by mysql exclusively (because it can't do anything
1911 # else).
1912 #
1913 # We do not use $row->$op style queries, because resultset update/delete
1914 # is not expected to cascade (this is what delete_all/update_all is for).
1915 #
1916 # There should be no race conditions as the entire operation is rolled
1917 # in a transaction.
1918 #
1919 sub _per_row_update_delete {
1920   my $self = shift;
1921   my ($rs, $op, $values) = @_;
1922
1923   my $rsrc = $rs->result_source;
1924   my @pcols = $rsrc->_pri_cols;
1925
1926   my $guard = $self->txn_scope_guard;
1927
1928   # emulate the return value of $sth->execute for non-selects
1929   my $row_cnt = '0E0';
1930
1931   my $subrs_cur = $rs->cursor;
1932   my @all_pk = $subrs_cur->all;
1933   for my $pks ( @all_pk) {
1934
1935     my $cond;
1936     for my $i (0.. $#pcols) {
1937       $cond->{$pcols[$i]} = $pks->[$i];
1938     }
1939
1940     $self->$op (
1941       $rsrc,
1942       $op eq 'update' ? $values : (),
1943       $cond,
1944     );
1945
1946     $row_cnt++;
1947   }
1948
1949   $guard->commit;
1950
1951   return $row_cnt;
1952 }
1953
1954 sub _select {
1955   my $self = shift;
1956   $self->_execute($self->_select_args(@_));
1957 }
1958
1959 sub _select_args_to_query {
1960   my $self = shift;
1961
1962   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1963   #  = $self->_select_args($ident, $select, $cond, $attrs);
1964   my ($op, $bind, $ident, $bind_attrs, @args) =
1965     $self->_select_args(@_);
1966
1967   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1968   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1969   $prepared_bind ||= [];
1970
1971   return wantarray
1972     ? ($sql, $prepared_bind, $bind_attrs)
1973     : \[ "($sql)", @$prepared_bind ]
1974   ;
1975 }
1976
1977 sub _select_args {
1978   my ($self, $ident, $select, $where, $attrs) = @_;
1979
1980   my $sql_maker = $self->sql_maker;
1981   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1982
1983   $attrs = {
1984     %$attrs,
1985     select => $select,
1986     from => $ident,
1987     where => $where,
1988     $rs_alias && $alias2source->{$rs_alias}
1989       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1990       : ()
1991     ,
1992   };
1993
1994   # calculate bind_attrs before possible $ident mangling
1995   my $bind_attrs = {};
1996   for my $alias (keys %$alias2source) {
1997     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1998     for my $col (keys %$bindtypes) {
1999
2000       my $fqcn = join ('.', $alias, $col);
2001       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2002
2003       # Unqialified column names are nice, but at the same time can be
2004       # rather ambiguous. What we do here is basically go along with
2005       # the loop, adding an unqualified column slot to $bind_attrs,
2006       # alongside the fully qualified name. As soon as we encounter
2007       # another column by that name (which would imply another table)
2008       # we unset the unqualified slot and never add any info to it
2009       # to avoid erroneous type binding. If this happens the users
2010       # only choice will be to fully qualify his column name
2011
2012       if (exists $bind_attrs->{$col}) {
2013         $bind_attrs->{$col} = {};
2014       }
2015       else {
2016         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2017       }
2018     }
2019   }
2020
2021   # Sanity check the attributes (SQLMaker does it too, but
2022   # in case of a software_limit we'll never reach there)
2023   if (defined $attrs->{offset}) {
2024     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2025       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2026   }
2027   $attrs->{offset} ||= 0;
2028
2029   if (defined $attrs->{rows}) {
2030     $self->throw_exception("The rows attribute must be a positive integer if present")
2031       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2032   }
2033   elsif ($attrs->{offset}) {
2034     # MySQL actually recommends this approach.  I cringe.
2035     $attrs->{rows} = $sql_maker->__max_int;
2036   }
2037
2038   my @limit;
2039
2040   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2041   # storage, unless software limit was requested
2042   if (
2043     #limited has_many
2044     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2045        ||
2046     # grouped prefetch (to satisfy group_by == select)
2047     ( $attrs->{group_by}
2048         &&
2049       @{$attrs->{group_by}}
2050         &&
2051       $attrs->{_prefetch_select}
2052         &&
2053       @{$attrs->{_prefetch_select}}
2054     )
2055   ) {
2056     ($ident, $select, $where, $attrs)
2057       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2058   }
2059   elsif (! $attrs->{software_limit} ) {
2060     push @limit, $attrs->{rows}, $attrs->{offset};
2061   }
2062
2063   # try to simplify the joinmap further (prune unreferenced type-single joins)
2064   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2065
2066 ###
2067   # This would be the point to deflate anything found in $where
2068   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2069   # expect a row object. And all we have is a resultsource (it is trivial
2070   # to extract deflator coderefs via $alias2source above).
2071   #
2072   # I don't see a way forward other than changing the way deflators are
2073   # invoked, and that's just bad...
2074 ###
2075
2076   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2077 }
2078
2079 # Returns a counting SELECT for a simple count
2080 # query. Abstracted so that a storage could override
2081 # this to { count => 'firstcol' } or whatever makes
2082 # sense as a performance optimization
2083 sub _count_select {
2084   #my ($self, $source, $rs_attrs) = @_;
2085   return { count => '*' };
2086 }
2087
2088
2089 sub source_bind_attributes {
2090   my ($self, $source) = @_;
2091
2092   my $bind_attributes;
2093   foreach my $column ($source->columns) {
2094
2095     my $data_type = $source->column_info($column)->{data_type} || '';
2096     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2097      if $data_type;
2098   }
2099
2100   return $bind_attributes;
2101 }
2102
2103 =head2 select
2104
2105 =over 4
2106
2107 =item Arguments: $ident, $select, $condition, $attrs
2108
2109 =back
2110
2111 Handle a SQL select statement.
2112
2113 =cut
2114
2115 sub select {
2116   my $self = shift;
2117   my ($ident, $select, $condition, $attrs) = @_;
2118   return $self->cursor_class->new($self, \@_, $attrs);
2119 }
2120
2121 sub select_single {
2122   my $self = shift;
2123   my ($rv, $sth, @bind) = $self->_select(@_);
2124   my @row = $sth->fetchrow_array;
2125   my @nextrow = $sth->fetchrow_array if @row;
2126   if(@row && @nextrow) {
2127     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2128   }
2129   # Need to call finish() to work round broken DBDs
2130   $sth->finish();
2131   return @row;
2132 }
2133
2134 =head2 sql_limit_dialect
2135
2136 This is an accessor for the default SQL limit dialect used by a particular
2137 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2138 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2139 see L<DBIx::Class::SQLMaker::LimitDialects>.
2140
2141 =head2 sth
2142
2143 =over 4
2144
2145 =item Arguments: $sql
2146
2147 =back
2148
2149 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2150
2151 =cut
2152
2153 sub _dbh_sth {
2154   my ($self, $dbh, $sql) = @_;
2155
2156   # 3 is the if_active parameter which avoids active sth re-use
2157   my $sth = $self->disable_sth_caching
2158     ? $dbh->prepare($sql)
2159     : $dbh->prepare_cached($sql, {}, 3);
2160
2161   # XXX You would think RaiseError would make this impossible,
2162   #  but apparently that's not true :(
2163   $self->throw_exception($dbh->errstr) if !$sth;
2164
2165   $sth;
2166 }
2167
2168 sub sth {
2169   my ($self, $sql) = @_;
2170   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2171 }
2172
2173 sub _dbh_columns_info_for {
2174   my ($self, $dbh, $table) = @_;
2175
2176   if ($dbh->can('column_info')) {
2177     my %result;
2178     my $caught;
2179     try {
2180       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2181       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2182       $sth->execute();
2183       while ( my $info = $sth->fetchrow_hashref() ){
2184         my %column_info;
2185         $column_info{data_type}   = $info->{TYPE_NAME};
2186         $column_info{size}      = $info->{COLUMN_SIZE};
2187         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2188         $column_info{default_value} = $info->{COLUMN_DEF};
2189         my $col_name = $info->{COLUMN_NAME};
2190         $col_name =~ s/^\"(.*)\"$/$1/;
2191
2192         $result{$col_name} = \%column_info;
2193       }
2194     } catch {
2195       $caught = 1;
2196     };
2197     return \%result if !$caught && scalar keys %result;
2198   }
2199
2200   my %result;
2201   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2202   $sth->execute;
2203   my @columns = @{$sth->{NAME_lc}};
2204   for my $i ( 0 .. $#columns ){
2205     my %column_info;
2206     $column_info{data_type} = $sth->{TYPE}->[$i];
2207     $column_info{size} = $sth->{PRECISION}->[$i];
2208     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2209
2210     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2211       $column_info{data_type} = $1;
2212       $column_info{size}    = $2;
2213     }
2214
2215     $result{$columns[$i]} = \%column_info;
2216   }
2217   $sth->finish;
2218
2219   foreach my $col (keys %result) {
2220     my $colinfo = $result{$col};
2221     my $type_num = $colinfo->{data_type};
2222     my $type_name;
2223     if(defined $type_num && $dbh->can('type_info')) {
2224       my $type_info = $dbh->type_info($type_num);
2225       $type_name = $type_info->{TYPE_NAME} if $type_info;
2226       $colinfo->{data_type} = $type_name if $type_name;
2227     }
2228   }
2229
2230   return \%result;
2231 }
2232
2233 sub columns_info_for {
2234   my ($self, $table) = @_;
2235   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2236 }
2237
2238 =head2 last_insert_id
2239
2240 Return the row id of the last insert.
2241
2242 =cut
2243
2244 sub _dbh_last_insert_id {
2245     my ($self, $dbh, $source, $col) = @_;
2246
2247     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2248
2249     return $id if defined $id;
2250
2251     my $class = ref $self;
2252     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2253 }
2254
2255 sub last_insert_id {
2256   my $self = shift;
2257   $self->_dbh_last_insert_id ($self->_dbh, @_);
2258 }
2259
2260 =head2 _native_data_type
2261
2262 =over 4
2263
2264 =item Arguments: $type_name
2265
2266 =back
2267
2268 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2269 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2270 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2271
2272 The default implementation returns C<undef>, implement in your Storage driver if
2273 you need this functionality.
2274
2275 Should map types from other databases to the native RDBMS type, for example
2276 C<VARCHAR2> to C<VARCHAR>.
2277
2278 Types with modifiers should map to the underlying data type. For example,
2279 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2280
2281 Composite types should map to the container type, for example
2282 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2283
2284 =cut
2285
2286 sub _native_data_type {
2287   #my ($self, $data_type) = @_;
2288   return undef
2289 }
2290
2291 # Check if placeholders are supported at all
2292 sub _determine_supports_placeholders {
2293   my $self = shift;
2294   my $dbh  = $self->_get_dbh;
2295
2296   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2297   # but it is inaccurate more often than not
2298   return try {
2299     local $dbh->{PrintError} = 0;
2300     local $dbh->{RaiseError} = 1;
2301     $dbh->do('select ?', {}, 1);
2302     1;
2303   }
2304   catch {
2305     0;
2306   };
2307 }
2308
2309 # Check if placeholders bound to non-string types throw exceptions
2310 #
2311 sub _determine_supports_typeless_placeholders {
2312   my $self = shift;
2313   my $dbh  = $self->_get_dbh;
2314
2315   return try {
2316     local $dbh->{PrintError} = 0;
2317     local $dbh->{RaiseError} = 1;
2318     # this specifically tests a bind that is NOT a string
2319     $dbh->do('select 1 where 1 = ?', {}, 1);
2320     1;
2321   }
2322   catch {
2323     0;
2324   };
2325 }
2326
2327 =head2 sqlt_type
2328
2329 Returns the database driver name.
2330
2331 =cut
2332
2333 sub sqlt_type {
2334   shift->_get_dbh->{Driver}->{Name};
2335 }
2336
2337 =head2 bind_attribute_by_data_type
2338
2339 Given a datatype from column info, returns a database specific bind
2340 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2341 let the database planner just handle it.
2342
2343 Generally only needed for special case column types, like bytea in postgres.
2344
2345 =cut
2346
2347 sub bind_attribute_by_data_type {
2348     return;
2349 }
2350
2351 =head2 is_datatype_numeric
2352
2353 Given a datatype from column_info, returns a boolean value indicating if
2354 the current RDBMS considers it a numeric value. This controls how
2355 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2356 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2357 be performed instead of the usual C<eq>.
2358
2359 =cut
2360
2361 sub is_datatype_numeric {
2362   my ($self, $dt) = @_;
2363
2364   return 0 unless $dt;
2365
2366   return $dt =~ /^ (?:
2367     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2368   ) $/ix;
2369 }
2370
2371
2372 =head2 create_ddl_dir
2373
2374 =over 4
2375
2376 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2377
2378 =back
2379
2380 Creates a SQL file based on the Schema, for each of the specified
2381 database engines in C<\@databases> in the given directory.
2382 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2383
2384 Given a previous version number, this will also create a file containing
2385 the ALTER TABLE statements to transform the previous schema into the
2386 current one. Note that these statements may contain C<DROP TABLE> or
2387 C<DROP COLUMN> statements that can potentially destroy data.
2388
2389 The file names are created using the C<ddl_filename> method below, please
2390 override this method in your schema if you would like a different file
2391 name format. For the ALTER file, the same format is used, replacing
2392 $version in the name with "$preversion-$version".
2393
2394 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2395 The most common value for this would be C<< { add_drop_table => 1 } >>
2396 to have the SQL produced include a C<DROP TABLE> statement for each table
2397 created. For quoting purposes supply C<quote_table_names> and
2398 C<quote_field_names>.
2399
2400 If no arguments are passed, then the following default values are assumed:
2401
2402 =over 4
2403
2404 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2405
2406 =item version    - $schema->schema_version
2407
2408 =item directory  - './'
2409
2410 =item preversion - <none>
2411
2412 =back
2413
2414 By default, C<\%sqlt_args> will have
2415
2416  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2417
2418 merged with the hash passed in. To disable any of those features, pass in a
2419 hashref like the following
2420
2421  { ignore_constraint_names => 0, # ... other options }
2422
2423
2424 WARNING: You are strongly advised to check all SQL files created, before applying
2425 them.
2426
2427 =cut
2428
2429 sub create_ddl_dir {
2430   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2431
2432   unless ($dir) {
2433     carp "No directory given, using ./\n";
2434     $dir = './';
2435   } else {
2436       -d $dir
2437         or
2438       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2439         or
2440       $self->throw_exception(
2441         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2442       );
2443   }
2444
2445   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2446
2447   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2448   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2449
2450   my $schema_version = $schema->schema_version || '1.x';
2451   $version ||= $schema_version;
2452
2453   $sqltargs = {
2454     add_drop_table => 1,
2455     ignore_constraint_names => 1,
2456     ignore_index_names => 1,
2457     %{$sqltargs || {}}
2458   };
2459
2460   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2461     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2462   }
2463
2464   my $sqlt = SQL::Translator->new( $sqltargs );
2465
2466   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2467   my $sqlt_schema = $sqlt->translate({ data => $schema })
2468     or $self->throw_exception ($sqlt->error);
2469
2470   foreach my $db (@$databases) {
2471     $sqlt->reset();
2472     $sqlt->{schema} = $sqlt_schema;
2473     $sqlt->producer($db);
2474
2475     my $file;
2476     my $filename = $schema->ddl_filename($db, $version, $dir);
2477     if (-e $filename && ($version eq $schema_version )) {
2478       # if we are dumping the current version, overwrite the DDL
2479       carp "Overwriting existing DDL file - $filename";
2480       unlink($filename);
2481     }
2482
2483     my $output = $sqlt->translate;
2484     if(!$output) {
2485       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2486       next;
2487     }
2488     if(!open($file, ">$filename")) {
2489       $self->throw_exception("Can't open $filename for writing ($!)");
2490       next;
2491     }
2492     print $file $output;
2493     close($file);
2494
2495     next unless ($preversion);
2496
2497     require SQL::Translator::Diff;
2498
2499     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2500     if(!-e $prefilename) {
2501       carp("No previous schema file found ($prefilename)");
2502       next;
2503     }
2504
2505     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2506     if(-e $difffile) {
2507       carp("Overwriting existing diff file - $difffile");
2508       unlink($difffile);
2509     }
2510
2511     my $source_schema;
2512     {
2513       my $t = SQL::Translator->new($sqltargs);
2514       $t->debug( 0 );
2515       $t->trace( 0 );
2516
2517       $t->parser( $db )
2518         or $self->throw_exception ($t->error);
2519
2520       my $out = $t->translate( $prefilename )
2521         or $self->throw_exception ($t->error);
2522
2523       $source_schema = $t->schema;
2524
2525       $source_schema->name( $prefilename )
2526         unless ( $source_schema->name );
2527     }
2528
2529     # The "new" style of producers have sane normalization and can support
2530     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2531     # And we have to diff parsed SQL against parsed SQL.
2532     my $dest_schema = $sqlt_schema;
2533
2534     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2535       my $t = SQL::Translator->new($sqltargs);
2536       $t->debug( 0 );
2537       $t->trace( 0 );
2538
2539       $t->parser( $db )
2540         or $self->throw_exception ($t->error);
2541
2542       my $out = $t->translate( $filename )
2543         or $self->throw_exception ($t->error);
2544
2545       $dest_schema = $t->schema;
2546
2547       $dest_schema->name( $filename )
2548         unless $dest_schema->name;
2549     }
2550
2551     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2552                                                   $dest_schema,   $db,
2553                                                   $sqltargs
2554                                                  );
2555     if(!open $file, ">$difffile") {
2556       $self->throw_exception("Can't write to $difffile ($!)");
2557       next;
2558     }
2559     print $file $diff;
2560     close($file);
2561   }
2562 }
2563
2564 =head2 deployment_statements
2565
2566 =over 4
2567
2568 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2569
2570 =back
2571
2572 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2573
2574 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2575 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2576
2577 C<$directory> is used to return statements from files in a previously created
2578 L</create_ddl_dir> directory and is optional. The filenames are constructed
2579 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2580
2581 If no C<$directory> is specified then the statements are constructed on the
2582 fly using L<SQL::Translator> and C<$version> is ignored.
2583
2584 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2585
2586 =cut
2587
2588 sub deployment_statements {
2589   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2590   $type ||= $self->sqlt_type;
2591   $version ||= $schema->schema_version || '1.x';
2592   $dir ||= './';
2593   my $filename = $schema->ddl_filename($type, $version, $dir);
2594   if(-f $filename)
2595   {
2596       my $file;
2597       open($file, "<$filename")
2598         or $self->throw_exception("Can't open $filename ($!)");
2599       my @rows = <$file>;
2600       close($file);
2601       return join('', @rows);
2602   }
2603
2604   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2605     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2606   }
2607
2608   # sources needs to be a parser arg, but for simplicty allow at top level
2609   # coming in
2610   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2611       if exists $sqltargs->{sources};
2612
2613   my $tr = SQL::Translator->new(
2614     producer => "SQL::Translator::Producer::${type}",
2615     %$sqltargs,
2616     parser => 'SQL::Translator::Parser::DBIx::Class',
2617     data => $schema,
2618   );
2619
2620   my @ret;
2621   my $wa = wantarray;
2622   if ($wa) {
2623     @ret = $tr->translate;
2624   }
2625   else {
2626     $ret[0] = $tr->translate;
2627   }
2628
2629   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2630     unless (@ret && defined $ret[0]);
2631
2632   return $wa ? @ret : $ret[0];
2633 }
2634
2635 sub deploy {
2636   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2637   my $deploy = sub {
2638     my $line = shift;
2639     return if($line =~ /^--/);
2640     return if(!$line);
2641     # next if($line =~ /^DROP/m);
2642     return if($line =~ /^BEGIN TRANSACTION/m);
2643     return if($line =~ /^COMMIT/m);
2644     return if $line =~ /^\s+$/; # skip whitespace only
2645     $self->_query_start($line);
2646     try {
2647       # do a dbh_do cycle here, as we need some error checking in
2648       # place (even though we will ignore errors)
2649       $self->dbh_do (sub { $_[1]->do($line) });
2650     } catch {
2651       carp qq{$_ (running "${line}")};
2652     };
2653     $self->_query_end($line);
2654   };
2655   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2656   if (@statements > 1) {
2657     foreach my $statement (@statements) {
2658       $deploy->( $statement );
2659     }
2660   }
2661   elsif (@statements == 1) {
2662     foreach my $line ( split(";\n", $statements[0])) {
2663       $deploy->( $line );
2664     }
2665   }
2666 }
2667
2668 =head2 datetime_parser
2669
2670 Returns the datetime parser class
2671
2672 =cut
2673
2674 sub datetime_parser {
2675   my $self = shift;
2676   return $self->{datetime_parser} ||= do {
2677     $self->build_datetime_parser(@_);
2678   };
2679 }
2680
2681 =head2 datetime_parser_type
2682
2683 Defines (returns) the datetime parser class - currently hardwired to
2684 L<DateTime::Format::MySQL>
2685
2686 =cut
2687
2688 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2689
2690 =head2 build_datetime_parser
2691
2692 See L</datetime_parser>
2693
2694 =cut
2695
2696 sub build_datetime_parser {
2697   my $self = shift;
2698   my $type = $self->datetime_parser_type(@_);
2699   $self->ensure_class_loaded ($type);
2700   return $type;
2701 }
2702
2703
2704 =head2 is_replicating
2705
2706 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2707 replicate from a master database.  Default is undef, which is the result
2708 returned by databases that don't support replication.
2709
2710 =cut
2711
2712 sub is_replicating {
2713     return;
2714
2715 }
2716
2717 =head2 lag_behind_master
2718
2719 Returns a number that represents a certain amount of lag behind a master db
2720 when a given storage is replicating.  The number is database dependent, but
2721 starts at zero and increases with the amount of lag. Default in undef
2722
2723 =cut
2724
2725 sub lag_behind_master {
2726     return;
2727 }
2728
2729 =head2 relname_to_table_alias
2730
2731 =over 4
2732
2733 =item Arguments: $relname, $join_count
2734
2735 =back
2736
2737 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2738 queries.
2739
2740 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2741 way these aliases are named.
2742
2743 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2744 otherwise C<"$relname">.
2745
2746 =cut
2747
2748 sub relname_to_table_alias {
2749   my ($self, $relname, $join_count) = @_;
2750
2751   my $alias = ($join_count && $join_count > 1 ?
2752     join('_', $relname, $join_count) : $relname);
2753
2754   return $alias;
2755 }
2756
2757 1;
2758
2759 =head1 USAGE NOTES
2760
2761 =head2 DBIx::Class and AutoCommit
2762
2763 DBIx::Class can do some wonderful magic with handling exceptions,
2764 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2765 (the default) combined with C<txn_do> for transaction support.
2766
2767 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2768 in an assumed transaction between commits, and you're telling us you'd
2769 like to manage that manually.  A lot of the magic protections offered by
2770 this module will go away.  We can't protect you from exceptions due to database
2771 disconnects because we don't know anything about how to restart your
2772 transactions.  You're on your own for handling all sorts of exceptional
2773 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2774 be with raw DBI.
2775
2776
2777 =head1 AUTHORS
2778
2779 Matt S. Trout <mst@shadowcatsystems.co.uk>
2780
2781 Andy Grundman <andy@hybridized.org>
2782
2783 =head1 LICENSE
2784
2785 You may distribute this code under the same terms as Perl itself.
2786
2787 =cut