Unqualify imported functions
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders join_optimizer/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
60
61 # on by default, not strictly a capability (pending rewrite)
62 __PACKAGE__->_use_join_optimizer (1);
63 sub _determine_supports_join_optimizer { 1 };
64
65 # Each of these methods need _determine_driver called before itself
66 # in order to function reliably. This is a purely DRY optimization
67 #
68 # get_(use)_dbms_capability need to be called on the correct Storage
69 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
70 # _determine_supports_X which obv. needs a correct driver as well
71 my @rdbms_specific_methods = qw/
72   deployment_statements
73   sqlt_type
74   sql_maker
75   build_datetime_parser
76   datetime_parser_type
77
78   insert
79   insert_bulk
80   update
81   delete
82   select
83   select_single
84
85   get_use_dbms_capability
86   get_dbms_capability
87
88   _server_info
89   _get_server_version
90 /;
91
92 for my $meth (@rdbms_specific_methods) {
93
94   my $orig = __PACKAGE__->can ($meth)
95     or die "$meth is not a ::Storage::DBI method!";
96
97   no strict qw/refs/;
98   no warnings qw/redefine/;
99   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
100     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
101       $_[0]->_determine_driver;
102
103       # This for some reason crashes and burns on perl 5.8.1
104       # IFF the method ends up throwing an exception
105       #goto $_[0]->can ($meth);
106
107       my $cref = $_[0]->can ($meth);
108       goto $cref;
109     }
110     goto $orig;
111   };
112 }
113
114
115 =head1 NAME
116
117 DBIx::Class::Storage::DBI - DBI storage handler
118
119 =head1 SYNOPSIS
120
121   my $schema = MySchema->connect('dbi:SQLite:my.db');
122
123   $schema->storage->debug(1);
124
125   my @stuff = $schema->storage->dbh_do(
126     sub {
127       my ($storage, $dbh, @args) = @_;
128       $dbh->do("DROP TABLE authors");
129     },
130     @column_list
131   );
132
133   $schema->resultset('Book')->search({
134      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
135   });
136
137 =head1 DESCRIPTION
138
139 This class represents the connection to an RDBMS via L<DBI>.  See
140 L<DBIx::Class::Storage> for general information.  This pod only
141 documents DBI-specific methods and behaviors.
142
143 =head1 METHODS
144
145 =cut
146
147 sub new {
148   my $new = shift->next::method(@_);
149
150   $new->transaction_depth(0);
151   $new->_sql_maker_opts({});
152   $new->_dbh_details({});
153   $new->{savepoints} = [];
154   $new->{_in_dbh_do} = 0;
155   $new->{_dbh_gen} = 0;
156
157   # read below to see what this does
158   $new->_arm_global_destructor;
159
160   $new;
161 }
162
163 # This is hack to work around perl shooting stuff in random
164 # order on exit(). If we do not walk the remaining storage
165 # objects in an END block, there is a *small but real* chance
166 # of a fork()ed child to kill the parent's shared DBI handle,
167 # *before perl reaches the DESTROY in this package*
168 # Yes, it is ugly and effective.
169 {
170   my %seek_and_destroy;
171
172   sub _arm_global_destructor {
173     my $self = shift;
174     my $key = refaddr ($self);
175     $seek_and_destroy{$key} = $self;
176     weaken ($seek_and_destroy{$key});
177   }
178
179   END {
180     local $?; # just in case the DBI destructor changes it somehow
181
182     # destroy just the object if not native to this process/thread
183     $_->_preserve_foreign_dbh for (grep
184       { defined $_ }
185       values %seek_and_destroy
186     );
187   }
188 }
189
190 sub DESTROY {
191   my $self = shift;
192
193   # some databases spew warnings on implicit disconnect
194   local $SIG{__WARN__} = sub {};
195   $self->_dbh(undef);
196 }
197
198 sub _preserve_foreign_dbh {
199   my $self = shift;
200
201   return unless $self->_dbh;
202
203   $self->_verify_tid;
204
205   return unless $self->_dbh;
206
207   $self->_verify_pid;
208
209 }
210
211 # handle pid changes correctly - do not destroy parent's connection
212 sub _verify_pid {
213   my $self = shift;
214
215   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
216
217   $self->_dbh->{InactiveDestroy} = 1;
218   $self->_dbh(undef);
219   $self->{_dbh_gen}++;
220
221   return;
222 }
223
224 # very similar to above, but seems to FAIL if I set InactiveDestroy
225 sub _verify_tid {
226   my $self = shift;
227
228   if ( ! defined $self->_conn_tid ) {
229     return; # no threads
230   }
231   elsif ( $self->_conn_tid == threads->tid ) {
232     return; # same thread
233   }
234
235   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
236   $self->_dbh(undef);
237   $self->{_dbh_gen}++;
238
239   return;
240 }
241
242
243 =head2 connect_info
244
245 This method is normally called by L<DBIx::Class::Schema/connection>, which
246 encapsulates its argument list in an arrayref before passing them here.
247
248 The argument list may contain:
249
250 =over
251
252 =item *
253
254 The same 4-element argument set one would normally pass to
255 L<DBI/connect>, optionally followed by
256 L<extra attributes|/DBIx::Class specific connection attributes>
257 recognized by DBIx::Class:
258
259   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
260
261 =item *
262
263 A single code reference which returns a connected
264 L<DBI database handle|DBI/connect> optionally followed by
265 L<extra attributes|/DBIx::Class specific connection attributes> recognized
266 by DBIx::Class:
267
268   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
269
270 =item *
271
272 A single hashref with all the attributes and the dsn/user/password
273 mixed together:
274
275   $connect_info_args = [{
276     dsn => $dsn,
277     user => $user,
278     password => $pass,
279     %dbi_attributes,
280     %extra_attributes,
281   }];
282
283   $connect_info_args = [{
284     dbh_maker => sub { DBI->connect (...) },
285     %dbi_attributes,
286     %extra_attributes,
287   }];
288
289 This is particularly useful for L<Catalyst> based applications, allowing the
290 following config (L<Config::General> style):
291
292   <Model::DB>
293     schema_class   App::DB
294     <connect_info>
295       dsn          dbi:mysql:database=test
296       user         testuser
297       password     TestPass
298       AutoCommit   1
299     </connect_info>
300   </Model::DB>
301
302 The C<dsn>/C<user>/C<password> combination can be substituted by the
303 C<dbh_maker> key whose value is a coderef that returns a connected
304 L<DBI database handle|DBI/connect>
305
306 =back
307
308 Please note that the L<DBI> docs recommend that you always explicitly
309 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
310 recommends that it be set to I<1>, and that you perform transactions
311 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
312 to I<1> if you do not do explicitly set it to zero.  This is the default
313 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
314
315 =head3 DBIx::Class specific connection attributes
316
317 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
318 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
319 the following connection options. These options can be mixed in with your other
320 L<DBI> connection attributes, or placed in a separate hashref
321 (C<\%extra_attributes>) as shown above.
322
323 Every time C<connect_info> is invoked, any previous settings for
324 these options will be cleared before setting the new ones, regardless of
325 whether any options are specified in the new C<connect_info>.
326
327
328 =over
329
330 =item on_connect_do
331
332 Specifies things to do immediately after connecting or re-connecting to
333 the database.  Its value may contain:
334
335 =over
336
337 =item a scalar
338
339 This contains one SQL statement to execute.
340
341 =item an array reference
342
343 This contains SQL statements to execute in order.  Each element contains
344 a string or a code reference that returns a string.
345
346 =item a code reference
347
348 This contains some code to execute.  Unlike code references within an
349 array reference, its return value is ignored.
350
351 =back
352
353 =item on_disconnect_do
354
355 Takes arguments in the same form as L</on_connect_do> and executes them
356 immediately before disconnecting from the database.
357
358 Note, this only runs if you explicitly call L</disconnect> on the
359 storage object.
360
361 =item on_connect_call
362
363 A more generalized form of L</on_connect_do> that calls the specified
364 C<connect_call_METHOD> methods in your storage driver.
365
366   on_connect_do => 'select 1'
367
368 is equivalent to:
369
370   on_connect_call => [ [ do_sql => 'select 1' ] ]
371
372 Its values may contain:
373
374 =over
375
376 =item a scalar
377
378 Will call the C<connect_call_METHOD> method.
379
380 =item a code reference
381
382 Will execute C<< $code->($storage) >>
383
384 =item an array reference
385
386 Each value can be a method name or code reference.
387
388 =item an array of arrays
389
390 For each array, the first item is taken to be the C<connect_call_> method name
391 or code reference, and the rest are parameters to it.
392
393 =back
394
395 Some predefined storage methods you may use:
396
397 =over
398
399 =item do_sql
400
401 Executes a SQL string or a code reference that returns a SQL string. This is
402 what L</on_connect_do> and L</on_disconnect_do> use.
403
404 It can take:
405
406 =over
407
408 =item a scalar
409
410 Will execute the scalar as SQL.
411
412 =item an arrayref
413
414 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
415 attributes hashref and bind values.
416
417 =item a code reference
418
419 Will execute C<< $code->($storage) >> and execute the return array refs as
420 above.
421
422 =back
423
424 =item datetime_setup
425
426 Execute any statements necessary to initialize the database session to return
427 and accept datetime/timestamp values used with
428 L<DBIx::Class::InflateColumn::DateTime>.
429
430 Only necessary for some databases, see your specific storage driver for
431 implementation details.
432
433 =back
434
435 =item on_disconnect_call
436
437 Takes arguments in the same form as L</on_connect_call> and executes them
438 immediately before disconnecting from the database.
439
440 Calls the C<disconnect_call_METHOD> methods as opposed to the
441 C<connect_call_METHOD> methods called by L</on_connect_call>.
442
443 Note, this only runs if you explicitly call L</disconnect> on the
444 storage object.
445
446 =item disable_sth_caching
447
448 If set to a true value, this option will disable the caching of
449 statement handles via L<DBI/prepare_cached>.
450
451 =item limit_dialect
452
453 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
454 default L</sql_limit_dialect> setting of the storage (if any). For a list
455 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
456
457 =item quote_char
458
459 Specifies what characters to use to quote table and column names.
460
461 C<quote_char> expects either a single character, in which case is it
462 is placed on either side of the table/column name, or an arrayref of length
463 2 in which case the table/column name is placed between the elements.
464
465 For example under MySQL you should use C<< quote_char => '`' >>, and for
466 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
467
468 =item name_sep
469
470 This parameter is only useful in conjunction with C<quote_char>, and is used to
471 specify the character that separates elements (schemas, tables, columns) from
472 each other. If unspecified it defaults to the most commonly used C<.>.
473
474 =item unsafe
475
476 This Storage driver normally installs its own C<HandleError>, sets
477 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
478 all database handles, including those supplied by a coderef.  It does this
479 so that it can have consistent and useful error behavior.
480
481 If you set this option to a true value, Storage will not do its usual
482 modifications to the database handle's attributes, and instead relies on
483 the settings in your connect_info DBI options (or the values you set in
484 your connection coderef, in the case that you are connecting via coderef).
485
486 Note that your custom settings can cause Storage to malfunction,
487 especially if you set a C<HandleError> handler that suppresses exceptions
488 and/or disable C<RaiseError>.
489
490 =item auto_savepoint
491
492 If this option is true, L<DBIx::Class> will use savepoints when nesting
493 transactions, making it possible to recover from failure in the inner
494 transaction without having to abort all outer transactions.
495
496 =item cursor_class
497
498 Use this argument to supply a cursor class other than the default
499 L<DBIx::Class::Storage::DBI::Cursor>.
500
501 =back
502
503 Some real-life examples of arguments to L</connect_info> and
504 L<DBIx::Class::Schema/connect>
505
506   # Simple SQLite connection
507   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
508
509   # Connect via subref
510   ->connect_info([ sub { DBI->connect(...) } ]);
511
512   # Connect via subref in hashref
513   ->connect_info([{
514     dbh_maker => sub { DBI->connect(...) },
515     on_connect_do => 'alter session ...',
516   }]);
517
518   # A bit more complicated
519   ->connect_info(
520     [
521       'dbi:Pg:dbname=foo',
522       'postgres',
523       'my_pg_password',
524       { AutoCommit => 1 },
525       { quote_char => q{"} },
526     ]
527   );
528
529   # Equivalent to the previous example
530   ->connect_info(
531     [
532       'dbi:Pg:dbname=foo',
533       'postgres',
534       'my_pg_password',
535       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
536     ]
537   );
538
539   # Same, but with hashref as argument
540   # See parse_connect_info for explanation
541   ->connect_info(
542     [{
543       dsn         => 'dbi:Pg:dbname=foo',
544       user        => 'postgres',
545       password    => 'my_pg_password',
546       AutoCommit  => 1,
547       quote_char  => q{"},
548       name_sep    => q{.},
549     }]
550   );
551
552   # Subref + DBIx::Class-specific connection options
553   ->connect_info(
554     [
555       sub { DBI->connect(...) },
556       {
557           quote_char => q{`},
558           name_sep => q{@},
559           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
560           disable_sth_caching => 1,
561       },
562     ]
563   );
564
565
566
567 =cut
568
569 sub connect_info {
570   my ($self, $info) = @_;
571
572   return $self->_connect_info if !$info;
573
574   $self->_connect_info($info); # copy for _connect_info
575
576   $info = $self->_normalize_connect_info($info)
577     if ref $info eq 'ARRAY';
578
579   for my $storage_opt (keys %{ $info->{storage_options} }) {
580     my $value = $info->{storage_options}{$storage_opt};
581
582     $self->$storage_opt($value);
583   }
584
585   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
586   #  the new set of options
587   $self->_sql_maker(undef);
588   $self->_sql_maker_opts({});
589
590   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
591     my $value = $info->{sql_maker_options}{$sql_maker_opt};
592
593     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
594   }
595
596   my %attrs = (
597     %{ $self->_default_dbi_connect_attributes || {} },
598     %{ $info->{attributes} || {} },
599   );
600
601   my @args = @{ $info->{arguments} };
602
603   $self->_dbi_connect_info([@args,
604     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
605
606   # FIXME - dirty:
607   # save attributes them in a separate accessor so they are always
608   # introspectable, even in case of a CODE $dbhmaker
609   $self->_dbic_connect_attributes (\%attrs);
610
611   return $self->_connect_info;
612 }
613
614 sub _normalize_connect_info {
615   my ($self, $info_arg) = @_;
616   my %info;
617
618   my @args = @$info_arg;  # take a shallow copy for further mutilation
619
620   # combine/pre-parse arguments depending on invocation style
621
622   my %attrs;
623   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
624     %attrs = %{ $args[1] || {} };
625     @args = $args[0];
626   }
627   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
628     %attrs = %{$args[0]};
629     @args = ();
630     if (my $code = delete $attrs{dbh_maker}) {
631       @args = $code;
632
633       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
634       if (@ignored) {
635         carp sprintf (
636             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
637           . "to the result of 'dbh_maker'",
638
639           join (', ', map { "'$_'" } (@ignored) ),
640         );
641       }
642     }
643     else {
644       @args = delete @attrs{qw/dsn user password/};
645     }
646   }
647   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
648     %attrs = (
649       % { $args[3] || {} },
650       % { $args[4] || {} },
651     );
652     @args = @args[0,1,2];
653   }
654
655   $info{arguments} = \@args;
656
657   my @storage_opts = grep exists $attrs{$_},
658     @storage_options, 'cursor_class';
659
660   @{ $info{storage_options} }{@storage_opts} =
661     delete @attrs{@storage_opts} if @storage_opts;
662
663   my @sql_maker_opts = grep exists $attrs{$_},
664     qw/limit_dialect quote_char name_sep/;
665
666   @{ $info{sql_maker_options} }{@sql_maker_opts} =
667     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
668
669   $info{attributes} = \%attrs if %attrs;
670
671   return \%info;
672 }
673
674 sub _default_dbi_connect_attributes {
675   return {
676     AutoCommit => 1,
677     RaiseError => 1,
678     PrintError => 0,
679   };
680 }
681
682 =head2 on_connect_do
683
684 This method is deprecated in favour of setting via L</connect_info>.
685
686 =cut
687
688 =head2 on_disconnect_do
689
690 This method is deprecated in favour of setting via L</connect_info>.
691
692 =cut
693
694 sub _parse_connect_do {
695   my ($self, $type) = @_;
696
697   my $val = $self->$type;
698   return () if not defined $val;
699
700   my @res;
701
702   if (not ref($val)) {
703     push @res, [ 'do_sql', $val ];
704   } elsif (ref($val) eq 'CODE') {
705     push @res, $val;
706   } elsif (ref($val) eq 'ARRAY') {
707     push @res, map { [ 'do_sql', $_ ] } @$val;
708   } else {
709     $self->throw_exception("Invalid type for $type: ".ref($val));
710   }
711
712   return \@res;
713 }
714
715 =head2 dbh_do
716
717 Arguments: ($subref | $method_name), @extra_coderef_args?
718
719 Execute the given $subref or $method_name using the new exception-based
720 connection management.
721
722 The first two arguments will be the storage object that C<dbh_do> was called
723 on and a database handle to use.  Any additional arguments will be passed
724 verbatim to the called subref as arguments 2 and onwards.
725
726 Using this (instead of $self->_dbh or $self->dbh) ensures correct
727 exception handling and reconnection (or failover in future subclasses).
728
729 Your subref should have no side-effects outside of the database, as
730 there is the potential for your subref to be partially double-executed
731 if the database connection was stale/dysfunctional.
732
733 Example:
734
735   my @stuff = $schema->storage->dbh_do(
736     sub {
737       my ($storage, $dbh, @cols) = @_;
738       my $cols = join(q{, }, @cols);
739       $dbh->selectrow_array("SELECT $cols FROM foo");
740     },
741     @column_list
742   );
743
744 =cut
745
746 sub dbh_do {
747   my $self = shift;
748   my $code = shift;
749
750   my $dbh = $self->_get_dbh;
751
752   return $self->$code($dbh, @_)
753     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
754
755   local $self->{_in_dbh_do} = 1;
756
757   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
758   my $args = \@_;
759   return try {
760     $self->$code ($dbh, @$args);
761   } catch {
762     $self->throw_exception($_) if $self->connected;
763
764     # We were not connected - reconnect and retry, but let any
765     #  exception fall right through this time
766     carp "Retrying $code after catching disconnected exception: $_"
767       if $ENV{DBIC_DBIRETRY_DEBUG};
768
769     $self->_populate_dbh;
770     $self->$code($self->_dbh, @$args);
771   };
772 }
773
774 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
775 # It also informs dbh_do to bypass itself while under the direction of txn_do,
776 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
777 sub txn_do {
778   my $self = shift;
779   my $coderef = shift;
780
781   ref $coderef eq 'CODE' or $self->throw_exception
782     ('$coderef must be a CODE reference');
783
784   local $self->{_in_dbh_do} = 1;
785
786   my @result;
787   my $want_array = wantarray;
788
789   my $tried = 0;
790   while(1) {
791     my $exception;
792
793     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
794     my $args = \@_;
795
796     try {
797       $self->txn_begin;
798       my $txn_start_depth = $self->transaction_depth;
799       if($want_array) {
800           @result = $coderef->(@$args);
801       }
802       elsif(defined $want_array) {
803           $result[0] = $coderef->(@$args);
804       }
805       else {
806           $coderef->(@$args);
807       }
808
809       my $delta_txn = $txn_start_depth - $self->transaction_depth;
810       if ($delta_txn == 0) {
811         $self->txn_commit;
812       }
813       elsif ($delta_txn != 1) {
814         # an off-by-one would mean we fired a rollback
815         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
816       }
817     } catch {
818       $exception = $_;
819     };
820
821     if(! defined $exception) { return $want_array ? @result : $result[0] }
822
823     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
824       my $rollback_exception;
825       try { $self->txn_rollback } catch { $rollback_exception = shift };
826       if(defined $rollback_exception) {
827         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
828         $self->throw_exception($exception)  # propagate nested rollback
829           if $rollback_exception =~ /$exception_class/;
830
831         $self->throw_exception(
832           "Transaction aborted: ${exception}. "
833           . "Rollback failed: ${rollback_exception}"
834         );
835       }
836       $self->throw_exception($exception)
837     }
838
839     # We were not connected, and was first try - reconnect and retry
840     # via the while loop
841     carp "Retrying $coderef after catching disconnected exception: $exception"
842       if $ENV{DBIC_TXNRETRY_DEBUG};
843     $self->_populate_dbh;
844   }
845 }
846
847 =head2 disconnect
848
849 Our C<disconnect> method also performs a rollback first if the
850 database is not in C<AutoCommit> mode.
851
852 =cut
853
854 sub disconnect {
855   my ($self) = @_;
856
857   if( $self->_dbh ) {
858     my @actions;
859
860     push @actions, ( $self->on_disconnect_call || () );
861     push @actions, $self->_parse_connect_do ('on_disconnect_do');
862
863     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
864
865     $self->_dbh_rollback unless $self->_dbh_autocommit;
866
867     %{ $self->_dbh->{CachedKids} } = ();
868     $self->_dbh->disconnect;
869     $self->_dbh(undef);
870     $self->{_dbh_gen}++;
871   }
872 }
873
874 =head2 with_deferred_fk_checks
875
876 =over 4
877
878 =item Arguments: C<$coderef>
879
880 =item Return Value: The return value of $coderef
881
882 =back
883
884 Storage specific method to run the code ref with FK checks deferred or
885 in MySQL's case disabled entirely.
886
887 =cut
888
889 # Storage subclasses should override this
890 sub with_deferred_fk_checks {
891   my ($self, $sub) = @_;
892   $sub->();
893 }
894
895 =head2 connected
896
897 =over
898
899 =item Arguments: none
900
901 =item Return Value: 1|0
902
903 =back
904
905 Verifies that the current database handle is active and ready to execute
906 an SQL statement (e.g. the connection did not get stale, server is still
907 answering, etc.) This method is used internally by L</dbh>.
908
909 =cut
910
911 sub connected {
912   my $self = shift;
913   return 0 unless $self->_seems_connected;
914
915   #be on the safe side
916   local $self->_dbh->{RaiseError} = 1;
917
918   return $self->_ping;
919 }
920
921 sub _seems_connected {
922   my $self = shift;
923
924   $self->_preserve_foreign_dbh;
925
926   my $dbh = $self->_dbh
927     or return 0;
928
929   return $dbh->FETCH('Active');
930 }
931
932 sub _ping {
933   my $self = shift;
934
935   my $dbh = $self->_dbh or return 0;
936
937   return $dbh->ping;
938 }
939
940 sub ensure_connected {
941   my ($self) = @_;
942
943   unless ($self->connected) {
944     $self->_populate_dbh;
945   }
946 }
947
948 =head2 dbh
949
950 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
951 is guaranteed to be healthy by implicitly calling L</connected>, and if
952 necessary performing a reconnection before returning. Keep in mind that this
953 is very B<expensive> on some database engines. Consider using L</dbh_do>
954 instead.
955
956 =cut
957
958 sub dbh {
959   my ($self) = @_;
960
961   if (not $self->_dbh) {
962     $self->_populate_dbh;
963   } else {
964     $self->ensure_connected;
965   }
966   return $self->_dbh;
967 }
968
969 # this is the internal "get dbh or connect (don't check)" method
970 sub _get_dbh {
971   my $self = shift;
972   $self->_preserve_foreign_dbh;
973   $self->_populate_dbh unless $self->_dbh;
974   return $self->_dbh;
975 }
976
977 sub sql_maker {
978   my ($self) = @_;
979   unless ($self->_sql_maker) {
980     my $sql_maker_class = $self->sql_maker_class;
981     $self->ensure_class_loaded ($sql_maker_class);
982
983     my %opts = %{$self->_sql_maker_opts||{}};
984     my $dialect =
985       $opts{limit_dialect}
986         ||
987       $self->sql_limit_dialect
988         ||
989       do {
990         my $s_class = (ref $self) || $self;
991         carp (
992           "Your storage class ($s_class) does not set sql_limit_dialect and you "
993         . 'have not supplied an explicit limit_dialect in your connection_info. '
994         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
995         . 'databases but can be (and often is) painfully slow.'
996         );
997
998         'GenericSubQ';
999       }
1000     ;
1001
1002     $self->_sql_maker($sql_maker_class->new(
1003       bindtype=>'columns',
1004       array_datatypes => 1,
1005       limit_dialect => $dialect,
1006       name_sep => '.',
1007       %opts,
1008     ));
1009   }
1010   return $self->_sql_maker;
1011 }
1012
1013 # nothing to do by default
1014 sub _rebless {}
1015 sub _init {}
1016
1017 sub _populate_dbh {
1018   my ($self) = @_;
1019
1020   my @info = @{$self->_dbi_connect_info || []};
1021   $self->_dbh(undef); # in case ->connected failed we might get sent here
1022   $self->_dbh_details({}); # reset everything we know
1023
1024   $self->_dbh($self->_connect(@info));
1025
1026   $self->_conn_pid($$);
1027   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1028
1029   $self->_determine_driver;
1030
1031   # Always set the transaction depth on connect, since
1032   #  there is no transaction in progress by definition
1033   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1034
1035   $self->_run_connection_actions unless $self->{_in_determine_driver};
1036 }
1037
1038 sub _run_connection_actions {
1039   my $self = shift;
1040   my @actions;
1041
1042   push @actions, ( $self->on_connect_call || () );
1043   push @actions, $self->_parse_connect_do ('on_connect_do');
1044
1045   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1046 }
1047
1048
1049
1050 sub set_use_dbms_capability {
1051   $_[0]->set_inherited ($_[1], $_[2]);
1052 }
1053
1054 sub get_use_dbms_capability {
1055   my ($self, $capname) = @_;
1056
1057   my $use = $self->get_inherited ($capname);
1058   return defined $use
1059     ? $use
1060     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1061   ;
1062 }
1063
1064 sub set_dbms_capability {
1065   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1066 }
1067
1068 sub get_dbms_capability {
1069   my ($self, $capname) = @_;
1070
1071   my $cap = $self->_dbh_details->{capability}{$capname};
1072
1073   unless (defined $cap) {
1074     if (my $meth = $self->can ("_determine$capname")) {
1075       $cap = $self->$meth ? 1 : 0;
1076     }
1077     else {
1078       $cap = 0;
1079     }
1080
1081     $self->set_dbms_capability ($capname, $cap);
1082   }
1083
1084   return $cap;
1085 }
1086
1087 sub _server_info {
1088   my $self = shift;
1089
1090   my $info;
1091   unless ($info = $self->_dbh_details->{info}) {
1092
1093     $info = {};
1094
1095     my $server_version = try { $self->_get_server_version };
1096
1097     if (defined $server_version) {
1098       $info->{dbms_version} = $server_version;
1099
1100       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1101       my @verparts = split (/\./, $numeric_version);
1102       if (
1103         @verparts
1104           &&
1105         $verparts[0] <= 999
1106       ) {
1107         # consider only up to 3 version parts, iff not more than 3 digits
1108         my @use_parts;
1109         while (@verparts && @use_parts < 3) {
1110           my $p = shift @verparts;
1111           last if $p > 999;
1112           push @use_parts, $p;
1113         }
1114         push @use_parts, 0 while @use_parts < 3;
1115
1116         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1117       }
1118     }
1119
1120     $self->_dbh_details->{info} = $info;
1121   }
1122
1123   return $info;
1124 }
1125
1126 sub _get_server_version {
1127   shift->_get_dbh->get_info(18);
1128 }
1129
1130 sub _determine_driver {
1131   my ($self) = @_;
1132
1133   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1134     my $started_connected = 0;
1135     local $self->{_in_determine_driver} = 1;
1136
1137     if (ref($self) eq __PACKAGE__) {
1138       my $driver;
1139       if ($self->_dbh) { # we are connected
1140         $driver = $self->_dbh->{Driver}{Name};
1141         $started_connected = 1;
1142       } else {
1143         # if connect_info is a CODEREF, we have no choice but to connect
1144         if (ref $self->_dbi_connect_info->[0] &&
1145             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1146           $self->_populate_dbh;
1147           $driver = $self->_dbh->{Driver}{Name};
1148         }
1149         else {
1150           # try to use dsn to not require being connected, the driver may still
1151           # force a connection in _rebless to determine version
1152           # (dsn may not be supplied at all if all we do is make a mock-schema)
1153           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1154           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1155           $driver ||= $ENV{DBI_DRIVER};
1156         }
1157       }
1158
1159       if ($driver) {
1160         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1161         if ($self->load_optional_class($storage_class)) {
1162           mro::set_mro($storage_class, 'c3');
1163           bless $self, $storage_class;
1164           $self->_rebless();
1165         }
1166       }
1167     }
1168
1169     $self->_driver_determined(1);
1170
1171     $self->_init; # run driver-specific initializations
1172
1173     $self->_run_connection_actions
1174         if !$started_connected && defined $self->_dbh;
1175   }
1176 }
1177
1178 sub _do_connection_actions {
1179   my $self          = shift;
1180   my $method_prefix = shift;
1181   my $call          = shift;
1182
1183   if (not ref($call)) {
1184     my $method = $method_prefix . $call;
1185     $self->$method(@_);
1186   } elsif (ref($call) eq 'CODE') {
1187     $self->$call(@_);
1188   } elsif (ref($call) eq 'ARRAY') {
1189     if (ref($call->[0]) ne 'ARRAY') {
1190       $self->_do_connection_actions($method_prefix, $_) for @$call;
1191     } else {
1192       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1193     }
1194   } else {
1195     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1196   }
1197
1198   return $self;
1199 }
1200
1201 sub connect_call_do_sql {
1202   my $self = shift;
1203   $self->_do_query(@_);
1204 }
1205
1206 sub disconnect_call_do_sql {
1207   my $self = shift;
1208   $self->_do_query(@_);
1209 }
1210
1211 # override in db-specific backend when necessary
1212 sub connect_call_datetime_setup { 1 }
1213
1214 sub _do_query {
1215   my ($self, $action) = @_;
1216
1217   if (ref $action eq 'CODE') {
1218     $action = $action->($self);
1219     $self->_do_query($_) foreach @$action;
1220   }
1221   else {
1222     # Most debuggers expect ($sql, @bind), so we need to exclude
1223     # the attribute hash which is the second argument to $dbh->do
1224     # furthermore the bind values are usually to be presented
1225     # as named arrayref pairs, so wrap those here too
1226     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1227     my $sql = shift @do_args;
1228     my $attrs = shift @do_args;
1229     my @bind = map { [ undef, $_ ] } @do_args;
1230
1231     $self->_query_start($sql, @bind);
1232     $self->_get_dbh->do($sql, $attrs, @do_args);
1233     $self->_query_end($sql, @bind);
1234   }
1235
1236   return $self;
1237 }
1238
1239 sub _connect {
1240   my ($self, @info) = @_;
1241
1242   $self->throw_exception("You failed to provide any connection info")
1243     if !@info;
1244
1245   my ($old_connect_via, $dbh);
1246
1247   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1248     $old_connect_via = $DBI::connect_via;
1249     $DBI::connect_via = 'connect';
1250   }
1251
1252   try {
1253     if(ref $info[0] eq 'CODE') {
1254        $dbh = $info[0]->();
1255     }
1256     else {
1257        $dbh = DBI->connect(@info);
1258     }
1259
1260     if (!$dbh) {
1261       die $DBI::errstr;
1262     }
1263
1264     unless ($self->unsafe) {
1265
1266       # this odd anonymous coderef dereference is in fact really
1267       # necessary to avoid the unwanted effect described in perl5
1268       # RT#75792
1269       sub {
1270         my $weak_self = $_[0];
1271         weaken $weak_self;
1272
1273         $_[1]->{HandleError} = sub {
1274           if ($weak_self) {
1275             $weak_self->throw_exception("DBI Exception: $_[0]");
1276           }
1277           else {
1278             # the handler may be invoked by something totally out of
1279             # the scope of DBIC
1280             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1281           }
1282         };
1283       }->($self, $dbh);
1284
1285       $dbh->{ShowErrorStatement} = 1;
1286       $dbh->{RaiseError} = 1;
1287       $dbh->{PrintError} = 0;
1288     }
1289   }
1290   catch {
1291     $self->throw_exception("DBI Connection failed: $_")
1292   }
1293   finally {
1294     $DBI::connect_via = $old_connect_via if $old_connect_via;
1295   };
1296
1297   $self->_dbh_autocommit($dbh->{AutoCommit});
1298   $dbh;
1299 }
1300
1301 sub svp_begin {
1302   my ($self, $name) = @_;
1303
1304   $name = $self->_svp_generate_name
1305     unless defined $name;
1306
1307   $self->throw_exception ("You can't use savepoints outside a transaction")
1308     if $self->{transaction_depth} == 0;
1309
1310   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1311     unless $self->can('_svp_begin');
1312
1313   push @{ $self->{savepoints} }, $name;
1314
1315   $self->debugobj->svp_begin($name) if $self->debug;
1316
1317   return $self->_svp_begin($name);
1318 }
1319
1320 sub svp_release {
1321   my ($self, $name) = @_;
1322
1323   $self->throw_exception ("You can't use savepoints outside a transaction")
1324     if $self->{transaction_depth} == 0;
1325
1326   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1327     unless $self->can('_svp_release');
1328
1329   if (defined $name) {
1330     $self->throw_exception ("Savepoint '$name' does not exist")
1331       unless grep { $_ eq $name } @{ $self->{savepoints} };
1332
1333     # Dig through the stack until we find the one we are releasing.  This keeps
1334     # the stack up to date.
1335     my $svp;
1336
1337     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1338   } else {
1339     $name = pop @{ $self->{savepoints} };
1340   }
1341
1342   $self->debugobj->svp_release($name) if $self->debug;
1343
1344   return $self->_svp_release($name);
1345 }
1346
1347 sub svp_rollback {
1348   my ($self, $name) = @_;
1349
1350   $self->throw_exception ("You can't use savepoints outside a transaction")
1351     if $self->{transaction_depth} == 0;
1352
1353   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1354     unless $self->can('_svp_rollback');
1355
1356   if (defined $name) {
1357       # If they passed us a name, verify that it exists in the stack
1358       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1359           $self->throw_exception("Savepoint '$name' does not exist!");
1360       }
1361
1362       # Dig through the stack until we find the one we are releasing.  This keeps
1363       # the stack up to date.
1364       while(my $s = pop(@{ $self->{savepoints} })) {
1365           last if($s eq $name);
1366       }
1367       # Add the savepoint back to the stack, as a rollback doesn't remove the
1368       # named savepoint, only everything after it.
1369       push(@{ $self->{savepoints} }, $name);
1370   } else {
1371       # We'll assume they want to rollback to the last savepoint
1372       $name = $self->{savepoints}->[-1];
1373   }
1374
1375   $self->debugobj->svp_rollback($name) if $self->debug;
1376
1377   return $self->_svp_rollback($name);
1378 }
1379
1380 sub _svp_generate_name {
1381   my ($self) = @_;
1382   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1383 }
1384
1385 sub txn_begin {
1386   my $self = shift;
1387
1388   # this means we have not yet connected and do not know the AC status
1389   # (e.g. coderef $dbh)
1390   if (! defined $self->_dbh_autocommit) {
1391     $self->ensure_connected;
1392   }
1393   # otherwise re-connect on pid changes, so
1394   # that the txn_depth is adjusted properly
1395   # the lightweight _get_dbh is good enoug here
1396   # (only superficial handle check, no pings)
1397   else {
1398     $self->_get_dbh;
1399   }
1400
1401   if($self->transaction_depth == 0) {
1402     $self->debugobj->txn_begin()
1403       if $self->debug;
1404     $self->_dbh_begin_work;
1405   }
1406   elsif ($self->auto_savepoint) {
1407     $self->svp_begin;
1408   }
1409   $self->{transaction_depth}++;
1410 }
1411
1412 sub _dbh_begin_work {
1413   my $self = shift;
1414
1415   # if the user is utilizing txn_do - good for him, otherwise we need to
1416   # ensure that the $dbh is healthy on BEGIN.
1417   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1418   # will be replaced by a failure of begin_work itself (which will be
1419   # then retried on reconnect)
1420   if ($self->{_in_dbh_do}) {
1421     $self->_dbh->begin_work;
1422   } else {
1423     $self->dbh_do(sub { $_[1]->begin_work });
1424   }
1425 }
1426
1427 sub txn_commit {
1428   my $self = shift;
1429   if ($self->{transaction_depth} == 1) {
1430     $self->debugobj->txn_commit()
1431       if ($self->debug);
1432     $self->_dbh_commit;
1433     $self->{transaction_depth} = 0
1434       if $self->_dbh_autocommit;
1435   }
1436   elsif($self->{transaction_depth} > 1) {
1437     $self->{transaction_depth}--;
1438     $self->svp_release
1439       if $self->auto_savepoint;
1440   }
1441   else {
1442     $self->throw_exception( 'Refusing to commit without a started transaction' );
1443   }
1444 }
1445
1446 sub _dbh_commit {
1447   my $self = shift;
1448   my $dbh  = $self->_dbh
1449     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1450   $dbh->commit;
1451 }
1452
1453 sub txn_rollback {
1454   my $self = shift;
1455   my $dbh = $self->_dbh;
1456   try {
1457     if ($self->{transaction_depth} == 1) {
1458       $self->debugobj->txn_rollback()
1459         if ($self->debug);
1460       $self->{transaction_depth} = 0
1461         if $self->_dbh_autocommit;
1462       $self->_dbh_rollback;
1463     }
1464     elsif($self->{transaction_depth} > 1) {
1465       $self->{transaction_depth}--;
1466       if ($self->auto_savepoint) {
1467         $self->svp_rollback;
1468         $self->svp_release;
1469       }
1470     }
1471     else {
1472       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1473     }
1474   }
1475   catch {
1476     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1477
1478     if ($_ !~ /$exception_class/) {
1479       # ensure that a failed rollback resets the transaction depth
1480       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1481     }
1482
1483     $self->throw_exception($_)
1484   };
1485 }
1486
1487 sub _dbh_rollback {
1488   my $self = shift;
1489   my $dbh  = $self->_dbh
1490     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1491   $dbh->rollback;
1492 }
1493
1494 # This used to be the top-half of _execute.  It was split out to make it
1495 #  easier to override in NoBindVars without duping the rest.  It takes up
1496 #  all of _execute's args, and emits $sql, @bind.
1497 sub _prep_for_execute {
1498   my ($self, $op, $extra_bind, $ident, $args) = @_;
1499
1500   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1501     $ident = $ident->from();
1502   }
1503
1504   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1505
1506   unshift(@bind,
1507     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1508       if $extra_bind;
1509   return ($sql, \@bind);
1510 }
1511
1512
1513 sub _fix_bind_params {
1514     my ($self, @bind) = @_;
1515
1516     ### Turn @bind from something like this:
1517     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1518     ### to this:
1519     ###   ( "'1'", "'1'", "'3'" )
1520     return
1521         map {
1522             if ( defined( $_ && $_->[1] ) ) {
1523                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1524             }
1525             else { q{NULL}; }
1526         } @bind;
1527 }
1528
1529 sub _query_start {
1530     my ( $self, $sql, @bind ) = @_;
1531
1532     if ( $self->debug ) {
1533         @bind = $self->_fix_bind_params(@bind);
1534
1535         $self->debugobj->query_start( $sql, @bind );
1536     }
1537 }
1538
1539 sub _query_end {
1540     my ( $self, $sql, @bind ) = @_;
1541
1542     if ( $self->debug ) {
1543         @bind = $self->_fix_bind_params(@bind);
1544         $self->debugobj->query_end( $sql, @bind );
1545     }
1546 }
1547
1548 sub _dbh_execute {
1549   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1550
1551   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1552
1553   $self->_query_start( $sql, @$bind );
1554
1555   my $sth = $self->sth($sql,$op);
1556
1557   my $placeholder_index = 1;
1558
1559   foreach my $bound (@$bind) {
1560     my $attributes = {};
1561     my($column_name, @data) = @$bound;
1562
1563     if ($bind_attributes) {
1564       $attributes = $bind_attributes->{$column_name}
1565       if defined $bind_attributes->{$column_name};
1566     }
1567
1568     foreach my $data (@data) {
1569       my $ref = ref $data;
1570       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1571
1572       $sth->bind_param($placeholder_index, $data, $attributes);
1573       $placeholder_index++;
1574     }
1575   }
1576
1577   # Can this fail without throwing an exception anyways???
1578   my $rv = $sth->execute();
1579   $self->throw_exception(
1580     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1581   ) if !$rv;
1582
1583   $self->_query_end( $sql, @$bind );
1584
1585   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1586 }
1587
1588 sub _execute {
1589     my $self = shift;
1590     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1591 }
1592
1593 sub insert {
1594   my ($self, $source, $to_insert) = @_;
1595
1596   my $colinfo = $source->columns_info;
1597
1598   # mix with auto-nextval marked values (a bit of a speed hit, but
1599   # no saner way to handle this yet)
1600   my $auto_nextvals = {} ;
1601   for my $col (keys %$colinfo) {
1602     if (
1603       $colinfo->{$col}{auto_nextval}
1604         and
1605       (
1606         ! exists $to_insert->{$col}
1607           or
1608         ref $to_insert->{$col} eq 'SCALAR'
1609       )
1610     ) {
1611       $auto_nextvals->{$col} = $self->_sequence_fetch(
1612         'nextval',
1613         ( $colinfo->{$col}{sequence} ||=
1614             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1615         ),
1616       );
1617     }
1618   }
1619
1620   # fuse the values
1621   $to_insert = { %$to_insert, %$auto_nextvals };
1622
1623   # list of primary keys we try to fetch from the database
1624   # both not-exsists and scalarrefs are considered
1625   my %fetch_pks;
1626   %fetch_pks = ( map
1627     { $_ => scalar keys %fetch_pks }  # so we can preserve order for prettyness
1628     grep
1629       { ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR' }
1630       $source->primary_columns
1631   );
1632
1633   my $sqla_opts;
1634   if ($self->_use_insert_returning) {
1635
1636     # retain order as declared in the resultsource
1637     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1638       push @{$sqla_opts->{returning}}, $_;
1639     }
1640   }
1641
1642   my $bind_attributes = $self->source_bind_attributes($source);
1643
1644   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1645
1646   my %returned_cols = %$auto_nextvals;
1647
1648   if (my $retlist = $sqla_opts->{returning}) {
1649     my @ret_vals = try {
1650       local $SIG{__WARN__} = sub {};
1651       my @r = $sth->fetchrow_array;
1652       $sth->finish;
1653       @r;
1654     };
1655
1656     @returned_cols{@$retlist} = @ret_vals if @ret_vals;
1657   }
1658
1659   return \%returned_cols;
1660 }
1661
1662
1663 ## Currently it is assumed that all values passed will be "normal", i.e. not
1664 ## scalar refs, or at least, all the same type as the first set, the statement is
1665 ## only prepped once.
1666 sub insert_bulk {
1667   my ($self, $source, $cols, $data) = @_;
1668
1669   my %colvalues;
1670   @colvalues{@$cols} = (0..$#$cols);
1671
1672   for my $i (0..$#$cols) {
1673     my $first_val = $data->[0][$i];
1674     next unless ref $first_val eq 'SCALAR';
1675
1676     $colvalues{ $cols->[$i] } = $first_val;
1677   }
1678
1679   # check for bad data and stringify stringifiable objects
1680   my $bad_slice = sub {
1681     my ($msg, $col_idx, $slice_idx) = @_;
1682     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1683       $msg,
1684       $cols->[$col_idx],
1685       do {
1686         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1687         Dumper {
1688           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1689         },
1690       }
1691     );
1692   };
1693
1694   for my $datum_idx (0..$#$data) {
1695     my $datum = $data->[$datum_idx];
1696
1697     for my $col_idx (0..$#$cols) {
1698       my $val            = $datum->[$col_idx];
1699       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1700       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1701
1702       if ($is_literal_sql) {
1703         if (not ref $val) {
1704           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1705         }
1706         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1707           $bad_slice->("$reftype reference found where literal SQL expected",
1708             $col_idx, $datum_idx);
1709         }
1710         elsif ($$val ne $$sqla_bind){
1711           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1712             $col_idx, $datum_idx);
1713         }
1714       }
1715       elsif (my $reftype = ref $val) {
1716         require overload;
1717         if (overload::Method($val, '""')) {
1718           $datum->[$col_idx] = "".$val;
1719         }
1720         else {
1721           $bad_slice->("$reftype reference found where bind expected",
1722             $col_idx, $datum_idx);
1723         }
1724       }
1725     }
1726   }
1727
1728   my ($sql, $bind) = $self->_prep_for_execute (
1729     'insert', undef, $source, [\%colvalues]
1730   );
1731   my @bind = @$bind;
1732
1733   my $empty_bind = 1 if (not @bind) &&
1734     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1735
1736   if ((not @bind) && (not $empty_bind)) {
1737     $self->throw_exception(
1738       'Cannot insert_bulk without support for placeholders'
1739     );
1740   }
1741
1742   # neither _execute_array, nor _execute_inserts_with_no_binds are
1743   # atomic (even if _execute _array is a single call). Thus a safety
1744   # scope guard
1745   my $guard = $self->txn_scope_guard;
1746
1747   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1748   my $sth = $self->sth($sql);
1749   my $rv = do {
1750     if ($empty_bind) {
1751       # bind_param_array doesn't work if there are no binds
1752       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1753     }
1754     else {
1755 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1756       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1757     }
1758   };
1759
1760   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1761
1762   $guard->commit;
1763
1764   return (wantarray ? ($rv, $sth, @bind) : $rv);
1765 }
1766
1767 sub _execute_array {
1768   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1769
1770   ## This must be an arrayref, else nothing works!
1771   my $tuple_status = [];
1772
1773   ## Get the bind_attributes, if any exist
1774   my $bind_attributes = $self->source_bind_attributes($source);
1775
1776   ## Bind the values and execute
1777   my $placeholder_index = 1;
1778
1779   foreach my $bound (@$bind) {
1780
1781     my $attributes = {};
1782     my ($column_name, $data_index) = @$bound;
1783
1784     if( $bind_attributes ) {
1785       $attributes = $bind_attributes->{$column_name}
1786       if defined $bind_attributes->{$column_name};
1787     }
1788
1789     my @data = map { $_->[$data_index] } @$data;
1790
1791     $sth->bind_param_array(
1792       $placeholder_index,
1793       [@data],
1794       (%$attributes ?  $attributes : ()),
1795     );
1796     $placeholder_index++;
1797   }
1798
1799   my ($rv, $err);
1800   try {
1801     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1802   }
1803   catch {
1804     $err = shift;
1805   };
1806
1807   # Statement must finish even if there was an exception.
1808   try {
1809     $sth->finish
1810   }
1811   catch {
1812     $err = shift unless defined $err
1813   };
1814
1815   $err = $sth->errstr
1816     if (! defined $err and $sth->err);
1817
1818   if (defined $err) {
1819     my $i = 0;
1820     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1821
1822     $self->throw_exception("Unexpected populate error: $err")
1823       if ($i > $#$tuple_status);
1824
1825     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1826       ($tuple_status->[$i][1] || $err),
1827       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1828     );
1829   }
1830
1831   return $rv;
1832 }
1833
1834 sub _dbh_execute_array {
1835     my ($self, $sth, $tuple_status, @extra) = @_;
1836
1837     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1838 }
1839
1840 sub _dbh_execute_inserts_with_no_binds {
1841   my ($self, $sth, $count) = @_;
1842
1843   my $err;
1844   try {
1845     my $dbh = $self->_get_dbh;
1846     local $dbh->{RaiseError} = 1;
1847     local $dbh->{PrintError} = 0;
1848
1849     $sth->execute foreach 1..$count;
1850   }
1851   catch {
1852     $err = shift;
1853   }
1854   finally {
1855     # Make sure statement is finished even if there was an exception.
1856     try {
1857       $sth->finish
1858     }
1859     catch {
1860       $err = shift unless defined $err;
1861     };
1862   };
1863
1864   $self->throw_exception($err) if defined $err;
1865
1866   return $count;
1867 }
1868
1869 sub update {
1870   my ($self, $source, @args) = @_;
1871
1872   my $bind_attrs = $self->source_bind_attributes($source);
1873
1874   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1875 }
1876
1877
1878 sub delete {
1879   my ($self, $source, @args) = @_;
1880
1881   my $bind_attrs = $self->source_bind_attributes($source);
1882
1883   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1884 }
1885
1886 # We were sent here because the $rs contains a complex search
1887 # which will require a subquery to select the correct rows
1888 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1889 #
1890 # Generating a single PK column subquery is trivial and supported
1891 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1892 # Look at _multipk_update_delete()
1893 sub _subq_update_delete {
1894   my $self = shift;
1895   my ($rs, $op, $values) = @_;
1896
1897   my $rsrc = $rs->result_source;
1898
1899   # quick check if we got a sane rs on our hands
1900   my @pcols = $rsrc->_pri_cols;
1901
1902   my $sel = $rs->_resolved_attrs->{select};
1903   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1904
1905   if (
1906       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1907         ne
1908       join ("\x00", sort @$sel )
1909   ) {
1910     $self->throw_exception (
1911       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1912     );
1913   }
1914
1915   if (@pcols == 1) {
1916     return $self->$op (
1917       $rsrc,
1918       $op eq 'update' ? $values : (),
1919       { $pcols[0] => { -in => $rs->as_query } },
1920     );
1921   }
1922
1923   else {
1924     return $self->_multipk_update_delete (@_);
1925   }
1926 }
1927
1928 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1929 # resultset update/delete involving subqueries. So by default resort
1930 # to simple (and inefficient) delete_all style per-row opearations,
1931 # while allowing specific storages to override this with a faster
1932 # implementation.
1933 #
1934 sub _multipk_update_delete {
1935   return shift->_per_row_update_delete (@_);
1936 }
1937
1938 # This is the default loop used to delete/update rows for multi PK
1939 # resultsets, and used by mysql exclusively (because it can't do anything
1940 # else).
1941 #
1942 # We do not use $row->$op style queries, because resultset update/delete
1943 # is not expected to cascade (this is what delete_all/update_all is for).
1944 #
1945 # There should be no race conditions as the entire operation is rolled
1946 # in a transaction.
1947 #
1948 sub _per_row_update_delete {
1949   my $self = shift;
1950   my ($rs, $op, $values) = @_;
1951
1952   my $rsrc = $rs->result_source;
1953   my @pcols = $rsrc->_pri_cols;
1954
1955   my $guard = $self->txn_scope_guard;
1956
1957   # emulate the return value of $sth->execute for non-selects
1958   my $row_cnt = '0E0';
1959
1960   my $subrs_cur = $rs->cursor;
1961   my @all_pk = $subrs_cur->all;
1962   for my $pks ( @all_pk) {
1963
1964     my $cond;
1965     for my $i (0.. $#pcols) {
1966       $cond->{$pcols[$i]} = $pks->[$i];
1967     }
1968
1969     $self->$op (
1970       $rsrc,
1971       $op eq 'update' ? $values : (),
1972       $cond,
1973     );
1974
1975     $row_cnt++;
1976   }
1977
1978   $guard->commit;
1979
1980   return $row_cnt;
1981 }
1982
1983 sub _select {
1984   my $self = shift;
1985   $self->_execute($self->_select_args(@_));
1986 }
1987
1988 sub _select_args_to_query {
1989   my $self = shift;
1990
1991   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1992   #  = $self->_select_args($ident, $select, $cond, $attrs);
1993   my ($op, $bind, $ident, $bind_attrs, @args) =
1994     $self->_select_args(@_);
1995
1996   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1997   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1998   $prepared_bind ||= [];
1999
2000   return wantarray
2001     ? ($sql, $prepared_bind, $bind_attrs)
2002     : \[ "($sql)", @$prepared_bind ]
2003   ;
2004 }
2005
2006 sub _select_args {
2007   my ($self, $ident, $select, $where, $attrs) = @_;
2008
2009   my $sql_maker = $self->sql_maker;
2010   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2011
2012   $attrs = {
2013     %$attrs,
2014     select => $select,
2015     from => $ident,
2016     where => $where,
2017     $rs_alias && $alias2source->{$rs_alias}
2018       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
2019       : ()
2020     ,
2021   };
2022
2023   # calculate bind_attrs before possible $ident mangling
2024   my $bind_attrs = {};
2025   for my $alias (keys %$alias2source) {
2026     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2027     for my $col (keys %$bindtypes) {
2028
2029       my $fqcn = join ('.', $alias, $col);
2030       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2031
2032       # Unqialified column names are nice, but at the same time can be
2033       # rather ambiguous. What we do here is basically go along with
2034       # the loop, adding an unqualified column slot to $bind_attrs,
2035       # alongside the fully qualified name. As soon as we encounter
2036       # another column by that name (which would imply another table)
2037       # we unset the unqualified slot and never add any info to it
2038       # to avoid erroneous type binding. If this happens the users
2039       # only choice will be to fully qualify his column name
2040
2041       if (exists $bind_attrs->{$col}) {
2042         $bind_attrs->{$col} = {};
2043       }
2044       else {
2045         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2046       }
2047     }
2048   }
2049
2050   # Sanity check the attributes (SQLMaker does it too, but
2051   # in case of a software_limit we'll never reach there)
2052   if (defined $attrs->{offset}) {
2053     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2054       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2055   }
2056   $attrs->{offset} ||= 0;
2057
2058   if (defined $attrs->{rows}) {
2059     $self->throw_exception("The rows attribute must be a positive integer if present")
2060       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2061   }
2062   elsif ($attrs->{offset}) {
2063     # MySQL actually recommends this approach.  I cringe.
2064     $attrs->{rows} = $sql_maker->__max_int;
2065   }
2066
2067   my @limit;
2068
2069   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2070   # storage, unless software limit was requested
2071   if (
2072     #limited has_many
2073     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2074        ||
2075     # grouped prefetch (to satisfy group_by == select)
2076     ( $attrs->{group_by}
2077         &&
2078       @{$attrs->{group_by}}
2079         &&
2080       $attrs->{_prefetch_select}
2081         &&
2082       @{$attrs->{_prefetch_select}}
2083     )
2084   ) {
2085     ($ident, $select, $where, $attrs)
2086       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2087   }
2088   elsif (! $attrs->{software_limit} ) {
2089     push @limit, $attrs->{rows}, $attrs->{offset};
2090   }
2091
2092   # try to simplify the joinmap further (prune unreferenced type-single joins)
2093   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2094
2095 ###
2096   # This would be the point to deflate anything found in $where
2097   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2098   # expect a row object. And all we have is a resultsource (it is trivial
2099   # to extract deflator coderefs via $alias2source above).
2100   #
2101   # I don't see a way forward other than changing the way deflators are
2102   # invoked, and that's just bad...
2103 ###
2104
2105   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2106 }
2107
2108 # Returns a counting SELECT for a simple count
2109 # query. Abstracted so that a storage could override
2110 # this to { count => 'firstcol' } or whatever makes
2111 # sense as a performance optimization
2112 sub _count_select {
2113   #my ($self, $source, $rs_attrs) = @_;
2114   return { count => '*' };
2115 }
2116
2117
2118 sub source_bind_attributes {
2119   my ($self, $source) = @_;
2120
2121   my $bind_attributes;
2122
2123   my $colinfo = $source->columns_info;
2124
2125   for my $col (keys %$colinfo) {
2126     if (my $dt = $colinfo->{$col}{data_type} ) {
2127       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2128     }
2129   }
2130
2131   return $bind_attributes;
2132 }
2133
2134 =head2 select
2135
2136 =over 4
2137
2138 =item Arguments: $ident, $select, $condition, $attrs
2139
2140 =back
2141
2142 Handle a SQL select statement.
2143
2144 =cut
2145
2146 sub select {
2147   my $self = shift;
2148   my ($ident, $select, $condition, $attrs) = @_;
2149   return $self->cursor_class->new($self, \@_, $attrs);
2150 }
2151
2152 sub select_single {
2153   my $self = shift;
2154   my ($rv, $sth, @bind) = $self->_select(@_);
2155   my @row = $sth->fetchrow_array;
2156   my @nextrow = $sth->fetchrow_array if @row;
2157   if(@row && @nextrow) {
2158     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2159   }
2160   # Need to call finish() to work round broken DBDs
2161   $sth->finish();
2162   return @row;
2163 }
2164
2165 =head2 sql_limit_dialect
2166
2167 This is an accessor for the default SQL limit dialect used by a particular
2168 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2169 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2170 see L<DBIx::Class::SQLMaker::LimitDialects>.
2171
2172 =head2 sth
2173
2174 =over 4
2175
2176 =item Arguments: $sql
2177
2178 =back
2179
2180 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2181
2182 =cut
2183
2184 sub _dbh_sth {
2185   my ($self, $dbh, $sql) = @_;
2186
2187   # 3 is the if_active parameter which avoids active sth re-use
2188   my $sth = $self->disable_sth_caching
2189     ? $dbh->prepare($sql)
2190     : $dbh->prepare_cached($sql, {}, 3);
2191
2192   # XXX You would think RaiseError would make this impossible,
2193   #  but apparently that's not true :(
2194   $self->throw_exception($dbh->errstr) if !$sth;
2195
2196   $sth;
2197 }
2198
2199 sub sth {
2200   my ($self, $sql) = @_;
2201   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2202 }
2203
2204 sub _dbh_columns_info_for {
2205   my ($self, $dbh, $table) = @_;
2206
2207   if ($dbh->can('column_info')) {
2208     my %result;
2209     my $caught;
2210     try {
2211       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2212       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2213       $sth->execute();
2214       while ( my $info = $sth->fetchrow_hashref() ){
2215         my %column_info;
2216         $column_info{data_type}   = $info->{TYPE_NAME};
2217         $column_info{size}      = $info->{COLUMN_SIZE};
2218         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2219         $column_info{default_value} = $info->{COLUMN_DEF};
2220         my $col_name = $info->{COLUMN_NAME};
2221         $col_name =~ s/^\"(.*)\"$/$1/;
2222
2223         $result{$col_name} = \%column_info;
2224       }
2225     } catch {
2226       $caught = 1;
2227     };
2228     return \%result if !$caught && scalar keys %result;
2229   }
2230
2231   my %result;
2232   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2233   $sth->execute;
2234   my @columns = @{$sth->{NAME_lc}};
2235   for my $i ( 0 .. $#columns ){
2236     my %column_info;
2237     $column_info{data_type} = $sth->{TYPE}->[$i];
2238     $column_info{size} = $sth->{PRECISION}->[$i];
2239     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2240
2241     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2242       $column_info{data_type} = $1;
2243       $column_info{size}    = $2;
2244     }
2245
2246     $result{$columns[$i]} = \%column_info;
2247   }
2248   $sth->finish;
2249
2250   foreach my $col (keys %result) {
2251     my $colinfo = $result{$col};
2252     my $type_num = $colinfo->{data_type};
2253     my $type_name;
2254     if(defined $type_num && $dbh->can('type_info')) {
2255       my $type_info = $dbh->type_info($type_num);
2256       $type_name = $type_info->{TYPE_NAME} if $type_info;
2257       $colinfo->{data_type} = $type_name if $type_name;
2258     }
2259   }
2260
2261   return \%result;
2262 }
2263
2264 sub columns_info_for {
2265   my ($self, $table) = @_;
2266   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2267 }
2268
2269 =head2 last_insert_id
2270
2271 Return the row id of the last insert.
2272
2273 =cut
2274
2275 sub _dbh_last_insert_id {
2276     my ($self, $dbh, $source, $col) = @_;
2277
2278     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2279
2280     return $id if defined $id;
2281
2282     my $class = ref $self;
2283     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2284 }
2285
2286 sub last_insert_id {
2287   my $self = shift;
2288   $self->_dbh_last_insert_id ($self->_dbh, @_);
2289 }
2290
2291 =head2 _native_data_type
2292
2293 =over 4
2294
2295 =item Arguments: $type_name
2296
2297 =back
2298
2299 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2300 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2301 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2302
2303 The default implementation returns C<undef>, implement in your Storage driver if
2304 you need this functionality.
2305
2306 Should map types from other databases to the native RDBMS type, for example
2307 C<VARCHAR2> to C<VARCHAR>.
2308
2309 Types with modifiers should map to the underlying data type. For example,
2310 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2311
2312 Composite types should map to the container type, for example
2313 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2314
2315 =cut
2316
2317 sub _native_data_type {
2318   #my ($self, $data_type) = @_;
2319   return undef
2320 }
2321
2322 # Check if placeholders are supported at all
2323 sub _determine_supports_placeholders {
2324   my $self = shift;
2325   my $dbh  = $self->_get_dbh;
2326
2327   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2328   # but it is inaccurate more often than not
2329   return try {
2330     local $dbh->{PrintError} = 0;
2331     local $dbh->{RaiseError} = 1;
2332     $dbh->do('select ?', {}, 1);
2333     1;
2334   }
2335   catch {
2336     0;
2337   };
2338 }
2339
2340 # Check if placeholders bound to non-string types throw exceptions
2341 #
2342 sub _determine_supports_typeless_placeholders {
2343   my $self = shift;
2344   my $dbh  = $self->_get_dbh;
2345
2346   return try {
2347     local $dbh->{PrintError} = 0;
2348     local $dbh->{RaiseError} = 1;
2349     # this specifically tests a bind that is NOT a string
2350     $dbh->do('select 1 where 1 = ?', {}, 1);
2351     1;
2352   }
2353   catch {
2354     0;
2355   };
2356 }
2357
2358 =head2 sqlt_type
2359
2360 Returns the database driver name.
2361
2362 =cut
2363
2364 sub sqlt_type {
2365   shift->_get_dbh->{Driver}->{Name};
2366 }
2367
2368 =head2 bind_attribute_by_data_type
2369
2370 Given a datatype from column info, returns a database specific bind
2371 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2372 let the database planner just handle it.
2373
2374 Generally only needed for special case column types, like bytea in postgres.
2375
2376 =cut
2377
2378 sub bind_attribute_by_data_type {
2379     return;
2380 }
2381
2382 =head2 is_datatype_numeric
2383
2384 Given a datatype from column_info, returns a boolean value indicating if
2385 the current RDBMS considers it a numeric value. This controls how
2386 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2387 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2388 be performed instead of the usual C<eq>.
2389
2390 =cut
2391
2392 sub is_datatype_numeric {
2393   my ($self, $dt) = @_;
2394
2395   return 0 unless $dt;
2396
2397   return $dt =~ /^ (?:
2398     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2399   ) $/ix;
2400 }
2401
2402
2403 =head2 create_ddl_dir
2404
2405 =over 4
2406
2407 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2408
2409 =back
2410
2411 Creates a SQL file based on the Schema, for each of the specified
2412 database engines in C<\@databases> in the given directory.
2413 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2414
2415 Given a previous version number, this will also create a file containing
2416 the ALTER TABLE statements to transform the previous schema into the
2417 current one. Note that these statements may contain C<DROP TABLE> or
2418 C<DROP COLUMN> statements that can potentially destroy data.
2419
2420 The file names are created using the C<ddl_filename> method below, please
2421 override this method in your schema if you would like a different file
2422 name format. For the ALTER file, the same format is used, replacing
2423 $version in the name with "$preversion-$version".
2424
2425 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2426 The most common value for this would be C<< { add_drop_table => 1 } >>
2427 to have the SQL produced include a C<DROP TABLE> statement for each table
2428 created. For quoting purposes supply C<quote_table_names> and
2429 C<quote_field_names>.
2430
2431 If no arguments are passed, then the following default values are assumed:
2432
2433 =over 4
2434
2435 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2436
2437 =item version    - $schema->schema_version
2438
2439 =item directory  - './'
2440
2441 =item preversion - <none>
2442
2443 =back
2444
2445 By default, C<\%sqlt_args> will have
2446
2447  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2448
2449 merged with the hash passed in. To disable any of those features, pass in a
2450 hashref like the following
2451
2452  { ignore_constraint_names => 0, # ... other options }
2453
2454
2455 WARNING: You are strongly advised to check all SQL files created, before applying
2456 them.
2457
2458 =cut
2459
2460 sub create_ddl_dir {
2461   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2462
2463   unless ($dir) {
2464     carp "No directory given, using ./\n";
2465     $dir = './';
2466   } else {
2467       -d $dir
2468         or
2469       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2470         or
2471       $self->throw_exception(
2472         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2473       );
2474   }
2475
2476   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2477
2478   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2479   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2480
2481   my $schema_version = $schema->schema_version || '1.x';
2482   $version ||= $schema_version;
2483
2484   $sqltargs = {
2485     add_drop_table => 1,
2486     ignore_constraint_names => 1,
2487     ignore_index_names => 1,
2488     %{$sqltargs || {}}
2489   };
2490
2491   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2492     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2493   }
2494
2495   my $sqlt = SQL::Translator->new( $sqltargs );
2496
2497   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2498   my $sqlt_schema = $sqlt->translate({ data => $schema })
2499     or $self->throw_exception ($sqlt->error);
2500
2501   foreach my $db (@$databases) {
2502     $sqlt->reset();
2503     $sqlt->{schema} = $sqlt_schema;
2504     $sqlt->producer($db);
2505
2506     my $file;
2507     my $filename = $schema->ddl_filename($db, $version, $dir);
2508     if (-e $filename && ($version eq $schema_version )) {
2509       # if we are dumping the current version, overwrite the DDL
2510       carp "Overwriting existing DDL file - $filename";
2511       unlink($filename);
2512     }
2513
2514     my $output = $sqlt->translate;
2515     if(!$output) {
2516       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2517       next;
2518     }
2519     if(!open($file, ">$filename")) {
2520       $self->throw_exception("Can't open $filename for writing ($!)");
2521       next;
2522     }
2523     print $file $output;
2524     close($file);
2525
2526     next unless ($preversion);
2527
2528     require SQL::Translator::Diff;
2529
2530     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2531     if(!-e $prefilename) {
2532       carp("No previous schema file found ($prefilename)");
2533       next;
2534     }
2535
2536     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2537     if(-e $difffile) {
2538       carp("Overwriting existing diff file - $difffile");
2539       unlink($difffile);
2540     }
2541
2542     my $source_schema;
2543     {
2544       my $t = SQL::Translator->new($sqltargs);
2545       $t->debug( 0 );
2546       $t->trace( 0 );
2547
2548       $t->parser( $db )
2549         or $self->throw_exception ($t->error);
2550
2551       my $out = $t->translate( $prefilename )
2552         or $self->throw_exception ($t->error);
2553
2554       $source_schema = $t->schema;
2555
2556       $source_schema->name( $prefilename )
2557         unless ( $source_schema->name );
2558     }
2559
2560     # The "new" style of producers have sane normalization and can support
2561     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2562     # And we have to diff parsed SQL against parsed SQL.
2563     my $dest_schema = $sqlt_schema;
2564
2565     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2566       my $t = SQL::Translator->new($sqltargs);
2567       $t->debug( 0 );
2568       $t->trace( 0 );
2569
2570       $t->parser( $db )
2571         or $self->throw_exception ($t->error);
2572
2573       my $out = $t->translate( $filename )
2574         or $self->throw_exception ($t->error);
2575
2576       $dest_schema = $t->schema;
2577
2578       $dest_schema->name( $filename )
2579         unless $dest_schema->name;
2580     }
2581
2582     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2583                                                   $dest_schema,   $db,
2584                                                   $sqltargs
2585                                                  );
2586     if(!open $file, ">$difffile") {
2587       $self->throw_exception("Can't write to $difffile ($!)");
2588       next;
2589     }
2590     print $file $diff;
2591     close($file);
2592   }
2593 }
2594
2595 =head2 deployment_statements
2596
2597 =over 4
2598
2599 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2600
2601 =back
2602
2603 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2604
2605 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2606 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2607
2608 C<$directory> is used to return statements from files in a previously created
2609 L</create_ddl_dir> directory and is optional. The filenames are constructed
2610 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2611
2612 If no C<$directory> is specified then the statements are constructed on the
2613 fly using L<SQL::Translator> and C<$version> is ignored.
2614
2615 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2616
2617 =cut
2618
2619 sub deployment_statements {
2620   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2621   $type ||= $self->sqlt_type;
2622   $version ||= $schema->schema_version || '1.x';
2623   $dir ||= './';
2624   my $filename = $schema->ddl_filename($type, $version, $dir);
2625   if(-f $filename)
2626   {
2627       my $file;
2628       open($file, "<$filename")
2629         or $self->throw_exception("Can't open $filename ($!)");
2630       my @rows = <$file>;
2631       close($file);
2632       return join('', @rows);
2633   }
2634
2635   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2636     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2637   }
2638
2639   # sources needs to be a parser arg, but for simplicty allow at top level
2640   # coming in
2641   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2642       if exists $sqltargs->{sources};
2643
2644   my $tr = SQL::Translator->new(
2645     producer => "SQL::Translator::Producer::${type}",
2646     %$sqltargs,
2647     parser => 'SQL::Translator::Parser::DBIx::Class',
2648     data => $schema,
2649   );
2650
2651   my @ret;
2652   my $wa = wantarray;
2653   if ($wa) {
2654     @ret = $tr->translate;
2655   }
2656   else {
2657     $ret[0] = $tr->translate;
2658   }
2659
2660   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2661     unless (@ret && defined $ret[0]);
2662
2663   return $wa ? @ret : $ret[0];
2664 }
2665
2666 sub deploy {
2667   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2668   my $deploy = sub {
2669     my $line = shift;
2670     return if($line =~ /^--/);
2671     return if(!$line);
2672     # next if($line =~ /^DROP/m);
2673     return if($line =~ /^BEGIN TRANSACTION/m);
2674     return if($line =~ /^COMMIT/m);
2675     return if $line =~ /^\s+$/; # skip whitespace only
2676     $self->_query_start($line);
2677     try {
2678       # do a dbh_do cycle here, as we need some error checking in
2679       # place (even though we will ignore errors)
2680       $self->dbh_do (sub { $_[1]->do($line) });
2681     } catch {
2682       carp qq{$_ (running "${line}")};
2683     };
2684     $self->_query_end($line);
2685   };
2686   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2687   if (@statements > 1) {
2688     foreach my $statement (@statements) {
2689       $deploy->( $statement );
2690     }
2691   }
2692   elsif (@statements == 1) {
2693     foreach my $line ( split(";\n", $statements[0])) {
2694       $deploy->( $line );
2695     }
2696   }
2697 }
2698
2699 =head2 datetime_parser
2700
2701 Returns the datetime parser class
2702
2703 =cut
2704
2705 sub datetime_parser {
2706   my $self = shift;
2707   return $self->{datetime_parser} ||= do {
2708     $self->build_datetime_parser(@_);
2709   };
2710 }
2711
2712 =head2 datetime_parser_type
2713
2714 Defines (returns) the datetime parser class - currently hardwired to
2715 L<DateTime::Format::MySQL>
2716
2717 =cut
2718
2719 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2720
2721 =head2 build_datetime_parser
2722
2723 See L</datetime_parser>
2724
2725 =cut
2726
2727 sub build_datetime_parser {
2728   my $self = shift;
2729   my $type = $self->datetime_parser_type(@_);
2730   $self->ensure_class_loaded ($type);
2731   return $type;
2732 }
2733
2734
2735 =head2 is_replicating
2736
2737 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2738 replicate from a master database.  Default is undef, which is the result
2739 returned by databases that don't support replication.
2740
2741 =cut
2742
2743 sub is_replicating {
2744     return;
2745
2746 }
2747
2748 =head2 lag_behind_master
2749
2750 Returns a number that represents a certain amount of lag behind a master db
2751 when a given storage is replicating.  The number is database dependent, but
2752 starts at zero and increases with the amount of lag. Default in undef
2753
2754 =cut
2755
2756 sub lag_behind_master {
2757     return;
2758 }
2759
2760 =head2 relname_to_table_alias
2761
2762 =over 4
2763
2764 =item Arguments: $relname, $join_count
2765
2766 =back
2767
2768 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2769 queries.
2770
2771 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2772 way these aliases are named.
2773
2774 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2775 otherwise C<"$relname">.
2776
2777 =cut
2778
2779 sub relname_to_table_alias {
2780   my ($self, $relname, $join_count) = @_;
2781
2782   my $alias = ($join_count && $join_count > 1 ?
2783     join('_', $relname, $join_count) : $relname);
2784
2785   return $alias;
2786 }
2787
2788 1;
2789
2790 =head1 USAGE NOTES
2791
2792 =head2 DBIx::Class and AutoCommit
2793
2794 DBIx::Class can do some wonderful magic with handling exceptions,
2795 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2796 (the default) combined with C<txn_do> for transaction support.
2797
2798 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2799 in an assumed transaction between commits, and you're telling us you'd
2800 like to manage that manually.  A lot of the magic protections offered by
2801 this module will go away.  We can't protect you from exceptions due to database
2802 disconnects because we don't know anything about how to restart your
2803 transactions.  You're on your own for handling all sorts of exceptional
2804 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2805 be with raw DBI.
2806
2807
2808 =head1 AUTHORS
2809
2810 Matt S. Trout <mst@shadowcatsystems.co.uk>
2811
2812 Andy Grundman <andy@hybridized.org>
2813
2814 =head1 LICENSE
2815
2816 You may distribute this code under the same terms as Perl itself.
2817
2818 =cut