Another overhaul of transaction/savepoint handling
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Try::Tiny;
16 use overload ();
17 use namespace::clean;
18
19 # default cursor class, overridable in connect_info attributes
20 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
21
22 __PACKAGE__->mk_group_accessors('inherited' => qw/
23   sql_limit_dialect sql_quote_char sql_name_sep
24 /);
25
26 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
27
28 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
29 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
30
31 __PACKAGE__->sql_name_sep('.');
32
33 __PACKAGE__->mk_group_accessors('simple' => qw/
34   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
35   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
36 /);
37
38 # the values for these accessors are picked out (and deleted) from
39 # the attribute hashref passed to connect_info
40 my @storage_options = qw/
41   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
42   disable_sth_caching unsafe auto_savepoint
43 /;
44 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
45
46
47 # capability definitions, using a 2-tiered accessor system
48 # The rationale is:
49 #
50 # A driver/user may define _use_X, which blindly without any checks says:
51 # "(do not) use this capability", (use_dbms_capability is an "inherited"
52 # type accessor)
53 #
54 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
55 # accessor, which in turn calls _determine_supports_X, and stores the return
56 # in a special slot on the storage object, which is wiped every time a $dbh
57 # reconnection takes place (it is not guaranteed that upon reconnection we
58 # will get the same rdbms version). _determine_supports_X does not need to
59 # exist on a driver, as we ->can for it before calling.
60
61 my @capabilities = (qw/
62   insert_returning
63   insert_returning_bound
64   placeholders
65   typeless_placeholders
66   join_optimizer
67 /);
68 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
69 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
70
71 # on by default, not strictly a capability (pending rewrite)
72 __PACKAGE__->_use_join_optimizer (1);
73 sub _determine_supports_join_optimizer { 1 };
74
75 # Each of these methods need _determine_driver called before itself
76 # in order to function reliably. This is a purely DRY optimization
77 #
78 # get_(use)_dbms_capability need to be called on the correct Storage
79 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
80 # _determine_supports_X which obv. needs a correct driver as well
81 my @rdbms_specific_methods = qw/
82   deployment_statements
83   sqlt_type
84   sql_maker
85   build_datetime_parser
86   datetime_parser_type
87
88   txn_begin
89   insert
90   insert_bulk
91   update
92   delete
93   select
94   select_single
95
96   get_use_dbms_capability
97   get_dbms_capability
98
99   _server_info
100   _get_server_version
101 /;
102
103 for my $meth (@rdbms_specific_methods) {
104
105   my $orig = __PACKAGE__->can ($meth)
106     or die "$meth is not a ::Storage::DBI method!";
107
108   no strict qw/refs/;
109   no warnings qw/redefine/;
110   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
111     if (
112       # only fire when invoked on an instance, a valid class-based invocation
113       # would e.g. be setting a default for an inherited accessor
114       ref $_[0]
115         and
116       ! $_[0]->_driver_determined
117         and
118       ! $_[0]->{_in_determine_driver}
119     ) {
120       $_[0]->_determine_driver;
121
122       # This for some reason crashes and burns on perl 5.8.1
123       # IFF the method ends up throwing an exception
124       #goto $_[0]->can ($meth);
125
126       my $cref = $_[0]->can ($meth);
127       goto $cref;
128     }
129
130     goto $orig;
131   };
132 }
133
134 =head1 NAME
135
136 DBIx::Class::Storage::DBI - DBI storage handler
137
138 =head1 SYNOPSIS
139
140   my $schema = MySchema->connect('dbi:SQLite:my.db');
141
142   $schema->storage->debug(1);
143
144   my @stuff = $schema->storage->dbh_do(
145     sub {
146       my ($storage, $dbh, @args) = @_;
147       $dbh->do("DROP TABLE authors");
148     },
149     @column_list
150   );
151
152   $schema->resultset('Book')->search({
153      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
154   });
155
156 =head1 DESCRIPTION
157
158 This class represents the connection to an RDBMS via L<DBI>.  See
159 L<DBIx::Class::Storage> for general information.  This pod only
160 documents DBI-specific methods and behaviors.
161
162 =head1 METHODS
163
164 =cut
165
166 sub new {
167   my $new = shift->next::method(@_);
168
169   $new->_sql_maker_opts({});
170   $new->_dbh_details({});
171   $new->{_in_do_block} = 0;
172   $new->{_dbh_gen} = 0;
173
174   # read below to see what this does
175   $new->_arm_global_destructor;
176
177   $new;
178 }
179
180 # This is hack to work around perl shooting stuff in random
181 # order on exit(). If we do not walk the remaining storage
182 # objects in an END block, there is a *small but real* chance
183 # of a fork()ed child to kill the parent's shared DBI handle,
184 # *before perl reaches the DESTROY in this package*
185 # Yes, it is ugly and effective.
186 # Additionally this registry is used by the CLONE method to
187 # make sure no handles are shared between threads
188 {
189   my %seek_and_destroy;
190
191   sub _arm_global_destructor {
192     my $self = shift;
193     my $key = refaddr ($self);
194     $seek_and_destroy{$key} = $self;
195     weaken ($seek_and_destroy{$key});
196   }
197
198   END {
199     local $?; # just in case the DBI destructor changes it somehow
200
201     # destroy just the object if not native to this process/thread
202     $_->_verify_pid for (grep
203       { defined $_ }
204       values %seek_and_destroy
205     );
206   }
207
208   sub CLONE {
209     # As per DBI's recommendation, DBIC disconnects all handles as
210     # soon as possible (DBIC will reconnect only on demand from within
211     # the thread)
212     for (values %seek_and_destroy) {
213       next unless $_;
214       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
215       $_->_dbh(undef);
216
217       $_->transaction_depth(0);
218       $_->savepoints([]);
219     }
220   }
221 }
222
223 sub DESTROY {
224   my $self = shift;
225
226   # some databases spew warnings on implicit disconnect
227   local $SIG{__WARN__} = sub {};
228   $self->_dbh(undef);
229
230   # this op is necessary, since the very last perl runtime statement
231   # triggers a global destruction shootout, and the $SIG localization
232   # may very well be destroyed before perl actually gets to do the
233   # $dbh undef
234   1;
235 }
236
237 # handle pid changes correctly - do not destroy parent's connection
238 sub _verify_pid {
239   my $self = shift;
240
241   my $pid = $self->_conn_pid;
242   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
243     $dbh->{InactiveDestroy} = 1;
244     $self->{_dbh_gen}++;
245     $self->_dbh(undef);
246     $self->transaction_depth(0);
247     $self->savepoints([]);
248   }
249
250   return;
251 }
252
253 =head2 connect_info
254
255 This method is normally called by L<DBIx::Class::Schema/connection>, which
256 encapsulates its argument list in an arrayref before passing them here.
257
258 The argument list may contain:
259
260 =over
261
262 =item *
263
264 The same 4-element argument set one would normally pass to
265 L<DBI/connect>, optionally followed by
266 L<extra attributes|/DBIx::Class specific connection attributes>
267 recognized by DBIx::Class:
268
269   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
270
271 =item *
272
273 A single code reference which returns a connected
274 L<DBI database handle|DBI/connect> optionally followed by
275 L<extra attributes|/DBIx::Class specific connection attributes> recognized
276 by DBIx::Class:
277
278   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
279
280 =item *
281
282 A single hashref with all the attributes and the dsn/user/password
283 mixed together:
284
285   $connect_info_args = [{
286     dsn => $dsn,
287     user => $user,
288     password => $pass,
289     %dbi_attributes,
290     %extra_attributes,
291   }];
292
293   $connect_info_args = [{
294     dbh_maker => sub { DBI->connect (...) },
295     %dbi_attributes,
296     %extra_attributes,
297   }];
298
299 This is particularly useful for L<Catalyst> based applications, allowing the
300 following config (L<Config::General> style):
301
302   <Model::DB>
303     schema_class   App::DB
304     <connect_info>
305       dsn          dbi:mysql:database=test
306       user         testuser
307       password     TestPass
308       AutoCommit   1
309     </connect_info>
310   </Model::DB>
311
312 The C<dsn>/C<user>/C<password> combination can be substituted by the
313 C<dbh_maker> key whose value is a coderef that returns a connected
314 L<DBI database handle|DBI/connect>
315
316 =back
317
318 Please note that the L<DBI> docs recommend that you always explicitly
319 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
320 recommends that it be set to I<1>, and that you perform transactions
321 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
322 to I<1> if you do not do explicitly set it to zero.  This is the default
323 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
324
325 =head3 DBIx::Class specific connection attributes
326
327 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
328 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
329 the following connection options. These options can be mixed in with your other
330 L<DBI> connection attributes, or placed in a separate hashref
331 (C<\%extra_attributes>) as shown above.
332
333 Every time C<connect_info> is invoked, any previous settings for
334 these options will be cleared before setting the new ones, regardless of
335 whether any options are specified in the new C<connect_info>.
336
337
338 =over
339
340 =item on_connect_do
341
342 Specifies things to do immediately after connecting or re-connecting to
343 the database.  Its value may contain:
344
345 =over
346
347 =item a scalar
348
349 This contains one SQL statement to execute.
350
351 =item an array reference
352
353 This contains SQL statements to execute in order.  Each element contains
354 a string or a code reference that returns a string.
355
356 =item a code reference
357
358 This contains some code to execute.  Unlike code references within an
359 array reference, its return value is ignored.
360
361 =back
362
363 =item on_disconnect_do
364
365 Takes arguments in the same form as L</on_connect_do> and executes them
366 immediately before disconnecting from the database.
367
368 Note, this only runs if you explicitly call L</disconnect> on the
369 storage object.
370
371 =item on_connect_call
372
373 A more generalized form of L</on_connect_do> that calls the specified
374 C<connect_call_METHOD> methods in your storage driver.
375
376   on_connect_do => 'select 1'
377
378 is equivalent to:
379
380   on_connect_call => [ [ do_sql => 'select 1' ] ]
381
382 Its values may contain:
383
384 =over
385
386 =item a scalar
387
388 Will call the C<connect_call_METHOD> method.
389
390 =item a code reference
391
392 Will execute C<< $code->($storage) >>
393
394 =item an array reference
395
396 Each value can be a method name or code reference.
397
398 =item an array of arrays
399
400 For each array, the first item is taken to be the C<connect_call_> method name
401 or code reference, and the rest are parameters to it.
402
403 =back
404
405 Some predefined storage methods you may use:
406
407 =over
408
409 =item do_sql
410
411 Executes a SQL string or a code reference that returns a SQL string. This is
412 what L</on_connect_do> and L</on_disconnect_do> use.
413
414 It can take:
415
416 =over
417
418 =item a scalar
419
420 Will execute the scalar as SQL.
421
422 =item an arrayref
423
424 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
425 attributes hashref and bind values.
426
427 =item a code reference
428
429 Will execute C<< $code->($storage) >> and execute the return array refs as
430 above.
431
432 =back
433
434 =item datetime_setup
435
436 Execute any statements necessary to initialize the database session to return
437 and accept datetime/timestamp values used with
438 L<DBIx::Class::InflateColumn::DateTime>.
439
440 Only necessary for some databases, see your specific storage driver for
441 implementation details.
442
443 =back
444
445 =item on_disconnect_call
446
447 Takes arguments in the same form as L</on_connect_call> and executes them
448 immediately before disconnecting from the database.
449
450 Calls the C<disconnect_call_METHOD> methods as opposed to the
451 C<connect_call_METHOD> methods called by L</on_connect_call>.
452
453 Note, this only runs if you explicitly call L</disconnect> on the
454 storage object.
455
456 =item disable_sth_caching
457
458 If set to a true value, this option will disable the caching of
459 statement handles via L<DBI/prepare_cached>.
460
461 =item limit_dialect
462
463 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
464 default L</sql_limit_dialect> setting of the storage (if any). For a list
465 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
466
467 =item quote_names
468
469 When true automatically sets L</quote_char> and L</name_sep> to the characters
470 appropriate for your particular RDBMS. This option is preferred over specifying
471 L</quote_char> directly.
472
473 =item quote_char
474
475 Specifies what characters to use to quote table and column names.
476
477 C<quote_char> expects either a single character, in which case is it
478 is placed on either side of the table/column name, or an arrayref of length
479 2 in which case the table/column name is placed between the elements.
480
481 For example under MySQL you should use C<< quote_char => '`' >>, and for
482 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
483
484 =item name_sep
485
486 This parameter is only useful in conjunction with C<quote_char>, and is used to
487 specify the character that separates elements (schemas, tables, columns) from
488 each other. If unspecified it defaults to the most commonly used C<.>.
489
490 =item unsafe
491
492 This Storage driver normally installs its own C<HandleError>, sets
493 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
494 all database handles, including those supplied by a coderef.  It does this
495 so that it can have consistent and useful error behavior.
496
497 If you set this option to a true value, Storage will not do its usual
498 modifications to the database handle's attributes, and instead relies on
499 the settings in your connect_info DBI options (or the values you set in
500 your connection coderef, in the case that you are connecting via coderef).
501
502 Note that your custom settings can cause Storage to malfunction,
503 especially if you set a C<HandleError> handler that suppresses exceptions
504 and/or disable C<RaiseError>.
505
506 =item auto_savepoint
507
508 If this option is true, L<DBIx::Class> will use savepoints when nesting
509 transactions, making it possible to recover from failure in the inner
510 transaction without having to abort all outer transactions.
511
512 =item cursor_class
513
514 Use this argument to supply a cursor class other than the default
515 L<DBIx::Class::Storage::DBI::Cursor>.
516
517 =back
518
519 Some real-life examples of arguments to L</connect_info> and
520 L<DBIx::Class::Schema/connect>
521
522   # Simple SQLite connection
523   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
524
525   # Connect via subref
526   ->connect_info([ sub { DBI->connect(...) } ]);
527
528   # Connect via subref in hashref
529   ->connect_info([{
530     dbh_maker => sub { DBI->connect(...) },
531     on_connect_do => 'alter session ...',
532   }]);
533
534   # A bit more complicated
535   ->connect_info(
536     [
537       'dbi:Pg:dbname=foo',
538       'postgres',
539       'my_pg_password',
540       { AutoCommit => 1 },
541       { quote_char => q{"} },
542     ]
543   );
544
545   # Equivalent to the previous example
546   ->connect_info(
547     [
548       'dbi:Pg:dbname=foo',
549       'postgres',
550       'my_pg_password',
551       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
552     ]
553   );
554
555   # Same, but with hashref as argument
556   # See parse_connect_info for explanation
557   ->connect_info(
558     [{
559       dsn         => 'dbi:Pg:dbname=foo',
560       user        => 'postgres',
561       password    => 'my_pg_password',
562       AutoCommit  => 1,
563       quote_char  => q{"},
564       name_sep    => q{.},
565     }]
566   );
567
568   # Subref + DBIx::Class-specific connection options
569   ->connect_info(
570     [
571       sub { DBI->connect(...) },
572       {
573           quote_char => q{`},
574           name_sep => q{@},
575           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
576           disable_sth_caching => 1,
577       },
578     ]
579   );
580
581
582
583 =cut
584
585 sub connect_info {
586   my ($self, $info) = @_;
587
588   return $self->_connect_info if !$info;
589
590   $self->_connect_info($info); # copy for _connect_info
591
592   $info = $self->_normalize_connect_info($info)
593     if ref $info eq 'ARRAY';
594
595   for my $storage_opt (keys %{ $info->{storage_options} }) {
596     my $value = $info->{storage_options}{$storage_opt};
597
598     $self->$storage_opt($value);
599   }
600
601   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
602   #  the new set of options
603   $self->_sql_maker(undef);
604   $self->_sql_maker_opts({});
605
606   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
607     my $value = $info->{sql_maker_options}{$sql_maker_opt};
608
609     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
610   }
611
612   my %attrs = (
613     %{ $self->_default_dbi_connect_attributes || {} },
614     %{ $info->{attributes} || {} },
615   );
616
617   my @args = @{ $info->{arguments} };
618
619   if (keys %attrs and ref $args[0] ne 'CODE') {
620     carp
621         'You provided explicit AutoCommit => 0 in your connection_info. '
622       . 'This is almost universally a bad idea (see the footnotes of '
623       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
624       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
625       . 'this warning.'
626       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
627
628     push @args, \%attrs if keys %attrs;
629   }
630   $self->_dbi_connect_info(\@args);
631
632   # FIXME - dirty:
633   # save attributes them in a separate accessor so they are always
634   # introspectable, even in case of a CODE $dbhmaker
635   $self->_dbic_connect_attributes (\%attrs);
636
637   return $self->_connect_info;
638 }
639
640 sub _normalize_connect_info {
641   my ($self, $info_arg) = @_;
642   my %info;
643
644   my @args = @$info_arg;  # take a shallow copy for further mutilation
645
646   # combine/pre-parse arguments depending on invocation style
647
648   my %attrs;
649   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
650     %attrs = %{ $args[1] || {} };
651     @args = $args[0];
652   }
653   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
654     %attrs = %{$args[0]};
655     @args = ();
656     if (my $code = delete $attrs{dbh_maker}) {
657       @args = $code;
658
659       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
660       if (@ignored) {
661         carp sprintf (
662             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
663           . "to the result of 'dbh_maker'",
664
665           join (', ', map { "'$_'" } (@ignored) ),
666         );
667       }
668     }
669     else {
670       @args = delete @attrs{qw/dsn user password/};
671     }
672   }
673   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
674     %attrs = (
675       % { $args[3] || {} },
676       % { $args[4] || {} },
677     );
678     @args = @args[0,1,2];
679   }
680
681   $info{arguments} = \@args;
682
683   my @storage_opts = grep exists $attrs{$_},
684     @storage_options, 'cursor_class';
685
686   @{ $info{storage_options} }{@storage_opts} =
687     delete @attrs{@storage_opts} if @storage_opts;
688
689   my @sql_maker_opts = grep exists $attrs{$_},
690     qw/limit_dialect quote_char name_sep quote_names/;
691
692   @{ $info{sql_maker_options} }{@sql_maker_opts} =
693     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
694
695   $info{attributes} = \%attrs if %attrs;
696
697   return \%info;
698 }
699
700 sub _default_dbi_connect_attributes () {
701   +{
702     AutoCommit => 1,
703     PrintError => 0,
704     RaiseError => 1,
705     ShowErrorStatement => 1,
706   };
707 }
708
709 =head2 on_connect_do
710
711 This method is deprecated in favour of setting via L</connect_info>.
712
713 =cut
714
715 =head2 on_disconnect_do
716
717 This method is deprecated in favour of setting via L</connect_info>.
718
719 =cut
720
721 sub _parse_connect_do {
722   my ($self, $type) = @_;
723
724   my $val = $self->$type;
725   return () if not defined $val;
726
727   my @res;
728
729   if (not ref($val)) {
730     push @res, [ 'do_sql', $val ];
731   } elsif (ref($val) eq 'CODE') {
732     push @res, $val;
733   } elsif (ref($val) eq 'ARRAY') {
734     push @res, map { [ 'do_sql', $_ ] } @$val;
735   } else {
736     $self->throw_exception("Invalid type for $type: ".ref($val));
737   }
738
739   return \@res;
740 }
741
742 =head2 dbh_do
743
744 Arguments: ($subref | $method_name), @extra_coderef_args?
745
746 Execute the given $subref or $method_name using the new exception-based
747 connection management.
748
749 The first two arguments will be the storage object that C<dbh_do> was called
750 on and a database handle to use.  Any additional arguments will be passed
751 verbatim to the called subref as arguments 2 and onwards.
752
753 Using this (instead of $self->_dbh or $self->dbh) ensures correct
754 exception handling and reconnection (or failover in future subclasses).
755
756 Your subref should have no side-effects outside of the database, as
757 there is the potential for your subref to be partially double-executed
758 if the database connection was stale/dysfunctional.
759
760 Example:
761
762   my @stuff = $schema->storage->dbh_do(
763     sub {
764       my ($storage, $dbh, @cols) = @_;
765       my $cols = join(q{, }, @cols);
766       $dbh->selectrow_array("SELECT $cols FROM foo");
767     },
768     @column_list
769   );
770
771 =cut
772
773 sub dbh_do {
774   my $self = shift;
775   my $code = shift;
776
777   my $dbh = $self->_get_dbh;
778
779   return $self->$code($dbh, @_)
780     if ( $self->{_in_do_block} || $self->{transaction_depth} );
781
782   local $self->{_in_do_block} = 1;
783
784   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
785   my $args = \@_;
786
787   try {
788     $self->$code ($dbh, @$args);
789   } catch {
790     $self->throw_exception($_) if $self->connected;
791
792     # We were not connected - reconnect and retry, but let any
793     #  exception fall right through this time
794     carp "Retrying dbh_do($code) after catching disconnected exception: $_"
795       if $ENV{DBIC_STORAGE_RETRY_DEBUG};
796
797     $self->_populate_dbh;
798     $self->$code($self->_dbh, @$args);
799   };
800 }
801
802 sub txn_do {
803   # connects or reconnects on pid change, necessary to grab correct txn_depth
804   $_[0]->_get_dbh;
805   local $_[0]->{_in_do_block} = 1;
806   shift->next::method(@_);
807 }
808
809 =head2 disconnect
810
811 Our C<disconnect> method also performs a rollback first if the
812 database is not in C<AutoCommit> mode.
813
814 =cut
815
816 sub disconnect {
817   my ($self) = @_;
818
819   if( $self->_dbh ) {
820     my @actions;
821
822     push @actions, ( $self->on_disconnect_call || () );
823     push @actions, $self->_parse_connect_do ('on_disconnect_do');
824
825     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
826
827     # stops the "implicit rollback on disconnect" warning
828     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
829
830     %{ $self->_dbh->{CachedKids} } = ();
831     $self->_dbh->disconnect;
832     $self->_dbh(undef);
833     $self->{_dbh_gen}++;
834   }
835 }
836
837 =head2 with_deferred_fk_checks
838
839 =over 4
840
841 =item Arguments: C<$coderef>
842
843 =item Return Value: The return value of $coderef
844
845 =back
846
847 Storage specific method to run the code ref with FK checks deferred or
848 in MySQL's case disabled entirely.
849
850 =cut
851
852 # Storage subclasses should override this
853 sub with_deferred_fk_checks {
854   my ($self, $sub) = @_;
855   $sub->();
856 }
857
858 =head2 connected
859
860 =over
861
862 =item Arguments: none
863
864 =item Return Value: 1|0
865
866 =back
867
868 Verifies that the current database handle is active and ready to execute
869 an SQL statement (e.g. the connection did not get stale, server is still
870 answering, etc.) This method is used internally by L</dbh>.
871
872 =cut
873
874 sub connected {
875   my $self = shift;
876   return 0 unless $self->_seems_connected;
877
878   #be on the safe side
879   local $self->_dbh->{RaiseError} = 1;
880
881   return $self->_ping;
882 }
883
884 sub _seems_connected {
885   my $self = shift;
886
887   $self->_verify_pid;
888
889   my $dbh = $self->_dbh
890     or return 0;
891
892   return $dbh->FETCH('Active');
893 }
894
895 sub _ping {
896   my $self = shift;
897
898   my $dbh = $self->_dbh or return 0;
899
900   return $dbh->ping;
901 }
902
903 sub ensure_connected {
904   my ($self) = @_;
905
906   unless ($self->connected) {
907     $self->_populate_dbh;
908   }
909 }
910
911 =head2 dbh
912
913 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
914 is guaranteed to be healthy by implicitly calling L</connected>, and if
915 necessary performing a reconnection before returning. Keep in mind that this
916 is very B<expensive> on some database engines. Consider using L</dbh_do>
917 instead.
918
919 =cut
920
921 sub dbh {
922   my ($self) = @_;
923
924   if (not $self->_dbh) {
925     $self->_populate_dbh;
926   } else {
927     $self->ensure_connected;
928   }
929   return $self->_dbh;
930 }
931
932 # this is the internal "get dbh or connect (don't check)" method
933 sub _get_dbh {
934   my $self = shift;
935   $self->_verify_pid;
936   $self->_populate_dbh unless $self->_dbh;
937   return $self->_dbh;
938 }
939
940 sub sql_maker {
941   my ($self) = @_;
942   unless ($self->_sql_maker) {
943     my $sql_maker_class = $self->sql_maker_class;
944
945     my %opts = %{$self->_sql_maker_opts||{}};
946     my $dialect =
947       $opts{limit_dialect}
948         ||
949       $self->sql_limit_dialect
950         ||
951       do {
952         my $s_class = (ref $self) || $self;
953         carp (
954           "Your storage class ($s_class) does not set sql_limit_dialect and you "
955         . 'have not supplied an explicit limit_dialect in your connection_info. '
956         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
957         . 'databases but can be (and often is) painfully slow. '
958         . "Please file an RT ticket against '$s_class' ."
959         );
960
961         'GenericSubQ';
962       }
963     ;
964
965     my ($quote_char, $name_sep);
966
967     if ($opts{quote_names}) {
968       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
969         my $s_class = (ref $self) || $self;
970         carp (
971           "You requested 'quote_names' but your storage class ($s_class) does "
972         . 'not explicitly define a default sql_quote_char and you have not '
973         . 'supplied a quote_char as part of your connection_info. DBIC will '
974         .q{default to the ANSI SQL standard quote '"', which works most of }
975         . "the time. Please file an RT ticket against '$s_class'."
976         );
977
978         '"'; # RV
979       };
980
981       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
982     }
983
984     $self->_sql_maker($sql_maker_class->new(
985       bindtype=>'columns',
986       array_datatypes => 1,
987       limit_dialect => $dialect,
988       ($quote_char ? (quote_char => $quote_char) : ()),
989       name_sep => ($name_sep || '.'),
990       %opts,
991     ));
992   }
993   return $self->_sql_maker;
994 }
995
996 # nothing to do by default
997 sub _rebless {}
998 sub _init {}
999
1000 sub _populate_dbh {
1001   my ($self) = @_;
1002
1003   my @info = @{$self->_dbi_connect_info || []};
1004   $self->_dbh(undef); # in case ->connected failed we might get sent here
1005   $self->_dbh_details({}); # reset everything we know
1006
1007   $self->_dbh($self->_connect(@info));
1008
1009   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1010
1011   $self->_determine_driver;
1012
1013   # Always set the transaction depth on connect, since
1014   #  there is no transaction in progress by definition
1015   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1016
1017   $self->_run_connection_actions unless $self->{_in_determine_driver};
1018 }
1019
1020 sub _run_connection_actions {
1021   my $self = shift;
1022   my @actions;
1023
1024   push @actions, ( $self->on_connect_call || () );
1025   push @actions, $self->_parse_connect_do ('on_connect_do');
1026
1027   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1028 }
1029
1030
1031
1032 sub set_use_dbms_capability {
1033   $_[0]->set_inherited ($_[1], $_[2]);
1034 }
1035
1036 sub get_use_dbms_capability {
1037   my ($self, $capname) = @_;
1038
1039   my $use = $self->get_inherited ($capname);
1040   return defined $use
1041     ? $use
1042     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1043   ;
1044 }
1045
1046 sub set_dbms_capability {
1047   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1048 }
1049
1050 sub get_dbms_capability {
1051   my ($self, $capname) = @_;
1052
1053   my $cap = $self->_dbh_details->{capability}{$capname};
1054
1055   unless (defined $cap) {
1056     if (my $meth = $self->can ("_determine$capname")) {
1057       $cap = $self->$meth ? 1 : 0;
1058     }
1059     else {
1060       $cap = 0;
1061     }
1062
1063     $self->set_dbms_capability ($capname, $cap);
1064   }
1065
1066   return $cap;
1067 }
1068
1069 sub _server_info {
1070   my $self = shift;
1071
1072   my $info;
1073   unless ($info = $self->_dbh_details->{info}) {
1074
1075     $info = {};
1076
1077     my $server_version = try { $self->_get_server_version };
1078
1079     if (defined $server_version) {
1080       $info->{dbms_version} = $server_version;
1081
1082       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1083       my @verparts = split (/\./, $numeric_version);
1084       if (
1085         @verparts
1086           &&
1087         $verparts[0] <= 999
1088       ) {
1089         # consider only up to 3 version parts, iff not more than 3 digits
1090         my @use_parts;
1091         while (@verparts && @use_parts < 3) {
1092           my $p = shift @verparts;
1093           last if $p > 999;
1094           push @use_parts, $p;
1095         }
1096         push @use_parts, 0 while @use_parts < 3;
1097
1098         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1099       }
1100     }
1101
1102     $self->_dbh_details->{info} = $info;
1103   }
1104
1105   return $info;
1106 }
1107
1108 sub _get_server_version {
1109   shift->_dbh_get_info(18);
1110 }
1111
1112 sub _dbh_get_info {
1113   my ($self, $info) = @_;
1114
1115   return try { $self->_get_dbh->get_info($info) } || undef;
1116 }
1117
1118 sub _determine_driver {
1119   my ($self) = @_;
1120
1121   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1122     my $started_connected = 0;
1123     local $self->{_in_determine_driver} = 1;
1124
1125     if (ref($self) eq __PACKAGE__) {
1126       my $driver;
1127       if ($self->_dbh) { # we are connected
1128         $driver = $self->_dbh->{Driver}{Name};
1129         $started_connected = 1;
1130       } else {
1131         # if connect_info is a CODEREF, we have no choice but to connect
1132         if (ref $self->_dbi_connect_info->[0] &&
1133             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1134           $self->_populate_dbh;
1135           $driver = $self->_dbh->{Driver}{Name};
1136         }
1137         else {
1138           # try to use dsn to not require being connected, the driver may still
1139           # force a connection in _rebless to determine version
1140           # (dsn may not be supplied at all if all we do is make a mock-schema)
1141           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1142           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1143           $driver ||= $ENV{DBI_DRIVER};
1144         }
1145       }
1146
1147       if ($driver) {
1148         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1149         if ($self->load_optional_class($storage_class)) {
1150           mro::set_mro($storage_class, 'c3');
1151           bless $self, $storage_class;
1152           $self->_rebless();
1153         }
1154       }
1155     }
1156
1157     $self->_driver_determined(1);
1158
1159     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1160
1161     $self->_init; # run driver-specific initializations
1162
1163     $self->_run_connection_actions
1164         if !$started_connected && defined $self->_dbh;
1165   }
1166 }
1167
1168 sub _do_connection_actions {
1169   my $self          = shift;
1170   my $method_prefix = shift;
1171   my $call          = shift;
1172
1173   if (not ref($call)) {
1174     my $method = $method_prefix . $call;
1175     $self->$method(@_);
1176   } elsif (ref($call) eq 'CODE') {
1177     $self->$call(@_);
1178   } elsif (ref($call) eq 'ARRAY') {
1179     if (ref($call->[0]) ne 'ARRAY') {
1180       $self->_do_connection_actions($method_prefix, $_) for @$call;
1181     } else {
1182       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1183     }
1184   } else {
1185     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1186   }
1187
1188   return $self;
1189 }
1190
1191 sub connect_call_do_sql {
1192   my $self = shift;
1193   $self->_do_query(@_);
1194 }
1195
1196 sub disconnect_call_do_sql {
1197   my $self = shift;
1198   $self->_do_query(@_);
1199 }
1200
1201 # override in db-specific backend when necessary
1202 sub connect_call_datetime_setup { 1 }
1203
1204 sub _do_query {
1205   my ($self, $action) = @_;
1206
1207   if (ref $action eq 'CODE') {
1208     $action = $action->($self);
1209     $self->_do_query($_) foreach @$action;
1210   }
1211   else {
1212     # Most debuggers expect ($sql, @bind), so we need to exclude
1213     # the attribute hash which is the second argument to $dbh->do
1214     # furthermore the bind values are usually to be presented
1215     # as named arrayref pairs, so wrap those here too
1216     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1217     my $sql = shift @do_args;
1218     my $attrs = shift @do_args;
1219     my @bind = map { [ undef, $_ ] } @do_args;
1220
1221     $self->_query_start($sql, \@bind);
1222     $self->_get_dbh->do($sql, $attrs, @do_args);
1223     $self->_query_end($sql, \@bind);
1224   }
1225
1226   return $self;
1227 }
1228
1229 sub _connect {
1230   my ($self, @info) = @_;
1231
1232   $self->throw_exception("You failed to provide any connection info")
1233     if !@info;
1234
1235   my ($old_connect_via, $dbh);
1236
1237   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1238     $old_connect_via = $DBI::connect_via;
1239     $DBI::connect_via = 'connect';
1240   }
1241
1242   try {
1243     if(ref $info[0] eq 'CODE') {
1244       $dbh = $info[0]->();
1245     }
1246     else {
1247       require DBI;
1248       $dbh = DBI->connect(@info);
1249     }
1250
1251     if (!$dbh) {
1252       die $DBI::errstr;
1253     }
1254
1255     unless ($self->unsafe) {
1256
1257       $self->throw_exception(
1258         'Refusing clobbering of {HandleError} installed on externally supplied '
1259        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1260       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1261
1262       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1263       # request, or an external handle. Complain and set anyway
1264       unless ($dbh->{RaiseError}) {
1265         carp( ref $info[0] eq 'CODE'
1266
1267           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1268            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1269            .'attribute has been supplied'
1270
1271           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1272            .'unsafe => 1. Toggling RaiseError back to true'
1273         );
1274
1275         $dbh->{RaiseError} = 1;
1276       }
1277
1278       # this odd anonymous coderef dereference is in fact really
1279       # necessary to avoid the unwanted effect described in perl5
1280       # RT#75792
1281       sub {
1282         my $weak_self = $_[0];
1283         weaken $weak_self;
1284
1285         # the coderef is blessed so we can distinguish it from externally
1286         # supplied handles (which must be preserved)
1287         $_[1]->{HandleError} = bless sub {
1288           if ($weak_self) {
1289             $weak_self->throw_exception("DBI Exception: $_[0]");
1290           }
1291           else {
1292             # the handler may be invoked by something totally out of
1293             # the scope of DBIC
1294             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1295           }
1296         }, '__DBIC__DBH__ERROR__HANDLER__';
1297       }->($self, $dbh);
1298     }
1299   }
1300   catch {
1301     $self->throw_exception("DBI Connection failed: $_")
1302   }
1303   finally {
1304     $DBI::connect_via = $old_connect_via if $old_connect_via;
1305   };
1306
1307   $self->_dbh_autocommit($dbh->{AutoCommit});
1308   $dbh;
1309 }
1310
1311 sub txn_begin {
1312   my $self = shift;
1313
1314   # this means we have not yet connected and do not know the AC status
1315   # (e.g. coderef $dbh), need a full-fledged connection check
1316   if (! defined $self->_dbh_autocommit) {
1317     $self->ensure_connected;
1318   }
1319   # Otherwise simply connect or re-connect on pid changes
1320   else {
1321     $self->_get_dbh;
1322   }
1323
1324   $self->next::method(@_);
1325 }
1326
1327 sub _exec_txn_begin {
1328   my $self = shift;
1329
1330   # if the user is utilizing txn_do - good for him, otherwise we need to
1331   # ensure that the $dbh is healthy on BEGIN.
1332   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1333   # will be replaced by a failure of begin_work itself (which will be
1334   # then retried on reconnect)
1335   if ($self->{_in_do_block}) {
1336     $self->_dbh->begin_work;
1337   } else {
1338     $self->dbh_do(sub { $_[1]->begin_work });
1339   }
1340 }
1341
1342 sub txn_commit {
1343   my $self = shift;
1344
1345   $self->_verify_pid if $self->_dbh;
1346   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1347     unless $self->_dbh;
1348
1349   # esoteric case for folks using external $dbh handles
1350   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1351     carp "Storage transaction_depth 0 does not match "
1352         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1353     $self->transaction_depth(1);
1354   }
1355
1356   $self->next::method(@_);
1357
1358   # if AutoCommit is disabled txn_depth never goes to 0
1359   # as a new txn is started immediately on commit
1360   $self->transaction_depth(1) if (
1361     !$self->transaction_depth
1362       and 
1363     defined $self->_dbh_autocommit
1364       and
1365     ! $self->_dbh_autocommit
1366   );
1367 }
1368
1369 sub _exec_txn_commit {
1370   shift->_dbh->commit;
1371 }
1372
1373 sub txn_rollback {
1374   my $self = shift;
1375
1376   $self->_verify_pid if $self->_dbh;
1377   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1378     unless $self->_dbh;
1379
1380   # esoteric case for folks using external $dbh handles
1381   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1382     carp "Storage transaction_depth 0 does not match "
1383         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1384     $self->transaction_depth(1);
1385   }
1386
1387   $self->next::method(@_);
1388
1389   # if AutoCommit is disabled txn_depth never goes to 0
1390   # as a new txn is started immediately on commit
1391   $self->transaction_depth(1) if (
1392     !$self->transaction_depth
1393       and 
1394     defined $self->_dbh_autocommit
1395       and
1396     ! $self->_dbh_autocommit
1397   );
1398 }
1399
1400 sub _exec_txn_rollback {
1401   shift->_dbh->rollback;
1402 }
1403
1404 # generate some identical methods
1405 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1406   no strict qw/refs/;
1407   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1408     my $self = shift;
1409     $self->_verify_pid if $self->_dbh;
1410     $self->throw_exception("Unable to $meth() on a disconnected storage")
1411       unless $self->_dbh;
1412     $self->next::method(@_);
1413   };
1414 }
1415
1416 # This used to be the top-half of _execute.  It was split out to make it
1417 #  easier to override in NoBindVars without duping the rest.  It takes up
1418 #  all of _execute's args, and emits $sql, @bind.
1419 sub _prep_for_execute {
1420   my ($self, $op, $ident, $args) = @_;
1421
1422   my ($sql, @bind) = $self->sql_maker->$op(
1423     blessed($ident) ? $ident->from : $ident,
1424     @$args,
1425   );
1426
1427   my (@final_bind, $colinfos);
1428   my $resolve_bindinfo = sub {
1429     $colinfos ||= $self->_resolve_column_info($ident);
1430     if (my $col = $_[1]->{dbic_colname}) {
1431       $_[1]->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1432         if $colinfos->{$col}{data_type};
1433       $_[1]->{sqlt_size} ||= $colinfos->{$col}{size}
1434         if $colinfos->{$col}{size};
1435     }
1436     $_[1];
1437   };
1438
1439   for my $e (@{$args->[2]{bind}||[]}, @bind) {
1440     push @final_bind, [ do {
1441       if (ref $e ne 'ARRAY') {
1442         ({}, $e)
1443       }
1444       elsif (! defined $e->[0]) {
1445         ({}, $e->[1])
1446       }
1447       elsif (ref $e->[0] eq 'HASH') {
1448         (
1449           (first { $e->[0]{$_} } qw/dbd_attrs sqlt_datatype/) ? $e->[0] : $self->$resolve_bindinfo($e->[0]),
1450           $e->[1]
1451         )
1452       }
1453       elsif (ref $e->[0] eq 'SCALAR') {
1454         ( { sqlt_datatype => ${$e->[0]} }, $e->[1] )
1455       }
1456       else {
1457         ( $self->$resolve_bindinfo({ dbic_colname => $e->[0] }), $e->[1] )
1458       }
1459     }];
1460   }
1461
1462   ($sql, \@final_bind);
1463 }
1464
1465 sub _format_for_trace {
1466   #my ($self, $bind) = @_;
1467
1468   ### Turn @bind from something like this:
1469   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1470   ### to this:
1471   ###   ( "'1'", "'3'" )
1472
1473   map {
1474     defined( $_ && $_->[1] )
1475       ? qq{'$_->[1]'}
1476       : q{NULL}
1477   } @{$_[1] || []};
1478 }
1479
1480 sub _query_start {
1481   my ( $self, $sql, $bind ) = @_;
1482
1483   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1484     if $self->debug;
1485 }
1486
1487 sub _query_end {
1488   my ( $self, $sql, $bind ) = @_;
1489
1490   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1491     if $self->debug;
1492 }
1493
1494 my $sba_compat;
1495 sub _dbi_attrs_for_bind {
1496   my ($self, $ident, $bind) = @_;
1497
1498   if (! defined $sba_compat) {
1499     $self->_determine_driver;
1500     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1501       ? 0
1502       : 1
1503     ;
1504   }
1505
1506   my $sba_attrs;
1507   if ($sba_compat) {
1508     my $class = ref $self;
1509     carp_unique (
1510       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1511      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1512      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1513     );
1514
1515     my $sba_attrs = $self->source_bind_attributes
1516   }
1517
1518   my @attrs;
1519
1520   for (map { $_->[0] } @$bind) {
1521     push @attrs, do {
1522       if (exists $_->{dbd_attrs}) {
1523         $_->{dbd_attrs}
1524       }
1525       elsif($_->{sqlt_datatype}) {
1526         $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1527       }
1528       elsif ($sba_attrs and $_->{dbic_colname}) {
1529         $sba_attrs->{$_->{dbic_colname}} || undef;
1530       }
1531       else {
1532         undef;  # always push something at this position
1533       }
1534     }
1535   }
1536
1537   return \@attrs;
1538 }
1539
1540 sub _execute {
1541   my ($self, $op, $ident, @args) = @_;
1542
1543   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1544
1545   shift->dbh_do(    # retry over disconnects
1546     '_dbh_execute',
1547     $sql,
1548     $bind,
1549     $self->_dbi_attrs_for_bind($ident, $bind)
1550   );
1551 }
1552
1553 sub _dbh_execute {
1554   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1555
1556   $self->_query_start( $sql, $bind );
1557   my $sth = $self->_sth($sql);
1558
1559   for my $i (0 .. $#$bind) {
1560     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1561       $sth->bind_param_inout(
1562         $i + 1, # bind params counts are 1-based
1563         $bind->[$i][1],
1564         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1565         $bind_attrs->[$i],
1566       );
1567     }
1568     else {
1569       $sth->bind_param(
1570         $i + 1,
1571         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1572           ? "$bind->[$i][1]"
1573           : $bind->[$i][1]
1574         ,
1575         $bind_attrs->[$i],
1576       );
1577     }
1578   }
1579
1580   # Can this fail without throwing an exception anyways???
1581   my $rv = $sth->execute();
1582   $self->throw_exception(
1583     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1584   ) if !$rv;
1585
1586   $self->_query_end( $sql, $bind );
1587
1588   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1589 }
1590
1591 sub _prefetch_autovalues {
1592   my ($self, $source, $to_insert) = @_;
1593
1594   my $colinfo = $source->columns_info;
1595
1596   my %values;
1597   for my $col (keys %$colinfo) {
1598     if (
1599       $colinfo->{$col}{auto_nextval}
1600         and
1601       (
1602         ! exists $to_insert->{$col}
1603           or
1604         ref $to_insert->{$col} eq 'SCALAR'
1605           or
1606         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1607       )
1608     ) {
1609       $values{$col} = $self->_sequence_fetch(
1610         'NEXTVAL',
1611         ( $colinfo->{$col}{sequence} ||=
1612             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1613         ),
1614       );
1615     }
1616   }
1617
1618   \%values;
1619 }
1620
1621 sub insert {
1622   my ($self, $source, $to_insert) = @_;
1623
1624   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1625
1626   # fuse the values, but keep a separate list of prefetched_values so that
1627   # they can be fused once again with the final return
1628   $to_insert = { %$to_insert, %$prefetched_values };
1629
1630   my $col_infos = $source->columns_info;
1631   my %pcols = map { $_ => 1 } $source->primary_columns;
1632   my %retrieve_cols;
1633   for my $col ($source->columns) {
1634     # nothing to retrieve when explicit values are supplied
1635     next if (defined $to_insert->{$col} and ! (
1636       ref $to_insert->{$col} eq 'SCALAR'
1637         or
1638       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1639     ));
1640
1641     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1642     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1643       $pcols{$col}
1644         or
1645       $col_infos->{$col}{retrieve_on_insert}
1646     );
1647   };
1648
1649   my ($sqla_opts, @ir_container);
1650   if (%retrieve_cols and $self->_use_insert_returning) {
1651     $sqla_opts->{returning_container} = \@ir_container
1652       if $self->_use_insert_returning_bound;
1653
1654     $sqla_opts->{returning} = [
1655       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1656     ];
1657   }
1658
1659   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1660
1661   my %returned_cols = %$to_insert;
1662   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1663     @ir_container = try {
1664       local $SIG{__WARN__} = sub {};
1665       my @r = $sth->fetchrow_array;
1666       $sth->finish;
1667       @r;
1668     } unless @ir_container;
1669
1670     @returned_cols{@$retlist} = @ir_container if @ir_container;
1671   }
1672   else {
1673     # pull in PK if needed and then everything else
1674     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1675
1676       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1677         unless $self->can('last_insert_id');
1678
1679       my @pri_values = $self->last_insert_id($source, @missing_pri);
1680
1681       $self->throw_exception( "Can't get last insert id" )
1682         unless (@pri_values == @missing_pri);
1683
1684       @returned_cols{@missing_pri} = @pri_values;
1685       delete $retrieve_cols{$_} for @missing_pri;
1686     }
1687
1688     # if there is more left to pull
1689     if (%retrieve_cols) {
1690       $self->throw_exception(
1691         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1692       ) unless %pcols;
1693
1694       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1695
1696       my $cur = DBIx::Class::ResultSet->new($source, {
1697         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1698         select => \@left_to_fetch,
1699       })->cursor;
1700
1701       @returned_cols{@left_to_fetch} = $cur->next;
1702
1703       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1704         if scalar $cur->next;
1705     }
1706   }
1707
1708   return { %$prefetched_values, %returned_cols };
1709 }
1710
1711 sub insert_bulk {
1712   my ($self, $source, $cols, $data) = @_;
1713
1714   # FIXME - perhaps this is not even needed? does DBI stringify?
1715   #
1716   # forcibly stringify whatever is stringifiable
1717   for my $r (0 .. $#$data) {
1718     for my $c (0 .. $#{$data->[$r]}) {
1719       $data->[$r][$c] = "$data->[$r][$c]"
1720         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1721     }
1722   }
1723
1724   # check the data for consistency
1725   # report a sensible error on bad data
1726   #
1727   # also create a list of dynamic binds (ones that will be changing
1728   # for each row)
1729   my $dyn_bind_idx;
1730   for my $col_idx (0..$#$cols) {
1731
1732     # the first "row" is used as a point of reference
1733     my $reference_val = $data->[0][$col_idx];
1734     my $is_literal = ref $reference_val eq 'SCALAR';
1735     my $is_literal_bind = ( !$is_literal and (
1736       ref $reference_val eq 'REF'
1737         and
1738       ref $$reference_val eq 'ARRAY'
1739     ) );
1740
1741     $dyn_bind_idx->{$col_idx} = 1
1742       if (!$is_literal and !$is_literal_bind);
1743
1744     # use a closure for convenience (less to pass)
1745     my $bad_slice = sub {
1746       my ($msg, $slice_idx) = @_;
1747       $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1748         $msg,
1749         $cols->[$col_idx],
1750         do {
1751           require Data::Dumper::Concise;
1752           local $Data::Dumper::Maxdepth = 2;
1753           Data::Dumper::Concise::Dumper ({
1754             map { $cols->[$_] =>
1755               $data->[$slice_idx][$_]
1756             } (0 .. $#$cols)
1757           }),
1758         }
1759       );
1760     };
1761
1762     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
1763       my $val = $data->[$row_idx][$col_idx];
1764
1765       if ($is_literal) {
1766         if (ref $val ne 'SCALAR') {
1767           $bad_slice->(
1768             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
1769             $row_idx
1770           );
1771         }
1772         elsif ($$val ne $$reference_val) {
1773           $bad_slice->(
1774             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
1775             $row_idx
1776           );
1777         }
1778       }
1779       elsif ($is_literal_bind) {
1780         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
1781           $bad_slice->(
1782             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
1783             $row_idx
1784           );
1785         }
1786         elsif (${$val}->[0] ne ${$reference_val}->[0]) {
1787           $bad_slice->(
1788             "Inconsistent literal SQL-bind value (expecting \\['${$reference_val}->[0]', ... ])",
1789             $row_idx
1790           );
1791         }
1792       }
1793       elsif (ref $val) {
1794         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
1795           $bad_slice->("Literal SQL found where a plain bind value is expected", $row_idx);
1796         }
1797         else {
1798           $bad_slice->("$val reference found where bind expected", $row_idx);
1799         }
1800       }
1801     }
1802   }
1803
1804   # Get the sql with bind values interpolated where necessary. For dynamic
1805   # binds convert the values of the first row into a literal+bind combo, with
1806   # extra positional info in the bind attr hashref. This will allow us to match
1807   # the order properly, and is so contrived because a user-supplied literal
1808   # bind (or something else specific to a resultsource and/or storage driver)
1809   # can inject extra binds along the way, so one can't rely on "shift
1810   # positions" ordering at all. Also we can't just hand SQLA a set of some
1811   # known "values" (e.g. hashrefs that can be later matched up by address),
1812   # because we want to supply a real value on which perhaps e.g. datatype
1813   # checks will be performed
1814   my ($sql, $proto_bind) = $self->_prep_for_execute (
1815     'insert',
1816     $source,
1817     [ { map { $cols->[$_] => $dyn_bind_idx->{$_}
1818       ? \[ '?', [
1819           { dbic_colname => $cols->[$_], _bind_data_slice_idx => $_ }
1820             =>
1821           $data->[0][$_]
1822         ] ]
1823       : $data->[0][$_]
1824     } (0..$#$cols) } ],
1825   );
1826
1827   if (! @$proto_bind and keys %$dyn_bind_idx) {
1828     # if the bindlist is empty and we had some dynamic binds, this means the
1829     # storage ate them away (e.g. the NoBindVars component) and interpolated
1830     # them directly into the SQL. This obviosly can't be good for multi-inserts
1831     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1832   }
1833
1834   # neither _execute_array, nor _execute_inserts_with_no_binds are
1835   # atomic (even if _execute _array is a single call). Thus a safety
1836   # scope guard
1837   my $guard = $self->txn_scope_guard;
1838
1839   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
1840   my $sth = $self->_sth($sql);
1841   my $rv = do {
1842     if (@$proto_bind) {
1843       # proto bind contains the information on which pieces of $data to pull
1844       # $cols is passed in only for prettier error-reporting
1845       $self->_execute_array( $source, $sth, $proto_bind, $cols, $data );
1846     }
1847     else {
1848       # bind_param_array doesn't work if there are no binds
1849       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1850     }
1851   };
1852
1853   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
1854
1855   $guard->commit;
1856
1857   return (wantarray ? ($rv, $sth, @$proto_bind) : $rv);
1858 }
1859
1860 sub _execute_array {
1861   my ($self, $source, $sth, $proto_bind, $cols, $data, @extra) = @_;
1862
1863   ## This must be an arrayref, else nothing works!
1864   my $tuple_status = [];
1865
1866   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
1867
1868   # Bind the values by column slices
1869   for my $i (0 .. $#$proto_bind) {
1870     my $data_slice_idx = (
1871       ref $proto_bind->[$i][0] eq 'HASH'
1872         and
1873       exists $proto_bind->[$i][0]{_bind_data_slice_idx}
1874     ) ? $proto_bind->[$i][0]{_bind_data_slice_idx} : undef;
1875
1876     $sth->bind_param_array(
1877       $i+1, # DBI bind indexes are 1-based
1878       defined $data_slice_idx
1879         # either get a "column" of dynamic values, or just repeat the same
1880         # bind over and over
1881         ? [ map { $_->[$data_slice_idx] } @$data ]
1882         : [ ($proto_bind->[$i][1]) x @$data ]
1883       ,
1884       defined $bind_attrs->[$i] ? $bind_attrs->[$i] : (), # some DBDs throw up when given an undef
1885     );
1886   }
1887
1888   my ($rv, $err);
1889   try {
1890     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1891   }
1892   catch {
1893     $err = shift;
1894   };
1895
1896   # Not all DBDs are create equal. Some throw on error, some return
1897   # an undef $rv, and some set $sth->err - try whatever we can
1898   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1899     ! defined $err
1900       and
1901     ( !defined $rv or $sth->err )
1902   );
1903
1904   # Statement must finish even if there was an exception.
1905   try {
1906     $sth->finish
1907   }
1908   catch {
1909     $err = shift unless defined $err
1910   };
1911
1912   if (defined $err) {
1913     my $i = 0;
1914     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1915
1916     $self->throw_exception("Unexpected populate error: $err")
1917       if ($i > $#$tuple_status);
1918
1919     require Data::Dumper::Concise;
1920     $self->throw_exception(sprintf "execute_array() aborted with '%s' at populate slice:\n%s",
1921       ($tuple_status->[$i][1] || $err),
1922       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
1923     );
1924   }
1925
1926   return $rv;
1927 }
1928
1929 sub _dbh_execute_array {
1930   #my ($self, $sth, $tuple_status, @extra) = @_;
1931   return $_[1]->execute_array({ArrayTupleStatus => $_[2]});
1932 }
1933
1934 sub _dbh_execute_inserts_with_no_binds {
1935   my ($self, $sth, $count) = @_;
1936
1937   my $err;
1938   try {
1939     my $dbh = $self->_get_dbh;
1940     local $dbh->{RaiseError} = 1;
1941     local $dbh->{PrintError} = 0;
1942
1943     $sth->execute foreach 1..$count;
1944   }
1945   catch {
1946     $err = shift;
1947   };
1948
1949   # Make sure statement is finished even if there was an exception.
1950   try {
1951     $sth->finish
1952   }
1953   catch {
1954     $err = shift unless defined $err;
1955   };
1956
1957   $self->throw_exception($err) if defined $err;
1958
1959   return $count;
1960 }
1961
1962 sub update {
1963   #my ($self, $source, @args) = @_;
1964   shift->_execute('update', @_);
1965 }
1966
1967
1968 sub delete {
1969   #my ($self, $source, @args) = @_;
1970   shift->_execute('delete', @_);
1971 }
1972
1973 # We were sent here because the $rs contains a complex search
1974 # which will require a subquery to select the correct rows
1975 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1976 #
1977 # Generating a single PK column subquery is trivial and supported
1978 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1979 # Look at _multipk_update_delete()
1980 sub _subq_update_delete {
1981   my $self = shift;
1982   my ($rs, $op, $values) = @_;
1983
1984   my $rsrc = $rs->result_source;
1985
1986   # quick check if we got a sane rs on our hands
1987   my @pcols = $rsrc->_pri_cols;
1988
1989   my $sel = $rs->_resolved_attrs->{select};
1990   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1991
1992   if (
1993       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1994         ne
1995       join ("\x00", sort @$sel )
1996   ) {
1997     $self->throw_exception (
1998       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1999     );
2000   }
2001
2002   if (@pcols == 1) {
2003     return $self->$op (
2004       $rsrc,
2005       $op eq 'update' ? $values : (),
2006       { $pcols[0] => { -in => $rs->as_query } },
2007     );
2008   }
2009
2010   else {
2011     return $self->_multipk_update_delete (@_);
2012   }
2013 }
2014
2015 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2016 # resultset update/delete involving subqueries. So by default resort
2017 # to simple (and inefficient) delete_all style per-row opearations,
2018 # while allowing specific storages to override this with a faster
2019 # implementation.
2020 #
2021 sub _multipk_update_delete {
2022   return shift->_per_row_update_delete (@_);
2023 }
2024
2025 # This is the default loop used to delete/update rows for multi PK
2026 # resultsets, and used by mysql exclusively (because it can't do anything
2027 # else).
2028 #
2029 # We do not use $row->$op style queries, because resultset update/delete
2030 # is not expected to cascade (this is what delete_all/update_all is for).
2031 #
2032 # There should be no race conditions as the entire operation is rolled
2033 # in a transaction.
2034 #
2035 sub _per_row_update_delete {
2036   my $self = shift;
2037   my ($rs, $op, $values) = @_;
2038
2039   my $rsrc = $rs->result_source;
2040   my @pcols = $rsrc->_pri_cols;
2041
2042   my $guard = $self->txn_scope_guard;
2043
2044   # emulate the return value of $sth->execute for non-selects
2045   my $row_cnt = '0E0';
2046
2047   my $subrs_cur = $rs->cursor;
2048   my @all_pk = $subrs_cur->all;
2049   for my $pks ( @all_pk) {
2050
2051     my $cond;
2052     for my $i (0.. $#pcols) {
2053       $cond->{$pcols[$i]} = $pks->[$i];
2054     }
2055
2056     $self->$op (
2057       $rsrc,
2058       $op eq 'update' ? $values : (),
2059       $cond,
2060     );
2061
2062     $row_cnt++;
2063   }
2064
2065   $guard->commit;
2066
2067   return $row_cnt;
2068 }
2069
2070 sub _select {
2071   my $self = shift;
2072   $self->_execute($self->_select_args(@_));
2073 }
2074
2075 sub _select_args_to_query {
2076   my $self = shift;
2077
2078   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2079   #  = $self->_select_args($ident, $select, $cond, $attrs);
2080   my ($op, $ident, @args) =
2081     $self->_select_args(@_);
2082
2083   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2084   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $ident, \@args);
2085   $prepared_bind ||= [];
2086
2087   return wantarray
2088     ? ($sql, $prepared_bind)
2089     : \[ "($sql)", @$prepared_bind ]
2090   ;
2091 }
2092
2093 sub _select_args {
2094   my ($self, $ident, $select, $where, $attrs) = @_;
2095
2096   my $sql_maker = $self->sql_maker;
2097   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2098
2099   $attrs = {
2100     %$attrs,
2101     select => $select,
2102     from => $ident,
2103     where => $where,
2104     $rs_alias && $alias2source->{$rs_alias}
2105       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2106       : ()
2107     ,
2108   };
2109
2110   # Sanity check the attributes (SQLMaker does it too, but
2111   # in case of a software_limit we'll never reach there)
2112   if (defined $attrs->{offset}) {
2113     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2114       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2115   }
2116
2117   if (defined $attrs->{rows}) {
2118     $self->throw_exception("The rows attribute must be a positive integer if present")
2119       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2120   }
2121   elsif ($attrs->{offset}) {
2122     # MySQL actually recommends this approach.  I cringe.
2123     $attrs->{rows} = $sql_maker->__max_int;
2124   }
2125
2126   my @limit;
2127
2128   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2129   # storage, unless software limit was requested
2130   if (
2131     #limited has_many
2132     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2133        ||
2134     # grouped prefetch (to satisfy group_by == select)
2135     ( $attrs->{group_by}
2136         &&
2137       @{$attrs->{group_by}}
2138         &&
2139       $attrs->{_prefetch_selector_range}
2140     )
2141   ) {
2142     ($ident, $select, $where, $attrs)
2143       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2144   }
2145   elsif (! $attrs->{software_limit} ) {
2146     push @limit, (
2147       $attrs->{rows} || (),
2148       $attrs->{offset} || (),
2149     );
2150   }
2151
2152   # try to simplify the joinmap further (prune unreferenced type-single joins)
2153   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2154
2155 ###
2156   # This would be the point to deflate anything found in $where
2157   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2158   # expect a row object. And all we have is a resultsource (it is trivial
2159   # to extract deflator coderefs via $alias2source above).
2160   #
2161   # I don't see a way forward other than changing the way deflators are
2162   # invoked, and that's just bad...
2163 ###
2164
2165   return ('select', $ident, $select, $where, $attrs, @limit);
2166 }
2167
2168 # Returns a counting SELECT for a simple count
2169 # query. Abstracted so that a storage could override
2170 # this to { count => 'firstcol' } or whatever makes
2171 # sense as a performance optimization
2172 sub _count_select {
2173   #my ($self, $source, $rs_attrs) = @_;
2174   return { count => '*' };
2175 }
2176
2177 sub source_bind_attributes {
2178   shift->throw_exception(
2179     'source_bind_attributes() was never meant to be a callable public method - '
2180    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2181    .'solution can be provided'
2182    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2183   );
2184 }
2185
2186 =head2 select
2187
2188 =over 4
2189
2190 =item Arguments: $ident, $select, $condition, $attrs
2191
2192 =back
2193
2194 Handle a SQL select statement.
2195
2196 =cut
2197
2198 sub select {
2199   my $self = shift;
2200   my ($ident, $select, $condition, $attrs) = @_;
2201   return $self->cursor_class->new($self, \@_, $attrs);
2202 }
2203
2204 sub select_single {
2205   my $self = shift;
2206   my ($rv, $sth, @bind) = $self->_select(@_);
2207   my @row = $sth->fetchrow_array;
2208   my @nextrow = $sth->fetchrow_array if @row;
2209   if(@row && @nextrow) {
2210     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2211   }
2212   # Need to call finish() to work round broken DBDs
2213   $sth->finish();
2214   return @row;
2215 }
2216
2217 =head2 sql_limit_dialect
2218
2219 This is an accessor for the default SQL limit dialect used by a particular
2220 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2221 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2222 see L<DBIx::Class::SQLMaker::LimitDialects>.
2223
2224 =head2 sth
2225
2226 =over 4
2227
2228 =item Arguments: $sql
2229
2230 =back
2231
2232 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2233
2234 =cut
2235
2236 sub _dbh_sth {
2237   my ($self, $dbh, $sql) = @_;
2238
2239   # 3 is the if_active parameter which avoids active sth re-use
2240   my $sth = $self->disable_sth_caching
2241     ? $dbh->prepare($sql)
2242     : $dbh->prepare_cached($sql, {}, 3);
2243
2244   # XXX You would think RaiseError would make this impossible,
2245   #  but apparently that's not true :(
2246   $self->throw_exception(
2247     $dbh->errstr
2248       ||
2249     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2250             .'an exception and/or setting $dbh->errstr',
2251       length ($sql) > 20
2252         ? substr($sql, 0, 20) . '...'
2253         : $sql
2254       ,
2255       'DBD::' . $dbh->{Driver}{Name},
2256     )
2257   ) if !$sth;
2258
2259   $sth;
2260 }
2261
2262 sub sth {
2263   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2264   shift->_sth(@_);
2265 }
2266
2267 sub _sth {
2268   my ($self, $sql) = @_;
2269   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2270 }
2271
2272 sub _dbh_columns_info_for {
2273   my ($self, $dbh, $table) = @_;
2274
2275   if ($dbh->can('column_info')) {
2276     my %result;
2277     my $caught;
2278     try {
2279       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2280       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2281       $sth->execute();
2282       while ( my $info = $sth->fetchrow_hashref() ){
2283         my %column_info;
2284         $column_info{data_type}   = $info->{TYPE_NAME};
2285         $column_info{size}      = $info->{COLUMN_SIZE};
2286         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2287         $column_info{default_value} = $info->{COLUMN_DEF};
2288         my $col_name = $info->{COLUMN_NAME};
2289         $col_name =~ s/^\"(.*)\"$/$1/;
2290
2291         $result{$col_name} = \%column_info;
2292       }
2293     } catch {
2294       $caught = 1;
2295     };
2296     return \%result if !$caught && scalar keys %result;
2297   }
2298
2299   my %result;
2300   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2301   $sth->execute;
2302   my @columns = @{$sth->{NAME_lc}};
2303   for my $i ( 0 .. $#columns ){
2304     my %column_info;
2305     $column_info{data_type} = $sth->{TYPE}->[$i];
2306     $column_info{size} = $sth->{PRECISION}->[$i];
2307     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2308
2309     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2310       $column_info{data_type} = $1;
2311       $column_info{size}    = $2;
2312     }
2313
2314     $result{$columns[$i]} = \%column_info;
2315   }
2316   $sth->finish;
2317
2318   foreach my $col (keys %result) {
2319     my $colinfo = $result{$col};
2320     my $type_num = $colinfo->{data_type};
2321     my $type_name;
2322     if(defined $type_num && $dbh->can('type_info')) {
2323       my $type_info = $dbh->type_info($type_num);
2324       $type_name = $type_info->{TYPE_NAME} if $type_info;
2325       $colinfo->{data_type} = $type_name if $type_name;
2326     }
2327   }
2328
2329   return \%result;
2330 }
2331
2332 sub columns_info_for {
2333   my ($self, $table) = @_;
2334   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2335 }
2336
2337 =head2 last_insert_id
2338
2339 Return the row id of the last insert.
2340
2341 =cut
2342
2343 sub _dbh_last_insert_id {
2344     my ($self, $dbh, $source, $col) = @_;
2345
2346     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2347
2348     return $id if defined $id;
2349
2350     my $class = ref $self;
2351     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2352 }
2353
2354 sub last_insert_id {
2355   my $self = shift;
2356   $self->_dbh_last_insert_id ($self->_dbh, @_);
2357 }
2358
2359 =head2 _native_data_type
2360
2361 =over 4
2362
2363 =item Arguments: $type_name
2364
2365 =back
2366
2367 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2368 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2369 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2370
2371 The default implementation returns C<undef>, implement in your Storage driver if
2372 you need this functionality.
2373
2374 Should map types from other databases to the native RDBMS type, for example
2375 C<VARCHAR2> to C<VARCHAR>.
2376
2377 Types with modifiers should map to the underlying data type. For example,
2378 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2379
2380 Composite types should map to the container type, for example
2381 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2382
2383 =cut
2384
2385 sub _native_data_type {
2386   #my ($self, $data_type) = @_;
2387   return undef
2388 }
2389
2390 # Check if placeholders are supported at all
2391 sub _determine_supports_placeholders {
2392   my $self = shift;
2393   my $dbh  = $self->_get_dbh;
2394
2395   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2396   # but it is inaccurate more often than not
2397   return try {
2398     local $dbh->{PrintError} = 0;
2399     local $dbh->{RaiseError} = 1;
2400     $dbh->do('select ?', {}, 1);
2401     1;
2402   }
2403   catch {
2404     0;
2405   };
2406 }
2407
2408 # Check if placeholders bound to non-string types throw exceptions
2409 #
2410 sub _determine_supports_typeless_placeholders {
2411   my $self = shift;
2412   my $dbh  = $self->_get_dbh;
2413
2414   return try {
2415     local $dbh->{PrintError} = 0;
2416     local $dbh->{RaiseError} = 1;
2417     # this specifically tests a bind that is NOT a string
2418     $dbh->do('select 1 where 1 = ?', {}, 1);
2419     1;
2420   }
2421   catch {
2422     0;
2423   };
2424 }
2425
2426 =head2 sqlt_type
2427
2428 Returns the database driver name.
2429
2430 =cut
2431
2432 sub sqlt_type {
2433   shift->_get_dbh->{Driver}->{Name};
2434 }
2435
2436 =head2 bind_attribute_by_data_type
2437
2438 Given a datatype from column info, returns a database specific bind
2439 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2440 let the database planner just handle it.
2441
2442 Generally only needed for special case column types, like bytea in postgres.
2443
2444 =cut
2445
2446 sub bind_attribute_by_data_type {
2447     return;
2448 }
2449
2450 =head2 is_datatype_numeric
2451
2452 Given a datatype from column_info, returns a boolean value indicating if
2453 the current RDBMS considers it a numeric value. This controls how
2454 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2455 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2456 be performed instead of the usual C<eq>.
2457
2458 =cut
2459
2460 sub is_datatype_numeric {
2461   #my ($self, $dt) = @_;
2462
2463   return 0 unless $_[1];
2464
2465   $_[1] =~ /^ (?:
2466     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2467   ) $/ix;
2468 }
2469
2470
2471 =head2 create_ddl_dir
2472
2473 =over 4
2474
2475 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2476
2477 =back
2478
2479 Creates a SQL file based on the Schema, for each of the specified
2480 database engines in C<\@databases> in the given directory.
2481 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2482
2483 Given a previous version number, this will also create a file containing
2484 the ALTER TABLE statements to transform the previous schema into the
2485 current one. Note that these statements may contain C<DROP TABLE> or
2486 C<DROP COLUMN> statements that can potentially destroy data.
2487
2488 The file names are created using the C<ddl_filename> method below, please
2489 override this method in your schema if you would like a different file
2490 name format. For the ALTER file, the same format is used, replacing
2491 $version in the name with "$preversion-$version".
2492
2493 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2494 The most common value for this would be C<< { add_drop_table => 1 } >>
2495 to have the SQL produced include a C<DROP TABLE> statement for each table
2496 created. For quoting purposes supply C<quote_table_names> and
2497 C<quote_field_names>.
2498
2499 If no arguments are passed, then the following default values are assumed:
2500
2501 =over 4
2502
2503 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2504
2505 =item version    - $schema->schema_version
2506
2507 =item directory  - './'
2508
2509 =item preversion - <none>
2510
2511 =back
2512
2513 By default, C<\%sqlt_args> will have
2514
2515  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2516
2517 merged with the hash passed in. To disable any of those features, pass in a
2518 hashref like the following
2519
2520  { ignore_constraint_names => 0, # ... other options }
2521
2522
2523 WARNING: You are strongly advised to check all SQL files created, before applying
2524 them.
2525
2526 =cut
2527
2528 sub create_ddl_dir {
2529   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2530
2531   unless ($dir) {
2532     carp "No directory given, using ./\n";
2533     $dir = './';
2534   } else {
2535       -d $dir
2536         or
2537       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2538         or
2539       $self->throw_exception(
2540         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2541       );
2542   }
2543
2544   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2545
2546   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2547   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2548
2549   my $schema_version = $schema->schema_version || '1.x';
2550   $version ||= $schema_version;
2551
2552   $sqltargs = {
2553     add_drop_table => 1,
2554     ignore_constraint_names => 1,
2555     ignore_index_names => 1,
2556     %{$sqltargs || {}}
2557   };
2558
2559   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2560     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2561   }
2562
2563   my $sqlt = SQL::Translator->new( $sqltargs );
2564
2565   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2566   my $sqlt_schema = $sqlt->translate({ data => $schema })
2567     or $self->throw_exception ($sqlt->error);
2568
2569   foreach my $db (@$databases) {
2570     $sqlt->reset();
2571     $sqlt->{schema} = $sqlt_schema;
2572     $sqlt->producer($db);
2573
2574     my $file;
2575     my $filename = $schema->ddl_filename($db, $version, $dir);
2576     if (-e $filename && ($version eq $schema_version )) {
2577       # if we are dumping the current version, overwrite the DDL
2578       carp "Overwriting existing DDL file - $filename";
2579       unlink($filename);
2580     }
2581
2582     my $output = $sqlt->translate;
2583     if(!$output) {
2584       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2585       next;
2586     }
2587     if(!open($file, ">$filename")) {
2588       $self->throw_exception("Can't open $filename for writing ($!)");
2589       next;
2590     }
2591     print $file $output;
2592     close($file);
2593
2594     next unless ($preversion);
2595
2596     require SQL::Translator::Diff;
2597
2598     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2599     if(!-e $prefilename) {
2600       carp("No previous schema file found ($prefilename)");
2601       next;
2602     }
2603
2604     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2605     if(-e $difffile) {
2606       carp("Overwriting existing diff file - $difffile");
2607       unlink($difffile);
2608     }
2609
2610     my $source_schema;
2611     {
2612       my $t = SQL::Translator->new($sqltargs);
2613       $t->debug( 0 );
2614       $t->trace( 0 );
2615
2616       $t->parser( $db )
2617         or $self->throw_exception ($t->error);
2618
2619       my $out = $t->translate( $prefilename )
2620         or $self->throw_exception ($t->error);
2621
2622       $source_schema = $t->schema;
2623
2624       $source_schema->name( $prefilename )
2625         unless ( $source_schema->name );
2626     }
2627
2628     # The "new" style of producers have sane normalization and can support
2629     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2630     # And we have to diff parsed SQL against parsed SQL.
2631     my $dest_schema = $sqlt_schema;
2632
2633     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2634       my $t = SQL::Translator->new($sqltargs);
2635       $t->debug( 0 );
2636       $t->trace( 0 );
2637
2638       $t->parser( $db )
2639         or $self->throw_exception ($t->error);
2640
2641       my $out = $t->translate( $filename )
2642         or $self->throw_exception ($t->error);
2643
2644       $dest_schema = $t->schema;
2645
2646       $dest_schema->name( $filename )
2647         unless $dest_schema->name;
2648     }
2649
2650     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2651                                                   $dest_schema,   $db,
2652                                                   $sqltargs
2653                                                  );
2654     if(!open $file, ">$difffile") {
2655       $self->throw_exception("Can't write to $difffile ($!)");
2656       next;
2657     }
2658     print $file $diff;
2659     close($file);
2660   }
2661 }
2662
2663 =head2 deployment_statements
2664
2665 =over 4
2666
2667 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2668
2669 =back
2670
2671 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2672
2673 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2674 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2675
2676 C<$directory> is used to return statements from files in a previously created
2677 L</create_ddl_dir> directory and is optional. The filenames are constructed
2678 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2679
2680 If no C<$directory> is specified then the statements are constructed on the
2681 fly using L<SQL::Translator> and C<$version> is ignored.
2682
2683 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2684
2685 =cut
2686
2687 sub deployment_statements {
2688   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2689   $type ||= $self->sqlt_type;
2690   $version ||= $schema->schema_version || '1.x';
2691   $dir ||= './';
2692   my $filename = $schema->ddl_filename($type, $version, $dir);
2693   if(-f $filename)
2694   {
2695       # FIXME replace this block when a proper sane sql parser is available
2696       my $file;
2697       open($file, "<$filename")
2698         or $self->throw_exception("Can't open $filename ($!)");
2699       my @rows = <$file>;
2700       close($file);
2701       return join('', @rows);
2702   }
2703
2704   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2705     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2706   }
2707
2708   # sources needs to be a parser arg, but for simplicty allow at top level
2709   # coming in
2710   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2711       if exists $sqltargs->{sources};
2712
2713   my $tr = SQL::Translator->new(
2714     producer => "SQL::Translator::Producer::${type}",
2715     %$sqltargs,
2716     parser => 'SQL::Translator::Parser::DBIx::Class',
2717     data => $schema,
2718   );
2719
2720   my @ret;
2721   if (wantarray) {
2722     @ret = $tr->translate;
2723   }
2724   else {
2725     $ret[0] = $tr->translate;
2726   }
2727
2728   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2729     unless (@ret && defined $ret[0]);
2730
2731   return wantarray ? @ret : $ret[0];
2732 }
2733
2734 # FIXME deploy() currently does not accurately report sql errors
2735 # Will always return true while errors are warned
2736 sub deploy {
2737   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2738   my $deploy = sub {
2739     my $line = shift;
2740     return if(!$line);
2741     return if($line =~ /^--/);
2742     # next if($line =~ /^DROP/m);
2743     return if($line =~ /^BEGIN TRANSACTION/m);
2744     return if($line =~ /^COMMIT/m);
2745     return if $line =~ /^\s+$/; # skip whitespace only
2746     $self->_query_start($line);
2747     try {
2748       # do a dbh_do cycle here, as we need some error checking in
2749       # place (even though we will ignore errors)
2750       $self->dbh_do (sub { $_[1]->do($line) });
2751     } catch {
2752       carp qq{$_ (running "${line}")};
2753     };
2754     $self->_query_end($line);
2755   };
2756   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2757   if (@statements > 1) {
2758     foreach my $statement (@statements) {
2759       $deploy->( $statement );
2760     }
2761   }
2762   elsif (@statements == 1) {
2763     # split on single line comments and end of statements
2764     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2765       $deploy->( $line );
2766     }
2767   }
2768 }
2769
2770 =head2 datetime_parser
2771
2772 Returns the datetime parser class
2773
2774 =cut
2775
2776 sub datetime_parser {
2777   my $self = shift;
2778   return $self->{datetime_parser} ||= do {
2779     $self->build_datetime_parser(@_);
2780   };
2781 }
2782
2783 =head2 datetime_parser_type
2784
2785 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2786
2787 =head2 build_datetime_parser
2788
2789 See L</datetime_parser>
2790
2791 =cut
2792
2793 sub build_datetime_parser {
2794   my $self = shift;
2795   my $type = $self->datetime_parser_type(@_);
2796   return $type;
2797 }
2798
2799
2800 =head2 is_replicating
2801
2802 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2803 replicate from a master database.  Default is undef, which is the result
2804 returned by databases that don't support replication.
2805
2806 =cut
2807
2808 sub is_replicating {
2809     return;
2810
2811 }
2812
2813 =head2 lag_behind_master
2814
2815 Returns a number that represents a certain amount of lag behind a master db
2816 when a given storage is replicating.  The number is database dependent, but
2817 starts at zero and increases with the amount of lag. Default in undef
2818
2819 =cut
2820
2821 sub lag_behind_master {
2822     return;
2823 }
2824
2825 =head2 relname_to_table_alias
2826
2827 =over 4
2828
2829 =item Arguments: $relname, $join_count
2830
2831 =back
2832
2833 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2834 queries.
2835
2836 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2837 way these aliases are named.
2838
2839 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2840 otherwise C<"$relname">.
2841
2842 =cut
2843
2844 sub relname_to_table_alias {
2845   my ($self, $relname, $join_count) = @_;
2846
2847   my $alias = ($join_count && $join_count > 1 ?
2848     join('_', $relname, $join_count) : $relname);
2849
2850   return $alias;
2851 }
2852
2853 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2854 # version and it may be necessary to amend or override it for a specific storage
2855 # if such binds are necessary.
2856 sub _max_column_bytesize {
2857   my ($self, $attr) = @_;
2858
2859   my $max_size;
2860
2861   if ($attr->{sqlt_datatype}) {
2862     my $data_type = lc($attr->{sqlt_datatype});
2863
2864     if ($attr->{sqlt_size}) {
2865
2866       # String/sized-binary types
2867       if ($data_type =~ /^(?:
2868           l? (?:var)? char(?:acter)? (?:\s*varying)?
2869             |
2870           (?:var)? binary (?:\s*varying)? 
2871             |
2872           raw
2873         )\b/x
2874       ) {
2875         $max_size = $attr->{sqlt_size};
2876       }
2877       # Other charset/unicode types, assume scale of 4
2878       elsif ($data_type =~ /^(?:
2879           national \s* character (?:\s*varying)?
2880             |
2881           nchar
2882             |
2883           univarchar
2884             |
2885           nvarchar
2886         )\b/x
2887       ) {
2888         $max_size = $attr->{sqlt_size} * 4;
2889       }
2890     }
2891
2892     if (!$max_size and !$self->_is_lob_type($data_type)) {
2893       $max_size = 100 # for all other (numeric?) datatypes
2894     }
2895   }
2896
2897   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
2898 }
2899
2900 # Determine if a data_type is some type of BLOB
2901 sub _is_lob_type {
2902   my ($self, $data_type) = @_;
2903   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2904     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2905                                   |varchar|character\s*varying|nvarchar
2906                                   |national\s*character\s*varying))?\z/xi);
2907 }
2908
2909 sub _is_binary_lob_type {
2910   my ($self, $data_type) = @_;
2911   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2912     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2913 }
2914
2915 sub _is_text_lob_type {
2916   my ($self, $data_type) = @_;
2917   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2918     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2919                         |national\s*character\s*varying))\z/xi);
2920 }
2921
2922 1;
2923
2924 =head1 USAGE NOTES
2925
2926 =head2 DBIx::Class and AutoCommit
2927
2928 DBIx::Class can do some wonderful magic with handling exceptions,
2929 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2930 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
2931 transaction support.
2932
2933 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2934 in an assumed transaction between commits, and you're telling us you'd
2935 like to manage that manually.  A lot of the magic protections offered by
2936 this module will go away.  We can't protect you from exceptions due to database
2937 disconnects because we don't know anything about how to restart your
2938 transactions.  You're on your own for handling all sorts of exceptional
2939 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2940 be with raw DBI.
2941
2942
2943 =head1 AUTHORS
2944
2945 Matt S. Trout <mst@shadowcatsystems.co.uk>
2946
2947 Andy Grundman <andy@hybridized.org>
2948
2949 =head1 LICENSE
2950
2951 You may distribute this code under the same terms as Perl itself.
2952
2953 =cut