Deprecate insert_bulk - we will be changing its signature down the road
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use Scalar::Util qw/refaddr weaken reftype blessed/;
12 use List::Util qw/first/;
13 use Context::Preserve 'preserve_context';
14 use Try::Tiny;
15 use Data::Compare (); # no imports!!! guard against insane architecture
16 use SQL::Abstract qw(is_plain_value is_literal_value);
17 use DBIx::Class::_Util qw(quote_sub perlstring);
18 use namespace::clean;
19
20 # default cursor class, overridable in connect_info attributes
21 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
22
23 __PACKAGE__->mk_group_accessors('inherited' => qw/
24   sql_limit_dialect sql_quote_char sql_name_sep
25 /);
26
27 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
28
29 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
30 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
31
32 __PACKAGE__->sql_name_sep('.');
33
34 __PACKAGE__->mk_group_accessors('simple' => qw/
35   _connect_info _dbic_connect_attributes _driver_determined
36   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
37   _perform_autoinc_retrieval _autoinc_supplied_for_op
38 /);
39
40 # the values for these accessors are picked out (and deleted) from
41 # the attribute hashref passed to connect_info
42 my @storage_options = qw/
43   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
44   disable_sth_caching unsafe auto_savepoint
45 /;
46 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
47
48
49 # capability definitions, using a 2-tiered accessor system
50 # The rationale is:
51 #
52 # A driver/user may define _use_X, which blindly without any checks says:
53 # "(do not) use this capability", (use_dbms_capability is an "inherited"
54 # type accessor)
55 #
56 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
57 # accessor, which in turn calls _determine_supports_X, and stores the return
58 # in a special slot on the storage object, which is wiped every time a $dbh
59 # reconnection takes place (it is not guaranteed that upon reconnection we
60 # will get the same rdbms version). _determine_supports_X does not need to
61 # exist on a driver, as we ->can for it before calling.
62
63 my @capabilities = (qw/
64   insert_returning
65   insert_returning_bound
66
67   multicolumn_in
68
69   placeholders
70   typeless_placeholders
71
72   join_optimizer
73 /);
74 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
75 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
76
77 # on by default, not strictly a capability (pending rewrite)
78 __PACKAGE__->_use_join_optimizer (1);
79 sub _determine_supports_join_optimizer { 1 };
80
81 # Each of these methods need _determine_driver called before itself
82 # in order to function reliably. We also need to separate accessors
83 # from plain old method calls, since an accessor called as a setter
84 # does *not* need the driver determination loop fired (and in fact
85 # can produce hard to find bugs, like e.g. losing on_connect_*
86 # semantics on fresh connections)
87 #
88 # The construct below is simply a parameterized around()
89 my $storage_accessor_idx = { map { $_ => 1 } qw(
90   sqlt_type
91   datetime_parser_type
92
93   sql_maker
94   cursor_class
95 )};
96 for my $meth (keys %$storage_accessor_idx, qw(
97   deployment_statements
98
99   build_datetime_parser
100
101   txn_begin
102
103   insert
104   update
105   delete
106   select
107   select_single
108
109   _insert_bulk
110
111   with_deferred_fk_checks
112
113   get_use_dbms_capability
114   get_dbms_capability
115
116   _server_info
117   _get_server_version
118 )) {
119
120   my $orig = __PACKAGE__->can ($meth)
121     or die "$meth is not a ::Storage::DBI method!";
122
123   my $is_getter = $storage_accessor_idx->{$meth} ? 0 : 1;
124
125   quote_sub
126     __PACKAGE__ ."::$meth", sprintf( <<'EOC', $is_getter, perlstring $meth ), { '$orig' => \$orig };
127
128     if (
129       # only fire when invoked on an instance, a valid class-based invocation
130       # would e.g. be setting a default for an inherited accessor
131       ref $_[0]
132         and
133       ! $_[0]->{_driver_determined}
134         and
135       ! $_[0]->{_in_determine_driver}
136         and
137       # if this is a known *setter* - just set it, no need to connect
138       # and determine the driver
139       ( %1$s or @_ <= 1 )
140         and
141       # Only try to determine stuff if we have *something* that either is or can
142       # provide a DSN. Allows for bare $schema's generated with a plain ->connect()
143       # to still be marginally useful
144       $_[0]->_dbi_connect_info->[0]
145     ) {
146       $_[0]->_determine_driver;
147
148       # work around http://rt.perl.org/rt3/Public/Bug/Display.html?id=35878
149       goto $_[0]->can(%2$s) unless DBIx::Class::_ENV_::BROKEN_GOTO;
150
151       my $cref = $_[0]->can(%2$s);
152       goto $cref;
153     }
154
155     goto $orig;
156 EOC
157 }
158
159 =head1 NAME
160
161 DBIx::Class::Storage::DBI - DBI storage handler
162
163 =head1 SYNOPSIS
164
165   my $schema = MySchema->connect('dbi:SQLite:my.db');
166
167   $schema->storage->debug(1);
168
169   my @stuff = $schema->storage->dbh_do(
170     sub {
171       my ($storage, $dbh, @args) = @_;
172       $dbh->do("DROP TABLE authors");
173     },
174     @column_list
175   );
176
177   $schema->resultset('Book')->search({
178      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
179   });
180
181 =head1 DESCRIPTION
182
183 This class represents the connection to an RDBMS via L<DBI>.  See
184 L<DBIx::Class::Storage> for general information.  This pod only
185 documents DBI-specific methods and behaviors.
186
187 =head1 METHODS
188
189 =cut
190
191 sub new {
192   my $new = shift->next::method(@_);
193
194   $new->_sql_maker_opts({});
195   $new->_dbh_details({});
196   $new->{_in_do_block} = 0;
197
198   # read below to see what this does
199   $new->_arm_global_destructor;
200
201   $new;
202 }
203
204 # This is hack to work around perl shooting stuff in random
205 # order on exit(). If we do not walk the remaining storage
206 # objects in an END block, there is a *small but real* chance
207 # of a fork()ed child to kill the parent's shared DBI handle,
208 # *before perl reaches the DESTROY in this package*
209 # Yes, it is ugly and effective.
210 # Additionally this registry is used by the CLONE method to
211 # make sure no handles are shared between threads
212 {
213   my %seek_and_destroy;
214
215   sub _arm_global_destructor {
216
217     # quick "garbage collection" pass - prevents the registry
218     # from slowly growing with a bunch of undef-valued keys
219     defined $seek_and_destroy{$_} or delete $seek_and_destroy{$_}
220       for keys %seek_and_destroy;
221
222     weaken (
223       $seek_and_destroy{ refaddr($_[0]) } = $_[0]
224     );
225   }
226
227   END {
228     local $?; # just in case the DBI destructor changes it somehow
229
230     # destroy just the object if not native to this process
231     $_->_verify_pid for (grep
232       { defined $_ }
233       values %seek_and_destroy
234     );
235   }
236
237   sub CLONE {
238     # As per DBI's recommendation, DBIC disconnects all handles as
239     # soon as possible (DBIC will reconnect only on demand from within
240     # the thread)
241     my @instances = grep { defined $_ } values %seek_and_destroy;
242     %seek_and_destroy = ();
243
244     for (@instances) {
245       $_->_dbh(undef);
246
247       $_->transaction_depth(0);
248       $_->savepoints([]);
249
250       # properly renumber existing refs
251       $_->_arm_global_destructor
252     }
253   }
254 }
255
256 sub DESTROY {
257   my $self = shift;
258
259   # some databases spew warnings on implicit disconnect
260   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
261   local $SIG{__WARN__} = sub {};
262   $self->_dbh(undef);
263
264   # this op is necessary, since the very last perl runtime statement
265   # triggers a global destruction shootout, and the $SIG localization
266   # may very well be destroyed before perl actually gets to do the
267   # $dbh undef
268   1;
269 }
270
271 # handle pid changes correctly - do not destroy parent's connection
272 sub _verify_pid {
273   my $self = shift;
274
275   my $pid = $self->_conn_pid;
276   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
277     $dbh->{InactiveDestroy} = 1;
278     $self->_dbh(undef);
279     $self->transaction_depth(0);
280     $self->savepoints([]);
281   }
282
283   return;
284 }
285
286 =head2 connect_info
287
288 This method is normally called by L<DBIx::Class::Schema/connection>, which
289 encapsulates its argument list in an arrayref before passing them here.
290
291 The argument list may contain:
292
293 =over
294
295 =item *
296
297 The same 4-element argument set one would normally pass to
298 L<DBI/connect>, optionally followed by
299 L<extra attributes|/DBIx::Class specific connection attributes>
300 recognized by DBIx::Class:
301
302   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
303
304 =item *
305
306 A single code reference which returns a connected
307 L<DBI database handle|DBI/connect> optionally followed by
308 L<extra attributes|/DBIx::Class specific connection attributes> recognized
309 by DBIx::Class:
310
311   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
312
313 =item *
314
315 A single hashref with all the attributes and the dsn/user/password
316 mixed together:
317
318   $connect_info_args = [{
319     dsn => $dsn,
320     user => $user,
321     password => $pass,
322     %dbi_attributes,
323     %extra_attributes,
324   }];
325
326   $connect_info_args = [{
327     dbh_maker => sub { DBI->connect (...) },
328     %dbi_attributes,
329     %extra_attributes,
330   }];
331
332 This is particularly useful for L<Catalyst> based applications, allowing the
333 following config (L<Config::General> style):
334
335   <Model::DB>
336     schema_class   App::DB
337     <connect_info>
338       dsn          dbi:mysql:database=test
339       user         testuser
340       password     TestPass
341       AutoCommit   1
342     </connect_info>
343   </Model::DB>
344
345 The C<dsn>/C<user>/C<password> combination can be substituted by the
346 C<dbh_maker> key whose value is a coderef that returns a connected
347 L<DBI database handle|DBI/connect>
348
349 =back
350
351 Please note that the L<DBI> docs recommend that you always explicitly
352 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
353 recommends that it be set to I<1>, and that you perform transactions
354 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
355 to I<1> if you do not do explicitly set it to zero.  This is the default
356 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
357
358 =head3 DBIx::Class specific connection attributes
359
360 In addition to the standard L<DBI|DBI/ATTRIBUTES COMMON TO ALL HANDLES>
361 L<connection|DBI/Database Handle Attributes> attributes, DBIx::Class recognizes
362 the following connection options. These options can be mixed in with your other
363 L<DBI> connection attributes, or placed in a separate hashref
364 (C<\%extra_attributes>) as shown above.
365
366 Every time C<connect_info> is invoked, any previous settings for
367 these options will be cleared before setting the new ones, regardless of
368 whether any options are specified in the new C<connect_info>.
369
370
371 =over
372
373 =item on_connect_do
374
375 Specifies things to do immediately after connecting or re-connecting to
376 the database.  Its value may contain:
377
378 =over
379
380 =item a scalar
381
382 This contains one SQL statement to execute.
383
384 =item an array reference
385
386 This contains SQL statements to execute in order.  Each element contains
387 a string or a code reference that returns a string.
388
389 =item a code reference
390
391 This contains some code to execute.  Unlike code references within an
392 array reference, its return value is ignored.
393
394 =back
395
396 =item on_disconnect_do
397
398 Takes arguments in the same form as L</on_connect_do> and executes them
399 immediately before disconnecting from the database.
400
401 Note, this only runs if you explicitly call L</disconnect> on the
402 storage object.
403
404 =item on_connect_call
405
406 A more generalized form of L</on_connect_do> that calls the specified
407 C<connect_call_METHOD> methods in your storage driver.
408
409   on_connect_do => 'select 1'
410
411 is equivalent to:
412
413   on_connect_call => [ [ do_sql => 'select 1' ] ]
414
415 Its values may contain:
416
417 =over
418
419 =item a scalar
420
421 Will call the C<connect_call_METHOD> method.
422
423 =item a code reference
424
425 Will execute C<< $code->($storage) >>
426
427 =item an array reference
428
429 Each value can be a method name or code reference.
430
431 =item an array of arrays
432
433 For each array, the first item is taken to be the C<connect_call_> method name
434 or code reference, and the rest are parameters to it.
435
436 =back
437
438 Some predefined storage methods you may use:
439
440 =over
441
442 =item do_sql
443
444 Executes a SQL string or a code reference that returns a SQL string. This is
445 what L</on_connect_do> and L</on_disconnect_do> use.
446
447 It can take:
448
449 =over
450
451 =item a scalar
452
453 Will execute the scalar as SQL.
454
455 =item an arrayref
456
457 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
458 attributes hashref and bind values.
459
460 =item a code reference
461
462 Will execute C<< $code->($storage) >> and execute the return array refs as
463 above.
464
465 =back
466
467 =item datetime_setup
468
469 Execute any statements necessary to initialize the database session to return
470 and accept datetime/timestamp values used with
471 L<DBIx::Class::InflateColumn::DateTime>.
472
473 Only necessary for some databases, see your specific storage driver for
474 implementation details.
475
476 =back
477
478 =item on_disconnect_call
479
480 Takes arguments in the same form as L</on_connect_call> and executes them
481 immediately before disconnecting from the database.
482
483 Calls the C<disconnect_call_METHOD> methods as opposed to the
484 C<connect_call_METHOD> methods called by L</on_connect_call>.
485
486 Note, this only runs if you explicitly call L</disconnect> on the
487 storage object.
488
489 =item disable_sth_caching
490
491 If set to a true value, this option will disable the caching of
492 statement handles via L<DBI/prepare_cached>.
493
494 =item limit_dialect
495
496 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
497 default L</sql_limit_dialect> setting of the storage (if any). For a list
498 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
499
500 =item quote_names
501
502 When true automatically sets L</quote_char> and L</name_sep> to the characters
503 appropriate for your particular RDBMS. This option is preferred over specifying
504 L</quote_char> directly.
505
506 =item quote_char
507
508 Specifies what characters to use to quote table and column names.
509
510 C<quote_char> expects either a single character, in which case is it
511 is placed on either side of the table/column name, or an arrayref of length
512 2 in which case the table/column name is placed between the elements.
513
514 For example under MySQL you should use C<< quote_char => '`' >>, and for
515 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
516
517 =item name_sep
518
519 This parameter is only useful in conjunction with C<quote_char>, and is used to
520 specify the character that separates elements (schemas, tables, columns) from
521 each other. If unspecified it defaults to the most commonly used C<.>.
522
523 =item unsafe
524
525 This Storage driver normally installs its own C<HandleError>, sets
526 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
527 all database handles, including those supplied by a coderef.  It does this
528 so that it can have consistent and useful error behavior.
529
530 If you set this option to a true value, Storage will not do its usual
531 modifications to the database handle's attributes, and instead relies on
532 the settings in your connect_info DBI options (or the values you set in
533 your connection coderef, in the case that you are connecting via coderef).
534
535 Note that your custom settings can cause Storage to malfunction,
536 especially if you set a C<HandleError> handler that suppresses exceptions
537 and/or disable C<RaiseError>.
538
539 =item auto_savepoint
540
541 If this option is true, L<DBIx::Class> will use savepoints when nesting
542 transactions, making it possible to recover from failure in the inner
543 transaction without having to abort all outer transactions.
544
545 =item cursor_class
546
547 Use this argument to supply a cursor class other than the default
548 L<DBIx::Class::Storage::DBI::Cursor>.
549
550 =back
551
552 Some real-life examples of arguments to L</connect_info> and
553 L<DBIx::Class::Schema/connect>
554
555   # Simple SQLite connection
556   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
557
558   # Connect via subref
559   ->connect_info([ sub { DBI->connect(...) } ]);
560
561   # Connect via subref in hashref
562   ->connect_info([{
563     dbh_maker => sub { DBI->connect(...) },
564     on_connect_do => 'alter session ...',
565   }]);
566
567   # A bit more complicated
568   ->connect_info(
569     [
570       'dbi:Pg:dbname=foo',
571       'postgres',
572       'my_pg_password',
573       { AutoCommit => 1 },
574       { quote_char => q{"} },
575     ]
576   );
577
578   # Equivalent to the previous example
579   ->connect_info(
580     [
581       'dbi:Pg:dbname=foo',
582       'postgres',
583       'my_pg_password',
584       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
585     ]
586   );
587
588   # Same, but with hashref as argument
589   # See parse_connect_info for explanation
590   ->connect_info(
591     [{
592       dsn         => 'dbi:Pg:dbname=foo',
593       user        => 'postgres',
594       password    => 'my_pg_password',
595       AutoCommit  => 1,
596       quote_char  => q{"},
597       name_sep    => q{.},
598     }]
599   );
600
601   # Subref + DBIx::Class-specific connection options
602   ->connect_info(
603     [
604       sub { DBI->connect(...) },
605       {
606           quote_char => q{`},
607           name_sep => q{@},
608           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
609           disable_sth_caching => 1,
610       },
611     ]
612   );
613
614
615
616 =cut
617
618 sub connect_info {
619   my ($self, $info) = @_;
620
621   return $self->_connect_info if !$info;
622
623   $self->_connect_info($info); # copy for _connect_info
624
625   $info = $self->_normalize_connect_info($info)
626     if ref $info eq 'ARRAY';
627
628   my %attrs = (
629     %{ $self->_default_dbi_connect_attributes || {} },
630     %{ $info->{attributes} || {} },
631   );
632
633   my @args = @{ $info->{arguments} };
634
635   if (keys %attrs and ref $args[0] ne 'CODE') {
636     carp_unique (
637         'You provided explicit AutoCommit => 0 in your connection_info. '
638       . 'This is almost universally a bad idea (see the footnotes of '
639       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
640       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
641       . 'this warning.'
642     ) if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
643
644     push @args, \%attrs if keys %attrs;
645   }
646
647   # this is the authoritative "always an arrayref" thing fed to DBI->connect
648   # OR a single-element coderef-based $dbh factory
649   $self->_dbi_connect_info(\@args);
650
651   # extract the individual storage options
652   for my $storage_opt (keys %{ $info->{storage_options} }) {
653     my $value = $info->{storage_options}{$storage_opt};
654
655     $self->$storage_opt($value);
656   }
657
658   # Extract the individual sqlmaker options
659   #
660   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
661   #  the new set of options
662   $self->_sql_maker(undef);
663   $self->_sql_maker_opts({});
664
665   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
666     my $value = $info->{sql_maker_options}{$sql_maker_opt};
667
668     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
669   }
670
671   # FIXME - dirty:
672   # save attributes in a separate accessor so they are always
673   # introspectable, even in case of a CODE $dbhmaker
674   $self->_dbic_connect_attributes (\%attrs);
675
676   return $self->_connect_info;
677 }
678
679 sub _dbi_connect_info {
680   my $self = shift;
681
682   return $self->{_dbi_connect_info} = $_[0]
683     if @_;
684
685   my $conninfo = $self->{_dbi_connect_info} || [];
686
687   # last ditch effort to grab a DSN
688   if ( ! defined $conninfo->[0] and $ENV{DBI_DSN} ) {
689     my @new_conninfo = @$conninfo;
690     $new_conninfo[0] = $ENV{DBI_DSN};
691     $conninfo = \@new_conninfo;
692   }
693
694   return $conninfo;
695 }
696
697
698 sub _normalize_connect_info {
699   my ($self, $info_arg) = @_;
700   my %info;
701
702   my @args = @$info_arg;  # take a shallow copy for further mutilation
703
704   # combine/pre-parse arguments depending on invocation style
705
706   my %attrs;
707   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
708     %attrs = %{ $args[1] || {} };
709     @args = $args[0];
710   }
711   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
712     %attrs = %{$args[0]};
713     @args = ();
714     if (my $code = delete $attrs{dbh_maker}) {
715       @args = $code;
716
717       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
718       if (@ignored) {
719         carp sprintf (
720             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
721           . "to the result of 'dbh_maker'",
722
723           join (', ', map { "'$_'" } (@ignored) ),
724         );
725       }
726     }
727     else {
728       @args = delete @attrs{qw/dsn user password/};
729     }
730   }
731   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
732     %attrs = (
733       % { $args[3] || {} },
734       % { $args[4] || {} },
735     );
736     @args = @args[0,1,2];
737   }
738
739   $info{arguments} = \@args;
740
741   my @storage_opts = grep exists $attrs{$_},
742     @storage_options, 'cursor_class';
743
744   @{ $info{storage_options} }{@storage_opts} =
745     delete @attrs{@storage_opts} if @storage_opts;
746
747   my @sql_maker_opts = grep exists $attrs{$_},
748     qw/limit_dialect quote_char name_sep quote_names/;
749
750   @{ $info{sql_maker_options} }{@sql_maker_opts} =
751     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
752
753   $info{attributes} = \%attrs if %attrs;
754
755   return \%info;
756 }
757
758 sub _default_dbi_connect_attributes () {
759   +{
760     AutoCommit => 1,
761     PrintError => 0,
762     RaiseError => 1,
763     ShowErrorStatement => 1,
764   };
765 }
766
767 =head2 on_connect_do
768
769 This method is deprecated in favour of setting via L</connect_info>.
770
771 =cut
772
773 =head2 on_disconnect_do
774
775 This method is deprecated in favour of setting via L</connect_info>.
776
777 =cut
778
779 sub _parse_connect_do {
780   my ($self, $type) = @_;
781
782   my $val = $self->$type;
783   return () if not defined $val;
784
785   my @res;
786
787   if (not ref($val)) {
788     push @res, [ 'do_sql', $val ];
789   } elsif (ref($val) eq 'CODE') {
790     push @res, $val;
791   } elsif (ref($val) eq 'ARRAY') {
792     push @res, map { [ 'do_sql', $_ ] } @$val;
793   } else {
794     $self->throw_exception("Invalid type for $type: ".ref($val));
795   }
796
797   return \@res;
798 }
799
800 =head2 dbh_do
801
802 Arguments: ($subref | $method_name), @extra_coderef_args?
803
804 Execute the given $subref or $method_name using the new exception-based
805 connection management.
806
807 The first two arguments will be the storage object that C<dbh_do> was called
808 on and a database handle to use.  Any additional arguments will be passed
809 verbatim to the called subref as arguments 2 and onwards.
810
811 Using this (instead of $self->_dbh or $self->dbh) ensures correct
812 exception handling and reconnection (or failover in future subclasses).
813
814 Your subref should have no side-effects outside of the database, as
815 there is the potential for your subref to be partially double-executed
816 if the database connection was stale/dysfunctional.
817
818 Example:
819
820   my @stuff = $schema->storage->dbh_do(
821     sub {
822       my ($storage, $dbh, @cols) = @_;
823       my $cols = join(q{, }, @cols);
824       $dbh->selectrow_array("SELECT $cols FROM foo");
825     },
826     @column_list
827   );
828
829 =cut
830
831 sub dbh_do {
832   my $self = shift;
833   my $run_target = shift; # either a coderef or a method name
834
835   # short circuit when we know there is no need for a runner
836   #
837   # FIXME - assumption may be wrong
838   # the rationale for the txn_depth check is that if this block is a part
839   # of a larger transaction, everything up to that point is screwed anyway
840   return $self->$run_target($self->_get_dbh, @_)
841     if $self->{_in_do_block} or $self->transaction_depth;
842
843   # take a ref instead of a copy, to preserve @_ aliasing
844   # semantics within the coderef, but only if needed
845   # (pseudoforking doesn't like this trick much)
846   my $args = @_ ? \@_ : [];
847
848   DBIx::Class::Storage::BlockRunner->new(
849     storage => $self,
850     wrap_txn => 0,
851     retry_handler => sub {
852       $_[0]->failed_attempt_count == 1
853         and
854       ! $_[0]->storage->connected
855     },
856   )->run(sub {
857     $self->$run_target ($self->_get_dbh, @$args )
858   });
859 }
860
861 sub txn_do {
862   $_[0]->_get_dbh; # connects or reconnects on pid change, necessary to grab correct txn_depth
863   shift->next::method(@_);
864 }
865
866 =head2 disconnect
867
868 Our C<disconnect> method also performs a rollback first if the
869 database is not in C<AutoCommit> mode.
870
871 =cut
872
873 sub disconnect {
874   my ($self) = @_;
875
876   if( $self->_dbh ) {
877     my @actions;
878
879     push @actions, ( $self->on_disconnect_call || () );
880     push @actions, $self->_parse_connect_do ('on_disconnect_do');
881
882     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
883
884     # stops the "implicit rollback on disconnect" warning
885     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
886
887     %{ $self->_dbh->{CachedKids} } = ();
888     $self->_dbh->disconnect;
889     $self->_dbh(undef);
890   }
891 }
892
893 =head2 with_deferred_fk_checks
894
895 =over 4
896
897 =item Arguments: C<$coderef>
898
899 =item Return Value: The return value of $coderef
900
901 =back
902
903 Storage specific method to run the code ref with FK checks deferred or
904 in MySQL's case disabled entirely.
905
906 =cut
907
908 # Storage subclasses should override this
909 sub with_deferred_fk_checks {
910   my ($self, $sub) = @_;
911   $sub->();
912 }
913
914 =head2 connected
915
916 =over
917
918 =item Arguments: none
919
920 =item Return Value: 1|0
921
922 =back
923
924 Verifies that the current database handle is active and ready to execute
925 an SQL statement (e.g. the connection did not get stale, server is still
926 answering, etc.) This method is used internally by L</dbh>.
927
928 =cut
929
930 sub connected {
931   my $self = shift;
932   return 0 unless $self->_seems_connected;
933
934   #be on the safe side
935   local $self->_dbh->{RaiseError} = 1;
936
937   return $self->_ping;
938 }
939
940 sub _seems_connected {
941   my $self = shift;
942
943   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
944
945   my $dbh = $self->_dbh
946     or return 0;
947
948   return $dbh->FETCH('Active');
949 }
950
951 sub _ping {
952   my $self = shift;
953
954   my $dbh = $self->_dbh or return 0;
955
956   return $dbh->ping;
957 }
958
959 sub ensure_connected {
960   my ($self) = @_;
961
962   unless ($self->connected) {
963     $self->_populate_dbh;
964   }
965 }
966
967 =head2 dbh
968
969 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
970 is guaranteed to be healthy by implicitly calling L</connected>, and if
971 necessary performing a reconnection before returning. Keep in mind that this
972 is very B<expensive> on some database engines. Consider using L</dbh_do>
973 instead.
974
975 =cut
976
977 sub dbh {
978   my ($self) = @_;
979
980   if (not $self->_dbh) {
981     $self->_populate_dbh;
982   } else {
983     $self->ensure_connected;
984   }
985   return $self->_dbh;
986 }
987
988 # this is the internal "get dbh or connect (don't check)" method
989 sub _get_dbh {
990   my $self = shift;
991   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
992   $self->_populate_dbh unless $self->_dbh;
993   return $self->_dbh;
994 }
995
996 # *DELIBERATELY* not a setter (for the time being)
997 # Too intertwined with everything else for any kind of sanity
998 sub sql_maker {
999   my $self = shift;
1000
1001   $self->throw_exception('sql_maker() is not a setter method') if @_;
1002
1003   unless ($self->_sql_maker) {
1004     my $sql_maker_class = $self->sql_maker_class;
1005
1006     my %opts = %{$self->_sql_maker_opts||{}};
1007     my $dialect =
1008       $opts{limit_dialect}
1009         ||
1010       $self->sql_limit_dialect
1011         ||
1012       do {
1013         my $s_class = (ref $self) || $self;
1014         carp_unique (
1015           "Your storage class ($s_class) does not set sql_limit_dialect and you "
1016         . 'have not supplied an explicit limit_dialect in your connection_info. '
1017         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1018         . 'databases but can be (and often is) painfully slow. '
1019         . "Please file an RT ticket against '$s_class'"
1020         ) if $self->_dbi_connect_info->[0];
1021
1022         'GenericSubQ';
1023       }
1024     ;
1025
1026     my ($quote_char, $name_sep);
1027
1028     if ($opts{quote_names}) {
1029       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1030         my $s_class = (ref $self) || $self;
1031         carp_unique (
1032           "You requested 'quote_names' but your storage class ($s_class) does "
1033         . 'not explicitly define a default sql_quote_char and you have not '
1034         . 'supplied a quote_char as part of your connection_info. DBIC will '
1035         .q{default to the ANSI SQL standard quote '"', which works most of }
1036         . "the time. Please file an RT ticket against '$s_class'."
1037         );
1038
1039         '"'; # RV
1040       };
1041
1042       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1043     }
1044
1045     $self->_sql_maker($sql_maker_class->new(
1046       bindtype=>'columns',
1047       array_datatypes => 1,
1048       limit_dialect => $dialect,
1049       ($quote_char ? (quote_char => $quote_char) : ()),
1050       name_sep => ($name_sep || '.'),
1051       %opts,
1052     ));
1053   }
1054   return $self->_sql_maker;
1055 }
1056
1057 # nothing to do by default
1058 sub _rebless {}
1059 sub _init {}
1060
1061 sub _populate_dbh {
1062   my ($self) = @_;
1063
1064   $self->_dbh(undef); # in case ->connected failed we might get sent here
1065   $self->_dbh_details({}); # reset everything we know
1066   $self->_sql_maker(undef); # this may also end up being different
1067
1068   $self->_dbh($self->_connect);
1069
1070   $self->_conn_pid($$) unless DBIx::Class::_ENV_::BROKEN_FORK; # on win32 these are in fact threads
1071
1072   $self->_determine_driver;
1073
1074   # Always set the transaction depth on connect, since
1075   #  there is no transaction in progress by definition
1076   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1077
1078   $self->_run_connection_actions unless $self->{_in_determine_driver};
1079 }
1080
1081 sub _run_connection_actions {
1082   my $self = shift;
1083   my @actions;
1084
1085   push @actions, ( $self->on_connect_call || () );
1086   push @actions, $self->_parse_connect_do ('on_connect_do');
1087
1088   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1089 }
1090
1091
1092
1093 sub set_use_dbms_capability {
1094   $_[0]->set_inherited ($_[1], $_[2]);
1095 }
1096
1097 sub get_use_dbms_capability {
1098   my ($self, $capname) = @_;
1099
1100   my $use = $self->get_inherited ($capname);
1101   return defined $use
1102     ? $use
1103     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1104   ;
1105 }
1106
1107 sub set_dbms_capability {
1108   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1109 }
1110
1111 sub get_dbms_capability {
1112   my ($self, $capname) = @_;
1113
1114   my $cap = $self->_dbh_details->{capability}{$capname};
1115
1116   unless (defined $cap) {
1117     if (my $meth = $self->can ("_determine$capname")) {
1118       $cap = $self->$meth ? 1 : 0;
1119     }
1120     else {
1121       $cap = 0;
1122     }
1123
1124     $self->set_dbms_capability ($capname, $cap);
1125   }
1126
1127   return $cap;
1128 }
1129
1130 sub _server_info {
1131   my $self = shift;
1132
1133   my $info;
1134   unless ($info = $self->_dbh_details->{info}) {
1135
1136     $info = {};
1137
1138     my $server_version = try {
1139       $self->_get_server_version
1140     } catch {
1141       # driver determination *may* use this codepath
1142       # in which case we must rethrow
1143       $self->throw_exception($_) if $self->{_in_determine_driver};
1144
1145       # $server_version on failure
1146       undef;
1147     };
1148
1149     if (defined $server_version) {
1150       $info->{dbms_version} = $server_version;
1151
1152       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1153       my @verparts = split (/\./, $numeric_version);
1154       if (
1155         @verparts
1156           &&
1157         $verparts[0] <= 999
1158       ) {
1159         # consider only up to 3 version parts, iff not more than 3 digits
1160         my @use_parts;
1161         while (@verparts && @use_parts < 3) {
1162           my $p = shift @verparts;
1163           last if $p > 999;
1164           push @use_parts, $p;
1165         }
1166         push @use_parts, 0 while @use_parts < 3;
1167
1168         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1169       }
1170     }
1171
1172     $self->_dbh_details->{info} = $info;
1173   }
1174
1175   return $info;
1176 }
1177
1178 sub _get_server_version {
1179   shift->_dbh_get_info('SQL_DBMS_VER');
1180 }
1181
1182 sub _dbh_get_info {
1183   my ($self, $info) = @_;
1184
1185   if ($info =~ /[^0-9]/) {
1186     require DBI::Const::GetInfoType;
1187     $info = $DBI::Const::GetInfoType::GetInfoType{$info};
1188     $self->throw_exception("Info type '$_[1]' not provided by DBI::Const::GetInfoType")
1189       unless defined $info;
1190   }
1191
1192   $self->_get_dbh->get_info($info);
1193 }
1194
1195 sub _describe_connection {
1196   require DBI::Const::GetInfoReturn;
1197
1198   my $self = shift;
1199
1200   my $drv;
1201   try {
1202     $drv = $self->_extract_driver_from_connect_info;
1203     $self->ensure_connected;
1204   };
1205
1206   $drv = "DBD::$drv" if $drv;
1207
1208   my $res = {
1209     DBIC_DSN => $self->_dbi_connect_info->[0],
1210     DBI_VER => DBI->VERSION,
1211     DBIC_VER => DBIx::Class->VERSION,
1212     DBIC_DRIVER => ref $self,
1213     $drv ? (
1214       DBD => $drv,
1215       DBD_VER => try { $drv->VERSION },
1216     ) : (),
1217   };
1218
1219   # try to grab data even if we never managed to connect
1220   # will cover us in cases of an oddly broken half-connect
1221   for my $inf (
1222     #keys %DBI::Const::GetInfoType::GetInfoType,
1223     qw/
1224       SQL_CURSOR_COMMIT_BEHAVIOR
1225       SQL_CURSOR_ROLLBACK_BEHAVIOR
1226       SQL_CURSOR_SENSITIVITY
1227       SQL_DATA_SOURCE_NAME
1228       SQL_DBMS_NAME
1229       SQL_DBMS_VER
1230       SQL_DEFAULT_TXN_ISOLATION
1231       SQL_DM_VER
1232       SQL_DRIVER_NAME
1233       SQL_DRIVER_ODBC_VER
1234       SQL_DRIVER_VER
1235       SQL_EXPRESSIONS_IN_ORDERBY
1236       SQL_GROUP_BY
1237       SQL_IDENTIFIER_CASE
1238       SQL_IDENTIFIER_QUOTE_CHAR
1239       SQL_MAX_CATALOG_NAME_LEN
1240       SQL_MAX_COLUMN_NAME_LEN
1241       SQL_MAX_IDENTIFIER_LEN
1242       SQL_MAX_TABLE_NAME_LEN
1243       SQL_MULTIPLE_ACTIVE_TXN
1244       SQL_MULT_RESULT_SETS
1245       SQL_NEED_LONG_DATA_LEN
1246       SQL_NON_NULLABLE_COLUMNS
1247       SQL_ODBC_VER
1248       SQL_QUALIFIER_NAME_SEPARATOR
1249       SQL_QUOTED_IDENTIFIER_CASE
1250       SQL_TXN_CAPABLE
1251       SQL_TXN_ISOLATION_OPTION
1252     /
1253   ) {
1254     # some drivers barf on things they do not know about instead
1255     # of returning undef
1256     my $v = try { $self->_dbh_get_info($inf) };
1257     next unless defined $v;
1258
1259     #my $key = sprintf( '%s(%s)', $inf, $DBI::Const::GetInfoType::GetInfoType{$inf} );
1260     my $expl = DBI::Const::GetInfoReturn::Explain($inf, $v);
1261     $res->{$inf} = DBI::Const::GetInfoReturn::Format($inf, $v) . ( $expl ? " ($expl)" : '' );
1262   }
1263
1264   $res;
1265 }
1266
1267 sub _determine_driver {
1268   my ($self) = @_;
1269
1270   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1271     my $started_connected = 0;
1272     local $self->{_in_determine_driver} = 1;
1273
1274     if (ref($self) eq __PACKAGE__) {
1275       my $driver;
1276       if ($self->_dbh) { # we are connected
1277         $driver = $self->_dbh->{Driver}{Name};
1278         $started_connected = 1;
1279       }
1280       else {
1281         $driver = $self->_extract_driver_from_connect_info;
1282       }
1283
1284       if ($driver) {
1285         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1286         if ($self->load_optional_class($storage_class)) {
1287           mro::set_mro($storage_class, 'c3');
1288           bless $self, $storage_class;
1289           $self->_rebless();
1290         }
1291         else {
1292           $self->_warn_undetermined_driver(
1293             'This version of DBIC does not yet seem to supply a driver for '
1294           . "your particular RDBMS and/or connection method ('$driver')."
1295           );
1296         }
1297       }
1298       else {
1299         $self->_warn_undetermined_driver(
1300           'Unable to extract a driver name from connect info - this '
1301         . 'should not have happened.'
1302         );
1303       }
1304     }
1305
1306     $self->_driver_determined(1);
1307
1308     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1309
1310     if ($self->can('source_bind_attributes')) {
1311       $self->throw_exception(
1312         "Your storage subclass @{[ ref $self ]} provides (or inherits) the method "
1313       . 'source_bind_attributes() for which support has been removed as of Jan 2013. '
1314       . 'If you are not sure how to proceed please contact the development team via '
1315       . DBIx::Class::_ENV_::HELP_URL
1316       );
1317     }
1318
1319     $self->_init; # run driver-specific initializations
1320
1321     $self->_run_connection_actions
1322         if !$started_connected && defined $self->_dbh;
1323   }
1324 }
1325
1326 sub _extract_driver_from_connect_info {
1327   my $self = shift;
1328
1329   my $drv;
1330
1331   # if connect_info is a CODEREF, we have no choice but to connect
1332   if (
1333     ref $self->_dbi_connect_info->[0]
1334       and
1335     reftype $self->_dbi_connect_info->[0] eq 'CODE'
1336   ) {
1337     $self->_populate_dbh;
1338     $drv = $self->_dbh->{Driver}{Name};
1339   }
1340   else {
1341     # try to use dsn to not require being connected, the driver may still
1342     # force a connection later in _rebless to determine version
1343     # (dsn may not be supplied at all if all we do is make a mock-schema)
1344     ($drv) = ($self->_dbi_connect_info->[0] || '') =~ /^dbi:([^:]+):/i;
1345     $drv ||= $ENV{DBI_DRIVER};
1346   }
1347
1348   return $drv;
1349 }
1350
1351 sub _determine_connector_driver {
1352   my ($self, $conn) = @_;
1353
1354   my $dbtype = $self->_dbh_get_info('SQL_DBMS_NAME');
1355
1356   if (not $dbtype) {
1357     $self->_warn_undetermined_driver(
1358       'Unable to retrieve RDBMS type (SQL_DBMS_NAME) of the engine behind your '
1359     . "$conn connector - this should not have happened."
1360     );
1361     return;
1362   }
1363
1364   $dbtype =~ s/\W/_/gi;
1365
1366   my $subclass = "DBIx::Class::Storage::DBI::${conn}::${dbtype}";
1367   return if $self->isa($subclass);
1368
1369   if ($self->load_optional_class($subclass)) {
1370     bless $self, $subclass;
1371     $self->_rebless;
1372   }
1373   else {
1374     $self->_warn_undetermined_driver(
1375       'This version of DBIC does not yet seem to supply a driver for '
1376     . "your particular RDBMS and/or connection method ('$conn/$dbtype')."
1377     );
1378   }
1379 }
1380
1381 sub _warn_undetermined_driver {
1382   my ($self, $msg) = @_;
1383
1384   require Data::Dumper::Concise;
1385
1386   carp_once ($msg . ' While we will attempt to continue anyway, the results '
1387   . 'are likely to be underwhelming. Please upgrade DBIC, and if this message '
1388   . "does not go away, file a bugreport including the following info:\n"
1389   . Data::Dumper::Concise::Dumper($self->_describe_connection)
1390   );
1391 }
1392
1393 sub _do_connection_actions {
1394   my $self          = shift;
1395   my $method_prefix = shift;
1396   my $call          = shift;
1397
1398   if (not ref($call)) {
1399     my $method = $method_prefix . $call;
1400     $self->$method(@_);
1401   } elsif (ref($call) eq 'CODE') {
1402     $self->$call(@_);
1403   } elsif (ref($call) eq 'ARRAY') {
1404     if (ref($call->[0]) ne 'ARRAY') {
1405       $self->_do_connection_actions($method_prefix, $_) for @$call;
1406     } else {
1407       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1408     }
1409   } else {
1410     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1411   }
1412
1413   return $self;
1414 }
1415
1416 sub connect_call_do_sql {
1417   my $self = shift;
1418   $self->_do_query(@_);
1419 }
1420
1421 sub disconnect_call_do_sql {
1422   my $self = shift;
1423   $self->_do_query(@_);
1424 }
1425
1426 # override in db-specific backend when necessary
1427 sub connect_call_datetime_setup { 1 }
1428
1429 sub _do_query {
1430   my ($self, $action) = @_;
1431
1432   if (ref $action eq 'CODE') {
1433     $action = $action->($self);
1434     $self->_do_query($_) foreach @$action;
1435   }
1436   else {
1437     # Most debuggers expect ($sql, @bind), so we need to exclude
1438     # the attribute hash which is the second argument to $dbh->do
1439     # furthermore the bind values are usually to be presented
1440     # as named arrayref pairs, so wrap those here too
1441     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1442     my $sql = shift @do_args;
1443     my $attrs = shift @do_args;
1444     my @bind = map { [ undef, $_ ] } @do_args;
1445
1446     $self->dbh_do(sub {
1447       $_[0]->_query_start($sql, \@bind);
1448       $_[1]->do($sql, $attrs, @do_args);
1449       $_[0]->_query_end($sql, \@bind);
1450     });
1451   }
1452
1453   return $self;
1454 }
1455
1456 sub _connect {
1457   my $self = shift;
1458
1459   my $info = $self->_dbi_connect_info;
1460
1461   $self->throw_exception("You did not provide any connection_info")
1462     unless defined $info->[0];
1463
1464   my ($old_connect_via, $dbh);
1465
1466   local $DBI::connect_via = 'connect' if $INC{'Apache/DBI.pm'} && $ENV{MOD_PERL};
1467
1468   # this odd anonymous coderef dereference is in fact really
1469   # necessary to avoid the unwanted effect described in perl5
1470   # RT#75792
1471   #
1472   # in addition the coderef itself can't reside inside the try{} block below
1473   # as it somehow triggers a leak under perl -d
1474   my $dbh_error_handler_installer = sub {
1475     weaken (my $weak_self = $_[0]);
1476
1477     # the coderef is blessed so we can distinguish it from externally
1478     # supplied handles (which must be preserved)
1479     $_[1]->{HandleError} = bless sub {
1480       if ($weak_self) {
1481         $weak_self->throw_exception("DBI Exception: $_[0]");
1482       }
1483       else {
1484         # the handler may be invoked by something totally out of
1485         # the scope of DBIC
1486         DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1487       }
1488     }, '__DBIC__DBH__ERROR__HANDLER__';
1489   };
1490
1491   try {
1492     if(ref $info->[0] eq 'CODE') {
1493       $dbh = $info->[0]->();
1494     }
1495     else {
1496       require DBI;
1497       $dbh = DBI->connect(@$info);
1498     }
1499
1500     die $DBI::errstr unless $dbh;
1501
1502     die sprintf ("%s fresh DBI handle with a *false* 'Active' attribute. "
1503       . 'This handle is disconnected as far as DBIC is concerned, and we can '
1504       . 'not continue',
1505       ref $info->[0] eq 'CODE'
1506         ? "Connection coderef $info->[0] returned a"
1507         : 'DBI->connect($schema->storage->connect_info) resulted in a'
1508     ) unless $dbh->FETCH('Active');
1509
1510     # sanity checks unless asked otherwise
1511     unless ($self->unsafe) {
1512
1513       $self->throw_exception(
1514         'Refusing clobbering of {HandleError} installed on externally supplied '
1515        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1516       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1517
1518       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1519       # request, or an external handle. Complain and set anyway
1520       unless ($dbh->{RaiseError}) {
1521         carp( ref $info->[0] eq 'CODE'
1522
1523           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1524            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1525            .'attribute has been supplied'
1526
1527           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1528            .'unsafe => 1. Toggling RaiseError back to true'
1529         );
1530
1531         $dbh->{RaiseError} = 1;
1532       }
1533
1534       $dbh_error_handler_installer->($self, $dbh);
1535     }
1536   }
1537   catch {
1538     $self->throw_exception("DBI Connection failed: $_")
1539   };
1540
1541   $self->_dbh_autocommit($dbh->{AutoCommit});
1542   return $dbh;
1543 }
1544
1545 sub txn_begin {
1546   my $self = shift;
1547
1548   # this means we have not yet connected and do not know the AC status
1549   # (e.g. coderef $dbh), need a full-fledged connection check
1550   if (! defined $self->_dbh_autocommit) {
1551     $self->ensure_connected;
1552   }
1553   # Otherwise simply connect or re-connect on pid changes
1554   else {
1555     $self->_get_dbh;
1556   }
1557
1558   $self->next::method(@_);
1559 }
1560
1561 sub _exec_txn_begin {
1562   my $self = shift;
1563
1564   # if the user is utilizing txn_do - good for him, otherwise we need to
1565   # ensure that the $dbh is healthy on BEGIN.
1566   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1567   # will be replaced by a failure of begin_work itself (which will be
1568   # then retried on reconnect)
1569   if ($self->{_in_do_block}) {
1570     $self->_dbh->begin_work;
1571   } else {
1572     $self->dbh_do(sub { $_[1]->begin_work });
1573   }
1574 }
1575
1576 sub txn_commit {
1577   my $self = shift;
1578
1579   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1580   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1581     unless $self->_dbh;
1582
1583   # esoteric case for folks using external $dbh handles
1584   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1585     carp "Storage transaction_depth 0 does not match "
1586         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1587     $self->transaction_depth(1);
1588   }
1589
1590   $self->next::method(@_);
1591
1592   # if AutoCommit is disabled txn_depth never goes to 0
1593   # as a new txn is started immediately on commit
1594   $self->transaction_depth(1) if (
1595     !$self->transaction_depth
1596       and
1597     defined $self->_dbh_autocommit
1598       and
1599     ! $self->_dbh_autocommit
1600   );
1601 }
1602
1603 sub _exec_txn_commit {
1604   shift->_dbh->commit;
1605 }
1606
1607 sub txn_rollback {
1608   my $self = shift;
1609
1610   $self->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1611   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1612     unless $self->_dbh;
1613
1614   # esoteric case for folks using external $dbh handles
1615   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1616     carp "Storage transaction_depth 0 does not match "
1617         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1618     $self->transaction_depth(1);
1619   }
1620
1621   $self->next::method(@_);
1622
1623   # if AutoCommit is disabled txn_depth never goes to 0
1624   # as a new txn is started immediately on commit
1625   $self->transaction_depth(1) if (
1626     !$self->transaction_depth
1627       and
1628     defined $self->_dbh_autocommit
1629       and
1630     ! $self->_dbh_autocommit
1631   );
1632 }
1633
1634 sub _exec_txn_rollback {
1635   shift->_dbh->rollback;
1636 }
1637
1638 # generate the DBI-specific stubs, which then fallback to ::Storage proper
1639 quote_sub __PACKAGE__ . "::$_" => sprintf (<<'EOS', $_) for qw(svp_begin svp_release svp_rollback);
1640   $_[0]->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
1641   $_[0]->throw_exception('Unable to %s() on a disconnected storage')
1642     unless $_[0]->_dbh;
1643   shift->next::method(@_);
1644 EOS
1645
1646 # This used to be the top-half of _execute.  It was split out to make it
1647 #  easier to override in NoBindVars without duping the rest.  It takes up
1648 #  all of _execute's args, and emits $sql, @bind.
1649 sub _prep_for_execute {
1650   #my ($self, $op, $ident, $args) = @_;
1651   return shift->_gen_sql_bind(@_)
1652 }
1653
1654 sub _gen_sql_bind {
1655   my ($self, $op, $ident, $args) = @_;
1656
1657   my ($colinfos, $from);
1658   if ( blessed($ident) ) {
1659     $from = $ident->from;
1660     $colinfos = $ident->columns_info;
1661   }
1662
1663   my ($sql, $bind);
1664   ($sql, @$bind) = $self->sql_maker->$op( ($from || $ident), @$args );
1665
1666   $bind = $self->_resolve_bindattrs(
1667     $ident, [ @{$args->[2]{bind}||[]}, @$bind ], $colinfos
1668   );
1669
1670   if (
1671     ! $ENV{DBIC_DT_SEARCH_OK}
1672       and
1673     $op eq 'select'
1674       and
1675     first {
1676       length ref $_->[1]
1677         and
1678       blessed($_->[1])
1679         and
1680       $_->[1]->isa('DateTime')
1681     } @$bind
1682   ) {
1683     carp_unique 'DateTime objects passed to search() are not supported '
1684       . 'properly (InflateColumn::DateTime formats and settings are not '
1685       . 'respected.) See "Formatting DateTime objects in queries" in '
1686       . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
1687       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
1688   }
1689
1690   return( $sql, $bind );
1691 }
1692
1693 sub _resolve_bindattrs {
1694   my ($self, $ident, $bind, $colinfos) = @_;
1695
1696   my $resolve_bindinfo = sub {
1697     #my $infohash = shift;
1698
1699     $colinfos ||= { %{ $self->_resolve_column_info($ident) } };
1700
1701     my $ret;
1702     if (my $col = $_[0]->{dbic_colname}) {
1703       $ret = { %{$_[0]} };
1704
1705       $ret->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1706         if $colinfos->{$col}{data_type};
1707
1708       $ret->{sqlt_size} ||= $colinfos->{$col}{size}
1709         if $colinfos->{$col}{size};
1710     }
1711
1712     $ret || $_[0];
1713   };
1714
1715   return [ map {
1716     my $resolved =
1717       ( ref $_ ne 'ARRAY' or @$_ != 2 ) ? [ {}, $_ ]
1718     : ( ! defined $_->[0] )             ? [ {}, $_->[1] ]
1719     : (ref $_->[0] eq 'HASH')           ? [(
1720                                             ! keys %{$_->[0]}
1721                                               or
1722                                             exists $_->[0]{dbd_attrs}
1723                                               or
1724                                             $_->[0]{sqlt_datatype}
1725                                            ) ? $_->[0]
1726                                              : $resolve_bindinfo->($_->[0])
1727                                            , $_->[1]
1728                                           ]
1729     : (ref $_->[0] eq 'SCALAR')         ? [ { sqlt_datatype => ${$_->[0]} }, $_->[1] ]
1730     :                                     [ $resolve_bindinfo->(
1731                                               { dbic_colname => $_->[0] }
1732                                             ), $_->[1] ]
1733     ;
1734
1735     if (
1736       ! exists $resolved->[0]{dbd_attrs}
1737         and
1738       ! $resolved->[0]{sqlt_datatype}
1739         and
1740       length ref $resolved->[1]
1741         and
1742       ! is_plain_value $resolved->[1]
1743     ) {
1744       require Data::Dumper;
1745       local $Data::Dumper::Maxdepth = 1;
1746       local $Data::Dumper::Terse = 1;
1747       local $Data::Dumper::Useqq = 1;
1748       local $Data::Dumper::Indent = 0;
1749       local $Data::Dumper::Pad = ' ';
1750       $self->throw_exception(
1751         'You must supply a datatype/bindtype (see DBIx::Class::ResultSet/DBIC BIND VALUES) '
1752       . 'for non-scalar value '. Data::Dumper::Dumper ($resolved->[1])
1753       );
1754     }
1755
1756     $resolved;
1757
1758   } @$bind ];
1759 }
1760
1761 sub _format_for_trace {
1762   #my ($self, $bind) = @_;
1763
1764   ### Turn @bind from something like this:
1765   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1766   ### to this:
1767   ###   ( "'1'", "'3'" )
1768
1769   map {
1770     defined( $_ && $_->[1] )
1771       ? qq{'$_->[1]'}
1772       : q{NULL}
1773   } @{$_[1] || []};
1774 }
1775
1776 sub _query_start {
1777   my ( $self, $sql, $bind ) = @_;
1778
1779   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1780     if $self->debug;
1781 }
1782
1783 sub _query_end {
1784   my ( $self, $sql, $bind ) = @_;
1785
1786   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1787     if $self->debug;
1788 }
1789
1790 sub _dbi_attrs_for_bind {
1791   my ($self, $ident, $bind) = @_;
1792
1793   my @attrs;
1794
1795   for (map { $_->[0] } @$bind) {
1796     push @attrs, do {
1797       if (exists $_->{dbd_attrs}) {
1798         $_->{dbd_attrs}
1799       }
1800       elsif($_->{sqlt_datatype}) {
1801         # cache the result in the dbh_details hash, as it can not change unless
1802         # we connect to something else
1803         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1804         if (not exists $cache->{$_->{sqlt_datatype}}) {
1805           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1806         }
1807         $cache->{$_->{sqlt_datatype}};
1808       }
1809       else {
1810         undef;  # always push something at this position
1811       }
1812     }
1813   }
1814
1815   return \@attrs;
1816 }
1817
1818 sub _execute {
1819   my ($self, $op, $ident, @args) = @_;
1820
1821   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1822
1823   # not even a PID check - we do not care about the state of the _dbh.
1824   # All we need is to get the appropriate drivers loaded if they aren't
1825   # already so that the assumption in ad7c50fc26e holds
1826   $self->_populate_dbh unless $self->_dbh;
1827
1828   $self->dbh_do( _dbh_execute =>     # retry over disconnects
1829     $sql,
1830     $bind,
1831     $self->_dbi_attrs_for_bind($ident, $bind),
1832   );
1833 }
1834
1835 sub _dbh_execute {
1836   my ($self, $dbh, $sql, $bind, $bind_attrs) = @_;
1837
1838   $self->_query_start( $sql, $bind );
1839
1840   my $sth = $self->_bind_sth_params(
1841     $self->_prepare_sth($dbh, $sql),
1842     $bind,
1843     $bind_attrs,
1844   );
1845
1846   # Can this fail without throwing an exception anyways???
1847   my $rv = $sth->execute();
1848   $self->throw_exception(
1849     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1850   ) if !$rv;
1851
1852   $self->_query_end( $sql, $bind );
1853
1854   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1855 }
1856
1857 sub _prepare_sth {
1858   my ($self, $dbh, $sql) = @_;
1859
1860   # 3 is the if_active parameter which avoids active sth re-use
1861   my $sth = $self->disable_sth_caching
1862     ? $dbh->prepare($sql)
1863     : $dbh->prepare_cached($sql, {}, 3);
1864
1865   # XXX You would think RaiseError would make this impossible,
1866   #  but apparently that's not true :(
1867   $self->throw_exception(
1868     $dbh->errstr
1869       ||
1870     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
1871             .'an exception and/or setting $dbh->errstr',
1872       length ($sql) > 20
1873         ? substr($sql, 0, 20) . '...'
1874         : $sql
1875       ,
1876       'DBD::' . $dbh->{Driver}{Name},
1877     )
1878   ) if !$sth;
1879
1880   $sth;
1881 }
1882
1883 sub _bind_sth_params {
1884   my ($self, $sth, $bind, $bind_attrs) = @_;
1885
1886   for my $i (0 .. $#$bind) {
1887     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1888       $sth->bind_param_inout(
1889         $i + 1, # bind params counts are 1-based
1890         $bind->[$i][1],
1891         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1892         $bind_attrs->[$i],
1893       );
1894     }
1895     else {
1896       # FIXME SUBOPTIMAL - DBI needs fixing to always stringify regardless of DBD
1897       my $v = ( length ref $bind->[$i][1] and is_plain_value $bind->[$i][1] )
1898         ? "$bind->[$i][1]"
1899         : $bind->[$i][1]
1900       ;
1901
1902       $sth->bind_param(
1903         $i + 1,
1904         # The temp-var is CRUCIAL - DO NOT REMOVE IT, breaks older DBD::SQLite RT#79576
1905         $v,
1906         $bind_attrs->[$i],
1907       );
1908     }
1909   }
1910
1911   $sth;
1912 }
1913
1914 sub _prefetch_autovalues {
1915   my ($self, $source, $colinfo, $to_insert) = @_;
1916
1917   my %values;
1918   for my $col (keys %$colinfo) {
1919     if (
1920       $colinfo->{$col}{auto_nextval}
1921         and
1922       (
1923         ! exists $to_insert->{$col}
1924           or
1925         is_literal_value($to_insert->{$col})
1926       )
1927     ) {
1928       $values{$col} = $self->_sequence_fetch(
1929         'NEXTVAL',
1930         ( $colinfo->{$col}{sequence} ||=
1931             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1932         ),
1933       );
1934     }
1935   }
1936
1937   \%values;
1938 }
1939
1940 sub insert {
1941   my ($self, $source, $to_insert) = @_;
1942
1943   my $col_infos = $source->columns_info;
1944
1945   my $prefetched_values = $self->_prefetch_autovalues($source, $col_infos, $to_insert);
1946
1947   # fuse the values, but keep a separate list of prefetched_values so that
1948   # they can be fused once again with the final return
1949   $to_insert = { %$to_insert, %$prefetched_values };
1950
1951   # FIXME - we seem to assume undef values as non-supplied. This is wrong.
1952   # Investigate what does it take to s/defined/exists/
1953   my %pcols = map { $_ => 1 } $source->primary_columns;
1954   my (%retrieve_cols, $autoinc_supplied, $retrieve_autoinc_col);
1955   for my $col ($source->columns) {
1956     if ($col_infos->{$col}{is_auto_increment}) {
1957       $autoinc_supplied ||= 1 if defined $to_insert->{$col};
1958       $retrieve_autoinc_col ||= $col unless $autoinc_supplied;
1959     }
1960
1961     # nothing to retrieve when explicit values are supplied
1962     next if (
1963       defined $to_insert->{$col} and ! is_literal_value($to_insert->{$col})
1964     );
1965
1966     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1967     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1968       $pcols{$col}
1969         or
1970       $col_infos->{$col}{retrieve_on_insert}
1971     );
1972   };
1973
1974   local $self->{_autoinc_supplied_for_op} = $autoinc_supplied;
1975   local $self->{_perform_autoinc_retrieval} = $retrieve_autoinc_col;
1976
1977   my ($sqla_opts, @ir_container);
1978   if (%retrieve_cols and $self->_use_insert_returning) {
1979     $sqla_opts->{returning_container} = \@ir_container
1980       if $self->_use_insert_returning_bound;
1981
1982     $sqla_opts->{returning} = [
1983       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1984     ];
1985   }
1986
1987   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1988
1989   my %returned_cols = %$to_insert;
1990   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1991     @ir_container = try {
1992       local $SIG{__WARN__} = sub {};
1993       my @r = $sth->fetchrow_array;
1994       $sth->finish;
1995       @r;
1996     } unless @ir_container;
1997
1998     @returned_cols{@$retlist} = @ir_container if @ir_container;
1999   }
2000   else {
2001     # pull in PK if needed and then everything else
2002     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
2003
2004       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
2005         unless $self->can('last_insert_id');
2006
2007       my @pri_values = $self->last_insert_id($source, @missing_pri);
2008
2009       $self->throw_exception( "Can't get last insert id" )
2010         unless (@pri_values == @missing_pri);
2011
2012       @returned_cols{@missing_pri} = @pri_values;
2013       delete @retrieve_cols{@missing_pri};
2014     }
2015
2016     # if there is more left to pull
2017     if (%retrieve_cols) {
2018       $self->throw_exception(
2019         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
2020       ) unless %pcols;
2021
2022       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
2023
2024       my $cur = DBIx::Class::ResultSet->new($source, {
2025         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
2026         select => \@left_to_fetch,
2027       })->cursor;
2028
2029       @returned_cols{@left_to_fetch} = $cur->next;
2030
2031       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
2032         if scalar $cur->next;
2033     }
2034   }
2035
2036   return { %$prefetched_values, %returned_cols };
2037 }
2038
2039 sub insert_bulk {
2040   carp_unique(
2041     'insert_bulk() should have never been exposed as a public method and '
2042   . 'calling it is depecated as of Aug 2014. If you believe having a genuine '
2043   . 'use for this method please contact the development team via '
2044   . DBIx::Class::_ENV_::HELP_URL
2045   );
2046
2047   return '0E0' unless @{$_[3]||[]};
2048
2049   shift->_insert_bulk(@_);
2050 }
2051
2052 sub _insert_bulk {
2053   my ($self, $source, $cols, $data) = @_;
2054
2055   $self->throw_exception('Calling _insert_bulk without a dataset to process makes no sense')
2056     unless @{$data||[]};
2057
2058   my $colinfos = $source->columns_info($cols);
2059
2060   local $self->{_autoinc_supplied_for_op} =
2061     (grep { $_->{is_auto_increment} } values %$colinfos)
2062       ? 1
2063       : 0
2064   ;
2065
2066   # get a slice type index based on first row of data
2067   # a "column" in this context may refer to more than one bind value
2068   # e.g. \[ '?, ?', [...], [...] ]
2069   #
2070   # construct the value type index - a description of values types for every
2071   # per-column slice of $data:
2072   #
2073   # nonexistent - nonbind literal
2074   # 0 - regular value
2075   # [] of bindattrs - resolved attribute(s) of bind(s) passed via literal+bind \[] combo
2076   #
2077   # also construct the column hash to pass to the SQL generator. For plain
2078   # (non literal) values - convert the members of the first row into a
2079   # literal+bind combo, with extra positional info in the bind attr hashref.
2080   # This will allow us to match the order properly, and is so contrived
2081   # because a user-supplied literal/bind (or something else specific to a
2082   # resultsource and/or storage driver) can inject extra binds along the
2083   # way, so one can't rely on "shift positions" ordering at all. Also we
2084   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
2085   # can be later matched up by address), because we want to supply a real
2086   # value on which perhaps e.g. datatype checks will be performed
2087   my ($proto_data, $value_type_by_col_idx);
2088   for my $col_idx (0..$#$cols) {
2089     my $colname = $cols->[$col_idx];
2090     if (ref $data->[0][$col_idx] eq 'SCALAR') {
2091       # no bind value at all - no type
2092
2093       $proto_data->{$colname} = $data->[0][$col_idx];
2094     }
2095     elsif (ref $data->[0][$col_idx] eq 'REF' and ref ${$data->[0][$col_idx]} eq 'ARRAY' ) {
2096       # repack, so we don't end up mangling the original \[]
2097       my ($sql, @bind) = @${$data->[0][$col_idx]};
2098
2099       # normalization of user supplied stuff
2100       my $resolved_bind = $self->_resolve_bindattrs(
2101         $source, \@bind, $colinfos,
2102       );
2103
2104       # store value-less (attrs only) bind info - we will be comparing all
2105       # supplied binds against this for sanity
2106       $value_type_by_col_idx->{$col_idx} = [ map { $_->[0] } @$resolved_bind ];
2107
2108       $proto_data->{$colname} = \[ $sql, map { [
2109         # inject slice order to use for $proto_bind construction
2110           { %{$resolved_bind->[$_][0]}, _bind_data_slice_idx => $col_idx, _literal_bind_subindex => $_+1 }
2111             =>
2112           $resolved_bind->[$_][1]
2113         ] } (0 .. $#bind)
2114       ];
2115     }
2116     else {
2117       $value_type_by_col_idx->{$col_idx} = undef;
2118
2119       $proto_data->{$colname} = \[ '?', [
2120         { dbic_colname => $colname, _bind_data_slice_idx => $col_idx }
2121           =>
2122         $data->[0][$col_idx]
2123       ] ];
2124     }
2125   }
2126
2127   my ($sql, $proto_bind) = $self->_prep_for_execute (
2128     'insert',
2129     $source,
2130     [ $proto_data ],
2131   );
2132
2133   if (! @$proto_bind and keys %$value_type_by_col_idx) {
2134     # if the bindlist is empty and we had some dynamic binds, this means the
2135     # storage ate them away (e.g. the NoBindVars component) and interpolated
2136     # them directly into the SQL. This obviously can't be good for multi-inserts
2137     $self->throw_exception('Unable to invoke fast-path insert without storage placeholder support');
2138   }
2139
2140   # sanity checks
2141   # FIXME - devise a flag "no babysitting" or somesuch to shut this off
2142   #
2143   # use an error reporting closure for convenience (less to pass)
2144   my $bad_slice_report_cref = sub {
2145     my ($msg, $r_idx, $c_idx) = @_;
2146     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
2147       $msg,
2148       $cols->[$c_idx],
2149       do {
2150         require Data::Dumper::Concise;
2151         local $Data::Dumper::Maxdepth = 5;
2152         Data::Dumper::Concise::Dumper ({
2153           map { $cols->[$_] =>
2154             $data->[$r_idx][$_]
2155           } 0..$#$cols
2156         }),
2157       }
2158     );
2159   };
2160
2161   for my $col_idx (0..$#$cols) {
2162     my $reference_val = $data->[0][$col_idx];
2163
2164     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
2165       my $val = $data->[$row_idx][$col_idx];
2166
2167       if (! exists $value_type_by_col_idx->{$col_idx}) { # literal no binds
2168         if (ref $val ne 'SCALAR') {
2169           $bad_slice_report_cref->(
2170             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
2171             $row_idx,
2172             $col_idx,
2173           );
2174         }
2175         elsif ($$val ne $$reference_val) {
2176           $bad_slice_report_cref->(
2177             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
2178             $row_idx,
2179             $col_idx,
2180           );
2181         }
2182       }
2183       elsif (! defined $value_type_by_col_idx->{$col_idx} ) {  # regular non-literal value
2184         if (is_literal_value($val)) {
2185           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
2186         }
2187       }
2188       else {  # binds from a \[], compare type and attrs
2189         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
2190           $bad_slice_report_cref->(
2191             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
2192             $row_idx,
2193             $col_idx,
2194           );
2195         }
2196         # start drilling down and bail out early on identical refs
2197         elsif (
2198           $reference_val != $val
2199             or
2200           $$reference_val != $$val
2201         ) {
2202           if (${$val}->[0] ne ${$reference_val}->[0]) {
2203             $bad_slice_report_cref->(
2204               "Inconsistent literal/bind SQL (expecting \\['${$reference_val}->[0]', ... ])",
2205               $row_idx,
2206               $col_idx,
2207             );
2208           }
2209           # need to check the bind attrs - a bind will happen only once for
2210           # the entire dataset, so any changes further down will be ignored.
2211           elsif (! Data::Compare::Compare(
2212             $value_type_by_col_idx->{$col_idx},
2213             [
2214               map
2215               { $_->[0] }
2216               @{$self->_resolve_bindattrs(
2217                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
2218               )}
2219             ],
2220           )) {
2221             $bad_slice_report_cref->(
2222               'Differing bind attributes on literal/bind values not supported',
2223               $row_idx,
2224               $col_idx,
2225             );
2226           }
2227         }
2228       }
2229     }
2230   }
2231
2232   # neither _dbh_execute_for_fetch, nor _dbh_execute_inserts_with_no_binds
2233   # are atomic (even if execute_for_fetch is a single call). Thus a safety
2234   # scope guard
2235   my $guard = $self->txn_scope_guard;
2236
2237   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
2238   my $sth = $self->_prepare_sth($self->_dbh, $sql);
2239   my $rv = do {
2240     if (@$proto_bind) {
2241       # proto bind contains the information on which pieces of $data to pull
2242       # $cols is passed in only for prettier error-reporting
2243       $self->_dbh_execute_for_fetch( $source, $sth, $proto_bind, $cols, $data );
2244     }
2245     else {
2246       # bind_param_array doesn't work if there are no binds
2247       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
2248     }
2249   };
2250
2251   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
2252
2253   $guard->commit;
2254
2255   return wantarray ? ($rv, $sth, @$proto_bind) : $rv;
2256 }
2257
2258 # execute_for_fetch is capable of returning data just fine (it means it
2259 # can be used for INSERT...RETURNING and UPDATE...RETURNING. Since this
2260 # is the void-populate fast-path we will just ignore this altogether
2261 # for the time being.
2262 sub _dbh_execute_for_fetch {
2263   my ($self, $source, $sth, $proto_bind, $cols, $data) = @_;
2264
2265   # If we have any bind attributes to take care of, we will bind the
2266   # proto-bind data (which will never be used by execute_for_fetch)
2267   # However since column bindtypes are "sticky", this is sufficient
2268   # to get the DBD to apply the bindtype to all values later on
2269   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
2270
2271   for my $i (0 .. $#$proto_bind) {
2272     $sth->bind_param (
2273       $i+1, # DBI bind indexes are 1-based
2274       $proto_bind->[$i][1],
2275       $bind_attrs->[$i],
2276     ) if defined $bind_attrs->[$i];
2277   }
2278
2279   # At this point $data slots named in the _bind_data_slice_idx of
2280   # each piece of $proto_bind are either \[]s or plain values to be
2281   # passed in. Construct the dispensing coderef. *NOTE* the order
2282   # of $data will differ from this of the ?s in the SQL (due to
2283   # alphabetical ordering by colname). We actually do want to
2284   # preserve this behavior so that prepare_cached has a better
2285   # chance of matching on unrelated calls
2286
2287   my $fetch_row_idx = -1; # saner loop this way
2288   my $fetch_tuple = sub {
2289     return undef if ++$fetch_row_idx > $#$data;
2290
2291     return [ map {
2292       my $v = ! defined $_->{_literal_bind_subindex}
2293
2294         ? $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]
2295
2296         # There are no attributes to resolve here - we already did everything
2297         # when we constructed proto_bind. However we still want to sanity-check
2298         # what the user supplied, so pass stuff through to the resolver *anyway*
2299         : $self->_resolve_bindattrs (
2300             undef,  # a fake rsrc
2301             [ ${ $data->[ $fetch_row_idx ]->[ $_->{_bind_data_slice_idx} ]}->[ $_->{_literal_bind_subindex} ] ],
2302             {},     # a fake column_info bag
2303           )->[0][1]
2304       ;
2305
2306       # FIXME SUBOPTIMAL - DBI needs fixing to always stringify regardless of DBD
2307       # For the time being forcibly stringify whatever is stringifiable
2308       (length ref $v and is_plain_value $v)
2309         ? "$v"
2310         : $v
2311       ;
2312     } map { $_->[0] } @$proto_bind ];
2313   };
2314
2315   my $tuple_status = [];
2316   my ($rv, $err);
2317   try {
2318     $rv = $sth->execute_for_fetch(
2319       $fetch_tuple,
2320       $tuple_status,
2321     );
2322   }
2323   catch {
2324     $err = shift;
2325   };
2326
2327   # Not all DBDs are create equal. Some throw on error, some return
2328   # an undef $rv, and some set $sth->err - try whatever we can
2329   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
2330     ! defined $err
2331       and
2332     ( !defined $rv or $sth->err )
2333   );
2334
2335   # Statement must finish even if there was an exception.
2336   try {
2337     $sth->finish
2338   }
2339   catch {
2340     $err = shift unless defined $err
2341   };
2342
2343   if (defined $err) {
2344     my $i = 0;
2345     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
2346
2347     $self->throw_exception("Unexpected populate error: $err")
2348       if ($i > $#$tuple_status);
2349
2350     require Data::Dumper::Concise;
2351     $self->throw_exception(sprintf "execute_for_fetch() aborted with '%s' at populate slice:\n%s",
2352       ($tuple_status->[$i][1] || $err),
2353       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
2354     );
2355   }
2356
2357   return $rv;
2358 }
2359
2360 sub _dbh_execute_inserts_with_no_binds {
2361   my ($self, $sth, $count) = @_;
2362
2363   my $err;
2364   try {
2365     my $dbh = $self->_get_dbh;
2366     local $dbh->{RaiseError} = 1;
2367     local $dbh->{PrintError} = 0;
2368
2369     $sth->execute foreach 1..$count;
2370   }
2371   catch {
2372     $err = shift;
2373   };
2374
2375   # Make sure statement is finished even if there was an exception.
2376   try {
2377     $sth->finish
2378   }
2379   catch {
2380     $err = shift unless defined $err;
2381   };
2382
2383   $self->throw_exception($err) if defined $err;
2384
2385   return $count;
2386 }
2387
2388 sub update {
2389   #my ($self, $source, @args) = @_;
2390   shift->_execute('update', @_);
2391 }
2392
2393
2394 sub delete {
2395   #my ($self, $source, @args) = @_;
2396   shift->_execute('delete', @_);
2397 }
2398
2399 sub _select {
2400   my $self = shift;
2401   $self->_execute($self->_select_args(@_));
2402 }
2403
2404 sub _select_args_to_query {
2405   my $self = shift;
2406
2407   $self->throw_exception(
2408     "Unable to generate limited query representation with 'software_limit' enabled"
2409   ) if ($_[3]->{software_limit} and ($_[3]->{offset} or $_[3]->{rows}) );
2410
2411   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2412   #  = $self->_select_args($ident, $select, $cond, $attrs);
2413   my ($op, $ident, @args) =
2414     $self->_select_args(@_);
2415
2416   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2417   my ($sql, $bind) = $self->_gen_sql_bind($op, $ident, \@args);
2418
2419   # reuse the bind arrayref
2420   unshift @{$bind}, "($sql)";
2421   \$bind;
2422 }
2423
2424 sub _select_args {
2425   my ($self, $ident, $select, $where, $orig_attrs) = @_;
2426
2427   # FIXME - that kind of caching would be nice to have
2428   # however currently we *may* pass the same $orig_attrs
2429   # with different ident/select/where
2430   # the whole interface needs to be rethought, since it
2431   # was centered around the flawed SQLA API. We can do
2432   # soooooo much better now. But that is also another
2433   # battle...
2434   #return (
2435   #  'select', $orig_attrs->{!args_as_stored_at_the_end_of_this_method!}
2436   #) if $orig_attrs->{!args_as_stored_at_the_end_of_this_method!};
2437
2438   my $sql_maker = $self->sql_maker;
2439
2440   my $attrs = {
2441     %$orig_attrs,
2442     select => $select,
2443     from => $ident,
2444     where => $where,
2445   };
2446
2447   # Sanity check the attributes (SQLMaker does it too, but
2448   # in case of a software_limit we'll never reach there)
2449   if (defined $attrs->{offset}) {
2450     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2451       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2452   }
2453
2454   if (defined $attrs->{rows}) {
2455     $self->throw_exception("The rows attribute must be a positive integer if present")
2456       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2457   }
2458   elsif ($attrs->{offset}) {
2459     # MySQL actually recommends this approach.  I cringe.
2460     $attrs->{rows} = $sql_maker->__max_int;
2461   }
2462
2463   # see if we will need to tear the prefetch apart to satisfy group_by == select
2464   # this is *extremely tricky* to get right, I am still not sure I did
2465   #
2466   my ($prefetch_needs_subquery, @limit_args);
2467
2468   if ( $attrs->{_grouped_by_distinct} and $attrs->{collapse} ) {
2469     # we already know there is a valid group_by (we made it) and we know it is
2470     # intended to be based *only* on non-multi stuff
2471     # short circuit the group_by parsing below
2472     $prefetch_needs_subquery = 1;
2473   }
2474   elsif (
2475     # The rationale is that even if we do *not* have collapse, we still
2476     # need to wrap the core grouped select/group_by in a subquery
2477     # so that databases that care about group_by/select equivalence
2478     # are happy (this includes MySQL in strict_mode)
2479     # If any of the other joined tables are referenced in the group_by
2480     # however - the user is on their own
2481     ( $prefetch_needs_subquery or ! $attrs->{_simple_passthrough_construction} )
2482       and
2483     $attrs->{group_by}
2484       and
2485     @{$attrs->{group_by}}
2486       and
2487     my $grp_aliases = try { # try{} because $attrs->{from} may be unreadable
2488       $self->_resolve_aliastypes_from_select_args({ from => $attrs->{from}, group_by => $attrs->{group_by} })
2489     }
2490   ) {
2491     # no aliases other than our own in group_by
2492     # if there are - do not allow subquery even if limit is present
2493     $prefetch_needs_subquery = ! scalar grep { $_ ne $attrs->{alias} } keys %{ $grp_aliases->{grouping} || {} };
2494   }
2495   elsif ( $attrs->{rows} && $attrs->{collapse} ) {
2496     # active collapse with a limit - that one is a no-brainer unless
2497     # overruled by a group_by above
2498     $prefetch_needs_subquery = 1;
2499   }
2500
2501   if ($prefetch_needs_subquery) {
2502     $attrs = $self->_adjust_select_args_for_complex_prefetch ($attrs);
2503   }
2504   elsif (! $attrs->{software_limit} ) {
2505     push @limit_args, (
2506       $attrs->{rows} || (),
2507       $attrs->{offset} || (),
2508     );
2509   }
2510
2511   # try to simplify the joinmap further (prune unreferenced type-single joins)
2512   if (
2513     ! $prefetch_needs_subquery  # already pruned
2514       and
2515     ref $attrs->{from}
2516       and
2517     reftype $attrs->{from} eq 'ARRAY'
2518       and
2519     @{$attrs->{from}} != 1
2520   ) {
2521     ($attrs->{from}, $attrs->{_aliastypes}) = $self->_prune_unused_joins ($attrs);
2522   }
2523
2524   # FIXME this is a gross, inefficient, largely incorrect and fragile hack
2525   # during the result inflation stage we *need* to know what was the aliastype
2526   # map as sqla saw it when the final pieces of SQL were being assembled
2527   # Originally we simply carried around the entirety of $attrs, but this
2528   # resulted in resultsets that are being reused growing continuously, as
2529   # the hash in question grew deeper and deeper.
2530   # Instead hand-pick what to take with us here (we actually don't need much
2531   # at this point just the map itself)
2532   $orig_attrs->{_last_sqlmaker_alias_map} = $attrs->{_aliastypes};
2533
2534 ###
2535   #   my $alias2source = $self->_resolve_ident_sources ($ident);
2536   #
2537   # This would be the point to deflate anything found in $attrs->{where}
2538   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2539   # expect a result object. And all we have is a resultsource (it is trivial
2540   # to extract deflator coderefs via $alias2source above).
2541   #
2542   # I don't see a way forward other than changing the way deflators are
2543   # invoked, and that's just bad...
2544 ###
2545
2546   return ( 'select', @{$attrs}{qw(from select where)}, $attrs, @limit_args );
2547 }
2548
2549 # Returns a counting SELECT for a simple count
2550 # query. Abstracted so that a storage could override
2551 # this to { count => 'firstcol' } or whatever makes
2552 # sense as a performance optimization
2553 sub _count_select {
2554   #my ($self, $source, $rs_attrs) = @_;
2555   return { count => '*' };
2556 }
2557
2558 =head2 select
2559
2560 =over 4
2561
2562 =item Arguments: $ident, $select, $condition, $attrs
2563
2564 =back
2565
2566 Handle a SQL select statement.
2567
2568 =cut
2569
2570 sub select {
2571   my $self = shift;
2572   my ($ident, $select, $condition, $attrs) = @_;
2573   return $self->cursor_class->new($self, \@_, $attrs);
2574 }
2575
2576 sub select_single {
2577   my $self = shift;
2578   my ($rv, $sth, @bind) = $self->_select(@_);
2579   my @row = $sth->fetchrow_array;
2580   my @nextrow = $sth->fetchrow_array if @row;
2581   if(@row && @nextrow) {
2582     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2583   }
2584   # Need to call finish() to work round broken DBDs
2585   $sth->finish();
2586   return @row;
2587 }
2588
2589 =head2 sql_limit_dialect
2590
2591 This is an accessor for the default SQL limit dialect used by a particular
2592 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2593 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2594 see L<DBIx::Class::SQLMaker::LimitDialects>.
2595
2596 =cut
2597
2598 sub _dbh_columns_info_for {
2599   my ($self, $dbh, $table) = @_;
2600
2601   if ($dbh->can('column_info')) {
2602     my %result;
2603     my $caught;
2604     try {
2605       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2606       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2607       $sth->execute();
2608       while ( my $info = $sth->fetchrow_hashref() ){
2609         my %column_info;
2610         $column_info{data_type}   = $info->{TYPE_NAME};
2611         $column_info{size}      = $info->{COLUMN_SIZE};
2612         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2613         $column_info{default_value} = $info->{COLUMN_DEF};
2614         my $col_name = $info->{COLUMN_NAME};
2615         $col_name =~ s/^\"(.*)\"$/$1/;
2616
2617         $result{$col_name} = \%column_info;
2618       }
2619     } catch {
2620       $caught = 1;
2621     };
2622     return \%result if !$caught && scalar keys %result;
2623   }
2624
2625   my %result;
2626   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2627   $sth->execute;
2628   my @columns = @{$sth->{NAME_lc}};
2629   for my $i ( 0 .. $#columns ){
2630     my %column_info;
2631     $column_info{data_type} = $sth->{TYPE}->[$i];
2632     $column_info{size} = $sth->{PRECISION}->[$i];
2633     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2634
2635     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2636       $column_info{data_type} = $1;
2637       $column_info{size}    = $2;
2638     }
2639
2640     $result{$columns[$i]} = \%column_info;
2641   }
2642   $sth->finish;
2643
2644   foreach my $col (keys %result) {
2645     my $colinfo = $result{$col};
2646     my $type_num = $colinfo->{data_type};
2647     my $type_name;
2648     if(defined $type_num && $dbh->can('type_info')) {
2649       my $type_info = $dbh->type_info($type_num);
2650       $type_name = $type_info->{TYPE_NAME} if $type_info;
2651       $colinfo->{data_type} = $type_name if $type_name;
2652     }
2653   }
2654
2655   return \%result;
2656 }
2657
2658 sub columns_info_for {
2659   my ($self, $table) = @_;
2660   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2661 }
2662
2663 =head2 last_insert_id
2664
2665 Return the row id of the last insert.
2666
2667 =cut
2668
2669 sub _dbh_last_insert_id {
2670     my ($self, $dbh, $source, $col) = @_;
2671
2672     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2673
2674     return $id if defined $id;
2675
2676     my $class = ref $self;
2677     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2678 }
2679
2680 sub last_insert_id {
2681   my $self = shift;
2682   $self->_dbh_last_insert_id ($self->_dbh, @_);
2683 }
2684
2685 =head2 _native_data_type
2686
2687 =over 4
2688
2689 =item Arguments: $type_name
2690
2691 =back
2692
2693 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2694 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2695 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2696
2697 The default implementation returns C<undef>, implement in your Storage driver if
2698 you need this functionality.
2699
2700 Should map types from other databases to the native RDBMS type, for example
2701 C<VARCHAR2> to C<VARCHAR>.
2702
2703 Types with modifiers should map to the underlying data type. For example,
2704 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2705
2706 Composite types should map to the container type, for example
2707 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2708
2709 =cut
2710
2711 sub _native_data_type {
2712   #my ($self, $data_type) = @_;
2713   return undef
2714 }
2715
2716 # Check if placeholders are supported at all
2717 sub _determine_supports_placeholders {
2718   my $self = shift;
2719   my $dbh  = $self->_get_dbh;
2720
2721   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2722   # but it is inaccurate more often than not
2723   return try {
2724     local $dbh->{PrintError} = 0;
2725     local $dbh->{RaiseError} = 1;
2726     $dbh->do('select ?', {}, 1);
2727     1;
2728   }
2729   catch {
2730     0;
2731   };
2732 }
2733
2734 # Check if placeholders bound to non-string types throw exceptions
2735 #
2736 sub _determine_supports_typeless_placeholders {
2737   my $self = shift;
2738   my $dbh  = $self->_get_dbh;
2739
2740   return try {
2741     local $dbh->{PrintError} = 0;
2742     local $dbh->{RaiseError} = 1;
2743     # this specifically tests a bind that is NOT a string
2744     $dbh->do('select 1 where 1 = ?', {}, 1);
2745     1;
2746   }
2747   catch {
2748     0;
2749   };
2750 }
2751
2752 =head2 sqlt_type
2753
2754 Returns the database driver name.
2755
2756 =cut
2757
2758 sub sqlt_type {
2759   shift->_get_dbh->{Driver}->{Name};
2760 }
2761
2762 =head2 bind_attribute_by_data_type
2763
2764 Given a datatype from column info, returns a database specific bind
2765 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2766 let the database planner just handle it.
2767
2768 This method is always called after the driver has been determined and a DBI
2769 connection has been established. Therefore you can refer to C<DBI::$constant>
2770 and/or C<DBD::$driver::$constant> directly, without worrying about loading
2771 the correct modules.
2772
2773 =cut
2774
2775 sub bind_attribute_by_data_type {
2776     return;
2777 }
2778
2779 =head2 is_datatype_numeric
2780
2781 Given a datatype from column_info, returns a boolean value indicating if
2782 the current RDBMS considers it a numeric value. This controls how
2783 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2784 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2785 be performed instead of the usual C<eq>.
2786
2787 =cut
2788
2789 sub is_datatype_numeric {
2790   #my ($self, $dt) = @_;
2791
2792   return 0 unless $_[1];
2793
2794   $_[1] =~ /^ (?:
2795     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2796   ) $/ix;
2797 }
2798
2799
2800 =head2 create_ddl_dir
2801
2802 =over 4
2803
2804 =item Arguments: $schema, \@databases, $version, $directory, $preversion, \%sqlt_args
2805
2806 =back
2807
2808 Creates a SQL file based on the Schema, for each of the specified
2809 database engines in C<\@databases> in the given directory.
2810 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2811
2812 Given a previous version number, this will also create a file containing
2813 the ALTER TABLE statements to transform the previous schema into the
2814 current one. Note that these statements may contain C<DROP TABLE> or
2815 C<DROP COLUMN> statements that can potentially destroy data.
2816
2817 The file names are created using the C<ddl_filename> method below, please
2818 override this method in your schema if you would like a different file
2819 name format. For the ALTER file, the same format is used, replacing
2820 $version in the name with "$preversion-$version".
2821
2822 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2823 The most common value for this would be C<< { add_drop_table => 1 } >>
2824 to have the SQL produced include a C<DROP TABLE> statement for each table
2825 created. For quoting purposes supply C<quote_identifiers>.
2826
2827 If no arguments are passed, then the following default values are assumed:
2828
2829 =over 4
2830
2831 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2832
2833 =item version    - $schema->schema_version
2834
2835 =item directory  - './'
2836
2837 =item preversion - <none>
2838
2839 =back
2840
2841 By default, C<\%sqlt_args> will have
2842
2843  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2844
2845 merged with the hash passed in. To disable any of those features, pass in a
2846 hashref like the following
2847
2848  { ignore_constraint_names => 0, # ... other options }
2849
2850
2851 WARNING: You are strongly advised to check all SQL files created, before applying
2852 them.
2853
2854 =cut
2855
2856 sub create_ddl_dir {
2857   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2858
2859   unless ($dir) {
2860     carp "No directory given, using ./\n";
2861     $dir = './';
2862   } else {
2863       -d $dir
2864         or
2865       (require File::Path and File::Path::mkpath (["$dir"]))  # mkpath does not like objects (i.e. Path::Class::Dir)
2866         or
2867       $self->throw_exception(
2868         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2869       );
2870   }
2871
2872   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2873
2874   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2875   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2876
2877   my $schema_version = $schema->schema_version || '1.x';
2878   $version ||= $schema_version;
2879
2880   $sqltargs = {
2881     add_drop_table => 1,
2882     ignore_constraint_names => 1,
2883     ignore_index_names => 1,
2884     quote_identifiers => $self->sql_maker->_quoting_enabled,
2885     %{$sqltargs || {}}
2886   };
2887
2888   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2889     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2890   }
2891
2892   my $sqlt = SQL::Translator->new( $sqltargs );
2893
2894   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2895   my $sqlt_schema = $sqlt->translate({ data => $schema })
2896     or $self->throw_exception ($sqlt->error);
2897
2898   foreach my $db (@$databases) {
2899     $sqlt->reset();
2900     $sqlt->{schema} = $sqlt_schema;
2901     $sqlt->producer($db);
2902
2903     my $file;
2904     my $filename = $schema->ddl_filename($db, $version, $dir);
2905     if (-e $filename && ($version eq $schema_version )) {
2906       # if we are dumping the current version, overwrite the DDL
2907       carp "Overwriting existing DDL file - $filename";
2908       unlink($filename);
2909     }
2910
2911     my $output = $sqlt->translate;
2912     if(!$output) {
2913       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2914       next;
2915     }
2916     if(!open($file, ">$filename")) {
2917       $self->throw_exception("Can't open $filename for writing ($!)");
2918       next;
2919     }
2920     print $file $output;
2921     close($file);
2922
2923     next unless ($preversion);
2924
2925     require SQL::Translator::Diff;
2926
2927     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2928     if(!-e $prefilename) {
2929       carp("No previous schema file found ($prefilename)");
2930       next;
2931     }
2932
2933     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2934     if(-e $difffile) {
2935       carp("Overwriting existing diff file - $difffile");
2936       unlink($difffile);
2937     }
2938
2939     my $source_schema;
2940     {
2941       my $t = SQL::Translator->new($sqltargs);
2942       $t->debug( 0 );
2943       $t->trace( 0 );
2944
2945       $t->parser( $db )
2946         or $self->throw_exception ($t->error);
2947
2948       my $out = $t->translate( $prefilename )
2949         or $self->throw_exception ($t->error);
2950
2951       $source_schema = $t->schema;
2952
2953       $source_schema->name( $prefilename )
2954         unless ( $source_schema->name );
2955     }
2956
2957     # The "new" style of producers have sane normalization and can support
2958     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2959     # And we have to diff parsed SQL against parsed SQL.
2960     my $dest_schema = $sqlt_schema;
2961
2962     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2963       my $t = SQL::Translator->new($sqltargs);
2964       $t->debug( 0 );
2965       $t->trace( 0 );
2966
2967       $t->parser( $db )
2968         or $self->throw_exception ($t->error);
2969
2970       my $out = $t->translate( $filename )
2971         or $self->throw_exception ($t->error);
2972
2973       $dest_schema = $t->schema;
2974
2975       $dest_schema->name( $filename )
2976         unless $dest_schema->name;
2977     }
2978
2979     my $diff = do {
2980       # FIXME - this is a terrible workaround for
2981       # https://github.com/dbsrgits/sql-translator/commit/2d23c1e
2982       # Fixing it in this sloppy manner so that we don't hve to
2983       # lockstep an SQLT release as well. Needs to be removed at
2984       # some point, and SQLT dep bumped
2985       local $SQL::Translator::Producer::SQLite::NO_QUOTES
2986         if $SQL::Translator::Producer::SQLite::NO_QUOTES;
2987
2988       SQL::Translator::Diff::schema_diff($source_schema, $db,
2989                                          $dest_schema,   $db,
2990                                          $sqltargs
2991                                        );
2992     };
2993
2994     if(!open $file, ">$difffile") {
2995       $self->throw_exception("Can't write to $difffile ($!)");
2996       next;
2997     }
2998     print $file $diff;
2999     close($file);
3000   }
3001 }
3002
3003 =head2 deployment_statements
3004
3005 =over 4
3006
3007 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
3008
3009 =back
3010
3011 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
3012
3013 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
3014 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
3015
3016 C<$directory> is used to return statements from files in a previously created
3017 L</create_ddl_dir> directory and is optional. The filenames are constructed
3018 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
3019
3020 If no C<$directory> is specified then the statements are constructed on the
3021 fly using L<SQL::Translator> and C<$version> is ignored.
3022
3023 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
3024
3025 =cut
3026
3027 sub deployment_statements {
3028   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
3029   $type ||= $self->sqlt_type;
3030   $version ||= $schema->schema_version || '1.x';
3031   $dir ||= './';
3032   my $filename = $schema->ddl_filename($type, $version, $dir);
3033   if(-f $filename)
3034   {
3035       # FIXME replace this block when a proper sane sql parser is available
3036       my $file;
3037       open($file, "<$filename")
3038         or $self->throw_exception("Can't open $filename ($!)");
3039       my @rows = <$file>;
3040       close($file);
3041       return join('', @rows);
3042   }
3043
3044   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
3045     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
3046   }
3047
3048   # sources needs to be a parser arg, but for simplicity allow at top level
3049   # coming in
3050   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
3051       if exists $sqltargs->{sources};
3052
3053   $sqltargs->{quote_identifiers} = $self->sql_maker->_quoting_enabled
3054     unless exists $sqltargs->{quote_identifiers};
3055
3056   my $tr = SQL::Translator->new(
3057     producer => "SQL::Translator::Producer::${type}",
3058     %$sqltargs,
3059     parser => 'SQL::Translator::Parser::DBIx::Class',
3060     data => $schema,
3061   );
3062
3063   return preserve_context {
3064     $tr->translate
3065   } after => sub {
3066     $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
3067       unless defined $_[0];
3068   };
3069 }
3070
3071 # FIXME deploy() currently does not accurately report sql errors
3072 # Will always return true while errors are warned
3073 sub deploy {
3074   my ($self, $schema, $type, $sqltargs, $dir) = @_;
3075   my $deploy = sub {
3076     my $line = shift;
3077     return if(!$line);
3078     return if($line =~ /^--/);
3079     # next if($line =~ /^DROP/m);
3080     return if($line =~ /^BEGIN TRANSACTION/m);
3081     return if($line =~ /^COMMIT/m);
3082     return if $line =~ /^\s+$/; # skip whitespace only
3083     $self->_query_start($line);
3084     try {
3085       # do a dbh_do cycle here, as we need some error checking in
3086       # place (even though we will ignore errors)
3087       $self->dbh_do (sub { $_[1]->do($line) });
3088     } catch {
3089       carp qq{$_ (running "${line}")};
3090     };
3091     $self->_query_end($line);
3092   };
3093   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
3094   if (@statements > 1) {
3095     foreach my $statement (@statements) {
3096       $deploy->( $statement );
3097     }
3098   }
3099   elsif (@statements == 1) {
3100     # split on single line comments and end of statements
3101     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
3102       $deploy->( $line );
3103     }
3104   }
3105 }
3106
3107 =head2 datetime_parser
3108
3109 Returns the datetime parser class
3110
3111 =cut
3112
3113 sub datetime_parser {
3114   my $self = shift;
3115   return $self->{datetime_parser} ||= do {
3116     $self->build_datetime_parser(@_);
3117   };
3118 }
3119
3120 =head2 datetime_parser_type
3121
3122 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
3123
3124 =head2 build_datetime_parser
3125
3126 See L</datetime_parser>
3127
3128 =cut
3129
3130 sub build_datetime_parser {
3131   my $self = shift;
3132   my $type = $self->datetime_parser_type(@_);
3133   return $type;
3134 }
3135
3136
3137 =head2 is_replicating
3138
3139 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
3140 replicate from a master database.  Default is undef, which is the result
3141 returned by databases that don't support replication.
3142
3143 =cut
3144
3145 sub is_replicating {
3146     return;
3147
3148 }
3149
3150 =head2 lag_behind_master
3151
3152 Returns a number that represents a certain amount of lag behind a master db
3153 when a given storage is replicating.  The number is database dependent, but
3154 starts at zero and increases with the amount of lag. Default in undef
3155
3156 =cut
3157
3158 sub lag_behind_master {
3159     return;
3160 }
3161
3162 =head2 relname_to_table_alias
3163
3164 =over 4
3165
3166 =item Arguments: $relname, $join_count
3167
3168 =item Return Value: $alias
3169
3170 =back
3171
3172 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
3173 queries.
3174
3175 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
3176 way these aliases are named.
3177
3178 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
3179 otherwise C<"$relname">.
3180
3181 =cut
3182
3183 sub relname_to_table_alias {
3184   my ($self, $relname, $join_count) = @_;
3185
3186   my $alias = ($join_count && $join_count > 1 ?
3187     join('_', $relname, $join_count) : $relname);
3188
3189   return $alias;
3190 }
3191
3192 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
3193 # version and it may be necessary to amend or override it for a specific storage
3194 # if such binds are necessary.
3195 sub _max_column_bytesize {
3196   my ($self, $attr) = @_;
3197
3198   my $max_size;
3199
3200   if ($attr->{sqlt_datatype}) {
3201     my $data_type = lc($attr->{sqlt_datatype});
3202
3203     if ($attr->{sqlt_size}) {
3204
3205       # String/sized-binary types
3206       if ($data_type =~ /^(?:
3207           l? (?:var)? char(?:acter)? (?:\s*varying)?
3208             |
3209           (?:var)? binary (?:\s*varying)?
3210             |
3211           raw
3212         )\b/x
3213       ) {
3214         $max_size = $attr->{sqlt_size};
3215       }
3216       # Other charset/unicode types, assume scale of 4
3217       elsif ($data_type =~ /^(?:
3218           national \s* character (?:\s*varying)?
3219             |
3220           nchar
3221             |
3222           univarchar
3223             |
3224           nvarchar
3225         )\b/x
3226       ) {
3227         $max_size = $attr->{sqlt_size} * 4;
3228       }
3229     }
3230
3231     if (!$max_size and !$self->_is_lob_type($data_type)) {
3232       $max_size = 100 # for all other (numeric?) datatypes
3233     }
3234   }
3235
3236   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
3237 }
3238
3239 # Determine if a data_type is some type of BLOB
3240 sub _is_lob_type {
3241   my ($self, $data_type) = @_;
3242   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
3243     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
3244                                   |varchar|character\s*varying|nvarchar
3245                                   |national\s*character\s*varying))?\z/xi);
3246 }
3247
3248 sub _is_binary_lob_type {
3249   my ($self, $data_type) = @_;
3250   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
3251     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
3252 }
3253
3254 sub _is_text_lob_type {
3255   my ($self, $data_type) = @_;
3256   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
3257     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
3258                         |national\s*character\s*varying))\z/xi);
3259 }
3260
3261 # Determine if a data_type is some type of a binary type
3262 sub _is_binary_type {
3263   my ($self, $data_type) = @_;
3264   $data_type && ($self->_is_binary_lob_type($data_type)
3265     || $data_type =~ /(?:var)?(?:binary|bit|graphic)(?:\s*varying)?/i);
3266 }
3267
3268 1;
3269
3270 =head1 USAGE NOTES
3271
3272 =head2 DBIx::Class and AutoCommit
3273
3274 DBIx::Class can do some wonderful magic with handling exceptions,
3275 disconnections, and transactions when you use C<< AutoCommit => 1 >>
3276 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
3277 transaction support.
3278
3279 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
3280 in an assumed transaction between commits, and you're telling us you'd
3281 like to manage that manually.  A lot of the magic protections offered by
3282 this module will go away.  We can't protect you from exceptions due to database
3283 disconnects because we don't know anything about how to restart your
3284 transactions.  You're on your own for handling all sorts of exceptional
3285 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
3286 be with raw DBI.
3287
3288
3289 =head1 AUTHOR AND CONTRIBUTORS
3290
3291 See L<AUTHOR|DBIx::Class/AUTHOR> and L<CONTRIBUTORS|DBIx::Class/CONTRIBUTORS> in DBIx::Class
3292
3293 =head1 LICENSE
3294
3295 You may distribute this code under the same terms as Perl itself.
3296
3297 =cut