merge and improve _is_lob_type from Sybase::ASE into Storage::DBI
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use overload ();
20 use namespace::clean;
21
22
23 # default cursor class, overridable in connect_info attributes
24 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
25
26 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
27 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/
30   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
31   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
32   transaction_depth _dbh_autocommit  savepoints
33 /);
34
35 # the values for these accessors are picked out (and deleted) from
36 # the attribute hashref passed to connect_info
37 my @storage_options = qw/
38   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
39   disable_sth_caching unsafe auto_savepoint
40 /;
41 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
42
43
44 # capability definitions, using a 2-tiered accessor system
45 # The rationale is:
46 #
47 # A driver/user may define _use_X, which blindly without any checks says:
48 # "(do not) use this capability", (use_dbms_capability is an "inherited"
49 # type accessor)
50 #
51 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
52 # accessor, which in turn calls _determine_supports_X, and stores the return
53 # in a special slot on the storage object, which is wiped every time a $dbh
54 # reconnection takes place (it is not guaranteed that upon reconnection we
55 # will get the same rdbms version). _determine_supports_X does not need to
56 # exist on a driver, as we ->can for it before calling.
57
58 my @capabilities = (qw/
59   insert_returning
60   insert_returning_bound
61   placeholders
62   typeless_placeholders
63   join_optimizer
64 /);
65 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
66 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
67
68 # on by default, not strictly a capability (pending rewrite)
69 __PACKAGE__->_use_join_optimizer (1);
70 sub _determine_supports_join_optimizer { 1 };
71
72 # Each of these methods need _determine_driver called before itself
73 # in order to function reliably. This is a purely DRY optimization
74 #
75 # get_(use)_dbms_capability need to be called on the correct Storage
76 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
77 # _determine_supports_X which obv. needs a correct driver as well
78 my @rdbms_specific_methods = qw/
79   deployment_statements
80   sqlt_type
81   sql_maker
82   build_datetime_parser
83   datetime_parser_type
84
85   insert
86   insert_bulk
87   update
88   delete
89   select
90   select_single
91
92   get_use_dbms_capability
93   get_dbms_capability
94
95   _server_info
96   _get_server_version
97 /;
98
99 for my $meth (@rdbms_specific_methods) {
100
101   my $orig = __PACKAGE__->can ($meth)
102     or die "$meth is not a ::Storage::DBI method!";
103
104   no strict qw/refs/;
105   no warnings qw/redefine/;
106   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
107     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
108       $_[0]->_determine_driver;
109
110       # This for some reason crashes and burns on perl 5.8.1
111       # IFF the method ends up throwing an exception
112       #goto $_[0]->can ($meth);
113
114       my $cref = $_[0]->can ($meth);
115       goto $cref;
116     }
117     goto $orig;
118   };
119 }
120
121
122 =head1 NAME
123
124 DBIx::Class::Storage::DBI - DBI storage handler
125
126 =head1 SYNOPSIS
127
128   my $schema = MySchema->connect('dbi:SQLite:my.db');
129
130   $schema->storage->debug(1);
131
132   my @stuff = $schema->storage->dbh_do(
133     sub {
134       my ($storage, $dbh, @args) = @_;
135       $dbh->do("DROP TABLE authors");
136     },
137     @column_list
138   );
139
140   $schema->resultset('Book')->search({
141      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
142   });
143
144 =head1 DESCRIPTION
145
146 This class represents the connection to an RDBMS via L<DBI>.  See
147 L<DBIx::Class::Storage> for general information.  This pod only
148 documents DBI-specific methods and behaviors.
149
150 =head1 METHODS
151
152 =cut
153
154 sub new {
155   my $new = shift->next::method(@_);
156
157   $new->transaction_depth(0);
158   $new->_sql_maker_opts({});
159   $new->_dbh_details({});
160   $new->{savepoints} = [];
161   $new->{_in_dbh_do} = 0;
162   $new->{_dbh_gen} = 0;
163
164   # read below to see what this does
165   $new->_arm_global_destructor;
166
167   $new;
168 }
169
170 # This is hack to work around perl shooting stuff in random
171 # order on exit(). If we do not walk the remaining storage
172 # objects in an END block, there is a *small but real* chance
173 # of a fork()ed child to kill the parent's shared DBI handle,
174 # *before perl reaches the DESTROY in this package*
175 # Yes, it is ugly and effective.
176 # Additionally this registry is used by the CLONE method to
177 # make sure no handles are shared between threads
178 {
179   my %seek_and_destroy;
180
181   sub _arm_global_destructor {
182     my $self = shift;
183     my $key = refaddr ($self);
184     $seek_and_destroy{$key} = $self;
185     weaken ($seek_and_destroy{$key});
186   }
187
188   END {
189     local $?; # just in case the DBI destructor changes it somehow
190
191     # destroy just the object if not native to this process/thread
192     $_->_verify_pid for (grep
193       { defined $_ }
194       values %seek_and_destroy
195     );
196   }
197
198   sub CLONE {
199     # As per DBI's recommendation, DBIC disconnects all handles as
200     # soon as possible (DBIC will reconnect only on demand from within
201     # the thread)
202     for (values %seek_and_destroy) {
203       next unless $_;
204       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
205       $_->_dbh(undef);
206     }
207   }
208 }
209
210 sub DESTROY {
211   my $self = shift;
212
213   # some databases spew warnings on implicit disconnect
214   local $SIG{__WARN__} = sub {};
215   $self->_dbh(undef);
216
217   # this op is necessary, since the very last perl runtime statement
218   # triggers a global destruction shootout, and the $SIG localization
219   # may very well be destroyed before perl actually gets to do the
220   # $dbh undef
221   1;
222 }
223
224 # handle pid changes correctly - do not destroy parent's connection
225 sub _verify_pid {
226   my $self = shift;
227
228   my $pid = $self->_conn_pid;
229   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
230     $dbh->{InactiveDestroy} = 1;
231     $self->{_dbh_gen}++;
232     $self->_dbh(undef);
233   }
234
235   return;
236 }
237
238 =head2 connect_info
239
240 This method is normally called by L<DBIx::Class::Schema/connection>, which
241 encapsulates its argument list in an arrayref before passing them here.
242
243 The argument list may contain:
244
245 =over
246
247 =item *
248
249 The same 4-element argument set one would normally pass to
250 L<DBI/connect>, optionally followed by
251 L<extra attributes|/DBIx::Class specific connection attributes>
252 recognized by DBIx::Class:
253
254   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
255
256 =item *
257
258 A single code reference which returns a connected
259 L<DBI database handle|DBI/connect> optionally followed by
260 L<extra attributes|/DBIx::Class specific connection attributes> recognized
261 by DBIx::Class:
262
263   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
264
265 =item *
266
267 A single hashref with all the attributes and the dsn/user/password
268 mixed together:
269
270   $connect_info_args = [{
271     dsn => $dsn,
272     user => $user,
273     password => $pass,
274     %dbi_attributes,
275     %extra_attributes,
276   }];
277
278   $connect_info_args = [{
279     dbh_maker => sub { DBI->connect (...) },
280     %dbi_attributes,
281     %extra_attributes,
282   }];
283
284 This is particularly useful for L<Catalyst> based applications, allowing the
285 following config (L<Config::General> style):
286
287   <Model::DB>
288     schema_class   App::DB
289     <connect_info>
290       dsn          dbi:mysql:database=test
291       user         testuser
292       password     TestPass
293       AutoCommit   1
294     </connect_info>
295   </Model::DB>
296
297 The C<dsn>/C<user>/C<password> combination can be substituted by the
298 C<dbh_maker> key whose value is a coderef that returns a connected
299 L<DBI database handle|DBI/connect>
300
301 =back
302
303 Please note that the L<DBI> docs recommend that you always explicitly
304 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
305 recommends that it be set to I<1>, and that you perform transactions
306 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
307 to I<1> if you do not do explicitly set it to zero.  This is the default
308 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
309
310 =head3 DBIx::Class specific connection attributes
311
312 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
313 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
314 the following connection options. These options can be mixed in with your other
315 L<DBI> connection attributes, or placed in a separate hashref
316 (C<\%extra_attributes>) as shown above.
317
318 Every time C<connect_info> is invoked, any previous settings for
319 these options will be cleared before setting the new ones, regardless of
320 whether any options are specified in the new C<connect_info>.
321
322
323 =over
324
325 =item on_connect_do
326
327 Specifies things to do immediately after connecting or re-connecting to
328 the database.  Its value may contain:
329
330 =over
331
332 =item a scalar
333
334 This contains one SQL statement to execute.
335
336 =item an array reference
337
338 This contains SQL statements to execute in order.  Each element contains
339 a string or a code reference that returns a string.
340
341 =item a code reference
342
343 This contains some code to execute.  Unlike code references within an
344 array reference, its return value is ignored.
345
346 =back
347
348 =item on_disconnect_do
349
350 Takes arguments in the same form as L</on_connect_do> and executes them
351 immediately before disconnecting from the database.
352
353 Note, this only runs if you explicitly call L</disconnect> on the
354 storage object.
355
356 =item on_connect_call
357
358 A more generalized form of L</on_connect_do> that calls the specified
359 C<connect_call_METHOD> methods in your storage driver.
360
361   on_connect_do => 'select 1'
362
363 is equivalent to:
364
365   on_connect_call => [ [ do_sql => 'select 1' ] ]
366
367 Its values may contain:
368
369 =over
370
371 =item a scalar
372
373 Will call the C<connect_call_METHOD> method.
374
375 =item a code reference
376
377 Will execute C<< $code->($storage) >>
378
379 =item an array reference
380
381 Each value can be a method name or code reference.
382
383 =item an array of arrays
384
385 For each array, the first item is taken to be the C<connect_call_> method name
386 or code reference, and the rest are parameters to it.
387
388 =back
389
390 Some predefined storage methods you may use:
391
392 =over
393
394 =item do_sql
395
396 Executes a SQL string or a code reference that returns a SQL string. This is
397 what L</on_connect_do> and L</on_disconnect_do> use.
398
399 It can take:
400
401 =over
402
403 =item a scalar
404
405 Will execute the scalar as SQL.
406
407 =item an arrayref
408
409 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
410 attributes hashref and bind values.
411
412 =item a code reference
413
414 Will execute C<< $code->($storage) >> and execute the return array refs as
415 above.
416
417 =back
418
419 =item datetime_setup
420
421 Execute any statements necessary to initialize the database session to return
422 and accept datetime/timestamp values used with
423 L<DBIx::Class::InflateColumn::DateTime>.
424
425 Only necessary for some databases, see your specific storage driver for
426 implementation details.
427
428 =back
429
430 =item on_disconnect_call
431
432 Takes arguments in the same form as L</on_connect_call> and executes them
433 immediately before disconnecting from the database.
434
435 Calls the C<disconnect_call_METHOD> methods as opposed to the
436 C<connect_call_METHOD> methods called by L</on_connect_call>.
437
438 Note, this only runs if you explicitly call L</disconnect> on the
439 storage object.
440
441 =item disable_sth_caching
442
443 If set to a true value, this option will disable the caching of
444 statement handles via L<DBI/prepare_cached>.
445
446 =item limit_dialect
447
448 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
449 default L</sql_limit_dialect> setting of the storage (if any). For a list
450 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
451
452 =item quote_char
453
454 Specifies what characters to use to quote table and column names.
455
456 C<quote_char> expects either a single character, in which case is it
457 is placed on either side of the table/column name, or an arrayref of length
458 2 in which case the table/column name is placed between the elements.
459
460 For example under MySQL you should use C<< quote_char => '`' >>, and for
461 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
462
463 =item name_sep
464
465 This parameter is only useful in conjunction with C<quote_char>, and is used to
466 specify the character that separates elements (schemas, tables, columns) from
467 each other. If unspecified it defaults to the most commonly used C<.>.
468
469 =item unsafe
470
471 This Storage driver normally installs its own C<HandleError>, sets
472 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
473 all database handles, including those supplied by a coderef.  It does this
474 so that it can have consistent and useful error behavior.
475
476 If you set this option to a true value, Storage will not do its usual
477 modifications to the database handle's attributes, and instead relies on
478 the settings in your connect_info DBI options (or the values you set in
479 your connection coderef, in the case that you are connecting via coderef).
480
481 Note that your custom settings can cause Storage to malfunction,
482 especially if you set a C<HandleError> handler that suppresses exceptions
483 and/or disable C<RaiseError>.
484
485 =item auto_savepoint
486
487 If this option is true, L<DBIx::Class> will use savepoints when nesting
488 transactions, making it possible to recover from failure in the inner
489 transaction without having to abort all outer transactions.
490
491 =item cursor_class
492
493 Use this argument to supply a cursor class other than the default
494 L<DBIx::Class::Storage::DBI::Cursor>.
495
496 =back
497
498 Some real-life examples of arguments to L</connect_info> and
499 L<DBIx::Class::Schema/connect>
500
501   # Simple SQLite connection
502   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
503
504   # Connect via subref
505   ->connect_info([ sub { DBI->connect(...) } ]);
506
507   # Connect via subref in hashref
508   ->connect_info([{
509     dbh_maker => sub { DBI->connect(...) },
510     on_connect_do => 'alter session ...',
511   }]);
512
513   # A bit more complicated
514   ->connect_info(
515     [
516       'dbi:Pg:dbname=foo',
517       'postgres',
518       'my_pg_password',
519       { AutoCommit => 1 },
520       { quote_char => q{"} },
521     ]
522   );
523
524   # Equivalent to the previous example
525   ->connect_info(
526     [
527       'dbi:Pg:dbname=foo',
528       'postgres',
529       'my_pg_password',
530       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
531     ]
532   );
533
534   # Same, but with hashref as argument
535   # See parse_connect_info for explanation
536   ->connect_info(
537     [{
538       dsn         => 'dbi:Pg:dbname=foo',
539       user        => 'postgres',
540       password    => 'my_pg_password',
541       AutoCommit  => 1,
542       quote_char  => q{"},
543       name_sep    => q{.},
544     }]
545   );
546
547   # Subref + DBIx::Class-specific connection options
548   ->connect_info(
549     [
550       sub { DBI->connect(...) },
551       {
552           quote_char => q{`},
553           name_sep => q{@},
554           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
555           disable_sth_caching => 1,
556       },
557     ]
558   );
559
560
561
562 =cut
563
564 sub connect_info {
565   my ($self, $info) = @_;
566
567   return $self->_connect_info if !$info;
568
569   $self->_connect_info($info); # copy for _connect_info
570
571   $info = $self->_normalize_connect_info($info)
572     if ref $info eq 'ARRAY';
573
574   for my $storage_opt (keys %{ $info->{storage_options} }) {
575     my $value = $info->{storage_options}{$storage_opt};
576
577     $self->$storage_opt($value);
578   }
579
580   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
581   #  the new set of options
582   $self->_sql_maker(undef);
583   $self->_sql_maker_opts({});
584
585   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
586     my $value = $info->{sql_maker_options}{$sql_maker_opt};
587
588     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
589   }
590
591   my %attrs = (
592     %{ $self->_default_dbi_connect_attributes || {} },
593     %{ $info->{attributes} || {} },
594   );
595
596   my @args = @{ $info->{arguments} };
597
598   $self->_dbi_connect_info([@args,
599     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
600
601   # FIXME - dirty:
602   # save attributes them in a separate accessor so they are always
603   # introspectable, even in case of a CODE $dbhmaker
604   $self->_dbic_connect_attributes (\%attrs);
605
606   return $self->_connect_info;
607 }
608
609 sub _normalize_connect_info {
610   my ($self, $info_arg) = @_;
611   my %info;
612
613   my @args = @$info_arg;  # take a shallow copy for further mutilation
614
615   # combine/pre-parse arguments depending on invocation style
616
617   my %attrs;
618   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
619     %attrs = %{ $args[1] || {} };
620     @args = $args[0];
621   }
622   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
623     %attrs = %{$args[0]};
624     @args = ();
625     if (my $code = delete $attrs{dbh_maker}) {
626       @args = $code;
627
628       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
629       if (@ignored) {
630         carp sprintf (
631             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
632           . "to the result of 'dbh_maker'",
633
634           join (', ', map { "'$_'" } (@ignored) ),
635         );
636       }
637     }
638     else {
639       @args = delete @attrs{qw/dsn user password/};
640     }
641   }
642   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
643     %attrs = (
644       % { $args[3] || {} },
645       % { $args[4] || {} },
646     );
647     @args = @args[0,1,2];
648   }
649
650   $info{arguments} = \@args;
651
652   my @storage_opts = grep exists $attrs{$_},
653     @storage_options, 'cursor_class';
654
655   @{ $info{storage_options} }{@storage_opts} =
656     delete @attrs{@storage_opts} if @storage_opts;
657
658   my @sql_maker_opts = grep exists $attrs{$_},
659     qw/limit_dialect quote_char name_sep/;
660
661   @{ $info{sql_maker_options} }{@sql_maker_opts} =
662     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
663
664   $info{attributes} = \%attrs if %attrs;
665
666   return \%info;
667 }
668
669 sub _default_dbi_connect_attributes {
670   return {
671     AutoCommit => 1,
672     RaiseError => 1,
673     PrintError => 0,
674   };
675 }
676
677 =head2 on_connect_do
678
679 This method is deprecated in favour of setting via L</connect_info>.
680
681 =cut
682
683 =head2 on_disconnect_do
684
685 This method is deprecated in favour of setting via L</connect_info>.
686
687 =cut
688
689 sub _parse_connect_do {
690   my ($self, $type) = @_;
691
692   my $val = $self->$type;
693   return () if not defined $val;
694
695   my @res;
696
697   if (not ref($val)) {
698     push @res, [ 'do_sql', $val ];
699   } elsif (ref($val) eq 'CODE') {
700     push @res, $val;
701   } elsif (ref($val) eq 'ARRAY') {
702     push @res, map { [ 'do_sql', $_ ] } @$val;
703   } else {
704     $self->throw_exception("Invalid type for $type: ".ref($val));
705   }
706
707   return \@res;
708 }
709
710 =head2 dbh_do
711
712 Arguments: ($subref | $method_name), @extra_coderef_args?
713
714 Execute the given $subref or $method_name using the new exception-based
715 connection management.
716
717 The first two arguments will be the storage object that C<dbh_do> was called
718 on and a database handle to use.  Any additional arguments will be passed
719 verbatim to the called subref as arguments 2 and onwards.
720
721 Using this (instead of $self->_dbh or $self->dbh) ensures correct
722 exception handling and reconnection (or failover in future subclasses).
723
724 Your subref should have no side-effects outside of the database, as
725 there is the potential for your subref to be partially double-executed
726 if the database connection was stale/dysfunctional.
727
728 Example:
729
730   my @stuff = $schema->storage->dbh_do(
731     sub {
732       my ($storage, $dbh, @cols) = @_;
733       my $cols = join(q{, }, @cols);
734       $dbh->selectrow_array("SELECT $cols FROM foo");
735     },
736     @column_list
737   );
738
739 =cut
740
741 sub dbh_do {
742   my $self = shift;
743   my $code = shift;
744
745   my $dbh = $self->_get_dbh;
746
747   return $self->$code($dbh, @_)
748     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
749
750   local $self->{_in_dbh_do} = 1;
751
752   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
753   my $args = \@_;
754   return try {
755     $self->$code ($dbh, @$args);
756   } catch {
757     $self->throw_exception($_) if $self->connected;
758
759     # We were not connected - reconnect and retry, but let any
760     #  exception fall right through this time
761     carp "Retrying $code after catching disconnected exception: $_"
762       if $ENV{DBIC_DBIRETRY_DEBUG};
763
764     $self->_populate_dbh;
765     $self->$code($self->_dbh, @$args);
766   };
767 }
768
769 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
770 # It also informs dbh_do to bypass itself while under the direction of txn_do,
771 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
772 sub txn_do {
773   my $self = shift;
774   my $coderef = shift;
775
776   ref $coderef eq 'CODE' or $self->throw_exception
777     ('$coderef must be a CODE reference');
778
779   local $self->{_in_dbh_do} = 1;
780
781   my @result;
782   my $want = wantarray;
783
784   my $tried = 0;
785   while(1) {
786     my $exception;
787
788     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
789     my $args = \@_;
790
791     try {
792       $self->txn_begin;
793       my $txn_start_depth = $self->transaction_depth;
794       if($want) {
795           @result = $coderef->(@$args);
796       }
797       elsif(defined $want) {
798           $result[0] = $coderef->(@$args);
799       }
800       else {
801           $coderef->(@$args);
802       }
803
804       my $delta_txn = $txn_start_depth - $self->transaction_depth;
805       if ($delta_txn == 0) {
806         $self->txn_commit;
807       }
808       elsif ($delta_txn != 1) {
809         # an off-by-one would mean we fired a rollback
810         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
811       }
812     } catch {
813       $exception = $_;
814     };
815
816     if(! defined $exception) { return wantarray ? @result : $result[0] }
817
818     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
819       my $rollback_exception;
820       try { $self->txn_rollback } catch { $rollback_exception = shift };
821       if(defined $rollback_exception) {
822         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
823         $self->throw_exception($exception)  # propagate nested rollback
824           if $rollback_exception =~ /$exception_class/;
825
826         $self->throw_exception(
827           "Transaction aborted: ${exception}. "
828           . "Rollback failed: ${rollback_exception}"
829         );
830       }
831       $self->throw_exception($exception)
832     }
833
834     # We were not connected, and was first try - reconnect and retry
835     # via the while loop
836     carp "Retrying $coderef after catching disconnected exception: $exception"
837       if $ENV{DBIC_TXNRETRY_DEBUG};
838     $self->_populate_dbh;
839   }
840 }
841
842 =head2 disconnect
843
844 Our C<disconnect> method also performs a rollback first if the
845 database is not in C<AutoCommit> mode.
846
847 =cut
848
849 sub disconnect {
850   my ($self) = @_;
851
852   if( $self->_dbh ) {
853     my @actions;
854
855     push @actions, ( $self->on_disconnect_call || () );
856     push @actions, $self->_parse_connect_do ('on_disconnect_do');
857
858     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
859
860     $self->_dbh_rollback unless $self->_dbh_autocommit;
861
862     %{ $self->_dbh->{CachedKids} } = ();
863     $self->_dbh->disconnect;
864     $self->_dbh(undef);
865     $self->{_dbh_gen}++;
866   }
867 }
868
869 =head2 with_deferred_fk_checks
870
871 =over 4
872
873 =item Arguments: C<$coderef>
874
875 =item Return Value: The return value of $coderef
876
877 =back
878
879 Storage specific method to run the code ref with FK checks deferred or
880 in MySQL's case disabled entirely.
881
882 =cut
883
884 # Storage subclasses should override this
885 sub with_deferred_fk_checks {
886   my ($self, $sub) = @_;
887   $sub->();
888 }
889
890 =head2 connected
891
892 =over
893
894 =item Arguments: none
895
896 =item Return Value: 1|0
897
898 =back
899
900 Verifies that the current database handle is active and ready to execute
901 an SQL statement (e.g. the connection did not get stale, server is still
902 answering, etc.) This method is used internally by L</dbh>.
903
904 =cut
905
906 sub connected {
907   my $self = shift;
908   return 0 unless $self->_seems_connected;
909
910   #be on the safe side
911   local $self->_dbh->{RaiseError} = 1;
912
913   return $self->_ping;
914 }
915
916 sub _seems_connected {
917   my $self = shift;
918
919   $self->_verify_pid;
920
921   my $dbh = $self->_dbh
922     or return 0;
923
924   return $dbh->FETCH('Active');
925 }
926
927 sub _ping {
928   my $self = shift;
929
930   my $dbh = $self->_dbh or return 0;
931
932   return $dbh->ping;
933 }
934
935 sub ensure_connected {
936   my ($self) = @_;
937
938   unless ($self->connected) {
939     $self->_populate_dbh;
940   }
941 }
942
943 =head2 dbh
944
945 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
946 is guaranteed to be healthy by implicitly calling L</connected>, and if
947 necessary performing a reconnection before returning. Keep in mind that this
948 is very B<expensive> on some database engines. Consider using L</dbh_do>
949 instead.
950
951 =cut
952
953 sub dbh {
954   my ($self) = @_;
955
956   if (not $self->_dbh) {
957     $self->_populate_dbh;
958   } else {
959     $self->ensure_connected;
960   }
961   return $self->_dbh;
962 }
963
964 # this is the internal "get dbh or connect (don't check)" method
965 sub _get_dbh {
966   my $self = shift;
967   $self->_verify_pid;
968   $self->_populate_dbh unless $self->_dbh;
969   return $self->_dbh;
970 }
971
972 sub sql_maker {
973   my ($self) = @_;
974   unless ($self->_sql_maker) {
975     my $sql_maker_class = $self->sql_maker_class;
976     $self->ensure_class_loaded ($sql_maker_class);
977
978     my %opts = %{$self->_sql_maker_opts||{}};
979     my $dialect =
980       $opts{limit_dialect}
981         ||
982       $self->sql_limit_dialect
983         ||
984       do {
985         my $s_class = (ref $self) || $self;
986         carp (
987           "Your storage class ($s_class) does not set sql_limit_dialect and you "
988         . 'have not supplied an explicit limit_dialect in your connection_info. '
989         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
990         . 'databases but can be (and often is) painfully slow.'
991         );
992
993         'GenericSubQ';
994       }
995     ;
996
997     $self->_sql_maker($sql_maker_class->new(
998       bindtype=>'columns',
999       array_datatypes => 1,
1000       limit_dialect => $dialect,
1001       name_sep => '.',
1002       %opts,
1003     ));
1004   }
1005   return $self->_sql_maker;
1006 }
1007
1008 # nothing to do by default
1009 sub _rebless {}
1010 sub _init {}
1011
1012 sub _populate_dbh {
1013   my ($self) = @_;
1014
1015   my @info = @{$self->_dbi_connect_info || []};
1016   $self->_dbh(undef); # in case ->connected failed we might get sent here
1017   $self->_dbh_details({}); # reset everything we know
1018
1019   $self->_dbh($self->_connect(@info));
1020
1021   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1022
1023   $self->_determine_driver;
1024
1025   # Always set the transaction depth on connect, since
1026   #  there is no transaction in progress by definition
1027   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1028
1029   $self->_run_connection_actions unless $self->{_in_determine_driver};
1030 }
1031
1032 sub _run_connection_actions {
1033   my $self = shift;
1034   my @actions;
1035
1036   push @actions, ( $self->on_connect_call || () );
1037   push @actions, $self->_parse_connect_do ('on_connect_do');
1038
1039   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1040 }
1041
1042
1043
1044 sub set_use_dbms_capability {
1045   $_[0]->set_inherited ($_[1], $_[2]);
1046 }
1047
1048 sub get_use_dbms_capability {
1049   my ($self, $capname) = @_;
1050
1051   my $use = $self->get_inherited ($capname);
1052   return defined $use
1053     ? $use
1054     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1055   ;
1056 }
1057
1058 sub set_dbms_capability {
1059   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1060 }
1061
1062 sub get_dbms_capability {
1063   my ($self, $capname) = @_;
1064
1065   my $cap = $self->_dbh_details->{capability}{$capname};
1066
1067   unless (defined $cap) {
1068     if (my $meth = $self->can ("_determine$capname")) {
1069       $cap = $self->$meth ? 1 : 0;
1070     }
1071     else {
1072       $cap = 0;
1073     }
1074
1075     $self->set_dbms_capability ($capname, $cap);
1076   }
1077
1078   return $cap;
1079 }
1080
1081 sub _server_info {
1082   my $self = shift;
1083
1084   my $info;
1085   unless ($info = $self->_dbh_details->{info}) {
1086
1087     $info = {};
1088
1089     my $server_version = try { $self->_get_server_version };
1090
1091     if (defined $server_version) {
1092       $info->{dbms_version} = $server_version;
1093
1094       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1095       my @verparts = split (/\./, $numeric_version);
1096       if (
1097         @verparts
1098           &&
1099         $verparts[0] <= 999
1100       ) {
1101         # consider only up to 3 version parts, iff not more than 3 digits
1102         my @use_parts;
1103         while (@verparts && @use_parts < 3) {
1104           my $p = shift @verparts;
1105           last if $p > 999;
1106           push @use_parts, $p;
1107         }
1108         push @use_parts, 0 while @use_parts < 3;
1109
1110         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1111       }
1112     }
1113
1114     $self->_dbh_details->{info} = $info;
1115   }
1116
1117   return $info;
1118 }
1119
1120 sub _get_server_version {
1121   shift->_get_dbh->get_info(18);
1122 }
1123
1124 sub _determine_driver {
1125   my ($self) = @_;
1126
1127   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1128     my $started_connected = 0;
1129     local $self->{_in_determine_driver} = 1;
1130
1131     if (ref($self) eq __PACKAGE__) {
1132       my $driver;
1133       if ($self->_dbh) { # we are connected
1134         $driver = $self->_dbh->{Driver}{Name};
1135         $started_connected = 1;
1136       } else {
1137         # if connect_info is a CODEREF, we have no choice but to connect
1138         if (ref $self->_dbi_connect_info->[0] &&
1139             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1140           $self->_populate_dbh;
1141           $driver = $self->_dbh->{Driver}{Name};
1142         }
1143         else {
1144           # try to use dsn to not require being connected, the driver may still
1145           # force a connection in _rebless to determine version
1146           # (dsn may not be supplied at all if all we do is make a mock-schema)
1147           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1148           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1149           $driver ||= $ENV{DBI_DRIVER};
1150         }
1151       }
1152
1153       if ($driver) {
1154         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1155         if ($self->load_optional_class($storage_class)) {
1156           mro::set_mro($storage_class, 'c3');
1157           bless $self, $storage_class;
1158           $self->_rebless();
1159         }
1160       }
1161     }
1162
1163     $self->_driver_determined(1);
1164
1165     $self->_init; # run driver-specific initializations
1166
1167     $self->_run_connection_actions
1168         if !$started_connected && defined $self->_dbh;
1169   }
1170 }
1171
1172 sub _do_connection_actions {
1173   my $self          = shift;
1174   my $method_prefix = shift;
1175   my $call          = shift;
1176
1177   if (not ref($call)) {
1178     my $method = $method_prefix . $call;
1179     $self->$method(@_);
1180   } elsif (ref($call) eq 'CODE') {
1181     $self->$call(@_);
1182   } elsif (ref($call) eq 'ARRAY') {
1183     if (ref($call->[0]) ne 'ARRAY') {
1184       $self->_do_connection_actions($method_prefix, $_) for @$call;
1185     } else {
1186       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1187     }
1188   } else {
1189     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1190   }
1191
1192   return $self;
1193 }
1194
1195 sub connect_call_do_sql {
1196   my $self = shift;
1197   $self->_do_query(@_);
1198 }
1199
1200 sub disconnect_call_do_sql {
1201   my $self = shift;
1202   $self->_do_query(@_);
1203 }
1204
1205 # override in db-specific backend when necessary
1206 sub connect_call_datetime_setup { 1 }
1207
1208 sub _do_query {
1209   my ($self, $action) = @_;
1210
1211   if (ref $action eq 'CODE') {
1212     $action = $action->($self);
1213     $self->_do_query($_) foreach @$action;
1214   }
1215   else {
1216     # Most debuggers expect ($sql, @bind), so we need to exclude
1217     # the attribute hash which is the second argument to $dbh->do
1218     # furthermore the bind values are usually to be presented
1219     # as named arrayref pairs, so wrap those here too
1220     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1221     my $sql = shift @do_args;
1222     my $attrs = shift @do_args;
1223     my @bind = map { [ undef, $_ ] } @do_args;
1224
1225     $self->_query_start($sql, @bind);
1226     $self->_get_dbh->do($sql, $attrs, @do_args);
1227     $self->_query_end($sql, @bind);
1228   }
1229
1230   return $self;
1231 }
1232
1233 sub _connect {
1234   my ($self, @info) = @_;
1235
1236   $self->throw_exception("You failed to provide any connection info")
1237     if !@info;
1238
1239   my ($old_connect_via, $dbh);
1240
1241   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1242     $old_connect_via = $DBI::connect_via;
1243     $DBI::connect_via = 'connect';
1244   }
1245
1246   try {
1247     if(ref $info[0] eq 'CODE') {
1248        $dbh = $info[0]->();
1249     }
1250     else {
1251        $dbh = DBI->connect(@info);
1252     }
1253
1254     if (!$dbh) {
1255       die $DBI::errstr;
1256     }
1257
1258     unless ($self->unsafe) {
1259
1260       # this odd anonymous coderef dereference is in fact really
1261       # necessary to avoid the unwanted effect described in perl5
1262       # RT#75792
1263       sub {
1264         my $weak_self = $_[0];
1265         weaken $weak_self;
1266
1267         $_[1]->{HandleError} = sub {
1268           if ($weak_self) {
1269             $weak_self->throw_exception("DBI Exception: $_[0]");
1270           }
1271           else {
1272             # the handler may be invoked by something totally out of
1273             # the scope of DBIC
1274             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1275           }
1276         };
1277       }->($self, $dbh);
1278
1279       $dbh->{ShowErrorStatement} = 1;
1280       $dbh->{RaiseError} = 1;
1281       $dbh->{PrintError} = 0;
1282     }
1283   }
1284   catch {
1285     $self->throw_exception("DBI Connection failed: $_")
1286   }
1287   finally {
1288     $DBI::connect_via = $old_connect_via if $old_connect_via;
1289   };
1290
1291   $self->_dbh_autocommit($dbh->{AutoCommit});
1292   $dbh;
1293 }
1294
1295 sub svp_begin {
1296   my ($self, $name) = @_;
1297
1298   $name = $self->_svp_generate_name
1299     unless defined $name;
1300
1301   $self->throw_exception ("You can't use savepoints outside a transaction")
1302     if $self->{transaction_depth} == 0;
1303
1304   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1305     unless $self->can('_svp_begin');
1306
1307   push @{ $self->{savepoints} }, $name;
1308
1309   $self->debugobj->svp_begin($name) if $self->debug;
1310
1311   return $self->_svp_begin($name);
1312 }
1313
1314 sub svp_release {
1315   my ($self, $name) = @_;
1316
1317   $self->throw_exception ("You can't use savepoints outside a transaction")
1318     if $self->{transaction_depth} == 0;
1319
1320   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1321     unless $self->can('_svp_release');
1322
1323   if (defined $name) {
1324     $self->throw_exception ("Savepoint '$name' does not exist")
1325       unless grep { $_ eq $name } @{ $self->{savepoints} };
1326
1327     # Dig through the stack until we find the one we are releasing.  This keeps
1328     # the stack up to date.
1329     my $svp;
1330
1331     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1332   } else {
1333     $name = pop @{ $self->{savepoints} };
1334   }
1335
1336   $self->debugobj->svp_release($name) if $self->debug;
1337
1338   return $self->_svp_release($name);
1339 }
1340
1341 sub svp_rollback {
1342   my ($self, $name) = @_;
1343
1344   $self->throw_exception ("You can't use savepoints outside a transaction")
1345     if $self->{transaction_depth} == 0;
1346
1347   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1348     unless $self->can('_svp_rollback');
1349
1350   if (defined $name) {
1351       # If they passed us a name, verify that it exists in the stack
1352       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1353           $self->throw_exception("Savepoint '$name' does not exist!");
1354       }
1355
1356       # Dig through the stack until we find the one we are releasing.  This keeps
1357       # the stack up to date.
1358       while(my $s = pop(@{ $self->{savepoints} })) {
1359           last if($s eq $name);
1360       }
1361       # Add the savepoint back to the stack, as a rollback doesn't remove the
1362       # named savepoint, only everything after it.
1363       push(@{ $self->{savepoints} }, $name);
1364   } else {
1365       # We'll assume they want to rollback to the last savepoint
1366       $name = $self->{savepoints}->[-1];
1367   }
1368
1369   $self->debugobj->svp_rollback($name) if $self->debug;
1370
1371   return $self->_svp_rollback($name);
1372 }
1373
1374 sub _svp_generate_name {
1375   my ($self) = @_;
1376   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1377 }
1378
1379 sub txn_begin {
1380   my $self = shift;
1381
1382   # this means we have not yet connected and do not know the AC status
1383   # (e.g. coderef $dbh)
1384   if (! defined $self->_dbh_autocommit) {
1385     $self->ensure_connected;
1386   }
1387   # otherwise re-connect on pid changes, so
1388   # that the txn_depth is adjusted properly
1389   # the lightweight _get_dbh is good enoug here
1390   # (only superficial handle check, no pings)
1391   else {
1392     $self->_get_dbh;
1393   }
1394
1395   if($self->transaction_depth == 0) {
1396     $self->debugobj->txn_begin()
1397       if $self->debug;
1398     $self->_dbh_begin_work;
1399   }
1400   elsif ($self->auto_savepoint) {
1401     $self->svp_begin;
1402   }
1403   $self->{transaction_depth}++;
1404 }
1405
1406 sub _dbh_begin_work {
1407   my $self = shift;
1408
1409   # if the user is utilizing txn_do - good for him, otherwise we need to
1410   # ensure that the $dbh is healthy on BEGIN.
1411   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1412   # will be replaced by a failure of begin_work itself (which will be
1413   # then retried on reconnect)
1414   if ($self->{_in_dbh_do}) {
1415     $self->_dbh->begin_work;
1416   } else {
1417     $self->dbh_do(sub { $_[1]->begin_work });
1418   }
1419 }
1420
1421 sub txn_commit {
1422   my $self = shift;
1423   if ($self->{transaction_depth} == 1) {
1424     $self->debugobj->txn_commit()
1425       if ($self->debug);
1426     $self->_dbh_commit;
1427     $self->{transaction_depth} = 0
1428       if $self->_dbh_autocommit;
1429   }
1430   elsif($self->{transaction_depth} > 1) {
1431     $self->{transaction_depth}--;
1432     $self->svp_release
1433       if $self->auto_savepoint;
1434   }
1435   else {
1436     $self->throw_exception( 'Refusing to commit without a started transaction' );
1437   }
1438 }
1439
1440 sub _dbh_commit {
1441   my $self = shift;
1442   my $dbh  = $self->_dbh
1443     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1444   $dbh->commit;
1445 }
1446
1447 sub txn_rollback {
1448   my $self = shift;
1449   my $dbh = $self->_dbh;
1450   try {
1451     if ($self->{transaction_depth} == 1) {
1452       $self->debugobj->txn_rollback()
1453         if ($self->debug);
1454       $self->{transaction_depth} = 0
1455         if $self->_dbh_autocommit;
1456       $self->_dbh_rollback;
1457     }
1458     elsif($self->{transaction_depth} > 1) {
1459       $self->{transaction_depth}--;
1460       if ($self->auto_savepoint) {
1461         $self->svp_rollback;
1462         $self->svp_release;
1463       }
1464     }
1465     else {
1466       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1467     }
1468   }
1469   catch {
1470     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1471
1472     if ($_ !~ /$exception_class/) {
1473       # ensure that a failed rollback resets the transaction depth
1474       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1475     }
1476
1477     $self->throw_exception($_)
1478   };
1479 }
1480
1481 sub _dbh_rollback {
1482   my $self = shift;
1483   my $dbh  = $self->_dbh
1484     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1485   $dbh->rollback;
1486 }
1487
1488 # This used to be the top-half of _execute.  It was split out to make it
1489 #  easier to override in NoBindVars without duping the rest.  It takes up
1490 #  all of _execute's args, and emits $sql, @bind.
1491 sub _prep_for_execute {
1492   my ($self, $op, $extra_bind, $ident, $args) = @_;
1493
1494   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1495     $ident = $ident->from();
1496   }
1497
1498   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1499
1500   unshift(@bind,
1501     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1502       if $extra_bind;
1503   return ($sql, \@bind);
1504 }
1505
1506
1507 sub _fix_bind_params {
1508     my ($self, @bind) = @_;
1509
1510     ### Turn @bind from something like this:
1511     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1512     ### to this:
1513     ###   ( "'1'", "'1'", "'3'" )
1514     return
1515         map {
1516             if ( defined( $_ && $_->[1] ) ) {
1517                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1518             }
1519             else { q{NULL}; }
1520         } @bind;
1521 }
1522
1523 sub _query_start {
1524     my ( $self, $sql, @bind ) = @_;
1525
1526     if ( $self->debug ) {
1527         @bind = $self->_fix_bind_params(@bind);
1528
1529         $self->debugobj->query_start( $sql, @bind );
1530     }
1531 }
1532
1533 sub _query_end {
1534     my ( $self, $sql, @bind ) = @_;
1535
1536     if ( $self->debug ) {
1537         @bind = $self->_fix_bind_params(@bind);
1538         $self->debugobj->query_end( $sql, @bind );
1539     }
1540 }
1541
1542 sub _dbh_execute {
1543   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1544
1545   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1546
1547   $self->_query_start( $sql, @$bind );
1548
1549   my $sth = $self->sth($sql,$op);
1550
1551   my $placeholder_index = 1;
1552
1553   foreach my $bound (@$bind) {
1554     my $attributes = {};
1555     my($column_name, @data) = @$bound;
1556
1557     if ($bind_attributes) {
1558       $attributes = $bind_attributes->{$column_name}
1559       if defined $bind_attributes->{$column_name};
1560     }
1561
1562     foreach my $data (@data) {
1563       my $ref = ref $data;
1564
1565       if ($ref and overload::Method($data, '""') ) {
1566         $data = "$data";
1567       }
1568       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1569         $sth->bind_param_inout(
1570           $placeholder_index++,
1571           $data,
1572           $self->_max_column_bytesize($ident, $column_name),
1573           $attributes
1574         );
1575         next;
1576       }
1577
1578       $sth->bind_param($placeholder_index++, $data, $attributes);
1579     }
1580   }
1581
1582   # Can this fail without throwing an exception anyways???
1583   my $rv = $sth->execute();
1584   $self->throw_exception(
1585     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1586   ) if !$rv;
1587
1588   $self->_query_end( $sql, @$bind );
1589
1590   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1591 }
1592
1593 sub _execute {
1594     my $self = shift;
1595     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1596 }
1597
1598 sub _prefetch_autovalues {
1599   my ($self, $source, $to_insert) = @_;
1600
1601   my $colinfo = $source->columns_info;
1602
1603   my %values;
1604   for my $col (keys %$colinfo) {
1605     if (
1606       $colinfo->{$col}{auto_nextval}
1607         and
1608       (
1609         ! exists $to_insert->{$col}
1610           or
1611         ref $to_insert->{$col} eq 'SCALAR'
1612       )
1613     ) {
1614       $values{$col} = $self->_sequence_fetch(
1615         'NEXTVAL',
1616         ( $colinfo->{$col}{sequence} ||=
1617             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1618         ),
1619       );
1620     }
1621   }
1622
1623   \%values;
1624 }
1625
1626 sub insert {
1627   my ($self, $source, $to_insert) = @_;
1628
1629   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1630
1631   # fuse the values
1632   $to_insert = { %$to_insert, %$prefetched_values };
1633
1634   # list of primary keys we try to fetch from the database
1635   # both not-exsists and scalarrefs are considered
1636   my %fetch_pks;
1637   for ($source->primary_columns) {
1638     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1639       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1640   }
1641
1642   my ($sqla_opts, @ir_container);
1643   if ($self->_use_insert_returning) {
1644
1645     # retain order as declared in the resultsource
1646     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1647       push @{$sqla_opts->{returning}}, $_;
1648       $sqla_opts->{returning_container} = \@ir_container
1649         if $self->_use_insert_returning_bound;
1650     }
1651   }
1652
1653   my $bind_attributes = $self->source_bind_attributes($source);
1654
1655   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1656
1657   my %returned_cols;
1658
1659   if (my $retlist = $sqla_opts->{returning}) {
1660     @ir_container = try {
1661       local $SIG{__WARN__} = sub {};
1662       my @r = $sth->fetchrow_array;
1663       $sth->finish;
1664       @r;
1665     } unless @ir_container;
1666
1667     @returned_cols{@$retlist} = @ir_container if @ir_container;
1668   }
1669
1670   return { %$prefetched_values, %returned_cols };
1671 }
1672
1673
1674 ## Currently it is assumed that all values passed will be "normal", i.e. not
1675 ## scalar refs, or at least, all the same type as the first set, the statement is
1676 ## only prepped once.
1677 sub insert_bulk {
1678   my ($self, $source, $cols, $data) = @_;
1679
1680   my %colvalues;
1681   @colvalues{@$cols} = (0..$#$cols);
1682
1683   for my $i (0..$#$cols) {
1684     my $first_val = $data->[0][$i];
1685     next unless ref $first_val eq 'SCALAR';
1686
1687     $colvalues{ $cols->[$i] } = $first_val;
1688   }
1689
1690   # check for bad data and stringify stringifiable objects
1691   my $bad_slice = sub {
1692     my ($msg, $col_idx, $slice_idx) = @_;
1693     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1694       $msg,
1695       $cols->[$col_idx],
1696       do {
1697         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1698         Dumper {
1699           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1700         },
1701       }
1702     );
1703   };
1704
1705   for my $datum_idx (0..$#$data) {
1706     my $datum = $data->[$datum_idx];
1707
1708     for my $col_idx (0..$#$cols) {
1709       my $val            = $datum->[$col_idx];
1710       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1711       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1712
1713       if ($is_literal_sql) {
1714         if (not ref $val) {
1715           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1716         }
1717         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1718           $bad_slice->("$reftype reference found where literal SQL expected",
1719             $col_idx, $datum_idx);
1720         }
1721         elsif ($$val ne $$sqla_bind){
1722           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1723             $col_idx, $datum_idx);
1724         }
1725       }
1726       elsif (my $reftype = ref $val) {
1727         require overload;
1728         if (overload::Method($val, '""')) {
1729           $datum->[$col_idx] = "".$val;
1730         }
1731         else {
1732           $bad_slice->("$reftype reference found where bind expected",
1733             $col_idx, $datum_idx);
1734         }
1735       }
1736     }
1737   }
1738
1739   my ($sql, $bind) = $self->_prep_for_execute (
1740     'insert', undef, $source, [\%colvalues]
1741   );
1742
1743   if (! @$bind) {
1744     # if the bindlist is empty - make sure all "values" are in fact
1745     # literal scalarrefs. If not the case this means the storage ate
1746     # them away (e.g. the NoBindVars component) and interpolated them
1747     # directly into the SQL. This obviosly can't be good for multi-inserts
1748
1749     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1750       if first { ref $_ ne 'SCALAR' } values %colvalues;
1751   }
1752
1753   # neither _execute_array, nor _execute_inserts_with_no_binds are
1754   # atomic (even if _execute _array is a single call). Thus a safety
1755   # scope guard
1756   my $guard = $self->txn_scope_guard;
1757
1758   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1759   my $sth = $self->sth($sql);
1760   my $rv = do {
1761     if (@$bind) {
1762       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1763       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1764     }
1765     else {
1766       # bind_param_array doesn't work if there are no binds
1767       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1768     }
1769   };
1770
1771   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1772
1773   $guard->commit;
1774
1775   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1776 }
1777
1778 sub _execute_array {
1779   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1780
1781   ## This must be an arrayref, else nothing works!
1782   my $tuple_status = [];
1783
1784   ## Get the bind_attributes, if any exist
1785   my $bind_attributes = $self->source_bind_attributes($source);
1786
1787   ## Bind the values and execute
1788   my $placeholder_index = 1;
1789
1790   foreach my $bound (@$bind) {
1791
1792     my $attributes = {};
1793     my ($column_name, $data_index) = @$bound;
1794
1795     if( $bind_attributes ) {
1796       $attributes = $bind_attributes->{$column_name}
1797       if defined $bind_attributes->{$column_name};
1798     }
1799
1800     my @data = map { $_->[$data_index] } @$data;
1801
1802     $sth->bind_param_array(
1803       $placeholder_index,
1804       [@data],
1805       (%$attributes ?  $attributes : ()),
1806     );
1807     $placeholder_index++;
1808   }
1809
1810   my ($rv, $err);
1811   try {
1812     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1813   }
1814   catch {
1815     $err = shift;
1816   };
1817
1818   # Statement must finish even if there was an exception.
1819   try {
1820     $sth->finish
1821   }
1822   catch {
1823     $err = shift unless defined $err
1824   };
1825
1826   $err = $sth->errstr
1827     if (! defined $err and $sth->err);
1828
1829   if (defined $err) {
1830     my $i = 0;
1831     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1832
1833     $self->throw_exception("Unexpected populate error: $err")
1834       if ($i > $#$tuple_status);
1835
1836     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1837       ($tuple_status->[$i][1] || $err),
1838       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1839     );
1840   }
1841
1842   return $rv;
1843 }
1844
1845 sub _dbh_execute_array {
1846     my ($self, $sth, $tuple_status, @extra) = @_;
1847
1848     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1849 }
1850
1851 sub _dbh_execute_inserts_with_no_binds {
1852   my ($self, $sth, $count) = @_;
1853
1854   my $err;
1855   try {
1856     my $dbh = $self->_get_dbh;
1857     local $dbh->{RaiseError} = 1;
1858     local $dbh->{PrintError} = 0;
1859
1860     $sth->execute foreach 1..$count;
1861   }
1862   catch {
1863     $err = shift;
1864   };
1865
1866   # Make sure statement is finished even if there was an exception.
1867   try {
1868     $sth->finish
1869   }
1870   catch {
1871     $err = shift unless defined $err;
1872   };
1873
1874   $self->throw_exception($err) if defined $err;
1875
1876   return $count;
1877 }
1878
1879 sub update {
1880   my ($self, $source, @args) = @_;
1881
1882   my $bind_attrs = $self->source_bind_attributes($source);
1883
1884   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1885 }
1886
1887
1888 sub delete {
1889   my ($self, $source, @args) = @_;
1890
1891   my $bind_attrs = $self->source_bind_attributes($source);
1892
1893   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1894 }
1895
1896 # We were sent here because the $rs contains a complex search
1897 # which will require a subquery to select the correct rows
1898 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1899 #
1900 # Generating a single PK column subquery is trivial and supported
1901 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1902 # Look at _multipk_update_delete()
1903 sub _subq_update_delete {
1904   my $self = shift;
1905   my ($rs, $op, $values) = @_;
1906
1907   my $rsrc = $rs->result_source;
1908
1909   # quick check if we got a sane rs on our hands
1910   my @pcols = $rsrc->_pri_cols;
1911
1912   my $sel = $rs->_resolved_attrs->{select};
1913   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1914
1915   if (
1916       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1917         ne
1918       join ("\x00", sort @$sel )
1919   ) {
1920     $self->throw_exception (
1921       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1922     );
1923   }
1924
1925   if (@pcols == 1) {
1926     return $self->$op (
1927       $rsrc,
1928       $op eq 'update' ? $values : (),
1929       { $pcols[0] => { -in => $rs->as_query } },
1930     );
1931   }
1932
1933   else {
1934     return $self->_multipk_update_delete (@_);
1935   }
1936 }
1937
1938 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1939 # resultset update/delete involving subqueries. So by default resort
1940 # to simple (and inefficient) delete_all style per-row opearations,
1941 # while allowing specific storages to override this with a faster
1942 # implementation.
1943 #
1944 sub _multipk_update_delete {
1945   return shift->_per_row_update_delete (@_);
1946 }
1947
1948 # This is the default loop used to delete/update rows for multi PK
1949 # resultsets, and used by mysql exclusively (because it can't do anything
1950 # else).
1951 #
1952 # We do not use $row->$op style queries, because resultset update/delete
1953 # is not expected to cascade (this is what delete_all/update_all is for).
1954 #
1955 # There should be no race conditions as the entire operation is rolled
1956 # in a transaction.
1957 #
1958 sub _per_row_update_delete {
1959   my $self = shift;
1960   my ($rs, $op, $values) = @_;
1961
1962   my $rsrc = $rs->result_source;
1963   my @pcols = $rsrc->_pri_cols;
1964
1965   my $guard = $self->txn_scope_guard;
1966
1967   # emulate the return value of $sth->execute for non-selects
1968   my $row_cnt = '0E0';
1969
1970   my $subrs_cur = $rs->cursor;
1971   my @all_pk = $subrs_cur->all;
1972   for my $pks ( @all_pk) {
1973
1974     my $cond;
1975     for my $i (0.. $#pcols) {
1976       $cond->{$pcols[$i]} = $pks->[$i];
1977     }
1978
1979     $self->$op (
1980       $rsrc,
1981       $op eq 'update' ? $values : (),
1982       $cond,
1983     );
1984
1985     $row_cnt++;
1986   }
1987
1988   $guard->commit;
1989
1990   return $row_cnt;
1991 }
1992
1993 sub _select {
1994   my $self = shift;
1995   $self->_execute($self->_select_args(@_));
1996 }
1997
1998 sub _select_args_to_query {
1999   my $self = shift;
2000
2001   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2002   #  = $self->_select_args($ident, $select, $cond, $attrs);
2003   my ($op, $bind, $ident, $bind_attrs, @args) =
2004     $self->_select_args(@_);
2005
2006   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2007   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2008   $prepared_bind ||= [];
2009
2010   return wantarray
2011     ? ($sql, $prepared_bind, $bind_attrs)
2012     : \[ "($sql)", @$prepared_bind ]
2013   ;
2014 }
2015
2016 sub _select_args {
2017   my ($self, $ident, $select, $where, $attrs) = @_;
2018
2019   my $sql_maker = $self->sql_maker;
2020   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2021
2022   $attrs = {
2023     %$attrs,
2024     select => $select,
2025     from => $ident,
2026     where => $where,
2027     $rs_alias && $alias2source->{$rs_alias}
2028       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
2029       : ()
2030     ,
2031   };
2032
2033   # calculate bind_attrs before possible $ident mangling
2034   my $bind_attrs = {};
2035   for my $alias (keys %$alias2source) {
2036     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2037     for my $col (keys %$bindtypes) {
2038
2039       my $fqcn = join ('.', $alias, $col);
2040       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2041
2042       # Unqialified column names are nice, but at the same time can be
2043       # rather ambiguous. What we do here is basically go along with
2044       # the loop, adding an unqualified column slot to $bind_attrs,
2045       # alongside the fully qualified name. As soon as we encounter
2046       # another column by that name (which would imply another table)
2047       # we unset the unqualified slot and never add any info to it
2048       # to avoid erroneous type binding. If this happens the users
2049       # only choice will be to fully qualify his column name
2050
2051       if (exists $bind_attrs->{$col}) {
2052         $bind_attrs->{$col} = {};
2053       }
2054       else {
2055         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2056       }
2057     }
2058   }
2059
2060   # Sanity check the attributes (SQLMaker does it too, but
2061   # in case of a software_limit we'll never reach there)
2062   if (defined $attrs->{offset}) {
2063     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2064       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2065   }
2066   $attrs->{offset} ||= 0;
2067
2068   if (defined $attrs->{rows}) {
2069     $self->throw_exception("The rows attribute must be a positive integer if present")
2070       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2071   }
2072   elsif ($attrs->{offset}) {
2073     # MySQL actually recommends this approach.  I cringe.
2074     $attrs->{rows} = $sql_maker->__max_int;
2075   }
2076
2077   my @limit;
2078
2079   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2080   # storage, unless software limit was requested
2081   if (
2082     #limited has_many
2083     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2084        ||
2085     # grouped prefetch (to satisfy group_by == select)
2086     ( $attrs->{group_by}
2087         &&
2088       @{$attrs->{group_by}}
2089         &&
2090       $attrs->{_prefetch_select}
2091         &&
2092       @{$attrs->{_prefetch_select}}
2093     )
2094   ) {
2095     ($ident, $select, $where, $attrs)
2096       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2097   }
2098   elsif (! $attrs->{software_limit} ) {
2099     push @limit, $attrs->{rows}, $attrs->{offset};
2100   }
2101
2102   # try to simplify the joinmap further (prune unreferenced type-single joins)
2103   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2104
2105 ###
2106   # This would be the point to deflate anything found in $where
2107   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2108   # expect a row object. And all we have is a resultsource (it is trivial
2109   # to extract deflator coderefs via $alias2source above).
2110   #
2111   # I don't see a way forward other than changing the way deflators are
2112   # invoked, and that's just bad...
2113 ###
2114
2115   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2116 }
2117
2118 # Returns a counting SELECT for a simple count
2119 # query. Abstracted so that a storage could override
2120 # this to { count => 'firstcol' } or whatever makes
2121 # sense as a performance optimization
2122 sub _count_select {
2123   #my ($self, $source, $rs_attrs) = @_;
2124   return { count => '*' };
2125 }
2126
2127
2128 sub source_bind_attributes {
2129   my ($self, $source) = @_;
2130
2131   my $bind_attributes;
2132
2133   my $colinfo = $source->columns_info;
2134
2135   for my $col (keys %$colinfo) {
2136     if (my $dt = $colinfo->{$col}{data_type} ) {
2137       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2138     }
2139   }
2140
2141   return $bind_attributes;
2142 }
2143
2144 =head2 select
2145
2146 =over 4
2147
2148 =item Arguments: $ident, $select, $condition, $attrs
2149
2150 =back
2151
2152 Handle a SQL select statement.
2153
2154 =cut
2155
2156 sub select {
2157   my $self = shift;
2158   my ($ident, $select, $condition, $attrs) = @_;
2159   return $self->cursor_class->new($self, \@_, $attrs);
2160 }
2161
2162 sub select_single {
2163   my $self = shift;
2164   my ($rv, $sth, @bind) = $self->_select(@_);
2165   my @row = $sth->fetchrow_array;
2166   my @nextrow = $sth->fetchrow_array if @row;
2167   if(@row && @nextrow) {
2168     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2169   }
2170   # Need to call finish() to work round broken DBDs
2171   $sth->finish();
2172   return @row;
2173 }
2174
2175 =head2 sql_limit_dialect
2176
2177 This is an accessor for the default SQL limit dialect used by a particular
2178 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2179 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2180 see L<DBIx::Class::SQLMaker::LimitDialects>.
2181
2182 =head2 sth
2183
2184 =over 4
2185
2186 =item Arguments: $sql
2187
2188 =back
2189
2190 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2191
2192 =cut
2193
2194 sub _dbh_sth {
2195   my ($self, $dbh, $sql) = @_;
2196
2197   # 3 is the if_active parameter which avoids active sth re-use
2198   my $sth = $self->disable_sth_caching
2199     ? $dbh->prepare($sql)
2200     : $dbh->prepare_cached($sql, {}, 3);
2201
2202   # XXX You would think RaiseError would make this impossible,
2203   #  but apparently that's not true :(
2204   $self->throw_exception($dbh->errstr) if !$sth;
2205
2206   $sth;
2207 }
2208
2209 sub sth {
2210   my ($self, $sql) = @_;
2211   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2212 }
2213
2214 sub _dbh_columns_info_for {
2215   my ($self, $dbh, $table) = @_;
2216
2217   if ($dbh->can('column_info')) {
2218     my %result;
2219     my $caught;
2220     try {
2221       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2222       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2223       $sth->execute();
2224       while ( my $info = $sth->fetchrow_hashref() ){
2225         my %column_info;
2226         $column_info{data_type}   = $info->{TYPE_NAME};
2227         $column_info{size}      = $info->{COLUMN_SIZE};
2228         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2229         $column_info{default_value} = $info->{COLUMN_DEF};
2230         my $col_name = $info->{COLUMN_NAME};
2231         $col_name =~ s/^\"(.*)\"$/$1/;
2232
2233         $result{$col_name} = \%column_info;
2234       }
2235     } catch {
2236       $caught = 1;
2237     };
2238     return \%result if !$caught && scalar keys %result;
2239   }
2240
2241   my %result;
2242   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2243   $sth->execute;
2244   my @columns = @{$sth->{NAME_lc}};
2245   for my $i ( 0 .. $#columns ){
2246     my %column_info;
2247     $column_info{data_type} = $sth->{TYPE}->[$i];
2248     $column_info{size} = $sth->{PRECISION}->[$i];
2249     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2250
2251     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2252       $column_info{data_type} = $1;
2253       $column_info{size}    = $2;
2254     }
2255
2256     $result{$columns[$i]} = \%column_info;
2257   }
2258   $sth->finish;
2259
2260   foreach my $col (keys %result) {
2261     my $colinfo = $result{$col};
2262     my $type_num = $colinfo->{data_type};
2263     my $type_name;
2264     if(defined $type_num && $dbh->can('type_info')) {
2265       my $type_info = $dbh->type_info($type_num);
2266       $type_name = $type_info->{TYPE_NAME} if $type_info;
2267       $colinfo->{data_type} = $type_name if $type_name;
2268     }
2269   }
2270
2271   return \%result;
2272 }
2273
2274 sub columns_info_for {
2275   my ($self, $table) = @_;
2276   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2277 }
2278
2279 =head2 last_insert_id
2280
2281 Return the row id of the last insert.
2282
2283 =cut
2284
2285 sub _dbh_last_insert_id {
2286     my ($self, $dbh, $source, $col) = @_;
2287
2288     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2289
2290     return $id if defined $id;
2291
2292     my $class = ref $self;
2293     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2294 }
2295
2296 sub last_insert_id {
2297   my $self = shift;
2298   $self->_dbh_last_insert_id ($self->_dbh, @_);
2299 }
2300
2301 =head2 _native_data_type
2302
2303 =over 4
2304
2305 =item Arguments: $type_name
2306
2307 =back
2308
2309 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2310 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2311 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2312
2313 The default implementation returns C<undef>, implement in your Storage driver if
2314 you need this functionality.
2315
2316 Should map types from other databases to the native RDBMS type, for example
2317 C<VARCHAR2> to C<VARCHAR>.
2318
2319 Types with modifiers should map to the underlying data type. For example,
2320 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2321
2322 Composite types should map to the container type, for example
2323 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2324
2325 =cut
2326
2327 sub _native_data_type {
2328   #my ($self, $data_type) = @_;
2329   return undef
2330 }
2331
2332 # Check if placeholders are supported at all
2333 sub _determine_supports_placeholders {
2334   my $self = shift;
2335   my $dbh  = $self->_get_dbh;
2336
2337   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2338   # but it is inaccurate more often than not
2339   return try {
2340     local $dbh->{PrintError} = 0;
2341     local $dbh->{RaiseError} = 1;
2342     $dbh->do('select ?', {}, 1);
2343     1;
2344   }
2345   catch {
2346     0;
2347   };
2348 }
2349
2350 # Check if placeholders bound to non-string types throw exceptions
2351 #
2352 sub _determine_supports_typeless_placeholders {
2353   my $self = shift;
2354   my $dbh  = $self->_get_dbh;
2355
2356   return try {
2357     local $dbh->{PrintError} = 0;
2358     local $dbh->{RaiseError} = 1;
2359     # this specifically tests a bind that is NOT a string
2360     $dbh->do('select 1 where 1 = ?', {}, 1);
2361     1;
2362   }
2363   catch {
2364     0;
2365   };
2366 }
2367
2368 =head2 sqlt_type
2369
2370 Returns the database driver name.
2371
2372 =cut
2373
2374 sub sqlt_type {
2375   shift->_get_dbh->{Driver}->{Name};
2376 }
2377
2378 =head2 bind_attribute_by_data_type
2379
2380 Given a datatype from column info, returns a database specific bind
2381 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2382 let the database planner just handle it.
2383
2384 Generally only needed for special case column types, like bytea in postgres.
2385
2386 =cut
2387
2388 sub bind_attribute_by_data_type {
2389     return;
2390 }
2391
2392 =head2 is_datatype_numeric
2393
2394 Given a datatype from column_info, returns a boolean value indicating if
2395 the current RDBMS considers it a numeric value. This controls how
2396 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2397 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2398 be performed instead of the usual C<eq>.
2399
2400 =cut
2401
2402 sub is_datatype_numeric {
2403   my ($self, $dt) = @_;
2404
2405   return 0 unless $dt;
2406
2407   return $dt =~ /^ (?:
2408     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2409   ) $/ix;
2410 }
2411
2412
2413 =head2 create_ddl_dir
2414
2415 =over 4
2416
2417 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2418
2419 =back
2420
2421 Creates a SQL file based on the Schema, for each of the specified
2422 database engines in C<\@databases> in the given directory.
2423 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2424
2425 Given a previous version number, this will also create a file containing
2426 the ALTER TABLE statements to transform the previous schema into the
2427 current one. Note that these statements may contain C<DROP TABLE> or
2428 C<DROP COLUMN> statements that can potentially destroy data.
2429
2430 The file names are created using the C<ddl_filename> method below, please
2431 override this method in your schema if you would like a different file
2432 name format. For the ALTER file, the same format is used, replacing
2433 $version in the name with "$preversion-$version".
2434
2435 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2436 The most common value for this would be C<< { add_drop_table => 1 } >>
2437 to have the SQL produced include a C<DROP TABLE> statement for each table
2438 created. For quoting purposes supply C<quote_table_names> and
2439 C<quote_field_names>.
2440
2441 If no arguments are passed, then the following default values are assumed:
2442
2443 =over 4
2444
2445 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2446
2447 =item version    - $schema->schema_version
2448
2449 =item directory  - './'
2450
2451 =item preversion - <none>
2452
2453 =back
2454
2455 By default, C<\%sqlt_args> will have
2456
2457  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2458
2459 merged with the hash passed in. To disable any of those features, pass in a
2460 hashref like the following
2461
2462  { ignore_constraint_names => 0, # ... other options }
2463
2464
2465 WARNING: You are strongly advised to check all SQL files created, before applying
2466 them.
2467
2468 =cut
2469
2470 sub create_ddl_dir {
2471   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2472
2473   unless ($dir) {
2474     carp "No directory given, using ./\n";
2475     $dir = './';
2476   } else {
2477       -d $dir
2478         or
2479       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2480         or
2481       $self->throw_exception(
2482         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2483       );
2484   }
2485
2486   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2487
2488   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2489   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2490
2491   my $schema_version = $schema->schema_version || '1.x';
2492   $version ||= $schema_version;
2493
2494   $sqltargs = {
2495     add_drop_table => 1,
2496     ignore_constraint_names => 1,
2497     ignore_index_names => 1,
2498     %{$sqltargs || {}}
2499   };
2500
2501   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2502     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2503   }
2504
2505   my $sqlt = SQL::Translator->new( $sqltargs );
2506
2507   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2508   my $sqlt_schema = $sqlt->translate({ data => $schema })
2509     or $self->throw_exception ($sqlt->error);
2510
2511   foreach my $db (@$databases) {
2512     $sqlt->reset();
2513     $sqlt->{schema} = $sqlt_schema;
2514     $sqlt->producer($db);
2515
2516     my $file;
2517     my $filename = $schema->ddl_filename($db, $version, $dir);
2518     if (-e $filename && ($version eq $schema_version )) {
2519       # if we are dumping the current version, overwrite the DDL
2520       carp "Overwriting existing DDL file - $filename";
2521       unlink($filename);
2522     }
2523
2524     my $output = $sqlt->translate;
2525     if(!$output) {
2526       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2527       next;
2528     }
2529     if(!open($file, ">$filename")) {
2530       $self->throw_exception("Can't open $filename for writing ($!)");
2531       next;
2532     }
2533     print $file $output;
2534     close($file);
2535
2536     next unless ($preversion);
2537
2538     require SQL::Translator::Diff;
2539
2540     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2541     if(!-e $prefilename) {
2542       carp("No previous schema file found ($prefilename)");
2543       next;
2544     }
2545
2546     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2547     if(-e $difffile) {
2548       carp("Overwriting existing diff file - $difffile");
2549       unlink($difffile);
2550     }
2551
2552     my $source_schema;
2553     {
2554       my $t = SQL::Translator->new($sqltargs);
2555       $t->debug( 0 );
2556       $t->trace( 0 );
2557
2558       $t->parser( $db )
2559         or $self->throw_exception ($t->error);
2560
2561       my $out = $t->translate( $prefilename )
2562         or $self->throw_exception ($t->error);
2563
2564       $source_schema = $t->schema;
2565
2566       $source_schema->name( $prefilename )
2567         unless ( $source_schema->name );
2568     }
2569
2570     # The "new" style of producers have sane normalization and can support
2571     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2572     # And we have to diff parsed SQL against parsed SQL.
2573     my $dest_schema = $sqlt_schema;
2574
2575     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2576       my $t = SQL::Translator->new($sqltargs);
2577       $t->debug( 0 );
2578       $t->trace( 0 );
2579
2580       $t->parser( $db )
2581         or $self->throw_exception ($t->error);
2582
2583       my $out = $t->translate( $filename )
2584         or $self->throw_exception ($t->error);
2585
2586       $dest_schema = $t->schema;
2587
2588       $dest_schema->name( $filename )
2589         unless $dest_schema->name;
2590     }
2591
2592     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2593                                                   $dest_schema,   $db,
2594                                                   $sqltargs
2595                                                  );
2596     if(!open $file, ">$difffile") {
2597       $self->throw_exception("Can't write to $difffile ($!)");
2598       next;
2599     }
2600     print $file $diff;
2601     close($file);
2602   }
2603 }
2604
2605 =head2 deployment_statements
2606
2607 =over 4
2608
2609 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2610
2611 =back
2612
2613 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2614
2615 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2616 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2617
2618 C<$directory> is used to return statements from files in a previously created
2619 L</create_ddl_dir> directory and is optional. The filenames are constructed
2620 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2621
2622 If no C<$directory> is specified then the statements are constructed on the
2623 fly using L<SQL::Translator> and C<$version> is ignored.
2624
2625 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2626
2627 =cut
2628
2629 sub deployment_statements {
2630   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2631   $type ||= $self->sqlt_type;
2632   $version ||= $schema->schema_version || '1.x';
2633   $dir ||= './';
2634   my $filename = $schema->ddl_filename($type, $version, $dir);
2635   if(-f $filename)
2636   {
2637       my $file;
2638       open($file, "<$filename")
2639         or $self->throw_exception("Can't open $filename ($!)");
2640       my @rows = <$file>;
2641       close($file);
2642       return join('', @rows);
2643   }
2644
2645   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2646     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2647   }
2648
2649   # sources needs to be a parser arg, but for simplicty allow at top level
2650   # coming in
2651   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2652       if exists $sqltargs->{sources};
2653
2654   my $tr = SQL::Translator->new(
2655     producer => "SQL::Translator::Producer::${type}",
2656     %$sqltargs,
2657     parser => 'SQL::Translator::Parser::DBIx::Class',
2658     data => $schema,
2659   );
2660
2661   my @ret;
2662   if (wantarray) {
2663     @ret = $tr->translate;
2664   }
2665   else {
2666     $ret[0] = $tr->translate;
2667   }
2668
2669   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2670     unless (@ret && defined $ret[0]);
2671
2672   return wantarray ? @ret : $ret[0];
2673 }
2674
2675 sub deploy {
2676   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2677   my $deploy = sub {
2678     my $line = shift;
2679     return if($line =~ /^--/);
2680     return if(!$line);
2681     # next if($line =~ /^DROP/m);
2682     return if($line =~ /^BEGIN TRANSACTION/m);
2683     return if($line =~ /^COMMIT/m);
2684     return if $line =~ /^\s+$/; # skip whitespace only
2685     $self->_query_start($line);
2686     try {
2687       # do a dbh_do cycle here, as we need some error checking in
2688       # place (even though we will ignore errors)
2689       $self->dbh_do (sub { $_[1]->do($line) });
2690     } catch {
2691       carp qq{$_ (running "${line}")};
2692     };
2693     $self->_query_end($line);
2694   };
2695   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2696   if (@statements > 1) {
2697     foreach my $statement (@statements) {
2698       $deploy->( $statement );
2699     }
2700   }
2701   elsif (@statements == 1) {
2702     foreach my $line ( split(";\n", $statements[0])) {
2703       $deploy->( $line );
2704     }
2705   }
2706 }
2707
2708 =head2 datetime_parser
2709
2710 Returns the datetime parser class
2711
2712 =cut
2713
2714 sub datetime_parser {
2715   my $self = shift;
2716   return $self->{datetime_parser} ||= do {
2717     $self->build_datetime_parser(@_);
2718   };
2719 }
2720
2721 =head2 datetime_parser_type
2722
2723 Defines (returns) the datetime parser class - currently hardwired to
2724 L<DateTime::Format::MySQL>
2725
2726 =cut
2727
2728 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2729
2730 =head2 build_datetime_parser
2731
2732 See L</datetime_parser>
2733
2734 =cut
2735
2736 sub build_datetime_parser {
2737   my $self = shift;
2738   my $type = $self->datetime_parser_type(@_);
2739   $self->ensure_class_loaded ($type);
2740   return $type;
2741 }
2742
2743
2744 =head2 is_replicating
2745
2746 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2747 replicate from a master database.  Default is undef, which is the result
2748 returned by databases that don't support replication.
2749
2750 =cut
2751
2752 sub is_replicating {
2753     return;
2754
2755 }
2756
2757 =head2 lag_behind_master
2758
2759 Returns a number that represents a certain amount of lag behind a master db
2760 when a given storage is replicating.  The number is database dependent, but
2761 starts at zero and increases with the amount of lag. Default in undef
2762
2763 =cut
2764
2765 sub lag_behind_master {
2766     return;
2767 }
2768
2769 =head2 relname_to_table_alias
2770
2771 =over 4
2772
2773 =item Arguments: $relname, $join_count
2774
2775 =back
2776
2777 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2778 queries.
2779
2780 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2781 way these aliases are named.
2782
2783 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2784 otherwise C<"$relname">.
2785
2786 =cut
2787
2788 sub relname_to_table_alias {
2789   my ($self, $relname, $join_count) = @_;
2790
2791   my $alias = ($join_count && $join_count > 1 ?
2792     join('_', $relname, $join_count) : $relname);
2793
2794   return $alias;
2795 }
2796
2797 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2798 # version and it may be necessary to amend or override it for a specific storage
2799 # if such binds are necessary.
2800 sub _max_column_bytesize {
2801   my ($self, $source, $col) = @_;
2802
2803   my $inf = $source->column_info($col);
2804   return $inf->{_max_bytesize} ||= do {
2805
2806     my $max_size;
2807
2808     if (my $data_type = $inf->{data_type}) {
2809       $data_type = lc($data_type);
2810
2811       # String/sized-binary types
2812       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2813                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2814       ) {
2815         $max_size = $inf->{size};
2816       }
2817       # Other charset/unicode types, assume scale of 4
2818       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2819                               |univarchar
2820                               |nvarchar)\b/x
2821       ) {
2822         $max_size = $inf->{size} * 4 if $inf->{size};
2823       }
2824       # Blob types
2825       elsif ($self->_is_lob_type($data_type)) {
2826         # default to longreadlen
2827       }
2828       else {
2829         $max_size = 100;  # for all other (numeric?) datatypes
2830       }
2831     }
2832
2833     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2834   };
2835 }
2836
2837 # Determine if a data_type is some type of BLOB
2838 sub _is_lob_type {
2839   my ($self, $data_type) = @_;
2840   $data_type && ($data_type =~ /(?:lob|bfile|text|image|bytea|memo)/i
2841     || $data_type =~ /^long(?:\s*(?:raw|bit\s*varying|varbit|binary
2842                                   |varchar|character\s*varying|nvarchar
2843                                   |national\s*character\s*varying))?$/xi);
2844 }
2845
2846 1;
2847
2848 =head1 USAGE NOTES
2849
2850 =head2 DBIx::Class and AutoCommit
2851
2852 DBIx::Class can do some wonderful magic with handling exceptions,
2853 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2854 (the default) combined with C<txn_do> for transaction support.
2855
2856 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2857 in an assumed transaction between commits, and you're telling us you'd
2858 like to manage that manually.  A lot of the magic protections offered by
2859 this module will go away.  We can't protect you from exceptions due to database
2860 disconnects because we don't know anything about how to restart your
2861 transactions.  You're on your own for handling all sorts of exceptional
2862 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2863 be with raw DBI.
2864
2865
2866 =head1 AUTHORS
2867
2868 Matt S. Trout <mst@shadowcatsystems.co.uk>
2869
2870 Andy Grundman <andy@hybridized.org>
2871
2872 =head1 LICENSE
2873
2874 You may distribute this code under the same terms as Perl itself.
2875
2876 =cut