Fix mysterious ::Storage::DBI goto-shim failures on older
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } @capabilities );
60
61
62 # Each of these methods need _determine_driver called before itself
63 # in order to function reliably. This is a purely DRY optimization
64 #
65 # get_(use)_dbms_capability need to be called on the correct Storage
66 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
67 # _determine_supports_X which obv. needs a correct driver as well
68 my @rdbms_specific_methods = qw/
69   deployment_statements
70   sqlt_type
71   sql_maker
72   build_datetime_parser
73   datetime_parser_type
74
75   insert
76   insert_bulk
77   update
78   delete
79   select
80   select_single
81
82   get_use_dbms_capability
83   get_dbms_capability
84 /;
85
86 for my $meth (@rdbms_specific_methods) {
87
88   my $orig = __PACKAGE__->can ($meth)
89     or die "$meth is not a ::Storage::DBI method!";
90
91   no strict qw/refs/;
92   no warnings qw/redefine/;
93   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
94     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
95       $_[0]->_determine_driver;
96
97       # This for some reason crashes and burns on perl 5.8.1
98       # IFF the method ends up throwing an exception
99       #goto $_[0]->can ($meth);
100
101       my $cref = $_[0]->can ($meth);
102       goto $cref;
103     }
104     goto $orig;
105   };
106 }
107
108
109 =head1 NAME
110
111 DBIx::Class::Storage::DBI - DBI storage handler
112
113 =head1 SYNOPSIS
114
115   my $schema = MySchema->connect('dbi:SQLite:my.db');
116
117   $schema->storage->debug(1);
118
119   my @stuff = $schema->storage->dbh_do(
120     sub {
121       my ($storage, $dbh, @args) = @_;
122       $dbh->do("DROP TABLE authors");
123     },
124     @column_list
125   );
126
127   $schema->resultset('Book')->search({
128      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
129   });
130
131 =head1 DESCRIPTION
132
133 This class represents the connection to an RDBMS via L<DBI>.  See
134 L<DBIx::Class::Storage> for general information.  This pod only
135 documents DBI-specific methods and behaviors.
136
137 =head1 METHODS
138
139 =cut
140
141 sub new {
142   my $new = shift->next::method(@_);
143
144   $new->transaction_depth(0);
145   $new->_sql_maker_opts({});
146   $new->_dbh_details({});
147   $new->{savepoints} = [];
148   $new->{_in_dbh_do} = 0;
149   $new->{_dbh_gen} = 0;
150
151   # read below to see what this does
152   $new->_arm_global_destructor;
153
154   $new;
155 }
156
157 # This is hack to work around perl shooting stuff in random
158 # order on exit(). If we do not walk the remaining storage
159 # objects in an END block, there is a *small but real* chance
160 # of a fork()ed child to kill the parent's shared DBI handle,
161 # *before perl reaches the DESTROY in this package*
162 # Yes, it is ugly and effective.
163 {
164   my %seek_and_destroy;
165
166   sub _arm_global_destructor {
167     my $self = shift;
168     my $key = Scalar::Util::refaddr ($self);
169     $seek_and_destroy{$key} = $self;
170     Scalar::Util::weaken ($seek_and_destroy{$key});
171   }
172
173   END {
174     local $?; # just in case the DBI destructor changes it somehow
175
176     # destroy just the object if not native to this process/thread
177     $_->_preserve_foreign_dbh for (grep
178       { defined $_ }
179       values %seek_and_destroy
180     );
181   }
182 }
183
184 sub DESTROY {
185   my $self = shift;
186
187   # destroy just the object if not native to this process/thread
188   $self->_preserve_foreign_dbh;
189
190   # some databases need this to stop spewing warnings
191   if (my $dbh = $self->_dbh) {
192     try {
193       %{ $dbh->{CachedKids} } = ();
194       $dbh->disconnect;
195     };
196   }
197
198   $self->_dbh(undef);
199 }
200
201 sub _preserve_foreign_dbh {
202   my $self = shift;
203
204   return unless $self->_dbh;
205
206   $self->_verify_tid;
207
208   return unless $self->_dbh;
209
210   $self->_verify_pid;
211
212 }
213
214 # handle pid changes correctly - do not destroy parent's connection
215 sub _verify_pid {
216   my $self = shift;
217
218   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
219
220   $self->_dbh->{InactiveDestroy} = 1;
221   $self->_dbh(undef);
222   $self->{_dbh_gen}++;
223
224   return;
225 }
226
227 # very similar to above, but seems to FAIL if I set InactiveDestroy
228 sub _verify_tid {
229   my $self = shift;
230
231   if ( ! defined $self->_conn_tid ) {
232     return; # no threads
233   }
234   elsif ( $self->_conn_tid == threads->tid ) {
235     return; # same thread
236   }
237
238   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
239   $self->_dbh(undef);
240   $self->{_dbh_gen}++;
241
242   return;
243 }
244
245
246 =head2 connect_info
247
248 This method is normally called by L<DBIx::Class::Schema/connection>, which
249 encapsulates its argument list in an arrayref before passing them here.
250
251 The argument list may contain:
252
253 =over
254
255 =item *
256
257 The same 4-element argument set one would normally pass to
258 L<DBI/connect>, optionally followed by
259 L<extra attributes|/DBIx::Class specific connection attributes>
260 recognized by DBIx::Class:
261
262   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
263
264 =item *
265
266 A single code reference which returns a connected
267 L<DBI database handle|DBI/connect> optionally followed by
268 L<extra attributes|/DBIx::Class specific connection attributes> recognized
269 by DBIx::Class:
270
271   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
272
273 =item *
274
275 A single hashref with all the attributes and the dsn/user/password
276 mixed together:
277
278   $connect_info_args = [{
279     dsn => $dsn,
280     user => $user,
281     password => $pass,
282     %dbi_attributes,
283     %extra_attributes,
284   }];
285
286   $connect_info_args = [{
287     dbh_maker => sub { DBI->connect (...) },
288     %dbi_attributes,
289     %extra_attributes,
290   }];
291
292 This is particularly useful for L<Catalyst> based applications, allowing the
293 following config (L<Config::General> style):
294
295   <Model::DB>
296     schema_class   App::DB
297     <connect_info>
298       dsn          dbi:mysql:database=test
299       user         testuser
300       password     TestPass
301       AutoCommit   1
302     </connect_info>
303   </Model::DB>
304
305 The C<dsn>/C<user>/C<password> combination can be substituted by the
306 C<dbh_maker> key whose value is a coderef that returns a connected
307 L<DBI database handle|DBI/connect>
308
309 =back
310
311 Please note that the L<DBI> docs recommend that you always explicitly
312 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
313 recommends that it be set to I<1>, and that you perform transactions
314 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
315 to I<1> if you do not do explicitly set it to zero.  This is the default
316 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
317
318 =head3 DBIx::Class specific connection attributes
319
320 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
321 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
322 the following connection options. These options can be mixed in with your other
323 L<DBI> connection attributes, or placed in a separate hashref
324 (C<\%extra_attributes>) as shown above.
325
326 Every time C<connect_info> is invoked, any previous settings for
327 these options will be cleared before setting the new ones, regardless of
328 whether any options are specified in the new C<connect_info>.
329
330
331 =over
332
333 =item on_connect_do
334
335 Specifies things to do immediately after connecting or re-connecting to
336 the database.  Its value may contain:
337
338 =over
339
340 =item a scalar
341
342 This contains one SQL statement to execute.
343
344 =item an array reference
345
346 This contains SQL statements to execute in order.  Each element contains
347 a string or a code reference that returns a string.
348
349 =item a code reference
350
351 This contains some code to execute.  Unlike code references within an
352 array reference, its return value is ignored.
353
354 =back
355
356 =item on_disconnect_do
357
358 Takes arguments in the same form as L</on_connect_do> and executes them
359 immediately before disconnecting from the database.
360
361 Note, this only runs if you explicitly call L</disconnect> on the
362 storage object.
363
364 =item on_connect_call
365
366 A more generalized form of L</on_connect_do> that calls the specified
367 C<connect_call_METHOD> methods in your storage driver.
368
369   on_connect_do => 'select 1'
370
371 is equivalent to:
372
373   on_connect_call => [ [ do_sql => 'select 1' ] ]
374
375 Its values may contain:
376
377 =over
378
379 =item a scalar
380
381 Will call the C<connect_call_METHOD> method.
382
383 =item a code reference
384
385 Will execute C<< $code->($storage) >>
386
387 =item an array reference
388
389 Each value can be a method name or code reference.
390
391 =item an array of arrays
392
393 For each array, the first item is taken to be the C<connect_call_> method name
394 or code reference, and the rest are parameters to it.
395
396 =back
397
398 Some predefined storage methods you may use:
399
400 =over
401
402 =item do_sql
403
404 Executes a SQL string or a code reference that returns a SQL string. This is
405 what L</on_connect_do> and L</on_disconnect_do> use.
406
407 It can take:
408
409 =over
410
411 =item a scalar
412
413 Will execute the scalar as SQL.
414
415 =item an arrayref
416
417 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
418 attributes hashref and bind values.
419
420 =item a code reference
421
422 Will execute C<< $code->($storage) >> and execute the return array refs as
423 above.
424
425 =back
426
427 =item datetime_setup
428
429 Execute any statements necessary to initialize the database session to return
430 and accept datetime/timestamp values used with
431 L<DBIx::Class::InflateColumn::DateTime>.
432
433 Only necessary for some databases, see your specific storage driver for
434 implementation details.
435
436 =back
437
438 =item on_disconnect_call
439
440 Takes arguments in the same form as L</on_connect_call> and executes them
441 immediately before disconnecting from the database.
442
443 Calls the C<disconnect_call_METHOD> methods as opposed to the
444 C<connect_call_METHOD> methods called by L</on_connect_call>.
445
446 Note, this only runs if you explicitly call L</disconnect> on the
447 storage object.
448
449 =item disable_sth_caching
450
451 If set to a true value, this option will disable the caching of
452 statement handles via L<DBI/prepare_cached>.
453
454 =item limit_dialect
455
456 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
457 default L</sql_limit_dialect> setting of the storage (if any). For a list
458 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
459
460 =item quote_char
461
462 Specifies what characters to use to quote table and column names. If
463 you use this you will want to specify L</name_sep> as well.
464
465 C<quote_char> expects either a single character, in which case is it
466 is placed on either side of the table/column name, or an arrayref of length
467 2 in which case the table/column name is placed between the elements.
468
469 For example under MySQL you should use C<< quote_char => '`' >>, and for
470 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
471
472 =item name_sep
473
474 This only needs to be used in conjunction with C<quote_char>, and is used to
475 specify the character that separates elements (schemas, tables, columns) from
476 each other. In most cases this is simply a C<.>.
477
478 The consequences of not supplying this value is that L<SQL::Abstract>
479 will assume DBIx::Class' uses of aliases to be complete column
480 names. The output will look like I<"me.name"> when it should actually
481 be I<"me"."name">.
482
483 =item unsafe
484
485 This Storage driver normally installs its own C<HandleError>, sets
486 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
487 all database handles, including those supplied by a coderef.  It does this
488 so that it can have consistent and useful error behavior.
489
490 If you set this option to a true value, Storage will not do its usual
491 modifications to the database handle's attributes, and instead relies on
492 the settings in your connect_info DBI options (or the values you set in
493 your connection coderef, in the case that you are connecting via coderef).
494
495 Note that your custom settings can cause Storage to malfunction,
496 especially if you set a C<HandleError> handler that suppresses exceptions
497 and/or disable C<RaiseError>.
498
499 =item auto_savepoint
500
501 If this option is true, L<DBIx::Class> will use savepoints when nesting
502 transactions, making it possible to recover from failure in the inner
503 transaction without having to abort all outer transactions.
504
505 =item cursor_class
506
507 Use this argument to supply a cursor class other than the default
508 L<DBIx::Class::Storage::DBI::Cursor>.
509
510 =back
511
512 Some real-life examples of arguments to L</connect_info> and
513 L<DBIx::Class::Schema/connect>
514
515   # Simple SQLite connection
516   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
517
518   # Connect via subref
519   ->connect_info([ sub { DBI->connect(...) } ]);
520
521   # Connect via subref in hashref
522   ->connect_info([{
523     dbh_maker => sub { DBI->connect(...) },
524     on_connect_do => 'alter session ...',
525   }]);
526
527   # A bit more complicated
528   ->connect_info(
529     [
530       'dbi:Pg:dbname=foo',
531       'postgres',
532       'my_pg_password',
533       { AutoCommit => 1 },
534       { quote_char => q{"}, name_sep => q{.} },
535     ]
536   );
537
538   # Equivalent to the previous example
539   ->connect_info(
540     [
541       'dbi:Pg:dbname=foo',
542       'postgres',
543       'my_pg_password',
544       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
545     ]
546   );
547
548   # Same, but with hashref as argument
549   # See parse_connect_info for explanation
550   ->connect_info(
551     [{
552       dsn         => 'dbi:Pg:dbname=foo',
553       user        => 'postgres',
554       password    => 'my_pg_password',
555       AutoCommit  => 1,
556       quote_char  => q{"},
557       name_sep    => q{.},
558     }]
559   );
560
561   # Subref + DBIx::Class-specific connection options
562   ->connect_info(
563     [
564       sub { DBI->connect(...) },
565       {
566           quote_char => q{`},
567           name_sep => q{@},
568           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
569           disable_sth_caching => 1,
570       },
571     ]
572   );
573
574
575
576 =cut
577
578 sub connect_info {
579   my ($self, $info) = @_;
580
581   return $self->_connect_info if !$info;
582
583   $self->_connect_info($info); # copy for _connect_info
584
585   $info = $self->_normalize_connect_info($info)
586     if ref $info eq 'ARRAY';
587
588   for my $storage_opt (keys %{ $info->{storage_options} }) {
589     my $value = $info->{storage_options}{$storage_opt};
590
591     $self->$storage_opt($value);
592   }
593
594   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
595   #  the new set of options
596   $self->_sql_maker(undef);
597   $self->_sql_maker_opts({});
598
599   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
600     my $value = $info->{sql_maker_options}{$sql_maker_opt};
601
602     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
603   }
604
605   my %attrs = (
606     %{ $self->_default_dbi_connect_attributes || {} },
607     %{ $info->{attributes} || {} },
608   );
609
610   my @args = @{ $info->{arguments} };
611
612   $self->_dbi_connect_info([@args,
613     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
614
615   # FIXME - dirty:
616   # save attributes them in a separate accessor so they are always
617   # introspectable, even in case of a CODE $dbhmaker
618   $self->_dbic_connect_attributes (\%attrs);
619
620   return $self->_connect_info;
621 }
622
623 sub _normalize_connect_info {
624   my ($self, $info_arg) = @_;
625   my %info;
626
627   my @args = @$info_arg;  # take a shallow copy for further mutilation
628
629   # combine/pre-parse arguments depending on invocation style
630
631   my %attrs;
632   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
633     %attrs = %{ $args[1] || {} };
634     @args = $args[0];
635   }
636   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
637     %attrs = %{$args[0]};
638     @args = ();
639     if (my $code = delete $attrs{dbh_maker}) {
640       @args = $code;
641
642       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
643       if (@ignored) {
644         carp sprintf (
645             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
646           . "to the result of 'dbh_maker'",
647
648           join (', ', map { "'$_'" } (@ignored) ),
649         );
650       }
651     }
652     else {
653       @args = delete @attrs{qw/dsn user password/};
654     }
655   }
656   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
657     %attrs = (
658       % { $args[3] || {} },
659       % { $args[4] || {} },
660     );
661     @args = @args[0,1,2];
662   }
663
664   $info{arguments} = \@args;
665
666   my @storage_opts = grep exists $attrs{$_},
667     @storage_options, 'cursor_class';
668
669   @{ $info{storage_options} }{@storage_opts} =
670     delete @attrs{@storage_opts} if @storage_opts;
671
672   my @sql_maker_opts = grep exists $attrs{$_},
673     qw/limit_dialect quote_char name_sep/;
674
675   @{ $info{sql_maker_options} }{@sql_maker_opts} =
676     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
677
678   $info{attributes} = \%attrs if %attrs;
679
680   return \%info;
681 }
682
683 sub _default_dbi_connect_attributes {
684   return {
685     AutoCommit => 1,
686     RaiseError => 1,
687     PrintError => 0,
688   };
689 }
690
691 =head2 on_connect_do
692
693 This method is deprecated in favour of setting via L</connect_info>.
694
695 =cut
696
697 =head2 on_disconnect_do
698
699 This method is deprecated in favour of setting via L</connect_info>.
700
701 =cut
702
703 sub _parse_connect_do {
704   my ($self, $type) = @_;
705
706   my $val = $self->$type;
707   return () if not defined $val;
708
709   my @res;
710
711   if (not ref($val)) {
712     push @res, [ 'do_sql', $val ];
713   } elsif (ref($val) eq 'CODE') {
714     push @res, $val;
715   } elsif (ref($val) eq 'ARRAY') {
716     push @res, map { [ 'do_sql', $_ ] } @$val;
717   } else {
718     $self->throw_exception("Invalid type for $type: ".ref($val));
719   }
720
721   return \@res;
722 }
723
724 =head2 dbh_do
725
726 Arguments: ($subref | $method_name), @extra_coderef_args?
727
728 Execute the given $subref or $method_name using the new exception-based
729 connection management.
730
731 The first two arguments will be the storage object that C<dbh_do> was called
732 on and a database handle to use.  Any additional arguments will be passed
733 verbatim to the called subref as arguments 2 and onwards.
734
735 Using this (instead of $self->_dbh or $self->dbh) ensures correct
736 exception handling and reconnection (or failover in future subclasses).
737
738 Your subref should have no side-effects outside of the database, as
739 there is the potential for your subref to be partially double-executed
740 if the database connection was stale/dysfunctional.
741
742 Example:
743
744   my @stuff = $schema->storage->dbh_do(
745     sub {
746       my ($storage, $dbh, @cols) = @_;
747       my $cols = join(q{, }, @cols);
748       $dbh->selectrow_array("SELECT $cols FROM foo");
749     },
750     @column_list
751   );
752
753 =cut
754
755 sub dbh_do {
756   my $self = shift;
757   my $code = shift;
758
759   my $dbh = $self->_get_dbh;
760
761   return $self->$code($dbh, @_)
762     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
763
764   local $self->{_in_dbh_do} = 1;
765
766   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
767   my $args = \@_;
768   return try {
769     $self->$code ($dbh, @$args);
770   } catch {
771     $self->throw_exception($_) if $self->connected;
772
773     # We were not connected - reconnect and retry, but let any
774     #  exception fall right through this time
775     carp "Retrying $code after catching disconnected exception: $_"
776       if $ENV{DBIC_DBIRETRY_DEBUG};
777
778     $self->_populate_dbh;
779     $self->$code($self->_dbh, @$args);
780   };
781 }
782
783 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
784 # It also informs dbh_do to bypass itself while under the direction of txn_do,
785 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
786 sub txn_do {
787   my $self = shift;
788   my $coderef = shift;
789
790   ref $coderef eq 'CODE' or $self->throw_exception
791     ('$coderef must be a CODE reference');
792
793   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
794
795   local $self->{_in_dbh_do} = 1;
796
797   my @result;
798   my $want_array = wantarray;
799
800   my $tried = 0;
801   while(1) {
802     my $exception;
803
804     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
805     my $args = \@_;
806
807     try {
808       $self->_get_dbh;
809
810       $self->txn_begin;
811       if($want_array) {
812           @result = $coderef->(@$args);
813       }
814       elsif(defined $want_array) {
815           $result[0] = $coderef->(@$args);
816       }
817       else {
818           $coderef->(@$args);
819       }
820       $self->txn_commit;
821     } catch {
822       $exception = $_;
823     };
824
825     if(! defined $exception) { return $want_array ? @result : $result[0] }
826
827     if($tried++ || $self->connected) {
828       my $rollback_exception;
829       try { $self->txn_rollback } catch { $rollback_exception = shift };
830       if(defined $rollback_exception) {
831         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
832         $self->throw_exception($exception)  # propagate nested rollback
833           if $rollback_exception =~ /$exception_class/;
834
835         $self->throw_exception(
836           "Transaction aborted: ${exception}. "
837           . "Rollback failed: ${rollback_exception}"
838         );
839       }
840       $self->throw_exception($exception)
841     }
842
843     # We were not connected, and was first try - reconnect and retry
844     # via the while loop
845     carp "Retrying $coderef after catching disconnected exception: $exception"
846       if $ENV{DBIC_DBIRETRY_DEBUG};
847     $self->_populate_dbh;
848   }
849 }
850
851 =head2 disconnect
852
853 Our C<disconnect> method also performs a rollback first if the
854 database is not in C<AutoCommit> mode.
855
856 =cut
857
858 sub disconnect {
859   my ($self) = @_;
860
861   if( $self->_dbh ) {
862     my @actions;
863
864     push @actions, ( $self->on_disconnect_call || () );
865     push @actions, $self->_parse_connect_do ('on_disconnect_do');
866
867     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
868
869     $self->_dbh_rollback unless $self->_dbh_autocommit;
870
871     %{ $self->_dbh->{CachedKids} } = ();
872     $self->_dbh->disconnect;
873     $self->_dbh(undef);
874     $self->{_dbh_gen}++;
875   }
876 }
877
878 =head2 with_deferred_fk_checks
879
880 =over 4
881
882 =item Arguments: C<$coderef>
883
884 =item Return Value: The return value of $coderef
885
886 =back
887
888 Storage specific method to run the code ref with FK checks deferred or
889 in MySQL's case disabled entirely.
890
891 =cut
892
893 # Storage subclasses should override this
894 sub with_deferred_fk_checks {
895   my ($self, $sub) = @_;
896   $sub->();
897 }
898
899 =head2 connected
900
901 =over
902
903 =item Arguments: none
904
905 =item Return Value: 1|0
906
907 =back
908
909 Verifies that the current database handle is active and ready to execute
910 an SQL statement (e.g. the connection did not get stale, server is still
911 answering, etc.) This method is used internally by L</dbh>.
912
913 =cut
914
915 sub connected {
916   my $self = shift;
917   return 0 unless $self->_seems_connected;
918
919   #be on the safe side
920   local $self->_dbh->{RaiseError} = 1;
921
922   return $self->_ping;
923 }
924
925 sub _seems_connected {
926   my $self = shift;
927
928   $self->_preserve_foreign_dbh;
929
930   my $dbh = $self->_dbh
931     or return 0;
932
933   return $dbh->FETCH('Active');
934 }
935
936 sub _ping {
937   my $self = shift;
938
939   my $dbh = $self->_dbh or return 0;
940
941   return $dbh->ping;
942 }
943
944 sub ensure_connected {
945   my ($self) = @_;
946
947   unless ($self->connected) {
948     $self->_populate_dbh;
949   }
950 }
951
952 =head2 dbh
953
954 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
955 is guaranteed to be healthy by implicitly calling L</connected>, and if
956 necessary performing a reconnection before returning. Keep in mind that this
957 is very B<expensive> on some database engines. Consider using L</dbh_do>
958 instead.
959
960 =cut
961
962 sub dbh {
963   my ($self) = @_;
964
965   if (not $self->_dbh) {
966     $self->_populate_dbh;
967   } else {
968     $self->ensure_connected;
969   }
970   return $self->_dbh;
971 }
972
973 # this is the internal "get dbh or connect (don't check)" method
974 sub _get_dbh {
975   my $self = shift;
976   $self->_preserve_foreign_dbh;
977   $self->_populate_dbh unless $self->_dbh;
978   return $self->_dbh;
979 }
980
981 sub sql_maker {
982   my ($self) = @_;
983   unless ($self->_sql_maker) {
984     my $sql_maker_class = $self->sql_maker_class;
985     $self->ensure_class_loaded ($sql_maker_class);
986
987     my %opts = %{$self->_sql_maker_opts||{}};
988     my $dialect =
989       $opts{limit_dialect}
990         ||
991       $self->sql_limit_dialect
992         ||
993       do {
994         my $s_class = (ref $self) || $self;
995         carp (
996           "Your storage class ($s_class) does not set sql_limit_dialect and you "
997         . 'have not supplied an explicit limit_dialect in your connection_info. '
998         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
999         . 'databases but can be (and often is) painfully slow.'
1000         );
1001
1002         'GenericSubQ';
1003       }
1004     ;
1005
1006     $self->_sql_maker($sql_maker_class->new(
1007       bindtype=>'columns',
1008       array_datatypes => 1,
1009       limit_dialect => $dialect,
1010       %opts,
1011     ));
1012   }
1013   return $self->_sql_maker;
1014 }
1015
1016 # nothing to do by default
1017 sub _rebless {}
1018 sub _init {}
1019
1020 sub _populate_dbh {
1021   my ($self) = @_;
1022
1023   my @info = @{$self->_dbi_connect_info || []};
1024   $self->_dbh(undef); # in case ->connected failed we might get sent here
1025   $self->_dbh_details({}); # reset everything we know
1026
1027   $self->_dbh($self->_connect(@info));
1028
1029   $self->_conn_pid($$);
1030   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1031
1032   $self->_determine_driver;
1033
1034   # Always set the transaction depth on connect, since
1035   #  there is no transaction in progress by definition
1036   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1037
1038   $self->_run_connection_actions unless $self->{_in_determine_driver};
1039 }
1040
1041 sub _run_connection_actions {
1042   my $self = shift;
1043   my @actions;
1044
1045   push @actions, ( $self->on_connect_call || () );
1046   push @actions, $self->_parse_connect_do ('on_connect_do');
1047
1048   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1049 }
1050
1051
1052
1053 sub set_use_dbms_capability {
1054   $_[0]->set_inherited ($_[1], $_[2]);
1055 }
1056
1057 sub get_use_dbms_capability {
1058   my ($self, $capname) = @_;
1059
1060   my $use = $self->get_inherited ($capname);
1061   return defined $use
1062     ? $use
1063     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1064   ;
1065 }
1066
1067 sub set_dbms_capability {
1068   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1069 }
1070
1071 sub get_dbms_capability {
1072   my ($self, $capname) = @_;
1073
1074   my $cap = $self->_dbh_details->{capability}{$capname};
1075
1076   unless (defined $cap) {
1077     if (my $meth = $self->can ("_determine$capname")) {
1078       $cap = $self->$meth ? 1 : 0;
1079     }
1080     else {
1081       $cap = 0;
1082     }
1083
1084     $self->set_dbms_capability ($capname, $cap);
1085   }
1086
1087   return $cap;
1088 }
1089
1090 sub _server_info {
1091   my $self = shift;
1092
1093   my $info;
1094   unless ($info = $self->_dbh_details->{info}) {
1095
1096     $info = {};
1097
1098     my $server_version = try { $self->_get_server_version };
1099
1100     if (defined $server_version) {
1101       $info->{dbms_version} = $server_version;
1102
1103       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1104       my @verparts = split (/\./, $numeric_version);
1105       if (
1106         @verparts
1107           &&
1108         $verparts[0] <= 999
1109       ) {
1110         # consider only up to 3 version parts, iff not more than 3 digits
1111         my @use_parts;
1112         while (@verparts && @use_parts < 3) {
1113           my $p = shift @verparts;
1114           last if $p > 999;
1115           push @use_parts, $p;
1116         }
1117         push @use_parts, 0 while @use_parts < 3;
1118
1119         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1120       }
1121     }
1122
1123     $self->_dbh_details->{info} = $info;
1124   }
1125
1126   return $info;
1127 }
1128
1129 sub _get_server_version {
1130   shift->_get_dbh->get_info(18);
1131 }
1132
1133 sub _determine_driver {
1134   my ($self) = @_;
1135
1136   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1137     my $started_connected = 0;
1138     local $self->{_in_determine_driver} = 1;
1139
1140     if (ref($self) eq __PACKAGE__) {
1141       my $driver;
1142       if ($self->_dbh) { # we are connected
1143         $driver = $self->_dbh->{Driver}{Name};
1144         $started_connected = 1;
1145       } else {
1146         # if connect_info is a CODEREF, we have no choice but to connect
1147         if (ref $self->_dbi_connect_info->[0] &&
1148             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1149           $self->_populate_dbh;
1150           $driver = $self->_dbh->{Driver}{Name};
1151         }
1152         else {
1153           # try to use dsn to not require being connected, the driver may still
1154           # force a connection in _rebless to determine version
1155           # (dsn may not be supplied at all if all we do is make a mock-schema)
1156           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1157           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1158           $driver ||= $ENV{DBI_DRIVER};
1159         }
1160       }
1161
1162       if ($driver) {
1163         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1164         if ($self->load_optional_class($storage_class)) {
1165           mro::set_mro($storage_class, 'c3');
1166           bless $self, $storage_class;
1167           $self->_rebless();
1168         }
1169       }
1170     }
1171
1172     $self->_driver_determined(1);
1173
1174     $self->_init; # run driver-specific initializations
1175
1176     $self->_run_connection_actions
1177         if !$started_connected && defined $self->_dbh;
1178   }
1179 }
1180
1181 sub _do_connection_actions {
1182   my $self          = shift;
1183   my $method_prefix = shift;
1184   my $call          = shift;
1185
1186   if (not ref($call)) {
1187     my $method = $method_prefix . $call;
1188     $self->$method(@_);
1189   } elsif (ref($call) eq 'CODE') {
1190     $self->$call(@_);
1191   } elsif (ref($call) eq 'ARRAY') {
1192     if (ref($call->[0]) ne 'ARRAY') {
1193       $self->_do_connection_actions($method_prefix, $_) for @$call;
1194     } else {
1195       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1196     }
1197   } else {
1198     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1199   }
1200
1201   return $self;
1202 }
1203
1204 sub connect_call_do_sql {
1205   my $self = shift;
1206   $self->_do_query(@_);
1207 }
1208
1209 sub disconnect_call_do_sql {
1210   my $self = shift;
1211   $self->_do_query(@_);
1212 }
1213
1214 # override in db-specific backend when necessary
1215 sub connect_call_datetime_setup { 1 }
1216
1217 sub _do_query {
1218   my ($self, $action) = @_;
1219
1220   if (ref $action eq 'CODE') {
1221     $action = $action->($self);
1222     $self->_do_query($_) foreach @$action;
1223   }
1224   else {
1225     # Most debuggers expect ($sql, @bind), so we need to exclude
1226     # the attribute hash which is the second argument to $dbh->do
1227     # furthermore the bind values are usually to be presented
1228     # as named arrayref pairs, so wrap those here too
1229     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1230     my $sql = shift @do_args;
1231     my $attrs = shift @do_args;
1232     my @bind = map { [ undef, $_ ] } @do_args;
1233
1234     $self->_query_start($sql, @bind);
1235     $self->_get_dbh->do($sql, $attrs, @do_args);
1236     $self->_query_end($sql, @bind);
1237   }
1238
1239   return $self;
1240 }
1241
1242 sub _connect {
1243   my ($self, @info) = @_;
1244
1245   $self->throw_exception("You failed to provide any connection info")
1246     if !@info;
1247
1248   my ($old_connect_via, $dbh);
1249
1250   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1251     $old_connect_via = $DBI::connect_via;
1252     $DBI::connect_via = 'connect';
1253   }
1254
1255   try {
1256     if(ref $info[0] eq 'CODE') {
1257        $dbh = $info[0]->();
1258     }
1259     else {
1260        $dbh = DBI->connect(@info);
1261     }
1262
1263     if (!$dbh) {
1264       die $DBI::errstr;
1265     }
1266
1267     unless ($self->unsafe) {
1268
1269       # this odd anonymous coderef dereference is in fact really
1270       # necessary to avoid the unwanted effect described in perl5
1271       # RT#75792
1272       sub {
1273         my $weak_self = $_[0];
1274         weaken $weak_self;
1275
1276         $_[1]->{HandleError} = sub {
1277           if ($weak_self) {
1278             $weak_self->throw_exception("DBI Exception: $_[0]");
1279           }
1280           else {
1281             # the handler may be invoked by something totally out of
1282             # the scope of DBIC
1283             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1284           }
1285         };
1286       }->($self, $dbh);
1287
1288       $dbh->{ShowErrorStatement} = 1;
1289       $dbh->{RaiseError} = 1;
1290       $dbh->{PrintError} = 0;
1291     }
1292   }
1293   catch {
1294     $self->throw_exception("DBI Connection failed: $_")
1295   }
1296   finally {
1297     $DBI::connect_via = $old_connect_via if $old_connect_via;
1298   };
1299
1300   $self->_dbh_autocommit($dbh->{AutoCommit});
1301   $dbh;
1302 }
1303
1304 sub svp_begin {
1305   my ($self, $name) = @_;
1306
1307   $name = $self->_svp_generate_name
1308     unless defined $name;
1309
1310   $self->throw_exception ("You can't use savepoints outside a transaction")
1311     if $self->{transaction_depth} == 0;
1312
1313   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1314     unless $self->can('_svp_begin');
1315
1316   push @{ $self->{savepoints} }, $name;
1317
1318   $self->debugobj->svp_begin($name) if $self->debug;
1319
1320   return $self->_svp_begin($name);
1321 }
1322
1323 sub svp_release {
1324   my ($self, $name) = @_;
1325
1326   $self->throw_exception ("You can't use savepoints outside a transaction")
1327     if $self->{transaction_depth} == 0;
1328
1329   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1330     unless $self->can('_svp_release');
1331
1332   if (defined $name) {
1333     $self->throw_exception ("Savepoint '$name' does not exist")
1334       unless grep { $_ eq $name } @{ $self->{savepoints} };
1335
1336     # Dig through the stack until we find the one we are releasing.  This keeps
1337     # the stack up to date.
1338     my $svp;
1339
1340     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1341   } else {
1342     $name = pop @{ $self->{savepoints} };
1343   }
1344
1345   $self->debugobj->svp_release($name) if $self->debug;
1346
1347   return $self->_svp_release($name);
1348 }
1349
1350 sub svp_rollback {
1351   my ($self, $name) = @_;
1352
1353   $self->throw_exception ("You can't use savepoints outside a transaction")
1354     if $self->{transaction_depth} == 0;
1355
1356   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1357     unless $self->can('_svp_rollback');
1358
1359   if (defined $name) {
1360       # If they passed us a name, verify that it exists in the stack
1361       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1362           $self->throw_exception("Savepoint '$name' does not exist!");
1363       }
1364
1365       # Dig through the stack until we find the one we are releasing.  This keeps
1366       # the stack up to date.
1367       while(my $s = pop(@{ $self->{savepoints} })) {
1368           last if($s eq $name);
1369       }
1370       # Add the savepoint back to the stack, as a rollback doesn't remove the
1371       # named savepoint, only everything after it.
1372       push(@{ $self->{savepoints} }, $name);
1373   } else {
1374       # We'll assume they want to rollback to the last savepoint
1375       $name = $self->{savepoints}->[-1];
1376   }
1377
1378   $self->debugobj->svp_rollback($name) if $self->debug;
1379
1380   return $self->_svp_rollback($name);
1381 }
1382
1383 sub _svp_generate_name {
1384     my ($self) = @_;
1385
1386     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1387 }
1388
1389 sub txn_begin {
1390   my $self = shift;
1391
1392   # this means we have not yet connected and do not know the AC status
1393   # (e.g. coderef $dbh)
1394   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1395
1396   if($self->{transaction_depth} == 0) {
1397     $self->debugobj->txn_begin()
1398       if $self->debug;
1399     $self->_dbh_begin_work;
1400   }
1401   elsif ($self->auto_savepoint) {
1402     $self->svp_begin;
1403   }
1404   $self->{transaction_depth}++;
1405 }
1406
1407 sub _dbh_begin_work {
1408   my $self = shift;
1409
1410   # if the user is utilizing txn_do - good for him, otherwise we need to
1411   # ensure that the $dbh is healthy on BEGIN.
1412   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1413   # will be replaced by a failure of begin_work itself (which will be
1414   # then retried on reconnect)
1415   if ($self->{_in_dbh_do}) {
1416     $self->_dbh->begin_work;
1417   } else {
1418     $self->dbh_do(sub { $_[1]->begin_work });
1419   }
1420 }
1421
1422 sub txn_commit {
1423   my $self = shift;
1424   if ($self->{transaction_depth} == 1) {
1425     $self->debugobj->txn_commit()
1426       if ($self->debug);
1427     $self->_dbh_commit;
1428     $self->{transaction_depth} = 0
1429       if $self->_dbh_autocommit;
1430   }
1431   elsif($self->{transaction_depth} > 1) {
1432     $self->{transaction_depth}--;
1433     $self->svp_release
1434       if $self->auto_savepoint;
1435   }
1436 }
1437
1438 sub _dbh_commit {
1439   my $self = shift;
1440   my $dbh  = $self->_dbh
1441     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1442   $dbh->commit;
1443 }
1444
1445 sub txn_rollback {
1446   my $self = shift;
1447   my $dbh = $self->_dbh;
1448   try {
1449     if ($self->{transaction_depth} == 1) {
1450       $self->debugobj->txn_rollback()
1451         if ($self->debug);
1452       $self->{transaction_depth} = 0
1453         if $self->_dbh_autocommit;
1454       $self->_dbh_rollback;
1455     }
1456     elsif($self->{transaction_depth} > 1) {
1457       $self->{transaction_depth}--;
1458       if ($self->auto_savepoint) {
1459         $self->svp_rollback;
1460         $self->svp_release;
1461       }
1462     }
1463     else {
1464       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1465     }
1466   }
1467   catch {
1468     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1469
1470     if ($_ !~ /$exception_class/) {
1471       # ensure that a failed rollback resets the transaction depth
1472       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1473     }
1474
1475     $self->throw_exception($_)
1476   };
1477 }
1478
1479 sub _dbh_rollback {
1480   my $self = shift;
1481   my $dbh  = $self->_dbh
1482     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1483   $dbh->rollback;
1484 }
1485
1486 # This used to be the top-half of _execute.  It was split out to make it
1487 #  easier to override in NoBindVars without duping the rest.  It takes up
1488 #  all of _execute's args, and emits $sql, @bind.
1489 sub _prep_for_execute {
1490   my ($self, $op, $extra_bind, $ident, $args) = @_;
1491
1492   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1493     $ident = $ident->from();
1494   }
1495
1496   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1497
1498   unshift(@bind,
1499     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1500       if $extra_bind;
1501   return ($sql, \@bind);
1502 }
1503
1504
1505 sub _fix_bind_params {
1506     my ($self, @bind) = @_;
1507
1508     ### Turn @bind from something like this:
1509     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1510     ### to this:
1511     ###   ( "'1'", "'1'", "'3'" )
1512     return
1513         map {
1514             if ( defined( $_ && $_->[1] ) ) {
1515                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1516             }
1517             else { q{'NULL'}; }
1518         } @bind;
1519 }
1520
1521 sub _query_start {
1522     my ( $self, $sql, @bind ) = @_;
1523
1524     if ( $self->debug ) {
1525         @bind = $self->_fix_bind_params(@bind);
1526
1527         $self->debugobj->query_start( $sql, @bind );
1528     }
1529 }
1530
1531 sub _query_end {
1532     my ( $self, $sql, @bind ) = @_;
1533
1534     if ( $self->debug ) {
1535         @bind = $self->_fix_bind_params(@bind);
1536         $self->debugobj->query_end( $sql, @bind );
1537     }
1538 }
1539
1540 sub _dbh_execute {
1541   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1542
1543   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1544
1545   $self->_query_start( $sql, @$bind );
1546
1547   my $sth = $self->sth($sql,$op);
1548
1549   my $placeholder_index = 1;
1550
1551   foreach my $bound (@$bind) {
1552     my $attributes = {};
1553     my($column_name, @data) = @$bound;
1554
1555     if ($bind_attributes) {
1556       $attributes = $bind_attributes->{$column_name}
1557       if defined $bind_attributes->{$column_name};
1558     }
1559
1560     foreach my $data (@data) {
1561       my $ref = ref $data;
1562       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1563
1564       $sth->bind_param($placeholder_index, $data, $attributes);
1565       $placeholder_index++;
1566     }
1567   }
1568
1569   # Can this fail without throwing an exception anyways???
1570   my $rv = $sth->execute();
1571   $self->throw_exception(
1572     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1573   ) if !$rv;
1574
1575   $self->_query_end( $sql, @$bind );
1576
1577   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1578 }
1579
1580 sub _execute {
1581     my $self = shift;
1582     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1583 }
1584
1585 sub _prefetch_insert_auto_nextvals {
1586   my ($self, $source, $to_insert) = @_;
1587
1588   my $upd = {};
1589
1590   foreach my $col ( $source->columns ) {
1591     if ( !defined $to_insert->{$col} ) {
1592       my $col_info = $source->column_info($col);
1593
1594       if ( $col_info->{auto_nextval} ) {
1595         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1596           'nextval',
1597           $col_info->{sequence} ||=
1598             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1599         );
1600       }
1601     }
1602   }
1603
1604   return $upd;
1605 }
1606
1607 sub insert {
1608   my $self = shift;
1609   my ($source, $to_insert, $opts) = @_;
1610
1611   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1612
1613   my $bind_attributes = $self->source_bind_attributes($source);
1614
1615   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1616
1617   if ($opts->{returning}) {
1618     my @ret_cols = @{$opts->{returning}};
1619
1620     my @ret_vals = try {
1621       local $SIG{__WARN__} = sub {};
1622       my @r = $sth->fetchrow_array;
1623       $sth->finish;
1624       @r;
1625     };
1626
1627     my %ret;
1628     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1629
1630     $updated_cols = {
1631       %$updated_cols,
1632       %ret,
1633     };
1634   }
1635
1636   return $updated_cols;
1637 }
1638
1639 ## Currently it is assumed that all values passed will be "normal", i.e. not
1640 ## scalar refs, or at least, all the same type as the first set, the statement is
1641 ## only prepped once.
1642 sub insert_bulk {
1643   my ($self, $source, $cols, $data) = @_;
1644
1645   my %colvalues;
1646   @colvalues{@$cols} = (0..$#$cols);
1647
1648   for my $i (0..$#$cols) {
1649     my $first_val = $data->[0][$i];
1650     next unless ref $first_val eq 'SCALAR';
1651
1652     $colvalues{ $cols->[$i] } = $first_val;
1653   }
1654
1655   # check for bad data and stringify stringifiable objects
1656   my $bad_slice = sub {
1657     my ($msg, $col_idx, $slice_idx) = @_;
1658     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1659       $msg,
1660       $cols->[$col_idx],
1661       do {
1662         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1663         Dumper {
1664           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1665         },
1666       }
1667     );
1668   };
1669
1670   for my $datum_idx (0..$#$data) {
1671     my $datum = $data->[$datum_idx];
1672
1673     for my $col_idx (0..$#$cols) {
1674       my $val            = $datum->[$col_idx];
1675       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1676       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1677
1678       if ($is_literal_sql) {
1679         if (not ref $val) {
1680           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1681         }
1682         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1683           $bad_slice->("$reftype reference found where literal SQL expected",
1684             $col_idx, $datum_idx);
1685         }
1686         elsif ($$val ne $$sqla_bind){
1687           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1688             $col_idx, $datum_idx);
1689         }
1690       }
1691       elsif (my $reftype = ref $val) {
1692         require overload;
1693         if (overload::Method($val, '""')) {
1694           $datum->[$col_idx] = "".$val;
1695         }
1696         else {
1697           $bad_slice->("$reftype reference found where bind expected",
1698             $col_idx, $datum_idx);
1699         }
1700       }
1701     }
1702   }
1703
1704   my ($sql, $bind) = $self->_prep_for_execute (
1705     'insert', undef, $source, [\%colvalues]
1706   );
1707   my @bind = @$bind;
1708
1709   my $empty_bind = 1 if (not @bind) &&
1710     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1711
1712   if ((not @bind) && (not $empty_bind)) {
1713     $self->throw_exception(
1714       'Cannot insert_bulk without support for placeholders'
1715     );
1716   }
1717
1718   # neither _execute_array, nor _execute_inserts_with_no_binds are
1719   # atomic (even if _execute _array is a single call). Thus a safety
1720   # scope guard
1721   my $guard = $self->txn_scope_guard;
1722
1723   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1724   my $sth = $self->sth($sql);
1725   my $rv = do {
1726     if ($empty_bind) {
1727       # bind_param_array doesn't work if there are no binds
1728       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1729     }
1730     else {
1731 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1732       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1733     }
1734   };
1735
1736   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1737
1738   $guard->commit;
1739
1740   return (wantarray ? ($rv, $sth, @bind) : $rv);
1741 }
1742
1743 sub _execute_array {
1744   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1745
1746   ## This must be an arrayref, else nothing works!
1747   my $tuple_status = [];
1748
1749   ## Get the bind_attributes, if any exist
1750   my $bind_attributes = $self->source_bind_attributes($source);
1751
1752   ## Bind the values and execute
1753   my $placeholder_index = 1;
1754
1755   foreach my $bound (@$bind) {
1756
1757     my $attributes = {};
1758     my ($column_name, $data_index) = @$bound;
1759
1760     if( $bind_attributes ) {
1761       $attributes = $bind_attributes->{$column_name}
1762       if defined $bind_attributes->{$column_name};
1763     }
1764
1765     my @data = map { $_->[$data_index] } @$data;
1766
1767     $sth->bind_param_array(
1768       $placeholder_index,
1769       [@data],
1770       (%$attributes ?  $attributes : ()),
1771     );
1772     $placeholder_index++;
1773   }
1774
1775   my ($rv, $err);
1776   try {
1777     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1778   }
1779   catch {
1780     $err = shift;
1781   }
1782   finally {
1783     # Statement must finish even if there was an exception.
1784     try {
1785       $sth->finish
1786     }
1787     catch {
1788       $err = shift unless defined $err
1789     };
1790   };
1791
1792   $err = $sth->errstr
1793     if (! defined $err and $sth->err);
1794
1795   if (defined $err) {
1796     my $i = 0;
1797     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1798
1799     $self->throw_exception("Unexpected populate error: $err")
1800       if ($i > $#$tuple_status);
1801
1802     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1803       ($tuple_status->[$i][1] || $err),
1804       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1805     );
1806   }
1807
1808   return $rv;
1809 }
1810
1811 sub _dbh_execute_array {
1812     my ($self, $sth, $tuple_status, @extra) = @_;
1813
1814     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1815 }
1816
1817 sub _dbh_execute_inserts_with_no_binds {
1818   my ($self, $sth, $count) = @_;
1819
1820   my $err;
1821   try {
1822     my $dbh = $self->_get_dbh;
1823     local $dbh->{RaiseError} = 1;
1824     local $dbh->{PrintError} = 0;
1825
1826     $sth->execute foreach 1..$count;
1827   }
1828   catch {
1829     $err = shift;
1830   }
1831   finally {
1832     # Make sure statement is finished even if there was an exception.
1833     try {
1834       $sth->finish
1835     }
1836     catch {
1837       $err = shift unless defined $err;
1838     };
1839   };
1840
1841   $self->throw_exception($err) if defined $err;
1842
1843   return $count;
1844 }
1845
1846 sub update {
1847   my ($self, $source, @args) = @_;
1848
1849   my $bind_attrs = $self->source_bind_attributes($source);
1850
1851   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1852 }
1853
1854
1855 sub delete {
1856   my ($self, $source, @args) = @_;
1857
1858   my $bind_attrs = $self->source_bind_attributes($source);
1859
1860   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1861 }
1862
1863 # We were sent here because the $rs contains a complex search
1864 # which will require a subquery to select the correct rows
1865 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1866 #
1867 # Generating a single PK column subquery is trivial and supported
1868 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1869 # Look at _multipk_update_delete()
1870 sub _subq_update_delete {
1871   my $self = shift;
1872   my ($rs, $op, $values) = @_;
1873
1874   my $rsrc = $rs->result_source;
1875
1876   # quick check if we got a sane rs on our hands
1877   my @pcols = $rsrc->_pri_cols;
1878
1879   my $sel = $rs->_resolved_attrs->{select};
1880   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1881
1882   if (
1883       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1884         ne
1885       join ("\x00", sort @$sel )
1886   ) {
1887     $self->throw_exception (
1888       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1889     );
1890   }
1891
1892   if (@pcols == 1) {
1893     return $self->$op (
1894       $rsrc,
1895       $op eq 'update' ? $values : (),
1896       { $pcols[0] => { -in => $rs->as_query } },
1897     );
1898   }
1899
1900   else {
1901     return $self->_multipk_update_delete (@_);
1902   }
1903 }
1904
1905 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1906 # resultset update/delete involving subqueries. So by default resort
1907 # to simple (and inefficient) delete_all style per-row opearations,
1908 # while allowing specific storages to override this with a faster
1909 # implementation.
1910 #
1911 sub _multipk_update_delete {
1912   return shift->_per_row_update_delete (@_);
1913 }
1914
1915 # This is the default loop used to delete/update rows for multi PK
1916 # resultsets, and used by mysql exclusively (because it can't do anything
1917 # else).
1918 #
1919 # We do not use $row->$op style queries, because resultset update/delete
1920 # is not expected to cascade (this is what delete_all/update_all is for).
1921 #
1922 # There should be no race conditions as the entire operation is rolled
1923 # in a transaction.
1924 #
1925 sub _per_row_update_delete {
1926   my $self = shift;
1927   my ($rs, $op, $values) = @_;
1928
1929   my $rsrc = $rs->result_source;
1930   my @pcols = $rsrc->_pri_cols;
1931
1932   my $guard = $self->txn_scope_guard;
1933
1934   # emulate the return value of $sth->execute for non-selects
1935   my $row_cnt = '0E0';
1936
1937   my $subrs_cur = $rs->cursor;
1938   my @all_pk = $subrs_cur->all;
1939   for my $pks ( @all_pk) {
1940
1941     my $cond;
1942     for my $i (0.. $#pcols) {
1943       $cond->{$pcols[$i]} = $pks->[$i];
1944     }
1945
1946     $self->$op (
1947       $rsrc,
1948       $op eq 'update' ? $values : (),
1949       $cond,
1950     );
1951
1952     $row_cnt++;
1953   }
1954
1955   $guard->commit;
1956
1957   return $row_cnt;
1958 }
1959
1960 sub _select {
1961   my $self = shift;
1962   $self->_execute($self->_select_args(@_));
1963 }
1964
1965 sub _select_args_to_query {
1966   my $self = shift;
1967
1968   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1969   #  = $self->_select_args($ident, $select, $cond, $attrs);
1970   my ($op, $bind, $ident, $bind_attrs, @args) =
1971     $self->_select_args(@_);
1972
1973   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1974   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1975   $prepared_bind ||= [];
1976
1977   return wantarray
1978     ? ($sql, $prepared_bind, $bind_attrs)
1979     : \[ "($sql)", @$prepared_bind ]
1980   ;
1981 }
1982
1983 sub _select_args {
1984   my ($self, $ident, $select, $where, $attrs) = @_;
1985
1986   my $sql_maker = $self->sql_maker;
1987   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1988
1989   $attrs = {
1990     %$attrs,
1991     select => $select,
1992     from => $ident,
1993     where => $where,
1994     $rs_alias && $alias2source->{$rs_alias}
1995       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1996       : ()
1997     ,
1998   };
1999
2000   # calculate bind_attrs before possible $ident mangling
2001   my $bind_attrs = {};
2002   for my $alias (keys %$alias2source) {
2003     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2004     for my $col (keys %$bindtypes) {
2005
2006       my $fqcn = join ('.', $alias, $col);
2007       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2008
2009       # Unqialified column names are nice, but at the same time can be
2010       # rather ambiguous. What we do here is basically go along with
2011       # the loop, adding an unqualified column slot to $bind_attrs,
2012       # alongside the fully qualified name. As soon as we encounter
2013       # another column by that name (which would imply another table)
2014       # we unset the unqualified slot and never add any info to it
2015       # to avoid erroneous type binding. If this happens the users
2016       # only choice will be to fully qualify his column name
2017
2018       if (exists $bind_attrs->{$col}) {
2019         $bind_attrs->{$col} = {};
2020       }
2021       else {
2022         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2023       }
2024     }
2025   }
2026
2027   # Sanity check the attributes (SQLMaker does it too, but
2028   # in case of a software_limit we'll never reach there)
2029   if (defined $attrs->{offset}) {
2030     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2031       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2032   }
2033   $attrs->{offset} ||= 0;
2034
2035   if (defined $attrs->{rows}) {
2036     $self->throw_exception("The rows attribute must be a positive integer if present")
2037       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2038   }
2039   elsif ($attrs->{offset}) {
2040     # MySQL actually recommends this approach.  I cringe.
2041     $attrs->{rows} = $sql_maker->__max_int;
2042   }
2043
2044   my @limit;
2045
2046   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2047   # storage, unless software limit was requested
2048   if (
2049     #limited has_many
2050     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2051        ||
2052     # grouped prefetch (to satisfy group_by == select)
2053     ( $attrs->{group_by}
2054         &&
2055       @{$attrs->{group_by}}
2056         &&
2057       $attrs->{_prefetch_select}
2058         &&
2059       @{$attrs->{_prefetch_select}}
2060     )
2061   ) {
2062     ($ident, $select, $where, $attrs)
2063       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2064   }
2065   elsif (! $attrs->{software_limit} ) {
2066     push @limit, $attrs->{rows}, $attrs->{offset};
2067   }
2068
2069   # try to simplify the joinmap further (prune unreferenced type-single joins)
2070   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2071
2072 ###
2073   # This would be the point to deflate anything found in $where
2074   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2075   # expect a row object. And all we have is a resultsource (it is trivial
2076   # to extract deflator coderefs via $alias2source above).
2077   #
2078   # I don't see a way forward other than changing the way deflators are
2079   # invoked, and that's just bad...
2080 ###
2081
2082   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2083 }
2084
2085 # Returns a counting SELECT for a simple count
2086 # query. Abstracted so that a storage could override
2087 # this to { count => 'firstcol' } or whatever makes
2088 # sense as a performance optimization
2089 sub _count_select {
2090   #my ($self, $source, $rs_attrs) = @_;
2091   return { count => '*' };
2092 }
2093
2094
2095 sub source_bind_attributes {
2096   my ($self, $source) = @_;
2097
2098   my $bind_attributes;
2099   foreach my $column ($source->columns) {
2100
2101     my $data_type = $source->column_info($column)->{data_type} || '';
2102     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2103      if $data_type;
2104   }
2105
2106   return $bind_attributes;
2107 }
2108
2109 =head2 select
2110
2111 =over 4
2112
2113 =item Arguments: $ident, $select, $condition, $attrs
2114
2115 =back
2116
2117 Handle a SQL select statement.
2118
2119 =cut
2120
2121 sub select {
2122   my $self = shift;
2123   my ($ident, $select, $condition, $attrs) = @_;
2124   return $self->cursor_class->new($self, \@_, $attrs);
2125 }
2126
2127 sub select_single {
2128   my $self = shift;
2129   my ($rv, $sth, @bind) = $self->_select(@_);
2130   my @row = $sth->fetchrow_array;
2131   my @nextrow = $sth->fetchrow_array if @row;
2132   if(@row && @nextrow) {
2133     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2134   }
2135   # Need to call finish() to work round broken DBDs
2136   $sth->finish();
2137   return @row;
2138 }
2139
2140 =head2 sql_limit_dialect
2141
2142 This is an accessor for the default SQL limit dialect used by a particular
2143 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2144 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2145 see L<DBIx::Class::SQLMaker::LimitDialects>.
2146
2147 =head2 sth
2148
2149 =over 4
2150
2151 =item Arguments: $sql
2152
2153 =back
2154
2155 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2156
2157 =cut
2158
2159 sub _dbh_sth {
2160   my ($self, $dbh, $sql) = @_;
2161
2162   # 3 is the if_active parameter which avoids active sth re-use
2163   my $sth = $self->disable_sth_caching
2164     ? $dbh->prepare($sql)
2165     : $dbh->prepare_cached($sql, {}, 3);
2166
2167   # XXX You would think RaiseError would make this impossible,
2168   #  but apparently that's not true :(
2169   $self->throw_exception($dbh->errstr) if !$sth;
2170
2171   $sth;
2172 }
2173
2174 sub sth {
2175   my ($self, $sql) = @_;
2176   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2177 }
2178
2179 sub _dbh_columns_info_for {
2180   my ($self, $dbh, $table) = @_;
2181
2182   if ($dbh->can('column_info')) {
2183     my %result;
2184     my $caught;
2185     try {
2186       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2187       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2188       $sth->execute();
2189       while ( my $info = $sth->fetchrow_hashref() ){
2190         my %column_info;
2191         $column_info{data_type}   = $info->{TYPE_NAME};
2192         $column_info{size}      = $info->{COLUMN_SIZE};
2193         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2194         $column_info{default_value} = $info->{COLUMN_DEF};
2195         my $col_name = $info->{COLUMN_NAME};
2196         $col_name =~ s/^\"(.*)\"$/$1/;
2197
2198         $result{$col_name} = \%column_info;
2199       }
2200     } catch {
2201       $caught = 1;
2202     };
2203     return \%result if !$caught && scalar keys %result;
2204   }
2205
2206   my %result;
2207   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2208   $sth->execute;
2209   my @columns = @{$sth->{NAME_lc}};
2210   for my $i ( 0 .. $#columns ){
2211     my %column_info;
2212     $column_info{data_type} = $sth->{TYPE}->[$i];
2213     $column_info{size} = $sth->{PRECISION}->[$i];
2214     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2215
2216     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2217       $column_info{data_type} = $1;
2218       $column_info{size}    = $2;
2219     }
2220
2221     $result{$columns[$i]} = \%column_info;
2222   }
2223   $sth->finish;
2224
2225   foreach my $col (keys %result) {
2226     my $colinfo = $result{$col};
2227     my $type_num = $colinfo->{data_type};
2228     my $type_name;
2229     if(defined $type_num && $dbh->can('type_info')) {
2230       my $type_info = $dbh->type_info($type_num);
2231       $type_name = $type_info->{TYPE_NAME} if $type_info;
2232       $colinfo->{data_type} = $type_name if $type_name;
2233     }
2234   }
2235
2236   return \%result;
2237 }
2238
2239 sub columns_info_for {
2240   my ($self, $table) = @_;
2241   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2242 }
2243
2244 =head2 last_insert_id
2245
2246 Return the row id of the last insert.
2247
2248 =cut
2249
2250 sub _dbh_last_insert_id {
2251     my ($self, $dbh, $source, $col) = @_;
2252
2253     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2254
2255     return $id if defined $id;
2256
2257     my $class = ref $self;
2258     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2259 }
2260
2261 sub last_insert_id {
2262   my $self = shift;
2263   $self->_dbh_last_insert_id ($self->_dbh, @_);
2264 }
2265
2266 =head2 _native_data_type
2267
2268 =over 4
2269
2270 =item Arguments: $type_name
2271
2272 =back
2273
2274 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2275 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2276 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2277
2278 The default implementation returns C<undef>, implement in your Storage driver if
2279 you need this functionality.
2280
2281 Should map types from other databases to the native RDBMS type, for example
2282 C<VARCHAR2> to C<VARCHAR>.
2283
2284 Types with modifiers should map to the underlying data type. For example,
2285 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2286
2287 Composite types should map to the container type, for example
2288 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2289
2290 =cut
2291
2292 sub _native_data_type {
2293   #my ($self, $data_type) = @_;
2294   return undef
2295 }
2296
2297 # Check if placeholders are supported at all
2298 sub _determine_supports_placeholders {
2299   my $self = shift;
2300   my $dbh  = $self->_get_dbh;
2301
2302   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2303   # but it is inaccurate more often than not
2304   return try {
2305     local $dbh->{PrintError} = 0;
2306     local $dbh->{RaiseError} = 1;
2307     $dbh->do('select ?', {}, 1);
2308     1;
2309   }
2310   catch {
2311     0;
2312   };
2313 }
2314
2315 # Check if placeholders bound to non-string types throw exceptions
2316 #
2317 sub _determine_supports_typeless_placeholders {
2318   my $self = shift;
2319   my $dbh  = $self->_get_dbh;
2320
2321   return try {
2322     local $dbh->{PrintError} = 0;
2323     local $dbh->{RaiseError} = 1;
2324     # this specifically tests a bind that is NOT a string
2325     $dbh->do('select 1 where 1 = ?', {}, 1);
2326     1;
2327   }
2328   catch {
2329     0;
2330   };
2331 }
2332
2333 =head2 sqlt_type
2334
2335 Returns the database driver name.
2336
2337 =cut
2338
2339 sub sqlt_type {
2340   shift->_get_dbh->{Driver}->{Name};
2341 }
2342
2343 =head2 bind_attribute_by_data_type
2344
2345 Given a datatype from column info, returns a database specific bind
2346 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2347 let the database planner just handle it.
2348
2349 Generally only needed for special case column types, like bytea in postgres.
2350
2351 =cut
2352
2353 sub bind_attribute_by_data_type {
2354     return;
2355 }
2356
2357 =head2 is_datatype_numeric
2358
2359 Given a datatype from column_info, returns a boolean value indicating if
2360 the current RDBMS considers it a numeric value. This controls how
2361 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2362 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2363 be performed instead of the usual C<eq>.
2364
2365 =cut
2366
2367 sub is_datatype_numeric {
2368   my ($self, $dt) = @_;
2369
2370   return 0 unless $dt;
2371
2372   return $dt =~ /^ (?:
2373     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2374   ) $/ix;
2375 }
2376
2377
2378 =head2 create_ddl_dir
2379
2380 =over 4
2381
2382 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2383
2384 =back
2385
2386 Creates a SQL file based on the Schema, for each of the specified
2387 database engines in C<\@databases> in the given directory.
2388 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2389
2390 Given a previous version number, this will also create a file containing
2391 the ALTER TABLE statements to transform the previous schema into the
2392 current one. Note that these statements may contain C<DROP TABLE> or
2393 C<DROP COLUMN> statements that can potentially destroy data.
2394
2395 The file names are created using the C<ddl_filename> method below, please
2396 override this method in your schema if you would like a different file
2397 name format. For the ALTER file, the same format is used, replacing
2398 $version in the name with "$preversion-$version".
2399
2400 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2401 The most common value for this would be C<< { add_drop_table => 1 } >>
2402 to have the SQL produced include a C<DROP TABLE> statement for each table
2403 created. For quoting purposes supply C<quote_table_names> and
2404 C<quote_field_names>.
2405
2406 If no arguments are passed, then the following default values are assumed:
2407
2408 =over 4
2409
2410 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2411
2412 =item version    - $schema->schema_version
2413
2414 =item directory  - './'
2415
2416 =item preversion - <none>
2417
2418 =back
2419
2420 By default, C<\%sqlt_args> will have
2421
2422  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2423
2424 merged with the hash passed in. To disable any of those features, pass in a
2425 hashref like the following
2426
2427  { ignore_constraint_names => 0, # ... other options }
2428
2429
2430 WARNING: You are strongly advised to check all SQL files created, before applying
2431 them.
2432
2433 =cut
2434
2435 sub create_ddl_dir {
2436   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2437
2438   unless ($dir) {
2439     carp "No directory given, using ./\n";
2440     $dir = './';
2441   } else {
2442       -d $dir
2443         or
2444       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2445         or
2446       $self->throw_exception(
2447         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2448       );
2449   }
2450
2451   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2452
2453   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2454   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2455
2456   my $schema_version = $schema->schema_version || '1.x';
2457   $version ||= $schema_version;
2458
2459   $sqltargs = {
2460     add_drop_table => 1,
2461     ignore_constraint_names => 1,
2462     ignore_index_names => 1,
2463     %{$sqltargs || {}}
2464   };
2465
2466   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2467     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2468   }
2469
2470   my $sqlt = SQL::Translator->new( $sqltargs );
2471
2472   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2473   my $sqlt_schema = $sqlt->translate({ data => $schema })
2474     or $self->throw_exception ($sqlt->error);
2475
2476   foreach my $db (@$databases) {
2477     $sqlt->reset();
2478     $sqlt->{schema} = $sqlt_schema;
2479     $sqlt->producer($db);
2480
2481     my $file;
2482     my $filename = $schema->ddl_filename($db, $version, $dir);
2483     if (-e $filename && ($version eq $schema_version )) {
2484       # if we are dumping the current version, overwrite the DDL
2485       carp "Overwriting existing DDL file - $filename";
2486       unlink($filename);
2487     }
2488
2489     my $output = $sqlt->translate;
2490     if(!$output) {
2491       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2492       next;
2493     }
2494     if(!open($file, ">$filename")) {
2495       $self->throw_exception("Can't open $filename for writing ($!)");
2496       next;
2497     }
2498     print $file $output;
2499     close($file);
2500
2501     next unless ($preversion);
2502
2503     require SQL::Translator::Diff;
2504
2505     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2506     if(!-e $prefilename) {
2507       carp("No previous schema file found ($prefilename)");
2508       next;
2509     }
2510
2511     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2512     if(-e $difffile) {
2513       carp("Overwriting existing diff file - $difffile");
2514       unlink($difffile);
2515     }
2516
2517     my $source_schema;
2518     {
2519       my $t = SQL::Translator->new($sqltargs);
2520       $t->debug( 0 );
2521       $t->trace( 0 );
2522
2523       $t->parser( $db )
2524         or $self->throw_exception ($t->error);
2525
2526       my $out = $t->translate( $prefilename )
2527         or $self->throw_exception ($t->error);
2528
2529       $source_schema = $t->schema;
2530
2531       $source_schema->name( $prefilename )
2532         unless ( $source_schema->name );
2533     }
2534
2535     # The "new" style of producers have sane normalization and can support
2536     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2537     # And we have to diff parsed SQL against parsed SQL.
2538     my $dest_schema = $sqlt_schema;
2539
2540     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2541       my $t = SQL::Translator->new($sqltargs);
2542       $t->debug( 0 );
2543       $t->trace( 0 );
2544
2545       $t->parser( $db )
2546         or $self->throw_exception ($t->error);
2547
2548       my $out = $t->translate( $filename )
2549         or $self->throw_exception ($t->error);
2550
2551       $dest_schema = $t->schema;
2552
2553       $dest_schema->name( $filename )
2554         unless $dest_schema->name;
2555     }
2556
2557     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2558                                                   $dest_schema,   $db,
2559                                                   $sqltargs
2560                                                  );
2561     if(!open $file, ">$difffile") {
2562       $self->throw_exception("Can't write to $difffile ($!)");
2563       next;
2564     }
2565     print $file $diff;
2566     close($file);
2567   }
2568 }
2569
2570 =head2 deployment_statements
2571
2572 =over 4
2573
2574 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2575
2576 =back
2577
2578 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2579
2580 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2581 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2582
2583 C<$directory> is used to return statements from files in a previously created
2584 L</create_ddl_dir> directory and is optional. The filenames are constructed
2585 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2586
2587 If no C<$directory> is specified then the statements are constructed on the
2588 fly using L<SQL::Translator> and C<$version> is ignored.
2589
2590 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2591
2592 =cut
2593
2594 sub deployment_statements {
2595   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2596   $type ||= $self->sqlt_type;
2597   $version ||= $schema->schema_version || '1.x';
2598   $dir ||= './';
2599   my $filename = $schema->ddl_filename($type, $version, $dir);
2600   if(-f $filename)
2601   {
2602       my $file;
2603       open($file, "<$filename")
2604         or $self->throw_exception("Can't open $filename ($!)");
2605       my @rows = <$file>;
2606       close($file);
2607       return join('', @rows);
2608   }
2609
2610   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2611     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2612   }
2613
2614   # sources needs to be a parser arg, but for simplicty allow at top level
2615   # coming in
2616   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2617       if exists $sqltargs->{sources};
2618
2619   my $tr = SQL::Translator->new(
2620     producer => "SQL::Translator::Producer::${type}",
2621     %$sqltargs,
2622     parser => 'SQL::Translator::Parser::DBIx::Class',
2623     data => $schema,
2624   );
2625
2626   my @ret;
2627   my $wa = wantarray;
2628   if ($wa) {
2629     @ret = $tr->translate;
2630   }
2631   else {
2632     $ret[0] = $tr->translate;
2633   }
2634
2635   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2636     unless (@ret && defined $ret[0]);
2637
2638   return $wa ? @ret : $ret[0];
2639 }
2640
2641 sub deploy {
2642   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2643   my $deploy = sub {
2644     my $line = shift;
2645     return if($line =~ /^--/);
2646     return if(!$line);
2647     # next if($line =~ /^DROP/m);
2648     return if($line =~ /^BEGIN TRANSACTION/m);
2649     return if($line =~ /^COMMIT/m);
2650     return if $line =~ /^\s+$/; # skip whitespace only
2651     $self->_query_start($line);
2652     try {
2653       # do a dbh_do cycle here, as we need some error checking in
2654       # place (even though we will ignore errors)
2655       $self->dbh_do (sub { $_[1]->do($line) });
2656     } catch {
2657       carp qq{$_ (running "${line}")};
2658     };
2659     $self->_query_end($line);
2660   };
2661   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2662   if (@statements > 1) {
2663     foreach my $statement (@statements) {
2664       $deploy->( $statement );
2665     }
2666   }
2667   elsif (@statements == 1) {
2668     foreach my $line ( split(";\n", $statements[0])) {
2669       $deploy->( $line );
2670     }
2671   }
2672 }
2673
2674 =head2 datetime_parser
2675
2676 Returns the datetime parser class
2677
2678 =cut
2679
2680 sub datetime_parser {
2681   my $self = shift;
2682   return $self->{datetime_parser} ||= do {
2683     $self->build_datetime_parser(@_);
2684   };
2685 }
2686
2687 =head2 datetime_parser_type
2688
2689 Defines (returns) the datetime parser class - currently hardwired to
2690 L<DateTime::Format::MySQL>
2691
2692 =cut
2693
2694 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2695
2696 =head2 build_datetime_parser
2697
2698 See L</datetime_parser>
2699
2700 =cut
2701
2702 sub build_datetime_parser {
2703   my $self = shift;
2704   my $type = $self->datetime_parser_type(@_);
2705   $self->ensure_class_loaded ($type);
2706   return $type;
2707 }
2708
2709
2710 =head2 is_replicating
2711
2712 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2713 replicate from a master database.  Default is undef, which is the result
2714 returned by databases that don't support replication.
2715
2716 =cut
2717
2718 sub is_replicating {
2719     return;
2720
2721 }
2722
2723 =head2 lag_behind_master
2724
2725 Returns a number that represents a certain amount of lag behind a master db
2726 when a given storage is replicating.  The number is database dependent, but
2727 starts at zero and increases with the amount of lag. Default in undef
2728
2729 =cut
2730
2731 sub lag_behind_master {
2732     return;
2733 }
2734
2735 =head2 relname_to_table_alias
2736
2737 =over 4
2738
2739 =item Arguments: $relname, $join_count
2740
2741 =back
2742
2743 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2744 queries.
2745
2746 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2747 way these aliases are named.
2748
2749 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2750 otherwise C<"$relname">.
2751
2752 =cut
2753
2754 sub relname_to_table_alias {
2755   my ($self, $relname, $join_count) = @_;
2756
2757   my $alias = ($join_count && $join_count > 1 ?
2758     join('_', $relname, $join_count) : $relname);
2759
2760   return $alias;
2761 }
2762
2763 1;
2764
2765 =head1 USAGE NOTES
2766
2767 =head2 DBIx::Class and AutoCommit
2768
2769 DBIx::Class can do some wonderful magic with handling exceptions,
2770 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2771 (the default) combined with C<txn_do> for transaction support.
2772
2773 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2774 in an assumed transaction between commits, and you're telling us you'd
2775 like to manage that manually.  A lot of the magic protections offered by
2776 this module will go away.  We can't protect you from exceptions due to database
2777 disconnects because we don't know anything about how to restart your
2778 transactions.  You're on your own for handling all sorts of exceptional
2779 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2780 be with raw DBI.
2781
2782
2783 =head1 AUTHORS
2784
2785 Matt S. Trout <mst@shadowcatsystems.co.uk>
2786
2787 Andy Grundman <andy@hybridized.org>
2788
2789 =head1 LICENSE
2790
2791 You may distribute this code under the same terms as Perl itself.
2792
2793 =cut