e14f234c3f607456081d7648a3c8747ac71fc95e
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders join_optimizer/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
60
61 # on by default, not strictly a capability (pending rewrite)
62 __PACKAGE__->_use_join_optimizer (1);
63 sub _determine_supports_join_optimizer { 1 };
64
65 # Each of these methods need _determine_driver called before itself
66 # in order to function reliably. This is a purely DRY optimization
67 #
68 # get_(use)_dbms_capability need to be called on the correct Storage
69 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
70 # _determine_supports_X which obv. needs a correct driver as well
71 my @rdbms_specific_methods = qw/
72   deployment_statements
73   sqlt_type
74   sql_maker
75   build_datetime_parser
76   datetime_parser_type
77
78   insert
79   insert_bulk
80   update
81   delete
82   select
83   select_single
84
85   get_use_dbms_capability
86   get_dbms_capability
87
88   _server_info
89   _get_server_version
90 /;
91
92 for my $meth (@rdbms_specific_methods) {
93
94   my $orig = __PACKAGE__->can ($meth)
95     or die "$meth is not a ::Storage::DBI method!";
96
97   no strict qw/refs/;
98   no warnings qw/redefine/;
99   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
100     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
101       $_[0]->_determine_driver;
102
103       # This for some reason crashes and burns on perl 5.8.1
104       # IFF the method ends up throwing an exception
105       #goto $_[0]->can ($meth);
106
107       my $cref = $_[0]->can ($meth);
108       goto $cref;
109     }
110     goto $orig;
111   };
112 }
113
114
115 =head1 NAME
116
117 DBIx::Class::Storage::DBI - DBI storage handler
118
119 =head1 SYNOPSIS
120
121   my $schema = MySchema->connect('dbi:SQLite:my.db');
122
123   $schema->storage->debug(1);
124
125   my @stuff = $schema->storage->dbh_do(
126     sub {
127       my ($storage, $dbh, @args) = @_;
128       $dbh->do("DROP TABLE authors");
129     },
130     @column_list
131   );
132
133   $schema->resultset('Book')->search({
134      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
135   });
136
137 =head1 DESCRIPTION
138
139 This class represents the connection to an RDBMS via L<DBI>.  See
140 L<DBIx::Class::Storage> for general information.  This pod only
141 documents DBI-specific methods and behaviors.
142
143 =head1 METHODS
144
145 =cut
146
147 sub new {
148   my $new = shift->next::method(@_);
149
150   $new->transaction_depth(0);
151   $new->_sql_maker_opts({});
152   $new->_dbh_details({});
153   $new->{savepoints} = [];
154   $new->{_in_dbh_do} = 0;
155   $new->{_dbh_gen} = 0;
156
157   # read below to see what this does
158   $new->_arm_global_destructor;
159
160   $new;
161 }
162
163 # This is hack to work around perl shooting stuff in random
164 # order on exit(). If we do not walk the remaining storage
165 # objects in an END block, there is a *small but real* chance
166 # of a fork()ed child to kill the parent's shared DBI handle,
167 # *before perl reaches the DESTROY in this package*
168 # Yes, it is ugly and effective.
169 # Additionally this registry is used by the CLONE method to
170 # make sure no handles are shared between threads
171 {
172   my %seek_and_destroy;
173
174   sub _arm_global_destructor {
175     my $self = shift;
176     my $key = refaddr ($self);
177     $seek_and_destroy{$key} = $self;
178     weaken ($seek_and_destroy{$key});
179   }
180
181   END {
182     local $?; # just in case the DBI destructor changes it somehow
183
184     # destroy just the object if not native to this process/thread
185     $_->_verify_pid for (grep
186       { defined $_ }
187       values %seek_and_destroy
188     );
189   }
190
191   sub CLONE {
192     # As per DBI's recommendation, DBIC disconnects all handles as
193     # soon as possible (DBIC will reconnect only on demand from within
194     # the thread)
195     for (values %seek_and_destroy) {
196       next unless $_;
197       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
198       $_->_dbh(undef);
199     }
200   }
201 }
202
203 sub DESTROY {
204   my $self = shift;
205
206   # some databases spew warnings on implicit disconnect
207   local $SIG{__WARN__} = sub {};
208   $self->_dbh(undef);
209
210   # this op is necessary, since the very last perl runtime statement
211   # triggers a global destruction shootout, and the $SIG localization
212   # may very well be destroyed before perl actually gets to do the
213   # $dbh undef
214   1;
215 }
216
217 # handle pid changes correctly - do not destroy parent's connection
218 sub _verify_pid {
219   my $self = shift;
220
221   my $pid = $self->_conn_pid;
222   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
223     $dbh->{InactiveDestroy} = 1;
224     $self->{_dbh_gen}++;
225     $self->_dbh(undef);
226   }
227
228   return;
229 }
230
231 =head2 connect_info
232
233 This method is normally called by L<DBIx::Class::Schema/connection>, which
234 encapsulates its argument list in an arrayref before passing them here.
235
236 The argument list may contain:
237
238 =over
239
240 =item *
241
242 The same 4-element argument set one would normally pass to
243 L<DBI/connect>, optionally followed by
244 L<extra attributes|/DBIx::Class specific connection attributes>
245 recognized by DBIx::Class:
246
247   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
248
249 =item *
250
251 A single code reference which returns a connected
252 L<DBI database handle|DBI/connect> optionally followed by
253 L<extra attributes|/DBIx::Class specific connection attributes> recognized
254 by DBIx::Class:
255
256   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
257
258 =item *
259
260 A single hashref with all the attributes and the dsn/user/password
261 mixed together:
262
263   $connect_info_args = [{
264     dsn => $dsn,
265     user => $user,
266     password => $pass,
267     %dbi_attributes,
268     %extra_attributes,
269   }];
270
271   $connect_info_args = [{
272     dbh_maker => sub { DBI->connect (...) },
273     %dbi_attributes,
274     %extra_attributes,
275   }];
276
277 This is particularly useful for L<Catalyst> based applications, allowing the
278 following config (L<Config::General> style):
279
280   <Model::DB>
281     schema_class   App::DB
282     <connect_info>
283       dsn          dbi:mysql:database=test
284       user         testuser
285       password     TestPass
286       AutoCommit   1
287     </connect_info>
288   </Model::DB>
289
290 The C<dsn>/C<user>/C<password> combination can be substituted by the
291 C<dbh_maker> key whose value is a coderef that returns a connected
292 L<DBI database handle|DBI/connect>
293
294 =back
295
296 Please note that the L<DBI> docs recommend that you always explicitly
297 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
298 recommends that it be set to I<1>, and that you perform transactions
299 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
300 to I<1> if you do not do explicitly set it to zero.  This is the default
301 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
302
303 =head3 DBIx::Class specific connection attributes
304
305 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
306 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
307 the following connection options. These options can be mixed in with your other
308 L<DBI> connection attributes, or placed in a separate hashref
309 (C<\%extra_attributes>) as shown above.
310
311 Every time C<connect_info> is invoked, any previous settings for
312 these options will be cleared before setting the new ones, regardless of
313 whether any options are specified in the new C<connect_info>.
314
315
316 =over
317
318 =item on_connect_do
319
320 Specifies things to do immediately after connecting or re-connecting to
321 the database.  Its value may contain:
322
323 =over
324
325 =item a scalar
326
327 This contains one SQL statement to execute.
328
329 =item an array reference
330
331 This contains SQL statements to execute in order.  Each element contains
332 a string or a code reference that returns a string.
333
334 =item a code reference
335
336 This contains some code to execute.  Unlike code references within an
337 array reference, its return value is ignored.
338
339 =back
340
341 =item on_disconnect_do
342
343 Takes arguments in the same form as L</on_connect_do> and executes them
344 immediately before disconnecting from the database.
345
346 Note, this only runs if you explicitly call L</disconnect> on the
347 storage object.
348
349 =item on_connect_call
350
351 A more generalized form of L</on_connect_do> that calls the specified
352 C<connect_call_METHOD> methods in your storage driver.
353
354   on_connect_do => 'select 1'
355
356 is equivalent to:
357
358   on_connect_call => [ [ do_sql => 'select 1' ] ]
359
360 Its values may contain:
361
362 =over
363
364 =item a scalar
365
366 Will call the C<connect_call_METHOD> method.
367
368 =item a code reference
369
370 Will execute C<< $code->($storage) >>
371
372 =item an array reference
373
374 Each value can be a method name or code reference.
375
376 =item an array of arrays
377
378 For each array, the first item is taken to be the C<connect_call_> method name
379 or code reference, and the rest are parameters to it.
380
381 =back
382
383 Some predefined storage methods you may use:
384
385 =over
386
387 =item do_sql
388
389 Executes a SQL string or a code reference that returns a SQL string. This is
390 what L</on_connect_do> and L</on_disconnect_do> use.
391
392 It can take:
393
394 =over
395
396 =item a scalar
397
398 Will execute the scalar as SQL.
399
400 =item an arrayref
401
402 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
403 attributes hashref and bind values.
404
405 =item a code reference
406
407 Will execute C<< $code->($storage) >> and execute the return array refs as
408 above.
409
410 =back
411
412 =item datetime_setup
413
414 Execute any statements necessary to initialize the database session to return
415 and accept datetime/timestamp values used with
416 L<DBIx::Class::InflateColumn::DateTime>.
417
418 Only necessary for some databases, see your specific storage driver for
419 implementation details.
420
421 =back
422
423 =item on_disconnect_call
424
425 Takes arguments in the same form as L</on_connect_call> and executes them
426 immediately before disconnecting from the database.
427
428 Calls the C<disconnect_call_METHOD> methods as opposed to the
429 C<connect_call_METHOD> methods called by L</on_connect_call>.
430
431 Note, this only runs if you explicitly call L</disconnect> on the
432 storage object.
433
434 =item disable_sth_caching
435
436 If set to a true value, this option will disable the caching of
437 statement handles via L<DBI/prepare_cached>.
438
439 =item limit_dialect
440
441 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
442 default L</sql_limit_dialect> setting of the storage (if any). For a list
443 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
444
445 =item quote_char
446
447 Specifies what characters to use to quote table and column names.
448
449 C<quote_char> expects either a single character, in which case is it
450 is placed on either side of the table/column name, or an arrayref of length
451 2 in which case the table/column name is placed between the elements.
452
453 For example under MySQL you should use C<< quote_char => '`' >>, and for
454 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
455
456 =item name_sep
457
458 This parameter is only useful in conjunction with C<quote_char>, and is used to
459 specify the character that separates elements (schemas, tables, columns) from
460 each other. If unspecified it defaults to the most commonly used C<.>.
461
462 =item unsafe
463
464 This Storage driver normally installs its own C<HandleError>, sets
465 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
466 all database handles, including those supplied by a coderef.  It does this
467 so that it can have consistent and useful error behavior.
468
469 If you set this option to a true value, Storage will not do its usual
470 modifications to the database handle's attributes, and instead relies on
471 the settings in your connect_info DBI options (or the values you set in
472 your connection coderef, in the case that you are connecting via coderef).
473
474 Note that your custom settings can cause Storage to malfunction,
475 especially if you set a C<HandleError> handler that suppresses exceptions
476 and/or disable C<RaiseError>.
477
478 =item auto_savepoint
479
480 If this option is true, L<DBIx::Class> will use savepoints when nesting
481 transactions, making it possible to recover from failure in the inner
482 transaction without having to abort all outer transactions.
483
484 =item cursor_class
485
486 Use this argument to supply a cursor class other than the default
487 L<DBIx::Class::Storage::DBI::Cursor>.
488
489 =back
490
491 Some real-life examples of arguments to L</connect_info> and
492 L<DBIx::Class::Schema/connect>
493
494   # Simple SQLite connection
495   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
496
497   # Connect via subref
498   ->connect_info([ sub { DBI->connect(...) } ]);
499
500   # Connect via subref in hashref
501   ->connect_info([{
502     dbh_maker => sub { DBI->connect(...) },
503     on_connect_do => 'alter session ...',
504   }]);
505
506   # A bit more complicated
507   ->connect_info(
508     [
509       'dbi:Pg:dbname=foo',
510       'postgres',
511       'my_pg_password',
512       { AutoCommit => 1 },
513       { quote_char => q{"} },
514     ]
515   );
516
517   # Equivalent to the previous example
518   ->connect_info(
519     [
520       'dbi:Pg:dbname=foo',
521       'postgres',
522       'my_pg_password',
523       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
524     ]
525   );
526
527   # Same, but with hashref as argument
528   # See parse_connect_info for explanation
529   ->connect_info(
530     [{
531       dsn         => 'dbi:Pg:dbname=foo',
532       user        => 'postgres',
533       password    => 'my_pg_password',
534       AutoCommit  => 1,
535       quote_char  => q{"},
536       name_sep    => q{.},
537     }]
538   );
539
540   # Subref + DBIx::Class-specific connection options
541   ->connect_info(
542     [
543       sub { DBI->connect(...) },
544       {
545           quote_char => q{`},
546           name_sep => q{@},
547           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
548           disable_sth_caching => 1,
549       },
550     ]
551   );
552
553
554
555 =cut
556
557 sub connect_info {
558   my ($self, $info) = @_;
559
560   return $self->_connect_info if !$info;
561
562   $self->_connect_info($info); # copy for _connect_info
563
564   $info = $self->_normalize_connect_info($info)
565     if ref $info eq 'ARRAY';
566
567   for my $storage_opt (keys %{ $info->{storage_options} }) {
568     my $value = $info->{storage_options}{$storage_opt};
569
570     $self->$storage_opt($value);
571   }
572
573   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
574   #  the new set of options
575   $self->_sql_maker(undef);
576   $self->_sql_maker_opts({});
577
578   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
579     my $value = $info->{sql_maker_options}{$sql_maker_opt};
580
581     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
582   }
583
584   my %attrs = (
585     %{ $self->_default_dbi_connect_attributes || {} },
586     %{ $info->{attributes} || {} },
587   );
588
589   my @args = @{ $info->{arguments} };
590
591   $self->_dbi_connect_info([@args,
592     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
593
594   # FIXME - dirty:
595   # save attributes them in a separate accessor so they are always
596   # introspectable, even in case of a CODE $dbhmaker
597   $self->_dbic_connect_attributes (\%attrs);
598
599   return $self->_connect_info;
600 }
601
602 sub _normalize_connect_info {
603   my ($self, $info_arg) = @_;
604   my %info;
605
606   my @args = @$info_arg;  # take a shallow copy for further mutilation
607
608   # combine/pre-parse arguments depending on invocation style
609
610   my %attrs;
611   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
612     %attrs = %{ $args[1] || {} };
613     @args = $args[0];
614   }
615   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
616     %attrs = %{$args[0]};
617     @args = ();
618     if (my $code = delete $attrs{dbh_maker}) {
619       @args = $code;
620
621       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
622       if (@ignored) {
623         carp sprintf (
624             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
625           . "to the result of 'dbh_maker'",
626
627           join (', ', map { "'$_'" } (@ignored) ),
628         );
629       }
630     }
631     else {
632       @args = delete @attrs{qw/dsn user password/};
633     }
634   }
635   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
636     %attrs = (
637       % { $args[3] || {} },
638       % { $args[4] || {} },
639     );
640     @args = @args[0,1,2];
641   }
642
643   $info{arguments} = \@args;
644
645   my @storage_opts = grep exists $attrs{$_},
646     @storage_options, 'cursor_class';
647
648   @{ $info{storage_options} }{@storage_opts} =
649     delete @attrs{@storage_opts} if @storage_opts;
650
651   my @sql_maker_opts = grep exists $attrs{$_},
652     qw/limit_dialect quote_char name_sep/;
653
654   @{ $info{sql_maker_options} }{@sql_maker_opts} =
655     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
656
657   $info{attributes} = \%attrs if %attrs;
658
659   return \%info;
660 }
661
662 sub _default_dbi_connect_attributes {
663   return {
664     AutoCommit => 1,
665     RaiseError => 1,
666     PrintError => 0,
667   };
668 }
669
670 =head2 on_connect_do
671
672 This method is deprecated in favour of setting via L</connect_info>.
673
674 =cut
675
676 =head2 on_disconnect_do
677
678 This method is deprecated in favour of setting via L</connect_info>.
679
680 =cut
681
682 sub _parse_connect_do {
683   my ($self, $type) = @_;
684
685   my $val = $self->$type;
686   return () if not defined $val;
687
688   my @res;
689
690   if (not ref($val)) {
691     push @res, [ 'do_sql', $val ];
692   } elsif (ref($val) eq 'CODE') {
693     push @res, $val;
694   } elsif (ref($val) eq 'ARRAY') {
695     push @res, map { [ 'do_sql', $_ ] } @$val;
696   } else {
697     $self->throw_exception("Invalid type for $type: ".ref($val));
698   }
699
700   return \@res;
701 }
702
703 =head2 dbh_do
704
705 Arguments: ($subref | $method_name), @extra_coderef_args?
706
707 Execute the given $subref or $method_name using the new exception-based
708 connection management.
709
710 The first two arguments will be the storage object that C<dbh_do> was called
711 on and a database handle to use.  Any additional arguments will be passed
712 verbatim to the called subref as arguments 2 and onwards.
713
714 Using this (instead of $self->_dbh or $self->dbh) ensures correct
715 exception handling and reconnection (or failover in future subclasses).
716
717 Your subref should have no side-effects outside of the database, as
718 there is the potential for your subref to be partially double-executed
719 if the database connection was stale/dysfunctional.
720
721 Example:
722
723   my @stuff = $schema->storage->dbh_do(
724     sub {
725       my ($storage, $dbh, @cols) = @_;
726       my $cols = join(q{, }, @cols);
727       $dbh->selectrow_array("SELECT $cols FROM foo");
728     },
729     @column_list
730   );
731
732 =cut
733
734 sub dbh_do {
735   my $self = shift;
736   my $code = shift;
737
738   my $dbh = $self->_get_dbh;
739
740   return $self->$code($dbh, @_)
741     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
742
743   local $self->{_in_dbh_do} = 1;
744
745   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
746   my $args = \@_;
747   return try {
748     $self->$code ($dbh, @$args);
749   } catch {
750     $self->throw_exception($_) if $self->connected;
751
752     # We were not connected - reconnect and retry, but let any
753     #  exception fall right through this time
754     carp "Retrying $code after catching disconnected exception: $_"
755       if $ENV{DBIC_DBIRETRY_DEBUG};
756
757     $self->_populate_dbh;
758     $self->$code($self->_dbh, @$args);
759   };
760 }
761
762 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
763 # It also informs dbh_do to bypass itself while under the direction of txn_do,
764 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
765 sub txn_do {
766   my $self = shift;
767   my $coderef = shift;
768
769   ref $coderef eq 'CODE' or $self->throw_exception
770     ('$coderef must be a CODE reference');
771
772   local $self->{_in_dbh_do} = 1;
773
774   my @result;
775   my $want_array = wantarray;
776
777   my $tried = 0;
778   while(1) {
779     my $exception;
780
781     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
782     my $args = \@_;
783
784     try {
785       $self->txn_begin;
786       my $txn_start_depth = $self->transaction_depth;
787       if($want_array) {
788           @result = $coderef->(@$args);
789       }
790       elsif(defined $want_array) {
791           $result[0] = $coderef->(@$args);
792       }
793       else {
794           $coderef->(@$args);
795       }
796
797       my $delta_txn = $txn_start_depth - $self->transaction_depth;
798       if ($delta_txn == 0) {
799         $self->txn_commit;
800       }
801       elsif ($delta_txn != 1) {
802         # an off-by-one would mean we fired a rollback
803         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
804       }
805     } catch {
806       $exception = $_;
807     };
808
809     if(! defined $exception) { return $want_array ? @result : $result[0] }
810
811     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
812       my $rollback_exception;
813       try { $self->txn_rollback } catch { $rollback_exception = shift };
814       if(defined $rollback_exception) {
815         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
816         $self->throw_exception($exception)  # propagate nested rollback
817           if $rollback_exception =~ /$exception_class/;
818
819         $self->throw_exception(
820           "Transaction aborted: ${exception}. "
821           . "Rollback failed: ${rollback_exception}"
822         );
823       }
824       $self->throw_exception($exception)
825     }
826
827     # We were not connected, and was first try - reconnect and retry
828     # via the while loop
829     carp "Retrying $coderef after catching disconnected exception: $exception"
830       if $ENV{DBIC_TXNRETRY_DEBUG};
831     $self->_populate_dbh;
832   }
833 }
834
835 =head2 disconnect
836
837 Our C<disconnect> method also performs a rollback first if the
838 database is not in C<AutoCommit> mode.
839
840 =cut
841
842 sub disconnect {
843   my ($self) = @_;
844
845   if( $self->_dbh ) {
846     my @actions;
847
848     push @actions, ( $self->on_disconnect_call || () );
849     push @actions, $self->_parse_connect_do ('on_disconnect_do');
850
851     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
852
853     $self->_dbh_rollback unless $self->_dbh_autocommit;
854
855     %{ $self->_dbh->{CachedKids} } = ();
856     $self->_dbh->disconnect;
857     $self->_dbh(undef);
858     $self->{_dbh_gen}++;
859   }
860 }
861
862 =head2 with_deferred_fk_checks
863
864 =over 4
865
866 =item Arguments: C<$coderef>
867
868 =item Return Value: The return value of $coderef
869
870 =back
871
872 Storage specific method to run the code ref with FK checks deferred or
873 in MySQL's case disabled entirely.
874
875 =cut
876
877 # Storage subclasses should override this
878 sub with_deferred_fk_checks {
879   my ($self, $sub) = @_;
880   $sub->();
881 }
882
883 =head2 connected
884
885 =over
886
887 =item Arguments: none
888
889 =item Return Value: 1|0
890
891 =back
892
893 Verifies that the current database handle is active and ready to execute
894 an SQL statement (e.g. the connection did not get stale, server is still
895 answering, etc.) This method is used internally by L</dbh>.
896
897 =cut
898
899 sub connected {
900   my $self = shift;
901   return 0 unless $self->_seems_connected;
902
903   #be on the safe side
904   local $self->_dbh->{RaiseError} = 1;
905
906   return $self->_ping;
907 }
908
909 sub _seems_connected {
910   my $self = shift;
911
912   $self->_verify_pid;
913
914   my $dbh = $self->_dbh
915     or return 0;
916
917   return $dbh->FETCH('Active');
918 }
919
920 sub _ping {
921   my $self = shift;
922
923   my $dbh = $self->_dbh or return 0;
924
925   return $dbh->ping;
926 }
927
928 sub ensure_connected {
929   my ($self) = @_;
930
931   unless ($self->connected) {
932     $self->_populate_dbh;
933   }
934 }
935
936 =head2 dbh
937
938 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
939 is guaranteed to be healthy by implicitly calling L</connected>, and if
940 necessary performing a reconnection before returning. Keep in mind that this
941 is very B<expensive> on some database engines. Consider using L</dbh_do>
942 instead.
943
944 =cut
945
946 sub dbh {
947   my ($self) = @_;
948
949   if (not $self->_dbh) {
950     $self->_populate_dbh;
951   } else {
952     $self->ensure_connected;
953   }
954   return $self->_dbh;
955 }
956
957 # this is the internal "get dbh or connect (don't check)" method
958 sub _get_dbh {
959   my $self = shift;
960   $self->_verify_pid;
961   $self->_populate_dbh unless $self->_dbh;
962   return $self->_dbh;
963 }
964
965 sub sql_maker {
966   my ($self) = @_;
967   unless ($self->_sql_maker) {
968     my $sql_maker_class = $self->sql_maker_class;
969     $self->ensure_class_loaded ($sql_maker_class);
970
971     my %opts = %{$self->_sql_maker_opts||{}};
972     my $dialect =
973       $opts{limit_dialect}
974         ||
975       $self->sql_limit_dialect
976         ||
977       do {
978         my $s_class = (ref $self) || $self;
979         carp (
980           "Your storage class ($s_class) does not set sql_limit_dialect and you "
981         . 'have not supplied an explicit limit_dialect in your connection_info. '
982         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
983         . 'databases but can be (and often is) painfully slow.'
984         );
985
986         'GenericSubQ';
987       }
988     ;
989
990     $self->_sql_maker($sql_maker_class->new(
991       bindtype=>'columns',
992       array_datatypes => 1,
993       limit_dialect => $dialect,
994       name_sep => '.',
995       %opts,
996     ));
997   }
998   return $self->_sql_maker;
999 }
1000
1001 # nothing to do by default
1002 sub _rebless {}
1003 sub _init {}
1004
1005 sub _populate_dbh {
1006   my ($self) = @_;
1007
1008   my @info = @{$self->_dbi_connect_info || []};
1009   $self->_dbh(undef); # in case ->connected failed we might get sent here
1010   $self->_dbh_details({}); # reset everything we know
1011
1012   $self->_dbh($self->_connect(@info));
1013
1014   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1015
1016   $self->_determine_driver;
1017
1018   # Always set the transaction depth on connect, since
1019   #  there is no transaction in progress by definition
1020   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1021
1022   $self->_run_connection_actions unless $self->{_in_determine_driver};
1023 }
1024
1025 sub _run_connection_actions {
1026   my $self = shift;
1027   my @actions;
1028
1029   push @actions, ( $self->on_connect_call || () );
1030   push @actions, $self->_parse_connect_do ('on_connect_do');
1031
1032   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1033 }
1034
1035
1036
1037 sub set_use_dbms_capability {
1038   $_[0]->set_inherited ($_[1], $_[2]);
1039 }
1040
1041 sub get_use_dbms_capability {
1042   my ($self, $capname) = @_;
1043
1044   my $use = $self->get_inherited ($capname);
1045   return defined $use
1046     ? $use
1047     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1048   ;
1049 }
1050
1051 sub set_dbms_capability {
1052   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1053 }
1054
1055 sub get_dbms_capability {
1056   my ($self, $capname) = @_;
1057
1058   my $cap = $self->_dbh_details->{capability}{$capname};
1059
1060   unless (defined $cap) {
1061     if (my $meth = $self->can ("_determine$capname")) {
1062       $cap = $self->$meth ? 1 : 0;
1063     }
1064     else {
1065       $cap = 0;
1066     }
1067
1068     $self->set_dbms_capability ($capname, $cap);
1069   }
1070
1071   return $cap;
1072 }
1073
1074 sub _server_info {
1075   my $self = shift;
1076
1077   my $info;
1078   unless ($info = $self->_dbh_details->{info}) {
1079
1080     $info = {};
1081
1082     my $server_version = try { $self->_get_server_version };
1083
1084     if (defined $server_version) {
1085       $info->{dbms_version} = $server_version;
1086
1087       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1088       my @verparts = split (/\./, $numeric_version);
1089       if (
1090         @verparts
1091           &&
1092         $verparts[0] <= 999
1093       ) {
1094         # consider only up to 3 version parts, iff not more than 3 digits
1095         my @use_parts;
1096         while (@verparts && @use_parts < 3) {
1097           my $p = shift @verparts;
1098           last if $p > 999;
1099           push @use_parts, $p;
1100         }
1101         push @use_parts, 0 while @use_parts < 3;
1102
1103         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1104       }
1105     }
1106
1107     $self->_dbh_details->{info} = $info;
1108   }
1109
1110   return $info;
1111 }
1112
1113 sub _get_server_version {
1114   shift->_get_dbh->get_info(18);
1115 }
1116
1117 sub _determine_driver {
1118   my ($self) = @_;
1119
1120   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1121     my $started_connected = 0;
1122     local $self->{_in_determine_driver} = 1;
1123
1124     if (ref($self) eq __PACKAGE__) {
1125       my $driver;
1126       if ($self->_dbh) { # we are connected
1127         $driver = $self->_dbh->{Driver}{Name};
1128         $started_connected = 1;
1129       } else {
1130         # if connect_info is a CODEREF, we have no choice but to connect
1131         if (ref $self->_dbi_connect_info->[0] &&
1132             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1133           $self->_populate_dbh;
1134           $driver = $self->_dbh->{Driver}{Name};
1135         }
1136         else {
1137           # try to use dsn to not require being connected, the driver may still
1138           # force a connection in _rebless to determine version
1139           # (dsn may not be supplied at all if all we do is make a mock-schema)
1140           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1141           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1142           $driver ||= $ENV{DBI_DRIVER};
1143         }
1144       }
1145
1146       if ($driver) {
1147         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1148         if ($self->load_optional_class($storage_class)) {
1149           mro::set_mro($storage_class, 'c3');
1150           bless $self, $storage_class;
1151           $self->_rebless();
1152         }
1153       }
1154     }
1155
1156     $self->_driver_determined(1);
1157
1158     $self->_init; # run driver-specific initializations
1159
1160     $self->_run_connection_actions
1161         if !$started_connected && defined $self->_dbh;
1162   }
1163 }
1164
1165 sub _do_connection_actions {
1166   my $self          = shift;
1167   my $method_prefix = shift;
1168   my $call          = shift;
1169
1170   if (not ref($call)) {
1171     my $method = $method_prefix . $call;
1172     $self->$method(@_);
1173   } elsif (ref($call) eq 'CODE') {
1174     $self->$call(@_);
1175   } elsif (ref($call) eq 'ARRAY') {
1176     if (ref($call->[0]) ne 'ARRAY') {
1177       $self->_do_connection_actions($method_prefix, $_) for @$call;
1178     } else {
1179       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1180     }
1181   } else {
1182     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1183   }
1184
1185   return $self;
1186 }
1187
1188 sub connect_call_do_sql {
1189   my $self = shift;
1190   $self->_do_query(@_);
1191 }
1192
1193 sub disconnect_call_do_sql {
1194   my $self = shift;
1195   $self->_do_query(@_);
1196 }
1197
1198 # override in db-specific backend when necessary
1199 sub connect_call_datetime_setup { 1 }
1200
1201 sub _do_query {
1202   my ($self, $action) = @_;
1203
1204   if (ref $action eq 'CODE') {
1205     $action = $action->($self);
1206     $self->_do_query($_) foreach @$action;
1207   }
1208   else {
1209     # Most debuggers expect ($sql, @bind), so we need to exclude
1210     # the attribute hash which is the second argument to $dbh->do
1211     # furthermore the bind values are usually to be presented
1212     # as named arrayref pairs, so wrap those here too
1213     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1214     my $sql = shift @do_args;
1215     my $attrs = shift @do_args;
1216     my @bind = map { [ undef, $_ ] } @do_args;
1217
1218     $self->_query_start($sql, @bind);
1219     $self->_get_dbh->do($sql, $attrs, @do_args);
1220     $self->_query_end($sql, @bind);
1221   }
1222
1223   return $self;
1224 }
1225
1226 sub _connect {
1227   my ($self, @info) = @_;
1228
1229   $self->throw_exception("You failed to provide any connection info")
1230     if !@info;
1231
1232   my ($old_connect_via, $dbh);
1233
1234   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1235     $old_connect_via = $DBI::connect_via;
1236     $DBI::connect_via = 'connect';
1237   }
1238
1239   try {
1240     if(ref $info[0] eq 'CODE') {
1241        $dbh = $info[0]->();
1242     }
1243     else {
1244        $dbh = DBI->connect(@info);
1245     }
1246
1247     if (!$dbh) {
1248       die $DBI::errstr;
1249     }
1250
1251     unless ($self->unsafe) {
1252
1253       # this odd anonymous coderef dereference is in fact really
1254       # necessary to avoid the unwanted effect described in perl5
1255       # RT#75792
1256       sub {
1257         my $weak_self = $_[0];
1258         weaken $weak_self;
1259
1260         $_[1]->{HandleError} = sub {
1261           if ($weak_self) {
1262             $weak_self->throw_exception("DBI Exception: $_[0]");
1263           }
1264           else {
1265             # the handler may be invoked by something totally out of
1266             # the scope of DBIC
1267             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1268           }
1269         };
1270       }->($self, $dbh);
1271
1272       $dbh->{ShowErrorStatement} = 1;
1273       $dbh->{RaiseError} = 1;
1274       $dbh->{PrintError} = 0;
1275     }
1276   }
1277   catch {
1278     $self->throw_exception("DBI Connection failed: $_")
1279   }
1280   finally {
1281     $DBI::connect_via = $old_connect_via if $old_connect_via;
1282   };
1283
1284   $self->_dbh_autocommit($dbh->{AutoCommit});
1285   $dbh;
1286 }
1287
1288 sub svp_begin {
1289   my ($self, $name) = @_;
1290
1291   $name = $self->_svp_generate_name
1292     unless defined $name;
1293
1294   $self->throw_exception ("You can't use savepoints outside a transaction")
1295     if $self->{transaction_depth} == 0;
1296
1297   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1298     unless $self->can('_svp_begin');
1299
1300   push @{ $self->{savepoints} }, $name;
1301
1302   $self->debugobj->svp_begin($name) if $self->debug;
1303
1304   return $self->_svp_begin($name);
1305 }
1306
1307 sub svp_release {
1308   my ($self, $name) = @_;
1309
1310   $self->throw_exception ("You can't use savepoints outside a transaction")
1311     if $self->{transaction_depth} == 0;
1312
1313   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1314     unless $self->can('_svp_release');
1315
1316   if (defined $name) {
1317     $self->throw_exception ("Savepoint '$name' does not exist")
1318       unless grep { $_ eq $name } @{ $self->{savepoints} };
1319
1320     # Dig through the stack until we find the one we are releasing.  This keeps
1321     # the stack up to date.
1322     my $svp;
1323
1324     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1325   } else {
1326     $name = pop @{ $self->{savepoints} };
1327   }
1328
1329   $self->debugobj->svp_release($name) if $self->debug;
1330
1331   return $self->_svp_release($name);
1332 }
1333
1334 sub svp_rollback {
1335   my ($self, $name) = @_;
1336
1337   $self->throw_exception ("You can't use savepoints outside a transaction")
1338     if $self->{transaction_depth} == 0;
1339
1340   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1341     unless $self->can('_svp_rollback');
1342
1343   if (defined $name) {
1344       # If they passed us a name, verify that it exists in the stack
1345       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1346           $self->throw_exception("Savepoint '$name' does not exist!");
1347       }
1348
1349       # Dig through the stack until we find the one we are releasing.  This keeps
1350       # the stack up to date.
1351       while(my $s = pop(@{ $self->{savepoints} })) {
1352           last if($s eq $name);
1353       }
1354       # Add the savepoint back to the stack, as a rollback doesn't remove the
1355       # named savepoint, only everything after it.
1356       push(@{ $self->{savepoints} }, $name);
1357   } else {
1358       # We'll assume they want to rollback to the last savepoint
1359       $name = $self->{savepoints}->[-1];
1360   }
1361
1362   $self->debugobj->svp_rollback($name) if $self->debug;
1363
1364   return $self->_svp_rollback($name);
1365 }
1366
1367 sub _svp_generate_name {
1368   my ($self) = @_;
1369   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1370 }
1371
1372 sub txn_begin {
1373   my $self = shift;
1374
1375   # this means we have not yet connected and do not know the AC status
1376   # (e.g. coderef $dbh)
1377   if (! defined $self->_dbh_autocommit) {
1378     $self->ensure_connected;
1379   }
1380   # otherwise re-connect on pid changes, so
1381   # that the txn_depth is adjusted properly
1382   # the lightweight _get_dbh is good enoug here
1383   # (only superficial handle check, no pings)
1384   else {
1385     $self->_get_dbh;
1386   }
1387
1388   if($self->transaction_depth == 0) {
1389     $self->debugobj->txn_begin()
1390       if $self->debug;
1391     $self->_dbh_begin_work;
1392   }
1393   elsif ($self->auto_savepoint) {
1394     $self->svp_begin;
1395   }
1396   $self->{transaction_depth}++;
1397 }
1398
1399 sub _dbh_begin_work {
1400   my $self = shift;
1401
1402   # if the user is utilizing txn_do - good for him, otherwise we need to
1403   # ensure that the $dbh is healthy on BEGIN.
1404   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1405   # will be replaced by a failure of begin_work itself (which will be
1406   # then retried on reconnect)
1407   if ($self->{_in_dbh_do}) {
1408     $self->_dbh->begin_work;
1409   } else {
1410     $self->dbh_do(sub { $_[1]->begin_work });
1411   }
1412 }
1413
1414 sub txn_commit {
1415   my $self = shift;
1416   if ($self->{transaction_depth} == 1) {
1417     $self->debugobj->txn_commit()
1418       if ($self->debug);
1419     $self->_dbh_commit;
1420     $self->{transaction_depth} = 0
1421       if $self->_dbh_autocommit;
1422   }
1423   elsif($self->{transaction_depth} > 1) {
1424     $self->{transaction_depth}--;
1425     $self->svp_release
1426       if $self->auto_savepoint;
1427   }
1428   else {
1429     $self->throw_exception( 'Refusing to commit without a started transaction' );
1430   }
1431 }
1432
1433 sub _dbh_commit {
1434   my $self = shift;
1435   my $dbh  = $self->_dbh
1436     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1437   $dbh->commit;
1438 }
1439
1440 sub txn_rollback {
1441   my $self = shift;
1442   my $dbh = $self->_dbh;
1443   try {
1444     if ($self->{transaction_depth} == 1) {
1445       $self->debugobj->txn_rollback()
1446         if ($self->debug);
1447       $self->{transaction_depth} = 0
1448         if $self->_dbh_autocommit;
1449       $self->_dbh_rollback;
1450     }
1451     elsif($self->{transaction_depth} > 1) {
1452       $self->{transaction_depth}--;
1453       if ($self->auto_savepoint) {
1454         $self->svp_rollback;
1455         $self->svp_release;
1456       }
1457     }
1458     else {
1459       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1460     }
1461   }
1462   catch {
1463     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1464
1465     if ($_ !~ /$exception_class/) {
1466       # ensure that a failed rollback resets the transaction depth
1467       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1468     }
1469
1470     $self->throw_exception($_)
1471   };
1472 }
1473
1474 sub _dbh_rollback {
1475   my $self = shift;
1476   my $dbh  = $self->_dbh
1477     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1478   $dbh->rollback;
1479 }
1480
1481 # This used to be the top-half of _execute.  It was split out to make it
1482 #  easier to override in NoBindVars without duping the rest.  It takes up
1483 #  all of _execute's args, and emits $sql, @bind.
1484 sub _prep_for_execute {
1485   my ($self, $op, $extra_bind, $ident, $args) = @_;
1486
1487   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1488     $ident = $ident->from();
1489   }
1490
1491   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1492
1493   unshift(@bind,
1494     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1495       if $extra_bind;
1496   return ($sql, \@bind);
1497 }
1498
1499
1500 sub _fix_bind_params {
1501     my ($self, @bind) = @_;
1502
1503     ### Turn @bind from something like this:
1504     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1505     ### to this:
1506     ###   ( "'1'", "'1'", "'3'" )
1507     return
1508         map {
1509             if ( defined( $_ && $_->[1] ) ) {
1510                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1511             }
1512             else { q{NULL}; }
1513         } @bind;
1514 }
1515
1516 sub _query_start {
1517     my ( $self, $sql, @bind ) = @_;
1518
1519     if ( $self->debug ) {
1520         @bind = $self->_fix_bind_params(@bind);
1521
1522         $self->debugobj->query_start( $sql, @bind );
1523     }
1524 }
1525
1526 sub _query_end {
1527     my ( $self, $sql, @bind ) = @_;
1528
1529     if ( $self->debug ) {
1530         @bind = $self->_fix_bind_params(@bind);
1531         $self->debugobj->query_end( $sql, @bind );
1532     }
1533 }
1534
1535 sub _dbh_execute {
1536   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1537
1538   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1539
1540   $self->_query_start( $sql, @$bind );
1541
1542   my $sth = $self->sth($sql,$op);
1543
1544   my $placeholder_index = 1;
1545
1546   foreach my $bound (@$bind) {
1547     my $attributes = {};
1548     my($column_name, @data) = @$bound;
1549
1550     if ($bind_attributes) {
1551       $attributes = $bind_attributes->{$column_name}
1552       if defined $bind_attributes->{$column_name};
1553     }
1554
1555     foreach my $data (@data) {
1556       my $ref = ref $data;
1557       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1558
1559       $sth->bind_param($placeholder_index, $data, $attributes);
1560       $placeholder_index++;
1561     }
1562   }
1563
1564   # Can this fail without throwing an exception anyways???
1565   my $rv = $sth->execute();
1566   $self->throw_exception(
1567     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1568   ) if !$rv;
1569
1570   $self->_query_end( $sql, @$bind );
1571
1572   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1573 }
1574
1575 sub _execute {
1576     my $self = shift;
1577     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1578 }
1579
1580 sub _prefetch_autovalues {
1581   my ($self, $source, $to_insert) = @_;
1582
1583   my $colinfo = $source->columns_info;
1584
1585   my %values;
1586   for my $col (keys %$colinfo) {
1587     if (
1588       $colinfo->{$col}{auto_nextval}
1589         and
1590       (
1591         ! exists $to_insert->{$col}
1592           or
1593         ref $to_insert->{$col} eq 'SCALAR'
1594       )
1595     ) {
1596       $values{$col} = $self->_sequence_fetch(
1597         'nextval',
1598         ( $colinfo->{$col}{sequence} ||=
1599             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1600         ),
1601       );
1602     }
1603   }
1604
1605   \%values;
1606 }
1607
1608 sub insert {
1609   my ($self, $source, $to_insert) = @_;
1610
1611   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1612
1613   # fuse the values
1614   $to_insert = { %$to_insert, %$prefetched_values };
1615
1616   # list of primary keys we try to fetch from the database
1617   # both not-exsists and scalarrefs are considered
1618   my %fetch_pks;
1619   %fetch_pks = ( map
1620     { $_ => scalar keys %fetch_pks }  # so we can preserve order for prettyness
1621     grep
1622       { ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR' }
1623       $source->primary_columns
1624   );
1625
1626   my $sqla_opts;
1627   if ($self->_use_insert_returning) {
1628
1629     # retain order as declared in the resultsource
1630     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1631       push @{$sqla_opts->{returning}}, $_;
1632     }
1633   }
1634
1635   my $bind_attributes = $self->source_bind_attributes($source);
1636
1637   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1638
1639   my %returned_cols;
1640
1641   if (my $retlist = $sqla_opts->{returning}) {
1642     my @ret_vals = try {
1643       local $SIG{__WARN__} = sub {};
1644       my @r = $sth->fetchrow_array;
1645       $sth->finish;
1646       @r;
1647     };
1648
1649     @returned_cols{@$retlist} = @ret_vals if @ret_vals;
1650   }
1651
1652   return { %$prefetched_values, %returned_cols };
1653 }
1654
1655
1656 ## Currently it is assumed that all values passed will be "normal", i.e. not
1657 ## scalar refs, or at least, all the same type as the first set, the statement is
1658 ## only prepped once.
1659 sub insert_bulk {
1660   my ($self, $source, $cols, $data) = @_;
1661
1662   my %colvalues;
1663   @colvalues{@$cols} = (0..$#$cols);
1664
1665   for my $i (0..$#$cols) {
1666     my $first_val = $data->[0][$i];
1667     next unless ref $first_val eq 'SCALAR';
1668
1669     $colvalues{ $cols->[$i] } = $first_val;
1670   }
1671
1672   # check for bad data and stringify stringifiable objects
1673   my $bad_slice = sub {
1674     my ($msg, $col_idx, $slice_idx) = @_;
1675     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1676       $msg,
1677       $cols->[$col_idx],
1678       do {
1679         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1680         Dumper {
1681           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1682         },
1683       }
1684     );
1685   };
1686
1687   for my $datum_idx (0..$#$data) {
1688     my $datum = $data->[$datum_idx];
1689
1690     for my $col_idx (0..$#$cols) {
1691       my $val            = $datum->[$col_idx];
1692       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1693       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1694
1695       if ($is_literal_sql) {
1696         if (not ref $val) {
1697           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1698         }
1699         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1700           $bad_slice->("$reftype reference found where literal SQL expected",
1701             $col_idx, $datum_idx);
1702         }
1703         elsif ($$val ne $$sqla_bind){
1704           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1705             $col_idx, $datum_idx);
1706         }
1707       }
1708       elsif (my $reftype = ref $val) {
1709         require overload;
1710         if (overload::Method($val, '""')) {
1711           $datum->[$col_idx] = "".$val;
1712         }
1713         else {
1714           $bad_slice->("$reftype reference found where bind expected",
1715             $col_idx, $datum_idx);
1716         }
1717       }
1718     }
1719   }
1720
1721   my ($sql, $bind) = $self->_prep_for_execute (
1722     'insert', undef, $source, [\%colvalues]
1723   );
1724
1725   if (! @$bind) {
1726     # if the bindlist is empty - make sure all "values" are in fact
1727     # literal scalarrefs. If not the case this means the storage ate
1728     # them away (e.g. the NoBindVars component) and interpolated them
1729     # directly into the SQL. This obviosly can't be good for multi-inserts
1730
1731     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1732       if first { ref $_ ne 'SCALAR' } values %colvalues;
1733   }
1734
1735   # neither _execute_array, nor _execute_inserts_with_no_binds are
1736   # atomic (even if _execute _array is a single call). Thus a safety
1737   # scope guard
1738   my $guard = $self->txn_scope_guard;
1739
1740   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1741   my $sth = $self->sth($sql);
1742   my $rv = do {
1743     if (@$bind) {
1744       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1745       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1746     }
1747     else {
1748       # bind_param_array doesn't work if there are no binds
1749       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1750     }
1751   };
1752
1753   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1754
1755   $guard->commit;
1756
1757   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1758 }
1759
1760 sub _execute_array {
1761   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1762
1763   ## This must be an arrayref, else nothing works!
1764   my $tuple_status = [];
1765
1766   ## Get the bind_attributes, if any exist
1767   my $bind_attributes = $self->source_bind_attributes($source);
1768
1769   ## Bind the values and execute
1770   my $placeholder_index = 1;
1771
1772   foreach my $bound (@$bind) {
1773
1774     my $attributes = {};
1775     my ($column_name, $data_index) = @$bound;
1776
1777     if( $bind_attributes ) {
1778       $attributes = $bind_attributes->{$column_name}
1779       if defined $bind_attributes->{$column_name};
1780     }
1781
1782     my @data = map { $_->[$data_index] } @$data;
1783
1784     $sth->bind_param_array(
1785       $placeholder_index,
1786       [@data],
1787       (%$attributes ?  $attributes : ()),
1788     );
1789     $placeholder_index++;
1790   }
1791
1792   my ($rv, $err);
1793   try {
1794     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1795   }
1796   catch {
1797     $err = shift;
1798   };
1799
1800   # Statement must finish even if there was an exception.
1801   try {
1802     $sth->finish
1803   }
1804   catch {
1805     $err = shift unless defined $err
1806   };
1807
1808   $err = $sth->errstr
1809     if (! defined $err and $sth->err);
1810
1811   if (defined $err) {
1812     my $i = 0;
1813     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1814
1815     $self->throw_exception("Unexpected populate error: $err")
1816       if ($i > $#$tuple_status);
1817
1818     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1819       ($tuple_status->[$i][1] || $err),
1820       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1821     );
1822   }
1823
1824   return $rv;
1825 }
1826
1827 sub _dbh_execute_array {
1828     my ($self, $sth, $tuple_status, @extra) = @_;
1829
1830     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1831 }
1832
1833 sub _dbh_execute_inserts_with_no_binds {
1834   my ($self, $sth, $count) = @_;
1835
1836   my $err;
1837   try {
1838     my $dbh = $self->_get_dbh;
1839     local $dbh->{RaiseError} = 1;
1840     local $dbh->{PrintError} = 0;
1841
1842     $sth->execute foreach 1..$count;
1843   }
1844   catch {
1845     $err = shift;
1846   };
1847
1848   # Make sure statement is finished even if there was an exception.
1849   try {
1850     $sth->finish
1851   }
1852   catch {
1853     $err = shift unless defined $err;
1854   };
1855
1856   $self->throw_exception($err) if defined $err;
1857
1858   return $count;
1859 }
1860
1861 sub update {
1862   my ($self, $source, @args) = @_;
1863
1864   my $bind_attrs = $self->source_bind_attributes($source);
1865
1866   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1867 }
1868
1869
1870 sub delete {
1871   my ($self, $source, @args) = @_;
1872
1873   my $bind_attrs = $self->source_bind_attributes($source);
1874
1875   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1876 }
1877
1878 # We were sent here because the $rs contains a complex search
1879 # which will require a subquery to select the correct rows
1880 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1881 #
1882 # Generating a single PK column subquery is trivial and supported
1883 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1884 # Look at _multipk_update_delete()
1885 sub _subq_update_delete {
1886   my $self = shift;
1887   my ($rs, $op, $values) = @_;
1888
1889   my $rsrc = $rs->result_source;
1890
1891   # quick check if we got a sane rs on our hands
1892   my @pcols = $rsrc->_pri_cols;
1893
1894   my $sel = $rs->_resolved_attrs->{select};
1895   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1896
1897   if (
1898       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1899         ne
1900       join ("\x00", sort @$sel )
1901   ) {
1902     $self->throw_exception (
1903       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1904     );
1905   }
1906
1907   if (@pcols == 1) {
1908     return $self->$op (
1909       $rsrc,
1910       $op eq 'update' ? $values : (),
1911       { $pcols[0] => { -in => $rs->as_query } },
1912     );
1913   }
1914
1915   else {
1916     return $self->_multipk_update_delete (@_);
1917   }
1918 }
1919
1920 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1921 # resultset update/delete involving subqueries. So by default resort
1922 # to simple (and inefficient) delete_all style per-row opearations,
1923 # while allowing specific storages to override this with a faster
1924 # implementation.
1925 #
1926 sub _multipk_update_delete {
1927   return shift->_per_row_update_delete (@_);
1928 }
1929
1930 # This is the default loop used to delete/update rows for multi PK
1931 # resultsets, and used by mysql exclusively (because it can't do anything
1932 # else).
1933 #
1934 # We do not use $row->$op style queries, because resultset update/delete
1935 # is not expected to cascade (this is what delete_all/update_all is for).
1936 #
1937 # There should be no race conditions as the entire operation is rolled
1938 # in a transaction.
1939 #
1940 sub _per_row_update_delete {
1941   my $self = shift;
1942   my ($rs, $op, $values) = @_;
1943
1944   my $rsrc = $rs->result_source;
1945   my @pcols = $rsrc->_pri_cols;
1946
1947   my $guard = $self->txn_scope_guard;
1948
1949   # emulate the return value of $sth->execute for non-selects
1950   my $row_cnt = '0E0';
1951
1952   my $subrs_cur = $rs->cursor;
1953   my @all_pk = $subrs_cur->all;
1954   for my $pks ( @all_pk) {
1955
1956     my $cond;
1957     for my $i (0.. $#pcols) {
1958       $cond->{$pcols[$i]} = $pks->[$i];
1959     }
1960
1961     $self->$op (
1962       $rsrc,
1963       $op eq 'update' ? $values : (),
1964       $cond,
1965     );
1966
1967     $row_cnt++;
1968   }
1969
1970   $guard->commit;
1971
1972   return $row_cnt;
1973 }
1974
1975 sub _select {
1976   my $self = shift;
1977   $self->_execute($self->_select_args(@_));
1978 }
1979
1980 sub _select_args_to_query {
1981   my $self = shift;
1982
1983   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1984   #  = $self->_select_args($ident, $select, $cond, $attrs);
1985   my ($op, $bind, $ident, $bind_attrs, @args) =
1986     $self->_select_args(@_);
1987
1988   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1989   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1990   $prepared_bind ||= [];
1991
1992   return wantarray
1993     ? ($sql, $prepared_bind, $bind_attrs)
1994     : \[ "($sql)", @$prepared_bind ]
1995   ;
1996 }
1997
1998 sub _select_args {
1999   my ($self, $ident, $select, $where, $attrs) = @_;
2000
2001   my $sql_maker = $self->sql_maker;
2002   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2003
2004   $attrs = {
2005     %$attrs,
2006     select => $select,
2007     from => $ident,
2008     where => $where,
2009     $rs_alias && $alias2source->{$rs_alias}
2010       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
2011       : ()
2012     ,
2013   };
2014
2015   # calculate bind_attrs before possible $ident mangling
2016   my $bind_attrs = {};
2017   for my $alias (keys %$alias2source) {
2018     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2019     for my $col (keys %$bindtypes) {
2020
2021       my $fqcn = join ('.', $alias, $col);
2022       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2023
2024       # Unqialified column names are nice, but at the same time can be
2025       # rather ambiguous. What we do here is basically go along with
2026       # the loop, adding an unqualified column slot to $bind_attrs,
2027       # alongside the fully qualified name. As soon as we encounter
2028       # another column by that name (which would imply another table)
2029       # we unset the unqualified slot and never add any info to it
2030       # to avoid erroneous type binding. If this happens the users
2031       # only choice will be to fully qualify his column name
2032
2033       if (exists $bind_attrs->{$col}) {
2034         $bind_attrs->{$col} = {};
2035       }
2036       else {
2037         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2038       }
2039     }
2040   }
2041
2042   # Sanity check the attributes (SQLMaker does it too, but
2043   # in case of a software_limit we'll never reach there)
2044   if (defined $attrs->{offset}) {
2045     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2046       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2047   }
2048   $attrs->{offset} ||= 0;
2049
2050   if (defined $attrs->{rows}) {
2051     $self->throw_exception("The rows attribute must be a positive integer if present")
2052       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2053   }
2054   elsif ($attrs->{offset}) {
2055     # MySQL actually recommends this approach.  I cringe.
2056     $attrs->{rows} = $sql_maker->__max_int;
2057   }
2058
2059   my @limit;
2060
2061   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2062   # storage, unless software limit was requested
2063   if (
2064     #limited has_many
2065     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2066        ||
2067     # grouped prefetch (to satisfy group_by == select)
2068     ( $attrs->{group_by}
2069         &&
2070       @{$attrs->{group_by}}
2071         &&
2072       $attrs->{_prefetch_select}
2073         &&
2074       @{$attrs->{_prefetch_select}}
2075     )
2076   ) {
2077     ($ident, $select, $where, $attrs)
2078       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2079   }
2080   elsif (! $attrs->{software_limit} ) {
2081     push @limit, $attrs->{rows}, $attrs->{offset};
2082   }
2083
2084   # try to simplify the joinmap further (prune unreferenced type-single joins)
2085   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2086
2087 ###
2088   # This would be the point to deflate anything found in $where
2089   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2090   # expect a row object. And all we have is a resultsource (it is trivial
2091   # to extract deflator coderefs via $alias2source above).
2092   #
2093   # I don't see a way forward other than changing the way deflators are
2094   # invoked, and that's just bad...
2095 ###
2096
2097   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2098 }
2099
2100 # Returns a counting SELECT for a simple count
2101 # query. Abstracted so that a storage could override
2102 # this to { count => 'firstcol' } or whatever makes
2103 # sense as a performance optimization
2104 sub _count_select {
2105   #my ($self, $source, $rs_attrs) = @_;
2106   return { count => '*' };
2107 }
2108
2109
2110 sub source_bind_attributes {
2111   my ($self, $source) = @_;
2112
2113   my $bind_attributes;
2114
2115   my $colinfo = $source->columns_info;
2116
2117   for my $col (keys %$colinfo) {
2118     if (my $dt = $colinfo->{$col}{data_type} ) {
2119       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2120     }
2121   }
2122
2123   return $bind_attributes;
2124 }
2125
2126 =head2 select
2127
2128 =over 4
2129
2130 =item Arguments: $ident, $select, $condition, $attrs
2131
2132 =back
2133
2134 Handle a SQL select statement.
2135
2136 =cut
2137
2138 sub select {
2139   my $self = shift;
2140   my ($ident, $select, $condition, $attrs) = @_;
2141   return $self->cursor_class->new($self, \@_, $attrs);
2142 }
2143
2144 sub select_single {
2145   my $self = shift;
2146   my ($rv, $sth, @bind) = $self->_select(@_);
2147   my @row = $sth->fetchrow_array;
2148   my @nextrow = $sth->fetchrow_array if @row;
2149   if(@row && @nextrow) {
2150     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2151   }
2152   # Need to call finish() to work round broken DBDs
2153   $sth->finish();
2154   return @row;
2155 }
2156
2157 =head2 sql_limit_dialect
2158
2159 This is an accessor for the default SQL limit dialect used by a particular
2160 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2161 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2162 see L<DBIx::Class::SQLMaker::LimitDialects>.
2163
2164 =head2 sth
2165
2166 =over 4
2167
2168 =item Arguments: $sql
2169
2170 =back
2171
2172 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2173
2174 =cut
2175
2176 sub _dbh_sth {
2177   my ($self, $dbh, $sql) = @_;
2178
2179   # 3 is the if_active parameter which avoids active sth re-use
2180   my $sth = $self->disable_sth_caching
2181     ? $dbh->prepare($sql)
2182     : $dbh->prepare_cached($sql, {}, 3);
2183
2184   # XXX You would think RaiseError would make this impossible,
2185   #  but apparently that's not true :(
2186   $self->throw_exception($dbh->errstr) if !$sth;
2187
2188   $sth;
2189 }
2190
2191 sub sth {
2192   my ($self, $sql) = @_;
2193   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2194 }
2195
2196 sub _dbh_columns_info_for {
2197   my ($self, $dbh, $table) = @_;
2198
2199   if ($dbh->can('column_info')) {
2200     my %result;
2201     my $caught;
2202     try {
2203       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2204       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2205       $sth->execute();
2206       while ( my $info = $sth->fetchrow_hashref() ){
2207         my %column_info;
2208         $column_info{data_type}   = $info->{TYPE_NAME};
2209         $column_info{size}      = $info->{COLUMN_SIZE};
2210         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2211         $column_info{default_value} = $info->{COLUMN_DEF};
2212         my $col_name = $info->{COLUMN_NAME};
2213         $col_name =~ s/^\"(.*)\"$/$1/;
2214
2215         $result{$col_name} = \%column_info;
2216       }
2217     } catch {
2218       $caught = 1;
2219     };
2220     return \%result if !$caught && scalar keys %result;
2221   }
2222
2223   my %result;
2224   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2225   $sth->execute;
2226   my @columns = @{$sth->{NAME_lc}};
2227   for my $i ( 0 .. $#columns ){
2228     my %column_info;
2229     $column_info{data_type} = $sth->{TYPE}->[$i];
2230     $column_info{size} = $sth->{PRECISION}->[$i];
2231     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2232
2233     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2234       $column_info{data_type} = $1;
2235       $column_info{size}    = $2;
2236     }
2237
2238     $result{$columns[$i]} = \%column_info;
2239   }
2240   $sth->finish;
2241
2242   foreach my $col (keys %result) {
2243     my $colinfo = $result{$col};
2244     my $type_num = $colinfo->{data_type};
2245     my $type_name;
2246     if(defined $type_num && $dbh->can('type_info')) {
2247       my $type_info = $dbh->type_info($type_num);
2248       $type_name = $type_info->{TYPE_NAME} if $type_info;
2249       $colinfo->{data_type} = $type_name if $type_name;
2250     }
2251   }
2252
2253   return \%result;
2254 }
2255
2256 sub columns_info_for {
2257   my ($self, $table) = @_;
2258   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2259 }
2260
2261 =head2 last_insert_id
2262
2263 Return the row id of the last insert.
2264
2265 =cut
2266
2267 sub _dbh_last_insert_id {
2268     my ($self, $dbh, $source, $col) = @_;
2269
2270     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2271
2272     return $id if defined $id;
2273
2274     my $class = ref $self;
2275     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2276 }
2277
2278 sub last_insert_id {
2279   my $self = shift;
2280   $self->_dbh_last_insert_id ($self->_dbh, @_);
2281 }
2282
2283 =head2 _native_data_type
2284
2285 =over 4
2286
2287 =item Arguments: $type_name
2288
2289 =back
2290
2291 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2292 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2293 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2294
2295 The default implementation returns C<undef>, implement in your Storage driver if
2296 you need this functionality.
2297
2298 Should map types from other databases to the native RDBMS type, for example
2299 C<VARCHAR2> to C<VARCHAR>.
2300
2301 Types with modifiers should map to the underlying data type. For example,
2302 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2303
2304 Composite types should map to the container type, for example
2305 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2306
2307 =cut
2308
2309 sub _native_data_type {
2310   #my ($self, $data_type) = @_;
2311   return undef
2312 }
2313
2314 # Check if placeholders are supported at all
2315 sub _determine_supports_placeholders {
2316   my $self = shift;
2317   my $dbh  = $self->_get_dbh;
2318
2319   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2320   # but it is inaccurate more often than not
2321   return try {
2322     local $dbh->{PrintError} = 0;
2323     local $dbh->{RaiseError} = 1;
2324     $dbh->do('select ?', {}, 1);
2325     1;
2326   }
2327   catch {
2328     0;
2329   };
2330 }
2331
2332 # Check if placeholders bound to non-string types throw exceptions
2333 #
2334 sub _determine_supports_typeless_placeholders {
2335   my $self = shift;
2336   my $dbh  = $self->_get_dbh;
2337
2338   return try {
2339     local $dbh->{PrintError} = 0;
2340     local $dbh->{RaiseError} = 1;
2341     # this specifically tests a bind that is NOT a string
2342     $dbh->do('select 1 where 1 = ?', {}, 1);
2343     1;
2344   }
2345   catch {
2346     0;
2347   };
2348 }
2349
2350 =head2 sqlt_type
2351
2352 Returns the database driver name.
2353
2354 =cut
2355
2356 sub sqlt_type {
2357   shift->_get_dbh->{Driver}->{Name};
2358 }
2359
2360 =head2 bind_attribute_by_data_type
2361
2362 Given a datatype from column info, returns a database specific bind
2363 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2364 let the database planner just handle it.
2365
2366 Generally only needed for special case column types, like bytea in postgres.
2367
2368 =cut
2369
2370 sub bind_attribute_by_data_type {
2371     return;
2372 }
2373
2374 =head2 is_datatype_numeric
2375
2376 Given a datatype from column_info, returns a boolean value indicating if
2377 the current RDBMS considers it a numeric value. This controls how
2378 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2379 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2380 be performed instead of the usual C<eq>.
2381
2382 =cut
2383
2384 sub is_datatype_numeric {
2385   my ($self, $dt) = @_;
2386
2387   return 0 unless $dt;
2388
2389   return $dt =~ /^ (?:
2390     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2391   ) $/ix;
2392 }
2393
2394
2395 =head2 create_ddl_dir
2396
2397 =over 4
2398
2399 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2400
2401 =back
2402
2403 Creates a SQL file based on the Schema, for each of the specified
2404 database engines in C<\@databases> in the given directory.
2405 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2406
2407 Given a previous version number, this will also create a file containing
2408 the ALTER TABLE statements to transform the previous schema into the
2409 current one. Note that these statements may contain C<DROP TABLE> or
2410 C<DROP COLUMN> statements that can potentially destroy data.
2411
2412 The file names are created using the C<ddl_filename> method below, please
2413 override this method in your schema if you would like a different file
2414 name format. For the ALTER file, the same format is used, replacing
2415 $version in the name with "$preversion-$version".
2416
2417 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2418 The most common value for this would be C<< { add_drop_table => 1 } >>
2419 to have the SQL produced include a C<DROP TABLE> statement for each table
2420 created. For quoting purposes supply C<quote_table_names> and
2421 C<quote_field_names>.
2422
2423 If no arguments are passed, then the following default values are assumed:
2424
2425 =over 4
2426
2427 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2428
2429 =item version    - $schema->schema_version
2430
2431 =item directory  - './'
2432
2433 =item preversion - <none>
2434
2435 =back
2436
2437 By default, C<\%sqlt_args> will have
2438
2439  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2440
2441 merged with the hash passed in. To disable any of those features, pass in a
2442 hashref like the following
2443
2444  { ignore_constraint_names => 0, # ... other options }
2445
2446
2447 WARNING: You are strongly advised to check all SQL files created, before applying
2448 them.
2449
2450 =cut
2451
2452 sub create_ddl_dir {
2453   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2454
2455   unless ($dir) {
2456     carp "No directory given, using ./\n";
2457     $dir = './';
2458   } else {
2459       -d $dir
2460         or
2461       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2462         or
2463       $self->throw_exception(
2464         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2465       );
2466   }
2467
2468   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2469
2470   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2471   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2472
2473   my $schema_version = $schema->schema_version || '1.x';
2474   $version ||= $schema_version;
2475
2476   $sqltargs = {
2477     add_drop_table => 1,
2478     ignore_constraint_names => 1,
2479     ignore_index_names => 1,
2480     %{$sqltargs || {}}
2481   };
2482
2483   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2484     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2485   }
2486
2487   my $sqlt = SQL::Translator->new( $sqltargs );
2488
2489   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2490   my $sqlt_schema = $sqlt->translate({ data => $schema })
2491     or $self->throw_exception ($sqlt->error);
2492
2493   foreach my $db (@$databases) {
2494     $sqlt->reset();
2495     $sqlt->{schema} = $sqlt_schema;
2496     $sqlt->producer($db);
2497
2498     my $file;
2499     my $filename = $schema->ddl_filename($db, $version, $dir);
2500     if (-e $filename && ($version eq $schema_version )) {
2501       # if we are dumping the current version, overwrite the DDL
2502       carp "Overwriting existing DDL file - $filename";
2503       unlink($filename);
2504     }
2505
2506     my $output = $sqlt->translate;
2507     if(!$output) {
2508       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2509       next;
2510     }
2511     if(!open($file, ">$filename")) {
2512       $self->throw_exception("Can't open $filename for writing ($!)");
2513       next;
2514     }
2515     print $file $output;
2516     close($file);
2517
2518     next unless ($preversion);
2519
2520     require SQL::Translator::Diff;
2521
2522     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2523     if(!-e $prefilename) {
2524       carp("No previous schema file found ($prefilename)");
2525       next;
2526     }
2527
2528     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2529     if(-e $difffile) {
2530       carp("Overwriting existing diff file - $difffile");
2531       unlink($difffile);
2532     }
2533
2534     my $source_schema;
2535     {
2536       my $t = SQL::Translator->new($sqltargs);
2537       $t->debug( 0 );
2538       $t->trace( 0 );
2539
2540       $t->parser( $db )
2541         or $self->throw_exception ($t->error);
2542
2543       my $out = $t->translate( $prefilename )
2544         or $self->throw_exception ($t->error);
2545
2546       $source_schema = $t->schema;
2547
2548       $source_schema->name( $prefilename )
2549         unless ( $source_schema->name );
2550     }
2551
2552     # The "new" style of producers have sane normalization and can support
2553     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2554     # And we have to diff parsed SQL against parsed SQL.
2555     my $dest_schema = $sqlt_schema;
2556
2557     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2558       my $t = SQL::Translator->new($sqltargs);
2559       $t->debug( 0 );
2560       $t->trace( 0 );
2561
2562       $t->parser( $db )
2563         or $self->throw_exception ($t->error);
2564
2565       my $out = $t->translate( $filename )
2566         or $self->throw_exception ($t->error);
2567
2568       $dest_schema = $t->schema;
2569
2570       $dest_schema->name( $filename )
2571         unless $dest_schema->name;
2572     }
2573
2574     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2575                                                   $dest_schema,   $db,
2576                                                   $sqltargs
2577                                                  );
2578     if(!open $file, ">$difffile") {
2579       $self->throw_exception("Can't write to $difffile ($!)");
2580       next;
2581     }
2582     print $file $diff;
2583     close($file);
2584   }
2585 }
2586
2587 =head2 deployment_statements
2588
2589 =over 4
2590
2591 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2592
2593 =back
2594
2595 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2596
2597 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2598 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2599
2600 C<$directory> is used to return statements from files in a previously created
2601 L</create_ddl_dir> directory and is optional. The filenames are constructed
2602 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2603
2604 If no C<$directory> is specified then the statements are constructed on the
2605 fly using L<SQL::Translator> and C<$version> is ignored.
2606
2607 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2608
2609 =cut
2610
2611 sub deployment_statements {
2612   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2613   $type ||= $self->sqlt_type;
2614   $version ||= $schema->schema_version || '1.x';
2615   $dir ||= './';
2616   my $filename = $schema->ddl_filename($type, $version, $dir);
2617   if(-f $filename)
2618   {
2619       my $file;
2620       open($file, "<$filename")
2621         or $self->throw_exception("Can't open $filename ($!)");
2622       my @rows = <$file>;
2623       close($file);
2624       return join('', @rows);
2625   }
2626
2627   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2628     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2629   }
2630
2631   # sources needs to be a parser arg, but for simplicty allow at top level
2632   # coming in
2633   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2634       if exists $sqltargs->{sources};
2635
2636   my $tr = SQL::Translator->new(
2637     producer => "SQL::Translator::Producer::${type}",
2638     %$sqltargs,
2639     parser => 'SQL::Translator::Parser::DBIx::Class',
2640     data => $schema,
2641   );
2642
2643   my @ret;
2644   my $wa = wantarray;
2645   if ($wa) {
2646     @ret = $tr->translate;
2647   }
2648   else {
2649     $ret[0] = $tr->translate;
2650   }
2651
2652   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2653     unless (@ret && defined $ret[0]);
2654
2655   return $wa ? @ret : $ret[0];
2656 }
2657
2658 sub deploy {
2659   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2660   my $deploy = sub {
2661     my $line = shift;
2662     return if($line =~ /^--/);
2663     return if(!$line);
2664     # next if($line =~ /^DROP/m);
2665     return if($line =~ /^BEGIN TRANSACTION/m);
2666     return if($line =~ /^COMMIT/m);
2667     return if $line =~ /^\s+$/; # skip whitespace only
2668     $self->_query_start($line);
2669     try {
2670       # do a dbh_do cycle here, as we need some error checking in
2671       # place (even though we will ignore errors)
2672       $self->dbh_do (sub { $_[1]->do($line) });
2673     } catch {
2674       carp qq{$_ (running "${line}")};
2675     };
2676     $self->_query_end($line);
2677   };
2678   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2679   if (@statements > 1) {
2680     foreach my $statement (@statements) {
2681       $deploy->( $statement );
2682     }
2683   }
2684   elsif (@statements == 1) {
2685     foreach my $line ( split(";\n", $statements[0])) {
2686       $deploy->( $line );
2687     }
2688   }
2689 }
2690
2691 =head2 datetime_parser
2692
2693 Returns the datetime parser class
2694
2695 =cut
2696
2697 sub datetime_parser {
2698   my $self = shift;
2699   return $self->{datetime_parser} ||= do {
2700     $self->build_datetime_parser(@_);
2701   };
2702 }
2703
2704 =head2 datetime_parser_type
2705
2706 Defines (returns) the datetime parser class - currently hardwired to
2707 L<DateTime::Format::MySQL>
2708
2709 =cut
2710
2711 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2712
2713 =head2 build_datetime_parser
2714
2715 See L</datetime_parser>
2716
2717 =cut
2718
2719 sub build_datetime_parser {
2720   my $self = shift;
2721   my $type = $self->datetime_parser_type(@_);
2722   $self->ensure_class_loaded ($type);
2723   return $type;
2724 }
2725
2726
2727 =head2 is_replicating
2728
2729 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2730 replicate from a master database.  Default is undef, which is the result
2731 returned by databases that don't support replication.
2732
2733 =cut
2734
2735 sub is_replicating {
2736     return;
2737
2738 }
2739
2740 =head2 lag_behind_master
2741
2742 Returns a number that represents a certain amount of lag behind a master db
2743 when a given storage is replicating.  The number is database dependent, but
2744 starts at zero and increases with the amount of lag. Default in undef
2745
2746 =cut
2747
2748 sub lag_behind_master {
2749     return;
2750 }
2751
2752 =head2 relname_to_table_alias
2753
2754 =over 4
2755
2756 =item Arguments: $relname, $join_count
2757
2758 =back
2759
2760 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2761 queries.
2762
2763 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2764 way these aliases are named.
2765
2766 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2767 otherwise C<"$relname">.
2768
2769 =cut
2770
2771 sub relname_to_table_alias {
2772   my ($self, $relname, $join_count) = @_;
2773
2774   my $alias = ($join_count && $join_count > 1 ?
2775     join('_', $relname, $join_count) : $relname);
2776
2777   return $alias;
2778 }
2779
2780 1;
2781
2782 =head1 USAGE NOTES
2783
2784 =head2 DBIx::Class and AutoCommit
2785
2786 DBIx::Class can do some wonderful magic with handling exceptions,
2787 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2788 (the default) combined with C<txn_do> for transaction support.
2789
2790 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2791 in an assumed transaction between commits, and you're telling us you'd
2792 like to manage that manually.  A lot of the magic protections offered by
2793 this module will go away.  We can't protect you from exceptions due to database
2794 disconnects because we don't know anything about how to restart your
2795 transactions.  You're on your own for handling all sorts of exceptional
2796 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2797 be with raw DBI.
2798
2799
2800 =head1 AUTHORS
2801
2802 Matt S. Trout <mst@shadowcatsystems.co.uk>
2803
2804 Andy Grundman <andy@hybridized.org>
2805
2806 =head1 LICENSE
2807
2808 You may distribute this code under the same terms as Perl itself.
2809
2810 =cut