Make _server_info() _determine_driver-bound
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } @capabilities );
60
61
62 # Each of these methods need _determine_driver called before itself
63 # in order to function reliably. This is a purely DRY optimization
64 #
65 # get_(use)_dbms_capability need to be called on the correct Storage
66 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
67 # _determine_supports_X which obv. needs a correct driver as well
68 my @rdbms_specific_methods = qw/
69   deployment_statements
70   sqlt_type
71   sql_maker
72   build_datetime_parser
73   datetime_parser_type
74
75   insert
76   insert_bulk
77   update
78   delete
79   select
80   select_single
81
82   get_use_dbms_capability
83   get_dbms_capability
84
85   _server_info
86   _get_server_version
87 /;
88
89 for my $meth (@rdbms_specific_methods) {
90
91   my $orig = __PACKAGE__->can ($meth)
92     or die "$meth is not a ::Storage::DBI method!";
93
94   no strict qw/refs/;
95   no warnings qw/redefine/;
96   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
97     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
98       $_[0]->_determine_driver;
99
100       # This for some reason crashes and burns on perl 5.8.1
101       # IFF the method ends up throwing an exception
102       #goto $_[0]->can ($meth);
103
104       my $cref = $_[0]->can ($meth);
105       goto $cref;
106     }
107     goto $orig;
108   };
109 }
110
111
112 =head1 NAME
113
114 DBIx::Class::Storage::DBI - DBI storage handler
115
116 =head1 SYNOPSIS
117
118   my $schema = MySchema->connect('dbi:SQLite:my.db');
119
120   $schema->storage->debug(1);
121
122   my @stuff = $schema->storage->dbh_do(
123     sub {
124       my ($storage, $dbh, @args) = @_;
125       $dbh->do("DROP TABLE authors");
126     },
127     @column_list
128   );
129
130   $schema->resultset('Book')->search({
131      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
132   });
133
134 =head1 DESCRIPTION
135
136 This class represents the connection to an RDBMS via L<DBI>.  See
137 L<DBIx::Class::Storage> for general information.  This pod only
138 documents DBI-specific methods and behaviors.
139
140 =head1 METHODS
141
142 =cut
143
144 sub new {
145   my $new = shift->next::method(@_);
146
147   $new->transaction_depth(0);
148   $new->_sql_maker_opts({});
149   $new->_dbh_details({});
150   $new->{savepoints} = [];
151   $new->{_in_dbh_do} = 0;
152   $new->{_dbh_gen} = 0;
153
154   # read below to see what this does
155   $new->_arm_global_destructor;
156
157   $new;
158 }
159
160 # This is hack to work around perl shooting stuff in random
161 # order on exit(). If we do not walk the remaining storage
162 # objects in an END block, there is a *small but real* chance
163 # of a fork()ed child to kill the parent's shared DBI handle,
164 # *before perl reaches the DESTROY in this package*
165 # Yes, it is ugly and effective.
166 {
167   my %seek_and_destroy;
168
169   sub _arm_global_destructor {
170     my $self = shift;
171     my $key = Scalar::Util::refaddr ($self);
172     $seek_and_destroy{$key} = $self;
173     Scalar::Util::weaken ($seek_and_destroy{$key});
174   }
175
176   END {
177     local $?; # just in case the DBI destructor changes it somehow
178
179     # destroy just the object if not native to this process/thread
180     $_->_preserve_foreign_dbh for (grep
181       { defined $_ }
182       values %seek_and_destroy
183     );
184   }
185 }
186
187 sub DESTROY {
188   my $self = shift;
189
190   # destroy just the object if not native to this process/thread
191   $self->_preserve_foreign_dbh;
192
193   # some databases need this to stop spewing warnings
194   if (my $dbh = $self->_dbh) {
195     try {
196       %{ $dbh->{CachedKids} } = ();
197       $dbh->disconnect;
198     };
199   }
200
201   $self->_dbh(undef);
202 }
203
204 sub _preserve_foreign_dbh {
205   my $self = shift;
206
207   return unless $self->_dbh;
208
209   $self->_verify_tid;
210
211   return unless $self->_dbh;
212
213   $self->_verify_pid;
214
215 }
216
217 # handle pid changes correctly - do not destroy parent's connection
218 sub _verify_pid {
219   my $self = shift;
220
221   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
222
223   $self->_dbh->{InactiveDestroy} = 1;
224   $self->_dbh(undef);
225   $self->{_dbh_gen}++;
226
227   return;
228 }
229
230 # very similar to above, but seems to FAIL if I set InactiveDestroy
231 sub _verify_tid {
232   my $self = shift;
233
234   if ( ! defined $self->_conn_tid ) {
235     return; # no threads
236   }
237   elsif ( $self->_conn_tid == threads->tid ) {
238     return; # same thread
239   }
240
241   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
242   $self->_dbh(undef);
243   $self->{_dbh_gen}++;
244
245   return;
246 }
247
248
249 =head2 connect_info
250
251 This method is normally called by L<DBIx::Class::Schema/connection>, which
252 encapsulates its argument list in an arrayref before passing them here.
253
254 The argument list may contain:
255
256 =over
257
258 =item *
259
260 The same 4-element argument set one would normally pass to
261 L<DBI/connect>, optionally followed by
262 L<extra attributes|/DBIx::Class specific connection attributes>
263 recognized by DBIx::Class:
264
265   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
266
267 =item *
268
269 A single code reference which returns a connected
270 L<DBI database handle|DBI/connect> optionally followed by
271 L<extra attributes|/DBIx::Class specific connection attributes> recognized
272 by DBIx::Class:
273
274   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
275
276 =item *
277
278 A single hashref with all the attributes and the dsn/user/password
279 mixed together:
280
281   $connect_info_args = [{
282     dsn => $dsn,
283     user => $user,
284     password => $pass,
285     %dbi_attributes,
286     %extra_attributes,
287   }];
288
289   $connect_info_args = [{
290     dbh_maker => sub { DBI->connect (...) },
291     %dbi_attributes,
292     %extra_attributes,
293   }];
294
295 This is particularly useful for L<Catalyst> based applications, allowing the
296 following config (L<Config::General> style):
297
298   <Model::DB>
299     schema_class   App::DB
300     <connect_info>
301       dsn          dbi:mysql:database=test
302       user         testuser
303       password     TestPass
304       AutoCommit   1
305     </connect_info>
306   </Model::DB>
307
308 The C<dsn>/C<user>/C<password> combination can be substituted by the
309 C<dbh_maker> key whose value is a coderef that returns a connected
310 L<DBI database handle|DBI/connect>
311
312 =back
313
314 Please note that the L<DBI> docs recommend that you always explicitly
315 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
316 recommends that it be set to I<1>, and that you perform transactions
317 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
318 to I<1> if you do not do explicitly set it to zero.  This is the default
319 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
320
321 =head3 DBIx::Class specific connection attributes
322
323 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
324 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
325 the following connection options. These options can be mixed in with your other
326 L<DBI> connection attributes, or placed in a separate hashref
327 (C<\%extra_attributes>) as shown above.
328
329 Every time C<connect_info> is invoked, any previous settings for
330 these options will be cleared before setting the new ones, regardless of
331 whether any options are specified in the new C<connect_info>.
332
333
334 =over
335
336 =item on_connect_do
337
338 Specifies things to do immediately after connecting or re-connecting to
339 the database.  Its value may contain:
340
341 =over
342
343 =item a scalar
344
345 This contains one SQL statement to execute.
346
347 =item an array reference
348
349 This contains SQL statements to execute in order.  Each element contains
350 a string or a code reference that returns a string.
351
352 =item a code reference
353
354 This contains some code to execute.  Unlike code references within an
355 array reference, its return value is ignored.
356
357 =back
358
359 =item on_disconnect_do
360
361 Takes arguments in the same form as L</on_connect_do> and executes them
362 immediately before disconnecting from the database.
363
364 Note, this only runs if you explicitly call L</disconnect> on the
365 storage object.
366
367 =item on_connect_call
368
369 A more generalized form of L</on_connect_do> that calls the specified
370 C<connect_call_METHOD> methods in your storage driver.
371
372   on_connect_do => 'select 1'
373
374 is equivalent to:
375
376   on_connect_call => [ [ do_sql => 'select 1' ] ]
377
378 Its values may contain:
379
380 =over
381
382 =item a scalar
383
384 Will call the C<connect_call_METHOD> method.
385
386 =item a code reference
387
388 Will execute C<< $code->($storage) >>
389
390 =item an array reference
391
392 Each value can be a method name or code reference.
393
394 =item an array of arrays
395
396 For each array, the first item is taken to be the C<connect_call_> method name
397 or code reference, and the rest are parameters to it.
398
399 =back
400
401 Some predefined storage methods you may use:
402
403 =over
404
405 =item do_sql
406
407 Executes a SQL string or a code reference that returns a SQL string. This is
408 what L</on_connect_do> and L</on_disconnect_do> use.
409
410 It can take:
411
412 =over
413
414 =item a scalar
415
416 Will execute the scalar as SQL.
417
418 =item an arrayref
419
420 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
421 attributes hashref and bind values.
422
423 =item a code reference
424
425 Will execute C<< $code->($storage) >> and execute the return array refs as
426 above.
427
428 =back
429
430 =item datetime_setup
431
432 Execute any statements necessary to initialize the database session to return
433 and accept datetime/timestamp values used with
434 L<DBIx::Class::InflateColumn::DateTime>.
435
436 Only necessary for some databases, see your specific storage driver for
437 implementation details.
438
439 =back
440
441 =item on_disconnect_call
442
443 Takes arguments in the same form as L</on_connect_call> and executes them
444 immediately before disconnecting from the database.
445
446 Calls the C<disconnect_call_METHOD> methods as opposed to the
447 C<connect_call_METHOD> methods called by L</on_connect_call>.
448
449 Note, this only runs if you explicitly call L</disconnect> on the
450 storage object.
451
452 =item disable_sth_caching
453
454 If set to a true value, this option will disable the caching of
455 statement handles via L<DBI/prepare_cached>.
456
457 =item limit_dialect
458
459 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
460 default L</sql_limit_dialect> setting of the storage (if any). For a list
461 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
462
463 =item quote_char
464
465 Specifies what characters to use to quote table and column names. If
466 you use this you will want to specify L</name_sep> as well.
467
468 C<quote_char> expects either a single character, in which case is it
469 is placed on either side of the table/column name, or an arrayref of length
470 2 in which case the table/column name is placed between the elements.
471
472 For example under MySQL you should use C<< quote_char => '`' >>, and for
473 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
474
475 =item name_sep
476
477 This only needs to be used in conjunction with C<quote_char>, and is used to
478 specify the character that separates elements (schemas, tables, columns) from
479 each other. In most cases this is simply a C<.>.
480
481 The consequences of not supplying this value is that L<SQL::Abstract>
482 will assume DBIx::Class' uses of aliases to be complete column
483 names. The output will look like I<"me.name"> when it should actually
484 be I<"me"."name">.
485
486 =item unsafe
487
488 This Storage driver normally installs its own C<HandleError>, sets
489 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
490 all database handles, including those supplied by a coderef.  It does this
491 so that it can have consistent and useful error behavior.
492
493 If you set this option to a true value, Storage will not do its usual
494 modifications to the database handle's attributes, and instead relies on
495 the settings in your connect_info DBI options (or the values you set in
496 your connection coderef, in the case that you are connecting via coderef).
497
498 Note that your custom settings can cause Storage to malfunction,
499 especially if you set a C<HandleError> handler that suppresses exceptions
500 and/or disable C<RaiseError>.
501
502 =item auto_savepoint
503
504 If this option is true, L<DBIx::Class> will use savepoints when nesting
505 transactions, making it possible to recover from failure in the inner
506 transaction without having to abort all outer transactions.
507
508 =item cursor_class
509
510 Use this argument to supply a cursor class other than the default
511 L<DBIx::Class::Storage::DBI::Cursor>.
512
513 =back
514
515 Some real-life examples of arguments to L</connect_info> and
516 L<DBIx::Class::Schema/connect>
517
518   # Simple SQLite connection
519   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
520
521   # Connect via subref
522   ->connect_info([ sub { DBI->connect(...) } ]);
523
524   # Connect via subref in hashref
525   ->connect_info([{
526     dbh_maker => sub { DBI->connect(...) },
527     on_connect_do => 'alter session ...',
528   }]);
529
530   # A bit more complicated
531   ->connect_info(
532     [
533       'dbi:Pg:dbname=foo',
534       'postgres',
535       'my_pg_password',
536       { AutoCommit => 1 },
537       { quote_char => q{"}, name_sep => q{.} },
538     ]
539   );
540
541   # Equivalent to the previous example
542   ->connect_info(
543     [
544       'dbi:Pg:dbname=foo',
545       'postgres',
546       'my_pg_password',
547       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
548     ]
549   );
550
551   # Same, but with hashref as argument
552   # See parse_connect_info for explanation
553   ->connect_info(
554     [{
555       dsn         => 'dbi:Pg:dbname=foo',
556       user        => 'postgres',
557       password    => 'my_pg_password',
558       AutoCommit  => 1,
559       quote_char  => q{"},
560       name_sep    => q{.},
561     }]
562   );
563
564   # Subref + DBIx::Class-specific connection options
565   ->connect_info(
566     [
567       sub { DBI->connect(...) },
568       {
569           quote_char => q{`},
570           name_sep => q{@},
571           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
572           disable_sth_caching => 1,
573       },
574     ]
575   );
576
577
578
579 =cut
580
581 sub connect_info {
582   my ($self, $info) = @_;
583
584   return $self->_connect_info if !$info;
585
586   $self->_connect_info($info); # copy for _connect_info
587
588   $info = $self->_normalize_connect_info($info)
589     if ref $info eq 'ARRAY';
590
591   for my $storage_opt (keys %{ $info->{storage_options} }) {
592     my $value = $info->{storage_options}{$storage_opt};
593
594     $self->$storage_opt($value);
595   }
596
597   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
598   #  the new set of options
599   $self->_sql_maker(undef);
600   $self->_sql_maker_opts({});
601
602   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
603     my $value = $info->{sql_maker_options}{$sql_maker_opt};
604
605     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
606   }
607
608   my %attrs = (
609     %{ $self->_default_dbi_connect_attributes || {} },
610     %{ $info->{attributes} || {} },
611   );
612
613   my @args = @{ $info->{arguments} };
614
615   $self->_dbi_connect_info([@args,
616     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
617
618   # FIXME - dirty:
619   # save attributes them in a separate accessor so they are always
620   # introspectable, even in case of a CODE $dbhmaker
621   $self->_dbic_connect_attributes (\%attrs);
622
623   return $self->_connect_info;
624 }
625
626 sub _normalize_connect_info {
627   my ($self, $info_arg) = @_;
628   my %info;
629
630   my @args = @$info_arg;  # take a shallow copy for further mutilation
631
632   # combine/pre-parse arguments depending on invocation style
633
634   my %attrs;
635   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
636     %attrs = %{ $args[1] || {} };
637     @args = $args[0];
638   }
639   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
640     %attrs = %{$args[0]};
641     @args = ();
642     if (my $code = delete $attrs{dbh_maker}) {
643       @args = $code;
644
645       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
646       if (@ignored) {
647         carp sprintf (
648             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
649           . "to the result of 'dbh_maker'",
650
651           join (', ', map { "'$_'" } (@ignored) ),
652         );
653       }
654     }
655     else {
656       @args = delete @attrs{qw/dsn user password/};
657     }
658   }
659   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
660     %attrs = (
661       % { $args[3] || {} },
662       % { $args[4] || {} },
663     );
664     @args = @args[0,1,2];
665   }
666
667   $info{arguments} = \@args;
668
669   my @storage_opts = grep exists $attrs{$_},
670     @storage_options, 'cursor_class';
671
672   @{ $info{storage_options} }{@storage_opts} =
673     delete @attrs{@storage_opts} if @storage_opts;
674
675   my @sql_maker_opts = grep exists $attrs{$_},
676     qw/limit_dialect quote_char name_sep/;
677
678   @{ $info{sql_maker_options} }{@sql_maker_opts} =
679     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
680
681   $info{attributes} = \%attrs if %attrs;
682
683   return \%info;
684 }
685
686 sub _default_dbi_connect_attributes {
687   return {
688     AutoCommit => 1,
689     RaiseError => 1,
690     PrintError => 0,
691   };
692 }
693
694 =head2 on_connect_do
695
696 This method is deprecated in favour of setting via L</connect_info>.
697
698 =cut
699
700 =head2 on_disconnect_do
701
702 This method is deprecated in favour of setting via L</connect_info>.
703
704 =cut
705
706 sub _parse_connect_do {
707   my ($self, $type) = @_;
708
709   my $val = $self->$type;
710   return () if not defined $val;
711
712   my @res;
713
714   if (not ref($val)) {
715     push @res, [ 'do_sql', $val ];
716   } elsif (ref($val) eq 'CODE') {
717     push @res, $val;
718   } elsif (ref($val) eq 'ARRAY') {
719     push @res, map { [ 'do_sql', $_ ] } @$val;
720   } else {
721     $self->throw_exception("Invalid type for $type: ".ref($val));
722   }
723
724   return \@res;
725 }
726
727 =head2 dbh_do
728
729 Arguments: ($subref | $method_name), @extra_coderef_args?
730
731 Execute the given $subref or $method_name using the new exception-based
732 connection management.
733
734 The first two arguments will be the storage object that C<dbh_do> was called
735 on and a database handle to use.  Any additional arguments will be passed
736 verbatim to the called subref as arguments 2 and onwards.
737
738 Using this (instead of $self->_dbh or $self->dbh) ensures correct
739 exception handling and reconnection (or failover in future subclasses).
740
741 Your subref should have no side-effects outside of the database, as
742 there is the potential for your subref to be partially double-executed
743 if the database connection was stale/dysfunctional.
744
745 Example:
746
747   my @stuff = $schema->storage->dbh_do(
748     sub {
749       my ($storage, $dbh, @cols) = @_;
750       my $cols = join(q{, }, @cols);
751       $dbh->selectrow_array("SELECT $cols FROM foo");
752     },
753     @column_list
754   );
755
756 =cut
757
758 sub dbh_do {
759   my $self = shift;
760   my $code = shift;
761
762   my $dbh = $self->_get_dbh;
763
764   return $self->$code($dbh, @_)
765     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
766
767   local $self->{_in_dbh_do} = 1;
768
769   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
770   my $args = \@_;
771   return try {
772     $self->$code ($dbh, @$args);
773   } catch {
774     $self->throw_exception($_) if $self->connected;
775
776     # We were not connected - reconnect and retry, but let any
777     #  exception fall right through this time
778     carp "Retrying $code after catching disconnected exception: $_"
779       if $ENV{DBIC_DBIRETRY_DEBUG};
780
781     $self->_populate_dbh;
782     $self->$code($self->_dbh, @$args);
783   };
784 }
785
786 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
787 # It also informs dbh_do to bypass itself while under the direction of txn_do,
788 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
789 sub txn_do {
790   my $self = shift;
791   my $coderef = shift;
792
793   ref $coderef eq 'CODE' or $self->throw_exception
794     ('$coderef must be a CODE reference');
795
796   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
797
798   local $self->{_in_dbh_do} = 1;
799
800   my @result;
801   my $want_array = wantarray;
802
803   my $tried = 0;
804   while(1) {
805     my $exception;
806
807     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
808     my $args = \@_;
809
810     try {
811       $self->_get_dbh;
812
813       $self->txn_begin;
814       if($want_array) {
815           @result = $coderef->(@$args);
816       }
817       elsif(defined $want_array) {
818           $result[0] = $coderef->(@$args);
819       }
820       else {
821           $coderef->(@$args);
822       }
823       $self->txn_commit;
824     } catch {
825       $exception = $_;
826     };
827
828     if(! defined $exception) { return $want_array ? @result : $result[0] }
829
830     if($tried++ || $self->connected) {
831       my $rollback_exception;
832       try { $self->txn_rollback } catch { $rollback_exception = shift };
833       if(defined $rollback_exception) {
834         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
835         $self->throw_exception($exception)  # propagate nested rollback
836           if $rollback_exception =~ /$exception_class/;
837
838         $self->throw_exception(
839           "Transaction aborted: ${exception}. "
840           . "Rollback failed: ${rollback_exception}"
841         );
842       }
843       $self->throw_exception($exception)
844     }
845
846     # We were not connected, and was first try - reconnect and retry
847     # via the while loop
848     carp "Retrying $coderef after catching disconnected exception: $exception"
849       if $ENV{DBIC_DBIRETRY_DEBUG};
850     $self->_populate_dbh;
851   }
852 }
853
854 =head2 disconnect
855
856 Our C<disconnect> method also performs a rollback first if the
857 database is not in C<AutoCommit> mode.
858
859 =cut
860
861 sub disconnect {
862   my ($self) = @_;
863
864   if( $self->_dbh ) {
865     my @actions;
866
867     push @actions, ( $self->on_disconnect_call || () );
868     push @actions, $self->_parse_connect_do ('on_disconnect_do');
869
870     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
871
872     $self->_dbh_rollback unless $self->_dbh_autocommit;
873
874     %{ $self->_dbh->{CachedKids} } = ();
875     $self->_dbh->disconnect;
876     $self->_dbh(undef);
877     $self->{_dbh_gen}++;
878   }
879 }
880
881 =head2 with_deferred_fk_checks
882
883 =over 4
884
885 =item Arguments: C<$coderef>
886
887 =item Return Value: The return value of $coderef
888
889 =back
890
891 Storage specific method to run the code ref with FK checks deferred or
892 in MySQL's case disabled entirely.
893
894 =cut
895
896 # Storage subclasses should override this
897 sub with_deferred_fk_checks {
898   my ($self, $sub) = @_;
899   $sub->();
900 }
901
902 =head2 connected
903
904 =over
905
906 =item Arguments: none
907
908 =item Return Value: 1|0
909
910 =back
911
912 Verifies that the current database handle is active and ready to execute
913 an SQL statement (e.g. the connection did not get stale, server is still
914 answering, etc.) This method is used internally by L</dbh>.
915
916 =cut
917
918 sub connected {
919   my $self = shift;
920   return 0 unless $self->_seems_connected;
921
922   #be on the safe side
923   local $self->_dbh->{RaiseError} = 1;
924
925   return $self->_ping;
926 }
927
928 sub _seems_connected {
929   my $self = shift;
930
931   $self->_preserve_foreign_dbh;
932
933   my $dbh = $self->_dbh
934     or return 0;
935
936   return $dbh->FETCH('Active');
937 }
938
939 sub _ping {
940   my $self = shift;
941
942   my $dbh = $self->_dbh or return 0;
943
944   return $dbh->ping;
945 }
946
947 sub ensure_connected {
948   my ($self) = @_;
949
950   unless ($self->connected) {
951     $self->_populate_dbh;
952   }
953 }
954
955 =head2 dbh
956
957 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
958 is guaranteed to be healthy by implicitly calling L</connected>, and if
959 necessary performing a reconnection before returning. Keep in mind that this
960 is very B<expensive> on some database engines. Consider using L</dbh_do>
961 instead.
962
963 =cut
964
965 sub dbh {
966   my ($self) = @_;
967
968   if (not $self->_dbh) {
969     $self->_populate_dbh;
970   } else {
971     $self->ensure_connected;
972   }
973   return $self->_dbh;
974 }
975
976 # this is the internal "get dbh or connect (don't check)" method
977 sub _get_dbh {
978   my $self = shift;
979   $self->_preserve_foreign_dbh;
980   $self->_populate_dbh unless $self->_dbh;
981   return $self->_dbh;
982 }
983
984 sub sql_maker {
985   my ($self) = @_;
986   unless ($self->_sql_maker) {
987     my $sql_maker_class = $self->sql_maker_class;
988     $self->ensure_class_loaded ($sql_maker_class);
989
990     my %opts = %{$self->_sql_maker_opts||{}};
991     my $dialect =
992       $opts{limit_dialect}
993         ||
994       $self->sql_limit_dialect
995         ||
996       do {
997         my $s_class = (ref $self) || $self;
998         carp (
999           "Your storage class ($s_class) does not set sql_limit_dialect and you "
1000         . 'have not supplied an explicit limit_dialect in your connection_info. '
1001         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1002         . 'databases but can be (and often is) painfully slow.'
1003         );
1004
1005         'GenericSubQ';
1006       }
1007     ;
1008
1009     $self->_sql_maker($sql_maker_class->new(
1010       bindtype=>'columns',
1011       array_datatypes => 1,
1012       limit_dialect => $dialect,
1013       %opts,
1014     ));
1015   }
1016   return $self->_sql_maker;
1017 }
1018
1019 # nothing to do by default
1020 sub _rebless {}
1021 sub _init {}
1022
1023 sub _populate_dbh {
1024   my ($self) = @_;
1025
1026   my @info = @{$self->_dbi_connect_info || []};
1027   $self->_dbh(undef); # in case ->connected failed we might get sent here
1028   $self->_dbh_details({}); # reset everything we know
1029
1030   $self->_dbh($self->_connect(@info));
1031
1032   $self->_conn_pid($$);
1033   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1034
1035   $self->_determine_driver;
1036
1037   # Always set the transaction depth on connect, since
1038   #  there is no transaction in progress by definition
1039   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1040
1041   $self->_run_connection_actions unless $self->{_in_determine_driver};
1042 }
1043
1044 sub _run_connection_actions {
1045   my $self = shift;
1046   my @actions;
1047
1048   push @actions, ( $self->on_connect_call || () );
1049   push @actions, $self->_parse_connect_do ('on_connect_do');
1050
1051   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1052 }
1053
1054
1055
1056 sub set_use_dbms_capability {
1057   $_[0]->set_inherited ($_[1], $_[2]);
1058 }
1059
1060 sub get_use_dbms_capability {
1061   my ($self, $capname) = @_;
1062
1063   my $use = $self->get_inherited ($capname);
1064   return defined $use
1065     ? $use
1066     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1067   ;
1068 }
1069
1070 sub set_dbms_capability {
1071   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1072 }
1073
1074 sub get_dbms_capability {
1075   my ($self, $capname) = @_;
1076
1077   my $cap = $self->_dbh_details->{capability}{$capname};
1078
1079   unless (defined $cap) {
1080     if (my $meth = $self->can ("_determine$capname")) {
1081       $cap = $self->$meth ? 1 : 0;
1082     }
1083     else {
1084       $cap = 0;
1085     }
1086
1087     $self->set_dbms_capability ($capname, $cap);
1088   }
1089
1090   return $cap;
1091 }
1092
1093 sub _server_info {
1094   my $self = shift;
1095
1096   my $info;
1097   unless ($info = $self->_dbh_details->{info}) {
1098
1099     $info = {};
1100
1101     my $server_version = try { $self->_get_server_version };
1102
1103     if (defined $server_version) {
1104       $info->{dbms_version} = $server_version;
1105
1106       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1107       my @verparts = split (/\./, $numeric_version);
1108       if (
1109         @verparts
1110           &&
1111         $verparts[0] <= 999
1112       ) {
1113         # consider only up to 3 version parts, iff not more than 3 digits
1114         my @use_parts;
1115         while (@verparts && @use_parts < 3) {
1116           my $p = shift @verparts;
1117           last if $p > 999;
1118           push @use_parts, $p;
1119         }
1120         push @use_parts, 0 while @use_parts < 3;
1121
1122         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1123       }
1124     }
1125
1126     $self->_dbh_details->{info} = $info;
1127   }
1128
1129   return $info;
1130 }
1131
1132 sub _get_server_version {
1133   shift->_get_dbh->get_info(18);
1134 }
1135
1136 sub _determine_driver {
1137   my ($self) = @_;
1138
1139   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1140     my $started_connected = 0;
1141     local $self->{_in_determine_driver} = 1;
1142
1143     if (ref($self) eq __PACKAGE__) {
1144       my $driver;
1145       if ($self->_dbh) { # we are connected
1146         $driver = $self->_dbh->{Driver}{Name};
1147         $started_connected = 1;
1148       } else {
1149         # if connect_info is a CODEREF, we have no choice but to connect
1150         if (ref $self->_dbi_connect_info->[0] &&
1151             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1152           $self->_populate_dbh;
1153           $driver = $self->_dbh->{Driver}{Name};
1154         }
1155         else {
1156           # try to use dsn to not require being connected, the driver may still
1157           # force a connection in _rebless to determine version
1158           # (dsn may not be supplied at all if all we do is make a mock-schema)
1159           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1160           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1161           $driver ||= $ENV{DBI_DRIVER};
1162         }
1163       }
1164
1165       if ($driver) {
1166         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1167         if ($self->load_optional_class($storage_class)) {
1168           mro::set_mro($storage_class, 'c3');
1169           bless $self, $storage_class;
1170           $self->_rebless();
1171         }
1172       }
1173     }
1174
1175     $self->_driver_determined(1);
1176
1177     $self->_init; # run driver-specific initializations
1178
1179     $self->_run_connection_actions
1180         if !$started_connected && defined $self->_dbh;
1181   }
1182 }
1183
1184 sub _do_connection_actions {
1185   my $self          = shift;
1186   my $method_prefix = shift;
1187   my $call          = shift;
1188
1189   if (not ref($call)) {
1190     my $method = $method_prefix . $call;
1191     $self->$method(@_);
1192   } elsif (ref($call) eq 'CODE') {
1193     $self->$call(@_);
1194   } elsif (ref($call) eq 'ARRAY') {
1195     if (ref($call->[0]) ne 'ARRAY') {
1196       $self->_do_connection_actions($method_prefix, $_) for @$call;
1197     } else {
1198       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1199     }
1200   } else {
1201     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1202   }
1203
1204   return $self;
1205 }
1206
1207 sub connect_call_do_sql {
1208   my $self = shift;
1209   $self->_do_query(@_);
1210 }
1211
1212 sub disconnect_call_do_sql {
1213   my $self = shift;
1214   $self->_do_query(@_);
1215 }
1216
1217 # override in db-specific backend when necessary
1218 sub connect_call_datetime_setup { 1 }
1219
1220 sub _do_query {
1221   my ($self, $action) = @_;
1222
1223   if (ref $action eq 'CODE') {
1224     $action = $action->($self);
1225     $self->_do_query($_) foreach @$action;
1226   }
1227   else {
1228     # Most debuggers expect ($sql, @bind), so we need to exclude
1229     # the attribute hash which is the second argument to $dbh->do
1230     # furthermore the bind values are usually to be presented
1231     # as named arrayref pairs, so wrap those here too
1232     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1233     my $sql = shift @do_args;
1234     my $attrs = shift @do_args;
1235     my @bind = map { [ undef, $_ ] } @do_args;
1236
1237     $self->_query_start($sql, @bind);
1238     $self->_get_dbh->do($sql, $attrs, @do_args);
1239     $self->_query_end($sql, @bind);
1240   }
1241
1242   return $self;
1243 }
1244
1245 sub _connect {
1246   my ($self, @info) = @_;
1247
1248   $self->throw_exception("You failed to provide any connection info")
1249     if !@info;
1250
1251   my ($old_connect_via, $dbh);
1252
1253   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1254     $old_connect_via = $DBI::connect_via;
1255     $DBI::connect_via = 'connect';
1256   }
1257
1258   try {
1259     if(ref $info[0] eq 'CODE') {
1260        $dbh = $info[0]->();
1261     }
1262     else {
1263        $dbh = DBI->connect(@info);
1264     }
1265
1266     if (!$dbh) {
1267       die $DBI::errstr;
1268     }
1269
1270     unless ($self->unsafe) {
1271
1272       # this odd anonymous coderef dereference is in fact really
1273       # necessary to avoid the unwanted effect described in perl5
1274       # RT#75792
1275       sub {
1276         my $weak_self = $_[0];
1277         weaken $weak_self;
1278
1279         $_[1]->{HandleError} = sub {
1280           if ($weak_self) {
1281             $weak_self->throw_exception("DBI Exception: $_[0]");
1282           }
1283           else {
1284             # the handler may be invoked by something totally out of
1285             # the scope of DBIC
1286             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1287           }
1288         };
1289       }->($self, $dbh);
1290
1291       $dbh->{ShowErrorStatement} = 1;
1292       $dbh->{RaiseError} = 1;
1293       $dbh->{PrintError} = 0;
1294     }
1295   }
1296   catch {
1297     $self->throw_exception("DBI Connection failed: $_")
1298   }
1299   finally {
1300     $DBI::connect_via = $old_connect_via if $old_connect_via;
1301   };
1302
1303   $self->_dbh_autocommit($dbh->{AutoCommit});
1304   $dbh;
1305 }
1306
1307 sub svp_begin {
1308   my ($self, $name) = @_;
1309
1310   $name = $self->_svp_generate_name
1311     unless defined $name;
1312
1313   $self->throw_exception ("You can't use savepoints outside a transaction")
1314     if $self->{transaction_depth} == 0;
1315
1316   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1317     unless $self->can('_svp_begin');
1318
1319   push @{ $self->{savepoints} }, $name;
1320
1321   $self->debugobj->svp_begin($name) if $self->debug;
1322
1323   return $self->_svp_begin($name);
1324 }
1325
1326 sub svp_release {
1327   my ($self, $name) = @_;
1328
1329   $self->throw_exception ("You can't use savepoints outside a transaction")
1330     if $self->{transaction_depth} == 0;
1331
1332   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1333     unless $self->can('_svp_release');
1334
1335   if (defined $name) {
1336     $self->throw_exception ("Savepoint '$name' does not exist")
1337       unless grep { $_ eq $name } @{ $self->{savepoints} };
1338
1339     # Dig through the stack until we find the one we are releasing.  This keeps
1340     # the stack up to date.
1341     my $svp;
1342
1343     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1344   } else {
1345     $name = pop @{ $self->{savepoints} };
1346   }
1347
1348   $self->debugobj->svp_release($name) if $self->debug;
1349
1350   return $self->_svp_release($name);
1351 }
1352
1353 sub svp_rollback {
1354   my ($self, $name) = @_;
1355
1356   $self->throw_exception ("You can't use savepoints outside a transaction")
1357     if $self->{transaction_depth} == 0;
1358
1359   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1360     unless $self->can('_svp_rollback');
1361
1362   if (defined $name) {
1363       # If they passed us a name, verify that it exists in the stack
1364       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1365           $self->throw_exception("Savepoint '$name' does not exist!");
1366       }
1367
1368       # Dig through the stack until we find the one we are releasing.  This keeps
1369       # the stack up to date.
1370       while(my $s = pop(@{ $self->{savepoints} })) {
1371           last if($s eq $name);
1372       }
1373       # Add the savepoint back to the stack, as a rollback doesn't remove the
1374       # named savepoint, only everything after it.
1375       push(@{ $self->{savepoints} }, $name);
1376   } else {
1377       # We'll assume they want to rollback to the last savepoint
1378       $name = $self->{savepoints}->[-1];
1379   }
1380
1381   $self->debugobj->svp_rollback($name) if $self->debug;
1382
1383   return $self->_svp_rollback($name);
1384 }
1385
1386 sub _svp_generate_name {
1387     my ($self) = @_;
1388
1389     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1390 }
1391
1392 sub txn_begin {
1393   my $self = shift;
1394
1395   # this means we have not yet connected and do not know the AC status
1396   # (e.g. coderef $dbh)
1397   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1398
1399   if($self->{transaction_depth} == 0) {
1400     $self->debugobj->txn_begin()
1401       if $self->debug;
1402     $self->_dbh_begin_work;
1403   }
1404   elsif ($self->auto_savepoint) {
1405     $self->svp_begin;
1406   }
1407   $self->{transaction_depth}++;
1408 }
1409
1410 sub _dbh_begin_work {
1411   my $self = shift;
1412
1413   # if the user is utilizing txn_do - good for him, otherwise we need to
1414   # ensure that the $dbh is healthy on BEGIN.
1415   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1416   # will be replaced by a failure of begin_work itself (which will be
1417   # then retried on reconnect)
1418   if ($self->{_in_dbh_do}) {
1419     $self->_dbh->begin_work;
1420   } else {
1421     $self->dbh_do(sub { $_[1]->begin_work });
1422   }
1423 }
1424
1425 sub txn_commit {
1426   my $self = shift;
1427   if ($self->{transaction_depth} == 1) {
1428     $self->debugobj->txn_commit()
1429       if ($self->debug);
1430     $self->_dbh_commit;
1431     $self->{transaction_depth} = 0
1432       if $self->_dbh_autocommit;
1433   }
1434   elsif($self->{transaction_depth} > 1) {
1435     $self->{transaction_depth}--;
1436     $self->svp_release
1437       if $self->auto_savepoint;
1438   }
1439 }
1440
1441 sub _dbh_commit {
1442   my $self = shift;
1443   my $dbh  = $self->_dbh
1444     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1445   $dbh->commit;
1446 }
1447
1448 sub txn_rollback {
1449   my $self = shift;
1450   my $dbh = $self->_dbh;
1451   try {
1452     if ($self->{transaction_depth} == 1) {
1453       $self->debugobj->txn_rollback()
1454         if ($self->debug);
1455       $self->{transaction_depth} = 0
1456         if $self->_dbh_autocommit;
1457       $self->_dbh_rollback;
1458     }
1459     elsif($self->{transaction_depth} > 1) {
1460       $self->{transaction_depth}--;
1461       if ($self->auto_savepoint) {
1462         $self->svp_rollback;
1463         $self->svp_release;
1464       }
1465     }
1466     else {
1467       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1468     }
1469   }
1470   catch {
1471     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1472
1473     if ($_ !~ /$exception_class/) {
1474       # ensure that a failed rollback resets the transaction depth
1475       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1476     }
1477
1478     $self->throw_exception($_)
1479   };
1480 }
1481
1482 sub _dbh_rollback {
1483   my $self = shift;
1484   my $dbh  = $self->_dbh
1485     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1486   $dbh->rollback;
1487 }
1488
1489 # This used to be the top-half of _execute.  It was split out to make it
1490 #  easier to override in NoBindVars without duping the rest.  It takes up
1491 #  all of _execute's args, and emits $sql, @bind.
1492 sub _prep_for_execute {
1493   my ($self, $op, $extra_bind, $ident, $args) = @_;
1494
1495   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1496     $ident = $ident->from();
1497   }
1498
1499   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1500
1501   unshift(@bind,
1502     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1503       if $extra_bind;
1504   return ($sql, \@bind);
1505 }
1506
1507
1508 sub _fix_bind_params {
1509     my ($self, @bind) = @_;
1510
1511     ### Turn @bind from something like this:
1512     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1513     ### to this:
1514     ###   ( "'1'", "'1'", "'3'" )
1515     return
1516         map {
1517             if ( defined( $_ && $_->[1] ) ) {
1518                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1519             }
1520             else { q{'NULL'}; }
1521         } @bind;
1522 }
1523
1524 sub _query_start {
1525     my ( $self, $sql, @bind ) = @_;
1526
1527     if ( $self->debug ) {
1528         @bind = $self->_fix_bind_params(@bind);
1529
1530         $self->debugobj->query_start( $sql, @bind );
1531     }
1532 }
1533
1534 sub _query_end {
1535     my ( $self, $sql, @bind ) = @_;
1536
1537     if ( $self->debug ) {
1538         @bind = $self->_fix_bind_params(@bind);
1539         $self->debugobj->query_end( $sql, @bind );
1540     }
1541 }
1542
1543 sub _dbh_execute {
1544   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1545
1546   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1547
1548   $self->_query_start( $sql, @$bind );
1549
1550   my $sth = $self->sth($sql,$op);
1551
1552   my $placeholder_index = 1;
1553
1554   foreach my $bound (@$bind) {
1555     my $attributes = {};
1556     my($column_name, @data) = @$bound;
1557
1558     if ($bind_attributes) {
1559       $attributes = $bind_attributes->{$column_name}
1560       if defined $bind_attributes->{$column_name};
1561     }
1562
1563     foreach my $data (@data) {
1564       my $ref = ref $data;
1565       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1566
1567       $sth->bind_param($placeholder_index, $data, $attributes);
1568       $placeholder_index++;
1569     }
1570   }
1571
1572   # Can this fail without throwing an exception anyways???
1573   my $rv = $sth->execute();
1574   $self->throw_exception(
1575     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1576   ) if !$rv;
1577
1578   $self->_query_end( $sql, @$bind );
1579
1580   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1581 }
1582
1583 sub _execute {
1584     my $self = shift;
1585     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1586 }
1587
1588 sub _prefetch_insert_auto_nextvals {
1589   my ($self, $source, $to_insert) = @_;
1590
1591   my $upd = {};
1592
1593   foreach my $col ( $source->columns ) {
1594     if ( !defined $to_insert->{$col} ) {
1595       my $col_info = $source->column_info($col);
1596
1597       if ( $col_info->{auto_nextval} ) {
1598         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1599           'nextval',
1600           $col_info->{sequence} ||=
1601             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1602         );
1603       }
1604     }
1605   }
1606
1607   return $upd;
1608 }
1609
1610 sub insert {
1611   my $self = shift;
1612   my ($source, $to_insert, $opts) = @_;
1613
1614   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1615
1616   my $bind_attributes = $self->source_bind_attributes($source);
1617
1618   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1619
1620   if ($opts->{returning}) {
1621     my @ret_cols = @{$opts->{returning}};
1622
1623     my @ret_vals = try {
1624       local $SIG{__WARN__} = sub {};
1625       my @r = $sth->fetchrow_array;
1626       $sth->finish;
1627       @r;
1628     };
1629
1630     my %ret;
1631     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1632
1633     $updated_cols = {
1634       %$updated_cols,
1635       %ret,
1636     };
1637   }
1638
1639   return $updated_cols;
1640 }
1641
1642 ## Currently it is assumed that all values passed will be "normal", i.e. not
1643 ## scalar refs, or at least, all the same type as the first set, the statement is
1644 ## only prepped once.
1645 sub insert_bulk {
1646   my ($self, $source, $cols, $data) = @_;
1647
1648   my %colvalues;
1649   @colvalues{@$cols} = (0..$#$cols);
1650
1651   for my $i (0..$#$cols) {
1652     my $first_val = $data->[0][$i];
1653     next unless ref $first_val eq 'SCALAR';
1654
1655     $colvalues{ $cols->[$i] } = $first_val;
1656   }
1657
1658   # check for bad data and stringify stringifiable objects
1659   my $bad_slice = sub {
1660     my ($msg, $col_idx, $slice_idx) = @_;
1661     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1662       $msg,
1663       $cols->[$col_idx],
1664       do {
1665         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1666         Dumper {
1667           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1668         },
1669       }
1670     );
1671   };
1672
1673   for my $datum_idx (0..$#$data) {
1674     my $datum = $data->[$datum_idx];
1675
1676     for my $col_idx (0..$#$cols) {
1677       my $val            = $datum->[$col_idx];
1678       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1679       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1680
1681       if ($is_literal_sql) {
1682         if (not ref $val) {
1683           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1684         }
1685         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1686           $bad_slice->("$reftype reference found where literal SQL expected",
1687             $col_idx, $datum_idx);
1688         }
1689         elsif ($$val ne $$sqla_bind){
1690           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1691             $col_idx, $datum_idx);
1692         }
1693       }
1694       elsif (my $reftype = ref $val) {
1695         require overload;
1696         if (overload::Method($val, '""')) {
1697           $datum->[$col_idx] = "".$val;
1698         }
1699         else {
1700           $bad_slice->("$reftype reference found where bind expected",
1701             $col_idx, $datum_idx);
1702         }
1703       }
1704     }
1705   }
1706
1707   my ($sql, $bind) = $self->_prep_for_execute (
1708     'insert', undef, $source, [\%colvalues]
1709   );
1710   my @bind = @$bind;
1711
1712   my $empty_bind = 1 if (not @bind) &&
1713     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1714
1715   if ((not @bind) && (not $empty_bind)) {
1716     $self->throw_exception(
1717       'Cannot insert_bulk without support for placeholders'
1718     );
1719   }
1720
1721   # neither _execute_array, nor _execute_inserts_with_no_binds are
1722   # atomic (even if _execute _array is a single call). Thus a safety
1723   # scope guard
1724   my $guard = $self->txn_scope_guard;
1725
1726   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1727   my $sth = $self->sth($sql);
1728   my $rv = do {
1729     if ($empty_bind) {
1730       # bind_param_array doesn't work if there are no binds
1731       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1732     }
1733     else {
1734 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1735       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1736     }
1737   };
1738
1739   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1740
1741   $guard->commit;
1742
1743   return (wantarray ? ($rv, $sth, @bind) : $rv);
1744 }
1745
1746 sub _execute_array {
1747   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1748
1749   ## This must be an arrayref, else nothing works!
1750   my $tuple_status = [];
1751
1752   ## Get the bind_attributes, if any exist
1753   my $bind_attributes = $self->source_bind_attributes($source);
1754
1755   ## Bind the values and execute
1756   my $placeholder_index = 1;
1757
1758   foreach my $bound (@$bind) {
1759
1760     my $attributes = {};
1761     my ($column_name, $data_index) = @$bound;
1762
1763     if( $bind_attributes ) {
1764       $attributes = $bind_attributes->{$column_name}
1765       if defined $bind_attributes->{$column_name};
1766     }
1767
1768     my @data = map { $_->[$data_index] } @$data;
1769
1770     $sth->bind_param_array(
1771       $placeholder_index,
1772       [@data],
1773       (%$attributes ?  $attributes : ()),
1774     );
1775     $placeholder_index++;
1776   }
1777
1778   my ($rv, $err);
1779   try {
1780     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1781   }
1782   catch {
1783     $err = shift;
1784   }
1785   finally {
1786     # Statement must finish even if there was an exception.
1787     try {
1788       $sth->finish
1789     }
1790     catch {
1791       $err = shift unless defined $err
1792     };
1793   };
1794
1795   $err = $sth->errstr
1796     if (! defined $err and $sth->err);
1797
1798   if (defined $err) {
1799     my $i = 0;
1800     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1801
1802     $self->throw_exception("Unexpected populate error: $err")
1803       if ($i > $#$tuple_status);
1804
1805     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1806       ($tuple_status->[$i][1] || $err),
1807       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1808     );
1809   }
1810
1811   return $rv;
1812 }
1813
1814 sub _dbh_execute_array {
1815     my ($self, $sth, $tuple_status, @extra) = @_;
1816
1817     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1818 }
1819
1820 sub _dbh_execute_inserts_with_no_binds {
1821   my ($self, $sth, $count) = @_;
1822
1823   my $err;
1824   try {
1825     my $dbh = $self->_get_dbh;
1826     local $dbh->{RaiseError} = 1;
1827     local $dbh->{PrintError} = 0;
1828
1829     $sth->execute foreach 1..$count;
1830   }
1831   catch {
1832     $err = shift;
1833   }
1834   finally {
1835     # Make sure statement is finished even if there was an exception.
1836     try {
1837       $sth->finish
1838     }
1839     catch {
1840       $err = shift unless defined $err;
1841     };
1842   };
1843
1844   $self->throw_exception($err) if defined $err;
1845
1846   return $count;
1847 }
1848
1849 sub update {
1850   my ($self, $source, @args) = @_;
1851
1852   my $bind_attrs = $self->source_bind_attributes($source);
1853
1854   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1855 }
1856
1857
1858 sub delete {
1859   my ($self, $source, @args) = @_;
1860
1861   my $bind_attrs = $self->source_bind_attributes($source);
1862
1863   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1864 }
1865
1866 # We were sent here because the $rs contains a complex search
1867 # which will require a subquery to select the correct rows
1868 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1869 #
1870 # Generating a single PK column subquery is trivial and supported
1871 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1872 # Look at _multipk_update_delete()
1873 sub _subq_update_delete {
1874   my $self = shift;
1875   my ($rs, $op, $values) = @_;
1876
1877   my $rsrc = $rs->result_source;
1878
1879   # quick check if we got a sane rs on our hands
1880   my @pcols = $rsrc->_pri_cols;
1881
1882   my $sel = $rs->_resolved_attrs->{select};
1883   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1884
1885   if (
1886       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1887         ne
1888       join ("\x00", sort @$sel )
1889   ) {
1890     $self->throw_exception (
1891       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1892     );
1893   }
1894
1895   if (@pcols == 1) {
1896     return $self->$op (
1897       $rsrc,
1898       $op eq 'update' ? $values : (),
1899       { $pcols[0] => { -in => $rs->as_query } },
1900     );
1901   }
1902
1903   else {
1904     return $self->_multipk_update_delete (@_);
1905   }
1906 }
1907
1908 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1909 # resultset update/delete involving subqueries. So by default resort
1910 # to simple (and inefficient) delete_all style per-row opearations,
1911 # while allowing specific storages to override this with a faster
1912 # implementation.
1913 #
1914 sub _multipk_update_delete {
1915   return shift->_per_row_update_delete (@_);
1916 }
1917
1918 # This is the default loop used to delete/update rows for multi PK
1919 # resultsets, and used by mysql exclusively (because it can't do anything
1920 # else).
1921 #
1922 # We do not use $row->$op style queries, because resultset update/delete
1923 # is not expected to cascade (this is what delete_all/update_all is for).
1924 #
1925 # There should be no race conditions as the entire operation is rolled
1926 # in a transaction.
1927 #
1928 sub _per_row_update_delete {
1929   my $self = shift;
1930   my ($rs, $op, $values) = @_;
1931
1932   my $rsrc = $rs->result_source;
1933   my @pcols = $rsrc->_pri_cols;
1934
1935   my $guard = $self->txn_scope_guard;
1936
1937   # emulate the return value of $sth->execute for non-selects
1938   my $row_cnt = '0E0';
1939
1940   my $subrs_cur = $rs->cursor;
1941   my @all_pk = $subrs_cur->all;
1942   for my $pks ( @all_pk) {
1943
1944     my $cond;
1945     for my $i (0.. $#pcols) {
1946       $cond->{$pcols[$i]} = $pks->[$i];
1947     }
1948
1949     $self->$op (
1950       $rsrc,
1951       $op eq 'update' ? $values : (),
1952       $cond,
1953     );
1954
1955     $row_cnt++;
1956   }
1957
1958   $guard->commit;
1959
1960   return $row_cnt;
1961 }
1962
1963 sub _select {
1964   my $self = shift;
1965   $self->_execute($self->_select_args(@_));
1966 }
1967
1968 sub _select_args_to_query {
1969   my $self = shift;
1970
1971   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1972   #  = $self->_select_args($ident, $select, $cond, $attrs);
1973   my ($op, $bind, $ident, $bind_attrs, @args) =
1974     $self->_select_args(@_);
1975
1976   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1977   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1978   $prepared_bind ||= [];
1979
1980   return wantarray
1981     ? ($sql, $prepared_bind, $bind_attrs)
1982     : \[ "($sql)", @$prepared_bind ]
1983   ;
1984 }
1985
1986 sub _select_args {
1987   my ($self, $ident, $select, $where, $attrs) = @_;
1988
1989   my $sql_maker = $self->sql_maker;
1990   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1991
1992   $attrs = {
1993     %$attrs,
1994     select => $select,
1995     from => $ident,
1996     where => $where,
1997     $rs_alias && $alias2source->{$rs_alias}
1998       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1999       : ()
2000     ,
2001   };
2002
2003   # calculate bind_attrs before possible $ident mangling
2004   my $bind_attrs = {};
2005   for my $alias (keys %$alias2source) {
2006     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2007     for my $col (keys %$bindtypes) {
2008
2009       my $fqcn = join ('.', $alias, $col);
2010       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2011
2012       # Unqialified column names are nice, but at the same time can be
2013       # rather ambiguous. What we do here is basically go along with
2014       # the loop, adding an unqualified column slot to $bind_attrs,
2015       # alongside the fully qualified name. As soon as we encounter
2016       # another column by that name (which would imply another table)
2017       # we unset the unqualified slot and never add any info to it
2018       # to avoid erroneous type binding. If this happens the users
2019       # only choice will be to fully qualify his column name
2020
2021       if (exists $bind_attrs->{$col}) {
2022         $bind_attrs->{$col} = {};
2023       }
2024       else {
2025         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2026       }
2027     }
2028   }
2029
2030   # Sanity check the attributes (SQLMaker does it too, but
2031   # in case of a software_limit we'll never reach there)
2032   if (defined $attrs->{offset}) {
2033     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2034       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2035   }
2036   $attrs->{offset} ||= 0;
2037
2038   if (defined $attrs->{rows}) {
2039     $self->throw_exception("The rows attribute must be a positive integer if present")
2040       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2041   }
2042   elsif ($attrs->{offset}) {
2043     # MySQL actually recommends this approach.  I cringe.
2044     $attrs->{rows} = $sql_maker->__max_int;
2045   }
2046
2047   my @limit;
2048
2049   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2050   # storage, unless software limit was requested
2051   if (
2052     #limited has_many
2053     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2054        ||
2055     # grouped prefetch (to satisfy group_by == select)
2056     ( $attrs->{group_by}
2057         &&
2058       @{$attrs->{group_by}}
2059         &&
2060       $attrs->{_prefetch_select}
2061         &&
2062       @{$attrs->{_prefetch_select}}
2063     )
2064   ) {
2065     ($ident, $select, $where, $attrs)
2066       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2067   }
2068   elsif (! $attrs->{software_limit} ) {
2069     push @limit, $attrs->{rows}, $attrs->{offset};
2070   }
2071
2072   # try to simplify the joinmap further (prune unreferenced type-single joins)
2073   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2074
2075 ###
2076   # This would be the point to deflate anything found in $where
2077   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2078   # expect a row object. And all we have is a resultsource (it is trivial
2079   # to extract deflator coderefs via $alias2source above).
2080   #
2081   # I don't see a way forward other than changing the way deflators are
2082   # invoked, and that's just bad...
2083 ###
2084
2085   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2086 }
2087
2088 # Returns a counting SELECT for a simple count
2089 # query. Abstracted so that a storage could override
2090 # this to { count => 'firstcol' } or whatever makes
2091 # sense as a performance optimization
2092 sub _count_select {
2093   #my ($self, $source, $rs_attrs) = @_;
2094   return { count => '*' };
2095 }
2096
2097
2098 sub source_bind_attributes {
2099   my ($self, $source) = @_;
2100
2101   my $bind_attributes;
2102   foreach my $column ($source->columns) {
2103
2104     my $data_type = $source->column_info($column)->{data_type} || '';
2105     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2106      if $data_type;
2107   }
2108
2109   return $bind_attributes;
2110 }
2111
2112 =head2 select
2113
2114 =over 4
2115
2116 =item Arguments: $ident, $select, $condition, $attrs
2117
2118 =back
2119
2120 Handle a SQL select statement.
2121
2122 =cut
2123
2124 sub select {
2125   my $self = shift;
2126   my ($ident, $select, $condition, $attrs) = @_;
2127   return $self->cursor_class->new($self, \@_, $attrs);
2128 }
2129
2130 sub select_single {
2131   my $self = shift;
2132   my ($rv, $sth, @bind) = $self->_select(@_);
2133   my @row = $sth->fetchrow_array;
2134   my @nextrow = $sth->fetchrow_array if @row;
2135   if(@row && @nextrow) {
2136     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2137   }
2138   # Need to call finish() to work round broken DBDs
2139   $sth->finish();
2140   return @row;
2141 }
2142
2143 =head2 sql_limit_dialect
2144
2145 This is an accessor for the default SQL limit dialect used by a particular
2146 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2147 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2148 see L<DBIx::Class::SQLMaker::LimitDialects>.
2149
2150 =head2 sth
2151
2152 =over 4
2153
2154 =item Arguments: $sql
2155
2156 =back
2157
2158 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2159
2160 =cut
2161
2162 sub _dbh_sth {
2163   my ($self, $dbh, $sql) = @_;
2164
2165   # 3 is the if_active parameter which avoids active sth re-use
2166   my $sth = $self->disable_sth_caching
2167     ? $dbh->prepare($sql)
2168     : $dbh->prepare_cached($sql, {}, 3);
2169
2170   # XXX You would think RaiseError would make this impossible,
2171   #  but apparently that's not true :(
2172   $self->throw_exception($dbh->errstr) if !$sth;
2173
2174   $sth;
2175 }
2176
2177 sub sth {
2178   my ($self, $sql) = @_;
2179   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2180 }
2181
2182 sub _dbh_columns_info_for {
2183   my ($self, $dbh, $table) = @_;
2184
2185   if ($dbh->can('column_info')) {
2186     my %result;
2187     my $caught;
2188     try {
2189       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2190       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2191       $sth->execute();
2192       while ( my $info = $sth->fetchrow_hashref() ){
2193         my %column_info;
2194         $column_info{data_type}   = $info->{TYPE_NAME};
2195         $column_info{size}      = $info->{COLUMN_SIZE};
2196         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2197         $column_info{default_value} = $info->{COLUMN_DEF};
2198         my $col_name = $info->{COLUMN_NAME};
2199         $col_name =~ s/^\"(.*)\"$/$1/;
2200
2201         $result{$col_name} = \%column_info;
2202       }
2203     } catch {
2204       $caught = 1;
2205     };
2206     return \%result if !$caught && scalar keys %result;
2207   }
2208
2209   my %result;
2210   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2211   $sth->execute;
2212   my @columns = @{$sth->{NAME_lc}};
2213   for my $i ( 0 .. $#columns ){
2214     my %column_info;
2215     $column_info{data_type} = $sth->{TYPE}->[$i];
2216     $column_info{size} = $sth->{PRECISION}->[$i];
2217     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2218
2219     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2220       $column_info{data_type} = $1;
2221       $column_info{size}    = $2;
2222     }
2223
2224     $result{$columns[$i]} = \%column_info;
2225   }
2226   $sth->finish;
2227
2228   foreach my $col (keys %result) {
2229     my $colinfo = $result{$col};
2230     my $type_num = $colinfo->{data_type};
2231     my $type_name;
2232     if(defined $type_num && $dbh->can('type_info')) {
2233       my $type_info = $dbh->type_info($type_num);
2234       $type_name = $type_info->{TYPE_NAME} if $type_info;
2235       $colinfo->{data_type} = $type_name if $type_name;
2236     }
2237   }
2238
2239   return \%result;
2240 }
2241
2242 sub columns_info_for {
2243   my ($self, $table) = @_;
2244   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2245 }
2246
2247 =head2 last_insert_id
2248
2249 Return the row id of the last insert.
2250
2251 =cut
2252
2253 sub _dbh_last_insert_id {
2254     my ($self, $dbh, $source, $col) = @_;
2255
2256     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2257
2258     return $id if defined $id;
2259
2260     my $class = ref $self;
2261     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2262 }
2263
2264 sub last_insert_id {
2265   my $self = shift;
2266   $self->_dbh_last_insert_id ($self->_dbh, @_);
2267 }
2268
2269 =head2 _native_data_type
2270
2271 =over 4
2272
2273 =item Arguments: $type_name
2274
2275 =back
2276
2277 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2278 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2279 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2280
2281 The default implementation returns C<undef>, implement in your Storage driver if
2282 you need this functionality.
2283
2284 Should map types from other databases to the native RDBMS type, for example
2285 C<VARCHAR2> to C<VARCHAR>.
2286
2287 Types with modifiers should map to the underlying data type. For example,
2288 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2289
2290 Composite types should map to the container type, for example
2291 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2292
2293 =cut
2294
2295 sub _native_data_type {
2296   #my ($self, $data_type) = @_;
2297   return undef
2298 }
2299
2300 # Check if placeholders are supported at all
2301 sub _determine_supports_placeholders {
2302   my $self = shift;
2303   my $dbh  = $self->_get_dbh;
2304
2305   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2306   # but it is inaccurate more often than not
2307   return try {
2308     local $dbh->{PrintError} = 0;
2309     local $dbh->{RaiseError} = 1;
2310     $dbh->do('select ?', {}, 1);
2311     1;
2312   }
2313   catch {
2314     0;
2315   };
2316 }
2317
2318 # Check if placeholders bound to non-string types throw exceptions
2319 #
2320 sub _determine_supports_typeless_placeholders {
2321   my $self = shift;
2322   my $dbh  = $self->_get_dbh;
2323
2324   return try {
2325     local $dbh->{PrintError} = 0;
2326     local $dbh->{RaiseError} = 1;
2327     # this specifically tests a bind that is NOT a string
2328     $dbh->do('select 1 where 1 = ?', {}, 1);
2329     1;
2330   }
2331   catch {
2332     0;
2333   };
2334 }
2335
2336 =head2 sqlt_type
2337
2338 Returns the database driver name.
2339
2340 =cut
2341
2342 sub sqlt_type {
2343   shift->_get_dbh->{Driver}->{Name};
2344 }
2345
2346 =head2 bind_attribute_by_data_type
2347
2348 Given a datatype from column info, returns a database specific bind
2349 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2350 let the database planner just handle it.
2351
2352 Generally only needed for special case column types, like bytea in postgres.
2353
2354 =cut
2355
2356 sub bind_attribute_by_data_type {
2357     return;
2358 }
2359
2360 =head2 is_datatype_numeric
2361
2362 Given a datatype from column_info, returns a boolean value indicating if
2363 the current RDBMS considers it a numeric value. This controls how
2364 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2365 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2366 be performed instead of the usual C<eq>.
2367
2368 =cut
2369
2370 sub is_datatype_numeric {
2371   my ($self, $dt) = @_;
2372
2373   return 0 unless $dt;
2374
2375   return $dt =~ /^ (?:
2376     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2377   ) $/ix;
2378 }
2379
2380
2381 =head2 create_ddl_dir
2382
2383 =over 4
2384
2385 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2386
2387 =back
2388
2389 Creates a SQL file based on the Schema, for each of the specified
2390 database engines in C<\@databases> in the given directory.
2391 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2392
2393 Given a previous version number, this will also create a file containing
2394 the ALTER TABLE statements to transform the previous schema into the
2395 current one. Note that these statements may contain C<DROP TABLE> or
2396 C<DROP COLUMN> statements that can potentially destroy data.
2397
2398 The file names are created using the C<ddl_filename> method below, please
2399 override this method in your schema if you would like a different file
2400 name format. For the ALTER file, the same format is used, replacing
2401 $version in the name with "$preversion-$version".
2402
2403 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2404 The most common value for this would be C<< { add_drop_table => 1 } >>
2405 to have the SQL produced include a C<DROP TABLE> statement for each table
2406 created. For quoting purposes supply C<quote_table_names> and
2407 C<quote_field_names>.
2408
2409 If no arguments are passed, then the following default values are assumed:
2410
2411 =over 4
2412
2413 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2414
2415 =item version    - $schema->schema_version
2416
2417 =item directory  - './'
2418
2419 =item preversion - <none>
2420
2421 =back
2422
2423 By default, C<\%sqlt_args> will have
2424
2425  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2426
2427 merged with the hash passed in. To disable any of those features, pass in a
2428 hashref like the following
2429
2430  { ignore_constraint_names => 0, # ... other options }
2431
2432
2433 WARNING: You are strongly advised to check all SQL files created, before applying
2434 them.
2435
2436 =cut
2437
2438 sub create_ddl_dir {
2439   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2440
2441   unless ($dir) {
2442     carp "No directory given, using ./\n";
2443     $dir = './';
2444   } else {
2445       -d $dir
2446         or
2447       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2448         or
2449       $self->throw_exception(
2450         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2451       );
2452   }
2453
2454   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2455
2456   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2457   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2458
2459   my $schema_version = $schema->schema_version || '1.x';
2460   $version ||= $schema_version;
2461
2462   $sqltargs = {
2463     add_drop_table => 1,
2464     ignore_constraint_names => 1,
2465     ignore_index_names => 1,
2466     %{$sqltargs || {}}
2467   };
2468
2469   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2470     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2471   }
2472
2473   my $sqlt = SQL::Translator->new( $sqltargs );
2474
2475   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2476   my $sqlt_schema = $sqlt->translate({ data => $schema })
2477     or $self->throw_exception ($sqlt->error);
2478
2479   foreach my $db (@$databases) {
2480     $sqlt->reset();
2481     $sqlt->{schema} = $sqlt_schema;
2482     $sqlt->producer($db);
2483
2484     my $file;
2485     my $filename = $schema->ddl_filename($db, $version, $dir);
2486     if (-e $filename && ($version eq $schema_version )) {
2487       # if we are dumping the current version, overwrite the DDL
2488       carp "Overwriting existing DDL file - $filename";
2489       unlink($filename);
2490     }
2491
2492     my $output = $sqlt->translate;
2493     if(!$output) {
2494       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2495       next;
2496     }
2497     if(!open($file, ">$filename")) {
2498       $self->throw_exception("Can't open $filename for writing ($!)");
2499       next;
2500     }
2501     print $file $output;
2502     close($file);
2503
2504     next unless ($preversion);
2505
2506     require SQL::Translator::Diff;
2507
2508     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2509     if(!-e $prefilename) {
2510       carp("No previous schema file found ($prefilename)");
2511       next;
2512     }
2513
2514     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2515     if(-e $difffile) {
2516       carp("Overwriting existing diff file - $difffile");
2517       unlink($difffile);
2518     }
2519
2520     my $source_schema;
2521     {
2522       my $t = SQL::Translator->new($sqltargs);
2523       $t->debug( 0 );
2524       $t->trace( 0 );
2525
2526       $t->parser( $db )
2527         or $self->throw_exception ($t->error);
2528
2529       my $out = $t->translate( $prefilename )
2530         or $self->throw_exception ($t->error);
2531
2532       $source_schema = $t->schema;
2533
2534       $source_schema->name( $prefilename )
2535         unless ( $source_schema->name );
2536     }
2537
2538     # The "new" style of producers have sane normalization and can support
2539     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2540     # And we have to diff parsed SQL against parsed SQL.
2541     my $dest_schema = $sqlt_schema;
2542
2543     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2544       my $t = SQL::Translator->new($sqltargs);
2545       $t->debug( 0 );
2546       $t->trace( 0 );
2547
2548       $t->parser( $db )
2549         or $self->throw_exception ($t->error);
2550
2551       my $out = $t->translate( $filename )
2552         or $self->throw_exception ($t->error);
2553
2554       $dest_schema = $t->schema;
2555
2556       $dest_schema->name( $filename )
2557         unless $dest_schema->name;
2558     }
2559
2560     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2561                                                   $dest_schema,   $db,
2562                                                   $sqltargs
2563                                                  );
2564     if(!open $file, ">$difffile") {
2565       $self->throw_exception("Can't write to $difffile ($!)");
2566       next;
2567     }
2568     print $file $diff;
2569     close($file);
2570   }
2571 }
2572
2573 =head2 deployment_statements
2574
2575 =over 4
2576
2577 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2578
2579 =back
2580
2581 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2582
2583 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2584 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2585
2586 C<$directory> is used to return statements from files in a previously created
2587 L</create_ddl_dir> directory and is optional. The filenames are constructed
2588 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2589
2590 If no C<$directory> is specified then the statements are constructed on the
2591 fly using L<SQL::Translator> and C<$version> is ignored.
2592
2593 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2594
2595 =cut
2596
2597 sub deployment_statements {
2598   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2599   $type ||= $self->sqlt_type;
2600   $version ||= $schema->schema_version || '1.x';
2601   $dir ||= './';
2602   my $filename = $schema->ddl_filename($type, $version, $dir);
2603   if(-f $filename)
2604   {
2605       my $file;
2606       open($file, "<$filename")
2607         or $self->throw_exception("Can't open $filename ($!)");
2608       my @rows = <$file>;
2609       close($file);
2610       return join('', @rows);
2611   }
2612
2613   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2614     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2615   }
2616
2617   # sources needs to be a parser arg, but for simplicty allow at top level
2618   # coming in
2619   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2620       if exists $sqltargs->{sources};
2621
2622   my $tr = SQL::Translator->new(
2623     producer => "SQL::Translator::Producer::${type}",
2624     %$sqltargs,
2625     parser => 'SQL::Translator::Parser::DBIx::Class',
2626     data => $schema,
2627   );
2628
2629   my @ret;
2630   my $wa = wantarray;
2631   if ($wa) {
2632     @ret = $tr->translate;
2633   }
2634   else {
2635     $ret[0] = $tr->translate;
2636   }
2637
2638   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2639     unless (@ret && defined $ret[0]);
2640
2641   return $wa ? @ret : $ret[0];
2642 }
2643
2644 sub deploy {
2645   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2646   my $deploy = sub {
2647     my $line = shift;
2648     return if($line =~ /^--/);
2649     return if(!$line);
2650     # next if($line =~ /^DROP/m);
2651     return if($line =~ /^BEGIN TRANSACTION/m);
2652     return if($line =~ /^COMMIT/m);
2653     return if $line =~ /^\s+$/; # skip whitespace only
2654     $self->_query_start($line);
2655     try {
2656       # do a dbh_do cycle here, as we need some error checking in
2657       # place (even though we will ignore errors)
2658       $self->dbh_do (sub { $_[1]->do($line) });
2659     } catch {
2660       carp qq{$_ (running "${line}")};
2661     };
2662     $self->_query_end($line);
2663   };
2664   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2665   if (@statements > 1) {
2666     foreach my $statement (@statements) {
2667       $deploy->( $statement );
2668     }
2669   }
2670   elsif (@statements == 1) {
2671     foreach my $line ( split(";\n", $statements[0])) {
2672       $deploy->( $line );
2673     }
2674   }
2675 }
2676
2677 =head2 datetime_parser
2678
2679 Returns the datetime parser class
2680
2681 =cut
2682
2683 sub datetime_parser {
2684   my $self = shift;
2685   return $self->{datetime_parser} ||= do {
2686     $self->build_datetime_parser(@_);
2687   };
2688 }
2689
2690 =head2 datetime_parser_type
2691
2692 Defines (returns) the datetime parser class - currently hardwired to
2693 L<DateTime::Format::MySQL>
2694
2695 =cut
2696
2697 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2698
2699 =head2 build_datetime_parser
2700
2701 See L</datetime_parser>
2702
2703 =cut
2704
2705 sub build_datetime_parser {
2706   my $self = shift;
2707   my $type = $self->datetime_parser_type(@_);
2708   $self->ensure_class_loaded ($type);
2709   return $type;
2710 }
2711
2712
2713 =head2 is_replicating
2714
2715 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2716 replicate from a master database.  Default is undef, which is the result
2717 returned by databases that don't support replication.
2718
2719 =cut
2720
2721 sub is_replicating {
2722     return;
2723
2724 }
2725
2726 =head2 lag_behind_master
2727
2728 Returns a number that represents a certain amount of lag behind a master db
2729 when a given storage is replicating.  The number is database dependent, but
2730 starts at zero and increases with the amount of lag. Default in undef
2731
2732 =cut
2733
2734 sub lag_behind_master {
2735     return;
2736 }
2737
2738 =head2 relname_to_table_alias
2739
2740 =over 4
2741
2742 =item Arguments: $relname, $join_count
2743
2744 =back
2745
2746 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2747 queries.
2748
2749 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2750 way these aliases are named.
2751
2752 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2753 otherwise C<"$relname">.
2754
2755 =cut
2756
2757 sub relname_to_table_alias {
2758   my ($self, $relname, $join_count) = @_;
2759
2760   my $alias = ($join_count && $join_count > 1 ?
2761     join('_', $relname, $join_count) : $relname);
2762
2763   return $alias;
2764 }
2765
2766 1;
2767
2768 =head1 USAGE NOTES
2769
2770 =head2 DBIx::Class and AutoCommit
2771
2772 DBIx::Class can do some wonderful magic with handling exceptions,
2773 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2774 (the default) combined with C<txn_do> for transaction support.
2775
2776 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2777 in an assumed transaction between commits, and you're telling us you'd
2778 like to manage that manually.  A lot of the magic protections offered by
2779 this module will go away.  We can't protect you from exceptions due to database
2780 disconnects because we don't know anything about how to restart your
2781 transactions.  You're on your own for handling all sorts of exceptional
2782 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2783 be with raw DBI.
2784
2785
2786 =head1 AUTHORS
2787
2788 Matt S. Trout <mst@shadowcatsystems.co.uk>
2789
2790 Andy Grundman <andy@hybridized.org>
2791
2792 =head1 LICENSE
2793
2794 You may distribute this code under the same terms as Perl itself.
2795
2796 =cut