SQLA::Limit is no more \o/
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } @capabilities );
60
61
62 # Each of these methods need _determine_driver called before itself
63 # in order to function reliably. This is a purely DRY optimization
64 #
65 # get_(use)_dbms_capability need to be called on the correct Storage
66 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
67 # _determine_supports_X which obv. needs a correct driver as well
68 my @rdbms_specific_methods = qw/
69   deployment_statements
70   sqlt_type
71   sql_maker
72   build_datetime_parser
73   datetime_parser_type
74
75
76   insert
77   insert_bulk
78   update
79   delete
80   select
81   select_single
82
83   get_use_dbms_capability
84   get_dbms_capability
85 /;
86
87 for my $meth (@rdbms_specific_methods) {
88
89   my $orig = __PACKAGE__->can ($meth)
90     or die "$meth is not a ::Storage::DBI method!";
91
92   no strict qw/refs/;
93   no warnings qw/redefine/;
94   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
95     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
96       $_[0]->_determine_driver;
97       goto $_[0]->can($meth);
98     }
99     $orig->(@_);
100   };
101 }
102
103
104 =head1 NAME
105
106 DBIx::Class::Storage::DBI - DBI storage handler
107
108 =head1 SYNOPSIS
109
110   my $schema = MySchema->connect('dbi:SQLite:my.db');
111
112   $schema->storage->debug(1);
113
114   my @stuff = $schema->storage->dbh_do(
115     sub {
116       my ($storage, $dbh, @args) = @_;
117       $dbh->do("DROP TABLE authors");
118     },
119     @column_list
120   );
121
122   $schema->resultset('Book')->search({
123      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
124   });
125
126 =head1 DESCRIPTION
127
128 This class represents the connection to an RDBMS via L<DBI>.  See
129 L<DBIx::Class::Storage> for general information.  This pod only
130 documents DBI-specific methods and behaviors.
131
132 =head1 METHODS
133
134 =cut
135
136 sub new {
137   my $new = shift->next::method(@_);
138
139   $new->transaction_depth(0);
140   $new->_sql_maker_opts({});
141   $new->_dbh_details({});
142   $new->{savepoints} = [];
143   $new->{_in_dbh_do} = 0;
144   $new->{_dbh_gen} = 0;
145
146   # read below to see what this does
147   $new->_arm_global_destructor;
148
149   $new;
150 }
151
152 # This is hack to work around perl shooting stuff in random
153 # order on exit(). If we do not walk the remaining storage
154 # objects in an END block, there is a *small but real* chance
155 # of a fork()ed child to kill the parent's shared DBI handle,
156 # *before perl reaches the DESTROY in this package*
157 # Yes, it is ugly and effective.
158 {
159   my %seek_and_destroy;
160
161   sub _arm_global_destructor {
162     my $self = shift;
163     my $key = Scalar::Util::refaddr ($self);
164     $seek_and_destroy{$key} = $self;
165     Scalar::Util::weaken ($seek_and_destroy{$key});
166   }
167
168   END {
169     local $?; # just in case the DBI destructor changes it somehow
170
171     # destroy just the object if not native to this process/thread
172     $_->_preserve_foreign_dbh for (grep
173       { defined $_ }
174       values %seek_and_destroy
175     );
176   }
177 }
178
179 sub DESTROY {
180   my $self = shift;
181
182   # destroy just the object if not native to this process/thread
183   $self->_preserve_foreign_dbh;
184
185   # some databases need this to stop spewing warnings
186   if (my $dbh = $self->_dbh) {
187     try {
188       %{ $dbh->{CachedKids} } = ();
189       $dbh->disconnect;
190     };
191   }
192
193   $self->_dbh(undef);
194 }
195
196 sub _preserve_foreign_dbh {
197   my $self = shift;
198
199   return unless $self->_dbh;
200
201   $self->_verify_tid;
202
203   return unless $self->_dbh;
204
205   $self->_verify_pid;
206
207 }
208
209 # handle pid changes correctly - do not destroy parent's connection
210 sub _verify_pid {
211   my $self = shift;
212
213   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
214
215   $self->_dbh->{InactiveDestroy} = 1;
216   $self->_dbh(undef);
217   $self->{_dbh_gen}++;
218
219   return;
220 }
221
222 # very similar to above, but seems to FAIL if I set InactiveDestroy
223 sub _verify_tid {
224   my $self = shift;
225
226   if ( ! defined $self->_conn_tid ) {
227     return; # no threads
228   }
229   elsif ( $self->_conn_tid == threads->tid ) {
230     return; # same thread
231   }
232
233   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
234   $self->_dbh(undef);
235   $self->{_dbh_gen}++;
236
237   return;
238 }
239
240
241 =head2 connect_info
242
243 This method is normally called by L<DBIx::Class::Schema/connection>, which
244 encapsulates its argument list in an arrayref before passing them here.
245
246 The argument list may contain:
247
248 =over
249
250 =item *
251
252 The same 4-element argument set one would normally pass to
253 L<DBI/connect>, optionally followed by
254 L<extra attributes|/DBIx::Class specific connection attributes>
255 recognized by DBIx::Class:
256
257   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
258
259 =item *
260
261 A single code reference which returns a connected
262 L<DBI database handle|DBI/connect> optionally followed by
263 L<extra attributes|/DBIx::Class specific connection attributes> recognized
264 by DBIx::Class:
265
266   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
267
268 =item *
269
270 A single hashref with all the attributes and the dsn/user/password
271 mixed together:
272
273   $connect_info_args = [{
274     dsn => $dsn,
275     user => $user,
276     password => $pass,
277     %dbi_attributes,
278     %extra_attributes,
279   }];
280
281   $connect_info_args = [{
282     dbh_maker => sub { DBI->connect (...) },
283     %dbi_attributes,
284     %extra_attributes,
285   }];
286
287 This is particularly useful for L<Catalyst> based applications, allowing the
288 following config (L<Config::General> style):
289
290   <Model::DB>
291     schema_class   App::DB
292     <connect_info>
293       dsn          dbi:mysql:database=test
294       user         testuser
295       password     TestPass
296       AutoCommit   1
297     </connect_info>
298   </Model::DB>
299
300 The C<dsn>/C<user>/C<password> combination can be substituted by the
301 C<dbh_maker> key whose value is a coderef that returns a connected
302 L<DBI database handle|DBI/connect>
303
304 =back
305
306 Please note that the L<DBI> docs recommend that you always explicitly
307 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
308 recommends that it be set to I<1>, and that you perform transactions
309 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
310 to I<1> if you do not do explicitly set it to zero.  This is the default
311 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
312
313 =head3 DBIx::Class specific connection attributes
314
315 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
316 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
317 the following connection options. These options can be mixed in with your other
318 L<DBI> connection attributes, or placed in a separate hashref
319 (C<\%extra_attributes>) as shown above.
320
321 Every time C<connect_info> is invoked, any previous settings for
322 these options will be cleared before setting the new ones, regardless of
323 whether any options are specified in the new C<connect_info>.
324
325
326 =over
327
328 =item on_connect_do
329
330 Specifies things to do immediately after connecting or re-connecting to
331 the database.  Its value may contain:
332
333 =over
334
335 =item a scalar
336
337 This contains one SQL statement to execute.
338
339 =item an array reference
340
341 This contains SQL statements to execute in order.  Each element contains
342 a string or a code reference that returns a string.
343
344 =item a code reference
345
346 This contains some code to execute.  Unlike code references within an
347 array reference, its return value is ignored.
348
349 =back
350
351 =item on_disconnect_do
352
353 Takes arguments in the same form as L</on_connect_do> and executes them
354 immediately before disconnecting from the database.
355
356 Note, this only runs if you explicitly call L</disconnect> on the
357 storage object.
358
359 =item on_connect_call
360
361 A more generalized form of L</on_connect_do> that calls the specified
362 C<connect_call_METHOD> methods in your storage driver.
363
364   on_connect_do => 'select 1'
365
366 is equivalent to:
367
368   on_connect_call => [ [ do_sql => 'select 1' ] ]
369
370 Its values may contain:
371
372 =over
373
374 =item a scalar
375
376 Will call the C<connect_call_METHOD> method.
377
378 =item a code reference
379
380 Will execute C<< $code->($storage) >>
381
382 =item an array reference
383
384 Each value can be a method name or code reference.
385
386 =item an array of arrays
387
388 For each array, the first item is taken to be the C<connect_call_> method name
389 or code reference, and the rest are parameters to it.
390
391 =back
392
393 Some predefined storage methods you may use:
394
395 =over
396
397 =item do_sql
398
399 Executes a SQL string or a code reference that returns a SQL string. This is
400 what L</on_connect_do> and L</on_disconnect_do> use.
401
402 It can take:
403
404 =over
405
406 =item a scalar
407
408 Will execute the scalar as SQL.
409
410 =item an arrayref
411
412 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
413 attributes hashref and bind values.
414
415 =item a code reference
416
417 Will execute C<< $code->($storage) >> and execute the return array refs as
418 above.
419
420 =back
421
422 =item datetime_setup
423
424 Execute any statements necessary to initialize the database session to return
425 and accept datetime/timestamp values used with
426 L<DBIx::Class::InflateColumn::DateTime>.
427
428 Only necessary for some databases, see your specific storage driver for
429 implementation details.
430
431 =back
432
433 =item on_disconnect_call
434
435 Takes arguments in the same form as L</on_connect_call> and executes them
436 immediately before disconnecting from the database.
437
438 Calls the C<disconnect_call_METHOD> methods as opposed to the
439 C<connect_call_METHOD> methods called by L</on_connect_call>.
440
441 Note, this only runs if you explicitly call L</disconnect> on the
442 storage object.
443
444 =item disable_sth_caching
445
446 If set to a true value, this option will disable the caching of
447 statement handles via L<DBI/prepare_cached>.
448
449 =item limit_dialect
450
451 Sets the limit dialect. This is useful for JDBC-bridge among others
452 where the remote SQL-dialect cannot be determined by the name of the
453 driver alone. See also L<SQL::Abstract::Limit>.
454
455 =item quote_char
456
457 Specifies what characters to use to quote table and column names. If
458 you use this you will want to specify L</name_sep> as well.
459
460 C<quote_char> expects either a single character, in which case is it
461 is placed on either side of the table/column name, or an arrayref of length
462 2 in which case the table/column name is placed between the elements.
463
464 For example under MySQL you should use C<< quote_char => '`' >>, and for
465 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
466
467 =item name_sep
468
469 This only needs to be used in conjunction with C<quote_char>, and is used to
470 specify the character that separates elements (schemas, tables, columns) from
471 each other. In most cases this is simply a C<.>.
472
473 The consequences of not supplying this value is that L<SQL::Abstract>
474 will assume DBIx::Class' uses of aliases to be complete column
475 names. The output will look like I<"me.name"> when it should actually
476 be I<"me"."name">.
477
478 =item unsafe
479
480 This Storage driver normally installs its own C<HandleError>, sets
481 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
482 all database handles, including those supplied by a coderef.  It does this
483 so that it can have consistent and useful error behavior.
484
485 If you set this option to a true value, Storage will not do its usual
486 modifications to the database handle's attributes, and instead relies on
487 the settings in your connect_info DBI options (or the values you set in
488 your connection coderef, in the case that you are connecting via coderef).
489
490 Note that your custom settings can cause Storage to malfunction,
491 especially if you set a C<HandleError> handler that suppresses exceptions
492 and/or disable C<RaiseError>.
493
494 =item auto_savepoint
495
496 If this option is true, L<DBIx::Class> will use savepoints when nesting
497 transactions, making it possible to recover from failure in the inner
498 transaction without having to abort all outer transactions.
499
500 =item cursor_class
501
502 Use this argument to supply a cursor class other than the default
503 L<DBIx::Class::Storage::DBI::Cursor>.
504
505 =back
506
507 Some real-life examples of arguments to L</connect_info> and
508 L<DBIx::Class::Schema/connect>
509
510   # Simple SQLite connection
511   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
512
513   # Connect via subref
514   ->connect_info([ sub { DBI->connect(...) } ]);
515
516   # Connect via subref in hashref
517   ->connect_info([{
518     dbh_maker => sub { DBI->connect(...) },
519     on_connect_do => 'alter session ...',
520   }]);
521
522   # A bit more complicated
523   ->connect_info(
524     [
525       'dbi:Pg:dbname=foo',
526       'postgres',
527       'my_pg_password',
528       { AutoCommit => 1 },
529       { quote_char => q{"}, name_sep => q{.} },
530     ]
531   );
532
533   # Equivalent to the previous example
534   ->connect_info(
535     [
536       'dbi:Pg:dbname=foo',
537       'postgres',
538       'my_pg_password',
539       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
540     ]
541   );
542
543   # Same, but with hashref as argument
544   # See parse_connect_info for explanation
545   ->connect_info(
546     [{
547       dsn         => 'dbi:Pg:dbname=foo',
548       user        => 'postgres',
549       password    => 'my_pg_password',
550       AutoCommit  => 1,
551       quote_char  => q{"},
552       name_sep    => q{.},
553     }]
554   );
555
556   # Subref + DBIx::Class-specific connection options
557   ->connect_info(
558     [
559       sub { DBI->connect(...) },
560       {
561           quote_char => q{`},
562           name_sep => q{@},
563           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
564           disable_sth_caching => 1,
565       },
566     ]
567   );
568
569
570
571 =cut
572
573 sub connect_info {
574   my ($self, $info) = @_;
575
576   return $self->_connect_info if !$info;
577
578   $self->_connect_info($info); # copy for _connect_info
579
580   $info = $self->_normalize_connect_info($info)
581     if ref $info eq 'ARRAY';
582
583   for my $storage_opt (keys %{ $info->{storage_options} }) {
584     my $value = $info->{storage_options}{$storage_opt};
585
586     $self->$storage_opt($value);
587   }
588
589   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
590   #  the new set of options
591   $self->_sql_maker(undef);
592   $self->_sql_maker_opts({});
593
594   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
595     my $value = $info->{sql_maker_options}{$sql_maker_opt};
596
597     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
598   }
599
600   my %attrs = (
601     %{ $self->_default_dbi_connect_attributes || {} },
602     %{ $info->{attributes} || {} },
603   );
604
605   my @args = @{ $info->{arguments} };
606
607   $self->_dbi_connect_info([@args,
608     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
609
610   # FIXME - dirty:
611   # save attributes them in a separate accessor so they are always
612   # introspectable, even in case of a CODE $dbhmaker
613   $self->_dbic_connect_attributes (\%attrs);
614
615   return $self->_connect_info;
616 }
617
618 sub _normalize_connect_info {
619   my ($self, $info_arg) = @_;
620   my %info;
621
622   my @args = @$info_arg;  # take a shallow copy for further mutilation
623
624   # combine/pre-parse arguments depending on invocation style
625
626   my %attrs;
627   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
628     %attrs = %{ $args[1] || {} };
629     @args = $args[0];
630   }
631   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
632     %attrs = %{$args[0]};
633     @args = ();
634     if (my $code = delete $attrs{dbh_maker}) {
635       @args = $code;
636
637       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
638       if (@ignored) {
639         carp sprintf (
640             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
641           . "to the result of 'dbh_maker'",
642
643           join (', ', map { "'$_'" } (@ignored) ),
644         );
645       }
646     }
647     else {
648       @args = delete @attrs{qw/dsn user password/};
649     }
650   }
651   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
652     %attrs = (
653       % { $args[3] || {} },
654       % { $args[4] || {} },
655     );
656     @args = @args[0,1,2];
657   }
658
659   $info{arguments} = \@args;
660
661   my @storage_opts = grep exists $attrs{$_},
662     @storage_options, 'cursor_class';
663
664   @{ $info{storage_options} }{@storage_opts} =
665     delete @attrs{@storage_opts} if @storage_opts;
666
667   my @sql_maker_opts = grep exists $attrs{$_},
668     qw/limit_dialect quote_char name_sep/;
669
670   @{ $info{sql_maker_options} }{@sql_maker_opts} =
671     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
672
673   $info{attributes} = \%attrs if %attrs;
674
675   return \%info;
676 }
677
678 sub _default_dbi_connect_attributes {
679   return {
680     AutoCommit => 1,
681     RaiseError => 1,
682     PrintError => 0,
683   };
684 }
685
686 =head2 on_connect_do
687
688 This method is deprecated in favour of setting via L</connect_info>.
689
690 =cut
691
692 =head2 on_disconnect_do
693
694 This method is deprecated in favour of setting via L</connect_info>.
695
696 =cut
697
698 sub _parse_connect_do {
699   my ($self, $type) = @_;
700
701   my $val = $self->$type;
702   return () if not defined $val;
703
704   my @res;
705
706   if (not ref($val)) {
707     push @res, [ 'do_sql', $val ];
708   } elsif (ref($val) eq 'CODE') {
709     push @res, $val;
710   } elsif (ref($val) eq 'ARRAY') {
711     push @res, map { [ 'do_sql', $_ ] } @$val;
712   } else {
713     $self->throw_exception("Invalid type for $type: ".ref($val));
714   }
715
716   return \@res;
717 }
718
719 =head2 dbh_do
720
721 Arguments: ($subref | $method_name), @extra_coderef_args?
722
723 Execute the given $subref or $method_name using the new exception-based
724 connection management.
725
726 The first two arguments will be the storage object that C<dbh_do> was called
727 on and a database handle to use.  Any additional arguments will be passed
728 verbatim to the called subref as arguments 2 and onwards.
729
730 Using this (instead of $self->_dbh or $self->dbh) ensures correct
731 exception handling and reconnection (or failover in future subclasses).
732
733 Your subref should have no side-effects outside of the database, as
734 there is the potential for your subref to be partially double-executed
735 if the database connection was stale/dysfunctional.
736
737 Example:
738
739   my @stuff = $schema->storage->dbh_do(
740     sub {
741       my ($storage, $dbh, @cols) = @_;
742       my $cols = join(q{, }, @cols);
743       $dbh->selectrow_array("SELECT $cols FROM foo");
744     },
745     @column_list
746   );
747
748 =cut
749
750 sub dbh_do {
751   my $self = shift;
752   my $code = shift;
753
754   my $dbh = $self->_get_dbh;
755
756   return $self->$code($dbh, @_)
757     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
758
759   local $self->{_in_dbh_do} = 1;
760
761   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
762   my $args = \@_;
763   return try {
764     $self->$code ($dbh, @$args);
765   } catch {
766     $self->throw_exception($_) if $self->connected;
767
768     # We were not connected - reconnect and retry, but let any
769     #  exception fall right through this time
770     carp "Retrying $code after catching disconnected exception: $_"
771       if $ENV{DBIC_DBIRETRY_DEBUG};
772
773     $self->_populate_dbh;
774     $self->$code($self->_dbh, @$args);
775   };
776 }
777
778 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
779 # It also informs dbh_do to bypass itself while under the direction of txn_do,
780 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
781 sub txn_do {
782   my $self = shift;
783   my $coderef = shift;
784
785   ref $coderef eq 'CODE' or $self->throw_exception
786     ('$coderef must be a CODE reference');
787
788   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
789
790   local $self->{_in_dbh_do} = 1;
791
792   my @result;
793   my $want_array = wantarray;
794
795   my $tried = 0;
796   while(1) {
797     my $exception;
798
799     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
800     my $args = \@_;
801
802     try {
803       $self->_get_dbh;
804
805       $self->txn_begin;
806       if($want_array) {
807           @result = $coderef->(@$args);
808       }
809       elsif(defined $want_array) {
810           $result[0] = $coderef->(@$args);
811       }
812       else {
813           $coderef->(@$args);
814       }
815       $self->txn_commit;
816     } catch {
817       $exception = $_;
818     };
819
820     if(! defined $exception) { return $want_array ? @result : $result[0] }
821
822     if($tried++ || $self->connected) {
823       my $rollback_exception;
824       try { $self->txn_rollback } catch { $rollback_exception = shift };
825       if(defined $rollback_exception) {
826         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
827         $self->throw_exception($exception)  # propagate nested rollback
828           if $rollback_exception =~ /$exception_class/;
829
830         $self->throw_exception(
831           "Transaction aborted: ${exception}. "
832           . "Rollback failed: ${rollback_exception}"
833         );
834       }
835       $self->throw_exception($exception)
836     }
837
838     # We were not connected, and was first try - reconnect and retry
839     # via the while loop
840     carp "Retrying $coderef after catching disconnected exception: $exception"
841       if $ENV{DBIC_DBIRETRY_DEBUG};
842     $self->_populate_dbh;
843   }
844 }
845
846 =head2 disconnect
847
848 Our C<disconnect> method also performs a rollback first if the
849 database is not in C<AutoCommit> mode.
850
851 =cut
852
853 sub disconnect {
854   my ($self) = @_;
855
856   if( $self->_dbh ) {
857     my @actions;
858
859     push @actions, ( $self->on_disconnect_call || () );
860     push @actions, $self->_parse_connect_do ('on_disconnect_do');
861
862     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
863
864     $self->_dbh_rollback unless $self->_dbh_autocommit;
865
866     %{ $self->_dbh->{CachedKids} } = ();
867     $self->_dbh->disconnect;
868     $self->_dbh(undef);
869     $self->{_dbh_gen}++;
870   }
871 }
872
873 =head2 with_deferred_fk_checks
874
875 =over 4
876
877 =item Arguments: C<$coderef>
878
879 =item Return Value: The return value of $coderef
880
881 =back
882
883 Storage specific method to run the code ref with FK checks deferred or
884 in MySQL's case disabled entirely.
885
886 =cut
887
888 # Storage subclasses should override this
889 sub with_deferred_fk_checks {
890   my ($self, $sub) = @_;
891   $sub->();
892 }
893
894 =head2 connected
895
896 =over
897
898 =item Arguments: none
899
900 =item Return Value: 1|0
901
902 =back
903
904 Verifies that the current database handle is active and ready to execute
905 an SQL statement (e.g. the connection did not get stale, server is still
906 answering, etc.) This method is used internally by L</dbh>.
907
908 =cut
909
910 sub connected {
911   my $self = shift;
912   return 0 unless $self->_seems_connected;
913
914   #be on the safe side
915   local $self->_dbh->{RaiseError} = 1;
916
917   return $self->_ping;
918 }
919
920 sub _seems_connected {
921   my $self = shift;
922
923   $self->_preserve_foreign_dbh;
924
925   my $dbh = $self->_dbh
926     or return 0;
927
928   return $dbh->FETCH('Active');
929 }
930
931 sub _ping {
932   my $self = shift;
933
934   my $dbh = $self->_dbh or return 0;
935
936   return $dbh->ping;
937 }
938
939 sub ensure_connected {
940   my ($self) = @_;
941
942   unless ($self->connected) {
943     $self->_populate_dbh;
944   }
945 }
946
947 =head2 dbh
948
949 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
950 is guaranteed to be healthy by implicitly calling L</connected>, and if
951 necessary performing a reconnection before returning. Keep in mind that this
952 is very B<expensive> on some database engines. Consider using L</dbh_do>
953 instead.
954
955 =cut
956
957 sub dbh {
958   my ($self) = @_;
959
960   if (not $self->_dbh) {
961     $self->_populate_dbh;
962   } else {
963     $self->ensure_connected;
964   }
965   return $self->_dbh;
966 }
967
968 # this is the internal "get dbh or connect (don't check)" method
969 sub _get_dbh {
970   my $self = shift;
971   $self->_preserve_foreign_dbh;
972   $self->_populate_dbh unless $self->_dbh;
973   return $self->_dbh;
974 }
975
976 sub sql_maker {
977   my ($self) = @_;
978   unless ($self->_sql_maker) {
979     my $sql_maker_class = $self->sql_maker_class;
980     $self->ensure_class_loaded ($sql_maker_class);
981
982     my %opts = %{$self->_sql_maker_opts||{}};
983     my $dialect =
984       $opts{limit_dialect}
985         ||
986       $self->sql_limit_dialect
987         ||
988       do {
989         my $s_class = (ref $self) || $self;
990         carp (
991           "Your storage class ($s_class) does not set sql_limit_dialect and you "
992         . 'have not supplied an explicit limit_dialect in your connection_info. '
993         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
994         . 'databases but can be (and often is) painfully slow.'
995         );
996
997         'GenericSubQ';
998       }
999     ;
1000
1001     $self->_sql_maker($sql_maker_class->new(
1002       bindtype=>'columns',
1003       array_datatypes => 1,
1004       limit_dialect => $dialect,
1005       %opts,
1006     ));
1007   }
1008   return $self->_sql_maker;
1009 }
1010
1011 # nothing to do by default
1012 sub _rebless {}
1013 sub _init {}
1014
1015 sub _populate_dbh {
1016   my ($self) = @_;
1017
1018   my @info = @{$self->_dbi_connect_info || []};
1019   $self->_dbh(undef); # in case ->connected failed we might get sent here
1020   $self->_dbh_details({}); # reset everything we know
1021
1022   $self->_dbh($self->_connect(@info));
1023
1024   $self->_conn_pid($$);
1025   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1026
1027   $self->_determine_driver;
1028
1029   # Always set the transaction depth on connect, since
1030   #  there is no transaction in progress by definition
1031   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1032
1033   $self->_run_connection_actions unless $self->{_in_determine_driver};
1034 }
1035
1036 sub _run_connection_actions {
1037   my $self = shift;
1038   my @actions;
1039
1040   push @actions, ( $self->on_connect_call || () );
1041   push @actions, $self->_parse_connect_do ('on_connect_do');
1042
1043   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1044 }
1045
1046
1047
1048 sub set_use_dbms_capability {
1049   $_[0]->set_inherited ($_[1], $_[2]);
1050 }
1051
1052 sub get_use_dbms_capability {
1053   my ($self, $capname) = @_;
1054
1055   my $use = $self->get_inherited ($capname);
1056   return defined $use
1057     ? $use
1058     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1059   ;
1060 }
1061
1062 sub set_dbms_capability {
1063   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1064 }
1065
1066 sub get_dbms_capability {
1067   my ($self, $capname) = @_;
1068
1069   my $cap = $self->_dbh_details->{capability}{$capname};
1070
1071   unless (defined $cap) {
1072     if (my $meth = $self->can ("_determine$capname")) {
1073       $cap = $self->$meth ? 1 : 0;
1074     }
1075     else {
1076       $cap = 0;
1077     }
1078
1079     $self->set_dbms_capability ($capname, $cap);
1080   }
1081
1082   return $cap;
1083 }
1084
1085 sub _server_info {
1086   my $self = shift;
1087
1088   my $info;
1089   unless ($info = $self->_dbh_details->{info}) {
1090
1091     $info = {};
1092
1093     my $server_version = try { $self->_get_server_version };
1094
1095     if (defined $server_version) {
1096       $info->{dbms_version} = $server_version;
1097
1098       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1099       my @verparts = split (/\./, $numeric_version);
1100       if (
1101         @verparts
1102           &&
1103         $verparts[0] <= 999
1104       ) {
1105         # consider only up to 3 version parts, iff not more than 3 digits
1106         my @use_parts;
1107         while (@verparts && @use_parts < 3) {
1108           my $p = shift @verparts;
1109           last if $p > 999;
1110           push @use_parts, $p;
1111         }
1112         push @use_parts, 0 while @use_parts < 3;
1113
1114         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1115       }
1116     }
1117
1118     $self->_dbh_details->{info} = $info;
1119   }
1120
1121   return $info;
1122 }
1123
1124 sub _get_server_version {
1125   shift->_get_dbh->get_info(18);
1126 }
1127
1128 sub _determine_driver {
1129   my ($self) = @_;
1130
1131   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1132     my $started_connected = 0;
1133     local $self->{_in_determine_driver} = 1;
1134
1135     if (ref($self) eq __PACKAGE__) {
1136       my $driver;
1137       if ($self->_dbh) { # we are connected
1138         $driver = $self->_dbh->{Driver}{Name};
1139         $started_connected = 1;
1140       } else {
1141         # if connect_info is a CODEREF, we have no choice but to connect
1142         if (ref $self->_dbi_connect_info->[0] &&
1143             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1144           $self->_populate_dbh;
1145           $driver = $self->_dbh->{Driver}{Name};
1146         }
1147         else {
1148           # try to use dsn to not require being connected, the driver may still
1149           # force a connection in _rebless to determine version
1150           # (dsn may not be supplied at all if all we do is make a mock-schema)
1151           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1152           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1153           $driver ||= $ENV{DBI_DRIVER};
1154         }
1155       }
1156
1157       if ($driver) {
1158         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1159         if ($self->load_optional_class($storage_class)) {
1160           mro::set_mro($storage_class, 'c3');
1161           bless $self, $storage_class;
1162           $self->_rebless();
1163         }
1164       }
1165     }
1166
1167     $self->_driver_determined(1);
1168
1169     $self->_init; # run driver-specific initializations
1170
1171     $self->_run_connection_actions
1172         if !$started_connected && defined $self->_dbh;
1173   }
1174 }
1175
1176 sub _do_connection_actions {
1177   my $self          = shift;
1178   my $method_prefix = shift;
1179   my $call          = shift;
1180
1181   if (not ref($call)) {
1182     my $method = $method_prefix . $call;
1183     $self->$method(@_);
1184   } elsif (ref($call) eq 'CODE') {
1185     $self->$call(@_);
1186   } elsif (ref($call) eq 'ARRAY') {
1187     if (ref($call->[0]) ne 'ARRAY') {
1188       $self->_do_connection_actions($method_prefix, $_) for @$call;
1189     } else {
1190       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1191     }
1192   } else {
1193     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1194   }
1195
1196   return $self;
1197 }
1198
1199 sub connect_call_do_sql {
1200   my $self = shift;
1201   $self->_do_query(@_);
1202 }
1203
1204 sub disconnect_call_do_sql {
1205   my $self = shift;
1206   $self->_do_query(@_);
1207 }
1208
1209 # override in db-specific backend when necessary
1210 sub connect_call_datetime_setup { 1 }
1211
1212 sub _do_query {
1213   my ($self, $action) = @_;
1214
1215   if (ref $action eq 'CODE') {
1216     $action = $action->($self);
1217     $self->_do_query($_) foreach @$action;
1218   }
1219   else {
1220     # Most debuggers expect ($sql, @bind), so we need to exclude
1221     # the attribute hash which is the second argument to $dbh->do
1222     # furthermore the bind values are usually to be presented
1223     # as named arrayref pairs, so wrap those here too
1224     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1225     my $sql = shift @do_args;
1226     my $attrs = shift @do_args;
1227     my @bind = map { [ undef, $_ ] } @do_args;
1228
1229     $self->_query_start($sql, @bind);
1230     $self->_get_dbh->do($sql, $attrs, @do_args);
1231     $self->_query_end($sql, @bind);
1232   }
1233
1234   return $self;
1235 }
1236
1237 sub _connect {
1238   my ($self, @info) = @_;
1239
1240   $self->throw_exception("You failed to provide any connection info")
1241     if !@info;
1242
1243   my ($old_connect_via, $dbh);
1244
1245   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1246     $old_connect_via = $DBI::connect_via;
1247     $DBI::connect_via = 'connect';
1248   }
1249
1250   try {
1251     if(ref $info[0] eq 'CODE') {
1252        $dbh = $info[0]->();
1253     }
1254     else {
1255        $dbh = DBI->connect(@info);
1256     }
1257
1258     if (!$dbh) {
1259       die $DBI::errstr;
1260     }
1261
1262     unless ($self->unsafe) {
1263
1264       # this odd anonymous coderef dereference is in fact really
1265       # necessary to avoid the unwanted effect described in perl5
1266       # RT#75792
1267       sub {
1268         my $weak_self = $_[0];
1269         weaken $weak_self;
1270
1271         $_[1]->{HandleError} = sub {
1272           if ($weak_self) {
1273             $weak_self->throw_exception("DBI Exception: $_[0]");
1274           }
1275           else {
1276             # the handler may be invoked by something totally out of
1277             # the scope of DBIC
1278             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1279           }
1280         };
1281       }->($self, $dbh);
1282
1283       $dbh->{ShowErrorStatement} = 1;
1284       $dbh->{RaiseError} = 1;
1285       $dbh->{PrintError} = 0;
1286     }
1287   }
1288   catch {
1289     $self->throw_exception("DBI Connection failed: $_")
1290   }
1291   finally {
1292     $DBI::connect_via = $old_connect_via if $old_connect_via;
1293   };
1294
1295   $self->_dbh_autocommit($dbh->{AutoCommit});
1296   $dbh;
1297 }
1298
1299 sub svp_begin {
1300   my ($self, $name) = @_;
1301
1302   $name = $self->_svp_generate_name
1303     unless defined $name;
1304
1305   $self->throw_exception ("You can't use savepoints outside a transaction")
1306     if $self->{transaction_depth} == 0;
1307
1308   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1309     unless $self->can('_svp_begin');
1310
1311   push @{ $self->{savepoints} }, $name;
1312
1313   $self->debugobj->svp_begin($name) if $self->debug;
1314
1315   return $self->_svp_begin($name);
1316 }
1317
1318 sub svp_release {
1319   my ($self, $name) = @_;
1320
1321   $self->throw_exception ("You can't use savepoints outside a transaction")
1322     if $self->{transaction_depth} == 0;
1323
1324   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1325     unless $self->can('_svp_release');
1326
1327   if (defined $name) {
1328     $self->throw_exception ("Savepoint '$name' does not exist")
1329       unless grep { $_ eq $name } @{ $self->{savepoints} };
1330
1331     # Dig through the stack until we find the one we are releasing.  This keeps
1332     # the stack up to date.
1333     my $svp;
1334
1335     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1336   } else {
1337     $name = pop @{ $self->{savepoints} };
1338   }
1339
1340   $self->debugobj->svp_release($name) if $self->debug;
1341
1342   return $self->_svp_release($name);
1343 }
1344
1345 sub svp_rollback {
1346   my ($self, $name) = @_;
1347
1348   $self->throw_exception ("You can't use savepoints outside a transaction")
1349     if $self->{transaction_depth} == 0;
1350
1351   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1352     unless $self->can('_svp_rollback');
1353
1354   if (defined $name) {
1355       # If they passed us a name, verify that it exists in the stack
1356       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1357           $self->throw_exception("Savepoint '$name' does not exist!");
1358       }
1359
1360       # Dig through the stack until we find the one we are releasing.  This keeps
1361       # the stack up to date.
1362       while(my $s = pop(@{ $self->{savepoints} })) {
1363           last if($s eq $name);
1364       }
1365       # Add the savepoint back to the stack, as a rollback doesn't remove the
1366       # named savepoint, only everything after it.
1367       push(@{ $self->{savepoints} }, $name);
1368   } else {
1369       # We'll assume they want to rollback to the last savepoint
1370       $name = $self->{savepoints}->[-1];
1371   }
1372
1373   $self->debugobj->svp_rollback($name) if $self->debug;
1374
1375   return $self->_svp_rollback($name);
1376 }
1377
1378 sub _svp_generate_name {
1379     my ($self) = @_;
1380
1381     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1382 }
1383
1384 sub txn_begin {
1385   my $self = shift;
1386
1387   # this means we have not yet connected and do not know the AC status
1388   # (e.g. coderef $dbh)
1389   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1390
1391   if($self->{transaction_depth} == 0) {
1392     $self->debugobj->txn_begin()
1393       if $self->debug;
1394     $self->_dbh_begin_work;
1395   }
1396   elsif ($self->auto_savepoint) {
1397     $self->svp_begin;
1398   }
1399   $self->{transaction_depth}++;
1400 }
1401
1402 sub _dbh_begin_work {
1403   my $self = shift;
1404
1405   # if the user is utilizing txn_do - good for him, otherwise we need to
1406   # ensure that the $dbh is healthy on BEGIN.
1407   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1408   # will be replaced by a failure of begin_work itself (which will be
1409   # then retried on reconnect)
1410   if ($self->{_in_dbh_do}) {
1411     $self->_dbh->begin_work;
1412   } else {
1413     $self->dbh_do(sub { $_[1]->begin_work });
1414   }
1415 }
1416
1417 sub txn_commit {
1418   my $self = shift;
1419   if ($self->{transaction_depth} == 1) {
1420     $self->debugobj->txn_commit()
1421       if ($self->debug);
1422     $self->_dbh_commit;
1423     $self->{transaction_depth} = 0
1424       if $self->_dbh_autocommit;
1425   }
1426   elsif($self->{transaction_depth} > 1) {
1427     $self->{transaction_depth}--;
1428     $self->svp_release
1429       if $self->auto_savepoint;
1430   }
1431 }
1432
1433 sub _dbh_commit {
1434   my $self = shift;
1435   my $dbh  = $self->_dbh
1436     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1437   $dbh->commit;
1438 }
1439
1440 sub txn_rollback {
1441   my $self = shift;
1442   my $dbh = $self->_dbh;
1443   try {
1444     if ($self->{transaction_depth} == 1) {
1445       $self->debugobj->txn_rollback()
1446         if ($self->debug);
1447       $self->{transaction_depth} = 0
1448         if $self->_dbh_autocommit;
1449       $self->_dbh_rollback;
1450     }
1451     elsif($self->{transaction_depth} > 1) {
1452       $self->{transaction_depth}--;
1453       if ($self->auto_savepoint) {
1454         $self->svp_rollback;
1455         $self->svp_release;
1456       }
1457     }
1458     else {
1459       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1460     }
1461   }
1462   catch {
1463     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1464
1465     if ($_ !~ /$exception_class/) {
1466       # ensure that a failed rollback resets the transaction depth
1467       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1468     }
1469
1470     $self->throw_exception($_)
1471   };
1472 }
1473
1474 sub _dbh_rollback {
1475   my $self = shift;
1476   my $dbh  = $self->_dbh
1477     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1478   $dbh->rollback;
1479 }
1480
1481 # This used to be the top-half of _execute.  It was split out to make it
1482 #  easier to override in NoBindVars without duping the rest.  It takes up
1483 #  all of _execute's args, and emits $sql, @bind.
1484 sub _prep_for_execute {
1485   my ($self, $op, $extra_bind, $ident, $args) = @_;
1486
1487   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1488     $ident = $ident->from();
1489   }
1490
1491   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1492
1493   unshift(@bind,
1494     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1495       if $extra_bind;
1496   return ($sql, \@bind);
1497 }
1498
1499
1500 sub _fix_bind_params {
1501     my ($self, @bind) = @_;
1502
1503     ### Turn @bind from something like this:
1504     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1505     ### to this:
1506     ###   ( "'1'", "'1'", "'3'" )
1507     return
1508         map {
1509             if ( defined( $_ && $_->[1] ) ) {
1510                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1511             }
1512             else { q{'NULL'}; }
1513         } @bind;
1514 }
1515
1516 sub _query_start {
1517     my ( $self, $sql, @bind ) = @_;
1518
1519     if ( $self->debug ) {
1520         @bind = $self->_fix_bind_params(@bind);
1521
1522         $self->debugobj->query_start( $sql, @bind );
1523     }
1524 }
1525
1526 sub _query_end {
1527     my ( $self, $sql, @bind ) = @_;
1528
1529     if ( $self->debug ) {
1530         @bind = $self->_fix_bind_params(@bind);
1531         $self->debugobj->query_end( $sql, @bind );
1532     }
1533 }
1534
1535 sub _dbh_execute {
1536   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1537
1538   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1539
1540   $self->_query_start( $sql, @$bind );
1541
1542   my $sth = $self->sth($sql,$op);
1543
1544   my $placeholder_index = 1;
1545
1546   foreach my $bound (@$bind) {
1547     my $attributes = {};
1548     my($column_name, @data) = @$bound;
1549
1550     if ($bind_attributes) {
1551       $attributes = $bind_attributes->{$column_name}
1552       if defined $bind_attributes->{$column_name};
1553     }
1554
1555     foreach my $data (@data) {
1556       my $ref = ref $data;
1557       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1558
1559       $sth->bind_param($placeholder_index, $data, $attributes);
1560       $placeholder_index++;
1561     }
1562   }
1563
1564   # Can this fail without throwing an exception anyways???
1565   my $rv = $sth->execute();
1566   $self->throw_exception(
1567     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1568   ) if !$rv;
1569
1570   $self->_query_end( $sql, @$bind );
1571
1572   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1573 }
1574
1575 sub _execute {
1576     my $self = shift;
1577     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1578 }
1579
1580 sub _prefetch_insert_auto_nextvals {
1581   my ($self, $source, $to_insert) = @_;
1582
1583   my $upd = {};
1584
1585   foreach my $col ( $source->columns ) {
1586     if ( !defined $to_insert->{$col} ) {
1587       my $col_info = $source->column_info($col);
1588
1589       if ( $col_info->{auto_nextval} ) {
1590         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1591           'nextval',
1592           $col_info->{sequence} ||=
1593             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1594         );
1595       }
1596     }
1597   }
1598
1599   return $upd;
1600 }
1601
1602 sub insert {
1603   my $self = shift;
1604   my ($source, $to_insert, $opts) = @_;
1605
1606   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1607
1608   my $bind_attributes = $self->source_bind_attributes($source);
1609
1610   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1611
1612   if ($opts->{returning}) {
1613     my @ret_cols = @{$opts->{returning}};
1614
1615     my @ret_vals = try {
1616       local $SIG{__WARN__} = sub {};
1617       my @r = $sth->fetchrow_array;
1618       $sth->finish;
1619       @r;
1620     };
1621
1622     my %ret;
1623     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1624
1625     $updated_cols = {
1626       %$updated_cols,
1627       %ret,
1628     };
1629   }
1630
1631   return $updated_cols;
1632 }
1633
1634 ## Currently it is assumed that all values passed will be "normal", i.e. not
1635 ## scalar refs, or at least, all the same type as the first set, the statement is
1636 ## only prepped once.
1637 sub insert_bulk {
1638   my ($self, $source, $cols, $data) = @_;
1639
1640   my %colvalues;
1641   @colvalues{@$cols} = (0..$#$cols);
1642
1643   for my $i (0..$#$cols) {
1644     my $first_val = $data->[0][$i];
1645     next unless ref $first_val eq 'SCALAR';
1646
1647     $colvalues{ $cols->[$i] } = $first_val;
1648   }
1649
1650   # check for bad data and stringify stringifiable objects
1651   my $bad_slice = sub {
1652     my ($msg, $col_idx, $slice_idx) = @_;
1653     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1654       $msg,
1655       $cols->[$col_idx],
1656       do {
1657         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1658         Dumper {
1659           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1660         },
1661       }
1662     );
1663   };
1664
1665   for my $datum_idx (0..$#$data) {
1666     my $datum = $data->[$datum_idx];
1667
1668     for my $col_idx (0..$#$cols) {
1669       my $val            = $datum->[$col_idx];
1670       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1671       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1672
1673       if ($is_literal_sql) {
1674         if (not ref $val) {
1675           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1676         }
1677         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1678           $bad_slice->("$reftype reference found where literal SQL expected",
1679             $col_idx, $datum_idx);
1680         }
1681         elsif ($$val ne $$sqla_bind){
1682           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1683             $col_idx, $datum_idx);
1684         }
1685       }
1686       elsif (my $reftype = ref $val) {
1687         require overload;
1688         if (overload::Method($val, '""')) {
1689           $datum->[$col_idx] = "".$val;
1690         }
1691         else {
1692           $bad_slice->("$reftype reference found where bind expected",
1693             $col_idx, $datum_idx);
1694         }
1695       }
1696     }
1697   }
1698
1699   my ($sql, $bind) = $self->_prep_for_execute (
1700     'insert', undef, $source, [\%colvalues]
1701   );
1702   my @bind = @$bind;
1703
1704   my $empty_bind = 1 if (not @bind) &&
1705     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1706
1707   if ((not @bind) && (not $empty_bind)) {
1708     $self->throw_exception(
1709       'Cannot insert_bulk without support for placeholders'
1710     );
1711   }
1712
1713   # neither _execute_array, nor _execute_inserts_with_no_binds are
1714   # atomic (even if _execute _array is a single call). Thus a safety
1715   # scope guard
1716   my $guard = $self->txn_scope_guard;
1717
1718   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1719   my $sth = $self->sth($sql);
1720   my $rv = do {
1721     if ($empty_bind) {
1722       # bind_param_array doesn't work if there are no binds
1723       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1724     }
1725     else {
1726 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1727       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1728     }
1729   };
1730
1731   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1732
1733   $guard->commit;
1734
1735   return (wantarray ? ($rv, $sth, @bind) : $rv);
1736 }
1737
1738 sub _execute_array {
1739   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1740
1741   ## This must be an arrayref, else nothing works!
1742   my $tuple_status = [];
1743
1744   ## Get the bind_attributes, if any exist
1745   my $bind_attributes = $self->source_bind_attributes($source);
1746
1747   ## Bind the values and execute
1748   my $placeholder_index = 1;
1749
1750   foreach my $bound (@$bind) {
1751
1752     my $attributes = {};
1753     my ($column_name, $data_index) = @$bound;
1754
1755     if( $bind_attributes ) {
1756       $attributes = $bind_attributes->{$column_name}
1757       if defined $bind_attributes->{$column_name};
1758     }
1759
1760     my @data = map { $_->[$data_index] } @$data;
1761
1762     $sth->bind_param_array(
1763       $placeholder_index,
1764       [@data],
1765       (%$attributes ?  $attributes : ()),
1766     );
1767     $placeholder_index++;
1768   }
1769
1770   my ($rv, $err);
1771   try {
1772     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1773   }
1774   catch {
1775     $err = shift;
1776   }
1777   finally {
1778     # Statement must finish even if there was an exception.
1779     try {
1780       $sth->finish
1781     }
1782     catch {
1783       $err = shift unless defined $err
1784     };
1785   };
1786
1787   $err = $sth->errstr
1788     if (! defined $err and $sth->err);
1789
1790   if (defined $err) {
1791     my $i = 0;
1792     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1793
1794     $self->throw_exception("Unexpected populate error: $err")
1795       if ($i > $#$tuple_status);
1796
1797     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1798       ($tuple_status->[$i][1] || $err),
1799       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1800     );
1801   }
1802
1803   return $rv;
1804 }
1805
1806 sub _dbh_execute_array {
1807     my ($self, $sth, $tuple_status, @extra) = @_;
1808
1809     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1810 }
1811
1812 sub _dbh_execute_inserts_with_no_binds {
1813   my ($self, $sth, $count) = @_;
1814
1815   my $err;
1816   try {
1817     my $dbh = $self->_get_dbh;
1818     local $dbh->{RaiseError} = 1;
1819     local $dbh->{PrintError} = 0;
1820
1821     $sth->execute foreach 1..$count;
1822   }
1823   catch {
1824     $err = shift;
1825   }
1826   finally {
1827     # Make sure statement is finished even if there was an exception.
1828     try {
1829       $sth->finish
1830     }
1831     catch {
1832       $err = shift unless defined $err;
1833     };
1834   };
1835
1836   $self->throw_exception($err) if defined $err;
1837
1838   return $count;
1839 }
1840
1841 sub update {
1842   my ($self, $source, @args) = @_;
1843
1844   my $bind_attrs = $self->source_bind_attributes($source);
1845
1846   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1847 }
1848
1849
1850 sub delete {
1851   my ($self, $source, @args) = @_;
1852
1853   my $bind_attrs = $self->source_bind_attributes($source);
1854
1855   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1856 }
1857
1858 # We were sent here because the $rs contains a complex search
1859 # which will require a subquery to select the correct rows
1860 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1861 #
1862 # Generating a single PK column subquery is trivial and supported
1863 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1864 # Look at _multipk_update_delete()
1865 sub _subq_update_delete {
1866   my $self = shift;
1867   my ($rs, $op, $values) = @_;
1868
1869   my $rsrc = $rs->result_source;
1870
1871   # quick check if we got a sane rs on our hands
1872   my @pcols = $rsrc->_pri_cols;
1873
1874   my $sel = $rs->_resolved_attrs->{select};
1875   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1876
1877   if (
1878       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1879         ne
1880       join ("\x00", sort @$sel )
1881   ) {
1882     $self->throw_exception (
1883       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1884     );
1885   }
1886
1887   if (@pcols == 1) {
1888     return $self->$op (
1889       $rsrc,
1890       $op eq 'update' ? $values : (),
1891       { $pcols[0] => { -in => $rs->as_query } },
1892     );
1893   }
1894
1895   else {
1896     return $self->_multipk_update_delete (@_);
1897   }
1898 }
1899
1900 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1901 # resultset update/delete involving subqueries. So by default resort
1902 # to simple (and inefficient) delete_all style per-row opearations,
1903 # while allowing specific storages to override this with a faster
1904 # implementation.
1905 #
1906 sub _multipk_update_delete {
1907   return shift->_per_row_update_delete (@_);
1908 }
1909
1910 # This is the default loop used to delete/update rows for multi PK
1911 # resultsets, and used by mysql exclusively (because it can't do anything
1912 # else).
1913 #
1914 # We do not use $row->$op style queries, because resultset update/delete
1915 # is not expected to cascade (this is what delete_all/update_all is for).
1916 #
1917 # There should be no race conditions as the entire operation is rolled
1918 # in a transaction.
1919 #
1920 sub _per_row_update_delete {
1921   my $self = shift;
1922   my ($rs, $op, $values) = @_;
1923
1924   my $rsrc = $rs->result_source;
1925   my @pcols = $rsrc->_pri_cols;
1926
1927   my $guard = $self->txn_scope_guard;
1928
1929   # emulate the return value of $sth->execute for non-selects
1930   my $row_cnt = '0E0';
1931
1932   my $subrs_cur = $rs->cursor;
1933   my @all_pk = $subrs_cur->all;
1934   for my $pks ( @all_pk) {
1935
1936     my $cond;
1937     for my $i (0.. $#pcols) {
1938       $cond->{$pcols[$i]} = $pks->[$i];
1939     }
1940
1941     $self->$op (
1942       $rsrc,
1943       $op eq 'update' ? $values : (),
1944       $cond,
1945     );
1946
1947     $row_cnt++;
1948   }
1949
1950   $guard->commit;
1951
1952   return $row_cnt;
1953 }
1954
1955 sub _select {
1956   my $self = shift;
1957   $self->_execute($self->_select_args(@_));
1958 }
1959
1960 sub _select_args_to_query {
1961   my $self = shift;
1962
1963   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1964   #  = $self->_select_args($ident, $select, $cond, $attrs);
1965   my ($op, $bind, $ident, $bind_attrs, @args) =
1966     $self->_select_args(@_);
1967
1968   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1969   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1970   $prepared_bind ||= [];
1971
1972   return wantarray
1973     ? ($sql, $prepared_bind, $bind_attrs)
1974     : \[ "($sql)", @$prepared_bind ]
1975   ;
1976 }
1977
1978 sub _select_args {
1979   my ($self, $ident, $select, $where, $attrs) = @_;
1980
1981   my $sql_maker = $self->sql_maker;
1982   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1983
1984   $attrs = {
1985     %$attrs,
1986     select => $select,
1987     from => $ident,
1988     where => $where,
1989     $rs_alias && $alias2source->{$rs_alias}
1990       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1991       : ()
1992     ,
1993   };
1994
1995   # calculate bind_attrs before possible $ident mangling
1996   my $bind_attrs = {};
1997   for my $alias (keys %$alias2source) {
1998     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1999     for my $col (keys %$bindtypes) {
2000
2001       my $fqcn = join ('.', $alias, $col);
2002       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2003
2004       # Unqialified column names are nice, but at the same time can be
2005       # rather ambiguous. What we do here is basically go along with
2006       # the loop, adding an unqualified column slot to $bind_attrs,
2007       # alongside the fully qualified name. As soon as we encounter
2008       # another column by that name (which would imply another table)
2009       # we unset the unqualified slot and never add any info to it
2010       # to avoid erroneous type binding. If this happens the users
2011       # only choice will be to fully qualify his column name
2012
2013       if (exists $bind_attrs->{$col}) {
2014         $bind_attrs->{$col} = {};
2015       }
2016       else {
2017         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2018       }
2019     }
2020   }
2021
2022   # Sanity check the attributes (SQLAHacks does it too, but
2023   # in case of a software_limit we'll never reach there)
2024   if (defined $attrs->{offset}) {
2025     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2026       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2027   }
2028   $attrs->{offset} ||= 0;
2029
2030   if (defined $attrs->{rows}) {
2031     $self->throw_exception("The rows attribute must be a positive integer if present")
2032       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2033   }
2034   elsif ($attrs->{offset}) {
2035     # MySQL actually recommends this approach.  I cringe.
2036     $attrs->{rows} = $sql_maker->__max_int;
2037   }
2038
2039   my @limit;
2040
2041   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2042   # storage, unless software limit was requested
2043   if (
2044     #limited has_many
2045     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2046        ||
2047     # grouped prefetch (to satisfy group_by == select)
2048     ( $attrs->{group_by}
2049         &&
2050       @{$attrs->{group_by}}
2051         &&
2052       $attrs->{_prefetch_select}
2053         &&
2054       @{$attrs->{_prefetch_select}}
2055     )
2056   ) {
2057     ($ident, $select, $where, $attrs)
2058       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2059   }
2060   elsif (! $attrs->{software_limit} ) {
2061     push @limit, $attrs->{rows}, $attrs->{offset};
2062   }
2063
2064   # try to simplify the joinmap further (prune unreferenced type-single joins)
2065   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2066
2067 ###
2068   # This would be the point to deflate anything found in $where
2069   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2070   # expect a row object. And all we have is a resultsource (it is trivial
2071   # to extract deflator coderefs via $alias2source above).
2072   #
2073   # I don't see a way forward other than changing the way deflators are
2074   # invoked, and that's just bad...
2075 ###
2076
2077   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2078 }
2079
2080 # Returns a counting SELECT for a simple count
2081 # query. Abstracted so that a storage could override
2082 # this to { count => 'firstcol' } or whatever makes
2083 # sense as a performance optimization
2084 sub _count_select {
2085   #my ($self, $source, $rs_attrs) = @_;
2086   return { count => '*' };
2087 }
2088
2089
2090 sub source_bind_attributes {
2091   my ($self, $source) = @_;
2092
2093   my $bind_attributes;
2094   foreach my $column ($source->columns) {
2095
2096     my $data_type = $source->column_info($column)->{data_type} || '';
2097     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2098      if $data_type;
2099   }
2100
2101   return $bind_attributes;
2102 }
2103
2104 =head2 select
2105
2106 =over 4
2107
2108 =item Arguments: $ident, $select, $condition, $attrs
2109
2110 =back
2111
2112 Handle a SQL select statement.
2113
2114 =cut
2115
2116 sub select {
2117   my $self = shift;
2118   my ($ident, $select, $condition, $attrs) = @_;
2119   return $self->cursor_class->new($self, \@_, $attrs);
2120 }
2121
2122 sub select_single {
2123   my $self = shift;
2124   my ($rv, $sth, @bind) = $self->_select(@_);
2125   my @row = $sth->fetchrow_array;
2126   my @nextrow = $sth->fetchrow_array if @row;
2127   if(@row && @nextrow) {
2128     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2129   }
2130   # Need to call finish() to work round broken DBDs
2131   $sth->finish();
2132   return @row;
2133 }
2134
2135 =head2 sth
2136
2137 =over 4
2138
2139 =item Arguments: $sql
2140
2141 =back
2142
2143 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2144
2145 =cut
2146
2147 sub _dbh_sth {
2148   my ($self, $dbh, $sql) = @_;
2149
2150   # 3 is the if_active parameter which avoids active sth re-use
2151   my $sth = $self->disable_sth_caching
2152     ? $dbh->prepare($sql)
2153     : $dbh->prepare_cached($sql, {}, 3);
2154
2155   # XXX You would think RaiseError would make this impossible,
2156   #  but apparently that's not true :(
2157   $self->throw_exception($dbh->errstr) if !$sth;
2158
2159   $sth;
2160 }
2161
2162 sub sth {
2163   my ($self, $sql) = @_;
2164   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2165 }
2166
2167 sub _dbh_columns_info_for {
2168   my ($self, $dbh, $table) = @_;
2169
2170   if ($dbh->can('column_info')) {
2171     my %result;
2172     my $caught;
2173     try {
2174       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2175       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2176       $sth->execute();
2177       while ( my $info = $sth->fetchrow_hashref() ){
2178         my %column_info;
2179         $column_info{data_type}   = $info->{TYPE_NAME};
2180         $column_info{size}      = $info->{COLUMN_SIZE};
2181         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2182         $column_info{default_value} = $info->{COLUMN_DEF};
2183         my $col_name = $info->{COLUMN_NAME};
2184         $col_name =~ s/^\"(.*)\"$/$1/;
2185
2186         $result{$col_name} = \%column_info;
2187       }
2188     } catch {
2189       $caught = 1;
2190     };
2191     return \%result if !$caught && scalar keys %result;
2192   }
2193
2194   my %result;
2195   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2196   $sth->execute;
2197   my @columns = @{$sth->{NAME_lc}};
2198   for my $i ( 0 .. $#columns ){
2199     my %column_info;
2200     $column_info{data_type} = $sth->{TYPE}->[$i];
2201     $column_info{size} = $sth->{PRECISION}->[$i];
2202     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2203
2204     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2205       $column_info{data_type} = $1;
2206       $column_info{size}    = $2;
2207     }
2208
2209     $result{$columns[$i]} = \%column_info;
2210   }
2211   $sth->finish;
2212
2213   foreach my $col (keys %result) {
2214     my $colinfo = $result{$col};
2215     my $type_num = $colinfo->{data_type};
2216     my $type_name;
2217     if(defined $type_num && $dbh->can('type_info')) {
2218       my $type_info = $dbh->type_info($type_num);
2219       $type_name = $type_info->{TYPE_NAME} if $type_info;
2220       $colinfo->{data_type} = $type_name if $type_name;
2221     }
2222   }
2223
2224   return \%result;
2225 }
2226
2227 sub columns_info_for {
2228   my ($self, $table) = @_;
2229   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2230 }
2231
2232 =head2 last_insert_id
2233
2234 Return the row id of the last insert.
2235
2236 =cut
2237
2238 sub _dbh_last_insert_id {
2239     my ($self, $dbh, $source, $col) = @_;
2240
2241     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2242
2243     return $id if defined $id;
2244
2245     my $class = ref $self;
2246     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2247 }
2248
2249 sub last_insert_id {
2250   my $self = shift;
2251   $self->_dbh_last_insert_id ($self->_dbh, @_);
2252 }
2253
2254 =head2 _native_data_type
2255
2256 =over 4
2257
2258 =item Arguments: $type_name
2259
2260 =back
2261
2262 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2263 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2264 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2265
2266 The default implementation returns C<undef>, implement in your Storage driver if
2267 you need this functionality.
2268
2269 Should map types from other databases to the native RDBMS type, for example
2270 C<VARCHAR2> to C<VARCHAR>.
2271
2272 Types with modifiers should map to the underlying data type. For example,
2273 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2274
2275 Composite types should map to the container type, for example
2276 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2277
2278 =cut
2279
2280 sub _native_data_type {
2281   #my ($self, $data_type) = @_;
2282   return undef
2283 }
2284
2285 # Check if placeholders are supported at all
2286 sub _determine_supports_placeholders {
2287   my $self = shift;
2288   my $dbh  = $self->_get_dbh;
2289
2290   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2291   # but it is inaccurate more often than not
2292   return try {
2293     local $dbh->{PrintError} = 0;
2294     local $dbh->{RaiseError} = 1;
2295     $dbh->do('select ?', {}, 1);
2296     1;
2297   }
2298   catch {
2299     0;
2300   };
2301 }
2302
2303 # Check if placeholders bound to non-string types throw exceptions
2304 #
2305 sub _determine_supports_typeless_placeholders {
2306   my $self = shift;
2307   my $dbh  = $self->_get_dbh;
2308
2309   return try {
2310     local $dbh->{PrintError} = 0;
2311     local $dbh->{RaiseError} = 1;
2312     # this specifically tests a bind that is NOT a string
2313     $dbh->do('select 1 where 1 = ?', {}, 1);
2314     1;
2315   }
2316   catch {
2317     0;
2318   };
2319 }
2320
2321 =head2 sqlt_type
2322
2323 Returns the database driver name.
2324
2325 =cut
2326
2327 sub sqlt_type {
2328   shift->_get_dbh->{Driver}->{Name};
2329 }
2330
2331 =head2 bind_attribute_by_data_type
2332
2333 Given a datatype from column info, returns a database specific bind
2334 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2335 let the database planner just handle it.
2336
2337 Generally only needed for special case column types, like bytea in postgres.
2338
2339 =cut
2340
2341 sub bind_attribute_by_data_type {
2342     return;
2343 }
2344
2345 =head2 is_datatype_numeric
2346
2347 Given a datatype from column_info, returns a boolean value indicating if
2348 the current RDBMS considers it a numeric value. This controls how
2349 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2350 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2351 be performed instead of the usual C<eq>.
2352
2353 =cut
2354
2355 sub is_datatype_numeric {
2356   my ($self, $dt) = @_;
2357
2358   return 0 unless $dt;
2359
2360   return $dt =~ /^ (?:
2361     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2362   ) $/ix;
2363 }
2364
2365
2366 =head2 create_ddl_dir
2367
2368 =over 4
2369
2370 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2371
2372 =back
2373
2374 Creates a SQL file based on the Schema, for each of the specified
2375 database engines in C<\@databases> in the given directory.
2376 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2377
2378 Given a previous version number, this will also create a file containing
2379 the ALTER TABLE statements to transform the previous schema into the
2380 current one. Note that these statements may contain C<DROP TABLE> or
2381 C<DROP COLUMN> statements that can potentially destroy data.
2382
2383 The file names are created using the C<ddl_filename> method below, please
2384 override this method in your schema if you would like a different file
2385 name format. For the ALTER file, the same format is used, replacing
2386 $version in the name with "$preversion-$version".
2387
2388 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2389 The most common value for this would be C<< { add_drop_table => 1 } >>
2390 to have the SQL produced include a C<DROP TABLE> statement for each table
2391 created. For quoting purposes supply C<quote_table_names> and
2392 C<quote_field_names>.
2393
2394 If no arguments are passed, then the following default values are assumed:
2395
2396 =over 4
2397
2398 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2399
2400 =item version    - $schema->schema_version
2401
2402 =item directory  - './'
2403
2404 =item preversion - <none>
2405
2406 =back
2407
2408 By default, C<\%sqlt_args> will have
2409
2410  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2411
2412 merged with the hash passed in. To disable any of those features, pass in a
2413 hashref like the following
2414
2415  { ignore_constraint_names => 0, # ... other options }
2416
2417
2418 WARNING: You are strongly advised to check all SQL files created, before applying
2419 them.
2420
2421 =cut
2422
2423 sub create_ddl_dir {
2424   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2425
2426   unless ($dir) {
2427     carp "No directory given, using ./\n";
2428     $dir = './';
2429   } else {
2430       -d $dir
2431         or
2432       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2433         or
2434       $self->throw_exception(
2435         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2436       );
2437   }
2438
2439   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2440
2441   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2442   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2443
2444   my $schema_version = $schema->schema_version || '1.x';
2445   $version ||= $schema_version;
2446
2447   $sqltargs = {
2448     add_drop_table => 1,
2449     ignore_constraint_names => 1,
2450     ignore_index_names => 1,
2451     %{$sqltargs || {}}
2452   };
2453
2454   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2455     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2456   }
2457
2458   my $sqlt = SQL::Translator->new( $sqltargs );
2459
2460   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2461   my $sqlt_schema = $sqlt->translate({ data => $schema })
2462     or $self->throw_exception ($sqlt->error);
2463
2464   foreach my $db (@$databases) {
2465     $sqlt->reset();
2466     $sqlt->{schema} = $sqlt_schema;
2467     $sqlt->producer($db);
2468
2469     my $file;
2470     my $filename = $schema->ddl_filename($db, $version, $dir);
2471     if (-e $filename && ($version eq $schema_version )) {
2472       # if we are dumping the current version, overwrite the DDL
2473       carp "Overwriting existing DDL file - $filename";
2474       unlink($filename);
2475     }
2476
2477     my $output = $sqlt->translate;
2478     if(!$output) {
2479       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2480       next;
2481     }
2482     if(!open($file, ">$filename")) {
2483       $self->throw_exception("Can't open $filename for writing ($!)");
2484       next;
2485     }
2486     print $file $output;
2487     close($file);
2488
2489     next unless ($preversion);
2490
2491     require SQL::Translator::Diff;
2492
2493     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2494     if(!-e $prefilename) {
2495       carp("No previous schema file found ($prefilename)");
2496       next;
2497     }
2498
2499     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2500     if(-e $difffile) {
2501       carp("Overwriting existing diff file - $difffile");
2502       unlink($difffile);
2503     }
2504
2505     my $source_schema;
2506     {
2507       my $t = SQL::Translator->new($sqltargs);
2508       $t->debug( 0 );
2509       $t->trace( 0 );
2510
2511       $t->parser( $db )
2512         or $self->throw_exception ($t->error);
2513
2514       my $out = $t->translate( $prefilename )
2515         or $self->throw_exception ($t->error);
2516
2517       $source_schema = $t->schema;
2518
2519       $source_schema->name( $prefilename )
2520         unless ( $source_schema->name );
2521     }
2522
2523     # The "new" style of producers have sane normalization and can support
2524     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2525     # And we have to diff parsed SQL against parsed SQL.
2526     my $dest_schema = $sqlt_schema;
2527
2528     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2529       my $t = SQL::Translator->new($sqltargs);
2530       $t->debug( 0 );
2531       $t->trace( 0 );
2532
2533       $t->parser( $db )
2534         or $self->throw_exception ($t->error);
2535
2536       my $out = $t->translate( $filename )
2537         or $self->throw_exception ($t->error);
2538
2539       $dest_schema = $t->schema;
2540
2541       $dest_schema->name( $filename )
2542         unless $dest_schema->name;
2543     }
2544
2545     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2546                                                   $dest_schema,   $db,
2547                                                   $sqltargs
2548                                                  );
2549     if(!open $file, ">$difffile") {
2550       $self->throw_exception("Can't write to $difffile ($!)");
2551       next;
2552     }
2553     print $file $diff;
2554     close($file);
2555   }
2556 }
2557
2558 =head2 deployment_statements
2559
2560 =over 4
2561
2562 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2563
2564 =back
2565
2566 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2567
2568 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2569 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2570
2571 C<$directory> is used to return statements from files in a previously created
2572 L</create_ddl_dir> directory and is optional. The filenames are constructed
2573 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2574
2575 If no C<$directory> is specified then the statements are constructed on the
2576 fly using L<SQL::Translator> and C<$version> is ignored.
2577
2578 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2579
2580 =cut
2581
2582 sub deployment_statements {
2583   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2584   $type ||= $self->sqlt_type;
2585   $version ||= $schema->schema_version || '1.x';
2586   $dir ||= './';
2587   my $filename = $schema->ddl_filename($type, $version, $dir);
2588   if(-f $filename)
2589   {
2590       my $file;
2591       open($file, "<$filename")
2592         or $self->throw_exception("Can't open $filename ($!)");
2593       my @rows = <$file>;
2594       close($file);
2595       return join('', @rows);
2596   }
2597
2598   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2599     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2600   }
2601
2602   # sources needs to be a parser arg, but for simplicty allow at top level
2603   # coming in
2604   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2605       if exists $sqltargs->{sources};
2606
2607   my $tr = SQL::Translator->new(
2608     producer => "SQL::Translator::Producer::${type}",
2609     %$sqltargs,
2610     parser => 'SQL::Translator::Parser::DBIx::Class',
2611     data => $schema,
2612   );
2613
2614   my @ret;
2615   my $wa = wantarray;
2616   if ($wa) {
2617     @ret = $tr->translate;
2618   }
2619   else {
2620     $ret[0] = $tr->translate;
2621   }
2622
2623   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2624     unless (@ret && defined $ret[0]);
2625
2626   return $wa ? @ret : $ret[0];
2627 }
2628
2629 sub deploy {
2630   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2631   my $deploy = sub {
2632     my $line = shift;
2633     return if($line =~ /^--/);
2634     return if(!$line);
2635     # next if($line =~ /^DROP/m);
2636     return if($line =~ /^BEGIN TRANSACTION/m);
2637     return if($line =~ /^COMMIT/m);
2638     return if $line =~ /^\s+$/; # skip whitespace only
2639     $self->_query_start($line);
2640     try {
2641       # do a dbh_do cycle here, as we need some error checking in
2642       # place (even though we will ignore errors)
2643       $self->dbh_do (sub { $_[1]->do($line) });
2644     } catch {
2645       carp qq{$_ (running "${line}")};
2646     };
2647     $self->_query_end($line);
2648   };
2649   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2650   if (@statements > 1) {
2651     foreach my $statement (@statements) {
2652       $deploy->( $statement );
2653     }
2654   }
2655   elsif (@statements == 1) {
2656     foreach my $line ( split(";\n", $statements[0])) {
2657       $deploy->( $line );
2658     }
2659   }
2660 }
2661
2662 =head2 datetime_parser
2663
2664 Returns the datetime parser class
2665
2666 =cut
2667
2668 sub datetime_parser {
2669   my $self = shift;
2670   return $self->{datetime_parser} ||= do {
2671     $self->build_datetime_parser(@_);
2672   };
2673 }
2674
2675 =head2 datetime_parser_type
2676
2677 Defines (returns) the datetime parser class - currently hardwired to
2678 L<DateTime::Format::MySQL>
2679
2680 =cut
2681
2682 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2683
2684 =head2 build_datetime_parser
2685
2686 See L</datetime_parser>
2687
2688 =cut
2689
2690 sub build_datetime_parser {
2691   my $self = shift;
2692   my $type = $self->datetime_parser_type(@_);
2693   $self->ensure_class_loaded ($type);
2694   return $type;
2695 }
2696
2697
2698 =head2 is_replicating
2699
2700 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2701 replicate from a master database.  Default is undef, which is the result
2702 returned by databases that don't support replication.
2703
2704 =cut
2705
2706 sub is_replicating {
2707     return;
2708
2709 }
2710
2711 =head2 lag_behind_master
2712
2713 Returns a number that represents a certain amount of lag behind a master db
2714 when a given storage is replicating.  The number is database dependent, but
2715 starts at zero and increases with the amount of lag. Default in undef
2716
2717 =cut
2718
2719 sub lag_behind_master {
2720     return;
2721 }
2722
2723 =head2 relname_to_table_alias
2724
2725 =over 4
2726
2727 =item Arguments: $relname, $join_count
2728
2729 =back
2730
2731 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2732 queries.
2733
2734 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2735 way these aliases are named.
2736
2737 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2738 otherwise C<"$relname">.
2739
2740 =cut
2741
2742 sub relname_to_table_alias {
2743   my ($self, $relname, $join_count) = @_;
2744
2745   my $alias = ($join_count && $join_count > 1 ?
2746     join('_', $relname, $join_count) : $relname);
2747
2748   return $alias;
2749 }
2750
2751 1;
2752
2753 =head1 USAGE NOTES
2754
2755 =head2 DBIx::Class and AutoCommit
2756
2757 DBIx::Class can do some wonderful magic with handling exceptions,
2758 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2759 (the default) combined with C<txn_do> for transaction support.
2760
2761 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2762 in an assumed transaction between commits, and you're telling us you'd
2763 like to manage that manually.  A lot of the magic protections offered by
2764 this module will go away.  We can't protect you from exceptions due to database
2765 disconnects because we don't know anything about how to restart your
2766 transactions.  You're on your own for handling all sorts of exceptional
2767 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2768 be with raw DBI.
2769
2770
2771 =head1 AUTHORS
2772
2773 Matt S. Trout <mst@shadowcatsystems.co.uk>
2774
2775 Andy Grundman <andy@hybridized.org>
2776
2777 =head1 LICENSE
2778
2779 You may distribute this code under the same terms as Perl itself.
2780
2781 =cut