4d18d8dce68639a69d62d47a54ecbc1f717df28e
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders join_optimizer/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
60
61 # on by default, not strictly a capability (pending rewrite)
62 __PACKAGE__->_use_join_optimizer (1);
63 sub _determine_supports_join_optimizer { 1 };
64
65 # Each of these methods need _determine_driver called before itself
66 # in order to function reliably. This is a purely DRY optimization
67 #
68 # get_(use)_dbms_capability need to be called on the correct Storage
69 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
70 # _determine_supports_X which obv. needs a correct driver as well
71 my @rdbms_specific_methods = qw/
72   deployment_statements
73   sqlt_type
74   sql_maker
75   build_datetime_parser
76   datetime_parser_type
77
78   insert
79   insert_bulk
80   update
81   delete
82   select
83   select_single
84
85   get_use_dbms_capability
86   get_dbms_capability
87
88   _server_info
89   _get_server_version
90 /;
91
92 for my $meth (@rdbms_specific_methods) {
93
94   my $orig = __PACKAGE__->can ($meth)
95     or die "$meth is not a ::Storage::DBI method!";
96
97   no strict qw/refs/;
98   no warnings qw/redefine/;
99   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
100     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
101       $_[0]->_determine_driver;
102
103       # This for some reason crashes and burns on perl 5.8.1
104       # IFF the method ends up throwing an exception
105       #goto $_[0]->can ($meth);
106
107       my $cref = $_[0]->can ($meth);
108       goto $cref;
109     }
110     goto $orig;
111   };
112 }
113
114
115 =head1 NAME
116
117 DBIx::Class::Storage::DBI - DBI storage handler
118
119 =head1 SYNOPSIS
120
121   my $schema = MySchema->connect('dbi:SQLite:my.db');
122
123   $schema->storage->debug(1);
124
125   my @stuff = $schema->storage->dbh_do(
126     sub {
127       my ($storage, $dbh, @args) = @_;
128       $dbh->do("DROP TABLE authors");
129     },
130     @column_list
131   );
132
133   $schema->resultset('Book')->search({
134      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
135   });
136
137 =head1 DESCRIPTION
138
139 This class represents the connection to an RDBMS via L<DBI>.  See
140 L<DBIx::Class::Storage> for general information.  This pod only
141 documents DBI-specific methods and behaviors.
142
143 =head1 METHODS
144
145 =cut
146
147 sub new {
148   my $new = shift->next::method(@_);
149
150   $new->transaction_depth(0);
151   $new->_sql_maker_opts({});
152   $new->_dbh_details({});
153   $new->{savepoints} = [];
154   $new->{_in_dbh_do} = 0;
155   $new->{_dbh_gen} = 0;
156
157   # read below to see what this does
158   $new->_arm_global_destructor;
159
160   $new;
161 }
162
163 # This is hack to work around perl shooting stuff in random
164 # order on exit(). If we do not walk the remaining storage
165 # objects in an END block, there is a *small but real* chance
166 # of a fork()ed child to kill the parent's shared DBI handle,
167 # *before perl reaches the DESTROY in this package*
168 # Yes, it is ugly and effective.
169 # Additionally this registry is used by the CLONE method to
170 # make sure no handles are shared between threads
171 {
172   my %seek_and_destroy;
173
174   sub _arm_global_destructor {
175     my $self = shift;
176     my $key = refaddr ($self);
177     $seek_and_destroy{$key} = $self;
178     weaken ($seek_and_destroy{$key});
179   }
180
181   END {
182     local $?; # just in case the DBI destructor changes it somehow
183
184     # destroy just the object if not native to this process/thread
185     $_->_verify_pid for (grep
186       { defined $_ }
187       values %seek_and_destroy
188     );
189   }
190
191   sub CLONE {
192     # As per DBI's recommendation, DBIC disconnects all handles as
193     # soon as possible (DBIC will reconnect only on demand from within
194     # the thread)
195     for (values %seek_and_destroy) {
196       next unless $_;
197       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
198       $_->_dbh(undef);
199     }
200   }
201 }
202
203 sub DESTROY {
204   my $self = shift;
205
206   # some databases spew warnings on implicit disconnect
207   local $SIG{__WARN__} = sub {};
208   $self->_dbh(undef);
209 }
210
211 # handle pid changes correctly - do not destroy parent's connection
212 sub _verify_pid {
213   my $self = shift;
214
215   my $pid = $self->_conn_pid;
216   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
217     $dbh->{InactiveDestroy} = 1;
218     $self->{_dbh_gen}++;
219     $self->_dbh(undef);
220   }
221
222   return;
223 }
224
225 =head2 connect_info
226
227 This method is normally called by L<DBIx::Class::Schema/connection>, which
228 encapsulates its argument list in an arrayref before passing them here.
229
230 The argument list may contain:
231
232 =over
233
234 =item *
235
236 The same 4-element argument set one would normally pass to
237 L<DBI/connect>, optionally followed by
238 L<extra attributes|/DBIx::Class specific connection attributes>
239 recognized by DBIx::Class:
240
241   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
242
243 =item *
244
245 A single code reference which returns a connected
246 L<DBI database handle|DBI/connect> optionally followed by
247 L<extra attributes|/DBIx::Class specific connection attributes> recognized
248 by DBIx::Class:
249
250   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
251
252 =item *
253
254 A single hashref with all the attributes and the dsn/user/password
255 mixed together:
256
257   $connect_info_args = [{
258     dsn => $dsn,
259     user => $user,
260     password => $pass,
261     %dbi_attributes,
262     %extra_attributes,
263   }];
264
265   $connect_info_args = [{
266     dbh_maker => sub { DBI->connect (...) },
267     %dbi_attributes,
268     %extra_attributes,
269   }];
270
271 This is particularly useful for L<Catalyst> based applications, allowing the
272 following config (L<Config::General> style):
273
274   <Model::DB>
275     schema_class   App::DB
276     <connect_info>
277       dsn          dbi:mysql:database=test
278       user         testuser
279       password     TestPass
280       AutoCommit   1
281     </connect_info>
282   </Model::DB>
283
284 The C<dsn>/C<user>/C<password> combination can be substituted by the
285 C<dbh_maker> key whose value is a coderef that returns a connected
286 L<DBI database handle|DBI/connect>
287
288 =back
289
290 Please note that the L<DBI> docs recommend that you always explicitly
291 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
292 recommends that it be set to I<1>, and that you perform transactions
293 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
294 to I<1> if you do not do explicitly set it to zero.  This is the default
295 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
296
297 =head3 DBIx::Class specific connection attributes
298
299 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
300 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
301 the following connection options. These options can be mixed in with your other
302 L<DBI> connection attributes, or placed in a separate hashref
303 (C<\%extra_attributes>) as shown above.
304
305 Every time C<connect_info> is invoked, any previous settings for
306 these options will be cleared before setting the new ones, regardless of
307 whether any options are specified in the new C<connect_info>.
308
309
310 =over
311
312 =item on_connect_do
313
314 Specifies things to do immediately after connecting or re-connecting to
315 the database.  Its value may contain:
316
317 =over
318
319 =item a scalar
320
321 This contains one SQL statement to execute.
322
323 =item an array reference
324
325 This contains SQL statements to execute in order.  Each element contains
326 a string or a code reference that returns a string.
327
328 =item a code reference
329
330 This contains some code to execute.  Unlike code references within an
331 array reference, its return value is ignored.
332
333 =back
334
335 =item on_disconnect_do
336
337 Takes arguments in the same form as L</on_connect_do> and executes them
338 immediately before disconnecting from the database.
339
340 Note, this only runs if you explicitly call L</disconnect> on the
341 storage object.
342
343 =item on_connect_call
344
345 A more generalized form of L</on_connect_do> that calls the specified
346 C<connect_call_METHOD> methods in your storage driver.
347
348   on_connect_do => 'select 1'
349
350 is equivalent to:
351
352   on_connect_call => [ [ do_sql => 'select 1' ] ]
353
354 Its values may contain:
355
356 =over
357
358 =item a scalar
359
360 Will call the C<connect_call_METHOD> method.
361
362 =item a code reference
363
364 Will execute C<< $code->($storage) >>
365
366 =item an array reference
367
368 Each value can be a method name or code reference.
369
370 =item an array of arrays
371
372 For each array, the first item is taken to be the C<connect_call_> method name
373 or code reference, and the rest are parameters to it.
374
375 =back
376
377 Some predefined storage methods you may use:
378
379 =over
380
381 =item do_sql
382
383 Executes a SQL string or a code reference that returns a SQL string. This is
384 what L</on_connect_do> and L</on_disconnect_do> use.
385
386 It can take:
387
388 =over
389
390 =item a scalar
391
392 Will execute the scalar as SQL.
393
394 =item an arrayref
395
396 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
397 attributes hashref and bind values.
398
399 =item a code reference
400
401 Will execute C<< $code->($storage) >> and execute the return array refs as
402 above.
403
404 =back
405
406 =item datetime_setup
407
408 Execute any statements necessary to initialize the database session to return
409 and accept datetime/timestamp values used with
410 L<DBIx::Class::InflateColumn::DateTime>.
411
412 Only necessary for some databases, see your specific storage driver for
413 implementation details.
414
415 =back
416
417 =item on_disconnect_call
418
419 Takes arguments in the same form as L</on_connect_call> and executes them
420 immediately before disconnecting from the database.
421
422 Calls the C<disconnect_call_METHOD> methods as opposed to the
423 C<connect_call_METHOD> methods called by L</on_connect_call>.
424
425 Note, this only runs if you explicitly call L</disconnect> on the
426 storage object.
427
428 =item disable_sth_caching
429
430 If set to a true value, this option will disable the caching of
431 statement handles via L<DBI/prepare_cached>.
432
433 =item limit_dialect
434
435 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
436 default L</sql_limit_dialect> setting of the storage (if any). For a list
437 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
438
439 =item quote_char
440
441 Specifies what characters to use to quote table and column names.
442
443 C<quote_char> expects either a single character, in which case is it
444 is placed on either side of the table/column name, or an arrayref of length
445 2 in which case the table/column name is placed between the elements.
446
447 For example under MySQL you should use C<< quote_char => '`' >>, and for
448 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
449
450 =item name_sep
451
452 This parameter is only useful in conjunction with C<quote_char>, and is used to
453 specify the character that separates elements (schemas, tables, columns) from
454 each other. If unspecified it defaults to the most commonly used C<.>.
455
456 =item unsafe
457
458 This Storage driver normally installs its own C<HandleError>, sets
459 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
460 all database handles, including those supplied by a coderef.  It does this
461 so that it can have consistent and useful error behavior.
462
463 If you set this option to a true value, Storage will not do its usual
464 modifications to the database handle's attributes, and instead relies on
465 the settings in your connect_info DBI options (or the values you set in
466 your connection coderef, in the case that you are connecting via coderef).
467
468 Note that your custom settings can cause Storage to malfunction,
469 especially if you set a C<HandleError> handler that suppresses exceptions
470 and/or disable C<RaiseError>.
471
472 =item auto_savepoint
473
474 If this option is true, L<DBIx::Class> will use savepoints when nesting
475 transactions, making it possible to recover from failure in the inner
476 transaction without having to abort all outer transactions.
477
478 =item cursor_class
479
480 Use this argument to supply a cursor class other than the default
481 L<DBIx::Class::Storage::DBI::Cursor>.
482
483 =back
484
485 Some real-life examples of arguments to L</connect_info> and
486 L<DBIx::Class::Schema/connect>
487
488   # Simple SQLite connection
489   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
490
491   # Connect via subref
492   ->connect_info([ sub { DBI->connect(...) } ]);
493
494   # Connect via subref in hashref
495   ->connect_info([{
496     dbh_maker => sub { DBI->connect(...) },
497     on_connect_do => 'alter session ...',
498   }]);
499
500   # A bit more complicated
501   ->connect_info(
502     [
503       'dbi:Pg:dbname=foo',
504       'postgres',
505       'my_pg_password',
506       { AutoCommit => 1 },
507       { quote_char => q{"} },
508     ]
509   );
510
511   # Equivalent to the previous example
512   ->connect_info(
513     [
514       'dbi:Pg:dbname=foo',
515       'postgres',
516       'my_pg_password',
517       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
518     ]
519   );
520
521   # Same, but with hashref as argument
522   # See parse_connect_info for explanation
523   ->connect_info(
524     [{
525       dsn         => 'dbi:Pg:dbname=foo',
526       user        => 'postgres',
527       password    => 'my_pg_password',
528       AutoCommit  => 1,
529       quote_char  => q{"},
530       name_sep    => q{.},
531     }]
532   );
533
534   # Subref + DBIx::Class-specific connection options
535   ->connect_info(
536     [
537       sub { DBI->connect(...) },
538       {
539           quote_char => q{`},
540           name_sep => q{@},
541           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
542           disable_sth_caching => 1,
543       },
544     ]
545   );
546
547
548
549 =cut
550
551 sub connect_info {
552   my ($self, $info) = @_;
553
554   return $self->_connect_info if !$info;
555
556   $self->_connect_info($info); # copy for _connect_info
557
558   $info = $self->_normalize_connect_info($info)
559     if ref $info eq 'ARRAY';
560
561   for my $storage_opt (keys %{ $info->{storage_options} }) {
562     my $value = $info->{storage_options}{$storage_opt};
563
564     $self->$storage_opt($value);
565   }
566
567   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
568   #  the new set of options
569   $self->_sql_maker(undef);
570   $self->_sql_maker_opts({});
571
572   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
573     my $value = $info->{sql_maker_options}{$sql_maker_opt};
574
575     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
576   }
577
578   my %attrs = (
579     %{ $self->_default_dbi_connect_attributes || {} },
580     %{ $info->{attributes} || {} },
581   );
582
583   my @args = @{ $info->{arguments} };
584
585   $self->_dbi_connect_info([@args,
586     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
587
588   # FIXME - dirty:
589   # save attributes them in a separate accessor so they are always
590   # introspectable, even in case of a CODE $dbhmaker
591   $self->_dbic_connect_attributes (\%attrs);
592
593   return $self->_connect_info;
594 }
595
596 sub _normalize_connect_info {
597   my ($self, $info_arg) = @_;
598   my %info;
599
600   my @args = @$info_arg;  # take a shallow copy for further mutilation
601
602   # combine/pre-parse arguments depending on invocation style
603
604   my %attrs;
605   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
606     %attrs = %{ $args[1] || {} };
607     @args = $args[0];
608   }
609   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
610     %attrs = %{$args[0]};
611     @args = ();
612     if (my $code = delete $attrs{dbh_maker}) {
613       @args = $code;
614
615       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
616       if (@ignored) {
617         carp sprintf (
618             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
619           . "to the result of 'dbh_maker'",
620
621           join (', ', map { "'$_'" } (@ignored) ),
622         );
623       }
624     }
625     else {
626       @args = delete @attrs{qw/dsn user password/};
627     }
628   }
629   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
630     %attrs = (
631       % { $args[3] || {} },
632       % { $args[4] || {} },
633     );
634     @args = @args[0,1,2];
635   }
636
637   $info{arguments} = \@args;
638
639   my @storage_opts = grep exists $attrs{$_},
640     @storage_options, 'cursor_class';
641
642   @{ $info{storage_options} }{@storage_opts} =
643     delete @attrs{@storage_opts} if @storage_opts;
644
645   my @sql_maker_opts = grep exists $attrs{$_},
646     qw/limit_dialect quote_char name_sep/;
647
648   @{ $info{sql_maker_options} }{@sql_maker_opts} =
649     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
650
651   $info{attributes} = \%attrs if %attrs;
652
653   return \%info;
654 }
655
656 sub _default_dbi_connect_attributes {
657   return {
658     AutoCommit => 1,
659     RaiseError => 1,
660     PrintError => 0,
661   };
662 }
663
664 =head2 on_connect_do
665
666 This method is deprecated in favour of setting via L</connect_info>.
667
668 =cut
669
670 =head2 on_disconnect_do
671
672 This method is deprecated in favour of setting via L</connect_info>.
673
674 =cut
675
676 sub _parse_connect_do {
677   my ($self, $type) = @_;
678
679   my $val = $self->$type;
680   return () if not defined $val;
681
682   my @res;
683
684   if (not ref($val)) {
685     push @res, [ 'do_sql', $val ];
686   } elsif (ref($val) eq 'CODE') {
687     push @res, $val;
688   } elsif (ref($val) eq 'ARRAY') {
689     push @res, map { [ 'do_sql', $_ ] } @$val;
690   } else {
691     $self->throw_exception("Invalid type for $type: ".ref($val));
692   }
693
694   return \@res;
695 }
696
697 =head2 dbh_do
698
699 Arguments: ($subref | $method_name), @extra_coderef_args?
700
701 Execute the given $subref or $method_name using the new exception-based
702 connection management.
703
704 The first two arguments will be the storage object that C<dbh_do> was called
705 on and a database handle to use.  Any additional arguments will be passed
706 verbatim to the called subref as arguments 2 and onwards.
707
708 Using this (instead of $self->_dbh or $self->dbh) ensures correct
709 exception handling and reconnection (or failover in future subclasses).
710
711 Your subref should have no side-effects outside of the database, as
712 there is the potential for your subref to be partially double-executed
713 if the database connection was stale/dysfunctional.
714
715 Example:
716
717   my @stuff = $schema->storage->dbh_do(
718     sub {
719       my ($storage, $dbh, @cols) = @_;
720       my $cols = join(q{, }, @cols);
721       $dbh->selectrow_array("SELECT $cols FROM foo");
722     },
723     @column_list
724   );
725
726 =cut
727
728 sub dbh_do {
729   my $self = shift;
730   my $code = shift;
731
732   my $dbh = $self->_get_dbh;
733
734   return $self->$code($dbh, @_)
735     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
736
737   local $self->{_in_dbh_do} = 1;
738
739   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
740   my $args = \@_;
741   return try {
742     $self->$code ($dbh, @$args);
743   } catch {
744     $self->throw_exception($_) if $self->connected;
745
746     # We were not connected - reconnect and retry, but let any
747     #  exception fall right through this time
748     carp "Retrying $code after catching disconnected exception: $_"
749       if $ENV{DBIC_DBIRETRY_DEBUG};
750
751     $self->_populate_dbh;
752     $self->$code($self->_dbh, @$args);
753   };
754 }
755
756 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
757 # It also informs dbh_do to bypass itself while under the direction of txn_do,
758 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
759 sub txn_do {
760   my $self = shift;
761   my $coderef = shift;
762
763   ref $coderef eq 'CODE' or $self->throw_exception
764     ('$coderef must be a CODE reference');
765
766   local $self->{_in_dbh_do} = 1;
767
768   my @result;
769   my $want_array = wantarray;
770
771   my $tried = 0;
772   while(1) {
773     my $exception;
774
775     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
776     my $args = \@_;
777
778     try {
779       $self->txn_begin;
780       my $txn_start_depth = $self->transaction_depth;
781       if($want_array) {
782           @result = $coderef->(@$args);
783       }
784       elsif(defined $want_array) {
785           $result[0] = $coderef->(@$args);
786       }
787       else {
788           $coderef->(@$args);
789       }
790
791       my $delta_txn = $txn_start_depth - $self->transaction_depth;
792       if ($delta_txn == 0) {
793         $self->txn_commit;
794       }
795       elsif ($delta_txn != 1) {
796         # an off-by-one would mean we fired a rollback
797         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
798       }
799     } catch {
800       $exception = $_;
801     };
802
803     if(! defined $exception) { return $want_array ? @result : $result[0] }
804
805     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
806       my $rollback_exception;
807       try { $self->txn_rollback } catch { $rollback_exception = shift };
808       if(defined $rollback_exception) {
809         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
810         $self->throw_exception($exception)  # propagate nested rollback
811           if $rollback_exception =~ /$exception_class/;
812
813         $self->throw_exception(
814           "Transaction aborted: ${exception}. "
815           . "Rollback failed: ${rollback_exception}"
816         );
817       }
818       $self->throw_exception($exception)
819     }
820
821     # We were not connected, and was first try - reconnect and retry
822     # via the while loop
823     carp "Retrying $coderef after catching disconnected exception: $exception"
824       if $ENV{DBIC_TXNRETRY_DEBUG};
825     $self->_populate_dbh;
826   }
827 }
828
829 =head2 disconnect
830
831 Our C<disconnect> method also performs a rollback first if the
832 database is not in C<AutoCommit> mode.
833
834 =cut
835
836 sub disconnect {
837   my ($self) = @_;
838
839   if( $self->_dbh ) {
840     my @actions;
841
842     push @actions, ( $self->on_disconnect_call || () );
843     push @actions, $self->_parse_connect_do ('on_disconnect_do');
844
845     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
846
847     $self->_dbh_rollback unless $self->_dbh_autocommit;
848
849     %{ $self->_dbh->{CachedKids} } = ();
850     $self->_dbh->disconnect;
851     $self->_dbh(undef);
852     $self->{_dbh_gen}++;
853   }
854 }
855
856 =head2 with_deferred_fk_checks
857
858 =over 4
859
860 =item Arguments: C<$coderef>
861
862 =item Return Value: The return value of $coderef
863
864 =back
865
866 Storage specific method to run the code ref with FK checks deferred or
867 in MySQL's case disabled entirely.
868
869 =cut
870
871 # Storage subclasses should override this
872 sub with_deferred_fk_checks {
873   my ($self, $sub) = @_;
874   $sub->();
875 }
876
877 =head2 connected
878
879 =over
880
881 =item Arguments: none
882
883 =item Return Value: 1|0
884
885 =back
886
887 Verifies that the current database handle is active and ready to execute
888 an SQL statement (e.g. the connection did not get stale, server is still
889 answering, etc.) This method is used internally by L</dbh>.
890
891 =cut
892
893 sub connected {
894   my $self = shift;
895   return 0 unless $self->_seems_connected;
896
897   #be on the safe side
898   local $self->_dbh->{RaiseError} = 1;
899
900   return $self->_ping;
901 }
902
903 sub _seems_connected {
904   my $self = shift;
905
906   $self->_verify_pid;
907
908   my $dbh = $self->_dbh
909     or return 0;
910
911   return $dbh->FETCH('Active');
912 }
913
914 sub _ping {
915   my $self = shift;
916
917   my $dbh = $self->_dbh or return 0;
918
919   return $dbh->ping;
920 }
921
922 sub ensure_connected {
923   my ($self) = @_;
924
925   unless ($self->connected) {
926     $self->_populate_dbh;
927   }
928 }
929
930 =head2 dbh
931
932 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
933 is guaranteed to be healthy by implicitly calling L</connected>, and if
934 necessary performing a reconnection before returning. Keep in mind that this
935 is very B<expensive> on some database engines. Consider using L</dbh_do>
936 instead.
937
938 =cut
939
940 sub dbh {
941   my ($self) = @_;
942
943   if (not $self->_dbh) {
944     $self->_populate_dbh;
945   } else {
946     $self->ensure_connected;
947   }
948   return $self->_dbh;
949 }
950
951 # this is the internal "get dbh or connect (don't check)" method
952 sub _get_dbh {
953   my $self = shift;
954   $self->_verify_pid;
955   $self->_populate_dbh unless $self->_dbh;
956   return $self->_dbh;
957 }
958
959 sub sql_maker {
960   my ($self) = @_;
961   unless ($self->_sql_maker) {
962     my $sql_maker_class = $self->sql_maker_class;
963     $self->ensure_class_loaded ($sql_maker_class);
964
965     my %opts = %{$self->_sql_maker_opts||{}};
966     my $dialect =
967       $opts{limit_dialect}
968         ||
969       $self->sql_limit_dialect
970         ||
971       do {
972         my $s_class = (ref $self) || $self;
973         carp (
974           "Your storage class ($s_class) does not set sql_limit_dialect and you "
975         . 'have not supplied an explicit limit_dialect in your connection_info. '
976         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
977         . 'databases but can be (and often is) painfully slow.'
978         );
979
980         'GenericSubQ';
981       }
982     ;
983
984     $self->_sql_maker($sql_maker_class->new(
985       bindtype=>'columns',
986       array_datatypes => 1,
987       limit_dialect => $dialect,
988       name_sep => '.',
989       %opts,
990     ));
991   }
992   return $self->_sql_maker;
993 }
994
995 # nothing to do by default
996 sub _rebless {}
997 sub _init {}
998
999 sub _populate_dbh {
1000   my ($self) = @_;
1001
1002   my @info = @{$self->_dbi_connect_info || []};
1003   $self->_dbh(undef); # in case ->connected failed we might get sent here
1004   $self->_dbh_details({}); # reset everything we know
1005
1006   $self->_dbh($self->_connect(@info));
1007
1008   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1009
1010   $self->_determine_driver;
1011
1012   # Always set the transaction depth on connect, since
1013   #  there is no transaction in progress by definition
1014   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1015
1016   $self->_run_connection_actions unless $self->{_in_determine_driver};
1017 }
1018
1019 sub _run_connection_actions {
1020   my $self = shift;
1021   my @actions;
1022
1023   push @actions, ( $self->on_connect_call || () );
1024   push @actions, $self->_parse_connect_do ('on_connect_do');
1025
1026   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1027 }
1028
1029
1030
1031 sub set_use_dbms_capability {
1032   $_[0]->set_inherited ($_[1], $_[2]);
1033 }
1034
1035 sub get_use_dbms_capability {
1036   my ($self, $capname) = @_;
1037
1038   my $use = $self->get_inherited ($capname);
1039   return defined $use
1040     ? $use
1041     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1042   ;
1043 }
1044
1045 sub set_dbms_capability {
1046   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1047 }
1048
1049 sub get_dbms_capability {
1050   my ($self, $capname) = @_;
1051
1052   my $cap = $self->_dbh_details->{capability}{$capname};
1053
1054   unless (defined $cap) {
1055     if (my $meth = $self->can ("_determine$capname")) {
1056       $cap = $self->$meth ? 1 : 0;
1057     }
1058     else {
1059       $cap = 0;
1060     }
1061
1062     $self->set_dbms_capability ($capname, $cap);
1063   }
1064
1065   return $cap;
1066 }
1067
1068 sub _server_info {
1069   my $self = shift;
1070
1071   my $info;
1072   unless ($info = $self->_dbh_details->{info}) {
1073
1074     $info = {};
1075
1076     my $server_version = try { $self->_get_server_version };
1077
1078     if (defined $server_version) {
1079       $info->{dbms_version} = $server_version;
1080
1081       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1082       my @verparts = split (/\./, $numeric_version);
1083       if (
1084         @verparts
1085           &&
1086         $verparts[0] <= 999
1087       ) {
1088         # consider only up to 3 version parts, iff not more than 3 digits
1089         my @use_parts;
1090         while (@verparts && @use_parts < 3) {
1091           my $p = shift @verparts;
1092           last if $p > 999;
1093           push @use_parts, $p;
1094         }
1095         push @use_parts, 0 while @use_parts < 3;
1096
1097         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1098       }
1099     }
1100
1101     $self->_dbh_details->{info} = $info;
1102   }
1103
1104   return $info;
1105 }
1106
1107 sub _get_server_version {
1108   shift->_get_dbh->get_info(18);
1109 }
1110
1111 sub _determine_driver {
1112   my ($self) = @_;
1113
1114   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1115     my $started_connected = 0;
1116     local $self->{_in_determine_driver} = 1;
1117
1118     if (ref($self) eq __PACKAGE__) {
1119       my $driver;
1120       if ($self->_dbh) { # we are connected
1121         $driver = $self->_dbh->{Driver}{Name};
1122         $started_connected = 1;
1123       } else {
1124         # if connect_info is a CODEREF, we have no choice but to connect
1125         if (ref $self->_dbi_connect_info->[0] &&
1126             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1127           $self->_populate_dbh;
1128           $driver = $self->_dbh->{Driver}{Name};
1129         }
1130         else {
1131           # try to use dsn to not require being connected, the driver may still
1132           # force a connection in _rebless to determine version
1133           # (dsn may not be supplied at all if all we do is make a mock-schema)
1134           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1135           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1136           $driver ||= $ENV{DBI_DRIVER};
1137         }
1138       }
1139
1140       if ($driver) {
1141         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1142         if ($self->load_optional_class($storage_class)) {
1143           mro::set_mro($storage_class, 'c3');
1144           bless $self, $storage_class;
1145           $self->_rebless();
1146         }
1147       }
1148     }
1149
1150     $self->_driver_determined(1);
1151
1152     $self->_init; # run driver-specific initializations
1153
1154     $self->_run_connection_actions
1155         if !$started_connected && defined $self->_dbh;
1156   }
1157 }
1158
1159 sub _do_connection_actions {
1160   my $self          = shift;
1161   my $method_prefix = shift;
1162   my $call          = shift;
1163
1164   if (not ref($call)) {
1165     my $method = $method_prefix . $call;
1166     $self->$method(@_);
1167   } elsif (ref($call) eq 'CODE') {
1168     $self->$call(@_);
1169   } elsif (ref($call) eq 'ARRAY') {
1170     if (ref($call->[0]) ne 'ARRAY') {
1171       $self->_do_connection_actions($method_prefix, $_) for @$call;
1172     } else {
1173       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1174     }
1175   } else {
1176     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1177   }
1178
1179   return $self;
1180 }
1181
1182 sub connect_call_do_sql {
1183   my $self = shift;
1184   $self->_do_query(@_);
1185 }
1186
1187 sub disconnect_call_do_sql {
1188   my $self = shift;
1189   $self->_do_query(@_);
1190 }
1191
1192 # override in db-specific backend when necessary
1193 sub connect_call_datetime_setup { 1 }
1194
1195 sub _do_query {
1196   my ($self, $action) = @_;
1197
1198   if (ref $action eq 'CODE') {
1199     $action = $action->($self);
1200     $self->_do_query($_) foreach @$action;
1201   }
1202   else {
1203     # Most debuggers expect ($sql, @bind), so we need to exclude
1204     # the attribute hash which is the second argument to $dbh->do
1205     # furthermore the bind values are usually to be presented
1206     # as named arrayref pairs, so wrap those here too
1207     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1208     my $sql = shift @do_args;
1209     my $attrs = shift @do_args;
1210     my @bind = map { [ undef, $_ ] } @do_args;
1211
1212     $self->_query_start($sql, @bind);
1213     $self->_get_dbh->do($sql, $attrs, @do_args);
1214     $self->_query_end($sql, @bind);
1215   }
1216
1217   return $self;
1218 }
1219
1220 sub _connect {
1221   my ($self, @info) = @_;
1222
1223   $self->throw_exception("You failed to provide any connection info")
1224     if !@info;
1225
1226   my ($old_connect_via, $dbh);
1227
1228   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1229     $old_connect_via = $DBI::connect_via;
1230     $DBI::connect_via = 'connect';
1231   }
1232
1233   try {
1234     if(ref $info[0] eq 'CODE') {
1235        $dbh = $info[0]->();
1236     }
1237     else {
1238        $dbh = DBI->connect(@info);
1239     }
1240
1241     if (!$dbh) {
1242       die $DBI::errstr;
1243     }
1244
1245     unless ($self->unsafe) {
1246
1247       # this odd anonymous coderef dereference is in fact really
1248       # necessary to avoid the unwanted effect described in perl5
1249       # RT#75792
1250       sub {
1251         my $weak_self = $_[0];
1252         weaken $weak_self;
1253
1254         $_[1]->{HandleError} = sub {
1255           if ($weak_self) {
1256             $weak_self->throw_exception("DBI Exception: $_[0]");
1257           }
1258           else {
1259             # the handler may be invoked by something totally out of
1260             # the scope of DBIC
1261             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1262           }
1263         };
1264       }->($self, $dbh);
1265
1266       $dbh->{ShowErrorStatement} = 1;
1267       $dbh->{RaiseError} = 1;
1268       $dbh->{PrintError} = 0;
1269     }
1270   }
1271   catch {
1272     $self->throw_exception("DBI Connection failed: $_")
1273   }
1274   finally {
1275     $DBI::connect_via = $old_connect_via if $old_connect_via;
1276   };
1277
1278   $self->_dbh_autocommit($dbh->{AutoCommit});
1279   $dbh;
1280 }
1281
1282 sub svp_begin {
1283   my ($self, $name) = @_;
1284
1285   $name = $self->_svp_generate_name
1286     unless defined $name;
1287
1288   $self->throw_exception ("You can't use savepoints outside a transaction")
1289     if $self->{transaction_depth} == 0;
1290
1291   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1292     unless $self->can('_svp_begin');
1293
1294   push @{ $self->{savepoints} }, $name;
1295
1296   $self->debugobj->svp_begin($name) if $self->debug;
1297
1298   return $self->_svp_begin($name);
1299 }
1300
1301 sub svp_release {
1302   my ($self, $name) = @_;
1303
1304   $self->throw_exception ("You can't use savepoints outside a transaction")
1305     if $self->{transaction_depth} == 0;
1306
1307   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1308     unless $self->can('_svp_release');
1309
1310   if (defined $name) {
1311     $self->throw_exception ("Savepoint '$name' does not exist")
1312       unless grep { $_ eq $name } @{ $self->{savepoints} };
1313
1314     # Dig through the stack until we find the one we are releasing.  This keeps
1315     # the stack up to date.
1316     my $svp;
1317
1318     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1319   } else {
1320     $name = pop @{ $self->{savepoints} };
1321   }
1322
1323   $self->debugobj->svp_release($name) if $self->debug;
1324
1325   return $self->_svp_release($name);
1326 }
1327
1328 sub svp_rollback {
1329   my ($self, $name) = @_;
1330
1331   $self->throw_exception ("You can't use savepoints outside a transaction")
1332     if $self->{transaction_depth} == 0;
1333
1334   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1335     unless $self->can('_svp_rollback');
1336
1337   if (defined $name) {
1338       # If they passed us a name, verify that it exists in the stack
1339       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1340           $self->throw_exception("Savepoint '$name' does not exist!");
1341       }
1342
1343       # Dig through the stack until we find the one we are releasing.  This keeps
1344       # the stack up to date.
1345       while(my $s = pop(@{ $self->{savepoints} })) {
1346           last if($s eq $name);
1347       }
1348       # Add the savepoint back to the stack, as a rollback doesn't remove the
1349       # named savepoint, only everything after it.
1350       push(@{ $self->{savepoints} }, $name);
1351   } else {
1352       # We'll assume they want to rollback to the last savepoint
1353       $name = $self->{savepoints}->[-1];
1354   }
1355
1356   $self->debugobj->svp_rollback($name) if $self->debug;
1357
1358   return $self->_svp_rollback($name);
1359 }
1360
1361 sub _svp_generate_name {
1362   my ($self) = @_;
1363   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1364 }
1365
1366 sub txn_begin {
1367   my $self = shift;
1368
1369   # this means we have not yet connected and do not know the AC status
1370   # (e.g. coderef $dbh)
1371   if (! defined $self->_dbh_autocommit) {
1372     $self->ensure_connected;
1373   }
1374   # otherwise re-connect on pid changes, so
1375   # that the txn_depth is adjusted properly
1376   # the lightweight _get_dbh is good enoug here
1377   # (only superficial handle check, no pings)
1378   else {
1379     $self->_get_dbh;
1380   }
1381
1382   if($self->transaction_depth == 0) {
1383     $self->debugobj->txn_begin()
1384       if $self->debug;
1385     $self->_dbh_begin_work;
1386   }
1387   elsif ($self->auto_savepoint) {
1388     $self->svp_begin;
1389   }
1390   $self->{transaction_depth}++;
1391 }
1392
1393 sub _dbh_begin_work {
1394   my $self = shift;
1395
1396   # if the user is utilizing txn_do - good for him, otherwise we need to
1397   # ensure that the $dbh is healthy on BEGIN.
1398   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1399   # will be replaced by a failure of begin_work itself (which will be
1400   # then retried on reconnect)
1401   if ($self->{_in_dbh_do}) {
1402     $self->_dbh->begin_work;
1403   } else {
1404     $self->dbh_do(sub { $_[1]->begin_work });
1405   }
1406 }
1407
1408 sub txn_commit {
1409   my $self = shift;
1410   if ($self->{transaction_depth} == 1) {
1411     $self->debugobj->txn_commit()
1412       if ($self->debug);
1413     $self->_dbh_commit;
1414     $self->{transaction_depth} = 0
1415       if $self->_dbh_autocommit;
1416   }
1417   elsif($self->{transaction_depth} > 1) {
1418     $self->{transaction_depth}--;
1419     $self->svp_release
1420       if $self->auto_savepoint;
1421   }
1422   else {
1423     $self->throw_exception( 'Refusing to commit without a started transaction' );
1424   }
1425 }
1426
1427 sub _dbh_commit {
1428   my $self = shift;
1429   my $dbh  = $self->_dbh
1430     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1431   $dbh->commit;
1432 }
1433
1434 sub txn_rollback {
1435   my $self = shift;
1436   my $dbh = $self->_dbh;
1437   try {
1438     if ($self->{transaction_depth} == 1) {
1439       $self->debugobj->txn_rollback()
1440         if ($self->debug);
1441       $self->{transaction_depth} = 0
1442         if $self->_dbh_autocommit;
1443       $self->_dbh_rollback;
1444     }
1445     elsif($self->{transaction_depth} > 1) {
1446       $self->{transaction_depth}--;
1447       if ($self->auto_savepoint) {
1448         $self->svp_rollback;
1449         $self->svp_release;
1450       }
1451     }
1452     else {
1453       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1454     }
1455   }
1456   catch {
1457     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1458
1459     if ($_ !~ /$exception_class/) {
1460       # ensure that a failed rollback resets the transaction depth
1461       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1462     }
1463
1464     $self->throw_exception($_)
1465   };
1466 }
1467
1468 sub _dbh_rollback {
1469   my $self = shift;
1470   my $dbh  = $self->_dbh
1471     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1472   $dbh->rollback;
1473 }
1474
1475 # This used to be the top-half of _execute.  It was split out to make it
1476 #  easier to override in NoBindVars without duping the rest.  It takes up
1477 #  all of _execute's args, and emits $sql, @bind.
1478 sub _prep_for_execute {
1479   my ($self, $op, $extra_bind, $ident, $args) = @_;
1480
1481   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1482     $ident = $ident->from();
1483   }
1484
1485   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1486
1487   unshift(@bind,
1488     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1489       if $extra_bind;
1490   return ($sql, \@bind);
1491 }
1492
1493
1494 sub _fix_bind_params {
1495     my ($self, @bind) = @_;
1496
1497     ### Turn @bind from something like this:
1498     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1499     ### to this:
1500     ###   ( "'1'", "'1'", "'3'" )
1501     return
1502         map {
1503             if ( defined( $_ && $_->[1] ) ) {
1504                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1505             }
1506             else { q{NULL}; }
1507         } @bind;
1508 }
1509
1510 sub _query_start {
1511     my ( $self, $sql, @bind ) = @_;
1512
1513     if ( $self->debug ) {
1514         @bind = $self->_fix_bind_params(@bind);
1515
1516         $self->debugobj->query_start( $sql, @bind );
1517     }
1518 }
1519
1520 sub _query_end {
1521     my ( $self, $sql, @bind ) = @_;
1522
1523     if ( $self->debug ) {
1524         @bind = $self->_fix_bind_params(@bind);
1525         $self->debugobj->query_end( $sql, @bind );
1526     }
1527 }
1528
1529 sub _dbh_execute {
1530   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1531
1532   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1533
1534   $self->_query_start( $sql, @$bind );
1535
1536   my $sth = $self->sth($sql,$op);
1537
1538   my $placeholder_index = 1;
1539
1540   foreach my $bound (@$bind) {
1541     my $attributes = {};
1542     my($column_name, @data) = @$bound;
1543
1544     if ($bind_attributes) {
1545       $attributes = $bind_attributes->{$column_name}
1546       if defined $bind_attributes->{$column_name};
1547     }
1548
1549     foreach my $data (@data) {
1550       my $ref = ref $data;
1551       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1552
1553       $sth->bind_param($placeholder_index, $data, $attributes);
1554       $placeholder_index++;
1555     }
1556   }
1557
1558   # Can this fail without throwing an exception anyways???
1559   my $rv = $sth->execute();
1560   $self->throw_exception(
1561     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1562   ) if !$rv;
1563
1564   $self->_query_end( $sql, @$bind );
1565
1566   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1567 }
1568
1569 sub _execute {
1570     my $self = shift;
1571     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1572 }
1573
1574 sub _prefetch_autovalues {
1575   my ($self, $source, $to_insert) = @_;
1576
1577   my $colinfo = $source->columns_info;
1578
1579   my %values;
1580   for my $col (keys %$colinfo) {
1581     if (
1582       $colinfo->{$col}{auto_nextval}
1583         and
1584       (
1585         ! exists $to_insert->{$col}
1586           or
1587         ref $to_insert->{$col} eq 'SCALAR'
1588       )
1589     ) {
1590       $values{$col} = $self->_sequence_fetch(
1591         'nextval',
1592         ( $colinfo->{$col}{sequence} ||=
1593             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1594         ),
1595       );
1596     }
1597   }
1598
1599   \%values;
1600 }
1601
1602 sub insert {
1603   my ($self, $source, $to_insert) = @_;
1604
1605   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1606
1607   # fuse the values
1608   $to_insert = { %$to_insert, %$prefetched_values };
1609
1610   # list of primary keys we try to fetch from the database
1611   # both not-exsists and scalarrefs are considered
1612   my %fetch_pks;
1613   %fetch_pks = ( map
1614     { $_ => scalar keys %fetch_pks }  # so we can preserve order for prettyness
1615     grep
1616       { ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR' }
1617       $source->primary_columns
1618   );
1619
1620   my $sqla_opts;
1621   if ($self->_use_insert_returning) {
1622
1623     # retain order as declared in the resultsource
1624     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1625       push @{$sqla_opts->{returning}}, $_;
1626     }
1627   }
1628
1629   my $bind_attributes = $self->source_bind_attributes($source);
1630
1631   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1632
1633   my %returned_cols;
1634
1635   if (my $retlist = $sqla_opts->{returning}) {
1636     my @ret_vals = try {
1637       local $SIG{__WARN__} = sub {};
1638       my @r = $sth->fetchrow_array;
1639       $sth->finish;
1640       @r;
1641     };
1642
1643     @returned_cols{@$retlist} = @ret_vals if @ret_vals;
1644   }
1645
1646   return { %$prefetched_values, %returned_cols };
1647 }
1648
1649
1650 ## Currently it is assumed that all values passed will be "normal", i.e. not
1651 ## scalar refs, or at least, all the same type as the first set, the statement is
1652 ## only prepped once.
1653 sub insert_bulk {
1654   my ($self, $source, $cols, $data) = @_;
1655
1656   my %colvalues;
1657   @colvalues{@$cols} = (0..$#$cols);
1658
1659   for my $i (0..$#$cols) {
1660     my $first_val = $data->[0][$i];
1661     next unless ref $first_val eq 'SCALAR';
1662
1663     $colvalues{ $cols->[$i] } = $first_val;
1664   }
1665
1666   # check for bad data and stringify stringifiable objects
1667   my $bad_slice = sub {
1668     my ($msg, $col_idx, $slice_idx) = @_;
1669     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1670       $msg,
1671       $cols->[$col_idx],
1672       do {
1673         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1674         Dumper {
1675           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1676         },
1677       }
1678     );
1679   };
1680
1681   for my $datum_idx (0..$#$data) {
1682     my $datum = $data->[$datum_idx];
1683
1684     for my $col_idx (0..$#$cols) {
1685       my $val            = $datum->[$col_idx];
1686       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1687       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1688
1689       if ($is_literal_sql) {
1690         if (not ref $val) {
1691           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1692         }
1693         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1694           $bad_slice->("$reftype reference found where literal SQL expected",
1695             $col_idx, $datum_idx);
1696         }
1697         elsif ($$val ne $$sqla_bind){
1698           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1699             $col_idx, $datum_idx);
1700         }
1701       }
1702       elsif (my $reftype = ref $val) {
1703         require overload;
1704         if (overload::Method($val, '""')) {
1705           $datum->[$col_idx] = "".$val;
1706         }
1707         else {
1708           $bad_slice->("$reftype reference found where bind expected",
1709             $col_idx, $datum_idx);
1710         }
1711       }
1712     }
1713   }
1714
1715   my ($sql, $bind) = $self->_prep_for_execute (
1716     'insert', undef, $source, [\%colvalues]
1717   );
1718
1719   if (! @$bind) {
1720     # if the bindlist is empty - make sure all "values" are in fact
1721     # literal scalarrefs. If not the case this means the storage ate
1722     # them away (e.g. the NoBindVars component) and interpolated them
1723     # directly into the SQL. This obviosly can't be good for multi-inserts
1724
1725     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1726       if first { ref $_ ne 'SCALAR' } values %colvalues;
1727   }
1728
1729   # neither _execute_array, nor _execute_inserts_with_no_binds are
1730   # atomic (even if _execute _array is a single call). Thus a safety
1731   # scope guard
1732   my $guard = $self->txn_scope_guard;
1733
1734   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1735   my $sth = $self->sth($sql);
1736   my $rv = do {
1737     if (@$bind) {
1738       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1739       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1740     }
1741     else {
1742       # bind_param_array doesn't work if there are no binds
1743       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1744     }
1745   };
1746
1747   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1748
1749   $guard->commit;
1750
1751   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1752 }
1753
1754 sub _execute_array {
1755   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1756
1757   ## This must be an arrayref, else nothing works!
1758   my $tuple_status = [];
1759
1760   ## Get the bind_attributes, if any exist
1761   my $bind_attributes = $self->source_bind_attributes($source);
1762
1763   ## Bind the values and execute
1764   my $placeholder_index = 1;
1765
1766   foreach my $bound (@$bind) {
1767
1768     my $attributes = {};
1769     my ($column_name, $data_index) = @$bound;
1770
1771     if( $bind_attributes ) {
1772       $attributes = $bind_attributes->{$column_name}
1773       if defined $bind_attributes->{$column_name};
1774     }
1775
1776     my @data = map { $_->[$data_index] } @$data;
1777
1778     $sth->bind_param_array(
1779       $placeholder_index,
1780       [@data],
1781       (%$attributes ?  $attributes : ()),
1782     );
1783     $placeholder_index++;
1784   }
1785
1786   my ($rv, $err);
1787   try {
1788     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1789   }
1790   catch {
1791     $err = shift;
1792   };
1793
1794   # Statement must finish even if there was an exception.
1795   try {
1796     $sth->finish
1797   }
1798   catch {
1799     $err = shift unless defined $err
1800   };
1801
1802   $err = $sth->errstr
1803     if (! defined $err and $sth->err);
1804
1805   if (defined $err) {
1806     my $i = 0;
1807     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1808
1809     $self->throw_exception("Unexpected populate error: $err")
1810       if ($i > $#$tuple_status);
1811
1812     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1813       ($tuple_status->[$i][1] || $err),
1814       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1815     );
1816   }
1817
1818   return $rv;
1819 }
1820
1821 sub _dbh_execute_array {
1822     my ($self, $sth, $tuple_status, @extra) = @_;
1823
1824     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1825 }
1826
1827 sub _dbh_execute_inserts_with_no_binds {
1828   my ($self, $sth, $count) = @_;
1829
1830   my $err;
1831   try {
1832     my $dbh = $self->_get_dbh;
1833     local $dbh->{RaiseError} = 1;
1834     local $dbh->{PrintError} = 0;
1835
1836     $sth->execute foreach 1..$count;
1837   }
1838   catch {
1839     $err = shift;
1840   };
1841
1842   # Make sure statement is finished even if there was an exception.
1843   try {
1844     $sth->finish
1845   }
1846   catch {
1847     $err = shift unless defined $err;
1848   };
1849
1850   $self->throw_exception($err) if defined $err;
1851
1852   return $count;
1853 }
1854
1855 sub update {
1856   my ($self, $source, @args) = @_;
1857
1858   my $bind_attrs = $self->source_bind_attributes($source);
1859
1860   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1861 }
1862
1863
1864 sub delete {
1865   my ($self, $source, @args) = @_;
1866
1867   my $bind_attrs = $self->source_bind_attributes($source);
1868
1869   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1870 }
1871
1872 # We were sent here because the $rs contains a complex search
1873 # which will require a subquery to select the correct rows
1874 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1875 #
1876 # Generating a single PK column subquery is trivial and supported
1877 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1878 # Look at _multipk_update_delete()
1879 sub _subq_update_delete {
1880   my $self = shift;
1881   my ($rs, $op, $values) = @_;
1882
1883   my $rsrc = $rs->result_source;
1884
1885   # quick check if we got a sane rs on our hands
1886   my @pcols = $rsrc->_pri_cols;
1887
1888   my $sel = $rs->_resolved_attrs->{select};
1889   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1890
1891   if (
1892       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1893         ne
1894       join ("\x00", sort @$sel )
1895   ) {
1896     $self->throw_exception (
1897       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1898     );
1899   }
1900
1901   if (@pcols == 1) {
1902     return $self->$op (
1903       $rsrc,
1904       $op eq 'update' ? $values : (),
1905       { $pcols[0] => { -in => $rs->as_query } },
1906     );
1907   }
1908
1909   else {
1910     return $self->_multipk_update_delete (@_);
1911   }
1912 }
1913
1914 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1915 # resultset update/delete involving subqueries. So by default resort
1916 # to simple (and inefficient) delete_all style per-row opearations,
1917 # while allowing specific storages to override this with a faster
1918 # implementation.
1919 #
1920 sub _multipk_update_delete {
1921   return shift->_per_row_update_delete (@_);
1922 }
1923
1924 # This is the default loop used to delete/update rows for multi PK
1925 # resultsets, and used by mysql exclusively (because it can't do anything
1926 # else).
1927 #
1928 # We do not use $row->$op style queries, because resultset update/delete
1929 # is not expected to cascade (this is what delete_all/update_all is for).
1930 #
1931 # There should be no race conditions as the entire operation is rolled
1932 # in a transaction.
1933 #
1934 sub _per_row_update_delete {
1935   my $self = shift;
1936   my ($rs, $op, $values) = @_;
1937
1938   my $rsrc = $rs->result_source;
1939   my @pcols = $rsrc->_pri_cols;
1940
1941   my $guard = $self->txn_scope_guard;
1942
1943   # emulate the return value of $sth->execute for non-selects
1944   my $row_cnt = '0E0';
1945
1946   my $subrs_cur = $rs->cursor;
1947   my @all_pk = $subrs_cur->all;
1948   for my $pks ( @all_pk) {
1949
1950     my $cond;
1951     for my $i (0.. $#pcols) {
1952       $cond->{$pcols[$i]} = $pks->[$i];
1953     }
1954
1955     $self->$op (
1956       $rsrc,
1957       $op eq 'update' ? $values : (),
1958       $cond,
1959     );
1960
1961     $row_cnt++;
1962   }
1963
1964   $guard->commit;
1965
1966   return $row_cnt;
1967 }
1968
1969 sub _select {
1970   my $self = shift;
1971   $self->_execute($self->_select_args(@_));
1972 }
1973
1974 sub _select_args_to_query {
1975   my $self = shift;
1976
1977   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1978   #  = $self->_select_args($ident, $select, $cond, $attrs);
1979   my ($op, $bind, $ident, $bind_attrs, @args) =
1980     $self->_select_args(@_);
1981
1982   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1983   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1984   $prepared_bind ||= [];
1985
1986   return wantarray
1987     ? ($sql, $prepared_bind, $bind_attrs)
1988     : \[ "($sql)", @$prepared_bind ]
1989   ;
1990 }
1991
1992 sub _select_args {
1993   my ($self, $ident, $select, $where, $attrs) = @_;
1994
1995   my $sql_maker = $self->sql_maker;
1996   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1997
1998   $attrs = {
1999     %$attrs,
2000     select => $select,
2001     from => $ident,
2002     where => $where,
2003     $rs_alias && $alias2source->{$rs_alias}
2004       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
2005       : ()
2006     ,
2007   };
2008
2009   # calculate bind_attrs before possible $ident mangling
2010   my $bind_attrs = {};
2011   for my $alias (keys %$alias2source) {
2012     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2013     for my $col (keys %$bindtypes) {
2014
2015       my $fqcn = join ('.', $alias, $col);
2016       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2017
2018       # Unqialified column names are nice, but at the same time can be
2019       # rather ambiguous. What we do here is basically go along with
2020       # the loop, adding an unqualified column slot to $bind_attrs,
2021       # alongside the fully qualified name. As soon as we encounter
2022       # another column by that name (which would imply another table)
2023       # we unset the unqualified slot and never add any info to it
2024       # to avoid erroneous type binding. If this happens the users
2025       # only choice will be to fully qualify his column name
2026
2027       if (exists $bind_attrs->{$col}) {
2028         $bind_attrs->{$col} = {};
2029       }
2030       else {
2031         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2032       }
2033     }
2034   }
2035
2036   # Sanity check the attributes (SQLMaker does it too, but
2037   # in case of a software_limit we'll never reach there)
2038   if (defined $attrs->{offset}) {
2039     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2040       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2041   }
2042   $attrs->{offset} ||= 0;
2043
2044   if (defined $attrs->{rows}) {
2045     $self->throw_exception("The rows attribute must be a positive integer if present")
2046       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2047   }
2048   elsif ($attrs->{offset}) {
2049     # MySQL actually recommends this approach.  I cringe.
2050     $attrs->{rows} = $sql_maker->__max_int;
2051   }
2052
2053   my @limit;
2054
2055   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2056   # storage, unless software limit was requested
2057   if (
2058     #limited has_many
2059     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2060        ||
2061     # grouped prefetch (to satisfy group_by == select)
2062     ( $attrs->{group_by}
2063         &&
2064       @{$attrs->{group_by}}
2065         &&
2066       $attrs->{_prefetch_select}
2067         &&
2068       @{$attrs->{_prefetch_select}}
2069     )
2070   ) {
2071     ($ident, $select, $where, $attrs)
2072       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2073   }
2074   elsif (! $attrs->{software_limit} ) {
2075     push @limit, $attrs->{rows}, $attrs->{offset};
2076   }
2077
2078   # try to simplify the joinmap further (prune unreferenced type-single joins)
2079   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2080
2081 ###
2082   # This would be the point to deflate anything found in $where
2083   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2084   # expect a row object. And all we have is a resultsource (it is trivial
2085   # to extract deflator coderefs via $alias2source above).
2086   #
2087   # I don't see a way forward other than changing the way deflators are
2088   # invoked, and that's just bad...
2089 ###
2090
2091   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2092 }
2093
2094 # Returns a counting SELECT for a simple count
2095 # query. Abstracted so that a storage could override
2096 # this to { count => 'firstcol' } or whatever makes
2097 # sense as a performance optimization
2098 sub _count_select {
2099   #my ($self, $source, $rs_attrs) = @_;
2100   return { count => '*' };
2101 }
2102
2103
2104 sub source_bind_attributes {
2105   my ($self, $source) = @_;
2106
2107   my $bind_attributes;
2108
2109   my $colinfo = $source->columns_info;
2110
2111   for my $col (keys %$colinfo) {
2112     if (my $dt = $colinfo->{$col}{data_type} ) {
2113       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2114     }
2115   }
2116
2117   return $bind_attributes;
2118 }
2119
2120 =head2 select
2121
2122 =over 4
2123
2124 =item Arguments: $ident, $select, $condition, $attrs
2125
2126 =back
2127
2128 Handle a SQL select statement.
2129
2130 =cut
2131
2132 sub select {
2133   my $self = shift;
2134   my ($ident, $select, $condition, $attrs) = @_;
2135   return $self->cursor_class->new($self, \@_, $attrs);
2136 }
2137
2138 sub select_single {
2139   my $self = shift;
2140   my ($rv, $sth, @bind) = $self->_select(@_);
2141   my @row = $sth->fetchrow_array;
2142   my @nextrow = $sth->fetchrow_array if @row;
2143   if(@row && @nextrow) {
2144     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2145   }
2146   # Need to call finish() to work round broken DBDs
2147   $sth->finish();
2148   return @row;
2149 }
2150
2151 =head2 sql_limit_dialect
2152
2153 This is an accessor for the default SQL limit dialect used by a particular
2154 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2155 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2156 see L<DBIx::Class::SQLMaker::LimitDialects>.
2157
2158 =head2 sth
2159
2160 =over 4
2161
2162 =item Arguments: $sql
2163
2164 =back
2165
2166 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2167
2168 =cut
2169
2170 sub _dbh_sth {
2171   my ($self, $dbh, $sql) = @_;
2172
2173   # 3 is the if_active parameter which avoids active sth re-use
2174   my $sth = $self->disable_sth_caching
2175     ? $dbh->prepare($sql)
2176     : $dbh->prepare_cached($sql, {}, 3);
2177
2178   # XXX You would think RaiseError would make this impossible,
2179   #  but apparently that's not true :(
2180   $self->throw_exception($dbh->errstr) if !$sth;
2181
2182   $sth;
2183 }
2184
2185 sub sth {
2186   my ($self, $sql) = @_;
2187   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2188 }
2189
2190 sub _dbh_columns_info_for {
2191   my ($self, $dbh, $table) = @_;
2192
2193   if ($dbh->can('column_info')) {
2194     my %result;
2195     my $caught;
2196     try {
2197       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2198       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2199       $sth->execute();
2200       while ( my $info = $sth->fetchrow_hashref() ){
2201         my %column_info;
2202         $column_info{data_type}   = $info->{TYPE_NAME};
2203         $column_info{size}      = $info->{COLUMN_SIZE};
2204         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2205         $column_info{default_value} = $info->{COLUMN_DEF};
2206         my $col_name = $info->{COLUMN_NAME};
2207         $col_name =~ s/^\"(.*)\"$/$1/;
2208
2209         $result{$col_name} = \%column_info;
2210       }
2211     } catch {
2212       $caught = 1;
2213     };
2214     return \%result if !$caught && scalar keys %result;
2215   }
2216
2217   my %result;
2218   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2219   $sth->execute;
2220   my @columns = @{$sth->{NAME_lc}};
2221   for my $i ( 0 .. $#columns ){
2222     my %column_info;
2223     $column_info{data_type} = $sth->{TYPE}->[$i];
2224     $column_info{size} = $sth->{PRECISION}->[$i];
2225     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2226
2227     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2228       $column_info{data_type} = $1;
2229       $column_info{size}    = $2;
2230     }
2231
2232     $result{$columns[$i]} = \%column_info;
2233   }
2234   $sth->finish;
2235
2236   foreach my $col (keys %result) {
2237     my $colinfo = $result{$col};
2238     my $type_num = $colinfo->{data_type};
2239     my $type_name;
2240     if(defined $type_num && $dbh->can('type_info')) {
2241       my $type_info = $dbh->type_info($type_num);
2242       $type_name = $type_info->{TYPE_NAME} if $type_info;
2243       $colinfo->{data_type} = $type_name if $type_name;
2244     }
2245   }
2246
2247   return \%result;
2248 }
2249
2250 sub columns_info_for {
2251   my ($self, $table) = @_;
2252   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2253 }
2254
2255 =head2 last_insert_id
2256
2257 Return the row id of the last insert.
2258
2259 =cut
2260
2261 sub _dbh_last_insert_id {
2262     my ($self, $dbh, $source, $col) = @_;
2263
2264     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2265
2266     return $id if defined $id;
2267
2268     my $class = ref $self;
2269     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2270 }
2271
2272 sub last_insert_id {
2273   my $self = shift;
2274   $self->_dbh_last_insert_id ($self->_dbh, @_);
2275 }
2276
2277 =head2 _native_data_type
2278
2279 =over 4
2280
2281 =item Arguments: $type_name
2282
2283 =back
2284
2285 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2286 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2287 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2288
2289 The default implementation returns C<undef>, implement in your Storage driver if
2290 you need this functionality.
2291
2292 Should map types from other databases to the native RDBMS type, for example
2293 C<VARCHAR2> to C<VARCHAR>.
2294
2295 Types with modifiers should map to the underlying data type. For example,
2296 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2297
2298 Composite types should map to the container type, for example
2299 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2300
2301 =cut
2302
2303 sub _native_data_type {
2304   #my ($self, $data_type) = @_;
2305   return undef
2306 }
2307
2308 # Check if placeholders are supported at all
2309 sub _determine_supports_placeholders {
2310   my $self = shift;
2311   my $dbh  = $self->_get_dbh;
2312
2313   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2314   # but it is inaccurate more often than not
2315   return try {
2316     local $dbh->{PrintError} = 0;
2317     local $dbh->{RaiseError} = 1;
2318     $dbh->do('select ?', {}, 1);
2319     1;
2320   }
2321   catch {
2322     0;
2323   };
2324 }
2325
2326 # Check if placeholders bound to non-string types throw exceptions
2327 #
2328 sub _determine_supports_typeless_placeholders {
2329   my $self = shift;
2330   my $dbh  = $self->_get_dbh;
2331
2332   return try {
2333     local $dbh->{PrintError} = 0;
2334     local $dbh->{RaiseError} = 1;
2335     # this specifically tests a bind that is NOT a string
2336     $dbh->do('select 1 where 1 = ?', {}, 1);
2337     1;
2338   }
2339   catch {
2340     0;
2341   };
2342 }
2343
2344 =head2 sqlt_type
2345
2346 Returns the database driver name.
2347
2348 =cut
2349
2350 sub sqlt_type {
2351   shift->_get_dbh->{Driver}->{Name};
2352 }
2353
2354 =head2 bind_attribute_by_data_type
2355
2356 Given a datatype from column info, returns a database specific bind
2357 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2358 let the database planner just handle it.
2359
2360 Generally only needed for special case column types, like bytea in postgres.
2361
2362 =cut
2363
2364 sub bind_attribute_by_data_type {
2365     return;
2366 }
2367
2368 =head2 is_datatype_numeric
2369
2370 Given a datatype from column_info, returns a boolean value indicating if
2371 the current RDBMS considers it a numeric value. This controls how
2372 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2373 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2374 be performed instead of the usual C<eq>.
2375
2376 =cut
2377
2378 sub is_datatype_numeric {
2379   my ($self, $dt) = @_;
2380
2381   return 0 unless $dt;
2382
2383   return $dt =~ /^ (?:
2384     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2385   ) $/ix;
2386 }
2387
2388
2389 =head2 create_ddl_dir
2390
2391 =over 4
2392
2393 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2394
2395 =back
2396
2397 Creates a SQL file based on the Schema, for each of the specified
2398 database engines in C<\@databases> in the given directory.
2399 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2400
2401 Given a previous version number, this will also create a file containing
2402 the ALTER TABLE statements to transform the previous schema into the
2403 current one. Note that these statements may contain C<DROP TABLE> or
2404 C<DROP COLUMN> statements that can potentially destroy data.
2405
2406 The file names are created using the C<ddl_filename> method below, please
2407 override this method in your schema if you would like a different file
2408 name format. For the ALTER file, the same format is used, replacing
2409 $version in the name with "$preversion-$version".
2410
2411 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2412 The most common value for this would be C<< { add_drop_table => 1 } >>
2413 to have the SQL produced include a C<DROP TABLE> statement for each table
2414 created. For quoting purposes supply C<quote_table_names> and
2415 C<quote_field_names>.
2416
2417 If no arguments are passed, then the following default values are assumed:
2418
2419 =over 4
2420
2421 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2422
2423 =item version    - $schema->schema_version
2424
2425 =item directory  - './'
2426
2427 =item preversion - <none>
2428
2429 =back
2430
2431 By default, C<\%sqlt_args> will have
2432
2433  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2434
2435 merged with the hash passed in. To disable any of those features, pass in a
2436 hashref like the following
2437
2438  { ignore_constraint_names => 0, # ... other options }
2439
2440
2441 WARNING: You are strongly advised to check all SQL files created, before applying
2442 them.
2443
2444 =cut
2445
2446 sub create_ddl_dir {
2447   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2448
2449   unless ($dir) {
2450     carp "No directory given, using ./\n";
2451     $dir = './';
2452   } else {
2453       -d $dir
2454         or
2455       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2456         or
2457       $self->throw_exception(
2458         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2459       );
2460   }
2461
2462   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2463
2464   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2465   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2466
2467   my $schema_version = $schema->schema_version || '1.x';
2468   $version ||= $schema_version;
2469
2470   $sqltargs = {
2471     add_drop_table => 1,
2472     ignore_constraint_names => 1,
2473     ignore_index_names => 1,
2474     %{$sqltargs || {}}
2475   };
2476
2477   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2478     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2479   }
2480
2481   my $sqlt = SQL::Translator->new( $sqltargs );
2482
2483   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2484   my $sqlt_schema = $sqlt->translate({ data => $schema })
2485     or $self->throw_exception ($sqlt->error);
2486
2487   foreach my $db (@$databases) {
2488     $sqlt->reset();
2489     $sqlt->{schema} = $sqlt_schema;
2490     $sqlt->producer($db);
2491
2492     my $file;
2493     my $filename = $schema->ddl_filename($db, $version, $dir);
2494     if (-e $filename && ($version eq $schema_version )) {
2495       # if we are dumping the current version, overwrite the DDL
2496       carp "Overwriting existing DDL file - $filename";
2497       unlink($filename);
2498     }
2499
2500     my $output = $sqlt->translate;
2501     if(!$output) {
2502       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2503       next;
2504     }
2505     if(!open($file, ">$filename")) {
2506       $self->throw_exception("Can't open $filename for writing ($!)");
2507       next;
2508     }
2509     print $file $output;
2510     close($file);
2511
2512     next unless ($preversion);
2513
2514     require SQL::Translator::Diff;
2515
2516     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2517     if(!-e $prefilename) {
2518       carp("No previous schema file found ($prefilename)");
2519       next;
2520     }
2521
2522     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2523     if(-e $difffile) {
2524       carp("Overwriting existing diff file - $difffile");
2525       unlink($difffile);
2526     }
2527
2528     my $source_schema;
2529     {
2530       my $t = SQL::Translator->new($sqltargs);
2531       $t->debug( 0 );
2532       $t->trace( 0 );
2533
2534       $t->parser( $db )
2535         or $self->throw_exception ($t->error);
2536
2537       my $out = $t->translate( $prefilename )
2538         or $self->throw_exception ($t->error);
2539
2540       $source_schema = $t->schema;
2541
2542       $source_schema->name( $prefilename )
2543         unless ( $source_schema->name );
2544     }
2545
2546     # The "new" style of producers have sane normalization and can support
2547     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2548     # And we have to diff parsed SQL against parsed SQL.
2549     my $dest_schema = $sqlt_schema;
2550
2551     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2552       my $t = SQL::Translator->new($sqltargs);
2553       $t->debug( 0 );
2554       $t->trace( 0 );
2555
2556       $t->parser( $db )
2557         or $self->throw_exception ($t->error);
2558
2559       my $out = $t->translate( $filename )
2560         or $self->throw_exception ($t->error);
2561
2562       $dest_schema = $t->schema;
2563
2564       $dest_schema->name( $filename )
2565         unless $dest_schema->name;
2566     }
2567
2568     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2569                                                   $dest_schema,   $db,
2570                                                   $sqltargs
2571                                                  );
2572     if(!open $file, ">$difffile") {
2573       $self->throw_exception("Can't write to $difffile ($!)");
2574       next;
2575     }
2576     print $file $diff;
2577     close($file);
2578   }
2579 }
2580
2581 =head2 deployment_statements
2582
2583 =over 4
2584
2585 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2586
2587 =back
2588
2589 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2590
2591 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2592 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2593
2594 C<$directory> is used to return statements from files in a previously created
2595 L</create_ddl_dir> directory and is optional. The filenames are constructed
2596 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2597
2598 If no C<$directory> is specified then the statements are constructed on the
2599 fly using L<SQL::Translator> and C<$version> is ignored.
2600
2601 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2602
2603 =cut
2604
2605 sub deployment_statements {
2606   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2607   $type ||= $self->sqlt_type;
2608   $version ||= $schema->schema_version || '1.x';
2609   $dir ||= './';
2610   my $filename = $schema->ddl_filename($type, $version, $dir);
2611   if(-f $filename)
2612   {
2613       my $file;
2614       open($file, "<$filename")
2615         or $self->throw_exception("Can't open $filename ($!)");
2616       my @rows = <$file>;
2617       close($file);
2618       return join('', @rows);
2619   }
2620
2621   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2622     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2623   }
2624
2625   # sources needs to be a parser arg, but for simplicty allow at top level
2626   # coming in
2627   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2628       if exists $sqltargs->{sources};
2629
2630   my $tr = SQL::Translator->new(
2631     producer => "SQL::Translator::Producer::${type}",
2632     %$sqltargs,
2633     parser => 'SQL::Translator::Parser::DBIx::Class',
2634     data => $schema,
2635   );
2636
2637   my @ret;
2638   my $wa = wantarray;
2639   if ($wa) {
2640     @ret = $tr->translate;
2641   }
2642   else {
2643     $ret[0] = $tr->translate;
2644   }
2645
2646   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2647     unless (@ret && defined $ret[0]);
2648
2649   return $wa ? @ret : $ret[0];
2650 }
2651
2652 sub deploy {
2653   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2654   my $deploy = sub {
2655     my $line = shift;
2656     return if($line =~ /^--/);
2657     return if(!$line);
2658     # next if($line =~ /^DROP/m);
2659     return if($line =~ /^BEGIN TRANSACTION/m);
2660     return if($line =~ /^COMMIT/m);
2661     return if $line =~ /^\s+$/; # skip whitespace only
2662     $self->_query_start($line);
2663     try {
2664       # do a dbh_do cycle here, as we need some error checking in
2665       # place (even though we will ignore errors)
2666       $self->dbh_do (sub { $_[1]->do($line) });
2667     } catch {
2668       carp qq{$_ (running "${line}")};
2669     };
2670     $self->_query_end($line);
2671   };
2672   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2673   if (@statements > 1) {
2674     foreach my $statement (@statements) {
2675       $deploy->( $statement );
2676     }
2677   }
2678   elsif (@statements == 1) {
2679     foreach my $line ( split(";\n", $statements[0])) {
2680       $deploy->( $line );
2681     }
2682   }
2683 }
2684
2685 =head2 datetime_parser
2686
2687 Returns the datetime parser class
2688
2689 =cut
2690
2691 sub datetime_parser {
2692   my $self = shift;
2693   return $self->{datetime_parser} ||= do {
2694     $self->build_datetime_parser(@_);
2695   };
2696 }
2697
2698 =head2 datetime_parser_type
2699
2700 Defines (returns) the datetime parser class - currently hardwired to
2701 L<DateTime::Format::MySQL>
2702
2703 =cut
2704
2705 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2706
2707 =head2 build_datetime_parser
2708
2709 See L</datetime_parser>
2710
2711 =cut
2712
2713 sub build_datetime_parser {
2714   my $self = shift;
2715   my $type = $self->datetime_parser_type(@_);
2716   $self->ensure_class_loaded ($type);
2717   return $type;
2718 }
2719
2720
2721 =head2 is_replicating
2722
2723 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2724 replicate from a master database.  Default is undef, which is the result
2725 returned by databases that don't support replication.
2726
2727 =cut
2728
2729 sub is_replicating {
2730     return;
2731
2732 }
2733
2734 =head2 lag_behind_master
2735
2736 Returns a number that represents a certain amount of lag behind a master db
2737 when a given storage is replicating.  The number is database dependent, but
2738 starts at zero and increases with the amount of lag. Default in undef
2739
2740 =cut
2741
2742 sub lag_behind_master {
2743     return;
2744 }
2745
2746 =head2 relname_to_table_alias
2747
2748 =over 4
2749
2750 =item Arguments: $relname, $join_count
2751
2752 =back
2753
2754 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2755 queries.
2756
2757 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2758 way these aliases are named.
2759
2760 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2761 otherwise C<"$relname">.
2762
2763 =cut
2764
2765 sub relname_to_table_alias {
2766   my ($self, $relname, $join_count) = @_;
2767
2768   my $alias = ($join_count && $join_count > 1 ?
2769     join('_', $relname, $join_count) : $relname);
2770
2771   return $alias;
2772 }
2773
2774 1;
2775
2776 =head1 USAGE NOTES
2777
2778 =head2 DBIx::Class and AutoCommit
2779
2780 DBIx::Class can do some wonderful magic with handling exceptions,
2781 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2782 (the default) combined with C<txn_do> for transaction support.
2783
2784 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2785 in an assumed transaction between commits, and you're telling us you'd
2786 like to manage that manually.  A lot of the magic protections offered by
2787 this module will go away.  We can't protect you from exceptions due to database
2788 disconnects because we don't know anything about how to restart your
2789 transactions.  You're on your own for handling all sorts of exceptional
2790 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2791 be with raw DBI.
2792
2793
2794 =head1 AUTHORS
2795
2796 Matt S. Trout <mst@shadowcatsystems.co.uk>
2797
2798 Andy Grundman <andy@hybridized.org>
2799
2800 =head1 LICENSE
2801
2802 You may distribute this code under the same terms as Perl itself.
2803
2804 =cut