Rename SQLAHacks to SQLMaker, shuffle around files, add extensive
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } @capabilities );
60
61
62 # Each of these methods need _determine_driver called before itself
63 # in order to function reliably. This is a purely DRY optimization
64 #
65 # get_(use)_dbms_capability need to be called on the correct Storage
66 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
67 # _determine_supports_X which obv. needs a correct driver as well
68 my @rdbms_specific_methods = qw/
69   deployment_statements
70   sqlt_type
71   sql_maker
72   build_datetime_parser
73   datetime_parser_type
74
75
76   insert
77   insert_bulk
78   update
79   delete
80   select
81   select_single
82
83   get_use_dbms_capability
84   get_dbms_capability
85 /;
86
87 for my $meth (@rdbms_specific_methods) {
88
89   my $orig = __PACKAGE__->can ($meth)
90     or die "$meth is not a ::Storage::DBI method!";
91
92   no strict qw/refs/;
93   no warnings qw/redefine/;
94   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
95     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
96       $_[0]->_determine_driver;
97       goto $_[0]->can($meth);
98     }
99     $orig->(@_);
100   };
101 }
102
103
104 =head1 NAME
105
106 DBIx::Class::Storage::DBI - DBI storage handler
107
108 =head1 SYNOPSIS
109
110   my $schema = MySchema->connect('dbi:SQLite:my.db');
111
112   $schema->storage->debug(1);
113
114   my @stuff = $schema->storage->dbh_do(
115     sub {
116       my ($storage, $dbh, @args) = @_;
117       $dbh->do("DROP TABLE authors");
118     },
119     @column_list
120   );
121
122   $schema->resultset('Book')->search({
123      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
124   });
125
126 =head1 DESCRIPTION
127
128 This class represents the connection to an RDBMS via L<DBI>.  See
129 L<DBIx::Class::Storage> for general information.  This pod only
130 documents DBI-specific methods and behaviors.
131
132 =head1 METHODS
133
134 =cut
135
136 sub new {
137   my $new = shift->next::method(@_);
138
139   $new->transaction_depth(0);
140   $new->_sql_maker_opts({});
141   $new->_dbh_details({});
142   $new->{savepoints} = [];
143   $new->{_in_dbh_do} = 0;
144   $new->{_dbh_gen} = 0;
145
146   # read below to see what this does
147   $new->_arm_global_destructor;
148
149   $new;
150 }
151
152 # This is hack to work around perl shooting stuff in random
153 # order on exit(). If we do not walk the remaining storage
154 # objects in an END block, there is a *small but real* chance
155 # of a fork()ed child to kill the parent's shared DBI handle,
156 # *before perl reaches the DESTROY in this package*
157 # Yes, it is ugly and effective.
158 {
159   my %seek_and_destroy;
160
161   sub _arm_global_destructor {
162     my $self = shift;
163     my $key = Scalar::Util::refaddr ($self);
164     $seek_and_destroy{$key} = $self;
165     Scalar::Util::weaken ($seek_and_destroy{$key});
166   }
167
168   END {
169     local $?; # just in case the DBI destructor changes it somehow
170
171     # destroy just the object if not native to this process/thread
172     $_->_preserve_foreign_dbh for (grep
173       { defined $_ }
174       values %seek_and_destroy
175     );
176   }
177 }
178
179 sub DESTROY {
180   my $self = shift;
181
182   # destroy just the object if not native to this process/thread
183   $self->_preserve_foreign_dbh;
184
185   # some databases need this to stop spewing warnings
186   if (my $dbh = $self->_dbh) {
187     try {
188       %{ $dbh->{CachedKids} } = ();
189       $dbh->disconnect;
190     };
191   }
192
193   $self->_dbh(undef);
194 }
195
196 sub _preserve_foreign_dbh {
197   my $self = shift;
198
199   return unless $self->_dbh;
200
201   $self->_verify_tid;
202
203   return unless $self->_dbh;
204
205   $self->_verify_pid;
206
207 }
208
209 # handle pid changes correctly - do not destroy parent's connection
210 sub _verify_pid {
211   my $self = shift;
212
213   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
214
215   $self->_dbh->{InactiveDestroy} = 1;
216   $self->_dbh(undef);
217   $self->{_dbh_gen}++;
218
219   return;
220 }
221
222 # very similar to above, but seems to FAIL if I set InactiveDestroy
223 sub _verify_tid {
224   my $self = shift;
225
226   if ( ! defined $self->_conn_tid ) {
227     return; # no threads
228   }
229   elsif ( $self->_conn_tid == threads->tid ) {
230     return; # same thread
231   }
232
233   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
234   $self->_dbh(undef);
235   $self->{_dbh_gen}++;
236
237   return;
238 }
239
240
241 =head2 connect_info
242
243 This method is normally called by L<DBIx::Class::Schema/connection>, which
244 encapsulates its argument list in an arrayref before passing them here.
245
246 The argument list may contain:
247
248 =over
249
250 =item *
251
252 The same 4-element argument set one would normally pass to
253 L<DBI/connect>, optionally followed by
254 L<extra attributes|/DBIx::Class specific connection attributes>
255 recognized by DBIx::Class:
256
257   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
258
259 =item *
260
261 A single code reference which returns a connected
262 L<DBI database handle|DBI/connect> optionally followed by
263 L<extra attributes|/DBIx::Class specific connection attributes> recognized
264 by DBIx::Class:
265
266   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
267
268 =item *
269
270 A single hashref with all the attributes and the dsn/user/password
271 mixed together:
272
273   $connect_info_args = [{
274     dsn => $dsn,
275     user => $user,
276     password => $pass,
277     %dbi_attributes,
278     %extra_attributes,
279   }];
280
281   $connect_info_args = [{
282     dbh_maker => sub { DBI->connect (...) },
283     %dbi_attributes,
284     %extra_attributes,
285   }];
286
287 This is particularly useful for L<Catalyst> based applications, allowing the
288 following config (L<Config::General> style):
289
290   <Model::DB>
291     schema_class   App::DB
292     <connect_info>
293       dsn          dbi:mysql:database=test
294       user         testuser
295       password     TestPass
296       AutoCommit   1
297     </connect_info>
298   </Model::DB>
299
300 The C<dsn>/C<user>/C<password> combination can be substituted by the
301 C<dbh_maker> key whose value is a coderef that returns a connected
302 L<DBI database handle|DBI/connect>
303
304 =back
305
306 Please note that the L<DBI> docs recommend that you always explicitly
307 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
308 recommends that it be set to I<1>, and that you perform transactions
309 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
310 to I<1> if you do not do explicitly set it to zero.  This is the default
311 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
312
313 =head3 DBIx::Class specific connection attributes
314
315 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
316 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
317 the following connection options. These options can be mixed in with your other
318 L<DBI> connection attributes, or placed in a separate hashref
319 (C<\%extra_attributes>) as shown above.
320
321 Every time C<connect_info> is invoked, any previous settings for
322 these options will be cleared before setting the new ones, regardless of
323 whether any options are specified in the new C<connect_info>.
324
325
326 =over
327
328 =item on_connect_do
329
330 Specifies things to do immediately after connecting or re-connecting to
331 the database.  Its value may contain:
332
333 =over
334
335 =item a scalar
336
337 This contains one SQL statement to execute.
338
339 =item an array reference
340
341 This contains SQL statements to execute in order.  Each element contains
342 a string or a code reference that returns a string.
343
344 =item a code reference
345
346 This contains some code to execute.  Unlike code references within an
347 array reference, its return value is ignored.
348
349 =back
350
351 =item on_disconnect_do
352
353 Takes arguments in the same form as L</on_connect_do> and executes them
354 immediately before disconnecting from the database.
355
356 Note, this only runs if you explicitly call L</disconnect> on the
357 storage object.
358
359 =item on_connect_call
360
361 A more generalized form of L</on_connect_do> that calls the specified
362 C<connect_call_METHOD> methods in your storage driver.
363
364   on_connect_do => 'select 1'
365
366 is equivalent to:
367
368   on_connect_call => [ [ do_sql => 'select 1' ] ]
369
370 Its values may contain:
371
372 =over
373
374 =item a scalar
375
376 Will call the C<connect_call_METHOD> method.
377
378 =item a code reference
379
380 Will execute C<< $code->($storage) >>
381
382 =item an array reference
383
384 Each value can be a method name or code reference.
385
386 =item an array of arrays
387
388 For each array, the first item is taken to be the C<connect_call_> method name
389 or code reference, and the rest are parameters to it.
390
391 =back
392
393 Some predefined storage methods you may use:
394
395 =over
396
397 =item do_sql
398
399 Executes a SQL string or a code reference that returns a SQL string. This is
400 what L</on_connect_do> and L</on_disconnect_do> use.
401
402 It can take:
403
404 =over
405
406 =item a scalar
407
408 Will execute the scalar as SQL.
409
410 =item an arrayref
411
412 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
413 attributes hashref and bind values.
414
415 =item a code reference
416
417 Will execute C<< $code->($storage) >> and execute the return array refs as
418 above.
419
420 =back
421
422 =item datetime_setup
423
424 Execute any statements necessary to initialize the database session to return
425 and accept datetime/timestamp values used with
426 L<DBIx::Class::InflateColumn::DateTime>.
427
428 Only necessary for some databases, see your specific storage driver for
429 implementation details.
430
431 =back
432
433 =item on_disconnect_call
434
435 Takes arguments in the same form as L</on_connect_call> and executes them
436 immediately before disconnecting from the database.
437
438 Calls the C<disconnect_call_METHOD> methods as opposed to the
439 C<connect_call_METHOD> methods called by L</on_connect_call>.
440
441 Note, this only runs if you explicitly call L</disconnect> on the
442 storage object.
443
444 =item disable_sth_caching
445
446 If set to a true value, this option will disable the caching of
447 statement handles via L<DBI/prepare_cached>.
448
449 =item limit_dialect
450
451 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
452 default L</sql_limit_dialect> setting of the storage (if any). For a list
453 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
454
455 =item quote_char
456
457 Specifies what characters to use to quote table and column names. If
458 you use this you will want to specify L</name_sep> as well.
459
460 C<quote_char> expects either a single character, in which case is it
461 is placed on either side of the table/column name, or an arrayref of length
462 2 in which case the table/column name is placed between the elements.
463
464 For example under MySQL you should use C<< quote_char => '`' >>, and for
465 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
466
467 =item name_sep
468
469 This only needs to be used in conjunction with C<quote_char>, and is used to
470 specify the character that separates elements (schemas, tables, columns) from
471 each other. In most cases this is simply a C<.>.
472
473 The consequences of not supplying this value is that L<SQL::Abstract>
474 will assume DBIx::Class' uses of aliases to be complete column
475 names. The output will look like I<"me.name"> when it should actually
476 be I<"me"."name">.
477
478 =item unsafe
479
480 This Storage driver normally installs its own C<HandleError>, sets
481 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
482 all database handles, including those supplied by a coderef.  It does this
483 so that it can have consistent and useful error behavior.
484
485 If you set this option to a true value, Storage will not do its usual
486 modifications to the database handle's attributes, and instead relies on
487 the settings in your connect_info DBI options (or the values you set in
488 your connection coderef, in the case that you are connecting via coderef).
489
490 Note that your custom settings can cause Storage to malfunction,
491 especially if you set a C<HandleError> handler that suppresses exceptions
492 and/or disable C<RaiseError>.
493
494 =item auto_savepoint
495
496 If this option is true, L<DBIx::Class> will use savepoints when nesting
497 transactions, making it possible to recover from failure in the inner
498 transaction without having to abort all outer transactions.
499
500 =item cursor_class
501
502 Use this argument to supply a cursor class other than the default
503 L<DBIx::Class::Storage::DBI::Cursor>.
504
505 =back
506
507 Some real-life examples of arguments to L</connect_info> and
508 L<DBIx::Class::Schema/connect>
509
510   # Simple SQLite connection
511   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
512
513   # Connect via subref
514   ->connect_info([ sub { DBI->connect(...) } ]);
515
516   # Connect via subref in hashref
517   ->connect_info([{
518     dbh_maker => sub { DBI->connect(...) },
519     on_connect_do => 'alter session ...',
520   }]);
521
522   # A bit more complicated
523   ->connect_info(
524     [
525       'dbi:Pg:dbname=foo',
526       'postgres',
527       'my_pg_password',
528       { AutoCommit => 1 },
529       { quote_char => q{"}, name_sep => q{.} },
530     ]
531   );
532
533   # Equivalent to the previous example
534   ->connect_info(
535     [
536       'dbi:Pg:dbname=foo',
537       'postgres',
538       'my_pg_password',
539       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
540     ]
541   );
542
543   # Same, but with hashref as argument
544   # See parse_connect_info for explanation
545   ->connect_info(
546     [{
547       dsn         => 'dbi:Pg:dbname=foo',
548       user        => 'postgres',
549       password    => 'my_pg_password',
550       AutoCommit  => 1,
551       quote_char  => q{"},
552       name_sep    => q{.},
553     }]
554   );
555
556   # Subref + DBIx::Class-specific connection options
557   ->connect_info(
558     [
559       sub { DBI->connect(...) },
560       {
561           quote_char => q{`},
562           name_sep => q{@},
563           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
564           disable_sth_caching => 1,
565       },
566     ]
567   );
568
569
570
571 =cut
572
573 sub connect_info {
574   my ($self, $info) = @_;
575
576   return $self->_connect_info if !$info;
577
578   $self->_connect_info($info); # copy for _connect_info
579
580   $info = $self->_normalize_connect_info($info)
581     if ref $info eq 'ARRAY';
582
583   for my $storage_opt (keys %{ $info->{storage_options} }) {
584     my $value = $info->{storage_options}{$storage_opt};
585
586     $self->$storage_opt($value);
587   }
588
589   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
590   #  the new set of options
591   $self->_sql_maker(undef);
592   $self->_sql_maker_opts({});
593
594   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
595     my $value = $info->{sql_maker_options}{$sql_maker_opt};
596
597     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
598   }
599
600   my %attrs = (
601     %{ $self->_default_dbi_connect_attributes || {} },
602     %{ $info->{attributes} || {} },
603   );
604
605   my @args = @{ $info->{arguments} };
606
607   $self->_dbi_connect_info([@args,
608     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
609
610   # FIXME - dirty:
611   # save attributes them in a separate accessor so they are always
612   # introspectable, even in case of a CODE $dbhmaker
613   $self->_dbic_connect_attributes (\%attrs);
614
615   return $self->_connect_info;
616 }
617
618 sub _normalize_connect_info {
619   my ($self, $info_arg) = @_;
620   my %info;
621
622   my @args = @$info_arg;  # take a shallow copy for further mutilation
623
624   # combine/pre-parse arguments depending on invocation style
625
626   my %attrs;
627   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
628     %attrs = %{ $args[1] || {} };
629     @args = $args[0];
630   }
631   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
632     %attrs = %{$args[0]};
633     @args = ();
634     if (my $code = delete $attrs{dbh_maker}) {
635       @args = $code;
636
637       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
638       if (@ignored) {
639         carp sprintf (
640             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
641           . "to the result of 'dbh_maker'",
642
643           join (', ', map { "'$_'" } (@ignored) ),
644         );
645       }
646     }
647     else {
648       @args = delete @attrs{qw/dsn user password/};
649     }
650   }
651   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
652     %attrs = (
653       % { $args[3] || {} },
654       % { $args[4] || {} },
655     );
656     @args = @args[0,1,2];
657   }
658
659   $info{arguments} = \@args;
660
661   my @storage_opts = grep exists $attrs{$_},
662     @storage_options, 'cursor_class';
663
664   @{ $info{storage_options} }{@storage_opts} =
665     delete @attrs{@storage_opts} if @storage_opts;
666
667   my @sql_maker_opts = grep exists $attrs{$_},
668     qw/limit_dialect quote_char name_sep/;
669
670   @{ $info{sql_maker_options} }{@sql_maker_opts} =
671     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
672
673   $info{attributes} = \%attrs if %attrs;
674
675   return \%info;
676 }
677
678 sub _default_dbi_connect_attributes {
679   return {
680     AutoCommit => 1,
681     RaiseError => 1,
682     PrintError => 0,
683   };
684 }
685
686 =head2 on_connect_do
687
688 This method is deprecated in favour of setting via L</connect_info>.
689
690 =cut
691
692 =head2 on_disconnect_do
693
694 This method is deprecated in favour of setting via L</connect_info>.
695
696 =cut
697
698 sub _parse_connect_do {
699   my ($self, $type) = @_;
700
701   my $val = $self->$type;
702   return () if not defined $val;
703
704   my @res;
705
706   if (not ref($val)) {
707     push @res, [ 'do_sql', $val ];
708   } elsif (ref($val) eq 'CODE') {
709     push @res, $val;
710   } elsif (ref($val) eq 'ARRAY') {
711     push @res, map { [ 'do_sql', $_ ] } @$val;
712   } else {
713     $self->throw_exception("Invalid type for $type: ".ref($val));
714   }
715
716   return \@res;
717 }
718
719 =head2 dbh_do
720
721 Arguments: ($subref | $method_name), @extra_coderef_args?
722
723 Execute the given $subref or $method_name using the new exception-based
724 connection management.
725
726 The first two arguments will be the storage object that C<dbh_do> was called
727 on and a database handle to use.  Any additional arguments will be passed
728 verbatim to the called subref as arguments 2 and onwards.
729
730 Using this (instead of $self->_dbh or $self->dbh) ensures correct
731 exception handling and reconnection (or failover in future subclasses).
732
733 Your subref should have no side-effects outside of the database, as
734 there is the potential for your subref to be partially double-executed
735 if the database connection was stale/dysfunctional.
736
737 Example:
738
739   my @stuff = $schema->storage->dbh_do(
740     sub {
741       my ($storage, $dbh, @cols) = @_;
742       my $cols = join(q{, }, @cols);
743       $dbh->selectrow_array("SELECT $cols FROM foo");
744     },
745     @column_list
746   );
747
748 =cut
749
750 sub dbh_do {
751   my $self = shift;
752   my $code = shift;
753
754   my $dbh = $self->_get_dbh;
755
756   return $self->$code($dbh, @_)
757     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
758
759   local $self->{_in_dbh_do} = 1;
760
761   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
762   my $args = \@_;
763   return try {
764     $self->$code ($dbh, @$args);
765   } catch {
766     $self->throw_exception($_) if $self->connected;
767
768     # We were not connected - reconnect and retry, but let any
769     #  exception fall right through this time
770     carp "Retrying $code after catching disconnected exception: $_"
771       if $ENV{DBIC_DBIRETRY_DEBUG};
772
773     $self->_populate_dbh;
774     $self->$code($self->_dbh, @$args);
775   };
776 }
777
778 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
779 # It also informs dbh_do to bypass itself while under the direction of txn_do,
780 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
781 sub txn_do {
782   my $self = shift;
783   my $coderef = shift;
784
785   ref $coderef eq 'CODE' or $self->throw_exception
786     ('$coderef must be a CODE reference');
787
788   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
789
790   local $self->{_in_dbh_do} = 1;
791
792   my @result;
793   my $want_array = wantarray;
794
795   my $tried = 0;
796   while(1) {
797     my $exception;
798
799     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
800     my $args = \@_;
801
802     try {
803       $self->_get_dbh;
804
805       $self->txn_begin;
806       if($want_array) {
807           @result = $coderef->(@$args);
808       }
809       elsif(defined $want_array) {
810           $result[0] = $coderef->(@$args);
811       }
812       else {
813           $coderef->(@$args);
814       }
815       $self->txn_commit;
816     } catch {
817       $exception = $_;
818     };
819
820     if(! defined $exception) { return $want_array ? @result : $result[0] }
821
822     if($tried++ || $self->connected) {
823       my $rollback_exception;
824       try { $self->txn_rollback } catch { $rollback_exception = shift };
825       if(defined $rollback_exception) {
826         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
827         $self->throw_exception($exception)  # propagate nested rollback
828           if $rollback_exception =~ /$exception_class/;
829
830         $self->throw_exception(
831           "Transaction aborted: ${exception}. "
832           . "Rollback failed: ${rollback_exception}"
833         );
834       }
835       $self->throw_exception($exception)
836     }
837
838     # We were not connected, and was first try - reconnect and retry
839     # via the while loop
840     carp "Retrying $coderef after catching disconnected exception: $exception"
841       if $ENV{DBIC_DBIRETRY_DEBUG};
842     $self->_populate_dbh;
843   }
844 }
845
846 =head2 disconnect
847
848 Our C<disconnect> method also performs a rollback first if the
849 database is not in C<AutoCommit> mode.
850
851 =cut
852
853 sub disconnect {
854   my ($self) = @_;
855
856   if( $self->_dbh ) {
857     my @actions;
858
859     push @actions, ( $self->on_disconnect_call || () );
860     push @actions, $self->_parse_connect_do ('on_disconnect_do');
861
862     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
863
864     $self->_dbh_rollback unless $self->_dbh_autocommit;
865
866     %{ $self->_dbh->{CachedKids} } = ();
867     $self->_dbh->disconnect;
868     $self->_dbh(undef);
869     $self->{_dbh_gen}++;
870   }
871 }
872
873 =head2 with_deferred_fk_checks
874
875 =over 4
876
877 =item Arguments: C<$coderef>
878
879 =item Return Value: The return value of $coderef
880
881 =back
882
883 Storage specific method to run the code ref with FK checks deferred or
884 in MySQL's case disabled entirely.
885
886 =cut
887
888 # Storage subclasses should override this
889 sub with_deferred_fk_checks {
890   my ($self, $sub) = @_;
891   $sub->();
892 }
893
894 =head2 connected
895
896 =over
897
898 =item Arguments: none
899
900 =item Return Value: 1|0
901
902 =back
903
904 Verifies that the current database handle is active and ready to execute
905 an SQL statement (e.g. the connection did not get stale, server is still
906 answering, etc.) This method is used internally by L</dbh>.
907
908 =cut
909
910 sub connected {
911   my $self = shift;
912   return 0 unless $self->_seems_connected;
913
914   #be on the safe side
915   local $self->_dbh->{RaiseError} = 1;
916
917   return $self->_ping;
918 }
919
920 sub _seems_connected {
921   my $self = shift;
922
923   $self->_preserve_foreign_dbh;
924
925   my $dbh = $self->_dbh
926     or return 0;
927
928   return $dbh->FETCH('Active');
929 }
930
931 sub _ping {
932   my $self = shift;
933
934   my $dbh = $self->_dbh or return 0;
935
936   return $dbh->ping;
937 }
938
939 sub ensure_connected {
940   my ($self) = @_;
941
942   unless ($self->connected) {
943     $self->_populate_dbh;
944   }
945 }
946
947 =head2 dbh
948
949 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
950 is guaranteed to be healthy by implicitly calling L</connected>, and if
951 necessary performing a reconnection before returning. Keep in mind that this
952 is very B<expensive> on some database engines. Consider using L</dbh_do>
953 instead.
954
955 =cut
956
957 sub dbh {
958   my ($self) = @_;
959
960   if (not $self->_dbh) {
961     $self->_populate_dbh;
962   } else {
963     $self->ensure_connected;
964   }
965   return $self->_dbh;
966 }
967
968 # this is the internal "get dbh or connect (don't check)" method
969 sub _get_dbh {
970   my $self = shift;
971   $self->_preserve_foreign_dbh;
972   $self->_populate_dbh unless $self->_dbh;
973   return $self->_dbh;
974 }
975
976 sub sql_maker {
977   my ($self) = @_;
978   unless ($self->_sql_maker) {
979     my $sql_maker_class = $self->sql_maker_class;
980     $self->ensure_class_loaded ($sql_maker_class);
981
982     my %opts = %{$self->_sql_maker_opts||{}};
983     my $dialect =
984       $opts{limit_dialect}
985         ||
986       $self->sql_limit_dialect
987         ||
988       do {
989         my $s_class = (ref $self) || $self;
990         carp (
991           "Your storage class ($s_class) does not set sql_limit_dialect and you "
992         . 'have not supplied an explicit limit_dialect in your connection_info. '
993         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
994         . 'databases but can be (and often is) painfully slow.'
995         );
996
997         'GenericSubQ';
998       }
999     ;
1000
1001     $self->_sql_maker($sql_maker_class->new(
1002       bindtype=>'columns',
1003       array_datatypes => 1,
1004       limit_dialect => $dialect,
1005       %opts,
1006     ));
1007   }
1008   return $self->_sql_maker;
1009 }
1010
1011 # nothing to do by default
1012 sub _rebless {}
1013 sub _init {}
1014
1015 sub _populate_dbh {
1016   my ($self) = @_;
1017
1018   my @info = @{$self->_dbi_connect_info || []};
1019   $self->_dbh(undef); # in case ->connected failed we might get sent here
1020   $self->_dbh_details({}); # reset everything we know
1021
1022   $self->_dbh($self->_connect(@info));
1023
1024   $self->_conn_pid($$);
1025   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1026
1027   $self->_determine_driver;
1028
1029   # Always set the transaction depth on connect, since
1030   #  there is no transaction in progress by definition
1031   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1032
1033   $self->_run_connection_actions unless $self->{_in_determine_driver};
1034 }
1035
1036 sub _run_connection_actions {
1037   my $self = shift;
1038   my @actions;
1039
1040   push @actions, ( $self->on_connect_call || () );
1041   push @actions, $self->_parse_connect_do ('on_connect_do');
1042
1043   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1044 }
1045
1046
1047
1048 sub set_use_dbms_capability {
1049   $_[0]->set_inherited ($_[1], $_[2]);
1050 }
1051
1052 sub get_use_dbms_capability {
1053   my ($self, $capname) = @_;
1054
1055   my $use = $self->get_inherited ($capname);
1056   return defined $use
1057     ? $use
1058     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1059   ;
1060 }
1061
1062 sub set_dbms_capability {
1063   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1064 }
1065
1066 sub get_dbms_capability {
1067   my ($self, $capname) = @_;
1068
1069   my $cap = $self->_dbh_details->{capability}{$capname};
1070
1071   unless (defined $cap) {
1072     if (my $meth = $self->can ("_determine$capname")) {
1073       $cap = $self->$meth ? 1 : 0;
1074     }
1075     else {
1076       $cap = 0;
1077     }
1078
1079     $self->set_dbms_capability ($capname, $cap);
1080   }
1081
1082   return $cap;
1083 }
1084
1085 sub _server_info {
1086   my $self = shift;
1087
1088   my $info;
1089   unless ($info = $self->_dbh_details->{info}) {
1090
1091     $info = {};
1092
1093     my $server_version = try { $self->_get_server_version };
1094
1095     if (defined $server_version) {
1096       $info->{dbms_version} = $server_version;
1097
1098       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1099       my @verparts = split (/\./, $numeric_version);
1100       if (
1101         @verparts
1102           &&
1103         $verparts[0] <= 999
1104       ) {
1105         # consider only up to 3 version parts, iff not more than 3 digits
1106         my @use_parts;
1107         while (@verparts && @use_parts < 3) {
1108           my $p = shift @verparts;
1109           last if $p > 999;
1110           push @use_parts, $p;
1111         }
1112         push @use_parts, 0 while @use_parts < 3;
1113
1114         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1115       }
1116     }
1117
1118     $self->_dbh_details->{info} = $info;
1119   }
1120
1121   return $info;
1122 }
1123
1124 sub _get_server_version {
1125   shift->_get_dbh->get_info(18);
1126 }
1127
1128 sub _determine_driver {
1129   my ($self) = @_;
1130
1131   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1132     my $started_connected = 0;
1133     local $self->{_in_determine_driver} = 1;
1134
1135     if (ref($self) eq __PACKAGE__) {
1136       my $driver;
1137       if ($self->_dbh) { # we are connected
1138         $driver = $self->_dbh->{Driver}{Name};
1139         $started_connected = 1;
1140       } else {
1141         # if connect_info is a CODEREF, we have no choice but to connect
1142         if (ref $self->_dbi_connect_info->[0] &&
1143             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1144           $self->_populate_dbh;
1145           $driver = $self->_dbh->{Driver}{Name};
1146         }
1147         else {
1148           # try to use dsn to not require being connected, the driver may still
1149           # force a connection in _rebless to determine version
1150           # (dsn may not be supplied at all if all we do is make a mock-schema)
1151           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1152           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1153           $driver ||= $ENV{DBI_DRIVER};
1154         }
1155       }
1156
1157       if ($driver) {
1158         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1159         if ($self->load_optional_class($storage_class)) {
1160           mro::set_mro($storage_class, 'c3');
1161           bless $self, $storage_class;
1162           $self->_rebless();
1163         }
1164       }
1165     }
1166
1167     $self->_driver_determined(1);
1168
1169     $self->_init; # run driver-specific initializations
1170
1171     $self->_run_connection_actions
1172         if !$started_connected && defined $self->_dbh;
1173   }
1174 }
1175
1176 sub _do_connection_actions {
1177   my $self          = shift;
1178   my $method_prefix = shift;
1179   my $call          = shift;
1180
1181   if (not ref($call)) {
1182     my $method = $method_prefix . $call;
1183     $self->$method(@_);
1184   } elsif (ref($call) eq 'CODE') {
1185     $self->$call(@_);
1186   } elsif (ref($call) eq 'ARRAY') {
1187     if (ref($call->[0]) ne 'ARRAY') {
1188       $self->_do_connection_actions($method_prefix, $_) for @$call;
1189     } else {
1190       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1191     }
1192   } else {
1193     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1194   }
1195
1196   return $self;
1197 }
1198
1199 sub connect_call_do_sql {
1200   my $self = shift;
1201   $self->_do_query(@_);
1202 }
1203
1204 sub disconnect_call_do_sql {
1205   my $self = shift;
1206   $self->_do_query(@_);
1207 }
1208
1209 # override in db-specific backend when necessary
1210 sub connect_call_datetime_setup { 1 }
1211
1212 sub _do_query {
1213   my ($self, $action) = @_;
1214
1215   if (ref $action eq 'CODE') {
1216     $action = $action->($self);
1217     $self->_do_query($_) foreach @$action;
1218   }
1219   else {
1220     # Most debuggers expect ($sql, @bind), so we need to exclude
1221     # the attribute hash which is the second argument to $dbh->do
1222     # furthermore the bind values are usually to be presented
1223     # as named arrayref pairs, so wrap those here too
1224     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1225     my $sql = shift @do_args;
1226     my $attrs = shift @do_args;
1227     my @bind = map { [ undef, $_ ] } @do_args;
1228
1229     $self->_query_start($sql, @bind);
1230     $self->_get_dbh->do($sql, $attrs, @do_args);
1231     $self->_query_end($sql, @bind);
1232   }
1233
1234   return $self;
1235 }
1236
1237 sub _connect {
1238   my ($self, @info) = @_;
1239
1240   $self->throw_exception("You failed to provide any connection info")
1241     if !@info;
1242
1243   my ($old_connect_via, $dbh);
1244
1245   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1246     $old_connect_via = $DBI::connect_via;
1247     $DBI::connect_via = 'connect';
1248   }
1249
1250   try {
1251     if(ref $info[0] eq 'CODE') {
1252        $dbh = $info[0]->();
1253     }
1254     else {
1255        $dbh = DBI->connect(@info);
1256     }
1257
1258     if (!$dbh) {
1259       die $DBI::errstr;
1260     }
1261
1262     unless ($self->unsafe) {
1263
1264       # this odd anonymous coderef dereference is in fact really
1265       # necessary to avoid the unwanted effect described in perl5
1266       # RT#75792
1267       sub {
1268         my $weak_self = $_[0];
1269         weaken $weak_self;
1270
1271         $_[1]->{HandleError} = sub {
1272           if ($weak_self) {
1273             $weak_self->throw_exception("DBI Exception: $_[0]");
1274           }
1275           else {
1276             # the handler may be invoked by something totally out of
1277             # the scope of DBIC
1278             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1279           }
1280         };
1281       }->($self, $dbh);
1282
1283       $dbh->{ShowErrorStatement} = 1;
1284       $dbh->{RaiseError} = 1;
1285       $dbh->{PrintError} = 0;
1286     }
1287   }
1288   catch {
1289     $self->throw_exception("DBI Connection failed: $_")
1290   }
1291   finally {
1292     $DBI::connect_via = $old_connect_via if $old_connect_via;
1293   };
1294
1295   $self->_dbh_autocommit($dbh->{AutoCommit});
1296   $dbh;
1297 }
1298
1299 sub svp_begin {
1300   my ($self, $name) = @_;
1301
1302   $name = $self->_svp_generate_name
1303     unless defined $name;
1304
1305   $self->throw_exception ("You can't use savepoints outside a transaction")
1306     if $self->{transaction_depth} == 0;
1307
1308   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1309     unless $self->can('_svp_begin');
1310
1311   push @{ $self->{savepoints} }, $name;
1312
1313   $self->debugobj->svp_begin($name) if $self->debug;
1314
1315   return $self->_svp_begin($name);
1316 }
1317
1318 sub svp_release {
1319   my ($self, $name) = @_;
1320
1321   $self->throw_exception ("You can't use savepoints outside a transaction")
1322     if $self->{transaction_depth} == 0;
1323
1324   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1325     unless $self->can('_svp_release');
1326
1327   if (defined $name) {
1328     $self->throw_exception ("Savepoint '$name' does not exist")
1329       unless grep { $_ eq $name } @{ $self->{savepoints} };
1330
1331     # Dig through the stack until we find the one we are releasing.  This keeps
1332     # the stack up to date.
1333     my $svp;
1334
1335     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1336   } else {
1337     $name = pop @{ $self->{savepoints} };
1338   }
1339
1340   $self->debugobj->svp_release($name) if $self->debug;
1341
1342   return $self->_svp_release($name);
1343 }
1344
1345 sub svp_rollback {
1346   my ($self, $name) = @_;
1347
1348   $self->throw_exception ("You can't use savepoints outside a transaction")
1349     if $self->{transaction_depth} == 0;
1350
1351   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1352     unless $self->can('_svp_rollback');
1353
1354   if (defined $name) {
1355       # If they passed us a name, verify that it exists in the stack
1356       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1357           $self->throw_exception("Savepoint '$name' does not exist!");
1358       }
1359
1360       # Dig through the stack until we find the one we are releasing.  This keeps
1361       # the stack up to date.
1362       while(my $s = pop(@{ $self->{savepoints} })) {
1363           last if($s eq $name);
1364       }
1365       # Add the savepoint back to the stack, as a rollback doesn't remove the
1366       # named savepoint, only everything after it.
1367       push(@{ $self->{savepoints} }, $name);
1368   } else {
1369       # We'll assume they want to rollback to the last savepoint
1370       $name = $self->{savepoints}->[-1];
1371   }
1372
1373   $self->debugobj->svp_rollback($name) if $self->debug;
1374
1375   return $self->_svp_rollback($name);
1376 }
1377
1378 sub _svp_generate_name {
1379     my ($self) = @_;
1380
1381     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1382 }
1383
1384 sub txn_begin {
1385   my $self = shift;
1386
1387   # this means we have not yet connected and do not know the AC status
1388   # (e.g. coderef $dbh)
1389   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1390
1391   if($self->{transaction_depth} == 0) {
1392     $self->debugobj->txn_begin()
1393       if $self->debug;
1394     $self->_dbh_begin_work;
1395   }
1396   elsif ($self->auto_savepoint) {
1397     $self->svp_begin;
1398   }
1399   $self->{transaction_depth}++;
1400 }
1401
1402 sub _dbh_begin_work {
1403   my $self = shift;
1404
1405   # if the user is utilizing txn_do - good for him, otherwise we need to
1406   # ensure that the $dbh is healthy on BEGIN.
1407   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1408   # will be replaced by a failure of begin_work itself (which will be
1409   # then retried on reconnect)
1410   if ($self->{_in_dbh_do}) {
1411     $self->_dbh->begin_work;
1412   } else {
1413     $self->dbh_do(sub { $_[1]->begin_work });
1414   }
1415 }
1416
1417 sub txn_commit {
1418   my $self = shift;
1419   if ($self->{transaction_depth} == 1) {
1420     $self->debugobj->txn_commit()
1421       if ($self->debug);
1422     $self->_dbh_commit;
1423     $self->{transaction_depth} = 0
1424       if $self->_dbh_autocommit;
1425   }
1426   elsif($self->{transaction_depth} > 1) {
1427     $self->{transaction_depth}--;
1428     $self->svp_release
1429       if $self->auto_savepoint;
1430   }
1431 }
1432
1433 sub _dbh_commit {
1434   my $self = shift;
1435   my $dbh  = $self->_dbh
1436     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1437   $dbh->commit;
1438 }
1439
1440 sub txn_rollback {
1441   my $self = shift;
1442   my $dbh = $self->_dbh;
1443   try {
1444     if ($self->{transaction_depth} == 1) {
1445       $self->debugobj->txn_rollback()
1446         if ($self->debug);
1447       $self->{transaction_depth} = 0
1448         if $self->_dbh_autocommit;
1449       $self->_dbh_rollback;
1450     }
1451     elsif($self->{transaction_depth} > 1) {
1452       $self->{transaction_depth}--;
1453       if ($self->auto_savepoint) {
1454         $self->svp_rollback;
1455         $self->svp_release;
1456       }
1457     }
1458     else {
1459       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1460     }
1461   }
1462   catch {
1463     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1464
1465     if ($_ !~ /$exception_class/) {
1466       # ensure that a failed rollback resets the transaction depth
1467       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1468     }
1469
1470     $self->throw_exception($_)
1471   };
1472 }
1473
1474 sub _dbh_rollback {
1475   my $self = shift;
1476   my $dbh  = $self->_dbh
1477     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1478   $dbh->rollback;
1479 }
1480
1481 # This used to be the top-half of _execute.  It was split out to make it
1482 #  easier to override in NoBindVars without duping the rest.  It takes up
1483 #  all of _execute's args, and emits $sql, @bind.
1484 sub _prep_for_execute {
1485   my ($self, $op, $extra_bind, $ident, $args) = @_;
1486
1487   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1488     $ident = $ident->from();
1489   }
1490
1491   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1492
1493   unshift(@bind,
1494     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1495       if $extra_bind;
1496   return ($sql, \@bind);
1497 }
1498
1499
1500 sub _fix_bind_params {
1501     my ($self, @bind) = @_;
1502
1503     ### Turn @bind from something like this:
1504     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1505     ### to this:
1506     ###   ( "'1'", "'1'", "'3'" )
1507     return
1508         map {
1509             if ( defined( $_ && $_->[1] ) ) {
1510                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1511             }
1512             else { q{'NULL'}; }
1513         } @bind;
1514 }
1515
1516 sub _query_start {
1517     my ( $self, $sql, @bind ) = @_;
1518
1519     if ( $self->debug ) {
1520         @bind = $self->_fix_bind_params(@bind);
1521
1522         $self->debugobj->query_start( $sql, @bind );
1523     }
1524 }
1525
1526 sub _query_end {
1527     my ( $self, $sql, @bind ) = @_;
1528
1529     if ( $self->debug ) {
1530         @bind = $self->_fix_bind_params(@bind);
1531         $self->debugobj->query_end( $sql, @bind );
1532     }
1533 }
1534
1535 sub _dbh_execute {
1536   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1537
1538   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1539
1540   $self->_query_start( $sql, @$bind );
1541
1542   my $sth = $self->sth($sql,$op);
1543
1544   my $placeholder_index = 1;
1545
1546   foreach my $bound (@$bind) {
1547     my $attributes = {};
1548     my($column_name, @data) = @$bound;
1549
1550     if ($bind_attributes) {
1551       $attributes = $bind_attributes->{$column_name}
1552       if defined $bind_attributes->{$column_name};
1553     }
1554
1555     foreach my $data (@data) {
1556       my $ref = ref $data;
1557       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1558
1559       $sth->bind_param($placeholder_index, $data, $attributes);
1560       $placeholder_index++;
1561     }
1562   }
1563
1564   # Can this fail without throwing an exception anyways???
1565   my $rv = $sth->execute();
1566   $self->throw_exception(
1567     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1568   ) if !$rv;
1569
1570   $self->_query_end( $sql, @$bind );
1571
1572   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1573 }
1574
1575 sub _execute {
1576     my $self = shift;
1577     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1578 }
1579
1580 sub _prefetch_insert_auto_nextvals {
1581   my ($self, $source, $to_insert) = @_;
1582
1583   my $upd = {};
1584
1585   foreach my $col ( $source->columns ) {
1586     if ( !defined $to_insert->{$col} ) {
1587       my $col_info = $source->column_info($col);
1588
1589       if ( $col_info->{auto_nextval} ) {
1590         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1591           'nextval',
1592           $col_info->{sequence} ||=
1593             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1594         );
1595       }
1596     }
1597   }
1598
1599   return $upd;
1600 }
1601
1602 sub insert {
1603   my $self = shift;
1604   my ($source, $to_insert, $opts) = @_;
1605
1606   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1607
1608   my $bind_attributes = $self->source_bind_attributes($source);
1609
1610   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1611
1612   if ($opts->{returning}) {
1613     my @ret_cols = @{$opts->{returning}};
1614
1615     my @ret_vals = try {
1616       local $SIG{__WARN__} = sub {};
1617       my @r = $sth->fetchrow_array;
1618       $sth->finish;
1619       @r;
1620     };
1621
1622     my %ret;
1623     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1624
1625     $updated_cols = {
1626       %$updated_cols,
1627       %ret,
1628     };
1629   }
1630
1631   return $updated_cols;
1632 }
1633
1634 ## Currently it is assumed that all values passed will be "normal", i.e. not
1635 ## scalar refs, or at least, all the same type as the first set, the statement is
1636 ## only prepped once.
1637 sub insert_bulk {
1638   my ($self, $source, $cols, $data) = @_;
1639
1640   my %colvalues;
1641   @colvalues{@$cols} = (0..$#$cols);
1642
1643   for my $i (0..$#$cols) {
1644     my $first_val = $data->[0][$i];
1645     next unless ref $first_val eq 'SCALAR';
1646
1647     $colvalues{ $cols->[$i] } = $first_val;
1648   }
1649
1650   # check for bad data and stringify stringifiable objects
1651   my $bad_slice = sub {
1652     my ($msg, $col_idx, $slice_idx) = @_;
1653     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1654       $msg,
1655       $cols->[$col_idx],
1656       do {
1657         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1658         Dumper {
1659           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1660         },
1661       }
1662     );
1663   };
1664
1665   for my $datum_idx (0..$#$data) {
1666     my $datum = $data->[$datum_idx];
1667
1668     for my $col_idx (0..$#$cols) {
1669       my $val            = $datum->[$col_idx];
1670       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1671       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1672
1673       if ($is_literal_sql) {
1674         if (not ref $val) {
1675           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1676         }
1677         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1678           $bad_slice->("$reftype reference found where literal SQL expected",
1679             $col_idx, $datum_idx);
1680         }
1681         elsif ($$val ne $$sqla_bind){
1682           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1683             $col_idx, $datum_idx);
1684         }
1685       }
1686       elsif (my $reftype = ref $val) {
1687         require overload;
1688         if (overload::Method($val, '""')) {
1689           $datum->[$col_idx] = "".$val;
1690         }
1691         else {
1692           $bad_slice->("$reftype reference found where bind expected",
1693             $col_idx, $datum_idx);
1694         }
1695       }
1696     }
1697   }
1698
1699   my ($sql, $bind) = $self->_prep_for_execute (
1700     'insert', undef, $source, [\%colvalues]
1701   );
1702   my @bind = @$bind;
1703
1704   my $empty_bind = 1 if (not @bind) &&
1705     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1706
1707   if ((not @bind) && (not $empty_bind)) {
1708     $self->throw_exception(
1709       'Cannot insert_bulk without support for placeholders'
1710     );
1711   }
1712
1713   # neither _execute_array, nor _execute_inserts_with_no_binds are
1714   # atomic (even if _execute _array is a single call). Thus a safety
1715   # scope guard
1716   my $guard = $self->txn_scope_guard;
1717
1718   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1719   my $sth = $self->sth($sql);
1720   my $rv = do {
1721     if ($empty_bind) {
1722       # bind_param_array doesn't work if there are no binds
1723       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1724     }
1725     else {
1726 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1727       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1728     }
1729   };
1730
1731   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1732
1733   $guard->commit;
1734
1735   return (wantarray ? ($rv, $sth, @bind) : $rv);
1736 }
1737
1738 sub _execute_array {
1739   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1740
1741   ## This must be an arrayref, else nothing works!
1742   my $tuple_status = [];
1743
1744   ## Get the bind_attributes, if any exist
1745   my $bind_attributes = $self->source_bind_attributes($source);
1746
1747   ## Bind the values and execute
1748   my $placeholder_index = 1;
1749
1750   foreach my $bound (@$bind) {
1751
1752     my $attributes = {};
1753     my ($column_name, $data_index) = @$bound;
1754
1755     if( $bind_attributes ) {
1756       $attributes = $bind_attributes->{$column_name}
1757       if defined $bind_attributes->{$column_name};
1758     }
1759
1760     my @data = map { $_->[$data_index] } @$data;
1761
1762     $sth->bind_param_array(
1763       $placeholder_index,
1764       [@data],
1765       (%$attributes ?  $attributes : ()),
1766     );
1767     $placeholder_index++;
1768   }
1769
1770   my ($rv, $err);
1771   try {
1772     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1773   }
1774   catch {
1775     $err = shift;
1776   }
1777   finally {
1778     # Statement must finish even if there was an exception.
1779     try {
1780       $sth->finish
1781     }
1782     catch {
1783       $err = shift unless defined $err
1784     };
1785   };
1786
1787   $err = $sth->errstr
1788     if (! defined $err and $sth->err);
1789
1790   if (defined $err) {
1791     my $i = 0;
1792     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1793
1794     $self->throw_exception("Unexpected populate error: $err")
1795       if ($i > $#$tuple_status);
1796
1797     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1798       ($tuple_status->[$i][1] || $err),
1799       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1800     );
1801   }
1802
1803   return $rv;
1804 }
1805
1806 sub _dbh_execute_array {
1807     my ($self, $sth, $tuple_status, @extra) = @_;
1808
1809     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1810 }
1811
1812 sub _dbh_execute_inserts_with_no_binds {
1813   my ($self, $sth, $count) = @_;
1814
1815   my $err;
1816   try {
1817     my $dbh = $self->_get_dbh;
1818     local $dbh->{RaiseError} = 1;
1819     local $dbh->{PrintError} = 0;
1820
1821     $sth->execute foreach 1..$count;
1822   }
1823   catch {
1824     $err = shift;
1825   }
1826   finally {
1827     # Make sure statement is finished even if there was an exception.
1828     try {
1829       $sth->finish
1830     }
1831     catch {
1832       $err = shift unless defined $err;
1833     };
1834   };
1835
1836   $self->throw_exception($err) if defined $err;
1837
1838   return $count;
1839 }
1840
1841 sub update {
1842   my ($self, $source, @args) = @_;
1843
1844   my $bind_attrs = $self->source_bind_attributes($source);
1845
1846   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1847 }
1848
1849
1850 sub delete {
1851   my ($self, $source, @args) = @_;
1852
1853   my $bind_attrs = $self->source_bind_attributes($source);
1854
1855   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1856 }
1857
1858 # We were sent here because the $rs contains a complex search
1859 # which will require a subquery to select the correct rows
1860 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1861 #
1862 # Generating a single PK column subquery is trivial and supported
1863 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1864 # Look at _multipk_update_delete()
1865 sub _subq_update_delete {
1866   my $self = shift;
1867   my ($rs, $op, $values) = @_;
1868
1869   my $rsrc = $rs->result_source;
1870
1871   # quick check if we got a sane rs on our hands
1872   my @pcols = $rsrc->_pri_cols;
1873
1874   my $sel = $rs->_resolved_attrs->{select};
1875   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1876
1877   if (
1878       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1879         ne
1880       join ("\x00", sort @$sel )
1881   ) {
1882     $self->throw_exception (
1883       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1884     );
1885   }
1886
1887   if (@pcols == 1) {
1888     return $self->$op (
1889       $rsrc,
1890       $op eq 'update' ? $values : (),
1891       { $pcols[0] => { -in => $rs->as_query } },
1892     );
1893   }
1894
1895   else {
1896     return $self->_multipk_update_delete (@_);
1897   }
1898 }
1899
1900 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1901 # resultset update/delete involving subqueries. So by default resort
1902 # to simple (and inefficient) delete_all style per-row opearations,
1903 # while allowing specific storages to override this with a faster
1904 # implementation.
1905 #
1906 sub _multipk_update_delete {
1907   return shift->_per_row_update_delete (@_);
1908 }
1909
1910 # This is the default loop used to delete/update rows for multi PK
1911 # resultsets, and used by mysql exclusively (because it can't do anything
1912 # else).
1913 #
1914 # We do not use $row->$op style queries, because resultset update/delete
1915 # is not expected to cascade (this is what delete_all/update_all is for).
1916 #
1917 # There should be no race conditions as the entire operation is rolled
1918 # in a transaction.
1919 #
1920 sub _per_row_update_delete {
1921   my $self = shift;
1922   my ($rs, $op, $values) = @_;
1923
1924   my $rsrc = $rs->result_source;
1925   my @pcols = $rsrc->_pri_cols;
1926
1927   my $guard = $self->txn_scope_guard;
1928
1929   # emulate the return value of $sth->execute for non-selects
1930   my $row_cnt = '0E0';
1931
1932   my $subrs_cur = $rs->cursor;
1933   my @all_pk = $subrs_cur->all;
1934   for my $pks ( @all_pk) {
1935
1936     my $cond;
1937     for my $i (0.. $#pcols) {
1938       $cond->{$pcols[$i]} = $pks->[$i];
1939     }
1940
1941     $self->$op (
1942       $rsrc,
1943       $op eq 'update' ? $values : (),
1944       $cond,
1945     );
1946
1947     $row_cnt++;
1948   }
1949
1950   $guard->commit;
1951
1952   return $row_cnt;
1953 }
1954
1955 sub _select {
1956   my $self = shift;
1957   $self->_execute($self->_select_args(@_));
1958 }
1959
1960 sub _select_args_to_query {
1961   my $self = shift;
1962
1963   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1964   #  = $self->_select_args($ident, $select, $cond, $attrs);
1965   my ($op, $bind, $ident, $bind_attrs, @args) =
1966     $self->_select_args(@_);
1967
1968   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1969   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1970   $prepared_bind ||= [];
1971
1972   return wantarray
1973     ? ($sql, $prepared_bind, $bind_attrs)
1974     : \[ "($sql)", @$prepared_bind ]
1975   ;
1976 }
1977
1978 sub _select_args {
1979   my ($self, $ident, $select, $where, $attrs) = @_;
1980
1981   my $sql_maker = $self->sql_maker;
1982   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1983
1984   $attrs = {
1985     %$attrs,
1986     select => $select,
1987     from => $ident,
1988     where => $where,
1989     $rs_alias && $alias2source->{$rs_alias}
1990       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1991       : ()
1992     ,
1993   };
1994
1995   # calculate bind_attrs before possible $ident mangling
1996   my $bind_attrs = {};
1997   for my $alias (keys %$alias2source) {
1998     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1999     for my $col (keys %$bindtypes) {
2000
2001       my $fqcn = join ('.', $alias, $col);
2002       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2003
2004       # Unqialified column names are nice, but at the same time can be
2005       # rather ambiguous. What we do here is basically go along with
2006       # the loop, adding an unqualified column slot to $bind_attrs,
2007       # alongside the fully qualified name. As soon as we encounter
2008       # another column by that name (which would imply another table)
2009       # we unset the unqualified slot and never add any info to it
2010       # to avoid erroneous type binding. If this happens the users
2011       # only choice will be to fully qualify his column name
2012
2013       if (exists $bind_attrs->{$col}) {
2014         $bind_attrs->{$col} = {};
2015       }
2016       else {
2017         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2018       }
2019     }
2020   }
2021
2022   # Sanity check the attributes (SQLMaker does it too, but
2023   # in case of a software_limit we'll never reach there)
2024   if (defined $attrs->{offset}) {
2025     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2026       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2027   }
2028   $attrs->{offset} ||= 0;
2029
2030   if (defined $attrs->{rows}) {
2031     $self->throw_exception("The rows attribute must be a positive integer if present")
2032       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2033   }
2034   elsif ($attrs->{offset}) {
2035     # MySQL actually recommends this approach.  I cringe.
2036     $attrs->{rows} = $sql_maker->__max_int;
2037   }
2038
2039   my @limit;
2040
2041   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2042   # storage, unless software limit was requested
2043   if (
2044     #limited has_many
2045     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2046        ||
2047     # grouped prefetch (to satisfy group_by == select)
2048     ( $attrs->{group_by}
2049         &&
2050       @{$attrs->{group_by}}
2051         &&
2052       $attrs->{_prefetch_select}
2053         &&
2054       @{$attrs->{_prefetch_select}}
2055     )
2056   ) {
2057     ($ident, $select, $where, $attrs)
2058       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2059   }
2060   elsif (! $attrs->{software_limit} ) {
2061     push @limit, $attrs->{rows}, $attrs->{offset};
2062   }
2063
2064   # try to simplify the joinmap further (prune unreferenced type-single joins)
2065   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2066
2067 ###
2068   # This would be the point to deflate anything found in $where
2069   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2070   # expect a row object. And all we have is a resultsource (it is trivial
2071   # to extract deflator coderefs via $alias2source above).
2072   #
2073   # I don't see a way forward other than changing the way deflators are
2074   # invoked, and that's just bad...
2075 ###
2076
2077   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2078 }
2079
2080 # Returns a counting SELECT for a simple count
2081 # query. Abstracted so that a storage could override
2082 # this to { count => 'firstcol' } or whatever makes
2083 # sense as a performance optimization
2084 sub _count_select {
2085   #my ($self, $source, $rs_attrs) = @_;
2086   return { count => '*' };
2087 }
2088
2089
2090 sub source_bind_attributes {
2091   my ($self, $source) = @_;
2092
2093   my $bind_attributes;
2094   foreach my $column ($source->columns) {
2095
2096     my $data_type = $source->column_info($column)->{data_type} || '';
2097     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2098      if $data_type;
2099   }
2100
2101   return $bind_attributes;
2102 }
2103
2104 =head2 select
2105
2106 =over 4
2107
2108 =item Arguments: $ident, $select, $condition, $attrs
2109
2110 =back
2111
2112 Handle a SQL select statement.
2113
2114 =cut
2115
2116 sub select {
2117   my $self = shift;
2118   my ($ident, $select, $condition, $attrs) = @_;
2119   return $self->cursor_class->new($self, \@_, $attrs);
2120 }
2121
2122 sub select_single {
2123   my $self = shift;
2124   my ($rv, $sth, @bind) = $self->_select(@_);
2125   my @row = $sth->fetchrow_array;
2126   my @nextrow = $sth->fetchrow_array if @row;
2127   if(@row && @nextrow) {
2128     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2129   }
2130   # Need to call finish() to work round broken DBDs
2131   $sth->finish();
2132   return @row;
2133 }
2134
2135 =head2 sql_limit_dialect
2136
2137 This is an accessor for the default SQL limit dialect used by a particular
2138 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2139 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2140 see L<DBIx::Class::SQLMaker::LimitDialects>.
2141
2142 =head2 sth
2143
2144 =over 4
2145
2146 =item Arguments: $sql
2147
2148 =back
2149
2150 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2151
2152 =cut
2153
2154 sub _dbh_sth {
2155   my ($self, $dbh, $sql) = @_;
2156
2157   # 3 is the if_active parameter which avoids active sth re-use
2158   my $sth = $self->disable_sth_caching
2159     ? $dbh->prepare($sql)
2160     : $dbh->prepare_cached($sql, {}, 3);
2161
2162   # XXX You would think RaiseError would make this impossible,
2163   #  but apparently that's not true :(
2164   $self->throw_exception($dbh->errstr) if !$sth;
2165
2166   $sth;
2167 }
2168
2169 sub sth {
2170   my ($self, $sql) = @_;
2171   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2172 }
2173
2174 sub _dbh_columns_info_for {
2175   my ($self, $dbh, $table) = @_;
2176
2177   if ($dbh->can('column_info')) {
2178     my %result;
2179     my $caught;
2180     try {
2181       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2182       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2183       $sth->execute();
2184       while ( my $info = $sth->fetchrow_hashref() ){
2185         my %column_info;
2186         $column_info{data_type}   = $info->{TYPE_NAME};
2187         $column_info{size}      = $info->{COLUMN_SIZE};
2188         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2189         $column_info{default_value} = $info->{COLUMN_DEF};
2190         my $col_name = $info->{COLUMN_NAME};
2191         $col_name =~ s/^\"(.*)\"$/$1/;
2192
2193         $result{$col_name} = \%column_info;
2194       }
2195     } catch {
2196       $caught = 1;
2197     };
2198     return \%result if !$caught && scalar keys %result;
2199   }
2200
2201   my %result;
2202   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2203   $sth->execute;
2204   my @columns = @{$sth->{NAME_lc}};
2205   for my $i ( 0 .. $#columns ){
2206     my %column_info;
2207     $column_info{data_type} = $sth->{TYPE}->[$i];
2208     $column_info{size} = $sth->{PRECISION}->[$i];
2209     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2210
2211     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2212       $column_info{data_type} = $1;
2213       $column_info{size}    = $2;
2214     }
2215
2216     $result{$columns[$i]} = \%column_info;
2217   }
2218   $sth->finish;
2219
2220   foreach my $col (keys %result) {
2221     my $colinfo = $result{$col};
2222     my $type_num = $colinfo->{data_type};
2223     my $type_name;
2224     if(defined $type_num && $dbh->can('type_info')) {
2225       my $type_info = $dbh->type_info($type_num);
2226       $type_name = $type_info->{TYPE_NAME} if $type_info;
2227       $colinfo->{data_type} = $type_name if $type_name;
2228     }
2229   }
2230
2231   return \%result;
2232 }
2233
2234 sub columns_info_for {
2235   my ($self, $table) = @_;
2236   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2237 }
2238
2239 =head2 last_insert_id
2240
2241 Return the row id of the last insert.
2242
2243 =cut
2244
2245 sub _dbh_last_insert_id {
2246     my ($self, $dbh, $source, $col) = @_;
2247
2248     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2249
2250     return $id if defined $id;
2251
2252     my $class = ref $self;
2253     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2254 }
2255
2256 sub last_insert_id {
2257   my $self = shift;
2258   $self->_dbh_last_insert_id ($self->_dbh, @_);
2259 }
2260
2261 =head2 _native_data_type
2262
2263 =over 4
2264
2265 =item Arguments: $type_name
2266
2267 =back
2268
2269 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2270 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2271 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2272
2273 The default implementation returns C<undef>, implement in your Storage driver if
2274 you need this functionality.
2275
2276 Should map types from other databases to the native RDBMS type, for example
2277 C<VARCHAR2> to C<VARCHAR>.
2278
2279 Types with modifiers should map to the underlying data type. For example,
2280 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2281
2282 Composite types should map to the container type, for example
2283 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2284
2285 =cut
2286
2287 sub _native_data_type {
2288   #my ($self, $data_type) = @_;
2289   return undef
2290 }
2291
2292 # Check if placeholders are supported at all
2293 sub _determine_supports_placeholders {
2294   my $self = shift;
2295   my $dbh  = $self->_get_dbh;
2296
2297   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2298   # but it is inaccurate more often than not
2299   return try {
2300     local $dbh->{PrintError} = 0;
2301     local $dbh->{RaiseError} = 1;
2302     $dbh->do('select ?', {}, 1);
2303     1;
2304   }
2305   catch {
2306     0;
2307   };
2308 }
2309
2310 # Check if placeholders bound to non-string types throw exceptions
2311 #
2312 sub _determine_supports_typeless_placeholders {
2313   my $self = shift;
2314   my $dbh  = $self->_get_dbh;
2315
2316   return try {
2317     local $dbh->{PrintError} = 0;
2318     local $dbh->{RaiseError} = 1;
2319     # this specifically tests a bind that is NOT a string
2320     $dbh->do('select 1 where 1 = ?', {}, 1);
2321     1;
2322   }
2323   catch {
2324     0;
2325   };
2326 }
2327
2328 =head2 sqlt_type
2329
2330 Returns the database driver name.
2331
2332 =cut
2333
2334 sub sqlt_type {
2335   shift->_get_dbh->{Driver}->{Name};
2336 }
2337
2338 =head2 bind_attribute_by_data_type
2339
2340 Given a datatype from column info, returns a database specific bind
2341 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2342 let the database planner just handle it.
2343
2344 Generally only needed for special case column types, like bytea in postgres.
2345
2346 =cut
2347
2348 sub bind_attribute_by_data_type {
2349     return;
2350 }
2351
2352 =head2 is_datatype_numeric
2353
2354 Given a datatype from column_info, returns a boolean value indicating if
2355 the current RDBMS considers it a numeric value. This controls how
2356 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2357 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2358 be performed instead of the usual C<eq>.
2359
2360 =cut
2361
2362 sub is_datatype_numeric {
2363   my ($self, $dt) = @_;
2364
2365   return 0 unless $dt;
2366
2367   return $dt =~ /^ (?:
2368     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2369   ) $/ix;
2370 }
2371
2372
2373 =head2 create_ddl_dir
2374
2375 =over 4
2376
2377 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2378
2379 =back
2380
2381 Creates a SQL file based on the Schema, for each of the specified
2382 database engines in C<\@databases> in the given directory.
2383 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2384
2385 Given a previous version number, this will also create a file containing
2386 the ALTER TABLE statements to transform the previous schema into the
2387 current one. Note that these statements may contain C<DROP TABLE> or
2388 C<DROP COLUMN> statements that can potentially destroy data.
2389
2390 The file names are created using the C<ddl_filename> method below, please
2391 override this method in your schema if you would like a different file
2392 name format. For the ALTER file, the same format is used, replacing
2393 $version in the name with "$preversion-$version".
2394
2395 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2396 The most common value for this would be C<< { add_drop_table => 1 } >>
2397 to have the SQL produced include a C<DROP TABLE> statement for each table
2398 created. For quoting purposes supply C<quote_table_names> and
2399 C<quote_field_names>.
2400
2401 If no arguments are passed, then the following default values are assumed:
2402
2403 =over 4
2404
2405 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2406
2407 =item version    - $schema->schema_version
2408
2409 =item directory  - './'
2410
2411 =item preversion - <none>
2412
2413 =back
2414
2415 By default, C<\%sqlt_args> will have
2416
2417  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2418
2419 merged with the hash passed in. To disable any of those features, pass in a
2420 hashref like the following
2421
2422  { ignore_constraint_names => 0, # ... other options }
2423
2424
2425 WARNING: You are strongly advised to check all SQL files created, before applying
2426 them.
2427
2428 =cut
2429
2430 sub create_ddl_dir {
2431   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2432
2433   unless ($dir) {
2434     carp "No directory given, using ./\n";
2435     $dir = './';
2436   } else {
2437       -d $dir
2438         or
2439       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2440         or
2441       $self->throw_exception(
2442         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2443       );
2444   }
2445
2446   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2447
2448   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2449   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2450
2451   my $schema_version = $schema->schema_version || '1.x';
2452   $version ||= $schema_version;
2453
2454   $sqltargs = {
2455     add_drop_table => 1,
2456     ignore_constraint_names => 1,
2457     ignore_index_names => 1,
2458     %{$sqltargs || {}}
2459   };
2460
2461   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2462     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2463   }
2464
2465   my $sqlt = SQL::Translator->new( $sqltargs );
2466
2467   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2468   my $sqlt_schema = $sqlt->translate({ data => $schema })
2469     or $self->throw_exception ($sqlt->error);
2470
2471   foreach my $db (@$databases) {
2472     $sqlt->reset();
2473     $sqlt->{schema} = $sqlt_schema;
2474     $sqlt->producer($db);
2475
2476     my $file;
2477     my $filename = $schema->ddl_filename($db, $version, $dir);
2478     if (-e $filename && ($version eq $schema_version )) {
2479       # if we are dumping the current version, overwrite the DDL
2480       carp "Overwriting existing DDL file - $filename";
2481       unlink($filename);
2482     }
2483
2484     my $output = $sqlt->translate;
2485     if(!$output) {
2486       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2487       next;
2488     }
2489     if(!open($file, ">$filename")) {
2490       $self->throw_exception("Can't open $filename for writing ($!)");
2491       next;
2492     }
2493     print $file $output;
2494     close($file);
2495
2496     next unless ($preversion);
2497
2498     require SQL::Translator::Diff;
2499
2500     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2501     if(!-e $prefilename) {
2502       carp("No previous schema file found ($prefilename)");
2503       next;
2504     }
2505
2506     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2507     if(-e $difffile) {
2508       carp("Overwriting existing diff file - $difffile");
2509       unlink($difffile);
2510     }
2511
2512     my $source_schema;
2513     {
2514       my $t = SQL::Translator->new($sqltargs);
2515       $t->debug( 0 );
2516       $t->trace( 0 );
2517
2518       $t->parser( $db )
2519         or $self->throw_exception ($t->error);
2520
2521       my $out = $t->translate( $prefilename )
2522         or $self->throw_exception ($t->error);
2523
2524       $source_schema = $t->schema;
2525
2526       $source_schema->name( $prefilename )
2527         unless ( $source_schema->name );
2528     }
2529
2530     # The "new" style of producers have sane normalization and can support
2531     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2532     # And we have to diff parsed SQL against parsed SQL.
2533     my $dest_schema = $sqlt_schema;
2534
2535     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2536       my $t = SQL::Translator->new($sqltargs);
2537       $t->debug( 0 );
2538       $t->trace( 0 );
2539
2540       $t->parser( $db )
2541         or $self->throw_exception ($t->error);
2542
2543       my $out = $t->translate( $filename )
2544         or $self->throw_exception ($t->error);
2545
2546       $dest_schema = $t->schema;
2547
2548       $dest_schema->name( $filename )
2549         unless $dest_schema->name;
2550     }
2551
2552     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2553                                                   $dest_schema,   $db,
2554                                                   $sqltargs
2555                                                  );
2556     if(!open $file, ">$difffile") {
2557       $self->throw_exception("Can't write to $difffile ($!)");
2558       next;
2559     }
2560     print $file $diff;
2561     close($file);
2562   }
2563 }
2564
2565 =head2 deployment_statements
2566
2567 =over 4
2568
2569 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2570
2571 =back
2572
2573 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2574
2575 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2576 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2577
2578 C<$directory> is used to return statements from files in a previously created
2579 L</create_ddl_dir> directory and is optional. The filenames are constructed
2580 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2581
2582 If no C<$directory> is specified then the statements are constructed on the
2583 fly using L<SQL::Translator> and C<$version> is ignored.
2584
2585 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2586
2587 =cut
2588
2589 sub deployment_statements {
2590   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2591   $type ||= $self->sqlt_type;
2592   $version ||= $schema->schema_version || '1.x';
2593   $dir ||= './';
2594   my $filename = $schema->ddl_filename($type, $version, $dir);
2595   if(-f $filename)
2596   {
2597       my $file;
2598       open($file, "<$filename")
2599         or $self->throw_exception("Can't open $filename ($!)");
2600       my @rows = <$file>;
2601       close($file);
2602       return join('', @rows);
2603   }
2604
2605   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2606     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2607   }
2608
2609   # sources needs to be a parser arg, but for simplicty allow at top level
2610   # coming in
2611   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2612       if exists $sqltargs->{sources};
2613
2614   my $tr = SQL::Translator->new(
2615     producer => "SQL::Translator::Producer::${type}",
2616     %$sqltargs,
2617     parser => 'SQL::Translator::Parser::DBIx::Class',
2618     data => $schema,
2619   );
2620
2621   my @ret;
2622   my $wa = wantarray;
2623   if ($wa) {
2624     @ret = $tr->translate;
2625   }
2626   else {
2627     $ret[0] = $tr->translate;
2628   }
2629
2630   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2631     unless (@ret && defined $ret[0]);
2632
2633   return $wa ? @ret : $ret[0];
2634 }
2635
2636 sub deploy {
2637   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2638   my $deploy = sub {
2639     my $line = shift;
2640     return if($line =~ /^--/);
2641     return if(!$line);
2642     # next if($line =~ /^DROP/m);
2643     return if($line =~ /^BEGIN TRANSACTION/m);
2644     return if($line =~ /^COMMIT/m);
2645     return if $line =~ /^\s+$/; # skip whitespace only
2646     $self->_query_start($line);
2647     try {
2648       # do a dbh_do cycle here, as we need some error checking in
2649       # place (even though we will ignore errors)
2650       $self->dbh_do (sub { $_[1]->do($line) });
2651     } catch {
2652       carp qq{$_ (running "${line}")};
2653     };
2654     $self->_query_end($line);
2655   };
2656   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2657   if (@statements > 1) {
2658     foreach my $statement (@statements) {
2659       $deploy->( $statement );
2660     }
2661   }
2662   elsif (@statements == 1) {
2663     foreach my $line ( split(";\n", $statements[0])) {
2664       $deploy->( $line );
2665     }
2666   }
2667 }
2668
2669 =head2 datetime_parser
2670
2671 Returns the datetime parser class
2672
2673 =cut
2674
2675 sub datetime_parser {
2676   my $self = shift;
2677   return $self->{datetime_parser} ||= do {
2678     $self->build_datetime_parser(@_);
2679   };
2680 }
2681
2682 =head2 datetime_parser_type
2683
2684 Defines (returns) the datetime parser class - currently hardwired to
2685 L<DateTime::Format::MySQL>
2686
2687 =cut
2688
2689 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2690
2691 =head2 build_datetime_parser
2692
2693 See L</datetime_parser>
2694
2695 =cut
2696
2697 sub build_datetime_parser {
2698   my $self = shift;
2699   my $type = $self->datetime_parser_type(@_);
2700   $self->ensure_class_loaded ($type);
2701   return $type;
2702 }
2703
2704
2705 =head2 is_replicating
2706
2707 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2708 replicate from a master database.  Default is undef, which is the result
2709 returned by databases that don't support replication.
2710
2711 =cut
2712
2713 sub is_replicating {
2714     return;
2715
2716 }
2717
2718 =head2 lag_behind_master
2719
2720 Returns a number that represents a certain amount of lag behind a master db
2721 when a given storage is replicating.  The number is database dependent, but
2722 starts at zero and increases with the amount of lag. Default in undef
2723
2724 =cut
2725
2726 sub lag_behind_master {
2727     return;
2728 }
2729
2730 =head2 relname_to_table_alias
2731
2732 =over 4
2733
2734 =item Arguments: $relname, $join_count
2735
2736 =back
2737
2738 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2739 queries.
2740
2741 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2742 way these aliases are named.
2743
2744 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2745 otherwise C<"$relname">.
2746
2747 =cut
2748
2749 sub relname_to_table_alias {
2750   my ($self, $relname, $join_count) = @_;
2751
2752   my $alias = ($join_count && $join_count > 1 ?
2753     join('_', $relname, $join_count) : $relname);
2754
2755   return $alias;
2756 }
2757
2758 1;
2759
2760 =head1 USAGE NOTES
2761
2762 =head2 DBIx::Class and AutoCommit
2763
2764 DBIx::Class can do some wonderful magic with handling exceptions,
2765 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2766 (the default) combined with C<txn_do> for transaction support.
2767
2768 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2769 in an assumed transaction between commits, and you're telling us you'd
2770 like to manage that manually.  A lot of the magic protections offered by
2771 this module will go away.  We can't protect you from exceptions due to database
2772 disconnects because we don't know anything about how to restart your
2773 transactions.  You're on your own for handling all sorts of exceptional
2774 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2775 be with raw DBI.
2776
2777
2778 =head1 AUTHORS
2779
2780 Matt S. Trout <mst@shadowcatsystems.co.uk>
2781
2782 Andy Grundman <andy@hybridized.org>
2783
2784 =head1 LICENSE
2785
2786 You may distribute this code under the same terms as Perl itself.
2787
2788 =cut