38483878b28afa6ba946e457516c590c9a642a60
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } @capabilities );
60
61
62 # Each of these methods need _determine_driver called before itself
63 # in order to function reliably. This is a purely DRY optimization
64 #
65 # get_(use)_dbms_capability need to be called on the correct Storage
66 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
67 # _determine_supports_X which obv. needs a correct driver as well
68 my @rdbms_specific_methods = qw/
69   deployment_statements
70   sqlt_type
71   sql_maker
72   build_datetime_parser
73   datetime_parser_type
74
75   insert
76   insert_bulk
77   update
78   delete
79   select
80   select_single
81
82   get_use_dbms_capability
83   get_dbms_capability
84
85   _server_info
86   _get_server_version
87 /;
88
89 for my $meth (@rdbms_specific_methods) {
90
91   my $orig = __PACKAGE__->can ($meth)
92     or die "$meth is not a ::Storage::DBI method!";
93
94   no strict qw/refs/;
95   no warnings qw/redefine/;
96   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
97     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
98       $_[0]->_determine_driver;
99
100       # This for some reason crashes and burns on perl 5.8.1
101       # IFF the method ends up throwing an exception
102       #goto $_[0]->can ($meth);
103
104       my $cref = $_[0]->can ($meth);
105       goto $cref;
106     }
107     goto $orig;
108   };
109 }
110
111
112 =head1 NAME
113
114 DBIx::Class::Storage::DBI - DBI storage handler
115
116 =head1 SYNOPSIS
117
118   my $schema = MySchema->connect('dbi:SQLite:my.db');
119
120   $schema->storage->debug(1);
121
122   my @stuff = $schema->storage->dbh_do(
123     sub {
124       my ($storage, $dbh, @args) = @_;
125       $dbh->do("DROP TABLE authors");
126     },
127     @column_list
128   );
129
130   $schema->resultset('Book')->search({
131      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
132   });
133
134 =head1 DESCRIPTION
135
136 This class represents the connection to an RDBMS via L<DBI>.  See
137 L<DBIx::Class::Storage> for general information.  This pod only
138 documents DBI-specific methods and behaviors.
139
140 =head1 METHODS
141
142 =cut
143
144 sub new {
145   my $new = shift->next::method(@_);
146
147   $new->transaction_depth(0);
148   $new->_sql_maker_opts({});
149   $new->_dbh_details({});
150   $new->{savepoints} = [];
151   $new->{_in_dbh_do} = 0;
152   $new->{_dbh_gen} = 0;
153
154   # read below to see what this does
155   $new->_arm_global_destructor;
156
157   $new;
158 }
159
160 # This is hack to work around perl shooting stuff in random
161 # order on exit(). If we do not walk the remaining storage
162 # objects in an END block, there is a *small but real* chance
163 # of a fork()ed child to kill the parent's shared DBI handle,
164 # *before perl reaches the DESTROY in this package*
165 # Yes, it is ugly and effective.
166 {
167   my %seek_and_destroy;
168
169   sub _arm_global_destructor {
170     my $self = shift;
171     my $key = Scalar::Util::refaddr ($self);
172     $seek_and_destroy{$key} = $self;
173     Scalar::Util::weaken ($seek_and_destroy{$key});
174   }
175
176   END {
177     local $?; # just in case the DBI destructor changes it somehow
178
179     # destroy just the object if not native to this process/thread
180     $_->_preserve_foreign_dbh for (grep
181       { defined $_ }
182       values %seek_and_destroy
183     );
184   }
185 }
186
187 sub DESTROY {
188   my $self = shift;
189
190   # destroy just the object if not native to this process/thread
191   $self->_preserve_foreign_dbh;
192
193   # some databases need this to stop spewing warnings
194   if (my $dbh = $self->_dbh) {
195     try {
196       %{ $dbh->{CachedKids} } = ();
197       $dbh->disconnect;
198     };
199   }
200
201   $self->_dbh(undef);
202 }
203
204 sub _preserve_foreign_dbh {
205   my $self = shift;
206
207   return unless $self->_dbh;
208
209   $self->_verify_tid;
210
211   return unless $self->_dbh;
212
213   $self->_verify_pid;
214
215 }
216
217 # handle pid changes correctly - do not destroy parent's connection
218 sub _verify_pid {
219   my $self = shift;
220
221   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
222
223   $self->_dbh->{InactiveDestroy} = 1;
224   $self->_dbh(undef);
225   $self->{_dbh_gen}++;
226
227   return;
228 }
229
230 # very similar to above, but seems to FAIL if I set InactiveDestroy
231 sub _verify_tid {
232   my $self = shift;
233
234   if ( ! defined $self->_conn_tid ) {
235     return; # no threads
236   }
237   elsif ( $self->_conn_tid == threads->tid ) {
238     return; # same thread
239   }
240
241   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
242   $self->_dbh(undef);
243   $self->{_dbh_gen}++;
244
245   return;
246 }
247
248
249 =head2 connect_info
250
251 This method is normally called by L<DBIx::Class::Schema/connection>, which
252 encapsulates its argument list in an arrayref before passing them here.
253
254 The argument list may contain:
255
256 =over
257
258 =item *
259
260 The same 4-element argument set one would normally pass to
261 L<DBI/connect>, optionally followed by
262 L<extra attributes|/DBIx::Class specific connection attributes>
263 recognized by DBIx::Class:
264
265   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
266
267 =item *
268
269 A single code reference which returns a connected
270 L<DBI database handle|DBI/connect> optionally followed by
271 L<extra attributes|/DBIx::Class specific connection attributes> recognized
272 by DBIx::Class:
273
274   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
275
276 =item *
277
278 A single hashref with all the attributes and the dsn/user/password
279 mixed together:
280
281   $connect_info_args = [{
282     dsn => $dsn,
283     user => $user,
284     password => $pass,
285     %dbi_attributes,
286     %extra_attributes,
287   }];
288
289   $connect_info_args = [{
290     dbh_maker => sub { DBI->connect (...) },
291     %dbi_attributes,
292     %extra_attributes,
293   }];
294
295 This is particularly useful for L<Catalyst> based applications, allowing the
296 following config (L<Config::General> style):
297
298   <Model::DB>
299     schema_class   App::DB
300     <connect_info>
301       dsn          dbi:mysql:database=test
302       user         testuser
303       password     TestPass
304       AutoCommit   1
305     </connect_info>
306   </Model::DB>
307
308 The C<dsn>/C<user>/C<password> combination can be substituted by the
309 C<dbh_maker> key whose value is a coderef that returns a connected
310 L<DBI database handle|DBI/connect>
311
312 =back
313
314 Please note that the L<DBI> docs recommend that you always explicitly
315 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
316 recommends that it be set to I<1>, and that you perform transactions
317 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
318 to I<1> if you do not do explicitly set it to zero.  This is the default
319 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
320
321 =head3 DBIx::Class specific connection attributes
322
323 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
324 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
325 the following connection options. These options can be mixed in with your other
326 L<DBI> connection attributes, or placed in a separate hashref
327 (C<\%extra_attributes>) as shown above.
328
329 Every time C<connect_info> is invoked, any previous settings for
330 these options will be cleared before setting the new ones, regardless of
331 whether any options are specified in the new C<connect_info>.
332
333
334 =over
335
336 =item on_connect_do
337
338 Specifies things to do immediately after connecting or re-connecting to
339 the database.  Its value may contain:
340
341 =over
342
343 =item a scalar
344
345 This contains one SQL statement to execute.
346
347 =item an array reference
348
349 This contains SQL statements to execute in order.  Each element contains
350 a string or a code reference that returns a string.
351
352 =item a code reference
353
354 This contains some code to execute.  Unlike code references within an
355 array reference, its return value is ignored.
356
357 =back
358
359 =item on_disconnect_do
360
361 Takes arguments in the same form as L</on_connect_do> and executes them
362 immediately before disconnecting from the database.
363
364 Note, this only runs if you explicitly call L</disconnect> on the
365 storage object.
366
367 =item on_connect_call
368
369 A more generalized form of L</on_connect_do> that calls the specified
370 C<connect_call_METHOD> methods in your storage driver.
371
372   on_connect_do => 'select 1'
373
374 is equivalent to:
375
376   on_connect_call => [ [ do_sql => 'select 1' ] ]
377
378 Its values may contain:
379
380 =over
381
382 =item a scalar
383
384 Will call the C<connect_call_METHOD> method.
385
386 =item a code reference
387
388 Will execute C<< $code->($storage) >>
389
390 =item an array reference
391
392 Each value can be a method name or code reference.
393
394 =item an array of arrays
395
396 For each array, the first item is taken to be the C<connect_call_> method name
397 or code reference, and the rest are parameters to it.
398
399 =back
400
401 Some predefined storage methods you may use:
402
403 =over
404
405 =item do_sql
406
407 Executes a SQL string or a code reference that returns a SQL string. This is
408 what L</on_connect_do> and L</on_disconnect_do> use.
409
410 It can take:
411
412 =over
413
414 =item a scalar
415
416 Will execute the scalar as SQL.
417
418 =item an arrayref
419
420 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
421 attributes hashref and bind values.
422
423 =item a code reference
424
425 Will execute C<< $code->($storage) >> and execute the return array refs as
426 above.
427
428 =back
429
430 =item datetime_setup
431
432 Execute any statements necessary to initialize the database session to return
433 and accept datetime/timestamp values used with
434 L<DBIx::Class::InflateColumn::DateTime>.
435
436 Only necessary for some databases, see your specific storage driver for
437 implementation details.
438
439 =back
440
441 =item on_disconnect_call
442
443 Takes arguments in the same form as L</on_connect_call> and executes them
444 immediately before disconnecting from the database.
445
446 Calls the C<disconnect_call_METHOD> methods as opposed to the
447 C<connect_call_METHOD> methods called by L</on_connect_call>.
448
449 Note, this only runs if you explicitly call L</disconnect> on the
450 storage object.
451
452 =item disable_sth_caching
453
454 If set to a true value, this option will disable the caching of
455 statement handles via L<DBI/prepare_cached>.
456
457 =item limit_dialect
458
459 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
460 default L</sql_limit_dialect> setting of the storage (if any). For a list
461 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
462
463 =item quote_char
464
465 Specifies what characters to use to quote table and column names. If
466 you use this you will want to specify L</name_sep> as well.
467
468 C<quote_char> expects either a single character, in which case is it
469 is placed on either side of the table/column name, or an arrayref of length
470 2 in which case the table/column name is placed between the elements.
471
472 For example under MySQL you should use C<< quote_char => '`' >>, and for
473 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
474
475 =item name_sep
476
477 This only needs to be used in conjunction with C<quote_char>, and is used to
478 specify the character that separates elements (schemas, tables, columns) from
479 each other. In most cases this is simply a C<.>.
480
481 The consequences of not supplying this value is that L<SQL::Abstract>
482 will assume DBIx::Class' uses of aliases to be complete column
483 names. The output will look like I<"me.name"> when it should actually
484 be I<"me"."name">.
485
486 =item unsafe
487
488 This Storage driver normally installs its own C<HandleError>, sets
489 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
490 all database handles, including those supplied by a coderef.  It does this
491 so that it can have consistent and useful error behavior.
492
493 If you set this option to a true value, Storage will not do its usual
494 modifications to the database handle's attributes, and instead relies on
495 the settings in your connect_info DBI options (or the values you set in
496 your connection coderef, in the case that you are connecting via coderef).
497
498 Note that your custom settings can cause Storage to malfunction,
499 especially if you set a C<HandleError> handler that suppresses exceptions
500 and/or disable C<RaiseError>.
501
502 =item auto_savepoint
503
504 If this option is true, L<DBIx::Class> will use savepoints when nesting
505 transactions, making it possible to recover from failure in the inner
506 transaction without having to abort all outer transactions.
507
508 =item cursor_class
509
510 Use this argument to supply a cursor class other than the default
511 L<DBIx::Class::Storage::DBI::Cursor>.
512
513 =back
514
515 Some real-life examples of arguments to L</connect_info> and
516 L<DBIx::Class::Schema/connect>
517
518   # Simple SQLite connection
519   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
520
521   # Connect via subref
522   ->connect_info([ sub { DBI->connect(...) } ]);
523
524   # Connect via subref in hashref
525   ->connect_info([{
526     dbh_maker => sub { DBI->connect(...) },
527     on_connect_do => 'alter session ...',
528   }]);
529
530   # A bit more complicated
531   ->connect_info(
532     [
533       'dbi:Pg:dbname=foo',
534       'postgres',
535       'my_pg_password',
536       { AutoCommit => 1 },
537       { quote_char => q{"}, name_sep => q{.} },
538     ]
539   );
540
541   # Equivalent to the previous example
542   ->connect_info(
543     [
544       'dbi:Pg:dbname=foo',
545       'postgres',
546       'my_pg_password',
547       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
548     ]
549   );
550
551   # Same, but with hashref as argument
552   # See parse_connect_info for explanation
553   ->connect_info(
554     [{
555       dsn         => 'dbi:Pg:dbname=foo',
556       user        => 'postgres',
557       password    => 'my_pg_password',
558       AutoCommit  => 1,
559       quote_char  => q{"},
560       name_sep    => q{.},
561     }]
562   );
563
564   # Subref + DBIx::Class-specific connection options
565   ->connect_info(
566     [
567       sub { DBI->connect(...) },
568       {
569           quote_char => q{`},
570           name_sep => q{@},
571           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
572           disable_sth_caching => 1,
573       },
574     ]
575   );
576
577
578
579 =cut
580
581 sub connect_info {
582   my ($self, $info) = @_;
583
584   return $self->_connect_info if !$info;
585
586   $self->_connect_info($info); # copy for _connect_info
587
588   $info = $self->_normalize_connect_info($info)
589     if ref $info eq 'ARRAY';
590
591   for my $storage_opt (keys %{ $info->{storage_options} }) {
592     my $value = $info->{storage_options}{$storage_opt};
593
594     $self->$storage_opt($value);
595   }
596
597   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
598   #  the new set of options
599   $self->_sql_maker(undef);
600   $self->_sql_maker_opts({});
601
602   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
603     my $value = $info->{sql_maker_options}{$sql_maker_opt};
604
605     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
606   }
607
608   my %attrs = (
609     %{ $self->_default_dbi_connect_attributes || {} },
610     %{ $info->{attributes} || {} },
611   );
612
613   my @args = @{ $info->{arguments} };
614
615   $self->_dbi_connect_info([@args,
616     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
617
618   # FIXME - dirty:
619   # save attributes them in a separate accessor so they are always
620   # introspectable, even in case of a CODE $dbhmaker
621   $self->_dbic_connect_attributes (\%attrs);
622
623   return $self->_connect_info;
624 }
625
626 sub _normalize_connect_info {
627   my ($self, $info_arg) = @_;
628   my %info;
629
630   my @args = @$info_arg;  # take a shallow copy for further mutilation
631
632   # combine/pre-parse arguments depending on invocation style
633
634   my %attrs;
635   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
636     %attrs = %{ $args[1] || {} };
637     @args = $args[0];
638   }
639   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
640     %attrs = %{$args[0]};
641     @args = ();
642     if (my $code = delete $attrs{dbh_maker}) {
643       @args = $code;
644
645       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
646       if (@ignored) {
647         carp sprintf (
648             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
649           . "to the result of 'dbh_maker'",
650
651           join (', ', map { "'$_'" } (@ignored) ),
652         );
653       }
654     }
655     else {
656       @args = delete @attrs{qw/dsn user password/};
657     }
658   }
659   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
660     %attrs = (
661       % { $args[3] || {} },
662       % { $args[4] || {} },
663     );
664     @args = @args[0,1,2];
665   }
666
667   $info{arguments} = \@args;
668
669   my @storage_opts = grep exists $attrs{$_},
670     @storage_options, 'cursor_class';
671
672   @{ $info{storage_options} }{@storage_opts} =
673     delete @attrs{@storage_opts} if @storage_opts;
674
675   my @sql_maker_opts = grep exists $attrs{$_},
676     qw/limit_dialect quote_char name_sep/;
677
678   @{ $info{sql_maker_options} }{@sql_maker_opts} =
679     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
680
681   $info{attributes} = \%attrs if %attrs;
682
683   return \%info;
684 }
685
686 sub _default_dbi_connect_attributes {
687   return {
688     AutoCommit => 1,
689     RaiseError => 1,
690     PrintError => 0,
691   };
692 }
693
694 =head2 on_connect_do
695
696 This method is deprecated in favour of setting via L</connect_info>.
697
698 =cut
699
700 =head2 on_disconnect_do
701
702 This method is deprecated in favour of setting via L</connect_info>.
703
704 =cut
705
706 sub _parse_connect_do {
707   my ($self, $type) = @_;
708
709   my $val = $self->$type;
710   return () if not defined $val;
711
712   my @res;
713
714   if (not ref($val)) {
715     push @res, [ 'do_sql', $val ];
716   } elsif (ref($val) eq 'CODE') {
717     push @res, $val;
718   } elsif (ref($val) eq 'ARRAY') {
719     push @res, map { [ 'do_sql', $_ ] } @$val;
720   } else {
721     $self->throw_exception("Invalid type for $type: ".ref($val));
722   }
723
724   return \@res;
725 }
726
727 =head2 dbh_do
728
729 Arguments: ($subref | $method_name), @extra_coderef_args?
730
731 Execute the given $subref or $method_name using the new exception-based
732 connection management.
733
734 The first two arguments will be the storage object that C<dbh_do> was called
735 on and a database handle to use.  Any additional arguments will be passed
736 verbatim to the called subref as arguments 2 and onwards.
737
738 Using this (instead of $self->_dbh or $self->dbh) ensures correct
739 exception handling and reconnection (or failover in future subclasses).
740
741 Your subref should have no side-effects outside of the database, as
742 there is the potential for your subref to be partially double-executed
743 if the database connection was stale/dysfunctional.
744
745 Example:
746
747   my @stuff = $schema->storage->dbh_do(
748     sub {
749       my ($storage, $dbh, @cols) = @_;
750       my $cols = join(q{, }, @cols);
751       $dbh->selectrow_array("SELECT $cols FROM foo");
752     },
753     @column_list
754   );
755
756 =cut
757
758 sub dbh_do {
759   my $self = shift;
760   my $code = shift;
761
762   my $dbh = $self->_get_dbh;
763
764   return $self->$code($dbh, @_)
765     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
766
767   local $self->{_in_dbh_do} = 1;
768
769   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
770   my $args = \@_;
771   return try {
772     $self->$code ($dbh, @$args);
773   } catch {
774     $self->throw_exception($_) if $self->connected;
775
776     # We were not connected - reconnect and retry, but let any
777     #  exception fall right through this time
778     carp "Retrying $code after catching disconnected exception: $_"
779       if $ENV{DBIC_DBIRETRY_DEBUG};
780
781     $self->_populate_dbh;
782     $self->$code($self->_dbh, @$args);
783   };
784 }
785
786 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
787 # It also informs dbh_do to bypass itself while under the direction of txn_do,
788 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
789 sub txn_do {
790   my $self = shift;
791   my $coderef = shift;
792
793   ref $coderef eq 'CODE' or $self->throw_exception
794     ('$coderef must be a CODE reference');
795
796   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
797
798   local $self->{_in_dbh_do} = 1;
799
800   my @result;
801   my $want_array = wantarray;
802
803   my $tried = 0;
804   while(1) {
805     my $exception;
806
807     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
808     my $args = \@_;
809
810     try {
811       $self->txn_begin;
812       if($want_array) {
813           @result = $coderef->(@$args);
814       }
815       elsif(defined $want_array) {
816           $result[0] = $coderef->(@$args);
817       }
818       else {
819           $coderef->(@$args);
820       }
821       $self->txn_commit;
822     } catch {
823       $exception = $_;
824     };
825
826     if(! defined $exception) { return $want_array ? @result : $result[0] }
827
828     if($tried++ || $self->connected) {
829       my $rollback_exception;
830       try { $self->txn_rollback } catch { $rollback_exception = shift };
831       if(defined $rollback_exception) {
832         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
833         $self->throw_exception($exception)  # propagate nested rollback
834           if $rollback_exception =~ /$exception_class/;
835
836         $self->throw_exception(
837           "Transaction aborted: ${exception}. "
838           . "Rollback failed: ${rollback_exception}"
839         );
840       }
841       $self->throw_exception($exception)
842     }
843
844     # We were not connected, and was first try - reconnect and retry
845     # via the while loop
846     carp "Retrying $coderef after catching disconnected exception: $exception"
847       if $ENV{DBIC_TXNRETRY_DEBUG};
848     $self->_populate_dbh;
849   }
850 }
851
852 =head2 disconnect
853
854 Our C<disconnect> method also performs a rollback first if the
855 database is not in C<AutoCommit> mode.
856
857 =cut
858
859 sub disconnect {
860   my ($self) = @_;
861
862   if( $self->_dbh ) {
863     my @actions;
864
865     push @actions, ( $self->on_disconnect_call || () );
866     push @actions, $self->_parse_connect_do ('on_disconnect_do');
867
868     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
869
870     $self->_dbh_rollback unless $self->_dbh_autocommit;
871
872     %{ $self->_dbh->{CachedKids} } = ();
873     $self->_dbh->disconnect;
874     $self->_dbh(undef);
875     $self->{_dbh_gen}++;
876   }
877 }
878
879 =head2 with_deferred_fk_checks
880
881 =over 4
882
883 =item Arguments: C<$coderef>
884
885 =item Return Value: The return value of $coderef
886
887 =back
888
889 Storage specific method to run the code ref with FK checks deferred or
890 in MySQL's case disabled entirely.
891
892 =cut
893
894 # Storage subclasses should override this
895 sub with_deferred_fk_checks {
896   my ($self, $sub) = @_;
897   $sub->();
898 }
899
900 =head2 connected
901
902 =over
903
904 =item Arguments: none
905
906 =item Return Value: 1|0
907
908 =back
909
910 Verifies that the current database handle is active and ready to execute
911 an SQL statement (e.g. the connection did not get stale, server is still
912 answering, etc.) This method is used internally by L</dbh>.
913
914 =cut
915
916 sub connected {
917   my $self = shift;
918   return 0 unless $self->_seems_connected;
919
920   #be on the safe side
921   local $self->_dbh->{RaiseError} = 1;
922
923   return $self->_ping;
924 }
925
926 sub _seems_connected {
927   my $self = shift;
928
929   $self->_preserve_foreign_dbh;
930
931   my $dbh = $self->_dbh
932     or return 0;
933
934   return $dbh->FETCH('Active');
935 }
936
937 sub _ping {
938   my $self = shift;
939
940   my $dbh = $self->_dbh or return 0;
941
942   return $dbh->ping;
943 }
944
945 sub ensure_connected {
946   my ($self) = @_;
947
948   unless ($self->connected) {
949     $self->_populate_dbh;
950   }
951 }
952
953 =head2 dbh
954
955 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
956 is guaranteed to be healthy by implicitly calling L</connected>, and if
957 necessary performing a reconnection before returning. Keep in mind that this
958 is very B<expensive> on some database engines. Consider using L</dbh_do>
959 instead.
960
961 =cut
962
963 sub dbh {
964   my ($self) = @_;
965
966   if (not $self->_dbh) {
967     $self->_populate_dbh;
968   } else {
969     $self->ensure_connected;
970   }
971   return $self->_dbh;
972 }
973
974 # this is the internal "get dbh or connect (don't check)" method
975 sub _get_dbh {
976   my $self = shift;
977   $self->_preserve_foreign_dbh;
978   $self->_populate_dbh unless $self->_dbh;
979   return $self->_dbh;
980 }
981
982 sub sql_maker {
983   my ($self) = @_;
984   unless ($self->_sql_maker) {
985     my $sql_maker_class = $self->sql_maker_class;
986     $self->ensure_class_loaded ($sql_maker_class);
987
988     my %opts = %{$self->_sql_maker_opts||{}};
989     my $dialect =
990       $opts{limit_dialect}
991         ||
992       $self->sql_limit_dialect
993         ||
994       do {
995         my $s_class = (ref $self) || $self;
996         carp (
997           "Your storage class ($s_class) does not set sql_limit_dialect and you "
998         . 'have not supplied an explicit limit_dialect in your connection_info. '
999         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1000         . 'databases but can be (and often is) painfully slow.'
1001         );
1002
1003         'GenericSubQ';
1004       }
1005     ;
1006
1007     $self->_sql_maker($sql_maker_class->new(
1008       bindtype=>'columns',
1009       array_datatypes => 1,
1010       limit_dialect => $dialect,
1011       %opts,
1012     ));
1013   }
1014   return $self->_sql_maker;
1015 }
1016
1017 # nothing to do by default
1018 sub _rebless {}
1019 sub _init {}
1020
1021 sub _populate_dbh {
1022   my ($self) = @_;
1023
1024   my @info = @{$self->_dbi_connect_info || []};
1025   $self->_dbh(undef); # in case ->connected failed we might get sent here
1026   $self->_dbh_details({}); # reset everything we know
1027
1028   $self->_dbh($self->_connect(@info));
1029
1030   $self->_conn_pid($$);
1031   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1032
1033   $self->_determine_driver;
1034
1035   # Always set the transaction depth on connect, since
1036   #  there is no transaction in progress by definition
1037   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1038
1039   $self->_run_connection_actions unless $self->{_in_determine_driver};
1040 }
1041
1042 sub _run_connection_actions {
1043   my $self = shift;
1044   my @actions;
1045
1046   push @actions, ( $self->on_connect_call || () );
1047   push @actions, $self->_parse_connect_do ('on_connect_do');
1048
1049   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1050 }
1051
1052
1053
1054 sub set_use_dbms_capability {
1055   $_[0]->set_inherited ($_[1], $_[2]);
1056 }
1057
1058 sub get_use_dbms_capability {
1059   my ($self, $capname) = @_;
1060
1061   my $use = $self->get_inherited ($capname);
1062   return defined $use
1063     ? $use
1064     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1065   ;
1066 }
1067
1068 sub set_dbms_capability {
1069   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1070 }
1071
1072 sub get_dbms_capability {
1073   my ($self, $capname) = @_;
1074
1075   my $cap = $self->_dbh_details->{capability}{$capname};
1076
1077   unless (defined $cap) {
1078     if (my $meth = $self->can ("_determine$capname")) {
1079       $cap = $self->$meth ? 1 : 0;
1080     }
1081     else {
1082       $cap = 0;
1083     }
1084
1085     $self->set_dbms_capability ($capname, $cap);
1086   }
1087
1088   return $cap;
1089 }
1090
1091 sub _server_info {
1092   my $self = shift;
1093
1094   my $info;
1095   unless ($info = $self->_dbh_details->{info}) {
1096
1097     $info = {};
1098
1099     my $server_version = try { $self->_get_server_version };
1100
1101     if (defined $server_version) {
1102       $info->{dbms_version} = $server_version;
1103
1104       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1105       my @verparts = split (/\./, $numeric_version);
1106       if (
1107         @verparts
1108           &&
1109         $verparts[0] <= 999
1110       ) {
1111         # consider only up to 3 version parts, iff not more than 3 digits
1112         my @use_parts;
1113         while (@verparts && @use_parts < 3) {
1114           my $p = shift @verparts;
1115           last if $p > 999;
1116           push @use_parts, $p;
1117         }
1118         push @use_parts, 0 while @use_parts < 3;
1119
1120         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1121       }
1122     }
1123
1124     $self->_dbh_details->{info} = $info;
1125   }
1126
1127   return $info;
1128 }
1129
1130 sub _get_server_version {
1131   shift->_get_dbh->get_info(18);
1132 }
1133
1134 sub _determine_driver {
1135   my ($self) = @_;
1136
1137   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1138     my $started_connected = 0;
1139     local $self->{_in_determine_driver} = 1;
1140
1141     if (ref($self) eq __PACKAGE__) {
1142       my $driver;
1143       if ($self->_dbh) { # we are connected
1144         $driver = $self->_dbh->{Driver}{Name};
1145         $started_connected = 1;
1146       } else {
1147         # if connect_info is a CODEREF, we have no choice but to connect
1148         if (ref $self->_dbi_connect_info->[0] &&
1149             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1150           $self->_populate_dbh;
1151           $driver = $self->_dbh->{Driver}{Name};
1152         }
1153         else {
1154           # try to use dsn to not require being connected, the driver may still
1155           # force a connection in _rebless to determine version
1156           # (dsn may not be supplied at all if all we do is make a mock-schema)
1157           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1158           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1159           $driver ||= $ENV{DBI_DRIVER};
1160         }
1161       }
1162
1163       if ($driver) {
1164         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1165         if ($self->load_optional_class($storage_class)) {
1166           mro::set_mro($storage_class, 'c3');
1167           bless $self, $storage_class;
1168           $self->_rebless();
1169         }
1170       }
1171     }
1172
1173     $self->_driver_determined(1);
1174
1175     $self->_init; # run driver-specific initializations
1176
1177     $self->_run_connection_actions
1178         if !$started_connected && defined $self->_dbh;
1179   }
1180 }
1181
1182 sub _do_connection_actions {
1183   my $self          = shift;
1184   my $method_prefix = shift;
1185   my $call          = shift;
1186
1187   if (not ref($call)) {
1188     my $method = $method_prefix . $call;
1189     $self->$method(@_);
1190   } elsif (ref($call) eq 'CODE') {
1191     $self->$call(@_);
1192   } elsif (ref($call) eq 'ARRAY') {
1193     if (ref($call->[0]) ne 'ARRAY') {
1194       $self->_do_connection_actions($method_prefix, $_) for @$call;
1195     } else {
1196       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1197     }
1198   } else {
1199     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1200   }
1201
1202   return $self;
1203 }
1204
1205 sub connect_call_do_sql {
1206   my $self = shift;
1207   $self->_do_query(@_);
1208 }
1209
1210 sub disconnect_call_do_sql {
1211   my $self = shift;
1212   $self->_do_query(@_);
1213 }
1214
1215 # override in db-specific backend when necessary
1216 sub connect_call_datetime_setup { 1 }
1217
1218 sub _do_query {
1219   my ($self, $action) = @_;
1220
1221   if (ref $action eq 'CODE') {
1222     $action = $action->($self);
1223     $self->_do_query($_) foreach @$action;
1224   }
1225   else {
1226     # Most debuggers expect ($sql, @bind), so we need to exclude
1227     # the attribute hash which is the second argument to $dbh->do
1228     # furthermore the bind values are usually to be presented
1229     # as named arrayref pairs, so wrap those here too
1230     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1231     my $sql = shift @do_args;
1232     my $attrs = shift @do_args;
1233     my @bind = map { [ undef, $_ ] } @do_args;
1234
1235     $self->_query_start($sql, @bind);
1236     $self->_get_dbh->do($sql, $attrs, @do_args);
1237     $self->_query_end($sql, @bind);
1238   }
1239
1240   return $self;
1241 }
1242
1243 sub _connect {
1244   my ($self, @info) = @_;
1245
1246   $self->throw_exception("You failed to provide any connection info")
1247     if !@info;
1248
1249   my ($old_connect_via, $dbh);
1250
1251   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1252     $old_connect_via = $DBI::connect_via;
1253     $DBI::connect_via = 'connect';
1254   }
1255
1256   try {
1257     if(ref $info[0] eq 'CODE') {
1258        $dbh = $info[0]->();
1259     }
1260     else {
1261        $dbh = DBI->connect(@info);
1262     }
1263
1264     if (!$dbh) {
1265       die $DBI::errstr;
1266     }
1267
1268     unless ($self->unsafe) {
1269
1270       # this odd anonymous coderef dereference is in fact really
1271       # necessary to avoid the unwanted effect described in perl5
1272       # RT#75792
1273       sub {
1274         my $weak_self = $_[0];
1275         weaken $weak_self;
1276
1277         $_[1]->{HandleError} = sub {
1278           if ($weak_self) {
1279             $weak_self->throw_exception("DBI Exception: $_[0]");
1280           }
1281           else {
1282             # the handler may be invoked by something totally out of
1283             # the scope of DBIC
1284             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1285           }
1286         };
1287       }->($self, $dbh);
1288
1289       $dbh->{ShowErrorStatement} = 1;
1290       $dbh->{RaiseError} = 1;
1291       $dbh->{PrintError} = 0;
1292     }
1293   }
1294   catch {
1295     $self->throw_exception("DBI Connection failed: $_")
1296   }
1297   finally {
1298     $DBI::connect_via = $old_connect_via if $old_connect_via;
1299   };
1300
1301   $self->_dbh_autocommit($dbh->{AutoCommit});
1302   $dbh;
1303 }
1304
1305 sub svp_begin {
1306   my ($self, $name) = @_;
1307
1308   $name = $self->_svp_generate_name
1309     unless defined $name;
1310
1311   $self->throw_exception ("You can't use savepoints outside a transaction")
1312     if $self->{transaction_depth} == 0;
1313
1314   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1315     unless $self->can('_svp_begin');
1316
1317   push @{ $self->{savepoints} }, $name;
1318
1319   $self->debugobj->svp_begin($name) if $self->debug;
1320
1321   return $self->_svp_begin($name);
1322 }
1323
1324 sub svp_release {
1325   my ($self, $name) = @_;
1326
1327   $self->throw_exception ("You can't use savepoints outside a transaction")
1328     if $self->{transaction_depth} == 0;
1329
1330   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1331     unless $self->can('_svp_release');
1332
1333   if (defined $name) {
1334     $self->throw_exception ("Savepoint '$name' does not exist")
1335       unless grep { $_ eq $name } @{ $self->{savepoints} };
1336
1337     # Dig through the stack until we find the one we are releasing.  This keeps
1338     # the stack up to date.
1339     my $svp;
1340
1341     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1342   } else {
1343     $name = pop @{ $self->{savepoints} };
1344   }
1345
1346   $self->debugobj->svp_release($name) if $self->debug;
1347
1348   return $self->_svp_release($name);
1349 }
1350
1351 sub svp_rollback {
1352   my ($self, $name) = @_;
1353
1354   $self->throw_exception ("You can't use savepoints outside a transaction")
1355     if $self->{transaction_depth} == 0;
1356
1357   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1358     unless $self->can('_svp_rollback');
1359
1360   if (defined $name) {
1361       # If they passed us a name, verify that it exists in the stack
1362       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1363           $self->throw_exception("Savepoint '$name' does not exist!");
1364       }
1365
1366       # Dig through the stack until we find the one we are releasing.  This keeps
1367       # the stack up to date.
1368       while(my $s = pop(@{ $self->{savepoints} })) {
1369           last if($s eq $name);
1370       }
1371       # Add the savepoint back to the stack, as a rollback doesn't remove the
1372       # named savepoint, only everything after it.
1373       push(@{ $self->{savepoints} }, $name);
1374   } else {
1375       # We'll assume they want to rollback to the last savepoint
1376       $name = $self->{savepoints}->[-1];
1377   }
1378
1379   $self->debugobj->svp_rollback($name) if $self->debug;
1380
1381   return $self->_svp_rollback($name);
1382 }
1383
1384 sub _svp_generate_name {
1385     my ($self) = @_;
1386
1387     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1388 }
1389
1390 sub txn_begin {
1391   my $self = shift;
1392
1393   # this means we have not yet connected and do not know the AC status
1394   # (e.g. coderef $dbh)
1395   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1396
1397   if($self->{transaction_depth} == 0) {
1398     $self->debugobj->txn_begin()
1399       if $self->debug;
1400     $self->_dbh_begin_work;
1401   }
1402   elsif ($self->auto_savepoint) {
1403     $self->svp_begin;
1404   }
1405   $self->{transaction_depth}++;
1406 }
1407
1408 sub _dbh_begin_work {
1409   my $self = shift;
1410
1411   # if the user is utilizing txn_do - good for him, otherwise we need to
1412   # ensure that the $dbh is healthy on BEGIN.
1413   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1414   # will be replaced by a failure of begin_work itself (which will be
1415   # then retried on reconnect)
1416   if ($self->{_in_dbh_do}) {
1417     $self->_dbh->begin_work;
1418   } else {
1419     $self->dbh_do(sub { $_[1]->begin_work });
1420   }
1421 }
1422
1423 sub txn_commit {
1424   my $self = shift;
1425   if ($self->{transaction_depth} == 1) {
1426     $self->debugobj->txn_commit()
1427       if ($self->debug);
1428     $self->_dbh_commit;
1429     $self->{transaction_depth} = 0
1430       if $self->_dbh_autocommit;
1431   }
1432   elsif($self->{transaction_depth} > 1) {
1433     $self->{transaction_depth}--;
1434     $self->svp_release
1435       if $self->auto_savepoint;
1436   }
1437 }
1438
1439 sub _dbh_commit {
1440   my $self = shift;
1441   my $dbh  = $self->_dbh
1442     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1443   $dbh->commit;
1444 }
1445
1446 sub txn_rollback {
1447   my $self = shift;
1448   my $dbh = $self->_dbh;
1449   try {
1450     if ($self->{transaction_depth} == 1) {
1451       $self->debugobj->txn_rollback()
1452         if ($self->debug);
1453       $self->{transaction_depth} = 0
1454         if $self->_dbh_autocommit;
1455       $self->_dbh_rollback;
1456     }
1457     elsif($self->{transaction_depth} > 1) {
1458       $self->{transaction_depth}--;
1459       if ($self->auto_savepoint) {
1460         $self->svp_rollback;
1461         $self->svp_release;
1462       }
1463     }
1464     else {
1465       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1466     }
1467   }
1468   catch {
1469     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1470
1471     if ($_ !~ /$exception_class/) {
1472       # ensure that a failed rollback resets the transaction depth
1473       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1474     }
1475
1476     $self->throw_exception($_)
1477   };
1478 }
1479
1480 sub _dbh_rollback {
1481   my $self = shift;
1482   my $dbh  = $self->_dbh
1483     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1484   $dbh->rollback;
1485 }
1486
1487 # This used to be the top-half of _execute.  It was split out to make it
1488 #  easier to override in NoBindVars without duping the rest.  It takes up
1489 #  all of _execute's args, and emits $sql, @bind.
1490 sub _prep_for_execute {
1491   my ($self, $op, $extra_bind, $ident, $args) = @_;
1492
1493   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1494     $ident = $ident->from();
1495   }
1496
1497   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1498
1499   unshift(@bind,
1500     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1501       if $extra_bind;
1502   return ($sql, \@bind);
1503 }
1504
1505
1506 sub _fix_bind_params {
1507     my ($self, @bind) = @_;
1508
1509     ### Turn @bind from something like this:
1510     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1511     ### to this:
1512     ###   ( "'1'", "'1'", "'3'" )
1513     return
1514         map {
1515             if ( defined( $_ && $_->[1] ) ) {
1516                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1517             }
1518             else { q{'NULL'}; }
1519         } @bind;
1520 }
1521
1522 sub _query_start {
1523     my ( $self, $sql, @bind ) = @_;
1524
1525     if ( $self->debug ) {
1526         @bind = $self->_fix_bind_params(@bind);
1527
1528         $self->debugobj->query_start( $sql, @bind );
1529     }
1530 }
1531
1532 sub _query_end {
1533     my ( $self, $sql, @bind ) = @_;
1534
1535     if ( $self->debug ) {
1536         @bind = $self->_fix_bind_params(@bind);
1537         $self->debugobj->query_end( $sql, @bind );
1538     }
1539 }
1540
1541 sub _dbh_execute {
1542   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1543
1544   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1545
1546   $self->_query_start( $sql, @$bind );
1547
1548   my $sth = $self->sth($sql,$op);
1549
1550   my $placeholder_index = 1;
1551
1552   foreach my $bound (@$bind) {
1553     my $attributes = {};
1554     my($column_name, @data) = @$bound;
1555
1556     if ($bind_attributes) {
1557       $attributes = $bind_attributes->{$column_name}
1558       if defined $bind_attributes->{$column_name};
1559     }
1560
1561     foreach my $data (@data) {
1562       my $ref = ref $data;
1563       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1564
1565       $sth->bind_param($placeholder_index, $data, $attributes);
1566       $placeholder_index++;
1567     }
1568   }
1569
1570   # Can this fail without throwing an exception anyways???
1571   my $rv = $sth->execute();
1572   $self->throw_exception(
1573     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1574   ) if !$rv;
1575
1576   $self->_query_end( $sql, @$bind );
1577
1578   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1579 }
1580
1581 sub _execute {
1582     my $self = shift;
1583     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1584 }
1585
1586 sub _prefetch_insert_auto_nextvals {
1587   my ($self, $source, $to_insert) = @_;
1588
1589   my $upd = {};
1590
1591   foreach my $col ( $source->columns ) {
1592     if ( !defined $to_insert->{$col} ) {
1593       my $col_info = $source->column_info($col);
1594
1595       if ( $col_info->{auto_nextval} ) {
1596         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1597           'nextval',
1598           $col_info->{sequence} ||=
1599             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1600         );
1601       }
1602     }
1603   }
1604
1605   return $upd;
1606 }
1607
1608 sub insert {
1609   my $self = shift;
1610   my ($source, $to_insert, $opts) = @_;
1611
1612   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1613
1614   my $bind_attributes = $self->source_bind_attributes($source);
1615
1616   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1617
1618   if ($opts->{returning}) {
1619     my @ret_cols = @{$opts->{returning}};
1620
1621     my @ret_vals = try {
1622       local $SIG{__WARN__} = sub {};
1623       my @r = $sth->fetchrow_array;
1624       $sth->finish;
1625       @r;
1626     };
1627
1628     my %ret;
1629     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1630
1631     $updated_cols = {
1632       %$updated_cols,
1633       %ret,
1634     };
1635   }
1636
1637   return $updated_cols;
1638 }
1639
1640 ## Currently it is assumed that all values passed will be "normal", i.e. not
1641 ## scalar refs, or at least, all the same type as the first set, the statement is
1642 ## only prepped once.
1643 sub insert_bulk {
1644   my ($self, $source, $cols, $data) = @_;
1645
1646   my %colvalues;
1647   @colvalues{@$cols} = (0..$#$cols);
1648
1649   for my $i (0..$#$cols) {
1650     my $first_val = $data->[0][$i];
1651     next unless ref $first_val eq 'SCALAR';
1652
1653     $colvalues{ $cols->[$i] } = $first_val;
1654   }
1655
1656   # check for bad data and stringify stringifiable objects
1657   my $bad_slice = sub {
1658     my ($msg, $col_idx, $slice_idx) = @_;
1659     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1660       $msg,
1661       $cols->[$col_idx],
1662       do {
1663         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1664         Dumper {
1665           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1666         },
1667       }
1668     );
1669   };
1670
1671   for my $datum_idx (0..$#$data) {
1672     my $datum = $data->[$datum_idx];
1673
1674     for my $col_idx (0..$#$cols) {
1675       my $val            = $datum->[$col_idx];
1676       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1677       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1678
1679       if ($is_literal_sql) {
1680         if (not ref $val) {
1681           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1682         }
1683         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1684           $bad_slice->("$reftype reference found where literal SQL expected",
1685             $col_idx, $datum_idx);
1686         }
1687         elsif ($$val ne $$sqla_bind){
1688           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1689             $col_idx, $datum_idx);
1690         }
1691       }
1692       elsif (my $reftype = ref $val) {
1693         require overload;
1694         if (overload::Method($val, '""')) {
1695           $datum->[$col_idx] = "".$val;
1696         }
1697         else {
1698           $bad_slice->("$reftype reference found where bind expected",
1699             $col_idx, $datum_idx);
1700         }
1701       }
1702     }
1703   }
1704
1705   my ($sql, $bind) = $self->_prep_for_execute (
1706     'insert', undef, $source, [\%colvalues]
1707   );
1708   my @bind = @$bind;
1709
1710   my $empty_bind = 1 if (not @bind) &&
1711     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1712
1713   if ((not @bind) && (not $empty_bind)) {
1714     $self->throw_exception(
1715       'Cannot insert_bulk without support for placeholders'
1716     );
1717   }
1718
1719   # neither _execute_array, nor _execute_inserts_with_no_binds are
1720   # atomic (even if _execute _array is a single call). Thus a safety
1721   # scope guard
1722   my $guard = $self->txn_scope_guard;
1723
1724   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1725   my $sth = $self->sth($sql);
1726   my $rv = do {
1727     if ($empty_bind) {
1728       # bind_param_array doesn't work if there are no binds
1729       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1730     }
1731     else {
1732 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1733       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1734     }
1735   };
1736
1737   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1738
1739   $guard->commit;
1740
1741   return (wantarray ? ($rv, $sth, @bind) : $rv);
1742 }
1743
1744 sub _execute_array {
1745   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1746
1747   ## This must be an arrayref, else nothing works!
1748   my $tuple_status = [];
1749
1750   ## Get the bind_attributes, if any exist
1751   my $bind_attributes = $self->source_bind_attributes($source);
1752
1753   ## Bind the values and execute
1754   my $placeholder_index = 1;
1755
1756   foreach my $bound (@$bind) {
1757
1758     my $attributes = {};
1759     my ($column_name, $data_index) = @$bound;
1760
1761     if( $bind_attributes ) {
1762       $attributes = $bind_attributes->{$column_name}
1763       if defined $bind_attributes->{$column_name};
1764     }
1765
1766     my @data = map { $_->[$data_index] } @$data;
1767
1768     $sth->bind_param_array(
1769       $placeholder_index,
1770       [@data],
1771       (%$attributes ?  $attributes : ()),
1772     );
1773     $placeholder_index++;
1774   }
1775
1776   my ($rv, $err);
1777   try {
1778     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1779   }
1780   catch {
1781     $err = shift;
1782   }
1783   finally {
1784     # Statement must finish even if there was an exception.
1785     try {
1786       $sth->finish
1787     }
1788     catch {
1789       $err = shift unless defined $err
1790     };
1791   };
1792
1793   $err = $sth->errstr
1794     if (! defined $err and $sth->err);
1795
1796   if (defined $err) {
1797     my $i = 0;
1798     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1799
1800     $self->throw_exception("Unexpected populate error: $err")
1801       if ($i > $#$tuple_status);
1802
1803     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1804       ($tuple_status->[$i][1] || $err),
1805       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1806     );
1807   }
1808
1809   return $rv;
1810 }
1811
1812 sub _dbh_execute_array {
1813     my ($self, $sth, $tuple_status, @extra) = @_;
1814
1815     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1816 }
1817
1818 sub _dbh_execute_inserts_with_no_binds {
1819   my ($self, $sth, $count) = @_;
1820
1821   my $err;
1822   try {
1823     my $dbh = $self->_get_dbh;
1824     local $dbh->{RaiseError} = 1;
1825     local $dbh->{PrintError} = 0;
1826
1827     $sth->execute foreach 1..$count;
1828   }
1829   catch {
1830     $err = shift;
1831   }
1832   finally {
1833     # Make sure statement is finished even if there was an exception.
1834     try {
1835       $sth->finish
1836     }
1837     catch {
1838       $err = shift unless defined $err;
1839     };
1840   };
1841
1842   $self->throw_exception($err) if defined $err;
1843
1844   return $count;
1845 }
1846
1847 sub update {
1848   my ($self, $source, @args) = @_;
1849
1850   my $bind_attrs = $self->source_bind_attributes($source);
1851
1852   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1853 }
1854
1855
1856 sub delete {
1857   my ($self, $source, @args) = @_;
1858
1859   my $bind_attrs = $self->source_bind_attributes($source);
1860
1861   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1862 }
1863
1864 # We were sent here because the $rs contains a complex search
1865 # which will require a subquery to select the correct rows
1866 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1867 #
1868 # Generating a single PK column subquery is trivial and supported
1869 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1870 # Look at _multipk_update_delete()
1871 sub _subq_update_delete {
1872   my $self = shift;
1873   my ($rs, $op, $values) = @_;
1874
1875   my $rsrc = $rs->result_source;
1876
1877   # quick check if we got a sane rs on our hands
1878   my @pcols = $rsrc->_pri_cols;
1879
1880   my $sel = $rs->_resolved_attrs->{select};
1881   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1882
1883   if (
1884       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1885         ne
1886       join ("\x00", sort @$sel )
1887   ) {
1888     $self->throw_exception (
1889       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1890     );
1891   }
1892
1893   if (@pcols == 1) {
1894     return $self->$op (
1895       $rsrc,
1896       $op eq 'update' ? $values : (),
1897       { $pcols[0] => { -in => $rs->as_query } },
1898     );
1899   }
1900
1901   else {
1902     return $self->_multipk_update_delete (@_);
1903   }
1904 }
1905
1906 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1907 # resultset update/delete involving subqueries. So by default resort
1908 # to simple (and inefficient) delete_all style per-row opearations,
1909 # while allowing specific storages to override this with a faster
1910 # implementation.
1911 #
1912 sub _multipk_update_delete {
1913   return shift->_per_row_update_delete (@_);
1914 }
1915
1916 # This is the default loop used to delete/update rows for multi PK
1917 # resultsets, and used by mysql exclusively (because it can't do anything
1918 # else).
1919 #
1920 # We do not use $row->$op style queries, because resultset update/delete
1921 # is not expected to cascade (this is what delete_all/update_all is for).
1922 #
1923 # There should be no race conditions as the entire operation is rolled
1924 # in a transaction.
1925 #
1926 sub _per_row_update_delete {
1927   my $self = shift;
1928   my ($rs, $op, $values) = @_;
1929
1930   my $rsrc = $rs->result_source;
1931   my @pcols = $rsrc->_pri_cols;
1932
1933   my $guard = $self->txn_scope_guard;
1934
1935   # emulate the return value of $sth->execute for non-selects
1936   my $row_cnt = '0E0';
1937
1938   my $subrs_cur = $rs->cursor;
1939   my @all_pk = $subrs_cur->all;
1940   for my $pks ( @all_pk) {
1941
1942     my $cond;
1943     for my $i (0.. $#pcols) {
1944       $cond->{$pcols[$i]} = $pks->[$i];
1945     }
1946
1947     $self->$op (
1948       $rsrc,
1949       $op eq 'update' ? $values : (),
1950       $cond,
1951     );
1952
1953     $row_cnt++;
1954   }
1955
1956   $guard->commit;
1957
1958   return $row_cnt;
1959 }
1960
1961 sub _select {
1962   my $self = shift;
1963   $self->_execute($self->_select_args(@_));
1964 }
1965
1966 sub _select_args_to_query {
1967   my $self = shift;
1968
1969   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1970   #  = $self->_select_args($ident, $select, $cond, $attrs);
1971   my ($op, $bind, $ident, $bind_attrs, @args) =
1972     $self->_select_args(@_);
1973
1974   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1975   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1976   $prepared_bind ||= [];
1977
1978   return wantarray
1979     ? ($sql, $prepared_bind, $bind_attrs)
1980     : \[ "($sql)", @$prepared_bind ]
1981   ;
1982 }
1983
1984 sub _select_args {
1985   my ($self, $ident, $select, $where, $attrs) = @_;
1986
1987   my $sql_maker = $self->sql_maker;
1988   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1989
1990   $attrs = {
1991     %$attrs,
1992     select => $select,
1993     from => $ident,
1994     where => $where,
1995     $rs_alias && $alias2source->{$rs_alias}
1996       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1997       : ()
1998     ,
1999   };
2000
2001   # calculate bind_attrs before possible $ident mangling
2002   my $bind_attrs = {};
2003   for my $alias (keys %$alias2source) {
2004     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2005     for my $col (keys %$bindtypes) {
2006
2007       my $fqcn = join ('.', $alias, $col);
2008       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2009
2010       # Unqialified column names are nice, but at the same time can be
2011       # rather ambiguous. What we do here is basically go along with
2012       # the loop, adding an unqualified column slot to $bind_attrs,
2013       # alongside the fully qualified name. As soon as we encounter
2014       # another column by that name (which would imply another table)
2015       # we unset the unqualified slot and never add any info to it
2016       # to avoid erroneous type binding. If this happens the users
2017       # only choice will be to fully qualify his column name
2018
2019       if (exists $bind_attrs->{$col}) {
2020         $bind_attrs->{$col} = {};
2021       }
2022       else {
2023         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2024       }
2025     }
2026   }
2027
2028   # Sanity check the attributes (SQLMaker does it too, but
2029   # in case of a software_limit we'll never reach there)
2030   if (defined $attrs->{offset}) {
2031     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2032       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2033   }
2034   $attrs->{offset} ||= 0;
2035
2036   if (defined $attrs->{rows}) {
2037     $self->throw_exception("The rows attribute must be a positive integer if present")
2038       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2039   }
2040   elsif ($attrs->{offset}) {
2041     # MySQL actually recommends this approach.  I cringe.
2042     $attrs->{rows} = $sql_maker->__max_int;
2043   }
2044
2045   my @limit;
2046
2047   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2048   # storage, unless software limit was requested
2049   if (
2050     #limited has_many
2051     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2052        ||
2053     # grouped prefetch (to satisfy group_by == select)
2054     ( $attrs->{group_by}
2055         &&
2056       @{$attrs->{group_by}}
2057         &&
2058       $attrs->{_prefetch_select}
2059         &&
2060       @{$attrs->{_prefetch_select}}
2061     )
2062   ) {
2063     ($ident, $select, $where, $attrs)
2064       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2065   }
2066   elsif (! $attrs->{software_limit} ) {
2067     push @limit, $attrs->{rows}, $attrs->{offset};
2068   }
2069
2070   # try to simplify the joinmap further (prune unreferenced type-single joins)
2071   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2072
2073 ###
2074   # This would be the point to deflate anything found in $where
2075   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2076   # expect a row object. And all we have is a resultsource (it is trivial
2077   # to extract deflator coderefs via $alias2source above).
2078   #
2079   # I don't see a way forward other than changing the way deflators are
2080   # invoked, and that's just bad...
2081 ###
2082
2083   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2084 }
2085
2086 # Returns a counting SELECT for a simple count
2087 # query. Abstracted so that a storage could override
2088 # this to { count => 'firstcol' } or whatever makes
2089 # sense as a performance optimization
2090 sub _count_select {
2091   #my ($self, $source, $rs_attrs) = @_;
2092   return { count => '*' };
2093 }
2094
2095
2096 sub source_bind_attributes {
2097   my ($self, $source) = @_;
2098
2099   my $bind_attributes;
2100   foreach my $column ($source->columns) {
2101
2102     my $data_type = $source->column_info($column)->{data_type} || '';
2103     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2104      if $data_type;
2105   }
2106
2107   return $bind_attributes;
2108 }
2109
2110 =head2 select
2111
2112 =over 4
2113
2114 =item Arguments: $ident, $select, $condition, $attrs
2115
2116 =back
2117
2118 Handle a SQL select statement.
2119
2120 =cut
2121
2122 sub select {
2123   my $self = shift;
2124   my ($ident, $select, $condition, $attrs) = @_;
2125   return $self->cursor_class->new($self, \@_, $attrs);
2126 }
2127
2128 sub select_single {
2129   my $self = shift;
2130   my ($rv, $sth, @bind) = $self->_select(@_);
2131   my @row = $sth->fetchrow_array;
2132   my @nextrow = $sth->fetchrow_array if @row;
2133   if(@row && @nextrow) {
2134     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2135   }
2136   # Need to call finish() to work round broken DBDs
2137   $sth->finish();
2138   return @row;
2139 }
2140
2141 =head2 sql_limit_dialect
2142
2143 This is an accessor for the default SQL limit dialect used by a particular
2144 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2145 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2146 see L<DBIx::Class::SQLMaker::LimitDialects>.
2147
2148 =head2 sth
2149
2150 =over 4
2151
2152 =item Arguments: $sql
2153
2154 =back
2155
2156 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2157
2158 =cut
2159
2160 sub _dbh_sth {
2161   my ($self, $dbh, $sql) = @_;
2162
2163   # 3 is the if_active parameter which avoids active sth re-use
2164   my $sth = $self->disable_sth_caching
2165     ? $dbh->prepare($sql)
2166     : $dbh->prepare_cached($sql, {}, 3);
2167
2168   # XXX You would think RaiseError would make this impossible,
2169   #  but apparently that's not true :(
2170   $self->throw_exception($dbh->errstr) if !$sth;
2171
2172   $sth;
2173 }
2174
2175 sub sth {
2176   my ($self, $sql) = @_;
2177   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2178 }
2179
2180 sub _dbh_columns_info_for {
2181   my ($self, $dbh, $table) = @_;
2182
2183   if ($dbh->can('column_info')) {
2184     my %result;
2185     my $caught;
2186     try {
2187       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2188       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2189       $sth->execute();
2190       while ( my $info = $sth->fetchrow_hashref() ){
2191         my %column_info;
2192         $column_info{data_type}   = $info->{TYPE_NAME};
2193         $column_info{size}      = $info->{COLUMN_SIZE};
2194         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2195         $column_info{default_value} = $info->{COLUMN_DEF};
2196         my $col_name = $info->{COLUMN_NAME};
2197         $col_name =~ s/^\"(.*)\"$/$1/;
2198
2199         $result{$col_name} = \%column_info;
2200       }
2201     } catch {
2202       $caught = 1;
2203     };
2204     return \%result if !$caught && scalar keys %result;
2205   }
2206
2207   my %result;
2208   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2209   $sth->execute;
2210   my @columns = @{$sth->{NAME_lc}};
2211   for my $i ( 0 .. $#columns ){
2212     my %column_info;
2213     $column_info{data_type} = $sth->{TYPE}->[$i];
2214     $column_info{size} = $sth->{PRECISION}->[$i];
2215     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2216
2217     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2218       $column_info{data_type} = $1;
2219       $column_info{size}    = $2;
2220     }
2221
2222     $result{$columns[$i]} = \%column_info;
2223   }
2224   $sth->finish;
2225
2226   foreach my $col (keys %result) {
2227     my $colinfo = $result{$col};
2228     my $type_num = $colinfo->{data_type};
2229     my $type_name;
2230     if(defined $type_num && $dbh->can('type_info')) {
2231       my $type_info = $dbh->type_info($type_num);
2232       $type_name = $type_info->{TYPE_NAME} if $type_info;
2233       $colinfo->{data_type} = $type_name if $type_name;
2234     }
2235   }
2236
2237   return \%result;
2238 }
2239
2240 sub columns_info_for {
2241   my ($self, $table) = @_;
2242   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2243 }
2244
2245 =head2 last_insert_id
2246
2247 Return the row id of the last insert.
2248
2249 =cut
2250
2251 sub _dbh_last_insert_id {
2252     my ($self, $dbh, $source, $col) = @_;
2253
2254     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2255
2256     return $id if defined $id;
2257
2258     my $class = ref $self;
2259     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2260 }
2261
2262 sub last_insert_id {
2263   my $self = shift;
2264   $self->_dbh_last_insert_id ($self->_dbh, @_);
2265 }
2266
2267 =head2 _native_data_type
2268
2269 =over 4
2270
2271 =item Arguments: $type_name
2272
2273 =back
2274
2275 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2276 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2277 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2278
2279 The default implementation returns C<undef>, implement in your Storage driver if
2280 you need this functionality.
2281
2282 Should map types from other databases to the native RDBMS type, for example
2283 C<VARCHAR2> to C<VARCHAR>.
2284
2285 Types with modifiers should map to the underlying data type. For example,
2286 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2287
2288 Composite types should map to the container type, for example
2289 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2290
2291 =cut
2292
2293 sub _native_data_type {
2294   #my ($self, $data_type) = @_;
2295   return undef
2296 }
2297
2298 # Check if placeholders are supported at all
2299 sub _determine_supports_placeholders {
2300   my $self = shift;
2301   my $dbh  = $self->_get_dbh;
2302
2303   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2304   # but it is inaccurate more often than not
2305   return try {
2306     local $dbh->{PrintError} = 0;
2307     local $dbh->{RaiseError} = 1;
2308     $dbh->do('select ?', {}, 1);
2309     1;
2310   }
2311   catch {
2312     0;
2313   };
2314 }
2315
2316 # Check if placeholders bound to non-string types throw exceptions
2317 #
2318 sub _determine_supports_typeless_placeholders {
2319   my $self = shift;
2320   my $dbh  = $self->_get_dbh;
2321
2322   return try {
2323     local $dbh->{PrintError} = 0;
2324     local $dbh->{RaiseError} = 1;
2325     # this specifically tests a bind that is NOT a string
2326     $dbh->do('select 1 where 1 = ?', {}, 1);
2327     1;
2328   }
2329   catch {
2330     0;
2331   };
2332 }
2333
2334 =head2 sqlt_type
2335
2336 Returns the database driver name.
2337
2338 =cut
2339
2340 sub sqlt_type {
2341   shift->_get_dbh->{Driver}->{Name};
2342 }
2343
2344 =head2 bind_attribute_by_data_type
2345
2346 Given a datatype from column info, returns a database specific bind
2347 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2348 let the database planner just handle it.
2349
2350 Generally only needed for special case column types, like bytea in postgres.
2351
2352 =cut
2353
2354 sub bind_attribute_by_data_type {
2355     return;
2356 }
2357
2358 =head2 is_datatype_numeric
2359
2360 Given a datatype from column_info, returns a boolean value indicating if
2361 the current RDBMS considers it a numeric value. This controls how
2362 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2363 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2364 be performed instead of the usual C<eq>.
2365
2366 =cut
2367
2368 sub is_datatype_numeric {
2369   my ($self, $dt) = @_;
2370
2371   return 0 unless $dt;
2372
2373   return $dt =~ /^ (?:
2374     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2375   ) $/ix;
2376 }
2377
2378
2379 =head2 create_ddl_dir
2380
2381 =over 4
2382
2383 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2384
2385 =back
2386
2387 Creates a SQL file based on the Schema, for each of the specified
2388 database engines in C<\@databases> in the given directory.
2389 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2390
2391 Given a previous version number, this will also create a file containing
2392 the ALTER TABLE statements to transform the previous schema into the
2393 current one. Note that these statements may contain C<DROP TABLE> or
2394 C<DROP COLUMN> statements that can potentially destroy data.
2395
2396 The file names are created using the C<ddl_filename> method below, please
2397 override this method in your schema if you would like a different file
2398 name format. For the ALTER file, the same format is used, replacing
2399 $version in the name with "$preversion-$version".
2400
2401 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2402 The most common value for this would be C<< { add_drop_table => 1 } >>
2403 to have the SQL produced include a C<DROP TABLE> statement for each table
2404 created. For quoting purposes supply C<quote_table_names> and
2405 C<quote_field_names>.
2406
2407 If no arguments are passed, then the following default values are assumed:
2408
2409 =over 4
2410
2411 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2412
2413 =item version    - $schema->schema_version
2414
2415 =item directory  - './'
2416
2417 =item preversion - <none>
2418
2419 =back
2420
2421 By default, C<\%sqlt_args> will have
2422
2423  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2424
2425 merged with the hash passed in. To disable any of those features, pass in a
2426 hashref like the following
2427
2428  { ignore_constraint_names => 0, # ... other options }
2429
2430
2431 WARNING: You are strongly advised to check all SQL files created, before applying
2432 them.
2433
2434 =cut
2435
2436 sub create_ddl_dir {
2437   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2438
2439   unless ($dir) {
2440     carp "No directory given, using ./\n";
2441     $dir = './';
2442   } else {
2443       -d $dir
2444         or
2445       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2446         or
2447       $self->throw_exception(
2448         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2449       );
2450   }
2451
2452   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2453
2454   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2455   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2456
2457   my $schema_version = $schema->schema_version || '1.x';
2458   $version ||= $schema_version;
2459
2460   $sqltargs = {
2461     add_drop_table => 1,
2462     ignore_constraint_names => 1,
2463     ignore_index_names => 1,
2464     %{$sqltargs || {}}
2465   };
2466
2467   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2468     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2469   }
2470
2471   my $sqlt = SQL::Translator->new( $sqltargs );
2472
2473   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2474   my $sqlt_schema = $sqlt->translate({ data => $schema })
2475     or $self->throw_exception ($sqlt->error);
2476
2477   foreach my $db (@$databases) {
2478     $sqlt->reset();
2479     $sqlt->{schema} = $sqlt_schema;
2480     $sqlt->producer($db);
2481
2482     my $file;
2483     my $filename = $schema->ddl_filename($db, $version, $dir);
2484     if (-e $filename && ($version eq $schema_version )) {
2485       # if we are dumping the current version, overwrite the DDL
2486       carp "Overwriting existing DDL file - $filename";
2487       unlink($filename);
2488     }
2489
2490     my $output = $sqlt->translate;
2491     if(!$output) {
2492       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2493       next;
2494     }
2495     if(!open($file, ">$filename")) {
2496       $self->throw_exception("Can't open $filename for writing ($!)");
2497       next;
2498     }
2499     print $file $output;
2500     close($file);
2501
2502     next unless ($preversion);
2503
2504     require SQL::Translator::Diff;
2505
2506     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2507     if(!-e $prefilename) {
2508       carp("No previous schema file found ($prefilename)");
2509       next;
2510     }
2511
2512     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2513     if(-e $difffile) {
2514       carp("Overwriting existing diff file - $difffile");
2515       unlink($difffile);
2516     }
2517
2518     my $source_schema;
2519     {
2520       my $t = SQL::Translator->new($sqltargs);
2521       $t->debug( 0 );
2522       $t->trace( 0 );
2523
2524       $t->parser( $db )
2525         or $self->throw_exception ($t->error);
2526
2527       my $out = $t->translate( $prefilename )
2528         or $self->throw_exception ($t->error);
2529
2530       $source_schema = $t->schema;
2531
2532       $source_schema->name( $prefilename )
2533         unless ( $source_schema->name );
2534     }
2535
2536     # The "new" style of producers have sane normalization and can support
2537     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2538     # And we have to diff parsed SQL against parsed SQL.
2539     my $dest_schema = $sqlt_schema;
2540
2541     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2542       my $t = SQL::Translator->new($sqltargs);
2543       $t->debug( 0 );
2544       $t->trace( 0 );
2545
2546       $t->parser( $db )
2547         or $self->throw_exception ($t->error);
2548
2549       my $out = $t->translate( $filename )
2550         or $self->throw_exception ($t->error);
2551
2552       $dest_schema = $t->schema;
2553
2554       $dest_schema->name( $filename )
2555         unless $dest_schema->name;
2556     }
2557
2558     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2559                                                   $dest_schema,   $db,
2560                                                   $sqltargs
2561                                                  );
2562     if(!open $file, ">$difffile") {
2563       $self->throw_exception("Can't write to $difffile ($!)");
2564       next;
2565     }
2566     print $file $diff;
2567     close($file);
2568   }
2569 }
2570
2571 =head2 deployment_statements
2572
2573 =over 4
2574
2575 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2576
2577 =back
2578
2579 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2580
2581 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2582 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2583
2584 C<$directory> is used to return statements from files in a previously created
2585 L</create_ddl_dir> directory and is optional. The filenames are constructed
2586 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2587
2588 If no C<$directory> is specified then the statements are constructed on the
2589 fly using L<SQL::Translator> and C<$version> is ignored.
2590
2591 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2592
2593 =cut
2594
2595 sub deployment_statements {
2596   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2597   $type ||= $self->sqlt_type;
2598   $version ||= $schema->schema_version || '1.x';
2599   $dir ||= './';
2600   my $filename = $schema->ddl_filename($type, $version, $dir);
2601   if(-f $filename)
2602   {
2603       my $file;
2604       open($file, "<$filename")
2605         or $self->throw_exception("Can't open $filename ($!)");
2606       my @rows = <$file>;
2607       close($file);
2608       return join('', @rows);
2609   }
2610
2611   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2612     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2613   }
2614
2615   # sources needs to be a parser arg, but for simplicty allow at top level
2616   # coming in
2617   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2618       if exists $sqltargs->{sources};
2619
2620   my $tr = SQL::Translator->new(
2621     producer => "SQL::Translator::Producer::${type}",
2622     %$sqltargs,
2623     parser => 'SQL::Translator::Parser::DBIx::Class',
2624     data => $schema,
2625   );
2626
2627   my @ret;
2628   my $wa = wantarray;
2629   if ($wa) {
2630     @ret = $tr->translate;
2631   }
2632   else {
2633     $ret[0] = $tr->translate;
2634   }
2635
2636   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2637     unless (@ret && defined $ret[0]);
2638
2639   return $wa ? @ret : $ret[0];
2640 }
2641
2642 sub deploy {
2643   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2644   my $deploy = sub {
2645     my $line = shift;
2646     return if($line =~ /^--/);
2647     return if(!$line);
2648     # next if($line =~ /^DROP/m);
2649     return if($line =~ /^BEGIN TRANSACTION/m);
2650     return if($line =~ /^COMMIT/m);
2651     return if $line =~ /^\s+$/; # skip whitespace only
2652     $self->_query_start($line);
2653     try {
2654       # do a dbh_do cycle here, as we need some error checking in
2655       # place (even though we will ignore errors)
2656       $self->dbh_do (sub { $_[1]->do($line) });
2657     } catch {
2658       carp qq{$_ (running "${line}")};
2659     };
2660     $self->_query_end($line);
2661   };
2662   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2663   if (@statements > 1) {
2664     foreach my $statement (@statements) {
2665       $deploy->( $statement );
2666     }
2667   }
2668   elsif (@statements == 1) {
2669     foreach my $line ( split(";\n", $statements[0])) {
2670       $deploy->( $line );
2671     }
2672   }
2673 }
2674
2675 =head2 datetime_parser
2676
2677 Returns the datetime parser class
2678
2679 =cut
2680
2681 sub datetime_parser {
2682   my $self = shift;
2683   return $self->{datetime_parser} ||= do {
2684     $self->build_datetime_parser(@_);
2685   };
2686 }
2687
2688 =head2 datetime_parser_type
2689
2690 Defines (returns) the datetime parser class - currently hardwired to
2691 L<DateTime::Format::MySQL>
2692
2693 =cut
2694
2695 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2696
2697 =head2 build_datetime_parser
2698
2699 See L</datetime_parser>
2700
2701 =cut
2702
2703 sub build_datetime_parser {
2704   my $self = shift;
2705   my $type = $self->datetime_parser_type(@_);
2706   $self->ensure_class_loaded ($type);
2707   return $type;
2708 }
2709
2710
2711 =head2 is_replicating
2712
2713 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2714 replicate from a master database.  Default is undef, which is the result
2715 returned by databases that don't support replication.
2716
2717 =cut
2718
2719 sub is_replicating {
2720     return;
2721
2722 }
2723
2724 =head2 lag_behind_master
2725
2726 Returns a number that represents a certain amount of lag behind a master db
2727 when a given storage is replicating.  The number is database dependent, but
2728 starts at zero and increases with the amount of lag. Default in undef
2729
2730 =cut
2731
2732 sub lag_behind_master {
2733     return;
2734 }
2735
2736 =head2 relname_to_table_alias
2737
2738 =over 4
2739
2740 =item Arguments: $relname, $join_count
2741
2742 =back
2743
2744 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2745 queries.
2746
2747 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2748 way these aliases are named.
2749
2750 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2751 otherwise C<"$relname">.
2752
2753 =cut
2754
2755 sub relname_to_table_alias {
2756   my ($self, $relname, $join_count) = @_;
2757
2758   my $alias = ($join_count && $join_count > 1 ?
2759     join('_', $relname, $join_count) : $relname);
2760
2761   return $alias;
2762 }
2763
2764 1;
2765
2766 =head1 USAGE NOTES
2767
2768 =head2 DBIx::Class and AutoCommit
2769
2770 DBIx::Class can do some wonderful magic with handling exceptions,
2771 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2772 (the default) combined with C<txn_do> for transaction support.
2773
2774 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2775 in an assumed transaction between commits, and you're telling us you'd
2776 like to manage that manually.  A lot of the magic protections offered by
2777 this module will go away.  We can't protect you from exceptions due to database
2778 disconnects because we don't know anything about how to restart your
2779 transactions.  You're on your own for handling all sorts of exceptional
2780 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2781 be with raw DBI.
2782
2783
2784 =head1 AUTHORS
2785
2786 Matt S. Trout <mst@shadowcatsystems.co.uk>
2787
2788 Andy Grundman <andy@hybridized.org>
2789
2790 =head1 LICENSE
2791
2792 You may distribute this code under the same terms as Perl itself.
2793
2794 =cut