Stop showing __BULK_INSERT__ during populate() without bindvals
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders join_optimizer/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
60
61 # on by default, not strictly a capability (pending rewrite)
62 __PACKAGE__->_use_join_optimizer (1);
63 sub _determine_supports_join_optimizer { 1 };
64
65 # Each of these methods need _determine_driver called before itself
66 # in order to function reliably. This is a purely DRY optimization
67 #
68 # get_(use)_dbms_capability need to be called on the correct Storage
69 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
70 # _determine_supports_X which obv. needs a correct driver as well
71 my @rdbms_specific_methods = qw/
72   deployment_statements
73   sqlt_type
74   sql_maker
75   build_datetime_parser
76   datetime_parser_type
77
78   insert
79   insert_bulk
80   update
81   delete
82   select
83   select_single
84
85   get_use_dbms_capability
86   get_dbms_capability
87
88   _server_info
89   _get_server_version
90 /;
91
92 for my $meth (@rdbms_specific_methods) {
93
94   my $orig = __PACKAGE__->can ($meth)
95     or die "$meth is not a ::Storage::DBI method!";
96
97   no strict qw/refs/;
98   no warnings qw/redefine/;
99   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
100     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
101       $_[0]->_determine_driver;
102
103       # This for some reason crashes and burns on perl 5.8.1
104       # IFF the method ends up throwing an exception
105       #goto $_[0]->can ($meth);
106
107       my $cref = $_[0]->can ($meth);
108       goto $cref;
109     }
110     goto $orig;
111   };
112 }
113
114
115 =head1 NAME
116
117 DBIx::Class::Storage::DBI - DBI storage handler
118
119 =head1 SYNOPSIS
120
121   my $schema = MySchema->connect('dbi:SQLite:my.db');
122
123   $schema->storage->debug(1);
124
125   my @stuff = $schema->storage->dbh_do(
126     sub {
127       my ($storage, $dbh, @args) = @_;
128       $dbh->do("DROP TABLE authors");
129     },
130     @column_list
131   );
132
133   $schema->resultset('Book')->search({
134      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
135   });
136
137 =head1 DESCRIPTION
138
139 This class represents the connection to an RDBMS via L<DBI>.  See
140 L<DBIx::Class::Storage> for general information.  This pod only
141 documents DBI-specific methods and behaviors.
142
143 =head1 METHODS
144
145 =cut
146
147 sub new {
148   my $new = shift->next::method(@_);
149
150   $new->transaction_depth(0);
151   $new->_sql_maker_opts({});
152   $new->_dbh_details({});
153   $new->{savepoints} = [];
154   $new->{_in_dbh_do} = 0;
155   $new->{_dbh_gen} = 0;
156
157   # read below to see what this does
158   $new->_arm_global_destructor;
159
160   $new;
161 }
162
163 # This is hack to work around perl shooting stuff in random
164 # order on exit(). If we do not walk the remaining storage
165 # objects in an END block, there is a *small but real* chance
166 # of a fork()ed child to kill the parent's shared DBI handle,
167 # *before perl reaches the DESTROY in this package*
168 # Yes, it is ugly and effective.
169 {
170   my %seek_and_destroy;
171
172   sub _arm_global_destructor {
173     my $self = shift;
174     my $key = refaddr ($self);
175     $seek_and_destroy{$key} = $self;
176     weaken ($seek_and_destroy{$key});
177   }
178
179   END {
180     local $?; # just in case the DBI destructor changes it somehow
181
182     # destroy just the object if not native to this process/thread
183     $_->_preserve_foreign_dbh for (grep
184       { defined $_ }
185       values %seek_and_destroy
186     );
187   }
188 }
189
190 sub DESTROY {
191   my $self = shift;
192
193   # some databases spew warnings on implicit disconnect
194   local $SIG{__WARN__} = sub {};
195   $self->_dbh(undef);
196 }
197
198 sub _preserve_foreign_dbh {
199   my $self = shift;
200
201   return unless $self->_dbh;
202
203   $self->_verify_tid;
204
205   return unless $self->_dbh;
206
207   $self->_verify_pid;
208
209 }
210
211 # handle pid changes correctly - do not destroy parent's connection
212 sub _verify_pid {
213   my $self = shift;
214
215   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
216
217   $self->_dbh->{InactiveDestroy} = 1;
218   $self->_dbh(undef);
219   $self->{_dbh_gen}++;
220
221   return;
222 }
223
224 # very similar to above, but seems to FAIL if I set InactiveDestroy
225 sub _verify_tid {
226   my $self = shift;
227
228   if ( ! defined $self->_conn_tid ) {
229     return; # no threads
230   }
231   elsif ( $self->_conn_tid == threads->tid ) {
232     return; # same thread
233   }
234
235   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
236   $self->_dbh(undef);
237   $self->{_dbh_gen}++;
238
239   return;
240 }
241
242
243 =head2 connect_info
244
245 This method is normally called by L<DBIx::Class::Schema/connection>, which
246 encapsulates its argument list in an arrayref before passing them here.
247
248 The argument list may contain:
249
250 =over
251
252 =item *
253
254 The same 4-element argument set one would normally pass to
255 L<DBI/connect>, optionally followed by
256 L<extra attributes|/DBIx::Class specific connection attributes>
257 recognized by DBIx::Class:
258
259   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
260
261 =item *
262
263 A single code reference which returns a connected
264 L<DBI database handle|DBI/connect> optionally followed by
265 L<extra attributes|/DBIx::Class specific connection attributes> recognized
266 by DBIx::Class:
267
268   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
269
270 =item *
271
272 A single hashref with all the attributes and the dsn/user/password
273 mixed together:
274
275   $connect_info_args = [{
276     dsn => $dsn,
277     user => $user,
278     password => $pass,
279     %dbi_attributes,
280     %extra_attributes,
281   }];
282
283   $connect_info_args = [{
284     dbh_maker => sub { DBI->connect (...) },
285     %dbi_attributes,
286     %extra_attributes,
287   }];
288
289 This is particularly useful for L<Catalyst> based applications, allowing the
290 following config (L<Config::General> style):
291
292   <Model::DB>
293     schema_class   App::DB
294     <connect_info>
295       dsn          dbi:mysql:database=test
296       user         testuser
297       password     TestPass
298       AutoCommit   1
299     </connect_info>
300   </Model::DB>
301
302 The C<dsn>/C<user>/C<password> combination can be substituted by the
303 C<dbh_maker> key whose value is a coderef that returns a connected
304 L<DBI database handle|DBI/connect>
305
306 =back
307
308 Please note that the L<DBI> docs recommend that you always explicitly
309 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
310 recommends that it be set to I<1>, and that you perform transactions
311 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
312 to I<1> if you do not do explicitly set it to zero.  This is the default
313 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
314
315 =head3 DBIx::Class specific connection attributes
316
317 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
318 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
319 the following connection options. These options can be mixed in with your other
320 L<DBI> connection attributes, or placed in a separate hashref
321 (C<\%extra_attributes>) as shown above.
322
323 Every time C<connect_info> is invoked, any previous settings for
324 these options will be cleared before setting the new ones, regardless of
325 whether any options are specified in the new C<connect_info>.
326
327
328 =over
329
330 =item on_connect_do
331
332 Specifies things to do immediately after connecting or re-connecting to
333 the database.  Its value may contain:
334
335 =over
336
337 =item a scalar
338
339 This contains one SQL statement to execute.
340
341 =item an array reference
342
343 This contains SQL statements to execute in order.  Each element contains
344 a string or a code reference that returns a string.
345
346 =item a code reference
347
348 This contains some code to execute.  Unlike code references within an
349 array reference, its return value is ignored.
350
351 =back
352
353 =item on_disconnect_do
354
355 Takes arguments in the same form as L</on_connect_do> and executes them
356 immediately before disconnecting from the database.
357
358 Note, this only runs if you explicitly call L</disconnect> on the
359 storage object.
360
361 =item on_connect_call
362
363 A more generalized form of L</on_connect_do> that calls the specified
364 C<connect_call_METHOD> methods in your storage driver.
365
366   on_connect_do => 'select 1'
367
368 is equivalent to:
369
370   on_connect_call => [ [ do_sql => 'select 1' ] ]
371
372 Its values may contain:
373
374 =over
375
376 =item a scalar
377
378 Will call the C<connect_call_METHOD> method.
379
380 =item a code reference
381
382 Will execute C<< $code->($storage) >>
383
384 =item an array reference
385
386 Each value can be a method name or code reference.
387
388 =item an array of arrays
389
390 For each array, the first item is taken to be the C<connect_call_> method name
391 or code reference, and the rest are parameters to it.
392
393 =back
394
395 Some predefined storage methods you may use:
396
397 =over
398
399 =item do_sql
400
401 Executes a SQL string or a code reference that returns a SQL string. This is
402 what L</on_connect_do> and L</on_disconnect_do> use.
403
404 It can take:
405
406 =over
407
408 =item a scalar
409
410 Will execute the scalar as SQL.
411
412 =item an arrayref
413
414 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
415 attributes hashref and bind values.
416
417 =item a code reference
418
419 Will execute C<< $code->($storage) >> and execute the return array refs as
420 above.
421
422 =back
423
424 =item datetime_setup
425
426 Execute any statements necessary to initialize the database session to return
427 and accept datetime/timestamp values used with
428 L<DBIx::Class::InflateColumn::DateTime>.
429
430 Only necessary for some databases, see your specific storage driver for
431 implementation details.
432
433 =back
434
435 =item on_disconnect_call
436
437 Takes arguments in the same form as L</on_connect_call> and executes them
438 immediately before disconnecting from the database.
439
440 Calls the C<disconnect_call_METHOD> methods as opposed to the
441 C<connect_call_METHOD> methods called by L</on_connect_call>.
442
443 Note, this only runs if you explicitly call L</disconnect> on the
444 storage object.
445
446 =item disable_sth_caching
447
448 If set to a true value, this option will disable the caching of
449 statement handles via L<DBI/prepare_cached>.
450
451 =item limit_dialect
452
453 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
454 default L</sql_limit_dialect> setting of the storage (if any). For a list
455 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
456
457 =item quote_char
458
459 Specifies what characters to use to quote table and column names.
460
461 C<quote_char> expects either a single character, in which case is it
462 is placed on either side of the table/column name, or an arrayref of length
463 2 in which case the table/column name is placed between the elements.
464
465 For example under MySQL you should use C<< quote_char => '`' >>, and for
466 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
467
468 =item name_sep
469
470 This parameter is only useful in conjunction with C<quote_char>, and is used to
471 specify the character that separates elements (schemas, tables, columns) from
472 each other. If unspecified it defaults to the most commonly used C<.>.
473
474 =item unsafe
475
476 This Storage driver normally installs its own C<HandleError>, sets
477 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
478 all database handles, including those supplied by a coderef.  It does this
479 so that it can have consistent and useful error behavior.
480
481 If you set this option to a true value, Storage will not do its usual
482 modifications to the database handle's attributes, and instead relies on
483 the settings in your connect_info DBI options (or the values you set in
484 your connection coderef, in the case that you are connecting via coderef).
485
486 Note that your custom settings can cause Storage to malfunction,
487 especially if you set a C<HandleError> handler that suppresses exceptions
488 and/or disable C<RaiseError>.
489
490 =item auto_savepoint
491
492 If this option is true, L<DBIx::Class> will use savepoints when nesting
493 transactions, making it possible to recover from failure in the inner
494 transaction without having to abort all outer transactions.
495
496 =item cursor_class
497
498 Use this argument to supply a cursor class other than the default
499 L<DBIx::Class::Storage::DBI::Cursor>.
500
501 =back
502
503 Some real-life examples of arguments to L</connect_info> and
504 L<DBIx::Class::Schema/connect>
505
506   # Simple SQLite connection
507   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
508
509   # Connect via subref
510   ->connect_info([ sub { DBI->connect(...) } ]);
511
512   # Connect via subref in hashref
513   ->connect_info([{
514     dbh_maker => sub { DBI->connect(...) },
515     on_connect_do => 'alter session ...',
516   }]);
517
518   # A bit more complicated
519   ->connect_info(
520     [
521       'dbi:Pg:dbname=foo',
522       'postgres',
523       'my_pg_password',
524       { AutoCommit => 1 },
525       { quote_char => q{"} },
526     ]
527   );
528
529   # Equivalent to the previous example
530   ->connect_info(
531     [
532       'dbi:Pg:dbname=foo',
533       'postgres',
534       'my_pg_password',
535       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
536     ]
537   );
538
539   # Same, but with hashref as argument
540   # See parse_connect_info for explanation
541   ->connect_info(
542     [{
543       dsn         => 'dbi:Pg:dbname=foo',
544       user        => 'postgres',
545       password    => 'my_pg_password',
546       AutoCommit  => 1,
547       quote_char  => q{"},
548       name_sep    => q{.},
549     }]
550   );
551
552   # Subref + DBIx::Class-specific connection options
553   ->connect_info(
554     [
555       sub { DBI->connect(...) },
556       {
557           quote_char => q{`},
558           name_sep => q{@},
559           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
560           disable_sth_caching => 1,
561       },
562     ]
563   );
564
565
566
567 =cut
568
569 sub connect_info {
570   my ($self, $info) = @_;
571
572   return $self->_connect_info if !$info;
573
574   $self->_connect_info($info); # copy for _connect_info
575
576   $info = $self->_normalize_connect_info($info)
577     if ref $info eq 'ARRAY';
578
579   for my $storage_opt (keys %{ $info->{storage_options} }) {
580     my $value = $info->{storage_options}{$storage_opt};
581
582     $self->$storage_opt($value);
583   }
584
585   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
586   #  the new set of options
587   $self->_sql_maker(undef);
588   $self->_sql_maker_opts({});
589
590   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
591     my $value = $info->{sql_maker_options}{$sql_maker_opt};
592
593     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
594   }
595
596   my %attrs = (
597     %{ $self->_default_dbi_connect_attributes || {} },
598     %{ $info->{attributes} || {} },
599   );
600
601   my @args = @{ $info->{arguments} };
602
603   $self->_dbi_connect_info([@args,
604     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
605
606   # FIXME - dirty:
607   # save attributes them in a separate accessor so they are always
608   # introspectable, even in case of a CODE $dbhmaker
609   $self->_dbic_connect_attributes (\%attrs);
610
611   return $self->_connect_info;
612 }
613
614 sub _normalize_connect_info {
615   my ($self, $info_arg) = @_;
616   my %info;
617
618   my @args = @$info_arg;  # take a shallow copy for further mutilation
619
620   # combine/pre-parse arguments depending on invocation style
621
622   my %attrs;
623   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
624     %attrs = %{ $args[1] || {} };
625     @args = $args[0];
626   }
627   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
628     %attrs = %{$args[0]};
629     @args = ();
630     if (my $code = delete $attrs{dbh_maker}) {
631       @args = $code;
632
633       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
634       if (@ignored) {
635         carp sprintf (
636             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
637           . "to the result of 'dbh_maker'",
638
639           join (', ', map { "'$_'" } (@ignored) ),
640         );
641       }
642     }
643     else {
644       @args = delete @attrs{qw/dsn user password/};
645     }
646   }
647   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
648     %attrs = (
649       % { $args[3] || {} },
650       % { $args[4] || {} },
651     );
652     @args = @args[0,1,2];
653   }
654
655   $info{arguments} = \@args;
656
657   my @storage_opts = grep exists $attrs{$_},
658     @storage_options, 'cursor_class';
659
660   @{ $info{storage_options} }{@storage_opts} =
661     delete @attrs{@storage_opts} if @storage_opts;
662
663   my @sql_maker_opts = grep exists $attrs{$_},
664     qw/limit_dialect quote_char name_sep/;
665
666   @{ $info{sql_maker_options} }{@sql_maker_opts} =
667     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
668
669   $info{attributes} = \%attrs if %attrs;
670
671   return \%info;
672 }
673
674 sub _default_dbi_connect_attributes {
675   return {
676     AutoCommit => 1,
677     RaiseError => 1,
678     PrintError => 0,
679   };
680 }
681
682 =head2 on_connect_do
683
684 This method is deprecated in favour of setting via L</connect_info>.
685
686 =cut
687
688 =head2 on_disconnect_do
689
690 This method is deprecated in favour of setting via L</connect_info>.
691
692 =cut
693
694 sub _parse_connect_do {
695   my ($self, $type) = @_;
696
697   my $val = $self->$type;
698   return () if not defined $val;
699
700   my @res;
701
702   if (not ref($val)) {
703     push @res, [ 'do_sql', $val ];
704   } elsif (ref($val) eq 'CODE') {
705     push @res, $val;
706   } elsif (ref($val) eq 'ARRAY') {
707     push @res, map { [ 'do_sql', $_ ] } @$val;
708   } else {
709     $self->throw_exception("Invalid type for $type: ".ref($val));
710   }
711
712   return \@res;
713 }
714
715 =head2 dbh_do
716
717 Arguments: ($subref | $method_name), @extra_coderef_args?
718
719 Execute the given $subref or $method_name using the new exception-based
720 connection management.
721
722 The first two arguments will be the storage object that C<dbh_do> was called
723 on and a database handle to use.  Any additional arguments will be passed
724 verbatim to the called subref as arguments 2 and onwards.
725
726 Using this (instead of $self->_dbh or $self->dbh) ensures correct
727 exception handling and reconnection (or failover in future subclasses).
728
729 Your subref should have no side-effects outside of the database, as
730 there is the potential for your subref to be partially double-executed
731 if the database connection was stale/dysfunctional.
732
733 Example:
734
735   my @stuff = $schema->storage->dbh_do(
736     sub {
737       my ($storage, $dbh, @cols) = @_;
738       my $cols = join(q{, }, @cols);
739       $dbh->selectrow_array("SELECT $cols FROM foo");
740     },
741     @column_list
742   );
743
744 =cut
745
746 sub dbh_do {
747   my $self = shift;
748   my $code = shift;
749
750   my $dbh = $self->_get_dbh;
751
752   return $self->$code($dbh, @_)
753     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
754
755   local $self->{_in_dbh_do} = 1;
756
757   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
758   my $args = \@_;
759   return try {
760     $self->$code ($dbh, @$args);
761   } catch {
762     $self->throw_exception($_) if $self->connected;
763
764     # We were not connected - reconnect and retry, but let any
765     #  exception fall right through this time
766     carp "Retrying $code after catching disconnected exception: $_"
767       if $ENV{DBIC_DBIRETRY_DEBUG};
768
769     $self->_populate_dbh;
770     $self->$code($self->_dbh, @$args);
771   };
772 }
773
774 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
775 # It also informs dbh_do to bypass itself while under the direction of txn_do,
776 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
777 sub txn_do {
778   my $self = shift;
779   my $coderef = shift;
780
781   ref $coderef eq 'CODE' or $self->throw_exception
782     ('$coderef must be a CODE reference');
783
784   local $self->{_in_dbh_do} = 1;
785
786   my @result;
787   my $want_array = wantarray;
788
789   my $tried = 0;
790   while(1) {
791     my $exception;
792
793     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
794     my $args = \@_;
795
796     try {
797       $self->txn_begin;
798       my $txn_start_depth = $self->transaction_depth;
799       if($want_array) {
800           @result = $coderef->(@$args);
801       }
802       elsif(defined $want_array) {
803           $result[0] = $coderef->(@$args);
804       }
805       else {
806           $coderef->(@$args);
807       }
808
809       my $delta_txn = $txn_start_depth - $self->transaction_depth;
810       if ($delta_txn == 0) {
811         $self->txn_commit;
812       }
813       elsif ($delta_txn != 1) {
814         # an off-by-one would mean we fired a rollback
815         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
816       }
817     } catch {
818       $exception = $_;
819     };
820
821     if(! defined $exception) { return $want_array ? @result : $result[0] }
822
823     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
824       my $rollback_exception;
825       try { $self->txn_rollback } catch { $rollback_exception = shift };
826       if(defined $rollback_exception) {
827         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
828         $self->throw_exception($exception)  # propagate nested rollback
829           if $rollback_exception =~ /$exception_class/;
830
831         $self->throw_exception(
832           "Transaction aborted: ${exception}. "
833           . "Rollback failed: ${rollback_exception}"
834         );
835       }
836       $self->throw_exception($exception)
837     }
838
839     # We were not connected, and was first try - reconnect and retry
840     # via the while loop
841     carp "Retrying $coderef after catching disconnected exception: $exception"
842       if $ENV{DBIC_TXNRETRY_DEBUG};
843     $self->_populate_dbh;
844   }
845 }
846
847 =head2 disconnect
848
849 Our C<disconnect> method also performs a rollback first if the
850 database is not in C<AutoCommit> mode.
851
852 =cut
853
854 sub disconnect {
855   my ($self) = @_;
856
857   if( $self->_dbh ) {
858     my @actions;
859
860     push @actions, ( $self->on_disconnect_call || () );
861     push @actions, $self->_parse_connect_do ('on_disconnect_do');
862
863     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
864
865     $self->_dbh_rollback unless $self->_dbh_autocommit;
866
867     %{ $self->_dbh->{CachedKids} } = ();
868     $self->_dbh->disconnect;
869     $self->_dbh(undef);
870     $self->{_dbh_gen}++;
871   }
872 }
873
874 =head2 with_deferred_fk_checks
875
876 =over 4
877
878 =item Arguments: C<$coderef>
879
880 =item Return Value: The return value of $coderef
881
882 =back
883
884 Storage specific method to run the code ref with FK checks deferred or
885 in MySQL's case disabled entirely.
886
887 =cut
888
889 # Storage subclasses should override this
890 sub with_deferred_fk_checks {
891   my ($self, $sub) = @_;
892   $sub->();
893 }
894
895 =head2 connected
896
897 =over
898
899 =item Arguments: none
900
901 =item Return Value: 1|0
902
903 =back
904
905 Verifies that the current database handle is active and ready to execute
906 an SQL statement (e.g. the connection did not get stale, server is still
907 answering, etc.) This method is used internally by L</dbh>.
908
909 =cut
910
911 sub connected {
912   my $self = shift;
913   return 0 unless $self->_seems_connected;
914
915   #be on the safe side
916   local $self->_dbh->{RaiseError} = 1;
917
918   return $self->_ping;
919 }
920
921 sub _seems_connected {
922   my $self = shift;
923
924   $self->_preserve_foreign_dbh;
925
926   my $dbh = $self->_dbh
927     or return 0;
928
929   return $dbh->FETCH('Active');
930 }
931
932 sub _ping {
933   my $self = shift;
934
935   my $dbh = $self->_dbh or return 0;
936
937   return $dbh->ping;
938 }
939
940 sub ensure_connected {
941   my ($self) = @_;
942
943   unless ($self->connected) {
944     $self->_populate_dbh;
945   }
946 }
947
948 =head2 dbh
949
950 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
951 is guaranteed to be healthy by implicitly calling L</connected>, and if
952 necessary performing a reconnection before returning. Keep in mind that this
953 is very B<expensive> on some database engines. Consider using L</dbh_do>
954 instead.
955
956 =cut
957
958 sub dbh {
959   my ($self) = @_;
960
961   if (not $self->_dbh) {
962     $self->_populate_dbh;
963   } else {
964     $self->ensure_connected;
965   }
966   return $self->_dbh;
967 }
968
969 # this is the internal "get dbh or connect (don't check)" method
970 sub _get_dbh {
971   my $self = shift;
972   $self->_preserve_foreign_dbh;
973   $self->_populate_dbh unless $self->_dbh;
974   return $self->_dbh;
975 }
976
977 sub sql_maker {
978   my ($self) = @_;
979   unless ($self->_sql_maker) {
980     my $sql_maker_class = $self->sql_maker_class;
981     $self->ensure_class_loaded ($sql_maker_class);
982
983     my %opts = %{$self->_sql_maker_opts||{}};
984     my $dialect =
985       $opts{limit_dialect}
986         ||
987       $self->sql_limit_dialect
988         ||
989       do {
990         my $s_class = (ref $self) || $self;
991         carp (
992           "Your storage class ($s_class) does not set sql_limit_dialect and you "
993         . 'have not supplied an explicit limit_dialect in your connection_info. '
994         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
995         . 'databases but can be (and often is) painfully slow.'
996         );
997
998         'GenericSubQ';
999       }
1000     ;
1001
1002     $self->_sql_maker($sql_maker_class->new(
1003       bindtype=>'columns',
1004       array_datatypes => 1,
1005       limit_dialect => $dialect,
1006       name_sep => '.',
1007       %opts,
1008     ));
1009   }
1010   return $self->_sql_maker;
1011 }
1012
1013 # nothing to do by default
1014 sub _rebless {}
1015 sub _init {}
1016
1017 sub _populate_dbh {
1018   my ($self) = @_;
1019
1020   my @info = @{$self->_dbi_connect_info || []};
1021   $self->_dbh(undef); # in case ->connected failed we might get sent here
1022   $self->_dbh_details({}); # reset everything we know
1023
1024   $self->_dbh($self->_connect(@info));
1025
1026   $self->_conn_pid($$);
1027   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1028
1029   $self->_determine_driver;
1030
1031   # Always set the transaction depth on connect, since
1032   #  there is no transaction in progress by definition
1033   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1034
1035   $self->_run_connection_actions unless $self->{_in_determine_driver};
1036 }
1037
1038 sub _run_connection_actions {
1039   my $self = shift;
1040   my @actions;
1041
1042   push @actions, ( $self->on_connect_call || () );
1043   push @actions, $self->_parse_connect_do ('on_connect_do');
1044
1045   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1046 }
1047
1048
1049
1050 sub set_use_dbms_capability {
1051   $_[0]->set_inherited ($_[1], $_[2]);
1052 }
1053
1054 sub get_use_dbms_capability {
1055   my ($self, $capname) = @_;
1056
1057   my $use = $self->get_inherited ($capname);
1058   return defined $use
1059     ? $use
1060     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1061   ;
1062 }
1063
1064 sub set_dbms_capability {
1065   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1066 }
1067
1068 sub get_dbms_capability {
1069   my ($self, $capname) = @_;
1070
1071   my $cap = $self->_dbh_details->{capability}{$capname};
1072
1073   unless (defined $cap) {
1074     if (my $meth = $self->can ("_determine$capname")) {
1075       $cap = $self->$meth ? 1 : 0;
1076     }
1077     else {
1078       $cap = 0;
1079     }
1080
1081     $self->set_dbms_capability ($capname, $cap);
1082   }
1083
1084   return $cap;
1085 }
1086
1087 sub _server_info {
1088   my $self = shift;
1089
1090   my $info;
1091   unless ($info = $self->_dbh_details->{info}) {
1092
1093     $info = {};
1094
1095     my $server_version = try { $self->_get_server_version };
1096
1097     if (defined $server_version) {
1098       $info->{dbms_version} = $server_version;
1099
1100       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1101       my @verparts = split (/\./, $numeric_version);
1102       if (
1103         @verparts
1104           &&
1105         $verparts[0] <= 999
1106       ) {
1107         # consider only up to 3 version parts, iff not more than 3 digits
1108         my @use_parts;
1109         while (@verparts && @use_parts < 3) {
1110           my $p = shift @verparts;
1111           last if $p > 999;
1112           push @use_parts, $p;
1113         }
1114         push @use_parts, 0 while @use_parts < 3;
1115
1116         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1117       }
1118     }
1119
1120     $self->_dbh_details->{info} = $info;
1121   }
1122
1123   return $info;
1124 }
1125
1126 sub _get_server_version {
1127   shift->_get_dbh->get_info(18);
1128 }
1129
1130 sub _determine_driver {
1131   my ($self) = @_;
1132
1133   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1134     my $started_connected = 0;
1135     local $self->{_in_determine_driver} = 1;
1136
1137     if (ref($self) eq __PACKAGE__) {
1138       my $driver;
1139       if ($self->_dbh) { # we are connected
1140         $driver = $self->_dbh->{Driver}{Name};
1141         $started_connected = 1;
1142       } else {
1143         # if connect_info is a CODEREF, we have no choice but to connect
1144         if (ref $self->_dbi_connect_info->[0] &&
1145             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1146           $self->_populate_dbh;
1147           $driver = $self->_dbh->{Driver}{Name};
1148         }
1149         else {
1150           # try to use dsn to not require being connected, the driver may still
1151           # force a connection in _rebless to determine version
1152           # (dsn may not be supplied at all if all we do is make a mock-schema)
1153           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1154           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1155           $driver ||= $ENV{DBI_DRIVER};
1156         }
1157       }
1158
1159       if ($driver) {
1160         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1161         if ($self->load_optional_class($storage_class)) {
1162           mro::set_mro($storage_class, 'c3');
1163           bless $self, $storage_class;
1164           $self->_rebless();
1165         }
1166       }
1167     }
1168
1169     $self->_driver_determined(1);
1170
1171     $self->_init; # run driver-specific initializations
1172
1173     $self->_run_connection_actions
1174         if !$started_connected && defined $self->_dbh;
1175   }
1176 }
1177
1178 sub _do_connection_actions {
1179   my $self          = shift;
1180   my $method_prefix = shift;
1181   my $call          = shift;
1182
1183   if (not ref($call)) {
1184     my $method = $method_prefix . $call;
1185     $self->$method(@_);
1186   } elsif (ref($call) eq 'CODE') {
1187     $self->$call(@_);
1188   } elsif (ref($call) eq 'ARRAY') {
1189     if (ref($call->[0]) ne 'ARRAY') {
1190       $self->_do_connection_actions($method_prefix, $_) for @$call;
1191     } else {
1192       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1193     }
1194   } else {
1195     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1196   }
1197
1198   return $self;
1199 }
1200
1201 sub connect_call_do_sql {
1202   my $self = shift;
1203   $self->_do_query(@_);
1204 }
1205
1206 sub disconnect_call_do_sql {
1207   my $self = shift;
1208   $self->_do_query(@_);
1209 }
1210
1211 # override in db-specific backend when necessary
1212 sub connect_call_datetime_setup { 1 }
1213
1214 sub _do_query {
1215   my ($self, $action) = @_;
1216
1217   if (ref $action eq 'CODE') {
1218     $action = $action->($self);
1219     $self->_do_query($_) foreach @$action;
1220   }
1221   else {
1222     # Most debuggers expect ($sql, @bind), so we need to exclude
1223     # the attribute hash which is the second argument to $dbh->do
1224     # furthermore the bind values are usually to be presented
1225     # as named arrayref pairs, so wrap those here too
1226     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1227     my $sql = shift @do_args;
1228     my $attrs = shift @do_args;
1229     my @bind = map { [ undef, $_ ] } @do_args;
1230
1231     $self->_query_start($sql, @bind);
1232     $self->_get_dbh->do($sql, $attrs, @do_args);
1233     $self->_query_end($sql, @bind);
1234   }
1235
1236   return $self;
1237 }
1238
1239 sub _connect {
1240   my ($self, @info) = @_;
1241
1242   $self->throw_exception("You failed to provide any connection info")
1243     if !@info;
1244
1245   my ($old_connect_via, $dbh);
1246
1247   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1248     $old_connect_via = $DBI::connect_via;
1249     $DBI::connect_via = 'connect';
1250   }
1251
1252   try {
1253     if(ref $info[0] eq 'CODE') {
1254        $dbh = $info[0]->();
1255     }
1256     else {
1257        $dbh = DBI->connect(@info);
1258     }
1259
1260     if (!$dbh) {
1261       die $DBI::errstr;
1262     }
1263
1264     unless ($self->unsafe) {
1265
1266       # this odd anonymous coderef dereference is in fact really
1267       # necessary to avoid the unwanted effect described in perl5
1268       # RT#75792
1269       sub {
1270         my $weak_self = $_[0];
1271         weaken $weak_self;
1272
1273         $_[1]->{HandleError} = sub {
1274           if ($weak_self) {
1275             $weak_self->throw_exception("DBI Exception: $_[0]");
1276           }
1277           else {
1278             # the handler may be invoked by something totally out of
1279             # the scope of DBIC
1280             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1281           }
1282         };
1283       }->($self, $dbh);
1284
1285       $dbh->{ShowErrorStatement} = 1;
1286       $dbh->{RaiseError} = 1;
1287       $dbh->{PrintError} = 0;
1288     }
1289   }
1290   catch {
1291     $self->throw_exception("DBI Connection failed: $_")
1292   }
1293   finally {
1294     $DBI::connect_via = $old_connect_via if $old_connect_via;
1295   };
1296
1297   $self->_dbh_autocommit($dbh->{AutoCommit});
1298   $dbh;
1299 }
1300
1301 sub svp_begin {
1302   my ($self, $name) = @_;
1303
1304   $name = $self->_svp_generate_name
1305     unless defined $name;
1306
1307   $self->throw_exception ("You can't use savepoints outside a transaction")
1308     if $self->{transaction_depth} == 0;
1309
1310   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1311     unless $self->can('_svp_begin');
1312
1313   push @{ $self->{savepoints} }, $name;
1314
1315   $self->debugobj->svp_begin($name) if $self->debug;
1316
1317   return $self->_svp_begin($name);
1318 }
1319
1320 sub svp_release {
1321   my ($self, $name) = @_;
1322
1323   $self->throw_exception ("You can't use savepoints outside a transaction")
1324     if $self->{transaction_depth} == 0;
1325
1326   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1327     unless $self->can('_svp_release');
1328
1329   if (defined $name) {
1330     $self->throw_exception ("Savepoint '$name' does not exist")
1331       unless grep { $_ eq $name } @{ $self->{savepoints} };
1332
1333     # Dig through the stack until we find the one we are releasing.  This keeps
1334     # the stack up to date.
1335     my $svp;
1336
1337     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1338   } else {
1339     $name = pop @{ $self->{savepoints} };
1340   }
1341
1342   $self->debugobj->svp_release($name) if $self->debug;
1343
1344   return $self->_svp_release($name);
1345 }
1346
1347 sub svp_rollback {
1348   my ($self, $name) = @_;
1349
1350   $self->throw_exception ("You can't use savepoints outside a transaction")
1351     if $self->{transaction_depth} == 0;
1352
1353   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1354     unless $self->can('_svp_rollback');
1355
1356   if (defined $name) {
1357       # If they passed us a name, verify that it exists in the stack
1358       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1359           $self->throw_exception("Savepoint '$name' does not exist!");
1360       }
1361
1362       # Dig through the stack until we find the one we are releasing.  This keeps
1363       # the stack up to date.
1364       while(my $s = pop(@{ $self->{savepoints} })) {
1365           last if($s eq $name);
1366       }
1367       # Add the savepoint back to the stack, as a rollback doesn't remove the
1368       # named savepoint, only everything after it.
1369       push(@{ $self->{savepoints} }, $name);
1370   } else {
1371       # We'll assume they want to rollback to the last savepoint
1372       $name = $self->{savepoints}->[-1];
1373   }
1374
1375   $self->debugobj->svp_rollback($name) if $self->debug;
1376
1377   return $self->_svp_rollback($name);
1378 }
1379
1380 sub _svp_generate_name {
1381   my ($self) = @_;
1382   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1383 }
1384
1385 sub txn_begin {
1386   my $self = shift;
1387
1388   # this means we have not yet connected and do not know the AC status
1389   # (e.g. coderef $dbh)
1390   if (! defined $self->_dbh_autocommit) {
1391     $self->ensure_connected;
1392   }
1393   # otherwise re-connect on pid changes, so
1394   # that the txn_depth is adjusted properly
1395   # the lightweight _get_dbh is good enoug here
1396   # (only superficial handle check, no pings)
1397   else {
1398     $self->_get_dbh;
1399   }
1400
1401   if($self->transaction_depth == 0) {
1402     $self->debugobj->txn_begin()
1403       if $self->debug;
1404     $self->_dbh_begin_work;
1405   }
1406   elsif ($self->auto_savepoint) {
1407     $self->svp_begin;
1408   }
1409   $self->{transaction_depth}++;
1410 }
1411
1412 sub _dbh_begin_work {
1413   my $self = shift;
1414
1415   # if the user is utilizing txn_do - good for him, otherwise we need to
1416   # ensure that the $dbh is healthy on BEGIN.
1417   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1418   # will be replaced by a failure of begin_work itself (which will be
1419   # then retried on reconnect)
1420   if ($self->{_in_dbh_do}) {
1421     $self->_dbh->begin_work;
1422   } else {
1423     $self->dbh_do(sub { $_[1]->begin_work });
1424   }
1425 }
1426
1427 sub txn_commit {
1428   my $self = shift;
1429   if ($self->{transaction_depth} == 1) {
1430     $self->debugobj->txn_commit()
1431       if ($self->debug);
1432     $self->_dbh_commit;
1433     $self->{transaction_depth} = 0
1434       if $self->_dbh_autocommit;
1435   }
1436   elsif($self->{transaction_depth} > 1) {
1437     $self->{transaction_depth}--;
1438     $self->svp_release
1439       if $self->auto_savepoint;
1440   }
1441   else {
1442     $self->throw_exception( 'Refusing to commit without a started transaction' );
1443   }
1444 }
1445
1446 sub _dbh_commit {
1447   my $self = shift;
1448   my $dbh  = $self->_dbh
1449     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1450   $dbh->commit;
1451 }
1452
1453 sub txn_rollback {
1454   my $self = shift;
1455   my $dbh = $self->_dbh;
1456   try {
1457     if ($self->{transaction_depth} == 1) {
1458       $self->debugobj->txn_rollback()
1459         if ($self->debug);
1460       $self->{transaction_depth} = 0
1461         if $self->_dbh_autocommit;
1462       $self->_dbh_rollback;
1463     }
1464     elsif($self->{transaction_depth} > 1) {
1465       $self->{transaction_depth}--;
1466       if ($self->auto_savepoint) {
1467         $self->svp_rollback;
1468         $self->svp_release;
1469       }
1470     }
1471     else {
1472       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1473     }
1474   }
1475   catch {
1476     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1477
1478     if ($_ !~ /$exception_class/) {
1479       # ensure that a failed rollback resets the transaction depth
1480       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1481     }
1482
1483     $self->throw_exception($_)
1484   };
1485 }
1486
1487 sub _dbh_rollback {
1488   my $self = shift;
1489   my $dbh  = $self->_dbh
1490     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1491   $dbh->rollback;
1492 }
1493
1494 # This used to be the top-half of _execute.  It was split out to make it
1495 #  easier to override in NoBindVars without duping the rest.  It takes up
1496 #  all of _execute's args, and emits $sql, @bind.
1497 sub _prep_for_execute {
1498   my ($self, $op, $extra_bind, $ident, $args) = @_;
1499
1500   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1501     $ident = $ident->from();
1502   }
1503
1504   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1505
1506   unshift(@bind,
1507     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1508       if $extra_bind;
1509   return ($sql, \@bind);
1510 }
1511
1512
1513 sub _fix_bind_params {
1514     my ($self, @bind) = @_;
1515
1516     ### Turn @bind from something like this:
1517     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1518     ### to this:
1519     ###   ( "'1'", "'1'", "'3'" )
1520     return
1521         map {
1522             if ( defined( $_ && $_->[1] ) ) {
1523                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1524             }
1525             else { q{NULL}; }
1526         } @bind;
1527 }
1528
1529 sub _query_start {
1530     my ( $self, $sql, @bind ) = @_;
1531
1532     if ( $self->debug ) {
1533         @bind = $self->_fix_bind_params(@bind);
1534
1535         $self->debugobj->query_start( $sql, @bind );
1536     }
1537 }
1538
1539 sub _query_end {
1540     my ( $self, $sql, @bind ) = @_;
1541
1542     if ( $self->debug ) {
1543         @bind = $self->_fix_bind_params(@bind);
1544         $self->debugobj->query_end( $sql, @bind );
1545     }
1546 }
1547
1548 sub _dbh_execute {
1549   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1550
1551   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1552
1553   $self->_query_start( $sql, @$bind );
1554
1555   my $sth = $self->sth($sql,$op);
1556
1557   my $placeholder_index = 1;
1558
1559   foreach my $bound (@$bind) {
1560     my $attributes = {};
1561     my($column_name, @data) = @$bound;
1562
1563     if ($bind_attributes) {
1564       $attributes = $bind_attributes->{$column_name}
1565       if defined $bind_attributes->{$column_name};
1566     }
1567
1568     foreach my $data (@data) {
1569       my $ref = ref $data;
1570       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1571
1572       $sth->bind_param($placeholder_index, $data, $attributes);
1573       $placeholder_index++;
1574     }
1575   }
1576
1577   # Can this fail without throwing an exception anyways???
1578   my $rv = $sth->execute();
1579   $self->throw_exception(
1580     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1581   ) if !$rv;
1582
1583   $self->_query_end( $sql, @$bind );
1584
1585   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1586 }
1587
1588 sub _execute {
1589     my $self = shift;
1590     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1591 }
1592
1593 sub insert {
1594   my ($self, $source, $to_insert) = @_;
1595
1596   my $colinfo = $source->columns_info;
1597
1598   # mix with auto-nextval marked values (a bit of a speed hit, but
1599   # no saner way to handle this yet)
1600   my $auto_nextvals = {} ;
1601   for my $col (keys %$colinfo) {
1602     if (
1603       $colinfo->{$col}{auto_nextval}
1604         and
1605       (
1606         ! exists $to_insert->{$col}
1607           or
1608         ref $to_insert->{$col} eq 'SCALAR'
1609       )
1610     ) {
1611       $auto_nextvals->{$col} = $self->_sequence_fetch(
1612         'nextval',
1613         ( $colinfo->{$col}{sequence} ||=
1614             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1615         ),
1616       );
1617     }
1618   }
1619
1620   # fuse the values
1621   $to_insert = { %$to_insert, %$auto_nextvals };
1622
1623   # list of primary keys we try to fetch from the database
1624   # both not-exsists and scalarrefs are considered
1625   my %fetch_pks;
1626   %fetch_pks = ( map
1627     { $_ => scalar keys %fetch_pks }  # so we can preserve order for prettyness
1628     grep
1629       { ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR' }
1630       $source->primary_columns
1631   );
1632
1633   my $sqla_opts;
1634   if ($self->_use_insert_returning) {
1635
1636     # retain order as declared in the resultsource
1637     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1638       push @{$sqla_opts->{returning}}, $_;
1639     }
1640   }
1641
1642   my $bind_attributes = $self->source_bind_attributes($source);
1643
1644   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1645
1646   my %returned_cols = %$auto_nextvals;
1647
1648   if (my $retlist = $sqla_opts->{returning}) {
1649     my @ret_vals = try {
1650       local $SIG{__WARN__} = sub {};
1651       my @r = $sth->fetchrow_array;
1652       $sth->finish;
1653       @r;
1654     };
1655
1656     @returned_cols{@$retlist} = @ret_vals if @ret_vals;
1657   }
1658
1659   return \%returned_cols;
1660 }
1661
1662
1663 ## Currently it is assumed that all values passed will be "normal", i.e. not
1664 ## scalar refs, or at least, all the same type as the first set, the statement is
1665 ## only prepped once.
1666 sub insert_bulk {
1667   my ($self, $source, $cols, $data) = @_;
1668
1669   my %colvalues;
1670   @colvalues{@$cols} = (0..$#$cols);
1671
1672   for my $i (0..$#$cols) {
1673     my $first_val = $data->[0][$i];
1674     next unless ref $first_val eq 'SCALAR';
1675
1676     $colvalues{ $cols->[$i] } = $first_val;
1677   }
1678
1679   # check for bad data and stringify stringifiable objects
1680   my $bad_slice = sub {
1681     my ($msg, $col_idx, $slice_idx) = @_;
1682     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1683       $msg,
1684       $cols->[$col_idx],
1685       do {
1686         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1687         Dumper {
1688           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1689         },
1690       }
1691     );
1692   };
1693
1694   for my $datum_idx (0..$#$data) {
1695     my $datum = $data->[$datum_idx];
1696
1697     for my $col_idx (0..$#$cols) {
1698       my $val            = $datum->[$col_idx];
1699       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1700       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1701
1702       if ($is_literal_sql) {
1703         if (not ref $val) {
1704           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1705         }
1706         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1707           $bad_slice->("$reftype reference found where literal SQL expected",
1708             $col_idx, $datum_idx);
1709         }
1710         elsif ($$val ne $$sqla_bind){
1711           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1712             $col_idx, $datum_idx);
1713         }
1714       }
1715       elsif (my $reftype = ref $val) {
1716         require overload;
1717         if (overload::Method($val, '""')) {
1718           $datum->[$col_idx] = "".$val;
1719         }
1720         else {
1721           $bad_slice->("$reftype reference found where bind expected",
1722             $col_idx, $datum_idx);
1723         }
1724       }
1725     }
1726   }
1727
1728   my ($sql, $bind) = $self->_prep_for_execute (
1729     'insert', undef, $source, [\%colvalues]
1730   );
1731
1732   if (! @$bind) {
1733     # if the bindlist is empty - make sure all "values" are in fact
1734     # literal scalarrefs. If not the case this means the storage ate
1735     # them away (e.g. the NoBindVars component) and interpolated them
1736     # directly into the SQL. This obviosly can't be good for multi-inserts
1737
1738     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1739       if first { ref $_ ne 'SCALAR' } values %colvalues;
1740   }
1741
1742   # neither _execute_array, nor _execute_inserts_with_no_binds are
1743   # atomic (even if _execute _array is a single call). Thus a safety
1744   # scope guard
1745   my $guard = $self->txn_scope_guard;
1746
1747   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1748   my $sth = $self->sth($sql);
1749   my $rv = do {
1750     if (@$bind) {
1751       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1752       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1753     }
1754     else {
1755       # bind_param_array doesn't work if there are no binds
1756       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1757     }
1758   };
1759
1760   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1761
1762   $guard->commit;
1763
1764   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1765 }
1766
1767 sub _execute_array {
1768   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1769
1770   ## This must be an arrayref, else nothing works!
1771   my $tuple_status = [];
1772
1773   ## Get the bind_attributes, if any exist
1774   my $bind_attributes = $self->source_bind_attributes($source);
1775
1776   ## Bind the values and execute
1777   my $placeholder_index = 1;
1778
1779   foreach my $bound (@$bind) {
1780
1781     my $attributes = {};
1782     my ($column_name, $data_index) = @$bound;
1783
1784     if( $bind_attributes ) {
1785       $attributes = $bind_attributes->{$column_name}
1786       if defined $bind_attributes->{$column_name};
1787     }
1788
1789     my @data = map { $_->[$data_index] } @$data;
1790
1791     $sth->bind_param_array(
1792       $placeholder_index,
1793       [@data],
1794       (%$attributes ?  $attributes : ()),
1795     );
1796     $placeholder_index++;
1797   }
1798
1799   my ($rv, $err);
1800   try {
1801     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1802   }
1803   catch {
1804     $err = shift;
1805   };
1806
1807   # Statement must finish even if there was an exception.
1808   try {
1809     $sth->finish
1810   }
1811   catch {
1812     $err = shift unless defined $err
1813   };
1814
1815   $err = $sth->errstr
1816     if (! defined $err and $sth->err);
1817
1818   if (defined $err) {
1819     my $i = 0;
1820     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1821
1822     $self->throw_exception("Unexpected populate error: $err")
1823       if ($i > $#$tuple_status);
1824
1825     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1826       ($tuple_status->[$i][1] || $err),
1827       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1828     );
1829   }
1830
1831   return $rv;
1832 }
1833
1834 sub _dbh_execute_array {
1835     my ($self, $sth, $tuple_status, @extra) = @_;
1836
1837     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1838 }
1839
1840 sub _dbh_execute_inserts_with_no_binds {
1841   my ($self, $sth, $count) = @_;
1842
1843   my $err;
1844   try {
1845     my $dbh = $self->_get_dbh;
1846     local $dbh->{RaiseError} = 1;
1847     local $dbh->{PrintError} = 0;
1848
1849     $sth->execute foreach 1..$count;
1850   }
1851   catch {
1852     $err = shift;
1853   };
1854
1855   # Make sure statement is finished even if there was an exception.
1856   try {
1857     $sth->finish
1858   }
1859   catch {
1860     $err = shift unless defined $err;
1861   };
1862
1863   $self->throw_exception($err) if defined $err;
1864
1865   return $count;
1866 }
1867
1868 sub update {
1869   my ($self, $source, @args) = @_;
1870
1871   my $bind_attrs = $self->source_bind_attributes($source);
1872
1873   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1874 }
1875
1876
1877 sub delete {
1878   my ($self, $source, @args) = @_;
1879
1880   my $bind_attrs = $self->source_bind_attributes($source);
1881
1882   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1883 }
1884
1885 # We were sent here because the $rs contains a complex search
1886 # which will require a subquery to select the correct rows
1887 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1888 #
1889 # Generating a single PK column subquery is trivial and supported
1890 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1891 # Look at _multipk_update_delete()
1892 sub _subq_update_delete {
1893   my $self = shift;
1894   my ($rs, $op, $values) = @_;
1895
1896   my $rsrc = $rs->result_source;
1897
1898   # quick check if we got a sane rs on our hands
1899   my @pcols = $rsrc->_pri_cols;
1900
1901   my $sel = $rs->_resolved_attrs->{select};
1902   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1903
1904   if (
1905       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1906         ne
1907       join ("\x00", sort @$sel )
1908   ) {
1909     $self->throw_exception (
1910       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1911     );
1912   }
1913
1914   if (@pcols == 1) {
1915     return $self->$op (
1916       $rsrc,
1917       $op eq 'update' ? $values : (),
1918       { $pcols[0] => { -in => $rs->as_query } },
1919     );
1920   }
1921
1922   else {
1923     return $self->_multipk_update_delete (@_);
1924   }
1925 }
1926
1927 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1928 # resultset update/delete involving subqueries. So by default resort
1929 # to simple (and inefficient) delete_all style per-row opearations,
1930 # while allowing specific storages to override this with a faster
1931 # implementation.
1932 #
1933 sub _multipk_update_delete {
1934   return shift->_per_row_update_delete (@_);
1935 }
1936
1937 # This is the default loop used to delete/update rows for multi PK
1938 # resultsets, and used by mysql exclusively (because it can't do anything
1939 # else).
1940 #
1941 # We do not use $row->$op style queries, because resultset update/delete
1942 # is not expected to cascade (this is what delete_all/update_all is for).
1943 #
1944 # There should be no race conditions as the entire operation is rolled
1945 # in a transaction.
1946 #
1947 sub _per_row_update_delete {
1948   my $self = shift;
1949   my ($rs, $op, $values) = @_;
1950
1951   my $rsrc = $rs->result_source;
1952   my @pcols = $rsrc->_pri_cols;
1953
1954   my $guard = $self->txn_scope_guard;
1955
1956   # emulate the return value of $sth->execute for non-selects
1957   my $row_cnt = '0E0';
1958
1959   my $subrs_cur = $rs->cursor;
1960   my @all_pk = $subrs_cur->all;
1961   for my $pks ( @all_pk) {
1962
1963     my $cond;
1964     for my $i (0.. $#pcols) {
1965       $cond->{$pcols[$i]} = $pks->[$i];
1966     }
1967
1968     $self->$op (
1969       $rsrc,
1970       $op eq 'update' ? $values : (),
1971       $cond,
1972     );
1973
1974     $row_cnt++;
1975   }
1976
1977   $guard->commit;
1978
1979   return $row_cnt;
1980 }
1981
1982 sub _select {
1983   my $self = shift;
1984   $self->_execute($self->_select_args(@_));
1985 }
1986
1987 sub _select_args_to_query {
1988   my $self = shift;
1989
1990   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1991   #  = $self->_select_args($ident, $select, $cond, $attrs);
1992   my ($op, $bind, $ident, $bind_attrs, @args) =
1993     $self->_select_args(@_);
1994
1995   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1996   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1997   $prepared_bind ||= [];
1998
1999   return wantarray
2000     ? ($sql, $prepared_bind, $bind_attrs)
2001     : \[ "($sql)", @$prepared_bind ]
2002   ;
2003 }
2004
2005 sub _select_args {
2006   my ($self, $ident, $select, $where, $attrs) = @_;
2007
2008   my $sql_maker = $self->sql_maker;
2009   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2010
2011   $attrs = {
2012     %$attrs,
2013     select => $select,
2014     from => $ident,
2015     where => $where,
2016     $rs_alias && $alias2source->{$rs_alias}
2017       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
2018       : ()
2019     ,
2020   };
2021
2022   # calculate bind_attrs before possible $ident mangling
2023   my $bind_attrs = {};
2024   for my $alias (keys %$alias2source) {
2025     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2026     for my $col (keys %$bindtypes) {
2027
2028       my $fqcn = join ('.', $alias, $col);
2029       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2030
2031       # Unqialified column names are nice, but at the same time can be
2032       # rather ambiguous. What we do here is basically go along with
2033       # the loop, adding an unqualified column slot to $bind_attrs,
2034       # alongside the fully qualified name. As soon as we encounter
2035       # another column by that name (which would imply another table)
2036       # we unset the unqualified slot and never add any info to it
2037       # to avoid erroneous type binding. If this happens the users
2038       # only choice will be to fully qualify his column name
2039
2040       if (exists $bind_attrs->{$col}) {
2041         $bind_attrs->{$col} = {};
2042       }
2043       else {
2044         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2045       }
2046     }
2047   }
2048
2049   # Sanity check the attributes (SQLMaker does it too, but
2050   # in case of a software_limit we'll never reach there)
2051   if (defined $attrs->{offset}) {
2052     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2053       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2054   }
2055   $attrs->{offset} ||= 0;
2056
2057   if (defined $attrs->{rows}) {
2058     $self->throw_exception("The rows attribute must be a positive integer if present")
2059       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2060   }
2061   elsif ($attrs->{offset}) {
2062     # MySQL actually recommends this approach.  I cringe.
2063     $attrs->{rows} = $sql_maker->__max_int;
2064   }
2065
2066   my @limit;
2067
2068   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2069   # storage, unless software limit was requested
2070   if (
2071     #limited has_many
2072     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2073        ||
2074     # grouped prefetch (to satisfy group_by == select)
2075     ( $attrs->{group_by}
2076         &&
2077       @{$attrs->{group_by}}
2078         &&
2079       $attrs->{_prefetch_select}
2080         &&
2081       @{$attrs->{_prefetch_select}}
2082     )
2083   ) {
2084     ($ident, $select, $where, $attrs)
2085       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2086   }
2087   elsif (! $attrs->{software_limit} ) {
2088     push @limit, $attrs->{rows}, $attrs->{offset};
2089   }
2090
2091   # try to simplify the joinmap further (prune unreferenced type-single joins)
2092   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2093
2094 ###
2095   # This would be the point to deflate anything found in $where
2096   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2097   # expect a row object. And all we have is a resultsource (it is trivial
2098   # to extract deflator coderefs via $alias2source above).
2099   #
2100   # I don't see a way forward other than changing the way deflators are
2101   # invoked, and that's just bad...
2102 ###
2103
2104   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2105 }
2106
2107 # Returns a counting SELECT for a simple count
2108 # query. Abstracted so that a storage could override
2109 # this to { count => 'firstcol' } or whatever makes
2110 # sense as a performance optimization
2111 sub _count_select {
2112   #my ($self, $source, $rs_attrs) = @_;
2113   return { count => '*' };
2114 }
2115
2116
2117 sub source_bind_attributes {
2118   my ($self, $source) = @_;
2119
2120   my $bind_attributes;
2121
2122   my $colinfo = $source->columns_info;
2123
2124   for my $col (keys %$colinfo) {
2125     if (my $dt = $colinfo->{$col}{data_type} ) {
2126       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2127     }
2128   }
2129
2130   return $bind_attributes;
2131 }
2132
2133 =head2 select
2134
2135 =over 4
2136
2137 =item Arguments: $ident, $select, $condition, $attrs
2138
2139 =back
2140
2141 Handle a SQL select statement.
2142
2143 =cut
2144
2145 sub select {
2146   my $self = shift;
2147   my ($ident, $select, $condition, $attrs) = @_;
2148   return $self->cursor_class->new($self, \@_, $attrs);
2149 }
2150
2151 sub select_single {
2152   my $self = shift;
2153   my ($rv, $sth, @bind) = $self->_select(@_);
2154   my @row = $sth->fetchrow_array;
2155   my @nextrow = $sth->fetchrow_array if @row;
2156   if(@row && @nextrow) {
2157     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2158   }
2159   # Need to call finish() to work round broken DBDs
2160   $sth->finish();
2161   return @row;
2162 }
2163
2164 =head2 sql_limit_dialect
2165
2166 This is an accessor for the default SQL limit dialect used by a particular
2167 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2168 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2169 see L<DBIx::Class::SQLMaker::LimitDialects>.
2170
2171 =head2 sth
2172
2173 =over 4
2174
2175 =item Arguments: $sql
2176
2177 =back
2178
2179 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2180
2181 =cut
2182
2183 sub _dbh_sth {
2184   my ($self, $dbh, $sql) = @_;
2185
2186   # 3 is the if_active parameter which avoids active sth re-use
2187   my $sth = $self->disable_sth_caching
2188     ? $dbh->prepare($sql)
2189     : $dbh->prepare_cached($sql, {}, 3);
2190
2191   # XXX You would think RaiseError would make this impossible,
2192   #  but apparently that's not true :(
2193   $self->throw_exception($dbh->errstr) if !$sth;
2194
2195   $sth;
2196 }
2197
2198 sub sth {
2199   my ($self, $sql) = @_;
2200   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2201 }
2202
2203 sub _dbh_columns_info_for {
2204   my ($self, $dbh, $table) = @_;
2205
2206   if ($dbh->can('column_info')) {
2207     my %result;
2208     my $caught;
2209     try {
2210       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2211       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2212       $sth->execute();
2213       while ( my $info = $sth->fetchrow_hashref() ){
2214         my %column_info;
2215         $column_info{data_type}   = $info->{TYPE_NAME};
2216         $column_info{size}      = $info->{COLUMN_SIZE};
2217         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2218         $column_info{default_value} = $info->{COLUMN_DEF};
2219         my $col_name = $info->{COLUMN_NAME};
2220         $col_name =~ s/^\"(.*)\"$/$1/;
2221
2222         $result{$col_name} = \%column_info;
2223       }
2224     } catch {
2225       $caught = 1;
2226     };
2227     return \%result if !$caught && scalar keys %result;
2228   }
2229
2230   my %result;
2231   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2232   $sth->execute;
2233   my @columns = @{$sth->{NAME_lc}};
2234   for my $i ( 0 .. $#columns ){
2235     my %column_info;
2236     $column_info{data_type} = $sth->{TYPE}->[$i];
2237     $column_info{size} = $sth->{PRECISION}->[$i];
2238     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2239
2240     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2241       $column_info{data_type} = $1;
2242       $column_info{size}    = $2;
2243     }
2244
2245     $result{$columns[$i]} = \%column_info;
2246   }
2247   $sth->finish;
2248
2249   foreach my $col (keys %result) {
2250     my $colinfo = $result{$col};
2251     my $type_num = $colinfo->{data_type};
2252     my $type_name;
2253     if(defined $type_num && $dbh->can('type_info')) {
2254       my $type_info = $dbh->type_info($type_num);
2255       $type_name = $type_info->{TYPE_NAME} if $type_info;
2256       $colinfo->{data_type} = $type_name if $type_name;
2257     }
2258   }
2259
2260   return \%result;
2261 }
2262
2263 sub columns_info_for {
2264   my ($self, $table) = @_;
2265   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2266 }
2267
2268 =head2 last_insert_id
2269
2270 Return the row id of the last insert.
2271
2272 =cut
2273
2274 sub _dbh_last_insert_id {
2275     my ($self, $dbh, $source, $col) = @_;
2276
2277     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2278
2279     return $id if defined $id;
2280
2281     my $class = ref $self;
2282     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2283 }
2284
2285 sub last_insert_id {
2286   my $self = shift;
2287   $self->_dbh_last_insert_id ($self->_dbh, @_);
2288 }
2289
2290 =head2 _native_data_type
2291
2292 =over 4
2293
2294 =item Arguments: $type_name
2295
2296 =back
2297
2298 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2299 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2300 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2301
2302 The default implementation returns C<undef>, implement in your Storage driver if
2303 you need this functionality.
2304
2305 Should map types from other databases to the native RDBMS type, for example
2306 C<VARCHAR2> to C<VARCHAR>.
2307
2308 Types with modifiers should map to the underlying data type. For example,
2309 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2310
2311 Composite types should map to the container type, for example
2312 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2313
2314 =cut
2315
2316 sub _native_data_type {
2317   #my ($self, $data_type) = @_;
2318   return undef
2319 }
2320
2321 # Check if placeholders are supported at all
2322 sub _determine_supports_placeholders {
2323   my $self = shift;
2324   my $dbh  = $self->_get_dbh;
2325
2326   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2327   # but it is inaccurate more often than not
2328   return try {
2329     local $dbh->{PrintError} = 0;
2330     local $dbh->{RaiseError} = 1;
2331     $dbh->do('select ?', {}, 1);
2332     1;
2333   }
2334   catch {
2335     0;
2336   };
2337 }
2338
2339 # Check if placeholders bound to non-string types throw exceptions
2340 #
2341 sub _determine_supports_typeless_placeholders {
2342   my $self = shift;
2343   my $dbh  = $self->_get_dbh;
2344
2345   return try {
2346     local $dbh->{PrintError} = 0;
2347     local $dbh->{RaiseError} = 1;
2348     # this specifically tests a bind that is NOT a string
2349     $dbh->do('select 1 where 1 = ?', {}, 1);
2350     1;
2351   }
2352   catch {
2353     0;
2354   };
2355 }
2356
2357 =head2 sqlt_type
2358
2359 Returns the database driver name.
2360
2361 =cut
2362
2363 sub sqlt_type {
2364   shift->_get_dbh->{Driver}->{Name};
2365 }
2366
2367 =head2 bind_attribute_by_data_type
2368
2369 Given a datatype from column info, returns a database specific bind
2370 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2371 let the database planner just handle it.
2372
2373 Generally only needed for special case column types, like bytea in postgres.
2374
2375 =cut
2376
2377 sub bind_attribute_by_data_type {
2378     return;
2379 }
2380
2381 =head2 is_datatype_numeric
2382
2383 Given a datatype from column_info, returns a boolean value indicating if
2384 the current RDBMS considers it a numeric value. This controls how
2385 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2386 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2387 be performed instead of the usual C<eq>.
2388
2389 =cut
2390
2391 sub is_datatype_numeric {
2392   my ($self, $dt) = @_;
2393
2394   return 0 unless $dt;
2395
2396   return $dt =~ /^ (?:
2397     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2398   ) $/ix;
2399 }
2400
2401
2402 =head2 create_ddl_dir
2403
2404 =over 4
2405
2406 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2407
2408 =back
2409
2410 Creates a SQL file based on the Schema, for each of the specified
2411 database engines in C<\@databases> in the given directory.
2412 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2413
2414 Given a previous version number, this will also create a file containing
2415 the ALTER TABLE statements to transform the previous schema into the
2416 current one. Note that these statements may contain C<DROP TABLE> or
2417 C<DROP COLUMN> statements that can potentially destroy data.
2418
2419 The file names are created using the C<ddl_filename> method below, please
2420 override this method in your schema if you would like a different file
2421 name format. For the ALTER file, the same format is used, replacing
2422 $version in the name with "$preversion-$version".
2423
2424 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2425 The most common value for this would be C<< { add_drop_table => 1 } >>
2426 to have the SQL produced include a C<DROP TABLE> statement for each table
2427 created. For quoting purposes supply C<quote_table_names> and
2428 C<quote_field_names>.
2429
2430 If no arguments are passed, then the following default values are assumed:
2431
2432 =over 4
2433
2434 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2435
2436 =item version    - $schema->schema_version
2437
2438 =item directory  - './'
2439
2440 =item preversion - <none>
2441
2442 =back
2443
2444 By default, C<\%sqlt_args> will have
2445
2446  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2447
2448 merged with the hash passed in. To disable any of those features, pass in a
2449 hashref like the following
2450
2451  { ignore_constraint_names => 0, # ... other options }
2452
2453
2454 WARNING: You are strongly advised to check all SQL files created, before applying
2455 them.
2456
2457 =cut
2458
2459 sub create_ddl_dir {
2460   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2461
2462   unless ($dir) {
2463     carp "No directory given, using ./\n";
2464     $dir = './';
2465   } else {
2466       -d $dir
2467         or
2468       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2469         or
2470       $self->throw_exception(
2471         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2472       );
2473   }
2474
2475   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2476
2477   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2478   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2479
2480   my $schema_version = $schema->schema_version || '1.x';
2481   $version ||= $schema_version;
2482
2483   $sqltargs = {
2484     add_drop_table => 1,
2485     ignore_constraint_names => 1,
2486     ignore_index_names => 1,
2487     %{$sqltargs || {}}
2488   };
2489
2490   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2491     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2492   }
2493
2494   my $sqlt = SQL::Translator->new( $sqltargs );
2495
2496   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2497   my $sqlt_schema = $sqlt->translate({ data => $schema })
2498     or $self->throw_exception ($sqlt->error);
2499
2500   foreach my $db (@$databases) {
2501     $sqlt->reset();
2502     $sqlt->{schema} = $sqlt_schema;
2503     $sqlt->producer($db);
2504
2505     my $file;
2506     my $filename = $schema->ddl_filename($db, $version, $dir);
2507     if (-e $filename && ($version eq $schema_version )) {
2508       # if we are dumping the current version, overwrite the DDL
2509       carp "Overwriting existing DDL file - $filename";
2510       unlink($filename);
2511     }
2512
2513     my $output = $sqlt->translate;
2514     if(!$output) {
2515       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2516       next;
2517     }
2518     if(!open($file, ">$filename")) {
2519       $self->throw_exception("Can't open $filename for writing ($!)");
2520       next;
2521     }
2522     print $file $output;
2523     close($file);
2524
2525     next unless ($preversion);
2526
2527     require SQL::Translator::Diff;
2528
2529     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2530     if(!-e $prefilename) {
2531       carp("No previous schema file found ($prefilename)");
2532       next;
2533     }
2534
2535     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2536     if(-e $difffile) {
2537       carp("Overwriting existing diff file - $difffile");
2538       unlink($difffile);
2539     }
2540
2541     my $source_schema;
2542     {
2543       my $t = SQL::Translator->new($sqltargs);
2544       $t->debug( 0 );
2545       $t->trace( 0 );
2546
2547       $t->parser( $db )
2548         or $self->throw_exception ($t->error);
2549
2550       my $out = $t->translate( $prefilename )
2551         or $self->throw_exception ($t->error);
2552
2553       $source_schema = $t->schema;
2554
2555       $source_schema->name( $prefilename )
2556         unless ( $source_schema->name );
2557     }
2558
2559     # The "new" style of producers have sane normalization and can support
2560     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2561     # And we have to diff parsed SQL against parsed SQL.
2562     my $dest_schema = $sqlt_schema;
2563
2564     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2565       my $t = SQL::Translator->new($sqltargs);
2566       $t->debug( 0 );
2567       $t->trace( 0 );
2568
2569       $t->parser( $db )
2570         or $self->throw_exception ($t->error);
2571
2572       my $out = $t->translate( $filename )
2573         or $self->throw_exception ($t->error);
2574
2575       $dest_schema = $t->schema;
2576
2577       $dest_schema->name( $filename )
2578         unless $dest_schema->name;
2579     }
2580
2581     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2582                                                   $dest_schema,   $db,
2583                                                   $sqltargs
2584                                                  );
2585     if(!open $file, ">$difffile") {
2586       $self->throw_exception("Can't write to $difffile ($!)");
2587       next;
2588     }
2589     print $file $diff;
2590     close($file);
2591   }
2592 }
2593
2594 =head2 deployment_statements
2595
2596 =over 4
2597
2598 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2599
2600 =back
2601
2602 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2603
2604 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2605 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2606
2607 C<$directory> is used to return statements from files in a previously created
2608 L</create_ddl_dir> directory and is optional. The filenames are constructed
2609 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2610
2611 If no C<$directory> is specified then the statements are constructed on the
2612 fly using L<SQL::Translator> and C<$version> is ignored.
2613
2614 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2615
2616 =cut
2617
2618 sub deployment_statements {
2619   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2620   $type ||= $self->sqlt_type;
2621   $version ||= $schema->schema_version || '1.x';
2622   $dir ||= './';
2623   my $filename = $schema->ddl_filename($type, $version, $dir);
2624   if(-f $filename)
2625   {
2626       my $file;
2627       open($file, "<$filename")
2628         or $self->throw_exception("Can't open $filename ($!)");
2629       my @rows = <$file>;
2630       close($file);
2631       return join('', @rows);
2632   }
2633
2634   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2635     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2636   }
2637
2638   # sources needs to be a parser arg, but for simplicty allow at top level
2639   # coming in
2640   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2641       if exists $sqltargs->{sources};
2642
2643   my $tr = SQL::Translator->new(
2644     producer => "SQL::Translator::Producer::${type}",
2645     %$sqltargs,
2646     parser => 'SQL::Translator::Parser::DBIx::Class',
2647     data => $schema,
2648   );
2649
2650   my @ret;
2651   my $wa = wantarray;
2652   if ($wa) {
2653     @ret = $tr->translate;
2654   }
2655   else {
2656     $ret[0] = $tr->translate;
2657   }
2658
2659   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2660     unless (@ret && defined $ret[0]);
2661
2662   return $wa ? @ret : $ret[0];
2663 }
2664
2665 sub deploy {
2666   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2667   my $deploy = sub {
2668     my $line = shift;
2669     return if($line =~ /^--/);
2670     return if(!$line);
2671     # next if($line =~ /^DROP/m);
2672     return if($line =~ /^BEGIN TRANSACTION/m);
2673     return if($line =~ /^COMMIT/m);
2674     return if $line =~ /^\s+$/; # skip whitespace only
2675     $self->_query_start($line);
2676     try {
2677       # do a dbh_do cycle here, as we need some error checking in
2678       # place (even though we will ignore errors)
2679       $self->dbh_do (sub { $_[1]->do($line) });
2680     } catch {
2681       carp qq{$_ (running "${line}")};
2682     };
2683     $self->_query_end($line);
2684   };
2685   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2686   if (@statements > 1) {
2687     foreach my $statement (@statements) {
2688       $deploy->( $statement );
2689     }
2690   }
2691   elsif (@statements == 1) {
2692     foreach my $line ( split(";\n", $statements[0])) {
2693       $deploy->( $line );
2694     }
2695   }
2696 }
2697
2698 =head2 datetime_parser
2699
2700 Returns the datetime parser class
2701
2702 =cut
2703
2704 sub datetime_parser {
2705   my $self = shift;
2706   return $self->{datetime_parser} ||= do {
2707     $self->build_datetime_parser(@_);
2708   };
2709 }
2710
2711 =head2 datetime_parser_type
2712
2713 Defines (returns) the datetime parser class - currently hardwired to
2714 L<DateTime::Format::MySQL>
2715
2716 =cut
2717
2718 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2719
2720 =head2 build_datetime_parser
2721
2722 See L</datetime_parser>
2723
2724 =cut
2725
2726 sub build_datetime_parser {
2727   my $self = shift;
2728   my $type = $self->datetime_parser_type(@_);
2729   $self->ensure_class_loaded ($type);
2730   return $type;
2731 }
2732
2733
2734 =head2 is_replicating
2735
2736 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2737 replicate from a master database.  Default is undef, which is the result
2738 returned by databases that don't support replication.
2739
2740 =cut
2741
2742 sub is_replicating {
2743     return;
2744
2745 }
2746
2747 =head2 lag_behind_master
2748
2749 Returns a number that represents a certain amount of lag behind a master db
2750 when a given storage is replicating.  The number is database dependent, but
2751 starts at zero and increases with the amount of lag. Default in undef
2752
2753 =cut
2754
2755 sub lag_behind_master {
2756     return;
2757 }
2758
2759 =head2 relname_to_table_alias
2760
2761 =over 4
2762
2763 =item Arguments: $relname, $join_count
2764
2765 =back
2766
2767 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2768 queries.
2769
2770 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2771 way these aliases are named.
2772
2773 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2774 otherwise C<"$relname">.
2775
2776 =cut
2777
2778 sub relname_to_table_alias {
2779   my ($self, $relname, $join_count) = @_;
2780
2781   my $alias = ($join_count && $join_count > 1 ?
2782     join('_', $relname, $join_count) : $relname);
2783
2784   return $alias;
2785 }
2786
2787 1;
2788
2789 =head1 USAGE NOTES
2790
2791 =head2 DBIx::Class and AutoCommit
2792
2793 DBIx::Class can do some wonderful magic with handling exceptions,
2794 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2795 (the default) combined with C<txn_do> for transaction support.
2796
2797 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2798 in an assumed transaction between commits, and you're telling us you'd
2799 like to manage that manually.  A lot of the magic protections offered by
2800 this module will go away.  We can't protect you from exceptions due to database
2801 disconnects because we don't know anything about how to restart your
2802 transactions.  You're on your own for handling all sorts of exceptional
2803 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2804 be with raw DBI.
2805
2806
2807 =head1 AUTHORS
2808
2809 Matt S. Trout <mst@shadowcatsystems.co.uk>
2810
2811 Andy Grundman <andy@hybridized.org>
2812
2813 =head1 LICENSE
2814
2815 You may distribute this code under the same terms as Perl itself.
2816
2817 =cut