23a7f71e30002658cd4496e8bb34fa1e9d363fbf
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use Scalar::Util qw/refaddr weaken reftype blessed/;
12 use List::Util qw/first/;
13 use Sub::Name 'subname';
14 use Context::Preserve 'preserve_context';
15 use Try::Tiny;
16 use overload ();
17 use Data::Compare (); # no imports!!! guard against insane architecture
18 use namespace::clean;
19
20 # default cursor class, overridable in connect_info attributes
21 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
22
23 __PACKAGE__->mk_group_accessors('inherited' => qw/
24   sql_limit_dialect sql_quote_char sql_name_sep
25 /);
26
27 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
28
29 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
30 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
31
32 __PACKAGE__->sql_name_sep('.');
33
34 __PACKAGE__->mk_group_accessors('simple' => qw/
35   _connect_info _dbic_connect_attributes _driver_determined
36   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
37   _perform_autoinc_retrieval _autoinc_supplied_for_op
38 /);
39
40 # the values for these accessors are picked out (and deleted) from
41 # the attribute hashref passed to connect_info
42 my @storage_options = qw/
43   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
44   disable_sth_caching unsafe auto_savepoint
45 /;
46 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
47
48
49 # capability definitions, using a 2-tiered accessor system
50 # The rationale is:
51 #
52 # A driver/user may define _use_X, which blindly without any checks says:
53 # "(do not) use this capability", (use_dbms_capability is an "inherited"
54 # type accessor)
55 #
56 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
57 # accessor, which in turn calls _determine_supports_X, and stores the return
58 # in a special slot on the storage object, which is wiped every time a $dbh
59 # reconnection takes place (it is not guaranteed that upon reconnection we
60 # will get the same rdbms version). _determine_supports_X does not need to
61 # exist on a driver, as we ->can for it before calling.
62
63 my @capabilities = (qw/
64   insert_returning
65   insert_returning_bound
66
67   multicolumn_in
68
69   placeholders
70   typeless_placeholders
71
72   join_optimizer
73 /);
74 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
75 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
76
77 # on by default, not strictly a capability (pending rewrite)
78 __PACKAGE__->_use_join_optimizer (1);
79 sub _determine_supports_join_optimizer { 1 };
80
81 # Each of these methods need _determine_driver called before itself
82 # in order to function reliably. This is a purely DRY optimization
83 #
84 # get_(use)_dbms_capability need to be called on the correct Storage
85 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
86 # _determine_supports_X which obv. needs a correct driver as well
87 my @rdbms_specific_methods = qw/
88   sqlt_type
89   deployment_statements
90
91   sql_maker
92   cursor_class
93
94   build_datetime_parser
95   datetime_parser_type
96
97   txn_begin
98
99   insert
100   insert_bulk
101   update
102   delete
103   select
104   select_single
105
106   with_deferred_fk_checks
107
108   get_use_dbms_capability
109   get_dbms_capability
110
111   _server_info
112   _get_server_version
113 /;
114
115 for my $meth (@rdbms_specific_methods) {
116
117   my $orig = __PACKAGE__->can ($meth)
118     or die "$meth is not a ::Storage::DBI method!";
119
120   no strict qw/refs/;
121   no warnings qw/redefine/;
122   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
123     if (
124       # only fire when invoked on an instance, a valid class-based invocation
125       # would e.g. be setting a default for an inherited accessor
126       ref $_[0]
127         and
128       ! $_[0]->{_driver_determined}
129         and
130       ! $_[0]->{_in_determine_driver}
131         and
132       # Only try to determine stuff if we have *something* that either is or can
133       # provide a DSN. Allows for bare $schema's generated with a plain ->connect()
134       # to still be marginally useful
135       $_[0]->_dbi_connect_info->[0]
136     ) {
137       $_[0]->_determine_driver;
138
139       # This for some reason crashes and burns on perl 5.8.1
140       # IFF the method ends up throwing an exception
141       #goto $_[0]->can ($meth);
142
143       my $cref = $_[0]->can ($meth);
144       goto $cref;
145     }
146
147     goto $orig;
148   };
149 }
150
151 =head1 NAME
152
153 DBIx::Class::Storage::DBI - DBI storage handler
154
155 =head1 SYNOPSIS
156
157   my $schema = MySchema->connect('dbi:SQLite:my.db');
158
159   $schema->storage->debug(1);
160
161   my @stuff = $schema->storage->dbh_do(
162     sub {
163       my ($storage, $dbh, @args) = @_;
164       $dbh->do("DROP TABLE authors");
165     },
166     @column_list
167   );
168
169   $schema->resultset('Book')->search({
170      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
171   });
172
173 =head1 DESCRIPTION
174
175 This class represents the connection to an RDBMS via L<DBI>.  See
176 L<DBIx::Class::Storage> for general information.  This pod only
177 documents DBI-specific methods and behaviors.
178
179 =head1 METHODS
180
181 =cut
182
183 sub new {
184   my $new = shift->next::method(@_);
185
186   $new->_sql_maker_opts({});
187   $new->_dbh_details({});
188   $new->{_in_do_block} = 0;
189
190   # read below to see what this does
191   $new->_arm_global_destructor;
192
193   $new;
194 }
195
196 # This is hack to work around perl shooting stuff in random
197 # order on exit(). If we do not walk the remaining storage
198 # objects in an END block, there is a *small but real* chance
199 # of a fork()ed child to kill the parent's shared DBI handle,
200 # *before perl reaches the DESTROY in this package*
201 # Yes, it is ugly and effective.
202 # Additionally this registry is used by the CLONE method to
203 # make sure no handles are shared between threads
204 {
205   my %seek_and_destroy;
206
207   sub _arm_global_destructor {
208
209     # quick "garbage collection" pass - prevents the registry
210     # from slowly growing with a bunch of undef-valued keys
211     defined $seek_and_destroy{$_} or delete $seek_and_destroy{$_}
212       for keys %seek_and_destroy;
213
214     weaken (
215       $seek_and_destroy{ refaddr($_[0]) } = $_[0]
216     );
217   }
218
219   END {
220     local $?; # just in case the DBI destructor changes it somehow
221
222     # destroy just the object if not native to this process
223     $_->_verify_pid for (grep
224       { defined $_ }
225       values %seek_and_destroy
226     );
227   }
228
229   sub CLONE {
230     # As per DBI's recommendation, DBIC disconnects all handles as
231     # soon as possible (DBIC will reconnect only on demand from within
232     # the thread)
233     my @instances = grep { defined $_ } values %seek_and_destroy;
234     %seek_and_destroy = ();
235
236     for (@instances) {
237       $_->_dbh(undef);
238
239       $_->transaction_depth(0);
240       $_->savepoints([]);
241
242       # properly renumber existing refs
243       $_->_arm_global_destructor
244     }
245   }
246 }
247
248 sub DESTROY {
249   my $self = shift;
250
251   # some databases spew warnings on implicit disconnect
252   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
253   local $SIG{__WARN__} = sub {};
254   $self->_dbh(undef);
255
256   # this op is necessary, since the very last perl runtime statement
257   # triggers a global destruction shootout, and the $SIG localization
258   # may very well be destroyed before perl actually gets to do the
259   # $dbh undef
260   1;
261 }
262
263 # handle pid changes correctly - do not destroy parent's connection
264 sub _verify_pid {
265   my $self = shift;
266
267   my $pid = $self->_conn_pid;
268   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
269     $dbh->{InactiveDestroy} = 1;
270     $self->_dbh(undef);
271     $self->transaction_depth(0);
272     $self->savepoints([]);
273   }
274
275   return;
276 }
277
278 =head2 connect_info
279
280 This method is normally called by L<DBIx::Class::Schema/connection>, which
281 encapsulates its argument list in an arrayref before passing them here.
282
283 The argument list may contain:
284
285 =over
286
287 =item *
288
289 The same 4-element argument set one would normally pass to
290 L<DBI/connect>, optionally followed by
291 L<extra attributes|/DBIx::Class specific connection attributes>
292 recognized by DBIx::Class:
293
294   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
295
296 =item *
297
298 A single code reference which returns a connected
299 L<DBI database handle|DBI/connect> optionally followed by
300 L<extra attributes|/DBIx::Class specific connection attributes> recognized
301 by DBIx::Class:
302
303   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
304
305 =item *
306
307 A single hashref with all the attributes and the dsn/user/password
308 mixed together:
309
310   $connect_info_args = [{
311     dsn => $dsn,
312     user => $user,
313     password => $pass,
314     %dbi_attributes,
315     %extra_attributes,
316   }];
317
318   $connect_info_args = [{
319     dbh_maker => sub { DBI->connect (...) },
320     %dbi_attributes,
321     %extra_attributes,
322   }];
323
324 This is particularly useful for L<Catalyst> based applications, allowing the
325 following config (L<Config::General> style):
326
327   <Model::DB>
328     schema_class   App::DB
329     <connect_info>
330       dsn          dbi:mysql:database=test
331       user         testuser
332       password     TestPass
333       AutoCommit   1
334     </connect_info>
335   </Model::DB>
336
337 The C<dsn>/C<user>/C<password> combination can be substituted by the
338 C<dbh_maker> key whose value is a coderef that returns a connected
339 L<DBI database handle|DBI/connect>
340
341 =back
342
343 Please note that the L<DBI> docs recommend that you always explicitly
344 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
345 recommends that it be set to I<1>, and that you perform transactions
346 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
347 to I<1> if you do not do explicitly set it to zero.  This is the default
348 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
349
350 =head3 DBIx::Class specific connection attributes
351
352 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
353 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
354 the following connection options. These options can be mixed in with your other
355 L<DBI> connection attributes, or placed in a separate hashref
356 (C<\%extra_attributes>) as shown above.
357
358 Every time C<connect_info> is invoked, any previous settings for
359 these options will be cleared before setting the new ones, regardless of
360 whether any options are specified in the new C<connect_info>.
361
362
363 =over
364
365 =item on_connect_do
366
367 Specifies things to do immediately after connecting or re-connecting to
368 the database.  Its value may contain:
369
370 =over
371
372 =item a scalar
373
374 This contains one SQL statement to execute.
375
376 =item an array reference
377
378 This contains SQL statements to execute in order.  Each element contains
379 a string or a code reference that returns a string.
380
381 =item a code reference
382
383 This contains some code to execute.  Unlike code references within an
384 array reference, its return value is ignored.
385
386 =back
387
388 =item on_disconnect_do
389
390 Takes arguments in the same form as L</on_connect_do> and executes them
391 immediately before disconnecting from the database.
392
393 Note, this only runs if you explicitly call L</disconnect> on the
394 storage object.
395
396 =item on_connect_call
397
398 A more generalized form of L</on_connect_do> that calls the specified
399 C<connect_call_METHOD> methods in your storage driver.
400
401   on_connect_do => 'select 1'
402
403 is equivalent to:
404
405   on_connect_call => [ [ do_sql => 'select 1' ] ]
406
407 Its values may contain:
408
409 =over
410
411 =item a scalar
412
413 Will call the C<connect_call_METHOD> method.
414
415 =item a code reference
416
417 Will execute C<< $code->($storage) >>
418
419 =item an array reference
420
421 Each value can be a method name or code reference.
422
423 =item an array of arrays
424
425 For each array, the first item is taken to be the C<connect_call_> method name
426 or code reference, and the rest are parameters to it.
427
428 =back
429
430 Some predefined storage methods you may use:
431
432 =over
433
434 =item do_sql
435
436 Executes a SQL string or a code reference that returns a SQL string. This is
437 what L</on_connect_do> and L</on_disconnect_do> use.
438
439 It can take:
440
441 =over
442
443 =item a scalar
444
445 Will execute the scalar as SQL.
446
447 =item an arrayref
448
449 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
450 attributes hashref and bind values.
451
452 =item a code reference
453
454 Will execute C<< $code->($storage) >> and execute the return array refs as
455 above.
456
457 =back
458
459 =item datetime_setup
460
461 Execute any statements necessary to initialize the database session to return
462 and accept datetime/timestamp values used with
463 L<DBIx::Class::InflateColumn::DateTime>.
464
465 Only necessary for some databases, see your specific storage driver for
466 implementation details.
467
468 =back
469
470 =item on_disconnect_call
471
472 Takes arguments in the same form as L</on_connect_call> and executes them
473 immediately before disconnecting from the database.
474
475 Calls the C<disconnect_call_METHOD> methods as opposed to the
476 C<connect_call_METHOD> methods called by L</on_connect_call>.
477
478 Note, this only runs if you explicitly call L</disconnect> on the
479 storage object.
480
481 =item disable_sth_caching
482
483 If set to a true value, this option will disable the caching of
484 statement handles via L<DBI/prepare_cached>.
485
486 =item limit_dialect
487
488 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
489 default L</sql_limit_dialect> setting of the storage (if any). For a list
490 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
491
492 =item quote_names
493
494 When true automatically sets L</quote_char> and L</name_sep> to the characters
495 appropriate for your particular RDBMS. This option is preferred over specifying
496 L</quote_char> directly.
497
498 =item quote_char
499
500 Specifies what characters to use to quote table and column names.
501
502 C<quote_char> expects either a single character, in which case is it
503 is placed on either side of the table/column name, or an arrayref of length
504 2 in which case the table/column name is placed between the elements.
505
506 For example under MySQL you should use C<< quote_char => '`' >>, and for
507 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
508
509 =item name_sep
510
511 This parameter is only useful in conjunction with C<quote_char>, and is used to
512 specify the character that separates elements (schemas, tables, columns) from
513 each other. If unspecified it defaults to the most commonly used C<.>.
514
515 =item unsafe
516
517 This Storage driver normally installs its own C<HandleError>, sets
518 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
519 all database handles, including those supplied by a coderef.  It does this
520 so that it can have consistent and useful error behavior.
521
522 If you set this option to a true value, Storage will not do its usual
523 modifications to the database handle's attributes, and instead relies on
524 the settings in your connect_info DBI options (or the values you set in
525 your connection coderef, in the case that you are connecting via coderef).
526
527 Note that your custom settings can cause Storage to malfunction,
528 especially if you set a C<HandleError> handler that suppresses exceptions
529 and/or disable C<RaiseError>.
530
531 =item auto_savepoint
532
533 If this option is true, L<DBIx::Class> will use savepoints when nesting
534 transactions, making it possible to recover from failure in the inner
535 transaction without having to abort all outer transactions.
536
537 =item cursor_class
538
539 Use this argument to supply a cursor class other than the default
540 L<DBIx::Class::Storage::DBI::Cursor>.
541
542 =back
543
544 Some real-life examples of arguments to L</connect_info> and
545 L<DBIx::Class::Schema/connect>
546
547   # Simple SQLite connection
548   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
549
550   # Connect via subref
551   ->connect_info([ sub { DBI->connect(...) } ]);
552
553   # Connect via subref in hashref
554   ->connect_info([{
555     dbh_maker => sub { DBI->connect(...) },
556     on_connect_do => 'alter session ...',
557   }]);
558
559   # A bit more complicated
560   ->connect_info(
561     [
562       'dbi:Pg:dbname=foo',
563       'postgres',
564       'my_pg_password',
565       { AutoCommit => 1 },
566       { quote_char => q{"} },
567     ]
568   );
569
570   # Equivalent to the previous example
571   ->connect_info(
572     [
573       'dbi:Pg:dbname=foo',
574       'postgres',
575       'my_pg_password',
576       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
577     ]
578   );
579
580   # Same, but with hashref as argument
581   # See parse_connect_info for explanation
582   ->connect_info(
583     [{
584       dsn         => 'dbi:Pg:dbname=foo',
585       user        => 'postgres',
586       password    => 'my_pg_password',
587       AutoCommit  => 1,
588       quote_char  => q{"},
589       name_sep    => q{.},
590     }]
591   );
592
593   # Subref + DBIx::Class-specific connection options
594   ->connect_info(
595     [
596       sub { DBI->connect(...) },
597       {
598           quote_char => q{`},
599           name_sep => q{@},
600           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
601           disable_sth_caching => 1,
602       },
603     ]
604   );
605
606
607
608 =cut
609
610 sub connect_info {
611   my ($self, $info) = @_;
612
613   return $self->_connect_info if !$info;
614
615   $self->_connect_info($info); # copy for _connect_info
616
617   $info = $self->_normalize_connect_info($info)
618     if ref $info eq 'ARRAY';
619
620   my %attrs = (
621     %{ $self->_default_dbi_connect_attributes || {} },
622     %{ $info->{attributes} || {} },
623   );
624
625   my @args = @{ $info->{arguments} };
626
627   if (keys %attrs and ref $args[0] ne 'CODE') {
628     carp_unique (
629         'You provided explicit AutoCommit => 0 in your connection_info. '
630       . 'This is almost universally a bad idea (see the footnotes of '
631       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
632       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
633       . 'this warning.'
634     ) if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
635
636     push @args, \%attrs if keys %attrs;
637   }
638
639   # this is the authoritative "always an arrayref" thing fed to DBI->connect
640   # OR a single-element coderef-based $dbh factory
641   $self->_dbi_connect_info(\@args);
642
643   # extract the individual storage options
644   for my $storage_opt (keys %{ $info->{storage_options} }) {
645     my $value = $info->{storage_options}{$storage_opt};
646
647     $self->$storage_opt($value);
648   }
649
650   # Extract the individual sqlmaker options
651   #
652   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
653   #  the new set of options
654   $self->_sql_maker(undef);
655   $self->_sql_maker_opts({});
656
657   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
658     my $value = $info->{sql_maker_options}{$sql_maker_opt};
659
660     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
661   }
662
663   # FIXME - dirty:
664   # save attributes in a separate accessor so they are always
665   # introspectable, even in case of a CODE $dbhmaker
666   $self->_dbic_connect_attributes (\%attrs);
667
668   return $self->_connect_info;
669 }
670
671 sub _dbi_connect_info {
672   my $self = shift;
673
674   return $self->{_dbi_connect_info} = $_[0]
675     if @_;
676
677   my $conninfo = $self->{_dbi_connect_info} || [];
678
679   # last ditch effort to grab a DSN
680   if ( ! defined $conninfo->[0] and $ENV{DBI_DSN} ) {
681     my @new_conninfo = @$conninfo;
682     $new_conninfo[0] = $ENV{DBI_DSN};
683     $conninfo = \@new_conninfo;
684   }
685
686   return $conninfo;
687 }
688
689
690 sub _normalize_connect_info {
691   my ($self, $info_arg) = @_;
692   my %info;
693
694   my @args = @$info_arg;  # take a shallow copy for further mutilation
695
696   # combine/pre-parse arguments depending on invocation style
697
698   my %attrs;
699   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
700     %attrs = %{ $args[1] || {} };
701     @args = $args[0];
702   }
703   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
704     %attrs = %{$args[0]};
705     @args = ();
706     if (my $code = delete $attrs{dbh_maker}) {
707       @args = $code;
708
709       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
710       if (@ignored) {
711         carp sprintf (
712             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
713           . "to the result of 'dbh_maker'",
714
715           join (', ', map { "'$_'" } (@ignored) ),
716         );
717       }
718     }
719     else {
720       @args = delete @attrs{qw/dsn user password/};
721     }
722   }
723   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
724     %attrs = (
725       % { $args[3] || {} },
726       % { $args[4] || {} },
727     );
728     @args = @args[0,1,2];
729   }
730
731   $info{arguments} = \@args;
732
733   my @storage_opts = grep exists $attrs{$_},
734     @storage_options, 'cursor_class';
735
736   @{ $info{storage_options} }{@storage_opts} =
737     delete @attrs{@storage_opts} if @storage_opts;
738
739   my @sql_maker_opts = grep exists $attrs{$_},
740     qw/limit_dialect quote_char name_sep quote_names/;
741
742   @{ $info{sql_maker_options} }{@sql_maker_opts} =
743     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
744
745   $info{attributes} = \%attrs if %attrs;
746
747   return \%info;
748 }
749
750 sub _default_dbi_connect_attributes () {
751   +{
752     AutoCommit => 1,
753     PrintError => 0,
754     RaiseError => 1,
755     ShowErrorStatement => 1,
756   };
757 }
758
759 =head2 on_connect_do
760
761 This method is deprecated in favour of setting via L</connect_info>.
762
763 =cut
764
765 =head2 on_disconnect_do
766
767 This method is deprecated in favour of setting via L</connect_info>.
768
769 =cut
770
771 sub _parse_connect_do {
772   my ($self, $type) = @_;
773
774   my $val = $self->$type;
775   return () if not defined $val;
776
777   my @res;
778
779   if (not ref($val)) {
780     push @res, [ 'do_sql', $val ];
781   } elsif (ref($val) eq 'CODE') {
782     push @res, $val;
783   } elsif (ref($val) eq 'ARRAY') {
784     push @res, map { [ 'do_sql', $_ ] } @$val;
785   } else {
786     $self->throw_exception("Invalid type for $type: ".ref($val));
787   }
788
789   return \@res;
790 }
791
792 =head2 dbh_do
793
794 Arguments: ($subref | $method_name), @extra_coderef_args?
795
796 Execute the given $subref or $method_name using the new exception-based
797 connection management.
798
799 The first two arguments will be the storage object that C<dbh_do> was called
800 on and a database handle to use.  Any additional arguments will be passed
801 verbatim to the called subref as arguments 2 and onwards.
802
803 Using this (instead of $self->_dbh or $self->dbh) ensures correct
804 exception handling and reconnection (or failover in future subclasses).
805
806 Your subref should have no side-effects outside of the database, as
807 there is the potential for your subref to be partially double-executed
808 if the database connection was stale/dysfunctional.
809
810 Example:
811
812   my @stuff = $schema->storage->dbh_do(
813     sub {
814       my ($storage, $dbh, @cols) = @_;
815       my $cols = join(q{, }, @cols);
816       $dbh->selectrow_array("SELECT $cols FROM foo");
817     },
818     @column_list
819   );
820
821 =cut
822
823 sub dbh_do {
824   my $self = shift;
825   my $run_target = shift; # either a coderef or a method name
826
827   # short circuit when we know there is no need for a runner
828   #
829   # FIXME - assumption may be wrong
830   # the rationale for the txn_depth check is that if this block is a part
831   # of a larger transaction, everything up to that point is screwed anyway
832   return $self->$run_target($self->_get_dbh, @_)
833     if $self->{_in_do_block} or $self->transaction_depth;
834
835   # take a ref instead of a copy, to preserve @_ aliasing
836   # semantics within the coderef, but only if needed
837   # (pseudoforking doesn't like this trick much)
838   my $args = @_ ? \@_ : [];
839
840   DBIx::Class::Storage::BlockRunner->new(
841     storage => $self,
842     wrap_txn => 0,
843     retry_handler => sub {
844       $_[0]->failed_attempt_count == 1
845         and
846       ! $_[0]->storage->connected
847     },
848   )->run(sub {
849     $self->$run_target ($self->_get_dbh, @$args )
850   });
851 }
852
853 sub txn_do {
854   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
855   shift->next::method(@_);
856 }
857
858 =head2 disconnect
859
860 Our C<disconnect> method also performs a rollback first if the
861 database is not in C<AutoCommit> mode.
862
863 =cut
864
865 sub disconnect {
866   my ($self) = @_;
867
868   if( $self->_dbh ) {
869     my @actions;
870
871     push @actions, ( $self->on_disconnect_call || () );
872     push @actions, $self->_parse_connect_do ('on_disconnect_do');
873
874     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
875
876     # stops the "implicit rollback on disconnect" warning
877     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
878
879     %{ $self->_dbh->{CachedKids} } = ();
880     $self->_dbh->disconnect;
881     $self->_dbh(undef);
882   }
883 }
884
885 =head2 with_deferred_fk_checks
886
887 =over 4
888
889 =item Arguments: C<$coderef>
890
891 =item Return Value: The return value of $coderef
892
893 =back
894
895 Storage specific method to run the code ref with FK checks deferred or
896 in MySQL's case disabled entirely.
897
898 =cut
899
900 # Storage subclasses should override this
901 sub with_deferred_fk_checks {
902   my ($self, $sub) = @_;
903   $sub->();
904 }
905
906 =head2 connected
907
908 =over
909
910 =item Arguments: none
911
912 =item Return Value: 1|0
913
914 =back
915
916 Verifies that the current database handle is active and ready to execute
917 an SQL statement (e.g. the connection did not get stale, server is still
918 answering, etc.) This method is used internally by L</dbh>.
919
920 =cut
921
922 sub connected {
923   my $self = shift;
924   return 0 unless $self->_seems_connected;
925
926   #be on the safe side
927   local $self->_dbh->{RaiseError} = 1;
928
929   return $self->_ping;
930 }
931
932 sub _seems_connected {
933   my $self = shift;
934
935   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
936
937   my $dbh = $self->_dbh
938     or return 0;
939
940   return $dbh->FETCH('Active');
941 }
942
943 sub _ping {
944   my $self = shift;
945
946   my $dbh = $self->_dbh or return 0;
947
948   return $dbh->ping;
949 }
950
951 sub ensure_connected {
952   my ($self) = @_;
953
954   unless ($self->connected) {
955     $self->_populate_dbh;
956   }
957 }
958
959 =head2 dbh
960
961 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
962 is guaranteed to be healthy by implicitly calling L</connected>, and if
963 necessary performing a reconnection before returning. Keep in mind that this
964 is very B<expensive> on some database engines. Consider using L</dbh_do>
965 instead.
966
967 =cut
968
969 sub dbh {
970   my ($self) = @_;
971
972   if (not $self->_dbh) {
973     $self->_populate_dbh;
974   } else {
975     $self->ensure_connected;
976   }
977   return $self->_dbh;
978 }
979
980 # this is the internal "get dbh or connect (don't check)" method
981 sub _get_dbh {
982   my $self = shift;
983   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
984   $self->_populate_dbh unless $self->_dbh;
985   return $self->_dbh;
986 }
987
988 sub sql_maker {
989   my ($self) = @_;
990   unless ($self->_sql_maker) {
991     my $sql_maker_class = $self->sql_maker_class;
992
993     my %opts = %{$self->_sql_maker_opts||{}};
994     my $dialect =
995       $opts{limit_dialect}
996         ||
997       $self->sql_limit_dialect
998         ||
999       do {
1000         my $s_class = (ref $self) || $self;
1001         carp_unique (
1002           "Your storage class ($s_class) does not set sql_limit_dialect and you "
1003         . 'have not supplied an explicit limit_dialect in your connection_info. '
1004         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1005         . 'databases but can be (and often is) painfully slow. '
1006         . "Please file an RT ticket against '$s_class'"
1007         ) if $self->_dbi_connect_info->[0];
1008
1009         'GenericSubQ';
1010       }
1011     ;
1012
1013     my ($quote_char, $name_sep);
1014
1015     if ($opts{quote_names}) {
1016       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1017         my $s_class = (ref $self) || $self;
1018         carp_unique (
1019           "You requested 'quote_names' but your storage class ($s_class) does "
1020         . 'not explicitly define a default sql_quote_char and you have not '
1021         . 'supplied a quote_char as part of your connection_info. DBIC will '
1022         .q{default to the ANSI SQL standard quote '"', which works most of }
1023         . "the time. Please file an RT ticket against '$s_class'."
1024         );
1025
1026         '"'; # RV
1027       };
1028
1029       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1030     }
1031
1032     $self->_sql_maker($sql_maker_class->new(
1033       bindtype=>'columns',
1034       array_datatypes => 1,
1035       limit_dialect => $dialect,
1036       ($quote_char ? (quote_char => $quote_char) : ()),
1037       name_sep => ($name_sep || '.'),
1038       %opts,
1039     ));
1040   }
1041   return $self->_sql_maker;
1042 }
1043
1044 # nothing to do by default
1045 sub _rebless {}
1046 sub _init {}
1047
1048 sub _populate_dbh {
1049   my ($self) = @_;
1050
1051   $self->_dbh(undef); # in case ->connected failed we might get sent here
1052   $self->_dbh_details({}); # reset everything we know
1053
1054   $self->_dbh($self->_connect);
1055
1056   $self->_conn_pid($$) unless DBIx::Class::_ENV_::BROKEN_FORK; # on win32 these are in fact threads
1057
1058   $self->_determine_driver;
1059
1060   # Always set the transaction depth on connect, since
1061   #  there is no transaction in progress by definition
1062   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1063
1064   $self->_run_connection_actions unless $self->{_in_determine_driver};
1065 }
1066
1067 sub _run_connection_actions {
1068   my $self = shift;
1069   my @actions;
1070
1071   push @actions, ( $self->on_connect_call || () );
1072   push @actions, $self->_parse_connect_do ('on_connect_do');
1073
1074   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1075 }
1076
1077
1078
1079 sub set_use_dbms_capability {
1080   $_[0]->set_inherited ($_[1], $_[2]);
1081 }
1082
1083 sub get_use_dbms_capability {
1084   my ($self, $capname) = @_;
1085
1086   my $use = $self->get_inherited ($capname);
1087   return defined $use
1088     ? $use
1089     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1090   ;
1091 }
1092
1093 sub set_dbms_capability {
1094   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1095 }
1096
1097 sub get_dbms_capability {
1098   my ($self, $capname) = @_;
1099
1100   my $cap = $self->_dbh_details->{capability}{$capname};
1101
1102   unless (defined $cap) {
1103     if (my $meth = $self->can ("_determine$capname")) {
1104       $cap = $self->$meth ? 1 : 0;
1105     }
1106     else {
1107       $cap = 0;
1108     }
1109
1110     $self->set_dbms_capability ($capname, $cap);
1111   }
1112
1113   return $cap;
1114 }
1115
1116 sub _server_info {
1117   my $self = shift;
1118
1119   my $info;
1120   unless ($info = $self->_dbh_details->{info}) {
1121
1122     $info = {};
1123
1124     my $server_version = try {
1125       $self->_get_server_version
1126     } catch {
1127       # driver determination *may* use this codepath
1128       # in which case we must rethrow
1129       $self->throw_exception($_) if $self->{_in_determine_driver};
1130
1131       # $server_version on failure
1132       undef;
1133     };
1134
1135     if (defined $server_version) {
1136       $info->{dbms_version} = $server_version;
1137
1138       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1139       my @verparts = split (/\./, $numeric_version);
1140       if (
1141         @verparts
1142           &&
1143         $verparts[0] <= 999
1144       ) {
1145         # consider only up to 3 version parts, iff not more than 3 digits
1146         my @use_parts;
1147         while (@verparts && @use_parts < 3) {
1148           my $p = shift @verparts;
1149           last if $p > 999;
1150           push @use_parts, $p;
1151         }
1152         push @use_parts, 0 while @use_parts < 3;
1153
1154         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1155       }
1156     }
1157
1158     $self->_dbh_details->{info} = $info;
1159   }
1160
1161   return $info;
1162 }
1163
1164 sub _get_server_version {
1165   shift->_dbh_get_info('SQL_DBMS_VER');
1166 }
1167
1168 sub _dbh_get_info {
1169   my ($self, $info) = @_;
1170
1171   if ($info =~ /[^0-9]/) {
1172     require DBI::Const::GetInfoType;
1173     $info = $DBI::Const::GetInfoType::GetInfoType{$info};
1174     $self->throw_exception("Info type '$_[1]' not provided by DBI::Const::GetInfoType")
1175       unless defined $info;
1176   }
1177
1178   $self->_get_dbh->get_info($info);
1179 }
1180
1181 sub _describe_connection {
1182   require DBI::Const::GetInfoReturn;
1183
1184   my $self = shift;
1185
1186   my $drv;
1187   try {
1188     $drv = $self->_extract_driver_from_connect_info;
1189     $self->ensure_connected;
1190   };
1191
1192   $drv = "DBD::$drv" if $drv;
1193
1194   my $res = {
1195     DBIC_DSN => $self->_dbi_connect_info->[0],
1196     DBI_VER => DBI->VERSION,
1197     DBIC_VER => DBIx::Class->VERSION,
1198     DBIC_DRIVER => ref $self,
1199     $drv ? (
1200       DBD => $drv,
1201       DBD_VER => try { $drv->VERSION },
1202     ) : (),
1203   };
1204
1205   # try to grab data even if we never managed to connect
1206   # will cover us in cases of an oddly broken half-connect
1207   for my $inf (
1208     #keys %DBI::Const::GetInfoType::GetInfoType,
1209     qw/
1210       SQL_CURSOR_COMMIT_BEHAVIOR
1211       SQL_CURSOR_ROLLBACK_BEHAVIOR
1212       SQL_CURSOR_SENSITIVITY
1213       SQL_DATA_SOURCE_NAME
1214       SQL_DBMS_NAME
1215       SQL_DBMS_VER
1216       SQL_DEFAULT_TXN_ISOLATION
1217       SQL_DM_VER
1218       SQL_DRIVER_NAME
1219       SQL_DRIVER_ODBC_VER
1220       SQL_DRIVER_VER
1221       SQL_EXPRESSIONS_IN_ORDERBY
1222       SQL_GROUP_BY
1223       SQL_IDENTIFIER_CASE
1224       SQL_IDENTIFIER_QUOTE_CHAR
1225       SQL_MAX_CATALOG_NAME_LEN
1226       SQL_MAX_COLUMN_NAME_LEN
1227       SQL_MAX_IDENTIFIER_LEN
1228       SQL_MAX_TABLE_NAME_LEN
1229       SQL_MULTIPLE_ACTIVE_TXN
1230       SQL_MULT_RESULT_SETS
1231       SQL_NEED_LONG_DATA_LEN
1232       SQL_NON_NULLABLE_COLUMNS
1233       SQL_ODBC_VER
1234       SQL_QUALIFIER_NAME_SEPARATOR
1235       SQL_QUOTED_IDENTIFIER_CASE
1236       SQL_TXN_CAPABLE
1237       SQL_TXN_ISOLATION_OPTION
1238     /
1239   ) {
1240     # some drivers barf on things they do not know about instead
1241     # of returning undef
1242     my $v = try { $self->_dbh_get_info($inf) };
1243     next unless defined $v;
1244
1245     #my $key = sprintf( '%s(%s)', $inf, $DBI::Const::GetInfoType::GetInfoType{$inf} );
1246     my $expl = DBI::Const::GetInfoReturn::Explain($inf, $v);
1247     $res->{$inf} = DBI::Const::GetInfoReturn::Format($inf, $v) . ( $expl ? " ($expl)" : '' );
1248   }
1249
1250   $res;
1251 }
1252
1253 sub _determine_driver {
1254   my ($self) = @_;
1255
1256   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1257     my $started_connected = 0;
1258     local $self->{_in_determine_driver} = 1;
1259
1260     if (ref($self) eq __PACKAGE__) {
1261       my $driver;
1262       if ($self->_dbh) { # we are connected
1263         $driver = $self->_dbh->{Driver}{Name};
1264         $started_connected = 1;
1265       }
1266       else {
1267         $driver = $self->_extract_driver_from_connect_info;
1268       }
1269
1270       if ($driver) {
1271         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1272         if ($self->load_optional_class($storage_class)) {
1273           mro::set_mro($storage_class, 'c3');
1274           bless $self, $storage_class;
1275           $self->_rebless();
1276         }
1277         else {
1278           $self->_warn_undetermined_driver(
1279             'This version of DBIC does not yet seem to supply a driver for '
1280           . "your particular RDBMS and/or connection method ('$driver')."
1281           );
1282         }
1283       }
1284       else {
1285         $self->_warn_undetermined_driver(
1286           'Unable to extract a driver name from connect info - this '
1287         . 'should not have happened.'
1288         );
1289       }
1290     }
1291
1292     $self->_driver_determined(1);
1293
1294     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1295
1296     if ($self->can('source_bind_attributes')) {
1297       $self->throw_exception(
1298         "Your storage subclass @{[ ref $self ]} provides (or inherits) the method "
1299       . 'source_bind_attributes() for which support has been removed as of Jan 2013. '
1300       . 'If you are not sure how to proceed please contact the development team via '
1301       . 'http://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT'
1302       );
1303     }
1304
1305     $self->_init; # run driver-specific initializations
1306
1307     $self->_run_connection_actions
1308         if !$started_connected && defined $self->_dbh;
1309   }
1310 }
1311
1312 sub _extract_driver_from_connect_info {
1313   my $self = shift;
1314
1315   my $drv;
1316
1317   # if connect_info is a CODEREF, we have no choice but to connect
1318   if (
1319     ref $self->_dbi_connect_info->[0]
1320       and
1321     reftype $self->_dbi_connect_info->[0] eq 'CODE'
1322   ) {
1323     $self->_populate_dbh;
1324     $drv = $self->_dbh->{Driver}{Name};
1325   }
1326   else {
1327     # try to use dsn to not require being connected, the driver may still
1328     # force a connection later in _rebless to determine version
1329     # (dsn may not be supplied at all if all we do is make a mock-schema)
1330     ($drv) = ($self->_dbi_connect_info->[0] || '') =~ /^dbi:([^:]+):/i;
1331     $drv ||= $ENV{DBI_DRIVER};
1332   }
1333
1334   return $drv;
1335 }
1336
1337 sub _determine_connector_driver {
1338   my ($self, $conn) = @_;
1339
1340   my $dbtype = $self->_dbh_get_info('SQL_DBMS_NAME');
1341
1342   if (not $dbtype) {
1343     $self->_warn_undetermined_driver(
1344       'Unable to retrieve RDBMS type (SQL_DBMS_NAME) of the engine behind your '
1345     . "$conn connector - this should not have happened."
1346     );
1347     return;
1348   }
1349
1350   $dbtype =~ s/\W/_/gi;
1351
1352   my $subclass = "DBIx::Class::Storage::DBI::${conn}::${dbtype}";
1353   return if $self->isa($subclass);
1354
1355   if ($self->load_optional_class($subclass)) {
1356     bless $self, $subclass;
1357     $self->_rebless;
1358   }
1359   else {
1360     $self->_warn_undetermined_driver(
1361       'This version of DBIC does not yet seem to supply a driver for '
1362     . "your particular RDBMS and/or connection method ('$conn/$dbtype')."
1363     );
1364   }
1365 }
1366
1367 sub _warn_undetermined_driver {
1368   my ($self, $msg) = @_;
1369
1370   require Data::Dumper::Concise;
1371
1372   carp_once ($msg . ' While we will attempt to continue anyway, the results '
1373   . 'are likely to be underwhelming. Please upgrade DBIC, and if this message '
1374   . "does not go away, file a bugreport including the following info:\n"
1375   . Data::Dumper::Concise::Dumper($self->_describe_connection)
1376   );
1377 }
1378
1379 sub _do_connection_actions {
1380   my $self          = shift;
1381   my $method_prefix = shift;
1382   my $call          = shift;
1383
1384   if (not ref($call)) {
1385     my $method = $method_prefix . $call;
1386     $self->$method(@_);
1387   } elsif (ref($call) eq 'CODE') {
1388     $self->$call(@_);
1389   } elsif (ref($call) eq 'ARRAY') {
1390     if (ref($call->[0]) ne 'ARRAY') {
1391       $self->_do_connection_actions($method_prefix, $_) for @$call;
1392     } else {
1393       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1394     }
1395   } else {
1396     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1397   }
1398
1399   return $self;
1400 }
1401
1402 sub connect_call_do_sql {
1403   my $self = shift;
1404   $self->_do_query(@_);
1405 }
1406
1407 sub disconnect_call_do_sql {
1408   my $self = shift;
1409   $self->_do_query(@_);
1410 }
1411
1412 # override in db-specific backend when necessary
1413 sub connect_call_datetime_setup { 1 }
1414
1415 sub _do_query {
1416   my ($self, $action) = @_;
1417
1418   if (ref $action eq 'CODE') {
1419     $action = $action->($self);
1420     $self->_do_query($_) foreach @$action;
1421   }
1422   else {
1423     # Most debuggers expect ($sql, @bind), so we need to exclude
1424     # the attribute hash which is the second argument to $dbh->do
1425     # furthermore the bind values are usually to be presented
1426     # as named arrayref pairs, so wrap those here too
1427     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1428     my $sql = shift @do_args;
1429     my $attrs = shift @do_args;
1430     my @bind = map { [ undef, $_ ] } @do_args;
1431
1432     $self->dbh_do(sub {
1433       $_[0]->_query_start($sql, \@bind);
1434       $_[1]->do($sql, $attrs, @do_args);
1435       $_[0]->_query_end($sql, \@bind);
1436     });
1437   }
1438
1439   return $self;
1440 }
1441
1442 sub _connect {
1443   my $self = shift;
1444
1445   my $info = $self->_dbi_connect_info;
1446
1447   $self->throw_exception("You did not provide any connection_info")
1448     unless defined $info->[0];
1449
1450   my ($old_connect_via, $dbh);
1451
1452   local $DBI::connect_via = 'connect' if $INC{'Apache/DBI.pm'} && $ENV{MOD_PERL};
1453
1454   # this odd anonymous coderef dereference is in fact really
1455   # necessary to avoid the unwanted effect described in perl5
1456   # RT#75792
1457   #
1458   # in addition the coderef itself can't reside inside the try{} block below
1459   # as it somehow triggers a leak under perl -d
1460   my $dbh_error_handler_installer = sub {
1461     weaken (my $weak_self = $_[0]);
1462
1463     # the coderef is blessed so we can distinguish it from externally
1464     # supplied handles (which must be preserved)
1465     $_[1]->{HandleError} = bless sub {
1466       if ($weak_self) {
1467         $weak_self->throw_exception("DBI Exception: $_[0]");
1468       }
1469       else {
1470         # the handler may be invoked by something totally out of
1471         # the scope of DBIC
1472         DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1473       }
1474     }, '__DBIC__DBH__ERROR__HANDLER__';
1475   };
1476
1477   try {
1478     if(ref $info->[0] eq 'CODE') {
1479       $dbh = $info->[0]->();
1480     }
1481     else {
1482       require DBI;
1483       $dbh = DBI->connect(@$info);
1484     }
1485
1486     die $DBI::errstr unless $dbh;
1487
1488     die sprintf ("%s fresh DBI handle with a *false* 'Active' attribute. "
1489       . 'This handle is disconnected as far as DBIC is concerned, and we can '
1490       . 'not continue',
1491       ref $info->[0] eq 'CODE'
1492         ? "Connection coderef $info->[0] returned a"
1493         : 'DBI->connect($schema->storage->connect_info) resulted in a'
1494     ) unless $dbh->FETCH('Active');
1495
1496     # sanity checks unless asked otherwise
1497     unless ($self->unsafe) {
1498
1499       $self->throw_exception(
1500         'Refusing clobbering of {HandleError} installed on externally supplied '
1501        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1502       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1503
1504       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1505       # request, or an external handle. Complain and set anyway
1506       unless ($dbh->{RaiseError}) {
1507         carp( ref $info->[0] eq 'CODE'
1508
1509           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1510            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1511            .'attribute has been supplied'
1512
1513           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1514            .'unsafe => 1. Toggling RaiseError back to true'
1515         );
1516
1517         $dbh->{RaiseError} = 1;
1518       }
1519
1520       $dbh_error_handler_installer->($self, $dbh);
1521     }
1522   }
1523   catch {
1524     $self->throw_exception("DBI Connection failed: $_")
1525   };
1526
1527   $self->_dbh_autocommit($dbh->{AutoCommit});
1528   return $dbh;
1529 }
1530
1531 sub txn_begin {
1532   my $self = shift;
1533
1534   # this means we have not yet connected and do not know the AC status
1535   # (e.g. coderef $dbh), need a full-fledged connection check
1536   if (! defined $self->_dbh_autocommit) {
1537     $self->ensure_connected;
1538   }
1539   # Otherwise simply connect or re-connect on pid changes
1540   else {
1541     $self->_get_dbh;
1542   }
1543
1544   $self->next::method(@_);
1545 }
1546
1547 sub _exec_txn_begin {
1548   my $self = shift;
1549
1550   # if the user is utilizing txn_do - good for him, otherwise we need to
1551   # ensure that the $dbh is healthy on BEGIN.
1552   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1553   # will be replaced by a failure of begin_work itself (which will be
1554   # then retried on reconnect)
1555   if ($self->{_in_do_block}) {
1556     $self->_dbh->begin_work;
1557   } else {
1558     $self->dbh_do(sub { $_[1]->begin_work });
1559   }
1560 }
1561
1562 sub txn_commit {
1563   my $self = shift;
1564
1565   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1566   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1567     unless $self->_dbh;
1568
1569   # esoteric case for folks using external $dbh handles
1570   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1571     carp "Storage transaction_depth 0 does not match "
1572         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1573     $self->transaction_depth(1);
1574   }
1575
1576   $self->next::method(@_);
1577
1578   # if AutoCommit is disabled txn_depth never goes to 0
1579   # as a new txn is started immediately on commit
1580   $self->transaction_depth(1) if (
1581     !$self->transaction_depth
1582       and
1583     defined $self->_dbh_autocommit
1584       and
1585     ! $self->_dbh_autocommit
1586   );
1587 }
1588
1589 sub _exec_txn_commit {
1590   shift->_dbh->commit;
1591 }
1592
1593 sub txn_rollback {
1594   my $self = shift;
1595
1596   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1597   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1598     unless $self->_dbh;
1599
1600   # esoteric case for folks using external $dbh handles
1601   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1602     carp "Storage transaction_depth 0 does not match "
1603         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1604     $self->transaction_depth(1);
1605   }
1606
1607   $self->next::method(@_);
1608
1609   # if AutoCommit is disabled txn_depth never goes to 0
1610   # as a new txn is started immediately on commit
1611   $self->transaction_depth(1) if (
1612     !$self->transaction_depth
1613       and
1614     defined $self->_dbh_autocommit
1615       and
1616     ! $self->_dbh_autocommit
1617   );
1618 }
1619
1620 sub _exec_txn_rollback {
1621   shift->_dbh->rollback;
1622 }
1623
1624 # generate some identical methods
1625 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1626   no strict qw/refs/;
1627   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1628     my $self = shift;
1629     $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1630     $self->throw_exception("Unable to $meth() on a disconnected storage")
1631       unless $self->_dbh;
1632     $self->next::method(@_);
1633   };
1634 }
1635
1636 # This used to be the top-half of _execute.  It was split out to make it
1637 #  easier to override in NoBindVars without duping the rest.  It takes up
1638 #  all of _execute's args, and emits $sql, @bind.
1639 sub _prep_for_execute {
1640   #my ($self, $op, $ident, $args) = @_;
1641   return shift->_gen_sql_bind(@_)
1642 }
1643
1644 sub _gen_sql_bind {
1645   my ($self, $op, $ident, $args) = @_;
1646
1647   my ($colinfos, $from);
1648   if ( blessed($ident) ) {
1649     $from = $ident->from;
1650     $colinfos = $ident->columns_info;
1651   }
1652
1653   my ($sql, $bind);
1654   ($sql, @$bind) = $self->sql_maker->$op( ($from || $ident), @$args );
1655
1656   $bind = $self->_resolve_bindattrs(
1657     $ident, [ @{$args->[2]{bind}||[]}, @$bind ], $colinfos
1658   );
1659
1660   if (
1661     ! $ENV{DBIC_DT_SEARCH_OK}
1662       and
1663     $op eq 'select'
1664       and
1665     first {
1666       length ref $_->[1]
1667         and
1668       blessed($_->[1])
1669         and
1670       $_->[1]->isa('DateTime')
1671     } @$bind
1672   ) {
1673     carp_unique 'DateTime objects passed to search() are not supported '
1674       . 'properly (InflateColumn::DateTime formats and settings are not '
1675       . 'respected.) See "Formatting DateTime objects in queries" in '
1676       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1677       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1678   }
1679
1680   return( $sql, $bind );
1681 }
1682
1683 sub _resolve_bindattrs {
1684   my ($self, $ident, $bind, $colinfos) = @_;
1685
1686   $colinfos ||= {};
1687
1688   my $resolve_bindinfo = sub {
1689     #my $infohash = shift;
1690
1691     %$colinfos = %{ $self->_resolve_column_info($ident) }
1692       unless keys %$colinfos;
1693
1694     my $ret;
1695     if (my $col = $_[0]->{dbic_colname}) {
1696       $ret = { %{$_[0]} };
1697
1698       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1699         if $colinfos->{$col}{data_type};
1700
1701       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1702         if $colinfos->{$col}{size};
1703     }
1704
1705     $ret || $_[0];
1706   };
1707
1708   return [ map {
1709     my $resolved =
1710       ( ref $_ ne 'ARRAY' or @$_ != 2 ) ? [ {}, $_ ]
1711     : ( ! defined $_->[0] )             ? [ {}, $_->[1] ]
1712     : (ref $_->[0] eq 'HASH')           ? [ (exists $_->[0]{dbd_attrs} or $_->[0]{sqlt_datatype})
1713                                               ? $_->[0]
1714                                               : $resolve_bindinfo->($_->[0])
1715                                             , $_->[1] ]
1716     : (ref $_->[0] eq 'SCALAR')         ? [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1717     :                                     [ $resolve_bindinfo->(
1718                                               { dbic_colname => $_->[0] }
1719                                             ), $_->[1] ]
1720     ;
1721
1722     if (
1723       ! exists $resolved->[0]{dbd_attrs}
1724         and
1725       ! $resolved->[0]{sqlt_datatype}
1726         and
1727       length ref $resolved->[1]
1728         and
1729       ! overload::Method($resolved->[1], '""')
1730     ) {
1731       require Data::Dumper;
1732       local $Data::Dumper::Maxdepth = 1;
1733       local $Data::Dumper::Terse = 1;
1734       local $Data::Dumper::Useqq = 1;
1735       local $Data::Dumper::Indent = 0;
1736       local $Data::Dumper::Pad = ' ';
1737       $self->throw_exception(
1738         'You must supply a datatype/bindtype (see DBIx::Class::ResultSet/DBIC BIND VALUES) '
1739       . 'for non-scalar value '. Data::Dumper::Dumper ($resolved->[1])
1740       );
1741     }
1742
1743     $resolved;
1744
1745   } @$bind ];
1746 }
1747
1748 sub _format_for_trace {
1749   #my ($self, $bind) = @_;
1750
1751   ### Turn @bind from something like this:
1752   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1753   ### to this:
1754   ###   ( "'1'", "'3'" )
1755
1756   map {
1757     defined( $_ && $_->[1] )
1758       ? qq{'$_->[1]'}
1759       : q{NULL}
1760   } @{$_[1] || []};
1761 }
1762
1763 sub _query_start {
1764   my ( $self, $sql, $bind ) = @_;
1765
1766   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1767     if $self->debug;
1768 }
1769
1770 sub _query_end {
1771   my ( $self, $sql, $bind ) = @_;
1772
1773   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1774     if $self->debug;
1775 }
1776
1777 sub _dbi_attrs_for_bind {
1778   my ($self, $ident, $bind) = @_;
1779
1780   my @attrs;
1781
1782   for (map { $_->[0] } @$bind) {
1783     push @attrs, do {
1784       if (exists $_->{dbd_attrs}) {
1785         $_->{dbd_attrs}
1786       }
1787       elsif($_->{sqlt_datatype}) {
1788         # cache the result in the dbh_details hash, as it can not change unless
1789         # we connect to something else
1790         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1791         if (not exists $cache->{$_->{sqlt_datatype}}) {
1792           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1793         }
1794         $cache->{$_->{sqlt_datatype}};
1795       }
1796       else {
1797         undef;  # always push something at this position
1798       }
1799     }
1800   }
1801
1802   return \@attrs;
1803 }
1804
1805 sub _execute {
1806   my ($self, $op, $ident, @args) = @_;
1807
1808   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1809
1810   # not even a PID check - we do not care about the state of the _dbh.
1811   # All we need is to get the appropriate drivers loaded if they aren't
1812   # already so that the assumption in ad7c50fc26e holds
1813   $self->_populate_dbh unless $self->_dbh;
1814
1815   $self->dbh_do( _dbh_execute =>     # retry over disconnects
1816     $sql,
1817     $bind,
1818     $self->_dbi_attrs_for_bind($ident, $bind),
1819   );
1820 }
1821
1822 sub _dbh_execute {
1823   my ($self, $dbh, $sql, $bind, $bind_attrs) = @_;
1824
1825   $self->_query_start( $sql, $bind );
1826
1827   my $sth = $self->_bind_sth_params(
1828     $self->_prepare_sth($dbh, $sql),
1829     $bind,
1830     $bind_attrs,
1831   );
1832
1833   # Can this fail without throwing an exception anyways???
1834   my $rv = $sth->execute();
1835   $self->throw_exception(
1836     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1837   ) if !$rv;
1838
1839   $self->_query_end( $sql, $bind );
1840
1841   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1842 }
1843
1844 sub _prepare_sth {
1845   my ($self, $dbh, $sql) = @_;
1846
1847   # 3 is the if_active parameter which avoids active sth re-use
1848   my $sth = $self->disable_sth_caching
1849     ? $dbh->prepare($sql)
1850     : $dbh->prepare_cached($sql, {}, 3);
1851
1852   # XXX You would think RaiseError would make this impossible,
1853   #  but apparently that's not true :(
1854   $self->throw_exception(
1855     $dbh->errstr
1856       ||
1857     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
1858             .'an exception and/or setting $dbh->errstr',
1859       length ($sql) > 20
1860         ? substr($sql, 0, 20) . '...'
1861         : $sql
1862       ,
1863       'DBD::' . $dbh->{Driver}{Name},
1864     )
1865   ) if !$sth;
1866
1867   $sth;
1868 }
1869
1870 sub _bind_sth_params {
1871   my ($self, $sth, $bind, $bind_attrs) = @_;
1872
1873   for my $i (0 .. $#$bind) {
1874     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1875       $sth->bind_param_inout(
1876         $i + 1, # bind params counts are 1-based
1877         $bind->[$i][1],
1878         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1879         $bind_attrs->[$i],
1880       );
1881     }
1882     else {
1883       # FIXME SUBOPTIMAL - most likely this is not necessary at all
1884       # confirm with dbi-dev whether explicit stringification is needed
1885       my $v = ( length ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""') )
1886         ? "$bind->[$i][1]"
1887         : $bind->[$i][1]
1888       ;
1889       $sth->bind_param(
1890         $i + 1,
1891         $v,
1892         $bind_attrs->[$i],
1893       );
1894     }
1895   }
1896
1897   $sth;
1898 }
1899
1900 sub _prefetch_autovalues {
1901   my ($self, $source, $colinfo, $to_insert) = @_;
1902
1903   my %values;
1904   for my $col (keys %$colinfo) {
1905     if (
1906       $colinfo->{$col}{auto_nextval}
1907         and
1908       (
1909         ! exists $to_insert->{$col}
1910           or
1911         ref $to_insert->{$col} eq 'SCALAR'
1912           or
1913         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1914       )
1915     ) {
1916       $values{$col} = $self->_sequence_fetch(
1917         'NEXTVAL',
1918         ( $colinfo->{$col}{sequence} ||=
1919             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1920         ),
1921       );
1922     }
1923   }
1924
1925   \%values;
1926 }
1927
1928 sub insert {
1929   my ($self, $source, $to_insert) = @_;
1930
1931   my $col_infos = $source->columns_info;
1932
1933   my $prefetched_values = $self->_prefetch_autovalues($source, $col_infos, $to_insert);
1934
1935   # fuse the values, but keep a separate list of prefetched_values so that
1936   # they can be fused once again with the final return
1937   $to_insert = { %$to_insert, %$prefetched_values };
1938
1939   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1940   # Investigate what does it take to s/defined/exists/
1941   my %pcols = map { $_ => 1 } $source->primary_columns;
1942   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1943   for my $col ($source->columns) {
1944     if ($col_infos->{$col}{is_auto_increment}) {
1945       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1946       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1947     }
1948
1949     # nothing to retrieve when explicit values are supplied
1950     next if (defined $to_insert->{$col} and ! (
1951       ref $to_insert->{$col} eq 'SCALAR'
1952         or
1953       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1954     ));
1955
1956     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1957     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1958       $pcols{$col}
1959         or
1960       $col_infos->{$col}{retrieve_on_insert}
1961     );
1962   };
1963
1964   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1965   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1966
1967   my ($sqla_opts, @ir_container);
1968   if (%retrieve_cols and $self->_use_insert_returning) {
1969     $sqla_opts->{returning_container} = \@ir_container
1970       if $self->_use_insert_returning_bound;
1971
1972     $sqla_opts->{returning} = [
1973       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1974     ];
1975   }
1976
1977   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1978
1979   my %returned_cols = %$to_insert;
1980   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1981     @ir_container = try {
1982       local $SIG{__WARN__} = sub {};
1983       my @r = $sth->fetchrow_array;
1984       $sth->finish;
1985       @r;
1986     } unless @ir_container;
1987
1988     @returned_cols{@$retlist} = @ir_container if @ir_container;
1989   }
1990   else {
1991     # pull in PK if needed and then everything else
1992     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1993
1994       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1995         unless $self->can('last_insert_id');
1996
1997       my @pri_values = $self->last_insert_id($source, @missing_pri);
1998
1999       $self->throw_exception( "Can't get last insert id" )
2000         unless (@pri_values == @missing_pri);
2001
2002       @returned_cols{@missing_pri} = @pri_values;
2003       delete @retrieve_cols{@missing_pri};
2004     }
2005
2006     # if there is more left to pull
2007     if (%retrieve_cols) {
2008       $self->throw_exception(
2009         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
2010       ) unless %pcols;
2011
2012       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
2013
2014       my $cur = DBIx::Class::ResultSet->new($source, {
2015         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
2016         select => \@left_to_fetch,
2017       })->cursor;
2018
2019       @returned_cols{@left_to_fetch} = $cur->next;
2020
2021       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
2022         if scalar $cur->next;
2023     }
2024   }
2025
2026   return { %$prefetched_values, %returned_cols };
2027 }
2028
2029 sub insert_bulk {
2030   my ($self, $source, $cols, $data) = @_;
2031
2032   my @col_range = (0..$#$cols);
2033
2034   # FIXME SUBOPTIMAL - most likely this is not necessary at all
2035   # confirm with dbi-dev whether explicit stringification is needed
2036   #
2037   # forcibly stringify whatever is stringifiable
2038   # ResultSet::populate() hands us a copy - safe to mangle
2039   for my $r (0 .. $#$data) {
2040     for my $c (0 .. $#{$data->[$r]}) {
2041       $data->[$r][$c] = "$data->[$r][$c]"
2042         if ( length ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
2043     }
2044   }
2045
2046   my $colinfos = $source->columns_info($cols);
2047
2048   local $self->{_autoinc_supplied_for_op} =
2049     (first { $_->{is_auto_increment} } values %$colinfos)
2050       ? 1
2051       : 0
2052   ;
2053
2054   # get a slice type index based on first row of data
2055   # a "column" in this context may refer to more than one bind value
2056   # e.g. \[ '?, ?', [...], [...] ]
2057   #
2058   # construct the value type index - a description of values types for every
2059   # per-column slice of $data:
2060   #
2061   # nonexistent - nonbind literal
2062   # 0 - regular value
2063   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
2064   #
2065   # also construct the column hash to pass to the SQL generator. For plain
2066   # (non literal) values - convert the members of the first row into a
2067   # literal+bind combo, with extra positional info in the bind attr hashref.
2068   # This will allow us to match the order properly, and is so contrived
2069   # because a user-supplied literal/bind (or something else specific to a
2070   # resultsource and/or storage driver) can inject extra binds along the
2071   # way, so one can't rely on "shift positions" ordering at all. Also we
2072   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
2073   # can be later matched up by address), because we want to supply a real
2074   # value on which perhaps e.g. datatype checks will be performed
2075   my ($proto_data, $value_type_by_col_idx);
2076   for my $i (@col_range) {
2077     my $colname = $cols->[$i];
2078     if (ref $data->[0][$i] eq 'SCALAR') {
2079       # no bind value at all - no type
2080
2081       $proto_data->{$colname} = $data->[0][$i];
2082     }
2083     elsif (ref $data->[0][$i] eq 'REF' and ref ${$data->[0][$i]} eq 'ARRAY' ) {
2084       # repack, so we don't end up mangling the original \[]
2085       my ($sql, @bind) = @${$data->[0][$i]};
2086
2087       # normalization of user supplied stuff
2088       my $resolved_bind = $self->_resolve_bindattrs(
2089         $source, \@bind, $colinfos,
2090       );
2091
2092       # store value-less (attrs only) bind info - we will be comparing all
2093       # supplied binds against this for sanity
2094       $value_type_by_col_idx->{$i} = [ map { $_->[0] } @$resolved_bind ];
2095
2096       $proto_data->{$colname} = \[ $sql, map { [
2097         # inject slice order to use for $proto_bind construction
2098           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $i, _literal_bind_subindex => $_+1 }
2099             =>
2100           $resolved_bind->[$_][1]
2101         ] } (0 .. $#bind)
2102       ];
2103     }
2104     else {
2105       $value_type_by_col_idx->{$i} = undef;
2106
2107       $proto_data->{$colname} = \[ '?', [
2108         { dbic_colname => $colname, _bind_data_slice_idx => $i }
2109           =>
2110         $data->[0][$i]
2111       ] ];
2112     }
2113   }
2114
2115   my ($sql, $proto_bind) = $self->_prep_for_execute (
2116     'insert',
2117     $source,
2118     [ $proto_data ],
2119   );
2120
2121   if (! @$proto_bind and keys %$value_type_by_col_idx) {
2122     # if the bindlist is empty and we had some dynamic binds, this means the
2123     # storage ate them away (e.g. the NoBindVars component) and interpolated
2124     # them directly into the SQL. This obviously can't be good for multi-inserts
2125     $self->throw_exception('Cannot insert_bulk without support for placeholders');
2126   }
2127
2128   # sanity checks
2129   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
2130   #
2131   # use an error reporting closure for convenience (less to pass)
2132   my $bad_slice_report_cref = sub {
2133     my ($msg, $r_idx, $c_idx) = @_;
2134     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
2135       $msg,
2136       $cols->[$c_idx],
2137       do {
2138         require Data::Dumper::Concise;
2139         local $Data::Dumper::Maxdepth = 5;
2140         Data::Dumper::Concise::Dumper ({
2141           map { $cols->[$_] =>
2142             $data->[$r_idx][$_]
2143           } @col_range
2144         }),
2145       }
2146     );
2147   };
2148
2149   for my $col_idx (@col_range) {
2150     my $reference_val = $data->[0][$col_idx];
2151
2152     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
2153       my $val = $data->[$row_idx][$col_idx];
2154
2155       if (! exists $value_type_by_col_idx->{$col_idx}) { # literal no binds
2156         if (ref $val ne 'SCALAR') {
2157           $bad_slice_report_cref->(
2158             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
2159             $row_idx,
2160             $col_idx,
2161           );
2162         }
2163         elsif ($$val ne $$reference_val) {
2164           $bad_slice_report_cref->(
2165             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
2166             $row_idx,
2167             $col_idx,
2168           );
2169         }
2170       }
2171       elsif (! defined $value_type_by_col_idx->{$col_idx} ) {  # regular non-literal value
2172         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
2173           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
2174         }
2175       }
2176       else {  # binds from a \[], compare type and attrs
2177         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
2178           $bad_slice_report_cref->(
2179             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
2180             $row_idx,
2181             $col_idx,
2182           );
2183         }
2184         # start drilling down and bail out early on identical refs
2185         elsif (
2186           $reference_val != $val
2187             or
2188           $$reference_val != $$val
2189         ) {
2190           if (${$val}->[0] ne ${$reference_val}->[0]) {
2191             $bad_slice_report_cref->(
2192               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
2193               $row_idx,
2194               $col_idx,
2195             );
2196           }
2197           # need to check the bind attrs - a bind will happen only once for
2198           # the entire dataset, so any changes further down will be ignored.
2199           elsif (! Data::Compare::Compare(
2200             $value_type_by_col_idx->{$col_idx},
2201             [
2202               map
2203               { $_->[0] }
2204               @{$self->_resolve_bindattrs(
2205                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
2206               )}
2207             ],
2208           )) {
2209             $bad_slice_report_cref->(
2210               'Differing bind attributes on literal/bind values not supported',
2211               $row_idx,
2212               $col_idx,
2213             );
2214           }
2215         }
2216       }
2217     }
2218   }
2219
2220   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
2221   # are atomic (even if execute_for_fetch is a single call). Thus a safety
2222   # scope guard
2223   my $guard = $self->txn_scope_guard;
2224
2225   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
2226   my $sth = $self->_prepare_sth($self->_dbh, $sql);
2227   my $rv = do {
2228     if (@$proto_bind) {
2229       # proto bind contains the information on which pieces of $data to pull
2230       # $cols is passed in only for prettier error-reporting
2231       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
2232     }
2233     else {
2234       # bind_param_array doesn't work if there are no binds
2235       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
2236     }
2237   };
2238
2239   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
2240
2241   $guard->commit;
2242
2243   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
2244 }
2245
2246 # execute_for_fetch is capable of returning data just fine (it means it
2247 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
2248 # is the void-populate fast-path we will just ignore this altogether
2249 # for the time being.
2250 sub _dbh_execute_for_fetch {
2251   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
2252
2253   my @idx_range = ( 0 .. $#$proto_bind );
2254
2255   # If we have any bind attributes to take care of, we will bind the
2256   # proto-bind data (which will never be used by execute_for_fetch)
2257   # However since column bindtypes are "sticky", this is sufficient
2258   # to get the DBD to apply the bindtype to all values later on
2259
2260   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
2261
2262   for my $i (@idx_range) {
2263     $sth->bind_param (
2264       $i+1, # DBI bind indexes are 1-based
2265       $proto_bind->[$i][1],
2266       $bind_attrs->[$i],
2267     ) if defined $bind_attrs->[$i];
2268   }
2269
2270   # At this point $data slots named in the _bind_data_slice_idx of
2271   # each piece of $proto_bind are either \[]s or plain values to be
2272   # passed in. Construct the dispensing coderef. *NOTE* the order
2273   # of $data will differ from this of the ?s in the SQL (due to
2274   # alphabetical ordering by colname). We actually do want to
2275   # preserve this behavior so that prepare_cached has a better
2276   # chance of matching on unrelated calls
2277
2278   my $fetch_row_idx = -1; # saner loop this way
2279   my $fetch_tuple = sub {
2280     return undef if ++$fetch_row_idx > $#$data;
2281
2282     return [ map { defined $_->{_literal_bind_subindex}
2283       ? ${ $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]}
2284          ->[ $_->{_literal_bind_subindex} ]
2285           ->[1]
2286       : $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]
2287     } map { $_->[0] } @$proto_bind];
2288   };
2289
2290   my $tuple_status = [];
2291   my ($rv, $err);
2292   try {
2293     $rv = $sth->execute_for_fetch(
2294       $fetch_tuple,
2295       $tuple_status,
2296     );
2297   }
2298   catch {
2299     $err = shift;
2300   };
2301
2302   # Not all DBDs are create equal. Some throw on error, some return
2303   # an undef $rv, and some set $sth->err - try whatever we can
2304   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2305     ! defined $err
2306       and
2307     ( !defined $rv or $sth->err )
2308   );
2309
2310   # Statement must finish even if there was an exception.
2311   try {
2312     $sth->finish
2313   }
2314   catch {
2315     $err = shift unless defined $err
2316   };
2317
2318   if (defined $err) {
2319     my $i = 0;
2320     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2321
2322     $self->throw_exception("Unexpected populate error: $err")
2323       if ($i > $#$tuple_status);
2324
2325     require Data::Dumper::Concise;
2326     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2327       ($tuple_status->[$i][1] || $err),
2328       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2329     );
2330   }
2331
2332   return $rv;
2333 }
2334
2335 sub _dbh_execute_inserts_with_no_binds {
2336   my ($self, $sth, $count) = @_;
2337
2338   my $err;
2339   try {
2340     my $dbh = $self->_get_dbh;
2341     local $dbh->{RaiseError} = 1;
2342     local $dbh->{PrintError} = 0;
2343
2344     $sth->execute foreach 1..$count;
2345   }
2346   catch {
2347     $err = shift;
2348   };
2349
2350   # Make sure statement is finished even if there was an exception.
2351   try {
2352     $sth->finish
2353   }
2354   catch {
2355     $err = shift unless defined $err;
2356   };
2357
2358   $self->throw_exception($err) if defined $err;
2359
2360   return $count;
2361 }
2362
2363 sub update {
2364   #my ($self, $source, @args) = @_;
2365   shift->_execute('update', @_);
2366 }
2367
2368
2369 sub delete {
2370   #my ($self, $source, @args) = @_;
2371   shift->_execute('delete', @_);
2372 }
2373
2374 sub _select {
2375   my $self = shift;
2376   $self->_execute($self->_select_args(@_));
2377 }
2378
2379 sub _select_args_to_query {
2380   my $self = shift;
2381
2382   $self->throw_exception(
2383     "Unable to generate limited query representation with 'software_limit' enabled"
2384   ) if ($_[3]->{software_limit} and ($_[3]->{offset} or $_[3]->{rows}) );
2385
2386   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2387   #  = $self->_select_args($ident, $select, $cond, $attrs);
2388   my ($op, $ident, @args) =
2389     $self->_select_args(@_);
2390
2391   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2392   my ($sql, $bind) = $self->_gen_sql_bind($op, $ident, \@args);
2393
2394   # reuse the bind arrayref
2395   unshift @{$bind}, "($sql)";
2396   \$bind;
2397 }
2398
2399 sub _select_args {
2400   my ($self, $ident, $select, $where, $orig_attrs) = @_;
2401
2402   # FIXME - that kind of caching would be nice to have
2403   # however currently we *may* pass the same $orig_attrs
2404   # with different ident/select/where
2405   # the whole interface needs to be rethought, since it
2406   # was centered around the flawed SQLA API. We can do
2407   # soooooo much better now. But that is also another
2408   # battle...
2409   #return (
2410   #  'select', $orig_attrs->{!args_as_stored_at_the_end_of_this_method!}
2411   #) if $orig_attrs->{!args_as_stored_at_the_end_of_this_method!};
2412
2413   my $sql_maker = $self->sql_maker;
2414   my $alias2source = $self->_resolve_ident_sources ($ident);
2415
2416   my $attrs = {
2417     %$orig_attrs,
2418     select => $select,
2419     from => $ident,
2420     where => $where,
2421
2422     # limit dialects use this stuff
2423     # yes, some CDBICompat crap does not supply an {alias} >.<
2424     ( $orig_attrs->{alias} and $alias2source->{$orig_attrs->{alias}} )
2425       ? ( _rsroot_rsrc => $alias2source->{$orig_attrs->{alias}} )
2426       : ()
2427     ,
2428   };
2429
2430   # Sanity check the attributes (SQLMaker does it too, but
2431   # in case of a software_limit we'll never reach there)
2432   if (defined $attrs->{offset}) {
2433     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2434       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2435   }
2436
2437   if (defined $attrs->{rows}) {
2438     $self->throw_exception("The rows attribute must be a positive integer if present")
2439       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2440   }
2441   elsif ($attrs->{offset}) {
2442     # MySQL actually recommends this approach.  I cringe.
2443     $attrs->{rows} = $sql_maker->__max_int;
2444   }
2445
2446   # see if we will need to tear the prefetch apart to satisfy group_by == select
2447   # this is *extremely tricky* to get right, I am still not sure I did
2448   #
2449   my ($prefetch_needs_subquery, @limit_args);
2450
2451   if ( $attrs->{_grouped_by_distinct} and $attrs->{collapse} ) {
2452     # we already know there is a valid group_by (we made it) and we know it is
2453     # intended to be based *only* on non-multi stuff
2454     # short circuit the group_by parsing below
2455     $prefetch_needs_subquery = 1;
2456   }
2457   elsif (
2458     # The rationale is that even if we do *not* have collapse, we still
2459     # need to wrap the core grouped select/group_by in a subquery
2460     # so that databases that care about group_by/select equivalence
2461     # are happy (this includes MySQL in strict_mode)
2462     # If any of the other joined tables are referenced in the group_by
2463     # however - the user is on their own
2464     ( $prefetch_needs_subquery or $attrs->{_related_results_construction} )
2465       and
2466     $attrs->{group_by}
2467       and
2468     @{$attrs->{group_by}}
2469       and
2470     my $grp_aliases = try { # try{} because $attrs->{from} may be unreadable
2471       $self->_resolve_aliastypes_from_select_args({ from => $attrs->{from}, group_by => $attrs->{group_by} })
2472     }
2473   ) {
2474     # no aliases other than our own in group_by
2475     # if there are - do not allow subquery even if limit is present
2476     $prefetch_needs_subquery = ! scalar grep { $_ ne $attrs->{alias} } keys %{ $grp_aliases->{grouping} || {} };
2477   }
2478   elsif ( $attrs->{rows} && $attrs->{collapse} ) {
2479     # active collapse with a limit - that one is a no-brainer unless
2480     # overruled by a group_by above
2481     $prefetch_needs_subquery = 1;
2482   }
2483
2484   if ($prefetch_needs_subquery) {
2485     $attrs = $self->_adjust_select_args_for_complex_prefetch ($attrs);
2486   }
2487   elsif (! $attrs->{software_limit} ) {
2488     push @limit_args, (
2489       $attrs->{rows} || (),
2490       $attrs->{offset} || (),
2491     );
2492   }
2493
2494   # try to simplify the joinmap further (prune unreferenced type-single joins)
2495   if (
2496     ! $prefetch_needs_subquery  # already pruned
2497       and
2498     ref $attrs->{from}
2499       and
2500     reftype $attrs->{from} eq 'ARRAY'
2501       and
2502     @{$attrs->{from}} != 1
2503   ) {
2504     ($attrs->{from}, $attrs->{_aliastypes}) = $self->_prune_unused_joins ($attrs);
2505   }
2506
2507   # FIXME this is a gross, inefficient, largely incorrect and fragile hack
2508   # during the result inflation stage we *need* to know what was the aliastype
2509   # map as sqla saw it when the final pieces of SQL were being assembled
2510   # Originally we simply carried around the entirety of $attrs, but this
2511   # resulted in resultsets that are being reused growing continuously, as
2512   # the hash in question grew deeper and deeper.
2513   # Instead hand-pick what to take with us here (we actually don't need much
2514   # at this point just the map itself)
2515   $orig_attrs->{_last_sqlmaker_alias_map} = $attrs->{_aliastypes};
2516
2517 ###
2518   # This would be the point to deflate anything found in $attrs->{where}
2519   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2520   # expect a result object. And all we have is a resultsource (it is trivial
2521   # to extract deflator coderefs via $alias2source above).
2522   #
2523   # I don't see a way forward other than changing the way deflators are
2524   # invoked, and that's just bad...
2525 ###
2526
2527   return ( 'select', @{$attrs}{qw(from select where)}, $attrs, @limit_args );
2528 }
2529
2530 # Returns a counting SELECT for a simple count
2531 # query. Abstracted so that a storage could override
2532 # this to { count => 'firstcol' } or whatever makes
2533 # sense as a performance optimization
2534 sub _count_select {
2535   #my ($self, $source, $rs_attrs) = @_;
2536   return { count => '*' };
2537 }
2538
2539 =head2 select
2540
2541 =over 4
2542
2543 =item Arguments: $ident, $select, $condition, $attrs
2544
2545 =back
2546
2547 Handle a SQL select statement.
2548
2549 =cut
2550
2551 sub select {
2552   my $self = shift;
2553   my ($ident, $select, $condition, $attrs) = @_;
2554   return $self->cursor_class->new($self, \@_, $attrs);
2555 }
2556
2557 sub select_single {
2558   my $self = shift;
2559   my ($rv, $sth, @bind) = $self->_select(@_);
2560   my @row = $sth->fetchrow_array;
2561   my @nextrow = $sth->fetchrow_array if @row;
2562   if(@row && @nextrow) {
2563     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2564   }
2565   # Need to call finish() to work round broken DBDs
2566   $sth->finish();
2567   return @row;
2568 }
2569
2570 =head2 sql_limit_dialect
2571
2572 This is an accessor for the default SQL limit dialect used by a particular
2573 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2574 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2575 see L<DBIx::Class::SQLMaker::LimitDialects>.
2576
2577 =cut
2578
2579 sub _dbh_columns_info_for {
2580   my ($self, $dbh, $table) = @_;
2581
2582   if ($dbh->can('column_info')) {
2583     my %result;
2584     my $caught;
2585     try {
2586       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2587       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2588       $sth->execute();
2589       while ( my $info = $sth->fetchrow_hashref() ){
2590         my %column_info;
2591         $column_info{data_type}   = $info->{TYPE_NAME};
2592         $column_info{size}      = $info->{COLUMN_SIZE};
2593         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2594         $column_info{default_value} = $info->{COLUMN_DEF};
2595         my $col_name = $info->{COLUMN_NAME};
2596         $col_name =~ s/^\"(.*)\"$/$1/;
2597
2598         $result{$col_name} = \%column_info;
2599       }
2600     } catch {
2601       $caught = 1;
2602     };
2603     return \%result if !$caught && scalar keys %result;
2604   }
2605
2606   my %result;
2607   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2608   $sth->execute;
2609   my @columns = @{$sth->{NAME_lc}};
2610   for my $i ( 0 .. $#columns ){
2611     my %column_info;
2612     $column_info{data_type} = $sth->{TYPE}->[$i];
2613     $column_info{size} = $sth->{PRECISION}->[$i];
2614     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2615
2616     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2617       $column_info{data_type} = $1;
2618       $column_info{size}    = $2;
2619     }
2620
2621     $result{$columns[$i]} = \%column_info;
2622   }
2623   $sth->finish;
2624
2625   foreach my $col (keys %result) {
2626     my $colinfo = $result{$col};
2627     my $type_num = $colinfo->{data_type};
2628     my $type_name;
2629     if(defined $type_num && $dbh->can('type_info')) {
2630       my $type_info = $dbh->type_info($type_num);
2631       $type_name = $type_info->{TYPE_NAME} if $type_info;
2632       $colinfo->{data_type} = $type_name if $type_name;
2633     }
2634   }
2635
2636   return \%result;
2637 }
2638
2639 sub columns_info_for {
2640   my ($self, $table) = @_;
2641   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2642 }
2643
2644 =head2 last_insert_id
2645
2646 Return the row id of the last insert.
2647
2648 =cut
2649
2650 sub _dbh_last_insert_id {
2651     my ($self, $dbh, $source, $col) = @_;
2652
2653     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2654
2655     return $id if defined $id;
2656
2657     my $class = ref $self;
2658     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2659 }
2660
2661 sub last_insert_id {
2662   my $self = shift;
2663   $self->_dbh_last_insert_id ($self->_dbh, @_);
2664 }
2665
2666 =head2 _native_data_type
2667
2668 =over 4
2669
2670 =item Arguments: $type_name
2671
2672 =back
2673
2674 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2675 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2676 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2677
2678 The default implementation returns C<undef>, implement in your Storage driver if
2679 you need this functionality.
2680
2681 Should map types from other databases to the native RDBMS type, for example
2682 C<VARCHAR2> to C<VARCHAR>.
2683
2684 Types with modifiers should map to the underlying data type. For example,
2685 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2686
2687 Composite types should map to the container type, for example
2688 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2689
2690 =cut
2691
2692 sub _native_data_type {
2693   #my ($self, $data_type) = @_;
2694   return undef
2695 }
2696
2697 # Check if placeholders are supported at all
2698 sub _determine_supports_placeholders {
2699   my $self = shift;
2700   my $dbh  = $self->_get_dbh;
2701
2702   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2703   # but it is inaccurate more often than not
2704   return try {
2705     local $dbh->{PrintError} = 0;
2706     local $dbh->{RaiseError} = 1;
2707     $dbh->do('select ?', {}, 1);
2708     1;
2709   }
2710   catch {
2711     0;
2712   };
2713 }
2714
2715 # Check if placeholders bound to non-string types throw exceptions
2716 #
2717 sub _determine_supports_typeless_placeholders {
2718   my $self = shift;
2719   my $dbh  = $self->_get_dbh;
2720
2721   return try {
2722     local $dbh->{PrintError} = 0;
2723     local $dbh->{RaiseError} = 1;
2724     # this specifically tests a bind that is NOT a string
2725     $dbh->do('select 1 where 1 = ?', {}, 1);
2726     1;
2727   }
2728   catch {
2729     0;
2730   };
2731 }
2732
2733 =head2 sqlt_type
2734
2735 Returns the database driver name.
2736
2737 =cut
2738
2739 sub sqlt_type {
2740   shift->_get_dbh->{Driver}->{Name};
2741 }
2742
2743 =head2 bind_attribute_by_data_type
2744
2745 Given a datatype from column info, returns a database specific bind
2746 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2747 let the database planner just handle it.
2748
2749 This method is always called after the driver has been determined and a DBI
2750 connection has been established. Therefore you can refer to C<DBI::$constant>
2751 and/or C<DBD::$driver::$constant> directly, without worrying about loading
2752 the correct modules.
2753
2754 =cut
2755
2756 sub bind_attribute_by_data_type {
2757     return;
2758 }
2759
2760 =head2 is_datatype_numeric
2761
2762 Given a datatype from column_info, returns a boolean value indicating if
2763 the current RDBMS considers it a numeric value. This controls how
2764 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2765 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2766 be performed instead of the usual C<eq>.
2767
2768 =cut
2769
2770 sub is_datatype_numeric {
2771   #my ($self, $dt) = @_;
2772
2773   return 0 unless $_[1];
2774
2775   $_[1] =~ /^ (?:
2776     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2777   ) $/ix;
2778 }
2779
2780
2781 =head2 create_ddl_dir
2782
2783 =over 4
2784
2785 =item Arguments: $schema, \@databases, $version, $directory, $preversion, \%sqlt_args
2786
2787 =back
2788
2789 Creates a SQL file based on the Schema, for each of the specified
2790 database engines in C<\@databases> in the given directory.
2791 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2792
2793 Given a previous version number, this will also create a file containing
2794 the ALTER TABLE statements to transform the previous schema into the
2795 current one. Note that these statements may contain C<DROP TABLE> or
2796 C<DROP COLUMN> statements that can potentially destroy data.
2797
2798 The file names are created using the C<ddl_filename> method below, please
2799 override this method in your schema if you would like a different file
2800 name format. For the ALTER file, the same format is used, replacing
2801 $version in the name with "$preversion-$version".
2802
2803 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2804 The most common value for this would be C<< { add_drop_table => 1 } >>
2805 to have the SQL produced include a C<DROP TABLE> statement for each table
2806 created. For quoting purposes supply C<quote_identifiers>.
2807
2808 If no arguments are passed, then the following default values are assumed:
2809
2810 =over 4
2811
2812 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2813
2814 =item version    - $schema->schema_version
2815
2816 =item directory  - './'
2817
2818 =item preversion - <none>
2819
2820 =back
2821
2822 By default, C<\%sqlt_args> will have
2823
2824  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2825
2826 merged with the hash passed in. To disable any of those features, pass in a
2827 hashref like the following
2828
2829  { ignore_constraint_names => 0, # ... other options }
2830
2831
2832 WARNING: You are strongly advised to check all SQL files created, before applying
2833 them.
2834
2835 =cut
2836
2837 sub create_ddl_dir {
2838   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2839
2840   unless ($dir) {
2841     carp "No directory given, using ./\n";
2842     $dir = './';
2843   } else {
2844       -d $dir
2845         or
2846       (require File::Path and File::Path::mkpath (["$dir"]))  # mkpath does not like objects (i.e. Path::Class::Dir)
2847         or
2848       $self->throw_exception(
2849         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2850       );
2851   }
2852
2853   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2854
2855   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2856   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2857
2858   my $schema_version = $schema->schema_version || '1.x';
2859   $version ||= $schema_version;
2860
2861   $sqltargs = {
2862     add_drop_table => 1,
2863     ignore_constraint_names => 1,
2864     ignore_index_names => 1,
2865     %{$sqltargs || {}}
2866   };
2867
2868   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2869     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2870   }
2871
2872   my $sqlt = SQL::Translator->new( $sqltargs );
2873
2874   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2875   my $sqlt_schema = $sqlt->translate({ data => $schema })
2876     or $self->throw_exception ($sqlt->error);
2877
2878   foreach my $db (@$databases) {
2879     $sqlt->reset();
2880     $sqlt->{schema} = $sqlt_schema;
2881     $sqlt->producer($db);
2882
2883     my $file;
2884     my $filename = $schema->ddl_filename($db, $version, $dir);
2885     if (-e $filename && ($version eq $schema_version )) {
2886       # if we are dumping the current version, overwrite the DDL
2887       carp "Overwriting existing DDL file - $filename";
2888       unlink($filename);
2889     }
2890
2891     my $output = $sqlt->translate;
2892     if(!$output) {
2893       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2894       next;
2895     }
2896     if(!open($file, ">$filename")) {
2897       $self->throw_exception("Can't open $filename for writing ($!)");
2898       next;
2899     }
2900     print $file $output;
2901     close($file);
2902
2903     next unless ($preversion);
2904
2905     require SQL::Translator::Diff;
2906
2907     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2908     if(!-e $prefilename) {
2909       carp("No previous schema file found ($prefilename)");
2910       next;
2911     }
2912
2913     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2914     if(-e $difffile) {
2915       carp("Overwriting existing diff file - $difffile");
2916       unlink($difffile);
2917     }
2918
2919     my $source_schema;
2920     {
2921       my $t = SQL::Translator->new($sqltargs);
2922       $t->debug( 0 );
2923       $t->trace( 0 );
2924
2925       $t->parser( $db )
2926         or $self->throw_exception ($t->error);
2927
2928       my $out = $t->translate( $prefilename )
2929         or $self->throw_exception ($t->error);
2930
2931       $source_schema = $t->schema;
2932
2933       $source_schema->name( $prefilename )
2934         unless ( $source_schema->name );
2935     }
2936
2937     # The "new" style of producers have sane normalization and can support
2938     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2939     # And we have to diff parsed SQL against parsed SQL.
2940     my $dest_schema = $sqlt_schema;
2941
2942     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2943       my $t = SQL::Translator->new($sqltargs);
2944       $t->debug( 0 );
2945       $t->trace( 0 );
2946
2947       $t->parser( $db )
2948         or $self->throw_exception ($t->error);
2949
2950       my $out = $t->translate( $filename )
2951         or $self->throw_exception ($t->error);
2952
2953       $dest_schema = $t->schema;
2954
2955       $dest_schema->name( $filename )
2956         unless $dest_schema->name;
2957     }
2958
2959     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2960                                                   $dest_schema,   $db,
2961                                                   $sqltargs
2962                                                  );
2963     if(!open $file, ">$difffile") {
2964       $self->throw_exception("Can't write to $difffile ($!)");
2965       next;
2966     }
2967     print $file $diff;
2968     close($file);
2969   }
2970 }
2971
2972 =head2 deployment_statements
2973
2974 =over 4
2975
2976 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2977
2978 =back
2979
2980 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2981
2982 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2983 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2984
2985 C<$directory> is used to return statements from files in a previously created
2986 L</create_ddl_dir> directory and is optional. The filenames are constructed
2987 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2988
2989 If no C<$directory> is specified then the statements are constructed on the
2990 fly using L<SQL::Translator> and C<$version> is ignored.
2991
2992 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2993
2994 =cut
2995
2996 sub deployment_statements {
2997   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2998   $type ||= $self->sqlt_type;
2999   $version ||= $schema->schema_version || '1.x';
3000   $dir ||= './';
3001   my $filename = $schema->ddl_filename($type, $version, $dir);
3002   if(-f $filename)
3003   {
3004       # FIXME replace this block when a proper sane sql parser is available
3005       my $file;
3006       open($file, "<$filename")
3007         or $self->throw_exception("Can't open $filename ($!)");
3008       my @rows = <$file>;
3009       close($file);
3010       return join('', @rows);
3011   }
3012
3013   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
3014     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
3015   }
3016
3017   # sources needs to be a parser arg, but for simplicity allow at top level
3018   # coming in
3019   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
3020       if exists $sqltargs->{sources};
3021
3022   my $tr = SQL::Translator->new(
3023     producer => "SQL::Translator::Producer::${type}",
3024     %$sqltargs,
3025     parser => 'SQL::Translator::Parser::DBIx::Class',
3026     data => $schema,
3027   );
3028
3029   return preserve_context {
3030     $tr->translate
3031   } after => sub {
3032     $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
3033       unless defined $_[0];
3034   };
3035 }
3036
3037 # FIXME deploy() currently does not accurately report sql errors
3038 # Will always return true while errors are warned
3039 sub deploy {
3040   my ($self, $schema, $type, $sqltargs, $dir) = @_;
3041   my $deploy = sub {
3042     my $line = shift;
3043     return if(!$line);
3044     return if($line =~ /^--/);
3045     # next if($line =~ /^DROP/m);
3046     return if($line =~ /^BEGIN TRANSACTION/m);
3047     return if($line =~ /^COMMIT/m);
3048     return if $line =~ /^\s+$/; # skip whitespace only
3049     $self->_query_start($line);
3050     try {
3051       # do a dbh_do cycle here, as we need some error checking in
3052       # place (even though we will ignore errors)
3053       $self->dbh_do (sub { $_[1]->do($line) });
3054     } catch {
3055       carp qq{$_ (running "${line}")};
3056     };
3057     $self->_query_end($line);
3058   };
3059   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
3060   if (@statements > 1) {
3061     foreach my $statement (@statements) {
3062       $deploy->( $statement );
3063     }
3064   }
3065   elsif (@statements == 1) {
3066     # split on single line comments and end of statements
3067     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
3068       $deploy->( $line );
3069     }
3070   }
3071 }
3072
3073 =head2 datetime_parser
3074
3075 Returns the datetime parser class
3076
3077 =cut
3078
3079 sub datetime_parser {
3080   my $self = shift;
3081   return $self->{datetime_parser} ||= do {
3082     $self->build_datetime_parser(@_);
3083   };
3084 }
3085
3086 =head2 datetime_parser_type
3087
3088 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
3089
3090 =head2 build_datetime_parser
3091
3092 See L</datetime_parser>
3093
3094 =cut
3095
3096 sub build_datetime_parser {
3097   my $self = shift;
3098   my $type = $self->datetime_parser_type(@_);
3099   return $type;
3100 }
3101
3102
3103 =head2 is_replicating
3104
3105 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
3106 replicate from a master database.  Default is undef, which is the result
3107 returned by databases that don't support replication.
3108
3109 =cut
3110
3111 sub is_replicating {
3112     return;
3113
3114 }
3115
3116 =head2 lag_behind_master
3117
3118 Returns a number that represents a certain amount of lag behind a master db
3119 when a given storage is replicating.  The number is database dependent, but
3120 starts at zero and increases with the amount of lag. Default in undef
3121
3122 =cut
3123
3124 sub lag_behind_master {
3125     return;
3126 }
3127
3128 =head2 relname_to_table_alias
3129
3130 =over 4
3131
3132 =item Arguments: $relname, $join_count
3133
3134 =item Return Value: $alias
3135
3136 =back
3137
3138 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
3139 queries.
3140
3141 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
3142 way these aliases are named.
3143
3144 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
3145 otherwise C<"$relname">.
3146
3147 =cut
3148
3149 sub relname_to_table_alias {
3150   my ($self, $relname, $join_count) = @_;
3151
3152   my $alias = ($join_count && $join_count > 1 ?
3153     join('_', $relname, $join_count) : $relname);
3154
3155   return $alias;
3156 }
3157
3158 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
3159 # version and it may be necessary to amend or override it for a specific storage
3160 # if such binds are necessary.
3161 sub _max_column_bytesize {
3162   my ($self, $attr) = @_;
3163
3164   my $max_size;
3165
3166   if ($attr->{sqlt_datatype}) {
3167     my $data_type = lc($attr->{sqlt_datatype});
3168
3169     if ($attr->{sqlt_size}) {
3170
3171       # String/sized-binary types
3172       if ($data_type =~ /^(?:
3173           l? (?:var)? char(?:acter)? (?:\s*varying)?
3174             |
3175           (?:var)? binary (?:\s*varying)?
3176             |
3177           raw
3178         )\b/x
3179       ) {
3180         $max_size = $attr->{sqlt_size};
3181       }
3182       # Other charset/unicode types, assume scale of 4
3183       elsif ($data_type =~ /^(?:
3184           national \s* character (?:\s*varying)?
3185             |
3186           nchar
3187             |
3188           univarchar
3189             |
3190           nvarchar
3191         )\b/x
3192       ) {
3193         $max_size = $attr->{sqlt_size} * 4;
3194       }
3195     }
3196
3197     if (!$max_size and !$self->_is_lob_type($data_type)) {
3198       $max_size = 100 # for all other (numeric?) datatypes
3199     }
3200   }
3201
3202   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3203 }
3204
3205 # Determine if a data_type is some type of BLOB
3206 sub _is_lob_type {
3207   my ($self, $data_type) = @_;
3208   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3209     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3210                                   |varchar|character\s*varying|nvarchar
3211                                   |national\s*character\s*varying))?\z/xi);
3212 }
3213
3214 sub _is_binary_lob_type {
3215   my ($self, $data_type) = @_;
3216   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3217     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3218 }
3219
3220 sub _is_text_lob_type {
3221   my ($self, $data_type) = @_;
3222   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3223     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3224                         |national\s*character\s*varying))\z/xi);
3225 }
3226
3227 # Determine if a data_type is some type of a binary type
3228 sub _is_binary_type {
3229   my ($self, $data_type) = @_;
3230   $data_type && ($self->_is_binary_lob_type($data_type)
3231     || $data_type =~ /(?:var)?(?:binary|bit|graphic)(?:\s*varying)?/i);
3232 }
3233
3234 1;
3235
3236 =head1 USAGE NOTES
3237
3238 =head2 DBIx::Class and AutoCommit
3239
3240 DBIx::Class can do some wonderful magic with handling exceptions,
3241 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3242 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
3243 transaction support.
3244
3245 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3246 in an assumed transaction between commits, and you're telling us you'd
3247 like to manage that manually.  A lot of the magic protections offered by
3248 this module will go away.  We can't protect you from exceptions due to database
3249 disconnects because we don't know anything about how to restart your
3250 transactions.  You're on your own for handling all sorts of exceptional
3251 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3252 be with raw DBI.
3253
3254
3255 =head1 AUTHOR AND CONTRIBUTORS
3256
3257 See L<AUTHOR|DBIx::Class/AUTHOR> and L<CONTRIBUTORS|DBIx::Class/CONTRIBUTORS> in DBIx::Class
3258
3259 =head1 LICENSE
3260
3261 You may distribute this code under the same terms as Perl itself.
3262
3263 =cut