DBIC now warns on explicit false AutoCommit, and when altering external $dbh's
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use overload ();
20 use namespace::clean;
21
22
23 # default cursor class, overridable in connect_info attributes
24 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
25
26 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
27 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/
30   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
31   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
32   transaction_depth _dbh_autocommit  savepoints
33 /);
34
35 # the values for these accessors are picked out (and deleted) from
36 # the attribute hashref passed to connect_info
37 my @storage_options = qw/
38   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
39   disable_sth_caching unsafe auto_savepoint
40 /;
41 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
42
43
44 # capability definitions, using a 2-tiered accessor system
45 # The rationale is:
46 #
47 # A driver/user may define _use_X, which blindly without any checks says:
48 # "(do not) use this capability", (use_dbms_capability is an "inherited"
49 # type accessor)
50 #
51 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
52 # accessor, which in turn calls _determine_supports_X, and stores the return
53 # in a special slot on the storage object, which is wiped every time a $dbh
54 # reconnection takes place (it is not guaranteed that upon reconnection we
55 # will get the same rdbms version). _determine_supports_X does not need to
56 # exist on a driver, as we ->can for it before calling.
57
58 my @capabilities = (qw/
59   insert_returning
60   insert_returning_bound
61   placeholders
62   typeless_placeholders
63   join_optimizer
64 /);
65 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
66 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
67
68 # on by default, not strictly a capability (pending rewrite)
69 __PACKAGE__->_use_join_optimizer (1);
70 sub _determine_supports_join_optimizer { 1 };
71
72 # Each of these methods need _determine_driver called before itself
73 # in order to function reliably. This is a purely DRY optimization
74 #
75 # get_(use)_dbms_capability need to be called on the correct Storage
76 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
77 # _determine_supports_X which obv. needs a correct driver as well
78 my @rdbms_specific_methods = qw/
79   deployment_statements
80   sqlt_type
81   sql_maker
82   build_datetime_parser
83   datetime_parser_type
84
85   insert
86   insert_bulk
87   update
88   delete
89   select
90   select_single
91
92   get_use_dbms_capability
93   get_dbms_capability
94
95   _server_info
96   _get_server_version
97 /;
98
99 for my $meth (@rdbms_specific_methods) {
100
101   my $orig = __PACKAGE__->can ($meth)
102     or die "$meth is not a ::Storage::DBI method!";
103
104   no strict qw/refs/;
105   no warnings qw/redefine/;
106   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
107     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
108       $_[0]->_determine_driver;
109
110       # This for some reason crashes and burns on perl 5.8.1
111       # IFF the method ends up throwing an exception
112       #goto $_[0]->can ($meth);
113
114       my $cref = $_[0]->can ($meth);
115       goto $cref;
116     }
117     goto $orig;
118   };
119 }
120
121
122 =head1 NAME
123
124 DBIx::Class::Storage::DBI - DBI storage handler
125
126 =head1 SYNOPSIS
127
128   my $schema = MySchema->connect('dbi:SQLite:my.db');
129
130   $schema->storage->debug(1);
131
132   my @stuff = $schema->storage->dbh_do(
133     sub {
134       my ($storage, $dbh, @args) = @_;
135       $dbh->do("DROP TABLE authors");
136     },
137     @column_list
138   );
139
140   $schema->resultset('Book')->search({
141      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
142   });
143
144 =head1 DESCRIPTION
145
146 This class represents the connection to an RDBMS via L<DBI>.  See
147 L<DBIx::Class::Storage> for general information.  This pod only
148 documents DBI-specific methods and behaviors.
149
150 =head1 METHODS
151
152 =cut
153
154 sub new {
155   my $new = shift->next::method(@_);
156
157   $new->transaction_depth(0);
158   $new->_sql_maker_opts({});
159   $new->_dbh_details({});
160   $new->{savepoints} = [];
161   $new->{_in_dbh_do} = 0;
162   $new->{_dbh_gen} = 0;
163
164   # read below to see what this does
165   $new->_arm_global_destructor;
166
167   $new;
168 }
169
170 # This is hack to work around perl shooting stuff in random
171 # order on exit(). If we do not walk the remaining storage
172 # objects in an END block, there is a *small but real* chance
173 # of a fork()ed child to kill the parent's shared DBI handle,
174 # *before perl reaches the DESTROY in this package*
175 # Yes, it is ugly and effective.
176 # Additionally this registry is used by the CLONE method to
177 # make sure no handles are shared between threads
178 {
179   my %seek_and_destroy;
180
181   sub _arm_global_destructor {
182     my $self = shift;
183     my $key = refaddr ($self);
184     $seek_and_destroy{$key} = $self;
185     weaken ($seek_and_destroy{$key});
186   }
187
188   END {
189     local $?; # just in case the DBI destructor changes it somehow
190
191     # destroy just the object if not native to this process/thread
192     $_->_verify_pid for (grep
193       { defined $_ }
194       values %seek_and_destroy
195     );
196   }
197
198   sub CLONE {
199     # As per DBI's recommendation, DBIC disconnects all handles as
200     # soon as possible (DBIC will reconnect only on demand from within
201     # the thread)
202     for (values %seek_and_destroy) {
203       next unless $_;
204       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
205       $_->_dbh(undef);
206     }
207   }
208 }
209
210 sub DESTROY {
211   my $self = shift;
212
213   # some databases spew warnings on implicit disconnect
214   local $SIG{__WARN__} = sub {};
215   $self->_dbh(undef);
216
217   # this op is necessary, since the very last perl runtime statement
218   # triggers a global destruction shootout, and the $SIG localization
219   # may very well be destroyed before perl actually gets to do the
220   # $dbh undef
221   1;
222 }
223
224 # handle pid changes correctly - do not destroy parent's connection
225 sub _verify_pid {
226   my $self = shift;
227
228   my $pid = $self->_conn_pid;
229   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
230     $dbh->{InactiveDestroy} = 1;
231     $self->{_dbh_gen}++;
232     $self->_dbh(undef);
233   }
234
235   return;
236 }
237
238 =head2 connect_info
239
240 This method is normally called by L<DBIx::Class::Schema/connection>, which
241 encapsulates its argument list in an arrayref before passing them here.
242
243 The argument list may contain:
244
245 =over
246
247 =item *
248
249 The same 4-element argument set one would normally pass to
250 L<DBI/connect>, optionally followed by
251 L<extra attributes|/DBIx::Class specific connection attributes>
252 recognized by DBIx::Class:
253
254   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
255
256 =item *
257
258 A single code reference which returns a connected
259 L<DBI database handle|DBI/connect> optionally followed by
260 L<extra attributes|/DBIx::Class specific connection attributes> recognized
261 by DBIx::Class:
262
263   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
264
265 =item *
266
267 A single hashref with all the attributes and the dsn/user/password
268 mixed together:
269
270   $connect_info_args = [{
271     dsn => $dsn,
272     user => $user,
273     password => $pass,
274     %dbi_attributes,
275     %extra_attributes,
276   }];
277
278   $connect_info_args = [{
279     dbh_maker => sub { DBI->connect (...) },
280     %dbi_attributes,
281     %extra_attributes,
282   }];
283
284 This is particularly useful for L<Catalyst> based applications, allowing the
285 following config (L<Config::General> style):
286
287   <Model::DB>
288     schema_class   App::DB
289     <connect_info>
290       dsn          dbi:mysql:database=test
291       user         testuser
292       password     TestPass
293       AutoCommit   1
294     </connect_info>
295   </Model::DB>
296
297 The C<dsn>/C<user>/C<password> combination can be substituted by the
298 C<dbh_maker> key whose value is a coderef that returns a connected
299 L<DBI database handle|DBI/connect>
300
301 =back
302
303 Please note that the L<DBI> docs recommend that you always explicitly
304 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
305 recommends that it be set to I<1>, and that you perform transactions
306 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
307 to I<1> if you do not do explicitly set it to zero.  This is the default
308 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
309
310 =head3 DBIx::Class specific connection attributes
311
312 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
313 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
314 the following connection options. These options can be mixed in with your other
315 L<DBI> connection attributes, or placed in a separate hashref
316 (C<\%extra_attributes>) as shown above.
317
318 Every time C<connect_info> is invoked, any previous settings for
319 these options will be cleared before setting the new ones, regardless of
320 whether any options are specified in the new C<connect_info>.
321
322
323 =over
324
325 =item on_connect_do
326
327 Specifies things to do immediately after connecting or re-connecting to
328 the database.  Its value may contain:
329
330 =over
331
332 =item a scalar
333
334 This contains one SQL statement to execute.
335
336 =item an array reference
337
338 This contains SQL statements to execute in order.  Each element contains
339 a string or a code reference that returns a string.
340
341 =item a code reference
342
343 This contains some code to execute.  Unlike code references within an
344 array reference, its return value is ignored.
345
346 =back
347
348 =item on_disconnect_do
349
350 Takes arguments in the same form as L</on_connect_do> and executes them
351 immediately before disconnecting from the database.
352
353 Note, this only runs if you explicitly call L</disconnect> on the
354 storage object.
355
356 =item on_connect_call
357
358 A more generalized form of L</on_connect_do> that calls the specified
359 C<connect_call_METHOD> methods in your storage driver.
360
361   on_connect_do => 'select 1'
362
363 is equivalent to:
364
365   on_connect_call => [ [ do_sql => 'select 1' ] ]
366
367 Its values may contain:
368
369 =over
370
371 =item a scalar
372
373 Will call the C<connect_call_METHOD> method.
374
375 =item a code reference
376
377 Will execute C<< $code->($storage) >>
378
379 =item an array reference
380
381 Each value can be a method name or code reference.
382
383 =item an array of arrays
384
385 For each array, the first item is taken to be the C<connect_call_> method name
386 or code reference, and the rest are parameters to it.
387
388 =back
389
390 Some predefined storage methods you may use:
391
392 =over
393
394 =item do_sql
395
396 Executes a SQL string or a code reference that returns a SQL string. This is
397 what L</on_connect_do> and L</on_disconnect_do> use.
398
399 It can take:
400
401 =over
402
403 =item a scalar
404
405 Will execute the scalar as SQL.
406
407 =item an arrayref
408
409 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
410 attributes hashref and bind values.
411
412 =item a code reference
413
414 Will execute C<< $code->($storage) >> and execute the return array refs as
415 above.
416
417 =back
418
419 =item datetime_setup
420
421 Execute any statements necessary to initialize the database session to return
422 and accept datetime/timestamp values used with
423 L<DBIx::Class::InflateColumn::DateTime>.
424
425 Only necessary for some databases, see your specific storage driver for
426 implementation details.
427
428 =back
429
430 =item on_disconnect_call
431
432 Takes arguments in the same form as L</on_connect_call> and executes them
433 immediately before disconnecting from the database.
434
435 Calls the C<disconnect_call_METHOD> methods as opposed to the
436 C<connect_call_METHOD> methods called by L</on_connect_call>.
437
438 Note, this only runs if you explicitly call L</disconnect> on the
439 storage object.
440
441 =item disable_sth_caching
442
443 If set to a true value, this option will disable the caching of
444 statement handles via L<DBI/prepare_cached>.
445
446 =item limit_dialect
447
448 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
449 default L</sql_limit_dialect> setting of the storage (if any). For a list
450 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
451
452 =item quote_char
453
454 Specifies what characters to use to quote table and column names.
455
456 C<quote_char> expects either a single character, in which case is it
457 is placed on either side of the table/column name, or an arrayref of length
458 2 in which case the table/column name is placed between the elements.
459
460 For example under MySQL you should use C<< quote_char => '`' >>, and for
461 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
462
463 =item name_sep
464
465 This parameter is only useful in conjunction with C<quote_char>, and is used to
466 specify the character that separates elements (schemas, tables, columns) from
467 each other. If unspecified it defaults to the most commonly used C<.>.
468
469 =item unsafe
470
471 This Storage driver normally installs its own C<HandleError>, sets
472 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
473 all database handles, including those supplied by a coderef.  It does this
474 so that it can have consistent and useful error behavior.
475
476 If you set this option to a true value, Storage will not do its usual
477 modifications to the database handle's attributes, and instead relies on
478 the settings in your connect_info DBI options (or the values you set in
479 your connection coderef, in the case that you are connecting via coderef).
480
481 Note that your custom settings can cause Storage to malfunction,
482 especially if you set a C<HandleError> handler that suppresses exceptions
483 and/or disable C<RaiseError>.
484
485 =item auto_savepoint
486
487 If this option is true, L<DBIx::Class> will use savepoints when nesting
488 transactions, making it possible to recover from failure in the inner
489 transaction without having to abort all outer transactions.
490
491 =item cursor_class
492
493 Use this argument to supply a cursor class other than the default
494 L<DBIx::Class::Storage::DBI::Cursor>.
495
496 =back
497
498 Some real-life examples of arguments to L</connect_info> and
499 L<DBIx::Class::Schema/connect>
500
501   # Simple SQLite connection
502   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
503
504   # Connect via subref
505   ->connect_info([ sub { DBI->connect(...) } ]);
506
507   # Connect via subref in hashref
508   ->connect_info([{
509     dbh_maker => sub { DBI->connect(...) },
510     on_connect_do => 'alter session ...',
511   }]);
512
513   # A bit more complicated
514   ->connect_info(
515     [
516       'dbi:Pg:dbname=foo',
517       'postgres',
518       'my_pg_password',
519       { AutoCommit => 1 },
520       { quote_char => q{"} },
521     ]
522   );
523
524   # Equivalent to the previous example
525   ->connect_info(
526     [
527       'dbi:Pg:dbname=foo',
528       'postgres',
529       'my_pg_password',
530       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
531     ]
532   );
533
534   # Same, but with hashref as argument
535   # See parse_connect_info for explanation
536   ->connect_info(
537     [{
538       dsn         => 'dbi:Pg:dbname=foo',
539       user        => 'postgres',
540       password    => 'my_pg_password',
541       AutoCommit  => 1,
542       quote_char  => q{"},
543       name_sep    => q{.},
544     }]
545   );
546
547   # Subref + DBIx::Class-specific connection options
548   ->connect_info(
549     [
550       sub { DBI->connect(...) },
551       {
552           quote_char => q{`},
553           name_sep => q{@},
554           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
555           disable_sth_caching => 1,
556       },
557     ]
558   );
559
560
561
562 =cut
563
564 sub connect_info {
565   my ($self, $info) = @_;
566
567   return $self->_connect_info if !$info;
568
569   $self->_connect_info($info); # copy for _connect_info
570
571   $info = $self->_normalize_connect_info($info)
572     if ref $info eq 'ARRAY';
573
574   for my $storage_opt (keys %{ $info->{storage_options} }) {
575     my $value = $info->{storage_options}{$storage_opt};
576
577     $self->$storage_opt($value);
578   }
579
580   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
581   #  the new set of options
582   $self->_sql_maker(undef);
583   $self->_sql_maker_opts({});
584
585   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
586     my $value = $info->{sql_maker_options}{$sql_maker_opt};
587
588     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
589   }
590
591   my %attrs = (
592     %{ $self->_default_dbi_connect_attributes || {} },
593     %{ $info->{attributes} || {} },
594   );
595
596   my @args = @{ $info->{arguments} };
597
598   if (keys %attrs and ref $args[0] ne 'CODE') {
599     carp
600         'You provided explicit AutoCommit => 0 in your connection_info. '
601       . 'This is almost universally a bad idea (see the footnotes of '
602       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
603       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
604       . 'this warning.'
605       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
606
607     push @args, \%attrs if keys %attrs;
608   }
609   $self->_dbi_connect_info(\@args);
610
611   # FIXME - dirty:
612   # save attributes them in a separate accessor so they are always
613   # introspectable, even in case of a CODE $dbhmaker
614   $self->_dbic_connect_attributes (\%attrs);
615
616   return $self->_connect_info;
617 }
618
619 sub _normalize_connect_info {
620   my ($self, $info_arg) = @_;
621   my %info;
622
623   my @args = @$info_arg;  # take a shallow copy for further mutilation
624
625   # combine/pre-parse arguments depending on invocation style
626
627   my %attrs;
628   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
629     %attrs = %{ $args[1] || {} };
630     @args = $args[0];
631   }
632   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
633     %attrs = %{$args[0]};
634     @args = ();
635     if (my $code = delete $attrs{dbh_maker}) {
636       @args = $code;
637
638       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
639       if (@ignored) {
640         carp sprintf (
641             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
642           . "to the result of 'dbh_maker'",
643
644           join (', ', map { "'$_'" } (@ignored) ),
645         );
646       }
647     }
648     else {
649       @args = delete @attrs{qw/dsn user password/};
650     }
651   }
652   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
653     %attrs = (
654       % { $args[3] || {} },
655       % { $args[4] || {} },
656     );
657     @args = @args[0,1,2];
658   }
659
660   $info{arguments} = \@args;
661
662   my @storage_opts = grep exists $attrs{$_},
663     @storage_options, 'cursor_class';
664
665   @{ $info{storage_options} }{@storage_opts} =
666     delete @attrs{@storage_opts} if @storage_opts;
667
668   my @sql_maker_opts = grep exists $attrs{$_},
669     qw/limit_dialect quote_char name_sep/;
670
671   @{ $info{sql_maker_options} }{@sql_maker_opts} =
672     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
673
674   $info{attributes} = \%attrs if %attrs;
675
676   return \%info;
677 }
678
679 sub _default_dbi_connect_attributes () {
680   +{
681     AutoCommit => 1,
682     PrintError => 0,
683     RaiseError => 1,
684     ShowErrorStatement => 1,
685   };
686 }
687
688 =head2 on_connect_do
689
690 This method is deprecated in favour of setting via L</connect_info>.
691
692 =cut
693
694 =head2 on_disconnect_do
695
696 This method is deprecated in favour of setting via L</connect_info>.
697
698 =cut
699
700 sub _parse_connect_do {
701   my ($self, $type) = @_;
702
703   my $val = $self->$type;
704   return () if not defined $val;
705
706   my @res;
707
708   if (not ref($val)) {
709     push @res, [ 'do_sql', $val ];
710   } elsif (ref($val) eq 'CODE') {
711     push @res, $val;
712   } elsif (ref($val) eq 'ARRAY') {
713     push @res, map { [ 'do_sql', $_ ] } @$val;
714   } else {
715     $self->throw_exception("Invalid type for $type: ".ref($val));
716   }
717
718   return \@res;
719 }
720
721 =head2 dbh_do
722
723 Arguments: ($subref | $method_name), @extra_coderef_args?
724
725 Execute the given $subref or $method_name using the new exception-based
726 connection management.
727
728 The first two arguments will be the storage object that C<dbh_do> was called
729 on and a database handle to use.  Any additional arguments will be passed
730 verbatim to the called subref as arguments 2 and onwards.
731
732 Using this (instead of $self->_dbh or $self->dbh) ensures correct
733 exception handling and reconnection (or failover in future subclasses).
734
735 Your subref should have no side-effects outside of the database, as
736 there is the potential for your subref to be partially double-executed
737 if the database connection was stale/dysfunctional.
738
739 Example:
740
741   my @stuff = $schema->storage->dbh_do(
742     sub {
743       my ($storage, $dbh, @cols) = @_;
744       my $cols = join(q{, }, @cols);
745       $dbh->selectrow_array("SELECT $cols FROM foo");
746     },
747     @column_list
748   );
749
750 =cut
751
752 sub dbh_do {
753   my $self = shift;
754   my $code = shift;
755
756   my $dbh = $self->_get_dbh;
757
758   return $self->$code($dbh, @_)
759     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
760
761   local $self->{_in_dbh_do} = 1;
762
763   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
764   my $args = \@_;
765   return try {
766     $self->$code ($dbh, @$args);
767   } catch {
768     $self->throw_exception($_) if $self->connected;
769
770     # We were not connected - reconnect and retry, but let any
771     #  exception fall right through this time
772     carp "Retrying $code after catching disconnected exception: $_"
773       if $ENV{DBIC_DBIRETRY_DEBUG};
774
775     $self->_populate_dbh;
776     $self->$code($self->_dbh, @$args);
777   };
778 }
779
780 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
781 # It also informs dbh_do to bypass itself while under the direction of txn_do,
782 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
783 sub txn_do {
784   my $self = shift;
785   my $coderef = shift;
786
787   ref $coderef eq 'CODE' or $self->throw_exception
788     ('$coderef must be a CODE reference');
789
790   local $self->{_in_dbh_do} = 1;
791
792   my @result;
793   my $want = wantarray;
794
795   my $tried = 0;
796   while(1) {
797     my $exception;
798
799     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
800     my $args = \@_;
801
802     try {
803       $self->txn_begin;
804       my $txn_start_depth = $self->transaction_depth;
805       if($want) {
806           @result = $coderef->(@$args);
807       }
808       elsif(defined $want) {
809           $result[0] = $coderef->(@$args);
810       }
811       else {
812           $coderef->(@$args);
813       }
814
815       my $delta_txn = $txn_start_depth - $self->transaction_depth;
816       if ($delta_txn == 0) {
817         $self->txn_commit;
818       }
819       elsif ($delta_txn != 1) {
820         # an off-by-one would mean we fired a rollback
821         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
822       }
823     } catch {
824       $exception = $_;
825     };
826
827     if(! defined $exception) { return wantarray ? @result : $result[0] }
828
829     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
830       my $rollback_exception;
831       try { $self->txn_rollback } catch { $rollback_exception = shift };
832       if(defined $rollback_exception) {
833         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
834         $self->throw_exception($exception)  # propagate nested rollback
835           if $rollback_exception =~ /$exception_class/;
836
837         $self->throw_exception(
838           "Transaction aborted: ${exception}. "
839           . "Rollback failed: ${rollback_exception}"
840         );
841       }
842       $self->throw_exception($exception)
843     }
844
845     # We were not connected, and was first try - reconnect and retry
846     # via the while loop
847     carp "Retrying $coderef after catching disconnected exception: $exception"
848       if $ENV{DBIC_TXNRETRY_DEBUG};
849     $self->_populate_dbh;
850   }
851 }
852
853 =head2 disconnect
854
855 Our C<disconnect> method also performs a rollback first if the
856 database is not in C<AutoCommit> mode.
857
858 =cut
859
860 sub disconnect {
861   my ($self) = @_;
862
863   if( $self->_dbh ) {
864     my @actions;
865
866     push @actions, ( $self->on_disconnect_call || () );
867     push @actions, $self->_parse_connect_do ('on_disconnect_do');
868
869     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
870
871     $self->_dbh_rollback unless $self->_dbh_autocommit;
872
873     %{ $self->_dbh->{CachedKids} } = ();
874     $self->_dbh->disconnect;
875     $self->_dbh(undef);
876     $self->{_dbh_gen}++;
877   }
878 }
879
880 =head2 with_deferred_fk_checks
881
882 =over 4
883
884 =item Arguments: C<$coderef>
885
886 =item Return Value: The return value of $coderef
887
888 =back
889
890 Storage specific method to run the code ref with FK checks deferred or
891 in MySQL's case disabled entirely.
892
893 =cut
894
895 # Storage subclasses should override this
896 sub with_deferred_fk_checks {
897   my ($self, $sub) = @_;
898   $sub->();
899 }
900
901 =head2 connected
902
903 =over
904
905 =item Arguments: none
906
907 =item Return Value: 1|0
908
909 =back
910
911 Verifies that the current database handle is active and ready to execute
912 an SQL statement (e.g. the connection did not get stale, server is still
913 answering, etc.) This method is used internally by L</dbh>.
914
915 =cut
916
917 sub connected {
918   my $self = shift;
919   return 0 unless $self->_seems_connected;
920
921   #be on the safe side
922   local $self->_dbh->{RaiseError} = 1;
923
924   return $self->_ping;
925 }
926
927 sub _seems_connected {
928   my $self = shift;
929
930   $self->_verify_pid;
931
932   my $dbh = $self->_dbh
933     or return 0;
934
935   return $dbh->FETCH('Active');
936 }
937
938 sub _ping {
939   my $self = shift;
940
941   my $dbh = $self->_dbh or return 0;
942
943   return $dbh->ping;
944 }
945
946 sub ensure_connected {
947   my ($self) = @_;
948
949   unless ($self->connected) {
950     $self->_populate_dbh;
951   }
952 }
953
954 =head2 dbh
955
956 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
957 is guaranteed to be healthy by implicitly calling L</connected>, and if
958 necessary performing a reconnection before returning. Keep in mind that this
959 is very B<expensive> on some database engines. Consider using L</dbh_do>
960 instead.
961
962 =cut
963
964 sub dbh {
965   my ($self) = @_;
966
967   if (not $self->_dbh) {
968     $self->_populate_dbh;
969   } else {
970     $self->ensure_connected;
971   }
972   return $self->_dbh;
973 }
974
975 # this is the internal "get dbh or connect (don't check)" method
976 sub _get_dbh {
977   my $self = shift;
978   $self->_verify_pid;
979   $self->_populate_dbh unless $self->_dbh;
980   return $self->_dbh;
981 }
982
983 sub sql_maker {
984   my ($self) = @_;
985   unless ($self->_sql_maker) {
986     my $sql_maker_class = $self->sql_maker_class;
987     $self->ensure_class_loaded ($sql_maker_class);
988
989     my %opts = %{$self->_sql_maker_opts||{}};
990     my $dialect =
991       $opts{limit_dialect}
992         ||
993       $self->sql_limit_dialect
994         ||
995       do {
996         my $s_class = (ref $self) || $self;
997         carp (
998           "Your storage class ($s_class) does not set sql_limit_dialect and you "
999         . 'have not supplied an explicit limit_dialect in your connection_info. '
1000         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1001         . 'databases but can be (and often is) painfully slow.'
1002         );
1003
1004         'GenericSubQ';
1005       }
1006     ;
1007
1008     $self->_sql_maker($sql_maker_class->new(
1009       bindtype=>'columns',
1010       array_datatypes => 1,
1011       limit_dialect => $dialect,
1012       name_sep => '.',
1013       %opts,
1014     ));
1015   }
1016   return $self->_sql_maker;
1017 }
1018
1019 # nothing to do by default
1020 sub _rebless {}
1021 sub _init {}
1022
1023 sub _populate_dbh {
1024   my ($self) = @_;
1025
1026   my @info = @{$self->_dbi_connect_info || []};
1027   $self->_dbh(undef); # in case ->connected failed we might get sent here
1028   $self->_dbh_details({}); # reset everything we know
1029
1030   $self->_dbh($self->_connect(@info));
1031
1032   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1033
1034   $self->_determine_driver;
1035
1036   # Always set the transaction depth on connect, since
1037   #  there is no transaction in progress by definition
1038   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1039
1040   $self->_run_connection_actions unless $self->{_in_determine_driver};
1041 }
1042
1043 sub _run_connection_actions {
1044   my $self = shift;
1045   my @actions;
1046
1047   push @actions, ( $self->on_connect_call || () );
1048   push @actions, $self->_parse_connect_do ('on_connect_do');
1049
1050   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1051 }
1052
1053
1054
1055 sub set_use_dbms_capability {
1056   $_[0]->set_inherited ($_[1], $_[2]);
1057 }
1058
1059 sub get_use_dbms_capability {
1060   my ($self, $capname) = @_;
1061
1062   my $use = $self->get_inherited ($capname);
1063   return defined $use
1064     ? $use
1065     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1066   ;
1067 }
1068
1069 sub set_dbms_capability {
1070   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1071 }
1072
1073 sub get_dbms_capability {
1074   my ($self, $capname) = @_;
1075
1076   my $cap = $self->_dbh_details->{capability}{$capname};
1077
1078   unless (defined $cap) {
1079     if (my $meth = $self->can ("_determine$capname")) {
1080       $cap = $self->$meth ? 1 : 0;
1081     }
1082     else {
1083       $cap = 0;
1084     }
1085
1086     $self->set_dbms_capability ($capname, $cap);
1087   }
1088
1089   return $cap;
1090 }
1091
1092 sub _server_info {
1093   my $self = shift;
1094
1095   my $info;
1096   unless ($info = $self->_dbh_details->{info}) {
1097
1098     $info = {};
1099
1100     my $server_version = try { $self->_get_server_version };
1101
1102     if (defined $server_version) {
1103       $info->{dbms_version} = $server_version;
1104
1105       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1106       my @verparts = split (/\./, $numeric_version);
1107       if (
1108         @verparts
1109           &&
1110         $verparts[0] <= 999
1111       ) {
1112         # consider only up to 3 version parts, iff not more than 3 digits
1113         my @use_parts;
1114         while (@verparts && @use_parts < 3) {
1115           my $p = shift @verparts;
1116           last if $p > 999;
1117           push @use_parts, $p;
1118         }
1119         push @use_parts, 0 while @use_parts < 3;
1120
1121         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1122       }
1123     }
1124
1125     $self->_dbh_details->{info} = $info;
1126   }
1127
1128   return $info;
1129 }
1130
1131 sub _get_server_version {
1132   shift->_get_dbh->get_info(18);
1133 }
1134
1135 sub _determine_driver {
1136   my ($self) = @_;
1137
1138   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1139     my $started_connected = 0;
1140     local $self->{_in_determine_driver} = 1;
1141
1142     if (ref($self) eq __PACKAGE__) {
1143       my $driver;
1144       if ($self->_dbh) { # we are connected
1145         $driver = $self->_dbh->{Driver}{Name};
1146         $started_connected = 1;
1147       } else {
1148         # if connect_info is a CODEREF, we have no choice but to connect
1149         if (ref $self->_dbi_connect_info->[0] &&
1150             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1151           $self->_populate_dbh;
1152           $driver = $self->_dbh->{Driver}{Name};
1153         }
1154         else {
1155           # try to use dsn to not require being connected, the driver may still
1156           # force a connection in _rebless to determine version
1157           # (dsn may not be supplied at all if all we do is make a mock-schema)
1158           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1159           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1160           $driver ||= $ENV{DBI_DRIVER};
1161         }
1162       }
1163
1164       if ($driver) {
1165         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1166         if ($self->load_optional_class($storage_class)) {
1167           mro::set_mro($storage_class, 'c3');
1168           bless $self, $storage_class;
1169           $self->_rebless();
1170         }
1171       }
1172     }
1173
1174     $self->_driver_determined(1);
1175
1176     $self->_init; # run driver-specific initializations
1177
1178     $self->_run_connection_actions
1179         if !$started_connected && defined $self->_dbh;
1180   }
1181 }
1182
1183 sub _do_connection_actions {
1184   my $self          = shift;
1185   my $method_prefix = shift;
1186   my $call          = shift;
1187
1188   if (not ref($call)) {
1189     my $method = $method_prefix . $call;
1190     $self->$method(@_);
1191   } elsif (ref($call) eq 'CODE') {
1192     $self->$call(@_);
1193   } elsif (ref($call) eq 'ARRAY') {
1194     if (ref($call->[0]) ne 'ARRAY') {
1195       $self->_do_connection_actions($method_prefix, $_) for @$call;
1196     } else {
1197       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1198     }
1199   } else {
1200     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1201   }
1202
1203   return $self;
1204 }
1205
1206 sub connect_call_do_sql {
1207   my $self = shift;
1208   $self->_do_query(@_);
1209 }
1210
1211 sub disconnect_call_do_sql {
1212   my $self = shift;
1213   $self->_do_query(@_);
1214 }
1215
1216 # override in db-specific backend when necessary
1217 sub connect_call_datetime_setup { 1 }
1218
1219 sub _do_query {
1220   my ($self, $action) = @_;
1221
1222   if (ref $action eq 'CODE') {
1223     $action = $action->($self);
1224     $self->_do_query($_) foreach @$action;
1225   }
1226   else {
1227     # Most debuggers expect ($sql, @bind), so we need to exclude
1228     # the attribute hash which is the second argument to $dbh->do
1229     # furthermore the bind values are usually to be presented
1230     # as named arrayref pairs, so wrap those here too
1231     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1232     my $sql = shift @do_args;
1233     my $attrs = shift @do_args;
1234     my @bind = map { [ undef, $_ ] } @do_args;
1235
1236     $self->_query_start($sql, @bind);
1237     $self->_get_dbh->do($sql, $attrs, @do_args);
1238     $self->_query_end($sql, @bind);
1239   }
1240
1241   return $self;
1242 }
1243
1244 sub _connect {
1245   my ($self, @info) = @_;
1246
1247   $self->throw_exception("You failed to provide any connection info")
1248     if !@info;
1249
1250   my ($old_connect_via, $dbh);
1251
1252   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1253     $old_connect_via = $DBI::connect_via;
1254     $DBI::connect_via = 'connect';
1255   }
1256
1257   try {
1258     if(ref $info[0] eq 'CODE') {
1259        $dbh = $info[0]->();
1260     }
1261     else {
1262        $dbh = DBI->connect(@info);
1263     }
1264
1265     if (!$dbh) {
1266       die $DBI::errstr;
1267     }
1268
1269     unless ($self->unsafe) {
1270
1271       $self->throw_exception(
1272         'Refusing clobbering of {HandleError} installed on externally supplied '
1273        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1274       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1275
1276       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1277       # request, or an external handle. Complain and set anyway
1278       unless ($dbh->{RaiseError}) {
1279         carp( ref $info[0] eq 'CODE'
1280
1281           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1282            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1283            .'attribute has been supplied'
1284
1285           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1286            .'unsafe => 1. Toggling RaiseError back to true'
1287         );
1288
1289         $dbh->{RaiseError} = 1;
1290       }
1291
1292       # this odd anonymous coderef dereference is in fact really
1293       # necessary to avoid the unwanted effect described in perl5
1294       # RT#75792
1295       sub {
1296         my $weak_self = $_[0];
1297         weaken $weak_self;
1298
1299         # the coderef is blessed so we can distinguish it from externally
1300         # supplied handles (which must be preserved)
1301         $_[1]->{HandleError} = bless sub {
1302           if ($weak_self) {
1303             $weak_self->throw_exception("DBI Exception: $_[0]");
1304           }
1305           else {
1306             # the handler may be invoked by something totally out of
1307             # the scope of DBIC
1308             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1309           }
1310         }, '__DBIC__DBH__ERROR__HANDLER__';
1311       }->($self, $dbh);
1312     }
1313   }
1314   catch {
1315     $self->throw_exception("DBI Connection failed: $_")
1316   }
1317   finally {
1318     $DBI::connect_via = $old_connect_via if $old_connect_via;
1319   };
1320
1321   $self->_dbh_autocommit($dbh->{AutoCommit});
1322   $dbh;
1323 }
1324
1325 sub svp_begin {
1326   my ($self, $name) = @_;
1327
1328   $name = $self->_svp_generate_name
1329     unless defined $name;
1330
1331   $self->throw_exception ("You can't use savepoints outside a transaction")
1332     if $self->{transaction_depth} == 0;
1333
1334   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1335     unless $self->can('_svp_begin');
1336
1337   push @{ $self->{savepoints} }, $name;
1338
1339   $self->debugobj->svp_begin($name) if $self->debug;
1340
1341   return $self->_svp_begin($name);
1342 }
1343
1344 sub svp_release {
1345   my ($self, $name) = @_;
1346
1347   $self->throw_exception ("You can't use savepoints outside a transaction")
1348     if $self->{transaction_depth} == 0;
1349
1350   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1351     unless $self->can('_svp_release');
1352
1353   if (defined $name) {
1354     $self->throw_exception ("Savepoint '$name' does not exist")
1355       unless grep { $_ eq $name } @{ $self->{savepoints} };
1356
1357     # Dig through the stack until we find the one we are releasing.  This keeps
1358     # the stack up to date.
1359     my $svp;
1360
1361     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1362   } else {
1363     $name = pop @{ $self->{savepoints} };
1364   }
1365
1366   $self->debugobj->svp_release($name) if $self->debug;
1367
1368   return $self->_svp_release($name);
1369 }
1370
1371 sub svp_rollback {
1372   my ($self, $name) = @_;
1373
1374   $self->throw_exception ("You can't use savepoints outside a transaction")
1375     if $self->{transaction_depth} == 0;
1376
1377   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1378     unless $self->can('_svp_rollback');
1379
1380   if (defined $name) {
1381       # If they passed us a name, verify that it exists in the stack
1382       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1383           $self->throw_exception("Savepoint '$name' does not exist!");
1384       }
1385
1386       # Dig through the stack until we find the one we are releasing.  This keeps
1387       # the stack up to date.
1388       while(my $s = pop(@{ $self->{savepoints} })) {
1389           last if($s eq $name);
1390       }
1391       # Add the savepoint back to the stack, as a rollback doesn't remove the
1392       # named savepoint, only everything after it.
1393       push(@{ $self->{savepoints} }, $name);
1394   } else {
1395       # We'll assume they want to rollback to the last savepoint
1396       $name = $self->{savepoints}->[-1];
1397   }
1398
1399   $self->debugobj->svp_rollback($name) if $self->debug;
1400
1401   return $self->_svp_rollback($name);
1402 }
1403
1404 sub _svp_generate_name {
1405   my ($self) = @_;
1406   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1407 }
1408
1409 sub txn_begin {
1410   my $self = shift;
1411
1412   # this means we have not yet connected and do not know the AC status
1413   # (e.g. coderef $dbh)
1414   if (! defined $self->_dbh_autocommit) {
1415     $self->ensure_connected;
1416   }
1417   # otherwise re-connect on pid changes, so
1418   # that the txn_depth is adjusted properly
1419   # the lightweight _get_dbh is good enoug here
1420   # (only superficial handle check, no pings)
1421   else {
1422     $self->_get_dbh;
1423   }
1424
1425   if($self->transaction_depth == 0) {
1426     $self->debugobj->txn_begin()
1427       if $self->debug;
1428     $self->_dbh_begin_work;
1429   }
1430   elsif ($self->auto_savepoint) {
1431     $self->svp_begin;
1432   }
1433   $self->{transaction_depth}++;
1434 }
1435
1436 sub _dbh_begin_work {
1437   my $self = shift;
1438
1439   # if the user is utilizing txn_do - good for him, otherwise we need to
1440   # ensure that the $dbh is healthy on BEGIN.
1441   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1442   # will be replaced by a failure of begin_work itself (which will be
1443   # then retried on reconnect)
1444   if ($self->{_in_dbh_do}) {
1445     $self->_dbh->begin_work;
1446   } else {
1447     $self->dbh_do(sub { $_[1]->begin_work });
1448   }
1449 }
1450
1451 sub txn_commit {
1452   my $self = shift;
1453   if (! $self->_dbh) {
1454     $self->throw_exception('cannot COMMIT on a disconnected handle');
1455   }
1456   elsif ($self->{transaction_depth} == 1) {
1457     $self->debugobj->txn_commit()
1458       if ($self->debug);
1459     $self->_dbh_commit;
1460     $self->{transaction_depth} = 0
1461       if $self->_dbh_autocommit;
1462   }
1463   elsif($self->{transaction_depth} > 1) {
1464     $self->{transaction_depth}--;
1465     $self->svp_release
1466       if $self->auto_savepoint;
1467   }
1468   elsif (! $self->_dbh->FETCH('AutoCommit') ) {
1469
1470     carp "Storage transaction_depth $self->{transaction_depth} does not match "
1471         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1472
1473     $self->debugobj->txn_commit()
1474       if ($self->debug);
1475     $self->_dbh_commit;
1476     $self->{transaction_depth} = 0
1477       if $self->_dbh_autocommit;
1478   }
1479   else {
1480     $self->throw_exception( 'Refusing to commit without a started transaction' );
1481   }
1482 }
1483
1484 sub _dbh_commit {
1485   my $self = shift;
1486   my $dbh  = $self->_dbh
1487     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1488   $dbh->commit;
1489 }
1490
1491 sub txn_rollback {
1492   my $self = shift;
1493   my $dbh = $self->_dbh;
1494   try {
1495     if ($self->{transaction_depth} == 1) {
1496       $self->debugobj->txn_rollback()
1497         if ($self->debug);
1498       $self->{transaction_depth} = 0
1499         if $self->_dbh_autocommit;
1500       $self->_dbh_rollback;
1501     }
1502     elsif($self->{transaction_depth} > 1) {
1503       $self->{transaction_depth}--;
1504       if ($self->auto_savepoint) {
1505         $self->svp_rollback;
1506         $self->svp_release;
1507       }
1508     }
1509     else {
1510       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1511     }
1512   }
1513   catch {
1514     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1515
1516     if ($_ !~ /$exception_class/) {
1517       # ensure that a failed rollback resets the transaction depth
1518       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1519     }
1520
1521     $self->throw_exception($_)
1522   };
1523 }
1524
1525 sub _dbh_rollback {
1526   my $self = shift;
1527   my $dbh  = $self->_dbh
1528     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1529   $dbh->rollback;
1530 }
1531
1532 # This used to be the top-half of _execute.  It was split out to make it
1533 #  easier to override in NoBindVars without duping the rest.  It takes up
1534 #  all of _execute's args, and emits $sql, @bind.
1535 sub _prep_for_execute {
1536   my ($self, $op, $extra_bind, $ident, $args) = @_;
1537
1538   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1539     $ident = $ident->from();
1540   }
1541
1542   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1543
1544   unshift(@bind,
1545     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1546       if $extra_bind;
1547   return ($sql, \@bind);
1548 }
1549
1550
1551 sub _fix_bind_params {
1552     my ($self, @bind) = @_;
1553
1554     ### Turn @bind from something like this:
1555     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1556     ### to this:
1557     ###   ( "'1'", "'1'", "'3'" )
1558     return
1559         map {
1560             if ( defined( $_ && $_->[1] ) ) {
1561                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1562             }
1563             else { q{NULL}; }
1564         } @bind;
1565 }
1566
1567 sub _query_start {
1568     my ( $self, $sql, @bind ) = @_;
1569
1570     if ( $self->debug ) {
1571         @bind = $self->_fix_bind_params(@bind);
1572
1573         $self->debugobj->query_start( $sql, @bind );
1574     }
1575 }
1576
1577 sub _query_end {
1578     my ( $self, $sql, @bind ) = @_;
1579
1580     if ( $self->debug ) {
1581         @bind = $self->_fix_bind_params(@bind);
1582         $self->debugobj->query_end( $sql, @bind );
1583     }
1584 }
1585
1586 sub _dbh_execute {
1587   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1588
1589   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1590
1591   $self->_query_start( $sql, @$bind );
1592
1593   my $sth = $self->sth($sql,$op);
1594
1595   my $placeholder_index = 1;
1596
1597   foreach my $bound (@$bind) {
1598     my $attributes = {};
1599     my($column_name, @data) = @$bound;
1600
1601     if ($bind_attributes) {
1602       $attributes = $bind_attributes->{$column_name}
1603       if defined $bind_attributes->{$column_name};
1604     }
1605
1606     foreach my $data (@data) {
1607       my $ref = ref $data;
1608
1609       if ($ref and overload::Method($data, '""') ) {
1610         $data = "$data";
1611       }
1612       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1613         $sth->bind_param_inout(
1614           $placeholder_index++,
1615           $data,
1616           $self->_max_column_bytesize($ident, $column_name),
1617           $attributes
1618         );
1619         next;
1620       }
1621
1622       $sth->bind_param($placeholder_index++, $data, $attributes);
1623     }
1624   }
1625
1626   # Can this fail without throwing an exception anyways???
1627   my $rv = $sth->execute();
1628   $self->throw_exception(
1629     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1630   ) if !$rv;
1631
1632   $self->_query_end( $sql, @$bind );
1633
1634   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1635 }
1636
1637 sub _execute {
1638     my $self = shift;
1639     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1640 }
1641
1642 sub _prefetch_autovalues {
1643   my ($self, $source, $to_insert) = @_;
1644
1645   my $colinfo = $source->columns_info;
1646
1647   my %values;
1648   for my $col (keys %$colinfo) {
1649     if (
1650       $colinfo->{$col}{auto_nextval}
1651         and
1652       (
1653         ! exists $to_insert->{$col}
1654           or
1655         ref $to_insert->{$col} eq 'SCALAR'
1656       )
1657     ) {
1658       $values{$col} = $self->_sequence_fetch(
1659         'NEXTVAL',
1660         ( $colinfo->{$col}{sequence} ||=
1661             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1662         ),
1663       );
1664     }
1665   }
1666
1667   \%values;
1668 }
1669
1670 sub insert {
1671   my ($self, $source, $to_insert) = @_;
1672
1673   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1674
1675   # fuse the values
1676   $to_insert = { %$to_insert, %$prefetched_values };
1677
1678   # list of primary keys we try to fetch from the database
1679   # both not-exsists and scalarrefs are considered
1680   my %fetch_pks;
1681   for ($source->primary_columns) {
1682     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1683       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1684   }
1685
1686   my ($sqla_opts, @ir_container);
1687   if ($self->_use_insert_returning) {
1688
1689     # retain order as declared in the resultsource
1690     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1691       push @{$sqla_opts->{returning}}, $_;
1692       $sqla_opts->{returning_container} = \@ir_container
1693         if $self->_use_insert_returning_bound;
1694     }
1695   }
1696
1697   my $bind_attributes = $self->source_bind_attributes($source);
1698
1699   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1700
1701   my %returned_cols;
1702
1703   if (my $retlist = $sqla_opts->{returning}) {
1704     @ir_container = try {
1705       local $SIG{__WARN__} = sub {};
1706       my @r = $sth->fetchrow_array;
1707       $sth->finish;
1708       @r;
1709     } unless @ir_container;
1710
1711     @returned_cols{@$retlist} = @ir_container if @ir_container;
1712   }
1713
1714   return { %$prefetched_values, %returned_cols };
1715 }
1716
1717
1718 ## Currently it is assumed that all values passed will be "normal", i.e. not
1719 ## scalar refs, or at least, all the same type as the first set, the statement is
1720 ## only prepped once.
1721 sub insert_bulk {
1722   my ($self, $source, $cols, $data) = @_;
1723
1724   my %colvalues;
1725   @colvalues{@$cols} = (0..$#$cols);
1726
1727   for my $i (0..$#$cols) {
1728     my $first_val = $data->[0][$i];
1729     next unless ref $first_val eq 'SCALAR';
1730
1731     $colvalues{ $cols->[$i] } = $first_val;
1732   }
1733
1734   # check for bad data and stringify stringifiable objects
1735   my $bad_slice = sub {
1736     my ($msg, $col_idx, $slice_idx) = @_;
1737     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1738       $msg,
1739       $cols->[$col_idx],
1740       do {
1741         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1742         Dumper {
1743           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1744         },
1745       }
1746     );
1747   };
1748
1749   for my $datum_idx (0..$#$data) {
1750     my $datum = $data->[$datum_idx];
1751
1752     for my $col_idx (0..$#$cols) {
1753       my $val            = $datum->[$col_idx];
1754       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1755       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1756
1757       if ($is_literal_sql) {
1758         if (not ref $val) {
1759           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1760         }
1761         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1762           $bad_slice->("$reftype reference found where literal SQL expected",
1763             $col_idx, $datum_idx);
1764         }
1765         elsif ($$val ne $$sqla_bind){
1766           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1767             $col_idx, $datum_idx);
1768         }
1769       }
1770       elsif (my $reftype = ref $val) {
1771         require overload;
1772         if (overload::Method($val, '""')) {
1773           $datum->[$col_idx] = "".$val;
1774         }
1775         else {
1776           $bad_slice->("$reftype reference found where bind expected",
1777             $col_idx, $datum_idx);
1778         }
1779       }
1780     }
1781   }
1782
1783   my ($sql, $bind) = $self->_prep_for_execute (
1784     'insert', undef, $source, [\%colvalues]
1785   );
1786
1787   if (! @$bind) {
1788     # if the bindlist is empty - make sure all "values" are in fact
1789     # literal scalarrefs. If not the case this means the storage ate
1790     # them away (e.g. the NoBindVars component) and interpolated them
1791     # directly into the SQL. This obviosly can't be good for multi-inserts
1792
1793     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1794       if first { ref $_ ne 'SCALAR' } values %colvalues;
1795   }
1796
1797   # neither _execute_array, nor _execute_inserts_with_no_binds are
1798   # atomic (even if _execute _array is a single call). Thus a safety
1799   # scope guard
1800   my $guard = $self->txn_scope_guard;
1801
1802   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1803   my $sth = $self->sth($sql);
1804   my $rv = do {
1805     if (@$bind) {
1806       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1807       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1808     }
1809     else {
1810       # bind_param_array doesn't work if there are no binds
1811       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1812     }
1813   };
1814
1815   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1816
1817   $guard->commit;
1818
1819   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1820 }
1821
1822 sub _execute_array {
1823   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1824
1825   ## This must be an arrayref, else nothing works!
1826   my $tuple_status = [];
1827
1828   ## Get the bind_attributes, if any exist
1829   my $bind_attributes = $self->source_bind_attributes($source);
1830
1831   ## Bind the values and execute
1832   my $placeholder_index = 1;
1833
1834   foreach my $bound (@$bind) {
1835
1836     my $attributes = {};
1837     my ($column_name, $data_index) = @$bound;
1838
1839     if( $bind_attributes ) {
1840       $attributes = $bind_attributes->{$column_name}
1841       if defined $bind_attributes->{$column_name};
1842     }
1843
1844     my @data = map { $_->[$data_index] } @$data;
1845
1846     $sth->bind_param_array(
1847       $placeholder_index,
1848       [@data],
1849       (%$attributes ?  $attributes : ()),
1850     );
1851     $placeholder_index++;
1852   }
1853
1854   my ($rv, $err);
1855   try {
1856     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1857   }
1858   catch {
1859     $err = shift;
1860   };
1861
1862   # Not all DBDs are create equal. Some throw on error, some return
1863   # an undef $rv, and some set $sth->err - try whatever we can
1864   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1865     ! defined $err
1866       and
1867     ( !defined $rv or $sth->err )
1868   );
1869
1870   # Statement must finish even if there was an exception.
1871   try {
1872     $sth->finish
1873   }
1874   catch {
1875     $err = shift unless defined $err
1876   };
1877
1878   if (defined $err) {
1879     my $i = 0;
1880     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1881
1882     $self->throw_exception("Unexpected populate error: $err")
1883       if ($i > $#$tuple_status);
1884
1885     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1886       ($tuple_status->[$i][1] || $err),
1887       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1888     );
1889   }
1890
1891   return $rv;
1892 }
1893
1894 sub _dbh_execute_array {
1895     my ($self, $sth, $tuple_status, @extra) = @_;
1896
1897     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1898 }
1899
1900 sub _dbh_execute_inserts_with_no_binds {
1901   my ($self, $sth, $count) = @_;
1902
1903   my $err;
1904   try {
1905     my $dbh = $self->_get_dbh;
1906     local $dbh->{RaiseError} = 1;
1907     local $dbh->{PrintError} = 0;
1908
1909     $sth->execute foreach 1..$count;
1910   }
1911   catch {
1912     $err = shift;
1913   };
1914
1915   # Make sure statement is finished even if there was an exception.
1916   try {
1917     $sth->finish
1918   }
1919   catch {
1920     $err = shift unless defined $err;
1921   };
1922
1923   $self->throw_exception($err) if defined $err;
1924
1925   return $count;
1926 }
1927
1928 sub update {
1929   my ($self, $source, @args) = @_;
1930
1931   my $bind_attrs = $self->source_bind_attributes($source);
1932
1933   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1934 }
1935
1936
1937 sub delete {
1938   my ($self, $source, @args) = @_;
1939
1940   my $bind_attrs = $self->source_bind_attributes($source);
1941
1942   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1943 }
1944
1945 # We were sent here because the $rs contains a complex search
1946 # which will require a subquery to select the correct rows
1947 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1948 #
1949 # Generating a single PK column subquery is trivial and supported
1950 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1951 # Look at _multipk_update_delete()
1952 sub _subq_update_delete {
1953   my $self = shift;
1954   my ($rs, $op, $values) = @_;
1955
1956   my $rsrc = $rs->result_source;
1957
1958   # quick check if we got a sane rs on our hands
1959   my @pcols = $rsrc->_pri_cols;
1960
1961   my $sel = $rs->_resolved_attrs->{select};
1962   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1963
1964   if (
1965       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1966         ne
1967       join ("\x00", sort @$sel )
1968   ) {
1969     $self->throw_exception (
1970       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1971     );
1972   }
1973
1974   if (@pcols == 1) {
1975     return $self->$op (
1976       $rsrc,
1977       $op eq 'update' ? $values : (),
1978       { $pcols[0] => { -in => $rs->as_query } },
1979     );
1980   }
1981
1982   else {
1983     return $self->_multipk_update_delete (@_);
1984   }
1985 }
1986
1987 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1988 # resultset update/delete involving subqueries. So by default resort
1989 # to simple (and inefficient) delete_all style per-row opearations,
1990 # while allowing specific storages to override this with a faster
1991 # implementation.
1992 #
1993 sub _multipk_update_delete {
1994   return shift->_per_row_update_delete (@_);
1995 }
1996
1997 # This is the default loop used to delete/update rows for multi PK
1998 # resultsets, and used by mysql exclusively (because it can't do anything
1999 # else).
2000 #
2001 # We do not use $row->$op style queries, because resultset update/delete
2002 # is not expected to cascade (this is what delete_all/update_all is for).
2003 #
2004 # There should be no race conditions as the entire operation is rolled
2005 # in a transaction.
2006 #
2007 sub _per_row_update_delete {
2008   my $self = shift;
2009   my ($rs, $op, $values) = @_;
2010
2011   my $rsrc = $rs->result_source;
2012   my @pcols = $rsrc->_pri_cols;
2013
2014   my $guard = $self->txn_scope_guard;
2015
2016   # emulate the return value of $sth->execute for non-selects
2017   my $row_cnt = '0E0';
2018
2019   my $subrs_cur = $rs->cursor;
2020   my @all_pk = $subrs_cur->all;
2021   for my $pks ( @all_pk) {
2022
2023     my $cond;
2024     for my $i (0.. $#pcols) {
2025       $cond->{$pcols[$i]} = $pks->[$i];
2026     }
2027
2028     $self->$op (
2029       $rsrc,
2030       $op eq 'update' ? $values : (),
2031       $cond,
2032     );
2033
2034     $row_cnt++;
2035   }
2036
2037   $guard->commit;
2038
2039   return $row_cnt;
2040 }
2041
2042 sub _select {
2043   my $self = shift;
2044   $self->_execute($self->_select_args(@_));
2045 }
2046
2047 sub _select_args_to_query {
2048   my $self = shift;
2049
2050   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2051   #  = $self->_select_args($ident, $select, $cond, $attrs);
2052   my ($op, $bind, $ident, $bind_attrs, @args) =
2053     $self->_select_args(@_);
2054
2055   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2056   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2057   $prepared_bind ||= [];
2058
2059   return wantarray
2060     ? ($sql, $prepared_bind, $bind_attrs)
2061     : \[ "($sql)", @$prepared_bind ]
2062   ;
2063 }
2064
2065 sub _select_args {
2066   my ($self, $ident, $select, $where, $attrs) = @_;
2067
2068   my $sql_maker = $self->sql_maker;
2069   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2070
2071   $attrs = {
2072     %$attrs,
2073     select => $select,
2074     from => $ident,
2075     where => $where,
2076     $rs_alias && $alias2source->{$rs_alias}
2077       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2078       : ()
2079     ,
2080   };
2081
2082   # calculate bind_attrs before possible $ident mangling
2083   my $bind_attrs = {};
2084   for my $alias (keys %$alias2source) {
2085     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2086     for my $col (keys %$bindtypes) {
2087
2088       my $fqcn = join ('.', $alias, $col);
2089       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2090
2091       # Unqialified column names are nice, but at the same time can be
2092       # rather ambiguous. What we do here is basically go along with
2093       # the loop, adding an unqualified column slot to $bind_attrs,
2094       # alongside the fully qualified name. As soon as we encounter
2095       # another column by that name (which would imply another table)
2096       # we unset the unqualified slot and never add any info to it
2097       # to avoid erroneous type binding. If this happens the users
2098       # only choice will be to fully qualify his column name
2099
2100       if (exists $bind_attrs->{$col}) {
2101         $bind_attrs->{$col} = {};
2102       }
2103       else {
2104         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2105       }
2106     }
2107   }
2108
2109   # Sanity check the attributes (SQLMaker does it too, but
2110   # in case of a software_limit we'll never reach there)
2111   if (defined $attrs->{offset}) {
2112     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2113       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2114   }
2115   $attrs->{offset} ||= 0;
2116
2117   if (defined $attrs->{rows}) {
2118     $self->throw_exception("The rows attribute must be a positive integer if present")
2119       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2120   }
2121   elsif ($attrs->{offset}) {
2122     # MySQL actually recommends this approach.  I cringe.
2123     $attrs->{rows} = $sql_maker->__max_int;
2124   }
2125
2126   my @limit;
2127
2128   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2129   # storage, unless software limit was requested
2130   if (
2131     #limited has_many
2132     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2133        ||
2134     # grouped prefetch (to satisfy group_by == select)
2135     ( $attrs->{group_by}
2136         &&
2137       @{$attrs->{group_by}}
2138         &&
2139       $attrs->{_prefetch_selector_range}
2140     )
2141   ) {
2142     ($ident, $select, $where, $attrs)
2143       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2144   }
2145   elsif (! $attrs->{software_limit} ) {
2146     push @limit, $attrs->{rows}, $attrs->{offset};
2147   }
2148
2149   # try to simplify the joinmap further (prune unreferenced type-single joins)
2150   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2151
2152 ###
2153   # This would be the point to deflate anything found in $where
2154   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2155   # expect a row object. And all we have is a resultsource (it is trivial
2156   # to extract deflator coderefs via $alias2source above).
2157   #
2158   # I don't see a way forward other than changing the way deflators are
2159   # invoked, and that's just bad...
2160 ###
2161
2162   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2163 }
2164
2165 # Returns a counting SELECT for a simple count
2166 # query. Abstracted so that a storage could override
2167 # this to { count => 'firstcol' } or whatever makes
2168 # sense as a performance optimization
2169 sub _count_select {
2170   #my ($self, $source, $rs_attrs) = @_;
2171   return { count => '*' };
2172 }
2173
2174
2175 sub source_bind_attributes {
2176   my ($self, $source) = @_;
2177
2178   my $bind_attributes;
2179
2180   my $colinfo = $source->columns_info;
2181
2182   for my $col (keys %$colinfo) {
2183     if (my $dt = $colinfo->{$col}{data_type} ) {
2184       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2185     }
2186   }
2187
2188   return $bind_attributes;
2189 }
2190
2191 =head2 select
2192
2193 =over 4
2194
2195 =item Arguments: $ident, $select, $condition, $attrs
2196
2197 =back
2198
2199 Handle a SQL select statement.
2200
2201 =cut
2202
2203 sub select {
2204   my $self = shift;
2205   my ($ident, $select, $condition, $attrs) = @_;
2206   return $self->cursor_class->new($self, \@_, $attrs);
2207 }
2208
2209 sub select_single {
2210   my $self = shift;
2211   my ($rv, $sth, @bind) = $self->_select(@_);
2212   my @row = $sth->fetchrow_array;
2213   my @nextrow = $sth->fetchrow_array if @row;
2214   if(@row && @nextrow) {
2215     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2216   }
2217   # Need to call finish() to work round broken DBDs
2218   $sth->finish();
2219   return @row;
2220 }
2221
2222 =head2 sql_limit_dialect
2223
2224 This is an accessor for the default SQL limit dialect used by a particular
2225 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2226 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2227 see L<DBIx::Class::SQLMaker::LimitDialects>.
2228
2229 =head2 sth
2230
2231 =over 4
2232
2233 =item Arguments: $sql
2234
2235 =back
2236
2237 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2238
2239 =cut
2240
2241 sub _dbh_sth {
2242   my ($self, $dbh, $sql) = @_;
2243
2244   # 3 is the if_active parameter which avoids active sth re-use
2245   my $sth = $self->disable_sth_caching
2246     ? $dbh->prepare($sql)
2247     : $dbh->prepare_cached($sql, {}, 3);
2248
2249   # XXX You would think RaiseError would make this impossible,
2250   #  but apparently that's not true :(
2251   $self->throw_exception($dbh->errstr) if !$sth;
2252
2253   $sth;
2254 }
2255
2256 sub sth {
2257   my ($self, $sql) = @_;
2258   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2259 }
2260
2261 sub _dbh_columns_info_for {
2262   my ($self, $dbh, $table) = @_;
2263
2264   if ($dbh->can('column_info')) {
2265     my %result;
2266     my $caught;
2267     try {
2268       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2269       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2270       $sth->execute();
2271       while ( my $info = $sth->fetchrow_hashref() ){
2272         my %column_info;
2273         $column_info{data_type}   = $info->{TYPE_NAME};
2274         $column_info{size}      = $info->{COLUMN_SIZE};
2275         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2276         $column_info{default_value} = $info->{COLUMN_DEF};
2277         my $col_name = $info->{COLUMN_NAME};
2278         $col_name =~ s/^\"(.*)\"$/$1/;
2279
2280         $result{$col_name} = \%column_info;
2281       }
2282     } catch {
2283       $caught = 1;
2284     };
2285     return \%result if !$caught && scalar keys %result;
2286   }
2287
2288   my %result;
2289   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2290   $sth->execute;
2291   my @columns = @{$sth->{NAME_lc}};
2292   for my $i ( 0 .. $#columns ){
2293     my %column_info;
2294     $column_info{data_type} = $sth->{TYPE}->[$i];
2295     $column_info{size} = $sth->{PRECISION}->[$i];
2296     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2297
2298     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2299       $column_info{data_type} = $1;
2300       $column_info{size}    = $2;
2301     }
2302
2303     $result{$columns[$i]} = \%column_info;
2304   }
2305   $sth->finish;
2306
2307   foreach my $col (keys %result) {
2308     my $colinfo = $result{$col};
2309     my $type_num = $colinfo->{data_type};
2310     my $type_name;
2311     if(defined $type_num && $dbh->can('type_info')) {
2312       my $type_info = $dbh->type_info($type_num);
2313       $type_name = $type_info->{TYPE_NAME} if $type_info;
2314       $colinfo->{data_type} = $type_name if $type_name;
2315     }
2316   }
2317
2318   return \%result;
2319 }
2320
2321 sub columns_info_for {
2322   my ($self, $table) = @_;
2323   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2324 }
2325
2326 =head2 last_insert_id
2327
2328 Return the row id of the last insert.
2329
2330 =cut
2331
2332 sub _dbh_last_insert_id {
2333     my ($self, $dbh, $source, $col) = @_;
2334
2335     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2336
2337     return $id if defined $id;
2338
2339     my $class = ref $self;
2340     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2341 }
2342
2343 sub last_insert_id {
2344   my $self = shift;
2345   $self->_dbh_last_insert_id ($self->_dbh, @_);
2346 }
2347
2348 =head2 _native_data_type
2349
2350 =over 4
2351
2352 =item Arguments: $type_name
2353
2354 =back
2355
2356 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2357 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2358 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2359
2360 The default implementation returns C<undef>, implement in your Storage driver if
2361 you need this functionality.
2362
2363 Should map types from other databases to the native RDBMS type, for example
2364 C<VARCHAR2> to C<VARCHAR>.
2365
2366 Types with modifiers should map to the underlying data type. For example,
2367 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2368
2369 Composite types should map to the container type, for example
2370 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2371
2372 =cut
2373
2374 sub _native_data_type {
2375   #my ($self, $data_type) = @_;
2376   return undef
2377 }
2378
2379 # Check if placeholders are supported at all
2380 sub _determine_supports_placeholders {
2381   my $self = shift;
2382   my $dbh  = $self->_get_dbh;
2383
2384   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2385   # but it is inaccurate more often than not
2386   return try {
2387     local $dbh->{PrintError} = 0;
2388     local $dbh->{RaiseError} = 1;
2389     $dbh->do('select ?', {}, 1);
2390     1;
2391   }
2392   catch {
2393     0;
2394   };
2395 }
2396
2397 # Check if placeholders bound to non-string types throw exceptions
2398 #
2399 sub _determine_supports_typeless_placeholders {
2400   my $self = shift;
2401   my $dbh  = $self->_get_dbh;
2402
2403   return try {
2404     local $dbh->{PrintError} = 0;
2405     local $dbh->{RaiseError} = 1;
2406     # this specifically tests a bind that is NOT a string
2407     $dbh->do('select 1 where 1 = ?', {}, 1);
2408     1;
2409   }
2410   catch {
2411     0;
2412   };
2413 }
2414
2415 =head2 sqlt_type
2416
2417 Returns the database driver name.
2418
2419 =cut
2420
2421 sub sqlt_type {
2422   shift->_get_dbh->{Driver}->{Name};
2423 }
2424
2425 =head2 bind_attribute_by_data_type
2426
2427 Given a datatype from column info, returns a database specific bind
2428 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2429 let the database planner just handle it.
2430
2431 Generally only needed for special case column types, like bytea in postgres.
2432
2433 =cut
2434
2435 sub bind_attribute_by_data_type {
2436     return;
2437 }
2438
2439 =head2 is_datatype_numeric
2440
2441 Given a datatype from column_info, returns a boolean value indicating if
2442 the current RDBMS considers it a numeric value. This controls how
2443 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2444 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2445 be performed instead of the usual C<eq>.
2446
2447 =cut
2448
2449 sub is_datatype_numeric {
2450   my ($self, $dt) = @_;
2451
2452   return 0 unless $dt;
2453
2454   return $dt =~ /^ (?:
2455     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2456   ) $/ix;
2457 }
2458
2459
2460 =head2 create_ddl_dir
2461
2462 =over 4
2463
2464 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2465
2466 =back
2467
2468 Creates a SQL file based on the Schema, for each of the specified
2469 database engines in C<\@databases> in the given directory.
2470 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2471
2472 Given a previous version number, this will also create a file containing
2473 the ALTER TABLE statements to transform the previous schema into the
2474 current one. Note that these statements may contain C<DROP TABLE> or
2475 C<DROP COLUMN> statements that can potentially destroy data.
2476
2477 The file names are created using the C<ddl_filename> method below, please
2478 override this method in your schema if you would like a different file
2479 name format. For the ALTER file, the same format is used, replacing
2480 $version in the name with "$preversion-$version".
2481
2482 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2483 The most common value for this would be C<< { add_drop_table => 1 } >>
2484 to have the SQL produced include a C<DROP TABLE> statement for each table
2485 created. For quoting purposes supply C<quote_table_names> and
2486 C<quote_field_names>.
2487
2488 If no arguments are passed, then the following default values are assumed:
2489
2490 =over 4
2491
2492 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2493
2494 =item version    - $schema->schema_version
2495
2496 =item directory  - './'
2497
2498 =item preversion - <none>
2499
2500 =back
2501
2502 By default, C<\%sqlt_args> will have
2503
2504  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2505
2506 merged with the hash passed in. To disable any of those features, pass in a
2507 hashref like the following
2508
2509  { ignore_constraint_names => 0, # ... other options }
2510
2511
2512 WARNING: You are strongly advised to check all SQL files created, before applying
2513 them.
2514
2515 =cut
2516
2517 sub create_ddl_dir {
2518   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2519
2520   unless ($dir) {
2521     carp "No directory given, using ./\n";
2522     $dir = './';
2523   } else {
2524       -d $dir
2525         or
2526       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2527         or
2528       $self->throw_exception(
2529         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2530       );
2531   }
2532
2533   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2534
2535   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2536   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2537
2538   my $schema_version = $schema->schema_version || '1.x';
2539   $version ||= $schema_version;
2540
2541   $sqltargs = {
2542     add_drop_table => 1,
2543     ignore_constraint_names => 1,
2544     ignore_index_names => 1,
2545     %{$sqltargs || {}}
2546   };
2547
2548   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2549     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2550   }
2551
2552   my $sqlt = SQL::Translator->new( $sqltargs );
2553
2554   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2555   my $sqlt_schema = $sqlt->translate({ data => $schema })
2556     or $self->throw_exception ($sqlt->error);
2557
2558   foreach my $db (@$databases) {
2559     $sqlt->reset();
2560     $sqlt->{schema} = $sqlt_schema;
2561     $sqlt->producer($db);
2562
2563     my $file;
2564     my $filename = $schema->ddl_filename($db, $version, $dir);
2565     if (-e $filename && ($version eq $schema_version )) {
2566       # if we are dumping the current version, overwrite the DDL
2567       carp "Overwriting existing DDL file - $filename";
2568       unlink($filename);
2569     }
2570
2571     my $output = $sqlt->translate;
2572     if(!$output) {
2573       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2574       next;
2575     }
2576     if(!open($file, ">$filename")) {
2577       $self->throw_exception("Can't open $filename for writing ($!)");
2578       next;
2579     }
2580     print $file $output;
2581     close($file);
2582
2583     next unless ($preversion);
2584
2585     require SQL::Translator::Diff;
2586
2587     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2588     if(!-e $prefilename) {
2589       carp("No previous schema file found ($prefilename)");
2590       next;
2591     }
2592
2593     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2594     if(-e $difffile) {
2595       carp("Overwriting existing diff file - $difffile");
2596       unlink($difffile);
2597     }
2598
2599     my $source_schema;
2600     {
2601       my $t = SQL::Translator->new($sqltargs);
2602       $t->debug( 0 );
2603       $t->trace( 0 );
2604
2605       $t->parser( $db )
2606         or $self->throw_exception ($t->error);
2607
2608       my $out = $t->translate( $prefilename )
2609         or $self->throw_exception ($t->error);
2610
2611       $source_schema = $t->schema;
2612
2613       $source_schema->name( $prefilename )
2614         unless ( $source_schema->name );
2615     }
2616
2617     # The "new" style of producers have sane normalization and can support
2618     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2619     # And we have to diff parsed SQL against parsed SQL.
2620     my $dest_schema = $sqlt_schema;
2621
2622     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2623       my $t = SQL::Translator->new($sqltargs);
2624       $t->debug( 0 );
2625       $t->trace( 0 );
2626
2627       $t->parser( $db )
2628         or $self->throw_exception ($t->error);
2629
2630       my $out = $t->translate( $filename )
2631         or $self->throw_exception ($t->error);
2632
2633       $dest_schema = $t->schema;
2634
2635       $dest_schema->name( $filename )
2636         unless $dest_schema->name;
2637     }
2638
2639     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2640                                                   $dest_schema,   $db,
2641                                                   $sqltargs
2642                                                  );
2643     if(!open $file, ">$difffile") {
2644       $self->throw_exception("Can't write to $difffile ($!)");
2645       next;
2646     }
2647     print $file $diff;
2648     close($file);
2649   }
2650 }
2651
2652 =head2 deployment_statements
2653
2654 =over 4
2655
2656 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2657
2658 =back
2659
2660 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2661
2662 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2663 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2664
2665 C<$directory> is used to return statements from files in a previously created
2666 L</create_ddl_dir> directory and is optional. The filenames are constructed
2667 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2668
2669 If no C<$directory> is specified then the statements are constructed on the
2670 fly using L<SQL::Translator> and C<$version> is ignored.
2671
2672 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2673
2674 =cut
2675
2676 sub deployment_statements {
2677   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2678   $type ||= $self->sqlt_type;
2679   $version ||= $schema->schema_version || '1.x';
2680   $dir ||= './';
2681   my $filename = $schema->ddl_filename($type, $version, $dir);
2682   if(-f $filename)
2683   {
2684       my $file;
2685       open($file, "<$filename")
2686         or $self->throw_exception("Can't open $filename ($!)");
2687       my @rows = <$file>;
2688       close($file);
2689       return join('', @rows);
2690   }
2691
2692   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2693     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2694   }
2695
2696   # sources needs to be a parser arg, but for simplicty allow at top level
2697   # coming in
2698   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2699       if exists $sqltargs->{sources};
2700
2701   my $tr = SQL::Translator->new(
2702     producer => "SQL::Translator::Producer::${type}",
2703     %$sqltargs,
2704     parser => 'SQL::Translator::Parser::DBIx::Class',
2705     data => $schema,
2706   );
2707
2708   my @ret;
2709   if (wantarray) {
2710     @ret = $tr->translate;
2711   }
2712   else {
2713     $ret[0] = $tr->translate;
2714   }
2715
2716   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2717     unless (@ret && defined $ret[0]);
2718
2719   return wantarray ? @ret : $ret[0];
2720 }
2721
2722 sub deploy {
2723   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2724   my $deploy = sub {
2725     my $line = shift;
2726     return if($line =~ /^--/);
2727     return if(!$line);
2728     # next if($line =~ /^DROP/m);
2729     return if($line =~ /^BEGIN TRANSACTION/m);
2730     return if($line =~ /^COMMIT/m);
2731     return if $line =~ /^\s+$/; # skip whitespace only
2732     $self->_query_start($line);
2733     try {
2734       # do a dbh_do cycle here, as we need some error checking in
2735       # place (even though we will ignore errors)
2736       $self->dbh_do (sub { $_[1]->do($line) });
2737     } catch {
2738       carp qq{$_ (running "${line}")};
2739     };
2740     $self->_query_end($line);
2741   };
2742   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2743   if (@statements > 1) {
2744     foreach my $statement (@statements) {
2745       $deploy->( $statement );
2746     }
2747   }
2748   elsif (@statements == 1) {
2749     foreach my $line ( split(";\n", $statements[0])) {
2750       $deploy->( $line );
2751     }
2752   }
2753 }
2754
2755 =head2 datetime_parser
2756
2757 Returns the datetime parser class
2758
2759 =cut
2760
2761 sub datetime_parser {
2762   my $self = shift;
2763   return $self->{datetime_parser} ||= do {
2764     $self->build_datetime_parser(@_);
2765   };
2766 }
2767
2768 =head2 datetime_parser_type
2769
2770 Defines (returns) the datetime parser class - currently hardwired to
2771 L<DateTime::Format::MySQL>
2772
2773 =cut
2774
2775 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2776
2777 =head2 build_datetime_parser
2778
2779 See L</datetime_parser>
2780
2781 =cut
2782
2783 sub build_datetime_parser {
2784   my $self = shift;
2785   my $type = $self->datetime_parser_type(@_);
2786   $self->ensure_class_loaded ($type);
2787   return $type;
2788 }
2789
2790
2791 =head2 is_replicating
2792
2793 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2794 replicate from a master database.  Default is undef, which is the result
2795 returned by databases that don't support replication.
2796
2797 =cut
2798
2799 sub is_replicating {
2800     return;
2801
2802 }
2803
2804 =head2 lag_behind_master
2805
2806 Returns a number that represents a certain amount of lag behind a master db
2807 when a given storage is replicating.  The number is database dependent, but
2808 starts at zero and increases with the amount of lag. Default in undef
2809
2810 =cut
2811
2812 sub lag_behind_master {
2813     return;
2814 }
2815
2816 =head2 relname_to_table_alias
2817
2818 =over 4
2819
2820 =item Arguments: $relname, $join_count
2821
2822 =back
2823
2824 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2825 queries.
2826
2827 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2828 way these aliases are named.
2829
2830 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2831 otherwise C<"$relname">.
2832
2833 =cut
2834
2835 sub relname_to_table_alias {
2836   my ($self, $relname, $join_count) = @_;
2837
2838   my $alias = ($join_count && $join_count > 1 ?
2839     join('_', $relname, $join_count) : $relname);
2840
2841   return $alias;
2842 }
2843
2844 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2845 # version and it may be necessary to amend or override it for a specific storage
2846 # if such binds are necessary.
2847 sub _max_column_bytesize {
2848   my ($self, $source, $col) = @_;
2849
2850   my $inf = $source->column_info($col);
2851   return $inf->{_max_bytesize} ||= do {
2852
2853     my $max_size;
2854
2855     if (my $data_type = $inf->{data_type}) {
2856       $data_type = lc($data_type);
2857
2858       # String/sized-binary types
2859       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2860                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2861       ) {
2862         $max_size = $inf->{size};
2863       }
2864       # Other charset/unicode types, assume scale of 4
2865       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2866                               |univarchar
2867                               |nvarchar)\b/x
2868       ) {
2869         $max_size = $inf->{size} * 4 if $inf->{size};
2870       }
2871       # Blob types
2872       elsif ($self->_is_lob_type($data_type)) {
2873         # default to longreadlen
2874       }
2875       else {
2876         $max_size = 100;  # for all other (numeric?) datatypes
2877       }
2878     }
2879
2880     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2881   };
2882 }
2883
2884 # Determine if a data_type is some type of BLOB
2885 sub _is_lob_type {
2886   my ($self, $data_type) = @_;
2887   $data_type && ($data_type =~ /(?:lob|bfile|text|image|bytea|memo)/i
2888     || $data_type =~ /^long(?:\s*(?:raw|bit\s*varying|varbit|binary
2889                                   |varchar|character\s*varying|nvarchar
2890                                   |national\s*character\s*varying))?$/xi);
2891 }
2892
2893 1;
2894
2895 =head1 USAGE NOTES
2896
2897 =head2 DBIx::Class and AutoCommit
2898
2899 DBIx::Class can do some wonderful magic with handling exceptions,
2900 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2901 (the default) combined with C<txn_do> for transaction support.
2902
2903 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2904 in an assumed transaction between commits, and you're telling us you'd
2905 like to manage that manually.  A lot of the magic protections offered by
2906 this module will go away.  We can't protect you from exceptions due to database
2907 disconnects because we don't know anything about how to restart your
2908 transactions.  You're on your own for handling all sorts of exceptional
2909 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2910 be with raw DBI.
2911
2912
2913 =head1 AUTHORS
2914
2915 Matt S. Trout <mst@shadowcatsystems.co.uk>
2916
2917 Andy Grundman <andy@hybridized.org>
2918
2919 =head1 LICENSE
2920
2921 You may distribute this code under the same terms as Perl itself.
2922
2923 =cut