Fix long standing issue with resultset growth on repeated execution (GHPR#29)
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use Scalar::Util qw/refaddr weaken reftype blessed/;
12 use List::Util qw/first/;
13 use Sub::Name 'subname';
14 use Context::Preserve 'preserve_context';
15 use Try::Tiny;
16 use overload ();
17 use Data::Compare (); # no imports!!! guard against insane architecture
18 use namespace::clean;
19
20 # default cursor class, overridable in connect_info attributes
21 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
22
23 __PACKAGE__->mk_group_accessors('inherited' => qw/
24   sql_limit_dialect sql_quote_char sql_name_sep
25 /);
26
27 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
28
29 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
30 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
31
32 __PACKAGE__->sql_name_sep('.');
33
34 __PACKAGE__->mk_group_accessors('simple' => qw/
35   _connect_info _dbic_connect_attributes _driver_determined
36   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
37   _perform_autoinc_retrieval _autoinc_supplied_for_op
38 /);
39
40 # the values for these accessors are picked out (and deleted) from
41 # the attribute hashref passed to connect_info
42 my @storage_options = qw/
43   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
44   disable_sth_caching unsafe auto_savepoint
45 /;
46 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
47
48
49 # capability definitions, using a 2-tiered accessor system
50 # The rationale is:
51 #
52 # A driver/user may define _use_X, which blindly without any checks says:
53 # "(do not) use this capability", (use_dbms_capability is an "inherited"
54 # type accessor)
55 #
56 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
57 # accessor, which in turn calls _determine_supports_X, and stores the return
58 # in a special slot on the storage object, which is wiped every time a $dbh
59 # reconnection takes place (it is not guaranteed that upon reconnection we
60 # will get the same rdbms version). _determine_supports_X does not need to
61 # exist on a driver, as we ->can for it before calling.
62
63 my @capabilities = (qw/
64   insert_returning
65   insert_returning_bound
66
67   multicolumn_in
68
69   placeholders
70   typeless_placeholders
71
72   join_optimizer
73 /);
74 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
75 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
76
77 # on by default, not strictly a capability (pending rewrite)
78 __PACKAGE__->_use_join_optimizer (1);
79 sub _determine_supports_join_optimizer { 1 };
80
81 # Each of these methods need _determine_driver called before itself
82 # in order to function reliably. This is a purely DRY optimization
83 #
84 # get_(use)_dbms_capability need to be called on the correct Storage
85 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
86 # _determine_supports_X which obv. needs a correct driver as well
87 my @rdbms_specific_methods = qw/
88   sqlt_type
89   deployment_statements
90
91   sql_maker
92   cursor_class
93
94   build_datetime_parser
95   datetime_parser_type
96
97   txn_begin
98
99   insert
100   insert_bulk
101   update
102   delete
103   select
104   select_single
105
106   with_deferred_fk_checks
107
108   get_use_dbms_capability
109   get_dbms_capability
110
111   _server_info
112   _get_server_version
113 /;
114
115 for my $meth (@rdbms_specific_methods) {
116
117   my $orig = __PACKAGE__->can ($meth)
118     or die "$meth is not a ::Storage::DBI method!";
119
120   no strict qw/refs/;
121   no warnings qw/redefine/;
122   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
123     if (
124       # only fire when invoked on an instance, a valid class-based invocation
125       # would e.g. be setting a default for an inherited accessor
126       ref $_[0]
127         and
128       ! $_[0]->{_driver_determined}
129         and
130       ! $_[0]->{_in_determine_driver}
131         and
132       # Only try to determine stuff if we have *something* that either is or can
133       # provide a DSN. Allows for bare $schema's generated with a plain ->connect()
134       # to still be marginally useful
135       $_[0]->_dbi_connect_info->[0]
136     ) {
137       $_[0]->_determine_driver;
138
139       # This for some reason crashes and burns on perl 5.8.1
140       # IFF the method ends up throwing an exception
141       #goto $_[0]->can ($meth);
142
143       my $cref = $_[0]->can ($meth);
144       goto $cref;
145     }
146
147     goto $orig;
148   };
149 }
150
151 =head1 NAME
152
153 DBIx::Class::Storage::DBI - DBI storage handler
154
155 =head1 SYNOPSIS
156
157   my $schema = MySchema->connect('dbi:SQLite:my.db');
158
159   $schema->storage->debug(1);
160
161   my @stuff = $schema->storage->dbh_do(
162     sub {
163       my ($storage, $dbh, @args) = @_;
164       $dbh->do("DROP TABLE authors");
165     },
166     @column_list
167   );
168
169   $schema->resultset('Book')->search({
170      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
171   });
172
173 =head1 DESCRIPTION
174
175 This class represents the connection to an RDBMS via L<DBI>.  See
176 L<DBIx::Class::Storage> for general information.  This pod only
177 documents DBI-specific methods and behaviors.
178
179 =head1 METHODS
180
181 =cut
182
183 sub new {
184   my $new = shift->next::method(@_);
185
186   $new->_sql_maker_opts({});
187   $new->_dbh_details({});
188   $new->{_in_do_block} = 0;
189
190   # read below to see what this does
191   $new->_arm_global_destructor;
192
193   $new;
194 }
195
196 # This is hack to work around perl shooting stuff in random
197 # order on exit(). If we do not walk the remaining storage
198 # objects in an END block, there is a *small but real* chance
199 # of a fork()ed child to kill the parent's shared DBI handle,
200 # *before perl reaches the DESTROY in this package*
201 # Yes, it is ugly and effective.
202 # Additionally this registry is used by the CLONE method to
203 # make sure no handles are shared between threads
204 {
205   my %seek_and_destroy;
206
207   sub _arm_global_destructor {
208
209     # quick "garbage collection" pass - prevents the registry
210     # from slowly growing with a bunch of undef-valued keys
211     defined $seek_and_destroy{$_} or delete $seek_and_destroy{$_}
212       for keys %seek_and_destroy;
213
214     weaken (
215       $seek_and_destroy{ refaddr($_[0]) } = $_[0]
216     );
217   }
218
219   END {
220     local $?; # just in case the DBI destructor changes it somehow
221
222     # destroy just the object if not native to this process
223     $_->_verify_pid for (grep
224       { defined $_ }
225       values %seek_and_destroy
226     );
227   }
228
229   sub CLONE {
230     # As per DBI's recommendation, DBIC disconnects all handles as
231     # soon as possible (DBIC will reconnect only on demand from within
232     # the thread)
233     my @instances = grep { defined $_ } values %seek_and_destroy;
234     %seek_and_destroy = ();
235
236     for (@instances) {
237       $_->_dbh(undef);
238
239       $_->transaction_depth(0);
240       $_->savepoints([]);
241
242       # properly renumber existing refs
243       $_->_arm_global_destructor
244     }
245   }
246 }
247
248 sub DESTROY {
249   my $self = shift;
250
251   # some databases spew warnings on implicit disconnect
252   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
253   local $SIG{__WARN__} = sub {};
254   $self->_dbh(undef);
255
256   # this op is necessary, since the very last perl runtime statement
257   # triggers a global destruction shootout, and the $SIG localization
258   # may very well be destroyed before perl actually gets to do the
259   # $dbh undef
260   1;
261 }
262
263 # handle pid changes correctly - do not destroy parent's connection
264 sub _verify_pid {
265   my $self = shift;
266
267   my $pid = $self->_conn_pid;
268   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
269     $dbh->{InactiveDestroy} = 1;
270     $self->_dbh(undef);
271     $self->transaction_depth(0);
272     $self->savepoints([]);
273   }
274
275   return;
276 }
277
278 =head2 connect_info
279
280 This method is normally called by L<DBIx::Class::Schema/connection>, which
281 encapsulates its argument list in an arrayref before passing them here.
282
283 The argument list may contain:
284
285 =over
286
287 =item *
288
289 The same 4-element argument set one would normally pass to
290 L<DBI/connect>, optionally followed by
291 L<extra attributes|/DBIx::Class specific connection attributes>
292 recognized by DBIx::Class:
293
294   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
295
296 =item *
297
298 A single code reference which returns a connected
299 L<DBI database handle|DBI/connect> optionally followed by
300 L<extra attributes|/DBIx::Class specific connection attributes> recognized
301 by DBIx::Class:
302
303   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
304
305 =item *
306
307 A single hashref with all the attributes and the dsn/user/password
308 mixed together:
309
310   $connect_info_args = [{
311     dsn => $dsn,
312     user => $user,
313     password => $pass,
314     %dbi_attributes,
315     %extra_attributes,
316   }];
317
318   $connect_info_args = [{
319     dbh_maker => sub { DBI->connect (...) },
320     %dbi_attributes,
321     %extra_attributes,
322   }];
323
324 This is particularly useful for L<Catalyst> based applications, allowing the
325 following config (L<Config::General> style):
326
327   <Model::DB>
328     schema_class   App::DB
329     <connect_info>
330       dsn          dbi:mysql:database=test
331       user         testuser
332       password     TestPass
333       AutoCommit   1
334     </connect_info>
335   </Model::DB>
336
337 The C<dsn>/C<user>/C<password> combination can be substituted by the
338 C<dbh_maker> key whose value is a coderef that returns a connected
339 L<DBI database handle|DBI/connect>
340
341 =back
342
343 Please note that the L<DBI> docs recommend that you always explicitly
344 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
345 recommends that it be set to I<1>, and that you perform transactions
346 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
347 to I<1> if you do not do explicitly set it to zero.  This is the default
348 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
349
350 =head3 DBIx::Class specific connection attributes
351
352 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
353 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
354 the following connection options. These options can be mixed in with your other
355 L<DBI> connection attributes, or placed in a separate hashref
356 (C<\%extra_attributes>) as shown above.
357
358 Every time C<connect_info> is invoked, any previous settings for
359 these options will be cleared before setting the new ones, regardless of
360 whether any options are specified in the new C<connect_info>.
361
362
363 =over
364
365 =item on_connect_do
366
367 Specifies things to do immediately after connecting or re-connecting to
368 the database.  Its value may contain:
369
370 =over
371
372 =item a scalar
373
374 This contains one SQL statement to execute.
375
376 =item an array reference
377
378 This contains SQL statements to execute in order.  Each element contains
379 a string or a code reference that returns a string.
380
381 =item a code reference
382
383 This contains some code to execute.  Unlike code references within an
384 array reference, its return value is ignored.
385
386 =back
387
388 =item on_disconnect_do
389
390 Takes arguments in the same form as L</on_connect_do> and executes them
391 immediately before disconnecting from the database.
392
393 Note, this only runs if you explicitly call L</disconnect> on the
394 storage object.
395
396 =item on_connect_call
397
398 A more generalized form of L</on_connect_do> that calls the specified
399 C<connect_call_METHOD> methods in your storage driver.
400
401   on_connect_do => 'select 1'
402
403 is equivalent to:
404
405   on_connect_call => [ [ do_sql => 'select 1' ] ]
406
407 Its values may contain:
408
409 =over
410
411 =item a scalar
412
413 Will call the C<connect_call_METHOD> method.
414
415 =item a code reference
416
417 Will execute C<< $code->($storage) >>
418
419 =item an array reference
420
421 Each value can be a method name or code reference.
422
423 =item an array of arrays
424
425 For each array, the first item is taken to be the C<connect_call_> method name
426 or code reference, and the rest are parameters to it.
427
428 =back
429
430 Some predefined storage methods you may use:
431
432 =over
433
434 =item do_sql
435
436 Executes a SQL string or a code reference that returns a SQL string. This is
437 what L</on_connect_do> and L</on_disconnect_do> use.
438
439 It can take:
440
441 =over
442
443 =item a scalar
444
445 Will execute the scalar as SQL.
446
447 =item an arrayref
448
449 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
450 attributes hashref and bind values.
451
452 =item a code reference
453
454 Will execute C<< $code->($storage) >> and execute the return array refs as
455 above.
456
457 =back
458
459 =item datetime_setup
460
461 Execute any statements necessary to initialize the database session to return
462 and accept datetime/timestamp values used with
463 L<DBIx::Class::InflateColumn::DateTime>.
464
465 Only necessary for some databases, see your specific storage driver for
466 implementation details.
467
468 =back
469
470 =item on_disconnect_call
471
472 Takes arguments in the same form as L</on_connect_call> and executes them
473 immediately before disconnecting from the database.
474
475 Calls the C<disconnect_call_METHOD> methods as opposed to the
476 C<connect_call_METHOD> methods called by L</on_connect_call>.
477
478 Note, this only runs if you explicitly call L</disconnect> on the
479 storage object.
480
481 =item disable_sth_caching
482
483 If set to a true value, this option will disable the caching of
484 statement handles via L<DBI/prepare_cached>.
485
486 =item limit_dialect
487
488 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
489 default L</sql_limit_dialect> setting of the storage (if any). For a list
490 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
491
492 =item quote_names
493
494 When true automatically sets L</quote_char> and L</name_sep> to the characters
495 appropriate for your particular RDBMS. This option is preferred over specifying
496 L</quote_char> directly.
497
498 =item quote_char
499
500 Specifies what characters to use to quote table and column names.
501
502 C<quote_char> expects either a single character, in which case is it
503 is placed on either side of the table/column name, or an arrayref of length
504 2 in which case the table/column name is placed between the elements.
505
506 For example under MySQL you should use C<< quote_char => '`' >>, and for
507 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
508
509 =item name_sep
510
511 This parameter is only useful in conjunction with C<quote_char>, and is used to
512 specify the character that separates elements (schemas, tables, columns) from
513 each other. If unspecified it defaults to the most commonly used C<.>.
514
515 =item unsafe
516
517 This Storage driver normally installs its own C<HandleError>, sets
518 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
519 all database handles, including those supplied by a coderef.  It does this
520 so that it can have consistent and useful error behavior.
521
522 If you set this option to a true value, Storage will not do its usual
523 modifications to the database handle's attributes, and instead relies on
524 the settings in your connect_info DBI options (or the values you set in
525 your connection coderef, in the case that you are connecting via coderef).
526
527 Note that your custom settings can cause Storage to malfunction,
528 especially if you set a C<HandleError> handler that suppresses exceptions
529 and/or disable C<RaiseError>.
530
531 =item auto_savepoint
532
533 If this option is true, L<DBIx::Class> will use savepoints when nesting
534 transactions, making it possible to recover from failure in the inner
535 transaction without having to abort all outer transactions.
536
537 =item cursor_class
538
539 Use this argument to supply a cursor class other than the default
540 L<DBIx::Class::Storage::DBI::Cursor>.
541
542 =back
543
544 Some real-life examples of arguments to L</connect_info> and
545 L<DBIx::Class::Schema/connect>
546
547   # Simple SQLite connection
548   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
549
550   # Connect via subref
551   ->connect_info([ sub { DBI->connect(...) } ]);
552
553   # Connect via subref in hashref
554   ->connect_info([{
555     dbh_maker => sub { DBI->connect(...) },
556     on_connect_do => 'alter session ...',
557   }]);
558
559   # A bit more complicated
560   ->connect_info(
561     [
562       'dbi:Pg:dbname=foo',
563       'postgres',
564       'my_pg_password',
565       { AutoCommit => 1 },
566       { quote_char => q{"} },
567     ]
568   );
569
570   # Equivalent to the previous example
571   ->connect_info(
572     [
573       'dbi:Pg:dbname=foo',
574       'postgres',
575       'my_pg_password',
576       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
577     ]
578   );
579
580   # Same, but with hashref as argument
581   # See parse_connect_info for explanation
582   ->connect_info(
583     [{
584       dsn         => 'dbi:Pg:dbname=foo',
585       user        => 'postgres',
586       password    => 'my_pg_password',
587       AutoCommit  => 1,
588       quote_char  => q{"},
589       name_sep    => q{.},
590     }]
591   );
592
593   # Subref + DBIx::Class-specific connection options
594   ->connect_info(
595     [
596       sub { DBI->connect(...) },
597       {
598           quote_char => q{`},
599           name_sep => q{@},
600           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
601           disable_sth_caching => 1,
602       },
603     ]
604   );
605
606
607
608 =cut
609
610 sub connect_info {
611   my ($self, $info) = @_;
612
613   return $self->_connect_info if !$info;
614
615   $self->_connect_info($info); # copy for _connect_info
616
617   $info = $self->_normalize_connect_info($info)
618     if ref $info eq 'ARRAY';
619
620   my %attrs = (
621     %{ $self->_default_dbi_connect_attributes || {} },
622     %{ $info->{attributes} || {} },
623   );
624
625   my @args = @{ $info->{arguments} };
626
627   if (keys %attrs and ref $args[0] ne 'CODE') {
628     carp_unique (
629         'You provided explicit AutoCommit => 0 in your connection_info. '
630       . 'This is almost universally a bad idea (see the footnotes of '
631       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
632       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
633       . 'this warning.'
634     ) if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
635
636     push @args, \%attrs if keys %attrs;
637   }
638
639   # this is the authoritative "always an arrayref" thing fed to DBI->connect
640   # OR a single-element coderef-based $dbh factory
641   $self->_dbi_connect_info(\@args);
642
643   # extract the individual storage options
644   for my $storage_opt (keys %{ $info->{storage_options} }) {
645     my $value = $info->{storage_options}{$storage_opt};
646
647     $self->$storage_opt($value);
648   }
649
650   # Extract the individual sqlmaker options
651   #
652   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
653   #  the new set of options
654   $self->_sql_maker(undef);
655   $self->_sql_maker_opts({});
656
657   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
658     my $value = $info->{sql_maker_options}{$sql_maker_opt};
659
660     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
661   }
662
663   # FIXME - dirty:
664   # save attributes in a separate accessor so they are always
665   # introspectable, even in case of a CODE $dbhmaker
666   $self->_dbic_connect_attributes (\%attrs);
667
668   return $self->_connect_info;
669 }
670
671 sub _dbi_connect_info {
672   my $self = shift;
673
674   return $self->{_dbi_connect_info} = $_[0]
675     if @_;
676
677   my $conninfo = $self->{_dbi_connect_info} || [];
678
679   # last ditch effort to grab a DSN
680   if ( ! defined $conninfo->[0] and $ENV{DBI_DSN} ) {
681     my @new_conninfo = @$conninfo;
682     $new_conninfo[0] = $ENV{DBI_DSN};
683     $conninfo = \@new_conninfo;
684   }
685
686   return $conninfo;
687 }
688
689
690 sub _normalize_connect_info {
691   my ($self, $info_arg) = @_;
692   my %info;
693
694   my @args = @$info_arg;  # take a shallow copy for further mutilation
695
696   # combine/pre-parse arguments depending on invocation style
697
698   my %attrs;
699   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
700     %attrs = %{ $args[1] || {} };
701     @args = $args[0];
702   }
703   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
704     %attrs = %{$args[0]};
705     @args = ();
706     if (my $code = delete $attrs{dbh_maker}) {
707       @args = $code;
708
709       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
710       if (@ignored) {
711         carp sprintf (
712             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
713           . "to the result of 'dbh_maker'",
714
715           join (', ', map { "'$_'" } (@ignored) ),
716         );
717       }
718     }
719     else {
720       @args = delete @attrs{qw/dsn user password/};
721     }
722   }
723   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
724     %attrs = (
725       % { $args[3] || {} },
726       % { $args[4] || {} },
727     );
728     @args = @args[0,1,2];
729   }
730
731   $info{arguments} = \@args;
732
733   my @storage_opts = grep exists $attrs{$_},
734     @storage_options, 'cursor_class';
735
736   @{ $info{storage_options} }{@storage_opts} =
737     delete @attrs{@storage_opts} if @storage_opts;
738
739   my @sql_maker_opts = grep exists $attrs{$_},
740     qw/limit_dialect quote_char name_sep quote_names/;
741
742   @{ $info{sql_maker_options} }{@sql_maker_opts} =
743     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
744
745   $info{attributes} = \%attrs if %attrs;
746
747   return \%info;
748 }
749
750 sub _default_dbi_connect_attributes () {
751   +{
752     AutoCommit => 1,
753     PrintError => 0,
754     RaiseError => 1,
755     ShowErrorStatement => 1,
756   };
757 }
758
759 =head2 on_connect_do
760
761 This method is deprecated in favour of setting via L</connect_info>.
762
763 =cut
764
765 =head2 on_disconnect_do
766
767 This method is deprecated in favour of setting via L</connect_info>.
768
769 =cut
770
771 sub _parse_connect_do {
772   my ($self, $type) = @_;
773
774   my $val = $self->$type;
775   return () if not defined $val;
776
777   my @res;
778
779   if (not ref($val)) {
780     push @res, [ 'do_sql', $val ];
781   } elsif (ref($val) eq 'CODE') {
782     push @res, $val;
783   } elsif (ref($val) eq 'ARRAY') {
784     push @res, map { [ 'do_sql', $_ ] } @$val;
785   } else {
786     $self->throw_exception("Invalid type for $type: ".ref($val));
787   }
788
789   return \@res;
790 }
791
792 =head2 dbh_do
793
794 Arguments: ($subref | $method_name), @extra_coderef_args?
795
796 Execute the given $subref or $method_name using the new exception-based
797 connection management.
798
799 The first two arguments will be the storage object that C<dbh_do> was called
800 on and a database handle to use.  Any additional arguments will be passed
801 verbatim to the called subref as arguments 2 and onwards.
802
803 Using this (instead of $self->_dbh or $self->dbh) ensures correct
804 exception handling and reconnection (or failover in future subclasses).
805
806 Your subref should have no side-effects outside of the database, as
807 there is the potential for your subref to be partially double-executed
808 if the database connection was stale/dysfunctional.
809
810 Example:
811
812   my @stuff = $schema->storage->dbh_do(
813     sub {
814       my ($storage, $dbh, @cols) = @_;
815       my $cols = join(q{, }, @cols);
816       $dbh->selectrow_array("SELECT $cols FROM foo");
817     },
818     @column_list
819   );
820
821 =cut
822
823 sub dbh_do {
824   my $self = shift;
825   my $run_target = shift;
826
827   # short circuit when we know there is no need for a runner
828   #
829   # FIXME - assumption may be wrong
830   # the rationale for the txn_depth check is that if this block is a part
831   # of a larger transaction, everything up to that point is screwed anyway
832   return $self->$run_target($self->_get_dbh, @_)
833     if $self->{_in_do_block} or $self->transaction_depth;
834
835   # take a ref instead of a copy, to preserve @_ aliasing
836   # semantics within the coderef, but only if needed
837   # (pseudoforking doesn't like this trick much)
838   my $args = @_ ? \@_ : [];
839
840   DBIx::Class::Storage::BlockRunner->new(
841     storage => $self,
842     run_code => sub { $self->$run_target ($self->_get_dbh, @$args ) },
843     wrap_txn => 0,
844     retry_handler => sub { ! ( $_[0]->retried_count or $_[0]->storage->connected ) },
845   )->run;
846 }
847
848 sub txn_do {
849   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
850   shift->next::method(@_);
851 }
852
853 =head2 disconnect
854
855 Our C<disconnect> method also performs a rollback first if the
856 database is not in C<AutoCommit> mode.
857
858 =cut
859
860 sub disconnect {
861   my ($self) = @_;
862
863   if( $self->_dbh ) {
864     my @actions;
865
866     push @actions, ( $self->on_disconnect_call || () );
867     push @actions, $self->_parse_connect_do ('on_disconnect_do');
868
869     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
870
871     # stops the "implicit rollback on disconnect" warning
872     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
873
874     %{ $self->_dbh->{CachedKids} } = ();
875     $self->_dbh->disconnect;
876     $self->_dbh(undef);
877   }
878 }
879
880 =head2 with_deferred_fk_checks
881
882 =over 4
883
884 =item Arguments: C<$coderef>
885
886 =item Return Value: The return value of $coderef
887
888 =back
889
890 Storage specific method to run the code ref with FK checks deferred or
891 in MySQL's case disabled entirely.
892
893 =cut
894
895 # Storage subclasses should override this
896 sub with_deferred_fk_checks {
897   my ($self, $sub) = @_;
898   $sub->();
899 }
900
901 =head2 connected
902
903 =over
904
905 =item Arguments: none
906
907 =item Return Value: 1|0
908
909 =back
910
911 Verifies that the current database handle is active and ready to execute
912 an SQL statement (e.g. the connection did not get stale, server is still
913 answering, etc.) This method is used internally by L</dbh>.
914
915 =cut
916
917 sub connected {
918   my $self = shift;
919   return 0 unless $self->_seems_connected;
920
921   #be on the safe side
922   local $self->_dbh->{RaiseError} = 1;
923
924   return $self->_ping;
925 }
926
927 sub _seems_connected {
928   my $self = shift;
929
930   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
931
932   my $dbh = $self->_dbh
933     or return 0;
934
935   return $dbh->FETCH('Active');
936 }
937
938 sub _ping {
939   my $self = shift;
940
941   my $dbh = $self->_dbh or return 0;
942
943   return $dbh->ping;
944 }
945
946 sub ensure_connected {
947   my ($self) = @_;
948
949   unless ($self->connected) {
950     $self->_populate_dbh;
951   }
952 }
953
954 =head2 dbh
955
956 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
957 is guaranteed to be healthy by implicitly calling L</connected>, and if
958 necessary performing a reconnection before returning. Keep in mind that this
959 is very B<expensive> on some database engines. Consider using L</dbh_do>
960 instead.
961
962 =cut
963
964 sub dbh {
965   my ($self) = @_;
966
967   if (not $self->_dbh) {
968     $self->_populate_dbh;
969   } else {
970     $self->ensure_connected;
971   }
972   return $self->_dbh;
973 }
974
975 # this is the internal "get dbh or connect (don't check)" method
976 sub _get_dbh {
977   my $self = shift;
978   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
979   $self->_populate_dbh unless $self->_dbh;
980   return $self->_dbh;
981 }
982
983 sub sql_maker {
984   my ($self) = @_;
985   unless ($self->_sql_maker) {
986     my $sql_maker_class = $self->sql_maker_class;
987
988     my %opts = %{$self->_sql_maker_opts||{}};
989     my $dialect =
990       $opts{limit_dialect}
991         ||
992       $self->sql_limit_dialect
993         ||
994       do {
995         my $s_class = (ref $self) || $self;
996         carp_unique (
997           "Your storage class ($s_class) does not set sql_limit_dialect and you "
998         . 'have not supplied an explicit limit_dialect in your connection_info. '
999         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1000         . 'databases but can be (and often is) painfully slow. '
1001         . "Please file an RT ticket against '$s_class'"
1002         ) if $self->_dbi_connect_info->[0];
1003
1004         'GenericSubQ';
1005       }
1006     ;
1007
1008     my ($quote_char, $name_sep);
1009
1010     if ($opts{quote_names}) {
1011       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1012         my $s_class = (ref $self) || $self;
1013         carp_unique (
1014           "You requested 'quote_names' but your storage class ($s_class) does "
1015         . 'not explicitly define a default sql_quote_char and you have not '
1016         . 'supplied a quote_char as part of your connection_info. DBIC will '
1017         .q{default to the ANSI SQL standard quote '"', which works most of }
1018         . "the time. Please file an RT ticket against '$s_class'."
1019         );
1020
1021         '"'; # RV
1022       };
1023
1024       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1025     }
1026
1027     $self->_sql_maker($sql_maker_class->new(
1028       bindtype=>'columns',
1029       array_datatypes => 1,
1030       limit_dialect => $dialect,
1031       ($quote_char ? (quote_char => $quote_char) : ()),
1032       name_sep => ($name_sep || '.'),
1033       %opts,
1034     ));
1035   }
1036   return $self->_sql_maker;
1037 }
1038
1039 # nothing to do by default
1040 sub _rebless {}
1041 sub _init {}
1042
1043 sub _populate_dbh {
1044   my ($self) = @_;
1045
1046   $self->_dbh(undef); # in case ->connected failed we might get sent here
1047   $self->_dbh_details({}); # reset everything we know
1048
1049   $self->_dbh($self->_connect);
1050
1051   $self->_conn_pid($$) unless DBIx::Class::_ENV_::BROKEN_FORK; # on win32 these are in fact threads
1052
1053   $self->_determine_driver;
1054
1055   # Always set the transaction depth on connect, since
1056   #  there is no transaction in progress by definition
1057   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1058
1059   $self->_run_connection_actions unless $self->{_in_determine_driver};
1060 }
1061
1062 sub _run_connection_actions {
1063   my $self = shift;
1064   my @actions;
1065
1066   push @actions, ( $self->on_connect_call || () );
1067   push @actions, $self->_parse_connect_do ('on_connect_do');
1068
1069   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1070 }
1071
1072
1073
1074 sub set_use_dbms_capability {
1075   $_[0]->set_inherited ($_[1], $_[2]);
1076 }
1077
1078 sub get_use_dbms_capability {
1079   my ($self, $capname) = @_;
1080
1081   my $use = $self->get_inherited ($capname);
1082   return defined $use
1083     ? $use
1084     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1085   ;
1086 }
1087
1088 sub set_dbms_capability {
1089   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1090 }
1091
1092 sub get_dbms_capability {
1093   my ($self, $capname) = @_;
1094
1095   my $cap = $self->_dbh_details->{capability}{$capname};
1096
1097   unless (defined $cap) {
1098     if (my $meth = $self->can ("_determine$capname")) {
1099       $cap = $self->$meth ? 1 : 0;
1100     }
1101     else {
1102       $cap = 0;
1103     }
1104
1105     $self->set_dbms_capability ($capname, $cap);
1106   }
1107
1108   return $cap;
1109 }
1110
1111 sub _server_info {
1112   my $self = shift;
1113
1114   my $info;
1115   unless ($info = $self->_dbh_details->{info}) {
1116
1117     $info = {};
1118
1119     my $server_version = try {
1120       $self->_get_server_version
1121     } catch {
1122       # driver determination *may* use this codepath
1123       # in which case we must rethrow
1124       $self->throw_exception($_) if $self->{_in_determine_driver};
1125
1126       # $server_version on failure
1127       undef;
1128     };
1129
1130     if (defined $server_version) {
1131       $info->{dbms_version} = $server_version;
1132
1133       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1134       my @verparts = split (/\./, $numeric_version);
1135       if (
1136         @verparts
1137           &&
1138         $verparts[0] <= 999
1139       ) {
1140         # consider only up to 3 version parts, iff not more than 3 digits
1141         my @use_parts;
1142         while (@verparts && @use_parts < 3) {
1143           my $p = shift @verparts;
1144           last if $p > 999;
1145           push @use_parts, $p;
1146         }
1147         push @use_parts, 0 while @use_parts < 3;
1148
1149         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1150       }
1151     }
1152
1153     $self->_dbh_details->{info} = $info;
1154   }
1155
1156   return $info;
1157 }
1158
1159 sub _get_server_version {
1160   shift->_dbh_get_info('SQL_DBMS_VER');
1161 }
1162
1163 sub _dbh_get_info {
1164   my ($self, $info) = @_;
1165
1166   if ($info =~ /[^0-9]/) {
1167     require DBI::Const::GetInfoType;
1168     $info = $DBI::Const::GetInfoType::GetInfoType{$info};
1169     $self->throw_exception("Info type '$_[1]' not provided by DBI::Const::GetInfoType")
1170       unless defined $info;
1171   }
1172
1173   $self->_get_dbh->get_info($info);
1174 }
1175
1176 sub _describe_connection {
1177   require DBI::Const::GetInfoReturn;
1178
1179   my $self = shift;
1180
1181   my $drv;
1182   try {
1183     $drv = $self->_extract_driver_from_connect_info;
1184     $self->ensure_connected;
1185   };
1186
1187   $drv = "DBD::$drv" if $drv;
1188
1189   my $res = {
1190     DBIC_DSN => $self->_dbi_connect_info->[0],
1191     DBI_VER => DBI->VERSION,
1192     DBIC_VER => DBIx::Class->VERSION,
1193     DBIC_DRIVER => ref $self,
1194     $drv ? (
1195       DBD => $drv,
1196       DBD_VER => try { $drv->VERSION },
1197     ) : (),
1198   };
1199
1200   # try to grab data even if we never managed to connect
1201   # will cover us in cases of an oddly broken half-connect
1202   for my $inf (
1203     #keys %DBI::Const::GetInfoType::GetInfoType,
1204     qw/
1205       SQL_CURSOR_COMMIT_BEHAVIOR
1206       SQL_CURSOR_ROLLBACK_BEHAVIOR
1207       SQL_CURSOR_SENSITIVITY
1208       SQL_DATA_SOURCE_NAME
1209       SQL_DBMS_NAME
1210       SQL_DBMS_VER
1211       SQL_DEFAULT_TXN_ISOLATION
1212       SQL_DM_VER
1213       SQL_DRIVER_NAME
1214       SQL_DRIVER_ODBC_VER
1215       SQL_DRIVER_VER
1216       SQL_EXPRESSIONS_IN_ORDERBY
1217       SQL_GROUP_BY
1218       SQL_IDENTIFIER_CASE
1219       SQL_IDENTIFIER_QUOTE_CHAR
1220       SQL_MAX_CATALOG_NAME_LEN
1221       SQL_MAX_COLUMN_NAME_LEN
1222       SQL_MAX_IDENTIFIER_LEN
1223       SQL_MAX_TABLE_NAME_LEN
1224       SQL_MULTIPLE_ACTIVE_TXN
1225       SQL_MULT_RESULT_SETS
1226       SQL_NEED_LONG_DATA_LEN
1227       SQL_NON_NULLABLE_COLUMNS
1228       SQL_ODBC_VER
1229       SQL_QUALIFIER_NAME_SEPARATOR
1230       SQL_QUOTED_IDENTIFIER_CASE
1231       SQL_TXN_CAPABLE
1232       SQL_TXN_ISOLATION_OPTION
1233     /
1234   ) {
1235     # some drivers barf on things they do not know about instead
1236     # of returning undef
1237     my $v = try { $self->_dbh_get_info($inf) };
1238     next unless defined $v;
1239
1240     #my $key = sprintf( '%s(%s)', $inf, $DBI::Const::GetInfoType::GetInfoType{$inf} );
1241     my $expl = DBI::Const::GetInfoReturn::Explain($inf, $v);
1242     $res->{$inf} = DBI::Const::GetInfoReturn::Format($inf, $v) . ( $expl ? " ($expl)" : '' );
1243   }
1244
1245   $res;
1246 }
1247
1248 sub _determine_driver {
1249   my ($self) = @_;
1250
1251   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1252     my $started_connected = 0;
1253     local $self->{_in_determine_driver} = 1;
1254
1255     if (ref($self) eq __PACKAGE__) {
1256       my $driver;
1257       if ($self->_dbh) { # we are connected
1258         $driver = $self->_dbh->{Driver}{Name};
1259         $started_connected = 1;
1260       }
1261       else {
1262         $driver = $self->_extract_driver_from_connect_info;
1263       }
1264
1265       if ($driver) {
1266         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1267         if ($self->load_optional_class($storage_class)) {
1268           mro::set_mro($storage_class, 'c3');
1269           bless $self, $storage_class;
1270           $self->_rebless();
1271         }
1272         else {
1273           $self->_warn_undetermined_driver(
1274             'This version of DBIC does not yet seem to supply a driver for '
1275           . "your particular RDBMS and/or connection method ('$driver')."
1276           );
1277         }
1278       }
1279       else {
1280         $self->_warn_undetermined_driver(
1281           'Unable to extract a driver name from connect info - this '
1282         . 'should not have happened.'
1283         );
1284       }
1285     }
1286
1287     $self->_driver_determined(1);
1288
1289     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1290
1291     if ($self->can('source_bind_attributes')) {
1292       $self->throw_exception(
1293         "Your storage subclass @{[ ref $self ]} provides (or inherits) the method "
1294       . 'source_bind_attributes() for which support has been removed as of Jan 2013. '
1295       . 'If you are not sure how to proceed please contact the development team via '
1296       . 'http://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT'
1297       );
1298     }
1299
1300     $self->_init; # run driver-specific initializations
1301
1302     $self->_run_connection_actions
1303         if !$started_connected && defined $self->_dbh;
1304   }
1305 }
1306
1307 sub _extract_driver_from_connect_info {
1308   my $self = shift;
1309
1310   my $drv;
1311
1312   # if connect_info is a CODEREF, we have no choice but to connect
1313   if (
1314     ref $self->_dbi_connect_info->[0]
1315       and
1316     reftype $self->_dbi_connect_info->[0] eq 'CODE'
1317   ) {
1318     $self->_populate_dbh;
1319     $drv = $self->_dbh->{Driver}{Name};
1320   }
1321   else {
1322     # try to use dsn to not require being connected, the driver may still
1323     # force a connection later in _rebless to determine version
1324     # (dsn may not be supplied at all if all we do is make a mock-schema)
1325     ($drv) = ($self->_dbi_connect_info->[0] || '') =~ /^dbi:([^:]+):/i;
1326     $drv ||= $ENV{DBI_DRIVER};
1327   }
1328
1329   return $drv;
1330 }
1331
1332 sub _determine_connector_driver {
1333   my ($self, $conn) = @_;
1334
1335   my $dbtype = $self->_dbh_get_info('SQL_DBMS_NAME');
1336
1337   if (not $dbtype) {
1338     $self->_warn_undetermined_driver(
1339       'Unable to retrieve RDBMS type (SQL_DBMS_NAME) of the engine behind your '
1340     . "$conn connector - this should not have happened."
1341     );
1342     return;
1343   }
1344
1345   $dbtype =~ s/\W/_/gi;
1346
1347   my $subclass = "DBIx::Class::Storage::DBI::${conn}::${dbtype}";
1348   return if $self->isa($subclass);
1349
1350   if ($self->load_optional_class($subclass)) {
1351     bless $self, $subclass;
1352     $self->_rebless;
1353   }
1354   else {
1355     $self->_warn_undetermined_driver(
1356       'This version of DBIC does not yet seem to supply a driver for '
1357     . "your particular RDBMS and/or connection method ('$conn/$dbtype')."
1358     );
1359   }
1360 }
1361
1362 sub _warn_undetermined_driver {
1363   my ($self, $msg) = @_;
1364
1365   require Data::Dumper::Concise;
1366
1367   carp_once ($msg . ' While we will attempt to continue anyway, the results '
1368   . 'are likely to be underwhelming. Please upgrade DBIC, and if this message '
1369   . "does not go away, file a bugreport including the following info:\n"
1370   . Data::Dumper::Concise::Dumper($self->_describe_connection)
1371   );
1372 }
1373
1374 sub _do_connection_actions {
1375   my $self          = shift;
1376   my $method_prefix = shift;
1377   my $call          = shift;
1378
1379   if (not ref($call)) {
1380     my $method = $method_prefix . $call;
1381     $self->$method(@_);
1382   } elsif (ref($call) eq 'CODE') {
1383     $self->$call(@_);
1384   } elsif (ref($call) eq 'ARRAY') {
1385     if (ref($call->[0]) ne 'ARRAY') {
1386       $self->_do_connection_actions($method_prefix, $_) for @$call;
1387     } else {
1388       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1389     }
1390   } else {
1391     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1392   }
1393
1394   return $self;
1395 }
1396
1397 sub connect_call_do_sql {
1398   my $self = shift;
1399   $self->_do_query(@_);
1400 }
1401
1402 sub disconnect_call_do_sql {
1403   my $self = shift;
1404   $self->_do_query(@_);
1405 }
1406
1407 # override in db-specific backend when necessary
1408 sub connect_call_datetime_setup { 1 }
1409
1410 sub _do_query {
1411   my ($self, $action) = @_;
1412
1413   if (ref $action eq 'CODE') {
1414     $action = $action->($self);
1415     $self->_do_query($_) foreach @$action;
1416   }
1417   else {
1418     # Most debuggers expect ($sql, @bind), so we need to exclude
1419     # the attribute hash which is the second argument to $dbh->do
1420     # furthermore the bind values are usually to be presented
1421     # as named arrayref pairs, so wrap those here too
1422     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1423     my $sql = shift @do_args;
1424     my $attrs = shift @do_args;
1425     my @bind = map { [ undef, $_ ] } @do_args;
1426
1427     $self->dbh_do(sub {
1428       $_[0]->_query_start($sql, \@bind);
1429       $_[1]->do($sql, $attrs, @do_args);
1430       $_[0]->_query_end($sql, \@bind);
1431     });
1432   }
1433
1434   return $self;
1435 }
1436
1437 sub _connect {
1438   my $self = shift;
1439
1440   my $info = $self->_dbi_connect_info;
1441
1442   $self->throw_exception("You did not provide any connection_info")
1443     unless defined $info->[0];
1444
1445   my ($old_connect_via, $dbh);
1446
1447   local $DBI::connect_via = 'connect' if $INC{'Apache/DBI.pm'} && $ENV{MOD_PERL};
1448
1449   # this odd anonymous coderef dereference is in fact really
1450   # necessary to avoid the unwanted effect described in perl5
1451   # RT#75792
1452   #
1453   # in addition the coderef itself can't reside inside the try{} block below
1454   # as it somehow triggers a leak under perl -d
1455   my $dbh_error_handler_installer = sub {
1456     weaken (my $weak_self = $_[0]);
1457
1458     # the coderef is blessed so we can distinguish it from externally
1459     # supplied handles (which must be preserved)
1460     $_[1]->{HandleError} = bless sub {
1461       if ($weak_self) {
1462         $weak_self->throw_exception("DBI Exception: $_[0]");
1463       }
1464       else {
1465         # the handler may be invoked by something totally out of
1466         # the scope of DBIC
1467         DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1468       }
1469     }, '__DBIC__DBH__ERROR__HANDLER__';
1470   };
1471
1472   try {
1473     if(ref $info->[0] eq 'CODE') {
1474       $dbh = $info->[0]->();
1475     }
1476     else {
1477       require DBI;
1478       $dbh = DBI->connect(@$info);
1479     }
1480
1481     die $DBI::errstr unless $dbh;
1482
1483     die sprintf ("%s fresh DBI handle with a *false* 'Active' attribute. "
1484       . 'This handle is disconnected as far as DBIC is concerned, and we can '
1485       . 'not continue',
1486       ref $info->[0] eq 'CODE'
1487         ? "Connection coderef $info->[0] returned a"
1488         : 'DBI->connect($schema->storage->connect_info) resulted in a'
1489     ) unless $dbh->FETCH('Active');
1490
1491     # sanity checks unless asked otherwise
1492     unless ($self->unsafe) {
1493
1494       $self->throw_exception(
1495         'Refusing clobbering of {HandleError} installed on externally supplied '
1496        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1497       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1498
1499       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1500       # request, or an external handle. Complain and set anyway
1501       unless ($dbh->{RaiseError}) {
1502         carp( ref $info->[0] eq 'CODE'
1503
1504           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1505            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1506            .'attribute has been supplied'
1507
1508           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1509            .'unsafe => 1. Toggling RaiseError back to true'
1510         );
1511
1512         $dbh->{RaiseError} = 1;
1513       }
1514
1515       $dbh_error_handler_installer->($self, $dbh);
1516     }
1517   }
1518   catch {
1519     $self->throw_exception("DBI Connection failed: $_")
1520   };
1521
1522   $self->_dbh_autocommit($dbh->{AutoCommit});
1523   return $dbh;
1524 }
1525
1526 sub txn_begin {
1527   my $self = shift;
1528
1529   # this means we have not yet connected and do not know the AC status
1530   # (e.g. coderef $dbh), need a full-fledged connection check
1531   if (! defined $self->_dbh_autocommit) {
1532     $self->ensure_connected;
1533   }
1534   # Otherwise simply connect or re-connect on pid changes
1535   else {
1536     $self->_get_dbh;
1537   }
1538
1539   $self->next::method(@_);
1540 }
1541
1542 sub _exec_txn_begin {
1543   my $self = shift;
1544
1545   # if the user is utilizing txn_do - good for him, otherwise we need to
1546   # ensure that the $dbh is healthy on BEGIN.
1547   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1548   # will be replaced by a failure of begin_work itself (which will be
1549   # then retried on reconnect)
1550   if ($self->{_in_do_block}) {
1551     $self->_dbh->begin_work;
1552   } else {
1553     $self->dbh_do(sub { $_[1]->begin_work });
1554   }
1555 }
1556
1557 sub txn_commit {
1558   my $self = shift;
1559
1560   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1561   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1562     unless $self->_dbh;
1563
1564   # esoteric case for folks using external $dbh handles
1565   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1566     carp "Storage transaction_depth 0 does not match "
1567         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1568     $self->transaction_depth(1);
1569   }
1570
1571   $self->next::method(@_);
1572
1573   # if AutoCommit is disabled txn_depth never goes to 0
1574   # as a new txn is started immediately on commit
1575   $self->transaction_depth(1) if (
1576     !$self->transaction_depth
1577       and
1578     defined $self->_dbh_autocommit
1579       and
1580     ! $self->_dbh_autocommit
1581   );
1582 }
1583
1584 sub _exec_txn_commit {
1585   shift->_dbh->commit;
1586 }
1587
1588 sub txn_rollback {
1589   my $self = shift;
1590
1591   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1592   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1593     unless $self->_dbh;
1594
1595   # esoteric case for folks using external $dbh handles
1596   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1597     carp "Storage transaction_depth 0 does not match "
1598         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1599     $self->transaction_depth(1);
1600   }
1601
1602   $self->next::method(@_);
1603
1604   # if AutoCommit is disabled txn_depth never goes to 0
1605   # as a new txn is started immediately on commit
1606   $self->transaction_depth(1) if (
1607     !$self->transaction_depth
1608       and
1609     defined $self->_dbh_autocommit
1610       and
1611     ! $self->_dbh_autocommit
1612   );
1613 }
1614
1615 sub _exec_txn_rollback {
1616   shift->_dbh->rollback;
1617 }
1618
1619 # generate some identical methods
1620 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1621   no strict qw/refs/;
1622   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1623     my $self = shift;
1624     $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1625     $self->throw_exception("Unable to $meth() on a disconnected storage")
1626       unless $self->_dbh;
1627     $self->next::method(@_);
1628   };
1629 }
1630
1631 # This used to be the top-half of _execute.  It was split out to make it
1632 #  easier to override in NoBindVars without duping the rest.  It takes up
1633 #  all of _execute's args, and emits $sql, @bind.
1634 sub _prep_for_execute {
1635   #my ($self, $op, $ident, $args) = @_;
1636   return shift->_gen_sql_bind(@_)
1637 }
1638
1639 sub _gen_sql_bind {
1640   my ($self, $op, $ident, $args) = @_;
1641
1642   my ($colinfos, $from);
1643   if ( blessed($ident) ) {
1644     $from = $ident->from;
1645     $colinfos = $ident->columns_info;
1646   }
1647
1648   my ($sql, $bind);
1649   ($sql, @$bind) = $self->sql_maker->$op( ($from || $ident), @$args );
1650
1651   $bind = $self->_resolve_bindattrs(
1652     $ident, [ @{$args->[2]{bind}||[]}, @$bind ], $colinfos
1653   );
1654
1655   if (
1656     ! $ENV{DBIC_DT_SEARCH_OK}
1657       and
1658     $op eq 'select'
1659       and
1660     first {
1661       length ref $_->[1]
1662         and
1663       blessed($_->[1])
1664         and
1665       $_->[1]->isa('DateTime')
1666     } @$bind
1667   ) {
1668     carp_unique 'DateTime objects passed to search() are not supported '
1669       . 'properly (InflateColumn::DateTime formats and settings are not '
1670       . 'respected.) See "Formatting DateTime objects in queries" in '
1671       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1672       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1673   }
1674
1675   return( $sql, $bind );
1676 }
1677
1678 sub _resolve_bindattrs {
1679   my ($self, $ident, $bind, $colinfos) = @_;
1680
1681   $colinfos ||= {};
1682
1683   my $resolve_bindinfo = sub {
1684     #my $infohash = shift;
1685
1686     %$colinfos = %{ $self->_resolve_column_info($ident) }
1687       unless keys %$colinfos;
1688
1689     my $ret;
1690     if (my $col = $_[0]->{dbic_colname}) {
1691       $ret = { %{$_[0]} };
1692
1693       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1694         if $colinfos->{$col}{data_type};
1695
1696       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1697         if $colinfos->{$col}{size};
1698     }
1699
1700     $ret || $_[0];
1701   };
1702
1703   return [ map {
1704     my $resolved =
1705       ( ref $_ ne 'ARRAY' or @$_ != 2 ) ? [ {}, $_ ]
1706     : ( ! defined $_->[0] )             ? [ {}, $_->[1] ]
1707     : (ref $_->[0] eq 'HASH')           ? [ (exists $_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype})
1708                                               ? $_->[0]
1709                                               : $resolve_bindinfo->($_->[0])
1710                                             , $_->[1] ]
1711     : (ref $_->[0] eq 'SCALAR')         ? [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1712     :                                     [ $resolve_bindinfo->(
1713                                               { dbic_colname => $_->[0] }
1714                                             ), $_->[1] ]
1715     ;
1716
1717     if (
1718       ! exists $resolved->[0]{dbd_attrs}
1719         and
1720       ! $resolved->[0]{sqlt_datatype}
1721         and
1722       length ref $resolved->[1]
1723         and
1724       ! overload::Method($resolved->[1], '""')
1725     ) {
1726       require Data::Dumper;
1727       local $Data::Dumper::Maxdepth = 1;
1728       local $Data::Dumper::Terse = 1;
1729       local $Data::Dumper::Useqq = 1;
1730       local $Data::Dumper::Indent = 0;
1731       local $Data::Dumper::Pad = ' ';
1732       $self->throw_exception(
1733         'You must supply a datatype/bindtype (see DBIx::Class::ResultSet/DBIC BIND VALUES) '
1734       . 'for non-scalar value '. Data::Dumper::Dumper ($resolved->[1])
1735       );
1736     }
1737
1738     $resolved;
1739
1740   } @$bind ];
1741 }
1742
1743 sub _format_for_trace {
1744   #my ($self, $bind) = @_;
1745
1746   ### Turn @bind from something like this:
1747   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1748   ### to this:
1749   ###   ( "'1'", "'3'" )
1750
1751   map {
1752     defined( $_ && $_->[1] )
1753       ? qq{'$_->[1]'}
1754       : q{NULL}
1755   } @{$_[1] || []};
1756 }
1757
1758 sub _query_start {
1759   my ( $self, $sql, $bind ) = @_;
1760
1761   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1762     if $self->debug;
1763 }
1764
1765 sub _query_end {
1766   my ( $self, $sql, $bind ) = @_;
1767
1768   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1769     if $self->debug;
1770 }
1771
1772 sub _dbi_attrs_for_bind {
1773   my ($self, $ident, $bind) = @_;
1774
1775   my @attrs;
1776
1777   for (map { $_->[0] } @$bind) {
1778     push @attrs, do {
1779       if (exists $_->{dbd_attrs}) {
1780         $_->{dbd_attrs}
1781       }
1782       elsif($_->{sqlt_datatype}) {
1783         # cache the result in the dbh_details hash, as it can not change unless
1784         # we connect to something else
1785         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1786         if (not exists $cache->{$_->{sqlt_datatype}}) {
1787           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1788         }
1789         $cache->{$_->{sqlt_datatype}};
1790       }
1791       else {
1792         undef;  # always push something at this position
1793       }
1794     }
1795   }
1796
1797   return \@attrs;
1798 }
1799
1800 sub _execute {
1801   my ($self, $op, $ident, @args) = @_;
1802
1803   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1804
1805   # not even a PID check - we do not care about the state of the _dbh.
1806   # All we need is to get the appropriate drivers loaded if they aren't
1807   # already so that the assumption in ad7c50fc26e holds
1808   $self->_populate_dbh unless $self->_dbh;
1809
1810   $self->dbh_do( _dbh_execute =>     # retry over disconnects
1811     $sql,
1812     $bind,
1813     $self->_dbi_attrs_for_bind($ident, $bind),
1814   );
1815 }
1816
1817 sub _dbh_execute {
1818   my ($self, $dbh, $sql, $bind, $bind_attrs) = @_;
1819
1820   $self->_query_start( $sql, $bind );
1821
1822   my $sth = $self->_bind_sth_params(
1823     $self->_prepare_sth($dbh, $sql),
1824     $bind,
1825     $bind_attrs,
1826   );
1827
1828   # Can this fail without throwing an exception anyways???
1829   my $rv = $sth->execute();
1830   $self->throw_exception(
1831     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1832   ) if !$rv;
1833
1834   $self->_query_end( $sql, $bind );
1835
1836   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1837 }
1838
1839 sub _prepare_sth {
1840   my ($self, $dbh, $sql) = @_;
1841
1842   # 3 is the if_active parameter which avoids active sth re-use
1843   my $sth = $self->disable_sth_caching
1844     ? $dbh->prepare($sql)
1845     : $dbh->prepare_cached($sql, {}, 3);
1846
1847   # XXX You would think RaiseError would make this impossible,
1848   #  but apparently that's not true :(
1849   $self->throw_exception(
1850     $dbh->errstr
1851       ||
1852     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
1853             .'an exception and/or setting $dbh->errstr',
1854       length ($sql) > 20
1855         ? substr($sql, 0, 20) . '...'
1856         : $sql
1857       ,
1858       'DBD::' . $dbh->{Driver}{Name},
1859     )
1860   ) if !$sth;
1861
1862   $sth;
1863 }
1864
1865 sub _bind_sth_params {
1866   my ($self, $sth, $bind, $bind_attrs) = @_;
1867
1868   for my $i (0 .. $#$bind) {
1869     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1870       $sth->bind_param_inout(
1871         $i + 1, # bind params counts are 1-based
1872         $bind->[$i][1],
1873         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1874         $bind_attrs->[$i],
1875       );
1876     }
1877     else {
1878       # FIXME SUBOPTIMAL - most likely this is not necessary at all
1879       # confirm with dbi-dev whether explicit stringification is needed
1880       my $v = ( length ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""') )
1881         ? "$bind->[$i][1]"
1882         : $bind->[$i][1]
1883       ;
1884       $sth->bind_param(
1885         $i + 1,
1886         $v,
1887         $bind_attrs->[$i],
1888       );
1889     }
1890   }
1891
1892   $sth;
1893 }
1894
1895 sub _prefetch_autovalues {
1896   my ($self, $source, $colinfo, $to_insert) = @_;
1897
1898   my %values;
1899   for my $col (keys %$colinfo) {
1900     if (
1901       $colinfo->{$col}{auto_nextval}
1902         and
1903       (
1904         ! exists $to_insert->{$col}
1905           or
1906         ref $to_insert->{$col} eq 'SCALAR'
1907           or
1908         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1909       )
1910     ) {
1911       $values{$col} = $self->_sequence_fetch(
1912         'NEXTVAL',
1913         ( $colinfo->{$col}{sequence} ||=
1914             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1915         ),
1916       );
1917     }
1918   }
1919
1920   \%values;
1921 }
1922
1923 sub insert {
1924   my ($self, $source, $to_insert) = @_;
1925
1926   my $col_infos = $source->columns_info;
1927
1928   my $prefetched_values = $self->_prefetch_autovalues($source, $col_infos, $to_insert);
1929
1930   # fuse the values, but keep a separate list of prefetched_values so that
1931   # they can be fused once again with the final return
1932   $to_insert = { %$to_insert, %$prefetched_values };
1933
1934   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1935   # Investigate what does it take to s/defined/exists/
1936   my %pcols = map { $_ => 1 } $source->primary_columns;
1937   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1938   for my $col ($source->columns) {
1939     if ($col_infos->{$col}{is_auto_increment}) {
1940       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1941       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1942     }
1943
1944     # nothing to retrieve when explicit values are supplied
1945     next if (defined $to_insert->{$col} and ! (
1946       ref $to_insert->{$col} eq 'SCALAR'
1947         or
1948       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1949     ));
1950
1951     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1952     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1953       $pcols{$col}
1954         or
1955       $col_infos->{$col}{retrieve_on_insert}
1956     );
1957   };
1958
1959   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1960   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1961
1962   my ($sqla_opts, @ir_container);
1963   if (%retrieve_cols and $self->_use_insert_returning) {
1964     $sqla_opts->{returning_container} = \@ir_container
1965       if $self->_use_insert_returning_bound;
1966
1967     $sqla_opts->{returning} = [
1968       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1969     ];
1970   }
1971
1972   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1973
1974   my %returned_cols = %$to_insert;
1975   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1976     @ir_container = try {
1977       local $SIG{__WARN__} = sub {};
1978       my @r = $sth->fetchrow_array;
1979       $sth->finish;
1980       @r;
1981     } unless @ir_container;
1982
1983     @returned_cols{@$retlist} = @ir_container if @ir_container;
1984   }
1985   else {
1986     # pull in PK if needed and then everything else
1987     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1988
1989       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1990         unless $self->can('last_insert_id');
1991
1992       my @pri_values = $self->last_insert_id($source, @missing_pri);
1993
1994       $self->throw_exception( "Can't get last insert id" )
1995         unless (@pri_values == @missing_pri);
1996
1997       @returned_cols{@missing_pri} = @pri_values;
1998       delete @retrieve_cols{@missing_pri};
1999     }
2000
2001     # if there is more left to pull
2002     if (%retrieve_cols) {
2003       $self->throw_exception(
2004         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
2005       ) unless %pcols;
2006
2007       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
2008
2009       my $cur = DBIx::Class::ResultSet->new($source, {
2010         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
2011         select => \@left_to_fetch,
2012       })->cursor;
2013
2014       @returned_cols{@left_to_fetch} = $cur->next;
2015
2016       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
2017         if scalar $cur->next;
2018     }
2019   }
2020
2021   return { %$prefetched_values, %returned_cols };
2022 }
2023
2024 sub insert_bulk {
2025   my ($self, $source, $cols, $data) = @_;
2026
2027   my @col_range = (0..$#$cols);
2028
2029   # FIXME SUBOPTIMAL - most likely this is not necessary at all
2030   # confirm with dbi-dev whether explicit stringification is needed
2031   #
2032   # forcibly stringify whatever is stringifiable
2033   # ResultSet::populate() hands us a copy - safe to mangle
2034   for my $r (0 .. $#$data) {
2035     for my $c (0 .. $#{$data->[$r]}) {
2036       $data->[$r][$c] = "$data->[$r][$c]"
2037         if ( length ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
2038     }
2039   }
2040
2041   my $colinfos = $source->columns_info($cols);
2042
2043   local $self->{_autoinc_supplied_for_op} =
2044     (first { $_->{is_auto_increment} } values %$colinfos)
2045       ? 1
2046       : 0
2047   ;
2048
2049   # get a slice type index based on first row of data
2050   # a "column" in this context may refer to more than one bind value
2051   # e.g. \[ '?, ?', [...], [...] ]
2052   #
2053   # construct the value type index - a description of values types for every
2054   # per-column slice of $data:
2055   #
2056   # nonexistent - nonbind literal
2057   # 0 - regular value
2058   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
2059   #
2060   # also construct the column hash to pass to the SQL generator. For plain
2061   # (non literal) values - convert the members of the first row into a
2062   # literal+bind combo, with extra positional info in the bind attr hashref.
2063   # This will allow us to match the order properly, and is so contrived
2064   # because a user-supplied literal/bind (or something else specific to a
2065   # resultsource and/or storage driver) can inject extra binds along the
2066   # way, so one can't rely on "shift positions" ordering at all. Also we
2067   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
2068   # can be later matched up by address), because we want to supply a real
2069   # value on which perhaps e.g. datatype checks will be performed
2070   my ($proto_data, $value_type_by_col_idx);
2071   for my $i (@col_range) {
2072     my $colname = $cols->[$i];
2073     if (ref $data->[0][$i] eq 'SCALAR') {
2074       # no bind value at all - no type
2075
2076       $proto_data->{$colname} = $data->[0][$i];
2077     }
2078     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
2079       # repack, so we don't end up mangling the original \[]
2080       my ($sql, @bind) = @${$data->[0][$i]};
2081
2082       # normalization of user supplied stuff
2083       my $resolved_bind = $self->_resolve_bindattrs(
2084         $source, \@bind, $colinfos,
2085       );
2086
2087       # store value-less (attrs only) bind info - we will be comparing all
2088       # supplied binds against this for sanity
2089       $value_type_by_col_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
2090
2091       $proto_data->{$colname} = \[ $sql, map { [
2092         # inject slice order to use for $proto_bind construction
2093           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i, _literal_bind_subindex => $_+1 }
2094             =>
2095           $resolved_bind->[$_][1]
2096         ] } (0 .. $#bind)
2097       ];
2098     }
2099     else {
2100       $value_type_by_col_idx->{$i} = undef;
2101
2102       $proto_data->{$colname} = \[ '?', [
2103         { dbic_colname => $colname, _bind_data_slice_idx => $i }
2104           =>
2105         $data->[0][$i]
2106       ] ];
2107     }
2108   }
2109
2110   my ($sql, $proto_bind) = $self->_prep_for_execute (
2111     'insert',
2112     $source,
2113     [ $proto_data ],
2114   );
2115
2116   if (! @$proto_bind and keys %$value_type_by_col_idx) {
2117     # if the bindlist is empty and we had some dynamic binds, this means the
2118     # storage ate them away (e.g. the NoBindVars component) and interpolated
2119     # them directly into the SQL. This obviously can't be good for multi-inserts
2120     $self->throw_exception('Cannot insert_bulk without support for placeholders');
2121   }
2122
2123   # sanity checks
2124   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
2125   #
2126   # use an error reporting closure for convenience (less to pass)
2127   my $bad_slice_report_cref = sub {
2128     my ($msg, $r_idx, $c_idx) = @_;
2129     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
2130       $msg,
2131       $cols->[$c_idx],
2132       do {
2133         require Data::Dumper::Concise;
2134         local $Data::Dumper::Maxdepth = 5;
2135         Data::Dumper::Concise::Dumper ({
2136           map { $cols->[$_] =>
2137             $data->[$r_idx][$_]
2138           } @col_range
2139         }),
2140       }
2141     );
2142   };
2143
2144   for my $col_idx (@col_range) {
2145     my $reference_val = $data->[0][$col_idx];
2146
2147     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
2148       my $val = $data->[$row_idx][$col_idx];
2149
2150       if (! exists $value_type_by_col_idx->{$col_idx}) { # literal no binds
2151         if (ref $val ne 'SCALAR') {
2152           $bad_slice_report_cref->(
2153             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
2154             $row_idx,
2155             $col_idx,
2156           );
2157         }
2158         elsif ($$val ne $$reference_val) {
2159           $bad_slice_report_cref->(
2160             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
2161             $row_idx,
2162             $col_idx,
2163           );
2164         }
2165       }
2166       elsif (! defined $value_type_by_col_idx->{$col_idx} ) {  # regular non-literal value
2167         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
2168           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
2169         }
2170       }
2171       else {  # binds from a \[], compare type and attrs
2172         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
2173           $bad_slice_report_cref->(
2174             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
2175             $row_idx,
2176             $col_idx,
2177           );
2178         }
2179         # start drilling down and bail out early on identical refs
2180         elsif (
2181           $reference_val != $val
2182             or
2183           $$reference_val != $$val
2184         ) {
2185           if (${$val}->[0] ne ${$reference_val}->[0]) {
2186             $bad_slice_report_cref->(
2187               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
2188               $row_idx,
2189               $col_idx,
2190             );
2191           }
2192           # need to check the bind attrs - a bind will happen only once for
2193           # the entire dataset, so any changes further down will be ignored.
2194           elsif (! Data::Compare::Compare(
2195             $value_type_by_col_idx->{$col_idx},
2196             [
2197               map
2198               { $_->[0] }
2199               @{$self->_resolve_bindattrs(
2200                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
2201               )}
2202             ],
2203           )) {
2204             $bad_slice_report_cref->(
2205               'Differing bind attributes on literal/bind values not supported',
2206               $row_idx,
2207               $col_idx,
2208             );
2209           }
2210         }
2211       }
2212     }
2213   }
2214
2215   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
2216   # are atomic (even if execute_for_fetch is a single call). Thus a safety
2217   # scope guard
2218   my $guard = $self->txn_scope_guard;
2219
2220   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
2221   my $sth = $self->_prepare_sth($self->_dbh, $sql);
2222   my $rv = do {
2223     if (@$proto_bind) {
2224       # proto bind contains the information on which pieces of $data to pull
2225       # $cols is passed in only for prettier error-reporting
2226       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
2227     }
2228     else {
2229       # bind_param_array doesn't work if there are no binds
2230       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
2231     }
2232   };
2233
2234   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
2235
2236   $guard->commit;
2237
2238   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
2239 }
2240
2241 # execute_for_fetch is capable of returning data just fine (it means it
2242 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
2243 # is the void-populate fast-path we will just ignore this altogether
2244 # for the time being.
2245 sub _dbh_execute_for_fetch {
2246   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
2247
2248   my @idx_range = ( 0 .. $#$proto_bind );
2249
2250   # If we have any bind attributes to take care of, we will bind the
2251   # proto-bind data (which will never be used by execute_for_fetch)
2252   # However since column bindtypes are "sticky", this is sufficient
2253   # to get the DBD to apply the bindtype to all values later on
2254
2255   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
2256
2257   for my $i (@idx_range) {
2258     $sth->bind_param (
2259       $i+1, # DBI bind indexes are 1-based
2260       $proto_bind->[$i][1],
2261       $bind_attrs->[$i],
2262     ) if defined $bind_attrs->[$i];
2263   }
2264
2265   # At this point $data slots named in the _bind_data_slice_idx of
2266   # each piece of $proto_bind are either \[]s or plain values to be
2267   # passed in. Construct the dispensing coderef. *NOTE* the order
2268   # of $data will differ from this of the ?s in the SQL (due to
2269   # alphabetical ordering by colname). We actually do want to
2270   # preserve this behavior so that prepare_cached has a better
2271   # chance of matching on unrelated calls
2272
2273   my $fetch_row_idx = -1; # saner loop this way
2274   my $fetch_tuple = sub {
2275     return undef if ++$fetch_row_idx > $#$data;
2276
2277     return [ map { defined $_->{_literal_bind_subindex}
2278       ? ${ $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]}
2279          ->[ $_->{_literal_bind_subindex} ]
2280           ->[1]
2281       : $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]
2282     } map { $_->[0] } @$proto_bind];
2283   };
2284
2285   my $tuple_status = [];
2286   my ($rv, $err);
2287   try {
2288     $rv = $sth->execute_for_fetch(
2289       $fetch_tuple,
2290       $tuple_status,
2291     );
2292   }
2293   catch {
2294     $err = shift;
2295   };
2296
2297   # Not all DBDs are create equal. Some throw on error, some return
2298   # an undef $rv, and some set $sth->err - try whatever we can
2299   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2300     ! defined $err
2301       and
2302     ( !defined $rv or $sth->err )
2303   );
2304
2305   # Statement must finish even if there was an exception.
2306   try {
2307     $sth->finish
2308   }
2309   catch {
2310     $err = shift unless defined $err
2311   };
2312
2313   if (defined $err) {
2314     my $i = 0;
2315     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2316
2317     $self->throw_exception("Unexpected populate error: $err")
2318       if ($i > $#$tuple_status);
2319
2320     require Data::Dumper::Concise;
2321     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2322       ($tuple_status->[$i][1] || $err),
2323       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2324     );
2325   }
2326
2327   return $rv;
2328 }
2329
2330 sub _dbh_execute_inserts_with_no_binds {
2331   my ($self, $sth, $count) = @_;
2332
2333   my $err;
2334   try {
2335     my $dbh = $self->_get_dbh;
2336     local $dbh->{RaiseError} = 1;
2337     local $dbh->{PrintError} = 0;
2338
2339     $sth->execute foreach 1..$count;
2340   }
2341   catch {
2342     $err = shift;
2343   };
2344
2345   # Make sure statement is finished even if there was an exception.
2346   try {
2347     $sth->finish
2348   }
2349   catch {
2350     $err = shift unless defined $err;
2351   };
2352
2353   $self->throw_exception($err) if defined $err;
2354
2355   return $count;
2356 }
2357
2358 sub update {
2359   #my ($self, $source, @args) = @_;
2360   shift->_execute('update', @_);
2361 }
2362
2363
2364 sub delete {
2365   #my ($self, $source, @args) = @_;
2366   shift->_execute('delete', @_);
2367 }
2368
2369 sub _select {
2370   my $self = shift;
2371   $self->_execute($self->_select_args(@_));
2372 }
2373
2374 sub _select_args_to_query {
2375   my $self = shift;
2376
2377   $self->throw_exception(
2378     "Unable to generate limited query representation with 'software_limit' enabled"
2379   ) if ($_[3]->{software_limit} and ($_[3]->{offset} or $_[3]->{rows}) );
2380
2381   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2382   #  = $self->_select_args($ident, $select, $cond, $attrs);
2383   my ($op, $ident, @args) =
2384     $self->_select_args(@_);
2385
2386   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2387   my ($sql, $bind) = $self->_gen_sql_bind($op, $ident, \@args);
2388
2389   # reuse the bind arrayref
2390   unshift @{$bind}, "($sql)";
2391   \$bind;
2392 }
2393
2394 sub _select_args {
2395   my ($self, $ident, $select, $where, $orig_attrs) = @_;
2396
2397   # FIXME - that kind of caching would be nice to have
2398   # however currently we *may* pass the same $orig_attrs
2399   # with different ident/select/where
2400   # the whole interface needs to be rethought, since it
2401   # was centered around the flawed SQLA API. We can do
2402   # soooooo much better now. But that is also another
2403   # battle...
2404   #return (
2405   #  'select', $orig_attrs->{!args_as_stored_at_the_end_of_this_method!}
2406   #) if $orig_attrs->{!args_as_stored_at_the_end_of_this_method!};
2407
2408   my $sql_maker = $self->sql_maker;
2409   my $alias2source = $self->_resolve_ident_sources ($ident);
2410
2411   my $attrs = {
2412     %$orig_attrs,
2413     select => $select,
2414     from => $ident,
2415     where => $where,
2416
2417     # limit dialects use this stuff
2418     # yes, some CDBICompat crap does not supply an {alias} >.<
2419     ( $orig_attrs->{alias} and $alias2source->{$orig_attrs->{alias}} )
2420       ? ( _rsroot_rsrc => $alias2source->{$orig_attrs->{alias}} )
2421       : ()
2422     ,
2423   };
2424
2425   # Sanity check the attributes (SQLMaker does it too, but
2426   # in case of a software_limit we'll never reach there)
2427   if (defined $attrs->{offset}) {
2428     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2429       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2430   }
2431
2432   if (defined $attrs->{rows}) {
2433     $self->throw_exception("The rows attribute must be a positive integer if present")
2434       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2435   }
2436   elsif ($attrs->{offset}) {
2437     # MySQL actually recommends this approach.  I cringe.
2438     $attrs->{rows} = $sql_maker->__max_int;
2439   }
2440
2441   # see if we will need to tear the prefetch apart to satisfy group_by == select
2442   # this is *extremely tricky* to get right, I am still not sure I did
2443   #
2444   my ($prefetch_needs_subquery, @limit_args);
2445
2446   if ( $attrs->{_grouped_by_distinct} and $attrs->{collapse} ) {
2447     # we already know there is a valid group_by (we made it) and we know it is
2448     # intended to be based *only* on non-multi stuff
2449     # short circuit the group_by parsing below
2450     $prefetch_needs_subquery = 1;
2451   }
2452   elsif (
2453     # The rationale is that even if we do *not* have collapse, we still
2454     # need to wrap the core grouped select/group_by in a subquery
2455     # so that databases that care about group_by/select equivalence
2456     # are happy (this includes MySQL in strict_mode)
2457     # If any of the other joined tables are referenced in the group_by
2458     # however - the user is on their own
2459     ( $prefetch_needs_subquery or $attrs->{_related_results_construction} )
2460       and
2461     $attrs->{group_by}
2462       and
2463     @{$attrs->{group_by}}
2464       and
2465     my $grp_aliases = try { # try{} because $attrs->{from} may be unreadable
2466       $self->_resolve_aliastypes_from_select_args({ from => $attrs->{from}, group_by => $attrs->{group_by} })
2467     }
2468   ) {
2469     # no aliases other than our own in group_by
2470     # if there are - do not allow subquery even if limit is present
2471     $prefetch_needs_subquery = ! scalar grep { $_ ne $attrs->{alias} } keys %{ $grp_aliases->{grouping} || {} };
2472   }
2473   elsif ( $attrs->{rows} && $attrs->{collapse} ) {
2474     # active collapse with a limit - that one is a no-brainer unless
2475     # overruled by a group_by above
2476     $prefetch_needs_subquery = 1;
2477   }
2478
2479   if ($prefetch_needs_subquery) {
2480     $attrs = $self->_adjust_select_args_for_complex_prefetch ($attrs);
2481   }
2482   elsif (! $attrs->{software_limit} ) {
2483     push @limit_args, (
2484       $attrs->{rows} || (),
2485       $attrs->{offset} || (),
2486     );
2487   }
2488
2489   # try to simplify the joinmap further (prune unreferenced type-single joins)
2490   if (
2491     ! $prefetch_needs_subquery  # already pruned
2492       and
2493     ref $attrs->{from}
2494       and
2495     reftype $attrs->{from} eq 'ARRAY'
2496       and
2497     @{$attrs->{from}} != 1
2498   ) {
2499     ($attrs->{from}, $attrs->{_aliastypes}) = $self->_prune_unused_joins ($attrs);
2500   }
2501
2502   # FIXME this is a gross, inefficient, largely incorrect and fragile hack
2503   # during the result inflation stage we *need* to know what was the aliastype
2504   # map as sqla saw it when the final pieces of SQL were being assembled
2505   # Originally we simply carried around the entirety of $attrs, but this
2506   # resulted in resultsets that are being reused growing continuously, as
2507   # the hash in question grew deeper and deeper.
2508   # Instead hand-pick what to take with us here (we actually don't need much
2509   # at this point just the map itself)
2510   $orig_attrs->{_last_sqlmaker_alias_map} = $attrs->{_aliastypes};
2511
2512 ###
2513   # This would be the point to deflate anything found in $attrs->{where}
2514   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2515   # expect a result object. And all we have is a resultsource (it is trivial
2516   # to extract deflator coderefs via $alias2source above).
2517   #
2518   # I don't see a way forward other than changing the way deflators are
2519   # invoked, and that's just bad...
2520 ###
2521
2522   return ( 'select', @{$attrs}{qw(from select where)}, $attrs, @limit_args );
2523 }
2524
2525 # Returns a counting SELECT for a simple count
2526 # query. Abstracted so that a storage could override
2527 # this to { count => 'firstcol' } or whatever makes
2528 # sense as a performance optimization
2529 sub _count_select {
2530   #my ($self, $source, $rs_attrs) = @_;
2531   return { count => '*' };
2532 }
2533
2534 =head2 select
2535
2536 =over 4
2537
2538 =item Arguments: $ident, $select, $condition, $attrs
2539
2540 =back
2541
2542 Handle a SQL select statement.
2543
2544 =cut
2545
2546 sub select {
2547   my $self = shift;
2548   my ($ident, $select, $condition, $attrs) = @_;
2549   return $self->cursor_class->new($self, \@_, $attrs);
2550 }
2551
2552 sub select_single {
2553   my $self = shift;
2554   my ($rv, $sth, @bind) = $self->_select(@_);
2555   my @row = $sth->fetchrow_array;
2556   my @nextrow = $sth->fetchrow_array if @row;
2557   if(@row && @nextrow) {
2558     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2559   }
2560   # Need to call finish() to work round broken DBDs
2561   $sth->finish();
2562   return @row;
2563 }
2564
2565 =head2 sql_limit_dialect
2566
2567 This is an accessor for the default SQL limit dialect used by a particular
2568 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2569 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2570 see L<DBIx::Class::SQLMaker::LimitDialects>.
2571
2572 =cut
2573
2574 sub _dbh_columns_info_for {
2575   my ($self, $dbh, $table) = @_;
2576
2577   if ($dbh->can('column_info')) {
2578     my %result;
2579     my $caught;
2580     try {
2581       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2582       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2583       $sth->execute();
2584       while ( my $info = $sth->fetchrow_hashref() ){
2585         my %column_info;
2586         $column_info{data_type}   = $info->{TYPE_NAME};
2587         $column_info{size}      = $info->{COLUMN_SIZE};
2588         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2589         $column_info{default_value} = $info->{COLUMN_DEF};
2590         my $col_name = $info->{COLUMN_NAME};
2591         $col_name =~ s/^\"(.*)\"$/$1/;
2592
2593         $result{$col_name} = \%column_info;
2594       }
2595     } catch {
2596       $caught = 1;
2597     };
2598     return \%result if !$caught && scalar keys %result;
2599   }
2600
2601   my %result;
2602   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2603   $sth->execute;
2604   my @columns = @{$sth->{NAME_lc}};
2605   for my $i ( 0 .. $#columns ){
2606     my %column_info;
2607     $column_info{data_type} = $sth->{TYPE}->[$i];
2608     $column_info{size} = $sth->{PRECISION}->[$i];
2609     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2610
2611     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2612       $column_info{data_type} = $1;
2613       $column_info{size}    = $2;
2614     }
2615
2616     $result{$columns[$i]} = \%column_info;
2617   }
2618   $sth->finish;
2619
2620   foreach my $col (keys %result) {
2621     my $colinfo = $result{$col};
2622     my $type_num = $colinfo->{data_type};
2623     my $type_name;
2624     if(defined $type_num && $dbh->can('type_info')) {
2625       my $type_info = $dbh->type_info($type_num);
2626       $type_name = $type_info->{TYPE_NAME} if $type_info;
2627       $colinfo->{data_type} = $type_name if $type_name;
2628     }
2629   }
2630
2631   return \%result;
2632 }
2633
2634 sub columns_info_for {
2635   my ($self, $table) = @_;
2636   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2637 }
2638
2639 =head2 last_insert_id
2640
2641 Return the row id of the last insert.
2642
2643 =cut
2644
2645 sub _dbh_last_insert_id {
2646     my ($self, $dbh, $source, $col) = @_;
2647
2648     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2649
2650     return $id if defined $id;
2651
2652     my $class = ref $self;
2653     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2654 }
2655
2656 sub last_insert_id {
2657   my $self = shift;
2658   $self->_dbh_last_insert_id ($self->_dbh, @_);
2659 }
2660
2661 =head2 _native_data_type
2662
2663 =over 4
2664
2665 =item Arguments: $type_name
2666
2667 =back
2668
2669 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2670 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2671 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2672
2673 The default implementation returns C<undef>, implement in your Storage driver if
2674 you need this functionality.
2675
2676 Should map types from other databases to the native RDBMS type, for example
2677 C<VARCHAR2> to C<VARCHAR>.
2678
2679 Types with modifiers should map to the underlying data type. For example,
2680 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2681
2682 Composite types should map to the container type, for example
2683 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2684
2685 =cut
2686
2687 sub _native_data_type {
2688   #my ($self, $data_type) = @_;
2689   return undef
2690 }
2691
2692 # Check if placeholders are supported at all
2693 sub _determine_supports_placeholders {
2694   my $self = shift;
2695   my $dbh  = $self->_get_dbh;
2696
2697   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2698   # but it is inaccurate more often than not
2699   return try {
2700     local $dbh->{PrintError} = 0;
2701     local $dbh->{RaiseError} = 1;
2702     $dbh->do('select ?', {}, 1);
2703     1;
2704   }
2705   catch {
2706     0;
2707   };
2708 }
2709
2710 # Check if placeholders bound to non-string types throw exceptions
2711 #
2712 sub _determine_supports_typeless_placeholders {
2713   my $self = shift;
2714   my $dbh  = $self->_get_dbh;
2715
2716   return try {
2717     local $dbh->{PrintError} = 0;
2718     local $dbh->{RaiseError} = 1;
2719     # this specifically tests a bind that is NOT a string
2720     $dbh->do('select 1 where 1 = ?', {}, 1);
2721     1;
2722   }
2723   catch {
2724     0;
2725   };
2726 }
2727
2728 =head2 sqlt_type
2729
2730 Returns the database driver name.
2731
2732 =cut
2733
2734 sub sqlt_type {
2735   shift->_get_dbh->{Driver}->{Name};
2736 }
2737
2738 =head2 bind_attribute_by_data_type
2739
2740 Given a datatype from column info, returns a database specific bind
2741 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2742 let the database planner just handle it.
2743
2744 This method is always called after the driver has been determined and a DBI
2745 connection has been established. Therefore you can refer to C<DBI::$constant>
2746 and/or C<DBD::$driver::$constant> directly, without worrying about loading
2747 the correct modules.
2748
2749 =cut
2750
2751 sub bind_attribute_by_data_type {
2752     return;
2753 }
2754
2755 =head2 is_datatype_numeric
2756
2757 Given a datatype from column_info, returns a boolean value indicating if
2758 the current RDBMS considers it a numeric value. This controls how
2759 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2760 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2761 be performed instead of the usual C<eq>.
2762
2763 =cut
2764
2765 sub is_datatype_numeric {
2766   #my ($self, $dt) = @_;
2767
2768   return 0 unless $_[1];
2769
2770   $_[1] =~ /^ (?:
2771     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2772   ) $/ix;
2773 }
2774
2775
2776 =head2 create_ddl_dir
2777
2778 =over 4
2779
2780 =item Arguments: $schema, \@databases, $version, $directory, $preversion, \%sqlt_args
2781
2782 =back
2783
2784 Creates a SQL file based on the Schema, for each of the specified
2785 database engines in C<\@databases> in the given directory.
2786 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2787
2788 Given a previous version number, this will also create a file containing
2789 the ALTER TABLE statements to transform the previous schema into the
2790 current one. Note that these statements may contain C<DROP TABLE> or
2791 C<DROP COLUMN> statements that can potentially destroy data.
2792
2793 The file names are created using the C<ddl_filename> method below, please
2794 override this method in your schema if you would like a different file
2795 name format. For the ALTER file, the same format is used, replacing
2796 $version in the name with "$preversion-$version".
2797
2798 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2799 The most common value for this would be C<< { add_drop_table => 1 } >>
2800 to have the SQL produced include a C<DROP TABLE> statement for each table
2801 created. For quoting purposes supply C<quote_identifiers>.
2802
2803 If no arguments are passed, then the following default values are assumed:
2804
2805 =over 4
2806
2807 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2808
2809 =item version    - $schema->schema_version
2810
2811 =item directory  - './'
2812
2813 =item preversion - <none>
2814
2815 =back
2816
2817 By default, C<\%sqlt_args> will have
2818
2819  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2820
2821 merged with the hash passed in. To disable any of those features, pass in a
2822 hashref like the following
2823
2824  { ignore_constraint_names => 0, # ... other options }
2825
2826
2827 WARNING: You are strongly advised to check all SQL files created, before applying
2828 them.
2829
2830 =cut
2831
2832 sub create_ddl_dir {
2833   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2834
2835   unless ($dir) {
2836     carp "No directory given, using ./\n";
2837     $dir = './';
2838   } else {
2839       -d $dir
2840         or
2841       (require File::Path and File::Path::mkpath (["$dir"]))  # mkpath does not like objects (i.e. Path::Class::Dir)
2842         or
2843       $self->throw_exception(
2844         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2845       );
2846   }
2847
2848   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2849
2850   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2851   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2852
2853   my $schema_version = $schema->schema_version || '1.x';
2854   $version ||= $schema_version;
2855
2856   $sqltargs = {
2857     add_drop_table => 1,
2858     ignore_constraint_names => 1,
2859     ignore_index_names => 1,
2860     %{$sqltargs || {}}
2861   };
2862
2863   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2864     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2865   }
2866
2867   my $sqlt = SQL::Translator->new( $sqltargs );
2868
2869   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2870   my $sqlt_schema = $sqlt->translate({ data => $schema })
2871     or $self->throw_exception ($sqlt->error);
2872
2873   foreach my $db (@$databases) {
2874     $sqlt->reset();
2875     $sqlt->{schema} = $sqlt_schema;
2876     $sqlt->producer($db);
2877
2878     my $file;
2879     my $filename = $schema->ddl_filename($db, $version, $dir);
2880     if (-e $filename && ($version eq $schema_version )) {
2881       # if we are dumping the current version, overwrite the DDL
2882       carp "Overwriting existing DDL file - $filename";
2883       unlink($filename);
2884     }
2885
2886     my $output = $sqlt->translate;
2887     if(!$output) {
2888       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2889       next;
2890     }
2891     if(!open($file, ">$filename")) {
2892       $self->throw_exception("Can't open $filename for writing ($!)");
2893       next;
2894     }
2895     print $file $output;
2896     close($file);
2897
2898     next unless ($preversion);
2899
2900     require SQL::Translator::Diff;
2901
2902     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2903     if(!-e $prefilename) {
2904       carp("No previous schema file found ($prefilename)");
2905       next;
2906     }
2907
2908     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2909     if(-e $difffile) {
2910       carp("Overwriting existing diff file - $difffile");
2911       unlink($difffile);
2912     }
2913
2914     my $source_schema;
2915     {
2916       my $t = SQL::Translator->new($sqltargs);
2917       $t->debug( 0 );
2918       $t->trace( 0 );
2919
2920       $t->parser( $db )
2921         or $self->throw_exception ($t->error);
2922
2923       my $out = $t->translate( $prefilename )
2924         or $self->throw_exception ($t->error);
2925
2926       $source_schema = $t->schema;
2927
2928       $source_schema->name( $prefilename )
2929         unless ( $source_schema->name );
2930     }
2931
2932     # The "new" style of producers have sane normalization and can support
2933     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2934     # And we have to diff parsed SQL against parsed SQL.
2935     my $dest_schema = $sqlt_schema;
2936
2937     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2938       my $t = SQL::Translator->new($sqltargs);
2939       $t->debug( 0 );
2940       $t->trace( 0 );
2941
2942       $t->parser( $db )
2943         or $self->throw_exception ($t->error);
2944
2945       my $out = $t->translate( $filename )
2946         or $self->throw_exception ($t->error);
2947
2948       $dest_schema = $t->schema;
2949
2950       $dest_schema->name( $filename )
2951         unless $dest_schema->name;
2952     }
2953
2954     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2955                                                   $dest_schema,   $db,
2956                                                   $sqltargs
2957                                                  );
2958     if(!open $file, ">$difffile") {
2959       $self->throw_exception("Can't write to $difffile ($!)");
2960       next;
2961     }
2962     print $file $diff;
2963     close($file);
2964   }
2965 }
2966
2967 =head2 deployment_statements
2968
2969 =over 4
2970
2971 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2972
2973 =back
2974
2975 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2976
2977 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2978 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2979
2980 C<$directory> is used to return statements from files in a previously created
2981 L</create_ddl_dir> directory and is optional. The filenames are constructed
2982 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2983
2984 If no C<$directory> is specified then the statements are constructed on the
2985 fly using L<SQL::Translator> and C<$version> is ignored.
2986
2987 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2988
2989 =cut
2990
2991 sub deployment_statements {
2992   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2993   $type ||= $self->sqlt_type;
2994   $version ||= $schema->schema_version || '1.x';
2995   $dir ||= './';
2996   my $filename = $schema->ddl_filename($type, $version, $dir);
2997   if(-f $filename)
2998   {
2999       # FIXME replace this block when a proper sane sql parser is available
3000       my $file;
3001       open($file, "<$filename")
3002         or $self->throw_exception("Can't open $filename ($!)");
3003       my @rows = <$file>;
3004       close($file);
3005       return join('', @rows);
3006   }
3007
3008   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
3009     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
3010   }
3011
3012   # sources needs to be a parser arg, but for simplicity allow at top level
3013   # coming in
3014   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
3015       if exists $sqltargs->{sources};
3016
3017   my $tr = SQL::Translator->new(
3018     producer => "SQL::Translator::Producer::${type}",
3019     %$sqltargs,
3020     parser => 'SQL::Translator::Parser::DBIx::Class',
3021     data => $schema,
3022   );
3023
3024   return preserve_context {
3025     $tr->translate
3026   } after => sub {
3027     $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
3028       unless defined $_[0];
3029   };
3030 }
3031
3032 # FIXME deploy() currently does not accurately report sql errors
3033 # Will always return true while errors are warned
3034 sub deploy {
3035   my ($self, $schema, $type, $sqltargs, $dir) = @_;
3036   my $deploy = sub {
3037     my $line = shift;
3038     return if(!$line);
3039     return if($line =~ /^--/);
3040     # next if($line =~ /^DROP/m);
3041     return if($line =~ /^BEGIN TRANSACTION/m);
3042     return if($line =~ /^COMMIT/m);
3043     return if $line =~ /^\s+$/; # skip whitespace only
3044     $self->_query_start($line);
3045     try {
3046       # do a dbh_do cycle here, as we need some error checking in
3047       # place (even though we will ignore errors)
3048       $self->dbh_do (sub { $_[1]->do($line) });
3049     } catch {
3050       carp qq{$_ (running "${line}")};
3051     };
3052     $self->_query_end($line);
3053   };
3054   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
3055   if (@statements > 1) {
3056     foreach my $statement (@statements) {
3057       $deploy->( $statement );
3058     }
3059   }
3060   elsif (@statements == 1) {
3061     # split on single line comments and end of statements
3062     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
3063       $deploy->( $line );
3064     }
3065   }
3066 }
3067
3068 =head2 datetime_parser
3069
3070 Returns the datetime parser class
3071
3072 =cut
3073
3074 sub datetime_parser {
3075   my $self = shift;
3076   return $self->{datetime_parser} ||= do {
3077     $self->build_datetime_parser(@_);
3078   };
3079 }
3080
3081 =head2 datetime_parser_type
3082
3083 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
3084
3085 =head2 build_datetime_parser
3086
3087 See L</datetime_parser>
3088
3089 =cut
3090
3091 sub build_datetime_parser {
3092   my $self = shift;
3093   my $type = $self->datetime_parser_type(@_);
3094   return $type;
3095 }
3096
3097
3098 =head2 is_replicating
3099
3100 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
3101 replicate from a master database.  Default is undef, which is the result
3102 returned by databases that don't support replication.
3103
3104 =cut
3105
3106 sub is_replicating {
3107     return;
3108
3109 }
3110
3111 =head2 lag_behind_master
3112
3113 Returns a number that represents a certain amount of lag behind a master db
3114 when a given storage is replicating.  The number is database dependent, but
3115 starts at zero and increases with the amount of lag. Default in undef
3116
3117 =cut
3118
3119 sub lag_behind_master {
3120     return;
3121 }
3122
3123 =head2 relname_to_table_alias
3124
3125 =over 4
3126
3127 =item Arguments: $relname, $join_count
3128
3129 =item Return Value: $alias
3130
3131 =back
3132
3133 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
3134 queries.
3135
3136 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
3137 way these aliases are named.
3138
3139 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
3140 otherwise C<"$relname">.
3141
3142 =cut
3143
3144 sub relname_to_table_alias {
3145   my ($self, $relname, $join_count) = @_;
3146
3147   my $alias = ($join_count && $join_count > 1 ?
3148     join('_', $relname, $join_count) : $relname);
3149
3150   return $alias;
3151 }
3152
3153 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
3154 # version and it may be necessary to amend or override it for a specific storage
3155 # if such binds are necessary.
3156 sub _max_column_bytesize {
3157   my ($self, $attr) = @_;
3158
3159   my $max_size;
3160
3161   if ($attr->{sqlt_datatype}) {
3162     my $data_type = lc($attr->{sqlt_datatype});
3163
3164     if ($attr->{sqlt_size}) {
3165
3166       # String/sized-binary types
3167       if ($data_type =~ /^(?:
3168           l? (?:var)? char(?:acter)? (?:\s*varying)?
3169             |
3170           (?:var)? binary (?:\s*varying)?
3171             |
3172           raw
3173         )\b/x
3174       ) {
3175         $max_size = $attr->{sqlt_size};
3176       }
3177       # Other charset/unicode types, assume scale of 4
3178       elsif ($data_type =~ /^(?:
3179           national \s* character (?:\s*varying)?
3180             |
3181           nchar
3182             |
3183           univarchar
3184             |
3185           nvarchar
3186         )\b/x
3187       ) {
3188         $max_size = $attr->{sqlt_size} * 4;
3189       }
3190     }
3191
3192     if (!$max_size and !$self->_is_lob_type($data_type)) {
3193       $max_size = 100 # for all other (numeric?) datatypes
3194     }
3195   }
3196
3197   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3198 }
3199
3200 # Determine if a data_type is some type of BLOB
3201 sub _is_lob_type {
3202   my ($self, $data_type) = @_;
3203   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3204     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3205                                   |varchar|character\s*varying|nvarchar
3206                                   |national\s*character\s*varying))?\z/xi);
3207 }
3208
3209 sub _is_binary_lob_type {
3210   my ($self, $data_type) = @_;
3211   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3212     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3213 }
3214
3215 sub _is_text_lob_type {
3216   my ($self, $data_type) = @_;
3217   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3218     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3219                         |national\s*character\s*varying))\z/xi);
3220 }
3221
3222 # Determine if a data_type is some type of a binary type
3223 sub _is_binary_type {
3224   my ($self, $data_type) = @_;
3225   $data_type && ($self->_is_binary_lob_type($data_type)
3226     || $data_type =~ /(?:var)?(?:binary|bit|graphic)(?:\s*varying)?/i);
3227 }
3228
3229 1;
3230
3231 =head1 USAGE NOTES
3232
3233 =head2 DBIx::Class and AutoCommit
3234
3235 DBIx::Class can do some wonderful magic with handling exceptions,
3236 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3237 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
3238 transaction support.
3239
3240 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3241 in an assumed transaction between commits, and you're telling us you'd
3242 like to manage that manually.  A lot of the magic protections offered by
3243 this module will go away.  We can't protect you from exceptions due to database
3244 disconnects because we don't know anything about how to restart your
3245 transactions.  You're on your own for handling all sorts of exceptional
3246 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3247 be with raw DBI.
3248
3249
3250 =head1 AUTHOR AND CONTRIBUTORS
3251
3252 See L<AUTHOR|DBIx::Class/AUTHOR> and L<CONTRIBUTORS|DBIx::Class/CONTRIBUTORS> in DBIx::Class
3253
3254 =head1 LICENSE
3255
3256 You may distribute this code under the same terms as Perl itself.
3257
3258 =cut