No longer explicitly disconnect on ::Storage DESTROY
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } @capabilities );
60
61
62 # Each of these methods need _determine_driver called before itself
63 # in order to function reliably. This is a purely DRY optimization
64 #
65 # get_(use)_dbms_capability need to be called on the correct Storage
66 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
67 # _determine_supports_X which obv. needs a correct driver as well
68 my @rdbms_specific_methods = qw/
69   deployment_statements
70   sqlt_type
71   sql_maker
72   build_datetime_parser
73   datetime_parser_type
74
75   insert
76   insert_bulk
77   update
78   delete
79   select
80   select_single
81
82   get_use_dbms_capability
83   get_dbms_capability
84
85   _server_info
86   _get_server_version
87 /;
88
89 for my $meth (@rdbms_specific_methods) {
90
91   my $orig = __PACKAGE__->can ($meth)
92     or die "$meth is not a ::Storage::DBI method!";
93
94   no strict qw/refs/;
95   no warnings qw/redefine/;
96   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
97     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
98       $_[0]->_determine_driver;
99
100       # This for some reason crashes and burns on perl 5.8.1
101       # IFF the method ends up throwing an exception
102       #goto $_[0]->can ($meth);
103
104       my $cref = $_[0]->can ($meth);
105       goto $cref;
106     }
107     goto $orig;
108   };
109 }
110
111
112 =head1 NAME
113
114 DBIx::Class::Storage::DBI - DBI storage handler
115
116 =head1 SYNOPSIS
117
118   my $schema = MySchema->connect('dbi:SQLite:my.db');
119
120   $schema->storage->debug(1);
121
122   my @stuff = $schema->storage->dbh_do(
123     sub {
124       my ($storage, $dbh, @args) = @_;
125       $dbh->do("DROP TABLE authors");
126     },
127     @column_list
128   );
129
130   $schema->resultset('Book')->search({
131      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
132   });
133
134 =head1 DESCRIPTION
135
136 This class represents the connection to an RDBMS via L<DBI>.  See
137 L<DBIx::Class::Storage> for general information.  This pod only
138 documents DBI-specific methods and behaviors.
139
140 =head1 METHODS
141
142 =cut
143
144 sub new {
145   my $new = shift->next::method(@_);
146
147   $new->transaction_depth(0);
148   $new->_sql_maker_opts({});
149   $new->_dbh_details({});
150   $new->{savepoints} = [];
151   $new->{_in_dbh_do} = 0;
152   $new->{_dbh_gen} = 0;
153
154   # read below to see what this does
155   $new->_arm_global_destructor;
156
157   $new;
158 }
159
160 # This is hack to work around perl shooting stuff in random
161 # order on exit(). If we do not walk the remaining storage
162 # objects in an END block, there is a *small but real* chance
163 # of a fork()ed child to kill the parent's shared DBI handle,
164 # *before perl reaches the DESTROY in this package*
165 # Yes, it is ugly and effective.
166 {
167   my %seek_and_destroy;
168
169   sub _arm_global_destructor {
170     my $self = shift;
171     my $key = Scalar::Util::refaddr ($self);
172     $seek_and_destroy{$key} = $self;
173     Scalar::Util::weaken ($seek_and_destroy{$key});
174   }
175
176   END {
177     local $?; # just in case the DBI destructor changes it somehow
178
179     # destroy just the object if not native to this process/thread
180     $_->_preserve_foreign_dbh for (grep
181       { defined $_ }
182       values %seek_and_destroy
183     );
184   }
185 }
186
187 sub DESTROY {
188   my $self = shift;
189
190   # some databases spew warnings on implicit disconnect
191   local $SIG{__WARN__} = sub {};
192   $self->_dbh(undef);
193 }
194
195 sub _preserve_foreign_dbh {
196   my $self = shift;
197
198   return unless $self->_dbh;
199
200   $self->_verify_tid;
201
202   return unless $self->_dbh;
203
204   $self->_verify_pid;
205
206 }
207
208 # handle pid changes correctly - do not destroy parent's connection
209 sub _verify_pid {
210   my $self = shift;
211
212   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
213
214   $self->_dbh->{InactiveDestroy} = 1;
215   $self->_dbh(undef);
216   $self->{_dbh_gen}++;
217
218   return;
219 }
220
221 # very similar to above, but seems to FAIL if I set InactiveDestroy
222 sub _verify_tid {
223   my $self = shift;
224
225   if ( ! defined $self->_conn_tid ) {
226     return; # no threads
227   }
228   elsif ( $self->_conn_tid == threads->tid ) {
229     return; # same thread
230   }
231
232   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
233   $self->_dbh(undef);
234   $self->{_dbh_gen}++;
235
236   return;
237 }
238
239
240 =head2 connect_info
241
242 This method is normally called by L<DBIx::Class::Schema/connection>, which
243 encapsulates its argument list in an arrayref before passing them here.
244
245 The argument list may contain:
246
247 =over
248
249 =item *
250
251 The same 4-element argument set one would normally pass to
252 L<DBI/connect>, optionally followed by
253 L<extra attributes|/DBIx::Class specific connection attributes>
254 recognized by DBIx::Class:
255
256   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
257
258 =item *
259
260 A single code reference which returns a connected
261 L<DBI database handle|DBI/connect> optionally followed by
262 L<extra attributes|/DBIx::Class specific connection attributes> recognized
263 by DBIx::Class:
264
265   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
266
267 =item *
268
269 A single hashref with all the attributes and the dsn/user/password
270 mixed together:
271
272   $connect_info_args = [{
273     dsn => $dsn,
274     user => $user,
275     password => $pass,
276     %dbi_attributes,
277     %extra_attributes,
278   }];
279
280   $connect_info_args = [{
281     dbh_maker => sub { DBI->connect (...) },
282     %dbi_attributes,
283     %extra_attributes,
284   }];
285
286 This is particularly useful for L<Catalyst> based applications, allowing the
287 following config (L<Config::General> style):
288
289   <Model::DB>
290     schema_class   App::DB
291     <connect_info>
292       dsn          dbi:mysql:database=test
293       user         testuser
294       password     TestPass
295       AutoCommit   1
296     </connect_info>
297   </Model::DB>
298
299 The C<dsn>/C<user>/C<password> combination can be substituted by the
300 C<dbh_maker> key whose value is a coderef that returns a connected
301 L<DBI database handle|DBI/connect>
302
303 =back
304
305 Please note that the L<DBI> docs recommend that you always explicitly
306 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
307 recommends that it be set to I<1>, and that you perform transactions
308 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
309 to I<1> if you do not do explicitly set it to zero.  This is the default
310 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
311
312 =head3 DBIx::Class specific connection attributes
313
314 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
315 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
316 the following connection options. These options can be mixed in with your other
317 L<DBI> connection attributes, or placed in a separate hashref
318 (C<\%extra_attributes>) as shown above.
319
320 Every time C<connect_info> is invoked, any previous settings for
321 these options will be cleared before setting the new ones, regardless of
322 whether any options are specified in the new C<connect_info>.
323
324
325 =over
326
327 =item on_connect_do
328
329 Specifies things to do immediately after connecting or re-connecting to
330 the database.  Its value may contain:
331
332 =over
333
334 =item a scalar
335
336 This contains one SQL statement to execute.
337
338 =item an array reference
339
340 This contains SQL statements to execute in order.  Each element contains
341 a string or a code reference that returns a string.
342
343 =item a code reference
344
345 This contains some code to execute.  Unlike code references within an
346 array reference, its return value is ignored.
347
348 =back
349
350 =item on_disconnect_do
351
352 Takes arguments in the same form as L</on_connect_do> and executes them
353 immediately before disconnecting from the database.
354
355 Note, this only runs if you explicitly call L</disconnect> on the
356 storage object.
357
358 =item on_connect_call
359
360 A more generalized form of L</on_connect_do> that calls the specified
361 C<connect_call_METHOD> methods in your storage driver.
362
363   on_connect_do => 'select 1'
364
365 is equivalent to:
366
367   on_connect_call => [ [ do_sql => 'select 1' ] ]
368
369 Its values may contain:
370
371 =over
372
373 =item a scalar
374
375 Will call the C<connect_call_METHOD> method.
376
377 =item a code reference
378
379 Will execute C<< $code->($storage) >>
380
381 =item an array reference
382
383 Each value can be a method name or code reference.
384
385 =item an array of arrays
386
387 For each array, the first item is taken to be the C<connect_call_> method name
388 or code reference, and the rest are parameters to it.
389
390 =back
391
392 Some predefined storage methods you may use:
393
394 =over
395
396 =item do_sql
397
398 Executes a SQL string or a code reference that returns a SQL string. This is
399 what L</on_connect_do> and L</on_disconnect_do> use.
400
401 It can take:
402
403 =over
404
405 =item a scalar
406
407 Will execute the scalar as SQL.
408
409 =item an arrayref
410
411 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
412 attributes hashref and bind values.
413
414 =item a code reference
415
416 Will execute C<< $code->($storage) >> and execute the return array refs as
417 above.
418
419 =back
420
421 =item datetime_setup
422
423 Execute any statements necessary to initialize the database session to return
424 and accept datetime/timestamp values used with
425 L<DBIx::Class::InflateColumn::DateTime>.
426
427 Only necessary for some databases, see your specific storage driver for
428 implementation details.
429
430 =back
431
432 =item on_disconnect_call
433
434 Takes arguments in the same form as L</on_connect_call> and executes them
435 immediately before disconnecting from the database.
436
437 Calls the C<disconnect_call_METHOD> methods as opposed to the
438 C<connect_call_METHOD> methods called by L</on_connect_call>.
439
440 Note, this only runs if you explicitly call L</disconnect> on the
441 storage object.
442
443 =item disable_sth_caching
444
445 If set to a true value, this option will disable the caching of
446 statement handles via L<DBI/prepare_cached>.
447
448 =item limit_dialect
449
450 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
451 default L</sql_limit_dialect> setting of the storage (if any). For a list
452 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
453
454 =item quote_char
455
456 Specifies what characters to use to quote table and column names. If
457 you use this you will want to specify L</name_sep> as well.
458
459 C<quote_char> expects either a single character, in which case is it
460 is placed on either side of the table/column name, or an arrayref of length
461 2 in which case the table/column name is placed between the elements.
462
463 For example under MySQL you should use C<< quote_char => '`' >>, and for
464 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
465
466 =item name_sep
467
468 This only needs to be used in conjunction with C<quote_char>, and is used to
469 specify the character that separates elements (schemas, tables, columns) from
470 each other. In most cases this is simply a C<.>.
471
472 The consequences of not supplying this value is that L<SQL::Abstract>
473 will assume DBIx::Class' uses of aliases to be complete column
474 names. The output will look like I<"me.name"> when it should actually
475 be I<"me"."name">.
476
477 =item unsafe
478
479 This Storage driver normally installs its own C<HandleError>, sets
480 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
481 all database handles, including those supplied by a coderef.  It does this
482 so that it can have consistent and useful error behavior.
483
484 If you set this option to a true value, Storage will not do its usual
485 modifications to the database handle's attributes, and instead relies on
486 the settings in your connect_info DBI options (or the values you set in
487 your connection coderef, in the case that you are connecting via coderef).
488
489 Note that your custom settings can cause Storage to malfunction,
490 especially if you set a C<HandleError> handler that suppresses exceptions
491 and/or disable C<RaiseError>.
492
493 =item auto_savepoint
494
495 If this option is true, L<DBIx::Class> will use savepoints when nesting
496 transactions, making it possible to recover from failure in the inner
497 transaction without having to abort all outer transactions.
498
499 =item cursor_class
500
501 Use this argument to supply a cursor class other than the default
502 L<DBIx::Class::Storage::DBI::Cursor>.
503
504 =back
505
506 Some real-life examples of arguments to L</connect_info> and
507 L<DBIx::Class::Schema/connect>
508
509   # Simple SQLite connection
510   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
511
512   # Connect via subref
513   ->connect_info([ sub { DBI->connect(...) } ]);
514
515   # Connect via subref in hashref
516   ->connect_info([{
517     dbh_maker => sub { DBI->connect(...) },
518     on_connect_do => 'alter session ...',
519   }]);
520
521   # A bit more complicated
522   ->connect_info(
523     [
524       'dbi:Pg:dbname=foo',
525       'postgres',
526       'my_pg_password',
527       { AutoCommit => 1 },
528       { quote_char => q{"}, name_sep => q{.} },
529     ]
530   );
531
532   # Equivalent to the previous example
533   ->connect_info(
534     [
535       'dbi:Pg:dbname=foo',
536       'postgres',
537       'my_pg_password',
538       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
539     ]
540   );
541
542   # Same, but with hashref as argument
543   # See parse_connect_info for explanation
544   ->connect_info(
545     [{
546       dsn         => 'dbi:Pg:dbname=foo',
547       user        => 'postgres',
548       password    => 'my_pg_password',
549       AutoCommit  => 1,
550       quote_char  => q{"},
551       name_sep    => q{.},
552     }]
553   );
554
555   # Subref + DBIx::Class-specific connection options
556   ->connect_info(
557     [
558       sub { DBI->connect(...) },
559       {
560           quote_char => q{`},
561           name_sep => q{@},
562           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
563           disable_sth_caching => 1,
564       },
565     ]
566   );
567
568
569
570 =cut
571
572 sub connect_info {
573   my ($self, $info) = @_;
574
575   return $self->_connect_info if !$info;
576
577   $self->_connect_info($info); # copy for _connect_info
578
579   $info = $self->_normalize_connect_info($info)
580     if ref $info eq 'ARRAY';
581
582   for my $storage_opt (keys %{ $info->{storage_options} }) {
583     my $value = $info->{storage_options}{$storage_opt};
584
585     $self->$storage_opt($value);
586   }
587
588   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
589   #  the new set of options
590   $self->_sql_maker(undef);
591   $self->_sql_maker_opts({});
592
593   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
594     my $value = $info->{sql_maker_options}{$sql_maker_opt};
595
596     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
597   }
598
599   my %attrs = (
600     %{ $self->_default_dbi_connect_attributes || {} },
601     %{ $info->{attributes} || {} },
602   );
603
604   my @args = @{ $info->{arguments} };
605
606   $self->_dbi_connect_info([@args,
607     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
608
609   # FIXME - dirty:
610   # save attributes them in a separate accessor so they are always
611   # introspectable, even in case of a CODE $dbhmaker
612   $self->_dbic_connect_attributes (\%attrs);
613
614   return $self->_connect_info;
615 }
616
617 sub _normalize_connect_info {
618   my ($self, $info_arg) = @_;
619   my %info;
620
621   my @args = @$info_arg;  # take a shallow copy for further mutilation
622
623   # combine/pre-parse arguments depending on invocation style
624
625   my %attrs;
626   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
627     %attrs = %{ $args[1] || {} };
628     @args = $args[0];
629   }
630   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
631     %attrs = %{$args[0]};
632     @args = ();
633     if (my $code = delete $attrs{dbh_maker}) {
634       @args = $code;
635
636       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
637       if (@ignored) {
638         carp sprintf (
639             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
640           . "to the result of 'dbh_maker'",
641
642           join (', ', map { "'$_'" } (@ignored) ),
643         );
644       }
645     }
646     else {
647       @args = delete @attrs{qw/dsn user password/};
648     }
649   }
650   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
651     %attrs = (
652       % { $args[3] || {} },
653       % { $args[4] || {} },
654     );
655     @args = @args[0,1,2];
656   }
657
658   $info{arguments} = \@args;
659
660   my @storage_opts = grep exists $attrs{$_},
661     @storage_options, 'cursor_class';
662
663   @{ $info{storage_options} }{@storage_opts} =
664     delete @attrs{@storage_opts} if @storage_opts;
665
666   my @sql_maker_opts = grep exists $attrs{$_},
667     qw/limit_dialect quote_char name_sep/;
668
669   @{ $info{sql_maker_options} }{@sql_maker_opts} =
670     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
671
672   $info{attributes} = \%attrs if %attrs;
673
674   return \%info;
675 }
676
677 sub _default_dbi_connect_attributes {
678   return {
679     AutoCommit => 1,
680     RaiseError => 1,
681     PrintError => 0,
682   };
683 }
684
685 =head2 on_connect_do
686
687 This method is deprecated in favour of setting via L</connect_info>.
688
689 =cut
690
691 =head2 on_disconnect_do
692
693 This method is deprecated in favour of setting via L</connect_info>.
694
695 =cut
696
697 sub _parse_connect_do {
698   my ($self, $type) = @_;
699
700   my $val = $self->$type;
701   return () if not defined $val;
702
703   my @res;
704
705   if (not ref($val)) {
706     push @res, [ 'do_sql', $val ];
707   } elsif (ref($val) eq 'CODE') {
708     push @res, $val;
709   } elsif (ref($val) eq 'ARRAY') {
710     push @res, map { [ 'do_sql', $_ ] } @$val;
711   } else {
712     $self->throw_exception("Invalid type for $type: ".ref($val));
713   }
714
715   return \@res;
716 }
717
718 =head2 dbh_do
719
720 Arguments: ($subref | $method_name), @extra_coderef_args?
721
722 Execute the given $subref or $method_name using the new exception-based
723 connection management.
724
725 The first two arguments will be the storage object that C<dbh_do> was called
726 on and a database handle to use.  Any additional arguments will be passed
727 verbatim to the called subref as arguments 2 and onwards.
728
729 Using this (instead of $self->_dbh or $self->dbh) ensures correct
730 exception handling and reconnection (or failover in future subclasses).
731
732 Your subref should have no side-effects outside of the database, as
733 there is the potential for your subref to be partially double-executed
734 if the database connection was stale/dysfunctional.
735
736 Example:
737
738   my @stuff = $schema->storage->dbh_do(
739     sub {
740       my ($storage, $dbh, @cols) = @_;
741       my $cols = join(q{, }, @cols);
742       $dbh->selectrow_array("SELECT $cols FROM foo");
743     },
744     @column_list
745   );
746
747 =cut
748
749 sub dbh_do {
750   my $self = shift;
751   my $code = shift;
752
753   my $dbh = $self->_get_dbh;
754
755   return $self->$code($dbh, @_)
756     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
757
758   local $self->{_in_dbh_do} = 1;
759
760   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
761   my $args = \@_;
762   return try {
763     $self->$code ($dbh, @$args);
764   } catch {
765     $self->throw_exception($_) if $self->connected;
766
767     # We were not connected - reconnect and retry, but let any
768     #  exception fall right through this time
769     carp "Retrying $code after catching disconnected exception: $_"
770       if $ENV{DBIC_DBIRETRY_DEBUG};
771
772     $self->_populate_dbh;
773     $self->$code($self->_dbh, @$args);
774   };
775 }
776
777 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
778 # It also informs dbh_do to bypass itself while under the direction of txn_do,
779 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
780 sub txn_do {
781   my $self = shift;
782   my $coderef = shift;
783
784   ref $coderef eq 'CODE' or $self->throw_exception
785     ('$coderef must be a CODE reference');
786
787   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
788
789   local $self->{_in_dbh_do} = 1;
790
791   my @result;
792   my $want_array = wantarray;
793
794   my $tried = 0;
795   while(1) {
796     my $exception;
797
798     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
799     my $args = \@_;
800
801     try {
802       $self->txn_begin;
803       if($want_array) {
804           @result = $coderef->(@$args);
805       }
806       elsif(defined $want_array) {
807           $result[0] = $coderef->(@$args);
808       }
809       else {
810           $coderef->(@$args);
811       }
812       $self->txn_commit;
813     } catch {
814       $exception = $_;
815     };
816
817     if(! defined $exception) { return $want_array ? @result : $result[0] }
818
819     if($tried++ || $self->connected) {
820       my $rollback_exception;
821       try { $self->txn_rollback } catch { $rollback_exception = shift };
822       if(defined $rollback_exception) {
823         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
824         $self->throw_exception($exception)  # propagate nested rollback
825           if $rollback_exception =~ /$exception_class/;
826
827         $self->throw_exception(
828           "Transaction aborted: ${exception}. "
829           . "Rollback failed: ${rollback_exception}"
830         );
831       }
832       $self->throw_exception($exception)
833     }
834
835     # We were not connected, and was first try - reconnect and retry
836     # via the while loop
837     carp "Retrying $coderef after catching disconnected exception: $exception"
838       if $ENV{DBIC_TXNRETRY_DEBUG};
839     $self->_populate_dbh;
840   }
841 }
842
843 =head2 disconnect
844
845 Our C<disconnect> method also performs a rollback first if the
846 database is not in C<AutoCommit> mode.
847
848 =cut
849
850 sub disconnect {
851   my ($self) = @_;
852
853   if( $self->_dbh ) {
854     my @actions;
855
856     push @actions, ( $self->on_disconnect_call || () );
857     push @actions, $self->_parse_connect_do ('on_disconnect_do');
858
859     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
860
861     $self->_dbh_rollback unless $self->_dbh_autocommit;
862
863     %{ $self->_dbh->{CachedKids} } = ();
864     $self->_dbh->disconnect;
865     $self->_dbh(undef);
866     $self->{_dbh_gen}++;
867   }
868 }
869
870 =head2 with_deferred_fk_checks
871
872 =over 4
873
874 =item Arguments: C<$coderef>
875
876 =item Return Value: The return value of $coderef
877
878 =back
879
880 Storage specific method to run the code ref with FK checks deferred or
881 in MySQL's case disabled entirely.
882
883 =cut
884
885 # Storage subclasses should override this
886 sub with_deferred_fk_checks {
887   my ($self, $sub) = @_;
888   $sub->();
889 }
890
891 =head2 connected
892
893 =over
894
895 =item Arguments: none
896
897 =item Return Value: 1|0
898
899 =back
900
901 Verifies that the current database handle is active and ready to execute
902 an SQL statement (e.g. the connection did not get stale, server is still
903 answering, etc.) This method is used internally by L</dbh>.
904
905 =cut
906
907 sub connected {
908   my $self = shift;
909   return 0 unless $self->_seems_connected;
910
911   #be on the safe side
912   local $self->_dbh->{RaiseError} = 1;
913
914   return $self->_ping;
915 }
916
917 sub _seems_connected {
918   my $self = shift;
919
920   $self->_preserve_foreign_dbh;
921
922   my $dbh = $self->_dbh
923     or return 0;
924
925   return $dbh->FETCH('Active');
926 }
927
928 sub _ping {
929   my $self = shift;
930
931   my $dbh = $self->_dbh or return 0;
932
933   return $dbh->ping;
934 }
935
936 sub ensure_connected {
937   my ($self) = @_;
938
939   unless ($self->connected) {
940     $self->_populate_dbh;
941   }
942 }
943
944 =head2 dbh
945
946 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
947 is guaranteed to be healthy by implicitly calling L</connected>, and if
948 necessary performing a reconnection before returning. Keep in mind that this
949 is very B<expensive> on some database engines. Consider using L</dbh_do>
950 instead.
951
952 =cut
953
954 sub dbh {
955   my ($self) = @_;
956
957   if (not $self->_dbh) {
958     $self->_populate_dbh;
959   } else {
960     $self->ensure_connected;
961   }
962   return $self->_dbh;
963 }
964
965 # this is the internal "get dbh or connect (don't check)" method
966 sub _get_dbh {
967   my $self = shift;
968   $self->_preserve_foreign_dbh;
969   $self->_populate_dbh unless $self->_dbh;
970   return $self->_dbh;
971 }
972
973 sub sql_maker {
974   my ($self) = @_;
975   unless ($self->_sql_maker) {
976     my $sql_maker_class = $self->sql_maker_class;
977     $self->ensure_class_loaded ($sql_maker_class);
978
979     my %opts = %{$self->_sql_maker_opts||{}};
980     my $dialect =
981       $opts{limit_dialect}
982         ||
983       $self->sql_limit_dialect
984         ||
985       do {
986         my $s_class = (ref $self) || $self;
987         carp (
988           "Your storage class ($s_class) does not set sql_limit_dialect and you "
989         . 'have not supplied an explicit limit_dialect in your connection_info. '
990         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
991         . 'databases but can be (and often is) painfully slow.'
992         );
993
994         'GenericSubQ';
995       }
996     ;
997
998     $self->_sql_maker($sql_maker_class->new(
999       bindtype=>'columns',
1000       array_datatypes => 1,
1001       limit_dialect => $dialect,
1002       %opts,
1003     ));
1004   }
1005   return $self->_sql_maker;
1006 }
1007
1008 # nothing to do by default
1009 sub _rebless {}
1010 sub _init {}
1011
1012 sub _populate_dbh {
1013   my ($self) = @_;
1014
1015   my @info = @{$self->_dbi_connect_info || []};
1016   $self->_dbh(undef); # in case ->connected failed we might get sent here
1017   $self->_dbh_details({}); # reset everything we know
1018
1019   $self->_dbh($self->_connect(@info));
1020
1021   $self->_conn_pid($$);
1022   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1023
1024   $self->_determine_driver;
1025
1026   # Always set the transaction depth on connect, since
1027   #  there is no transaction in progress by definition
1028   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1029
1030   $self->_run_connection_actions unless $self->{_in_determine_driver};
1031 }
1032
1033 sub _run_connection_actions {
1034   my $self = shift;
1035   my @actions;
1036
1037   push @actions, ( $self->on_connect_call || () );
1038   push @actions, $self->_parse_connect_do ('on_connect_do');
1039
1040   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1041 }
1042
1043
1044
1045 sub set_use_dbms_capability {
1046   $_[0]->set_inherited ($_[1], $_[2]);
1047 }
1048
1049 sub get_use_dbms_capability {
1050   my ($self, $capname) = @_;
1051
1052   my $use = $self->get_inherited ($capname);
1053   return defined $use
1054     ? $use
1055     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1056   ;
1057 }
1058
1059 sub set_dbms_capability {
1060   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1061 }
1062
1063 sub get_dbms_capability {
1064   my ($self, $capname) = @_;
1065
1066   my $cap = $self->_dbh_details->{capability}{$capname};
1067
1068   unless (defined $cap) {
1069     if (my $meth = $self->can ("_determine$capname")) {
1070       $cap = $self->$meth ? 1 : 0;
1071     }
1072     else {
1073       $cap = 0;
1074     }
1075
1076     $self->set_dbms_capability ($capname, $cap);
1077   }
1078
1079   return $cap;
1080 }
1081
1082 sub _server_info {
1083   my $self = shift;
1084
1085   my $info;
1086   unless ($info = $self->_dbh_details->{info}) {
1087
1088     $info = {};
1089
1090     my $server_version = try { $self->_get_server_version };
1091
1092     if (defined $server_version) {
1093       $info->{dbms_version} = $server_version;
1094
1095       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1096       my @verparts = split (/\./, $numeric_version);
1097       if (
1098         @verparts
1099           &&
1100         $verparts[0] <= 999
1101       ) {
1102         # consider only up to 3 version parts, iff not more than 3 digits
1103         my @use_parts;
1104         while (@verparts && @use_parts < 3) {
1105           my $p = shift @verparts;
1106           last if $p > 999;
1107           push @use_parts, $p;
1108         }
1109         push @use_parts, 0 while @use_parts < 3;
1110
1111         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1112       }
1113     }
1114
1115     $self->_dbh_details->{info} = $info;
1116   }
1117
1118   return $info;
1119 }
1120
1121 sub _get_server_version {
1122   shift->_get_dbh->get_info(18);
1123 }
1124
1125 sub _determine_driver {
1126   my ($self) = @_;
1127
1128   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1129     my $started_connected = 0;
1130     local $self->{_in_determine_driver} = 1;
1131
1132     if (ref($self) eq __PACKAGE__) {
1133       my $driver;
1134       if ($self->_dbh) { # we are connected
1135         $driver = $self->_dbh->{Driver}{Name};
1136         $started_connected = 1;
1137       } else {
1138         # if connect_info is a CODEREF, we have no choice but to connect
1139         if (ref $self->_dbi_connect_info->[0] &&
1140             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1141           $self->_populate_dbh;
1142           $driver = $self->_dbh->{Driver}{Name};
1143         }
1144         else {
1145           # try to use dsn to not require being connected, the driver may still
1146           # force a connection in _rebless to determine version
1147           # (dsn may not be supplied at all if all we do is make a mock-schema)
1148           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1149           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1150           $driver ||= $ENV{DBI_DRIVER};
1151         }
1152       }
1153
1154       if ($driver) {
1155         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1156         if ($self->load_optional_class($storage_class)) {
1157           mro::set_mro($storage_class, 'c3');
1158           bless $self, $storage_class;
1159           $self->_rebless();
1160         }
1161       }
1162     }
1163
1164     $self->_driver_determined(1);
1165
1166     $self->_init; # run driver-specific initializations
1167
1168     $self->_run_connection_actions
1169         if !$started_connected && defined $self->_dbh;
1170   }
1171 }
1172
1173 sub _do_connection_actions {
1174   my $self          = shift;
1175   my $method_prefix = shift;
1176   my $call          = shift;
1177
1178   if (not ref($call)) {
1179     my $method = $method_prefix . $call;
1180     $self->$method(@_);
1181   } elsif (ref($call) eq 'CODE') {
1182     $self->$call(@_);
1183   } elsif (ref($call) eq 'ARRAY') {
1184     if (ref($call->[0]) ne 'ARRAY') {
1185       $self->_do_connection_actions($method_prefix, $_) for @$call;
1186     } else {
1187       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1188     }
1189   } else {
1190     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1191   }
1192
1193   return $self;
1194 }
1195
1196 sub connect_call_do_sql {
1197   my $self = shift;
1198   $self->_do_query(@_);
1199 }
1200
1201 sub disconnect_call_do_sql {
1202   my $self = shift;
1203   $self->_do_query(@_);
1204 }
1205
1206 # override in db-specific backend when necessary
1207 sub connect_call_datetime_setup { 1 }
1208
1209 sub _do_query {
1210   my ($self, $action) = @_;
1211
1212   if (ref $action eq 'CODE') {
1213     $action = $action->($self);
1214     $self->_do_query($_) foreach @$action;
1215   }
1216   else {
1217     # Most debuggers expect ($sql, @bind), so we need to exclude
1218     # the attribute hash which is the second argument to $dbh->do
1219     # furthermore the bind values are usually to be presented
1220     # as named arrayref pairs, so wrap those here too
1221     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1222     my $sql = shift @do_args;
1223     my $attrs = shift @do_args;
1224     my @bind = map { [ undef, $_ ] } @do_args;
1225
1226     $self->_query_start($sql, @bind);
1227     $self->_get_dbh->do($sql, $attrs, @do_args);
1228     $self->_query_end($sql, @bind);
1229   }
1230
1231   return $self;
1232 }
1233
1234 sub _connect {
1235   my ($self, @info) = @_;
1236
1237   $self->throw_exception("You failed to provide any connection info")
1238     if !@info;
1239
1240   my ($old_connect_via, $dbh);
1241
1242   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1243     $old_connect_via = $DBI::connect_via;
1244     $DBI::connect_via = 'connect';
1245   }
1246
1247   try {
1248     if(ref $info[0] eq 'CODE') {
1249        $dbh = $info[0]->();
1250     }
1251     else {
1252        $dbh = DBI->connect(@info);
1253     }
1254
1255     if (!$dbh) {
1256       die $DBI::errstr;
1257     }
1258
1259     unless ($self->unsafe) {
1260
1261       # this odd anonymous coderef dereference is in fact really
1262       # necessary to avoid the unwanted effect described in perl5
1263       # RT#75792
1264       sub {
1265         my $weak_self = $_[0];
1266         weaken $weak_self;
1267
1268         $_[1]->{HandleError} = sub {
1269           if ($weak_self) {
1270             $weak_self->throw_exception("DBI Exception: $_[0]");
1271           }
1272           else {
1273             # the handler may be invoked by something totally out of
1274             # the scope of DBIC
1275             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1276           }
1277         };
1278       }->($self, $dbh);
1279
1280       $dbh->{ShowErrorStatement} = 1;
1281       $dbh->{RaiseError} = 1;
1282       $dbh->{PrintError} = 0;
1283     }
1284   }
1285   catch {
1286     $self->throw_exception("DBI Connection failed: $_")
1287   }
1288   finally {
1289     $DBI::connect_via = $old_connect_via if $old_connect_via;
1290   };
1291
1292   $self->_dbh_autocommit($dbh->{AutoCommit});
1293   $dbh;
1294 }
1295
1296 sub svp_begin {
1297   my ($self, $name) = @_;
1298
1299   $name = $self->_svp_generate_name
1300     unless defined $name;
1301
1302   $self->throw_exception ("You can't use savepoints outside a transaction")
1303     if $self->{transaction_depth} == 0;
1304
1305   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1306     unless $self->can('_svp_begin');
1307
1308   push @{ $self->{savepoints} }, $name;
1309
1310   $self->debugobj->svp_begin($name) if $self->debug;
1311
1312   return $self->_svp_begin($name);
1313 }
1314
1315 sub svp_release {
1316   my ($self, $name) = @_;
1317
1318   $self->throw_exception ("You can't use savepoints outside a transaction")
1319     if $self->{transaction_depth} == 0;
1320
1321   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1322     unless $self->can('_svp_release');
1323
1324   if (defined $name) {
1325     $self->throw_exception ("Savepoint '$name' does not exist")
1326       unless grep { $_ eq $name } @{ $self->{savepoints} };
1327
1328     # Dig through the stack until we find the one we are releasing.  This keeps
1329     # the stack up to date.
1330     my $svp;
1331
1332     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1333   } else {
1334     $name = pop @{ $self->{savepoints} };
1335   }
1336
1337   $self->debugobj->svp_release($name) if $self->debug;
1338
1339   return $self->_svp_release($name);
1340 }
1341
1342 sub svp_rollback {
1343   my ($self, $name) = @_;
1344
1345   $self->throw_exception ("You can't use savepoints outside a transaction")
1346     if $self->{transaction_depth} == 0;
1347
1348   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1349     unless $self->can('_svp_rollback');
1350
1351   if (defined $name) {
1352       # If they passed us a name, verify that it exists in the stack
1353       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1354           $self->throw_exception("Savepoint '$name' does not exist!");
1355       }
1356
1357       # Dig through the stack until we find the one we are releasing.  This keeps
1358       # the stack up to date.
1359       while(my $s = pop(@{ $self->{savepoints} })) {
1360           last if($s eq $name);
1361       }
1362       # Add the savepoint back to the stack, as a rollback doesn't remove the
1363       # named savepoint, only everything after it.
1364       push(@{ $self->{savepoints} }, $name);
1365   } else {
1366       # We'll assume they want to rollback to the last savepoint
1367       $name = $self->{savepoints}->[-1];
1368   }
1369
1370   $self->debugobj->svp_rollback($name) if $self->debug;
1371
1372   return $self->_svp_rollback($name);
1373 }
1374
1375 sub _svp_generate_name {
1376     my ($self) = @_;
1377
1378     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1379 }
1380
1381 sub txn_begin {
1382   my $self = shift;
1383
1384   # this means we have not yet connected and do not know the AC status
1385   # (e.g. coderef $dbh)
1386   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1387
1388   if($self->{transaction_depth} == 0) {
1389     $self->debugobj->txn_begin()
1390       if $self->debug;
1391     $self->_dbh_begin_work;
1392   }
1393   elsif ($self->auto_savepoint) {
1394     $self->svp_begin;
1395   }
1396   $self->{transaction_depth}++;
1397 }
1398
1399 sub _dbh_begin_work {
1400   my $self = shift;
1401
1402   # if the user is utilizing txn_do - good for him, otherwise we need to
1403   # ensure that the $dbh is healthy on BEGIN.
1404   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1405   # will be replaced by a failure of begin_work itself (which will be
1406   # then retried on reconnect)
1407   if ($self->{_in_dbh_do}) {
1408     $self->_dbh->begin_work;
1409   } else {
1410     $self->dbh_do(sub { $_[1]->begin_work });
1411   }
1412 }
1413
1414 sub txn_commit {
1415   my $self = shift;
1416   if ($self->{transaction_depth} == 1) {
1417     $self->debugobj->txn_commit()
1418       if ($self->debug);
1419     $self->_dbh_commit;
1420     $self->{transaction_depth} = 0
1421       if $self->_dbh_autocommit;
1422   }
1423   elsif($self->{transaction_depth} > 1) {
1424     $self->{transaction_depth}--;
1425     $self->svp_release
1426       if $self->auto_savepoint;
1427   }
1428 }
1429
1430 sub _dbh_commit {
1431   my $self = shift;
1432   my $dbh  = $self->_dbh
1433     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1434   $dbh->commit;
1435 }
1436
1437 sub txn_rollback {
1438   my $self = shift;
1439   my $dbh = $self->_dbh;
1440   try {
1441     if ($self->{transaction_depth} == 1) {
1442       $self->debugobj->txn_rollback()
1443         if ($self->debug);
1444       $self->{transaction_depth} = 0
1445         if $self->_dbh_autocommit;
1446       $self->_dbh_rollback;
1447     }
1448     elsif($self->{transaction_depth} > 1) {
1449       $self->{transaction_depth}--;
1450       if ($self->auto_savepoint) {
1451         $self->svp_rollback;
1452         $self->svp_release;
1453       }
1454     }
1455     else {
1456       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1457     }
1458   }
1459   catch {
1460     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1461
1462     if ($_ !~ /$exception_class/) {
1463       # ensure that a failed rollback resets the transaction depth
1464       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1465     }
1466
1467     $self->throw_exception($_)
1468   };
1469 }
1470
1471 sub _dbh_rollback {
1472   my $self = shift;
1473   my $dbh  = $self->_dbh
1474     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1475   $dbh->rollback;
1476 }
1477
1478 # This used to be the top-half of _execute.  It was split out to make it
1479 #  easier to override in NoBindVars without duping the rest.  It takes up
1480 #  all of _execute's args, and emits $sql, @bind.
1481 sub _prep_for_execute {
1482   my ($self, $op, $extra_bind, $ident, $args) = @_;
1483
1484   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1485     $ident = $ident->from();
1486   }
1487
1488   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1489
1490   unshift(@bind,
1491     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1492       if $extra_bind;
1493   return ($sql, \@bind);
1494 }
1495
1496
1497 sub _fix_bind_params {
1498     my ($self, @bind) = @_;
1499
1500     ### Turn @bind from something like this:
1501     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1502     ### to this:
1503     ###   ( "'1'", "'1'", "'3'" )
1504     return
1505         map {
1506             if ( defined( $_ && $_->[1] ) ) {
1507                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1508             }
1509             else { q{'NULL'}; }
1510         } @bind;
1511 }
1512
1513 sub _query_start {
1514     my ( $self, $sql, @bind ) = @_;
1515
1516     if ( $self->debug ) {
1517         @bind = $self->_fix_bind_params(@bind);
1518
1519         $self->debugobj->query_start( $sql, @bind );
1520     }
1521 }
1522
1523 sub _query_end {
1524     my ( $self, $sql, @bind ) = @_;
1525
1526     if ( $self->debug ) {
1527         @bind = $self->_fix_bind_params(@bind);
1528         $self->debugobj->query_end( $sql, @bind );
1529     }
1530 }
1531
1532 sub _dbh_execute {
1533   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1534
1535   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1536
1537   $self->_query_start( $sql, @$bind );
1538
1539   my $sth = $self->sth($sql,$op);
1540
1541   my $placeholder_index = 1;
1542
1543   foreach my $bound (@$bind) {
1544     my $attributes = {};
1545     my($column_name, @data) = @$bound;
1546
1547     if ($bind_attributes) {
1548       $attributes = $bind_attributes->{$column_name}
1549       if defined $bind_attributes->{$column_name};
1550     }
1551
1552     foreach my $data (@data) {
1553       my $ref = ref $data;
1554       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1555
1556       $sth->bind_param($placeholder_index, $data, $attributes);
1557       $placeholder_index++;
1558     }
1559   }
1560
1561   # Can this fail without throwing an exception anyways???
1562   my $rv = $sth->execute();
1563   $self->throw_exception(
1564     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1565   ) if !$rv;
1566
1567   $self->_query_end( $sql, @$bind );
1568
1569   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1570 }
1571
1572 sub _execute {
1573     my $self = shift;
1574     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1575 }
1576
1577 sub _prefetch_insert_auto_nextvals {
1578   my ($self, $source, $to_insert) = @_;
1579
1580   my $upd = {};
1581
1582   foreach my $col ( $source->columns ) {
1583     if ( !defined $to_insert->{$col} ) {
1584       my $col_info = $source->column_info($col);
1585
1586       if ( $col_info->{auto_nextval} ) {
1587         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1588           'nextval',
1589           $col_info->{sequence} ||=
1590             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1591         );
1592       }
1593     }
1594   }
1595
1596   return $upd;
1597 }
1598
1599 sub insert {
1600   my $self = shift;
1601   my ($source, $to_insert, $opts) = @_;
1602
1603   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1604
1605   my $bind_attributes = $self->source_bind_attributes($source);
1606
1607   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1608
1609   if ($opts->{returning}) {
1610     my @ret_cols = @{$opts->{returning}};
1611
1612     my @ret_vals = try {
1613       local $SIG{__WARN__} = sub {};
1614       my @r = $sth->fetchrow_array;
1615       $sth->finish;
1616       @r;
1617     };
1618
1619     my %ret;
1620     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1621
1622     $updated_cols = {
1623       %$updated_cols,
1624       %ret,
1625     };
1626   }
1627
1628   return $updated_cols;
1629 }
1630
1631 ## Currently it is assumed that all values passed will be "normal", i.e. not
1632 ## scalar refs, or at least, all the same type as the first set, the statement is
1633 ## only prepped once.
1634 sub insert_bulk {
1635   my ($self, $source, $cols, $data) = @_;
1636
1637   my %colvalues;
1638   @colvalues{@$cols} = (0..$#$cols);
1639
1640   for my $i (0..$#$cols) {
1641     my $first_val = $data->[0][$i];
1642     next unless ref $first_val eq 'SCALAR';
1643
1644     $colvalues{ $cols->[$i] } = $first_val;
1645   }
1646
1647   # check for bad data and stringify stringifiable objects
1648   my $bad_slice = sub {
1649     my ($msg, $col_idx, $slice_idx) = @_;
1650     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1651       $msg,
1652       $cols->[$col_idx],
1653       do {
1654         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1655         Dumper {
1656           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1657         },
1658       }
1659     );
1660   };
1661
1662   for my $datum_idx (0..$#$data) {
1663     my $datum = $data->[$datum_idx];
1664
1665     for my $col_idx (0..$#$cols) {
1666       my $val            = $datum->[$col_idx];
1667       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1668       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1669
1670       if ($is_literal_sql) {
1671         if (not ref $val) {
1672           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1673         }
1674         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1675           $bad_slice->("$reftype reference found where literal SQL expected",
1676             $col_idx, $datum_idx);
1677         }
1678         elsif ($$val ne $$sqla_bind){
1679           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1680             $col_idx, $datum_idx);
1681         }
1682       }
1683       elsif (my $reftype = ref $val) {
1684         require overload;
1685         if (overload::Method($val, '""')) {
1686           $datum->[$col_idx] = "".$val;
1687         }
1688         else {
1689           $bad_slice->("$reftype reference found where bind expected",
1690             $col_idx, $datum_idx);
1691         }
1692       }
1693     }
1694   }
1695
1696   my ($sql, $bind) = $self->_prep_for_execute (
1697     'insert', undef, $source, [\%colvalues]
1698   );
1699   my @bind = @$bind;
1700
1701   my $empty_bind = 1 if (not @bind) &&
1702     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1703
1704   if ((not @bind) && (not $empty_bind)) {
1705     $self->throw_exception(
1706       'Cannot insert_bulk without support for placeholders'
1707     );
1708   }
1709
1710   # neither _execute_array, nor _execute_inserts_with_no_binds are
1711   # atomic (even if _execute _array is a single call). Thus a safety
1712   # scope guard
1713   my $guard = $self->txn_scope_guard;
1714
1715   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1716   my $sth = $self->sth($sql);
1717   my $rv = do {
1718     if ($empty_bind) {
1719       # bind_param_array doesn't work if there are no binds
1720       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1721     }
1722     else {
1723 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1724       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1725     }
1726   };
1727
1728   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1729
1730   $guard->commit;
1731
1732   return (wantarray ? ($rv, $sth, @bind) : $rv);
1733 }
1734
1735 sub _execute_array {
1736   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1737
1738   ## This must be an arrayref, else nothing works!
1739   my $tuple_status = [];
1740
1741   ## Get the bind_attributes, if any exist
1742   my $bind_attributes = $self->source_bind_attributes($source);
1743
1744   ## Bind the values and execute
1745   my $placeholder_index = 1;
1746
1747   foreach my $bound (@$bind) {
1748
1749     my $attributes = {};
1750     my ($column_name, $data_index) = @$bound;
1751
1752     if( $bind_attributes ) {
1753       $attributes = $bind_attributes->{$column_name}
1754       if defined $bind_attributes->{$column_name};
1755     }
1756
1757     my @data = map { $_->[$data_index] } @$data;
1758
1759     $sth->bind_param_array(
1760       $placeholder_index,
1761       [@data],
1762       (%$attributes ?  $attributes : ()),
1763     );
1764     $placeholder_index++;
1765   }
1766
1767   my ($rv, $err);
1768   try {
1769     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1770   }
1771   catch {
1772     $err = shift;
1773   }
1774   finally {
1775     # Statement must finish even if there was an exception.
1776     try {
1777       $sth->finish
1778     }
1779     catch {
1780       $err = shift unless defined $err
1781     };
1782   };
1783
1784   $err = $sth->errstr
1785     if (! defined $err and $sth->err);
1786
1787   if (defined $err) {
1788     my $i = 0;
1789     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1790
1791     $self->throw_exception("Unexpected populate error: $err")
1792       if ($i > $#$tuple_status);
1793
1794     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1795       ($tuple_status->[$i][1] || $err),
1796       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1797     );
1798   }
1799
1800   return $rv;
1801 }
1802
1803 sub _dbh_execute_array {
1804     my ($self, $sth, $tuple_status, @extra) = @_;
1805
1806     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1807 }
1808
1809 sub _dbh_execute_inserts_with_no_binds {
1810   my ($self, $sth, $count) = @_;
1811
1812   my $err;
1813   try {
1814     my $dbh = $self->_get_dbh;
1815     local $dbh->{RaiseError} = 1;
1816     local $dbh->{PrintError} = 0;
1817
1818     $sth->execute foreach 1..$count;
1819   }
1820   catch {
1821     $err = shift;
1822   }
1823   finally {
1824     # Make sure statement is finished even if there was an exception.
1825     try {
1826       $sth->finish
1827     }
1828     catch {
1829       $err = shift unless defined $err;
1830     };
1831   };
1832
1833   $self->throw_exception($err) if defined $err;
1834
1835   return $count;
1836 }
1837
1838 sub update {
1839   my ($self, $source, @args) = @_;
1840
1841   my $bind_attrs = $self->source_bind_attributes($source);
1842
1843   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1844 }
1845
1846
1847 sub delete {
1848   my ($self, $source, @args) = @_;
1849
1850   my $bind_attrs = $self->source_bind_attributes($source);
1851
1852   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1853 }
1854
1855 # We were sent here because the $rs contains a complex search
1856 # which will require a subquery to select the correct rows
1857 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1858 #
1859 # Generating a single PK column subquery is trivial and supported
1860 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1861 # Look at _multipk_update_delete()
1862 sub _subq_update_delete {
1863   my $self = shift;
1864   my ($rs, $op, $values) = @_;
1865
1866   my $rsrc = $rs->result_source;
1867
1868   # quick check if we got a sane rs on our hands
1869   my @pcols = $rsrc->_pri_cols;
1870
1871   my $sel = $rs->_resolved_attrs->{select};
1872   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1873
1874   if (
1875       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1876         ne
1877       join ("\x00", sort @$sel )
1878   ) {
1879     $self->throw_exception (
1880       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1881     );
1882   }
1883
1884   if (@pcols == 1) {
1885     return $self->$op (
1886       $rsrc,
1887       $op eq 'update' ? $values : (),
1888       { $pcols[0] => { -in => $rs->as_query } },
1889     );
1890   }
1891
1892   else {
1893     return $self->_multipk_update_delete (@_);
1894   }
1895 }
1896
1897 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1898 # resultset update/delete involving subqueries. So by default resort
1899 # to simple (and inefficient) delete_all style per-row opearations,
1900 # while allowing specific storages to override this with a faster
1901 # implementation.
1902 #
1903 sub _multipk_update_delete {
1904   return shift->_per_row_update_delete (@_);
1905 }
1906
1907 # This is the default loop used to delete/update rows for multi PK
1908 # resultsets, and used by mysql exclusively (because it can't do anything
1909 # else).
1910 #
1911 # We do not use $row->$op style queries, because resultset update/delete
1912 # is not expected to cascade (this is what delete_all/update_all is for).
1913 #
1914 # There should be no race conditions as the entire operation is rolled
1915 # in a transaction.
1916 #
1917 sub _per_row_update_delete {
1918   my $self = shift;
1919   my ($rs, $op, $values) = @_;
1920
1921   my $rsrc = $rs->result_source;
1922   my @pcols = $rsrc->_pri_cols;
1923
1924   my $guard = $self->txn_scope_guard;
1925
1926   # emulate the return value of $sth->execute for non-selects
1927   my $row_cnt = '0E0';
1928
1929   my $subrs_cur = $rs->cursor;
1930   my @all_pk = $subrs_cur->all;
1931   for my $pks ( @all_pk) {
1932
1933     my $cond;
1934     for my $i (0.. $#pcols) {
1935       $cond->{$pcols[$i]} = $pks->[$i];
1936     }
1937
1938     $self->$op (
1939       $rsrc,
1940       $op eq 'update' ? $values : (),
1941       $cond,
1942     );
1943
1944     $row_cnt++;
1945   }
1946
1947   $guard->commit;
1948
1949   return $row_cnt;
1950 }
1951
1952 sub _select {
1953   my $self = shift;
1954   $self->_execute($self->_select_args(@_));
1955 }
1956
1957 sub _select_args_to_query {
1958   my $self = shift;
1959
1960   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1961   #  = $self->_select_args($ident, $select, $cond, $attrs);
1962   my ($op, $bind, $ident, $bind_attrs, @args) =
1963     $self->_select_args(@_);
1964
1965   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1966   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1967   $prepared_bind ||= [];
1968
1969   return wantarray
1970     ? ($sql, $prepared_bind, $bind_attrs)
1971     : \[ "($sql)", @$prepared_bind ]
1972   ;
1973 }
1974
1975 sub _select_args {
1976   my ($self, $ident, $select, $where, $attrs) = @_;
1977
1978   my $sql_maker = $self->sql_maker;
1979   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1980
1981   $attrs = {
1982     %$attrs,
1983     select => $select,
1984     from => $ident,
1985     where => $where,
1986     $rs_alias && $alias2source->{$rs_alias}
1987       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1988       : ()
1989     ,
1990   };
1991
1992   # calculate bind_attrs before possible $ident mangling
1993   my $bind_attrs = {};
1994   for my $alias (keys %$alias2source) {
1995     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1996     for my $col (keys %$bindtypes) {
1997
1998       my $fqcn = join ('.', $alias, $col);
1999       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2000
2001       # Unqialified column names are nice, but at the same time can be
2002       # rather ambiguous. What we do here is basically go along with
2003       # the loop, adding an unqualified column slot to $bind_attrs,
2004       # alongside the fully qualified name. As soon as we encounter
2005       # another column by that name (which would imply another table)
2006       # we unset the unqualified slot and never add any info to it
2007       # to avoid erroneous type binding. If this happens the users
2008       # only choice will be to fully qualify his column name
2009
2010       if (exists $bind_attrs->{$col}) {
2011         $bind_attrs->{$col} = {};
2012       }
2013       else {
2014         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2015       }
2016     }
2017   }
2018
2019   # Sanity check the attributes (SQLMaker does it too, but
2020   # in case of a software_limit we'll never reach there)
2021   if (defined $attrs->{offset}) {
2022     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2023       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2024   }
2025   $attrs->{offset} ||= 0;
2026
2027   if (defined $attrs->{rows}) {
2028     $self->throw_exception("The rows attribute must be a positive integer if present")
2029       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2030   }
2031   elsif ($attrs->{offset}) {
2032     # MySQL actually recommends this approach.  I cringe.
2033     $attrs->{rows} = $sql_maker->__max_int;
2034   }
2035
2036   my @limit;
2037
2038   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2039   # storage, unless software limit was requested
2040   if (
2041     #limited has_many
2042     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2043        ||
2044     # grouped prefetch (to satisfy group_by == select)
2045     ( $attrs->{group_by}
2046         &&
2047       @{$attrs->{group_by}}
2048         &&
2049       $attrs->{_prefetch_select}
2050         &&
2051       @{$attrs->{_prefetch_select}}
2052     )
2053   ) {
2054     ($ident, $select, $where, $attrs)
2055       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2056   }
2057   elsif (! $attrs->{software_limit} ) {
2058     push @limit, $attrs->{rows}, $attrs->{offset};
2059   }
2060
2061   # try to simplify the joinmap further (prune unreferenced type-single joins)
2062   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2063
2064 ###
2065   # This would be the point to deflate anything found in $where
2066   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2067   # expect a row object. And all we have is a resultsource (it is trivial
2068   # to extract deflator coderefs via $alias2source above).
2069   #
2070   # I don't see a way forward other than changing the way deflators are
2071   # invoked, and that's just bad...
2072 ###
2073
2074   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2075 }
2076
2077 # Returns a counting SELECT for a simple count
2078 # query. Abstracted so that a storage could override
2079 # this to { count => 'firstcol' } or whatever makes
2080 # sense as a performance optimization
2081 sub _count_select {
2082   #my ($self, $source, $rs_attrs) = @_;
2083   return { count => '*' };
2084 }
2085
2086
2087 sub source_bind_attributes {
2088   my ($self, $source) = @_;
2089
2090   my $bind_attributes;
2091   foreach my $column ($source->columns) {
2092
2093     my $data_type = $source->column_info($column)->{data_type} || '';
2094     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2095      if $data_type;
2096   }
2097
2098   return $bind_attributes;
2099 }
2100
2101 =head2 select
2102
2103 =over 4
2104
2105 =item Arguments: $ident, $select, $condition, $attrs
2106
2107 =back
2108
2109 Handle a SQL select statement.
2110
2111 =cut
2112
2113 sub select {
2114   my $self = shift;
2115   my ($ident, $select, $condition, $attrs) = @_;
2116   return $self->cursor_class->new($self, \@_, $attrs);
2117 }
2118
2119 sub select_single {
2120   my $self = shift;
2121   my ($rv, $sth, @bind) = $self->_select(@_);
2122   my @row = $sth->fetchrow_array;
2123   my @nextrow = $sth->fetchrow_array if @row;
2124   if(@row && @nextrow) {
2125     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2126   }
2127   # Need to call finish() to work round broken DBDs
2128   $sth->finish();
2129   return @row;
2130 }
2131
2132 =head2 sql_limit_dialect
2133
2134 This is an accessor for the default SQL limit dialect used by a particular
2135 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2136 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2137 see L<DBIx::Class::SQLMaker::LimitDialects>.
2138
2139 =head2 sth
2140
2141 =over 4
2142
2143 =item Arguments: $sql
2144
2145 =back
2146
2147 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2148
2149 =cut
2150
2151 sub _dbh_sth {
2152   my ($self, $dbh, $sql) = @_;
2153
2154   # 3 is the if_active parameter which avoids active sth re-use
2155   my $sth = $self->disable_sth_caching
2156     ? $dbh->prepare($sql)
2157     : $dbh->prepare_cached($sql, {}, 3);
2158
2159   # XXX You would think RaiseError would make this impossible,
2160   #  but apparently that's not true :(
2161   $self->throw_exception($dbh->errstr) if !$sth;
2162
2163   $sth;
2164 }
2165
2166 sub sth {
2167   my ($self, $sql) = @_;
2168   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2169 }
2170
2171 sub _dbh_columns_info_for {
2172   my ($self, $dbh, $table) = @_;
2173
2174   if ($dbh->can('column_info')) {
2175     my %result;
2176     my $caught;
2177     try {
2178       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2179       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2180       $sth->execute();
2181       while ( my $info = $sth->fetchrow_hashref() ){
2182         my %column_info;
2183         $column_info{data_type}   = $info->{TYPE_NAME};
2184         $column_info{size}      = $info->{COLUMN_SIZE};
2185         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2186         $column_info{default_value} = $info->{COLUMN_DEF};
2187         my $col_name = $info->{COLUMN_NAME};
2188         $col_name =~ s/^\"(.*)\"$/$1/;
2189
2190         $result{$col_name} = \%column_info;
2191       }
2192     } catch {
2193       $caught = 1;
2194     };
2195     return \%result if !$caught && scalar keys %result;
2196   }
2197
2198   my %result;
2199   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2200   $sth->execute;
2201   my @columns = @{$sth->{NAME_lc}};
2202   for my $i ( 0 .. $#columns ){
2203     my %column_info;
2204     $column_info{data_type} = $sth->{TYPE}->[$i];
2205     $column_info{size} = $sth->{PRECISION}->[$i];
2206     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2207
2208     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2209       $column_info{data_type} = $1;
2210       $column_info{size}    = $2;
2211     }
2212
2213     $result{$columns[$i]} = \%column_info;
2214   }
2215   $sth->finish;
2216
2217   foreach my $col (keys %result) {
2218     my $colinfo = $result{$col};
2219     my $type_num = $colinfo->{data_type};
2220     my $type_name;
2221     if(defined $type_num && $dbh->can('type_info')) {
2222       my $type_info = $dbh->type_info($type_num);
2223       $type_name = $type_info->{TYPE_NAME} if $type_info;
2224       $colinfo->{data_type} = $type_name if $type_name;
2225     }
2226   }
2227
2228   return \%result;
2229 }
2230
2231 sub columns_info_for {
2232   my ($self, $table) = @_;
2233   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2234 }
2235
2236 =head2 last_insert_id
2237
2238 Return the row id of the last insert.
2239
2240 =cut
2241
2242 sub _dbh_last_insert_id {
2243     my ($self, $dbh, $source, $col) = @_;
2244
2245     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2246
2247     return $id if defined $id;
2248
2249     my $class = ref $self;
2250     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2251 }
2252
2253 sub last_insert_id {
2254   my $self = shift;
2255   $self->_dbh_last_insert_id ($self->_dbh, @_);
2256 }
2257
2258 =head2 _native_data_type
2259
2260 =over 4
2261
2262 =item Arguments: $type_name
2263
2264 =back
2265
2266 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2267 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2268 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2269
2270 The default implementation returns C<undef>, implement in your Storage driver if
2271 you need this functionality.
2272
2273 Should map types from other databases to the native RDBMS type, for example
2274 C<VARCHAR2> to C<VARCHAR>.
2275
2276 Types with modifiers should map to the underlying data type. For example,
2277 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2278
2279 Composite types should map to the container type, for example
2280 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2281
2282 =cut
2283
2284 sub _native_data_type {
2285   #my ($self, $data_type) = @_;
2286   return undef
2287 }
2288
2289 # Check if placeholders are supported at all
2290 sub _determine_supports_placeholders {
2291   my $self = shift;
2292   my $dbh  = $self->_get_dbh;
2293
2294   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2295   # but it is inaccurate more often than not
2296   return try {
2297     local $dbh->{PrintError} = 0;
2298     local $dbh->{RaiseError} = 1;
2299     $dbh->do('select ?', {}, 1);
2300     1;
2301   }
2302   catch {
2303     0;
2304   };
2305 }
2306
2307 # Check if placeholders bound to non-string types throw exceptions
2308 #
2309 sub _determine_supports_typeless_placeholders {
2310   my $self = shift;
2311   my $dbh  = $self->_get_dbh;
2312
2313   return try {
2314     local $dbh->{PrintError} = 0;
2315     local $dbh->{RaiseError} = 1;
2316     # this specifically tests a bind that is NOT a string
2317     $dbh->do('select 1 where 1 = ?', {}, 1);
2318     1;
2319   }
2320   catch {
2321     0;
2322   };
2323 }
2324
2325 =head2 sqlt_type
2326
2327 Returns the database driver name.
2328
2329 =cut
2330
2331 sub sqlt_type {
2332   shift->_get_dbh->{Driver}->{Name};
2333 }
2334
2335 =head2 bind_attribute_by_data_type
2336
2337 Given a datatype from column info, returns a database specific bind
2338 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2339 let the database planner just handle it.
2340
2341 Generally only needed for special case column types, like bytea in postgres.
2342
2343 =cut
2344
2345 sub bind_attribute_by_data_type {
2346     return;
2347 }
2348
2349 =head2 is_datatype_numeric
2350
2351 Given a datatype from column_info, returns a boolean value indicating if
2352 the current RDBMS considers it a numeric value. This controls how
2353 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2354 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2355 be performed instead of the usual C<eq>.
2356
2357 =cut
2358
2359 sub is_datatype_numeric {
2360   my ($self, $dt) = @_;
2361
2362   return 0 unless $dt;
2363
2364   return $dt =~ /^ (?:
2365     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2366   ) $/ix;
2367 }
2368
2369
2370 =head2 create_ddl_dir
2371
2372 =over 4
2373
2374 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2375
2376 =back
2377
2378 Creates a SQL file based on the Schema, for each of the specified
2379 database engines in C<\@databases> in the given directory.
2380 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2381
2382 Given a previous version number, this will also create a file containing
2383 the ALTER TABLE statements to transform the previous schema into the
2384 current one. Note that these statements may contain C<DROP TABLE> or
2385 C<DROP COLUMN> statements that can potentially destroy data.
2386
2387 The file names are created using the C<ddl_filename> method below, please
2388 override this method in your schema if you would like a different file
2389 name format. For the ALTER file, the same format is used, replacing
2390 $version in the name with "$preversion-$version".
2391
2392 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2393 The most common value for this would be C<< { add_drop_table => 1 } >>
2394 to have the SQL produced include a C<DROP TABLE> statement for each table
2395 created. For quoting purposes supply C<quote_table_names> and
2396 C<quote_field_names>.
2397
2398 If no arguments are passed, then the following default values are assumed:
2399
2400 =over 4
2401
2402 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2403
2404 =item version    - $schema->schema_version
2405
2406 =item directory  - './'
2407
2408 =item preversion - <none>
2409
2410 =back
2411
2412 By default, C<\%sqlt_args> will have
2413
2414  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2415
2416 merged with the hash passed in. To disable any of those features, pass in a
2417 hashref like the following
2418
2419  { ignore_constraint_names => 0, # ... other options }
2420
2421
2422 WARNING: You are strongly advised to check all SQL files created, before applying
2423 them.
2424
2425 =cut
2426
2427 sub create_ddl_dir {
2428   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2429
2430   unless ($dir) {
2431     carp "No directory given, using ./\n";
2432     $dir = './';
2433   } else {
2434       -d $dir
2435         or
2436       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2437         or
2438       $self->throw_exception(
2439         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2440       );
2441   }
2442
2443   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2444
2445   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2446   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2447
2448   my $schema_version = $schema->schema_version || '1.x';
2449   $version ||= $schema_version;
2450
2451   $sqltargs = {
2452     add_drop_table => 1,
2453     ignore_constraint_names => 1,
2454     ignore_index_names => 1,
2455     %{$sqltargs || {}}
2456   };
2457
2458   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2459     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2460   }
2461
2462   my $sqlt = SQL::Translator->new( $sqltargs );
2463
2464   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2465   my $sqlt_schema = $sqlt->translate({ data => $schema })
2466     or $self->throw_exception ($sqlt->error);
2467
2468   foreach my $db (@$databases) {
2469     $sqlt->reset();
2470     $sqlt->{schema} = $sqlt_schema;
2471     $sqlt->producer($db);
2472
2473     my $file;
2474     my $filename = $schema->ddl_filename($db, $version, $dir);
2475     if (-e $filename && ($version eq $schema_version )) {
2476       # if we are dumping the current version, overwrite the DDL
2477       carp "Overwriting existing DDL file - $filename";
2478       unlink($filename);
2479     }
2480
2481     my $output = $sqlt->translate;
2482     if(!$output) {
2483       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2484       next;
2485     }
2486     if(!open($file, ">$filename")) {
2487       $self->throw_exception("Can't open $filename for writing ($!)");
2488       next;
2489     }
2490     print $file $output;
2491     close($file);
2492
2493     next unless ($preversion);
2494
2495     require SQL::Translator::Diff;
2496
2497     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2498     if(!-e $prefilename) {
2499       carp("No previous schema file found ($prefilename)");
2500       next;
2501     }
2502
2503     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2504     if(-e $difffile) {
2505       carp("Overwriting existing diff file - $difffile");
2506       unlink($difffile);
2507     }
2508
2509     my $source_schema;
2510     {
2511       my $t = SQL::Translator->new($sqltargs);
2512       $t->debug( 0 );
2513       $t->trace( 0 );
2514
2515       $t->parser( $db )
2516         or $self->throw_exception ($t->error);
2517
2518       my $out = $t->translate( $prefilename )
2519         or $self->throw_exception ($t->error);
2520
2521       $source_schema = $t->schema;
2522
2523       $source_schema->name( $prefilename )
2524         unless ( $source_schema->name );
2525     }
2526
2527     # The "new" style of producers have sane normalization and can support
2528     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2529     # And we have to diff parsed SQL against parsed SQL.
2530     my $dest_schema = $sqlt_schema;
2531
2532     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2533       my $t = SQL::Translator->new($sqltargs);
2534       $t->debug( 0 );
2535       $t->trace( 0 );
2536
2537       $t->parser( $db )
2538         or $self->throw_exception ($t->error);
2539
2540       my $out = $t->translate( $filename )
2541         or $self->throw_exception ($t->error);
2542
2543       $dest_schema = $t->schema;
2544
2545       $dest_schema->name( $filename )
2546         unless $dest_schema->name;
2547     }
2548
2549     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2550                                                   $dest_schema,   $db,
2551                                                   $sqltargs
2552                                                  );
2553     if(!open $file, ">$difffile") {
2554       $self->throw_exception("Can't write to $difffile ($!)");
2555       next;
2556     }
2557     print $file $diff;
2558     close($file);
2559   }
2560 }
2561
2562 =head2 deployment_statements
2563
2564 =over 4
2565
2566 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2567
2568 =back
2569
2570 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2571
2572 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2573 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2574
2575 C<$directory> is used to return statements from files in a previously created
2576 L</create_ddl_dir> directory and is optional. The filenames are constructed
2577 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2578
2579 If no C<$directory> is specified then the statements are constructed on the
2580 fly using L<SQL::Translator> and C<$version> is ignored.
2581
2582 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2583
2584 =cut
2585
2586 sub deployment_statements {
2587   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2588   $type ||= $self->sqlt_type;
2589   $version ||= $schema->schema_version || '1.x';
2590   $dir ||= './';
2591   my $filename = $schema->ddl_filename($type, $version, $dir);
2592   if(-f $filename)
2593   {
2594       my $file;
2595       open($file, "<$filename")
2596         or $self->throw_exception("Can't open $filename ($!)");
2597       my @rows = <$file>;
2598       close($file);
2599       return join('', @rows);
2600   }
2601
2602   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2603     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2604   }
2605
2606   # sources needs to be a parser arg, but for simplicty allow at top level
2607   # coming in
2608   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2609       if exists $sqltargs->{sources};
2610
2611   my $tr = SQL::Translator->new(
2612     producer => "SQL::Translator::Producer::${type}",
2613     %$sqltargs,
2614     parser => 'SQL::Translator::Parser::DBIx::Class',
2615     data => $schema,
2616   );
2617
2618   my @ret;
2619   my $wa = wantarray;
2620   if ($wa) {
2621     @ret = $tr->translate;
2622   }
2623   else {
2624     $ret[0] = $tr->translate;
2625   }
2626
2627   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2628     unless (@ret && defined $ret[0]);
2629
2630   return $wa ? @ret : $ret[0];
2631 }
2632
2633 sub deploy {
2634   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2635   my $deploy = sub {
2636     my $line = shift;
2637     return if($line =~ /^--/);
2638     return if(!$line);
2639     # next if($line =~ /^DROP/m);
2640     return if($line =~ /^BEGIN TRANSACTION/m);
2641     return if($line =~ /^COMMIT/m);
2642     return if $line =~ /^\s+$/; # skip whitespace only
2643     $self->_query_start($line);
2644     try {
2645       # do a dbh_do cycle here, as we need some error checking in
2646       # place (even though we will ignore errors)
2647       $self->dbh_do (sub { $_[1]->do($line) });
2648     } catch {
2649       carp qq{$_ (running "${line}")};
2650     };
2651     $self->_query_end($line);
2652   };
2653   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2654   if (@statements > 1) {
2655     foreach my $statement (@statements) {
2656       $deploy->( $statement );
2657     }
2658   }
2659   elsif (@statements == 1) {
2660     foreach my $line ( split(";\n", $statements[0])) {
2661       $deploy->( $line );
2662     }
2663   }
2664 }
2665
2666 =head2 datetime_parser
2667
2668 Returns the datetime parser class
2669
2670 =cut
2671
2672 sub datetime_parser {
2673   my $self = shift;
2674   return $self->{datetime_parser} ||= do {
2675     $self->build_datetime_parser(@_);
2676   };
2677 }
2678
2679 =head2 datetime_parser_type
2680
2681 Defines (returns) the datetime parser class - currently hardwired to
2682 L<DateTime::Format::MySQL>
2683
2684 =cut
2685
2686 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2687
2688 =head2 build_datetime_parser
2689
2690 See L</datetime_parser>
2691
2692 =cut
2693
2694 sub build_datetime_parser {
2695   my $self = shift;
2696   my $type = $self->datetime_parser_type(@_);
2697   $self->ensure_class_loaded ($type);
2698   return $type;
2699 }
2700
2701
2702 =head2 is_replicating
2703
2704 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2705 replicate from a master database.  Default is undef, which is the result
2706 returned by databases that don't support replication.
2707
2708 =cut
2709
2710 sub is_replicating {
2711     return;
2712
2713 }
2714
2715 =head2 lag_behind_master
2716
2717 Returns a number that represents a certain amount of lag behind a master db
2718 when a given storage is replicating.  The number is database dependent, but
2719 starts at zero and increases with the amount of lag. Default in undef
2720
2721 =cut
2722
2723 sub lag_behind_master {
2724     return;
2725 }
2726
2727 =head2 relname_to_table_alias
2728
2729 =over 4
2730
2731 =item Arguments: $relname, $join_count
2732
2733 =back
2734
2735 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2736 queries.
2737
2738 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2739 way these aliases are named.
2740
2741 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2742 otherwise C<"$relname">.
2743
2744 =cut
2745
2746 sub relname_to_table_alias {
2747   my ($self, $relname, $join_count) = @_;
2748
2749   my $alias = ($join_count && $join_count > 1 ?
2750     join('_', $relname, $join_count) : $relname);
2751
2752   return $alias;
2753 }
2754
2755 1;
2756
2757 =head1 USAGE NOTES
2758
2759 =head2 DBIx::Class and AutoCommit
2760
2761 DBIx::Class can do some wonderful magic with handling exceptions,
2762 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2763 (the default) combined with C<txn_do> for transaction support.
2764
2765 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2766 in an assumed transaction between commits, and you're telling us you'd
2767 like to manage that manually.  A lot of the magic protections offered by
2768 this module will go away.  We can't protect you from exceptions due to database
2769 disconnects because we don't know anything about how to restart your
2770 transactions.  You're on your own for handling all sorts of exceptional
2771 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2772 be with raw DBI.
2773
2774
2775 =head1 AUTHORS
2776
2777 Matt S. Trout <mst@shadowcatsystems.co.uk>
2778
2779 Andy Grundman <andy@hybridized.org>
2780
2781 =head1 LICENSE
2782
2783 You may distribute this code under the same terms as Perl itself.
2784
2785 =cut