Cleanup/extend blob/clob detection/checks
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use overload ();
20 use namespace::clean;
21
22
23 # default cursor class, overridable in connect_info attributes
24 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
25
26 __PACKAGE__->mk_group_accessors('inherited' => qw/
27   sql_maker_class sql_limit_dialect sql_quote_char sql_name_sep
28 /);
29
30 __PACKAGE__->sql_name_sep('.');
31
32 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
33
34 __PACKAGE__->mk_group_accessors('simple' => qw/
35   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
36   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
37   transaction_depth _dbh_autocommit  savepoints
38 /);
39
40 # the values for these accessors are picked out (and deleted) from
41 # the attribute hashref passed to connect_info
42 my @storage_options = qw/
43   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
44   disable_sth_caching unsafe auto_savepoint
45 /;
46 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
47
48
49 # capability definitions, using a 2-tiered accessor system
50 # The rationale is:
51 #
52 # A driver/user may define _use_X, which blindly without any checks says:
53 # "(do not) use this capability", (use_dbms_capability is an "inherited"
54 # type accessor)
55 #
56 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
57 # accessor, which in turn calls _determine_supports_X, and stores the return
58 # in a special slot on the storage object, which is wiped every time a $dbh
59 # reconnection takes place (it is not guaranteed that upon reconnection we
60 # will get the same rdbms version). _determine_supports_X does not need to
61 # exist on a driver, as we ->can for it before calling.
62
63 my @capabilities = (qw/
64   insert_returning
65   insert_returning_bound
66   placeholders
67   typeless_placeholders
68   join_optimizer
69 /);
70 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
71 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
72
73 # on by default, not strictly a capability (pending rewrite)
74 __PACKAGE__->_use_join_optimizer (1);
75 sub _determine_supports_join_optimizer { 1 };
76
77 # Each of these methods need _determine_driver called before itself
78 # in order to function reliably. This is a purely DRY optimization
79 #
80 # get_(use)_dbms_capability need to be called on the correct Storage
81 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
82 # _determine_supports_X which obv. needs a correct driver as well
83 my @rdbms_specific_methods = qw/
84   deployment_statements
85   sqlt_type
86   sql_maker
87   build_datetime_parser
88   datetime_parser_type
89
90   insert
91   insert_bulk
92   update
93   delete
94   select
95   select_single
96
97   get_use_dbms_capability
98   get_dbms_capability
99
100   _server_info
101   _get_server_version
102 /;
103
104 for my $meth (@rdbms_specific_methods) {
105
106   my $orig = __PACKAGE__->can ($meth)
107     or die "$meth is not a ::Storage::DBI method!";
108
109   no strict qw/refs/;
110   no warnings qw/redefine/;
111   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
112     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
113       $_[0]->_determine_driver;
114
115       # This for some reason crashes and burns on perl 5.8.1
116       # IFF the method ends up throwing an exception
117       #goto $_[0]->can ($meth);
118
119       my $cref = $_[0]->can ($meth);
120       goto $cref;
121     }
122     goto $orig;
123   };
124 }
125
126
127 =head1 NAME
128
129 DBIx::Class::Storage::DBI - DBI storage handler
130
131 =head1 SYNOPSIS
132
133   my $schema = MySchema->connect('dbi:SQLite:my.db');
134
135   $schema->storage->debug(1);
136
137   my @stuff = $schema->storage->dbh_do(
138     sub {
139       my ($storage, $dbh, @args) = @_;
140       $dbh->do("DROP TABLE authors");
141     },
142     @column_list
143   );
144
145   $schema->resultset('Book')->search({
146      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
147   });
148
149 =head1 DESCRIPTION
150
151 This class represents the connection to an RDBMS via L<DBI>.  See
152 L<DBIx::Class::Storage> for general information.  This pod only
153 documents DBI-specific methods and behaviors.
154
155 =head1 METHODS
156
157 =cut
158
159 sub new {
160   my $new = shift->next::method(@_);
161
162   $new->transaction_depth(0);
163   $new->_sql_maker_opts({});
164   $new->_dbh_details({});
165   $new->{savepoints} = [];
166   $new->{_in_dbh_do} = 0;
167   $new->{_dbh_gen} = 0;
168
169   # read below to see what this does
170   $new->_arm_global_destructor;
171
172   $new;
173 }
174
175 # This is hack to work around perl shooting stuff in random
176 # order on exit(). If we do not walk the remaining storage
177 # objects in an END block, there is a *small but real* chance
178 # of a fork()ed child to kill the parent's shared DBI handle,
179 # *before perl reaches the DESTROY in this package*
180 # Yes, it is ugly and effective.
181 # Additionally this registry is used by the CLONE method to
182 # make sure no handles are shared between threads
183 {
184   my %seek_and_destroy;
185
186   sub _arm_global_destructor {
187     my $self = shift;
188     my $key = refaddr ($self);
189     $seek_and_destroy{$key} = $self;
190     weaken ($seek_and_destroy{$key});
191   }
192
193   END {
194     local $?; # just in case the DBI destructor changes it somehow
195
196     # destroy just the object if not native to this process/thread
197     $_->_verify_pid for (grep
198       { defined $_ }
199       values %seek_and_destroy
200     );
201   }
202
203   sub CLONE {
204     # As per DBI's recommendation, DBIC disconnects all handles as
205     # soon as possible (DBIC will reconnect only on demand from within
206     # the thread)
207     for (values %seek_and_destroy) {
208       next unless $_;
209       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
210       $_->_dbh(undef);
211     }
212   }
213 }
214
215 sub DESTROY {
216   my $self = shift;
217
218   # some databases spew warnings on implicit disconnect
219   local $SIG{__WARN__} = sub {};
220   $self->_dbh(undef);
221
222   # this op is necessary, since the very last perl runtime statement
223   # triggers a global destruction shootout, and the $SIG localization
224   # may very well be destroyed before perl actually gets to do the
225   # $dbh undef
226   1;
227 }
228
229 # handle pid changes correctly - do not destroy parent's connection
230 sub _verify_pid {
231   my $self = shift;
232
233   my $pid = $self->_conn_pid;
234   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
235     $dbh->{InactiveDestroy} = 1;
236     $self->{_dbh_gen}++;
237     $self->_dbh(undef);
238   }
239
240   return;
241 }
242
243 =head2 connect_info
244
245 This method is normally called by L<DBIx::Class::Schema/connection>, which
246 encapsulates its argument list in an arrayref before passing them here.
247
248 The argument list may contain:
249
250 =over
251
252 =item *
253
254 The same 4-element argument set one would normally pass to
255 L<DBI/connect>, optionally followed by
256 L<extra attributes|/DBIx::Class specific connection attributes>
257 recognized by DBIx::Class:
258
259   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
260
261 =item *
262
263 A single code reference which returns a connected
264 L<DBI database handle|DBI/connect> optionally followed by
265 L<extra attributes|/DBIx::Class specific connection attributes> recognized
266 by DBIx::Class:
267
268   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
269
270 =item *
271
272 A single hashref with all the attributes and the dsn/user/password
273 mixed together:
274
275   $connect_info_args = [{
276     dsn => $dsn,
277     user => $user,
278     password => $pass,
279     %dbi_attributes,
280     %extra_attributes,
281   }];
282
283   $connect_info_args = [{
284     dbh_maker => sub { DBI->connect (...) },
285     %dbi_attributes,
286     %extra_attributes,
287   }];
288
289 This is particularly useful for L<Catalyst> based applications, allowing the
290 following config (L<Config::General> style):
291
292   <Model::DB>
293     schema_class   App::DB
294     <connect_info>
295       dsn          dbi:mysql:database=test
296       user         testuser
297       password     TestPass
298       AutoCommit   1
299     </connect_info>
300   </Model::DB>
301
302 The C<dsn>/C<user>/C<password> combination can be substituted by the
303 C<dbh_maker> key whose value is a coderef that returns a connected
304 L<DBI database handle|DBI/connect>
305
306 =back
307
308 Please note that the L<DBI> docs recommend that you always explicitly
309 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
310 recommends that it be set to I<1>, and that you perform transactions
311 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
312 to I<1> if you do not do explicitly set it to zero.  This is the default
313 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
314
315 =head3 DBIx::Class specific connection attributes
316
317 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
318 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
319 the following connection options. These options can be mixed in with your other
320 L<DBI> connection attributes, or placed in a separate hashref
321 (C<\%extra_attributes>) as shown above.
322
323 Every time C<connect_info> is invoked, any previous settings for
324 these options will be cleared before setting the new ones, regardless of
325 whether any options are specified in the new C<connect_info>.
326
327
328 =over
329
330 =item on_connect_do
331
332 Specifies things to do immediately after connecting or re-connecting to
333 the database.  Its value may contain:
334
335 =over
336
337 =item a scalar
338
339 This contains one SQL statement to execute.
340
341 =item an array reference
342
343 This contains SQL statements to execute in order.  Each element contains
344 a string or a code reference that returns a string.
345
346 =item a code reference
347
348 This contains some code to execute.  Unlike code references within an
349 array reference, its return value is ignored.
350
351 =back
352
353 =item on_disconnect_do
354
355 Takes arguments in the same form as L</on_connect_do> and executes them
356 immediately before disconnecting from the database.
357
358 Note, this only runs if you explicitly call L</disconnect> on the
359 storage object.
360
361 =item on_connect_call
362
363 A more generalized form of L</on_connect_do> that calls the specified
364 C<connect_call_METHOD> methods in your storage driver.
365
366   on_connect_do => 'select 1'
367
368 is equivalent to:
369
370   on_connect_call => [ [ do_sql => 'select 1' ] ]
371
372 Its values may contain:
373
374 =over
375
376 =item a scalar
377
378 Will call the C<connect_call_METHOD> method.
379
380 =item a code reference
381
382 Will execute C<< $code->($storage) >>
383
384 =item an array reference
385
386 Each value can be a method name or code reference.
387
388 =item an array of arrays
389
390 For each array, the first item is taken to be the C<connect_call_> method name
391 or code reference, and the rest are parameters to it.
392
393 =back
394
395 Some predefined storage methods you may use:
396
397 =over
398
399 =item do_sql
400
401 Executes a SQL string or a code reference that returns a SQL string. This is
402 what L</on_connect_do> and L</on_disconnect_do> use.
403
404 It can take:
405
406 =over
407
408 =item a scalar
409
410 Will execute the scalar as SQL.
411
412 =item an arrayref
413
414 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
415 attributes hashref and bind values.
416
417 =item a code reference
418
419 Will execute C<< $code->($storage) >> and execute the return array refs as
420 above.
421
422 =back
423
424 =item datetime_setup
425
426 Execute any statements necessary to initialize the database session to return
427 and accept datetime/timestamp values used with
428 L<DBIx::Class::InflateColumn::DateTime>.
429
430 Only necessary for some databases, see your specific storage driver for
431 implementation details.
432
433 =back
434
435 =item on_disconnect_call
436
437 Takes arguments in the same form as L</on_connect_call> and executes them
438 immediately before disconnecting from the database.
439
440 Calls the C<disconnect_call_METHOD> methods as opposed to the
441 C<connect_call_METHOD> methods called by L</on_connect_call>.
442
443 Note, this only runs if you explicitly call L</disconnect> on the
444 storage object.
445
446 =item disable_sth_caching
447
448 If set to a true value, this option will disable the caching of
449 statement handles via L<DBI/prepare_cached>.
450
451 =item limit_dialect
452
453 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
454 default L</sql_limit_dialect> setting of the storage (if any). For a list
455 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
456
457 =item quote_names
458
459 When true automatically sets L</quote_char> and L</name_sep> to the characters
460 appropriate for your particular RDBMS. This option is preferred over specifying
461 L</quote_char> directly.
462
463 =item quote_char
464
465 Specifies what characters to use to quote table and column names.
466
467 C<quote_char> expects either a single character, in which case is it
468 is placed on either side of the table/column name, or an arrayref of length
469 2 in which case the table/column name is placed between the elements.
470
471 For example under MySQL you should use C<< quote_char => '`' >>, and for
472 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
473
474 =item name_sep
475
476 This parameter is only useful in conjunction with C<quote_char>, and is used to
477 specify the character that separates elements (schemas, tables, columns) from
478 each other. If unspecified it defaults to the most commonly used C<.>.
479
480 =item unsafe
481
482 This Storage driver normally installs its own C<HandleError>, sets
483 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
484 all database handles, including those supplied by a coderef.  It does this
485 so that it can have consistent and useful error behavior.
486
487 If you set this option to a true value, Storage will not do its usual
488 modifications to the database handle's attributes, and instead relies on
489 the settings in your connect_info DBI options (or the values you set in
490 your connection coderef, in the case that you are connecting via coderef).
491
492 Note that your custom settings can cause Storage to malfunction,
493 especially if you set a C<HandleError> handler that suppresses exceptions
494 and/or disable C<RaiseError>.
495
496 =item auto_savepoint
497
498 If this option is true, L<DBIx::Class> will use savepoints when nesting
499 transactions, making it possible to recover from failure in the inner
500 transaction without having to abort all outer transactions.
501
502 =item cursor_class
503
504 Use this argument to supply a cursor class other than the default
505 L<DBIx::Class::Storage::DBI::Cursor>.
506
507 =back
508
509 Some real-life examples of arguments to L</connect_info> and
510 L<DBIx::Class::Schema/connect>
511
512   # Simple SQLite connection
513   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
514
515   # Connect via subref
516   ->connect_info([ sub { DBI->connect(...) } ]);
517
518   # Connect via subref in hashref
519   ->connect_info([{
520     dbh_maker => sub { DBI->connect(...) },
521     on_connect_do => 'alter session ...',
522   }]);
523
524   # A bit more complicated
525   ->connect_info(
526     [
527       'dbi:Pg:dbname=foo',
528       'postgres',
529       'my_pg_password',
530       { AutoCommit => 1 },
531       { quote_char => q{"} },
532     ]
533   );
534
535   # Equivalent to the previous example
536   ->connect_info(
537     [
538       'dbi:Pg:dbname=foo',
539       'postgres',
540       'my_pg_password',
541       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
542     ]
543   );
544
545   # Same, but with hashref as argument
546   # See parse_connect_info for explanation
547   ->connect_info(
548     [{
549       dsn         => 'dbi:Pg:dbname=foo',
550       user        => 'postgres',
551       password    => 'my_pg_password',
552       AutoCommit  => 1,
553       quote_char  => q{"},
554       name_sep    => q{.},
555     }]
556   );
557
558   # Subref + DBIx::Class-specific connection options
559   ->connect_info(
560     [
561       sub { DBI->connect(...) },
562       {
563           quote_char => q{`},
564           name_sep => q{@},
565           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
566           disable_sth_caching => 1,
567       },
568     ]
569   );
570
571
572
573 =cut
574
575 sub connect_info {
576   my ($self, $info) = @_;
577
578   return $self->_connect_info if !$info;
579
580   $self->_connect_info($info); # copy for _connect_info
581
582   $info = $self->_normalize_connect_info($info)
583     if ref $info eq 'ARRAY';
584
585   for my $storage_opt (keys %{ $info->{storage_options} }) {
586     my $value = $info->{storage_options}{$storage_opt};
587
588     $self->$storage_opt($value);
589   }
590
591   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
592   #  the new set of options
593   $self->_sql_maker(undef);
594   $self->_sql_maker_opts({});
595
596   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
597     my $value = $info->{sql_maker_options}{$sql_maker_opt};
598
599     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
600   }
601
602   my %attrs = (
603     %{ $self->_default_dbi_connect_attributes || {} },
604     %{ $info->{attributes} || {} },
605   );
606
607   my @args = @{ $info->{arguments} };
608
609   if (keys %attrs and ref $args[0] ne 'CODE') {
610     carp
611         'You provided explicit AutoCommit => 0 in your connection_info. '
612       . 'This is almost universally a bad idea (see the footnotes of '
613       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
614       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
615       . 'this warning.'
616       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
617
618     push @args, \%attrs if keys %attrs;
619   }
620   $self->_dbi_connect_info(\@args);
621
622   # FIXME - dirty:
623   # save attributes them in a separate accessor so they are always
624   # introspectable, even in case of a CODE $dbhmaker
625   $self->_dbic_connect_attributes (\%attrs);
626
627   return $self->_connect_info;
628 }
629
630 sub _normalize_connect_info {
631   my ($self, $info_arg) = @_;
632   my %info;
633
634   my @args = @$info_arg;  # take a shallow copy for further mutilation
635
636   # combine/pre-parse arguments depending on invocation style
637
638   my %attrs;
639   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
640     %attrs = %{ $args[1] || {} };
641     @args = $args[0];
642   }
643   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
644     %attrs = %{$args[0]};
645     @args = ();
646     if (my $code = delete $attrs{dbh_maker}) {
647       @args = $code;
648
649       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
650       if (@ignored) {
651         carp sprintf (
652             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
653           . "to the result of 'dbh_maker'",
654
655           join (', ', map { "'$_'" } (@ignored) ),
656         );
657       }
658     }
659     else {
660       @args = delete @attrs{qw/dsn user password/};
661     }
662   }
663   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
664     %attrs = (
665       % { $args[3] || {} },
666       % { $args[4] || {} },
667     );
668     @args = @args[0,1,2];
669   }
670
671   $info{arguments} = \@args;
672
673   my @storage_opts = grep exists $attrs{$_},
674     @storage_options, 'cursor_class';
675
676   @{ $info{storage_options} }{@storage_opts} =
677     delete @attrs{@storage_opts} if @storage_opts;
678
679   my @sql_maker_opts = grep exists $attrs{$_},
680     qw/limit_dialect quote_char name_sep quote_names/;
681
682   @{ $info{sql_maker_options} }{@sql_maker_opts} =
683     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
684
685   $info{attributes} = \%attrs if %attrs;
686
687   return \%info;
688 }
689
690 sub _default_dbi_connect_attributes () {
691   +{
692     AutoCommit => 1,
693     PrintError => 0,
694     RaiseError => 1,
695     ShowErrorStatement => 1,
696   };
697 }
698
699 =head2 on_connect_do
700
701 This method is deprecated in favour of setting via L</connect_info>.
702
703 =cut
704
705 =head2 on_disconnect_do
706
707 This method is deprecated in favour of setting via L</connect_info>.
708
709 =cut
710
711 sub _parse_connect_do {
712   my ($self, $type) = @_;
713
714   my $val = $self->$type;
715   return () if not defined $val;
716
717   my @res;
718
719   if (not ref($val)) {
720     push @res, [ 'do_sql', $val ];
721   } elsif (ref($val) eq 'CODE') {
722     push @res, $val;
723   } elsif (ref($val) eq 'ARRAY') {
724     push @res, map { [ 'do_sql', $_ ] } @$val;
725   } else {
726     $self->throw_exception("Invalid type for $type: ".ref($val));
727   }
728
729   return \@res;
730 }
731
732 =head2 dbh_do
733
734 Arguments: ($subref | $method_name), @extra_coderef_args?
735
736 Execute the given $subref or $method_name using the new exception-based
737 connection management.
738
739 The first two arguments will be the storage object that C<dbh_do> was called
740 on and a database handle to use.  Any additional arguments will be passed
741 verbatim to the called subref as arguments 2 and onwards.
742
743 Using this (instead of $self->_dbh or $self->dbh) ensures correct
744 exception handling and reconnection (or failover in future subclasses).
745
746 Your subref should have no side-effects outside of the database, as
747 there is the potential for your subref to be partially double-executed
748 if the database connection was stale/dysfunctional.
749
750 Example:
751
752   my @stuff = $schema->storage->dbh_do(
753     sub {
754       my ($storage, $dbh, @cols) = @_;
755       my $cols = join(q{, }, @cols);
756       $dbh->selectrow_array("SELECT $cols FROM foo");
757     },
758     @column_list
759   );
760
761 =cut
762
763 sub dbh_do {
764   my $self = shift;
765   my $code = shift;
766
767   my $dbh = $self->_get_dbh;
768
769   return $self->$code($dbh, @_)
770     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
771
772   local $self->{_in_dbh_do} = 1;
773
774   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
775   my $args = \@_;
776   return try {
777     $self->$code ($dbh, @$args);
778   } catch {
779     $self->throw_exception($_) if $self->connected;
780
781     # We were not connected - reconnect and retry, but let any
782     #  exception fall right through this time
783     carp "Retrying $code after catching disconnected exception: $_"
784       if $ENV{DBIC_DBIRETRY_DEBUG};
785
786     $self->_populate_dbh;
787     $self->$code($self->_dbh, @$args);
788   };
789 }
790
791 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
792 # It also informs dbh_do to bypass itself while under the direction of txn_do,
793 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
794 sub txn_do {
795   my $self = shift;
796   my $coderef = shift;
797
798   ref $coderef eq 'CODE' or $self->throw_exception
799     ('$coderef must be a CODE reference');
800
801   local $self->{_in_dbh_do} = 1;
802
803   my @result;
804   my $want = wantarray;
805
806   my $tried = 0;
807   while(1) {
808     my $exception;
809
810     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
811     my $args = \@_;
812
813     try {
814       $self->txn_begin;
815       my $txn_start_depth = $self->transaction_depth;
816       if($want) {
817           @result = $coderef->(@$args);
818       }
819       elsif(defined $want) {
820           $result[0] = $coderef->(@$args);
821       }
822       else {
823           $coderef->(@$args);
824       }
825
826       my $delta_txn = $txn_start_depth - $self->transaction_depth;
827       if ($delta_txn == 0) {
828         $self->txn_commit;
829       }
830       elsif ($delta_txn != 1) {
831         # an off-by-one would mean we fired a rollback
832         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
833       }
834     } catch {
835       $exception = $_;
836     };
837
838     if(! defined $exception) { return wantarray ? @result : $result[0] }
839
840     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
841       my $rollback_exception;
842       try { $self->txn_rollback } catch { $rollback_exception = shift };
843       if(defined $rollback_exception) {
844         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
845         $self->throw_exception($exception)  # propagate nested rollback
846           if $rollback_exception =~ /$exception_class/;
847
848         $self->throw_exception(
849           "Transaction aborted: ${exception}. "
850           . "Rollback failed: ${rollback_exception}"
851         );
852       }
853       $self->throw_exception($exception)
854     }
855
856     # We were not connected, and was first try - reconnect and retry
857     # via the while loop
858     carp "Retrying $coderef after catching disconnected exception: $exception"
859       if $ENV{DBIC_TXNRETRY_DEBUG};
860     $self->_populate_dbh;
861   }
862 }
863
864 =head2 disconnect
865
866 Our C<disconnect> method also performs a rollback first if the
867 database is not in C<AutoCommit> mode.
868
869 =cut
870
871 sub disconnect {
872   my ($self) = @_;
873
874   if( $self->_dbh ) {
875     my @actions;
876
877     push @actions, ( $self->on_disconnect_call || () );
878     push @actions, $self->_parse_connect_do ('on_disconnect_do');
879
880     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
881
882     $self->_dbh_rollback unless $self->_dbh_autocommit;
883
884     %{ $self->_dbh->{CachedKids} } = ();
885     $self->_dbh->disconnect;
886     $self->_dbh(undef);
887     $self->{_dbh_gen}++;
888   }
889 }
890
891 =head2 with_deferred_fk_checks
892
893 =over 4
894
895 =item Arguments: C<$coderef>
896
897 =item Return Value: The return value of $coderef
898
899 =back
900
901 Storage specific method to run the code ref with FK checks deferred or
902 in MySQL's case disabled entirely.
903
904 =cut
905
906 # Storage subclasses should override this
907 sub with_deferred_fk_checks {
908   my ($self, $sub) = @_;
909   $sub->();
910 }
911
912 =head2 connected
913
914 =over
915
916 =item Arguments: none
917
918 =item Return Value: 1|0
919
920 =back
921
922 Verifies that the current database handle is active and ready to execute
923 an SQL statement (e.g. the connection did not get stale, server is still
924 answering, etc.) This method is used internally by L</dbh>.
925
926 =cut
927
928 sub connected {
929   my $self = shift;
930   return 0 unless $self->_seems_connected;
931
932   #be on the safe side
933   local $self->_dbh->{RaiseError} = 1;
934
935   return $self->_ping;
936 }
937
938 sub _seems_connected {
939   my $self = shift;
940
941   $self->_verify_pid;
942
943   my $dbh = $self->_dbh
944     or return 0;
945
946   return $dbh->FETCH('Active');
947 }
948
949 sub _ping {
950   my $self = shift;
951
952   my $dbh = $self->_dbh or return 0;
953
954   return $dbh->ping;
955 }
956
957 sub ensure_connected {
958   my ($self) = @_;
959
960   unless ($self->connected) {
961     $self->_populate_dbh;
962   }
963 }
964
965 =head2 dbh
966
967 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
968 is guaranteed to be healthy by implicitly calling L</connected>, and if
969 necessary performing a reconnection before returning. Keep in mind that this
970 is very B<expensive> on some database engines. Consider using L</dbh_do>
971 instead.
972
973 =cut
974
975 sub dbh {
976   my ($self) = @_;
977
978   if (not $self->_dbh) {
979     $self->_populate_dbh;
980   } else {
981     $self->ensure_connected;
982   }
983   return $self->_dbh;
984 }
985
986 # this is the internal "get dbh or connect (don't check)" method
987 sub _get_dbh {
988   my $self = shift;
989   $self->_verify_pid;
990   $self->_populate_dbh unless $self->_dbh;
991   return $self->_dbh;
992 }
993
994 sub sql_maker {
995   my ($self) = @_;
996   unless ($self->_sql_maker) {
997     my $sql_maker_class = $self->sql_maker_class;
998     $self->ensure_class_loaded ($sql_maker_class);
999
1000     my %opts = %{$self->_sql_maker_opts||{}};
1001     my $dialect =
1002       $opts{limit_dialect}
1003         ||
1004       $self->sql_limit_dialect
1005         ||
1006       do {
1007         my $s_class = (ref $self) || $self;
1008         carp (
1009           "Your storage class ($s_class) does not set sql_limit_dialect and you "
1010         . 'have not supplied an explicit limit_dialect in your connection_info. '
1011         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1012         . 'databases but can be (and often is) painfully slow. '
1013         . "Please file an RT ticket against '$s_class' ."
1014         );
1015
1016         'GenericSubQ';
1017       }
1018     ;
1019
1020     my ($quote_char, $name_sep);
1021
1022     if ($opts{quote_names}) {
1023       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1024         my $s_class = (ref $self) || $self;
1025         carp (
1026           "You requested 'quote_names' but your storage class ($s_class) does "
1027         . 'not explicitly define a default sql_quote_char and you have not '
1028         . 'supplied a quote_char as part of your connection_info. DBIC will '
1029         .q{default to the ANSI SQL standard quote '"', which works most of }
1030         . "the time. Please file an RT ticket against '$s_class'."
1031         );
1032
1033         '"'; # RV
1034       };
1035
1036       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1037     }
1038
1039     $self->_sql_maker($sql_maker_class->new(
1040       bindtype=>'columns',
1041       array_datatypes => 1,
1042       limit_dialect => $dialect,
1043       ($quote_char ? (quote_char => $quote_char) : ()),
1044       name_sep => ($name_sep || '.'),
1045       %opts,
1046     ));
1047   }
1048   return $self->_sql_maker;
1049 }
1050
1051 # nothing to do by default
1052 sub _rebless {}
1053 sub _init {}
1054
1055 sub _populate_dbh {
1056   my ($self) = @_;
1057
1058   my @info = @{$self->_dbi_connect_info || []};
1059   $self->_dbh(undef); # in case ->connected failed we might get sent here
1060   $self->_dbh_details({}); # reset everything we know
1061
1062   $self->_dbh($self->_connect(@info));
1063
1064   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1065
1066   $self->_determine_driver;
1067
1068   # Always set the transaction depth on connect, since
1069   #  there is no transaction in progress by definition
1070   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1071
1072   $self->_run_connection_actions unless $self->{_in_determine_driver};
1073 }
1074
1075 sub _run_connection_actions {
1076   my $self = shift;
1077   my @actions;
1078
1079   push @actions, ( $self->on_connect_call || () );
1080   push @actions, $self->_parse_connect_do ('on_connect_do');
1081
1082   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1083 }
1084
1085
1086
1087 sub set_use_dbms_capability {
1088   $_[0]->set_inherited ($_[1], $_[2]);
1089 }
1090
1091 sub get_use_dbms_capability {
1092   my ($self, $capname) = @_;
1093
1094   my $use = $self->get_inherited ($capname);
1095   return defined $use
1096     ? $use
1097     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1098   ;
1099 }
1100
1101 sub set_dbms_capability {
1102   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1103 }
1104
1105 sub get_dbms_capability {
1106   my ($self, $capname) = @_;
1107
1108   my $cap = $self->_dbh_details->{capability}{$capname};
1109
1110   unless (defined $cap) {
1111     if (my $meth = $self->can ("_determine$capname")) {
1112       $cap = $self->$meth ? 1 : 0;
1113     }
1114     else {
1115       $cap = 0;
1116     }
1117
1118     $self->set_dbms_capability ($capname, $cap);
1119   }
1120
1121   return $cap;
1122 }
1123
1124 sub _server_info {
1125   my $self = shift;
1126
1127   my $info;
1128   unless ($info = $self->_dbh_details->{info}) {
1129
1130     $info = {};
1131
1132     my $server_version = try { $self->_get_server_version };
1133
1134     if (defined $server_version) {
1135       $info->{dbms_version} = $server_version;
1136
1137       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1138       my @verparts = split (/\./, $numeric_version);
1139       if (
1140         @verparts
1141           &&
1142         $verparts[0] <= 999
1143       ) {
1144         # consider only up to 3 version parts, iff not more than 3 digits
1145         my @use_parts;
1146         while (@verparts && @use_parts < 3) {
1147           my $p = shift @verparts;
1148           last if $p > 999;
1149           push @use_parts, $p;
1150         }
1151         push @use_parts, 0 while @use_parts < 3;
1152
1153         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1154       }
1155     }
1156
1157     $self->_dbh_details->{info} = $info;
1158   }
1159
1160   return $info;
1161 }
1162
1163 sub _get_server_version {
1164   shift->_dbh_get_info(18);
1165 }
1166
1167 sub _dbh_get_info {
1168   my ($self, $info) = @_;
1169
1170   return try { $self->_get_dbh->get_info($info) } || undef;
1171 }
1172
1173 sub _determine_driver {
1174   my ($self) = @_;
1175
1176   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1177     my $started_connected = 0;
1178     local $self->{_in_determine_driver} = 1;
1179
1180     if (ref($self) eq __PACKAGE__) {
1181       my $driver;
1182       if ($self->_dbh) { # we are connected
1183         $driver = $self->_dbh->{Driver}{Name};
1184         $started_connected = 1;
1185       } else {
1186         # if connect_info is a CODEREF, we have no choice but to connect
1187         if (ref $self->_dbi_connect_info->[0] &&
1188             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1189           $self->_populate_dbh;
1190           $driver = $self->_dbh->{Driver}{Name};
1191         }
1192         else {
1193           # try to use dsn to not require being connected, the driver may still
1194           # force a connection in _rebless to determine version
1195           # (dsn may not be supplied at all if all we do is make a mock-schema)
1196           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1197           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1198           $driver ||= $ENV{DBI_DRIVER};
1199         }
1200       }
1201
1202       if ($driver) {
1203         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1204         if ($self->load_optional_class($storage_class)) {
1205           mro::set_mro($storage_class, 'c3');
1206           bless $self, $storage_class;
1207           $self->_rebless();
1208         }
1209       }
1210     }
1211
1212     $self->_driver_determined(1);
1213
1214     $self->_init; # run driver-specific initializations
1215
1216     $self->_run_connection_actions
1217         if !$started_connected && defined $self->_dbh;
1218   }
1219 }
1220
1221 sub _do_connection_actions {
1222   my $self          = shift;
1223   my $method_prefix = shift;
1224   my $call          = shift;
1225
1226   if (not ref($call)) {
1227     my $method = $method_prefix . $call;
1228     $self->$method(@_);
1229   } elsif (ref($call) eq 'CODE') {
1230     $self->$call(@_);
1231   } elsif (ref($call) eq 'ARRAY') {
1232     if (ref($call->[0]) ne 'ARRAY') {
1233       $self->_do_connection_actions($method_prefix, $_) for @$call;
1234     } else {
1235       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1236     }
1237   } else {
1238     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1239   }
1240
1241   return $self;
1242 }
1243
1244 sub connect_call_do_sql {
1245   my $self = shift;
1246   $self->_do_query(@_);
1247 }
1248
1249 sub disconnect_call_do_sql {
1250   my $self = shift;
1251   $self->_do_query(@_);
1252 }
1253
1254 # override in db-specific backend when necessary
1255 sub connect_call_datetime_setup { 1 }
1256
1257 sub _do_query {
1258   my ($self, $action) = @_;
1259
1260   if (ref $action eq 'CODE') {
1261     $action = $action->($self);
1262     $self->_do_query($_) foreach @$action;
1263   }
1264   else {
1265     # Most debuggers expect ($sql, @bind), so we need to exclude
1266     # the attribute hash which is the second argument to $dbh->do
1267     # furthermore the bind values are usually to be presented
1268     # as named arrayref pairs, so wrap those here too
1269     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1270     my $sql = shift @do_args;
1271     my $attrs = shift @do_args;
1272     my @bind = map { [ undef, $_ ] } @do_args;
1273
1274     $self->_query_start($sql, @bind);
1275     $self->_get_dbh->do($sql, $attrs, @do_args);
1276     $self->_query_end($sql, @bind);
1277   }
1278
1279   return $self;
1280 }
1281
1282 sub _connect {
1283   my ($self, @info) = @_;
1284
1285   $self->throw_exception("You failed to provide any connection info")
1286     if !@info;
1287
1288   my ($old_connect_via, $dbh);
1289
1290   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1291     $old_connect_via = $DBI::connect_via;
1292     $DBI::connect_via = 'connect';
1293   }
1294
1295   try {
1296     if(ref $info[0] eq 'CODE') {
1297        $dbh = $info[0]->();
1298     }
1299     else {
1300        $dbh = DBI->connect(@info);
1301     }
1302
1303     if (!$dbh) {
1304       die $DBI::errstr;
1305     }
1306
1307     unless ($self->unsafe) {
1308
1309       $self->throw_exception(
1310         'Refusing clobbering of {HandleError} installed on externally supplied '
1311        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1312       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1313
1314       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1315       # request, or an external handle. Complain and set anyway
1316       unless ($dbh->{RaiseError}) {
1317         carp( ref $info[0] eq 'CODE'
1318
1319           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1320            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1321            .'attribute has been supplied'
1322
1323           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1324            .'unsafe => 1. Toggling RaiseError back to true'
1325         );
1326
1327         $dbh->{RaiseError} = 1;
1328       }
1329
1330       # this odd anonymous coderef dereference is in fact really
1331       # necessary to avoid the unwanted effect described in perl5
1332       # RT#75792
1333       sub {
1334         my $weak_self = $_[0];
1335         weaken $weak_self;
1336
1337         # the coderef is blessed so we can distinguish it from externally
1338         # supplied handles (which must be preserved)
1339         $_[1]->{HandleError} = bless sub {
1340           if ($weak_self) {
1341             $weak_self->throw_exception("DBI Exception: $_[0]");
1342           }
1343           else {
1344             # the handler may be invoked by something totally out of
1345             # the scope of DBIC
1346             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1347           }
1348         }, '__DBIC__DBH__ERROR__HANDLER__';
1349       }->($self, $dbh);
1350     }
1351   }
1352   catch {
1353     $self->throw_exception("DBI Connection failed: $_")
1354   }
1355   finally {
1356     $DBI::connect_via = $old_connect_via if $old_connect_via;
1357   };
1358
1359   $self->_dbh_autocommit($dbh->{AutoCommit});
1360   $dbh;
1361 }
1362
1363 sub svp_begin {
1364   my ($self, $name) = @_;
1365
1366   $name = $self->_svp_generate_name
1367     unless defined $name;
1368
1369   $self->throw_exception ("You can't use savepoints outside a transaction")
1370     if $self->{transaction_depth} == 0;
1371
1372   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1373     unless $self->can('_svp_begin');
1374
1375   push @{ $self->{savepoints} }, $name;
1376
1377   $self->debugobj->svp_begin($name) if $self->debug;
1378
1379   return $self->_svp_begin($name);
1380 }
1381
1382 sub svp_release {
1383   my ($self, $name) = @_;
1384
1385   $self->throw_exception ("You can't use savepoints outside a transaction")
1386     if $self->{transaction_depth} == 0;
1387
1388   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1389     unless $self->can('_svp_release');
1390
1391   if (defined $name) {
1392     $self->throw_exception ("Savepoint '$name' does not exist")
1393       unless grep { $_ eq $name } @{ $self->{savepoints} };
1394
1395     # Dig through the stack until we find the one we are releasing.  This keeps
1396     # the stack up to date.
1397     my $svp;
1398
1399     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1400   } else {
1401     $name = pop @{ $self->{savepoints} };
1402   }
1403
1404   $self->debugobj->svp_release($name) if $self->debug;
1405
1406   return $self->_svp_release($name);
1407 }
1408
1409 sub svp_rollback {
1410   my ($self, $name) = @_;
1411
1412   $self->throw_exception ("You can't use savepoints outside a transaction")
1413     if $self->{transaction_depth} == 0;
1414
1415   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1416     unless $self->can('_svp_rollback');
1417
1418   if (defined $name) {
1419       # If they passed us a name, verify that it exists in the stack
1420       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1421           $self->throw_exception("Savepoint '$name' does not exist!");
1422       }
1423
1424       # Dig through the stack until we find the one we are releasing.  This keeps
1425       # the stack up to date.
1426       while(my $s = pop(@{ $self->{savepoints} })) {
1427           last if($s eq $name);
1428       }
1429       # Add the savepoint back to the stack, as a rollback doesn't remove the
1430       # named savepoint, only everything after it.
1431       push(@{ $self->{savepoints} }, $name);
1432   } else {
1433       # We'll assume they want to rollback to the last savepoint
1434       $name = $self->{savepoints}->[-1];
1435   }
1436
1437   $self->debugobj->svp_rollback($name) if $self->debug;
1438
1439   return $self->_svp_rollback($name);
1440 }
1441
1442 sub _svp_generate_name {
1443   my ($self) = @_;
1444   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1445 }
1446
1447 sub txn_begin {
1448   my $self = shift;
1449
1450   # this means we have not yet connected and do not know the AC status
1451   # (e.g. coderef $dbh)
1452   if (! defined $self->_dbh_autocommit) {
1453     $self->ensure_connected;
1454   }
1455   # otherwise re-connect on pid changes, so
1456   # that the txn_depth is adjusted properly
1457   # the lightweight _get_dbh is good enoug here
1458   # (only superficial handle check, no pings)
1459   else {
1460     $self->_get_dbh;
1461   }
1462
1463   if($self->transaction_depth == 0) {
1464     $self->debugobj->txn_begin()
1465       if $self->debug;
1466     $self->_dbh_begin_work;
1467   }
1468   elsif ($self->auto_savepoint) {
1469     $self->svp_begin;
1470   }
1471   $self->{transaction_depth}++;
1472 }
1473
1474 sub _dbh_begin_work {
1475   my $self = shift;
1476
1477   # if the user is utilizing txn_do - good for him, otherwise we need to
1478   # ensure that the $dbh is healthy on BEGIN.
1479   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1480   # will be replaced by a failure of begin_work itself (which will be
1481   # then retried on reconnect)
1482   if ($self->{_in_dbh_do}) {
1483     $self->_dbh->begin_work;
1484   } else {
1485     $self->dbh_do(sub { $_[1]->begin_work });
1486   }
1487 }
1488
1489 sub txn_commit {
1490   my $self = shift;
1491   if (! $self->_dbh) {
1492     $self->throw_exception('cannot COMMIT on a disconnected handle');
1493   }
1494   elsif ($self->{transaction_depth} == 1) {
1495     $self->debugobj->txn_commit()
1496       if ($self->debug);
1497     $self->_dbh_commit;
1498     $self->{transaction_depth} = 0
1499       if $self->_dbh_autocommit;
1500   }
1501   elsif($self->{transaction_depth} > 1) {
1502     $self->{transaction_depth}--;
1503     $self->svp_release
1504       if $self->auto_savepoint;
1505   }
1506   elsif (! $self->_dbh->FETCH('AutoCommit') ) {
1507
1508     carp "Storage transaction_depth $self->{transaction_depth} does not match "
1509         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1510
1511     $self->debugobj->txn_commit()
1512       if ($self->debug);
1513     $self->_dbh_commit;
1514     $self->{transaction_depth} = 0
1515       if $self->_dbh_autocommit;
1516   }
1517   else {
1518     $self->throw_exception( 'Refusing to commit without a started transaction' );
1519   }
1520 }
1521
1522 sub _dbh_commit {
1523   my $self = shift;
1524   my $dbh  = $self->_dbh
1525     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1526   $dbh->commit;
1527 }
1528
1529 sub txn_rollback {
1530   my $self = shift;
1531   my $dbh = $self->_dbh;
1532   try {
1533     if ($self->{transaction_depth} == 1) {
1534       $self->debugobj->txn_rollback()
1535         if ($self->debug);
1536       $self->{transaction_depth} = 0
1537         if $self->_dbh_autocommit;
1538       $self->_dbh_rollback;
1539     }
1540     elsif($self->{transaction_depth} > 1) {
1541       $self->{transaction_depth}--;
1542       if ($self->auto_savepoint) {
1543         $self->svp_rollback;
1544         $self->svp_release;
1545       }
1546     }
1547     else {
1548       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1549     }
1550   }
1551   catch {
1552     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1553
1554     if ($_ !~ /$exception_class/) {
1555       # ensure that a failed rollback resets the transaction depth
1556       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1557     }
1558
1559     $self->throw_exception($_)
1560   };
1561 }
1562
1563 sub _dbh_rollback {
1564   my $self = shift;
1565   my $dbh  = $self->_dbh
1566     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1567   $dbh->rollback;
1568 }
1569
1570 # This used to be the top-half of _execute.  It was split out to make it
1571 #  easier to override in NoBindVars without duping the rest.  It takes up
1572 #  all of _execute's args, and emits $sql, @bind.
1573 sub _prep_for_execute {
1574   my ($self, $op, $extra_bind, $ident, $args) = @_;
1575
1576   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1577     $ident = $ident->from();
1578   }
1579
1580   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1581
1582   unshift(@bind,
1583     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1584       if $extra_bind;
1585   return ($sql, \@bind);
1586 }
1587
1588
1589 sub _fix_bind_params {
1590     my ($self, @bind) = @_;
1591
1592     ### Turn @bind from something like this:
1593     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1594     ### to this:
1595     ###   ( "'1'", "'1'", "'3'" )
1596     return
1597         map {
1598             if ( defined( $_ && $_->[1] ) ) {
1599                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1600             }
1601             else { q{NULL}; }
1602         } @bind;
1603 }
1604
1605 sub _query_start {
1606     my ( $self, $sql, @bind ) = @_;
1607
1608     if ( $self->debug ) {
1609         @bind = $self->_fix_bind_params(@bind);
1610
1611         $self->debugobj->query_start( $sql, @bind );
1612     }
1613 }
1614
1615 sub _query_end {
1616     my ( $self, $sql, @bind ) = @_;
1617
1618     if ( $self->debug ) {
1619         @bind = $self->_fix_bind_params(@bind);
1620         $self->debugobj->query_end( $sql, @bind );
1621     }
1622 }
1623
1624 sub _dbh_execute {
1625   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1626
1627   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1628
1629   $self->_query_start( $sql, @$bind );
1630
1631   my $sth = $self->sth($sql,$op);
1632
1633   my $placeholder_index = 1;
1634
1635   foreach my $bound (@$bind) {
1636     my $attributes = {};
1637     my($column_name, @data) = @$bound;
1638
1639     if ($bind_attributes) {
1640       $attributes = $bind_attributes->{$column_name}
1641       if defined $bind_attributes->{$column_name};
1642     }
1643
1644     foreach my $data (@data) {
1645       my $ref = ref $data;
1646
1647       if ($ref and overload::Method($data, '""') ) {
1648         $data = "$data";
1649       }
1650       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1651         $sth->bind_param_inout(
1652           $placeholder_index++,
1653           $data,
1654           $self->_max_column_bytesize($ident, $column_name),
1655           $attributes
1656         );
1657         next;
1658       }
1659
1660       $sth->bind_param($placeholder_index++, $data, $attributes);
1661     }
1662   }
1663
1664   # Can this fail without throwing an exception anyways???
1665   my $rv = $sth->execute();
1666   $self->throw_exception(
1667     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1668   ) if !$rv;
1669
1670   $self->_query_end( $sql, @$bind );
1671
1672   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1673 }
1674
1675 sub _execute {
1676     my $self = shift;
1677     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1678 }
1679
1680 sub _prefetch_autovalues {
1681   my ($self, $source, $to_insert) = @_;
1682
1683   my $colinfo = $source->columns_info;
1684
1685   my %values;
1686   for my $col (keys %$colinfo) {
1687     if (
1688       $colinfo->{$col}{auto_nextval}
1689         and
1690       (
1691         ! exists $to_insert->{$col}
1692           or
1693         ref $to_insert->{$col} eq 'SCALAR'
1694       )
1695     ) {
1696       $values{$col} = $self->_sequence_fetch(
1697         'NEXTVAL',
1698         ( $colinfo->{$col}{sequence} ||=
1699             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1700         ),
1701       );
1702     }
1703   }
1704
1705   \%values;
1706 }
1707
1708 sub insert {
1709   my ($self, $source, $to_insert) = @_;
1710
1711   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1712
1713   # fuse the values
1714   $to_insert = { %$to_insert, %$prefetched_values };
1715
1716   # list of primary keys we try to fetch from the database
1717   # both not-exsists and scalarrefs are considered
1718   my %fetch_pks;
1719   for ($source->primary_columns) {
1720     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1721       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1722   }
1723
1724   my ($sqla_opts, @ir_container);
1725   if ($self->_use_insert_returning) {
1726
1727     # retain order as declared in the resultsource
1728     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1729       push @{$sqla_opts->{returning}}, $_;
1730       $sqla_opts->{returning_container} = \@ir_container
1731         if $self->_use_insert_returning_bound;
1732     }
1733   }
1734
1735   my $bind_attributes = $self->source_bind_attributes($source);
1736
1737   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1738
1739   my %returned_cols;
1740
1741   if (my $retlist = $sqla_opts->{returning}) {
1742     @ir_container = try {
1743       local $SIG{__WARN__} = sub {};
1744       my @r = $sth->fetchrow_array;
1745       $sth->finish;
1746       @r;
1747     } unless @ir_container;
1748
1749     @returned_cols{@$retlist} = @ir_container if @ir_container;
1750   }
1751
1752   return { %$prefetched_values, %returned_cols };
1753 }
1754
1755
1756 ## Currently it is assumed that all values passed will be "normal", i.e. not
1757 ## scalar refs, or at least, all the same type as the first set, the statement is
1758 ## only prepped once.
1759 sub insert_bulk {
1760   my ($self, $source, $cols, $data) = @_;
1761
1762   my %colvalues;
1763   @colvalues{@$cols} = (0..$#$cols);
1764
1765   for my $i (0..$#$cols) {
1766     my $first_val = $data->[0][$i];
1767     next unless ref $first_val eq 'SCALAR';
1768
1769     $colvalues{ $cols->[$i] } = $first_val;
1770   }
1771
1772   # check for bad data and stringify stringifiable objects
1773   my $bad_slice = sub {
1774     my ($msg, $col_idx, $slice_idx) = @_;
1775     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1776       $msg,
1777       $cols->[$col_idx],
1778       do {
1779         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1780         Dumper {
1781           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1782         },
1783       }
1784     );
1785   };
1786
1787   for my $datum_idx (0..$#$data) {
1788     my $datum = $data->[$datum_idx];
1789
1790     for my $col_idx (0..$#$cols) {
1791       my $val            = $datum->[$col_idx];
1792       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1793       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1794
1795       if ($is_literal_sql) {
1796         if (not ref $val) {
1797           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1798         }
1799         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1800           $bad_slice->("$reftype reference found where literal SQL expected",
1801             $col_idx, $datum_idx);
1802         }
1803         elsif ($$val ne $$sqla_bind){
1804           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1805             $col_idx, $datum_idx);
1806         }
1807       }
1808       elsif (my $reftype = ref $val) {
1809         require overload;
1810         if (overload::Method($val, '""')) {
1811           $datum->[$col_idx] = "".$val;
1812         }
1813         else {
1814           $bad_slice->("$reftype reference found where bind expected",
1815             $col_idx, $datum_idx);
1816         }
1817       }
1818     }
1819   }
1820
1821   my ($sql, $bind) = $self->_prep_for_execute (
1822     'insert', undef, $source, [\%colvalues]
1823   );
1824
1825   if (! @$bind) {
1826     # if the bindlist is empty - make sure all "values" are in fact
1827     # literal scalarrefs. If not the case this means the storage ate
1828     # them away (e.g. the NoBindVars component) and interpolated them
1829     # directly into the SQL. This obviosly can't be good for multi-inserts
1830
1831     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1832       if first { ref $_ ne 'SCALAR' } values %colvalues;
1833   }
1834
1835   # neither _execute_array, nor _execute_inserts_with_no_binds are
1836   # atomic (even if _execute _array is a single call). Thus a safety
1837   # scope guard
1838   my $guard = $self->txn_scope_guard;
1839
1840   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1841   my $sth = $self->sth($sql);
1842   my $rv = do {
1843     if (@$bind) {
1844       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1845       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1846     }
1847     else {
1848       # bind_param_array doesn't work if there are no binds
1849       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1850     }
1851   };
1852
1853   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1854
1855   $guard->commit;
1856
1857   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1858 }
1859
1860 sub _execute_array {
1861   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1862
1863   ## This must be an arrayref, else nothing works!
1864   my $tuple_status = [];
1865
1866   ## Get the bind_attributes, if any exist
1867   my $bind_attributes = $self->source_bind_attributes($source);
1868
1869   ## Bind the values and execute
1870   my $placeholder_index = 1;
1871
1872   foreach my $bound (@$bind) {
1873
1874     my $attributes = {};
1875     my ($column_name, $data_index) = @$bound;
1876
1877     if( $bind_attributes ) {
1878       $attributes = $bind_attributes->{$column_name}
1879       if defined $bind_attributes->{$column_name};
1880     }
1881
1882     my @data = map { $_->[$data_index] } @$data;
1883
1884     $sth->bind_param_array(
1885       $placeholder_index,
1886       [@data],
1887       (%$attributes ?  $attributes : ()),
1888     );
1889     $placeholder_index++;
1890   }
1891
1892   my ($rv, $err);
1893   try {
1894     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1895   }
1896   catch {
1897     $err = shift;
1898   };
1899
1900   # Not all DBDs are create equal. Some throw on error, some return
1901   # an undef $rv, and some set $sth->err - try whatever we can
1902   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1903     ! defined $err
1904       and
1905     ( !defined $rv or $sth->err )
1906   );
1907
1908   # Statement must finish even if there was an exception.
1909   try {
1910     $sth->finish
1911   }
1912   catch {
1913     $err = shift unless defined $err
1914   };
1915
1916   if (defined $err) {
1917     my $i = 0;
1918     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1919
1920     $self->throw_exception("Unexpected populate error: $err")
1921       if ($i > $#$tuple_status);
1922
1923     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1924       ($tuple_status->[$i][1] || $err),
1925       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1926     );
1927   }
1928
1929   return $rv;
1930 }
1931
1932 sub _dbh_execute_array {
1933     my ($self, $sth, $tuple_status, @extra) = @_;
1934
1935     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1936 }
1937
1938 sub _dbh_execute_inserts_with_no_binds {
1939   my ($self, $sth, $count) = @_;
1940
1941   my $err;
1942   try {
1943     my $dbh = $self->_get_dbh;
1944     local $dbh->{RaiseError} = 1;
1945     local $dbh->{PrintError} = 0;
1946
1947     $sth->execute foreach 1..$count;
1948   }
1949   catch {
1950     $err = shift;
1951   };
1952
1953   # Make sure statement is finished even if there was an exception.
1954   try {
1955     $sth->finish
1956   }
1957   catch {
1958     $err = shift unless defined $err;
1959   };
1960
1961   $self->throw_exception($err) if defined $err;
1962
1963   return $count;
1964 }
1965
1966 sub update {
1967   my ($self, $source, @args) = @_;
1968
1969   my $bind_attrs = $self->source_bind_attributes($source);
1970
1971   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1972 }
1973
1974
1975 sub delete {
1976   my ($self, $source, @args) = @_;
1977
1978   my $bind_attrs = $self->source_bind_attributes($source);
1979
1980   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1981 }
1982
1983 # We were sent here because the $rs contains a complex search
1984 # which will require a subquery to select the correct rows
1985 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1986 #
1987 # Generating a single PK column subquery is trivial and supported
1988 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1989 # Look at _multipk_update_delete()
1990 sub _subq_update_delete {
1991   my $self = shift;
1992   my ($rs, $op, $values) = @_;
1993
1994   my $rsrc = $rs->result_source;
1995
1996   # quick check if we got a sane rs on our hands
1997   my @pcols = $rsrc->_pri_cols;
1998
1999   my $sel = $rs->_resolved_attrs->{select};
2000   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
2001
2002   if (
2003       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
2004         ne
2005       join ("\x00", sort @$sel )
2006   ) {
2007     $self->throw_exception (
2008       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2009     );
2010   }
2011
2012   if (@pcols == 1) {
2013     return $self->$op (
2014       $rsrc,
2015       $op eq 'update' ? $values : (),
2016       { $pcols[0] => { -in => $rs->as_query } },
2017     );
2018   }
2019
2020   else {
2021     return $self->_multipk_update_delete (@_);
2022   }
2023 }
2024
2025 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2026 # resultset update/delete involving subqueries. So by default resort
2027 # to simple (and inefficient) delete_all style per-row opearations,
2028 # while allowing specific storages to override this with a faster
2029 # implementation.
2030 #
2031 sub _multipk_update_delete {
2032   return shift->_per_row_update_delete (@_);
2033 }
2034
2035 # This is the default loop used to delete/update rows for multi PK
2036 # resultsets, and used by mysql exclusively (because it can't do anything
2037 # else).
2038 #
2039 # We do not use $row->$op style queries, because resultset update/delete
2040 # is not expected to cascade (this is what delete_all/update_all is for).
2041 #
2042 # There should be no race conditions as the entire operation is rolled
2043 # in a transaction.
2044 #
2045 sub _per_row_update_delete {
2046   my $self = shift;
2047   my ($rs, $op, $values) = @_;
2048
2049   my $rsrc = $rs->result_source;
2050   my @pcols = $rsrc->_pri_cols;
2051
2052   my $guard = $self->txn_scope_guard;
2053
2054   # emulate the return value of $sth->execute for non-selects
2055   my $row_cnt = '0E0';
2056
2057   my $subrs_cur = $rs->cursor;
2058   my @all_pk = $subrs_cur->all;
2059   for my $pks ( @all_pk) {
2060
2061     my $cond;
2062     for my $i (0.. $#pcols) {
2063       $cond->{$pcols[$i]} = $pks->[$i];
2064     }
2065
2066     $self->$op (
2067       $rsrc,
2068       $op eq 'update' ? $values : (),
2069       $cond,
2070     );
2071
2072     $row_cnt++;
2073   }
2074
2075   $guard->commit;
2076
2077   return $row_cnt;
2078 }
2079
2080 sub _select {
2081   my $self = shift;
2082   $self->_execute($self->_select_args(@_));
2083 }
2084
2085 sub _select_args_to_query {
2086   my $self = shift;
2087
2088   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2089   #  = $self->_select_args($ident, $select, $cond, $attrs);
2090   my ($op, $bind, $ident, $bind_attrs, @args) =
2091     $self->_select_args(@_);
2092
2093   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2094   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2095   $prepared_bind ||= [];
2096
2097   return wantarray
2098     ? ($sql, $prepared_bind, $bind_attrs)
2099     : \[ "($sql)", @$prepared_bind ]
2100   ;
2101 }
2102
2103 sub _select_args {
2104   my ($self, $ident, $select, $where, $attrs) = @_;
2105
2106   my $sql_maker = $self->sql_maker;
2107   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2108
2109   $attrs = {
2110     %$attrs,
2111     select => $select,
2112     from => $ident,
2113     where => $where,
2114     $rs_alias && $alias2source->{$rs_alias}
2115       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2116       : ()
2117     ,
2118   };
2119
2120   # calculate bind_attrs before possible $ident mangling
2121   my $bind_attrs = {};
2122   for my $alias (keys %$alias2source) {
2123     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2124     for my $col (keys %$bindtypes) {
2125
2126       my $fqcn = join ('.', $alias, $col);
2127       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2128
2129       # Unqialified column names are nice, but at the same time can be
2130       # rather ambiguous. What we do here is basically go along with
2131       # the loop, adding an unqualified column slot to $bind_attrs,
2132       # alongside the fully qualified name. As soon as we encounter
2133       # another column by that name (which would imply another table)
2134       # we unset the unqualified slot and never add any info to it
2135       # to avoid erroneous type binding. If this happens the users
2136       # only choice will be to fully qualify his column name
2137
2138       if (exists $bind_attrs->{$col}) {
2139         $bind_attrs->{$col} = {};
2140       }
2141       else {
2142         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2143       }
2144     }
2145   }
2146
2147   # Sanity check the attributes (SQLMaker does it too, but
2148   # in case of a software_limit we'll never reach there)
2149   if (defined $attrs->{offset}) {
2150     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2151       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2152   }
2153   $attrs->{offset} ||= 0;
2154
2155   if (defined $attrs->{rows}) {
2156     $self->throw_exception("The rows attribute must be a positive integer if present")
2157       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2158   }
2159   elsif ($attrs->{offset}) {
2160     # MySQL actually recommends this approach.  I cringe.
2161     $attrs->{rows} = $sql_maker->__max_int;
2162   }
2163
2164   my @limit;
2165
2166   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2167   # storage, unless software limit was requested
2168   if (
2169     #limited has_many
2170     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2171        ||
2172     # grouped prefetch (to satisfy group_by == select)
2173     ( $attrs->{group_by}
2174         &&
2175       @{$attrs->{group_by}}
2176         &&
2177       $attrs->{_prefetch_selector_range}
2178     )
2179   ) {
2180     ($ident, $select, $where, $attrs)
2181       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2182   }
2183   elsif (! $attrs->{software_limit} ) {
2184     push @limit, $attrs->{rows}, $attrs->{offset};
2185   }
2186
2187   # try to simplify the joinmap further (prune unreferenced type-single joins)
2188   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2189
2190 ###
2191   # This would be the point to deflate anything found in $where
2192   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2193   # expect a row object. And all we have is a resultsource (it is trivial
2194   # to extract deflator coderefs via $alias2source above).
2195   #
2196   # I don't see a way forward other than changing the way deflators are
2197   # invoked, and that's just bad...
2198 ###
2199
2200   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2201 }
2202
2203 # Returns a counting SELECT for a simple count
2204 # query. Abstracted so that a storage could override
2205 # this to { count => 'firstcol' } or whatever makes
2206 # sense as a performance optimization
2207 sub _count_select {
2208   #my ($self, $source, $rs_attrs) = @_;
2209   return { count => '*' };
2210 }
2211
2212
2213 sub source_bind_attributes {
2214   my ($self, $source) = @_;
2215
2216   my $bind_attributes;
2217
2218   my $colinfo = $source->columns_info;
2219
2220   for my $col (keys %$colinfo) {
2221     if (my $dt = $colinfo->{$col}{data_type} ) {
2222       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2223     }
2224   }
2225
2226   return $bind_attributes;
2227 }
2228
2229 =head2 select
2230
2231 =over 4
2232
2233 =item Arguments: $ident, $select, $condition, $attrs
2234
2235 =back
2236
2237 Handle a SQL select statement.
2238
2239 =cut
2240
2241 sub select {
2242   my $self = shift;
2243   my ($ident, $select, $condition, $attrs) = @_;
2244   return $self->cursor_class->new($self, \@_, $attrs);
2245 }
2246
2247 sub select_single {
2248   my $self = shift;
2249   my ($rv, $sth, @bind) = $self->_select(@_);
2250   my @row = $sth->fetchrow_array;
2251   my @nextrow = $sth->fetchrow_array if @row;
2252   if(@row && @nextrow) {
2253     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2254   }
2255   # Need to call finish() to work round broken DBDs
2256   $sth->finish();
2257   return @row;
2258 }
2259
2260 =head2 sql_limit_dialect
2261
2262 This is an accessor for the default SQL limit dialect used by a particular
2263 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2264 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2265 see L<DBIx::Class::SQLMaker::LimitDialects>.
2266
2267 =head2 sth
2268
2269 =over 4
2270
2271 =item Arguments: $sql
2272
2273 =back
2274
2275 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2276
2277 =cut
2278
2279 sub _dbh_sth {
2280   my ($self, $dbh, $sql) = @_;
2281
2282   # 3 is the if_active parameter which avoids active sth re-use
2283   my $sth = $self->disable_sth_caching
2284     ? $dbh->prepare($sql)
2285     : $dbh->prepare_cached($sql, {}, 3);
2286
2287   # XXX You would think RaiseError would make this impossible,
2288   #  but apparently that's not true :(
2289   $self->throw_exception(
2290     $dbh->errstr
2291       ||
2292     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2293             .'an exception and/or setting $dbh->errstr',
2294       length ($sql) > 20
2295         ? substr($sql, 0, 20) . '...'
2296         : $sql
2297       ,
2298       'DBD::' . $dbh->{Driver}{Name},
2299     )
2300   ) if !$sth;
2301
2302   $sth;
2303 }
2304
2305 sub sth {
2306   my ($self, $sql) = @_;
2307   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2308 }
2309
2310 sub _dbh_columns_info_for {
2311   my ($self, $dbh, $table) = @_;
2312
2313   if ($dbh->can('column_info')) {
2314     my %result;
2315     my $caught;
2316     try {
2317       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2318       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2319       $sth->execute();
2320       while ( my $info = $sth->fetchrow_hashref() ){
2321         my %column_info;
2322         $column_info{data_type}   = $info->{TYPE_NAME};
2323         $column_info{size}      = $info->{COLUMN_SIZE};
2324         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2325         $column_info{default_value} = $info->{COLUMN_DEF};
2326         my $col_name = $info->{COLUMN_NAME};
2327         $col_name =~ s/^\"(.*)\"$/$1/;
2328
2329         $result{$col_name} = \%column_info;
2330       }
2331     } catch {
2332       $caught = 1;
2333     };
2334     return \%result if !$caught && scalar keys %result;
2335   }
2336
2337   my %result;
2338   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2339   $sth->execute;
2340   my @columns = @{$sth->{NAME_lc}};
2341   for my $i ( 0 .. $#columns ){
2342     my %column_info;
2343     $column_info{data_type} = $sth->{TYPE}->[$i];
2344     $column_info{size} = $sth->{PRECISION}->[$i];
2345     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2346
2347     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2348       $column_info{data_type} = $1;
2349       $column_info{size}    = $2;
2350     }
2351
2352     $result{$columns[$i]} = \%column_info;
2353   }
2354   $sth->finish;
2355
2356   foreach my $col (keys %result) {
2357     my $colinfo = $result{$col};
2358     my $type_num = $colinfo->{data_type};
2359     my $type_name;
2360     if(defined $type_num && $dbh->can('type_info')) {
2361       my $type_info = $dbh->type_info($type_num);
2362       $type_name = $type_info->{TYPE_NAME} if $type_info;
2363       $colinfo->{data_type} = $type_name if $type_name;
2364     }
2365   }
2366
2367   return \%result;
2368 }
2369
2370 sub columns_info_for {
2371   my ($self, $table) = @_;
2372   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2373 }
2374
2375 =head2 last_insert_id
2376
2377 Return the row id of the last insert.
2378
2379 =cut
2380
2381 sub _dbh_last_insert_id {
2382     my ($self, $dbh, $source, $col) = @_;
2383
2384     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2385
2386     return $id if defined $id;
2387
2388     my $class = ref $self;
2389     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2390 }
2391
2392 sub last_insert_id {
2393   my $self = shift;
2394   $self->_dbh_last_insert_id ($self->_dbh, @_);
2395 }
2396
2397 =head2 _native_data_type
2398
2399 =over 4
2400
2401 =item Arguments: $type_name
2402
2403 =back
2404
2405 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2406 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2407 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2408
2409 The default implementation returns C<undef>, implement in your Storage driver if
2410 you need this functionality.
2411
2412 Should map types from other databases to the native RDBMS type, for example
2413 C<VARCHAR2> to C<VARCHAR>.
2414
2415 Types with modifiers should map to the underlying data type. For example,
2416 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2417
2418 Composite types should map to the container type, for example
2419 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2420
2421 =cut
2422
2423 sub _native_data_type {
2424   #my ($self, $data_type) = @_;
2425   return undef
2426 }
2427
2428 # Check if placeholders are supported at all
2429 sub _determine_supports_placeholders {
2430   my $self = shift;
2431   my $dbh  = $self->_get_dbh;
2432
2433   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2434   # but it is inaccurate more often than not
2435   return try {
2436     local $dbh->{PrintError} = 0;
2437     local $dbh->{RaiseError} = 1;
2438     $dbh->do('select ?', {}, 1);
2439     1;
2440   }
2441   catch {
2442     0;
2443   };
2444 }
2445
2446 # Check if placeholders bound to non-string types throw exceptions
2447 #
2448 sub _determine_supports_typeless_placeholders {
2449   my $self = shift;
2450   my $dbh  = $self->_get_dbh;
2451
2452   return try {
2453     local $dbh->{PrintError} = 0;
2454     local $dbh->{RaiseError} = 1;
2455     # this specifically tests a bind that is NOT a string
2456     $dbh->do('select 1 where 1 = ?', {}, 1);
2457     1;
2458   }
2459   catch {
2460     0;
2461   };
2462 }
2463
2464 =head2 sqlt_type
2465
2466 Returns the database driver name.
2467
2468 =cut
2469
2470 sub sqlt_type {
2471   shift->_get_dbh->{Driver}->{Name};
2472 }
2473
2474 =head2 bind_attribute_by_data_type
2475
2476 Given a datatype from column info, returns a database specific bind
2477 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2478 let the database planner just handle it.
2479
2480 Generally only needed for special case column types, like bytea in postgres.
2481
2482 =cut
2483
2484 sub bind_attribute_by_data_type {
2485     return;
2486 }
2487
2488 =head2 is_datatype_numeric
2489
2490 Given a datatype from column_info, returns a boolean value indicating if
2491 the current RDBMS considers it a numeric value. This controls how
2492 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2493 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2494 be performed instead of the usual C<eq>.
2495
2496 =cut
2497
2498 sub is_datatype_numeric {
2499   my ($self, $dt) = @_;
2500
2501   return 0 unless $dt;
2502
2503   return $dt =~ /^ (?:
2504     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2505   ) $/ix;
2506 }
2507
2508
2509 =head2 create_ddl_dir
2510
2511 =over 4
2512
2513 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2514
2515 =back
2516
2517 Creates a SQL file based on the Schema, for each of the specified
2518 database engines in C<\@databases> in the given directory.
2519 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2520
2521 Given a previous version number, this will also create a file containing
2522 the ALTER TABLE statements to transform the previous schema into the
2523 current one. Note that these statements may contain C<DROP TABLE> or
2524 C<DROP COLUMN> statements that can potentially destroy data.
2525
2526 The file names are created using the C<ddl_filename> method below, please
2527 override this method in your schema if you would like a different file
2528 name format. For the ALTER file, the same format is used, replacing
2529 $version in the name with "$preversion-$version".
2530
2531 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2532 The most common value for this would be C<< { add_drop_table => 1 } >>
2533 to have the SQL produced include a C<DROP TABLE> statement for each table
2534 created. For quoting purposes supply C<quote_table_names> and
2535 C<quote_field_names>.
2536
2537 If no arguments are passed, then the following default values are assumed:
2538
2539 =over 4
2540
2541 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2542
2543 =item version    - $schema->schema_version
2544
2545 =item directory  - './'
2546
2547 =item preversion - <none>
2548
2549 =back
2550
2551 By default, C<\%sqlt_args> will have
2552
2553  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2554
2555 merged with the hash passed in. To disable any of those features, pass in a
2556 hashref like the following
2557
2558  { ignore_constraint_names => 0, # ... other options }
2559
2560
2561 WARNING: You are strongly advised to check all SQL files created, before applying
2562 them.
2563
2564 =cut
2565
2566 sub create_ddl_dir {
2567   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2568
2569   unless ($dir) {
2570     carp "No directory given, using ./\n";
2571     $dir = './';
2572   } else {
2573       -d $dir
2574         or
2575       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2576         or
2577       $self->throw_exception(
2578         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2579       );
2580   }
2581
2582   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2583
2584   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2585   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2586
2587   my $schema_version = $schema->schema_version || '1.x';
2588   $version ||= $schema_version;
2589
2590   $sqltargs = {
2591     add_drop_table => 1,
2592     ignore_constraint_names => 1,
2593     ignore_index_names => 1,
2594     %{$sqltargs || {}}
2595   };
2596
2597   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2598     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2599   }
2600
2601   my $sqlt = SQL::Translator->new( $sqltargs );
2602
2603   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2604   my $sqlt_schema = $sqlt->translate({ data => $schema })
2605     or $self->throw_exception ($sqlt->error);
2606
2607   foreach my $db (@$databases) {
2608     $sqlt->reset();
2609     $sqlt->{schema} = $sqlt_schema;
2610     $sqlt->producer($db);
2611
2612     my $file;
2613     my $filename = $schema->ddl_filename($db, $version, $dir);
2614     if (-e $filename && ($version eq $schema_version )) {
2615       # if we are dumping the current version, overwrite the DDL
2616       carp "Overwriting existing DDL file - $filename";
2617       unlink($filename);
2618     }
2619
2620     my $output = $sqlt->translate;
2621     if(!$output) {
2622       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2623       next;
2624     }
2625     if(!open($file, ">$filename")) {
2626       $self->throw_exception("Can't open $filename for writing ($!)");
2627       next;
2628     }
2629     print $file $output;
2630     close($file);
2631
2632     next unless ($preversion);
2633
2634     require SQL::Translator::Diff;
2635
2636     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2637     if(!-e $prefilename) {
2638       carp("No previous schema file found ($prefilename)");
2639       next;
2640     }
2641
2642     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2643     if(-e $difffile) {
2644       carp("Overwriting existing diff file - $difffile");
2645       unlink($difffile);
2646     }
2647
2648     my $source_schema;
2649     {
2650       my $t = SQL::Translator->new($sqltargs);
2651       $t->debug( 0 );
2652       $t->trace( 0 );
2653
2654       $t->parser( $db )
2655         or $self->throw_exception ($t->error);
2656
2657       my $out = $t->translate( $prefilename )
2658         or $self->throw_exception ($t->error);
2659
2660       $source_schema = $t->schema;
2661
2662       $source_schema->name( $prefilename )
2663         unless ( $source_schema->name );
2664     }
2665
2666     # The "new" style of producers have sane normalization and can support
2667     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2668     # And we have to diff parsed SQL against parsed SQL.
2669     my $dest_schema = $sqlt_schema;
2670
2671     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2672       my $t = SQL::Translator->new($sqltargs);
2673       $t->debug( 0 );
2674       $t->trace( 0 );
2675
2676       $t->parser( $db )
2677         or $self->throw_exception ($t->error);
2678
2679       my $out = $t->translate( $filename )
2680         or $self->throw_exception ($t->error);
2681
2682       $dest_schema = $t->schema;
2683
2684       $dest_schema->name( $filename )
2685         unless $dest_schema->name;
2686     }
2687
2688     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2689                                                   $dest_schema,   $db,
2690                                                   $sqltargs
2691                                                  );
2692     if(!open $file, ">$difffile") {
2693       $self->throw_exception("Can't write to $difffile ($!)");
2694       next;
2695     }
2696     print $file $diff;
2697     close($file);
2698   }
2699 }
2700
2701 =head2 deployment_statements
2702
2703 =over 4
2704
2705 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2706
2707 =back
2708
2709 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2710
2711 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2712 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2713
2714 C<$directory> is used to return statements from files in a previously created
2715 L</create_ddl_dir> directory and is optional. The filenames are constructed
2716 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2717
2718 If no C<$directory> is specified then the statements are constructed on the
2719 fly using L<SQL::Translator> and C<$version> is ignored.
2720
2721 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2722
2723 =cut
2724
2725 sub deployment_statements {
2726   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2727   $type ||= $self->sqlt_type;
2728   $version ||= $schema->schema_version || '1.x';
2729   $dir ||= './';
2730   my $filename = $schema->ddl_filename($type, $version, $dir);
2731   if(-f $filename)
2732   {
2733       my $file;
2734       open($file, "<$filename")
2735         or $self->throw_exception("Can't open $filename ($!)");
2736       my @rows = <$file>;
2737       close($file);
2738       return join('', @rows);
2739   }
2740
2741   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2742     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2743   }
2744
2745   # sources needs to be a parser arg, but for simplicty allow at top level
2746   # coming in
2747   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2748       if exists $sqltargs->{sources};
2749
2750   my $tr = SQL::Translator->new(
2751     producer => "SQL::Translator::Producer::${type}",
2752     %$sqltargs,
2753     parser => 'SQL::Translator::Parser::DBIx::Class',
2754     data => $schema,
2755   );
2756
2757   my @ret;
2758   if (wantarray) {
2759     @ret = $tr->translate;
2760   }
2761   else {
2762     $ret[0] = $tr->translate;
2763   }
2764
2765   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2766     unless (@ret && defined $ret[0]);
2767
2768   return wantarray ? @ret : $ret[0];
2769 }
2770
2771 sub deploy {
2772   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2773   my $deploy = sub {
2774     my $line = shift;
2775     return if($line =~ /^--/);
2776     return if(!$line);
2777     # next if($line =~ /^DROP/m);
2778     return if($line =~ /^BEGIN TRANSACTION/m);
2779     return if($line =~ /^COMMIT/m);
2780     return if $line =~ /^\s+$/; # skip whitespace only
2781     $self->_query_start($line);
2782     try {
2783       # do a dbh_do cycle here, as we need some error checking in
2784       # place (even though we will ignore errors)
2785       $self->dbh_do (sub { $_[1]->do($line) });
2786     } catch {
2787       carp qq{$_ (running "${line}")};
2788     };
2789     $self->_query_end($line);
2790   };
2791   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2792   if (@statements > 1) {
2793     foreach my $statement (@statements) {
2794       $deploy->( $statement );
2795     }
2796   }
2797   elsif (@statements == 1) {
2798     foreach my $line ( split(";\n", $statements[0])) {
2799       $deploy->( $line );
2800     }
2801   }
2802 }
2803
2804 =head2 datetime_parser
2805
2806 Returns the datetime parser class
2807
2808 =cut
2809
2810 sub datetime_parser {
2811   my $self = shift;
2812   return $self->{datetime_parser} ||= do {
2813     $self->build_datetime_parser(@_);
2814   };
2815 }
2816
2817 =head2 datetime_parser_type
2818
2819 Defines (returns) the datetime parser class - currently hardwired to
2820 L<DateTime::Format::MySQL>
2821
2822 =cut
2823
2824 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2825
2826 =head2 build_datetime_parser
2827
2828 See L</datetime_parser>
2829
2830 =cut
2831
2832 sub build_datetime_parser {
2833   my $self = shift;
2834   my $type = $self->datetime_parser_type(@_);
2835   $self->ensure_class_loaded ($type);
2836   return $type;
2837 }
2838
2839
2840 =head2 is_replicating
2841
2842 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2843 replicate from a master database.  Default is undef, which is the result
2844 returned by databases that don't support replication.
2845
2846 =cut
2847
2848 sub is_replicating {
2849     return;
2850
2851 }
2852
2853 =head2 lag_behind_master
2854
2855 Returns a number that represents a certain amount of lag behind a master db
2856 when a given storage is replicating.  The number is database dependent, but
2857 starts at zero and increases with the amount of lag. Default in undef
2858
2859 =cut
2860
2861 sub lag_behind_master {
2862     return;
2863 }
2864
2865 =head2 relname_to_table_alias
2866
2867 =over 4
2868
2869 =item Arguments: $relname, $join_count
2870
2871 =back
2872
2873 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2874 queries.
2875
2876 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2877 way these aliases are named.
2878
2879 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2880 otherwise C<"$relname">.
2881
2882 =cut
2883
2884 sub relname_to_table_alias {
2885   my ($self, $relname, $join_count) = @_;
2886
2887   my $alias = ($join_count && $join_count > 1 ?
2888     join('_', $relname, $join_count) : $relname);
2889
2890   return $alias;
2891 }
2892
2893 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2894 # version and it may be necessary to amend or override it for a specific storage
2895 # if such binds are necessary.
2896 sub _max_column_bytesize {
2897   my ($self, $source, $col) = @_;
2898
2899   my $inf = $source->column_info($col);
2900   return $inf->{_max_bytesize} ||= do {
2901
2902     my $max_size;
2903
2904     if (my $data_type = $inf->{data_type}) {
2905       $data_type = lc($data_type);
2906
2907       # String/sized-binary types
2908       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2909                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2910       ) {
2911         $max_size = $inf->{size};
2912       }
2913       # Other charset/unicode types, assume scale of 4
2914       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2915                               |univarchar
2916                               |nvarchar)\b/x
2917       ) {
2918         $max_size = $inf->{size} * 4 if $inf->{size};
2919       }
2920       # Blob types
2921       elsif ($self->_is_lob_type($data_type)) {
2922         # default to longreadlen
2923       }
2924       else {
2925         $max_size = 100;  # for all other (numeric?) datatypes
2926       }
2927     }
2928
2929     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2930   };
2931 }
2932
2933 # Determine if a data_type is some type of BLOB
2934 # FIXME: these regexes are expensive, result of these checks should be cached in
2935 # the column_info .
2936 sub _is_lob_type {
2937   my ($self, $data_type) = @_;
2938   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2939     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2940                                   |varchar|character\s*varying|nvarchar
2941                                   |national\s*character\s*varying))?\z/xi);
2942 }
2943
2944 sub _is_binary_lob_type {
2945   my ($self, $data_type) = @_;
2946   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2947     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2948 }
2949
2950 sub _is_text_lob_type {
2951   my ($self, $data_type) = @_;
2952   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2953     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2954                         |national\s*character\s*varying))\z/xi);
2955 }
2956
2957 1;
2958
2959 =head1 USAGE NOTES
2960
2961 =head2 DBIx::Class and AutoCommit
2962
2963 DBIx::Class can do some wonderful magic with handling exceptions,
2964 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2965 (the default) combined with C<txn_do> for transaction support.
2966
2967 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2968 in an assumed transaction between commits, and you're telling us you'd
2969 like to manage that manually.  A lot of the magic protections offered by
2970 this module will go away.  We can't protect you from exceptions due to database
2971 disconnects because we don't know anything about how to restart your
2972 transactions.  You're on your own for handling all sorts of exceptional
2973 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2974 be with raw DBI.
2975
2976
2977 =head1 AUTHORS
2978
2979 Matt S. Trout <mst@shadowcatsystems.co.uk>
2980
2981 Andy Grundman <andy@hybridized.org>
2982
2983 =head1 LICENSE
2984
2985 You may distribute this code under the same terms as Perl itself.
2986
2987 =cut