c7f0ec8870243f107a9c7a4461f7f0b2a1880b1f
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use Data::Dumper::Concise 'Dumper';
15 use Sub::Name 'subname';
16 use Try::Tiny;
17 use File::Path 'make_path';
18 use namespace::clean;
19
20
21 # default cursor class, overridable in connect_info attributes
22 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
23
24 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
25 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
26
27 __PACKAGE__->mk_group_accessors('simple' => qw/
28   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
29   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
30   transaction_depth _dbh_autocommit  savepoints
31 /);
32
33 # the values for these accessors are picked out (and deleted) from
34 # the attribute hashref passed to connect_info
35 my @storage_options = qw/
36   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
37   disable_sth_caching unsafe auto_savepoint
38 /;
39 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
40
41
42 # capability definitions, using a 2-tiered accessor system
43 # The rationale is:
44 #
45 # A driver/user may define _use_X, which blindly without any checks says:
46 # "(do not) use this capability", (use_dbms_capability is an "inherited"
47 # type accessor)
48 #
49 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
50 # accessor, which in turn calls _determine_supports_X, and stores the return
51 # in a special slot on the storage object, which is wiped every time a $dbh
52 # reconnection takes place (it is not guaranteed that upon reconnection we
53 # will get the same rdbms version). _determine_supports_X does not need to
54 # exist on a driver, as we ->can for it before calling.
55
56 my @capabilities = (qw/insert_returning placeholders typeless_placeholders join_optimizer/);
57 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
58 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
59
60 # on by default, not strictly a capability (pending rewrite)
61 __PACKAGE__->_use_join_optimizer (1);
62 sub _determine_supports_join_optimizer { 1 };
63
64 # Each of these methods need _determine_driver called before itself
65 # in order to function reliably. This is a purely DRY optimization
66 #
67 # get_(use)_dbms_capability need to be called on the correct Storage
68 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
69 # _determine_supports_X which obv. needs a correct driver as well
70 my @rdbms_specific_methods = qw/
71   deployment_statements
72   sqlt_type
73   sql_maker
74   build_datetime_parser
75   datetime_parser_type
76
77   insert
78   insert_bulk
79   update
80   delete
81   select
82   select_single
83
84   get_use_dbms_capability
85   get_dbms_capability
86
87   _server_info
88   _get_server_version
89 /;
90
91 for my $meth (@rdbms_specific_methods) {
92
93   my $orig = __PACKAGE__->can ($meth)
94     or die "$meth is not a ::Storage::DBI method!";
95
96   no strict qw/refs/;
97   no warnings qw/redefine/;
98   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
99     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
100       $_[0]->_determine_driver;
101
102       # This for some reason crashes and burns on perl 5.8.1
103       # IFF the method ends up throwing an exception
104       #goto $_[0]->can ($meth);
105
106       my $cref = $_[0]->can ($meth);
107       goto $cref;
108     }
109     goto $orig;
110   };
111 }
112
113
114 =head1 NAME
115
116 DBIx::Class::Storage::DBI - DBI storage handler
117
118 =head1 SYNOPSIS
119
120   my $schema = MySchema->connect('dbi:SQLite:my.db');
121
122   $schema->storage->debug(1);
123
124   my @stuff = $schema->storage->dbh_do(
125     sub {
126       my ($storage, $dbh, @args) = @_;
127       $dbh->do("DROP TABLE authors");
128     },
129     @column_list
130   );
131
132   $schema->resultset('Book')->search({
133      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
134   });
135
136 =head1 DESCRIPTION
137
138 This class represents the connection to an RDBMS via L<DBI>.  See
139 L<DBIx::Class::Storage> for general information.  This pod only
140 documents DBI-specific methods and behaviors.
141
142 =head1 METHODS
143
144 =cut
145
146 sub new {
147   my $new = shift->next::method(@_);
148
149   $new->transaction_depth(0);
150   $new->_sql_maker_opts({});
151   $new->_dbh_details({});
152   $new->{savepoints} = [];
153   $new->{_in_dbh_do} = 0;
154   $new->{_dbh_gen} = 0;
155
156   # read below to see what this does
157   $new->_arm_global_destructor;
158
159   $new;
160 }
161
162 # This is hack to work around perl shooting stuff in random
163 # order on exit(). If we do not walk the remaining storage
164 # objects in an END block, there is a *small but real* chance
165 # of a fork()ed child to kill the parent's shared DBI handle,
166 # *before perl reaches the DESTROY in this package*
167 # Yes, it is ugly and effective.
168 {
169   my %seek_and_destroy;
170
171   sub _arm_global_destructor {
172     my $self = shift;
173     my $key = refaddr ($self);
174     $seek_and_destroy{$key} = $self;
175     weaken ($seek_and_destroy{$key});
176   }
177
178   END {
179     local $?; # just in case the DBI destructor changes it somehow
180
181     # destroy just the object if not native to this process/thread
182     $_->_preserve_foreign_dbh for (grep
183       { defined $_ }
184       values %seek_and_destroy
185     );
186   }
187 }
188
189 sub DESTROY {
190   my $self = shift;
191
192   # some databases spew warnings on implicit disconnect
193   local $SIG{__WARN__} = sub {};
194   $self->_dbh(undef);
195 }
196
197 sub _preserve_foreign_dbh {
198   my $self = shift;
199
200   return unless $self->_dbh;
201
202   $self->_verify_tid;
203
204   return unless $self->_dbh;
205
206   $self->_verify_pid;
207
208 }
209
210 # handle pid changes correctly - do not destroy parent's connection
211 sub _verify_pid {
212   my $self = shift;
213
214   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
215
216   $self->_dbh->{InactiveDestroy} = 1;
217   $self->_dbh(undef);
218   $self->{_dbh_gen}++;
219
220   return;
221 }
222
223 # very similar to above, but seems to FAIL if I set InactiveDestroy
224 sub _verify_tid {
225   my $self = shift;
226
227   if ( ! defined $self->_conn_tid ) {
228     return; # no threads
229   }
230   elsif ( $self->_conn_tid == threads->tid ) {
231     return; # same thread
232   }
233
234   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
235   $self->_dbh(undef);
236   $self->{_dbh_gen}++;
237
238   return;
239 }
240
241
242 =head2 connect_info
243
244 This method is normally called by L<DBIx::Class::Schema/connection>, which
245 encapsulates its argument list in an arrayref before passing them here.
246
247 The argument list may contain:
248
249 =over
250
251 =item *
252
253 The same 4-element argument set one would normally pass to
254 L<DBI/connect>, optionally followed by
255 L<extra attributes|/DBIx::Class specific connection attributes>
256 recognized by DBIx::Class:
257
258   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
259
260 =item *
261
262 A single code reference which returns a connected
263 L<DBI database handle|DBI/connect> optionally followed by
264 L<extra attributes|/DBIx::Class specific connection attributes> recognized
265 by DBIx::Class:
266
267   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
268
269 =item *
270
271 A single hashref with all the attributes and the dsn/user/password
272 mixed together:
273
274   $connect_info_args = [{
275     dsn => $dsn,
276     user => $user,
277     password => $pass,
278     %dbi_attributes,
279     %extra_attributes,
280   }];
281
282   $connect_info_args = [{
283     dbh_maker => sub { DBI->connect (...) },
284     %dbi_attributes,
285     %extra_attributes,
286   }];
287
288 This is particularly useful for L<Catalyst> based applications, allowing the
289 following config (L<Config::General> style):
290
291   <Model::DB>
292     schema_class   App::DB
293     <connect_info>
294       dsn          dbi:mysql:database=test
295       user         testuser
296       password     TestPass
297       AutoCommit   1
298     </connect_info>
299   </Model::DB>
300
301 The C<dsn>/C<user>/C<password> combination can be substituted by the
302 C<dbh_maker> key whose value is a coderef that returns a connected
303 L<DBI database handle|DBI/connect>
304
305 =back
306
307 Please note that the L<DBI> docs recommend that you always explicitly
308 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
309 recommends that it be set to I<1>, and that you perform transactions
310 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
311 to I<1> if you do not do explicitly set it to zero.  This is the default
312 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
313
314 =head3 DBIx::Class specific connection attributes
315
316 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
317 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
318 the following connection options. These options can be mixed in with your other
319 L<DBI> connection attributes, or placed in a separate hashref
320 (C<\%extra_attributes>) as shown above.
321
322 Every time C<connect_info> is invoked, any previous settings for
323 these options will be cleared before setting the new ones, regardless of
324 whether any options are specified in the new C<connect_info>.
325
326
327 =over
328
329 =item on_connect_do
330
331 Specifies things to do immediately after connecting or re-connecting to
332 the database.  Its value may contain:
333
334 =over
335
336 =item a scalar
337
338 This contains one SQL statement to execute.
339
340 =item an array reference
341
342 This contains SQL statements to execute in order.  Each element contains
343 a string or a code reference that returns a string.
344
345 =item a code reference
346
347 This contains some code to execute.  Unlike code references within an
348 array reference, its return value is ignored.
349
350 =back
351
352 =item on_disconnect_do
353
354 Takes arguments in the same form as L</on_connect_do> and executes them
355 immediately before disconnecting from the database.
356
357 Note, this only runs if you explicitly call L</disconnect> on the
358 storage object.
359
360 =item on_connect_call
361
362 A more generalized form of L</on_connect_do> that calls the specified
363 C<connect_call_METHOD> methods in your storage driver.
364
365   on_connect_do => 'select 1'
366
367 is equivalent to:
368
369   on_connect_call => [ [ do_sql => 'select 1' ] ]
370
371 Its values may contain:
372
373 =over
374
375 =item a scalar
376
377 Will call the C<connect_call_METHOD> method.
378
379 =item a code reference
380
381 Will execute C<< $code->($storage) >>
382
383 =item an array reference
384
385 Each value can be a method name or code reference.
386
387 =item an array of arrays
388
389 For each array, the first item is taken to be the C<connect_call_> method name
390 or code reference, and the rest are parameters to it.
391
392 =back
393
394 Some predefined storage methods you may use:
395
396 =over
397
398 =item do_sql
399
400 Executes a SQL string or a code reference that returns a SQL string. This is
401 what L</on_connect_do> and L</on_disconnect_do> use.
402
403 It can take:
404
405 =over
406
407 =item a scalar
408
409 Will execute the scalar as SQL.
410
411 =item an arrayref
412
413 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
414 attributes hashref and bind values.
415
416 =item a code reference
417
418 Will execute C<< $code->($storage) >> and execute the return array refs as
419 above.
420
421 =back
422
423 =item datetime_setup
424
425 Execute any statements necessary to initialize the database session to return
426 and accept datetime/timestamp values used with
427 L<DBIx::Class::InflateColumn::DateTime>.
428
429 Only necessary for some databases, see your specific storage driver for
430 implementation details.
431
432 =back
433
434 =item on_disconnect_call
435
436 Takes arguments in the same form as L</on_connect_call> and executes them
437 immediately before disconnecting from the database.
438
439 Calls the C<disconnect_call_METHOD> methods as opposed to the
440 C<connect_call_METHOD> methods called by L</on_connect_call>.
441
442 Note, this only runs if you explicitly call L</disconnect> on the
443 storage object.
444
445 =item disable_sth_caching
446
447 If set to a true value, this option will disable the caching of
448 statement handles via L<DBI/prepare_cached>.
449
450 =item limit_dialect
451
452 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
453 default L</sql_limit_dialect> setting of the storage (if any). For a list
454 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
455
456 =item quote_char
457
458 Specifies what characters to use to quote table and column names.
459
460 C<quote_char> expects either a single character, in which case is it
461 is placed on either side of the table/column name, or an arrayref of length
462 2 in which case the table/column name is placed between the elements.
463
464 For example under MySQL you should use C<< quote_char => '`' >>, and for
465 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
466
467 =item name_sep
468
469 This parameter is only useful in conjunction with C<quote_char>, and is used to
470 specify the character that separates elements (schemas, tables, columns) from
471 each other. If unspecified it defaults to the most commonly used C<.>.
472
473 =item unsafe
474
475 This Storage driver normally installs its own C<HandleError>, sets
476 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
477 all database handles, including those supplied by a coderef.  It does this
478 so that it can have consistent and useful error behavior.
479
480 If you set this option to a true value, Storage will not do its usual
481 modifications to the database handle's attributes, and instead relies on
482 the settings in your connect_info DBI options (or the values you set in
483 your connection coderef, in the case that you are connecting via coderef).
484
485 Note that your custom settings can cause Storage to malfunction,
486 especially if you set a C<HandleError> handler that suppresses exceptions
487 and/or disable C<RaiseError>.
488
489 =item auto_savepoint
490
491 If this option is true, L<DBIx::Class> will use savepoints when nesting
492 transactions, making it possible to recover from failure in the inner
493 transaction without having to abort all outer transactions.
494
495 =item cursor_class
496
497 Use this argument to supply a cursor class other than the default
498 L<DBIx::Class::Storage::DBI::Cursor>.
499
500 =back
501
502 Some real-life examples of arguments to L</connect_info> and
503 L<DBIx::Class::Schema/connect>
504
505   # Simple SQLite connection
506   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
507
508   # Connect via subref
509   ->connect_info([ sub { DBI->connect(...) } ]);
510
511   # Connect via subref in hashref
512   ->connect_info([{
513     dbh_maker => sub { DBI->connect(...) },
514     on_connect_do => 'alter session ...',
515   }]);
516
517   # A bit more complicated
518   ->connect_info(
519     [
520       'dbi:Pg:dbname=foo',
521       'postgres',
522       'my_pg_password',
523       { AutoCommit => 1 },
524       { quote_char => q{"} },
525     ]
526   );
527
528   # Equivalent to the previous example
529   ->connect_info(
530     [
531       'dbi:Pg:dbname=foo',
532       'postgres',
533       'my_pg_password',
534       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
535     ]
536   );
537
538   # Same, but with hashref as argument
539   # See parse_connect_info for explanation
540   ->connect_info(
541     [{
542       dsn         => 'dbi:Pg:dbname=foo',
543       user        => 'postgres',
544       password    => 'my_pg_password',
545       AutoCommit  => 1,
546       quote_char  => q{"},
547       name_sep    => q{.},
548     }]
549   );
550
551   # Subref + DBIx::Class-specific connection options
552   ->connect_info(
553     [
554       sub { DBI->connect(...) },
555       {
556           quote_char => q{`},
557           name_sep => q{@},
558           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
559           disable_sth_caching => 1,
560       },
561     ]
562   );
563
564
565
566 =cut
567
568 sub connect_info {
569   my ($self, $info) = @_;
570
571   return $self->_connect_info if !$info;
572
573   $self->_connect_info($info); # copy for _connect_info
574
575   $info = $self->_normalize_connect_info($info)
576     if ref $info eq 'ARRAY';
577
578   for my $storage_opt (keys %{ $info->{storage_options} }) {
579     my $value = $info->{storage_options}{$storage_opt};
580
581     $self->$storage_opt($value);
582   }
583
584   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
585   #  the new set of options
586   $self->_sql_maker(undef);
587   $self->_sql_maker_opts({});
588
589   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
590     my $value = $info->{sql_maker_options}{$sql_maker_opt};
591
592     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
593   }
594
595   my %attrs = (
596     %{ $self->_default_dbi_connect_attributes || {} },
597     %{ $info->{attributes} || {} },
598   );
599
600   my @args = @{ $info->{arguments} };
601
602   $self->_dbi_connect_info([@args,
603     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
604
605   # FIXME - dirty:
606   # save attributes them in a separate accessor so they are always
607   # introspectable, even in case of a CODE $dbhmaker
608   $self->_dbic_connect_attributes (\%attrs);
609
610   return $self->_connect_info;
611 }
612
613 sub _normalize_connect_info {
614   my ($self, $info_arg) = @_;
615   my %info;
616
617   my @args = @$info_arg;  # take a shallow copy for further mutilation
618
619   # combine/pre-parse arguments depending on invocation style
620
621   my %attrs;
622   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
623     %attrs = %{ $args[1] || {} };
624     @args = $args[0];
625   }
626   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
627     %attrs = %{$args[0]};
628     @args = ();
629     if (my $code = delete $attrs{dbh_maker}) {
630       @args = $code;
631
632       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
633       if (@ignored) {
634         carp sprintf (
635             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
636           . "to the result of 'dbh_maker'",
637
638           join (', ', map { "'$_'" } (@ignored) ),
639         );
640       }
641     }
642     else {
643       @args = delete @attrs{qw/dsn user password/};
644     }
645   }
646   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
647     %attrs = (
648       % { $args[3] || {} },
649       % { $args[4] || {} },
650     );
651     @args = @args[0,1,2];
652   }
653
654   $info{arguments} = \@args;
655
656   my @storage_opts = grep exists $attrs{$_},
657     @storage_options, 'cursor_class';
658
659   @{ $info{storage_options} }{@storage_opts} =
660     delete @attrs{@storage_opts} if @storage_opts;
661
662   my @sql_maker_opts = grep exists $attrs{$_},
663     qw/limit_dialect quote_char name_sep/;
664
665   @{ $info{sql_maker_options} }{@sql_maker_opts} =
666     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
667
668   $info{attributes} = \%attrs if %attrs;
669
670   return \%info;
671 }
672
673 sub _default_dbi_connect_attributes {
674   return {
675     AutoCommit => 1,
676     RaiseError => 1,
677     PrintError => 0,
678   };
679 }
680
681 =head2 on_connect_do
682
683 This method is deprecated in favour of setting via L</connect_info>.
684
685 =cut
686
687 =head2 on_disconnect_do
688
689 This method is deprecated in favour of setting via L</connect_info>.
690
691 =cut
692
693 sub _parse_connect_do {
694   my ($self, $type) = @_;
695
696   my $val = $self->$type;
697   return () if not defined $val;
698
699   my @res;
700
701   if (not ref($val)) {
702     push @res, [ 'do_sql', $val ];
703   } elsif (ref($val) eq 'CODE') {
704     push @res, $val;
705   } elsif (ref($val) eq 'ARRAY') {
706     push @res, map { [ 'do_sql', $_ ] } @$val;
707   } else {
708     $self->throw_exception("Invalid type for $type: ".ref($val));
709   }
710
711   return \@res;
712 }
713
714 =head2 dbh_do
715
716 Arguments: ($subref | $method_name), @extra_coderef_args?
717
718 Execute the given $subref or $method_name using the new exception-based
719 connection management.
720
721 The first two arguments will be the storage object that C<dbh_do> was called
722 on and a database handle to use.  Any additional arguments will be passed
723 verbatim to the called subref as arguments 2 and onwards.
724
725 Using this (instead of $self->_dbh or $self->dbh) ensures correct
726 exception handling and reconnection (or failover in future subclasses).
727
728 Your subref should have no side-effects outside of the database, as
729 there is the potential for your subref to be partially double-executed
730 if the database connection was stale/dysfunctional.
731
732 Example:
733
734   my @stuff = $schema->storage->dbh_do(
735     sub {
736       my ($storage, $dbh, @cols) = @_;
737       my $cols = join(q{, }, @cols);
738       $dbh->selectrow_array("SELECT $cols FROM foo");
739     },
740     @column_list
741   );
742
743 =cut
744
745 sub dbh_do {
746   my $self = shift;
747   my $code = shift;
748
749   my $dbh = $self->_get_dbh;
750
751   return $self->$code($dbh, @_)
752     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
753
754   local $self->{_in_dbh_do} = 1;
755
756   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
757   my $args = \@_;
758   return try {
759     $self->$code ($dbh, @$args);
760   } catch {
761     $self->throw_exception($_) if $self->connected;
762
763     # We were not connected - reconnect and retry, but let any
764     #  exception fall right through this time
765     carp "Retrying $code after catching disconnected exception: $_"
766       if $ENV{DBIC_DBIRETRY_DEBUG};
767
768     $self->_populate_dbh;
769     $self->$code($self->_dbh, @$args);
770   };
771 }
772
773 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
774 # It also informs dbh_do to bypass itself while under the direction of txn_do,
775 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
776 sub txn_do {
777   my $self = shift;
778   my $coderef = shift;
779
780   ref $coderef eq 'CODE' or $self->throw_exception
781     ('$coderef must be a CODE reference');
782
783   local $self->{_in_dbh_do} = 1;
784
785   my @result;
786   my $want_array = wantarray;
787
788   my $tried = 0;
789   while(1) {
790     my $exception;
791
792     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
793     my $args = \@_;
794
795     try {
796       $self->txn_begin;
797       my $txn_start_depth = $self->transaction_depth;
798       if($want_array) {
799           @result = $coderef->(@$args);
800       }
801       elsif(defined $want_array) {
802           $result[0] = $coderef->(@$args);
803       }
804       else {
805           $coderef->(@$args);
806       }
807
808       my $delta_txn = $txn_start_depth - $self->transaction_depth;
809       if ($delta_txn == 0) {
810         $self->txn_commit;
811       }
812       elsif ($delta_txn != 1) {
813         # an off-by-one would mean we fired a rollback
814         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
815       }
816     } catch {
817       $exception = $_;
818     };
819
820     if(! defined $exception) { return $want_array ? @result : $result[0] }
821
822     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
823       my $rollback_exception;
824       try { $self->txn_rollback } catch { $rollback_exception = shift };
825       if(defined $rollback_exception) {
826         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
827         $self->throw_exception($exception)  # propagate nested rollback
828           if $rollback_exception =~ /$exception_class/;
829
830         $self->throw_exception(
831           "Transaction aborted: ${exception}. "
832           . "Rollback failed: ${rollback_exception}"
833         );
834       }
835       $self->throw_exception($exception)
836     }
837
838     # We were not connected, and was first try - reconnect and retry
839     # via the while loop
840     carp "Retrying $coderef after catching disconnected exception: $exception"
841       if $ENV{DBIC_TXNRETRY_DEBUG};
842     $self->_populate_dbh;
843   }
844 }
845
846 =head2 disconnect
847
848 Our C<disconnect> method also performs a rollback first if the
849 database is not in C<AutoCommit> mode.
850
851 =cut
852
853 sub disconnect {
854   my ($self) = @_;
855
856   if( $self->_dbh ) {
857     my @actions;
858
859     push @actions, ( $self->on_disconnect_call || () );
860     push @actions, $self->_parse_connect_do ('on_disconnect_do');
861
862     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
863
864     $self->_dbh_rollback unless $self->_dbh_autocommit;
865
866     %{ $self->_dbh->{CachedKids} } = ();
867     $self->_dbh->disconnect;
868     $self->_dbh(undef);
869     $self->{_dbh_gen}++;
870   }
871 }
872
873 =head2 with_deferred_fk_checks
874
875 =over 4
876
877 =item Arguments: C<$coderef>
878
879 =item Return Value: The return value of $coderef
880
881 =back
882
883 Storage specific method to run the code ref with FK checks deferred or
884 in MySQL's case disabled entirely.
885
886 =cut
887
888 # Storage subclasses should override this
889 sub with_deferred_fk_checks {
890   my ($self, $sub) = @_;
891   $sub->();
892 }
893
894 =head2 connected
895
896 =over
897
898 =item Arguments: none
899
900 =item Return Value: 1|0
901
902 =back
903
904 Verifies that the current database handle is active and ready to execute
905 an SQL statement (e.g. the connection did not get stale, server is still
906 answering, etc.) This method is used internally by L</dbh>.
907
908 =cut
909
910 sub connected {
911   my $self = shift;
912   return 0 unless $self->_seems_connected;
913
914   #be on the safe side
915   local $self->_dbh->{RaiseError} = 1;
916
917   return $self->_ping;
918 }
919
920 sub _seems_connected {
921   my $self = shift;
922
923   $self->_preserve_foreign_dbh;
924
925   my $dbh = $self->_dbh
926     or return 0;
927
928   return $dbh->FETCH('Active');
929 }
930
931 sub _ping {
932   my $self = shift;
933
934   my $dbh = $self->_dbh or return 0;
935
936   return $dbh->ping;
937 }
938
939 sub ensure_connected {
940   my ($self) = @_;
941
942   unless ($self->connected) {
943     $self->_populate_dbh;
944   }
945 }
946
947 =head2 dbh
948
949 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
950 is guaranteed to be healthy by implicitly calling L</connected>, and if
951 necessary performing a reconnection before returning. Keep in mind that this
952 is very B<expensive> on some database engines. Consider using L</dbh_do>
953 instead.
954
955 =cut
956
957 sub dbh {
958   my ($self) = @_;
959
960   if (not $self->_dbh) {
961     $self->_populate_dbh;
962   } else {
963     $self->ensure_connected;
964   }
965   return $self->_dbh;
966 }
967
968 # this is the internal "get dbh or connect (don't check)" method
969 sub _get_dbh {
970   my $self = shift;
971   $self->_preserve_foreign_dbh;
972   $self->_populate_dbh unless $self->_dbh;
973   return $self->_dbh;
974 }
975
976 sub sql_maker {
977   my ($self) = @_;
978   unless ($self->_sql_maker) {
979     my $sql_maker_class = $self->sql_maker_class;
980     $self->ensure_class_loaded ($sql_maker_class);
981
982     my %opts = %{$self->_sql_maker_opts||{}};
983     my $dialect =
984       $opts{limit_dialect}
985         ||
986       $self->sql_limit_dialect
987         ||
988       do {
989         my $s_class = (ref $self) || $self;
990         carp (
991           "Your storage class ($s_class) does not set sql_limit_dialect and you "
992         . 'have not supplied an explicit limit_dialect in your connection_info. '
993         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
994         . 'databases but can be (and often is) painfully slow.'
995         );
996
997         'GenericSubQ';
998       }
999     ;
1000
1001     $self->_sql_maker($sql_maker_class->new(
1002       bindtype=>'columns',
1003       array_datatypes => 1,
1004       limit_dialect => $dialect,
1005       name_sep => '.',
1006       %opts,
1007     ));
1008   }
1009   return $self->_sql_maker;
1010 }
1011
1012 # nothing to do by default
1013 sub _rebless {}
1014 sub _init {}
1015
1016 sub _populate_dbh {
1017   my ($self) = @_;
1018
1019   my @info = @{$self->_dbi_connect_info || []};
1020   $self->_dbh(undef); # in case ->connected failed we might get sent here
1021   $self->_dbh_details({}); # reset everything we know
1022
1023   $self->_dbh($self->_connect(@info));
1024
1025   $self->_conn_pid($$);
1026   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1027
1028   $self->_determine_driver;
1029
1030   # Always set the transaction depth on connect, since
1031   #  there is no transaction in progress by definition
1032   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1033
1034   $self->_run_connection_actions unless $self->{_in_determine_driver};
1035 }
1036
1037 sub _run_connection_actions {
1038   my $self = shift;
1039   my @actions;
1040
1041   push @actions, ( $self->on_connect_call || () );
1042   push @actions, $self->_parse_connect_do ('on_connect_do');
1043
1044   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1045 }
1046
1047
1048
1049 sub set_use_dbms_capability {
1050   $_[0]->set_inherited ($_[1], $_[2]);
1051 }
1052
1053 sub get_use_dbms_capability {
1054   my ($self, $capname) = @_;
1055
1056   my $use = $self->get_inherited ($capname);
1057   return defined $use
1058     ? $use
1059     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1060   ;
1061 }
1062
1063 sub set_dbms_capability {
1064   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1065 }
1066
1067 sub get_dbms_capability {
1068   my ($self, $capname) = @_;
1069
1070   my $cap = $self->_dbh_details->{capability}{$capname};
1071
1072   unless (defined $cap) {
1073     if (my $meth = $self->can ("_determine$capname")) {
1074       $cap = $self->$meth ? 1 : 0;
1075     }
1076     else {
1077       $cap = 0;
1078     }
1079
1080     $self->set_dbms_capability ($capname, $cap);
1081   }
1082
1083   return $cap;
1084 }
1085
1086 sub _server_info {
1087   my $self = shift;
1088
1089   my $info;
1090   unless ($info = $self->_dbh_details->{info}) {
1091
1092     $info = {};
1093
1094     my $server_version = try { $self->_get_server_version };
1095
1096     if (defined $server_version) {
1097       $info->{dbms_version} = $server_version;
1098
1099       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1100       my @verparts = split (/\./, $numeric_version);
1101       if (
1102         @verparts
1103           &&
1104         $verparts[0] <= 999
1105       ) {
1106         # consider only up to 3 version parts, iff not more than 3 digits
1107         my @use_parts;
1108         while (@verparts && @use_parts < 3) {
1109           my $p = shift @verparts;
1110           last if $p > 999;
1111           push @use_parts, $p;
1112         }
1113         push @use_parts, 0 while @use_parts < 3;
1114
1115         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1116       }
1117     }
1118
1119     $self->_dbh_details->{info} = $info;
1120   }
1121
1122   return $info;
1123 }
1124
1125 sub _get_server_version {
1126   shift->_get_dbh->get_info(18);
1127 }
1128
1129 sub _determine_driver {
1130   my ($self) = @_;
1131
1132   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1133     my $started_connected = 0;
1134     local $self->{_in_determine_driver} = 1;
1135
1136     if (ref($self) eq __PACKAGE__) {
1137       my $driver;
1138       if ($self->_dbh) { # we are connected
1139         $driver = $self->_dbh->{Driver}{Name};
1140         $started_connected = 1;
1141       } else {
1142         # if connect_info is a CODEREF, we have no choice but to connect
1143         if (ref $self->_dbi_connect_info->[0] &&
1144             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1145           $self->_populate_dbh;
1146           $driver = $self->_dbh->{Driver}{Name};
1147         }
1148         else {
1149           # try to use dsn to not require being connected, the driver may still
1150           # force a connection in _rebless to determine version
1151           # (dsn may not be supplied at all if all we do is make a mock-schema)
1152           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1153           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1154           $driver ||= $ENV{DBI_DRIVER};
1155         }
1156       }
1157
1158       if ($driver) {
1159         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1160         if ($self->load_optional_class($storage_class)) {
1161           mro::set_mro($storage_class, 'c3');
1162           bless $self, $storage_class;
1163           $self->_rebless();
1164         }
1165       }
1166     }
1167
1168     $self->_driver_determined(1);
1169
1170     $self->_init; # run driver-specific initializations
1171
1172     $self->_run_connection_actions
1173         if !$started_connected && defined $self->_dbh;
1174   }
1175 }
1176
1177 sub _do_connection_actions {
1178   my $self          = shift;
1179   my $method_prefix = shift;
1180   my $call          = shift;
1181
1182   if (not ref($call)) {
1183     my $method = $method_prefix . $call;
1184     $self->$method(@_);
1185   } elsif (ref($call) eq 'CODE') {
1186     $self->$call(@_);
1187   } elsif (ref($call) eq 'ARRAY') {
1188     if (ref($call->[0]) ne 'ARRAY') {
1189       $self->_do_connection_actions($method_prefix, $_) for @$call;
1190     } else {
1191       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1192     }
1193   } else {
1194     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1195   }
1196
1197   return $self;
1198 }
1199
1200 sub connect_call_do_sql {
1201   my $self = shift;
1202   $self->_do_query(@_);
1203 }
1204
1205 sub disconnect_call_do_sql {
1206   my $self = shift;
1207   $self->_do_query(@_);
1208 }
1209
1210 # override in db-specific backend when necessary
1211 sub connect_call_datetime_setup { 1 }
1212
1213 sub _do_query {
1214   my ($self, $action) = @_;
1215
1216   if (ref $action eq 'CODE') {
1217     $action = $action->($self);
1218     $self->_do_query($_) foreach @$action;
1219   }
1220   else {
1221     # Most debuggers expect ($sql, @bind), so we need to exclude
1222     # the attribute hash which is the second argument to $dbh->do
1223     # furthermore the bind values are usually to be presented
1224     # as named arrayref pairs, so wrap those here too
1225     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1226     my $sql = shift @do_args;
1227     my $attrs = shift @do_args;
1228     my @bind = map { [ undef, $_ ] } @do_args;
1229
1230     $self->_query_start($sql, @bind);
1231     $self->_get_dbh->do($sql, $attrs, @do_args);
1232     $self->_query_end($sql, @bind);
1233   }
1234
1235   return $self;
1236 }
1237
1238 sub _connect {
1239   my ($self, @info) = @_;
1240
1241   $self->throw_exception("You failed to provide any connection info")
1242     if !@info;
1243
1244   my ($old_connect_via, $dbh);
1245
1246   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1247     $old_connect_via = $DBI::connect_via;
1248     $DBI::connect_via = 'connect';
1249   }
1250
1251   try {
1252     if(ref $info[0] eq 'CODE') {
1253        $dbh = $info[0]->();
1254     }
1255     else {
1256        $dbh = DBI->connect(@info);
1257     }
1258
1259     if (!$dbh) {
1260       die $DBI::errstr;
1261     }
1262
1263     unless ($self->unsafe) {
1264
1265       # this odd anonymous coderef dereference is in fact really
1266       # necessary to avoid the unwanted effect described in perl5
1267       # RT#75792
1268       sub {
1269         my $weak_self = $_[0];
1270         weaken $weak_self;
1271
1272         $_[1]->{HandleError} = sub {
1273           if ($weak_self) {
1274             $weak_self->throw_exception("DBI Exception: $_[0]");
1275           }
1276           else {
1277             # the handler may be invoked by something totally out of
1278             # the scope of DBIC
1279             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1280           }
1281         };
1282       }->($self, $dbh);
1283
1284       $dbh->{ShowErrorStatement} = 1;
1285       $dbh->{RaiseError} = 1;
1286       $dbh->{PrintError} = 0;
1287     }
1288   }
1289   catch {
1290     $self->throw_exception("DBI Connection failed: $_")
1291   }
1292   finally {
1293     $DBI::connect_via = $old_connect_via if $old_connect_via;
1294   };
1295
1296   $self->_dbh_autocommit($dbh->{AutoCommit});
1297   $dbh;
1298 }
1299
1300 sub svp_begin {
1301   my ($self, $name) = @_;
1302
1303   $name = $self->_svp_generate_name
1304     unless defined $name;
1305
1306   $self->throw_exception ("You can't use savepoints outside a transaction")
1307     if $self->{transaction_depth} == 0;
1308
1309   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1310     unless $self->can('_svp_begin');
1311
1312   push @{ $self->{savepoints} }, $name;
1313
1314   $self->debugobj->svp_begin($name) if $self->debug;
1315
1316   return $self->_svp_begin($name);
1317 }
1318
1319 sub svp_release {
1320   my ($self, $name) = @_;
1321
1322   $self->throw_exception ("You can't use savepoints outside a transaction")
1323     if $self->{transaction_depth} == 0;
1324
1325   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1326     unless $self->can('_svp_release');
1327
1328   if (defined $name) {
1329     $self->throw_exception ("Savepoint '$name' does not exist")
1330       unless grep { $_ eq $name } @{ $self->{savepoints} };
1331
1332     # Dig through the stack until we find the one we are releasing.  This keeps
1333     # the stack up to date.
1334     my $svp;
1335
1336     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1337   } else {
1338     $name = pop @{ $self->{savepoints} };
1339   }
1340
1341   $self->debugobj->svp_release($name) if $self->debug;
1342
1343   return $self->_svp_release($name);
1344 }
1345
1346 sub svp_rollback {
1347   my ($self, $name) = @_;
1348
1349   $self->throw_exception ("You can't use savepoints outside a transaction")
1350     if $self->{transaction_depth} == 0;
1351
1352   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1353     unless $self->can('_svp_rollback');
1354
1355   if (defined $name) {
1356       # If they passed us a name, verify that it exists in the stack
1357       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1358           $self->throw_exception("Savepoint '$name' does not exist!");
1359       }
1360
1361       # Dig through the stack until we find the one we are releasing.  This keeps
1362       # the stack up to date.
1363       while(my $s = pop(@{ $self->{savepoints} })) {
1364           last if($s eq $name);
1365       }
1366       # Add the savepoint back to the stack, as a rollback doesn't remove the
1367       # named savepoint, only everything after it.
1368       push(@{ $self->{savepoints} }, $name);
1369   } else {
1370       # We'll assume they want to rollback to the last savepoint
1371       $name = $self->{savepoints}->[-1];
1372   }
1373
1374   $self->debugobj->svp_rollback($name) if $self->debug;
1375
1376   return $self->_svp_rollback($name);
1377 }
1378
1379 sub _svp_generate_name {
1380   my ($self) = @_;
1381   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1382 }
1383
1384 sub txn_begin {
1385   my $self = shift;
1386
1387   # this means we have not yet connected and do not know the AC status
1388   # (e.g. coderef $dbh)
1389   if (! defined $self->_dbh_autocommit) {
1390     $self->ensure_connected;
1391   }
1392   # otherwise re-connect on pid changes, so
1393   # that the txn_depth is adjusted properly
1394   # the lightweight _get_dbh is good enoug here
1395   # (only superficial handle check, no pings)
1396   else {
1397     $self->_get_dbh;
1398   }
1399
1400   if($self->transaction_depth == 0) {
1401     $self->debugobj->txn_begin()
1402       if $self->debug;
1403     $self->_dbh_begin_work;
1404   }
1405   elsif ($self->auto_savepoint) {
1406     $self->svp_begin;
1407   }
1408   $self->{transaction_depth}++;
1409 }
1410
1411 sub _dbh_begin_work {
1412   my $self = shift;
1413
1414   # if the user is utilizing txn_do - good for him, otherwise we need to
1415   # ensure that the $dbh is healthy on BEGIN.
1416   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1417   # will be replaced by a failure of begin_work itself (which will be
1418   # then retried on reconnect)
1419   if ($self->{_in_dbh_do}) {
1420     $self->_dbh->begin_work;
1421   } else {
1422     $self->dbh_do(sub { $_[1]->begin_work });
1423   }
1424 }
1425
1426 sub txn_commit {
1427   my $self = shift;
1428   if ($self->{transaction_depth} == 1) {
1429     $self->debugobj->txn_commit()
1430       if ($self->debug);
1431     $self->_dbh_commit;
1432     $self->{transaction_depth} = 0
1433       if $self->_dbh_autocommit;
1434   }
1435   elsif($self->{transaction_depth} > 1) {
1436     $self->{transaction_depth}--;
1437     $self->svp_release
1438       if $self->auto_savepoint;
1439   }
1440   else {
1441     $self->throw_exception( 'Refusing to commit without a started transaction' );
1442   }
1443 }
1444
1445 sub _dbh_commit {
1446   my $self = shift;
1447   my $dbh  = $self->_dbh
1448     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1449   $dbh->commit;
1450 }
1451
1452 sub txn_rollback {
1453   my $self = shift;
1454   my $dbh = $self->_dbh;
1455   try {
1456     if ($self->{transaction_depth} == 1) {
1457       $self->debugobj->txn_rollback()
1458         if ($self->debug);
1459       $self->{transaction_depth} = 0
1460         if $self->_dbh_autocommit;
1461       $self->_dbh_rollback;
1462     }
1463     elsif($self->{transaction_depth} > 1) {
1464       $self->{transaction_depth}--;
1465       if ($self->auto_savepoint) {
1466         $self->svp_rollback;
1467         $self->svp_release;
1468       }
1469     }
1470     else {
1471       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1472     }
1473   }
1474   catch {
1475     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1476
1477     if ($_ !~ /$exception_class/) {
1478       # ensure that a failed rollback resets the transaction depth
1479       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1480     }
1481
1482     $self->throw_exception($_)
1483   };
1484 }
1485
1486 sub _dbh_rollback {
1487   my $self = shift;
1488   my $dbh  = $self->_dbh
1489     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1490   $dbh->rollback;
1491 }
1492
1493 # This used to be the top-half of _execute.  It was split out to make it
1494 #  easier to override in NoBindVars without duping the rest.  It takes up
1495 #  all of _execute's args, and emits $sql, @bind.
1496 sub _prep_for_execute {
1497   my ($self, $op, $extra_bind, $ident, $args) = @_;
1498
1499   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1500     $ident = $ident->from();
1501   }
1502
1503   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1504
1505   unshift(@bind,
1506     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1507       if $extra_bind;
1508   return ($sql, \@bind);
1509 }
1510
1511
1512 sub _fix_bind_params {
1513     my ($self, @bind) = @_;
1514
1515     ### Turn @bind from something like this:
1516     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1517     ### to this:
1518     ###   ( "'1'", "'1'", "'3'" )
1519     return
1520         map {
1521             if ( defined( $_ && $_->[1] ) ) {
1522                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1523             }
1524             else { q{NULL}; }
1525         } @bind;
1526 }
1527
1528 sub _query_start {
1529     my ( $self, $sql, @bind ) = @_;
1530
1531     if ( $self->debug ) {
1532         @bind = $self->_fix_bind_params(@bind);
1533
1534         $self->debugobj->query_start( $sql, @bind );
1535     }
1536 }
1537
1538 sub _query_end {
1539     my ( $self, $sql, @bind ) = @_;
1540
1541     if ( $self->debug ) {
1542         @bind = $self->_fix_bind_params(@bind);
1543         $self->debugobj->query_end( $sql, @bind );
1544     }
1545 }
1546
1547 sub _dbh_execute {
1548   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1549
1550   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1551
1552   $self->_query_start( $sql, @$bind );
1553
1554   my $sth = $self->sth($sql,$op);
1555
1556   my $placeholder_index = 1;
1557
1558   foreach my $bound (@$bind) {
1559     my $attributes = {};
1560     my($column_name, @data) = @$bound;
1561
1562     if ($bind_attributes) {
1563       $attributes = $bind_attributes->{$column_name}
1564       if defined $bind_attributes->{$column_name};
1565     }
1566
1567     foreach my $data (@data) {
1568       my $ref = ref $data;
1569       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1570
1571       $sth->bind_param($placeholder_index, $data, $attributes);
1572       $placeholder_index++;
1573     }
1574   }
1575
1576   # Can this fail without throwing an exception anyways???
1577   my $rv = $sth->execute();
1578   $self->throw_exception(
1579     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1580   ) if !$rv;
1581
1582   $self->_query_end( $sql, @$bind );
1583
1584   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1585 }
1586
1587 sub _execute {
1588     my $self = shift;
1589     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1590 }
1591
1592 sub insert {
1593   my ($self, $source, $to_insert) = @_;
1594
1595   my $colinfo = $source->columns_info;
1596
1597   # mix with auto-nextval marked values (a bit of a speed hit, but
1598   # no saner way to handle this yet)
1599   my $auto_nextvals = {} ;
1600   for my $col (keys %$colinfo) {
1601     if (
1602       $colinfo->{$col}{auto_nextval}
1603         and
1604       (
1605         ! exists $to_insert->{$col}
1606           or
1607         ref $to_insert->{$col} eq 'SCALAR'
1608       )
1609     ) {
1610       $auto_nextvals->{$col} = $self->_sequence_fetch(
1611         'nextval',
1612         ( $colinfo->{$col}{sequence} ||=
1613             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1614         ),
1615       );
1616     }
1617   }
1618
1619   # fuse the values
1620   $to_insert = { %$to_insert, %$auto_nextvals };
1621
1622   # list of primary keys we try to fetch from the database
1623   # both not-exsists and scalarrefs are considered
1624   my %fetch_pks;
1625   %fetch_pks = ( map
1626     { $_ => scalar keys %fetch_pks }  # so we can preserve order for prettyness
1627     grep
1628       { ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR' }
1629       $source->primary_columns
1630   );
1631
1632   my $sqla_opts;
1633   if ($self->_use_insert_returning) {
1634
1635     # retain order as declared in the resultsource
1636     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1637       push @{$sqla_opts->{returning}}, $_;
1638     }
1639   }
1640
1641   my $bind_attributes = $self->source_bind_attributes($source);
1642
1643   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1644
1645   my %returned_cols = %$auto_nextvals;
1646
1647   if (my $retlist = $sqla_opts->{returning}) {
1648     my @ret_vals = try {
1649       local $SIG{__WARN__} = sub {};
1650       my @r = $sth->fetchrow_array;
1651       $sth->finish;
1652       @r;
1653     };
1654
1655     @returned_cols{@$retlist} = @ret_vals if @ret_vals;
1656   }
1657
1658   return \%returned_cols;
1659 }
1660
1661
1662 ## Currently it is assumed that all values passed will be "normal", i.e. not
1663 ## scalar refs, or at least, all the same type as the first set, the statement is
1664 ## only prepped once.
1665 sub insert_bulk {
1666   my ($self, $source, $cols, $data) = @_;
1667
1668   my %colvalues;
1669   @colvalues{@$cols} = (0..$#$cols);
1670
1671   for my $i (0..$#$cols) {
1672     my $first_val = $data->[0][$i];
1673     next unless ref $first_val eq 'SCALAR';
1674
1675     $colvalues{ $cols->[$i] } = $first_val;
1676   }
1677
1678   # check for bad data and stringify stringifiable objects
1679   my $bad_slice = sub {
1680     my ($msg, $col_idx, $slice_idx) = @_;
1681     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1682       $msg,
1683       $cols->[$col_idx],
1684       do {
1685         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1686         Dumper {
1687           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1688         },
1689       }
1690     );
1691   };
1692
1693   for my $datum_idx (0..$#$data) {
1694     my $datum = $data->[$datum_idx];
1695
1696     for my $col_idx (0..$#$cols) {
1697       my $val            = $datum->[$col_idx];
1698       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1699       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1700
1701       if ($is_literal_sql) {
1702         if (not ref $val) {
1703           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1704         }
1705         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1706           $bad_slice->("$reftype reference found where literal SQL expected",
1707             $col_idx, $datum_idx);
1708         }
1709         elsif ($$val ne $$sqla_bind){
1710           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1711             $col_idx, $datum_idx);
1712         }
1713       }
1714       elsif (my $reftype = ref $val) {
1715         require overload;
1716         if (overload::Method($val, '""')) {
1717           $datum->[$col_idx] = "".$val;
1718         }
1719         else {
1720           $bad_slice->("$reftype reference found where bind expected",
1721             $col_idx, $datum_idx);
1722         }
1723       }
1724     }
1725   }
1726
1727   my ($sql, $bind) = $self->_prep_for_execute (
1728     'insert', undef, $source, [\%colvalues]
1729   );
1730   my @bind = @$bind;
1731
1732   my $empty_bind = 1 if (not @bind) &&
1733     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1734
1735   if ((not @bind) && (not $empty_bind)) {
1736     $self->throw_exception(
1737       'Cannot insert_bulk without support for placeholders'
1738     );
1739   }
1740
1741   # neither _execute_array, nor _execute_inserts_with_no_binds are
1742   # atomic (even if _execute _array is a single call). Thus a safety
1743   # scope guard
1744   my $guard = $self->txn_scope_guard;
1745
1746   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1747   my $sth = $self->sth($sql);
1748   my $rv = do {
1749     if ($empty_bind) {
1750       # bind_param_array doesn't work if there are no binds
1751       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1752     }
1753     else {
1754 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1755       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1756     }
1757   };
1758
1759   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1760
1761   $guard->commit;
1762
1763   return (wantarray ? ($rv, $sth, @bind) : $rv);
1764 }
1765
1766 sub _execute_array {
1767   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1768
1769   ## This must be an arrayref, else nothing works!
1770   my $tuple_status = [];
1771
1772   ## Get the bind_attributes, if any exist
1773   my $bind_attributes = $self->source_bind_attributes($source);
1774
1775   ## Bind the values and execute
1776   my $placeholder_index = 1;
1777
1778   foreach my $bound (@$bind) {
1779
1780     my $attributes = {};
1781     my ($column_name, $data_index) = @$bound;
1782
1783     if( $bind_attributes ) {
1784       $attributes = $bind_attributes->{$column_name}
1785       if defined $bind_attributes->{$column_name};
1786     }
1787
1788     my @data = map { $_->[$data_index] } @$data;
1789
1790     $sth->bind_param_array(
1791       $placeholder_index,
1792       [@data],
1793       (%$attributes ?  $attributes : ()),
1794     );
1795     $placeholder_index++;
1796   }
1797
1798   my ($rv, $err);
1799   try {
1800     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1801   }
1802   catch {
1803     $err = shift;
1804   };
1805
1806   # Statement must finish even if there was an exception.
1807   try {
1808     $sth->finish
1809   }
1810   catch {
1811     $err = shift unless defined $err
1812   };
1813
1814   $err = $sth->errstr
1815     if (! defined $err and $sth->err);
1816
1817   if (defined $err) {
1818     my $i = 0;
1819     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1820
1821     $self->throw_exception("Unexpected populate error: $err")
1822       if ($i > $#$tuple_status);
1823
1824     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1825       ($tuple_status->[$i][1] || $err),
1826       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1827     );
1828   }
1829
1830   return $rv;
1831 }
1832
1833 sub _dbh_execute_array {
1834     my ($self, $sth, $tuple_status, @extra) = @_;
1835
1836     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1837 }
1838
1839 sub _dbh_execute_inserts_with_no_binds {
1840   my ($self, $sth, $count) = @_;
1841
1842   my $err;
1843   try {
1844     my $dbh = $self->_get_dbh;
1845     local $dbh->{RaiseError} = 1;
1846     local $dbh->{PrintError} = 0;
1847
1848     $sth->execute foreach 1..$count;
1849   }
1850   catch {
1851     $err = shift;
1852   };
1853
1854   # Make sure statement is finished even if there was an exception.
1855   try {
1856     $sth->finish
1857   }
1858   catch {
1859     $err = shift unless defined $err;
1860   };
1861
1862   $self->throw_exception($err) if defined $err;
1863
1864   return $count;
1865 }
1866
1867 sub update {
1868   my ($self, $source, @args) = @_;
1869
1870   my $bind_attrs = $self->source_bind_attributes($source);
1871
1872   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1873 }
1874
1875
1876 sub delete {
1877   my ($self, $source, @args) = @_;
1878
1879   my $bind_attrs = $self->source_bind_attributes($source);
1880
1881   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1882 }
1883
1884 # We were sent here because the $rs contains a complex search
1885 # which will require a subquery to select the correct rows
1886 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1887 #
1888 # Generating a single PK column subquery is trivial and supported
1889 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1890 # Look at _multipk_update_delete()
1891 sub _subq_update_delete {
1892   my $self = shift;
1893   my ($rs, $op, $values) = @_;
1894
1895   my $rsrc = $rs->result_source;
1896
1897   # quick check if we got a sane rs on our hands
1898   my @pcols = $rsrc->_pri_cols;
1899
1900   my $sel = $rs->_resolved_attrs->{select};
1901   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1902
1903   if (
1904       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1905         ne
1906       join ("\x00", sort @$sel )
1907   ) {
1908     $self->throw_exception (
1909       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1910     );
1911   }
1912
1913   if (@pcols == 1) {
1914     return $self->$op (
1915       $rsrc,
1916       $op eq 'update' ? $values : (),
1917       { $pcols[0] => { -in => $rs->as_query } },
1918     );
1919   }
1920
1921   else {
1922     return $self->_multipk_update_delete (@_);
1923   }
1924 }
1925
1926 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1927 # resultset update/delete involving subqueries. So by default resort
1928 # to simple (and inefficient) delete_all style per-row opearations,
1929 # while allowing specific storages to override this with a faster
1930 # implementation.
1931 #
1932 sub _multipk_update_delete {
1933   return shift->_per_row_update_delete (@_);
1934 }
1935
1936 # This is the default loop used to delete/update rows for multi PK
1937 # resultsets, and used by mysql exclusively (because it can't do anything
1938 # else).
1939 #
1940 # We do not use $row->$op style queries, because resultset update/delete
1941 # is not expected to cascade (this is what delete_all/update_all is for).
1942 #
1943 # There should be no race conditions as the entire operation is rolled
1944 # in a transaction.
1945 #
1946 sub _per_row_update_delete {
1947   my $self = shift;
1948   my ($rs, $op, $values) = @_;
1949
1950   my $rsrc = $rs->result_source;
1951   my @pcols = $rsrc->_pri_cols;
1952
1953   my $guard = $self->txn_scope_guard;
1954
1955   # emulate the return value of $sth->execute for non-selects
1956   my $row_cnt = '0E0';
1957
1958   my $subrs_cur = $rs->cursor;
1959   my @all_pk = $subrs_cur->all;
1960   for my $pks ( @all_pk) {
1961
1962     my $cond;
1963     for my $i (0.. $#pcols) {
1964       $cond->{$pcols[$i]} = $pks->[$i];
1965     }
1966
1967     $self->$op (
1968       $rsrc,
1969       $op eq 'update' ? $values : (),
1970       $cond,
1971     );
1972
1973     $row_cnt++;
1974   }
1975
1976   $guard->commit;
1977
1978   return $row_cnt;
1979 }
1980
1981 sub _select {
1982   my $self = shift;
1983   $self->_execute($self->_select_args(@_));
1984 }
1985
1986 sub _select_args_to_query {
1987   my $self = shift;
1988
1989   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1990   #  = $self->_select_args($ident, $select, $cond, $attrs);
1991   my ($op, $bind, $ident, $bind_attrs, @args) =
1992     $self->_select_args(@_);
1993
1994   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1995   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1996   $prepared_bind ||= [];
1997
1998   return wantarray
1999     ? ($sql, $prepared_bind, $bind_attrs)
2000     : \[ "($sql)", @$prepared_bind ]
2001   ;
2002 }
2003
2004 sub _select_args {
2005   my ($self, $ident, $select, $where, $attrs) = @_;
2006
2007   my $sql_maker = $self->sql_maker;
2008   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2009
2010   $attrs = {
2011     %$attrs,
2012     select => $select,
2013     from => $ident,
2014     where => $where,
2015     $rs_alias && $alias2source->{$rs_alias}
2016       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
2017       : ()
2018     ,
2019   };
2020
2021   # calculate bind_attrs before possible $ident mangling
2022   my $bind_attrs = {};
2023   for my $alias (keys %$alias2source) {
2024     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2025     for my $col (keys %$bindtypes) {
2026
2027       my $fqcn = join ('.', $alias, $col);
2028       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2029
2030       # Unqialified column names are nice, but at the same time can be
2031       # rather ambiguous. What we do here is basically go along with
2032       # the loop, adding an unqualified column slot to $bind_attrs,
2033       # alongside the fully qualified name. As soon as we encounter
2034       # another column by that name (which would imply another table)
2035       # we unset the unqualified slot and never add any info to it
2036       # to avoid erroneous type binding. If this happens the users
2037       # only choice will be to fully qualify his column name
2038
2039       if (exists $bind_attrs->{$col}) {
2040         $bind_attrs->{$col} = {};
2041       }
2042       else {
2043         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2044       }
2045     }
2046   }
2047
2048   # Sanity check the attributes (SQLMaker does it too, but
2049   # in case of a software_limit we'll never reach there)
2050   if (defined $attrs->{offset}) {
2051     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2052       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2053   }
2054   $attrs->{offset} ||= 0;
2055
2056   if (defined $attrs->{rows}) {
2057     $self->throw_exception("The rows attribute must be a positive integer if present")
2058       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2059   }
2060   elsif ($attrs->{offset}) {
2061     # MySQL actually recommends this approach.  I cringe.
2062     $attrs->{rows} = $sql_maker->__max_int;
2063   }
2064
2065   my @limit;
2066
2067   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2068   # storage, unless software limit was requested
2069   if (
2070     #limited has_many
2071     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2072        ||
2073     # grouped prefetch (to satisfy group_by == select)
2074     ( $attrs->{group_by}
2075         &&
2076       @{$attrs->{group_by}}
2077         &&
2078       $attrs->{_prefetch_select}
2079         &&
2080       @{$attrs->{_prefetch_select}}
2081     )
2082   ) {
2083     ($ident, $select, $where, $attrs)
2084       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2085   }
2086   elsif (! $attrs->{software_limit} ) {
2087     push @limit, $attrs->{rows}, $attrs->{offset};
2088   }
2089
2090   # try to simplify the joinmap further (prune unreferenced type-single joins)
2091   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2092
2093 ###
2094   # This would be the point to deflate anything found in $where
2095   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2096   # expect a row object. And all we have is a resultsource (it is trivial
2097   # to extract deflator coderefs via $alias2source above).
2098   #
2099   # I don't see a way forward other than changing the way deflators are
2100   # invoked, and that's just bad...
2101 ###
2102
2103   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2104 }
2105
2106 # Returns a counting SELECT for a simple count
2107 # query. Abstracted so that a storage could override
2108 # this to { count => 'firstcol' } or whatever makes
2109 # sense as a performance optimization
2110 sub _count_select {
2111   #my ($self, $source, $rs_attrs) = @_;
2112   return { count => '*' };
2113 }
2114
2115
2116 sub source_bind_attributes {
2117   my ($self, $source) = @_;
2118
2119   my $bind_attributes;
2120
2121   my $colinfo = $source->columns_info;
2122
2123   for my $col (keys %$colinfo) {
2124     if (my $dt = $colinfo->{$col}{data_type} ) {
2125       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2126     }
2127   }
2128
2129   return $bind_attributes;
2130 }
2131
2132 =head2 select
2133
2134 =over 4
2135
2136 =item Arguments: $ident, $select, $condition, $attrs
2137
2138 =back
2139
2140 Handle a SQL select statement.
2141
2142 =cut
2143
2144 sub select {
2145   my $self = shift;
2146   my ($ident, $select, $condition, $attrs) = @_;
2147   return $self->cursor_class->new($self, \@_, $attrs);
2148 }
2149
2150 sub select_single {
2151   my $self = shift;
2152   my ($rv, $sth, @bind) = $self->_select(@_);
2153   my @row = $sth->fetchrow_array;
2154   my @nextrow = $sth->fetchrow_array if @row;
2155   if(@row && @nextrow) {
2156     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2157   }
2158   # Need to call finish() to work round broken DBDs
2159   $sth->finish();
2160   return @row;
2161 }
2162
2163 =head2 sql_limit_dialect
2164
2165 This is an accessor for the default SQL limit dialect used by a particular
2166 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2167 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2168 see L<DBIx::Class::SQLMaker::LimitDialects>.
2169
2170 =head2 sth
2171
2172 =over 4
2173
2174 =item Arguments: $sql
2175
2176 =back
2177
2178 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2179
2180 =cut
2181
2182 sub _dbh_sth {
2183   my ($self, $dbh, $sql) = @_;
2184
2185   # 3 is the if_active parameter which avoids active sth re-use
2186   my $sth = $self->disable_sth_caching
2187     ? $dbh->prepare($sql)
2188     : $dbh->prepare_cached($sql, {}, 3);
2189
2190   # XXX You would think RaiseError would make this impossible,
2191   #  but apparently that's not true :(
2192   $self->throw_exception($dbh->errstr) if !$sth;
2193
2194   $sth;
2195 }
2196
2197 sub sth {
2198   my ($self, $sql) = @_;
2199   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2200 }
2201
2202 sub _dbh_columns_info_for {
2203   my ($self, $dbh, $table) = @_;
2204
2205   if ($dbh->can('column_info')) {
2206     my %result;
2207     my $caught;
2208     try {
2209       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2210       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2211       $sth->execute();
2212       while ( my $info = $sth->fetchrow_hashref() ){
2213         my %column_info;
2214         $column_info{data_type}   = $info->{TYPE_NAME};
2215         $column_info{size}      = $info->{COLUMN_SIZE};
2216         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2217         $column_info{default_value} = $info->{COLUMN_DEF};
2218         my $col_name = $info->{COLUMN_NAME};
2219         $col_name =~ s/^\"(.*)\"$/$1/;
2220
2221         $result{$col_name} = \%column_info;
2222       }
2223     } catch {
2224       $caught = 1;
2225     };
2226     return \%result if !$caught && scalar keys %result;
2227   }
2228
2229   my %result;
2230   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2231   $sth->execute;
2232   my @columns = @{$sth->{NAME_lc}};
2233   for my $i ( 0 .. $#columns ){
2234     my %column_info;
2235     $column_info{data_type} = $sth->{TYPE}->[$i];
2236     $column_info{size} = $sth->{PRECISION}->[$i];
2237     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2238
2239     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2240       $column_info{data_type} = $1;
2241       $column_info{size}    = $2;
2242     }
2243
2244     $result{$columns[$i]} = \%column_info;
2245   }
2246   $sth->finish;
2247
2248   foreach my $col (keys %result) {
2249     my $colinfo = $result{$col};
2250     my $type_num = $colinfo->{data_type};
2251     my $type_name;
2252     if(defined $type_num && $dbh->can('type_info')) {
2253       my $type_info = $dbh->type_info($type_num);
2254       $type_name = $type_info->{TYPE_NAME} if $type_info;
2255       $colinfo->{data_type} = $type_name if $type_name;
2256     }
2257   }
2258
2259   return \%result;
2260 }
2261
2262 sub columns_info_for {
2263   my ($self, $table) = @_;
2264   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2265 }
2266
2267 =head2 last_insert_id
2268
2269 Return the row id of the last insert.
2270
2271 =cut
2272
2273 sub _dbh_last_insert_id {
2274     my ($self, $dbh, $source, $col) = @_;
2275
2276     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2277
2278     return $id if defined $id;
2279
2280     my $class = ref $self;
2281     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2282 }
2283
2284 sub last_insert_id {
2285   my $self = shift;
2286   $self->_dbh_last_insert_id ($self->_dbh, @_);
2287 }
2288
2289 =head2 _native_data_type
2290
2291 =over 4
2292
2293 =item Arguments: $type_name
2294
2295 =back
2296
2297 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2298 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2299 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2300
2301 The default implementation returns C<undef>, implement in your Storage driver if
2302 you need this functionality.
2303
2304 Should map types from other databases to the native RDBMS type, for example
2305 C<VARCHAR2> to C<VARCHAR>.
2306
2307 Types with modifiers should map to the underlying data type. For example,
2308 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2309
2310 Composite types should map to the container type, for example
2311 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2312
2313 =cut
2314
2315 sub _native_data_type {
2316   #my ($self, $data_type) = @_;
2317   return undef
2318 }
2319
2320 # Check if placeholders are supported at all
2321 sub _determine_supports_placeholders {
2322   my $self = shift;
2323   my $dbh  = $self->_get_dbh;
2324
2325   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2326   # but it is inaccurate more often than not
2327   return try {
2328     local $dbh->{PrintError} = 0;
2329     local $dbh->{RaiseError} = 1;
2330     $dbh->do('select ?', {}, 1);
2331     1;
2332   }
2333   catch {
2334     0;
2335   };
2336 }
2337
2338 # Check if placeholders bound to non-string types throw exceptions
2339 #
2340 sub _determine_supports_typeless_placeholders {
2341   my $self = shift;
2342   my $dbh  = $self->_get_dbh;
2343
2344   return try {
2345     local $dbh->{PrintError} = 0;
2346     local $dbh->{RaiseError} = 1;
2347     # this specifically tests a bind that is NOT a string
2348     $dbh->do('select 1 where 1 = ?', {}, 1);
2349     1;
2350   }
2351   catch {
2352     0;
2353   };
2354 }
2355
2356 =head2 sqlt_type
2357
2358 Returns the database driver name.
2359
2360 =cut
2361
2362 sub sqlt_type {
2363   shift->_get_dbh->{Driver}->{Name};
2364 }
2365
2366 =head2 bind_attribute_by_data_type
2367
2368 Given a datatype from column info, returns a database specific bind
2369 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2370 let the database planner just handle it.
2371
2372 Generally only needed for special case column types, like bytea in postgres.
2373
2374 =cut
2375
2376 sub bind_attribute_by_data_type {
2377     return;
2378 }
2379
2380 =head2 is_datatype_numeric
2381
2382 Given a datatype from column_info, returns a boolean value indicating if
2383 the current RDBMS considers it a numeric value. This controls how
2384 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2385 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2386 be performed instead of the usual C<eq>.
2387
2388 =cut
2389
2390 sub is_datatype_numeric {
2391   my ($self, $dt) = @_;
2392
2393   return 0 unless $dt;
2394
2395   return $dt =~ /^ (?:
2396     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2397   ) $/ix;
2398 }
2399
2400
2401 =head2 create_ddl_dir
2402
2403 =over 4
2404
2405 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2406
2407 =back
2408
2409 Creates a SQL file based on the Schema, for each of the specified
2410 database engines in C<\@databases> in the given directory.
2411 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2412
2413 Given a previous version number, this will also create a file containing
2414 the ALTER TABLE statements to transform the previous schema into the
2415 current one. Note that these statements may contain C<DROP TABLE> or
2416 C<DROP COLUMN> statements that can potentially destroy data.
2417
2418 The file names are created using the C<ddl_filename> method below, please
2419 override this method in your schema if you would like a different file
2420 name format. For the ALTER file, the same format is used, replacing
2421 $version in the name with "$preversion-$version".
2422
2423 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2424 The most common value for this would be C<< { add_drop_table => 1 } >>
2425 to have the SQL produced include a C<DROP TABLE> statement for each table
2426 created. For quoting purposes supply C<quote_table_names> and
2427 C<quote_field_names>.
2428
2429 If no arguments are passed, then the following default values are assumed:
2430
2431 =over 4
2432
2433 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2434
2435 =item version    - $schema->schema_version
2436
2437 =item directory  - './'
2438
2439 =item preversion - <none>
2440
2441 =back
2442
2443 By default, C<\%sqlt_args> will have
2444
2445  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2446
2447 merged with the hash passed in. To disable any of those features, pass in a
2448 hashref like the following
2449
2450  { ignore_constraint_names => 0, # ... other options }
2451
2452
2453 WARNING: You are strongly advised to check all SQL files created, before applying
2454 them.
2455
2456 =cut
2457
2458 sub create_ddl_dir {
2459   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2460
2461   unless ($dir) {
2462     carp "No directory given, using ./\n";
2463     $dir = './';
2464   } else {
2465       -d $dir
2466         or
2467       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2468         or
2469       $self->throw_exception(
2470         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2471       );
2472   }
2473
2474   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2475
2476   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2477   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2478
2479   my $schema_version = $schema->schema_version || '1.x';
2480   $version ||= $schema_version;
2481
2482   $sqltargs = {
2483     add_drop_table => 1,
2484     ignore_constraint_names => 1,
2485     ignore_index_names => 1,
2486     %{$sqltargs || {}}
2487   };
2488
2489   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2490     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2491   }
2492
2493   my $sqlt = SQL::Translator->new( $sqltargs );
2494
2495   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2496   my $sqlt_schema = $sqlt->translate({ data => $schema })
2497     or $self->throw_exception ($sqlt->error);
2498
2499   foreach my $db (@$databases) {
2500     $sqlt->reset();
2501     $sqlt->{schema} = $sqlt_schema;
2502     $sqlt->producer($db);
2503
2504     my $file;
2505     my $filename = $schema->ddl_filename($db, $version, $dir);
2506     if (-e $filename && ($version eq $schema_version )) {
2507       # if we are dumping the current version, overwrite the DDL
2508       carp "Overwriting existing DDL file - $filename";
2509       unlink($filename);
2510     }
2511
2512     my $output = $sqlt->translate;
2513     if(!$output) {
2514       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2515       next;
2516     }
2517     if(!open($file, ">$filename")) {
2518       $self->throw_exception("Can't open $filename for writing ($!)");
2519       next;
2520     }
2521     print $file $output;
2522     close($file);
2523
2524     next unless ($preversion);
2525
2526     require SQL::Translator::Diff;
2527
2528     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2529     if(!-e $prefilename) {
2530       carp("No previous schema file found ($prefilename)");
2531       next;
2532     }
2533
2534     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2535     if(-e $difffile) {
2536       carp("Overwriting existing diff file - $difffile");
2537       unlink($difffile);
2538     }
2539
2540     my $source_schema;
2541     {
2542       my $t = SQL::Translator->new($sqltargs);
2543       $t->debug( 0 );
2544       $t->trace( 0 );
2545
2546       $t->parser( $db )
2547         or $self->throw_exception ($t->error);
2548
2549       my $out = $t->translate( $prefilename )
2550         or $self->throw_exception ($t->error);
2551
2552       $source_schema = $t->schema;
2553
2554       $source_schema->name( $prefilename )
2555         unless ( $source_schema->name );
2556     }
2557
2558     # The "new" style of producers have sane normalization and can support
2559     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2560     # And we have to diff parsed SQL against parsed SQL.
2561     my $dest_schema = $sqlt_schema;
2562
2563     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2564       my $t = SQL::Translator->new($sqltargs);
2565       $t->debug( 0 );
2566       $t->trace( 0 );
2567
2568       $t->parser( $db )
2569         or $self->throw_exception ($t->error);
2570
2571       my $out = $t->translate( $filename )
2572         or $self->throw_exception ($t->error);
2573
2574       $dest_schema = $t->schema;
2575
2576       $dest_schema->name( $filename )
2577         unless $dest_schema->name;
2578     }
2579
2580     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2581                                                   $dest_schema,   $db,
2582                                                   $sqltargs
2583                                                  );
2584     if(!open $file, ">$difffile") {
2585       $self->throw_exception("Can't write to $difffile ($!)");
2586       next;
2587     }
2588     print $file $diff;
2589     close($file);
2590   }
2591 }
2592
2593 =head2 deployment_statements
2594
2595 =over 4
2596
2597 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2598
2599 =back
2600
2601 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2602
2603 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2604 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2605
2606 C<$directory> is used to return statements from files in a previously created
2607 L</create_ddl_dir> directory and is optional. The filenames are constructed
2608 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2609
2610 If no C<$directory> is specified then the statements are constructed on the
2611 fly using L<SQL::Translator> and C<$version> is ignored.
2612
2613 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2614
2615 =cut
2616
2617 sub deployment_statements {
2618   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2619   $type ||= $self->sqlt_type;
2620   $version ||= $schema->schema_version || '1.x';
2621   $dir ||= './';
2622   my $filename = $schema->ddl_filename($type, $version, $dir);
2623   if(-f $filename)
2624   {
2625       my $file;
2626       open($file, "<$filename")
2627         or $self->throw_exception("Can't open $filename ($!)");
2628       my @rows = <$file>;
2629       close($file);
2630       return join('', @rows);
2631   }
2632
2633   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2634     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2635   }
2636
2637   # sources needs to be a parser arg, but for simplicty allow at top level
2638   # coming in
2639   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2640       if exists $sqltargs->{sources};
2641
2642   my $tr = SQL::Translator->new(
2643     producer => "SQL::Translator::Producer::${type}",
2644     %$sqltargs,
2645     parser => 'SQL::Translator::Parser::DBIx::Class',
2646     data => $schema,
2647   );
2648
2649   my @ret;
2650   my $wa = wantarray;
2651   if ($wa) {
2652     @ret = $tr->translate;
2653   }
2654   else {
2655     $ret[0] = $tr->translate;
2656   }
2657
2658   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2659     unless (@ret && defined $ret[0]);
2660
2661   return $wa ? @ret : $ret[0];
2662 }
2663
2664 sub deploy {
2665   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2666   my $deploy = sub {
2667     my $line = shift;
2668     return if($line =~ /^--/);
2669     return if(!$line);
2670     # next if($line =~ /^DROP/m);
2671     return if($line =~ /^BEGIN TRANSACTION/m);
2672     return if($line =~ /^COMMIT/m);
2673     return if $line =~ /^\s+$/; # skip whitespace only
2674     $self->_query_start($line);
2675     try {
2676       # do a dbh_do cycle here, as we need some error checking in
2677       # place (even though we will ignore errors)
2678       $self->dbh_do (sub { $_[1]->do($line) });
2679     } catch {
2680       carp qq{$_ (running "${line}")};
2681     };
2682     $self->_query_end($line);
2683   };
2684   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2685   if (@statements > 1) {
2686     foreach my $statement (@statements) {
2687       $deploy->( $statement );
2688     }
2689   }
2690   elsif (@statements == 1) {
2691     foreach my $line ( split(";\n", $statements[0])) {
2692       $deploy->( $line );
2693     }
2694   }
2695 }
2696
2697 =head2 datetime_parser
2698
2699 Returns the datetime parser class
2700
2701 =cut
2702
2703 sub datetime_parser {
2704   my $self = shift;
2705   return $self->{datetime_parser} ||= do {
2706     $self->build_datetime_parser(@_);
2707   };
2708 }
2709
2710 =head2 datetime_parser_type
2711
2712 Defines (returns) the datetime parser class - currently hardwired to
2713 L<DateTime::Format::MySQL>
2714
2715 =cut
2716
2717 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2718
2719 =head2 build_datetime_parser
2720
2721 See L</datetime_parser>
2722
2723 =cut
2724
2725 sub build_datetime_parser {
2726   my $self = shift;
2727   my $type = $self->datetime_parser_type(@_);
2728   $self->ensure_class_loaded ($type);
2729   return $type;
2730 }
2731
2732
2733 =head2 is_replicating
2734
2735 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2736 replicate from a master database.  Default is undef, which is the result
2737 returned by databases that don't support replication.
2738
2739 =cut
2740
2741 sub is_replicating {
2742     return;
2743
2744 }
2745
2746 =head2 lag_behind_master
2747
2748 Returns a number that represents a certain amount of lag behind a master db
2749 when a given storage is replicating.  The number is database dependent, but
2750 starts at zero and increases with the amount of lag. Default in undef
2751
2752 =cut
2753
2754 sub lag_behind_master {
2755     return;
2756 }
2757
2758 =head2 relname_to_table_alias
2759
2760 =over 4
2761
2762 =item Arguments: $relname, $join_count
2763
2764 =back
2765
2766 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2767 queries.
2768
2769 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2770 way these aliases are named.
2771
2772 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2773 otherwise C<"$relname">.
2774
2775 =cut
2776
2777 sub relname_to_table_alias {
2778   my ($self, $relname, $join_count) = @_;
2779
2780   my $alias = ($join_count && $join_count > 1 ?
2781     join('_', $relname, $join_count) : $relname);
2782
2783   return $alias;
2784 }
2785
2786 1;
2787
2788 =head1 USAGE NOTES
2789
2790 =head2 DBIx::Class and AutoCommit
2791
2792 DBIx::Class can do some wonderful magic with handling exceptions,
2793 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2794 (the default) combined with C<txn_do> for transaction support.
2795
2796 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2797 in an assumed transaction between commits, and you're telling us you'd
2798 like to manage that manually.  A lot of the magic protections offered by
2799 this module will go away.  We can't protect you from exceptions due to database
2800 disconnects because we don't know anything about how to restart your
2801 transactions.  You're on your own for handling all sorts of exceptional
2802 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2803 be with raw DBI.
2804
2805
2806 =head1 AUTHORS
2807
2808 Matt S. Trout <mst@shadowcatsystems.co.uk>
2809
2810 Andy Grundman <andy@hybridized.org>
2811
2812 =head1 LICENSE
2813
2814 You may distribute this code under the same terms as Perl itself.
2815
2816 =cut